// ringkernel_core/introspection.rs
1//! Per-Actor Introspection API — FR-015
2//!
3//! Runtime actor inspection for debugging and monitoring:
4//! - List all actors with state, queue depth, message rate
5//! - Inspect per-actor metrics (read-only)
6//! - Peek at queued messages
7//! - Trace recent message processing with timing
8//!
9//! # Streaming Introspection (v1.1)
10//!
11//! Live introspection streaming for sub-millisecond metric freshness:
12//! - [`IntrospectionStream`] — subscribe to periodic metric emissions per-actor
13//! - [`MetricAggregator`] — EWMA rate calculation and latency histogram
14//! - [`LiveMetrics`] — the observation record emitted to subscribers
15//! - Wire formats ([`SubscribeMetricsRequest`], [`LiveMetricsEvent`]) for GPU/CPU bridge
16
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use parking_lot::RwLock;
23use tokio::sync::mpsc;
24
25use crate::actor::{ActorId, ActorState};
26use crate::hlc::HlcTimestamp;
27
/// Snapshot of a single actor's state for introspection.
///
/// All fields are owned copies, so a snapshot does not borrow from or pin
/// the actor it describes.
#[derive(Debug, Clone)]
pub struct ActorSnapshot {
    /// Actor identifier.
    pub id: ActorId,
    /// Human-readable name (from registry, if registered).
    pub name: Option<String>,
    /// Current state.
    pub state: ActorState,
    /// Parent actor (if supervised).
    pub parent: Option<ActorId>,
    /// Children (if any).
    pub children: Vec<ActorId>,
    /// Queue metrics.
    pub queue: QueueSnapshot,
    /// Performance metrics.
    pub performance: PerformanceSnapshot,
    /// When this snapshot was taken (monotonic clock).
    pub snapshot_at: Instant,
}
48
/// Snapshot of an actor's queue state.
#[derive(Debug, Clone, Default)]
pub struct QueueSnapshot {
    /// Current input queue depth.
    pub input_depth: u32,
    /// Current output queue depth.
    pub output_depth: u32,
    /// Input queue capacity.
    pub input_capacity: u32,
    /// Output queue capacity.
    pub output_capacity: u32,
    /// Queue pressure (0-255).
    pub pressure: u8,
}

impl QueueSnapshot {
    /// Input queue utilization (0.0 - 1.0).
    ///
    /// Returns 0.0 when the capacity is zero (unknown/unbounded queue)
    /// rather than dividing by zero.
    pub fn input_utilization(&self) -> f64 {
        Self::utilization(self.input_depth, self.input_capacity)
    }

    /// Output queue utilization (0.0 - 1.0).
    ///
    /// Mirrors [`Self::input_utilization`] for the output side; the output
    /// depth/capacity fields previously had no accessor.
    pub fn output_utilization(&self) -> f64 {
        Self::utilization(self.output_depth, self.output_capacity)
    }

    /// Shared depth/capacity ratio with a zero-capacity guard.
    fn utilization(depth: u32, capacity: u32) -> f64 {
        if capacity == 0 {
            0.0
        } else {
            f64::from(depth) / f64::from(capacity)
        }
    }
}
73
/// Snapshot of an actor's performance metrics.
///
/// All values are point-in-time copies; `Default` yields an all-zero
/// snapshot.
#[derive(Debug, Clone, Default)]
pub struct PerformanceSnapshot {
    /// Total messages processed (lifetime).
    pub messages_processed: u64,
    /// Messages processed per second (recent window).
    pub messages_per_second: f64,
    /// Average processing latency per message.
    pub avg_latency: Duration,
    /// Maximum processing latency observed.
    pub max_latency: Duration,
    /// Number of restarts.
    pub restart_count: u32,
    /// Uptime since last restart.
    pub uptime: Duration,
}
90
/// A recent message processing trace entry.
#[derive(Debug, Clone)]
pub struct TraceEntry {
    /// Message sequence number.
    pub sequence: u64,
    /// When the message was received (monotonic clock).
    pub received_at: Instant,
    /// Processing duration.
    pub duration: Duration,
    /// Source actor (if K2K).
    pub source: Option<ActorId>,
    /// Outcome.
    pub outcome: TraceOutcome,
}

/// Outcome of a traced message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceOutcome {
    /// Successfully processed.
    Success,
    /// Processing failed; payload is a human-readable error description.
    Failed(String),
    /// Message was forwarded to another actor (payload is the target).
    Forwarded(ActorId),
    /// Message was dropped (e.g., filtered).
    Dropped,
}
118
119/// Per-actor trace buffer (ring buffer of recent processing traces).
120pub struct TraceBuffer {
121    entries: Vec<TraceEntry>,
122    capacity: usize,
123    write_pos: usize,
124    total: u64,
125}
126
127impl TraceBuffer {
128    /// Create a new trace buffer.
129    pub fn new(capacity: usize) -> Self {
130        Self {
131            entries: Vec::with_capacity(capacity),
132            capacity,
133            write_pos: 0,
134            total: 0,
135        }
136    }
137
138    /// Record a trace entry.
139    pub fn record(&mut self, entry: TraceEntry) {
140        if self.entries.len() < self.capacity {
141            self.entries.push(entry);
142        } else {
143            self.entries[self.write_pos] = entry;
144        }
145        self.write_pos = (self.write_pos + 1) % self.capacity;
146        self.total += 1;
147    }
148
149    /// Get recent trace entries (most recent first).
150    pub fn recent(&self, limit: usize) -> Vec<&TraceEntry> {
151        let mut result: Vec<&TraceEntry> = self.entries.iter().collect();
152        result.sort_by_key(|e| std::cmp::Reverse(e.received_at));
153        result.truncate(limit);
154        result
155    }
156
157    /// Total entries recorded (lifetime).
158    pub fn total_recorded(&self) -> u64 {
159        self.total
160    }
161
162    /// Current buffer size.
163    pub fn len(&self) -> usize {
164        self.entries.len()
165    }
166
167    /// Check if empty.
168    pub fn is_empty(&self) -> bool {
169        self.entries.is_empty()
170    }
171}
172
/// Introspection service that aggregates data from all actors.
///
/// Owns one [`TraceBuffer`] per registered actor. Not internally
/// synchronized: all mutating methods take `&mut self`.
pub struct IntrospectionService {
    /// Per-actor trace buffers.
    traces: HashMap<ActorId, TraceBuffer>,
    /// Capacity used for newly created trace buffers.
    trace_capacity: usize,
}
180
181impl IntrospectionService {
182    /// Create a new introspection service.
183    pub fn new(trace_capacity: usize) -> Self {
184        Self {
185            traces: HashMap::new(),
186            trace_capacity,
187        }
188    }
189
190    /// Register an actor for tracing.
191    pub fn register_actor(&mut self, id: ActorId) {
192        self.traces
193            .entry(id)
194            .or_insert_with(|| TraceBuffer::new(self.trace_capacity));
195    }
196
197    /// Record a trace entry for an actor.
198    pub fn record_trace(&mut self, actor: ActorId, entry: TraceEntry) {
199        self.traces
200            .entry(actor)
201            .or_insert_with(|| TraceBuffer::new(self.trace_capacity))
202            .record(entry);
203    }
204
205    /// Get recent traces for an actor.
206    pub fn get_traces(&self, actor: ActorId, limit: usize) -> Vec<&TraceEntry> {
207        self.traces
208            .get(&actor)
209            .map(|buf| buf.recent(limit))
210            .unwrap_or_default()
211    }
212
213    /// Deregister an actor.
214    pub fn deregister_actor(&mut self, id: ActorId) {
215        self.traces.remove(&id);
216    }
217
218    /// Number of traced actors.
219    pub fn actor_count(&self) -> usize {
220        self.traces.len()
221    }
222}
223
impl Default for IntrospectionService {
    /// Default service retains the 100 most recent traces per actor.
    fn default() -> Self {
        Self::new(100)
    }
}
229
230// =============================================================================
231// Streaming Introspection (v1.1 spec §3.2)
232// =============================================================================
233
/// Default EWMA smoothing factor for rate calculations.
///
/// Alpha = 0.2 gives moderate smoothing (newest sample weighted 20%, history 80%).
pub const DEFAULT_EWMA_ALPHA: f64 = 0.2;

