use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Settings that control whether and how metrics are exported.
///
/// The type is serializable with serde, so it can be embedded in a larger
/// configuration document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
/// Master switch: when `false`, `init` returns without installing anything.
pub enabled: bool,
/// TCP port for the Prometheus HTTP listener; `None` means no listener is
/// installed by `init`.
pub prometheus_port: Option<u16>,
/// Push-gateway URL, populated by the `push_gateway` constructor.
/// NOTE(review): nothing in this file reads this field yet — confirm where
/// it is consumed.
pub push_gateway: Option<String>,
/// Interval associated with push-gateway delivery (15 seconds by default).
/// NOTE(review): not read in this file — confirm the consumer.
pub push_interval: Duration,
/// Flag requesting process-level metrics.
/// NOTE(review): not read in this file — confirm the consumer.
pub include_process_metrics: bool,
/// Flag requesting runtime-level metrics.
/// NOTE(review): not read in this file — confirm the consumer.
pub include_runtime_metrics: bool,
/// Histogram bucket boundaries for latency measurements.
/// Presumably expressed in seconds (the latency histogram is recorded in
/// seconds) — TODO confirm; the list is not applied to any recorder here.
pub latency_buckets: Vec<f64>,
}
impl Default for MetricsConfig {
fn default() -> Self {
Self {
enabled: true,
prometheus_port: Some(9090),
push_gateway: None,
push_interval: Duration::from_secs(15),
include_process_metrics: true,
include_runtime_metrics: true,
latency_buckets: vec![
0.000_1, 0.000_5, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, ],
}
}
}
impl MetricsConfig {
    /// Returns the default configuration with the Prometheus listener moved
    /// to `port`.
    pub fn prometheus(port: u16) -> Self {
        let defaults = Self::default();
        Self {
            prometheus_port: Some(port),
            ..defaults
        }
    }

    /// Returns the default configuration with the Prometheus listener
    /// disabled and `url` stored as the push-gateway endpoint.
    pub fn push_gateway(url: impl Into<String>) -> Self {
        let endpoint: String = url.into();
        Self {
            push_gateway: Some(endpoint),
            prometheus_port: None,
            ..Self::default()
        }
    }

    /// Installs the Prometheus exporter described by this configuration.
    ///
    /// Nothing is installed when `enabled` is `false` or when no
    /// `prometheus_port` is set. Otherwise an HTTP listener is requested on
    /// `0.0.0.0:<port>`, i.e. on every IPv4 interface.
    ///
    /// NOTE(review): `push_gateway` and `push_interval` are not consulted
    /// here, so a push-gateway-only configuration installs nothing.
    ///
    /// # Errors
    ///
    /// Any failure reported by the exporter's `install()` call is converted
    /// to `KernelError::ConfigError` carrying the error's display text.
    #[cfg(feature = "metrics")]
    pub async fn init(&self) -> crate::error::Result<()> {
        use ::metrics_exporter_prometheus::PrometheusBuilder;

        if self.enabled {
            if let Some(port) = self.prometheus_port {
                PrometheusBuilder::new()
                    .with_http_listener(([0, 0, 0, 0], port))
                    .install()
                    .map_err(|err| crate::error::KernelError::ConfigError(err.to_string()))?;
            }
        }
        Ok(())
    }

    /// No-op stand-in used when the crate is built without the `metrics`
    /// feature; always succeeds.
    #[cfg(not(feature = "metrics"))]
    pub async fn init(&self) -> crate::error::Result<()> {
        Ok(())
    }
}
/// Owns a [`MetricsConfig`] and starts the exporter that it describes.
pub struct MetricsExporter {
    config: MetricsConfig,
}

impl MetricsExporter {
    /// Wraps `config` without starting anything.
    pub fn new(config: MetricsConfig) -> Self {
        MetricsExporter { config }
    }

    /// Starts the exporter by delegating to [`MetricsConfig::init`].
    ///
    /// `init` is itself compiled as a no-op returning `Ok(())` when the
    /// `metrics` feature is disabled, so a single definition covers both
    /// build configurations.
    ///
    /// # Errors
    ///
    /// Propagates whatever `MetricsConfig::init` returns.
    pub async fn start(&self) -> crate::error::Result<()> {
        self.config.init().await
    }
}
/// Execution counters for a single kernel.
///
/// Every counter is an independent atomic updated with relaxed ordering, so
/// the type can be shared by reference across threads. Values derived from
/// more than one counter (average latency, success rate) are therefore
/// approximate while updates are in flight.
#[derive(Debug)]
pub struct KernelMetrics {
    /// Identifier attached as the `kernel_id` label on exported metrics.
    pub kernel_id: String,
    /// Domain name attached as the `domain` label on exported metrics.
    pub domain: String,
    /// Number of executions recorded, successful or not.
    pub messages_total: AtomicU64,
    /// Number of executions recorded with `success == true`.
    pub messages_success: AtomicU64,
    /// Number of executions recorded with `success == false`.
    pub messages_failed: AtomicU64,
    /// Sum of all recorded latencies, in nanoseconds.
    pub processing_time_ns: AtomicU64,
    /// Most recent value passed to [`KernelMetrics::set_queue_depth`].
    pub queue_depth: AtomicU64,
}

