phi_core/session/recorder.rs
1use super::helpers::*;
2use super::model::*;
3use crate::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7// ---------------------------------------------------------------------------
8// SessionRecorderConfig
9// ---------------------------------------------------------------------------
10
11// ── Session-level callback types (G2) ────────────────────────────────────
12
/// Called when a new session is first created (first `AgentStart` with a new `session_id`).
///
/// Arguments: the newly created `Session` (header fields populated, no loops yet).
///
/// The `bool` return value is meant as an accept/reject signal (`false` = reject).
/// NOTE(review): the `AgentStart` handling visible in this file invokes the hook and
/// discards its result — the session is created either way and nothing on it is marked
/// as rejected. Confirm whether rejection is supposed to be recorded somewhere.
pub type BeforeTaskFn = Arc<dyn Fn(&Session) -> bool + Send + Sync>;
18
/// Called when a session is finalized.
///
/// Arguments: the `Session` being finalized, after its loops have been moved into it.
///
/// In this file the hook is invoked from `SessionRecorder::flush` only, once per session
/// that is moved to the completed list. `SessionRecorder::checkpoint` also moves sessions
/// to the completed list but does not invoke it.
pub type AfterTaskFn = Arc<dyn Fn(&Session) + Send + Sync>;
23
24// ── SessionRecorderConfig ────────────────────────────────────────────────
25
/// Configuration for [`SessionRecorder`].
///
/// The derived `Default` leaves both flags `false` and both callbacks unset.
#[derive(Clone, Default)]
pub struct SessionRecorderConfig {
    /// Store `MessageUpdate` (streaming delta) events in [`LoopRecord::events`].
    ///
    /// Default: `false`. Streaming deltas are 100–1 000× more numerous than
    /// final messages and are not needed for replay or branching. Enable only
    /// for debugging or playback use cases.
    pub include_streaming_events: bool,

    /// Capture [`AnnotatedRequestPayload`] from [`AgentEvent::TurnRequest`]
    /// onto each materialized [`Turn::request_payload`].
    ///
    /// Default: `false`. Each per-turn payload can be hundreds of KB (full
    /// system prompt + message vec); enable only when reconstructing the
    /// exact request the model received is required (debugging, golden-output
    /// replay, cost analysis). The `TurnRequest` event itself is still appended
    /// to the loop's event stream regardless of this flag — disabling it only
    /// suppresses the copy stored on the `Turn`.
    ///
    /// Added in phi-core 0.9.0.
    pub capture_turn_requests: bool,

    /// Session-level callback (G2): invoked when an `AgentStart` carries a
    /// `session_id` that is not currently among the recorder's open sessions.
    /// Its `bool` result is not acted upon by the recorder code in this file.
    pub before_task: Option<BeforeTaskFn>,

