// ringkernel_core/observability.rs
//! Observability infrastructure for RingKernel.
//!
//! This module provides production-ready observability features:
//!
//! - **OpenTelemetry Integration** - Distributed tracing and metrics
//! - **Prometheus Exporter** - Metrics in Prometheus exposition format
//! - **Grafana Dashboard** - JSON templates for visualization
//!
//! ## Usage
//!
//! ```ignore
//! use ringkernel_core::observability::{PrometheusExporter, GrafanaDashboard};
//!
//! // Create Prometheus exporter
//! let exporter = PrometheusExporter::new();
//! exporter.register_collector(metrics_collector);
//!
//! // Get Prometheus metrics
//! let metrics = exporter.render();
//! println!("{}", metrics);
//!
//! // Generate Grafana dashboard JSON
//! let dashboard = GrafanaDashboard::new("RingKernel Metrics")
//!     .add_kernel_status_panel()
//!     .add_latency_panel()
//!     .add_throughput_panel()
//!     .build();
//! ```

30use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
// ============================================================================
// OpenTelemetry-Compatible Span/Trace Types
// ============================================================================

/// A trace ID compatible with OpenTelemetry W3C Trace Context.
///
/// Stored as a single 128-bit value; rendered as 32 lowercase hex digits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TraceId(pub u128);

impl TraceId {
    /// Generate a new pseudo-random trace ID.
    ///
    /// Entropy comes from hashing the current wall-clock time and thread ID;
    /// this is not cryptographically random, but is cheap and dependency-free.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::thread::current().id().hash(&mut hasher);
        let high = hasher.finish() as u128;
        // Feed the first word back in so the low half differs from the high half.
        hasher.write_u64(high as u64);
        let low = hasher.finish() as u128;
        Self((high << 64) | low)
    }

    /// Parse from a hex string.
    ///
    /// Returns `None` for empty input or any non-hex character. Unlike a bare
    /// `from_str_radix`, this rejects leading `+`/`-` signs, which are not
    /// valid in the W3C Trace Context hex encoding.
    pub fn from_hex(hex: &str) -> Option<Self> {
        if hex.is_empty() || !hex.chars().all(|c| c.is_ascii_hexdigit()) {
            return None;
        }
        u128::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Convert to a 32-character lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:032x}", self.0)
    }
}

impl Default for TraceId {
    fn default() -> Self {
        Self::new()
    }
}
76
/// A span ID compatible with OpenTelemetry W3C Trace Context.
///
/// Stored as a 64-bit value; rendered as 16 lowercase hex digits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpanId(pub u64);

impl SpanId {
    /// Generate a new pseudo-random span ID.
    ///
    /// Entropy comes from hashing the current wall-clock time and process ID;
    /// not cryptographically random, but cheap and dependency-free.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        Self(hasher.finish())
    }

    /// Parse from a hex string.
    ///
    /// Returns `None` for empty input or any non-hex character. Unlike a bare
    /// `from_str_radix`, this rejects leading `+`/`-` signs, which are not
    /// valid in the W3C Trace Context hex encoding.
    pub fn from_hex(hex: &str) -> Option<Self> {
        if hex.is_empty() || !hex.chars().all(|c| c.is_ascii_hexdigit()) {
            return None;
        }
        u64::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Convert to a 16-character lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:016x}", self.0)
    }
}

impl Default for SpanId {
    fn default() -> Self {
        Self::new()
    }
}
107
/// Span kind (OpenTelemetry compatible).
///
/// Mirrors the five span kinds defined by the OpenTelemetry specification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Operation internal to the application.
    Internal,
    /// Server-side handling of an incoming request.
    Server,
    /// Client-side issuing of an outgoing request.
    Client,
    /// Asynchronous message production.
    Producer,
    /// Asynchronous message consumption.
    Consumer,
}
122
/// Span status (OpenTelemetry compatible).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// No status recorded yet.
    Unset,
    /// The operation completed successfully.
    Ok,
    /// The operation failed.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}
136
137/// An OpenTelemetry-compatible span.
138#[derive(Debug, Clone)]
139pub struct Span {
140    /// Trace ID.
141    pub trace_id: TraceId,
142    /// Span ID.
143    pub span_id: SpanId,
144    /// Parent span ID (if any).
145    pub parent_span_id: Option<SpanId>,
146    /// Span name.
147    pub name: String,
148    /// Span kind.
149    pub kind: SpanKind,
150    /// Start time.
151    pub start_time: Instant,
152    /// End time (if completed).
153    pub end_time: Option<Instant>,
154    /// Status.
155    pub status: SpanStatus,
156    /// Attributes (key-value pairs).
157    pub attributes: HashMap<String, AttributeValue>,
158    /// Events recorded during span.
159    pub events: Vec<SpanEvent>,
160}
161
/// Attribute value types.
///
/// The subset of OpenTelemetry attribute value kinds used by this module.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    /// String value.
    String(String),
    /// Signed integer value.
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// Array of strings.
    StringArray(Vec<String>),
}

impl From<&str> for AttributeValue {
    fn from(value: &str) -> Self {
        AttributeValue::String(value.to_owned())
    }
}

impl From<String> for AttributeValue {
    fn from(value: String) -> Self {
        AttributeValue::String(value)
    }
}

impl From<i64> for AttributeValue {
    fn from(value: i64) -> Self {
        AttributeValue::Int(value)
    }
}

impl From<f64> for AttributeValue {
    fn from(value: f64) -> Self {
        AttributeValue::Float(value)
    }
}

impl From<bool> for AttributeValue {
    fn from(value: bool) -> Self {
        AttributeValue::Bool(value)
    }
}
206
207/// An event that occurred during a span.
208#[derive(Debug, Clone)]
209pub struct SpanEvent {
210    /// Event name.
211    pub name: String,
212    /// Timestamp.
213    pub timestamp: Instant,
214    /// Event attributes.
215    pub attributes: HashMap<String, AttributeValue>,
216}
217
218impl Span {
219    /// Create a new span.
220    pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221        Self {
222            trace_id: TraceId::new(),
223            span_id: SpanId::new(),
224            parent_span_id: None,
225            name: name.into(),
226            kind,
227            start_time: Instant::now(),
228            end_time: None,
229            status: SpanStatus::Unset,
230            attributes: HashMap::new(),
231            events: Vec::new(),
232        }
233    }
234
235    /// Create a child span.
236    pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237        Self {
238            trace_id: self.trace_id,
239            span_id: SpanId::new(),
240            parent_span_id: Some(self.span_id),
241            name: name.into(),
242            kind,
243            start_time: Instant::now(),
244            end_time: None,
245            status: SpanStatus::Unset,
246            attributes: HashMap::new(),
247            events: Vec::new(),
248        }
249    }
250
251    /// Set an attribute.
252    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253        self.attributes.insert(key.into(), value.into());
254    }
255
256    /// Add an event.
257    pub fn add_event(&mut self, name: impl Into<String>) {
258        self.events.push(SpanEvent {
259            name: name.into(),
260            timestamp: Instant::now(),
261            attributes: HashMap::new(),
262        });
263    }
264
265    /// Add an event with attributes.
266    pub fn add_event_with_attributes(
267        &mut self,
268        name: impl Into<String>,
269        attributes: HashMap<String, AttributeValue>,
270    ) {
271        self.events.push(SpanEvent {
272            name: name.into(),
273            timestamp: Instant::now(),
274            attributes,
275        });
276    }
277
278    /// Set status to OK.
279    pub fn set_ok(&mut self) {
280        self.status = SpanStatus::Ok;
281    }
282
283    /// Set error status.
284    pub fn set_error(&mut self, message: impl Into<String>) {
285        self.status = SpanStatus::Error {
286            message: message.into(),
287        };
288    }
289
290    /// End the span.
291    pub fn end(&mut self) {
292        self.end_time = Some(Instant::now());
293    }
294
295    /// Get span duration.
296    pub fn duration(&self) -> Duration {
297        self.end_time
298            .unwrap_or_else(Instant::now)
299            .duration_since(self.start_time)
300    }
301
302    /// Check if span is ended.
303    pub fn is_ended(&self) -> bool {
304        self.end_time.is_some()
305    }
306}
307
// ============================================================================
// Span Builder
// ============================================================================

312/// Builder for creating spans with fluent API.
313pub struct SpanBuilder {
314    name: String,
315    kind: SpanKind,
316    parent: Option<(TraceId, SpanId)>,
317    attributes: HashMap<String, AttributeValue>,
318}
319
320impl SpanBuilder {
321    /// Create a new span builder.
322    pub fn new(name: impl Into<String>) -> Self {
323        Self {
324            name: name.into(),
325            kind: SpanKind::Internal,
326            parent: None,
327            attributes: HashMap::new(),
328        }
329    }
330
331    /// Set span kind.
332    pub fn kind(mut self, kind: SpanKind) -> Self {
333        self.kind = kind;
334        self
335    }
336
337    /// Set parent span.
338    pub fn parent(mut self, parent: &Span) -> Self {
339        self.parent = Some((parent.trace_id, parent.span_id));
340        self
341    }
342
343    /// Set attribute.
344    pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345        self.attributes.insert(key.into(), value.into());
346        self
347    }
348
349    /// Build the span.
350    pub fn build(self) -> Span {
351        let mut span = Span::new(self.name, self.kind);
352        if let Some((trace_id, parent_id)) = self.parent {
353            span.trace_id = trace_id;
354            span.parent_span_id = Some(parent_id);
355        }
356        span.attributes = self.attributes;
357        span
358    }
359}
360
// ============================================================================
// Prometheus Metrics Exporter
// ============================================================================

