1use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::fs;
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::RwLock;
13
14use super::types::*;
15#[cfg(feature = "vector-db")]
16use super::vector_db::QdrantClientWrapper;
17use super::vector_db::{
18 EmbeddingService, MockEmbeddingService, NoOpVectorDatabase, QdrantConfig, VectorDatabase,
19};
20use crate::integrations::policy_engine::{MockPolicyEngine, PolicyEngine};
21use crate::secrets::{SecretStore, SecretsConfig};
22use crate::types::AgentId;
23
/// Unified interface for per-agent context management: memory, knowledge
/// storage and sharing, conversation history, archival, and shutdown.
#[async_trait]
pub trait ContextManager: Send + Sync {
    /// Stores `context` for `agent_id`, returning the id of the stored context.
    async fn store_context(
        &self,
        agent_id: AgentId,
        context: AgentContext,
    ) -> Result<ContextId, ContextError>;

    /// Retrieves the context for `agent_id`, optionally scoped to a session.
    /// Returns `Ok(None)` when no context exists.
    async fn retrieve_context(
        &self,
        agent_id: AgentId,
        session_id: Option<SessionId>,
    ) -> Result<Option<AgentContext>, ContextError>;

    /// Runs a structured query against the agent's context and returns the
    /// matching items.
    async fn query_context(
        &self,
        agent_id: AgentId,
        query: ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError>;

    /// Applies a batch of updates to the agent's memory.
    async fn update_memory(
        &self,
        agent_id: AgentId,
        memory_updates: Vec<MemoryUpdate>,
    ) -> Result<(), ContextError>;

    /// Adds a knowledge entry to the agent's knowledge base.
    async fn add_knowledge(
        &self,
        agent_id: AgentId,
        knowledge: Knowledge,
    ) -> Result<KnowledgeId, ContextError>;

    /// Searches the agent's knowledge base, returning at most `limit` items.
    async fn search_knowledge(
        &self,
        agent_id: AgentId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError>;

    /// Shares an existing knowledge item from one agent to another at the
    /// given access level.
    async fn share_knowledge(
        &self,
        from_agent: AgentId,
        to_agent: AgentId,
        knowledge_id: KnowledgeId,
        access_level: AccessLevel,
    ) -> Result<(), ContextError>;

    /// Lists references to knowledge that has been shared with `agent_id`.
    async fn get_shared_knowledge(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<SharedKnowledgeRef>, ContextError>;

    /// Archives context items older than `before`; returns how many were
    /// archived.
    async fn archive_context(
        &self,
        agent_id: AgentId,
        before: SystemTime,
    ) -> Result<u32, ContextError>;

    /// Reports statistics about the agent's stored context.
    async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError>;

    /// Gracefully shuts the manager down (flush state, stop background work).
    async fn shutdown(&self) -> Result<(), ContextError>;
}
98
/// Default [`ContextManager`] implementation: an in-memory context map with
/// optional file persistence and a (feature-gated) vector-search backend.
pub struct StandardContextManager {
    /// In-memory cache of per-agent contexts.
    contexts: Arc<RwLock<HashMap<AgentId, AgentContext>>>,
    /// Static configuration supplied at construction.
    config: ContextManagerConfig,
    /// Knowledge shared between agents, keyed by knowledge id.
    shared_knowledge: Arc<RwLock<HashMap<KnowledgeId, SharedKnowledgeItem>>>,
    /// Vector-search backend; a no-op stub unless the `vector-db` feature is
    /// compiled in and `enable_vector_db` is set.
    vector_db: Arc<dyn VectorDatabase>,
    /// Embedding generator (currently always the mock implementation).
    embedding_service: Arc<MockEmbeddingService>,
    /// Durable storage backend for contexts.
    persistence: Arc<dyn ContextPersistence>,
    /// Secret store scoped to this manager's agent.
    secrets: Box<dyn SecretStore + Send + Sync>,
    /// Policy engine; not consulted anywhere in this module yet (see
    /// `validate_access`, which uses a hard-coded rule instead).
    #[allow(dead_code)]
    policy_engine: Arc<dyn PolicyEngine>,
    /// Set to `true` once `shutdown` has started.
    shutdown_flag: Arc<RwLock<bool>>,
    /// Handles of spawned background tasks (e.g. the retention scheduler),
    /// aborted and awaited during shutdown.
    background_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
123
/// Tunables for [`StandardContextManager`].
#[derive(Debug, Clone)]
pub struct ContextManagerConfig {
    /// Upper bound on contexts held in the in-memory map.
    pub max_contexts_in_memory: usize,
    /// Retention policy assigned to newly created contexts.
    pub default_retention_policy: RetentionPolicy,
    /// Whether the background retention scheduler is started.
    pub enable_auto_archiving: bool,
    /// Interval between retention-check passes.
    pub archiving_interval: std::time::Duration,
    /// Per-agent cap on memory items.
    pub max_memory_items_per_agent: usize,
    /// Per-agent cap on knowledge items.
    pub max_knowledge_items_per_agent: usize,
    /// Connection/collection settings for the Qdrant vector database.
    pub qdrant_config: QdrantConfig,
    /// Whether vector search is used (also requires the `vector-db` feature).
    pub enable_vector_db: bool,
    /// File-persistence layout (paths, compression, backup count).
    pub persistence_config: FilePersistenceConfig,
    /// Whether contexts are saved to / loaded from disk.
    pub enable_persistence: bool,
    /// Configuration of the secret store backing `secrets()`.
    pub secrets_config: SecretsConfig,
}
150
151impl Default for ContextManagerConfig {
152 fn default() -> Self {
153 use std::path::PathBuf;
154
155 Self {
156 max_contexts_in_memory: 1000,
157 default_retention_policy: RetentionPolicy::default(),
158 enable_auto_archiving: true,
159 archiving_interval: std::time::Duration::from_secs(3600), max_memory_items_per_agent: 10000,
161 max_knowledge_items_per_agent: 5000,
162 qdrant_config: QdrantConfig::default(),
163 enable_vector_db: false,
164 persistence_config: FilePersistenceConfig::default(),
165 enable_persistence: true,
166 secrets_config: SecretsConfig::file_json(PathBuf::from("secrets.json")),
167 }
168 }
169}
170
/// Blend weights used by `calculate_importance` to combine scoring signals.
#[derive(Debug, Clone)]
struct ImportanceWeights {
    /// Weight of the item's own stored `importance` value.
    pub base_importance: f32,
    /// Weight of the (log-scaled) access-count signal.
    pub access_frequency: f32,
    /// Weight of the recency-decay signal.
    pub recency: f32,
    /// Weight of the metadata-derived user-feedback signal.
    pub user_feedback: f32,
    /// Fixed access score used for items that were never accessed
    /// (not a blend weight; see `calculate_importance`).
    pub no_access_penalty: f32,
}
185
impl Default for ImportanceWeights {
    fn default() -> Self {
        // The four blend weights sum to 1.0; `no_access_penalty` is the fixed
        // access score substituted when `access_count == 0`.
        Self {
            base_importance: 0.3,
            access_frequency: 0.25,
            recency: 0.3,
            user_feedback: 0.15,
            no_access_penalty: 0.1,
        }
    }
}
197
/// A knowledge item one agent has shared with others, plus sharing metadata.
#[derive(Debug, Clone)]
struct SharedKnowledgeItem {
    /// The shared knowledge payload.
    knowledge: Knowledge,
    /// Agent that originally shared the item.
    source_agent: AgentId,
    /// Access level granted to the recipient.
    access_level: AccessLevel,
    /// When the share was created.
    created_at: SystemTime,
    /// How many times the shared item has been accessed.
    access_count: u32,
}
207
/// Snapshot of context items moved out of a live context during archiving;
/// serialized to an archive file by `save_archived_context_static`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ArchivedContext {
    /// Owner of the archived items.
    agent_id: AgentId,
    /// When the archive was created.
    archived_at: SystemTime,
    /// Human-readable reason recorded at archive time.
    archive_reason: String,
    /// Memory items that were past their retention cutoff.
    memory: HierarchicalMemory,
    /// Conversation entries that were past their retention cutoff.
    conversation_history: Vec<ConversationItem>,
    /// Knowledge facts that were past their retention cutoff.
    knowledge_base: KnowledgeBase,
    /// Free-form metadata about the archive.
    metadata: HashMap<String, String>,
}
219
impl ArchivedContext {
    /// Creates an empty archive container for `agent_id`; the caller fills in
    /// the items it moves out of the live context. `before` is only recorded
    /// in the human-readable reason string.
    fn new(agent_id: AgentId, before: SystemTime) -> Self {
        Self {
            agent_id,
            archived_at: SystemTime::now(),
            archive_reason: format!("Archiving items before {:?}", before),
            memory: HierarchicalMemory::default(),
            conversation_history: Vec::new(),
            knowledge_base: KnowledgeBase::default(),
            metadata: HashMap::new(),
        }
    }
}
233
/// File-based [`ContextPersistence`] backend: one (optionally gzipped) JSON
/// file per agent, with timestamped backups.
pub struct FilePersistence {
    /// Paths, compression flag, and backup retention count.
    config: FilePersistenceConfig,
}
238
239impl FilePersistence {
240 pub fn new(config: FilePersistenceConfig) -> Self {
242 Self { config }
243 }
244
245 pub async fn initialize(&self) -> Result<(), ContextError> {
247 self.config
248 .ensure_directories()
249 .await
250 .map_err(|e| ContextError::StorageError {
251 reason: format!("Failed to create storage directories: {}", e),
252 })?;
253 Ok(())
254 }
255
256 fn get_context_path(&self, agent_id: AgentId) -> PathBuf {
258 let filename = if self.config.enable_compression {
259 format!("{}.json.gz", agent_id)
260 } else {
261 format!("{}.json", agent_id)
262 };
263 self.config.agent_contexts_path().join(filename)
264 }
265
266 async fn serialize_context(&self, context: &AgentContext) -> Result<Vec<u8>, ContextError> {
268 let json_data =
269 serde_json::to_vec_pretty(context).map_err(|e| ContextError::SerializationError {
270 reason: format!("Failed to serialize context: {}", e),
271 })?;
272
273 if self.config.enable_compression {
274 use flate2::write::GzEncoder;
275 use flate2::Compression;
276 use std::io::Write;
277
278 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
279 encoder
280 .write_all(&json_data)
281 .map_err(|e| ContextError::SerializationError {
282 reason: format!("Failed to compress context: {}", e),
283 })?;
284 encoder
285 .finish()
286 .map_err(|e| ContextError::SerializationError {
287 reason: format!("Failed to finalize compression: {}", e),
288 })
289 } else {
290 Ok(json_data)
291 }
292 }
293
294 async fn deserialize_context(&self, data: Vec<u8>) -> Result<AgentContext, ContextError> {
296 let json_data = if self.config.enable_compression {
297 use flate2::read::GzDecoder;
298 use std::io::Read;
299
300 let mut decoder = GzDecoder::new(&data[..]);
301 let mut decompressed = Vec::new();
302 decoder.read_to_end(&mut decompressed).map_err(|e| {
303 ContextError::SerializationError {
304 reason: format!("Failed to decompress context: {}", e),
305 }
306 })?;
307 decompressed
308 } else {
309 data
310 };
311
312 serde_json::from_slice(&json_data).map_err(|e| ContextError::SerializationError {
313 reason: format!("Failed to deserialize context: {}", e),
314 })
315 }
316
317 async fn create_backup(&self, agent_id: AgentId) -> Result<(), ContextError> {
319 let context_path = self.get_context_path(agent_id);
320 if !context_path.exists() {
321 return Ok(());
322 }
323
324 let backup_path = context_path.with_extension(format!(
325 "backup.{}.json",
326 SystemTime::now()
327 .duration_since(SystemTime::UNIX_EPOCH)
328 .unwrap_or_default()
329 .as_secs()
330 ));
331
332 fs::copy(&context_path, &backup_path)
333 .await
334 .map_err(|e| ContextError::StorageError {
335 reason: format!("Failed to create backup: {}", e),
336 })?;
337
338 self.cleanup_old_backups(agent_id).await?;
340 Ok(())
341 }
342
343 async fn cleanup_old_backups(&self, agent_id: AgentId) -> Result<(), ContextError> {
345 let mut backup_files = Vec::new();
346 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
347 .await
348 .map_err(|e| ContextError::StorageError {
349 reason: format!("Failed to read storage directory: {}", e),
350 })?;
351
352 while let Some(entry) = dir
353 .next_entry()
354 .await
355 .map_err(|e| ContextError::StorageError {
356 reason: format!("Failed to read directory entry: {}", e),
357 })?
358 {
359 let path = entry.path();
360 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
361 if filename.starts_with(&format!("{}.backup.", agent_id)) {
362 if let Ok(metadata) = entry.metadata().await {
363 if let Ok(modified) = metadata.modified() {
364 backup_files.push((path, modified));
365 }
366 }
367 }
368 }
369 }
370
371 backup_files.sort_by(|a, b| b.1.cmp(&a.1));
373
374 for (path, _) in backup_files.into_iter().skip(self.config.backup_count) {
376 if let Err(e) = fs::remove_file(&path).await {
377 eprintln!(
378 "Warning: Failed to remove old backup {}: {}",
379 path.display(),
380 e
381 );
382 }
383 }
384
385 Ok(())
386 }
387}
388
389#[async_trait]
390impl ContextPersistence for FilePersistence {
391 async fn save_context(
392 &self,
393 agent_id: AgentId,
394 context: &AgentContext,
395 ) -> Result<(), ContextError> {
396 self.create_backup(agent_id).await?;
398
399 let data = self.serialize_context(context).await?;
401
402 let context_path = self.get_context_path(agent_id);
404 let mut file =
405 fs::File::create(&context_path)
406 .await
407 .map_err(|e| ContextError::StorageError {
408 reason: format!("Failed to create context file: {}", e),
409 })?;
410
411 file.write_all(&data)
412 .await
413 .map_err(|e| ContextError::StorageError {
414 reason: format!("Failed to write context data: {}", e),
415 })?;
416
417 file.sync_all()
418 .await
419 .map_err(|e| ContextError::StorageError {
420 reason: format!("Failed to sync context file: {}", e),
421 })?;
422
423 Ok(())
424 }
425
426 async fn load_context(&self, agent_id: AgentId) -> Result<Option<AgentContext>, ContextError> {
427 let context_path = self.get_context_path(agent_id);
428
429 if !context_path.exists() {
430 return Ok(None);
431 }
432
433 let mut file =
434 fs::File::open(&context_path)
435 .await
436 .map_err(|e| ContextError::StorageError {
437 reason: format!("Failed to open context file: {}", e),
438 })?;
439
440 let mut data = Vec::new();
441 file.read_to_end(&mut data)
442 .await
443 .map_err(|e| ContextError::StorageError {
444 reason: format!("Failed to read context file: {}", e),
445 })?;
446
447 let context = self.deserialize_context(data).await?;
448 Ok(Some(context))
449 }
450
451 async fn delete_context(&self, agent_id: AgentId) -> Result<(), ContextError> {
452 let context_path = self.get_context_path(agent_id);
453
454 if context_path.exists() {
455 fs::remove_file(&context_path)
456 .await
457 .map_err(|e| ContextError::StorageError {
458 reason: format!("Failed to delete context file: {}", e),
459 })?;
460 }
461
462 Ok(())
463 }
464
465 async fn list_agent_contexts(&self) -> Result<Vec<AgentId>, ContextError> {
466 let mut agent_ids = Vec::new();
467 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
468 .await
469 .map_err(|e| ContextError::StorageError {
470 reason: format!("Failed to read storage directory: {}", e),
471 })?;
472
473 while let Some(entry) = dir
474 .next_entry()
475 .await
476 .map_err(|e| ContextError::StorageError {
477 reason: format!("Failed to read directory entry: {}", e),
478 })?
479 {
480 let path = entry.path();
481 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
482 if filename.ends_with(".json") || filename.ends_with(".json.gz") {
483 let agent_id_str = filename
484 .strip_suffix(".json.gz")
485 .or_else(|| filename.strip_suffix(".json"))
486 .unwrap_or(filename);
487
488 if let Ok(uuid) = uuid::Uuid::parse_str(agent_id_str) {
489 agent_ids.push(AgentId(uuid));
490 }
491 }
492 }
493 }
494
495 Ok(agent_ids)
496 }
497
498 async fn context_exists(&self, agent_id: AgentId) -> Result<bool, ContextError> {
499 let context_path = self.get_context_path(agent_id);
500 Ok(context_path.exists())
501 }
502
503 async fn get_storage_stats(&self) -> Result<StorageStats, ContextError> {
504 let mut total_contexts = 0;
505 let mut total_size_bytes = 0;
506
507 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
508 .await
509 .map_err(|e| ContextError::StorageError {
510 reason: format!("Failed to read storage directory: {}", e),
511 })?;
512
513 while let Some(entry) = dir
514 .next_entry()
515 .await
516 .map_err(|e| ContextError::StorageError {
517 reason: format!("Failed to read directory entry: {}", e),
518 })?
519 {
520 let path = entry.path();
521 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
522 if filename.ends_with(".json") || filename.ends_with(".json.gz") {
523 total_contexts += 1;
524 if let Ok(metadata) = entry.metadata().await {
525 total_size_bytes += metadata.len();
526 }
527 }
528 }
529 }
530
531 Ok(StorageStats {
532 total_contexts,
533 total_size_bytes,
534 last_cleanup: SystemTime::now(),
535 storage_path: self.config.agent_contexts_path(),
536 })
537 }
538
539 fn as_any(&self) -> &dyn std::any::Any {
540 self
541 }
542}
543
544impl StandardContextManager {
545 pub async fn new(config: ContextManagerConfig, agent_id: &str) -> Result<Self, ContextError> {
547 let vector_db: Arc<dyn VectorDatabase> = {
548 #[cfg(feature = "vector-db")]
549 {
550 if config.enable_vector_db {
551 Arc::new(QdrantClientWrapper::new(config.qdrant_config.clone()))
552 } else {
553 Arc::new(NoOpVectorDatabase)
554 }
555 }
556 #[cfg(not(feature = "vector-db"))]
557 {
558 let _ = &config.enable_vector_db;
559 Arc::new(NoOpVectorDatabase)
560 }
561 };
562
563 let embedding_service = Arc::new(MockEmbeddingService::new(
564 config.qdrant_config.vector_dimension,
565 ));
566
567 let persistence: Arc<dyn ContextPersistence> = if config.enable_persistence {
568 Arc::new(FilePersistence::new(config.persistence_config.clone()))
569 } else {
570 Arc::new(FilePersistence::new(config.persistence_config.clone()))
572 };
573
574 let secrets = crate::secrets::new_secret_store(&config.secrets_config, agent_id)
576 .await
577 .map_err(|e| ContextError::StorageError {
578 reason: format!("Failed to initialize secrets store: {}", e),
579 })?;
580
581 let policy_engine: Arc<dyn PolicyEngine> = Arc::new(MockPolicyEngine::new());
583
584 Ok(Self {
585 contexts: Arc::new(RwLock::new(HashMap::new())),
586 config,
587 shared_knowledge: Arc::new(RwLock::new(HashMap::new())),
588 vector_db,
589 embedding_service,
590 persistence,
591 secrets,
592 policy_engine,
593 shutdown_flag: Arc::new(RwLock::new(false)),
594 background_tasks: Arc::new(RwLock::new(Vec::new())),
595 })
596 }
597
    /// Borrow of this manager's secret store.
    pub fn secrets(&self) -> &(dyn SecretStore + Send + Sync) {
        self.secrets.as_ref()
    }
602
    /// Prepares the manager for use: initializes the vector DB (if enabled),
    /// sets up file persistence and reloads persisted contexts (if enabled),
    /// then starts the retention scheduler.
    pub async fn initialize(&self) -> Result<(), ContextError> {
        if self.config.enable_vector_db {
            self.vector_db.initialize().await?;
        }

        if self.config.enable_persistence {
            // Only the file-backed implementation needs directory setup;
            // other `ContextPersistence` impls are skipped via the downcast.
            if let Some(file_persistence) =
                self.persistence.as_any().downcast_ref::<FilePersistence>()
            {
                file_persistence.initialize().await?;
            }

            self.load_existing_contexts().await?;
        }

        self.setup_retention_scheduler().await?;

        Ok(())
    }
627
628 async fn load_existing_contexts(&self) -> Result<(), ContextError> {
630 let agent_ids = self.persistence.list_agent_contexts().await?;
631 let mut contexts = self.contexts.write().await;
632
633 for agent_id in agent_ids {
634 if let Some(context) = self.persistence.load_context(agent_id).await? {
635 contexts.insert(agent_id, context);
636 }
637 }
638
639 Ok(())
640 }
641
642 pub async fn shutdown(&self) -> Result<(), ContextError> {
644 {
646 let shutdown_flag = self.shutdown_flag.read().await;
647 if *shutdown_flag {
648 tracing::info!("ContextManager already shutdown, skipping");
649 return Ok(());
650 }
651 }
652
653 tracing::info!("Starting ContextManager shutdown sequence");
654
655 {
657 let mut shutdown_flag = self.shutdown_flag.write().await;
658 *shutdown_flag = true;
659 }
660
661 self.stop_background_tasks().await?;
663
664 self.save_all_contexts().await?;
666
667 tracing::info!("Vector database connections will be closed when client is dropped");
671
672 tracing::info!("Secrets store cleanup handled by Drop trait");
675
676 tracing::info!("ContextManager shutdown completed successfully");
677 Ok(())
678 }
679
680 async fn stop_background_tasks(&self) -> Result<(), ContextError> {
682 let mut tasks = self.background_tasks.write().await;
683
684 if tasks.is_empty() {
685 tracing::debug!("No background tasks to stop");
686 return Ok(());
687 }
688
689 tracing::info!("Stopping {} background tasks", tasks.len());
690
691 for task in tasks.drain(..) {
693 task.abort();
694
695 match tokio::time::timeout(std::time::Duration::from_secs(5), task).await {
697 Ok(result) => match result {
698 Ok(_) => tracing::debug!("Background task completed successfully"),
699 Err(e) if e.is_cancelled() => tracing::debug!("Background task was cancelled"),
700 Err(e) => tracing::warn!("Background task finished with error: {}", e),
701 },
702 Err(_) => tracing::warn!("Background task did not finish within timeout"),
703 }
704 }
705
706 tracing::info!("All background tasks stopped");
707 Ok(())
708 }
709
710 async fn save_all_contexts(&self) -> Result<(), ContextError> {
712 if !self.config.enable_persistence {
713 tracing::debug!("Persistence disabled, skipping context save");
714 return Ok(());
715 }
716
717 let contexts = self.contexts.read().await;
718
719 if contexts.is_empty() {
720 tracing::debug!("No contexts to save");
721 return Ok(());
722 }
723
724 tracing::info!("Saving {} contexts to persistent storage", contexts.len());
725
726 let mut save_errors = Vec::new();
727
728 for (agent_id, context) in contexts.iter() {
729 match self.persistence.save_context(*agent_id, context).await {
730 Ok(_) => tracing::debug!("Saved context for agent {}", agent_id),
731 Err(e) => {
732 tracing::error!("Failed to save context for agent {}: {}", agent_id, e);
733 save_errors.push((*agent_id, e));
734 }
735 }
736 }
737
738 if !save_errors.is_empty() {
739 let error_msg = format!(
740 "Failed to save {} out of {} contexts during shutdown",
741 save_errors.len(),
742 contexts.len()
743 );
744 tracing::error!("{}", error_msg);
745
746 if let Some((agent_id, error)) = save_errors.into_iter().next() {
748 return Err(ContextError::StorageError {
749 reason: format!("Failed to save context for agent {}: {}", agent_id, error),
750 });
751 }
752 }
753
754 tracing::info!("All contexts saved successfully");
755 Ok(())
756 }
757
758 async fn setup_retention_scheduler(&self) -> Result<(), ContextError> {
760 if !self.config.enable_auto_archiving {
761 tracing::debug!("Auto-archiving disabled, skipping retention scheduler setup");
762 return Ok(());
763 }
764
765 let contexts = self.contexts.clone();
766 let persistence = self.persistence.clone();
767 let config = self.config.clone();
768 let shutdown_flag = self.shutdown_flag.clone();
769
770 let task = tokio::spawn(async move {
771 let mut interval = tokio::time::interval(config.archiving_interval);
772
773 tracing::info!(
774 "Retention policy scheduler started with interval {:?}",
775 config.archiving_interval
776 );
777
778 loop {
779 tokio::select! {
780 _ = interval.tick() => {
781 if *shutdown_flag.read().await {
783 tracing::info!("Retention scheduler shutting down");
784 break;
785 }
786
787 Self::run_retention_check(&contexts, &persistence, &config).await;
789 }
790 }
791 }
792 });
793
794 self.add_background_task(task).await;
795 tracing::info!("Retention policy scheduler initialized successfully");
796 Ok(())
797 }
798
    /// One pass of the retention scheduler: counts items past their retention
    /// cutoffs for every cached context, then archives per-agent as needed.
    ///
    /// Static (no `&self`) so it can run inside the spawned scheduler task.
    /// Per-agent archive failures are logged and do not stop the pass.
    async fn run_retention_check(
        contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
        persistence: &Arc<dyn ContextPersistence>,
        config: &ContextManagerConfig,
    ) {
        let current_time = SystemTime::now();

        // Snapshot (agent, pending-archive count) under a short read lock so
        // the map is not locked while the per-agent archiving below awaits
        // persistence I/O.
        let agents_to_check: Vec<(AgentId, usize)> = {
            let context_guard = contexts.read().await;
            let agent_count = context_guard.len();

            if agent_count == 0 {
                tracing::debug!("No agent contexts to process for retention");
                return;
            }

            tracing::info!(
                "Starting retention check for {} agent contexts",
                agent_count
            );

            context_guard
                .iter()
                .map(|(agent_id, context)| {
                    let retention_stats = Self::calculate_retention_statistics_static(context);
                    (*agent_id, retention_stats.items_to_archive)
                })
                .collect()
        };

        let start_time = std::time::Instant::now();
        let total_agents = agents_to_check.len();

        for (agent_id, items_to_archive) in agents_to_check {
            // Skip agents with nothing eligible; archiving takes a write lock.
            if items_to_archive > 0 {
                tracing::debug!(
                    "Agent {} has {} items eligible for archiving",
                    agent_id,
                    items_to_archive
                );

                let archive_result = Self::archive_agent_context_static(
                    agent_id,
                    current_time,
                    contexts,
                    persistence,
                    config,
                )
                .await;

                match archive_result {
                    Ok(archived_count) => {
                        tracing::info!(
                            "Successfully archived {} items for agent {}",
                            archived_count,
                            agent_id
                        );
                    }
                    Err(e) => {
                        tracing::error!("Failed to archive context for agent {}: {}", agent_id, e);
                    }
                }
            }
        }

        let elapsed = start_time.elapsed();
        tracing::info!(
            "Retention check completed for {} agents in {:?}",
            total_agents,
            elapsed
        );
    }
875
876 fn calculate_retention_statistics_static(context: &AgentContext) -> RetentionStatus {
878 let now = SystemTime::now();
879 let retention_policy = &context.retention_policy;
880
881 let mut items_to_archive = 0;
882 let items_to_delete = 0;
883
884 let memory_cutoff = now
886 .checked_sub(retention_policy.memory_retention)
887 .unwrap_or(SystemTime::UNIX_EPOCH);
888
889 let knowledge_cutoff = now
890 .checked_sub(retention_policy.knowledge_retention)
891 .unwrap_or(SystemTime::UNIX_EPOCH);
892
893 let conversation_cutoff = now
894 .checked_sub(retention_policy.session_retention)
895 .unwrap_or(SystemTime::UNIX_EPOCH);
896
897 for item in &context.memory.short_term {
899 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
900 items_to_archive += 1;
901 }
902 }
903
904 for item in &context.memory.long_term {
905 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
906 items_to_archive += 1;
907 }
908 }
909
910 for item in &context.conversation_history {
912 if item.timestamp < conversation_cutoff {
913 items_to_archive += 1;
914 }
915 }
916
917 for fact in &context.knowledge_base.facts {
919 if fact.created_at < knowledge_cutoff {
920 items_to_archive += 1;
921 }
922 }
923
924 let next_cleanup = now + Duration::from_secs(86400); RetentionStatus {
928 items_to_archive,
929 items_to_delete,
930 next_cleanup,
931 }
932 }
933
    /// Moves items older than `before` (per the context's retention policy)
    /// out of the live context into an [`ArchivedContext`], persisting both
    /// when persistence is enabled. Returns the number of items archived.
    ///
    /// Holds the contexts write lock for the whole operation — including the
    /// persistence awaits — so archiving is atomic w.r.t. other accessors.
    async fn archive_agent_context_static(
        agent_id: AgentId,
        before: SystemTime,
        contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
        persistence: &Arc<dyn ContextPersistence>,
        config: &ContextManagerConfig,
    ) -> Result<u32, ContextError> {
        let mut total_archived = 0u32;
        let mut archived_context = ArchivedContext::new(agent_id, before);

        let mut contexts_guard = contexts.write().await;
        if let Some(context) = contexts_guard.get_mut(&agent_id) {
            let retention_policy = &context.retention_policy;

            // Items created/accessed before these cutoffs are archived.
            let memory_cutoff = before
                .checked_sub(retention_policy.memory_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            let conversation_cutoff = before
                .checked_sub(retention_policy.session_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            let knowledge_cutoff = before
                .checked_sub(retention_policy.knowledge_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            // drain + rebuild (rather than `Vec::retain`) because archived
            // items must be moved into `archived_context`, not dropped.
            let mut retained_short_term = Vec::new();
            for item in context.memory.short_term.drain(..) {
                if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                    archived_context.memory.short_term.push(item);
                    total_archived += 1;
                } else {
                    retained_short_term.push(item);
                }
            }
            context.memory.short_term = retained_short_term;

            let mut retained_long_term = Vec::new();
            for item in context.memory.long_term.drain(..) {
                if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                    archived_context.memory.long_term.push(item);
                    total_archived += 1;
                } else {
                    retained_long_term.push(item);
                }
            }
            context.memory.long_term = retained_long_term;

            let mut retained_conversations = Vec::new();
            for item in context.conversation_history.drain(..) {
                if item.timestamp < conversation_cutoff {
                    archived_context.conversation_history.push(item);
                    total_archived += 1;
                } else {
                    retained_conversations.push(item);
                }
            }
            context.conversation_history = retained_conversations;

            let mut retained_facts = Vec::new();
            for fact in context.knowledge_base.facts.drain(..) {
                if fact.created_at < knowledge_cutoff {
                    archived_context.knowledge_base.facts.push(fact);
                    total_archived += 1;
                } else {
                    retained_facts.push(fact);
                }
            }
            context.knowledge_base.facts = retained_facts;

            if total_archived > 0 {
                // Record bookkeeping metadata on the trimmed live context.
                context.updated_at = SystemTime::now();
                context.metadata.insert(
                    "last_archived".to_string(),
                    SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs()
                        .to_string(),
                );
                context
                    .metadata
                    .insert("archived_count".to_string(), total_archived.to_string());

                if config.enable_persistence {
                    // Persist the archive first, then the trimmed live context.
                    Self::save_archived_context_static(
                        agent_id,
                        &archived_context,
                        persistence,
                        config,
                    )
                    .await?;
                    persistence.save_context(agent_id, context).await?;
                }
            }
        }

        Ok(total_archived)
    }
1044
1045 async fn save_archived_context_static(
1047 agent_id: AgentId,
1048 archived_context: &ArchivedContext,
1049 _persistence: &Arc<dyn ContextPersistence>,
1050 config: &ContextManagerConfig,
1051 ) -> Result<(), ContextError> {
1052 let archive_dir = config
1053 .persistence_config
1054 .agent_contexts_path()
1055 .join("archives")
1056 .join(agent_id.to_string());
1057
1058 tokio::fs::create_dir_all(&archive_dir)
1060 .await
1061 .map_err(|e| ContextError::StorageError {
1062 reason: format!("Failed to create archive directory: {}", e),
1063 })?;
1064
1065 let timestamp = archived_context
1067 .archived_at
1068 .duration_since(SystemTime::UNIX_EPOCH)
1069 .unwrap_or_default()
1070 .as_secs();
1071 let archive_filename = format!("archive_{}.json", timestamp);
1072 let archive_path = archive_dir.join(archive_filename);
1073
1074 let archive_data = serde_json::to_vec_pretty(archived_context).map_err(|e| {
1076 ContextError::SerializationError {
1077 reason: format!("Failed to serialize archived context: {}", e),
1078 }
1079 })?;
1080
1081 tokio::fs::write(&archive_path, &archive_data)
1083 .await
1084 .map_err(|e| ContextError::StorageError {
1085 reason: format!("Failed to write archive file: {}", e),
1086 })?;
1087
1088 tracing::debug!(
1089 "Saved archived context for agent {} to {}",
1090 agent_id,
1091 archive_path.display()
1092 );
1093
1094 Ok(())
1095 }
1096
1097 pub async fn add_background_task(&self, task: tokio::task::JoinHandle<()>) {
1099 let mut tasks = self.background_tasks.write().await;
1100 tasks.push(task);
1101 }
1102
1103 pub async fn is_shutdown(&self) -> bool {
1105 let shutdown_flag = self.shutdown_flag.read().await;
1106 *shutdown_flag
1107 }
1108
1109 pub async fn create_session(&self, agent_id: AgentId) -> Result<SessionId, ContextError> {
1111 let session_id = SessionId::new();
1112
1113 let context = AgentContext {
1115 agent_id,
1116 session_id,
1117 memory: HierarchicalMemory::default(),
1118 knowledge_base: KnowledgeBase::default(),
1119 conversation_history: Vec::new(),
1120 metadata: HashMap::new(),
1121 created_at: SystemTime::now(),
1122 updated_at: SystemTime::now(),
1123 retention_policy: self.config.default_retention_policy.clone(),
1124 };
1125
1126 ContextManager::store_context(self, agent_id, context).await?;
1127 Ok(session_id)
1128 }
1129
1130 async fn validate_access(
1132 &self,
1133 agent_id: AgentId,
1134 operation: &str,
1135 ) -> Result<(), ContextError> {
1136 let is_dangerous_operation = matches!(operation, "archive_context");
1142
1143 if is_dangerous_operation {
1144 tracing::warn!(
1145 "Potentially dangerous operation {} denied for agent {} by default policy",
1146 operation,
1147 agent_id
1148 );
1149 Err(ContextError::AccessDenied {
1150 reason: format!("Operation {} requires explicit approval", operation),
1151 })
1152 } else {
1153 tracing::debug!("Policy engine allowed {} for agent {}", operation, agent_id);
1154 Ok(())
1155 }
1156 }
1157
    /// Produces an embedding vector for `content` via the configured
    /// embedding service (currently always the mock implementation).
    async fn generate_embeddings(&self, content: &str) -> Result<Vec<f32>, ContextError> {
        self.embedding_service.generate_embedding(content).await
    }
1162
1163 async fn semantic_search_memory(
1165 &self,
1166 agent_id: AgentId,
1167 query: &str,
1168 limit: usize,
1169 ) -> Result<Vec<ContextItem>, ContextError> {
1170 if self.config.enable_vector_db {
1171 let query_embedding = self.generate_embeddings(query).await?;
1173
1174 let threshold = 0.7; self.vector_db
1177 .semantic_search(agent_id, query_embedding, limit, threshold)
1178 .await
1179 } else {
1180 let contexts = self.contexts.read().await;
1182 if let Some(context) = contexts.get(&agent_id) {
1183 let mut results = Vec::new();
1184
1185 for memory_item in &context.memory.short_term {
1186 if memory_item
1187 .content
1188 .to_lowercase()
1189 .contains(&query.to_lowercase())
1190 {
1191 let importance_score = self.calculate_importance(memory_item);
1193 let relevance_score = (importance_score + 0.8) / 2.0; results.push(ContextItem {
1196 id: memory_item.id,
1197 content: memory_item.content.clone(),
1198 item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
1199 relevance_score,
1200 timestamp: memory_item.created_at,
1201 metadata: memory_item.metadata.clone(),
1202 });
1203 }
1204 }
1205
1206 results.truncate(limit);
1207 Ok(results)
1208 } else {
1209 Ok(Vec::new())
1210 }
1211 }
1212 }
1213
1214 fn calculate_importance(&self, memory_item: &MemoryItem) -> f32 {
1226 let weights = ImportanceWeights::default();
1228
1229 let base_score = memory_item.importance.clamp(0.0, 1.0);
1231
1232 let access_score = if memory_item.access_count == 0 {
1234 weights.no_access_penalty
1235 } else {
1236 let log_access = (memory_item.access_count as f32 + 1.0).ln();
1237 (log_access / 10.0).min(1.0) };
1239
1240 let recency_score = self.calculate_recency_factor(memory_item);
1242
1243 let feedback_score = self.extract_user_feedback_score(memory_item);
1245
1246 let type_multiplier = self.get_memory_type_multiplier(&memory_item.memory_type);
1248
1249 let age_decay = self.calculate_age_decay(memory_item);
1251
1252 let combined_score = (base_score * weights.base_importance
1254 + access_score * weights.access_frequency
1255 + recency_score * weights.recency
1256 + feedback_score * weights.user_feedback)
1257 * type_multiplier
1258 * age_decay;
1259
1260 combined_score.clamp(0.0, 1.0)
1262 }
1263
1264 fn calculate_recency_factor(&self, memory_item: &MemoryItem) -> f32 {
1266 let now = SystemTime::now();
1267
1268 let most_recent = memory_item.last_accessed.max(memory_item.created_at);
1270
1271 let time_since_access = now
1272 .duration_since(most_recent)
1273 .unwrap_or_else(|_| std::time::Duration::from_secs(0));
1274
1275 let hours_since_access = time_since_access.as_secs() as f32 / 3600.0;
1276
1277 let half_life_hours = 24.0; let decay_factor = 2.0_f32.powf(-hours_since_access / half_life_hours);
1280
1281 decay_factor.max(0.01)
1283 }
1284
1285 fn extract_user_feedback_score(&self, memory_item: &MemoryItem) -> f32 {
1287 let mut feedback_score = 0.5; if let Some(rating_str) = memory_item.metadata.get("user_rating") {
1291 if let Ok(rating) = rating_str.parse::<f32>() {
1292 feedback_score = (rating / 5.0).clamp(0.0, 1.0); }
1294 }
1295
1296 if let Some(helpful_str) = memory_item.metadata.get("helpful") {
1298 match helpful_str.to_lowercase().as_str() {
1299 "true" | "yes" | "1" => feedback_score = feedback_score.max(0.8),
1300 "false" | "no" | "0" => feedback_score = feedback_score.min(0.2),
1301 _ => {}
1302 }
1303 }
1304
1305 if memory_item.metadata.contains_key("corrected")
1307 || memory_item.metadata.contains_key("incorrect")
1308 {
1309 feedback_score = feedback_score.min(0.3);
1310 }
1311
1312 if memory_item.metadata.contains_key("bookmarked")
1314 || memory_item.metadata.contains_key("favorite")
1315 {
1316 feedback_score = feedback_score.max(0.9);
1317 }
1318
1319 if let Some(context) = memory_item.metadata.get("usage_context") {
1321 match context.to_lowercase().as_str() {
1322 "critical" | "important" => feedback_score = feedback_score.max(0.95),
1323 "routine" | "common" => feedback_score = feedback_score.max(0.7),
1324 "experimental" | "trial" => feedback_score = feedback_score.min(0.4),
1325 _ => {}
1326 }
1327 }
1328
1329 feedback_score.clamp(0.0, 1.0)
1330 }
1331
1332 fn get_memory_type_multiplier(&self, memory_type: &MemoryType) -> f32 {
1334 match memory_type {
1335 MemoryType::Factual => 1.0, MemoryType::Procedural => 1.2, MemoryType::Episodic => 0.9, MemoryType::Semantic => 1.1, MemoryType::Working => 1.3, }
1341 }
1342
1343 fn calculate_age_decay(&self, memory_item: &MemoryItem) -> f32 {
1345 let now = SystemTime::now();
1346 let age = now
1347 .duration_since(memory_item.created_at)
1348 .unwrap_or_else(|_| std::time::Duration::from_secs(0));
1349
1350 let days_old = age.as_secs() as f32 / 86400.0; let decay_rate = match memory_item.memory_type {
1354 MemoryType::Working => 0.1, MemoryType::Factual => 0.01, MemoryType::Procedural => 0.005, MemoryType::Episodic => 0.02, MemoryType::Semantic => 0.008, };
1360
1361 let decay_factor = (-decay_rate * days_old).exp();
1363
1364 decay_factor.max(0.05)
1366 }
1367
1368 async fn keyword_search_memory(
1370 &self,
1371 agent_id: AgentId,
1372 query: &ContextQuery,
1373 ) -> Result<Vec<ContextItem>, ContextError> {
1374 let contexts = self.contexts.read().await;
1375 if let Some(context) = contexts.get(&agent_id) {
1376 let mut results = Vec::new();
1377
1378 let search_terms: Vec<String> = query
1380 .search_terms
1381 .iter()
1382 .map(|term| term.to_lowercase())
1383 .collect();
1384
1385 if search_terms.is_empty() {
1386 return Ok(results);
1387 }
1388
1389 for memory_item in context
1391 .memory
1392 .short_term
1393 .iter()
1394 .chain(context.memory.long_term.iter())
1395 {
1396 if !query.memory_types.is_empty()
1398 && !query.memory_types.contains(&memory_item.memory_type)
1399 {
1400 continue;
1401 }
1402
1403 let content_lower = memory_item.content.to_lowercase();
1404 let mut match_score = 0.0f32;
1405 let mut matched_terms = 0;
1406
1407 for term in &search_terms {
1409 if content_lower.contains(term) {
1410 matched_terms += 1;
1411 if content_lower.split_whitespace().any(|word| word == term) {
1413 match_score += 1.0;
1414 } else {
1415 match_score += 0.5;
1416 }
1417 }
1418 }
1419
1420 if matched_terms > 0 {
1421 let importance_score = self.calculate_importance(memory_item);
1423 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1424 let relevance_score = (match_score * term_coverage + importance_score) / 2.0;
1425
1426 if relevance_score >= query.relevance_threshold {
1427 results.push(ContextItem {
1428 id: memory_item.id,
1429 content: memory_item.content.clone(),
1430 item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
1431 relevance_score,
1432 timestamp: memory_item.created_at,
1433 metadata: memory_item.metadata.clone(),
1434 });
1435 }
1436 }
1437 }
1438
1439 for episode in &context.memory.episodic_memory {
1441 let episode_content = format!("{} {}", episode.title, episode.description);
1442 let content_lower = episode_content.to_lowercase();
1443 let mut match_score = 0.0f32;
1444 let mut matched_terms = 0;
1445
1446 for term in &search_terms {
1447 if content_lower.contains(term) {
1448 matched_terms += 1;
1449 match_score += if content_lower.split_whitespace().any(|word| word == term)
1450 {
1451 1.0
1452 } else {
1453 0.5
1454 };
1455 }
1456 }
1457
1458 if matched_terms > 0 {
1459 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1460 let relevance_score = (match_score * term_coverage + episode.importance) / 2.0;
1461
1462 if relevance_score >= query.relevance_threshold {
1463 results.push(ContextItem {
1464 id: episode.id,
1465 content: episode_content,
1466 item_type: ContextItemType::Episode,
1467 relevance_score,
1468 timestamp: episode.timestamp,
1469 metadata: HashMap::new(),
1470 });
1471 }
1472 }
1473 }
1474
1475 for conv_item in &context.conversation_history {
1477 let content_lower = conv_item.content.to_lowercase();
1478 let mut match_score = 0.0f32;
1479 let mut matched_terms = 0;
1480
1481 for term in &search_terms {
1482 if content_lower.contains(term) {
1483 matched_terms += 1;
1484 match_score += if content_lower.split_whitespace().any(|word| word == term)
1485 {
1486 1.0
1487 } else {
1488 0.5
1489 };
1490 }
1491 }
1492
1493 if matched_terms > 0 {
1494 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1495 let relevance_score = match_score * term_coverage;
1496
1497 if relevance_score >= query.relevance_threshold {
1498 results.push(ContextItem {
1499 id: conv_item.id,
1500 content: conv_item.content.clone(),
1501 item_type: ContextItemType::Conversation,
1502 relevance_score,
1503 timestamp: conv_item.timestamp,
1504 metadata: HashMap::new(),
1505 });
1506 }
1507 }
1508 }
1509
1510 results.sort_by(|a, b| {
1512 b.relevance_score
1513 .partial_cmp(&a.relevance_score)
1514 .unwrap_or(std::cmp::Ordering::Equal)
1515 });
1516 results.truncate(query.max_results);
1517
1518 Ok(results)
1519 } else {
1520 Ok(Vec::new())
1521 }
1522 }
1523
    /// Search memory, episodes, and conversation history restricted to the
    /// query's time range; returns an empty list when no range is given or
    /// the agent has no context.
    ///
    /// Memory items are ranked by the average of importance and recency;
    /// episodes use their stored importance directly; conversation items get
    /// a purely temporal score. Optional search terms act as a
    /// case-insensitive substring pre-filter. Results are sorted
    /// newest-first and truncated to `max_results`.
    async fn temporal_search_memory(
        &self,
        agent_id: AgentId,
        query: &ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        // A time range is mandatory for this search mode.
        let time_range = match &query.time_range {
            Some(range) => range,
            None => return Ok(Vec::new()),
        };

        let contexts = self.contexts.read().await;
        if let Some(context) = contexts.get(&agent_id) {
            let mut results = Vec::new();

            // Short- and long-term memory items created within the window.
            for memory_item in context
                .memory
                .short_term
                .iter()
                .chain(context.memory.long_term.iter())
            {
                // Honor the optional memory-type filter.
                if !query.memory_types.is_empty()
                    && !query.memory_types.contains(&memory_item.memory_type)
                {
                    continue;
                }

                if memory_item.created_at >= time_range.start
                    && memory_item.created_at <= time_range.end
                {
                    // Search terms, when present, act as a case-insensitive
                    // substring filter (any single term suffices).
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let content_lower = memory_item.content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter {
                        // Rank by the average of importance and recency.
                        let importance_score = self.calculate_importance(memory_item);
                        let recency_score = self.calculate_recency_factor(memory_item);
                        let relevance_score = (importance_score + recency_score) / 2.0;

                        if relevance_score >= query.relevance_threshold {
                            results.push(ContextItem {
                                id: memory_item.id,
                                content: memory_item.content.clone(),
                                item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
                                relevance_score,
                                timestamp: memory_item.created_at,
                                metadata: memory_item.metadata.clone(),
                            });
                        }
                    }
                }
            }

            // Episodes within the window; the episode's stored importance is
            // used directly as its relevance score.
            for episode in &context.memory.episodic_memory {
                if episode.timestamp >= time_range.start && episode.timestamp <= time_range.end {
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let episode_content = format!("{} {}", episode.title, episode.description);
                        let content_lower = episode_content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter && episode.importance >= query.relevance_threshold {
                        results.push(ContextItem {
                            id: episode.id,
                            content: format!("{} {}", episode.title, episode.description),
                            item_type: ContextItemType::Episode,
                            relevance_score: episode.importance,
                            timestamp: episode.timestamp,
                            metadata: HashMap::new(),
                        });
                    }
                }
            }

            // Conversation history within the window.
            for conv_item in &context.conversation_history {
                if conv_item.timestamp >= time_range.start && conv_item.timestamp <= time_range.end
                {
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let content_lower = conv_item.content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter {
                        let time_since_start = conv_item
                            .timestamp
                            .duration_since(time_range.start)
                            .unwrap_or_default()
                            .as_secs() as f32;
                        let range_duration = time_range
                            .end
                            .duration_since(time_range.start)
                            .unwrap_or_default()
                            .as_secs() as f32;

                        // NOTE(review): this scores items near the START of the
                        // window highest (1.0) and near the end lowest — i.e. it
                        // favors the oldest conversations in range. Confirm this
                        // is intentional rather than an inverted recency score.
                        // A zero-length window gives everything a full score.
                        let temporal_score = if range_duration > 0.0 {
                            1.0 - (time_since_start / range_duration)
                        } else {
                            1.0
                        };

                        if temporal_score >= query.relevance_threshold {
                            results.push(ContextItem {
                                id: conv_item.id,
                                content: conv_item.content.clone(),
                                item_type: ContextItemType::Conversation,
                                relevance_score: temporal_score,
                                timestamp: conv_item.timestamp,
                                metadata: HashMap::new(),
                            });
                        }
                    }
                }
            }

            // Newest first — ordering here is temporal, not by relevance.
            results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
            results.truncate(query.max_results);

            Ok(results)
        } else {
            Ok(Vec::new())
        }
    }
1670
    /// Semantic (embedding-based) search over memory items.
    ///
    /// With the vector database enabled and terms present, the joined search
    /// terms are embedded once and ranking is delegated to `self.vector_db`.
    /// Otherwise an in-memory fallback computes cosine similarity against
    /// each item's stored embedding (items without one are skipped), blends
    /// similarity with importance, then sorts and truncates.
    async fn similarity_search_memory(
        &self,
        agent_id: AgentId,
        query: &ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        if self.config.enable_vector_db && !query.search_terms.is_empty() {
            // Vector-DB path: embed the query and let the store rank.
            // Embedding errors propagate to the caller on this path.
            let search_term = query.search_terms.join(" ");
            let query_embedding = self.generate_embeddings(&search_term).await?;

            let threshold = query.relevance_threshold;
            self.vector_db
                .semantic_search(agent_id, query_embedding, query.max_results, threshold)
                .await
        } else {
            // In-memory fallback path.
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                if query.search_terms.is_empty() {
                    return Ok(Vec::new());
                }

                let search_term = query.search_terms.join(" ");
                // On this path an embedding failure degrades to "no results"
                // instead of surfacing an error.
                let query_embedding = match self.generate_embeddings(&search_term).await {
                    Ok(embedding) => embedding,
                    Err(_) => return Ok(Vec::new()),
                };

                let mut results = Vec::new();

                for memory_item in context
                    .memory
                    .short_term
                    .iter()
                    .chain(context.memory.long_term.iter())
                {
                    // Honor the optional memory-type filter.
                    if !query.memory_types.is_empty()
                        && !query.memory_types.contains(&memory_item.memory_type)
                    {
                        continue;
                    }

                    // Only items that already carry an embedding participate.
                    if let Some(ref item_embedding) = memory_item.embedding {
                        let similarity = self.cosine_similarity(&query_embedding, item_embedding);

                        // Threshold applies to raw similarity; the final score
                        // then averages similarity with importance.
                        if similarity >= query.relevance_threshold {
                            let importance_score = self.calculate_importance(memory_item);
                            let relevance_score = (similarity + importance_score) / 2.0;

                            results.push(ContextItem {
                                id: memory_item.id,
                                content: memory_item.content.clone(),
                                item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
                                relevance_score,
                                timestamp: memory_item.created_at,
                                metadata: memory_item.metadata.clone(),
                            });
                        }
                    }
                }

                // Highest relevance first; NaN-safe comparison.
                results.sort_by(|a, b| {
                    b.relevance_score
                        .partial_cmp(&a.relevance_score)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });
                results.truncate(query.max_results);

                Ok(results)
            } else {
                Ok(Vec::new())
            }
        }
    }
