use super::{MetricsError, MetricsExporter, MetricsSnapshot, OtlpConfig, OtlpProtocol};
use async_trait::async_trait;
use opentelemetry::metrics::{Gauge, MeterProvider};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use std::time::Duration;
struct Instruments {
running_agents: Gauge<f64>,
queued_agents: Gauge<f64>,
suspended_agents: Gauge<f64>,
total_scheduled: Gauge<f64>,
max_capacity: Gauge<f64>,
load_factor: Gauge<f64>,
uptime_seconds: Gauge<f64>,
tm_total_tasks: Gauge<f64>,
tm_healthy_tasks: Gauge<f64>,
tm_average_uptime: Gauge<f64>,
tm_total_memory: Gauge<f64>,
lb_total_allocations: Gauge<f64>,
lb_active_allocations: Gauge<f64>,
lb_memory_utilization: Gauge<f64>,
lb_cpu_utilization: Gauge<f64>,
lb_allocation_failures: Gauge<f64>,
lb_avg_allocation_time: Gauge<f64>,
system_memory_mb: Gauge<f64>,
system_cpu_percent: Gauge<f64>,
}
pub struct OtlpExporter {
provider: SdkMeterProvider,
instruments: Instruments,
}
impl OtlpExporter {
pub fn new(
config: OtlpConfig,
service_name: &str,
service_namespace: &str,
export_interval: Duration,
) -> Result<Self, MetricsError> {
use opentelemetry::KeyValue;
use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::metrics::PeriodicReader;
use opentelemetry_sdk::Resource;
let timeout = Duration::from_secs(config.timeout_seconds);
let metric_exporter = match config.protocol {
OtlpProtocol::Grpc => MetricExporter::builder()
.with_tonic()
.with_endpoint(&config.endpoint)
.with_timeout(timeout)
.build()
.map_err(|e| {
MetricsError::ConfigError(format!("Failed to build gRPC OTLP exporter: {}", e))
})?,
OtlpProtocol::HttpBinary | OtlpProtocol::HttpJson => MetricExporter::builder()
.with_http()
.with_endpoint(&config.endpoint)
.with_timeout(timeout)
.build()
.map_err(|e| {
MetricsError::ConfigError(format!("Failed to build HTTP OTLP exporter: {}", e))
})?,
};
let reader = PeriodicReader::builder(metric_exporter)
.with_interval(export_interval)
.build();
let resource = Resource::builder()
.with_service_name(service_name.to_string())
.with_attribute(KeyValue::new(
"service.namespace",
service_namespace.to_string(),
))
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
.build();
let meter = provider.meter("symbiont.scheduler");
let instruments = Instruments {
running_agents: meter
.f64_gauge("scheduler.running_agents")
.with_description("Number of currently running agents")
.build(),
queued_agents: meter
.f64_gauge("scheduler.queued_agents")
.with_description("Number of agents waiting in queue")
.build(),
suspended_agents: meter
.f64_gauge("scheduler.suspended_agents")
.with_description("Number of suspended agents")
.build(),
total_scheduled: meter
.f64_gauge("scheduler.total_scheduled")
.with_description("Total number of scheduled agents")
.build(),
max_capacity: meter
.f64_gauge("scheduler.max_capacity")
.with_description("Maximum concurrent agent capacity")
.build(),
load_factor: meter
.f64_gauge("scheduler.load_factor")
.with_description("Current load factor (0.0-1.0)")
.build(),
uptime_seconds: meter
.f64_gauge("scheduler.uptime_seconds")
.with_description("Scheduler uptime in seconds")
.build(),
tm_total_tasks: meter
.f64_gauge("task_manager.total_tasks")
.with_description("Total tasks tracked by task manager")
.build(),
tm_healthy_tasks: meter
.f64_gauge("task_manager.healthy_tasks")
.with_description("Number of healthy tasks")
.build(),
tm_average_uptime: meter
.f64_gauge("task_manager.average_uptime_seconds")
.with_description("Average task uptime in seconds")
.build(),
tm_total_memory: meter
.f64_gauge("task_manager.total_memory_usage")
.with_description("Total memory usage across all tasks")
.build(),
lb_total_allocations: meter
.f64_gauge("load_balancer.total_allocations")
.with_description("Total resource allocations made")
.build(),
lb_active_allocations: meter
.f64_gauge("load_balancer.active_allocations")
.with_description("Currently active resource allocations")
.build(),
lb_memory_utilization: meter
.f64_gauge("load_balancer.memory_utilization")
.with_description("Memory utilization ratio (0.0-1.0)")
.build(),
lb_cpu_utilization: meter
.f64_gauge("load_balancer.cpu_utilization")
.with_description("CPU utilization ratio (0.0-1.0)")
.build(),
lb_allocation_failures: meter
.f64_gauge("load_balancer.allocation_failures")
.with_description("Total resource allocation failures")
.build(),
lb_avg_allocation_time: meter
.f64_gauge("load_balancer.average_allocation_time_ms")
.with_description("Average resource allocation time in milliseconds")
.build(),
system_memory_mb: meter
.f64_gauge("system.memory_usage_mb")
.with_description("System memory usage in megabytes")
.build(),
system_cpu_percent: meter
.f64_gauge("system.cpu_usage_percent")
.with_description("System CPU usage percentage")
.build(),
};
tracing::info!(
"OTLP metrics exporter initialized: endpoint={}, protocol={:?}",
config.endpoint,
config.protocol
);
Ok(Self {
provider,
instruments,
})
}
}
#[async_trait]
impl MetricsExporter for OtlpExporter {
async fn export(&self, snapshot: &MetricsSnapshot) -> Result<(), MetricsError> {
let i = &self.instruments;
i.running_agents
.record(snapshot.scheduler.running_agents as f64, &[]);
i.queued_agents
.record(snapshot.scheduler.queued_agents as f64, &[]);
i.suspended_agents
.record(snapshot.scheduler.suspended_agents as f64, &[]);
i.total_scheduled
.record(snapshot.scheduler.total_scheduled as f64, &[]);
i.max_capacity
.record(snapshot.scheduler.max_capacity as f64, &[]);
i.load_factor.record(snapshot.scheduler.load_factor, &[]);
i.uptime_seconds
.record(snapshot.scheduler.uptime_seconds as f64, &[]);
i.tm_total_tasks
.record(snapshot.task_manager.total_tasks as f64, &[]);
i.tm_healthy_tasks
.record(snapshot.task_manager.healthy_tasks as f64, &[]);
i.tm_average_uptime
.record(snapshot.task_manager.average_uptime_seconds, &[]);
i.tm_total_memory
.record(snapshot.task_manager.total_memory_usage as f64, &[]);
i.lb_total_allocations
.record(snapshot.load_balancer.total_allocations as f64, &[]);
i.lb_active_allocations
.record(snapshot.load_balancer.active_allocations as f64, &[]);
i.lb_memory_utilization
.record(snapshot.load_balancer.memory_utilization, &[]);
i.lb_cpu_utilization
.record(snapshot.load_balancer.cpu_utilization, &[]);
i.lb_allocation_failures
.record(snapshot.load_balancer.allocation_failures as f64, &[]);
i.lb_avg_allocation_time
.record(snapshot.load_balancer.average_allocation_time_ms, &[]);
i.system_memory_mb
.record(snapshot.system.memory_usage_mb, &[]);
i.system_cpu_percent
.record(snapshot.system.cpu_usage_percent, &[]);
tracing::trace!("Recorded metrics snapshot to OTLP gauges");
Ok(())
}
async fn shutdown(&self) -> Result<(), MetricsError> {
self.provider.shutdown().map_err(|e| {
MetricsError::ShutdownFailed(format!("OTLP meter provider shutdown failed: {}", e))
})
}
}