agtrace_engine/session/
assembler.rs1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{
7 AgentEvent, EventPayload, SpawnContext, StreamId, SystemGeneratedReason, TurnOrigin,
8 UserPayload,
9};
10
11pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
17 if events.is_empty() {
18 return Vec::new();
19 }
20
21 let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
23 for event in events {
24 streams
25 .entry(event.stream_id.clone())
26 .or_default()
27 .push(event.clone());
28 }
29
30 let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
32 let spawn_map = build_spawn_context_map(&main_events);
33
34 let mut sessions: Vec<AgentSession> = streams
36 .into_iter()
37 .filter_map(|(stream_id, stream_events)| {
38 let spawned_by = match &stream_id {
39 StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
40 _ => None,
41 };
42 assemble_session_for_stream(&stream_events, stream_id, spawned_by)
43 })
44 .collect();
45
46 sessions.sort_by(|a, b| {
48 use std::cmp::Ordering;
49 match (&a.stream_id, &b.stream_id) {
50 (StreamId::Main, StreamId::Main) => Ordering::Equal,
51 (StreamId::Main, _) => Ordering::Less,
52 (_, StreamId::Main) => Ordering::Greater,
53 _ => a.start_time.cmp(&b.start_time),
54 }
55 });
56
57 sessions
58}
59
60fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
62 let mut spawn_map = HashMap::new();
63
64 let turns = build_turns(events);
66
67 for (turn_idx, turn) in turns.iter().enumerate() {
68 for (step_idx, step) in turn.steps.iter().enumerate() {
69 for tool in &step.tools {
70 if let Some(ref result) = tool.result
71 && let Some(ref agent_id) = result.content.agent_id
72 {
73 spawn_map.insert(
74 agent_id.clone(),
75 SpawnContext {
76 turn_index: turn_idx,
77 step_index: step_idx,
78 },
79 );
80 }
81 }
82 }
83 }
84
85 spawn_map
86}
87
88pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
93 if events.is_empty() {
94 return None;
95 }
96
97 let main_events: Vec<_> = events
99 .iter()
100 .filter(|e| matches!(e.stream_id, StreamId::Main))
101 .cloned()
102 .collect();
103
104 if main_events.is_empty() {
105 return None;
106 }
107
108 assemble_session_for_stream(&main_events, StreamId::Main, None)
109}
110
111fn assemble_session_for_stream(
113 events: &[AgentEvent],
114 stream_id: StreamId,
115 spawned_by: Option<SpawnContext>,
116) -> Option<AgentSession> {
117 if events.is_empty() {
118 return None;
119 }
120
121 let session_id = events.first()?.session_id;
122 let start_time = events.first()?.timestamp;
123 let end_time = events.last().map(|e| e.timestamp);
124
125 let turns = build_turns(events);
126 let stats = calculate_session_stats(&turns, start_time, end_time);
127
128 Some(AgentSession {
129 session_id,
130 stream_id,
131 spawned_by,
132 start_time,
133 end_time,
134 turns,
135 stats,
136 })
137}
138
139fn detect_turn_origin(content: &UserPayload) -> TurnOrigin {
144 if content.text.starts_with("This session is being continued") {
146 return TurnOrigin::SystemGenerated {
147 reason: SystemGeneratedReason::ContextCompaction,
148 };
149 }
150
151 TurnOrigin::User
152}
153
154fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
155 let mut turns = Vec::new();
156 let mut current_turn: Option<TurnBuilder> = None;
157
158 for event in events {
159 match &event.payload {
160 EventPayload::User(user) => {
161 if user.text.starts_with("[Request interrupted") {
164 if let Some(builder) = current_turn.take()
165 && let Some(turn) = builder.build()
166 {
167 turns.push(turn);
168 }
169 continue;
171 }
172
173 if let Some(ref mut builder) = current_turn
176 && builder.is_slash_command_pending()
177 {
178 builder.set_expanded_content(user.clone());
179 continue;
180 }
181
182 if let Some(builder) = current_turn.take()
183 && let Some(turn) = builder.build()
184 {
185 turns.push(turn);
186 }
187
188 current_turn = Some(TurnBuilder::new(
189 event.id,
190 event.timestamp,
191 UserMessage {
192 event_id: event.id,
193 content: user.clone(),
194 slash_command: None,
195 origin: detect_turn_origin(user),
196 },
197 ));
198 }
199 EventPayload::SlashCommand(cmd) => {
200 if let Some(builder) = current_turn.take()
201 && let Some(turn) = builder.build()
202 {
203 turns.push(turn);
204 }
205
206 current_turn = Some(TurnBuilder::new_slash_command(
207 event.id,
208 event.timestamp,
209 cmd.clone(),
210 ));
211 }
212 _ => {
213 if let Some(ref mut builder) = current_turn {
214 builder.add_event(event);
215 }
216 }
217 }
218 }
219
220 if let Some(builder) = current_turn
221 && let Some(turn) = builder.build()
222 {
223 turns.push(turn);
224 }
225
226 turns
227}