use crate::{
Result,
automation::{AutoIndexer, AutomationConfig, AutomationManager, IndexerConfig},
embedding::{EmbeddingClient, EmbeddingConfig},
events::EventBus,
filesystem::CortexFilesystem,
llm::LLMClient,
memory_event_coordinator::{CoordinatorConfig, MemoryEventCoordinator},
session::{SessionConfig, SessionManager},
vector_store::{QdrantVectorStore, VectorStore},
};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn};
pub struct CortexMemBuilder {
data_dir: PathBuf,
embedding_config: Option<EmbeddingConfig>,
qdrant_config: Option<crate::config::QdrantConfig>,
llm_client: Option<Arc<dyn LLMClient>>,
session_config: SessionConfig,
coordinator_config: Option<CoordinatorConfig>,
}
impl CortexMemBuilder {
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
Self {
data_dir: data_dir.into(),
embedding_config: None,
qdrant_config: None,
llm_client: None,
session_config: SessionConfig::default(),
coordinator_config: None,
}
}
pub fn with_embedding(mut self, config: EmbeddingConfig) -> Self {
self.embedding_config = Some(config);
self
}
pub fn with_qdrant(mut self, config: crate::config::QdrantConfig) -> Self {
self.qdrant_config = Some(config);
self
}
pub fn with_llm(mut self, llm_client: Arc<dyn LLMClient>) -> Self {
self.llm_client = Some(llm_client);
self
}
pub fn with_session_config(mut self, config: SessionConfig) -> Self {
self.session_config = config;
self
}
pub fn with_coordinator_config(mut self, config: CoordinatorConfig) -> Self {
self.coordinator_config = Some(config);
self
}
pub async fn build(self) -> Result<CortexMem> {
info!("Building Cortex Memory with incremental update support");
let filesystem = Arc::new(CortexFilesystem::new(
self.data_dir.to_string_lossy().as_ref(),
));
filesystem.initialize().await?;
info!("Filesystem initialized at: {:?}", self.data_dir);
let embedding = if let Some(cfg) = self.embedding_config {
match EmbeddingClient::new_with_global_limiter(cfg).await {
Ok(client) => Some(Arc::new(client)),
Err(e) => {
warn!("Failed to create embedding client: {}", e);
None
}
}
} else {
None
};
let (qdrant_store_typed, vector_store): (Option<Arc<QdrantVectorStore>>, Option<Arc<dyn VectorStore>>) =
if let Some(ref cfg) = self.qdrant_config {
match QdrantVectorStore::new(cfg).await {
Ok(store) => {
info!("Qdrant vector store connected: {}", cfg.url);
let typed = Arc::new(store);
let dyn_store: Arc<dyn VectorStore> = typed.clone();
(Some(typed), Some(dyn_store))
}
Err(e) => {
warn!("Failed to connect to Qdrant, vector search disabled: {}", e);
(None, None)
}
}
} else {
(None, None)
};
let (event_bus, event_rx) = EventBus::new();
let event_bus = Arc::new(event_bus);
let (coordinator_handle, memory_event_tx) =
if let (Some(llm), Some(emb), Some(qdrant_store)) =
(&self.llm_client, &embedding, &qdrant_store_typed)
{
let qdrant_store = qdrant_store.clone();
let config = self.coordinator_config.unwrap_or_default();
let (coordinator, tx, rx) = MemoryEventCoordinator::new_with_config(
filesystem.clone(),
llm.clone(),
emb.clone(),
qdrant_store,
config,
);
let handle = tokio::spawn(coordinator.start(rx));
info!("✅ MemoryEventCoordinator started for incremental updates");
(Some(handle), Some(tx))
} else {
warn!("MemoryEventCoordinator disabled: missing LLM, embedding, or vector store");
(None, None)
};
let memory_event_tx_for_session = memory_event_tx.clone();
let session_manager = if let Some(tx) = memory_event_tx_for_session {
if let Some(ref llm) = self.llm_client {
SessionManager::with_llm_and_events(
filesystem.clone(),
self.session_config,
llm.clone(),
event_bus.as_ref().clone(),
)
.with_memory_event_tx(tx)
} else {
SessionManager::with_event_bus(
filesystem.clone(),
self.session_config,
event_bus.as_ref().clone(),
)
.with_memory_event_tx(tx)
}
} else {
if let Some(ref llm) = self.llm_client {
SessionManager::with_llm_and_events(
filesystem.clone(),
self.session_config,
llm.clone(),
event_bus.as_ref().clone(),
)
} else {
SessionManager::with_event_bus(
filesystem.clone(),
self.session_config,
event_bus.as_ref().clone(),
)
}
};
let (automation_handle, automation_tx_handle) = if let (Some(emb), Some(qdrant_store)) = (&embedding, &qdrant_store_typed) {
let indexer = Arc::new(AutoIndexer::new(
filesystem.clone(),
emb.clone(),
qdrant_store.clone(),
IndexerConfig::default(),
));
let automation_manager = if let Some(ref tx) = memory_event_tx {
AutomationManager::with_memory_events(
indexer,
AutomationConfig {
auto_index: true,
index_on_message: true,
..AutomationConfig::default()
},
tx.clone(),
)
} else {
AutomationManager::new(
indexer,
AutomationConfig {
auto_index: true,
index_on_message: true,
..AutomationConfig::default()
},
)
};
let tx_handle = automation_manager.memory_event_tx_handle();
let handle = tokio::spawn(async move {
if let Err(e) = automation_manager.start(event_rx).await {
tracing::error!("AutomationManager failed: {}", e);
}
});
info!("✅ AutomationManager started (real-time L2 indexing on MessageAdded)");
(Some(handle), Some(tx_handle))
} else {
drop(event_rx);
warn!("AutomationManager not started: Qdrant or Embedding not configured");
(None, None)
};
info!("✅ CortexMem initialized successfully");
Ok(CortexMem {
filesystem,
session_manager: Arc::new(RwLock::new(session_manager)),
embedding,
vector_store,
llm_client: self.llm_client,
event_bus,
qdrant_store_typed,
memory_event_tx,
coordinator_handle,
automation_handle,
automation_tx_handle,
})
}
}
pub struct CortexMem {
pub filesystem: Arc<CortexFilesystem>,
pub session_manager: Arc<RwLock<SessionManager>>,
pub embedding: Option<Arc<EmbeddingClient>>,
pub vector_store: Option<Arc<dyn VectorStore>>,
pub llm_client: Option<Arc<dyn LLMClient>>,
#[allow(dead_code)]
event_bus: Arc<EventBus>,
qdrant_store_typed: Option<Arc<QdrantVectorStore>>,
memory_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>>,
coordinator_handle: Option<tokio::task::JoinHandle<()>>,
automation_handle: Option<tokio::task::JoinHandle<()>>,
automation_tx_handle: Option<Arc<tokio::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>>>>>,
}
impl CortexMem {
pub fn session_manager(&self) -> Arc<RwLock<SessionManager>> {
self.session_manager.clone()
}
pub fn filesystem(&self) -> Arc<CortexFilesystem> {
self.filesystem.clone()
}
pub fn embedding(&self) -> Option<Arc<EmbeddingClient>> {
self.embedding.clone()
}
pub fn vector_store(&self) -> Option<Arc<dyn VectorStore>> {
self.vector_store.clone()
}
pub fn llm_client(&self) -> Option<Arc<dyn LLMClient>> {
self.llm_client.clone()
}
pub fn qdrant_store(&self) -> Option<Arc<QdrantVectorStore>> {
self.qdrant_store_typed.clone()
}
pub fn memory_event_tx(
&self,
) -> Option<tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>> {
self.memory_event_tx.clone()
}
pub fn automation_tx_handle(
&self,
) -> Option<Arc<tokio::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>>>>> {
self.automation_tx_handle.clone()
}
pub async fn shutdown(self) -> Result<()> {
info!("Shutting down CortexMem...");
if let Some(handle) = self.automation_handle {
handle.abort();
info!("AutomationManager stopped");
}
if let Some(handle) = self.coordinator_handle {
handle.abort();
info!("MemoryEventCoordinator stopped");
}
Ok(())
}
}