use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, anyhow};
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use ed25519_dalek::{Signer, Verifier};
use rusqlite::{Connection, OptionalExtension, params};
use sha2::{Digest, Sha256};
use crate::identity::keypair::AgentKeypair;
use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
/// zstd compression level used by `zstd_compress` for every stored blob.
const ZSTD_LEVEL: i32 = 3;
/// Hard ceiling (16 MiB) on the bytes `zstd_decompress` will produce for a single blob.
pub const MAX_DECOMPRESSED_BYTES: usize = 16 << 20;
/// Default `OffloadConfig::max_offload_blob_bytes` (1 MiB).
pub const DEFAULT_MAX_OFFLOAD_BLOB_BYTES: u32 = 1 << 20;
/// Upper-case base32 alphabet (the RFC 4648 table) used to render ref ids.
const BASE32_ALPHABET: &[u8; 32] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
/// Prefix prepended to every offload reference id.
const REF_ID_PREFIX: &str = "ofl_";
/// Handle returned by `ContextOffloader::offload` for a freshly stored blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OffloadResult {
    /// Content-derived reference id: `ofl_` followed by the base32 rendering of the first
    /// 8 bytes of the content's SHA-256.
    pub ref_id: String,
    /// Lowercase hex SHA-256 of the offloaded content bytes.
    pub content_sha256: String,
    /// Unix timestamp (seconds) recorded when the blob was stored.
    pub stored_at: i64,
}
/// Content recovered by `ContextOffloader::deref` after its integrity checks passed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DerefResult {
    /// Decompressed blob content (validated as UTF-8).
    pub content: String,
    /// Unix timestamp (seconds) stored alongside the blob at offload time.
    pub stored_at: i64,
    /// Stored lowercase hex SHA-256; `deref` checked it against the recomputed digest.
    pub sha256: String,
}
/// Typed failures surfaced (wrapped in `anyhow::Error`) by [`ContextOffloader`].
///
/// Callers can recover the variant with `err.downcast_ref::<OffloadError>()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OffloadError {
    /// The content to offload is larger than the permitted blob size.
    SizeLimitExceeded { actual: usize, limit: usize },
    /// The stored content failed an integrity check on read-back.
    IntegrityFailed { ref_id: String },
    /// The stored signature could not be verified.
    SignatureFailed { ref_id: String },
    /// No blob is visible under this ref id for the caller.
    NotFound { ref_id: String },
}

impl std::fmt::Display for OffloadError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotFound { ref_id } => write!(f, "offloaded blob {ref_id} not found"),
            Self::SignatureFailed { ref_id } => {
                write!(f, "offloaded blob {ref_id} signature verification failed")
            }
            Self::IntegrityFailed { ref_id } => {
                write!(f, "offloaded blob {ref_id} integrity check failed (content tampered)")
            }
            Self::SizeLimitExceeded { actual, limit } => {
                write!(f, "offload blob {actual} bytes exceeds policy max {limit}")
            }
        }
    }
}

impl std::error::Error for OffloadError {}
/// Policy knobs consumed by [`ContextOffloader`].
#[derive(Debug, Clone)]
pub struct OffloadConfig {
    /// Upper bound, in bytes, on the content length `offload` accepts.
    pub max_offload_blob_bytes: u32,
    /// TTL applied when `offload` is called with `ttl_seconds = None`;
    /// `None` here means such blobs are stored without expiry.
    pub default_offload_ttl_seconds: Option<u64>,
}