    /// Session-level callback (G2): invoked by [`SessionRecorder::flush`] for
    /// each session as it is moved to the completed list.
    pub after_task: Option<AfterTaskFn>,
}
54
55impl std::fmt::Debug for SessionRecorderConfig {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("SessionRecorderConfig")
58 .field("include_streaming_events", &self.include_streaming_events)
59 .field("capture_turn_requests", &self.capture_turn_requests)
60 .field("before_task", &self.before_task.as_ref().map(|_| "..."))
61 .field("after_task", &self.after_task.as_ref().map(|_| "..."))
62 .finish()
63 }
64}
65
66// ---------------------------------------------------------------------------
67// SessionRecorder internals
68// ---------------------------------------------------------------------------
69
/// Partial state for a parallel-evaluation group, stored when `ParallelLoopStart`
/// arrives and consumed when the matching `ParallelLoopEnd` is processed.
///
/// The recorder keeps one copy of this per member loop id, so the group can be
/// found again from the selected loop id alone.
struct PartialParallelGroup {
    /// Every loop id announced by the `ParallelLoopStart` event.
    all_loop_ids: Vec<String>,
}
75
/// Partial turn state accumulated between `TurnStart` and `TurnEnd`.
/// Finalized into a [`Turn`] when `TurnEnd` is received; discarded if the loop
/// ends or the recorder is flushed first.
struct PartialTurn {
    /// Identity of the turn (`loop_id` + `turn_index` from `TurnStart`).
    turn_id: TurnId,
    /// Trigger copied from the `TurnStart` event.
    triggered_by: TurnTrigger,
    /// Timestamp of the `TurnStart` event.
    started_at: chrono::DateTime<chrono::Utc>,
    /// Non-assistant messages seen via `MessageEnd` while this turn was open.
    input_messages: Vec<AgentMessage>,
    /// 0.9.0 — accumulated from [`AgentEvent::TurnRequest`] when
    /// [`SessionRecorderConfig::capture_turn_requests`] is `true`.
    request_payload: Option<AnnotatedRequestPayload>,
}
87
88// ---------------------------------------------------------------------------
89// SessionRecorder
90// ---------------------------------------------------------------------------
91
/// Records every [`AgentEvent`] into a structured tree of [`Session`]s and
/// [`LoopRecord`]s.
///
/// Call [`on_event`][Self::on_event] for every event emitted on the agent's
/// `tx` channel, then [`flush`][Self::flush] before shutdown or saving.
///
/// ## Session grouping
///
/// Sessions are keyed by `session_id`. Every `AgentStart` event that carries a
/// `session_id` the recorder does not currently hold as an open session opens a
/// new [`Session`]; all subsequent loops with the same `session_id` are appended
/// to that session for as long as it stays open.
///
/// **The recorder never rotates sessions on its own.** If you want a new session
/// to start after a period of inactivity, call
/// [`BasicAgent::check_and_rotate`][crate::BasicAgent::check_and_rotate] (or
/// [`BasicAgent::new_session`][crate::BasicAgent::new_session]) before the next
/// prompt. The next `AgentStart` will carry the new `session_id` and the recorder
/// will open a fresh [`Session`] automatically.
///
/// NOTE(review): the `AgentStart` handling visible in this file always records
/// [`SessionFormation::FirstLoop`] for the sessions it creates; confirm whether
/// [`SessionFormation::InactivityTimeout`] is expected to be set elsewhere.
///
/// ## Example
///
/// ```rust,no_run
/// use phi_core::session::{SessionRecorder, SessionRecorderConfig};
/// use phi_core::AgentEvent;
///
/// let mut recorder = SessionRecorder::new(SessionRecorderConfig::default());
/// // Feed events as they arrive:
/// // recorder.on_event(event);
/// recorder.flush();
/// ```
pub struct SessionRecorder {
    /// Recording options and session-level callbacks.
    config: SessionRecorderConfig,

    /// Sessions that have been promoted by `flush` or `checkpoint` and not yet
    /// drained.
    completed: Vec<Session>,

    /// Sessions not yet promoted to `completed`, keyed by `session_id`. A session
    /// stays here after its loops close until `flush` or `checkpoint` moves it.
    open_sessions: HashMap<String, Session>,

    /// Loops currently executing (between AgentStart and AgentEnd), keyed by `loop_id`.
    open_loops: HashMap<String, OpenLoop>,

    /// Parallel groups announced by ParallelLoopStart but not yet closed,
    /// keyed by each member `loop_id`.
    partial_groups: HashMap<String, PartialParallelGroup>,

