1use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
/// A 128-bit trace identifier, rendered as 32 lowercase hex characters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TraceId(pub u128);

impl TraceId {
    /// Generates a fresh trace id.
    ///
    /// The id hashes the current wall-clock time, the creating thread's id,
    /// and a process-wide monotonic counter. The counter fixes a defect in the
    /// original seed (time + thread only): two calls on the same thread within
    /// one `SystemTime` tick hashed identical input and produced identical ids.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        // Process-wide disambiguator so consecutive calls never hash the
        // same input even when the clock has not advanced.
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::thread::current().id().hash(&mut hasher);
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        let high = hasher.finish() as u128;
        // Feed the first word back into the hasher so the low half differs
        // from the high half.
        hasher.write_u64(high as u64);
        let low = hasher.finish() as u128;
        Self((high << 64) | low)
    }

    /// Parses a trace id from a hexadecimal string (case-insensitive).
    ///
    /// Returns `None` when the input is not valid hex or exceeds 128 bits.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u128::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders the id as a 32-character, zero-padded, lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:032x}", self.0)
    }
}

impl Default for TraceId {
    fn default() -> Self {
        Self::new()
    }
}
76
/// A 64-bit span identifier, rendered as 16 lowercase hex characters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpanId(pub u64);

impl SpanId {
    /// Generates a fresh span id.
    ///
    /// The id hashes the current wall-clock time, the process id, and a
    /// process-wide monotonic counter. The counter fixes a defect in the
    /// original seed (time + pid only): the pid is constant within a process,
    /// so two spans created within one `SystemTime` tick hashed identical
    /// input and collided.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        // Process-wide disambiguator; see note above.
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        Self(hasher.finish())
    }

    /// Parses a span id from a hexadecimal string (case-insensitive).
    ///
    /// Returns `None` when the input is not valid hex or exceeds 64 bits.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u64::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders the id as a 16-character, zero-padded, lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:016x}", self.0)
    }
}

impl Default for SpanId {
    fn default() -> Self {
        Self::new()
    }
}
107
/// The role a span plays within a trace (names follow the OpenTelemetry
/// span-kind taxonomy).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Operation internal to the application; no remote peer involved.
    Internal,
    /// Handling of a request received from a remote client.
    Server,
    /// An outgoing request to a remote service.
    Client,
    /// Publishing a message to a queue or broker.
    Producer,
    /// Consuming a message from a queue or broker.
    Consumer,
}
122
/// Outcome recorded on a span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// No status has been set yet (the initial state).
    Unset,
    /// The operation completed successfully.
    Ok,
    /// The operation failed.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}
136
/// A single timed operation within a trace.
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace this span belongs to; children share their parent's trace id.
    pub trace_id: TraceId,
    /// Unique id of this span within the trace.
    pub span_id: SpanId,
    /// Id of the parent span, or `None` for a root span.
    pub parent_span_id: Option<SpanId>,
    /// Human-readable operation name.
    pub name: String,
    /// Role of the span (internal/server/client/producer/consumer).
    pub kind: SpanKind,
    /// Monotonic timestamp taken when the span was created.
    pub start_time: Instant,
    /// Monotonic timestamp set by `end()`; `None` while the span is open.
    pub end_time: Option<Instant>,
    /// Final outcome; starts as `Unset`.
    pub status: SpanStatus,
    /// Arbitrary key/value metadata attached to the span.
    pub attributes: HashMap<String, AttributeValue>,
    /// Timestamped point-in-time events recorded during the span.
    pub events: Vec<SpanEvent>,
}
161
/// A typed attribute value attached to spans and span events.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    /// UTF-8 text value.
    String(String),
    /// Signed 64-bit integer value.
    Int(i64),
    /// 64-bit floating-point value.
    Float(f64),
    /// Boolean flag.
    Bool(bool),
    /// Ordered list of text values.
    StringArray(Vec<String>),
}

impl From<&str> for AttributeValue {
    fn from(value: &str) -> Self {
        AttributeValue::String(value.to_owned())
    }
}

impl From<String> for AttributeValue {
    fn from(value: String) -> Self {
        AttributeValue::String(value)
    }
}

impl From<i64> for AttributeValue {
    fn from(value: i64) -> Self {
        AttributeValue::Int(value)
    }
}

impl From<f64> for AttributeValue {
    fn from(value: f64) -> Self {
        AttributeValue::Float(value)
    }
}

impl From<bool> for AttributeValue {
    fn from(value: bool) -> Self {
        AttributeValue::Bool(value)
    }
}
206
/// A timestamped event recorded within a span's lifetime.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Event name.
    pub name: String,
    /// Monotonic time at which the event was recorded.
    pub timestamp: Instant,
    /// Optional key/value metadata for the event.
    pub attributes: HashMap<String, AttributeValue>,
}
217
218impl Span {
219 pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221 Self {
222 trace_id: TraceId::new(),
223 span_id: SpanId::new(),
224 parent_span_id: None,
225 name: name.into(),
226 kind,
227 start_time: Instant::now(),
228 end_time: None,
229 status: SpanStatus::Unset,
230 attributes: HashMap::new(),
231 events: Vec::new(),
232 }
233 }
234
235 pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237 Self {
238 trace_id: self.trace_id,
239 span_id: SpanId::new(),
240 parent_span_id: Some(self.span_id),
241 name: name.into(),
242 kind,
243 start_time: Instant::now(),
244 end_time: None,
245 status: SpanStatus::Unset,
246 attributes: HashMap::new(),
247 events: Vec::new(),
248 }
249 }
250
251 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253 self.attributes.insert(key.into(), value.into());
254 }
255
256 pub fn add_event(&mut self, name: impl Into<String>) {
258 self.events.push(SpanEvent {
259 name: name.into(),
260 timestamp: Instant::now(),
261 attributes: HashMap::new(),
262 });
263 }
264
265 pub fn add_event_with_attributes(
267 &mut self,
268 name: impl Into<String>,
269 attributes: HashMap<String, AttributeValue>,
270 ) {
271 self.events.push(SpanEvent {
272 name: name.into(),
273 timestamp: Instant::now(),
274 attributes,
275 });
276 }
277
278 pub fn set_ok(&mut self) {
280 self.status = SpanStatus::Ok;
281 }
282
283 pub fn set_error(&mut self, message: impl Into<String>) {
285 self.status = SpanStatus::Error {
286 message: message.into(),
287 };
288 }
289
290 pub fn end(&mut self) {
292 self.end_time = Some(Instant::now());
293 }
294
295 pub fn duration(&self) -> Duration {
297 self.end_time
298 .unwrap_or_else(Instant::now)
299 .duration_since(self.start_time)
300 }
301
302 pub fn is_ended(&self) -> bool {
304 self.end_time.is_some()
305 }
306}
307
/// Fluent builder for [`Span`]s: set kind, parent, and attributes, then build.
pub struct SpanBuilder {
    /// Operation name for the span being built.
    name: String,
    /// Span kind; defaults to `Internal`.
    kind: SpanKind,
    /// Optional (trace id, parent span id) pair inherited from a parent span.
    parent: Option<(TraceId, SpanId)>,
    /// Attributes copied onto the built span.
    attributes: HashMap<String, AttributeValue>,
}
319
320impl SpanBuilder {
321 pub fn new(name: impl Into<String>) -> Self {
323 Self {
324 name: name.into(),
325 kind: SpanKind::Internal,
326 parent: None,
327 attributes: HashMap::new(),
328 }
329 }
330
331 pub fn kind(mut self, kind: SpanKind) -> Self {
333 self.kind = kind;
334 self
335 }
336
337 pub fn parent(mut self, parent: &Span) -> Self {
339 self.parent = Some((parent.trace_id, parent.span_id));
340 self
341 }
342
343 pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345 self.attributes.insert(key.into(), value.into());
346 self
347 }
348
349 pub fn build(self) -> Span {
351 let mut span = Span::new(self.name, self.kind);
352 if let Some((trace_id, parent_id)) = self.parent {
353 span.trace_id = trace_id;
354 span.parent_span_id = Some(parent_id);
355 }
356 span.attributes = self.attributes;
357 span
358 }
359}
360
/// Prometheus metric families supported by the exporter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Monotonically increasing value.
    Counter,
    /// Value that can go up and down.
    Gauge,
    /// Distribution of observations.
    Histogram,
    /// Pre-computed quantile summary.
    Summary,
}
377
/// Metadata describing a metric family.
#[derive(Debug, Clone)]
pub struct MetricDefinition {
    /// Metric family name (e.g. "ringkernel_messages_processed_total").
    pub name: String,
    /// Prometheus type emitted in the `# TYPE` line.
    pub metric_type: MetricType,
    /// Help text emitted in the `# HELP` line.
    pub help: String,
    /// Ordered label names; sample label values are matched by position.
    pub labels: Vec<String>,
}
390
/// A single observed value for a metric family.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Metric family name this sample belongs to.
    pub name: String,
    /// Label values, positionally matching the definition's label names.
    pub label_values: Vec<String>,
    /// Observed value.
    pub value: f64,
    /// Optional sample timestamp in milliseconds; not emitted by `render`.
    pub timestamp_ms: Option<u64>,
}
403
/// Pull-based Prometheus exporter: aggregates registered collectors and
/// ad-hoc custom metrics, and renders them in the text exposition format.
pub struct PrometheusExporter {
    /// Definitions accumulated from registered collectors.
    definitions: RwLock<Vec<MetricDefinition>>,
    /// External collectors polled on every `render`.
    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
    /// Custom metrics keyed by metric name.
    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
    /// Number of times `render` has been called.
    export_count: AtomicU64,
}
415
/// A registered custom metric together with its current samples.
struct CustomMetric {
    /// Static metadata for the metric family.
    definition: MetricDefinition,
    /// One sample per distinct label-value combination.
    samples: Vec<MetricSample>,
}
421
/// A source of metrics that the exporter polls on each render.
pub trait PrometheusCollector: Send + Sync {
    /// Metric families this collector produces.
    fn definitions(&self) -> Vec<MetricDefinition>;

