1use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use tokio::fs;
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::RwLock;
13
14use super::embedding::create_embedding_service_from_env;
15use super::types::*;
16use super::vector_db::{EmbeddingService, NoOpVectorDatabase, QdrantConfig};
17use super::vector_db_factory::{create_vector_backend, resolve_vector_config, VectorBackendConfig};
18use super::vector_db_trait::VectorDb;
19use crate::integrations::policy_engine::{MockPolicyEngine, PolicyEngine};
20use crate::secrets::{SecretStore, SecretsConfig};
21use crate::types::AgentId;
22
/// Core abstraction for storing, querying, and sharing per-agent context.
///
/// Implementations manage hierarchical memory, knowledge bases, conversation
/// history, cross-agent knowledge sharing, and retention/archiving.
#[async_trait]
pub trait ContextManager: Send + Sync {
    /// Persists `context` for `agent_id` and returns the stored context's id.
    async fn store_context(
        &self,
        agent_id: AgentId,
        context: AgentContext,
    ) -> Result<ContextId, ContextError>;

    /// Fetches the agent's context, optionally scoped to one session.
    /// Returns `Ok(None)` when no context exists.
    async fn retrieve_context(
        &self,
        agent_id: AgentId,
        session_id: Option<SessionId>,
    ) -> Result<Option<AgentContext>, ContextError>;

