1use async_trait::async_trait;
7use chrono::Utc;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::{Duration, SystemTime};
11use tempfile::NamedTempFile;
12
13use super::types::{
14 AgentContext, ContextError, ContextId, ContextPersistence, HierarchicalMemory, KnowledgeBase,
15 MemoryItem, MemoryType, RetentionPolicy, SemanticMemoryItem, SessionId, StorageStats,
16};
17use crate::types::AgentId;
18
/// File-system backed context store that keeps each agent's memory as Markdown.
///
/// Every agent owns a directory below `root_dir`, named after the agent id.
/// That directory holds a `memory.md` snapshot and a `logs` sub-directory with
/// one Markdown log file per day.
#[derive(Debug, Clone)]
pub struct MarkdownMemoryStore {
    /// Directory under which the per-agent directories live.
    root_dir: PathBuf,
    /// Age after which `compact` deletes a daily log file.
    retention: Duration,
}
33
34impl MarkdownMemoryStore {
35 pub fn new(root_dir: PathBuf, retention: Duration) -> Self {
41 Self {
42 root_dir,
43 retention,
44 }
45 }
46
47 fn agent_dir(&self, agent_id: AgentId) -> PathBuf {
49 self.root_dir.join(agent_id.to_string())
50 }
51
52 fn memory_path(&self, agent_id: AgentId) -> PathBuf {
54 self.agent_dir(agent_id).join("memory.md")
55 }
56
57 fn logs_dir(&self, agent_id: AgentId) -> PathBuf {
59 self.agent_dir(agent_id).join("logs")
60 }
61
62 fn memory_to_markdown(&self, agent_id: AgentId, memory: &HierarchicalMemory) -> String {
64 let now = Utc::now().format("%Y-%m-%dT%H:%M:%SZ");
65 let mut md = format!("# Agent Memory: {}\nUpdated: {}\n", agent_id, now);
66
67 let facts: Vec<&MemoryItem> = memory
69 .long_term
70 .iter()
71 .filter(|item| item.memory_type == MemoryType::Factual)
72 .collect();
73 if !facts.is_empty() {
74 md.push_str("\n## Facts\n");
75 for fact in &facts {
76 md.push_str(&format!("- {}\n", fact.content));
77 }
78 }
79
80 let procedures: Vec<&MemoryItem> = memory
82 .long_term
83 .iter()
84 .filter(|item| item.memory_type == MemoryType::Procedural)
85 .collect();
86 if !procedures.is_empty() {
87 md.push_str("\n## Procedures\n");
88 for proc in &procedures {
89 md.push_str(&format!("- {}\n", proc.content));
90 }
91 }
92
93 if !memory.semantic_memory.is_empty() {
95 md.push_str("\n## Learned Patterns\n");
96 for item in &memory.semantic_memory {
97 md.push_str(&format!("- {}\n", item.concept));
98 }
99 }
100
101 md
102 }
103
104 fn markdown_to_memory(&self, markdown: &str) -> HierarchicalMemory {
106 let mut memory = HierarchicalMemory::default();
107 let mut current_section: Option<&str> = None;
108
109 for line in markdown.lines() {
110 let trimmed = line.trim();
111
112 if trimmed == "## Facts" {
113 current_section = Some("facts");
114 continue;
115 } else if trimmed == "## Procedures" {
116 current_section = Some("procedures");
117 continue;
118 } else if trimmed == "## Learned Patterns" {
119 current_section = Some("patterns");
120 continue;
121 } else if trimmed.starts_with("## ") || trimmed.starts_with("# ") {
122 current_section = None;
123 continue;
124 }
125
126 if let Some(content) = trimmed.strip_prefix("- ") {
127 let now = SystemTime::now();
128 match current_section {
129 Some("facts") => {
130 memory.long_term.push(MemoryItem {
131 id: ContextId::new(),
132 content: content.to_string(),
133 memory_type: MemoryType::Factual,
134 importance: 0.5,
135 access_count: 0,
136 last_accessed: now,
137 created_at: now,
138 embedding: None,
139 metadata: HashMap::new(),
140 });
141 }
142 Some("procedures") => {
143 memory.long_term.push(MemoryItem {
144 id: ContextId::new(),
145 content: content.to_string(),
146 memory_type: MemoryType::Procedural,
147 importance: 0.5,
148 access_count: 0,
149 last_accessed: now,
150 created_at: now,
151 embedding: None,
152 metadata: HashMap::new(),
153 });
154 }
155 Some("patterns") => {
156 memory.semantic_memory.push(SemanticMemoryItem {
157 id: ContextId::new(),
158 concept: content.to_string(),
159 relationships: vec![],
160 properties: HashMap::new(),
161 confidence: 0.5,
162 created_at: now,
163 updated_at: now,
164 });
165 }
166 _ => {}
167 }
168 }
169 }
170
171 memory
172 }
173
174 pub async fn compact(&self, agent_id: AgentId) -> Result<(), ContextError> {
176 let logs_dir = self.logs_dir(agent_id);
177 let retention = self.retention;
178
179 tokio::task::spawn_blocking(move || {
180 if !logs_dir.exists() {
181 return Ok(());
182 }
183
184 let cutoff = SystemTime::now()
185 .checked_sub(retention)
186 .unwrap_or(SystemTime::UNIX_EPOCH);
187
188 let entries = std::fs::read_dir(&logs_dir).map_err(|e| ContextError::StorageError {
189 reason: format!("Failed to read logs directory: {}", e),
190 })?;
191
192 for entry in entries {
193 let entry = entry.map_err(|e| ContextError::StorageError {
194 reason: format!("Failed to read log entry: {}", e),
195 })?;
196
197 let path = entry.path();
198 if path.extension().and_then(|e| e.to_str()) != Some("md") {
199 continue;
200 }
201
202 let metadata =
203 std::fs::metadata(&path).map_err(|e| ContextError::StorageError {
204 reason: format!("Failed to read log metadata: {}", e),
205 })?;
206
207 let modified = metadata
208 .modified()
209 .map_err(|e| ContextError::StorageError {
210 reason: format!("Failed to read modification time: {}", e),
211 })?;
212
213 if modified < cutoff {
214 std::fs::remove_file(&path).map_err(|e| ContextError::StorageError {
215 reason: format!("Failed to remove old log file: {}", e),
216 })?;
217 }
218 }
219
220 Ok(())
221 })
222 .await
223 .map_err(|e| ContextError::StorageError {
224 reason: format!("Blocking task failed: {}", e),
225 })?
226 }
227}
228
229fn append_daily_log_sync(
231 logs_dir: &std::path::Path,
232 context: &AgentContext,
233) -> Result<(), ContextError> {
234 std::fs::create_dir_all(logs_dir).map_err(|e| ContextError::StorageError {
235 reason: format!("Failed to create logs directory: {}", e),
236 })?;
237
238 let today = Utc::now().format("%Y-%m-%d").to_string();
239 let log_path = logs_dir.join(format!("{}.md", today));
240
241 use std::io::Write;
242 let mut file = std::fs::OpenOptions::new()
243 .create(true)
244 .append(true)
245 .open(&log_path)
246 .map_err(|e| ContextError::StorageError {
247 reason: format!("Failed to open daily log: {}", e),
248 })?;
249
250 let now = Utc::now().format("%Y-%m-%dT%H:%M:%SZ");
251 let memory_count = context.memory.long_term.len() + context.memory.short_term.len();
252 let knowledge_count = context.knowledge_base.facts.len()
253 + context.knowledge_base.procedures.len()
254 + context.knowledge_base.learned_patterns.len();
255
256 writeln!(
257 file,
258 "### {}\n- Memory items: {}\n- Knowledge items: {}\n",
259 now, memory_count, knowledge_count
260 )
261 .map_err(|e| ContextError::StorageError {
262 reason: format!("Failed to write daily log: {}", e),
263 })?;
264
265 Ok(())
266}
267
268fn dir_size_sync(path: &std::path::Path) -> Result<u64, ContextError> {
270 let mut total: u64 = 0;
271 if !path.exists() {
272 return Ok(0);
273 }
274 let entries = std::fs::read_dir(path).map_err(|e| ContextError::StorageError {
275 reason: format!("Failed to read directory: {}", e),
276 })?;
277 for entry in entries {
278 let entry = entry.map_err(|e| ContextError::StorageError {
279 reason: format!("Failed to read entry: {}", e),
280 })?;
281 let meta = entry.metadata().map_err(|e| ContextError::StorageError {
282 reason: format!("Failed to read metadata: {}", e),
283 })?;
284 if meta.is_dir() {
285 total += dir_size_sync(&entry.path())?;
286 } else {
287 total += meta.len();
288 }
289 }
290 Ok(total)
291}
292
293#[async_trait]
294impl ContextPersistence for MarkdownMemoryStore {
295 async fn save_context(
296 &self,
297 agent_id: AgentId,
298 context: &AgentContext,
299 ) -> Result<(), ContextError> {
300 let agent_dir = self.agent_dir(agent_id);
301 let markdown = self.memory_to_markdown(agent_id, &context.memory);
302 let memory_path = self.memory_path(agent_id);
303 let logs_dir = self.logs_dir(agent_id);
304 let context_clone = context.clone();
305
306 tokio::task::spawn_blocking(move || {
307 std::fs::create_dir_all(&agent_dir).map_err(|e| ContextError::StorageError {
308 reason: format!("Failed to create agent directory: {}", e),
309 })?;
310
311 let temp =
313 NamedTempFile::new_in(&agent_dir).map_err(|e| ContextError::StorageError {
314 reason: format!("Failed to create temp file: {}", e),
315 })?;
316
317 std::fs::write(temp.path(), markdown.as_bytes()).map_err(|e| {
318 ContextError::StorageError {
319 reason: format!("Failed to write temp file: {}", e),
320 }
321 })?;
322
323 temp.persist(&memory_path)
324 .map_err(|e| ContextError::StorageError {
325 reason: format!("Failed to persist memory file: {}", e),
326 })?;
327
328 append_daily_log_sync(&logs_dir, &context_clone)
330 })
331 .await
332 .map_err(|e| ContextError::StorageError {
333 reason: format!("Blocking task failed: {}", e),
334 })?
335 }
336
337 async fn load_context(&self, agent_id: AgentId) -> Result<Option<AgentContext>, ContextError> {
338 let memory_path = self.memory_path(agent_id);
339
340 let markdown = match tokio::fs::read_to_string(&memory_path).await {
341 Ok(content) => content,
342 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
343 Err(e) => {
344 return Err(ContextError::StorageError {
345 reason: format!("Failed to read memory file: {}", e),
346 })
347 }
348 };
349
350 let memory = self.markdown_to_memory(&markdown);
351 let now = SystemTime::now();
352
353 let context = AgentContext {
354 agent_id,
355 session_id: SessionId::new(),
356 memory,
357 knowledge_base: KnowledgeBase::default(),
358 conversation_history: vec![],
359 metadata: HashMap::new(),
360 created_at: now,
361 updated_at: now,
362 retention_policy: RetentionPolicy::default(),
363 };
364
365 Ok(Some(context))
366 }
367
368 async fn delete_context(&self, agent_id: AgentId) -> Result<(), ContextError> {
369 let agent_dir = self.agent_dir(agent_id);
370 match tokio::fs::remove_dir_all(&agent_dir).await {
371 Ok(()) => Ok(()),
372 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
373 Err(e) => Err(ContextError::StorageError {
374 reason: format!("Failed to delete agent directory: {}", e),
375 }),
376 }
377 }
378
379 async fn list_agent_contexts(&self) -> Result<Vec<AgentId>, ContextError> {
380 let root_dir = self.root_dir.clone();
381 tokio::task::spawn_blocking(move || {
382 let mut agent_ids = Vec::new();
383
384 if !root_dir.exists() {
385 return Ok(agent_ids);
386 }
387
388 let entries = std::fs::read_dir(&root_dir).map_err(|e| ContextError::StorageError {
389 reason: format!("Failed to read root directory: {}", e),
390 })?;
391
392 for entry in entries {
393 let entry = entry.map_err(|e| ContextError::StorageError {
394 reason: format!("Failed to read directory entry: {}", e),
395 })?;
396
397 if entry.metadata().map(|m| m.is_dir()).unwrap_or(false) {
398 if let Some(name) = entry.file_name().to_str() {
399 if let Ok(uuid) = uuid::Uuid::parse_str(name) {
400 agent_ids.push(AgentId(uuid));
401 }
402 }
403 }
404 }
405
406 Ok(agent_ids)
407 })
408 .await
409 .map_err(|e| ContextError::StorageError {
410 reason: format!("Blocking task failed: {}", e),
411 })?
412 }
413
414 async fn context_exists(&self, agent_id: AgentId) -> Result<bool, ContextError> {
415 Ok(tokio::fs::try_exists(self.memory_path(agent_id))
416 .await
417 .unwrap_or(false))
418 }
419
420 async fn get_storage_stats(&self) -> Result<StorageStats, ContextError> {
421 let root_dir = self.root_dir.clone();
422 let (total_contexts, total_size_bytes) = tokio::task::spawn_blocking(move || {
423 let mut total_contexts: usize = 0;
424 let mut total_size_bytes: u64 = 0;
425
426 if root_dir.exists() {
427 let entries =
428 std::fs::read_dir(&root_dir).map_err(|e| ContextError::StorageError {
429 reason: format!("Failed to read root directory: {}", e),
430 })?;
431
432 for entry in entries {
433 let entry = entry.map_err(|e| ContextError::StorageError {
434 reason: format!("Failed to read entry: {}", e),
435 })?;
436
437 if entry.metadata().map(|m| m.is_dir()).unwrap_or(false) {
438 total_contexts += 1;
439 total_size_bytes += dir_size_sync(&entry.path())?;
440 }
441 }
442 }
443
444 Ok::<_, ContextError>((total_contexts, total_size_bytes))
445 })
446 .await
447 .map_err(|e| ContextError::StorageError {
448 reason: format!("Blocking task failed: {}", e),
449 })??;
450
451 Ok(StorageStats {
452 total_contexts,
453 total_size_bytes,
454 last_cleanup: SystemTime::now(),
455 storage_path: self.root_dir.clone(),
456 })
457 }
458
459 fn as_any(&self) -> &dyn std::any::Any {
460 self
461 }
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Log retention used by every store in these tests: one day.
    const ONE_DAY: Duration = Duration::from_secs(86400);

    /// Creates a store inside a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the store is
    /// used; dropping it deletes the directory.
    fn temp_store() -> (tempfile::TempDir, MarkdownMemoryStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = MarkdownMemoryStore::new(dir.path().to_path_buf(), ONE_DAY);
        (dir, store)
    }

    /// Builds a context with one fact, one procedure and one learned pattern.
    fn sample_context(agent_id: AgentId) -> AgentContext {
        let now = SystemTime::now();

        let long_term = vec![
            MemoryItem {
                id: ContextId::new(),
                content: "User prefers dark mode".to_string(),
                memory_type: MemoryType::Factual,
                importance: 0.8,
                access_count: 1,
                last_accessed: now,
                created_at: now,
                embedding: None,
                metadata: HashMap::new(),
            },
            MemoryItem {
                id: ContextId::new(),
                content: "Deploy via cargo shuttle deploy".to_string(),
                memory_type: MemoryType::Procedural,
                importance: 0.7,
                access_count: 2,
                last_accessed: now,
                created_at: now,
                embedding: None,
                metadata: HashMap::new(),
            },
        ];

        let semantic_memory = vec![SemanticMemoryItem {
            id: ContextId::new(),
            concept: "User asks about metrics after deployments".to_string(),
            relationships: vec![],
            properties: HashMap::new(),
            confidence: 0.6,
            created_at: now,
            updated_at: now,
        }];

        AgentContext {
            agent_id,
            session_id: SessionId::new(),
            memory: HierarchicalMemory {
                working_memory: Default::default(),
                short_term: vec![],
                long_term,
                episodic_memory: vec![],
                semantic_memory,
            },
            knowledge_base: KnowledgeBase::default(),
            conversation_history: vec![],
            metadata: HashMap::new(),
            created_at: now,
            updated_at: now,
            retention_policy: RetentionPolicy::default(),
        }
    }

    /// Returns the contents of all long-term items of the given type, in order.
    fn long_term_contents(memory: &HierarchicalMemory, kind: MemoryType) -> Vec<&str> {
        memory
            .long_term
            .iter()
            .filter(|item| item.memory_type == kind)
            .map(|item| item.content.as_str())
            .collect()
    }

    #[tokio::test]
    async fn test_save_and_load_roundtrip() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();

        store
            .save_context(agent_id, &sample_context(agent_id))
            .await
            .unwrap();
        let loaded = store.load_context(agent_id).await.unwrap().unwrap();

        assert_eq!(loaded.agent_id, agent_id);
        assert_eq!(loaded.memory.long_term.len(), 2);
        assert_eq!(loaded.memory.semantic_memory.len(), 1);

        assert_eq!(
            long_term_contents(&loaded.memory, MemoryType::Factual),
            vec!["User prefers dark mode"]
        );
        assert_eq!(
            long_term_contents(&loaded.memory, MemoryType::Procedural),
            vec!["Deploy via cargo shuttle deploy"]
        );
        assert_eq!(
            loaded.memory.semantic_memory[0].concept,
            "User asks about metrics after deployments"
        );
    }

    #[tokio::test]
    async fn test_load_missing_returns_none() {
        let (_dir, store) = temp_store();

        let loaded = store.load_context(AgentId::new()).await.unwrap();

        assert!(loaded.is_none());
    }

    #[tokio::test]
    async fn test_delete_context() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();

        store
            .save_context(agent_id, &sample_context(agent_id))
            .await
            .unwrap();
        assert!(store.context_exists(agent_id).await.unwrap());

        store.delete_context(agent_id).await.unwrap();
        assert!(!store.context_exists(agent_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_list_agent_contexts() {
        let (_dir, store) = temp_store();

        for agent_id in [AgentId::new(), AgentId::new()] {
            store
                .save_context(agent_id, &sample_context(agent_id))
                .await
                .unwrap();
        }

        let listed = store.list_agent_contexts().await.unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[tokio::test]
    async fn test_daily_log_created() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();

        store
            .save_context(agent_id, &sample_context(agent_id))
            .await
            .unwrap();

        let logs_dir = store.logs_dir(agent_id);
        assert!(logs_dir.exists());

        let file_name = format!("{}.md", Utc::now().format("%Y-%m-%d").to_string());
        assert!(logs_dir.join(file_name).exists());
    }

    #[tokio::test]
    async fn test_storage_stats() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();

        store
            .save_context(agent_id, &sample_context(agent_id))
            .await
            .unwrap();

        let stats = store.get_storage_stats().await.unwrap();
        assert_eq!(stats.total_contexts, 1);
        assert!(stats.total_size_bytes > 0);
    }

    #[tokio::test]
    async fn test_memory_to_markdown_format() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();
        let context = sample_context(agent_id);

        let markdown = store.memory_to_markdown(agent_id, &context.memory);

        assert!(markdown.contains(&format!("# Agent Memory: {}", agent_id)));
        let expected_fragments = [
            "## Facts",
            "- User prefers dark mode",
            "## Procedures",
            "- Deploy via cargo shuttle deploy",
            "## Learned Patterns",
            "- User asks about metrics after deployments",
        ];
        for fragment in expected_fragments {
            assert!(markdown.contains(fragment));
        }
    }

    #[tokio::test]
    async fn test_compact_removes_old_logs() {
        let (_dir, store) = temp_store();
        let agent_id = AgentId::new();

        // Saving creates the logs directory together with today's log file.
        store
            .save_context(agent_id, &sample_context(agent_id))
            .await
            .unwrap();

        // Plant a log whose modification time lies three days in the past,
        // i.e. well beyond the one-day retention.
        let logs_dir = store.logs_dir(agent_id);
        let stale_log = logs_dir.join("2020-01-01.md");
        std::fs::write(&stale_log, "# Old log\n").unwrap();

        let three_days_ago = SystemTime::now() - Duration::from_secs(86400 * 3);
        filetime::set_file_mtime(
            &stale_log,
            filetime::FileTime::from_system_time(three_days_ago),
        )
        .unwrap();
        assert!(stale_log.exists());

        store.compact(agent_id).await.unwrap();

        assert!(!stale_log.exists());

        // Today's log is within the retention period and must survive.
        let today_log = logs_dir.join(format!("{}.md", Utc::now().format("%Y-%m-%d").to_string()));
        assert!(today_log.exists());
    }
}