Skip to main content

ai_memory/offload/
mod.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 QW-3 — context-offload substrate primitive.
5//!
6//! Substrate plumbing for the offload+deref pattern absorbed from
7//! the Tencent comparison (2026-05-15). The FULL pattern (Mermaid
8//! canvas, auto-cadence, node_id integration) targets v0.8.0; this
9//! module ships the substrate so v0.8.0 has plumbing to call.
10//!
11//! # Pipeline
12//!
13//! - SHA-256 over the original bytes (decompressed) is the integrity
14//!   commitment.
//! - `ref_id` format: `ofl_<base32-of-sha256-first-8-bytes>`. 13 chars
//!   of payload after the `ofl_` prefix — short enough to keep in an
//!   agent's working window, long enough that a 64-bit prefix
//!   collision is vanishingly rare for typical fleet scales.
19//! - Body compressed with zstd level 3 — matches `memory_transcripts`
20//!   (the existing sidechain transcripts pipeline) for cross-codebase
21//!   parity.
//! - Ed25519 signature is over the canonical bundle
//!   `{ ref_id, content_sha256, stored_at, namespace }` encoded as
//!   CBOR with map keys emitted in sorted (string-lexicographic)
//!   order — a fixed, reproducible layout, though not the length-first
//!   key order of RFC 8949 §4.2.1. Same encoder family as
//!   `identity::sign::canonical_cbor` (the H2 link signer).
26//! - A sibling row lands in `signed_events` with `event_type =
27//!   context_offloaded` or `context_dereferenced`, binding the
28//!   substrate write into the H5 audit chain.
29//!
30//! # Tamper handling
31//!
32//! `deref` recomputes the SHA-256 of the freshly-decompressed bytes
33//! and refuses with `OffloadError::IntegrityFailed` when it disagrees
34//! with the stored `content_sha256`. The signature is verified against
35//! the storing agent's public key when that key is provided to the
36//! offloader at construction; absent the key, the integrity check
37//! alone is the load-bearing tamper guard.
38//!
39//! # Out of scope (v0.7.0)
40//!
41//! - Mermaid canvas integration (v0.8.0).
42//! - Auto-cadence trigger from the recall pipeline (v0.8.0).
43//! - `node_id` cross-link into the `memories` table (v0.8.0).
44
45use std::collections::BTreeMap;
46use std::time::{SystemTime, UNIX_EPOCH};
47
48use anyhow::{Context, Result, anyhow};
49use base64::Engine;
50use base64::engine::general_purpose::URL_SAFE_NO_PAD;
51use ed25519_dalek::{Signer, Verifier};
52use rusqlite::{Connection, OptionalExtension, params};
53use sha2::{Digest, Sha256};
54
55use crate::identity::keypair::AgentKeypair;
56use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
57
/// Default zstd compression level — matches the sidechain transcripts
/// pipeline (`transcripts::storage::ZSTD_LEVEL`). Passed to the zstd
/// encoder for every offloaded body.
const ZSTD_LEVEL: i32 = 3;

/// Hard cap on the decompressed size of a single offloaded blob. Same
/// 16 MiB ceiling the transcripts module enforces — defends against
/// pathological zstd bombs landing through `deref`. v0.8.0 may raise
/// this for the Mermaid-canvas use case after threat-modelling.
///
/// Decompression aborts with an error as soon as the output would grow
/// past this many bytes.
pub const MAX_DECOMPRESSED_BYTES: usize = 16 * 1024 * 1024;

/// Default per-blob byte limit when no namespace policy override is set.
/// 1 MiB — Tencent's offload primitive uses ~256 KB chunks; 1 MiB
/// gives headroom for batched tool outputs without crossing the
/// hostile-bomb threshold above.
///
/// Used as the `max_offload_blob_bytes` value of `OffloadConfig::default()`.
pub const DEFAULT_MAX_OFFLOAD_BLOB_BYTES: u32 = 1_048_576;

/// RFC 4648 base32 alphabet (without padding). Used to encode the
/// 8-byte prefix of the content's SHA-256 into a 13-char `ref_id`
/// body. Hand-rolled to avoid pulling a one-trick crate (no `base32` /
/// `data-encoding` is currently in the dependency tree).
const BASE32_ALPHABET: &[u8; 32] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";

