phi_core/session/recorder.rs
1use super::helpers::*;
2use super::model::*;
3use crate::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7// ---------------------------------------------------------------------------
8// SessionRecorderConfig
9// ---------------------------------------------------------------------------
10
11// ── Session-level callback types (G2) ────────────────────────────────────
12
13/// Called when a new session is first created (first `AgentStart` with a new `session_id`).
14///
15/// Arguments: the newly created `Session` (header fields populated, no loops yet).
16/// Return `false` to reject the session (the recorder will still create it but mark it rejected).
17pub type BeforeTaskFn = Arc<dyn Fn(&Session) -> bool + Send + Sync>;
18
19/// Called when a session is finalized (via `flush()` or explicit close).
20///
21/// Arguments: the completed `Session` with all loops finalized.
22pub type AfterTaskFn = Arc<dyn Fn(&Session) + Send + Sync>;
23
24// ── SessionRecorderConfig ────────────────────────────────────────────────
25
26/// Configuration for [`SessionRecorder`].
27#[derive(Clone, Default)]
28pub struct SessionRecorderConfig {
29 /// Store `MessageUpdate` (streaming delta) events in [`LoopRecord::events`].
30 ///
31 /// Default: `false`. Streaming deltas are 100–1 000× more numerous than
32 /// final messages and are not needed for replay or branching. Enable only
33 /// for debugging or playback use cases.
34 pub include_streaming_events: bool,
35
36 /// Session-level callback: fires when a new session is first created (G2).
37 pub before_task: Option<BeforeTaskFn>,
38
39 /// Session-level callback: fires when a session is finalized (G2).
40 pub after_task: Option<AfterTaskFn>,
41}
42
43impl std::fmt::Debug for SessionRecorderConfig {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("SessionRecorderConfig")
46 .field("include_streaming_events", &self.include_streaming_events)
47 .field("before_task", &self.before_task.as_ref().map(|_| "..."))
48 .field("after_task", &self.after_task.as_ref().map(|_| "..."))
49 .finish()
50 }
51}
52
53// ---------------------------------------------------------------------------
54// SessionRecorder internals
55// ---------------------------------------------------------------------------
56
/// Bookkeeping for a parallel-evaluation group that has been announced by
/// `ParallelLoopStart` but not yet closed by `ParallelLoopEnd`.
///
/// The recorder stores one of these per branch loop id; each copy carries the
/// complete list of branch ids so the group can be recovered from whichever
/// branch `ParallelLoopEnd` reports as selected.
struct PartialParallelGroup {
    /// Loop ids of every branch in the group.
    all_loop_ids: Vec<String>,
}
62
63/// Partial turn state accumulated between `TurnStart` and `TurnEnd`.
64/// Finalized into a [`Turn`] when `TurnEnd` is received.
65struct PartialTurn {
66 turn_id: TurnId,
67 triggered_by: TurnTrigger,
68 started_at: chrono::DateTime<chrono::Utc>,
69 input_messages: Vec<AgentMessage>,
70}
71
72// ---------------------------------------------------------------------------
73// SessionRecorder
74// ---------------------------------------------------------------------------
75
76/// Records every [`AgentEvent`] into a structured tree of [`Session`]s and
77/// [`LoopRecord`]s.
78///
79/// Call [`on_event`][Self::on_event] for every event emitted on the agent's
80/// `tx` channel, then [`flush`][Self::flush] before shutdown or saving.
81///
82/// ## Session grouping
83///
84/// Sessions are keyed by `session_id`. Every `AgentStart` event that carries a
85/// `session_id` the recorder has not seen before opens a new [`Session`]; all
86/// subsequent loops with the same `session_id` are appended to that session.
87///
88/// **The recorder never rotates sessions on its own.** If you want a new session
89/// to start after a period of inactivity, call
90/// [`BasicAgent::check_and_rotate`][crate::BasicAgent::check_and_rotate] (or
91/// [`BasicAgent::new_session`][crate::BasicAgent::new_session]) before the next
92/// prompt. The next `AgentStart` will carry the new `session_id` and the recorder
93/// will open a fresh [`Session`] automatically, with
94/// [`SessionFormation::InactivityTimeout`] or [`SessionFormation::FirstLoop`]
95/// as the recorded reason.
96///
97/// ## Example
98///
99/// ```rust,no_run
100/// use phi_core::session::{SessionRecorder, SessionRecorderConfig};
101/// use phi_core::AgentEvent;
102///
103/// let mut recorder = SessionRecorder::new(SessionRecorderConfig::default());
104/// // Feed events as they arrive:
105/// // recorder.on_event(event);
106/// recorder.flush();
107/// ```
108pub struct SessionRecorder {
109 config: SessionRecorderConfig,
110
111 /// Completed sessions (all their loops are closed).
112 completed: Vec<Session>,
113
114 /// Sessions that still have open loops.
115 open_sessions: HashMap<String, Session>,
116
117 /// Loops currently executing (between AgentStart and AgentEnd).
118 open_loops: HashMap<String, OpenLoop>,
119
120 /// Parallel groups announced by ParallelLoopStart but not yet closed.
121 partial_groups: HashMap<String, PartialParallelGroup>,
122
123 /// Turns being accumulated between `TurnStart` and `TurnEnd`, keyed by `loop_id`.
124 partial_turns: HashMap<String, PartialTurn>,
125}
126
127impl SessionRecorder {
128 /// Create a new recorder with the given configuration.
129 pub fn new(config: SessionRecorderConfig) -> Self {
130 SessionRecorder {
131 config,
132 completed: Vec::new(),
133 open_sessions: HashMap::new(),
134 open_loops: HashMap::new(),
135 partial_groups: HashMap::new(),
136 partial_turns: HashMap::new(),
137 }
138 }
139
140 /// Feed one event into the recorder.
141 ///
142 /// Must be called for every event emitted on the agent's `tx` channel.
143 pub fn on_event(&mut self, event: AgentEvent) {
144 match &event {
145 // ── ParallelLoopStart ─────────────────────────────────────────
146 AgentEvent::ParallelLoopStart { loop_ids, .. } => {
147 for lid in loop_ids {
148 // Pre-register a Pending record; will be promoted to Running when AgentStart arrives.
149 let group_key = lid.clone();
150 self.partial_groups
151 .entry(group_key)
152 .or_insert_with(|| PartialParallelGroup {
153 all_loop_ids: loop_ids.clone(),
154 });
155 // We don't have agent_id / session_id yet — those arrive in AgentStart.
156 }
157 }
158
159 // ── AgentStart ────────────────────────────────────────────────
160 AgentEvent::AgentStart {
161 agent_id,
162 session_id,
163 loop_id,
164 parent_loop_id,
165 continuation_kind,
166 timestamp,
167 metadata,
168 config_snapshot,
169 } => {
170 // Ensure the session exists.
171 let now = *timestamp;
172 let is_new_session = !self.open_sessions.contains_key(session_id);
173 let session = self
174 .open_sessions
175 .entry(session_id.clone())
176 .or_insert_with(|| Session {
177 session_id: session_id.clone(),
178 agent_id: agent_id.clone(),
179 created_at: now,
180 last_active_at: now,
181 formation: SessionFormation::FirstLoop { timestamp: now },
182 parent_spawn_ref: None,
183 scope: SessionScope::Ephemeral,
184 loops: Vec::new(),
185 });
186 session.last_active_at = now;
187
188 // G2: fire before_task callback on new session creation
189 if is_new_session {
190 if let Some(ref hook) = self.config.before_task {
191 hook(session);
192 }
193 }
194
195 // If parent_loop_id is set and belongs to a DIFFERENT session, this is a
196 // sub-agent spawn — record the inbound SpawnRef on this session.
197 if let Some(ref plid) = parent_loop_id {
198 let parent_session_id = session_id_from_loop_id(plid);
199 if parent_session_id != *session_id && session.parent_spawn_ref.is_none() {
200 // We don't have the tool_call_id / tool_name here; those come from the parent's
201 // ToolExecutionEnd. Set what we know; callers can enrich later if needed.
202 session.parent_spawn_ref = Some(SpawnRef {
203 parent_session_id,
204 parent_loop_id: plid.clone(),
205 tool_call_id: String::new(), // enriched when ChildLoopRef is processed
206 tool_name: String::new(),
207 });
208 }
209 }
210
211 // Create the LoopRecord (Pending → Running).
212 let record = LoopRecord {
213 loop_id: loop_id.clone(),
214 session_id: session_id.clone(),
215 agent_id: agent_id.clone(),
216 parent_loop_id: parent_loop_id.clone(),
217 continuation_kind: continuation_kind.clone(),
218 started_at: now,
219 ended_at: None,
220 status: LoopStatus::Running,
221 rejection: None,
222 config: config_snapshot.clone(),
223 messages: Vec::new(),
224 turns: Vec::new(),
225 usage: Usage::default(),
226 metadata: metadata.clone(),
227 events: Vec::new(),
228 children_loop_ids: Vec::new(),
229 child_loop_refs: Vec::new(),
230 parallel_group: None,
231 compaction_block: None,
232 };
233 let open = OpenLoop {
234 record,
235 next_seq: 0,
236 };
237 self.open_loops.insert(loop_id.clone(), open);
238 // Append AgentStart to event stream.
239 self.append_event(loop_id, event.clone());
240 }
241
242 // ── AgentEnd ──────────────────────────────────────────────────
243 AgentEvent::AgentEnd {
244 loop_id,
245 messages,
246 usage,
247 timestamp,
248 rejection,
249 } => {
250 self.append_event(loop_id, event.clone());
251 // Discard any orphaned partial turn for this loop.
252 self.partial_turns.remove(loop_id.as_str());
253 if let Some(mut open) = self.open_loops.remove(loop_id) {
254 open.record.ended_at = Some(*timestamp);
255 open.record.status = if rejection.is_some() {
256 LoopStatus::Rejected
257 } else {
258 LoopStatus::Completed
259 };
260 open.record.rejection = rejection.clone();
261 open.record.messages = messages.clone();
262 open.record.usage = usage.clone();
263
264 // Extract config snapshot from first assistant message.
265 if open.record.config.is_none() {
266 open.record.config = extract_config_snapshot(messages, loop_id);
267 }
268
269 let session_id = open.record.session_id.clone();
270 let parent_loop_id = open.record.parent_loop_id.clone();
271
272 // Link parent → child within same session.
273 if let Some(ref plid) = parent_loop_id {
274 // Check if parent is in the same session.
275 let parent_in_session = self
276 .open_sessions
277 .get(&session_id)
278 .map(|s| s.loops.iter().any(|l| &l.loop_id == plid))
279 .unwrap_or(false);
280 let parent_in_open = self.open_loops.contains_key(plid.as_str());
281
282 if parent_in_session {
283 if let Some(s) = self.open_sessions.get_mut(&session_id) {
284 if let Some(p) = s.loops.iter_mut().find(|l| &l.loop_id == plid) {
285 if !p.children_loop_ids.contains(loop_id) {
286 p.children_loop_ids.push(loop_id.clone());
287 }
288 }
289 }
290 } else if parent_in_open {
291 if let Some(p) = self.open_loops.get_mut(plid.as_str()) {
292 // Only link same-session children. Cross-session sub-agent
293 // children are tracked via child_loop_refs / SpawnRef.
294 if p.record.session_id == session_id
295 && !p.record.children_loop_ids.contains(loop_id)
296 {
297 p.record.children_loop_ids.push(loop_id.clone());
298 }
299 }
300 }
301 }
302
303 // Move into session.
304 if let Some(session) = self.open_sessions.get_mut(&session_id) {
305 session.loops.push(open.record);
306 }
307 }
308 }
309
310 // ── TurnStart — begin accumulating a partial turn ────────────
311 AgentEvent::TurnStart {
312 loop_id,
313 turn_index,
314 timestamp,
315 triggered_by,
316 } => {
317 self.partial_turns.insert(
318 loop_id.clone(),
319 PartialTurn {
320 turn_id: TurnId {
321 loop_id: loop_id.clone(),
322 turn_index: *turn_index,
323 },
324 triggered_by: triggered_by.clone(),
325 started_at: *timestamp,
326 input_messages: Vec::new(),
327 },
328 );
329 self.append_event(loop_id, event.clone());
330 }
331
332 // ── MessageEnd — capture non-assistant messages as turn input ─
333 AgentEvent::MessageEnd {
334 loop_id, message, ..
335 } => {
336 if message.role() != "assistant" {
337 if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
338 partial.input_messages.push(message.clone());
339 }
340 }
341 self.append_event(loop_id, event.clone());
342 }
343
344 // ── TurnEnd — finalize turn + extract config snapshot ─────────
345 AgentEvent::TurnEnd {
346 loop_id,
347 message,
348 usage,
349 timestamp,
350 tool_results,
351 } => {
352 self.append_event(loop_id, event.clone());
353
354 // Finalize the partial turn into a materialized Turn.
355 if let Some(partial) = self.partial_turns.remove(loop_id.as_str()) {
356 let tid = Some(partial.turn_id.clone());
357 let turn = Turn {
358 turn_id: partial.turn_id,
359 triggered_by: partial.triggered_by,
360 usage: usage.clone(),
361 input_messages: partial.input_messages,
362 output_message: message.clone(),
363 tool_results: tool_results
364 .iter()
365 .map(|m| AgentMessage::from(m.clone()).with_turn_id(tid.clone()))
366 .collect(),
367 started_at: partial.started_at,
368 ended_at: *timestamp,
369 };
370 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
371 open.record.turns.push(turn);
372 }
373 }
374
375 // Extract config snapshot from assistant message.
376 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
377 if open.record.config.is_none() {
378 open.record.config =
379 extract_config_snapshot(std::slice::from_ref(message), loop_id);
380 }
381 }
382 }
383
384 // ── ToolExecutionEnd — record child loop ref ──────────────────
385 AgentEvent::ToolExecutionEnd {
386 loop_id,
387 tool_call_id,
388 tool_name,
389 result,
390 // child_loop_id is also a top-level field on ToolExecutionEnd (mirrors
391 // result.child_loop_id for ergonomic pattern matching). We read from
392 // result.child_loop_id here so the ChildLoopRef is populated from the
393 // same authoritative source as ToolResult.
394 ..
395 } => {
396 self.append_event(loop_id, event.clone());
397 if let Some(child_lid) = &result.child_loop_id {
398 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
399 let child_session_id = session_id_from_loop_id(child_lid);
400 open.record.child_loop_refs.push(ChildLoopRef {
401 tool_call_id: tool_call_id.clone(),
402 tool_name: tool_name.clone(),
403 child_loop_id: child_lid.clone(),
404 child_session_id: child_session_id.clone(),
405 });
406
407 // Enrich child session's parent_spawn_ref with the tool details we now know.
408 // The child may still be in open_sessions (common case) or already in
409 // completed (if flush() was called between child AgentEnd and this event).
410 // We check both to avoid a silent enrichment skip.
411 let parent_session_id = open.record.session_id.clone();
412 let parent_lid = loop_id.clone();
413 let tc_id = tool_call_id.clone();
414 let tn = tool_name.clone();
415 let csl = child_session_id.clone();
416 let enrich = move |session: &mut Session| {
417 if let Some(ref mut sr) = session.parent_spawn_ref {
418 if sr.tool_call_id.is_empty() {
419 sr.parent_session_id = parent_session_id;
420 sr.parent_loop_id = parent_lid;
421 sr.tool_call_id = tc_id;
422 sr.tool_name = tn;
423 }
424 }
425 };
426 if let Some(child_sess) = self.open_sessions.get_mut(&csl) {
427 enrich(child_sess);
428 } else if let Some(child_sess) =
429 self.completed.iter_mut().find(|s| s.session_id == csl)
430 {
431 enrich(child_sess);
432 }
433 }
434 }
435 }
436
437 // ── ParallelLoopEnd ───────────────────────────────────────────
438 AgentEvent::ParallelLoopEnd {
439 selected_loop_id,
440 selected_config_index,
441 evaluation_usage,
442 ..
443 } => {
444 // Recover all_loop_ids from the partial_groups registered at ParallelLoopStart.
445 let all_loop_ids = self
446 .partial_groups
447 .get(selected_loop_id.as_str())
448 .map(|pg| pg.all_loop_ids.clone())
449 .unwrap_or_else(|| vec![selected_loop_id.clone()]);
450 let group = ParallelGroupRecord {
451 all_loop_ids: all_loop_ids.clone(),
452 selected_loop_id: selected_loop_id.clone(),
453 selected_config_index: *selected_config_index,
454 evaluation_usage: evaluation_usage.clone(),
455 is_selected: false, // will be set per-record below
456 };
457
458 // Retroactively set ParallelGroupRecord on all branch LoopRecords.
459 for lid in &all_loop_ids {
460 let is_selected = lid == selected_loop_id;
461 let pg = ParallelGroupRecord {
462 is_selected,
463 ..group.clone()
464 };
465
466 // Check open_loops first (loop may not be closed yet).
467 if let Some(open) = self.open_loops.get_mut(lid.as_str()) {
468 open.record.parallel_group = Some(pg.clone());
469 }
470
471 // Also retroactively update already-closed loops in sessions.
472 for session in self.open_sessions.values_mut() {
473 if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
474 lr.parallel_group = Some(pg.clone());
475 }
476 }
477 for session in self.completed.iter_mut() {
478 if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
479 lr.parallel_group = Some(pg.clone());
480 }
481 }
482 }
483
484 // Clean up partial group entries.
485 for lid in &all_loop_ids {
486 self.partial_groups.remove(lid.as_str());
487 }
488 }
489
490 // ── MessageUpdate — optional streaming events ─────────────────
491 AgentEvent::MessageUpdate { loop_id, .. } => {
492 if self.config.include_streaming_events {
493 self.append_event(loop_id, event.clone());
494 }
495 }
496
497 // ── All other events — append to loop stream ──────────────────
498 other => {
499 if let Some(lid) = loop_id_of(other) {
500 self.append_event(lid, event.clone());
501 }
502 }
503 }
504 }
505
506 /// Finalize all open [`LoopRecord`]s (status → [`LoopStatus::Aborted`]) and
507 /// move them into their sessions.
508 ///
509 /// Call before saving or on process shutdown.
510 pub fn flush(&mut self) {
511 // Discard orphaned partial turns (TurnStart received but no TurnEnd).
512 self.partial_turns.clear();
513
514 let loop_ids: Vec<String> = self.open_loops.keys().cloned().collect();
515 for lid in loop_ids {
516 if let Some(mut open) = self.open_loops.remove(&lid) {
517 open.record.status = LoopStatus::Aborted;
518 let session_id = open.record.session_id.clone();
519 if let Some(session) = self.open_sessions.get_mut(&session_id) {
520 session.loops.push(open.record);
521 }
522 }
523 }
524 // Move fully-closed sessions from open_sessions to completed.
525 let session_ids: Vec<String> = self.open_sessions.keys().cloned().collect();
526 for sid in session_ids {
527 // A session is "complete" when all its loops have ended.
528 // Since we just flushed all open loops, every session is complete.
529 if let Some(session) = self.open_sessions.remove(&sid) {
530 // G2: fire after_task callback on session finalization
531 if let Some(ref hook) = self.config.after_task {
532 hook(&session);
533 }
534 self.completed.push(session);
535 }
536 }
537 }
538
539 /// Promote sessions that have no remaining open loops to the completed list,
540 /// without aborting any running loops.
541 ///
542 /// A session is eligible when every loop belonging to it has already received
543 /// an [`AgentEnd`][crate::AgentEvent::AgentEnd] event (i.e. it has no entry in
544 /// the internal open-loops map). Sessions that still have active loops are left
545 /// in place.
546 ///
547 /// This is intended for **periodic checkpointing** in production: save finished
548 /// sessions to disk while leaving in-flight agent runs untouched. In contrast,
549 /// [`flush`][Self::flush] first aborts all open loops and then promotes
550 /// everything.
551 ///
552 /// Returns the number of sessions that were promoted.
553 pub fn checkpoint(&mut self) -> usize {
554 // Collect session_ids that still have open loops.
555 let sessions_with_open_loops: Vec<String> = self
556 .open_loops
557 .values()
558 .map(|l| l.record.session_id.clone())
559 .collect();
560 // Promote sessions whose id is not in that set.
561 let promotable: Vec<String> = self
562 .open_sessions
563 .keys()
564 .filter(|sid| !sessions_with_open_loops.contains(sid))
565 .cloned()
566 .collect();
567 let count = promotable.len();
568 for sid in promotable {
569 if let Some(session) = self.open_sessions.remove(&sid) {
570 self.completed.push(session);
571 }
572 }
573 count
574 }
575
576 /// Drain all completed sessions out of the recorder (consuming them).
577 ///
578 /// Useful for periodic checkpointing. Call [`flush`][Self::flush] first
579 /// if you want to include in-progress sessions, or [`checkpoint`][Self::checkpoint]
580 /// to drain only fully-finished sessions without aborting active loops.
581 pub fn drain_completed(&mut self) -> Vec<Session> {
582 std::mem::take(&mut self.completed)
583 }
584
585 /// All sessions known to this recorder (completed and in-progress).
586 pub fn sessions(&self) -> impl Iterator<Item = &Session> {
587 self.completed.iter().chain(self.open_sessions.values())
588 }
589
590 /// Look up a session by `session_id`.
591 pub fn get_session(&self, session_id: &str) -> Option<&Session> {
592 self.completed
593 .iter()
594 .find(|s| s.session_id == session_id)
595 .or_else(|| self.open_sessions.get(session_id))
596 }
597
598 /// Look up an in-progress [`LoopRecord`] by `loop_id`.
599 pub fn current_loop(&self, loop_id: &str) -> Option<&LoopRecord> {
600 self.open_loops.get(loop_id).map(|o| &o.record)
601 }
602
603 // ── Private helpers ───────────────────────────────────────────────────
604
605 fn append_event(&mut self, loop_id: &str, event: AgentEvent) {
606 if let Some(open) = self.open_loops.get_mut(loop_id) {
607 let seq = open.next_seq;
608 open.next_seq += 1;
609 open.record.events.push(LoopEvent {
610 sequence: seq,
611 event,
612 });
613 }
614 }
615}