    /// Runs a structured query against the agent's context items.
    async fn query_context(
        &self,
        agent_id: AgentId,
        query: ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError>;

    /// Applies a batch of memory updates to the agent's context.
    async fn update_memory(
        &self,
        agent_id: AgentId,
        memory_updates: Vec<MemoryUpdate>,
    ) -> Result<(), ContextError>;

    /// Adds a knowledge entry to the agent's knowledge base and returns its id.
    async fn add_knowledge(
        &self,
        agent_id: AgentId,
        knowledge: Knowledge,
    ) -> Result<KnowledgeId, ContextError>;

    /// Searches the agent's knowledge base, returning at most `limit` items.
    async fn search_knowledge(
        &self,
        agent_id: AgentId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError>;

    /// Grants `to_agent` access to one of `from_agent`'s knowledge items at
    /// the given access level.
    async fn share_knowledge(
        &self,
        from_agent: AgentId,
        to_agent: AgentId,
        knowledge_id: KnowledgeId,
        access_level: AccessLevel,
    ) -> Result<(), ContextError>;

    /// Lists knowledge that other agents have shared with `agent_id`.
    async fn get_shared_knowledge(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<SharedKnowledgeRef>, ContextError>;

    /// Archives context items older than `before`; returns how many were
    /// archived.
    async fn archive_context(
        &self,
        agent_id: AgentId,
        before: SystemTime,
    ) -> Result<u32, ContextError>;

    /// Reports aggregate statistics about the agent's context.
    async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError>;

    /// Flushes state and releases resources; implementations should be
    /// idempotent.
    async fn shutdown(&self) -> Result<(), ContextError>;
}
97
/// Default `ContextManager` implementation: an in-memory context map with
/// optional file persistence and vector-search support.
pub struct StandardContextManager {
    // In-memory cache of active agent contexts, keyed by agent.
    contexts: Arc<RwLock<HashMap<AgentId, AgentContext>>>,
    // Static configuration captured at construction time.
    config: ContextManagerConfig,
    // Knowledge items shared across agents, keyed by knowledge id.
    shared_knowledge: Arc<RwLock<HashMap<KnowledgeId, SharedKnowledgeItem>>>,
    // Vector store used for semantic search; a NoOp backend when disabled.
    vector_db: Arc<dyn VectorDb>,
    // Produces embeddings for semantic queries.
    embedding_service: Arc<dyn EmbeddingService>,
    // Durable storage for contexts (file-backed in this module).
    persistence: Arc<dyn ContextPersistence>,
    // Per-agent secret store, created from `config.secrets_config`.
    secrets: Box<dyn SecretStore + Send + Sync>,
    // Reserved for access-control decisions; not consulted in this file.
    #[allow(dead_code)]
    policy_engine: Arc<dyn PolicyEngine>,
    // Set once during shutdown; background tasks poll it to exit.
    shutdown_flag: Arc<RwLock<bool>>,
    // Handles of spawned tasks so shutdown can abort and await them.
    background_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
127
/// Tunables for `StandardContextManager`.
#[derive(Debug, Clone)]
pub struct ContextManagerConfig {
    // Upper bound on contexts held in the in-memory cache.
    // NOTE(review): not enforced anywhere visible in this file — confirm.
    pub max_contexts_in_memory: usize,
    // Retention policy applied to newly created contexts.
    pub default_retention_policy: RetentionPolicy,
    // When true, a background scheduler periodically archives expired items.
    pub enable_auto_archiving: bool,
    // How often the archiving scheduler ticks.
    pub archiving_interval: std::time::Duration,
    // Per-agent cap on memory items (enforcement lives outside this chunk).
    pub max_memory_items_per_agent: usize,
    // Per-agent cap on knowledge items (enforcement lives outside this chunk).
    pub max_knowledge_items_per_agent: usize,
    // Explicit vector backend selection; `None` falls back to env resolution.
    pub vector_backend: Option<VectorBackendConfig>,
    // Qdrant settings; also supplies the embedding vector dimension.
    pub qdrant_config: QdrantConfig,
    // Master switch for vector-based semantic search.
    pub enable_vector_db: bool,
    // File layout, compression, and backup-count settings for persistence.
    pub persistence_config: FilePersistenceConfig,
    // Master switch for saving/loading contexts to disk.
    pub enable_persistence: bool,
    // Backing configuration for the per-agent secret store.
    pub secrets_config: SecretsConfig,
}
156
157impl Default for ContextManagerConfig {
158 fn default() -> Self {
159 use std::path::PathBuf;
160
161 Self {
162 max_contexts_in_memory: 1000,
163 default_retention_policy: RetentionPolicy::default(),
164 enable_auto_archiving: true,
165 archiving_interval: std::time::Duration::from_secs(3600), max_memory_items_per_agent: 10000,
167 max_knowledge_items_per_agent: 5000,
168 vector_backend: None,
169 qdrant_config: QdrantConfig::default(),
170 enable_vector_db: false,
171 persistence_config: FilePersistenceConfig::default(),
172 enable_persistence: true,
173 secrets_config: SecretsConfig::file_json(PathBuf::from("secrets.json")),
174 }
175 }
176}
177
/// Weighting factors used when scoring a memory item's importance.
///
/// NOTE(review): the first four weights sum to 1.0 with `no_access_penalty`
/// apparently a separate deduction — confirm against `calculate_importance`
/// (its body is outside this chunk).
#[derive(Debug, Clone)]
struct ImportanceWeights {
    // Weight of the item's intrinsic/base importance.
    pub base_importance: f32,
    // Weight of how often the item has been accessed.
    pub access_frequency: f32,
    // Weight of how recently the item was touched.
    pub recency: f32,
    // Weight of explicit user feedback on the item.
    pub user_feedback: f32,
    // Penalty factor applied to items that are never accessed.
    pub no_access_penalty: f32,
}
192
193impl Default for ImportanceWeights {
194 fn default() -> Self {
195 Self {
196 base_importance: 0.3,
197 access_frequency: 0.25,
198 recency: 0.3,
199 user_feedback: 0.15,
200 no_access_penalty: 0.1,
201 }
202 }
203}
204
/// A knowledge entry one agent has shared, plus sharing bookkeeping.
#[derive(Debug, Clone)]
struct SharedKnowledgeItem {
    // The shared knowledge payload.
    knowledge: Knowledge,
    // Agent that originally shared the item.
    source_agent: AgentId,
    // Access level granted to the receiving agent(s).
    access_level: AccessLevel,
    // When the share was created.
    created_at: SystemTime,
    // How many times the shared item has been accessed.
    access_count: u32,
}
214
/// Snapshot of items removed from a live context during archiving; written
/// as JSON under the agent's `archives/` directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ArchivedContext {
    // Owner of the archived items.
    agent_id: AgentId,
    // When the archive was produced.
    archived_at: SystemTime,
    // Human-readable explanation (records the cutoff time).
    archive_reason: String,
    // Expired memory items moved out of the live context.
    memory: HierarchicalMemory,
    // Expired conversation entries.
    conversation_history: Vec<ConversationItem>,
    // Expired knowledge facts.
    knowledge_base: KnowledgeBase,
    // Free-form bookkeeping; empty at creation in this file.
    metadata: HashMap<String, String>,
}
226
227impl ArchivedContext {
228 fn new(agent_id: AgentId, before: SystemTime) -> Self {
229 Self {
230 agent_id,
231 archived_at: SystemTime::now(),
232 archive_reason: format!("Archiving items before {:?}", before),
233 memory: HierarchicalMemory::default(),
234 conversation_history: Vec::new(),
235 knowledge_base: KnowledgeBase::default(),
236 metadata: HashMap::new(),
237 }
238 }
239}
240
/// `ContextPersistence` implementation that stores each agent's context as an
/// (optionally gzipped) JSON file on disk, with timestamped backups.
pub struct FilePersistence {
    // Paths, compression flag, and backup retention settings.
    config: FilePersistenceConfig,
}
245
246impl FilePersistence {
247 pub fn new(config: FilePersistenceConfig) -> Self {
249 Self { config }
250 }
251
252 pub async fn initialize(&self) -> Result<(), ContextError> {
254 self.config
255 .ensure_directories()
256 .await
257 .map_err(|e| ContextError::StorageError {
258 reason: format!("Failed to create storage directories: {}", e),
259 })?;
260 Ok(())
261 }
262
263 fn get_context_path(&self, agent_id: AgentId) -> PathBuf {
265 let filename = if self.config.enable_compression {
266 format!("{}.json.gz", agent_id)
267 } else {
268 format!("{}.json", agent_id)
269 };
270 self.config.agent_contexts_path().join(filename)
271 }
272
273 async fn serialize_context(&self, context: &AgentContext) -> Result<Vec<u8>, ContextError> {
275 let json_data =
276 serde_json::to_vec_pretty(context).map_err(|e| ContextError::SerializationError {
277 reason: format!("Failed to serialize context: {}", e),
278 })?;
279
280 if self.config.enable_compression {
281 use flate2::write::GzEncoder;
282 use flate2::Compression;
283 use std::io::Write;
284
285 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
286 encoder
287 .write_all(&json_data)
288 .map_err(|e| ContextError::SerializationError {
289 reason: format!("Failed to compress context: {}", e),
290 })?;
291 encoder
292 .finish()
293 .map_err(|e| ContextError::SerializationError {
294 reason: format!("Failed to finalize compression: {}", e),
295 })
296 } else {
297 Ok(json_data)
298 }
299 }
300
301 async fn deserialize_context(&self, data: Vec<u8>) -> Result<AgentContext, ContextError> {
303 let json_data = if self.config.enable_compression {
304 use flate2::read::GzDecoder;
305 use std::io::Read;
306
307 let mut decoder = GzDecoder::new(&data[..]);
308 let mut decompressed = Vec::new();
309 decoder.read_to_end(&mut decompressed).map_err(|e| {
310 ContextError::SerializationError {
311 reason: format!("Failed to decompress context: {}", e),
312 }
313 })?;
314 decompressed
315 } else {
316 data
317 };
318
319 serde_json::from_slice(&json_data).map_err(|e| ContextError::SerializationError {
320 reason: format!("Failed to deserialize context: {}", e),
321 })
322 }
323
324 async fn create_backup(&self, agent_id: AgentId) -> Result<(), ContextError> {
326 let context_path = self.get_context_path(agent_id);
327 if !context_path.exists() {
328 return Ok(());
329 }
330
331 let backup_path = context_path.with_extension(format!(
332 "backup.{}.json",
333 SystemTime::now()
334 .duration_since(SystemTime::UNIX_EPOCH)
335 .unwrap_or_default()
336 .as_secs()
337 ));
338
339 fs::copy(&context_path, &backup_path)
340 .await
341 .map_err(|e| ContextError::StorageError {
342 reason: format!("Failed to create backup: {}", e),
343 })?;
344
345 self.cleanup_old_backups(agent_id).await?;
347 Ok(())
348 }
349
350 async fn cleanup_old_backups(&self, agent_id: AgentId) -> Result<(), ContextError> {
352 let mut backup_files = Vec::new();
353 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
354 .await
355 .map_err(|e| ContextError::StorageError {
356 reason: format!("Failed to read storage directory: {}", e),
357 })?;
358
359 while let Some(entry) = dir
360 .next_entry()
361 .await
362 .map_err(|e| ContextError::StorageError {
363 reason: format!("Failed to read directory entry: {}", e),
364 })?
365 {
366 let path = entry.path();
367 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
368 if filename.starts_with(&format!("{}.backup.", agent_id)) {
369 if let Ok(metadata) = entry.metadata().await {
370 if let Ok(modified) = metadata.modified() {
371 backup_files.push((path, modified));
372 }
373 }
374 }
375 }
376 }
377
378 backup_files.sort_by(|a, b| b.1.cmp(&a.1));
380
381 for (path, _) in backup_files.into_iter().skip(self.config.backup_count) {
383 if let Err(e) = fs::remove_file(&path).await {
384 eprintln!(
385 "Warning: Failed to remove old backup {}: {}",
386 path.display(),
387 e
388 );
389 }
390 }
391
392 Ok(())
393 }
394}
395
396#[async_trait]
397impl ContextPersistence for FilePersistence {
398 async fn save_context(
399 &self,
400 agent_id: AgentId,
401 context: &AgentContext,
402 ) -> Result<(), ContextError> {
403 self.create_backup(agent_id).await?;
405
406 let data = self.serialize_context(context).await?;
408
409 let context_path = self.get_context_path(agent_id);
411 let mut file =
412 fs::File::create(&context_path)
413 .await
414 .map_err(|e| ContextError::StorageError {
415 reason: format!("Failed to create context file: {}", e),
416 })?;
417
418 file.write_all(&data)
419 .await
420 .map_err(|e| ContextError::StorageError {
421 reason: format!("Failed to write context data: {}", e),
422 })?;
423
424 file.sync_all()
425 .await
426 .map_err(|e| ContextError::StorageError {
427 reason: format!("Failed to sync context file: {}", e),
428 })?;
429
430 Ok(())
431 }
432
433 async fn load_context(&self, agent_id: AgentId) -> Result<Option<AgentContext>, ContextError> {
434 let context_path = self.get_context_path(agent_id);
435
436 if !context_path.exists() {
437 return Ok(None);
438 }
439
440 let mut file =
441 fs::File::open(&context_path)
442 .await
443 .map_err(|e| ContextError::StorageError {
444 reason: format!("Failed to open context file: {}", e),
445 })?;
446
447 let mut data = Vec::new();
448 file.read_to_end(&mut data)
449 .await
450 .map_err(|e| ContextError::StorageError {
451 reason: format!("Failed to read context file: {}", e),
452 })?;
453
454 let context = self.deserialize_context(data).await?;
455 Ok(Some(context))
456 }
457
458 async fn delete_context(&self, agent_id: AgentId) -> Result<(), ContextError> {
459 let context_path = self.get_context_path(agent_id);
460
461 if context_path.exists() {
462 fs::remove_file(&context_path)
463 .await
464 .map_err(|e| ContextError::StorageError {
465 reason: format!("Failed to delete context file: {}", e),
466 })?;
467 }
468
469 Ok(())
470 }
471
472 async fn list_agent_contexts(&self) -> Result<Vec<AgentId>, ContextError> {
473 let mut agent_ids = Vec::new();
474 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
475 .await
476 .map_err(|e| ContextError::StorageError {
477 reason: format!("Failed to read storage directory: {}", e),
478 })?;
479
480 while let Some(entry) = dir
481 .next_entry()
482 .await
483 .map_err(|e| ContextError::StorageError {
484 reason: format!("Failed to read directory entry: {}", e),
485 })?
486 {
487 let path = entry.path();
488 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
489 if filename.ends_with(".json") || filename.ends_with(".json.gz") {
490 let agent_id_str = filename
491 .strip_suffix(".json.gz")
492 .or_else(|| filename.strip_suffix(".json"))
493 .unwrap_or(filename);
494
495 if let Ok(uuid) = uuid::Uuid::parse_str(agent_id_str) {
496 agent_ids.push(AgentId(uuid));
497 }
498 }
499 }
500 }
501
502 Ok(agent_ids)
503 }
504
505 async fn context_exists(&self, agent_id: AgentId) -> Result<bool, ContextError> {
506 let context_path = self.get_context_path(agent_id);
507 Ok(context_path.exists())
508 }
509
510 async fn get_storage_stats(&self) -> Result<StorageStats, ContextError> {
511 let mut total_contexts = 0;
512 let mut total_size_bytes = 0;
513
514 let mut dir = fs::read_dir(&self.config.agent_contexts_path())
515 .await
516 .map_err(|e| ContextError::StorageError {
517 reason: format!("Failed to read storage directory: {}", e),
518 })?;
519
520 while let Some(entry) = dir
521 .next_entry()
522 .await
523 .map_err(|e| ContextError::StorageError {
524 reason: format!("Failed to read directory entry: {}", e),
525 })?
526 {
527 let path = entry.path();
528 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
529 if filename.ends_with(".json") || filename.ends_with(".json.gz") {
530 total_contexts += 1;
531 if let Ok(metadata) = entry.metadata().await {
532 total_size_bytes += metadata.len();
533 }
534 }
535 }
536 }
537
538 Ok(StorageStats {
539 total_contexts,
540 total_size_bytes,
541 last_cleanup: SystemTime::now(),
542 storage_path: self.config.agent_contexts_path(),
543 })
544 }
545
546 fn as_any(&self) -> &dyn std::any::Any {
547 self
548 }
549}
550
551impl StandardContextManager {
552 pub async fn new(config: ContextManagerConfig, agent_id: &str) -> Result<Self, ContextError> {
554 let vector_db: Arc<dyn VectorDb> = if config.enable_vector_db {
555 let db = if let Some(ref backend_config) = config.vector_backend {
556 create_vector_backend(backend_config.clone())
557 .await
558 .unwrap_or_else(|e| {
559 tracing::warn!("Failed to create vector backend: {}, using NoOp", e);
560 Arc::new(NoOpVectorDatabase)
561 })
562 } else {
563 let resolved = resolve_vector_config();
564 create_vector_backend(resolved).await.unwrap_or_else(|e| {
565 tracing::warn!("Failed to create vector backend: {}, using NoOp", e);
566 Arc::new(NoOpVectorDatabase)
567 })
568 };
569 if let Err(e) = db.initialize().await {
571 tracing::warn!("Failed to initialize vector DB: {}, queries may fail", e);
572 }
573 db
574 } else {
575 Arc::new(NoOpVectorDatabase)
576 };
577
578 let embedding_service =
579 create_embedding_service_from_env(config.qdrant_config.vector_dimension)?;
580
581 let persistence: Arc<dyn ContextPersistence> = if config.enable_persistence {
582 Arc::new(FilePersistence::new(config.persistence_config.clone()))
583 } else {
584 Arc::new(FilePersistence::new(config.persistence_config.clone()))
586 };
587
588 let secrets = crate::secrets::new_secret_store(&config.secrets_config, agent_id)
590 .await
591 .map_err(|e| ContextError::StorageError {
592 reason: format!("Failed to initialize secrets store: {}", e),
593 })?;
594
595 let policy_engine: Arc<dyn PolicyEngine> = Arc::new(MockPolicyEngine::new());
597
598 Ok(Self {
599 contexts: Arc::new(RwLock::new(HashMap::new())),
600 config,
601 shared_knowledge: Arc::new(RwLock::new(HashMap::new())),
602 vector_db,
603 embedding_service,
604 persistence,
605 secrets,
606 policy_engine,
607 shutdown_flag: Arc::new(RwLock::new(false)),
608 background_tasks: Arc::new(RwLock::new(Vec::new())),
609 })
610 }
611
612 pub fn secrets(&self) -> &(dyn SecretStore + Send + Sync) {
614 self.secrets.as_ref()
615 }
616
    /// Checks token usage for the given session and, when a compaction tier
    /// is triggered, compacts the conversation history in place (storing the
    /// updated context back).
    ///
    /// Returns `Ok(None)` when compaction is disabled, the context is missing,
    /// the model limit is zero, usage is below every threshold, or nothing
    /// was actually removed.
    pub async fn check_and_compact(
        &self,
        agent_id: &AgentId,
        session_id: &SessionId,
        config: &super::compaction::CompactionConfig,
        counter: &dyn super::token_counter::TokenCounter,
    ) -> Result<Option<super::compaction::CompactionResult>, ContextError> {
        use super::compaction::{select_tier, truncate_items, CompactionTier};

        if !config.enabled {
            return Ok(None);
        }

        let context = match self.retrieve_context(*agent_id, Some(*session_id)).await? {
            Some(ctx) => ctx,
            None => return Ok(None),
        };

        let current_tokens = counter.count_messages(&context.conversation_history);
        let limit = counter.model_context_limit();

        // Guard: a zero limit would divide by zero below.
        if limit == 0 {
            return Ok(None);
        }

        let usage_ratio = current_tokens as f32 / limit as f32;

        let tier = match select_tier(usage_ratio, config) {
            Some(t) => t,
            None => return Ok(None),
        };

        let start = std::time::Instant::now();

        match tier {
            CompactionTier::Truncate => {
                let (new_items, affected) = truncate_items(
                    &context.conversation_history,
                    config,
                    config.summarize_threshold,
                );

                // Nothing was eligible for removal: report "no compaction".
                if affected == 0 {
                    return Ok(None);
                }

                let tokens_after = counter.count_messages(&new_items);

                // Persist the trimmed history back into the stored context.
                let mut updated = context.clone();
                updated.conversation_history = new_items;
                self.store_context(*agent_id, updated).await?;

                Ok(Some(super::compaction::CompactionResult {
                    tier_applied: CompactionTier::Truncate,
                    tokens_before: current_tokens,
                    tokens_after,
                    tokens_saved: current_tokens.saturating_sub(tokens_after),
                    items_affected: affected,
                    duration_ms: start.elapsed().as_millis() as u64,
                    summary_generated: None,
                }))
            }
            CompactionTier::Summarize => {
                // Summarization needs an LLM client this manager does not
                // hold, so this tier intentionally degrades to truncation.
                tracing::info!(
                    agent = %agent_id,
                    "compaction: Summarize tier selected but no LLM client in context manager, falling back to truncate"
                );
                let (new_items, affected) = truncate_items(
                    &context.conversation_history,
                    config,
                    config.summarize_threshold,
                );

                if affected == 0 {
                    return Ok(None);
                }

                let tokens_after = counter.count_messages(&new_items);
                let mut updated = context.clone();
                updated.conversation_history = new_items;
                self.store_context(*agent_id, updated).await?;

                Ok(Some(super::compaction::CompactionResult {
                    // Reports Truncate because that is what actually ran.
                    tier_applied: CompactionTier::Truncate, tokens_before: current_tokens,
                    tokens_after,
                    tokens_saved: current_tokens.saturating_sub(tokens_after),
                    items_affected: affected,
                    duration_ms: start.elapsed().as_millis() as u64,
                    summary_generated: None,
                }))
            }
            CompactionTier::CompressEpisodic | CompactionTier::ArchiveToMemory => {
                // Not implemented; treated as "no compaction performed".
                Ok(None)
            }
        }
    }
723
724 pub async fn initialize(&self) -> Result<(), ContextError> {
726 if self.config.enable_vector_db {
728 self.vector_db.initialize().await?;
729 }
730
731 if self.config.enable_persistence {
733 if let Some(file_persistence) =
734 self.persistence.as_any().downcast_ref::<FilePersistence>()
735 {
736 file_persistence.initialize().await?;
737 }
738
739 self.load_existing_contexts().await?;
741 }
742
743 self.setup_retention_scheduler().await?;
745
746 Ok(())
747 }
748
749 async fn load_existing_contexts(&self) -> Result<(), ContextError> {
751 let agent_ids = self.persistence.list_agent_contexts().await?;
752 let mut contexts = self.contexts.write().await;
753
754 for agent_id in agent_ids {
755 if let Some(context) = self.persistence.load_context(agent_id).await? {
756 contexts.insert(agent_id, context);
757 }
758 }
759
760 Ok(())
761 }
762
763 pub async fn shutdown(&self) -> Result<(), ContextError> {
765 {
767 let shutdown_flag = self.shutdown_flag.read().await;
768 if *shutdown_flag {
769 tracing::info!("ContextManager already shutdown, skipping");
770 return Ok(());
771 }
772 }
773
774 tracing::info!("Starting ContextManager shutdown sequence");
775
776 {
778 let mut shutdown_flag = self.shutdown_flag.write().await;
779 *shutdown_flag = true;
780 }
781
782 self.stop_background_tasks().await?;
784
785 self.save_all_contexts().await?;
787
788 tracing::info!("Vector database connections will be closed when client is dropped");
792
793 tracing::info!("Secrets store cleanup handled by Drop trait");
796
797 tracing::info!("ContextManager shutdown completed successfully");
798 Ok(())
799 }
800
801 async fn stop_background_tasks(&self) -> Result<(), ContextError> {
803 let mut tasks = self.background_tasks.write().await;
804
805 if tasks.is_empty() {
806 tracing::debug!("No background tasks to stop");
807 return Ok(());
808 }
809
810 tracing::info!("Stopping {} background tasks", tasks.len());
811
812 for task in tasks.drain(..) {
814 task.abort();
815
816 match tokio::time::timeout(std::time::Duration::from_secs(5), task).await {
818 Ok(result) => match result {
819 Ok(_) => tracing::debug!("Background task completed successfully"),
820 Err(e) if e.is_cancelled() => tracing::debug!("Background task was cancelled"),
821 Err(e) => tracing::warn!("Background task finished with error: {}", e),
822 },
823 Err(_) => tracing::warn!("Background task did not finish within timeout"),
824 }
825 }
826
827 tracing::info!("All background tasks stopped");
828 Ok(())
829 }
830
831 async fn save_all_contexts(&self) -> Result<(), ContextError> {
833 if !self.config.enable_persistence {
834 tracing::debug!("Persistence disabled, skipping context save");
835 return Ok(());
836 }
837
838 let contexts = self.contexts.read().await;
839
840 if contexts.is_empty() {
841 tracing::debug!("No contexts to save");
842 return Ok(());
843 }
844
845 tracing::info!("Saving {} contexts to persistent storage", contexts.len());
846
847 let mut save_errors = Vec::new();
848
849 for (agent_id, context) in contexts.iter() {
850 match self.persistence.save_context(*agent_id, context).await {
851 Ok(_) => tracing::debug!("Saved context for agent {}", agent_id),
852 Err(e) => {
853 tracing::error!("Failed to save context for agent {}: {}", agent_id, e);
854 save_errors.push((*agent_id, e));
855 }
856 }
857 }
858
859 if !save_errors.is_empty() {
860 let error_msg = format!(
861 "Failed to save {} out of {} contexts during shutdown",
862 save_errors.len(),
863 contexts.len()
864 );
865 tracing::error!("{}", error_msg);
866
867 if let Some((agent_id, error)) = save_errors.into_iter().next() {
869 return Err(ContextError::StorageError {
870 reason: format!("Failed to save context for agent {}: {}", agent_id, error),
871 });
872 }
873 }
874
875 tracing::info!("All contexts saved successfully");
876 Ok(())
877 }
878
    /// Spawns the periodic retention/archiving task when auto-archiving is
    /// enabled, registering its handle so shutdown can stop it.
    async fn setup_retention_scheduler(&self) -> Result<(), ContextError> {
        if !self.config.enable_auto_archiving {
            tracing::debug!("Auto-archiving disabled, skipping retention scheduler setup");
            return Ok(());
        }

        // Clone the shared handles the task needs; it cannot hold `&self`.
        let contexts = self.contexts.clone();
        let persistence = self.persistence.clone();
        let config = self.config.clone();
        let shutdown_flag = self.shutdown_flag.clone();

        let task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.archiving_interval);

            tracing::info!(
                "Retention policy scheduler started with interval {:?}",
                config.archiving_interval
            );

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Exit cooperatively once shutdown has been signalled;
                        // shutdown also aborts this task as a backstop.
                        if *shutdown_flag.read().await {
                            tracing::info!("Retention scheduler shutting down");
                            break;
                        }

                        Self::run_retention_check(&contexts, &persistence, &config).await;
                    }
                }
            }
        });

        self.add_background_task(task).await;
        tracing::info!("Retention policy scheduler initialized successfully");
        Ok(())
    }
