1use orbok_cache::{CacheService, EngineOptions, OrbokCacheNamespace};
6use orbok_core::{ErrorCategory, FileId, JobType, OrbokError, OrbokResult, now_iso8601};
7use orbok_db::Catalog;
8use orbok_core::ExtractionId;
9use orbok_db::repo::{FileRepository, IndexJobRepository, SourceRepository};
10use orbok_extract::{ExtractOutput, ExtractorRegistry};
11use orbok_fs::{GuardedSource, PathGuard};
12use std::path::Path;
13
/// Worker that extracts text from a single catalogued file.
///
/// On each [`ExtractionWorker::run`] it:
/// - validates the file path;
/// - consults the `ExtractSegments` cache;
/// - on a cache miss, extracts, caches the output and writes an `extraction_records` row;
/// - finally enqueues a chunking job.
pub struct ExtractionWorker<'a> {
    /// Catalog used for file/source lookups, job enqueueing and the extraction-record insert.
    catalog: &'a Catalog,
    /// Cache service that provides the `ExtractSegments` engine.
    cache: &'a CacheService,
    /// Extractors used to process files; built with `ExtractorRegistry::default()` in `new`.
    registry: ExtractorRegistry,
}
20
21impl<'a> ExtractionWorker<'a> {
22 pub fn new(catalog: &'a Catalog, cache: &'a CacheService) -> Self {
23 Self {
24 catalog,
25 cache,
26 registry: ExtractorRegistry::default(),
27 }
28 }
29
30 pub fn run(&self, file_id: &FileId) -> OrbokResult<()> {
34 let files = FileRepository::new(self.catalog);
35 let record = files
36 .get_by_id(file_id)?
37 .ok_or(OrbokError::FileNotFound)?;
38 let sources = SourceRepository::new(self.catalog);
39 let source = sources
40 .get(&record.source_id)?
41 .ok_or(OrbokError::SourceNotFound)?;
42
43 let guard = PathGuard::new(vec![GuardedSource::from_record(&source)]);
45 let validated = guard.validate(Path::new(&record.canonical_path))?;
46
47 let engine = self.cache.engine::<ExtractOutput>(
49 self.catalog,
50 &OrbokCacheNamespace::ExtractSegments,
51 EngineOptions::default(),
52 )?;
53 if CacheService::get_fresh(&engine, &validated)?.is_some() {
54 IndexJobRepository::new(self.catalog).enqueue(
56 JobType::Chunk,
57 Some(&record.source_id),
58 Some(file_id),
59 )?;
60 return Ok(());
61 }
62
63 let output = self
65 .registry
66 .extract(&validated)
67 .map_err(|e| OrbokError::Extraction {
68 category: ErrorCategory::ParserError,
69 message: e.to_string(),
70 })?;
71
72 CacheService::put(&engine, &validated, &output)?;
74
75 let extraction_id = ExtractionId::generate();
77 let now = now_iso8601();
78 {
79 let conn = self.catalog.lock();
80 conn.execute(
81 "INSERT INTO extraction_records \
82 (extraction_id, file_id, extractor_name, extractor_version, \
83 normalization_version, source_content_hash, status, \
84 extracted_char_count, extracted_byte_count, started_at, completed_at, \
85 created_at, updated_at) \
86 VALUES (?1,?2,?3,?4,?5,?6,'succeeded',?7,?8,?9,?9,?9,?9)",
87 rusqlite::params![
88 extraction_id.as_str(),
89 file_id.as_str(),
90 output.extractor_name,
91 output.extractor_version,
92 output.normalization_version,
93 record.content_hash,
94 output.char_count as i64,
95 output.char_count as i64,
96 now,
97 ],
98 )
99 .map_err(|e| OrbokError::Database(e.to_string()))?;
100 }
101
102 IndexJobRepository::new(self.catalog).enqueue(
104 JobType::Chunk,
105 Some(&record.source_id),
106 Some(file_id),
107 )?;
108 Ok(())
109 }
110}