pub mod crates_io;
pub mod curated_index;
pub mod github_topic;
use async_trait::async_trait;
use crate::types::{DiscoveredPlugin, SourceError};
/// A provider of discovered plugins that can be queried asynchronously.
///
/// Implementors must be `Send + Sync`; [`run_all`] takes them as
/// `Box<dyn Source>` trait objects.
#[async_trait]
pub trait Source: Send + Sync {
/// Static identifier for this source. [`run_all`] uses it to label the
/// source's circuit breaker and to attribute timeout / circuit-open errors.
fn name(&self) -> &'static str;
/// Retrieves the plugins this source currently reports.
///
/// # Errors
/// Returns a [`SourceError`] when the source could not produce a result.
async fn fetch(&self) -> Result<Vec<DiscoveredPlugin>, SourceError>;
}
/// Builds a [`SourceError`] attributed to `source`, capping the message length.
///
/// Messages longer than 256 bytes are shortened to at most 256 bytes. The cut
/// point is moved back to the nearest UTF-8 character boundary, because
/// `String::truncate` panics when asked to split a multi-byte character —
/// which would otherwise turn a long non-ASCII error message into a crash.
///
/// * `source` – name of the source the error belongs to (copied into the error).
/// * `message` – human-readable description; anything convertible to `String`.
pub fn source_error(source: &str, message: impl Into<String>) -> SourceError {
    // Upper bound, in bytes, on the stored message.
    const MAX_MESSAGE_BYTES: usize = 256;

    let mut message = message.into();
    if message.len() > MAX_MESSAGE_BYTES {
        // Walk back to a valid boundary; index 0 is always one, so this terminates.
        let mut cut = MAX_MESSAGE_BYTES;
        while !message.is_char_boundary(cut) {
            cut -= 1;
        }
        message.truncate(cut);
    }
    SourceError {
        source: source.to_string(),
        message,
    }
}
/// Combined result of one [`run_all`] pass over a set of sources.
///
/// Successes and failures are reported side by side, so one failing source
/// does not hide the plugins returned by the others.
#[derive(Debug, Default, Clone)]
pub struct RunOutcome {
/// Plugins gathered from every source whose fetch succeeded.
pub items: Vec<DiscoveredPlugin>,
/// One entry per source that did not yield items: its own fetch error,
/// a timeout, or a rejection by its circuit breaker.
pub partial_failures: Vec<SourceError>,
}
/// Fetches every source concurrently and merges the results into a [`RunOutcome`].
///
/// Each source is wrapped in its own newly constructed circuit breaker
/// (labelled `plugin_discovery::<name>`) and its fetch is bounded by
/// `per_source_timeout`. A source that fails, times out, or is rejected by
/// its breaker contributes an entry to `partial_failures`; items from the
/// remaining sources are still returned.
pub async fn run_all(
    sources: &[Box<dyn Source>],
    per_source_timeout: std::time::Duration,
) -> RunOutcome {
    use nexo_resilience::{CircuitBreaker, CircuitBreakerConfig, CircuitError};
    use std::time::Duration;

    // Build one guarded fetch future per source; nothing runs until they are joined.
    let pending: Vec<_> = sources
        .iter()
        .map(|source| {
            let label = source.name();
            let limit = per_source_timeout;
            let breaker = CircuitBreaker::new(
                format!("plugin_discovery::{label}"),
                CircuitBreakerConfig {
                    failure_threshold: 3,
                    success_threshold: 2,
                    initial_backoff: Duration::from_secs(60),
                    max_backoff: Duration::from_secs(600),
                },
            );
            async move {
                let guarded: Result<Vec<DiscoveredPlugin>, CircuitError<SourceError>> = breaker
                    .call(|| async {
                        // An elapsed timeout is reported as an ordinary source error.
                        tokio::time::timeout(limit, source.fetch())
                            .await
                            .unwrap_or_else(|_elapsed| {
                                Err(source_error(
                                    label,
                                    format!("timed out after {}s", limit.as_secs()),
                                ))
                            })
                    })
                    .await;
                (label, guarded)
            }
        })
        .collect();

    let mut outcome = RunOutcome::default();
    for (label, guarded) in futures::future::join_all(pending).await {
        // Successes are merged and skipped; every other arm yields the failure to record.
        let failure = match guarded {
            Ok(found) => {
                outcome.items.extend(found);
                continue;
            }
            Err(CircuitError::Open(_)) => {
                source_error(label, "circuit open — source flaky, see breaker logs")
            }
            Err(CircuitError::Inner(err)) => err,
        };
        outcome.partial_failures.push(failure);
    }
    outcome
}