1use async_trait::async_trait;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use thiserror::Error;
15
16pub mod file;
17
18#[cfg(feature = "metrics")]
19pub mod otlp;
20
21#[derive(Debug, Error)]
27pub enum MetricsError {
28 #[error("metrics export failed: {0}")]
29 ExportFailed(String),
30
31 #[error("metrics configuration error: {0}")]
32 ConfigError(String),
33
34 #[error("I/O error: {0}")]
35 Io(#[from] std::io::Error),
36
37 #[error("serialization error: {0}")]
38 Serialization(#[from] serde_json::Error),
39
40 #[error("metrics shutdown failed: {0}")]
41 ShutdownFailed(String),
42}
43
44#[derive(Debug, Clone, Default, Serialize, Deserialize)]
50#[serde(rename_all = "lowercase")]
51pub enum OtlpProtocol {
52 #[default]
54 Grpc,
55 HttpBinary,
57 HttpJson,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct OtlpConfig {
64 pub endpoint: String,
66
67 #[serde(default)]
69 pub protocol: OtlpProtocol,
70
71 #[serde(default = "default_otlp_timeout")]
73 pub timeout_seconds: u64,
74
75 #[serde(default)]
78 pub headers: std::collections::HashMap<String, String>,
79}
80
/// Serde fallback for `OtlpConfig::timeout_seconds`: ten seconds.
fn default_otlp_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 10;
    DEFAULT_TIMEOUT_SECS
}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct FileMetricsConfig {
88 pub path: PathBuf,
90
91 #[serde(default = "default_pretty_print")]
93 pub pretty_print: bool,
94}
95
/// Serde fallback for `FileMetricsConfig::pretty_print`: enabled.
fn default_pretty_print() -> bool {
    const PRETTY_PRINT_BY_DEFAULT: bool = true;
    PRETTY_PRINT_BY_DEFAULT
}
99
100impl Default for FileMetricsConfig {
101 fn default() -> Self {
102 Self {
103 path: std::env::temp_dir().join("symbiont_scheduler_metrics.json"),
104 pretty_print: true,
105 }
106 }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct MetricsConfig {
112 #[serde(default = "default_enabled")]
114 pub enabled: bool,
115
116 #[serde(default = "default_export_interval")]
118 pub export_interval_seconds: u64,
119
120 #[serde(default = "default_service_name")]
122 pub service_name: String,
123
124 #[serde(default = "default_service_namespace")]
126 pub service_namespace: String,
127
128 pub otlp: Option<OtlpConfig>,
130
131 pub file: Option<FileMetricsConfig>,
133}
134
/// Serde fallback for `MetricsConfig::enabled`: metrics are on.
fn default_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
138
/// Serde fallback for `MetricsConfig::export_interval_seconds`: sixty seconds.
fn default_export_interval() -> u64 {
    const DEFAULT_INTERVAL_SECS: u64 = 60;
    DEFAULT_INTERVAL_SECS
}
142
/// Serde fallback for `MetricsConfig::service_name`.
fn default_service_name() -> String {
    String::from("symbiont-scheduler")
}
146
/// Serde fallback for `MetricsConfig::service_namespace`.
fn default_service_namespace() -> String {
    String::from("symbiont")
}
150
151impl Default for MetricsConfig {
152 fn default() -> Self {
153 Self {
154 enabled: true,
155 export_interval_seconds: 60,
156 service_name: default_service_name(),
157 service_namespace: default_service_namespace(),
158 otlp: None,
159 file: Some(FileMetricsConfig::default()),
160 }
161 }
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct MetricsSnapshot {
171 pub timestamp: u64,
173 pub scheduler: SchedulerMetrics,
175 pub task_manager: TaskManagerMetrics,
177 pub load_balancer: LoadBalancerMetrics,
179 pub system: SystemResourceMetrics,
181 pub compaction: Option<CompactionMetrics>,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct SchedulerMetrics {
188 pub total_scheduled: usize,
189 pub uptime_seconds: u64,
190 pub running_agents: usize,
191 pub queued_agents: usize,
192 pub suspended_agents: usize,
193 pub max_capacity: usize,
194 pub load_factor: f64,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct TaskManagerMetrics {
200 pub total_tasks: usize,
201 pub healthy_tasks: usize,
202 pub average_uptime_seconds: f64,
203 pub total_memory_usage: usize,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct LoadBalancerMetrics {
209 pub total_allocations: usize,
210 pub active_allocations: usize,
211 pub memory_utilization: f64,
212 pub cpu_utilization: f64,
213 pub allocation_failures: usize,
214 pub average_allocation_time_ms: f64,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct SystemResourceMetrics {
220 pub memory_usage_mb: f64,
221 pub cpu_usage_percent: f64,
222}
223
224#[derive(Debug, Clone, Default, Serialize, Deserialize)]
226pub struct CompactionMetrics {
227 pub total_compactions: u64,
229 pub total_tokens_saved: u64,
231 pub compactions_by_tier: HashMap<String, u64>,
233 pub context_utilization_ratio: Option<f64>,
235}
236
237#[async_trait]
243pub trait MetricsExporter: Send + Sync {
244 async fn export(&self, snapshot: &MetricsSnapshot) -> Result<(), MetricsError>;
246
247 async fn shutdown(&self) -> Result<(), MetricsError>;
249}
250
251pub struct CompositeExporter {
260 exporters: Vec<Arc<dyn MetricsExporter>>,
261}
262
263impl CompositeExporter {
264 pub fn new(exporters: Vec<Arc<dyn MetricsExporter>>) -> Self {
265 Self { exporters }
266 }
267}
268
269#[async_trait]
270impl MetricsExporter for CompositeExporter {
271 async fn export(&self, snapshot: &MetricsSnapshot) -> Result<(), MetricsError> {
272 let mut last_error: Option<MetricsError> = None;
273 for exporter in &self.exporters {
274 if let Err(e) = exporter.export(snapshot).await {
275 tracing::warn!("Metrics exporter failed: {}", e);
276 last_error = Some(e);
277 }
278 }
279 if self.exporters.len() == 1 {
281 if let Some(e) = last_error {
282 return Err(e);
283 }
284 }
285 Ok(())
286 }
287
288 async fn shutdown(&self) -> Result<(), MetricsError> {
289 let mut last_error: Option<MetricsError> = None;
290 for exporter in &self.exporters {
291 if let Err(e) = exporter.shutdown().await {
292 tracing::warn!("Metrics exporter shutdown failed: {}", e);
293 last_error = Some(e);
294 }
295 }
296 if self.exporters.len() == 1 {
297 if let Some(e) = last_error {
298 return Err(e);
299 }
300 }
301 Ok(())
302 }
303}
304
305pub fn create_exporter(config: &MetricsConfig) -> Result<Arc<dyn MetricsExporter>, MetricsError> {
311 let mut exporters: Vec<Arc<dyn MetricsExporter>> = Vec::new();
312
313 if let Some(ref file_cfg) = config.file {
315 let file_exporter = file::FileExporter::new(file_cfg.clone())?;
316 exporters.push(Arc::new(file_exporter));
317 }
318
319 #[cfg(feature = "metrics")]
321 if let Some(ref otlp_cfg) = config.otlp {
322 let export_interval = std::time::Duration::from_secs(config.export_interval_seconds);
323 let otlp_exporter = otlp::OtlpExporter::new(
324 otlp_cfg.clone(),
325 &config.service_name,
326 &config.service_namespace,
327 export_interval,
328 )?;
329 exporters.push(Arc::new(otlp_exporter));
330 }
331
332 #[cfg(not(feature = "metrics"))]
333 if config.otlp.is_some() {
334 tracing::warn!(
335 "OTLP metrics configuration provided but the `metrics` feature is not enabled; \
336 OTLP exporter will not be created"
337 );
338 }
339
340 if exporters.is_empty() {
341 return Err(MetricsError::ConfigError(
342 "No metrics exporters configured (enable at least `file` or `otlp`)".to_string(),
343 ));
344 }
345
346 if exporters.len() == 1 {
347 Ok(exporters.remove(0))
348 } else {
349 Ok(Arc::new(CompositeExporter::new(exporters)))
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// File-backend config pointing at `file_name` inside the OS temp directory.
    fn temp_file_config(file_name: &str, pretty_print: bool) -> FileMetricsConfig {
        FileMetricsConfig {
            path: std::env::temp_dir().join(file_name),
            pretty_print,
        }
    }

    /// Metrics config with a fixed `"test"` identity, no OTLP backend, and the
    /// given file backend.
    fn config_with_file(file: Option<FileMetricsConfig>) -> MetricsConfig {
        MetricsConfig {
            enabled: true,
            export_interval_seconds: 60,
            service_name: "test".to_string(),
            service_namespace: "test".to_string(),
            otlp: None,
            file,
        }
    }

    /// The built-in defaults enable metrics with only the file backend.
    #[test]
    fn test_default_metrics_config() {
        let defaults = MetricsConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.export_interval_seconds, 60);
        assert_eq!(defaults.service_name, "symbiont-scheduler");
        assert!(defaults.file.is_some());
        assert!(defaults.otlp.is_none());
    }

    /// A snapshot survives a JSON encode/decode round trip.
    #[test]
    fn test_metrics_snapshot_serialization_roundtrip() {
        let scheduler = SchedulerMetrics {
            total_scheduled: 10,
            uptime_seconds: 3600,
            running_agents: 5,
            queued_agents: 3,
            suspended_agents: 2,
            max_capacity: 1000,
            load_factor: 0.005,
        };
        let task_manager = TaskManagerMetrics {
            total_tasks: 5,
            healthy_tasks: 4,
            average_uptime_seconds: 1800.0,
            total_memory_usage: 1024,
        };
        let load_balancer = LoadBalancerMetrics {
            total_allocations: 100,
            active_allocations: 5,
            memory_utilization: 0.45,
            cpu_utilization: 0.30,
            allocation_failures: 2,
            average_allocation_time_ms: 1.5,
        };
        let system = SystemResourceMetrics {
            memory_usage_mb: 512.0,
            cpu_usage_percent: 30.0,
        };
        let original = MetricsSnapshot {
            timestamp: 1700000000,
            scheduler,
            task_manager,
            load_balancer,
            system,
            compaction: None,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: MetricsSnapshot = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.timestamp, 1700000000);
        assert_eq!(decoded.scheduler.running_agents, 5);
        assert_eq!(decoded.task_manager.healthy_tasks, 4);
        assert_eq!(decoded.load_balancer.allocation_failures, 2);
    }

    /// With neither backend configured the factory must refuse.
    #[test]
    fn test_create_exporter_no_backends() {
        let cfg = config_with_file(None);
        assert!(create_exporter(&cfg).is_err());
    }

    /// A lone file backend is enough to build an exporter.
    #[test]
    fn test_create_exporter_file_only() {
        let cfg = config_with_file(Some(temp_file_config("test_metrics_create.json", true)));
        assert!(create_exporter(&cfg).is_ok());
    }

    /// gRPC is the default OTLP transport.
    #[test]
    fn test_otlp_protocol_default() {
        assert!(matches!(OtlpProtocol::default(), OtlpProtocol::Grpc));
    }

    /// A composite wrapping one file exporter can export and then shut down.
    #[tokio::test]
    async fn test_composite_exporter_lifecycle() {
        let file_backend = file::FileExporter::new(temp_file_config(
            "test_composite_lifecycle.json",
            false,
        ))
        .unwrap();
        let backend: Arc<dyn MetricsExporter> = Arc::new(file_backend);
        let composite = CompositeExporter::new(vec![backend]);

        let snapshot = MetricsSnapshot {
            timestamp: 1,
            scheduler: SchedulerMetrics {
                total_scheduled: 0,
                uptime_seconds: 0,
                running_agents: 0,
                queued_agents: 0,
                suspended_agents: 0,
                max_capacity: 100,
                load_factor: 0.0,
            },
            task_manager: TaskManagerMetrics {
                total_tasks: 0,
                healthy_tasks: 0,
                average_uptime_seconds: 0.0,
                total_memory_usage: 0,
            },
            load_balancer: LoadBalancerMetrics {
                total_allocations: 0,
                active_allocations: 0,
                memory_utilization: 0.0,
                cpu_utilization: 0.0,
                allocation_failures: 0,
                average_allocation_time_ms: 0.0,
            },
            system: SystemResourceMetrics {
                memory_usage_mb: 0.0,
                cpu_usage_percent: 0.0,
            },
            compaction: None,
        };

        assert!(composite.export(&snapshot).await.is_ok());
        assert!(composite.shutdown().await.is_ok());
    }

    /// `CompactionMetrics::default()` starts from zeroed counters and no ratio.
    #[test]
    fn compaction_metrics_default() {
        let metrics = CompactionMetrics::default();
        assert_eq!(metrics.total_compactions, 0);
        assert_eq!(metrics.total_tokens_saved, 0);
        assert!(metrics.context_utilization_ratio.is_none());
    }
}