impl KernelMetrics {
    /// Creates a metrics set for `kernel_id` in `domain` with all counters at zero.
    pub fn new(kernel_id: impl Into<String>, domain: impl Into<String>) -> Self {
        Self {
            kernel_id: kernel_id.into(),
            domain: domain.into(),
            messages_total: AtomicU64::new(0),
            messages_success: AtomicU64::new(0),
            messages_failed: AtomicU64::new(0),
            processing_time_ns: AtomicU64::new(0),
            queue_depth: AtomicU64::new(0),
        }
    }

    /// Records one execution that took `latency` and ended with `success`.
    ///
    /// Updates the total, the matching success/failure counter and the
    /// accumulated processing time. With the `metrics` feature enabled the
    /// execution is also reported to the `metrics` facade as a labelled
    /// counter increment and a histogram sample in seconds.
    pub fn record_execution(&self, latency: Duration, success: bool) {
        self.messages_total.fetch_add(1, Ordering::Relaxed);

        let outcome_counter = if success {
            &self.messages_success
        } else {
            &self.messages_failed
        };
        outcome_counter.fetch_add(1, Ordering::Relaxed);

        // `as_nanos` yields a u128; saturate rather than silently keeping only
        // the low 64 bits for durations longer than ~584 years.
        let latency_ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
        self.processing_time_ns
            .fetch_add(latency_ns, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::{counter, histogram};
            counter!("rustkernel_messages_total",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone(),
                "status" => if success { "success" } else { "error" }
            )
            .increment(1);
            histogram!("rustkernel_message_latency_seconds",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .record(latency.as_secs_f64());
        }
    }

    /// Stores `depth` as the current queue depth and, with the `metrics`
    /// feature enabled, publishes it as a labelled gauge.
    pub fn set_queue_depth(&self, depth: u64) {
        self.queue_depth.store(depth, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_queue_depth",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .set(depth as f64);
        }
    }

    /// Mean latency per recorded execution, in microseconds.
    ///
    /// Returns `0.0` when nothing has been recorded yet.
    pub fn avg_latency_us(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let time_ns = self.processing_time_ns.load(Ordering::Relaxed);
        (time_ns as f64 / total as f64) / 1000.0
    }

    /// Fraction of recorded executions that succeeded, in `0.0..=1.0`.
    ///
    /// Returns `1.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 1.0;
        }
        let success = self.messages_success.load(Ordering::Relaxed);
        success as f64 / total as f64
    }

    /// Recorded executions per second over an observation window of `duration`.
    ///
    /// Returns `0.0` for a zero-length window instead of dividing by zero,
    /// which would otherwise produce `inf` or `NaN`.
    pub fn throughput(&self, duration: Duration) -> f64 {
        if duration.is_zero() {
            return 0.0;
        }
        let total = self.messages_total.load(Ordering::Relaxed);
        total as f64 / duration.as_secs_f64()
    }
}
/// Runtime-wide counters.
///
/// The state lives behind an [`Arc`], so cloning a `RuntimeMetrics` yields
/// another handle onto the same counters rather than an independent copy.
pub struct RuntimeMetrics {
    inner: Arc<RuntimeMetricsInner>,
}

/// Shared counter storage behind every [`RuntimeMetrics`] handle.
struct RuntimeMetricsInner {
    /// Number of kernels registered so far.
    pub kernels_registered: AtomicU64,
    /// Number of kernels currently active.
    pub kernels_active: AtomicU64,
    /// Number of messages recorded across all kernels.
    pub messages_total: AtomicU64,
    /// Most recently reported GPU memory usage, in bytes.
    pub gpu_memory_bytes: AtomicU64,
    /// Largest GPU memory usage ever reported, in bytes.
    pub gpu_memory_peak_bytes: AtomicU64,
}

