use std::time::Duration;
use async_trait::async_trait;
use thiserror::Error;
use crate::discovery::types::{DiscoveryContext, Finding};
/// One stage of a [`DiscoveryPipeline`].
///
/// The pipeline stores stages as `Box<dyn Discoverer>`, asks each one whether it
/// is [`enabled`](Discoverer::enabled) for the current context and, if so, awaits
/// [`discover`](Discoverer::discover) under a timeout of `ctx.budget`.
#[async_trait]
pub trait Discoverer: Send + Sync {
    /// Static identifier for this discoverer; the pipeline attaches it to every
    /// log event it emits for the stage.
    fn name(&self) -> &'static str;

    /// Whether this discoverer should run for `_ctx`.
    ///
    /// The default implementation always returns `true`. When an implementation
    /// returns `false` the pipeline skips the stage without calling `discover`.
    fn enabled(&self, _ctx: &DiscoveryContext) -> bool {
        true
    }

    /// Performs the discovery work for `ctx`.
    ///
    /// # Errors
    ///
    /// Returns a [`DiscoveryError`] when the stage cannot produce findings. The
    /// pipeline logs the error and moves on to the next stage.
    async fn discover(&self, ctx: &DiscoveryContext) -> Result<Vec<Finding>, DiscoveryError>;
}
/// Failure reported by a [`Discoverer`].
///
/// Every variant carries the static `name` of the discoverer it refers to so the
/// rendered message identifies the failing stage.
#[derive(Debug, Error)]
pub enum DiscoveryError {
    /// The discoverer did not finish within `budget_ms` milliseconds.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant; the
    /// pipeline only logs timeouts. Confirm whether other modules use it.
    #[error("discoverer `{name}` exceeded its budget ({budget_ms}ms)")]
    Timeout { name: &'static str, budget_ms: u64 },

    /// The discoverer failed; `message` is the free-form description it supplied.
    #[error("discoverer `{name}` returned an error: {message}")]
    Backend { name: &'static str, message: String },
}
/// An ordered collection of [`Discoverer`] stages that are run one after another.
pub struct DiscoveryPipeline {
    /// Stages in execution order; iterated front to back when the pipeline runs.
    discoverers: Vec<Box<dyn Discoverer>>,
}
impl DiscoveryPipeline {
    /// A 30-second budget.
    ///
    /// NOTE(review): nothing in this file reads this constant; presumably callers
    /// use it to populate `DiscoveryContext::budget` — confirm.
    pub const DEFAULT_BUDGET: Duration = Duration::from_secs(30);

    /// Builds a pipeline that runs `discoverers` in the order given.
    pub fn with_discoverers(discoverers: Vec<Box<dyn Discoverer>>) -> Self {
        Self { discoverers }
    }

    /// Runs every enabled discoverer sequentially and returns all findings,
    /// concatenated in discoverer order.
    ///
    /// Each discoverer gets its own `ctx.budget` timeout. This method never
    /// fails: a discoverer that is disabled, returns an error, or exceeds the
    /// budget contributes no findings, is logged under the
    /// `crawlex::discovery` target, and the pipeline continues with the next one.
    pub async fn run(&self, ctx: &DiscoveryContext) -> Vec<Finding> {
        // The budget is identical for every discoverer, so convert it once.
        let budget_ms = Self::millis_saturating(ctx.budget);
        let mut findings: Vec<Finding> = Vec::new();

        for discoverer in &self.discoverers {
            if !discoverer.enabled(ctx) {
                tracing::trace!(
                    target: "crawlex::discovery",
                    name = discoverer.name(),
                    "discoverer disabled by feature gate"
                );
                continue;
            }

            let name = discoverer.name();
            let started = std::time::Instant::now();
            // `timeout` drops the discoverer's future if the budget elapses, so
            // a slow stage cannot hold up the stages that follow it.
            let outcome = tokio::time::timeout(ctx.budget, discoverer.discover(ctx)).await;
            let elapsed_ms = Self::millis_saturating(started.elapsed());

            match outcome {
                Ok(Ok(more)) => {
                    let count = more.len();
                    findings.extend(more);
                    tracing::debug!(
                        target: "crawlex::discovery",
                        name,
                        count,
                        elapsed_ms,
                        "discoverer ok"
                    );
                }
                Ok(Err(err)) => {
                    tracing::warn!(
                        target: "crawlex::discovery",
                        name,
                        ?err,
                        elapsed_ms,
                        "discoverer failed; continuing"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        target: "crawlex::discovery",
                        name,
                        budget_ms,
                        "discoverer timed out; continuing"
                    );
                }
            }
        }

        findings
    }

    /// Converts `d` to whole milliseconds for logging, clamping to `u64::MAX`.
    ///
    /// `Duration::as_millis` returns a `u128`; a plain `as u64` cast would wrap
    /// silently for very large durations (e.g. `Duration::MAX`).
    fn millis_saturating(d: Duration) -> u64 {
        d.as_millis().min(u128::from(u64::MAX)) as u64
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::discovery::types::{DiscoveryContext, DiscoveryFeatures};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc as StdArc;

    /// Shared log of `(invocation index, probe name)` pairs.
    type CallLog = StdArc<parking_lot::Mutex<Vec<(usize, &'static str)>>>;

    /// Test double that records when it was invoked, optionally sleeps, and then
    /// either fails or yields a single finding keyed by its name.
    struct Probe {
        name: &'static str,
        order: StdArc<AtomicUsize>,
        recorded_at: CallLog,
        delay: Duration,
        fail: bool,
    }

    #[async_trait]
    impl Discoverer for Probe {
        fn name(&self) -> &'static str {
            self.name
        }

        async fn discover(&self, _ctx: &DiscoveryContext) -> Result<Vec<Finding>, DiscoveryError> {
            // Claim the next slot in the global invocation order and log it.
            let slot = self.order.fetch_add(1, Ordering::SeqCst);
            self.recorded_at.lock().push((slot, self.name));

            if !self.delay.is_zero() {
                tokio::time::sleep(self.delay).await;
            }

            if self.fail {
                Err(DiscoveryError::Backend {
                    name: self.name,
                    message: "synthetic".into(),
                })
            } else {
                Ok(vec![Finding::Fact {
                    key: self.name.into(),
                    value: serde_json::json!(true),
                }])
            }
        }
    }

    /// Fresh invocation counter and call log, shared by the probes of one test.
    fn shared_state() -> (StdArc<AtomicUsize>, CallLog) {
        (
            StdArc::new(AtomicUsize::new(0)),
            StdArc::new(parking_lot::Mutex::new(Vec::new())),
        )
    }

    /// Boxes a [`Probe`] wired to the given shared counter and call log.
    fn probe(
        name: &'static str,
        order: &StdArc<AtomicUsize>,
        recorded_at: &CallLog,
        delay: Duration,
        fail: bool,
    ) -> Box<dyn Discoverer> {
        Box::new(Probe {
            name,
            order: StdArc::clone(order),
            recorded_at: StdArc::clone(recorded_at),
            delay,
            fail,
        })
    }

    /// Minimal context for `example.com` with the given per-discoverer budget.
    fn fake_ctx(budget: Duration) -> DiscoveryContext {
        let http = StdArc::new(
            crate::impersonate::ImpersonateClient::new(
                crate::impersonate::Profile::Chrome149Stable,
            )
            .expect("ImpersonateClient builds in tests"),
        );
        DiscoveryContext {
            target: "example.com".into(),
            host: None,
            http,
            budget,
            features: DiscoveryFeatures::default(),
        }
    }

    #[tokio::test]
    async fn pipeline_runs_in_order() {
        let (order, recorded) = shared_state();
        let pipeline = DiscoveryPipeline::with_discoverers(vec![
            probe("alpha", &order, &recorded, Duration::ZERO, false),
            probe("beta", &order, &recorded, Duration::ZERO, false),
            probe("gamma", &order, &recorded, Duration::ZERO, false),
        ]);

        let ctx = fake_ctx(Duration::from_secs(5));
        let _ = pipeline.run(&ctx).await;

        let calls = recorded.lock().clone();
        assert_eq!(
            calls,
            vec![(0, "alpha"), (1, "beta"), (2, "gamma")],
            "pipeline must invoke discoverers in order"
        );
    }

    #[tokio::test]
    async fn pipeline_continues_after_failure() {
        let (order, recorded) = shared_state();
        let pipeline = DiscoveryPipeline::with_discoverers(vec![
            probe("first", &order, &recorded, Duration::ZERO, true),
            probe("second", &order, &recorded, Duration::ZERO, false),
        ]);

        let ctx = fake_ctx(Duration::from_secs(5));
        let findings = pipeline.run(&ctx).await;
        assert_eq!(findings.len(), 1, "pipeline should keep going after err");

        let calls = recorded.lock().clone();
        assert_eq!(calls.len(), 2, "both probes executed");
    }

    #[tokio::test]
    async fn pipeline_enforces_budget() {
        let (order, recorded) = shared_state();
        // The slow probe sleeps for ten times the 50ms budget, so it must be
        // cut off and only the fast probe contributes a finding.
        let pipeline = DiscoveryPipeline::with_discoverers(vec![
            probe("slow", &order, &recorded, Duration::from_millis(500), false),
            probe("fast", &order, &recorded, Duration::ZERO, false),
        ]);

        let ctx = fake_ctx(Duration::from_millis(50));
        let findings = pipeline.run(&ctx).await;
        assert_eq!(
            findings.len(),
            1,
            "only the fast probe should produce a finding"
        );
    }
}