/// Capacity of the [`LatencyHistogram`] ring buffer.
///
/// Bounds the cost of percentile queries, which copy and sort at most
/// this many samples.
pub const LATENCY_HISTOGRAM_CAPACITY: usize = 1024;
241
/// Live per-actor metrics delivered to subscribers.
///
/// A `LiveMetrics` observation represents a point-in-time snapshot of an
/// actor's operational state, emitted periodically (per the subscriber's
/// configured interval) by a backend or aggregator.
#[derive(Debug, Clone)]
pub struct LiveMetrics {
    /// Actor whose metrics are being reported.
    pub actor_id: ActorId,
    /// HLC timestamp for this observation (causal ordering across nodes).
    pub timestamp: HlcTimestamp,
    /// Current inbound queue depth.
    pub queue_depth: usize,
    /// Inbound message rate (messages/sec), EWMA-smoothed.
    pub inbound_rate: f64,
    /// Outbound message rate (messages/sec), EWMA-smoothed.
    pub outbound_rate: f64,
    /// Observed p50 processing latency.
    pub latency_p50: Duration,
    /// Observed p99 processing latency.
    pub latency_p99: Duration,
    /// Resident actor state size in bytes.
    pub state_size_bytes: u64,
    /// GPU utilization (0.0–1.0). Always 0.0 on the CPU backend.
    pub gpu_utilization: f32,
    /// Tenant this actor belongs to (0 = unspecified).
    pub tenant_id: u64,
}
270
271/// A handle to a single live-metrics subscription.
272///
273/// Wraps the sender half of a tokio unbounded MPSC channel along with the
274/// subscriber's desired emission interval. The interval is stored so the
275/// [`IntrospectionStream`] can honor per-subscriber cadence when dispatching.
276pub struct SubscriberHandle {
277    /// Desired emission interval for this subscriber.
278    pub interval: Duration,
279    /// Last time we forwarded a metric to this subscriber.
280    last_sent_at: parking_lot::Mutex<Option<Instant>>,
281    /// Unique subscription identifier (for wire-protocol correlation and teardown).
282    pub subscription_id: u64,
283    sender: mpsc::UnboundedSender<LiveMetrics>,
284}
285
286impl SubscriberHandle {
287    /// Subscription identifier.
288    pub fn subscription_id(&self) -> u64 {
289        self.subscription_id
290    }
291
292    /// Return true if the receiver has been dropped.
293    pub fn is_closed(&self) -> bool {
294        self.sender.is_closed()
295    }
296
297    /// Try to deliver a metric, respecting the subscriber's interval gate.
298    ///
299    /// Returns `Ok(true)` if delivered, `Ok(false)` if suppressed by the
300    /// interval gate, and `Err(())` if the receiver has dropped (the caller
301    /// should discard the handle).
302    fn try_send(&self, metrics: LiveMetrics) -> std::result::Result<bool, ()> {
303        let now = Instant::now();
304        {
305            let mut last = self.last_sent_at.lock();
306            if let Some(prev) = *last {
307                if now.duration_since(prev) < self.interval {
308                    return Ok(false);
309                }
310            }
311            *last = Some(now);
312        }
313        self.sender.send(metrics).map(|_| true).map_err(|_| ())
314    }
315}
316
317/// A ring-buffer latency histogram for p50/p99 percentile computation.
318///
319/// Stores up to [`LATENCY_HISTOGRAM_CAPACITY`] recent samples. Percentiles
320/// are computed by copying and sorting the live samples on demand (this is
321/// intentional — HDR histogram is overkill for v1.1).
322#[derive(Debug, Clone)]
323pub struct LatencyHistogram {
324    samples: [Duration; LATENCY_HISTOGRAM_CAPACITY],
325    idx: usize,
326    count: u64,
327}
328
329impl LatencyHistogram {
330    /// Create an empty histogram.
331    pub fn new() -> Self {
332        Self {
333            samples: [Duration::ZERO; LATENCY_HISTOGRAM_CAPACITY],
334            idx: 0,
335            count: 0,
336        }
337    }
338
339    /// Record a latency sample.
340    pub fn record(&mut self, d: Duration) {
341        self.samples[self.idx] = d;
342        self.idx = (self.idx + 1) % LATENCY_HISTOGRAM_CAPACITY;
343        self.count = self.count.saturating_add(1);
344    }
345
346    /// Number of samples recorded (lifetime).
347    pub fn total_recorded(&self) -> u64 {
348        self.count
349    }
350
351    /// Number of live samples currently in the ring buffer.
352    pub fn live_samples(&self) -> usize {
353        if (self.count as usize) < LATENCY_HISTOGRAM_CAPACITY {
354            self.count as usize
355        } else {
356            LATENCY_HISTOGRAM_CAPACITY
357        }
358    }
359
360    /// Compute the p50 (median) latency. Returns `Duration::ZERO` if empty.
361    pub fn p50(&self) -> Duration {
362        self.percentile(0.50)
363    }
364
365    /// Compute the p99 latency. Returns `Duration::ZERO` if empty.
366    pub fn p99(&self) -> Duration {
367        self.percentile(0.99)
368    }
369
370    /// Compute an arbitrary percentile in `[0.0, 1.0]`.
371    pub fn percentile(&self, p: f64) -> Duration {
372        let n = self.live_samples();
373        if n == 0 {
374            return Duration::ZERO;
375        }
376        let mut buf: Vec<Duration> = self.samples[..n].to_vec();
377        buf.sort_unstable();
378        let p = p.clamp(0.0, 1.0);
379        // Nearest-rank: ceil(p * n) - 1, clamped to [0, n-1].
380        let rank = ((p * n as f64).ceil() as usize)
381            .saturating_sub(1)
382            .min(n - 1);
383        buf[rank]
384    }
385}
386
387impl Default for LatencyHistogram {
388    fn default() -> Self {
389        Self::new()
390    }
391}
392
/// Per-actor mutable state tracked by [`MetricAggregator`].
///
/// All access is guarded by the aggregator's `RwLock`; this type itself
/// has no interior synchronization.
#[derive(Debug)]
struct ActorMetricState {
    // Instant of the previous snapshot; rate deltas are divided by the
    // time elapsed since this point.
    last_sample_at: Instant,
    // EWMA-smoothed inbound rate (messages/sec).
    inbound_ewma: f64,
    // EWMA-smoothed outbound rate (messages/sec).
    outbound_ewma: f64,
    // Lifetime inbound message count (saturating).
    inbound_count: u64,
    // Lifetime outbound message count (saturating).
    outbound_count: u64,
    // Delta counters since the last EWMA update.
    inbound_delta: u64,
    outbound_delta: u64,
    // Ring buffer of recent latency samples for p50/p99.
    latency_histogram: LatencyHistogram,
    // Resident state size in bytes, as last reported via set_state_size.
    state_size_bytes: u64,
    // Current inbound queue depth, as last reported via set_queue_depth.
    queue_depth: usize,
    // GPU utilization in [0.0, 1.0]; stays 0.0 unless a backend reports it.
    gpu_utilization: f32,
    // Owning tenant (0 = unspecified).
    tenant_id: u64,
    // HLC node id copied from the aggregator at creation; used to stamp
    // snapshot timestamps in fold_snapshot.
    hlc_node_id: u64,
    // False until the first snapshot seeds the EWMA values.
    initialized: bool,
}

