use crate::context::{ResourceContext, ServiceContext};
use crate::event::EventBus;
use crate::system::System;
use async_trait::async_trait;
use std::any::Any;
use std::sync::Arc;
use super::events::*;
use super::hook::MetricsHook;
use super::registry::MetricsRegistry;
use super::reporting::{MetricReport, MetricSnapshot};
/// System that services metrics request events and reports each processed
/// request to a [`MetricsHook`].
///
/// Cloning is cheap: the hook is held behind an [`Arc`], so clones share the
/// same hook instance.
#[derive(Clone)]
pub struct MetricsSystem {
    /// Callback object notified after (or, for remove/clear, before) each
    /// metrics operation is applied.
    hook: Arc<dyn MetricsHook>,
}
impl MetricsSystem {
    /// Creates a metrics system that notifies `hook` about every request it
    /// processes.
    pub fn new(hook: Arc<dyn MetricsHook>) -> Self {
        Self { hook }
    }

    /// Processes all pending metrics request events found on the [`EventBus`]
    /// stored in `resources`.
    ///
    /// Request kinds are handled in a fixed order: define, record, snapshot,
    /// report, remove, clear. `_services` is currently unused.
    pub async fn process_events(
        &mut self,
        _services: &ServiceContext,
        resources: &mut ResourceContext,
    ) {
        self.process_define_requests(resources).await;
        self.process_record_requests(resources).await;
        self.process_snapshot_requests(resources).await;
        self.process_report_requests(resources).await;
        self.process_remove_requests(resources).await;
        self.process_clear_requests(resources).await;
    }