919
    /// One pass of the retention scheduler: snapshots per-agent archive
    /// counts under a read lock, then archives each agent that has eligible
    /// items. Static (no `&self`) so it can run inside the spawned task.
    async fn run_retention_check(
        contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
        persistence: &Arc<dyn ContextPersistence>,
        config: &ContextManagerConfig,
    ) {
        let current_time = SystemTime::now();

        // Collect candidates under a short-lived read lock, then release it
        // before archiving — archive_agent_context_static takes the write
        // lock, so holding the read lock here would deadlock.
        let agents_to_check: Vec<(AgentId, usize)> = {
            let context_guard = contexts.read().await;
            let agent_count = context_guard.len();

            if agent_count == 0 {
                tracing::debug!("No agent contexts to process for retention");
                return;
            }

            tracing::info!(
                "Starting retention check for {} agent contexts",
                agent_count
            );

            context_guard
                .iter()
                .map(|(agent_id, context)| {
                    let retention_stats = Self::calculate_retention_statistics_static(context);
                    (*agent_id, retention_stats.items_to_archive)
                })
                .collect()
        };

        let start_time = std::time::Instant::now();
        let total_agents = agents_to_check.len();

        for (agent_id, items_to_archive) in agents_to_check {
            if items_to_archive > 0 {
                tracing::debug!(
                    "Agent {} has {} items eligible for archiving",
                    agent_id,
                    items_to_archive
                );

                let archive_result = Self::archive_agent_context_static(
                    agent_id,
                    current_time,
                    contexts,
                    persistence,
                    config,
                )
                .await;

                // Per-agent failures are logged and do not stop the sweep.
                match archive_result {
                    Ok(archived_count) => {
                        tracing::info!(
                            "Successfully archived {} items for agent {}",
                            archived_count,
                            agent_id
                        );
                    }
                    Err(e) => {
                        tracing::error!("Failed to archive context for agent {}: {}", agent_id, e);
                    }
                }
            }
        }

        let elapsed = start_time.elapsed();
        tracing::info!(
            "Retention check completed for {} agents in {:?}",
            total_agents,
            elapsed
        );
    }
996
997 fn calculate_retention_statistics_static(context: &AgentContext) -> RetentionStatus {
999 let now = SystemTime::now();
1000 let retention_policy = &context.retention_policy;
1001
1002 let mut items_to_archive = 0;
1003 let items_to_delete = 0;
1004
1005 let memory_cutoff = now
1007 .checked_sub(retention_policy.memory_retention)
1008 .unwrap_or(SystemTime::UNIX_EPOCH);
1009
1010 let knowledge_cutoff = now
1011 .checked_sub(retention_policy.knowledge_retention)
1012 .unwrap_or(SystemTime::UNIX_EPOCH);
1013
1014 let conversation_cutoff = now
1015 .checked_sub(retention_policy.session_retention)
1016 .unwrap_or(SystemTime::UNIX_EPOCH);
1017
1018 for item in &context.memory.short_term {
1020 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
1021 items_to_archive += 1;
1022 }
1023 }
1024
1025 for item in &context.memory.long_term {
1026 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
1027 items_to_archive += 1;
1028 }
1029 }
1030
1031 for item in &context.conversation_history {
1033 if item.timestamp < conversation_cutoff {
1034 items_to_archive += 1;
1035 }
1036 }
1037
1038 for fact in &context.knowledge_base.facts {
1040 if fact.created_at < knowledge_cutoff {
1041 items_to_archive += 1;
1042 }
1043 }
1044
1045 let next_cleanup = now + Duration::from_secs(86400); RetentionStatus {
1049 items_to_archive,
1050 items_to_delete,
1051 next_cleanup,
1052 }
1053 }
1054
    /// Moves expired memory, conversation, and knowledge items out of the
    /// live context for `agent_id` into an `ArchivedContext`, persisting both
    /// (archive first, then the trimmed context) when persistence is enabled.
    /// Returns the number of items archived.
    ///
    /// Holds the contexts write lock for the whole operation so readers never
    /// observe a half-archived context.
    async fn archive_agent_context_static(
        agent_id: AgentId,
        before: SystemTime,
        contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
        persistence: &Arc<dyn ContextPersistence>,
        config: &ContextManagerConfig,
    ) -> Result<u32, ContextError> {
        let mut total_archived = 0u32;
        let mut archived_context = ArchivedContext::new(agent_id, before);

        let mut contexts_guard = contexts.write().await;
        if let Some(context) = contexts_guard.get_mut(&agent_id) {
            let retention_policy = &context.retention_policy;

            // Cutoffs saturate at UNIX_EPOCH when the retention window is
            // longer than the clock value.
            let memory_cutoff = before
                .checked_sub(retention_policy.memory_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            let conversation_cutoff = before
                .checked_sub(retention_policy.session_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            let knowledge_cutoff = before
                .checked_sub(retention_policy.knowledge_retention)
                .unwrap_or(SystemTime::UNIX_EPOCH);

            // Drain-and-repartition: expired items move into the archive,
            // the rest are written back. Short-term memory first.
            let mut retained_short_term = Vec::new();
            for item in context.memory.short_term.drain(..) {
                if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                    archived_context.memory.short_term.push(item);
                    total_archived += 1;
                } else {
                    retained_short_term.push(item);
                }
            }
            context.memory.short_term = retained_short_term;

            // Long-term memory uses the same cutoff as short-term.
            let mut retained_long_term = Vec::new();
            for item in context.memory.long_term.drain(..) {
                if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                    archived_context.memory.long_term.push(item);
                    total_archived += 1;
                } else {
                    retained_long_term.push(item);
                }
            }
            context.memory.long_term = retained_long_term;

            // Conversation entries expire on their single timestamp.
            let mut retained_conversations = Vec::new();
            for item in context.conversation_history.drain(..) {
                if item.timestamp < conversation_cutoff {
                    archived_context.conversation_history.push(item);
                    total_archived += 1;
                } else {
                    retained_conversations.push(item);
                }
            }
            context.conversation_history = retained_conversations;

            // Knowledge facts expire on creation time only.
            let mut retained_facts = Vec::new();
            for fact in context.knowledge_base.facts.drain(..) {
                if fact.created_at < knowledge_cutoff {
                    archived_context.knowledge_base.facts.push(fact);
                    total_archived += 1;
                } else {
                    retained_facts.push(fact);
                }
            }
            context.knowledge_base.facts = retained_facts;

            if total_archived > 0 {
                // Record archive bookkeeping on the live context.
                context.updated_at = SystemTime::now();
                context.metadata.insert(
                    "last_archived".to_string(),
                    SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs()
                        .to_string(),
                );
                context
                    .metadata
                    .insert("archived_count".to_string(), total_archived.to_string());

                if config.enable_persistence {
                    // Persist the archive before overwriting the trimmed
                    // context, so archived items cannot be lost mid-way.
                    Self::save_archived_context_static(
                        agent_id,
                        &archived_context,
                        persistence,
                        config,
                    )
                    .await?;
                    persistence.save_context(agent_id, context).await?;
                }
            }
        }

        Ok(total_archived)
    }
1165
1166 async fn save_archived_context_static(
1168 agent_id: AgentId,
1169 archived_context: &ArchivedContext,
1170 _persistence: &Arc<dyn ContextPersistence>,
1171 config: &ContextManagerConfig,
1172 ) -> Result<(), ContextError> {
1173 let archive_dir = config
1174 .persistence_config
1175 .agent_contexts_path()
1176 .join("archives")
1177 .join(agent_id.to_string());
1178
1179 tokio::fs::create_dir_all(&archive_dir)
1181 .await
1182 .map_err(|e| ContextError::StorageError {
1183 reason: format!("Failed to create archive directory: {}", e),
1184 })?;
1185
1186 let timestamp = archived_context
1188 .archived_at
1189 .duration_since(SystemTime::UNIX_EPOCH)
1190 .unwrap_or_default()
1191 .as_secs();
1192 let archive_filename = format!("archive_{}.json", timestamp);
1193 let archive_path = archive_dir.join(archive_filename);
1194
1195 let archive_data = serde_json::to_vec_pretty(archived_context).map_err(|e| {
1197 ContextError::SerializationError {
1198 reason: format!("Failed to serialize archived context: {}", e),
1199 }
1200 })?;
1201
1202 tokio::fs::write(&archive_path, &archive_data)
1204 .await
1205 .map_err(|e| ContextError::StorageError {
1206 reason: format!("Failed to write archive file: {}", e),
1207 })?;
1208
1209 tracing::debug!(
1210 "Saved archived context for agent {} to {}",
1211 agent_id,
1212 archive_path.display()
1213 );
1214
1215 Ok(())
1216 }
1217
1218 pub async fn add_background_task(&self, task: tokio::task::JoinHandle<()>) {
1220 let mut tasks = self.background_tasks.write().await;
1221 tasks.push(task);
1222 }
1223
1224 pub async fn is_shutdown(&self) -> bool {
1226 let shutdown_flag = self.shutdown_flag.read().await;
1227 *shutdown_flag
1228 }
1229
1230 pub async fn create_session(&self, agent_id: AgentId) -> Result<SessionId, ContextError> {
1232 let session_id = SessionId::new();
1233
1234 let context = AgentContext {
1236 agent_id,
1237 session_id,
1238 memory: HierarchicalMemory::default(),
1239 knowledge_base: KnowledgeBase::default(),
1240 conversation_history: Vec::new(),
1241 metadata: HashMap::new(),
1242 created_at: SystemTime::now(),
1243 updated_at: SystemTime::now(),
1244 retention_policy: self.config.default_retention_policy.clone(),
1245 };
1246
1247 ContextManager::store_context(self, agent_id, context).await?;
1248 Ok(session_id)
1249 }
1250
1251 async fn validate_access(
1253 &self,
1254 agent_id: AgentId,
1255 operation: &str,
1256 ) -> Result<(), ContextError> {
1257 let is_dangerous_operation = matches!(operation, "archive_context");
1263
1264 if is_dangerous_operation {
1265 tracing::warn!(
1266 "Potentially dangerous operation {} denied for agent {} by default policy",
1267 operation,
1268 agent_id
1269 );
1270 Err(ContextError::AccessDenied {
1271 reason: format!("Operation {} requires explicit approval", operation),
1272 })
1273 } else {
1274 tracing::debug!("Policy engine allowed {} for agent {}", operation, agent_id);
1275 Ok(())
1276 }
1277 }
1278
    /// Generate an embedding vector for `content` via the configured
    /// embedding service; service errors are propagated unchanged.
    async fn generate_embeddings(&self, content: &str) -> Result<Vec<f32>, ContextError> {
        self.embedding_service.generate_embedding(content).await
    }
