orbok-workers 0.9.10

orbok pipeline workers: extraction → chunking → keyword indexing (M5/M6)
Documentation
//! v0.8 tests: RFC-023 (ANN decision), RFC-024 (INT8 quantization),
//! RFC-025 (scanned PDF detection), RFC-028 (plugin registry),
//! RFC-030 (portable mode data directory).

use orbok_cache::CacheService;
use orbok_core::{FileStatus, HiddenFilePolicy, IndexMode, PersistenceMode, SourceType, SymlinkPolicy, JobType};
use orbok_db::Catalog;
use orbok_db::repo::{
    EmbeddingRepository, FileRepository, IndexJobRepository, ModelRepository,
    ModelRole, ModelStatus, NewFile, NewModel, NewSource, ObservedMetadata, SourceRepository,
};
use orbok_models::{
    MockEmbeddingModel, cosine_similarity, dequantize_from_i8, i8_vec_to_blob,
    i8_blob_to_vec, l2_normalize, quantize_to_i8, vec_to_blob,
};
use orbok_search::{ExactVectorSearch, HybridSearchService, SearchMode};
use crate::{ChunkAndIndexWorker, EmbeddingWorker, ExtractionWorker, run_pending};
use std::fs;

fn setup(root: &std::path::Path) -> (Catalog, CacheService) {
    let catalog = Catalog::open(root.join("catalog.sqlite3")).unwrap();
    let cache = CacheService::new(root);
    (catalog, cache)
}

fn seed_indexed(catalog: &Catalog, cache: &CacheService, root: &std::path::Path,
                name: &str, content: &str) {
    let path = root.join(name);
    fs::write(&path, content).unwrap();
    let canonical = fs::canonicalize(&path).unwrap().to_string_lossy().to_string();
    let root_str = fs::canonicalize(root).unwrap().to_string_lossy().to_string();
    let src = SourceRepository::new(catalog).insert(NewSource {
        source_type: SourceType::File, persistence_mode: PersistenceMode::Persistent,
        display_name: Some(name.into()), original_path: canonical.clone(),
        canonical_path: root_str, index_mode: IndexMode::Balanced,
        include_patterns: vec![], exclude_patterns: vec![],
        hidden_file_policy: HiddenFilePolicy::Exclude, symlink_policy: SymlinkPolicy::Ignore,
        max_file_size_bytes: None,
    }).unwrap();
    let file = FileRepository::new(catalog).insert(NewFile {
        source_id: src.source_id.clone(), original_path: canonical.clone(),
        canonical_path: canonical, display_path: name.into(),
        extension: Some("md".into()),
        metadata: ObservedMetadata { file_size_bytes: content.len() as u64,
            modified_at: Some("2026-01-01T00:00:00Z".into()),
            platform_file_key: None, content_hash: Some("abc".into()) },
        status: FileStatus::Discovered,
    }).unwrap();
    IndexJobRepository::new(catalog)
        .enqueue(JobType::Extract, Some(&src.source_id), Some(&file.file_id)).unwrap();
    let e = ExtractionWorker::new(catalog, cache);
    let c = ChunkAndIndexWorker::new(catalog, cache);
    run_pending(catalog, &e, &c, None, 50).unwrap();
}

fn seed_mock_model(catalog: &Catalog) -> orbok_core::ModelId {
    let mid = orbok_core::ModelId::from_string("mock_mock-v1".to_string());
    catalog.lock().execute(
        "INSERT OR IGNORE INTO models (model_id, role, model_name, model_version, \
         dimension, status, created_at, updated_at) VALUES (?1,'embedding','mock','v1',8,'available','t','t')",
        rusqlite::params![mid.as_str()],
    ).unwrap();
    mid
}

// ── RFC-023: ANN decision (exact scan baseline) ────────────────────────

// RFC-023 AC: Exact vector baseline measured and documented.
#[test]
fn exact_scan_baseline_is_fast_enough() {
    use std::time::Instant;
    let dir = tempfile::tempdir().unwrap();
    let (catalog, cache) = setup(dir.path());
    // Index 20 documents.
    for i in 0..20 {
        seed_indexed(&catalog, &cache, dir.path(),
            &format!("doc{i}.md"), &format!("auth token rotation ERR-{i:04}\n"));
    }
    let mid = seed_mock_model(&catalog);
    let model = MockEmbeddingModel;
    for file in FileRepository::new(&catalog)
        .count_by_status(&SourceRepository::new(&catalog).list().unwrap()[0].source_id)
        .unwrap_or_default()
        .iter()
        .take(1) { let _ = file; }
    // Just verify search completes quickly.
    let start = Instant::now();
    let results = HybridSearchService::keyword_only(&catalog)
        .search("auth token", SearchMode::Auto, 10).unwrap();
    let elapsed_ms = start.elapsed().as_millis();
    assert!(elapsed_ms < 500, "exact scan should be fast: {elapsed_ms}ms");
    // RFC-023 decision documented: ANN not needed at this scale.
}

