use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::RwLock;
use super::types::*;
use super::vector_db::{
EmbeddingService, MockEmbeddingService, QdrantClientWrapper, QdrantConfig, VectorDatabase,
};
use crate::integrations::policy_engine::{
PolicyEngine, MockPolicyEngine
};
use crate::secrets::{SecretStore, SecretsConfig};
use crate::types::AgentId;
/// Async facade over per-agent context: hierarchical memory, knowledge
/// items, conversation history, retention/archiving, and cross-agent
/// knowledge sharing. Every method takes the owning `AgentId` explicitly.
#[async_trait]
pub trait ContextManager: Send + Sync {
    /// Persist a complete context for `agent_id`, returning the new context id.
    async fn store_context(
        &self,
        agent_id: AgentId,
        context: AgentContext,
    ) -> Result<ContextId, ContextError>;
    /// Fetch the agent's current context; `session_id` optionally narrows the
    /// lookup to one session. `Ok(None)` means no context is stored.
    async fn retrieve_context(
        &self,
        agent_id: AgentId,
        session_id: Option<SessionId>,
    ) -> Result<Option<AgentContext>, ContextError>;
    /// Run a structured query (terms, types, time range) over the agent's context.
    async fn query_context(
        &self,
        agent_id: AgentId,
        query: ContextQuery,
    ) -> Result<Vec<ContextItem>, ContextError>;
    /// Apply a batch of memory mutations to the agent's context.
    async fn update_memory(
        &self,
        agent_id: AgentId,
        memory_updates: Vec<MemoryUpdate>,
    ) -> Result<(), ContextError>;
    /// Add a knowledge entry to the agent's knowledge base, returning its id.
    async fn add_knowledge(
        &self,
        agent_id: AgentId,
        knowledge: Knowledge,
    ) -> Result<KnowledgeId, ContextError>;
    /// Free-text search over the agent's knowledge, capped at `limit` hits.
    async fn search_knowledge(
        &self,
        agent_id: AgentId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError>;
    /// Grant `to_agent` access to one of `from_agent`'s knowledge items.
    async fn share_knowledge(
        &self,
        from_agent: AgentId,
        to_agent: AgentId,
        knowledge_id: KnowledgeId,
        access_level: AccessLevel,
    ) -> Result<(), ContextError>;
    /// List knowledge items other agents have shared *with* this agent.
    async fn get_shared_knowledge(
        &self,
        agent_id: AgentId,
    ) -> Result<Vec<SharedKnowledgeRef>, ContextError>;
    /// Archive context items older than `before`; returns how many were archived.
    async fn archive_context(
        &self,
        agent_id: AgentId,
        before: SystemTime,
    ) -> Result<u32, ContextError>;
    /// Summary statistics for the agent's stored context.
    async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError>;
    /// Flush state and stop background work; implementations should be idempotent.
    async fn shutdown(&self) -> Result<(), ContextError>;
}
/// Default [`ContextManager`] implementation: an in-memory context map with
/// optional Qdrant-backed semantic search and file-based persistence.
pub struct StandardContextManager {
    // Live per-agent contexts, keyed by agent id.
    contexts: Arc<RwLock<HashMap<AgentId, AgentContext>>>,
    config: ContextManagerConfig,
    // Knowledge items shared between agents, keyed by knowledge id.
    shared_knowledge: Arc<RwLock<HashMap<KnowledgeId, SharedKnowledgeItem>>>,
    // Vector store used for semantic search when `config.enable_vector_db` is set.
    vector_db: Arc<dyn VectorDatabase>,
    // NOTE(review): concrete mock type rather than the EmbeddingService trait —
    // presumably a placeholder until a real embedding backend is wired in.
    embedding_service: Arc<MockEmbeddingService>,
    // Durable storage backend (file-based by default).
    persistence: Arc<dyn ContextPersistence>,
    secrets: Box<dyn SecretStore + Send + Sync>,
    #[allow(dead_code)]
    policy_engine: Arc<dyn PolicyEngine>,
    // Set once by `shutdown()`; polled by background tasks so they exit cleanly.
    shutdown_flag: Arc<RwLock<bool>>,
    // Handles of spawned tasks (e.g. the retention scheduler) so shutdown can abort them.
    background_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
/// Tuning knobs for [`StandardContextManager`].
#[derive(Debug, Clone)]
pub struct ContextManagerConfig {
    // Soft cap on contexts kept in the in-memory map.
    pub max_contexts_in_memory: usize,
    // Retention policy applied to newly created contexts.
    pub default_retention_policy: RetentionPolicy,
    // When true, a background scheduler periodically archives expired items.
    pub enable_auto_archiving: bool,
    // How often the retention scheduler wakes up.
    pub archiving_interval: std::time::Duration,
    pub max_memory_items_per_agent: usize,
    pub max_knowledge_items_per_agent: usize,
    // Connection settings for the Qdrant vector database.
    pub qdrant_config: QdrantConfig,
    pub enable_vector_db: bool,
    // File-storage layout used when `enable_persistence` is set.
    pub persistence_config: FilePersistenceConfig,
    pub enable_persistence: bool,
    pub secrets_config: SecretsConfig,
}
impl Default for ContextManagerConfig {
fn default() -> Self {
use std::path::PathBuf;
Self {
max_contexts_in_memory: 1000,
default_retention_policy: RetentionPolicy::default(),
enable_auto_archiving: true,
archiving_interval: std::time::Duration::from_secs(3600), max_memory_items_per_agent: 10000,
max_knowledge_items_per_agent: 5000,
qdrant_config: QdrantConfig::default(),
enable_vector_db: true,
persistence_config: FilePersistenceConfig::default(),
enable_persistence: true,
secrets_config: SecretsConfig::file_json(PathBuf::from("secrets.json")),
}
}
}
/// Relative weights used by `calculate_importance` when blending the
/// scoring signals for a memory item.
#[derive(Debug, Clone)]
struct ImportanceWeights {
    // Weight of the item's own `importance` field.
    pub base_importance: f32,
    // Weight of the (log-scaled) access count.
    pub access_frequency: f32,
    // Weight of the recency decay factor.
    pub recency: f32,
    // Weight of metadata-derived user feedback.
    pub user_feedback: f32,
    // Score substituted for access frequency when the item was never accessed.
    pub no_access_penalty: f32,
}
impl Default for ImportanceWeights {
fn default() -> Self {
Self {
base_importance: 0.3,
access_frequency: 0.25,
recency: 0.3,
user_feedback: 0.15,
no_access_penalty: 0.1,
}
}
}
/// A knowledge entry one agent has shared, plus sharing bookkeeping.
#[derive(Debug, Clone)]
struct SharedKnowledgeItem {
    knowledge: Knowledge,
    // Agent that originally shared the item.
    source_agent: AgentId,
    // Access level granted to the receiving agent.
    access_level: AccessLevel,
    created_at: SystemTime,
    // How many times the shared item has been accessed.
    access_count: u32,
}
/// Snapshot of items removed from a live context by retention archiving;
/// serialized by `save_archived_context_static` under
/// `archives/<agent_id>/archive_<ts>.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ArchivedContext {
    agent_id: AgentId,
    // When this archive snapshot was taken.
    archived_at: SystemTime,
    // Human-readable explanation, including the cutoff timestamp.
    archive_reason: String,
    memory: HierarchicalMemory,
    conversation_history: Vec<ConversationItem>,
    knowledge_base: KnowledgeBase,
    metadata: HashMap<String, String>,
}
impl ArchivedContext {
fn new(agent_id: AgentId, before: SystemTime) -> Self {
Self {
agent_id,
archived_at: SystemTime::now(),
archive_reason: format!("Archiving items before {:?}", before),
memory: HierarchicalMemory::default(),
conversation_history: Vec::new(),
knowledge_base: KnowledgeBase::default(),
metadata: HashMap::new(),
}
}
}
/// File-system backed [`ContextPersistence`]: one (optionally gzipped)
/// JSON file per agent, plus timestamped backup copies.
pub struct FilePersistence {
    config: FilePersistenceConfig,
}
impl FilePersistence {
    /// Create a persistence layer rooted at the paths in `config`.
    pub fn new(config: FilePersistenceConfig) -> Self {
        Self { config }
    }