1283
1284 async fn semantic_search_memory(
1286 &self,
1287 agent_id: AgentId,
1288 query: &str,
1289 limit: usize,
1290 ) -> Result<Vec<ContextItem>, ContextError> {
1291 if self.config.enable_vector_db {
1292 let query_embedding = self.generate_embeddings(query).await?;
1294
1295 let threshold = 0.7; self.vector_db
1298 .semantic_search(agent_id, query_embedding, limit, threshold)
1299 .await
1300 } else {
1301 let contexts = self.contexts.read().await;
1303 if let Some(context) = contexts.get(&agent_id) {
1304 let mut results = Vec::new();
1305
1306 for memory_item in &context.memory.short_term {
1307 if memory_item
1308 .content
1309 .to_lowercase()
1310 .contains(&query.to_lowercase())
1311 {
1312 let importance_score = self.calculate_importance(memory_item);
1314 let relevance_score = (importance_score + 0.8) / 2.0; results.push(ContextItem {
1317 id: memory_item.id,
1318 content: memory_item.content.clone(),
1319 item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
1320 relevance_score,
1321 timestamp: memory_item.created_at,
1322 metadata: memory_item.metadata.clone(),
1323 });
1324 }
1325 }
1326
1327 results.truncate(limit);
1328 Ok(results)
1329 } else {
1330 Ok(Vec::new())
1331 }
1332 }
1333 }
1334
1335 fn calculate_importance(&self, memory_item: &MemoryItem) -> f32 {
1347 let weights = ImportanceWeights::default();
1349
1350 let base_score = memory_item.importance.clamp(0.0, 1.0);
1352
1353 let access_score = if memory_item.access_count == 0 {
1355 weights.no_access_penalty
1356 } else {
1357 let log_access = (memory_item.access_count as f32 + 1.0).ln();
1358 (log_access / 10.0).min(1.0) };
1360
1361 let recency_score = self.calculate_recency_factor(memory_item);
1363
1364 let feedback_score = self.extract_user_feedback_score(memory_item);
1366
1367 let type_multiplier = self.get_memory_type_multiplier(&memory_item.memory_type);
1369
1370 let age_decay = self.calculate_age_decay(memory_item);
1372
1373 let combined_score = (base_score * weights.base_importance
1375 + access_score * weights.access_frequency
1376 + recency_score * weights.recency
1377 + feedback_score * weights.user_feedback)
1378 * type_multiplier
1379 * age_decay;
1380
1381 combined_score.clamp(0.0, 1.0)
1383 }
1384
1385 fn calculate_recency_factor(&self, memory_item: &MemoryItem) -> f32 {
1387 let now = SystemTime::now();
1388
1389 let most_recent = memory_item.last_accessed.max(memory_item.created_at);
1391
1392 let time_since_access = now
1393 .duration_since(most_recent)
1394 .unwrap_or_else(|_| std::time::Duration::from_secs(0));
1395
1396 let hours_since_access = time_since_access.as_secs() as f32 / 3600.0;
1397
1398 let half_life_hours = 24.0; let decay_factor = 2.0_f32.powf(-hours_since_access / half_life_hours);
1401
1402 decay_factor.max(0.01)
1404 }
1405
1406 fn extract_user_feedback_score(&self, memory_item: &MemoryItem) -> f32 {
1408 let mut feedback_score = 0.5; if let Some(rating_str) = memory_item.metadata.get("user_rating") {
1412 if let Ok(rating) = rating_str.parse::<f32>() {
1413 feedback_score = (rating / 5.0).clamp(0.0, 1.0); }
1415 }
1416
1417 if let Some(helpful_str) = memory_item.metadata.get("helpful") {
1419 match helpful_str.to_lowercase().as_str() {
1420 "true" | "yes" | "1" => feedback_score = feedback_score.max(0.8),
1421 "false" | "no" | "0" => feedback_score = feedback_score.min(0.2),
1422 _ => {}
1423 }
1424 }
1425
1426 if memory_item.metadata.contains_key("corrected")
1428 || memory_item.metadata.contains_key("incorrect")
1429 {
1430 feedback_score = feedback_score.min(0.3);
1431 }
1432
1433 if memory_item.metadata.contains_key("bookmarked")
1435 || memory_item.metadata.contains_key("favorite")
1436 {
1437 feedback_score = feedback_score.max(0.9);
1438 }
1439
1440 if let Some(context) = memory_item.metadata.get("usage_context") {
1442 match context.to_lowercase().as_str() {
1443 "critical" | "important" => feedback_score = feedback_score.max(0.95),
1444 "routine" | "common" => feedback_score = feedback_score.max(0.7),
1445 "experimental" | "trial" => feedback_score = feedback_score.min(0.4),
1446 _ => {}
1447 }
1448 }
1449
1450 feedback_score.clamp(0.0, 1.0)
1451 }
1452
1453 fn get_memory_type_multiplier(&self, memory_type: &MemoryType) -> f32 {
1455 match memory_type {
1456 MemoryType::Factual => 1.0, MemoryType::Procedural => 1.2, MemoryType::Episodic => 0.9, MemoryType::Semantic => 1.1, MemoryType::Working => 1.3, }
1462 }
1463
1464 fn calculate_age_decay(&self, memory_item: &MemoryItem) -> f32 {
1466 let now = SystemTime::now();
1467 let age = now
1468 .duration_since(memory_item.created_at)
1469 .unwrap_or_else(|_| std::time::Duration::from_secs(0));
1470
1471 let days_old = age.as_secs() as f32 / 86400.0; let decay_rate = match memory_item.memory_type {
1475 MemoryType::Working => 0.1, MemoryType::Factual => 0.01, MemoryType::Procedural => 0.005, MemoryType::Episodic => 0.02, MemoryType::Semantic => 0.008, };
1481
1482 let decay_factor = (-decay_rate * days_old).exp();
1484
1485 decay_factor.max(0.05)
1487 }
1488
1489 async fn keyword_search_memory(
1491 &self,
1492 agent_id: AgentId,
1493 query: &ContextQuery,
1494 ) -> Result<Vec<ContextItem>, ContextError> {
1495 let contexts = self.contexts.read().await;
1496 if let Some(context) = contexts.get(&agent_id) {
1497 let mut results = Vec::new();
1498
1499 let search_terms: Vec<String> = query
1501 .search_terms
1502 .iter()
1503 .map(|term| term.to_lowercase())
1504 .collect();
1505
1506 if search_terms.is_empty() {
1507 return Ok(results);
1508 }
1509
1510 for memory_item in context
1512 .memory
1513 .short_term
1514 .iter()
1515 .chain(context.memory.long_term.iter())
1516 {
1517 if !query.memory_types.is_empty()
1519 && !query.memory_types.contains(&memory_item.memory_type)
1520 {
1521 continue;
1522 }
1523
1524 let content_lower = memory_item.content.to_lowercase();
1525 let mut match_score = 0.0f32;
1526 let mut matched_terms = 0;
1527
1528 for term in &search_terms {
1530 if content_lower.contains(term) {
1531 matched_terms += 1;
1532 if content_lower.split_whitespace().any(|word| word == term) {
1534 match_score += 1.0;
1535 } else {
1536 match_score += 0.5;
1537 }
1538 }
1539 }
1540
1541 if matched_terms > 0 {
1542 let importance_score = self.calculate_importance(memory_item);
1544 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1545 let relevance_score = (match_score * term_coverage + importance_score) / 2.0;
1546
1547 if relevance_score >= query.relevance_threshold {
1548 results.push(ContextItem {
1549 id: memory_item.id,
1550 content: memory_item.content.clone(),
1551 item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
1552 relevance_score,
1553 timestamp: memory_item.created_at,
1554 metadata: memory_item.metadata.clone(),
1555 });
1556 }
1557 }
1558 }
1559
1560 for episode in &context.memory.episodic_memory {
1562 let episode_content = format!("{} {}", episode.title, episode.description);
1563 let content_lower = episode_content.to_lowercase();
1564 let mut match_score = 0.0f32;
1565 let mut matched_terms = 0;
1566
1567 for term in &search_terms {
1568 if content_lower.contains(term) {
1569 matched_terms += 1;
1570 match_score += if content_lower.split_whitespace().any(|word| word == term)
1571 {
1572 1.0
1573 } else {
1574 0.5
1575 };
1576 }
1577 }
1578
1579 if matched_terms > 0 {
1580 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1581 let relevance_score = (match_score * term_coverage + episode.importance) / 2.0;
1582
1583 if relevance_score >= query.relevance_threshold {
1584 results.push(ContextItem {
1585 id: episode.id,
1586 content: episode_content,
1587 item_type: ContextItemType::Episode,
1588 relevance_score,
1589 timestamp: episode.timestamp,
1590 metadata: HashMap::new(),
1591 });
1592 }
1593 }
1594 }
1595
1596 for conv_item in &context.conversation_history {
1598 let content_lower = conv_item.content.to_lowercase();
1599 let mut match_score = 0.0f32;
1600 let mut matched_terms = 0;
1601
1602 for term in &search_terms {
1603 if content_lower.contains(term) {
1604 matched_terms += 1;
1605 match_score += if content_lower.split_whitespace().any(|word| word == term)
1606 {
1607 1.0
1608 } else {
1609 0.5
1610 };
1611 }
1612 }
1613
1614 if matched_terms > 0 {
1615 let term_coverage = matched_terms as f32 / search_terms.len() as f32;
1616 let relevance_score = match_score * term_coverage;
1617
1618 if relevance_score >= query.relevance_threshold {
1619 results.push(ContextItem {
1620 id: conv_item.id,
1621 content: conv_item.content.clone(),
1622 item_type: ContextItemType::Conversation,
1623 relevance_score,
1624 timestamp: conv_item.timestamp,
1625 metadata: HashMap::new(),
1626 });
1627 }
1628 }
1629 }
1630
1631 results.sort_by(|a, b| {
1633 b.relevance_score
1634 .partial_cmp(&a.relevance_score)
1635 .unwrap_or(std::cmp::Ordering::Equal)
1636 });
1637 results.truncate(query.max_results);
1638
1639 Ok(results)
1640 } else {
1641 Ok(Vec::new())
1642 }
1643 }
1644
    /// Search memory for items whose timestamps fall inside the query's time
    /// range, optionally filtered by memory type and substring search terms.
    ///
    /// Returns an empty list when the query carries no time range. Results
    /// are sorted newest-first and truncated to `query.max_results`.
    async fn temporal_search_memory(
        &self,
        agent_id: AgentId,
        query: &ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        // A time range is mandatory for temporal search.
        let time_range = match &query.time_range {
            Some(range) => range,
            None => return Ok(Vec::new()),
        };

        let contexts = self.contexts.read().await;
        if let Some(context) = contexts.get(&agent_id) {
            let mut results = Vec::new();

            // Short-term and long-term memory items.
            for memory_item in context
                .memory
                .short_term
                .iter()
                .chain(context.memory.long_term.iter())
            {
                // Honor the optional memory-type filter.
                if !query.memory_types.is_empty()
                    && !query.memory_types.contains(&memory_item.memory_type)
                {
                    continue;
                }

                if memory_item.created_at >= time_range.start
                    && memory_item.created_at <= time_range.end
                {
                    // Optional keyword filter: any term may match as a substring.
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let content_lower = memory_item.content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter {
                        // Relevance blends importance with recency.
                        let importance_score = self.calculate_importance(memory_item);
                        let recency_score = self.calculate_recency_factor(memory_item);
                        let relevance_score = (importance_score + recency_score) / 2.0;

                        if relevance_score >= query.relevance_threshold {
                            results.push(ContextItem {
                                id: memory_item.id,
                                content: memory_item.content.clone(),
                                item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
                                relevance_score,
                                timestamp: memory_item.created_at,
                                metadata: memory_item.metadata.clone(),
                            });
                        }
                    }
                }
            }

            // Episodes use their stored importance as the relevance score.
            for episode in &context.memory.episodic_memory {
                if episode.timestamp >= time_range.start && episode.timestamp <= time_range.end {
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let episode_content = format!("{} {}", episode.title, episode.description);
                        let content_lower = episode_content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter && episode.importance >= query.relevance_threshold {
                        results.push(ContextItem {
                            id: episode.id,
                            content: format!("{} {}", episode.title, episode.description),
                            item_type: ContextItemType::Episode,
                            relevance_score: episode.importance,
                            timestamp: episode.timestamp,
                            metadata: HashMap::new(),
                        });
                    }
                }
            }

            // Conversation items are scored by position within the range:
            // 1.0 at the range start, falling linearly toward 0.0 at the end.
            for conv_item in &context.conversation_history {
                if conv_item.timestamp >= time_range.start && conv_item.timestamp <= time_range.end
                {
                    let passes_keyword_filter = if query.search_terms.is_empty() {
                        true
                    } else {
                        let content_lower = conv_item.content.to_lowercase();
                        query
                            .search_terms
                            .iter()
                            .any(|term| content_lower.contains(&term.to_lowercase()))
                    };

                    if passes_keyword_filter {
                        let time_since_start = conv_item
                            .timestamp
                            .duration_since(time_range.start)
                            .unwrap_or_default()
                            .as_secs() as f32;
                        let range_duration = time_range
                            .end
                            .duration_since(time_range.start)
                            .unwrap_or_default()
                            .as_secs() as f32;

                        // Degenerate (zero-length) ranges score 1.0.
                        let temporal_score = if range_duration > 0.0 {
                            1.0 - (time_since_start / range_duration)
                        } else {
                            1.0
                        };

                        if temporal_score >= query.relevance_threshold {
                            results.push(ContextItem {
                                id: conv_item.id,
                                content: conv_item.content.clone(),
                                item_type: ContextItemType::Conversation,
                                relevance_score: temporal_score,
                                timestamp: conv_item.timestamp,
                                metadata: HashMap::new(),
                            });
                        }
                    }
                }
            }

            // Newest first, capped at the requested count.
            results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
            results.truncate(query.max_results);

            Ok(results)
        } else {
            Ok(Vec::new())
        }
    }