impl Default for OffloadConfig {
    /// 1 MiB blob cap ([`DEFAULT_MAX_OFFLOAD_BLOB_BYTES`]) and no default TTL.
    fn default() -> Self {
        OffloadConfig {
            default_offload_ttl_seconds: None,
            max_offload_blob_bytes: DEFAULT_MAX_OFFLOAD_BLOB_BYTES,
        }
    }
}
pub struct ContextOffloader<'a> {
conn: &'a Connection,
signer: Option<&'a AgentKeypair>,
config: OffloadConfig,
}
impl<'a> ContextOffloader<'a> {
#[must_use]
pub fn new(
conn: &'a Connection,
signer: Option<&'a AgentKeypair>,
config: OffloadConfig,
) -> Self {
Self {
conn,
signer,
config,
}
}
pub fn offload(
&self,
content: &str,
namespace: &str,
ttl_seconds: Option<u64>,
agent_id: &str,
) -> Result<OffloadResult> {
let limit = self.config.max_offload_blob_bytes as usize;
if content.len() > limit {
return Err(anyhow!(OffloadError::SizeLimitExceeded {
actual: content.len(),
limit,
}));
}
let sha = sha256_hex(content.as_bytes());
let ref_id = ref_id_from_sha(&sha);
let stored_at = now_unix_seconds();
let effective_ttl = ttl_seconds.or(self.config.default_offload_ttl_seconds);
let blob = zstd_compress(content.as_bytes()).context("zstd compression failed")?;
let stored_at_signed: i64 = stored_at;
let signature_b64 = if let Some(keypair) = self.signer {
let payload = canonical_payload(&ref_id, &sha, stored_at_signed, namespace)?;
let signing = keypair.private.as_ref().with_context(|| {
format!(
"AgentKeypair for {} has no private key — cannot sign offload",
keypair.agent_id
)
})?;
URL_SAFE_NO_PAD.encode(signing.sign(&payload).to_bytes())
} else {
String::new()
};
let ttl_param: Option<i64> = effective_ttl.and_then(|n| i64::try_from(n).ok());
self.conn
.execute(
"INSERT INTO offloaded_blobs (
ref_id, namespace, content_zstd, content_sha256,
stored_at, ttl_seconds, agent_id, signature_b64
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
ON CONFLICT(ref_id) DO UPDATE SET
namespace = excluded.namespace,
content_zstd = excluded.content_zstd,
content_sha256 = excluded.content_sha256,
stored_at = excluded.stored_at,
ttl_seconds = excluded.ttl_seconds,
agent_id = excluded.agent_id,
signature_b64 = excluded.signature_b64",
params![
ref_id,
namespace,
blob,
sha,
stored_at,
ttl_param,
agent_id,
signature_b64,
],
)
.context("INSERT into offloaded_blobs failed")?;
append_audit_row(
self.conn,
agent_id,
"context_offloaded",
&ref_id,
&sha,
namespace,
stored_at_signed,
&signature_b64,
)?;
Ok(OffloadResult {
ref_id,
content_sha256: sha,
stored_at: stored_at_signed,
})
}
pub fn deref(&self, ref_id: &str, caller_agent_id: Option<&str>) -> Result<DerefResult> {
let row: Option<(Vec<u8>, String, i64, String, String, String)> = self
.conn
.query_row(
"SELECT content_zstd, content_sha256, stored_at, namespace,
agent_id, signature_b64
FROM offloaded_blobs WHERE ref_id = ?1",
params![ref_id],
|r| {
Ok((
r.get(0)?,
r.get(1)?,
r.get(2)?,
r.get(3)?,
r.get(4)?,
r.get(5)?,
))
},
)
.optional()
.context("SELECT offloaded_blobs failed")?;
let (blob, stored_sha, stored_at, namespace, agent_id, signature_b64) =
row.ok_or_else(|| {
anyhow!(OffloadError::NotFound {
ref_id: ref_id.to_string(),
})
})?;
if let Some(caller) = caller_agent_id
&& caller != agent_id
{
tracing::info!(
ref_id = %ref_id,
caller = %caller,
"SEC-4: handle_deref ownership mismatch — surfacing NotFound (leak-resistant)"
);
return Err(anyhow!(OffloadError::NotFound {
ref_id: ref_id.to_string(),
}));
}
if let Some(keypair) = self.signer {
if !signature_b64.is_empty() {
let payload = canonical_payload(ref_id, &stored_sha, stored_at, &namespace)?;
let sig_bytes = URL_SAFE_NO_PAD
.decode(signature_b64.as_bytes())
.context("decode stored signature_b64")?;
let sig_arr: [u8; 64] = sig_bytes
.as_slice()
.try_into()
.context("stored signature is not 64 bytes")?;
let sig = ed25519_dalek::Signature::from_bytes(&sig_arr);
if keypair.public.verify(&payload, &sig).is_err() {
return Err(anyhow!(OffloadError::SignatureFailed {
ref_id: ref_id.to_string(),
}));
}
}
}
let bytes = zstd_decompress(&blob).context("zstd decompression failed")?;
let content = String::from_utf8(bytes).map_err(|_| OffloadError::IntegrityFailed {
ref_id: ref_id.to_string(),
})?;
let recomputed = sha256_hex(content.as_bytes());
if recomputed != stored_sha {
return Err(anyhow!(OffloadError::IntegrityFailed {
ref_id: ref_id.to_string(),
}));
}
append_audit_row(
self.conn,
&agent_id,
"context_dereferenced",
ref_id,
&stored_sha,
&namespace,
stored_at,
&signature_b64,
)?;
Ok(DerefResult {
content,
stored_at,
sha256: stored_sha,
})
}
}
/// Returns the SHA-256 digest of `input` rendered as lowercase hex (64 characters).
fn sha256_hex(input: &[u8]) -> String {
    let digest = Sha256::digest(input);
    bytes_to_hex(&digest)
}
/// Renders `bytes` as lowercase hex, high nibble first (two characters per byte).
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&b| [DIGITS[usize::from(b >> 4)], DIGITS[usize::from(b & 0x0F)]])
        .map(char::from)
        .collect()
}
fn base32_encode(bytes: &[u8]) -> String {
let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
let mut buffer: u32 = 0;
let mut bits: u32 = 0;
for byte in bytes {
buffer = (buffer << 8) | u32::from(*byte);
bits += 8;
while bits >= 5 {
bits -= 5;
let idx = ((buffer >> bits) & 0x1F) as usize;
out.push(BASE32_ALPHABET[idx] as char);
}
}
if bits > 0 {
let idx = ((buffer << (5 - bits)) & 0x1F) as usize;
out.push(BASE32_ALPHABET[idx] as char);
}
out
}
/// Derives the public blob handle from a hex SHA-256 string.
///
/// The handle is `REF_ID_PREFIX` followed by the unpadded base32 of the first 8 digest
/// bytes (the first 16 hex characters, i.e. 64 bits). Non-hex characters decode as 0
/// (see `hex_nibble`).
///
/// # Panics
/// Panics if `sha_hex` is shorter than 16 bytes. Callers pass `sha256_hex` output.
///
/// NOTE(review): only 64 bits of the digest are kept, and `offload` upserts on `ref_id`, so
/// two different contents whose digests share this prefix would overwrite each other's row.
fn ref_id_from_sha(sha_hex: &str) -> String {
    let leading_hex = &sha_hex.as_bytes()[..16];
    let mut head = [0u8; 8];
    for (slot, pair) in head.iter_mut().zip(leading_hex.chunks_exact(2)) {
        *slot = (hex_nibble(pair[0]) << 4) | hex_nibble(pair[1]);
    }
    let mut id = String::from(REF_ID_PREFIX);
    id.push_str(&base32_encode(&head));
    id
}
/// Decodes one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value in `0..=15`.
///
/// Any other byte decodes to 0 rather than failing.
fn hex_nibble(byte: u8) -> u8 {
    char::from(byte)
        .to_digit(16)
        .and_then(|value| u8::try_from(value).ok())
        .unwrap_or(0)
}
/// Encodes the fields covered by an offload signature as a CBOR map.
///
/// The map holds the content digest, namespace, ref id and `stored_at`. Keys are emitted in
/// `BTreeMap` order (plain `str` ordering), so identical inputs always produce identical
/// bytes. `offload` signs these bytes, `deref` re-derives them to verify, and the audit
/// row hashes them.
///
/// NOTE(review): `str` ordering is not the length-first key order of RFC 8949 §4.2.1
/// deterministic CBOR. Changing it now would invalidate every stored signature, so keep it
/// unless a migration is planned.
///
/// # Errors
/// Fails only if `ciborium` cannot serialize the map into the output buffer.
fn canonical_payload(
    ref_id: &str,
    content_sha256: &str,
    stored_at: i64,
    namespace: &str,
) -> Result<Vec<u8>> {
    let fields: BTreeMap<&str, ciborium::Value> = [
        (
            crate::models::field_names::CONTENT_SHA256,
            ciborium::Value::Text(content_sha256.to_string()),
        ),
        ("namespace", ciborium::Value::Text(namespace.to_string())),
        ("ref_id", ciborium::Value::Text(ref_id.to_string())),
        ("stored_at", ciborium::Value::Integer(stored_at.into())),
    ]
    .into_iter()
    .collect();
    let cbor_map = ciborium::Value::Map(
        fields
            .into_iter()
            .map(|(key, value)| (ciborium::Value::Text(key.to_owned()), value))
            .collect(),
    );
    let mut encoded = Vec::new();
    ciborium::into_writer(&cbor_map, &mut encoded).context("encode canonical offload payload")?;
    Ok(encoded)
}
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch, and saturates at `i64::MAX`
/// if the second count does not fit in an `i64`.
fn now_unix_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
/// Compresses `input` with a streaming zstd encoder at `ZSTD_LEVEL`.
///
/// # Errors
/// Propagates any I/O error reported while creating, feeding or finishing the encoder.
fn zstd_compress(input: &[u8]) -> Result<Vec<u8>> {
    use std::io::Write;
    // Reserve for roughly 4:1 compression plus frame overhead; this is only a capacity hint.
    let sink = Vec::with_capacity(input.len() / 4 + 64);
    let mut encoder = zstd::stream::write::Encoder::new(sink, ZSTD_LEVEL)?;
    encoder.write_all(input)?;
    // `finish` writes the frame epilogue and hands back the owned output buffer.
    Ok(encoder.finish()?)
}
/// Decompresses a zstd stream, refusing to produce more than [`MAX_DECOMPRESSED_BYTES`].
///
/// Output is pulled in 64 KiB chunks and checked against the cap before each append. A
/// highly compressible ("bomb") input is therefore rejected as soon as it crosses the limit,
/// not after inflating completely.
///
/// # Errors
/// Returns an error if the input is not a valid zstd stream or if the decompressed size
/// would exceed `MAX_DECOMPRESSED_BYTES`.
fn zstd_decompress(input: &[u8]) -> Result<Vec<u8>> {
    use std::io::Read;
    // Guess ~4x expansion for the first allocation but never pre-allocate past the cap.
    // Saturating so a huge input cannot overflow the estimate on 32-bit targets.
    let init_cap = input.len().saturating_mul(4).min(MAX_DECOMPRESSED_BYTES);
    let mut out = Vec::with_capacity(init_cap);
    let mut decoder = zstd::stream::read::Decoder::new(input)?;
    let mut buf = [0u8; 64 * 1024];
    loop {
        let n = match decoder.read(&mut buf) {
            Ok(n) => n,
            // `Read` contract: Interrupted is transient and the read should be retried.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e.into()),
        };
        if n == 0 {
            break;
        }
        if out.len().saturating_add(n) > MAX_DECOMPRESSED_BYTES {
            return Err(anyhow!(
                "offloaded blob decompression exceeded {MAX_DECOMPRESSED_BYTES} byte cap"
            ));
        }
        out.extend_from_slice(&buf[..n]);
    }
    Ok(out)
}
/// Appends one event to the signed-events chain describing an offload-store operation.
///
/// The event's `payload_hash` is computed over the same canonical CBOR payload that offload
/// signatures cover. A non-empty `signature_b64` is decoded and attached with attest level
/// `SelfSigned`; an empty one produces an `Unsigned` event without a signature.
/// `prev_hash` is left empty and `sequence` is 0. NOTE(review): presumably
/// `append_signed_event` fills in the chain linkage; confirm.
///
/// # Errors
/// Fails if the payload cannot be encoded, if `signature_b64` is not URL-safe unpadded
/// base64, or if `append_signed_event` fails.
fn append_audit_row(
    conn: &Connection,
    agent_id: &str,
    event_type: &str,
    ref_id: &str,
    content_sha256: &str,
    namespace: &str,
    stored_at: i64,
    signature_b64: &str,
) -> Result<()> {
    let digest = payload_hash(&canonical_payload(
        ref_id,
        content_sha256,
        stored_at,
        namespace,
    )?);
    let signature = Some(signature_b64)
        .filter(|encoded| !encoded.is_empty())
        .map(|encoded| URL_SAFE_NO_PAD.decode(encoded.as_bytes()))
        .transpose()
        .context("decode signature_b64 for audit row")?;
    let attest_level = match signature {
        Some(_) => crate::models::AttestLevel::SelfSigned,
        None => crate::models::AttestLevel::Unsigned,
    }
    .as_str()
    .to_string();
    append_signed_event(
        conn,
        &SignedEvent {
            id: uuid::Uuid::new_v4().to_string(),
            agent_id: agent_id.to_string(),
            event_type: event_type.to_string(),
            payload_hash: digest,
            signature,
            attest_level,
            timestamp: chrono::Utc::now().to_rfc3339(),
            prev_hash: Vec::new(),
            sequence: 0,
        },
    )?;
    Ok(())
}
/// Deletes up to `max_per_run` offloaded blobs whose TTL elapsed before `now_unix`.
///
/// Rows with a NULL `ttl_seconds` never expire. Candidates are selected oldest `stored_at`
/// first and then deleted one at a time. Each DELETE re-checks the expiry predicate, so a
/// row refreshed (re-offloaded) after the SELECT survives. When `sleep_between_deletes` is
/// non-zero, the calling thread sleeps *between* consecutive delete attempts, not after
/// the last one. The sleep blocks the thread, so do not call this from an async executor.
///
/// Returns the number of rows actually deleted.
///
/// # Errors
/// Propagates any SQLite error from the select or from a delete. Rows deleted before the
/// failure stay deleted.
pub fn sweep_expired(
    conn: &Connection,
    now_unix: i64,
    max_per_run: usize,
    sleep_between_deletes: std::time::Duration,
) -> Result<usize> {
    let limit_i64 = i64::try_from(max_per_run).unwrap_or(i64::MAX);
    let mut stmt = conn
        .prepare(
            "SELECT ref_id FROM offloaded_blobs
             WHERE ttl_seconds IS NOT NULL
               AND (stored_at + ttl_seconds) < ?1
             ORDER BY stored_at ASC
             LIMIT ?2",
        )
        .context("prepare TTL sweep select")?;
    let candidates: Vec<String> = stmt
        .query_map(params![now_unix, limit_i64], |r| r.get::<_, String>(0))
        .context("execute TTL sweep select")?
        .collect::<rusqlite::Result<Vec<_>>>()
        .context("collect TTL sweep candidates")?;
    // Finalize the SELECT (and release its borrow of `conn`) before the delete loop.
    drop(stmt);
    let mut deleted = 0usize;
    for (position, ref_id) in candidates.into_iter().enumerate() {
        // Throttle between deletes only; sleeping after the final one just delays return.
        if position > 0 && !sleep_between_deletes.is_zero() {
            std::thread::sleep(sleep_between_deletes);
        }
        let rows = conn
            .execute(
                "DELETE FROM offloaded_blobs
                 WHERE ref_id = ?1
                   AND ttl_seconds IS NOT NULL
                   AND (stored_at + ttl_seconds) < ?2",
                params![ref_id, now_unix],
            )
            .with_context(|| format!("DELETE offloaded_blob {ref_id}"))?;
        if rows > 0 {
            deleted += 1;
        }
    }
    Ok(deleted)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage as db;
    use std::path::Path;

    /// Opens a fresh in-memory database with the project schema applied by `db::open`.
    fn fresh_db() -> Connection {
        db::open(Path::new(":memory:")).expect("open in-memory db")
    }

    #[test]
    fn ref_id_is_stable_for_identical_content() {
        let a = ref_id_from_sha(&sha256_hex(b"hello world"));
        let b = ref_id_from_sha(&sha256_hex(b"hello world"));
        assert_eq!(a, b);
        assert!(a.starts_with("ofl_"));
        assert_eq!(a.len(), "ofl_".len() + 13);
    }

    #[test]
    fn ref_id_differs_for_distinct_content() {
        let a = ref_id_from_sha(&sha256_hex(b"alpha"));
        let b = ref_id_from_sha(&sha256_hex(b"beta"));
        assert_ne!(a, b);
    }

    #[test]
    fn canonical_payload_is_deterministic() {
        let p1 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        let p2 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        assert_eq!(p1, p2);
    }

    #[test]
    fn offload_deref_round_trip_no_signer() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let content = "the quick brown fox jumps over the lazy dog";
        let r = off
            .offload(content, "ns/test", None, "ai:alice")
            .expect("offload");
        let back = off.deref(&r.ref_id, None).expect("deref");
        assert_eq!(back.content, content);
        assert_eq!(back.sha256, r.content_sha256);
    }

    #[test]
    fn offload_refuses_oversize_blob() {
        let conn = fresh_db();
        let cfg = OffloadConfig {
            max_offload_blob_bytes: 16,
            ..Default::default()
        };
        let off = ContextOffloader::new(&conn, None, cfg);
        let err = off
            .offload("0123456789ABCDEF_extra", "ns", None, "ai:alice")
            .err()
            .expect("size error");
        let downcast = err
            .downcast_ref::<OffloadError>()
            .expect("OffloadError variant");
        // Previously a bare `matches!(..)` whose result was discarded, so nothing was checked.
        assert_eq!(
            downcast,
            &OffloadError::SizeLimitExceeded {
                actual: 22,
                limit: 16,
            }
        );
    }

    #[test]
    fn deref_refuses_when_content_tampered() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("hello world", "ns", None, "ai:alice")
            .expect("offload");
        let tampered = zstd_compress(b"GOODBYE WORLD").expect("compress");
        conn.execute(
            "UPDATE offloaded_blobs SET content_zstd = ?1 WHERE ref_id = ?2",
            params![tampered, r.ref_id],
        )
        .expect("tamper");
        let err = off.deref(&r.ref_id, None).err().expect("deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::IntegrityFailed { .. }));
    }

    #[test]
    fn deref_refuses_unknown_ref_id() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let err = off
            .deref("ofl_DOESNOTEXIST", None)
            .err()
            .expect("not found");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::NotFound { .. }));
    }

    #[test]
    fn deref_refuses_cross_agent_caller_with_notfound() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("alice's secret", "ns", None, "ai:alice")
            .expect("offload");
        let err = off
            .deref(&r.ref_id, Some("ai:bob"))
            .err()
            .expect("cross-agent deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(
            matches!(downcast, OffloadError::NotFound { .. }),
            "cross-agent deref must map to NotFound (leak-resistant), got: {downcast:?}"
        );
        let owner_back = off
            .deref(&r.ref_id, Some("ai:alice"))
            .expect("owner deref ok");
        assert_eq!(owner_back.content, "alice's secret");
        let internal_back = off
            .deref(&r.ref_id, None)
            .expect("substrate-internal deref ok");
        assert_eq!(internal_back.content, "alice's secret");
    }

    #[test]
    fn sweep_purges_expired_rows() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let a = off
            .offload("alpha", "ns", Some(60), "ai:alice")
            .expect("offload a");
        let b = off
            .offload("beta", "ns", Some(60), "ai:alice")
            .expect("offload b");
        let c = off
            .offload("gamma", "ns", None, "ai:alice")
            .expect("offload c");
        let future = a.stored_at + 60 * 60;
        let deleted = sweep_expired(&conn, future, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(deleted, 2);
        assert!(off.deref(&a.ref_id, None).is_err());
        assert!(off.deref(&b.ref_id, None).is_err());
        assert!(off.deref(&c.ref_id, None).is_ok());
    }

    #[test]
    fn sweep_does_not_drop_blob_refreshed_after_select() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("racy", "ns", Some(60), "ai:alice")
            .expect("offload");
        let original_stored_at = r.stored_at;
        let sweep_now = original_stored_at + 60 * 60;
        conn.execute(
            "UPDATE offloaded_blobs SET stored_at = ?1 WHERE ref_id = ?2",
            params![sweep_now, r.ref_id],
        )
        .expect("simulate concurrent refresh");
        let deleted =
            sweep_expired(&conn, sweep_now, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(
            deleted, 0,
            "sweep must not drop a row whose stored_at was refreshed past expiry"
        );
        let back = off.deref(&r.ref_id, None).expect("blob must still exist");
        assert_eq!(back.content, "racy");
    }

    #[test]
    fn signed_events_chain_captures_offload_and_deref() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("traced", "ns", None, "ai:alice")
            .expect("offload");
        let _ = off.deref(&r.ref_id, None).expect("deref");
        let rows = crate::signed_events::list_signed_events(&conn, None, 100, 0).expect("list");
        let kinds: Vec<&str> = rows.iter().map(|r| r.event_type.as_str()).collect();
        assert!(kinds.contains(&"context_offloaded"));
        assert!(kinds.contains(&"context_dereferenced"));
    }
}