1use parking_lot::RwLock;
31use std::collections::HashMap;
32use std::fmt::Write;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant, SystemTime};
36
37use crate::telemetry_pipeline::MetricsCollector;
38
/// 128-bit trace identifier, rendered as a 32-character hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TraceId(pub u128);

impl TraceId {
    /// Generates a pseudo-random id by hashing the current wall-clock time
    /// and the calling thread's id.
    ///
    /// NOTE(review): this is hash-derived, not cryptographically random;
    /// calls within the clock's resolution on the same thread can collide.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        let mut state = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut state);
        std::thread::current().id().hash(&mut state);
        // Derive two 64-bit words from one hasher: the first `finish` gives
        // the high word, which is fed back in so the second `finish` yields a
        // different low word.
        let high_word = state.finish() as u128;
        state.write_u64(high_word as u64);
        let low_word = state.finish() as u128;
        Self((high_word << 64) | low_word)
    }

    /// Parses a hexadecimal string (any length `u128` accepts) into an id.
    pub fn from_hex(hex: &str) -> Option<Self> {
        match u128::from_str_radix(hex, 16) {
            Ok(raw) => Some(Self(raw)),
            Err(_) => None,
        }
    }

    /// Renders the id as a zero-padded 32-character lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:032x}", self.0)
    }
}

impl Default for TraceId {
    fn default() -> Self {
        Self::new()
    }
}
76
/// 64-bit span identifier, rendered as a 16-character hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SpanId(pub u64);

impl SpanId {
    /// Generates a pseudo-random id by hashing the current wall-clock time
    /// and the process id.
    ///
    /// NOTE(review): hash-derived, not cryptographically random; calls within
    /// the clock's resolution can collide.
    pub fn new() -> Self {
        use std::hash::{Hash, Hasher};
        let mut state = std::collections::hash_map::DefaultHasher::new();
        SystemTime::now().hash(&mut state);
        std::process::id().hash(&mut state);
        Self(state.finish())
    }

    /// Parses a hexadecimal string (any length `u64` accepts) into an id.
    pub fn from_hex(hex: &str) -> Option<Self> {
        match u64::from_str_radix(hex, 16) {
            Ok(raw) => Some(Self(raw)),
            Err(_) => None,
        }
    }

    /// Renders the id as a zero-padded 16-character lowercase hex string.
    pub fn to_hex(&self) -> String {
        format!("{:016x}", self.0)
    }
}

impl Default for SpanId {
    fn default() -> Self {
        Self::new()
    }
}
107
/// Role a span plays in a trace; the variants match the OpenTelemetry
/// span-kind taxonomy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Operation internal to the application (the builder default).
    Internal,
    /// Handling of an incoming remote request.
    Server,
    /// Issuing of an outgoing remote request.
    Client,
    /// Publication of a message for later consumption.
    Producer,
    /// Processing of a previously produced message.
    Consumer,
}
122
/// Outcome of a span, set via `Span::set_ok` / `Span::set_error`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// No status recorded yet (the initial state).
    Unset,
    /// Operation completed successfully.
    Ok,
    /// Operation failed.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}
136
/// A single timed operation within a trace.
#[derive(Debug, Clone)]
pub struct Span {
    /// Trace this span belongs to.
    pub trace_id: TraceId,
    /// Unique id of this span.
    pub span_id: SpanId,
    /// Parent span id, when created via `child` or `SpanBuilder::parent`.
    pub parent_span_id: Option<SpanId>,
    /// Operation name.
    pub name: String,
    /// Role of the span within the trace.
    pub kind: SpanKind,
    /// Monotonic timestamp taken when the span was created.
    pub start_time: Instant,
    /// Monotonic end timestamp; `None` while the span is still open.
    pub end_time: Option<Instant>,
    /// Current status (`Unset` until `set_ok`/`set_error` is called).
    pub status: SpanStatus,
    /// Key/value attributes attached to the span.
    pub attributes: HashMap<String, AttributeValue>,
    /// Timestamped events recorded during the span.
    pub events: Vec<SpanEvent>,
}
161
/// Typed value that can be attached to a span attribute.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    String(String),
    Int(i64),
    Float(f64),
    Bool(bool),
    StringArray(Vec<String>),
}

impl From<&str> for AttributeValue {
    fn from(value: &str) -> Self {
        AttributeValue::String(value.to_owned())
    }
}

impl From<String> for AttributeValue {
    fn from(value: String) -> Self {
        AttributeValue::String(value)
    }
}

impl From<i64> for AttributeValue {
    fn from(value: i64) -> Self {
        AttributeValue::Int(value)
    }
}

impl From<f64> for AttributeValue {
    fn from(value: f64) -> Self {
        AttributeValue::Float(value)
    }
}

impl From<bool> for AttributeValue {
    fn from(value: bool) -> Self {
        AttributeValue::Bool(value)
    }
}
206
/// A point-in-time event recorded within a span.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Event name.
    pub name: String,
    /// Monotonic timestamp at which the event was recorded.
    pub timestamp: Instant,
    /// Optional key/value attributes for the event.
    pub attributes: HashMap<String, AttributeValue>,
}
217
218impl Span {
219 pub fn new(name: impl Into<String>, kind: SpanKind) -> Self {
221 Self {
222 trace_id: TraceId::new(),
223 span_id: SpanId::new(),
224 parent_span_id: None,
225 name: name.into(),
226 kind,
227 start_time: Instant::now(),
228 end_time: None,
229 status: SpanStatus::Unset,
230 attributes: HashMap::new(),
231 events: Vec::new(),
232 }
233 }
234
235 pub fn child(&self, name: impl Into<String>, kind: SpanKind) -> Self {
237 Self {
238 trace_id: self.trace_id,
239 span_id: SpanId::new(),
240 parent_span_id: Some(self.span_id),
241 name: name.into(),
242 kind,
243 start_time: Instant::now(),
244 end_time: None,
245 status: SpanStatus::Unset,
246 attributes: HashMap::new(),
247 events: Vec::new(),
248 }
249 }
250
251 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<AttributeValue>) {
253 self.attributes.insert(key.into(), value.into());
254 }
255
256 pub fn add_event(&mut self, name: impl Into<String>) {
258 self.events.push(SpanEvent {
259 name: name.into(),
260 timestamp: Instant::now(),
261 attributes: HashMap::new(),
262 });
263 }
264
265 pub fn add_event_with_attributes(
267 &mut self,
268 name: impl Into<String>,
269 attributes: HashMap<String, AttributeValue>,
270 ) {
271 self.events.push(SpanEvent {
272 name: name.into(),
273 timestamp: Instant::now(),
274 attributes,
275 });
276 }
277
278 pub fn set_ok(&mut self) {
280 self.status = SpanStatus::Ok;
281 }
282
283 pub fn set_error(&mut self, message: impl Into<String>) {
285 self.status = SpanStatus::Error {
286 message: message.into(),
287 };
288 }
289
290 pub fn end(&mut self) {
292 self.end_time = Some(Instant::now());
293 }
294
295 pub fn duration(&self) -> Duration {
297 self.end_time
298 .unwrap_or_else(Instant::now)
299 .duration_since(self.start_time)
300 }
301
302 pub fn is_ended(&self) -> bool {
304 self.end_time.is_some()
305 }
306}
307
308pub struct SpanBuilder {
314 name: String,
315 kind: SpanKind,
316 parent: Option<(TraceId, SpanId)>,
317 attributes: HashMap<String, AttributeValue>,
318}
319
320impl SpanBuilder {
321 pub fn new(name: impl Into<String>) -> Self {
323 Self {
324 name: name.into(),
325 kind: SpanKind::Internal,
326 parent: None,
327 attributes: HashMap::new(),
328 }
329 }
330
331 pub fn kind(mut self, kind: SpanKind) -> Self {
333 self.kind = kind;
334 self
335 }
336
337 pub fn parent(mut self, parent: &Span) -> Self {
339 self.parent = Some((parent.trace_id, parent.span_id));
340 self
341 }
342
343 pub fn attribute(mut self, key: impl Into<String>, value: impl Into<AttributeValue>) -> Self {
345 self.attributes.insert(key.into(), value.into());
346 self
347 }
348
349 pub fn build(self) -> Span {
351 let mut span = Span::new(self.name, self.kind);
352 if let Some((trace_id, parent_id)) = self.parent {
353 span.trace_id = trace_id;
354 span.parent_span_id = Some(parent_id);
355 }
356 span.attributes = self.attributes;
357 span
358 }
359}
360
/// Prometheus metric type, used when emitting the `# TYPE` line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Monotonically increasing value.
    Counter,
    /// Value that can go up and down.
    Gauge,
    /// Bucketed distribution of observations.
    Histogram,
    /// Quantile summary of observations.
    Summary,
}
377
/// Static description of a metric: name, type, help text and label names.
#[derive(Debug, Clone)]
pub struct MetricDefinition {
    /// Metric name as exposed to Prometheus.
    pub name: String,
    /// Prometheus metric type.
    pub metric_type: MetricType,
    /// Help text emitted on the `# HELP` line.
    pub help: String,
    /// Ordered label names; samples supply values in the same order.
    pub labels: Vec<String>,
}

/// One observed value for a metric, with its positional label values.
#[derive(Debug, Clone)]
pub struct MetricSample {
    /// Name of the metric this sample belongs to.
    pub name: String,
    /// Label values, positionally matching the definition's label names.
    pub label_values: Vec<String>,
    /// Sample value.
    pub value: f64,
    /// Optional explicit timestamp in milliseconds; not emitted by `render`.
    pub timestamp_ms: Option<u64>,
}
403
/// Renders registered collectors and push-style custom metrics in the
/// Prometheus text exposition format (see `render`).
pub struct PrometheusExporter {
    /// Definitions gathered from registered collectors.
    /// NOTE(review): bookkeeping only — `render` re-queries each collector's
    /// definitions instead of reading this field.
    definitions: RwLock<Vec<MetricDefinition>>,
    /// Pull-style collectors polled on every `render`.
    collectors: RwLock<Vec<Arc<dyn PrometheusCollector>>>,
    /// Push-style metrics keyed by metric name.
    custom_metrics: RwLock<HashMap<String, CustomMetric>>,
    /// Number of times `render` has been called.
    export_count: AtomicU64,
}

/// A push-style metric: its definition plus its current samples.
struct CustomMetric {
    definition: MetricDefinition,
    samples: Vec<MetricSample>,
}

/// Source of metrics polled by the exporter on each render.
pub trait PrometheusCollector: Send + Sync {
    /// Definitions for every metric this collector can produce.
    fn definitions(&self) -> Vec<MetricDefinition>;

