// mnemo_core/storage/cold.rs

//! Cold storage interface for archiving memories to object storage.
//!
//! Provides a trait and types for archiving memories to S3-compatible
//! object storage. Memories can be archived from the primary database
//! and restored when needed.
//!
//! # Architecture
//!
//! The [`ColdStorage`] trait defines the contract for any cold storage backend.
//! An [`InMemoryColdStorage`] implementation is provided for testing without
//! requiring real S3 credentials or network access. A real S3 backend,
//! `S3ColdStorage`, is available when the `s3` Cargo feature is enabled.
//!
//! S3 keys follow the format: `{prefix}/{agent_id}/{memory_id}.json`
//!
//! # Example
//!
//! ```rust
//! use mnemo_core::storage::cold::{ColdStorage, InMemoryColdStorage, ColdStorageConfig};
//!
//! # async fn example() -> mnemo_core::error::Result<()> {
//! let config = ColdStorageConfig {
//!     bucket: "my-bucket".to_string(),
//!     prefix: "memories".to_string(),
//!     endpoint: None,
//!     region: "us-east-1".to_string(),
//! };
//! let storage = InMemoryColdStorage::new(config);
//! assert!(storage.list_archived(None, 10).await?.is_empty());
//! # Ok(())
//! # }
//! ```

use std::collections::HashMap;
use std::sync::{Mutex, PoisonError};
use uuid::Uuid;

use crate::error::{Error, Result};
use crate::model::memory::MemoryRecord;

/// Configuration for S3-compatible cold storage.
///
/// Equality is derived so configurations can be compared directly
/// (e.g. in tests or when deduplicating backends).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColdStorageConfig {
    /// Name of the bucket archived objects are written to.
    pub bucket: String,
    /// Key prefix for archived memories; objects live at
    /// `{prefix}/{agent_id}/{memory_id}.json`.
    pub prefix: String,
    /// Optional endpoint URL override for S3-compatible services such as
    /// MinIO. When `None`, the SDK's default endpoint for `region` is used.
    pub endpoint: Option<String>,
    /// AWS region name (e.g. `us-east-1`).
    pub region: String,
}

52/// Result of an archive operation.
53#[derive(Debug, Clone)]
54pub struct ArchiveResult {
55    /// The UUID of the archived memory.
56    pub memory_id: Uuid,
57    /// The S3 key where the memory was stored.
58    pub s3_key: String,
59    /// Size of the serialized payload in bytes.
60    pub size_bytes: usize,
61}
62
63/// Result of a restore operation.
64#[derive(Debug, Clone)]
65pub struct RestoreResult {
66    /// The UUID of the restored memory.
67    pub memory_id: Uuid,
68    /// The deserialized memory record.
69    pub record: MemoryRecord,
70}
71
72/// Trait for cold storage backends.
73///
74/// Implementations handle archiving memory records to durable object storage
75/// (e.g., S3, MinIO, GCS) and restoring them on demand.
76#[async_trait::async_trait]
77pub trait ColdStorage: Send + Sync {
78    /// Archive a memory record to cold storage.
79    ///
80    /// Serializes the record to JSON and writes it to the configured bucket
81    /// under the key `{prefix}/{agent_id}/{memory_id}.json`.
82    async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult>;
83
84    /// Restore a memory from cold storage by ID.
85    ///
86    /// Returns an error if the memory is not found in cold storage.
87    async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult>;
88
89    /// List archived memory IDs with optional agent filter.
90    ///
91    /// If `agent_id` is provided, only memories belonging to that agent are
92    /// returned. Results are capped at `limit`.
93    async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>>;
94
95    /// Delete an archived memory permanently.
96    ///
97    /// Returns an error if the memory is not found in cold storage.
98    async fn delete_archived(&self, memory_id: Uuid) -> Result<()>;
99
100    /// Check if a memory is archived.
101    async fn is_archived(&self, memory_id: Uuid) -> Result<bool>;
102}
103
/// One archived record held by the in-memory backend.
///
/// Alongside the serialized payload it keeps the owning agent's ID, so that
/// `list_archived` can filter without deserializing every entry.
#[derive(Debug, Clone)]
struct ArchivedEntry {
    /// JSON-serialized `MemoryRecord` bytes.
    data: Vec<u8>,
    /// Key this record would occupy in a real object store.
    // Kept only to mirror the S3 key layout; nothing in the in-memory
    // backend reads it back, hence the lint allowance.
    #[allow(dead_code)]
    s3_key: String,
    /// Owning agent's ID, cached for cheap filtering.
    agent_id: String,
}

