1use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
/// 128-bit trace identifier (W3C trace-context sized).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TraceId(pub u128);

impl TraceId {
    /// Generates a new pseudo-random trace id.
    ///
    /// Mixes the wall clock, the current thread id, and a process-wide
    /// monotonic counter. The counter guarantees that two back-to-back calls
    /// can never collide even on platforms with coarse clock resolution
    /// (hashing `SystemTime` + thread id alone is not unique per call).
    /// Not cryptographically secure.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        // Process-wide discriminator so consecutive calls always differ.
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::thread::current().id().hash(&mut hasher);
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        let high = hasher.finish() as u128;
        // Feed the first word back into the hasher to derive a distinct low word.
        hasher.write_u64(high as u64);
        let low = hasher.finish() as u128;
        Self((high << 64) | low)
    }

    /// Parses a hexadecimal trace id; returns `None` on invalid input.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u128::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders the id as 32 zero-padded lowercase hex characters.
    pub fn to_hex(&self) -> String {
        format!("{:032x}", self.0)
    }
}

impl Default for TraceId {
    /// Equivalent to [`TraceId::new`].
    fn default() -> Self {
        Self::new()
    }
}
76
/// 64-bit span identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpanId(pub u64);

impl SpanId {
    /// Generates a new pseudo-random span id.
    ///
    /// Mixes the wall clock, the process id, and a process-wide monotonic
    /// counter; the counter guarantees uniqueness for rapid consecutive
    /// calls on systems with coarse clocks. Not cryptographically secure.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        // Process-wide discriminator so consecutive calls always differ.
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut hasher);
        std::process::id().hash(&mut hasher);
        SEQ.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);
        Self(hasher.finish())
    }

    /// Parses a hexadecimal span id; returns `None` on invalid input.
    pub fn from_hex(hex: &str) -> Option<Self> {
        u64::from_str_radix(hex, 16).ok().map(Self)
    }

    /// Renders the id as 16 zero-padded lowercase hex characters.
    pub fn to_hex(&self) -> String {
        format!("{:016x}", self.0)
    }
}

impl Default for SpanId {
    /// Equivalent to [`SpanId::new`].
    fn default() -> Self {
        Self::new()
    }
}
107
/// The role a span plays in a trace (names follow the OpenTelemetry
/// span-kind convention).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Internal operation with no remote peer.
    Internal,
    /// Handles a request from a remote client.
    Server,
    /// Issues a request to a remote server.
    Client,
    /// Publishes a message to a broker or queue.
    Producer,
    /// Consumes a message from a broker or queue.
    Consumer,
}
122
/// Outcome status of a span.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// No status recorded yet (the initial state of every span).
    Unset,
    /// The operation completed successfully.
    Ok,
    /// The operation failed.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}
136
/// A single timed operation within a trace.
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace this span belongs to.
    pub trace_id: TraceId,
    /// Unique id of this span.
    pub span_id: SpanId,
    /// Parent span id, or `None` for a root span.
    pub parent_span_id: Option<SpanId>,
    /// Operation name.
    pub name: String,
    /// Role of the span (client/server/etc.).
    pub kind: SpanKind,
    /// When the span was started.
    pub start_time: Instant,
    /// When the span was ended; `None` while still running.
    pub end_time: Option<Instant>,
    /// Current status (starts as `Unset`).
    pub status: SpanStatus,
    /// Key/value attributes attached to the span.
    pub attributes: HashMap<String, AttributeValue>,
    /// Timestamped events recorded during the span.
    pub events: Vec<SpanEvent>,
}
161
/// A typed attribute value attachable to spans and span events.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    /// UTF-8 string value.
    String(String),
    /// Signed integer value.
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// List of strings.
    StringArray(Vec<String>),
}

impl From<&str> for AttributeValue {
    fn from(s: &str) -> Self {
        Self::String(s.to_string())
    }
}

impl From<String> for AttributeValue {
    fn from(s: String) -> Self {
        Self::String(s)
    }
}

impl From<i64> for AttributeValue {
    fn from(i: i64) -> Self {
        Self::Int(i)
    }
}

impl From<f64> for AttributeValue {
    fn from(f: f64) -> Self {
        Self::Float(f)
    }
}

impl From<bool> for AttributeValue {
    fn from(b: bool) -> Self {
        Self::Bool(b)
    }
}

/// Completes the conversion set: previously every scalar variant had a
/// `From` impl but `StringArray` did not, so `Span::set_attribute` could not
/// accept string lists without manual wrapping.
impl From<Vec<String>> for AttributeValue {
    fn from(v: Vec<String>) -> Self {
        Self::StringArray(v)
    }
}
206
/// A timestamped event recorded within a span.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Event name.
    pub name: String,
    /// When the event was recorded.
    pub timestamp: Instant,
    /// Key/value attributes attached to the event.
    pub attributes: HashMap<String, AttributeValue>,
}
217
218impl Span {
219 pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221 Self {
222 trace_id: TraceId::new(),
223 span_id: SpanId::new(),
224 parent_span_id: None,
225 name: name.into(),
226 kind,
227 start_time: Instant::now(),
228 end_time: None,
229 status: SpanStatus::Unset,
230 attributes: HashMap::new(),
231 events: Vec::new(),
232 }
233 }
234
235 pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237 Self {
238 trace_id: self.trace_id,
239 span_id: SpanId::new(),
240 parent_span_id: Some(self.span_id),
241 name: name.into(),
242 kind,
243 start_time: Instant::now(),
244 end_time: None,
245 status: SpanStatus::Unset,
246 attributes: HashMap::new(),
247 events: Vec::new(),
248 }
249 }
250
251 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253 self.attributes.insert(key.into(), value.into());
254 }
255
256 pub fn add_event(&mut self, name: impl Into<String>) {
258 self.events.push(SpanEvent {
259 name: name.into(),
260 timestamp: Instant::now(),
261 attributes: HashMap::new(),
262 });
263 }
264
265 pub fn add_event_with_attributes(
267 &mut self,
268 name: impl Into<String>,
269 attributes: HashMap<String, AttributeValue>,
270 ) {
271 self.events.push(SpanEvent {
272 name: name.into(),
273 timestamp: Instant::now(),
274 attributes,
275 });
276 }
277
278 pub fn set_ok(&mut self) {
280 self.status = SpanStatus::Ok;
281 }
282
283 pub fn set_error(&mut self, message: impl Into<String>) {
285 self.status = SpanStatus::Error {
286 message: message.into(),
287 };
288 }
289
290 pub fn end(&mut self) {
292 self.end_time = Some(Instant::now());
293 }
294
295 pub fn duration(&self) -> Duration {
297 self.end_time
298 .unwrap_or_else(Instant::now)
299 .duration_since(self.start_time)
300 }
301
302 pub fn is_ended(&self) -> bool {
304 self.end_time.is_some()
305 }
306}
307
/// Fluent builder for [`Span`]s: configure kind, parent, and attributes
/// before construction.
pub struct SpanBuilder {
    name: String,
    kind: SpanKind,
    // Optional (trace id, parent span id) inherited from a parent span.
    parent: Option<(TraceId, SpanId)>,
    attributes: HashMap<String, AttributeValue>,
}
319
320impl SpanBuilder {
321 pub fn new(name: impl Into<String>) -> Self {
323 Self {
324 name: name.into(),
325 kind: SpanKind::Internal,
326 parent: None,
327 attributes: HashMap::new(),
328 }
329 }
330
331 pub fn kind(mut self, kind: SpanKind) -> Self {
333 self.kind = kind;
334 self
335 }
336
337 pub fn parent(mut self, parent: &Span) -> Self {
339 self.parent = Some((parent.trace_id, parent.span_id));
340 self
341 }
342
343 pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345 self.attributes.insert(key.into(), value.into());
346 self
347 }
348
349 pub fn build(self) -> Span {
351 let mut span = Span::new(self.name, self.kind);
352 if let Some((trace_id, parent_id)) = self.parent {
353 span.trace_id = trace_id;
354 span.parent_span_id = Some(parent_id);
355 }
356 span.attributes = self.attributes;
357 span
358 }
359}
360
/// Prometheus metric type, as written in `# TYPE` comment lines.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Monotonically increasing value.
    Counter,
    /// Value that can go up and down.
    Gauge,
    /// Bucketed distribution of observations.
    Histogram,
    /// Distribution with precomputed quantiles.
    Summary,
}
377
/// Metadata describing a metric family.
#[derive(Debug, Clone)]
pub struct MetricDefinition {
    /// Metric family name.
    pub name: String,
    /// Type emitted in the `# TYPE` line.
    pub metric_type: MetricType,
    /// Help text emitted in the `# HELP` line.
    pub help: String,
    /// Ordered label names; sample values are matched positionally.
    pub labels: Vec<String>,
}
390
/// A single measured value belonging to a metric family.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Metric family name.
    pub name: String,
    /// Label values, positionally matching the definition's label names.
    pub label_values: Vec<String>,
    /// The sample value.
    pub value: f64,
    /// Optional sample timestamp in milliseconds (every construction site
    /// in this file currently leaves it `None`).
    pub timestamp_ms: Option<u64>,
}
403
/// Renders registered collectors and custom metrics in the Prometheus text
/// exposition format.
pub struct PrometheusExporter {
    // Definitions accumulated from registered collectors.
    definitions: RwLock<Vec<MetricDefinition>>,
    // Pull-based collectors queried on every render.
    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
    // Push-style metrics registered and updated by name.
    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
    // Number of times `render` has been called.
    export_count: AtomicU64,
}
415
/// A push-style metric: its definition plus the current sample per label set.
struct CustomMetric {
    definition: MetricDefinition,
    samples: Vec<MetricSample>,
}
421
/// A source of metrics that the exporter pulls from on every render.
pub trait PrometheusCollector: Send + Sync {
    /// Metric families this collector produces.
    fn definitions(&self) -> Vec<MetricDefinition>;

