use std::sync::Arc;
use std::time::Instant;
use rsigma_eval::pipeline::sources::{DynamicSource, SourceType};
use rsigma_runtime::sources::{ResolvedValue, SourceError, SourceResolver};
use super::metrics::Metrics;
/// A source resolver that wraps `rsigma_runtime::DefaultSourceResolver` and
/// updates a shared [`Metrics`] handle around every resolve call.
pub struct InstrumentedResolver {
/// The wrapped resolver that performs the actual resolution work.
inner: rsigma_runtime::DefaultSourceResolver,
/// Shared metrics handle; counters, gauges, and the latency histogram on it
/// are updated by the `SourceResolver` implementation.
metrics: Arc<Metrics>,
}
impl InstrumentedResolver {
    /// Builds an instrumented resolver around a freshly constructed
    /// `rsigma_runtime::DefaultSourceResolver`, reporting to `metrics`.
    pub fn new(metrics: Arc<Metrics>) -> Self {
        let inner = rsigma_runtime::DefaultSourceResolver::new();
        Self { inner, metrics }
    }

    /// Borrows the source cache owned by the wrapped resolver.
    pub fn cache(&self) -> &rsigma_runtime::sources::cache::SourceCache {
        let Self { inner, .. } = self;
        inner.cache()
    }
}
#[async_trait::async_trait]
impl SourceResolver for InstrumentedResolver {
    /// Resolves `source` through the wrapped resolver, recording metrics around the call.
    ///
    /// Metrics updated, in order:
    /// - `source_resolves_total` (labels: source id, source type) is incremented before
    ///   delegating, so every attempt is counted regardless of outcome.
    /// - `source_resolve_latency` observes the duration of the delegated call in seconds,
    ///   for successes and failures alike.
    /// - On success, `source_cache_hits` is incremented when the value reports
    ///   `from_cache`, and `source_last_resolved` (label: source id) is set to the
    ///   current Unix time in seconds.
    /// - On failure, `source_resolve_errors` (labels: source id, error kind) is incremented.
    ///
    /// The wrapped resolver's result is returned unchanged.
    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
        use std::time::{SystemTime, UNIX_EPOCH};

        let id = source.id.as_str();
        let type_label = source_type_label(&source.source_type);
        self.metrics
            .source_resolves_total
            .with_label_values(&[id, type_label])
            .inc();

        // Time only the delegated call, not the metric bookkeeping around it.
        let timer = Instant::now();
        let outcome = self.inner.resolve(source).await;
        self.metrics
            .source_resolve_latency
            .observe(timer.elapsed().as_secs_f64());

        match &outcome {
            Err(err) => {
                // Label value naming the error variant; payloads are ignored.
                let kind_label = match &err.kind {
                    rsigma_runtime::SourceErrorKind::Timeout => "Timeout",
                    rsigma_runtime::SourceErrorKind::Extract(_) => "Extract",
                    rsigma_runtime::SourceErrorKind::Parse(_) => "Parse",
                    rsigma_runtime::SourceErrorKind::Fetch(_) => "Fetch",
                };
                self.metrics
                    .source_resolve_errors
                    .with_label_values(&[id, kind_label])
                    .inc();
            }
            Ok(resolved) => {
                if resolved.from_cache {
                    self.metrics.source_cache_hits.inc();
                }
                let last_resolved = self.metrics.source_last_resolved.with_label_values(&[id]);
                // A system clock set before the Unix epoch falls back to a zero
                // duration instead of panicking.
                let now_secs = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs_f64();
                last_resolved.set(now_secs);
            }
        }

        outcome
    }
}
/// Maps a [`SourceType`] variant to the static string used as its metric label value.
///
/// Only the variant is inspected; any fields it carries are ignored. The match is
/// kept exhaustive (no wildcard arm) so a newly added variant forces an update here.
fn source_type_label(st: &SourceType) -> &'static str {
    match *st {
        SourceType::Nats { .. } => "nats",
        SourceType::Http { .. } => "http",
        SourceType::Command { .. } => "command",
        SourceType::File { .. } => "file",
    }
}