1752
1753 async fn hybrid_search_memory(
1755 &self,
1756 agent_id: AgentId,
1757 query: &ContextQuery,
1758 ) -> Result<Vec<ContextItem>, ContextError> {
1759 let keyword_results = self.keyword_search_memory(agent_id, query).await?;
1761 let similarity_results = self.similarity_search_memory(agent_id, query).await?;
1762
1763 let mut combined_results: HashMap<ContextId, ContextItem> = HashMap::new();
1765
1766 let keyword_weight = 0.4;
1768 let similarity_weight = 0.6;
1769
1770 for mut item in keyword_results {
1772 item.relevance_score *= keyword_weight;
1773 combined_results.insert(item.id, item);
1774 }
1775
1776 for mut item in similarity_results {
1778 item.relevance_score *= similarity_weight;
1779
1780 if let Some(existing_item) = combined_results.get_mut(&item.id) {
1781 existing_item.relevance_score += item.relevance_score;
1783 existing_item.relevance_score =
1785 existing_item.relevance_score.max(item.relevance_score);
1786 } else {
1787 combined_results.insert(item.id, item);
1788 }
1789 }
1790
1791 let mut final_results: Vec<ContextItem> = combined_results.into_values().collect();
1793 final_results.sort_by(|a, b| {
1794 b.relevance_score
1795 .partial_cmp(&a.relevance_score)
1796 .unwrap_or(std::cmp::Ordering::Equal)
1797 });
1798
1799 final_results.retain(|item| item.relevance_score >= query.relevance_threshold);
1801 final_results.truncate(query.max_results);
1802
1803 Ok(final_results)
1804 }
1805
1806 fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1808 if a.len() != b.len() {
1809 return 0.0;
1810 }
1811
1812 let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1813 let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1814 let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1815
1816 if magnitude_a == 0.0 || magnitude_b == 0.0 {
1817 0.0
1818 } else {
1819 dot_product / (magnitude_a * magnitude_b)
1820 }
1821 }
1822
1823 fn calculate_knowledge_relevance(
1833 &self,
1834 content: &str,
1835 query: &str,
1836 confidence: f32,
1837 ) -> Option<f32> {
1838 let content_lower = content.to_lowercase();
1839 let query_lower = query.to_lowercase();
1840 let query_terms: Vec<&str> = query_lower.split_whitespace().collect();
1841
1842 if query_terms.is_empty() {
1843 return None;
1844 }
1845
1846 let mut total_match_score = 0.0f32;
1847 let mut matched_terms = 0;
1848
1849 for term in &query_terms {
1851 if content_lower.contains(term) {
1852 matched_terms += 1;
1853
1854 if content_lower.split_whitespace().any(|word| word == *term) {
1856 total_match_score += 1.0; } else {
1858 total_match_score += 0.6; }
1860
1861 let occurrences = content_lower.matches(term).count();
1863 if occurrences > 1 {
1864 total_match_score += (occurrences as f32 - 1.0) * 0.1;
1865 }
1866 }
1867 }
1868
1869 if matched_terms == 0 {
1871 return None;
1872 }
1873
1874 let term_coverage = matched_terms as f32 / query_terms.len() as f32;
1876 let match_quality = total_match_score / query_terms.len() as f32;
1877
1878 let base_score = match_quality * 0.5 + term_coverage * 0.3 + confidence * 0.2;
1880
1881 let content_length_factor = if content.len() < 50 {
1883 1.0 } else if content.len() < 200 {
1885 0.95 } else {
1887 0.9 };
1889
1890 let position_bonus = if content_lower.starts_with(&query_lower) {
1892 0.1 } else if query_terms
1894 .iter()
1895 .any(|term| content_lower.starts_with(term))
1896 {
1897 0.05 } else {
1899 0.0
1900 };
1901
1902 let final_score = (base_score + position_bonus) * content_length_factor;
1903 Some(final_score.clamp(0.0, 1.0))
1904 }
1905
1906 fn calculate_trust_score(&self, shared_item: &SharedKnowledgeItem) -> f32 {
1908 let mut trust_score = 0.5;
1910
1911 let access_factor = (shared_item.access_count as f32 + 1.0).ln() / 10.0;
1913 trust_score += access_factor;
1914
1915 let knowledge_factor = match &shared_item.knowledge {
1917 Knowledge::Fact(_) => 0.2,
1918 Knowledge::Procedure(_) => 0.1,
1919 Knowledge::Pattern(_) => 0.05,
1920 };
1921 trust_score += knowledge_factor;
1922
1923 trust_score.clamp(0.0, 1.0)
1925 }
1926
1927 fn calculate_memory_size_bytes(&self, context: &AgentContext) -> usize {
1929 let mut total_size = 0;
1930
1931 total_size += std::mem::size_of::<WorkingMemory>();
1933 for (key, value) in &context.memory.working_memory.variables {
1934 total_size += key.len();
1935 total_size += estimate_json_value_size(value);
1936 }
1937 for goal in &context.memory.working_memory.active_goals {
1938 total_size += goal.len();
1939 }
1940 if let Some(ref current_context) = context.memory.working_memory.current_context {
1941 total_size += current_context.len();
1942 }
1943 for focus in &context.memory.working_memory.attention_focus {
1944 total_size += focus.len();
1945 }
1946
1947 for item in &context.memory.short_term {
1949 total_size += estimate_memory_item_size(item);
1950 }
1951
1952 for item in &context.memory.long_term {
1954 total_size += estimate_memory_item_size(item);
1955 }
1956
1957 for episode in &context.memory.episodic_memory {
1959 total_size += estimate_episode_size(episode);
1960 }
1961
1962 for item in &context.memory.semantic_memory {
1964 total_size += estimate_semantic_memory_item_size(item);
1965 }
1966
1967 for fact in &context.knowledge_base.facts {
1969 total_size += estimate_knowledge_fact_size(fact);
1970 }
1971 for procedure in &context.knowledge_base.procedures {
1972 total_size += estimate_procedure_size(procedure);
1973 }
1974 for pattern in &context.knowledge_base.learned_patterns {
1975 total_size += estimate_pattern_size(pattern);
1976 }
1977
1978 for item in &context.conversation_history {
1980 total_size += estimate_conversation_item_size(item);
1981 }
1982
1983 for (key, value) in &context.metadata {
1985 total_size += key.len() + value.len();
1986 }
1987
1988 total_size += std::mem::size_of::<AgentContext>();
1990
1991 total_size
1992 }
1993
    /// Dry-run of the retention policy: count how many items would be
    /// archived and how many deleted, without mutating the context.
    ///
    /// Archive cutoffs use the configured retention windows; delete cutoffs
    /// use twice those windows. NOTE(review): an item past the delete cutoff
    /// is counted in BOTH tallies — confirm downstream consumers expect the
    /// overlap rather than disjoint counts.
    fn calculate_retention_statistics(&self, context: &AgentContext) -> RetentionStatus {
        let now = SystemTime::now();
        let retention_policy = &context.retention_policy;

        let mut items_to_archive = 0;
        let mut items_to_delete = 0;

        // Archive cutoffs: anything strictly older than the window qualifies.
        // checked_sub guards against windows larger than the epoch offset.
        let memory_cutoff = now
            .checked_sub(retention_policy.memory_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let knowledge_cutoff = now
            .checked_sub(retention_policy.knowledge_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let conversation_cutoff = now
            .checked_sub(retention_policy.session_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Short/long-term items age out by creation OR last-access time.
        for item in &context.memory.short_term {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                items_to_archive += 1;
            }
        }

        for item in &context.memory.long_term {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                items_to_archive += 1;
            }
        }

        for episode in &context.memory.episodic_memory {
            if episode.timestamp < memory_cutoff {
                items_to_archive += 1;
            }
        }

        // Semantic memory is retained twice as long as other memory.
        let semantic_cutoff = now
            .checked_sub(retention_policy.memory_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        for item in &context.memory.semantic_memory {
            if item.created_at < semantic_cutoff {
                items_to_archive += 1;
            }
        }

        for fact in &context.knowledge_base.facts {
            if fact.created_at < knowledge_cutoff {
                items_to_archive += 1;
            }
        }

        // Procedures and patterns archive on quality, not age — these
        // thresholds mirror archive_knowledge_items.
        for procedure in &context.knowledge_base.procedures {
            if procedure.success_rate < 0.3 {
                items_to_archive += 1;
            }
        }

        for pattern in &context.knowledge_base.learned_patterns {
            if pattern.confidence < 0.4 || pattern.occurrences < 2 {
                items_to_archive += 1;
            }
        }

        for item in &context.conversation_history {
            if item.timestamp < conversation_cutoff {
                items_to_archive += 1;
            }
        }

        // Delete cutoffs: double the retention window.
        let delete_cutoff_memory = now
            .checked_sub(retention_policy.memory_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let delete_cutoff_knowledge = now
            .checked_sub(retention_policy.knowledge_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let delete_cutoff_conversation = now
            .checked_sub(retention_policy.session_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Deletion requires BOTH creation and last access to be stale
        // (stricter than the OR used for archiving).
        for item in &context.memory.short_term {
            if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
                items_to_delete += 1;
            }
        }

        for item in &context.memory.long_term {
            if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
                items_to_delete += 1;
            }
        }

        // Only unverified facts are ever candidates for deletion.
        for fact in &context.knowledge_base.facts {
            if fact.created_at < delete_cutoff_knowledge && !fact.verified {
                items_to_delete += 1;
            }
        }

        for item in &context.conversation_history {
            if item.timestamp < delete_cutoff_conversation {
                items_to_delete += 1;
            }
        }

        // Next cleanup is scheduled a fixed 24 hours out.
        let next_cleanup = now + Duration::from_secs(86400);
        RetentionStatus {
            items_to_archive,
            items_to_delete,
            next_cleanup,
        }
    }
2122
2123 async fn archive_memory_items(
2125 &self,
2126 context: &mut AgentContext,
2127 before: SystemTime,
2128 archived_context: &mut ArchivedContext,
2129 ) -> Result<u32, ContextError> {
2130 let mut archived_count = 0u32;
2131 let retention_policy = &context.retention_policy;
2132
2133 let memory_cutoff = before
2135 .checked_sub(retention_policy.memory_retention)
2136 .unwrap_or(SystemTime::UNIX_EPOCH);
2137
2138 let mut retained_short_term = Vec::new();
2140 for item in context.memory.short_term.drain(..) {
2141 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
2142 archived_context.memory.short_term.push(item);
2143 archived_count += 1;
2144 } else {
2145 retained_short_term.push(item);
2146 }
2147 }
2148 context.memory.short_term = retained_short_term;
2149
2150 let mut retained_long_term = Vec::new();
2152 for item in context.memory.long_term.drain(..) {
2153 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
2154 archived_context.memory.long_term.push(item);
2155 archived_count += 1;
2156 } else {
2157 retained_long_term.push(item);
2158 }
2159 }
2160 context.memory.long_term = retained_long_term;
2161
2162 let mut retained_episodes = Vec::new();
2164 for episode in context.memory.episodic_memory.drain(..) {
2165 if episode.timestamp < memory_cutoff {
2166 archived_context.memory.episodic_memory.push(episode);
2167 archived_count += 1;
2168 } else {
2169 retained_episodes.push(episode);
2170 }
2171 }
2172 context.memory.episodic_memory = retained_episodes;
2173
2174 let semantic_cutoff = before
2176 .checked_sub(retention_policy.memory_retention * 2)
2177 .unwrap_or(SystemTime::UNIX_EPOCH);
2178 let mut retained_semantic = Vec::new();
2179 for item in context.memory.semantic_memory.drain(..) {
2180 if item.created_at < semantic_cutoff {
2181 archived_context.memory.semantic_memory.push(item);
2182 archived_count += 1;
2183 } else {
2184 retained_semantic.push(item);
2185 }
2186 }
2187 context.memory.semantic_memory = retained_semantic;
2188
2189 Ok(archived_count)
2190 }
2191
2192 async fn archive_conversation_history(
2194 &self,
2195 context: &mut AgentContext,
2196 before: SystemTime,
2197 archived_context: &mut ArchivedContext,
2198 ) -> Result<u32, ContextError> {
2199 let mut archived_count = 0u32;
2200 let retention_policy = &context.retention_policy;
2201
2202 let conversation_cutoff = before
2204 .checked_sub(retention_policy.session_retention)
2205 .unwrap_or(SystemTime::UNIX_EPOCH);
2206
2207 let mut retained_conversations = Vec::new();
2209 for item in context.conversation_history.drain(..) {
2210 if item.timestamp < conversation_cutoff {
2211 archived_context.conversation_history.push(item);
2212 archived_count += 1;
2213 } else {
2214 retained_conversations.push(item);
2215 }
2216 }
2217 context.conversation_history = retained_conversations;
2218
2219 Ok(archived_count)
2220 }
2221
2222 async fn archive_knowledge_items(
2224 &self,
2225 context: &mut AgentContext,
2226 before: SystemTime,
2227 archived_context: &mut ArchivedContext,
2228 ) -> Result<u32, ContextError> {
2229 let mut archived_count = 0u32;
2230 let retention_policy = &context.retention_policy;
2231
2232 let knowledge_cutoff = before
2234 .checked_sub(retention_policy.knowledge_retention)
2235 .unwrap_or(SystemTime::UNIX_EPOCH);
2236
2237 let mut retained_facts = Vec::new();
2239 for fact in context.knowledge_base.facts.drain(..) {
2240 if fact.created_at < knowledge_cutoff {
2241 archived_context.knowledge_base.facts.push(fact);
2242 archived_count += 1;
2243 } else {
2244 retained_facts.push(fact);
2245 }
2246 }
2247 context.knowledge_base.facts = retained_facts;
2248
2249 let mut retained_procedures = Vec::new();
2252 for procedure in context.knowledge_base.procedures.drain(..) {
2253 if procedure.success_rate < 0.3 {
2255 archived_context.knowledge_base.procedures.push(procedure);
2256 archived_count += 1;
2257 } else {
2258 retained_procedures.push(procedure);
2259 }
2260 }
2261 context.knowledge_base.procedures = retained_procedures;
2262
2263 let mut retained_patterns = Vec::new();
2265 for pattern in context.knowledge_base.learned_patterns.drain(..) {
2266 if pattern.confidence < 0.4 || pattern.occurrences < 2 {
2267 archived_context
2268 .knowledge_base
2269 .learned_patterns
2270 .push(pattern);
2271 archived_count += 1;
2272 } else {
2273 retained_patterns.push(pattern);
2274 }
2275 }
2276 context.knowledge_base.learned_patterns = retained_patterns;
2277
2278 Ok(archived_count)
2279 }
2280
    /// Write an archived context to disk as timestamped JSON
    /// (`archive_<unix_secs>.json`), optionally gzip-compressed, using a
    /// write-to-temp-then-rename sequence. No-op when persistence is off.
    async fn save_archived_context(
        &self,
        agent_id: AgentId,
        archived_context: &ArchivedContext,
    ) -> Result<(), ContextError> {
        if !self.config.enable_persistence {
            return Ok(());
        }

        // Also ensures the per-agent archive directory exists.
        let archive_dir = self.get_archive_directory_path(agent_id).await?;

        // Files are named by the archive's own timestamp (unix seconds), so
        // two archives in the same second would share a name.
        let timestamp = archived_context
            .archived_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let archive_filename = format!("archive_{}.json", timestamp);
        let archive_path = archive_dir.join(archive_filename);

        let archive_data = serde_json::to_vec_pretty(archived_context).map_err(|e| {
            ContextError::SerializationError {
                reason: format!("Failed to serialize archived context: {}", e),
            }
        })?;

        // NOTE(review): when compression is enabled the payload is gzip but
        // the file keeps its .json extension — confirm readers handle this.
        let final_data = if self.config.persistence_config.enable_compression {
            self.compress_data(&archive_data)?
        } else {
            archive_data
        };

        // Write to a .tmp sibling first, then rename, so a crash mid-write
        // never leaves a truncated file at the final path.
        let temp_path = archive_path.with_extension("tmp");
        fs::write(&temp_path, &final_data)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to write archive file: {}", e),
            })?;

        fs::rename(&temp_path, &archive_path)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to finalize archive file: {}", e),
            })?;

        tracing::info!(
            "Saved archived context for agent {} to {}",
            agent_id,
            archive_path.display()
        );

        Ok(())
    }
