use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Duration, Utc};
use rusqlite::{Connection, OptionalExtension, params};
use std::io::{Read, Write};
use crate::config::{ResolvedTranscriptLifecycle, TranscriptsConfig};
/// `tracing` target used for archive/prune diagnostics emitted by the lifecycle sweep.
const LIFECYCLE_TRACE_TARGET: &str = "transcripts.lifecycle";
/// zstd compression level for new transcripts; also written to the row's `zstd_level` column.
const ZSTD_LEVEL: i32 = 3;
/// Hard ceiling (16 MiB) on the decompressed size of one transcript, enforced by
/// `zstd_decompress` as a decompression-bomb defence.
pub const MAX_DECOMPRESSED_BYTES: usize = 1 << 24;
/// Metadata handle for one stored transcript row (everything except the blob).
///
/// Derives `PartialEq`/`Eq` like the other record types in this module so
/// handles can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Transcript {
    /// Row id; `store` generates it as a UUID v4 string.
    pub id: String,
    /// Namespace the transcript belongs to; selects its lifecycle settings.
    pub namespace: String,
    /// Creation time as written by `store` (RFC 3339, UTC).
    pub created_at: String,
    /// Expiry time (RFC 3339), or `None` when stored without a TTL.
    pub expires_at: Option<String>,
    /// Length in bytes of the zstd-compressed blob.
    pub compressed_size: i64,
    /// Length in bytes of the original UTF-8 content.
    pub original_size: i64,
}
/// Compresses `content` with zstd and inserts it into `memory_transcripts`
/// under a freshly generated UUID v4 id.
///
/// `created_at` is the current UTC time. When `ttl` is given, `expires_at` is
/// `created_at + ttl`. Both are stored as RFC 3339 strings.
///
/// # Errors
/// Returns an error if `created_at + ttl` falls outside chrono's representable
/// date range, if a size does not fit in `i64`, if compression fails, or if the
/// INSERT fails.
pub fn store(
    conn: &Connection,
    namespace: &str,
    content: &str,
    ttl: Option<Duration>,
) -> Result<Transcript> {
    let id = uuid::Uuid::new_v4().to_string();
    let now = Utc::now();
    let created_at = now.to_rfc3339();
    // `DateTime + Duration` panics on overflow; an oversized caller-supplied TTL
    // must surface as an error rather than abort the process.
    let expires_at = ttl
        .map(|d| {
            now.checked_add_signed(d)
                .map(|t| t.to_rfc3339())
                .context("transcript ttl pushes expires_at past the representable date range")
        })
        .transpose()?;
    let original_size =
        i64::try_from(content.len()).context("transcript content length overflows i64")?;
    let blob = zstd_compress(content.as_bytes())
        .context("zstd compression failed for transcript content")?;
    let compressed_size =
        i64::try_from(blob.len()).context("compressed transcript length overflows i64")?;
    conn.execute(
        "INSERT INTO memory_transcripts (
id, namespace, created_at, expires_at,
compressed_size, original_size, zstd_level, content_blob
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
        params![
            id,
            namespace,
            created_at,
            expires_at,
            compressed_size,
            original_size,
            ZSTD_LEVEL,
            blob,
        ],
    )
    .context("INSERT into memory_transcripts failed")?;
    Ok(Transcript {
        id,
        namespace: namespace.to_string(),
        created_at,
        expires_at,
        compressed_size,
        original_size,
    })
}
/// Loads and decompresses the content of transcript `id`.
///
/// Returns `Ok(None)` when no row has that id.
///
/// # Errors
/// Fails if the SELECT fails, if decompression fails (including exceeding
/// [`MAX_DECOMPRESSED_BYTES`]), or if the decompressed bytes are not UTF-8.
pub fn fetch(conn: &Connection, id: &str) -> Result<Option<String>> {
    let maybe_blob = conn
        .query_row(
            "SELECT content_blob FROM memory_transcripts WHERE id = ?1",
            params![id],
            |row| row.get::<_, Vec<u8>>(0),
        )
        .optional()
        .context("SELECT memory_transcripts failed")?;
    match maybe_blob {
        None => Ok(None),
        Some(blob) => {
            let raw = zstd_decompress(&blob).context("zstd decompression failed")?;
            String::from_utf8(raw)
                .map(Some)
                .context("transcript blob did not decode to valid UTF-8")
        }
    }
}
/// Reads the metadata of transcript `id` without touching its content blob.
///
/// Returns `Ok(None)` when no row has that id.
///
/// # Errors
/// Fails if the SELECT fails or a column cannot be decoded.
pub fn fetch_metadata(conn: &Connection, id: &str) -> Result<Option<Transcript>> {
    conn.query_row(
        "SELECT id, namespace, created_at, expires_at,
compressed_size, original_size
FROM memory_transcripts WHERE id = ?1",
        params![id],
        |row| {
            let transcript_id = row.get(0)?;
            let namespace = row.get(1)?;
            let created_at = row.get(2)?;
            let expires_at = row.get(3)?;
            let compressed_size = row.get(4)?;
            let original_size = row.get(5)?;
            Ok(Transcript {
                id: transcript_id,
                namespace,
                created_at,
                expires_at,
                compressed_size,
                original_size,
            })
        },
    )
    .optional()
    .context("SELECT memory_transcripts metadata failed")
}
/// Deletes every transcript whose `expires_at` is at or before the current time.
///
/// Returns the number of rows deleted. Rows with a NULL `expires_at` are kept.
///
/// NOTE(review): the cutoff is a text comparison against `Utc::now().to_rfc3339()`.
/// It is chronological for values written by `store` (same format). Rows written
/// elsewhere in a different RFC 3339 spelling (e.g. a `Z` suffix) may compare
/// wrongly; confirm.
///
/// # Errors
/// Fails if the DELETE fails.
pub fn purge_expired(conn: &Connection) -> Result<usize> {
    let cutoff = Utc::now().to_rfc3339();
    conn.execute(
        "DELETE FROM memory_transcripts
WHERE expires_at IS NOT NULL AND expires_at <= ?1",
        params![cutoff],
    )
    .context("DELETE expired memory_transcripts failed")
}
/// One row of `memory_transcript_links`: ties a memory to a transcript,
/// optionally narrowed to a span.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TranscriptLink {
    /// Id of the linked memory.
    pub memory_id: String,
    /// Id of the linked transcript.
    pub transcript_id: String,
    /// Optional start of the referenced span (stored verbatim).
    pub span_start: Option<i64>,
    /// Optional end of the referenced span (stored verbatim).
    pub span_end: Option<i64>,
}
/// Links `memory_id` to `transcript_id`, optionally with a span.
///
/// Uses `INSERT OR REPLACE`. Re-linking the same (memory, transcript) pair
/// overwrites the stored span. This assumes the schema has a uniqueness
/// constraint on that pair, which the test suite relies on.
///
/// # Errors
/// Fails if the INSERT fails.
pub fn link_transcript(
    conn: &Connection,
    memory_id: &str,
    transcript_id: &str,
    span_start: Option<i64>,
    span_end: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT OR REPLACE INTO memory_transcript_links (
memory_id, transcript_id, span_start, span_end
) VALUES (?1, ?2, ?3, ?4)",
        params![memory_id, transcript_id, span_start, span_end],
    )
    .map(|_rows| ())
    .context("INSERT into memory_transcript_links failed")
}
pub fn transcripts_for_memory(conn: &Connection, memory_id: &str) -> Result<Vec<TranscriptLink>> {
let mut stmt = conn
.prepare(
"SELECT memory_id, transcript_id, span_start, span_end
FROM memory_transcript_links
WHERE memory_id = ?1
ORDER BY transcript_id",
)
.context("PREPARE transcripts_for_memory failed")?;
let rows = stmt
.query_map(params![memory_id], row_to_link)
.context("QUERY transcripts_for_memory failed")?;
let mut out = Vec::new();
for r in rows {
out.push(r.context("decode transcripts_for_memory row")?);
}
Ok(out)
}
pub fn memories_for_transcript(
conn: &Connection,
transcript_id: &str,
) -> Result<Vec<TranscriptLink>> {
let mut stmt = conn
.prepare(
"SELECT memory_id, transcript_id, span_start, span_end
FROM memory_transcript_links
WHERE transcript_id = ?1
ORDER BY memory_id",
)
.context("PREPARE memories_for_transcript failed")?;
let rows = stmt
.query_map(params![transcript_id], row_to_link)
.context("QUERY memories_for_transcript failed")?;
let mut out = Vec::new();
for r in rows {
out.push(r.context("decode memories_for_transcript row")?);
}
Ok(out)
}
/// Decodes a `(memory_id, transcript_id, span_start, span_end)` row, by column
/// position, into a [`TranscriptLink`].
fn row_to_link(row: &rusqlite::Row<'_>) -> rusqlite::Result<TranscriptLink> {
    let memory_id = row.get(0)?;
    let transcript_id = row.get(1)?;
    let span_start = row.get(2)?;
    let span_end = row.get(3)?;
    Ok(TranscriptLink {
        memory_id,
        transcript_id,
        span_start,
        span_end,
    })
}
/// Tallies produced by one run of `sweep_transcript_lifecycle`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SweepReport {
    /// Transcripts stamped with `archived_at` during this sweep.
    pub archived: usize,
    /// Archived transcripts deleted during this sweep.
    pub pruned: usize,
    /// Per-row failures that were logged and skipped.
    pub errors: usize,
}
/// Runs one lifecycle sweep: the archive phase, then the prune phase.
///
/// Both phases share a single "now" so their time comparisons agree. Per-row
/// failures are counted in [`SweepReport::errors`] rather than returned.
///
/// # Errors
/// Fails only if a phase's candidate scan cannot be prepared, run, or decoded.
pub fn sweep_transcript_lifecycle(
    conn: &Connection,
    cfg: &TranscriptsConfig,
) -> Result<SweepReport> {
    let sweep_time = Utc::now();
    let mut tally = SweepReport::default();
    archive_phase(conn, cfg, sweep_time, &mut tally)?;
    prune_phase(conn, cfg, sweep_time, &mut tally)?;
    Ok(tally)
}
/// Archive phase of the lifecycle sweep. Stamps `archived_at = now` on every
/// unarchived transcript that `should_archive` reports as eligible.
///
/// Per-row failures (eligibility check or UPDATE) are logged under
/// [`LIFECYCLE_TRACE_TARGET`] and counted in `report.errors`. Only a failure of
/// the initial scan aborts the phase.
///
/// # Errors
/// Fails if the candidate scan cannot be prepared, run, or decoded.
fn archive_phase(
    conn: &Connection,
    cfg: &TranscriptsConfig,
    now: DateTime<Utc>,
    report: &mut SweepReport,
) -> Result<()> {
    // Collected up front so the scan statement is dropped before the per-row
    // queries and UPDATEs below run.
    let live_candidates: Vec<(String, String, String)> = {
        let mut stmt = conn
            .prepare(
                "SELECT id, namespace, created_at
FROM memory_transcripts
WHERE archived_at IS NULL",
            )
            .context("PREPARE archive_phase scan failed")?;
        let rows = stmt
            .query_map([], |r| {
                Ok((
                    r.get::<_, String>(0)?,
                    r.get::<_, String>(1)?,
                    r.get::<_, String>(2)?,
                ))
            })
            .context("QUERY archive_phase scan failed")?;
        rows.collect::<rusqlite::Result<Vec<_>>>()
            .context("decode archive_phase rows")?
    };
    // Every row archived by this sweep shares one stamp; compute it once.
    let stamp = now.to_rfc3339();
    for (id, namespace, created_at) in live_candidates {
        let resolved = cfg.resolve(&namespace);
        match should_archive(conn, &id, &created_at, now, resolved) {
            Ok(true) => match conn.execute(
                "UPDATE memory_transcripts
SET archived_at = ?1
WHERE id = ?2 AND archived_at IS NULL",
                params![stamp, id],
            ) {
                // Count only rows actually changed. 0 means another writer
                // archived or removed the row between the scan and this UPDATE.
                Ok(changed) => report.archived += changed,
                Err(e) => {
                    tracing::warn!(
                        target: LIFECYCLE_TRACE_TARGET,
                        "archive UPDATE failed for transcript {id}: {e}"
                    );
                    report.errors += 1;
                }
            },
            Ok(false) => {}
            Err(e) => {
                tracing::warn!(
                    target: LIFECYCLE_TRACE_TARGET,
                    "archive eligibility check failed for transcript {id}: {e}"
                );
                report.errors += 1;
            }
        }
    }
    Ok(())
}
/// Prune phase of the lifecycle sweep. Deletes archived transcripts whose
/// `archived_at + archive_grace_secs` is strictly before `now`.
///
/// Rows with an unparseable `archived_at`, and failed DELETEs, are logged under
/// [`LIFECYCLE_TRACE_TARGET`] and counted in `report.errors` instead of
/// aborting the sweep.
///
/// # Errors
/// Fails if the candidate scan cannot be prepared, run, or decoded.
fn prune_phase(
    conn: &Connection,
    cfg: &TranscriptsConfig,
    now: DateTime<Utc>,
    report: &mut SweepReport,
) -> Result<()> {
    // Collected up front so the scan statement is dropped before the DELETEs.
    let candidates: Vec<(String, String, String)> = {
        let mut stmt = conn
            .prepare(
                "SELECT id, namespace, archived_at
FROM memory_transcripts
WHERE archived_at IS NOT NULL",
            )
            .context("PREPARE prune_phase scan failed")?;
        let rows = stmt
            .query_map([], |r| {
                Ok((
                    r.get::<_, String>(0)?,
                    r.get::<_, String>(1)?,
                    r.get::<_, String>(2)?,
                ))
            })
            .context("QUERY prune_phase scan failed")?;
        rows.collect::<rusqlite::Result<Vec<_>>>()
            .context("decode prune_phase rows")?
    };
    for (id, namespace, archived_at) in candidates {
        let resolved = cfg.resolve(&namespace);
        let archived_at = match DateTime::parse_from_rfc3339(&archived_at) {
            Ok(t) => t.with_timezone(&Utc),
            Err(e) => {
                tracing::warn!(
                    target: LIFECYCLE_TRACE_TARGET,
                    "transcript {id} has unparseable archived_at {archived_at:?}: {e}"
                );
                report.errors += 1;
                continue;
            }
        };
        // `Duration::seconds` panics beyond about ±i64::MAX/1000 s, and adding a
        // Duration panics past chrono's date range. An extreme configured grace
        // period must not abort the whole sweep. The clamp only alters values
        // `Duration::seconds` would reject anyway.
        let grace_secs = resolved
            .archive_grace_secs
            .clamp(-(i64::MAX / 1000), i64::MAX / 1000);
        let grace_elapsed = match archived_at.checked_add_signed(Duration::seconds(grace_secs)) {
            Some(prune_at) => prune_at < now,
            // Deadline outside chrono's range: a negative offset lands before
            // any `now`, a positive one after it.
            None => grace_secs < 0,
        };
        if !grace_elapsed {
            continue;
        }
        match conn.execute("DELETE FROM memory_transcripts WHERE id = ?1", params![id]) {
            Ok(n) => report.pruned += n,
            Err(e) => {
                tracing::warn!(
                    target: LIFECYCLE_TRACE_TARGET,
                    "prune DELETE failed for transcript {id}: {e}"
                );
                report.errors += 1;
            }
        }
    }
    Ok(())
}
/// Decides whether an unarchived transcript is eligible for archiving.
///
/// A transcript qualifies when two conditions hold:
/// - `created_at + default_ttl_secs` is strictly before `now`.
/// - No linked memory is still alive. A linked memory counts as alive when its
///   `expires_at` is NULL or later than `now`.
///
/// # Errors
/// Fails if `created_at` is not valid RFC 3339 or the alive-memory count
/// query fails.
fn should_archive(
    conn: &Connection,
    transcript_id: &str,
    created_at: &str,
    now: DateTime<Utc>,
    resolved: ResolvedTranscriptLifecycle,
) -> Result<bool> {
    let created = DateTime::parse_from_rfc3339(created_at)
        .with_context(|| format!("transcript {transcript_id} has unparseable created_at"))?
        .with_timezone(&Utc);
    // `Duration::seconds` panics beyond about ±i64::MAX/1000 s, and adding a
    // Duration panics past chrono's date range. An extreme configured TTL
    // (e.g. i64::MAX meaning "forever") must not crash the sweep. The clamp only
    // alters values `Duration::seconds` would reject anyway.
    let ttl_secs = resolved
        .default_ttl_secs
        .clamp(-(i64::MAX / 1000), i64::MAX / 1000);
    let ttl_elapsed = match created.checked_add_signed(Duration::seconds(ttl_secs)) {
        Some(archive_at) => archive_at < now,
        // Deadline outside chrono's range: a negative TTL lands before any
        // `now`, a positive one after it.
        None => ttl_secs < 0,
    };
    if !ttl_elapsed {
        return Ok(false);
    }
    // NOTE(review): `m.expires_at > ?2` is a text comparison. It is only
    // chronological if memories.expires_at uses the same `to_rfc3339()` UTC
    // spelling as `now_str`; confirm against the memories writer.
    let now_str = now.to_rfc3339();
    let alive: i64 = conn
        .query_row(
            "SELECT COUNT(*)
FROM memory_transcript_links l
JOIN memories m ON m.id = l.memory_id
WHERE l.transcript_id = ?1
AND (m.expires_at IS NULL OR m.expires_at > ?2)",
            params![transcript_id, now_str],
            |r| r.get(0),
        )
        .with_context(|| format!("alive-memory count failed for transcript {transcript_id}"))?;
    Ok(alive == 0)
}
/// Compresses `input` with the streaming zstd encoder at [`ZSTD_LEVEL`].
///
/// # Errors
/// Propagates any I/O error reported by the encoder.
fn zstd_compress(input: &[u8]) -> Result<Vec<u8>> {
    // Initial capacity is only a guess (about a quarter of the input plus some
    // headroom); the Vec grows if the output turns out larger.
    let sink = Vec::with_capacity(input.len() / 4 + 64);
    let mut encoder = zstd::stream::write::Encoder::new(sink, ZSTD_LEVEL)?;
    encoder.write_all(input)?;
    let compressed = encoder.finish()?;
    Ok(compressed)
}
/// Decompresses a zstd stream, refusing to produce more than
/// [`MAX_DECOMPRESSED_BYTES`] of output.
///
/// The cap is checked chunk by chunk before output is appended. A small, highly
/// compressible blob (a decompression bomb) is therefore rejected without
/// buffering more than the cap.
///
/// # Errors
/// Fails if the decoder reports an error or the output would exceed the cap.
fn zstd_decompress(input: &[u8]) -> Result<Vec<u8>> {
    // Initial allocation guesses ~4x expansion, bounded by the cap.
    // `saturating_mul` keeps the guess from overflowing (a debug-build panic)
    // for very large inputs on 32-bit targets.
    let init_cap = input.len().saturating_mul(4).min(MAX_DECOMPRESSED_BYTES);
    let mut out = Vec::with_capacity(init_cap);
    let mut decoder = zstd::stream::read::Decoder::new(input)?;
    let mut buf = [0u8; 64 * 1024];
    loop {
        let n = decoder.read(&mut buf)?;
        if n == 0 {
            break;
        }
        if out.len().saturating_add(n) > MAX_DECOMPRESSED_BYTES {
            tracing::warn!(
                target: "transcripts.bomb",
                cap_bytes = MAX_DECOMPRESSED_BYTES,
                so_far = out.len(),
                "rejecting transcript: decompressed size would exceed cap"
            );
            return Err(anyhow!(
                "transcript decompression exceeded {} byte cap (decompression bomb defence)",
                MAX_DECOMPRESSED_BYTES
            ));
        }
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::Connection;

    /// Opens a throwaway in-memory database through the crate's `db::open` path.
    fn open_test_db() -> Connection {
        crate::db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
    }

    /// Inserts a minimal `memories` row (tier `short`, namespace `ns`) with the given expiry.
    fn seed_memory(conn: &Connection, id: &str, expires_at: Option<&str>) {
        let stamp = Utc::now().to_rfc3339();
        let title = format!("title-{id}");
        conn.execute(
            "INSERT INTO memories (
id, tier, namespace, title, content, expires_at, created_at, updated_at
) VALUES (?1, 'short', 'ns', ?2, 'body', ?3, ?4, ?4)",
            rusqlite::params![id, title, expires_at, stamp],
        )
        .expect("insert test memory");
    }

    /// Runs a statement whose only parameter (`?1`) is a transcript id; panics on failure.
    fn exec_with_id(conn: &Connection, sql: &str, id: &str) {
        conn.execute(sql, rusqlite::params![id]).unwrap();
    }

    #[test]
    fn store_and_fetch_round_trips_content() {
        let conn = open_test_db();
        let text = "hello transcripts";
        let stored = store(&conn, "ns-x", text, None).expect("store ok");
        assert_eq!(stored.namespace, "ns-x");
        assert!(stored.compressed_size > 0);
        assert_eq!(stored.original_size, text.len() as i64);
        let loaded = fetch(&conn, &stored.id).expect("fetch ok").expect("present");
        assert_eq!(loaded, text);
    }

    #[test]
    fn store_with_ttl_sets_expires_at() {
        let conn = open_test_db();
        let stored =
            store(&conn, "ns-x", "body", Some(Duration::seconds(120))).expect("store ok");
        assert!(stored.expires_at.is_some());
    }

    #[test]
    fn fetch_missing_id_returns_none() {
        let conn = open_test_db();
        let found = fetch(&conn, "no-such-id").expect("query ok");
        assert!(found.is_none());
    }

    #[test]
    fn fetch_metadata_returns_handle_without_blob() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        let meta = fetch_metadata(&conn, &stored.id)
            .expect("query ok")
            .expect("present");
        assert_eq!(meta.id, stored.id);
        assert_eq!(meta.namespace, "ns-x");
        assert_eq!(meta.original_size, stored.original_size);
    }

    #[test]
    fn fetch_metadata_missing_returns_none() {
        let conn = open_test_db();
        let found = fetch_metadata(&conn, "no-such-id").expect("query ok");
        assert!(found.is_none());
    }

    #[test]
    fn purge_expired_removes_only_past_due_rows() {
        let conn = open_test_db();
        let _live = store(&conn, "ns-x", "live", None).expect("store live");
        let past = store(&conn, "ns-x", "past", None).expect("store past");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET expires_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
            &past.id,
        );
        let purged = purge_expired(&conn).expect("purge ok");
        assert_eq!(purged, 1, "exactly one past-expiry row");
        assert!(fetch(&conn, &past.id).unwrap().is_none());
    }

    #[test]
    fn link_and_transcripts_for_memory_round_trip() {
        let conn = open_test_db();
        seed_memory(&conn, "m1", None);
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        link_transcript(&conn, "m1", &stored.id, Some(0), Some(4)).expect("link ok");
        let links = transcripts_for_memory(&conn, "m1").expect("query ok");
        assert_eq!(links.len(), 1);
        let link = &links[0];
        assert_eq!(link.memory_id, "m1");
        assert_eq!(link.transcript_id, stored.id);
        assert_eq!(link.span_start, Some(0));
        assert_eq!(link.span_end, Some(4));
    }

    #[test]
    fn memories_for_transcript_round_trip() {
        let conn = open_test_db();
        for memory_id in ["m1", "m2"] {
            seed_memory(&conn, memory_id, None);
        }
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        for memory_id in ["m1", "m2"] {
            link_transcript(&conn, memory_id, &stored.id, None, None).expect("link ok");
        }
        let mems = memories_for_transcript(&conn, &stored.id).expect("query ok");
        assert_eq!(mems.len(), 2);
        assert_eq!(mems[0].memory_id, "m1");
        assert_eq!(mems[1].memory_id, "m2");
    }

    #[test]
    fn link_transcript_replaces_on_duplicate_pair() {
        let conn = open_test_db();
        seed_memory(&conn, "m1", None);
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        link_transcript(&conn, "m1", &stored.id, Some(0), Some(4)).expect("link ok");
        link_transcript(&conn, "m1", &stored.id, Some(2), Some(10)).expect("relink ok");
        let links = transcripts_for_memory(&conn, "m1").expect("query ok");
        assert_eq!(links.len(), 1);
        assert_eq!(links[0].span_start, Some(2));
        assert_eq!(links[0].span_end, Some(10));
    }

    #[test]
    fn sweep_archives_aged_rows_with_no_links() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "old", None).expect("store ok");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET created_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
            &stored.id,
        );
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert!(report.archived >= 1, "expected archive: {report:?}");
    }

    #[test]
    fn sweep_prunes_archived_rows_past_grace() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "old", None).expect("store ok");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET archived_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
            &stored.id,
        );
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert_eq!(report.pruned, 1, "expected prune: {report:?}");
        assert!(fetch_metadata(&conn, &stored.id).unwrap().is_none());
    }

    #[test]
    fn sweep_skips_live_rows() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "fresh body", None).expect("store ok");
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert_eq!(report.archived, 0);
        assert_eq!(report.pruned, 0);
        assert!(fetch_metadata(&conn, &stored.id).unwrap().is_some());
    }

    #[test]
    fn sweep_skips_archive_when_memory_still_alive() {
        let conn = open_test_db();
        seed_memory(&conn, "m1", None);
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        link_transcript(&conn, "m1", &stored.id, None, None).expect("link ok");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET created_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
            &stored.id,
        );
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert_eq!(
            report.archived, 0,
            "live memory keeps transcript: {report:?}"
        );
    }

    #[test]
    fn sweep_handles_unparseable_archived_at() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET archived_at = 'not-a-date' WHERE id = ?1",
            &stored.id,
        );
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert!(report.errors >= 1, "expected error tally: {report:?}");
        assert_eq!(report.pruned, 0, "unparseable row must not be pruned");
    }

    #[test]
    fn should_archive_returns_false_when_within_ttl() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "fresh", None).expect("store ok");
        let cfg = TranscriptsConfig::default();
        let resolved = cfg.resolve("ns-x");
        let eligible = should_archive(&conn, &stored.id, &stored.created_at, Utc::now(), resolved)
            .expect("should_archive ok");
        assert!(!eligible, "fresh row must not be archive-eligible");
    }

    #[test]
    fn sweep_archive_phase_tallies_should_archive_failure() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "body", None).expect("store ok");
        exec_with_id(
            &conn,
            "UPDATE memory_transcripts SET created_at = 'not-a-date' WHERE id = ?1",
            &stored.id,
        );
        let cfg = TranscriptsConfig::default();
        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
        assert!(report.errors >= 1, "expected error tally: {report:?}");
        assert_eq!(report.archived, 0);
    }

    #[test]
    fn should_archive_propagates_unparseable_created_at() {
        let conn = open_test_db();
        let cfg = TranscriptsConfig::default();
        let resolved = cfg.resolve("ns-x");
        let err = should_archive(&conn, "id", "not-a-date", Utc::now(), resolved).unwrap_err();
        let rendered = format!("{err:#}");
        assert!(rendered.contains("unparseable created_at"), "got: {rendered}");
    }

    #[test]
    fn zstd_round_trip_decodes_to_original() {
        let original = b"some non-trivial bytes \x00\x01\x02 with binary";
        let blob = zstd_compress(original).expect("compress");
        let decoded = zstd_decompress(&blob).expect("decompress");
        assert_eq!(decoded, original);
    }

    #[test]
    fn zstd_decompress_rejects_oversized_blob() {
        let oversized = vec![0u8; MAX_DECOMPRESSED_BYTES + 1024];
        let blob = zstd_compress(&oversized).expect("compress");
        let err = zstd_decompress(&blob).unwrap_err();
        let rendered = format!("{err}");
        assert!(rendered.contains("decompression bomb"), "got: {rendered}");
    }

    #[test]
    fn fetch_invalid_utf8_blob_returns_error() {
        let conn = open_test_db();
        let stored = store(&conn, "ns-x", "placeholder", None).expect("store");
        let bad_blob = zstd_compress(&[0xFF, 0xFE, 0xFD]).expect("compress bad utf8");
        conn.execute(
            "UPDATE memory_transcripts SET content_blob = ?1 WHERE id = ?2",
            rusqlite::params![bad_blob, stored.id],
        )
        .unwrap();
        let err = fetch(&conn, &stored.id).unwrap_err();
        let rendered = format!("{err:#}");
        assert!(
            rendered.contains("UTF-8") || rendered.contains("utf"),
            "got: {rendered}"
        );
    }
}