Skip to main content

orbok_workers/
chunk_and_index.rs

1//! Chunk-and-index worker (RFC-006 §12): loads an extraction result
2//! from the cache, chunks it, and atomically inserts chunks + FTS index
3//! into the catalog (one transaction).
4
5use orbok_cache::{CacheService, EngineOptions, OrbokCacheNamespace};
6use orbok_core::{ErrorCategory, ExtractionId, FileId, OrbokError, OrbokResult};
7use orbok_db::Catalog;
8use orbok_db::repo::{ChunkRepository, FileRepository, SourceRepository};
9use orbok_extract::{ExtractOutput, chunk};
10use orbok_fs::{GuardedSource, PathGuard};
11use rusqlite::params;
12use std::path::Path;
13
14/// Chunk-and-index worker.
15pub struct ChunkAndIndexWorker<'a> {
16    catalog: &'a Catalog,
17    cache: &'a CacheService,
18}
19
20impl<'a> ChunkAndIndexWorker<'a> {
21    pub fn new(catalog: &'a Catalog, cache: &'a CacheService) -> Self {
22        Self { catalog, cache }
23    }
24
25    /// Load the extraction cache for a file, chunk, and index.
26    pub fn run(&self, file_id: &FileId) -> OrbokResult<()> {
27        let files = FileRepository::new(self.catalog);
28        let record = files.get_by_id(file_id)?.ok_or(OrbokError::FileNotFound)?;
29        let sources = SourceRepository::new(self.catalog);
30        let source = sources
31            .get(&record.source_id)?
32            .ok_or(OrbokError::SourceNotFound)?;
33
34        let guard = PathGuard::new(vec![GuardedSource::from_record(&source)]);
35        let validated = guard.validate(Path::new(&record.canonical_path))?;
36
37        let engine = self.cache.engine::<ExtractOutput>(
38            self.catalog,
39            &OrbokCacheNamespace::ExtractSegments,
40            EngineOptions::default(),
41        )?;
42        let output = CacheService::get_fresh(&engine, &validated)?.ok_or_else(|| {
43            OrbokError::Extraction {
44                category: ErrorCategory::ParserError,
45                message: "extraction cache miss: run extraction first".into(),
46            }
47        })?;
48
49        // Find the most recent succeeded extraction record for this file.
50        let extraction_id = self.latest_extraction_id(file_id)?;
51
52        let file_name = Path::new(&record.display_path)
53            .file_name()
54            .map(|n| n.to_string_lossy().into_owned())
55            .unwrap_or_else(|| record.display_path.clone());
56
57        let specs = chunk(&output, &file_name);
58        if specs.is_empty() || (specs.len() == 1 && specs[0].normalized_text.is_empty()) {
59            return Ok(());
60        }
61
62        ChunkRepository::new(self.catalog).insert_bundle(file_id, &extraction_id, &specs)?;
63        Ok(())
64    }
65
66    fn latest_extraction_id(&self, file_id: &FileId) -> OrbokResult<ExtractionId> {
67        let conn = self.catalog.lock();
68        let id: String = conn
69            .query_row(
70                "SELECT extraction_id FROM extraction_records \
71                 WHERE file_id = ?1 AND status = 'succeeded' \
72                 ORDER BY completed_at DESC LIMIT 1",
73                params![file_id.as_str()],
74                |row| row.get(0),
75            )
76            .map_err(|e| OrbokError::Database(format!("no extraction record: {e}")))?;
77        Ok(ExtractionId::from_string(id))
78    }
79}