Skip to main content

ringkernel_core/
observability.rs

1//! Observability infrastructure for RingKernel.
2//!
3//! This module provides production-ready observability features:
4//!
5//! - **OpenTelemetry Integration** - Distributed tracing and metrics
6//! - **Prometheus Exporter** - Metrics in Prometheus exposition format
7//! - **Grafana Dashboard** - JSON templates for visualization
8//!
9//! ## Usage
10//!
11//! ```ignore
12//! use ringkernel_core::observability::{PrometheusExporter, GrafanaDashboard};
13//!
14//! // Create Prometheus exporter
15//! let exporter = PrometheusExporter::new();
16//! exporter.register_collector(metrics_collector);
17//!
18//! // Get Prometheus metrics
19//! let metrics = exporter.render();
20//! println!("{}", metrics);
21//!
22//! // Generate Grafana dashboard JSON
23//! let dashboard = GrafanaDashboard::new("RingKernel Metrics")
24//!     .add_kernel_panel()
25//!     .add_latency_panel()
26//!     .add_throughput_panel()
27//!     .build();
28//! ```
29
30use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
39// ============================================================================
40// OpenTelemetry-Compatible Span/Trace Types
41// ============================================================================
42
43/// A trace ID compatible with OpenTelemetry W3C Trace Context.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45pub struct TraceId(pub u128);
46
47impl TraceId {
48    /// Generate a new random trace ID.
49    pub fn new() -> Self {
50        use std::hash::{Hash, Hasher};
51        let mut hasher = std::collections::hash_map::DefaultHasher::new();
52        SystemTime::now().hash(&mut hasher);
53        std::thread::current().id().hash(&mut hasher);
54        let high = hasher.finish() as u128;
55        hasher.write_u64(high as u64);
56        let low = hasher.finish() as u128;
57        Self((high << 64) | low)
58    }
59
60    /// Parse from hex string.
61    pub fn from_hex(hex: &str) -> Option<Self> {
62        u128::from_str_radix(hex, 16).ok().map(Self)
63    }
64
65    /// Convert to hex string.
66    pub fn to_hex(&self) -> String {
67        format!("{:032x}", self.0)
68    }
69}
70
71impl Default for TraceId {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// A span ID compatible with OpenTelemetry W3C Trace Context.
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
79pub struct SpanId(pub u64);
80
81impl SpanId {
82    /// Generate a new random span ID.
83    pub fn new() -> Self {
84        use std::hash::{Hash, Hasher};
85        let mut hasher = std::collections::hash_map::DefaultHasher::new();
86        SystemTime::now().hash(&mut hasher);
87        std::process::id().hash(&mut hasher);
88        Self(hasher.finish())
89    }
90
91    /// Parse from hex string.
92    pub fn from_hex(hex: &str) -> Option<Self> {
93        u64::from_str_radix(hex, 16).ok().map(Self)
94    }
95
96    /// Convert to hex string.
97    pub fn to_hex(&self) -> String {
98        format!("{:016x}", self.0)
99    }
100}
101
102impl Default for SpanId {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108/// Span kind (OpenTelemetry compatible).
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum SpanKind {
111    /// Internal operation.
112    Internal,
113    /// Server-side span (receiving request).
114    Server,
115    /// Client-side span (sending request).
116    Client,
117    /// Producer span (async message send).
118    Producer,
119    /// Consumer span (async message receive).
120    Consumer,
121}
122
123/// Span status (OpenTelemetry compatible).
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub enum SpanStatus {
126    /// Unset status.
127    Unset,
128    /// Operation completed successfully.
129    Ok,
130    /// Operation failed with error message.
131    Error {
132        /// Error message describing what went wrong.
133        message: String,
134    },
135}
136
137/// An OpenTelemetry-compatible span.
138#[derive(Debug, Clone)]
139pub struct Span {
140    /// Trace ID.
141    pub trace_id: TraceId,
142    /// Span ID.
143    pub span_id: SpanId,
144    /// Parent span ID (if any).
145    pub parent_span_id: Option<SpanId>,
146    /// Span name.
147    pub name: String,
148    /// Span kind.
149    pub kind: SpanKind,
150    /// Start time.
151    pub start_time: Instant,
152    /// End time (if completed).
153    pub end_time: Option<Instant>,
154    /// Status.
155    pub status: SpanStatus,
156    /// Attributes (key-value pairs).
157    pub attributes: HashMap<String, AttributeValue>,
158    /// Events recorded during span.
159    pub events: Vec<SpanEvent>,
160}
161
162/// Attribute value types.
163#[derive(Debug, Clone)]
164pub enum AttributeValue {
165    /// String value.
166    String(String),
167    /// Integer value.
168    Int(i64),
169    /// Float value.
170    Float(f64),
171    /// Boolean value.
172    Bool(bool),
173    /// String array.
174    StringArray(Vec<String>),
175}
176
177impl From<&str> for AttributeValue {
178    fn from(s: &str) -> Self {
179        Self::String(s.to_string())
180    }
181}
182
183impl From<String> for AttributeValue {
184    fn from(s: String) -> Self {
185        Self::String(s)
186    }
187}
188
189impl From<i64> for AttributeValue {
190    fn from(i: i64) -> Self {
191        Self::Int(i)
192    }
193}
194
195impl From<f64> for AttributeValue {
196    fn from(f: f64) -> Self {
197        Self::Float(f)
198    }
199}
200
201impl From<bool> for AttributeValue {
202    fn from(b: bool) -> Self {
203        Self::Bool(b)
204    }
205}
206
207/// An event that occurred during a span.
208#[derive(Debug, Clone)]
209pub struct SpanEvent {
210    /// Event name.
211    pub name: String,
212    /// Timestamp.
213    pub timestamp: Instant,
214    /// Event attributes.
215    pub attributes: HashMap<String, AttributeValue>,
216}
217
218impl Span {
219    /// Create a new span.
220    pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221        Self {
222            trace_id: TraceId::new(),
223            span_id: SpanId::new(),
224            parent_span_id: None,
225            name: name.into(),
226            kind,
227            start_time: Instant::now(),
228            end_time: None,
229            status: SpanStatus::Unset,
230            attributes: HashMap::new(),
231            events: Vec::new(),
232        }
233    }
234
235    /// Create a child span.
236    pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237        Self {
238            trace_id: self.trace_id,
239            span_id: SpanId::new(),
240            parent_span_id: Some(self.span_id),
241            name: name.into(),
242            kind,
243            start_time: Instant::now(),
244            end_time: None,
245            status: SpanStatus::Unset,
246            attributes: HashMap::new(),
247            events: Vec::new(),
248        }
249    }
250
251    /// Set an attribute.
252    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253        self.attributes.insert(key.into(), value.into());
254    }
255
256    /// Add an event.
257    pub fn add_event(&mut self, name: impl Into<String>) {
258        self.events.push(SpanEvent {
259            name: name.into(),
260            timestamp: Instant::now(),
261            attributes: HashMap::new(),
262        });
263    }
264
265    /// Add an event with attributes.
266    pub fn add_event_with_attributes(
267        &mut self,
268        name: impl Into<String>,
269        attributes: HashMap<String, AttributeValue>,
270    ) {
271        self.events.push(SpanEvent {
272            name: name.into(),
273            timestamp: Instant::now(),
274            attributes,
275        });
276    }
277
278    /// Set status to OK.
279    pub fn set_ok(&mut self) {
280        self.status = SpanStatus::Ok;
281    }
282
283    /// Set error status.
284    pub fn set_error(&mut self, message: impl Into<String>) {
285        self.status = SpanStatus::Error {
286            message: message.into(),
287        };
288    }
289
290    /// End the span.
291    pub fn end(&mut self) {
292        self.end_time = Some(Instant::now());
293    }
294
295    /// Get span duration.
296    pub fn duration(&self) -> Duration {
297        self.end_time
298            .unwrap_or_else(Instant::now)
299            .duration_since(self.start_time)
300    }
301
302    /// Check if span is ended.
303    pub fn is_ended(&self) -> bool {
304        self.end_time.is_some()
305    }
306}
307
308// ============================================================================
309// Span Builder
310// ============================================================================
311
312/// Builder for creating spans with fluent API.
313pub struct SpanBuilder {
314    name: String,
315    kind: SpanKind,
316    parent: Option<(TraceId, SpanId)>,
317    attributes: HashMap<String, AttributeValue>,
318}
319
320impl SpanBuilder {
321    /// Create a new span builder.
322    pub fn new(name: impl Into<String>) -> Self {
323        Self {
324            name: name.into(),
325            kind: SpanKind::Internal,
326            parent: None,
327            attributes: HashMap::new(),
328        }
329    }
330
331    /// Set span kind.
332    pub fn kind(mut self, kind: SpanKind) -> Self {
333        self.kind = kind;
334        self
335    }
336
337    /// Set parent span.
338    pub fn parent(mut self, parent: &Span) -> Self {
339        self.parent = Some((parent.trace_id, parent.span_id));
340        self
341    }
342
343    /// Set attribute.
344    pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345        self.attributes.insert(key.into(), value.into());
346        self
347    }
348
349    /// Build the span.
350    pub fn build(self) -> Span {
351        let mut span = Span::new(self.name, self.kind);
352        if let Some((trace_id, parent_id)) = self.parent {
353            span.trace_id = trace_id;
354            span.parent_span_id = Some(parent_id);
355        }
356        span.attributes = self.attributes;
357        span
358    }
359}
360
361// ============================================================================
362// Prometheus Metrics Exporter
363// ============================================================================
364
365/// Prometheus metric type.
366#[derive(Debug, Clone, Copy, PartialEq, Eq)]
367pub enum MetricType {
368    /// Counter (monotonically increasing).
369    Counter,
370    /// Gauge (can go up or down).
371    Gauge,
372    /// Histogram (distribution of values).
373    Histogram,
374    /// Summary (quantiles).
375    Summary,
376}
377
378/// A Prometheus metric definition.
379#[derive(Debug, Clone)]
380pub struct MetricDefinition {
381    /// Metric name.
382    pub name: String,
383    /// Metric type.
384    pub metric_type: MetricType,
385    /// Help text.
386    pub help: String,
387    /// Label names.
388    pub labels: Vec<String>,
389}
390
391/// A single metric sample.
392#[derive(Debug, Clone)]
393pub struct MetricSample {
394    /// Metric name.
395    pub name: String,
396    /// Label values (in order matching definition).
397    pub label_values: Vec<String>,
398    /// Sample value.
399    pub value: f64,
400    /// Timestamp (optional).
401    pub timestamp_ms: Option<u64>,
402}
403
404/// Prometheus metrics exporter.
405pub struct PrometheusExporter {
406    /// Metric definitions.
407    definitions: RwLock<Vec<MetricDefinition>>,
408    /// Registered collectors.
409    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
410    /// Custom metrics (for direct registration).
411    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
412    /// Export timestamp.
413    export_count: AtomicU64,
414}
415
416/// A custom registered metric.
417struct CustomMetric {
418    definition: MetricDefinition,
419    samples: Vec<MetricSample>,
420}
421
422/// Trait for collecting Prometheus metrics.
423pub trait PrometheusCollector: Send + Sync {
424    /// Get metric definitions.
425    fn definitions(&self) -> Vec<MetricDefinition>;
426
427    /// Collect current metric samples.
428    fn collect(&self) -> Vec<MetricSample>;
429}
430
431impl PrometheusExporter {
432    /// Create a new Prometheus exporter.
433    pub fn new() -> Arc<Self> {
434        Arc::new(Self {
435            definitions: RwLock::new(Vec::new()),
436            collectors: RwLock::new(Vec::new()),
437            custom_metrics: RwLock::new(HashMap::new()),
438            export_count: AtomicU64::new(0),
439        })
440    }
441
442    /// Register a collector.
443    pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444        let defs = collector.definitions();
445        self.definitions.write().extend(defs);
446        self.collectors.write().push(collector);
447    }
448
449    /// Register a counter metric.
450    pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451        let def = MetricDefinition {
452            name: name.to_string(),
453            metric_type: MetricType::Counter,
454            help: help.to_string(),
455            labels: labels.iter().map(|s| s.to_string()).collect(),
456        };
457        self.custom_metrics.write().insert(
458            name.to_string(),
459            CustomMetric {
460                definition: def,
461                samples: Vec::new(),
462            },
463        );
464    }
465
466    /// Register a gauge metric.
467    pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468        let def = MetricDefinition {
469            name: name.to_string(),
470            metric_type: MetricType::Gauge,
471            help: help.to_string(),
472            labels: labels.iter().map(|s| s.to_string()).collect(),
473        };
474        self.custom_metrics.write().insert(
475            name.to_string(),
476            CustomMetric {
477                definition: def,
478                samples: Vec::new(),
479            },
480        );
481    }
482
483    /// Register a histogram metric.
484    pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485        let def = MetricDefinition {
486            name: name.to_string(),
487            metric_type: MetricType::Histogram,
488            help: help.to_string(),
489            labels: labels.iter().map(|s| s.to_string()).collect(),
490        };
491        self.custom_metrics.write().insert(
492            name.to_string(),
493            CustomMetric {
494                definition: def,
495                samples: Vec::new(),
496            },
497        );
498    }
499
500    /// Set a metric value.
501    pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502        let mut metrics = self.custom_metrics.write();
503        if let Some(metric) = metrics.get_mut(name) {
504            let sample = MetricSample {
505                name: name.to_string(),
506                label_values: label_values.iter().map(|s| s.to_string()).collect(),
507                value,
508                timestamp_ms: None,
509            };
510            // Find and replace existing sample with same labels, or add new
511            let existing = metric
512                .samples
513                .iter_mut()
514                .find(|s| s.label_values == sample.label_values);
515            if let Some(existing) = existing {
516                existing.value = value;
517            } else {
518                metric.samples.push(sample);
519            }
520        }
521    }
522
523    /// Increment a counter.
524    pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525        self.add_counter(name, 1.0, label_values);
526    }
527
528    /// Add to a counter.
529    pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530        let mut metrics = self.custom_metrics.write();
531        if let Some(metric) = metrics.get_mut(name) {
532            let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533            let existing = metric
534                .samples
535                .iter_mut()
536                .find(|s| s.label_values == label_vec);
537            if let Some(existing) = existing {
538                existing.value += delta;
539            } else {
540                metric.samples.push(MetricSample {
541                    name: name.to_string(),
542                    label_values: label_vec,
543                    value: delta,
544                    timestamp_ms: None,
545                });
546            }
547        }
548    }
549
550    /// Render metrics in Prometheus exposition format.
551    pub fn render(&self) -> String {
552        self.export_count.fetch_add(1, Ordering::Relaxed);
553
554        let mut output = String::new();
555
556        // Collect from registered collectors
557        let collectors = self.collectors.read();
558        for collector in collectors.iter() {
559            let defs = collector.definitions();
560            let samples = collector.collect();
561
562            for def in &defs {
563                // Write TYPE and HELP
564                writeln!(output, "# HELP {} {}", def.name, def.help).unwrap();
565                writeln!(
566                    output,
567                    "# TYPE {} {}",
568                    def.name,
569                    match def.metric_type {
570                        MetricType::Counter => "counter",
571                        MetricType::Gauge => "gauge",
572                        MetricType::Histogram => "histogram",
573                        MetricType::Summary => "summary",
574                    }
575                )
576                .unwrap();
577
578                // Write samples for this metric
579                for sample in samples.iter().filter(|s| s.name == def.name) {
580                    Self::write_sample(&mut output, &def.labels, sample);
581                }
582            }
583        }
584
585        // Collect custom metrics
586        let custom = self.custom_metrics.read();
587        for metric in custom.values() {
588            writeln!(
589                output,
590                "# HELP {} {}",
591                metric.definition.name, metric.definition.help
592            )
593            .unwrap();
594            writeln!(
595                output,
596                "# TYPE {} {}",
597                metric.definition.name,
598                match metric.definition.metric_type {
599                    MetricType::Counter => "counter",
600                    MetricType::Gauge => "gauge",
601                    MetricType::Histogram => "histogram",
602                    MetricType::Summary => "summary",
603                }
604            )
605            .unwrap();
606
607            for sample in &metric.samples {
608                Self::write_sample(&mut output, &metric.definition.labels, sample);
609            }
610        }
611
612        output
613    }
614
615    fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
616        if labels.is_empty() || sample.label_values.is_empty() {
617            writeln!(output, "{} {}", sample.name, sample.value).unwrap();
618        } else {
619            let label_pairs: Vec<String> = labels
620                .iter()
621                .zip(sample.label_values.iter())
622                .map(|(k, v)| format!("{}=\"{}\"", k, v))
623                .collect();
624            writeln!(
625                output,
626                "{}{{{}}} {}",
627                sample.name,
628                label_pairs.join(","),
629                sample.value
630            )
631            .unwrap();
632        }
633    }
634
635    /// Get export count.
636    pub fn export_count(&self) -> u64 {
637        self.export_count.load(Ordering::Relaxed)
638    }
639}
640
641impl Default for PrometheusExporter {
642    fn default() -> Self {
643        Self {
644            definitions: RwLock::new(Vec::new()),
645            collectors: RwLock::new(Vec::new()),
646            custom_metrics: RwLock::new(HashMap::new()),
647            export_count: AtomicU64::new(0),
648        }
649    }
650}
651
652// ============================================================================
653// RingKernel Prometheus Collector
654// ============================================================================
655
656/// Prometheus collector for RingKernel metrics.
657pub struct RingKernelCollector {
658    /// Metrics collector to read from.
659    collector: Arc<MetricsCollector>,
660}
661
662impl RingKernelCollector {
663    /// Create a new RingKernel collector.
664    pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
665        Arc::new(Self { collector })
666    }
667}
668
669impl PrometheusCollector for RingKernelCollector {
670    fn definitions(&self) -> Vec<MetricDefinition> {
671        vec![
672            MetricDefinition {
673                name: "ringkernel_messages_processed_total".to_string(),
674                metric_type: MetricType::Counter,
675                help: "Total number of messages processed by kernels".to_string(),
676                labels: vec!["kernel_id".to_string()],
677            },
678            MetricDefinition {
679                name: "ringkernel_messages_dropped_total".to_string(),
680                metric_type: MetricType::Counter,
681                help: "Total number of messages dropped by kernels".to_string(),
682                labels: vec!["kernel_id".to_string()],
683            },
684            MetricDefinition {
685                name: "ringkernel_latency_us".to_string(),
686                metric_type: MetricType::Gauge,
687                help: "Current average message latency in microseconds".to_string(),
688                labels: vec!["kernel_id".to_string(), "stat".to_string()],
689            },
690            MetricDefinition {
691                name: "ringkernel_throughput".to_string(),
692                metric_type: MetricType::Gauge,
693                help: "Current message throughput per second".to_string(),
694                labels: vec!["kernel_id".to_string()],
695            },
696        ]
697    }
698
699    fn collect(&self) -> Vec<MetricSample> {
700        let aggregate = self.collector.get_aggregate();
701        let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
702
703        vec![
704            MetricSample {
705                name: "ringkernel_messages_processed_total".to_string(),
706                label_values: vec!["aggregate".to_string()],
707                value: aggregate.messages_processed as f64,
708                timestamp_ms: None,
709            },
710            MetricSample {
711                name: "ringkernel_messages_dropped_total".to_string(),
712                label_values: vec!["aggregate".to_string()],
713                value: aggregate.messages_dropped as f64,
714                timestamp_ms: None,
715            },
716            MetricSample {
717                name: "ringkernel_latency_us".to_string(),
718                label_values: vec!["aggregate".to_string(), "avg".to_string()],
719                value: aggregate.avg_latency_us(),
720                timestamp_ms: None,
721            },
722            MetricSample {
723                name: "ringkernel_latency_us".to_string(),
724                label_values: vec!["aggregate".to_string(), "min".to_string()],
725                value: aggregate.min_latency_us as f64,
726                timestamp_ms: None,
727            },
728            MetricSample {
729                name: "ringkernel_latency_us".to_string(),
730                label_values: vec!["aggregate".to_string(), "max".to_string()],
731                value: aggregate.max_latency_us as f64,
732                timestamp_ms: None,
733            },
734            MetricSample {
735                name: "ringkernel_throughput".to_string(),
736                label_values: vec!["aggregate".to_string()],
737                value: aggregate.messages_processed as f64 / elapsed,
738                timestamp_ms: None,
739            },
740        ]
741    }
742}
743
744// ============================================================================
745// Grafana Dashboard Generator
746// ============================================================================
747
748/// Grafana panel type.
749#[derive(Debug, Clone, Copy, PartialEq, Eq)]
750pub enum PanelType {
751    /// Time series graph.
752    Graph,
753    /// Single stat / gauge.
754    Stat,
755    /// Table.
756    Table,
757    /// Heatmap.
758    Heatmap,
759    /// Bar gauge.
760    BarGauge,
761}
762
763/// A Grafana panel definition.
764#[derive(Debug, Clone)]
765pub struct GrafanaPanel {
766    /// Panel title.
767    pub title: String,
768    /// Panel type.
769    pub panel_type: PanelType,
770    /// PromQL query expressions.
771    pub queries: Vec<String>,
772    /// Grid position.
773    pub grid_pos: (u32, u32, u32, u32), // x, y, w, h
774    /// Unit (for display).
775    pub unit: Option<String>,
776}
777
778/// Grafana dashboard builder.
779pub struct GrafanaDashboard {
780    /// Dashboard title.
781    title: String,
782    /// Dashboard description.
783    description: String,
784    /// Panels.
785    panels: Vec<GrafanaPanel>,
786    /// Refresh interval.
787    refresh: String,
788    /// Time range.
789    time_from: String,
790    /// Tags.
791    tags: Vec<String>,
792}
793
794impl GrafanaDashboard {
795    /// Create a new dashboard builder.
796    pub fn new(title: impl Into<String>) -> Self {
797        Self {
798            title: title.into(),
799            description: String::new(),
800            panels: Vec::new(),
801            refresh: "5s".to_string(),
802            time_from: "now-1h".to_string(),
803            tags: vec!["ringkernel".to_string()],
804        }
805    }
806
807    /// Set description.
808    pub fn description(mut self, desc: impl Into<String>) -> Self {
809        self.description = desc.into();
810        self
811    }
812
813    /// Set refresh interval.
814    pub fn refresh(mut self, interval: impl Into<String>) -> Self {
815        self.refresh = interval.into();
816        self
817    }
818
819    /// Set time range.
820    pub fn time_from(mut self, from: impl Into<String>) -> Self {
821        self.time_from = from.into();
822        self
823    }
824
825    /// Add a tag.
826    pub fn tag(mut self, tag: impl Into<String>) -> Self {
827        self.tags.push(tag.into());
828        self
829    }
830
831    /// Add a custom panel.
832    pub fn panel(mut self, panel: GrafanaPanel) -> Self {
833        self.panels.push(panel);
834        self
835    }
836
837    /// Add kernel throughput panel.
838    pub fn add_throughput_panel(mut self) -> Self {
839        self.panels.push(GrafanaPanel {
840            title: "Message Throughput".to_string(),
841            panel_type: PanelType::Graph,
842            queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
843            grid_pos: (0, 0, 12, 8),
844            unit: Some("msg/s".to_string()),
845        });
846        self
847    }
848
849    /// Add latency panel.
850    pub fn add_latency_panel(mut self) -> Self {
851        self.panels.push(GrafanaPanel {
852            title: "Message Latency".to_string(),
853            panel_type: PanelType::Graph,
854            queries: vec![
855                "ringkernel_latency_us{stat=\"avg\"}".to_string(),
856                "ringkernel_latency_us{stat=\"max\"}".to_string(),
857            ],
858            grid_pos: (12, 0, 12, 8),
859            unit: Some("µs".to_string()),
860        });
861        self
862    }
863
864    /// Add kernel status panel.
865    pub fn add_kernel_status_panel(mut self) -> Self {
866        self.panels.push(GrafanaPanel {
867            title: "Active Kernels".to_string(),
868            panel_type: PanelType::Stat,
869            queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
870            grid_pos: (0, 8, 6, 4),
871            unit: None,
872        });
873        self
874    }
875
876    /// Add drop rate panel.
877    pub fn add_drop_rate_panel(mut self) -> Self {
878        self.panels.push(GrafanaPanel {
879            title: "Message Drop Rate".to_string(),
880            panel_type: PanelType::Graph,
881            queries: vec![
882                "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
883            ],
884            grid_pos: (6, 8, 6, 4),
885            unit: Some("percentunit".to_string()),
886        });
887        self
888    }
889
890    /// Add multi-GPU panel.
891    pub fn add_multi_gpu_panel(mut self) -> Self {
892        self.panels.push(GrafanaPanel {
893            title: "GPU Memory Usage".to_string(),
894            panel_type: PanelType::BarGauge,
895            queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
896            grid_pos: (12, 8, 12, 4),
897            unit: Some("bytes".to_string()),
898        });
899        self
900    }
901
902    /// Add all standard panels.
903    pub fn add_standard_panels(self) -> Self {
904        self.add_throughput_panel()
905            .add_latency_panel()
906            .add_kernel_status_panel()
907            .add_drop_rate_panel()
908            .add_multi_gpu_panel()
909    }
910
911    /// Build dashboard JSON.
912    pub fn build(&self) -> String {
913        let panels_json: Vec<String> = self
914            .panels
915            .iter()
916            .enumerate()
917            .map(|(i, panel)| {
918                let queries_json: Vec<String> = panel
919                    .queries
920                    .iter()
921                    .enumerate()
922                    .map(|(j, q)| {
923                        format!(
924                            r#"{{
925                        "expr": "{}",
926                        "refId": "{}",
927                        "legendFormat": "{{}}"
928                    }}"#,
929                            q,
930                            (b'A' + j as u8) as char
931                        )
932                    })
933                    .collect();
934
935                let unit_field = panel
936                    .unit
937                    .as_ref()
938                    .map(|u| format!(r#""unit": "{}","#, u))
939                    .unwrap_or_default();
940
941                format!(
942                    r#"{{
943                    "id": {},
944                    "title": "{}",
945                    "type": "{}",
946                    "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
947                    {}
948                    "targets": [{}],
949                    "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
950                }}"#,
951                    i + 1,
952                    panel.title,
953                    match panel.panel_type {
954                        PanelType::Graph => "timeseries",
955                        PanelType::Stat => "stat",
956                        PanelType::Table => "table",
957                        PanelType::Heatmap => "heatmap",
958                        PanelType::BarGauge => "bargauge",
959                    },
960                    panel.grid_pos.0,
961                    panel.grid_pos.1,
962                    panel.grid_pos.2,
963                    panel.grid_pos.3,
964                    unit_field,
965                    queries_json.join(",")
966                )
967            })
968            .collect();
969
970        let tags_json: Vec<String> = self.tags.iter().map(|t| format!(r#""{}""#, t)).collect();
971
972        format!(
973            r#"{{
974                "title": "{}",
975                "description": "{}",
976                "tags": [{}],
977                "refresh": "{}",
978                "time": {{"from": "{}", "to": "now"}},
979                "templating": {{
980                    "list": [
981                        {{
982                            "name": "datasource",
983                            "type": "datasource",
984                            "query": "prometheus"
985                        }},
986                        {{
987                            "name": "kernel_id",
988                            "type": "query",
989                            "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
990                            "multi": true,
991                            "includeAll": true
992                        }}
993                    ]
994                }},
995                "panels": [{}]
996            }}"#,
997            self.title,
998            self.description,
999            tags_json.join(","),
1000            self.refresh,
1001            self.time_from,
1002            panels_json.join(",")
1003        )
1004    }
1005}
1006
1007// ============================================================================
1008// Observability Context
1009// ============================================================================
1010
1011/// Global observability context for managing spans and metrics.
1012pub struct ObservabilityContext {
1013    /// Active spans.
1014    active_spans: RwLock<HashMap<SpanId, Span>>,
1015    /// Completed spans (for export).
1016    completed_spans: RwLock<Vec<Span>>,
1017    /// Max completed spans to retain.
1018    max_completed: usize,
1019    /// Prometheus exporter.
1020    prometheus: Arc<PrometheusExporter>,
1021}
1022
1023impl ObservabilityContext {
1024    /// Create a new observability context.
1025    pub fn new() -> Arc<Self> {
1026        Arc::new(Self {
1027            active_spans: RwLock::new(HashMap::new()),
1028            completed_spans: RwLock::new(Vec::new()),
1029            max_completed: 10000,
1030            prometheus: PrometheusExporter::new(),
1031        })
1032    }
1033
1034    /// Start a new span.
1035    pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1036        let span = Span::new(name, kind);
1037        self.active_spans.write().insert(span.span_id, span.clone());
1038        span
1039    }
1040
1041    /// Start a child span.
1042    pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1043        let span = parent.child(name, kind);
1044        self.active_spans.write().insert(span.span_id, span.clone());
1045        span
1046    }
1047
1048    /// End a span.
1049    pub fn end_span(&self, mut span: Span) {
1050        span.end();
1051        self.active_spans.write().remove(&span.span_id);
1052
1053        let mut completed = self.completed_spans.write();
1054        completed.push(span);
1055        if completed.len() > self.max_completed {
1056            completed.remove(0);
1057        }
1058    }
1059
1060    /// Get Prometheus exporter.
1061    pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1062        &self.prometheus
1063    }
1064
1065    /// Export completed spans (for sending to trace backends).
1066    pub fn export_spans(&self) -> Vec<Span> {
1067        self.completed_spans.write().drain(..).collect()
1068    }
1069
1070    /// Get active span count.
1071    pub fn active_span_count(&self) -> usize {
1072        self.active_spans.read().len()
1073    }
1074}
1075
1076impl Default for ObservabilityContext {
1077    fn default() -> Self {
1078        Self {
1079            active_spans: RwLock::new(HashMap::new()),
1080            completed_spans: RwLock::new(Vec::new()),
1081            max_completed: 10000,
1082            prometheus: PrometheusExporter::new(),
1083        }
1084    }
1085}
1086
1087// ============================================================================
1088// GPU Profiler Integration Stubs
1089// ============================================================================
1090
1091/// GPU profiler backend type.
1092#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1093pub enum GpuProfilerBackend {
1094    /// NVIDIA Nsight Systems/Compute.
1095    Nsight,
1096    /// RenderDoc (cross-platform).
1097    RenderDoc,
1098    /// PIX for Windows.
1099    Pix,
1100    /// Apple Metal System Trace.
1101    MetalSystemTrace,
1102    /// AMD Radeon GPU Profiler.
1103    Rgp,
1104    /// Custom profiler.
1105    Custom,
1106}
1107
1108/// GPU profiler marker color.
1109#[derive(Debug, Clone, Copy)]
1110pub struct ProfilerColor {
1111    /// Red component (0-255).
1112    pub r: u8,
1113    /// Green component (0-255).
1114    pub g: u8,
1115    /// Blue component (0-255).
1116    pub b: u8,
1117    /// Alpha component (0-255).
1118    pub a: u8,
1119}
1120
1121impl ProfilerColor {
1122    /// Create a new color.
1123    pub const fn new(r: u8, g: u8, b: u8) -> Self {
1124        Self { r, g, b, a: 255 }
1125    }
1126
1127    /// Red color.
1128    pub const RED: Self = Self::new(255, 0, 0);
1129    /// Green color.
1130    pub const GREEN: Self = Self::new(0, 255, 0);
1131    /// Blue color.
1132    pub const BLUE: Self = Self::new(0, 0, 255);
1133    /// Yellow color.
1134    pub const YELLOW: Self = Self::new(255, 255, 0);
1135    /// Cyan color.
1136    pub const CYAN: Self = Self::new(0, 255, 255);
1137    /// Magenta color.
1138    pub const MAGENTA: Self = Self::new(255, 0, 255);
1139    /// Orange color.
1140    pub const ORANGE: Self = Self::new(255, 165, 0);
1141}
1142
1143/// GPU profiler range handle for scoped profiling.
1144pub struct ProfilerRange {
1145    /// Range name.
1146    #[allow(dead_code)]
1147    name: String,
1148    /// Backend being used.
1149    #[allow(dead_code)]
1150    backend: GpuProfilerBackend,
1151    /// Start time.
1152    start: Instant,
1153}
1154
1155impl ProfilerRange {
1156    /// Create a new profiler range (internal use).
1157    fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1158        Self {
1159            name: name.into(),
1160            backend,
1161            start: Instant::now(),
1162        }
1163    }
1164
1165    /// Create a stub profiler range for external profiler implementations.
1166    ///
1167    /// This is used by custom profiler implementations (like CUDA NVTX) that
1168    /// manage their own range lifecycle but need to return a ProfilerRange
1169    /// for API compatibility.
1170    pub fn stub(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1171        Self::new(name, backend)
1172    }
1173
1174    /// Get elapsed duration.
1175    pub fn elapsed(&self) -> Duration {
1176        self.start.elapsed()
1177    }
1178}
1179
1180impl Drop for ProfilerRange {
1181    fn drop(&mut self) {
1182        // In a real implementation, this would call the profiler API to end the range
1183        // e.g., nvtxRangePop() for NVTX
1184    }
1185}
1186
1187/// Trait for GPU profiler integration.
1188///
1189/// Implement this trait to integrate with specific GPU profiling tools.
1190/// The default implementation is a no-op for when no profiler is attached.
1191pub trait GpuProfiler: Send + Sync {
1192    /// Check if the profiler is available and attached.
1193    fn is_available(&self) -> bool {
1194        false
1195    }
1196
1197    /// Get the profiler backend type.
1198    fn backend(&self) -> GpuProfilerBackend;
1199
1200    /// Start a profiler capture session.
1201    fn start_capture(&self) -> Result<(), ProfilerError> {
1202        Ok(())
1203    }
1204
1205    /// End a profiler capture session.
1206    fn end_capture(&self) -> Result<(), ProfilerError> {
1207        Ok(())
1208    }
1209
1210    /// Trigger a frame/dispatch capture.
1211    fn trigger_capture(&self) -> Result<(), ProfilerError> {
1212        Ok(())
1213    }
1214
1215    /// Push a named range onto the profiler stack.
1216    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1217        ProfilerRange::new(name, self.backend())
1218    }
1219
1220    /// Pop the current range from the profiler stack.
1221    fn pop_range(&self) {}
1222
1223    /// Insert an instantaneous marker.
1224    fn mark(&self, _name: &str, _color: ProfilerColor) {}
1225
1226    /// Set a per-thread name for the profiler.
1227    fn set_thread_name(&self, _name: &str) {}
1228
1229    /// Add a message to the profiler output.
1230    fn message(&self, _text: &str) {}
1231
1232    /// Register a GPU memory allocation.
1233    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}
1234
1235    /// Unregister a GPU memory allocation.
1236    fn unregister_allocation(&self, _ptr: u64) {}
1237}
1238
1239/// Profiler error type.
1240#[derive(Debug, Clone, thiserror::Error)]
1241pub enum ProfilerError {
1242    /// Profiler is not available.
1243    #[error("GPU profiler not available")]
1244    NotAvailable,
1245    /// Profiler is not attached.
1246    #[error("GPU profiler not attached")]
1247    NotAttached,
1248    /// Capture already in progress.
1249    #[error("Capture already in progress")]
1250    CaptureInProgress,
1251    /// No capture in progress.
1252    #[error("No capture in progress")]
1253    NoCaptureInProgress,
1254    /// Backend-specific error.
1255    #[error("Profiler error: {0}")]
1256    Backend(String),
1257}
1258
1259/// Null profiler implementation (no-op).
1260pub struct NullProfiler;
1261
1262impl GpuProfiler for NullProfiler {
1263    fn backend(&self) -> GpuProfilerBackend {
1264        GpuProfilerBackend::Custom
1265    }
1266}
1267
1268/// NVTX (NVIDIA Tools Extension) profiler stub.
1269///
1270/// When the real NVTX library is available, this integrates with
1271/// Nsight Systems and Nsight Compute.
1272pub struct NvtxProfiler {
1273    /// Whether NVTX is available.
1274    available: bool,
1275    /// Whether a capture is in progress.
1276    capture_in_progress: std::sync::atomic::AtomicBool,
1277}
1278
1279impl NvtxProfiler {
1280    /// Create a new NVTX profiler.
1281    ///
1282    /// In a real implementation, this would check for libnvtx availability.
1283    pub fn new() -> Self {
1284        Self {
1285            available: false, // Would check nvtxInitialize() in real impl
1286            capture_in_progress: std::sync::atomic::AtomicBool::new(false),
1287        }
1288    }
1289
1290    /// Check if NVTX library is loaded.
1291    pub fn is_nvtx_loaded(&self) -> bool {
1292        // In real implementation: check if libnvtx is dynamically loaded
1293        self.available
1294    }
1295}
1296
1297impl Default for NvtxProfiler {
1298    fn default() -> Self {
1299        Self::new()
1300    }
1301}
1302
1303impl GpuProfiler for NvtxProfiler {
1304    fn is_available(&self) -> bool {
1305        self.available
1306    }
1307
1308    fn backend(&self) -> GpuProfilerBackend {
1309        GpuProfilerBackend::Nsight
1310    }
1311
1312    fn start_capture(&self) -> Result<(), ProfilerError> {
1313        if !self.available {
1314            return Err(ProfilerError::NotAvailable);
1315        }
1316        if self.capture_in_progress.swap(true, Ordering::SeqCst) {
1317            return Err(ProfilerError::CaptureInProgress);
1318        }
1319        // Real impl: nvtxRangePushA("Capture")
1320        Ok(())
1321    }
1322
1323    fn end_capture(&self) -> Result<(), ProfilerError> {
1324        if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
1325            return Err(ProfilerError::NoCaptureInProgress);
1326        }
1327        // Real impl: nvtxRangePop()
1328        Ok(())
1329    }
1330
1331    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1332        // Real impl: nvtxRangePushA(name) with color attribute
1333        ProfilerRange::new(name, self.backend())
1334    }
1335
1336    fn pop_range(&self) {
1337        // Real impl: nvtxRangePop()
1338    }
1339
1340    fn mark(&self, _name: &str, _color: ProfilerColor) {
1341        // Real impl: nvtxMarkA(name) with color
1342    }
1343
1344    fn set_thread_name(&self, _name: &str) {
1345        // Real impl: nvtxNameOsThread(thread_id, name)
1346    }
1347}
1348
1349/// RenderDoc profiler stub.
1350///
1351/// Integrates with RenderDoc for GPU frame capture and debugging.
1352pub struct RenderDocProfiler {
1353    /// Whether RenderDoc is attached.
1354    attached: bool,
1355}
1356
1357impl RenderDocProfiler {
1358    /// Create a new RenderDoc profiler.
1359    ///
1360    /// In a real implementation, this would use the RenderDoc in-app API.
1361    pub fn new() -> Self {
1362        Self {
1363            attached: false, // Would check RENDERDOC_GetAPI in real impl
1364        }
1365    }
1366
1367    /// Check if RenderDoc is attached to the process.
1368    pub fn is_attached(&self) -> bool {
1369        // Real impl: check RENDERDOC_API_VERSION via GetAPI
1370        self.attached
1371    }
1372
1373    /// Get RenderDoc capture file path.
1374    pub fn get_capture_path(&self) -> Option<String> {
1375        // Real impl: RENDERDOC_GetCapture
1376        None
1377    }
1378
1379    /// Launch RenderDoc UI.
1380    pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1381        if !self.attached {
1382            return Err(ProfilerError::NotAttached);
1383        }
1384        // Real impl: RENDERDOC_LaunchReplayUI
1385        Ok(())
1386    }
1387}
1388
1389impl Default for RenderDocProfiler {
1390    fn default() -> Self {
1391        Self::new()
1392    }
1393}
1394
1395impl GpuProfiler for RenderDocProfiler {
1396    fn is_available(&self) -> bool {
1397        self.attached
1398    }
1399
1400    fn backend(&self) -> GpuProfilerBackend {
1401        GpuProfilerBackend::RenderDoc
1402    }
1403
1404    fn trigger_capture(&self) -> Result<(), ProfilerError> {
1405        if !self.attached {
1406            return Err(ProfilerError::NotAttached);
1407        }
1408        // Real impl: RENDERDOC_TriggerCapture
1409        Ok(())
1410    }
1411
1412    fn start_capture(&self) -> Result<(), ProfilerError> {
1413        if !self.attached {
1414            return Err(ProfilerError::NotAttached);
1415        }
1416        // Real impl: RENDERDOC_StartFrameCapture
1417        Ok(())
1418    }
1419
1420    fn end_capture(&self) -> Result<(), ProfilerError> {
1421        // Real impl: RENDERDOC_EndFrameCapture
1422        Ok(())
1423    }
1424
1425    fn set_thread_name(&self, _name: &str) {
1426        // Real impl: RENDERDOC_SetCaptureOptionStr
1427    }
1428}
1429
1430/// Metal System Trace profiler stub (macOS).
1431///
1432/// Integrates with Xcode Instruments for Metal GPU profiling.
1433#[cfg(target_os = "macos")]
1434pub struct MetalProfiler {
1435    /// Whether Metal profiling is available.
1436    available: bool,
1437}
1438
1439#[cfg(target_os = "macos")]
1440impl MetalProfiler {
1441    /// Create a new Metal profiler.
1442    pub fn new() -> Self {
1443        Self { available: true }
1444    }
1445}
1446
1447#[cfg(target_os = "macos")]
1448impl Default for MetalProfiler {
1449    fn default() -> Self {
1450        Self::new()
1451    }
1452}
1453
1454#[cfg(target_os = "macos")]
1455impl GpuProfiler for MetalProfiler {
1456    fn is_available(&self) -> bool {
1457        self.available
1458    }
1459
1460    fn backend(&self) -> GpuProfilerBackend {
1461        GpuProfilerBackend::MetalSystemTrace
1462    }
1463
1464    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1465        // Real impl: MTLCommandBuffer.pushDebugGroup(name)
1466        ProfilerRange::new(name, self.backend())
1467    }
1468
1469    fn pop_range(&self) {
1470        // Real impl: MTLCommandBuffer.popDebugGroup()
1471    }
1472
1473    fn mark(&self, _name: &str, _color: ProfilerColor) {
1474        // Real impl: MTLCommandBuffer.insertDebugSignpost(name)
1475    }
1476}
1477
1478/// GPU profiler manager for selecting and using profilers.
1479pub struct GpuProfilerManager {
1480    /// Active profiler.
1481    profiler: Arc<dyn GpuProfiler>,
1482    /// Enabled state.
1483    enabled: std::sync::atomic::AtomicBool,
1484}
1485
1486impl GpuProfilerManager {
1487    /// Create a new profiler manager with auto-detection.
1488    pub fn new() -> Self {
1489        // Try to detect available profiler
1490        let nvtx = NvtxProfiler::new();
1491        if nvtx.is_available() {
1492            return Self {
1493                profiler: Arc::new(nvtx),
1494                enabled: std::sync::atomic::AtomicBool::new(true),
1495            };
1496        }
1497
1498        let renderdoc = RenderDocProfiler::new();
1499        if renderdoc.is_available() {
1500            return Self {
1501                profiler: Arc::new(renderdoc),
1502                enabled: std::sync::atomic::AtomicBool::new(true),
1503            };
1504        }
1505
1506        // Fallback to null profiler
1507        Self {
1508            profiler: Arc::new(NullProfiler),
1509            enabled: std::sync::atomic::AtomicBool::new(false),
1510        }
1511    }
1512
1513    /// Create with a specific profiler.
1514    pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1515        let enabled = profiler.is_available();
1516        Self {
1517            profiler,
1518            enabled: std::sync::atomic::AtomicBool::new(enabled),
1519        }
1520    }
1521
1522    /// Check if profiling is enabled.
1523    pub fn is_enabled(&self) -> bool {
1524        self.enabled.load(Ordering::Relaxed)
1525    }
1526
1527    /// Enable or disable profiling.
1528    pub fn set_enabled(&self, enabled: bool) {
1529        self.enabled.store(enabled, Ordering::Relaxed);
1530    }
1531
1532    /// Get the profiler backend.
1533    pub fn backend(&self) -> GpuProfilerBackend {
1534        self.profiler.backend()
1535    }
1536
1537    /// Start a profiled scope.
1538    pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1539        ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1540    }
1541
1542    /// Start a profiled scope with color.
1543    pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1544        ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1545    }
1546
1547    /// Insert a marker.
1548    pub fn mark(&self, name: &str) {
1549        if self.is_enabled() {
1550            self.profiler.mark(name, ProfilerColor::CYAN);
1551        }
1552    }
1553
1554    /// Get access to the underlying profiler.
1555    pub fn profiler(&self) -> &dyn GpuProfiler {
1556        &*self.profiler
1557    }
1558}
1559
1560impl Default for GpuProfilerManager {
1561    fn default() -> Self {
1562        Self::new()
1563    }
1564}
1565
1566/// RAII scope for profiler ranges.
1567pub struct ProfilerScope<'a> {
1568    profiler: &'a dyn GpuProfiler,
1569    enabled: bool,
1570}
1571
1572impl<'a> ProfilerScope<'a> {
1573    fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
1574        if enabled {
1575            profiler.push_range(name, ProfilerColor::CYAN);
1576        }
1577        Self { profiler, enabled }
1578    }
1579
1580    fn new_colored(
1581        name: &str,
1582        profiler: &'a dyn GpuProfiler,
1583        enabled: bool,
1584        color: ProfilerColor,
1585    ) -> Self {
1586        if enabled {
1587            profiler.push_range(name, color);
1588        }
1589        Self { profiler, enabled }
1590    }
1591}
1592
1593impl<'a> Drop for ProfilerScope<'a> {
1594    fn drop(&mut self) {
1595        if self.enabled {
1596            self.profiler.pop_range();
1597        }
1598    }
1599}
1600
1601/// Macro for scoped GPU profiling.
1602///
1603/// # Example
1604///
1605/// ```ignore
1606/// use ringkernel_core::gpu_profile;
1607///
1608/// fn compute_kernel() {
1609///     gpu_profile!(profiler, "compute_kernel", {
1610///         // GPU work here
1611///     });
1612/// }
1613/// ```
1614#[macro_export]
1615macro_rules! gpu_profile {
1616    ($profiler:expr, $name:expr) => {
1617        let _scope = $profiler.scope($name);
1618    };
1619    ($profiler:expr, $name:expr, $color:expr) => {
1620        let _scope = $profiler.scope_colored($name, $color);
1621    };
1622}
1623
1624// ============================================================================
1625// GPU Memory Dashboard
1626// ============================================================================
1627
1628/// GPU memory allocation type.
1629#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1630pub enum GpuMemoryType {
1631    /// Device-local memory (fastest, GPU only).
1632    DeviceLocal,
1633    /// Host-visible memory (accessible from CPU).
1634    HostVisible,
1635    /// Host-coherent memory (no explicit flush needed).
1636    HostCoherent,
1637    /// Mapped memory (CPU-GPU shared).
1638    Mapped,
1639    /// Queue buffers for message passing.
1640    QueueBuffer,
1641    /// Control block memory.
1642    ControlBlock,
1643    /// Shared memory (block-local).
1644    SharedMemory,
1645}
1646
1647/// A tracked GPU memory allocation.
1648#[derive(Debug, Clone)]
1649pub struct GpuMemoryAllocation {
1650    /// Unique allocation ID.
1651    pub id: u64,
1652    /// Allocation name/label.
1653    pub name: String,
1654    /// Size in bytes.
1655    pub size: usize,
1656    /// Memory type.
1657    pub memory_type: GpuMemoryType,
1658    /// Device index (for multi-GPU).
1659    pub device_index: u32,
1660    /// Kernel ID (if associated with a kernel).
1661    pub kernel_id: Option<String>,
1662    /// Allocation timestamp.
1663    pub allocated_at: Instant,
1664    /// Whether the allocation is currently in use.
1665    pub in_use: bool,
1666}
1667
1668/// GPU memory pool statistics.
1669#[derive(Debug, Clone, Default)]
1670pub struct GpuMemoryPoolStats {
1671    /// Pool name.
1672    pub name: String,
1673    /// Total capacity in bytes.
1674    pub capacity: usize,
1675    /// Currently allocated bytes.
1676    pub allocated: usize,
1677    /// Peak allocated bytes.
1678    pub peak_allocated: usize,
1679    /// Number of active allocations.
1680    pub allocation_count: u32,
1681    /// Number of allocations since creation.
1682    pub total_allocations: u64,
1683    /// Number of deallocations since creation.
1684    pub total_deallocations: u64,
1685    /// Fragmentation ratio (0.0 = none, 1.0 = fully fragmented).
1686    pub fragmentation: f32,
1687}
1688
1689impl GpuMemoryPoolStats {
1690    /// Get utilization percentage.
1691    pub fn utilization(&self) -> f32 {
1692        if self.capacity == 0 {
1693            0.0
1694        } else {
1695            (self.allocated as f32 / self.capacity as f32) * 100.0
1696        }
1697    }
1698}
1699
1700/// Per-device GPU memory statistics.
1701#[derive(Debug, Clone, Default)]
1702pub struct GpuDeviceMemoryStats {
1703    /// Device index.
1704    pub device_index: u32,
1705    /// Device name.
1706    pub device_name: String,
1707    /// Total device memory in bytes.
1708    pub total_memory: u64,
1709    /// Free device memory in bytes.
1710    pub free_memory: u64,
1711    /// Memory used by RingKernel.
1712    pub ringkernel_used: u64,
1713    /// Memory used by other applications.
1714    pub other_used: u64,
1715    /// Memory pool statistics.
1716    pub pools: Vec<GpuMemoryPoolStats>,
1717}
1718
1719impl GpuDeviceMemoryStats {
1720    /// Get used memory in bytes.
1721    pub fn used_memory(&self) -> u64 {
1722        self.total_memory - self.free_memory
1723    }
1724
1725    /// Get utilization percentage.
1726    pub fn utilization(&self) -> f32 {
1727        if self.total_memory == 0 {
1728            0.0
1729        } else {
1730            (self.used_memory() as f32 / self.total_memory as f32) * 100.0
1731        }
1732    }
1733}
1734
1735/// GPU Memory Dashboard for monitoring and visualization.
1736///
1737/// Provides real-time GPU memory tracking with allocation history,
1738/// per-kernel usage, and memory pressure alerts.
1739///
1740/// # Example
1741///
1742/// ```ignore
1743/// use ringkernel_core::observability::GpuMemoryDashboard;
1744///
1745/// let dashboard = GpuMemoryDashboard::new();
1746///
1747/// // Track an allocation
1748/// dashboard.track_allocation(
1749///     1,
1750///     "input_queue",
1751///     65536,
1752///     GpuMemoryType::QueueBuffer,
1753///     0,
1754///     Some("processor_kernel"),
1755/// );
1756///
1757/// // Get current stats
1758/// let stats = dashboard.get_device_stats(0);
1759/// println!("GPU 0 utilization: {:.1}%", stats.utilization());
1760///
1761/// // Generate Grafana panel JSON
1762/// let panel = dashboard.grafana_panel();
1763/// ```
1764pub struct GpuMemoryDashboard {
1765    /// Active allocations.
1766    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
1767    /// Per-device statistics.
1768    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
1769    /// Memory pressure thresholds.
1770    thresholds: GpuMemoryThresholds,
1771    /// Allocation counter for unique IDs.
1772    allocation_counter: AtomicU64,
1773    /// Total bytes allocated.
1774    total_allocated: AtomicU64,
1775    /// Peak bytes allocated.
1776    peak_allocated: AtomicU64,
1777}
1778
1779/// Memory pressure thresholds for alerts.
1780#[derive(Debug, Clone)]
1781pub struct GpuMemoryThresholds {
1782    /// Warning threshold (percentage).
1783    pub warning: f32,
1784    /// Critical threshold (percentage).
1785    pub critical: f32,
1786    /// Maximum allocation size before warning (bytes).
1787    pub max_allocation_size: usize,
1788    /// Maximum number of allocations before warning.
1789    pub max_allocation_count: u32,
1790}
1791
1792impl Default for GpuMemoryThresholds {
1793    fn default() -> Self {
1794        Self {
1795            warning: 75.0,
1796            critical: 90.0,
1797            max_allocation_size: 1024 * 1024 * 1024, // 1 GB
1798            max_allocation_count: 10000,
1799        }
1800    }
1801}
1802
1803/// Memory pressure level.
1804#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1805pub enum MemoryPressureLevel {
1806    /// Memory usage is normal.
1807    Normal,
1808    /// Memory usage is elevated (approaching warning threshold).
1809    Elevated,
1810    /// Memory usage is at warning level.
1811    Warning,
1812    /// Memory usage is critical.
1813    Critical,
1814    /// Out of memory.
1815    OutOfMemory,
1816}
1817
1818impl GpuMemoryDashboard {
1819    /// Create a new GPU memory dashboard.
1820    pub fn new() -> Arc<Self> {
1821        Arc::new(Self {
1822            allocations: RwLock::new(HashMap::new()),
1823            device_stats: RwLock::new(HashMap::new()),
1824            thresholds: GpuMemoryThresholds::default(),
1825            allocation_counter: AtomicU64::new(1),
1826            total_allocated: AtomicU64::new(0),
1827            peak_allocated: AtomicU64::new(0),
1828        })
1829    }
1830
1831    /// Create with custom thresholds.
1832    pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
1833        Arc::new(Self {
1834            allocations: RwLock::new(HashMap::new()),
1835            device_stats: RwLock::new(HashMap::new()),
1836            thresholds,
1837            allocation_counter: AtomicU64::new(1),
1838            total_allocated: AtomicU64::new(0),
1839            peak_allocated: AtomicU64::new(0),
1840        })
1841    }
1842
1843    /// Track a new GPU memory allocation.
1844    pub fn track_allocation(
1845        &self,
1846        id: u64,
1847        name: impl Into<String>,
1848        size: usize,
1849        memory_type: GpuMemoryType,
1850        device_index: u32,
1851        kernel_id: Option<&str>,
1852    ) {
1853        let allocation = GpuMemoryAllocation {
1854            id,
1855            name: name.into(),
1856            size,
1857            memory_type,
1858            device_index,
1859            kernel_id: kernel_id.map(String::from),
1860            allocated_at: Instant::now(),
1861            in_use: true,
1862        };
1863
1864        self.allocations.write().insert(id, allocation);
1865
1866        // Update totals
1867        let new_total = self
1868            .total_allocated
1869            .fetch_add(size as u64, Ordering::Relaxed)
1870            + size as u64;
1871        let mut peak = self.peak_allocated.load(Ordering::Relaxed);
1872        while new_total > peak {
1873            match self.peak_allocated.compare_exchange_weak(
1874                peak,
1875                new_total,
1876                Ordering::Relaxed,
1877                Ordering::Relaxed,
1878            ) {
1879                Ok(_) => break,
1880                Err(current) => peak = current,
1881            }
1882        }
1883    }
1884
1885    /// Generate a new unique allocation ID.
1886    pub fn next_allocation_id(&self) -> u64 {
1887        self.allocation_counter.fetch_add(1, Ordering::Relaxed)
1888    }
1889
1890    /// Track deallocation.
1891    pub fn track_deallocation(&self, id: u64) {
1892        let mut allocations = self.allocations.write();
1893        if let Some(alloc) = allocations.remove(&id) {
1894            self.total_allocated
1895                .fetch_sub(alloc.size as u64, Ordering::Relaxed);
1896        }
1897    }
1898
1899    /// Mark an allocation as no longer in use (but not freed).
1900    pub fn mark_unused(&self, id: u64) {
1901        let mut allocations = self.allocations.write();
1902        if let Some(alloc) = allocations.get_mut(&id) {
1903            alloc.in_use = false;
1904        }
1905    }
1906
1907    /// Register a GPU device.
1908    pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
1909        let stats = GpuDeviceMemoryStats {
1910            device_index,
1911            device_name: name.into(),
1912            total_memory,
1913            free_memory: total_memory,
1914            ringkernel_used: 0,
1915            other_used: 0,
1916            pools: Vec::new(),
1917        };
1918        self.device_stats.write().insert(device_index, stats);
1919    }
1920
1921    /// Update device memory statistics.
1922    pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
1923        let mut stats = self.device_stats.write();
1924        if let Some(device) = stats.get_mut(&device_index) {
1925            device.free_memory = free_memory;
1926            device.ringkernel_used = ringkernel_used;
1927            device.other_used = device
1928                .total_memory
1929                .saturating_sub(free_memory + ringkernel_used);
1930        }
1931    }
1932
1933    /// Get device statistics.
1934    pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
1935        self.device_stats.read().get(&device_index).cloned()
1936    }
1937
1938    /// Get all device statistics.
1939    pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
1940        self.device_stats.read().values().cloned().collect()
1941    }
1942
1943    /// Get all active allocations.
1944    pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
1945        self.allocations.read().values().cloned().collect()
1946    }
1947
1948    /// Get allocations for a specific kernel.
1949    pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
1950        self.allocations
1951            .read()
1952            .values()
1953            .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
1954            .cloned()
1955            .collect()
1956    }
1957
1958    /// Get total allocated memory.
1959    pub fn total_allocated(&self) -> u64 {
1960        self.total_allocated.load(Ordering::Relaxed)
1961    }
1962
1963    /// Get peak allocated memory.
1964    pub fn peak_allocated(&self) -> u64 {
1965        self.peak_allocated.load(Ordering::Relaxed)
1966    }
1967
1968    /// Get allocation count.
1969    pub fn allocation_count(&self) -> usize {
1970        self.allocations.read().len()
1971    }
1972
1973    /// Check memory pressure level for a device.
1974    pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
1975        let stats = self.device_stats.read();
1976        if let Some(device) = stats.get(&device_index) {
1977            let utilization = device.utilization();
1978            if device.free_memory == 0 {
1979                MemoryPressureLevel::OutOfMemory
1980            } else if utilization >= self.thresholds.critical {
1981                MemoryPressureLevel::Critical
1982            } else if utilization >= self.thresholds.warning {
1983                MemoryPressureLevel::Warning
1984            } else if utilization >= self.thresholds.warning * 0.8 {
1985                MemoryPressureLevel::Elevated
1986            } else {
1987                MemoryPressureLevel::Normal
1988            }
1989        } else {
1990            MemoryPressureLevel::Normal
1991        }
1992    }
1993
1994    /// Generate Grafana dashboard panel for GPU memory.
1995    pub fn grafana_panel(&self) -> GrafanaPanel {
1996        GrafanaPanel {
1997            title: "GPU Memory Usage".to_string(),
1998            panel_type: PanelType::BarGauge,
1999            queries: vec![
2000                "ringkernel_gpu_memory_allocated_bytes".to_string(),
2001                "ringkernel_gpu_memory_peak_bytes".to_string(),
2002            ],
2003            grid_pos: (0, 0, 12, 8),
2004            unit: Some("bytes".to_string()),
2005        }
2006    }
2007
2008    /// Generate Prometheus metrics for GPU memory.
2009    pub fn prometheus_metrics(&self) -> String {
2010        let mut output = String::new();
2011
2012        // Total allocated
2013        writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").unwrap();
2014        writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge").unwrap();
2015        writeln!(
2016            output,
2017            "ringkernel_gpu_memory_allocated_bytes {}",
2018            self.total_allocated()
2019        )
2020        .unwrap();
2021
2022        // Peak allocated
2023        writeln!(
2024            output,
2025            "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
2026        )
2027        .unwrap();
2028        writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge").unwrap();
2029        writeln!(
2030            output,
2031            "ringkernel_gpu_memory_peak_bytes {}",
2032            self.peak_allocated()
2033        )
2034        .unwrap();
2035
2036        // Allocation count
2037        writeln!(
2038            output,
2039            "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
2040        )
2041        .unwrap();
2042        writeln!(
2043            output,
2044            "# TYPE ringkernel_gpu_memory_allocation_count gauge"
2045        )
2046        .unwrap();
2047        writeln!(
2048            output,
2049            "ringkernel_gpu_memory_allocation_count {}",
2050            self.allocation_count()
2051        )
2052        .unwrap();
2053
2054        // Per-device stats
2055        let device_stats = self.device_stats.read();
2056        for device in device_stats.values() {
2057            writeln!(
2058                output,
2059                "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
2060                device.device_name, device.total_memory
2061            )
2062            .unwrap();
2063            writeln!(
2064                output,
2065                "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
2066                device.device_name, device.free_memory
2067            )
2068            .unwrap();
2069            writeln!(
2070                output,
2071                "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
2072                device.device_name,
2073                device.used_memory()
2074            )
2075            .unwrap();
2076            writeln!(
2077                output,
2078                "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
2079                device.device_name,
2080                device.utilization()
2081            )
2082            .unwrap();
2083        }
2084
2085        output
2086    }
2087
2088    /// Generate a memory summary report.
2089    pub fn summary_report(&self) -> String {
2090        let mut report = String::new();
2091
2092        writeln!(report, "=== GPU Memory Dashboard ===").unwrap();
2093        writeln!(report, "Total Allocated: {} bytes", self.total_allocated()).unwrap();
2094        writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated()).unwrap();
2095        writeln!(report, "Active Allocations: {}", self.allocation_count()).unwrap();
2096        writeln!(report).unwrap();
2097
2098        // Device summary
2099        let device_stats = self.device_stats.read();
2100        for device in device_stats.values() {
2101            writeln!(
2102                report,
2103                "--- Device {} ({}) ---",
2104                device.device_index, device.device_name
2105            )
2106            .unwrap();
2107            writeln!(
2108                report,
2109                "  Total: {} MB",
2110                device.total_memory / (1024 * 1024)
2111            )
2112            .unwrap();
2113            writeln!(report, "  Free: {} MB", device.free_memory / (1024 * 1024)).unwrap();
2114            writeln!(
2115                report,
2116                "  RingKernel: {} MB",
2117                device.ringkernel_used / (1024 * 1024)
2118            )
2119            .unwrap();
2120            writeln!(report, "  Utilization: {:.1}%", device.utilization()).unwrap();
2121            writeln!(
2122                report,
2123                "  Pressure: {:?}",
2124                self.check_pressure(device.device_index)
2125            )
2126            .unwrap();
2127        }
2128
2129        // Top allocations by size
2130        let allocations = self.allocations.read();
2131        let mut sorted_allocs: Vec<_> = allocations.values().collect();
2132        sorted_allocs.sort_by_key(|a| std::cmp::Reverse(a.size));
2133
2134        if !sorted_allocs.is_empty() {
2135            writeln!(report).unwrap();
2136            writeln!(report, "--- Top 10 Allocations ---").unwrap();
2137            for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
2138                writeln!(
2139                    report,
2140                    "  {}. {} - {} bytes ({:?})",
2141                    i + 1,
2142                    alloc.name,
2143                    alloc.size,
2144                    alloc.memory_type
2145                )
2146                .unwrap();
2147            }
2148        }
2149
2150        report
2151    }
2152}
2153
2154impl Default for GpuMemoryDashboard {
2155    fn default() -> Self {
2156        Self {
2157            allocations: RwLock::new(HashMap::new()),
2158            device_stats: RwLock::new(HashMap::new()),
2159            thresholds: GpuMemoryThresholds::default(),
2160            allocation_counter: AtomicU64::new(1),
2161            total_allocated: AtomicU64::new(0),
2162            peak_allocated: AtomicU64::new(0),
2163        }
2164    }
2165}
2166
2167// ============================================================================
2168// OTLP (OpenTelemetry Protocol) Exporter
2169// ============================================================================
2170
2171/// OTLP transport protocol.
2172#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
2173pub enum OtlpTransport {
2174    /// HTTP with JSON encoding (default, no extra dependencies).
2175    #[default]
2176    HttpJson,
2177    /// HTTP with Protobuf encoding (requires protobuf support).
2178    HttpProtobuf,
2179    /// gRPC transport (requires tonic).
2180    Grpc,
2181}
2182
2183/// Configuration for OTLP exporter.
2184#[derive(Debug, Clone)]
2185pub struct OtlpConfig {
2186    /// OTLP endpoint URL (e.g., "http://localhost:4318/v1/traces").
2187    pub endpoint: String,
2188    /// Transport protocol.
2189    pub transport: OtlpTransport,
2190    /// Service name for resource attributes.
2191    pub service_name: String,
2192    /// Service version.
2193    pub service_version: String,
2194    /// Service instance ID.
2195    pub service_instance_id: Option<String>,
2196    /// Additional resource attributes.
2197    pub resource_attributes: Vec<(String, String)>,
2198    /// Export batch size.
2199    pub batch_size: usize,
2200    /// Export interval.
2201    pub export_interval: Duration,
2202    /// Request timeout.
2203    pub timeout: Duration,
2204    /// Maximum retry attempts.
2205    pub max_retries: u32,
2206    /// Retry delay (base for exponential backoff).
2207    pub retry_delay: Duration,
2208    /// Optional authorization header.
2209    pub authorization: Option<String>,
2210}
2211
2212impl Default for OtlpConfig {
2213    fn default() -> Self {
2214        Self {
2215            endpoint: "http://localhost:4318/v1/traces".to_string(),
2216            transport: OtlpTransport::HttpJson,
2217            service_name: "ringkernel".to_string(),
2218            service_version: env!("CARGO_PKG_VERSION").to_string(),
2219            service_instance_id: None,
2220            resource_attributes: Vec::new(),
2221            batch_size: 512,
2222            export_interval: Duration::from_secs(5),
2223            timeout: Duration::from_secs(30),
2224            max_retries: 3,
2225            retry_delay: Duration::from_millis(100),
2226            authorization: None,
2227        }
2228    }
2229}
2230
2231impl OtlpConfig {
2232    /// Create a new OTLP configuration.
2233    pub fn new(endpoint: impl Into<String>) -> Self {
2234        Self {
2235            endpoint: endpoint.into(),
2236            ..Default::default()
2237        }
2238    }
2239
2240    /// Set the service name.
2241    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
2242        self.service_name = name.into();
2243        self
2244    }
2245
2246    /// Set the service version.
2247    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
2248        self.service_version = version.into();
2249        self
2250    }
2251
2252    /// Set the service instance ID.
2253    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
2254        self.service_instance_id = Some(id.into());
2255        self
2256    }
2257
2258    /// Add a resource attribute.
2259    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
2260        self.resource_attributes.push((key.into(), value.into()));
2261        self
2262    }
2263
2264    /// Set the batch size.
2265    pub fn with_batch_size(mut self, size: usize) -> Self {
2266        self.batch_size = size;
2267        self
2268    }
2269
2270    /// Set the export interval.
2271    pub fn with_export_interval(mut self, interval: Duration) -> Self {
2272        self.export_interval = interval;
2273        self
2274    }
2275
2276    /// Set the authorization header.
2277    pub fn with_authorization(mut self, auth: impl Into<String>) -> Self {
2278        self.authorization = Some(auth.into());
2279        self
2280    }
2281
2282    /// Configure for Jaeger OTLP endpoint.
2283    pub fn jaeger(endpoint: impl Into<String>) -> Self {
2284        Self::new(endpoint).with_service_name("ringkernel")
2285    }
2286
2287    /// Configure for Honeycomb.
2288    pub fn honeycomb(api_key: impl Into<String>) -> Self {
2289        Self::new("https://api.honeycomb.io/v1/traces")
2290            .with_authorization(format!("x-honeycomb-team {}", api_key.into()))
2291    }
2292
2293    /// Configure for Grafana Cloud.
2294    pub fn grafana_cloud(instance_id: impl Into<String>, api_key: impl Into<String>) -> Self {
2295        let instance = instance_id.into();
2296        Self::new("https://otlp-gateway-prod-us-central-0.grafana.net/otlp/v1/traces")
2297            .with_authorization(format!("Basic {}", api_key.into()))
2298            .with_attribute("grafana.instance", instance)
2299    }
2300}
2301
2302/// OTLP export result.
2303#[derive(Debug, Clone)]
2304pub struct OtlpExportResult {
2305    /// Number of spans exported.
2306    pub spans_exported: usize,
2307    /// Whether the export succeeded.
2308    pub success: bool,
2309    /// Error message if export failed.
2310    pub error: Option<String>,
2311    /// Export duration.
2312    pub duration: Duration,
2313    /// Number of retry attempts.
2314    pub retry_count: u32,
2315}
2316
2317/// Statistics for the OTLP exporter.
2318#[derive(Debug, Clone, Default)]
2319pub struct OtlpExporterStats {
2320    /// Total spans exported.
2321    pub total_spans_exported: u64,
2322    /// Total export attempts.
2323    pub total_exports: u64,
2324    /// Successful exports.
2325    pub successful_exports: u64,
2326    /// Failed exports.
2327    pub failed_exports: u64,
2328    /// Total retry attempts.
2329    pub total_retries: u64,
2330    /// Spans currently in buffer.
2331    pub buffered_spans: usize,
2332    /// Last export time.
2333    pub last_export: Option<Instant>,
2334    /// Last error message.
2335    pub last_error: Option<String>,
2336}
2337
2338/// OTLP span exporter for sending traces to OTLP-compatible backends.
2339///
2340/// Supports HTTP/JSON transport with automatic batching and retries.
2341pub struct OtlpExporter {
2342    config: OtlpConfig,
2343    buffer: RwLock<Vec<Span>>,
2344    stats: RwLock<OtlpExporterStats>,
2345}
2346
2347impl OtlpExporter {
2348    /// Create a new OTLP exporter with the given configuration.
2349    pub fn new(config: OtlpConfig) -> Self {
2350        Self {
2351            config,
2352            buffer: RwLock::new(Vec::new()),
2353            stats: RwLock::new(OtlpExporterStats::default()),
2354        }
2355    }
2356
2357    /// Create an exporter for a local Jaeger instance.
2358    pub fn jaeger_local() -> Self {
2359        Self::new(OtlpConfig::jaeger("http://localhost:4318/v1/traces"))
2360    }
2361
2362    /// Get the exporter configuration.
2363    pub fn config(&self) -> &OtlpConfig {
2364        &self.config
2365    }
2366
2367    /// Get current statistics.
2368    pub fn stats(&self) -> OtlpExporterStats {
2369        self.stats.read().clone()
2370    }
2371
2372    /// Add a span to the export buffer.
2373    pub fn export_span(&self, span: Span) {
2374        let mut buffer = self.buffer.write();
2375        buffer.push(span);
2376
2377        let should_flush = buffer.len() >= self.config.batch_size;
2378        drop(buffer);
2379
2380        if should_flush {
2381            let _ = self.flush();
2382        }
2383    }
2384
2385    /// Add multiple spans to the export buffer.
2386    pub fn export_spans(&self, spans: Vec<Span>) {
2387        let mut buffer = self.buffer.write();
2388        buffer.extend(spans);
2389
2390        let should_flush = buffer.len() >= self.config.batch_size;
2391        drop(buffer);
2392
2393        if should_flush {
2394            let _ = self.flush();
2395        }
2396    }
2397
2398    /// Get the number of buffered spans.
2399    pub fn buffered_count(&self) -> usize {
2400        self.buffer.read().len()
2401    }
2402
2403    /// Flush all buffered spans to the OTLP endpoint.
2404    pub fn flush(&self) -> OtlpExportResult {
2405        let spans: Vec<Span> = {
2406            let mut buffer = self.buffer.write();
2407            std::mem::take(&mut *buffer)
2408        };
2409
2410        if spans.is_empty() {
2411            return OtlpExportResult {
2412                spans_exported: 0,
2413                success: true,
2414                error: None,
2415                duration: Duration::ZERO,
2416                retry_count: 0,
2417            };
2418        }
2419
2420        let start = Instant::now();
2421        let result = self.send_spans(&spans);
2422        let duration = start.elapsed();
2423
2424        // Update stats
2425        {
2426            let mut stats = self.stats.write();
2427            stats.total_exports += 1;
2428            stats.last_export = Some(Instant::now());
2429
2430            if result.success {
2431                stats.successful_exports += 1;
2432                stats.total_spans_exported += spans.len() as u64;
2433            } else {
2434                stats.failed_exports += 1;
2435                stats.last_error = result.error.clone();
2436                // Put spans back in buffer for retry
2437                let mut buffer = self.buffer.write();
2438                buffer.extend(spans);
2439            }
2440            stats.total_retries += result.retry_count as u64;
2441            stats.buffered_spans = self.buffer.read().len();
2442        }
2443
2444        OtlpExportResult {
2445            spans_exported: if result.success {
2446                result.spans_exported
2447            } else {
2448                0
2449            },
2450            duration,
2451            ..result
2452        }
2453    }
2454
2455    /// Send spans to the OTLP endpoint.
2456    fn send_spans(&self, spans: &[Span]) -> OtlpExportResult {
2457        // Without the alerting feature (reqwest), we can only buffer spans
2458        #[cfg(not(feature = "alerting"))]
2459        {
2460            eprintln!(
2461                "[OTLP stub] Would export {} spans to {} (enable 'alerting' feature for HTTP export)",
2462                spans.len(),
2463                self.config.endpoint
2464            );
2465            OtlpExportResult {
2466                spans_exported: spans.len(),
2467                success: true,
2468                error: None,
2469                duration: Duration::ZERO,
2470                retry_count: 0,
2471            }
2472        }
2473
2474        #[cfg(feature = "alerting")]
2475        {
2476            self.send_spans_http(spans)
2477        }
2478    }
2479
2480    /// Send spans via HTTP (requires alerting feature).
2481    #[cfg(feature = "alerting")]
2482    fn send_spans_http(&self, spans: &[Span]) -> OtlpExportResult {
2483        let payload = self.build_otlp_json(spans);
2484
2485        let client = reqwest::blocking::Client::builder()
2486            .timeout(self.config.timeout)
2487            .build();
2488
2489        let client = match client {
2490            Ok(c) => c,
2491            Err(e) => {
2492                return OtlpExportResult {
2493                    spans_exported: 0,
2494                    success: false,
2495                    error: Some(format!("Failed to create HTTP client: {}", e)),
2496                    duration: Duration::ZERO,
2497                    retry_count: 0,
2498                };
2499            }
2500        };
2501
2502        let mut retry_count = 0;
2503        let mut last_error = None;
2504
2505        for attempt in 0..=self.config.max_retries {
2506            let mut request = client
2507                .post(&self.config.endpoint)
2508                .header("Content-Type", "application/json")
2509                .body(payload.clone());
2510
2511            if let Some(auth) = &self.config.authorization {
2512                request = request.header("Authorization", auth);
2513            }
2514
2515            match request.send() {
2516                Ok(response) => {
2517                    if response.status().is_success() {
2518                        return OtlpExportResult {
2519                            spans_exported: spans.len(),
2520                            success: true,
2521                            error: None,
2522                            duration: Duration::ZERO,
2523                            retry_count,
2524                        };
2525                    } else {
2526                        last_error = Some(format!(
2527                            "HTTP {}: {}",
2528                            response.status(),
2529                            response.status().as_str()
2530                        ));
2531                    }
2532                }
2533                Err(e) => {
2534                    last_error = Some(format!("Request failed: {}", e));
2535                }
2536            }
2537
2538            if attempt < self.config.max_retries {
2539                retry_count += 1;
2540                std::thread::sleep(self.config.retry_delay * (1 << attempt));
2541            }
2542        }
2543
2544        OtlpExportResult {
2545            spans_exported: 0,
2546            success: false,
2547            error: last_error,
2548            duration: Duration::ZERO,
2549            retry_count,
2550        }
2551    }
2552
2553    /// Build OTLP JSON payload.
2554    #[cfg(feature = "alerting")]
2555    fn build_otlp_json(&self, spans: &[Span]) -> String {
2556        use std::fmt::Write;
2557
2558        let mut json = String::with_capacity(4096);
2559
2560        // Resource spans structure
2561        json.push_str(r#"{"resourceSpans":[{"resource":{"attributes":["#);
2562
2563        // Service name
2564        let _ = write!(
2565            json,
2566            r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}}"#,
2567            escape_json_str(&self.config.service_name)
2568        );
2569
2570        // Service version
2571        let _ = write!(
2572            json,
2573            r#",{{"key":"service.version","value":{{"stringValue":"{}"}}}}"#,
2574            escape_json_str(&self.config.service_version)
2575        );
2576
2577        // Instance ID
2578        if let Some(instance_id) = &self.config.service_instance_id {
2579            let _ = write!(
2580                json,
2581                r#",{{"key":"service.instance.id","value":{{"stringValue":"{}"}}}}"#,
2582                escape_json_str(instance_id)
2583            );
2584        }
2585
2586        // Additional attributes
2587        for (key, value) in &self.config.resource_attributes {
2588            let _ = write!(
2589                json,
2590                r#",{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
2591                escape_json_str(key),
2592                escape_json_str(value)
2593            );
2594        }
2595
2596        json.push_str(r#"]},"scopeSpans":[{"scope":{"name":"ringkernel"},"spans":["#);
2597
2598        // Add spans
2599        let mut first = true;
2600        for span in spans {
2601            if !first {
2602                json.push(',');
2603            }
2604            first = false;
2605            self.span_to_json(&mut json, span);
2606        }
2607
2608        json.push_str("]}]}]}");
2609        json
2610    }
2611
2612    /// Convert a span to OTLP JSON format.
2613    #[cfg(feature = "alerting")]
2614    fn span_to_json(&self, json: &mut String, span: &Span) {
2615        use std::fmt::Write;
2616
2617        let _ = write!(
2618            json,
2619            r#"{{"traceId":"{}","spanId":"{}""#,
2620            span.trace_id.to_hex(),
2621            span.span_id.to_hex()
2622        );
2623
2624        if let Some(parent) = span.parent_span_id {
2625            let _ = write!(json, r#","parentSpanId":"{}""#, parent.to_hex());
2626        }
2627
2628        let _ = write!(
2629            json,
2630            r#","name":"{}","kind":{}"#,
2631            escape_json_str(&span.name),
2632            match span.kind {
2633                SpanKind::Internal => 1,
2634                SpanKind::Server => 2,
2635                SpanKind::Client => 3,
2636                SpanKind::Producer => 4,
2637                SpanKind::Consumer => 5,
2638            }
2639        );
2640
2641        // Convert timestamps to nanoseconds since epoch
2642        let start_nanos = span.start_time.elapsed().as_nanos();
2643        let end_nanos = span
2644            .end_time
2645            .map(|t| t.elapsed().as_nanos())
2646            .unwrap_or(start_nanos);
2647
2648        // Note: These are approximate since we use Instant, not SystemTime
2649        let _ = write!(
2650            json,
2651            r#","startTimeUnixNano":"{}","endTimeUnixNano":"{}""#,
2652            start_nanos, end_nanos
2653        );
2654
2655        // Status
2656        let _ = write!(
2657            json,
2658            r#","status":{{"code":{}}}"#,
2659            match &span.status {
2660                SpanStatus::Unset => 0,
2661                SpanStatus::Ok => 1,
2662                SpanStatus::Error { .. } => 2,
2663            }
2664        );
2665
2666        // Attributes
2667        if !span.attributes.is_empty() {
2668            json.push_str(r#","attributes":["#);
2669            let mut first = true;
2670            for (key, value) in &span.attributes {
2671                if !first {
2672                    json.push(',');
2673                }
2674                first = false;
2675                let _ = write!(
2676                    json,
2677                    r#"{{"key":"{}","value":{}}}"#,
2678                    escape_json_str(key),
2679                    attribute_value_to_json(value)
2680                );
2681            }
2682            json.push(']');
2683        }
2684
2685        // Events
2686        if !span.events.is_empty() {
2687            json.push_str(r#","events":["#);
2688            let mut first = true;
2689            for event in &span.events {
2690                if !first {
2691                    json.push(',');
2692                }
2693                first = false;
2694                let _ = write!(
2695                    json,
2696                    r#"{{"name":"{}","timeUnixNano":"{}"}}"#,
2697                    escape_json_str(&event.name),
2698                    event.timestamp.elapsed().as_nanos()
2699                );
2700            }
2701            json.push(']');
2702        }
2703
2704        json.push('}');
2705    }
2706}
2707
2708/// Helper to escape JSON strings.
2709#[cfg(feature = "alerting")]
2710fn escape_json_str(s: &str) -> String {
2711    s.replace('\\', "\\\\")
2712        .replace('"', "\\\"")
2713        .replace('\n', "\\n")
2714        .replace('\r', "\\r")
2715        .replace('\t', "\\t")
2716}
2717
2718/// Convert AttributeValue to OTLP JSON format.
2719#[cfg(feature = "alerting")]
2720fn attribute_value_to_json(value: &AttributeValue) -> String {
2721    match value {
2722        AttributeValue::String(s) => format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)),
2723        AttributeValue::Int(i) => format!(r#"{{"intValue":"{}"}}"#, i),
2724        AttributeValue::Float(f) => format!(r#"{{"doubleValue":{}}}"#, f),
2725        AttributeValue::Bool(b) => format!(r#"{{"boolValue":{}}}"#, b),
2726        AttributeValue::StringArray(arr) => {
2727            let values: Vec<String> = arr
2728                .iter()
2729                .map(|s| format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)))
2730                .collect();
2731            format!(r#"{{"arrayValue":{{"values":[{}]}}}}"#, values.join(","))
2732        }
2733    }
2734}
2735
2736#[cfg(test)]
2737mod tests {
2738    use super::*;
2739    use crate::runtime::KernelId;
2740
2741    #[test]
2742    fn test_trace_id_generation() {
2743        let id1 = TraceId::new();
2744        let id2 = TraceId::new();
2745        assert_ne!(id1.0, id2.0);
2746    }
2747
2748    #[test]
2749    fn test_trace_id_hex() {
2750        let id = TraceId(0x123456789abcdef0123456789abcdef0);
2751        let hex = id.to_hex();
2752        assert_eq!(hex.len(), 32);
2753        let parsed = TraceId::from_hex(&hex).unwrap();
2754        assert_eq!(id, parsed);
2755    }
2756
2757    #[test]
2758    fn test_span_creation() {
2759        let span = Span::new("test_operation", SpanKind::Internal);
2760        assert!(!span.is_ended());
2761        assert_eq!(span.name, "test_operation");
2762    }
2763
2764    #[test]
2765    fn test_span_child() {
2766        let parent = Span::new("parent", SpanKind::Server);
2767        let child = parent.child("child", SpanKind::Internal);
2768
2769        assert_eq!(child.trace_id, parent.trace_id);
2770        assert_eq!(child.parent_span_id, Some(parent.span_id));
2771    }
2772
2773    #[test]
2774    fn test_span_attributes() {
2775        let mut span = Span::new("test", SpanKind::Internal);
2776        span.set_attribute("string_key", "value");
2777        span.set_attribute("int_key", 42i64);
2778        span.set_attribute("bool_key", true);
2779
2780        assert_eq!(span.attributes.len(), 3);
2781    }
2782
2783    #[test]
2784    fn test_span_events() {
2785        let mut span = Span::new("test", SpanKind::Internal);
2786        span.add_event("event1");
2787        span.add_event("event2");
2788
2789        assert_eq!(span.events.len(), 2);
2790    }
2791
2792    #[test]
2793    fn test_span_builder() {
2794        let parent = Span::new("parent", SpanKind::Server);
2795        let span = SpanBuilder::new("child")
2796            .kind(SpanKind::Client)
2797            .parent(&parent)
2798            .attribute("key", "value")
2799            .build();
2800
2801        assert_eq!(span.trace_id, parent.trace_id);
2802        assert_eq!(span.kind, SpanKind::Client);
2803        assert!(span.attributes.contains_key("key"));
2804    }
2805
2806    #[test]
2807    fn test_prometheus_exporter() {
2808        let exporter = PrometheusExporter::new();
2809        exporter.register_counter("test_counter", "A test counter", &["label1"]);
2810        exporter.register_gauge("test_gauge", "A test gauge", &[]);
2811
2812        exporter.inc_counter("test_counter", &["value1"]);
2813        exporter.inc_counter("test_counter", &["value1"]);
2814        exporter.set_metric("test_gauge", 42.0, &[]);
2815
2816        let output = exporter.render();
2817        assert!(output.contains("test_counter"));
2818        assert!(output.contains("test_gauge"));
2819    }
2820
2821    #[test]
2822    fn test_grafana_dashboard() {
2823        let dashboard = GrafanaDashboard::new("Test Dashboard")
2824            .description("A test dashboard")
2825            .add_throughput_panel()
2826            .add_latency_panel()
2827            .build();
2828
2829        assert!(dashboard.contains("Test Dashboard"));
2830        assert!(dashboard.contains("Message Throughput"));
2831        assert!(dashboard.contains("Message Latency"));
2832    }
2833
2834    #[test]
2835    fn test_observability_context() {
2836        let ctx = ObservabilityContext::new();
2837
2838        let span = ctx.start_span("test", SpanKind::Internal);
2839        assert_eq!(ctx.active_span_count(), 1);
2840
2841        ctx.end_span(span);
2842        assert_eq!(ctx.active_span_count(), 0);
2843
2844        let exported = ctx.export_spans();
2845        assert_eq!(exported.len(), 1);
2846    }
2847
2848    #[test]
2849    fn test_ringkernel_collector() {
2850        let collector = Arc::new(MetricsCollector::new());
2851        let kernel_id = KernelId::new("test");
2852
2853        collector.record_message_processed(&kernel_id, 100);
2854        collector.record_message_processed(&kernel_id, 200);
2855
2856        let prom_collector = RingKernelCollector::new(collector);
2857        let defs = prom_collector.definitions();
2858        let samples = prom_collector.collect();
2859
2860        assert!(!defs.is_empty());
2861        assert!(!samples.is_empty());
2862    }
2863
2864    // GPU Profiler tests
2865
2866    #[test]
2867    fn test_profiler_color() {
2868        let color = ProfilerColor::new(128, 64, 32);
2869        assert_eq!(color.r, 128);
2870        assert_eq!(color.g, 64);
2871        assert_eq!(color.b, 32);
2872        assert_eq!(color.a, 255);
2873
2874        assert_eq!(ProfilerColor::RED.r, 255);
2875        assert_eq!(ProfilerColor::GREEN.g, 255);
2876        assert_eq!(ProfilerColor::BLUE.b, 255);
2877    }
2878
2879    #[test]
2880    fn test_null_profiler() {
2881        let profiler = NullProfiler;
2882        assert!(!profiler.is_available());
2883        assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);
2884
2885        // All operations should be no-ops
2886        assert!(profiler.start_capture().is_ok());
2887        assert!(profiler.end_capture().is_ok());
2888        assert!(profiler.trigger_capture().is_ok());
2889
2890        let range = profiler.push_range("test", ProfilerColor::RED);
2891        let _elapsed = range.elapsed(); // Just verify it doesn't panic
2892        profiler.pop_range();
2893        profiler.mark("marker", ProfilerColor::BLUE);
2894        profiler.set_thread_name("thread");
2895    }
2896
2897    #[test]
2898    fn test_nvtx_profiler_stub() {
2899        let profiler = NvtxProfiler::new();
2900        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);
2901
2902        // Not available by default (stub)
2903        assert!(!profiler.is_available());
2904        assert!(!profiler.is_nvtx_loaded());
2905
2906        // Start capture should fail when not available
2907        assert!(matches!(
2908            profiler.start_capture(),
2909            Err(ProfilerError::NotAvailable)
2910        ));
2911    }
2912
2913    #[test]
2914    fn test_renderdoc_profiler_stub() {
2915        let profiler = RenderDocProfiler::new();
2916        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);
2917
2918        // Not attached by default (stub)
2919        assert!(!profiler.is_available());
2920        assert!(!profiler.is_attached());
2921        assert!(profiler.get_capture_path().is_none());
2922
2923        // Launch UI should fail when not attached
2924        assert!(matches!(
2925            profiler.launch_ui(),
2926            Err(ProfilerError::NotAttached)
2927        ));
2928    }
2929
2930    #[test]
2931    fn test_gpu_profiler_manager() {
2932        let manager = GpuProfilerManager::new();
2933
2934        // Default should be null profiler (since stubs report unavailable)
2935        assert!(!manager.is_enabled());
2936        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
2937
2938        // Can enable/disable
2939        manager.set_enabled(true);
2940        assert!(manager.is_enabled());
2941        manager.set_enabled(false);
2942        assert!(!manager.is_enabled());
2943    }
2944
2945    #[test]
2946    fn test_profiler_scope() {
2947        let manager = GpuProfilerManager::new();
2948
2949        // Scopes should work even when profiler is not available
2950        {
2951            let _scope = manager.scope("test_scope");
2952            // Scope automatically pops on drop
2953        }
2954
2955        {
2956            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
2957        }
2958
2959        // Mark should also work
2960        manager.mark("test_marker");
2961    }
2962
2963    #[test]
2964    fn test_profiler_with_custom() {
2965        let custom_profiler = Arc::new(NullProfiler);
2966        let manager = GpuProfilerManager::with_profiler(custom_profiler);
2967
2968        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
2969    }
2970
2971    #[test]
2972    fn test_profiler_range_elapsed() {
2973        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
2974        std::thread::sleep(std::time::Duration::from_millis(10));
2975        let elapsed = range.elapsed();
2976        assert!(elapsed.as_millis() >= 10);
2977    }
2978
2979    #[test]
2980    fn test_profiler_error_display() {
2981        let err = ProfilerError::NotAvailable;
2982        assert!(err.to_string().contains("not available"));
2983
2984        let err = ProfilerError::NotAttached;
2985        assert!(err.to_string().contains("not attached"));
2986
2987        let err = ProfilerError::CaptureInProgress;
2988        assert!(err.to_string().contains("in progress"));
2989
2990        let err = ProfilerError::Backend("test error".to_string());
2991        assert!(err.to_string().contains("test error"));
2992    }
2993
2994    // GPU Memory Dashboard tests
2995
2996    #[test]
2997    fn test_gpu_memory_dashboard_creation() {
2998        let dashboard = GpuMemoryDashboard::new();
2999        assert_eq!(dashboard.total_allocated(), 0);
3000        assert_eq!(dashboard.peak_allocated(), 0);
3001        assert_eq!(dashboard.allocation_count(), 0);
3002    }
3003
3004    #[test]
3005    fn test_gpu_memory_allocation_tracking() {
3006        let dashboard = GpuMemoryDashboard::new();
3007
3008        // Track an allocation
3009        dashboard.track_allocation(
3010            1,
3011            "test_buffer",
3012            65536,
3013            GpuMemoryType::DeviceLocal,
3014            0,
3015            Some("test_kernel"),
3016        );
3017
3018        assert_eq!(dashboard.total_allocated(), 65536);
3019        assert_eq!(dashboard.peak_allocated(), 65536);
3020        assert_eq!(dashboard.allocation_count(), 1);
3021
3022        // Track another allocation
3023        dashboard.track_allocation(
3024            2,
3025            "queue_buffer",
3026            1024,
3027            GpuMemoryType::QueueBuffer,
3028            0,
3029            Some("test_kernel"),
3030        );
3031
3032        assert_eq!(dashboard.total_allocated(), 66560);
3033        assert_eq!(dashboard.peak_allocated(), 66560);
3034        assert_eq!(dashboard.allocation_count(), 2);
3035
3036        // Deallocate first buffer
3037        dashboard.track_deallocation(1);
3038        assert_eq!(dashboard.total_allocated(), 1024);
3039        assert_eq!(dashboard.peak_allocated(), 66560); // Peak should remain
3040        assert_eq!(dashboard.allocation_count(), 1);
3041    }
3042
3043    #[test]
3044    fn test_gpu_memory_device_stats() {
3045        let dashboard = GpuMemoryDashboard::new();
3046
3047        // Register a device
3048        dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024); // 24 GB
3049
3050        let stats = dashboard.get_device_stats(0).unwrap();
3051        assert_eq!(stats.device_index, 0);
3052        assert_eq!(stats.device_name, "NVIDIA RTX 4090");
3053        assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
3054        assert_eq!(stats.utilization(), 0.0);
3055
3056        // Update device stats
3057        let used = 8 * 1024 * 1024 * 1024; // 8 GB used
3058        let free = 16 * 1024 * 1024 * 1024; // 16 GB free
3059        dashboard.update_device_stats(0, free, used);
3060
3061        let stats = dashboard.get_device_stats(0).unwrap();
3062        assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
3063    }
3064
3065    #[test]
3066    fn test_gpu_memory_pressure_levels() {
3067        let dashboard = GpuMemoryDashboard::new();
3068
3069        // Register a device with 1 GB
3070        dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);
3071
3072        // Normal usage (50%)
3073        dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
3074        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);
3075
3076        // Warning level (80%)
3077        dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
3078        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);
3079
3080        // Critical level (95%)
3081        dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
3082        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);
3083
3084        // OOM
3085        dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
3086        assert_eq!(
3087            dashboard.check_pressure(0),
3088            MemoryPressureLevel::OutOfMemory
3089        );
3090    }
3091
3092    #[test]
3093    fn test_gpu_memory_kernel_allocations() {
3094        let dashboard = GpuMemoryDashboard::new();
3095
3096        // Track allocations for different kernels
3097        dashboard.track_allocation(
3098            1,
3099            "buf1",
3100            1000,
3101            GpuMemoryType::DeviceLocal,
3102            0,
3103            Some("kernel_a"),
3104        );
3105        dashboard.track_allocation(
3106            2,
3107            "buf2",
3108            2000,
3109            GpuMemoryType::DeviceLocal,
3110            0,
3111            Some("kernel_a"),
3112        );
3113        dashboard.track_allocation(
3114            3,
3115            "buf3",
3116            3000,
3117            GpuMemoryType::DeviceLocal,
3118            0,
3119            Some("kernel_b"),
3120        );
3121
3122        let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
3123        assert_eq!(kernel_a_allocs.len(), 2);
3124
3125        let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
3126        assert_eq!(kernel_b_allocs.len(), 1);
3127
3128        let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
3129        assert_eq!(kernel_c_allocs.len(), 0);
3130    }
3131
3132    #[test]
3133    fn test_gpu_memory_prometheus_metrics() {
3134        let dashboard = GpuMemoryDashboard::new();
3135        dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
3136        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3137
3138        let metrics = dashboard.prometheus_metrics();
3139        assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
3140        assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
3141        assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
3142    }
3143
3144    #[test]
3145    fn test_gpu_memory_summary_report() {
3146        let dashboard = GpuMemoryDashboard::new();
3147        dashboard.track_allocation(
3148            1,
3149            "large_buffer",
3150            1024 * 1024,
3151            GpuMemoryType::DeviceLocal,
3152            0,
3153            None,
3154        );
3155        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3156
3157        let report = dashboard.summary_report();
3158        assert!(report.contains("GPU Memory Dashboard"));
3159        assert!(report.contains("large_buffer"));
3160    }
3161
3162    #[test]
3163    fn test_gpu_memory_pool_stats() {
3164        let pool_stats = GpuMemoryPoolStats {
3165            name: "default".to_string(),
3166            capacity: 1024 * 1024,
3167            allocated: 512 * 1024,
3168            peak_allocated: 768 * 1024,
3169            allocation_count: 10,
3170            total_allocations: 100,
3171            total_deallocations: 90,
3172            fragmentation: 0.1,
3173        };
3174
3175        assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
3176    }
3177
3178    #[test]
3179    fn test_gpu_memory_types() {
3180        // Ensure all memory types are distinct
3181        let types = [
3182            GpuMemoryType::DeviceLocal,
3183            GpuMemoryType::HostVisible,
3184            GpuMemoryType::HostCoherent,
3185            GpuMemoryType::Mapped,
3186            GpuMemoryType::QueueBuffer,
3187            GpuMemoryType::ControlBlock,
3188            GpuMemoryType::SharedMemory,
3189        ];
3190
3191        for (i, t1) in types.iter().enumerate() {
3192            for (j, t2) in types.iter().enumerate() {
3193                if i != j {
3194                    assert_ne!(t1, t2);
3195                }
3196            }
3197        }
3198    }
3199
3200    #[test]
3201    fn test_gpu_memory_grafana_panel() {
3202        let dashboard = GpuMemoryDashboard::new();
3203        let panel = dashboard.grafana_panel();
3204
3205        assert_eq!(panel.title, "GPU Memory Usage");
3206        assert_eq!(panel.panel_type, PanelType::BarGauge);
3207        assert!(!panel.queries.is_empty());
3208    }
3209
3210    #[test]
3211    fn test_gpu_memory_allocation_id_generation() {
3212        let dashboard = GpuMemoryDashboard::new();
3213
3214        let id1 = dashboard.next_allocation_id();
3215        let id2 = dashboard.next_allocation_id();
3216        let id3 = dashboard.next_allocation_id();
3217
3218        assert_eq!(id1, 1);
3219        assert_eq!(id2, 2);
3220        assert_eq!(id3, 3);
3221    }
3222
3223    // OTLP Exporter tests
3224
3225    #[test]
3226    fn test_otlp_config_default() {
3227        let config = OtlpConfig::default();
3228        assert_eq!(config.endpoint, "http://localhost:4318/v1/traces");
3229        assert_eq!(config.transport, OtlpTransport::HttpJson);
3230        assert_eq!(config.service_name, "ringkernel");
3231        assert_eq!(config.batch_size, 512);
3232    }
3233
3234    #[test]
3235    fn test_otlp_config_builder() {
3236        let config = OtlpConfig::new("http://example.com/v1/traces")
3237            .with_service_name("my-service")
3238            .with_service_version("1.0.0")
3239            .with_instance_id("instance-1")
3240            .with_attribute("env", "production")
3241            .with_batch_size(100);
3242
3243        assert_eq!(config.endpoint, "http://example.com/v1/traces");
3244        assert_eq!(config.service_name, "my-service");
3245        assert_eq!(config.service_version, "1.0.0");
3246        assert_eq!(config.service_instance_id, Some("instance-1".to_string()));
3247        assert_eq!(config.resource_attributes.len(), 1);
3248        assert_eq!(config.batch_size, 100);
3249    }
3250
3251    #[test]
3252    fn test_otlp_config_jaeger() {
3253        let config = OtlpConfig::jaeger("http://jaeger:4318/v1/traces");
3254        assert_eq!(config.endpoint, "http://jaeger:4318/v1/traces");
3255        assert_eq!(config.service_name, "ringkernel");
3256    }
3257
3258    #[test]
3259    fn test_otlp_config_honeycomb() {
3260        let config = OtlpConfig::honeycomb("my-api-key");
3261        assert_eq!(config.endpoint, "https://api.honeycomb.io/v1/traces");
3262        assert_eq!(
3263            config.authorization,
3264            Some("x-honeycomb-team my-api-key".to_string())
3265        );
3266    }
3267
3268    #[test]
3269    fn test_otlp_exporter_creation() {
3270        let exporter = OtlpExporter::new(OtlpConfig::default());
3271        assert_eq!(exporter.buffered_count(), 0);
3272        assert_eq!(exporter.config().service_name, "ringkernel");
3273    }
3274
3275    #[test]
3276    fn test_otlp_exporter_jaeger_local() {
3277        let exporter = OtlpExporter::jaeger_local();
3278        assert_eq!(
3279            exporter.config().endpoint,
3280            "http://localhost:4318/v1/traces"
3281        );
3282    }
3283
3284    #[test]
3285    fn test_otlp_exporter_buffering() {
3286        let config = OtlpConfig::default().with_batch_size(10);
3287        let exporter = OtlpExporter::new(config);
3288
3289        // Create a test span using the constructor
3290        let span = Span::new("test_span", SpanKind::Internal);
3291
3292        // Add spans
3293        for _ in 0..5 {
3294            exporter.export_span(span.clone());
3295        }
3296
3297        assert_eq!(exporter.buffered_count(), 5);
3298    }
3299
3300    #[test]
3301    fn test_otlp_exporter_flush_empty() {
3302        let exporter = OtlpExporter::new(OtlpConfig::default());
3303
3304        let result = exporter.flush();
3305        assert!(result.success);
3306        assert_eq!(result.spans_exported, 0);
3307    }
3308
3309    #[test]
3310    fn test_otlp_exporter_stats() {
3311        let exporter = OtlpExporter::new(OtlpConfig::default());
3312
3313        // Initial stats
3314        let stats = exporter.stats();
3315        assert_eq!(stats.total_exports, 0);
3316        assert_eq!(stats.total_spans_exported, 0);
3317        assert_eq!(stats.buffered_spans, 0);
3318    }
3319
3320    #[test]
3321    fn test_otlp_transport_default() {
3322        let transport = OtlpTransport::default();
3323        assert_eq!(transport, OtlpTransport::HttpJson);
3324    }
3325}