agtrace_engine/session/
assembler.rs

1use std::collections::HashMap;
2
3use super::stats::calculate_session_stats;
4use super::turn_builder::TurnBuilder;
5use super::types::*;
6use agtrace_types::{AgentEvent, EventPayload, SpawnContext, StreamId};
7
8/// Assemble all streams from events into separate sessions.
9///
10/// Returns a Vec of AgentSession, one per distinct StreamId found in the events.
11/// Each session contains only events from its respective stream.
12/// For sidechain sessions, attempts to link back to the parent turn/step via spawned_by.
13pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
14    if events.is_empty() {
15        return Vec::new();
16    }
17
18    // Group events by stream_id
19    let mut streams: HashMap<StreamId, Vec<AgentEvent>> = HashMap::new();
20    for event in events {
21        streams
22            .entry(event.stream_id.clone())
23            .or_default()
24            .push(event.clone());
25    }
26
27    // First, assemble the main session to build spawn context map
28    let main_events = streams.get(&StreamId::Main).cloned().unwrap_or_default();
29    let spawn_map = build_spawn_context_map(&main_events);
30
31    // Assemble each stream into a session
32    streams
33        .into_iter()
34        .filter_map(|(stream_id, stream_events)| {
35            let spawned_by = match &stream_id {
36                StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
37                _ => None,
38            };
39            assemble_session_for_stream(&stream_events, stream_id, spawned_by)
40        })
41        .collect()
42}
43
44/// Build a map from agent_id to SpawnContext by scanning ToolResult events with agent_id.
45fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
46    let mut spawn_map = HashMap::new();
47
48    // Build turns first to get proper indices
49    let turns = build_turns(events);
50
51    for (turn_idx, turn) in turns.iter().enumerate() {
52        for (step_idx, step) in turn.steps.iter().enumerate() {
53            for tool in &step.tools {
54                if let Some(ref result) = tool.result
55                    && let Some(ref agent_id) = result.content.agent_id
56                {
57                    spawn_map.insert(
58                        agent_id.clone(),
59                        SpawnContext {
60                            turn_index: turn_idx,
61                            step_index: step_idx,
62                        },
63                    );
64                }
65            }
66        }
67    }
68
69    spawn_map
70}
71
72/// Assemble the main stream from events into a session.
73///
74/// This is the backward-compatible function that filters to Main stream only.
75/// For multi-stream support, use `assemble_sessions()` instead.
76pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
77    if events.is_empty() {
78        return None;
79    }
80
81    // Filter to main stream events only
82    let main_events: Vec<_> = events
83        .iter()
84        .filter(|e| matches!(e.stream_id, StreamId::Main))
85        .cloned()
86        .collect();
87
88    if main_events.is_empty() {
89        return None;
90    }
91
92    assemble_session_for_stream(&main_events, StreamId::Main, None)
93}
94
95/// Internal: Assemble a session from events belonging to a single stream.
96fn assemble_session_for_stream(
97    events: &[AgentEvent],
98    stream_id: StreamId,
99    spawned_by: Option<SpawnContext>,
100) -> Option<AgentSession> {
101    if events.is_empty() {
102        return None;
103    }
104
105    let session_id = events.first()?.session_id;
106    let start_time = events.first()?.timestamp;
107    let end_time = events.last().map(|e| e.timestamp);
108
109    let turns = build_turns(events);
110    let stats = calculate_session_stats(&turns, start_time, end_time);
111
112    Some(AgentSession {
113        session_id,
114        stream_id,
115        spawned_by,
116        start_time,
117        end_time,
118        turns,
119        stats,
120    })
121}
122
123fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
124    let mut turns = Vec::new();
125    let mut current_turn: Option<TurnBuilder> = None;
126
127    for event in events {
128        match &event.payload {
129            EventPayload::User(user) => {
130                if let Some(builder) = current_turn.take()
131                    && let Some(turn) = builder.build()
132                {
133                    turns.push(turn);
134                }
135
136                current_turn = Some(TurnBuilder::new(
137                    event.id,
138                    event.timestamp,
139                    UserMessage {
140                        event_id: event.id,
141                        content: user.clone(),
142                    },
143                ));
144            }
145            _ => {
146                if let Some(ref mut builder) = current_turn {
147                    builder.add_event(event);
148                }
149            }
150        }
151    }
152
153    if let Some(builder) = current_turn
154        && let Some(turn) = builder.build()
155    {
156        turns.push(turn);
157    }
158
159    turns
160}