    /// Current samples; called on every `render`.
    fn collect(&self) -> Vec<MetricSample>;
}
430
431impl PrometheusExporter {
432 pub fn new() -> Arc<Self> {
434 Arc::new(Self {
435 definitions: RwLock::new(Vec::new()),
436 collectors: RwLock::new(Vec::new()),
437 custom_metrics: RwLock::new(HashMap::new()),
438 export_count: AtomicU64::new(0),
439 })
440 }
441
442 pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444 let defs = collector.definitions();
445 self.definitions.write().extend(defs);
446 self.collectors.write().push(collector);
447 }
448
449 pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451 let def = MetricDefinition {
452 name: name.to_string(),
453 metric_type: MetricType::Counter,
454 help: help.to_string(),
455 labels: labels.iter().map(|s| s.to_string()).collect(),
456 };
457 self.custom_metrics.write().insert(
458 name.to_string(),
459 CustomMetric {
460 definition: def,
461 samples: Vec::new(),
462 },
463 );
464 }
465
466 pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468 let def = MetricDefinition {
469 name: name.to_string(),
470 metric_type: MetricType::Gauge,
471 help: help.to_string(),
472 labels: labels.iter().map(|s| s.to_string()).collect(),
473 };
474 self.custom_metrics.write().insert(
475 name.to_string(),
476 CustomMetric {
477 definition: def,
478 samples: Vec::new(),
479 },
480 );
481 }
482
483 pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485 let def = MetricDefinition {
486 name: name.to_string(),
487 metric_type: MetricType::Histogram,
488 help: help.to_string(),
489 labels: labels.iter().map(|s| s.to_string()).collect(),
490 };
491 self.custom_metrics.write().insert(
492 name.to_string(),
493 CustomMetric {
494 definition: def,
495 samples: Vec::new(),
496 },
497 );
498 }
499
500 pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502 let mut metrics = self.custom_metrics.write();
503 if let Some(metric) = metrics.get_mut(name) {
504 let sample = MetricSample {
505 name: name.to_string(),
506 label_values: label_values.iter().map(|s| s.to_string()).collect(),
507 value,
508 timestamp_ms: None,
509 };
510 let existing = metric
512 .samples
513 .iter_mut()
514 .find(|s| s.label_values == sample.label_values);
515 if let Some(existing) = existing {
516 existing.value = value;
517 } else {
518 metric.samples.push(sample);
519 }
520 }
521 }
522
523 pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525 self.add_counter(name, 1.0, label_values);
526 }
527
528 pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530 let mut metrics = self.custom_metrics.write();
531 if let Some(metric) = metrics.get_mut(name) {
532 let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533 let existing = metric
534 .samples
535 .iter_mut()
536 .find(|s| s.label_values == label_vec);
537 if let Some(existing) = existing {
538 existing.value += delta;
539 } else {
540 metric.samples.push(MetricSample {
541 name: name.to_string(),
542 label_values: label_vec,
543 value: delta,
544 timestamp_ms: None,
545 });
546 }
547 }
548 }
549
550 pub fn render(&self) -> String {
552 self.export_count.fetch_add(1, Ordering::Relaxed);
553
554 let mut output = String::new();
555
556 let collectors = self.collectors.read();
558 for collector in collectors.iter() {
559 let defs = collector.definitions();
560 let samples = collector.collect();
561
562 for def in &defs {
563 writeln!(output, "# HELP {} {}", def.name, def.help).unwrap();
565 writeln!(
566 output,
567 "# TYPE {} {}",
568 def.name,
569 match def.metric_type {
570 MetricType::Counter => "counter",
571 MetricType::Gauge => "gauge",
572 MetricType::Histogram => "histogram",
573 MetricType::Summary => "summary",
574 }
575 )
576 .unwrap();
577
578 for sample in samples.iter().filter(|s| s.name == def.name) {
580 Self::write_sample(&mut output, &def.labels, sample);
581 }
582 }
583 }
584
585 let custom = self.custom_metrics.read();
587 for metric in custom.values() {
588 writeln!(
589 output,
590 "# HELP {} {}",
591 metric.definition.name, metric.definition.help
592 )
593 .unwrap();
594 writeln!(
595 output,
596 "# TYPE {} {}",
597 metric.definition.name,
598 match metric.definition.metric_type {
599 MetricType::Counter => "counter",
600 MetricType::Gauge => "gauge",
601 MetricType::Histogram => "histogram",
602 MetricType::Summary => "summary",
603 }
604 )
605 .unwrap();
606
607 for sample in &metric.samples {
608 Self::write_sample(&mut output, &metric.definition.labels, sample);
609 }
610 }
611
612 output
613 }
614
615 fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
616 if labels.is_empty() || sample.label_values.is_empty() {
617 writeln!(output, "{} {}", sample.name, sample.value).unwrap();
618 } else {
619 let label_pairs: Vec<String> = labels
620 .iter()
621 .zip(sample.label_values.iter())
622 .map(|(k, v)| format!("{}=\"{}\"", k, v))
623 .collect();
624 writeln!(
625 output,
626 "{}{{{}}} {}",
627 sample.name,
628 label_pairs.join(","),
629 sample.value
630 )
631 .unwrap();
632 }
633 }
634
635 pub fn export_count(&self) -> u64 {
637 self.export_count.load(Ordering::Relaxed)
638 }
639}
640
641impl Default for PrometheusExporter {
642 fn default() -> Self {
643 Self {
644 definitions: RwLock::new(Vec::new()),
645 collectors: RwLock::new(Vec::new()),
646 custom_metrics: RwLock::new(HashMap::new()),
647 export_count: AtomicU64::new(0),
648 }
649 }
650}
651
/// Bridges the crate's [`MetricsCollector`] into the Prometheus exporter.
pub struct RingKernelCollector {
    /// Telemetry source whose aggregate stats are sampled on each collect.
    collector: Arc<MetricsCollector>,
}

impl RingKernelCollector {
    /// Wraps an existing metrics collector, ready for
    /// `PrometheusExporter::register_collector`.
    pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
        Arc::new(Self { collector })
    }
}
668
669impl PrometheusCollector for RingKernelCollector {
670 fn definitions(&self) -> Vec<MetricDefinition> {
671 vec![
672 MetricDefinition {
673 name: "ringkernel_messages_processed_total".to_string(),
674 metric_type: MetricType::Counter,
675 help: "Total number of messages processed by kernels".to_string(),
676 labels: vec!["kernel_id".to_string()],
677 },
678 MetricDefinition {
679 name: "ringkernel_messages_dropped_total".to_string(),
680 metric_type: MetricType::Counter,
681 help: "Total number of messages dropped by kernels".to_string(),
682 labels: vec!["kernel_id".to_string()],
683 },
684 MetricDefinition {
685 name: "ringkernel_latency_us".to_string(),
686 metric_type: MetricType::Gauge,
687 help: "Current average message latency in microseconds".to_string(),
688 labels: vec!["kernel_id".to_string(), "stat".to_string()],
689 },
690 MetricDefinition {
691 name: "ringkernel_throughput".to_string(),
692 metric_type: MetricType::Gauge,
693 help: "Current message throughput per second".to_string(),
694 labels: vec!["kernel_id".to_string()],
695 },
696 ]
697 }
698
699 fn collect(&self) -> Vec<MetricSample> {
700 let aggregate = self.collector.get_aggregate();
701 let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
702
703 vec![
704 MetricSample {
705 name: "ringkernel_messages_processed_total".to_string(),
706 label_values: vec!["aggregate".to_string()],
707 value: aggregate.messages_processed as f64,
708 timestamp_ms: None,
709 },
710 MetricSample {
711 name: "ringkernel_messages_dropped_total".to_string(),
712 label_values: vec!["aggregate".to_string()],
713 value: aggregate.messages_dropped as f64,
714 timestamp_ms: None,
715 },
716 MetricSample {
717 name: "ringkernel_latency_us".to_string(),
718 label_values: vec!["aggregate".to_string(), "avg".to_string()],
719 value: aggregate.avg_latency_us(),
720 timestamp_ms: None,
721 },
722 MetricSample {
723 name: "ringkernel_latency_us".to_string(),
724 label_values: vec!["aggregate".to_string(), "min".to_string()],
725 value: aggregate.min_latency_us as f64,
726 timestamp_ms: None,
727 },
728 MetricSample {
729 name: "ringkernel_latency_us".to_string(),
730 label_values: vec!["aggregate".to_string(), "max".to_string()],
731 value: aggregate.max_latency_us as f64,
732 timestamp_ms: None,
733 },
734 MetricSample {
735 name: "ringkernel_throughput".to_string(),
736 label_values: vec!["aggregate".to_string()],
737 value: aggregate.messages_processed as f64 / elapsed,
738 timestamp_ms: None,
739 },
740 ]
741 }
742}
743
/// Grafana panel visualization kinds (mapped to Grafana plugin ids in
/// `GrafanaDashboard::build`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanelType {
    /// Time-series graph ("timeseries").
    Graph,
    /// Single big-number stat ("stat").
    Stat,
    /// Table ("table").
    Table,
    /// Heatmap ("heatmap").
    Heatmap,
    /// Bar gauge ("bargauge").
    BarGauge,
}
762
763#[derive(Debug, Clone)]
765pub struct GrafanaPanel {
766 pub title: String,
768 pub panel_type: PanelType,
770 pub queries: Vec<String>,
772 pub grid_pos: (u32, u32, u32, u32), pub unit: Option<String>,
776}
777
/// Builder for a Grafana dashboard JSON model.
pub struct GrafanaDashboard {
    /// Dashboard title.
    title: String,
    /// Dashboard description.
    description: String,
    /// Panels in display order.
    panels: Vec<GrafanaPanel>,
    /// Auto-refresh interval (e.g. "5s").
    refresh: String,
    /// Start of the default time range (e.g. "now-1h").
    time_from: String,
    /// Dashboard tags; initialized with "ringkernel".
    tags: Vec<String>,
}
793
794impl GrafanaDashboard {
795 pub fn new(title: impl Into<String>) -> Self {
797 Self {
798 title: title.into(),
799 description: String::new(),
800 panels: Vec::new(),
801 refresh: "5s".to_string(),
802 time_from: "now-1h".to_string(),
803 tags: vec!["ringkernel".to_string()],
804 }
805 }
806
807 pub fn description(mut self, desc: impl Into<String>) -> Self {
809 self.description = desc.into();
810 self
811 }
812
813 pub fn refresh(mut self, interval: impl Into<String>) -> Self {
815 self.refresh = interval.into();
816 self
817 }
818
819 pub fn time_from(mut self, from: impl Into<String>) -> Self {
821 self.time_from = from.into();
822 self
823 }
824
825 pub fn tag(mut self, tag: impl Into<String>) -> Self {
827 self.tags.push(tag.into());
828 self
829 }
830
831 pub fn panel(mut self, panel: GrafanaPanel) -> Self {
833 self.panels.push(panel);
834 self
835 }
836
837 pub fn add_throughput_panel(mut self) -> Self {
839 self.panels.push(GrafanaPanel {
840 title: "Message Throughput".to_string(),
841 panel_type: PanelType::Graph,
842 queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
843 grid_pos: (0, 0, 12, 8),
844 unit: Some("msg/s".to_string()),
845 });
846 self
847 }
848
849 pub fn add_latency_panel(mut self) -> Self {
851 self.panels.push(GrafanaPanel {
852 title: "Message Latency".to_string(),
853 panel_type: PanelType::Graph,
854 queries: vec![
855 "ringkernel_latency_us{stat=\"avg\"}".to_string(),
856 "ringkernel_latency_us{stat=\"max\"}".to_string(),
857 ],
858 grid_pos: (12, 0, 12, 8),
859 unit: Some("µs".to_string()),
860 });
861 self
862 }
863
864 pub fn add_kernel_status_panel(mut self) -> Self {
866 self.panels.push(GrafanaPanel {
867 title: "Active Kernels".to_string(),
868 panel_type: PanelType::Stat,
869 queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
870 grid_pos: (0, 8, 6, 4),
871 unit: None,
872 });
873 self
874 }
875
876 pub fn add_drop_rate_panel(mut self) -> Self {
878 self.panels.push(GrafanaPanel {
879 title: "Message Drop Rate".to_string(),
880 panel_type: PanelType::Graph,
881 queries: vec![
882 "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
883 ],
884 grid_pos: (6, 8, 6, 4),
885 unit: Some("percentunit".to_string()),
886 });
887 self
888 }
889
890 pub fn add_multi_gpu_panel(mut self) -> Self {
892 self.panels.push(GrafanaPanel {
893 title: "GPU Memory Usage".to_string(),
894 panel_type: PanelType::BarGauge,
895 queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
896 grid_pos: (12, 8, 12, 4),
897 unit: Some("bytes".to_string()),
898 });
899 self
900 }
901
902 pub fn add_standard_panels(self) -> Self {
904 self.add_throughput_panel()
905 .add_latency_panel()
906 .add_kernel_status_panel()
907 .add_drop_rate_panel()
908 .add_multi_gpu_panel()
909 }
910
911 pub fn build(&self) -> String {
913 let panels_json: Vec<String> = self
914 .panels
915 .iter()
916 .enumerate()
917 .map(|(i, panel)| {
918 let queries_json: Vec<String> = panel
919 .queries
920 .iter()
921 .enumerate()
922 .map(|(j, q)| {
923 format!(
924 r#"{{
925 "expr": "{}",
926 "refId": "{}",
927 "legendFormat": "{{}}"
928 }}"#,
929 q,
930 (b'A' + j as u8) as char
931 )
932 })
933 .collect();
934
935 let unit_field = panel
936 .unit
937 .as_ref()
938 .map(|u| format!(r#""unit": "{}","#, u))
939 .unwrap_or_default();
940
941 format!(
942 r#"{{
943 "id": {},
944 "title": "{}",
945 "type": "{}",
946 "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
947 {}
948 "targets": [{}],
949 "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
950 }}"#,
951 i + 1,
952 panel.title,
953 match panel.panel_type {
954 PanelType::Graph => "timeseries",
955 PanelType::Stat => "stat",
956 PanelType::Table => "table",
957 PanelType::Heatmap => "heatmap",
958 PanelType::BarGauge => "bargauge",
959 },
960 panel.grid_pos.0,
961 panel.grid_pos.1,
962 panel.grid_pos.2,
963 panel.grid_pos.3,
964 unit_field,
965 queries_json.join(",")
966 )
967 })
968 .collect();
969
970 let tags_json: Vec<String> = self.tags.iter().map(|t| format!(r#""{}""#, t)).collect();
971
972 format!(
973 r#"{{
974 "title": "{}",
975 "description": "{}",
976 "tags": [{}],
977 "refresh": "{}",
978 "time": {{"from": "{}", "to": "now"}},
979 "templating": {{
980 "list": [
981 {{
982 "name": "datasource",
983 "type": "datasource",
984 "query": "prometheus"
985 }},
986 {{
987 "name": "kernel_id",
988 "type": "query",
989 "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
990 "multi": true,
991 "includeAll": true
992 }}
993 ]
994 }},
995 "panels": [{}]
996 }}"#,
997 self.title,
998 self.description,
999 tags_json.join(","),
1000 self.refresh,
1001 self.time_from,
1002 panels_json.join(",")
1003 )
1004 }
1005}
1006
/// Central tracing context: tracks in-flight spans, buffers completed spans
/// for export, and owns a Prometheus exporter.
pub struct ObservabilityContext {
    /// Spans started but not yet ended, keyed by span id.
    active_spans: RwLock<HashMap<SpanId, Span>>,
    /// Completed spans awaiting export, bounded by `max_completed`.
    completed_spans: RwLock<Vec<Span>>,
    /// Maximum completed spans retained; the oldest is evicted beyond this.
    max_completed: usize,
    /// Prometheus exporter associated with this context.
    prometheus: Arc<PrometheusExporter>,
}
1022
1023impl ObservabilityContext {
1024 pub fn new() -> Arc<Self> {
1026 Arc::new(Self {
1027 active_spans: RwLock::new(HashMap::new()),
1028 completed_spans: RwLock::new(Vec::new()),
1029 max_completed: 10000,
1030 prometheus: PrometheusExporter::new(),
1031 })
1032 }
1033
1034 pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1036 let span = Span::new(name, kind);
1037 self.active_spans.write().insert(span.span_id, span.clone());
1038 span
1039 }
1040
1041 pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1043 let span = parent.child(name, kind);
1044 self.active_spans.write().insert(span.span_id, span.clone());
1045 span
1046 }
1047
1048 pub fn end_span(&self, mut span: Span) {
1050 span.end();
1051 self.active_spans.write().remove(&span.span_id);
1052
1053 let mut completed = self.completed_spans.write();
1054 completed.push(span);
1055 if completed.len() > self.max_completed {
1056 completed.remove(0);
1057 }
1058 }
1059
1060 pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1062 &self.prometheus
1063 }
1064
1065 pub fn export_spans(&self) -> Vec<Span> {
1067 self.completed_spans.write().drain(..).collect()
1068 }
1069
1070 pub fn active_span_count(&self) -> usize {
1072 self.active_spans.read().len()
1073 }
1074}
1075
1076impl Default for ObservabilityContext {
1077 fn default() -> Self {
1078 Self {
1079 active_spans: RwLock::new(HashMap::new()),
1080 completed_spans: RwLock::new(Vec::new()),
1081 max_completed: 10000,
1082 prometheus: PrometheusExporter::new(),
1083 }
1084 }
1085}
1086
/// Identifies which external GPU profiling tool a profiler integrates with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuProfilerBackend {
    /// NVIDIA Nsight (NVTX).
    Nsight,
    /// RenderDoc frame capture.
    RenderDoc,
    /// Microsoft PIX.
    Pix,
    /// Apple Metal system trace.
    MetalSystemTrace,
    /// AMD Radeon GPU Profiler.
    Rgp,
    /// Application-defined or no-op backend.
    Custom,
}
1107
/// An RGBA color used to tint profiler ranges and markers.
#[derive(Debug, Clone, Copy)]
pub struct ProfilerColor {
    /// Red channel (0-255).
    pub r: u8,
    /// Green channel (0-255).
    pub g: u8,
    /// Blue channel (0-255).
    pub b: u8,
    /// Alpha channel (0-255); colors built via `new` are fully opaque.
    pub a: u8,
}

