Skip to main content

orbok_workers/
extract.rs

1//! Extraction worker (RFC-005 §14): reads a queued file, runs the
2//! extractor, stores the output in the cache, and writes an
3//! extraction_record. On success, queues a Chunk job.
4
5use orbok_cache::{CacheService, EngineOptions, OrbokCacheNamespace};
6use orbok_core::{ErrorCategory, FileId, JobType, OrbokError, OrbokResult, now_iso8601};
7use orbok_db::Catalog;
8use orbok_core::ExtractionId;
9use orbok_db::repo::{FileRepository, IndexJobRepository, SourceRepository};
10use orbok_extract::{ExtractOutput, ExtractorRegistry};
11use orbok_fs::{GuardedSource, PathGuard};
12use std::path::Path;
13
14/// Extraction worker instance, held for the duration of an index run.
15pub struct ExtractionWorker<'a> {
16    catalog: &'a Catalog,
17    cache: &'a CacheService,
18    registry: ExtractorRegistry,
19}
20
21impl<'a> ExtractionWorker<'a> {
22    pub fn new(catalog: &'a Catalog, cache: &'a CacheService) -> Self {
23        Self {
24            catalog,
25            cache,
26            registry: ExtractorRegistry::default(),
27        }
28    }
29
30    /// Run extraction for one file. Fails with a typed error on
31    /// unrecoverable cases; the worker coordinator converts failures to
32    /// a catalog record.
33    pub fn run(&self, file_id: &FileId) -> OrbokResult<()> {
34        let files = FileRepository::new(self.catalog);
35        let record = files
36            .get_by_id(file_id)?
37            .ok_or(OrbokError::FileNotFound)?;
38        let sources = SourceRepository::new(self.catalog);
39        let source = sources
40            .get(&record.source_id)?
41            .ok_or(OrbokError::SourceNotFound)?;
42
43        // Build a single-source path guard (RFC-003 §8 boundary).
44        let guard = PathGuard::new(vec![GuardedSource::from_record(&source)]);
45        let validated = guard.validate(Path::new(&record.canonical_path))?;
46
47        // Skip if cached extraction is still fresh (Appendix A §8).
48        let engine = self.cache.engine::<ExtractOutput>(
49            self.catalog,
50            &OrbokCacheNamespace::ExtractSegments,
51            EngineOptions::default(),
52        )?;
53        if CacheService::get_fresh(&engine, &validated)?.is_some() {
54            // Still fresh — queue the chunk job and return.
55            IndexJobRepository::new(self.catalog).enqueue(
56                JobType::Chunk,
57                Some(&record.source_id),
58                Some(file_id),
59            )?;
60            return Ok(());
61        }
62
63        // Run extractor.
64        let output = self
65            .registry
66            .extract(&validated)
67            .map_err(|e| OrbokError::Extraction {
68                category: ErrorCategory::ParserError,
69                message: e.to_string(),
70            })?;
71
72        // Cache the output (Appendix A §9.1).
73        CacheService::put(&engine, &validated, &output)?;
74
75        // Record in catalog.
76        let extraction_id = ExtractionId::generate();
77        let now = now_iso8601();
78        {
79            let conn = self.catalog.lock();
80            conn.execute(
81                "INSERT INTO extraction_records \
82                 (extraction_id, file_id, extractor_name, extractor_version, \
83                  normalization_version, source_content_hash, status, \
84                  extracted_char_count, extracted_byte_count, started_at, completed_at, \
85                  created_at, updated_at) \
86                 VALUES (?1,?2,?3,?4,?5,?6,'succeeded',?7,?8,?9,?9,?9,?9)",
87                rusqlite::params![
88                    extraction_id.as_str(),
89                    file_id.as_str(),
90                    output.extractor_name,
91                    output.extractor_version,
92                    output.normalization_version,
93                    record.content_hash,
94                    output.char_count as i64,
95                    output.char_count as i64,
96                    now,
97                ],
98            )
99            .map_err(|e| OrbokError::Database(e.to_string()))?;
100        }
101
102        // Queue chunk job.
103        IndexJobRepository::new(self.catalog).enqueue(
104            JobType::Chunk,
105            Some(&record.source_id),
106            Some(file_id),
107        )?;
108        Ok(())
109    }
110}