impl ActorMetricState {
    /// Fresh zeroed state; `last_sample_at` starts at "now" so the first
    /// snapshot measures elapsed time from creation.
    fn new(hlc_node_id: u64) -> Self {
        Self {
            last_sample_at: Instant::now(),
            inbound_ewma: 0.0,
            outbound_ewma: 0.0,
            inbound_count: 0,
            outbound_count: 0,
            inbound_delta: 0,
            outbound_delta: 0,
            latency_histogram: LatencyHistogram::new(),
            state_size_bytes: 0,
            queue_depth: 0,
            gpu_utilization: 0.0,
            tenant_id: 0,
            hlc_node_id,
            initialized: false,
        }
    }
}
433
/// Aggregates per-actor metric counters and produces smoothed snapshots.
///
/// The aggregator is the CPU-side single source of truth for streaming
/// metrics. Backends (CPU dispatcher, CUDA K2H processor) feed it raw
/// counters via [`record_inbound`], [`record_outbound`], and
/// [`record_latency`]; subscribers consume smoothed snapshots via
/// [`snapshot`] / [`snapshot_all`].
///
/// The aggregator is `Send + Sync`. Internal state is guarded by
/// `parking_lot::RwLock`.
///
/// [`record_inbound`]: MetricAggregator::record_inbound
/// [`record_outbound`]: MetricAggregator::record_outbound
/// [`record_latency`]: MetricAggregator::record_latency
/// [`snapshot`]: MetricAggregator::snapshot
/// [`snapshot_all`]: MetricAggregator::snapshot_all
pub struct MetricAggregator {
    /// Per-actor state, lazily created by the record/set methods.
    per_actor: RwLock<HashMap<ActorId, ActorMetricState>>,
    /// EWMA smoothing factor in `(0.0, 1.0]`.
    ewma_alpha: f64,
    /// Node id used when stamping HLC timestamps on snapshots.
    hlc_node_id: u64,
}
455
456impl MetricAggregator {
457    /// Create an aggregator with the default EWMA alpha ([`DEFAULT_EWMA_ALPHA`]).
458    pub fn new() -> Self {
459        Self::with_alpha(DEFAULT_EWMA_ALPHA)
460    }
461
462    /// Create an aggregator with a custom EWMA smoothing factor.
463    ///
464    /// `alpha` is clamped to `(0.0, 1.0]`.
465    pub fn with_alpha(alpha: f64) -> Self {
466        let ewma_alpha = if alpha.is_finite() && alpha > 0.0 && alpha <= 1.0 {
467            alpha
468        } else {
469            DEFAULT_EWMA_ALPHA
470        };
471        Self {
472            per_actor: RwLock::new(HashMap::new()),
473            ewma_alpha,
474            hlc_node_id: 0,
475        }
476    }
477
478    /// Configure the HLC node identifier used when stamping snapshots.
479    pub fn with_hlc_node_id(mut self, node_id: u64) -> Self {
480        self.hlc_node_id = node_id;
481        self
482    }
483
484    /// Current EWMA smoothing factor.
485    pub fn ewma_alpha(&self) -> f64 {
486        self.ewma_alpha
487    }
488
489    /// Record `count` inbound messages for `actor_id`.
490    pub fn record_inbound(&self, actor_id: ActorId, count: u64) {
491        let mut guard = self.per_actor.write();
492        let state = guard
493            .entry(actor_id)
494            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
495        state.inbound_count = state.inbound_count.saturating_add(count);
496        state.inbound_delta = state.inbound_delta.saturating_add(count);
497    }
498
499    /// Record `count` outbound messages for `actor_id`.
500    pub fn record_outbound(&self, actor_id: ActorId, count: u64) {
501        let mut guard = self.per_actor.write();
502        let state = guard
503            .entry(actor_id)
504            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
505        state.outbound_count = state.outbound_count.saturating_add(count);
506        state.outbound_delta = state.outbound_delta.saturating_add(count);
507    }
508
509    /// Record an observed processing latency sample for `actor_id`.
510    pub fn record_latency(&self, actor_id: ActorId, d: Duration) {
511        let mut guard = self.per_actor.write();
512        let state = guard
513            .entry(actor_id)
514            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
515        state.latency_histogram.record(d);
516    }
517
518    /// Set the current queue depth for `actor_id`.
519    pub fn set_queue_depth(&self, actor_id: ActorId, depth: usize) {
520        let mut guard = self.per_actor.write();
521        let state = guard
522            .entry(actor_id)
523            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
524        state.queue_depth = depth;
525    }
526
527    /// Set the current resident state size for `actor_id`.
528    pub fn set_state_size(&self, actor_id: ActorId, bytes: u64) {
529        let mut guard = self.per_actor.write();
530        let state = guard
531            .entry(actor_id)
532            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
533        state.state_size_bytes = bytes;
534    }
535
536    /// Set the current GPU utilization for `actor_id`. Values outside `[0.0, 1.0]` are clamped.
537    pub fn set_gpu_utilization(&self, actor_id: ActorId, util: f32) {
538        let util = util.clamp(0.0, 1.0);
539        let mut guard = self.per_actor.write();
540        let state = guard
541            .entry(actor_id)
542            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
543        state.gpu_utilization = util;
544    }
545
546    /// Associate a tenant with `actor_id`.
547    pub fn set_tenant(&self, actor_id: ActorId, tenant_id: u64) {
548        let mut guard = self.per_actor.write();
549        let state = guard
550            .entry(actor_id)
551            .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
552        state.tenant_id = tenant_id;
553    }
554
555    /// Remove an actor's state. Returns true if the actor was tracked.
556    pub fn remove_actor(&self, actor_id: &ActorId) -> bool {
557        self.per_actor.write().remove(actor_id).is_some()
558    }
559
560    /// Number of actors currently tracked.
561    pub fn tracked_actors(&self) -> usize {
562        self.per_actor.read().len()
563    }
564
565    /// Produce a smoothed snapshot for `actor_id`, updating EWMA rates.
566    ///
567    /// Returns `None` if the actor is unknown.
568    pub fn snapshot(&self, actor_id: &ActorId) -> Option<LiveMetrics> {
569        let mut guard = self.per_actor.write();
570        let state = guard.get_mut(actor_id)?;
571        let (p50, p99) = (state.latency_histogram.p50(), state.latency_histogram.p99());
572        let metrics = Self::fold_snapshot(*actor_id, state, self.ewma_alpha, p50, p99);
573        Some(metrics)
574    }
575
576    /// Produce smoothed snapshots for all tracked actors.
577    pub fn snapshot_all(&self) -> Vec<LiveMetrics> {
578        let mut guard = self.per_actor.write();
579        let alpha = self.ewma_alpha;
580        let mut out = Vec::with_capacity(guard.len());
581        for (id, state) in guard.iter_mut() {
582            let (p50, p99) = (state.latency_histogram.p50(), state.latency_histogram.p99());
583            out.push(Self::fold_snapshot(*id, state, alpha, p50, p99));
584        }
585        out
586    }
587
588    fn fold_snapshot(
589        actor_id: ActorId,
590        state: &mut ActorMetricState,
591        alpha: f64,
592        p50: Duration,
593        p99: Duration,
594    ) -> LiveMetrics {
595        let now = Instant::now();
596        let elapsed = now.duration_since(state.last_sample_at).as_secs_f64();
597
598        // Compute per-sample rate from the counter deltas since the last
599        // snapshot. Guard against zero elapsed time (back-to-back snapshots).
600        let inbound_rate_sample = if elapsed > 0.0 {
601            state.inbound_delta as f64 / elapsed
602        } else {
603            0.0
604        };
605        let outbound_rate_sample = if elapsed > 0.0 {
606            state.outbound_delta as f64 / elapsed
607        } else {
608            0.0
609        };
610
611        if !state.initialized {
612            // Seed the EWMA with the first real observation so we don't bias
613            // toward zero for many intervals.
614            state.inbound_ewma = inbound_rate_sample;
615            state.outbound_ewma = outbound_rate_sample;
616            state.initialized = true;
617        } else {
618            state.inbound_ewma = alpha * inbound_rate_sample + (1.0 - alpha) * state.inbound_ewma;
619            state.outbound_ewma =
620                alpha * outbound_rate_sample + (1.0 - alpha) * state.outbound_ewma;
621        }
622
623        state.inbound_delta = 0;
624        state.outbound_delta = 0;
625        state.last_sample_at = now;
626
627        let physical = std::time::SystemTime::now()
628            .duration_since(std::time::UNIX_EPOCH)
629            .map(|d| d.as_micros() as u64)
630            .unwrap_or(0);
631        let timestamp = HlcTimestamp::new(physical, 0, state.hlc_node_id);
632
633        LiveMetrics {
634            actor_id,
635            timestamp,
636            queue_depth: state.queue_depth,
637            inbound_rate: state.inbound_ewma,
638            outbound_rate: state.outbound_ewma,
639            latency_p50: p50,
640            latency_p99: p99,
641            state_size_bytes: state.state_size_bytes,
642            gpu_utilization: state.gpu_utilization,
643            tenant_id: state.tenant_id,
644        }
645    }
646}
647
impl Default for MetricAggregator {
    /// Equivalent to [`MetricAggregator::new`]: default EWMA alpha, node id 0.
    fn default() -> Self {
        Self::new()
    }
}
653
impl std::fmt::Debug for MetricAggregator {
    // Manual impl: prints configuration plus a tracked-actor count instead
    // of dumping the entire lock-guarded per-actor map.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricAggregator")
            .field("ewma_alpha", &self.ewma_alpha)
            .field("hlc_node_id", &self.hlc_node_id)
            .field("tracked_actors", &self.tracked_actors())
            .finish()
    }
}
663
/// Streaming live-metric dispatcher with per-actor subscriptions.
///
/// Backends call [`emit`] periodically with fresh [`LiveMetrics`]; the
/// stream fans out to all subscribers registered for the metric's actor
/// (plus any "all-actors" subscribers). Subscribers receive metrics via
/// tokio unbounded MPSC channels, so dispatch is non-blocking.
///
/// Broken receivers (dropped by the subscriber) are detected on the next
/// dispatch attempt and auto-pruned.
///
/// [`emit`]: IntrospectionStream::emit
pub struct IntrospectionStream {
    /// Per-actor subscribers, keyed by the exact actor id. No sentinel key
    /// is used for "all actors" — global subscribers live in the dedicated
    /// `global_subscriptions` bucket below.
    subscriptions: RwLock<HashMap<ActorId, Vec<Arc<SubscriberHandle>>>>,
    /// Subscribers that want every actor's metrics.
    global_subscriptions: RwLock<Vec<Arc<SubscriberHandle>>>,
    /// Aggregator that computes snapshots (optional — callers may emit
    /// directly for GPU-sourced metrics).
    aggregator: Arc<MetricAggregator>,
    /// Monotonic counter for subscription IDs; starts at 1, so 0 is never issued.
    next_subscription_id: AtomicU64,
}
687
688impl IntrospectionStream {
689    /// Create a new introspection stream with a fresh aggregator.
690    pub fn new() -> Self {
691        Self::with_aggregator(Arc::new(MetricAggregator::new()))
692    }
693
694    /// Create a stream bound to an existing aggregator.
695    pub fn with_aggregator(aggregator: Arc<MetricAggregator>) -> Self {
696        Self {
697            subscriptions: RwLock::new(HashMap::new()),
698            global_subscriptions: RwLock::new(Vec::new()),
699            aggregator,
700            next_subscription_id: AtomicU64::new(1),
701        }
702    }
703
704    /// Access the underlying aggregator (shared via `Arc`).
705    pub fn aggregator(&self) -> Arc<MetricAggregator> {
706        self.aggregator.clone()
707    }
708
709    /// Subscribe to metrics for one actor at the given interval.
710    ///
711    /// Returns the receiver half of a tokio unbounded MPSC channel. The
712    /// sender lives inside a [`SubscriberHandle`] held by the stream; when
713    /// the caller drops the returned receiver, the handle becomes closed
714    /// and is pruned on the next dispatch.
715    ///
716    /// An interval of `Duration::ZERO` is equivalent to unsubscribing — in
717    /// that case this function returns a receiver whose sender has already
718    /// been dropped.
719    pub fn subscribe(
720        &self,
721        actor_id: ActorId,
722        interval: Duration,
723    ) -> mpsc::UnboundedReceiver<LiveMetrics> {
724        let (tx, rx) = mpsc::unbounded_channel();
725        if interval.is_zero() {
726            // "interval = 0" means unsubscribe per spec. Drop tx so the
727            // receiver reports closed immediately, and proactively remove
728            // any existing subscriptions for this actor.
729            drop(tx);
730            self.unsubscribe(actor_id);
731            return rx;
732        }
733        let handle = Arc::new(SubscriberHandle {
734            interval,
735            last_sent_at: parking_lot::Mutex::new(None),
736            subscription_id: self.next_subscription_id.fetch_add(1, Ordering::Relaxed),
737            sender: tx,
738        });
739        self.subscriptions
740            .write()
741            .entry(actor_id)
742            .or_default()
743            .push(handle);
744        rx
745    }
746
747    /// Subscribe to metrics for every actor.
748    ///
749    /// `interval` of `Duration::ZERO` is equivalent to an immediate no-op
750    /// (returns a closed receiver).
751    pub fn subscribe_all(&self, interval: Duration) -> mpsc::UnboundedReceiver<LiveMetrics> {
752        let (tx, rx) = mpsc::unbounded_channel();
753        if interval.is_zero() {
754            drop(tx);
755            return rx;
756        }
757        let handle = Arc::new(SubscriberHandle {
758            interval,
759            last_sent_at: parking_lot::Mutex::new(None),
760            subscription_id: self.next_subscription_id.fetch_add(1, Ordering::Relaxed),
761            sender: tx,
762        });
763        self.global_subscriptions.write().push(handle);
764        rx
765    }
766
767    /// Drop every subscription for `actor_id`.
768    pub fn unsubscribe(&self, actor_id: ActorId) {
769        self.subscriptions.write().remove(&actor_id);
770    }
771
772    /// Drop every global "all-actors" subscription.
773    pub fn unsubscribe_all(&self) {
774        self.global_subscriptions.write().clear();
775    }
776
777    /// Emit a metric to all applicable subscribers.
778    ///
779    /// Called by backends (CPU dispatcher) or K2H processor (CUDA). Any
780    /// subscribers whose receiver has been dropped are auto-removed during
781    /// dispatch. Interval gating is applied per-subscriber, so a fast
782    /// producer with slow subscribers will not spam them.
783    pub fn emit(&self, metrics: LiveMetrics) {
784        // Dispatch to per-actor subscribers.
785        let actor_id = metrics.actor_id;
786        {
787            let mut guard = self.subscriptions.write();
788            if let Some(subs) = guard.get_mut(&actor_id) {
789                Self::dispatch_and_prune(subs, &metrics);
790                if subs.is_empty() {
791                    guard.remove(&actor_id);
792                }
793            }
794        }
795        // Dispatch to global subscribers.
796        {
797            let mut guard = self.global_subscriptions.write();
798            Self::dispatch_and_prune(&mut guard, &metrics);
799        }
800    }
801
802    fn dispatch_and_prune(subs: &mut Vec<Arc<SubscriberHandle>>, metrics: &LiveMetrics) {
803        subs.retain(|handle| {
804            if handle.is_closed() {
805                return false;
806            }
807            match handle.try_send(metrics.clone()) {
808                Ok(_) => true,
809                Err(()) => false,
810            }
811        });
812    }
813
814    /// Number of active subscribers for one actor.
815    pub fn subscriber_count(&self, actor_id: &ActorId) -> usize {
816        self.subscriptions
817            .read()
818            .get(actor_id)
819            .map(|v| v.iter().filter(|h| !h.is_closed()).count())
820            .unwrap_or(0)
821    }
822
823    /// Number of active "all-actors" subscribers.
824    pub fn global_subscriber_count(&self) -> usize {
825        self.global_subscriptions
826            .read()
827            .iter()
828            .filter(|h| !h.is_closed())
829            .count()
830    }
831
832    /// Total number of active subscribers across all buckets.
833    pub fn total_subscribers(&self) -> usize {
834        let per_actor: usize = self
835            .subscriptions
836            .read()
837            .values()
838            .map(|v| v.iter().filter(|h| !h.is_closed()).count())
839            .sum();
840        per_actor + self.global_subscriber_count()
841    }
842
843    /// Run a full aggregation cycle: snapshot every tracked actor and emit
844    /// the results. Useful for drivers that want a single "tick" call.
845    pub fn tick(&self) -> usize {
846        let snapshots = self.aggregator.snapshot_all();
847        let n = snapshots.len();
848        for metrics in snapshots {
849            self.emit(metrics);
850        }
851        n
852    }
853}
854
impl Default for IntrospectionStream {
    /// Equivalent to [`IntrospectionStream::new`] (fresh aggregator).
    fn default() -> Self {
        Self::new()
    }
}
860
impl std::fmt::Debug for IntrospectionStream {
    // Manual impl: prints subscriber totals and the aggregator summary
    // instead of the lock-guarded subscription maps.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IntrospectionStream")
            .field("total_subscribers", &self.total_subscribers())
            .field("aggregator", &self.aggregator)
            .finish()
    }
}
869
870// =============================================================================
871// H2K / K2H Wire Protocol (GPU-mockable, CPU-side only for v1.1)
872// =============================================================================
873
/// H2K command: subscribe to metrics from a GPU actor.
///
/// Encoded into the H2K message payload on the CUDA backend. `interval_us`
/// of 0 is interpreted as an unsubscribe request.
///
/// # Wire Layout
///
/// The type is `#[repr(C)]` with three `u64` fields — no padding — so it
/// maps 1:1 to the GPU-side C struct of the same name. Bytes are written
/// in field declaration order, native endianness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
pub struct SubscribeMetricsRequest {
    /// Target actor (thread-block index on GPU).
    pub actor_id: u64,
    /// Emission interval in microseconds; 0 = unsubscribe.
    pub interval_us: u64,
    /// Caller-generated subscription ID (echoed back in events).
    pub subscription_id: u64,
}