impl ProfilerColor {
    /// Builds a fully opaque color from RGB components.
    pub const fn new(r: u8, g: u8, b: u8) -> Self {
        Self { r, g, b, a: 255 }
    }

    /// Pure red.
    pub const RED: Self = Self::new(255, 0, 0);
    /// Pure green.
    pub const GREEN: Self = Self::new(0, 255, 0);
    /// Pure blue.
    pub const BLUE: Self = Self::new(0, 0, 255);
    /// Yellow (red + green).
    pub const YELLOW: Self = Self::new(255, 255, 0);
    /// Cyan (green + blue).
    pub const CYAN: Self = Self::new(0, 255, 255);
    /// Magenta (red + blue).
    pub const MAGENTA: Self = Self::new(255, 0, 255);
    /// Orange.
    pub const ORANGE: Self = Self::new(255, 165, 0);
}
1142
1143pub struct ProfilerRange {
1145 #[allow(dead_code)]
1147 name: String,
1148 #[allow(dead_code)]
1150 backend: GpuProfilerBackend,
1151 start: Instant,
1153}
1154
1155impl ProfilerRange {
1156 fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1158 Self {
1159 name: name.into(),
1160 backend,
1161 start: Instant::now(),
1162 }
1163 }
1164
1165 pub fn elapsed(&self) -> Duration {
1167 self.start.elapsed()
1168 }
1169}
1170
1171impl Drop for ProfilerRange {
1172 fn drop(&mut self) {
1173 }
1176}
1177
/// Abstraction over external GPU profiler integrations.
///
/// Every method except `backend` has a no-op default so implementations only
/// override what their tool supports.
pub trait GpuProfiler: Send + Sync {
    /// Whether the backend tool is present and usable; defaults to `false`.
    fn is_available(&self) -> bool {
        false
    }

    /// Which profiling tool this implementation targets.
    fn backend(&self) -> GpuProfilerBackend;

    /// Begins a capture session. Default: succeed without doing anything.
    fn start_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Ends a capture session. Default: succeed without doing anything.
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Requests a one-shot capture. Default: succeed without doing anything.
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Opens a named, colored range; returns a guard that tracks elapsed time.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    /// Closes the most recently pushed range. Default: no-op.
    fn pop_range(&self) {}

    /// Emits an instantaneous marker. Default: no-op.
    fn mark(&self, _name: &str, _color: ProfilerColor) {}

    /// Names the calling thread in the profiler UI. Default: no-op.
    fn set_thread_name(&self, _name: &str) {}

    /// Logs a free-form message to the profiler. Default: no-op.
    fn message(&self, _text: &str) {}

    /// Registers a memory allocation for tracking. Default: no-op.
    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}

    /// Removes a previously registered allocation. Default: no-op.
    fn unregister_allocation(&self, _ptr: u64) {}
}
1229
/// Errors surfaced by GPU profiler integrations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ProfilerError {
    /// The profiling tool is not installed or could not be loaded.
    #[error("GPU profiler not available")]
    NotAvailable,
    /// The tool exists but is not attached to this process.
    #[error("GPU profiler not attached")]
    NotAttached,
    /// `start_capture` was called while a capture was already running.
    #[error("Capture already in progress")]
    CaptureInProgress,
    /// `end_capture` was called with no capture running.
    #[error("No capture in progress")]
    NoCaptureInProgress,
    /// Backend-specific failure.
    #[error("Profiler error: {0}")]
    Backend(String),
}
1249
/// No-op profiler used when no real backend is detected; every trait method
/// keeps its default (do-nothing) behavior.
pub struct NullProfiler;

impl GpuProfiler for NullProfiler {
    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Custom
    }
}
1258
/// NVTX (NVIDIA Nsight) profiler integration.
pub struct NvtxProfiler {
    /// Whether the NVTX runtime was detected; `new` always sets `false` in
    /// this build (no runtime detection is performed).
    available: bool,
    /// Guards against overlapping capture sessions.
    capture_in_progress: std::sync::atomic::AtomicBool,
}

