Skip to main content

symbi_runtime/metrics/
mod.rs

1//! Metrics collection and export for the Symbiont scheduler.
2//!
3//! Supports multiple export backends:
4//! - **File**: JSON snapshots written atomically to disk (always available)
5//! - **OTLP**: OpenTelemetry Protocol export via gRPC or HTTP (requires `metrics` feature)
6//!
7//! Multiple backends can run simultaneously via [`CompositeExporter`].
8
9use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use thiserror::Error;
15
16pub mod file;
17
18#[cfg(feature = "metrics")]
19pub mod otlp;
20
21// ---------------------------------------------------------------------------
22// Errors
23// ---------------------------------------------------------------------------
24
25/// Errors that can occur during metrics operations.
26#[derive(Debug, Error)]
27pub enum MetricsError {
28    #[error("metrics export failed: {0}")]
29    ExportFailed(String),
30
31    #[error("metrics configuration error: {0}")]
32    ConfigError(String),
33
34    #[error("I/O error: {0}")]
35    Io(#[from] std::io::Error),
36
37    #[error("serialization error: {0}")]
38    Serialization(#[from] serde_json::Error),
39
40    #[error("metrics shutdown failed: {0}")]
41    ShutdownFailed(String),
42}
43
44// ---------------------------------------------------------------------------
45// Configuration types
46// ---------------------------------------------------------------------------
47
48/// OTLP transport protocol.
49#[derive(Debug, Clone, Default, Serialize, Deserialize)]
50#[serde(rename_all = "lowercase")]
51pub enum OtlpProtocol {
52    /// gRPC (default port 4317).
53    #[default]
54    Grpc,
55    /// HTTP with protobuf encoding (default port 4318).
56    HttpBinary,
57    /// HTTP with JSON encoding (default port 4318).
58    HttpJson,
59}
60
61/// OTLP exporter configuration.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct OtlpConfig {
64    /// OTLP endpoint URL (e.g. `http://localhost:4317` for gRPC).
65    pub endpoint: String,
66
67    /// Transport protocol.
68    #[serde(default)]
69    pub protocol: OtlpProtocol,
70
71    /// Export timeout in seconds.
72    #[serde(default = "default_otlp_timeout")]
73    pub timeout_seconds: u64,
74
75    /// Additional headers sent with each export request.
76    /// Applied to HTTP transport; for gRPC use `OTEL_EXPORTER_OTLP_HEADERS` env var.
77    #[serde(default)]
78    pub headers: std::collections::HashMap<String, String>,
79}
80
81fn default_otlp_timeout() -> u64 {
82    10
83}
84
85/// File-based metrics exporter configuration.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct FileMetricsConfig {
88    /// Path to the output JSON file.
89    pub path: PathBuf,
90
91    /// Pretty-print JSON output.
92    #[serde(default = "default_pretty_print")]
93    pub pretty_print: bool,
94}
95
96fn default_pretty_print() -> bool {
97    true
98}
99
100impl Default for FileMetricsConfig {
101    fn default() -> Self {
102        Self {
103            path: std::env::temp_dir().join("symbiont_scheduler_metrics.json"),
104            pretty_print: true,
105        }
106    }
107}
108
109/// Top-level metrics configuration.
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct MetricsConfig {
112    /// Enable metrics collection and export.
113    #[serde(default = "default_enabled")]
114    pub enabled: bool,
115
116    /// Export interval in seconds.
117    #[serde(default = "default_export_interval")]
118    pub export_interval_seconds: u64,
119
120    /// Service name reported to backends.
121    #[serde(default = "default_service_name")]
122    pub service_name: String,
123
124    /// Service namespace reported to backends.
125    #[serde(default = "default_service_namespace")]
126    pub service_namespace: String,
127
128    /// OTLP exporter configuration (requires `metrics` feature).
129    pub otlp: Option<OtlpConfig>,
130
131    /// File exporter configuration.
132    pub file: Option<FileMetricsConfig>,
133}
134
135fn default_enabled() -> bool {
136    true
137}
138
139fn default_export_interval() -> u64 {
140    60
141}
142
143fn default_service_name() -> String {
144    "symbiont-scheduler".to_string()
145}
146
147fn default_service_namespace() -> String {
148    "symbiont".to_string()
149}
150
151impl Default for MetricsConfig {
152    fn default() -> Self {
153        Self {
154            enabled: true,
155            export_interval_seconds: 60,
156            service_name: default_service_name(),
157            service_namespace: default_service_namespace(),
158            otlp: None,
159            file: Some(FileMetricsConfig::default()),
160        }
161    }
162}
163
164// ---------------------------------------------------------------------------
165// Snapshot types
166// ---------------------------------------------------------------------------
167
168/// Point-in-time snapshot of all scheduler metrics.
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct MetricsSnapshot {
171    /// Unix timestamp (seconds) when snapshot was taken.
172    pub timestamp: u64,
173    /// Scheduler-level metrics.
174    pub scheduler: SchedulerMetrics,
175    /// Task manager metrics.
176    pub task_manager: TaskManagerMetrics,
177    /// Load balancer metrics.
178    pub load_balancer: LoadBalancerMetrics,
179    /// System resource metrics.
180    pub system: SystemResourceMetrics,
181    /// Context compaction metrics.
182    pub compaction: Option<CompactionMetrics>,
183}
184
185/// Scheduler-level counters and gauges.
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct SchedulerMetrics {
188    pub total_scheduled: usize,
189    pub uptime_seconds: u64,
190    pub running_agents: usize,
191    pub queued_agents: usize,
192    pub suspended_agents: usize,
193    pub max_capacity: usize,
194    pub load_factor: f64,
195}
196
197/// Task manager statistics.
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct TaskManagerMetrics {
200    pub total_tasks: usize,
201    pub healthy_tasks: usize,
202    pub average_uptime_seconds: f64,
203    pub total_memory_usage: usize,
204}
205
206/// Load balancer statistics.
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct LoadBalancerMetrics {
209    pub total_allocations: usize,
210    pub active_allocations: usize,
211    pub memory_utilization: f64,
212    pub cpu_utilization: f64,
213    pub allocation_failures: usize,
214    pub average_allocation_time_ms: f64,
215}
216
217/// System resource usage.
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct SystemResourceMetrics {
220    pub memory_usage_mb: f64,
221    pub cpu_usage_percent: f64,
222}
223
224/// Compaction pipeline metrics.
225#[derive(Debug, Clone, Default, Serialize, Deserialize)]
226pub struct CompactionMetrics {
227    /// Total number of compaction runs.
228    pub total_compactions: u64,
229    /// Cumulative tokens reclaimed.
230    pub total_tokens_saved: u64,
231    /// Compactions by tier: (tier_name, count).
232    pub compactions_by_tier: HashMap<String, u64>,
233    /// Current context utilization ratio (0.0–1.0), if known.
234    pub context_utilization_ratio: Option<f64>,
235}
236
237// ---------------------------------------------------------------------------
238// Exporter trait
239// ---------------------------------------------------------------------------
240
241/// Trait for metrics export backends.
242#[async_trait]
243pub trait MetricsExporter: Send + Sync {
244    /// Export a metrics snapshot to the backend.
245    async fn export(&self, snapshot: &MetricsSnapshot) -> Result<(), MetricsError>;
246
247    /// Flush pending data and release resources.
248    async fn shutdown(&self) -> Result<(), MetricsError>;
249}
250
251// ---------------------------------------------------------------------------
252// Composite exporter
253// ---------------------------------------------------------------------------
254
255/// Combines multiple exporters into a single exporter.
256///
257/// All backends are called on every export; individual failures are logged
258/// but do not prevent other backends from running.
259pub struct CompositeExporter {
260    exporters: Vec<Arc<dyn MetricsExporter>>,
261}
262
263impl CompositeExporter {
264    pub fn new(exporters: Vec<Arc<dyn MetricsExporter>>) -> Self {
265        Self { exporters }
266    }
267}
268
269#[async_trait]
270impl MetricsExporter for CompositeExporter {
271    async fn export(&self, snapshot: &MetricsSnapshot) -> Result<(), MetricsError> {
272        let mut last_error: Option<MetricsError> = None;
273        for exporter in &self.exporters {
274            if let Err(e) = exporter.export(snapshot).await {
275                tracing::warn!("Metrics exporter failed: {}", e);
276                last_error = Some(e);
277            }
278        }
279        // Propagate error only when a single exporter is configured and it failed.
280        if self.exporters.len() == 1 {
281            if let Some(e) = last_error {
282                return Err(e);
283            }
284        }
285        Ok(())
286    }
287
288    async fn shutdown(&self) -> Result<(), MetricsError> {
289        let mut last_error: Option<MetricsError> = None;
290        for exporter in &self.exporters {
291            if let Err(e) = exporter.shutdown().await {
292                tracing::warn!("Metrics exporter shutdown failed: {}", e);
293                last_error = Some(e);
294            }
295        }
296        if self.exporters.len() == 1 {
297            if let Some(e) = last_error {
298                return Err(e);
299            }
300        }
301        Ok(())
302    }
303}
304
305// ---------------------------------------------------------------------------
306// Factory
307// ---------------------------------------------------------------------------
308
309/// Build an exporter (or composite) from configuration.
310pub fn create_exporter(config: &MetricsConfig) -> Result<Arc<dyn MetricsExporter>, MetricsError> {
311    let mut exporters: Vec<Arc<dyn MetricsExporter>> = Vec::new();
312
313    // File exporter (always available).
314    if let Some(ref file_cfg) = config.file {
315        let file_exporter = file::FileExporter::new(file_cfg.clone())?;
316        exporters.push(Arc::new(file_exporter));
317    }
318
319    // OTLP exporter (requires `metrics` feature).
320    #[cfg(feature = "metrics")]
321    if let Some(ref otlp_cfg) = config.otlp {
322        let export_interval = std::time::Duration::from_secs(config.export_interval_seconds);
323        let otlp_exporter = otlp::OtlpExporter::new(
324            otlp_cfg.clone(),
325            &config.service_name,
326            &config.service_namespace,
327            export_interval,
328        )?;
329        exporters.push(Arc::new(otlp_exporter));
330    }
331
332    #[cfg(not(feature = "metrics"))]
333    if config.otlp.is_some() {
334        tracing::warn!(
335            "OTLP metrics configuration provided but the `metrics` feature is not enabled; \
336             OTLP exporter will not be created"
337        );
338    }
339
340    if exporters.is_empty() {
341        return Err(MetricsError::ConfigError(
342            "No metrics exporters configured (enable at least `file` or `otlp`)".to_string(),
343        ));
344    }
345
346    if exporters.len() == 1 {
347        Ok(exporters.remove(0))
348    } else {
349        Ok(Arc::new(CompositeExporter::new(exporters)))
350    }
351}
352
353// ---------------------------------------------------------------------------
354// Tests
355// ---------------------------------------------------------------------------
356
#[cfg(test)]
mod tests {
    use super::*;