impl SubscribeMetricsRequest {
    /// Size of the wire representation in bytes (3 × u64 = 24).
    pub const WIRE_SIZE: usize = std::mem::size_of::<Self>();

    /// Construct a new subscribe request.
    pub const fn new(actor_id: u64, interval_us: u64, subscription_id: u64) -> Self {
        Self {
            actor_id,
            interval_us,
            subscription_id,
        }
    }

    /// Construct an unsubscribe request (interval = 0).
    pub const fn unsubscribe(actor_id: u64, subscription_id: u64) -> Self {
        Self::new(actor_id, 0, subscription_id)
    }

    /// Return true if this message is an unsubscribe request.
    pub const fn is_unsubscribe(&self) -> bool {
        self.interval_us == 0
    }

    /// Serialize to a fixed-size byte array.
    ///
    /// Fields are written in declaration order in native endianness, which
    /// for this padding-free `#[repr(C)]` struct produces exactly the same
    /// bytes as an in-memory copy — no `unsafe` needed.
    pub fn to_bytes(&self) -> [u8; Self::WIRE_SIZE] {
        let mut out = [0u8; Self::WIRE_SIZE];
        out[0..8].copy_from_slice(&self.actor_id.to_ne_bytes());
        out[8..16].copy_from_slice(&self.interval_us.to_ne_bytes());
        out[16..24].copy_from_slice(&self.subscription_id.to_ne_bytes());
        out
    }