    /// Ensure the on-disk directory layout exists.
    pub async fn initialize(&self) -> Result<(), ContextError> {
        self.config
            .ensure_directories()
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to create storage directories: {}", e),
            })?;
        Ok(())
    }

    /// Path of the live context file for `agent_id`: `<id>.json.gz` when
    /// compression is enabled, plain `<id>.json` otherwise.
    fn get_context_path(&self, agent_id: AgentId) -> PathBuf {
        let filename = if self.config.enable_compression {
            format!("{}.json.gz", agent_id)
        } else {
            format!("{}.json", agent_id)
        };
        self.config.agent_contexts_path().join(filename)
    }

    /// Serialize `context` to pretty JSON, gzip-compressing when enabled.
    async fn serialize_context(&self, context: &AgentContext) -> Result<Vec<u8>, ContextError> {
        let json_data =
            serde_json::to_vec_pretty(context).map_err(|e| ContextError::SerializationError {
                reason: format!("Failed to serialize context: {}", e),
            })?;
        if self.config.enable_compression {
            use flate2::write::GzEncoder;
            use flate2::Compression;
            use std::io::Write;
            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
            encoder
                .write_all(&json_data)
                .map_err(|e| ContextError::SerializationError {
                    reason: format!("Failed to compress context: {}", e),
                })?;
            encoder
                .finish()
                .map_err(|e| ContextError::SerializationError {
                    reason: format!("Failed to finalize compression: {}", e),
                })
        } else {
            Ok(json_data)
        }
    }

    /// Inverse of `serialize_context`: gunzip (when enabled) then parse JSON.
    async fn deserialize_context(&self, data: Vec<u8>) -> Result<AgentContext, ContextError> {
        let json_data = if self.config.enable_compression {
            use flate2::read::GzDecoder;
            use std::io::Read;
            let mut decoder = GzDecoder::new(&data[..]);
            let mut decompressed = Vec::new();
            decoder.read_to_end(&mut decompressed).map_err(|e| {
                ContextError::SerializationError {
                    reason: format!("Failed to decompress context: {}", e),
                }
            })?;
            decompressed
        } else {
            data
        };
        serde_json::from_slice(&json_data).map_err(|e| ContextError::SerializationError {
            reason: format!("Failed to deserialize context: {}", e),
        })
    }

    /// Copy the current context file to a timestamped backup, then prune old
    /// backups beyond `config.backup_count`.
    ///
    /// Bug fix: the backup name is now built explicitly as
    /// `<agent_id>.backup.<secs>.<ext>` so it always matches the
    /// `<agent_id>.backup.` prefix `cleanup_old_backups` scans for. The
    /// previous `Path::with_extension` approach replaced only the final
    /// extension, producing `<id>.json.backup.<secs>.json` for compressed
    /// contexts — a name the cleanup scan never matched, so compressed
    /// backups accumulated without bound.
    async fn create_backup(&self, agent_id: AgentId) -> Result<(), ContextError> {
        let context_path = self.get_context_path(agent_id);
        if !context_path.exists() {
            return Ok(());
        }
        let secs = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // Preserve the live file's extension so the backup's format is evident.
        let ext = if self.config.enable_compression {
            "json.gz"
        } else {
            "json"
        };
        let backup_path = self
            .config
            .agent_contexts_path()
            .join(format!("{}.backup.{}.{}", agent_id, secs, ext));
        fs::copy(&context_path, &backup_path)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to create backup: {}", e),
            })?;
        self.cleanup_old_backups(agent_id).await?;
        Ok(())
    }

    /// Keep only the newest `config.backup_count` backups for `agent_id`;
    /// removal failures are logged, not fatal.
    async fn cleanup_old_backups(&self, agent_id: AgentId) -> Result<(), ContextError> {
        let mut backup_files = Vec::new();
        let mut dir = fs::read_dir(&self.config.agent_contexts_path())
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read storage directory: {}", e),
            })?;
        while let Some(entry) = dir
            .next_entry()
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read directory entry: {}", e),
            })?
        {
            let path = entry.path();
            if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
                if filename.starts_with(&format!("{}.backup.", agent_id)) {
                    if let Ok(metadata) = entry.metadata().await {
                        if let Ok(modified) = metadata.modified() {
                            backup_files.push((path, modified));
                        }
                    }
                }
            }
        }
        // Newest first; everything past `backup_count` gets removed.
        backup_files.sort_by(|a, b| b.1.cmp(&a.1));
        for (path, _) in backup_files.into_iter().skip(self.config.backup_count) {
            if let Err(e) = fs::remove_file(&path).await {
                eprintln!(
                    "Warning: Failed to remove old backup {}: {}",
                    path.display(),
                    e
                );
            }
        }
        Ok(())
    }
}
#[async_trait]
impl ContextPersistence for FilePersistence {
    /// Back up the previous file, then write and fsync the new context.
    async fn save_context(
        &self,
        agent_id: AgentId,
        context: &AgentContext,
    ) -> Result<(), ContextError> {
        self.create_backup(agent_id).await?;
        let data = self.serialize_context(context).await?;
        let context_path = self.get_context_path(agent_id);
        let mut file =
            fs::File::create(&context_path)
                .await
                .map_err(|e| ContextError::StorageError {
                    reason: format!("Failed to create context file: {}", e),
                })?;
        file.write_all(&data)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to write context data: {}", e),
            })?;
        // Flush to disk so a crash cannot leave a partially written context.
        file.sync_all()
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to sync context file: {}", e),
            })?;
        Ok(())
    }

    /// Load and deserialize the agent's context; `Ok(None)` when no file exists.
    async fn load_context(&self, agent_id: AgentId) -> Result<Option<AgentContext>, ContextError> {
        let context_path = self.get_context_path(agent_id);
        if !context_path.exists() {
            return Ok(None);
        }
        let mut file =
            fs::File::open(&context_path)
                .await
                .map_err(|e| ContextError::StorageError {
                    reason: format!("Failed to open context file: {}", e),
                })?;
        let mut data = Vec::new();
        file.read_to_end(&mut data)
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read context file: {}", e),
            })?;
        let context = self.deserialize_context(data).await?;
        Ok(Some(context))
    }

    /// Remove the agent's live context file; a no-op if it does not exist.
    async fn delete_context(&self, agent_id: AgentId) -> Result<(), ContextError> {
        let context_path = self.get_context_path(agent_id);
        if context_path.exists() {
            fs::remove_file(&context_path)
                .await
                .map_err(|e| ContextError::StorageError {
                    reason: format!("Failed to delete context file: {}", e),
                })?;
        }
        Ok(())
    }

    /// Enumerate agents with a stored context by scanning the storage dir.
    /// Backup files are excluded implicitly: their stems are not valid UUIDs.
    async fn list_agent_contexts(&self) -> Result<Vec<AgentId>, ContextError> {
        let mut agent_ids = Vec::new();
        let mut dir = fs::read_dir(&self.config.agent_contexts_path())
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read storage directory: {}", e),
            })?;
        while let Some(entry) = dir
            .next_entry()
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read directory entry: {}", e),
            })?
        {
            let path = entry.path();
            if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
                if filename.ends_with(".json") || filename.ends_with(".json.gz") {
                    let agent_id_str = filename
                        .strip_suffix(".json.gz")
                        .or_else(|| filename.strip_suffix(".json"))
                        .unwrap_or(filename);
                    if let Ok(uuid) = uuid::Uuid::parse_str(agent_id_str) {
                        agent_ids.push(AgentId(uuid));
                    }
                }
            }
        }
        Ok(agent_ids)
    }

    /// True when a live context file exists for `agent_id`.
    async fn context_exists(&self, agent_id: AgentId) -> Result<bool, ContextError> {
        let context_path = self.get_context_path(agent_id);
        Ok(context_path.exists())
    }

    /// Count live context files and their total size.
    ///
    /// Bug fix: timestamped backup copies also end in `.json`, so they were
    /// previously counted as contexts, inflating both `total_contexts` and
    /// `total_size_bytes`. Files whose names contain `.backup.` are now skipped.
    async fn get_storage_stats(&self) -> Result<StorageStats, ContextError> {
        let mut total_contexts = 0;
        let mut total_size_bytes = 0;
        let mut dir = fs::read_dir(&self.config.agent_contexts_path())
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read storage directory: {}", e),
            })?;
        while let Some(entry) = dir
            .next_entry()
            .await
            .map_err(|e| ContextError::StorageError {
                reason: format!("Failed to read directory entry: {}", e),
            })?
        {
            let path = entry.path();
            if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
                // Skip backup copies so stats reflect live contexts only.
                if filename.contains(".backup.") {
                    continue;
                }
                if filename.ends_with(".json") || filename.ends_with(".json.gz") {
                    total_contexts += 1;
                    if let Ok(metadata) = entry.metadata().await {
                        total_size_bytes += metadata.len();
                    }
                }
            }
        }
        Ok(StorageStats {
            total_contexts,
            total_size_bytes,
            last_cleanup: SystemTime::now(),
            storage_path: self.config.agent_contexts_path(),
        })
    }

    /// Enables downcasting (used by `StandardContextManager::initialize`).
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
impl StandardContextManager {
/// Construct a manager from `config`; `agent_id` scopes the secrets store.
///
/// Fails only if the secrets store cannot be initialized.
pub async fn new(config: ContextManagerConfig, agent_id: &str) -> Result<Self, ContextError> {
    // NOTE(review): both arms construct the same QdrantClientWrapper; the
    // `else` arm was presumably meant to install a mock/no-op backend when
    // `enable_vector_db` is false — confirm intended behavior.
    let vector_db: Arc<dyn VectorDatabase> = if config.enable_vector_db {
        Arc::new(QdrantClientWrapper::new(config.qdrant_config.clone()))
    } else {
        Arc::new(QdrantClientWrapper::new(config.qdrant_config.clone()))
    };
    // Mock embeddings sized to match the configured vector dimension.
    let embedding_service = Arc::new(MockEmbeddingService::new(
        config.qdrant_config.vector_dimension,
    ));
    // NOTE(review): same pattern — both arms create FilePersistence, so
    // `enable_persistence` does not change the backend chosen here.
    let persistence: Arc<dyn ContextPersistence> = if config.enable_persistence {
        Arc::new(FilePersistence::new(config.persistence_config.clone()))
    } else {
        Arc::new(FilePersistence::new(config.persistence_config.clone()))
    };
    let secrets = crate::secrets::new_secret_store(&config.secrets_config, agent_id)
        .await
        .map_err(|e| ContextError::StorageError {
            reason: format!("Failed to initialize secrets store: {}", e),
        })?;
    let policy_engine: Arc<dyn PolicyEngine> = Arc::new(MockPolicyEngine::new());
    Ok(Self {
        contexts: Arc::new(RwLock::new(HashMap::new())),
        config,
        shared_knowledge: Arc::new(RwLock::new(HashMap::new())),
        vector_db,
        embedding_service,
        persistence,
        secrets,
        policy_engine,
        shutdown_flag: Arc::new(RwLock::new(false)),
        background_tasks: Arc::new(RwLock::new(Vec::new())),
    })
}
/// Borrow the manager's secret store.
pub fn secrets(&self) -> &(dyn SecretStore + Send + Sync) {
    &*self.secrets
}
/// Bring up external resources: vector DB collection, persistence
/// directories, previously saved contexts, and the retention scheduler.
pub async fn initialize(&self) -> Result<(), ContextError> {
    if self.config.enable_vector_db {
        self.vector_db.initialize().await?;
    }
    if self.config.enable_persistence {
        // Only the file backend needs directory setup; the downcast is a
        // no-op for any other ContextPersistence implementation.
        if let Some(file_persistence) =
            self.persistence.as_any().downcast_ref::<FilePersistence>()
        {
            file_persistence.initialize().await?;
        }
        self.load_existing_contexts().await?;
    }
    self.setup_retention_scheduler().await?;
    Ok(())
}
/// Hydrate the in-memory map from every context found on disk.
///
/// Improvement: contexts are loaded from disk *before* taking the map's
/// write lock, so the lock is no longer held across file-I/O awaits
/// (previously it blocked all readers/writers for the whole load).
async fn load_existing_contexts(&self) -> Result<(), ContextError> {
    let agent_ids = self.persistence.list_agent_contexts().await?;
    let mut loaded = Vec::with_capacity(agent_ids.len());
    for agent_id in agent_ids {
        if let Some(context) = self.persistence.load_context(agent_id).await? {
            loaded.push((agent_id, context));
        }
    }
    // Single short critical section to publish everything at once.
    let mut contexts = self.contexts.write().await;
    contexts.extend(loaded);
    Ok(())
}
/// Idempotent shutdown: stop background tasks, then flush all contexts.
///
/// Fix: the previous implementation checked the flag under a *read* lock
/// and set it under a separate *write* lock, so two concurrent callers
/// could both pass the check and run the shutdown sequence twice. The
/// check-and-set now happens atomically under one write lock.
pub async fn shutdown(&self) -> Result<(), ContextError> {
    {
        let mut shutdown_flag = self.shutdown_flag.write().await;
        if *shutdown_flag {
            tracing::info!("ContextManager already shutdown, skipping");
            return Ok(());
        }
        *shutdown_flag = true;
    }
    tracing::info!("Starting ContextManager shutdown sequence");
    self.stop_background_tasks().await?;
    self.save_all_contexts().await?;
    tracing::info!("Vector database connections will be closed when client is dropped");
    tracing::info!("Secrets store cleanup handled by Drop trait");
    tracing::info!("ContextManager shutdown completed successfully");
    Ok(())
}
async fn stop_background_tasks(&self) -> Result<(), ContextError> {
let mut tasks = self.background_tasks.write().await;
if tasks.is_empty() {
tracing::debug!("No background tasks to stop");
return Ok(());
}
tracing::info!("Stopping {} background tasks", tasks.len());
for task in tasks.drain(..) {
task.abort();
match tokio::time::timeout(std::time::Duration::from_secs(5), task).await {
Ok(result) => {
match result {
Ok(_) => tracing::debug!("Background task completed successfully"),
Err(e) if e.is_cancelled() => tracing::debug!("Background task was cancelled"),
Err(e) => tracing::warn!("Background task finished with error: {}", e),
}
}
Err(_) => tracing::warn!("Background task did not finish within timeout"),
}
}
tracing::info!("All background tasks stopped");
Ok(())
}
/// Persist every in-memory context; used during shutdown.
///
/// Attempts all saves even when some fail, then returns only the *first*
/// error (the rest are logged). Holds the contexts read lock for the whole
/// pass, so writers are blocked until saving completes.
async fn save_all_contexts(&self) -> Result<(), ContextError> {
    if !self.config.enable_persistence {
        tracing::debug!("Persistence disabled, skipping context save");
        return Ok(());
    }
    let contexts = self.contexts.read().await;
    if contexts.is_empty() {
        tracing::debug!("No contexts to save");
        return Ok(());
    }
    tracing::info!("Saving {} contexts to persistent storage", contexts.len());
    let mut save_errors = Vec::new();
    for (agent_id, context) in contexts.iter() {
        match self.persistence.save_context(*agent_id, context).await {
            Ok(_) => tracing::debug!("Saved context for agent {}", agent_id),
            Err(e) => {
                tracing::error!("Failed to save context for agent {}: {}", agent_id, e);
                save_errors.push((*agent_id, e));
            }
        }
    }
    if !save_errors.is_empty() {
        let error_msg = format!(
            "Failed to save {} out of {} contexts during shutdown",
            save_errors.len(),
            contexts.len()
        );
        tracing::error!("{}", error_msg);
        // Surface the first failure to the caller; the others were logged above.
        if let Some((agent_id, error)) = save_errors.into_iter().next() {
            return Err(ContextError::StorageError {
                reason: format!("Failed to save context for agent {}: {}", agent_id, error),
            });
        }
    }
    tracing::info!("All contexts saved successfully");
    Ok(())
}
async fn setup_retention_scheduler(&self) -> Result<(), ContextError> {
if !self.config.enable_auto_archiving {
tracing::debug!("Auto-archiving disabled, skipping retention scheduler setup");
return Ok(());
}
let contexts = self.contexts.clone();
let persistence = self.persistence.clone();
let config = self.config.clone();
let shutdown_flag = self.shutdown_flag.clone();
let task = tokio::spawn(async move {
let mut interval = tokio::time::interval(config.archiving_interval);
tracing::info!("Retention policy scheduler started with interval {:?}", config.archiving_interval);
loop {
tokio::select! {
_ = interval.tick() => {
if *shutdown_flag.read().await {
tracing::info!("Retention scheduler shutting down");
break;
}
Self::run_retention_check(&contexts, &persistence, &config).await;
}
}
}
});
self.add_background_task(task).await;
tracing::info!("Retention policy scheduler initialized successfully");
Ok(())
}
/// One pass of the retention scheduler: archive expired items for every
/// agent that has any, persisting trimmed contexts as it goes.
///
/// Associated fn (no `&self`) so the spawned scheduler task can call it
/// with cloned handles. Takes the contexts read lock briefly to plan,
/// then re-locks per agent inside `archive_agent_context_static`.
async fn run_retention_check(
    contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
    persistence: &Arc<dyn ContextPersistence>,
    config: &ContextManagerConfig,
) {
    let current_time = SystemTime::now();
    // Snapshot (agent, expired-item-count) pairs under the read lock so
    // the lock is released before any archiving work begins.
    let agents_to_check: Vec<(AgentId, usize)> = {
        let context_guard = contexts.read().await;
        let agent_count = context_guard.len();
        if agent_count == 0 {
            tracing::debug!("No agent contexts to process for retention");
            return;
        }
        tracing::info!("Starting retention check for {} agent contexts", agent_count);
        context_guard.iter()
            .map(|(agent_id, context)| {
                let retention_stats = Self::calculate_retention_statistics_static(context);
                (*agent_id, retention_stats.items_to_archive)
            })
            .collect()
    };
    let start_time = std::time::Instant::now();
    let total_agents = agents_to_check.len();
    for (agent_id, items_to_archive) in agents_to_check {
        if items_to_archive > 0 {
            tracing::debug!(
                "Agent {} has {} items eligible for archiving",
                agent_id,
                items_to_archive
            );
            let archive_result = Self::archive_agent_context_static(
                agent_id,
                current_time,
                contexts,
                persistence,
                config
            ).await;
            // Per-agent failures are logged and do not abort the pass.
            match archive_result {
                Ok(archived_count) => {
                    tracing::info!("Successfully archived {} items for agent {}", archived_count, agent_id);
                }
                Err(e) => {
                    tracing::error!("Failed to archive context for agent {}: {}", agent_id, e);
                }
            }
        }
    }
    let elapsed = start_time.elapsed();
    tracing::info!("Retention check completed for {} agents in {:?}", total_agents, elapsed);
}
/// Count how many items in `context` fall outside its retention windows.
///
/// Memory items expire when created *or* last accessed before the memory
/// cutoff; conversation items on timestamp; knowledge facts on creation.
/// `items_to_delete` is currently always 0 (deletion is not implemented).
///
/// Improvement: the four manual counting loops were replaced with
/// equivalent iterator `filter(...).count()` chains.
fn calculate_retention_statistics_static(context: &AgentContext) -> RetentionStatus {
    let now = SystemTime::now();
    let retention_policy = &context.retention_policy;
    // Saturate to the epoch if a retention window exceeds the clock value.
    let memory_cutoff = now
        .checked_sub(retention_policy.memory_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let knowledge_cutoff = now
        .checked_sub(retention_policy.knowledge_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let conversation_cutoff = now
        .checked_sub(retention_policy.session_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let expired_memory = context
        .memory
        .short_term
        .iter()
        .chain(context.memory.long_term.iter())
        .filter(|item| item.created_at < memory_cutoff || item.last_accessed < memory_cutoff)
        .count();
    let expired_conversations = context
        .conversation_history
        .iter()
        .filter(|item| item.timestamp < conversation_cutoff)
        .count();
    let expired_facts = context
        .knowledge_base
        .facts
        .iter()
        .filter(|fact| fact.created_at < knowledge_cutoff)
        .count();
    RetentionStatus {
        items_to_archive: expired_memory + expired_conversations + expired_facts,
        items_to_delete: 0,
        // Next scheduled cleanup is nominally one day out.
        next_cleanup: now + Duration::from_secs(86400),
    }
}
/// Move expired items for `agent_id` out of the live context into an
/// `ArchivedContext`, persisting both (archive file first, then the
/// trimmed live context) when persistence is enabled. Returns the number
/// of items archived.
///
/// Cutoffs are `before` minus each retention window from the agent's own
/// policy. Holds the contexts write lock for the entire operation,
/// including the persistence writes at the end.
async fn archive_agent_context_static(
    agent_id: AgentId,
    before: SystemTime,
    contexts: &Arc<RwLock<HashMap<AgentId, AgentContext>>>,
    persistence: &Arc<dyn ContextPersistence>,
    config: &ContextManagerConfig,
) -> Result<u32, ContextError> {
    let mut total_archived = 0u32;
    let mut archived_context = ArchivedContext::new(agent_id, before);
    let mut contexts_guard = contexts.write().await;
    if let Some(context) = contexts_guard.get_mut(&agent_id) {
        let retention_policy = &context.retention_policy;
        // Saturate to the epoch if a retention window exceeds `before`.
        let memory_cutoff = before
            .checked_sub(retention_policy.memory_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);
        let conversation_cutoff = before
            .checked_sub(retention_policy.session_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);
        let knowledge_cutoff = before
            .checked_sub(retention_policy.knowledge_retention)
            .unwrap_or(SystemTime::UNIX_EPOCH);
        // Partition short-term memory: items expired by creation OR last
        // access move to the archive; the rest are retained.
        let mut retained_short_term = Vec::new();
        for item in context.memory.short_term.drain(..) {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                archived_context.memory.short_term.push(item);
                total_archived += 1;
            } else {
                retained_short_term.push(item);
            }
        }
        context.memory.short_term = retained_short_term;
        // Same partition for long-term memory.
        let mut retained_long_term = Vec::new();
        for item in context.memory.long_term.drain(..) {
            if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
                archived_context.memory.long_term.push(item);
                total_archived += 1;
            } else {
                retained_long_term.push(item);
            }
        }
        context.memory.long_term = retained_long_term;
        // Conversation items expire on timestamp alone.
        let mut retained_conversations = Vec::new();
        for item in context.conversation_history.drain(..) {
            if item.timestamp < conversation_cutoff {
                archived_context.conversation_history.push(item);
                total_archived += 1;
            } else {
                retained_conversations.push(item);
            }
        }
        context.conversation_history = retained_conversations;
        // Knowledge facts expire on creation time alone.
        let mut retained_facts = Vec::new();
        for fact in context.knowledge_base.facts.drain(..) {
            if fact.created_at < knowledge_cutoff {
                archived_context.knowledge_base.facts.push(fact);
                total_archived += 1;
            } else {
                retained_facts.push(fact);
            }
        }
        context.knowledge_base.facts = retained_facts;
        if total_archived > 0 {
            // Record archiving bookkeeping in the live context's metadata.
            context.updated_at = SystemTime::now();
            context.metadata.insert(
                "last_archived".to_string(),
                SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
                    .to_string(),
            );
            context.metadata.insert(
                "archived_count".to_string(),
                total_archived.to_string(),
            );
            if config.enable_persistence {
                // Archive file first so data is never lost if the second
                // write (trimmed live context) fails.
                Self::save_archived_context_static(agent_id, &archived_context, persistence, config).await?;
                persistence.save_context(agent_id, context).await?;
            }
        }
    }
    Ok(total_archived)
}
/// Write `archived_context` as pretty JSON under
/// `<contexts>/archives/<agent_id>/archive_<secs>.json`.
/// (`_persistence` is unused; archives go straight to the file system.)
async fn save_archived_context_static(
    agent_id: AgentId,
    archived_context: &ArchivedContext,
    _persistence: &Arc<dyn ContextPersistence>,
    config: &ContextManagerConfig,
) -> Result<(), ContextError> {
    let base = config.persistence_config.agent_contexts_path();
    let archive_dir = base.join("archives").join(agent_id.to_string());
    tokio::fs::create_dir_all(&archive_dir)
        .await
        .map_err(|e| ContextError::StorageError {
            reason: format!("Failed to create archive directory: {}", e),
        })?;
    // File name is the archive's capture time in seconds since the epoch.
    let secs = archived_context
        .archived_at
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let archive_path = archive_dir.join(format!("archive_{}.json", secs));
    let payload = serde_json::to_vec_pretty(archived_context).map_err(|e| {
        ContextError::SerializationError {
            reason: format!("Failed to serialize archived context: {}", e),
        }
    })?;
    tokio::fs::write(&archive_path, &payload)
        .await
        .map_err(|e| ContextError::StorageError {
            reason: format!("Failed to write archive file: {}", e),
        })?;
    tracing::debug!(
        "Saved archived context for agent {} to {}",
        agent_id,
        archive_path.display()
    );
    Ok(())
}
/// Register a spawned task so `shutdown()` can abort and await it later.
pub async fn add_background_task(&self, task: tokio::task::JoinHandle<()>) {
    self.background_tasks.write().await.push(task);
}
/// True once `shutdown()` has run.
pub async fn is_shutdown(&self) -> bool {
    *self.shutdown_flag.read().await
}
/// Create and store a fresh, empty context for `agent_id` under a new
/// session id (which is returned). Uses the configured default retention
/// policy.
pub async fn create_session(&self, agent_id: AgentId) -> Result<SessionId, ContextError> {
    let session_id = SessionId::new();
    let fresh_context = AgentContext {
        agent_id,
        session_id,
        memory: HierarchicalMemory::default(),
        knowledge_base: KnowledgeBase::default(),
        conversation_history: Vec::new(),
        metadata: HashMap::new(),
        created_at: SystemTime::now(),
        updated_at: SystemTime::now(),
        retention_policy: self.config.default_retention_policy.clone(),
    };
    // Fully-qualified call disambiguates the trait method from inherent ones.
    ContextManager::store_context(self, agent_id, fresh_context).await?;
    Ok(session_id)
}
/// Gate `operation` for `agent_id`.
///
/// NOTE(review): despite the log text, this does NOT consult
/// `self.policy_engine` — it is a hard-coded default that denies only
/// "archive_context" and allows everything else. The "Policy engine
/// allowed" debug message is therefore misleading; confirm whether the
/// real policy engine should be invoked here.
async fn validate_access(
    &self,
    agent_id: AgentId,
    operation: &str,
) -> Result<(), ContextError> {
    let is_dangerous_operation = matches!(operation, "archive_context");
    if is_dangerous_operation {
        tracing::warn!("Potentially dangerous operation {} denied for agent {} by default policy", operation, agent_id);
        Err(ContextError::AccessDenied {
            reason: format!("Operation {} requires explicit approval", operation),
        })
    } else {
        tracing::debug!("Policy engine allowed {} for agent {}", operation, agent_id);
        Ok(())
    }
}
/// Produce an embedding vector for `content` via the embedding service.
async fn generate_embeddings(&self, content: &str) -> Result<Vec<f32>, ContextError> {
    let service = self.embedding_service.as_ref();
    service.generate_embedding(content).await
}
/// Semantic search over the agent's memory, returning at most `limit` hits.
///
/// With the vector DB enabled, delegates to embedding-based search at a
/// fixed 0.7 similarity threshold; otherwise falls back to a
/// case-insensitive substring scan of short-term memory, scoring each hit
/// as the mean of its importance and a fixed 0.8.
async fn semantic_search_memory(
    &self,
    agent_id: AgentId,
    query: &str,
    limit: usize,
) -> Result<Vec<ContextItem>, ContextError> {
    if !self.config.enable_vector_db {
        let contexts = self.contexts.read().await;
        let context = match contexts.get(&agent_id) {
            Some(c) => c,
            None => return Ok(Vec::new()),
        };
        let needle = query.to_lowercase();
        let mut matches = Vec::new();
        for item in &context.memory.short_term {
            if !item.content.to_lowercase().contains(&needle) {
                continue;
            }
            let importance = self.calculate_importance(item);
            matches.push(ContextItem {
                id: item.id,
                content: item.content.clone(),
                item_type: ContextItemType::Memory(item.memory_type.clone()),
                relevance_score: (importance + 0.8) / 2.0,
                timestamp: item.created_at,
                metadata: item.metadata.clone(),
            });
        }
        matches.truncate(limit);
        return Ok(matches);
    }
    let query_embedding = self.generate_embeddings(query).await?;
    // Fixed similarity floor for vector hits.
    let threshold = 0.7;
    self.vector_db
        .semantic_search(agent_id, query_embedding, limit, threshold)
        .await
}
/// Overall importance of a memory item in [0, 1]: a weighted blend of its
/// base importance, access frequency, recency, and user feedback, scaled
/// by memory-type and age-decay multipliers.
fn calculate_importance(&self, memory_item: &MemoryItem) -> f32 {
    let weights = ImportanceWeights::default();
    let base_score = memory_item.importance.clamp(0.0, 1.0);
    // Never-accessed items get a flat penalty score; otherwise log-scale
    // the access count and cap at 1.0.
    let access_score = match memory_item.access_count {
        0 => weights.no_access_penalty,
        n => ((n as f32 + 1.0).ln() / 10.0).min(1.0),
    };
    let recency_score = self.calculate_recency_factor(memory_item);
    let feedback_score = self.extract_user_feedback_score(memory_item);
    let weighted_sum = base_score * weights.base_importance
        + access_score * weights.access_frequency
        + recency_score * weights.recency
        + feedback_score * weights.user_feedback;
    let scaled = weighted_sum
        * self.get_memory_type_multiplier(&memory_item.memory_type)
        * self.calculate_age_decay(memory_item);
    scaled.clamp(0.0, 1.0)
}
/// Recency factor in (0, 1]: exponential decay with a 24-hour half-life
/// from the later of creation and last access, floored at 0.01.
fn calculate_recency_factor(&self, memory_item: &MemoryItem) -> f32 {
    let most_recent = memory_item.last_accessed.max(memory_item.created_at);
    // A clock that appears to run backwards yields zero elapsed time.
    let elapsed = SystemTime::now()
        .duration_since(most_recent)
        .unwrap_or_else(|_| std::time::Duration::from_secs(0));
    let hours_since_access = elapsed.as_secs() as f32 / 3600.0;
    let half_life_hours = 24.0;
    let decay_factor = 2.0_f32.powf(-hours_since_access / half_life_hours);
    decay_factor.max(0.01)
}
/// Derive a user-feedback score in [0, 1] from the item's metadata.
///
/// Starts neutral (0.5) and adjusts from, in order: an explicit
/// "user_rating" (1-5 stars, normalized), a "helpful" flag, negative
/// markers ("corrected"/"incorrect" cap at 0.3), positive markers
/// ("bookmarked"/"favorite" floor at 0.9), and a "usage_context" hint.
fn extract_user_feedback_score(&self, memory_item: &MemoryItem) -> f32 {
    let meta = &memory_item.metadata;
    let mut score = 0.5;
    if let Some(rating) = meta.get("user_rating").and_then(|s| s.parse::<f32>().ok()) {
        score = (rating / 5.0).clamp(0.0, 1.0);
    }
    if let Some(helpful) = meta.get("helpful") {
        match helpful.to_lowercase().as_str() {
            "true" | "yes" | "1" => score = score.max(0.8),
            "false" | "no" | "0" => score = score.min(0.2),
            _ => {}
        }
    }
    if meta.contains_key("corrected") || meta.contains_key("incorrect") {
        score = score.min(0.3);
    }
    if meta.contains_key("bookmarked") || meta.contains_key("favorite") {
        score = score.max(0.9);
    }
    if let Some(context) = meta.get("usage_context") {
        match context.to_lowercase().as_str() {
            "critical" | "important" => score = score.max(0.95),
            "routine" | "common" => score = score.max(0.7),
            "experimental" | "trial" => score = score.min(0.4),
            _ => {}
        }
    }
    score.clamp(0.0, 1.0)
}
/// Per-type importance multiplier: working memory weighted highest,
/// episodic slightly below baseline.
fn get_memory_type_multiplier(&self, memory_type: &MemoryType) -> f32 {
    match memory_type {
        MemoryType::Working => 1.3,
        MemoryType::Procedural => 1.2,
        MemoryType::Semantic => 1.1,
        MemoryType::Factual => 1.0,
        MemoryType::Episodic => 0.9,
    }
}
/// Age decay in (0, 1]: exponential in days since creation with a
/// per-type rate (working memory fades fastest, procedural slowest),
/// floored at 0.05.
fn calculate_age_decay(&self, memory_item: &MemoryItem) -> f32 {
    let age = SystemTime::now()
        .duration_since(memory_item.created_at)
        .unwrap_or_else(|_| std::time::Duration::from_secs(0));
    let days_old = age.as_secs() as f32 / 86400.0;
    let decay_rate = match memory_item.memory_type {
        MemoryType::Working => 0.1,
        MemoryType::Episodic => 0.02,
        MemoryType::Factual => 0.01,
        MemoryType::Semantic => 0.008,
        MemoryType::Procedural => 0.005,
    };
    ((-decay_rate * days_old).exp()).max(0.05)
}
async fn keyword_search_memory(
&self,
agent_id: AgentId,
query: &ContextQuery,
) -> Result<Vec<ContextItem>, ContextError> {
let contexts = self.contexts.read().await;
if let Some(context) = contexts.get(&agent_id) {
let mut results = Vec::new();
let search_terms: Vec<String> = query.search_terms.iter()
.map(|term| term.to_lowercase())
.collect();
if search_terms.is_empty() {
return Ok(results);
}
for memory_item in context.memory.short_term.iter()
.chain(context.memory.long_term.iter()) {
if !query.memory_types.is_empty() &&
!query.memory_types.contains(&memory_item.memory_type) {
continue;
}
let content_lower = memory_item.content.to_lowercase();
let mut match_score = 0.0f32;
let mut matched_terms = 0;
for term in &search_terms {
if content_lower.contains(term) {
matched_terms += 1;
if content_lower.split_whitespace().any(|word| word == term) {
match_score += 1.0;
} else {
match_score += 0.5;
}
}
}
if matched_terms > 0 {
let importance_score = self.calculate_importance(memory_item);
let term_coverage = matched_terms as f32 / search_terms.len() as f32;
let relevance_score = (match_score * term_coverage + importance_score) / 2.0;
if relevance_score >= query.relevance_threshold {
results.push(ContextItem {
id: memory_item.id,
content: memory_item.content.clone(),
item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
relevance_score,
timestamp: memory_item.created_at,
metadata: memory_item.metadata.clone(),
});
}
}
}
for episode in &context.memory.episodic_memory {
let episode_content = format!("{} {}", episode.title, episode.description);
let content_lower = episode_content.to_lowercase();
let mut match_score = 0.0f32;
let mut matched_terms = 0;
for term in &search_terms {
if content_lower.contains(term) {
matched_terms += 1;
match_score += if content_lower.split_whitespace().any(|word| word == term) {
1.0
} else {
0.5
};
}
}
if matched_terms > 0 {
let term_coverage = matched_terms as f32 / search_terms.len() as f32;
let relevance_score = (match_score * term_coverage + episode.importance) / 2.0;
if relevance_score >= query.relevance_threshold {
results.push(ContextItem {
id: episode.id,
content: episode_content,
item_type: ContextItemType::Episode,
relevance_score,
timestamp: episode.timestamp,
metadata: HashMap::new(),
});
}
}
}
for conv_item in &context.conversation_history {
let content_lower = conv_item.content.to_lowercase();
let mut match_score = 0.0f32;
let mut matched_terms = 0;
for term in &search_terms {
if content_lower.contains(term) {
matched_terms += 1;
match_score += if content_lower.split_whitespace().any(|word| word == term) {
1.0
} else {
0.5
};
}
}
if matched_terms > 0 {
let term_coverage = matched_terms as f32 / search_terms.len() as f32;
let relevance_score = match_score * term_coverage;
if relevance_score >= query.relevance_threshold {
results.push(ContextItem {
id: conv_item.id,
content: conv_item.content.clone(),
item_type: ContextItemType::Conversation,
relevance_score,
timestamp: conv_item.timestamp,
metadata: HashMap::new(),
});
}
}
}
results.sort_by(|a, b| b.relevance_score.partial_cmp(&a.relevance_score).unwrap_or(std::cmp::Ordering::Equal));
results.truncate(query.max_results);
Ok(results)
} else {
Ok(Vec::new())
}
}
/// Searches memory, episodes, and conversation history restricted to the
/// query's time range.
///
/// Returns an empty list when `query.time_range` is `None` or the agent has
/// no context. Results are sorted newest-first and truncated to
/// `query.max_results`.
///
/// Perf fix: search terms are lowercased once up front; the previous version
/// recomputed `term.to_lowercase()` for every item it examined.
async fn temporal_search_memory(
    &self,
    agent_id: AgentId,
    query: &ContextQuery,
) -> Result<Vec<ContextItem>, ContextError> {
    // Without a time range there is nothing to filter on.
    let time_range = match &query.time_range {
        Some(range) => range,
        None => return Ok(Vec::new()),
    };
    let contexts = self.contexts.read().await;
    let context = match contexts.get(&agent_id) {
        Some(context) => context,
        None => return Ok(Vec::new()),
    };
    // Hoist the loop-invariant lowercasing out of the per-item loops.
    let lowered_terms: Vec<String> = query
        .search_terms
        .iter()
        .map(|term| term.to_lowercase())
        .collect();
    // Empty term list means "no keyword filter" — everything passes.
    let matches_keywords = |content: &str| -> bool {
        if lowered_terms.is_empty() {
            return true;
        }
        let content_lower = content.to_lowercase();
        lowered_terms.iter().any(|term| content_lower.contains(term))
    };
    let mut results = Vec::new();
    // Short- and long-term memory: score = mean(importance, recency).
    for memory_item in context
        .memory
        .short_term
        .iter()
        .chain(context.memory.long_term.iter())
    {
        if !query.memory_types.is_empty()
            && !query.memory_types.contains(&memory_item.memory_type)
        {
            continue;
        }
        if memory_item.created_at < time_range.start
            || memory_item.created_at > time_range.end
        {
            continue;
        }
        if !matches_keywords(&memory_item.content) {
            continue;
        }
        let importance_score = self.calculate_importance(memory_item);
        let recency_score = self.calculate_recency_factor(memory_item);
        let relevance_score = (importance_score + recency_score) / 2.0;
        if relevance_score >= query.relevance_threshold {
            results.push(ContextItem {
                id: memory_item.id,
                content: memory_item.content.clone(),
                item_type: ContextItemType::Memory(memory_item.memory_type.clone()),
                relevance_score,
                timestamp: memory_item.created_at,
                metadata: memory_item.metadata.clone(),
            });
        }
    }
    // Episodes: relevance is the episode's own importance. Build the
    // title/description string once (the original formatted it twice).
    for episode in &context.memory.episodic_memory {
        if episode.timestamp < time_range.start || episode.timestamp > time_range.end {
            continue;
        }
        let episode_content = format!("{} {}", episode.title, episode.description);
        if matches_keywords(&episode_content)
            && episode.importance >= query.relevance_threshold
        {
            results.push(ContextItem {
                id: episode.id,
                content: episode_content,
                item_type: ContextItemType::Episode,
                relevance_score: episode.importance,
                timestamp: episode.timestamp,
                metadata: HashMap::new(),
            });
        }
    }
    // Conversation items: score decays linearly from 1.0 at the start of the
    // range to 0.0 at its end (a zero-length range scores 1.0).
    for conv_item in &context.conversation_history {
        if conv_item.timestamp < time_range.start || conv_item.timestamp > time_range.end {
            continue;
        }
        if !matches_keywords(&conv_item.content) {
            continue;
        }
        let time_since_start = conv_item
            .timestamp
            .duration_since(time_range.start)
            .unwrap_or_default()
            .as_secs() as f32;
        let range_duration = time_range
            .end
            .duration_since(time_range.start)
            .unwrap_or_default()
            .as_secs() as f32;
        let temporal_score = if range_duration > 0.0 {
            1.0 - (time_since_start / range_duration)
        } else {
            1.0
        };
        if temporal_score >= query.relevance_threshold {
            results.push(ContextItem {
                id: conv_item.id,
                content: conv_item.content.clone(),
                item_type: ContextItemType::Conversation,
                relevance_score: temporal_score,
                timestamp: conv_item.timestamp,
                metadata: HashMap::new(),
            });
        }
    }
    // Newest first, capped at the query limit.
    results.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
    results.truncate(query.max_results);
    Ok(results)
}
/// Embedding-similarity search over an agent's memory.
///
/// When the vector database is enabled and the query carries search terms,
/// ranking is delegated to the vector store. Otherwise a local brute-force
/// cosine scan runs over in-memory items that have stored embeddings;
/// embedding failures on this fallback path degrade to an empty result
/// rather than an error.
async fn similarity_search_memory(
    &self,
    agent_id: AgentId,
    query: &ContextQuery,
) -> Result<Vec<ContextItem>, ContextError> {
    if self.config.enable_vector_db && !query.search_terms.is_empty() {
        // Delegated path: embed the joined terms, let the store rank.
        let joined = query.search_terms.join(" ");
        let embedding = self.generate_embeddings(&joined).await?;
        return self
            .vector_db
            .semantic_search(agent_id, embedding, query.max_results, query.relevance_threshold)
            .await;
    }
    // Local fallback path.
    let contexts = self.contexts.read().await;
    let context = match contexts.get(&agent_id) {
        Some(ctx) => ctx,
        None => return Ok(Vec::new()),
    };
    if query.search_terms.is_empty() {
        return Ok(Vec::new());
    }
    let joined = query.search_terms.join(" ");
    let query_embedding = match self.generate_embeddings(&joined).await {
        Ok(embedding) => embedding,
        Err(_) => return Ok(Vec::new()),
    };
    let mut matches = Vec::new();
    let candidates = context
        .memory
        .short_term
        .iter()
        .chain(context.memory.long_term.iter());
    for item in candidates {
        // Honor the optional memory-type filter.
        if !query.memory_types.is_empty() && !query.memory_types.contains(&item.memory_type) {
            continue;
        }
        // Items without a stored embedding cannot be compared.
        let item_embedding = match item.embedding.as_ref() {
            Some(embedding) => embedding,
            None => continue,
        };
        let similarity = self.cosine_similarity(&query_embedding, item_embedding);
        if similarity < query.relevance_threshold {
            continue;
        }
        // Blend raw similarity with the item's computed importance.
        let blended = (similarity + self.calculate_importance(item)) / 2.0;
        matches.push(ContextItem {
            id: item.id,
            content: item.content.clone(),
            item_type: ContextItemType::Memory(item.memory_type.clone()),
            relevance_score: blended,
            timestamp: item.created_at,
            metadata: item.metadata.clone(),
        });
    }
    matches.sort_by(|a, b| {
        b.relevance_score
            .partial_cmp(&a.relevance_score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    matches.truncate(query.max_results);
    Ok(matches)
}
/// Hybrid search: runs the keyword and similarity strategies and fuses
/// their scores with a weighted sum (keyword 0.4, similarity 0.6), then
/// filters by the query's relevance threshold.
///
/// Bug fix: the previous version both summed the weighted scores *and* then
/// took `max(sum, weighted_similarity)` for items found by both strategies.
/// For the non-negative scores produced upstream the `max` was dead code,
/// and for any other input it silently contradicted the weighted-sum
/// fusion. The fused score is now the plain weighted sum.
async fn hybrid_search_memory(
    &self,
    agent_id: AgentId,
    query: &ContextQuery,
) -> Result<Vec<ContextItem>, ContextError> {
    let keyword_results = self.keyword_search_memory(agent_id, query).await?;
    let similarity_results = self.similarity_search_memory(agent_id, query).await?;
    const KEYWORD_WEIGHT: f32 = 0.4;
    const SIMILARITY_WEIGHT: f32 = 0.6;
    let mut combined_results: HashMap<ContextId, ContextItem> = HashMap::new();
    for mut item in keyword_results {
        item.relevance_score *= KEYWORD_WEIGHT;
        combined_results.insert(item.id, item);
    }
    for mut item in similarity_results {
        item.relevance_score *= SIMILARITY_WEIGHT;
        if let Some(existing_item) = combined_results.get_mut(&item.id) {
            // Found by both strategies: fuse via the weighted sum.
            existing_item.relevance_score += item.relevance_score;
        } else {
            combined_results.insert(item.id, item);
        }
    }
    let mut final_results: Vec<ContextItem> = combined_results.into_values().collect();
    final_results.sort_by(|a, b| {
        b.relevance_score
            .partial_cmp(&a.relevance_score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    // Filter after fusion so weak partial matches are dropped even if one
    // strategy scored them above the bar pre-weighting.
    final_results.retain(|item| item.relevance_score >= query.relevance_threshold);
    final_results.truncate(query.max_results);
    Ok(final_results)
}
/// Cosine similarity between two equal-length vectors.
///
/// Returns 0.0 for mismatched lengths or when either vector has zero
/// magnitude (both cases would make the quotient undefined).
fn cosine_similarity(&self, a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }
    // Single pass accumulating dot product and both squared norms; each
    // accumulator sums in the same left-to-right order as before.
    let (mut dot, mut norm_a_sq, mut norm_b_sq) = (0.0f32, 0.0f32, 0.0f32);
    for (&x, &y) in a.iter().zip(b.iter()) {
        dot += x * y;
        norm_a_sq += x * x;
        norm_b_sq += y * y;
    }
    let magnitude_a = norm_a_sq.sqrt();
    let magnitude_b = norm_b_sq.sqrt();
    if magnitude_a == 0.0 || magnitude_b == 0.0 {
        0.0
    } else {
        dot / (magnitude_a * magnitude_b)
    }
}
/// Scores how relevant `content` is to `query`, blending match quality,
/// term coverage, and the knowledge item's own `confidence`.
///
/// Returns `None` when the query is empty or no query term appears in the
/// content; otherwise a score clamped to `[0.0, 1.0]`.
fn calculate_knowledge_relevance(&self, content: &str, query: &str, confidence: f32) -> Option<f32> {
    let content_lower = content.to_lowercase();
    let query_lower = query.to_lowercase();
    let query_terms: Vec<&str> = query_lower.split_whitespace().collect();
    if query_terms.is_empty() {
        return None;
    }
    let mut matched_terms = 0;
    let mut total_match_score = 0.0f32;
    for term in &query_terms {
        if !content_lower.contains(term) {
            continue;
        }
        matched_terms += 1;
        // Whole-word hits count more than substring hits.
        let is_whole_word = content_lower.split_whitespace().any(|word| word == *term);
        total_match_score += if is_whole_word { 1.0 } else { 0.6 };
        // Small bonus for each repeat occurrence beyond the first.
        let occurrences = content_lower.matches(term).count();
        if occurrences > 1 {
            total_match_score += (occurrences as f32 - 1.0) * 0.1;
        }
    }
    if matched_terms == 0 {
        return None;
    }
    let term_count = query_terms.len() as f32;
    let term_coverage = matched_terms as f32 / term_count;
    let match_quality = total_match_score / term_count;
    // Weighted blend: match quality dominates, then coverage, then the
    // item's stored confidence.
    let base_score = match_quality * 0.5 + term_coverage * 0.3 + confidence * 0.2;
    // Shorter content is slightly favoured over long documents.
    let content_length_factor = match content.len() {
        0..=49 => 1.0,
        50..=199 => 0.95,
        _ => 0.9,
    };
    // Reward content that leads with the full query, or at least a term.
    let position_bonus = if content_lower.starts_with(&query_lower) {
        0.1
    } else if query_terms.iter().any(|term| content_lower.starts_with(term)) {
        0.05
    } else {
        0.0
    };
    Some(((base_score + position_bonus) * content_length_factor).clamp(0.0, 1.0))
}
/// Heuristic trust score for a shared knowledge item, clamped to
/// `[0.0, 1.0]`.
///
/// Starts from a neutral 0.5, grows logarithmically with how often the
/// item has been accessed, and adds a small bonus by knowledge kind
/// (facts > procedures > patterns).
fn calculate_trust_score(&self, shared_item: &SharedKnowledgeItem) -> f32 {
    let base = 0.5f32;
    // ln(access_count + 1) / 10: slow growth that one field cannot dominate.
    let access_factor = (shared_item.access_count as f32 + 1.0).ln() / 10.0;
    let kind_bonus = match &shared_item.knowledge {
        Knowledge::Fact(_) => 0.2,
        Knowledge::Procedure(_) => 0.1,
        Knowledge::Pattern(_) => 0.05,
    };
    (base + access_factor + kind_bonus).clamp(0.0, 1.0)
}
fn calculate_memory_size_bytes(&self, context: &AgentContext) -> usize {
let mut total_size = 0;
total_size += std::mem::size_of::<WorkingMemory>();
for (key, value) in &context.memory.working_memory.variables {
total_size += key.len();
total_size += estimate_json_value_size(value);
}
for goal in &context.memory.working_memory.active_goals {
total_size += goal.len();
}
if let Some(ref current_context) = context.memory.working_memory.current_context {
total_size += current_context.len();
}
for focus in &context.memory.working_memory.attention_focus {
total_size += focus.len();
}
for item in &context.memory.short_term {
total_size += estimate_memory_item_size(item);
}
for item in &context.memory.long_term {
total_size += estimate_memory_item_size(item);
}
for episode in &context.memory.episodic_memory {
total_size += estimate_episode_size(episode);
}
for item in &context.memory.semantic_memory {
total_size += estimate_semantic_memory_item_size(item);
}
for fact in &context.knowledge_base.facts {
total_size += estimate_knowledge_fact_size(fact);
}
for procedure in &context.knowledge_base.procedures {
total_size += estimate_procedure_size(procedure);
}
for pattern in &context.knowledge_base.learned_patterns {
total_size += estimate_pattern_size(pattern);
}
for item in &context.conversation_history {
total_size += estimate_conversation_item_size(item);
}
for (key, value) in &context.metadata {
total_size += key.len() + value.len();
}
total_size += std::mem::size_of::<AgentContext>();
total_size
}
fn calculate_retention_statistics(&self, context: &AgentContext) -> RetentionStatus {
let now = SystemTime::now();
let retention_policy = &context.retention_policy;
let mut items_to_archive = 0;
let mut items_to_delete = 0;
let memory_cutoff = now
.checked_sub(retention_policy.memory_retention)
.unwrap_or(SystemTime::UNIX_EPOCH);
let knowledge_cutoff = now
.checked_sub(retention_policy.knowledge_retention)
.unwrap_or(SystemTime::UNIX_EPOCH);
let conversation_cutoff = now
.checked_sub(retention_policy.session_retention)
.unwrap_or(SystemTime::UNIX_EPOCH);
for item in &context.memory.short_term {
if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
items_to_archive += 1;
}
}
for item in &context.memory.long_term {
if item.created_at < memory_cutoff || item.last_accessed < memory_cutoff {
items_to_archive += 1;
}
}
for episode in &context.memory.episodic_memory {
if episode.timestamp < memory_cutoff {
items_to_archive += 1;
}
}
let semantic_cutoff = now
.checked_sub(retention_policy.memory_retention * 2)
.unwrap_or(SystemTime::UNIX_EPOCH);
for item in &context.memory.semantic_memory {
if item.created_at < semantic_cutoff {
items_to_archive += 1;
}
}
for fact in &context.knowledge_base.facts {
if fact.created_at < knowledge_cutoff {
items_to_archive += 1;
}
}
for procedure in &context.knowledge_base.procedures {
if procedure.success_rate < 0.3 {
items_to_archive += 1;
}
}
for pattern in &context.knowledge_base.learned_patterns {
if pattern.confidence < 0.4 || pattern.occurrences < 2 {
items_to_archive += 1;
}
}
for item in &context.conversation_history {
if item.timestamp < conversation_cutoff {
items_to_archive += 1;
}
}
let delete_cutoff_memory = now
.checked_sub(retention_policy.memory_retention * 2)
.unwrap_or(SystemTime::UNIX_EPOCH);
let delete_cutoff_knowledge = now
.checked_sub(retention_policy.knowledge_retention * 2)
.unwrap_or(SystemTime::UNIX_EPOCH);
let delete_cutoff_conversation = now
.checked_sub(retention_policy.session_retention * 2)
.unwrap_or(SystemTime::UNIX_EPOCH);
for item in &context.memory.short_term {
if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
items_to_delete += 1;
}
}
for item in &context.memory.long_term {
if item.created_at < delete_cutoff_memory && item.last_accessed < delete_cutoff_memory {
items_to_delete += 1;
}
}
for fact in &context.knowledge_base.facts {
if fact.created_at < delete_cutoff_knowledge && !fact.verified {
items_to_delete += 1;
}
}
for item in &context.conversation_history {
if item.timestamp < delete_cutoff_conversation {
items_to_delete += 1;
}
}
let next_cleanup = now + Duration::from_secs(86400);
RetentionStatus {
items_to_archive,
items_to_delete,
next_cleanup,
}
}
/// Moves expired memory items out of the live context into
/// `archived_context`, returning how many items were moved.
///
/// Short/long-term items expire when created OR last accessed before the
/// memory-retention cutoff; episodes expire on timestamp alone; semantic
/// memory uses a doubled retention window.
async fn archive_memory_items(
    &self,
    context: &mut AgentContext,
    before: SystemTime,
    archived_context: &mut ArchivedContext,
) -> Result<u32, ContextError> {
    let memory_cutoff = before
        .checked_sub(context.retention_policy.memory_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let semantic_cutoff = before
        .checked_sub(context.retention_policy.memory_retention * 2)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let mut archived_count = 0u32;
    // `partition` preserves relative order on both sides, matching the old
    // drain-and-rebuild behavior; `mem::take` avoids cloning the vectors.
    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.memory.short_term)
        .into_iter()
        .partition(|item| item.created_at < memory_cutoff || item.last_accessed < memory_cutoff);
    archived_count += expired.len() as u32;
    archived_context.memory.short_term.extend(expired);
    context.memory.short_term = retained;

    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.memory.long_term)
        .into_iter()
        .partition(|item| item.created_at < memory_cutoff || item.last_accessed < memory_cutoff);
    archived_count += expired.len() as u32;
    archived_context.memory.long_term.extend(expired);
    context.memory.long_term = retained;

    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.memory.episodic_memory)
        .into_iter()
        .partition(|episode| episode.timestamp < memory_cutoff);
    archived_count += expired.len() as u32;
    archived_context.memory.episodic_memory.extend(expired);
    context.memory.episodic_memory = retained;

    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.memory.semantic_memory)
        .into_iter()
        .partition(|item| item.created_at < semantic_cutoff);
    archived_count += expired.len() as u32;
    archived_context.memory.semantic_memory.extend(expired);
    context.memory.semantic_memory = retained;

    Ok(archived_count)
}
/// Moves conversation items older than the session-retention cutoff into
/// `archived_context`; returns the number of items moved.
async fn archive_conversation_history(
    &self,
    context: &mut AgentContext,
    before: SystemTime,
    archived_context: &mut ArchivedContext,
) -> Result<u32, ContextError> {
    let conversation_cutoff = before
        .checked_sub(context.retention_policy.session_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    // `partition` keeps relative order on both sides, matching the old
    // drain-and-rebuild loop.
    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.conversation_history)
        .into_iter()
        .partition(|item| item.timestamp < conversation_cutoff);
    let archived_count = expired.len() as u32;
    archived_context.conversation_history.extend(expired);
    context.conversation_history = retained;
    Ok(archived_count)
}
/// Moves stale or low-quality knowledge into `archived_context`: facts
/// older than the knowledge-retention cutoff, procedures with a success
/// rate below 0.3, and patterns with confidence below 0.4 or fewer than 2
/// occurrences. Returns the number of items moved.
async fn archive_knowledge_items(
    &self,
    context: &mut AgentContext,
    before: SystemTime,
    archived_context: &mut ArchivedContext,
) -> Result<u32, ContextError> {
    let knowledge_cutoff = before
        .checked_sub(context.retention_policy.knowledge_retention)
        .unwrap_or(SystemTime::UNIX_EPOCH);
    let mut archived_count = 0u32;
    // `partition` keeps relative order, matching the old drain loops.
    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.knowledge_base.facts)
        .into_iter()
        .partition(|fact| fact.created_at < knowledge_cutoff);
    archived_count += expired.len() as u32;
    archived_context.knowledge_base.facts.extend(expired);
    context.knowledge_base.facts = retained;

    // Procedures/patterns are archived on quality, not age.
    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.knowledge_base.procedures)
        .into_iter()
        .partition(|procedure| procedure.success_rate < 0.3);
    archived_count += expired.len() as u32;
    archived_context.knowledge_base.procedures.extend(expired);
    context.knowledge_base.procedures = retained;

