use crate::{
embedding::EmbeddingClient,
filesystem::{CortexFilesystem, FilesystemOperations},
layers::manager::LayerManager,
llm::LLMClient,
types::{Memory, MemoryMetadata},
vector_store::{QdrantVectorStore, uri_to_vector_id},
ContextLayer,
Result,
};
use std::collections::HashSet;
use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::vector_store::VectorStore as _;
pub struct SyncManager {
filesystem: Arc<CortexFilesystem>,
embedding: Arc<EmbeddingClient>,
vector_store: Arc<crate::vector_store::QdrantVectorStore>,
llm_client: Arc<dyn LLMClient>,
config: SyncConfig,
}
/// Controls which filesystem dimensions a `SyncManager` visits.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    // NOTE(review): not consulted anywhere in this file; presumably read by
    // callers that decide whether to trigger syncs automatically — confirm.
    pub auto_index: bool,
    // Sync `cortex://agent` when true.
    pub sync_agents: bool,
    // Sync `cortex://session` (recursive walk with timeline-layer generation) when true.
    pub sync_threads: bool,
    // Sync `cortex://user` when true.
    pub sync_users: bool,
    // Sync `cortex://resources` (if listable and non-empty) when true.
    pub sync_global: bool,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
auto_index: true,
sync_agents: true,
sync_threads: true,
sync_users: true,
sync_global: true,
}
}
}
/// Counters accumulated during a sync run.
///
/// Each visited `.md` file increments `total_files` and exactly one of the
/// other three buckets.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    // Files examined (indexed + skipped + errored).
    pub total_files: usize,
    // Files newly embedded and inserted into the vector store.
    pub indexed_files: usize,
    // Files whose vector ID was already present in the store.
    pub skipped_files: usize,
    // Files whose read/embed/insert failed (logged, non-fatal).
    pub error_files: usize,
}
impl SyncManager {
    /// Creates a sync manager from its collaborators and an explicit config.
    pub fn new(
        filesystem: Arc<CortexFilesystem>,
        embedding: Arc<EmbeddingClient>,
        vector_store: Arc<crate::vector_store::QdrantVectorStore>,
        llm_client: Arc<dyn LLMClient>,
        config: SyncConfig,
    ) -> Self {
        Self {
            filesystem,
            embedding,
            vector_store,
            llm_client,
            config,
        }
    }

    /// Like [`Self::new`] but with `SyncConfig::default()` (everything enabled).
    pub fn with_defaults(
        filesystem: Arc<CortexFilesystem>,
        embedding: Arc<EmbeddingClient>,
        vector_store: Arc<QdrantVectorStore>,
        llm_client: Arc<dyn LLMClient>,
    ) -> Self {
        Self::new(filesystem, embedding, vector_store, llm_client, SyncConfig::default())
    }

    /// Syncs every dimension enabled in the config and returns aggregated
    /// [`SyncStats`].
    ///
    /// Per-file failures are counted in the stats rather than aborting. A
    /// failed directory listing for `user`/`agent`/`session` propagates as
    /// `Err`, while `resources` is skipped silently if it cannot be listed
    /// or is empty.
    pub async fn sync_all(&self) -> Result<SyncStats> {
        info!("Starting full sync to vector database");
        let mut total_stats = SyncStats::default();
        if self.config.sync_users {
            let stats = self
                .sync_directory("cortex://user", "L2")
                .await?;
            total_stats.add(&stats);
        }
        if self.config.sync_agents {
            let stats = self
                .sync_directory("cortex://agent", "L2")
                .await?;
            total_stats.add(&stats);
        }
        if self.config.sync_threads {
            // Sessions use the recursive variant so timeline layers are generated.
            let stats = self.sync_directory_recursive("cortex://session").await?;
            total_stats.add(&stats);
        }
        if self.config.sync_global {
            // Best-effort: only sync resources when listable and non-empty.
            if let Ok(entries) = self.filesystem.list("cortex://resources").await {
                if !entries.is_empty() {
                    let stats = self
                        .sync_directory("cortex://resources", "L2")
                        .await?;
                    total_stats.add(&stats);
                }
            }
        }
        info!(
            "Sync completed: {} files processed, {} indexed, {} skipped, {} errors",
            total_stats.total_files,
            total_stats.indexed_files,
            total_stats.skipped_files,
            total_stats.error_files
        );
        Ok(total_stats)
    }