/// `ref_id` prefix — `ofl_` keeps the offload class identifiable in
/// audit logs, recall queries, and operator dashboards. Pairs with the
/// `mem-` / `lnk-` ergonomic convention used elsewhere in the
/// substrate.
const REF_ID_PREFIX: &str = "ofl_";
85
/// Outcome of [`ContextOffloader::offload`]. Callers persist
/// `ref_id` and discard the content payload — that is the whole
/// point of offload+deref.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffloadResult {
    /// Handle for the stored blob: the `ofl_` prefix followed by the
    /// base32 encoding of the first 8 bytes of the content's SHA-256.
    pub ref_id: String,
    /// Lower-case hex SHA-256 of the original (uncompressed) content.
    pub content_sha256: String,
    /// Unix timestamp (seconds) captured when the blob was written.
    pub stored_at: i64,
}
95
/// Outcome of [`ContextOffloader::deref`]. Returns the original
/// (decompressed) content alongside the metadata that committed it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DerefResult {
    /// Decompressed, UTF-8-validated content whose SHA-256 matched the
    /// stored hash.
    pub content: String,
    /// Unix timestamp (seconds) read back from the stored row.
    pub stored_at: i64,
    /// The stored lower-case hex SHA-256 the content was checked against.
    pub sha256: String,
}
104
/// Typed failure modes of the offload substrate that callers may want
/// to branch on: size-policy rejections, integrity failures, signature
/// mismatches, and missing rows. Everything else surfaces as a plain
/// `anyhow::Error`. `Display` and `std::error::Error` are written by
/// hand so the optional `thiserror` crate stays out of the default
/// feature set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OffloadError {
    /// The content was `actual` bytes long, above the configured `limit`.
    SizeLimitExceeded { actual: usize, limit: usize },
    /// The stored blob no longer matches its committed SHA-256.
    IntegrityFailed { ref_id: String },
    /// The stored Ed25519 signature did not verify.
    SignatureFailed { ref_id: String },
    /// No readable row exists for `ref_id`.
    NotFound { ref_id: String },
}

impl std::fmt::Display for OffloadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The three per-blob variants share one sentence frame; only the
        // size-limit message has its own wording.
        let (blob, outcome) = match self {
            Self::SizeLimitExceeded { actual, limit } => {
                return write!(f, "offload blob {actual} bytes exceeds policy max {limit}");
            }
            Self::IntegrityFailed { ref_id } => {
                (ref_id, "integrity check failed (content tampered)")
            }
            Self::SignatureFailed { ref_id } => (ref_id, "signature verification failed"),
            Self::NotFound { ref_id } => (ref_id, "not found"),
        };
        write!(f, "offloaded blob {blob} {outcome}")
    }
}

impl std::error::Error for OffloadError {}
137
/// Static configuration consumed by [`ContextOffloader`].
#[derive(Debug, Clone)]
pub struct OffloadConfig {
    /// Hard ceiling on the decompressed content length (bytes).
    /// `offload` rejects anything longer with
    /// [`OffloadError::SizeLimitExceeded`]. Callers can shrink this
    /// below [`DEFAULT_MAX_OFFLOAD_BLOB_BYTES`] per namespace via the
    /// `max_offload_blob_bytes` policy knob in v0.8.0 (substrate-only
    /// in v0.7.0).
    pub max_offload_blob_bytes: u32,
    /// Default TTL applied when the caller passes `ttl_seconds = None`.
    /// `None` (the default) means "permanent until explicit operator
    /// delete".
    pub default_offload_ttl_seconds: Option<u64>,
}