1791
    /// Embedding-similarity search over an agent's memory.
    ///
    /// With the vector database enabled, the joined search terms are embedded
    /// and the search is delegated to the vector backend using the query's
    /// relevance threshold. Otherwise memory items that already carry an
    /// embedding are compared in process via cosine similarity; an embedding
    /// failure in the fallback degrades to an empty result instead of an error.
    async fn similarity_search_memory(
        &self,
        agent_id: AgentId,
        query: &ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        if self.config.enable_vector_db && !query.search_terms.is_empty() {
            // All terms are embedded as a single query string.
            let search_term = query.search_terms.join(" ");
            let query_embedding = self.generate_embeddings(&search_term).await?;

            let threshold = query.relevance_threshold;
            self.vector_db
                .semantic_search(agent_id, query_embedding, query.max_results, threshold)
                .await
        } else {
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                if query.search_terms.is_empty() {
                    return Ok(Vec::new());
                }

                let search_term = query.search_terms.join(" ");
                // Best-effort: embedding failure yields no results, not an error.
                let query_embedding = match self.generate_embeddings(&search_term).await {
                    Ok(embedding) => embedding,
                    Err(_) => return Ok(Vec::new()),
                };

                let mut results = Vec::new();

                for memory_item in context
                    .memory
                    .short_term
                    .iter()
                    .chain(context.memory.long_term.iter())
                {
                    // Honor the optional memory-type filter.
                    if !query.memory_types.is_empty()
                        && !query.memory_types.contains(&memory_item.memory_type)
                    {
                        continue;
                    }

                    // Only items with a precomputed embedding can be compared.
                    if let Some(ref item_embedding) = memory_item.embedding {
                        let similarity = self.cosine_similarity(&query_embedding, item_embedding);

                        if similarity >= query.relevance_threshold {
                            // Blend raw similarity with item importance.
                            let importance_score = self.calculate_importance(memory_item);
                            let relevance_score = (similarity + importance_score) / 2.0;

                            results.push(ContextItem {
                                id: memory_item.id,
                                content: memory_item.content.clone(),
                                item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
                                relevance_score,
                                timestamp: memory_item.created_at,
                                metadata: memory_item.metadata.clone(),
                            });
                        }
                    }
                }

                // Best matches first, capped at the requested count.
                results.sort_by(|a, b| {
                    b.relevance_score
                        .partial_cmp(&a.relevance_score)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });
                results.truncate(query.max_results);

                Ok(results)
            } else {
                Ok(Vec::new())
            }
        }
    }
1873
1874 async fn hybrid_search_memory(
1876 &self,
1877 agent_id: AgentId,
1878 query: &ContextQuery,
1879 ) -> Result<Vec<ContextItem>, ContextError> {
1880 let keyword_results = self.keyword_search_memory(agent_id, query).await?;
1882 let similarity_results = self.similarity_search_memory(agent_id, query).await?;
1883
1884 let mut combined_results: HashMap<ContextId, ContextItem> = HashMap::new();
1886
1887 let keyword_weight = 0.4;
1889 let similarity_weight = 0.6;
1890
1891 for mut item in keyword_results {
1893 item.relevance_score *= keyword_weight;
1894 combined_results.insert(item.id, item);
1895 }
1896
1897 for mut item in similarity_results {
1899 item.relevance_score *= similarity_weight;
1900
1901 if let Some(existing_item) = combined_results.get_mut(&item.id) {
1902 existing_item.relevance_score += item.relevance_score;
1904 existing_item.relevance_score =
1906 existing_item.relevance_score.max(item.relevance_score);
1907 } else {
1908 combined_results.insert(item.id, item);
1909 }
1910 }
1911
1912 let mut final_results: Vec<ContextItem> = combined_results.into_values().collect();
1914 final_results.sort_by(|a, b| {
1915 b.relevance_score
1916 .partial_cmp(&a.relevance_score)
1917 .unwrap_or(std::cmp::Ordering::Equal)
1918 });
1919
1920 final_results.retain(|item| item.relevance_score >= query.relevance_threshold);
1922 final_results.truncate(query.max_results);
1923
1924 Ok(final_results)
1925 }
1926
1927 fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
1929 if a.len() != b.len() {
1930 return 0.0;
1931 }
1932
1933 let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1934 let magnitude_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1935 let magnitude_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1936
1937 if magnitude_a == 0.0 || magnitude_b == 0.0 {
1938 0.0
1939 } else {
1940 dot_product / (magnitude_a * magnitude_b)
1941 }
1942 }
1943
1944 fn calculate_knowledge_relevance(
1954 &self,
1955 content: &str,
1956 query: &str,
1957 confidence: f32,
1958 ) -> Option<f32> {
1959 let content_lower = content.to_lowercase();
1960 let query_lower = query.to_lowercase();
1961 let query_terms: Vec<&str> = query_lower.split_whitespace().collect();
1962
1963 if query_terms.is_empty() {
1964 return None;
1965 }
1966
1967 let mut total_match_score = 0.0f32;
1968 let mut matched_terms = 0;
1969
1970 for term in &query_terms {
1972 if content_lower.contains(term) {
1973 matched_terms += 1;
1974
1975 if content_lower.split_whitespace().any(|word| word == *term) {
1977 total_match_score += 1.0; } else {
1979 total_match_score += 0.6; }
1981
1982 let occurrences = content_lower.matches(term).count();
1984 if occurrences > 1 {
1985 total_match_score += (occurrences as f32 - 1.0) * 0.1;
1986 }
1987 }
1988 }
1989
1990 if matched_terms == 0 {
1992 return None;
1993 }
1994
1995 let term_coverage = matched_terms as f32 / query_terms.len() as f32;
1997 let match_quality = total_match_score / query_terms.len() as f32;
1998
1999 let base_score = match_quality * 0.5 + term_coverage * 0.3 + confidence * 0.2;
2001
2002 let content_length_factor = if content.len() < 50 {
2004 1.0 } else if content.len() < 200 {
2006 0.95 } else {
2008 0.9 };
2010
2011 let position_bonus = if content_lower.starts_with(&query_lower) {
2013 0.1 } else if query_terms
2015 .iter()
2016 .any(|term| content_lower.starts_with(term))
2017 {
2018 0.05 } else {
2020 0.0
2021 };
2022
2023 let final_score = (base_score + position_bonus) * content_length_factor;
2024 Some(final_score.clamp(0.0, 1.0))
2025 }
2026
2027 fn calculate_trust_score(&self, shared_item: &SharedKnowledgeItem) -> f32 {
2029 let mut trust_score = 0.5;
2031
2032 let access_factor = (shared_item.access_count as f32 + 1.0).ln() / 10.0;
2034 trust_score += access_factor;
2035
2036 let knowledge_factor = match &shared_item.knowledge {
2038 Knowledge::Fact(_) => 0.2,
2039 Knowledge::Procedure(_) => 0.1,
2040 Knowledge::Pattern(_) => 0.05,
2041 };
2042 trust_score += knowledge_factor;
2043
2044 trust_score.clamp(0.0, 1.0)
2046 }
2047
2048 fn calculate_memory_size_bytes(&self, context: &AgentContext) -> usize {
2050 let mut total_size = 0;
2051
2052 total_size += std::mem::size_of::<WorkingMemory>();
2054 for (key, value) in &context.memory.working_memory.variables {
2055 total_size += key.len();
2056 total_size += estimate_json_value_size(value);
2057 }
2058 for goal in &context.memory.working_memory.active_goals {
2059 total_size += goal.len();
2060 }
2061 if let Some(ref current_context) = context.memory.working_memory.current_context {
2062 total_size += current_context.len();
2063 }
2064 for focus in &context.memory.working_memory.attention_focus {
2065 total_size += focus.len();
2066 }
2067
2068 for item in &context.memory.short_term {
2070 total_size += estimate_memory_item_size(item);
2071 }
2072
2073 for item in &context.memory.long_term {
2075 total_size += estimate_memory_item_size(item);
2076 }
2077
2078 for episode in &context.memory.episodic_memory {
2080 total_size += estimate_episode_size(episode);
2081 }
2082
2083 for item in &context.memory.semantic_memory {
2085 total_size += estimate_semantic_memory_item_size(item);
2086 }
2087
2088 for fact in &context.knowledge_base.facts {
2090 total_size += estimate_knowledge_fact_size(fact);
2091 }
2092 for procedure in &context.knowledge_base.procedures {
2093 total_size += estimate_procedure_size(procedure);
2094 }
2095 for pattern in &context.knowledge_base.learned_patterns {
2096 total_size += estimate_pattern_size(pattern);
2097 }
2098
2099 for item in &context.conversation_history {
2101 total_size += estimate_conversation_item_size(item);
2102 }
2103
2104 for (key, value) in &context.metadata {
2106 total_size += key.len() + value.len();
2107 }
2108
2109 total_size += std::mem::size_of::<AgentContext>();
2111
2112 total_size
2113 }
2114
    /// Count how many items in `context` are due for archival or deletion
    /// under the context's retention policy, without modifying anything.
    ///
    /// Archive cutoffs use each category's retention window (semantic memory
    /// gets double the memory window); deletion cutoffs use double each
    /// window. Procedures and patterns are counted by quality thresholds
    /// rather than age. The next cleanup is scheduled 24 hours from now.
    fn calculate_retention_statistics(&self, context: &AgentContext) -> RetentionStatus {
        let now = SystemTime::now();
        let retention_policy = &context.retention_policy;

        let mut items_to_archive = 0;
        let mut items_to_delete = 0;

        // Archive cutoffs, clamped to the epoch if the window exceeds `now`.
        let memory_cutoff = now
            .checked_sub(retention_policy.memory_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let knowledge_cutoff = now
            .checked_sub(retention_policy.knowledge_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let conversation_cutoff = now
            .checked_sub(retention_policy.session_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Memory items are archivable when created OR last accessed before the cutoff.
        for item in &context.memory.short_term {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                items_to_archive += 1;
            }
        }

        for item in &context.memory.long_term {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                items_to_archive += 1;
            }
        }

        for episode in &context.memory.episodic_memory {
            if episode.timestamp < memory_cutoff {
                items_to_archive += 1;
            }
        }

        // Semantic memory is retained twice as long as other memory.
        let semantic_cutoff = now
            .checked_sub(retention_policy.memory_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        for item in &context.memory.semantic_memory {
            if item.created_at < semantic_cutoff {
                items_to_archive += 1;
            }
        }

        for fact in &context.knowledge_base.facts {
            if fact.created_at < knowledge_cutoff {
                items_to_archive += 1;
            }
        }

        // Procedures are archived by quality (low success rate), not age.
        for procedure in &context.knowledge_base.procedures {
            if procedure.success_rate < 0.3 {
                items_to_archive += 1;
            }
        }

        // Patterns need both confidence and repeated occurrences to stay.
        for pattern in &context.knowledge_base.learned_patterns {
            if pattern.confidence < 0.4 || pattern.occurrences < 2 {
                items_to_archive += 1;
            }
        }

        for item in &context.conversation_history {
            if item.timestamp < conversation_cutoff {
                items_to_archive += 1;
            }
        }

        // Deletion cutoffs are double each retention window.
        let delete_cutoff_memory = now
            .checked_sub(retention_policy.memory_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let delete_cutoff_knowledge = now
            .checked_sub(retention_policy.knowledge_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        let delete_cutoff_conversation = now
            .checked_sub(retention_policy.session_retention * 2)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Deletion requires BOTH created and last accessed to be stale.
        for item in &context.memory.short_term {
            if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
                items_to_delete += 1;
            }
        }

        for item in &context.memory.long_term {
            if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
                items_to_delete += 1;
            }
        }

        // Only unverified stale facts are eligible for deletion.
        for fact in &context.knowledge_base.facts {
            if fact.created_at < delete_cutoff_knowledge && !fact.verified {
                items_to_delete += 1;
            }
        }

        for item in &context.conversation_history {
            if item.timestamp < delete_cutoff_conversation {
                items_to_delete += 1;
            }
        }

        // Next cleanup pass in 24 hours.
        let next_cleanup = now + Duration::from_secs(86400);

        RetentionStatus {
            items_to_archive,
            items_to_delete,
            next_cleanup,
        }
    }
2243
2244 async fn archive_memory_items(
2246 &self,
2247 context: &mut AgentContext,
2248 before: SystemTime,
2249 archived_context: &mut ArchivedContext,
2250 ) -> Result<u32, ContextError> {
2251 let mut archived_count = 0u32;
2252 let retention_policy = &context.retention_policy;
2253
2254 let memory_cutoff = before
2256 .checked_sub(retention_policy.memory_retention)
2257 .unwrap_or(SystemTime::UNIX_EPOCH);
2258
2259 let mut retained_short_term = Vec::new();
2261 for item in context.memory.short_term.drain(..) {
2262 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
2263 archived_context.memory.short_term.push(item);
2264 archived_count += 1;
2265 } else {
2266 retained_short_term.push(item);
2267 }
2268 }
2269 context.memory.short_term = retained_short_term;
2270
2271 let mut retained_long_term = Vec::new();
2273 for item in context.memory.long_term.drain(..) {
2274 if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
2275 archived_context.memory.long_term.push(item);
2276 archived_count += 1;
2277 } else {
2278 retained_long_term.push(item);
2279 }
2280 }
2281 context.memory.long_term = retained_long_term;
2282
2283 let mut retained_episodes = Vec::new();
2285 for episode in context.memory.episodic_memory.drain(..) {
2286 if episode.timestamp < memory_cutoff {
2287 archived_context.memory.episodic_memory.push(episode);
2288 archived_count += 1;
2289 } else {
2290 retained_episodes.push(episode);
2291 }
2292 }
2293 context.memory.episodic_memory = retained_episodes;
2294
2295 let semantic_cutoff = before
2297 .checked_sub(retention_policy.memory_retention * 2)
2298 .unwrap_or(SystemTime::UNIX_EPOCH);
2299 let mut retained_semantic = Vec::new();
2300 for item in context.memory.semantic_memory.drain(..) {
2301 if item.created_at < semantic_cutoff {
2302 archived_context.memory.semantic_memory.push(item);
2303 archived_count += 1;
2304 } else {
2305 retained_semantic.push(item);
2306 }
2307 }
2308 context.memory.semantic_memory = retained_semantic;
2309
2310 Ok(archived_count)
2311 }
2312
2313 async fn archive_conversation_history(
2315 &self,
2316 context: &mut AgentContext,
2317 before: SystemTime,
2318 archived_context: &mut ArchivedContext,
2319 ) -> Result<u32, ContextError> {
2320 let mut archived_count = 0u32;
2321 let retention_policy = &context.retention_policy;
2322
2323 let conversation_cutoff = before
2325 .checked_sub(retention_policy.session_retention)
2326 .unwrap_or(SystemTime::UNIX_EPOCH);
2327
2328 let mut retained_conversations = Vec::new();
2330 for item in context.conversation_history.drain(..) {
2331 if item.timestamp < conversation_cutoff {
2332 archived_context.conversation_history.push(item);
2333 archived_count += 1;
2334 } else {
2335 retained_conversations.push(item);
2336 }
2337 }
2338 context.conversation_history = retained_conversations;
2339
2340 Ok(archived_count)
2341 }
2342
2343 async fn archive_knowledge_items(
2345 &self,
2346 context: &mut AgentContext,
2347 before: SystemTime,
2348 archived_context: &mut ArchivedContext,
2349 ) -> Result<u32, ContextError> {
2350 let mut archived_count = 0u32;
2351 let retention_policy = &context.retention_policy;
2352
2353 let knowledge_cutoff = before
2355 .checked_sub(retention_policy.knowledge_retention)
2356 .unwrap_or(SystemTime::UNIX_EPOCH);
2357
2358 let mut retained_facts = Vec::new();
2360 for fact in context.knowledge_base.facts.drain(..) {
2361 if fact.created_at < knowledge_cutoff {
2362 archived_context.knowledge_base.facts.push(fact);
2363 archived_count += 1;
2364 } else {
2365 retained_facts.push(fact);
2366 }
2367 }
2368 context.knowledge_base.facts = retained_facts;
2369
2370 let mut retained_procedures = Vec::new();
2373 for procedure in context.knowledge_base.procedures.drain(..) {
2374 if procedure.success_rate < 0.3 {
2376 archived_context.knowledge_base.procedures.push(procedure);
2377 archived_count += 1;
2378 } else {
2379 retained_procedures.push(procedure);
2380 }
2381 }
2382 context.knowledge_base.procedures = retained_procedures;
2383
2384 let mut retained_patterns = Vec::new();
2386 for pattern in context.knowledge_base.learned_patterns.drain(..) {
2387 if pattern.confidence < 0.4 || pattern.occurrences < 2 {
2388 archived_context
2389 .knowledge_base
2390 .learned_patterns
2391 .push(pattern);
2392 archived_count += 1;
2393 } else {
2394 retained_patterns.push(pattern);
2395 }
2396 }
2397 context.knowledge_base.learned_patterns = retained_patterns;
2398
2399 Ok(archived_count)
2400 }
2401
    /// Persist an archived context to disk as (optionally gzip-compressed)
    /// pretty-printed JSON. No-op when persistence is disabled.
    ///
    /// The data is written to a `.tmp` sibling and then renamed into place,
    /// so readers never observe a partially written archive file.
    async fn save_archived_context(
        &self,
        agent_id: AgentId,
        archived_context: &ArchivedContext,
    ) -> Result<(), ContextError> {
        if !self.config.enable_persistence {
            return Ok(());
        }

        let archive_dir = self.get_archive_directory_path(agent_id).await?;

        // File name encodes the archive time as seconds since the epoch.
        let timestamp = archived_context
            .archived_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let archive_filename = format!("archive_{}.json", timestamp);
        let archive_path = archive_dir.join(archive_filename);

        let archive_data = serde_json::to_vec_pretty(archived_context).map_err(|e| {
            ContextError::SerializationError {
                reason: format!("Failed to serialize archived context: {}", e),
            }
        })?;

        // Optionally compress the serialized payload.
        let final_data = if self.config.persistence_config.enable_compression {
            self.compress_data(&archive_data)?
        } else {
            archive_data
        };

        // Write-then-rename keeps the final path atomic on most filesystems.
        let temp_path = archive_path.with_extension("tmp");
        fs::write(&temp_path, &final_data)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to write archive file: {}", e),
            })?;

        fs::rename(&temp_path, &archive_path)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to finalize archive file: {}", e),
            })?;

        tracing::info!(
            "Saved archived context for agent {} to {}",
            agent_id,
            archive_path.display()
        );

        Ok(())
    }
