Skip to main content

phi_core/session/
recorder.rs

1use super::helpers::*;
2use super::model::*;
3use crate::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7// ---------------------------------------------------------------------------
8// SessionRecorderConfig
9// ---------------------------------------------------------------------------
10
11// ── Session-level callback types (G2) ────────────────────────────────────
12
13/// Called when a new session is first created (first `AgentStart` with a new `session_id`).
14///
15/// Arguments: the newly created `Session` (header fields populated, no loops yet).
16/// Return `false` to reject the session (the recorder will still create it but mark it rejected).
17pub type BeforeTaskFn = Arc<dyn Fn(&Session) -> bool + Send + Sync>;
18
19/// Called when a session is finalized (via `flush()` or explicit close).
20///
21/// Arguments: the completed `Session` with all loops finalized.
22pub type AfterTaskFn = Arc<dyn Fn(&Session) + Send + Sync>;
23
24// ── SessionRecorderConfig ────────────────────────────────────────────────
25
26/// Configuration for [`SessionRecorder`].
27#[derive(Clone, Default)]
28pub struct SessionRecorderConfig {
29    /// Store `MessageUpdate` (streaming delta) events in [`LoopRecord::events`].
30    ///
31    /// Default: `false`. Streaming deltas are 100–1 000× more numerous than
32    /// final messages and are not needed for replay or branching. Enable only
33    /// for debugging or playback use cases.
34    pub include_streaming_events: bool,
35
36    /// Capture [`AnnotatedRequestPayload`] from [`AgentEvent::TurnRequest`]
37    /// onto each materialized [`Turn::request_payload`].
38    ///
39    /// Default: `false`. Each per-turn payload can be hundreds of KB (full
40    /// system prompt + message vec); enable only when reconstructing the
41    /// exact request the model received is required (debugging, golden-output
42    /// replay, cost analysis). The `TurnRequest` event is emitted regardless
43    /// of this flag — disabling it only suppresses persistence.
44    ///
45    /// Added in phi-core 0.9.0.
46    pub capture_turn_requests: bool,
47
48    /// Session-level callback: fires when a new session is first created (G2).
49    pub before_task: Option<BeforeTaskFn>,
50
51    /// Session-level callback: fires when a session is finalized (G2).
52    pub after_task: Option<AfterTaskFn>,
53}
54
55impl std::fmt::Debug for SessionRecorderConfig {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("SessionRecorderConfig")
58            .field("include_streaming_events", &self.include_streaming_events)
59            .field("capture_turn_requests", &self.capture_turn_requests)
60            .field("before_task", &self.before_task.as_ref().map(|_| "..."))
61            .field("after_task", &self.after_task.as_ref().map(|_| "..."))
62            .finish()
63    }
64}
65
66// ---------------------------------------------------------------------------
67// SessionRecorder internals
68// ---------------------------------------------------------------------------
69
/// Bookkeeping for one parallel-evaluation group, held between its
/// `ParallelLoopStart` and `ParallelLoopEnd` events.
struct PartialParallelGroup {
    /// Every branch loop id announced by `ParallelLoopStart`.
    all_loop_ids: Vec<String>,
}
75
76/// Partial turn state accumulated between `TurnStart` and `TurnEnd`.
77/// Finalized into a [`Turn`] when `TurnEnd` is received.
78struct PartialTurn {
79    turn_id: TurnId,
80    triggered_by: TurnTrigger,
81    started_at: chrono::DateTime<chrono::Utc>,
82    input_messages: Vec<AgentMessage>,
83    /// 0.9.0 — accumulated from [`AgentEvent::TurnRequest`] when
84    /// [`SessionRecorderConfig::capture_turn_requests`] is `true`.
85    request_payload: Option<AnnotatedRequestPayload>,
86}
87
88// ---------------------------------------------------------------------------
89// SessionRecorder
90// ---------------------------------------------------------------------------
91
92/// Records every [`AgentEvent`] into a structured tree of [`Session`]s and
93/// [`LoopRecord`]s.
94///
95/// Call [`on_event`][Self::on_event] for every event emitted on the agent's
96/// `tx` channel, then [`flush`][Self::flush] before shutdown or saving.
97///
98/// ## Session grouping
99///
100/// Sessions are keyed by `session_id`. Every `AgentStart` event that carries a
101/// `session_id` the recorder has not seen before opens a new [`Session`]; all
102/// subsequent loops with the same `session_id` are appended to that session.
103///
104/// **The recorder never rotates sessions on its own.** If you want a new session
105/// to start after a period of inactivity, call
106/// [`BasicAgent::check_and_rotate`][crate::BasicAgent::check_and_rotate] (or
107/// [`BasicAgent::new_session`][crate::BasicAgent::new_session]) before the next
108/// prompt. The next `AgentStart` will carry the new `session_id` and the recorder
109/// will open a fresh [`Session`] automatically, with
110/// [`SessionFormation::InactivityTimeout`] or [`SessionFormation::FirstLoop`]
111/// as the recorded reason.
112///
113/// ## Example
114///
115/// ```rust,no_run
116/// use phi_core::session::{SessionRecorder, SessionRecorderConfig};
117/// use phi_core::AgentEvent;
118///
119/// let mut recorder = SessionRecorder::new(SessionRecorderConfig::default());
120/// // Feed events as they arrive:
121/// // recorder.on_event(event);
122/// recorder.flush();
123/// ```
124pub struct SessionRecorder {
125    config: SessionRecorderConfig,
126
127    /// Completed sessions (all their loops are closed).
128    completed: Vec<Session>,
129
130    /// Sessions that still have open loops.
131    open_sessions: HashMap<String, Session>,
132
133    /// Loops currently executing (between AgentStart and AgentEnd).
134    open_loops: HashMap<String, OpenLoop>,
135
136    /// Parallel groups announced by ParallelLoopStart but not yet closed.
137    partial_groups: HashMap<String, PartialParallelGroup>,
138
139    /// Turns being accumulated between `TurnStart` and `TurnEnd`, keyed by `loop_id`.
140    partial_turns: HashMap<String, PartialTurn>,
141}
142
143impl SessionRecorder {
144    /// Create a new recorder with the given configuration.
145    pub fn new(config: SessionRecorderConfig) -> Self {
146        SessionRecorder {
147            config,
148            completed: Vec::new(),
149            open_sessions: HashMap::new(),
150            open_loops: HashMap::new(),
151            partial_groups: HashMap::new(),
152            partial_turns: HashMap::new(),
153        }
154    }
155
156    /// Feed one event into the recorder.
157    ///
158    /// Must be called for every event emitted on the agent's `tx` channel.
159    pub fn on_event(&mut self, event: AgentEvent) {
160        match &event {
161            // ── ParallelLoopStart ─────────────────────────────────────────
162            AgentEvent::ParallelLoopStart { loop_ids, .. } => {
163                for lid in loop_ids {
164                    // Pre-register a Pending record; will be promoted to Running when AgentStart arrives.
165                    let group_key = lid.clone();
166                    self.partial_groups
167                        .entry(group_key)
168                        .or_insert_with(|| PartialParallelGroup {
169                            all_loop_ids: loop_ids.clone(),
170                        });
171                    // We don't have agent_id / session_id yet — those arrive in AgentStart.
172                }
173            }
174
175            // ── AgentStart ────────────────────────────────────────────────
176            AgentEvent::AgentStart {
177                agent_id,
178                session_id,
179                loop_id,
180                parent_loop_id,
181                continuation_kind,
182                timestamp,
183                metadata,
184                config_snapshot,
185            } => {
186                // Ensure the session exists.
187                let now = *timestamp;
188                let is_new_session = !self.open_sessions.contains_key(session_id);
189                let session = self
190                    .open_sessions
191                    .entry(session_id.clone())
192                    .or_insert_with(|| Session {
193                        session_id: session_id.clone(),
194                        agent_id: agent_id.clone(),
195                        created_at: now,
196                        last_active_at: now,
197                        formation: SessionFormation::FirstLoop { timestamp: now },
198                        parent_spawn_ref: None,
199                        scope: SessionScope::Ephemeral,
200                        loops: Vec::new(),
201                    });
202                session.last_active_at = now;
203
204                // G2: fire before_task callback on new session creation
205                if is_new_session {
206                    if let Some(ref hook) = self.config.before_task {
207                        hook(session);
208                    }
209                }
210
211                // If parent_loop_id is set and belongs to a DIFFERENT session, this is a
212                // sub-agent spawn — record the inbound SpawnRef on this session.
213                if let Some(ref plid) = parent_loop_id {
214                    let parent_session_id = session_id_from_loop_id(plid);
215                    if parent_session_id != *session_id && session.parent_spawn_ref.is_none() {
216                        // We don't have the tool_call_id / tool_name here; those come from the parent's
217                        // ToolExecutionEnd. Set what we know; callers can enrich later if needed.
218                        session.parent_spawn_ref = Some(SpawnRef {
219                            parent_session_id,
220                            parent_loop_id: plid.clone(),
221                            tool_call_id: String::new(), // enriched when ChildLoopRef is processed
222                            tool_name: String::new(),
223                        });
224                    }
225                }
226
227                // Create the LoopRecord (Pending → Running).
228                let record = LoopRecord {
229                    loop_id: loop_id.clone(),
230                    session_id: session_id.clone(),
231                    agent_id: agent_id.clone(),
232                    parent_loop_id: parent_loop_id.clone(),
233                    continuation_kind: continuation_kind.clone(),
234                    started_at: now,
235                    ended_at: None,
236                    status: LoopStatus::Running,
237                    rejection: None,
238                    config: config_snapshot.clone(),
239                    messages: Vec::new(),
240                    turns: Vec::new(),
241                    usage: Usage::default(),
242                    metadata: metadata.clone(),
243                    events: Vec::new(),
244                    children_loop_ids: Vec::new(),
245                    child_loop_refs: Vec::new(),
246                    parallel_group: None,
247                    compaction_block: None,
248                };
249                let open = OpenLoop {
250                    record,
251                    next_seq: 0,
252                };
253                self.open_loops.insert(loop_id.clone(), open);
254                // Append AgentStart to event stream.
255                self.append_event(loop_id, event.clone());
256            }
257
258            // ── AgentEnd ──────────────────────────────────────────────────
259            AgentEvent::AgentEnd {
260                loop_id,
261                messages,
262                usage,
263                timestamp,
264                rejection,
265            } => {
266                self.append_event(loop_id, event.clone());
267                // Discard any orphaned partial turn for this loop.
268                self.partial_turns.remove(loop_id.as_str());
269                if let Some(mut open) = self.open_loops.remove(loop_id) {
270                    open.record.ended_at = Some(*timestamp);
271                    open.record.status = if rejection.is_some() {
272                        LoopStatus::Rejected
273                    } else {
274                        LoopStatus::Completed
275                    };
276                    open.record.rejection = rejection.clone();
277                    open.record.messages = messages.clone();
278                    open.record.usage = usage.clone();
279
280                    // Extract config snapshot from first assistant message.
281                    if open.record.config.is_none() {
282                        open.record.config = extract_config_snapshot(messages, loop_id);
283                    }
284
285                    let session_id = open.record.session_id.clone();
286                    let parent_loop_id = open.record.parent_loop_id.clone();
287
288                    // Link parent → child within same session.
289                    if let Some(ref plid) = parent_loop_id {
290                        // Check if parent is in the same session.
291                        let parent_in_session = self
292                            .open_sessions
293                            .get(&session_id)
294                            .map(|s| s.loops.iter().any(|l| &l.loop_id == plid))
295                            .unwrap_or(false);
296                        let parent_in_open = self.open_loops.contains_key(plid.as_str());
297
298                        if parent_in_session {
299                            if let Some(s) = self.open_sessions.get_mut(&session_id) {
300                                if let Some(p) = s.loops.iter_mut().find(|l| &l.loop_id == plid) {
301                                    if !p.children_loop_ids.contains(loop_id) {
302                                        p.children_loop_ids.push(loop_id.clone());
303                                    }
304                                }
305                            }
306                        } else if parent_in_open {
307                            if let Some(p) = self.open_loops.get_mut(plid.as_str()) {
308                                // Only link same-session children. Cross-session sub-agent
309                                // children are tracked via child_loop_refs / SpawnRef.
310                                if p.record.session_id == session_id
311                                    && !p.record.children_loop_ids.contains(loop_id)
312                                {
313                                    p.record.children_loop_ids.push(loop_id.clone());
314                                }
315                            }
316                        }
317                    }
318
319                    // Move into session.
320                    if let Some(session) = self.open_sessions.get_mut(&session_id) {
321                        session.loops.push(open.record);
322                    }
323                }
324            }
325
326            // ── TurnStart — begin accumulating a partial turn ────────────
327            AgentEvent::TurnStart {
328                loop_id,
329                turn_index,
330                timestamp,
331                triggered_by,
332            } => {
333                self.partial_turns.insert(
334                    loop_id.clone(),
335                    PartialTurn {
336                        turn_id: TurnId {
337                            loop_id: loop_id.clone(),
338                            turn_index: *turn_index,
339                        },
340                        triggered_by: triggered_by.clone(),
341                        started_at: *timestamp,
342                        input_messages: Vec::new(),
343                        request_payload: None,
344                    },
345                );
346                self.append_event(loop_id, event.clone());
347            }
348
349            // ── TurnRequest — opt-in capture of the assembled LLM payload ─
350            AgentEvent::TurnRequest {
351                loop_id, payload, ..
352            } => {
353                if self.config.capture_turn_requests {
354                    if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
355                        partial.request_payload = Some(payload.clone());
356                    }
357                }
358                self.append_event(loop_id, event.clone());
359            }
360
361            // ── MessageEnd — capture non-assistant messages as turn input ─
362            AgentEvent::MessageEnd {
363                loop_id, message, ..
364            } => {
365                if message.role() != "assistant" {
366                    if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
367                        partial.input_messages.push(message.clone());
368                    }
369                }
370                self.append_event(loop_id, event.clone());
371            }
372
373            // ── TurnEnd — finalize turn + extract config snapshot ─────────
374            AgentEvent::TurnEnd {
375                loop_id,
376                message,
377                usage,
378                timestamp,
379                tool_results,
380            } => {
381                self.append_event(loop_id, event.clone());
382
383                // Finalize the partial turn into a materialized Turn.
384                if let Some(partial) = self.partial_turns.remove(loop_id.as_str()) {
385                    let tid = Some(partial.turn_id.clone());
386                    let turn = Turn {
387                        turn_id: partial.turn_id,
388                        triggered_by: partial.triggered_by,
389                        usage: usage.clone(),
390                        input_messages: partial.input_messages,
391                        output_message: message.clone(),
392                        tool_results: tool_results
393                            .iter()
394                            .map(|m| AgentMessage::from(m.clone()).with_turn_id(tid.clone()))
395                            .collect(),
396                        started_at: partial.started_at,
397                        ended_at: *timestamp,
398                        request_payload: partial.request_payload,
399                    };
400                    if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
401                        open.record.turns.push(turn);
402                    }
403                }
404
405                // Extract config snapshot from assistant message.
406                if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
407                    if open.record.config.is_none() {
408                        open.record.config =
409                            extract_config_snapshot(std::slice::from_ref(message), loop_id);
410                    }
411                }
412            }
413
414            // ── ToolExecutionEnd — record child loop ref ──────────────────
415            AgentEvent::ToolExecutionEnd {
416                loop_id,
417                tool_call_id,
418                tool_name,
419                result,
420                // child_loop_id is also a top-level field on ToolExecutionEnd (mirrors
421                // result.child_loop_id for ergonomic pattern matching). We read from
422                // result.child_loop_id here so the ChildLoopRef is populated from the
423                // same authoritative source as ToolResult.
424                ..
425            } => {
426                self.append_event(loop_id, event.clone());
427                if let Some(child_lid) = &result.child_loop_id {
428                    if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
429                        let child_session_id = session_id_from_loop_id(child_lid);
430                        open.record.child_loop_refs.push(ChildLoopRef {
431                            tool_call_id: tool_call_id.clone(),
432                            tool_name: tool_name.clone(),
433                            child_loop_id: child_lid.clone(),
434                            child_session_id: child_session_id.clone(),
435                        });
436
437                        // Enrich child session's parent_spawn_ref with the tool details we now know.
438                        // The child may still be in open_sessions (common case) or already in
439                        // completed (if flush() was called between child AgentEnd and this event).
440                        // We check both to avoid a silent enrichment skip.
441                        let parent_session_id = open.record.session_id.clone();
442                        let parent_lid = loop_id.clone();
443                        let tc_id = tool_call_id.clone();
444                        let tn = tool_name.clone();
445                        let csl = child_session_id.clone();
446                        let enrich = move |session: &mut Session| {
447                            if let Some(ref mut sr) = session.parent_spawn_ref {
448                                if sr.tool_call_id.is_empty() {
449                                    sr.parent_session_id = parent_session_id;
450                                    sr.parent_loop_id = parent_lid;
451                                    sr.tool_call_id = tc_id;
452                                    sr.tool_name = tn;
453                                }
454                            }
455                        };
456                        if let Some(child_sess) = self.open_sessions.get_mut(&csl) {
457                            enrich(child_sess);
458                        } else if let Some(child_sess) =
459                            self.completed.iter_mut().find(|s| s.session_id == csl)
460                        {
461                            enrich(child_sess);
462                        }
463                    }
464                }
465            }
466
467            // ── ParallelLoopEnd ───────────────────────────────────────────
468            AgentEvent::ParallelLoopEnd {
469                selected_loop_id,
470                selected_config_index,
471                evaluation_usage,
472                ..
473            } => {
474                // Recover all_loop_ids from the partial_groups registered at ParallelLoopStart.
475                let all_loop_ids = self
476                    .partial_groups
477                    .get(selected_loop_id.as_str())
478                    .map(|pg| pg.all_loop_ids.clone())
479                    .unwrap_or_else(|| vec![selected_loop_id.clone()]);
480                let group = ParallelGroupRecord {
481                    all_loop_ids: all_loop_ids.clone(),
482                    selected_loop_id: selected_loop_id.clone(),
483                    selected_config_index: *selected_config_index,
484                    evaluation_usage: evaluation_usage.clone(),
485                    is_selected: false, // will be set per-record below
486                };
487
488                // Retroactively set ParallelGroupRecord on all branch LoopRecords.
489                for lid in &all_loop_ids {
490                    let is_selected = lid == selected_loop_id;
491                    let pg = ParallelGroupRecord {
492                        is_selected,
493                        ..group.clone()
494                    };
495
496                    // Check open_loops first (loop may not be closed yet).
497                    if let Some(open) = self.open_loops.get_mut(lid.as_str()) {
498                        open.record.parallel_group = Some(pg.clone());
499                    }
500
501                    // Also retroactively update already-closed loops in sessions.
502                    for session in self.open_sessions.values_mut() {
503                        if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
504                            lr.parallel_group = Some(pg.clone());
505                        }
506                    }
507                    for session in self.completed.iter_mut() {
508                        if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
509                            lr.parallel_group = Some(pg.clone());
510                        }
511                    }
512                }
513
514                // Clean up partial group entries.
515                for lid in &all_loop_ids {
516                    self.partial_groups.remove(lid.as_str());
517                }
518            }
519
520            // ── MessageUpdate — optional streaming events ─────────────────
521            AgentEvent::MessageUpdate { loop_id, .. } => {
522                if self.config.include_streaming_events {
523                    self.append_event(loop_id, event.clone());
524                }
525            }
526
527            // ── All other events — append to loop stream ──────────────────
528            other => {
529                if let Some(lid) = loop_id_of(other) {
530                    self.append_event(lid, event.clone());
531                }
532            }
533        }
534    }
535
536    /// Finalize all open [`LoopRecord`]s (status → [`LoopStatus::Aborted`]) and
537    /// move them into their sessions.
538    ///
539    /// Call before saving or on process shutdown.
540    pub fn flush(&mut self) {
541        // Discard orphaned partial turns (TurnStart received but no TurnEnd).
542        self.partial_turns.clear();
543
544        let loop_ids: Vec<String> = self.open_loops.keys().cloned().collect();
545        for lid in loop_ids {
546            if let Some(mut open) = self.open_loops.remove(&lid) {
547                open.record.status = LoopStatus::Aborted;
548                let session_id = open.record.session_id.clone();
549                if let Some(session) = self.open_sessions.get_mut(&session_id) {
550                    session.loops.push(open.record);
551                }
552            }
553        }
554        // Move fully-closed sessions from open_sessions to completed.
555        let session_ids: Vec<String> = self.open_sessions.keys().cloned().collect();
556        for sid in session_ids {
557            // A session is "complete" when all its loops have ended.
558            // Since we just flushed all open loops, every session is complete.
559            if let Some(session) = self.open_sessions.remove(&sid) {
560                // G2: fire after_task callback on session finalization
561                if let Some(ref hook) = self.config.after_task {
562                    hook(&session);
563                }
564                self.completed.push(session);
565            }
566        }
567    }
568
569    /// Promote sessions that have no remaining open loops to the completed list,
570    /// without aborting any running loops.
571    ///
572    /// A session is eligible when every loop belonging to it has already received
573    /// an [`AgentEnd`][crate::AgentEvent::AgentEnd] event (i.e. it has no entry in
574    /// the internal open-loops map). Sessions that still have active loops are left
575    /// in place.
576    ///
577    /// This is intended for **periodic checkpointing** in production: save finished
578    /// sessions to disk while leaving in-flight agent runs untouched. In contrast,
579    /// [`flush`][Self::flush] first aborts all open loops and then promotes
580    /// everything.
581    ///
582    /// Returns the number of sessions that were promoted.
583    pub fn checkpoint(&mut self) -> usize {
584        // Collect session_ids that still have open loops.
585        let sessions_with_open_loops: Vec<String> = self
586            .open_loops
587            .values()
588            .map(|l| l.record.session_id.clone())
589            .collect();
590        // Promote sessions whose id is not in that set.
591        let promotable: Vec<String> = self
592            .open_sessions
593            .keys()
594            .filter(|sid| !sessions_with_open_loops.contains(sid))
595            .cloned()
596            .collect();
597        let count = promotable.len();
598        for sid in promotable {
599            if let Some(session) = self.open_sessions.remove(&sid) {
600                self.completed.push(session);
601            }
602        }
603        count
604    }
605
606    /// Drain all completed sessions out of the recorder (consuming them).
607    ///
608    /// Useful for periodic checkpointing. Call [`flush`][Self::flush] first
609    /// if you want to include in-progress sessions, or [`checkpoint`][Self::checkpoint]
610    /// to drain only fully-finished sessions without aborting active loops.
611    pub fn drain_completed(&mut self) -> Vec<Session> {
612        std::mem::take(&mut self.completed)
613    }
614
615    /// All sessions known to this recorder (completed and in-progress).
616    pub fn sessions(&self) -> impl Iterator<Item = &Session> {
617        self.completed.iter().chain(self.open_sessions.values())
618    }
619
620    /// Look up a session by `session_id`.
621    pub fn get_session(&self, session_id: &str) -> Option<&Session> {
622        self.completed
623            .iter()
624            .find(|s| s.session_id == session_id)
625            .or_else(|| self.open_sessions.get(session_id))
626    }
627
628    /// Look up an in-progress [`LoopRecord`] by `loop_id`.
629    pub fn current_loop(&self, loop_id: &str) -> Option<&LoopRecord> {
630        self.open_loops.get(loop_id).map(|o| &o.record)
631    }
632
633    // ── Private helpers ───────────────────────────────────────────────────
634
635    fn append_event(&mut self, loop_id: &str, event: AgentEvent) {
636        if let Some(open) = self.open_loops.get_mut(loop_id) {
637            let seq = open.next_seq;
638            open.next_seq += 1;
639            open.record.events.push(LoopEvent {
640                sequence: seq,
641                event,
642            });
643        }
644    }
645}