    /// Snapshot with every metric zeroed apart from the supplied timestamp
    /// and scheduler capacity.
    fn zeroed_snapshot(timestamp: u64, max_capacity: usize) -> MetricsSnapshot {
        MetricsSnapshot {
            timestamp,
            scheduler: SchedulerMetrics {
                total_scheduled: 0,
                uptime_seconds: 0,
                running_agents: 0,
                queued_agents: 0,
                suspended_agents: 0,
                max_capacity,
                load_factor: 0.0,
            },
            task_manager: TaskManagerMetrics {
                total_tasks: 0,
                healthy_tasks: 0,
                average_uptime_seconds: 0.0,
                total_memory_usage: 0,
            },
            load_balancer: LoadBalancerMetrics {
                total_allocations: 0,
                active_allocations: 0,
                memory_utilization: 0.0,
                cpu_utilization: 0.0,
                allocation_failures: 0,
                average_allocation_time_ms: 0.0,
            },
            system: SystemResourceMetrics {
                memory_usage_mb: 0.0,
                cpu_usage_percent: 0.0,
            },
            compaction: None,
        }
    }

    /// Config with no backends enabled; tests opt into the ones they need.
    fn config_without_backends() -> MetricsConfig {
        MetricsConfig {
            enabled: true,
            export_interval_seconds: 60,
            service_name: "test".to_string(),
            service_namespace: "test".to_string(),
            otlp: None,
            file: None,
        }
    }

