use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio_cron_scheduler::{Job, JobScheduler};
use crate::collector::{Collector, CollectorError, Schedule};
use crate::storage::MetricCategory;
use crate::{Event, EventKind, EventPayload, EventSeverity, EventSource, StorageWriter};
/// Upper bound on how long [`CollectorRegistry::shutdown`] waits for the
/// scheduler to stop before reporting a timeout (5 seconds).
pub const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Metadata recorded for each job registered through `CollectorRegistry::spawn`.
#[derive(Debug, Clone)]
pub struct JobInfo {
/// Job id assigned by the scheduler; also the key in the registry's job table.
pub id: uuid::Uuid,
/// Category reported by the collector's `category()`.
pub category: MetricCategory,
/// Collector name, as returned by the collector's `name()`.
pub name: String,
/// Series id returned by the collector's `upsert_metric_series()` at spawn time.
pub metric_series_id: u64,
/// The collector's `Schedule`, rendered with `to_string()`.
pub schedule: String,
}
/// State shared (behind an `Arc`) by every scheduled run of one collector job.
struct JobContext<C> {
/// The collector whose `collect()` is awaited on each run.
collector: Arc<C>,
/// Collector name, used in log fields and event payloads.
name: String,
/// Collector category; selects the event source and fills the `category` payload.
category: MetricCategory,
/// Writer through which collection-failure events are enqueued.
writer: StorageWriter,
}
/// Owns a `JobScheduler` and tracks the collectors scheduled on it.
///
/// Lifecycle events (job created/removed, scheduler started/stopped and the
/// corresponding failures) are written through `writer` as `System`-kind events.
pub struct CollectorRegistry {
/// Scheduler that fires the collector jobs.
scheduler: JobScheduler,
/// Metadata for every job added via `spawn`, keyed by scheduler job id.
jobs: Arc<RwLock<HashMap<uuid::Uuid, JobInfo>>>,
/// Destination for lifecycle events; a clone is handed to each job.
writer: StorageWriter,
}
impl CollectorRegistry {
    /// Creates a registry with an empty job table and a freshly constructed
    /// [`JobScheduler`]; call `start` to begin firing jobs.
    ///
    /// `writer` receives the registry's lifecycle events.
    ///
    /// # Errors
    /// Returns [`CollectorError::Scheduler`] if the scheduler cannot be created.
    pub async fn new(writer: StorageWriter) -> Result<Self, CollectorError> {
        let scheduler = match JobScheduler::new().await {
            Ok(created) => created,
            Err(cause) => return Err(CollectorError::Scheduler(cause.to_string())),
        };
        let jobs = Arc::new(RwLock::new(HashMap::new()));
        Ok(Self {
            writer,
            scheduler,
            jobs,
        })
    }
}
impl std::fmt::Debug for CollectorRegistry {
    /// Formats the registry with its current job count.
    ///
    /// `Debug` is synchronous, so the job table is probed with `try_read`. If a
    /// writer currently holds the lock, `job_count` is shown as `<locked>` rather
    /// than a misleading `0`, which would be indistinguishable from an empty
    /// registry.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CollectorRegistry");
        if let Ok(jobs) = self.jobs.try_read() {
            out.field("job_count", &jobs.len());
        } else {
            out.field("job_count", &format_args!("<locked>"));
        }
        out.finish_non_exhaustive()
    }
}
impl CollectorRegistry {
pub async fn spawn<C: Collector>(&self, collector: C) -> Result<uuid::Uuid, CollectorError> {
let name = collector.name().to_string();
let schedule_desc = collector.schedule().to_string();
let category = collector.category();
let category_str = category.as_ref();
let series_id = collector.upsert_metric_series().inspect_err(|e| {
self.emit(
None,
EventSeverity::Error,
format!("Job '{}' upsert_series failed", name),
|p| {
p.insert("collector".into(), name.clone());
p.insert("category".into(), category_str.to_string());
p.insert("error".into(), e.to_string());
},
)
})?;
let collector = Arc::new(collector);
let job = self
.create_job(Arc::clone(&collector), &name)
.map_err(|e| CollectorError::Scheduler(e.to_string()))
.inspect_err(|e| {
self.emit(
None,
EventSeverity::Error,
format!("Job '{}' create failed", name),
|p| {
p.insert("collector".into(), name.clone());
p.insert("category".into(), category_str.to_string());
p.insert("series_id".into(), series_id.to_string());
p.insert("error".into(), e.to_string());
},
)
})?;
let job_id = self
.scheduler
.add(job)
.await
.map_err(|e| CollectorError::Scheduler(e.to_string()))
.inspect_err(|e| {
self.emit(
None,
EventSeverity::Error,
format!("Job '{}' register failed", name),
|p| {
p.insert("collector".into(), name.clone());
p.insert("category".into(), category_str.to_string());
p.insert("series_id".into(), series_id.to_string());
p.insert("error".into(), e.to_string());
},
)
})?;
self.jobs.write().await.insert(
job_id,
JobInfo {
id: job_id,
category,
name: name.clone(),
metric_series_id: series_id,
schedule: schedule_desc.clone(),
},
);
tracing::info!(
category = category_str,
collector = %name,
series_id = series_id,
job_id = %job_id,
schedule = %schedule_desc,
"Metric series registered"
);
self.emit(
Some(category_to_event_source(category)),
EventSeverity::Info,
format!("Job created: {}", name),
|p| {
p.insert("job_id".into(), job_id.to_string());
p.insert("collector".into(), name.clone());
p.insert("category".into(), category_str.to_string());
p.insert("schedule".into(), schedule_desc);
p.insert("series_id".into(), series_id.to_string());
},
);
tracing::info!(collector = %name, job_id = %job_id, "Collector registered");
Ok(job_id)
}
pub async fn start(&self) -> Result<(), CollectorError> {
self.scheduler
.start()
.await
.map_err(|e| CollectorError::Scheduler(e.to_string()))?;
self.emit(
None,
EventSeverity::Info,
"Collector scheduler started",
|_| {},
);
tracing::info!("Collector scheduler started");
Ok(())
}
pub async fn list_jobs(&self) -> Vec<JobInfo> {
self.jobs.read().await.values().cloned().collect()
}
pub async fn job_count(&self) -> usize {
self.jobs.read().await.len()
}
pub async fn shutdown(self) -> Result<(), CollectorError> {
self.shutdown_with_timeout(DEFAULT_SHUTDOWN_TIMEOUT).await
}
pub async fn shutdown_with_timeout(mut self, timeout: Duration) -> Result<(), CollectorError> {
let job_count = self.jobs.read().await.len();
let shutdown_result = tokio::time::timeout(timeout, async {
self.scheduler
.shutdown()
.await
.map_err(|e| CollectorError::Scheduler(e.to_string()))
})
.await;
match shutdown_result {
Ok(Ok(())) => {
tracing::info!("Collector scheduler shutdown complete");
self.emit(
None,
EventSeverity::Info,
"Scheduler shutdown complete",
|p| {
p.insert("job_count".into(), job_count.to_string());
p.insert("timed_out".into(), "false".into());
},
);
Ok(())
}
Ok(Err(err)) => {
self.emit(
None,
EventSeverity::Error,
"Scheduler shutdown failed",
|p| {
p.insert("job_count".into(), job_count.to_string());
p.insert("error".into(), err.to_string());
},
);
Err(err)
}
Err(_) => {
let err = CollectorError::Scheduler("scheduler shutdown timed out".to_string());
tracing::error!("Collector scheduler shutdown timed out");
self.emit(
None,
EventSeverity::Error,
"Scheduler shutdown timed out",
|p| {
p.insert("job_count".into(), job_count.to_string());
p.insert("timed_out".into(), "true".into());
},
);
Err(err)
}
}
}
pub async fn remove(&self, job_id: &uuid::Uuid) -> Result<(), CollectorError> {
let job_name = self.jobs.read().await.get(job_id).map(|j| j.name.clone());
self.scheduler
.remove(job_id)
.await
.map_err(|e| CollectorError::Scheduler(e.to_string()))
.inspect_err(|e| {
self.emit(None, EventSeverity::Error, "Job remove failed", |p| {
p.insert("job_id".into(), job_id.to_string());
if let Some(ref name) = job_name {
p.insert("collector".into(), name.clone());
}
p.insert("error".into(), e.to_string());
});
})?;
self.jobs.write().await.remove(job_id);
self.emit(None, EventSeverity::Info, "Job removed", |p| {
p.insert("job_id".into(), job_id.to_string());
if let Some(ref name) = job_name {
p.insert("collector".into(), name.clone());
}
});
tracing::info!(job_id = %job_id, "Collector removed");
Ok(())
}
fn create_job<C: Collector>(
&self,
collector: Arc<C>,
name: &str,
) -> Result<Job, CollectorError> {
let name = name.to_owned();
let category = collector.category();
let writer = self.writer.clone();
let schedule = collector.schedule();
let context = Arc::new(JobContext {
collector,
name,
category,
writer,
});
let make_callback = move || {
let ctx = Arc::clone(&context);
move |_: uuid::Uuid, _: JobScheduler| {
let ctx = Arc::clone(&ctx);
Box::pin(async move { run_collection(ctx).await })
as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
}
};
match &schedule {
Schedule::Interval(d) => Job::new_repeated_async(*d, make_callback()),
Schedule::Cron(expr) => Job::new_cron_job_async(expr, make_callback()),
}
.map_err(|e| CollectorError::Scheduler(e.to_string()))
}
fn emit(
&self,
source: Option<EventSource>,
severity: EventSeverity,
message: impl Into<String>,
build_payload: impl FnOnce(&mut EventPayload),
) {
let mut payload = EventPayload::new();
build_payload(&mut payload);
let event = Event::new(
source.unwrap_or(EventSource::System),
EventKind::System,
severity,
message,
)
.with_payloads(payload);
if let Err(e) = self.writer.insert_event(event) {
tracing::warn!(error = %e, "Failed to enqueue event");
}
}
}
/// Picks the event source used for events about a collector of `category`.
///
/// Only the three network categories have dedicated collector sources; every
/// other category is attributed to [`EventSource::System`].
fn category_to_event_source(category: MetricCategory) -> EventSource {
    match category {
        MetricCategory::NetworkHttp => EventSource::CollectorNetworkHttp,
        MetricCategory::NetworkPing => EventSource::CollectorNetworkPing,
        MetricCategory::NetworkTcp => EventSource::CollectorNetworkTcp,
        // No dedicated source exists for the remaining categories.
        _ => EventSource::System,
    }
}
async fn run_collection<C: Collector>(ctx: Arc<JobContext<C>>) {
let start = std::time::Instant::now();
tracing::debug!(collector = %ctx.name, "Running collection");
let result = ctx.collector.collect().await;
let duration_ms = start.elapsed().as_millis();
if let Ok(()) = result {
tracing::debug!(
collector = %ctx.name,
duration_ms,
"Collection succeeded"
);
return;
}
let e = result.unwrap_err(); tracing::error!(collector = %ctx.name, error = %e, "Collection failed");
let event_source = category_to_event_source(ctx.category);
let event = Event::new(
event_source,
EventKind::Error,
EventSeverity::Error,
format!("Collection failed: {}", e),
)
.with_payload("collector", &ctx.name)
.with_payload("category", ctx.category.as_ref())
.with_payload("duration_ms", duration_ms.to_string())
.with_payload("status", "failed")
.with_payload("error", e.to_string());
if let Err(e) = ctx.writer.insert_event(event) {
tracing::warn!(error = %e, "Failed to enqueue collection event");
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{MetricSeries, MetricValue, StaticTags, StorageBuilder, StorageWriter};

    /// Minimal collector that registers a `Custom` series and writes a constant
    /// value to it on every run.
    struct MockCollector {
        name: String,
        schedule: Schedule,
        writer: StorageWriter,
        series_id: u64,
    }

    impl MockCollector {
        fn new(name: impl Into<String>, writer: StorageWriter) -> Self {
            let name = name.into();
            let series_id = MetricSeries::compute_series_id(
                MetricCategory::Custom,
                "mock",
                &name,
                &StaticTags::new(),
            );
            Self {
                name,
                schedule: Schedule::interval(Duration::from_secs(60)),
                writer,
                series_id,
            }
        }
    }

    #[async_trait::async_trait]
    impl Collector for MockCollector {
        fn name(&self) -> &str {
            &self.name
        }

        fn category(&self) -> MetricCategory {
            MetricCategory::Custom
        }

        fn schedule(&self) -> Schedule {
            self.schedule.clone()
        }

        fn upsert_metric_series(&self) -> Result<u64, CollectorError> {
            let series = MetricSeries::new(
                MetricCategory::Custom,
                "mock",
                &self.name,
                StaticTags::new(),
                Some("Mock collector for testing".to_string()),
            );
            self.writer.upsert_metric_series(series)?;
            Ok(self.series_id)
        }

        async fn collect(&self) -> Result<(), CollectorError> {
            let value = MetricValue::new(self.series_id, 1.0, true);
            self.writer.insert_metric_value(value)?;
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_registry_lifecycle() {
        let handles = StorageBuilder::new("sqlite::memory:")
            .build()
            .await
            .unwrap();
        let writer = handles.writer.clone();
        let registry = CollectorRegistry::new(writer.clone()).await.unwrap();
        let collector = MockCollector::new("test-collector", writer);
        let expected_series_id = collector.series_id;

        let job_id = registry.spawn(collector).await.unwrap();
        assert_eq!(registry.job_count().await, 1);

        let jobs = registry.list_jobs().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id, job_id);
        assert_eq!(jobs[0].name, "test-collector");
        assert_eq!(jobs[0].metric_series_id, expected_series_id);
        assert!(jobs[0].schedule.contains("60s"));

        registry.remove(&job_id).await.unwrap();
        assert_eq!(registry.job_count().await, 0);

        registry.shutdown().await.unwrap();
        handles.shutdown().await.unwrap();
    }

    // Pure parsing check: nothing is awaited, so no async runtime is needed.
    #[test]
    fn test_schedule_cron_validation() {
        let result = Schedule::cron("invalid cron expression");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("invalid cron"));
    }
}