agtrace_engine/session/
assembler.rs1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{AgentEvent, EventPayload, SpawnContext, StreamId};
7
8pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
14 if events.is_empty() {
15 return Vec::new();
16 }
17
18 let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
20 for event in events {
21 streams
22 .entry(event.stream_id.clone())
23 .or_default()
24 .push(event.clone());
25 }
26
27 let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
29 let spawn_map = build_spawn_context_map(&main_events);
30
31 streams
33 .into_iter()
34 .filter_map(|(stream_id, stream_events)| {
35 let spawned_by = match &stream_id {
36 StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
37 _ => None,
38 };
39 assemble_session_for_stream(&stream_events, stream_id, spawned_by)
40 })
41 .collect()
42}
43
44fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
46 let mut spawn_map = HashMap::new();
47
48 let turns = build_turns(events);
50
51 for (turn_idx, turn) in turns.iter().enumerate() {
52 for (step_idx, step) in turn.steps.iter().enumerate() {
53 for tool in &step.tools {
54 if let Some(ref result) = tool.result
55 && let Some(ref agent_id) = result.content.agent_id
56 {
57 spawn_map.insert(
58 agent_id.clone(),
59 SpawnContext {
60 turn_index: turn_idx,
61 step_index: step_idx,
62 },
63 );
64 }
65 }
66 }
67 }
68
69 spawn_map
70}
71
72pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
77 if events.is_empty() {
78 return None;
79 }
80
81 let main_events: Vec<_> = events
83 .iter()
84 .filter(|e| matches!(e.stream_id, StreamId::Main))
85 .cloned()
86 .collect();
87
88 if main_events.is_empty() {
89 return None;
90 }
91
92 assemble_session_for_stream(&main_events, StreamId::Main, None)
93}
94
95fn assemble_session_for_stream(
97 events: &[AgentEvent],
98 stream_id: StreamId,
99 spawned_by: Option<SpawnContext>,
100) -> Option<AgentSession> {
101 if events.is_empty() {
102 return None;
103 }
104
105 let session_id = events.first()?.session_id;
106 let start_time = events.first()?.timestamp;
107 let end_time = events.last().map(|e| e.timestamp);
108
109 let turns = build_turns(events);
110 let stats = calculate_session_stats(&turns, start_time, end_time);
111
112 Some(AgentSession {
113 session_id,
114 stream_id,
115 spawned_by,
116 start_time,
117 end_time,
118 turns,
119 stats,
120 })
121}
122
123fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
124 let mut turns = Vec::new();
125 let mut current_turn: Option<TurnBuilder> = None;
126
127 for event in events {
128 match &event.payload {
129 EventPayload::User(user) => {
130 if let Some(builder) = current_turn.take()
131 && let Some(turn) = builder.build()
132 {
133 turns.push(turn);
134 }
135
136 current_turn = Some(TurnBuilder::new(
137 event.id,
138 event.timestamp,
139 UserMessage {
140 event_id: event.id,
141 content: user.clone(),
142 },
143 ));
144 }
145 _ => {
146 if let Some(ref mut builder) = current_turn {
147 builder.add_event(event);
148 }
149 }
150 }
151 }
152
153 if let Some(builder) = current_turn
154 && let Some(turn) = builder.build()
155 {
156 turns.push(turn);
157 }
158
159 turns
160}