use crate::domain::Domain;
use crate::hlc::{HlcClock, HlcTimestamp};
use crate::message::MessageEnvelope;
use crate::types::{BlockId, Dim3, FenceScope, GlobalThreadId, MemoryOrder, ThreadId, WarpId};
use tokio::sync::mpsc;

/// The kind of measurement carried by a [`MetricsEntry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricType {
    /// Duration of an operation, in microseconds.
    Latency,
    /// Number of items processed.
    Throughput,
    /// Monotonically increasing count.
    Counter,
    /// Point-in-time value that may rise or fall.
    Gauge,
}

/// A single metric measurement recorded from within a kernel.
#[derive(Debug, Clone)]
pub struct MetricsEntry {
    /// Name of the operation being measured.
    pub operation: String,
    /// What kind of measurement this is.
    pub metric_type: MetricType,
    /// Measured value; units depend on `metric_type`.
    pub value: u64,
    /// HLC timestamp at which the entry was recorded.
    pub timestamp: HlcTimestamp,
    /// Id of the kernel that produced the entry.
    pub kernel_id: u64,
    /// Domain the kernel was operating in, if any.
    pub domain: Option<Domain>,
}

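/// Bounded staging buffer for metrics recorded inside a kernel context.
///
/// Entries recorded once the buffer is full are silently dropped, so callers
/// should drain it periodically. A minimal usage sketch, mirroring the unit
/// tests below (marked `ignore` because the crate-level import path is
/// assumed here, not taken from the source):
///
/// ```rust,ignore
/// let mut buf = ContextMetricsBuffer::new(2);
/// let entry = MetricsEntry {
///     operation: "demo".to_string(),
///     metric_type: MetricType::Counter,
///     value: 1,
///     timestamp: HlcTimestamp::zero(),
///     kernel_id: 0,
///     domain: None,
/// };
/// buf.record(entry.clone());
/// buf.record(entry.clone());
/// buf.record(entry); // dropped: the buffer already holds two entries
/// assert!(buf.is_full());
/// assert_eq!(buf.drain().len(), 2);
/// assert!(buf.is_empty());
/// ```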
#[derive(Debug)]
pub struct ContextMetricsBuffer {
    entries: Vec<MetricsEntry>,
    capacity: usize,
}

impl ContextMetricsBuffer {
    /// Creates a buffer holding at most `capacity` entries. The initial
    /// allocation is capped at 1024 entries so very large capacities do
    /// not eagerly reserve memory.
    pub fn new(capacity: usize) -> Self {
        Self {
            entries: Vec::with_capacity(capacity.min(1024)),
            capacity,
        }
    }

    /// Records an entry, silently dropping it if the buffer is full.
    pub fn record(&mut self, entry: MetricsEntry) {
        if self.entries.len() < self.capacity {
            self.entries.push(entry);
        }
    }

    /// Takes all buffered entries, leaving the buffer empty.
    pub fn drain(&mut self) -> Vec<MetricsEntry> {
        std::mem::take(&mut self.entries)
    }

    /// Returns `true` once the buffer has reached capacity.
    pub fn is_full(&self) -> bool {
        self.entries.len() >= self.capacity
    }

    /// Number of buffered entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` if no entries are buffered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}

impl Default for ContextMetricsBuffer {
    fn default() -> Self {
        Self::new(256)
    }
}

/// Severity of a [`KernelAlert`], ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info = 0,
    Warning = 1,
    Error = 2,
    Critical = 3,
}

/// Category of condition that triggered a [`KernelAlert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KernelAlertType {
    /// An operation exceeded its latency threshold.
    HighLatency,
    /// A queue is nearing capacity.
    QueuePressure,
    /// Memory usage is nearing a limit.
    MemoryPressure,
    /// An error occurred while processing.
    ProcessingError,
    /// Domain-specific alert, identified by a numeric code.
    DomainAlert(u32),
    /// User-defined alert, identified by a numeric code.
    Custom(u32),
}

/// Destination for a [`KernelAlert`].
#[derive(Debug, Clone, Copy, Default)]
pub enum AlertRouting {
    /// Route to the host (the default).
    #[default]
    Host,
    /// Route to another kernel by id.
    Kernel(u64),
    /// Route to the kernel's domain.
    Domain,
    /// Route to an external sink.
    External,
}

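/// An alert raised from inside a kernel and delivered over the context's
/// alert channel.
///
/// Build one with [`KernelAlert::new`] or a convenience constructor;
/// `source_kernel`, `source_domain`, and `timestamp` are stamped by
/// `RingContext::emit_alert` at send time. A small sketch (`ignore`d;
/// assumes these types are in scope):
///
/// ```rust,ignore
/// let alert = KernelAlert::queue_pressure("input queue", 85)
///     .with_routing(AlertRouting::Kernel(7));
/// assert_eq!(alert.severity, AlertSeverity::Warning);
/// ```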
#[derive(Debug, Clone)]
pub struct KernelAlert {
    pub severity: AlertSeverity,
    pub alert_type: KernelAlertType,
    pub message: String,
    /// Id of the emitting kernel; stamped by `emit_alert`.
    pub source_kernel: u64,
    /// Domain of the emitting kernel; stamped by `emit_alert`.
    pub source_domain: Option<Domain>,
    /// HLC timestamp of emission; stamped by `emit_alert`.
    pub timestamp: HlcTimestamp,
    pub routing: AlertRouting,
}

impl KernelAlert {
    /// Creates an alert with default routing and unset source fields; the
    /// source kernel, domain, and timestamp are filled in on emission.
    pub fn new(
        severity: AlertSeverity,
        alert_type: KernelAlertType,
        message: impl Into<String>,
    ) -> Self {
        Self {
            severity,
            alert_type,
            message: message.into(),
            source_kernel: 0,
            source_domain: None,
            timestamp: HlcTimestamp::zero(),
            routing: AlertRouting::default(),
        }
    }

    /// Warning-severity alert for an operation that exceeded its latency
    /// budget; the latency is appended to the message.
    pub fn high_latency(message: impl Into<String>, latency_us: u64) -> Self {
        Self::new(
            AlertSeverity::Warning,
            KernelAlertType::HighLatency,
            format!("{} ({}µs)", message.into(), latency_us),
        )
    }

    /// Error-severity alert for a processing failure.
    pub fn error(message: impl Into<String>) -> Self {
        Self::new(
            AlertSeverity::Error,
            KernelAlertType::ProcessingError,
            message,
        )
    }

    /// Warning-severity alert for a queue nearing capacity; the utilization
    /// percentage is appended to the message.
    pub fn queue_pressure(message: impl Into<String>, utilization_pct: u32) -> Self {
        Self::new(
            AlertSeverity::Warning,
            KernelAlertType::QueuePressure,
            format!("{} ({}% full)", message.into(), utilization_pct),
        )
    }

    /// Overrides the default routing.
    pub fn with_routing(mut self, routing: AlertRouting) -> Self {
        self.routing = routing;
        self
    }
}

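/// Per-thread execution context handed to ring kernels.
///
/// Carries thread/block/grid identity, a hybrid logical clock, backend
/// information, and optional domain, metrics, and alerting facilities.
/// A minimal CPU-backend construction, mirroring the unit tests below
/// (`ignore`d; assumes the types are in scope):
///
/// ```rust,ignore
/// let clock = HlcClock::new(1);
/// let ctx = RingContext::new(
///     ThreadId::new_1d(0),
///     BlockId::new_1d(0),
///     Dim3::new_1d(256), // block dimensions
///     Dim3::new_1d(1),   // grid dimensions
///     &clock,
///     1, // kernel id
///     ContextBackend::Cpu,
/// );
/// assert_eq!(ctx.kernel_id(), 1);
/// ```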
pub struct RingContext<'a> {
    /// This thread's position within its block.
    pub thread_id: ThreadId,
    /// This block's position within the grid.
    pub block_id: BlockId,
    /// Dimensions of each block.
    pub block_dim: Dim3,
    /// Dimensions of the grid.
    pub grid_dim: Dim3,
    clock: &'a HlcClock,
    kernel_id: u64,
    backend: ContextBackend,
    domain: Option<Domain>,
    metrics_buffer: ContextMetricsBuffer,
    alert_sender: Option<mpsc::UnboundedSender<KernelAlert>>,
}

/// Compute backend the context is executing on.
#[derive(Debug, Clone)]
pub enum ContextBackend {
    Cpu,
    Cuda,
    Metal,
    Wgpu,
}

impl<'a> RingContext<'a> {
    /// Creates a context with no domain, a default-capacity metrics buffer,
    /// and no alert channel.
    pub fn new(
        thread_id: ThreadId,
        block_id: BlockId,
        block_dim: Dim3,
        grid_dim: Dim3,
        clock: &'a HlcClock,
        kernel_id: u64,
        backend: ContextBackend,
    ) -> Self {
        Self {
            thread_id,
            block_id,
            block_dim,
            grid_dim,
            clock,
            kernel_id,
            backend,
            domain: None,
            metrics_buffer: ContextMetricsBuffer::default(),
            alert_sender: None,
        }
    }

    /// Creates a context with an optional domain, a custom metrics-buffer
    /// capacity, and an optional alert channel.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_options(
        thread_id: ThreadId,
        block_id: BlockId,
        block_dim: Dim3,
        grid_dim: Dim3,
        clock: &'a HlcClock,
        kernel_id: u64,
        backend: ContextBackend,
        domain: Option<Domain>,
        metrics_capacity: usize,
        alert_sender: Option<mpsc::UnboundedSender<KernelAlert>>,
    ) -> Self {
        Self {
            thread_id,
            block_id,
            block_dim,
            grid_dim,
            clock,
            kernel_id,
            backend,
            domain,
            metrics_buffer: ContextMetricsBuffer::new(metrics_capacity),
            alert_sender,
        }
    }

    /// This thread's position within its block.
    #[inline]
    pub fn thread_id(&self) -> ThreadId {
        self.thread_id
    }

    /// This block's position within the grid.
    #[inline]
    pub fn block_id(&self) -> BlockId {
        self.block_id
    }

    /// Globally unique thread id derived from block and thread position.
    #[inline]
    pub fn global_thread_id(&self) -> GlobalThreadId {
        GlobalThreadId::from_block_thread(self.block_id, self.thread_id, self.block_dim)
    }

    /// Warp this thread belongs to, based on its linear index in the block.
    #[inline]
    pub fn warp_id(&self) -> WarpId {
        let linear = self
            .thread_id
            .linear_for_dim(self.block_dim.x, self.block_dim.y);
        WarpId::from_thread_linear(linear)
    }

    /// This thread's lane within its warp.
    #[inline]
    pub fn lane_id(&self) -> u32 {
        let linear = self
            .thread_id
            .linear_for_dim(self.block_dim.x, self.block_dim.y);
        WarpId::lane_id(linear)
    }

    /// Dimensions of each block.
    #[inline]
    pub fn block_dim(&self) -> Dim3 {
        self.block_dim
    }

    /// Dimensions of the grid.
    #[inline]
    pub fn grid_dim(&self) -> Dim3 {
        self.grid_dim
    }

    /// Id of the kernel this context belongs to.
    #[inline]
    pub fn kernel_id(&self) -> u64 {
        self.kernel_id
    }

    /// Synchronizes all threads in the block.
    #[inline]
    pub fn sync_threads(&self) {
        match self.backend {
            ContextBackend::Cpu => {
                // No-op on the CPU backend.
            }
            _ => {
                // Currently a no-op for non-CPU backends.
            }
        }
    }

    /// Synchronizes all threads in the grid.
    #[inline]
    pub fn sync_grid(&self) {
        match self.backend {
            ContextBackend::Cpu => {
                // No-op on the CPU backend.
            }
            _ => {
                // Currently a no-op for non-CPU backends.
            }
        }
    }

    /// Synchronizes the threads of the current warp.
    #[inline]
    pub fn sync_warp(&self) {
        match self.backend {
            ContextBackend::Cpu => {
                // No-op on the CPU backend.
            }
            _ => {
                // Currently a no-op for non-CPU backends.
            }
        }
    }

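    /// Issues a memory fence for the given scope.
    ///
    /// A sketch of the classic publish pattern this supports: write data,
    /// fence, then raise a flag (`ignore`d; `data_slot` and `flag` are
    /// hypothetical `AtomicU64`s, not part of this API):
    ///
    /// ```rust,ignore
    /// data_slot.store(payload, Ordering::Relaxed);
    /// ctx.fence_device(); // publish the payload before raising the flag
    /// flag.store(1, Ordering::Relaxed);
    /// ```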
    #[inline]
    pub fn thread_fence(&self, scope: FenceScope) {
        match (&self.backend, scope) {
            (ContextBackend::Cpu, _) => {
                // Conservative: every scope maps to a `SeqCst` fence on CPU.
                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
            }
            _ => {
                // Currently a no-op for non-CPU backends.
            }
        }
    }

    /// Fence with thread scope.
    #[inline]
    pub fn fence_thread(&self) {
        self.thread_fence(FenceScope::Thread);
    }

    /// Fence with block scope.
    #[inline]
    pub fn fence_block(&self) {
        self.thread_fence(FenceScope::Block);
    }

    /// Fence with device scope.
    #[inline]
    pub fn fence_device(&self) {
        self.thread_fence(FenceScope::Device);
    }

    /// Fence with system scope.
    #[inline]
    pub fn fence_system(&self) {
        self.thread_fence(FenceScope::System);
    }

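    /// Reads the current hybrid logical clock value without advancing it.
    ///
    /// Use [`tick`](Self::tick) to stamp a local event and
    /// [`update_clock`](Self::update_clock) to merge a timestamp from a
    /// received message. Sketch (`ignore`d; `remote_ts` is a hypothetical
    /// received timestamp):
    ///
    /// ```rust,ignore
    /// let t1 = ctx.tick();                    // stamp a local send event
    /// let t2 = ctx.update_clock(&remote_ts)?; // merge a received timestamp
    /// assert!(t2 >= t1);
    /// ```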
    #[inline]
    pub fn now(&self) -> HlcTimestamp {
        self.clock.now()
    }

    /// Advances the clock for a local event and returns the new timestamp.
    #[inline]
    pub fn tick(&self) -> HlcTimestamp {
        self.clock.tick()
    }

    /// Merges a received timestamp into the clock, returning the merged
    /// timestamp.
    #[inline]
    pub fn update_clock(&self, received: &HlcTimestamp) -> crate::error::Result<HlcTimestamp> {
        self.clock.update(received)
    }

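    /// Atomically adds `val` to `*ptr`, returning the previous value.
    ///
    /// `MemoryOrder` maps one-to-one onto `std::sync::atomic::Ordering`.
    /// Sketch (`ignore`d; assumes a context `ctx` is in scope):
    ///
    /// ```rust,ignore
    /// use std::sync::atomic::{AtomicU64, Ordering};
    ///
    /// let counter = AtomicU64::new(0);
    /// let prev = ctx.atomic_add(&counter, 5, MemoryOrder::Relaxed);
    /// assert_eq!(prev, 0);
    /// assert_eq!(counter.load(Ordering::Relaxed), 5);
    /// ```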
    #[inline]
    pub fn atomic_add(
        &self,
        ptr: &std::sync::atomic::AtomicU64,
        val: u64,
        order: MemoryOrder,
    ) -> u64 {
        ptr.fetch_add(val, Self::to_ordering(order))
    }

    /// Atomic compare-and-swap; returns `Ok(previous)` on success and
    /// `Err(actual)` on failure, mirroring `compare_exchange`.
    #[inline]
    pub fn atomic_cas(
        &self,
        ptr: &std::sync::atomic::AtomicU64,
        expected: u64,
        desired: u64,
        success: MemoryOrder,
        failure: MemoryOrder,
    ) -> Result<u64, u64> {
        ptr.compare_exchange(
            expected,
            desired,
            Self::to_ordering(success),
            Self::to_ordering(failure),
        )
    }

    /// Atomically replaces `*ptr` with `val`, returning the previous value.
    #[inline]
    pub fn atomic_exchange(
        &self,
        ptr: &std::sync::atomic::AtomicU64,
        val: u64,
        order: MemoryOrder,
    ) -> u64 {
        ptr.swap(val, Self::to_ordering(order))
    }

    /// Maps a portable [`MemoryOrder`] onto `std::sync::atomic::Ordering`.
    #[inline]
    fn to_ordering(order: MemoryOrder) -> std::sync::atomic::Ordering {
        match order {
            MemoryOrder::Relaxed => std::sync::atomic::Ordering::Relaxed,
            MemoryOrder::Acquire => std::sync::atomic::Ordering::Acquire,
            MemoryOrder::Release => std::sync::atomic::Ordering::Release,
            MemoryOrder::AcquireRelease => std::sync::atomic::Ordering::AcqRel,
            MemoryOrder::SeqCst => std::sync::atomic::Ordering::SeqCst,
        }
    }

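    /// Reads `value` from another lane of the warp.
    ///
    /// On the CPU backend a warp is a single lane, so every shuffle returns
    /// the caller's own `value`. A warp-sum sketch using the shuffle-down
    /// idiom (`ignore`d; assumes GPU semantics and a numeric `local_value`):
    ///
    /// ```rust,ignore
    /// let mut sum = local_value;
    /// let mut offset = 16; // half of a 32-lane warp
    /// while offset > 0 {
    ///     sum += ctx.warp_shuffle_down(sum, offset);
    ///     offset /= 2;
    /// }
    /// // lane 0 now holds the warp-wide sum
    /// ```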
    #[inline]
    pub fn warp_shuffle<T: Copy>(&self, value: T, src_lane: u32) -> T {
        match self.backend {
            ContextBackend::Cpu => {
                // Single-lane warp on CPU: a shuffle returns our own value.
                let _ = src_lane;
                value
            }
            _ => {
                // Currently a host-side identity; the real exchange happens
                // in device code on GPU backends.
                let _ = src_lane;
                value
            }
        }
    }

    /// Reads `value` from lane `lane_id() + delta` (saturating).
    #[inline]
    pub fn warp_shuffle_down<T: Copy>(&self, value: T, delta: u32) -> T {
        self.warp_shuffle(value, self.lane_id().saturating_add(delta))
    }

    /// Reads `value` from lane `lane_id() - delta` (saturating).
    #[inline]
    pub fn warp_shuffle_up<T: Copy>(&self, value: T, delta: u32) -> T {
        self.warp_shuffle(value, self.lane_id().saturating_sub(delta))
    }

    /// Reads `value` from the lane at this lane's index XOR `mask`.
    #[inline]
    pub fn warp_shuffle_xor<T: Copy>(&self, value: T, mask: u32) -> T {
        self.warp_shuffle(value, self.lane_id() ^ mask)
    }

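    /// Returns a bitmask of the warp lanes whose `predicate` is true.
    ///
    /// On the CPU backend the "warp" is a single lane, so the result is
    /// simply `1` or `0`. A lane-counting sketch (`ignore`d; `value` and
    /// `threshold` are illustrative):
    ///
    /// ```rust,ignore
    /// let mask = ctx.warp_ballot(value > threshold);
    /// let active_lanes = mask.count_ones();
    /// ```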
    #[inline]
    pub fn warp_ballot(&self, predicate: bool) -> u32 {
        match self.backend {
            ContextBackend::Cpu => {
                // Single-lane warp on CPU: the mask is just bit 0.
                if predicate {
                    1
                } else {
                    0
                }
            }
            _ => {
                // Host-side approximation: only this lane's bit is set.
                if predicate {
                    1 << self.lane_id()
                } else {
                    0
                }
            }
        }
    }

    /// Returns `true` if `predicate` holds on every lane of the warp.
    #[inline]
    pub fn warp_all(&self, predicate: bool) -> bool {
        match self.backend {
            ContextBackend::Cpu => predicate,
            _ => {
                // Host-side approximation: only this lane is visible.
                predicate
            }
        }
    }

    /// Returns `true` if `predicate` holds on any lane of the warp.
    #[inline]
    pub fn warp_any(&self, predicate: bool) -> bool {
        match self.backend {
            ContextBackend::Cpu => predicate,
            _ => {
                // Host-side approximation: only this lane is visible.
                predicate
            }
        }
    }

    /// Sends an envelope to another kernel. This standalone context has no
    /// runtime attached, so the call always fails with `NotSupported`.
    #[inline]
    pub fn k2k_send(
        &self,
        _target_kernel: u64,
        _envelope: &MessageEnvelope,
    ) -> crate::error::Result<()> {
        Err(crate::error::RingKernelError::NotSupported(
            "K2K messaging requires runtime".to_string(),
        ))
    }

    /// Attempts to receive a kernel-to-kernel envelope. Like
    /// [`k2k_send`](Self::k2k_send), this requires a runtime and always
    /// fails with `NotSupported` here.
    #[inline]
    pub fn k2k_try_recv(&self) -> crate::error::Result<MessageEnvelope> {
        Err(crate::error::RingKernelError::NotSupported(
            "K2K messaging requires runtime".to_string(),
        ))
    }

    /// Domain this kernel is operating in, if one was assigned.
    #[inline]
    pub fn domain(&self) -> Option<&Domain> {
        self.domain.as_ref()
    }

    /// Assigns the kernel's domain.
    #[inline]
    pub fn set_domain(&mut self, domain: Domain) {
        self.domain = Some(domain);
    }

    /// Clears the kernel's domain.
    #[inline]
    pub fn clear_domain(&mut self) {
        self.domain = None;
    }

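    /// Records a latency measurement (in microseconds) into the context's
    /// metrics buffer.
    ///
    /// A typical pattern times an operation and records the result, e.g.
    /// (`ignore`d sketch; `process_order` and the `Instant`-based timing
    /// are illustrative assumptions, not part of this API):
    ///
    /// ```rust,ignore
    /// let start = std::time::Instant::now();
    /// process_order(&order);
    /// ctx.record_latency("process_order", start.elapsed().as_micros() as u64);
    /// if ctx.metrics_buffer_full() {
    ///     let batch = ctx.flush_metrics(); // hand the batch off to the host
    /// }
    /// ```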
    pub fn record_latency(&mut self, operation: &str, latency_us: u64) {
        let entry = MetricsEntry {
            operation: operation.to_string(),
            metric_type: MetricType::Latency,
            value: latency_us,
            timestamp: self.clock.now(),
            kernel_id: self.kernel_id,
            domain: self.domain,
        };
        self.metrics_buffer.record(entry);
    }

    /// Records a throughput measurement (items processed).
    pub fn record_throughput(&mut self, operation: &str, count: u64) {
        let entry = MetricsEntry {
            operation: operation.to_string(),
            metric_type: MetricType::Throughput,
            value: count,
            timestamp: self.clock.now(),
            kernel_id: self.kernel_id,
            domain: self.domain,
        };
        self.metrics_buffer.record(entry);
    }

    /// Records a counter increment.
    pub fn record_counter(&mut self, operation: &str, increment: u64) {
        let entry = MetricsEntry {
            operation: operation.to_string(),
            metric_type: MetricType::Counter,
            value: increment,
            timestamp: self.clock.now(),
            kernel_id: self.kernel_id,
            domain: self.domain,
        };
        self.metrics_buffer.record(entry);
    }

    /// Records a gauge reading.
    pub fn record_gauge(&mut self, operation: &str, value: u64) {
        let entry = MetricsEntry {
            operation: operation.to_string(),
            metric_type: MetricType::Gauge,
            value,
            timestamp: self.clock.now(),
            kernel_id: self.kernel_id,
            domain: self.domain,
        };
        self.metrics_buffer.record(entry);
    }

    /// Takes all buffered metrics, leaving the buffer empty.
    pub fn flush_metrics(&mut self) -> Vec<MetricsEntry> {
        self.metrics_buffer.drain()
    }

    /// Number of metrics currently buffered.
    pub fn metrics_count(&self) -> usize {
        self.metrics_buffer.len()
    }

    /// Returns `true` if the metrics buffer is full and new entries would
    /// be dropped.
    pub fn metrics_buffer_full(&self) -> bool {
        self.metrics_buffer.is_full()
    }

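    /// Sends an alert on the context's alert channel, stamping it with this
    /// kernel's id, domain, and current HLC timestamp.
    ///
    /// A silent no-op when no channel was configured (see
    /// [`new_with_options`](Self::new_with_options)). Sketch (`ignore`d;
    /// `depth_pct` is an illustrative queue-utilization value):
    ///
    /// ```rust,ignore
    /// if depth_pct >= 90 {
    ///     ctx.emit_alert(
    ///         KernelAlert::queue_pressure("input queue", depth_pct)
    ///             .with_routing(AlertRouting::Host),
    ///     );
    /// }
    /// ```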
    pub fn emit_alert(&self, alert: impl Into<KernelAlert>) {
        if let Some(ref sender) = self.alert_sender {
            let mut alert = alert.into();
            alert.source_kernel = self.kernel_id;
            alert.source_domain = self.domain;
            alert.timestamp = self.clock.now();
            // Delivery failure (receiver dropped) is intentionally ignored.
            let _ = sender.send(alert);
        }
    }

    /// Returns `true` if an alert channel is attached to this context.
    #[inline]
    pub fn has_alert_channel(&self) -> bool {
        self.alert_sender.is_some()
    }

    /// Emits a [`KernelAlert::high_latency`] alert when `latency_us`
    /// exceeds `threshold_us`.
    pub fn alert_if_slow(&self, operation: &str, latency_us: u64, threshold_us: u64) {
        if latency_us > threshold_us {
            self.emit_alert(KernelAlert::high_latency(
                format!("{} exceeded threshold", operation),
                latency_us,
            ));
        }
    }
}

impl<'a> std::fmt::Debug for RingContext<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RingContext")
            .field("thread_id", &self.thread_id)
            .field("block_id", &self.block_id)
            .field("block_dim", &self.block_dim)
            .field("grid_dim", &self.grid_dim)
            .field("kernel_id", &self.kernel_id)
            .field("backend", &self.backend)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn make_test_context(clock: &HlcClock) -> RingContext<'_> {
        RingContext::new(
            ThreadId::new_1d(0),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            clock,
            1,
            ContextBackend::Cpu,
        )
    }

    #[test]
    fn test_thread_identity() {
        let clock = HlcClock::new(1);
        let ctx = make_test_context(&clock);

        assert_eq!(ctx.thread_id().x, 0);
        assert_eq!(ctx.block_id().x, 0);
        assert_eq!(ctx.global_thread_id().x, 0);
    }

    #[test]
    fn test_warp_id() {
        let clock = HlcClock::new(1);
        let ctx = RingContext::new(
            ThreadId::new_1d(35),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            &clock,
            1,
            ContextBackend::Cpu,
        );

        // Thread 35 in a 256-wide block: warp 35 / 32 = 1, lane 35 % 32 = 3.
        assert_eq!(ctx.warp_id().0, 1);
        assert_eq!(ctx.lane_id(), 3);
    }

    #[test]
    fn test_hlc_operations() {
        let clock = HlcClock::new(1);
        let ctx = make_test_context(&clock);

        let ts1 = ctx.now();
        let ts2 = ctx.tick();
        assert!(ts2 >= ts1);
    }

    #[test]
    fn test_warp_ballot_cpu() {
        let clock = HlcClock::new(1);
        let ctx = make_test_context(&clock);

        assert_eq!(ctx.warp_ballot(true), 1);
        assert_eq!(ctx.warp_ballot(false), 0);
    }

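    // A small additional check of the atomic helpers on the CPU backend;
    // the expected values follow directly from the std atomics they wrap.
    #[test]
    fn test_atomic_helpers_cpu() {
        use std::sync::atomic::AtomicU64;

        let clock = HlcClock::new(1);
        let ctx = make_test_context(&clock);
        let cell = AtomicU64::new(0);

        assert_eq!(ctx.atomic_add(&cell, 5, MemoryOrder::Relaxed), 0);
        assert_eq!(ctx.atomic_exchange(&cell, 7, MemoryOrder::SeqCst), 5);
        assert_eq!(
            ctx.atomic_cas(&cell, 7, 9, MemoryOrder::SeqCst, MemoryOrder::Relaxed),
            Ok(7)
        );
        assert_eq!(
            ctx.atomic_cas(&cell, 7, 11, MemoryOrder::SeqCst, MemoryOrder::Relaxed),
            Err(9)
        );
    }
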
    #[test]
    fn test_domain_operations() {
        let clock = HlcClock::new(1);
        let mut ctx = make_test_context(&clock);

        assert!(ctx.domain().is_none());

        ctx.set_domain(Domain::OrderMatching);
        assert_eq!(ctx.domain(), Some(&Domain::OrderMatching));

        ctx.clear_domain();
        assert!(ctx.domain().is_none());
    }

    #[test]
    fn test_context_with_domain() {
        let clock = HlcClock::new(1);
        let ctx = RingContext::new_with_options(
            ThreadId::new_1d(0),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            &clock,
            42,
            ContextBackend::Cpu,
            Some(Domain::RiskManagement),
            128,
            None,
        );

        assert_eq!(ctx.domain(), Some(&Domain::RiskManagement));
        assert_eq!(ctx.kernel_id(), 42);
    }

    #[test]
    fn test_metrics_buffer() {
        let mut buffer = ContextMetricsBuffer::new(3);

        assert!(buffer.is_empty());
        assert!(!buffer.is_full());
        assert_eq!(buffer.len(), 0);

        let entry = MetricsEntry {
            operation: "test".to_string(),
            metric_type: MetricType::Latency,
            value: 100,
            timestamp: HlcTimestamp::zero(),
            kernel_id: 1,
            domain: None,
        };

        buffer.record(entry.clone());
        assert_eq!(buffer.len(), 1);

        buffer.record(entry.clone());
        buffer.record(entry.clone());
        assert!(buffer.is_full());

        let entries = buffer.drain();
        assert_eq!(entries.len(), 3);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_record_metrics() {
        let clock = HlcClock::new(1);
        let mut ctx = RingContext::new_with_options(
            ThreadId::new_1d(0),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            &clock,
            100,
            ContextBackend::Cpu,
            Some(Domain::Compliance),
            256,
            None,
        );

        ctx.record_latency("process_order", 500);
        ctx.record_throughput("orders_per_sec", 1000);
        ctx.record_counter("total_orders", 1);
        ctx.record_gauge("queue_depth", 42);

        assert_eq!(ctx.metrics_count(), 4);

        let metrics = ctx.flush_metrics();
        assert_eq!(metrics.len(), 4);

        assert_eq!(metrics[0].operation, "process_order");
        assert_eq!(metrics[0].metric_type, MetricType::Latency);
        assert_eq!(metrics[0].value, 500);
        assert_eq!(metrics[0].kernel_id, 100);
        assert_eq!(metrics[0].domain, Some(Domain::Compliance));

        assert_eq!(metrics[1].metric_type, MetricType::Throughput);
        assert_eq!(metrics[2].metric_type, MetricType::Counter);
        assert_eq!(metrics[3].metric_type, MetricType::Gauge);
        assert_eq!(metrics[3].value, 42);

        assert_eq!(ctx.metrics_count(), 0);
    }

    #[test]
    fn test_kernel_alert_constructors() {
        let alert = KernelAlert::high_latency("Slow", 500);
        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.alert_type, KernelAlertType::HighLatency);
        assert!(alert.message.contains("500µs"));

        let alert = KernelAlert::error("Failed");
        assert_eq!(alert.severity, AlertSeverity::Error);
        assert_eq!(alert.alert_type, KernelAlertType::ProcessingError);

        let alert = KernelAlert::queue_pressure("Input queue", 85);
        assert_eq!(alert.alert_type, KernelAlertType::QueuePressure);
        assert!(alert.message.contains("85%"));

        let alert = KernelAlert::new(
            AlertSeverity::Critical,
            KernelAlertType::Custom(42),
            "Custom alert",
        )
        .with_routing(AlertRouting::External);
        assert_eq!(alert.severity, AlertSeverity::Critical);
        assert!(matches!(alert.routing, AlertRouting::External));
    }

    #[test]
    fn test_emit_alert_with_channel() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let clock = HlcClock::new(1);
        let ctx = RingContext::new_with_options(
            ThreadId::new_1d(0),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            &clock,
            42,
            ContextBackend::Cpu,
            Some(Domain::OrderMatching),
            256,
            Some(tx),
        );

        assert!(ctx.has_alert_channel());
        ctx.emit_alert(KernelAlert::error("Test error"));

        let alert = rx.try_recv().expect("Should receive alert");
        assert_eq!(alert.source_kernel, 42);
        assert_eq!(alert.source_domain, Some(Domain::OrderMatching));
        assert_eq!(alert.alert_type, KernelAlertType::ProcessingError);
    }

    #[test]
    fn test_emit_alert_without_channel() {
        let clock = HlcClock::new(1);
        let ctx = make_test_context(&clock);

        assert!(!ctx.has_alert_channel());
        // Without a channel this must be a silent no-op rather than a panic.
        ctx.emit_alert(KernelAlert::error("No-op"));
    }

    #[test]
    fn test_alert_if_slow() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let clock = HlcClock::new(1);
        let ctx = RingContext::new_with_options(
            ThreadId::new_1d(0),
            BlockId::new_1d(0),
            Dim3::new_1d(256),
            Dim3::new_1d(1),
            &clock,
            1,
            ContextBackend::Cpu,
            None,
            256,
            Some(tx),
        );

        ctx.alert_if_slow("fast_op", 50, 100);
        assert!(rx.try_recv().is_err());

        ctx.alert_if_slow("slow_op", 150, 100);
        let alert = rx.try_recv().expect("Should receive alert");
        assert!(alert.message.contains("slow_op"));
        assert!(alert.message.contains("150µs"));
    }

    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Info < AlertSeverity::Warning);
        assert!(AlertSeverity::Warning < AlertSeverity::Error);
        assert!(AlertSeverity::Error < AlertSeverity::Critical);
    }
}
1159}