2461
    /// Resolve (and create, if needed) the per-agent archive directory:
    /// `<agent_contexts_path>/archives/<agent_id>`.
    async fn get_archive_directory_path(&self, agent_id: AgentId) -> Result<PathBuf, ContextError> {
        let archive_dir = self
            .config
            .persistence_config
            .agent_contexts_path()
            .join("archives")
            .join(agent_id.to_string());

        // Idempotent: succeeds even when the directory already exists.
        fs::create_dir_all(&archive_dir)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to create archive directory: {}", e),
            })?;

        Ok(archive_dir)
    }
2480
    /// Gzip-compress `data` at the default compression level.
    ///
    /// Failures are surfaced as `SerializationError`, since compression is
    /// part of preparing the serialized archive payload.
    fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>, ContextError> {
        use flate2::write::GzEncoder;
        use flate2::Compression;
        use std::io::Write;

        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder
            .write_all(data)
            .map_err(|e| ContextError::SerializationError {
                reason: format!("Failed to compress archive data: {}", e),
            })?;
        // `finish` flushes the gzip trailer and yields the compressed buffer.
        encoder
            .finish()
            .map_err(|e| ContextError::SerializationError {
                reason: format!("Failed to finalize compression: {}", e),
            })
    }
2499
2500 fn knowledge_to_item(
2502 &self,
2503 knowledge: &Knowledge,
2504 knowledge_id: KnowledgeId,
2505 ) -> Result<KnowledgeItem, ContextError> {
2506 match knowledge {
2507 Knowledge::Fact(fact) => {
2508 Ok(KnowledgeItem {
2509 id: knowledge_id,
2510 content: format!("{} {} {}", fact.subject, fact.predicate, fact.object),
2511 knowledge_type: KnowledgeType::Fact,
2512 confidence: fact.confidence,
2513 relevance_score: 1.0, source: fact.source.clone(),
2515 created_at: fact.created_at,
2516 })
2517 }
2518 Knowledge::Procedure(procedure) => {
2519 Ok(KnowledgeItem {
2520 id: knowledge_id,
2521 content: format!("{}: {}", procedure.name, procedure.description),
2522 knowledge_type: KnowledgeType::Procedure,
2523 confidence: procedure.success_rate,
2524 relevance_score: 1.0, source: KnowledgeSource::Learning,
2526 created_at: SystemTime::now(),
2527 })
2528 }
2529 Knowledge::Pattern(pattern) => {
2530 Ok(KnowledgeItem {
2531 id: knowledge_id,
2532 content: format!("Pattern: {}", pattern.description),
2533 knowledge_type: KnowledgeType::Pattern,
2534 confidence: pattern.confidence,
2535 relevance_score: 1.0, source: KnowledgeSource::Learning,
2537 created_at: SystemTime::now(),
2538 })
2539 }
2540 }
2541 }
2542}
2543
2544fn estimate_json_value_size(value: &Value) -> usize {
2546 match value {
2547 Value::Null => 4,
2548 Value::Bool(_) => 1,
2549 Value::Number(n) => std::mem::size_of::<f64>() + n.to_string().len(),
2550 Value::String(s) => s.len(),
2551 Value::Array(arr) => {
2552 arr.iter().map(estimate_json_value_size).sum::<usize>()
2553 + std::mem::size_of::<Vec<Value>>()
2554 }
2555 Value::Object(obj) => {
2556 obj.iter()
2557 .map(|(k, v)| k.len() + estimate_json_value_size(v))
2558 .sum::<usize>()
2559 + std::mem::size_of::<serde_json::Map<String, Value>>()
2560 }
2561 }
2562}
2563
2564fn estimate_memory_item_size(item: &MemoryItem) -> usize {
2566 let mut size = std::mem::size_of::<MemoryItem>();
2567 size += item.content.len();
2568
2569 if let Some(ref embedding) = item.embedding {
2571 size += embedding.len() * std::mem::size_of::<f32>();
2572 }
2573
2574 for (key, value) in &item.metadata {
2576 size += key.len() + value.len();
2577 }
2578
2579 size
2580}
2581
2582fn estimate_episode_size(episode: &Episode) -> usize {
2584 let mut size = std::mem::size_of::<Episode>();
2585 size += episode.title.len();
2586 size += episode.description.len();
2587
2588 if let Some(ref outcome) = episode.outcome {
2589 size += outcome.len();
2590 }
2591
2592 for event in &episode.events {
2593 size += event.action.len();
2594 size += event.result.len();
2595 for (key, value) in &event.context {
2596 size += key.len() + value.len();
2597 }
2598 }
2599
2600 for lesson in &episode.lessons_learned {
2601 size += lesson.len();
2602 }
2603
2604 size
2605}
2606
2607fn estimate_semantic_memory_item_size(item: &SemanticMemoryItem) -> usize {
2609 let mut size = std::mem::size_of::<SemanticMemoryItem>();
2610 size += item.concept.len();
2611
2612 for relationship in &item.relationships {
2613 size += relationship.target_concept.len();
2614 size += std::mem::size_of::<ConceptRelationship>();
2615 }
2616
2617 for (key, value) in &item.properties {
2618 size += key.len() + estimate_json_value_size(value);
2619 }
2620
2621 size
2622}
2623
2624fn estimate_knowledge_fact_size(fact: &KnowledgeFact) -> usize {
2626 std::mem::size_of::<KnowledgeFact>()
2627 + fact.subject.len()
2628 + fact.predicate.len()
2629 + fact.object.len()
2630}
2631
2632fn estimate_procedure_size(procedure: &Procedure) -> usize {
2634 let mut size = std::mem::size_of::<Procedure>();
2635 size += procedure.name.len();
2636 size += procedure.description.len();
2637
2638 for step in &procedure.steps {
2639 size += step.action.len();
2640 size += step.expected_result.len();
2641 if let Some(ref error_handling) = step.error_handling {
2642 size += error_handling.len();
2643 }
2644 }
2645
2646 for condition in &procedure.preconditions {
2647 size += condition.len();
2648 }
2649
2650 for condition in &procedure.postconditions {
2651 size += condition.len();
2652 }
2653
2654 size
2655}
2656
2657fn estimate_pattern_size(pattern: &Pattern) -> usize {
2659 let mut size = std::mem::size_of::<Pattern>();
2660 size += pattern.name.len();
2661 size += pattern.description.len();
2662
2663 for condition in &pattern.conditions {
2664 size += condition.len();
2665 }
2666
2667 for outcome in &pattern.outcomes {
2668 size += outcome.len();
2669 }
2670
2671 size
2672}
2673
2674fn estimate_conversation_item_size(item: &ConversationItem) -> usize {
2676 let mut size = std::mem::size_of::<ConversationItem>();
2677 size += item.content.len();
2678
2679 size += item.context_used.len() * std::mem::size_of::<ContextId>();
2681 size += item.knowledge_used.len() * std::mem::size_of::<KnowledgeId>();
2682
2683 size
2684}
2685
#[async_trait]
impl ContextManager for StandardContextManager {
    // Stores the latest context snapshot for an agent: persists it first (if
    // persistence is enabled), then replaces the in-memory cached copy.
    async fn store_context(
        &self,
        agent_id: AgentId,
        mut context: AgentContext,
    ) -> Result<ContextId, ContextError> {
        self.validate_access(agent_id, "store_context").await?;

        context.updated_at = SystemTime::now();
        let context_id = ContextId::new();

        // Persist before caching so the durable copy is never behind the cache.
        if self.config.enable_persistence {
            self.persistence.save_context(agent_id, &context).await?;
        }

        let mut contexts = self.contexts.write().await;
        contexts.insert(agent_id, context);

        // NOTE(review): the returned id is freshly generated here and never
        // recorded on the stored context — confirm callers treat it only as a
        // receipt, not as a key for later lookup.
        Ok(context_id)
    }

