use qmd::Store;
use std::path::Path;
use std::sync::Mutex;
use super::embedding::{backfill_embeddings, embed_content, embed_content_api, embed_via_api};
use super::{COLLECTION_BRAIN, COLLECTION_MEMORY, embedding_api_config, embedding_api_configured};
/// File names of the "brain" documents, looked up directly inside the OpenCrabs home directory.
///
/// `reindex` visits them in exactly this order and indexes each one that exists and is
/// non-empty into the brain collection.
pub const BRAIN_FILES: &[&str] = &[
    "SOUL.md", "USER.md", "AGENTS.md", "TOOLS.md", "CODE.md",
    "SECURITY.md", "MEMORY.md", "BOOT.md", "BOOTSTRAP.md", "HEARTBEAT.md",
];
/// Indexes one file into the memory collection and embeds it when its content changed.
///
/// The file is read asynchronously. [`index_file_sync`] then runs on a blocking thread while
/// the store lock is held. If that reports the content as newly (re)indexed, embeddings are
/// produced:
/// - with `embed_content_api` when an embedding API is configured, or
/// - with `embed_content` on a blocking thread otherwise.
///
/// Embedding failures are logged and skipped; they never fail this call.
///
/// # Errors
/// Returns an error if the file cannot be read, the store lock is poisoned, indexing fails,
/// or a blocking task panics or is cancelled.
pub async fn index_file(store: &'static Mutex<Store>, path: &Path) -> Result<(), String> {
    let body = tokio::fs::read_to_string(path)
        .await
        .map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
    let path = path.to_path_buf();

    // Move the body into the blocking task and get it back out, instead of cloning the whole
    // file contents just to satisfy the `'static` bound of `spawn_blocking`.
    let (indexed, body) = tokio::task::spawn_blocking(move || {
        let store = store
            .lock()
            .map_err(|e| format!("Store lock poisoned: {e}"))?;
        let indexed = index_file_sync(&store, COLLECTION_MEMORY, &path, &body)?;
        Ok::<_, String>((indexed, body))
    })
    .await
    .map_err(|e| format!("spawn_blocking failed: {e}"))??;

    // Unchanged content: nothing new to embed.
    if !indexed {
        return Ok(());
    }

    if embedding_api_configured() {
        if let Err(e) = embed_content_api(store, &body).await {
            tracing::warn!("API embedding skipped during index: {e}");
        }
    } else {
        // `embed_content` is synchronous, so keep it off the async worker threads.
        tokio::task::spawn_blocking(move || {
            if let Err(e) = embed_content(store, &body) {
                tracing::warn!("Embedding skipped during index: {e}");
            }
        })
        .await
        .map_err(|e| format!("spawn_blocking failed: {e}"))?;
    }
    Ok(())
}
/// Stores `body` as the active document for `path` in `collection`, unless it is unchanged.
///
/// Documents are keyed by the file name of `path`. When `path` has no file name, the whole
/// path is used instead.
///
/// If the key's active document already has the same content hash, nothing is written and
/// `Ok(false)` is returned. Otherwise:
/// 1. any active document for the key is deactivated (best-effort; failures are logged),
/// 2. the content and a new document row are inserted, stamped with the current local time,
/// 3. `Ok(true)` is returned.
///
/// A failed lookup of the existing document is treated like "no existing document".
///
/// # Errors
/// Returns an error if inserting the content or the document row fails.
fn index_file_sync(
    store: &Store,
    collection: &str,
    path: &Path,
    body: &str,
) -> Result<bool, String> {
    let hash = Store::hash_content(body);
    let rel_path = path
        .file_name()
        .map(|n| n.to_string_lossy().into_owned())
        .unwrap_or_else(|| path.to_string_lossy().into_owned());

    // Skip the write entirely when the stored content is byte-for-byte the same.
    if let Ok(Some((_id, existing_hash, _title))) =
        store.find_active_document(collection, &rel_path)
        && existing_hash == hash
    {
        return Ok(false);
    }

    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
    let title = Store::extract_title(body);

    // Still best-effort, but no longer silent. A failure here could leave the previous
    // version marked active next to the new one.
    if let Err(e) = store.deactivate_document(collection, &rel_path) {
        tracing::warn!("Failed to deactivate previous {collection} document '{rel_path}': {e}");
    }

    store
        .insert_content(&hash, body, &now)
        .map_err(|e| format!("Failed to insert content: {e}"))?;
    store
        .insert_document(collection, &rel_path, &title, &hash, &now, &now)
        .map_err(|e| format!("Failed to insert document: {e}"))?;
    tracing::debug!("Indexed {collection} file: {}", path.display());
    Ok(true)
}
/// Re-indexes every memory file and brain file under the OpenCrabs home directory.
///
/// Steps, in order:
/// 1. Each `*.md` file directly inside `<home>/memory` is indexed into the memory collection
///    via [`index_file`], which also embeds changed content.
/// 2. Each non-empty file from [`BRAIN_FILES`] found in `<home>` is indexed into the brain
///    collection.
/// 3. Active documents in either collection whose file is no longer on disk are deactivated.
/// 4. Missing embeddings are backfilled:
///    - when an embedding API is configured, a detached background task does it and this
///      function does not wait for it;
///    - otherwise `backfill_embeddings` runs on a blocking thread and is awaited.
///
/// Returns the number of files processed without error. Unchanged files are included.
///
/// # Errors
/// Fails if the memory directory exists but cannot be listed, or if a blocking task panics
/// or is cancelled. Per-file indexing failures and prune failures are logged, not returned.
pub async fn reindex(store: &'static Mutex<Store>) -> Result<usize, String> {
    let home = crate::config::opencrabs_home();

    let (memory_count, memory_on_disk) = index_memory_dir(store, &home.join("memory")).await?;
    let (brain_count, brain_on_disk) = index_brain_files(store, &home).await?;
    let indexed = memory_count + brain_count;

    let prune_result =
        tokio::task::spawn_blocking(move || prune_missing(store, &memory_on_disk, &brain_on_disk))
            .await
            .map_err(|e| format!("spawn_blocking failed: {e}"))?;
    if let Err(e) = prune_result {
        tracing::warn!("Memory prune failed: {e}");
    }

    if embedding_api_configured() {
        spawn_api_backfill(store);
    } else {
        tokio::task::spawn_blocking(move || backfill_embeddings(store))
            .await
            .map_err(|e| format!("spawn_blocking failed: {e}"))?;
    }

    tracing::info!("Memory reindex complete: {} files", indexed);
    Ok(indexed)
}