impl RuntimeMetrics {
    /// Creates a fresh set of counters, all at zero.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RuntimeMetricsInner {
                kernels_registered: AtomicU64::new(0),
                kernels_active: AtomicU64::new(0),
                messages_total: AtomicU64::new(0),
                gpu_memory_bytes: AtomicU64::new(0),
                gpu_memory_peak_bytes: AtomicU64::new(0),
            }),
        }
    }

    /// Counts one more registered kernel and, with the `metrics` feature
    /// enabled, publishes the new count as a gauge.
    pub fn record_kernel_registered(&self) {
        self.inner
            .kernels_registered
            .fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_registered")
                .set(self.inner.kernels_registered.load(Ordering::Relaxed) as f64);
        }
    }

    /// Counts one more active kernel and, with the `metrics` feature
    /// enabled, publishes the new count as a gauge.
    pub fn record_kernel_activated(&self) {
        self.inner.kernels_active.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Counts one fewer active kernel, never going below zero.
    ///
    /// An unmatched deactivation (count already zero) is ignored; a plain
    /// `fetch_sub` would wrap the counter around to `u64::MAX`.
    pub fn record_kernel_deactivated(&self) {
        // `checked_sub` returns `None` at zero, which leaves the value untouched
        // and makes `fetch_update` return `Err`; that outcome is intentional.
        let _ = self.inner.kernels_active.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |active| active.checked_sub(1),
        );

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Counts one message and, with the `metrics` feature enabled,
    /// increments the global message counter.
    pub fn record_message(&self) {
        self.inner.messages_total.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::counter;
            counter!("rustkernel_messages_total_global").increment(1);
        }
    }

    /// Records the current GPU memory usage in bytes and raises the
    /// recorded peak if `bytes` exceeds it.
    pub fn set_gpu_memory(&self, bytes: u64) {
        self.inner.gpu_memory_bytes.store(bytes, Ordering::Relaxed);
        // A single atomic max keeps the peak monotonic; a separate load and
        // store would let a concurrent smaller report overwrite a larger one.
        self.inner
            .gpu_memory_peak_bytes
            .fetch_max(bytes, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_gpu_memory_bytes").set(bytes as f64);
            gauge!("rustkernel_gpu_memory_peak_bytes")
                .set(self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed) as f64);
        }
    }

    /// Most recently reported GPU memory usage, in bytes.
    pub fn gpu_memory(&self) -> u64 {
        self.inner.gpu_memory_bytes.load(Ordering::Relaxed)
    }

    /// Largest GPU memory usage reported so far, in bytes.
    pub fn gpu_memory_peak(&self) -> u64 {
        self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed)
    }
}

impl Default for RuntimeMetrics {
    /// Equivalent to [`RuntimeMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for RuntimeMetrics {
    /// Returns another handle onto the same shared counters.
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Three executions (two successes, one failure) must be reflected in the
    /// raw counters, the average latency and the success rate.
    #[test]
    fn test_kernel_metrics() {
        let kernel = KernelMetrics::new("graph/pagerank", "GraphAnalytics");

        for (micros, succeeded) in [(100, true), (200, true), (300, false)] {
            kernel.record_execution(Duration::from_micros(micros), succeeded);
        }

        let total = kernel.messages_total.load(Ordering::Relaxed);
        let succeeded = kernel.messages_success.load(Ordering::Relaxed);
        let failed = kernel.messages_failed.load(Ordering::Relaxed);
        assert_eq!(total, 3);
        assert_eq!(succeeded, 2);
        assert_eq!(failed, 1);

        let avg_error = (kernel.avg_latency_us() - 200.0).abs();
        assert!(avg_error < 1.0);
        let rate_error = (kernel.success_rate() - 0.666).abs();
        assert!(rate_error < 0.01);
    }

    /// Registration and activation counters advance independently, and the
    /// GPU memory setter is readable through its getter.
    #[test]
    fn test_runtime_metrics() {
        let runtime = RuntimeMetrics::new();

        runtime.record_kernel_registered();
        runtime.record_kernel_registered();
        runtime.record_kernel_activated();

        let counters = &runtime.inner;
        assert_eq!(counters.kernels_registered.load(Ordering::Relaxed), 2);
        assert_eq!(counters.kernels_active.load(Ordering::Relaxed), 1);

        let one_mebibyte = 1024 * 1024;
        runtime.set_gpu_memory(one_mebibyte);
        assert_eq!(runtime.gpu_memory(), one_mebibyte);
    }

    /// Each convenience constructor selects exactly one export target.
    #[test]
    fn test_metrics_config() {
        let scrape = MetricsConfig::prometheus(9090);
        assert_eq!(scrape.prometheus_port, Some(9090));
        assert!(scrape.enabled);

        let push = MetricsConfig::push_gateway("http://localhost:9091");
        assert!(push.push_gateway.is_some());
        assert!(push.prometheus_port.is_none());
    }
}