use orbok_cache::{CacheService, EngineOptions, OrbokCacheNamespace};
use orbok_core::{ErrorCategory, FileId, JobType, OrbokError, OrbokResult, now_iso8601};
use orbok_db::Catalog;
use orbok_core::ExtractionId;
use orbok_db::repo::{FileRepository, IndexJobRepository, SourceRepository};
use orbok_extract::{ExtractOutput, ExtractorRegistry};
use orbok_fs::{GuardedSource, PathGuard};
use std::path::Path;
/// Extracts text from a single catalogued file, records the extraction in the
/// `extraction_records` table, and enqueues a follow-up [`JobType::Chunk`] job.
///
/// Extraction output is cached under [`OrbokCacheNamespace::ExtractSegments`];
/// when a fresh cache entry exists for the file's validated path, extraction is
/// skipped and only the chunk job is enqueued.
pub struct ExtractionWorker<'a> {
    /// Catalog used for file/source lookups, the extraction-record insert and
    /// job enqueueing.
    catalog: &'a Catalog,
    /// Cache service that provides the extraction-output cache engine.
    cache: &'a CacheService,
    /// Extractors available to this worker (built with `ExtractorRegistry::default()`).
    registry: ExtractorRegistry,
}

impl<'a> ExtractionWorker<'a> {
    /// Creates a worker borrowing `catalog` and `cache`, using the default
    /// extractor registry.
    pub fn new(catalog: &'a Catalog, cache: &'a CacheService) -> Self {
        Self {
            catalog,
            cache,
            registry: ExtractorRegistry::default(),
        }
    }

    /// Runs extraction for the file identified by `file_id`.
    ///
    /// 1. Loads the file record and its owning source from the catalog.
    /// 2. Validates the file's canonical path against that source with [`PathGuard`].
    /// 3. If the extraction cache has a fresh entry for the path, enqueues a
    ///    chunk job and returns without re-extracting (no record is written).
    /// 4. Otherwise extracts, inserts a `succeeded` row into
    ///    `extraction_records`, stores the output in the cache, and enqueues a
    ///    chunk job.
    ///
    /// # Errors
    ///
    /// - [`OrbokError::FileNotFound`] if no file record exists for `file_id`.
    /// - [`OrbokError::SourceNotFound`] if the file's source record is missing.
    /// - [`OrbokError::Extraction`] (category [`ErrorCategory::ParserError`]) if
    ///   the extractor fails.
    /// - [`OrbokError::Database`] if inserting the extraction record fails.
    /// - Any error propagated from path validation, the cache, or job enqueueing.
    pub fn run(&self, file_id: &FileId) -> OrbokResult<()> {
        let files = FileRepository::new(self.catalog);
        let record = files
            .get_by_id(file_id)?
            .ok_or(OrbokError::FileNotFound)?;
        let sources = SourceRepository::new(self.catalog);
        let source = sources
            .get(&record.source_id)?
            .ok_or(OrbokError::SourceNotFound)?;

        let guard = PathGuard::new(vec![GuardedSource::from_record(&source)]);
        let validated = guard.validate(Path::new(&record.canonical_path))?;

        // Both the cache-hit path and the fresh-extraction path end by handing
        // the file to the chunking stage.
        let enqueue_chunk = || -> OrbokResult<()> {
            IndexJobRepository::new(self.catalog).enqueue(
                JobType::Chunk,
                Some(&record.source_id),
                Some(file_id),
            )?;
            Ok(())
        };

        let engine = self.cache.engine::<ExtractOutput>(
            self.catalog,
            &OrbokCacheNamespace::ExtractSegments,
            EngineOptions::default(),
        )?;
        if CacheService::get_fresh(&engine, &validated)?.is_some() {
            // NOTE(review): this path writes no extraction record; it relies on
            // one having been inserted when the entry was cached.
            return enqueue_chunk();
        }

        // Captured before extraction so `started_at` reflects when work began,
        // rather than sharing the completion timestamp.
        let started_at = now_iso8601();
        let output = self
            .registry
            .extract(&validated)
            .map_err(|e| OrbokError::Extraction {
                category: ErrorCategory::ParserError,
                message: e.to_string(),
            })?;
        let completed_at = now_iso8601();

        let extraction_id = ExtractionId::generate();
        {
            // Scoped so the connection guard is dropped before the cache write
            // and the job enqueue below.
            let conn = self.catalog.lock();
            conn.execute(
                "INSERT INTO extraction_records \
                 (extraction_id, file_id, extractor_name, extractor_version, \
                 normalization_version, source_content_hash, status, \
                 extracted_char_count, extracted_byte_count, started_at, completed_at, \
                 created_at, updated_at) \
                 VALUES (?1,?2,?3,?4,?5,?6,'succeeded',?7,?8,?9,?10,?10,?10)",
                rusqlite::params![
                    extraction_id.as_str(),
                    file_id.as_str(),
                    output.extractor_name,
                    output.extractor_version,
                    output.normalization_version,
                    record.content_hash,
                    output.char_count as i64,
                    // NOTE(review): extracted_byte_count is filled from the
                    // character count; these differ for non-ASCII text. Confirm
                    // whether ExtractOutput exposes a byte count to use here.
                    output.char_count as i64,
                    started_at,
                    completed_at,
                ],
            )
            .map_err(|e| OrbokError::Database(e.to_string()))?;
        }

        // Cache only after the record exists: the cache-hit path above never
        // writes a record, so caching first would leave the file without one for
        // as long as the entry stays fresh if the insert had failed.
        CacheService::put(&engine, &validated, &output)?;

        enqueue_chunk()
    }
}