1use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
/// A 128-bit trace identifier, hex-encoded as 32 lowercase characters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TraceId(pub u128);

impl TraceId {
    /// Generates a new pseudo-random trace id.
    ///
    /// Entropy sources are the wall clock, the current thread id, and a
    /// process-wide sequence counter. The counter is the fix for a collision
    /// bug: previously the low word was derived solely from the high word, so
    /// two calls on the same thread within one clock tick produced identical
    /// ids.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::thread::current().id().hash(&mut hasher);
        let high = hasher.finish() as u128;
        // Perturb the hasher with a monotonically increasing sequence number
        // so the low word differs even when time + thread hash identically.
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        let low = hasher.finish() as u128;
        Self((high << 64) | low)
    }

    /// Parses a hex string (case-insensitive); `None` if not valid hex.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u128::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders as 32 lowercase hex characters, zero-padded.
    pub fn to_hex(&self) -> String {
        format!("{:032x}", self.0)
    }
}

impl Default for TraceId {
    fn default() -> Self {
        Self::new()
    }
}
76
/// A 64-bit span identifier, hex-encoded as 16 lowercase characters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpanId(pub u64);

impl SpanId {
    /// Generates a new pseudo-random span id.
    ///
    /// Hashes the wall clock, the process id, and a process-wide sequence
    /// counter. The counter is the fix for a collision bug: the process id is
    /// constant for the process lifetime, so two calls within one clock tick
    /// previously produced identical span ids (e.g. a parent and its child
    /// created back-to-back).
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        Self(hasher.finish())
    }

    /// Parses a hex string (case-insensitive); `None` if not valid hex.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u64::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders as 16 lowercase hex characters, zero-padded.
    pub fn to_hex(&self) -> String {
        format!("{:016x}", self.0)
    }
}

impl Default for SpanId {
    fn default() -> Self {
        Self::new()
    }
}
107
/// The role a span plays in a trace, mirroring OpenTelemetry span kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Internal operation within the application.
    Internal,
    /// Handles an inbound remote request.
    Server,
    /// Makes an outbound remote request.
    Client,
    /// Sends a message to a broker/queue.
    Producer,
    /// Receives a message from a broker/queue.
    Consumer,
}

/// Outcome of a span, set by the code that owns it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// No status recorded (the initial state).
    Unset,
    /// Completed successfully.
    Ok,
    /// Completed with an error.
    Error {
        /// Human-readable error description.
        message: String,
    },
}

/// A single timed operation within a trace.
///
/// Timing uses `Instant` (monotonic), so spans measure durations rather than
/// wall-clock timestamps.
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace this span belongs to.
    pub trace_id: TraceId,
    /// Unique id of this span.
    pub span_id: SpanId,
    /// Parent span id; `None` for root spans.
    pub parent_span_id: Option<SpanId>,
    /// Operation name.
    pub name: String,
    /// Role of this span in the trace.
    pub kind: SpanKind,
    /// Monotonic start time, captured at construction.
    pub start_time: Instant,
    /// Monotonic end time; `None` while the span is still active.
    pub end_time: Option<Instant>,
    /// Recorded outcome.
    pub status: SpanStatus,
    /// Key/value metadata attached to the span.
    pub attributes: HashMap<String, AttributeValue>,
    /// Point-in-time events recorded during the span.
    pub events: Vec<SpanEvent>,
}
161
/// A typed attribute value attachable to spans and span events.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    /// UTF-8 string value.
    String(String),
    /// Signed integer value.
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// Homogeneous list of strings.
    StringArray(Vec<String>),
}

impl From<&str> for AttributeValue {
    fn from(s: &str) -> Self {
        Self::String(s.to_string())
    }
}

impl From<String> for AttributeValue {
    fn from(s: String) -> Self {
        Self::String(s)
    }
}

impl From<i64> for AttributeValue {
    fn from(i: i64) -> Self {
        Self::Int(i)
    }
}

/// Convenience for bare integer literals (which default to `i32`), so calls
/// like `span.set_attribute("n", 5)` work without an `i64` suffix.
impl From<i32> for AttributeValue {
    fn from(i: i32) -> Self {
        Self::Int(i64::from(i))
    }
}

impl From<f64> for AttributeValue {
    fn from(f: f64) -> Self {
        Self::Float(f)
    }
}

impl From<bool> for AttributeValue {
    fn from(b: bool) -> Self {
        Self::Bool(b)
    }
}

/// Previously missing: `StringArray` was the only variant without a `From`
/// impl, so string lists could not be passed through `impl Into<AttributeValue>`.
impl From<Vec<String>> for AttributeValue {
    fn from(v: Vec<String>) -> Self {
        Self::StringArray(v)
    }
}
206
/// A named, timestamped event recorded within a span.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Event name.
    pub name: String,
    /// When the event was recorded (monotonic).
    pub timestamp: Instant,
    /// Key/value metadata attached to the event.
    pub attributes: HashMap<String, AttributeValue>,
}
217
218impl Span {
219 pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221 Self {
222 trace_id: TraceId::new(),
223 span_id: SpanId::new(),
224 parent_span_id: None,
225 name: name.into(),
226 kind,
227 start_time: Instant::now(),
228 end_time: None,
229 status: SpanStatus::Unset,
230 attributes: HashMap::new(),
231 events: Vec::new(),
232 }
233 }
234
235 pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237 Self {
238 trace_id: self.trace_id,
239 span_id: SpanId::new(),
240 parent_span_id: Some(self.span_id),
241 name: name.into(),
242 kind,
243 start_time: Instant::now(),
244 end_time: None,
245 status: SpanStatus::Unset,
246 attributes: HashMap::new(),
247 events: Vec::new(),
248 }
249 }
250
251 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253 self.attributes.insert(key.into(), value.into());
254 }
255
256 pub fn add_event(&mut self, name: impl Into<String>) {
258 self.events.push(SpanEvent {
259 name: name.into(),
260 timestamp: Instant::now(),
261 attributes: HashMap::new(),
262 });
263 }
264
265 pub fn add_event_with_attributes(
267 &mut self,
268 name: impl Into<String>,
269 attributes: HashMap<String, AttributeValue>,
270 ) {
271 self.events.push(SpanEvent {
272 name: name.into(),
273 timestamp: Instant::now(),
274 attributes,
275 });
276 }
277
278 pub fn set_ok(&mut self) {
280 self.status = SpanStatus::Ok;
281 }
282
283 pub fn set_error(&mut self, message: impl Into<String>) {
285 self.status = SpanStatus::Error {
286 message: message.into(),
287 };
288 }
289
290 pub fn end(&mut self) {
292 self.end_time = Some(Instant::now());
293 }
294
295 pub fn duration(&self) -> Duration {
297 self.end_time
298 .unwrap_or_else(Instant::now)
299 .duration_since(self.start_time)
300 }
301
302 pub fn is_ended(&self) -> bool {
304 self.end_time.is_some()
305 }
306}
307
/// Fluent builder for `Span`s with an optional parent and initial attributes.
pub struct SpanBuilder {
    /// Operation name for the span.
    name: String,
    /// Span kind; starts as `Internal`.
    kind: SpanKind,
    /// Optional (trace id, parent span id) to attach the span under.
    parent: Option<(TraceId, SpanId)>,
    /// Attributes applied to the built span.
    attributes: HashMap<String, AttributeValue>,
}
319
320impl SpanBuilder {
321 pub fn new(name: impl Into<String>) -> Self {
323 Self {
324 name: name.into(),
325 kind: SpanKind::Internal,
326 parent: None,
327 attributes: HashMap::new(),
328 }
329 }
330
331 pub fn kind(mut self, kind: SpanKind) -> Self {
333 self.kind = kind;
334 self
335 }
336
337 pub fn parent(mut self, parent: &Span) -> Self {
339 self.parent = Some((parent.trace_id, parent.span_id));
340 self
341 }
342
343 pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345 self.attributes.insert(key.into(), value.into());
346 self
347 }
348
349 pub fn build(self) -> Span {
351 let mut span = Span::new(self.name, self.kind);
352 if let Some((trace_id, parent_id)) = self.parent {
353 span.trace_id = trace_id;
354 span.parent_span_id = Some(parent_id);
355 }
356 span.attributes = self.attributes;
357 span
358 }
359}
360
/// The kind of Prometheus metric, matching the exposition-format TYPE names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Monotonically increasing value.
    Counter,
    /// Value that can go up and down.
    Gauge,
    /// Distribution of observations in buckets.
    Histogram,
    /// Distribution with precomputed quantiles.
    Summary,
}

/// Static metadata describing one exported metric.
#[derive(Debug, Clone)]
pub struct MetricDefinition {
    /// Metric name as emitted in the exposition output.
    pub name: String,
    /// Metric kind (emitted on the `# TYPE` line).
    pub metric_type: MetricType,
    /// Help text (emitted on the `# HELP` line).
    pub help: String,
    /// Ordered label names; samples provide values in the same order.
    pub labels: Vec<String>,
}

/// One measured value for a metric, keyed by its label values.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Name of the metric this sample belongs to.
    pub name: String,
    /// Label values, positionally matching the definition's `labels`.
    pub label_values: Vec<String>,
    /// The sample value.
    pub value: f64,
    /// Optional timestamp in milliseconds (Unix epoch, presumably — set by
    /// the sample producer; no producer in this file populates it).
    pub timestamp_ms: Option<u64>,
}

/// Renders registered metrics in the Prometheus text exposition format.
pub struct PrometheusExporter {
    /// Definitions gathered when collectors are registered.
    definitions: RwLock<Vec<MetricDefinition>>,
    /// External collectors polled on every `render`.
    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
    /// Manually registered metrics, keyed by metric name.
    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
    /// Number of times `render` has been called.
    export_count: AtomicU64,
}

/// A manually registered metric together with its current samples.
struct CustomMetric {
    /// Static metadata for the metric.
    definition: MetricDefinition,
    /// One sample per distinct label-value combination.
    samples: Vec<MetricSample>,
}

/// A source of metrics that can be polled by the exporter.
pub trait PrometheusCollector: Send + Sync {
    /// Static definitions for every metric this collector produces.
    fn definitions(&self) -> Vec<MetricDefinition>;