    /// Current samples; samples whose names match no definition are skipped.
    fn collect(&self) -> Vec<MetricSample>;
}
430
431impl PrometheusExporter {
432 pub fn new() -> Arc<Self> {
434 Arc::new(Self {
435 definitions: RwLock::new(Vec::new()),
436 collectors: RwLock::new(Vec::new()),
437 custom_metrics: RwLock::new(HashMap::new()),
438 export_count: AtomicU64::new(0),
439 })
440 }
441
442 pub fn register_collector(&self, collector: Arc<dyn PrometheusCollector>) {
444 let defs = collector.definitions();
445 self.definitions.write().extend(defs);
446 self.collectors.write().push(collector);
447 }
448
449 pub fn register_counter(&self, name: &str, help: &str, labels: &[&str]) {
451 let def = MetricDefinition {
452 name: name.to_string(),
453 metric_type: MetricType::Counter,
454 help: help.to_string(),
455 labels: labels.iter().map(|s| s.to_string()).collect(),
456 };
457 self.custom_metrics.write().insert(
458 name.to_string(),
459 CustomMetric {
460 definition: def,
461 samples: Vec::new(),
462 },
463 );
464 }
465
466 pub fn register_gauge(&self, name: &str, help: &str, labels: &[&str]) {
468 let def = MetricDefinition {
469 name: name.to_string(),
470 metric_type: MetricType::Gauge,
471 help: help.to_string(),
472 labels: labels.iter().map(|s| s.to_string()).collect(),
473 };
474 self.custom_metrics.write().insert(
475 name.to_string(),
476 CustomMetric {
477 definition: def,
478 samples: Vec::new(),
479 },
480 );
481 }
482
483 pub fn register_histogram(&self, name: &str, help: &str, labels: &[&str]) {
485 let def = MetricDefinition {
486 name: name.to_string(),
487 metric_type: MetricType::Histogram,
488 help: help.to_string(),
489 labels: labels.iter().map(|s| s.to_string()).collect(),
490 };
491 self.custom_metrics.write().insert(
492 name.to_string(),
493 CustomMetric {
494 definition: def,
495 samples: Vec::new(),
496 },
497 );
498 }
499
500 pub fn set_metric(&self, name: &str, value: f64, label_values: &[&str]) {
502 let mut metrics = self.custom_metrics.write();
503 if let Some(metric) = metrics.get_mut(name) {
504 let sample = MetricSample {
505 name: name.to_string(),
506 label_values: label_values.iter().map(|s| s.to_string()).collect(),
507 value,
508 timestamp_ms: None,
509 };
510 let existing = metric
512 .samples
513 .iter_mut()
514 .find(|s| s.label_values == sample.label_values);
515 if let Some(existing) = existing {
516 existing.value = value;
517 } else {
518 metric.samples.push(sample);
519 }
520 }
521 }
522
523 pub fn inc_counter(&self, name: &str, label_values: &[&str]) {
525 self.add_counter(name, 1.0, label_values);
526 }
527
528 pub fn add_counter(&self, name: &str, delta: f64, label_values: &[&str]) {
530 let mut metrics = self.custom_metrics.write();
531 if let Some(metric) = metrics.get_mut(name) {
532 let label_vec: Vec<String> = label_values.iter().map(|s| s.to_string()).collect();
533 let existing = metric
534 .samples
535 .iter_mut()
536 .find(|s| s.label_values == label_vec);
537 if let Some(existing) = existing {
538 existing.value += delta;
539 } else {
540 metric.samples.push(MetricSample {
541 name: name.to_string(),
542 label_values: label_vec,
543 value: delta,
544 timestamp_ms: None,
545 });
546 }
547 }
548 }
549
550 pub fn render(&self) -> String {
552 self.export_count.fetch_add(1, Ordering::Relaxed);
553
554 let mut output = String::new();
555
556 let collectors = self.collectors.read();
558 for collector in collectors.iter() {
559 let defs = collector.definitions();
560 let samples = collector.collect();
561
562 for def in &defs {
563 writeln!(output, "# HELP {} {}", def.name, def.help)
565 .expect("write to String is infallible");
566 writeln!(
567 output,
568 "# TYPE {} {}",
569 def.name,
570 match def.metric_type {
571 MetricType::Counter => "counter",
572 MetricType::Gauge => "gauge",
573 MetricType::Histogram => "histogram",
574 MetricType::Summary => "summary",
575 }
576 )
577 .expect("write to String is infallible");
578
579 for sample in samples.iter().filter(|s| s.name == def.name) {
581 Self::write_sample(&mut output, &def.labels, sample);
582 }
583 }
584 }
585
586 let custom = self.custom_metrics.read();
588 for metric in custom.values() {
589 writeln!(
590 output,
591 "# HELP {} {}",
592 metric.definition.name, metric.definition.help
593 )
594 .expect("write to String is infallible");
595 writeln!(
596 output,
597 "# TYPE {} {}",
598 metric.definition.name,
599 match metric.definition.metric_type {
600 MetricType::Counter => "counter",
601 MetricType::Gauge => "gauge",
602 MetricType::Histogram => "histogram",
603 MetricType::Summary => "summary",
604 }
605 )
606 .expect("write to String is infallible");
607
608 for sample in &metric.samples {
609 Self::write_sample(&mut output, &metric.definition.labels, sample);
610 }
611 }
612
613 output
614 }
615
616 fn write_sample(output: &mut String, labels: &[String], sample: &MetricSample) {
617 if labels.is_empty() || sample.label_values.is_empty() {
618 writeln!(output, "{} {}", sample.name, sample.value)
619 .expect("write to String is infallible");
620 } else {
621 let label_pairs: Vec<String> = labels
622 .iter()
623 .zip(sample.label_values.iter())
624 .map(|(k, v)| format!("{}=\"{}\"", k, v))
625 .collect();
626 writeln!(
627 output,
628 "{}{{{}}} {}",
629 sample.name,
630 label_pairs.join(","),
631 sample.value
632 )
633 .expect("write to String is infallible");
634 }
635 }
636
637 pub fn export_count(&self) -> u64 {
639 self.export_count.load(Ordering::Relaxed)
640 }
641}
642
impl Default for PrometheusExporter {
    /// Empty exporter state. Unlike `new`, this yields a bare `Self` rather
    /// than an `Arc<Self>`.
    fn default() -> Self {
        Self {
            definitions: RwLock::new(Vec::new()),
            collectors: RwLock::new(Vec::new()),
            custom_metrics: RwLock::new(HashMap::new()),
            export_count: AtomicU64::new(0),
        }
    }
}
653
/// Bridges the crate's `MetricsCollector` into the Prometheus exporter.
pub struct RingKernelCollector {
    /// Source of aggregate kernel metrics.
    collector: Arc<MetricsCollector>,
}

impl RingKernelCollector {
    /// Wraps a metrics collector for registration with `PrometheusExporter`.
    pub fn new(collector: Arc<MetricsCollector>) -> Arc<Self> {
        Arc::new(Self { collector })
    }
}
670
671impl PrometheusCollector for RingKernelCollector {
672 fn definitions(&self) -> Vec<MetricDefinition> {
673 vec![
674 MetricDefinition {
675 name: "ringkernel_messages_processed_total".to_string(),
676 metric_type: MetricType::Counter,
677 help: "Total number of messages processed by kernels".to_string(),
678 labels: vec!["kernel_id".to_string()],
679 },
680 MetricDefinition {
681 name: "ringkernel_messages_dropped_total".to_string(),
682 metric_type: MetricType::Counter,
683 help: "Total number of messages dropped by kernels".to_string(),
684 labels: vec!["kernel_id".to_string()],
685 },
686 MetricDefinition {
687 name: "ringkernel_latency_us".to_string(),
688 metric_type: MetricType::Gauge,
689 help: "Current average message latency in microseconds".to_string(),
690 labels: vec!["kernel_id".to_string(), "stat".to_string()],
691 },
692 MetricDefinition {
693 name: "ringkernel_throughput".to_string(),
694 metric_type: MetricType::Gauge,
695 help: "Current message throughput per second".to_string(),
696 labels: vec!["kernel_id".to_string()],
697 },
698 ]
699 }
700
701 fn collect(&self) -> Vec<MetricSample> {
702 let aggregate = self.collector.get_aggregate();
703 let elapsed = self.collector.elapsed().as_secs_f64().max(1.0);
704
705 vec![
706 MetricSample {
707 name: "ringkernel_messages_processed_total".to_string(),
708 label_values: vec!["aggregate".to_string()],
709 value: aggregate.messages_processed as f64,
710 timestamp_ms: None,
711 },
712 MetricSample {
713 name: "ringkernel_messages_dropped_total".to_string(),
714 label_values: vec!["aggregate".to_string()],
715 value: aggregate.messages_dropped as f64,
716 timestamp_ms: None,
717 },
718 MetricSample {
719 name: "ringkernel_latency_us".to_string(),
720 label_values: vec!["aggregate".to_string(), "avg".to_string()],
721 value: aggregate.avg_latency_us(),
722 timestamp_ms: None,
723 },
724 MetricSample {
725 name: "ringkernel_latency_us".to_string(),
726 label_values: vec!["aggregate".to_string(), "min".to_string()],
727 value: aggregate.min_latency_us as f64,
728 timestamp_ms: None,
729 },
730 MetricSample {
731 name: "ringkernel_latency_us".to_string(),
732 label_values: vec!["aggregate".to_string(), "max".to_string()],
733 value: aggregate.max_latency_us as f64,
734 timestamp_ms: None,
735 },
736 MetricSample {
737 name: "ringkernel_throughput".to_string(),
738 label_values: vec!["aggregate".to_string()],
739 value: aggregate.messages_processed as f64 / elapsed,
740 timestamp_ms: None,
741 },
742 ]
743 }
744}
745
/// Grafana panel visualization type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanelType {
    /// Time-series graph (rendered as Grafana "timeseries").
    Graph,
    /// Single-stat panel.
    Stat,
    /// Table panel.
    Table,
    /// Heatmap panel.
    Heatmap,
    /// Bar-gauge panel.
    BarGauge,
}

/// One panel of a generated dashboard.
#[derive(Debug, Clone)]
pub struct GrafanaPanel {
    /// Panel title.
    pub title: String,
    /// Visualization type.
    pub panel_type: PanelType,
    /// PromQL queries, one per target (refIds A, B, ...).
    pub queries: Vec<String>,
    /// Layout as (x, y, width, height) in grid units.
    pub grid_pos: (u32, u32, u32, u32),
    /// Optional display unit (Grafana unit id, e.g. "bytes").
    pub unit: Option<String>,
}

/// Builder that renders a Grafana dashboard JSON document.
pub struct GrafanaDashboard {
    title: String,
    description: String,
    panels: Vec<GrafanaPanel>,
    refresh: String,
    time_from: String,
    tags: Vec<String>,
}

impl GrafanaDashboard {
    /// Starts an empty dashboard with a 5s refresh, a 1-hour window and the
    /// default "ringkernel" tag.
    pub fn new(title: impl Into<String>) -> Self {
        Self {
            title: title.into(),
            description: String::new(),
            panels: Vec::new(),
            refresh: "5s".to_string(),
            time_from: "now-1h".to_string(),
            tags: vec!["ringkernel".to_string()],
        }
    }

    /// Sets the dashboard description.
    pub fn description(mut self, desc: impl Into<String>) -> Self {
        self.description = desc.into();
        self
    }

    /// Sets the auto-refresh interval (e.g. "5s", "1m").
    pub fn refresh(mut self, interval: impl Into<String>) -> Self {
        self.refresh = interval.into();
        self
    }

    /// Sets the start of the default time window (e.g. "now-6h").
    pub fn time_from(mut self, from: impl Into<String>) -> Self {
        self.time_from = from.into();
        self
    }

    /// Adds a dashboard tag.
    pub fn tag(mut self, tag: impl Into<String>) -> Self {
        self.tags.push(tag.into());
        self
    }

    /// Adds an arbitrary panel.
    pub fn panel(mut self, panel: GrafanaPanel) -> Self {
        self.panels.push(panel);
        self
    }