    /// Turns being accumulated between `TurnStart` and `TurnEnd`, keyed by `loop_id`.
    partial_turns: HashMap<String, PartialTurn>,
}
142
143impl SessionRecorder {
144 /// Create a new recorder with the given configuration.
145 pub fn new(config: SessionRecorderConfig) -> Self {
146 SessionRecorder {
147 config,
148 completed: Vec::new(),
149 open_sessions: HashMap::new(),
150 open_loops: HashMap::new(),
151 partial_groups: HashMap::new(),
152 partial_turns: HashMap::new(),
153 }
154 }
155
156 /// Feed one event into the recorder.
157 ///
158 /// Must be called for every event emitted on the agent's `tx` channel.
159 pub fn on_event(&mut self, event: AgentEvent) {
160 match &event {
161 // ── ParallelLoopStart ─────────────────────────────────────────
162 AgentEvent::ParallelLoopStart { loop_ids, .. } => {
163 for lid in loop_ids {
164 // Pre-register a Pending record; will be promoted to Running when AgentStart arrives.
165 let group_key = lid.clone();
166 self.partial_groups
167 .entry(group_key)
168 .or_insert_with(|| PartialParallelGroup {
169 all_loop_ids: loop_ids.clone(),
170 });
171 // We don't have agent_id / session_id yet — those arrive in AgentStart.
172 }
173 }
174
175 // ── AgentStart ────────────────────────────────────────────────
176 AgentEvent::AgentStart {
177 agent_id,
178 session_id,
179 loop_id,
180 parent_loop_id,
181 continuation_kind,
182 timestamp,
183 metadata,
184 config_snapshot,
185 } => {
186 // Ensure the session exists.
187 let now = *timestamp;
188 let is_new_session = !self.open_sessions.contains_key(session_id);
189 let session = self
190 .open_sessions
191 .entry(session_id.clone())
192 .or_insert_with(|| Session {
193 session_id: session_id.clone(),
194 agent_id: agent_id.clone(),
195 created_at: now,
196 last_active_at: now,
197 formation: SessionFormation::FirstLoop { timestamp: now },
198 parent_spawn_ref: None,
199 scope: SessionScope::Ephemeral,
200 loops: Vec::new(),
201 });
202 session.last_active_at = now;
203
204 // G2: fire before_task callback on new session creation
205 if is_new_session {
206 if let Some(ref hook) = self.config.before_task {
207 hook(session);
208 }
209 }
210
211 // If parent_loop_id is set and belongs to a DIFFERENT session, this is a
212 // sub-agent spawn — record the inbound SpawnRef on this session.
213 if let Some(ref plid) = parent_loop_id {
214 let parent_session_id = session_id_from_loop_id(plid);
215 if parent_session_id != *session_id && session.parent_spawn_ref.is_none() {
216 // We don't have the tool_call_id / tool_name here; those come from the parent's
217 // ToolExecutionEnd. Set what we know; callers can enrich later if needed.
218 session.parent_spawn_ref = Some(SpawnRef {
219 parent_session_id,
220 parent_loop_id: plid.clone(),
221 tool_call_id: String::new(), // enriched when ChildLoopRef is processed
222 tool_name: String::new(),
223 });
224 }
225 }
226
227 // Create the LoopRecord (Pending → Running).
228 let record = LoopRecord {
229 loop_id: loop_id.clone(),
230 session_id: session_id.clone(),
231 agent_id: agent_id.clone(),
232 parent_loop_id: parent_loop_id.clone(),
233 continuation_kind: continuation_kind.clone(),
234 started_at: now,
235 ended_at: None,
236 status: LoopStatus::Running,
237 rejection: None,
238 config: config_snapshot.clone(),
239 messages: Vec::new(),
240 turns: Vec::new(),
241 usage: Usage::default(),
242 metadata: metadata.clone(),
243 events: Vec::new(),
244 children_loop_ids: Vec::new(),
245 child_loop_refs: Vec::new(),
246 parallel_group: None,
247 compaction_block: None,
248 };
249 let open = OpenLoop {
250 record,
251 next_seq: 0,
252 };
253 self.open_loops.insert(loop_id.clone(), open);
254 // Append AgentStart to event stream.
255 self.append_event(loop_id, event.clone());
256 }
257
258 // ── AgentEnd ──────────────────────────────────────────────────
259 AgentEvent::AgentEnd {
260 loop_id,
261 messages,
262 usage,
263 timestamp,
264 rejection,
265 } => {
266 self.append_event(loop_id, event.clone());
267 // Discard any orphaned partial turn for this loop.
268 self.partial_turns.remove(loop_id.as_str());
269 if let Some(mut open) = self.open_loops.remove(loop_id) {
270 open.record.ended_at = Some(*timestamp);
271 open.record.status = if rejection.is_some() {
272 LoopStatus::Rejected
273 } else {
274 LoopStatus::Completed
275 };
276 open.record.rejection = rejection.clone();
277 open.record.messages = messages.clone();
278 open.record.usage = usage.clone();
279
280 // Extract config snapshot from first assistant message.
281 if open.record.config.is_none() {
282 open.record.config = extract_config_snapshot(messages, loop_id);
283 }
284
285 let session_id = open.record.session_id.clone();
286 let parent_loop_id = open.record.parent_loop_id.clone();
287
288 // Link parent → child within same session.
289 if let Some(ref plid) = parent_loop_id {
290 // Check if parent is in the same session.
291 let parent_in_session = self
292 .open_sessions
293 .get(&session_id)
294 .map(|s| s.loops.iter().any(|l| &l.loop_id == plid))
295 .unwrap_or(false);
296 let parent_in_open = self.open_loops.contains_key(plid.as_str());
297
298 if parent_in_session {
299 if let Some(s) = self.open_sessions.get_mut(&session_id) {
300 if let Some(p) = s.loops.iter_mut().find(|l| &l.loop_id == plid) {
301 if !p.children_loop_ids.contains(loop_id) {
302 p.children_loop_ids.push(loop_id.clone());
303 }
304 }
305 }
306 } else if parent_in_open {
307 if let Some(p) = self.open_loops.get_mut(plid.as_str()) {
308 // Only link same-session children. Cross-session sub-agent
309 // children are tracked via child_loop_refs / SpawnRef.
310 if p.record.session_id == session_id
311 && !p.record.children_loop_ids.contains(loop_id)
312 {
313 p.record.children_loop_ids.push(loop_id.clone());
314 }
315 }
316 }
317 }
318
319 // Move into session.
320 if let Some(session) = self.open_sessions.get_mut(&session_id) {
321 session.loops.push(open.record);
322 }
323 }
324 }
325
326 // ── TurnStart — begin accumulating a partial turn ────────────
327 AgentEvent::TurnStart {
328 loop_id,
329 turn_index,
330 timestamp,
331 triggered_by,
332 } => {
333 self.partial_turns.insert(
334 loop_id.clone(),
335 PartialTurn {
336 turn_id: TurnId {
337 loop_id: loop_id.clone(),
338 turn_index: *turn_index,
339 },
340 triggered_by: triggered_by.clone(),
341 started_at: *timestamp,
342 input_messages: Vec::new(),
343 request_payload: None,
344 },
345 );
346 self.append_event(loop_id, event.clone());
347 }
348
349 // ── TurnRequest — opt-in capture of the assembled LLM payload ─
350 AgentEvent::TurnRequest {
351 loop_id, payload, ..
352 } => {
353 if self.config.capture_turn_requests {
354 if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
355 partial.request_payload = Some(payload.clone());
356 }
357 }
358 self.append_event(loop_id, event.clone());
359 }
360
361 // ── MessageEnd — capture non-assistant messages as turn input ─
362 AgentEvent::MessageEnd {
363 loop_id, message, ..
364 } => {
365 if message.role() != "assistant" {
366 if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
367 partial.input_messages.push(message.clone());
368 }
369 }
370 self.append_event(loop_id, event.clone());
371 }
372
373 // ── TurnEnd — finalize turn + extract config snapshot ─────────
374 AgentEvent::TurnEnd {
375 loop_id,
376 message,
377 usage,
378 timestamp,
379 tool_results,
380 } => {
381 self.append_event(loop_id, event.clone());
382
383 // Finalize the partial turn into a materialized Turn.
384 if let Some(partial) = self.partial_turns.remove(loop_id.as_str()) {
385 let tid = Some(partial.turn_id.clone());
386 let turn = Turn {
387 turn_id: partial.turn_id,
388 triggered_by: partial.triggered_by,
389 usage: usage.clone(),
390 input_messages: partial.input_messages,
391 output_message: message.clone(),
392 tool_results: tool_results
393 .iter()
394 .map(|m| AgentMessage::from(m.clone()).with_turn_id(tid.clone()))
395 .collect(),
396 started_at: partial.started_at,
397 ended_at: *timestamp,
398 request_payload: partial.request_payload,
399 };
400 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
401 open.record.turns.push(turn);
402 }
403 }
404
405 // Extract config snapshot from assistant message.
406 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
407 if open.record.config.is_none() {
408 open.record.config =
409 extract_config_snapshot(std::slice::from_ref(message), loop_id);
410 }
411 }
412 }
413
414 // ── ToolExecutionEnd — record child loop ref ──────────────────
415 AgentEvent::ToolExecutionEnd {
416 loop_id,
417 tool_call_id,
418 tool_name,
419 result,
420 // child_loop_id is also a top-level field on ToolExecutionEnd (mirrors
421 // result.child_loop_id for ergonomic pattern matching). We read from
422 // result.child_loop_id here so the ChildLoopRef is populated from the
423 // same authoritative source as ToolResult.
424 ..
425 } => {
426 self.append_event(loop_id, event.clone());
427 if let Some(child_lid) = &result.child_loop_id {
428 if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
429 let child_session_id = session_id_from_loop_id(child_lid);
430 open.record.child_loop_refs.push(ChildLoopRef {
431 tool_call_id: tool_call_id.clone(),
432 tool_name: tool_name.clone(),
433 child_loop_id: child_lid.clone(),
434 child_session_id: child_session_id.clone(),
435 });
436
437 // Enrich child session's parent_spawn_ref with the tool details we now know.
438 // The child may still be in open_sessions (common case) or already in
439 // completed (if flush() was called between child AgentEnd and this event).
440 // We check both to avoid a silent enrichment skip.
441 let parent_session_id = open.record.session_id.clone();
442 let parent_lid = loop_id.clone();
443 let tc_id = tool_call_id.clone();
444 let tn = tool_name.clone();
445 let csl = child_session_id.clone();
446 let enrich = move |session: &mut Session| {
447 if let Some(ref mut sr) = session.parent_spawn_ref {
448 if sr.tool_call_id.is_empty() {
449 sr.parent_session_id = parent_session_id;
450 sr.parent_loop_id = parent_lid;
451 sr.tool_call_id = tc_id;
452 sr.tool_name = tn;
453 }
454 }
455 };
456 if let Some(child_sess) = self.open_sessions.get_mut(&csl) {
457 enrich(child_sess);
458 } else if let Some(child_sess) =
459 self.completed.iter_mut().find(|s| s.session_id == csl)
460 {
461 enrich(child_sess);
462 }
463 }
464 }
465 }
466
467 // ── ParallelLoopEnd ───────────────────────────────────────────
468 AgentEvent::ParallelLoopEnd {
469 selected_loop_id,
470 selected_config_index,
471 evaluation_usage,
472 ..
473 } => {
474 // Recover all_loop_ids from the partial_groups registered at ParallelLoopStart.
475 let all_loop_ids = self
476 .partial_groups
477 .get(selected_loop_id.as_str())
478 .map(|pg| pg.all_loop_ids.clone())
479 .unwrap_or_else(|| vec![selected_loop_id.clone()]);
480 let group = ParallelGroupRecord {
481 all_loop_ids: all_loop_ids.clone(),
482 selected_loop_id: selected_loop_id.clone(),
483 selected_config_index: *selected_config_index,
484 evaluation_usage: evaluation_usage.clone(),
485 is_selected: false, // will be set per-record below
486 };
487
488 // Retroactively set ParallelGroupRecord on all branch LoopRecords.
489 for lid in &all_loop_ids {
490 let is_selected = lid == selected_loop_id;
491 let pg = ParallelGroupRecord {
492 is_selected,
493 ..group.clone()
494 };
495
496 // Check open_loops first (loop may not be closed yet).
497 if let Some(open) = self.open_loops.get_mut(lid.as_str()) {
498 open.record.parallel_group = Some(pg.clone());
499 }
500
501 // Also retroactively update already-closed loops in sessions.
502 for session in self.open_sessions.values_mut() {
503 if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
504 lr.parallel_group = Some(pg.clone());
505 }
506 }
507 for session in self.completed.iter_mut() {
508 if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
509 lr.parallel_group = Some(pg.clone());
510 }
511 }
512 }
513
514 // Clean up partial group entries.
515 for lid in &all_loop_ids {
516 self.partial_groups.remove(lid.as_str());
517 }
518 }
519
520 // ── MessageUpdate — optional streaming events ─────────────────
521 AgentEvent::MessageUpdate { loop_id, .. } => {
522 if self.config.include_streaming_events {
523 self.append_event(loop_id, event.clone());
524 }
525 }
526
527 // ── All other events — append to loop stream ──────────────────
528 other => {
529 if let Some(lid) = loop_id_of(other) {
530 self.append_event(lid, event.clone());
531 }
532 }
533 }
534 }
535
536 /// Finalize all open [`LoopRecord`]s (status → [`LoopStatus::Aborted`]) and
537 /// move them into their sessions.
538 ///
539 /// Call before saving or on process shutdown.
540 pub fn flush(&mut self) {
541 // Discard orphaned partial turns (TurnStart received but no TurnEnd).
542 self.partial_turns.clear();
543
544 let loop_ids: Vec<String> = self.open_loops.keys().cloned().collect();
545 for lid in loop_ids {
546 if let Some(mut open) = self.open_loops.remove(&lid) {
547 open.record.status = LoopStatus::Aborted;
548 let session_id = open.record.session_id.clone();
549 if let Some(session) = self.open_sessions.get_mut(&session_id) {
550 session.loops.push(open.record);
551 }
552 }
553 }
554 // Move fully-closed sessions from open_sessions to completed.
555 let session_ids: Vec<String> = self.open_sessions.keys().cloned().collect();
556 for sid in session_ids {
557 // A session is "complete" when all its loops have ended.
558 // Since we just flushed all open loops, every session is complete.
559 if let Some(session) = self.open_sessions.remove(&sid) {
560 // G2: fire after_task callback on session finalization
561 if let Some(ref hook) = self.config.after_task {
562 hook(&session);
563 }
564 self.completed.push(session);
565 }
566 }
567 }
568
569 /// Promote sessions that have no remaining open loops to the completed list,
570 /// without aborting any running loops.
571 ///
572 /// A session is eligible when every loop belonging to it has already received
573 /// an [`AgentEnd`][crate::AgentEvent::AgentEnd] event (i.e. it has no entry in
574 /// the internal open-loops map). Sessions that still have active loops are left
575 /// in place.
576 ///
577 /// This is intended for **periodic checkpointing** in production: save finished
578 /// sessions to disk while leaving in-flight agent runs untouched. In contrast,
579 /// [`flush`][Self::flush] first aborts all open loops and then promotes
580 /// everything.
581 ///
582 /// Returns the number of sessions that were promoted.
583 pub fn checkpoint(&mut self) -> usize {
584 // Collect session_ids that still have open loops.
585 let sessions_with_open_loops: Vec<String> = self
586 .open_loops
587 .values()
588 .map(|l| l.record.session_id.clone())
589 .collect();
590 // Promote sessions whose id is not in that set.
591 let promotable: Vec<String> = self
592 .open_sessions
593 .keys()
594 .filter(|sid| !sessions_with_open_loops.contains(sid))
595 .cloned()
596 .collect();
597 let count = promotable.len();
598 for sid in promotable {
599 if let Some(session) = self.open_sessions.remove(&sid) {
600 self.completed.push(session);
601 }
602 }
603 count
604 }
605
606 /// Drain all completed sessions out of the recorder (consuming them).
607 ///
608 /// Useful for periodic checkpointing. Call [`flush`][Self::flush] first
609 /// if you want to include in-progress sessions, or [`checkpoint`][Self::checkpoint]
610 /// to drain only fully-finished sessions without aborting active loops.
611 pub fn drain_completed(&mut self) -> Vec<Session> {
612 std::mem::take(&mut self.completed)
613 }
614
615 /// All sessions known to this recorder (completed and in-progress).
616 pub fn sessions(&self) -> impl Iterator<Item = &Session> {
617 self.completed.iter().chain(self.open_sessions.values())
618 }
619
620 /// Look up a session by `session_id`.
621 pub fn get_session(&self, session_id: &str) -> Option<&Session> {
622 self.completed
623 .iter()
624 .find(|s| s.session_id == session_id)
625 .or_else(|| self.open_sessions.get(session_id))
626 }
627
628 /// Look up an in-progress [`LoopRecord`] by `loop_id`.
629 pub fn current_loop(&self, loop_id: &str) -> Option<&LoopRecord> {
630 self.open_loops.get(loop_id).map(|o| &o.record)
631 }
632
633 // ── Private helpers ───────────────────────────────────────────────────
634
635 fn append_event(&mut self, loop_id: &str, event: AgentEvent) {
636 if let Some(open) = self.open_loops.get_mut(loop_id) {
637 let seq = open.next_seq;
638 open.next_seq += 1;
639 open.record.events.push(LoopEvent {
640 sequence: seq,
641 event,
642 });
643 }
644 }
645}