    /// Current samples; invoked on every render.
    fn collect(&self) -> Vec<MetricSample>;
}
430
431impl PrometheusExporter {
432 pub fn new() -> Arc<Self> {
434 Arc::new(Self {
435 definitions: RwLock::new(Vec::new()),
436 collectors: RwLock::new(Vec::new()),
437 custom_metrics: RwLock::new(HashMap::new()),
438 export_count: AtomicU64::new(0),
439 })
440 }
441
442 pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444 let defs = collector.definitions();
445 self.definitions.write().extend(defs);
446 self.collectors.write().push(collector);
447 }
448
449 pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451 let def = MetricDefinition {
452 name: name.to_string(),
453 metric_type: MetricType::Counter,
454 help: help.to_string(),
455 labels: labels.iter().map(|s| s.to_string()).collect(),
456 };
457 self.custom_metrics.write().insert(
458 name.to_string(),
459 CustomMetric {
460 definition: def,
461 samples: Vec::new(),
462 },
463 );
464 }
465
466 pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468 let def = MetricDefinition {
469 name: name.to_string(),
470 metric_type: MetricType::Gauge,
471 help: help.to_string(),
472 labels: labels.iter().map(|s| s.to_string()).collect(),
473 };
474 self.custom_metrics.write().insert(
475 name.to_string(),
476 CustomMetric {
477 definition: def,
478 samples: Vec::new(),
479 },
480 );
481 }
482
483 pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485 let def = MetricDefinition {
486 name: name.to_string(),
487 metric_type: MetricType::Histogram,
488 help: help.to_string(),
489 labels: labels.iter().map(|s| s.to_string()).collect(),
490 };
491 self.custom_metrics.write().insert(
492 name.to_string(),
493 CustomMetric {
494 definition: def,
495 samples: Vec::new(),
496 },
497 );
498 }
499
500 pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502 let mut metrics = self.custom_metrics.write();
503 if let Some(metric) = metrics.get_mut(name) {
504 let sample = MetricSample {
505 name: name.to_string(),
506 label_values: label_values.iter().map(|s| s.to_string()).collect(),
507 value,
508 timestamp_ms: None,
509 };
510 let existing = metric
512 .samples
513 .iter_mut()
514 .find(|s| s.label_values == sample.label_values);
515 if let Some(existing) = existing {
516 existing.value = value;
517 } else {
518 metric.samples.push(sample);
519 }
520 }
521 }
522
523 pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525 self.add_counter(name, 1.0, label_values);
526 }
527
528 pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530 let mut metrics = self.custom_metrics.write();
531 if let Some(metric) = metrics.get_mut(name) {
532 let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533 let existing = metric
534 .samples
535 .iter_mut()
536 .find(|s| s.label_values == label_vec);
537 if let Some(existing) = existing {
538 existing.value += delta;
539 } else {
540 metric.samples.push(MetricSample {
541 name: name.to_string(),
542 label_values: label_vec,
543 value: delta,
544 timestamp_ms: None,
545 });
546 }
547 }
548 }
549
550 pub fn render(&self) -> String {
552 self.export_count.fetch_add(1, Ordering::Relaxed);
553
554 let mut output = String::new();
555
556 let collectors = self.collectors.read();
558 for collector in collectors.iter() {
559 let defs = collector.definitions();
560 let samples = collector.collect();
561
562 for def in &defs {
563 writeln!(output, "# HELP {} {}", def.name, def.help).unwrap();
565 writeln!(
566 output,
567 "# TYPE {} {}",
568 def.name,
569 match def.metric_type {
570 MetricType::Counter => "counter",
571 MetricType::Gauge => "gauge",
572 MetricType::Histogram => "histogram",
573 MetricType::Summary => "summary",
574 }
575 )
576 .unwrap();
577
578 for sample in samples.iter().filter(|s| s.name == def.name) {
580 Self::write_sample(&mut output, &def.labels, sample);
581 }
582 }
583 }
584
585 let custom = self.custom_metrics.read();
587 for metric in custom.values() {
588 writeln!(
589 output,
590 "# HELP {} {}",
591 metric.definition.name, metric.definition.help
592 )
593 .unwrap();
594 writeln!(
595 output,
596 "# TYPE {} {}",
597 metric.definition.name,
598 match metric.definition.metric_type {
599 MetricType::Counter => "counter",
600 MetricType::Gauge => "gauge",
601 MetricType::Histogram => "histogram",
602 MetricType::Summary => "summary",
603 }
604 )
605 .unwrap();
606
607 for sample in &metric.samples {
608 Self::write_sample(&mut output, &metric.definition.labels, sample);
609 }
610 }
611
612 output
613 }
614
615 fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
616 if labels.is_empty() || sample.label_values.is_empty() {
617 writeln!(output, "{} {}", sample.name, sample.value).unwrap();
618 } else {
619 let label_pairs: Vec<String> = labels
620 .iter()
621 .zip(sample.label_values.iter())
622 .map(|(k, v)| format!("{}=\"{}\"", k, v))
623 .collect();
624 writeln!(
625 output,
626 "{}{{{}}} {}",
627 sample.name,
628 label_pairs.join(","),
629 sample.value
630 )
631 .unwrap();
632 }
633 }
634
635 pub fn export_count(&self) -> u64 {
637 self.export_count.load(Ordering::Relaxed)
638 }
639}
640
641impl Default for PrometheusExporter {
642 fn default() -> Self {
643 Self {
644 definitions: RwLock::new(Vec::new()),
645 collectors: RwLock::new(Vec::new()),
646 custom_metrics: RwLock::new(HashMap::new()),
647 export_count: AtomicU64::new(0),
648 }
649 }
650}
651
/// Bridges the crate's `MetricsCollector` into the Prometheus exporter,
/// exposing aggregate message, latency, and throughput metrics.
pub struct RingKernelCollector {
    /// Source of aggregated kernel metrics.
    collector: Arc<MetricsCollector>,
}

impl RingKernelCollector {
    /// Wraps a metrics collector for registration with `PrometheusExporter`.
    pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
        Arc::new(Self { collector })
    }
}
668
669impl PrometheusCollector for RingKernelCollector {
670 fn definitions(&self) -> Vec<MetricDefinition> {
671 vec![
672 MetricDefinition {
673 name: "ringkernel_messages_processed_total".to_string(),
674 metric_type: MetricType::Counter,
675 help: "Total number of messages processed by kernels".to_string(),
676 labels: vec!["kernel_id".to_string()],
677 },
678 MetricDefinition {
679 name: "ringkernel_messages_dropped_total".to_string(),
680 metric_type: MetricType::Counter,
681 help: "Total number of messages dropped by kernels".to_string(),
682 labels: vec!["kernel_id".to_string()],
683 },
684 MetricDefinition {
685 name: "ringkernel_latency_us".to_string(),
686 metric_type: MetricType::Gauge,
687 help: "Current average message latency in microseconds".to_string(),
688 labels: vec!["kernel_id".to_string(), "stat".to_string()],
689 },
690 MetricDefinition {
691 name: "ringkernel_throughput".to_string(),
692 metric_type: MetricType::Gauge,
693 help: "Current message throughput per second".to_string(),
694 labels: vec!["kernel_id".to_string()],
695 },
696 ]
697 }
698
699 fn collect(&self) -> Vec<MetricSample> {
700 let aggregate = self.collector.get_aggregate();
701 let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
702
703 vec![
704 MetricSample {
705 name: "ringkernel_messages_processed_total".to_string(),
706 label_values: vec!["aggregate".to_string()],
707 value: aggregate.messages_processed as f64,
708 timestamp_ms: None,
709 },
710 MetricSample {
711 name: "ringkernel_messages_dropped_total".to_string(),
712 label_values: vec!["aggregate".to_string()],
713 value: aggregate.messages_dropped as f64,
714 timestamp_ms: None,
715 },
716 MetricSample {
717 name: "ringkernel_latency_us".to_string(),
718 label_values: vec!["aggregate".to_string(), "avg".to_string()],
719 value: aggregate.avg_latency_us(),
720 timestamp_ms: None,
721 },
722 MetricSample {
723 name: "ringkernel_latency_us".to_string(),
724 label_values: vec!["aggregate".to_string(), "min".to_string()],
725 value: aggregate.min_latency_us as f64,
726 timestamp_ms: None,
727 },
728 MetricSample {
729 name: "ringkernel_latency_us".to_string(),
730 label_values: vec!["aggregate".to_string(), "max".to_string()],
731 value: aggregate.max_latency_us as f64,
732 timestamp_ms: None,
733 },
734 MetricSample {
735 name: "ringkernel_throughput".to_string(),
736 label_values: vec!["aggregate".to_string()],
737 value: aggregate.messages_processed as f64 / elapsed,
738 timestamp_ms: None,
739 },
740 ]
741 }
742}
743
/// Grafana visualization type for a panel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanelType {
    /// Time-series graph (rendered as "timeseries").
    Graph,
    /// Single-value stat panel.
    Stat,
    /// Table panel.
    Table,
    /// Heatmap panel.
    Heatmap,
    /// Bar-gauge panel.
    BarGauge,
}

