opencrabs 0.3.12

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
//! Embedding — singleton engine, generate and store vector embeddings.

use once_cell::sync::OnceCell;
use qmd::{EmbeddingEngine, Store, pull_model};
use std::sync::Mutex;

static ENGINE: OnceCell<Mutex<EmbeddingEngine>> = OnceCell::new();

/// Disable llama.cpp's C-level logging globally.
///
/// Must be called once before creating any EmbeddingEngine.
/// Routes all llama.cpp log output through the tracing framework
/// with logging disabled — zero stderr pollution.
fn silence_llama_logs() {
    use llama_cpp_2::{LogOptions, send_logs_to_tracing};
    send_logs_to_tracing(LogOptions::default().with_logs_enabled(false));
}

/// Get (or create) the shared embedding engine.
///
/// Downloads the embeddinggemma-300M model (~300MB) on first call.
/// Returns Err if the download fails (e.g. no internet) or if the CPU lacks
/// AVX (required by llama.cpp GGUF inference) — callers fall back to FTS-only.
pub fn get_engine() -> Result<&'static Mutex<EmbeddingEngine>, String> {
    ENGINE.get_or_try_init(|| {
        check_cpu_features()?;
        silence_llama_logs();

        // Suppress hf-hub's indicatif progress bar (stderr) and any llama.cpp /
        // kalosm-common startup prints (stdout) while the TUI owns the terminal.
        // Progress is still logged via tracing, so no UX regression.
        let _fd_guard = crate::utils::fd_suppress::suppress_stdio();

        let pull = pull_model(qmd::llm::DEFAULT_EMBED_MODEL_URI, false)
            .map_err(|e| format!("Failed to pull embedding model: {e}"))?;

        let engine = EmbeddingEngine::new(&pull.path)
            .map_err(|e| format!("Failed to init embedding engine: {e}"))?;

        tracing::info!(
            "Embedding engine ready: {} ({:.1} MB)",
            pull.model,
            pull.size_bytes as f64 / 1_048_576.0
        );
        Ok(Mutex::new(engine))
    })
}

/// Verify the CPU supports the instruction sets required by llama.cpp.
/// Returns Err on x86 without AVX; passes through on ARM/other architectures.
fn check_cpu_features() -> Result<(), String> {
    #[cfg(target_arch = "x86_64")]
    {
        if !std::arch::is_x86_feature_detected!("avx") {
            return Err(
                "CPU lacks AVX — llama.cpp GGUF inference requires AVX (Sandy Bridge 2011+). \
                 Memory search will use FTS-only."
                    .to_string(),
            );
        }
    }
    Ok(())
}

/// Returns the engine if already initialized, without triggering a download.
pub fn engine_if_ready() -> Option<&'static Mutex<EmbeddingEngine>> {
    ENGINE.get()
}

/// Max bytes we'll send to llama.cpp for embedding.  Anything larger causes
/// a native `abort()` inside ggml_backend_sched_synchronize, which kills the
/// whole process.  Must match the constant in `backfill_embeddings`.
const MAX_EMBED_BYTES: usize = 32_000;

/// Generate and store an embedding for content.
///
/// Returns an error if the body is too large or the engine fails.
/// Never panics or aborts — all llama.cpp failures are caught.
///
/// Lock ordering: engine first (embed), then store (insert). Never both at once.
pub fn embed_content(store: &Mutex<Store>, body: &str) -> Result<(), String> {
    if body.is_empty() {
        return Ok(());
    }
    if body.len() > MAX_EMBED_BYTES {
        return Err(format!(
            "Body too large for embedding ({} bytes, max {MAX_EMBED_BYTES})",
            body.len()
        ));
    }

    let engine_mutex = engine_if_ready().ok_or("Embedding engine not initialized")?;

    let title = Store::extract_title(body);
    let hash = Store::hash_content(body);

    // catch_unwind guards against Rust-side panics from llama-cpp bindings.
    // A C-level abort() cannot be caught, so the size guard above is critical.
    let emb = {
        let mut engine = engine_mutex
            .lock()
            .map_err(|e| format!("Engine lock poisoned: {e}"))?;
        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            engine.embed_document(body, Some(&title))
        }))
        .map_err(|_| "llama.cpp panicked during embedding".to_string())?
        .map_err(|e| format!("Embedding failed: {e}"))?
    };

    // Store lock → insert → release
    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
    store
        .lock()
        .map_err(|e| format!("Store lock poisoned: {e}"))?
        .insert_embedding(&hash, 0, 0, &emb.embedding, &emb.model, &now)
        .map_err(|e| format!("Failed to store embedding: {e}"))
}

/// Backfill embeddings for all documents that don't have one yet.
///
/// Initializes the engine (downloading the model if needed) and batch-embeds
/// any documents missing embeddings. Lock ordering: store → release → engine → release → store.
pub(super) fn backfill_embeddings(store: &Mutex<Store>) {
    let engine_mutex = match get_engine() {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!("Embedding engine unavailable, skipping backfill: {e}");
            return;
        }
    };

    // Store lock: get hashes needing embeddings → release
    let needing = match store.lock() {
        Ok(s) => s.get_hashes_needing_embedding().unwrap_or_default(),
        Err(_) => return,
    };

    if needing.is_empty() {
        return;
    }

    let count = needing.len();
    tracing::info!("Backfilling embeddings for {count} documents");

    // Process one document at a time, releasing the engine lock between each
    // so other callers (session_search, embed_content) aren't blocked for the
    // entire batch duration.
    let now = chrono::Local::now().format("%Y-%m-%dT%H:%M:%S").to_string();
    let mut stored = 0usize;

    for (i, (hash, path, body)) in needing.iter().enumerate() {
        tracing::info!(
            "Embedding {}/{}: path={}, body_len={}, hash={}",
            i + 1,
            count,
            path,
            body.len(),
            hash
        );

        if body.len() > MAX_EMBED_BYTES {
            tracing::warn!(
                "Skipping embedding for '{}' — body too large ({} bytes, max {}). \
                 Inserting zero-vector placeholder so it won't retry.",
                path,
                body.len(),
                MAX_EMBED_BYTES
            );
            // Insert a zero-length placeholder embedding so this doc is no longer
            // returned by get_hashes_needing_embedding on every startup.
            if let Ok(s) = store.lock() {
                let _ = s.insert_embedding(hash, 0, 0, &[], "skipped-too-large", &now);
            }
            continue;
        }

        let title = Store::extract_title(body);

        // Engine lock: embed single document → release
        // catch_unwind guards against panics from llama-cpp bindings.
        let emb = {
            let mut engine = match engine_mutex.lock() {
                Ok(e) => e,
                Err(_) => return,
            };
            match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                engine.embed_document(body, Some(&title))
            })) {
                Ok(result) => result.ok(),
                Err(_) => {
                    tracing::error!("llama.cpp panicked during backfill embed of '{path}'");
                    continue;
                }
            }
        };

        // Store lock: insert embedding → release
        if let Some(emb) = emb
            && let Ok(s) = store.lock()
            && s.insert_embedding(hash, 0, 0, &emb.embedding, &emb.model, &now)
                .is_ok()
        {
            stored += 1;
        }
    }

    tracing::info!("Backfilled {stored}/{count} embeddings");
}