Skip to main content

ai_memory/transcripts/
storage.rs

// Copyright 2026 AlphaOne LLC
// SPDX-License-Identifier: Apache-2.0

//! CRUD, zstd helpers, lifecycle sweep, and data types for `memory_transcripts`.
//!
//! All I1/I2/I3 logic lives here. See [`super`] for the module-level
//! overview of the v0.7.0 I-track substrate.
8
9use anyhow::{Context, Result, anyhow};
10use chrono::{DateTime, Duration, Utc};
11use rusqlite::{Connection, OptionalExtension, params};
12use std::io::{Read, Write};
13
14use crate::config::{ResolvedTranscriptLifecycle, TranscriptsConfig};
15
/// Tracing target for transcript retention / lifecycle sweeps
/// (#1558 tracing-target SSOT).
///
/// Passed as `target:` to every `tracing::warn!` emitted by the archive
/// and prune phases in this file, so lifecycle diagnostics can be
/// filtered under a single name.
const LIFECYCLE_TRACE_TARGET: &str = "transcripts.lifecycle";
19
/// Default zstd compression level. Matches `cli::logs::zstd_compress`
/// for cross-codebase consistency.
///
/// Besides configuring the encoder in `zstd_compress`, this value is
/// written to the `zstd_level` column of every row inserted by
/// [`store`].
const ZSTD_LEVEL: i32 = 3;
23
/// v0.7.0 I1 hardening — hard cap (16 MiB) on the size of a single
/// decompressed transcript. A pathological zstd blob (e.g. 1 KB
/// compressed → 1 GB decompressed) would otherwise OOM the daemon when
/// [`fetch`] runs.
///
/// The bounded decoder used by [`fetch`] returns an error as soon as
/// the accumulated output would exceed this many bytes. Surfaced as a
/// constant so a downstream operator can audit the boundary in a code
/// review without chasing magic numbers across modules.
///
/// NOTE(review): [`store`] does not compare `content.len()` against
/// this cap, so a transcript larger than 16 MiB written through
/// [`store`] would be accepted on write and then rejected by
/// [`fetch`]. Presumably an upstream layer bounds transcript size —
/// confirm against the callers.
pub const MAX_DECOMPRESSED_BYTES: usize = 16 * 1024 * 1024;
34
/// Lightweight handle for a stored transcript. Does NOT carry the blob
/// itself — callers fetch the decompressed content on demand via
/// [`fetch`]. The [`Transcript`] handle is what insert/list operations
/// return so the (potentially multi-MB) payload doesn't need to flow
/// through every API surface.
///
/// Field descriptions below reflect what [`store`] writes;
/// [`fetch_metadata`] returns whatever the corresponding columns hold.
#[derive(Debug, Clone)]
pub struct Transcript {
    /// Row id. [`store`] generates a UUID v4 and stores its string form.
    pub id: String,
    /// Namespace the transcript was stored under.
    pub namespace: String,
    /// Insertion time as an RFC 3339 string (UTC when written by [`store`]).
    pub created_at: String,
    /// Expiry time as an RFC 3339 string; `None` when no TTL was supplied.
    pub expires_at: Option<String>,
    /// Length in bytes of the zstd-compressed blob.
    pub compressed_size: i64,
    /// Length in bytes of the uncompressed UTF-8 content.
    pub original_size: i64,
}
49
50/// Compress `content` with zstd-3 and write a row to `memory_transcripts`.
51///
52/// `ttl` is interpreted as a duration from "now"; `None` means no
53/// expiry (the row is retained until explicitly deleted by I3's
54/// archive-prune sweeper).
55///
56/// The returned [`Transcript`] handle lets callers persist the id +
57/// metadata (e.g. into the I2 join table) without re-reading the row.
58///
59/// # Errors
60///
61/// Returns an error if zstd encoding fails (out-of-memory) or the
62/// SQLite INSERT fails (disk full, schema mismatch, etc.).
63pub fn store(
64    conn: &Connection,
65    namespace: &str,
66    content: &str,
67    ttl: Option<Duration>,
68) -> Result<Transcript> {
69    let id = uuid::Uuid::new_v4().to_string();
70    let now = Utc::now();
71    let created_at = now.to_rfc3339();
72    let expires_at = ttl.map(|d| (now + d).to_rfc3339());
73
74    let original_size =
75        i64::try_from(content.len()).context("transcript content length overflows i64")?;
76    let blob = zstd_compress(content.as_bytes())
77        .context("zstd compression failed for transcript content")?;
78    let compressed_size =
79        i64::try_from(blob.len()).context("compressed transcript length overflows i64")?;
80
81    conn.execute(
82        "INSERT INTO memory_transcripts (
83            id, namespace, created_at, expires_at,
84            compressed_size, original_size, zstd_level, content_blob
85         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
86        params![
87            id,
88            namespace,
89            created_at,
90            expires_at,
91            compressed_size,
92            original_size,
93            ZSTD_LEVEL,
94            blob,
95        ],
96    )
97    .context("INSERT into memory_transcripts failed")?;
98
99    Ok(Transcript {
100        id,
101        namespace: namespace.to_string(),
102        created_at,
103        expires_at,
104        compressed_size,
105        original_size,
106    })
107}
108
109/// Fetch + decompress the transcript identified by `id`. Returns
110/// `Ok(None)` when no row matches; callers treat that as "transcript
111/// expired or never existed" and surface a structured error upstream.
112///
113/// # Errors
114///
115/// Returns an error when the row exists but the blob cannot be
116/// decoded (corrupt blob, OOM during decompression) or when the
117/// decompressed bytes are not valid UTF-8.
118pub fn fetch(conn: &Connection, id: &str) -> Result<Option<String>> {
119    let row: Option<Vec<u8>> = conn
120        .query_row(
121            "SELECT content_blob FROM memory_transcripts WHERE id = ?1",
122            params![id],
123            |r| r.get::<_, Vec<u8>>(0),
124        )
125        .optional()
126        .context("SELECT memory_transcripts failed")?;
127
128    let Some(blob) = row else {
129        return Ok(None);
130    };
131
132    let bytes = zstd_decompress(&blob).context("zstd decompression failed")?;
133    let text = String::from_utf8(bytes).context("transcript blob did not decode to valid UTF-8")?;
134    Ok(Some(text))
135}
136
137/// v0.7.0 I4 — fetch the lightweight metadata for a transcript without
138/// pulling the (potentially multi-MB) decompressed content blob.
139/// Returns `Ok(None)` when no row matches, mirroring [`fetch`]. The
140/// `Transcript` handle carries `created_at`, `compressed_size`, and
141/// `original_size`, which I4's `memory_replay` joins with the I2
142/// link spans to assemble a per-transcript metadata block.
143///
144/// # Errors
145///
146/// Returns an error when the SELECT fails (disk I/O, schema drift).
147pub fn fetch_metadata(conn: &Connection, id: &str) -> Result<Option<Transcript>> {
148    let row = conn
149        .query_row(
150            "SELECT id, namespace, created_at, expires_at,
151                    compressed_size, original_size
152               FROM memory_transcripts WHERE id = ?1",
153            params![id],
154            |r| {
155                Ok(Transcript {
156                    id: r.get(0)?,
157                    namespace: r.get(1)?,
158                    created_at: r.get(2)?,
159                    expires_at: r.get(3)?,
160                    compressed_size: r.get(4)?,
161                    original_size: r.get(5)?,
162                })
163            },
164        )
165        .optional()
166        .context("SELECT memory_transcripts metadata failed")?;
167    Ok(row)
168}
169
170/// Delete every row whose `expires_at` is in the past (relative to
171/// "now"). Returns the number of rows removed.
172///
173/// I1 only deletes past-expiry rows. The full archive-then-prune
174/// lifecycle (separate `archived_transcripts` mirror table, two-stage
175/// retention) lands in I3.
176///
177/// # Errors
178///
179/// Returns an error when the DELETE fails (e.g. disk write error).
180pub fn purge_expired(conn: &Connection) -> Result<usize> {
181    let now = Utc::now().to_rfc3339();
182    let n = conn
183        .execute(
184            "DELETE FROM memory_transcripts
185             WHERE expires_at IS NOT NULL AND expires_at <= ?1",
186            params![now],
187        )
188        .context("DELETE expired memory_transcripts failed")?;
189    Ok(n)
190}
191
/// v0.7.0 I2 — provenance edge between a memory and a transcript span.
///
/// Establishes that `memory_id` was extracted (or otherwise derived)
/// from the transcript identified by `transcript_id`. The optional
/// (`span_start`, `span_end`) byte offsets address a sub-region of the
/// decompressed transcript; both `None` means "the whole transcript".
/// Offsets are 0-based byte positions into the UTF-8 decompressed
/// bytes, half-open `[start, end)` per the usual Rust slicing
/// convention.
///
/// The PRIMARY KEY on the join table is `(memory_id, transcript_id)`,
/// so a memory can only be linked to a given transcript once. Callers
/// that need to record multiple disjoint spans from the same transcript
/// should merge them into a single bounding pair upstream.
///
/// NOTE(review): nothing in this file validates the offsets (ordering,
/// non-negativity, both-or-neither); they are stored and returned as
/// given.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranscriptLink {
    /// Id of the derived memory.
    pub memory_id: String,
    /// Id of the source transcript.
    pub transcript_id: String,
    /// Inclusive start byte offset, or `None`.
    pub span_start: Option<i64>,
    /// Exclusive end byte offset, or `None`.
    pub span_end: Option<i64>,
}
213
214/// Insert (or replace) a provenance edge between a memory and a
215/// transcript. Both ids must already exist in their respective tables —
216/// the foreign keys are enforced (`PRAGMA foreign_keys = ON` is set on
217/// every connection opened by [`crate::db::open`]).
218///
219/// Uses `INSERT OR REPLACE` so re-linking the same `(memory_id,
220/// transcript_id)` pair with a different span is a no-fuss update; the
221/// I-track currently has no caller that needs to detect the duplicate.
222///
223/// # Errors
224///
225/// Returns an error when the INSERT fails — most commonly a foreign-key
226/// violation (one of the ids is unknown or has been deleted), or a
227/// disk-write failure.
228pub fn link_transcript(
229    conn: &Connection,
230    memory_id: &str,
231    transcript_id: &str,
232    span_start: Option<i64>,
233    span_end: Option<i64>,
234) -> Result<()> {
235    conn.execute(
236        "INSERT OR REPLACE INTO memory_transcript_links (
237            memory_id, transcript_id, span_start, span_end
238         ) VALUES (?1, ?2, ?3, ?4)",
239        params![memory_id, transcript_id, span_start, span_end],
240    )
241    .context("INSERT into memory_transcript_links failed")?;
242    Ok(())
243}
244
245/// Return every transcript provenance edge for a given memory.
246///
247/// Order is stable on `transcript_id` so callers (notably I4's
248/// `memory_replay`) get a deterministic replay sequence.
249///
250/// # Errors
251///
252/// Returns an error when the SELECT or row decoding fails.
253pub fn transcripts_for_memory(conn: &Connection, memory_id: &str) -> Result<Vec<TranscriptLink>> {
254    let mut stmt = conn
255        .prepare(
256            "SELECT memory_id, transcript_id, span_start, span_end
257             FROM memory_transcript_links
258             WHERE memory_id = ?1
259             ORDER BY transcript_id",
260        )
261        .context("PREPARE transcripts_for_memory failed")?;
262    let rows = stmt
263        .query_map(params![memory_id], row_to_link)
264        .context("QUERY transcripts_for_memory failed")?;
265    let mut out = Vec::new();
266    for r in rows {
267        out.push(r.context("decode transcripts_for_memory row")?);
268    }
269    Ok(out)
270}
271
272/// Return every memory derived from a given transcript.
273///
274/// Order is stable on `memory_id` so the fan-in is deterministic for
275/// downstream tooling (e.g. archive sweepers in I3).
276///
277/// # Errors
278///
279/// Returns an error when the SELECT or row decoding fails.
280pub fn memories_for_transcript(
281    conn: &Connection,
282    transcript_id: &str,
283) -> Result<Vec<TranscriptLink>> {
284    let mut stmt = conn
285        .prepare(
286            "SELECT memory_id, transcript_id, span_start, span_end
287             FROM memory_transcript_links
288             WHERE transcript_id = ?1
289             ORDER BY memory_id",
290        )
291        .context("PREPARE memories_for_transcript failed")?;
292    let rows = stmt
293        .query_map(params![transcript_id], row_to_link)
294        .context("QUERY memories_for_transcript failed")?;
295    let mut out = Vec::new();
296    for r in rows {
297        out.push(r.context("decode memories_for_transcript row")?);
298    }
299    Ok(out)
300}
301
302fn row_to_link(row: &rusqlite::Row<'_>) -> rusqlite::Result<TranscriptLink> {
303    Ok(TranscriptLink {
304        memory_id: row.get(0)?,
305        transcript_id: row.get(1)?,
306        span_start: row.get(2)?,
307        span_end: row.get(3)?,
308    })
309}
310
/// v0.7.0 I3 — outcome of one [`sweep_transcript_lifecycle`] pass.
///
/// `archived` and `pruned` count distinct rows touched in each phase
/// of the same sweep tick; a row archived this tick will not be
/// pruned until at least the next tick (and only after its grace
/// window expires). `errors` is best-effort observability — the
/// sweeper logs and continues past per-row failures so a single
/// poison row cannot stall the loop, but the count is surfaced for
/// the daemon's structured logs and the future doctor overlay.
///
/// All counters start at zero (`Default`) and are only ever
/// incremented by the archive and prune phases.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SweepReport {
    /// Number of rows transitioned to `archived` this tick.
    pub archived: usize,
    /// Number of rows hard-deleted (prune phase) this tick.
    pub pruned: usize,
    /// Per-row errors swallowed during the sweep (e.g. an unparseable
    /// timestamp, or a failed per-row UPDATE/DELETE). Each one is also
    /// logged as a warning. The aggregate sweep call still returns
    /// `Ok` so the background loop keeps running.
    pub errors: usize,
}
332
333/// v0.7.0 I3 — drive the transcript archive→prune lifecycle once.
334///
335/// Two phases run in order against the supplied connection. Per the
336/// I3 contract the connection is held for the full sweep so the two
337/// phases see a consistent `now`:
338///
339/// * **Phase 1 — ARCHIVE.** For every live transcript (non-NULL
340///   `archived_at` is skipped), resolve the per-namespace lifecycle
341///   from `cfg`, then archive the row when:
342///   1. `created_at + default_ttl_secs < now` (transcript itself is
343///      old enough to retire), and
344///   2. every memory linked to the transcript via the I2 join table
345///      has either expired or been deleted (a transcript with no
346///      linked memories trivially satisfies this — there is nothing
347///      keeping it live).
348///
349/// * **Phase 2 — PRUNE.** Hard-DELETE every archived row whose
350///   `archived_at + archive_grace_secs < now`. The
351///   `ON DELETE CASCADE` declared on `memory_transcript_links`
352///   cleans up the join table without an explicit second statement.
353///
354/// Phase 2 runs even if Phase 1 had errors so a single poisonous row
355/// in the archive scan does not block the prune side. `SweepReport`
356/// is the wire-shape returned to the daemon's metrics emitter.
357///
358/// # Errors
359///
360/// Returns an error only on infrastructure-level SQLite failures
361/// (connection lost, disk full). Per-row failures are folded into
362/// `SweepReport::errors`.
363pub fn sweep_transcript_lifecycle(
364    conn: &Connection,
365    cfg: &TranscriptsConfig,
366) -> Result<SweepReport> {
367    let now = Utc::now();
368    let mut report = SweepReport::default();
369
370    // Phase 1 — ARCHIVE.
371    archive_phase(conn, cfg, now, &mut report)?;
372
373    // Phase 2 — PRUNE. Each row carries its own `archived_at` so we
374    // re-resolve the per-namespace grace window for each candidate.
375    prune_phase(conn, cfg, now, &mut report)?;
376
377    Ok(report)
378}
379
380/// Phase-1 helper extracted for readability — see
381/// [`sweep_transcript_lifecycle`] for the full contract.
382fn archive_phase(
383    conn: &Connection,
384    cfg: &TranscriptsConfig,
385    now: DateTime<Utc>,
386    report: &mut SweepReport,
387) -> Result<()> {
388    // Pull every live row up front — the per-namespace TTL means we
389    // cannot push the age cutoff into SQL without committing to the
390    // global default, and the per-row aliveness check (all linked
391    // memories expired) needs another query anyway.
392    let live_candidates: Vec<(String, String, String)> = {
393        let mut stmt = conn
394            .prepare(
395                "SELECT id, namespace, created_at
396                 FROM memory_transcripts
397                 WHERE archived_at IS NULL",
398            )
399            .context("PREPARE archive_phase scan failed")?;
400        let rows = stmt
401            .query_map([], |r| {
402                Ok((
403                    r.get::<_, String>(0)?,
404                    r.get::<_, String>(1)?,
405                    r.get::<_, String>(2)?,
406                ))
407            })
408            .context("QUERY archive_phase scan failed")?;
409        rows.collect::<rusqlite::Result<Vec<_>>>()
410            .context("decode archive_phase rows")?
411    };
412
413    for (id, namespace, created_at) in live_candidates {
414        let resolved = cfg.resolve(&namespace);
415        match should_archive(conn, &id, &created_at, now, resolved) {
416            Ok(true) => {
417                let stamp = now.to_rfc3339();
418                if let Err(e) = conn.execute(
419                    "UPDATE memory_transcripts
420                        SET archived_at = ?1
421                      WHERE id = ?2 AND archived_at IS NULL",
422                    params![stamp, id],
423                ) {
424                    tracing::warn!(
425                        target: LIFECYCLE_TRACE_TARGET,
426                        "archive UPDATE failed for transcript {id}: {e}"
427                    );
428                    report.errors += 1;
429                } else {
430                    report.archived += 1;
431                }
432            }
433            Ok(false) => {}
434            Err(e) => {
435                tracing::warn!(
436                    target: LIFECYCLE_TRACE_TARGET,
437                    "archive eligibility check failed for transcript {id}: {e}"
438                );
439                report.errors += 1;
440            }
441        }
442    }
443    Ok(())
444}
445
446/// Phase-2 helper extracted for readability — see
447/// [`sweep_transcript_lifecycle`].
448fn prune_phase(
449    conn: &Connection,
450    cfg: &TranscriptsConfig,
451    now: DateTime<Utc>,
452    report: &mut SweepReport,
453) -> Result<()> {
454    let candidates: Vec<(String, String, String)> = {
455        let mut stmt = conn
456            .prepare(
457                "SELECT id, namespace, archived_at
458                 FROM memory_transcripts
459                 WHERE archived_at IS NOT NULL",
460            )
461            .context("PREPARE prune_phase scan failed")?;
462        let rows = stmt
463            .query_map([], |r| {
464                Ok((
465                    r.get::<_, String>(0)?,
466                    r.get::<_, String>(1)?,
467                    r.get::<_, String>(2)?,
468                ))
469            })
470            .context("QUERY prune_phase scan failed")?;
471        rows.collect::<rusqlite::Result<Vec<_>>>()
472            .context("decode prune_phase rows")?
473    };
474
475    for (id, namespace, archived_at) in candidates {
476        let resolved = cfg.resolve(&namespace);
477        let archived_at = match DateTime::parse_from_rfc3339(&archived_at) {
478            Ok(t) => t.with_timezone(&Utc),
479            Err(e) => {
480                tracing::warn!(
481                    target: LIFECYCLE_TRACE_TARGET,
482                    "transcript {id} has unparseable archived_at {archived_at:?}: {e}"
483                );
484                report.errors += 1;
485                continue;
486            }
487        };
488        let prune_at = archived_at + Duration::seconds(resolved.archive_grace_secs);
489        if prune_at >= now {
490            continue;
491        }
492        match conn.execute("DELETE FROM memory_transcripts WHERE id = ?1", params![id]) {
493            Ok(n) => report.pruned += n,
494            Err(e) => {
495                tracing::warn!(
496                    target: LIFECYCLE_TRACE_TARGET,
497                    "prune DELETE failed for transcript {id}: {e}"
498                );
499                report.errors += 1;
500            }
501        }
502    }
503    Ok(())
504}
505
506/// Decide whether a single transcript is archive-eligible at `now`
507/// given the resolved [`ResolvedTranscriptLifecycle`].
508///
509/// Returns `Ok(true)` only when BOTH conditions hold:
510///   1. `created_at + default_ttl_secs < now`
511///   2. every memory linked via `memory_transcript_links` has
512///      `expires_at` in the past, OR no memories link the transcript
513///      at all.
514///
515/// A memory whose `expires_at` is NULL counts as "live forever" and
516/// keeps the transcript live too — same as the substrate's
517/// [`purge_expired`] semantics for transcript rows themselves.
518fn should_archive(
519    conn: &Connection,
520    transcript_id: &str,
521    created_at: &str,
522    now: DateTime<Utc>,
523    resolved: ResolvedTranscriptLifecycle,
524) -> Result<bool> {
525    // Age cutoff first — cheaper than the join.
526    let created = DateTime::parse_from_rfc3339(created_at)
527        .with_context(|| format!("transcript {transcript_id} has unparseable created_at"))?
528        .with_timezone(&Utc);
529    let archive_at = created + Duration::seconds(resolved.default_ttl_secs);
530    if archive_at >= now {
531        return Ok(false);
532    }
533
534    // Aliveness check: count linked memories with NULL or future
535    // `expires_at`. SQLite returns 0 for the COUNT when the join is
536    // empty, so a transcript with no links is trivially eligible.
537    let now_str = now.to_rfc3339();
538    let alive: i64 = conn
539        .query_row(
540            "SELECT COUNT(*)
541               FROM memory_transcript_links l
542               JOIN memories m ON m.id = l.memory_id
543              WHERE l.transcript_id = ?1
544                AND (m.expires_at IS NULL OR m.expires_at > ?2)",
545            params![transcript_id, now_str],
546            |r| r.get(0),
547        )
548        .with_context(|| format!("alive-memory count failed for transcript {transcript_id}"))?;
549    Ok(alive == 0)
550}
551
552fn zstd_compress(input: &[u8]) -> Result<Vec<u8>> {
553    let mut out = Vec::with_capacity(input.len() / 4 + 64);
554    {
555        let mut encoder = zstd::stream::write::Encoder::new(&mut out, ZSTD_LEVEL)?;
556        encoder.write_all(input)?;
557        encoder.finish()?;
558    }
559    Ok(out)
560}
561
562/// v0.7.0 I1 hardening — bounded zstd decoder.
563///
564/// Streams the decoder one fixed-size chunk at a time and bails the
565/// moment the accumulated decompressed length would exceed
566/// [`MAX_DECOMPRESSED_BYTES`]. Without this cap a hostile writer with
567/// direct DB access could ship a small (~1 KB) zstd blob that decodes
568/// into gigabytes and OOMs the daemon (a classic decompression bomb).
569///
570/// On overflow the function returns an error AND emits a structured
571/// `tracing::warn!` line under the `transcripts.bomb` target so a
572/// downstream audit log captures the rejection without the SQLite
573/// row id (the caller does not pass it in here — the surrounding
574/// [`fetch`] caller logs the id alongside the bubbled error).
575fn zstd_decompress(input: &[u8]) -> Result<Vec<u8>> {
576    // Cap the initial allocation too — a blob whose compressed size
577    // alone is enormous is itself a smell, but `with_capacity` on a
578    // hostile input shouldn't reserve gigabytes upfront either.
579    let init_cap = std::cmp::min(input.len() * 4, MAX_DECOMPRESSED_BYTES);
580    let mut out = Vec::with_capacity(init_cap);
581    let mut decoder = zstd::stream::read::Decoder::new(input)?;
582    // 64 KiB read window — large enough to amortise syscall overhead
583    // on a normal-sized transcript, small enough to bound the
584    // post-overflow drain to a single buffer.
585    let mut buf = [0u8; 64 * 1024];
586    loop {
587        let n = decoder.read(&mut buf)?;
588        if n == 0 {
589            break;
590        }
591        if out.len().saturating_add(n) > MAX_DECOMPRESSED_BYTES {
592            tracing::warn!(
593                target: "transcripts.bomb",
594                cap_bytes = MAX_DECOMPRESSED_BYTES,
595                so_far = out.len(),
596                "rejecting transcript: decompressed size would exceed cap"
597            );
598            return Err(anyhow!(
599                "transcript decompression exceeded {} byte cap (decompression bomb defence)",
600                MAX_DECOMPRESSED_BYTES
601            ));
602        }
603        out.extend_from_slice(&buf[..n]);
604    }
605    Ok(out)
606}
607
// -----------------------------------------------------------------
// L0.7-2 Tier A — transcripts/storage tests
// All paths exercised over `:memory:` SQLite via `crate::db::open` so
// the daemon's schema is applied. No /tmp writes.
// -----------------------------------------------------------------
613#[cfg(test)]
614mod tests {
615    use super::*;
616    use rusqlite::Connection;
617
618    fn fresh_db() -> Connection {
619        crate::db::open(std::path::Path::new(":memory:")).expect("open in-memory db")
620    }
621
622    fn insert_memory(conn: &Connection, id: &str, expires_at: Option<&str>) {
623        let now = Utc::now().to_rfc3339();
624        // v0.7.0 fix campaign R1-M2 — substrate CHECK trigger now
625        // enforces tier ∈ {short, mid, long}. Pre-fix this fixture
626        // wrote `'short_term'`; that string was always semantically
627        // wrong (`Tier::Short.as_str()` == "short").
628        conn.execute(
629            "INSERT INTO memories (
630                id, tier, namespace, title, content, expires_at, created_at, updated_at
631             ) VALUES (?1, 'short', 'ns', ?2, 'body', ?3, ?4, ?4)",
632            rusqlite::params![id, format!("title-{id}"), expires_at, now],
633        )
634        .expect("insert test memory");
635    }
636
637    #[test]
638    fn store_and_fetch_round_trips_content() {
639        let conn = fresh_db();
640        let body = "hello transcripts";
641        let t = store(&conn, "ns-x", body, None).expect("store ok");
642        assert_eq!(t.namespace, "ns-x");
643        assert!(t.compressed_size > 0);
644        assert_eq!(t.original_size, body.len() as i64);
645        let back = fetch(&conn, &t.id).expect("fetch ok").expect("present");
646        assert_eq!(back, body);
647    }
648
649    #[test]
650    fn store_with_ttl_sets_expires_at() {
651        let conn = fresh_db();
652        let t = store(&conn, "ns-x", "body", Some(Duration::seconds(120))).expect("store ok");
653        assert!(t.expires_at.is_some());
654    }
655
656    #[test]
657    fn fetch_missing_id_returns_none() {
658        let conn = fresh_db();
659        let r = fetch(&conn, "no-such-id").expect("query ok");
660        assert!(r.is_none());
661    }
662
663    #[test]
664    fn fetch_metadata_returns_handle_without_blob() {
665        let conn = fresh_db();
666        let t = store(&conn, "ns-x", "body", None).expect("store ok");
667        let meta = fetch_metadata(&conn, &t.id)
668            .expect("query ok")
669            .expect("present");
670        assert_eq!(meta.id, t.id);
671        assert_eq!(meta.namespace, "ns-x");
672        assert_eq!(meta.original_size, t.original_size);
673    }
674
675    #[test]
676    fn fetch_metadata_missing_returns_none() {
677        let conn = fresh_db();
678        let r = fetch_metadata(&conn, "no-such-id").expect("query ok");
679        assert!(r.is_none());
680    }
681
682    #[test]
683    fn purge_expired_removes_only_past_due_rows() {
684        let conn = fresh_db();
685        // Past: 1 hour ago
686        let _live = store(&conn, "ns-x", "live", None).expect("store live");
687        // Manually set an expires_at in the past on a second row.
688        let past = store(&conn, "ns-x", "past", None).expect("store past");
689        conn.execute(
690            "UPDATE memory_transcripts SET expires_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
691            rusqlite::params![past.id],
692        )
693        .unwrap();
694        let n = purge_expired(&conn).expect("purge ok");
695        assert_eq!(n, 1, "exactly one past-expiry row");
696        assert!(fetch(&conn, &past.id).unwrap().is_none());
697    }
698
699    #[test]
700    fn link_and_transcripts_for_memory_round_trip() {
701        let conn = fresh_db();
702        insert_memory(&conn, "m1", None);
703        let t = store(&conn, "ns-x", "body", None).expect("store ok");
704        link_transcript(&conn, "m1", &t.id, Some(0), Some(4)).expect("link ok");
705        let links = transcripts_for_memory(&conn, "m1").expect("query ok");
706        assert_eq!(links.len(), 1);
707        assert_eq!(links[0].memory_id, "m1");
708        assert_eq!(links[0].transcript_id, t.id);
709        assert_eq!(links[0].span_start, Some(0));
710        assert_eq!(links[0].span_end, Some(4));
711    }
712
713    #[test]
714    fn memories_for_transcript_round_trip() {
715        let conn = fresh_db();
716        insert_memory(&conn, "m1", None);
717        insert_memory(&conn, "m2", None);
718        let t = store(&conn, "ns-x", "body", None).expect("store ok");
719        link_transcript(&conn, "m1", &t.id, None, None).expect("link ok");
720        link_transcript(&conn, "m2", &t.id, None, None).expect("link ok");
721        let mems = memories_for_transcript(&conn, &t.id).expect("query ok");
722        assert_eq!(mems.len(), 2);
723        // Ordered by memory_id alphabetically per the SQL spec
724        assert_eq!(mems[0].memory_id, "m1");
725        assert_eq!(mems[1].memory_id, "m2");
726    }
727
728    #[test]
729    fn link_transcript_replaces_on_duplicate_pair() {
730        let conn = fresh_db();
731        insert_memory(&conn, "m1", None);
732        let t = store(&conn, "ns-x", "body", None).expect("store ok");
733        link_transcript(&conn, "m1", &t.id, Some(0), Some(4)).expect("link ok");
734        // Re-link the same pair with different span — INSERT OR REPLACE.
735        link_transcript(&conn, "m1", &t.id, Some(2), Some(10)).expect("relink ok");
736        let links = transcripts_for_memory(&conn, "m1").expect("query ok");
737        assert_eq!(links.len(), 1);
738        assert_eq!(links[0].span_start, Some(2));
739        assert_eq!(links[0].span_end, Some(10));
740    }
741
742    #[test]
743    fn sweep_archives_aged_rows_with_no_links() {
744        let conn = fresh_db();
745        let t = store(&conn, "ns-x", "old", None).expect("store ok");
746        // Backdate created_at far enough that the default TTL fires.
747        conn.execute(
748            "UPDATE memory_transcripts SET created_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
749            rusqlite::params![t.id],
750        )
751        .unwrap();
752        let cfg = TranscriptsConfig::default();
753        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
754        assert!(report.archived >= 1, "expected archive: {report:?}");
755    }
756
757    #[test]
758    fn sweep_prunes_archived_rows_past_grace() {
759        let conn = fresh_db();
760        let t = store(&conn, "ns-x", "old", None).expect("store ok");
761        // Mark archived a long time ago so the grace window has elapsed.
762        conn.execute(
763            "UPDATE memory_transcripts SET archived_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1",
764            rusqlite::params![t.id],
765        )
766        .unwrap();
767        let cfg = TranscriptsConfig::default();
768        let report = sweep_transcript_lifecycle(&conn, &cfg).expect("sweep ok");
769        assert_eq!(report.pruned, 1, "expected prune: {report:?}");
770        assert!(fetch_metadata(&conn, &t.id).unwrap().is_none());
771    }
772
#[test]
fn sweep_skips_live_rows() {
    let db = fresh_db();
    let transcript = store(&db, "ns-x", "fresh body", None).expect("store ok");

    let report =
        sweep_transcript_lifecycle(&db, &TranscriptsConfig::default()).expect("sweep ok");

    // The row was stored moments ago, so the sweep must neither archive
    // nor prune anything.
    assert_eq!((report.archived, report.pruned), (0, 0));

    // And the row itself must still be retrievable.
    let still_there = fetch_metadata(&db, &transcript.id).unwrap();
    assert!(still_there.is_some());
}
784
#[test]
fn sweep_skips_archive_when_memory_still_alive() {
    // The archive phase only fires once every linked memory has expired,
    // so a single live memory must keep the transcript alive.
    let db = fresh_db();

    // expires_at is NULL here, i.e. the memory stays live forever.
    insert_memory(&db, "m1", None);
    let transcript = store(&db, "ns-x", "body", None).expect("store ok");
    link_transcript(&db, "m1", &transcript.id, None, None).expect("link ok");

    // Backdate the transcript so that its age alone would qualify it.
    let backdate_sql =
        "UPDATE memory_transcripts SET created_at = '2000-01-01T00:00:00+00:00' WHERE id = ?1";
    db.execute(backdate_sql, rusqlite::params![transcript.id])
        .unwrap();

    let report =
        sweep_transcript_lifecycle(&db, &TranscriptsConfig::default()).expect("sweep ok");

    // The linked memory is still live, so nothing may be archived.
    assert_eq!(
        report.archived, 0,
        "live memory keeps transcript: {report:?}"
    );
}
807
#[test]
fn sweep_handles_unparseable_archived_at() {
    // An archived row whose archived_at cannot be parsed must be tallied
    // as an error and skipped by the prune phase rather than deleted.
    let db = fresh_db();
    let transcript = store(&db, "ns-x", "body", None).expect("store ok");

    let corrupt_sql = "UPDATE memory_transcripts SET archived_at = 'not-a-date' WHERE id = ?1";
    db.execute(corrupt_sql, rusqlite::params![transcript.id])
        .unwrap();

    let report =
        sweep_transcript_lifecycle(&db, &TranscriptsConfig::default()).expect("sweep ok");
    assert!(report.errors >= 1, "expected error tally: {report:?}");
    assert_eq!(report.pruned, 0, "unparseable row must not be pruned");
}
824
#[test]
fn should_archive_returns_false_when_within_ttl() {
    let db = fresh_db();

    // created_at is left untouched (effectively "now"), so the age cutoff
    // alone must reject the row.
    let transcript = store(&db, "ns-x", "fresh", None).expect("store ok");

    let config = TranscriptsConfig::default();
    let lifecycle = config.resolve("ns-x");
    let now = Utc::now();

    let eligible = super::should_archive(
        &db,
        &transcript.id,
        &transcript.created_at,
        now,
        lifecycle,
    )
    .expect("should_archive ok");
    assert!(!eligible, "fresh row must not be archive-eligible");
}
837
#[test]
fn sweep_archive_phase_tallies_should_archive_failure() {
    // When should_archive itself returns Err (here: the row carries an
    // unparseable created_at), the archive phase must bump the errors
    // counter instead of archiving the row.
    let db = fresh_db();
    let transcript = store(&db, "ns-x", "body", None).expect("store ok");

    let corrupt_sql = "UPDATE memory_transcripts SET created_at = 'not-a-date' WHERE id = ?1";
    db.execute(corrupt_sql, rusqlite::params![transcript.id])
        .unwrap();

    let report =
        sweep_transcript_lifecycle(&db, &TranscriptsConfig::default()).expect("sweep ok");
    assert!(report.errors >= 1, "expected error tally: {report:?}");
    assert_eq!(report.archived, 0);
}
855
#[test]
fn should_archive_propagates_unparseable_created_at() {
    let db = fresh_db();
    let config = TranscriptsConfig::default();
    let lifecycle = config.resolve("ns-x");

    // A created_at string that is not a timestamp must surface as an
    // error to the caller.
    let outcome = super::should_archive(&db, "id", "not-a-date", Utc::now(), lifecycle);

    // Render with the alternate flag before searching for the marker text.
    let msg = format!("{:#}", outcome.unwrap_err());
    assert!(msg.contains("unparseable created_at"), "got: {msg}");
}
866
#[test]
fn zstd_round_trip_decodes_to_original() {
    // The payload mixes ASCII text with NUL and low control bytes so the
    // round trip is exercised on non-textual data as well.
    let original = b"some non-trivial bytes \x00\x01\x02 with binary";

    let compressed = super::zstd_compress(original).expect("compress");
    let restored = super::zstd_decompress(&compressed).expect("decompress");

    assert_eq!(restored, original);
}
874
#[test]
fn zstd_decompress_rejects_oversized_blob() {
    // Cheapest way to get a blob that inflates past the cap: compress a
    // run of zeros 1 KiB longer than MAX_DECOMPRESSED_BYTES. zstd shrinks
    // it to a small blob, but decompression must trip the cap.
    let oversized_len = super::MAX_DECOMPRESSED_BYTES + 1024;
    let oversized = vec![0u8; oversized_len];
    let blob = super::zstd_compress(&oversized).expect("compress");

    let msg = super::zstd_decompress(&blob).unwrap_err().to_string();
    assert!(msg.contains("decompression bomb"), "got: {msg}");
}
886
#[test]
fn fetch_invalid_utf8_blob_returns_error() {
    // Swap a stored transcript's content_blob for a well-formed zstd frame
    // whose payload is not valid UTF-8; fetch must then report a UTF-8
    // decoding error.
    let db = fresh_db();
    let transcript = store(&db, "ns-x", "placeholder", None).expect("store");

    let invalid_utf8: [u8; 3] = [0xFF, 0xFE, 0xFD];
    let bad_blob = super::zstd_compress(&invalid_utf8).expect("compress bad utf8");
    let overwrite_sql = "UPDATE memory_transcripts SET content_blob = ?1 WHERE id = ?2";
    db.execute(overwrite_sql, rusqlite::params![bad_blob, transcript.id])
        .unwrap();

    let msg = format!("{:#}", fetch(&db, &transcript.id).unwrap_err());

    // Either spelling of the encoding name is accepted in the message.
    let mentions_utf8 = ["UTF-8", "utf"].iter().any(|needle| msg.contains(*needle));
    assert!(mentions_utf8, "got: {msg}");
}
904}