    /// Current samples for the declared families.
    fn collect(&self) -> Vec<MetricSample>;
}
430
431impl PrometheusExporter {
432 pub fn new() -> Arc<Self> {
434 Arc::new(Self {
435 definitions: RwLock::new(Vec::new()),
436 collectors: RwLock::new(Vec::new()),
437 custom_metrics: RwLock::new(HashMap::new()),
438 export_count: AtomicU64::new(0),
439 })
440 }
441
442 pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444 let defs = collector.definitions();
445 self.definitions.write().extend(defs);
446 self.collectors.write().push(collector);
447 }
448
449 pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451 let def = MetricDefinition {
452 name: name.to_string(),
453 metric_type: MetricType::Counter,
454 help: help.to_string(),
455 labels: labels.iter().map(|s| s.to_string()).collect(),
456 };
457 self.custom_metrics.write().insert(
458 name.to_string(),
459 CustomMetric {
460 definition: def,
461 samples: Vec::new(),
462 },
463 );
464 }
465
466 pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468 let def = MetricDefinition {
469 name: name.to_string(),
470 metric_type: MetricType::Gauge,
471 help: help.to_string(),
472 labels: labels.iter().map(|s| s.to_string()).collect(),
473 };
474 self.custom_metrics.write().insert(
475 name.to_string(),
476 CustomMetric {
477 definition: def,
478 samples: Vec::new(),
479 },
480 );
481 }
482
483 pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485 let def = MetricDefinition {
486 name: name.to_string(),
487 metric_type: MetricType::Histogram,
488 help: help.to_string(),
489 labels: labels.iter().map(|s| s.to_string()).collect(),
490 };
491 self.custom_metrics.write().insert(
492 name.to_string(),
493 CustomMetric {
494 definition: def,
495 samples: Vec::new(),
496 },
497 );
498 }
499
500 pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502 let mut metrics = self.custom_metrics.write();
503 if let Some(metric) = metrics.get_mut(name) {
504 let sample = MetricSample {
505 name: name.to_string(),
506 label_values: label_values.iter().map(|s| s.to_string()).collect(),
507 value,
508 timestamp_ms: None,
509 };
510 let existing = metric
512 .samples
513 .iter_mut()
514 .find(|s| s.label_values == sample.label_values);
515 if let Some(existing) = existing {
516 existing.value = value;
517 } else {
518 metric.samples.push(sample);
519 }
520 }
521 }
522
523 pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525 self.add_counter(name, 1.0, label_values);
526 }
527
528 pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530 let mut metrics = self.custom_metrics.write();
531 if let Some(metric) = metrics.get_mut(name) {
532 let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533 let existing = metric
534 .samples
535 .iter_mut()
536 .find(|s| s.label_values == label_vec);
537 if let Some(existing) = existing {
538 existing.value += delta;
539 } else {
540 metric.samples.push(MetricSample {
541 name: name.to_string(),
542 label_values: label_vec,
543 value: delta,
544 timestamp_ms: None,
545 });
546 }
547 }
548 }
549
550 pub fn render(&self) -> String {
552 self.export_count.fetch_add(1, Ordering::Relaxed);
553
554 let mut output = String::new();
555
556 let collectors = self.collectors.read();
558 for collector in collectors.iter() {
559 let defs = collector.definitions();
560 let samples = collector.collect();
561
562 for def in &defs {
563 writeln!(output, "# HELP {} {}", def.name, def.help).unwrap();
565 writeln!(
566 output,
567 "# TYPE {} {}",
568 def.name,
569 match def.metric_type {
570 MetricType::Counter => "counter",
571 MetricType::Gauge => "gauge",
572 MetricType::Histogram => "histogram",
573 MetricType::Summary => "summary",
574 }
575 )
576 .unwrap();
577
578 for sample in samples.iter().filter(|s| s.name == def.name) {
580 Self::write_sample(&mut output, &def.labels, sample);
581 }
582 }
583 }
584
585 let custom = self.custom_metrics.read();
587 for metric in custom.values() {
588 writeln!(
589 output,
590 "# HELP {} {}",
591 metric.definition.name, metric.definition.help
592 )
593 .unwrap();
594 writeln!(
595 output,
596 "# TYPE {} {}",
597 metric.definition.name,
598 match metric.definition.metric_type {
599 MetricType::Counter => "counter",
600 MetricType::Gauge => "gauge",
601 MetricType::Histogram => "histogram",
602 MetricType::Summary => "summary",
603 }
604 )
605 .unwrap();
606
607 for sample in &metric.samples {
608 Self::write_sample(&mut output, &metric.definition.labels, sample);
609 }
610 }
611
612 output
613 }
614
615 fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
616 if labels.is_empty() || sample.label_values.is_empty() {
617 writeln!(output, "{} {}", sample.name, sample.value).unwrap();
618 } else {
619 let label_pairs: Vec<String> = labels
620 .iter()
621 .zip(sample.label_values.iter())
622 .map(|(k, v)| format!("{}=\"{}\"", k, v))
623 .collect();
624 writeln!(
625 output,
626 "{}{{{}}} {}",
627 sample.name,
628 label_pairs.join(","),
629 sample.value
630 )
631 .unwrap();
632 }
633 }
634
635 pub fn export_count(&self) -> u64 {
637 self.export_count.load(Ordering::Relaxed)
638 }
639}
640
641impl Default for PrometheusExporter {
642 fn default() -> Self {
643 Self {
644 definitions: RwLock::new(Vec::new()),
645 collectors: RwLock::new(Vec::new()),
646 custom_metrics: RwLock::new(HashMap::new()),
647 export_count: AtomicU64::new(0),
648 }
649 }
650}
651
/// Bridges the crate's [`MetricsCollector`] into the Prometheus exporter.
pub struct RingKernelCollector {
    // Telemetry source the samples are read from.
    collector: Arc<MetricsCollector>,
}
661
662impl RingKernelCollector {
663 pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
665 Arc::new(Self { collector })
666 }
667}
668
669impl PrometheusCollector for RingKernelCollector {
670 fn definitions(&self) -> Vec<MetricDefinition> {
671 vec![
672 MetricDefinition {
673 name: "ringkernel_messages_processed_total".to_string(),
674 metric_type: MetricType::Counter,
675 help: "Total number of messages processed by kernels".to_string(),
676 labels: vec!["kernel_id".to_string()],
677 },
678 MetricDefinition {
679 name: "ringkernel_messages_dropped_total".to_string(),
680 metric_type: MetricType::Counter,
681 help: "Total number of messages dropped by kernels".to_string(),
682 labels: vec!["kernel_id".to_string()],
683 },
684 MetricDefinition {
685 name: "ringkernel_latency_us".to_string(),
686 metric_type: MetricType::Gauge,
687 help: "Current average message latency in microseconds".to_string(),
688 labels: vec!["kernel_id".to_string(), "stat".to_string()],
689 },
690 MetricDefinition {
691 name: "ringkernel_throughput".to_string(),
692 metric_type: MetricType::Gauge,
693 help: "Current message throughput per second".to_string(),
694 labels: vec!["kernel_id".to_string()],
695 },
696 ]
697 }
698
699 fn collect(&self) -> Vec<MetricSample> {
700 let aggregate = self.collector.get_aggregate();
701 let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
702
703 vec![
704 MetricSample {
705 name: "ringkernel_messages_processed_total".to_string(),
706 label_values: vec!["aggregate".to_string()],
707 value: aggregate.messages_processed as f64,
708 timestamp_ms: None,
709 },
710 MetricSample {
711 name: "ringkernel_messages_dropped_total".to_string(),
712 label_values: vec!["aggregate".to_string()],
713 value: aggregate.messages_dropped as f64,
714 timestamp_ms: None,
715 },
716 MetricSample {
717 name: "ringkernel_latency_us".to_string(),
718 label_values: vec!["aggregate".to_string(), "avg".to_string()],
719 value: aggregate.avg_latency_us(),
720 timestamp_ms: None,
721 },
722 MetricSample {
723 name: "ringkernel_latency_us".to_string(),
724 label_values: vec!["aggregate".to_string(), "min".to_string()],
725 value: aggregate.min_latency_us as f64,
726 timestamp_ms: None,
727 },
728 MetricSample {
729 name: "ringkernel_latency_us".to_string(),
730 label_values: vec!["aggregate".to_string(), "max".to_string()],
731 value: aggregate.max_latency_us as f64,
732 timestamp_ms: None,
733 },
734 MetricSample {
735 name: "ringkernel_throughput".to_string(),
736 label_values: vec!["aggregate".to_string()],
737 value: aggregate.messages_processed as f64 / elapsed,
738 timestamp_ms: None,
739 },
740 ]
741 }
742}
743
/// Grafana panel visualization kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanelType {
    /// Time-series graph (serialized as "timeseries").
    Graph,
    /// Single-value stat panel.
    Stat,
    /// Tabular panel.
    Table,
    /// Heatmap panel.
    Heatmap,
    /// Bar-gauge panel.
    BarGauge,
}
762
/// A single panel in a generated Grafana dashboard.
#[derive(Debug, Clone)]
pub struct GrafanaPanel {
    /// Panel title shown in the dashboard UI.
    pub title: String,
    /// Visualization type.
    pub panel_type: PanelType,
    /// PromQL expressions rendered as the panel's targets.
    pub queries: Vec<String>,
    /// Grid position as (x, y, width, height) in Grafana grid units.
    pub grid_pos: (u32, u32, u32, u32),
    /// Optional display unit (e.g. "bytes"); omitted from JSON when `None`.
    pub unit: Option<String>,
}
777
/// Builder that emits a Grafana dashboard as a JSON string.
pub struct GrafanaDashboard {
    // Dashboard title.
    title: String,
    // Dashboard description.
    description: String,
    // Panels in insertion order.
    panels: Vec<GrafanaPanel>,
    // Auto-refresh interval (e.g. "5s").
    refresh: String,
    // Start of the default time window (e.g. "now-1h").
    time_from: String,
    // Dashboard tags; "ringkernel" is added by `new`.
    tags: Vec<String>,
}
793
794impl GrafanaDashboard {
795 pub fn new(title: impl Into<String>) -> Self {
797 Self {
798 title: title.into(),
799 description: String::new(),
800 panels: Vec::new(),
801 refresh: "5s".to_string(),
802 time_from: "now-1h".to_string(),
803 tags: vec!["ringkernel".to_string()],
804 }
805 }
806
807 pub fn description(mut self, desc: impl Into<String>) -> Self {
809 self.description = desc.into();
810 self
811 }
812
813 pub fn refresh(mut self, interval: impl Into<String>) -> Self {
815 self.refresh = interval.into();
816 self
817 }
818
819 pub fn time_from(mut self, from: impl Into<String>) -> Self {
821 self.time_from = from.into();
822 self
823 }
824
825 pub fn tag(mut self, tag: impl Into<String>) -> Self {
827 self.tags.push(tag.into());
828 self
829 }
830
831 pub fn panel(mut self, panel: GrafanaPanel) -> Self {
833 self.panels.push(panel);
834 self
835 }
836
837 pub fn add_throughput_panel(mut self) -> Self {
839 self.panels.push(GrafanaPanel {
840 title: "Message Throughput".to_string(),
841 panel_type: PanelType::Graph,
842 queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
843 grid_pos: (0, 0, 12, 8),
844 unit: Some("msg/s".to_string()),
845 });
846 self
847 }
848
849 pub fn add_latency_panel(mut self) -> Self {
851 self.panels.push(GrafanaPanel {
852 title: "Message Latency".to_string(),
853 panel_type: PanelType::Graph,
854 queries: vec![
855 "ringkernel_latency_us{stat=\"avg\"}".to_string(),
856 "ringkernel_latency_us{stat=\"max\"}".to_string(),
857 ],
858 grid_pos: (12, 0, 12, 8),
859 unit: Some("µs".to_string()),
860 });
861 self
862 }
863
864 pub fn add_kernel_status_panel(mut self) -> Self {
866 self.panels.push(GrafanaPanel {
867 title: "Active Kernels".to_string(),
868 panel_type: PanelType::Stat,
869 queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
870 grid_pos: (0, 8, 6, 4),
871 unit: None,
872 });
873 self
874 }
875
876 pub fn add_drop_rate_panel(mut self) -> Self {
878 self.panels.push(GrafanaPanel {
879 title: "Message Drop Rate".to_string(),
880 panel_type: PanelType::Graph,
881 queries: vec![
882 "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
883 ],
884 grid_pos: (6, 8, 6, 4),
885 unit: Some("percentunit".to_string()),
886 });
887 self
888 }
889
890 pub fn add_multi_gpu_panel(mut self) -> Self {
892 self.panels.push(GrafanaPanel {
893 title: "GPU Memory Usage".to_string(),
894 panel_type: PanelType::BarGauge,
895 queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
896 grid_pos: (12, 8, 12, 4),
897 unit: Some("bytes".to_string()),
898 });
899 self
900 }
901
902 pub fn add_standard_panels(self) -> Self {
904 self.add_throughput_panel()
905 .add_latency_panel()
906 .add_kernel_status_panel()
907 .add_drop_rate_panel()
908 .add_multi_gpu_panel()
909 }
910
911 pub fn build(&self) -> String {
913 let panels_json: Vec<String> = self
914 .panels
915 .iter()
916 .enumerate()
917 .map(|(i, panel)| {
918 let queries_json: Vec<String> = panel
919 .queries
920 .iter()
921 .enumerate()
922 .map(|(j, q)| {
923 format!(
924 r#"{{
925 "expr": "{}",
926 "refId": "{}",
927 "legendFormat": "{{}}"
928 }}"#,
929 q,
930 (b'A' + j as u8) as char
931 )
932 })
933 .collect();
934
935 let unit_field = panel
936 .unit
937 .as_ref()
938 .map(|u| format!(r#""unit": "{}","#, u))
939 .unwrap_or_default();
940
941 format!(
942 r#"{{
943 "id": {},
944 "title": "{}",
945 "type": "{}",
946 "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
947 {}
948 "targets": [{}],
949 "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
950 }}"#,
951 i + 1,
952 panel.title,
953 match panel.panel_type {
954 PanelType::Graph => "timeseries",
955 PanelType::Stat => "stat",
956 PanelType::Table => "table",
957 PanelType::Heatmap => "heatmap",
958 PanelType::BarGauge => "bargauge",
959 },
960 panel.grid_pos.0,
961 panel.grid_pos.1,
962 panel.grid_pos.2,
963 panel.grid_pos.3,
964 unit_field,
965 queries_json.join(",")
966 )
967 })
968 .collect();
969
970 let tags_json: Vec<String> = self.tags.iter().map(|t| format!(r#""{}""#, t)).collect();
971
972 format!(
973 r#"{{
974 "title": "{}",
975 "description": "{}",
976 "tags": [{}],
977 "refresh": "{}",
978 "time": {{"from": "{}", "to": "now"}},
979 "templating": {{
980 "list": [
981 {{
982 "name": "datasource",
983 "type": "datasource",
984 "query": "prometheus"
985 }},
986 {{
987 "name": "kernel_id",
988 "type": "query",
989 "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
990 "multi": true,
991 "includeAll": true
992 }}
993 ]
994 }},
995 "panels": [{}]
996 }}"#,
997 self.title,
998 self.description,
999 tags_json.join(","),
1000 self.refresh,
1001 self.time_from,
1002 panels_json.join(",")
1003 )
1004 }
1005}
1006
/// Central handle tying together span tracking and Prometheus export.
pub struct ObservabilityContext {
    // Spans started but not yet ended.
    active_spans: RwLock<HashMap<SpanId, Span>>,
    // Finished spans buffered for export, oldest first.
    completed_spans: RwLock<Vec<Span>>,
    // Maximum finished spans retained before the oldest are dropped.
    max_completed: usize,
    // Prometheus exporter owned by this context.
    prometheus: Arc<PrometheusExporter>,
}
1022
1023impl ObservabilityContext {
1024 pub fn new() -> Arc<Self> {
1026 Arc::new(Self {
1027 active_spans: RwLock::new(HashMap::new()),
1028 completed_spans: RwLock::new(Vec::new()),
1029 max_completed: 10000,
1030 prometheus: PrometheusExporter::new(),
1031 })
1032 }
1033
1034 pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1036 let span = Span::new(name, kind);
1037 self.active_spans.write().insert(span.span_id, span.clone());
1038 span
1039 }
1040
1041 pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1043 let span = parent.child(name, kind);
1044 self.active_spans.write().insert(span.span_id, span.clone());
1045 span
1046 }
1047
1048 pub fn end_span(&self, mut span: Span) {
1050 span.end();
1051 self.active_spans.write().remove(&span.span_id);
1052
1053 let mut completed = self.completed_spans.write();
1054 completed.push(span);
1055 if completed.len() > self.max_completed {
1056 completed.remove(0);
1057 }
1058 }
1059
1060 pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1062 &self.prometheus
1063 }
1064
1065 pub fn export_spans(&self) -> Vec<Span> {
1067 self.completed_spans.write().drain(..).collect()
1068 }
1069
1070 pub fn active_span_count(&self) -> usize {
1072 self.active_spans.read().len()
1073 }
1074}
1075
1076impl Default for ObservabilityContext {
1077 fn default() -> Self {
1078 Self {
1079 active_spans: RwLock::new(HashMap::new()),
1080 completed_spans: RwLock::new(Vec::new()),
1081 max_completed: 10000,
1082 prometheus: PrometheusExporter::new(),
1083 }
1084 }
1085}
1086
/// The external GPU profiling tool an integration targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuProfilerBackend {
    /// NVIDIA Nsight (NVTX ranges).
    Nsight,
    /// RenderDoc frame capture.
    RenderDoc,
    /// Microsoft PIX.
    Pix,
    /// Apple Metal System Trace.
    MetalSystemTrace,
    /// AMD Radeon GPU Profiler.
    Rgp,
    /// Application-defined / no-op backend.
    Custom,
}
1107
/// RGBA color used to tint profiler ranges and markers.
#[derive(Debug, Clone, Copy)]
pub struct ProfilerColor {
    /// Red component.
    pub r: u8,
    /// Green component.
    pub g: u8,
    /// Blue component.
    pub b: u8,
    /// Alpha component (255 = opaque).
    pub a: u8,
}

