// phi_core/session/recorder.rs

1use super::helpers::*;
2use super::model::*;
3use crate::types::*;
4use std::collections::HashMap;
5use std::sync::Arc;
6
7// ---------------------------------------------------------------------------
8// SessionRecorderConfig
9// ---------------------------------------------------------------------------
10
11// ── Session-level callback types (G2) ────────────────────────────────────
12
13/// Called when a new session is first created (first `AgentStart` with a new `session_id`).
14///
15/// Arguments: the newly created `Session` (header fields populated, no loops yet).
16/// Return `false` to reject the session (the recorder will still create it but mark it rejected).
17pub type BeforeTaskFn = Arc<dyn Fn(&Session) -> bool + Send + Sync>;
18
19/// Called when a session is finalized (via `flush()` or explicit close).
20///
21/// Arguments: the completed `Session` with all loops finalized.
22pub type AfterTaskFn = Arc<dyn Fn(&Session) + Send + Sync>;
23
24// ── SessionRecorderConfig ────────────────────────────────────────────────
25
26/// Configuration for [`SessionRecorder`].
27#[derive(Clone, Default)]
28pub struct SessionRecorderConfig {
29    /// Store `MessageUpdate` (streaming delta) events in [`LoopRecord::events`].
30    ///
31    /// Default: `false`. Streaming deltas are 100–1 000× more numerous than
32    /// final messages and are not needed for replay or branching. Enable only
33    /// for debugging or playback use cases.
34    pub include_streaming_events: bool,
35
36    /// Session-level callback: fires when a new session is first created (G2).
37    pub before_task: Option<BeforeTaskFn>,
38
39    /// Session-level callback: fires when a session is finalized (G2).
40    pub after_task: Option<AfterTaskFn>,
41}
42
43impl std::fmt::Debug for SessionRecorderConfig {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("SessionRecorderConfig")
46            .field("include_streaming_events", &self.include_streaming_events)
47            .field("before_task", &self.before_task.as_ref().map(|_| "..."))
48            .field("after_task", &self.after_task.as_ref().map(|_| "..."))
49            .finish()
50    }
51}
52
// ---------------------------------------------------------------------------
// SessionRecorder internals
// ---------------------------------------------------------------------------