/// One dashboard panel: a visualization plus its Prometheus queries.
#[derive(Debug, Clone)]
pub struct GrafanaPanel {
    /// Panel title shown in the dashboard.
    pub title: String,
    /// Visualization type.
    pub panel_type: PanelType,
    /// PromQL expressions rendered as the panel's targets.
    pub queries: Vec<String>,
    /// Layout as (x, y, width, height) in Grafana grid units.
    pub grid_pos: (u32, u32, u32, u32),
    /// Optional display unit (e.g. "bytes").
    pub unit: Option<String>,
}
777
/// Builder for a Grafana dashboard JSON model targeting the Prometheus
/// metrics exported by this module.
pub struct GrafanaDashboard {
    /// Dashboard title.
    title: String,
    /// Dashboard description (empty by default).
    description: String,
    /// Panels in layout order.
    panels: Vec<GrafanaPanel>,
    /// Auto-refresh interval (e.g. "5s").
    refresh: String,
    /// Start of the default time window (e.g. "now-1h").
    time_from: String,
    /// Dashboard tags; starts with "ringkernel".
    tags: Vec<String>,
}
793
794impl GrafanaDashboard {
795 pub fn new(title: impl Into<String>) -> Self {
797 Self {
798 title: title.into(),
799 description: String::new(),
800 panels: Vec::new(),
801 refresh: "5s".to_string(),
802 time_from: "now-1h".to_string(),
803 tags: vec!["ringkernel".to_string()],
804 }
805 }
806
807 pub fn description(mut self, desc: impl Into<String>) -> Self {
809 self.description = desc.into();
810 self
811 }
812
813 pub fn refresh(mut self, interval: impl Into<String>) -> Self {
815 self.refresh = interval.into();
816 self
817 }
818
819 pub fn time_from(mut self, from: impl Into<String>) -> Self {
821 self.time_from = from.into();
822 self
823 }
824
825 pub fn tag(mut self, tag: impl Into<String>) -> Self {
827 self.tags.push(tag.into());
828 self
829 }
830
831 pub fn panel(mut self, panel: GrafanaPanel) -> Self {
833 self.panels.push(panel);
834 self
835 }
836
837 pub fn add_throughput_panel(mut self) -> Self {
839 self.panels.push(GrafanaPanel {
840 title: "Message Throughput".to_string(),
841 panel_type: PanelType::Graph,
842 queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
843 grid_pos: (0, 0, 12, 8),
844 unit: Some("msg/s".to_string()),
845 });
846 self
847 }
848
849 pub fn add_latency_panel(mut self) -> Self {
851 self.panels.push(GrafanaPanel {
852 title: "Message Latency".to_string(),
853 panel_type: PanelType::Graph,
854 queries: vec![
855 "ringkernel_latency_us{stat=\"avg\"}".to_string(),
856 "ringkernel_latency_us{stat=\"max\"}".to_string(),
857 ],
858 grid_pos: (12, 0, 12, 8),
859 unit: Some("µs".to_string()),
860 });
861 self
862 }
863
864 pub fn add_kernel_status_panel(mut self) -> Self {
866 self.panels.push(GrafanaPanel {
867 title: "Active Kernels".to_string(),
868 panel_type: PanelType::Stat,
869 queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
870 grid_pos: (0, 8, 6, 4),
871 unit: None,
872 });
873 self
874 }
875
876 pub fn add_drop_rate_panel(mut self) -> Self {
878 self.panels.push(GrafanaPanel {
879 title: "Message Drop Rate".to_string(),
880 panel_type: PanelType::Graph,
881 queries: vec![
882 "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
883 ],
884 grid_pos: (6, 8, 6, 4),
885 unit: Some("percentunit".to_string()),
886 });
887 self
888 }
889
890 pub fn add_multi_gpu_panel(mut self) -> Self {
892 self.panels.push(GrafanaPanel {
893 title: "GPU Memory Usage".to_string(),
894 panel_type: PanelType::BarGauge,
895 queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
896 grid_pos: (12, 8, 12, 4),
897 unit: Some("bytes".to_string()),
898 });
899 self
900 }
901
902 pub fn add_standard_panels(self) -> Self {
904 self.add_throughput_panel()
905 .add_latency_panel()
906 .add_kernel_status_panel()
907 .add_drop_rate_panel()
908 .add_multi_gpu_panel()
909 }
910
911 pub fn build(&self) -> String {
913 let panels_json: Vec<String> = self
914 .panels
915 .iter()
916 .enumerate()
917 .map(|(i, panel)| {
918 let queries_json: Vec<String> = panel
919 .queries
920 .iter()
921 .enumerate()
922 .map(|(j, q)| {
923 format!(
924 r#"{{
925 "expr": "{}",
926 "refId": "{}",
927 "legendFormat": "{{}}"
928 }}"#,
929 q,
930 (b'A' + j as u8) as char
931 )
932 })
933 .collect();
934
935 let unit_field = panel
936 .unit
937 .as_ref()
938 .map(|u| format!(r#""unit": "{}","#, u))
939 .unwrap_or_default();
940
941 format!(
942 r#"{{
943 "id": {},
944 "title": "{}",
945 "type": "{}",
946 "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
947 {}
948 "targets": [{}],
949 "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
950 }}"#,
951 i + 1,
952 panel.title,
953 match panel.panel_type {
954 PanelType::Graph => "timeseries",
955 PanelType::Stat => "stat",
956 PanelType::Table => "table",
957 PanelType::Heatmap => "heatmap",
958 PanelType::BarGauge => "bargauge",
959 },
960 panel.grid_pos.0,
961 panel.grid_pos.1,
962 panel.grid_pos.2,
963 panel.grid_pos.3,
964 unit_field,
965 queries_json.join(",")
966 )
967 })
968 .collect();
969
970 let tags_json: Vec<String> = self.tags.iter().map(|t| format!(r#""{}""#, t)).collect();
971
972 format!(
973 r#"{{
974 "title": "{}",
975 "description": "{}",
976 "tags": [{}],
977 "refresh": "{}",
978 "time": {{"from": "{}", "to": "now"}},
979 "templating": {{
980 "list": [
981 {{
982 "name": "datasource",
983 "type": "datasource",
984 "query": "prometheus"
985 }},
986 {{
987 "name": "kernel_id",
988 "type": "query",
989 "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
990 "multi": true,
991 "includeAll": true
992 }}
993 ]
994 }},
995 "panels": [{}]
996 }}"#,
997 self.title,
998 self.description,
999 tags_json.join(","),
1000 self.refresh,
1001 self.time_from,
1002 panels_json.join(",")
1003 )
1004 }
1005}
1006
/// Central tracing context: tracks active spans, buffers completed spans,
/// and owns the Prometheus exporter.
pub struct ObservabilityContext {
    /// Spans started but not yet ended, keyed by span id.
    active_spans: RwLock<HashMap<SpanId, Span>>,
    /// Ended spans awaiting export; bounded by `max_completed`.
    completed_spans: RwLock<Vec<Span>>,
    /// Maximum buffered completed spans before the oldest is evicted.
    max_completed: usize,
    /// Prometheus exporter shared with the rest of the system.
    prometheus: Arc<PrometheusExporter>,
}
1022
1023impl ObservabilityContext {
1024 pub fn new() -> Arc<Self> {
1026 Arc::new(Self {
1027 active_spans: RwLock::new(HashMap::new()),
1028 completed_spans: RwLock::new(Vec::new()),
1029 max_completed: 10000,
1030 prometheus: PrometheusExporter::new(),
1031 })
1032 }
1033
1034 pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1036 let span = Span::new(name, kind);
1037 self.active_spans.write().insert(span.span_id, span.clone());
1038 span
1039 }
1040
1041 pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1043 let span = parent.child(name, kind);
1044 self.active_spans.write().insert(span.span_id, span.clone());
1045 span
1046 }
1047
1048 pub fn end_span(&self, mut span: Span) {
1050 span.end();
1051 self.active_spans.write().remove(&span.span_id);
1052
1053 let mut completed = self.completed_spans.write();
1054 completed.push(span);
1055 if completed.len() > self.max_completed {
1056 completed.remove(0);
1057 }
1058 }
1059
1060 pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1062 &self.prometheus
1063 }
1064
1065 pub fn export_spans(&self) -> Vec<Span> {
1067 self.completed_spans.write().drain(..).collect()
1068 }
1069
1070 pub fn active_span_count(&self) -> usize {
1072 self.active_spans.read().len()
1073 }
1074}
1075
1076impl Default for ObservabilityContext {
1077 fn default() -> Self {
1078 Self {
1079 active_spans: RwLock::new(HashMap::new()),
1080 completed_spans: RwLock::new(Vec::new()),
1081 max_completed: 10000,
1082 prometheus: PrometheusExporter::new(),
1083 }
1084 }
1085}
1086
/// The external GPU profiling tool an integration targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuProfilerBackend {
    /// NVIDIA Nsight / NVTX.
    Nsight,
    /// RenderDoc frame capture.
    RenderDoc,
    /// Microsoft PIX.
    Pix,
    /// Apple Metal System Trace.
    MetalSystemTrace,
    /// AMD Radeon GPU Profiler.
    Rgp,
    /// Custom or no-op backend.
    Custom,
}
1107
/// An RGBA color used to tint profiler ranges and markers.
#[derive(Debug, Clone, Copy)]
pub struct ProfilerColor {
    /// Red channel (0-255).
    pub r: u8,
    /// Green channel (0-255).
    pub g: u8,
    /// Blue channel (0-255).
    pub b: u8,
    /// Alpha channel; `new` always produces fully opaque (255).
    pub a: u8,
}

impl ProfilerColor {
    /// Creates a fully opaque color from RGB components.
    pub const fn new(r: u8, g: u8, b: u8) -> Self {
        Self { r, g, b, a: 255 }
    }

    /// Pure red.
    pub const RED: Self = Self { r: 255, g: 0, b: 0, a: 255 };
    /// Pure green.
    pub const GREEN: Self = Self { r: 0, g: 255, b: 0, a: 255 };
    /// Pure blue.
    pub const BLUE: Self = Self { r: 0, g: 0, b: 255, a: 255 };
    /// Yellow (red + green).
    pub const YELLOW: Self = Self { r: 255, g: 255, b: 0, a: 255 };
    /// Cyan (green + blue); the default scope/marker color.
    pub const CYAN: Self = Self { r: 0, g: 255, b: 255, a: 255 };
    /// Magenta (red + blue).
    pub const MAGENTA: Self = Self { r: 255, g: 0, b: 255, a: 255 };
    /// Orange.
    pub const ORANGE: Self = Self { r: 255, g: 165, b: 0, a: 255 };
}
1142
/// A named profiling range; measures elapsed time from construction.
pub struct ProfilerRange {
    #[allow(dead_code)]
    name: String,
    #[allow(dead_code)]
    backend: GpuProfilerBackend,
    // When the range was opened (monotonic).
    start: Instant,
}

impl ProfilerRange {
    /// Opens a range, starting its clock immediately.
    fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
        Self {
            name: name.into(),
            backend,
            start: Instant::now(),
        }
    }

    /// Public constructor for a stand-alone range not tied to a live backend.
    pub fn stub(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
        Self::new(name, backend)
    }

    /// Time elapsed since the range was opened.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}

impl Drop for ProfilerRange {
    // Intentionally empty: a real backend would close the range here; the
    // current integrations are stubs with no native calls to make.
    fn drop(&mut self) {
    }
}
1186
/// Abstraction over GPU profiler integrations (NVTX, RenderDoc, ...).
///
/// Every method has a no-op default, so stub backends only need `backend()`.
pub trait GpuProfiler: Send + Sync {
    /// Whether the backend's tooling was detected; defaults to `false`.
    fn is_available(&self) -> bool {
        false
    }

    /// Which profiling tool this implementation targets.
    fn backend(&self) -> GpuProfilerBackend;

    /// Begins a capture session.
    fn start_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Ends the current capture session.
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Requests a one-shot capture.
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Opens a named, colored range; close it via `pop_range` or by dropping
    /// the returned value.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    /// Closes the most recently pushed range.
    fn pop_range(&self) {}

    /// Emits an instantaneous marker.
    fn mark(&self, _name: &str, _color: ProfilerColor) {}

    /// Names the current thread in the profiler UI.
    fn set_thread_name(&self, _name: &str) {}

    /// Emits a free-form message.
    fn message(&self, _text: &str) {}

    /// Associates a name and size with a GPU allocation for memory tracking.
    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}

    /// Removes a previously registered allocation.
    fn unregister_allocation(&self, _ptr: u64) {}
}

/// Errors reported by GPU profiler integrations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ProfilerError {
    /// The backend tooling is not present on this system.
    #[error("GPU profiler not available")]
    NotAvailable,
    /// The profiler is present but not attached to this process.
    #[error("GPU profiler not attached")]
    NotAttached,
    /// `start_capture` was called while a capture was already running.
    #[error("Capture already in progress")]
    CaptureInProgress,
    /// `end_capture` was called with no capture running.
    #[error("No capture in progress")]
    NoCaptureInProgress,
    /// Backend-specific failure.
    #[error("Profiler error: {0}")]
    Backend(String),
}
1258
/// A no-op profiler used when no real backend is available; every trait
/// method falls back to its default (do-nothing) implementation.
pub struct NullProfiler;

impl GpuProfiler for NullProfiler {
    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Custom
    }
}
1267
/// NVIDIA NVTX profiler integration (stub).
///
/// NVTX runtime detection is not implemented, so the profiler always reports
/// itself as unavailable.
pub struct NvtxProfiler {
    /// Whether the NVTX runtime was detected; currently always `false`.
    available: bool,
    /// Guards against overlapping start/end capture calls.
    capture_in_progress: std::sync::atomic::AtomicBool,
}