121/// In-memory cold storage implementation for testing.
122///
123/// Stores serialized memory records in a `HashMap` protected by a `Mutex`.
124/// This avoids any external dependencies while exercising the full
125/// [`ColdStorage`] trait contract.
126pub struct InMemoryColdStorage {
127    config: ColdStorageConfig,
128    store: Mutex<HashMap<Uuid, ArchivedEntry>>,
129}
130
131impl InMemoryColdStorage {
132    /// Create a new in-memory cold storage backend with the given config.
133    pub fn new(config: ColdStorageConfig) -> Self {
134        Self {
135            config,
136            store: Mutex::new(HashMap::new()),
137        }
138    }
139
140    /// Build the S3 key for a given record.
141    fn s3_key(&self, agent_id: &str, memory_id: Uuid) -> String {
142        format!("{}/{}/{}.json", self.config.prefix, agent_id, memory_id)
143    }
144}
145
146#[async_trait::async_trait]
147impl ColdStorage for InMemoryColdStorage {
148    async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult> {
149        let data = serde_json::to_vec(record)?;
150        let size_bytes = data.len();
151        let s3_key = self.s3_key(&record.agent_id, record.id);
152
153        let entry = ArchivedEntry {
154            data,
155            s3_key: s3_key.clone(),
156            agent_id: record.agent_id.clone(),
157        };
158
159        self.store
160            .lock()
161            .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?
162            .insert(record.id, entry);
163
164        Ok(ArchiveResult {
165            memory_id: record.id,
166            s3_key,
167            size_bytes,
168        })
169    }
170
171    async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult> {
172        let guard = self
173            .store
174            .lock()
175            .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
176
177        let entry = guard
178            .get(&memory_id)
179            .ok_or_else(|| Error::NotFound(format!("archived memory {memory_id} not found")))?;
180
181        let record: MemoryRecord = serde_json::from_slice(&entry.data)?;
182
183        Ok(RestoreResult { memory_id, record })
184    }
185
186    async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>> {
187        let guard = self
188            .store
189            .lock()
190            .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
191
192        let ids: Vec<Uuid> = guard
193            .iter()
194            .filter(|(_, entry)| agent_id.is_none_or(|aid| entry.agent_id == aid))
195            .map(|(id, _)| *id)
196            .take(limit)
197            .collect();
198
199        Ok(ids)
200    }
201
202    async fn delete_archived(&self, memory_id: Uuid) -> Result<()> {
203        let mut guard = self
204            .store
205            .lock()
206            .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
207
208        guard
209            .remove(&memory_id)
210            .ok_or_else(|| Error::NotFound(format!("archived memory {memory_id} not found")))?;
211
212        Ok(())
213    }
214
215    async fn is_archived(&self, memory_id: Uuid) -> Result<bool> {
216        let guard = self
217            .store
218            .lock()
219            .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
220
221        Ok(guard.contains_key(&memory_id))
222    }
223}
224
// ---------------------------------------------------------------------------
// S3ColdStorage -- real AWS S3 / S3-compatible backend (feature-gated)
// ---------------------------------------------------------------------------

/// [`ColdStorage`] backend for AWS S3 or an S3-compatible service.
///
/// Each archived memory is stored as a single JSON object at
/// `{prefix}/{agent_id}/{memory_id}.json`.
///
/// # Feature gate
///
/// Only compiled when the `s3` Cargo feature is enabled.
///
/// # Example
///
/// ```rust,ignore
/// use mnemo_core::storage::cold::{ColdStorageConfig, S3ColdStorage};
///
/// # async fn example() -> mnemo_core::error::Result<()> {
/// let config = ColdStorageConfig {
///     bucket: "my-bucket".to_string(),
///     prefix: "memories".to_string(),
///     endpoint: Some("http://localhost:9000".to_string()),
///     region: "us-east-1".to_string(),
/// };
/// let storage = S3ColdStorage::new(config).await;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "s3")]
pub struct S3ColdStorage {
    /// AWS SDK client used for every object operation.
    client: aws_sdk_s3::Client,
    /// Bucket, key prefix, region and optional endpoint override.
    config: ColdStorageConfig,
}

