1use std::collections::BTreeMap;
46use std::time::{SystemTime, UNIX_EPOCH};
47
48use anyhow::{Context, Result, anyhow};
49use base64::Engine;
50use base64::engine::general_purpose::URL_SAFE_NO_PAD;
51use ed25519_dalek::{Signer, Verifier};
52use rusqlite::{Connection, OptionalExtension, params};
53use sha2::{Digest, Sha256};
54
55use crate::identity::keypair::AgentKeypair;
56use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
57
/// Compression level handed to the zstd encoder when a blob is stored.
const ZSTD_LEVEL: i32 = 3;

/// Upper bound (16 MiB) on the number of bytes `zstd_decompress` will produce
/// before it gives up with an error.
pub const MAX_DECOMPRESSED_BYTES: usize = 16 << 20;

/// Default value of `OffloadConfig::max_offload_blob_bytes` (1 MiB).
pub const DEFAULT_MAX_OFFLOAD_BLOB_BYTES: u32 = 1024 * 1024;

/// RFC 4648 base32 alphabet (upper-case letters followed by `2`-`7`), indexed
/// by 5-bit value in `base32_encode`.
const BASE32_ALPHABET: &[u8; 32] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";

/// Literal prefix placed in front of every generated reference id.
const REF_ID_PREFIX: &str = "ofl_";
85
/// What `ContextOffloader::offload` hands back after a blob has been stored.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OffloadResult {
    /// Reference id under which the blob was stored; pass it to `deref` later.
    pub ref_id: String,
    /// Lower-case hex SHA-256 of the uncompressed content.
    pub content_sha256: String,
    /// Unix timestamp (seconds) recorded for the row at store time.
    pub stored_at: i64,
}
95
/// What `ContextOffloader::deref` hands back for a blob that passed every check.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DerefResult {
    /// The decompressed blob, decoded as UTF-8.
    pub content: String,
    /// Unix timestamp (seconds) read from the stored row.
    pub stored_at: i64,
    /// The stored hex SHA-256; `deref` only returns it after it matched the
    /// hash recomputed from `content`.
    pub sha256: String,
}
104
/// Typed failures raised by the offload / deref paths.
///
/// The methods wrap these in `anyhow::Error`; callers recover the variant with
/// `downcast_ref::<OffloadError>()`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OffloadError {
    /// The content handed to `offload` is `actual` bytes, above the `limit` in force.
    SizeLimitExceeded { actual: usize, limit: usize },
    /// The stored blob did not decode to UTF-8 or did not hash to the stored SHA-256.
    IntegrityFailed { ref_id: String },
    /// The stored signature did not verify against the configured public key.
    SignatureFailed { ref_id: String },
    /// No row exists for `ref_id`, or the caller is not the owning agent.
    NotFound { ref_id: String },
}

impl std::fmt::Display for OffloadError {
    /// Renders the human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::SizeLimitExceeded { actual, limit } => {
                format!("offload blob {actual} bytes exceeds policy max {limit}")
            }
            Self::IntegrityFailed { ref_id } => {
                format!("offloaded blob {ref_id} integrity check failed (content tampered)")
            }
            Self::SignatureFailed { ref_id } => {
                format!("offloaded blob {ref_id} signature verification failed")
            }
            Self::NotFound { ref_id } => format!("offloaded blob {ref_id} not found"),
        };
        f.write_str(&message)
    }
}

