1use std::collections::HashMap;
33use std::sync::Mutex;
34use uuid::Uuid;
35
36use crate::error::{Error, Result};
37use crate::model::memory::MemoryRecord;
38
/// Settings shared by the cold-storage backends in this module.
///
/// Object keys are built as `{prefix}/{agent_id}/{memory_id}.json`.
#[derive(Debug, Clone)]
pub struct ColdStorageConfig {
    /// Name of the bucket that archived objects are written to.
    pub bucket: String,
    /// Leading key segment placed before `/{agent_id}/{memory_id}.json`.
    pub prefix: String,
    /// Optional endpoint URL override handed to the AWS config loader
    /// (presumably for S3-compatible services or local testing — confirm).
    pub endpoint: Option<String>,
    /// Region name handed to the AWS config loader.
    pub region: String,
}
51
52#[derive(Debug, Clone)]
54pub struct ArchiveResult {
55 pub memory_id: Uuid,
57 pub s3_key: String,
59 pub size_bytes: usize,
61}
62
63#[derive(Debug, Clone)]
65pub struct RestoreResult {
66 pub memory_id: Uuid,
68 pub record: MemoryRecord,
70}
71
72#[async_trait::async_trait]
77pub trait ColdStorage: Send + Sync {
78 async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult>;
83
84 async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult>;
88
89 async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>>;
94
95 async fn delete_archived(&self, memory_id: Uuid) -> Result<()>;
99
100 async fn is_archived(&self, memory_id: Uuid) -> Result<bool>;
102}
103
/// One archived record held by [`InMemoryColdStorage`].
#[derive(Debug, Clone)]
struct ArchivedEntry {
    /// JSON bytes produced by `serde_json::to_vec` on the archived record.
    data: Vec<u8>,
    /// Key the S3 backend would have used for this record.
    // Silences dead_code for builds in which no code path reads this field.
    #[allow(dead_code)]
    s3_key: String,
    /// Owning agent; used to filter `list_archived`.
    agent_id: String,
}
120
121pub struct InMemoryColdStorage {
127 config: ColdStorageConfig,
128 store: Mutex<HashMap<Uuid, ArchivedEntry>>,
129}
130
131impl InMemoryColdStorage {
132 pub fn new(config: ColdStorageConfig) -> Self {
134 Self {
135 config,
136 store: Mutex::new(HashMap::new()),
137 }
138 }
139
140 fn s3_key(&self, agent_id: &str, memory_id: Uuid) -> String {
142 format!("{}/{}/{}.json", self.config.prefix, agent_id, memory_id)
143 }
144}
145
146#[async_trait::async_trait]
147impl ColdStorage for InMemoryColdStorage {
148 async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult> {
149 let data = serde_json::to_vec(record)?;
150 let size_bytes = data.len();
151 let s3_key = self.s3_key(&record.agent_id, record.id);
152
153 let entry = ArchivedEntry {
154 data,
155 s3_key: s3_key.clone(),
156 agent_id: record.agent_id.clone(),
157 };
158
159 self.store
160 .lock()
161 .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?
162 .insert(record.id, entry);
163
164 Ok(ArchiveResult {
165 memory_id: record.id,
166 s3_key,
167 size_bytes,
168 })
169 }
170
171 async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult> {
172 let guard = self
173 .store
174 .lock()
175 .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
176
177 let entry = guard
178 .get(&memory_id)
179 .ok_or_else(|| Error::NotFound(format!("archived memory {memory_id} not found")))?;
180
181 let record: MemoryRecord = serde_json::from_slice(&entry.data)?;
182
183 Ok(RestoreResult { memory_id, record })
184 }
185
186 async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>> {
187 let guard = self
188 .store
189 .lock()
190 .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
191
192 let ids: Vec<Uuid> = guard
193 .iter()
194 .filter(|(_, entry)| agent_id.is_none_or(|aid| entry.agent_id == aid))
195 .map(|(id, _)| *id)
196 .take(limit)
197 .collect();
198
199 Ok(ids)
200 }
201
202 async fn delete_archived(&self, memory_id: Uuid) -> Result<()> {
203 let mut guard = self
204 .store
205 .lock()
206 .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
207
208 guard
209 .remove(&memory_id)
210 .ok_or_else(|| Error::NotFound(format!("archived memory {memory_id} not found")))?;
211
212 Ok(())
213 }
214
215 async fn is_archived(&self, memory_id: Uuid) -> Result<bool> {
216 let guard = self
217 .store
218 .lock()
219 .map_err(|e| Error::Internal(format!("lock poisoned: {e}")))?;
220
221 Ok(guard.contains_key(&memory_id))
222 }
223}
224
#[cfg(feature = "s3")]
/// [`ColdStorage`] backed by an S3 bucket.
///
/// The endpoint may be overridden via `ColdStorageConfig::endpoint`. Records are
/// stored as JSON objects keyed `{prefix}/{agent_id}/{memory_id}.json`.
pub struct S3ColdStorage {
    /// SDK client built from the configured region and optional endpoint.
    client: aws_sdk_s3::Client,
    /// Bucket and key-prefix settings.
    config: ColdStorageConfig,
}
261
#[cfg(feature = "s3")]
impl S3ColdStorage {
    /// Builds an S3 client for `config.region`, applying `config.endpoint` as an
    /// endpoint URL override when present.
    ///
    /// All other settings, such as credentials, come from
    /// `aws_config::defaults`.
    pub async fn new(config: ColdStorageConfig) -> Self {
        let mut aws_cfg_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
            .region(aws_config::Region::new(config.region.clone()));

        if let Some(ref endpoint) = config.endpoint {
            aws_cfg_loader = aws_cfg_loader.endpoint_url(endpoint);
        }

        let aws_cfg = aws_cfg_loader.load().await;

        let client = aws_sdk_s3::Client::new(&aws_cfg);

        Self { client, config }
    }