    /// Adds a message-throughput graph panel.
    pub fn add_throughput_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Throughput".to_string(),
            panel_type: PanelType::Graph,
            queries: vec!["rate(ringkernel_messages_processed_total[1m])".to_string()],
            grid_pos: (0, 0, 12, 8),
            unit: Some("msg/s".to_string()),
        });
        self
    }

    /// Adds a latency graph panel (avg and max series).
    pub fn add_latency_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Latency".to_string(),
            panel_type: PanelType::Graph,
            queries: vec![
                "ringkernel_latency_us{stat=\"avg\"}".to_string(),
                "ringkernel_latency_us{stat=\"max\"}".to_string(),
            ],
            grid_pos: (12, 0, 12, 8),
            unit: Some("µs".to_string()),
        });
        self
    }

    /// Adds a single-stat panel counting active kernels.
    pub fn add_kernel_status_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Active Kernels".to_string(),
            panel_type: PanelType::Stat,
            queries: vec!["count(ringkernel_messages_processed_total)".to_string()],
            grid_pos: (0, 8, 6, 4),
            unit: None,
        });
        self
    }

    /// Adds a drop-rate graph panel (dropped / processed).
    pub fn add_drop_rate_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "Message Drop Rate".to_string(),
            panel_type: PanelType::Graph,
            queries: vec![
                "rate(ringkernel_messages_dropped_total[1m]) / rate(ringkernel_messages_processed_total[1m])".to_string(),
            ],
            grid_pos: (6, 8, 6, 4),
            unit: Some("percentunit".to_string()),
        });
        self
    }

    /// Adds a GPU memory bar-gauge panel.
    pub fn add_multi_gpu_panel(mut self) -> Self {
        self.panels.push(GrafanaPanel {
            title: "GPU Memory Usage".to_string(),
            panel_type: PanelType::BarGauge,
            queries: vec!["ringkernel_gpu_memory_used_bytes".to_string()],
            grid_pos: (12, 8, 12, 4),
            unit: Some("bytes".to_string()),
        });
        self
    }

    /// Adds throughput, latency, kernel-status, drop-rate and GPU-memory
    /// panels in one call.
    pub fn add_standard_panels(self) -> Self {
        self.add_throughput_panel()
            .add_latency_panel()
            .add_kernel_status_panel()
            .add_drop_rate_panel()
            .add_multi_gpu_panel()
    }

    /// Escapes `text` for embedding inside a JSON string literal (RFC 8259:
    /// `"` and `\` must be escaped; control characters use \uXXXX forms).
    fn escape_json(text: &str) -> String {
        let mut out = String::with_capacity(text.len());
        for ch in text.chars() {
            match ch {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                c if (c as u32) < 0x20 => {
                    // Remaining control characters -> \u00XX escapes.
                    out.push_str(&format!("\\u{:04x}", c as u32));
                }
                c => out.push(c),
            }
        }
        out
    }

    /// Renders the dashboard as Grafana-importable JSON.
    ///
    /// All interpolated strings (titles, description, tags, units, PromQL
    /// expressions) are JSON-escaped, so queries containing quoted label
    /// matchers (e.g. `stat="avg"`) produce valid JSON instead of breaking
    /// the document.
    pub fn build(&self) -> String {
        let panels_json: Vec<String> = self
            .panels
            .iter()
            .enumerate()
            .map(|(i, panel)| {
                let queries_json: Vec<String> = panel
                    .queries
                    .iter()
                    .enumerate()
                    .map(|(j, q)| {
                        format!(
                            r#"{{
                        "expr": "{}",
                        "refId": "{}",
                        "legendFormat": "{{}}"
                    }}"#,
                            Self::escape_json(q),
                            // refIds run A, B, C, ... per target.
                            (b'A' + j as u8) as char
                        )
                    })
                    .collect();

                // Optional `"unit": "...",` fragment; empty when no unit set.
                let unit_field = panel
                    .unit
                    .as_ref()
                    .map(|u| format!(r#""unit": "{}","#, Self::escape_json(u)))
                    .unwrap_or_default();

                format!(
                    r#"{{
                    "id": {},
                    "title": "{}",
                    "type": "{}",
                    "gridPos": {{"x": {}, "y": {}, "w": {}, "h": {}}},
                    {}
                    "targets": [{}],
                    "datasource": {{"type": "prometheus", "uid": "${{datasource}}"}}
                }}"#,
                    i + 1,
                    Self::escape_json(&panel.title),
                    match panel.panel_type {
                        PanelType::Graph => "timeseries",
                        PanelType::Stat => "stat",
                        PanelType::Table => "table",
                        PanelType::Heatmap => "heatmap",
                        PanelType::BarGauge => "bargauge",
                    },
                    panel.grid_pos.0,
                    panel.grid_pos.1,
                    panel.grid_pos.2,
                    panel.grid_pos.3,
                    unit_field,
                    queries_json.join(",")
                )
            })
            .collect();

        let tags_json: Vec<String> = self
            .tags
            .iter()
            .map(|t| format!(r#""{}""#, Self::escape_json(t)))
            .collect();

        format!(
            r#"{{
    "title": "{}",
    "description": "{}",
    "tags": [{}],
    "refresh": "{}",
    "time": {{"from": "{}", "to": "now"}},
    "templating": {{
        "list": [
            {{
                "name": "datasource",
                "type": "datasource",
                "query": "prometheus"
            }},
            {{
                "name": "kernel_id",
                "type": "query",
                "query": "label_values(ringkernel_messages_processed_total, kernel_id)",
                "multi": true,
                "includeAll": true
            }}
        ]
    }},
    "panels": [{}]
}}"#,
            Self::escape_json(&self.title),
            Self::escape_json(&self.description),
            tags_json.join(","),
            Self::escape_json(&self.refresh),
            Self::escape_json(&self.time_from),
            panels_json.join(",")
        )
    }
}
1008
1009pub struct ObservabilityContext {
1015 active_spans: RwLock<HashMap<SpanId, Span>>,
1017 completed_spans: RwLock<Vec<Span>>,
1019 max_completed: usize,
1021 prometheus: Arc<PrometheusExporter>,
1023}
1024
1025impl ObservabilityContext {
1026 pub fn new() -> Arc<Self> {
1028 Arc::new(Self {
1029 active_spans: RwLock::new(HashMap::new()),
1030 completed_spans: RwLock::new(Vec::new()),
1031 max_completed: 10000,
1032 prometheus: PrometheusExporter::new(),
1033 })
1034 }
1035
1036 pub fn start_span(&self, name: impl Into<String>, kind: SpanKind) -> Span {
1038 let span = Span::new(name, kind);
1039 self.active_spans.write().insert(span.span_id, span.clone());
1040 span
1041 }
1042
1043 pub fn start_child_span(&self, parent: &Span, name: impl Into<String>, kind: SpanKind) -> Span {
1045 let span = parent.child(name, kind);
1046 self.active_spans.write().insert(span.span_id, span.clone());
1047 span
1048 }
1049
1050 pub fn end_span(&self, mut span: Span) {
1052 span.end();
1053 self.active_spans.write().remove(&span.span_id);
1054
1055 let mut completed = self.completed_spans.write();
1056 completed.push(span);
1057 if completed.len() > self.max_completed {
1058 completed.remove(0);
1059 }
1060 }
1061
1062 pub fn prometheus(&self) -> &Arc<PrometheusExporter> {
1064 &self.prometheus
1065 }
1066
1067 pub fn export_spans(&self) -> Vec<Span> {
1069 self.completed_spans.write().drain(..).collect()
1070 }
1071
1072 pub fn active_span_count(&self) -> usize {
1074 self.active_spans.read().len()
1075 }
1076}
1077
1078impl Default for ObservabilityContext {
1079 fn default() -> Self {
1080 Self {
1081 active_spans: RwLock::new(HashMap::new()),
1082 completed_spans: RwLock::new(Vec::new()),
1083 max_completed: 10000,
1084 prometheus: PrometheusExporter::new(),
1085 }
1086 }
1087}
1088
/// GPU profiling tool a `GpuProfiler` implementation targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuProfilerBackend {
    /// NVIDIA Nsight (NVTX annotations).
    Nsight,
    /// RenderDoc frame capture.
    RenderDoc,
    /// Microsoft PIX.
    Pix,
    /// Apple Metal System Trace.
    MetalSystemTrace,
    /// AMD Radeon GPU Profiler.
    Rgp,
    /// Custom / no-op backend.
    Custom,
}
1109
/// RGBA color passed to GPU profiler annotations.
#[derive(Debug, Clone, Copy)]
pub struct ProfilerColor {
    /// Red channel.
    pub r: u8,
    /// Green channel.
    pub g: u8,
    /// Blue channel.
    pub b: u8,
    /// Alpha channel (fully opaque via `new`).
    pub a: u8,
}

impl ProfilerColor {
    /// Builds a fully opaque color from RGB components.
    pub const fn new(r: u8, g: u8, b: u8) -> Self {
        Self {
            r,
            g,
            b,
            a: u8::MAX,
        }
    }

    /// Common preset colors.
    pub const RED: Self = Self::new(255, 0, 0);
    pub const GREEN: Self = Self::new(0, 255, 0);
    pub const BLUE: Self = Self::new(0, 0, 255);
    pub const YELLOW: Self = Self::new(255, 255, 0);
    pub const CYAN: Self = Self::new(0, 255, 255);
    pub const MAGENTA: Self = Self::new(255, 0, 255);
    pub const ORANGE: Self = Self::new(255, 165, 0);
}
1144
/// Handle for a timed profiling region, returned by `GpuProfiler::push_range`.
/// Currently a stub: it only records the start time.
pub struct ProfilerRange {
    // Retained for future backend integration; unused by the stub.
    #[allow(dead_code)]
    name: String,
    #[allow(dead_code)]
    backend: GpuProfilerBackend,
    // Start of the region, used by `elapsed`.
    start: Instant,
}

impl ProfilerRange {
    /// Creates a range whose clock starts now.
    fn new(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
        Self {
            name: name.into(),
            backend,
            start: Instant::now(),
        }
    }

    /// Public stub constructor for a range not tied to a live profiler.
    pub fn stub(name: impl Into<String>, backend: GpuProfilerBackend) -> Self {
        Self::new(name, backend)
    }

    /// Time since the range was opened.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}

impl Drop for ProfilerRange {
    fn drop(&mut self) {
        // Intentionally empty: backend-specific range closing is not wired up;
        // pairing is handled via `GpuProfiler::pop_range` instead.
    }
}
1188
/// Backend-agnostic GPU profiler interface. Every method except `backend`
/// has a no-op default so stub backends only override what they support.
pub trait GpuProfiler: Send + Sync {
    /// Whether the backend is loaded and usable (defaults to false).
    fn is_available(&self) -> bool {
        false
    }

    /// Which profiler backend this implementation drives.
    fn backend(&self) -> GpuProfilerBackend;

    /// Begins a capture session (no-op by default).
    fn start_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Ends the current capture session (no-op by default).
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Requests a single-shot capture (no-op by default).
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    /// Opens a named, colored range; the default just records the start time.
    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    /// Closes the most recently pushed range (no-op by default).
    fn pop_range(&self) {}

    /// Emits an instantaneous marker (no-op by default).
    fn mark(&self, _name: &str, _color: ProfilerColor) {}

    /// Names the calling thread in profiler output (no-op by default).
    fn set_thread_name(&self, _name: &str) {}

    /// Emits a free-form message (no-op by default).
    fn message(&self, _text: &str) {}

    /// Associates a GPU allocation with a name (no-op by default).
    fn register_allocation(&self, _ptr: u64, _size: usize, _name: &str) {}

    /// Removes a previously registered allocation (no-op by default).
    fn unregister_allocation(&self, _ptr: u64) {}
}
1240
/// Errors returned by `GpuProfiler` capture-control methods.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ProfilerError {
    /// The profiler backend is not available on this system.
    #[error("GPU profiler not available")]
    NotAvailable,
    /// The external profiler is not attached to this process.
    #[error("GPU profiler not attached")]
    NotAttached,
    /// `start_capture` was called while a capture was already running.
    #[error("Capture already in progress")]
    CaptureInProgress,
    /// `end_capture` was called with no capture running.
    #[error("No capture in progress")]
    NoCaptureInProgress,
    /// Backend-specific failure.
    #[error("Profiler error: {0}")]
    Backend(String),
}
1260
/// Profiler that does nothing; the fallback when no real backend is detected.
pub struct NullProfiler;

impl GpuProfiler for NullProfiler {
    // All other trait methods keep their no-op defaults.
    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Custom
    }
}
1269
/// NVTX-based profiler stub for NVIDIA Nsight.
pub struct NvtxProfiler {
    /// Whether the NVTX runtime was detected (always false in this stub).
    available: bool,
    /// Guards against overlapping capture sessions.
    capture_in_progress: std::sync::atomic::AtomicBool,
}

impl NvtxProfiler {
    /// Creates the profiler. NVTX detection is not implemented, so the
    /// instance starts (and stays) unavailable.
    pub fn new() -> Self {
        Self {
            available: false,
            capture_in_progress: std::sync::atomic::AtomicBool::new(false),
        }
    }

    /// Reports whether the NVTX library was loaded.
    pub fn is_nvtx_loaded(&self) -> bool {
        self.available
    }
}

impl Default for NvtxProfiler {
    fn default() -> Self {
        Self::new()
    }
}
1304
impl GpuProfiler for NvtxProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::Nsight
    }

    /// Fails unless NVTX is available and no capture is already running.
    fn start_capture(&self) -> Result<(), ProfilerError> {
        if !self.available {
            return Err(ProfilerError::NotAvailable);
        }
        // swap(true) returns the previous flag: true means a capture is live.
        if self.capture_in_progress.swap(true, Ordering::SeqCst) {
            return Err(ProfilerError::CaptureInProgress);
        }
        Ok(())
    }

    /// Fails unless a capture is currently running.
    fn end_capture(&self) -> Result<(), ProfilerError> {
        if !self.capture_in_progress.swap(false, Ordering::SeqCst) {
            return Err(ProfilerError::NoCaptureInProgress);
        }
        Ok(())
    }

    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    fn pop_range(&self) {
        // NVTX range pop not wired up in this stub.
    }

    fn mark(&self, _name: &str, _color: ProfilerColor) {
        // NVTX marker emission not wired up in this stub.
    }

    fn set_thread_name(&self, _name: &str) {
        // NVTX thread naming not wired up in this stub.
    }
}
1350
1351pub struct RenderDocProfiler {
1355 attached: bool,
1357}
1358
1359impl RenderDocProfiler {
1360 pub fn new() -> Self {
1364 Self {
1365 attached: false, }
1367 }
1368
1369 pub fn is_attached(&self) -> bool {
1371 self.attached
1373 }
1374
1375 pub fn get_capture_path(&self) -> Option<String> {
1377 None
1379 }
1380
1381 pub fn launch_ui(&self) -> Result<(), ProfilerError> {
1383 if !self.attached {
1384 return Err(ProfilerError::NotAttached);
1385 }
1386 Ok(())
1388 }
1389}
1390
1391impl Default for RenderDocProfiler {
1392 fn default() -> Self {
1393 Self::new()
1394 }
1395}
1396
impl GpuProfiler for RenderDocProfiler {
    fn is_available(&self) -> bool {
        self.attached
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::RenderDoc
    }

