1use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use parking_lot::RwLock;
23use tokio::sync::mpsc;
24
25use crate::actor::{ActorId, ActorState};
26use crate::hlc::HlcTimestamp;
27
/// Point-in-time view of one actor: identity, supervision links, queue
/// occupancy, and performance counters.
#[derive(Debug, Clone)]
pub struct ActorSnapshot {
    /// Unique identifier of the actor.
    pub id: ActorId,
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Lifecycle state at the time of the snapshot.
    pub state: ActorState,
    /// Supervising parent, if any.
    pub parent: Option<ActorId>,
    /// Actors supervised by this one.
    pub children: Vec<ActorId>,
    /// Queue occupancy at snapshot time.
    pub queue: QueueSnapshot,
    /// Throughput/latency counters at snapshot time.
    pub performance: PerformanceSnapshot,
    /// Monotonic instant at which the snapshot was taken.
    pub snapshot_at: Instant,
}
48
/// Occupancy of an actor's input/output queues.
#[derive(Debug, Clone, Default)]
pub struct QueueSnapshot {
    /// Messages waiting in the input queue.
    pub input_depth: u32,
    /// Messages waiting in the output queue.
    pub output_depth: u32,
    /// Input queue capacity (a value of 0 makes `input_utilization` report 0.0).
    pub input_capacity: u32,
    /// Output queue capacity.
    pub output_capacity: u32,
    /// Pressure level; scale is set by the producer (not interpreted here).
    pub pressure: u8,
}
63
64impl QueueSnapshot {
65 pub fn input_utilization(&self) -> f64 {
67 if self.input_capacity == 0 {
68 return 0.0;
69 }
70 self.input_depth as f64 / self.input_capacity as f64
71 }
72}
73
/// Rolling performance counters for one actor.
#[derive(Debug, Clone, Default)]
pub struct PerformanceSnapshot {
    /// Total messages processed so far.
    pub messages_processed: u64,
    /// Recent processing throughput (messages/sec).
    pub messages_per_second: f64,
    /// Average message-handling latency.
    pub avg_latency: Duration,
    /// Worst observed message-handling latency.
    pub max_latency: Duration,
    /// Number of times the actor has been restarted.
    pub restart_count: u32,
    /// Actor uptime.
    pub uptime: Duration,
}
90
/// One recorded message-handling event for an actor.
#[derive(Debug, Clone)]
pub struct TraceEntry {
    /// Caller-supplied sequence number of the message.
    pub sequence: u64,
    /// When the message was received.
    pub received_at: Instant,
    /// How long handling took.
    pub duration: Duration,
    /// Sending actor, if known.
    pub source: Option<ActorId>,
    /// How handling concluded.
    pub outcome: TraceOutcome,
}
105
/// Terminal result of handling one traced message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceOutcome {
    /// Handled successfully.
    Success,
    /// Handling failed; carries an error description.
    Failed(String),
    /// Forwarded to another actor.
    Forwarded(ActorId),
    /// Dropped without being handled.
    Dropped,
}
118
/// Fixed-capacity ring buffer of [`TraceEntry`] values.
pub struct TraceBuffer {
    /// Backing storage; grows up to `capacity`, then slots are overwritten.
    entries: Vec<TraceEntry>,
    /// Maximum number of retained entries.
    capacity: usize,
    /// Next slot to overwrite once the ring is full.
    write_pos: usize,
    /// Total entries ever recorded, including overwritten ones.
    total: u64,
}
126
127impl TraceBuffer {
128 pub fn new(capacity: usize) -> Self {
130 Self {
131 entries: Vec::with_capacity(capacity),
132 capacity,
133 write_pos: 0,
134 total: 0,
135 }
136 }
137
138 pub fn record(&mut self, entry: TraceEntry) {
140 if self.entries.len() < self.capacity {
141 self.entries.push(entry);
142 } else {
143 self.entries[self.write_pos] = entry;
144 }
145 self.write_pos = (self.write_pos + 1) % self.capacity;
146 self.total += 1;
147 }
148
149 pub fn recent(&self, limit: usize) -> Vec<&TraceEntry> {
151 let mut result: Vec<&TraceEntry> = self.entries.iter().collect();
152 result.sort_by_key(|e| std::cmp::Reverse(e.received_at));
153 result.truncate(limit);
154 result
155 }
156
157 pub fn total_recorded(&self) -> u64 {
159 self.total
160 }
161
162 pub fn len(&self) -> usize {
164 self.entries.len()
165 }
166
167 pub fn is_empty(&self) -> bool {
169 self.entries.is_empty()
170 }
171}
172
/// Keeps a per-actor ring buffer of message traces.
pub struct IntrospectionService {
    /// Trace ring per registered actor.
    traces: HashMap<ActorId, TraceBuffer>,
    /// Capacity used for each newly created [`TraceBuffer`].
    trace_capacity: usize,
}
180
181impl IntrospectionService {
182 pub fn new(trace_capacity: usize) -> Self {
184 Self {
185 traces: HashMap::new(),
186 trace_capacity,
187 }
188 }
189
190 pub fn register_actor(&mut self, id: ActorId) {
192 self.traces
193 .entry(id)
194 .or_insert_with(|| TraceBuffer::new(self.trace_capacity));
195 }
196
197 pub fn record_trace(&mut self, actor: ActorId, entry: TraceEntry) {
199 self.traces
200 .entry(actor)
201 .or_insert_with(|| TraceBuffer::new(self.trace_capacity))
202 .record(entry);
203 }
204
205 pub fn get_traces(&self, actor: ActorId, limit: usize) -> Vec<&TraceEntry> {
207 self.traces
208 .get(&actor)
209 .map(|buf| buf.recent(limit))
210 .unwrap_or_default()
211 }
212
213 pub fn deregister_actor(&mut self, id: ActorId) {
215 self.traces.remove(&id);
216 }
217
218 pub fn actor_count(&self) -> usize {
220 self.traces.len()
221 }
222}
223
impl Default for IntrospectionService {
    /// Defaults to 100 trace entries per actor.
    fn default() -> Self {
        Self::new(100)
    }
}
229
/// Default smoothing factor for the rate EWMA used by [`MetricAggregator`].
pub const DEFAULT_EWMA_ALPHA: f64 = 0.2;