impl NvtxProfiler {
    /// Creates a profiler handle; starts unavailable with no capture active.
    pub fn new() -> Self {
        Self {
            available: false,
            capture_in_progress: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Reports whether the NVTX runtime library was loaded.
    pub fn is_nvtx_loaded(&self) -> bool {
        self.available
    }
}

impl Default for NvtxProfiler {
    fn default() -> Self {
        Self::new()
    }
}
1293
1294impl GpuProfiler for NvtxProfiler {
1295 fn is_available(&self) -> bool {
1296 self.available
1297 }
1298
1299 fn backend(&self) -> GpuProfilerBackend {
1300 GpuProfilerBackend::Nsight
1301 }
1302
1303 fn start_capture(&self) -> Result<(), ProfilerError> {
1304 if !self.available {
1305 return Err(ProfilerError::NotAvailable);
1306 }
1307 if self.capture_in_progress.swap(true, Ordering::SeqCst) {
1308 return Err(ProfilerError::CaptureInProgress);
1309 }
1310 Ok(())
1312 }
1313
1314 fn end_capture(&self) -> Result<(), ProfilerError> {
1315 if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
1316 return Err(ProfilerError::NoCaptureInProgress);
1317 }
1318 Ok(())
1320 }
1321
1322 fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1323 ProfilerRange::new(name, self.backend())
1325 }
1326
1327 fn pop_range(&self) {
1328 }
1330
1331 fn mark(&self, _name: &str, _color: ProfilerColor) {
1332 }
1334
1335 fn set_thread_name(&self, _name: &str) {
1336 }
1338}
1339
1340pub struct RenderDocProfiler {
1344 attached: bool,
1346}
1347
1348impl RenderDocProfiler {
1349 pub fn new() -> Self {
1353 Self {
1354 attached: false, }
1356 }
1357
1358 pub fn is_attached(&self) -> bool {
1360 self.attached
1362 }
1363
1364 pub fn get_capture_path(&self) -> Option<String> {
1366 None
1368 }
1369
1370 pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1372 if !self.attached {
1373 return Err(ProfilerError::NotAttached);
1374 }
1375 Ok(())
1377 }
1378}
1379
1380impl Default for RenderDocProfiler {
1381 fn default() -> Self {
1382 Self::new()
1383 }
1384}
1385
1386impl GpuProfiler for RenderDocProfiler {
1387 fn is_available(&self) -> bool {
1388 self.attached
1389 }
1390
1391 fn backend(&self) -> GpuProfilerBackend {
1392 GpuProfilerBackend::RenderDoc
1393 }
1394
1395 fn trigger_capture(&self) -> Result<(), ProfilerError> {
1396 if !self.attached {
1397 return Err(ProfilerError::NotAttached);
1398 }
1399 Ok(())
1401 }
1402
1403 fn start_capture(&self) -> Result<(), ProfilerError> {
1404 if !self.attached {
1405 return Err(ProfilerError::NotAttached);
1406 }
1407 Ok(())
1409 }
1410
1411 fn end_capture(&self) -> Result<(), ProfilerError> {
1412 Ok(())
1414 }
1415
1416 fn set_thread_name(&self, _name: &str) {
1417 }
1419}
1420
/// Metal system-trace profiler integration (macOS only).
#[cfg(target_os = "macos")]
pub struct MetalProfiler {
    /// Set unconditionally to `true` by `new`; no runtime detection occurs.
    available: bool,
}

#[cfg(target_os = "macos")]
impl MetalProfiler {
    /// Creates the profiler; always reports available on macOS.
    pub fn new() -> Self {
        Self { available: true }
    }
}