2340
2341 async fn get_archive_directory_path(&self, agent_id: AgentId) -> Result<PathBuf, ContextError> {
2343 let archive_dir = self
2344 .config
2345 .persistence_config
2346 .agent_contexts_path()
2347 .join("archives")
2348 .join(agent_id.to_string());
2349
2350 fs::create_dir_all(&archive_dir)
2352 .await
2353 .map_err(|e| ContextError::StorageError {
2354 reason: format!("Failed to create archive directory: {}", e),
2355 })?;
2356
2357 Ok(archive_dir)
2358 }
2359
2360 fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>, ContextError> {
2362 use flate2::write::GzEncoder;
2363 use flate2::Compression;
2364 use std::io::Write;
2365
2366 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2367 encoder
2368 .write_all(data)
2369 .map_err(|e| ContextError::SerializationError {
2370 reason: format!("Failed to compress archive data: {}", e),
2371 })?;
2372 encoder
2373 .finish()
2374 .map_err(|e| ContextError::SerializationError {
2375 reason: format!("Failed to finalize compression: {}", e),
2376 })
2377 }
2378
2379 fn knowledge_to_item(
2381 &self,
2382 knowledge: &Knowledge,
2383 knowledge_id: KnowledgeId,
2384 ) -> Result<KnowledgeItem, ContextError> {
2385 match knowledge {
2386 Knowledge::Fact(fact) => {
2387 Ok(KnowledgeItem {
2388 id: knowledge_id,
2389 content: format!("{} {} {}", fact.subject, fact.predicate, fact.object),
2390 knowledge_type: KnowledgeType::Fact,
2391 confidence: fact.confidence,
2392 relevance_score: 1.0, source: fact.source.clone(),
2394 created_at: fact.created_at,
2395 })
2396 }
2397 Knowledge::Procedure(procedure) => {
2398 Ok(KnowledgeItem {
2399 id: knowledge_id,
2400 content: format!("{}: {}", procedure.name, procedure.description),
2401 knowledge_type: KnowledgeType::Procedure,
2402 confidence: procedure.success_rate,
2403 relevance_score: 1.0, source: KnowledgeSource::Learning,
2405 created_at: SystemTime::now(),
2406 })
2407 }
2408 Knowledge::Pattern(pattern) => {
2409 Ok(KnowledgeItem {
2410 id: knowledge_id,
2411 content: format!("Pattern: {}", pattern.description),
2412 knowledge_type: KnowledgeType::Pattern,
2413 confidence: pattern.confidence,
2414 relevance_score: 1.0, source: KnowledgeSource::Learning,
2416 created_at: SystemTime::now(),
2417 })
2418 }
2419 }
2420 }
2421}
2422
2423fn estimate_json_value_size(value: &Value) -> usize {
2425 match value {
2426 Value::Null => 4,
2427 Value::Bool(_) => 1,
2428 Value::Number(n) => std::mem::size_of::<f64>() + n.to_string().len(),
2429 Value::String(s) => s.len(),
2430 Value::Array(arr) => {
2431 arr.iter().map(estimate_json_value_size).sum::<usize>()
2432 + std::mem::size_of::<Vec<Value>>()
2433 }
2434 Value::Object(obj) => {
2435 obj.iter()
2436 .map(|(k, v)| k.len() + estimate_json_value_size(v))
2437 .sum::<usize>()
2438 + std::mem::size_of::<serde_json::Map<String, Value>>()
2439 }
2440 }
2441}
2442
2443fn estimate_memory_item_size(item: &MemoryItem) -> usize {
2445 let mut size = std::mem::size_of::<MemoryItem>();
2446 size += item.content.len();
2447
2448 if let Some(ref embedding) = item.embedding {
2450 size += embedding.len() * std::mem::size_of::<f32>();
2451 }
2452
2453 for (key, value) in &item.metadata {
2455 size += key.len() + value.len();
2456 }
2457
2458 size
2459}
2460
2461fn estimate_episode_size(episode: &Episode) -> usize {
2463 let mut size = std::mem::size_of::<Episode>();
2464 size += episode.title.len();
2465 size += episode.description.len();
2466
2467 if let Some(ref outcome) = episode.outcome {
2468 size += outcome.len();
2469 }
2470
2471 for event in &episode.events {
2472 size += event.action.len();
2473 size += event.result.len();
2474 for (key, value) in &event.context {
2475 size += key.len() + value.len();
2476 }
2477 }
2478
2479 for lesson in &episode.lessons_learned {
2480 size += lesson.len();
2481 }
2482
2483 size
2484}
2485
2486fn estimate_semantic_memory_item_size(item: &SemanticMemoryItem) -> usize {
2488 let mut size = std::mem::size_of::<SemanticMemoryItem>();
2489 size += item.concept.len();
2490
2491 for relationship in &item.relationships {
2492 size += relationship.target_concept.len();
2493 size += std::mem::size_of::<ConceptRelationship>();
2494 }
2495
2496 for (key, value) in &item.properties {
2497 size += key.len() + estimate_json_value_size(value);
2498 }
2499
2500 size
2501}
2502
2503fn estimate_knowledge_fact_size(fact: &KnowledgeFact) -> usize {
2505 std::mem::size_of::<KnowledgeFact>()
2506 + fact.subject.len()
2507 + fact.predicate.len()
2508 + fact.object.len()
2509}
2510
2511fn estimate_procedure_size(procedure: &Procedure) -> usize {
2513 let mut size = std::mem::size_of::<Procedure>();
2514 size += procedure.name.len();
2515 size += procedure.description.len();
2516
2517 for step in &procedure.steps {
2518 size += step.action.len();
2519 size += step.expected_result.len();
2520 if let Some(ref error_handling) = step.error_handling {
2521 size += error_handling.len();
2522 }
2523 }
2524
2525 for condition in &procedure.preconditions {
2526 size += condition.len();
2527 }
2528
2529 for condition in &procedure.postconditions {
2530 size += condition.len();
2531 }
2532
2533 size
2534}
2535
2536fn estimate_pattern_size(pattern: &Pattern) -> usize {
2538 let mut size = std::mem::size_of::<Pattern>();
2539 size += pattern.name.len();
2540 size += pattern.description.len();
2541
2542 for condition in &pattern.conditions {
2543 size += condition.len();
2544 }
2545
2546 for outcome in &pattern.outcomes {
2547 size += outcome.len();
2548 }
2549
2550 size
2551}
2552
2553fn estimate_conversation_item_size(item: &ConversationItem) -> usize {
2555 let mut size = std::mem::size_of::<ConversationItem>();
2556 size += item.content.len();
2557
2558 size += item.context_used.len() * std::mem::size_of::<ContextId>();
2560 size += item.knowledge_used.len() * std::mem::size_of::<KnowledgeId>();
2561
2562 size
2563}
2564
#[async_trait]
impl ContextManager for StandardContextManager {
    /// Stores (replaces) the context for `agent_id`: stamps `updated_at`,
    /// persists when persistence is enabled, then swaps it into the in-memory
    /// cache. Returns a freshly generated `ContextId`.
    ///
    /// NOTE(review): the returned id is minted here and is not written onto
    /// the stored context — confirm callers don't expect to resolve it later.
    async fn store_context(
        &self,
        agent_id: AgentId,
        mut context: AgentContext,
    ) -> Result<ContextId, ContextError> {
        self.validate_access(agent_id, "store_context").await?;

        // Stamp the modification time before writing anywhere.
        context.updated_at = SystemTime::now();
        let context_id = ContextId::new();

        // Persist first so the durable copy is at least as new as the cache.
        if self.config.enable_persistence {
            self.persistence.save_context(agent_id, &context).await?;
        }

        // Replace any previously cached context for this agent.
        let mut contexts = self.contexts.write().await;
        contexts.insert(agent_id, context);

        Ok(context_id)
    }