#[cfg(feature = "s3")]
impl S3ColdStorage {
    /// Create a new S3 cold storage backend.
    ///
    /// Loads AWS credentials from the default provider chain and pins the
    /// client to `config.region`. When `config.endpoint` is set, the client
    /// targets that URL instead of AWS. It also switches to path-style
    /// addressing (`{endpoint}/{bucket}/{key}`) instead of virtual-hosted
    /// style (`{bucket}.{host}`). S3-compatible services such as MinIO or
    /// LocalStack generally need path-style on a plain host:port endpoint.
    pub async fn new(config: ColdStorageConfig) -> Self {
        let mut loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .region(aws_config::Region::new(config.region.clone()));

        if let Some(endpoint) = &config.endpoint {
            loader = loader.endpoint_url(endpoint);
        }

        let sdk_config = loader.load().await;

        // Path-style is only forced for custom endpoints; real AWS keeps the
        // SDK's default addressing.
        let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config)
            .force_path_style(config.endpoint.is_some())
            .build();
        let client = aws_sdk_s3::Client::from_conf(s3_config);

        Self { client, config }
    }

    /// Build the object key `{prefix}/{agent_id}/{memory_id}.json`.
    fn s3_key(&self, agent_id: &str, memory_id: Uuid) -> String {
        format!("{}/{}/{}.json", self.config.prefix, agent_id, memory_id)
    }

    /// Build the listing prefix `{prefix}/{agent_id}/` for one agent's objects.
    fn agent_prefix(&self, agent_id: &str) -> String {
        format!("{}/{}/", self.config.prefix, agent_id)
    }

    /// Build the listing prefix `{prefix}/` covering every archived object.
    fn bare_prefix(&self) -> String {
        format!("{}/", self.config.prefix)
    }

    /// Error returned when no archived object exists for `memory_id`.
    fn not_found(memory_id: Uuid) -> Error {
        Error::NotFound(format!("archived memory {memory_id} not found in S3"))
    }

    /// Wrap a failed SDK call named `op` as a storage error.
    ///
    /// `DisplayErrorContext` is used because the plain `Display` of an SDK
    /// error is a short summary that omits the underlying cause.
    fn storage_err(op: &str, err: impl std::error::Error) -> Error {
        Error::Storage(format!(
            "S3 {op} failed: {}",
            aws_sdk_s3::error::DisplayErrorContext(err)
        ))
    }

    /// Locate the object key for `memory_id` under the configured prefix.
    ///
    /// The owning agent is not known, so this walks every `ListObjectsV2`
    /// page under `{prefix}/`, following continuation tokens. It returns the
    /// first key whose file name is `{memory_id}.json`, or `Ok(None)` if no
    /// such object exists. Reading only the first page would miss objects
    /// once more than one page (up to 1000 keys) is archived.
    async fn find_key(&self, memory_id: Uuid) -> Result<Option<String>> {
        let prefix = self.bare_prefix();
        // Anchor on the separator so only the final path segment can match.
        let target_suffix = format!("/{memory_id}.json");
        let mut continuation_token: Option<String> = None;

        loop {
            let mut req = self
                .client
                .list_objects_v2()
                .bucket(&self.config.bucket)
                .prefix(&prefix);

            if let Some(token) = continuation_token.take() {
                req = req.continuation_token(token);
            }

            let resp = req
                .send()
                .await
                .map_err(|e| Self::storage_err("list_objects_v2", e))?;

            if let Some(key) = resp
                .contents()
                .iter()
                .filter_map(|obj| obj.key())
                .find(|key| key.ends_with(&target_suffix))
            {
                return Ok(Some(key.to_string()));
            }

            match resp.next_continuation_token() {
                Some(token) if resp.is_truncated() == Some(true) => {
                    continuation_token = Some(token.to_string());
                }
                _ => return Ok(None),
            }
        }
    }
}

#[cfg(feature = "s3")]
#[async_trait::async_trait]
impl ColdStorage for S3ColdStorage {
    async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult> {
        let data = serde_json::to_vec(record)?;
        let size_bytes = data.len();
        let s3_key = self.s3_key(&record.agent_id, record.id);

        self.client
            .put_object()
            .bucket(&self.config.bucket)
            .key(&s3_key)
            .body(aws_sdk_s3::primitives::ByteStream::from(data))
            .content_type("application/json")
            .send()
            .await
            .map_err(|e| Self::storage_err("put_object", e))?;

        Ok(ArchiveResult {
            memory_id: record.id,
            s3_key,
            size_bytes,
        })
    }