    /// Syncs a single URI, choosing the traversal strategy from its prefix:
    /// session paths (and any unrecognized prefix) use the recursive walk
    /// that also generates timeline layers; user/agent/resources paths use
    /// the plain walk.
    ///
    /// A non-existent path is not an error — it returns empty stats.
    pub async fn sync_specific_path(&self, uri: &str) -> Result<SyncStats> {
        info!("Starting sync for specific path: {}", uri);
        if !self.filesystem.exists(uri).await? {
            warn!("Path does not exist: {}", uri);
            return Ok(SyncStats::default());
        }
        let stats = if uri.starts_with("cortex://session/") {
            self.sync_directory_recursive(uri).await?
        } else if uri.starts_with("cortex://user/") || uri.starts_with("cortex://agent/") {
            self.sync_directory(uri, "L2").await?
        } else if uri.starts_with("cortex://resources/") {
            self.sync_directory(uri, "L2").await?
        } else {
            self.sync_directory_recursive(uri).await?
        };
        info!(
            "Sync completed for {}: {} files processed, {} indexed, {} skipped, {} errors",
            uri,
            stats.total_files,
            stats.indexed_files,
            stats.skipped_files,
            stats.error_files
        );
        Ok(stats)
    }

    /// Recursively indexes a directory: visible `.md` files at L2, plus the
    /// directory's own L0/L1 layer files when any `.md` files are present.
    ///
    /// Returns a boxed future because the function calls itself for
    /// subdirectories and recursive `async fn`s need the indirection.
    ///
    /// NOTE(review): `_layer` is never read — every call site passes "L2",
    /// so the parameter looks vestigial. Apart from timeline-layer
    /// generation this body duplicates `sync_directory_recursive`; the two
    /// could likely be merged.
    fn sync_directory<'a>(
        &'a self,
        uri: &'a str,
        _layer: &'a str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<SyncStats>> + Send + 'a>> {
        Box::pin(async move {
            let entries = self.filesystem.list(uri).await?;
            let mut stats = SyncStats::default();
            let mut subdirs: Vec<String> = Vec::new();
            let mut files: Vec<String> = Vec::new();
            // Partition the listing into subdirectories and visible markdown
            // files; dotfiles (.abstract.md / .overview.md) are handled by
            // sync_directory_layers instead.
            for entry in &entries {
                if entry.is_directory {
                    subdirs.push(entry.uri.clone());
                } else if entry.name.ends_with(".md") && !entry.name.starts_with('.') {
                    files.push(entry.uri.clone());
                }
            }
            // Index the directory-level L0/L1 layers; failure is non-fatal.
            if !files.is_empty() {
                if let Err(e) = self.sync_directory_layers(uri).await {
                    debug!("Failed to sync L0/L1 for directory {}: {}", uri, e);
                }
            }
            for file_uri in files {
                match self.sync_file_l2(&file_uri).await {
                    Ok(true) => stats.indexed_files += 1,
                    Ok(false) => stats.skipped_files += 1,
                    Err(e) => {
                        // Record the failure but keep syncing the rest.
                        warn!("Failed to sync {}: {}", file_uri, e);
                        stats.error_files += 1;
                    }
                }
                stats.total_files += 1;
            }
            // Recurse into subdirectories; a listing failure there propagates.
            for subdir in subdirs {
                let sub_stats = self.sync_directory(&subdir, "L2").await?;
                stats.add(&sub_stats);
            }
            Ok(stats)
        })
    }