/// Number of raw latency samples retained by [`LatencyHistogram`]'s ring.
pub const LATENCY_HISTOGRAM_CAPACITY: usize = 1024;
241
/// One live metrics sample for an actor, as published to subscribers.
#[derive(Debug, Clone)]
pub struct LiveMetrics {
    /// Actor the sample describes.
    pub actor_id: ActorId,
    /// Hybrid logical clock timestamp of the sample.
    pub timestamp: HlcTimestamp,
    /// Current mailbox depth.
    pub queue_depth: usize,
    /// EWMA-smoothed inbound message rate (messages/sec).
    pub inbound_rate: f64,
    /// EWMA-smoothed outbound message rate (messages/sec).
    pub outbound_rate: f64,
    /// Median handling latency.
    pub latency_p50: Duration,
    /// 99th-percentile handling latency.
    pub latency_p99: Duration,
    /// Last reported actor state size in bytes.
    pub state_size_bytes: u64,
    /// GPU utilization in [0.0, 1.0].
    pub gpu_utilization: f32,
    /// Tenant the actor belongs to.
    pub tenant_id: u64,
}
270
/// One metrics subscriber: an unbounded channel plus a per-subscriber
/// minimum delivery interval.
pub struct SubscriberHandle {
    /// Minimum time between deliveries to this subscriber.
    pub interval: Duration,
    /// Instant of the last non-gated delivery attempt (None before the first).
    last_sent_at: parking_lot::Mutex<Option<Instant>>,
    /// Unique id assigned by the owning stream.
    pub subscription_id: u64,
    /// Sending half of the subscriber's channel.
    sender: mpsc::UnboundedSender<LiveMetrics>,
}
285
286impl SubscriberHandle {
287 pub fn subscription_id(&self) -> u64 {
289 self.subscription_id
290 }
291
292 pub fn is_closed(&self) -> bool {
294 self.sender.is_closed()
295 }
296
297 fn try_send(&self, metrics: LiveMetrics) -> std::result::Result<bool, ()> {
303 let now = Instant::now();
304 {
305 let mut last = self.last_sent_at.lock();
306 if let Some(prev) = *last {
307 if now.duration_since(prev) < self.interval {
308 return Ok(false);
309 }
310 }
311 *last = Some(now);
312 }
313 self.sender.send(metrics).map(|_| true).map_err(|_| ())
314 }
315}
316
/// Fixed-size ring of raw latency samples used for percentile estimates.
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Sample ring; only the first `live_samples()` entries are meaningful
    /// until the ring wraps.
    samples: [Duration; LATENCY_HISTOGRAM_CAPACITY],
    /// Next write index.
    idx: usize,
    /// Total samples ever recorded (saturating).
    count: u64,
}
328
329impl LatencyHistogram {
330 pub fn new() -> Self {
332 Self {
333 samples: [Duration::ZERO; LATENCY_HISTOGRAM_CAPACITY],
334 idx: 0,
335 count: 0,
336 }
337 }
338
339 pub fn record(&mut self, d: Duration) {
341 self.samples[self.idx] = d;
342 self.idx = (self.idx + 1) % LATENCY_HISTOGRAM_CAPACITY;
343 self.count = self.count.saturating_add(1);
344 }
345
346 pub fn total_recorded(&self) -> u64 {
348 self.count
349 }
350
351 pub fn live_samples(&self) -> usize {
353 if (self.count as usize) < LATENCY_HISTOGRAM_CAPACITY {
354 self.count as usize
355 } else {
356 LATENCY_HISTOGRAM_CAPACITY
357 }
358 }
359
360 pub fn p50(&self) -> Duration {
362 self.percentile(0.50)
363 }
364
365 pub fn p99(&self) -> Duration {
367 self.percentile(0.99)
368 }
369
370 pub fn percentile(&self, p: f64) -> Duration {
372 let n = self.live_samples();
373 if n == 0 {
374 return Duration::ZERO;
375 }
376 let mut buf: Vec<Duration> = self.samples[..n].to_vec();
377 buf.sort_unstable();
378 let p = p.clamp(0.0, 1.0);
379 let rank = ((p * n as f64).ceil() as usize)
381 .saturating_sub(1)
382 .min(n - 1);
383 buf[rank]
384 }
385}
386
impl Default for LatencyHistogram {
    /// Same as [`LatencyHistogram::new`]: an empty ring.
    fn default() -> Self {
        Self::new()
    }
}
392
/// Mutable metric state kept per tracked actor.
#[derive(Debug)]
struct ActorMetricState {
    /// When the rate window was last folded into the EWMAs.
    last_sample_at: Instant,
    /// Smoothed inbound rate (messages/sec).
    inbound_ewma: f64,
    /// Smoothed outbound rate (messages/sec).
    outbound_ewma: f64,
    /// Lifetime inbound message total (saturating).
    inbound_count: u64,
    /// Lifetime outbound message total (saturating).
    outbound_count: u64,
    /// Inbound messages accumulated since the last fold.
    inbound_delta: u64,
    /// Outbound messages accumulated since the last fold.
    outbound_delta: u64,
    /// Ring of raw latency samples for percentile queries.
    latency_histogram: LatencyHistogram,
    /// Last reported state size in bytes.
    state_size_bytes: u64,
    /// Last reported mailbox depth.
    queue_depth: usize,
    /// Last reported GPU utilization, clamped to [0.0, 1.0] by the setter.
    gpu_utilization: f32,
    /// Tenant owning the actor.
    tenant_id: u64,
    /// Node id stamped into snapshot HLC timestamps.
    hlc_node_id: u64,
    /// Whether the EWMAs have been seeded with a first sample.
    initialized: bool,
}
412
413impl ActorMetricState {
414 fn new(hlc_node_id: u64) -> Self {
415 Self {
416 last_sample_at: Instant::now(),
417 inbound_ewma: 0.0,
418 outbound_ewma: 0.0,
419 inbound_count: 0,
420 outbound_count: 0,
421 inbound_delta: 0,
422 outbound_delta: 0,
423 latency_histogram: LatencyHistogram::new(),
424 state_size_bytes: 0,
425 queue_depth: 0,
426 gpu_utilization: 0.0,
427 tenant_id: 0,
428 hlc_node_id,
429 initialized: false,
430 }
431 }
432}
433
/// Lock-guarded per-actor metric store that folds raw counters into
/// EWMA-smoothed rates and latency percentiles on demand.
pub struct MetricAggregator {
    /// Per-actor mutable metric state.
    per_actor: RwLock<HashMap<ActorId, ActorMetricState>>,
    /// EWMA smoothing factor, validated to lie in (0.0, 1.0].
    ewma_alpha: f64,
    /// Node id stamped into snapshot HLC timestamps.
    hlc_node_id: u64,
}
455
456impl MetricAggregator {
457 pub fn new() -> Self {
459 Self::with_alpha(DEFAULT_EWMA_ALPHA)
460 }
461
462 pub fn with_alpha(alpha: f64) -> Self {
466 let ewma_alpha = if alpha.is_finite() && alpha > 0.0 && alpha <= 1.0 {
467 alpha
468 } else {
469 DEFAULT_EWMA_ALPHA
470 };
471 Self {
472 per_actor: RwLock::new(HashMap::new()),
473 ewma_alpha,
474 hlc_node_id: 0,
475 }
476 }
477
478 pub fn with_hlc_node_id(mut self, node_id: u64) -> Self {
480 self.hlc_node_id = node_id;
481 self
482 }
483
484 pub fn ewma_alpha(&self) -> f64 {
486 self.ewma_alpha
487 }
488
489 pub fn record_inbound(&self, actor_id: ActorId, count: u64) {
491 let mut guard = self.per_actor.write();
492 let state = guard
493 .entry(actor_id)
494 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
495 state.inbound_count = state.inbound_count.saturating_add(count);
496 state.inbound_delta = state.inbound_delta.saturating_add(count);
497 }
498
499 pub fn record_outbound(&self, actor_id: ActorId, count: u64) {
501 let mut guard = self.per_actor.write();
502 let state = guard
503 .entry(actor_id)
504 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
505 state.outbound_count = state.outbound_count.saturating_add(count);
506 state.outbound_delta = state.outbound_delta.saturating_add(count);
507 }
508
509 pub fn record_latency(&self, actor_id: ActorId, d: Duration) {
511 let mut guard = self.per_actor.write();
512 let state = guard
513 .entry(actor_id)
514 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
515 state.latency_histogram.record(d);
516 }
517
518 pub fn set_queue_depth(&self, actor_id: ActorId, depth: usize) {
520 let mut guard = self.per_actor.write();
521 let state = guard
522 .entry(actor_id)
523 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
524 state.queue_depth = depth;
525 }
526
527 pub fn set_state_size(&self, actor_id: ActorId, bytes: u64) {
529 let mut guard = self.per_actor.write();
530 let state = guard
531 .entry(actor_id)
532 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
533 state.state_size_bytes = bytes;
534 }
535
536 pub fn set_gpu_utilization(&self, actor_id: ActorId, util: f32) {
538 let util = util.clamp(0.0, 1.0);
539 let mut guard = self.per_actor.write();
540 let state = guard
541 .entry(actor_id)
542 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
543 state.gpu_utilization = util;
544 }
545
546 pub fn set_tenant(&self, actor_id: ActorId, tenant_id: u64) {
548 let mut guard = self.per_actor.write();
549 let state = guard
550 .entry(actor_id)
551 .or_insert_with(|| ActorMetricState::new(self.hlc_node_id));
552 state.tenant_id = tenant_id;
553 }
554
555 pub fn remove_actor(&self, actor_id: &ActorId) -> bool {
557 self.per_actor.write().remove(actor_id).is_some()
558 }
559
560 pub fn tracked_actors(&self) -> usize {
562 self.per_actor.read().len()
563 }
564
565 pub fn snapshot(&self, actor_id: &ActorId) -> Option<LiveMetrics> {
569 let mut guard = self.per_actor.write();
570 let state = guard.get_mut(actor_id)?;
571 let (p50, p99) = (state.latency_histogram.p50(), state.latency_histogram.p99());
572 let metrics = Self::fold_snapshot(*actor_id, state, self.ewma_alpha, p50, p99);
573 Some(metrics)
574 }
575
576 pub fn snapshot_all(&self) -> Vec<LiveMetrics> {
578 let mut guard = self.per_actor.write();
579 let alpha = self.ewma_alpha;
580 let mut out = Vec::with_capacity(guard.len());
581 for (id, state) in guard.iter_mut() {
582 let (p50, p99) = (state.latency_histogram.p50(), state.latency_histogram.p99());
583 out.push(Self::fold_snapshot(*id, state, alpha, p50, p99));
584 }
585 out
586 }
587
588 fn fold_snapshot(
589 actor_id: ActorId,
590 state: &mut ActorMetricState,
591 alpha: f64,
592 p50: Duration,
593 p99: Duration,
594 ) -> LiveMetrics {
595 let now = Instant::now();
596 let elapsed = now.duration_since(state.last_sample_at).as_secs_f64();
597
598 let inbound_rate_sample = if elapsed > 0.0 {
601 state.inbound_delta as f64 / elapsed
602 } else {
603 0.0
604 };
605 let outbound_rate_sample = if elapsed > 0.0 {
606 state.outbound_delta as f64 / elapsed
607 } else {
608 0.0
609 };
610
611 if !state.initialized {
612 state.inbound_ewma = inbound_rate_sample;
615 state.outbound_ewma = outbound_rate_sample;
616 state.initialized = true;
617 } else {
618 state.inbound_ewma = alpha * inbound_rate_sample + (1.0 - alpha) * state.inbound_ewma;
619 state.outbound_ewma =
620 alpha * outbound_rate_sample + (1.0 - alpha) * state.outbound_ewma;
621 }
622
623 state.inbound_delta = 0;
624 state.outbound_delta = 0;
625 state.last_sample_at = now;
626
627 let physical = std::time::SystemTime::now()
628 .duration_since(std::time::UNIX_EPOCH)
629 .map(|d| d.as_micros() as u64)
630 .unwrap_or(0);
631 let timestamp = HlcTimestamp::new(physical, 0, state.hlc_node_id);
632
633 LiveMetrics {
634 actor_id,
635 timestamp,
636 queue_depth: state.queue_depth,
637 inbound_rate: state.inbound_ewma,
638 outbound_rate: state.outbound_ewma,
639 latency_p50: p50,
640 latency_p99: p99,
641 state_size_bytes: state.state_size_bytes,
642 gpu_utilization: state.gpu_utilization,
643 tenant_id: state.tenant_id,
644 }
645 }
646}
647
impl Default for MetricAggregator {
    /// Same as [`MetricAggregator::new`]: default alpha, node id 0.
    fn default() -> Self {
        Self::new()
    }
}
653
impl std::fmt::Debug for MetricAggregator {
    /// Manual impl: reports configuration plus a tracked-actor count
    /// (one brief read lock) instead of dumping the lock-guarded map.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricAggregator")
            .field("ewma_alpha", &self.ewma_alpha)
            .field("hlc_node_id", &self.hlc_node_id)
            .field("tracked_actors", &self.tracked_actors())
            .finish()
    }
}
663
/// Fan-out hub for live metric streams: per-actor subscriber buckets plus
/// global subscribers, fed from a shared [`MetricAggregator`].
pub struct IntrospectionStream {
    /// Per-actor subscriber buckets.
    subscriptions: RwLock<HashMap<ActorId, Vec<Arc<SubscriberHandle>>>>,
    /// Subscribers that receive metrics for every actor.
    global_subscriptions: RwLock<Vec<Arc<SubscriberHandle>>>,
    /// Source of metric snapshots (drained by `tick`).
    aggregator: Arc<MetricAggregator>,
    /// Monotonic id generator for subscription handles.
    next_subscription_id: AtomicU64,
}
687
688impl IntrospectionStream {
689 pub fn new() -> Self {
691 Self::with_aggregator(Arc::new(MetricAggregator::new()))
692 }
693
694 pub fn with_aggregator(aggregator: Arc<MetricAggregator>) -> Self {
696 Self {
697 subscriptions: RwLock::new(HashMap::new()),
698 global_subscriptions: RwLock::new(Vec::new()),
699 aggregator,
700 next_subscription_id: AtomicU64::new(1),
701 }
702 }
703
704 pub fn aggregator(&self) -> Arc<MetricAggregator> {
706 self.aggregator.clone()
707 }
708
709 pub fn subscribe(
720 &self,
721 actor_id: ActorId,
722 interval: Duration,
723 ) -> mpsc::UnboundedReceiver<LiveMetrics> {
724 let (tx, rx) = mpsc::unbounded_channel();
725 if interval.is_zero() {
726 drop(tx);
730 self.unsubscribe(actor_id);
731 return rx;
732 }
733 let handle = Arc::new(SubscriberHandle {
734 interval,
735 last_sent_at: parking_lot::Mutex::new(None),
736 subscription_id: self.next_subscription_id.fetch_add(1, Ordering::Relaxed),
737 sender: tx,
738 });
739 self.subscriptions
740 .write()
741 .entry(actor_id)
742 .or_default()
743 .push(handle);
744 rx
745 }
746
747 pub fn subscribe_all(&self, interval: Duration) -> mpsc::UnboundedReceiver<LiveMetrics> {
752 let (tx, rx) = mpsc::unbounded_channel();
753 if interval.is_zero() {
754 drop(tx);
755 return rx;
756 }
757 let handle = Arc::new(SubscriberHandle {
758 interval,
759 last_sent_at: parking_lot::Mutex::new(None),
760 subscription_id: self.next_subscription_id.fetch_add(1, Ordering::Relaxed),
761 sender: tx,
762 });
763 self.global_subscriptions.write().push(handle);
764 rx
765 }
766
767 pub fn unsubscribe(&self, actor_id: ActorId) {
769 self.subscriptions.write().remove(&actor_id);
770 }
771
772 pub fn unsubscribe_all(&self) {
774 self.global_subscriptions.write().clear();
775 }
776
777 pub fn emit(&self, metrics: LiveMetrics) {
784 let actor_id = metrics.actor_id;
786 {
787 let mut guard = self.subscriptions.write();
788 if let Some(subs) = guard.get_mut(&actor_id) {
789 Self::dispatch_and_prune(subs, &metrics);
790 if subs.is_empty() {
791 guard.remove(&actor_id);
792 }
793 }
794 }
795 {
797 let mut guard = self.global_subscriptions.write();
798 Self::dispatch_and_prune(&mut guard, &metrics);
799 }
800 }
801
802 fn dispatch_and_prune(subs: &mut Vec<Arc<SubscriberHandle>>, metrics: &LiveMetrics) {
803 subs.retain(|handle| {
804 if handle.is_closed() {
805 return false;
806 }
807 match handle.try_send(metrics.clone()) {
808 Ok(_) => true,
809 Err(()) => false,
810 }
811 });
812 }
813
814 pub fn subscriber_count(&self, actor_id: &ActorId) -> usize {
816 self.subscriptions
817 .read()
818 .get(actor_id)
819 .map(|v| v.iter().filter(|h| !h.is_closed()).count())
820 .unwrap_or(0)
821 }
822
823 pub fn global_subscriber_count(&self) -> usize {
825 self.global_subscriptions
826 .read()
827 .iter()
828 .filter(|h| !h.is_closed())
829 .count()
830 }
831
832 pub fn total_subscribers(&self) -> usize {
834 let per_actor: usize = self
835 .subscriptions
836 .read()
837 .values()
838 .map(|v| v.iter().filter(|h| !h.is_closed()).count())
839 .sum();
840 per_actor + self.global_subscriber_count()
841 }
842
843 pub fn tick(&self) -> usize {
846 let snapshots = self.aggregator.snapshot_all();
847 let n = snapshots.len();
848 for metrics in snapshots {
849 self.emit(metrics);
850 }
851 n
852 }
853}
854
impl Default for IntrospectionStream {
    /// Same as [`IntrospectionStream::new`]: a fresh aggregator, no
    /// subscribers.
    fn default() -> Self {
        Self::new()
    }
}
860
impl std::fmt::Debug for IntrospectionStream {
    /// Manual impl: summarizes the subscriber count instead of dumping the
    /// lock-guarded maps (the handle type has no Debug derive).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IntrospectionStream")
            .field("total_subscribers", &self.total_subscribers())
            .field("aggregator", &self.aggregator)
            .finish()
    }
}
869
/// Fixed-layout wire request to start or stop a metrics subscription.
///
/// `#[repr(C)]` with three `u64` fields, so the layout is exactly 24 bytes
/// with no padding (checked by the size assertion at the bottom of the
/// file).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
pub struct SubscribeMetricsRequest {
    /// Target actor id (wire-level, widened to u64).
    pub actor_id: u64,
    /// Sampling interval in microseconds; 0 means "unsubscribe".
    pub interval_us: u64,
    /// Caller-chosen id correlating requests with event streams.
    pub subscription_id: u64,
}