impl NvtxProfiler {
    /// Creates the profiler; detection is stubbed out, so it starts unavailable.
    pub fn new() -> Self {
        Self {
            available: false,
            capture_in_progress: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Reports whether the NVTX library was loaded.
    pub fn is_nvtx_loaded(&self) -> bool {
        self.available
    }
}

impl Default for NvtxProfiler {
    fn default() -> Self {
        NvtxProfiler::new()
    }
}
1302
1303impl GpuProfiler for NvtxProfiler {
1304 fn is_available(&self) -> bool {
1305 self.available
1306 }
1307
1308 fn backend(&self) -> GpuProfilerBackend {
1309 GpuProfilerBackend::Nsight
1310 }
1311
1312 fn start_capture(&self) -> Result<(), ProfilerError> {
1313 if !self.available {
1314 return Err(ProfilerError::NotAvailable);
1315 }
1316 if self.capture_in_progress.swap(true, Ordering::SeqCst) {
1317 return Err(ProfilerError::CaptureInProgress);
1318 }
1319 Ok(())
1321 }
1322
1323 fn end_capture(&self) -> Result<(), ProfilerError> {
1324 if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
1325 return Err(ProfilerError::NoCaptureInProgress);
1326 }
1327 Ok(())
1329 }
1330
1331 fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1332 ProfilerRange::new(name, self.backend())
1334 }
1335
1336 fn pop_range(&self) {
1337 }
1339
1340 fn mark(&self, _name: &str, _color: ProfilerColor) {
1341 }
1343
1344 fn set_thread_name(&self, _name: &str) {
1345 }
1347}
1348
1349pub struct RenderDocProfiler {
1353 attached: bool,
1355}
1356
1357impl RenderDocProfiler {
1358 pub fn new() -> Self {
1362 Self {
1363 attached: false, }
1365 }
1366
1367 pub fn is_attached(&self) -> bool {
1369 self.attached
1371 }
1372
1373 pub fn get_capture_path(&self) -> Option<String> {
1375 None
1377 }
1378
1379 pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1381 if !self.attached {
1382 return Err(ProfilerError::NotAttached);
1383 }
1384 Ok(())
1386 }
1387}
1388
1389impl Default for RenderDocProfiler {
1390 fn default() -> Self {
1391 Self::new()
1392 }
1393}
1394
1395impl GpuProfiler for RenderDocProfiler {
1396 fn is_available(&self) -> bool {
1397 self.attached
1398 }
1399
1400 fn backend(&self) -> GpuProfilerBackend {
1401 GpuProfilerBackend::RenderDoc
1402 }
1403
1404 fn trigger_capture(&self) -> Result<(), ProfilerError> {
1405 if !self.attached {
1406 return Err(ProfilerError::NotAttached);
1407 }
1408 Ok(())
1410 }
1411
1412 fn start_capture(&self) -> Result<(), ProfilerError> {
1413 if !self.attached {
1414 return Err(ProfilerError::NotAttached);
1415 }
1416 Ok(())
1418 }
1419
1420 fn end_capture(&self) -> Result<(), ProfilerError> {
1421 Ok(())
1423 }
1424
1425 fn set_thread_name(&self, _name: &str) {
1426 }
1428}
1429
/// Metal System Trace profiler stub (macOS only).
#[cfg(target_os = "macos")]
pub struct MetalProfiler {
    // Always true: no runtime detection is performed on macOS.
    available: bool,
}

#[cfg(target_os = "macos")]
impl MetalProfiler {
    /// Creates the profiler; reported as available unconditionally.
    pub fn new() -> Self {
        Self { available: true }
    }
}

#[cfg(target_os = "macos")]
impl Default for MetalProfiler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(target_os = "macos")]
impl GpuProfiler for MetalProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::MetalSystemTrace
    }

    // Range/marker hooks are stubs; a real integration would presumably emit
    // os_signpost events here.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    fn pop_range(&self) {
    }

    fn mark(&self, _name: &str, _color: ProfilerColor) {
    }
}
1477
/// Selects and wraps the best available GPU profiler backend.
pub struct GpuProfilerManager {
    /// The active backend (NVTX, RenderDoc, or a no-op fallback).
    profiler: Arc<dyn GpuProfiler>,
    /// Runtime on/off switch for all profiling routed through the manager.
    enabled: std::sync::atomic::AtomicBool,
}
1485
1486impl GpuProfilerManager {
1487 pub fn new() -> Self {
1489 let nvtx = NvtxProfiler::new();
1491 if nvtx.is_available() {
1492 return Self {
1493 profiler: Arc::new(nvtx),
1494 enabled: std::sync::atomic::AtomicBool::new(true),
1495 };
1496 }
1497
1498 let renderdoc = RenderDocProfiler::new();
1499 if renderdoc.is_available() {
1500 return Self {
1501 profiler: Arc::new(renderdoc),
1502 enabled: std::sync::atomic::AtomicBool::new(true),
1503 };
1504 }
1505
1506 Self {
1508 profiler: Arc::new(NullProfiler),
1509 enabled: std::sync::atomic::AtomicBool::new(false),
1510 }
1511 }
1512
1513 pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1515 let enabled = profiler.is_available();
1516 Self {
1517 profiler,
1518 enabled: std::sync::atomic::AtomicBool::new(enabled),
1519 }
1520 }
1521
1522 pub fn is_enabled(&self) -> bool {
1524 self.enabled.load(Ordering::Relaxed)
1525 }
1526
1527 pub fn set_enabled(&self, enabled: bool) {
1529 self.enabled.store(enabled, Ordering::Relaxed);
1530 }
1531
1532 pub fn backend(&self) -> GpuProfilerBackend {
1534 self.profiler.backend()
1535 }
1536
1537 pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1539 ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1540 }
1541
1542 pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1544 ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1545 }
1546
1547 pub fn mark(&self, name: &str) {
1549 if self.is_enabled() {
1550 self.profiler.mark(name, ProfilerColor::CYAN);
1551 }
1552 }
1553
1554 pub fn profiler(&self) -> &dyn GpuProfiler {
1556 &*self.profiler
1557 }
1558}
1559
impl Default for GpuProfilerManager {
    /// Same as `new`: auto-detects the best available backend.
    fn default() -> Self {
        Self::new()
    }
}
1565
1566pub struct ProfilerScope<'a> {
1568 profiler: &'a dyn GpuProfiler,
1569 enabled: bool,
1570}
1571
1572impl<'a> ProfilerScope<'a> {
1573 fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
1574 if enabled {
1575 profiler.push_range(name, ProfilerColor::CYAN);
1576 }
1577 Self { profiler, enabled }
1578 }
1579
1580 fn new_colored(
1581 name: &str,
1582 profiler: &'a dyn GpuProfiler,
1583 enabled: bool,
1584 color: ProfilerColor,
1585 ) -> Self {
1586 if enabled {
1587 profiler.push_range(name, color);
1588 }
1589 Self { profiler, enabled }
1590 }
1591}
1592
1593impl<'a> Drop for ProfilerScope<'a> {
1594 fn drop(&mut self) {
1595 if self.enabled {
1596 self.profiler.pop_range();
1597 }
1598 }
1599}
1600
/// Profiles the rest of the enclosing scope on the given manager.
///
/// Expands to a `let` binding holding a `ProfilerScope`, so the range closes
/// when the surrounding scope ends. An optional third argument supplies a
/// `ProfilerColor`.
#[macro_export]
macro_rules! gpu_profile {
    ($profiler:expr, $name:expr) => {
        let _scope = $profiler.scope($name);
    };
    ($profiler:expr, $name:expr, $color:expr) => {
        let _scope = $profiler.scope_colored($name, $color);
    };
}
1623
/// Classification of GPU memory tracked by the dashboard.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuMemoryType {
    /// Device-local (VRAM) memory.
    DeviceLocal,
    /// Host-visible memory mappable by the CPU.
    HostVisible,
    /// Host-coherent memory.
    HostCoherent,
    /// Currently mapped memory.
    Mapped,
    /// Backing storage for message queues.
    QueueBuffer,
    /// Kernel control-block storage.
    ControlBlock,
    /// Memory shared between host and device.
    SharedMemory,
}

/// A single tracked GPU allocation.
#[derive(Debug, Clone)]
pub struct GpuMemoryAllocation {
    /// Unique allocation id.
    pub id: u64,
    /// Human-readable allocation name.
    pub name: String,
    /// Size in bytes.
    pub size: usize,
    /// Memory classification.
    pub memory_type: GpuMemoryType,
    /// Index of the owning GPU device.
    pub device_index: u32,
    /// Owning kernel, if the allocation is kernel-specific.
    pub kernel_id: Option<String>,
    /// When the allocation was recorded (monotonic).
    pub allocated_at: Instant,
    /// Whether the allocation is currently live.
    pub in_use: bool,
}
1667
/// Statistics for a single GPU memory pool.
#[derive(Debug, Clone, Default)]
pub struct GpuMemoryPoolStats {
    /// Pool name.
    pub name: String,
    /// Pool capacity in bytes.
    pub capacity: usize,
    /// Bytes currently allocated from the pool.
    pub allocated: usize,
    /// High-water mark of `allocated`.
    pub peak_allocated: usize,
    /// Number of currently live allocations.
    pub allocation_count: u32,
    /// Total allocations performed over the pool's lifetime.
    pub total_allocations: u64,
    /// Total deallocations performed over the pool's lifetime.
    pub total_deallocations: u64,
    /// Fragmentation estimate (units defined by the producer of this value).
    pub fragmentation: f32,
}

impl GpuMemoryPoolStats {
    /// Pool utilization as a percentage; 0.0 for an empty/unknown capacity.
    pub fn utilization(&self) -> f32 {
        if self.capacity == 0 {
            0.0
        } else {
            (self.allocated as f32 / self.capacity as f32) * 100.0
        }
    }
}

/// Per-device GPU memory statistics as last reported to the dashboard.
#[derive(Debug, Clone, Default)]
pub struct GpuDeviceMemoryStats {
    /// Index of the device these stats describe.
    pub device_index: u32,
    /// Human-readable device name.
    pub device_name: String,
    /// Total device memory in bytes.
    pub total_memory: u64,
    /// Currently free device memory in bytes.
    pub free_memory: u64,
    /// Bytes attributed to RingKernel allocations.
    pub ringkernel_used: u64,
    /// Bytes used by everything else on the device.
    pub other_used: u64,
    /// Per-pool statistics for this device.
    pub pools: Vec<GpuMemoryPoolStats>,
}

impl GpuDeviceMemoryStats {
    /// Bytes currently in use on the device.
    ///
    /// Uses a saturating subtraction: a stale or inconsistent report with
    /// `free_memory > total_memory` yields 0 instead of overflowing
    /// (which would panic in debug builds).
    pub fn used_memory(&self) -> u64 {
        self.total_memory.saturating_sub(self.free_memory)
    }

    /// Device utilization as a percentage; 0.0 when total memory is unknown.
    pub fn utilization(&self) -> f32 {
        if self.total_memory == 0 {
            0.0
        } else {
            (self.used_memory() as f32 / self.total_memory as f32) * 100.0
        }
    }
}
1734
/// Central registry of GPU allocations and per-device memory statistics,
/// with Prometheus/Grafana export helpers and pressure classification.
pub struct GpuMemoryDashboard {
    /// Live allocations keyed by allocation id.
    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
    /// Per-device stats keyed by device index.
    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
    /// Pressure thresholds, fixed at construction time.
    thresholds: GpuMemoryThresholds,
    /// Monotonic source of allocation ids (starts at 1; 0 is never issued).
    allocation_counter: AtomicU64,
    /// Running total of tracked bytes.
    total_allocated: AtomicU64,
    /// High-water mark of `total_allocated`.
    peak_allocated: AtomicU64,
}
1778
/// Thresholds controlling GPU memory pressure classification and
/// allocation limits.
#[derive(Debug, Clone)]
pub struct GpuMemoryThresholds {
    /// Utilization percentage at which pressure becomes `Warning`.
    pub warning: f32,
    /// Utilization percentage at which pressure becomes `Critical`.
    pub critical: f32,
    /// Largest permitted single allocation, in bytes.
    pub max_allocation_size: usize,
    /// Maximum number of simultaneously live allocations.
    pub max_allocation_count: u32,
}

