Skip to main content

entelix_session/
session_graph.rs

1//! `SessionGraph` — append-only durable audit log for one conversation
2//! thread (invariant 1: session is event `SSoT`).
3//!
4//! Distinct from the runtime `EventBus` (`entelix-core::events`) per F1
5//! mitigation: `EventBus` is fire-and-forget broadcast for hooks /
6//! observability, `SessionGraph` is the durable replay source.
7
8use chrono::Utc;
9use entelix_core::ir::{ContentPart, Message, Role};
10use serde::{Deserialize, Serialize};
11
12use crate::event::GraphEvent;
13
14/// Append-only event log for a single conversation thread.
15///
16/// `events` is the only first-class data — every higher-level view
17/// (current branch messages, checkpoint markers, warning summaries) is
18/// derived. Entries before `archival_watermark` may have been moved to
19/// cold storage; consumers should treat indices `< archival_watermark`
20/// as opaque.
21///
22/// `#[non_exhaustive]` so internal bookkeeping fields (e.g.
23/// `schema_version`, archival metadata) can be added without
24/// breaking downstream `SessionLog` impls. Construct via
25/// `SessionGraph::new(thread_id)`.
26#[derive(Clone, Debug, Serialize, Deserialize)]
27#[non_exhaustive]
28pub struct SessionGraph {
29    /// Conversation identifier this log belongs to.
30    pub thread_id: String,
31    /// All events in append order.
32    pub events: Vec<GraphEvent>,
33    /// Index above which events are guaranteed to still live in `events`.
34    /// Below the watermark, entries may have been pruned.
35    archival_watermark: usize,
36}
37
38impl SessionGraph {
39    /// Empty session bound to `thread_id`.
40    pub fn new(thread_id: impl Into<String>) -> Self {
41        Self {
42            thread_id: thread_id.into(),
43            events: Vec::new(),
44            archival_watermark: 0,
45        }
46    }
47
48    /// Append one event to the log. Returns the index assigned to it.
49    pub fn append(&mut self, event: GraphEvent) -> usize {
50        self.events.push(event);
51        self.events.len().saturating_sub(1)
52    }
53
54    /// Number of events currently in memory (excludes archived ranges).
55    pub const fn len(&self) -> usize {
56        self.events.len()
57    }
58
59    /// True when no events have been appended.
60    pub const fn is_empty(&self) -> bool {
61        self.events.is_empty()
62    }
63
64    /// Borrow the slice of events at indices `>= cursor`.
65    pub fn events_since(&self, cursor: usize) -> &[GraphEvent] {
66        self.events.get(cursor..).unwrap_or(&[])
67    }
68
69    /// Derive a value of arbitrary type `S` by folding every event in
70    /// the log through `reducer`, oldest-first. The closure is called
71    /// once per event with `(&mut state, &GraphEvent)` so it can both
72    /// inspect and mutate the accumulator.
73    ///
74    /// This is the closure form of "the audit log is the source of
75    /// truth": every domain that needs a derived view (message
76    /// transcript, error count per turn, custom analytics) walks the
77    /// log directly through this method rather than maintaining a
78    /// parallel projection that could diverge.
79    pub fn replay_into<S, R>(&self, initial: S, mut reducer: R) -> S
80    where
81        R: FnMut(&mut S, &GraphEvent),
82    {
83        let mut state = initial;
84        for event in &self.events {
85            reducer(&mut state, event);
86        }
87        state
88    }
89
90    /// Render the conversation as a `Vec<Message>` suitable for
91    /// `ChatModel::complete`. Only `UserMessage` and `AssistantMessage`
92    /// events contribute; tool events live inside the assistant's content
93    /// blocks, which the codec handles separately.
94    pub fn current_branch_messages(&self) -> Vec<Message> {
95        let mut out = Vec::new();
96        for event in &self.events {
97            match event {
98                GraphEvent::UserMessage { content, .. } => {
99                    out.push(Message::new(Role::User, content.clone()));
100                }
101                GraphEvent::AssistantMessage { content, .. } => {
102                    out.push(Message::new(Role::Assistant, content.clone()));
103                }
104                GraphEvent::ToolResult {
105                    tool_use_id,
106                    name,
107                    content,
108                    is_error,
109                    ..
110                } => out.push(Message::new(
111                    Role::Tool,
112                    vec![ContentPart::ToolResult {
113                        tool_use_id: tool_use_id.clone(),
114                        name: name.clone(),
115                        content: content.clone(),
116                        is_error: *is_error,
117                        cache_control: None,
118                        provider_echoes: Vec::new(),
119                    }],
120                )),
121                _ => {}
122            }
123        }
124        out
125    }
126
127    /// Fork: produce a fresh session whose events are a copy of this
128    /// session's events at indices `0..=branch_at`, bound to `new_thread_id`.
129    /// A `BranchCreated` event is appended **to the parent** to record the
130    /// fork point.
131    ///
132    /// Returns `None` if `branch_at` is out of range.
133    pub fn fork(&mut self, branch_at: usize, new_thread_id: impl Into<String>) -> Option<Self> {
134        let cloned_events = match self.events.get(..=branch_at) {
135            Some(slice) => slice.to_vec(),
136            None => return None,
137        };
138        let new_thread_id = new_thread_id.into();
139        self.append(GraphEvent::BranchCreated {
140            branch_id: new_thread_id.clone(),
141            parent_event: branch_at,
142            timestamp: Utc::now(),
143        });
144        Some(Self {
145            thread_id: new_thread_id,
146            events: cloned_events,
147            archival_watermark: 0,
148        })
149    }
150
151    /// Mark events at indices `< watermark` as archived. Does not actually
152    /// drop them from `events` — a persistence backend may purge them
153    /// during cold-storage migration. Watermarks are monotonic and
154    /// silently ignore non-advancing values: a `watermark` ≤ the
155    /// current archival point or beyond `events.len()` is a no-op
156    /// without error, mirroring [`crate::SessionLog::archive_before`].
157    pub const fn archive_before(&mut self, watermark: usize) {
158        if watermark > self.archival_watermark && watermark <= self.events.len() {
159            self.archival_watermark = watermark;
160        }
161    }
162
163    /// Effective archival cut-off.
164    pub const fn archival_watermark(&self) -> usize {
165        self.archival_watermark
166    }
167
168    /// Convenience iterator over `BranchCreated` events for tooling that
169    /// renders branch trees.
170    pub fn branch_events(&self) -> impl Iterator<Item = (&str, usize)> {
171        self.events.iter().filter_map(|e| match e {
172            GraphEvent::BranchCreated {
173                branch_id,
174                parent_event,
175                ..
176            } => Some((branch_id.as_str(), *parent_event)),
177            _ => None,
178        })
179    }
180}