    let (expired, retained): (Vec<_>, Vec<_>) = std::mem::take(&mut context.knowledge_base.learned_patterns)
        .into_iter()
        .partition(|pattern| pattern.confidence < 0.4 || pattern.occurrences < 2);
    archived_count += expired.len() as u32;
    archived_context.knowledge_base.learned_patterns.extend(expired);
    context.knowledge_base.learned_patterns = retained;

    Ok(archived_count)
}
/// Persists an archived context to disk as (optionally gzip-compressed)
/// pretty-printed JSON.
///
/// No-op when persistence is disabled. Writes to a `.tmp` sibling first
/// and renames into place so a crash mid-write never leaves a truncated
/// archive behind.
async fn save_archived_context(
    &self,
    agent_id: AgentId,
    archived_context: &ArchivedContext,
) -> Result<(), ContextError> {
    if !self.config.enable_persistence {
        return Ok(());
    }
    // The file name is keyed by the archive timestamp (seconds since epoch).
    let timestamp = archived_context
        .archived_at
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let archive_dir = self.get_archive_directory_path(agent_id).await?;
    let archive_path = archive_dir.join(format!("archive_{}.json", timestamp));
    let serialized = serde_json::to_vec_pretty(archived_context).map_err(|e| {
        ContextError::SerializationError {
            reason: format!("Failed to serialize archived context: {}", e),
        }
    })?;
    let payload = if self.config.persistence_config.enable_compression {
        self.compress_data(&serialized)?
    } else {
        serialized
    };
    // Atomic-rename pattern: write the temp file, then move it into place.
    let temp_path = archive_path.with_extension("tmp");
    fs::write(&temp_path, &payload)
        .await
        .map_err(|e| ContextError::StorageError {
            reason: format!("Failed to write archive file: {}", e),
        })?;
    fs::rename(&temp_path, &archive_path)
        .await
        .map_err(|e| ContextError::StorageError {
            reason: format!("Failed to finalize archive file: {}", e),
        })?;
    tracing::info!(
        "Saved archived context for agent {} to {}",
        agent_id,
        archive_path.display()
    );
    Ok(())
}
/// Returns (creating it if missing) the per-agent archive directory:
/// `<agent_contexts_path>/archives/<agent_id>`.
async fn get_archive_directory_path(&self, agent_id: AgentId) -> Result<PathBuf, ContextError> {
    let archive_dir = self
        .config
        .persistence_config
        .agent_contexts_path()
        .join("archives")
        .join(agent_id.to_string());
    // Idempotent: create_dir_all succeeds when the directory already exists.
    match fs::create_dir_all(&archive_dir).await {
        Ok(()) => Ok(archive_dir),
        Err(e) => Err(ContextError::StorageError {
            reason: format!("Failed to create archive directory: {}", e),
        }),
    }
}
/// Gzip-compresses `data` at the default compression level, mapping any
/// encoder failure to a `SerializationError`.
fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>, ContextError> {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write;
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    if let Err(e) = encoder.write_all(data) {
        return Err(ContextError::SerializationError {
            reason: format!("Failed to compress archive data: {}", e),
        });
    }
    // finish() flushes the stream and returns the compressed buffer.
    encoder.finish().map_err(|e| ContextError::SerializationError {
        reason: format!("Failed to finalize compression: {}", e),
    })
}
/// Converts a `Knowledge` variant into a flat `KnowledgeItem` view with the
/// given id.
///
/// Confidence maps from each variant's own quality measure (fact
/// confidence, procedure success rate, pattern confidence); the relevance
/// score always defaults to 1.0.
fn knowledge_to_item(
    &self,
    knowledge: &Knowledge,
    knowledge_id: KnowledgeId,
) -> Result<KnowledgeItem, ContextError> {
    let item = match knowledge {
        Knowledge::Fact(fact) => KnowledgeItem {
            id: knowledge_id,
            // Render the triple as "<subject> <predicate> <object>".
            content: format!("{} {} {}", fact.subject, fact.predicate, fact.object),
            knowledge_type: KnowledgeType::Fact,
            confidence: fact.confidence,
            relevance_score: 1.0,
            source: fact.source.clone(),
            created_at: fact.created_at,
        },
        Knowledge::Procedure(procedure) => KnowledgeItem {
            id: knowledge_id,
            content: format!("{}: {}", procedure.name, procedure.description),
            knowledge_type: KnowledgeType::Procedure,
            confidence: procedure.success_rate,
            relevance_score: 1.0,
            // Procedures carry no source of their own; treated as learned.
            source: KnowledgeSource::Learning,
            created_at: SystemTime::now(),
        },
        Knowledge::Pattern(pattern) => KnowledgeItem {
            id: knowledge_id,
            content: format!("Pattern: {}", pattern.description),
            knowledge_type: KnowledgeType::Pattern,
            confidence: pattern.confidence,
            relevance_score: 1.0,
            source: KnowledgeSource::Learning,
            created_at: SystemTime::now(),
        },
    };
    Ok(item)
}
}
/// Rough byte-size estimate of a JSON value: scalar/string text lengths
/// plus fixed container overhead. Not an allocator-accurate measurement.
fn estimate_json_value_size(value: &Value) -> usize {
    match value {
        Value::Null => 4,
        Value::Bool(_) => 1,
        Value::Number(n) => std::mem::size_of::<f64>() + n.to_string().len(),
        Value::String(s) => s.len(),
        Value::Array(items) => {
            let contents: usize = items.iter().map(estimate_json_value_size).sum();
            contents + std::mem::size_of::<Vec<Value>>()
        }
        Value::Object(fields) => {
            let contents: usize = fields
                .iter()
                .map(|(key, field)| key.len() + estimate_json_value_size(field))
                .sum();
            contents + std::mem::size_of::<serde_json::Map<String, Value>>()
        }
    }
}
fn estimate_memory_item_size(item: &MemoryItem) -> usize {
let mut size = std::mem::size_of::<MemoryItem>();
size += item.content.len();
if let Some(ref embedding) = item.embedding {
size += embedding.len() * std::mem::size_of::<f32>();
}
for (key, value) in &item.metadata {
size += key.len() + value.len();
}
size
}
fn estimate_episode_size(episode: &Episode) -> usize {
let mut size = std::mem::size_of::<Episode>();
size += episode.title.len();
size += episode.description.len();
if let Some(ref outcome) = episode.outcome {
size += outcome.len();
}
for event in &episode.events {
size += event.action.len();
size += event.result.len();
for (key, value) in &event.context {
size += key.len() + value.len();
}
}
for lesson in &episode.lessons_learned {
size += lesson.len();
}
size
}
/// Estimated byte footprint of a semantic memory item: concept text,
/// relationship targets (plus per-relationship struct overhead), and
/// property key/value sizes.
fn estimate_semantic_memory_item_size(item: &SemanticMemoryItem) -> usize {
    let relationships_bytes: usize = item
        .relationships
        .iter()
        .map(|rel| rel.target_concept.len() + std::mem::size_of::<ConceptRelationship>())
        .sum();
    let properties_bytes: usize = item
        .properties
        .iter()
        .map(|(key, value)| key.len() + estimate_json_value_size(value))
        .sum();
    std::mem::size_of::<SemanticMemoryItem>()
        + item.concept.len()
        + relationships_bytes
        + properties_bytes
}
/// Estimated byte footprint of a knowledge fact: struct size plus the
/// subject/predicate/object triple strings.
fn estimate_knowledge_fact_size(fact: &KnowledgeFact) -> usize {
    let triple_bytes = fact.subject.len() + fact.predicate.len() + fact.object.len();
    std::mem::size_of::<KnowledgeFact>() + triple_bytes
}
fn estimate_procedure_size(procedure: &Procedure) -> usize {
let mut size = std::mem::size_of::<Procedure>();
size += procedure.name.len();
size += procedure.description.len();
for step in &procedure.steps {
size += step.action.len();
size += step.expected_result.len();
if let Some(ref error_handling) = step.error_handling {
size += error_handling.len();
}
}
for condition in &procedure.preconditions {
size += condition.len();
}
for condition in &procedure.postconditions {
size += condition.len();
}
size
}
/// Estimated byte footprint of a learned pattern: name, description, and
/// all condition/outcome strings.
fn estimate_pattern_size(pattern: &Pattern) -> usize {
    let text_bytes: usize = pattern
        .conditions
        .iter()
        .chain(pattern.outcomes.iter())
        .map(|entry| entry.len())
        .sum();
    std::mem::size_of::<Pattern>() + pattern.name.len() + pattern.description.len() + text_bytes
}
/// Estimated byte footprint of a conversation item: struct size, content
/// text, and the id lists referencing context/knowledge used.
fn estimate_conversation_item_size(item: &ConversationItem) -> usize {
    std::mem::size_of::<ConversationItem>()
        + item.content.len()
        + item.context_used.len() * std::mem::size_of::<ContextId>()
        + item.knowledge_used.len() * std::mem::size_of::<KnowledgeId>()
}
#[async_trait]
impl ContextManager for StandardContextManager {
/// Stores (replacing any previous) context for an agent, stamping
/// `updated_at`, persisting first when persistence is enabled, and then
/// updating the in-memory cache.
///
/// NOTE(review): the returned `ContextId` is freshly generated here and is
/// not recorded on the stored context — confirm callers treat it only as
/// an opaque receipt.
async fn store_context(
    &self,
    agent_id: AgentId,
    mut context: AgentContext,
) -> Result<ContextId, ContextError> {
    self.validate_access(agent_id, "store_context").await?;
    context.updated_at = SystemTime::now();
    let context_id = ContextId::new();
    // Persist before mutating the cache so a storage failure leaves the
    // in-memory state untouched.
    if self.config.enable_persistence {
        self.persistence.save_context(agent_id, &context).await?;
    }
    self.contexts.write().await.insert(agent_id, context);
    Ok(context_id)
}
/// Fetches an agent's context, checking the in-memory cache first and then
/// falling back to persistent storage (which also warms the cache on a
/// successful load).
///
/// When `session_id` is provided, a persisted context with a different
/// session returns `None`.
async fn retrieve_context(
&self,
agent_id: AgentId,
session_id: Option<SessionId>,
) -> Result<Option<AgentContext>, ContextError> {
self.validate_access(agent_id, "retrieve_context").await?;
// Fast path: serve from the in-memory cache under a read lock. The extra
// block scope drops the read guard before the write lock further down.
{
let contexts = self.contexts.read().await;
if let Some(context) = contexts.get(&agent_id) {
if let Some(sid) = session_id {
if context.session_id == sid {
return Ok(Some(context.clone()));
}
// NOTE(review): a cached context with a *mismatched* session falls
// through to the persistence lookup below, which reloads the same
// context and then rejects it — confirm this asymmetry (vs.
// returning None immediately) is intentional.
} else {
return Ok(Some(context.clone()));
}
}
}
// Slow path: load from persistent storage and warm the cache on success.
if self.config.enable_persistence {
if let Some(context) = self.persistence.load_context(agent_id).await? {
if let Some(sid) = session_id {
if context.session_id != sid {
// Persisted context belongs to a different session.
return Ok(None);
}
}
let mut contexts = self.contexts.write().await;
contexts.insert(agent_id, context.clone());
return Ok(Some(context));
}
}
Ok(None)
}
/// Dispatches a context query to the search strategy selected by
/// `query.query_type`.
async fn query_context(
    &self,
    agent_id: AgentId,
    query: ContextQuery,
) -> Result<Vec<ContextItem>, ContextError> {
    self.validate_access(agent_id, "query_context").await?;
    match query.query_type {
        // Semantic search takes a single joined search string rather than
        // the structured query object.
        QueryType::Semantic => {
            let search_term = query.search_terms.join(" ");
            self.semantic_search_memory(agent_id, &search_term, query.max_results)
                .await
        }
        QueryType::Keyword => self.keyword_search_memory(agent_id, &query).await,
        QueryType::Temporal => self.temporal_search_memory(agent_id, &query).await,
        QueryType::Similarity => self.similarity_search_memory(agent_id, &query).await,
        QueryType::Hybrid => self.hybrid_search_memory(agent_id, &query).await,
    }
}
async fn update_memory(
&self,
agent_id: AgentId,
memory_updates: Vec<MemoryUpdate>,
) -> Result<(), ContextError> {
self.validate_access(agent_id, "update_memory").await?;
let mut contexts = self.contexts.write().await;
if let Some(context) = contexts.get_mut(&agent_id) {
for update in memory_updates {
match update.operation {
UpdateOperation::Add => {
match update.target {
MemoryTarget::ShortTerm(_) => {
if let Ok(memory_item_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
let memory_item = MemoryItem {
id: ContextId::new(),
content: memory_item_data.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
memory_type: memory_item_data.get("memory_type")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or(MemoryType::Factual),
importance: memory_item_data.get("importance")
.and_then(|v| v.as_f64())
.unwrap_or(0.5) as f32,
access_count: 0,
last_accessed: SystemTime::now(),
created_at: SystemTime::now(),
embedding: None,
metadata: memory_item_data.get("metadata")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
};
context.memory.short_term.push(memory_item);
}
}
MemoryTarget::LongTerm(_) => {
if let Ok(memory_item_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
let memory_item = MemoryItem {
id: ContextId::new(),
content: memory_item_data.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
memory_type: memory_item_data.get("memory_type")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or(MemoryType::Factual),
importance: memory_item_data.get("importance")
.and_then(|v| v.as_f64())
.unwrap_or(0.5) as f32,
access_count: 0,
last_accessed: SystemTime::now(),
created_at: SystemTime::now(),
embedding: None,
metadata: memory_item_data.get("metadata")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
};
context.memory.long_term.push(memory_item);
}
}
MemoryTarget::Working(key) => {
context.memory.working_memory.variables.insert(key, update.data);
}
MemoryTarget::Episodic(_) => {
if let Ok(episode_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
let episode = Episode {
id: ContextId::new(),
title: episode_data.get("title")
.and_then(|v| v.as_str())
.unwrap_or("Untitled Episode")
.to_string(),
description: episode_data.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
events: episode_data.get("events")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
outcome: episode_data.get("outcome")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
lessons_learned: episode_data.get("lessons_learned")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
timestamp: SystemTime::now(),
importance: episode_data.get("importance")
.and_then(|v| v.as_f64())
.unwrap_or(0.5) as f32,
};
context.memory.episodic_memory.push(episode);
}
}
MemoryTarget::Semantic(_) => {
if let Ok(semantic_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
let semantic_item = SemanticMemoryItem {
id: ContextId::new(),
concept: semantic_data.get("concept")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
relationships: semantic_data.get("relationships")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
properties: semantic_data.get("properties")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default(),
confidence: semantic_data.get("confidence")
.and_then(|v| v.as_f64())
.unwrap_or(0.5) as f32,
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
};
context.memory.semantic_memory.push(semantic_item);
}
}
}
}
UpdateOperation::Update => {
match update.target {
MemoryTarget::ShortTerm(target_id) => {
if let Some(memory_item) = context.memory.short_term.iter_mut()
.find(|item| item.id == target_id) {
if let Ok(update_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(content) = update_data.get("content").and_then(|v| v.as_str()) {
memory_item.content = content.to_string();
}
if let Some(importance) = update_data.get("importance").and_then(|v| v.as_f64()) {
memory_item.importance = importance as f32;
}
if let Some(metadata) = update_data.get("metadata") {
if let Ok(new_metadata) = serde_json::from_value(metadata.clone()) {
memory_item.metadata = new_metadata;
}
}
memory_item.last_accessed = SystemTime::now();
memory_item.access_count += 1;
}
}
}
MemoryTarget::LongTerm(target_id) => {
if let Some(memory_item) = context.memory.long_term.iter_mut()
.find(|item| item.id == target_id) {
if let Ok(update_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(content) = update_data.get("content").and_then(|v| v.as_str()) {
memory_item.content = content.to_string();
}
if let Some(importance) = update_data.get("importance").and_then(|v| v.as_f64()) {
memory_item.importance = importance as f32;
}
if let Some(metadata) = update_data.get("metadata") {
if let Ok(new_metadata) = serde_json::from_value(metadata.clone()) {
memory_item.metadata = new_metadata;
}
}
memory_item.last_accessed = SystemTime::now();
memory_item.access_count += 1;
}
}
}
MemoryTarget::Working(key) => {
context.memory.working_memory.variables.insert(key, update.data);
}
MemoryTarget::Episodic(target_id) => {
if let Some(episode) = context.memory.episodic_memory.iter_mut()
.find(|ep| ep.id == target_id) {
if let Ok(update_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(title) = update_data.get("title").and_then(|v| v.as_str()) {
episode.title = title.to_string();
}
if let Some(description) = update_data.get("description").and_then(|v| v.as_str()) {
episode.description = description.to_string();
}
if let Some(outcome) = update_data.get("outcome").and_then(|v| v.as_str()) {
episode.outcome = Some(outcome.to_string());
}
if let Some(importance) = update_data.get("importance").and_then(|v| v.as_f64()) {
episode.importance = importance as f32;
}
if let Some(lessons) = update_data.get("lessons_learned") {
if let Ok(new_lessons) = serde_json::from_value(lessons.clone()) {
episode.lessons_learned = new_lessons;
}
}
}
}
}
MemoryTarget::Semantic(target_id) => {
if let Some(semantic_item) = context.memory.semantic_memory.iter_mut()
.find(|item| item.id == target_id) {
if let Ok(update_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(concept) = update_data.get("concept").and_then(|v| v.as_str()) {
semantic_item.concept = concept.to_string();
}
if let Some(confidence) = update_data.get("confidence").and_then(|v| v.as_f64()) {
semantic_item.confidence = confidence as f32;
}
if let Some(relationships) = update_data.get("relationships") {
if let Ok(new_relationships) = serde_json::from_value(relationships.clone()) {
semantic_item.relationships = new_relationships;
}
}
if let Some(properties) = update_data.get("properties") {
if let Ok(new_properties) = serde_json::from_value(properties.clone()) {
semantic_item.properties = new_properties;
}
}
semantic_item.updated_at = SystemTime::now();
}
}
}
}
}
UpdateOperation::Delete => {
match update.target {
MemoryTarget::ShortTerm(target_id) => {
context.memory.short_term.retain(|item| item.id != target_id);
}
MemoryTarget::LongTerm(target_id) => {
context.memory.long_term.retain(|item| item.id != target_id);
}
MemoryTarget::Working(key) => {
context.memory.working_memory.variables.remove(&key);
}
MemoryTarget::Episodic(target_id) => {
context.memory.episodic_memory.retain(|ep| ep.id != target_id);
}
MemoryTarget::Semantic(target_id) => {
context.memory.semantic_memory.retain(|item| item.id != target_id);
}
}
}
UpdateOperation::Increment => {
match update.target {
MemoryTarget::ShortTerm(target_id) => {
if let Some(memory_item) = context.memory.short_term.iter_mut()
.find(|item| item.id == target_id) {
if let Ok(increment_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(importance_increment) = increment_data.get("importance").and_then(|v| v.as_f64()) {
memory_item.importance = (memory_item.importance + importance_increment as f32).clamp(0.0, 1.0);
}
if let Some(access_increment) = increment_data.get("access_count").and_then(|v| v.as_u64()) {
memory_item.access_count = memory_item.access_count.saturating_add(access_increment as u32);
}
memory_item.last_accessed = SystemTime::now();
}
}
}
MemoryTarget::LongTerm(target_id) => {
if let Some(memory_item) = context.memory.long_term.iter_mut()
.find(|item| item.id == target_id) {
if let Ok(increment_data) = serde_json::from_value::<serde_json::Map<String, Value>>(update.data) {
if let Some(importance_increment) = increment_data.get("importance").and_then(|v| v.as_f64()) {
memory_item.importance = (memory_item.importance + importance_increment as f32).clamp(0.0, 1.0);
}
if let Some(access_increment) = increment_data.get("access_count").and_then(|v| v.as_u64()) {
memory_item.access_count = memory_item.access_count.saturating_add(access_increment as u32);
}
memory_item.last_accessed = SystemTime::now();
}
}
}
MemoryTarget::Working(key) => {
if let Some(existing_value) = context.memory.working_memory.variables.get(&key) {
if let (Some(current), Some(increment)) = (existing_value.as_f64(), update.data.as_f64()) {
let new_value = current + increment;
context.memory.working_memory.variables.insert(key, Value::Number(
serde_json::Number::from_f64(new_value).unwrap_or_else(|| serde_json::Number::from(0))
));
} else if let (Some(current), Some(increment)) = (existing_value.as_i64(), update.data.as_i64()) {
let new_value = current.saturating_add(increment);
context.memory.working_memory.variables.insert(key, Value::Number(
serde_json::Number::from(new_value)
));
}
}
}
MemoryTarget::Episodic(target_id) => {
if let Some(episode) = context.memory.episodic_memory.iter_mut()
.find(|ep| ep.id == target_id) {
if let Some(importance_increment) = update.data.as_f64() {
episode.importance = (episode.importance + importance_increment as f32).clamp(0.0, 1.0);
}
}
}
MemoryTarget::Semantic(target_id) => {
if let Some(semantic_item) = context.memory.semantic_memory.iter_mut()
.find(|item| item.id == target_id) {
if let Some(confidence_increment) = update.data.as_f64() {
semantic_item.confidence = (semantic_item.confidence + confidence_increment as f32).clamp(0.0, 1.0);
semantic_item.updated_at = SystemTime::now();
}
}
}
}
}
}
}
context.updated_at = SystemTime::now();
if self.config.enable_persistence {
if let Err(e) = self.persistence.save_context(agent_id, context).await {
tracing::error!("Failed to persist memory updates for agent {}: {}", agent_id, e);
return Err(e);
}
}
} else {
return Err(ContextError::NotFound { id: ContextId::new() });
}
Ok(())
}
async fn add_knowledge(
&self,
agent_id: AgentId,
knowledge: Knowledge,
) -> Result<KnowledgeId, ContextError> {
self.validate_access(agent_id, "add_knowledge").await?;
let knowledge_id = KnowledgeId::new();
if self.config.enable_vector_db {
let knowledge_item = self.knowledge_to_item(&knowledge, knowledge_id)?;
let embedding = self.generate_embeddings(&knowledge_item.content).await?;
let _vector_id = self
.vector_db
.store_knowledge_item(&knowledge_item, embedding)
.await?;
}
let mut contexts = self.contexts.write().await;
if let Some(context) = contexts.get_mut(&agent_id) {
match knowledge {
Knowledge::Fact(fact) => {
context.knowledge_base.facts.push(fact);
}
Knowledge::Procedure(procedure) => {
context.knowledge_base.procedures.push(procedure);
}
Knowledge::Pattern(pattern) => {
context.knowledge_base.learned_patterns.push(pattern);
}
}
context.updated_at = SystemTime::now();
}
Ok(knowledge_id)
}
/// Searches the agent's knowledge for items relevant to `query`, returning
/// at most `limit` results ordered by descending relevance.
///
/// With the vector database enabled this performs a semantic similarity
/// search; otherwise it falls back to relevance scoring over the in-memory
/// knowledge base (facts, procedures, learned patterns).
async fn search_knowledge(
    &self,
    agent_id: AgentId,
    query: &str,
    limit: usize,
) -> Result<Vec<KnowledgeItem>, ContextError> {
    self.validate_access(agent_id, "search_knowledge").await?;
    // Preferred path: embedding-based search in the vector database.
    if self.config.enable_vector_db {
        let query_embedding = self.generate_embeddings(query).await?;
        return self
            .vector_db
            .search_knowledge_base(agent_id, query_embedding, limit)
            .await;
    }
    // Fallback path: score each in-memory knowledge entry against the query.
    let contexts = self.contexts.read().await;
    let Some(context) = contexts.get(&agent_id) else {
        return Ok(Vec::new());
    };
    let kb = &context.knowledge_base;
    let mut matches = Vec::new();
    for fact in &kb.facts {
        let text = format!("{} {} {}", fact.subject, fact.predicate, fact.object);
        if let Some(score) = self.calculate_knowledge_relevance(&text, query, fact.confidence) {
            matches.push(KnowledgeItem {
                id: fact.id,
                content: text,
                knowledge_type: KnowledgeType::Fact,
                confidence: fact.confidence,
                relevance_score: score,
                source: fact.source.clone(),
                created_at: fact.created_at,
            });
        }
    }
    for procedure in &kb.procedures {
        let text = format!("{} {}", procedure.name, procedure.description);
        // Success rate doubles as the confidence signal for procedures.
        if let Some(score) =
            self.calculate_knowledge_relevance(&text, query, procedure.success_rate)
        {
            matches.push(KnowledgeItem {
                id: procedure.id,
                content: format!("{}: {}", procedure.name, procedure.description),
                knowledge_type: KnowledgeType::Procedure,
                confidence: procedure.success_rate,
                relevance_score: score,
                source: KnowledgeSource::Learning,
                created_at: SystemTime::now(),
            });
        }
    }
    for pattern in &kb.learned_patterns {
        let text = format!("{} {}", pattern.name, pattern.description);
        if let Some(score) =
            self.calculate_knowledge_relevance(&text, query, pattern.confidence)
        {
            matches.push(KnowledgeItem {
                id: pattern.id,
                content: format!("Pattern: {}", pattern.description),
                knowledge_type: KnowledgeType::Pattern,
                confidence: pattern.confidence,
                relevance_score: score,
                source: KnowledgeSource::Learning,
                created_at: SystemTime::now(),
            });
        }
    }
    // Highest relevance first; NaN scores compare as equal rather than panic.
    matches.sort_by(|lhs, rhs| {
        rhs.relevance_score
            .partial_cmp(&lhs.relevance_score)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    matches.truncate(limit);
    Ok(matches)
}
/// Publishes a knowledge item owned by `from_agent` into the shared pool
/// under the given `access_level`.
///
/// NOTE(review): `_to_agent` is currently unused — sharing is pool-based and
/// gated by access level rather than addressed to a single recipient.
async fn share_knowledge(
    &self,
    from_agent: AgentId,
    _to_agent: AgentId,
    knowledge_id: KnowledgeId,
    access_level: AccessLevel,
) -> Result<(), ContextError> {
    self.validate_access(from_agent, "share_knowledge").await?;
    // Clone the knowledge out inside a scoped read lock so the contexts lock
    // is released before the shared-pool write lock is taken.
    let found = {
        let contexts = self.contexts.read().await;
        let Some(from_context) = contexts.get(&from_agent) else {
            return Err(ContextError::NotFound {
                id: ContextId::new(),
            });
        };
        let kb = &from_context.knowledge_base;
        // Look up the id across all three knowledge categories in turn.
        kb.facts
            .iter()
            .find(|f| f.id == knowledge_id)
            .map(|f| Knowledge::Fact(f.clone()))
            .or_else(|| {
                kb.procedures
                    .iter()
                    .find(|p| p.id == knowledge_id)
                    .map(|p| Knowledge::Procedure(p.clone()))
            })
            .or_else(|| {
                kb.learned_patterns
                    .iter()
                    .find(|p| p.id == knowledge_id)
                    .map(|p| Knowledge::Pattern(p.clone()))
            })
    };
    let Some(knowledge) = found else {
        return Err(ContextError::KnowledgeNotFound { id: knowledge_id });
    };
    let shared_item = SharedKnowledgeItem {
        knowledge,
        source_agent: from_agent,
        access_level,
        created_at: SystemTime::now(),
        access_count: 0,
    };
    self.shared_knowledge
        .write()
        .await
        .insert(knowledge_id, shared_item);
    Ok(())
}
/// Returns references to every publicly shared knowledge item.
///
/// Entries with any non-`Public` access level are filtered out entirely in
/// the current implementation.
async fn get_shared_knowledge(
    &self,
    agent_id: AgentId,
) -> Result<Vec<SharedKnowledgeRef>, ContextError> {
    self.validate_access(agent_id, "get_shared_knowledge")
        .await?;
    let shared_knowledge = self.shared_knowledge.read().await;
    let refs = shared_knowledge
        .iter()
        .filter(|(_, item)| matches!(item.access_level, AccessLevel::Public))
        .map(|(knowledge_id, item)| {
            let trust_score = self.calculate_trust_score(item);
            tracing::debug!(
                "Shared knowledge {} accessed {} times, trust score: {}",
                knowledge_id,
                item.access_count,
                trust_score
            );
            SharedKnowledgeRef {
                knowledge_id: *knowledge_id,
                source_agent: item.source_agent,
                shared_at: item.created_at,
                access_level: item.access_level.clone(),
                trust_score,
            }
        })
        .collect();
    Ok(refs)
}
/// Moves memory items, conversation history, and knowledge entries older
/// than `before` into archival storage, returning how many were archived.
///
/// When nothing qualifies for archiving no archive file is written and the
/// context metadata is left untouched. A missing context is not an error —
/// it is logged and `Ok(0)` is returned.
async fn archive_context(
    &self,
    agent_id: AgentId,
    before: SystemTime,
) -> Result<u32, ContextError> {
    self.validate_access(agent_id, "archive_context").await?;
    let mut contexts = self.contexts.write().await;
    let Some(context) = contexts.get_mut(&agent_id) else {
        tracing::warn!("No context found for agent {} during archiving", agent_id);
        return Ok(0);
    };
    let mut archive = ArchivedContext::new(agent_id, before);
    // Each helper moves qualifying items out of the live context and into
    // the archive, returning how many it moved.
    let archived_memory = self
        .archive_memory_items(context, before, &mut archive)
        .await?;
    let archived_history = self
        .archive_conversation_history(context, before, &mut archive)
        .await?;
    let archived_knowledge = self
        .archive_knowledge_items(context, before, &mut archive)
        .await?;
    let archived_total = archived_memory + archived_history + archived_knowledge;
    if archived_total > 0 {
        self.save_archived_context(agent_id, &archive).await?;
        context.updated_at = SystemTime::now();
        let epoch_secs = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        context
            .metadata
            .insert("last_archived".to_string(), epoch_secs.to_string());
        context
            .metadata
            .insert("archived_count".to_string(), archived_total.to_string());
        if self.config.enable_persistence {
            self.persistence.save_context(agent_id, context).await?;
        }
    }
    tracing::info!(
        "Archived {} items for agent {} before timestamp {:?}",
        archived_total,
        agent_id,
        before
    );
    Ok(archived_total)
}
/// Computes aggregate statistics (memory/knowledge counts, size estimate,
/// retention status) for the agent's context.
///
/// Returns `ContextError::NotFound` when no context exists for the agent.
async fn get_context_stats(&self, agent_id: AgentId) -> Result<ContextStats, ContextError> {
    self.validate_access(agent_id, "get_context_stats").await?;
    let contexts = self.contexts.read().await;
    let context = contexts
        .get(&agent_id)
        .ok_or_else(|| ContextError::NotFound {
            id: ContextId::new(),
        })?;
    let memory = &context.memory;
    let knowledge = &context.knowledge_base;
    // Counts span all four memory stores and all three knowledge categories.
    let memory_item_count = memory.short_term.len()
        + memory.long_term.len()
        + memory.episodic_memory.len()
        + memory.semantic_memory.len();
    let knowledge_item_count =
        knowledge.facts.len() + knowledge.procedures.len() + knowledge.learned_patterns.len();
    Ok(ContextStats {
        total_memory_items: memory_item_count,
        total_knowledge_items: knowledge_item_count,
        total_conversations: context.conversation_history.len(),
        total_episodes: memory.episodic_memory.len(),
        memory_size_bytes: self.calculate_memory_size_bytes(context),
        last_activity: context.updated_at,
        retention_status: self.calculate_retention_statistics(context),
    })
}
/// Delegates the trait's shutdown to the inherent
/// `StandardContextManager::shutdown` implementation. The fully-qualified
/// path makes explicit that the inherent method (not this trait method) is
/// being invoked.
async fn shutdown(&self) -> Result<(), ContextError> {
    StandardContextManager::shutdown(self).await
}
}