opencrabs 0.3.43

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
//! Indexing — insert memory/brain files into the qmd store and generate embeddings.

use qmd::Store;
use std::path::Path;
use std::sync::Mutex;

use super::embedding::{backfill_embeddings, embed_content, embed_content_api, embed_via_api};
use super::{COLLECTION_BRAIN, COLLECTION_MEMORY, embedding_api_config, embedding_api_configured};

/// Brain files loaded from the workspace root (`~/.opencrabs/`).
pub const BRAIN_FILES: &[&str] = &[
    "SOUL.md",
    "USER.md",
    "AGENTS.md",
    "TOOLS.md",
    "CODE.md",
    "SECURITY.md",
    "MEMORY.md",
    "BOOT.md",
    "BOOTSTRAP.md",
    "HEARTBEAT.md",
];

/// Index a single `.md` file into the qmd store under the `"memory"` collection.
///
/// Skips re-indexing if the file's SHA-256 hash hasn't changed.
/// Generates an embedding when the engine is already initialized.
pub async fn index_file(store: &'static Mutex<Store>, path: &Path) -> Result<(), String> {
    let body = tokio::fs::read_to_string(path)
        .await
        .map_err(|e| format!("Failed to read {}: {e}", path.display()))?;

    let path = path.to_path_buf();
    let body_clone = body.clone();

    // Phase 1: synchronous FTS indexing (blocking)
    let indexed = tokio::task::spawn_blocking(move || {
        let indexed = {
            let s = store
                .lock()
                .map_err(|e| format!("Store lock poisoned: {e}"))?;
            index_file_sync(&s, COLLECTION_MEMORY, &path, &body)?
        };
        Ok::<bool, String>(indexed)
    })
    .await
    .map_err(|e| format!("spawn_blocking failed: {e}"))??;

    // Phase 2: embedding (async for API, blocking for local)
    if indexed {
        if embedding_api_configured() {
            if let Err(e) = embed_content_api(store, &body_clone).await {
                tracing::warn!("API embedding skipped during index: {e}");
            }
        } else {
            let store_ref = store;
            let body_for_embed = body_clone;
            tokio::task::spawn_blocking(move || {
                if let Err(e) = embed_content(store_ref, &body_for_embed) {
                    tracing::warn!("Embedding skipped during index: {e}");
                }
            })
            .await
            .map_err(|e| format!("spawn_blocking failed: {e}"))?;
        }
    }

    Ok(())
}

/// Synchronous inner implementation for indexing a single file into a given collection.
/// Returns `true` if new content was indexed, `false` if hash-skipped.
fn index_file_sync(
    store: &Store,
    collection: &str,
    path: &Path,
    body: &str,
) -> Result<bool, String> {
    let hash = Store::hash_content(body);
    let rel_path = path
        .file_name()
        .map(|n| n.to_string_lossy().to_string())
        .unwrap_or_else(|| path.to_string_lossy().to_string());

    if let Ok(Some((_id, existing_hash, _title))) =
        store.find_active_document(collection, &rel_path)
        && existing_hash == hash
    {
        return Ok(false);
    }

    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
    let title = Store::extract_title(body);

    // Pre-clear any existing FTS entry so the ON CONFLICT UPDATE branch in
    // insert_document fires a plain INSERT into documents_fts (not OR REPLACE,
    // which SQLite FTS5 rejects with "constraint failed").
    // Safe for new documents: deactivate_document matches 0 rows → no-op.
    let _ = store.deactivate_document(collection, &rel_path);

    store
        .insert_content(&hash, body, &now)
        .map_err(|e| format!("Failed to insert content: {e}"))?;
    store
        .insert_document(collection, &rel_path, &title, &hash, &now, &now)
        .map_err(|e| format!("Failed to insert document: {e}"))?;

    tracing::debug!("Indexed {collection} file: {}", path.display());
    Ok(true)
}