    async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult> {
        // The key embeds the agent ID, which the caller does not supply.
        let key = self
            .find_key(memory_id)
            .await?
            .ok_or_else(|| Self::not_found(memory_id))?;

        let get_resp = self
            .client
            .get_object()
            .bucket(&self.config.bucket)
            .key(&key)
            .send()
            .await
            .map_err(|e| Self::storage_err("get_object", e))?;

        let body = get_resp
            .body
            .collect()
            .await
            .map_err(|e| Self::storage_err("body collect", e))?;

        let record: MemoryRecord = serde_json::from_slice(&body.into_bytes())?;

        Ok(RestoreResult { memory_id, record })
    }

    async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>> {
        // Nothing can be returned, so skip the round trip entirely.
        if limit == 0 {
            return Ok(Vec::new());
        }

        let prefix = match agent_id {
            Some(aid) => self.agent_prefix(aid),
            None => self.bare_prefix(),
        };

        let mut ids: Vec<Uuid> = Vec::new();
        let mut continuation_token: Option<String> = None;

        loop {
            // Request only what is still needed; S3 caps a page at 1000 keys,
            // so the value is at most 1000 and the cast cannot truncate.
            let remaining = limit - ids.len();
            let mut req = self
                .client
                .list_objects_v2()
                .bucket(&self.config.bucket)
                .prefix(&prefix)
                .max_keys(remaining.min(1000) as i32);

            if let Some(token) = continuation_token.take() {
                req = req.continuation_token(token);
            }

            let resp = req
                .send()
                .await
                .map_err(|e| Self::storage_err("list_objects_v2", e))?;

            // Keys look like `{prefix}/{agent_id}/{uuid}.json`; objects whose
            // file name is not a UUID are skipped.
            let page_ids = resp
                .contents()
                .iter()
                .filter_map(|obj| obj.key())
                .filter_map(|key| key.rsplit('/').next())
                .filter_map(|name| name.strip_suffix(".json"))
                .filter_map(|stem| Uuid::parse_str(stem).ok());
            ids.extend(page_ids.take(remaining));

            if ids.len() >= limit {
                return Ok(ids);
            }

            match resp.next_continuation_token() {
                Some(token) if resp.is_truncated() == Some(true) => {
                    continuation_token = Some(token.to_string());
                }
                _ => return Ok(ids),
            }
        }
    }

    async fn delete_archived(&self, memory_id: Uuid) -> Result<()> {
        // Find the full key first (it includes the agent ID).
        let key = self
            .find_key(memory_id)
            .await?
            .ok_or_else(|| Self::not_found(memory_id))?;

        self.client
            .delete_object()
            .bucket(&self.config.bucket)
            .key(&key)
            .send()
            .await
            .map_err(|e| Self::storage_err("delete_object", e))?;

        Ok(())
    }

    async fn is_archived(&self, memory_id: Uuid) -> Result<bool> {
        // `head_object` would need the full key (including agent ID), so the
        // paginated listing lookup is used instead.
        Ok(self.find_key(memory_id).await?.is_some())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::memory::{ConsolidationState, MemoryType, Scope, SourceType};

    fn sample_config() -> ColdStorageConfig {
        ColdStorageConfig {
            bucket: "test-bucket".to_string(),
            prefix: "memories".to_string(),
            endpoint: None,
            region: "us-east-1".to_string(),
        }
    }

    fn sample_record(agent_id: &str) -> MemoryRecord {
        MemoryRecord {
            id: Uuid::now_v7(),
            agent_id: agent_id.to_string(),
            content: "The user prefers dark mode".to_string(),
            memory_type: MemoryType::Semantic,
            scope: Scope::Private,
            importance: 0.8,
            tags: vec!["preference".to_string(), "ui".to_string()],
            metadata: serde_json::json!({"source": "conversation"}),
            embedding: None,
            content_hash: vec![1, 2, 3],
            prev_hash: None,
            source_type: SourceType::Agent,
            source_id: None,
            consolidation_state: ConsolidationState::Raw,
            access_count: 0,
            org_id: None,
            thread_id: None,
            created_at: "2025-01-01T00:00:00Z".to_string(),
            updated_at: "2025-01-01T00:00:00Z".to_string(),
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: None,
            created_by: None,
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            decay_function: None,
        }
    }

    #[tokio::test]
    async fn test_archive_and_restore() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");
        let id = record.id;

        // Archive
        let result = storage.archive(&record).await.unwrap();
        assert_eq!(result.memory_id, id);
        assert!(result.size_bytes > 0);
        assert_eq!(result.s3_key, format!("memories/agent-1/{id}.json"));

        // Restore and verify round-trip fidelity
        let restored = storage.restore(id).await.unwrap();
        assert_eq!(restored.memory_id, id);
        assert_eq!(restored.record, record);
    }

