use super::helpers::*;
use super::model::*;
use crate::types::*;
use std::collections::HashMap;
use std::sync::Arc;
/// Shared `Send + Sync` callback that receives a `&Session` and returns a `bool`.
///
/// Used as the type of `SessionRecorderConfig::before_task`.
pub type BeforeTaskFn = Arc<dyn Fn(&Session) -> bool + Send + Sync>;
/// Shared `Send + Sync` callback that receives a `&Session` and returns nothing.
///
/// Used as the type of `SessionRecorderConfig::after_task`.
pub type AfterTaskFn = Arc<dyn Fn(&Session) + Send + Sync>;
/// Options controlling what a `SessionRecorder` records and which hooks it calls.
///
/// The derived `Default` disables streaming events and sets no hooks.
#[derive(Clone, Default)]
pub struct SessionRecorderConfig {
/// When `true`, `AgentEvent::MessageUpdate` events are appended to the
/// owning loop's event log; when `false` they are not logged.
pub include_streaming_events: bool,
/// Optional hook called by `SessionRecorder::on_event` when an `AgentStart`
/// event creates a session id that was not already open.
/// NOTE(review): the recorder currently discards the returned `bool`.
pub before_task: Option<BeforeTaskFn>,
/// Optional hook called by `SessionRecorder::flush` for each open session
/// as it is moved to the completed list.
pub after_task: Option<AfterTaskFn>,
}
impl std::fmt::Debug for SessionRecorderConfig {
    /// Formats the config for diagnostics.
    ///
    /// The hook closures cannot be printed, so each one is rendered as
    /// `Some("...")` when set and `None` when absent.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let before_task: Option<&str> = match self.before_task {
            Some(_) => Some("..."),
            None => None,
        };
        let after_task: Option<&str> = match self.after_task {
            Some(_) => Some("..."),
            None => None,
        };

        let mut out = f.debug_struct("SessionRecorderConfig");
        out.field("include_streaming_events", &self.include_streaming_events);
        out.field("before_task", &before_task);
        out.field("after_task", &after_task);
        out.finish()
    }
}
/// Parallel-loop group remembered between `ParallelLoopStart` and
/// `ParallelLoopEnd`.
struct PartialParallelGroup {
/// Every loop id listed in the `ParallelLoopStart` event that created this entry.
all_loop_ids: Vec<String>,
}
/// State of a turn that has started (`TurnStart`) but not yet ended (`TurnEnd`).
struct PartialTurn {
/// Loop id plus turn index taken from the `TurnStart` event.
turn_id: TurnId,
/// Trigger copied from the `TurnStart` event.
triggered_by: TurnTrigger,
/// Timestamp of the `TurnStart` event.
started_at: chrono::DateTime<chrono::Utc>,
/// Non-assistant messages seen in `MessageEnd` events while the turn was pending.
input_messages: Vec<AgentMessage>,
}
/// Builds `Session` records incrementally from a stream of `AgentEvent`s.
pub struct SessionRecorder {
/// Recording options and hooks.
config: SessionRecorderConfig,
/// Sessions moved out of `open_sessions` by `flush` or `checkpoint`, kept
/// until `drain_completed` takes them.
completed: Vec<Session>,
/// Sessions still being recorded, keyed by session id.
open_sessions: HashMap<String, Session>,
/// Loops that have seen `AgentStart` but not `AgentEnd`, keyed by loop id.
open_loops: HashMap<String, OpenLoop>,
/// Pending parallel groups, keyed by each member loop id.
partial_groups: HashMap<String, PartialParallelGroup>,
/// Turns in progress, keyed by loop id (at most one per loop).
partial_turns: HashMap<String, PartialTurn>,
}
impl SessionRecorder {
pub fn new(config: SessionRecorderConfig) -> Self {
SessionRecorder {
config,
completed: Vec::new(),
open_sessions: HashMap::new(),
open_loops: HashMap::new(),
partial_groups: HashMap::new(),
partial_turns: HashMap::new(),
}
}
/// Feeds one `AgentEvent` into the recorder and updates the in-progress
/// session, loop, turn, and parallel-group state accordingly.
///
/// Most events are also appended to the event log of the loop they belong
/// to. An event whose loop id has no entry in `open_loops` is not logged
/// anywhere, because `append_event` ignores unknown loops. `MessageUpdate`
/// events are logged only when `config.include_streaming_events` is set.
pub fn on_event(&mut self, event: AgentEvent) {
match &event {
// Remember the full member list under every member's loop id, so the
// group can later be found from whichever loop `ParallelLoopEnd` selects.
// An existing entry for a loop id is left untouched.
AgentEvent::ParallelLoopStart { loop_ids, .. } => {
for lid in loop_ids {
let group_key = lid.clone();
self.partial_groups
.entry(group_key)
.or_insert_with(|| PartialParallelGroup {
all_loop_ids: loop_ids.clone(),
});
}
}
// Open the session if it is not already open, then open a new loop record.
AgentEvent::AgentStart {
agent_id,
session_id,
loop_id,
parent_loop_id,
continuation_kind,
timestamp,
metadata,
config_snapshot,
} => {
let now = *timestamp;
// Must be computed before `entry()` below inserts the session.
let is_new_session = !self.open_sessions.contains_key(session_id);
let session = self
.open_sessions
.entry(session_id.clone())
.or_insert_with(|| Session {
session_id: session_id.clone(),
agent_id: agent_id.clone(),
created_at: now,
last_active_at: now,
formation: SessionFormation::FirstLoop { timestamp: now },
parent_spawn_ref: None,
scope: SessionScope::Ephemeral,
loops: Vec::new(),
});
session.last_active_at = now;
// The hook runs only when this event created the session, and it runs
// before `parent_spawn_ref` is filled in below.
// NOTE(review): the hook's `bool` result is discarded here — confirm
// whether it was meant to gate recording.
if is_new_session {
if let Some(ref hook) = self.config.before_task {
hook(session);
}
}
// A parent loop that maps to a different session id marks this session as
// spawned. The tool call details are not known yet, so they are left empty
// here and filled in by the `ToolExecutionEnd` arm.
if let Some(ref plid) = parent_loop_id {
let parent_session_id = session_id_from_loop_id(plid);
if parent_session_id != *session_id && session.parent_spawn_ref.is_none() {
session.parent_spawn_ref = Some(SpawnRef {
parent_session_id,
parent_loop_id: plid.clone(),
tool_call_id: String::new(), tool_name: String::new(),
});
}
}
let record = LoopRecord {
loop_id: loop_id.clone(),
session_id: session_id.clone(),
agent_id: agent_id.clone(),
parent_loop_id: parent_loop_id.clone(),
continuation_kind: continuation_kind.clone(),
started_at: now,
ended_at: None,
status: LoopStatus::Running,
rejection: None,
config: config_snapshot.clone(),
messages: Vec::new(),
turns: Vec::new(),
usage: Usage::default(),
metadata: metadata.clone(),
events: Vec::new(),
children_loop_ids: Vec::new(),
child_loop_refs: Vec::new(),
parallel_group: None,
compaction_block: None,
};
let open = OpenLoop {
record,
next_seq: 0,
};
// Insert first, then log: `append_event` only logs to loops present in
// `open_loops`, so the start event becomes sequence 0 of this loop.
// An already-open loop with the same id is replaced by `insert`.
self.open_loops.insert(loop_id.clone(), open);
self.append_event(loop_id, event.clone());
}
// Close the loop and move its record into the owning session.
AgentEvent::AgentEnd {
loop_id,
messages,
usage,
timestamp,
rejection,
} => {
// Log while the loop is still open; it is removed from `open_loops` below.
self.append_event(loop_id, event.clone());
// Discard any turn that started but never received a `TurnEnd`.
self.partial_turns.remove(loop_id.as_str());
if let Some(mut open) = self.open_loops.remove(loop_id) {
open.record.ended_at = Some(*timestamp);
open.record.status = if rejection.is_some() {
LoopStatus::Rejected
} else {
LoopStatus::Completed
};
open.record.rejection = rejection.clone();
open.record.messages = messages.clone();
open.record.usage = usage.clone();
// If no config snapshot was captured earlier, ask
// `extract_config_snapshot` to derive one from the final messages.
if open.record.config.is_none() {
open.record.config = extract_config_snapshot(messages, loop_id);
}
let session_id = open.record.session_id.clone();
let parent_loop_id = open.record.parent_loop_id.clone();
// Register this loop as a child of its parent. The parent is looked up
// first among the finished loops of this loop's session, then among the
// still-open loops (where it must belong to the same session).
if let Some(ref plid) = parent_loop_id {
let parent_in_session = self
.open_sessions
.get(&session_id)
.map(|s| s.loops.iter().any(|l| &l.loop_id == plid))
.unwrap_or(false);
let parent_in_open = self.open_loops.contains_key(plid.as_str());
if parent_in_session {
if let Some(s) = self.open_sessions.get_mut(&session_id) {
if let Some(p) = s.loops.iter_mut().find(|l| &l.loop_id == plid) {
if !p.children_loop_ids.contains(loop_id) {
p.children_loop_ids.push(loop_id.clone());
}
}
}
} else if parent_in_open {
if let Some(p) = self.open_loops.get_mut(plid.as_str()) {
if p.record.session_id == session_id
&& !p.record.children_loop_ids.contains(loop_id)
{
p.record.children_loop_ids.push(loop_id.clone());
}
}
}
}
// If the session is not in `open_sessions`, the record is dropped.
if let Some(session) = self.open_sessions.get_mut(&session_id) {
session.loops.push(open.record);
}
}
}
// Begin accumulating a turn for this loop. `insert` replaces any turn that
// was still pending for the same loop.
AgentEvent::TurnStart {
loop_id,
turn_index,
timestamp,
triggered_by,
} => {
self.partial_turns.insert(
loop_id.clone(),
PartialTurn {
turn_id: TurnId {
loop_id: loop_id.clone(),
turn_index: *turn_index,
},
triggered_by: triggered_by.clone(),
started_at: *timestamp,
input_messages: Vec::new(),
},
);
self.append_event(loop_id, event.clone());
}
// Messages whose role is not "assistant" are collected as inputs of the
// turn in progress (if any); every `MessageEnd` is logged.
AgentEvent::MessageEnd {
loop_id, message, ..
} => {
if message.role() != "assistant" {
if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
partial.input_messages.push(message.clone());
}
}
self.append_event(loop_id, event.clone());
}
// Finalize the pending turn for this loop and attach it to the open loop.
AgentEvent::TurnEnd {
loop_id,
message,
usage,
timestamp,
tool_results,
} => {
self.append_event(loop_id, event.clone());
// Without a pending turn (no matching `TurnStart`), no `Turn` is recorded.
if let Some(partial) = self.partial_turns.remove(loop_id.as_str()) {
// Each tool result is converted to an `AgentMessage` tagged with this turn's id.
let tid = Some(partial.turn_id.clone());
let turn = Turn {
turn_id: partial.turn_id,
triggered_by: partial.triggered_by,
usage: usage.clone(),
input_messages: partial.input_messages,
output_message: message.clone(),
tool_results: tool_results
.iter()
.map(|m| AgentMessage::from(m.clone()).with_turn_id(tid.clone()))
.collect(),
started_at: partial.started_at,
ended_at: *timestamp,
};
if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
open.record.turns.push(turn);
}
}
// If the loop still has no config snapshot, try to derive one from this
// turn's message alone.
if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
if open.record.config.is_none() {
open.record.config =
extract_config_snapshot(std::slice::from_ref(message), loop_id);
}
}
}
// If the tool result names a child loop, record the link on the parent loop
// and complete the child session's spawn reference.
AgentEvent::ToolExecutionEnd {
loop_id,
tool_call_id,
tool_name,
result,
..
} => {
self.append_event(loop_id, event.clone());
if let Some(child_lid) = &result.child_loop_id {
if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
let child_session_id = session_id_from_loop_id(child_lid);
open.record.child_loop_refs.push(ChildLoopRef {
tool_call_id: tool_call_id.clone(),
tool_name: tool_name.clone(),
child_loop_id: child_lid.clone(),
child_session_id: child_session_id.clone(),
});
let parent_session_id = open.record.session_id.clone();
let parent_lid = loop_id.clone();
let tc_id = tool_call_id.clone();
let tn = tool_name.clone();
let csl = child_session_id.clone();
// Fills in the spawn reference only when one exists and its
// `tool_call_id` is still empty (as created by the `AgentStart` arm).
// The closure moves its captures out, so it can run at most once;
// the two call sites below are mutually exclusive.
let enrich = move |session: &mut Session| {
if let Some(ref mut sr) = session.parent_spawn_ref {
if sr.tool_call_id.is_empty() {
sr.parent_session_id = parent_session_id;
sr.parent_loop_id = parent_lid;
sr.tool_call_id = tc_id;
sr.tool_name = tn;
}
}
};
// The child session may still be open or may already be completed.
if let Some(child_sess) = self.open_sessions.get_mut(&csl) {
enrich(child_sess);
} else if let Some(child_sess) =
self.completed.iter_mut().find(|s| s.session_id == csl)
{
enrich(child_sess);
}
}
}
}
// Stamp every member loop of the group with the selection outcome.
AgentEvent::ParallelLoopEnd {
selected_loop_id,
selected_config_index,
evaluation_usage,
..
} => {
// If no `ParallelLoopStart` was recorded for the selected loop, treat it
// as a group containing only itself.
let all_loop_ids = self
.partial_groups
.get(selected_loop_id.as_str())
.map(|pg| pg.all_loop_ids.clone())
.unwrap_or_else(|| vec![selected_loop_id.clone()]);
// Template record; `is_selected` is overridden per loop below.
let group = ParallelGroupRecord {
all_loop_ids: all_loop_ids.clone(),
selected_loop_id: selected_loop_id.clone(),
selected_config_index: *selected_config_index,
evaluation_usage: evaluation_usage.clone(),
is_selected: false, };
for lid in &all_loop_ids {
let is_selected = lid == selected_loop_id;
let pg = ParallelGroupRecord {
is_selected,
..group.clone()
};
// A member loop may still be open, may be finished inside an open
// session, or may sit in a completed session; update it wherever found.
if let Some(open) = self.open_loops.get_mut(lid.as_str()) {
open.record.parallel_group = Some(pg.clone());
}
for session in self.open_sessions.values_mut() {
if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
lr.parallel_group = Some(pg.clone());
}
}
for session in self.completed.iter_mut() {
if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
lr.parallel_group = Some(pg.clone());
}
}
}
// The group is resolved; forget the pending entries of all its members.
for lid in &all_loop_ids {
self.partial_groups.remove(lid.as_str());
}
}
// Streaming updates are logged only when explicitly enabled in the config.
AgentEvent::MessageUpdate { loop_id, .. } => {
if self.config.include_streaming_events {
self.append_event(loop_id, event.clone());
}
}
// Any other event is logged against its loop when `loop_id_of` yields one.
other => {
if let Some(lid) = loop_id_of(other) {
self.append_event(lid, event.clone());
}
}
}
}
/// Closes everything that is still in progress.
///
/// * All pending turns are discarded.
/// * Every open loop is marked `LoopStatus::Aborted` and appended to its
///   session's `loops` (or dropped if that session is not open). `ended_at`
///   is left unchanged, so no end timestamp is recorded for aborted loops.
/// * Every open session is passed to the `after_task` hook (if configured)
///   and then moved to the completed list.
///
/// Loops and sessions are processed in the maps' iteration order, which is
/// unspecified.
pub fn flush(&mut self) {
    self.partial_turns.clear();

    // `drain` empties the map in a single pass; there is no need to clone all
    // keys up front and look each entry up again to remove it.
    for (_, mut open) in self.open_loops.drain() {
        open.record.status = LoopStatus::Aborted;
        if let Some(session) = self.open_sessions.get_mut(&open.record.session_id) {
            session.loops.push(open.record);
        }
    }

    // Look the hook up once rather than once per session.
    let after_task = self.config.after_task.as_ref();
    for (_, session) in self.open_sessions.drain() {
        if let Some(hook) = after_task {
            hook(&session);
        }
        self.completed.push(session);
    }
}
/// Moves every open session that currently has no open loop into the
/// completed list and returns how many sessions were moved.
///
/// Sessions that still own at least one open loop stay open.
///
/// NOTE(review): unlike `flush`, this does not invoke the `after_task` hook
/// for the sessions it moves — confirm that this is intended.
pub fn checkpoint(&mut self) -> usize {
    // Borrowed set of session ids that still have a running loop. A set keeps
    // each membership test O(1) instead of scanning a Vec per session, and
    // avoids cloning the ids.
    let busy: std::collections::HashSet<&str> = self
        .open_loops
        .values()
        .map(|open| open.record.session_id.as_str())
        .collect();

    // Collect owned ids first so the immutable borrows end before mutating.
    let promotable: Vec<String> = self
        .open_sessions
        .keys()
        .filter(|sid| !busy.contains(sid.as_str()))
        .cloned()
        .collect();

    let count = promotable.len();
    for sid in promotable {
        if let Some(session) = self.open_sessions.remove(&sid) {
            self.completed.push(session);
        }
    }
    count
}
/// Hands all completed sessions to the caller and leaves the recorder's
/// completed list empty.
pub fn drain_completed(&mut self) -> Vec<Session> {
    let mut drained = Vec::new();
    // After the swap `drained` holds the sessions and `self.completed` is empty.
    std::mem::swap(&mut drained, &mut self.completed);
    drained
}
/// Iterates over all sessions known to the recorder: completed sessions
/// first, followed by the sessions that are still open.
pub fn sessions(&self) -> impl Iterator<Item = &Session> {
    let finished = self.completed.iter();
    let in_progress = self.open_sessions.values();
    finished.chain(in_progress)
}
/// Looks up a session by id.
///
/// Completed sessions are searched first (first match wins); if none
/// matches, the open sessions are consulted. Returns `None` when the id is
/// unknown.
pub fn get_session(&self, session_id: &str) -> Option<&Session> {
    for session in &self.completed {
        if session.session_id == session_id {
            return Some(session);
        }
    }
    self.open_sessions.get(session_id)
}
/// Returns the record of the still-open loop with the given id, or `None`
/// if no such loop is currently open.
pub fn current_loop(&self, loop_id: &str) -> Option<&LoopRecord> {
    match self.open_loops.get(loop_id) {
        Some(open) => Some(&open.record),
        None => None,
    }
}
/// Appends `event` to the event log of the open loop `loop_id`, tagging it
/// with the loop's next sequence number.
///
/// Does nothing when the loop is not currently open.
fn append_event(&mut self, loop_id: &str, event: AgentEvent) {
    let slot = match self.open_loops.get_mut(loop_id) {
        Some(slot) => slot,
        None => return,
    };
    // Reserve the current sequence number, then advance the counter.
    let sequence = slot.next_seq;
    slot.next_seq = sequence + 1;
    slot.record.events.push(LoopEvent { sequence, event });
}
}