/// Prometheus metric type.
///
/// Determines the value written on the `# TYPE` line of the exposition format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Monotonically increasing counter.
    Counter,
    /// Gauge that may rise and fall.
    Gauge,
    /// Distribution of observed values.
    Histogram,
    /// Pre-computed quantiles.
    Summary,
}
377
378/// A Prometheus metric definition.
379#[derive(Debug, Clone)]
380pub struct MetricDefinition {
381    /// Metric name.
382    pub name: String,
383    /// Metric type.
384    pub metric_type: MetricType,
385    /// Help text.
386    pub help: String,
387    /// Label names.
388    pub labels: Vec<String>,
389}
390
/// A single metric sample.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Metric family name this sample belongs to.
    pub name: String,
    /// Label values, positionally matching the definition's label names.
    pub label_values: Vec<String>,
    /// Observed value.
    pub value: f64,
    /// Optional timestamp in milliseconds (not currently emitted when rendering).
    pub timestamp_ms: Option<u64>,
}
403
404/// Prometheus metrics exporter.
405pub struct PrometheusExporter {
406    /// Metric definitions.
407    definitions: RwLock<Vec<MetricDefinition>>,
408    /// Registered collectors.
409    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
410    /// Custom metrics (for direct registration).
411    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
412    /// Export timestamp.
413    export_count: AtomicU64,
414}
415
416/// A custom registered metric.
417struct CustomMetric {
418    definition: MetricDefinition,
419    samples: Vec<MetricSample>,
420}
421
422/// Trait for collecting Prometheus metrics.
423pub trait PrometheusCollector: Send + Sync {
424    /// Get metric definitions.
425    fn definitions(&self) -> Vec<MetricDefinition>;
426
427    /// Collect current metric samples.
428    fn collect(&self) -> Vec<MetricSample>;
429}
430
431impl PrometheusExporter {
432    /// Create a new Prometheus exporter.
433    pub fn new() -> Arc<Self> {
434        Arc::new(Self {
435            definitions: RwLock::new(Vec::new()),
436            collectors: RwLock::new(Vec::new()),
437            custom_metrics: RwLock::new(HashMap::new()),
438            export_count: AtomicU64::new(0),
439        })
440    }
441
442    /// Register a collector.
443    pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444        let defs = collector.definitions();
445        self.definitions.write().extend(defs);
446        self.collectors.write().push(collector);
447    }
448
449    /// Register a counter metric.
450    pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451        let def = MetricDefinition {
452            name: name.to_string(),
453            metric_type: MetricType::Counter,
454            help: help.to_string(),
455            labels: labels.iter().map(|s| s.to_string()).collect(),
456        };
457        self.custom_metrics.write().insert(
458            name.to_string(),
459            CustomMetric {
460                definition: def,
461                samples: Vec::new(),
462            },
463        );
464    }
465
466    /// Register a gauge metric.
467    pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468        let def = MetricDefinition {
469            name: name.to_string(),
470            metric_type: MetricType::Gauge,
471            help: help.to_string(),
472            labels: labels.iter().map(|s| s.to_string()).collect(),
473        };
474        self.custom_metrics.write().insert(
475            name.to_string(),
476            CustomMetric {
477                definition: def,
478                samples: Vec::new(),
479            },
480        );
481    }
482
483    /// Register a histogram metric.
484    pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485        let def = MetricDefinition {
486            name: name.to_string(),
487            metric_type: MetricType::Histogram,
488            help: help.to_string(),
489            labels: labels.iter().map(|s| s.to_string()).collect(),
490        };
491        self.custom_metrics.write().insert(
492            name.to_string(),
493            CustomMetric {
494                definition: def,
495                samples: Vec::new(),
496            },
497        );
498    }
499
500    /// Set a metric value.
501    pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502        let mut metrics = self.custom_metrics.write();
503        if let Some(metric) = metrics.get_mut(name) {
504            let sample = MetricSample {
505                name: name.to_string(),
506                label_values: label_values.iter().map(|s| s.to_string()).collect(),
507                value,
508                timestamp_ms: None,
509            };
510            // Find and replace existing sample with same labels, or add new
511            let existing = metric
512                .samples
513                .iter_mut()
514                .find(|s| s.label_values == sample.label_values);
515            if let Some(existing) = existing {
516                existing.value = value;
517            } else {
518                metric.samples.push(sample);
519            }
520        }
521    }
522
523    /// Increment a counter.
524    pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525        self.add_counter(name, 1.0, label_values);
526    }
527
528    /// Add to a counter.
529    pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530        let mut metrics = self.custom_metrics.write();
531        if let Some(metric) = metrics.get_mut(name) {
532            let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533            let existing = metric
534                .samples
535                .iter_mut()
536                .find(|s| s.label_values == label_vec);
537            if let Some(existing) = existing {
538                existing.value += delta;
539            } else {
540                metric.samples.push(MetricSample {
541                    name: name.to_string(),
542                    label_values: label_vec,
543                    value: delta,
544                    timestamp_ms: None,
545                });
546            }
547        }
548    }
549
550    /// Render metrics in Prometheus exposition format.
551    pub fn render(&self) -> String {
552        self.export_count.fetch_add(1, Ordering::Relaxed);
553
554        let mut output = String::new();
555
556        // Collect from registered collectors
557        let collectors = self.collectors.read();
558        for collector in collectors.iter() {
559            let defs = collector.definitions();
560            let samples = collector.collect();
561
562            for def in &defs {
563                // Write TYPE and HELP
564                writeln!(output, "# HELP {} {}", def.name, def.help)
565                    .expect("write to String is infallible");
566                writeln!(
567                    output,
568                    "# TYPE {} {}",
569                    def.name,
570                    match def.metric_type {
571                        MetricType::Counter => "counter",
572                        MetricType::Gauge => "gauge",
573                        MetricType::Histogram => "histogram",
574                        MetricType::Summary => "summary",
575                    }
576                )
577                .expect("write to String is infallible");
578
579                // Write samples for this metric
580                for sample in samples.iter().filter(|s| s.name == def.name) {
581                    Self::write_sample(&mut output, &def.labels, sample);
582                }
583            }
584        }
585
586        // Collect custom metrics
587        let custom = self.custom_metrics.read();
588        for metric in custom.values() {
589            writeln!(
590                output,
591                "# HELP {} {}",
592                metric.definition.name, metric.definition.help
593            )
594            .expect("write to String is infallible");
595            writeln!(
596                output,
597                "# TYPE {} {}",
598                metric.definition.name,
599                match metric.definition.metric_type {
600                    MetricType::Counter => "counter",
601                    MetricType::Gauge => "gauge",
602                    MetricType::Histogram => "histogram",
603                    MetricType::Summary => "summary",
604                }
605            )
606            .expect("write to String is infallible");
607
608            for sample in &metric.samples {
609                Self::write_sample(&mut output, &metric.definition.labels, sample);
610            }
611        }
612
613        output
614    }
615
616    fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
617        if labels.is_empty() || sample.label_values.is_empty() {
618            writeln!(output, "{} {}", sample.name, sample.value)
619                .expect("write to String is infallible");
620        } else {
621            let label_pairs: Vec<String> = labels
622                .iter()
623                .zip(sample.label_values.iter())
624                .map(|(k, v)| format!("{}=\"{}\"", k, v))
625                .collect();
626            writeln!(
627                output,
628                "{}{{{}}} {}",
629                sample.name,
630                label_pairs.join(","),
631                sample.value
632            )
633            .expect("write to String is infallible");
634        }
635    }
636
637    /// Get export count.
638    pub fn export_count(&self) -> u64 {
639        self.export_count.load(Ordering::Relaxed)
640    }
641}
642
643impl Default for PrometheusExporter {
644    fn default() -> Self {
645        Self {
646            definitions: RwLock::new(Vec::new()),
647            collectors: RwLock::new(Vec::new()),
648            custom_metrics: RwLock::new(HashMap::new()),
649            export_count: AtomicU64::new(0),
650        }
651    }
652}
653
// ============================================================================
// RingKernel Prometheus Collector
// ============================================================================

658/// Prometheus collector for RingKernel metrics.
659pub struct RingKernelCollector {
660    /// Metrics collector to read from.
661    collector: Arc<MetricsCollector>,
662}
663
664impl RingKernelCollector {
665    /// Create a new RingKernel collector.
666    pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
667        Arc::new(Self { collector })
668    }
669}
670
671impl PrometheusCollector for RingKernelCollector {
672    fn definitions(&self) -> Vec<MetricDefinition> {
673        vec![
674            MetricDefinition {
675                name: "ringkernel_messages_processed_total".to_string(),
676                metric_type: MetricType::Counter,
677                help: "Total number of messages processed by kernels".to_string(),
678                labels: vec!["kernel_id".to_string()],
679            },
680            MetricDefinition {
681                name: "ringkernel_messages_dropped_total".to_string(),
682                metric_type: MetricType::Counter,
683                help: "Total number of messages dropped by kernels".to_string(),
684                labels: vec!["kernel_id".to_string()],
685            },
686            MetricDefinition {
687                name: "ringkernel_latency_us".to_string(),
688                metric_type: MetricType::Gauge,
689                help: "Current average message latency in microseconds".to_string(),
690                labels: vec!["kernel_id".to_string(), "stat".to_string()],
691            },
692            MetricDefinition {
693                name: "ringkernel_throughput".to_string(),
694                metric_type: MetricType::Gauge,
695                help: "Current message throughput per second".to_string(),
696                labels: vec!["kernel_id".to_string()],
697            },
698        ]
699    }
700
701    fn collect(&self) -> Vec<MetricSample> {
702        let aggregate = self.collector.get_aggregate();
703        let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
704
705        vec![
706            MetricSample {
707                name: "ringkernel_messages_processed_total".to_string(),
708                label_values: vec!["aggregate".to_string()],
709                value: aggregate.messages_processed as f64,
710                timestamp_ms: None,
711            },
712            MetricSample {
713                name: "ringkernel_messages_dropped_total".to_string(),
714                label_values: vec!["aggregate".to_string()],
715                value: aggregate.messages_dropped as f64,
716                timestamp_ms: None,
717            },
718            MetricSample {
719                name: "ringkernel_latency_us".to_string(),
720                label_values: vec!["aggregate".to_string(), "avg".to_string()],
721                value: aggregate.avg_latency_us(),
722                timestamp_ms: None,
723            },
724            MetricSample {
725                name: "ringkernel_latency_us".to_string(),
726                label_values: vec!["aggregate".to_string(), "min".to_string()],
727                value: aggregate.min_latency_us as f64,
728                timestamp_ms: None,
729            },
730            MetricSample {
731                name: "ringkernel_latency_us".to_string(),
732                label_values: vec!["aggregate".to_string(), "max".to_string()],
733                value: aggregate.max_latency_us as f64,
734                timestamp_ms: None,
735            },
736            MetricSample {
737                name: "ringkernel_throughput".to_string(),
738                label_values: vec!["aggregate".to_string()],
739                value: aggregate.messages_processed as f64 / elapsed,
740                timestamp_ms: None,
741            },
742        ]
743    }
744}
745
// ============================================================================
// Grafana Dashboard Generator
// ============================================================================

/// Grafana panel type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanelType {
    /// Time series graph.
    Graph,
    /// Single stat / gauge.
    Stat,
    /// Table.
    Table,
    /// Heatmap.
    Heatmap,
    /// Bar gauge.
    BarGauge,
}

impl PanelType {
    /// The `type` string Grafana expects in the panel JSON.
    fn as_grafana_type(self) -> &'static str {
        match self {
            PanelType::Graph => "timeseries",
            PanelType::Stat => "stat",
            PanelType::Table => "table",
            PanelType::Heatmap => "heatmap",
            PanelType::BarGauge => "bargauge",
        }
    }
}

/// A Grafana panel definition.
#[derive(Debug, Clone)]
pub struct GrafanaPanel {
    /// Panel title.
    pub title: String,
    /// Panel type.
    pub panel_type: PanelType,
    /// PromQL query expressions, one per target.
    pub queries: Vec<String>,
    /// Grid position as (x, y, w, h).
    pub grid_pos: (u32, u32, u32, u32),
    /// Display unit (Grafana unit ID), if any.
    pub unit: Option<String>,
}

/// Escape a string for embedding inside a JSON string literal.
///
/// Handles the two JSON-breaking characters (`"` and `\`) plus common
/// whitespace escapes; remaining control characters are `\u`-encoded.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => {
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}

/// Grafana dashboard builder.
pub struct GrafanaDashboard {
    /// Dashboard title.
    title: String,
    /// Dashboard description.
    description: String,
    /// Panels, rendered in insertion order.
    panels: Vec<GrafanaPanel>,
    /// Refresh interval (Grafana duration string, e.g. "5s").
    refresh: String,
    /// Start of the default time range (e.g. "now-1h").
    time_from: String,
    /// Dashboard tags.
    tags: Vec<String>,
}

impl GrafanaDashboard {
    /// Create a new dashboard builder with defaults: 5s refresh, last hour,
    /// tagged "ringkernel".
    pub fn new(title: impl Into<String>) -> Self {
        Self {
            title: title.into(),
            description: String::new(),
            panels: Vec::new(),
            refresh: "5s".to_string(),
            time_from: "now-1h".to_string(),
            tags: vec!["ringkernel".to_string()],
        }
    }

    /// Set the description.
    pub fn description(mut self, desc: impl Into<String>) -> Self {
        self.description = desc.into();
        self
    }

    /// Set the refresh interval.
    pub fn refresh(mut self, interval: impl Into<String>) -> Self {
        self.refresh = interval.into();
        self
    }

    /// Set the start of the default time range.
    pub fn time_from(mut self, from: impl Into<String>) -> Self {
        self.time_from = from.into();
        self
    }

    /// Add a tag.
    pub fn tag(mut self, tag: impl Into<String>) -> Self {
        self.tags.push(tag.into());
        self
    }

    /// Add a custom panel.
    pub fn panel(mut self, panel: GrafanaPanel) -> Self {
        self.panels.push(panel);
        self
    }