impl Default for GpuMemoryThresholds {
    /// Defaults: warn at 75% utilization, critical at 90%, cap single
    /// allocations at 1 GiB and live allocations at 10 000.
    fn default() -> Self {
        Self {
            warning: 75.0,
            critical: 90.0,
            max_allocation_size: 1024 * 1024 * 1024,
            max_allocation_count: 10_000,
        }
    }
}
1802
/// Memory pressure classification produced by
/// `GpuMemoryDashboard::check_pressure`, ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressureLevel {
    /// Utilization below 80% of the warning threshold.
    Normal,
    /// Utilization at or above 80% of the warning threshold.
    Elevated,
    /// Utilization at or above the warning threshold.
    Warning,
    /// Utilization at or above the critical threshold.
    Critical,
    /// The device reported zero free bytes.
    OutOfMemory,
}
1817
impl GpuMemoryDashboard {
    /// Creates a dashboard with default thresholds.
    ///
    /// Returned in an `Arc` so it can be shared across allocators and
    /// exporters. Allocation ids start at 1, so 0 is never handed out.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds: GpuMemoryThresholds::default(),
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        })
    }

    /// Like [`GpuMemoryDashboard::new`] but with caller-supplied thresholds.
    ///
    /// NOTE(review): only `warning`/`critical` are consulted by the visible
    /// code (`check_pressure`); `max_allocation_size`/`max_allocation_count`
    /// are not enforced here — confirm they are enforced by the allocator.
    pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
        Arc::new(Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds,
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        })
    }

    /// Records a new live allocation and updates the running and peak totals.
    ///
    /// `id` must be unique among live allocations (see
    /// [`GpuMemoryDashboard::next_allocation_id`]); re-using an id replaces
    /// the previous map entry without adjusting the totals for it.
    pub fn track_allocation(
        &self,
        id: u64,
        name: impl Into<String>,
        size: usize,
        memory_type: GpuMemoryType,
        device_index: u32,
        kernel_id: Option<&str>,
    ) {
        let allocation = GpuMemoryAllocation {
            id,
            name: name.into(),
            size,
            memory_type,
            device_index,
            kernel_id: kernel_id.map(String::from),
            allocated_at: Instant::now(),
            in_use: true,
        };

        self.allocations.write().insert(id, allocation);

        // fetch_add returns the PREVIOUS value, so add `size` back to get
        // the new running total.
        let new_total = self
            .total_allocated
            .fetch_add(size as u64, Ordering::Relaxed)
            + size as u64;
        // Lock-free peak update: retry the CAS until either the stored peak
        // already exceeds `new_total` or we successfully publish it.
        let mut peak = self.peak_allocated.load(Ordering::Relaxed);
        while new_total > peak {
            match self.peak_allocated.compare_exchange_weak(
                peak,
                new_total,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => peak = current,
            }
        }
    }

    /// Hands out a fresh, process-unique allocation id.
    pub fn next_allocation_id(&self) -> u64 {
        self.allocation_counter.fetch_add(1, Ordering::Relaxed)
    }

    /// Removes a live allocation and subtracts its size from the running
    /// total. Unknown ids are silently ignored.
    pub fn track_deallocation(&self, id: u64) {
        let mut allocations = self.allocations.write();
        if let Some(alloc) = allocations.remove(&id) {
            self.total_allocated
                .fetch_sub(alloc.size as u64, Ordering::Relaxed);
        }
    }

    /// Flags an allocation as no longer actively used; the entry (and its
    /// contribution to the totals) remains tracked.
    pub fn mark_unused(&self, id: u64) {
        let mut allocations = self.allocations.write();
        if let Some(alloc) = allocations.get_mut(&id) {
            alloc.in_use = false;
        }
    }

    /// Registers a device. Initially the whole of `total_memory` is
    /// considered free and no usage is attributed to RingKernel.
    pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
        let stats = GpuDeviceMemoryStats {
            device_index,
            device_name: name.into(),
            total_memory,
            free_memory: total_memory,
            ringkernel_used: 0,
            other_used: 0,
            pools: Vec::new(),
        };
        self.device_stats.write().insert(device_index, stats);
    }

    /// Refreshes a device's free/RingKernel byte counts; `other_used` is
    /// derived as whatever remains of total memory (saturating, so
    /// inconsistent reports clamp to 0). Unknown devices are ignored.
    pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
        let mut stats = self.device_stats.write();
        if let Some(device) = stats.get_mut(&device_index) {
            device.free_memory = free_memory;
            device.ringkernel_used = ringkernel_used;
            device.other_used = device
                .total_memory
                .saturating_sub(free_memory + ringkernel_used);
        }
    }

    /// Cloned snapshot of one device's stats, if registered.
    pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
        self.device_stats.read().get(&device_index).cloned()
    }

    /// Cloned snapshots of all registered devices (unordered).
    pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
        self.device_stats.read().values().cloned().collect()
    }

    /// Cloned snapshots of all live allocations (unordered).
    pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
        self.allocations.read().values().cloned().collect()
    }

    /// Live allocations attributed to a specific kernel id.
    pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
        self.allocations
            .read()
            .values()
            .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
            .cloned()
            .collect()
    }

    /// Total bytes currently tracked as allocated.
    pub fn total_allocated(&self) -> u64 {
        self.total_allocated.load(Ordering::Relaxed)
    }

    /// High-water mark of `total_allocated` over the dashboard's lifetime.
    pub fn peak_allocated(&self) -> u64 {
        self.peak_allocated.load(Ordering::Relaxed)
    }

    /// Number of live tracked allocations.
    pub fn allocation_count(&self) -> usize {
        self.allocations.read().len()
    }

    /// Classifies a device's memory pressure against the thresholds.
    ///
    /// Check order: OutOfMemory (zero free bytes), Critical, Warning,
    /// Elevated (at/above 80% of the warning threshold), otherwise Normal.
    /// Unregistered devices report Normal.
    pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
        let stats = self.device_stats.read();
        if let Some(device) = stats.get(&device_index) {
            let utilization = device.utilization();
            if device.free_memory == 0 {
                MemoryPressureLevel::OutOfMemory
            } else if utilization >= self.thresholds.critical {
                MemoryPressureLevel::Critical
            } else if utilization >= self.thresholds.warning {
                MemoryPressureLevel::Warning
            } else if utilization >= self.thresholds.warning * 0.8 {
                MemoryPressureLevel::Elevated
            } else {
                MemoryPressureLevel::Normal
            }
        } else {
            MemoryPressureLevel::Normal
        }
    }

    /// Pre-built Grafana bar-gauge panel querying the two gauges emitted by
    /// [`GpuMemoryDashboard::prometheus_metrics`].
    pub fn grafana_panel(&self) -> GrafanaPanel {
        GrafanaPanel {
            title: "GPU Memory Usage".to_string(),
            panel_type: PanelType::BarGauge,
            queries: vec![
                "ringkernel_gpu_memory_allocated_bytes".to_string(),
                "ringkernel_gpu_memory_peak_bytes".to_string(),
            ],
            grid_pos: (0, 0, 12, 8),
            unit: Some("bytes".to_string()),
        }
    }

    /// Renders the dashboard in Prometheus text exposition format:
    /// three global gauges plus four per-device series labelled by
    /// device name.
    ///
    /// The `unwrap`s are safe: `writeln!` into a `String` cannot fail.
    pub fn prometheus_metrics(&self) -> String {
        let mut output = String::new();

        writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").unwrap();
        writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge").unwrap();
        writeln!(
            output,
            "ringkernel_gpu_memory_allocated_bytes {}",
            self.total_allocated()
        )
        .unwrap();

        writeln!(
            output,
            "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
        )
        .unwrap();
        writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge").unwrap();
        writeln!(
            output,
            "ringkernel_gpu_memory_peak_bytes {}",
            self.peak_allocated()
        )
        .unwrap();

        writeln!(
            output,
            "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
        )
        .unwrap();
        writeln!(
            output,
            "# TYPE ringkernel_gpu_memory_allocation_count gauge"
        )
        .unwrap();
        writeln!(
            output,
            "ringkernel_gpu_memory_allocation_count {}",
            self.allocation_count()
        )
        .unwrap();

        // Per-device series. NOTE(review): device names are interpolated
        // into label values unescaped — a name containing `"` or `\` would
        // break the exposition format; confirm names are controlled.
        let device_stats = self.device_stats.read();
        for device in device_stats.values() {
            writeln!(
                output,
                "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
                device.device_name, device.total_memory
            )
            .unwrap();
            writeln!(
                output,
                "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
                device.device_name, device.free_memory
            )
            .unwrap();
            writeln!(
                output,
                "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
                device.device_name,
                device.used_memory()
            )
            .unwrap();
            writeln!(
                output,
                "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
                device.device_name,
                device.utilization()
            )
            .unwrap();
        }

        output
    }

    /// Human-readable multi-line report: global totals, per-device stats
    /// and the ten largest live allocations.
    pub fn summary_report(&self) -> String {
        let mut report = String::new();

        writeln!(report, "=== GPU Memory Dashboard ===").unwrap();
        writeln!(report, "Total Allocated: {} bytes", self.total_allocated()).unwrap();
        writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated()).unwrap();
        writeln!(report, "Active Allocations: {}", self.allocation_count()).unwrap();
        writeln!(report).unwrap();

        let device_stats = self.device_stats.read();
        for device in device_stats.values() {
            writeln!(
                report,
                "--- Device {} ({}) ---",
                device.device_index, device.device_name
            )
            .unwrap();
            writeln!(
                report,
                "  Total: {} MB",
                device.total_memory / (1024 * 1024)
            )
            .unwrap();
            writeln!(report, "  Free: {} MB", device.free_memory / (1024 * 1024)).unwrap();
            writeln!(
                report,
                "  RingKernel: {} MB",
                device.ringkernel_used / (1024 * 1024)
            )
            .unwrap();
            writeln!(report, "  Utilization: {:.1}%", device.utilization()).unwrap();
            writeln!(
                report,
                "  Pressure: {:?}",
                self.check_pressure(device.device_index)
            )
            .unwrap();
        }

        // Largest-first ranking of live allocations.
        let allocations = self.allocations.read();
        let mut sorted_allocs: Vec<_> = allocations.values().collect();
        sorted_allocs.sort_by(|a, b| b.size.cmp(&a.size));

        if !sorted_allocs.is_empty() {
            writeln!(report).unwrap();
            writeln!(report, "--- Top 10 Allocations ---").unwrap();
            for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
                writeln!(
                    report,
                    "  {}. {} - {} bytes ({:?})",
                    i + 1,
                    alloc.name,
                    alloc.size,
                    alloc.memory_type
                )
                .unwrap();
            }
        }

        report
    }
}
2153
impl Default for GpuMemoryDashboard {
    /// Same field values as [`GpuMemoryDashboard::new`], but returns a bare
    /// `Self` rather than an `Arc` (the `Arc`-returning constructors cannot
    /// delegate here without duplicating the wrap).
    fn default() -> Self {
        Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds: GpuMemoryThresholds::default(),
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        }
    }
}
2166
/// Wire format used to ship OTLP data.
///
/// NOTE(review): the exporter in this module always POSTs JSON over HTTP
/// regardless of this setting (see `send_spans_http`) — confirm the other
/// transports are implemented elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP with JSON bodies (the default).
    #[default]
    HttpJson,
    /// OTLP over HTTP with protobuf bodies.
    HttpProtobuf,
    /// OTLP over gRPC.
    Grpc,
}
2182
/// Configuration for an [`OtlpExporter`].
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    /// Collector endpoint URL (e.g. `http://localhost:4318/v1/traces`).
    pub endpoint: String,
    /// Requested wire format.
    pub transport: OtlpTransport,
    /// Value of the `service.name` resource attribute.
    pub service_name: String,
    /// Value of the `service.version` resource attribute.
    pub service_version: String,
    /// Optional `service.instance.id` resource attribute.
    pub service_instance_id: Option<String>,
    /// Extra resource attributes, emitted as string values.
    pub resource_attributes: Vec<(String, String)>,
    /// Buffered span count that triggers an automatic flush.
    pub batch_size: usize,
    /// Intended interval between periodic exports.
    pub export_interval: Duration,
    /// HTTP request timeout.
    pub timeout: Duration,
    /// Maximum number of retries per export attempt.
    pub max_retries: u32,
    /// Base delay between retries (doubled on each attempt).
    pub retry_delay: Duration,
    /// Optional value sent as the `Authorization` header.
    pub authorization: Option<String>,
}
2211
impl Default for OtlpConfig {
    /// Defaults target a local OTLP/HTTP collector (port 4318) with the
    /// crate's own name/version as the service identity, a 512-span batch,
    /// 5 s export interval, 30 s timeout and 3 retries starting at 100 ms.
    fn default() -> Self {
        Self {
            endpoint: "http://localhost:4318/v1/traces".to_string(),
            transport: OtlpTransport::HttpJson,
            service_name: "ringkernel".to_string(),
            // Compile-time crate version, so exports identify the build.
            service_version: env!("CARGO_PKG_VERSION").to_string(),
            service_instance_id: None,
            resource_attributes: Vec::new(),
            batch_size: 512,
            export_interval: Duration::from_secs(5),
            timeout: Duration::from_secs(30),
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
            authorization: None,
        }
    }
}
2230
impl OtlpConfig {
    /// Creates a config pointing at `endpoint`, with all other settings
    /// taken from [`OtlpConfig::default`].
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Sets the `service.name` resource attribute.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Sets the `service.version` resource attribute.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Sets the `service.instance.id` resource attribute.
    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
        self.service_instance_id = Some(id.into());
        self
    }

    /// Appends an extra resource attribute (emitted as a string value).
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.resource_attributes.push((key.into(), value.into()));
        self
    }

    /// Sets the buffered-span count that triggers an automatic flush.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the intended periodic export interval.
    pub fn with_export_interval(mut self, interval: Duration) -> Self {
        self.export_interval = interval;
        self
    }

    /// Sets the value sent as the `Authorization` header.
    pub fn with_authorization(mut self, auth: impl Into<String>) -> Self {
        self.authorization = Some(auth.into());
        self
    }

    /// Preset for a Jaeger collector exposing OTLP/HTTP.
    pub fn jaeger(endpoint: impl Into<String>) -> Self {
        Self::new(endpoint).with_service_name("ringkernel")
    }

    /// Preset for Honeycomb's OTLP endpoint.
    ///
    /// NOTE(review): this places the team key inside the `Authorization`
    /// header value; Honeycomb documents `x-honeycomb-team` as a dedicated
    /// header — verify the collector accepts this form.
    pub fn honeycomb(api_key: impl Into<String>) -> Self {
        Self::new("https://api.honeycomb.io/v1/traces")
            .with_authorization(format!("x-honeycomb-team {}", api_key.into()))
    }

    /// Preset for Grafana Cloud's OTLP gateway (US-central), using HTTP
    /// Basic auth and tagging exports with the instance id.
    pub fn grafana_cloud(instance_id: impl Into<String>, api_key: impl Into<String>) -> Self {
        let instance = instance_id.into();
        Self::new("https://otlp-gateway-prod-us-central-0.grafana.net/otlp/v1/traces")
            .with_authorization(format!("Basic {}", api_key.into()))
            .with_attribute("grafana.instance", instance)
    }
}
2301
/// Outcome of a single export (flush) attempt.
#[derive(Debug, Clone)]
pub struct OtlpExportResult {
    /// Spans successfully delivered (0 on failure).
    pub spans_exported: usize,
    /// Whether the export succeeded.
    pub success: bool,
    /// Error description when `success` is false.
    pub error: Option<String>,
    /// Wall-clock time spent on the export attempt.
    pub duration: Duration,
    /// Number of retries performed during this attempt.
    pub retry_count: u32,
}
2316
/// Cumulative statistics maintained by an [`OtlpExporter`].
#[derive(Debug, Clone, Default)]
pub struct OtlpExporterStats {
    /// Total spans delivered successfully.
    pub total_spans_exported: u64,
    /// Total export (flush) attempts.
    pub total_exports: u64,
    /// Export attempts that succeeded.
    pub successful_exports: u64,
    /// Export attempts that failed.
    pub failed_exports: u64,
    /// Total retries across all attempts.
    pub total_retries: u64,
    /// Spans currently waiting in the buffer.
    pub buffered_spans: usize,
    /// When the last export attempt finished.
    pub last_export: Option<Instant>,
    /// Error description from the most recent failed export.
    pub last_error: Option<String>,
}
2337
/// Batching OTLP span exporter: spans accumulate in `buffer` and are sent
/// (or stubbed, without the `alerting` feature) when a batch fills or
/// `flush` is called.
pub struct OtlpExporter {
    /// Endpoint, identity and retry configuration.
    config: OtlpConfig,
    /// Spans waiting to be exported.
    buffer: RwLock<Vec<Span>>,
    /// Cumulative export statistics.
    stats: RwLock<OtlpExporterStats>,
}
2346
impl OtlpExporter {
    /// Creates an exporter with an empty buffer and zeroed statistics.
    pub fn new(config: OtlpConfig) -> Self {
        Self {
            config,
            buffer: RwLock::new(Vec::new()),
            stats: RwLock::new(OtlpExporterStats::default()),
        }
    }

