use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use std::collections::{HashSet, VecDeque};
use crate::transcripts::storage::{
Transcript, TranscriptLink, fetch_metadata, transcripts_for_memory,
};
/// One transcript in the result of [`replay_transcript_union`].
#[derive(Debug, Clone)]
pub struct ReplayEntry {
    /// Id of the memory whose transcript link produced this entry. When several
    /// memories in the union link the same transcript, this is the first one
    /// encountered in walk order.
    pub memory_id: String,
    /// The memory-to-transcript link row as returned by `transcripts_for_memory`.
    pub link: TranscriptLink,
    /// Metadata for the linked transcript as returned by `fetch_metadata`.
    pub meta: Transcript,
}
/// Collects the transcripts linked to `memory_id` and, when that memory is a
/// reflection, to every memory reachable from it over `reflects_on` edges.
///
/// # Parameters
/// - `conn`: open database connection used for every lookup.
/// - `memory_id`: id of the root memory.
/// - `depth`: maximum number of `reflects_on` hops to follow from the root.
///   `None` walks without a hop limit; `Some(0)` restricts the result to the
///   root's own transcripts.
///
/// # Returns
/// One [`ReplayEntry`] per distinct transcript id, sorted ascending by
/// `meta.created_at` with `meta.id` as the tie-breaker. A transcript linked by
/// several memories in the union is reported once, attributed to the first
/// memory that links it in breadth-first discovery order (root first). If the
/// root memory does not exist the result is empty.
///
/// Only the kind of the root is inspected: edges are followed only when the
/// root is a `MemoryKind::Reflection`, and once walking, every discovered
/// memory is expanded regardless of its own kind.
///
/// Links whose transcript metadata cannot be found are skipped with a
/// `tracing` warning instead of failing the whole call.
///
/// # Errors
/// Propagates failures from `crate::db::get`, the `reflects_on` edge scan,
/// `transcripts_for_memory` (with the memory id added as context), and
/// `fetch_metadata`.
pub fn replay_transcript_union(
    conn: &Connection,
    memory_id: &str,
    depth: Option<u32>,
) -> Result<Vec<ReplayEntry>> {
    // A root that does not exist yields an empty union rather than an error.
    let kind = match crate::db::get(conn, memory_id)? {
        Some(m) => m.memory_kind,
        None => return Ok(Vec::new()),
    };

    // `visited` guards against cycles in the edge graph; `union_memory_ids`
    // records discovery order so transcript attribution is deterministic.
    let mut visited: HashSet<String> = HashSet::new();
    let mut frontier: VecDeque<(String, u32)> = VecDeque::new();
    let mut union_memory_ids: Vec<String> = Vec::new();
    visited.insert(memory_id.to_string());
    union_memory_ids.push(memory_id.to_string());
    frontier.push_back((memory_id.to_string(), 0));

    // Edges are only followed from a reflection root, and only when the caller
    // allows at least one hop.
    let walk =
        matches!(kind, crate::models::MemoryKind::Reflection) && depth.is_none_or(|d| d >= 1);
    if walk {
        while let Some((current, hop)) = frontier.pop_front() {
            // Nodes already sitting at the hop limit are kept in the union but
            // not expanded any further.
            if depth.is_some_and(|cap| hop >= cap) {
                continue;
            }
            for next in fetch_reflects_on_targets(conn, &current)? {
                if visited.insert(next.clone()) {
                    union_memory_ids.push(next.clone());
                    frontier.push_back((next, hop + 1));
                }
            }
        }
    }

    let mut entries: Vec<ReplayEntry> = Vec::new();
    let mut seen_transcripts: HashSet<String> = HashSet::new();
    for mid in &union_memory_ids {
        let links = transcripts_for_memory(conn, mid)
            .with_context(|| format!("transcripts_for_memory({mid}) failed"))?;
        for link in links {
            // First memory to reference a transcript wins; later links to the
            // same transcript id are dropped.
            if !seen_transcripts.insert(link.transcript_id.clone()) {
                continue;
            }
            match fetch_metadata(conn, &link.transcript_id)? {
                Some(meta) => entries.push(ReplayEntry {
                    memory_id: mid.clone(),
                    link,
                    meta,
                }),
                None => {
                    // The link points at a transcript with no metadata row:
                    // log it and carry on.
                    tracing::warn!(
                        target: "transcripts.replay",
                        "dangling transcript_id {} for memory {mid}",
                        link.transcript_id
                    );
                }
            }
        }
    }

    // Chronological order, with the transcript id as a stable tie-breaker.
    entries.sort_by(|a, b| {
        a.meta
            .created_at
            .cmp(&b.meta.created_at)
            .then_with(|| a.meta.id.cmp(&b.meta.id))
    });
    Ok(entries)
}
/// Lists the `target_id` of every `memory_links` row whose `source_id` equals
/// `source_id` and whose relation is `'reflects_on'`, ordered by `target_id`.
///
/// # Errors
/// Returns an error, tagged with the failing stage, if the statement cannot be
/// prepared, the query cannot be started, or a row cannot be decoded as a
/// string.
fn fetch_reflects_on_targets(conn: &Connection, source_id: &str) -> Result<Vec<String>> {
    // The `\n\` continuations keep the SQL text free of indentation whitespace.
    const EDGE_SCAN_SQL: &str = "SELECT target_id FROM memory_links\n\
        WHERE source_id = ?1 AND relation = 'reflects_on'\n\
        ORDER BY target_id";

    let mut edge_scan = conn
        .prepare(EDGE_SCAN_SQL)
        .context("PREPARE reflects_on edge scan failed")?;
    let targets = edge_scan
        .query_map(params![source_id], |row| row.get::<_, String>(0))
        .context("QUERY reflects_on edge scan failed")?
        .collect::<rusqlite::Result<Vec<String>>>()
        .context("decode reflects_on edge rows")?;
    Ok(targets)
}
#[cfg(test)]
mod tests {
    // Tests for `replay_transcript_union`, each run against a fresh in-memory
    // database.
    use super::*;
    use crate::transcripts::storage::{link_transcript, store};
    use chrono::Utc;
    use rusqlite::Connection;