    /// Add the kernel throughput panel.
    pub fn add_throughput_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Throughput".to_string(),
            panel_type: PanelType::Graph,
            queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
            grid_pos: (0, 0, 12, 8),
            unit: Some("msg/s".to_string()),
        });
        self
    }

    /// Add the latency panel (average and maximum series).
    pub fn add_latency_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Latency".to_string(),
            panel_type: PanelType::Graph,
            queries: vec![
                "ringkernel_latency_us{stat=\"avg\"}".to_string(),
                "ringkernel_latency_us{stat=\"max\"}".to_string(),
            ],
            grid_pos: (12, 0, 12, 8),
            unit: Some("µs".to_string()),
        });
        self
    }

    /// Add the active-kernel count panel.
    pub fn add_kernel_status_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Active Kernels".to_string(),
            panel_type: PanelType::Stat,
            queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
            grid_pos: (0, 8, 6, 4),
            unit: None,
        });
        self
    }

    /// Add the message drop-rate panel.
    pub fn add_drop_rate_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Drop Rate".to_string(),
            panel_type: PanelType::Graph,
            queries: vec![
                "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
            ],
            grid_pos: (6, 8, 6, 4),
            unit: Some("percentunit".to_string()),
        });
        self
    }

    /// Add the multi-GPU memory usage panel.
    pub fn add_multi_gpu_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "GPU Memory Usage".to_string(),
            panel_type: PanelType::BarGauge,
            queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
            grid_pos: (12, 8, 12, 4),
            unit: Some("bytes".to_string()),
        });
        self
    }

    /// Add all standard panels.
    pub fn add_standard_panels(self) -> Self {
        self.add_throughput_panel()
            .add_latency_panel()
            .add_kernel_status_panel()
            .add_drop_rate_panel()
            .add_multi_gpu_panel()
    }

    /// Build the dashboard JSON string.
    ///
    /// All user-controlled strings (titles, description, queries, tags,
    /// units) are JSON-escaped so the output remains valid JSON even when
    /// they contain quotes or backslashes — PromQL label matchers such as
    /// `{stat="avg"}` do.
    pub fn build(&self) -> String {
        let panels_json: Vec<String> = self
            .panels
            .iter()
            .enumerate()
            .map(|(i, panel)| Self::panel_json(i, panel))
            .collect();

        let tags_json: Vec<String> = self
            .tags
            .iter()
            .map(|t| format!(r#""{}""#, escape_json(t)))
            .collect();

        format!(
            r#"{{
                "title": "{}",
                "description": "{}",
                "tags": [{}],
                "refresh": "{}",
                "time": {{"from": "{}", "to": "now"}},
                "templating": {{
                    "list": [
                        {{
                            "name": "datasource",
                            "type": "datasource",
                            "query": "prometheus"
                        }},
                        {{
                            "name": "kernel_id",
                            "type": "query",
                            "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
                            "multi": true,
                            "includeAll": true
                        }}
                    ]
                }},
                "panels": [{}]
            }}"#,
            escape_json(&self.title),
            escape_json(&self.description),
            tags_json.join(","),
            escape_json(&self.refresh),
            escape_json(&self.time_from),
            panels_json.join(",")
        )
    }

    /// Render one panel as JSON; `index` becomes the 1-based panel ID.
    fn panel_json(index: usize, panel: &GrafanaPanel) -> String {
        let targets: Vec<String> = panel
            .queries
            .iter()
            .enumerate()
            .map(|(j, q)| {
                format!(
                    r#"{{
                        "expr": "{}",
                        "refId": "{}",
                        "legendFormat": "{{}}"
                    }}"#,
                    escape_json(q),
                    // Targets are lettered A, B, C, ... like Grafana's UI.
                    (b'A' + j as u8) as char
                )
            })
            .collect();

        let unit_field = panel
            .unit
            .as_ref()
            .map(|u| format!(r#""unit": "{}","#, escape_json(u)))
            .unwrap_or_default();

        format!(
            r#"{{
                    "id": {},
                    "title": "{}",
                    "type": "{}",
                    "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
                    {}
                    "targets": [{}],
                    "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
                }}"#,
            index + 1,
            escape_json(&panel.title),
            panel.panel_type.as_grafana_type(),
            panel.grid_pos.0,
            panel.grid_pos.1,
            panel.grid_pos.2,
            panel.grid_pos.3,
            unit_field,
            targets.join(",")
        )
    }
}
1008
1009// ============================================================================
1010// Observability Context
1011// ============================================================================
1012
/// Global observability context for managing spans and metrics.
///
/// Tracks in-flight spans, buffers completed spans for export, and owns a
/// Prometheus exporter instance.
pub struct ObservabilityContext {
    /// Spans that have been started but not yet ended, keyed by span ID.
    active_spans: RwLock<HashMap<SpanId, Span>>,
    /// Completed spans buffered until `export_spans` drains them.
    completed_spans: RwLock<Vec<Span>>,
    /// Max completed spans to retain; oldest are evicted past this bound.
    max_completed: usize,
    /// Prometheus exporter.
    prometheus: Arc<PrometheusExporter>,
}
1024
1025impl ObservabilityContext {
1026    /// Create a new observability context.
1027    pub fn new() -> Arc<Self> {
1028        Arc::new(Self {
1029            active_spans: RwLock::new(HashMap::new()),
1030            completed_spans: RwLock::new(Vec::new()),
1031            max_completed: 10000,
1032            prometheus: PrometheusExporter::new(),
1033        })
1034    }
1035
1036    /// Start a new span.
1037    pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1038        let span = Span::new(name, kind);
1039        self.active_spans.write().insert(span.span_id, span.clone());
1040        span
1041    }
1042
1043    /// Start a child span.
1044    pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1045        let span = parent.child(name, kind);
1046        self.active_spans.write().insert(span.span_id, span.clone());
1047        span
1048    }
1049
1050    /// End a span.
1051    pub fn end_span(&self, mut span: Span) {
1052        span.end();
1053        self.active_spans.write().remove(&span.span_id);
1054
1055        let mut completed = self.completed_spans.write();
1056        completed.push(span);
1057        if completed.len() > self.max_completed {
1058            completed.remove(0);
1059        }
1060    }
1061
1062    /// Get Prometheus exporter.
1063    pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1064        &self.prometheus
1065    }
1066
1067    /// Export completed spans (for sending to trace backends).
1068    pub fn export_spans(&self) -> Vec<Span> {
1069        self.completed_spans.write().drain(..).collect()
1070    }
1071
1072    /// Get active span count.
1073    pub fn active_span_count(&self) -> usize {
1074        self.active_spans.read().len()
1075    }
1076}
1077
1078impl Default for ObservabilityContext {
1079    fn default() -> Self {
1080        Self {
1081            active_spans: RwLock::new(HashMap::new()),
1082            completed_spans: RwLock::new(Vec::new()),
1083            max_completed: 10000,
1084            prometheus: PrometheusExporter::new(),
1085        }
1086    }
1087}
1088
1089// ============================================================================
1090// GPU Profiler Integration Stubs
1091// ============================================================================
1092
/// GPU profiler backend type.
///
/// Identifies which vendor tool a profiler integration targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuProfilerBackend {
    /// NVIDIA Nsight Systems/Compute.
    Nsight,
    /// RenderDoc (cross-platform).
    RenderDoc,
    /// PIX for Windows.
    Pix,
    /// Apple Metal System Trace.
    MetalSystemTrace,
    /// AMD Radeon GPU Profiler.
    Rgp,
    /// Custom profiler.
    Custom,
}
1109
/// GPU profiler marker color (RGBA, 8 bits per channel).
#[derive(Debug, Clone, Copy)]
pub struct ProfilerColor {
    /// Red component (0-255).
    pub r: u8,
    /// Green component (0-255).
    pub g: u8,
    /// Blue component (0-255).
    pub b: u8,
    /// Alpha component (0-255).
    pub a: u8,
}

impl ProfilerColor {
    /// Create a new fully-opaque color (alpha = 255).
    pub const fn new(r: u8, g: u8, b: u8) -> Self {
        Self::with_alpha(r, g, b, 255)
    }

    /// Create a color with an explicit alpha component.
    ///
    /// Generalizes [`Self::new`], which previously offered no way to set
    /// the `a` field through a constructor.
    pub const fn with_alpha(r: u8, g: u8, b: u8, a: u8) -> Self {
        Self { r, g, b, a }
    }

    /// Red color.
    pub const RED: Self = Self::new(255, 0, 0);
    /// Green color.
    pub const GREEN: Self = Self::new(0, 255, 0);
    /// Blue color.
    pub const BLUE: Self = Self::new(0, 0, 255);
    /// Yellow color.
    pub const YELLOW: Self = Self::new(255, 255, 0);
    /// Cyan color.
    pub const CYAN: Self = Self::new(0, 255, 255);
    /// Magenta color.
    pub const MAGENTA: Self = Self::new(255, 0, 255);
    /// Orange color.
    pub const ORANGE: Self = Self::new(255, 165, 0);
}
1144
1145/// GPU profiler range handle for scoped profiling.
1146pub struct ProfilerRange {
1147    /// Range name.
1148    #[allow(dead_code)]
1149    name: String,
1150    /// Backend being used.
1151    #[allow(dead_code)]
1152    backend: GpuProfilerBackend,
1153    /// Start time.
1154    start: Instant,
1155}
1156
1157impl ProfilerRange {
1158    /// Create a new profiler range (internal use).
1159    fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1160        Self {
1161            name: name.into(),
1162            backend,
1163            start: Instant::now(),
1164        }
1165    }
1166
1167    /// Create a stub profiler range for external profiler implementations.
1168    ///
1169    /// This is used by custom profiler implementations (like CUDA NVTX) that
1170    /// manage their own range lifecycle but need to return a ProfilerRange
1171    /// for API compatibility.
1172    pub fn stub(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1173        Self::new(name, backend)
1174    }
1175
1176    /// Get elapsed duration.
1177    pub fn elapsed(&self) -> Duration {
1178        self.start.elapsed()
1179    }
1180}
1181
1182impl Drop for ProfilerRange {
1183    fn drop(&mut self) {
1184        // In a real implementation, this would call the profiler API to end the range
1185        // e.g., nvtxRangePop() for NVTX
1186    }
1187}
1188
/// Trait for GPU profiler integration.
///
/// Implement this trait to integrate with specific GPU profiling tools.
/// The default implementation is a no-op for when no profiler is attached;
/// only [`Self::backend`] must be provided.
pub trait GpuProfiler: Send + Sync {
    /// Check if the profiler is available and attached. Defaults to `false`.
    fn is_available(&self) -> bool {
        false
    }

    /// Get the profiler backend type. (Required method.)
    fn backend(&self) -> GpuProfilerBackend;

    /// Start a profiler capture session. Default: no-op success.
    fn start_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// End a profiler capture session. Default: no-op success.
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Trigger a frame/dispatch capture. Default: no-op success.
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Push a named range onto the profiler stack.
    ///
    /// Default returns a timing-only range with no backend call.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    /// Pop the current range from the profiler stack. Default: no-op.
    fn pop_range(&self) {}

    /// Insert an instantaneous marker. Default: no-op.
    fn mark(&self, _name: &str, _color: ProfilerColor) {}

    /// Set a per-thread name for the profiler. Default: no-op.
    fn set_thread_name(&self, _name: &str) {}

    /// Add a message to the profiler output. Default: no-op.
    fn message(&self, _text: &str) {}

    /// Register a GPU memory allocation. Default: no-op.
    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}

    /// Unregister a GPU memory allocation. Default: no-op.
    fn unregister_allocation(&self, _ptr: u64) {}
}
1240
/// Profiler error type returned by [`GpuProfiler`] capture operations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ProfilerError {
    /// Profiler is not available (e.g. library not loaded).
    #[error("GPU profiler not available")]
    NotAvailable,
    /// Profiler is not attached to this process.
    #[error("GPU profiler not attached")]
    NotAttached,
    /// Capture already in progress.
    #[error("Capture already in progress")]
    CaptureInProgress,
    /// No capture in progress.
    #[error("No capture in progress")]
    NoCaptureInProgress,
    /// Backend-specific error.
    #[error("Profiler error: {0}")]
    Backend(String),
}
1260
/// Null profiler implementation (no-op).
///
/// Used when no real profiler is attached: relies on every no-op default
/// method of [`GpuProfiler`], so it reports itself unavailable.
pub struct NullProfiler;

impl GpuProfiler for NullProfiler {
    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Custom
    }
}
1269
/// NVTX (NVIDIA Tools Extension) profiler stub.
///
/// When the real NVTX library is available, this integrates with
/// Nsight Systems and Nsight Compute.
pub struct NvtxProfiler {
    /// Whether NVTX is available.
    available: bool,
    /// Whether a capture is in progress.
    capture_in_progress: std::sync::atomic::AtomicBool,
}

impl NvtxProfiler {
    /// Create a new NVTX profiler.
    ///
    /// In a real implementation, this would check for libnvtx availability
    /// (e.g. via nvtxInitialize()); the stub always reports unavailable.
    pub fn new() -> Self {
        let capture_in_progress = std::sync::atomic::AtomicBool::new(false);
        Self {
            capture_in_progress,
            available: false,
        }
    }

    /// Check if NVTX library is loaded.
    ///
    /// Stub: a real implementation would verify libnvtx is dynamically loaded.
    pub fn is_nvtx_loaded(&self) -> bool {
        self.available
    }
}

