use beachcomber::cache::Cache;
use beachcomber::config::Config;
use beachcomber::provider::registry::ProviderRegistry;
use beachcomber::provider::{
FieldSchema, FieldType, InvalidationStrategy, Provider, ProviderMetadata, ProviderResult,
};
use beachcomber::scheduler::{Scheduler, SchedulerMessage};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
static FAIL_EXEC_COUNT: AtomicU32 = AtomicU32::new(0);
struct FailingProvider;
impl Provider for FailingProvider {
fn metadata(&self) -> ProviderMetadata {
ProviderMetadata {
name: "failing".to_string(),
fields: vec![FieldSchema {
name: "value".to_string(),
field_type: FieldType::String,
}],
invalidation: InvalidationStrategy::Poll {
interval_secs: 60,
floor_secs: 1,
},
global: true,
}
}
fn execute(&self, _path: Option<&str>) -> Option<ProviderResult> {
FAIL_EXEC_COUNT.fetch_add(1, Ordering::SeqCst);
None }
}
#[tokio::test]
async fn repeated_failures_trigger_backoff() {
FAIL_EXEC_COUNT.store(0, Ordering::SeqCst);
let cache = Arc::new(Cache::new());
let mut registry = ProviderRegistry::new();
registry.register(Box::new(FailingProvider));
let registry = Arc::new(registry);
let config = Config::default();
let (handle, scheduler) = Scheduler::new(cache.clone(), registry, config);
let sched_task = tokio::spawn(async move { scheduler.run().await });
for _ in 0..10 {
handle
.send(SchedulerMessage::Poke {
provider: "failing".to_string(),
path: None,
})
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
let count = FAIL_EXEC_COUNT.load(Ordering::SeqCst);
assert!(
count < 10,
"Expected failure backoff to suppress some executions, got {count}"
);
handle.send(SchedulerMessage::Shutdown).await;
let _ = sched_task.await;
}