impl ProfilerColor {
    /// Builds a fully opaque color from its RGB components.
    pub const fn new(r: u8, g: u8, b: u8) -> Self {
        ProfilerColor { r, g, b, a: 255 }
    }

    /// Pure red.
    pub const RED: Self = ProfilerColor { r: 255, g: 0, b: 0, a: 255 };
    /// Pure green.
    pub const GREEN: Self = ProfilerColor { r: 0, g: 255, b: 0, a: 255 };
    /// Pure blue.
    pub const BLUE: Self = ProfilerColor { r: 0, g: 0, b: 255, a: 255 };
    /// Yellow (red + green).
    pub const YELLOW: Self = ProfilerColor { r: 255, g: 255, b: 0, a: 255 };
    /// Cyan (green + blue).
    pub const CYAN: Self = ProfilerColor { r: 0, g: 255, b: 255, a: 255 };
    /// Magenta (red + blue).
    pub const MAGENTA: Self = ProfilerColor { r: 255, g: 0, b: 255, a: 255 };
    /// Orange.
    pub const ORANGE: Self = ProfilerColor { r: 255, g: 165, b: 0, a: 255 };
}
1142
/// CPU-side timing scope mirroring a profiler range; ends when dropped.
pub struct ProfilerRange {
    // Retained for future backend-specific range termination.
    #[allow(dead_code)]
    name: String,
    #[allow(dead_code)]
    backend: GpuProfilerBackend,
    // When the range was opened.
    start: Instant,
}
1154
1155impl ProfilerRange {
1156 fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
1158 Self {
1159 name: name.into(),
1160 backend,
1161 start: Instant::now(),
1162 }
1163 }
1164
1165 pub fn elapsed(&self) -> Duration {
1167 self.start.elapsed()
1168 }
1169}
1170
impl Drop for ProfilerRange {
    fn drop(&mut self) {
        // Intentionally empty: backend-specific range termination (e.g. an
        // NVTX pop) is not wired up here; dropping only ends the CPU-side
        // timing scope.
    }
}
1177
/// Abstraction over external GPU profiling tools.
///
/// Every method has a no-op default so concrete backends only need to
/// override what their tool supports.
pub trait GpuProfiler: Send + Sync {
    /// Whether the backend tool is present and usable (defaults to `false`).
    fn is_available(&self) -> bool {
        false
    }