    /// Object key for a record: `{prefix}/{agent_id}/{memory_id}.json`.
    fn s3_key(&self, agent_id: &str, memory_id: Uuid) -> String {
        format!("{}/{}/{}.json", self.config.prefix, agent_id, memory_id)
    }

    /// Listing prefix covering every object archived for `agent_id`.
    fn agent_prefix(&self, agent_id: &str) -> String {
        format!("{}/{}/", self.config.prefix, agent_id)
    }

    /// Listing prefix covering every archived object, regardless of agent.
    fn bare_prefix(&self) -> String {
        format!("{}/", self.config.prefix)
    }

    /// Parses the memory id from the final path segment of a `.../{uuid}.json`
    /// key. Returns `None` for keys that do not have that shape.
    fn id_from_key(key: &str) -> Option<Uuid> {
        let filename = key.rsplit('/').next()?;
        let stem = filename.strip_suffix(".json")?;
        Uuid::parse_str(stem).ok()
    }

    /// Finds the object key for `memory_id` without knowing the owning agent.
    ///
    /// The agent segment of the key is unknown, so this lists everything under the
    /// bare prefix and stops at the first key ending in `/{memory_id}.json`. It
    /// follows continuation tokens, because ListObjectsV2 returns at most 1000
    /// keys per call. A single request silently misses records once the archive
    /// outgrows one page.
    ///
    /// # Errors
    /// `Error::Storage` if any list request fails.
    async fn find_key(&self, memory_id: Uuid) -> Result<Option<String>> {
        let prefix = self.bare_prefix();
        // Include the separator so only a whole final path segment can match.
        let target_suffix = format!("/{memory_id}.json");
        let mut continuation_token: Option<String> = None;

        loop {
            let mut req = self
                .client
                .list_objects_v2()
                .bucket(&self.config.bucket)
                .prefix(&prefix);

            if let Some(token) = continuation_token.take() {
                req = req.continuation_token(token);
            }

            let resp = req
                .send()
                .await
                .map_err(|e| Error::Storage(format!("S3 list_objects_v2 failed: {e}")))?;

            let found = resp
                .contents()
                .iter()
                .filter_map(|obj| obj.key())
                .find(|k| k.ends_with(target_suffix.as_str()));

            if let Some(key) = found {
                return Ok(Some(key.to_owned()));
            }

            match resp.next_continuation_token() {
                Some(token) if resp.is_truncated() == Some(true) => {
                    continuation_token = Some(token.to_string());
                }
                _ => return Ok(None),
            }
        }
    }
}