    // Cache-first retrieval. On a cache miss (or session mismatch in the
    // cache) falls back to persistent storage and re-populates the cache.
    async fn retrieve_context(
        &self,
        agent_id: AgentId,
        session_id: Option<SessionId>,
    ) -> Result<Option<AgentContext>, ContextError> {
        self.validate_access(agent_id, "retrieve_context").await?;

        // Scoped block so the read lock is dropped before we possibly take
        // the write lock below.
        {
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                if let Some(sid) = session_id {
                    // Only serve the cached copy when it belongs to the
                    // requested session; otherwise fall through to storage.
                    if context.session_id == sid {
                        return Ok(Some(context.clone()));
                    }
                } else {
                    return Ok(Some(context.clone()));
                }
            }
        }

        if self.config.enable_persistence {
            if let Some(context) = self.persistence.load_context(agent_id).await? {
                // A session filter that doesn't match the persisted copy
                // means there is nothing to return.
                if let Some(sid) = session_id {
                    if context.session_id != sid {
                        return Ok(None);
                    }
                }

                // Warm the cache for subsequent reads.
                let mut contexts = self.contexts.write().await;
                contexts.insert(agent_id, context.clone());

                return Ok(Some(context));
            }
        }

        Ok(None)
    }

    // Dispatches a context query to the search strategy matching its type.
    async fn query_context(
        &self,
        agent_id: AgentId,
        query: ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError> {
        self.validate_access(agent_id, "query_context").await?;

        match query.query_type {
            QueryType::Semantic => {
                // Semantic search takes a single free-text string, so join
                // the individual terms with spaces.
                let search_term = query.search_terms.join(" ");
                self.semantic_search_memory(agent_id, &search_term, query.max_results)
                    .await
            }
            QueryType::Keyword => self.keyword_search_memory(agent_id, &query).await,
            QueryType::Temporal => self.temporal_search_memory(agent_id, &query).await,
            QueryType::Similarity => self.similarity_search_memory(agent_id, &query).await,
            QueryType::Hybrid => self.hybrid_search_memory(agent_id, &query).await,
        }
    }

    // Applies a batch of memory updates (Add / Update / Delete / Increment)
    // against each memory store (short-term, long-term, working, episodic,
    // semantic), then persists the modified context.
    //
    // NOTE(review): throughout this method, `update.data` payloads that fail
    // to deserialize are silently skipped (`if let Ok(..)`) — presumably a
    // deliberate best-effort policy; confirm callers don't expect an error.
    async fn update_memory(
        &self,
        agent_id: AgentId,
        memory_updates: Vec<MemoryUpdate>,
    ) -> Result<(), ContextError> {
        self.validate_access(agent_id, "update_memory").await?;

        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            for update in memory_updates {
                match update.operation {
                    UpdateOperation::Add => {
                        match update.target {
                            // Add ignores the target id: new items always get
                            // a fresh ContextId.
                            MemoryTarget::ShortTerm(_) => {
                                if let Ok(memory_item_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    // Missing fields fall back to defaults:
                                    // empty content, Factual type, 0.5 importance.
                                    let memory_item = MemoryItem {
                                        id: ContextId::new(),
                                        content: memory_item_data
                                            .get("content")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        memory_type: memory_item_data
                                            .get("memory_type")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or(MemoryType::Factual),
                                        importance: memory_item_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        access_count: 0,
                                        last_accessed: SystemTime::now(),
                                        created_at: SystemTime::now(),
                                        embedding: None,
                                        metadata: memory_item_data
                                            .get("metadata")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                    };
                                    context.memory.short_term.push(memory_item);
                                }
                            }
                            // Same construction as ShortTerm, different store.
                            MemoryTarget::LongTerm(_) => {
                                if let Ok(memory_item_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let memory_item = MemoryItem {
                                        id: ContextId::new(),
                                        content: memory_item_data
                                            .get("content")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        memory_type: memory_item_data
                                            .get("memory_type")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or(MemoryType::Factual),
                                        importance: memory_item_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        access_count: 0,
                                        last_accessed: SystemTime::now(),
                                        created_at: SystemTime::now(),
                                        embedding: None,
                                        metadata: memory_item_data
                                            .get("metadata")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                    };
                                    context.memory.long_term.push(memory_item);
                                }
                            }
                            // Working memory is a plain key -> JSON value map;
                            // Add overwrites any existing entry.
                            MemoryTarget::Working(key) => {
                                context
                                    .memory
                                    .working_memory
                                    .variables
                                    .insert(key, update.data);
                            }
                            MemoryTarget::Episodic(_) => {
                                if let Ok(episode_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let episode = Episode {
                                        id: ContextId::new(),
                                        title: episode_data
                                            .get("title")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("Untitled Episode")
                                            .to_string(),
                                        description: episode_data
                                            .get("description")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        events: episode_data
                                            .get("events")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        outcome: episode_data
                                            .get("outcome")
                                            .and_then(|v| v.as_str())
                                            .map(|s| s.to_string()),
                                        lessons_learned: episode_data
                                            .get("lessons_learned")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        timestamp: SystemTime::now(),
                                        importance: episode_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                    };
                                    context.memory.episodic_memory.push(episode);
                                }
                            }
                            MemoryTarget::Semantic(_) => {
                                if let Ok(semantic_data) =
                                    serde_json::from_value::<serde_json::Map<String, Value>>(
                                        update.data,
                                    )
                                {
                                    let semantic_item = SemanticMemoryItem {
                                        id: ContextId::new(),
                                        concept: semantic_data
                                            .get("concept")
                                            .and_then(|v| v.as_str())
                                            .unwrap_or("")
                                            .to_string(),
                                        relationships: semantic_data
                                            .get("relationships")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        properties: semantic_data
                                            .get("properties")
                                            .and_then(|v| serde_json::from_value(v.clone()).ok())
                                            .unwrap_or_default(),
                                        confidence: semantic_data
                                            .get("confidence")
                                            .and_then(|v| v.as_f64())
                                            .unwrap_or(0.5)
                                            as f32,
                                        created_at: SystemTime::now(),
                                        updated_at: SystemTime::now(),
                                    };
                                    context.memory.semantic_memory.push(semantic_item);
                                }
                            }
                        }
                    }
                    // Update patches only the fields present in the payload;
                    // absent fields are left untouched.
                    UpdateOperation::Update => {
                        match update.target {
                            MemoryTarget::ShortTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .short_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(content) =
                                            update_data.get("content").and_then(|v| v.as_str())
                                        {
                                            memory_item.content = content.to_string();
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = importance as f32;
                                        }
                                        if let Some(metadata) = update_data.get("metadata") {
                                            if let Ok(new_metadata) =
                                                serde_json::from_value(metadata.clone())
                                            {
                                                memory_item.metadata = new_metadata;
                                            }
                                        }
                                        // A successful update counts as an access.
                                        memory_item.last_accessed = SystemTime::now();
                                        memory_item.access_count += 1;
                                    }
                                }
                            }
                            MemoryTarget::LongTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .long_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(content) =
                                            update_data.get("content").and_then(|v| v.as_str())
                                        {
                                            memory_item.content = content.to_string();
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = importance as f32;
                                        }
                                        if let Some(metadata) = update_data.get("metadata") {
                                            if let Ok(new_metadata) =
                                                serde_json::from_value(metadata.clone())
                                            {
                                                memory_item.metadata = new_metadata;
                                            }
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                        memory_item.access_count += 1;
                                    }
                                }
                            }
                            // For working memory, Update is identical to Add:
                            // insert/overwrite the value under the key.
                            MemoryTarget::Working(key) => {
                                context
                                    .memory
                                    .working_memory
                                    .variables
                                    .insert(key, update.data);
                            }
                            MemoryTarget::Episodic(target_id) => {
                                if let Some(episode) = context
                                    .memory
                                    .episodic_memory
                                    .iter_mut()
                                    .find(|ep| ep.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(title) =
                                            update_data.get("title").and_then(|v| v.as_str())
                                        {
                                            episode.title = title.to_string();
                                        }
                                        if let Some(description) =
                                            update_data.get("description").and_then(|v| v.as_str())
                                        {
                                            episode.description = description.to_string();
                                        }
                                        if let Some(outcome) =
                                            update_data.get("outcome").and_then(|v| v.as_str())
                                        {
                                            episode.outcome = Some(outcome.to_string());
                                        }
                                        if let Some(importance) =
                                            update_data.get("importance").and_then(|v| v.as_f64())
                                        {
                                            episode.importance = importance as f32;
                                        }
                                        if let Some(lessons) = update_data.get("lessons_learned") {
                                            if let Ok(new_lessons) =
                                                serde_json::from_value(lessons.clone())
                                            {
                                                episode.lessons_learned = new_lessons;
                                            }
                                        }
                                    }
                                }
                            }
                            MemoryTarget::Semantic(target_id) => {
                                if let Some(semantic_item) = context
                                    .memory
                                    .semantic_memory
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(update_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(concept) =
                                            update_data.get("concept").and_then(|v| v.as_str())
                                        {
                                            semantic_item.concept = concept.to_string();
                                        }
                                        if let Some(confidence) =
                                            update_data.get("confidence").and_then(|v| v.as_f64())
                                        {
                                            semantic_item.confidence = confidence as f32;
                                        }
                                        if let Some(relationships) =
                                            update_data.get("relationships")
                                        {
                                            if let Ok(new_relationships) =
                                                serde_json::from_value(relationships.clone())
                                            {
                                                semantic_item.relationships = new_relationships;
                                            }
                                        }
                                        if let Some(properties) = update_data.get("properties") {
                                            if let Ok(new_properties) =
                                                serde_json::from_value(properties.clone())
                                            {
                                                semantic_item.properties = new_properties;
                                            }
                                        }
                                        semantic_item.updated_at = SystemTime::now();
                                    }
                                }
                            }
                        }
                    }
                    // Delete is a no-op when the target id/key does not exist.
                    UpdateOperation::Delete => {
                        match update.target {
                            MemoryTarget::ShortTerm(target_id) => {
                                context
                                    .memory
                                    .short_term
                                    .retain(|item| item.id != target_id);
                            }
                            MemoryTarget::LongTerm(target_id) => {
                                context.memory.long_term.retain(|item| item.id != target_id);
                            }
                            MemoryTarget::Working(key) => {
                                context.memory.working_memory.variables.remove(&key);
                            }
                            MemoryTarget::Episodic(target_id) => {
                                context
                                    .memory
                                    .episodic_memory
                                    .retain(|ep| ep.id != target_id);
                            }
                            MemoryTarget::Semantic(target_id) => {
                                context
                                    .memory
                                    .semantic_memory
                                    .retain(|item| item.id != target_id);
                            }
                        }
                    }
                    // Increment adds deltas to numeric fields: importance is
                    // clamped to [0, 1], counters use saturating arithmetic.
                    UpdateOperation::Increment => {
                        match update.target {
                            MemoryTarget::ShortTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .short_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(increment_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(importance_increment) = increment_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = (memory_item.importance
                                                + importance_increment as f32)
                                                .clamp(0.0, 1.0);
                                        }
                                        if let Some(access_increment) = increment_data
                                            .get("access_count")
                                            .and_then(|v| v.as_u64())
                                        {
                                            memory_item.access_count = memory_item
                                                .access_count
                                                .saturating_add(access_increment as u32);
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                    }
                                }
                            }
                            MemoryTarget::LongTerm(target_id) => {
                                if let Some(memory_item) = context
                                    .memory
                                    .long_term
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Ok(increment_data) =
                                        serde_json::from_value::<serde_json::Map<String, Value>>(
                                            update.data,
                                        )
                                    {
                                        if let Some(importance_increment) = increment_data
                                            .get("importance")
                                            .and_then(|v| v.as_f64())
                                        {
                                            memory_item.importance = (memory_item.importance
                                                + importance_increment as f32)
                                                .clamp(0.0, 1.0);
                                        }
                                        if let Some(access_increment) = increment_data
                                            .get("access_count")
                                            .and_then(|v| v.as_u64())
                                        {
                                            memory_item.access_count = memory_item
                                                .access_count
                                                .saturating_add(access_increment as u32);
                                        }
                                        memory_item.last_accessed = SystemTime::now();
                                    }
                                }
                            }
                            // Working-memory increments only apply when the
                            // existing value and the delta are both numeric of
                            // the same kind (f64 tried first, then i64).
                            MemoryTarget::Working(key) => {
                                if let Some(existing_value) =
                                    context.memory.working_memory.variables.get(&key)
                                {
                                    if let (Some(current), Some(increment)) =
                                        (existing_value.as_f64(), update.data.as_f64())
                                    {
                                        let new_value = current + increment;
                                        context.memory.working_memory.variables.insert(
                                            key,
                                            Value::Number(
                                                // from_f64 is None for NaN/inf;
                                                // fall back to integer zero.
                                                serde_json::Number::from_f64(new_value)
                                                    .unwrap_or_else(|| serde_json::Number::from(0)),
                                            ),
                                        );
                                    } else if let (Some(current), Some(increment)) =
                                        (existing_value.as_i64(), update.data.as_i64())
                                    {
                                        let new_value = current.saturating_add(increment);
                                        context.memory.working_memory.variables.insert(
                                            key,
                                            Value::Number(serde_json::Number::from(new_value)),
                                        );
                                    }
                                }
                            }
                            // Episodic/Semantic increments take the raw f64
                            // payload as the importance/confidence delta.
                            MemoryTarget::Episodic(target_id) => {
                                if let Some(episode) = context
                                    .memory
                                    .episodic_memory
                                    .iter_mut()
                                    .find(|ep| ep.id == target_id)
                                {
                                    if let Some(importance_increment) = update.data.as_f64() {
                                        episode.importance = (episode.importance
                                            + importance_increment as f32)
                                            .clamp(0.0, 1.0);
                                    }
                                }
                            }
                            MemoryTarget::Semantic(target_id) => {
                                if let Some(semantic_item) = context
                                    .memory
                                    .semantic_memory
                                    .iter_mut()
                                    .find(|item| item.id == target_id)
                                {
                                    if let Some(confidence_increment) = update.data.as_f64() {
                                        semantic_item.confidence = (semantic_item.confidence
                                            + confidence_increment as f32)
                                            .clamp(0.0, 1.0);
                                        semantic_item.updated_at = SystemTime::now();
                                    }
                                }
                            }
                        }
                    }
                }
            }

            context.updated_at = SystemTime::now();

            // Persist the mutated context; a persistence failure is logged
            // and propagated (the in-memory copy keeps the changes).
            if self.config.enable_persistence {
                if let Err(e) = self.persistence.save_context(agent_id, context).await {
                    tracing::error!(
                        "Failed to persist memory updates for agent {}: {}",
                        agent_id,
                        e
                    );
                    return Err(e);
                }
            }
        } else {
            // No cached context for this agent.
            // NOTE(review): the error carries a freshly generated ContextId,
            // not an id related to the missing context — confirm intended.
            return Err(ContextError::NotFound {
                id: ContextId::new(),
            });
        }

        Ok(())
    }

    // Adds a piece of knowledge to the agent's knowledge base, optionally
    // embedding it into the vector database first.
    async fn add_knowledge(
        &self,
        agent_id: AgentId,
        knowledge: Knowledge,
    ) -> Result<KnowledgeId, ContextError> {
        self.validate_access(agent_id, "add_knowledge").await?;

        let knowledge_id = KnowledgeId::new();

        if self.config.enable_vector_db {
            // Flatten to a searchable item, embed its content, and store the
            // vector; the returned vector id is not tracked here.
            let knowledge_item = self.knowledge_to_item(&knowledge, knowledge_id)?;
            let embedding = self.generate_embeddings(&knowledge_item.content).await?;
            let _vector_id = self
                .vector_db
                .store_knowledge_item(&knowledge_item, embedding)
                .await?;
        }

        // Append to the cached context's knowledge base. If no context is
        // cached, the knowledge is only in the vector DB (when enabled).
        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            match knowledge {
                Knowledge::Fact(fact) => {
                    context.knowledge_base.facts.push(fact);
                }
                Knowledge::Procedure(procedure) => {
                    context.knowledge_base.procedures.push(procedure);
                }
                Knowledge::Pattern(pattern) => {
                    context.knowledge_base.learned_patterns.push(pattern);
                }
            }
            context.updated_at = SystemTime::now();
        }

        Ok(knowledge_id)
    }

    // Searches the agent's knowledge. With the vector DB enabled this is an
    // embedding similarity search; otherwise it falls back to scoring the
    // in-memory knowledge base with `calculate_knowledge_relevance`.
    async fn search_knowledge(
        &self,
        agent_id: AgentId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError> {
        self.validate_access(agent_id, "search_knowledge").await?;

        if self.config.enable_vector_db {
            let query_embedding = self.generate_embeddings(query).await?;

            self.vector_db
                .search_knowledge_base(agent_id, query_embedding, limit)
                .await
        } else {
            let contexts = self.contexts.read().await;
            if let Some(context) = contexts.get(&agent_id) {
                let mut results = Vec::new();

                // Facts: score the "subject predicate object" triple text.
                for fact in &context.knowledge_base.facts {
                    let content = format!("{} {} {}", fact.subject, fact.predicate, fact.object);
                    if let Some(relevance_score) =
                        self.calculate_knowledge_relevance(&content, query, fact.confidence)
                    {
                        results.push(KnowledgeItem {
                            id: fact.id,
                            content,
                            knowledge_type: KnowledgeType::Fact,
                            confidence: fact.confidence,
                            relevance_score,
                            source: fact.source.clone(),
                            created_at: fact.created_at,
                        });
                    }
                }

                // Procedures: match against name + description; success rate
                // doubles as the confidence value.
                for procedure in &context.knowledge_base.procedures {
                    let searchable_content =
                        format!("{} {}", procedure.name, procedure.description);
                    if let Some(relevance_score) = self.calculate_knowledge_relevance(
                        &searchable_content,
                        query,
                        procedure.success_rate,
                    ) {
                        results.push(KnowledgeItem {
                            id: procedure.id,
                            content: format!("{}: {}", procedure.name, procedure.description),
                            knowledge_type: KnowledgeType::Procedure,
                            confidence: procedure.success_rate,
                            relevance_score,
                            source: KnowledgeSource::Learning,
                            created_at: SystemTime::now(),
                        });
                    }
                }

                // Patterns: matched on name + description as well.
                for pattern in &context.knowledge_base.learned_patterns {
                    let searchable_content = format!("{} {}", pattern.name, pattern.description);
                    if let Some(relevance_score) = self.calculate_knowledge_relevance(
                        &searchable_content,
                        query,
                        pattern.confidence,
                    ) {
                        results.push(KnowledgeItem {
                            id: pattern.id,
                            content: format!("Pattern: {}", pattern.description),
                            knowledge_type: KnowledgeType::Pattern,
                            confidence: pattern.confidence,
                            relevance_score,
                            source: KnowledgeSource::Learning,
                            created_at: SystemTime::now(),
                        });
                    }
                }

                // Highest relevance first; NaN scores compare as equal.
                results.sort_by(|a, b| {
                    b.relevance_score
                        .partial_cmp(&a.relevance_score)
                        .unwrap_or(std::cmp::Ordering::Equal)
                });

                results.truncate(limit);
                Ok(results)
            } else {
                Ok(Vec::new())
            }
        }
    }

    // Publishes a knowledge entry from one agent into the shared pool.
    // The recipient is currently unused: items go into a global shared map
    // keyed by knowledge id rather than being delivered to `_to_agent`.
    async fn share_knowledge(
        &self,
        from_agent: AgentId,
        _to_agent: AgentId,
        knowledge_id: KnowledgeId,
        access_level: AccessLevel,
    ) -> Result<(), ContextError> {
        self.validate_access(from_agent, "share_knowledge").await?;

        let contexts = self.contexts.read().await;
        if let Some(from_context) = contexts.get(&from_agent) {
            // Look up the id across facts, then procedures, then patterns.
            let knowledge = if let Some(fact) = from_context
                .knowledge_base
                .facts
                .iter()
                .find(|f| f.id == knowledge_id)
            {
                Some(Knowledge::Fact(fact.clone()))
            } else if let Some(procedure) = from_context
                .knowledge_base
                .procedures
                .iter()
                .find(|p| p.id == knowledge_id)
            {
                Some(Knowledge::Procedure(procedure.clone()))
            } else {
                from_context
                    .knowledge_base
                    .learned_patterns
                    .iter()
                    .find(|p| p.id == knowledge_id)
                    .map(|pattern| Knowledge::Pattern(pattern.clone()))
            };

            if let Some(knowledge) = knowledge {
                let shared_item = SharedKnowledgeItem {
                    knowledge,
                    source_agent: from_agent,
                    access_level,
                    created_at: SystemTime::now(),
                    access_count: 0,
                };

                let mut shared_knowledge = self.shared_knowledge.write().await;
                shared_knowledge.insert(knowledge_id, shared_item);

                Ok(())
            } else {
                Err(ContextError::KnowledgeNotFound { id: knowledge_id })
            }
        } else {
            Err(ContextError::NotFound {
                id: ContextId::new(),
            })
        }
    }

    // Lists shared knowledge visible to the agent. Only `Public` items are
    // returned; all other access levels are skipped.
    async fn get_shared_knowledge(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<SharedKnowledgeRef>, ContextError> {
        self.validate_access(agent_id, "get_shared_knowledge")
            .await?;

        let shared_knowledge = self.shared_knowledge.read().await;
        let mut results = Vec::new();

        for (knowledge_id, shared_item) in shared_knowledge.iter() {
            match shared_item.access_level {
                AccessLevel::Public => {
                    let trust_score = self.calculate_trust_score(shared_item);

                    tracing::debug!(
                        "Shared knowledge {} accessed {} times, trust score: {}",
                        knowledge_id,
                        shared_item.access_count,
                        trust_score
                    );

                    results.push(SharedKnowledgeRef {
                        knowledge_id: *knowledge_id,
                        source_agent: shared_item.source_agent,
                        shared_at: shared_item.created_at,
                        access_level: shared_item.access_level.clone(),
                        trust_score,
                    });
                }
                _ => {
                    // Non-public items are never surfaced by this method.
                }
            }
        }

        Ok(results)
    }

    // Moves memory/conversation/knowledge items older than `before` into an
    // on-disk archive file and stamps the live context's metadata. Returns
    // the number of items archived.
    async fn archive_context(
        &self,
        agent_id: AgentId,
        before: SystemTime,
    ) -> Result<u32, ContextError> {
        self.validate_access(agent_id, "archive_context").await?;

        let mut total_archived = 0u32;
        let mut archived_context = ArchivedContext::new(agent_id, before);

        let mut contexts = self.contexts.write().await;
        if let Some(context) = contexts.get_mut(&agent_id) {
            // Each helper moves matching items out of the live context and
            // into `archived_context`, returning how many it took.
            total_archived += self
                .archive_memory_items(context, before, &mut archived_context)
                .await?;

            total_archived += self
                .archive_conversation_history(context, before, &mut archived_context)
                .await?;

            total_archived += self
                .archive_knowledge_items(context, before, &mut archived_context)
                .await?;

            if total_archived > 0 {
                // Write the archive file first, then record the archiving
                // event in the live context's metadata.
                self.save_archived_context(agent_id, &archived_context)
                    .await?;

                context.updated_at = SystemTime::now();
                context.metadata.insert(
                    "last_archived".to_string(),
                    SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs()
                        .to_string(),
                );
                context
                    .metadata
                    .insert("archived_count".to_string(), total_archived.to_string());

                if self.config.enable_persistence {
                    self.persistence.save_context(agent_id, context).await?;
                }
            }

            tracing::info!(
                "Archived {} items for agent {} before timestamp {:?}",
                total_archived,
                agent_id,
                before
            );
        } else {
            // Archiving an unknown agent is not an error, just a warning.
            tracing::warn!("No context found for agent {} during archiving", agent_id);
        }

        Ok(total_archived)
    }

    // Aggregates counts, byte-size estimates, and retention statistics for
    // the agent's cached context. Errors if no context is cached.
    async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError> {
        self.validate_access(agent_id, "get_context_stats").await?;

        let contexts = self.contexts.read().await;
        if let Some(context) = contexts.get(&agent_id) {
            let total_memory_items = context.memory.short_term.len()
                + context.memory.long_term.len()
                + context.memory.episodic_memory.len()
                + context.memory.semantic_memory.len();

            let total_knowledge_items = context.knowledge_base.facts.len()
                + context.knowledge_base.procedures.len()
                + context.knowledge_base.learned_patterns.len();

            let memory_size_bytes = self.calculate_memory_size_bytes(context);

            let retention_stats = self.calculate_retention_statistics(context);

            Ok(ContextStats {
                total_memory_items,
                total_knowledge_items,
                total_conversations: context.conversation_history.len(),
                total_episodes: context.memory.episodic_memory.len(),
                memory_size_bytes,
                last_activity: context.updated_at,
                retention_status: retention_stats,
            })
        } else {
            Err(ContextError::NotFound {
                id: ContextId::new(),
            })
        }
    }

    // Delegates to the inherent `shutdown` on StandardContextManager; fully
    // qualified to avoid infinite recursion into this trait method.
    async fn shutdown(&self) -> Result<(), ContextError> {
        StandardContextManager::shutdown(self).await
    }
}
3638
3639#[cfg(test)]
3640mod tests {
3641 use super::*;
3642
3643 #[tokio::test]
3644 async fn check_and_compact_noop_when_below_threshold() {
3645 use super::super::compaction::CompactionConfig;
3646 use super::super::token_counter::HeuristicTokenCounter;
3647
3648 let tmp = tempfile::tempdir().unwrap();
3649 let mut config = ContextManagerConfig::default();
3650 config.persistence_config.root_data_dir = tmp.path().to_path_buf();
3651 let agent_id = AgentId::new();
3652 let manager = StandardContextManager::new(config, &agent_id.to_string())
3653 .await
3654 .unwrap();
3655 manager.initialize().await.unwrap();
3656 let session_id = manager.create_session(agent_id).await.unwrap();
3657
3658 let compaction_config = CompactionConfig::default();
3659 let counter = HeuristicTokenCounter::new(200_000);
3660
3661 let result = manager
3662 .check_and_compact(&agent_id, &session_id, &compaction_config, &counter)
3663 .await
3664 .unwrap();
3665
3666 assert!(
3667 result.is_none(),
3668 "should be no-op when context is nearly empty"
3669 );
3670 }
3671}