ringkernel_core/
context.rs

1//! Ring context providing GPU intrinsics facade for kernel handlers.
2//!
3//! The RingContext provides a unified interface for GPU operations that
4//! abstracts over different backends (CUDA, Metal, WebGPU, CPU).
5//!
6//! # Domain Support (FR-2)
7//!
8//! RingContext supports domain-aware operations including:
9//! - Domain metadata access via `domain()`
10//! - Metrics collection via `record_latency()` and `record_throughput()`
11//! - Alert emission via `emit_alert()`
12//!
13//! # Example
14//!
15//! ```ignore
16//! use ringkernel_core::prelude::*;
17//!
18//! fn process(ctx: &mut RingContext, msg: MyMessage) {
19//!     let start = std::time::Instant::now();
20//!
21//!     // Process message...
22//!
23//!     // Record metrics
24//!     ctx.record_latency("process_message", start.elapsed().as_micros() as u64);
25//!     ctx.record_throughput("messages", 1);
26//!
27//!     // Emit alert if needed
//!     if start.elapsed().as_millis() > 100 {
//!         let latency_us = start.elapsed().as_micros() as u64;
//!         ctx.emit_alert(KernelAlert::high_latency("Slow message processing", latency_us));
30//!     }
31//! }
32//! ```
33
34use crate::domain::Domain;
35use crate::hlc::{HlcClock, HlcTimestamp};
36use crate::message::MessageEnvelope;
37use crate::types::{BlockId, Dim3, FenceScope, GlobalThreadId, MemoryOrder, ThreadId, WarpId};
38use tokio::sync::mpsc;
39
40// ============================================================================
41// Metrics Types (FR-2)
42// ============================================================================
43
/// Kind of measurement carried by a [`MetricsEntry`].
///
/// The variant tells the telemetry pipeline how to interpret the entry's
/// `value` field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MetricType {
    /// Latency, expressed in microseconds.
    Latency,
    /// Throughput: number of items handled in a time period.
    Throughput,
    /// Increment applied to a monotonically growing counter.
    Counter,
    /// Absolute point-in-time value that may rise or fall.
    Gauge,
}
56
57/// A single metrics entry recorded by the kernel.
58#[derive(Debug, Clone)]
59pub struct MetricsEntry {
60    /// Operation name (e.g., "process_order", "validate_tx").
61    pub operation: String,
62    /// Type of metric.
63    pub metric_type: MetricType,
64    /// Metric value.
65    pub value: u64,
66    /// When this metric was recorded.
67    pub timestamp: HlcTimestamp,
68    /// Kernel ID that recorded this metric.
69    pub kernel_id: u64,
70    /// Domain of the kernel (if any).
71    pub domain: Option<Domain>,
72}
73
74/// Buffer for collecting metrics within a kernel context.
75#[derive(Debug)]
76pub struct ContextMetricsBuffer {
77    entries: Vec<MetricsEntry>,
78    capacity: usize,
79}
80
81impl ContextMetricsBuffer {
82    /// Create a new metrics buffer with specified capacity.
83    pub fn new(capacity: usize) -> Self {
84        Self {
85            entries: Vec::with_capacity(capacity.min(1024)), // Cap allocation
86            capacity,
87        }
88    }
89
90    /// Record a new metrics entry.
91    pub fn record(&mut self, entry: MetricsEntry) {
92        if self.entries.len() < self.capacity {
93            self.entries.push(entry);
94        }
95        // If full, oldest entries are silently dropped
96        // (ring buffer behavior could be added for production)
97    }
98
99    /// Drain all buffered entries.
100    pub fn drain(&mut self) -> Vec<MetricsEntry> {
101        std::mem::take(&mut self.entries)
102    }
103
104    /// Check if buffer is full.
105    pub fn is_full(&self) -> bool {
106        self.entries.len() >= self.capacity
107    }
108
109    /// Get current entry count.
110    pub fn len(&self) -> usize {
111        self.entries.len()
112    }
113
114    /// Check if buffer is empty.
115    pub fn is_empty(&self) -> bool {
116        self.entries.is_empty()
117    }
118}
119
120impl Default for ContextMetricsBuffer {
121    fn default() -> Self {
122        Self::new(256)
123    }
124}
125
126// ============================================================================
127// Alert Types (FR-2)
128// ============================================================================
129
/// How serious a [`KernelAlert`] is.
///
/// Variants are ordered from least to most severe, so severities can be
/// compared directly (for example `severity >= AlertSeverity::Error`).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum AlertSeverity {
    /// Purely informational; no action required.
    Info = 0,
    /// Something may be going wrong.
    Warning = 1,
    /// An operation failed.
    Error = 2,
    /// A system-level problem.
    Critical = 3,
}
142
/// Category of a [`KernelAlert`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum KernelAlertType {
    /// Processing latency exceeded expectations.
    HighLatency,
    /// A queue is nearing its capacity.
    QueuePressure,
    /// Memory is running low.
    MemoryPressure,
    /// Processing a message failed.
    ProcessingError,
    /// Alert defined by a domain, identified by a custom type code.
    DomainAlert(u32),
    /// Alert defined by the application, identified by a custom type code.
    Custom(u32),
}
159
/// Routing destination for alerts.
///
/// Derives `PartialEq`/`Eq` like the other alert and metric enums in this
/// module, so routes can be compared (e.g. `alert.routing == AlertRouting::Host`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum AlertRouting {
    /// Route to host only (default).
    #[default]
    Host,
    /// Route to the kernel with the given ID via K2K.
    Kernel(u64),
    /// Broadcast to all kernels in the same domain.
    Domain,
    /// Route to an external monitoring system (via the host bridge).
    External,
}
173
174/// An alert emitted from a kernel.
175#[derive(Debug, Clone)]
176pub struct KernelAlert {
177    /// Severity level.
178    pub severity: AlertSeverity,
179    /// Alert type.
180    pub alert_type: KernelAlertType,
181    /// Human-readable message.
182    pub message: String,
183    /// Source kernel ID.
184    pub source_kernel: u64,
185    /// Source domain (if applicable).
186    pub source_domain: Option<Domain>,
187    /// When this alert was created.
188    pub timestamp: HlcTimestamp,
189    /// Routing destination.
190    pub routing: AlertRouting,
191}
192
193impl KernelAlert {
194    /// Create a new alert.
195    pub fn new(
196        severity: AlertSeverity,
197        alert_type: KernelAlertType,
198        message: impl Into<String>,
199    ) -> Self {
200        Self {
201            severity,
202            alert_type,
203            message: message.into(),
204            source_kernel: 0,
205            source_domain: None,
206            timestamp: HlcTimestamp::zero(),
207            routing: AlertRouting::default(),
208        }
209    }
210
211    /// Create a high latency alert.
212    pub fn high_latency(message: impl Into<String>, latency_us: u64) -> Self {
213        Self::new(
214            AlertSeverity::Warning,
215            KernelAlertType::HighLatency,
216            format!("{} ({}µs)", message.into(), latency_us),
217        )
218    }
219
220    /// Create a processing error alert.
221    pub fn error(message: impl Into<String>) -> Self {
222        Self::new(
223            AlertSeverity::Error,
224            KernelAlertType::ProcessingError,
225            message,
226        )
227    }
228
229    /// Create a queue pressure warning.
230    pub fn queue_pressure(message: impl Into<String>, utilization_pct: u32) -> Self {
231        Self::new(
232            AlertSeverity::Warning,
233            KernelAlertType::QueuePressure,
234            format!("{} ({}% full)", message.into(), utilization_pct),
235        )
236    }
237
238    /// Set routing destination.
239    pub fn with_routing(mut self, routing: AlertRouting) -> Self {
240        self.routing = routing;
241        self
242    }
243}
244
245// ============================================================================
246// RingContext
247// ============================================================================
248
249/// GPU intrinsics facade for kernel handlers.
250///
251/// This struct provides access to GPU-specific operations like thread
252/// identification, synchronization, and atomic operations. The actual
253/// implementation varies by backend.
254///
255/// # Lifetime
256///
257/// The context is borrowed for the duration of the kernel handler execution.
258pub struct RingContext<'a> {
259    /// Thread identity within block.
260    pub thread_id: ThreadId,
261    /// Block identity within grid.
262    pub block_id: BlockId,
263    /// Block dimensions.
264    pub block_dim: Dim3,
265    /// Grid dimensions.
266    pub grid_dim: Dim3,
267    /// HLC clock instance.
268    clock: &'a HlcClock,
269    /// Kernel ID.
270    kernel_id: u64,
271    /// Backend implementation.
272    backend: ContextBackend,
273    /// Domain this kernel operates in (FR-2).
274    domain: Option<Domain>,
275    /// Metrics buffer for this context (FR-2).
276    metrics_buffer: ContextMetricsBuffer,
277    /// Alert sender channel (FR-2).
278    alert_sender: Option<mpsc::UnboundedSender<KernelAlert>>,
279}
280
/// Backend-specific context implementation.
///
/// A fieldless enum, so it is `Copy` and comparable. It can be matched or
/// tuple-matched by value without cloning, and compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContextBackend {
    /// CPU backend (for testing).
    Cpu,
    /// CUDA backend.
    Cuda,
    /// Metal backend.
    Metal,
    /// WebGPU backend.
    Wgpu,
}
293
294impl<'a> RingContext<'a> {
295    /// Create a new context with basic configuration.
296    ///
297    /// For domain-aware contexts, use `new_with_options` instead.
298    pub fn new(
299        thread_id: ThreadId,
300        block_id: BlockId,
301        block_dim: Dim3,
302        grid_dim: Dim3,
303        clock: &'a HlcClock,
304        kernel_id: u64,
305        backend: ContextBackend,
306    ) -> Self {
307        Self {
308            thread_id,
309            block_id,
310            block_dim,
311            grid_dim,
312            clock,
313            kernel_id,
314            backend,
315            domain: None,
316            metrics_buffer: ContextMetricsBuffer::default(),
317            alert_sender: None,
318        }
319    }
320
321    /// Create a new context with full configuration (FR-2).
322    ///
323    /// # Arguments
324    ///
325    /// * `thread_id` - Thread identity within block
326    /// * `block_id` - Block identity within grid
327    /// * `block_dim` - Block dimensions
328    /// * `grid_dim` - Grid dimensions
329    /// * `clock` - HLC clock instance
330    /// * `kernel_id` - Unique kernel identifier
331    /// * `backend` - Backend implementation
332    /// * `domain` - Optional domain for this kernel
333    /// * `metrics_capacity` - Metrics buffer capacity
334    /// * `alert_sender` - Optional channel for emitting alerts
335    #[allow(clippy::too_many_arguments)]
336    pub fn new_with_options(
337        thread_id: ThreadId,
338        block_id: BlockId,
339        block_dim: Dim3,
340        grid_dim: Dim3,
341        clock: &'a HlcClock,
342        kernel_id: u64,
343        backend: ContextBackend,
344        domain: Option<Domain>,
345        metrics_capacity: usize,
346        alert_sender: Option<mpsc::UnboundedSender<KernelAlert>>,
347    ) -> Self {
348        Self {
349            thread_id,
350            block_id,
351            block_dim,
352            grid_dim,
353            clock,
354            kernel_id,
355            backend,
356            domain,
357            metrics_buffer: ContextMetricsBuffer::new(metrics_capacity),
358            alert_sender,
359        }
360    }
361
362    // === Thread Identity ===
363
364    /// Get thread ID within block.
365    #[inline]
366    pub fn thread_id(&self) -> ThreadId {
367        self.thread_id
368    }
369
370    /// Get block ID within grid.
371    #[inline]
372    pub fn block_id(&self) -> BlockId {
373        self.block_id
374    }
375
376    /// Get global thread ID across all blocks.
377    #[inline]
378    pub fn global_thread_id(&self) -> GlobalThreadId {
379        GlobalThreadId::from_block_thread(self.block_id, self.thread_id, self.block_dim)
380    }
381
382    /// Get warp ID within block.
383    #[inline]
384    pub fn warp_id(&self) -> WarpId {
385        let linear = self
386            .thread_id
387            .linear_for_dim(self.block_dim.x, self.block_dim.y);
388        WarpId::from_thread_linear(linear)
389    }
390
391    /// Get lane ID within warp (0-31).
392    #[inline]
393    pub fn lane_id(&self) -> u32 {
394        let linear = self
395            .thread_id
396            .linear_for_dim(self.block_dim.x, self.block_dim.y);
397        WarpId::lane_id(linear)
398    }
399
400    /// Get block dimensions.
401    #[inline]
402    pub fn block_dim(&self) -> Dim3 {
403        self.block_dim
404    }
405
406    /// Get grid dimensions.
407    #[inline]
408    pub fn grid_dim(&self) -> Dim3 {
409        self.grid_dim
410    }
411
412    /// Get kernel ID.
413    #[inline]
414    pub fn kernel_id(&self) -> u64 {
415        self.kernel_id
416    }
417
418    // === Synchronization ===
419
420    /// Synchronize all threads in the block.
421    ///
422    /// All threads in the block must reach this barrier before any
423    /// thread can proceed past it.
424    #[inline]
425    pub fn sync_threads(&self) {
426        match self.backend {
427            ContextBackend::Cpu => {
428                // CPU: no-op (single-threaded simulation)
429            }
430            _ => {
431                // GPU backends would call __syncthreads() or equivalent
432                // Placeholder for actual implementation
433            }
434        }
435    }
436
437    /// Synchronize all threads in the grid (cooperative groups).
438    ///
439    /// Requires cooperative kernel launch support.
440    #[inline]
441    pub fn sync_grid(&self) {
442        match self.backend {
443            ContextBackend::Cpu => {
444                // CPU: no-op
445            }
446            _ => {
447                // GPU backends would call cooperative grid sync
448            }
449        }
450    }
451
452    /// Synchronize threads within a warp.
453    #[inline]
454    pub fn sync_warp(&self) {
455        match self.backend {
456            ContextBackend::Cpu => {
457                // CPU: no-op
458            }
459            _ => {
460                // GPU backends would call __syncwarp()
461            }
462        }
463    }
464
465    // === Memory Fencing ===
466
467    /// Memory fence at the specified scope.
468    #[inline]
469    pub fn thread_fence(&self, scope: FenceScope) {
470        match (self.backend.clone(), scope) {
471            (ContextBackend::Cpu, _) => {
472                std::sync::atomic::fence(std::sync::atomic::Ordering::SeqCst);
473            }
474            _ => {
475                // GPU backends would call appropriate fence intrinsic
476            }
477        }
478    }
479
480    /// Thread-scope fence (compiler barrier).
481    #[inline]
482    pub fn fence_thread(&self) {
483        self.thread_fence(FenceScope::Thread);
484    }
485
486    /// Block-scope fence.
487    #[inline]
488    pub fn fence_block(&self) {
489        self.thread_fence(FenceScope::Block);
490    }
491
492    /// Device-scope fence.
493    #[inline]
494    pub fn fence_device(&self) {
495        self.thread_fence(FenceScope::Device);
496    }
497
498    /// System-scope fence (CPU+GPU visible).
499    #[inline]
500    pub fn fence_system(&self) {
501        self.thread_fence(FenceScope::System);
502    }
503
504    // === HLC Operations ===
505
506    /// Get current HLC timestamp.
507    #[inline]
508    pub fn now(&self) -> HlcTimestamp {
509        self.clock.now()
510    }
511
512    /// Generate a new HLC timestamp (advances clock).
513    #[inline]
514    pub fn tick(&self) -> HlcTimestamp {
515        self.clock.tick()
516    }
517
518    /// Update clock with received timestamp.
519    #[inline]
520    pub fn update_clock(&self, received: &HlcTimestamp) -> crate::error::Result<HlcTimestamp> {
521        self.clock.update(received)
522    }
523
524    // === Atomic Operations ===
525
526    /// Atomic add and return old value.
527    #[inline]
528    pub fn atomic_add(
529        &self,
530        ptr: &std::sync::atomic::AtomicU64,
531        val: u64,
532        order: MemoryOrder,
533    ) -> u64 {
534        let ordering = match order {
535            MemoryOrder::Relaxed => std::sync::atomic::Ordering::Relaxed,
536            MemoryOrder::Acquire => std::sync::atomic::Ordering::Acquire,
537            MemoryOrder::Release => std::sync::atomic::Ordering::Release,
538            MemoryOrder::AcquireRelease => std::sync::atomic::Ordering::AcqRel,
539            MemoryOrder::SeqCst => std::sync::atomic::Ordering::SeqCst,
540        };
541        ptr.fetch_add(val, ordering)
542    }
543
544    /// Atomic compare-and-swap.
545    #[inline]
546    pub fn atomic_cas(
547        &self,
548        ptr: &std::sync::atomic::AtomicU64,
549        expected: u64,
550        desired: u64,
551        success: MemoryOrder,
552        failure: MemoryOrder,
553    ) -> Result<u64, u64> {
554        let success_ord = match success {
555            MemoryOrder::Relaxed => std::sync::atomic::Ordering::Relaxed,
556            MemoryOrder::Acquire => std::sync::atomic::Ordering::Acquire,
557            MemoryOrder::Release => std::sync::atomic::Ordering::Release,
558            MemoryOrder::AcquireRelease => std::sync::atomic::Ordering::AcqRel,
559            MemoryOrder::SeqCst => std::sync::atomic::Ordering::SeqCst,
560        };
561        let failure_ord = match failure {
562            MemoryOrder::Relaxed => std::sync::atomic::Ordering::Relaxed,
563            MemoryOrder::Acquire => std::sync::atomic::Ordering::Acquire,
564            MemoryOrder::Release => std::sync::atomic::Ordering::Release,
565            MemoryOrder::AcquireRelease => std::sync::atomic::Ordering::AcqRel,
566            MemoryOrder::SeqCst => std::sync::atomic::Ordering::SeqCst,
567        };
568        ptr.compare_exchange(expected, desired, success_ord, failure_ord)
569    }
570
571    /// Atomic exchange.
572    #[inline]
573    pub fn atomic_exchange(
574        &self,
575        ptr: &std::sync::atomic::AtomicU64,
576        val: u64,
577        order: MemoryOrder,
578    ) -> u64 {
579        let ordering = match order {
580            MemoryOrder::Relaxed => std::sync::atomic::Ordering::Relaxed,
581            MemoryOrder::Acquire => std::sync::atomic::Ordering::Acquire,
582            MemoryOrder::Release => std::sync::atomic::Ordering::Release,
583            MemoryOrder::AcquireRelease => std::sync::atomic::Ordering::AcqRel,
584            MemoryOrder::SeqCst => std::sync::atomic::Ordering::SeqCst,
585        };
586        ptr.swap(val, ordering)
587    }
588
589    // === Warp Primitives ===
590
591    /// Warp shuffle - get value from another lane.
592    ///
593    /// Returns the value from the specified source lane.
594    #[inline]
595    pub fn warp_shuffle<T: Copy>(&self, value: T, src_lane: u32) -> T {
596        match self.backend {
597            ContextBackend::Cpu => {
598                // CPU: just return own value (no other lanes)
599                let _ = src_lane;
600                value
601            }
602            _ => {
603                // GPU would use __shfl_sync()
604                let _ = src_lane;
605                value
606            }
607        }
608    }
609
610    /// Warp shuffle down - get value from lane + delta.
611    #[inline]
612    pub fn warp_shuffle_down<T: Copy>(&self, value: T, delta: u32) -> T {
613        self.warp_shuffle(value, self.lane_id().saturating_add(delta))
614    }
615
616    /// Warp shuffle up - get value from lane - delta.
617    #[inline]
618    pub fn warp_shuffle_up<T: Copy>(&self, value: T, delta: u32) -> T {
619        self.warp_shuffle(value, self.lane_id().saturating_sub(delta))
620    }
621
622    /// Warp shuffle XOR - get value from lane XOR mask.
623    #[inline]
624    pub fn warp_shuffle_xor<T: Copy>(&self, value: T, mask: u32) -> T {
625        self.warp_shuffle(value, self.lane_id() ^ mask)
626    }
627
628    /// Warp ballot - get bitmask of lanes where predicate is true.
629    #[inline]
630    pub fn warp_ballot(&self, predicate: bool) -> u32 {
631        match self.backend {
632            ContextBackend::Cpu => {
633                // CPU: single thread, return 1 or 0
634                if predicate {
635                    1
636                } else {
637                    0
638                }
639            }
640            _ => {
641                // GPU would use __ballot_sync()
642                if predicate {
643                    1 << self.lane_id()
644                } else {
645                    0
646                }
647            }
648        }
649    }
650
651    /// Warp all - check if predicate is true for all lanes.
652    #[inline]
653    pub fn warp_all(&self, predicate: bool) -> bool {
654        match self.backend {
655            ContextBackend::Cpu => predicate,
656            _ => {
657                // GPU would use __all_sync()
658                predicate
659            }
660        }
661    }
662
663    /// Warp any - check if predicate is true for any lane.
664    #[inline]
665    pub fn warp_any(&self, predicate: bool) -> bool {
666        match self.backend {
667            ContextBackend::Cpu => predicate,
668            _ => {
669                // GPU would use __any_sync()
670                predicate
671            }
672        }
673    }
674
675    // === K2K Messaging ===
676
677    /// Send message to another kernel (K2K).
678    ///
679    /// This is a placeholder; actual implementation requires runtime support.
680    #[inline]
681    pub fn k2k_send(
682        &self,
683        _target_kernel: u64,
684        _envelope: &MessageEnvelope,
685    ) -> crate::error::Result<()> {
686        // K2K messaging requires runtime bridge support
687        Err(crate::error::RingKernelError::NotSupported(
688            "K2K messaging requires runtime".to_string(),
689        ))
690    }
691
692    /// Try to receive message from K2K queue.
693    #[inline]
694    pub fn k2k_try_recv(&self) -> crate::error::Result<MessageEnvelope> {
695        // K2K messaging requires runtime bridge support
696        Err(crate::error::RingKernelError::NotSupported(
697            "K2K messaging requires runtime".to_string(),
698        ))
699    }
700
701    // === Domain Operations (FR-2) ===
702
703    /// Get the domain this kernel operates in.
704    ///
705    /// Returns `None` if no domain was configured.
706    #[inline]
707    pub fn domain(&self) -> Option<&Domain> {
708        self.domain.as_ref()
709    }
710
711    /// Set the domain for this kernel context.
712    ///
713    /// This allows runtime domain assignment for kernels that process
714    /// messages from multiple domains.
715    #[inline]
716    pub fn set_domain(&mut self, domain: Domain) {
717        self.domain = Some(domain);
718    }
719
720    /// Clear the domain setting.
721    #[inline]
722    pub fn clear_domain(&mut self) {
723        self.domain = None;
724    }
725
726    // === Metrics Operations (FR-2) ===
727
728    /// Record a latency metric in microseconds.
729    ///
730    /// # Arguments
731    ///
732    /// * `operation` - Name of the operation (e.g., "process_order")
733    /// * `latency_us` - Latency in microseconds
734    ///
735    /// # Example
736    ///
737    /// ```ignore
738    /// let start = std::time::Instant::now();
739    /// // ... process message ...
740    /// ctx.record_latency("process", start.elapsed().as_micros() as u64);
741    /// ```
742    pub fn record_latency(&mut self, operation: &str, latency_us: u64) {
743        let entry = MetricsEntry {
744            operation: operation.to_string(),
745            metric_type: MetricType::Latency,
746            value: latency_us,
747            timestamp: self.clock.now(),
748            kernel_id: self.kernel_id,
749            domain: self.domain,
750        };
751        self.metrics_buffer.record(entry);
752    }
753
754    /// Record a throughput metric (count per time period).
755    ///
756    /// # Arguments
757    ///
758    /// * `operation` - Name of the operation (e.g., "messages_processed")
759    /// * `count` - Number of items processed
760    pub fn record_throughput(&mut self, operation: &str, count: u64) {
761        let entry = MetricsEntry {
762            operation: operation.to_string(),
763            metric_type: MetricType::Throughput,
764            value: count,
765            timestamp: self.clock.now(),
766            kernel_id: self.kernel_id,
767            domain: self.domain,
768        };
769        self.metrics_buffer.record(entry);
770    }
771
772    /// Record a counter increment.
773    ///
774    /// Counters are monotonically increasing values (e.g., total orders received).
775    pub fn record_counter(&mut self, operation: &str, increment: u64) {
776        let entry = MetricsEntry {
777            operation: operation.to_string(),
778            metric_type: MetricType::Counter,
779            value: increment,
780            timestamp: self.clock.now(),
781            kernel_id: self.kernel_id,
782            domain: self.domain,
783        };
784        self.metrics_buffer.record(entry);
785    }
786
787    /// Record a gauge value (absolute measurement).
788    ///
789    /// Gauges represent point-in-time values that can go up or down
790    /// (e.g., queue depth, memory usage).
791    pub fn record_gauge(&mut self, operation: &str, value: u64) {
792        let entry = MetricsEntry {
793            operation: operation.to_string(),
794            metric_type: MetricType::Gauge,
795            value,
796            timestamp: self.clock.now(),
797            kernel_id: self.kernel_id,
798            domain: self.domain,
799        };
800        self.metrics_buffer.record(entry);
801    }
802
803    /// Flush and return all buffered metrics.
804    ///
805    /// After calling this method, the metrics buffer will be empty.
806    /// This is typically called by the runtime when transferring metrics
807    /// to the host telemetry pipeline.
808    pub fn flush_metrics(&mut self) -> Vec<MetricsEntry> {
809        self.metrics_buffer.drain()
810    }
811
812    /// Get the number of buffered metrics entries.
813    pub fn metrics_count(&self) -> usize {
814        self.metrics_buffer.len()
815    }
816
817    /// Check if the metrics buffer is full.
818    pub fn metrics_buffer_full(&self) -> bool {
819        self.metrics_buffer.is_full()
820    }
821
822    // === Alert Operations (FR-2) ===
823
824    /// Emit an alert from this kernel.
825    ///
826    /// Alerts are sent to the configured alert channel (if any).
827    /// The alert is enriched with kernel ID, domain, and timestamp.
828    ///
829    /// # Arguments
830    ///
831    /// * `alert` - The alert to emit (can be created via `KernelAlert::new()` helpers)
832    ///
833    /// # Example
834    ///
835    /// ```ignore
836    /// ctx.emit_alert(KernelAlert::high_latency("Slow processing", 500));
837    /// ctx.emit_alert(KernelAlert::error("Processing failed"));
838    /// ```
839    pub fn emit_alert(&self, alert: impl Into<KernelAlert>) {
840        if let Some(ref sender) = self.alert_sender {
841            let mut alert = alert.into();
842            // Enrich alert with context info
843            alert.source_kernel = self.kernel_id;
844            alert.source_domain = self.domain;
845            alert.timestamp = self.clock.now();
846            // Send (ignore errors - fire and forget)
847            let _ = sender.send(alert);
848        }
849    }
850
851    /// Check if alert sending is configured.
852    #[inline]
853    pub fn has_alert_channel(&self) -> bool {
854        self.alert_sender.is_some()
855    }
856
857    /// Emit a high-latency alert if latency exceeds threshold.
858    ///
859    /// Convenience method that only emits an alert when latency exceeds
860    /// the specified threshold.
861    ///
862    /// # Arguments
863    ///
864    /// * `operation` - Name of the operation
865    /// * `latency_us` - Actual latency in microseconds
866    /// * `threshold_us` - Threshold above which to emit alert
867    pub fn alert_if_slow(&self, operation: &str, latency_us: u64, threshold_us: u64) {
868        if latency_us > threshold_us {
869            self.emit_alert(KernelAlert::high_latency(
870                format!("{} exceeded threshold", operation),
871                latency_us,
872            ));
873        }
874    }
875}
876
877impl<'a> std::fmt::Debug for RingContext<'a> {
878    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
879        f.debug_struct("RingContext")
880            .field("thread_id", &self.thread_id)
881            .field("block_id", &self.block_id)
882            .field("block_dim", &self.block_dim)
883            .field("grid_dim", &self.grid_dim)
884            .field("kernel_id", &self.kernel_id)
885            .field("backend", &self.backend)
886            .finish()
887    }
888}
889
890#[cfg(test)]
891mod tests {
892    use super::*;
893
894    fn make_test_context(clock: &HlcClock) -> RingContext<'_> {
895        RingContext::new(
896            ThreadId::new_1d(0),
897            BlockId::new_1d(0),
898            Dim3::new_1d(256),
899            Dim3::new_1d(1),
900            clock,
901            1,
902            ContextBackend::Cpu,
903        )
904    }
905
906    #[test]
907    fn test_thread_identity() {
908        let clock = HlcClock::new(1);
909        let ctx = make_test_context(&clock);
910
911        assert_eq!(ctx.thread_id().x, 0);
912        assert_eq!(ctx.block_id().x, 0);
913        assert_eq!(ctx.global_thread_id().x, 0);
914    }
915
916    #[test]
917    fn test_warp_id() {
918        let clock = HlcClock::new(1);
919        let ctx = RingContext::new(
920            ThreadId::new_1d(35), // Thread 35 is in warp 1, lane 3
921            BlockId::new_1d(0),
922            Dim3::new_1d(256),
923            Dim3::new_1d(1),
924            &clock,
925            1,
926            ContextBackend::Cpu,
927        );
928
929        assert_eq!(ctx.warp_id().0, 1);
930        assert_eq!(ctx.lane_id(), 3);
931    }
932
933    #[test]
934    fn test_hlc_operations() {
935        let clock = HlcClock::new(1);
936        let ctx = make_test_context(&clock);
937
938        let ts1 = ctx.now();
939        let ts2 = ctx.tick();
940        assert!(ts2 >= ts1);
941    }
942
943    #[test]
944    fn test_warp_ballot_cpu() {
945        let clock = HlcClock::new(1);
946        let ctx = make_test_context(&clock);
947
948        assert_eq!(ctx.warp_ballot(true), 1);
949        assert_eq!(ctx.warp_ballot(false), 0);
950    }
951
952    // === FR-2 Tests ===
953
954    #[test]
955    fn test_domain_operations() {
956        let clock = HlcClock::new(1);
957        let mut ctx = make_test_context(&clock);
958
959        // Initially no domain
960        assert!(ctx.domain().is_none());
961
962        // Set domain
963        ctx.set_domain(Domain::OrderMatching);
964        assert_eq!(ctx.domain(), Some(&Domain::OrderMatching));
965
966        // Clear domain
967        ctx.clear_domain();
968        assert!(ctx.domain().is_none());
969    }
970
971    #[test]
972    fn test_context_with_domain() {
973        let clock = HlcClock::new(1);
974        let ctx = RingContext::new_with_options(
975            ThreadId::new_1d(0),
976            BlockId::new_1d(0),
977            Dim3::new_1d(256),
978            Dim3::new_1d(1),
979            &clock,
980            42,
981            ContextBackend::Cpu,
982            Some(Domain::RiskManagement),
983            128,
984            None,
985        );
986
987        assert_eq!(ctx.domain(), Some(&Domain::RiskManagement));
988        assert_eq!(ctx.kernel_id(), 42);
989    }
990
991    #[test]
992    fn test_metrics_buffer() {
993        let mut buffer = ContextMetricsBuffer::new(3);
994
995        assert!(buffer.is_empty());
996        assert!(!buffer.is_full());
997        assert_eq!(buffer.len(), 0);
998
999        let entry = MetricsEntry {
1000            operation: "test".to_string(),
1001            metric_type: MetricType::Latency,
1002            value: 100,
1003            timestamp: HlcTimestamp::zero(),
1004            kernel_id: 1,
1005            domain: None,
1006        };
1007
1008        buffer.record(entry.clone());
1009        assert_eq!(buffer.len(), 1);
1010
1011        buffer.record(entry.clone());
1012        buffer.record(entry.clone());
1013        assert!(buffer.is_full());
1014
1015        // Drain returns all entries
1016        let entries = buffer.drain();
1017        assert_eq!(entries.len(), 3);
1018        assert!(buffer.is_empty());
1019    }
1020
1021    #[test]
1022    fn test_record_metrics() {
1023        let clock = HlcClock::new(1);
1024        let mut ctx = RingContext::new_with_options(
1025            ThreadId::new_1d(0),
1026            BlockId::new_1d(0),
1027            Dim3::new_1d(256),
1028            Dim3::new_1d(1),
1029            &clock,
1030            100,
1031            ContextBackend::Cpu,
1032            Some(Domain::Compliance),
1033            256,
1034            None,
1035        );
1036
1037        ctx.record_latency("process_order", 500);
1038        ctx.record_throughput("orders_per_sec", 1000);
1039        ctx.record_counter("total_orders", 1);
1040        ctx.record_gauge("queue_depth", 42);
1041
1042        assert_eq!(ctx.metrics_count(), 4);
1043
1044        let metrics = ctx.flush_metrics();
1045        assert_eq!(metrics.len(), 4);
1046
1047        // Verify entries
1048        assert_eq!(metrics[0].operation, "process_order");
1049        assert_eq!(metrics[0].metric_type, MetricType::Latency);
1050        assert_eq!(metrics[0].value, 500);
1051        assert_eq!(metrics[0].kernel_id, 100);
1052        assert_eq!(metrics[0].domain, Some(Domain::Compliance));
1053
1054        assert_eq!(metrics[1].metric_type, MetricType::Throughput);
1055        assert_eq!(metrics[2].metric_type, MetricType::Counter);
1056        assert_eq!(metrics[3].metric_type, MetricType::Gauge);
1057        assert_eq!(metrics[3].value, 42);
1058
1059        // After flush, buffer is empty
1060        assert_eq!(ctx.metrics_count(), 0);
1061    }
1062
1063    #[test]
1064    fn test_kernel_alert_constructors() {
1065        let alert = KernelAlert::high_latency("Slow", 500);
1066        assert_eq!(alert.severity, AlertSeverity::Warning);
1067        assert_eq!(alert.alert_type, KernelAlertType::HighLatency);
1068        assert!(alert.message.contains("500µs"));
1069
1070        let alert = KernelAlert::error("Failed");
1071        assert_eq!(alert.severity, AlertSeverity::Error);
1072        assert_eq!(alert.alert_type, KernelAlertType::ProcessingError);
1073
1074        let alert = KernelAlert::queue_pressure("Input queue", 85);
1075        assert_eq!(alert.alert_type, KernelAlertType::QueuePressure);
1076        assert!(alert.message.contains("85%"));
1077
1078        let alert = KernelAlert::new(
1079            AlertSeverity::Critical,
1080            KernelAlertType::Custom(42),
1081            "Custom alert",
1082        )
1083        .with_routing(AlertRouting::External);
1084        assert_eq!(alert.severity, AlertSeverity::Critical);
1085        assert!(matches!(alert.routing, AlertRouting::External));
1086    }
1087
1088    #[test]
1089    fn test_emit_alert_with_channel() {
1090        let (tx, mut rx) = mpsc::unbounded_channel();
1091        let clock = HlcClock::new(1);
1092        let ctx = RingContext::new_with_options(
1093            ThreadId::new_1d(0),
1094            BlockId::new_1d(0),
1095            Dim3::new_1d(256),
1096            Dim3::new_1d(1),
1097            &clock,
1098            42,
1099            ContextBackend::Cpu,
1100            Some(Domain::OrderMatching),
1101            256,
1102            Some(tx),
1103        );
1104
1105        assert!(ctx.has_alert_channel());
1106        ctx.emit_alert(KernelAlert::error("Test error"));
1107
1108        // Receive the alert
1109        let alert = rx.try_recv().expect("Should receive alert");
1110        assert_eq!(alert.source_kernel, 42);
1111        assert_eq!(alert.source_domain, Some(Domain::OrderMatching));
1112        assert_eq!(alert.alert_type, KernelAlertType::ProcessingError);
1113    }
1114
1115    #[test]
1116    fn test_emit_alert_without_channel() {
1117        let clock = HlcClock::new(1);
1118        let ctx = make_test_context(&clock);
1119
1120        assert!(!ctx.has_alert_channel());
1121        // Should not panic when no channel configured
1122        ctx.emit_alert(KernelAlert::error("No-op"));
1123    }
1124
1125    #[test]
1126    fn test_alert_if_slow() {
1127        let (tx, mut rx) = mpsc::unbounded_channel();
1128        let clock = HlcClock::new(1);
1129        let ctx = RingContext::new_with_options(
1130            ThreadId::new_1d(0),
1131            BlockId::new_1d(0),
1132            Dim3::new_1d(256),
1133            Dim3::new_1d(1),
1134            &clock,
1135            1,
1136            ContextBackend::Cpu,
1137            None,
1138            256,
1139            Some(tx),
1140        );
1141
1142        // Below threshold - no alert
1143        ctx.alert_if_slow("fast_op", 50, 100);
1144        assert!(rx.try_recv().is_err());
1145
1146        // Above threshold - alert sent
1147        ctx.alert_if_slow("slow_op", 150, 100);
1148        let alert = rx.try_recv().expect("Should receive alert");
1149        assert!(alert.message.contains("slow_op"));
1150        assert!(alert.message.contains("150µs"));
1151    }
1152
1153    #[test]
1154    fn test_alert_severity_ordering() {
1155        assert!(AlertSeverity::Info < AlertSeverity::Warning);
1156        assert!(AlertSeverity::Warning < AlertSeverity::Error);
1157        assert!(AlertSeverity::Error < AlertSeverity::Critical);
1158    }
1159}