impl SubscribeMetricsRequest {
    /// Size of the wire encoding in bytes (3 × u64 = 24).
    pub const WIRE_SIZE: usize = std::mem::size_of::<Self>();

    /// Builds a subscribe request.
    pub const fn new(actor_id: u64, interval_us: u64, subscription_id: u64) -> Self {
        Self {
            actor_id,
            interval_us,
            subscription_id,
        }
    }

    /// Builds an unsubscribe request (interval 0).
    pub const fn unsubscribe(actor_id: u64, subscription_id: u64) -> Self {
        Self::new(actor_id, 0, subscription_id)
    }

    /// True when the request asks to stop the subscription.
    pub const fn is_unsubscribe(&self) -> bool {
        self.interval_us == 0
    }

    /// Encodes field by field in native endianness at offsets 0/8/16,
    /// matching the `#[repr(C)]` in-memory layout that the previous
    /// `transmute`-based encoding produced — same bytes, no `unsafe`.
    ///
    /// NOTE(review): the encoding is host-endian, so it is only portable
    /// between machines of matching endianness — confirm peers agree.
    pub fn to_bytes(&self) -> [u8; Self::WIRE_SIZE] {
        let mut out = [0u8; Self::WIRE_SIZE];
        out[0..8].copy_from_slice(&self.actor_id.to_ne_bytes());
        out[8..16].copy_from_slice(&self.interval_us.to_ne_bytes());
        out[16..24].copy_from_slice(&self.subscription_id.to_ne_bytes());
        out
    }

