use crate::{
ContextLayer, Result,
embedding::EmbeddingClient,
filesystem::{CortexFilesystem, FilesystemOperations},
session::Message,
vector_store::{QdrantVectorStore, VectorStore},
};
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Tunable settings for [`AutoIndexer`].
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Whether automatic indexing is enabled.
    ///
    /// NOTE(review): this flag is not read anywhere in this file; presumably
    /// callers consult it before invoking the indexer — confirm.
    pub auto_index: bool,
    /// Number of messages handed to the embedding client per batch when a
    /// whole thread is indexed. Should be at least 1.
    pub batch_size: usize,
    /// Whether indexing should run asynchronously.
    ///
    /// NOTE(review): not read anywhere in this file; presumably honoured by
    /// the caller that schedules indexing — confirm.
    pub async_index: bool,
}

impl Default for IndexerConfig {
    /// Returns the stock configuration: automatic and asynchronous indexing
    /// switched on, with batches of ten messages.
    fn default() -> Self {
        IndexerConfig {
            auto_index: true,
            batch_size: 10,
            async_index: true,
        }
    }
}
/// Counters reported by a thread indexing run.
#[derive(Debug, Clone, Default)]
pub struct IndexStats {
/// Number of records written to the vector store: one per message stored
/// successfully, plus the timeline L0/L1 layers indexed in the same run.
pub total_indexed: usize,
/// Number of collected messages left out because their ID was already
/// present in the vector store.
pub total_skipped: usize,
/// Number of failures: messages whose embedding or insert failed, plus
/// timeline layers that could not be indexed.
pub total_errors: usize,
}
/// Internal counters for one pass over a thread's timeline layer files.
#[derive(Debug, Clone, Default)]
struct TimelineLayerStats {
/// Directories whose `.abstract.md` (L0) was newly indexed.
l0_indexed: usize,
/// Directories whose `.overview.md` (L1) was newly indexed.
l1_indexed: usize,
/// Layer files that were read but failed to index.
errors: usize,
}
/// Indexes session messages and timeline layer summaries into the vector
/// store, reading them from the cortex filesystem and embedding their text.
pub struct AutoIndexer {
/// Source of timeline directories and message/layer files (`list`/`read`).
filesystem: Arc<CortexFilesystem>,
/// Client used to turn text into embedding vectors.
embedding: Arc<EmbeddingClient>,
/// Destination store for the resulting memories.
vector_store: Arc<QdrantVectorStore>,
/// Indexing settings; `batch_size` drives batching in thread indexing.
config: IndexerConfig,
}
impl AutoIndexer {
/// Builds an indexer from its shared collaborators and a configuration.
///
/// The arguments are stored as given; nothing is read or contacted here.
pub fn new(
filesystem: Arc<CortexFilesystem>,
embedding: Arc<EmbeddingClient>,
vector_store: Arc<QdrantVectorStore>,
config: IndexerConfig,
) -> Self {
Self {
filesystem,
embedding,
vector_store,
config,
}
}
/// Embeds a single message and stores it in the vector store as an `L2`
/// memory.
///
/// The memory reuses the message ID, is tagged with the thread as `run_id`
/// and carries the URI `cortex://session/{thread_id}/messages/{message.id}`.
///
/// # Errors
///
/// Propagates any failure from the embedding client or the vector store.
pub async fn index_message(&self, thread_id: &str, message: &Message) -> Result<()> {
    info!("Indexing message {} in thread {}", message.id, thread_id);

    let vector = self.embedding.embed(&message.content).await?;

    let metadata = crate::types::MemoryMetadata {
        uri: Some(format!(
            "cortex://session/{}/messages/{}",
            thread_id, message.id
        )),
        user_id: None,
        agent_id: None,
        run_id: Some(thread_id.to_owned()),
        actor_id: None,
        role: Some(format!("{:?}", message.role)),
        layer: String::from("L2"),
        hash: self.calculate_hash(&message.content),
        importance_score: 0.5,
        entities: Vec::new(),
        topics: Vec::new(),
        custom: std::collections::HashMap::new(),
    };
    let record = crate::types::Memory {
        id: message.id.clone(),
        content: message.content.clone(),
        embedding: vector,
        // A freshly indexed message has never been updated, so both
        // timestamps mirror the message's creation time.
        created_at: message.created_at,
        updated_at: message.created_at,
        metadata,
    };

    self.vector_store.as_ref().insert(&record).await?;
    debug!("Message {} indexed successfully", message.id);
    Ok(())
}
/// Indexes every not-yet-indexed message of a thread without reporting
/// progress.
///
/// Thin wrapper around [`Self::index_thread_with_progress`] with no
/// callback installed.
pub async fn index_thread(&self, thread_id: &str) -> Result<IndexStats> {
    let no_progress: Option<fn(usize, usize)> = None;
    self.index_thread_with_progress(thread_id, no_progress).await
}
/// Indexes all messages of a thread that are not yet in the vector store,
/// then indexes the thread's timeline L0/L1 layer files.
///
/// Messages are embedded in batches of `config.batch_size` (a configured
/// size of zero is treated as one). Before each batch, `progress_callback`
/// is invoked with the number of messages in the preceding batches and the
/// total number of messages to index.
///
/// Failures while embedding or storing are logged and counted in
/// [`IndexStats::total_errors`] rather than aborting the run. A thread with
/// no messages returns immediately with empty stats.
///
/// # Errors
///
/// Propagates failures from collecting the thread's messages.
pub async fn index_thread_with_progress<F>(
    &self,
    thread_id: &str,
    mut progress_callback: Option<F>,
) -> Result<IndexStats>
where
    F: FnMut(usize, usize) + Send,
{
    info!("Starting batch indexing for thread: {}", thread_id);
    let mut stats = IndexStats::default();

    let messages = self.collect_messages(thread_id).await?;
    let total_messages = messages.len();
    info!("Found {} messages to index", total_messages);
    if total_messages == 0 {
        return Ok(stats);
    }

    let existing_ids = self.get_indexed_message_ids(thread_id).await?;
    let messages_to_index: Vec<_> = messages
        .into_iter()
        .filter(|m| !existing_ids.contains(&m.id))
        .collect();
    let total_to_index = messages_to_index.len();
    stats.total_skipped = total_messages - total_to_index;
    info!("Skipping {} already indexed messages", stats.total_skipped);

    if messages_to_index.is_empty() {
        // Do not return here: layer files may have been generated after the
        // messages were indexed and still need to be picked up below.
        info!("All messages already indexed");
    }

    // `chunks` panics on a chunk size of zero, so clamp the configured value.
    let batch_size = self.config.batch_size.max(1);
    for (batch_idx, chunk) in messages_to_index.chunks(batch_size).enumerate() {
        if let Some(ref mut callback) = progress_callback {
            callback(batch_idx * batch_size, total_to_index);
        }

        let contents: Vec<String> = chunk.iter().map(|m| m.content.clone()).collect();
        let embeddings = match self.embedding.embed_batch(&contents).await {
            Ok(embeddings) => embeddings,
            Err(e) => {
                warn!(
                    "Failed to generate embeddings for batch {}: {}",
                    batch_idx, e
                );
                stats.total_errors += chunk.len();
                continue;
            }
        };

        // Count the pairs actually produced so that a short embedding
        // response is reported instead of silently dropping messages.
        let mut paired = 0usize;
        for (message, embedding) in chunk.iter().zip(embeddings.iter()) {
            paired += 1;
            let uri = format!("cortex://session/{}/messages/{}", thread_id, message.id);
            let memory = crate::types::Memory {
                id: message.id.clone(),
                content: message.content.clone(),
                embedding: embedding.clone(),
                created_at: message.created_at,
                updated_at: message.created_at,
                metadata: crate::types::MemoryMetadata {
                    uri: Some(uri),
                    user_id: None,
                    agent_id: None,
                    run_id: Some(thread_id.to_string()),
                    actor_id: None,
                    role: Some(format!("{:?}", message.role)),
                    layer: "L2".to_string(),
                    hash: self.calculate_hash(&message.content),
                    importance_score: 0.5,
                    entities: vec![],
                    topics: vec![],
                    custom: std::collections::HashMap::new(),
                },
            };
            match self.vector_store.as_ref().insert(&memory).await {
                Ok(_) => {
                    stats.total_indexed += 1;
                    debug!("Indexed message {}", message.id);
                }
                Err(e) => {
                    warn!("Failed to index message {}: {}", message.id, e);
                    stats.total_errors += 1;
                }
            }
        }
        if paired < chunk.len() {
            let missing = chunk.len() - paired;
            warn!(
                "Embedding batch {} returned too few vectors; {} messages not indexed",
                batch_idx, missing
            );
            stats.total_errors += missing;
        }
    }
    info!(
        "Batch indexing complete: {} indexed, {} skipped, {} errors",
        stats.total_indexed, stats.total_skipped, stats.total_errors
    );

    info!("Indexing timeline L0/L1 layers for thread: {}", thread_id);
    match self.index_timeline_layers(thread_id).await {
        Ok(layer_stats) => {
            info!(
                "Timeline layers indexed: {} L0, {} L1",
                layer_stats.l0_indexed, layer_stats.l1_indexed
            );
            stats.total_indexed += layer_stats.l0_indexed + layer_stats.l1_indexed;
            stats.total_errors += layer_stats.errors;
        }
        Err(e) => {
            // Layer indexing is best-effort; message results are still returned.
            warn!("Failed to index timeline layers: {}", e);
        }
    }
    Ok(stats)
}
/// Returns the IDs already stored in the vector store for `thread_id`
/// (matched through the `run_id` filter).
///
/// A lookup failure is logged and treated as "nothing indexed yet", so this
/// function itself always returns `Ok`.
///
/// NOTE(review): the scroll is capped at 1000 IDs; presumably threads with
/// more stored entries will see some messages treated as not yet indexed —
/// confirm against `scroll_ids`.
async fn get_indexed_message_ids(
    &self,
    thread_id: &str,
) -> Result<std::collections::HashSet<String>> {
    use crate::vector_store::VectorStore;

    let thread_filter = crate::types::Filters {
        run_id: Some(thread_id.to_string()),
        ..Default::default()
    };
    let lookup = self
        .vector_store
        .as_ref()
        .scroll_ids(&thread_filter, 1000)
        .await;

    let known: std::collections::HashSet<String> = match lookup {
        Err(e) => {
            warn!(
                "Failed to get indexed message IDs: {}, assuming none indexed",
                e
            );
            std::collections::HashSet::new()
        }
        Ok(ids) => ids.into_iter().collect(),
    };
    Ok(known)
}
/// Gathers every message found under the thread's timeline tree
/// (`cortex://session/{thread_id}/timeline`), in traversal order.
///
/// # Errors
///
/// Propagates failures from the recursive directory walk.
async fn collect_messages(&self, thread_id: &str) -> Result<Vec<Message>> {
    let mut collected = Vec::new();
    let root = format!("cortex://session/{}/timeline", thread_id);
    let walk = self.collect_messages_recursive(&root, &mut collected).await;
    walk.map(|()| collected)
}
/// Walks `uri` depth-first and appends every message file found to
/// `messages`.
///
/// Hidden entries (names starting with `.`) are ignored. Each `.md` file is
/// first parsed as a structured message; if that fails, a UUID is looked up
/// in the file body and the whole trimmed file becomes the content of a
/// `User` message stamped with the file's modification time. Files without
/// a recoverable UUID, and files that cannot be read, are skipped with a
/// warning.
///
/// Returns a boxed future because an `async fn` cannot call itself
/// recursively.
///
/// # Errors
///
/// Propagates failures from listing a directory.
fn collect_messages_recursive<'a>(
    &'a self,
    uri: &'a str,
    messages: &'a mut Vec<Message>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let entries = self.filesystem.as_ref().list(uri).await?;
        for entry in entries {
            if entry.name.starts_with('.') {
                continue;
            }
            if entry.is_directory {
                self.collect_messages_recursive(&entry.uri, messages)
                    .await?;
                continue;
            }
            if !entry.name.ends_with(".md") {
                continue;
            }

            let content = match self.filesystem.as_ref().read(&entry.uri).await {
                Ok(content) => content,
                Err(e) => {
                    // One unreadable file must not abort the whole walk, but
                    // it should not vanish without a trace either.
                    warn!("Failed to read {}: {}, skipping", entry.uri, e);
                    continue;
                }
            };

            if let Some(message) = self.parse_message_markdown(&content) {
                messages.push(message);
                continue;
            }

            let message_id = match Self::extract_id_from_content(&content) {
                Some(id) => id,
                None => {
                    // The filename only carries a fragment of the ID, which is
                    // not enough to identify the message, so the file is skipped.
                    let name_without_ext = entry.name.trim_end_matches(".md");
                    let parts: Vec<&str> = name_without_ext.split('_').collect();
                    if parts.len() >= 4 {
                        warn!(
                            "Could not extract full UUID from {} (filename fragment: {}), skipping",
                            entry.uri,
                            parts[parts.len() - 1]
                        );
                    } else {
                        warn!("Invalid filename format: {}", entry.name);
                    }
                    continue;
                }
            };

            let timestamp = entry.modified;
            debug!(
                "Collected message from {} with ID: {}",
                entry.uri, message_id
            );
            messages.push(Message {
                id: message_id,
                role: crate::session::MessageRole::User,
                content: content.trim().to_string(),
                timestamp,
                created_at: timestamp,
                metadata: None,
            });
        }
        Ok(())
    })
}
fn parse_message_markdown(&self, content: &str) -> Option<Message> {
use crate::session::MessageRole;
let mut role = MessageRole::User;
let mut message_content = String::new();
let mut id = String::new();
let mut timestamp = chrono::Utc::now();
let mut in_content_section = false;
for line in content.lines() {
if line.starts_with("# 👤 User") || line.starts_with("# User") {
role = MessageRole::User;
} else if line.starts_with("# 🤖 Assistant") || line.starts_with("# Assistant") {
role = MessageRole::Assistant;
} else if line.starts_with("# ⚙️ System") || line.starts_with("# System") {
role = MessageRole::System;
} else if line.starts_with("**ID**:") {
if let Some(id_str) = line
.strip_prefix("**ID**:")
.map(|s| s.trim())
.and_then(|s| {
s.trim_start_matches('`')
.trim_end_matches('`')
.trim()
.to_string()
.into()
})
{
if !id_str.is_empty() {
id = id_str;
}
}
} else if line.starts_with("**Timestamp**:") {
if let Some(ts_str) = line.strip_prefix("**Timestamp**:").map(|s| s.trim()) {
if let Ok(parsed_ts) =
chrono::DateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S %Z")
{
timestamp = parsed_ts.with_timezone(&chrono::Utc);
} else if let Ok(parsed_ts) =
chrono::DateTime::parse_from_str(ts_str, "%Y-%m-%d %H:%M:%S UTC")
{
timestamp = parsed_ts.with_timezone(&chrono::Utc);
}
}
} else if line.starts_with("## Content") {
in_content_section = true;
} else if line.starts_with("##") {
in_content_section = false;
} else if in_content_section && !line.trim().is_empty() {
if !message_content.is_empty() {
message_content.push('\n');
}
message_content.push_str(line);
}
}
if !id.is_empty() && !message_content.is_empty() {
Some(Message {
id,
role,
content: message_content.trim().to_string(),
timestamp,
created_at: timestamp,
metadata: None,
})
} else {
None
}
}
/// Scans `content` for the first line carrying an `**ID**:` or `ID:` marker
/// whose value is a valid UUID and returns that value.
///
/// The value is taken from the text between the line's first and second
/// colon, with surrounding whitespace and backticks removed.
fn extract_id_from_content(content: &str) -> Option<String> {
    content
        .lines()
        .filter(|line| line.contains("**ID**:") || line.contains("ID:"))
        .filter_map(|line| line.split(':').nth(1))
        .map(|segment| segment.trim().trim_matches('`').trim())
        .find(|candidate| uuid::Uuid::parse_str(candidate).is_ok())
        .map(str::to_owned)
}
/// Returns a lowercase hexadecimal fingerprint of `content`, computed with
/// the standard library's `DefaultHasher`.
///
/// NOTE(review): the standard library does not guarantee that
/// `DefaultHasher` produces the same values across Rust releases, so stored
/// fingerprints may not be comparable after a toolchain upgrade.
fn calculate_hash(&self, content: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(content, &mut state);
    let digest = state.finish();
    format!("{:x}", digest)
}
/// Indexes the L0 (`.abstract.md`) and L1 (`.overview.md`) layer files of
/// every directory in the thread's timeline tree.
///
/// For each directory the abstract is handled before the overview. A layer
/// file that cannot be read is skipped silently; a file that is read but
/// fails to index is logged and counted in `errors`. Layers that were
/// already indexed count as neither indexed nor failed.
///
/// # Errors
///
/// Propagates failures from collecting the timeline directories.
async fn index_timeline_layers(&self, thread_id: &str) -> Result<TimelineLayerStats> {
    let timeline_root = format!("cortex://session/{}/timeline", thread_id);
    let layer_dirs = self.collect_timeline_directories(&timeline_root).await?;
    info!("Found {} timeline directories to index", layer_dirs.len());

    let layer_files = [
        (".abstract.md", ContextLayer::L0Abstract),
        (".overview.md", ContextLayer::L1Overview),
    ];
    let mut totals = TimelineLayerStats::default();

    for dir_uri in layer_dirs {
        for &(file_name, layer) in layer_files.iter() {
            let file_uri = format!("{}/{}", dir_uri, file_name);
            let text = match self.filesystem.as_ref().read(&file_uri).await {
                Ok(text) => text,
                // A missing or unreadable layer file is simply not indexed.
                Err(_) => continue,
            };

            let is_abstract = matches!(layer, ContextLayer::L0Abstract);
            match self.index_layer(&dir_uri, &text, layer).await {
                Ok(true) if is_abstract => {
                    totals.l0_indexed += 1;
                    debug!("Indexed L0 for {}", dir_uri);
                }
                Ok(true) => {
                    totals.l1_indexed += 1;
                    debug!("Indexed L1 for {}", dir_uri);
                }
                Ok(false) => {}
                Err(e) if is_abstract => {
                    warn!("Failed to index L0 for {}: {}", dir_uri, e);
                    totals.errors += 1;
                }
                Err(e) => {
                    warn!("Failed to index L1 for {}: {}", dir_uri, e);
                    totals.errors += 1;
                }
            }
        }
    }
    Ok(totals)
}
/// Lists every directory at or below `base_uri` that contains an
/// `.abstract.md` or `.overview.md` file, parents before children.
async fn collect_timeline_directories(&self, base_uri: &str) -> Result<Vec<String>> {
    let mut found = Vec::new();
    let walk = self.collect_directories_recursive(base_uri, &mut found).await;
    walk.map(|()| found)
}
/// Depth-first walk that records `uri` in `directories` when it holds an
/// `.abstract.md` or `.overview.md` entry, then descends into its
/// non-hidden subdirectories.
///
/// A directory that cannot be listed is logged at debug level and treated
/// as empty rather than failing the walk.
///
/// Returns a boxed future because an `async fn` cannot call itself
/// recursively.
fn collect_directories_recursive<'a>(
    &'a self,
    uri: &'a str,
    directories: &'a mut Vec<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let entries = match self.filesystem.as_ref().list(uri).await {
            Ok(entries) => entries,
            Err(e) => {
                debug!("Failed to list {}: {}", uri, e);
                return Ok(());
            }
        };

        let holds_layer_file = entries
            .iter()
            .any(|item| item.name == ".abstract.md" || item.name == ".overview.md");
        if holds_layer_file {
            directories.push(uri.to_string());
        }

        let subdirectories = entries
            .into_iter()
            .filter(|item| item.is_directory && !item.name.starts_with('.'));
        for child in subdirectories {
            self.collect_directories_recursive(&child.uri, directories)
                .await?;
        }
        Ok(())
    })
}
/// Embeds one timeline layer document and stores it under the vector ID
/// derived from `dir_uri` and `layer`.
///
/// Returns `Ok(true)` when a new record was written and `Ok(false)` when
/// nothing was stored, either because the content is blank or because a
/// record with that vector ID already exists. A failed existence lookup is
/// treated as "not indexed" and indexing proceeds.
///
/// NOTE(review): an existing record is never refreshed, even if the layer
/// file's content has changed since it was indexed.
///
/// # Errors
///
/// Propagates failures from the embedding client or the vector store insert.
async fn index_layer(&self, dir_uri: &str, content: &str, layer: ContextLayer) -> Result<bool> {
    use crate::vector_store::{VectorStore, uri_to_vector_id};

    // A blank layer file carries nothing worth embedding or storing.
    if content.trim().is_empty() {
        debug!("Layer {:?} for {} is empty, skipping", layer, dir_uri);
        return Ok(false);
    }

    let vector_id = uri_to_vector_id(dir_uri, layer);
    if let Ok(Some(_)) = self.vector_store.as_ref().get(&vector_id).await {
        debug!("Layer {:?} already indexed for {}", layer, dir_uri);
        return Ok(false);
    }

    let embedding = self.embedding.embed(content).await?;
    // Capture the time once so a new record has identical creation and
    // update timestamps.
    let now = chrono::Utc::now();
    let layer_label = match layer {
        ContextLayer::L0Abstract => "L0",
        ContextLayer::L1Overview => "L1",
        ContextLayer::L2Detail => "L2",
    };
    let memory = crate::types::Memory {
        id: vector_id,
        content: content.to_string(),
        embedding,
        created_at: now,
        updated_at: now,
        metadata: crate::types::MemoryMetadata {
            uri: Some(dir_uri.to_string()),
            user_id: None,
            agent_id: None,
            run_id: None,
            actor_id: None,
            role: None,
            layer: layer_label.to_string(),
            hash: self.calculate_hash(content),
            importance_score: 0.5,
            entities: vec![],
            topics: vec![],
            custom: std::collections::HashMap::new(),
        },
    };
    self.vector_store.as_ref().insert(&memory).await?;
    Ok(true)
}
}