    /// Convenience constructor targeting a local Jaeger OTLP/HTTP endpoint.
    pub fn jaeger_local() -> Self {
        Self::new(OtlpConfig::jaeger("http://localhost:4318/v1/traces"))
    }

    /// The exporter's configuration.
    pub fn config(&self) -> &OtlpConfig {
        &self.config
    }

    /// Snapshot of the cumulative export statistics.
    pub fn stats(&self) -> OtlpExporterStats {
        self.stats.read().clone()
    }

    /// Buffers one span; flushes once the batch size is reached.
    pub fn export_span(&self, span: Span) {
        let mut buffer = self.buffer.write();
        buffer.push(span);

        // Decide under the lock, but flush after releasing it so flush()
        // can re-acquire the buffer without self-deadlock.
        let should_flush = buffer.len() >= self.config.batch_size;
        drop(buffer);

        if should_flush {
            let _ = self.flush();
        }
    }

    /// Buffers a batch of spans; flushes once the batch size is reached.
    pub fn export_spans(&self, spans: Vec<Span>) {
        let mut buffer = self.buffer.write();
        buffer.extend(spans);

        let should_flush = buffer.len() >= self.config.batch_size;
        drop(buffer);

        if should_flush {
            let _ = self.flush();
        }
    }

    /// Number of spans currently waiting to be exported.
    pub fn buffered_count(&self) -> usize {
        self.buffer.read().len()
    }

    /// Drains the buffer and attempts to deliver the drained spans.
    ///
    /// On failure the spans are pushed back into the buffer for a later
    /// retry. NOTE(review): repeated failures re-queue indefinitely, so the
    /// buffer can grow without bound — confirm a cap exists elsewhere.
    pub fn flush(&self) -> OtlpExportResult {
        // Take the whole buffer in one lock acquisition.
        let spans: Vec<Span> = {
            let mut buffer = self.buffer.write();
            std::mem::take(&mut *buffer)
        };

        if spans.is_empty() {
            return OtlpExportResult {
                spans_exported: 0,
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            };
        }

        let start = Instant::now();
        let result = self.send_spans(&spans);
        let duration = start.elapsed();

        {
            let mut stats = self.stats.write();
            stats.total_exports += 1;
            stats.last_export = Some(Instant::now());

            if result.success {
                stats.successful_exports += 1;
                stats.total_spans_exported += spans.len() as u64;
            } else {
                stats.failed_exports += 1;
                stats.last_error = result.error.clone();
                // Re-queue the failed batch so it is retried on a later flush.
                let mut buffer = self.buffer.write();
                buffer.extend(spans);
            }
            stats.total_retries += result.retry_count as u64;
            stats.buffered_spans = self.buffer.read().len();
        }

        // Propagate the transport result, overriding the measured duration
        // and zeroing the exported count on failure.
        OtlpExportResult {
            spans_exported: if result.success {
                result.spans_exported
            } else {
                0
            },
            duration,
            ..result
        }
    }

    /// Transport entry point. Without the `alerting` feature this is a
    /// logging stub that reports success; with it, spans go over HTTP.
    fn send_spans(&self, spans: &[Span]) -> OtlpExportResult {
        #[cfg(not(feature = "alerting"))]
        {
            eprintln!(
                "[OTLP stub] Would export {} spans to {} (enable 'alerting' feature for HTTP export)",
                spans.len(),
                self.config.endpoint
            );
            OtlpExportResult {
                spans_exported: spans.len(),
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            }
        }

        #[cfg(feature = "alerting")]
        {
            self.send_spans_http(spans)
        }
    }

    /// Blocking HTTP POST of the OTLP/JSON payload with exponential
    /// backoff: attempt `n` sleeps `retry_delay * 2^n` before retrying,
    /// up to `max_retries` retries.
    #[cfg(feature = "alerting")]
    fn send_spans_http(&self, spans: &[Span]) -> OtlpExportResult {
        let payload = self.build_otlp_json(spans);

        let client = reqwest::blocking::Client::builder()
            .timeout(self.config.timeout)
            .build();

        let client = match client {
            Ok(c) => c,
            Err(e) => {
                return OtlpExportResult {
                    spans_exported: 0,
                    success: false,
                    error: Some(format!("Failed to create HTTP client: {}", e)),
                    duration: Duration::ZERO,
                    retry_count: 0,
                };
            }
        };

        let mut retry_count = 0;
        let mut last_error = None;

        for attempt in 0..=self.config.max_retries {
            let mut request = client
                .post(&self.config.endpoint)
                .header("Content-Type", "application/json")
                .body(payload.clone());

            if let Some(auth) = &self.config.authorization {
                request = request.header("Authorization", auth);
            }

            match request.send() {
                Ok(response) => {
                    if response.status().is_success() {
                        return OtlpExportResult {
                            spans_exported: spans.len(),
                            success: true,
                            error: None,
                            // Overall duration is filled in by flush().
                            duration: Duration::ZERO,
                            retry_count,
                        };
                    } else {
                        last_error = Some(format!(
                            "HTTP {}: {}",
                            response.status(),
                            response.status().as_str()
                        ));
                    }
                }
                Err(e) => {
                    last_error = Some(format!("Request failed: {}", e));
                }
            }

            if attempt < self.config.max_retries {
                retry_count += 1;
                std::thread::sleep(self.config.retry_delay * (1 << attempt));
            }
        }

        OtlpExportResult {
            spans_exported: 0,
            success: false,
            error: last_error,
            duration: Duration::ZERO,
            retry_count,
        }
    }