    /// Decodes from a native-endian buffer; `None` when `bytes` is shorter
    /// than [`Self::WIRE_SIZE`]. Extra trailing bytes are ignored.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::WIRE_SIZE {
            return None;
        }
        // Reads the u64 stored at byte offset `at`.
        let field = |at: usize| {
            let mut b = [0u8; 8];
            b.copy_from_slice(&bytes[at..at + 8]);
            u64::from_ne_bytes(b)
        };
        Some(Self {
            actor_id: field(0),
            interval_us: field(8),
            subscription_id: field(16),
        })
    }
}
935
/// Fixed-layout wire event carrying one metrics sample.
///
/// `#[repr(C)]` with an explicit trailing pad so the 72-byte layout has no
/// implicit padding bytes (checked by the size assertion at the bottom of
/// the file).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
pub struct LiveMetricsEvent {
    /// Subscription this event answers.
    pub subscription_id: u64,
    /// Actor the sample describes (wire-level, widened to u64).
    pub actor_id: u64,
    /// Sample time in microseconds (HLC physical component).
    pub timestamp_us: u64,
    /// Lifetime inbound message total.
    pub inbound_total: u64,
    /// Lifetime outbound message total.
    pub outbound_total: u64,
    /// Tenant the actor belongs to.
    pub tenant_id: u64,
    /// Current mailbox depth.
    pub queue_depth: u32,
    /// Median handling latency in microseconds.
    pub latency_p50_us: u32,
    /// 99th-percentile handling latency in microseconds.
    pub latency_p99_us: u32,
    /// Actor state size in bytes.
    pub state_size_bytes: u32,
    /// GPU utilization as a percentage (0..=100).
    pub gpu_utilization_pct: u8,
    /// Explicit padding to an 8-byte multiple; zeroed by `new`.
    pub _pad: [u8; 7],
}
969
impl LiveMetricsEvent {
    /// Size of the wire encoding in bytes (asserted to be 72 below).
    pub const WIRE_SIZE: usize = std::mem::size_of::<Self>();