#[cfg(feature = "s3")]
#[async_trait::async_trait]
impl ColdStorage for S3ColdStorage {
    /// Uploads `record` as a JSON object at `{prefix}/{agent_id}/{id}.json`.
    ///
    /// # Errors
    /// A serialization error (converted via `?`), or `Error::Storage` if the
    /// upload fails.
    async fn archive(&self, record: &MemoryRecord) -> Result<ArchiveResult> {
        let data = serde_json::to_vec(record)?;
        let size_bytes = data.len();
        let s3_key = self.s3_key(&record.agent_id, record.id);

        self.client
            .put_object()
            .bucket(&self.config.bucket)
            .key(&s3_key)
            .body(aws_sdk_s3::primitives::ByteStream::from(data))
            .content_type("application/json")
            .send()
            .await
            .map_err(|e| Error::Storage(format!("S3 put_object failed: {e}")))?;

        Ok(ArchiveResult {
            memory_id: record.id,
            s3_key,
            size_bytes,
        })
    }

    /// Locates the record's object across all agents, then downloads and
    /// deserializes it.
    ///
    /// # Errors
    /// `Error::NotFound` if no object matches, `Error::Storage` on any S3 failure,
    /// or a deserialization error (converted via `?`).
    async fn restore(&self, memory_id: Uuid) -> Result<RestoreResult> {
        let key = self.find_key(memory_id).await?.ok_or_else(|| {
            Error::NotFound(format!("archived memory {memory_id} not found in S3"))
        })?;

        let get_resp = self
            .client
            .get_object()
            .bucket(&self.config.bucket)
            .key(&key)
            .send()
            .await
            .map_err(|e| Error::Storage(format!("S3 get_object failed: {e}")))?;

        let body = get_resp
            .body
            .collect()
            .await
            .map_err(|e| Error::Storage(format!("S3 body collect failed: {e}")))?;

        let record: MemoryRecord = serde_json::from_slice(&body.into_bytes())?;

        Ok(RestoreResult { memory_id, record })
    }

    /// Lists at most `limit` archived ids in S3 key order, optionally restricted
    /// to `agent_id`. Keys whose filename is not `{uuid}.json` are skipped.
    ///
    /// # Errors
    /// `Error::Storage` if any list request fails.
    async fn list_archived(&self, agent_id: Option<&str>, limit: usize) -> Result<Vec<Uuid>> {
        // Nothing can be returned, so skip the network round trip.
        if limit == 0 {
            return Ok(Vec::new());
        }

        let prefix = match agent_id {
            Some(aid) => self.agent_prefix(aid),
            None => self.bare_prefix(),
        };

        let mut ids: Vec<Uuid> = Vec::new();
        let mut continuation_token: Option<String> = None;

        loop {
            // Invariant: ids.len() < limit here, because we return as soon as the
            // limit is hit. Capping at 1000 (the S3 page maximum) keeps the cast
            // lossless.
            let page_size = (limit - ids.len()).min(1000) as i32;

            let mut req = self
                .client
                .list_objects_v2()
                .bucket(&self.config.bucket)
                .prefix(&prefix)
                .max_keys(page_size);

            if let Some(token) = continuation_token.take() {
                req = req.continuation_token(token);
            }

            let resp = req
                .send()
                .await
                .map_err(|e| Error::Storage(format!("S3 list_objects_v2 failed: {e}")))?;

            let page_ids = resp
                .contents()
                .iter()
                .filter_map(|obj| obj.key())
                .filter_map(Self::id_from_key);

            for id in page_ids {
                ids.push(id);
                if ids.len() >= limit {
                    return Ok(ids);
                }
            }

            match resp.next_continuation_token() {
                Some(token) if resp.is_truncated() == Some(true) => {
                    continuation_token = Some(token.to_string());
                }
                _ => break,
            }
        }

        Ok(ids)
    }

    /// Deletes the record's object after locating it across all agents.
    ///
    /// # Errors
    /// `Error::NotFound` if no object matches, or `Error::Storage` on any S3
    /// failure.
    async fn delete_archived(&self, memory_id: Uuid) -> Result<()> {
        let key = self.find_key(memory_id).await?.ok_or_else(|| {
            Error::NotFound(format!("archived memory {memory_id} not found in S3"))
        })?;

        self.client
            .delete_object()
            .bucket(&self.config.bucket)
            .key(&key)
            .send()
            .await
            .map_err(|e| Error::Storage(format!("S3 delete_object failed: {e}")))?;

        Ok(())
    }