/// Indexes every `*.md` file directly inside `dir` into the memory collection.
///
/// Returns the number of files indexed without error, plus the file names of every `*.md`
/// file seen. Files that failed to index are included in that set, so they are not pruned.
/// A missing `dir` yields `(0, empty)`.
async fn index_memory_dir(
    store: &'static Mutex<Store>,
    dir: &Path,
) -> Result<(usize, std::collections::HashSet<String>), String> {
    let mut indexed = 0usize;
    let mut on_disk = std::collections::HashSet::new();
    if !dir.exists() {
        return Ok((indexed, on_disk));
    }

    // Collect the listing up front so the directory handle is not held open across the
    // (potentially slow) indexing/embedding awaits below. Sorting gives a stable order.
    let mut paths: Vec<_> = std::fs::read_dir(dir)
        .map_err(|e| format!("Failed to read memory dir: {e}"))?
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|s| s.to_str()) == Some("md"))
        .collect();
    paths.sort();

    for path in paths {
        on_disk.insert(
            path.file_name()
                .map(|n| n.to_string_lossy().into_owned())
                .unwrap_or_default(),
        );
        match index_file(store, &path).await {
            Ok(()) => indexed += 1,
            Err(e) => tracing::warn!("Failed to index {}: {}", path.display(), e),
        }
    }
    Ok((indexed, on_disk))
}