#[cfg(target_os = "macos")]
impl Default for MetalProfiler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(target_os = "macos")]
impl GpuProfiler for MetalProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::MetalSystemTrace
    }

    /// Opens a timing range; no Metal signposts are emitted here.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    fn pop_range(&self) {
    }

    fn mark(&self, _name: &str, _color: ProfilerColor) {
    }
}
1468
/// Selects and wraps a [`GpuProfiler`] backend, with a runtime enable switch.
pub struct GpuProfilerManager {
    /// Active backend (may be a [`NullProfiler`]).
    profiler: Arc<dyn GpuProfiler>,
    /// When false, scopes and marks become no-ops.
    enabled: std::sync::atomic::AtomicBool,
}
1476
1477impl GpuProfilerManager {
1478 pub fn new() -> Self {
1480 let nvtx = NvtxProfiler::new();
1482 if nvtx.is_available() {
1483 return Self {
1484 profiler: Arc::new(nvtx),
1485 enabled: std::sync::atomic::AtomicBool::new(true),
1486 };
1487 }
1488
1489 let renderdoc = RenderDocProfiler::new();
1490 if renderdoc.is_available() {
1491 return Self {
1492 profiler: Arc::new(renderdoc),
1493 enabled: std::sync::atomic::AtomicBool::new(true),
1494 };
1495 }
1496
1497 Self {
1499 profiler: Arc::new(NullProfiler),
1500 enabled: std::sync::atomic::AtomicBool::new(false),
1501 }
1502 }
1503
1504 pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1506 let enabled = profiler.is_available();
1507 Self {
1508 profiler,
1509 enabled: std::sync::atomic::AtomicBool::new(enabled),
1510 }
1511 }
1512
1513 pub fn is_enabled(&self) -> bool {
1515 self.enabled.load(Ordering::Relaxed)
1516 }
1517
1518 pub fn set_enabled(&self, enabled: bool) {
1520 self.enabled.store(enabled, Ordering::Relaxed);
1521 }
1522
1523 pub fn backend(&self) -> GpuProfilerBackend {
1525 self.profiler.backend()
1526 }
1527
1528 pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1530 ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1531 }
1532
1533 pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1535 ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1536 }
1537
1538 pub fn mark(&self, name: &str) {
1540 if self.is_enabled() {
1541 self.profiler.mark(name, ProfilerColor::CYAN);
1542 }
1543 }
1544
1545 pub fn profiler(&self) -> &dyn GpuProfiler {
1547 &*self.profiler
1548 }
1549}
1550
1551impl Default for GpuProfilerManager {
1552 fn default() -> Self {
1553 Self::new()
1554 }
1555}
1556
1557pub struct ProfilerScope<'a> {
1559 profiler: &'a dyn GpuProfiler,
1560 enabled: bool,
1561}
1562
1563impl<'a> ProfilerScope<'a> {
1564 fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
1565 if enabled {
1566 profiler.push_range(name, ProfilerColor::CYAN);
1567 }
1568 Self { profiler, enabled }
1569 }
1570
1571 fn new_colored(
1572 name: &str,
1573 profiler: &'a dyn GpuProfiler,
1574 enabled: bool,
1575 color: ProfilerColor,
1576 ) -> Self {
1577 if enabled {
1578 profiler.push_range(name, color);
1579 }
1580 Self { profiler, enabled }
1581 }
1582}
1583
1584impl<'a> Drop for ProfilerScope<'a> {
1585 fn drop(&mut self) {
1586 if self.enabled {
1587 self.profiler.pop_range();
1588 }
1589 }
1590}
1591
/// Opens a GPU profiling scope that lasts until the end of the enclosing
/// block.
///
/// `gpu_profile!(manager, "name")` uses the default color;
/// `gpu_profile!(manager, "name", color)` uses an explicit `ProfilerColor`.
#[macro_export]
macro_rules! gpu_profile {
    ($profiler:expr, $name:expr) => {
        let _scope = $profiler.scope($name);
    };
    ($profiler:expr, $name:expr, $color:expr) => {
        let _scope = $profiler.scope_colored($name, $color);
    };
}
1614
/// Categories of GPU memory tracked by the memory dashboard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuMemoryType {
    /// Memory resident on the device.
    DeviceLocal,
    /// Device memory visible to the host.
    HostVisible,
    /// Host-coherent memory.
    HostCoherent,
    /// Mapped memory.
    Mapped,
    /// Ring/queue buffer storage.
    QueueBuffer,
    /// Kernel control-block storage.
    ControlBlock,
    /// Memory shared between host and device.
    SharedMemory,
}
1637
/// A single tracked GPU memory allocation.
#[derive(Debug, Clone)]
pub struct GpuMemoryAllocation {
    /// Unique allocation id.
    pub id: u64,
    /// Debug name of the allocation.
    pub name: String,
    /// Size in bytes.
    pub size: usize,
    /// Memory category of the allocation.
    pub memory_type: GpuMemoryType,
    /// Index of the GPU device the allocation lives on.
    pub device_index: u32,
    /// Kernel that owns the allocation, when known.
    pub kernel_id: Option<String>,
    /// When the allocation was made.
    pub allocated_at: Instant,
    /// Whether the allocation is currently live.
    pub in_use: bool,
}
1658
/// Usage statistics for one named GPU memory pool.
#[derive(Debug, Clone, Default)]
pub struct GpuMemoryPoolStats {
    // Pool label.
    pub name: String,
    // Total pool capacity in bytes.
    pub capacity: usize,
    // Bytes currently allocated from the pool.
    pub allocated: usize,
    // High-water mark of `allocated`.
    pub peak_allocated: usize,
    // Number of currently live allocations.
    pub allocation_count: u32,
    // Cumulative allocations over the pool's lifetime.
    pub total_allocations: u64,
    // Cumulative deallocations over the pool's lifetime.
    pub total_deallocations: u64,
    // Fragmentation metric; units/scale defined by the producer of these
    // stats — TODO confirm whether this is a 0.0-1.0 ratio or a percentage.
    pub fragmentation: f32,
}
1679
1680impl GpuMemoryPoolStats {
1681 pub fn utilization(&self) -> f32 {
1683 if self.capacity == 0 {
1684 0.0
1685 } else {
1686 (self.allocated as f32 / self.capacity as f32) * 100.0
1687 }
1688 }
1689}
1690
/// Per-device GPU memory statistics, including RingKernel's own share.
#[derive(Debug, Clone, Default)]
pub struct GpuDeviceMemoryStats {
    // Device index this record describes.
    pub device_index: u32,
    // Marketing/driver name of the device.
    pub device_name: String,
    // Total device memory in bytes.
    pub total_memory: u64,
    // Free device memory in bytes (as last reported).
    pub free_memory: u64,
    // Bytes attributed to RingKernel allocations.
    pub ringkernel_used: u64,
    // Bytes used by everything else (derived in `update_device_stats`).
    pub other_used: u64,
    // Per-pool breakdowns, if any were registered.
    pub pools: Vec<GpuMemoryPoolStats>,
}
1709
1710impl GpuDeviceMemoryStats {
1711 pub fn used_memory(&self) -> u64 {
1713 self.total_memory - self.free_memory
1714 }
1715
1716 pub fn utilization(&self) -> f32 {
1718 if self.total_memory == 0 {
1719 0.0
1720 } else {
1721 (self.used_memory() as f32 / self.total_memory as f32) * 100.0
1722 }
1723 }
1724}
1725
/// Central registry of GPU allocations and per-device memory statistics,
/// with Prometheus/Grafana export and memory-pressure classification.
pub struct GpuMemoryDashboard {
    // Live allocations keyed by allocation id.
    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
    // Per-device stats keyed by device index.
    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
    // Pressure/limit configuration (immutable after construction).
    thresholds: GpuMemoryThresholds,
    // Monotonic id generator for `next_allocation_id`.
    allocation_counter: AtomicU64,
    // Sum of sizes of live allocations, in bytes.
    total_allocated: AtomicU64,
    // High-water mark of `total_allocated`.
    peak_allocated: AtomicU64,
}
1769
/// Thresholds used by [`GpuMemoryDashboard::check_pressure`] and allocation
/// limit policy.
#[derive(Debug, Clone)]
pub struct GpuMemoryThresholds {
    // Utilization percentage at which pressure becomes `Warning`.
    pub warning: f32,
    // Utilization percentage at which pressure becomes `Critical`.
    pub critical: f32,
    // Maximum size in bytes for a single allocation.
    pub max_allocation_size: usize,
    // Maximum number of simultaneously live allocations.
    pub max_allocation_count: u32,
}
1782
1783impl Default for GpuMemoryThresholds {
1784 fn default() -> Self {
1785 Self {
1786 warning: 75.0,
1787 critical: 90.0,
1788 max_allocation_size: 1024 * 1024 * 1024, max_allocation_count: 10000,
1790 }
1791 }
1792}
1793
/// Memory-pressure classification produced by
/// [`GpuMemoryDashboard::check_pressure`], ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressureLevel {
    /// Utilization well below the warning threshold.
    Normal,
    /// Approaching the warning threshold (>= 80% of it).
    Elevated,
    /// At or above the `warning` threshold.
    Warning,
    /// At or above the `critical` threshold.
    Critical,
    /// No free memory reported on the device.
    OutOfMemory,
}
1808
1809impl GpuMemoryDashboard {
1810 pub fn new() -> Arc<Self> {
1812 Arc::new(Self {
1813 allocations: RwLock::new(HashMap::new()),
1814 device_stats: RwLock::new(HashMap::new()),
1815 thresholds: GpuMemoryThresholds::default(),
1816 allocation_counter: AtomicU64::new(1),
1817 total_allocated: AtomicU64::new(0),
1818 peak_allocated: AtomicU64::new(0),
1819 })
1820 }
1821
1822 pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
1824 Arc::new(Self {
1825 allocations: RwLock::new(HashMap::new()),
1826 device_stats: RwLock::new(HashMap::new()),
1827 thresholds,
1828 allocation_counter: AtomicU64::new(1),
1829 total_allocated: AtomicU64::new(0),
1830 peak_allocated: AtomicU64::new(0),
1831 })
1832 }
1833
1834 pub fn track_allocation(
1836 &self,
1837 id: u64,
1838 name: impl Into<String>,
1839 size: usize,
1840 memory_type: GpuMemoryType,
1841 device_index: u32,
1842 kernel_id: Option<&str>,
1843 ) {
1844 let allocation = GpuMemoryAllocation {
1845 id,
1846 name: name.into(),
1847 size,
1848 memory_type,
1849 device_index,
1850 kernel_id: kernel_id.map(String::from),
1851 allocated_at: Instant::now(),
1852 in_use: true,
1853 };
1854
1855 self.allocations.write().insert(id, allocation);
1856
1857 let new_total = self
1859 .total_allocated
1860 .fetch_add(size as u64, Ordering::Relaxed)
1861 + size as u64;
1862 let mut peak = self.peak_allocated.load(Ordering::Relaxed);
1863 while new_total > peak {
1864 match self.peak_allocated.compare_exchange_weak(
1865 peak,
1866 new_total,
1867 Ordering::Relaxed,
1868 Ordering::Relaxed,
1869 ) {
1870 Ok(_) => break,
1871 Err(current) => peak = current,
1872 }
1873 }
1874 }
1875
1876 pub fn next_allocation_id(&self) -> u64 {
1878 self.allocation_counter.fetch_add(1, Ordering::Relaxed)
1879 }
1880
1881 pub fn track_deallocation(&self, id: u64) {
1883 let mut allocations = self.allocations.write();
1884 if let Some(alloc) = allocations.remove(&id) {
1885 self.total_allocated
1886 .fetch_sub(alloc.size as u64, Ordering::Relaxed);
1887 }
1888 }
1889
1890 pub fn mark_unused(&self, id: u64) {
1892 let mut allocations = self.allocations.write();
1893 if let Some(alloc) = allocations.get_mut(&id) {
1894 alloc.in_use = false;
1895 }
1896 }
1897
1898 pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
1900 let stats = GpuDeviceMemoryStats {
1901 device_index,
1902 device_name: name.into(),
1903 total_memory,
1904 free_memory: total_memory,
1905 ringkernel_used: 0,
1906 other_used: 0,
1907 pools: Vec::new(),
1908 };
1909 self.device_stats.write().insert(device_index, stats);
1910 }
1911
1912 pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
1914 let mut stats = self.device_stats.write();
1915 if let Some(device) = stats.get_mut(&device_index) {
1916 device.free_memory = free_memory;
1917 device.ringkernel_used = ringkernel_used;
1918 device.other_used = device
1919 .total_memory
1920 .saturating_sub(free_memory + ringkernel_used);
1921 }
1922 }
1923
1924 pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
1926 self.device_stats.read().get(&device_index).cloned()
1927 }
1928
1929 pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
1931 self.device_stats.read().values().cloned().collect()
1932 }
1933
1934 pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
1936 self.allocations.read().values().cloned().collect()
1937 }
1938
1939 pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
1941 self.allocations
1942 .read()
1943 .values()
1944 .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
1945 .cloned()
1946 .collect()
1947 }
1948
1949 pub fn total_allocated(&self) -> u64 {
1951 self.total_allocated.load(Ordering::Relaxed)
1952 }
1953
1954 pub fn peak_allocated(&self) -> u64 {
1956 self.peak_allocated.load(Ordering::Relaxed)
1957 }
1958
1959 pub fn allocation_count(&self) -> usize {
1961 self.allocations.read().len()
1962 }
1963
1964 pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
1966 let stats = self.device_stats.read();
1967 if let Some(device) = stats.get(&device_index) {
1968 let utilization = device.utilization();
1969 if device.free_memory == 0 {
1970 MemoryPressureLevel::OutOfMemory
1971 } else if utilization >= self.thresholds.critical {
1972 MemoryPressureLevel::Critical
1973 } else if utilization >= self.thresholds.warning {
1974 MemoryPressureLevel::Warning
1975 } else if utilization >= self.thresholds.warning * 0.8 {
1976 MemoryPressureLevel::Elevated
1977 } else {
1978 MemoryPressureLevel::Normal
1979 }
1980 } else {
1981 MemoryPressureLevel::Normal
1982 }
1983 }
1984
1985 pub fn grafana_panel(&self) -> GrafanaPanel {
1987 GrafanaPanel {
1988 title: "GPU Memory Usage".to_string(),
1989 panel_type: PanelType::BarGauge,
1990 queries: vec![
1991 "ringkernel_gpu_memory_allocated_bytes".to_string(),
1992 "ringkernel_gpu_memory_peak_bytes".to_string(),
1993 ],
1994 grid_pos: (0, 0, 12, 8),
1995 unit: Some("bytes".to_string()),
1996 }
1997 }
1998
1999 pub fn prometheus_metrics(&self) -> String {
2001 let mut output = String::new();
2002
2003 writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").unwrap();
2005 writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge").unwrap();
2006 writeln!(
2007 output,
2008 "ringkernel_gpu_memory_allocated_bytes {}",
2009 self.total_allocated()
2010 )
2011 .unwrap();
2012
2013 writeln!(
2015 output,
2016 "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
2017 )
2018 .unwrap();
2019 writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge").unwrap();
2020 writeln!(
2021 output,
2022 "ringkernel_gpu_memory_peak_bytes {}",
2023 self.peak_allocated()
2024 )
2025 .unwrap();
2026
2027 writeln!(
2029 output,
2030 "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
2031 )
2032 .unwrap();
2033 writeln!(
2034 output,
2035 "# TYPE ringkernel_gpu_memory_allocation_count gauge"
2036 )
2037 .unwrap();
2038 writeln!(
2039 output,
2040 "ringkernel_gpu_memory_allocation_count {}",
2041 self.allocation_count()
2042 )
2043 .unwrap();
2044
2045 let device_stats = self.device_stats.read();
2047 for device in device_stats.values() {
2048 writeln!(
2049 output,
2050 "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
2051 device.device_name, device.total_memory
2052 )
2053 .unwrap();
2054 writeln!(
2055 output,
2056 "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
2057 device.device_name, device.free_memory
2058 )
2059 .unwrap();
2060 writeln!(
2061 output,
2062 "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
2063 device.device_name,
2064 device.used_memory()
2065 )
2066 .unwrap();
2067 writeln!(
2068 output,
2069 "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
2070 device.device_name,
2071 device.utilization()
2072 )
2073 .unwrap();
2074 }
2075
2076 output
2077 }
2078
2079 pub fn summary_report(&self) -> String {
2081 let mut report = String::new();
2082
2083 writeln!(report, "=== GPU Memory Dashboard ===").unwrap();
2084 writeln!(report, "Total Allocated: {} bytes", self.total_allocated()).unwrap();
2085 writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated()).unwrap();
2086 writeln!(report, "Active Allocations: {}", self.allocation_count()).unwrap();
2087 writeln!(report).unwrap();
2088
2089 let device_stats = self.device_stats.read();
2091 for device in device_stats.values() {
2092 writeln!(
2093 report,
2094 "--- Device {} ({}) ---",
2095 device.device_index, device.device_name
2096 )
2097 .unwrap();
2098 writeln!(
2099 report,
2100 " Total: {} MB",
2101 device.total_memory / (1024 * 1024)
2102 )
2103 .unwrap();
2104 writeln!(report, " Free: {} MB", device.free_memory / (1024 * 1024)).unwrap();
2105 writeln!(
2106 report,
2107 " RingKernel: {} MB",
2108 device.ringkernel_used / (1024 * 1024)
2109 )
2110 .unwrap();
2111 writeln!(report, " Utilization: {:.1}%", device.utilization()).unwrap();
2112 writeln!(
2113 report,
2114 " Pressure: {:?}",
2115 self.check_pressure(device.device_index)
2116 )
2117 .unwrap();
2118 }
2119
2120 let allocations = self.allocations.read();
2122 let mut sorted_allocs: Vec<_> = allocations.values().collect();
2123 sorted_allocs.sort_by(|a, b| b.size.cmp(&a.size));
2124
2125 if !sorted_allocs.is_empty() {
2126 writeln!(report).unwrap();
2127 writeln!(report, "--- Top 10 Allocations ---").unwrap();
2128 for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
2129 writeln!(
2130 report,
2131 " {}. {} - {} bytes ({:?})",
2132 i + 1,
2133 alloc.name,
2134 alloc.size,
2135 alloc.memory_type
2136 )
2137 .unwrap();
2138 }
2139 }
2140
2141 report
2142 }
2143}
2144
impl Default for GpuMemoryDashboard {
    /// Same initial state as `GpuMemoryDashboard::new`, but returning a bare
    /// `Self` rather than `Arc<Self>` (so it cannot simply delegate to `new`).
    fn default() -> Self {
        Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds: GpuMemoryThresholds::default(),
            // Counter starts at 1 so id 0 is never handed out.
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        }
    }
}
2157
/// Wire protocol for OTLP export.
///
/// NOTE(review): the visible export path (`send_spans_http`) always builds
/// OTLP/HTTP JSON; the other variants do not appear to be consulted here —
/// confirm whether they are handled elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP with JSON payloads (default).
    #[default]
    HttpJson,
    /// OTLP over HTTP with protobuf payloads.
    HttpProtobuf,
    /// OTLP over gRPC.
    Grpc,
}
2173
/// Configuration for an [`OtlpExporter`].
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    // Full collector URL, e.g. "http://localhost:4318/v1/traces".
    pub endpoint: String,
    // Wire protocol to use.
    pub transport: OtlpTransport,
    // Value for the `service.name` resource attribute.
    pub service_name: String,
    // Value for the `service.version` resource attribute.
    pub service_version: String,
    // Optional `service.instance.id` resource attribute.
    pub service_instance_id: Option<String>,
    // Additional resource attributes as (key, value) string pairs.
    pub resource_attributes: Vec<(String, String)>,
    // Buffered-span count that triggers an automatic flush.
    pub batch_size: usize,
    // Target interval between periodic exports.
    pub export_interval: Duration,
    // Per-request HTTP timeout.
    pub timeout: Duration,
    // Number of retries after the first failed attempt.
    pub max_retries: u32,
    // Base delay between retries (backed off exponentially).
    pub retry_delay: Duration,
    // Optional value sent as the `Authorization` header.
    pub authorization: Option<String>,
}
2202
impl Default for OtlpConfig {
    /// Defaults target a local OTLP/HTTP collector (port 4318) with the
    /// crate's own version as `service.version`.
    fn default() -> Self {
        Self {
            endpoint: "http://localhost:4318/v1/traces".to_string(),
            transport: OtlpTransport::HttpJson,
            service_name: "ringkernel".to_string(),
            // Baked in at compile time from Cargo.toml.
            service_version: env!("CARGO_PKG_VERSION").to_string(),
            service_instance_id: None,
            resource_attributes: Vec::new(),
            batch_size: 512,
            export_interval: Duration::from_secs(5),
            timeout: Duration::from_secs(30),
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
            authorization: None,
        }
    }
}
2221
impl OtlpConfig {
    /// Creates a config targeting `endpoint` with all other fields defaulted.
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Sets the `service.name` resource attribute.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Sets the `service.version` resource attribute.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Sets the `service.instance.id` resource attribute.
    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
        self.service_instance_id = Some(id.into());
        self
    }

    /// Appends an arbitrary resource attribute.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.resource_attributes.push((key.into(), value.into()));
        self
    }

    /// Sets the buffered-span count that triggers an automatic flush.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the target interval between periodic exports.
    pub fn with_export_interval(mut self, interval: Duration) -> Self {
        self.export_interval = interval;
        self
    }

    /// Sets the value sent as the `Authorization` header.
    pub fn with_authorization(mut self, auth: impl Into<String>) -> Self {
        self.authorization = Some(auth.into());
        self
    }

    /// Convenience preset for a Jaeger OTLP endpoint.
    pub fn jaeger(endpoint: impl Into<String>) -> Self {
        Self::new(endpoint).with_service_name("ringkernel")
    }

    /// Convenience preset for Honeycomb.
    ///
    /// NOTE(review): Honeycomb expects the API key in an `x-honeycomb-team`
    /// request *header*; here it is packed into the `Authorization` value —
    /// confirm the HTTP layer translates this appropriately.
    pub fn honeycomb(api_key: impl Into<String>) -> Self {
        Self::new("https://api.honeycomb.io/v1/traces")
            .with_authorization(format!("x-honeycomb-team {}", api_key.into()))
    }

    /// Convenience preset for Grafana Cloud's OTLP gateway.
    ///
    /// NOTE(review): `Basic` auth normally takes base64("instance:token");
    /// confirm callers pass `api_key` already in that encoded form.
    pub fn grafana_cloud(instance_id: impl Into<String>, api_key: impl Into<String>) -> Self {
        let instance = instance_id.into();
        Self::new("https://otlp-gateway-prod-us-central-0.grafana.net/otlp/v1/traces")
            .with_authorization(format!("Basic {}", api_key.into()))
            .with_attribute("grafana.instance", instance)
    }
}
2292
/// Outcome of a single export attempt (one `flush`).
#[derive(Debug, Clone)]
pub struct OtlpExportResult {
    // Number of spans successfully exported (0 on failure).
    pub spans_exported: usize,
    // Whether the export ultimately succeeded.
    pub success: bool,
    // Last error message when `success` is false.
    pub error: Option<String>,
    // Wall-clock time the export took.
    pub duration: Duration,
    // How many retries were performed beyond the first attempt.
    pub retry_count: u32,
}
2307
/// Cumulative statistics maintained by [`OtlpExporter`].
#[derive(Debug, Clone, Default)]
pub struct OtlpExporterStats {
    // Total spans successfully exported over the exporter's lifetime.
    pub total_spans_exported: u64,
    // Number of flushes attempted.
    pub total_exports: u64,
    // Number of flushes that succeeded.
    pub successful_exports: u64,
    // Number of flushes that failed (spans re-buffered).
    pub failed_exports: u64,
    // Total retry attempts across all exports.
    pub total_retries: u64,
    // Spans currently waiting in the buffer.
    pub buffered_spans: usize,
    // When the most recent flush finished, if any.
    pub last_export: Option<Instant>,
    // Most recent export error message, if any.
    pub last_error: Option<String>,
}
2328
/// Buffering OTLP span exporter: spans accumulate in memory and are sent in
/// batches once `batch_size` is reached or `flush` is called.
pub struct OtlpExporter {
    // Endpoint/batching/auth configuration (immutable after construction).
    config: OtlpConfig,
    // Spans waiting to be exported.
    buffer: RwLock<Vec<Span>>,
    // Cumulative export statistics.
    stats: RwLock<OtlpExporterStats>,
}
2337
2338impl OtlpExporter {
2339 pub fn new(config: OtlpConfig) -> Self {
2341 Self {
2342 config,
2343 buffer: RwLock::new(Vec::new()),
2344 stats: RwLock::new(OtlpExporterStats::default()),
2345 }
2346 }
2347
2348 pub fn jaeger_local() -> Self {
2350 Self::new(OtlpConfig::jaeger("http://localhost:4318/v1/traces"))
2351 }
2352
2353 pub fn config(&self) -> &OtlpConfig {
2355 &self.config
2356 }
2357
2358 pub fn stats(&self) -> OtlpExporterStats {
2360 self.stats.read().clone()
2361 }
2362
2363 pub fn export_span(&self, span: Span) {
2365 let mut buffer = self.buffer.write();
2366 buffer.push(span);
2367
2368 let should_flush = buffer.len() >= self.config.batch_size;
2369 drop(buffer);
2370
2371 if should_flush {
2372 let _ = self.flush();
2373 }
2374 }
2375
2376 pub fn export_spans(&self, spans: Vec<Span>) {
2378 let mut buffer = self.buffer.write();
2379 buffer.extend(spans);
2380
2381 let should_flush = buffer.len() >= self.config.batch_size;
2382 drop(buffer);
2383
2384 if should_flush {
2385 let _ = self.flush();
2386 }
2387 }
2388
2389 pub fn buffered_count(&self) -> usize {
2391 self.buffer.read().len()
2392 }
2393
2394 pub fn flush(&self) -> OtlpExportResult {
2396 let spans: Vec<Span> = {
2397 let mut buffer = self.buffer.write();
2398 std::mem::take(&mut *buffer)
2399 };
2400
2401 if spans.is_empty() {
2402 return OtlpExportResult {
2403 spans_exported: 0,
2404 success: true,
2405 error: None,
2406 duration: Duration::ZERO,
2407 retry_count: 0,
2408 };
2409 }
2410
2411 let start = Instant::now();
2412 let result = self.send_spans(&spans);
2413 let duration = start.elapsed();
2414
2415 {
2417 let mut stats = self.stats.write();
2418 stats.total_exports += 1;
2419 stats.last_export = Some(Instant::now());
2420
2421 if result.success {
2422 stats.successful_exports += 1;
2423 stats.total_spans_exported += spans.len() as u64;
2424 } else {
2425 stats.failed_exports += 1;
2426 stats.last_error = result.error.clone();
2427 let mut buffer = self.buffer.write();
2429 buffer.extend(spans);
2430 }
2431 stats.total_retries += result.retry_count as u64;
2432 stats.buffered_spans = self.buffer.read().len();
2433 }
2434
2435 OtlpExportResult {
2436 spans_exported: if result.success {
2437 result.spans_exported
2438 } else {
2439 0
2440 },
2441 duration,
2442 ..result
2443 }
2444 }
2445
2446 fn send_spans(&self, spans: &[Span]) -> OtlpExportResult {
2448 #[cfg(not(feature = "alerting"))]
2450 {
2451 eprintln!(
2452 "[OTLP stub] Would export {} spans to {} (enable 'alerting' feature for HTTP export)",
2453 spans.len(),
2454 self.config.endpoint
2455 );
2456 OtlpExportResult {
2457 spans_exported: spans.len(),
2458 success: true,
2459 error: None,
2460 duration: Duration::ZERO,
2461 retry_count: 0,
2462 }
2463 }
2464
2465 #[cfg(feature = "alerting")]
2466 {
2467 self.send_spans_http(spans)
2468 }
2469 }
2470
2471 #[cfg(feature = "alerting")]
2473 fn send_spans_http(&self, spans: &[Span]) -> OtlpExportResult {
2474 let payload = self.build_otlp_json(spans);
2475
2476 let client = reqwest::blocking::Client::builder()
2477 .timeout(self.config.timeout)
2478 .build();
2479
2480 let client = match client {
2481 Ok(c) => c,
2482 Err(e) => {
2483 return OtlpExportResult {
2484 spans_exported: 0,
2485 success: false,
2486 error: Some(format!("Failed to create HTTP client: {}", e)),
2487 duration: Duration::ZERO,
2488 retry_count: 0,
2489 };
2490 }
2491 };
2492
2493 let mut retry_count = 0;
2494 let mut last_error = None;
2495
2496 for attempt in 0..=self.config.max_retries {
2497 let mut request = client
2498 .post(&self.config.endpoint)
2499 .header("Content-Type", "application/json")
2500 .body(payload.clone());
2501
2502 if let Some(auth) = &self.config.authorization {
2503 request = request.header("Authorization", auth);
2504 }
2505
2506 match request.send() {
2507 Ok(response) => {
2508 if response.status().is_success() {
2509 return OtlpExportResult {
2510 spans_exported: spans.len(),
2511 success: true,
2512 error: None,
2513 duration: Duration::ZERO,
2514 retry_count,
2515 };
2516 } else {
2517 last_error = Some(format!(
2518 "HTTP {}: {}",
2519 response.status(),
2520 response.status().as_str()
2521 ));
2522 }
2523 }
2524 Err(e) => {
2525 last_error = Some(format!("Request failed: {}", e));
2526 }
2527 }
2528
2529 if attempt < self.config.max_retries {
2530 retry_count += 1;
2531 std::thread::sleep(self.config.retry_delay * (1 << attempt));
2532 }
2533 }
2534
2535 OtlpExportResult {
2536 spans_exported: 0,
2537 success: false,
2538 error: last_error,
2539 duration: Duration::ZERO,
2540 retry_count,
2541 }
2542 }
2543
2544 #[cfg(feature = "alerting")]
2546 fn build_otlp_json(&self, spans: &[Span]) -> String {
2547 use std::fmt::Write;
2548
2549 let mut json = String::with_capacity(4096);
2550
2551 json.push_str(r#"{"resourceSpans":[{"resource":{"attributes":["#);
2553
2554 let _ = write!(
2556 json,
2557 r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}}"#,
2558 escape_json_str(&self.config.service_name)
2559 );
2560
2561 let _ = write!(
2563 json,
2564 r#",{{"key":"service.version","value":{{"stringValue":"{}"}}}}"#,
2565 escape_json_str(&self.config.service_version)
2566 );
2567
2568 if let Some(instance_id) = &self.config.service_instance_id {
2570 let _ = write!(
2571 json,
2572 r#",{{"key":"service.instance.id","value":{{"stringValue":"{}"}}}}"#,
2573 escape_json_str(instance_id)
2574 );
2575 }
2576
2577 for (key, value) in &self.config.resource_attributes {
2579 let _ = write!(
2580 json,
2581 r#",{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
2582 escape_json_str(key),
2583 escape_json_str(value)
2584 );
2585 }
2586
2587 json.push_str(r#"]},"scopeSpans":[{"scope":{"name":"ringkernel"},"spans":["#);
2588
2589 let mut first = true;
2591 for span in spans {
2592 if !first {
2593 json.push(',');
2594 }
2595 first = false;
2596 self.span_to_json(&mut json, span);
2597 }
2598
2599 json.push_str("]}]}]}");
2600 json
2601 }
2602
2603 #[cfg(feature = "alerting")]
2605 fn span_to_json(&self, json: &mut String, span: &Span) {
2606 use std::fmt::Write;
2607
2608 let _ = write!(
2609 json,
2610 r#"{{"traceId":"{}","spanId":"{}""#,
2611 span.trace_id.to_hex(),
2612 span.span_id.to_hex()
2613 );
2614
2615 if let Some(parent) = span.parent_span_id {
2616 let _ = write!(json, r#","parentSpanId":"{}""#, parent.to_hex());
2617 }
2618
2619 let _ = write!(
2620 json,
2621 r#","name":"{}","kind":{}"#,
2622 escape_json_str(&span.name),
2623 match span.kind {
2624 SpanKind::Internal => 1,
2625 SpanKind::Server => 2,
2626 SpanKind::Client => 3,
2627 SpanKind::Producer => 4,
2628 SpanKind::Consumer => 5,
2629 }
2630 );
2631
2632 let start_nanos = span.start_time.elapsed().as_nanos();
2634 let end_nanos = span
2635 .end_time
2636 .map(|t| t.elapsed().as_nanos())
2637 .unwrap_or(start_nanos);
2638
2639 let _ = write!(
2641 json,
2642 r#","startTimeUnixNano":"{}","endTimeUnixNano":"{}""#,
2643 start_nanos, end_nanos
2644 );
2645
2646 let _ = write!(
2648 json,
2649 r#","status":{{"code":{}}}"#,
2650 match &span.status {
2651 SpanStatus::Unset => 0,
2652 SpanStatus::Ok => 1,
2653 SpanStatus::Error { .. } => 2,
2654 }
2655 );
2656
2657 if !span.attributes.is_empty() {
2659 json.push_str(r#","attributes":["#);
2660 let mut first = true;
2661 for (key, value) in &span.attributes {
2662 if !first {
2663 json.push(',');
2664 }
2665 first = false;
2666 let _ = write!(
2667 json,
2668 r#"{{"key":"{}","value":{}}}"#,
2669 escape_json_str(key),
2670 attribute_value_to_json(value)
2671 );
2672 }
2673 json.push(']');
2674 }
2675
2676 if !span.events.is_empty() {
2678 json.push_str(r#","events":["#);
2679 let mut first = true;
2680 for event in &span.events {
2681 if !first {
2682 json.push(',');
2683 }
2684 first = false;
2685 let _ = write!(
2686 json,
2687 r#"{{"name":"{}","timeUnixNano":"{}"}}"#,
2688 escape_json_str(&event.name),
2689 event.timestamp.elapsed().as_nanos()
2690 );
2691 }
2692 json.push(']');
2693 }
2694
2695 json.push('}');
2696 }
2697}
2698
/// Escapes a string for embedding inside a JSON string literal.
///
/// Covers the mandatory escapes (`\\`, `"`) plus *all* control characters
/// below U+0020 as required by RFC 8259 — the previous version only handled
/// `\n`, `\r`, `\t`, so other control characters produced invalid JSON.
#[cfg(feature = "alerting")]
fn escape_json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining C0 controls use the generic \uXXXX form.
            c if (c as u32) < 0x20 => {
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
2708
/// Serializes a span attribute value as an OTLP `AnyValue` JSON object.
#[cfg(feature = "alerting")]
fn attribute_value_to_json(value: &AttributeValue) -> String {
    match value {
        AttributeValue::String(s) => format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)),
        // OTLP encodes 64-bit integers as JSON strings to avoid precision loss.
        AttributeValue::Int(i) => format!(r#"{{"intValue":"{}"}}"#, i),
        AttributeValue::Float(f) if f.is_finite() => format!(r#"{{"doubleValue":{}}}"#, f),
        AttributeValue::Float(f) => {
            // Bare NaN/Infinity are not legal JSON numbers; the protobuf JSON
            // mapping represents them as the strings "NaN", "Infinity",
            // "-Infinity". (The previous version emitted Rust's `NaN`/`inf`
            // unquoted, producing invalid JSON.)
            let special = if f.is_nan() {
                "NaN"
            } else if f.is_sign_positive() {
                "Infinity"
            } else {
                "-Infinity"
            };
            format!(r#"{{"doubleValue":"{}"}}"#, special)
        }
        AttributeValue::Bool(b) => format!(r#"{{"boolValue":{}}}"#, b),
        AttributeValue::StringArray(arr) => {
            let values: Vec<String> = arr
                .iter()
                .map(|s| format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)))
                .collect();
            format!(r#"{{"arrayValue":{{"values":[{}]}}}}"#, values.join(","))
        }
    }
}
2726
2727#[cfg(test)]
2728mod tests {
2729 use super::*;
2730 use crate::runtime::KernelId;
2731
    // Consecutive ids from the hash-based generator should differ.
    #[test]
    fn test_trace_id_generation() {
        let id1 = TraceId::new();
        let id2 = TraceId::new();
        assert_ne!(id1.0, id2.0);
    }

    // to_hex emits exactly 32 lowercase hex digits and round-trips via from_hex.
    #[test]
    fn test_trace_id_hex() {
        let id = TraceId(0x123456789abcdef0123456789abcdef0);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 32);
        let parsed = TraceId::from_hex(&hex).unwrap();
        assert_eq!(id, parsed);
    }
2747
    // New spans start open and keep the supplied name.
    #[test]
    fn test_span_creation() {
        let span = Span::new("test_operation", SpanKind::Internal);
        assert!(!span.is_ended());
        assert_eq!(span.name, "test_operation");
    }

    // Child spans inherit the parent's trace id and record it as parent.
    #[test]
    fn test_span_child() {
        let parent = Span::new("parent", SpanKind::Server);
        let child = parent.child("child", SpanKind::Internal);

        assert_eq!(child.trace_id, parent.trace_id);
        assert_eq!(child.parent_span_id, Some(parent.span_id));
    }

    // set_attribute accepts string, integer, and boolean values.
    #[test]
    fn test_span_attributes() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.set_attribute("string_key", "value");
        span.set_attribute("int_key", 42i64);
        span.set_attribute("bool_key", true);

        assert_eq!(span.attributes.len(), 3);
    }

    // add_event appends to the span's event list.
    #[test]
    fn test_span_events() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.add_event("event1");
        span.add_event("event2");

        assert_eq!(span.events.len(), 2);
    }
2782
    // SpanBuilder wires kind, parent linkage, and attributes into the span.
    #[test]
    fn test_span_builder() {
        let parent = Span::new("parent", SpanKind::Server);
        let span = SpanBuilder::new("child")
            .kind(SpanKind::Client)
            .parent(&parent)
            .attribute("key", "value")
            .build();

        assert_eq!(span.trace_id, parent.trace_id);
        assert_eq!(span.kind, SpanKind::Client);
        assert!(span.attributes.contains_key("key"));
    }

    // Registered counters/gauges appear in the rendered exposition text.
    #[test]
    fn test_prometheus_exporter() {
        let exporter = PrometheusExporter::new();
        exporter.register_counter("test_counter", "A test counter", &["label1"]);
        exporter.register_gauge("test_gauge", "A test gauge", &[]);

        exporter.inc_counter("test_counter", &["value1"]);
        exporter.inc_counter("test_counter", &["value1"]);
        exporter.set_metric("test_gauge", 42.0, &[]);

        let output = exporter.render();
        assert!(output.contains("test_counter"));
        assert!(output.contains("test_gauge"));
    }
2811
    // The builder's title and panel names survive into the dashboard JSON.
    #[test]
    fn test_grafana_dashboard() {
        let dashboard = GrafanaDashboard::new("Test Dashboard")
            .description("A test dashboard")
            .add_throughput_panel()
            .add_latency_panel()
            .build();

        assert!(dashboard.contains("Test Dashboard"));
        assert!(dashboard.contains("Message Throughput"));
        assert!(dashboard.contains("Message Latency"));
    }

    // Spans move from active to exported when ended.
    #[test]
    fn test_observability_context() {
        let ctx = ObservabilityContext::new();

        let span = ctx.start_span("test", SpanKind::Internal);
        assert_eq!(ctx.active_span_count(), 1);

        ctx.end_span(span);
        assert_eq!(ctx.active_span_count(), 0);

        let exported = ctx.export_spans();
        assert_eq!(exported.len(), 1);
    }
2838
    // A collector with recorded messages yields non-empty definitions and samples.
    #[test]
    fn test_ringkernel_collector() {
        let collector = Arc::new(MetricsCollector::new());
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);

        let prom_collector = RingKernelCollector::new(collector);
        let defs = prom_collector.definitions();
        let samples = prom_collector.collect();

        assert!(!defs.is_empty());
        assert!(!samples.is_empty());
    }
2854
    // new() stores RGB verbatim and defaults alpha to fully opaque.
    #[test]
    fn test_profiler_color() {
        let color = ProfilerColor::new(128, 64, 32);
        assert_eq!(color.r, 128);
        assert_eq!(color.g, 64);
        assert_eq!(color.b, 32);
        assert_eq!(color.a, 255);

        assert_eq!(ProfilerColor::RED.r, 255);
        assert_eq!(ProfilerColor::GREEN.g, 255);
        assert_eq!(ProfilerColor::BLUE.b, 255);
    }

    // NullProfiler accepts every call without error while reporting unavailable.
    #[test]
    fn test_null_profiler() {
        let profiler = NullProfiler;
        assert!(!profiler.is_available());
        assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);

        assert!(profiler.start_capture().is_ok());
        assert!(profiler.end_capture().is_ok());
        assert!(profiler.trigger_capture().is_ok());

        let range = profiler.push_range("test", ProfilerColor::RED);
        let _elapsed = range.elapsed(); profiler.pop_range();
        profiler.mark("marker", ProfilerColor::BLUE);
        profiler.set_thread_name("thread");
    }
2887
    // Without the NVTX library loaded, capture attempts report NotAvailable.
    #[test]
    fn test_nvtx_profiler_stub() {
        let profiler = NvtxProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);

        assert!(!profiler.is_available());
        assert!(!profiler.is_nvtx_loaded());

        assert!(matches!(
            profiler.start_capture(),
            Err(ProfilerError::NotAvailable)
        ));
    }

    // Without RenderDoc attached, UI launch reports NotAttached.
    #[test]
    fn test_renderdoc_profiler_stub() {
        let profiler = RenderDocProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);

        assert!(!profiler.is_available());
        assert!(!profiler.is_attached());
        assert!(profiler.get_capture_path().is_none());

        assert!(matches!(
            profiler.launch_ui(),
            Err(ProfilerError::NotAttached)
        ));
    }