    #[test]
    fn test_default_metrics_config() {
        let cfg = MetricsConfig::default();
        assert!(cfg.enabled);
        assert_eq!(cfg.export_interval_seconds, 60);
        assert_eq!(cfg.service_name, "symbiont-scheduler");
        assert_eq!(cfg.service_namespace, "symbiont");
        assert!(cfg.file.is_some());
        assert!(cfg.otlp.is_none());
    }

    #[test]
    fn test_metrics_config_deserializes_with_defaults() {
        // Scalars fall back to the serde defaults; backends stay disabled.
        let cfg: MetricsConfig = serde_json::from_str("{}").unwrap();
        assert!(cfg.enabled);
        assert_eq!(cfg.export_interval_seconds, 60);
        assert_eq!(cfg.service_name, "symbiont-scheduler");
        assert_eq!(cfg.service_namespace, "symbiont");
        assert!(cfg.otlp.is_none());
        assert!(cfg.file.is_none());
    }

    #[test]
    fn test_metrics_snapshot_serialization_roundtrip() {
        let snapshot = MetricsSnapshot {
            timestamp: 1700000000,
            scheduler: SchedulerMetrics {
                total_scheduled: 10,
                uptime_seconds: 3600,
                running_agents: 5,
                queued_agents: 3,
                suspended_agents: 2,
                max_capacity: 1000,
                load_factor: 0.005,
            },
            task_manager: TaskManagerMetrics {
                total_tasks: 5,
                healthy_tasks: 4,
                average_uptime_seconds: 1800.0,
                total_memory_usage: 1024,
            },
            load_balancer: LoadBalancerMetrics {
                total_allocations: 100,
                active_allocations: 5,
                memory_utilization: 0.45,
                cpu_utilization: 0.30,
                allocation_failures: 2,
                average_allocation_time_ms: 1.5,
            },
            system: SystemResourceMetrics {
                memory_usage_mb: 512.0,
                cpu_usage_percent: 30.0,
            },
            compaction: None,
        };

        let json = serde_json::to_string(&snapshot).unwrap();
        let deser: MetricsSnapshot = serde_json::from_str(&json).unwrap();
        assert_eq!(deser.timestamp, 1700000000);
        assert_eq!(deser.scheduler.running_agents, 5);
        assert_eq!(deser.task_manager.healthy_tasks, 4);
        assert_eq!(deser.load_balancer.allocation_failures, 2);
        assert!(deser.compaction.is_none());
    }