    /// Like [`Self::sync_directory`] but additionally generates session-level
    /// timeline layer files (`.abstract.md` / `.overview.md`) when visiting a
    /// timeline root that does not already have both.
    fn sync_directory_recursive<'a>(
        &'a self,
        uri: &'a str,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<SyncStats>> + Send + 'a>> {
        Box::pin(async move {
            let entries = self.filesystem.list(uri).await?;
            let mut stats = SyncStats::default();
            // A timeline *root* ends in "/timeline" and is not nested under
            // another timeline path.
            let is_session_timeline_root = uri.ends_with("/timeline") && !uri.contains("/timeline/");
            if is_session_timeline_root {
                // `unwrap_or(false)` treats an existence-check error as
                // "missing", which triggers (re)generation.
                let l0_exists = self
                    .filesystem
                    .exists(&format!("{}/.abstract.md", uri))
                    .await
                    .unwrap_or(false);
                let l1_exists = self
                    .filesystem
                    .exists(&format!("{}/.overview.md", uri))
                    .await
                    .unwrap_or(false);
                if l0_exists && l1_exists {
                    debug!("Timeline layers already exist for {}, skipping generation", uri);
                } else {
                    // Best-effort LLM generation; a failure is logged, not fatal.
                    if let Err(e) = self.generate_timeline_layers(uri).await {
                        warn!("Failed to generate timeline layers for {}: {}", uri, e);
                    } else {
                        info!("Generated session-level timeline layers for {}", uri);
                    }
                }
            }
            // From here on the logic mirrors sync_directory.
            let mut subdirs: Vec<String> = Vec::new();
            let mut files: Vec<String> = Vec::new();
            for entry in &entries {
                if entry.is_directory {
                    subdirs.push(entry.uri.clone());
                } else if entry.name.ends_with(".md") && !entry.name.starts_with('.') {
                    files.push(entry.uri.clone());
                }
            }
            if !files.is_empty() {
                if let Err(e) = self.sync_directory_layers(uri).await {
                    debug!("Failed to sync L0/L1 for directory {}: {}", uri, e);
                }
            }
            for file_uri in files {
                match self.sync_file_l2(&file_uri).await {
                    Ok(true) => stats.indexed_files += 1,
                    Ok(false) => stats.skipped_files += 1,
                    Err(e) => {
                        warn!("Failed to sync {}: {}", file_uri, e);
                        stats.error_files += 1;
                    }
                }
                stats.total_files += 1;
            }
            for subdir in subdirs {
                let sub_stats = self.sync_directory_recursive(&subdir).await?;
                stats.add(&sub_stats);
            }
            Ok(stats)
        })
    }

    /// Indexes a directory's `.abstract.md` (L0) and `.overview.md` (L1)
    /// files into the vector store, keyed by the *directory* URI.
    ///
    /// Each layer is skipped if its vector ID already exists or its file
    /// cannot be read; embedding/insert failures propagate as `Err`.
    async fn sync_directory_layers(&self, dir_uri: &str) -> Result<()> {
        let l0_file_uri = format!("{}/.abstract.md", dir_uri);
        let l0_id = uri_to_vector_id(dir_uri, ContextLayer::L0Abstract);
        if !self.is_indexed(&l0_id).await? {
            // Missing/unreadable .abstract.md just means no L0 to index.
            if let Ok(l0_content) = self.filesystem.read(&l0_file_uri).await {
                let l0_embedding = self.embedding.embed(&l0_content).await?;
                let l0_metadata = self.parse_metadata(dir_uri, "L0")?;
                let l0_memory = Memory {
                    id: l0_id,
                    content: l0_content,
                    embedding: l0_embedding,
                    created_at: chrono::Utc::now(),
                    updated_at: chrono::Utc::now(),
                    metadata: l0_metadata,
                };
                self.vector_store.insert(&l0_memory).await?;
                debug!("L0 indexed for directory: {}", dir_uri);
            }
        }
        let l1_file_uri = format!("{}/.overview.md", dir_uri);
        let l1_id = uri_to_vector_id(dir_uri, ContextLayer::L1Overview);
        if !self.is_indexed(&l1_id).await? {
            if let Ok(l1_content) = self.filesystem.read(&l1_file_uri).await {
                let l1_embedding = self.embedding.embed(&l1_content).await?;
                let l1_metadata = self.parse_metadata(dir_uri, "L1")?;
                let l1_memory = Memory {
                    id: l1_id,
                    content: l1_content,
                    embedding: l1_embedding,
                    created_at: chrono::Utc::now(),
                    updated_at: chrono::Utc::now(),
                    metadata: l1_metadata,
                };
                self.vector_store.insert(&l1_memory).await?;
                debug!("L1 indexed for directory: {}", dir_uri);
            }
        }
        Ok(())
    }

    /// Embeds and indexes a single file at the L2 (detail) layer.
    ///
    /// Returns `Ok(true)` if newly indexed, `Ok(false)` if its vector ID was
    /// already present, and `Err` on read/embed/insert failure.
    async fn sync_file_l2(&self, uri: &str) -> Result<bool> {
        let l2_id = uri_to_vector_id(uri, ContextLayer::L2Detail);
        if self.is_indexed(&l2_id).await? {
            debug!("File already indexed (L2): {}", uri);
            return Ok(false);
        }
        let l2_content = self.filesystem.read(uri).await?;
        let l2_embedding = self.embedding.embed(&l2_content).await?;
        let l2_metadata = self.parse_metadata(uri, "L2")?;
        let l2_memory = Memory {
            id: l2_id,
            content: l2_content,
            embedding: l2_embedding,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            metadata: l2_metadata,
        };
        self.vector_store.insert(&l2_memory).await?;
        debug!("L2 indexed: {}", uri);
        Ok(true)
    }