/// Defaults: [`DEFAULT_MAX_OFFLOAD_BLOB_BYTES`] as the size ceiling and
/// no default TTL.
impl Default for OffloadConfig {
    fn default() -> Self {
        Self {
            max_offload_blob_bytes: DEFAULT_MAX_OFFLOAD_BLOB_BYTES,
            default_offload_ttl_seconds: None,
        }
    }
}
160
/// Substrate-level engine for offload+deref. Composed from the
/// caller's keypair, the existing SQLite connection, and the
/// `OffloadConfig` defaults.
pub struct ContextOffloader<'a> {
    /// Connection used for the `offloaded_blobs` rows and for the
    /// sibling audit events.
    conn: &'a Connection,
    /// Keypair used to sign on `offload` and to verify on `deref`.
    /// `None` stores rows unsigned and skips verification.
    signer: Option<&'a AgentKeypair>,
    /// Size ceiling and default TTL.
    config: OffloadConfig,
}
169
170impl<'a> ContextOffloader<'a> {
171    /// Construct a new offloader. Pass `signer = None` for read-only
172    /// `deref` workflows.
173    #[must_use]
174    pub fn new(
175        conn: &'a Connection,
176        signer: Option<&'a AgentKeypair>,
177        config: OffloadConfig,
178    ) -> Self {
179        Self {
180            conn,
181            signer,
182            config,
183        }
184    }
185
186    /// Offload `content` and return the `ref_id` callers persist in
187    /// place of the full payload.
188    ///
189    /// # Errors
190    ///
191    /// - [`OffloadError::SizeLimitExceeded`] when `content` is larger
192    ///   than the configured per-blob ceiling.
193    /// - `anyhow::Error` for zstd / SQLite / signing failures.
194    pub fn offload(
195        &self,
196        content: &str,
197        namespace: &str,
198        ttl_seconds: Option<u64>,
199        agent_id: &str,
200    ) -> Result<OffloadResult> {
201        let limit = self.config.max_offload_blob_bytes as usize;
202        if content.len() > limit {
203            return Err(anyhow!(OffloadError::SizeLimitExceeded {
204                actual: content.len(),
205                limit,
206            }));
207        }
208        // SHA-256 of the original bytes — the integrity commitment.
209        let sha = sha256_hex(content.as_bytes());
210        let ref_id = ref_id_from_sha(&sha);
211        let stored_at = now_unix_seconds();
212        let effective_ttl = ttl_seconds.or(self.config.default_offload_ttl_seconds);
213        // Compress AFTER the integrity hash is taken — the stored
214        // sha256 commits to the ORIGINAL bytes so a future codec
215        // upgrade can decode legacy rows without breaking the
216        // integrity check.
217        let blob = zstd_compress(content.as_bytes()).context("zstd compression failed")?;
218
219        // Canonical signing bundle. `i64::try_from` keeps the encoded
220        // bytes byte-stable for the verifier; an `i128`-shaped value
221        // would never survive the round-trip through SQLite anyway.
222        let stored_at_signed: i64 = stored_at;
223        let signature_b64 = if let Some(keypair) = self.signer {
224            let payload = canonical_payload(&ref_id, &sha, stored_at_signed, namespace)?;
225            let signing = keypair.private.as_ref().with_context(|| {
226                format!(
227                    "AgentKeypair for {} has no private key — cannot sign offload",
228                    keypair.agent_id
229                )
230            })?;
231            URL_SAFE_NO_PAD.encode(signing.sign(&payload).to_bytes())
232        } else {
233            String::new()
234        };
235
236        let ttl_param: Option<i64> = effective_ttl.and_then(|n| i64::try_from(n).ok());
237        self.conn
238            .execute(
239                "INSERT INTO offloaded_blobs (
240                    ref_id, namespace, content_zstd, content_sha256,
241                    stored_at, ttl_seconds, agent_id, signature_b64
242                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
243                 ON CONFLICT(ref_id) DO UPDATE SET
244                    namespace = excluded.namespace,
245                    content_zstd = excluded.content_zstd,
246                    content_sha256 = excluded.content_sha256,
247                    stored_at = excluded.stored_at,
248                    ttl_seconds = excluded.ttl_seconds,
249                    agent_id = excluded.agent_id,
250                    signature_b64 = excluded.signature_b64",
251                params![
252                    ref_id,
253                    namespace,
254                    blob,
255                    sha,
256                    stored_at,
257                    ttl_param,
258                    agent_id,
259                    signature_b64,
260                ],
261            )
262            .context("INSERT into offloaded_blobs failed")?;
263
264        // Audit: sibling row in signed_events binds this write to the
265        // H5 cross-row hash chain so a downstream auditor can replay
266        // the exact offload event without diffing the mutable
267        // offloaded_blobs table.
268        append_audit_row(
269            self.conn,
270            agent_id,
271            "context_offloaded",
272            &ref_id,
273            &sha,
274            namespace,
275            stored_at_signed,
276            &signature_b64,
277        )?;
278
279        Ok(OffloadResult {
280            ref_id,
281            content_sha256: sha,
282            stored_at: stored_at_signed,
283        })
284    }
285
286    /// Dereference a `ref_id` and return the original content.
287    ///
288    /// # IDOR (SEC-4, Cluster D, issue #767)
289    ///
290    /// `caller_agent_id` is the **authenticated** identity of the
291    /// caller (resolved via [`crate::identity::resolve_agent_id`]
292    /// upstream). The stored row's `agent_id` is consulted as the
293    /// owner of the blob; when the caller is not the owner, this
294    /// function returns [`OffloadError::NotFound`] (leak-resistant
295    /// — does NOT reveal the blob exists). A future K9 cross-agent
296    /// grant check can layer on top of this; pass `None` to BYPASS
297    /// the ownership gate (substrate-internal sweepers, integrity
298    /// audits, operator dump tools — none of which originate from
299    /// an authenticated agent context).
300    ///
301    /// # Errors
302    ///
303    /// - [`OffloadError::NotFound`] when `ref_id` has no row OR
304    ///   when the caller is not the stored owner (leak-resistant).
305    /// - [`OffloadError::IntegrityFailed`] when the decompressed
306    ///   content's SHA-256 disagrees with the stored hash (tamper).
307    /// - [`OffloadError::SignatureFailed`] when a signer was provided
308    ///   and the stored Ed25519 signature fails to verify.
309    pub fn deref(&self, ref_id: &str, caller_agent_id: Option<&str>) -> Result<DerefResult> {
310        let row: Option<(Vec<u8>, String, i64, String, String, String)> = self
311            .conn
312            .query_row(
313                "SELECT content_zstd, content_sha256, stored_at, namespace,
314                        agent_id, signature_b64
315                 FROM offloaded_blobs WHERE ref_id = ?1",
316                params![ref_id],
317                |r| {
318                    Ok((
319                        r.get(0)?,
320                        r.get(1)?,
321                        r.get(2)?,
322                        r.get(3)?,
323                        r.get(4)?,
324                        r.get(5)?,
325                    ))
326                },
327            )
328            .optional()
329            .context("SELECT offloaded_blobs failed")?;
330
331        let (blob, stored_sha, stored_at, namespace, agent_id, signature_b64) =
332            row.ok_or_else(|| {
333                anyhow!(OffloadError::NotFound {
334                    ref_id: ref_id.to_string(),
335                })
336            })?;
337
338        // SEC-4 (Cluster D, issue #767) — IDOR gate. The MCP
339        // `handle_deref` handler always passes an authenticated
340        // `caller_agent_id`; substrate-internal callers that
341        // legitimately need to reach any row (TTL sweeper, integrity
342        // audit, operator dump) pass `None` to BYPASS this check.
343        // Mismatch maps to NotFound (not Unauthorized) so a casual
344        // probe cannot enumerate ref_ids by error-message
345        // differentiation.
346        if let Some(caller) = caller_agent_id
347            && caller != agent_id
348        {
349            tracing::info!(
350                ref_id = %ref_id,
351                caller = %caller,
352                "SEC-4: handle_deref ownership mismatch — surfacing NotFound (leak-resistant)"
353            );
354            return Err(anyhow!(OffloadError::NotFound {
355                ref_id: ref_id.to_string(),
356            }));
357        }
358
359        // Optional: verify the signature against the supplied key BEFORE
360        // decompressing. Catches tampered blobs early without the zstd
361        // round-trip cost. Skipped when the offloader has no keypair
362        // (read-only workflows).
363        if let Some(keypair) = self.signer {
364            if !signature_b64.is_empty() {
365                let payload = canonical_payload(ref_id, &stored_sha, stored_at, &namespace)?;
366                let sig_bytes = URL_SAFE_NO_PAD
367                    .decode(signature_b64.as_bytes())
368                    .context("decode stored signature_b64")?;
369                let sig_arr: [u8; 64] = sig_bytes
370                    .as_slice()
371                    .try_into()
372                    .context("stored signature is not 64 bytes")?;
373                let sig = ed25519_dalek::Signature::from_bytes(&sig_arr);
374                if keypair.public.verify(&payload, &sig).is_err() {
375                    return Err(anyhow!(OffloadError::SignatureFailed {
376                        ref_id: ref_id.to_string(),
377                    }));
378                }
379            }
380        }
381
382        let bytes = zstd_decompress(&blob).context("zstd decompression failed")?;
383        // Refuse to surface non-UTF-8 content — the offload API is
384        // string-shaped at the input boundary, so a non-UTF-8 stream
385        // is by definition a tampered or corrupted row.
386        let content = String::from_utf8(bytes).map_err(|_| OffloadError::IntegrityFailed {
387            ref_id: ref_id.to_string(),
388        })?;
389        let recomputed = sha256_hex(content.as_bytes());
390        if recomputed != stored_sha {
391            return Err(anyhow!(OffloadError::IntegrityFailed {
392                ref_id: ref_id.to_string(),
393            }));
394        }
395
396        append_audit_row(
397            self.conn,
398            &agent_id,
399            "context_dereferenced",
400            ref_id,
401            &stored_sha,
402            &namespace,
403            stored_at,
404            &signature_b64,
405        )?;
406
407        Ok(DerefResult {
408            content,
409            stored_at,
410            sha256: stored_sha,
411        })
412    }
413}
414
415/// SHA-256 hex-string helper.
416fn sha256_hex(input: &[u8]) -> String {
417    let mut hasher = Sha256::new();
418    hasher.update(input);
419    bytes_to_hex(&hasher.finalize())
420}
421
/// Render `bytes` as lower-case hexadecimal, two characters per byte.
/// Hand-rolled so the module does not need a `hex` crate dependency.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];
    bytes
        .iter()
        // High nibble first, then low nibble.
        .flat_map(|octet| {
            [
                DIGITS[usize::from(octet >> 4)],
                DIGITS[usize::from(octet & 0x0F)],
            ]
        })
        .collect()
}
433
434/// RFC 4648 base32 (no padding) of `bytes`. Matches the alphabet used
435/// in standard CLI output for short identifiers.
436fn base32_encode(bytes: &[u8]) -> String {
437    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
438    let mut buffer: u32 = 0;
439    let mut bits: u32 = 0;
440    for byte in bytes {
441        buffer = (buffer << 8) | u32::from(*byte);
442        bits += 8;
443        while bits >= 5 {
444            bits -= 5;
445            let idx = ((buffer >> bits) & 0x1F) as usize;
446            out.push(BASE32_ALPHABET[idx] as char);
447        }
448    }
449    if bits > 0 {
450        let idx = ((buffer << (5 - bits)) & 0x1F) as usize;
451        out.push(BASE32_ALPHABET[idx] as char);
452    }
453    out
454}
455
456/// `ofl_<base32-of-sha256-first-8-bytes>`. Pure function of the
457/// hex-encoded SHA so callers can reconstruct the id offline.
458fn ref_id_from_sha(sha_hex: &str) -> String {
459    // First 8 bytes = first 16 hex chars.
460    let mut first_8 = [0u8; 8];
461    for (i, byte) in first_8.iter_mut().enumerate() {
462        let hi = hex_nibble(sha_hex.as_bytes()[i * 2]);
463        let lo = hex_nibble(sha_hex.as_bytes()[i * 2 + 1]);
464        *byte = (hi << 4) | lo;
465    }
466    format!("{REF_ID_PREFIX}{}", base32_encode(&first_8))
467}
468
/// Decode one ASCII hex digit (either case) to its value in `0..=15`.
/// Any other byte decodes to 0.
fn hex_nibble(byte: u8) -> u8 {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
        .unwrap_or(0)
}
477
478/// RFC 8949 §4.2.1 deterministic CBOR over `{ ref_id, content_sha256,
479/// stored_at, namespace }`. Same encoder family as the H2 link
480/// signer; map keys are sorted lexicographically by the underlying
481/// BTreeMap iteration.
482fn canonical_payload(
483    ref_id: &str,
484    content_sha256: &str,
485    stored_at: i64,
486    namespace: &str,
487) -> Result<Vec<u8>> {
488    let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
489    map.insert(
490        crate::models::field_names::CONTENT_SHA256,
491        ciborium::Value::Text(content_sha256.to_string()),
492    );
493    map.insert("namespace", ciborium::Value::Text(namespace.to_string()));
494    map.insert("ref_id", ciborium::Value::Text(ref_id.to_string()));
495    map.insert("stored_at", ciborium::Value::Integer(stored_at.into()));
496    let value = ciborium::Value::Map(
497        map.into_iter()
498            .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
499            .collect(),
500    );
501    let mut buf = Vec::new();
502    ciborium::into_writer(&value, &mut buf).context("encode canonical offload payload")?;
503    Ok(buf)
504}
505
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Returns 0 if the clock reads earlier than the epoch and saturates at
/// `i64::MAX` if the second count does not fit in an `i64`.
fn now_unix_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
512
513fn zstd_compress(input: &[u8]) -> Result<Vec<u8>> {
514    use std::io::Write;
515    let mut out = Vec::with_capacity(input.len() / 4 + 64);
516    {
517        let mut encoder = zstd::stream::write::Encoder::new(&mut out, ZSTD_LEVEL)?;
518        encoder.write_all(input)?;
519        encoder.finish()?;
520    }
521    Ok(out)
522}
523
524fn zstd_decompress(input: &[u8]) -> Result<Vec<u8>> {
525    use std::io::Read;
526    let init_cap = std::cmp::min(input.len() * 4, MAX_DECOMPRESSED_BYTES);
527    let mut out = Vec::with_capacity(init_cap);
528    let mut decoder = zstd::stream::read::Decoder::new(input)?;
529    let mut buf = [0u8; 64 * 1024];
530    loop {
531        let n = decoder.read(&mut buf)?;
532        if n == 0 {
533            break;
534        }
535        if out.len().saturating_add(n) > MAX_DECOMPRESSED_BYTES {
536            return Err(anyhow!(
537                "offloaded blob decompression exceeded {MAX_DECOMPRESSED_BYTES} byte cap"
538            ));
539        }
540        out.extend_from_slice(&buf[..n]);
541    }
542    Ok(out)
543}
544
545/// Audit-row helper. Keeps the offload + deref call sites
546/// symmetrical — both event types commit to the same canonical
547/// payload bytes, so a downstream verifier can re-derive the hash
548/// without branching on event type.
549fn append_audit_row(
550    conn: &Connection,
551    agent_id: &str,
552    event_type: &str,
553    ref_id: &str,
554    content_sha256: &str,
555    namespace: &str,
556    stored_at: i64,
557    signature_b64: &str,
558) -> Result<()> {
559    let payload = canonical_payload(ref_id, content_sha256, stored_at, namespace)?;
560    let hash = payload_hash(&payload);
561    let signature_bytes = if signature_b64.is_empty() {
562        None
563    } else {
564        Some(
565            URL_SAFE_NO_PAD
566                .decode(signature_b64.as_bytes())
567                .context("decode signature_b64 for audit row")?,
568        )
569    };
570    // #1438 fix: prior code wrote orphan attest_level="signed" — not a valid
571    // AttestLevel variant, leaking a fourth-shape that AttestLevel::from_str
572    // silently downgraded to None. The offload audit row is signed locally
573    // by the daemon, so the canonical variant is SelfSigned.
574    let attest_level = if signature_bytes.is_some() {
575        crate::models::AttestLevel::SelfSigned.as_str()
576    } else {
577        crate::models::AttestLevel::Unsigned.as_str()
578    };
579    let event = SignedEvent {
580        id: uuid::Uuid::new_v4().to_string(),
581        agent_id: agent_id.to_string(),
582        event_type: event_type.to_string(),
583        payload_hash: hash,
584        signature: signature_bytes,
585        attest_level: attest_level.to_string(),
586        timestamp: chrono::Utc::now().to_rfc3339(),
587        prev_hash: Vec::new(),
588        sequence: 0,
589    };
590    append_signed_event(conn, &event)?;
591    Ok(())
592}
593
594/// Daily TTL sweep. Removes every blob whose `stored_at +
595/// ttl_seconds < now`. Bounded to `max_per_run` rows per call so a
596/// pathological backlog can't monopolise the connection; callers
597/// (the daemon background loop) re-invoke at the configured cadence.
598///
599/// `sleep_between_deletes` is honoured between row deletions to keep
600/// the connection lock window short under contended write traffic.
601///
602/// # Errors
603///
604/// Bubbles SQLite errors. A successful run returns the count of
605/// deleted rows.
606pub fn sweep_expired(
607    conn: &Connection,
608    now_unix: i64,
609    max_per_run: usize,
610    sleep_between_deletes: std::time::Duration,
611) -> Result<usize> {
612    let limit_i64 = i64::try_from(max_per_run).unwrap_or(i64::MAX);
613    let mut stmt = conn
614        .prepare(
615            "SELECT ref_id FROM offloaded_blobs
616             WHERE ttl_seconds IS NOT NULL
617               AND (stored_at + ttl_seconds) < ?1
618             ORDER BY stored_at ASC
619             LIMIT ?2",
620        )
621        .context("prepare TTL sweep select")?;
622    let candidates: Vec<String> = stmt
623        .query_map(params![now_unix, limit_i64], |r| r.get::<_, String>(0))
624        .context("execute TTL sweep select")?
625        .collect::<rusqlite::Result<Vec<_>>>()
626        .context("collect TTL sweep candidates")?;
627    drop(stmt);
628
629    let mut deleted = 0usize;
630    for ref_id in candidates {
631        // Re-evaluate the TTL predicate in the DELETE itself so a concurrent
632        // `offload()` that refreshed `stored_at` between the SELECT above and
633        // this row's DELETE does not cause the fresh blob to be dropped (#1264).
634        let rows = conn
635            .execute(
636                "DELETE FROM offloaded_blobs
637                 WHERE ref_id = ?1
638                   AND ttl_seconds IS NOT NULL
639                   AND (stored_at + ttl_seconds) < ?2",
640                params![ref_id, now_unix],
641            )
642            .with_context(|| format!("DELETE offloaded_blob {ref_id}"))?;
643        if rows > 0 {
644            deleted += 1;
645        }
646        if !sleep_between_deletes.is_zero() {
647            std::thread::sleep(sleep_between_deletes);
648        }
649    }
650    Ok(deleted)
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage as db;
    use std::path::Path;

    /// Open a throwaway in-memory database through the storage layer.
    fn fresh_db() -> Connection {
        db::open(Path::new(":memory:")).expect("open in-memory db")
    }

    #[test]
    fn ref_id_is_stable_for_identical_content() {
        let a = ref_id_from_sha(&sha256_hex(b"hello world"));
        let b = ref_id_from_sha(&sha256_hex(b"hello world"));
        assert_eq!(a, b);
        assert!(a.starts_with("ofl_"));
        // 8 bytes = 64 bits = 13 base32 chars (8 * 8 / 5 = 12.8, ceil = 13).
        assert_eq!(a.len(), "ofl_".len() + 13);
    }

    #[test]
    fn ref_id_differs_for_distinct_content() {
        let a = ref_id_from_sha(&sha256_hex(b"alpha"));
        let b = ref_id_from_sha(&sha256_hex(b"beta"));
        assert_ne!(a, b);
    }

    #[test]
    fn canonical_payload_is_deterministic() {
        let p1 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        let p2 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        assert_eq!(p1, p2);
    }

    #[test]
    fn offload_deref_round_trip_no_signer() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let content = "the quick brown fox jumps over the lazy dog";
        let r = off
            .offload(content, "ns/test", None, "ai:alice")
            .expect("offload");
        let back = off.deref(&r.ref_id, None).expect("deref");
        assert_eq!(back.content, content);
        assert_eq!(back.sha256, r.content_sha256);
    }

    #[test]
    fn offload_refuses_oversize_blob() {
        let conn = fresh_db();
        let cfg = OffloadConfig {
            max_offload_blob_bytes: 16,
            ..Default::default()
        };
        let off = ContextOffloader::new(&conn, None, cfg);
        let err = off
            .offload("0123456789ABCDEF_extra", "ns", None, "ai:alice")
            .err()
            .expect("size error");
        let downcast = err
            .downcast_ref::<OffloadError>()
            .expect("OffloadError variant");
        // The bare `matches!` result used to be discarded here, so the
        // variant was never actually checked.
        assert!(
            matches!(downcast, OffloadError::SizeLimitExceeded { .. }),
            "oversize offload must map to SizeLimitExceeded, got: {downcast:?}"
        );
    }

    #[test]
    fn deref_refuses_when_content_tampered() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("hello world", "ns", None, "ai:alice")
            .expect("offload");

        // Swap the stored zstd blob for one whose decompressed bytes
        // do NOT match the stored sha256.
        let tampered = zstd_compress(b"GOODBYE WORLD").expect("compress");
        conn.execute(
            "UPDATE offloaded_blobs SET content_zstd = ?1 WHERE ref_id = ?2",
            params![tampered, r.ref_id],
        )
        .expect("tamper");

        let err = off.deref(&r.ref_id, None).err().expect("deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::IntegrityFailed { .. }));
    }

    #[test]
    fn deref_refuses_unknown_ref_id() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let err = off
            .deref("ofl_DOESNOTEXIST", None)
            .err()
            .expect("not found");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::NotFound { .. }));
    }

    /// SEC-4 (Cluster D, issue #767) — cross-agent deref must surface
    /// `NotFound` (leak-resistant), NOT a typed permission error that
    /// would let a probe enumerate ref_ids by message differentiation.
    #[test]
    fn deref_refuses_cross_agent_caller_with_notfound() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("alice's secret", "ns", None, "ai:alice")
            .expect("offload");
        // Bob tries to deref Alice's blob — must fail with NotFound.
        let err = off
            .deref(&r.ref_id, Some("ai:bob"))
            .err()
            .expect("cross-agent deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(
            matches!(downcast, OffloadError::NotFound { .. }),
            "cross-agent deref must map to NotFound (leak-resistant), got: {downcast:?}"
        );
        // Owner can still read.
        let owner_back = off
            .deref(&r.ref_id, Some("ai:alice"))
            .expect("owner deref ok");
        assert_eq!(owner_back.content, "alice's secret");
        // Substrate-internal callers (None) BYPASS the ownership gate.
        let internal_back = off
            .deref(&r.ref_id, None)
            .expect("substrate-internal deref ok");
        assert_eq!(internal_back.content, "alice's secret");
    }

    #[test]
    fn sweep_purges_expired_rows() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        // Two TTL'd rows, one permanent row.
        let a = off
            .offload("alpha", "ns", Some(60), "ai:alice")
            .expect("offload a");
        let b = off
            .offload("beta", "ns", Some(60), "ai:alice")
            .expect("offload b");
        let c = off
            .offload("gamma", "ns", None, "ai:alice")
            .expect("offload c");

        // Sweep with `now` well beyond stored_at + 60s.
        let future = a.stored_at + 60 * 60;
        let deleted = sweep_expired(&conn, future, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(deleted, 2);

        // a + b are gone; c (permanent) remains.
        assert!(off.deref(&a.ref_id, None).is_err());
        assert!(off.deref(&b.ref_id, None).is_err());
        assert!(off.deref(&c.ref_id, None).is_ok());
    }

    /// Regression for #1264 — TOCTOU race between `sweep_expired` SELECT
    /// and DELETE. If a concurrent `offload()` refreshes `stored_at` after
    /// the row is selected but before its DELETE runs, the per-row DELETE
    /// must re-evaluate the TTL predicate and SKIP the now-fresh row.
    #[test]
    fn sweep_does_not_drop_blob_refreshed_after_select() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("racy", "ns", Some(60), "ai:alice")
            .expect("offload");
        let original_stored_at = r.stored_at;
        let sweep_now = original_stored_at + 60 * 60;

        // Simulate the race: between sweep's SELECT and DELETE the row gets
        // its `stored_at` bumped to `sweep_now` (e.g. a concurrent
        // `offload()` calling `ON CONFLICT DO UPDATE SET stored_at =
        // excluded.stored_at`). Bumping stored_at to `sweep_now` makes the
        // row's expiry `sweep_now + 60` — i.e. NOT expired against
        // `sweep_now`.
        conn.execute(
            "UPDATE offloaded_blobs SET stored_at = ?1 WHERE ref_id = ?2",
            params![sweep_now, r.ref_id],
        )
        .expect("simulate concurrent refresh");

        // Sweep with the same `now` the SELECT phase used. Pre-fix, the
        // DELETE drops the row because the predicate is only `ref_id = ?`.
        // Post-fix, the DELETE re-checks `(stored_at + ttl_seconds) < now`
        // and the refreshed row survives.
        let deleted =
            sweep_expired(&conn, sweep_now, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(
            deleted, 0,
            "sweep must not drop a row whose stored_at was refreshed past expiry"
        );
        let back = off.deref(&r.ref_id, None).expect("blob must still exist");
        assert_eq!(back.content, "racy");
    }

    #[test]
    fn signed_events_chain_captures_offload_and_deref() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("traced", "ns", None, "ai:alice")
            .expect("offload");
        let _ = off.deref(&r.ref_id, None).expect("deref");
        let rows = crate::signed_events::list_signed_events(&conn, None, 100, 0).expect("list");
        let kinds: Vec<&str> = rows.iter().map(|r| r.event_type.as_str()).collect();
        assert!(kinds.contains(&"context_offloaded"));
        assert!(kinds.contains(&"context_dereferenced"));
    }
}