    #[test]
    fn test_compaction_metrics_serialization_roundtrip() {
        let mut snapshot = zeroed_snapshot(42, 10);
        let mut by_tier = HashMap::new();
        by_tier.insert("truncate".to_string(), 3);
        snapshot.compaction = Some(CompactionMetrics {
            total_compactions: 3,
            total_tokens_saved: 1200,
            compactions_by_tier: by_tier,
            context_utilization_ratio: Some(0.75),
        });

        let json = serde_json::to_string(&snapshot).unwrap();
        let deser: MetricsSnapshot = serde_json::from_str(&json).unwrap();
        let compaction = deser
            .compaction
            .expect("compaction metrics should survive the roundtrip");
        assert_eq!(compaction.total_compactions, 3);
        assert_eq!(compaction.total_tokens_saved, 1200);
        assert_eq!(compaction.compactions_by_tier.get("truncate"), Some(&3));
        assert_eq!(compaction.context_utilization_ratio, Some(0.75));
    }

    #[test]
    fn test_create_exporter_no_backends() {
        let result = create_exporter(&config_without_backends());
        assert!(matches!(result, Err(MetricsError::ConfigError(_))));
    }

    #[test]
    fn test_create_exporter_file_only() {
        let path = std::env::temp_dir().join("test_metrics_create.json");
        let cfg = MetricsConfig {
            file: Some(FileMetricsConfig {
                path: path.clone(),
                pretty_print: true,
            }),
            ..config_without_backends()
        };
        assert!(create_exporter(&cfg).is_ok());
        // Best-effort cleanup; the exporter may not have created the file.
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_otlp_protocol_default() {
        let proto = OtlpProtocol::default();
        assert!(matches!(proto, OtlpProtocol::Grpc));
    }

    #[test]
    fn test_otlp_protocol_wire_names() {
        // Pin the serialized names so existing config files keep working.
        assert_eq!(
            serde_json::to_string(&OtlpProtocol::Grpc).unwrap(),
            "\"grpc\""
        );
        assert_eq!(
            serde_json::to_string(&OtlpProtocol::HttpBinary).unwrap(),
            "\"httpbinary\""
        );
        assert_eq!(
            serde_json::to_string(&OtlpProtocol::HttpJson).unwrap(),
            "\"httpjson\""
        );
    }

    #[tokio::test]
    async fn test_composite_exporter_lifecycle() {
        let path = std::env::temp_dir().join("test_composite_lifecycle.json");
        let file_cfg = FileMetricsConfig {
            path: path.clone(),
            pretty_print: false,
        };
        let file_exp =
            Arc::new(file::FileExporter::new(file_cfg).unwrap()) as Arc<dyn MetricsExporter>;
        let composite = CompositeExporter::new(vec![file_exp]);

        let snapshot = zeroed_snapshot(1, 100);

        assert!(composite.export(&snapshot).await.is_ok());
        assert!(composite.shutdown().await.is_ok());
        // Best-effort cleanup of the snapshot file written above.
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn compaction_metrics_default() {
        let m = CompactionMetrics::default();
        assert_eq!(m.total_compactions, 0);
        assert_eq!(m.total_tokens_saved, 0);
        assert!(m.compactions_by_tier.is_empty());
        assert!(m.context_utilization_ratio.is_none());
    }
}