    /// Deserialize from a byte slice. Returns `None` if the slice is too
    /// short; extra trailing bytes are ignored.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::WIRE_SIZE {
            return None;
        }
        // Read the native-endian u64 stored at byte offset `at`.
        fn read_u64(bytes: &[u8], at: usize) -> u64 {
            let mut word = [0u8; 8];
            word.copy_from_slice(&bytes[at..at + 8]);
            u64::from_ne_bytes(word)
        }
        Some(Self {
            actor_id: read_u64(bytes, 0),
            interval_us: read_u64(bytes, 8),
            subscription_id: read_u64(bytes, 16),
        })
    }
}
935
936/// K2H response: periodic metric emission from a GPU actor.
937///
938/// This is the on-wire form of [`LiveMetrics`] — fixed-size, `#[repr(C)]`,
939/// and compact so it can be emitted at high frequency from GPU-side. The
940/// CPU-side K2H processor lifts it into a [`LiveMetrics`] for subscribers.
941#[derive(Debug, Clone, Copy, PartialEq, Eq)]
942#[repr(C)]
943pub struct LiveMetricsEvent {
944    /// Subscription this event is emitted for.
945    pub subscription_id: u64,
946    /// Source actor (thread-block index on GPU).
947    pub actor_id: u64,
948    /// Microsecond-precision timestamp (wall clock, GPU-side origin).
949    pub timestamp_us: u64,
950    /// Total inbound messages observed (monotonic, cumulative).
951    pub inbound_total: u64,
952    /// Total outbound messages observed (monotonic, cumulative).
953    pub outbound_total: u64,
954    /// Tenant identifier (0 = unspecified).
955    pub tenant_id: u64,
956    /// Current inbound queue depth.
957    pub queue_depth: u32,
958    /// Observed p50 latency in microseconds.
959    pub latency_p50_us: u32,
960    /// Observed p99 latency in microseconds.
961    pub latency_p99_us: u32,
962    /// Resident actor state size in bytes (truncated to u32).
963    pub state_size_bytes: u32,
964    /// GPU utilization percentage 0–100.
965    pub gpu_utilization_pct: u8,
966    /// Explicit padding for 16-byte alignment (C ABI stability).
967    pub _pad: [u8; 7],
968}
969
970impl LiveMetricsEvent {
971    /// Size of the wire representation in bytes.
972    pub const WIRE_SIZE: usize = std::mem::size_of::<Self>();
973
974    /// Construct a new event. The padding field is zeroed.
975    #[allow(clippy::too_many_arguments)]
976    pub const fn new(
977        subscription_id: u64,
978        actor_id: u64,
979        timestamp_us: u64,
980        inbound_total: u64,
981        outbound_total: u64,
982        tenant_id: u64,
983        queue_depth: u32,
984        latency_p50_us: u32,
985        latency_p99_us: u32,
986        state_size_bytes: u32,
987        gpu_utilization_pct: u8,
988    ) -> Self {
989        Self {
990            subscription_id,
991            actor_id,
992            timestamp_us,
993            inbound_total,
994            outbound_total,
995            tenant_id,
996            queue_depth,
997            latency_p50_us,
998            latency_p99_us,
999            state_size_bytes,
1000            gpu_utilization_pct,
1001            _pad: [0; 7],
1002        }
1003    }
1004
1005    /// Serialize to a fixed-size byte array.
1006    pub fn to_bytes(&self) -> [u8; Self::WIRE_SIZE] {
1007        // SAFETY: `Self` is `#[repr(C)]` with `u64`/`u32`/`u8` fields and an
1008        // explicit padding array — no implicit padding holes, no invalid bit
1009        // patterns.
1010        unsafe { std::mem::transmute::<Self, [u8; Self::WIRE_SIZE]>(*self) }
1011    }
1012
1013    /// Deserialize from a byte slice. Returns `None` if the slice is too short.
1014    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
1015        if bytes.len() < Self::WIRE_SIZE {
1016            return None;
1017        }
1018        let mut buf = [0u8; Self::WIRE_SIZE];
1019        buf.copy_from_slice(&bytes[..Self::WIRE_SIZE]);
1020        // SAFETY: Plain-old-data layout; all bit patterns are valid.
1021        Some(unsafe { std::mem::transmute::<[u8; Self::WIRE_SIZE], Self>(buf) })
1022    }
1023
1024    /// Lift into a [`LiveMetrics`] observation.
1025    pub fn into_live_metrics(self, hlc_node_id: u64) -> LiveMetrics {
1026        LiveMetrics {
1027            actor_id: ActorId(self.actor_id as u32),
1028            timestamp: HlcTimestamp::new(self.timestamp_us, 0, hlc_node_id),
1029            queue_depth: self.queue_depth as usize,
1030            // Wire form carries cumulative totals; rate smoothing is a
1031            // CPU-side responsibility. We expose totals-as-rates = 0 to
1032            // indicate "not yet smoothed".
1033            inbound_rate: 0.0,
1034            outbound_rate: 0.0,
1035            latency_p50: Duration::from_micros(self.latency_p50_us as u64),
1036            latency_p99: Duration::from_micros(self.latency_p99_us as u64),
1037            state_size_bytes: self.state_size_bytes as u64,
1038            gpu_utilization: (self.gpu_utilization_pct as f32 / 100.0).clamp(0.0, 1.0),
1039            tenant_id: self.tenant_id,
1040        }
1041    }
1042}
1043
// Compile-time size assertions — changing wire sizes is a breaking change.
// These also guarantee the `#[repr(C)]` layouts above contain no implicit
// padding, so the serializers' hard-coded field offsets stay valid.
const _: () = assert!(SubscribeMetricsRequest::WIRE_SIZE == 24);
const _: () = assert!(LiveMetricsEvent::WIRE_SIZE == 72);
1047
1048#[cfg(test)]
1049mod tests {
1050    use super::*;
1051
    #[test]
    fn test_trace_buffer_basic() {
        // A capacity-3 ring keeps only the newest 3 of 5 recorded entries,
        // while the cumulative counter still reports all 5.
        let mut buf = TraceBuffer::new(3);

        for i in 0..5 {
            buf.record(TraceEntry {
                sequence: i,
                received_at: Instant::now(),
                duration: Duration::from_micros(100),
                source: None,
                outcome: TraceOutcome::Success,
            });
        }

        assert_eq!(buf.len(), 3); // Capacity limited
        assert_eq!(buf.total_recorded(), 5);
    }

    #[test]
    fn test_trace_buffer_recent() {
        // `recent(n)` returns the n newest entries ordered newest-first.
        let mut buf = TraceBuffer::new(10);

        for i in 0..5 {
            buf.record(TraceEntry {
                sequence: i,
                received_at: Instant::now(),
                duration: Duration::from_micros(100),
                source: None,
                outcome: TraceOutcome::Success,
            });
            std::thread::sleep(Duration::from_millis(1));
        }

        let recent = buf.recent(3);
        assert_eq!(recent.len(), 3);
        // Most recent should have highest sequence
        assert!(recent[0].sequence > recent[2].sequence);
    }

    #[test]
    fn test_introspection_service() {
        // Register an actor, record one trace, and read it back unchanged.
        let mut svc = IntrospectionService::new(10);

        let actor = ActorId(1);
        svc.register_actor(actor);

        svc.record_trace(
            actor,
            TraceEntry {
                sequence: 1,
                received_at: Instant::now(),
                duration: Duration::from_micros(50),
                source: Some(ActorId(2)),
                outcome: TraceOutcome::Forwarded(ActorId(3)),
            },
        );

        let traces = svc.get_traces(actor, 10);
        assert_eq!(traces.len(), 1);
        assert_eq!(traces[0].sequence, 1);
    }

    #[test]
    fn test_queue_snapshot_utilization() {
        // 75 queued out of a 100-slot queue => 0.75 utilization.
        let snap = QueueSnapshot {
            input_depth: 75,
            input_capacity: 100,
            ..Default::default()
        };
        assert!((snap.input_utilization() - 0.75).abs() < 0.01);
    }

    #[test]
    fn test_trace_outcome_variants() {
        // PartialEq distinguishes outcome variants.
        assert_eq!(TraceOutcome::Success, TraceOutcome::Success);
        assert_ne!(TraceOutcome::Success, TraceOutcome::Dropped);
    }
1129
1130    // =========================================================================
1131    // Streaming introspection tests (v1.1 §3.2)
1132    // =========================================================================
1133
1134    fn mk_metrics(id: u32) -> LiveMetrics {
1135        LiveMetrics {
1136            actor_id: ActorId(id),
1137            timestamp: HlcTimestamp::new(0, 0, 0),
1138            queue_depth: 0,
1139            inbound_rate: 0.0,
1140            outbound_rate: 0.0,
1141            latency_p50: Duration::ZERO,
1142            latency_p99: Duration::ZERO,
1143            state_size_bytes: 0,
1144            gpu_utilization: 0.0,
1145            tenant_id: 0,
1146        }
1147    }
1148
1149    #[tokio::test]
1150    async fn test_subscribe_receives_metric() {
1151        let stream = IntrospectionStream::new();
1152        let actor = ActorId(42);
1153        let mut rx = stream.subscribe(actor, Duration::from_nanos(1));
1154        assert_eq!(stream.subscriber_count(&actor), 1);
1155
1156        stream.emit(mk_metrics(42));
1157        let got = rx.recv().await.expect("metric delivered");
1158        assert_eq!(got.actor_id, actor);
1159    }
1160
1161    #[tokio::test]
1162    async fn test_subscribe_filters_by_actor() {
1163        let stream = IntrospectionStream::new();
1164        let mut rx = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1165
1166        // Emit for a different actor — should not be delivered.
1167        stream.emit(mk_metrics(2));
1168        // Emit for the subscribed actor — should be delivered.
1169        stream.emit(mk_metrics(1));
1170
1171        let got = rx.recv().await.expect("metric delivered");
1172        assert_eq!(got.actor_id, ActorId(1));
1173        assert!(rx.try_recv().is_err(), "no other metric should be queued");
1174    }
1175
1176    #[tokio::test]
1177    async fn test_unsubscribe_stops_delivery() {
1178        let stream = IntrospectionStream::new();
1179        let actor = ActorId(7);
1180        let mut rx = stream.subscribe(actor, Duration::from_nanos(1));
1181        stream.unsubscribe(actor);
1182        assert_eq!(stream.subscriber_count(&actor), 0);
1183        stream.emit(mk_metrics(7));
1184
1185        // The sender was dropped inside the stream; the receiver should
1186        // either return None (closed) or have nothing queued.
1187        match rx.recv().await {
1188            None => {}
1189            Some(_) => panic!("no metric should be delivered after unsubscribe"),
1190        }
1191    }
1192
1193    #[tokio::test]
1194    async fn test_subscribe_interval_zero_is_unsubscribe() {
1195        let stream = IntrospectionStream::new();
1196        let actor = ActorId(3);
1197        // First subscribe normally, then call with interval zero.
1198        let _keep = stream.subscribe(actor, Duration::from_micros(1));
1199        assert_eq!(stream.subscriber_count(&actor), 1);
1200
1201        let mut rx_zero = stream.subscribe(actor, Duration::ZERO);
1202        assert_eq!(stream.subscriber_count(&actor), 0);
1203        stream.emit(mk_metrics(3));
1204        assert!(rx_zero.recv().await.is_none(), "zero interval = closed");
1205    }
1206
1207    #[tokio::test]
1208    async fn test_multi_subscriber_fanout() {
1209        let stream = IntrospectionStream::new();
1210        let actor = ActorId(10);
1211        let mut rx1 = stream.subscribe(actor, Duration::from_nanos(1));
1212        let mut rx2 = stream.subscribe(actor, Duration::from_nanos(1));
1213        let mut rx3 = stream.subscribe(actor, Duration::from_nanos(1));
1214        assert_eq!(stream.subscriber_count(&actor), 3);
1215
1216        stream.emit(mk_metrics(10));
1217
1218        assert_eq!(rx1.recv().await.unwrap().actor_id, actor);
1219        assert_eq!(rx2.recv().await.unwrap().actor_id, actor);
1220        assert_eq!(rx3.recv().await.unwrap().actor_id, actor);
1221    }
1222
1223    #[tokio::test]
1224    async fn test_broken_receiver_auto_cleanup() {
1225        let stream = IntrospectionStream::new();
1226        let actor = ActorId(9);
1227        let rx1 = stream.subscribe(actor, Duration::from_nanos(1));
1228        let mut rx2 = stream.subscribe(actor, Duration::from_nanos(1));
1229        assert_eq!(stream.subscriber_count(&actor), 2);
1230
1231        // Drop rx1 — next emit should prune its handle.
1232        drop(rx1);
1233        stream.emit(mk_metrics(9));
1234        assert_eq!(stream.subscriber_count(&actor), 1);
1235
1236        // rx2 is still alive and should have received the metric.
1237        assert_eq!(rx2.recv().await.unwrap().actor_id, actor);
1238    }
1239
1240    #[tokio::test]
1241    async fn test_broken_receiver_closes_empty_bucket() {
1242        let stream = IntrospectionStream::new();
1243        let actor = ActorId(15);
1244        let rx = stream.subscribe(actor, Duration::from_nanos(1));
1245        drop(rx);
1246        stream.emit(mk_metrics(15));
1247        assert_eq!(stream.subscriber_count(&actor), 0);
1248        // Emitting again after pruning must not panic.
1249        stream.emit(mk_metrics(15));
1250    }
1251
1252    #[tokio::test]
1253    async fn test_subscribe_all_receives_all_actors() {
1254        let stream = IntrospectionStream::new();
1255        let mut rx = stream.subscribe_all(Duration::from_nanos(1));
1256        stream.emit(mk_metrics(1));
1257        stream.emit(mk_metrics(2));
1258        stream.emit(mk_metrics(3));
1259
1260        let mut seen = Vec::new();
1261        for _ in 0..3 {
1262            seen.push(rx.recv().await.unwrap().actor_id);
1263        }
1264        seen.sort_by_key(|a| a.0);
1265        assert_eq!(seen, vec![ActorId(1), ActorId(2), ActorId(3)]);
1266    }
1267
1268    #[tokio::test]
1269    async fn test_subscribe_all_plus_specific_both_fire() {
1270        let stream = IntrospectionStream::new();
1271        let mut rx_all = stream.subscribe_all(Duration::from_nanos(1));
1272        let mut rx_one = stream.subscribe(ActorId(5), Duration::from_nanos(1));
1273
1274        stream.emit(mk_metrics(5));
1275
1276        assert_eq!(rx_all.recv().await.unwrap().actor_id, ActorId(5));
1277        assert_eq!(rx_one.recv().await.unwrap().actor_id, ActorId(5));
1278    }
1279
1280    #[tokio::test]
1281    async fn test_subscribe_all_interval_zero_is_no_op() {
1282        let stream = IntrospectionStream::new();
1283        let mut rx = stream.subscribe_all(Duration::ZERO);
1284        stream.emit(mk_metrics(1));
1285        assert!(rx.recv().await.is_none());
1286        assert_eq!(stream.global_subscriber_count(), 0);
1287    }
1288
1289    #[tokio::test]
1290    async fn test_unsubscribe_all_clears_global() {
1291        let stream = IntrospectionStream::new();
1292        let _rx1 = stream.subscribe_all(Duration::from_nanos(1));
1293        let _rx2 = stream.subscribe_all(Duration::from_nanos(1));
1294        assert_eq!(stream.global_subscriber_count(), 2);
1295        stream.unsubscribe_all();
1296        assert_eq!(stream.global_subscriber_count(), 0);
1297    }
1298
1299    #[tokio::test]
1300    async fn test_interval_gating_suppresses_faster_emits() {
1301        let stream = IntrospectionStream::new();
1302        let actor = ActorId(11);
1303        let mut rx = stream.subscribe(actor, Duration::from_millis(500));
1304
1305        // First emit — delivered (no prior sample).
1306        stream.emit(mk_metrics(11));
1307        // Second emit immediately after — suppressed by interval gate.
1308        stream.emit(mk_metrics(11));
1309
1310        assert_eq!(rx.recv().await.unwrap().actor_id, actor);
1311        assert!(
1312            rx.try_recv().is_err(),
1313            "second emit should be gated by interval"
1314        );
1315    }
1316
1317    #[tokio::test]
1318    async fn test_total_subscribers_sums_buckets() {
1319        let stream = IntrospectionStream::new();
1320        let _a = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1321        let _b = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1322        let _c = stream.subscribe(ActorId(2), Duration::from_nanos(1));
1323        let _d = stream.subscribe_all(Duration::from_nanos(1));
1324        assert_eq!(stream.total_subscribers(), 4);
1325    }
1326
1327    #[test]
1328    fn test_subscription_ids_are_unique_and_monotonic() {
1329        let stream = IntrospectionStream::new();
1330        let _a = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1331        let _b = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1332        let _c = stream.subscribe_all(Duration::from_nanos(1));
1333
1334        let subs = stream.subscriptions.read();
1335        let handles = subs.get(&ActorId(1)).expect("bucket exists");
1336        assert_eq!(handles.len(), 2);
1337        assert!(handles[0].subscription_id < handles[1].subscription_id);
1338
1339        let globals = stream.global_subscriptions.read();
1340        assert_eq!(globals.len(), 1);
1341        assert!(globals[0].subscription_id > handles[1].subscription_id);
1342    }
1343
    // ---- MetricAggregator ---------------------------------------------------

    #[test]
    fn test_aggregator_record_and_snapshot() {
        // Every recorded dimension shows up in the snapshot.
        let agg = MetricAggregator::new();
        let a = ActorId(1);
        agg.record_inbound(a, 10);
        agg.record_outbound(a, 5);
        agg.set_queue_depth(a, 3);
        agg.set_state_size(a, 4096);
        agg.set_gpu_utilization(a, 0.75);
        agg.set_tenant(a, 42);

        let snap = agg.snapshot(&a).expect("snapshot exists");
        assert_eq!(snap.queue_depth, 3);
        assert_eq!(snap.state_size_bytes, 4096);
        assert!((snap.gpu_utilization - 0.75).abs() < 1e-6);
        assert_eq!(snap.tenant_id, 42);
        // The first snapshot seeds EWMA with the raw rate — must be positive.
        assert!(snap.inbound_rate > 0.0);
        assert!(snap.outbound_rate > 0.0);
    }

    #[test]
    fn test_aggregator_snapshot_unknown_actor_returns_none() {
        // Untracked actors yield None rather than a zeroed snapshot.
        let agg = MetricAggregator::new();
        assert!(agg.snapshot(&ActorId(999)).is_none());
    }

    #[test]
    fn test_aggregator_snapshot_all_covers_every_actor() {
        // snapshot_all returns exactly one entry per tracked actor.
        let agg = MetricAggregator::new();
        for i in 0..5 {
            agg.record_inbound(ActorId(i), 1);
        }
        let all = agg.snapshot_all();
        assert_eq!(all.len(), 5);
        let mut ids: Vec<u32> = all.iter().map(|m| m.actor_id.0).collect();
        ids.sort();
        assert_eq!(ids, vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn test_aggregator_remove_actor() {
        // remove_actor reports whether the actor was actually tracked.
        let agg = MetricAggregator::new();
        agg.record_inbound(ActorId(1), 1);
        assert_eq!(agg.tracked_actors(), 1);
        assert!(agg.remove_actor(&ActorId(1)));
        assert_eq!(agg.tracked_actors(), 0);
        assert!(!agg.remove_actor(&ActorId(1)));
    }

    #[test]
    fn test_aggregator_gpu_utilization_clamped() {
        // Out-of-range utilization values are clamped to [0, 1].
        let agg = MetricAggregator::new();
        let a = ActorId(1);
        agg.record_inbound(a, 1);
        agg.set_gpu_utilization(a, 5.0);
        assert!((agg.snapshot(&a).unwrap().gpu_utilization - 1.0).abs() < 1e-6);
        agg.set_gpu_utilization(a, -1.0);
        assert_eq!(agg.snapshot(&a).unwrap().gpu_utilization, 0.0);
    }

    #[test]
    fn test_aggregator_with_custom_alpha() {
        let agg = MetricAggregator::with_alpha(0.5);
        assert!((agg.ewma_alpha() - 0.5).abs() < 1e-9);
        // Invalid alpha falls back to default.
        let agg = MetricAggregator::with_alpha(0.0);
        assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
        let agg = MetricAggregator::with_alpha(2.0);
        assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
        let agg = MetricAggregator::with_alpha(f64::NAN);
        assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
    }

    #[test]
    fn test_aggregator_ewma_smooths_spikes() {
        // Use a small alpha so a single spike cannot dominate.
        // NOTE: timing-sensitive — the sleeps establish the sampling window.
        let agg = MetricAggregator::with_alpha(0.1);
        let a = ActorId(1);

        // Seed the EWMA with a steady base rate.
        for _ in 0..5 {
            agg.record_inbound(a, 100);
            std::thread::sleep(Duration::from_millis(20));
            let _ = agg.snapshot(&a);
        }
        let baseline = agg.snapshot(&a).unwrap().inbound_rate;

        // Send a giant spike and take one snapshot — EWMA should pull
        // the reported rate toward, but not all the way to, the spike.
        agg.record_inbound(a, 1_000_000);
        std::thread::sleep(Duration::from_millis(20));
        let after = agg.snapshot(&a).unwrap().inbound_rate;

        assert!(after > baseline, "spike should raise the smoothed rate");
        // With alpha=0.1 the rate cannot jump by more than ~10% of the
        // raw spike delta in a single sample.
        let raw_spike_rate = 1_000_000.0 / 0.020;
        assert!(
            after < raw_spike_rate,
            "EWMA must not fully adopt a single spike (after={after}, raw={raw_spike_rate})"
        );
    }

    #[test]
    fn test_aggregator_ewma_known_sequence() {
        // With alpha=1.0, EWMA degenerates to the most recent sample.
        // This lets us exercise the seeding + update paths deterministically.
        let agg = MetricAggregator::with_alpha(1.0);
        let a = ActorId(1);

        agg.record_inbound(a, 10);
        std::thread::sleep(Duration::from_millis(10));
        let first = agg.snapshot(&a).unwrap().inbound_rate;
        assert!(first > 0.0);

        // With alpha=1, the next snapshot should equal the new raw rate.
        agg.record_inbound(a, 0); // no new messages
        std::thread::sleep(Duration::from_millis(10));
        let second = agg.snapshot(&a).unwrap().inbound_rate;
        assert_eq!(second, 0.0, "alpha=1 adopts the raw rate verbatim");
    }
1468
    // ---- LatencyHistogram ----------------------------------------------------

    #[test]
    fn test_latency_histogram_empty() {
        // An empty histogram reports zero for every statistic.
        let h = LatencyHistogram::new();
        assert_eq!(h.p50(), Duration::ZERO);
        assert_eq!(h.p99(), Duration::ZERO);
        assert_eq!(h.live_samples(), 0);
        assert_eq!(h.total_recorded(), 0);
    }

    #[test]
    fn test_latency_histogram_percentiles() {
        // 1..=100 ms samples give exact nearest-rank percentiles.
        let mut h = LatencyHistogram::new();
        for i in 1..=100u64 {
            h.record(Duration::from_millis(i));
        }
        assert_eq!(h.live_samples(), 100);
        // p50 with nearest-rank on 100 samples = rank 49 (1-indexed 50th).
        assert_eq!(h.p50(), Duration::from_millis(50));
        // p99 = rank 98 (1-indexed 99th).
        assert_eq!(h.p99(), Duration::from_millis(99));
    }

    #[test]
    fn test_latency_histogram_ring_wraparound() {
        let mut h = LatencyHistogram::new();
        // Fill past capacity — the oldest samples are overwritten.
        for i in 0..(LATENCY_HISTOGRAM_CAPACITY + 100) {
            h.record(Duration::from_micros(i as u64));
        }
        assert_eq!(h.live_samples(), LATENCY_HISTOGRAM_CAPACITY);
        assert_eq!(
            h.total_recorded(),
            (LATENCY_HISTOGRAM_CAPACITY + 100) as u64
        );
        // p99 should reflect only the retained (most recent) samples.
        assert!(h.p99() > Duration::from_micros(100));
    }

    #[test]
    fn test_latency_histogram_percentile_clamps() {
        // Out-of-range percentile requests clamp to [0, 1].
        let mut h = LatencyHistogram::new();
        h.record(Duration::from_millis(5));
        h.record(Duration::from_millis(10));
        assert_eq!(h.percentile(-1.0), h.percentile(0.0));
        assert_eq!(h.percentile(2.0), h.percentile(1.0));
    }

    #[test]
    fn test_aggregator_latency_snapshot_reflects_histogram() {
        // Latencies recorded through the aggregator surface in snapshots.
        let agg = MetricAggregator::new();
        let a = ActorId(1);
        for i in 1..=10u64 {
            agg.record_latency(a, Duration::from_millis(i));
        }
        agg.record_inbound(a, 1); // ensure rate fields are populated
        let snap = agg.snapshot(&a).unwrap();
        assert!(snap.latency_p50 >= Duration::from_millis(5));
        assert!(snap.latency_p99 >= Duration::from_millis(9));
    }
1530
    // ---- IntrospectionStream <-> Aggregator integration ---------------------

    #[test]
    fn test_stream_tick_emits_all_aggregated() {
        // tick() snapshots every tracked actor and fans the results out.
        let stream = IntrospectionStream::new();
        let agg = stream.aggregator();
        agg.record_inbound(ActorId(1), 10);
        agg.record_inbound(ActorId(2), 20);
        let mut rx = stream.subscribe_all(Duration::from_nanos(1));

        let emitted = stream.tick();
        assert_eq!(emitted, 2);

        let mut seen = Vec::new();
        for _ in 0..2 {
            seen.push(rx.recv().await.unwrap().actor_id.0);
        }
        seen.sort();
        assert_eq!(seen, vec![1, 2]);
    }

    #[test]
    fn test_stream_with_shared_aggregator() {
        // with_aggregator shares (not clones) the caller's aggregator.
        let agg = Arc::new(MetricAggregator::new());
        let stream = IntrospectionStream::with_aggregator(agg.clone());
        assert!(Arc::ptr_eq(&agg, &stream.aggregator()));
        agg.record_inbound(ActorId(1), 1);
        let mut rx = stream.subscribe(ActorId(1), Duration::from_nanos(1));
        assert_eq!(stream.tick(), 1);
        assert_eq!(rx.recv().await.unwrap().actor_id, ActorId(1));
    }
1562
    // ---- Concurrency --------------------------------------------------------

    #[test]
    fn test_concurrent_subscribe_unsubscribe_race() {
        // Manual loom-style stress: many threads subscribing and unsubscribing
        // while another thread emits metrics. The invariant is safety (no
        // panic, no poisoned lock), not delivery guarantees.
        use std::sync::Barrier;
        use std::thread;

        let stream = Arc::new(IntrospectionStream::new());
        // Barrier of 4: two subscribers + emitter + reader start together.
        let barrier = Arc::new(Barrier::new(4));
        let mut handles = Vec::new();

        // Two subscriber threads.
        for worker in 0..2 {
            let s = stream.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                for i in 0..200 {
                    let actor = ActorId((i + worker) % 8);
                    let _rx = s.subscribe(actor, Duration::from_nanos(1));
                    if i % 3 == 0 {
                        s.unsubscribe(actor);
                    }
                }
            }));
        }

        // Emitter thread.
        {
            let s = stream.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                for i in 0..400 {
                    s.emit(mk_metrics((i % 8) as u32));
                }
            }));
        }

        // Reader thread probes subscriber counts to exercise read locks.
        {
            let s = stream.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                for i in 0..400 {
                    let _ = s.subscriber_count(&ActorId((i % 8) as u32));
                    let _ = s.total_subscribers();
                }
            }));
        }

        for h in handles {
            h.join().expect("worker thread did not panic");
        }
        // After the storm, the stream is still functional.
        let mut rx = stream.subscribe(ActorId(123), Duration::from_nanos(1));
        stream.emit(mk_metrics(123));
        assert_eq!(rx.recv().await.unwrap().actor_id, ActorId(123));
    }

    #[test]
    fn test_concurrent_aggregator_record_and_snapshot() {
        // Writers hammer record_* on 4 actors while a reader snapshots;
        // the invariant is safety plus the final tracked-actor count.
        use std::sync::Barrier;
        use std::thread;

        let agg = Arc::new(MetricAggregator::new());
        let barrier = Arc::new(Barrier::new(3));
        let mut handles = Vec::new();

        for _ in 0..2 {
            let a = agg.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                for i in 0..500 {
                    a.record_inbound(ActorId((i % 4) as u32), 1);
                    a.record_latency(
                        ActorId((i % 4) as u32),
                        Duration::from_micros((i % 100) as u64),
                    );
                }
            }));
        }
        {
            let a = agg.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                for _ in 0..500 {
                    let _ = a.snapshot_all();
                }
            }));
        }

        for h in handles {
            h.join().expect("no panic");
        }
        assert_eq!(agg.tracked_actors(), 4);
    }