    /// Single-shot capture; errors when RenderDoc is not attached.
    fn trigger_capture(&self) -> Result<(), ProfilerError> {
        if !self.attached {
            return Err(ProfilerError::NotAttached);
        }
        Ok(())
    }

    /// Begins a capture; errors when RenderDoc is not attached.
    fn start_capture(&self) -> Result<(), ProfilerError> {
        if !self.attached {
            return Err(ProfilerError::NotAttached);
        }
        Ok(())
    }

    // NOTE(review): unlike start_capture, this succeeds even when detached.
    fn end_capture(&self) -> Result<(), ProfilerError> {
        Ok(())
    }

    fn set_thread_name(&self, _name: &str) {
        // RenderDoc thread naming not wired up in this stub.
    }
}
1431
/// Metal System Trace profiler stub, compiled only on macOS.
#[cfg(target_os = "macos")]
pub struct MetalProfiler {
    // Reported availability; always true on macOS in this stub.
    available: bool,
}

#[cfg(target_os = "macos")]
impl MetalProfiler {
    /// Creates the profiler, marked available.
    pub fn new() -> Self {
        Self { available: true }
    }
}

#[cfg(target_os = "macos")]
impl Default for MetalProfiler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(target_os = "macos")]
impl GpuProfiler for MetalProfiler {
    fn is_available(&self) -> bool {
        self.available
    }

    fn backend(&self) -> GpuProfilerBackend {
        GpuProfilerBackend::MetalSystemTrace
    }

    fn push_range(&self, name: &str, _color: ProfilerColor) -> ProfilerRange {
        ProfilerRange::new(name, self.backend())
    }

    fn pop_range(&self) {
        // Metal signpost integration not wired up in this stub.
    }

    fn mark(&self, _name: &str, _color: ProfilerColor) {
        // Metal signpost integration not wired up in this stub.
    }
}
1479
1480pub struct GpuProfilerManager {
1482 profiler: Arc<dyn GpuProfiler>,
1484 enabled: std::sync::atomic::AtomicBool,
1486}
1487
1488impl GpuProfilerManager {
1489 pub fn new() -> Self {
1491 let nvtx = NvtxProfiler::new();
1493 if nvtx.is_available() {
1494 return Self {
1495 profiler: Arc::new(nvtx),
1496 enabled: std::sync::atomic::AtomicBool::new(true),
1497 };
1498 }
1499
1500 let renderdoc = RenderDocProfiler::new();
1501 if renderdoc.is_available() {
1502 return Self {
1503 profiler: Arc::new(renderdoc),
1504 enabled: std::sync::atomic::AtomicBool::new(true),
1505 };
1506 }
1507
1508 Self {
1510 profiler: Arc::new(NullProfiler),
1511 enabled: std::sync::atomic::AtomicBool::new(false),
1512 }
1513 }
1514
1515 pub fn with_profiler(profiler: Arc<dyn GpuProfiler>) -> Self {
1517 let enabled = profiler.is_available();
1518 Self {
1519 profiler,
1520 enabled: std::sync::atomic::AtomicBool::new(enabled),
1521 }
1522 }
1523
1524 pub fn is_enabled(&self) -> bool {
1526 self.enabled.load(Ordering::Relaxed)
1527 }
1528
1529 pub fn set_enabled(&self, enabled: bool) {
1531 self.enabled.store(enabled, Ordering::Relaxed);
1532 }
1533
1534 pub fn backend(&self) -> GpuProfilerBackend {
1536 self.profiler.backend()
1537 }
1538
1539 pub fn scope(&self, name: &str) -> ProfilerScope<'_> {
1541 ProfilerScope::new(name, &*self.profiler, self.is_enabled())
1542 }
1543
1544 pub fn scope_colored(&self, name: &str, color: ProfilerColor) -> ProfilerScope<'_> {
1546 ProfilerScope::new_colored(name, &*self.profiler, self.is_enabled(), color)
1547 }
1548
1549 pub fn mark(&self, name: &str) {
1551 if self.is_enabled() {
1552 self.profiler.mark(name, ProfilerColor::CYAN);
1553 }
1554 }
1555
1556 pub fn profiler(&self) -> &dyn GpuProfiler {
1558 &*self.profiler
1559 }
1560}
1561
1562impl Default for GpuProfilerManager {
1563 fn default() -> Self {
1564 Self::new()
1565 }
1566}
1567
/// RAII guard: pushes a profiler range on construction and pops it on drop,
/// but only when profiling was enabled at creation time.
pub struct ProfilerScope<'a> {
    profiler: &'a dyn GpuProfiler,
    // Captured at creation so drop only pops when a range was pushed.
    enabled: bool,
}

impl<'a> ProfilerScope<'a> {
    /// Opens a scope with the default (cyan) color.
    fn new(name: &str, profiler: &'a dyn GpuProfiler, enabled: bool) -> Self {
        if enabled {
            // The returned range handle is discarded; pairing relies on
            // push_range/pop_range, not on the handle's Drop (which is empty).
            profiler.push_range(name, ProfilerColor::CYAN);
        }
        Self { profiler, enabled }
    }

    /// Opens a scope with an explicit color.
    fn new_colored(
        name: &str,
        profiler: &'a dyn GpuProfiler,
        enabled: bool,
        color: ProfilerColor,
    ) -> Self {
        if enabled {
            profiler.push_range(name, color);
        }
        Self { profiler, enabled }
    }
}

impl<'a> Drop for ProfilerScope<'a> {
    fn drop(&mut self) {
        if self.enabled {
            self.profiler.pop_range();
        }
    }
}
1602
/// Opens a profiler scope bound to a hidden local, so the range stays open
/// until the end of the enclosing block. Optionally takes a color.
#[macro_export]
macro_rules! gpu_profile {
    ($profiler:expr, $name:expr) => {
        let _scope = $profiler.scope($name);
    };
    ($profiler:expr, $name:expr, $color:expr) => {
        let _scope = $profiler.scope_colored($name, $color);
    };
}
1625
/// Category of a tracked GPU memory allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GpuMemoryType {
    /// Device-local memory.
    DeviceLocal,
    /// Host-visible memory.
    HostVisible,
    /// Host-coherent memory.
    HostCoherent,
    /// Persistently mapped memory.
    Mapped,
    /// Message queue buffer.
    QueueBuffer,
    /// Kernel control block.
    ControlBlock,
    /// Memory shared between host and device.
    SharedMemory,
}
1648
/// A single tracked GPU allocation, as recorded by `GpuMemoryDashboard`.
#[derive(Debug, Clone)]
pub struct GpuMemoryAllocation {
    /// Dashboard-assigned identifier (see `next_allocation_id`).
    pub id: u64,
    /// Human-readable label.
    pub name: String,
    /// Size in bytes.
    pub size: usize,
    /// Kind of memory backing the allocation.
    pub memory_type: GpuMemoryType,
    /// Index of the device the allocation lives on.
    pub device_index: u32,
    /// Owning kernel, when the allocation is kernel-scoped.
    pub kernel_id: Option<String>,
    /// When the allocation was recorded.
    pub allocated_at: Instant,
    /// Cleared by `mark_unused`; set on creation.
    pub in_use: bool,
}
1669
/// Statistics for a single GPU memory pool.
#[derive(Debug, Clone, Default)]
pub struct GpuMemoryPoolStats {
    /// Pool name.
    pub name: String,
    /// Pool capacity in bytes.
    pub capacity: usize,
    /// Bytes currently allocated from the pool.
    pub allocated: usize,
    /// High-water mark of `allocated`, in bytes.
    pub peak_allocated: usize,
    /// Number of live allocations in the pool.
    pub allocation_count: u32,
    /// Cumulative allocations ever made.
    pub total_allocations: u64,
    /// Cumulative deallocations ever made.
    pub total_deallocations: u64,
    /// Fragmentation estimate (producer-defined scale).
    pub fragmentation: f32,
}

impl GpuMemoryPoolStats {
    /// Pool usage as a percentage of capacity; 0.0 for a zero-capacity pool.
    pub fn utilization(&self) -> f32 {
        if self.capacity == 0 {
            0.0
        } else {
            (self.allocated as f32 / self.capacity as f32) * 100.0
        }
    }
}

/// Memory statistics for one GPU device, with an optional per-pool breakdown.
#[derive(Debug, Clone, Default)]
pub struct GpuDeviceMemoryStats {
    /// Device index.
    pub device_index: u32,
    /// Device display name.
    pub device_name: String,
    /// Total device memory in bytes.
    pub total_memory: u64,
    /// Free device memory in bytes.
    pub free_memory: u64,
    /// Bytes used by RingKernel on this device.
    pub ringkernel_used: u64,
    /// Bytes used by other consumers on this device.
    pub other_used: u64,
    /// Per-pool statistics, if reported.
    pub pools: Vec<GpuMemoryPoolStats>,
}

impl GpuDeviceMemoryStats {
    /// Bytes in use on the device.
    ///
    /// Uses saturating subtraction so an inconsistent snapshot where
    /// `free_memory > total_memory` yields 0 instead of panicking in debug
    /// builds (or wrapping in release builds). This matches the saturating
    /// arithmetic already used by `update_device_stats`.
    pub fn used_memory(&self) -> u64 {
        self.total_memory.saturating_sub(self.free_memory)
    }

    /// Device usage as a percentage of total memory; 0.0 when
    /// `total_memory` is 0.
    pub fn utilization(&self) -> f32 {
        if self.total_memory == 0 {
            0.0
        } else {
            (self.used_memory() as f32 / self.total_memory as f32) * 100.0
        }
    }
}
1736
/// Central registry of GPU memory usage: live allocations, per-device
/// statistics, aggregate counters, and Prometheus/Grafana export helpers.
pub struct GpuMemoryDashboard {
    /// Live allocations keyed by allocation id.
    allocations: RwLock<HashMap<u64, GpuMemoryAllocation>>,
    /// Per-device statistics keyed by device index.
    device_stats: RwLock<HashMap<u32, GpuDeviceMemoryStats>>,
    /// Pressure thresholds used by `check_pressure`.
    thresholds: GpuMemoryThresholds,
    /// Monotonic source for `next_allocation_id` (starts at 1).
    allocation_counter: AtomicU64,
    /// Sum of sizes of all live tracked allocations, in bytes.
    total_allocated: AtomicU64,
    /// High-water mark of `total_allocated`, in bytes.
    peak_allocated: AtomicU64,
}
1780
/// Alerting thresholds consulted by `GpuMemoryDashboard::check_pressure`.
#[derive(Debug, Clone)]
pub struct GpuMemoryThresholds {
    /// Utilization percentage at which pressure becomes `Warning`.
    pub warning: f32,
    /// Utilization percentage at which pressure becomes `Critical`.
    pub critical: f32,
    /// Largest single allocation size allowed, in bytes.
    pub max_allocation_size: usize,
    /// Maximum number of simultaneous allocations allowed.
    pub max_allocation_count: u32,
}

impl Default for GpuMemoryThresholds {
    /// Defaults: warn at 75%, critical at 90%, 1 GiB max allocation size,
    /// 10 000 max allocation count.
    fn default() -> Self {
        Self {
            warning: 75.0,
            critical: 90.0,
            max_allocation_size: 1024 * 1024 * 1024,
            max_allocation_count: 10000,
        }
    }
}
1804
/// Memory-pressure classification produced by
/// `GpuMemoryDashboard::check_pressure`, ordered from healthiest to worst.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryPressureLevel {
    /// Utilization below 80% of the warning threshold.
    Normal,
    /// Utilization at or above 80% of the warning threshold.
    Elevated,
    /// Utilization at or above the warning threshold.
    Warning,
    /// Utilization at or above the critical threshold.
    Critical,
    /// The device reports zero free memory.
    OutOfMemory,
}
1819
1820impl GpuMemoryDashboard {
1821 pub fn new() -> Arc<Self> {
1823 Arc::new(Self {
1824 allocations: RwLock::new(HashMap::new()),
1825 device_stats: RwLock::new(HashMap::new()),
1826 thresholds: GpuMemoryThresholds::default(),
1827 allocation_counter: AtomicU64::new(1),
1828 total_allocated: AtomicU64::new(0),
1829 peak_allocated: AtomicU64::new(0),
1830 })
1831 }
1832
    /// Creates a dashboard with caller-supplied pressure thresholds.
    /// Counters start at zero and the allocation-id counter at 1.
    pub fn with_thresholds(thresholds: GpuMemoryThresholds) -> Arc<Self> {
        Arc::new(Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds,
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        })
    }