    /// Builds an event from already-aggregated values; `_pad` is zeroed.
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        subscription_id: u64,
        actor_id: u64,
        timestamp_us: u64,
        inbound_total: u64,
        outbound_total: u64,
        tenant_id: u64,
        queue_depth: u32,
        latency_p50_us: u32,
        latency_p99_us: u32,
        state_size_bytes: u32,
        gpu_utilization_pct: u8,
    ) -> Self {
        Self {
            subscription_id,
            actor_id,
            timestamp_us,
            inbound_total,
            outbound_total,
            tenant_id,
            queue_depth,
            latency_p50_us,
            latency_p99_us,
            state_size_bytes,
            gpu_utilization_pct,
            _pad: [0; 7],
        }
    }

    /// Encodes the event as its raw in-memory bytes (host endianness).
    ///
    /// NOTE(review): host-endian encoding is only portable between machines
    /// of matching endianness — confirm peers agree.
    pub fn to_bytes(&self) -> [u8; Self::WIRE_SIZE] {
        // SAFETY: `Self` is `#[repr(C)]`, every field is a plain integer,
        // and the explicit `_pad` makes the layout free of implicit padding
        // (size asserted to be 72 at the bottom of the file), so all bytes
        // of the value are initialized and a transmute to a byte array of
        // the same size is sound.
        unsafe { std::mem::transmute::<Self, [u8; Self::WIRE_SIZE]>(*self) }
    }

    /// Decodes an event from a byte buffer; `None` when `bytes` is shorter
    /// than [`Self::WIRE_SIZE`]. Extra trailing bytes are ignored.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < Self::WIRE_SIZE {
            return None;
        }
        let mut buf = [0u8; Self::WIRE_SIZE];
        buf.copy_from_slice(&bytes[..Self::WIRE_SIZE]);
        // SAFETY: every field of `Self` is an integer type for which all
        // bit patterns are valid, and `buf` has exactly `WIRE_SIZE` bytes,
        // so reinterpreting it as `Self` cannot produce an invalid value.
        Some(unsafe { std::mem::transmute::<[u8; Self::WIRE_SIZE], Self>(buf) })
    }

    /// Converts the wire event into a [`LiveMetrics`] sample.
    ///
    /// Rates are not carried on the wire, so `inbound_rate` and
    /// `outbound_rate` are reported as 0.0; `gpu_utilization_pct` (0..=100)
    /// is rescaled to [0.0, 1.0].
    pub fn into_live_metrics(self, hlc_node_id: u64) -> LiveMetrics {
        LiveMetrics {
            // NOTE(review): `actor_id` is truncated from u64 to u32 here;
            // ids above u32::MAX would alias — confirm the id space fits.
            actor_id: ActorId(self.actor_id as u32),
            timestamp: HlcTimestamp::new(self.timestamp_us, 0, hlc_node_id),
            queue_depth: self.queue_depth as usize,
            inbound_rate: 0.0,
            outbound_rate: 0.0,
            latency_p50: Duration::from_micros(self.latency_p50_us as u64),
            latency_p99: Duration::from_micros(self.latency_p99_us as u64),
            state_size_bytes: self.state_size_bytes as u64,
            gpu_utilization: (self.gpu_utilization_pct as f32 / 100.0).clamp(0.0, 1.0),
            tenant_id: self.tenant_id,
        }
    }
}
1043
// Compile-time guards: the wire structs must keep their documented sizes
// (3 × u64 = 24, and the padded 72-byte event layout, respectively). A
// field change that alters the layout fails the build here.
const _: () = assert!(SubscribeMetricsRequest::WIRE_SIZE == 24);
const _: () = assert!(LiveMetricsEvent::WIRE_SIZE == 72);
1047
1048#[cfg(test)]
1049mod tests {
1050 use super::*;
1051
1052 #[test]
1053 fn test_trace_buffer_basic() {
1054 let mut buf = TraceBuffer::new(3);
1055
1056 for i in 0..5 {
1057 buf.record(TraceEntry {
1058 sequence: i,
1059 received_at: Instant::now(),
1060 duration: Duration::from_micros(100),
1061 source: None,
1062 outcome: TraceOutcome::Success,
1063 });
1064 }
1065
1066 assert_eq!(buf.len(), 3); assert_eq!(buf.total_recorded(), 5);
1068 }
1069
1070 #[test]
1071 fn test_trace_buffer_recent() {
1072 let mut buf = TraceBuffer::new(10);
1073
1074 for i in 0..5 {
1075 buf.record(TraceEntry {
1076 sequence: i,
1077 received_at: Instant::now(),
1078 duration: Duration::from_micros(100),
1079 source: None,
1080 outcome: TraceOutcome::Success,
1081 });
1082 std::thread::sleep(Duration::from_millis(1));
1083 }
1084
1085 let recent = buf.recent(3);
1086 assert_eq!(recent.len(), 3);
1087 assert!(recent[0].sequence > recent[2].sequence);
1089 }
1090
1091 #[test]
1092 fn test_introspection_service() {
1093 let mut svc = IntrospectionService::new(10);
1094
1095 let actor = ActorId(1);
1096 svc.register_actor(actor);
1097
1098 svc.record_trace(
1099 actor,
1100 TraceEntry {
1101 sequence: 1,
1102 received_at: Instant::now(),
1103 duration: Duration::from_micros(50),
1104 source: Some(ActorId(2)),
1105 outcome: TraceOutcome::Forwarded(ActorId(3)),
1106 },
1107 );
1108
1109 let traces = svc.get_traces(actor, 10);
1110 assert_eq!(traces.len(), 1);
1111 assert_eq!(traces[0].sequence, 1);
1112 }
1113
1114 #[test]
1115 fn test_queue_snapshot_utilization() {
1116 let snap = QueueSnapshot {
1117 input_depth: 75,
1118 input_capacity: 100,
1119 ..Default::default()
1120 };
1121 assert!((snap.input_utilization() - 0.75).abs() < 0.01);
1122 }
1123
1124 #[test]
1125 fn test_trace_outcome_variants() {
1126 assert_eq!(TraceOutcome::Success, TraceOutcome::Success);
1127 assert_ne!(TraceOutcome::Success, TraceOutcome::Dropped);
1128 }
1129
1130 fn mk_metrics(id: u32) -> LiveMetrics {
1135 LiveMetrics {
1136 actor_id: ActorId(id),
1137 timestamp: HlcTimestamp::new(0, 0, 0),
1138 queue_depth: 0,
1139 inbound_rate: 0.0,
1140 outbound_rate: 0.0,
1141 latency_p50: Duration::ZERO,
1142 latency_p99: Duration::ZERO,
1143 state_size_bytes: 0,
1144 gpu_utilization: 0.0,
1145 tenant_id: 0,
1146 }
1147 }
1148
1149 #[tokio::test]
1150 async fn test_subscribe_receives_metric() {
1151 let stream = IntrospectionStream::new();
1152 let actor = ActorId(42);
1153 let mut rx = stream.subscribe(actor, Duration::from_nanos(1));
1154 assert_eq!(stream.subscriber_count(&actor), 1);
1155
1156 stream.emit(mk_metrics(42));
1157 let got = rx.recv().await.expect("metric delivered");
1158 assert_eq!(got.actor_id, actor);
1159 }
1160
1161 #[tokio::test]
1162 async fn test_subscribe_filters_by_actor() {
1163 let stream = IntrospectionStream::new();
1164 let mut rx = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1165
1166 stream.emit(mk_metrics(2));
1168 stream.emit(mk_metrics(1));
1170
1171 let got = rx.recv().await.expect("metric delivered");
1172 assert_eq!(got.actor_id, ActorId(1));
1173 assert!(rx.try_recv().is_err(), "no other metric should be queued");
1174 }
1175
1176 #[tokio::test]
1177 async fn test_unsubscribe_stops_delivery() {
1178 let stream = IntrospectionStream::new();
1179 let actor = ActorId(7);
1180 let mut rx = stream.subscribe(actor, Duration::from_nanos(1));
1181 stream.unsubscribe(actor);
1182 assert_eq!(stream.subscriber_count(&actor), 0);
1183 stream.emit(mk_metrics(7));
1184
1185 match rx.recv().await {
1188 None => {}
1189 Some(_) => panic!("no metric should be delivered after unsubscribe"),
1190 }
1191 }
1192
1193 #[tokio::test]
1194 async fn test_subscribe_interval_zero_is_unsubscribe() {
1195 let stream = IntrospectionStream::new();
1196 let actor = ActorId(3);
1197 let _keep = stream.subscribe(actor, Duration::from_micros(1));
1199 assert_eq!(stream.subscriber_count(&actor), 1);
1200
1201 let mut rx_zero = stream.subscribe(actor, Duration::ZERO);
1202 assert_eq!(stream.subscriber_count(&actor), 0);
1203 stream.emit(mk_metrics(3));
1204 assert!(rx_zero.recv().await.is_none(), "zero interval = closed");
1205 }
1206
1207 #[tokio::test]
1208 async fn test_multi_subscriber_fanout() {
1209 let stream = IntrospectionStream::new();
1210 let actor = ActorId(10);
1211 let mut rx1 = stream.subscribe(actor, Duration::from_nanos(1));
1212 let mut rx2 = stream.subscribe(actor, Duration::from_nanos(1));
1213 let mut rx3 = stream.subscribe(actor, Duration::from_nanos(1));
1214 assert_eq!(stream.subscriber_count(&actor), 3);
1215
1216 stream.emit(mk_metrics(10));
1217
1218 assert_eq!(rx1.recv().await.unwrap().actor_id, actor);
1219 assert_eq!(rx2.recv().await.unwrap().actor_id, actor);
1220 assert_eq!(rx3.recv().await.unwrap().actor_id, actor);
1221 }
1222
1223 #[tokio::test]
1224 async fn test_broken_receiver_auto_cleanup() {
1225 let stream = IntrospectionStream::new();
1226 let actor = ActorId(9);
1227 let rx1 = stream.subscribe(actor, Duration::from_nanos(1));
1228 let mut rx2 = stream.subscribe(actor, Duration::from_nanos(1));
1229 assert_eq!(stream.subscriber_count(&actor), 2);
1230
1231 drop(rx1);
1233 stream.emit(mk_metrics(9));
1234 assert_eq!(stream.subscriber_count(&actor), 1);
1235
1236 assert_eq!(rx2.recv().await.unwrap().actor_id, actor);
1238 }
1239
1240 #[tokio::test]
1241 async fn test_broken_receiver_closes_empty_bucket() {
1242 let stream = IntrospectionStream::new();
1243 let actor = ActorId(15);
1244 let rx = stream.subscribe(actor, Duration::from_nanos(1));
1245 drop(rx);
1246 stream.emit(mk_metrics(15));
1247 assert_eq!(stream.subscriber_count(&actor), 0);
1248 stream.emit(mk_metrics(15));
1250 }
1251
1252 #[tokio::test]
1253 async fn test_subscribe_all_receives_all_actors() {
1254 let stream = IntrospectionStream::new();
1255 let mut rx = stream.subscribe_all(Duration::from_nanos(1));
1256 stream.emit(mk_metrics(1));
1257 stream.emit(mk_metrics(2));
1258 stream.emit(mk_metrics(3));
1259
1260 let mut seen = Vec::new();
1261 for _ in 0..3 {
1262 seen.push(rx.recv().await.unwrap().actor_id);
1263 }
1264 seen.sort_by_key(|a| a.0);
1265 assert_eq!(seen, vec![ActorId(1), ActorId(2), ActorId(3)]);
1266 }
1267
1268 #[tokio::test]
1269 async fn test_subscribe_all_plus_specific_both_fire() {
1270 let stream = IntrospectionStream::new();
1271 let mut rx_all = stream.subscribe_all(Duration::from_nanos(1));
1272 let mut rx_one = stream.subscribe(ActorId(5), Duration::from_nanos(1));
1273
1274 stream.emit(mk_metrics(5));
1275
1276 assert_eq!(rx_all.recv().await.unwrap().actor_id, ActorId(5));
1277 assert_eq!(rx_one.recv().await.unwrap().actor_id, ActorId(5));
1278 }
1279
1280 #[tokio::test]
1281 async fn test_subscribe_all_interval_zero_is_no_op() {
1282 let stream = IntrospectionStream::new();
1283 let mut rx = stream.subscribe_all(Duration::ZERO);
1284 stream.emit(mk_metrics(1));
1285 assert!(rx.recv().await.is_none());
1286 assert_eq!(stream.global_subscriber_count(), 0);
1287 }
1288
1289 #[tokio::test]
1290 async fn test_unsubscribe_all_clears_global() {
1291 let stream = IntrospectionStream::new();
1292 let _rx1 = stream.subscribe_all(Duration::from_nanos(1));
1293 let _rx2 = stream.subscribe_all(Duration::from_nanos(1));
1294 assert_eq!(stream.global_subscriber_count(), 2);
1295 stream.unsubscribe_all();
1296 assert_eq!(stream.global_subscriber_count(), 0);
1297 }
1298
1299 #[tokio::test]
1300 async fn test_interval_gating_suppresses_faster_emits() {
1301 let stream = IntrospectionStream::new();
1302 let actor = ActorId(11);
1303 let mut rx = stream.subscribe(actor, Duration::from_millis(500));
1304
1305 stream.emit(mk_metrics(11));
1307 stream.emit(mk_metrics(11));
1309
1310 assert_eq!(rx.recv().await.unwrap().actor_id, actor);
1311 assert!(
1312 rx.try_recv().is_err(),
1313 "second emit should be gated by interval"
1314 );
1315 }
1316
1317 #[tokio::test]
1318 async fn test_total_subscribers_sums_buckets() {
1319 let stream = IntrospectionStream::new();
1320 let _a = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1321 let _b = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1322 let _c = stream.subscribe(ActorId(2), Duration::from_nanos(1));
1323 let _d = stream.subscribe_all(Duration::from_nanos(1));
1324 assert_eq!(stream.total_subscribers(), 4);
1325 }
1326
1327 #[test]
1328 fn test_subscription_ids_are_unique_and_monotonic() {
1329 let stream = IntrospectionStream::new();
1330 let _a = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1331 let _b = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1332 let _c = stream.subscribe_all(Duration::from_nanos(1));
1333
1334 let subs = stream.subscriptions.read();
1335 let handles = subs.get(&ActorId(1)).expect("bucket exists");
1336 assert_eq!(handles.len(), 2);
1337 assert!(handles[0].subscription_id < handles[1].subscription_id);
1338
1339 let globals = stream.global_subscriptions.read();
1340 assert_eq!(globals.len(), 1);
1341 assert!(globals[0].subscription_id > handles[1].subscription_id);
1342 }
1343
1344 #[test]
1347 fn test_aggregator_record_and_snapshot() {
1348 let agg = MetricAggregator::new();
1349 let a = ActorId(1);
1350 agg.record_inbound(a, 10);
1351 agg.record_outbound(a, 5);
1352 agg.set_queue_depth(a, 3);
1353 agg.set_state_size(a, 4096);
1354 agg.set_gpu_utilization(a, 0.75);
1355 agg.set_tenant(a, 42);
1356
1357 let snap = agg.snapshot(&a).expect("snapshot exists");
1358 assert_eq!(snap.queue_depth, 3);
1359 assert_eq!(snap.state_size_bytes, 4096);
1360 assert!((snap.gpu_utilization - 0.75).abs() < 1e-6);
1361 assert_eq!(snap.tenant_id, 42);
1362 assert!(snap.inbound_rate > 0.0);
1364 assert!(snap.outbound_rate > 0.0);
1365 }
1366
1367 #[test]
1368 fn test_aggregator_snapshot_unknown_actor_returns_none() {
1369 let agg = MetricAggregator::new();
1370 assert!(agg.snapshot(&ActorId(999)).is_none());
1371 }
1372
1373 #[test]
1374 fn test_aggregator_snapshot_all_covers_every_actor() {
1375 let agg = MetricAggregator::new();
1376 for i in 0..5 {
1377 agg.record_inbound(ActorId(i), 1);
1378 }
1379 let all = agg.snapshot_all();
1380 assert_eq!(all.len(), 5);
1381 let mut ids: Vec<u32> = all.iter().map(|m| m.actor_id.0).collect();
1382 ids.sort();
1383 assert_eq!(ids, vec![0, 1, 2, 3, 4]);
1384 }
1385
1386 #[test]
1387 fn test_aggregator_remove_actor() {
1388 let agg = MetricAggregator::new();
1389 agg.record_inbound(ActorId(1), 1);
1390 assert_eq!(agg.tracked_actors(), 1);
1391 assert!(agg.remove_actor(&ActorId(1)));
1392 assert_eq!(agg.tracked_actors(), 0);
1393 assert!(!agg.remove_actor(&ActorId(1)));
1394 }
1395
1396 #[test]
1397 fn test_aggregator_gpu_utilization_clamped() {
1398 let agg = MetricAggregator::new();
1399 let a = ActorId(1);
1400 agg.record_inbound(a, 1);
1401 agg.set_gpu_utilization(a, 5.0);
1402 assert!((agg.snapshot(&a).unwrap().gpu_utilization - 1.0).abs() < 1e-6);
1403 agg.set_gpu_utilization(a, -1.0);
1404 assert_eq!(agg.snapshot(&a).unwrap().gpu_utilization, 0.0);
1405 }
1406
1407 #[test]
1408 fn test_aggregator_with_custom_alpha() {
1409 let agg = MetricAggregator::with_alpha(0.5);
1410 assert!((agg.ewma_alpha() - 0.5).abs() < 1e-9);
1411 let agg = MetricAggregator::with_alpha(0.0);
1413 assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
1414 let agg = MetricAggregator::with_alpha(2.0);
1415 assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
1416 let agg = MetricAggregator::with_alpha(f64::NAN);
1417 assert!((agg.ewma_alpha() - DEFAULT_EWMA_ALPHA).abs() < 1e-9);
1418 }
1419
1420 #[test]
1421 fn test_aggregator_ewma_smooths_spikes() {
1422 let agg = MetricAggregator::with_alpha(0.1);
1424 let a = ActorId(1);
1425
1426 for _ in 0..5 {
1428 agg.record_inbound(a, 100);
1429 std::thread::sleep(Duration::from_millis(20));
1430 let _ = agg.snapshot(&a);
1431 }
1432 let baseline = agg.snapshot(&a).unwrap().inbound_rate;
1433
1434 agg.record_inbound(a, 1_000_000);
1437 std::thread::sleep(Duration::from_millis(20));
1438 let after = agg.snapshot(&a).unwrap().inbound_rate;
1439
1440 assert!(after > baseline, "spike should raise the smoothed rate");
1441 let raw_spike_rate = 1_000_000.0 / 0.020;
1444 assert!(
1445 after < raw_spike_rate,
1446 "EWMA must not fully adopt a single spike (after={after}, raw={raw_spike_rate})"
1447 );
1448 }
1449
1450 #[test]
1451 fn test_aggregator_ewma_known_sequence() {
1452 let agg = MetricAggregator::with_alpha(1.0);
1455 let a = ActorId(1);
1456
1457 agg.record_inbound(a, 10);
1458 std::thread::sleep(Duration::from_millis(10));
1459 let first = agg.snapshot(&a).unwrap().inbound_rate;
1460 assert!(first > 0.0);
1461
1462 agg.record_inbound(a, 0); std::thread::sleep(Duration::from_millis(10));
1465 let second = agg.snapshot(&a).unwrap().inbound_rate;
1466 assert_eq!(second, 0.0, "alpha=1 adopts the raw rate verbatim");
1467 }
1468
1469 #[test]
1472 fn test_latency_histogram_empty() {
1473 let h = LatencyHistogram::new();
1474 assert_eq!(h.p50(), Duration::ZERO);
1475 assert_eq!(h.p99(), Duration::ZERO);
1476 assert_eq!(h.live_samples(), 0);
1477 assert_eq!(h.total_recorded(), 0);
1478 }
1479
1480 #[test]
1481 fn test_latency_histogram_percentiles() {
1482 let mut h = LatencyHistogram::new();
1483 for i in 1..=100u64 {
1484 h.record(Duration::from_millis(i));
1485 }
1486 assert_eq!(h.live_samples(), 100);
1487 assert_eq!(h.p50(), Duration::from_millis(50));
1489 assert_eq!(h.p99(), Duration::from_millis(99));
1491 }
1492
1493 #[test]
1494 fn test_latency_histogram_ring_wraparound() {
1495 let mut h = LatencyHistogram::new();
1496 for i in 0..(LATENCY_HISTOGRAM_CAPACITY + 100) {
1498 h.record(Duration::from_micros(i as u64));
1499 }
1500 assert_eq!(h.live_samples(), LATENCY_HISTOGRAM_CAPACITY);
1501 assert_eq!(
1502 h.total_recorded(),
1503 (LATENCY_HISTOGRAM_CAPACITY + 100) as u64
1504 );
1505 assert!(h.p99() > Duration::from_micros(100));
1507 }
1508
1509 #[test]
1510 fn test_latency_histogram_percentile_clamps() {
1511 let mut h = LatencyHistogram::new();
1512 h.record(Duration::from_millis(5));
1513 h.record(Duration::from_millis(10));
1514 assert_eq!(h.percentile(-1.0), h.percentile(0.0));
1515 assert_eq!(h.percentile(2.0), h.percentile(1.0));
1516 }
1517
1518 #[test]
1519 fn test_aggregator_latency_snapshot_reflects_histogram() {
1520 let agg = MetricAggregator::new();
1521 let a = ActorId(1);
1522 for i in 1..=10u64 {
1523 agg.record_latency(a, Duration::from_millis(i));
1524 }
1525 agg.record_inbound(a, 1); let snap = agg.snapshot(&a).unwrap();
1527 assert!(snap.latency_p50 >= Duration::from_millis(5));
1528 assert!(snap.latency_p99 >= Duration::from_millis(9));
1529 }
1530
1531 #[tokio::test]
1534 async fn test_stream_tick_emits_all_aggregated() {
1535 let stream = IntrospectionStream::new();
1536 let agg = stream.aggregator();
1537 agg.record_inbound(ActorId(1), 10);
1538 agg.record_inbound(ActorId(2), 20);
1539 let mut rx = stream.subscribe_all(Duration::from_nanos(1));
1540
1541 let emitted = stream.tick();
1542 assert_eq!(emitted, 2);
1543
1544 let mut seen = Vec::new();
1545 for _ in 0..2 {
1546 seen.push(rx.recv().await.unwrap().actor_id.0);
1547 }
1548 seen.sort();
1549 assert_eq!(seen, vec![1, 2]);
1550 }
1551
1552 #[tokio::test]
1553 async fn test_stream_with_shared_aggregator() {
1554 let agg = Arc::new(MetricAggregator::new());
1555 let stream = IntrospectionStream::with_aggregator(agg.clone());
1556 assert!(Arc::ptr_eq(&agg, &stream.aggregator()));
1557 agg.record_inbound(ActorId(1), 1);
1558 let mut rx = stream.subscribe(ActorId(1), Duration::from_nanos(1));
1559 assert_eq!(stream.tick(), 1);
1560 assert_eq!(rx.recv().await.unwrap().actor_id, ActorId(1));
1561 }
1562
1563 #[tokio::test]
1566 async fn test_concurrent_subscribe_unsubscribe_race() {
1567 use std::sync::Barrier;
1571 use std::thread;
1572
1573 let stream = Arc::new(IntrospectionStream::new());
1574 let barrier = Arc::new(Barrier::new(4));
1575 let mut handles = Vec::new();
1576
1577 for worker in 0..2 {
1579 let s = stream.clone();
1580 let b = barrier.clone();
1581 handles.push(thread::spawn(move || {
1582 b.wait();
1583 for i in 0..200 {
1584 let actor = ActorId((i + worker) % 8);
1585 let _rx = s.subscribe(actor, Duration::from_nanos(1));
1586 if i % 3 == 0 {
1587 s.unsubscribe(actor);
1588 }
1589 }
1590 }));
1591 }
1592
1593 {
1595 let s = stream.clone();
1596 let b = barrier.clone();
1597 handles.push(thread::spawn(move || {
1598 b.wait();
1599 for i in 0..400 {
1600 s.emit(mk_metrics((i % 8) as u32));
1601 }
1602 }));
1603 }
1604
1605 {
1607 let s = stream.clone();
1608 let b = barrier.clone();
1609 handles.push(thread::spawn(move || {
1610 b.wait();
1611 for i in 0..400 {
1612 let _ = s.subscriber_count(&ActorId((i % 8) as u32));
1613 let _ = s.total_subscribers();
1614 }
1615 }));
1616 }
1617
1618 for h in handles {
1619 h.join().expect("worker thread did not panic");
1620 }
1621 let mut rx = stream.subscribe(ActorId(123), Duration::from_nanos(1));
1623 stream.emit(mk_metrics(123));
1624 assert_eq!(rx.recv().await.unwrap().actor_id, ActorId(123));
1625 }
1626
1627 #[tokio::test]
1628 async fn test_concurrent_aggregator_record_and_snapshot() {
1629 use std::sync::Barrier;
1630 use std::thread;
1631
1632 let agg = Arc::new(MetricAggregator::new());
1633 let barrier = Arc::new(Barrier::new(3));
1634 let mut handles = Vec::new();
1635
1636 for _ in 0..2 {
1637 let a = agg.clone();
1638 let b = barrier.clone();
1639 handles.push(thread::spawn(move || {
1640 b.wait();
1641 for i in 0..500 {
1642 a.record_inbound(ActorId((i % 4) as u32), 1);
1643 a.record_latency(
1644 ActorId((i % 4) as u32),
1645 Duration::from_micros((i % 100) as u64),
1646 );
1647 }
1648 }));
1649 }
1650 {
1651 let a = agg.clone();
1652 let b = barrier.clone();
1653 handles.push(thread::spawn(move || {
1654 b.wait();
1655 for _ in 0..500 {
1656 let _ = a.snapshot_all();
1657 }
1658 }));
1659 }
1660
1661 for h in handles {
1662 h.join().expect("no panic");
1663 }
1664 assert_eq!(agg.tracked_actors(), 4);
1665 }
1666
1667 #[test]
1670 fn test_subscribe_metrics_request_roundtrip() {
1671 let req = SubscribeMetricsRequest::new(42, 1000, 7);
1672 assert!(!req.is_unsubscribe());
1673 let bytes = req.to_bytes();
1674 assert_eq!(bytes.len(), SubscribeMetricsRequest::WIRE_SIZE);
1675 let decoded = SubscribeMetricsRequest::from_bytes(&bytes).unwrap();
1676 assert_eq!(decoded, req);
1677 }
1678
1679 #[test]
1680 fn test_subscribe_metrics_request_unsubscribe() {
1681 let req = SubscribeMetricsRequest::unsubscribe(42, 7);
1682 assert!(req.is_unsubscribe());
1683 assert_eq!(req.interval_us, 0);
1684 let decoded =
1685 SubscribeMetricsRequest::from_bytes(&req.to_bytes()).expect("roundtrip decode");
1686 assert!(decoded.is_unsubscribe());
1687 }
1688
1689 #[test]
1690 fn test_subscribe_metrics_request_short_buffer() {
1691 let short = [0u8; SubscribeMetricsRequest::WIRE_SIZE - 1];
1692 assert!(SubscribeMetricsRequest::from_bytes(&short).is_none());
1693 }
1694
1695 #[test]
1696 fn test_live_metrics_event_roundtrip() {
1697 let evt = LiveMetricsEvent::new(
1698 9,
1699 42,
1700 1_700_000_000_000_000,
1701 123,
1702 45,
1703 7,
1704 16,
1705 500,
1706 2_500,
1707 8_192,
1708 73,
1709 );
1710 let bytes = evt.to_bytes();
1711 assert_eq!(bytes.len(), LiveMetricsEvent::WIRE_SIZE);
1712 let decoded = LiveMetricsEvent::from_bytes(&bytes).unwrap();
1713 assert_eq!(decoded, evt);
1714 }
1715
1716 #[test]
1717 fn test_live_metrics_event_short_buffer() {
1718 let short = [0u8; LiveMetricsEvent::WIRE_SIZE - 1];
1719 assert!(LiveMetricsEvent::from_bytes(&short).is_none());
1720 }
1721
1722 #[test]
1723 fn test_live_metrics_event_into_live_metrics() {
1724 let evt = LiveMetricsEvent::new(1, 42, 1_234_567, 0, 0, 9, 16, 500, 2_500, 4096, 80);
1725 let metrics = evt.into_live_metrics(3);
1726 assert_eq!(metrics.actor_id, ActorId(42));
1727 assert_eq!(metrics.timestamp.physical, 1_234_567);
1728 assert_eq!(metrics.timestamp.node_id, 3);
1729 assert_eq!(metrics.queue_depth, 16);
1730 assert_eq!(metrics.latency_p50, Duration::from_micros(500));
1731 assert_eq!(metrics.latency_p99, Duration::from_micros(2_500));
1732 assert_eq!(metrics.state_size_bytes, 4096);
1733 assert!((metrics.gpu_utilization - 0.80).abs() < 1e-6);
1734 assert_eq!(metrics.tenant_id, 9);
1735 }
1736
1737 #[test]
1738 fn test_wire_sizes_are_stable() {
1739 assert_eq!(SubscribeMetricsRequest::WIRE_SIZE, 24);
1742 assert_eq!(LiveMetricsEvent::WIRE_SIZE, 72);
1743 }
1744
1745 #[test]
1746 fn test_live_metrics_event_roundtrip_preserves_gpu_pct_clamp() {
1747 let evt = LiveMetricsEvent::new(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 200);
1749 let m = evt.into_live_metrics(0);
1750 assert_eq!(m.gpu_utilization, 1.0);
1751 }
1752
    #[test]
    fn test_subscribe_metrics_request_size_stable() {
        // Pins the IN-MEMORY size of the struct to 24 bytes, matching
        // WIRE_SIZE asserted elsewhere.
        // NOTE(review): size_of reflects compiler layout, which is only
        // guaranteed stable for fixed-layout types — confirm the struct is
        // #[repr(C)] (or similar) so this check is meaningful.
        assert_eq!(std::mem::size_of::<SubscribeMetricsRequest>(), 24);
    }
1758
    #[test]
    fn test_live_metrics_event_size_stable() {
        // Pins the IN-MEMORY size of the event to 72 bytes, matching
        // WIRE_SIZE asserted elsewhere.
        // NOTE(review): only meaningful if the type has a fixed layout
        // (e.g. #[repr(C)]) — confirm, since default Rust layout may change.
        assert_eq!(std::mem::size_of::<LiveMetricsEvent>(), 72);
    }
1763
1764 #[test]
1767 fn test_pull_api_still_works_alongside_streaming() {
1768 let mut svc = IntrospectionService::new(4);
1769 let a = ActorId(1);
1770 svc.register_actor(a);
1771 svc.record_trace(
1772 a,
1773 TraceEntry {
1774 sequence: 0,
1775 received_at: Instant::now(),
1776 duration: Duration::from_micros(10),
1777 source: None,
1778 outcome: TraceOutcome::Success,
1779 },
1780 );
1781 assert_eq!(svc.get_traces(a, 10).len(), 1);
1782 let stream = IntrospectionStream::new();
1784 assert_eq!(stream.total_subscribers(), 0);
1785 }
1786}