    /// Identifies which tool this profiler integrates with.
    fn backend(&self) -> GpuProfilerBackend;

    /// Begins a capture session (no-op by default).
    fn start_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Ends a capture session (no-op by default).
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Requests a single-shot capture (no-op by default).
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Opens a named range; the returned guard times it CPU-side.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    /// Closes the most recently pushed range (no-op by default).
    fn pop_range(&self) {}

    /// Emits an instantaneous marker (no-op by default).
    fn mark(&self, _name: &str, _color: ProfilerColor) {}

    /// Names the current thread in the profiler UI (no-op by default).
    fn set_thread_name(&self, _name: &str) {}

    /// Logs a free-form message to the profiler (no-op by default).
    fn message(&self, _text: &str) {}

    /// Registers a GPU allocation for memory tracking (no-op by default).
    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}

    /// Unregisters a previously registered allocation (no-op by default).
    fn unregister_allocation(&self, _ptr: u64) {}
}
1229
/// Errors reported by GPU profiler integrations.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ProfilerError {
    /// The profiling tool is not installed or could not be loaded.
    #[error("GPU profiler not available")]
    NotAvailable,
    /// The profiling tool is present but not attached to this process.
    #[error("GPU profiler not attached")]
    NotAttached,
    /// `start_capture` was called while a capture was already running.
    #[error("Capture already in progress")]
    CaptureInProgress,
    /// `end_capture` was called with no capture running.
    #[error("No capture in progress")]
    NoCaptureInProgress,
    /// Backend-specific failure.
    #[error("Profiler error: {0}")]
    Backend(String),
}
1249
/// No-op profiler used when no real GPU profiler is detected.
pub struct NullProfiler;

impl GpuProfiler for NullProfiler {
    /// Reports the catch-all backend; all other trait methods keep their
    /// default no-op implementations.
    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Custom
    }
}
1258
/// NVIDIA NVTX-based profiler integration.
pub struct NvtxProfiler {
    /// True when the NVTX runtime was detected; currently hard-wired to
    /// `false` because no runtime detection is performed yet.
    available: bool,
    /// Guards against overlapping capture sessions.
    capture_in_progress: std::sync::atomic::AtomicBool,
}

