agtrace_engine/session/
assembler.rs

1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{
7    AgentEvent, EventPayload, SpawnContext, StreamId, SystemGeneratedReason, TurnOrigin,
8    UserPayload,
9};
10
11/// Assemble all streams from events into separate sessions.
12///
13/// Returns a Vec of AgentSession, one per distinct StreamId found in the events.
14/// Each session contains only events from its respective stream.
15/// For sidechain sessions, attempts to link back to the parent turn/step via spawned_by.
16pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
17    if events.is_empty() {
18        return Vec::new();
19    }
20
21    // Group events by stream_id
22    let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
23    for event in events {
24        streams
25            .entry(event.stream_id.clone())
26            .or_default()
27            .push(event.clone());
28    }
29
30    // First, assemble the main session to build spawn context map
31    let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
32    let spawn_map = build_spawn_context_map(&main_events);
33
34    // Assemble each stream into a session
35    let mut sessions: Vec<AgentSession> = streams
36        .into_iter()
37        .filter_map(|(stream_id, stream_events)| {
38            let spawned_by = match &stream_id {
39                StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
40                _ => None,
41            };
42            assemble_session_for_stream(&stream_events, stream_id, spawned_by)
43        })
44        .collect();
45
46    // Sort sessions: Main first, then sidechains/subagents by start_time
47    sessions.sort_by(|a, b| {
48        use std::cmp::Ordering;
49        match (&a.stream_id, &b.stream_id) {
50            (StreamId::Main, StreamId::Main) => Ordering::Equal,
51            (StreamId::Main, _) => Ordering::Less,
52            (_, StreamId::Main) => Ordering::Greater,
53            _ => a.start_time.cmp(&b.start_time),
54        }
55    });
56
57    sessions
58}
59
60/// Build a map from agent_id to SpawnContext by scanning ToolResult events with agent_id.
61fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
62    let mut spawn_map = HashMap::new();
63
64    // Build turns first to get proper indices
65    let turns = build_turns(events);
66
67    for (turn_idx, turn) in turns.iter().enumerate() {
68        for (step_idx, step) in turn.steps.iter().enumerate() {
69            for tool in &step.tools {
70                if let Some(ref result) = tool.result
71                    && let Some(ref agent_id) = result.content.agent_id
72                {
73                    spawn_map.insert(
74                        agent_id.clone(),
75                        SpawnContext {
76                            turn_index: turn_idx,
77                            step_index: step_idx,
78                        },
79                    );
80                }
81            }
82        }
83    }
84
85    spawn_map
86}
87
88/// Assemble the main stream from events into a session.
89///
90/// This is the backward-compatible function that filters to Main stream only.
91/// For multi-stream support, use `assemble_sessions()` instead.
92pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
93    if events.is_empty() {
94        return None;
95    }
96
97    // Filter to main stream events only
98    let main_events: Vec<_> = events
99        .iter()
100        .filter(|e| matches!(e.stream_id, StreamId::Main))
101        .cloned()
102        .collect();
103
104    if main_events.is_empty() {
105        return None;
106    }
107
108    assemble_session_for_stream(&main_events, StreamId::Main, None)
109}
110
111/// Internal: Assemble a session from events belonging to a single stream.
112fn assemble_session_for_stream(
113    events: &[AgentEvent],
114    stream_id: StreamId,
115    spawned_by: Option<SpawnContext>,
116) -> Option<AgentSession> {
117    if events.is_empty() {
118        return None;
119    }
120
121    let session_id = events.first()?.session_id;
122    let start_time = events.first()?.timestamp;
123    let end_time = events.last().map(|e| e.timestamp);
124
125    let turns = build_turns(events);
126    let stats = calculate_session_stats(&turns, start_time, end_time);
127
128    Some(AgentSession {
129        session_id,
130        stream_id,
131        spawned_by,
132        start_time,
133        end_time,
134        turns,
135        stats,
136    })
137}
138
139/// Detect the origin of a user message.
140///
141/// Identifies system-generated messages (like context compaction continuation)
142/// vs user-typed messages.
143fn detect_turn_origin(content: &UserPayload) -> TurnOrigin {
144    // Context compaction pattern from Claude Code
145    if content.text.starts_with("This session is being continued") {
146        return TurnOrigin::SystemGenerated {
147            reason: SystemGeneratedReason::ContextCompaction,
148        };
149    }
150
151    TurnOrigin::User
152}
153
154fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
155    let mut turns = Vec::new();
156    let mut current_turn: Option<TurnBuilder> = None;
157
158    for event in events {
159        match &event.payload {
160            EventPayload::User(user) => {
161                // "[Request interrupted by user]" is a termination marker, not a new turn.
162                // It indicates the previous turn was interrupted - just finalize that turn.
163                if user.text.starts_with("[Request interrupted") {
164                    if let Some(builder) = current_turn.take()
165                        && let Some(turn) = builder.build()
166                    {
167                        turns.push(turn);
168                    }
169                    // Don't start a new turn - wait for actual user input
170                    continue;
171                }
172
173                // If current turn was started by SlashCommand and has no steps yet,
174                // merge this User content into it (slash command expansion)
175                if let Some(ref mut builder) = current_turn
176                    && builder.is_slash_command_pending()
177                {
178                    builder.set_expanded_content(user.clone());
179                    continue;
180                }
181
182                if let Some(builder) = current_turn.take()
183                    && let Some(turn) = builder.build()
184                {
185                    turns.push(turn);
186                }
187
188                current_turn = Some(TurnBuilder::new(
189                    event.id,
190                    event.timestamp,
191                    UserMessage {
192                        event_id: event.id,
193                        content: user.clone(),
194                        slash_command: None,
195                        origin: detect_turn_origin(user),
196                    },
197                ));
198            }
199            EventPayload::SlashCommand(cmd) => {
200                if let Some(builder) = current_turn.take()
201                    && let Some(turn) = builder.build()
202                {
203                    turns.push(turn);
204                }
205
206                current_turn = Some(TurnBuilder::new_slash_command(
207                    event.id,
208                    event.timestamp,
209                    cmd.clone(),
210                ));
211            }
212            _ => {
213                if let Some(ref mut builder) = current_turn {
214                    builder.add_event(event);
215                }
216            }
217        }
218    }
219
220    if let Some(builder) = current_turn
221        && let Some(turn) = builder.build()
222    {
223        turns.push(turn);
224    }
225
226    turns
227}