    /// Reports whether any object under the bare prefix belongs to `memory_id`.
    ///
    /// # Errors
    /// `Error::Storage` if any list request fails.
    async fn is_archived(&self, memory_id: Uuid) -> Result<bool> {
        Ok(self.find_key(memory_id).await?.is_some())
    }
}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::memory::{ConsolidationState, MemoryType, Scope, SourceType};

    /// Config shared by every test; keys come out as `memories/{agent}/{id}.json`.
    fn sample_config() -> ColdStorageConfig {
        ColdStorageConfig {
            bucket: String::from("test-bucket"),
            prefix: String::from("memories"),
            endpoint: None,
            region: String::from("us-east-1"),
        }
    }

    /// A fully populated record with a fresh v7 id, owned by `agent_id`.
    fn sample_record(agent_id: &str) -> MemoryRecord {
        let timestamp = "2025-01-01T00:00:00Z";
        MemoryRecord {
            id: Uuid::now_v7(),
            agent_id: agent_id.to_owned(),
            content: String::from("The user prefers dark mode"),
            memory_type: MemoryType::Semantic,
            scope: Scope::Private,
            importance: 0.8,
            tags: ["preference", "ui"].iter().map(|t| t.to_string()).collect(),
            metadata: serde_json::json!({"source": "conversation"}),
            embedding: None,
            content_hash: vec![1, 2, 3],
            prev_hash: None,
            source_type: SourceType::Agent,
            source_id: None,
            consolidation_state: ConsolidationState::Raw,
            access_count: 0,
            org_id: None,
            thread_id: None,
            created_at: timestamp.to_owned(),
            updated_at: timestamp.to_owned(),
            last_accessed_at: None,
            expires_at: None,
            deleted_at: None,
            decay_rate: None,
            created_by: None,
            version: 1,
            prev_version_id: None,
            quarantined: false,
            quarantine_reason: None,
            decay_function: None,
        }
    }

    /// Fails the test unless `err` renders as a not-found error.
    fn assert_not_found(err: impl std::fmt::Display) {
        let rendered = err.to_string();
        assert!(
            rendered.contains("not found"),
            "expected not-found error, got: {rendered}"
        );
    }

    #[tokio::test]
    async fn test_archive_and_restore() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");
        let expected_key = format!("memories/agent-1/{}.json", record.id);

        let archived = storage.archive(&record).await.unwrap();
        assert_eq!(archived.memory_id, record.id);
        assert!(archived.size_bytes > 0);
        assert_eq!(archived.s3_key, expected_key);

        let restored = storage.restore(record.id).await.unwrap();
        assert_eq!(restored.memory_id, record.id);
        assert_eq!(restored.record, record);
    }

    #[tokio::test]
    async fn test_list_archived() {
        let storage = InMemoryColdStorage::new(sample_config());
        let records = [
            sample_record("agent-1"),
            sample_record("agent-1"),
            sample_record("agent-2"),
        ];
        for record in &records {
            storage.archive(record).await.unwrap();
        }
        let (id1, id2, id3) = (records[0].id, records[1].id, records[2].id);

        let everything = storage.list_archived(None, 100).await.unwrap();
        assert_eq!(everything.len(), 3);
        for id in [id1, id2, id3] {
            assert!(everything.contains(&id));
        }

        let first_agent = storage.list_archived(Some("agent-1"), 100).await.unwrap();
        assert_eq!(first_agent.len(), 2);
        assert!(first_agent.contains(&id1));
        assert!(first_agent.contains(&id2));

        let second_agent = storage.list_archived(Some("agent-2"), 100).await.unwrap();
        assert_eq!(second_agent.len(), 1);
        assert!(second_agent.contains(&id3));

        let capped = storage.list_archived(None, 2).await.unwrap();
        assert_eq!(capped.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_archived() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");
        let id = record.id;

        storage.archive(&record).await.unwrap();
        assert!(storage.is_archived(id).await.unwrap());

        storage.delete_archived(id).await.unwrap();
        assert!(!storage.is_archived(id).await.unwrap());

        assert_not_found(storage.restore(id).await.unwrap_err());
        assert_not_found(storage.delete_archived(id).await.unwrap_err());
    }

    #[tokio::test]
    async fn test_is_archived() {
        let storage = InMemoryColdStorage::new(sample_config());
        let record = sample_record("agent-1");

        assert!(!storage.is_archived(record.id).await.unwrap());

        storage.archive(&record).await.unwrap();
        assert!(storage.is_archived(record.id).await.unwrap());
    }
}