1844
1845 pub fn track_allocation(
1847 &self,
1848 id: u64,
1849 name: impl Into<String>,
1850 size: usize,
1851 memory_type: GpuMemoryType,
1852 device_index: u32,
1853 kernel_id: Option<&str>,
1854 ) {
1855 let allocation = GpuMemoryAllocation {
1856 id,
1857 name: name.into(),
1858 size,
1859 memory_type,
1860 device_index,
1861 kernel_id: kernel_id.map(String::from),
1862 allocated_at: Instant::now(),
1863 in_use: true,
1864 };
1865
1866 self.allocations.write().insert(id, allocation);
1867
1868 let new_total = self
1870 .total_allocated
1871 .fetch_add(size as u64, Ordering::Relaxed)
1872 + size as u64;
1873 let mut peak = self.peak_allocated.load(Ordering::Relaxed);
1874 while new_total > peak {
1875 match self.peak_allocated.compare_exchange_weak(
1876 peak,
1877 new_total,
1878 Ordering::Relaxed,
1879 Ordering::Relaxed,
1880 ) {
1881 Ok(_) => break,
1882 Err(current) => peak = current,
1883 }
1884 }
1885 }
1886
    /// Returns a fresh, unique allocation id (monotonically increasing,
    /// starting at 1).
    pub fn next_allocation_id(&self) -> u64 {
        self.allocation_counter.fetch_add(1, Ordering::Relaxed)
    }
1891
1892 pub fn track_deallocation(&self, id: u64) {
1894 let mut allocations = self.allocations.write();
1895 if let Some(alloc) = allocations.remove(&id) {
1896 self.total_allocated
1897 .fetch_sub(alloc.size as u64, Ordering::Relaxed);
1898 }
1899 }
1900
1901 pub fn mark_unused(&self, id: u64) {
1903 let mut allocations = self.allocations.write();
1904 if let Some(alloc) = allocations.get_mut(&id) {
1905 alloc.in_use = false;
1906 }
1907 }
1908
1909 pub fn register_device(&self, device_index: u32, name: impl Into<String>, total_memory: u64) {
1911 let stats = GpuDeviceMemoryStats {
1912 device_index,
1913 device_name: name.into(),
1914 total_memory,
1915 free_memory: total_memory,
1916 ringkernel_used: 0,
1917 other_used: 0,
1918 pools: Vec::new(),
1919 };
1920 self.device_stats.write().insert(device_index, stats);
1921 }
1922
1923 pub fn update_device_stats(&self, device_index: u32, free_memory: u64, ringkernel_used: u64) {
1925 let mut stats = self.device_stats.write();
1926 if let Some(device) = stats.get_mut(&device_index) {
1927 device.free_memory = free_memory;
1928 device.ringkernel_used = ringkernel_used;
1929 device.other_used = device
1930 .total_memory
1931 .saturating_sub(free_memory + ringkernel_used);
1932 }
1933 }
1934
    /// Returns a snapshot of one device's stats, or `None` if unregistered.
    pub fn get_device_stats(&self, device_index: u32) -> Option<GpuDeviceMemoryStats> {
        self.device_stats.read().get(&device_index).cloned()
    }
1939
    /// Returns snapshots of all registered devices (unspecified order).
    pub fn get_all_device_stats(&self) -> Vec<GpuDeviceMemoryStats> {
        self.device_stats.read().values().cloned().collect()
    }
1944
    /// Returns snapshots of all live tracked allocations (unspecified order).
    pub fn get_allocations(&self) -> Vec<GpuMemoryAllocation> {
        self.allocations.read().values().cloned().collect()
    }
1949
1950 pub fn get_kernel_allocations(&self, kernel_id: &str) -> Vec<GpuMemoryAllocation> {
1952 self.allocations
1953 .read()
1954 .values()
1955 .filter(|a| a.kernel_id.as_deref() == Some(kernel_id))
1956 .cloned()
1957 .collect()
1958 }
1959
    /// Current sum of live tracked allocation sizes, in bytes.
    pub fn total_allocated(&self) -> u64 {
        self.total_allocated.load(Ordering::Relaxed)
    }
1964
    /// High-water mark of [`Self::total_allocated`], in bytes.
    pub fn peak_allocated(&self) -> u64 {
        self.peak_allocated.load(Ordering::Relaxed)
    }
1969
    /// Number of live tracked allocations.
    pub fn allocation_count(&self) -> usize {
        self.allocations.read().len()
    }
1974
1975 pub fn check_pressure(&self, device_index: u32) -> MemoryPressureLevel {
1977 let stats = self.device_stats.read();
1978 if let Some(device) = stats.get(&device_index) {
1979 let utilization = device.utilization();
1980 if device.free_memory == 0 {
1981 MemoryPressureLevel::OutOfMemory
1982 } else if utilization >= self.thresholds.critical {
1983 MemoryPressureLevel::Critical
1984 } else if utilization >= self.thresholds.warning {
1985 MemoryPressureLevel::Warning
1986 } else if utilization >= self.thresholds.warning * 0.8 {
1987 MemoryPressureLevel::Elevated
1988 } else {
1989 MemoryPressureLevel::Normal
1990 }
1991 } else {
1992 MemoryPressureLevel::Normal
1993 }
1994 }
1995
    /// Builds a Grafana bar-gauge panel definition over the allocated and
    /// peak GPU memory Prometheus series.
    pub fn grafana_panel(&self) -> GrafanaPanel {
        GrafanaPanel {
            title: "GPU Memory Usage".to_string(),
            panel_type: PanelType::BarGauge,
            queries: vec![
                "ringkernel_gpu_memory_allocated_bytes".to_string(),
                "ringkernel_gpu_memory_peak_bytes".to_string(),
            ],
            // Grid position tuple — presumably (x, y, width, height);
            // confirm against the GrafanaPanel definition.
            grid_pos: (0, 0, 12, 8),
            unit: Some("bytes".to_string()),
        }
    }
2009
2010 pub fn prometheus_metrics(&self) -> String {
2014 let mut output = String::new();
2015
2016 writeln!(output, "# HELP ringkernel_gpu_memory_allocated_bytes Current GPU memory allocated by RingKernel").expect("write to String is infallible");
2018 writeln!(output, "# TYPE ringkernel_gpu_memory_allocated_bytes gauge")
2019 .expect("write to String is infallible");
2020 writeln!(
2021 output,
2022 "ringkernel_gpu_memory_allocated_bytes {}",
2023 self.total_allocated()
2024 )
2025 .expect("write to String is infallible");
2026
2027 writeln!(
2029 output,
2030 "# HELP ringkernel_gpu_memory_peak_bytes Peak GPU memory allocated by RingKernel"
2031 )
2032 .expect("write to String is infallible");
2033 writeln!(output, "# TYPE ringkernel_gpu_memory_peak_bytes gauge")
2034 .expect("write to String is infallible");
2035 writeln!(
2036 output,
2037 "ringkernel_gpu_memory_peak_bytes {}",
2038 self.peak_allocated()
2039 )
2040 .expect("write to String is infallible");
2041
2042 writeln!(
2044 output,
2045 "# HELP ringkernel_gpu_memory_allocation_count Number of active GPU allocations"
2046 )
2047 .expect("write to String is infallible");
2048 writeln!(
2049 output,
2050 "# TYPE ringkernel_gpu_memory_allocation_count gauge"
2051 )
2052 .expect("write to String is infallible");
2053 writeln!(
2054 output,
2055 "ringkernel_gpu_memory_allocation_count {}",
2056 self.allocation_count()
2057 )
2058 .expect("write to String is infallible");
2059
2060 let device_stats = self.device_stats.read();
2062 for device in device_stats.values() {
2063 writeln!(
2064 output,
2065 "ringkernel_gpu_device_memory_total_bytes{{device=\"{}\"}} {}",
2066 device.device_name, device.total_memory
2067 )
2068 .expect("write to String is infallible");
2069 writeln!(
2070 output,
2071 "ringkernel_gpu_device_memory_free_bytes{{device=\"{}\"}} {}",
2072 device.device_name, device.free_memory
2073 )
2074 .expect("write to String is infallible");
2075 writeln!(
2076 output,
2077 "ringkernel_gpu_device_memory_used_bytes{{device=\"{}\"}} {}",
2078 device.device_name,
2079 device.used_memory()
2080 )
2081 .expect("write to String is infallible");
2082 writeln!(
2083 output,
2084 "ringkernel_gpu_device_utilization{{device=\"{}\"}} {:.2}",
2085 device.device_name,
2086 device.utilization()
2087 )
2088 .expect("write to String is infallible");
2089 }
2090
2091 output
2092 }
2093
    /// Builds a human-readable plain-text report: aggregate counters, a
    /// per-device section (sizes in MiB, utilization, pressure level), and
    /// the ten largest live allocations.
    pub fn summary_report(&self) -> String {
        let mut report = String::new();

        writeln!(report, "=== GPU Memory Dashboard ===").expect("write to String is infallible");
        writeln!(report, "Total Allocated: {} bytes", self.total_allocated())
            .expect("write to String is infallible");
        writeln!(report, "Peak Allocated: {} bytes", self.peak_allocated())
            .expect("write to String is infallible");
        writeln!(report, "Active Allocations: {}", self.allocation_count())
            .expect("write to String is infallible");
        writeln!(report).expect("write to String is infallible");

        let device_stats = self.device_stats.read();
        for device in device_stats.values() {
            writeln!(
                report,
                "--- Device {} ({}) ---",
                device.device_index, device.device_name
            )
            .expect("write to String is infallible");
            writeln!(
                report,
                "  Total: {} MB",
                device.total_memory / (1024 * 1024)
            )
            .expect("write to String is infallible");
            writeln!(report, "  Free: {} MB", device.free_memory / (1024 * 1024))
                .expect("write to String is infallible");
            writeln!(
                report,
                "  RingKernel: {} MB",
                device.ringkernel_used / (1024 * 1024)
            )
            .expect("write to String is infallible");
            writeln!(report, "  Utilization: {:.1}%", device.utilization())
                .expect("write to String is infallible");
            writeln!(
                report,
                "  Pressure: {:?}",
                self.check_pressure(device.device_index)
            )
            .expect("write to String is infallible");
        }

        // Largest allocations first.
        let allocations = self.allocations.read();
        let mut sorted_allocs: Vec<_> = allocations.values().collect();
        sorted_allocs.sort_by_key(|a| std::cmp::Reverse(a.size));

        if !sorted_allocs.is_empty() {
            writeln!(report).expect("write to String is infallible");
            writeln!(report, "--- Top 10 Allocations ---").expect("write to String is infallible");
            for (i, alloc) in sorted_allocs.iter().take(10).enumerate() {
                writeln!(
                    report,
                    "  {}. {} - {} bytes ({:?})",
                    i + 1,
                    alloc.name,
                    alloc.size,
                    alloc.memory_type
                )
                .expect("write to String is infallible");
            }
        }

        report
    }