/// Walk `~/.opencrabs/memory/*.md` and `~/.opencrabs/*.md` brain files, indexing all.
///
/// Also deactivates entries for files that no longer exist on disk.
/// After indexing, backfills embeddings for any documents missing them.
/// Returns the number of files indexed.
pub async fn reindex(store: &'static Mutex<Store>) -> Result<usize, String> {
    let home = crate::config::opencrabs_home();
    let dir = home.join("memory");
    let mut indexed = 0usize;
    let mut memory_on_disk: Vec<String> = Vec::new();
    let mut brain_on_disk: Vec<String> = Vec::new();

    // --- Index daily memory logs ---
    if dir.exists() {
        let entries =
            std::fs::read_dir(&dir).map_err(|e| format!("Failed to read memory dir: {e}"))?;

        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) == Some("md") {
                let rel = path
                    .file_name()
                    .map(|n| n.to_string_lossy().to_string())
                    .unwrap_or_default();
                memory_on_disk.push(rel);

                if let Err(e) = index_file(store, &path).await {
                    tracing::warn!("Failed to index {}: {}", path.display(), e);
                } else {
                    indexed += 1;
                }
            }
        }
    }

    // --- Index brain workspace files ---
    for &name in BRAIN_FILES {
        let path = home.join(name);
        if path.exists() {
            let body = match tokio::fs::read_to_string(&path).await {
                Ok(b) if !b.trim().is_empty() => b,
                _ => continue,
            };
            brain_on_disk.push(name.to_string());

            let result: Result<bool, String> = tokio::task::spawn_blocking({
                let path = path.clone();
                move || {
                    let store = store
                        .lock()
                        .map_err(|e| format!("Store lock poisoned: {e}"))?;
                    index_file_sync(&store, COLLECTION_BRAIN, &path, &body)
                }
            })
            .await
            .map_err(|e| format!("spawn_blocking failed: {e}"))?;

            match result {
                Ok(_) => indexed += 1,
                Err(e) => tracing::warn!("Failed to index brain file {name}: {e}"),
            }
        }
    }

    // --- Prune deleted files from both collections ---
    let prune_result: Result<(), String> = tokio::task::spawn_blocking({
        move || {
            let store = store
                .lock()
                .map_err(|e| format!("Store lock poisoned: {e}"))?;

            if let Ok(db_paths) = store.get_active_document_paths(COLLECTION_MEMORY) {
                for db_path in &db_paths {
                    if !memory_on_disk.contains(db_path) {
                        let _ = store.deactivate_document(COLLECTION_MEMORY, db_path);
                        tracing::debug!("Pruned missing memory file: {}", db_path);
                    }
                }
            }

            if let Ok(db_paths) = store.get_active_document_paths(COLLECTION_BRAIN) {
                for db_path in &db_paths {
                    if !brain_on_disk.contains(db_path) {
                        let _ = store.deactivate_document(COLLECTION_BRAIN, db_path);
                        tracing::debug!("Pruned missing brain file: {}", db_path);
                    }
                }
            }

            Ok(())
        }
    })
    .await
    .map_err(|e| format!("spawn_blocking failed: {e}"))?;

    if let Err(e) = prune_result {
        tracing::warn!("Memory prune failed: {e}");
    }

    // --- Backfill embeddings for documents missing them ---
    if embedding_api_configured() {
        // API path: backfill one-by-one via HTTP (async)
        let store_ref = store;
        tokio::task::spawn_blocking(move || {
            let needing = match store_ref.lock() {
                Ok(s) => s.get_hashes_needing_embedding().unwrap_or_default(),
                Err(_) => return,
            };
            if needing.is_empty() {
                return;
            }
            tracing::info!("API backfill: {} documents need embeddings", needing.len());
            // Note: actual API calls happen in the next spawn_blocking cycle
            // to avoid blocking the tokio runtime. For now, just log.
            drop(needing);
        })
        .await
        .map_err(|e| format!("spawn_blocking failed: {e}"))?;

        // Async backfill: spawn as a background task
        let home = crate::config::opencrabs_home();
        tokio::spawn(async move {
            let store_ref = store;
            // Get hashes needing embedding
            let needing = tokio::task::spawn_blocking(move || {
                store_ref
                    .lock()
                    .ok()
                    .and_then(|s| s.get_hashes_needing_embedding().ok())
                    .unwrap_or_default()
            })
            .await
            .unwrap_or_default();

            if needing.is_empty() {
                return;
            }

            let count = needing.len();
            tracing::info!("API backfill: embedding {count} documents");
            let mut stored = 0usize;

            for (hash, path, body) in &needing {
                if body.len() > 32_000 {
                    tracing::warn!("Skipping embedding for '{path}' — body too large");
                    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
                    if let Ok(s) = store_ref.lock() {
                        let _ = s.insert_embedding(hash, 0, 0, &[], "skipped-too-large", &now);
                    }
                    continue;
                }

                match embed_via_api(body).await {
                    Ok(embedding) => {
                        let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
                        let model_name = embedding_api_config()
                            .and_then(|c| c.model)
                            .unwrap_or_else(|| "api-embedding".to_string());
                        if let Ok(s) = store_ref.lock()
                            && s.insert_embedding(hash, 0, 0, &embedding, &model_name, &now)
                                .is_ok()
                        {
                            stored += 1;
                        }
                    }
                    Err(e) => {
                        tracing::warn!("API embed failed for '{path}': {e}");
                    }
                }
            }
            tracing::info!("API backfilled {stored}/{count} embeddings");
            drop(home);
        });
    } else {
        // Local path: use GGUF engine (blocking)
        tokio::task::spawn_blocking(move || backfill_embeddings(store))
            .await
            .map_err(|e| format!("spawn_blocking failed: {e}"))?;
    }

    tracing::info!("Memory reindex complete: {} files", indexed);
    Ok(indexed)
}