    /// Hand-builds the OTLP/JSON request body: one resource (service
    /// identity + configured attributes), one scope, and all spans.
    #[cfg(feature = "alerting")]
    fn build_otlp_json(&self, spans: &[Span]) -> String {
        use std::fmt::Write;

        let mut json = String::with_capacity(4096);

        json.push_str(r#"{"resourceSpans":[{"resource":{"attributes":["#);

        let _ = write!(
            json,
            r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_name)
        );

        let _ = write!(
            json,
            r#",{{"key":"service.version","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_version)
        );

        if let Some(instance_id) = &self.config.service_instance_id {
            let _ = write!(
                json,
                r#",{{"key":"service.instance.id","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(instance_id)
            );
        }

        for (key, value) in &self.config.resource_attributes {
            let _ = write!(
                json,
                r#",{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(key),
                escape_json_str(value)
            );
        }

        json.push_str(r#"]},"scopeSpans":[{"scope":{"name":"ringkernel"},"spans":["#);

        let mut first = true;
        for span in spans {
            if !first {
                json.push(',');
            }
            first = false;
            self.span_to_json(&mut json, span);
        }

        json.push_str("]}]}]}");
        json
    }

    /// Appends one span as an OTLP/JSON object to `json`.
    #[cfg(feature = "alerting")]
    fn span_to_json(&self, json: &mut String, span: &Span) {
        use std::fmt::Write;

        let _ = write!(
            json,
            r#"{{"traceId":"{}","spanId":"{}""#,
            span.trace_id.to_hex(),
            span.span_id.to_hex()
        );

        if let Some(parent) = span.parent_span_id {
            let _ = write!(json, r#","parentSpanId":"{}""#, parent.to_hex());
        }

        // Numeric kind codes follow the OTLP SpanKind enumeration.
        let _ = write!(
            json,
            r#","name":"{}","kind":{}"#,
            escape_json_str(&span.name),
            match span.kind {
                SpanKind::Internal => 1,
                SpanKind::Server => 2,
                SpanKind::Client => 3,
                SpanKind::Producer => 4,
                SpanKind::Consumer => 5,
            }
        );

        // NOTE(review): `Instant::elapsed()` measures time SINCE the stored
        // instant, not nanoseconds since the Unix epoch, yet the values are
        // written into *UnixNano fields — collectors will see bogus
        // timestamps. Confirm whether a SystemTime-based timestamp was
        // intended here.
        let start_nanos = span.start_time.elapsed().as_nanos();
        let end_nanos = span
            .end_time
            .map(|t| t.elapsed().as_nanos())
            .unwrap_or(start_nanos);

        let _ = write!(
            json,
            r#","startTimeUnixNano":"{}","endTimeUnixNano":"{}""#,
            start_nanos, end_nanos
        );

        let _ = write!(
            json,
            r#","status":{{"code":{}}}"#,
            match &span.status {
                SpanStatus::Unset => 0,
                SpanStatus::Ok => 1,
                SpanStatus::Error { .. } => 2,
            }
        );

        if !span.attributes.is_empty() {
            json.push_str(r#","attributes":["#);
            let mut first = true;
            for (key, value) in &span.attributes {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"key":"{}","value":{}}}"#,
                    escape_json_str(key),
                    attribute_value_to_json(value)
                );
            }
            json.push(']');
        }

        if !span.events.is_empty() {
            json.push_str(r#","events":["#);
            let mut first = true;
            for event in &span.events {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"name":"{}","timeUnixNano":"{}"}}"#,
                    escape_json_str(&event.name),
                    event.timestamp.elapsed().as_nanos()
                );
            }
            json.push(']');
        }

        json.push('}');
    }
}
2707
/// Escapes `s` for embedding inside a JSON string literal.
///
/// RFC 8259 requires `"` and `\` to be escaped and *all* control
/// characters (U+0000..=U+001F) to be represented with escape sequences.
/// The previous implementation only handled `\n`, `\r` and `\t`, so any
/// other control character produced invalid JSON; those are now emitted
/// as `\u00XX`.
#[cfg(feature = "alerting")]
fn escape_json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters get the generic \u escape.
            c if (c as u32) < 0x20 => {
                // Writing to a String cannot fail.
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
2717
/// Renders an `AttributeValue` as an OTLP/JSON `AnyValue` object.
///
/// `intValue` is quoted because the protobuf JSON mapping encodes 64-bit
/// integers as strings. Non-finite doubles previously printed Rust's
/// `NaN`/`inf`, which is invalid JSON; they are now emitted as the
/// strings "NaN"/"Infinity"/"-Infinity" per the protobuf JSON mapping.
#[cfg(feature = "alerting")]
fn attribute_value_to_json(value: &AttributeValue) -> String {
    match value {
        AttributeValue::String(s) => format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)),
        AttributeValue::Int(i) => format!(r#"{{"intValue":"{}"}}"#, i),
        AttributeValue::Float(f) => {
            if f.is_finite() {
                format!(r#"{{"doubleValue":{}}}"#, f)
            } else if f.is_nan() {
                r#"{"doubleValue":"NaN"}"#.to_string()
            } else if *f > 0.0 {
                r#"{"doubleValue":"Infinity"}"#.to_string()
            } else {
                r#"{"doubleValue":"-Infinity"}"#.to_string()
            }
        }
        AttributeValue::Bool(b) => format!(r#"{{"boolValue":{}}}"#, b),
        AttributeValue::StringArray(arr) => {
            let values: Vec<String> = arr
                .iter()
                .map(|s| format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)))
                .collect();
            format!(r#"{{"arrayValue":{{"values":[{}]}}}}"#, values.join(","))
        }
    }
}
2735
2736#[cfg(test)]
2737mod tests {
2738 use super::*;
2739 use crate::runtime::KernelId;
2740
    #[test]
    fn test_trace_id_generation() {
        // Two freshly generated ids should differ.
        // NOTE(review): TraceId::new() seeds a DefaultHasher from
        // SystemTime + thread id, so uniqueness relies on the clock
        // advancing between calls — confirm this cannot flake on
        // coarse-resolution clocks.
        let id1 = TraceId::new();
        let id2 = TraceId::new();
        assert_ne!(id1.0, id2.0);
    }

    // Round-trips a 128-bit id through its 32-character hex form.
    #[test]
    fn test_trace_id_hex() {
        let id = TraceId(0x123456789abcdef0123456789abcdef0);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 32);
        let parsed = TraceId::from_hex(&hex).unwrap();
        assert_eq!(id, parsed);
    }

    // A fresh span carries its name and has not ended yet.
    #[test]
    fn test_span_creation() {
        let span = Span::new("test_operation", SpanKind::Internal);
        assert!(!span.is_ended());
        assert_eq!(span.name, "test_operation");
    }

    // Child spans inherit the trace id and link back to the parent span id.
    #[test]
    fn test_span_child() {
        let parent = Span::new("parent", SpanKind::Server);
        let child = parent.child("child", SpanKind::Internal);

        assert_eq!(child.trace_id, parent.trace_id);
        assert_eq!(child.parent_span_id, Some(parent.span_id));
    }

    // set_attribute accepts string, integer and boolean values.
    #[test]
    fn test_span_attributes() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.set_attribute("string_key", "value");
        span.set_attribute("int_key", 42i64);
        span.set_attribute("bool_key", true);

        assert_eq!(span.attributes.len(), 3);
    }

    // Events accumulate on the span.
    #[test]
    fn test_span_events() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.add_event("event1");
        span.add_event("event2");

        assert_eq!(span.events.len(), 2);
    }
2791
    // SpanBuilder wires kind, parent linkage and attributes together.
    #[test]
    fn test_span_builder() {
        let parent = Span::new("parent", SpanKind::Server);
        let span = SpanBuilder::new("child")
            .kind(SpanKind::Client)
            .parent(&parent)
            .attribute("key", "value")
            .build();

        assert_eq!(span.trace_id, parent.trace_id);
        assert_eq!(span.kind, SpanKind::Client);
        assert!(span.attributes.contains_key("key"));
    }

    // Registered metrics show up in the rendered Prometheus exposition.
    #[test]
    fn test_prometheus_exporter() {
        let exporter = PrometheusExporter::new();
        exporter.register_counter("test_counter", "A test counter", &["label1"]);
        exporter.register_gauge("test_gauge", "A test gauge", &[]);

        exporter.inc_counter("test_counter", &["value1"]);
        exporter.inc_counter("test_counter", &["value1"]);
        exporter.set_metric("test_gauge", 42.0, &[]);

        let output = exporter.render();
        assert!(output.contains("test_counter"));
        assert!(output.contains("test_gauge"));
    }

    // The dashboard builder embeds title and added panel names in its JSON.
    #[test]
    fn test_grafana_dashboard() {
        let dashboard = GrafanaDashboard::new("Test Dashboard")
            .description("A test dashboard")
            .add_throughput_panel()
            .add_latency_panel()
            .build();

        assert!(dashboard.contains("Test Dashboard"));
        assert!(dashboard.contains("Message Throughput"));
        assert!(dashboard.contains("Message Latency"));
    }

    // Spans move from active to exported when ended via the context.
    #[test]
    fn test_observability_context() {
        let ctx = ObservabilityContext::new();

        let span = ctx.start_span("test", SpanKind::Internal);
        assert_eq!(ctx.active_span_count(), 1);

        ctx.end_span(span);
        assert_eq!(ctx.active_span_count(), 0);

        let exported = ctx.export_spans();
        assert_eq!(exported.len(), 1);
    }

    // A collector with recorded messages yields non-empty definitions
    // and samples through the Prometheus adapter.
    #[test]
    fn test_ringkernel_collector() {
        let collector = Arc::new(MetricsCollector::new());
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);

        let prom_collector = RingKernelCollector::new(collector);
        let defs = prom_collector.definitions();
        let samples = prom_collector.collect();

        assert!(!defs.is_empty());
        assert!(!samples.is_empty());
    }

    // Custom colors keep their channels; alpha defaults to opaque; the
    // named constants saturate their primary channel.
    #[test]
    fn test_profiler_color() {
        let color = ProfilerColor::new(128, 64, 32);
        assert_eq!(color.r, 128);
        assert_eq!(color.g, 64);
        assert_eq!(color.b, 32);
        assert_eq!(color.a, 255);

        assert_eq!(ProfilerColor::RED.r, 255);
        assert_eq!(ProfilerColor::GREEN.g, 255);
        assert_eq!(ProfilerColor::BLUE.b, 255);
    }
2878
2879 #[test]
2880 fn test_null_profiler() {
2881 let profiler = NullProfiler;
2882 assert!(!profiler.is_available());
2883 assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);
2884
2885 assert!(profiler.start_capture().is_ok());
2887 assert!(profiler.end_capture().is_ok());
2888 assert!(profiler.trigger_capture().is_ok());
2889
2890 let range = profiler.push_range("test", ProfilerColor::RED);
2891 let _elapsed = range.elapsed(); profiler.pop_range();
2893 profiler.mark("marker", ProfilerColor::BLUE);
2894 profiler.set_thread_name("thread");
2895 }
2896
    // Without the NVTX library loaded, the profiler reports Nsight as its
    // backend but is unavailable and refuses to start a capture.
    #[test]
    fn test_nvtx_profiler_stub() {
        let profiler = NvtxProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);

        assert!(!profiler.is_available());
        assert!(!profiler.is_nvtx_loaded());

        assert!(matches!(
            profiler.start_capture(),
            Err(ProfilerError::NotAvailable)
        ));
    }

    // Without RenderDoc attached, captures are absent and launching the
    // UI fails with NotAttached.
    #[test]
    fn test_renderdoc_profiler_stub() {
        let profiler = RenderDocProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);

        assert!(!profiler.is_available());
        assert!(!profiler.is_attached());
        assert!(profiler.get_capture_path().is_none());

        assert!(matches!(
            profiler.launch_ui(),
            Err(ProfilerError::NotAttached)
        ));
    }

    // A fresh manager is disabled with the Custom (null) backend, and
    // enablement toggles round-trip.
    #[test]
    fn test_gpu_profiler_manager() {
        let manager = GpuProfilerManager::new();

        assert!(!manager.is_enabled());
        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);

        manager.set_enabled(true);
        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
    }