2920
    // The manager starts disabled and toggles via set_enabled.
    #[test]
    fn test_gpu_profiler_manager() {
        let manager = GpuProfilerManager::new();

        assert!(!manager.is_enabled());
        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);

        manager.set_enabled(true);
        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
    }

    // Scopes and markers are safe no-ops on a disabled manager.
    #[test]
    fn test_profiler_scope() {
        let manager = GpuProfilerManager::new();

        {
            let _scope = manager.scope("test_scope");
        }

        {
            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
        }

        manager.mark("test_marker");
    }

    // A caller-supplied profiler is reported via backend().
    #[test]
    fn test_profiler_with_custom() {
        let custom_profiler = Arc::new(NullProfiler);
        let manager = GpuProfilerManager::with_profiler(custom_profiler);

        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
    }

    // elapsed() reflects at least the wall time since the range was opened.
    #[test]
    fn test_profiler_range_elapsed() {
        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = range.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    // Each error variant's Display output mentions its condition.
    #[test]
    fn test_profiler_error_display() {
        let err = ProfilerError::NotAvailable;
        assert!(err.to_string().contains("not available"));

        let err = ProfilerError::NotAttached;
        assert!(err.to_string().contains("not attached"));

        let err = ProfilerError::CaptureInProgress;
        assert!(err.to_string().contains("in progress"));

        let err = ProfilerError::Backend("test error".to_string());
        assert!(err.to_string().contains("test error"));
    }
2984
    // A fresh dashboard has zeroed counters.
    #[test]
    fn test_gpu_memory_dashboard_creation() {
        let dashboard = GpuMemoryDashboard::new();
        assert_eq!(dashboard.total_allocated(), 0);
        assert_eq!(dashboard.peak_allocated(), 0);
        assert_eq!(dashboard.allocation_count(), 0);
    }

    // Totals/peak/count track allocations; deallocating lowers the total but
    // never the peak.
    #[test]
    fn test_gpu_memory_allocation_tracking() {
        let dashboard = GpuMemoryDashboard::new();

        dashboard.track_allocation(
            1,
            "test_buffer",
            65536,
            GpuMemoryType::DeviceLocal,
            0,
            Some("test_kernel"),
        );

        assert_eq!(dashboard.total_allocated(), 65536);
        assert_eq!(dashboard.peak_allocated(), 65536);
        assert_eq!(dashboard.allocation_count(), 1);

        dashboard.track_allocation(
            2,
            "queue_buffer",
            1024,
            GpuMemoryType::QueueBuffer,
            0,
            Some("test_kernel"),
        );

        assert_eq!(dashboard.total_allocated(), 66560);
        assert_eq!(dashboard.peak_allocated(), 66560);
        assert_eq!(dashboard.allocation_count(), 2);

        dashboard.track_deallocation(1);
        assert_eq!(dashboard.total_allocated(), 1024);
        assert_eq!(dashboard.peak_allocated(), 66560); assert_eq!(dashboard.allocation_count(), 1);
    }
3033
3034 #[test]
3035 fn test_gpu_memory_device_stats() {
3036 let dashboard = GpuMemoryDashboard::new();
3037
3038 dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024); let stats = dashboard.get_device_stats(0).unwrap();
3042 assert_eq!(stats.device_index, 0);
3043 assert_eq!(stats.device_name, "NVIDIA RTX 4090");
3044 assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
3045 assert_eq!(stats.utilization(), 0.0);
3046
3047 let used = 8 * 1024 * 1024 * 1024; let free = 16 * 1024 * 1024 * 1024; dashboard.update_device_stats(0, free, used);
3051
3052 let stats = dashboard.get_device_stats(0).unwrap();
3053 assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
3054 }
3055
3056 #[test]
3057 fn test_gpu_memory_pressure_levels() {
3058 let dashboard = GpuMemoryDashboard::new();
3059
3060 dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);
3062
3063 dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
3065 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);
3066
3067 dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
3069 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);
3070
3071 dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
3073 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);
3074
3075 dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
3077 assert_eq!(
3078 dashboard.check_pressure(0),
3079 MemoryPressureLevel::OutOfMemory
3080 );
3081 }
3082
3083 #[test]
3084 fn test_gpu_memory_kernel_allocations() {
3085 let dashboard = GpuMemoryDashboard::new();
3086
3087 dashboard.track_allocation(
3089 1,
3090 "buf1",
3091 1000,
3092 GpuMemoryType::DeviceLocal,
3093 0,
3094 Some("kernel_a"),
3095 );
3096 dashboard.track_allocation(
3097 2,
3098 "buf2",
3099 2000,
3100 GpuMemoryType::DeviceLocal,
3101 0,
3102 Some("kernel_a"),
3103 );
3104 dashboard.track_allocation(
3105 3,
3106 "buf3",
3107 3000,
3108 GpuMemoryType::DeviceLocal,
3109 0,
3110 Some("kernel_b"),
3111 );
3112
3113 let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
3114 assert_eq!(kernel_a_allocs.len(), 2);
3115
3116 let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
3117 assert_eq!(kernel_b_allocs.len(), 1);
3118
3119 let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
3120 assert_eq!(kernel_c_allocs.len(), 0);
3121 }
3122
3123 #[test]
3124 fn test_gpu_memory_prometheus_metrics() {
3125 let dashboard = GpuMemoryDashboard::new();
3126 dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
3127 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3128
3129 let metrics = dashboard.prometheus_metrics();
3130 assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
3131 assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
3132 assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
3133 }
3134
3135 #[test]
3136 fn test_gpu_memory_summary_report() {
3137 let dashboard = GpuMemoryDashboard::new();
3138 dashboard.track_allocation(
3139 1,
3140 "large_buffer",
3141 1024 * 1024,
3142 GpuMemoryType::DeviceLocal,
3143 0,
3144 None,
3145 );
3146 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3147
3148 let report = dashboard.summary_report();
3149 assert!(report.contains("GPU Memory Dashboard"));
3150 assert!(report.contains("large_buffer"));
3151 }
3152
3153 #[test]
3154 fn test_gpu_memory_pool_stats() {
3155 let pool_stats = GpuMemoryPoolStats {
3156 name: "default".to_string(),
3157 capacity: 1024 * 1024,
3158 allocated: 512 * 1024,
3159 peak_allocated: 768 * 1024,
3160 allocation_count: 10,
3161 total_allocations: 100,
3162 total_deallocations: 90,
3163 fragmentation: 0.1,
3164 };
3165
3166 assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
3167 }
3168
3169 #[test]
3170 fn test_gpu_memory_types() {
3171 let types = [
3173 GpuMemoryType::DeviceLocal,
3174 GpuMemoryType::HostVisible,
3175 GpuMemoryType::HostCoherent,
3176 GpuMemoryType::Mapped,
3177 GpuMemoryType::QueueBuffer,
3178 GpuMemoryType::ControlBlock,
3179 GpuMemoryType::SharedMemory,
3180 ];
3181
3182 for (i, t1) in types.iter().enumerate() {
3183 for (j, t2) in types.iter().enumerate() {
3184 if i != j {
3185 assert_ne!(t1, t2);
3186 }
3187 }
3188 }
3189 }
3190
3191 #[test]
3192 fn test_gpu_memory_grafana_panel() {
3193 let dashboard = GpuMemoryDashboard::new();
3194 let panel = dashboard.grafana_panel();
3195
3196 assert_eq!(panel.title, "GPU Memory Usage");
3197 assert_eq!(panel.panel_type, PanelType::BarGauge);
3198 assert!(!panel.queries.is_empty());
3199 }
3200
3201 #[test]
3202 fn test_gpu_memory_allocation_id_generation() {
3203 let dashboard = GpuMemoryDashboard::new();
3204
3205 let id1 = dashboard.next_allocation_id();
3206 let id2 = dashboard.next_allocation_id();
3207 let id3 = dashboard.next_allocation_id();
3208
3209 assert_eq!(id1, 1);
3210 assert_eq!(id2, 2);
3211 assert_eq!(id3, 3);
3212 }
3213
3214 #[test]
3217 fn test_otlp_config_default() {
3218 let config = OtlpConfig::default();
3219 assert_eq!(config.endpoint, "http://localhost:4318/v1/traces");
3220 assert_eq!(config.transport, OtlpTransport::HttpJson);
3221 assert_eq!(config.service_name, "ringkernel");
3222 assert_eq!(config.batch_size, 512);
3223 }
3224
3225 #[test]
3226 fn test_otlp_config_builder() {
3227 let config = OtlpConfig::new("http://example.com/v1/traces")
3228 .with_service_name("my-service")
3229 .with_service_version("1.0.0")
3230 .with_instance_id("instance-1")
3231 .with_attribute("env", "production")
3232 .with_batch_size(100);
3233
3234 assert_eq!(config.endpoint, "http://example.com/v1/traces");
3235 assert_eq!(config.service_name, "my-service");
3236 assert_eq!(config.service_version, "1.0.0");
3237 assert_eq!(config.service_instance_id, Some("instance-1".to_string()));
3238 assert_eq!(config.resource_attributes.len(), 1);
3239 assert_eq!(config.batch_size, 100);
3240 }
3241
3242 #[test]
3243 fn test_otlp_config_jaeger() {
3244 let config = OtlpConfig::jaeger("http://jaeger:4318/v1/traces");
3245 assert_eq!(config.endpoint, "http://jaeger:4318/v1/traces");
3246 assert_eq!(config.service_name, "ringkernel");
3247 }
3248
3249 #[test]
3250 fn test_otlp_config_honeycomb() {
3251 let config = OtlpConfig::honeycomb("my-api-key");
3252 assert_eq!(config.endpoint, "https://api.honeycomb.io/v1/traces");
3253 assert_eq!(
3254 config.authorization,
3255 Some("x-honeycomb-team my-api-key".to_string())
3256 );
3257 }
3258
3259 #[test]
3260 fn test_otlp_exporter_creation() {
3261 let exporter = OtlpExporter::new(OtlpConfig::default());
3262 assert_eq!(exporter.buffered_count(), 0);
3263 assert_eq!(exporter.config().service_name, "ringkernel");
3264 }
3265
3266 #[test]
3267 fn test_otlp_exporter_jaeger_local() {
3268 let exporter = OtlpExporter::jaeger_local();
3269 assert_eq!(
3270 exporter.config().endpoint,
3271 "http://localhost:4318/v1/traces"
3272 );
3273 }
3274
3275 #[test]
3276 fn test_otlp_exporter_buffering() {
3277 let config = OtlpConfig::default().with_batch_size(10);
3278 let exporter = OtlpExporter::new(config);
3279
3280 let span = Span::new("test_span", SpanKind::Internal);
3282
3283 for _ in 0..5 {
3285 exporter.export_span(span.clone());
3286 }
3287
3288 assert_eq!(exporter.buffered_count(), 5);
3289 }
3290
3291 #[test]
3292 fn test_otlp_exporter_flush_empty() {
3293 let exporter = OtlpExporter::new(OtlpConfig::default());
3294
3295 let result = exporter.flush();
3296 assert!(result.success);
3297 assert_eq!(result.spans_exported, 0);
3298 }
3299
3300 #[test]
3301 fn test_otlp_exporter_stats() {
3302 let exporter = OtlpExporter::new(OtlpConfig::default());
3303
3304 let stats = exporter.stats();
3306 assert_eq!(stats.total_exports, 0);
3307 assert_eq!(stats.total_spans_exported, 0);
3308 assert_eq!(stats.buffered_spans, 0);
3309 }
3310
3311 #[test]
3312 fn test_otlp_transport_default() {
3313 let transport = OtlpTransport::default();
3314 assert_eq!(transport, OtlpTransport::HttpJson);
3315 }
3316}