impl Default for NvtxProfiler {
    fn default() -> Self {
        NvtxProfiler::new()
    }
}
1304
1305impl GpuProfiler for NvtxProfiler {
1306    fn is_available(&self) -> bool {
1307        self.available
1308    }
1309
1310    fn backend(&self) -> GpuProfilerBackend {
1311        GpuProfilerBackend::Nsight
1312    }
1313
1314    fn start_capture(&self) -> Result<(), ProfilerError> {
1315        if !self.available {
1316            return Err(ProfilerError::NotAvailable);
1317        }
1318        if self.capture_in_progress.swap(true, Ordering::SeqCst) {
1319            return Err(ProfilerError::CaptureInProgress);
1320        }
1321        // Real impl: nvtxRangePushA("Capture")
1322        Ok(())
1323    }
1324
1325    fn end_capture(&self) -> Result<(), ProfilerError> {
1326        if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
1327            return Err(ProfilerError::NoCaptureInProgress);
1328        }
1329        // Real impl: nvtxRangePop()
1330        Ok(())
1331    }
1332
1333    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1334        // Real impl: nvtxRangePushA(name) with color attribute
1335        ProfilerRange::new(name, self.backend())
1336    }
1337
1338    fn pop_range(&self) {
1339        // Real impl: nvtxRangePop()
1340    }
1341
1342    fn mark(&self, _name: &str, _color: ProfilerColor) {
1343        // Real impl: nvtxMarkA(name) with color
1344    }
1345
1346    fn set_thread_name(&self, _name: &str) {
1347        // Real impl: nvtxNameOsThread(thread_id, name)
1348    }
1349}
1350
1351/// RenderDoc profiler stub.
1352///
1353/// Integrates with RenderDoc for GPU frame capture and debugging.
1354pub struct RenderDocProfiler {
1355    /// Whether RenderDoc is attached.
1356    attached: bool,
1357}
1358
1359impl RenderDocProfiler {
1360    /// Create a new RenderDoc profiler.
1361    ///
1362    /// In a real implementation, this would use the RenderDoc in-app API.
1363    pub fn new() -> Self {
1364        Self {
1365            attached: false, // Would check RENDERDOC_GetAPI in real impl
1366        }
1367    }
1368
1369    /// Check if RenderDoc is attached to the process.
1370    pub fn is_attached(&self) -> bool {
1371        // Real impl: check RENDERDOC_API_VERSION via GetAPI
1372        self.attached
1373    }
1374
1375    /// Get RenderDoc capture file path.
1376    pub fn get_capture_path(&self) -> Option<String> {
1377        // Real impl: RENDERDOC_GetCapture
1378        None
1379    }
1380
1381    /// Launch RenderDoc UI.
1382    pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1383        if !self.attached {
1384            return Err(ProfilerError::NotAttached);
1385        }
1386        // Real impl: RENDERDOC_LaunchReplayUI
1387        Ok(())
1388    }
1389}
1390
1391impl Default for RenderDocProfiler {
1392    fn default() -> Self {
1393        Self::new()
1394    }
1395}
1396
1397impl GpuProfiler for RenderDocProfiler {
1398    fn is_available(&self) -> bool {
1399        self.attached
1400    }
1401
1402    fn backend(&self) -> GpuProfilerBackend {
1403        GpuProfilerBackend::RenderDoc
1404    }
1405
1406    fn trigger_capture(&self) -> Result<(), ProfilerError> {
1407        if !self.attached {
1408            return Err(ProfilerError::NotAttached);
1409        }
1410        // Real impl: RENDERDOC_TriggerCapture
1411        Ok(())
1412    }
1413
1414    fn start_capture(&self) -> Result<(), ProfilerError> {
1415        if !self.attached {
1416            return Err(ProfilerError::NotAttached);
1417        }
1418        // Real impl: RENDERDOC_StartFrameCapture
1419        Ok(())
1420    }
1421
1422    fn end_capture(&self) -> Result<(), ProfilerError> {
1423        // Real impl: RENDERDOC_EndFrameCapture
1424        Ok(())
1425    }
1426
1427    fn set_thread_name(&self, _name: &str) {
1428        // Real impl: RENDERDOC_SetCaptureOptionStr
1429    }
1430}
1431
/// Metal System Trace profiler stub (macOS).
///
/// Integrates with Xcode Instruments for Metal GPU profiling.
#[cfg(target_os = "macos")]
pub struct MetalProfiler {
    /// Whether Metal profiling is available.
    available: bool,
}

#[cfg(target_os = "macos")]
impl MetalProfiler {
    /// Create a new Metal profiler (always reports available on macOS).
    pub fn new() -> Self {
        let available = true;
        Self { available }
    }
}

#[cfg(target_os = "macos")]
impl Default for MetalProfiler {
    fn default() -> Self {
        MetalProfiler::new()
    }
}

#[cfg(target_os = "macos")]
impl GpuProfiler for MetalProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::MetalSystemTrace
    }

    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        // Real impl: MTLCommandBuffer.pushDebugGroup(name)
        ProfilerRange::stub(name, self.backend())
    }

    fn pop_range(&self) {
        // Real impl: MTLCommandBuffer.popDebugGroup()
    }

    fn mark(&self, _name: &str, _color: ProfilerColor) {
        // Real impl: MTLCommandBuffer.insertDebugSignpost(name)
    }
}
1479
/// GPU profiler manager for selecting and using profilers.
pub struct GpuProfilerManager {
    /// Active profiler (trait object so any backend can be plugged in).
    profiler: Arc<dyn GpuProfiler>,
    /// Runtime enable/disable flag; when false, scopes and marks are no-ops.
    enabled: std::sync::atomic::AtomicBool,
}
1487
1488impl GpuProfilerManager {
1489    /// Create a new profiler manager with auto-detection.
1490    pub fn new() -> Self {
1491        // Try to detect available profiler
1492        let nvtx = NvtxProfiler::new();
1493        if nvtx.is_available() {
1494            return Self {
1495                profiler: Arc::new(nvtx),
1496                enabled: std::sync::atomic::AtomicBool::new(true),
1497            };
1498        }
1499
1500        let renderdoc = RenderDocProfiler::new();
1501        if renderdoc.is_available() {
1502            return Self {
1503                profiler: Arc::new(renderdoc),
1504                enabled: std::sync::atomic::AtomicBool::new(true),
1505            };
1506        }
1507
1508        // Fallback to null profiler
1509        Self {
1510            profiler: Arc::new(NullProfiler),
1511            enabled: std::sync::atomic::AtomicBool::new(false),
1512        }
1513    }
1514
1515    /// Create with a specific profiler.
1516    pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1517        let enabled = profiler.is_available();
1518        Self {
1519            profiler,
1520            enabled: std::sync::atomic::AtomicBool::new(enabled),
1521        }
1522    }
1523
1524    /// Check if profiling is enabled.
1525    pub fn is_enabled(&self) -> bool {
1526        self.enabled.load(Ordering::Relaxed)
1527    }
1528
1529    /// Enable or disable profiling.
1530    pub fn set_enabled(&self, enabled: bool) {
1531        self.enabled.store(enabled, Ordering::Relaxed);
1532    }
1533
1534    /// Get the profiler backend.
1535    pub fn backend(&self) -> GpuProfilerBackend {
1536        self.profiler.backend()
1537    }
1538
1539    /// Start a profiled scope.
1540    pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1541        ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1542    }
1543
1544    /// Start a profiled scope with color.
1545    pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1546        ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1547    }
1548
1549    /// Insert a marker.
1550    pub fn mark(&self, name: &str) {
1551        if self.is_enabled() {
1552            self.profiler.mark(name, ProfilerColor::CYAN);
1553        }
1554    }
1555
1556    /// Get access to the underlying profiler.
1557    pub fn profiler(&self) -> &dyn GpuProfiler {
1558        &*self.profiler
1559    }
1560}
1561
1562impl Default for GpuProfilerManager {
1563    fn default() -> Self {
1564        Self::new()
1565    }
1566}
1567
/// RAII scope for profiler ranges.
///
/// Created via [`GpuProfilerManager::scope`]; pops the pushed range on drop
/// (only when profiling was enabled at construction).
pub struct ProfilerScope<'a> {
    profiler: &'a dyn GpuProfiler,
    // Snapshot of the manager's enabled flag at construction time.
    enabled: bool,
}
1573
1574impl<'a> ProfilerScope<'a> {
1575    fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
1576        if enabled {
1577            profiler.push_range(name, ProfilerColor::CYAN);
1578        }
1579        Self { profiler, enabled }
1580    }
1581
1582    fn new_colored(
1583        name: &str,
1584        profiler: &'a dyn GpuProfiler,
1585        enabled: bool,
1586        color: ProfilerColor,
1587    ) -> Self {
1588        if enabled {
1589            profiler.push_range(name, color);
1590        }
1591        Self { profiler, enabled }
1592    }
1593}
1594
impl<'a> Drop for ProfilerScope<'a> {
    fn drop(&mut self) {
        // Pop only if a range was pushed at construction (enabled snapshot).
        if self.enabled {
            self.profiler.pop_range();
        }
    }
}
1602
/// Macro for scoped GPU profiling.
///
/// Expands to a `let _scope = …;` binding, so the profiler range stays open
/// until the end of the enclosing block. (Note: the macro takes only a
/// profiler and a name, plus an optional color — it does not wrap a body.)
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::gpu_profile;
///
/// fn compute_kernel(profiler: &GpuProfilerManager) {
///     gpu_profile!(profiler, "compute_kernel");
///     // GPU work here — the range closes when this function returns.
/// }
/// ```
#[macro_export]
macro_rules! gpu_profile {
    // Two-argument form: default range color.
    ($profiler:expr, $name:expr) => {
        let _scope = $profiler.scope($name);
    };
    // Three-argument form: explicit `ProfilerColor`.
    ($profiler:expr, $name:expr, $color:expr) => {
        let _scope = $profiler.scope_colored($name, $color);
    };
}
1625
1626// ============================================================================
1627// GPU Memory Dashboard
1628// ============================================================================
1629
/// GPU memory allocation type, used to categorize tracked allocations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuMemoryType {
    /// Device-local memory (fastest, GPU only).
    DeviceLocal,
    /// Host-visible memory (accessible from CPU).
    HostVisible,
    /// Host-coherent memory (no explicit flush needed).
    HostCoherent,
    /// Mapped memory (CPU-GPU shared).
    Mapped,
    /// Queue buffers for message passing.
    QueueBuffer,
    /// Control block memory.
    ControlBlock,
    /// Shared memory (block-local).
    SharedMemory,
}
1648
/// A tracked GPU memory allocation, as recorded by [`GpuMemoryDashboard`].
#[derive(Debug, Clone)]
pub struct GpuMemoryAllocation {
    /// Unique allocation ID (see `GpuMemoryDashboard::next_allocation_id`).
    pub id: u64,
    /// Allocation name/label.
    pub name: String,
    /// Size in bytes.
    pub size: usize,
    /// Memory type.
    pub memory_type: GpuMemoryType,
    /// Device index (for multi-GPU).
    pub device_index: u32,
    /// Kernel ID (if associated with a kernel).
    pub kernel_id: Option<String>,
    /// Allocation timestamp.
    pub allocated_at: Instant,
    /// Whether the allocation is currently in use (cleared by `mark_unused`).
    pub in_use: bool,
}
1669
/// GPU memory pool statistics.
#[derive(Debug, Clone, Default)]
pub struct GpuMemoryPoolStats {
    /// Pool name.
    pub name: String,
    /// Total capacity in bytes.
    pub capacity: usize,
    /// Currently allocated bytes.
    pub allocated: usize,
    /// Peak allocated bytes.
    pub peak_allocated: usize,
    /// Number of active allocations.
    pub allocation_count: u32,
    /// Number of allocations since creation.
    pub total_allocations: u64,
    /// Number of deallocations since creation.
    pub total_deallocations: u64,
    /// Fragmentation ratio (0.0 = none, 1.0 = fully fragmented).
    pub fragmentation: f32,
}

impl GpuMemoryPoolStats {
    /// Allocated fraction of capacity as a percentage.
    ///
    /// Returns 0 for a zero-capacity pool to avoid dividing by zero.
    pub fn utilization(&self) -> f32 {
        match self.capacity {
            0 => 0.0,
            cap => (self.allocated as f32 / cap as f32) * 100.0,
        }
    }
}
1701
/// Per-device GPU memory statistics.
#[derive(Debug, Clone, Default)]
pub struct GpuDeviceMemoryStats {
    /// Device index.
    pub device_index: u32,
    /// Device name.
    pub device_name: String,
    /// Total device memory in bytes.
    pub total_memory: u64,
    /// Free device memory in bytes.
    pub free_memory: u64,
    /// Memory used by RingKernel.
    pub ringkernel_used: u64,
    /// Memory used by other applications (derived in `update_device_stats`).
    pub other_used: u64,
    /// Memory pool statistics.
    pub pools: Vec<GpuMemoryPoolStats>,
}
1720
1721impl GpuDeviceMemoryStats {
1722    /// Get used memory in bytes.
1723    pub fn used_memory(&self) -> u64 {
1724        self.total_memory - self.free_memory
1725    }
1726
1727    /// Get utilization percentage.
1728    pub fn utilization(&self) -> f32 {
1729        if self.total_memory == 0 {
1730            0.0
1731        } else {
1732            (self.used_memory() as f32 / self.total_memory as f32) * 100.0
1733        }
1734    }
1735}
1736
/// GPU Memory Dashboard for monitoring and visualization.
///
/// Provides real-time GPU memory tracking with allocation history,
/// per-kernel usage, and memory pressure alerts.
///
/// # Example
///
/// ```ignore
/// use ringkernel_core::observability::GpuMemoryDashboard;
///
/// let dashboard = GpuMemoryDashboard::new();
///
/// // Track an allocation
/// dashboard.track_allocation(
///     1,
///     "input_queue",
///     65536,
///     GpuMemoryType::QueueBuffer,
///     0,
///     Some("processor_kernel"),
/// );
///
/// // Get current stats
/// let stats = dashboard.get_device_stats(0);
/// println!("GPU 0 utilization: {:.1}%", stats.utilization());
///
/// // Generate Grafana panel JSON
/// let panel = dashboard.grafana_panel();
/// ```
pub struct GpuMemoryDashboard {
    /// Active allocations, keyed by allocation ID.
    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
    /// Per-device statistics, keyed by device index.
    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
    /// Memory pressure thresholds used by `check_pressure`.
    thresholds: GpuMemoryThresholds,
    /// Allocation counter for unique IDs (starts at 1).
    allocation_counter: AtomicU64,
    /// Total bytes currently allocated across all tracked allocations.
    total_allocated: AtomicU64,
    /// Peak bytes allocated (high-water mark).
    peak_allocated: AtomicU64,
}
1780
/// Memory pressure thresholds for alerts.
#[derive(Debug, Clone)]
pub struct GpuMemoryThresholds {
    /// Warning threshold (percentage).
    pub warning: f32,
    /// Critical threshold (percentage).
    pub critical: f32,
    /// Maximum allocation size before warning (bytes).
    pub max_allocation_size: usize,
    /// Maximum number of allocations before warning.
    pub max_allocation_count: u32,
}