    /// Opens a new in-memory database through `crate::db::open`.
    fn fresh_db() -> Connection {
        crate::db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
    }

    /// Inserts a `memories` row with the given id, namespace and `memory_kind`;
    /// both timestamps are set to the current time.
    fn insert_memory(conn: &Connection, id: &str, namespace: &str, kind: &str) {
        let now = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO memories (\n\
             id, tier, namespace, title, content, created_at, updated_at, memory_kind\n\
             ) VALUES (?1, 'short', ?2, ?3, 'body', ?4, ?4, ?5)",
            rusqlite::params![id, namespace, format!("title-{id}"), now, kind],
        )
        .expect("insert test memory");
    }

    /// Inserts a `reflects_on` edge from `source` to `target` into `memory_links`.
    fn link_reflects_on(conn: &Connection, source: &str, target: &str) {
        let now = Utc::now().to_rfc3339();
        conn.execute(
            "INSERT INTO memory_links (source_id, target_id, relation, created_at, valid_from)\n\
             VALUES (?1, ?2, 'reflects_on', ?3, ?3)",
            rusqlite::params![source, target, now],
        )
        .expect("insert reflects_on link");
    }

    /// Overwrites `created_at` on the `memory_transcripts` row with id `id`, so
    /// tests can control the chronological ordering of transcripts.
    fn backdate_transcript(conn: &Connection, id: &str, ts: &str) {
        conn.execute(
            "UPDATE memory_transcripts SET created_at = ?1 WHERE id = ?2",
            rusqlite::params![ts, id],
        )
        .expect("backdate transcript created_at");
    }

    /// A non-reflection root returns only its own transcripts, with or without
    /// a depth argument.
    #[test]
    fn observation_returns_only_its_own_transcripts() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-1", "team/eng", "observation");
        let t1 = store(&conn, "team/eng", "body-1", None).unwrap();
        link_transcript(&conn, "obs-1", &t1.id, None, None).unwrap();
        insert_memory(&conn, "obs-2", "team/eng", "observation");
        let t2 = store(&conn, "team/eng", "body-2", None).unwrap();
        link_transcript(&conn, "obs-2", &t2.id, None, None).unwrap();

        let entries = replay_transcript_union(&conn, "obs-1", None).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].meta.id, t1.id);
        assert_eq!(entries[0].memory_id, "obs-1");

        let entries = replay_transcript_union(&conn, "obs-1", Some(5)).unwrap();
        assert_eq!(entries.len(), 1);
    }

    /// A reflection over three observations yields all four transcripts in
    /// ascending `created_at` order, each attributed to its own memory.
    #[test]
    fn reflection_returns_union_of_self_plus_three_sources() {
        let conn = fresh_db();
        for (memory_id, created_at) in [
            ("obs-a", "2026-01-01T00:00:00Z"),
            ("obs-b", "2026-01-02T00:00:00Z"),
            ("obs-c", "2026-01-03T00:00:00Z"),
        ] {
            insert_memory(&conn, memory_id, "team/eng", "observation");
            let t = store(&conn, "team/eng", &format!("body-{memory_id}"), None).unwrap();
            backdate_transcript(&conn, &t.id, created_at);
            link_transcript(&conn, memory_id, &t.id, None, None).unwrap();
        }
        insert_memory(&conn, "ref-1", "team/eng", "reflection");
        let t_ref = store(&conn, "team/eng", "reflection-body", None).unwrap();
        backdate_transcript(&conn, &t_ref.id, "2026-01-04T00:00:00Z");
        link_transcript(&conn, "ref-1", &t_ref.id, None, None).unwrap();
        for src in ["obs-a", "obs-b", "obs-c"] {
            link_reflects_on(&conn, "ref-1", src);
        }

        let entries = replay_transcript_union(&conn, "ref-1", None).unwrap();
        assert_eq!(entries.len(), 4, "self + 3 source transcripts");
        let ids: Vec<&str> = entries.iter().map(|e| e.meta.id.as_str()).collect();
        let timestamps: Vec<&str> = entries.iter().map(|e| e.meta.created_at.as_str()).collect();
        assert_eq!(
            timestamps,
            vec![
                "2026-01-01T00:00:00Z",
                "2026-01-02T00:00:00Z",
                "2026-01-03T00:00:00Z",
                "2026-01-04T00:00:00Z",
            ],
            "ascending created_at: {ids:?}"
        );
        let anchor_ids: Vec<&str> = entries.iter().map(|e| e.memory_id.as_str()).collect();
        assert!(anchor_ids.contains(&"obs-a"));
        assert!(anchor_ids.contains(&"obs-b"));
        assert!(anchor_ids.contains(&"obs-c"));
        assert!(anchor_ids.contains(&"ref-1"));
    }

    /// Over the chain ref-top -> ref-mid -> obs-leaf, `depth` limits how many
    /// hops contribute transcripts.
    #[test]
    fn depth_cap_bounds_chain_walk() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-leaf", "team/eng", "observation");
        let t_leaf = store(&conn, "team/eng", "leaf", None).unwrap();
        backdate_transcript(&conn, &t_leaf.id, "2026-01-01T00:00:00Z");
        link_transcript(&conn, "obs-leaf", &t_leaf.id, None, None).unwrap();
        insert_memory(&conn, "ref-mid", "team/eng", "reflection");
        let t_mid = store(&conn, "team/eng", "mid", None).unwrap();
        backdate_transcript(&conn, &t_mid.id, "2026-01-02T00:00:00Z");
        link_transcript(&conn, "ref-mid", &t_mid.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-mid", "obs-leaf");
        insert_memory(&conn, "ref-top", "team/eng", "reflection");
        let t_top = store(&conn, "team/eng", "top", None).unwrap();
        backdate_transcript(&conn, &t_top.id, "2026-01-03T00:00:00Z");
        link_transcript(&conn, "ref-top", &t_top.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-top", "ref-mid");

        // No cap: the whole chain is included.
        let entries = replay_transcript_union(&conn, "ref-top", None).unwrap();
        assert_eq!(entries.len(), 3, "full chain returns all 3 transcripts");

        // One hop: root plus its direct target, but not the leaf.
        let entries = replay_transcript_union(&conn, "ref-top", Some(1)).unwrap();
        assert_eq!(entries.len(), 2);
        let ids: Vec<&str> = entries.iter().map(|e| e.meta.id.as_str()).collect();
        assert!(ids.contains(&t_top.id.as_str()));
        assert!(ids.contains(&t_mid.id.as_str()));
        assert!(!ids.contains(&t_leaf.id.as_str()));

        // Zero hops: root only.
        let entries = replay_transcript_union(&conn, "ref-top", Some(0)).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].meta.id, t_top.id);
    }

    /// An unknown root id produces an empty result, not an error.
    #[test]
    fn missing_root_returns_empty_union() {
        let conn = fresh_db();
        let entries = replay_transcript_union(&conn, "does-not-exist", None).unwrap();
        assert!(entries.is_empty());
    }

    /// Two reflections that reflect on each other terminate and each
    /// contribute their transcript once.
    #[test]
    fn cycle_in_reflects_on_does_not_loop_forever() {
        let conn = fresh_db();
        insert_memory(&conn, "ref-a", "team/eng", "reflection");
        let t_a = store(&conn, "team/eng", "a", None).unwrap();
        link_transcript(&conn, "ref-a", &t_a.id, None, None).unwrap();
        insert_memory(&conn, "ref-b", "team/eng", "reflection");
        let t_b = store(&conn, "team/eng", "b", None).unwrap();
        link_transcript(&conn, "ref-b", &t_b.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-a", "ref-b");
        link_reflects_on(&conn, "ref-b", "ref-a");

        let entries = replay_transcript_union(&conn, "ref-a", None).unwrap();
        assert_eq!(entries.len(), 2);
    }

    /// A transcript linked from both the reflection and its source appears
    /// once in the result.
    #[test]
    fn shared_transcript_deduplicates_to_one_entry() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-shared", "team/eng", "observation");
        let t_shared = store(&conn, "team/eng", "shared", None).unwrap();
        link_transcript(&conn, "obs-shared", &t_shared.id, None, None).unwrap();
        insert_memory(&conn, "ref-1", "team/eng", "reflection");
        link_transcript(&conn, "ref-1", &t_shared.id, None, None).unwrap();
        link_reflects_on(&conn, "ref-1", "obs-shared");

        let entries = replay_transcript_union(&conn, "ref-1", None).unwrap();
        assert_eq!(
            entries.len(),
            1,
            "dedup keeps a single entry per transcript_id"
        );
        assert_eq!(entries[0].meta.id, t_shared.id);
    }

    /// A reflection with no `reflects_on` edges returns just its own transcript.
    #[test]
    fn orphan_reflection_returns_only_self() {
        let conn = fresh_db();
        insert_memory(&conn, "ref-orphan", "team/eng", "reflection");
        let t = store(&conn, "team/eng", "orphan", None).unwrap();
        link_transcript(&conn, "ref-orphan", &t.id, None, None).unwrap();

        let entries = replay_transcript_union(&conn, "ref-orphan", None).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].memory_id, "ref-orphan");
    }

    /// A transcript row deleted after linking is skipped without surfacing an
    /// error to the caller.
    #[test]
    fn dangling_transcript_id_is_silently_dropped() {
        let conn = fresh_db();
        insert_memory(&conn, "obs-1", "team/eng", "observation");
        let t = store(&conn, "team/eng", "body", None).unwrap();
        link_transcript(&conn, "obs-1", &t.id, None, None).unwrap();
        conn.execute(
            "DELETE FROM memory_transcripts WHERE id = ?1",
            rusqlite::params![t.id],
        )
        .unwrap();

        let entries = replay_transcript_union(&conn, "obs-1", None).unwrap();
        assert!(entries.is_empty(), "dangling link drops, no error surfaced");
    }
}