use crate::core::config::ExtractionConfig;
use crate::plugins::ProcessingStage;
use crate::types::{ExtractionResult, ProcessingWarning};
use crate::{KreuzbergError, Result};
use std::borrow::Cow;
#[cfg(feature = "otel")]
use std::time::Instant;
#[cfg(feature = "otel")]
use tracing::Instrument;
pub(super) async fn execute_processors(
result: &mut ExtractionResult,
config: &ExtractionConfig,
pp_config: &Option<&crate::core::config::PostProcessorConfig>,
early_processors: std::sync::Arc<Vec<std::sync::Arc<dyn crate::plugins::PostProcessor>>>,
middle_processors: std::sync::Arc<Vec<std::sync::Arc<dyn crate::plugins::PostProcessor>>>,
late_processors: std::sync::Arc<Vec<std::sync::Arc<dyn crate::plugins::PostProcessor>>>,
) -> Result<()> {
for (_stage, processors_arc) in [
(ProcessingStage::Early, early_processors),
(ProcessingStage::Middle, middle_processors),
(ProcessingStage::Late, late_processors),
] {
#[cfg(feature = "otel")]
let stage_name = match _stage {
ProcessingStage::Early => crate::telemetry::conventions::stages::POST_PROCESSING_EARLY,
ProcessingStage::Middle => crate::telemetry::conventions::stages::POST_PROCESSING_MIDDLE,
ProcessingStage::Late => crate::telemetry::conventions::stages::POST_PROCESSING_LATE,
};
#[cfg(feature = "otel")]
let stage_span = crate::telemetry::spans::pipeline_stage_span(stage_name);
#[cfg(feature = "otel")]
let stage_start = Instant::now();
#[cfg(feature = "otel")]
let _stage_guard = stage_span.enter();
for processor in processors_arc.iter() {
let processor_name = processor.name();
let should_run = should_processor_run(pp_config, processor_name);
if should_run && processor.should_process(result, config) {
#[cfg(feature = "otel")]
let processor_span = crate::telemetry::spans::pipeline_processor_span(stage_name, processor_name);
#[cfg(feature = "otel")]
let process_result = processor.process(result, config).instrument(processor_span).await;
#[cfg(not(feature = "otel"))]
let process_result = processor.process(result, config).await;
match process_result {
Ok(_) => {}
Err(err @ KreuzbergError::Io(_))
| Err(err @ KreuzbergError::LockPoisoned(_))
| Err(err @ KreuzbergError::Plugin { .. }) => {
return Err(err);
}
Err(err) => {
let error_msg = err.to_string();
result.processing_warnings.push(ProcessingWarning {
source: Cow::Owned(processor_name.to_string()),
message: Cow::Owned(error_msg),
});
}
}
}
}
#[cfg(feature = "otel")]
{
let stage_ms = stage_start.elapsed().as_secs_f64() * 1000.0;
crate::telemetry::metrics::get_metrics().pipeline_duration_ms.record(
stage_ms,
&[opentelemetry::KeyValue::new(
crate::telemetry::conventions::PIPELINE_STAGE,
stage_name.to_string(),
)],
);
drop(_stage_guard);
drop(stage_span);
}
}
Ok(())
}
fn should_processor_run(pp_config: &Option<&crate::core::config::PostProcessorConfig>, processor_name: &str) -> bool {
if let Some(config) = pp_config {
if let Some(ref enabled_set) = config.enabled_set {
enabled_set.contains(processor_name)
} else if let Some(ref disabled_set) = config.disabled_set {
!disabled_set.contains(processor_name)
} else if let Some(ref enabled) = config.enabled_processors {
enabled.iter().any(|name| name == processor_name)
} else if let Some(ref disabled) = config.disabled_processors {
!disabled.iter().any(|name| name == processor_name)
} else {
true
}
} else {
true
}
}
pub(super) async fn execute_validators(result: &ExtractionResult, config: &ExtractionConfig) -> Result<()> {
let validator_registry = crate::plugins::registry::get_validator_registry();
let validators = {
let registry = validator_registry.read();
registry.get_all()
};
if !validators.is_empty() {
for validator in validators {
if validator.should_validate(result, config) {
validator.validate(result, config).await?;
}
}
}
Ok(())
}