impl Default for GpuMemoryThresholds {
    /// Defaults: warn at 75% utilization, critical at 90%, with a 1 GiB
    /// single-allocation limit and a 10,000 live-allocation limit.
    fn default() -> Self {
        Self {
            max_allocation_count: 10000,
            max_allocation_size: 1024 * 1024 * 1024, // 1 GiB
            critical: 90.0,
            warning: 75.0,
        }
    }
}
1804
/// Memory pressure level reported by `GpuMemoryDashboard::check_pressure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressureLevel {
    /// Memory usage is normal.
    Normal,
    /// Memory usage is elevated (at 80% of the warning threshold).
    Elevated,
    /// Memory usage is at warning level.
    Warning,
    /// Memory usage is critical.
    Critical,
    /// Out of memory (no free bytes reported).
    OutOfMemory,
}
1819
1820impl GpuMemoryDashboard {
1821    /// Create a new GPU memory dashboard.
1822    pub fn new() -> Arc<Self> {
1823        Arc::new(Self {
1824            allocations: RwLock::new(HashMap::new()),
1825            device_stats: RwLock::new(HashMap::new()),
1826            thresholds: GpuMemoryThresholds::default(),
1827            allocation_counter: AtomicU64::new(1),
1828            total_allocated: AtomicU64::new(0),
1829            peak_allocated: AtomicU64::new(0),
1830        })
1831    }
1832
1833    /// Create with custom thresholds.
1834    pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
1835        Arc::new(Self {
1836            allocations: RwLock::new(HashMap::new()),
1837            device_stats: RwLock::new(HashMap::new()),
1838            thresholds,
1839            allocation_counter: AtomicU64::new(1),
1840            total_allocated: AtomicU64::new(0),
1841            peak_allocated: AtomicU64::new(0),
1842        })
1843    }
1844
1845    /// Track a new GPU memory allocation.
1846    pub fn track_allocation(
1847        &self,
1848        id: u64,
1849        name: impl Into<String>,
1850        size: usize,
1851        memory_type: GpuMemoryType,
1852        device_index: u32,
1853        kernel_id: Option<&str>,
1854    ) {
1855        let allocation = GpuMemoryAllocation {
1856            id,
1857            name: name.into(),
1858            size,
1859            memory_type,
1860            device_index,
1861            kernel_id: kernel_id.map(String::from),
1862            allocated_at: Instant::now(),
1863            in_use: true,
1864        };
1865
1866        self.allocations.write().insert(id, allocation);
1867
1868        // Update totals
1869        let new_total = self
1870            .total_allocated
1871            .fetch_add(size as u64, Ordering::Relaxed)
1872            + size as u64;
1873        let mut peak = self.peak_allocated.load(Ordering::Relaxed);
1874        while new_total > peak {
1875            match self.peak_allocated.compare_exchange_weak(
1876                peak,
1877                new_total,
1878                Ordering::Relaxed,
1879                Ordering::Relaxed,
1880            ) {
1881                Ok(_) => break,
1882                Err(current) => peak = current,
1883            }
1884        }
1885    }
1886
    /// Generate a new unique allocation ID.
    ///
    /// Monotonically increasing (starting at 1); safe to call concurrently.
    pub fn next_allocation_id(&self) -> u64 {
        self.allocation_counter.fetch_add(1, Ordering::Relaxed)
    }
1891
1892    /// Track deallocation.
1893    pub fn track_deallocation(&self, id: u64) {
1894        let mut allocations = self.allocations.write();
1895        if let Some(alloc) = allocations.remove(&id) {
1896            self.total_allocated
1897                .fetch_sub(alloc.size as u64, Ordering::Relaxed);
1898        }
1899    }
1900
1901    /// Mark an allocation as no longer in use (but not freed).
1902    pub fn mark_unused(&self, id: u64) {
1903        let mut allocations = self.allocations.write();
1904        if let Some(alloc) = allocations.get_mut(&id) {
1905            alloc.in_use = false;
1906        }
1907    }
1908
1909    /// Register a GPU device.
1910    pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
1911        let stats = GpuDeviceMemoryStats {
1912            device_index,
1913            device_name: name.into(),
1914            total_memory,
1915            free_memory: total_memory,
1916            ringkernel_used: 0,
1917            other_used: 0,
1918            pools: Vec::new(),
1919        };
1920        self.device_stats.write().insert(device_index, stats);
1921    }
1922
1923    /// Update device memory statistics.
1924    pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
1925        let mut stats = self.device_stats.write();
1926        if let Some(device) = stats.get_mut(&device_index) {
1927            device.free_memory = free_memory;
1928            device.ringkernel_used = ringkernel_used;
1929            device.other_used = device
1930                .total_memory
1931                .saturating_sub(free_memory + ringkernel_used);
1932        }
1933    }
1934
1935    /// Get device statistics.
1936    pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
1937        self.device_stats.read().get(&device_index).cloned()
1938    }
1939
1940    /// Get all device statistics.
1941    pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
1942        self.device_stats.read().values().cloned().collect()
1943    }
1944
1945    /// Get all active allocations.
1946    pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
1947        self.allocations.read().values().cloned().collect()
1948    }
1949
1950    /// Get allocations for a specific kernel.
1951    pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
1952        self.allocations
1953            .read()
1954            .values()
1955            .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
1956            .cloned()
1957            .collect()
1958    }
1959
    /// Get total allocated memory.
    ///
    /// Relaxed ordering is sufficient here: this is a monitoring read with
    /// no synchronization dependency on other memory operations.
    pub fn total_allocated(&self) -> u64 {
        self.total_allocated.load(Ordering::Relaxed)
    }
1964
    /// Get peak allocated memory.
    ///
    /// High-water mark of `total_allocated`; Relaxed ordering is fine for
    /// a monitoring-only read.
    pub fn peak_allocated(&self) -> u64 {
        self.peak_allocated.load(Ordering::Relaxed)
    }
1969
    /// Get allocation count.
    ///
    /// Counts every tracked entry, including ones whose `in_use` flag has
    /// been cleared by a free (entries are not removed on free).
    pub fn allocation_count(&self) -> usize {
        self.allocations.read().len()
    }
1974
1975    /// Check memory pressure level for a device.
1976    pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
1977        let stats = self.device_stats.read();
1978        if let Some(device) = stats.get(&device_index) {
1979            let utilization = device.utilization();
1980            if device.free_memory == 0 {
1981                MemoryPressureLevel::OutOfMemory
1982            } else if utilization >= self.thresholds.critical {
1983                MemoryPressureLevel::Critical
1984            } else if utilization >= self.thresholds.warning {
1985                MemoryPressureLevel::Warning
1986            } else if utilization >= self.thresholds.warning * 0.8 {
1987                MemoryPressureLevel::Elevated
1988            } else {
1989                MemoryPressureLevel::Normal
1990            }
1991        } else {
1992            MemoryPressureLevel::Normal
1993        }
1994    }
1995
1996    /// Generate Grafana dashboard panel for GPU memory.
1997    pub fn grafana_panel(&self) -> GrafanaPanel {
1998        GrafanaPanel {
1999            title: "GPU Memory Usage".to_string(),
2000            panel_type: PanelType::BarGauge,
2001            queries: vec![
2002                "ringkernel_gpu_memory_allocated_bytes".to_string(),
2003                "ringkernel_gpu_memory_peak_bytes".to_string(),
2004            ],
2005            grid_pos: (0, 0, 12, 8),
2006            unit: Some("bytes".to_string()),
2007        }
2008    }
2009
2010    /// Generate Prometheus metrics for GPU memory.
2011    ///
2012    /// All `writeln!` calls use `expect` because `fmt::Write` for `String` is infallible.
2013    pub fn prometheus_metrics(&self) -> String {
2014        let mut output = String::new();
2015
2016        // Total allocated
2017        writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").expect("write to String is infallible");
2018        writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge")
2019            .expect("write to String is infallible");
2020        writeln!(
2021            output,
2022            "ringkernel_gpu_memory_allocated_bytes {}",
2023            self.total_allocated()
2024        )
2025        .expect("write to String is infallible");
2026
2027        // Peak allocated
2028        writeln!(
2029            output,
2030            "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
2031        )
2032        .expect("write to String is infallible");
2033        writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge")
2034            .expect("write to String is infallible");
2035        writeln!(
2036            output,
2037            "ringkernel_gpu_memory_peak_bytes {}",
2038            self.peak_allocated()
2039        )
2040        .expect("write to String is infallible");
2041
2042        // Allocation count
2043        writeln!(
2044            output,
2045            "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
2046        )
2047        .expect("write to String is infallible");
2048        writeln!(
2049            output,
2050            "# TYPE ringkernel_gpu_memory_allocation_count gauge"
2051        )
2052        .expect("write to String is infallible");
2053        writeln!(
2054            output,
2055            "ringkernel_gpu_memory_allocation_count {}",
2056            self.allocation_count()
2057        )
2058        .expect("write to String is infallible");
2059
2060        // Per-device stats
2061        let device_stats = self.device_stats.read();
2062        for device in device_stats.values() {
2063            writeln!(
2064                output,
2065                "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
2066                device.device_name, device.total_memory
2067            )
2068            .expect("write to String is infallible");
2069            writeln!(
2070                output,
2071                "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
2072                device.device_name, device.free_memory
2073            )
2074            .expect("write to String is infallible");
2075            writeln!(
2076                output,
2077                "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
2078                device.device_name,
2079                device.used_memory()
2080            )
2081            .expect("write to String is infallible");
2082            writeln!(
2083                output,
2084                "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
2085                device.device_name,
2086                device.utilization()
2087            )
2088            .expect("write to String is infallible");
2089        }
2090
2091        output
2092    }
2093
    /// Generate a memory summary report.
    ///
    /// Produces a plain-text, human-readable dump: aggregate totals, a
    /// per-device section (with the current pressure level), and the ten
    /// largest tracked allocations.
    ///
    /// NOTE(review): the top-10 list is drawn from every tracked entry,
    /// which appears to include freed allocations (`in_use == false`) —
    /// confirm whether freed entries should be filtered out here.
    ///
    /// All `writeln!` calls use `expect` because `fmt::Write` for `String` is infallible.
    pub fn summary_report(&self) -> String {
        let mut report = String::new();

        writeln!(report, "=== GPU Memory Dashboard ===").expect("write to String is infallible");
        writeln!(report, "Total Allocated: {} bytes", self.total_allocated())
            .expect("write to String is infallible");
        writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated())
            .expect("write to String is infallible");
        writeln!(report, "Active Allocations: {}", self.allocation_count())
            .expect("write to String is infallible");
        writeln!(report).expect("write to String is infallible");

        // Device summary
        let device_stats = self.device_stats.read();
        for device in device_stats.values() {
            writeln!(
                report,
                "--- Device {} ({}) ---",
                device.device_index, device.device_name
            )
            .expect("write to String is infallible");
            writeln!(
                report,
                "  Total: {} MB",
                device.total_memory / (1024 * 1024)
            )
            .expect("write to String is infallible");
            writeln!(report, "  Free: {} MB", device.free_memory / (1024 * 1024))
                .expect("write to String is infallible");
            writeln!(
                report,
                "  RingKernel: {} MB",
                device.ringkernel_used / (1024 * 1024)
            )
            .expect("write to String is infallible");
            writeln!(report, "  Utilization: {:.1}%", device.utilization())
                .expect("write to String is infallible");
            writeln!(
                report,
                "  Pressure: {:?}",
                self.check_pressure(device.device_index)
            )
            .expect("write to String is infallible");
        }

        // Top allocations by size (descending)
        let allocations = self.allocations.read();
        let mut sorted_allocs: Vec<_> = allocations.values().collect();
        sorted_allocs.sort_by_key(|a| std::cmp::Reverse(a.size));

        if !sorted_allocs.is_empty() {
            writeln!(report).expect("write to String is infallible");
            writeln!(report, "--- Top 10 Allocations ---").expect("write to String is infallible");
            for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
                writeln!(
                    report,
                    "  {}. {} - {} bytes ({:?})",
                    i + 1,
                    alloc.name,
                    alloc.size,
                    alloc.memory_type
                )
                .expect("write to String is infallible");
            }
        }

        report
    }