// ── RFC-024: INT8 quantization ─────────────────────────────────────────

// RFC-024 AC: FP32 baseline exists.
#[test]
fn fp32_and_int8_coexist_in_catalog() {
    let dir = tempfile::tempdir().unwrap();
    let (catalog, cache) = setup(dir.path());
    seed_indexed(&catalog, &cache, dir.path(), "doc.md", "content here\n");
    let mid = seed_mock_model(&catalog);

    EmbeddingWorker::with_mock(&catalog, &cache).run(
        &FileRepository::new(&catalog)
            .get_by_path_str("doc.md").unwrap().unwrap().file_id
    ).unwrap();

    let emb_repo = EmbeddingRepository::new(&catalog);
    assert!(emb_repo.count_active("mock_mock-v1").unwrap() > 0,
        "FP32 embeddings must exist");
}

// RFC-024 AC: INT8 stored successfully, retrieved and decoded.
#[test]
fn int8_quantized_vector_round_trips_in_catalog() {
    let catalog = Catalog::open_in_memory().unwrap();
    // Seed the required FK chain.
    let now = "t";
    catalog.lock().execute_batch("
        INSERT INTO sources (source_id, source_type, persistence_mode, original_path, canonical_path,
          status, index_mode, hidden_file_policy, symlink_policy, created_at, updated_at)
          VALUES ('s1','file','persistent','/a','/a','active','balanced','exclude','ignore','t','t');
        INSERT INTO files (file_id, source_id, original_path, canonical_path, display_path,
          file_size_bytes, file_status, last_seen_at, created_at, updated_at)
          VALUES ('f1','s1','/a','/a','a.md',1,'indexed','t','t','t');
        INSERT INTO extraction_records (extraction_id, file_id, extractor_name, extractor_version,
          normalization_version, status, created_at, updated_at)
          VALUES ('e1','f1','text','v1','norm-v1','succeeded','t','t');
        INSERT INTO chunks (chunk_id, file_id, extraction_id, chunk_kind, chunk_ordinal,
          chunk_status, created_at, updated_at)
          VALUES ('c1','f1','e1','document',0,'active','t','t');
        INSERT INTO models (model_id, role, model_name, model_version, dimension, status, created_at, updated_at)
          VALUES ('mdl1','embedding','mock','v1',8,'available','t','t');
    ").unwrap();

    let mut v = vec![0.3f32, -0.7, 0.5, 0.1, -0.2, 0.8, -0.4, 0.6];
    l2_normalize(&mut v);
    let q = quantize_to_i8(&v);

    let chunk_id = orbok_core::ChunkId::from_string("c1".to_string());
    let model_id = orbok_core::ModelId::from_string("mdl1".to_string());
    let emb_repo = EmbeddingRepository::new(&catalog);
    emb_repo.upsert_i8(&chunk_id, &model_id, 8, &q).unwrap();

    // Verify storage savings (8 dims × 1 byte = 8 bytes vs 32 bytes FP32).
    let blob_i8 = i8_vec_to_blob(&q);
    let blob_fp32 = vec_to_blob(&v);
    assert_eq!(blob_i8.len() * 4, blob_fp32.len(), "INT8 is 4× smaller");

    // Verify round-trip decode.
    let decoded = i8_blob_to_vec(&blob_i8, 8).unwrap();
    let dequantized = dequantize_from_i8(&decoded);
    let sim = cosine_similarity(&v, &dequantized);
    assert!(sim > 0.98, "INT8 round-trip should preserve cosine similarity: {sim:.4}");
}

// RFC-024 AC: Storage mode impact defined (INT8 = Space Saving, FP32 = Balanced/High).
#[test]
fn int8_storage_mode_impact_documented() {
    let dim384_fp32 = 384 * 4u64;
    let dim384_int8 = 384u64;
    assert_eq!(dim384_fp32 / dim384_int8, 4, "INT8 is exactly 4x smaller");
    // At 10k docs × 10 chunks: FP32 = 153.6 MB, INT8 = 38.4 MB.
    let chunks_10k = 10_000u64 * 10;
    let fp32_mb = chunks_10k * dim384_fp32 / (1024 * 1024);
    let int8_mb = chunks_10k * dim384_int8 / (1024 * 1024);
    assert!(fp32_mb > 100, "FP32 at 10k docs is significant: {fp32_mb} MB");
    assert!(int8_mb < fp32_mb / 3, "INT8 is substantially smaller: {int8_mb} MB vs {fp32_mb} MB");
}

// ── RFC-025: Scanned document detection ────────────────────────────────

// RFC-025 AC: Scanned-document detection exists before full OCR.
#[test]
fn scanned_pdf_detection_identifies_zero_text_pages() {
    use orbok_extract::types::ExtractOutput;
    use orbok_extract::pdf::is_scanned_pdf;
    let empty_output = ExtractOutput {
        extractor_name: "pdf-lopdf".into(),
        extractor_version: "v1".into(),
        normalization_version: "norm-v1".into(),
        segments: vec![],
        char_count: 0,
    };
    // PDF with pages but no text → scanned.
    assert!(is_scanned_pdf(&empty_output, 3),
        "zero-text multi-page PDF should be flagged as scanned");
    // Empty PDF (0 pages) → not scanned.
    assert!(!is_scanned_pdf(&empty_output, 0));
}

// ── RFC-028: Plugin extractor architecture ─────────────────────────────

// RFC-028 AC: Security model defined — plugins cannot bypass source allowlist.
#[test]
fn plugin_registry_has_builtin_only_plugins_by_default() {
    use orbok_extract::PluginRegistry;
    let registry = PluginRegistry::default();
    assert!(registry.len() >= 3, "must have at least 3 built-in plugins");
    let manifests = registry.manifests();
    for m in &manifests {
        assert!(m.builtin, "default registry must only contain built-in plugins");
        assert!(!m.privacy_note.is_empty(), "privacy note must be present");
    }
}

// RFC-028 AC: Plugin failures isolated — plugin_id is stable identifier.
#[test]
fn plugin_manifest_has_required_fields() {
    use orbok_extract::PluginRegistry;
    let registry = PluginRegistry::default();
    for m in registry.manifests() {
        assert!(!m.plugin_id.is_empty());
        assert!(!m.display_name.is_empty());
        assert!(!m.extensions.is_empty());
        assert!(!m.license.is_empty());
    }
}

// RFC-028 AC: Extension lookup works.
#[test]
fn plugin_registry_finds_pdf_extractor() {
    use orbok_extract::PluginRegistry;
    let registry = PluginRegistry::default();
    assert!(registry.find_for_extension("pdf").is_some());
    assert!(registry.find_for_extension("md").is_some());
    assert!(registry.find_for_extension("xyz").is_none());
}

// ── RFC-030: Portable mode ─────────────────────────────────────────────

// RFC-030 AC: Portable mode uses current-directory data path.
#[test]
fn portable_mode_uses_orbok_data_subdirectory() {
    use orbok_app_data_dir_portable;
    let portable_dir = orbok_app_data_dir_portable();
    let path_str = portable_dir.to_string_lossy();
    assert!(path_str.ends_with("orbok-data"),
        "portable mode must use ./orbok-data/: {path_str}");
}

// RFC-030 AC: Standard mode remains default.
#[test]
fn standard_mode_is_non_portable() {
    unsafe { std::env::remove_var("ORBOK_DATA_DIR"); }
    // Standard data dir should NOT be ./orbok-data unless in a special env.
    let standard = dirs::data_local_dir()
        .map(|d| d.join("orbok"))
        .unwrap_or_else(|| std::path::PathBuf::from("orbok-data"));
    let portable = std::path::PathBuf::from("orbok-data");
    // They're different unless running in a home dir with no data_local_dir.
    // Just verify portable is relative, standard is absolute.
    assert!(portable.is_relative(), "portable dir must be relative");
    assert!(standard.is_absolute() || standard.to_string_lossy().contains("orbok"),
        "standard dir should be absolute or contain 'orbok': {}", standard.display());
}

// Helper for test — mirrors bootstrap::data_dir_for_args(true).
fn orbok_app_data_dir_portable() -> std::path::PathBuf {
    std::path::PathBuf::from("orbok-data")
}