1use crate::analytics::WorkflowAnalytics;
46use serde::{Deserialize, Serialize};
47use std::collections::HashMap;
48use thiserror::Error;
49
/// Output formats that `MetricsExporter::export` can render.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExportFormat {
    /// Text returned by `MetricsExporter::export_prometheus`.
    Prometheus,
    /// JSON document returned by `MetricsExporter::export_opentelemetry`.
    OpenTelemetry,
    /// Text returned by `MetricsExporter::export_influxdb`.
    InfluxDb,
    /// JSON document returned by `MetricsExporter::export_json`.
    Json,
}
62
/// Renders a `WorkflowAnalytics` snapshot in one of the `ExportFormat` formats.
#[derive(Debug, Clone)]
pub struct MetricsExporter {
    /// Workflow name; emitted as the `workflow` label/tag and the
    /// `workflow.name` OpenTelemetry attribute.
    workflow_name: String,
    /// Extra key/value pairs attached to every exported metric.
    labels: HashMap<String, String>,
    /// Prefix for Prometheus and OpenTelemetry metric names
    /// (`"oxify"` unless replaced via `with_namespace`).
    namespace: String,
}
73
74impl MetricsExporter {
75 pub fn new(workflow_name: impl Into<String>) -> Self {
77 Self {
78 workflow_name: workflow_name.into(),
79 labels: HashMap::new(),
80 namespace: "oxify".to_string(),
81 }
82 }
83
84 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
86 self.labels.insert(key.into(), value.into());
87 self
88 }
89
90 pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
92 self.namespace = namespace.into();
93 self
94 }
95
96 pub fn export_prometheus(&self, analytics: &WorkflowAnalytics) -> Result<String, MetricsError> {
98 let mut output = String::new();
99 let labels = self.format_prometheus_labels();
100
101 output.push_str(&format!(
103 "# HELP {}_executions_total Total number of workflow executions\n",
104 self.namespace
105 ));
106 output.push_str(&format!(
107 "# TYPE {}_executions_total counter\n",
108 self.namespace
109 ));
110 output.push_str(&format!(
111 "{}_executions_total{{{}}} {}\n",
112 self.namespace, labels, analytics.execution_stats.total_executions
113 ));
114
115 output.push_str(&format!(
116 "# HELP {}_executions_successful_total Total number of successful executions\n",
117 self.namespace
118 ));
119 output.push_str(&format!(
120 "# TYPE {}_executions_successful_total counter\n",
121 self.namespace
122 ));
123 output.push_str(&format!(
124 "{}_executions_successful_total{{{}}} {}\n",
125 self.namespace, labels, analytics.execution_stats.successful_executions
126 ));
127
128 output.push_str(&format!(
129 "# HELP {}_executions_failed_total Total number of failed executions\n",
130 self.namespace
131 ));
132 output.push_str(&format!(
133 "# TYPE {}_executions_failed_total counter\n",
134 self.namespace
135 ));
136 output.push_str(&format!(
137 "{}_executions_failed_total{{{}}} {}\n",
138 self.namespace, labels, analytics.execution_stats.failed_executions
139 ));
140
141 output.push_str(&format!(
143 "# HELP {}_success_rate Success rate of workflow executions (0-1)\n",
144 self.namespace
145 ));
146 output.push_str(&format!("# TYPE {}_success_rate gauge\n", self.namespace));
147 output.push_str(&format!(
148 "{}_success_rate{{{}}} {:.4}\n",
149 self.namespace, labels, analytics.execution_stats.success_rate
150 ));
151
152 let perf = &analytics.performance_metrics;
154 output.push_str(&format!(
155 "# HELP {}_duration_seconds Workflow execution duration in seconds\n",
156 self.namespace
157 ));
158 output.push_str(&format!(
159 "# TYPE {}_duration_seconds summary\n",
160 self.namespace
161 ));
162 output.push_str(&format!(
163 "{}_duration_seconds{{{}quantile=\"0.5\"}} {:.3}\n",
164 self.namespace,
165 labels,
166 perf.p50_duration_ms as f64 / 1000.0
167 ));
168 output.push_str(&format!(
169 "{}_duration_seconds{{{}quantile=\"0.95\"}} {:.3}\n",
170 self.namespace,
171 labels,
172 perf.p95_duration_ms as f64 / 1000.0
173 ));
174 output.push_str(&format!(
175 "{}_duration_seconds{{{}quantile=\"0.99\"}} {:.3}\n",
176 self.namespace,
177 labels,
178 perf.p99_duration_ms as f64 / 1000.0
179 ));
180 output.push_str(&format!(
181 "{}_duration_seconds_sum{{{}}} {:.3}\n",
182 self.namespace,
183 labels,
184 perf.avg_duration_ms / 1000.0 * analytics.execution_stats.total_executions as f64
185 ));
186 output.push_str(&format!(
187 "{}_duration_seconds_count{{{}}} {}\n",
188 self.namespace, labels, analytics.execution_stats.total_executions
189 ));
190
191 for node_stats in &analytics.node_analytics {
193 let node_labels = format!("{},node_id=\"{}\"", labels, node_stats.node_id);
194 output.push_str(&format!(
195 "{}_node_executions_total{{{}}} {}\n",
196 self.namespace, node_labels, node_stats.execution_count
197 ));
198 output.push_str(&format!(
199 "{}_node_duration_seconds{{{}}} {:.3}\n",
200 self.namespace,
201 node_labels,
202 node_stats.avg_duration_ms / 1000.0
203 ));
204 }
205
206 Ok(output)
207 }
208
209 pub fn export_opentelemetry(
211 &self,
212 analytics: &WorkflowAnalytics,
213 ) -> Result<serde_json::Value, MetricsError> {
214 let perf = &analytics.performance_metrics;
215
216 let metrics = vec![
217 self.create_otel_metric(
219 "executions.total",
220 "counter",
221 analytics.execution_stats.total_executions as f64,
222 "Number of total executions",
223 ),
224 self.create_otel_metric(
225 "executions.successful",
226 "counter",
227 analytics.execution_stats.successful_executions as f64,
228 "Number of successful executions",
229 ),
230 self.create_otel_metric(
231 "executions.failed",
232 "counter",
233 analytics.execution_stats.failed_executions as f64,
234 "Number of failed executions",
235 ),
236 self.create_otel_metric(
238 "success_rate",
239 "gauge",
240 analytics.execution_stats.success_rate,
241 "Success rate of executions",
242 ),
243 self.create_otel_metric(
245 "duration.p50",
246 "gauge",
247 perf.p50_duration_ms as f64,
248 "Median execution duration (ms)",
249 ),
250 self.create_otel_metric(
251 "duration.p95",
252 "gauge",
253 perf.p95_duration_ms as f64,
254 "95th percentile execution duration (ms)",
255 ),
256 self.create_otel_metric(
257 "duration.p99",
258 "gauge",
259 perf.p99_duration_ms as f64,
260 "99th percentile execution duration (ms)",
261 ),
262 ];
263
264 Ok(serde_json::json!({
265 "resourceMetrics": [{
266 "resource": {
267 "attributes": self.create_otel_attributes()
268 },
269 "scopeMetrics": [{
270 "scope": {
271 "name": "oxify-model",
272 "version": env!("CARGO_PKG_VERSION")
273 },
274 "metrics": metrics
275 }]
276 }]
277 }))
278 }
279
280 pub fn export_influxdb(&self, analytics: &WorkflowAnalytics) -> Result<String, MetricsError> {
282 let mut output = String::new();
283 let tags = self.format_influxdb_tags();
284 let timestamp = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
285
286 output.push_str(&format!(
288 "workflow_executions,{} total={},successful={},failed={},cancelled={} {}\n",
289 tags,
290 analytics.execution_stats.total_executions,
291 analytics.execution_stats.successful_executions,
292 analytics.execution_stats.failed_executions,
293 analytics.execution_stats.cancelled_executions,
294 timestamp
295 ));
296
297 output.push_str(&format!(
298 "workflow_rates,{} success_rate={:.4},failure_rate={:.4} {}\n",
299 tags,
300 analytics.execution_stats.success_rate,
301 analytics.execution_stats.failure_rate,
302 timestamp
303 ));
304
305 let perf = &analytics.performance_metrics;
307 output.push_str(&format!(
308 "workflow_duration,{} min={},max={},avg={:.3},median={},p95={},p99={} {}\n",
309 tags,
310 perf.min_duration_ms,
311 perf.max_duration_ms,
312 perf.avg_duration_ms,
313 perf.p50_duration_ms,
314 perf.p95_duration_ms,
315 perf.p99_duration_ms,
316 timestamp
317 ));
318
319 for node_stats in &analytics.node_analytics {
321 output.push_str(&format!(
322 "node_metrics,{},node_id={} executions={},avg_duration={:.3} {}\n",
323 tags,
324 node_stats.node_id,
325 node_stats.execution_count,
326 node_stats.avg_duration_ms,
327 timestamp
328 ));
329 }
330
331 Ok(output)
332 }
333
334 pub fn export_json(
336 &self,
337 analytics: &WorkflowAnalytics,
338 ) -> Result<serde_json::Value, MetricsError> {
339 let perf = &analytics.performance_metrics;
340 Ok(serde_json::json!({
341 "workflow": self.workflow_name,
342 "namespace": self.namespace,
343 "labels": self.labels,
344 "timestamp": chrono::Utc::now().to_rfc3339(),
345 "execution_stats": {
346 "total": analytics.execution_stats.total_executions,
347 "successful": analytics.execution_stats.successful_executions,
348 "failed": analytics.execution_stats.failed_executions,
349 "cancelled": analytics.execution_stats.cancelled_executions,
350 "success_rate": analytics.execution_stats.success_rate,
351 "failure_rate": analytics.execution_stats.failure_rate,
352 },
353 "performance": {
354 "min_duration_ms": perf.min_duration_ms,
355 "max_duration_ms": perf.max_duration_ms,
356 "avg_duration_ms": perf.avg_duration_ms,
357 "p50_duration_ms": perf.p50_duration_ms,
358 "p95_duration_ms": perf.p95_duration_ms,
359 "p99_duration_ms": perf.p99_duration_ms,
360 },
361 "node_analytics": analytics.node_analytics,
362 "error_patterns": analytics.error_patterns,
363 }))
364 }
365
366 pub fn export(
368 &self,
369 analytics: &WorkflowAnalytics,
370 format: ExportFormat,
371 ) -> Result<String, MetricsError> {
372 match format {
373 ExportFormat::Prometheus => self.export_prometheus(analytics),
374 ExportFormat::OpenTelemetry => {
375 let json = self.export_opentelemetry(analytics)?;
376 serde_json::to_string_pretty(&json)
377 .map_err(|e| MetricsError::SerializationError(e.to_string()))
378 }
379 ExportFormat::InfluxDb => self.export_influxdb(analytics),
380 ExportFormat::Json => {
381 let json = self.export_json(analytics)?;
382 serde_json::to_string_pretty(&json)
383 .map_err(|e| MetricsError::SerializationError(e.to_string()))
384 }
385 }
386 }
387
388 fn format_prometheus_labels(&self) -> String {
391 let mut labels = vec![format!("workflow=\"{}\"", self.workflow_name)];
392 for (k, v) in &self.labels {
393 labels.push(format!("{}=\"{}\"", k, v));
394 }
395 labels.join(",")
396 }
397
398 fn format_influxdb_tags(&self) -> String {
399 let mut tags = vec![format!("workflow={}", self.workflow_name)];
400 for (k, v) in &self.labels {
401 tags.push(format!("{}={}", k, v));
402 }
403 tags.join(",")
404 }
405
406 fn create_otel_attributes(&self) -> Vec<serde_json::Value> {
407 let mut attrs = vec![serde_json::json!({
408 "key": "workflow.name",
409 "value": { "stringValue": self.workflow_name }
410 })];
411
412 for (k, v) in &self.labels {
413 attrs.push(serde_json::json!({
414 "key": k,
415 "value": { "stringValue": v }
416 }));
417 }
418
419 attrs
420 }
421
422 fn create_otel_metric(
423 &self,
424 name: &str,
425 metric_type: &str,
426 value: f64,
427 description: &str,
428 ) -> serde_json::Value {
429 serde_json::json!({
430 "name": format!("{}.{}", self.namespace, name),
431 "description": description,
432 "unit": match metric_type {
433 "counter" => "1",
434 "gauge" => "1",
435 _ => "1"
436 },
437 metric_type: {
438 "dataPoints": [{
439 "asDouble": value,
440 "timeUnixNano": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
441 "attributes": self.create_otel_attributes()
442 }]
443 }
444 })
445 }
446}
447
/// Errors returned by the `MetricsExporter` export methods.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// A JSON document could not be turned into text; carries the
    /// underlying error message.
    #[error("Serialization error: {0}")]
    SerializationError(String),

    /// The metric data was rejected; carries a description of the problem.
    #[error("Invalid metric data: {0}")]
    InvalidData(String),
}
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analytics::{
        AnalyticsPeriod, ExecutionStats, NodeAnalytics, PerformanceMetrics, PeriodType,
    };
    use chrono::Utc;
    use uuid::Uuid;

    /// Builds a fixed analytics snapshot (100 executions: 85 ok, 10 failed,
    /// 5 cancelled) with a single node, shared by all export tests.
    fn create_test_analytics() -> WorkflowAnalytics {
        WorkflowAnalytics {
            workflow_id: Uuid::new_v4(),
            workflow_name: "test_workflow".to_string(),
            period: AnalyticsPeriod {
                start: Utc::now(),
                end: Utc::now(),
                period_type: PeriodType::Daily,
            },
            execution_stats: ExecutionStats {
                total_executions: 100,
                successful_executions: 85,
                failed_executions: 10,
                cancelled_executions: 5,
                success_rate: 0.85,
                failure_rate: 0.10,
                executions_per_hour: 10.0,
            },
            performance_metrics: PerformanceMetrics {
                avg_duration_ms: 1500.0,
                p50_duration_ms: 1200,
                p95_duration_ms: 3000,
                p99_duration_ms: 4500,
                min_duration_ms: 100,
                max_duration_ms: 5000,
                total_tokens: 1000000,
                avg_tokens: 10000.0,
                total_cost_usd: 100.0,
                avg_cost_usd: 1.0,
            },
            node_analytics: vec![NodeAnalytics {
                node_id: Uuid::new_v4(),
                node_name: "node1".to_string(),
                node_type: "LLM".to_string(),
                execution_count: 100,
                success_count: 95,
                failure_count: 5,
                avg_duration_ms: 500.0,
                max_duration_ms: 1000,
                total_duration_ms: 50000,
                time_percentage: 33.0,
                is_bottleneck: false,
            }],
            error_patterns: vec![],
            updated_at: Utc::now(),
        }
    }

    #[test]
    fn test_metrics_exporter_creation() {
        let exporter = MetricsExporter::new("test_workflow");
        assert_eq!(exporter.workflow_name, "test_workflow");
        assert_eq!(exporter.namespace, "oxify");
        assert!(exporter.labels.is_empty());
    }

    #[test]
    fn test_with_label() {
        let exporter = MetricsExporter::new("test")
            .with_label("env", "production")
            .with_label("region", "us-east-1");

        assert_eq!(exporter.labels.len(), 2);
        assert_eq!(exporter.labels.get("env"), Some(&"production".to_string()));
        assert_eq!(
            exporter.labels.get("region"),
            Some(&"us-east-1".to_string())
        );
    }

    #[test]
    fn test_with_namespace() {
        let exporter = MetricsExporter::new("test").with_namespace("custom");
        assert_eq!(exporter.namespace, "custom");
    }

    #[test]
    fn test_export_prometheus() {
        let exporter = MetricsExporter::new("test_workflow");
        let analytics = create_test_analytics();
        let output = exporter
            .export_prometheus(&analytics)
            .expect("prometheus export should succeed");

        // Whole sample lines are asserted: bare substrings such as "10" or
        // "85" would also match unrelated digits elsewhere in the output
        // (for example inside the node UUID) and prove nothing.
        let expected_lines = [
            "# TYPE oxify_executions_total counter\n",
            "oxify_executions_total{workflow=\"test_workflow\"} 100\n",
            "oxify_executions_successful_total{workflow=\"test_workflow\"} 85\n",
            "oxify_executions_failed_total{workflow=\"test_workflow\"} 10\n",
            "# TYPE oxify_success_rate gauge\n",
            "oxify_success_rate{workflow=\"test_workflow\"} 0.8500\n",
            "# TYPE oxify_duration_seconds summary\n",
            "oxify_duration_seconds_sum{workflow=\"test_workflow\"} 150.000\n",
            "oxify_duration_seconds_count{workflow=\"test_workflow\"} 100\n",
        ];
        for expected in &expected_lines {
            assert!(
                output.contains(*expected),
                "missing line {:?} in output:\n{}",
                expected,
                output
            );
        }
    }

    #[test]
    fn test_export_opentelemetry() {
        let exporter = MetricsExporter::new("test_workflow");
        let analytics = create_test_analytics();
        let json = exporter
            .export_opentelemetry(&analytics)
            .expect("opentelemetry export should succeed");

        assert!(json["resourceMetrics"].is_array());
        assert!(json["resourceMetrics"][0]["scopeMetrics"].is_array());
        assert!(json["resourceMetrics"][0]["scopeMetrics"][0]["metrics"].is_array());

        let metrics = json["resourceMetrics"][0]["scopeMetrics"][0]["metrics"]
            .as_array()
            .unwrap();
        assert!(!metrics.is_empty());

        let metric_names: Vec<String> = metrics
            .iter()
            .filter_map(|m| m["name"].as_str().map(String::from))
            .collect();

        assert!(metric_names.iter().any(|n| n.contains("executions.total")));
        assert!(metric_names.iter().any(|n| n.contains("success_rate")));
    }

    #[test]
    fn test_export_influxdb() {
        let exporter = MetricsExporter::new("test_workflow");
        let analytics = create_test_analytics();
        let output = exporter
            .export_influxdb(&analytics)
            .expect("influxdb export should succeed");

        assert!(output.contains("workflow_executions"));
        assert!(output.contains("workflow_rates"));
        assert!(output.contains("workflow_duration"));
        assert!(output.contains("node_metrics"));

        assert!(output.contains("workflow=test_workflow"));

        assert!(output.contains("total=100"));
        assert!(output.contains("successful=85"));
        assert!(output.contains("failed=10"));
    }

    #[test]
    fn test_export_json() {
        let exporter = MetricsExporter::new("test_workflow");
        let analytics = create_test_analytics();
        let json = exporter
            .export_json(&analytics)
            .expect("json export should succeed");

        assert_eq!(json["workflow"], "test_workflow");
        assert_eq!(json["namespace"], "oxify");
        assert_eq!(json["execution_stats"]["total"], 100);
        assert_eq!(json["execution_stats"]["successful"], 85);
        assert_eq!(json["execution_stats"]["failed"], 10);
        assert_eq!(json["execution_stats"]["success_rate"], 0.85);
    }

    #[test]
    fn test_export_with_format() {
        let exporter = MetricsExporter::new("test");
        let analytics = create_test_analytics();

        assert!(exporter
            .export(&analytics, ExportFormat::Prometheus)
            .is_ok());
        assert!(exporter
            .export(&analytics, ExportFormat::OpenTelemetry)
            .is_ok());
        assert!(exporter.export(&analytics, ExportFormat::InfluxDb).is_ok());
        assert!(exporter.export(&analytics, ExportFormat::Json).is_ok());
    }

    #[test]
    fn test_prometheus_labels_formatting() {
        let exporter = MetricsExporter::new("test")
            .with_label("env", "prod")
            .with_label("region", "us");

        let labels = exporter.format_prometheus_labels();
        assert!(labels.contains("workflow=\"test\""));
        assert!(labels.contains("env=\"prod\""));
        assert!(labels.contains("region=\"us\""));
    }

    #[test]
    fn test_influxdb_tags_formatting() {
        let exporter = MetricsExporter::new("test").with_label("env", "prod");

        let tags = exporter.format_influxdb_tags();
        assert!(tags.contains("workflow=test"));
        assert!(tags.contains("env=prod"));
    }

    #[test]
    fn test_otel_attributes_creation() {
        let exporter = MetricsExporter::new("test").with_label("env", "prod");

        let attrs = exporter.create_otel_attributes();
        assert!(!attrs.is_empty());

        let workflow_attr = attrs.iter().find(|a| a["key"] == "workflow.name");
        assert!(workflow_attr.is_some());
    }

    #[test]
    fn test_export_format_enum() {
        assert_eq!(ExportFormat::Prometheus, ExportFormat::Prometheus);
        assert_ne!(ExportFormat::Prometheus, ExportFormat::Json);
    }

    #[test]
    fn test_prometheus_node_metrics() {
        let exporter = MetricsExporter::new("test");
        let analytics = create_test_analytics();
        let output = exporter.export_prometheus(&analytics).unwrap();

        assert!(output.contains("node_id="));
        assert!(output.contains("oxify_node_executions_total"));
        assert!(output.contains("oxify_node_duration_seconds"));
    }

    #[test]
    fn test_influxdb_node_metrics() {
        let exporter = MetricsExporter::new("test");
        let analytics = create_test_analytics();
        let output = exporter.export_influxdb(&analytics).unwrap();

        assert!(output.contains("node_metrics"));
        assert!(output.contains("node_id="));
        assert!(output.contains("executions=100"));
    }

    #[test]
    fn test_custom_namespace() {
        let exporter = MetricsExporter::new("test").with_namespace("custom_ns");
        let analytics = create_test_analytics();
        let output = exporter.export_prometheus(&analytics).unwrap();

        assert!(output.contains("custom_ns_executions_total"));
        assert!(output.contains("custom_ns_success_rate"));
    }

    #[test]
    fn test_metrics_with_multiple_labels() {
        let exporter = MetricsExporter::new("test")
            .with_label("env", "staging")
            .with_label("region", "eu-west-1")
            .with_label("team", "platform");

        let analytics = create_test_analytics();
        let output = exporter.export_prometheus(&analytics).unwrap();

        assert!(output.contains("env=\"staging\""));
        assert!(output.contains("region=\"eu-west-1\""));
        assert!(output.contains("team=\"platform\""));
    }
}