/// Branch membership of a parallel-evaluation group.
///
/// One copy is stored per branch `loop_id` when `ParallelLoopStart` is seen, so
/// that `ParallelLoopEnd` can recover the complete branch list from whichever
/// loop was selected.
struct PartialParallelGroup {
    /// Every branch `loop_id` announced for the group, in announcement order.
    all_loop_ids: Vec<String>,
}
62
63/// Partial turn state accumulated between `TurnStart` and `TurnEnd`.
64/// Finalized into a [`Turn`] when `TurnEnd` is received.
65struct PartialTurn {
66    turn_id: TurnId,
67    triggered_by: TurnTrigger,
68    started_at: chrono::DateTime<chrono::Utc>,
69    input_messages: Vec<AgentMessage>,
70}
71
72// ---------------------------------------------------------------------------
73// SessionRecorder
74// ---------------------------------------------------------------------------
75
76/// Records every [`AgentEvent`] into a structured tree of [`Session`]s and
77/// [`LoopRecord`]s.
78///
79/// Call [`on_event`][Self::on_event] for every event emitted on the agent's
80/// `tx` channel, then [`flush`][Self::flush] before shutdown or saving.
81///
82/// ## Session grouping
83///
84/// Sessions are keyed by `session_id`. Every `AgentStart` event that carries a
85/// `session_id` the recorder has not seen before opens a new [`Session`]; all
86/// subsequent loops with the same `session_id` are appended to that session.
87///
88/// **The recorder never rotates sessions on its own.** If you want a new session
89/// to start after a period of inactivity, call
90/// [`BasicAgent::check_and_rotate`][crate::BasicAgent::check_and_rotate] (or
91/// [`BasicAgent::new_session`][crate::BasicAgent::new_session]) before the next
92/// prompt. The next `AgentStart` will carry the new `session_id` and the recorder
93/// will open a fresh [`Session`] automatically, with
94/// [`SessionFormation::InactivityTimeout`] or [`SessionFormation::FirstLoop`]
95/// as the recorded reason.
96///
97/// ## Example
98///
99/// ```rust,no_run
100/// use phi_core::session::{SessionRecorder, SessionRecorderConfig};
101/// use phi_core::AgentEvent;
102///
103/// let mut recorder = SessionRecorder::new(SessionRecorderConfig::default());
104/// // Feed events as they arrive:
105/// // recorder.on_event(event);
106/// recorder.flush();
107/// ```
108pub struct SessionRecorder {
109    config: SessionRecorderConfig,
110
111    /// Completed sessions (all their loops are closed).
112    completed: Vec<Session>,
113
114    /// Sessions that still have open loops.
115    open_sessions: HashMap<String, Session>,
116
117    /// Loops currently executing (between AgentStart and AgentEnd).
118    open_loops: HashMap<String, OpenLoop>,
119
120    /// Parallel groups announced by ParallelLoopStart but not yet closed.
121    partial_groups: HashMap<String, PartialParallelGroup>,
122
123    /// Turns being accumulated between `TurnStart` and `TurnEnd`, keyed by `loop_id`.
124    partial_turns: HashMap<String, PartialTurn>,
125}
126
127impl SessionRecorder {
128    /// Create a new recorder with the given configuration.
129    pub fn new(config: SessionRecorderConfig) -> Self {
130        SessionRecorder {
131            config,
132            completed: Vec::new(),
133            open_sessions: HashMap::new(),
134            open_loops: HashMap::new(),
135            partial_groups: HashMap::new(),
136            partial_turns: HashMap::new(),
137        }
138    }
139
140    /// Feed one event into the recorder.
141    ///
142    /// Must be called for every event emitted on the agent's `tx` channel.
143    pub fn on_event(&mut self, event: AgentEvent) {
144        match &event {
145            // ── ParallelLoopStart ─────────────────────────────────────────
146            AgentEvent::ParallelLoopStart { loop_ids, .. } => {
147                for lid in loop_ids {
148                    // Pre-register a Pending record; will be promoted to Running when AgentStart arrives.
149                    let group_key = lid.clone();
150                    self.partial_groups
151                        .entry(group_key)
152                        .or_insert_with(|| PartialParallelGroup {
153                            all_loop_ids: loop_ids.clone(),
154                        });
155                    // We don't have agent_id / session_id yet — those arrive in AgentStart.
156                }
157            }
158
159            // ── AgentStart ────────────────────────────────────────────────
160            AgentEvent::AgentStart {
161                agent_id,
162                session_id,
163                loop_id,
164                parent_loop_id,
165                continuation_kind,
166                timestamp,
167                metadata,
168                config_snapshot,
169            } => {
170                // Ensure the session exists.
171                let now = *timestamp;
172                let is_new_session = !self.open_sessions.contains_key(session_id);
173                let session = self
174                    .open_sessions
175                    .entry(session_id.clone())
176                    .or_insert_with(|| Session {
177                        session_id: session_id.clone(),
178                        agent_id: agent_id.clone(),
179                        created_at: now,
180                        last_active_at: now,
181                        formation: SessionFormation::FirstLoop { timestamp: now },
182                        parent_spawn_ref: None,
183                        scope: SessionScope::Ephemeral,
184                        loops: Vec::new(),
185                    });
186                session.last_active_at = now;
187
188                // G2: fire before_task callback on new session creation
189                if is_new_session {
190                    if let Some(ref hook) = self.config.before_task {
191                        hook(session);
192                    }
193                }
194
195                // If parent_loop_id is set and belongs to a DIFFERENT session, this is a
196                // sub-agent spawn — record the inbound SpawnRef on this session.
197                if let Some(ref plid) = parent_loop_id {
198                    let parent_session_id = session_id_from_loop_id(plid);
199                    if parent_session_id != *session_id && session.parent_spawn_ref.is_none() {
200                        // We don't have the tool_call_id / tool_name here; those come from the parent's
201                        // ToolExecutionEnd. Set what we know; callers can enrich later if needed.
202                        session.parent_spawn_ref = Some(SpawnRef {
203                            parent_session_id,
204                            parent_loop_id: plid.clone(),
205                            tool_call_id: String::new(), // enriched when ChildLoopRef is processed
206                            tool_name: String::new(),
207                        });
208                    }
209                }
210
211                // Create the LoopRecord (Pending → Running).
212                let record = LoopRecord {
213                    loop_id: loop_id.clone(),
214                    session_id: session_id.clone(),
215                    agent_id: agent_id.clone(),
216                    parent_loop_id: parent_loop_id.clone(),
217                    continuation_kind: continuation_kind.clone(),
218                    started_at: now,
219                    ended_at: None,
220                    status: LoopStatus::Running,
221                    rejection: None,
222                    config: config_snapshot.clone(),
223                    messages: Vec::new(),
224                    turns: Vec::new(),
225                    usage: Usage::default(),
226                    metadata: metadata.clone(),
227                    events: Vec::new(),
228                    children_loop_ids: Vec::new(),
229                    child_loop_refs: Vec::new(),
230                    parallel_group: None,
231                    compaction_block: None,
232                };
233                let open = OpenLoop {
234                    record,
235                    next_seq: 0,
236                };
237                self.open_loops.insert(loop_id.clone(), open);
238                // Append AgentStart to event stream.
239                self.append_event(loop_id, event.clone());
240            }
241
242            // ── AgentEnd ──────────────────────────────────────────────────
243            AgentEvent::AgentEnd {
244                loop_id,
245                messages,
246                usage,
247                timestamp,
248                rejection,
249            } => {
250                self.append_event(loop_id, event.clone());
251                // Discard any orphaned partial turn for this loop.
252                self.partial_turns.remove(loop_id.as_str());
253                if let Some(mut open) = self.open_loops.remove(loop_id) {
254                    open.record.ended_at = Some(*timestamp);
255                    open.record.status = if rejection.is_some() {
256                        LoopStatus::Rejected
257                    } else {
258                        LoopStatus::Completed
259                    };
260                    open.record.rejection = rejection.clone();
261                    open.record.messages = messages.clone();
262                    open.record.usage = usage.clone();
263
264                    // Extract config snapshot from first assistant message.
265                    if open.record.config.is_none() {
266                        open.record.config = extract_config_snapshot(messages, loop_id);
267                    }
268
269                    let session_id = open.record.session_id.clone();
270                    let parent_loop_id = open.record.parent_loop_id.clone();
271
272                    // Link parent → child within same session.
273                    if let Some(ref plid) = parent_loop_id {
274                        // Check if parent is in the same session.
275                        let parent_in_session = self
276                            .open_sessions
277                            .get(&session_id)
278                            .map(|s| s.loops.iter().any(|l| &l.loop_id == plid))
279                            .unwrap_or(false);
280                        let parent_in_open = self.open_loops.contains_key(plid.as_str());
281
282                        if parent_in_session {
283                            if let Some(s) = self.open_sessions.get_mut(&session_id) {
284                                if let Some(p) = s.loops.iter_mut().find(|l| &l.loop_id == plid) {
285                                    if !p.children_loop_ids.contains(loop_id) {
286                                        p.children_loop_ids.push(loop_id.clone());
287                                    }
288                                }
289                            }
290                        } else if parent_in_open {
291                            if let Some(p) = self.open_loops.get_mut(plid.as_str()) {
292                                // Only link same-session children. Cross-session sub-agent
293                                // children are tracked via child_loop_refs / SpawnRef.
294                                if p.record.session_id == session_id
295                                    && !p.record.children_loop_ids.contains(loop_id)
296                                {
297                                    p.record.children_loop_ids.push(loop_id.clone());
298                                }
299                            }
300                        }
301                    }
302
303                    // Move into session.
304                    if let Some(session) = self.open_sessions.get_mut(&session_id) {
305                        session.loops.push(open.record);
306                    }
307                }
308            }
309
310            // ── TurnStart — begin accumulating a partial turn ────────────
311            AgentEvent::TurnStart {
312                loop_id,
313                turn_index,
314                timestamp,
315                triggered_by,
316            } => {
317                self.partial_turns.insert(
318                    loop_id.clone(),
319                    PartialTurn {
320                        turn_id: TurnId {
321                            loop_id: loop_id.clone(),
322                            turn_index: *turn_index,
323                        },
324                        triggered_by: triggered_by.clone(),
325                        started_at: *timestamp,
326                        input_messages: Vec::new(),
327                    },
328                );
329                self.append_event(loop_id, event.clone());
330            }
331
332            // ── MessageEnd — capture non-assistant messages as turn input ─
333            AgentEvent::MessageEnd {
334                loop_id, message, ..
335            } => {
336                if message.role() != "assistant" {
337                    if let Some(partial) = self.partial_turns.get_mut(loop_id.as_str()) {
338                        partial.input_messages.push(message.clone());
339                    }
340                }
341                self.append_event(loop_id, event.clone());
342            }
343
344            // ── TurnEnd — finalize turn + extract config snapshot ─────────
345            AgentEvent::TurnEnd {
346                loop_id,
347                message,
348                usage,
349                timestamp,
350                tool_results,
351            } => {
352                self.append_event(loop_id, event.clone());
353
354                // Finalize the partial turn into a materialized Turn.
355                if let Some(partial) = self.partial_turns.remove(loop_id.as_str()) {
356                    let tid = Some(partial.turn_id.clone());
357                    let turn = Turn {
358                        turn_id: partial.turn_id,
359                        triggered_by: partial.triggered_by,
360                        usage: usage.clone(),
361                        input_messages: partial.input_messages,
362                        output_message: message.clone(),
363                        tool_results: tool_results
364                            .iter()
365                            .map(|m| AgentMessage::from(m.clone()).with_turn_id(tid.clone()))
366                            .collect(),
367                        started_at: partial.started_at,
368                        ended_at: *timestamp,
369                    };
370                    if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
371                        open.record.turns.push(turn);
372                    }
373                }
374
375                // Extract config snapshot from assistant message.
376                if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
377                    if open.record.config.is_none() {
378                        open.record.config =
379                            extract_config_snapshot(std::slice::from_ref(message), loop_id);
380                    }
381                }
382            }
383
384            // ── ToolExecutionEnd — record child loop ref ──────────────────
385            AgentEvent::ToolExecutionEnd {
386                loop_id,
387                tool_call_id,
388                tool_name,
389                result,
390                // child_loop_id is also a top-level field on ToolExecutionEnd (mirrors
391                // result.child_loop_id for ergonomic pattern matching). We read from
392                // result.child_loop_id here so the ChildLoopRef is populated from the
393                // same authoritative source as ToolResult.
394                ..
395            } => {
396                self.append_event(loop_id, event.clone());
397                if let Some(child_lid) = &result.child_loop_id {
398                    if let Some(open) = self.open_loops.get_mut(loop_id.as_str()) {
399                        let child_session_id = session_id_from_loop_id(child_lid);
400                        open.record.child_loop_refs.push(ChildLoopRef {
401                            tool_call_id: tool_call_id.clone(),
402                            tool_name: tool_name.clone(),
403                            child_loop_id: child_lid.clone(),
404                            child_session_id: child_session_id.clone(),
405                        });
406
407                        // Enrich child session's parent_spawn_ref with the tool details we now know.
408                        // The child may still be in open_sessions (common case) or already in
409                        // completed (if flush() was called between child AgentEnd and this event).
410                        // We check both to avoid a silent enrichment skip.
411                        let parent_session_id = open.record.session_id.clone();
412                        let parent_lid = loop_id.clone();
413                        let tc_id = tool_call_id.clone();
414                        let tn = tool_name.clone();
415                        let csl = child_session_id.clone();
416                        let enrich = move |session: &mut Session| {
417                            if let Some(ref mut sr) = session.parent_spawn_ref {
418                                if sr.tool_call_id.is_empty() {
419                                    sr.parent_session_id = parent_session_id;
420                                    sr.parent_loop_id = parent_lid;
421                                    sr.tool_call_id = tc_id;
422                                    sr.tool_name = tn;
423                                }
424                            }
425                        };
426                        if let Some(child_sess) = self.open_sessions.get_mut(&csl) {
427                            enrich(child_sess);
428                        } else if let Some(child_sess) =
429                            self.completed.iter_mut().find(|s| s.session_id == csl)
430                        {
431                            enrich(child_sess);
432                        }
433                    }
434                }
435            }
436
437            // ── ParallelLoopEnd ───────────────────────────────────────────
438            AgentEvent::ParallelLoopEnd {
439                selected_loop_id,
440                selected_config_index,
441                evaluation_usage,
442                ..
443            } => {
444                // Recover all_loop_ids from the partial_groups registered at ParallelLoopStart.
445                let all_loop_ids = self
446                    .partial_groups
447                    .get(selected_loop_id.as_str())
448                    .map(|pg| pg.all_loop_ids.clone())
449                    .unwrap_or_else(|| vec![selected_loop_id.clone()]);
450                let group = ParallelGroupRecord {
451                    all_loop_ids: all_loop_ids.clone(),
452                    selected_loop_id: selected_loop_id.clone(),
453                    selected_config_index: *selected_config_index,
454                    evaluation_usage: evaluation_usage.clone(),
455                    is_selected: false, // will be set per-record below
456                };
457
458                // Retroactively set ParallelGroupRecord on all branch LoopRecords.
459                for lid in &all_loop_ids {
460                    let is_selected = lid == selected_loop_id;
461                    let pg = ParallelGroupRecord {
462                        is_selected,
463                        ..group.clone()
464                    };
465
466                    // Check open_loops first (loop may not be closed yet).
467                    if let Some(open) = self.open_loops.get_mut(lid.as_str()) {
468                        open.record.parallel_group = Some(pg.clone());
469                    }
470
471                    // Also retroactively update already-closed loops in sessions.
472                    for session in self.open_sessions.values_mut() {
473                        if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
474                            lr.parallel_group = Some(pg.clone());
475                        }
476                    }
477                    for session in self.completed.iter_mut() {
478                        if let Some(lr) = session.loops.iter_mut().find(|l| &l.loop_id == lid) {
479                            lr.parallel_group = Some(pg.clone());
480                        }
481                    }
482                }
483
484                // Clean up partial group entries.
485                for lid in &all_loop_ids {
486                    self.partial_groups.remove(lid.as_str());
487                }
488            }
489
490            // ── MessageUpdate — optional streaming events ─────────────────
491            AgentEvent::MessageUpdate { loop_id, .. } => {
492                if self.config.include_streaming_events {
493                    self.append_event(loop_id, event.clone());
494                }
495            }
496
497            // ── All other events — append to loop stream ──────────────────
498            other => {
499                if let Some(lid) = loop_id_of(other) {
500                    self.append_event(lid, event.clone());
501                }
502            }
503        }
504    }
505
506    /// Finalize all open [`LoopRecord`]s (status → [`LoopStatus::Aborted`]) and
507    /// move them into their sessions.
508    ///
509    /// Call before saving or on process shutdown.
510    pub fn flush(&mut self) {
511        // Discard orphaned partial turns (TurnStart received but no TurnEnd).
512        self.partial_turns.clear();
513
514        let loop_ids: Vec<String> = self.open_loops.keys().cloned().collect();
515        for lid in loop_ids {
516            if let Some(mut open) = self.open_loops.remove(&lid) {
517                open.record.status = LoopStatus::Aborted;
518                let session_id = open.record.session_id.clone();
519                if let Some(session) = self.open_sessions.get_mut(&session_id) {
520                    session.loops.push(open.record);
521                }
522            }
523        }
524        // Move fully-closed sessions from open_sessions to completed.
525        let session_ids: Vec<String> = self.open_sessions.keys().cloned().collect();
526        for sid in session_ids {
527            // A session is "complete" when all its loops have ended.
528            // Since we just flushed all open loops, every session is complete.
529            if let Some(session) = self.open_sessions.remove(&sid) {
530                // G2: fire after_task callback on session finalization
531                if let Some(ref hook) = self.config.after_task {
532                    hook(&session);
533                }
534                self.completed.push(session);
535            }
536        }
537    }
538
539    /// Promote sessions that have no remaining open loops to the completed list,
540    /// without aborting any running loops.
541    ///
542    /// A session is eligible when every loop belonging to it has already received
543    /// an [`AgentEnd`][crate::AgentEvent::AgentEnd] event (i.e. it has no entry in
544    /// the internal open-loops map). Sessions that still have active loops are left
545    /// in place.
546    ///
547    /// This is intended for **periodic checkpointing** in production: save finished
548    /// sessions to disk while leaving in-flight agent runs untouched. In contrast,
549    /// [`flush`][Self::flush] first aborts all open loops and then promotes
550    /// everything.
551    ///
552    /// Returns the number of sessions that were promoted.
553    pub fn checkpoint(&mut self) -> usize {
554        // Collect session_ids that still have open loops.
555        let sessions_with_open_loops: Vec<String> = self
556            .open_loops
557            .values()
558            .map(|l| l.record.session_id.clone())
559            .collect();
560        // Promote sessions whose id is not in that set.
561        let promotable: Vec<String> = self
562            .open_sessions
563            .keys()
564            .filter(|sid| !sessions_with_open_loops.contains(sid))
565            .cloned()
566            .collect();
567        let count = promotable.len();
568        for sid in promotable {
569            if let Some(session) = self.open_sessions.remove(&sid) {
570                self.completed.push(session);
571            }
572        }
573        count
574    }
575
576    /// Drain all completed sessions out of the recorder (consuming them).
577    ///
578    /// Useful for periodic checkpointing. Call [`flush`][Self::flush] first
579    /// if you want to include in-progress sessions, or [`checkpoint`][Self::checkpoint]
580    /// to drain only fully-finished sessions without aborting active loops.
581    pub fn drain_completed(&mut self) -> Vec<Session> {
582        std::mem::take(&mut self.completed)
583    }
584
585    /// All sessions known to this recorder (completed and in-progress).
586    pub fn sessions(&self) -> impl Iterator<Item = &Session> {
587        self.completed.iter().chain(self.open_sessions.values())
588    }
589
590    /// Look up a session by `session_id`.
591    pub fn get_session(&self, session_id: &str) -> Option<&Session> {
592        self.completed
593            .iter()
594            .find(|s| s.session_id == session_id)
595            .or_else(|| self.open_sessions.get(session_id))
596    }
597
598    /// Look up an in-progress [`LoopRecord`] by `loop_id`.
599    pub fn current_loop(&self, loop_id: &str) -> Option<&LoopRecord> {
600        self.open_loops.get(loop_id).map(|o| &o.record)
601    }
602
603    // ── Private helpers ───────────────────────────────────────────────────
604
605    fn append_event(&mut self, loop_id: &str, event: AgentEvent) {
606        if let Some(open) = self.open_loops.get_mut(loop_id) {
607            let seq = open.next_seq;
608            open.next_seq += 1;
609            open.record.events.push(LoopEvent {
610                sequence: seq,
611                event,
612            });
613        }
614    }
615}