2165}
2166
2167impl Default for GpuMemoryDashboard {
2168    fn default() -> Self {
2169        Self {
2170            allocations: RwLock::new(HashMap::new()),
2171            device_stats: RwLock::new(HashMap::new()),
2172            thresholds: GpuMemoryThresholds::default(),
2173            allocation_counter: AtomicU64::new(1),
2174            total_allocated: AtomicU64::new(0),
2175            peak_allocated: AtomicU64::new(0),
2176        }
2177    }
2178}
2179
2180// ============================================================================
2181// OTLP (OpenTelemetry Protocol) Exporter
2182// ============================================================================
2183
/// OTLP transport protocol.
///
/// NOTE(review): the visible HTTP send path always builds a JSON payload
/// regardless of this setting, so `HttpProtobuf`/`Grpc` appear to be
/// declared-but-unimplemented placeholders — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// HTTP with JSON encoding (default, no extra dependencies).
    #[default]
    HttpJson,
    /// HTTP with Protobuf encoding (requires protobuf support).
    HttpProtobuf,
    /// gRPC transport (requires tonic).
    Grpc,
}
2195
/// Configuration for OTLP exporter.
///
/// Resource attributes (service name/version/instance plus any extras) are
/// embedded in every exported batch.
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    /// OTLP endpoint URL (e.g., "http://localhost:4318/v1/traces").
    pub endpoint: String,
    /// Transport protocol.
    pub transport: OtlpTransport,
    /// Service name for resource attributes.
    pub service_name: String,
    /// Service version.
    pub service_version: String,
    /// Service instance ID.
    pub service_instance_id: Option<String>,
    /// Additional resource attributes.
    pub resource_attributes: Vec<(String, String)>,
    /// Export batch size.
    pub batch_size: usize,
    /// Export interval.
    pub export_interval: Duration,
    /// Request timeout.
    pub timeout: Duration,
    /// Maximum retry attempts.
    pub max_retries: u32,
    /// Retry delay (base for exponential backoff).
    pub retry_delay: Duration,
    /// Optional authorization header.
    ///
    /// Sent verbatim as the value of the `Authorization` HTTP header.
    pub authorization: Option<String>,
}
2224
impl Default for OtlpConfig {
    /// Defaults target a local OTLP/HTTP collector (port 4318) with the
    /// crate's own version as the service version.
    fn default() -> Self {
        Self {
            endpoint: "http://localhost:4318/v1/traces".to_string(),
            transport: OtlpTransport::HttpJson,
            service_name: "ringkernel".to_string(),
            // Stamped at compile time from this crate's Cargo.toml.
            service_version: env!("CARGO_PKG_VERSION").to_string(),
            service_instance_id: None,
            resource_attributes: Vec::new(),
            batch_size: 512,
            export_interval: Duration::from_secs(5),
            timeout: Duration::from_secs(30),
            max_retries: 3,
            // Base delay; doubled on each retry attempt.
            retry_delay: Duration::from_millis(100),
            authorization: None,
        }
    }
}
2243
impl OtlpConfig {
    /// Create a new OTLP configuration for the given endpoint, with all
    /// other settings taken from [`Default`].
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Set the service name.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Set the service version.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Set the service instance ID.
    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
        self.service_instance_id = Some(id.into());
        self
    }

    /// Add a resource attribute.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.resource_attributes.push((key.into(), value.into()));
        self
    }

    /// Set the batch size.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Set the export interval.
    pub fn with_export_interval(mut self, interval: Duration) -> Self {
        self.export_interval = interval;
        self
    }

    /// Set the authorization header.
    ///
    /// The value is sent verbatim as the `Authorization` header on every
    /// export request.
    pub fn with_authorization(mut self, auth: impl Into<String>) -> Self {
        self.authorization = Some(auth.into());
        self
    }

    /// Configure for Jaeger OTLP endpoint.
    pub fn jaeger(endpoint: impl Into<String>) -> Self {
        Self::new(endpoint).with_service_name("ringkernel")
    }

    /// Configure for Honeycomb.
    ///
    /// NOTE(review): Honeycomb's API documents the key in a dedicated
    /// `x-honeycomb-team: <key>` header, but this exporter only supports
    /// the `Authorization` header, so the request will carry
    /// `Authorization: x-honeycomb-team <key>` — verify the backend
    /// accepts this form.
    pub fn honeycomb(api_key: impl Into<String>) -> Self {
        Self::new("https://api.honeycomb.io/v1/traces")
            .with_authorization(format!("x-honeycomb-team {}", api_key.into()))
    }

    /// Configure for Grafana Cloud.
    ///
    /// NOTE(review): HTTP Basic auth expects `base64(user:password)` after
    /// "Basic"; the `api_key` passed here is used verbatim, so the caller
    /// must supply already base64-encoded `instance_id:token` credentials —
    /// confirm and document at the call site.
    pub fn grafana_cloud(instance_id: impl Into<String>, api_key: impl Into<String>) -> Self {
        let instance = instance_id.into();
        Self::new("https://otlp-gateway-prod-us-central-0.grafana.net/otlp/v1/traces")
            .with_authorization(format!("Basic {}", api_key.into()))
            .with_attribute("grafana.instance", instance)
    }
}
2314
/// OTLP export result.
///
/// Returned by [`OtlpExporter::flush`]; on failure the spans remain
/// buffered for a later attempt and `spans_exported` is zero.
#[derive(Debug, Clone)]
pub struct OtlpExportResult {
    /// Number of spans exported.
    pub spans_exported: usize,
    /// Whether the export succeeded.
    pub success: bool,
    /// Error message if export failed.
    pub error: Option<String>,
    /// Export duration.
    pub duration: Duration,
    /// Number of retry attempts.
    pub retry_count: u32,
}
2329
/// Statistics for the OTLP exporter.
///
/// Cumulative counters maintained across the exporter's lifetime; a
/// snapshot is obtained via [`OtlpExporter::stats`].
#[derive(Debug, Clone, Default)]
pub struct OtlpExporterStats {
    /// Total spans exported.
    pub total_spans_exported: u64,
    /// Total export attempts.
    pub total_exports: u64,
    /// Successful exports.
    pub successful_exports: u64,
    /// Failed exports.
    pub failed_exports: u64,
    /// Total retry attempts.
    pub total_retries: u64,
    /// Spans currently in buffer.
    pub buffered_spans: usize,
    /// Last export time.
    pub last_export: Option<Instant>,
    /// Last error message.
    pub last_error: Option<String>,
}
2350
/// OTLP span exporter for sending traces to OTLP-compatible backends.
///
/// Supports HTTP/JSON transport with automatic batching and retries.
///
/// Spans accumulate in `buffer` until the configured batch size triggers a
/// flush; both the buffer and the stats are behind `parking_lot` locks so
/// the exporter can be shared across threads.
pub struct OtlpExporter {
    config: OtlpConfig,
    buffer: RwLock<Vec<Span>>,
    stats: RwLock<OtlpExporterStats>,
}
2359
impl OtlpExporter {
    /// Create a new OTLP exporter with the given configuration.
    pub fn new(config: OtlpConfig) -> Self {
        Self {
            config,
            buffer: RwLock::new(Vec::new()),
            stats: RwLock::new(OtlpExporterStats::default()),
        }
    }

    /// Create an exporter for a local Jaeger instance.
    pub fn jaeger_local() -> Self {
        Self::new(OtlpConfig::jaeger("http://localhost:4318/v1/traces"))
    }

    /// Get the exporter configuration.
    pub fn config(&self) -> &OtlpConfig {
        &self.config
    }

    /// Get current statistics.
    ///
    /// Returns a clone so the caller holds no lock on the exporter.
    pub fn stats(&self) -> OtlpExporterStats {
        self.stats.read().clone()
    }

    /// Add a span to the export buffer.
    ///
    /// When the buffer reaches `config.batch_size` a synchronous flush is
    /// triggered; its result is deliberately discarded because `flush`
    /// records stats and re-buffers spans on failure by itself.
    pub fn export_span(&self, span: Span) {
        let mut buffer = self.buffer.write();
        buffer.push(span);

        let should_flush = buffer.len() >= self.config.batch_size;
        // Release the buffer lock before flushing; `flush` re-acquires it.
        drop(buffer);

        if should_flush {
            let _ = self.flush();
        }
    }

    /// Add multiple spans to the export buffer.
    ///
    /// Same batching/flush behavior as [`export_span`](Self::export_span).
    pub fn export_spans(&self, spans: Vec<Span>) {
        let mut buffer = self.buffer.write();
        buffer.extend(spans);

        let should_flush = buffer.len() >= self.config.batch_size;
        // Release the buffer lock before flushing; `flush` re-acquires it.
        drop(buffer);

        if should_flush {
            let _ = self.flush();
        }
    }

    /// Get the number of buffered spans.
    pub fn buffered_count(&self) -> usize {
        self.buffer.read().len()
    }

    /// Flush all buffered spans to the OTLP endpoint.
    ///
    /// On failure the batch is pushed back into the buffer so a later
    /// flush can retry it. NOTE(review): re-buffered spans are appended
    /// after anything queued concurrently in the meantime, so span order
    /// is not preserved across a failed export — confirm this is
    /// acceptable for downstream consumers.
    pub fn flush(&self) -> OtlpExportResult {
        // Swap the buffer for an empty Vec so the lock is held only briefly.
        let spans: Vec<Span> = {
            let mut buffer = self.buffer.write();
            std::mem::take(&mut *buffer)
        };

        if spans.is_empty() {
            return OtlpExportResult {
                spans_exported: 0,
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            };
        }

        let start = Instant::now();
        let result = self.send_spans(&spans);
        let duration = start.elapsed();

        // Update stats
        {
            let mut stats = self.stats.write();
            stats.total_exports += 1;
            stats.last_export = Some(Instant::now());

            if result.success {
                stats.successful_exports += 1;
                stats.total_spans_exported += spans.len() as u64;
            } else {
                stats.failed_exports += 1;
                stats.last_error = result.error.clone();
                // Put spans back in buffer for retry
                let mut buffer = self.buffer.write();
                buffer.extend(spans);
            }
            stats.total_retries += result.retry_count as u64;
            stats.buffered_spans = self.buffer.read().len();
        }

        // Replace the transport's duration with the measured wall time and
        // zero the exported count on failure (nothing was delivered).
        OtlpExportResult {
            spans_exported: if result.success {
                result.spans_exported
            } else {
                0
            },
            duration,
            ..result
        }
    }

    /// Send spans to the OTLP endpoint.
    ///
    /// Dispatches to the HTTP implementation when the `alerting`/`otel`
    /// features are enabled; otherwise logs and reports success without
    /// transmitting anything (see note below).
    fn send_spans(&self, spans: &[Span]) -> OtlpExportResult {
        // Without the alerting/otel feature (reqwest), we can only buffer spans
        #[cfg(not(any(feature = "alerting", feature = "otel")))]
        {
            tracing::debug!(
                span_count = spans.len(),
                endpoint = %self.config.endpoint,
                "OTLP stub: would export spans (enable 'alerting' or 'otel' feature for HTTP export)"
            );
            // Reported as success so stub builds do not re-buffer forever,
            // even though no spans were actually sent anywhere.
            OtlpExportResult {
                spans_exported: spans.len(),
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            }
        }

        #[cfg(any(feature = "alerting", feature = "otel"))]
        {
            self.send_spans_http(spans)
        }
    }

    /// Send spans via HTTP (requires alerting or otel feature).
    ///
    /// Retries up to `config.max_retries` additional times with exponential
    /// backoff (`retry_delay * 2^attempt`), sleeping the calling thread
    /// between attempts. The returned `duration` is always `ZERO`; the
    /// caller (`flush`) substitutes the measured wall time.
    #[cfg(any(feature = "alerting", feature = "otel"))]
    fn send_spans_http(&self, spans: &[Span]) -> OtlpExportResult {
        let payload = self.build_otlp_json(spans);

        let client = reqwest::blocking::Client::builder()
            .timeout(self.config.timeout)
            .build();

        let client = match client {
            Ok(c) => c,
            Err(e) => {
                return OtlpExportResult {
                    spans_exported: 0,
                    success: false,
                    error: Some(format!("Failed to create HTTP client: {}", e)),
                    duration: Duration::ZERO,
                    retry_count: 0,
                };
            }
        };

        let mut retry_count = 0;
        let mut last_error = None;

        for attempt in 0..=self.config.max_retries {
            // A fresh request is built per attempt; the payload is cloned.
            let mut request = client
                .post(&self.config.endpoint)
                .header("Content-Type", "application/json")
                .body(payload.clone());

            if let Some(auth) = &self.config.authorization {
                request = request.header("Authorization", auth);
            }

            match request.send() {
                Ok(response) => {
                    if response.status().is_success() {
                        return OtlpExportResult {
                            spans_exported: spans.len(),
                            success: true,
                            error: None,
                            duration: Duration::ZERO,
                            retry_count,
                        };
                    } else {
                        last_error = Some(format!(
                            "HTTP {}: {}",
                            response.status(),
                            response.status().as_str()
                        ));
                    }
                }
                Err(e) => {
                    last_error = Some(format!("Request failed: {}", e));
                }
            }

            if attempt < self.config.max_retries {
                retry_count += 1;
                std::thread::sleep(self.config.retry_delay * (1 << attempt));
            }
        }

        OtlpExportResult {
            spans_exported: 0,
            success: false,
            error: last_error,
            duration: Duration::ZERO,
            retry_count,
        }
    }

