agtrace_engine/session/
assembler.rs1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{AgentEvent, EventPayload, SpawnContext, StreamId};
7
8pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
14 if events.is_empty() {
15 return Vec::new();
16 }
17
18 let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
20 for event in events {
21 streams
22 .entry(event.stream_id.clone())
23 .or_default()
24 .push(event.clone());
25 }
26
27 let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
29 let spawn_map = build_spawn_context_map(&main_events);
30
31 let mut sessions: Vec<AgentSession> = streams
33 .into_iter()
34 .filter_map(|(stream_id, stream_events)| {
35 let spawned_by = match &stream_id {
36 StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
37 _ => None,
38 };
39 assemble_session_for_stream(&stream_events, stream_id, spawned_by)
40 })
41 .collect();
42
43 sessions.sort_by(|a, b| {
45 use std::cmp::Ordering;
46 match (&a.stream_id, &b.stream_id) {
47 (StreamId::Main, StreamId::Main) => Ordering::Equal,
48 (StreamId::Main, _) => Ordering::Less,
49 (_, StreamId::Main) => Ordering::Greater,
50 _ => a.start_time.cmp(&b.start_time),
51 }
52 });
53
54 sessions
55}
56
57fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
59 let mut spawn_map = HashMap::new();
60
61 let turns = build_turns(events);
63
64 for (turn_idx, turn) in turns.iter().enumerate() {
65 for (step_idx, step) in turn.steps.iter().enumerate() {
66 for tool in &step.tools {
67 if let Some(ref result) = tool.result
68 && let Some(ref agent_id) = result.content.agent_id
69 {
70 spawn_map.insert(
71 agent_id.clone(),
72 SpawnContext {
73 turn_index: turn_idx,
74 step_index: step_idx,
75 },
76 );
77 }
78 }
79 }
80 }
81
82 spawn_map
83}
84
85pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
90 if events.is_empty() {
91 return None;
92 }
93
94 let main_events: Vec<_> = events
96 .iter()
97 .filter(|e| matches!(e.stream_id, StreamId::Main))
98 .cloned()
99 .collect();
100
101 if main_events.is_empty() {
102 return None;
103 }
104
105 assemble_session_for_stream(&main_events, StreamId::Main, None)
106}
107
108fn assemble_session_for_stream(
110 events: &[AgentEvent],
111 stream_id: StreamId,
112 spawned_by: Option<SpawnContext>,
113) -> Option<AgentSession> {
114 if events.is_empty() {
115 return None;
116 }
117
118 let session_id = events.first()?.session_id;
119 let start_time = events.first()?.timestamp;
120 let end_time = events.last().map(|e| e.timestamp);
121
122 let turns = build_turns(events);
123 let stats = calculate_session_stats(&turns, start_time, end_time);
124
125 Some(AgentSession {
126 session_id,
127 stream_id,
128 spawned_by,
129 start_time,
130 end_time,
131 turns,
132 stats,
133 })
134}
135
136fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
137 let mut turns = Vec::new();
138 let mut current_turn: Option<TurnBuilder> = None;
139
140 for event in events {
141 match &event.payload {
142 EventPayload::User(user) => {
143 if let Some(builder) = current_turn.take()
144 && let Some(turn) = builder.build()
145 {
146 turns.push(turn);
147 }
148
149 current_turn = Some(TurnBuilder::new(
150 event.id,
151 event.timestamp,
152 UserMessage {
153 event_id: event.id,
154 content: user.clone(),
155 },
156 ));
157 }
158 _ => {
159 if let Some(ref mut builder) = current_turn {
160 builder.add_event(event);
161 }
162 }
163 }
164 }
165
166 if let Some(builder) = current_turn
167 && let Some(turn) = builder.build()
168 {
169 turns.push(turn);
170 }
171
172 turns
173}