entelix_session/session_graph.rs
1//! `SessionGraph` — append-only durable audit log for one conversation
2//! thread (invariant 1: session is event `SSoT`).
3//!
4//! Distinct from the runtime `EventBus` (`entelix-core::events`) per F1
5//! mitigation: `EventBus` is fire-and-forget broadcast for hooks /
6//! observability, `SessionGraph` is the durable replay source.
7
8use chrono::Utc;
9use entelix_core::ir::{ContentPart, Message, Role};
10use serde::{Deserialize, Serialize};
11
12use crate::event::GraphEvent;
13
14/// Append-only event log for a single conversation thread.
15///
16/// `events` is the only first-class data — every higher-level view
17/// (current branch messages, checkpoint markers, warning summaries) is
18/// derived. Entries before `archival_watermark` may have been moved to
19/// cold storage; consumers should treat indices `< archival_watermark`
20/// as opaque.
21///
22/// `#[non_exhaustive]` so internal bookkeeping fields (e.g.
23/// `schema_version`, archival metadata) can be added without
24/// breaking downstream `SessionLog` impls. Construct via
25/// `SessionGraph::new(thread_id)`.
26#[derive(Clone, Debug, Serialize, Deserialize)]
27#[non_exhaustive]
28pub struct SessionGraph {
29 /// Conversation identifier this log belongs to.
30 pub thread_id: String,
31 /// All events in append order.
32 pub events: Vec<GraphEvent>,
33 /// Index above which events are guaranteed to still live in `events`.
34 /// Below the watermark, entries may have been pruned.
35 archival_watermark: usize,
36}
37
38impl SessionGraph {
39 /// Empty session bound to `thread_id`.
40 pub fn new(thread_id: impl Into<String>) -> Self {
41 Self {
42 thread_id: thread_id.into(),
43 events: Vec::new(),
44 archival_watermark: 0,
45 }
46 }
47
48 /// Append one event to the log. Returns the index assigned to it.
49 pub fn append(&mut self, event: GraphEvent) -> usize {
50 self.events.push(event);
51 self.events.len().saturating_sub(1)
52 }
53
54 /// Number of events currently in memory (excludes archived ranges).
55 pub const fn len(&self) -> usize {
56 self.events.len()
57 }
58
59 /// True when no events have been appended.
60 pub const fn is_empty(&self) -> bool {
61 self.events.is_empty()
62 }
63
64 /// Borrow the slice of events at indices `>= cursor`.
65 pub fn events_since(&self, cursor: usize) -> &[GraphEvent] {
66 self.events.get(cursor..).unwrap_or(&[])
67 }
68
69 /// Derive a value of arbitrary type `S` by folding every event in
70 /// the log through `reducer`, oldest-first. The closure is called
71 /// once per event with `(&mut state, &GraphEvent)` so it can both
72 /// inspect and mutate the accumulator.
73 ///
74 /// This is the closure form of "the audit log is the source of
75 /// truth": every domain that needs a derived view (message
76 /// transcript, error count per turn, custom analytics) walks the
77 /// log directly through this method rather than maintaining a
78 /// parallel projection that could diverge.
79 pub fn replay_into<S, R>(&self, initial: S, mut reducer: R) -> S
80 where
81 R: FnMut(&mut S, &GraphEvent),
82 {
83 let mut state = initial;
84 for event in &self.events {
85 reducer(&mut state, event);
86 }
87 state
88 }
89
90 /// Render the conversation as a `Vec<Message>` suitable for
91 /// `ChatModel::complete`. Only `UserMessage` and `AssistantMessage`
92 /// events contribute; tool events live inside the assistant's content
93 /// blocks, which the codec handles separately.
94 pub fn current_branch_messages(&self) -> Vec<Message> {
95 let mut out = Vec::new();
96 for event in &self.events {
97 match event {
98 GraphEvent::UserMessage { content, .. } => {
99 out.push(Message::new(Role::User, content.clone()));
100 }
101 GraphEvent::AssistantMessage { content, .. } => {
102 out.push(Message::new(Role::Assistant, content.clone()));
103 }
104 GraphEvent::ToolResult {
105 tool_use_id,
106 name,
107 content,
108 is_error,
109 ..
110 } => out.push(Message::new(
111 Role::Tool,
112 vec![ContentPart::ToolResult {
113 tool_use_id: tool_use_id.clone(),
114 name: name.clone(),
115 content: content.clone(),
116 is_error: *is_error,
117 cache_control: None,
118 provider_echoes: Vec::new(),
119 }],
120 )),
121 _ => {}
122 }
123 }
124 out
125 }
126
127 /// Fork: produce a fresh session whose events are a copy of this
128 /// session's events at indices `0..=branch_at`, bound to `new_thread_id`.
129 /// A `BranchCreated` event is appended **to the parent** to record the
130 /// fork point.
131 ///
132 /// Returns `None` if `branch_at` is out of range.
133 pub fn fork(&mut self, branch_at: usize, new_thread_id: impl Into<String>) -> Option<Self> {
134 let cloned_events = match self.events.get(..=branch_at) {
135 Some(slice) => slice.to_vec(),
136 None => return None,
137 };
138 let new_thread_id = new_thread_id.into();
139 self.append(GraphEvent::BranchCreated {
140 branch_id: new_thread_id.clone(),
141 parent_event: branch_at,
142 timestamp: Utc::now(),
143 });
144 Some(Self {
145 thread_id: new_thread_id,
146 events: cloned_events,
147 archival_watermark: 0,
148 })
149 }
150
151 /// Mark events at indices `< watermark` as archived. Does not actually
152 /// drop them from `events` — a persistence backend may purge them
153 /// during cold-storage migration. Watermarks are monotonic and
154 /// silently ignore non-advancing values: a `watermark` ≤ the
155 /// current archival point or beyond `events.len()` is a no-op
156 /// without error, mirroring [`crate::SessionLog::archive_before`].
157 pub const fn archive_before(&mut self, watermark: usize) {
158 if watermark > self.archival_watermark && watermark <= self.events.len() {
159 self.archival_watermark = watermark;
160 }
161 }
162
163 /// Effective archival cut-off.
164 pub const fn archival_watermark(&self) -> usize {
165 self.archival_watermark
166 }
167
168 /// Convenience iterator over `BranchCreated` events for tooling that
169 /// renders branch trees.
170 pub fn branch_events(&self) -> impl Iterator<Item = (&str, usize)> {
171 self.events.iter().filter_map(|e| match e {
172 GraphEvent::BranchCreated {
173 branch_id,
174 parent_event,
175 ..
176 } => Some((branch_id.as_str(), *parent_event)),
177 _ => None,
178 })
179 }
180}