impl NvtxProfiler {
    /// Creates the profiler with no active capture and availability off.
    pub fn new() -> Self {
        NvtxProfiler {
            available: false,
            capture_in_progress: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Reports whether the NVTX runtime was detected at construction.
    pub fn is_nvtx_loaded(&self) -> bool {
        self.available
    }
}

impl Default for NvtxProfiler {
    /// Equivalent to [`NvtxProfiler::new`].
    fn default() -> Self {
        Self::new()
    }
}
1293
1294impl GpuProfiler for NvtxProfiler {
1295 fn is_available(&self) -> bool {
1296 self.available
1297 }
1298
1299 fn backend(&self) -> GpuProfilerBackend {
1300 GpuProfilerBackend::Nsight
1301 }
1302
1303 fn start_capture(&self) -> Result<(), ProfilerError> {
1304 if !self.available {
1305 return Err(ProfilerError::NotAvailable);
1306 }
1307 if self.capture_in_progress.swap(true, Ordering::SeqCst) {
1308 return Err(ProfilerError::CaptureInProgress);
1309 }
1310 Ok(())
1312 }
1313
1314 fn end_capture(&self) -> Result<(), ProfilerError> {
1315 if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
1316 return Err(ProfilerError::NoCaptureInProgress);
1317 }
1318 Ok(())
1320 }
1321
1322 fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
1323 ProfilerRange::new(name, self.backend())
1325 }
1326
1327 fn pop_range(&self) {
1328 }
1330
1331 fn mark(&self, _name: &str, _color: ProfilerColor) {
1332 }
1334
1335 fn set_thread_name(&self, _name: &str) {
1336 }
1338}
1339
1340pub struct RenderDocProfiler {
1344 attached: bool,
1346}
1347
1348impl RenderDocProfiler {
1349 pub fn new() -> Self {
1353 Self {
1354 attached: false, }
1356 }
1357
1358 pub fn is_attached(&self) -> bool {
1360 self.attached
1362 }
1363
1364 pub fn get_capture_path(&self) -> Option<String> {
1366 None
1368 }
1369
1370 pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1372 if !self.attached {
1373 return Err(ProfilerError::NotAttached);
1374 }
1375 Ok(())
1377 }
1378}
1379
1380impl Default for RenderDocProfiler {
1381 fn default() -> Self {
1382 Self::new()
1383 }
1384}
1385
1386impl GpuProfiler for RenderDocProfiler {
1387 fn is_available(&self) -> bool {
1388 self.attached
1389 }
1390
1391 fn backend(&self) -> GpuProfilerBackend {
1392 GpuProfilerBackend::RenderDoc
1393 }
1394
1395 fn trigger_capture(&self) -> Result<(), ProfilerError> {
1396 if !self.attached {
1397 return Err(ProfilerError::NotAttached);
1398 }
1399 Ok(())
1401 }
1402
1403 fn start_capture(&self) -> Result<(), ProfilerError> {
1404 if !self.attached {
1405 return Err(ProfilerError::NotAttached);
1406 }
1407 Ok(())
1409 }
1410
1411 fn end_capture(&self) -> Result<(), ProfilerError> {
1412 Ok(())
1414 }
1415
1416 fn set_thread_name(&self, _name: &str) {
1417 }
1419}
1420
/// Metal System Trace integration (macOS only).
#[cfg(target_os = "macos")]
pub struct MetalProfiler {
    // Set unconditionally to `true` by `new`.
    available: bool,
}

#[cfg(target_os = "macos")]
impl MetalProfiler {
    /// Creates the profiler; availability is assumed on macOS.
    pub fn new() -> Self {
        Self { available: true }
    }
}