    /// Checks whether a vector ID already exists in the store.
    ///
    /// A store lookup error is treated as "not indexed" (logged at debug),
    /// so transient store failures lead to re-indexing rather than skipping.
    async fn is_indexed(&self, id: &str) -> Result<bool> {
        match self.vector_store.get(id).await {
            Ok(Some(_)) => Ok(true),
            Ok(None) => Ok(false),
            Err(e) => {
                debug!("Error checking if indexed: {}", e);
                Ok(false)
            }
        }
    }

    /// Builds [`MemoryMetadata`] for a URI of the form
    /// `cortex://<dimension>/<path...>`. The dimension selects which ID field
    /// (`user_id` / `agent_id` / `run_id`) receives the path; the URI and
    /// path are also stored in the `custom` map.
    ///
    /// NOTE(review): the stored `hash` is computed over the *URI*, not the
    /// file content — confirm this is intentional, since the helper's
    /// parameter is named `content`.
    fn parse_metadata(&self, uri: &str, layer: &str) -> Result<MemoryMetadata> {
        use serde_json::Value;
        // "cortex://user/alice" splits into ["cortex:", "", "user", "alice"],
        // so parts[2] is the dimension and parts[3..] the path.
        let parts: Vec<&str> = uri.split('/').collect();
        let (dimension, path): (&str, String) = if parts.len() >= 3 {
            (parts[2], parts[3..].join("/"))
        } else {
            // Short/malformed URI: fall back to treating it as a session path.
            (
                "session",
                uri.strip_prefix("cortex://").unwrap_or(uri).to_string(),
            )
        };
        let hash = self.calculate_hash(uri);
        let mut custom = std::collections::HashMap::new();
        custom.insert("uri".to_string(), Value::String(uri.to_string()));
        custom.insert("path".to_string(), Value::String(path.clone()));
        Ok(MemoryMetadata {
            uri: Some(uri.to_string()),
            user_id: if dimension == "user" {
                Some(path.clone())
            } else {
                None
            },
            agent_id: if dimension == "agent" {
                Some(path.clone())
            } else {
                None
            },
            run_id: if dimension == "session" {
                Some(path.clone())
            } else {
                None
            },
            actor_id: None,
            role: None,
            layer: layer.to_string(),
            hash,
            // Fixed defaults — no enrichment (importance, entities, topics)
            // is performed here.
            importance_score: 0.5,
            entities: vec![],
            topics: vec![],
            custom,
        })
    }

    /// Hashes a string with the std `DefaultHasher` and renders it as hex.
    ///
    /// NOTE(review): `DefaultHasher`'s algorithm is not guaranteed stable
    /// across Rust releases; if these hashes are persisted and compared
    /// across builds, a fixed algorithm would be safer. The only caller
    /// passes a URI, not file content (see `parse_metadata`).
    fn calculate_hash(&self, content: &str) -> String {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        content.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    /// Delegates timeline layer generation to a freshly built [`LayerManager`].
    async fn generate_timeline_layers(&self, timeline_uri: &str) -> Result<()> {
        let layer_manager = LayerManager::new(self.filesystem.clone(), self.llm_client.clone());
        layer_manager.generate_timeline_layers(timeline_uri).await
    }

    /// Syncs a list of directory URIs, de-duplicating so each unique URI is
    /// visited once, and returns the aggregated stats.
    pub async fn sync_directories(&self, dir_uris: &[String]) -> Result<SyncStats> {
        let mut total_stats = SyncStats::default();
        let mut processed_dirs: HashSet<String> = HashSet::new();
        for dir_uri in dir_uris {
            if processed_dirs.contains(dir_uri) {
                continue;
            }
            processed_dirs.insert(dir_uri.clone());
            let stats = self.sync_specific_path(dir_uri).await?;
            total_stats.add(&stats);
        }
        Ok(total_stats)
    }
}
impl SyncStats {
pub fn add(&mut self, other: &SyncStats) {
self.total_files += other.total_files;
self.indexed_files += other.indexed_files;
self.skipped_files += other.skipped_files;
self.error_files += other.error_files;
}
}