1666
    // ---- Wire protocol ------------------------------------------------------

    #[test]
    fn test_subscribe_metrics_request_roundtrip() {
        // to_bytes / from_bytes must be exact inverses.
        let req = SubscribeMetricsRequest::new(42, 1000, 7);
        assert!(!req.is_unsubscribe());
        let bytes = req.to_bytes();
        assert_eq!(bytes.len(), SubscribeMetricsRequest::WIRE_SIZE);
        let decoded = SubscribeMetricsRequest::from_bytes(&bytes).unwrap();
        assert_eq!(decoded, req);
    }

    #[test]
    fn test_subscribe_metrics_request_unsubscribe() {
        // interval_us == 0 marks an unsubscribe, and the flag survives a
        // serialization roundtrip.
        let req = SubscribeMetricsRequest::unsubscribe(42, 7);
        assert!(req.is_unsubscribe());
        assert_eq!(req.interval_us, 0);
        let decoded =
            SubscribeMetricsRequest::from_bytes(&req.to_bytes()).expect("roundtrip decode");
        assert!(decoded.is_unsubscribe());
    }

    #[test]
    fn test_subscribe_metrics_request_short_buffer() {
        // A buffer one byte short of WIRE_SIZE is rejected, not truncated.
        let short = [0u8; SubscribeMetricsRequest::WIRE_SIZE - 1];
        assert!(SubscribeMetricsRequest::from_bytes(&short).is_none());
    }

    #[test]
    fn test_live_metrics_event_roundtrip() {
        // Every field survives the to_bytes / from_bytes roundtrip.
        let evt = LiveMetricsEvent::new(
            /* subscription_id */ 9,
            /* actor_id */ 42,
            /* timestamp_us */ 1_700_000_000_000_000,
            /* inbound_total */ 123,
            /* outbound_total */ 45,
            /* tenant_id */ 7,
            /* queue_depth */ 16,
            /* latency_p50_us */ 500,
            /* latency_p99_us */ 2_500,
            /* state_size_bytes */ 8_192,
            /* gpu_utilization_pct */ 73,
        );
        let bytes = evt.to_bytes();
        assert_eq!(bytes.len(), LiveMetricsEvent::WIRE_SIZE);
        let decoded = LiveMetricsEvent::from_bytes(&bytes).unwrap();
        assert_eq!(decoded, evt);
    }

    #[test]
    fn test_live_metrics_event_short_buffer() {
        // A buffer one byte short of WIRE_SIZE is rejected.
        let short = [0u8; LiveMetricsEvent::WIRE_SIZE - 1];
        assert!(LiveMetricsEvent::from_bytes(&short).is_none());
    }

    #[test]
    fn test_live_metrics_event_into_live_metrics() {
        // Lifting to LiveMetrics converts units (us -> Duration, pct -> 0..1)
        // and stamps the caller's HLC node id.
        let evt = LiveMetricsEvent::new(1, 42, 1_234_567, 0, 0, 9, 16, 500, 2_500, 4096, 80);
        let metrics = evt.into_live_metrics(3);
        assert_eq!(metrics.actor_id, ActorId(42));
        assert_eq!(metrics.timestamp.physical, 1_234_567);
        assert_eq!(metrics.timestamp.node_id, 3);
        assert_eq!(metrics.queue_depth, 16);
        assert_eq!(metrics.latency_p50, Duration::from_micros(500));
        assert_eq!(metrics.latency_p99, Duration::from_micros(2_500));
        assert_eq!(metrics.state_size_bytes, 4096);
        assert!((metrics.gpu_utilization - 0.80).abs() < 1e-6);
        assert_eq!(metrics.tenant_id, 9);
    }