2165}
2166
impl Default for GpuMemoryDashboard {
    /// Builds a dashboard with default thresholds.
    ///
    /// Unlike [`GpuMemoryDashboard::new`], this returns a bare `Self`
    /// (not an `Arc`), as required by the `Default` trait.
    fn default() -> Self {
        Self {
            allocations: RwLock::new(HashMap::new()),
            device_stats: RwLock::new(HashMap::new()),
            thresholds: GpuMemoryThresholds::default(),
            allocation_counter: AtomicU64::new(1),
            total_allocated: AtomicU64::new(0),
            peak_allocated: AtomicU64::new(0),
        }
    }
}
2179
/// Wire protocol for shipping OTLP data to a collector.
///
/// NOTE(review): the exporter in this module always builds HTTP/JSON
/// payloads regardless of this setting — confirm the other transports are
/// handled elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OtlpTransport {
    /// OTLP over HTTP with JSON payloads (default).
    #[default]
    HttpJson,
    /// OTLP over HTTP with protobuf payloads.
    HttpProtobuf,
    /// OTLP over gRPC.
    Grpc,
}
2195
/// Configuration for the [`OtlpExporter`].
#[derive(Debug, Clone)]
pub struct OtlpConfig {
    /// Collector URL the spans are POSTed to.
    pub endpoint: String,
    /// Wire protocol selection.
    pub transport: OtlpTransport,
    /// `service.name` resource attribute.
    pub service_name: String,
    /// `service.version` resource attribute.
    pub service_version: String,
    /// Optional `service.instance.id` resource attribute.
    pub service_instance_id: Option<String>,
    /// Extra resource attributes as key/value pairs.
    pub resource_attributes: Vec<(String, String)>,
    /// Buffered-span count that triggers an automatic flush.
    pub batch_size: usize,
    /// Intended periodic export interval (consumer-driven).
    pub export_interval: Duration,
    /// Per-request HTTP timeout.
    pub timeout: Duration,
    /// Maximum number of retries after a failed send.
    pub max_retries: u32,
    /// Base delay between retries (doubled per attempt).
    pub retry_delay: Duration,
    /// Optional value for the `Authorization` header.
    pub authorization: Option<String>,
}
2224
impl Default for OtlpConfig {
    /// Defaults target a local OTLP/HTTP collector (port 4318, traces path)
    /// and identify the service by this crate's name and compile-time
    /// version.
    fn default() -> Self {
        Self {
            endpoint: "http://localhost:4318/v1/traces".to_string(),
            transport: OtlpTransport::HttpJson,
            service_name: "ringkernel".to_string(),
            service_version: env!("CARGO_PKG_VERSION").to_string(),
            service_instance_id: None,
            resource_attributes: Vec::new(),
            batch_size: 512,
            export_interval: Duration::from_secs(5),
            timeout: Duration::from_secs(30),
            max_retries: 3,
            retry_delay: Duration::from_millis(100),
            authorization: None,
        }
    }
}
2243
impl OtlpConfig {
    /// Creates a config for `endpoint` with every other field defaulted.
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Sets the reported `service.name`.
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = name.into();
        self
    }

    /// Sets the reported `service.version`.
    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
        self.service_version = version.into();
        self
    }

    /// Sets the reported `service.instance.id`.
    pub fn with_instance_id(mut self, id: impl Into<String>) -> Self {
        self.service_instance_id = Some(id.into());
        self
    }

    /// Appends an arbitrary resource attribute.
    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.resource_attributes.push((key.into(), value.into()));
        self
    }

    /// Sets the buffered-span count that triggers an automatic flush.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the intended periodic export interval.
    pub fn with_export_interval(mut self, interval: Duration) -> Self {
        self.export_interval = interval;
        self
    }

    /// Sets the value sent in the `Authorization` header.
    pub fn with_authorization(mut self, auth: impl Into<String>) -> Self {
        self.authorization = Some(auth.into());
        self
    }

    /// Preset for a Jaeger OTLP/HTTP endpoint.
    pub fn jaeger(endpoint: impl Into<String>) -> Self {
        Self::new(endpoint).with_service_name("ringkernel")
    }

    /// Preset for Honeycomb.
    ///
    /// NOTE(review): the key is sent inside the `Authorization` header as
    /// "x-honeycomb-team <key>"; Honeycomb documents a dedicated
    /// `x-honeycomb-team` header — confirm this form is accepted.
    pub fn honeycomb(api_key: impl Into<String>) -> Self {
        Self::new("https://api.honeycomb.io/v1/traces")
            .with_authorization(format!("x-honeycomb-team {}", api_key.into()))
    }

    /// Preset for Grafana Cloud's OTLP gateway, tagging spans with the
    /// instance id as a resource attribute.
    pub fn grafana_cloud(instance_id: impl Into<String>, api_key: impl Into<String>) -> Self {
        let instance = instance_id.into();
        Self::new("https://otlp-gateway-prod-us-central-0.grafana.net/otlp/v1/traces")
            .with_authorization(format!("Basic {}", api_key.into()))
            .with_attribute("grafana.instance", instance)
    }
}
2314
/// Outcome of a single export (flush) attempt.
#[derive(Debug, Clone)]
pub struct OtlpExportResult {
    /// Number of spans delivered (0 on failure).
    pub spans_exported: usize,
    /// Whether the batch was accepted.
    pub success: bool,
    /// Error description when `success` is false.
    pub error: Option<String>,
    /// Wall-clock time spent on the send attempt.
    pub duration: Duration,
    /// Number of retries performed.
    pub retry_count: u32,
}
2329
/// Cumulative statistics maintained by the [`OtlpExporter`].
#[derive(Debug, Clone, Default)]
pub struct OtlpExporterStats {
    /// Spans successfully delivered over the exporter's lifetime.
    pub total_spans_exported: u64,
    /// Export (flush) attempts made.
    pub total_exports: u64,
    /// Export attempts that succeeded.
    pub successful_exports: u64,
    /// Export attempts that failed (their spans are re-buffered).
    pub failed_exports: u64,
    /// Retries performed across all exports.
    pub total_retries: u64,
    /// Buffer size observed after the most recent export.
    pub buffered_spans: usize,
    /// When the most recent export attempt finished.
    pub last_export: Option<Instant>,
    /// Error message from the most recent failed export.
    pub last_error: Option<String>,
}
2350
/// Buffering OTLP span exporter: spans accumulate in memory and are sent as
/// a batch once the configured batch size is reached or `flush` is called.
pub struct OtlpExporter {
    /// Exporter configuration.
    config: OtlpConfig,
    /// Spans awaiting export.
    buffer: RwLock<Vec<Span>>,
    /// Running export statistics.
    stats: RwLock<OtlpExporterStats>,
}
2359
2360impl OtlpExporter {
    /// Creates an exporter with the given configuration and an empty
    /// buffer and statistics.
    pub fn new(config: OtlpConfig) -> Self {
        Self {
            config,
            buffer: RwLock::new(Vec::new()),
            stats: RwLock::new(OtlpExporterStats::default()),
        }
    }
2369
    /// Preset for a Jaeger collector on localhost (OTLP/HTTP, port 4318).
    pub fn jaeger_local() -> Self {
        Self::new(OtlpConfig::jaeger("http://localhost:4318/v1/traces"))
    }
2374
    /// Borrows the exporter's configuration.
    pub fn config(&self) -> &OtlpConfig {
        &self.config
    }
2379
    /// Returns a snapshot of the cumulative export statistics.
    pub fn stats(&self) -> OtlpExporterStats {
        self.stats.read().clone()
    }
2384
2385 pub fn export_span(&self, span: Span) {
2387 let mut buffer = self.buffer.write();
2388 buffer.push(span);
2389
2390 let should_flush = buffer.len() >= self.config.batch_size;
2391 drop(buffer);
2392
2393 if should_flush {
2394 let _ = self.flush();
2395 }
2396 }
2397
2398 pub fn export_spans(&self, spans: Vec<Span>) {
2400 let mut buffer = self.buffer.write();
2401 buffer.extend(spans);
2402
2403 let should_flush = buffer.len() >= self.config.batch_size;
2404 drop(buffer);
2405
2406 if should_flush {
2407 let _ = self.flush();
2408 }
2409 }
2410
    /// Number of spans currently waiting in the buffer.
    pub fn buffered_count(&self) -> usize {
        self.buffer.read().len()
    }
2415
    /// Drains the span buffer and attempts to export it as one batch.
    ///
    /// On failure the drained spans are pushed back into the buffer for a
    /// later retry; exporter statistics are updated either way. The returned
    /// result carries the measured wall-clock duration of the send attempt.
    pub fn flush(&self) -> OtlpExportResult {
        // Take ownership of the buffered spans, leaving an empty Vec behind.
        let spans: Vec<Span> = {
            let mut buffer = self.buffer.write();
            std::mem::take(&mut *buffer)
        };

        if spans.is_empty() {
            return OtlpExportResult {
                spans_exported: 0,
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            };
        }

        let start = Instant::now();
        let result = self.send_spans(&spans);
        let duration = start.elapsed();

        // Update statistics under the stats write lock.
        {
            let mut stats = self.stats.write();
            stats.total_exports += 1;
            stats.last_export = Some(Instant::now());

            if result.success {
                stats.successful_exports += 1;
                stats.total_spans_exported += spans.len() as u64;
            } else {
                stats.failed_exports += 1;
                stats.last_error = result.error.clone();
                // Re-buffer the failed batch so the next flush retries it
                // (appended after any spans that arrived in the meantime).
                let mut buffer = self.buffer.write();
                buffer.extend(spans);
            }
            stats.total_retries += result.retry_count as u64;
            stats.buffered_spans = self.buffer.read().len();
        }

        OtlpExportResult {
            spans_exported: if result.success {
                result.spans_exported
            } else {
                0
            },
            duration,
            ..result
        }
    }
2467
    /// Dispatches a batch of spans to the configured endpoint.
    ///
    /// Without the `alerting` or `otel` feature this is a logging stub that
    /// reports success without performing any network I/O.
    fn send_spans(&self, spans: &[Span]) -> OtlpExportResult {
        #[cfg(not(any(feature = "alerting", feature = "otel")))]
        {
            tracing::debug!(
                span_count = spans.len(),
                endpoint = %self.config.endpoint,
                "OTLP stub: would export spans (enable 'alerting' or 'otel' feature for HTTP export)"
            );
            OtlpExportResult {
                spans_exported: spans.len(),
                success: true,
                error: None,
                duration: Duration::ZERO,
                retry_count: 0,
            }
        }

        #[cfg(any(feature = "alerting", feature = "otel"))]
        {
            self.send_spans_http(spans)
        }
    }
2492
    #[cfg(any(feature = "alerting", feature = "otel"))]
    /// POSTs the batch as OTLP/JSON with exponential-backoff retries.
    ///
    /// The `duration` fields returned here are placeholders
    /// (`Duration::ZERO`); the caller (`flush`) overwrites them with the
    /// measured wall-clock time.
    fn send_spans_http(&self, spans: &[Span]) -> OtlpExportResult {
        let payload = self.build_otlp_json(spans);

        let client = reqwest::blocking::Client::builder()
            .timeout(self.config.timeout)
            .build();

        let client = match client {
            Ok(c) => c,
            Err(e) => {
                return OtlpExportResult {
                    spans_exported: 0,
                    success: false,
                    error: Some(format!("Failed to create HTTP client: {}", e)),
                    duration: Duration::ZERO,
                    retry_count: 0,
                };
            }
        };

        let mut retry_count = 0;
        let mut last_error = None;

        for attempt in 0..=self.config.max_retries {
            let mut request = client
                .post(&self.config.endpoint)
                .header("Content-Type", "application/json")
                .body(payload.clone());

            if let Some(auth) = &self.config.authorization {
                request = request.header("Authorization", auth);
            }

            match request.send() {
                Ok(response) => {
                    if response.status().is_success() {
                        return OtlpExportResult {
                            spans_exported: spans.len(),
                            success: true,
                            error: None,
                            duration: Duration::ZERO,
                            retry_count,
                        };
                    } else {
                        // NOTE(review): StatusCode's Display already includes
                        // the numeric code, so this message repeats it
                        // (e.g. "HTTP 404 Not Found: 404") — confirm intended.
                        last_error = Some(format!(
                            "HTTP {}: {}",
                            response.status(),
                            response.status().as_str()
                        ));
                    }
                }
                Err(e) => {
                    last_error = Some(format!("Request failed: {}", e));
                }
            }

            if attempt < self.config.max_retries {
                retry_count += 1;
                // Exponential backoff: retry_delay * 2^attempt (blocking sleep).
                std::thread::sleep(self.config.retry_delay * (1 << attempt));
            }
        }

        OtlpExportResult {
            spans_exported: 0,
            success: false,
            error: last_error,
            duration: Duration::ZERO,
            retry_count,
        }
    }
2565
    #[cfg(any(feature = "alerting", feature = "otel"))]
    /// Serializes the batch into an OTLP/HTTP JSON payload
    /// (`resourceSpans` → `scopeSpans` → `spans`), embedding the configured
    /// service identity and resource attributes.
    fn build_otlp_json(&self, spans: &[Span]) -> String {
        use std::fmt::Write;

        let mut json = String::with_capacity(4096);

        json.push_str(r#"{"resourceSpans":[{"resource":{"attributes":["#);

        let _ = write!(
            json,
            r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_name)
        );

        let _ = write!(
            json,
            r#",{{"key":"service.version","value":{{"stringValue":"{}"}}}}"#,
            escape_json_str(&self.config.service_version)
        );

        if let Some(instance_id) = &self.config.service_instance_id {
            let _ = write!(
                json,
                r#",{{"key":"service.instance.id","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(instance_id)
            );
        }

        for (key, value) in &self.config.resource_attributes {
            let _ = write!(
                json,
                r#",{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
                escape_json_str(key),
                escape_json_str(value)
            );
        }

        json.push_str(r#"]},"scopeSpans":[{"scope":{"name":"ringkernel"},"spans":["#);

        // Comma-separate the serialized spans.
        let mut first = true;
        for span in spans {
            if !first {
                json.push(',');
            }
            first = false;
            self.span_to_json(&mut json, span);
        }

        json.push_str("]}]}]}");
        json
    }
2624
    #[cfg(any(feature = "alerting", feature = "otel"))]
    /// Appends one span as an OTLP JSON object to `json`.
    ///
    /// NOTE(review): the "UnixNano" fields are filled from
    /// `Instant::elapsed()`, i.e. time since the span's start/end rather
    /// than nanoseconds since the Unix epoch (and the end value comes out
    /// smaller than the start value). Importers expecting wall-clock
    /// timestamps will see skewed times — confirm intended.
    fn span_to_json(&self, json: &mut String, span: &Span) {
        use std::fmt::Write;

        let _ = write!(
            json,
            r#"{{"traceId":"{}","spanId":"{}""#,
            span.trace_id.to_hex(),
            span.span_id.to_hex()
        );

        if let Some(parent) = span.parent_span_id {
            let _ = write!(json, r#","parentSpanId":"{}""#, parent.to_hex());
        }

        // OTLP numeric span-kind codes.
        let _ = write!(
            json,
            r#","name":"{}","kind":{}"#,
            escape_json_str(&span.name),
            match span.kind {
                SpanKind::Internal => 1,
                SpanKind::Server => 2,
                SpanKind::Client => 3,
                SpanKind::Producer => 4,
                SpanKind::Consumer => 5,
            }
        );

        let start_nanos = span.start_time.elapsed().as_nanos();
        let end_nanos = span
            .end_time
            .map(|t| t.elapsed().as_nanos())
            .unwrap_or(start_nanos);

        let _ = write!(
            json,
            r#","startTimeUnixNano":"{}","endTimeUnixNano":"{}""#,
            start_nanos, end_nanos
        );

        // OTLP status codes: 0 = Unset, 1 = Ok, 2 = Error.
        let _ = write!(
            json,
            r#","status":{{"code":{}}}"#,
            match &span.status {
                SpanStatus::Unset => 0,
                SpanStatus::Ok => 1,
                SpanStatus::Error { .. } => 2,
            }
        );

        if !span.attributes.is_empty() {
            json.push_str(r#","attributes":["#);
            let mut first = true;
            for (key, value) in &span.attributes {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"key":"{}","value":{}}}"#,
                    escape_json_str(key),
                    attribute_value_to_json(value)
                );
            }
            json.push(']');
        }

        if !span.events.is_empty() {
            json.push_str(r#","events":["#);
            let mut first = true;
            for event in &span.events {
                if !first {
                    json.push(',');
                }
                first = false;
                let _ = write!(
                    json,
                    r#"{{"name":"{}","timeUnixNano":"{}"}}"#,
                    escape_json_str(&event.name),
                    event.timestamp.elapsed().as_nanos()
                );
            }
            json.push(']');
        }

        json.push('}');
    }
2719}
2720
2721#[cfg(any(feature = "alerting", feature = "otel"))]
/// Escapes a string for embedding inside a JSON string literal.
///
/// Handles backslash, double quote, and — as required by RFC 8259 — every
/// control character below U+0020: \n, \r, \t get their short escapes, the
/// rest become \u00XX. (The previous replace-chain emitted raw control
/// characters, producing invalid JSON.)
fn escape_json_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => {
                // write! to a String cannot fail.
                let _ = write!(out, "\\u{:04x}", c as u32);
            }
            c => out.push(c),
        }
    }
    out
}
2730
2731#[cfg(any(feature = "alerting", feature = "otel"))]
/// Serializes a span attribute value into the OTLP JSON `AnyValue` shape.
///
/// NOTE(review): `Float` uses `{}` formatting, so non-finite values render
/// as `NaN`/`inf`, which is not valid JSON — confirm inputs are finite.
fn attribute_value_to_json(value: &AttributeValue) -> String {
    match value {
        AttributeValue::String(s) => format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)),
        // OTLP encodes intValue as a JSON string.
        AttributeValue::Int(i) => format!(r#"{{"intValue":"{}"}}"#, i),
        AttributeValue::Float(f) => format!(r#"{{"doubleValue":{}}}"#, f),
        AttributeValue::Bool(b) => format!(r#"{{"boolValue":{}}}"#, b),
        AttributeValue::StringArray(arr) => {
            let values: Vec<String> = arr
                .iter()
                .map(|s| format!(r#"{{"stringValue":"{}"}}"#, escape_json_str(s)))
                .collect();
            format!(r#"{{"arrayValue":{{"values":[{}]}}}}"#, values.join(","))
        }
    }
}
2748
2749#[cfg(test)]
2750mod tests {
2751 use super::*;
2752 use crate::runtime::KernelId;
2753
    // Two freshly generated trace ids must differ.
    #[test]
    fn test_trace_id_generation() {
        let id1 = TraceId::new();
        let id2 = TraceId::new();
        assert_ne!(id1.0, id2.0);
    }

    // Hex round-trip: 32-char encoding parses back to the same id.
    #[test]
    fn test_trace_id_hex() {
        let id = TraceId(0x123456789abcdef0123456789abcdef0);
        let hex = id.to_hex();
        assert_eq!(hex.len(), 32);
        let parsed = TraceId::from_hex(&hex).unwrap();
        assert_eq!(id, parsed);
    }

    // New spans carry their name and start un-ended.
    #[test]
    fn test_span_creation() {
        let span = Span::new("test_operation", SpanKind::Internal);
        assert!(!span.is_ended());
        assert_eq!(span.name, "test_operation");
    }

    // Child spans inherit the trace id and link to the parent span id.
    #[test]
    fn test_span_child() {
        let parent = Span::new("parent", SpanKind::Server);
        let child = parent.child("child", SpanKind::Internal);

        assert_eq!(child.trace_id, parent.trace_id);
        assert_eq!(child.parent_span_id, Some(parent.span_id));
    }

    // set_attribute accepts string, integer, and boolean values.
    #[test]
    fn test_span_attributes() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.set_attribute("string_key", "value");
        span.set_attribute("int_key", 42i64);
        span.set_attribute("bool_key", true);

        assert_eq!(span.attributes.len(), 3);
    }

    // add_event appends to the span's event list.
    #[test]
    fn test_span_events() {
        let mut span = Span::new("test", SpanKind::Internal);
        span.add_event("event1");
        span.add_event("event2");

        assert_eq!(span.events.len(), 2);
    }

    // SpanBuilder wires kind, parent linkage, and attributes.
    #[test]
    fn test_span_builder() {
        let parent = Span::new("parent", SpanKind::Server);
        let span = SpanBuilder::new("child")
            .kind(SpanKind::Client)
            .parent(&parent)
            .attribute("key", "value")
            .build();

        assert_eq!(span.trace_id, parent.trace_id);
        assert_eq!(span.kind, SpanKind::Client);
        assert!(span.attributes.contains_key("key"));
    }
2818
    // Registered counter/gauge names appear in the rendered exposition text.
    #[test]
    fn test_prometheus_exporter() {
        let exporter = PrometheusExporter::new();
        exporter.register_counter("test_counter", "A test counter", &["label1"]);
        exporter.register_gauge("test_gauge", "A test gauge", &[]);

        exporter.inc_counter("test_counter", &["value1"]);
        exporter.inc_counter("test_counter", &["value1"]);
        exporter.set_metric("test_gauge", 42.0, &[]);

        let output = exporter.render();
        assert!(output.contains("test_counter"));
        assert!(output.contains("test_gauge"));
    }

    // Built dashboard JSON contains the title and the added panels.
    #[test]
    fn test_grafana_dashboard() {
        let dashboard = GrafanaDashboard::new("Test Dashboard")
            .description("A test dashboard")
            .add_throughput_panel()
            .add_latency_panel()
            .build();

        assert!(dashboard.contains("Test Dashboard"));
        assert!(dashboard.contains("Message Throughput"));
        assert!(dashboard.contains("Message Latency"));
    }

    // Spans move from active to exported when ended.
    #[test]
    fn test_observability_context() {
        let ctx = ObservabilityContext::new();

        let span = ctx.start_span("test", SpanKind::Internal);
        assert_eq!(ctx.active_span_count(), 1);

        ctx.end_span(span);
        assert_eq!(ctx.active_span_count(), 0);

        let exported = ctx.export_spans();
        assert_eq!(exported.len(), 1);
    }

    // The Prometheus bridge over MetricsCollector yields definitions and samples.
    #[test]
    fn test_ringkernel_collector() {
        let collector = Arc::new(MetricsCollector::new());
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);

        let prom_collector = RingKernelCollector::new(collector);
        let defs = prom_collector.definitions();
        let samples = prom_collector.collect();

        assert!(!defs.is_empty());
        assert!(!samples.is_empty());
    }
2876
    // Constructed colors keep their channels; alpha defaults to opaque.
    #[test]
    fn test_profiler_color() {
        let color = ProfilerColor::new(128, 64, 32);
        assert_eq!(color.r, 128);
        assert_eq!(color.g, 64);
        assert_eq!(color.b, 32);
        assert_eq!(color.a, 255);

        assert_eq!(ProfilerColor::RED.r, 255);
        assert_eq!(ProfilerColor::GREEN.g, 255);
        assert_eq!(ProfilerColor::BLUE.b, 255);
    }

    // NullProfiler accepts every call as a no-op and reports Custom backend.
    #[test]
    fn test_null_profiler() {
        let profiler = NullProfiler;
        assert!(!profiler.is_available());
        assert_eq!(profiler.backend(), GpuProfilerBackend::Custom);

        assert!(profiler.start_capture().is_ok());
        assert!(profiler.end_capture().is_ok());
        assert!(profiler.trigger_capture().is_ok());

        let range = profiler.push_range("test", ProfilerColor::RED);
        let _elapsed = range.elapsed();
        profiler.pop_range();
        profiler.mark("marker", ProfilerColor::BLUE);
        profiler.set_thread_name("thread");
    }

    // Without NVTX loaded the Nsight profiler is unavailable.
    #[test]
    fn test_nvtx_profiler_stub() {
        let profiler = NvtxProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::Nsight);

        assert!(!profiler.is_available());
        assert!(!profiler.is_nvtx_loaded());

        assert!(matches!(
            profiler.start_capture(),
            Err(ProfilerError::NotAvailable)
        ));
    }

    // Without RenderDoc attached the profiler reports NotAttached.
    #[test]
    fn test_renderdoc_profiler_stub() {
        let profiler = RenderDocProfiler::new();
        assert_eq!(profiler.backend(), GpuProfilerBackend::RenderDoc);

        assert!(!profiler.is_available());
        assert!(!profiler.is_attached());
        assert!(profiler.get_capture_path().is_none());

        assert!(matches!(
            profiler.launch_ui(),
            Err(ProfilerError::NotAttached)
        ));
    }

    // The manager starts disabled and toggles via set_enabled.
    #[test]
    fn test_gpu_profiler_manager() {
        let manager = GpuProfilerManager::new();

        assert!(!manager.is_enabled());
        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);

        manager.set_enabled(true);
        assert!(manager.is_enabled());
        manager.set_enabled(false);
        assert!(!manager.is_enabled());
    }

    // Scopes and marks are safe to use while the manager is disabled.
    #[test]
    fn test_profiler_scope() {
        let manager = GpuProfilerManager::new();

        {
            let _scope = manager.scope("test_scope");
        }

        {
            let _scope = manager.scope_colored("colored_scope", ProfilerColor::ORANGE);
        }

        manager.mark("test_marker");
    }

    // A caller-supplied profiler is used and reported as Custom.
    #[test]
    fn test_profiler_with_custom() {
        let custom_profiler = Arc::new(NullProfiler);
        let manager = GpuProfilerManager::with_profiler(custom_profiler);

        assert_eq!(manager.backend(), GpuProfilerBackend::Custom);
    }

    // Range elapsed time tracks real wall-clock time.
    #[test]
    fn test_profiler_range_elapsed() {
        let range = ProfilerRange::new("test", GpuProfilerBackend::Custom);
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = range.elapsed();
        assert!(elapsed.as_millis() >= 10);
    }

    // Each error variant's Display output mentions its condition.
    #[test]
    fn test_profiler_error_display() {
        let err = ProfilerError::NotAvailable;
        assert!(err.to_string().contains("not available"));

        let err = ProfilerError::NotAttached;
        assert!(err.to_string().contains("not attached"));

        let err = ProfilerError::CaptureInProgress;
        assert!(err.to_string().contains("in progress"));

        let err = ProfilerError::Backend("test error".to_string());
        assert!(err.to_string().contains("test error"));
    }
3006
3007 #[test]
3010 fn test_gpu_memory_dashboard_creation() {
3011 let dashboard = GpuMemoryDashboard::new();
3012 assert_eq!(dashboard.total_allocated(), 0);
3013 assert_eq!(dashboard.peak_allocated(), 0);
3014 assert_eq!(dashboard.allocation_count(), 0);
3015 }
3016
3017 #[test]
3018 fn test_gpu_memory_allocation_tracking() {
3019 let dashboard = GpuMemoryDashboard::new();
3020
3021 dashboard.track_allocation(
3023 1,
3024 "test_buffer",
3025 65536,
3026 GpuMemoryType::DeviceLocal,
3027 0,
3028 Some("test_kernel"),
3029 );
3030
3031 assert_eq!(dashboard.total_allocated(), 65536);
3032 assert_eq!(dashboard.peak_allocated(), 65536);
3033 assert_eq!(dashboard.allocation_count(), 1);
3034
3035 dashboard.track_allocation(
3037 2,
3038 "queue_buffer",
3039 1024,
3040 GpuMemoryType::QueueBuffer,
3041 0,
3042 Some("test_kernel"),
3043 );
3044
3045 assert_eq!(dashboard.total_allocated(), 66560);
3046 assert_eq!(dashboard.peak_allocated(), 66560);
3047 assert_eq!(dashboard.allocation_count(), 2);
3048
3049 dashboard.track_deallocation(1);
3051 assert_eq!(dashboard.total_allocated(), 1024);
3052 assert_eq!(dashboard.peak_allocated(), 66560); assert_eq!(dashboard.allocation_count(), 1);
3054 }
3055
3056 #[test]
3057 fn test_gpu_memory_device_stats() {
3058 let dashboard = GpuMemoryDashboard::new();
3059
3060 dashboard.register_device(0, "NVIDIA RTX 4090", 24 * 1024 * 1024 * 1024); let stats = dashboard.get_device_stats(0).unwrap();
3064 assert_eq!(stats.device_index, 0);
3065 assert_eq!(stats.device_name, "NVIDIA RTX 4090");
3066 assert_eq!(stats.total_memory, 24 * 1024 * 1024 * 1024);
3067 assert_eq!(stats.utilization(), 0.0);
3068
3069 let used = 8 * 1024 * 1024 * 1024; let free = 16 * 1024 * 1024 * 1024; dashboard.update_device_stats(0, free, used);
3073
3074 let stats = dashboard.get_device_stats(0).unwrap();
3075 assert!(stats.utilization() > 30.0 && stats.utilization() < 35.0);
3076 }
3077
3078 #[test]
3079 fn test_gpu_memory_pressure_levels() {
3080 let dashboard = GpuMemoryDashboard::new();
3081
3082 dashboard.register_device(0, "Test GPU", 1024 * 1024 * 1024);
3084
3085 dashboard.update_device_stats(0, 512 * 1024 * 1024, 256 * 1024 * 1024);
3087 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Normal);
3088
3089 dashboard.update_device_stats(0, 200 * 1024 * 1024, 600 * 1024 * 1024);
3091 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Warning);
3092
3093 dashboard.update_device_stats(0, 50 * 1024 * 1024, 900 * 1024 * 1024);
3095 assert_eq!(dashboard.check_pressure(0), MemoryPressureLevel::Critical);
3096
3097 dashboard.update_device_stats(0, 0, 1024 * 1024 * 1024);
3099 assert_eq!(
3100 dashboard.check_pressure(0),
3101 MemoryPressureLevel::OutOfMemory
3102 );
3103 }
3104
3105 #[test]
3106 fn test_gpu_memory_kernel_allocations() {
3107 let dashboard = GpuMemoryDashboard::new();
3108
3109 dashboard.track_allocation(
3111 1,
3112 "buf1",
3113 1000,
3114 GpuMemoryType::DeviceLocal,
3115 0,
3116 Some("kernel_a"),
3117 );
3118 dashboard.track_allocation(
3119 2,
3120 "buf2",
3121 2000,
3122 GpuMemoryType::DeviceLocal,
3123 0,
3124 Some("kernel_a"),
3125 );
3126 dashboard.track_allocation(
3127 3,
3128 "buf3",
3129 3000,
3130 GpuMemoryType::DeviceLocal,
3131 0,
3132 Some("kernel_b"),
3133 );
3134
3135 let kernel_a_allocs = dashboard.get_kernel_allocations("kernel_a");
3136 assert_eq!(kernel_a_allocs.len(), 2);
3137
3138 let kernel_b_allocs = dashboard.get_kernel_allocations("kernel_b");
3139 assert_eq!(kernel_b_allocs.len(), 1);
3140
3141 let kernel_c_allocs = dashboard.get_kernel_allocations("kernel_c");
3142 assert_eq!(kernel_c_allocs.len(), 0);
3143 }
3144
3145 #[test]
3146 fn test_gpu_memory_prometheus_metrics() {
3147 let dashboard = GpuMemoryDashboard::new();
3148 dashboard.track_allocation(1, "buf", 1000, GpuMemoryType::DeviceLocal, 0, None);
3149 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3150
3151 let metrics = dashboard.prometheus_metrics();
3152 assert!(metrics.contains("ringkernel_gpu_memory_allocated_bytes"));
3153 assert!(metrics.contains("ringkernel_gpu_memory_peak_bytes"));
3154 assert!(metrics.contains("ringkernel_gpu_memory_allocation_count"));
3155 }
3156
3157 #[test]
3158 fn test_gpu_memory_summary_report() {
3159 let dashboard = GpuMemoryDashboard::new();
3160 dashboard.track_allocation(
3161 1,
3162 "large_buffer",
3163 1024 * 1024,
3164 GpuMemoryType::DeviceLocal,
3165 0,
3166 None,
3167 );
3168 dashboard.register_device(0, "GPU0", 1024 * 1024 * 1024);
3169
3170 let report = dashboard.summary_report();
3171 assert!(report.contains("GPU Memory Dashboard"));
3172 assert!(report.contains("large_buffer"));
3173 }
3174
3175 #[test]
3176 fn test_gpu_memory_pool_stats() {
3177 let pool_stats = GpuMemoryPoolStats {
3178 name: "default".to_string(),
3179 capacity: 1024 * 1024,
3180 allocated: 512 * 1024,
3181 peak_allocated: 768 * 1024,
3182 allocation_count: 10,
3183 total_allocations: 100,
3184 total_deallocations: 90,
3185 fragmentation: 0.1,
3186 };
3187
3188 assert!(pool_stats.utilization() > 49.0 && pool_stats.utilization() < 51.0);
3189 }
3190
3191 #[test]
3192 fn test_gpu_memory_types() {
3193 let types = [
3195 GpuMemoryType::DeviceLocal,
3196 GpuMemoryType::HostVisible,
3197 GpuMemoryType::HostCoherent,
3198 GpuMemoryType::Mapped,
3199 GpuMemoryType::QueueBuffer,
3200 GpuMemoryType::ControlBlock,
3201 GpuMemoryType::SharedMemory,
3202 ];
3203
3204 for (i, t1) in types.iter().enumerate() {
3205 for (j, t2) in types.iter().enumerate() {
3206 if i != j {
3207 assert_ne!(t1, t2);
3208 }
3209 }
3210 }
3211 }
3212
3213 #[test]
3214 fn test_gpu_memory_grafana_panel() {
3215 let dashboard = GpuMemoryDashboard::new();
3216 let panel = dashboard.grafana_panel();
3217
3218 assert_eq!(panel.title, "GPU Memory Usage");
3219 assert_eq!(panel.panel_type, PanelType::BarGauge);
3220 assert!(!panel.queries.is_empty());
3221 }
3222
3223 #[test]
3224 fn test_gpu_memory_allocation_id_generation() {
3225 let dashboard = GpuMemoryDashboard::new();
3226
3227 let id1 = dashboard.next_allocation_id();
3228 let id2 = dashboard.next_allocation_id();
3229 let id3 = dashboard.next_allocation_id();
3230
3231 assert_eq!(id1, 1);
3232 assert_eq!(id2, 2);
3233 assert_eq!(id3, 3);
3234 }
3235
3236 #[test]
3239 fn test_otlp_config_default() {
3240 let config = OtlpConfig::default();
3241 assert_eq!(config.endpoint, "http://localhost:4318/v1/traces");
3242 assert_eq!(config.transport, OtlpTransport::HttpJson);
3243 assert_eq!(config.service_name, "ringkernel");
3244 assert_eq!(config.batch_size, 512);
3245 }
3246
3247 #[test]
3248 fn test_otlp_config_builder() {
3249 let config = OtlpConfig::new("http://example.com/v1/traces")
3250 .with_service_name("my-service")
3251 .with_service_version("1.0.0")
3252 .with_instance_id("instance-1")
3253 .with_attribute("env", "production")
3254 .with_batch_size(100);
3255
3256 assert_eq!(config.endpoint, "http://example.com/v1/traces");
3257 assert_eq!(config.service_name, "my-service");
3258 assert_eq!(config.service_version, "1.0.0");
3259 assert_eq!(config.service_instance_id, Some("instance-1".to_string()));
3260 assert_eq!(config.resource_attributes.len(), 1);
3261 assert_eq!(config.batch_size, 100);
3262 }
3263
3264 #[test]
3265 fn test_otlp_config_jaeger() {
3266 let config = OtlpConfig::jaeger("http://jaeger:4318/v1/traces");
3267 assert_eq!(config.endpoint, "http://jaeger:4318/v1/traces");
3268 assert_eq!(config.service_name, "ringkernel");
3269 }
3270
3271 #[test]
3272 fn test_otlp_config_honeycomb() {
3273 let config = OtlpConfig::honeycomb("my-api-key");
3274 assert_eq!(config.endpoint, "https://api.honeycomb.io/v1/traces");
3275 assert_eq!(
3276 config.authorization,
3277 Some("x-honeycomb-team my-api-key".to_string())
3278 );
3279 }
3280
3281 #[test]
3282 fn test_otlp_exporter_creation() {
3283 let exporter = OtlpExporter::new(OtlpConfig::default());
3284 assert_eq!(exporter.buffered_count(), 0);
3285 assert_eq!(exporter.config().service_name, "ringkernel");
3286 }
3287
3288 #[test]
3289 fn test_otlp_exporter_jaeger_local() {
3290 let exporter = OtlpExporter::jaeger_local();
3291 assert_eq!(
3292 exporter.config().endpoint,
3293 "http://localhost:4318/v1/traces"
3294 );
3295 }
3296
3297 #[test]
3298 fn test_otlp_exporter_buffering() {
3299 let config = OtlpConfig::default().with_batch_size(10);
3300 let exporter = OtlpExporter::new(config);
3301
3302 let span = Span::new("test_span", SpanKind::Internal);
3304
3305 for _ in 0..5 {
3307 exporter.export_span(span.clone());
3308 }
3309
3310 assert_eq!(exporter.buffered_count(), 5);
3311 }
3312
3313 #[test]
3314 fn test_otlp_exporter_flush_empty() {
3315 let exporter = OtlpExporter::new(OtlpConfig::default());
3316
3317 let result = exporter.flush();
3318 assert!(result.success);
3319 assert_eq!(result.spans_exported, 0);
3320 }
3321
3322 #[test]
3323 fn test_otlp_exporter_stats() {
3324 let exporter = OtlpExporter::new(OtlpConfig::default());
3325
3326 let stats = exporter.stats();
3328 assert_eq!(stats.total_exports, 0);
3329 assert_eq!(stats.total_spans_exported, 0);
3330 assert_eq!(stats.buffered_spans, 0);
3331 }
3332
3333 #[test]
3334 fn test_otlp_transport_default() {
3335 let transport = OtlpTransport::default();
3336 assert_eq!(transport, OtlpTransport::HttpJson);
3337 }
3338}