#[cfg(target_os = "macos")]
impl Default for MetalProfiler {
    /// Equivalent to [`MetalProfiler::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(target_os = "macos")]
impl GpuProfiler for MetalProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::MetalSystemTrace
    }

    /// Opens a CPU-side range; signpost emission is not wired up here.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    // Intentionally empty: no backend call to emit yet.
    fn pop_range(&self) {
    }

    // Intentionally empty: no backend call to emit yet.
    fn mark(&self, _name: &str, _color: ProfilerColor) {
    }
}
1468
/// Owns the active [`GpuProfiler`] backend and a runtime enable switch.
pub struct GpuProfilerManager {
    // The selected backend (a no-op NullProfiler when none is detected).
    profiler: Arc<dyn GpuProfiler>,
    // When false, scopes and marks become no-ops.
    enabled: std::sync::atomic::AtomicBool,
}
1476
1477impl GpuProfilerManager {
1478 pub fn new() -> Self {
1480 let nvtx = NvtxProfiler::new();
1482 if nvtx.is_available() {
1483 return Self {
1484 profiler: Arc::new(nvtx),
1485 enabled: std::sync::atomic::AtomicBool::new(true),
1486 };
1487 }
1488
1489 let renderdoc = RenderDocProfiler::new();
1490 if renderdoc.is_available() {
1491 return Self {
1492 profiler: Arc::new(renderdoc),
1493 enabled: std::sync::atomic::AtomicBool::new(true),
1494 };
1495 }
1496
1497 Self {
1499 profiler: Arc::new(NullProfiler),
1500 enabled: std::sync::atomic::AtomicBool::new(false),
1501 }
1502 }
1503
1504 pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1506 let enabled = profiler.is_available();
1507 Self {
1508 profiler,
1509 enabled: std::sync::atomic::AtomicBool::new(enabled),
1510 }
1511 }
1512
1513 pub fn is_enabled(&self) -> bool {
1515 self.enabled.load(Ordering::Relaxed)
1516 }
1517
1518 pub fn set_enabled(&self, enabled: bool) {
1520 self.enabled.store(enabled, Ordering::Relaxed);
1521 }
1522
1523 pub fn backend(&self) -> GpuProfilerBackend {
1525 self.profiler.backend()
1526 }
1527
1528 pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1530 ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1531 }
1532
1533 pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1535 ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1536 }
1537
1538 pub fn mark(&self, name: &str) {
1540 if self.is_enabled() {
1541 self.profiler.mark(name, ProfilerColor::CYAN);
1542 }
1543 }
1544
1545 pub fn profiler(&self) -> &dyn GpuProfiler {
1547 &*self.profiler
1548 }
1549}
1550
1551impl Default for GpuProfilerManager {
1552 fn default() -> Self {
1553 Self::new()
1554 }
1555}
1556
/// RAII guard that pops its profiler range when dropped.
pub struct ProfilerScope<'a> {
    // Backend the range was pushed on.
    profiler: &'a dyn GpuProfiler,
    // Whether a range was actually pushed (and so must be popped).
    enabled: bool,
}
1562
1563impl<'a> ProfilerScope<'a> {
1564 fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
1565 if enabled {
1566 profiler.push_range(name, ProfilerColor::CYAN);
1567 }
1568 Self { profiler, enabled }
1569 }
1570
1571 fn new_colored(
1572 name: &str,
1573 profiler: &'a dyn GpuProfiler,
1574 enabled: bool,
1575 color: ProfilerColor,
1576 ) -> Self {
1577 if enabled {
1578 profiler.push_range(name, color);
1579 }
1580 Self { profiler, enabled }
1581 }
1582}
1583
impl<'a> Drop for ProfilerScope<'a> {
    fn drop(&mut self) {
        // Only pop when the constructor actually pushed a range.
        if self.enabled {
            self.profiler.pop_range();
        }
    }
}
1591
/// Opens a GPU profiler scope lasting until the end of the enclosing block
/// (the guard is bound to a hidden local named `_scope`).
///
/// Two forms: `gpu_profile!(profiler, name)` uses the default color;
/// `gpu_profile!(profiler, name, color)` sets an explicit one.
#[macro_export]
macro_rules! gpu_profile {
    ($profiler:expr, $name:expr) => {
        let _scope = $profiler.scope($name);
    };
    ($profiler:expr, $name:expr, $color:expr) => {
        let _scope = $profiler.scope_colored($name, $color);
    };
}
1614
/// Category of a tracked GPU memory allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuMemoryType {
    /// Device-local (VRAM) memory.
    DeviceLocal,
    /// Host-visible memory mappable by the CPU.
    HostVisible,
    /// Host-coherent memory (no explicit flushes required).
    HostCoherent,
    /// Currently CPU-mapped memory.
    Mapped,
    /// Memory backing a message-queue buffer.
    QueueBuffer,
    /// Memory backing a kernel control block.
    ControlBlock,
    /// Memory shared across devices or processes.
    SharedMemory,
}
1637
/// A single tracked GPU memory allocation.
#[derive(Debug, Clone)]
pub struct GpuMemoryAllocation {
    /// Unique allocation id.
    pub id: u64,
    /// Human-readable allocation name.
    pub name: String,
    /// Size in bytes.
    pub size: usize,
    /// Memory category.
    pub memory_type: GpuMemoryType,
    /// Index of the owning device.
    pub device_index: u32,
    /// Owning kernel, if any.
    pub kernel_id: Option<String>,
    /// When the allocation was made.
    pub allocated_at: Instant,
    /// Whether the allocation is currently live.
    pub in_use: bool,
}
1658
/// Aggregate statistics for one GPU memory pool.
#[derive(Debug, Clone, Default)]
pub struct GpuMemoryPoolStats {
    /// Pool name.
    pub name: String,
    /// Pool capacity, in bytes.
    pub capacity: usize,
    /// Bytes currently allocated from the pool.
    pub allocated: usize,
    /// High-water mark of `allocated`, in bytes.
    pub peak_allocated: usize,
    /// Number of live allocations in the pool.
    pub allocation_count: u32,
    /// Cumulative allocations served over the pool's lifetime.
    pub total_allocations: u64,
    /// Cumulative deallocations over the pool's lifetime.
    pub total_deallocations: u64,
    /// Fragmentation estimate; scale is producer-defined — confirm with
    /// the pool implementation that fills this in.
    pub fragmentation: f32,
}
1679
1680impl GpuMemoryPoolStats {
1681 pub fn utilization(&self) -> f32 {
1683 if self.capacity == 0 {
1684 0.0
1685 } else {
1686 (self.allocated as f32 / self.capacity as f32) * 100.0
1687 }
1688 }
1689}
1690
/// Per-device memory snapshot maintained by the dashboard.
#[derive(Debug, Clone, Default)]
pub struct GpuDeviceMemoryStats {
    /// Index of the device these stats describe.
    pub device_index: u32,
    /// Human-readable device name.
    pub device_name: String,
    /// Total device memory, in bytes.
    pub total_memory: u64,
    /// Memory currently reported free, in bytes.
    pub free_memory: u64,
    /// Bytes in use by RingKernel itself.
    pub ringkernel_used: u64,
    /// Bytes in use by everything else (derived in
    /// `GpuMemoryDashboard::update_device_stats` as total - free - ringkernel).
    pub other_used: u64,
    /// Per-pool statistics for this device.
    pub pools: Vec<GpuMemoryPoolStats>,
}
1709
1710impl GpuDeviceMemoryStats {
1711 pub fn used_memory(&self) -> u64 {
1713 self.total_memory - self.free_memory
1714 }
1715
1716 pub fn utilization(&self) -> f32 {
1718 if self.total_memory == 0 {
1719 0.0
1720 } else {
1721 (self.used_memory() as f32 / self.total_memory as f32) * 100.0
1722 }
1723 }
1724}
1725
/// Central tracker for GPU memory allocations and per-device statistics.
pub struct GpuMemoryDashboard {
    /// Live allocations keyed by allocation id.
    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
    /// Per-device memory statistics keyed by device index.
    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
    /// Pressure thresholds used by `check_pressure`.
    thresholds: GpuMemoryThresholds,
    /// Monotonic source for `next_allocation_id` (starts at 1).
    allocation_counter: AtomicU64,
    /// Sum of sizes of live tracked allocations, in bytes.
    total_allocated: AtomicU64,
    /// High-water mark of `total_allocated`, in bytes.
    peak_allocated: AtomicU64,
}
1769
/// Limits and pressure thresholds for the GPU memory dashboard.
#[derive(Debug, Clone)]
pub struct GpuMemoryThresholds {
    /// Utilization percentage at which pressure becomes `Warning`.
    pub warning: f32,
    /// Utilization percentage at which pressure becomes `Critical`.
    pub critical: f32,
    /// Largest permitted single allocation, in bytes.
    /// NOTE(review): not enforced by the visible dashboard code — confirm callers.
    pub max_allocation_size: usize,
    /// Maximum number of live allocations.
    /// NOTE(review): not enforced by the visible dashboard code — confirm callers.
    pub max_allocation_count: u32,
}
1782
1783impl Default for GpuMemoryThresholds {
1784 fn default() -> Self {
1785 Self {
1786 warning: 75.0,
1787 critical: 90.0,
1788 max_allocation_size: 1024 * 1024 * 1024, max_allocation_count: 10000,
1790 }
1791 }
1792}
1793
/// Coarse memory-pressure classification produced by
/// `GpuMemoryDashboard::check_pressure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressureLevel {
    /// Utilization below 80% of the warning threshold.
    Normal,
    /// Utilization at or above 80% of the warning threshold.
    Elevated,
    /// Utilization at or above the warning threshold.
    Warning,
    /// Utilization at or above the critical threshold.
    Critical,
    /// No free memory remains on the device.
    OutOfMemory,
}
1808
1809impl GpuMemoryDashboard {
1810 pub fn new() -> Arc<Self> {
1812 Arc::new(Self {
1813 allocations: RwLock::new(HashMap::new()),
1814 device_stats: RwLock::new(HashMap::new()),
1815 thresholds: GpuMemoryThresholds::default(),
1816 allocation_counter: AtomicU64::new(1),
1817 total_allocated: AtomicU64::new(0),
1818 peak_allocated: AtomicU64::new(0),
1819 })
1820 }
1821
1822 pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
1824 Arc::new(Self {
1825 allocations: RwLock::new(HashMap::new()),
1826 device_stats: RwLock::new(HashMap::new()),
1827 thresholds,
1828 allocation_counter: AtomicU64::new(1),
1829 total_allocated: AtomicU64::new(0),
1830 peak_allocated: AtomicU64::new(0),
1831 })
1832 }
1833
1834 pub fn track_allocation(
1836 &self,
1837 id: u64,
1838 name: impl Into<String>,
1839 size: usize,
1840 memory_type: GpuMemoryType,
1841 device_index: u32,
1842 kernel_id: Option<&str>,
1843 ) {
1844 let allocation = GpuMemoryAllocation {
1845 id,
1846 name: name.into(),
1847 size,
1848 memory_type,
1849 device_index,
1850 kernel_id: kernel_id.map(String::from),
1851 allocated_at: Instant::now(),
1852 in_use: true,
1853 };
1854
1855 self.allocations.write().insert(id, allocation);
1856
1857 let new_total = self
1859 .total_allocated
1860 .fetch_add(size as u64, Ordering::Relaxed)
1861 + size as u64;
1862 let mut peak = self.peak_allocated.load(Ordering::Relaxed);
1863 while new_total > peak {
1864 match self.peak_allocated.compare_exchange_weak(
1865 peak,
1866 new_total,
1867 Ordering::Relaxed,
1868 Ordering::Relaxed,
1869 ) {
1870 Ok(_) => break,
1871 Err(current) => peak = current,
1872 }
1873 }
1874 }
1875
1876 pub fn next_allocation_id(&self) -> u64 {
1878 self.allocation_counter.fetch_add(1, Ordering::Relaxed)
1879 }
1880
1881 pub fn track_deallocation(&self, id: u64) {
1883 let mut allocations = self.allocations.write();
1884 if let Some(alloc) = allocations.remove(&id) {
1885 self.total_allocated
1886 .fetch_sub(alloc.size as u64, Ordering::Relaxed);
1887 }
1888 }
1889
1890 pub fn mark_unused(&self, id: u64) {
1892 let mut allocations = self.allocations.write();
1893 if let Some(alloc) = allocations.get_mut(&id) {
1894 alloc.in_use = false;
1895 }
1896 }
1897
1898 pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
1900 let stats = GpuDeviceMemoryStats {
1901 device_index,
1902 device_name: name.into(),
1903 total_memory,
1904 free_memory: total_memory,
1905 ringkernel_used: 0,
1906 other_used: 0,
1907 pools: Vec::new(),
1908 };
1909 self.device_stats.write().insert(device_index, stats);
1910 }
1911
1912 pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
1914 let mut stats = self.device_stats.write();
1915 if let Some(device) = stats.get_mut(&device_index) {
1916 device.free_memory = free_memory;
1917 device.ringkernel_used = ringkernel_used;
1918 device.other_used = device
1919 .total_memory
1920 .saturating_sub(free_memory + ringkernel_used);
1921 }
1922 }
1923
1924 pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
1926 self.device_stats.read().get(&device_index).cloned()
1927 }
1928
1929 pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
1931 self.device_stats.read().values().cloned().collect()
1932 }
1933
1934 pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
1936 self.allocations.read().values().cloned().collect()
1937 }
1938
1939 pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
1941 self.allocations
1942 .read()
1943 .values()
1944 .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
1945 .cloned()
1946 .collect()
1947 }
1948
1949 pub fn total_allocated(&self) -> u64 {
1951 self.total_allocated.load(Ordering::Relaxed)
1952 }
1953
1954 pub fn peak_allocated(&self) -> u64 {
1956 self.peak_allocated.load(Ordering::Relaxed)
1957 }
1958
1959 pub fn allocation_count(&self) -> usize {
1961 self.allocations.read().len()
1962 }
1963
1964 pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
1966 let stats = self.device_stats.read();
1967 if let Some(device) = stats.get(&device_index) {
1968 let utilization = device.utilization();
1969 if device.free_memory == 0 {
1970 MemoryPressureLevel::OutOfMemory
1971 } else if utilization >= self.thresholds.critical {
1972 MemoryPressureLevel::Critical
1973 } else if utilization >= self.thresholds.warning {
1974 MemoryPressureLevel::Warning
1975 } else if utilization >= self.thresholds.warning * 0.8 {
1976 MemoryPressureLevel::Elevated
1977 } else {
1978 MemoryPressureLevel::Normal
1979 }
1980 } else {
1981 MemoryPressureLevel::Normal
1982 }
1983 }
1984
1985 pub fn grafana_panel(&self) -> GrafanaPanel {
1987 GrafanaPanel {
1988 title: "GPU Memory Usage".to_string(),
1989 panel_type: PanelType::BarGauge,
1990 queries: vec![
1991 "ringkernel_gpu_memory_allocated_bytes".to_string(),
1992 "ringkernel_gpu_memory_peak_bytes".to_string(),
1993 ],
1994 grid_pos: (0, 0, 12, 8),
1995 unit: Some("bytes".to_string()),
1996 }
1997 }
1998
1999 pub fn prometheus_metrics(&self) -> String {
2001 let mut output = String::new();
2002
2003 writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").unwrap();
2005 writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge").unwrap();
2006 writeln!(
2007 output,
2008 "ringkernel_gpu_memory_allocated_bytes {}",
2009 self.total_allocated()
2010 )
2011 .unwrap();
2012
2013 writeln!(
2015 output,
2016 "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
2017 )
2018 .unwrap();
2019 writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge").unwrap();
2020 writeln!(
2021 output,
2022 "ringkernel_gpu_memory_peak_bytes {}",
2023 self.peak_allocated()
2024 )
2025 .unwrap();
2026
2027 writeln!(
2029 output,
2030 "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
2031 )
2032 .unwrap();
2033 writeln!(
2034 output,
2035 "# TYPE ringkernel_gpu_memory_allocation_count gauge"
2036 )
2037 .unwrap();
2038 writeln!(
2039 output,
2040 "ringkernel_gpu_memory_allocation_count {}",
2041 self.allocation_count()
2042 )
2043 .unwrap();
2044
2045 let device_stats = self.device_stats.read();
2047 for device in device_stats.values() {
2048 writeln!(
2049 output,
2050 "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
2051 device.device_name, device.total_memory
2052 )
2053 .unwrap();
2054 writeln!(
2055 output,
2056 "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
2057 device.device_name, device.free_memory
2058 )
2059 .unwrap();
2060 writeln!(
2061 output,
2062 "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
2063 device.device_name,
2064 device.used_memory()
2065 )
2066 .unwrap();
2067 writeln!(
2068 output,
2069 "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
2070 device.device_name,
2071 device.utilization()
2072 )
2073 .unwrap();
2074 }
2075
2076 output
2077 }
2078
2079 pub fn summary_report(&self) -> String {
2081 let mut report = String::new();
2082
2083 writeln!(report, "=== GPU Memory Dashboard ===").unwrap();
2084 writeln!(report, "Total Allocated: {} bytes", self.total_allocated()).unwrap();
2085 writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated()).unwrap();
2086 writeln!(report, "Active Allocations: {}", self.allocation_count()).unwrap();
2087 writeln!(report).unwrap();
2088
2089 let device_stats = self.device_stats.read();
2091 for device in device_stats.values() {
2092 writeln!(
2093 report,
2094 "--- Device {} ({}) ---",
2095 device.device_index, device.device_name
2096 )
2097 .unwrap();
2098 writeln!(
2099 report,
2100 " Total: {} MB",
2101 device.total_memory / (1024 * 1024)
2102 )
2103 .unwrap();
2104 writeln!(report, " Free: {} MB", device.free_memory / (1024 * 1024)).unwrap();
2105 writeln!(
2106 report,
2107 " RingKernel: {} MB",
2108 device.ringkernel_used / (1024 * 1024)
2109 )
2110 .unwrap();
2111 writeln!(report, " Utilization: {:.1}%", device.utilization()).unwrap();
2112 writeln!(
2113 report,
2114 " Pressure: {:?}",
2115 self.check_pressure(device.device_index)
2116 )
2117 .unwrap();
2118 }
2119
2120 let allocations = self.allocations.read();
2122 let mut sorted_allocs: Vec<_> = allocations.values().collect();
2123 sorted_allocs.sort_by(|a, b| b.size.cmp(&a.size));
2124
2125 if !sorted_allocs.is_empty() {
2126 writeln!(report).unwrap();
2127 writeln!(report, "--- Top 10 Allocations ---").unwrap();
2128 for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
2129 writeln!(
2130 report,
2131 " {}. {} - {} bytes ({:?})",
2132 i + 1,
2133 alloc.name,
2134 alloc.size,
2135 alloc.memory_type
2136 )
2137 .unwrap();
2138 }
2139 }
2140
2141 report
2142 }
2143}
2144
/// Non-`Arc` construction path with default thresholds; mirrors the field
/// initialization used by `with_thresholds`.
impl Default for GpuMemoryDashboard {
    fn default() -> Self {
        Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds: GpuMemoryThresholds::default(),
            // Ids start at 1, matching `new`/`with_thresholds`.
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        }
    }
}
2157
/// Unit tests covering trace/span identity, span construction, the
/// Prometheus/Grafana exporters, the GPU profiler plumbing, and the GPU
/// memory dashboard.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::KernelId;

    // --- Tracing identifiers ---

    #[test]
    fn test_trace_id_generation() {
        // Two fresh ids should (overwhelmingly likely) differ.
        let id1 = TraceId::new();
        let id2 = TraceId::new();
        assert_ne!(id1.0, id2.0);
    }

    #[test]
    fn test_trace_id_hex() {
        // Round-trip: to_hex is a 32-char lowercase hex string that parses back.
        let id = TraceId(0x123456789abcdef0123456789abcdef0);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 32);
        let parsed = TraceId::from_hex(&hex).unwrap();
        assert_eq!(id, parsed);
    }

    // --- Spans ---

    #[test]
    fn test_span_creation() {
        let span = Span::new("test_operation", SpanKind::Internal);
        assert!(!span.is_ended());
        assert_eq!(span.name, "test_operation");
    }

    #[test]
    fn test_span_child() {
        // A child shares the parent's trace id and records its span id.
        let parent = Span::new("parent", SpanKind::Server);
        let child = parent.child("child", SpanKind::Internal);

        assert_eq!(child.trace_id, parent.trace_id);
        assert_eq!(child.parent_span_id, Some(parent.span_id));
    }

    #[test]
    fn test_span_attributes() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.set_attribute("string_key", "value");
        span.set_attribute("int_key", 42i64);
        span.set_attribute("bool_key", true);

        assert_eq!(span.attributes.len(), 3);
    }

    #[test]
    fn test_span_events() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.add_event("event1");
        span.add_event("event2");

        assert_eq!(span.events.len(), 2);
    }

    #[test]
    fn test_span_builder() {
        let parent = Span::new("parent", SpanKind::Server);
        let span = SpanBuilder::new("child")
            .kind(SpanKind::Client)
            .parent(&parent)
            .attribute("key", "value")
            .build();

        assert_eq!(span.trace_id, parent.trace_id);
        assert_eq!(span.kind, SpanKind::Client);
        assert!(span.attributes.contains_key("key"));
    }

    // --- Exporters ---

    #[test]
    fn test_prometheus_exporter() {
        let exporter = PrometheusExporter::new();
        exporter.register_counter("test_counter", "A test counter", &["label1"]);
        exporter.register_gauge("test_gauge", "A test gauge", &[]);

        exporter.inc_counter("test_counter", &["value1"]);
        exporter.inc_counter("test_counter", &["value1"]);
        exporter.set_metric("test_gauge", 42.0, &[]);

        let output = exporter.render();
        assert!(output.contains("test_counter"));
        assert!(output.contains("test_gauge"));
    }

    #[test]
    fn test_grafana_dashboard() {
        let dashboard = GrafanaDashboard::new("Test Dashboard")
            .description("A test dashboard")
            .add_throughput_panel()
            .add_latency_panel()
            .build();

        assert!(dashboard.contains("Test Dashboard"));
        assert!(dashboard.contains("Message Throughput"));
        assert!(dashboard.contains("Message Latency"));
    }

    #[test]
    fn test_observability_context() {
        let ctx = ObservabilityContext::new();

        let span = ctx.start_span("test", SpanKind::Internal);
        assert_eq!(ctx.active_span_count(), 1);

        ctx.end_span(span);
        assert_eq!(ctx.active_span_count(), 0);

        // Ended spans move to the export buffer.
        let exported = ctx.export_spans();
        assert_eq!(exported.len(), 1);
    }

    #[test]
    fn test_ringkernel_collector() {
        let collector = Arc::new(MetricsCollector::new());
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);

        let prom_collector = RingKernelCollector::new(collector);
        let defs = prom_collector.definitions();
        let samples = prom_collector.collect();

        assert!(!defs.is_empty());
        assert!(!samples.is_empty());
    }

    // --- GPU profiler ---

    #[test]
    fn test_profiler_color() {
        // Alpha defaults to fully opaque.
        let color = ProfilerColor::new(128, 64, 32);
        assert_eq!(color.r, 128);
        assert_eq!(color.g, 64);
        assert_eq!(color.b, 32);
        assert_eq!(color.a, 255);

        assert_eq!(ProfilerColor::RED.r, 255);
        assert_eq!(ProfilerColor::GREEN.g, 255);
        assert_eq!(ProfilerColor::BLUE.b, 255);
    }

    #[test]
    fn test_null_profiler() {
        // The no-op backend accepts every call without error.
        let profiler = NullProfiler;
        assert!(!profiler.is_available());
        assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);

        assert!(profiler.start_capture().is_ok());
        assert!(profiler.end_capture().is_ok());
        assert!(profiler.trigger_capture().is_ok());

        let range = profiler.push_range("test", ProfilerColor::RED);
        let _elapsed = range.elapsed();
        profiler.pop_range();
        profiler.mark("marker", ProfilerColor::BLUE);
        profiler.set_thread_name("thread");
    }

    #[test]
    fn test_nvtx_profiler_stub() {
        let profiler = NvtxProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);

        // Without the NVTX library loaded, capture must be refused.
        assert!(!profiler.is_available());
        assert!(!profiler.is_nvtx_loaded());

        assert!(matches!(
            profiler.start_capture(),
            Err(ProfilerError::NotAvailable)
        ));
    }

    #[test]
    fn test_renderdoc_profiler_stub() {
        let profiler = RenderDocProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);

        assert!(!profiler.is_available());
        assert!(!profiler.is_attached());
        assert!(profiler.get_capture_path().is_none());

        assert!(matches!(
            profiler.launch_ui(),
            Err(ProfilerError::NotAttached)
        ));
    }

    #[test]
    fn test_gpu_profiler_manager() {
        let manager = GpuProfilerManager::new();

        // Disabled by default; toggling is observable.
        assert!(!manager.is_enabled());
        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);

        manager.set_enabled(true);
        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
    }

    #[test]
    fn test_profiler_scope() {
        let manager = GpuProfilerManager::new();

        // Scopes must be safe to create and drop even while disabled.
        {
            let _scope = manager.scope("test_scope");
        }

        {
            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
        }

        manager.mark("test_marker");
    }

    #[test]
    fn test_profiler_with_custom() {
        let custom_profiler = Arc::new(NullProfiler);
        let manager = GpuProfilerManager::with_profiler(custom_profiler);

        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
    }

    #[test]
    fn test_profiler_range_elapsed() {
        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = range.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    #[test]
    fn test_profiler_error_display() {
        let err = ProfilerError::NotAvailable;
        assert!(err.to_string().contains("not available"));

        let err = ProfilerError::NotAttached;
        assert!(err.to_string().contains("not attached"));

        let err = ProfilerError::CaptureInProgress;
        assert!(err.to_string().contains("in progress"));

        let err = ProfilerError::Backend("test error".to_string());
        assert!(err.to_string().contains("test error"));
    }

    // --- GPU memory dashboard ---

    #[test]
    fn test_gpu_memory_dashboard_creation() {
        let dashboard = GpuMemoryDashboard::new();
        assert_eq!(dashboard.total_allocated(), 0);
        assert_eq!(dashboard.peak_allocated(), 0);
        assert_eq!(dashboard.allocation_count(), 0);
    }

    #[test]
    fn test_gpu_memory_allocation_tracking() {
        let dashboard = GpuMemoryDashboard::new();

        dashboard.track_allocation(
            1,
            "test_buffer",
            65536,
            GpuMemoryType::DeviceLocal,
            0,
            Some("test_kernel"),
        );

        assert_eq!(dashboard.total_allocated(), 65536);
        assert_eq!(dashboard.peak_allocated(), 65536);
        assert_eq!(dashboard.allocation_count(), 1);

        dashboard.track_allocation(
            2,
            "queue_buffer",
            1024,
            GpuMemoryType::QueueBuffer,
            0,
            Some("test_kernel"),
        );

        assert_eq!(dashboard.total_allocated(), 66560);
        assert_eq!(dashboard.peak_allocated(), 66560);
        assert_eq!(dashboard.allocation_count(), 2);

        dashboard.track_deallocation(1);
        assert_eq!(dashboard.total_allocated(), 1024);
        // The peak (high-water mark) is retained after deallocation.
        assert_eq!(dashboard.peak_allocated(), 66560);
        assert_eq!(dashboard.allocation_count(), 1);
    }

    #[test]
    fn test_gpu_memory_device_stats() {
        let dashboard = GpuMemoryDashboard::new();

        dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024);
        let stats = dashboard.get_device_stats(0).unwrap();
        assert_eq!(stats.device_index, 0);
        assert_eq!(stats.device_name, "NVIDIA RTX 4090");
        assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
        assert_eq!(stats.utilization(), 0.0);

        // 8 GiB used of 24 GiB total => ~33% utilization.
        let used = 8 * 1024 * 1024 * 1024;
        let free = 16 * 1024 * 1024 * 1024;
        dashboard.update_device_stats(0, free, used);

        let stats = dashboard.get_device_stats(0).unwrap();
        assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
    }

    #[test]
    fn test_gpu_memory_pressure_levels() {
        let dashboard = GpuMemoryDashboard::new();

        dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);

        // 50% used -> Normal.
        dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);

        // ~80% used -> Warning (>= 75% threshold).
        dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);

        // ~95% used -> Critical (>= 90% threshold).
        dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
        assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);

        // Zero free memory -> OutOfMemory.
        dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
        assert_eq!(
            dashboard.check_pressure(0),
            MemoryPressureLevel::OutOfMemory
        );
    }

    #[test]
    fn test_gpu_memory_kernel_allocations() {
        let dashboard = GpuMemoryDashboard::new();

        dashboard.track_allocation(
            1,
            "buf1",
            1000,
            GpuMemoryType::DeviceLocal,
            0,
            Some("kernel_a"),
        );
        dashboard.track_allocation(
            2,
            "buf2",
            2000,
            GpuMemoryType::DeviceLocal,
            0,
            Some("kernel_a"),
        );
        dashboard.track_allocation(
            3,
            "buf3",
            3000,
            GpuMemoryType::DeviceLocal,
            0,
            Some("kernel_b"),
        );

        let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
        assert_eq!(kernel_a_allocs.len(), 2);

        let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
        assert_eq!(kernel_b_allocs.len(), 1);

        let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
        assert_eq!(kernel_c_allocs.len(), 0);
    }

    #[test]
    fn test_gpu_memory_prometheus_metrics() {
        let dashboard = GpuMemoryDashboard::new();
        dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);

        let metrics = dashboard.prometheus_metrics();
        assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
        assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
        assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
    }

    #[test]
    fn test_gpu_memory_summary_report() {
        let dashboard = GpuMemoryDashboard::new();
        dashboard.track_allocation(
            1,
            "large_buffer",
            1024 * 1024,
            GpuMemoryType::DeviceLocal,
            0,
            None,
        );
        dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);

        let report = dashboard.summary_report();
        assert!(report.contains("GPU Memory Dashboard"));
        assert!(report.contains("large_buffer"));
    }

    #[test]
    fn test_gpu_memory_pool_stats() {
        let pool_stats = GpuMemoryPoolStats {
            name: "default".to_string(),
            capacity: 1024 * 1024,
            allocated: 512 * 1024,
            peak_allocated: 768 * 1024,
            allocation_count: 10,
            total_allocations: 100,
            total_deallocations: 90,
            fragmentation: 0.1,
        };

        // 512 KiB of 1 MiB -> ~50%.
        assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
    }

    #[test]
    fn test_gpu_memory_types() {
        // All variants are pairwise distinct.
        let types = [
            GpuMemoryType::DeviceLocal,
            GpuMemoryType::HostVisible,
            GpuMemoryType::HostCoherent,
            GpuMemoryType::Mapped,
            GpuMemoryType::QueueBuffer,
            GpuMemoryType::ControlBlock,
            GpuMemoryType::SharedMemory,
        ];

        for (i, t1) in types.iter().enumerate() {
            for (j, t2) in types.iter().enumerate() {
                if i != j {
                    assert_ne!(t1, t2);
                }
            }
        }
    }

    #[test]
    fn test_gpu_memory_grafana_panel() {
        let dashboard = GpuMemoryDashboard::new();
        let panel = dashboard.grafana_panel();

        assert_eq!(panel.title, "GPU Memory Usage");
        assert_eq!(panel.panel_type, PanelType::BarGauge);
        assert!(!panel.queries.is_empty());
    }

    #[test]
    fn test_gpu_memory_allocation_id_generation() {
        // Ids are sequential and start at 1.
        let dashboard = GpuMemoryDashboard::new();

        let id1 = dashboard.next_allocation_id();
        let id2 = dashboard.next_allocation_id();
        let id3 = dashboard.next_allocation_id();

        assert_eq!(id1, 1);
        assert_eq!(id2, 2);
        assert_eq!(id3, 3);
    }
}