2944
    // Scopes and marks on a disabled manager must be harmless no-ops
    // (guards drop cleanly without a pushed range).
    #[test]
    fn test_profiler_scope() {
        let manager = GpuProfilerManager::new();

        {
            let _scope = manager.scope("test_scope");
        }

        {
            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
        }

        manager.mark("test_marker");
    }

    // A user-supplied profiler drives the reported backend.
    #[test]
    fn test_profiler_with_custom() {
        let custom_profiler = Arc::new(NullProfiler);
        let manager = GpuProfilerManager::with_profiler(custom_profiler);

        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
    }

    // elapsed() measures wall-clock time since the range was created.
    #[test]
    fn test_profiler_range_elapsed() {
        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = range.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    // Each error variant's Display output mentions its condition.
    #[test]
    fn test_profiler_error_display() {
        let err = ProfilerError::NotAvailable;
        assert!(err.to_string().contains("not available"));

        let err = ProfilerError::NotAttached;
        assert!(err.to_string().contains("not attached"));

        let err = ProfilerError::CaptureInProgress;
        assert!(err.to_string().contains("in progress"));

        let err = ProfilerError::Backend("test error".to_string());
        assert!(err.to_string().contains("test error"));
    }

    // A fresh dashboard starts with zeroed counters.
    #[test]
    fn test_gpu_memory_dashboard_creation() {
        let dashboard = GpuMemoryDashboard::new();
        assert_eq!(dashboard.total_allocated(), 0);
        assert_eq!(dashboard.peak_allocated(), 0);
        assert_eq!(dashboard.allocation_count(), 0);
    }
3003
3004 #[test]
3005 fn test_gpu_memory_allocation_tracking() {
3006 let dashboard = GpuMemoryDashboard::new();
3007
3008 dashboard.track_allocation(
3010 1,
3011 "test_buffer",
3012 65536,
3013 GpuMemoryType::DeviceLocal,
3014 0,
3015 Some("test_kernel"),
3016 );
3017
3018 assert_eq!(dashboard.total_allocated(), 65536);
3019 assert_eq!(dashboard.peak_allocated(), 65536);
3020 assert_eq!(dashboard.allocation_count(), 1);
3021
3022 dashboard.track_allocation(
3024 2,
3025 "queue_buffer",
3026 1024,
3027 GpuMemoryType::QueueBuffer,
3028 0,
3029 Some("test_kernel"),
3030 );
3031
3032 assert_eq!(dashboard.total_allocated(), 66560);
3033 assert_eq!(dashboard.peak_allocated(), 66560);
3034 assert_eq!(dashboard.allocation_count(), 2);
3035
3036 dashboard.track_deallocation(1);
3038 assert_eq!(dashboard.total_allocated(), 1024);
3039 assert_eq!(dashboard.peak_allocated(), 66560); assert_eq!(dashboard.allocation_count(), 1);
3041 }
3042
3043 #[test]
3044 fn test_gpu_memory_device_stats() {
3045 let dashboard = GpuMemoryDashboard::new();
3046
3047 dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024); let stats = dashboard.get_device_stats(0).unwrap();
3051 assert_eq!(stats.device_index, 0);
3052 assert_eq!(stats.device_name, "NVIDIA RTX 4090");
3053 assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
3054 assert_eq!(stats.utilization(), 0.0);
3055
3056 let used = 8 * 1024 * 1024 * 1024; let free = 16 * 1024 * 1024 * 1024; dashboard.update_device_stats(0, free, used);
3060
3061 let stats = dashboard.get_device_stats(0).unwrap();
3062 assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
3063 }
3064
3065 #[test]
3066 fn test_gpu_memory_pressure_levels() {
3067 let dashboard = GpuMemoryDashboard::new();
3068
3069 dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);
3071
3072 dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
3074 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);
3075
3076 dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
3078 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);
3079
3080 dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
3082 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);
3083
3084 dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
3086 assert_eq!(
3087 dashboard.check_pressure(0),
3088 MemoryPressureLevel::OutOfMemory
3089 );
3090 }
3091
3092 #[test]
3093 fn test_gpu_memory_kernel_allocations() {
3094 let dashboard = GpuMemoryDashboard::new();
3095
3096 dashboard.track_allocation(
3098 1,
3099 "buf1",
3100 1000,
3101 GpuMemoryType::DeviceLocal,
3102 0,
3103 Some("kernel_a"),
3104 );
3105 dashboard.track_allocation(
3106 2,
3107 "buf2",
3108 2000,
3109 GpuMemoryType::DeviceLocal,
3110 0,
3111 Some("kernel_a"),
3112 );
3113 dashboard.track_allocation(
3114 3,
3115 "buf3",
3116 3000,
3117 GpuMemoryType::DeviceLocal,
3118 0,
3119 Some("kernel_b"),
3120 );
3121
3122 let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
3123 assert_eq!(kernel_a_allocs.len(), 2);
3124
3125 let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
3126 assert_eq!(kernel_b_allocs.len(), 1);
3127
3128 let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
3129 assert_eq!(kernel_c_allocs.len(), 0);
3130 }
3131
3132 #[test]
3133 fn test_gpu_memory_prometheus_metrics() {
3134 let dashboard = GpuMemoryDashboard::new();
3135 dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
3136 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3137
3138 let metrics = dashboard.prometheus_metrics();
3139 assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
3140 assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
3141 assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
3142 }
3143
3144 #[test]
3145 fn test_gpu_memory_summary_report() {
3146 let dashboard = GpuMemoryDashboard::new();
3147 dashboard.track_allocation(
3148 1,
3149 "large_buffer",
3150 1024 * 1024,
3151 GpuMemoryType::DeviceLocal,
3152 0,
3153 None,
3154 );
3155 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3156
3157 let report = dashboard.summary_report();
3158 assert!(report.contains("GPU Memory Dashboard"));
3159 assert!(report.contains("large_buffer"));
3160 }
3161
3162 #[test]
3163 fn test_gpu_memory_pool_stats() {
3164 let pool_stats = GpuMemoryPoolStats {
3165 name: "default".to_string(),
3166 capacity: 1024 * 1024,
3167 allocated: 512 * 1024,
3168 peak_allocated: 768 * 1024,
3169 allocation_count: 10,
3170 total_allocations: 100,
3171 total_deallocations: 90,
3172 fragmentation: 0.1,
3173 };
3174
3175 assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
3176 }
3177
3178 #[test]
3179 fn test_gpu_memory_types() {
3180 let types = [
3182 GpuMemoryType::DeviceLocal,
3183 GpuMemoryType::HostVisible,
3184 GpuMemoryType::HostCoherent,
3185 GpuMemoryType::Mapped,
3186 GpuMemoryType::QueueBuffer,
3187 GpuMemoryType::ControlBlock,
3188 GpuMemoryType::SharedMemory,
3189 ];
3190
3191 for (i, t1) in types.iter().enumerate() {
3192 for (j, t2) in types.iter().enumerate() {
3193 if i != j {
3194 assert_ne!(t1, t2);
3195 }
3196 }
3197 }
3198 }
3199
3200 #[test]
3201 fn test_gpu_memory_grafana_panel() {
3202 let dashboard = GpuMemoryDashboard::new();
3203 let panel = dashboard.grafana_panel();
3204
3205 assert_eq!(panel.title, "GPU Memory Usage");
3206 assert_eq!(panel.panel_type, PanelType::BarGauge);
3207 assert!(!panel.queries.is_empty());
3208 }
3209
3210 #[test]
3211 fn test_gpu_memory_allocation_id_generation() {
3212 let dashboard = GpuMemoryDashboard::new();
3213
3214 let id1 = dashboard.next_allocation_id();
3215 let id2 = dashboard.next_allocation_id();
3216 let id3 = dashboard.next_allocation_id();
3217
3218 assert_eq!(id1, 1);
3219 assert_eq!(id2, 2);
3220 assert_eq!(id3, 3);
3221 }
3222
3223 #[test]
3226 fn test_otlp_config_default() {
3227 let config = OtlpConfig::default();
3228 assert_eq!(config.endpoint, "http://localhost:4318/v1/traces");
3229 assert_eq!(config.transport, OtlpTransport::HttpJson);
3230 assert_eq!(config.service_name, "ringkernel");
3231 assert_eq!(config.batch_size, 512);
3232 }
3233
3234 #[test]
3235 fn test_otlp_config_builder() {
3236 let config = OtlpConfig::new("http://example.com/v1/traces")
3237 .with_service_name("my-service")
3238 .with_service_version("1.0.0")
3239 .with_instance_id("instance-1")
3240 .with_attribute("env", "production")
3241 .with_batch_size(100);
3242
3243 assert_eq!(config.endpoint, "http://example.com/v1/traces");
3244 assert_eq!(config.service_name, "my-service");
3245 assert_eq!(config.service_version, "1.0.0");
3246 assert_eq!(config.service_instance_id, Some("instance-1".to_string()));
3247 assert_eq!(config.resource_attributes.len(), 1);
3248 assert_eq!(config.batch_size, 100);
3249 }
3250
3251 #[test]
3252 fn test_otlp_config_jaeger() {
3253 let config = OtlpConfig::jaeger("http://jaeger:4318/v1/traces");
3254 assert_eq!(config.endpoint, "http://jaeger:4318/v1/traces");
3255 assert_eq!(config.service_name, "ringkernel");
3256 }
3257
3258 #[test]
3259 fn test_otlp_config_honeycomb() {
3260 let config = OtlpConfig::honeycomb("my-api-key");
3261 assert_eq!(config.endpoint, "https://api.honeycomb.io/v1/traces");
3262 assert_eq!(
3263 config.authorization,
3264 Some("x-honeycomb-team my-api-key".to_string())
3265 );
3266 }
3267
3268 #[test]
3269 fn test_otlp_exporter_creation() {
3270 let exporter = OtlpExporter::new(OtlpConfig::default());
3271 assert_eq!(exporter.buffered_count(), 0);
3272 assert_eq!(exporter.config().service_name, "ringkernel");
3273 }
3274
3275 #[test]
3276 fn test_otlp_exporter_jaeger_local() {
3277 let exporter = OtlpExporter::jaeger_local();
3278 assert_eq!(
3279 exporter.config().endpoint,
3280 "http://localhost:4318/v1/traces"
3281 );
3282 }
3283
3284 #[test]
3285 fn test_otlp_exporter_buffering() {
3286 let config = OtlpConfig::default().with_batch_size(10);
3287 let exporter = OtlpExporter::new(config);
3288
3289 let span = Span::new("test_span", SpanKind::Internal);
3291
3292 for _ in 0..5 {
3294 exporter.export_span(span.clone());
3295 }
3296
3297 assert_eq!(exporter.buffered_count(), 5);
3298 }
3299
3300 #[test]
3301 fn test_otlp_exporter_flush_empty() {
3302 let exporter = OtlpExporter::new(OtlpConfig::default());
3303
3304 let result = exporter.flush();
3305 assert!(result.success);
3306 assert_eq!(result.spans_exported, 0);
3307 }
3308
3309 #[test]
3310 fn test_otlp_exporter_stats() {
3311 let exporter = OtlpExporter::new(OtlpConfig::default());
3312
3313 let stats = exporter.stats();
3315 assert_eq!(stats.total_exports, 0);
3316 assert_eq!(stats.total_spans_exported, 0);
3317 assert_eq!(stats.buffered_spans, 0);
3318 }
3319
3320 #[test]
3321 fn test_otlp_transport_default() {
3322 let transport = OtlpTransport::default();
3323 assert_eq!(transport, OtlpTransport::HttpJson);
3324 }
3325}