impl std::error::Error for OffloadError {}
137
138#[derive(Debug, Clone)]
140pub struct OffloadConfig {
141 pub max_offload_blob_bytes: u32,
146 pub default_offload_ttl_seconds: Option<u64>,
150}
151
152impl Default for OffloadConfig {
153 fn default() -> Self {
154 Self {
155 max_offload_blob_bytes: DEFAULT_MAX_OFFLOAD_BLOB_BYTES,
156 default_offload_ttl_seconds: None,
157 }
158 }
159}
160
161pub struct ContextOffloader<'a> {
165 conn: &'a Connection,
166 signer: Option<&'a AgentKeypair>,
167 config: OffloadConfig,
168}
169
170impl<'a> ContextOffloader<'a> {
171 #[must_use]
174 pub fn new(
175 conn: &'a Connection,
176 signer: Option<&'a AgentKeypair>,
177 config: OffloadConfig,
178 ) -> Self {
179 Self {
180 conn,
181 signer,
182 config,
183 }
184 }
185
186 pub fn offload(
195 &self,
196 content: &str,
197 namespace: &str,
198 ttl_seconds: Option<u64>,
199 agent_id: &str,
200 ) -> Result<OffloadResult> {
201 let limit = self.config.max_offload_blob_bytes as usize;
202 if content.len() > limit {
203 return Err(anyhow!(OffloadError::SizeLimitExceeded {
204 actual: content.len(),
205 limit,
206 }));
207 }
208 let sha = sha256_hex(content.as_bytes());
210 let ref_id = ref_id_from_sha(&sha);
211 let stored_at = now_unix_seconds();
212 let effective_ttl = ttl_seconds.or(self.config.default_offload_ttl_seconds);
213 let blob = zstd_compress(content.as_bytes()).context("zstd compression failed")?;
218
219 let stored_at_signed: i64 = stored_at;
223 let signature_b64 = if let Some(keypair) = self.signer {
224 let payload = canonical_payload(&ref_id, &sha, stored_at_signed, namespace)?;
225 let signing = keypair.private.as_ref().with_context(|| {
226 format!(
227 "AgentKeypair for {} has no private key — cannot sign offload",
228 keypair.agent_id
229 )
230 })?;
231 URL_SAFE_NO_PAD.encode(signing.sign(&payload).to_bytes())
232 } else {
233 String::new()
234 };
235
236 let ttl_param: Option<i64> = effective_ttl.and_then(|n| i64::try_from(n).ok());
237 self.conn
238 .execute(
239 "INSERT INTO offloaded_blobs (
240 ref_id, namespace, content_zstd, content_sha256,
241 stored_at, ttl_seconds, agent_id, signature_b64
242 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
243 ON CONFLICT(ref_id) DO UPDATE SET
244 namespace = excluded.namespace,
245 content_zstd = excluded.content_zstd,
246 content_sha256 = excluded.content_sha256,
247 stored_at = excluded.stored_at,
248 ttl_seconds = excluded.ttl_seconds,
249 agent_id = excluded.agent_id,
250 signature_b64 = excluded.signature_b64",
251 params![
252 ref_id,
253 namespace,
254 blob,
255 sha,
256 stored_at,
257 ttl_param,
258 agent_id,
259 signature_b64,
260 ],
261 )
262 .context("INSERT into offloaded_blobs failed")?;
263
264 append_audit_row(
269 self.conn,
270 agent_id,
271 "context_offloaded",
272 &ref_id,
273 &sha,
274 namespace,
275 stored_at_signed,
276 &signature_b64,
277 )?;
278
279 Ok(OffloadResult {
280 ref_id,
281 content_sha256: sha,
282 stored_at: stored_at_signed,
283 })
284 }
285
286 pub fn deref(&self, ref_id: &str, caller_agent_id: Option<&str>) -> Result<DerefResult> {
310 let row: Option<(Vec<u8>, String, i64, String, String, String)> = self
311 .conn
312 .query_row(
313 "SELECT content_zstd, content_sha256, stored_at, namespace,
314 agent_id, signature_b64
315 FROM offloaded_blobs WHERE ref_id = ?1",
316 params![ref_id],
317 |r| {
318 Ok((
319 r.get(0)?,
320 r.get(1)?,
321 r.get(2)?,
322 r.get(3)?,
323 r.get(4)?,
324 r.get(5)?,
325 ))
326 },
327 )
328 .optional()
329 .context("SELECT offloaded_blobs failed")?;
330
331 let (blob, stored_sha, stored_at, namespace, agent_id, signature_b64) =
332 row.ok_or_else(|| {
333 anyhow!(OffloadError::NotFound {
334 ref_id: ref_id.to_string(),
335 })
336 })?;
337
338 if let Some(caller) = caller_agent_id
347 && caller != agent_id
348 {
349 tracing::info!(
350 ref_id = %ref_id,
351 caller = %caller,
352 "SEC-4: handle_deref ownership mismatch — surfacing NotFound (leak-resistant)"
353 );
354 return Err(anyhow!(OffloadError::NotFound {
355 ref_id: ref_id.to_string(),
356 }));
357 }
358
359 if let Some(keypair) = self.signer {
364 if !signature_b64.is_empty() {
365 let payload = canonical_payload(ref_id, &stored_sha, stored_at, &namespace)?;
366 let sig_bytes = URL_SAFE_NO_PAD
367 .decode(signature_b64.as_bytes())
368 .context("decode stored signature_b64")?;
369 let sig_arr: [u8; 64] = sig_bytes
370 .as_slice()
371 .try_into()
372 .context("stored signature is not 64 bytes")?;
373 let sig = ed25519_dalek::Signature::from_bytes(&sig_arr);
374 if keypair.public.verify(&payload, &sig).is_err() {
375 return Err(anyhow!(OffloadError::SignatureFailed {
376 ref_id: ref_id.to_string(),
377 }));
378 }
379 }
380 }
381
382 let bytes = zstd_decompress(&blob).context("zstd decompression failed")?;
383 let content = String::from_utf8(bytes).map_err(|_| OffloadError::IntegrityFailed {
387 ref_id: ref_id.to_string(),
388 })?;
389 let recomputed = sha256_hex(content.as_bytes());
390 if recomputed != stored_sha {
391 return Err(anyhow!(OffloadError::IntegrityFailed {
392 ref_id: ref_id.to_string(),
393 }));
394 }
395
396 append_audit_row(
397 self.conn,
398 &agent_id,
399 "context_dereferenced",
400 ref_id,
401 &stored_sha,
402 &namespace,
403 stored_at,
404 &signature_b64,
405 )?;
406
407 Ok(DerefResult {
408 content,
409 stored_at,
410 sha256: stored_sha,
411 })
412 }
413}
414
415fn sha256_hex(input: &[u8]) -> String {
417 let mut hasher = Sha256::new();
418 hasher.update(input);
419 bytes_to_hex(&hasher.finalize())
420}
421
/// Encodes `bytes` as lower-case hexadecimal, two characters per byte.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        // High nibble first, then low nibble.
        let (high, low) = (usize::from(b >> 4), usize::from(b & 0x0F));
        hex.push(char::from(DIGITS[high]));
        hex.push(char::from(DIGITS[low]));
    }
    hex
}
433
434fn base32_encode(bytes: &[u8]) -> String {
437 let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
438 let mut buffer: u32 = 0;
439 let mut bits: u32 = 0;
440 for byte in bytes {
441 buffer = (buffer << 8) | u32::from(*byte);
442 bits += 8;
443 while bits >= 5 {
444 bits -= 5;
445 let idx = ((buffer >> bits) & 0x1F) as usize;
446 out.push(BASE32_ALPHABET[idx] as char);
447 }
448 }
449 if bits > 0 {
450 let idx = ((buffer << (5 - bits)) & 0x1F) as usize;
451 out.push(BASE32_ALPHABET[idx] as char);
452 }
453 out
454}
455
456fn ref_id_from_sha(sha_hex: &str) -> String {
459 let mut first_8 = [0u8; 8];
461 for (i, byte) in first_8.iter_mut().enumerate() {
462 let hi = hex_nibble(sha_hex.as_bytes()[i * 2]);
463 let lo = hex_nibble(sha_hex.as_bytes()[i * 2 + 1]);
464 *byte = (hi << 4) | lo;
465 }
466 format!("{REF_ID_PREFIX}{}", base32_encode(&first_8))
467}
468
/// Converts one ASCII hex digit (either case) to its value `0..=15`.
///
/// Any other byte yields `0` rather than an error.
fn hex_nibble(byte: u8) -> u8 {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
        .unwrap_or(0)
}
477
478fn canonical_payload(
483 ref_id: &str,
484 content_sha256: &str,
485 stored_at: i64,
486 namespace: &str,
487) -> Result<Vec<u8>> {
488 let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
489 map.insert(
490 crate::models::field_names::CONTENT_SHA256,
491 ciborium::Value::Text(content_sha256.to_string()),
492 );
493 map.insert("namespace", ciborium::Value::Text(namespace.to_string()));
494 map.insert("ref_id", ciborium::Value::Text(ref_id.to_string()));
495 map.insert("stored_at", ciborium::Value::Integer(stored_at.into()));
496 let value = ciborium::Value::Map(
497 map.into_iter()
498 .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
499 .collect(),
500 );
501 let mut buf = Vec::new();
502 ciborium::into_writer(&value, &mut buf).context("encode canonical offload payload")?;
503 Ok(buf)
504}
505
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a value that does not fit in
/// `i64` saturates at `i64::MAX`.
fn now_unix_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
512
513fn zstd_compress(input: &[u8]) -> Result<Vec<u8>> {
514 use std::io::Write;
515 let mut out = Vec::with_capacity(input.len() / 4 + 64);
516 {
517 let mut encoder = zstd::stream::write::Encoder::new(&mut out, ZSTD_LEVEL)?;
518 encoder.write_all(input)?;
519 encoder.finish()?;
520 }
521 Ok(out)
522}
523
524fn zstd_decompress(input: &[u8]) -> Result<Vec<u8>> {
525 use std::io::Read;
526 let init_cap = std::cmp::min(input.len() * 4, MAX_DECOMPRESSED_BYTES);
527 let mut out = Vec::with_capacity(init_cap);
528 let mut decoder = zstd::stream::read::Decoder::new(input)?;
529 let mut buf = [0u8; 64 * 1024];
530 loop {
531 let n = decoder.read(&mut buf)?;
532 if n == 0 {
533 break;
534 }
535 if out.len().saturating_add(n) > MAX_DECOMPRESSED_BYTES {
536 return Err(anyhow!(
537 "offloaded blob decompression exceeded {MAX_DECOMPRESSED_BYTES} byte cap"
538 ));
539 }
540 out.extend_from_slice(&buf[..n]);
541 }
542 Ok(out)
543}
544
545fn append_audit_row(
550 conn: &Connection,
551 agent_id: &str,
552 event_type: &str,
553 ref_id: &str,
554 content_sha256: &str,
555 namespace: &str,
556 stored_at: i64,
557 signature_b64: &str,
558) -> Result<()> {
559 let payload = canonical_payload(ref_id, content_sha256, stored_at, namespace)?;
560 let hash = payload_hash(&payload);
561 let signature_bytes = if signature_b64.is_empty() {
562 None
563 } else {
564 Some(
565 URL_SAFE_NO_PAD
566 .decode(signature_b64.as_bytes())
567 .context("decode signature_b64 for audit row")?,
568 )
569 };
570 let attest_level = if signature_bytes.is_some() {
575 crate::models::AttestLevel::SelfSigned.as_str()
576 } else {
577 crate::models::AttestLevel::Unsigned.as_str()
578 };
579 let event = SignedEvent {
580 id: uuid::Uuid::new_v4().to_string(),
581 agent_id: agent_id.to_string(),
582 event_type: event_type.to_string(),
583 payload_hash: hash,
584 signature: signature_bytes,
585 attest_level: attest_level.to_string(),
586 timestamp: chrono::Utc::now().to_rfc3339(),
587 prev_hash: Vec::new(),
588 sequence: 0,
589 };
590 append_signed_event(conn, &event)?;
591 Ok(())
592}
593
594pub fn sweep_expired(
607 conn: &Connection,
608 now_unix: i64,
609 max_per_run: usize,
610 sleep_between_deletes: std::time::Duration,
611) -> Result<usize> {
612 let limit_i64 = i64::try_from(max_per_run).unwrap_or(i64::MAX);
613 let mut stmt = conn
614 .prepare(
615 "SELECT ref_id FROM offloaded_blobs
616 WHERE ttl_seconds IS NOT NULL
617 AND (stored_at + ttl_seconds) < ?1
618 ORDER BY stored_at ASC
619 LIMIT ?2",
620 )
621 .context("prepare TTL sweep select")?;
622 let candidates: Vec<String> = stmt
623 .query_map(params![now_unix, limit_i64], |r| r.get::<_, String>(0))
624 .context("execute TTL sweep select")?
625 .collect::<rusqlite::Result<Vec<_>>>()
626 .context("collect TTL sweep candidates")?;
627 drop(stmt);
628
629 let mut deleted = 0usize;
630 for ref_id in candidates {
631 let rows = conn
635 .execute(
636 "DELETE FROM offloaded_blobs
637 WHERE ref_id = ?1
638 AND ttl_seconds IS NOT NULL
639 AND (stored_at + ttl_seconds) < ?2",
640 params![ref_id, now_unix],
641 )
642 .with_context(|| format!("DELETE offloaded_blob {ref_id}"))?;
643 if rows > 0 {
644 deleted += 1;
645 }
646 if !sleep_between_deletes.is_zero() {
647 std::thread::sleep(sleep_between_deletes);
648 }
649 }
650 Ok(deleted)
651}
652
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage as db;
    use std::path::Path;

    /// Opens a fresh in-memory database through the project's storage layer.
    fn fresh_db() -> Connection {
        db::open(Path::new(":memory:")).expect("open in-memory db")
    }

    /// Identical content always maps to the same `ofl_`-prefixed id of fixed length.
    #[test]
    fn ref_id_is_stable_for_identical_content() {
        let a = ref_id_from_sha(&sha256_hex(b"hello world"));
        let b = ref_id_from_sha(&sha256_hex(b"hello world"));
        assert_eq!(a, b);
        assert!(a.starts_with("ofl_"));
        // 8 bytes = 64 bits -> ceil(64 / 5) = 13 base32 symbols.
        assert_eq!(a.len(), "ofl_".len() + 13);
    }

    /// Different content yields different ids.
    #[test]
    fn ref_id_differs_for_distinct_content() {
        let a = ref_id_from_sha(&sha256_hex(b"alpha"));
        let b = ref_id_from_sha(&sha256_hex(b"beta"));
        assert_ne!(a, b);
    }

    /// Equal inputs encode to equal payload bytes.
    #[test]
    fn canonical_payload_is_deterministic() {
        let p1 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        let p2 = canonical_payload("ofl_X", "deadbeef", 1234, "ns").unwrap();
        assert_eq!(p1, p2);
    }

    /// Without a signer, content survives an offload followed by a deref.
    #[test]
    fn offload_deref_round_trip_no_signer() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let content = "the quick brown fox jumps over the lazy dog";
        let r = off
            .offload(content, "ns/test", None, "ai:alice")
            .expect("offload");
        let back = off.deref(&r.ref_id, None).expect("deref");
        assert_eq!(back.content, content);
        assert_eq!(back.sha256, r.content_sha256);
    }

    /// Content above the configured limit is rejected with `SizeLimitExceeded`.
    #[test]
    fn offload_refuses_oversize_blob() {
        let conn = fresh_db();
        let cfg = OffloadConfig {
            max_offload_blob_bytes: 16,
            ..Default::default()
        };
        let off = ContextOffloader::new(&conn, None, cfg);
        let err = off
            .offload("0123456789ABCDEF_extra", "ns", None, "ai:alice")
            .expect_err("size error");
        let downcast = err
            .downcast_ref::<OffloadError>()
            .expect("OffloadError variant");
        // The bare `matches!` used previously discarded its result, so the
        // variant was never actually checked.
        assert!(
            matches!(
                downcast,
                OffloadError::SizeLimitExceeded {
                    actual: 22,
                    limit: 16
                }
            ),
            "expected SizeLimitExceeded {{ actual: 22, limit: 16 }}, got: {downcast:?}"
        );
    }

    /// Replacing the stored blob with different (validly compressed) content
    /// makes deref fail the hash comparison.
    #[test]
    fn deref_refuses_when_content_tampered() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("hello world", "ns", None, "ai:alice")
            .expect("offload");

        let tampered = zstd_compress(b"GOODBYE WORLD").expect("compress");
        conn.execute(
            "UPDATE offloaded_blobs SET content_zstd = ?1 WHERE ref_id = ?2",
            params![tampered, r.ref_id],
        )
        .expect("tamper");

        let err = off.deref(&r.ref_id, None).expect_err("deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::IntegrityFailed { .. }));
    }

    /// An id with no row maps to `NotFound`.
    #[test]
    fn deref_refuses_unknown_ref_id() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let err = off.deref("ofl_DOESNOTEXIST", None).expect_err("not found");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(matches!(downcast, OffloadError::NotFound { .. }));
    }

    /// A caller other than the owner sees `NotFound`; the owner and a caller
    /// passing no agent id can still read the blob.
    #[test]
    fn deref_refuses_cross_agent_caller_with_notfound() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("alice's secret", "ns", None, "ai:alice")
            .expect("offload");
        let err = off
            .deref(&r.ref_id, Some("ai:bob"))
            .expect_err("cross-agent deref must reject");
        let downcast = err.downcast_ref::<OffloadError>().expect("OffloadError");
        assert!(
            matches!(downcast, OffloadError::NotFound { .. }),
            "cross-agent deref must map to NotFound (leak-resistant), got: {downcast:?}"
        );
        let owner_back = off
            .deref(&r.ref_id, Some("ai:alice"))
            .expect("owner deref ok");
        assert_eq!(owner_back.content, "alice's secret");
        let internal_back = off
            .deref(&r.ref_id, None)
            .expect("substrate-internal deref ok");
        assert_eq!(internal_back.content, "alice's secret");
    }

    /// Rows with an elapsed TTL are deleted; rows without a TTL are kept.
    #[test]
    fn sweep_purges_expired_rows() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let a = off
            .offload("alpha", "ns", Some(60), "ai:alice")
            .expect("offload a");
        let b = off
            .offload("beta", "ns", Some(60), "ai:alice")
            .expect("offload b");
        let c = off
            .offload("gamma", "ns", None, "ai:alice")
            .expect("offload c");

        // One hour later both 60-second TTLs have elapsed.
        let future = a.stored_at + 60 * 60;
        let deleted = sweep_expired(&conn, future, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(deleted, 2);

        assert!(off.deref(&a.ref_id, None).is_err());
        assert!(off.deref(&b.ref_id, None).is_err());
        assert!(off.deref(&c.ref_id, None).is_ok());
    }

    /// A row whose `stored_at` was moved forward so that it is no longer
    /// expired at sweep time must survive the sweep.
    #[test]
    fn sweep_does_not_drop_blob_refreshed_after_select() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("racy", "ns", Some(60), "ai:alice")
            .expect("offload");
        let original_stored_at = r.stored_at;
        let sweep_now = original_stored_at + 60 * 60;

        conn.execute(
            "UPDATE offloaded_blobs SET stored_at = ?1 WHERE ref_id = ?2",
            params![sweep_now, r.ref_id],
        )
        .expect("simulate concurrent refresh");

        let deleted =
            sweep_expired(&conn, sweep_now, 1000, std::time::Duration::ZERO).expect("sweep");
        assert_eq!(
            deleted, 0,
            "sweep must not drop a row whose stored_at was refreshed past expiry"
        );
        let back = off.deref(&r.ref_id, None).expect("blob must still exist");
        assert_eq!(back.content, "racy");
    }

    /// Both offload and deref leave an event in the signed-events log.
    #[test]
    fn signed_events_chain_captures_offload_and_deref() {
        let conn = fresh_db();
        let off = ContextOffloader::new(&conn, None, OffloadConfig::default());
        let r = off
            .offload("traced", "ns", None, "ai:alice")
            .expect("offload");
        let _ = off.deref(&r.ref_id, None).expect("deref");
        let rows = crate::signed_events::list_signed_events(&conn, None, 100, 0).expect("list");
        let kinds: Vec<&str> = rows.iter().map(|r| r.event_type.as_str()).collect();
        assert!(kinds.contains(&"context_offloaded"));
        assert!(kinds.contains(&"context_dereferenced"));
    }
}