ai_memory/transcripts/replay.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 L2-4 — transcript replay extended to the reflection union.
5//!
6//! ## What this module owns
7//!
8//! `memory_replay` (v0.7.0 I4) originally returned the transcripts linked
9//! to a *single* memory id via the I2 join table. L2-4 (issue #669)
10//! generalises the read: when the memory is a `Reflection`
11//! (`memory_kind = 'reflection'`, L1-1), the replay must reconstruct the
12//! **union** of every transcript reachable from the reflection by
13//! walking `reflects_on` edges backward to the source observations.
14//!
15//! The walk is BFS over the `reflects_on` adjacency (source_id ->
16//! target_id). Each visited memory contributes its own
17//! `transcripts_for_memory` rows; the final entry list is deduplicated
18//! by transcript id (first-seen wins so the closest ancestor's span
19//! metadata is preferred when the same transcript is reachable through
20//! more than one path) and sorted by `created_at` ascending. Ties on
21//! `created_at` fall back to `transcript_id` so two transcripts minted
22//! in the same RFC3339 millisecond still produce a deterministic
23//! ordering — same tie-break the I4 handler used.
24//!
25//! ## Depth contract
26//!
27//! Callers may cap the BFS at `depth` hops via the `depth` parameter
28//! threaded through `memory_replay(depth=N)`. `None` (the default)
29//! means "walk the full chain" — every transitively-reachable ancestor.
30//! `Some(0)` means "self only" (skip the union; same shape as the
31//! pre-L2-4 I4 read). `Some(N>=1)` means "self plus N hops of
32//! ancestors". This matches the depth-counting convention used by
33//! `reflection_depth` on the memory row.
34//!
35//! ## Non-Reflection passthrough
36//!
//! When the input memory is `MemoryKind::Observation`, the walk is
//! skipped entirely and the result is exactly what
//! `transcripts_for_memory` returns for the single memory id. This is
//! the explicit "non-reflection `memory_replay` MUST be unchanged"
//! acceptance criterion from #669. When the row cannot be loaded at
//! all (substrate may have GC'd it between the permission check and
//! now), the replay returns an empty list instead.
43//!
44//! ## Cycle safety
45//!
46//! L1-2 (#659) already refuses to add a `reflects_on` edge that
47//! would close a cycle. The walk here still maintains a `visited`
48//! set on `memory_id` so a stale cycle that slipped past the
49//! anti-cycle guard (e.g. via direct SQL writes from a legacy
50//! migration) cannot induce an infinite loop. Cycle detection is
51//! a hard safety net, not a correctness shortcut.
52
53use anyhow::{Context, Result};
54use rusqlite::{Connection, params};
55use std::collections::{HashSet, VecDeque};
56
57use crate::transcripts::storage::{
58 Transcript, TranscriptLink, fetch_metadata, transcripts_for_memory,
59};
60
61/// One row of the L2-4 union replay stream. Carries both the transcript
62/// metadata (compressed/original size, namespace, created_at) and the
63/// I2 link span — plus the `memory_id` the link was discovered through,
64/// which the I4 handler returns to operators so they can see which
65/// ancestor in the chain contributed each transcript.
66#[derive(Debug, Clone)]
67pub struct ReplayEntry {
68 /// Memory id the transcript link is anchored to. For a non-
69 /// reflection replay this always equals the input `memory_id`; for
70 /// a reflection union it can be any ancestor reachable within
71 /// `depth` hops.
72 pub memory_id: String,
73 /// I2 link row, including the span offsets (may be `None` for
74 /// whole-transcript provenance).
75 pub link: TranscriptLink,
76 /// Transcript metadata. The blob is NOT loaded — the I4 handler
77 /// decompresses on demand under the verbose / truncation rule.
78 pub meta: Transcript,
79}
80
81/// Replay a memory's transcripts. When the memory is a reflection,
82/// gather the union of every transcript reachable by walking
83/// `reflects_on` edges to `depth` hops.
84///
85/// `depth = None` walks the full chain; `Some(N)` caps the BFS at N
86/// hops from the input memory (a hop is one `reflects_on` edge).
87///
88/// # Errors
89///
90/// Returns an error when the underlying SQLite reads fail (disk I/O,
91/// schema drift, lock contention). The walk itself is resilient to
92/// missing rows — an unreachable id in the chain (pruned by GC, etc.)
93/// is silently dropped, the same shape the I4 handler already used for
94/// dangling transcripts.
95pub fn replay_transcript_union(
96 conn: &Connection,
97 memory_id: &str,
98 depth: Option<u32>,
99) -> Result<Vec<ReplayEntry>> {
100 // Resolve the root memory's kind. A failed lookup (row missing) is
101 // not an error here — the I4 handler decides whether to surface
102 // "no transcripts" vs "memory not found"; the substrate read just
103 // returns an empty union in that case. `crate::db::get` returns
104 // `Ok(None)` cleanly when the id does not exist.
105 let kind = match crate::db::get(conn, memory_id)? {
106 Some(m) => m.memory_kind,
107 None => return Ok(Vec::new()),
108 };
109
110 let mut visited: HashSet<String> = HashSet::new();
111 let mut frontier: VecDeque<(String, u32)> = VecDeque::new();
112 let mut union_memory_ids: Vec<String> = Vec::new();
113
114 visited.insert(memory_id.to_string());
115 union_memory_ids.push(memory_id.to_string());
116 frontier.push_back((memory_id.to_string(), 0));
117
118 // Only Reflection-kind inputs trigger the BFS over reflects_on.
119 // Observations short-circuit to the single-memory read (the
120 // pre-L2-4 I4 behaviour) — the acceptance contract on #669 pins
121 // "existing memory_replay for non-reflection memories MUST be
122 // unchanged". A reflection with `depth=Some(0)` also takes this
123 // path (self-only union).
124 let walk =
125 matches!(kind, crate::models::MemoryKind::Reflection) && depth.is_none_or(|d| d >= 1);
126
127 if walk {
128 while let Some((current, hop)) = frontier.pop_front() {
129 // Stop expanding once we hit the configured depth cap.
130 // `None` (full chain) folds into `is_some_and` returning
131 // false so we always expand.
132 if depth.is_some_and(|cap| hop >= cap) {
133 continue;
134 }
135 for next in fetch_reflects_on_targets(conn, ¤t)? {
136 if visited.insert(next.clone()) {
137 union_memory_ids.push(next.clone());
138 frontier.push_back((next, hop + 1));
139 }
140 }
141 }
142 }
143
144 // Gather every transcript link anchored to any visited memory.
145 // Deduplicate by transcript_id — the SAME transcript can be linked
146 // to multiple memories (legitimate fan-in: a single conversation
147 // produced both an observation and the reflection that summarises
148 // it). First-seen wins so the closest ancestor's span metadata is
149 // preferred; BFS order means "closest first" matches the
150 // chronological intuition of the walk.
151 let mut entries: Vec<ReplayEntry> = Vec::new();
152 let mut seen_transcripts: HashSet<String> = HashSet::new();
153 for mid in &union_memory_ids {
154 let links = transcripts_for_memory(conn, mid)
155 .with_context(|| format!("transcripts_for_memory({mid}) failed"))?;
156 for link in links {
157 if !seen_transcripts.insert(link.transcript_id.clone()) {
158 continue;
159 }
160 match fetch_metadata(conn, &link.transcript_id)? {
161 Some(meta) => entries.push(ReplayEntry {
162 memory_id: mid.clone(),
163 link,
164 meta,
165 }),
166 None => {
167 // Transcript pruned out from under us — drop
168 // silently, same shape as the I4 handler.
169 tracing::warn!(
170 target: "transcripts.replay",
171 "dangling transcript_id {} for memory {mid}",
172 link.transcript_id
173 );
174 }
175 }
176 }
177 }
178
179 // Chronological sort, with transcript_id as the deterministic
180 // tie-breaker (matches the I4 handler's pre-L2-4 ordering).
181 entries.sort_by(|a, b| {
182 a.meta
183 .created_at
184 .cmp(&b.meta.created_at)
185 .then_with(|| a.meta.id.cmp(&b.meta.id))
186 });
187
188 Ok(entries)
189}
190
191/// Read every `target_id` reachable from `source_id` via a
192/// `reflects_on` edge. Returns ids in `target_id` order so the BFS
193/// expansion is deterministic regardless of insertion order at the
194/// SQL layer.
195fn fetch_reflects_on_targets(conn: &Connection, source_id: &str) -> Result<Vec<String>> {
196 let mut stmt = conn
197 .prepare(
198 "SELECT target_id FROM memory_links
199 WHERE source_id = ?1 AND relation = 'reflects_on'
200 ORDER BY target_id",
201 )
202 .context("PREPARE reflects_on edge scan failed")?;
203 let rows = stmt
204 .query_map(params![source_id], |r| r.get::<_, String>(0))
205 .context("QUERY reflects_on edge scan failed")?;
206 rows.collect::<rusqlite::Result<Vec<_>>>()
207 .context("decode reflects_on edge rows")
208}
209
210// -----------------------------------------------------------------
211// Unit tests — exercise the BFS, depth cap, cycle safety, and the
212// non-reflection passthrough on a `:memory:` SQLite with the full
213// production schema applied via `crate::db::open`.
214// -----------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transcripts::storage::{link_transcript, store};
    use chrono::Utc;
    use rusqlite::Connection;

    /// Open a fresh in-memory database through `crate::db::open`, so
    /// each test starts from an empty store.
    fn fresh_db() -> Connection {
        crate::db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
    }

    /// Insert a memory row with the given id, namespace, and
    /// `memory_kind`. `created_at` / `updated_at` are both "now", the
    /// tier is `'short'`, the title is `title-{id}` and the content is
    /// `'body'`. The fixture keeps the column set minimal — the replay
    /// walker only reads `memory_kind` from the row, plus the
    /// `memory_links` it edges out to.
    fn insert_memory(conn: &Connection, id: &str, namespace: &str, kind: &str) {
        let now = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO memories (
                id, tier, namespace, title, content, created_at, updated_at, memory_kind
             ) VALUES (?1, 'short', ?2, ?3, 'body', ?4, ?4, ?5)",
            rusqlite::params![id, namespace, format!("title-{id}"), now, kind],
        )
        .expect("insert test memory");
    }

    /// Write a `reflects_on` edge from `source` to `target` straight
    /// into `memory_links`, stamping `created_at` and `valid_from`
    /// with "now". Going through SQL directly lets tests build shapes
    /// (such as cycles) that the API layer would refuse.
    fn link_reflects_on(conn: &Connection, source: &str, target: &str) {
        let now = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from)
             VALUES (?1, ?2, 'reflects_on', ?3, ?3)",
            rusqlite::params![source, target, now],
        )
        .expect("insert reflects_on link");
    }

    /// Overwrite a transcript's `created_at` so the chronological sort
    /// in the union has a known timestamp to order on.
    fn backdate_transcript(conn: &Connection, id: &str, ts: &str) {
        conn.execute(
            "UPDATE memory_transcripts SET created_at = ?1 WHERE id = ?2",
            rusqlite::params![ts, id],
        )
        .expect("backdate transcript created_at");
    }

    /// Non-reflection passthrough: an Observation memory with its own
    /// transcripts returns exactly those transcripts, regardless of the
    /// depth parameter. This is the contract pinned by #669:
    /// "Existing memory_replay for non-reflection memories MUST be
    /// unchanged."
    #[test]
    fn observation_returns_only_its_own_transcripts() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-1", "team/eng", "observation");
        let t1 = store(&conn, "team/eng", "body-1", None).unwrap();
        link_transcript(&conn, "obs-1", &t1.id, None, None).unwrap();

        // A sibling memory with its own transcript — must NOT leak
        // into the obs-1 replay even when full-chain depth is set.
        insert_memory(&conn, "obs-2", "team/eng", "observation");
        let t2 = store(&conn, "team/eng", "body-2", None).unwrap();
        link_transcript(&conn, "obs-2", &t2.id, None, None).unwrap();

        let entries = replay_transcript_union(&conn, "obs-1", None).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].meta.id, t1.id);
        assert_eq!(entries[0].memory_id, "obs-1");

        // depth=Some(N) on an observation must also short-circuit to
        // the single-memory result — the walk only fires for
        // kind=Reflection. Check the content, not just the count.
        let entries = replay_transcript_union(&conn, "obs-1", Some(5)).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].meta.id, t1.id);
        assert_eq!(entries[0].memory_id, "obs-1");
    }

    /// Reflection-union happy path: a reflection with 3 sources, each
    /// with one transcript, plus the reflection's own transcript →
    /// returns the 4-transcript union. Matches the #669 acceptance
    /// fixture verbatim.
    #[test]
    fn reflection_returns_union_of_self_plus_three_sources() {
        let conn = fresh_db();

        // Three observation sources, each with one transcript.
        for (obs_id, ts) in [
            ("obs-a", "2026-01-01T00:00:00Z"),
            ("obs-b", "2026-01-02T00:00:00Z"),
            ("obs-c", "2026-01-03T00:00:00Z"),
        ] {
            insert_memory(&conn, obs_id, "team/eng", "observation");
            let t = store(&conn, "team/eng", &format!("body-{obs_id}"), None).unwrap();
            backdate_transcript(&conn, &t.id, ts);
            link_transcript(&conn, obs_id, &t.id, None, None).unwrap();
        }

        // Reflection memory with its own transcript and reflects_on
        // edges to each source.
        insert_memory(&conn, "ref-1", "team/eng", "reflection");
        let t_ref = store(&conn, "team/eng", "reflection-body", None).unwrap();
        backdate_transcript(&conn, &t_ref.id, "2026-01-04T00:00:00Z");
        link_transcript(&conn, "ref-1", &t_ref.id, None, None).unwrap();
        for src in ["obs-a", "obs-b", "obs-c"] {
            link_reflects_on(&conn, "ref-1", src);
        }

        let entries = replay_transcript_union(&conn, "ref-1", None).unwrap();
        assert_eq!(entries.len(), 4, "self + 3 source transcripts");

        // Chronological order pinned by the backdated created_at column.
        let ids: Vec<&str> = entries.iter().map(|e| e.meta.id.as_str()).collect();
        let timestamps: Vec<&str> = entries.iter().map(|e| e.meta.created_at.as_str()).collect();
        assert_eq!(
            timestamps,
            vec![
                "2026-01-01T00:00:00Z",
                "2026-01-02T00:00:00Z",
                "2026-01-03T00:00:00Z",
                "2026-01-04T00:00:00Z",
            ],
            "ascending created_at: {ids:?}"
        );

        // Every memory anchors exactly one entry. Because the entries
        // are in chronological order, the anchors line up with the
        // backdated timestamps: the three sources first, then the
        // reflection itself.
        let anchor_ids: Vec<&str> = entries.iter().map(|e| e.memory_id.as_str()).collect();
        assert_eq!(anchor_ids, vec!["obs-a", "obs-b", "obs-c", "ref-1"]);
    }

    /// Depth-2 reflection chain: `ref-top -> ref-mid -> obs-leaf`.
    /// `depth = None` (full) returns all three transcripts; `depth =
    /// Some(1)` stops at `ref-mid` (2 transcripts: self + mid);
    /// `depth = Some(0)` returns only the top reflection's own
    /// transcripts. Pins the depth-cap contract from #669.
    #[test]
    fn depth_cap_bounds_chain_walk() {
        let conn = fresh_db();

        insert_memory(&conn, "obs-leaf", "team/eng", "observation");
        let t_leaf = store(&conn, "team/eng", "leaf", None).unwrap();
        backdate_transcript(&conn, &t_leaf.id, "2026-01-01T00:00:00Z");
        link_transcript(&conn, "obs-leaf", &t_leaf.id, None, None).unwrap();

        insert_memory(&conn, "ref-mid", "team/eng", "reflection");
        let t_mid = store(&conn, "team/eng", "mid", None).unwrap();
        backdate_transcript(&conn, &t_mid.id, "2026-01-02T00:00:00Z");
        link_transcript(&conn, "ref-mid", &t_mid.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-mid", "obs-leaf");

        insert_memory(&conn, "ref-top", "team/eng", "reflection");
        let t_top = store(&conn, "team/eng", "top", None).unwrap();
        backdate_transcript(&conn, &t_top.id, "2026-01-03T00:00:00Z");
        link_transcript(&conn, "ref-top", &t_top.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-top", "ref-mid");

        // depth=None: full chain — 3 transcripts.
        let entries = replay_transcript_union(&conn, "ref-top", None).unwrap();
        assert_eq!(entries.len(), 3, "full chain returns all 3 transcripts");

        // depth=Some(1): self + one hop — 2 transcripts (ref-top, ref-mid).
        let entries = replay_transcript_union(&conn, "ref-top", Some(1)).unwrap();
        assert_eq!(entries.len(), 2);
        let ids: Vec<&str> = entries.iter().map(|e| e.meta.id.as_str()).collect();
        assert!(ids.contains(&t_top.id.as_str()));
        assert!(ids.contains(&t_mid.id.as_str()));
        assert!(!ids.contains(&t_leaf.id.as_str()));

        // depth=Some(0): self only — 1 transcript.
        let entries = replay_transcript_union(&conn, "ref-top", Some(0)).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].meta.id, t_top.id);
    }

    /// Missing root memory: substrate returns an empty union rather
    /// than erroring. The MCP handler layers its own "memory not
    /// found" semantics on top; the substrate read is forgiving.
    #[test]
    fn missing_root_returns_empty_union() {
        let conn = fresh_db();
        let entries = replay_transcript_union(&conn, "does-not-exist", None).unwrap();
        assert!(entries.is_empty());
    }

    /// Cycle safety: a hand-written cycle (`a -> b -> a`) does NOT
    /// loop indefinitely. L1-2 (#659) refuses to add such an edge at
    /// the API layer, but the walker keeps its own visited set as a
    /// defense-in-depth. Directly inserts the cycle via the
    /// `memory_links` table so the test does not depend on bypassing
    /// L1-2's guard.
    #[test]
    fn cycle_in_reflects_on_does_not_loop_forever() {
        let conn = fresh_db();

        insert_memory(&conn, "ref-a", "team/eng", "reflection");
        let t_a = store(&conn, "team/eng", "a", None).unwrap();
        link_transcript(&conn, "ref-a", &t_a.id, None, None).unwrap();

        insert_memory(&conn, "ref-b", "team/eng", "reflection");
        let t_b = store(&conn, "team/eng", "b", None).unwrap();
        link_transcript(&conn, "ref-b", &t_b.id, None, None).unwrap();

        // a → b
        link_reflects_on(&conn, "ref-a", "ref-b");
        // b → a (the cycle L1-2 normally refuses)
        link_reflects_on(&conn, "ref-b", "ref-a");

        let entries = replay_transcript_union(&conn, "ref-a", None).unwrap();
        // Two distinct transcripts — the cycle does NOT inflate the
        // dedup count, and both ends of the cycle are represented.
        assert_eq!(entries.len(), 2);
        let ids: Vec<&str> = entries.iter().map(|e| e.meta.id.as_str()).collect();
        assert!(ids.contains(&t_a.id.as_str()));
        assert!(ids.contains(&t_b.id.as_str()));
    }

    /// Transcript shared by two memories in the union appears exactly
    /// once. Mirrors the legitimate fan-in case: a single conversation
    /// produced both an observation and the reflection that summarises
    /// it.
    #[test]
    fn shared_transcript_deduplicates_to_one_entry() {
        let conn = fresh_db();

        insert_memory(&conn, "obs-shared", "team/eng", "observation");
        let t_shared = store(&conn, "team/eng", "shared", None).unwrap();
        link_transcript(&conn, "obs-shared", &t_shared.id, None, None).unwrap();

        insert_memory(&conn, "ref-1", "team/eng", "reflection");
        // Reflection ALSO links the same transcript — fan-in.
        link_transcript(&conn, "ref-1", &t_shared.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-1", "obs-shared");

        let entries = replay_transcript_union(&conn, "ref-1", None).unwrap();
        assert_eq!(
            entries.len(),
            1,
            "dedup keeps a single entry per transcript_id"
        );
        assert_eq!(entries[0].meta.id, t_shared.id);
    }

    /// Reflection with NO `reflects_on` edges (an orphan reflection,
    /// e.g. one whose ancestry was hard-deleted) still returns its
    /// own transcripts. Defends the "self always counts" invariant.
    #[test]
    fn orphan_reflection_returns_only_self() {
        let conn = fresh_db();
        insert_memory(&conn, "ref-orphan", "team/eng", "reflection");
        let t = store(&conn, "team/eng", "orphan", None).unwrap();
        link_transcript(&conn, "ref-orphan", &t.id, None, None).unwrap();

        let entries = replay_transcript_union(&conn, "ref-orphan", None).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].memory_id, "ref-orphan");
    }

    /// Dangling transcript_id (link row exists, transcript row was
    /// pruned by I3) is dropped from the result rather than surfaced
    /// as an error. Matches the pre-L2-4 I4 handler's tolerance.
    #[test]
    fn dangling_transcript_id_is_silently_dropped() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-1", "team/eng", "observation");
        let t = store(&conn, "team/eng", "body", None).unwrap();
        link_transcript(&conn, "obs-1", &t.id, None, None).unwrap();
        // Hard-delete the transcript row but leave the link.
        conn.execute(
            "DELETE FROM memory_transcripts WHERE id = ?1",
            rusqlite::params![t.id],
        )
        .unwrap();

        let entries = replay_transcript_union(&conn, "obs-1", None).unwrap();
        assert!(entries.is_empty(), "dangling link drops, no error surfaced");
    }
}
497}