    /// Build OTLP JSON payload.
    ///
    /// Assembles the `resourceSpans` envelope by hand (no serde): resource
    /// attributes first, then a single scope containing every span.
    #[cfg(any(feature = "alerting", feature = "otel"))]
    fn build_otlp_json(&self, spans: &[Span]) -> String {
        use std::fmt::Write;

        let mut json = String::with_capacity(4096);

        // Resource spans structure
        json.push_str(r#"{"resourceSpans":[{"resource":{"attributes":["#);

        // Service name
        let _ = write!(
            json,
            r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_name)
        );

        // Service version
        let _ = write!(
            json,
            r#",{{"key":"service.version","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_version)
        );

        // Instance ID
        if let Some(instance_id) = &self.config.service_instance_id {
            let _ = write!(
                json,
                r#",{{"key":"service.instance.id","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(instance_id)
            );
        }

        // Additional attributes
        for (key, value) in &self.config.resource_attributes {
            let _ = write!(
                json,
                r#",{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(key),
                escape_json_str(value)
            );
        }

        json.push_str(r#"]},"scopeSpans":[{"scope":{"name":"ringkernel"},"spans":["#);

        // Add spans
        let mut first = true;
        for span in spans {
            if !first {
                json.push(',');
            }
            first = false;
            self.span_to_json(&mut json, span);
        }

        json.push_str("]}]}]}");
        json
    }

    /// Convert a span to OTLP JSON format.
    ///
    /// NOTE(review): timestamps are derived from `Instant::elapsed()`, so
    /// they are "time since the span's instant", not Unix epoch nanos —
    /// and because a later `end_time` has a *smaller* elapsed value, the
    /// emitted `endTimeUnixNano` is less than `startTimeUnixNano`. Backends
    /// may reject or misrender these spans; consider recording a
    /// `SystemTime` alongside the `Instant` — confirm before relying on
    /// exported timings.
    #[cfg(any(feature = "alerting", feature = "otel"))]
    fn span_to_json(&self, json: &mut String, span: &Span) {
        use std::fmt::Write;

        let _ = write!(
            json,
            r#"{{"traceId":"{}","spanId":"{}""#,
            span.trace_id.to_hex(),
            span.span_id.to_hex()
        );

        if let Some(parent) = span.parent_span_id {
            let _ = write!(json, r#","parentSpanId":"{}""#, parent.to_hex());
        }

        // Span kind uses the OTLP numeric encoding (1 = internal … 5 = consumer).
        let _ = write!(
            json,
            r#","name":"{}","kind":{}"#,
            escape_json_str(&span.name),
            match span.kind {
                SpanKind::Internal => 1,
                SpanKind::Server => 2,
                SpanKind::Client => 3,
                SpanKind::Producer => 4,
                SpanKind::Consumer => 5,
            }
        );

        // Convert timestamps to nanoseconds since epoch
        let start_nanos = span.start_time.elapsed().as_nanos();
        let end_nanos = span
            .end_time
            .map(|t| t.elapsed().as_nanos())
            .unwrap_or(start_nanos);

        // Note: These are approximate since we use Instant, not SystemTime
        let _ = write!(
            json,
            r#","startTimeUnixNano":"{}","endTimeUnixNano":"{}""#,
            start_nanos, end_nanos
        );

        // Status (OTLP numeric encoding: 0 unset, 1 ok, 2 error)
        let _ = write!(
            json,
            r#","status":{{"code":{}}}"#,
            match &span.status {
                SpanStatus::Unset => 0,
                SpanStatus::Ok => 1,
                SpanStatus::Error { .. } => 2,
            }
        );

        // Attributes
        if !span.attributes.is_empty() {
            json.push_str(r#","attributes":["#);
            let mut first = true;
            for (key, value) in &span.attributes {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"key":"{}","value":{}}}"#,
                    escape_json_str(key),
                    attribute_value_to_json(value)
                );
            }
            json.push(']');
        }

        // Events
        if !span.events.is_empty() {
            json.push_str(r#","events":["#);
            let mut first = true;
            for event in &span.events {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"name":"{}","timeUnixNano":"{}"}}"#,
                    escape_json_str(&event.name),
                    event.timestamp.elapsed().as_nanos()
                );
            }
            json.push(']');
        }

        json.push('}');
    }
}
2720
/// Helper to escape JSON strings.
///
/// Escapes backslash, double quote, and all control characters in the
/// U+0000..=U+001F range as required by RFC 8259 (the common `\n`/`\r`/`\t`
/// get their short forms, the rest become `\u00XX`). The previous
/// `replace`-chain missed control characters other than `\n`/`\r`/`\t`,
/// which produced invalid JSON payloads.
#[cfg(any(feature = "alerting", feature = "otel"))]
fn escape_json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining C0 control characters must be \u-escaped.
            c if (c as u32) < 0x20 => {
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
2730
/// Convert AttributeValue to OTLP JSON format.
///
/// Non-finite floats are a special case: Rust's `{}` formatting renders
/// them as `NaN`/`inf`, which is not valid JSON. Following the protobuf
/// JSON mapping (which OTLP/JSON uses), non-finite doubles are encoded as
/// the strings "NaN", "Infinity", and "-Infinity" instead.
#[cfg(any(feature = "alerting", feature = "otel"))]
fn attribute_value_to_json(value: &AttributeValue) -> String {
    match value {
        AttributeValue::String(s) => format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)),
        AttributeValue::Int(i) => format!(r#"{{"intValue":"{}"}}"#, i),
        AttributeValue::Float(f) => {
            if f.is_finite() {
                format!(r#"{{"doubleValue":{}}}"#, f)
            } else if f.is_nan() {
                r#"{"doubleValue":"NaN"}"#.to_string()
            } else if f.is_sign_positive() {
                r#"{"doubleValue":"Infinity"}"#.to_string()
            } else {
                r#"{"doubleValue":"-Infinity"}"#.to_string()
            }
        }
        AttributeValue::Bool(b) => format!(r#"{{"boolValue":{}}}"#, b),
        AttributeValue::StringArray(arr) => {
            let values: Vec<String> = arr
                .iter()
                .map(|s| format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)))
                .collect();
            format!(r#"{{"arrayValue":{{"values":[{}]}}}}"#, values.join(","))
        }
    }
}
2748
2749#[cfg(test)]
2750mod tests {
2751    use super::*;
2752    use crate::runtime::KernelId;
2753
    #[test]
    fn test_trace_id_generation() {
        // Two fresh IDs should differ; NOTE(review): `TraceId::new` hashes
        // the current time and thread ID, so this relies on those differing
        // between calls — confirm it cannot be flaky on coarse clocks.
        let id1 = TraceId::new();
        let id2 = TraceId::new();
        assert_ne!(id1.0, id2.0);
    }
2760
    #[test]
    fn test_trace_id_hex() {
        // A trace ID must render as 32 hex chars (128 bits) and round-trip
        // through from_hex unchanged.
        let id = TraceId(0x123456789abcdef0123456789abcdef0);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 32);
        let parsed = TraceId::from_hex(&hex).unwrap();
        assert_eq!(id, parsed);
    }
2769
    #[test]
    fn test_span_creation() {
        // A newly created span keeps its name and starts un-ended.
        let span = Span::new("test_operation", SpanKind::Internal);
        assert!(!span.is_ended());
        assert_eq!(span.name, "test_operation");
    }
2776
    #[test]
    fn test_span_child() {
        // A child span shares the parent's trace ID and records the parent
        // span ID for context propagation.
        let parent = Span::new("parent", SpanKind::Server);
        let child = parent.child("child", SpanKind::Internal);

        assert_eq!(child.trace_id, parent.trace_id);
        assert_eq!(child.parent_span_id, Some(parent.span_id));
    }
2785
    #[test]
    fn test_span_attributes() {
        // set_attribute accepts string, integer, and bool values; each call
        // adds one entry.
        let mut span = Span::new("test", SpanKind::Internal);
        span.set_attribute("string_key", "value");
        span.set_attribute("int_key", 42i64);
        span.set_attribute("bool_key", true);

        assert_eq!(span.attributes.len(), 3);
    }
2795
    #[test]
    fn test_span_events() {
        // Events accumulate in order of add_event calls.
        let mut span = Span::new("test", SpanKind::Internal);
        span.add_event("event1");
        span.add_event("event2");

        assert_eq!(span.events.len(), 2);
    }
2804
    #[test]
    fn test_span_builder() {
        // The builder applies kind and attributes, and parenting inherits
        // the parent's trace ID just like Span::child.
        let parent = Span::new("parent", SpanKind::Server);
        let span = SpanBuilder::new("child")
            .kind(SpanKind::Client)
            .parent(&parent)
            .attribute("key", "value")
            .build();

        assert_eq!(span.trace_id, parent.trace_id);
        assert_eq!(span.kind, SpanKind::Client);
        assert!(span.attributes.contains_key("key"));
    }
2818
    #[test]
    fn test_prometheus_exporter() {
        // Registered metrics (a labeled counter and an unlabeled gauge)
        // must both appear in the rendered exposition output.
        let exporter = PrometheusExporter::new();
        exporter.register_counter("test_counter", "A test counter", &["label1"]);
        exporter.register_gauge("test_gauge", "A test gauge", &[]);

        exporter.inc_counter("test_counter", &["value1"]);
        exporter.inc_counter("test_counter", &["value1"]);
        exporter.set_metric("test_gauge", 42.0, &[]);

        let output = exporter.render();
        assert!(output.contains("test_counter"));
        assert!(output.contains("test_gauge"));
    }
2833
2834    #[test]
2835    fn test_grafana_dashboard() {
2836        let dashboard = GrafanaDashboard::new("Test Dashboard")
2837            .description("A test dashboard")
2838            .add_throughput_panel()
2839            .add_latency_panel()
2840            .build();
2841
2842        assert!(dashboard.contains("Test Dashboard"));
2843        assert!(dashboard.contains("Message Throughput"));
2844        assert!(dashboard.contains("Message Latency"));
2845    }
2846
2847    #[test]
2848    fn test_observability_context() {
2849        let ctx = ObservabilityContext::new();
2850
2851        let span = ctx.start_span("test", SpanKind::Internal);
2852        assert_eq!(ctx.active_span_count(), 1);
2853
2854        ctx.end_span(span);
2855        assert_eq!(ctx.active_span_count(), 0);
2856
2857        let exported = ctx.export_spans();
2858        assert_eq!(exported.len(), 1);
2859    }
2860
2861    #[test]
2862    fn test_ringkernel_collector() {
2863        let collector = Arc::new(MetricsCollector::new());
2864        let kernel_id = KernelId::new("test");
2865
2866        collector.record_message_processed(&kernel_id, 100);
2867        collector.record_message_processed(&kernel_id, 200);
2868
2869        let prom_collector = RingKernelCollector::new(collector);
2870        let defs = prom_collector.definitions();
2871        let samples = prom_collector.collect();
2872
2873        assert!(!defs.is_empty());
2874        assert!(!samples.is_empty());
2875    }
2876
2877    // GPU Profiler tests
2878
2879    #[test]
2880    fn test_profiler_color() {
2881        let color = ProfilerColor::new(128, 64, 32);
2882        assert_eq!(color.r, 128);
2883        assert_eq!(color.g, 64);
2884        assert_eq!(color.b, 32);
2885        assert_eq!(color.a, 255);
2886
2887        assert_eq!(ProfilerColor::RED.r, 255);
2888        assert_eq!(ProfilerColor::GREEN.g, 255);
2889        assert_eq!(ProfilerColor::BLUE.b, 255);
2890    }
2891
2892    #[test]
2893    fn test_null_profiler() {
2894        let profiler = NullProfiler;
2895        assert!(!profiler.is_available());
2896        assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);
2897
2898        // All operations should be no-ops
2899        assert!(profiler.start_capture().is_ok());
2900        assert!(profiler.end_capture().is_ok());
2901        assert!(profiler.trigger_capture().is_ok());
2902
2903        let range = profiler.push_range("test", ProfilerColor::RED);
2904        let _elapsed = range.elapsed(); // Just verify it doesn't panic
2905        profiler.pop_range();
2906        profiler.mark("marker", ProfilerColor::BLUE);
2907        profiler.set_thread_name("thread");
2908    }
2909
2910    #[test]
2911    fn test_nvtx_profiler_stub() {
2912        let profiler = NvtxProfiler::new();
2913        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);
2914
2915        // Not available by default (stub)
2916        assert!(!profiler.is_available());
2917        assert!(!profiler.is_nvtx_loaded());
2918
2919        // Start capture should fail when not available
2920        assert!(matches!(
2921            profiler.start_capture(),
2922            Err(ProfilerError::NotAvailable)
2923        ));
2924    }
2925
2926    #[test]
2927    fn test_renderdoc_profiler_stub() {
2928        let profiler = RenderDocProfiler::new();
2929        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);
2930
2931        // Not attached by default (stub)
2932        assert!(!profiler.is_available());
2933        assert!(!profiler.is_attached());
2934        assert!(profiler.get_capture_path().is_none());
2935
2936        // Launch UI should fail when not attached
2937        assert!(matches!(
2938            profiler.launch_ui(),
2939            Err(ProfilerError::NotAttached)
2940        ));
2941    }
2942
2943    #[test]
2944    fn test_gpu_profiler_manager() {
2945        let manager = GpuProfilerManager::new();
2946
2947        // Default should be null profiler (since stubs report unavailable)
2948        assert!(!manager.is_enabled());
2949        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
2950
2951        // Can enable/disable
2952        manager.set_enabled(true);
2953        assert!(manager.is_enabled());
2954        manager.set_enabled(false);
2955        assert!(!manager.is_enabled());
2956    }
2957
2958    #[test]
2959    fn test_profiler_scope() {
2960        let manager = GpuProfilerManager::new();
2961
2962        // Scopes should work even when profiler is not available
2963        {
2964            let _scope = manager.scope("test_scope");
2965            // Scope automatically pops on drop
2966        }
2967
2968        {
2969            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
2970        }
2971
2972        // Mark should also work
2973        manager.mark("test_marker");
2974    }
2975
2976    #[test]
2977    fn test_profiler_with_custom() {
2978        let custom_profiler = Arc::new(NullProfiler);
2979        let manager = GpuProfilerManager::with_profiler(custom_profiler);
2980
2981        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
2982    }
2983
2984    #[test]
2985    fn test_profiler_range_elapsed() {
2986        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
2987        std::thread::sleep(std::time::Duration::from_millis(10));
2988        let elapsed = range.elapsed();
2989        assert!(elapsed.as_millis() >= 10);
2990    }
2991
2992    #[test]
2993    fn test_profiler_error_display() {
2994        let err = ProfilerError::NotAvailable;
2995        assert!(err.to_string().contains("not available"));
2996
2997        let err = ProfilerError::NotAttached;
2998        assert!(err.to_string().contains("not attached"));
2999
3000        let err = ProfilerError::CaptureInProgress;
3001        assert!(err.to_string().contains("in progress"));
3002
3003        let err = ProfilerError::Backend("test error".to_string());
3004        assert!(err.to_string().contains("test error"));
3005    }
3006
3007    // GPU Memory Dashboard tests
3008
3009    #[test]
3010    fn test_gpu_memory_dashboard_creation() {
3011        let dashboard = GpuMemoryDashboard::new();
3012        assert_eq!(dashboard.total_allocated(), 0);
3013        assert_eq!(dashboard.peak_allocated(), 0);
3014        assert_eq!(dashboard.allocation_count(), 0);
3015    }
3016
3017    #[test]
3018    fn test_gpu_memory_allocation_tracking() {
3019        let dashboard = GpuMemoryDashboard::new();
3020
3021        // Track an allocation
3022        dashboard.track_allocation(
3023            1,
3024            "test_buffer",
3025            65536,
3026            GpuMemoryType::DeviceLocal,
3027            0,
3028            Some("test_kernel"),
3029        );
3030
3031        assert_eq!(dashboard.total_allocated(), 65536);
3032        assert_eq!(dashboard.peak_allocated(), 65536);
3033        assert_eq!(dashboard.allocation_count(), 1);
3034
3035        // Track another allocation
3036        dashboard.track_allocation(
3037            2,
3038            "queue_buffer",
3039            1024,
3040            GpuMemoryType::QueueBuffer,
3041            0,
3042            Some("test_kernel"),
3043        );
3044
3045        assert_eq!(dashboard.total_allocated(), 66560);
3046        assert_eq!(dashboard.peak_allocated(), 66560);
3047        assert_eq!(dashboard.allocation_count(), 2);
3048
3049        // Deallocate first buffer
3050        dashboard.track_deallocation(1);
3051        assert_eq!(dashboard.total_allocated(), 1024);
3052        assert_eq!(dashboard.peak_allocated(), 66560); // Peak should remain
3053        assert_eq!(dashboard.allocation_count(), 1);
3054    }
3055
3056    #[test]
3057    fn test_gpu_memory_device_stats() {
3058        let dashboard = GpuMemoryDashboard::new();
3059
3060        // Register a device
3061        dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024); // 24 GB
3062
3063        let stats = dashboard.get_device_stats(0).unwrap();
3064        assert_eq!(stats.device_index, 0);
3065        assert_eq!(stats.device_name, "NVIDIA RTX 4090");
3066        assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
3067        assert_eq!(stats.utilization(), 0.0);
3068
3069        // Update device stats
3070        let used = 8 * 1024 * 1024 * 1024; // 8 GB used
3071        let free = 16 * 1024 * 1024 * 1024; // 16 GB free
3072        dashboard.update_device_stats(0, free, used);
3073
3074        let stats = dashboard.get_device_stats(0).unwrap();
3075        assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
3076    }
3077
3078    #[test]
3079    fn test_gpu_memory_pressure_levels() {
3080        let dashboard = GpuMemoryDashboard::new();
3081
3082        // Register a device with 1 GB
3083        dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);
3084
3085        // Normal usage (50%)
3086        dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
3087        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);
3088
3089        // Warning level (80%)
3090        dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
3091        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);
3092
3093        // Critical level (95%)
3094        dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
3095        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);
3096
3097        // OOM
3098        dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
3099        assert_eq!(
3100            dashboard.check_pressure(0),
3101            MemoryPressureLevel::OutOfMemory
3102        );
3103    }
3104
3105    #[test]
3106    fn test_gpu_memory_kernel_allocations() {
3107        let dashboard = GpuMemoryDashboard::new();
3108
3109        // Track allocations for different kernels
3110        dashboard.track_allocation(
3111            1,
3112            "buf1",
3113            1000,
3114            GpuMemoryType::DeviceLocal,
3115            0,
3116            Some("kernel_a"),
3117        );
3118        dashboard.track_allocation(
3119            2,
3120            "buf2",
3121            2000,
3122            GpuMemoryType::DeviceLocal,
3123            0,
3124            Some("kernel_a"),
3125        );
3126        dashboard.track_allocation(
3127            3,
3128            "buf3",
3129            3000,
3130            GpuMemoryType::DeviceLocal,
3131            0,
3132            Some("kernel_b"),
3133        );
3134
3135        let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
3136        assert_eq!(kernel_a_allocs.len(), 2);
3137
3138        let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
3139        assert_eq!(kernel_b_allocs.len(), 1);
3140
3141        let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
3142        assert_eq!(kernel_c_allocs.len(), 0);
3143    }
3144
3145    #[test]
3146    fn test_gpu_memory_prometheus_metrics() {
3147        let dashboard = GpuMemoryDashboard::new();
3148        dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
3149        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3150
3151        let metrics = dashboard.prometheus_metrics();
3152        assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
3153        assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
3154        assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
3155    }
3156
3157    #[test]
3158    fn test_gpu_memory_summary_report() {
3159        let dashboard = GpuMemoryDashboard::new();
3160        dashboard.track_allocation(
3161            1,
3162            "large_buffer",
3163            1024 * 1024,
3164            GpuMemoryType::DeviceLocal,
3165            0,
3166            None,
3167        );
3168        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3169
3170        let report = dashboard.summary_report();
3171        assert!(report.contains("GPU Memory Dashboard"));
3172        assert!(report.contains("large_buffer"));
3173    }
3174
3175    #[test]
3176    fn test_gpu_memory_pool_stats() {
3177        let pool_stats = GpuMemoryPoolStats {
3178            name: "default".to_string(),
3179            capacity: 1024 * 1024,
3180            allocated: 512 * 1024,
3181            peak_allocated: 768 * 1024,
3182            allocation_count: 10,
3183            total_allocations: 100,
3184            total_deallocations: 90,
3185            fragmentation: 0.1,
3186        };
3187
3188        assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
3189    }
3190
3191    #[test]
3192    fn test_gpu_memory_types() {
3193        // Ensure all memory types are distinct
3194        let types = [
3195            GpuMemoryType::DeviceLocal,
3196            GpuMemoryType::HostVisible,
3197            GpuMemoryType::HostCoherent,
3198            GpuMemoryType::Mapped,
3199            GpuMemoryType::QueueBuffer,
3200            GpuMemoryType::ControlBlock,
3201            GpuMemoryType::SharedMemory,
3202        ];
3203
3204        for (i, t1) in types.iter().enumerate() {
3205            for (j, t2) in types.iter().enumerate() {
3206                if i != j {
3207                    assert_ne!(t1, t2);
3208                }
3209            }
3210        }
3211    }
3212
3213    #[test]
3214    fn test_gpu_memory_grafana_panel() {
3215        let dashboard = GpuMemoryDashboard::new();
3216        let panel = dashboard.grafana_panel();
3217
3218        assert_eq!(panel.title, "GPU Memory Usage");
3219        assert_eq!(panel.panel_type, PanelType::BarGauge);
3220        assert!(!panel.queries.is_empty());
3221    }
3222
3223    #[test]
3224    fn test_gpu_memory_allocation_id_generation() {
3225        let dashboard = GpuMemoryDashboard::new();
3226
3227        let id1 = dashboard.next_allocation_id();
3228        let id2 = dashboard.next_allocation_id();
3229        let id3 = dashboard.next_allocation_id();
3230
3231        assert_eq!(id1, 1);
3232        assert_eq!(id2, 2);
3233        assert_eq!(id3, 3);
3234    }
3235
3236    // OTLP Exporter tests
3237
3238    #[test]
3239    fn test_otlp_config_default() {
3240        let config = OtlpConfig::default();
3241        assert_eq!(config.endpoint, "http://localhost:4318/v1/traces");
3242        assert_eq!(config.transport, OtlpTransport::HttpJson);
3243        assert_eq!(config.service_name, "ringkernel");
3244        assert_eq!(config.batch_size, 512);
3245    }
3246
3247    #[test]
3248    fn test_otlp_config_builder() {
3249        let config = OtlpConfig::new("http://example.com/v1/traces")
3250            .with_service_name("my-service")
3251            .with_service_version("1.0.0")
3252            .with_instance_id("instance-1")
3253            .with_attribute("env", "production")
3254            .with_batch_size(100);
3255
3256        assert_eq!(config.endpoint, "http://example.com/v1/traces");
3257        assert_eq!(config.service_name, "my-service");
3258        assert_eq!(config.service_version, "1.0.0");
3259        assert_eq!(config.service_instance_id, Some("instance-1".to_string()));
3260        assert_eq!(config.resource_attributes.len(), 1);
3261        assert_eq!(config.batch_size, 100);
3262    }
3263
3264    #[test]
3265    fn test_otlp_config_jaeger() {
3266        let config = OtlpConfig::jaeger("http://jaeger:4318/v1/traces");
3267        assert_eq!(config.endpoint, "http://jaeger:4318/v1/traces");
3268        assert_eq!(config.service_name, "ringkernel");
3269    }
3270
3271    #[test]
3272    fn test_otlp_config_honeycomb() {
3273        let config = OtlpConfig::honeycomb("my-api-key");
3274        assert_eq!(config.endpoint, "https://api.honeycomb.io/v1/traces");
3275        assert_eq!(
3276            config.authorization,
3277            Some("x-honeycomb-team my-api-key".to_string())
3278        );
3279    }
3280
3281    #[test]
3282    fn test_otlp_exporter_creation() {
3283        let exporter = OtlpExporter::new(OtlpConfig::default());
3284        assert_eq!(exporter.buffered_count(), 0);
3285        assert_eq!(exporter.config().service_name, "ringkernel");
3286    }
3287
3288    #[test]
3289    fn test_otlp_exporter_jaeger_local() {
3290        let exporter = OtlpExporter::jaeger_local();
3291        assert_eq!(
3292            exporter.config().endpoint,
3293            "http://localhost:4318/v1/traces"
3294        );
3295    }
3296
3297    #[test]
3298    fn test_otlp_exporter_buffering() {
3299        let config = OtlpConfig::default().with_batch_size(10);
3300        let exporter = OtlpExporter::new(config);
3301
3302        // Create a test span using the constructor
3303        let span = Span::new("test_span", SpanKind::Internal);
3304
3305        // Add spans
3306        for _ in 0..5 {
3307            exporter.export_span(span.clone());
3308        }
3309
3310        assert_eq!(exporter.buffered_count(), 5);
3311    }
3312
3313    #[test]
3314    fn test_otlp_exporter_flush_empty() {
3315        let exporter = OtlpExporter::new(OtlpConfig::default());
3316
3317        let result = exporter.flush();
3318        assert!(result.success);
3319        assert_eq!(result.spans_exported, 0);
3320    }
3321
3322    #[test]
3323    fn test_otlp_exporter_stats() {
3324        let exporter = OtlpExporter::new(OtlpConfig::default());
3325
3326        // Initial stats
3327        let stats = exporter.stats();
3328        assert_eq!(stats.total_exports, 0);
3329        assert_eq!(stats.total_spans_exported, 0);
3330        assert_eq!(stats.buffered_spans, 0);
3331    }
3332
3333    #[test]
3334    fn test_otlp_transport_default() {
3335        let transport = OtlpTransport::default();
3336        assert_eq!(transport, OtlpTransport::HttpJson);
3337    }
3338}