    /// Fetches the context for `agent_id`, preferring the in-memory cache and
    /// falling back to persistence (hydrating the cache on a hit). When
    /// `session_id` is given, only a context for that exact session is
    /// returned.
    async fn retrieve_context(
        &self,
        agent_id: AgentId,
        session_id: Option<SessionId>,
    ) -> Result<Option<AgentContext>, ContextError> {
        self.validate_access(agent_id, "retrieve_context").await?;

        // Fast path: cached context. Scoped block so the read lock is
        // released before any persistence I/O below.
        {
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                if let Some(sid) = session_id {
                    if context.session_id == sid {
                        return Ok(Some(context.clone()));
                    }
                    // Cached context belongs to a different session; fall
                    // through to the persistence lookup.
                } else {
                    return Ok(Some(context.clone()));
                }
            }
        }

        // Slow path: load from persistence and warm the cache.
        if self.config.enable_persistence {
            if let Some(context) = self.persistence.load_context(agent_id).await? {
                if let Some(sid) = session_id {
                    if context.session_id != sid {
                        // Persisted context is for a different session.
                        return Ok(None);
                    }
                }

                let mut contexts = self.contexts.write().await;
                contexts.insert(agent_id, context.clone());

                return Ok(Some(context));
            }
        }

        Ok(None)
    }

    /// Dispatches a context query to the search strategy matching its
    /// `query_type`.
    async fn query_context(
        &self,
        agent_id: AgentId,
        query: ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        self.validate_access(agent_id, "query_context").await?;

        match query.query_type {
            QueryType::Semantic => {
                // Semantic search takes a single free-text string, so the
                // individual terms are joined with spaces.
                let search_term = query.search_terms.join(" ");
                self.semantic_search_memory(agent_id, &search_term, query.max_results)
                    .await
            }
            QueryType::Keyword => self.keyword_search_memory(agent_id, &query).await,
            QueryType::Temporal => self.temporal_search_memory(agent_id, &query).await,
            QueryType::Similarity => self.similarity_search_memory(agent_id, &query).await,
            QueryType::Hybrid => self.hybrid_search_memory(agent_id, &query).await,
        }
    }

    /// Applies a batch of memory updates (Add / Update / Delete / Increment)
    /// against the five memory targets (short-term, long-term, working,
    /// episodic, semantic) of the agent's cached context, then persists the
    /// modified context when persistence is enabled.
    ///
    /// Add/Update payloads arrive as loose JSON objects; each field is
    /// extracted individually and falls back to a default when missing or
    /// malformed (e.g. importance 0.5, empty content), so a bad payload
    /// degrades rather than errors.
    ///
    /// Errors with `ContextError::NotFound` when no cached context exists for
    /// the agent. NOTE(review): the NotFound error carries a freshly minted
    /// `ContextId`, not an id related to `agent_id` — confirm that is
    /// intentional.
    async fn update_memory(
        &self,
        agent_id: AgentId,
        memory_updates: Vec<MemoryUpdate>,
    ) -> Result<(), ContextError> {
        self.validate_access(agent_id, "update_memory").await?;

        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            for update in memory_updates {
                match update.operation {
                    UpdateOperation::Add => {
                        match update.target {
                            // Add a new short-term memory item built from the
                            // JSON payload; a non-object payload is silently
                            // ignored.
                            MemoryTarget::ShortTerm(_) => {
                                if let Ok(memory_item_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let memory_item = MemoryItem {
                                        id: ContextId::new(),
                                        content: memory_item_data
                                            .get("content")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        memory_type: memory_item_data
                                            .get("memory_type")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or(MemoryType::Factual),
                                        importance: memory_item_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        access_count: 0,
                                        last_accessed: SystemTime::now(),
                                        created_at: SystemTime::now(),
                                        embedding: None,
                                        metadata: memory_item_data
                                            .get("metadata")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                    };
                                    context.memory.short_term.push(memory_item);
                                }
                            }
                            // Same construction as ShortTerm, but appended to
                            // long-term memory.
                            MemoryTarget::LongTerm(_) => {
                                if let Ok(memory_item_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let memory_item = MemoryItem {
                                        id: ContextId::new(),
                                        content: memory_item_data
                                            .get("content")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        memory_type: memory_item_data
                                            .get("memory_type")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or(MemoryType::Factual),
                                        importance: memory_item_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        access_count: 0,
                                        last_accessed: SystemTime::now(),
                                        created_at: SystemTime::now(),
                                        embedding: None,
                                        metadata: memory_item_data
                                            .get("metadata")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                    };
                                    context.memory.long_term.push(memory_item);
                                }
                            }
                            // Working memory is a plain key -> JSON value map;
                            // Add stores the raw payload under the key.
                            MemoryTarget::Working(key) => {
                                context
                                    .memory
                                    .working_memory
                                    .variables
                                    .insert(key, update.data);
                            }
                            // Add a new episode; missing fields fall back to
                            // defaults ("Untitled Episode", empty lists, 0.5
                            // importance).
                            MemoryTarget::Episodic(_) => {
                                if let Ok(episode_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let episode = Episode {
                                        id: ContextId::new(),
                                        title: episode_data
                                            .get("title")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("Untitled Episode")
                                            .to_string(),
                                        description: episode_data
                                            .get("description")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        events: episode_data
                                            .get("events")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        outcome: episode_data
                                            .get("outcome")
                                            .and_then(|v| v.as_str())
                                            .map(|s| s.to_string()),
                                        lessons_learned: episode_data
                                            .get("lessons_learned")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        timestamp: SystemTime::now(),
                                        importance: episode_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                    };
                                    context.memory.episodic_memory.push(episode);
                                }
                            }
                            // Add a new semantic-memory concept entry.
                            MemoryTarget::Semantic(_) => {
                                if let Ok(semantic_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let semantic_item = SemanticMemoryItem {
                                        id: ContextId::new(),
                                        concept: semantic_data
                                            .get("concept")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        relationships: semantic_data
                                            .get("relationships")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        properties: semantic_data
                                            .get("properties")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        confidence: semantic_data
                                            .get("confidence")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        created_at: SystemTime::now(),
                                        updated_at: SystemTime::now(),
                                    };
                                    context.memory.semantic_memory.push(semantic_item);
                                }
                            }
                        }
                    }
                    UpdateOperation::Update => {
                        match update.target {
                            // Partial update of an existing short-term item:
                            // only the fields present in the payload are
                            // overwritten, and the access bookkeeping is
                            // refreshed.
                            MemoryTarget::ShortTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .short_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(content) =
                                            update_data.get("content").and_then(|v| v.as_str())
                                        {
                                            memory_item.content = content.to_string();
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = importance as f32;
                                        }
                                        if let Some(metadata) = update_data.get("metadata") {
                                            if let Ok(new_metadata) =
                                                serde_json::from_value(metadata.clone())
                                            {
                                                memory_item.metadata = new_metadata;
                                            }
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                        memory_item.access_count += 1;
                                    }
                                }
                            }
                            // Mirror of the ShortTerm update against long-term
                            // memory.
                            MemoryTarget::LongTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .long_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(content) =
                                            update_data.get("content").and_then(|v| v.as_str())
                                        {
                                            memory_item.content = content.to_string();
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = importance as f32;
                                        }
                                        if let Some(metadata) = update_data.get("metadata") {
                                            if let Ok(new_metadata) =
                                                serde_json::from_value(metadata.clone())
                                            {
                                                memory_item.metadata = new_metadata;
                                            }
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                        memory_item.access_count += 1;
                                    }
                                }
                            }
                            // For working memory, Update is an upsert —
                            // identical to Add.
                            MemoryTarget::Working(key) => {
                                context
                                    .memory
                                    .working_memory
                                    .variables
                                    .insert(key, update.data);
                            }
                            // Partial update of an existing episode.
                            MemoryTarget::Episodic(target_id) => {
                                if let Some(episode) = context
                                    .memory
                                    .episodic_memory
                                    .iter_mut()
                                    .find(|ep| ep.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(title) =
                                            update_data.get("title").and_then(|v| v.as_str())
                                        {
                                            episode.title = title.to_string();
                                        }
                                        if let Some(description) =
                                            update_data.get("description").and_then(|v| v.as_str())
                                        {
                                            episode.description = description.to_string();
                                        }
                                        if let Some(outcome) =
                                            update_data.get("outcome").and_then(|v| v.as_str())
                                        {
                                            episode.outcome = Some(outcome.to_string());
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            episode.importance = importance as f32;
                                        }
                                        if let Some(lessons) = update_data.get("lessons_learned") {
                                            if let Ok(new_lessons) =
                                                serde_json::from_value(lessons.clone())
                                            {
                                                episode.lessons_learned = new_lessons;
                                            }
                                        }
                                    }
                                }
                            }
                            // Partial update of an existing semantic item;
                            // bumps `updated_at` whenever the payload parses.
                            MemoryTarget::Semantic(target_id) => {
                                if let Some(semantic_item) = context
                                    .memory
                                    .semantic_memory
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(concept) =
                                            update_data.get("concept").and_then(|v| v.as_str())
                                        {
                                            semantic_item.concept = concept.to_string();
                                        }
                                        if let Some(confidence) =
                                            update_data.get("confidence").and_then(|v| v.as_f64())
                                        {
                                            semantic_item.confidence = confidence as f32;
                                        }
                                        if let Some(relationships) =
                                            update_data.get("relationships")
                                        {
                                            if let Ok(new_relationships) =
                                                serde_json::from_value(relationships.clone())
                                            {
                                                semantic_item.relationships = new_relationships;
                                            }
                                        }
                                        if let Some(properties) = update_data.get("properties") {
                                            if let Ok(new_properties) =
                                                serde_json::from_value(properties.clone())
                                            {
                                                semantic_item.properties = new_properties;
                                            }
                                        }
                                        semantic_item.updated_at = SystemTime::now();
                                    }
                                }
                            }
                        }
                    }
                    UpdateOperation::Delete => {
                        // Deletes are idempotent: removing a missing id/key is
                        // a no-op.
                        match update.target {
                            MemoryTarget::ShortTerm(target_id) => {
                                context
                                    .memory
                                    .short_term
                                    .retain(|item| item.id != target_id);
                            }
                            MemoryTarget::LongTerm(target_id) => {
                                context.memory.long_term.retain(|item| item.id != target_id);
                            }
                            MemoryTarget::Working(key) => {
                                context.memory.working_memory.variables.remove(&key);
                            }
                            MemoryTarget::Episodic(target_id) => {
                                context
                                    .memory
                                    .episodic_memory
                                    .retain(|ep| ep.id != target_id);
                            }
                            MemoryTarget::Semantic(target_id) => {
                                context
                                    .memory
                                    .semantic_memory
                                    .retain(|item| item.id != target_id);
                            }
                        }
                    }
                    UpdateOperation::Increment => {
                        match update.target {
                            // Increment importance (clamped to [0, 1]) and/or
                            // access_count (saturating) on a short-term item;
                            // payload is an object with either or both keys.
                            MemoryTarget::ShortTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .short_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(increment_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(importance_increment) = increment_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = (memory_item.importance
                                                + importance_increment as f32)
                                                .clamp(0.0, 1.0);
                                        }
                                        if let Some(access_increment) = increment_data
                                            .get("access_count")
                                            .and_then(|v| v.as_u64())
                                        {
                                            memory_item.access_count = memory_item
                                                .access_count
                                                .saturating_add(access_increment as u32);
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                    }
                                }
                            }
                            // Mirror of the ShortTerm increment for long-term
                            // memory.
                            MemoryTarget::LongTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .long_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(increment_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(importance_increment) = increment_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = (memory_item.importance
                                                + importance_increment as f32)
                                                .clamp(0.0, 1.0);
                                        }
                                        if let Some(access_increment) = increment_data
                                            .get("access_count")
                                            .and_then(|v| v.as_u64())
                                        {
                                            memory_item.access_count = memory_item
                                                .access_count
                                                .saturating_add(access_increment as u32);
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                    }
                                }
                            }
                            // Numeric increment of a working-memory variable.
                            // f64 + f64 is tried first, then i64 + i64; any
                            // type mismatch (or missing key) is a silent no-op.
                            MemoryTarget::Working(key) => {
                                if let Some(existing_value) =
                                    context.memory.working_memory.variables.get(&key)
                                {
                                    if let (Some(current), Some(increment)) =
                                        (existing_value.as_f64(), update.data.as_f64())
                                    {
                                        let new_value = current + increment;
                                        context.memory.working_memory.variables.insert(
                                            key,
                                            Value::Number(
                                                serde_json::Number::from_f64(new_value)
                                                    .unwrap_or_else(|| serde_json::Number::from(0)),
                                            ),
                                        );
                                    } else if let (Some(current), Some(increment)) =
                                        (existing_value.as_i64(), update.data.as_i64())
                                    {
                                        let new_value = current.saturating_add(increment);
                                        context.memory.working_memory.variables.insert(
                                            key,
                                            Value::Number(serde_json::Number::from(new_value)),
                                        );
                                    }
                                }
                            }
                            // Episodic increment: payload is a bare number
                            // added to importance (clamped to [0, 1]).
                            MemoryTarget::Episodic(target_id) => {
                                if let Some(episode) = context
                                    .memory
                                    .episodic_memory
                                    .iter_mut()
                                    .find(|ep| ep.id == target_id)
                                {
                                    if let Some(importance_increment) = update.data.as_f64() {
                                        episode.importance = (episode.importance
                                            + importance_increment as f32)
                                            .clamp(0.0, 1.0);
                                    }
                                }
                            }
                            // Semantic increment: bare number added to
                            // confidence (clamped), bumping `updated_at`.
                            MemoryTarget::Semantic(target_id) => {
                                if let Some(semantic_item) = context
                                    .memory
                                    .semantic_memory
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Some(confidence_increment) = update.data.as_f64() {
                                        semantic_item.confidence = (semantic_item.confidence
                                            + confidence_increment as f32)
                                            .clamp(0.0, 1.0);
                                        semantic_item.updated_at = SystemTime::now();
                                    }
                                }
                            }
                        }
                    }
                }
            }

            context.updated_at = SystemTime::now();

            // Persist the mutated context; a persistence failure fails the
            // whole call even though the in-memory state was already updated.
            if self.config.enable_persistence {
                if let Err(e) = self.persistence.save_context(agent_id, context).await {
                    tracing::error!(
                        "Failed to persist memory updates for agent {}: {}",
                        agent_id,
                        e
                    );
                    return Err(e);
                }
            }
        } else {
            return Err(ContextError::NotFound {
                id: ContextId::new(),
            });
        }

        Ok(())
    }

    /// Adds a piece of knowledge (fact / procedure / pattern) to the agent's
    /// knowledge base, indexing it in the vector database first when that is
    /// enabled. Returns a freshly generated `KnowledgeId`.
    ///
    /// NOTE(review): the returned id is used for the vector-db item, but the
    /// knowledge pushed into the context keeps its own embedded id — confirm
    /// the two are meant to differ.
    async fn add_knowledge(
        &self,
        agent_id: AgentId,
        knowledge: Knowledge,
    ) -> Result<KnowledgeId, ContextError> {
        self.validate_access(agent_id, "add_knowledge").await?;

        let knowledge_id = KnowledgeId::new();

        // Index in the vector store before mutating the context so a vector
        // failure aborts the whole operation.
        if self.config.enable_vector_db {
            let knowledge_item = self.knowledge_to_item(&knowledge, knowledge_id)?;
            let embedding = self.generate_embeddings(&knowledge_item.content).await?;
            let _vector_id = self
                .vector_db
                .store_knowledge_item(&knowledge_item, embedding)
                .await?;
        }

        // Append to the matching knowledge-base collection. A missing context
        // is silently skipped (the id is still returned).
        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            match knowledge {
                Knowledge::Fact(fact) => {
                    context.knowledge_base.facts.push(fact);
                }
                Knowledge::Procedure(procedure) => {
                    context.knowledge_base.procedures.push(procedure);
                }
                Knowledge::Pattern(pattern) => {
                    context.knowledge_base.learned_patterns.push(pattern);
                }
            }
            context.updated_at = SystemTime::now();
        }

        Ok(knowledge_id)
    }

    /// Searches the agent's knowledge. With the vector database enabled, the
    /// query is embedded and searched remotely; otherwise an in-memory
    /// relevance scan runs over facts, procedures, and learned patterns,
    /// sorted by relevance and truncated to `limit`.
    async fn search_knowledge(
        &self,
        agent_id: AgentId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError> {
        self.validate_access(agent_id, "search_knowledge").await?;

        if self.config.enable_vector_db {
            let query_embedding = self.generate_embeddings(query).await?;

            self.vector_db
                .search_knowledge_base(agent_id, query_embedding, limit)
                .await
        } else {
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                let mut results = Vec::new();

                // Facts: match on the "subject predicate object" triple text.
                for fact in &context.knowledge_base.facts {
                    let content = format!("{} {} {}", fact.subject, fact.predicate, fact.object);
                    if let Some(relevance_score) =
                        self.calculate_knowledge_relevance(&content, query, fact.confidence)
                    {
                        results.push(KnowledgeItem {
                            id: fact.id,
                            content,
                            knowledge_type: KnowledgeType::Fact,
                            confidence: fact.confidence,
                            relevance_score,
                            source: fact.source.clone(),
                            created_at: fact.created_at,
                        });
                    }
                }

                // Procedures: match on name + description; success_rate doubles
                // as the confidence input.
                for procedure in &context.knowledge_base.procedures {
                    let searchable_content =
                        format!("{} {}", procedure.name, procedure.description);
                    if let Some(relevance_score) = self.calculate_knowledge_relevance(
                        &searchable_content,
                        query,
                        procedure.success_rate,
                    ) {
                        results.push(KnowledgeItem {
                            id: procedure.id,
                            content: format!("{}: {}", procedure.name, procedure.description),
                            knowledge_type: KnowledgeType::Procedure,
                            confidence: procedure.success_rate,
                            relevance_score,
                            // NOTE(review): created_at is the search time, not
                            // the procedure's creation time — Procedure seems
                            // to carry no timestamp; confirm.
                            source: KnowledgeSource::Learning,
                            created_at: SystemTime::now(), });
                    }
                }

                // Patterns: match on name + description.
                for pattern in &context.knowledge_base.learned_patterns {
                    let searchable_content = format!("{} {}", pattern.name, pattern.description);
                    if let Some(relevance_score) = self.calculate_knowledge_relevance(
                        &searchable_content,
                        query,
                        pattern.confidence,
                    ) {
                        results.push(KnowledgeItem {
                            id: pattern.id,
                            content: format!("Pattern: {}", pattern.description),
                            knowledge_type: KnowledgeType::Pattern,
                            confidence: pattern.confidence,
                            relevance_score,
                            // NOTE(review): same search-time created_at caveat
                            // as procedures above.
                            source: KnowledgeSource::Learning,
                            created_at: SystemTime::now(), });
                    }
                }

                // Highest relevance first; NaN-safe via Ordering::Equal.
                results.sort_by(|a, b| {
                    b.relevance_score
                        .partial_cmp(&a.relevance_score)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });

                results.truncate(limit);
                Ok(results)
            } else {
                // No context for this agent: empty result, not an error.
                Ok(Vec::new())
            }
        }
    }

    /// Publishes a knowledge item from `from_agent` into the shared-knowledge
    /// pool under the given access level.
    ///
    /// NOTE(review): `_to_agent` is ignored — sharing is currently global and
    /// gated only by `access_level`; confirm per-recipient sharing is a
    /// planned follow-up.
    async fn share_knowledge(
        &self,
        from_agent: AgentId,
        _to_agent: AgentId,
        knowledge_id: KnowledgeId,
        access_level: AccessLevel,
    ) -> Result<(), ContextError> {
        self.validate_access(from_agent, "share_knowledge").await?;

        let contexts = self.contexts.read().await;
        if let Some(from_context) = contexts.get(&from_agent) {
            // Resolve the id across facts, then procedures, then patterns.
            let knowledge = if let Some(fact) = from_context
                .knowledge_base
                .facts
                .iter()
                .find(|f| f.id == knowledge_id)
            {
                Some(Knowledge::Fact(fact.clone()))
            } else if let Some(procedure) = from_context
                .knowledge_base
                .procedures
                .iter()
                .find(|p| p.id == knowledge_id)
            {
                Some(Knowledge::Procedure(procedure.clone()))
            } else {
                from_context
                    .knowledge_base
                    .learned_patterns
                    .iter()
                    .find(|p| p.id == knowledge_id)
                    .map(|pattern| Knowledge::Pattern(pattern.clone()))
            };

            if let Some(knowledge) = knowledge {
                let shared_item = SharedKnowledgeItem {
                    knowledge,
                    source_agent: from_agent,
                    access_level,
                    created_at: SystemTime::now(),
                    access_count: 0,
                };

                // Insert (or overwrite) the shared entry keyed by knowledge id.
                let mut shared_knowledge = self.shared_knowledge.write().await;
                shared_knowledge.insert(knowledge_id, shared_item);

                Ok(())
            } else {
                Err(ContextError::KnowledgeNotFound { id: knowledge_id })
            }
        } else {
            // NOTE(review): NotFound carries a fresh ContextId rather than
            // anything tied to `from_agent` — confirm intended.
            Err(ContextError::NotFound {
                id: ContextId::new(),
            })
        }
    }

    /// Lists references to shared knowledge visible to `agent_id`. Currently
    /// only `AccessLevel::Public` items are returned; all other levels are
    /// skipped.
    async fn get_shared_knowledge(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<SharedKnowledgeRef>, ContextError> {
        self.validate_access(agent_id, "get_shared_knowledge")
            .await?;

        let shared_knowledge = self.shared_knowledge.read().await;
        let mut results = Vec::new();

        for (knowledge_id, shared_item) in shared_knowledge.iter() {
            match shared_item.access_level {
                AccessLevel::Public => {
                    let trust_score = self.calculate_trust_score(shared_item);

                    tracing::debug!(
                        "Shared knowledge {} accessed {} times, trust score: {}",
                        knowledge_id,
                        shared_item.access_count,
                        trust_score
                    );

                    results.push(SharedKnowledgeRef {
                        knowledge_id: *knowledge_id,
                        source_agent: shared_item.source_agent,
                        shared_at: shared_item.created_at,
                        access_level: shared_item.access_level.clone(),
                        trust_score,
                    });
                }
                _ => {
                    // Non-public access levels are not exposed through this
                    // listing.
                }
            }
        }

        Ok(results)
    }

    /// Archives memory items, conversation history, and knowledge items older
    /// than `before` into an `ArchivedContext`, saves the archive, records
    /// archive metadata on the live context, and persists it. Returns the
    /// number of archived items (0 when no context exists — that case only
    /// logs a warning).
    async fn archive_context(
        &self,
        agent_id: AgentId,
        before: SystemTime,
    ) -> Result<u32, ContextError> {
        self.validate_access(agent_id, "archive_context").await?;

        let mut total_archived = 0u32;
        let mut archived_context = ArchivedContext::new(agent_id, before);

        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            // Move aged-out data from each region into the archive container.
            total_archived += self
                .archive_memory_items(context, before, &mut archived_context)
                .await?;

            total_archived += self
                .archive_conversation_history(context, before, &mut archived_context)
                .await?;

            total_archived += self
                .archive_knowledge_items(context, before, &mut archived_context)
                .await?;

            if total_archived > 0 {
                // Persist the archive itself before touching the live context.
                self.save_archived_context(agent_id, &archived_context)
                    .await?;

                // Record when and how much was archived on the live context
                // (metadata values are stringified for the String map).
                context.updated_at = SystemTime::now();
                context.metadata.insert(
                    "last_archived".to_string(),
                    SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs()
                        .to_string(),
                );
                context
                    .metadata
                    .insert("archived_count".to_string(), total_archived.to_string());

                if self.config.enable_persistence {
                    self.persistence.save_context(agent_id, context).await?;
                }
            }

            tracing::info!(
                "Archived {} items for agent {} before timestamp {:?}",
                total_archived,
                agent_id,
                before
            );
        } else {
            tracing::warn!("No context found for agent {} during archiving", agent_id);
        }

        Ok(total_archived)
    }

    /// Computes summary statistics (item counts, estimated memory size,
    /// retention status) for the agent's cached context. Errors with
    /// `NotFound` when no context is cached.
    async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError> {
        self.validate_access(agent_id, "get_context_stats").await?;

        let contexts = self.contexts.read().await;
        if let Some(context) = contexts.get(&agent_id) {
            // Counts across all four memory regions.
            let total_memory_items = context.memory.short_term.len()
                + context.memory.long_term.len()
                + context.memory.episodic_memory.len()
                + context.memory.semantic_memory.len();

            let total_knowledge_items = context.knowledge_base.facts.len()
                + context.knowledge_base.procedures.len()
                + context.knowledge_base.learned_patterns.len();

            let memory_size_bytes = self.calculate_memory_size_bytes(context);

            let retention_stats = self.calculate_retention_statistics(context);

            Ok(ContextStats {
                total_memory_items,
                total_knowledge_items,
                total_conversations: context.conversation_history.len(),
                total_episodes: context.memory.episodic_memory.len(),
                memory_size_bytes,
                last_activity: context.updated_at,
                retention_status: retention_stats,
            })
        } else {
            // NOTE(review): NotFound carries a fresh ContextId, not an id
            // derived from `agent_id` — confirm intended.
            Err(ContextError::NotFound {
                id: ContextId::new(),
            })
        }
    }

    /// Delegates to the inherent `StandardContextManager::shutdown`
    /// (disambiguated explicitly to avoid recursing into this trait method).
    async fn shutdown(&self) -> Result<(), ContextError> {
        StandardContextManager::shutdown(self).await
    }
}