#[cfg(not(feature = "office"))]
use crate::KreuzbergError;
use crate::Result;
use crate::core::config::ExtractionConfig;
use crate::core::mime::{LEGACY_POWERPOINT_MIME_TYPE, LEGACY_WORD_MIME_TYPE};
use crate::types::ExtractionResult;
use std::path::Path;
use super::helpers::get_extractor;
/// Extracts content from the file at `path`.
///
/// The file must exist. Its MIME type is resolved by `mime::detect_or_validate`
/// from the path and the optional `mime_type` hint. Extraction is then delegated
/// to `extract_file_with_extractor`, which also handles result caching according
/// to `config`.
///
/// When the `office` feature is disabled, legacy Word and PowerPoint MIME types
/// are rejected with `KreuzbergError::UnsupportedFormat` before any extractor runs.
///
/// When the `otel` feature is enabled, the sanitized file name is recorded on the
/// current span. Any error is also recorded on that span before it is returned.
///
/// # Errors
///
/// Returns an error if:
/// - the file does not exist;
/// - MIME detection or validation fails;
/// - the format requires a disabled feature;
/// - extraction itself fails.
#[cfg_attr(feature = "otel", tracing::instrument(
    skip(config, path),
    fields(
        { crate::telemetry::conventions::OPERATION } = crate::telemetry::conventions::operations::EXTRACT_FILE,
        { crate::telemetry::conventions::DOCUMENT_FILENAME } = tracing::field::Empty,
        { crate::telemetry::conventions::OTEL_STATUS_CODE } = tracing::field::Empty,
        { crate::telemetry::conventions::ERROR_TYPE } = tracing::field::Empty,
        { crate::telemetry::conventions::ERROR_MESSAGE } = tracing::field::Empty,
    )
))]
pub async fn extract_file(
    path: impl AsRef<Path>,
    mime_type: Option<&str>,
    config: &ExtractionConfig,
) -> Result<ExtractionResult> {
    use crate::core::{io, mime};

    let path = path.as_ref();

    #[cfg(feature = "otel")]
    {
        let sanitized = crate::telemetry::spans::sanitize_path(path);
        tracing::Span::current().record(crate::telemetry::conventions::DOCUMENT_FILENAME, sanitized);
    }

    // Run the fallible pipeline in an async block so every error path funnels
    // through a single place where it can be recorded on the span.
    let outcome = async {
        io::validate_file_exists(path)?;
        let detected_mime = mime::detect_or_validate(Some(path), mime_type)?;

        #[cfg(not(feature = "office"))]
        {
            let unsupported = match detected_mime.as_str() {
                LEGACY_WORD_MIME_TYPE => Some("Legacy Word extraction requires the `office` feature"),
                LEGACY_POWERPOINT_MIME_TYPE => {
                    Some("Legacy PowerPoint extraction requires the `office` feature")
                }
                _ => None,
            };
            if let Some(reason) = unsupported {
                return Err(KreuzbergError::UnsupportedFormat(reason.to_string()));
            }
        }

        #[cfg(feature = "office")]
        {
            // The legacy MIME constants are only matched on in non-office builds;
            // touch them here so their import is not reported as unused.
            let _ = (LEGACY_WORD_MIME_TYPE, LEGACY_POWERPOINT_MIME_TYPE);
        }

        extract_file_with_extractor(path, &detected_mime, config).await
    }
    .await;

    #[cfg(feature = "otel")]
    if let Err(error) = &outcome {
        crate::telemetry::spans::record_error_on_current_span(error);
    }

    outcome
}
pub(in crate::core::extractor) async fn extract_file_with_extractor(
path: &Path,
mime_type: &str,
config: &ExtractionConfig,
) -> Result<ExtractionResult> {
let config = config.normalized();
let config = config.as_ref();
if !config.use_cache || config.cache_ttl_secs == Some(0) {
return extract_file_uncached(path, mime_type, config).await;
}
let content_hash = crate::cache::blake3_hash_file(path)?;
let config_hash = hash_extraction_config(config, mime_type);
let cache_key = format!("{content_hash}_{config_hash}");
let namespace = config.cache_namespace.as_deref();
if let Some(cache) = get_extraction_cache()
&& let Ok(Some(data)) = cache.get(&cache_key, path.to_str(), namespace, config.cache_ttl_secs)
&& let Ok(result) = rmp_serde::from_slice::<ExtractionResult>(&data)
{
tracing::debug!(cache_key = %cache_key, "Extraction cache hit");
return Ok(result);
}
let result = extract_file_uncached(path, mime_type, config).await?;
if let Some(cache) = get_extraction_cache()
&& let Ok(data) = rmp_serde::to_vec(&result)
{
let _ = cache.set(&cache_key, data, path.to_str(), namespace, config.cache_ttl_secs);
}
Ok(result)
}
async fn extract_file_uncached(path: &Path, mime_type: &str, config: &ExtractionConfig) -> Result<ExtractionResult> {
let config = config.normalized();
let config = config.as_ref();
let budget = crate::core::config::concurrency::resolve_thread_budget(config.concurrency.as_ref());
crate::core::config::concurrency::init_thread_pools(budget);
crate::extractors::ensure_initialized()?;
let extractor = get_extractor(mime_type)?;
let mut result = extractor.extract_file(path, mime_type, config).await?;
result = crate::core::pipeline::run_pipeline(result, config).await?;
Ok(result)
}
fn hash_extraction_config(config: &ExtractionConfig, mime_type: &str) -> String {
let mut normalized = config.clone();
normalized.use_cache = true;
normalized.cache_namespace = None;
normalized.cache_ttl_secs = None;
let mut hasher = blake3::Hasher::new();
hasher.update(mime_type.as_bytes());
if let Ok(bytes) = rmp_serde::to_vec(&normalized) {
hasher.update(&bytes);
}
let hash = hasher.finalize();
hex::encode(&hash.as_bytes()[..16])
}
/// Returns the process-wide extraction cache, creating it on first use.
///
/// Initialization runs at most once. If `GenericCache::new` fails, `None` is
/// stored permanently, so every later call also returns `None` and extraction
/// stays uncached for the rest of the process.
fn get_extraction_cache() -> Option<&'static crate::cache::GenericCache> {
    static EXTRACTION_CACHE: std::sync::OnceLock<Option<crate::cache::GenericCache>> =
        std::sync::OnceLock::new();

    let slot = EXTRACTION_CACHE.get_or_init(|| {
        // NOTE(review): the three numeric arguments are forwarded unchanged to
        // `GenericCache::new`; their meaning is defined by that constructor.
        crate::cache::GenericCache::new("extraction".to_string(), None, 30.0, 2000.0, 500.0).ok()
    });
    slot.as_ref()
}
/// Extracts in-memory `content` as `mime_type` with the registered extractor.
///
/// The steps are, in order:
/// 1. Normalize `config`.
/// 2. Initialize the thread pools from its concurrency budget.
/// 3. Call `crate::extractors::ensure_initialized`.
/// 4. Run the extractor.
/// 5. Pass the result through the post-processing pipeline.
///
/// Unlike the file path, this path never consults the extraction cache.
///
/// # Errors
///
/// Returns an error if:
/// - extractor initialization fails;
/// - no extractor handles `mime_type`;
/// - extraction fails;
/// - the pipeline fails.
pub(in crate::core::extractor) async fn extract_bytes_with_extractor(
    content: &[u8],
    mime_type: &str,
    config: &ExtractionConfig,
) -> Result<ExtractionResult> {
    use crate::core::config::concurrency::{init_thread_pools, resolve_thread_budget};

    let normalized = config.normalized();
    let cfg = normalized.as_ref();

    init_thread_pools(resolve_thread_budget(cfg.concurrency.as_ref()));
    crate::extractors::ensure_initialized()?;

    let extractor = get_extractor(mime_type)?;
    let raw = extractor.extract_bytes(content, mime_type, cfg).await?;
    let processed = crate::core::pipeline::run_pipeline(raw, cfg).await?;
    Ok(processed)
}