use std::collections::HashMap;
use super::stats::calculate_session_stats;
use super::turn_builder::TurnBuilder;
use super::types::*;
use agtrace_types::{
AgentEvent, EventPayload, SpawnContext, StreamId, SystemGeneratedReason, TurnOrigin,
UserPayload,
};
/// Splits `events` by stream and assembles one [`AgentSession`] per stream.
///
/// Sidechain sessions are annotated with the [`SpawnContext`] found for their
/// `agent_id` in the main stream's tool results, when one exists. Streams for
/// which no session can be assembled are omitted.
///
/// # Returns
///
/// Sessions ordered with the main stream first, followed by the remaining
/// streams by ascending `start_time`. Streams with equal start times keep the
/// order in which they first appear in `events`, so the output is
/// deterministic for a given input. An empty input yields an empty vector.
pub fn assemble_sessions(events: &[AgentEvent]) -> Vec<AgentSession> {
    if events.is_empty() {
        return Vec::new();
    }

    // Group events per stream while remembering first-appearance order.
    // Iterating a `HashMap` directly would hand the stable sort below an
    // arbitrary starting order, making ties non-deterministic between runs.
    let mut slot_of: HashMap<StreamId, usize> = HashMap::new();
    let mut streams: Vec<(StreamId, Vec<AgentEvent>)> = Vec::new();
    for event in events {
        let slot = *slot_of.entry(event.stream_id.clone()).or_insert_with(|| {
            streams.push((event.stream_id.clone(), Vec::new()));
            streams.len() - 1
        });
        streams[slot].1.push(event.clone());
    }

    // Borrow the main stream's events in place instead of cloning the vector.
    // With no main stream the map is simply empty.
    let spawn_map = streams
        .iter()
        .find(|(stream_id, _)| matches!(stream_id, StreamId::Main))
        .map(|(_, main_events)| build_spawn_context_map(main_events))
        .unwrap_or_default();

    let mut sessions: Vec<AgentSession> = streams
        .into_iter()
        .filter_map(|(stream_id, stream_events)| {
            let spawned_by = match &stream_id {
                StreamId::Sidechain { agent_id } => spawn_map.get(agent_id).cloned(),
                _ => None,
            };
            assemble_session_for_stream(&stream_events, stream_id, spawned_by)
        })
        .collect();

    // `sort_by` is stable, so equal elements keep first-appearance order.
    sessions.sort_by(|a, b| {
        use std::cmp::Ordering;
        match (&a.stream_id, &b.stream_id) {
            (StreamId::Main, StreamId::Main) => Ordering::Equal,
            (StreamId::Main, _) => Ordering::Less,
            (_, StreamId::Main) => Ordering::Greater,
            _ => a.start_time.cmp(&b.start_time),
        }
    });

    sessions
}
fn build_spawn_context_map(events: &[AgentEvent]) -> HashMap<String, SpawnContext> {
let mut spawn_map = HashMap::new();
let turns = build_turns(events);
for (turn_idx, turn) in turns.iter().enumerate() {
for (step_idx, step) in turn.steps.iter().enumerate() {
for tool in &step.tools {
if let Some(ref result) = tool.result
&& let Some(ref agent_id) = result.content.agent_id
{
spawn_map.insert(
agent_id.clone(),
SpawnContext {
turn_index: turn_idx,
step_index: step_idx,
},
);
}
}
}
}
spawn_map
}
/// Assembles a single session from the main-stream events only.
///
/// Events belonging to any other stream are ignored. Returns `None` when
/// `events` contains no main-stream event (which includes the empty input),
/// or when the session cannot be assembled from those events.
pub fn assemble_session(events: &[AgentEvent]) -> Option<AgentSession> {
    let main_stream: Vec<AgentEvent> = events
        .iter()
        .filter(|event| matches!(event.stream_id, StreamId::Main))
        .cloned()
        .collect();

    if main_stream.is_empty() {
        None
    } else {
        assemble_session_for_stream(&main_stream, StreamId::Main, None)
    }
}
/// Builds the [`AgentSession`] for one stream's events.
///
/// The session id and start time are taken from the first event, the end time
/// from the last event. Turns come from `build_turns` and statistics from
/// `calculate_session_stats`. `stream_id` and `spawned_by` are stored on the
/// session unchanged.
///
/// Returns `None` when `events` is empty.
fn assemble_session_for_stream(
    events: &[AgentEvent],
    stream_id: StreamId,
    spawned_by: Option<SpawnContext>,
) -> Option<AgentSession> {
    let [first, ..] = events else {
        return None;
    };

    let session_id = first.session_id;
    let start_time = first.timestamp;
    let end_time = events.last().map(|last| last.timestamp);

    let turns = build_turns(events);
    let stats = calculate_session_stats(&turns, start_time, end_time);

    Some(AgentSession {
        session_id,
        stream_id,
        spawned_by,
        start_time,
        end_time,
        turns,
        stats,
    })
}
/// Classifies where a user message came from based on its text.
///
/// A message whose text begins with the continuation prefix is reported as
/// system-generated with reason `ContextCompaction`; every other message is
/// attributed to the user.
fn detect_turn_origin(content: &UserPayload) -> TurnOrigin {
    const CONTINUATION_PREFIX: &str = "This session is being continued";

    if content.text.starts_with(CONTINUATION_PREFIX) {
        TurnOrigin::SystemGenerated {
            reason: SystemGeneratedReason::ContextCompaction,
        }
    } else {
        TurnOrigin::User
    }
}
fn build_turns(events: &[AgentEvent]) -> Vec<AgentTurn> {
let mut turns = Vec::new();
let mut current_turn: Option<TurnBuilder> = None;
for event in events {
match &event.payload {
EventPayload::User(user) => {
if user.text.starts_with("[Request interrupted") {
if let Some(builder) = current_turn.take()
&& let Some(turn) = builder.build()
{
turns.push(turn);
}
continue;
}
if let Some(ref mut builder) = current_turn
&& builder.is_slash_command_pending()
{
builder.set_expanded_content(user.clone());
continue;
}
if let Some(builder) = current_turn.take()
&& let Some(turn) = builder.build()
{
turns.push(turn);
}
current_turn = Some(TurnBuilder::new(
event.id,
event.timestamp,
UserMessage {
event_id: event.id,
content: user.clone(),
slash_command: None,
origin: detect_turn_origin(user),
},
));
}
EventPayload::SlashCommand(cmd) => {
if let Some(builder) = current_turn.take()
&& let Some(turn) = builder.build()
{
turns.push(turn);
}
current_turn = Some(TurnBuilder::new_slash_command(
event.id,
event.timestamp,
cmd.clone(),
));
}
_ => {
if let Some(ref mut builder) = current_turn {
builder.add_event(event);
}
}
}
}
if let Some(builder) = current_turn
&& let Some(turn) = builder.build()
{
turns.push(turn);
}
turns
}