agtrace_engine/session/
assembler.rs

1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{AgentEvent, EventPayload, SpawnContext, StreamId};
7
8/// Assemble all streams from events into separate sessions.
9///
10/// Returns a Vec of AgentSession, one per distinct StreamId found in the events.
11/// Each session contains only events from its respective stream.
12/// For sidechain sessions, attempts to link back to the parent turn/step via spawned_by.
13pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
14    if events.is_empty() {
15        return Vec::new();
16    }
17
18    // Group events by stream_id
19    let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
20    for event in events {
21        streams
22            .entry(event.stream_id.clone())
23            .or_default()
24            .push(event.clone());
25    }
26
27    // First, assemble the main session to build spawn context map
28    let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
29    let spawn_map = build_spawn_context_map(&main_events);
30
31    // Assemble each stream into a session
32    let mut sessions: Vec<AgentSession> = streams
33        .into_iter()
34        .filter_map(|(stream_id, stream_events)| {
35            let spawned_by = match &stream_id {
36                StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
37                _ => None,
38            };
39            assemble_session_for_stream(&stream_events, stream_id, spawned_by)
40        })
41        .collect();
42
43    // Sort sessions: Main first, then sidechains/subagents by start_time
44    sessions.sort_by(|a, b| {
45        use std::cmp::Ordering;
46        match (&a.stream_id, &b.stream_id) {
47            (StreamId::Main, StreamId::Main) => Ordering::Equal,
48            (StreamId::Main, _) => Ordering::Less,
49            (_, StreamId::Main) => Ordering::Greater,
50            _ => a.start_time.cmp(&b.start_time),
51        }
52    });
53
54    sessions
55}
56
57/// Build a map from agent_id to SpawnContext by scanning ToolResult events with agent_id.
58fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
59    let mut spawn_map = HashMap::new();
60
61    // Build turns first to get proper indices
62    let turns = build_turns(events);
63
64    for (turn_idx, turn) in turns.iter().enumerate() {
65        for (step_idx, step) in turn.steps.iter().enumerate() {
66            for tool in &step.tools {
67                if let Some(ref result) = tool.result
68                    && let Some(ref agent_id) = result.content.agent_id
69                {
70                    spawn_map.insert(
71                        agent_id.clone(),
72                        SpawnContext {
73                            turn_index: turn_idx,
74                            step_index: step_idx,
75                        },
76                    );
77                }
78            }
79        }
80    }
81
82    spawn_map
83}
84
85/// Assemble the main stream from events into a session.
86///
87/// This is the backward-compatible function that filters to Main stream only.
88/// For multi-stream support, use `assemble_sessions()` instead.
89pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
90    if events.is_empty() {
91        return None;
92    }
93
94    // Filter to main stream events only
95    let main_events: Vec<_> = events
96        .iter()
97        .filter(|e| matches!(e.stream_id, StreamId::Main))
98        .cloned()
99        .collect();
100
101    if main_events.is_empty() {
102        return None;
103    }
104
105    assemble_session_for_stream(&main_events, StreamId::Main, None)
106}
107
108/// Internal: Assemble a session from events belonging to a single stream.
109fn assemble_session_for_stream(
110    events: &[AgentEvent],
111    stream_id: StreamId,
112    spawned_by: Option<SpawnContext>,
113) -> Option<AgentSession> {
114    if events.is_empty() {
115        return None;
116    }
117
118    let session_id = events.first()?.session_id;
119    let start_time = events.first()?.timestamp;
120    let end_time = events.last().map(|e| e.timestamp);
121
122    let turns = build_turns(events);
123    let stats = calculate_session_stats(&turns, start_time, end_time);
124
125    Some(AgentSession {
126        session_id,
127        stream_id,
128        spawned_by,
129        start_time,
130        end_time,
131        turns,
132        stats,
133    })
134}
135
136fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
137    let mut turns = Vec::new();
138    let mut current_turn: Option<TurnBuilder> = None;
139
140    for event in events {
141        match &event.payload {
142            EventPayload::User(user) => {
143                if let Some(builder) = current_turn.take()
144                    && let Some(turn) = builder.build()
145                {
146                    turns.push(turn);
147                }
148
149                current_turn = Some(TurnBuilder::new(
150                    event.id,
151                    event.timestamp,
152                    UserMessage {
153                        event_id: event.id,
154                        content: user.clone(),
155                    },
156                ));
157            }
158            _ => {
159                if let Some(ref mut builder) = current_turn {
160                    builder.add_event(event);
161                }
162            }
163        }
164    }
165
166    if let Some(builder) = current_turn
167        && let Some(turn) = builder.build()
168    {
169        turns.push(turn);
170    }
171
172    turns
173}