    /// Handles every `DefineMetricRequested` event: registers the definition
    /// in the [`MetricsRegistry`], notifies the hook, then publishes
    /// `MetricDefined`.
    ///
    /// The hook call and the published event happen even when the registry
    /// resource is absent. Nothing is processed when the event bus is absent.
    async fn process_define_requests(&mut self, resources: &mut ResourceContext) {
        // Copy the requests out first so the bus guard is released before the
        // registry and the hook need access to `resources`.
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<DefineMetricRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for request in requests {
            if let Some(mut registry) = resources.get_mut::<MetricsRegistry>().await {
                // Cloned because the definition is still needed for the hook
                // and the outgoing event below.
                registry.define(request.definition.clone());
            }

            self.hook
                .on_metric_defined(&request.definition, resources)
                .await;

            if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                bus.publish(MetricDefined {
                    definition: request.definition,
                });
            }
        }
    }

    /// Handles every `RecordMetricRequested` event: records the value in the
    /// [`MetricsRegistry`] and, only if that succeeded, notifies the hook and
    /// publishes `MetricRecorded`.
    ///
    /// A missing registry or an `Err` from `record` silently skips the
    /// request.
    async fn process_record_requests(&mut self, resources: &mut ResourceContext) {
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<RecordMetricRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for request in requests {
            let success = if let Some(mut registry) = resources.get_mut::<MetricsRegistry>().await
            {
                registry.record(request.value.clone()).is_ok()
            } else {
                false
            };

            if success {
                self.hook
                    .on_metric_recorded(&request.value, resources)
                    .await;

                if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                    bus.publish(MetricRecorded {
                        value: request.value,
                    });
                }
            }
        }
    }

    /// Handles every `CreateSnapshotRequested` event: builds a
    /// [`MetricSnapshot`] containing a clone of every value returned by the
    /// registry's `all_values`, notifies the hook, then publishes
    /// `SnapshotCreated`.
    ///
    /// When the registry resource is absent, the snapshot is still created
    /// and published, but it contains no values.
    async fn process_snapshot_requests(&mut self, resources: &mut ResourceContext) {
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<CreateSnapshotRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for request in requests {
            let mut snapshot = if let Some(label) = request.label {
                MetricSnapshot::with_label(request.timestamp, label)
            } else {
                MetricSnapshot::new(request.timestamp)
            };

            if let Some(registry) = resources.get::<MetricsRegistry>().await {
                for (_metric_id, values) in registry.all_values() {
                    for value in values {
                        snapshot.add_value(value.clone());
                    }
                }
            }

            self.hook.on_snapshot_created(&snapshot, resources).await;

            if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                bus.publish(SnapshotCreated { snapshot });
            }
        }
    }

    /// Handles every `GenerateReportRequested` event: builds a
    /// [`MetricReport`] for the requested period, notifies the hook, then
    /// publishes `ReportGenerated`.
    ///
    /// Each requested metric id is aggregated with each requested
    /// aggregation; combinations for which the registry's `aggregate` returns
    /// `None` are left out. When the registry resource is absent, the report
    /// is still published, but it contains no aggregates.
    async fn process_report_requests(&mut self, resources: &mut ResourceContext) {
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<GenerateReportRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for request in requests {
            let mut report = if let Some(label) = request.label {
                MetricReport::with_label(request.period_start, request.period_end, label)
            } else {
                MetricReport::new(request.period_start, request.period_end)
            };

            if let Some(registry) = resources.get::<MetricsRegistry>().await {
                for metric_id in &request.metric_ids {
                    for aggregation in &request.aggregations {
                        if let Some(aggregated) = registry.aggregate(
                            metric_id,
                            *aggregation,
                            request.period_start,
                            request.period_end,
                        ) {
                            report.add_aggregated(aggregated);
                        }
                    }
                }
            }

            self.hook.on_report_generated(&report, resources).await;

            if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                bus.publish(ReportGenerated { report });
            }
        }
    }

    /// Handles every `RemoveMetricRequested` event: notifies the hook,
    /// removes the metric from the [`MetricsRegistry`], then publishes
    /// `MetricRemoved`.
    ///
    /// The hook runs *before* the removal. The event is published even when
    /// the registry resource is absent.
    async fn process_remove_requests(&mut self, resources: &mut ResourceContext) {
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<RemoveMetricRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for request in requests {
            self.hook
                .on_metric_removed(&request.metric_id, resources)
                .await;

            if let Some(mut registry) = resources.get_mut::<MetricsRegistry>().await {
                registry.remove(&request.metric_id);
            }

            if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                bus.publish(MetricRemoved {
                    metric_id: request.metric_id,
                });
            }
        }
    }

    /// Handles every `ClearMetricsRequested` event: notifies the hook, clears
    /// the [`MetricsRegistry`], then publishes `MetricsCleared` stamped with
    /// the current wall-clock time in whole seconds since the Unix epoch.
    ///
    /// The hook runs *before* the registry is cleared. The event is published
    /// even when the registry resource is absent.
    async fn process_clear_requests(&mut self, resources: &mut ResourceContext) {
        let requests = if let Some(mut bus) = resources.get_mut::<EventBus>().await {
            let reader = bus.reader::<ClearMetricsRequested>();
            reader.iter().cloned().collect::<Vec<_>>()
        } else {
            Vec::new()
        };

        for _request in requests {
            self.hook.on_registry_cleared(resources).await;

            if let Some(mut registry) = resources.get_mut::<MetricsRegistry>().await {
                registry.clear();
            }

            if let Some(mut bus) = resources.get_mut::<EventBus>().await {
                // `duration_since` fails when the system clock is set before
                // the Unix epoch. Fall back to 0 rather than panicking inside
                // event processing because of a misconfigured clock.
                let timestamp = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_or(0, |elapsed| elapsed.as_secs());

                bus.publish(MetricsCleared { timestamp });
            }
        }
    }
}
/// Registers [`MetricsSystem`] as a [`System`] under the name
/// `"metrics_system"` and exposes it for downcasting through [`Any`].
#[async_trait]
impl System for MetricsSystem {
    /// Returns the fixed identifier of this system.
    fn name(&self) -> &'static str {
        "metrics_system"
    }

    /// Returns this system as a shared `Any` reference.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns this system as a mutable `Any` reference.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plugin::metrics::hook::NoOpMetricsHook;

    /// Smoke test: a `MetricsSystem` can be constructed from a no-op hook.
    #[test]
    fn test_system_creation() {
        let noop_hook: Arc<dyn MetricsHook> = Arc::new(NoOpMetricsHook);
        let _metrics_system = MetricsSystem::new(noop_hook);
    }
}