use std::sync::Arc;
use anyhow::Result;
use second_brain_adapters::claude_code::ClaudeCodeAdapter;
use second_brain_adapters::cursor::CursorAdapter;
use second_brain_adapters::{Adapter, ConversationSource};
use second_brain_core::embedding::Embedder;
use second_brain_core::kuzu_store::KuzuStore;
use second_brain_core::schema::{Relation, RelationType};
use second_brain_core::store::Store;
/// Counters accumulated over one ingestion pass.
///
/// Produced by `run_ingest`; every field starts at zero and is only ever
/// incremented while sources are processed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestStats {
    /// Memories written to the store during this pass.
    pub new_memories: usize,
    /// Conversations written to the store during this pass.
    pub new_conversations: usize,
    /// Entity links created from memories. This counts one per mention, so an
    /// entity that already existed can be counted more than once.
    pub new_entities: usize,
    /// Memories skipped because a memory with the same leading content
    /// (first 128 characters) already exists in the store.
    pub skipped_dupes: usize,
    /// Source files skipped because their fingerprint was unchanged since the
    /// last ingestion.
    pub unchanged: usize,
    /// Failures encountered (e.g. unreadable or unparseable sources);
    /// ingestion continues past them.
    pub errors: usize,
    /// Number of sources discovered across all adapters, including ones that
    /// turned out to be unchanged or failed.
    pub files_processed: usize,
}
/// Runs a full ingestion pass over every supported conversation source.
///
/// The Claude Code adapter runs first, then the Cursor adapter. Each one
/// discovers its conversation files, and the discovered sources are ingested
/// into `store` (with optional embeddings from `embedder`), tagged with
/// `machine_id`. If an adapter's discovery fails, that adapter is skipped
/// without being counted.
///
/// # Errors
///
/// The body never produces an `Err`. Per-source failures are tallied in the
/// returned stats instead of being propagated.
pub fn run_ingest(
    store: &Arc<KuzuStore>,
    embedder: Option<&Embedder>,
    machine_id: &str,
) -> Result<IngestStats> {
    let adapters: [Box<dyn Adapter>; 2] = [
        Box::new(ClaudeCodeAdapter::new(machine_id.to_string())),
        Box::new(CursorAdapter::new(machine_id.to_string())),
    ];

    let mut stats = IngestStats {
        new_memories: 0,
        new_conversations: 0,
        new_entities: 0,
        skipped_dupes: 0,
        unchanged: 0,
        errors: 0,
        files_processed: 0,
    };

    for adapter in adapters.iter() {
        // Failed discovery is not fatal: move on to the next adapter.
        let Ok(discovered) = adapter.discover_conversations() else {
            continue;
        };
        let name = adapter.name().to_string();
        stats.files_processed += discovered.len();
        ingest_sources(
            adapter.as_ref(),
            &discovered,
            store,
            embedder,
            &name,
            machine_id,
            &mut stats,
        );
    }

    Ok(stats)
}
/// Ingests a single Claude Code conversation file (the file-watcher path).
///
/// Returns early in these cases:
/// - the file cannot be read, or the store reports its fingerprint as
///   unchanged;
/// - the file fails to parse (logged at debug level).
///
/// A conversation with no memories is still marked as ingested with a count
/// of 0. Otherwise:
/// - the conversation is stored;
/// - each memory whose leading content is not already present is stored,
///   tagged with `machine_id` and embedded when `embedder` is available;
/// - each stored memory is linked to its conversation (`DerivedFrom`) and to
///   the entities it mentions (`Mentions`);
/// - the file is marked as ingested.
///
/// Store errors are deliberately ignored here: this path is best-effort.
pub fn ingest_file(
    store: &Arc<KuzuStore>,
    embedder: Option<&Embedder>,
    path: &std::path::Path,
    machine_id: &str,
) {
    const SOURCE_NAME: &str = "claude-code";

    let path_str = path.to_string_lossy().to_string();
    let Some(hash) = file_hash(path) else {
        return;
    };
    let unchanged = matches!(store.is_file_changed(&path_str, &hash), Ok(false));
    if unchanged {
        return;
    }

    let adapter = ClaudeCodeAdapter::new(machine_id.to_string());
    let source = ConversationSource {
        id: path_str.clone(),
        path: path_str.clone(),
        source_name: SOURCE_NAME.to_string(),
        machine_id: machine_id.to_string(),
    };
    let parsed = match adapter.ingest_conversation(&source) {
        Ok(conversation) => conversation,
        Err(e) => {
            tracing::debug!("watcher: failed to parse {}: {e}", path_str);
            return;
        }
    };

    if parsed.memories.is_empty() {
        // Record the file so it is not re-parsed until it changes.
        store.mark_ingested(&path_str, &hash, 0, SOURCE_NAME).ok();
        return;
    }

    store.store_conversation(&parsed.conversation).ok();

    let mut count = 0;
    for memory in &parsed.memories {
        let prefix: String = memory.content.chars().take(128).collect();
        let already_known = store.memory_content_exists(&prefix).unwrap_or(false);
        if already_known {
            continue;
        }

        let mut candidate = memory.clone();
        candidate.machine_id = machine_id.to_string();
        if let Some(emb) = embedder
            && let Ok(vector) = emb.embed(&candidate.content)
        {
            candidate.embedding = vector;
        }

        if store.store_memory(&candidate).is_err() {
            continue;
        }
        let derived_from = Relation {
            from_id: candidate.id,
            to_id: parsed.conversation.id,
            relation_type: RelationType::DerivedFrom,
            strength: 1.0,
            context: None,
        };
        store.store_relation(&derived_from).ok();
        count += 1;
    }

    for (info, memory_id) in &parsed.entities {
        let Ok(entity) = store.find_or_create_entity(&info.name, &info.entity_type) else {
            continue;
        };
        let mentions = Relation {
            from_id: *memory_id,
            to_id: entity.id,
            relation_type: RelationType::Mentions,
            strength: 1.0,
            context: None,
        };
        store.store_relation(&mentions).ok();
    }

    store.mark_ingested(&path_str, &hash, count, SOURCE_NAME).ok();
    tracing::info!("watcher: ingested {} ({} memories)", path_str, count);
}
/// Ingests every conversation in `sources` through `adapter`, accumulating
/// counters into `stats`.
///
/// For each source:
/// - The file is fingerprinted with `file_hash`. Files the store reports as
///   unchanged are skipped (`stats.unchanged`).
/// - Otherwise the adapter parses the conversation. Sources with no memories
///   are marked as ingested with a count of 0.
/// - The conversation is stored, and each memory whose leading content is not
///   already present is stored, tagged with `machine_id` and embedded when
///   `embedder` is available.
/// - Each stored memory is linked to the conversation (`DerivedFrom`), and the
///   extracted entities are linked from their memories (`Mentions`).
/// - Finally the file is marked as ingested with the number of memories that
///   were actually written.
///
/// Nothing aborts the batch. Unreadable files, parse failures, and failed
/// conversation/memory writes are tallied in `stats.errors`.
fn ingest_sources(
    adapter: &dyn Adapter,
    sources: &[ConversationSource],
    store: &Arc<KuzuStore>,
    embedder: Option<&Embedder>,
    source_name: &str,
    machine_id: &str,
    stats: &mut IngestStats,
) {
    for source in sources {
        let path = std::path::Path::new(&source.path);
        let Some(hash) = file_hash(path) else {
            stats.errors += 1;
            continue;
        };
        // An error from the change check falls through, so the file is (re)ingested.
        if let Ok(false) = store.is_file_changed(&source.path, &hash) {
            stats.unchanged += 1;
            continue;
        }
        let Ok(result) = adapter.ingest_conversation(source) else {
            stats.errors += 1;
            continue;
        };
        if result.memories.is_empty() {
            store
                .mark_ingested(&source.path, &hash, 0, source_name)
                .ok();
            continue;
        }

        if store.store_conversation(&result.conversation).is_ok() {
            stats.new_conversations += 1;
        } else {
            stats.errors += 1;
        }

        let mut stored_count = 0;
        for memory in &result.memories {
            // Dedup on the first 128 characters of content.
            let dedup_prefix: String = memory.content.chars().take(128).collect();
            if store.memory_content_exists(&dedup_prefix).unwrap_or(false) {
                stats.skipped_dupes += 1;
                continue;
            }
            let mut mem = memory.clone();
            mem.machine_id = machine_id.to_string();
            if let Some(emb) = embedder
                && let Ok(embedding) = emb.embed(&mem.content)
            {
                mem.embedding = embedding;
            }
            // Only memories that were actually written are linked and counted,
            // so the count persisted by `mark_ingested` reflects the store.
            if store.store_memory(&mem).is_err() {
                stats.errors += 1;
                continue;
            }
            let relation = Relation {
                from_id: mem.id,
                to_id: result.conversation.id,
                relation_type: RelationType::DerivedFrom,
                strength: 1.0,
                context: None,
            };
            store.store_relation(&relation).ok();
            stored_count += 1;
        }

        // NOTE(review): `memory_id` may refer to a memory skipped above as a
        // duplicate or whose write failed. Confirm how `store_relation` treats
        // a missing endpoint.
        for (entity_info, memory_id) in &result.entities {
            if let Ok(entity) =
                store.find_or_create_entity(&entity_info.name, &entity_info.entity_type)
            {
                let relation = Relation {
                    from_id: *memory_id,
                    to_id: entity.id,
                    relation_type: RelationType::Mentions,
                    strength: 1.0,
                    context: None,
                };
                store.store_relation(&relation).ok();
                // Counts one per successful find-or-create (i.e. per mention),
                // not distinct newly created entities.
                stats.new_entities += 1;
            }
        }

        store
            .mark_ingested(&source.path, &hash, stored_count, source_name)
            .ok();
        stats.new_memories += stored_count;
    }
}
/// Computes a content fingerprint of the file at `path` for change detection.
///
/// The fingerprint is persisted (via `mark_ingested`) and compared on later
/// runs (via `is_file_changed`), so it must be identical across program
/// builds. It therefore uses 64-bit FNV-1a, whose output is fixed by its
/// specification. `std`'s `DefaultHasher` explicitly makes no cross-release
/// guarantee.
///
/// Switching from `DefaultHasher` invalidates previously stored fingerprints
/// once, so those files are re-ingested on the first run after this change.
/// The hash is not cryptographic; it only detects edits. The result is the
/// 64-bit value as 16 lowercase hex digits.
///
/// Returns `None` if the file cannot be opened or read.
fn file_hash(path: &std::path::Path) -> Option<String> {
    use std::io::{ErrorKind, Read};

    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let mut file = std::fs::File::open(path).ok()?;
    let mut hash = FNV_OFFSET_BASIS;
    let mut buf = [0u8; 8192];
    loop {
        let n = match file.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            // A signal interrupted the read before any data arrived; just retry.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(_) => return None,
        };
        for &byte in &buf[..n] {
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
    }
    Some(format!("{hash:016x}"))
}