/// Indexes each non-empty [`BRAIN_FILES`] entry found directly in `home` into the brain
/// collection.
///
/// Returns the number of files indexed without error, plus the names of the non-empty brain
/// files found on disk. Missing, unreadable, or blank files are left out of that set, so their
/// documents get pruned.
async fn index_brain_files(
    store: &'static Mutex<Store>,
    home: &Path,
) -> Result<(usize, std::collections::HashSet<String>), String> {
    let mut indexed = 0usize;
    let mut on_disk = std::collections::HashSet::new();

    for &name in BRAIN_FILES {
        let path = home.join(name);
        // A missing or unreadable file simply fails the read and is skipped.
        let body = match tokio::fs::read_to_string(&path).await {
            Ok(b) if !b.trim().is_empty() => b,
            _ => continue,
        };
        on_disk.insert(name.to_string());

        let result: Result<bool, String> = tokio::task::spawn_blocking(move || {
            let guard = store
                .lock()
                .map_err(|e| format!("Store lock poisoned: {e}"))?;
            index_file_sync(&guard, COLLECTION_BRAIN, &path, &body)
        })
        .await
        .map_err(|e| format!("spawn_blocking failed: {e}"))?;

        match result {
            Ok(_) => indexed += 1,
            Err(e) => tracing::warn!("Failed to index brain file {name}: {e}"),
        }
    }
    Ok((indexed, on_disk))
}

/// Deactivates active memory/brain documents whose file name is not in the matching on-disk
/// set.
///
/// This is synchronous and takes the store lock, so call it from a blocking context.
/// A collection whose path listing fails is skipped.
fn prune_missing(
    store: &Mutex<Store>,
    memory_on_disk: &std::collections::HashSet<String>,
    brain_on_disk: &std::collections::HashSet<String>,
) -> Result<(), String> {
    let store = store
        .lock()
        .map_err(|e| format!("Store lock poisoned: {e}"))?;

    for (collection, on_disk, label) in [
        (COLLECTION_MEMORY, memory_on_disk, "memory"),
        (COLLECTION_BRAIN, brain_on_disk, "brain"),
    ] {
        let Ok(db_paths) = store.get_active_document_paths(collection) else {
            continue;
        };
        for db_path in db_paths.iter().filter(|p| !on_disk.contains(*p)) {
            // Only report a prune that actually happened.
            match store.deactivate_document(collection, db_path) {
                Ok(_) => tracing::debug!("Pruned missing {label} file: {}", db_path),
                Err(e) => tracing::warn!("Failed to prune missing {label} file {db_path}: {e}"),
            }
        }
    }
    Ok(())
}

/// Starts a detached task that embeds, through the embedding API, every document the store
/// reports as needing an embedding. Returns immediately.
///
/// Bodies longer than 32 000 bytes are not sent. They get an empty embedding tagged
/// `"skipped-too-large"`, which presumably stops them being reported again (TODO confirm
/// against `get_hashes_needing_embedding`). Individual API failures are logged and skipped.
fn spawn_api_backfill(store: &'static Mutex<Store>) {
    tokio::spawn(async move {
        let needing = tokio::task::spawn_blocking(move || {
            store
                .lock()
                .ok()
                .and_then(|s| s.get_hashes_needing_embedding().ok())
                .unwrap_or_default()
        })
        .await
        .unwrap_or_default();
        if needing.is_empty() {
            return;
        }

        let count = needing.len();
        tracing::info!("API backfill: embedding {count} documents");
        // Loop-invariant: resolve the model label once for the whole batch.
        let model_name = embedding_api_config()
            .and_then(|c| c.model)
            .unwrap_or_else(|| "api-embedding".to_string());

        let mut stored = 0usize;
        for (hash, path, body) in &needing {
            if body.len() > 32_000 {
                tracing::warn!("Skipping embedding for '{path}' — body too large");
                let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
                if let Ok(s) = store.lock() {
                    let _ = s.insert_embedding(hash, 0, 0, &[], "skipped-too-large", &now);
                }
                continue;
            }
            match embed_via_api(body).await {
                Ok(embedding) => {
                    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
                    // The std mutex guard is scoped to this statement and never held across
                    // an `.await`.
                    if let Ok(s) = store.lock()
                        && s.insert_embedding(hash, 0, 0, &embedding, &model_name, &now)
                            .is_ok()
                    {
                        stored += 1;
                    }
                }
                Err(e) => {
                    tracing::warn!("API embed failed for '{path}': {e}");
                }
            }
        }
        tracing::info!("API backfilled {stored}/{count} embeddings");
    });
}