use crate::{errors::*, types::*};
use cortex_mem_core::{
CortexFilesystem,
FilesystemOperations,
SessionConfig,
SessionManager,
automation::{
AbstractConfig, AutoExtractConfig, AutoExtractor, AutoIndexer, AutomationConfig,
AutomationManager, IndexerConfig, LayerGenerationConfig, LayerGenerator, OverviewConfig,
SyncConfig, SyncManager,
},
embedding::{EmbeddingClient, EmbeddingConfig},
events::EventBus,
layers::manager::LayerManager,
llm::LLMClient,
search::VectorSearchEngine,
vector_store::{QdrantVectorStore, VectorStore},
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::warn;
/// Facade that wires together the filesystem, session, layer, vector-search
/// and automation subsystems for a single tenant's memory store.
pub struct MemoryOperations {
    /// Tenant-scoped virtual filesystem (backs the `cortex://` URIs).
    pub(crate) filesystem: Arc<CortexFilesystem>,
    /// Session lifecycle manager; `RwLock` because reads dominate writes.
    pub(crate) session_manager: Arc<RwLock<SessionManager>>,
    /// L0/L1/L2 layer manager. NOTE(review): stored but not used in this
    /// file's visible code — presumably consumed elsewhere in the crate.
    pub(crate) layer_manager: Arc<LayerManager>,
    /// Semantic search over the vector store.
    pub(crate) vector_engine: Arc<VectorSearchEngine>,
    /// Optional memory extractor, triggered on session close.
    pub(crate) auto_extractor: Option<Arc<AutoExtractor>>,
    /// Optional generator for missing L0/L1 layer files.
    pub(crate) layer_generator: Option<Arc<LayerGenerator>>,
    /// Optional automatic vector indexer.
    pub(crate) auto_indexer: Option<Arc<AutoIndexer>>,
    /// Client used to embed file content for vector indexing/search.
    pub(crate) embedding_client: Arc<EmbeddingClient>,
    /// Qdrant-backed vector store.
    pub(crate) vector_store: Arc<QdrantVectorStore>,
    /// Shared LLM client for extraction, layer generation and query rewriting.
    pub(crate) llm_client: Arc<dyn LLMClient>,
    /// Fallback user id applied to sessions created without one.
    pub(crate) default_user_id: String,
    /// Fallback agent id (the tenant id, per `new`).
    pub(crate) default_agent_id: String,
    /// Sender half for incremental memory events, when the coordinator is active.
    pub(crate) memory_event_tx:
        Option<tokio::sync::mpsc::UnboundedSender<cortex_mem_core::memory_events::MemoryEvent>>,
    /// Handle to the running event coordinator, used by wait/flush/trigger APIs.
    pub(crate) event_coordinator: Option<Arc<cortex_mem_core::MemoryEventCoordinator>>,
}
impl MemoryOperations {
/// Returns a shared handle to the tenant filesystem.
pub fn filesystem(&self) -> &Arc<CortexFilesystem> {
    &self.filesystem
}
/// Returns a shared handle to the vector search engine.
pub fn vector_engine(&self) -> &Arc<VectorSearchEngine> {
    &self.vector_engine
}
/// Returns the session manager behind its read/write lock.
pub fn session_manager(&self) -> &Arc<RwLock<SessionManager>> {
    &self.session_manager
}
/// Returns the auto extractor, if one was configured.
pub fn auto_extractor(&self) -> Option<&Arc<AutoExtractor>> {
    self.auto_extractor.as_ref()
}
/// Returns the layer generator, if one was configured.
pub fn layer_generator(&self) -> Option<&Arc<LayerGenerator>> {
    self.layer_generator.as_ref()
}
/// Returns the auto indexer, if one was configured.
pub fn auto_indexer(&self) -> Option<&Arc<AutoIndexer>> {
    self.auto_indexer.as_ref()
}
/// Returns the default user id applied to new sessions.
pub fn default_user_id(&self) -> &str {
    &self.default_user_id
}
/// Returns the default agent id applied to new sessions.
pub fn default_agent_id(&self) -> &str {
    &self.default_agent_id
}
/// Returns the memory-event sender, if the event coordinator is active.
pub fn memory_event_tx(
    &self,
) -> Option<&tokio::sync::mpsc::UnboundedSender<cortex_mem_core::memory_events::MemoryEvent>>
{
    self.memory_event_tx.as_ref()
}
pub async fn new(
data_dir: &str,
tenant_id: impl Into<String>,
llm_client: Arc<dyn LLMClient>,
qdrant_url: &str,
qdrant_collection: &str,
qdrant_api_key: Option<&str>,
embedding_api_base_url: &str,
embedding_api_key: &str,
embedding_model_name: &str,
embedding_dim: Option<usize>,
user_id: Option<String>,
) -> Result<Self> {
let tenant_id = tenant_id.into();
let filesystem = Arc::new(CortexFilesystem::with_tenant(data_dir, &tenant_id));
filesystem.initialize().await?;
let (event_bus, mut event_rx_main) = EventBus::new();
tracing::info!("Initializing Qdrant vector store: {}", qdrant_url);
let qdrant_config = cortex_mem_core::QdrantConfig {
url: qdrant_url.to_string(),
collection_name: qdrant_collection.to_string(),
embedding_dim,
timeout_secs: 30,
api_key: qdrant_api_key
.map(|s| s.to_string())
.or_else(|| std::env::var("QDRANT_API_KEY").ok()),
tenant_id: Some(tenant_id.clone()), };
let vector_store = Arc::new(QdrantVectorStore::new(&qdrant_config).await?);
tracing::info!(
"Qdrant connected successfully, collection: {}",
qdrant_config.get_collection_name()
);
tracing::info!(
"Initializing Embedding client with model: {}",
embedding_model_name
);
let embedding_config = EmbeddingConfig {
api_base_url: embedding_api_base_url.to_string(),
api_key: embedding_api_key.to_string(),
model_name: embedding_model_name.to_string(),
batch_size: 10,
timeout_secs: 30,
};
let embedding_client = Arc::new(EmbeddingClient::new(embedding_config)?);
tracing::info!("Embedding client initialized");
let (coordinator, memory_event_tx, event_rx) = cortex_mem_core::MemoryEventCoordinator::new(
filesystem.clone(),
llm_client.clone(),
embedding_client.clone(),
vector_store.clone(),
);
let coordinator_clone = coordinator.clone();
tokio::spawn(coordinator.start(event_rx));
tracing::info!("MemoryEventCoordinator started for v2.5 incremental updates");
let config = SessionConfig::default();
let session_manager = SessionManager::with_llm_and_events(
filesystem.clone(),
config,
llm_client.clone(),
event_bus.clone(),
)
.with_memory_event_tx(memory_event_tx.clone());
let session_manager = Arc::new(RwLock::new(session_manager));
let layer_manager = Arc::new(LayerManager::new(filesystem.clone(), llm_client.clone()));
let vector_engine = Arc::new(VectorSearchEngine::with_llm(
vector_store.clone(),
embedding_client.clone(),
filesystem.clone(),
llm_client.clone(),
));
tracing::info!("Vector search engine created with LLM support for query rewriting");
let actual_user_id = user_id.unwrap_or_else(|| tenant_id.clone());
let auto_extract_config = AutoExtractConfig {
min_message_count: 5,
extract_on_close: false, };
let auto_extractor = Arc::new(AutoExtractor::with_user_id(
filesystem.clone(),
llm_client.clone(),
auto_extract_config,
&actual_user_id,
));
let indexer_config = IndexerConfig {
auto_index: true,
batch_size: 10,
async_index: true,
};
let auto_indexer = Arc::new(AutoIndexer::new(
filesystem.clone(),
embedding_client.clone(),
vector_store.clone(),
indexer_config,
));
let automation_config = AutomationConfig {
auto_index: true,
auto_extract: false, index_on_message: true, index_on_close: true, index_batch_delay: 1,
auto_generate_layers_on_startup: false, generate_layers_every_n_messages: 5, max_concurrent_llm_tasks: 3, };
let layer_gen_config = LayerGenerationConfig {
batch_size: 10,
delay_ms: 1000,
auto_generate_on_startup: false,
abstract_config: AbstractConfig {
max_tokens: 400,
max_chars: 2000,
target_sentences: 2,
},
overview_config: OverviewConfig {
max_tokens: 1500,
max_chars: 6000,
},
};
let layer_generator = Arc::new(LayerGenerator::new(
filesystem.clone(),
llm_client.clone(),
layer_gen_config,
));
let automation_manager = AutomationManager::new(
auto_indexer.clone(),
None, automation_config,
)
.with_layer_generator(layer_generator.clone());
let (tx_automation, rx_automation) = tokio::sync::mpsc::unbounded_channel();
let (tx_extractor, rx_extractor) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
while let Some(event) = event_rx_main.recv().await {
let _ = tx_automation.send(event.clone());
let _ = tx_extractor.send(event);
}
});
let tenant_id_for_automation = tenant_id.clone();
tokio::spawn(async move {
tracing::info!(
"Starting AutomationManager for tenant {}",
tenant_id_for_automation
);
if let Err(e) = automation_manager.start(rx_automation).await {
tracing::error!("AutomationManager stopped with error: {}", e);
}
});
let extractor_clone = auto_extractor.clone();
let tenant_id_clone = tenant_id.clone();
tokio::spawn(async move {
tracing::info!(
"Starting AutoExtractor event listener for tenant {}",
tenant_id_clone
);
let mut rx = rx_extractor;
while let Some(event) = rx.recv().await {
if let cortex_mem_core::CortexEvent::Session(session_event) = event {
match session_event {
cortex_mem_core::SessionEvent::Closed { session_id } => {
tracing::info!("Session closed event received: {}", session_id);
match extractor_clone.extract_session(&session_id).await {
Ok(stats) => {
tracing::info!(
"Extraction completed for session {}: {:?}",
session_id,
stats
);
}
Err(e) => {
tracing::error!(
"Extraction failed for session {}: {}",
session_id,
e
);
}
}
}
_ => {} }
}
}
});
let sync_manager = SyncManager::new(
filesystem.clone(),
embedding_client.clone(),
vector_store.clone(),
llm_client.clone(),
SyncConfig::default(),
);
let _fs_clone = filesystem.clone();
tokio::spawn(async move {
tracing::info!("Starting background sync to vector database...");
match sync_manager.sync_all().await {
Ok(stats) => {
tracing::info!(
"Auto-sync completed: {} files indexed, {} files skipped",
stats.indexed_files,
stats.skipped_files
);
}
Err(e) => {
tracing::warn!("Auto-sync failed: {}", e);
}
}
});
Ok(Self {
filesystem,
session_manager,
layer_manager,
vector_engine,
auto_extractor: Some(auto_extractor),
layer_generator: Some(layer_generator), auto_indexer: Some(auto_indexer),
embedding_client,
vector_store,
llm_client,
default_user_id: actual_user_id,
default_agent_id: tenant_id.clone(),
memory_event_tx: Some(memory_event_tx),
event_coordinator: Some(coordinator_clone),
})
}
/// Appends a message to `thread_id`'s timeline, creating the session (with
/// the default user/agent ids) on first use, and returns the message's
/// `cortex://` URI.
///
/// An empty `thread_id` is treated as "default". Unknown `role` strings are
/// logged and stored as user messages.
///
/// # Errors
/// Propagates failures from session lookup, creation, or message append.
pub async fn add_message(&self, thread_id: &str, role: &str, content: &str) -> Result<String> {
    let thread_id = if thread_id.is_empty() {
        "default"
    } else {
        thread_id
    };
    // Check under the read lock; upgrade to a write lock only when the
    // session must be created or back-filled. The lock is released between
    // check and write, so a concurrent creator may win the race — both
    // paths are idempotent.
    let sm = self.session_manager.read().await;
    if !sm.session_exists(thread_id).await? {
        drop(sm);
        let sm = self.session_manager.write().await;
        sm.create_session_with_ids(
            thread_id,
            Some(self.default_user_id.clone()),
            Some(self.default_agent_id.clone()),
        )
        .await?;
        drop(sm);
    } else if let Ok(metadata) = sm.load_session(thread_id).await {
        // Back-fill ids on sessions created before defaults existed.
        let needs_update = metadata.user_id.is_none() || metadata.agent_id.is_none();
        if needs_update {
            drop(sm);
            let sm = self.session_manager.write().await;
            // Reload under the write lock so we don't clobber concurrent edits.
            if let Ok(mut metadata) = sm.load_session(thread_id).await {
                if metadata.user_id.is_none() {
                    metadata.user_id = Some(self.default_user_id.clone());
                }
                if metadata.agent_id.is_none() {
                    metadata.agent_id = Some(self.default_agent_id.clone());
                }
                let _ = sm.update_session(&metadata).await;
                tracing::info!("Updated session {} with user_id and agent_id", thread_id);
            }
            drop(sm);
        }
    }
    let sm = self.session_manager.read().await;
    let message_role = match role {
        "user" => cortex_mem_core::MessageRole::User,
        "assistant" => cortex_mem_core::MessageRole::Assistant,
        "system" => cortex_mem_core::MessageRole::System,
        other => {
            // Keep the message rather than failing, but surface the typo.
            tracing::warn!("Unknown message role '{}', storing as 'user'", other);
            cortex_mem_core::MessageRole::User
        }
    };
    let message = sm
        .add_message(thread_id, message_role, content.to_string())
        .await?;
    // `message.id[..8]` would panic on an id shorter than 8 bytes or a
    // non-char boundary; `get` degrades to the full id instead.
    let short_id = message.id.get(..8).unwrap_or(&message.id);
    let message_uri = format!(
        "cortex://session/{}/timeline/{}/{}/{}_{}.md",
        thread_id,
        message.timestamp.format("%Y-%m"),
        message.timestamp.format("%d"),
        message.timestamp.format("%H_%M_%S"),
        short_id
    );
    tracing::info!(
        "Added message to session {}, URI: {}",
        thread_id,
        message_uri
    );
    Ok(message_uri)
}
/// Lists all sessions under `cortex://session`, skipping non-directories and
/// any session whose metadata cannot be loaded.
///
/// NOTE(review): `message_count` is currently hard-coded to 0, matching the
/// original behavior.
pub async fn list_sessions(&self) -> Result<Vec<SessionInfo>> {
    let entries = self.filesystem.list("cortex://session").await?;
    let mut sessions = Vec::with_capacity(entries.len());
    for entry in entries {
        if !entry.is_directory {
            continue;
        }
        let thread_id = entry.name;
        // Take the read lock per session, exactly as the original did.
        let loaded = self
            .session_manager
            .read()
            .await
            .load_session(&thread_id)
            .await;
        let metadata = match loaded {
            Ok(m) => m,
            Err(_) => continue,
        };
        let status = match metadata.status {
            cortex_mem_core::session::manager::SessionStatus::Active => "active",
            cortex_mem_core::session::manager::SessionStatus::Closed => "closed",
            cortex_mem_core::session::manager::SessionStatus::Archived => "archived",
        };
        sessions.push(SessionInfo {
            thread_id: metadata.thread_id,
            status: status.to_string(),
            message_count: 0,
            created_at: metadata.created_at,
            updated_at: metadata.updated_at,
        });
    }
    Ok(sessions)
}
/// Loads one session's metadata and maps it to a `SessionInfo`.
///
/// NOTE(review): `message_count` is hard-coded to 0, same as the original.
pub async fn get_session(&self, thread_id: &str) -> Result<SessionInfo> {
    use cortex_mem_core::session::manager::SessionStatus;
    let metadata = self
        .session_manager
        .read()
        .await
        .load_session(thread_id)
        .await?;
    let status = match metadata.status {
        SessionStatus::Active => "active",
        SessionStatus::Closed => "closed",
        SessionStatus::Archived => "archived",
    };
    Ok(SessionInfo {
        thread_id: metadata.thread_id,
        status: status.to_string(),
        message_count: 0,
        created_at: metadata.created_at,
        updated_at: metadata.updated_at,
    })
}
/// Marks a session as closed; downstream event listeners react to the close
/// (extraction, indexing).
pub async fn close_session(&self, thread_id: &str) -> Result<()> {
    self.session_manager
        .write()
        .await
        .close_session(thread_id)
        .await?;
    tracing::info!("Closed session: {}", thread_id);
    Ok(())
}
/// Reads the file at `uri` from the tenant filesystem.
pub async fn read_file(&self, uri: &str) -> Result<String> {
    Ok(self.filesystem.read(uri).await?)
}
/// Lists the URIs of all entries directly under `uri`.
pub async fn list_files(&self, uri: &str) -> Result<Vec<String>> {
    let entries = self.filesystem.list(uri).await?;
    Ok(entries.into_iter().map(|entry| entry.uri).collect())
}
/// Deletes a file and its per-layer (L0/L1/L2) vectors.
///
/// Vector deletion is best-effort: a missing vector is expected for files
/// that were never indexed, so failures are logged at debug level and do
/// not block the filesystem deletion.
///
/// # Errors
/// Fails only when the filesystem deletion itself fails.
pub async fn delete(&self, uri: &str) -> Result<()> {
    let l0_id =
        cortex_mem_core::uri_to_vector_id(uri, cortex_mem_core::ContextLayer::L0Abstract);
    let l1_id =
        cortex_mem_core::uri_to_vector_id(uri, cortex_mem_core::ContextLayer::L1Overview);
    let l2_id = cortex_mem_core::uri_to_vector_id(uri, cortex_mem_core::ContextLayer::L2Detail);
    for id in [&l0_id, &l1_id, &l2_id] {
        // Previously `let _ = …` swallowed these errors entirely; keep the
        // best-effort semantics but leave a trace for debugging.
        if let Err(e) = self.vector_store.delete(id).await {
            tracing::debug!("Vector delete skipped for {} (id {}): {}", uri, id, e);
        }
    }
    tracing::info!(
        "Deleted vectors for URI: {} (L0: {}, L1: {}, L2: {})",
        uri,
        l0_id,
        l1_id,
        l2_id
    );
    self.filesystem.delete(uri).await?;
    tracing::info!("Deleted file: {}", uri);
    Ok(())
}
pub async fn exists(&self, uri: &str) -> Result<bool> {
let exists = self
.filesystem
.exists(uri)
.await
.map_err(ToolsError::Core)?;
Ok(exists)
}
pub async fn ensure_all_layers(&self) -> Result<cortex_mem_core::automation::GenerationStats> {
if let Some(ref generator) = self.layer_generator {
tracing::info!("Starting scan and generation of missing L0/L1 layer files...");
match generator.ensure_all_layers().await {
Ok(stats) => {
tracing::info!(
"L0/L1 layer generation completed: total={}, generated={}, failed={}",
stats.total,
stats.generated,
stats.failed
);
Ok(stats)
}
Err(e) => {
tracing::error!("L0/L1 layer generation failed: {}", e);
Err(e.into())
}
}
} else {
tracing::warn!("LayerGenerator not configured, skipping layer generation");
Ok(cortex_mem_core::automation::GenerationStats::default())
}
}
pub async fn ensure_session_layers(
&self,
session_id: &str,
) -> Result<cortex_mem_core::automation::GenerationStats> {
if let Some(ref generator) = self.layer_generator {
let timeline_uri = format!("cortex://session/{}/timeline", session_id);
tracing::info!("Generating L0/L1 layer files for session {}", session_id);
match generator.ensure_timeline_layers(&timeline_uri).await {
Ok(stats) => {
tracing::info!(
"Session {} L0/L1 layer generation completed: total={}, generated={}, failed={}",
session_id,
stats.total,
stats.generated,
stats.failed
);
Ok(stats)
}
Err(e) => {
tracing::error!("Session {} L0/L1 layer generation failed: {}", session_id, e);
Err(e.into())
}
}
} else {
tracing::warn!("LayerGenerator not configured, skipping layer generation");
Ok(cortex_mem_core::automation::GenerationStats::default())
}
}
/// Synchronously indexes every file in the store into the vector database
/// using a fresh `SyncManager` with default configuration.
pub async fn index_all_files(&self) -> Result<cortex_mem_core::automation::SyncStats> {
    use cortex_mem_core::automation::{SyncConfig, SyncManager};
    tracing::info!("Starting to index all files to vector database...");
    let manager = SyncManager::new(
        self.filesystem.clone(),
        self.embedding_client.clone(),
        self.vector_store.clone(),
        self.llm_client.clone(),
        SyncConfig::default(),
    );
    let outcome = manager.sync_all().await;
    if let Err(ref e) = outcome {
        tracing::error!("Indexing failed: {}", e);
    }
    let stats = outcome?;
    tracing::info!(
        "Indexing completed: {} total files, {} indexed, {} skipped, {} errors",
        stats.total_files,
        stats.indexed_files,
        stats.skipped_files,
        stats.error_files
    );
    Ok(stats)
}
/// Synchronously indexes every file belonging to one session into the
/// vector database.
pub async fn index_session_files(
    &self,
    session_id: &str,
) -> Result<cortex_mem_core::automation::SyncStats> {
    use cortex_mem_core::automation::{SyncConfig, SyncManager};
    tracing::info!("Starting to index files to vector database for session {}...", session_id);
    let manager = SyncManager::new(
        self.filesystem.clone(),
        self.embedding_client.clone(),
        self.vector_store.clone(),
        self.llm_client.clone(),
        SyncConfig::default(),
    );
    let session_uri = format!("cortex://session/{}", session_id);
    let outcome = manager.sync_specific_path(&session_uri).await;
    if let Err(ref e) = outcome {
        tracing::error!("Session {} indexing failed: {}", session_id, e);
    }
    let stats = outcome?;
    tracing::info!(
        "Session {} indexing completed: {} total files, {} indexed, {} skipped, {} errors",
        session_id,
        stats.total_files,
        stats.indexed_files,
        stats.skipped_files,
        stats.error_files
    );
    Ok(stats)
}
/// Waits up to `max_wait_secs` for the event coordinator's background work
/// to drain; returns whether everything completed in time.
///
/// Without a coordinator, falls back to a fixed sleep (capped at 5s) and
/// reports success.
pub async fn wait_for_background_tasks(&self, max_wait_secs: u64) -> bool {
    use std::time::Duration;
    match self.event_coordinator {
        Some(ref coordinator) => {
            coordinator
                .wait_for_completion(Duration::from_secs(max_wait_secs))
                .await
        }
        None => {
            warn!("MemoryEventCoordinator not initialized, using simple wait");
            tokio::time::sleep(Duration::from_secs(max_wait_secs.min(5))).await;
            true
        }
    }
}
/// Flushes pending memory events and waits for processing, polling at
/// `check_interval_secs` (default 1s). Returns `true` when nothing remained
/// or no coordinator is configured.
pub async fn flush_and_wait(&self, check_interval_secs: Option<u64>) -> bool {
    let interval = std::time::Duration::from_secs(check_interval_secs.unwrap_or(1));
    match self.event_coordinator {
        Some(ref coordinator) => coordinator.flush_and_wait(interval).await,
        None => {
            warn!("MemoryEventCoordinator not initialized, skipping wait");
            true
        }
    }
}
pub async fn trigger_processing(
&self,
scope: &str,
owner_id: &str,
) -> crate::Result<ProcessingResult> {
let memory_scope = match scope.to_lowercase().as_str() {
"user" => cortex_mem_core::MemoryScope::User,
"agent" => cortex_mem_core::MemoryScope::Agent,
"session" => cortex_mem_core::MemoryScope::Session,
"resources" => cortex_mem_core::MemoryScope::Resources,
_ => {
return Err(crate::ToolsError::ValidationError(format!(
"Invalid scope: {}. Valid values: user, agent, session, resources",
scope
)));
}
};
tracing::info!("Manual trigger processing for {:?}/{}", memory_scope, owner_id);
if let Some(ref coordinator) = self.event_coordinator {
coordinator
.force_full_update(&memory_scope, owner_id)
.await?;
} else {
warn!("MemoryEventCoordinator not initialized, cannot trigger processing");
return Ok(ProcessingResult::default());
}
let sync_stats = match memory_scope {
cortex_mem_core::MemoryScope::Session => {
self.index_session_files(owner_id).await?
}
_ => {
self.index_all_files().await?
}
};
Ok(ProcessingResult {
scope: scope.to_string(),
owner_id: owner_id.to_string(),
layers_updated: sync_stats.indexed_files,
vectors_indexed: sync_stats.indexed_files,
})
}
/// Snapshots the coordinator's processing counters; all zeros when no
/// coordinator is configured.
pub async fn pending_status(&self) -> PendingStatus {
    match self.event_coordinator {
        Some(ref coordinator) => {
            let stats = coordinator.get_stats().await;
            PendingStatus {
                memory_created: stats.memory_created,
                memory_updated: stats.memory_updated,
                memory_deleted: stats.memory_deleted,
                layers_updated: stats.layers_updated,
                sessions_closed: stats.sessions_closed,
            }
        }
        None => PendingStatus::default(),
    }
}
}
/// Outcome of a manually triggered processing run.
#[derive(Debug, Clone, Default)]
pub struct ProcessingResult {
    /// Scope string exactly as passed by the caller.
    pub scope: String,
    /// Owner id the processing ran for.
    pub owner_id: String,
    /// Count of updated layers. NOTE(review): currently populated from the
    /// sync run's indexed-file count, same as `vectors_indexed`.
    pub layers_updated: usize,
    /// Number of files indexed into the vector store during the run.
    pub vectors_indexed: usize,
}
/// Snapshot of the event coordinator's cumulative processing counters.
#[derive(Debug, Clone, Default)]
pub struct PendingStatus {
    /// Memories created so far.
    pub memory_created: u64,
    /// Memories updated so far.
    pub memory_updated: u64,
    /// Memories deleted so far.
    pub memory_deleted: u64,
    /// Layer files regenerated so far.
    pub layers_updated: u64,
    /// Sessions whose close events have been processed.
    pub sessions_closed: u64,
}