    #[tokio::test]
    async fn test_list_archived() {
        let storage = InMemoryColdStorage::new(sample_config());

        let r1 = sample_record("agent-1");
        let r2 = sample_record("agent-1");
        let r3 = sample_record("agent-2");

        let id1 = r1.id;
        let id2 = r2.id;
        let id3 = r3.id;

        storage.archive(&r1).await.unwrap();
        storage.archive(&r2).await.unwrap();
        storage.archive(&r3).await.unwrap();

        // List all
        let all = storage.list_archived(None, 100).await.unwrap();
        assert_eq!(all.len(), 3);
        assert!(all.contains(&id1));
        assert!(all.contains(&id2));
        assert!(all.contains(&id3));

        // Filter by agent-1
        let agent1_ids = storage.list_archived(Some("agent-1"), 100).await.unwrap();
        assert_eq!(agent1_ids.len(), 2);
        assert!(agent1_ids.contains(&id1));
        assert!(agent1_ids.contains(&id2));

        // Filter by agent-2
        let agent2_ids = storage.list_archived(Some("agent-2"), 100).await.unwrap();
        assert_eq!(agent2_ids.len(), 1);
        assert!(agent2_ids.contains(&id3));

        // Limit
        let limited = storage.list_archived(None, 2).await.unwrap();
        assert_eq!(limited.len(), 2);
    }

    #[tokio::test]
    async fn test_list_archived_edge_cases() {
        let storage = InMemoryColdStorage::new(sample_config());
        storage.archive(&sample_record("agent-1")).await.unwrap();

        // A zero limit returns nothing even when entries exist.
        assert!(storage.list_archived(None, 0).await.unwrap().is_empty());

        // An agent with no archived memories yields an empty list.
        assert!(storage
            .list_archived(Some("agent-unknown"), 10)
            .await
            .unwrap()
            .is_empty());
    }

    #[tokio::test]
    async fn test_rearchive_overwrites() {
        let storage = InMemoryColdStorage::new(sample_config());
        let mut record = sample_record("agent-1");
        let id = record.id;

        storage.archive(&record).await.unwrap();
        record.content = "The user prefers light mode".to_string();
        storage.archive(&record).await.unwrap();

        // Still a single archived entry, holding the most recent payload.
        assert_eq!(storage.list_archived(None, 10).await.unwrap(), vec![id]);
        let restored = storage.restore(id).await.unwrap();
        assert_eq!(restored.record.content, "The user prefers light mode");
    }

    #[tokio::test]
    async fn test_delete_archived() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");
        let id = record.id;

        storage.archive(&record).await.unwrap();
        assert!(storage.is_archived(id).await.unwrap());

        // Delete
        storage.delete_archived(id).await.unwrap();
        assert!(!storage.is_archived(id).await.unwrap());

        // Restore should fail
        let err = storage.restore(id).await.unwrap_err();
        assert!(
            err.to_string().contains("not found"),
            "expected not-found error, got: {err}"
        );

        // Double-delete should fail
        let err = storage.delete_archived(id).await.unwrap_err();
        assert!(
            err.to_string().contains("not found"),
            "expected not-found error, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_restore_missing_is_not_found() {
        let storage = InMemoryColdStorage::new(sample_config());

        // Pin the error variant, not just the message text.
        let err = storage.restore(Uuid::now_v7()).await.unwrap_err();
        assert!(
            matches!(err, Error::NotFound(_)),
            "expected NotFound, got: {err}"
        );
    }

    #[tokio::test]
    async fn test_is_archived() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");
        let id = record.id;

        // Not archived yet
        assert!(!storage.is_archived(id).await.unwrap());

        // Archive
        storage.archive(&record).await.unwrap();
        assert!(storage.is_archived(id).await.unwrap());
    }
}