1736
1737    #[test]
1738    fn test_wire_sizes_are_stable() {
1739        // Compile-time assertions guard size changes; these runtime checks
1740        // give a friendlier error message if the layout drifts.
1741        assert_eq!(SubscribeMetricsRequest::WIRE_SIZE, 24);
1742        assert_eq!(LiveMetricsEvent::WIRE_SIZE, 72);
1743    }
1744
1745    #[test]
1746    fn test_live_metrics_event_roundtrip_preserves_gpu_pct_clamp() {
1747        // GPU util pct is a u8 0–100; conversion to f32 should clamp on read.
1748        let evt = LiveMetricsEvent::new(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 200);
1749        let m = evt.into_live_metrics(0);
1750        assert_eq!(m.gpu_utilization, 1.0);
1751    }
1752
1753    #[test]
1754    fn test_subscribe_metrics_request_size_stable() {
1755        // Size must match the GPU-side C struct exactly.
1756        assert_eq!(std::mem::size_of::<SubscribeMetricsRequest>(), 24);
1757    }
1758
1759    #[test]
1760    fn test_live_metrics_event_size_stable() {
1761        assert_eq!(std::mem::size_of::<LiveMetricsEvent>(), 72);
1762    }
1763
1764    // ---- Existing pull API remains working (regression guard) ---------------
1765
1766    #[test]
1767    fn test_pull_api_still_works_alongside_streaming() {
1768        let mut svc = IntrospectionService::new(4);
1769        let a = ActorId(1);
1770        svc.register_actor(a);
1771        svc.record_trace(
1772            a,
1773            TraceEntry {
1774                sequence: 0,
1775                received_at: Instant::now(),
1776                duration: Duration::from_micros(10),
1777                source: None,
1778                outcome: TraceOutcome::Success,
1779            },
1780        );
1781        assert_eq!(svc.get_traces(a, 10).len(), 1);
1782        // Streaming stack coexists, no interference.
1783        let stream = IntrospectionStream::new();
1784        assert_eq!(stream.total_subscribers(), 0);
1785    }
1786}