1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod stream_wrappers;
14mod wrappers;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct LogEntry {
19 pub index: u64,
20 pub timestamp: u64,
21 pub message: Option<String>,
22}
23
24impl LogEntry {
25 pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
26 let start_time = START_TIME.get().copied().unwrap_or(timestamp);
27 let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
28 Self {
29 index,
30 timestamp: timestamp_nanos,
31 message,
32 }
33 }
34}
35
/// The flavor of an instrumented channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelType {
    /// A bounded channel; the payload is its capacity.
    Bounded(usize),
    /// A channel without a capacity limit.
    Unbounded,
    /// A oneshot channel.
    Oneshot,
}
43
44impl std::fmt::Display for ChannelType {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
48 ChannelType::Unbounded => write!(f, "unbounded"),
49 ChannelType::Oneshot => write!(f, "oneshot"),
50 }
51 }
52}
53
54impl Serialize for ChannelType {
55 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: serde::Serializer,
58 {
59 serializer.serialize_str(&self.to_string())
60 }
61}
62
63impl<'de> Deserialize<'de> for ChannelType {
64 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
65 where
66 D: serde::Deserializer<'de>,
67 {
68 let s = String::deserialize(deserializer)?;
69
70 match s.as_str() {
71 "unbounded" => Ok(ChannelType::Unbounded),
72 "oneshot" => Ok(ChannelType::Oneshot),
73 _ => {
74 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
76 let size = inner
77 .parse()
78 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
79 Ok(ChannelType::Bounded(size))
80 } else {
81 Err(serde::de::Error::custom("invalid channel type"))
82 }
83 }
84 }
85 }
86}
87
/// Output format selector; `Table` is the default.
#[derive(Clone, Copy, Debug, Default)]
pub enum Format {
    /// Tabular output (the default variant).
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
96
/// Lifecycle state tracked for a channel or stream; `Active` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChannelState {
    /// Open and not at capacity (the default variant).
    #[default]
    Active,
    /// Closed.
    Closed,
    /// At capacity.
    Full,
    /// Notified.
    Notified,
}
106
107impl std::fmt::Display for ChannelState {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "{}", self.as_str())
110 }
111}
112
113impl ChannelState {
114 pub fn as_str(&self) -> &'static str {
115 match self {
116 ChannelState::Active => "active",
117 ChannelState::Closed => "closed",
118 ChannelState::Full => "full",
119 ChannelState::Notified => "notified",
120 }
121 }
122}
123
124impl Serialize for ChannelState {
125 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126 where
127 S: serde::Serializer,
128 {
129 serializer.serialize_str(self.as_str())
130 }
131}
132
133impl<'de> Deserialize<'de> for ChannelState {
134 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135 where
136 D: serde::Deserializer<'de>,
137 {
138 let s = String::deserialize(deserializer)?;
139 match s.as_str() {
140 "active" => Ok(ChannelState::Active),
141 "closed" => Ok(ChannelState::Closed),
142 "full" => Ok(ChannelState::Full),
143 "notified" => Ok(ChannelState::Notified),
144 _ => Err(serde::de::Error::custom("invalid channel state")),
145 }
146 }
147}
148
149#[derive(Debug, Clone)]
151pub(crate) struct ChannelStats {
152 pub(crate) id: u64,
153 pub(crate) source: &'static str,
154 pub(crate) label: Option<String>,
155 pub(crate) channel_type: ChannelType,
156 pub(crate) state: ChannelState,
157 pub(crate) sent_count: u64,
158 pub(crate) received_count: u64,
159 pub(crate) type_name: &'static str,
160 pub(crate) type_size: usize,
161 pub(crate) sent_logs: VecDeque<LogEntry>,
162 pub(crate) received_logs: VecDeque<LogEntry>,
163 pub(crate) iter: u32,
164}
165
166impl ChannelStats {
167 pub fn queued(&self) -> u64 {
168 self.sent_count
169 .saturating_sub(self.received_count)
170 .saturating_sub(1)
171 }
172
173 pub fn queued_bytes(&self) -> u64 {
174 self.queued() * self.type_size as u64
175 }
176}
177
178#[derive(Debug, Clone)]
180pub(crate) struct StreamStats {
181 pub(crate) id: u64,
182 pub(crate) source: &'static str,
183 pub(crate) label: Option<String>,
184 pub(crate) state: ChannelState, pub(crate) items_yielded: u64,
186 pub(crate) type_name: &'static str,
187 pub(crate) type_size: usize,
188 pub(crate) logs: VecDeque<LogEntry>,
189 pub(crate) iter: u32,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct ChannelsJson {
195 pub current_elapsed_ns: u64,
197 pub channels: Vec<SerializableChannelStats>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct StreamsJson {
204 pub current_elapsed_ns: u64,
206 pub streams: Vec<SerializableStreamStats>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct CombinedJson {
213 pub current_elapsed_ns: u64,
215 pub channels: Vec<SerializableChannelStats>,
217 pub streams: Vec<SerializableStreamStats>,
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct SerializableChannelStats {
224 pub id: u64,
225 pub source: String,
226 pub label: String,
227 pub has_custom_label: bool,
228 pub channel_type: ChannelType,
229 pub state: ChannelState,
230 pub sent_count: u64,
231 pub received_count: u64,
232 pub queued: u64,
233 pub type_name: String,
234 pub type_size: usize,
235 pub queued_bytes: u64,
236 pub iter: u32,
237}
238
239#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct SerializableStreamStats {
242 pub id: u64,
243 pub source: String,
244 pub label: String,
245 pub has_custom_label: bool,
246 pub state: ChannelState,
247 pub items_yielded: u64,
248 pub type_name: String,
249 pub type_size: usize,
250 pub iter: u32,
251}
252
253impl From<&ChannelStats> for SerializableChannelStats {
254 fn from(channel_stats: &ChannelStats) -> Self {
255 let label = resolve_label(
256 channel_stats.source,
257 channel_stats.label.as_deref(),
258 channel_stats.iter,
259 );
260
261 Self {
262 id: channel_stats.id,
263 source: channel_stats.source.to_string(),
264 label,
265 has_custom_label: channel_stats.label.is_some(),
266 channel_type: channel_stats.channel_type,
267 state: channel_stats.state,
268 sent_count: channel_stats.sent_count,
269 received_count: channel_stats.received_count,
270 queued: channel_stats.queued(),
271 type_name: channel_stats.type_name.to_string(),
272 type_size: channel_stats.type_size,
273 queued_bytes: channel_stats.queued_bytes(),
274 iter: channel_stats.iter,
275 }
276 }
277}
278
279impl From<&StreamStats> for SerializableStreamStats {
280 fn from(stream_stats: &StreamStats) -> Self {
281 let label = resolve_label(
282 stream_stats.source,
283 stream_stats.label.as_deref(),
284 stream_stats.iter,
285 );
286
287 Self {
288 id: stream_stats.id,
289 source: stream_stats.source.to_string(),
290 label,
291 has_custom_label: stream_stats.label.is_some(),
292 state: stream_stats.state,
293 items_yielded: stream_stats.items_yielded,
294 type_name: stream_stats.type_name.to_string(),
295 type_size: stream_stats.type_size,
296 iter: stream_stats.iter,
297 }
298 }
299}
300
301impl ChannelStats {
302 fn new(
303 id: u64,
304 source: &'static str,
305 label: Option<String>,
306 channel_type: ChannelType,
307 type_name: &'static str,
308 type_size: usize,
309 iter: u32,
310 ) -> Self {
311 Self {
312 id,
313 source,
314 label,
315 channel_type,
316 state: ChannelState::default(),
317 sent_count: 0,
318 received_count: 0,
319 type_name,
320 type_size,
321 sent_logs: VecDeque::new(),
322 received_logs: VecDeque::new(),
323 iter,
324 }
325 }
326
327 fn update_state(&mut self) {
328 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
329 return;
330 }
331
332 let queued = self.queued();
333 let is_full = match self.channel_type {
334 ChannelType::Bounded(cap) => queued >= cap as u64,
335 ChannelType::Oneshot => queued >= 1,
336 ChannelType::Unbounded => false,
337 };
338
339 if is_full {
340 self.state = ChannelState::Full;
341 } else {
342 self.state = ChannelState::Active;
343 }
344 }
345}
346
347impl StreamStats {
348 fn new(
349 id: u64,
350 source: &'static str,
351 label: Option<String>,
352 type_name: &'static str,
353 type_size: usize,
354 iter: u32,
355 ) -> Self {
356 Self {
357 id,
358 source,
359 label,
360 state: ChannelState::Active,
361 items_yielded: 0,
362 type_name,
363 type_size,
364 logs: VecDeque::new(),
365 iter,
366 }
367 }
368}
369
370#[derive(Debug)]
372pub(crate) enum ChannelEvent {
373 Created {
374 id: u64,
375 source: &'static str,
376 display_label: Option<String>,
377 channel_type: ChannelType,
378 type_name: &'static str,
379 type_size: usize,
380 },
381 MessageSent {
382 id: u64,
383 log: Option<String>,
384 timestamp: Instant,
385 },
386 MessageReceived {
387 id: u64,
388 timestamp: Instant,
389 },
390 Closed {
391 id: u64,
392 },
393 #[allow(dead_code)]
394 Notified {
395 id: u64,
396 },
397}
398
/// Events consumed by the stream statistics collector thread.
#[derive(Debug)]
pub(crate) enum StreamEvent {
    /// A stream was registered.
    Created {
        id: u64,
        source: &'static str,
        display_label: Option<String>,
        type_name: &'static str,
        type_size: usize,
    },
    /// The stream yielded an item; `log` optionally carries text for the log buffer.
    Yielded {
        id: u64,
        log: Option<String>,
        timestamp: Instant,
    },
    /// The stream finished.
    Completed { id: u64 },
}
418
419type ChannelStatsState = (
420 CbSender<ChannelEvent>,
421 Arc<RwLock<HashMap<u64, ChannelStats>>>,
422);
423type StreamStatsState = (
424 CbSender<StreamEvent>,
425 Arc<RwLock<HashMap<u64, StreamStats>>>,
426);
427
428static CHANNELS_STATE: OnceLock<ChannelStatsState> = OnceLock::new();
429
430static STREAMS_STATE: OnceLock<StreamStatsState> = OnceLock::new();
431
432static START_TIME: OnceLock<Instant> = OnceLock::new();
433
434pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
435
436pub(crate) static STREAM_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
437
/// Log buffer size used when `CHANNELS_CONSOLE_LOG_LIMIT` is unset or unparsable.
const DEFAULT_LOG_LIMIT: usize = 50;

/// Returns the maximum number of log entries kept per buffer.
///
/// Reads `CHANNELS_CONSOLE_LOG_LIMIT` from the environment on every call and
/// falls back to `DEFAULT_LOG_LIMIT` when the variable is missing or is not a
/// valid `usize`.
fn get_log_limit() -> usize {
    match std::env::var("CHANNELS_CONSOLE_LOG_LIMIT") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_LOG_LIMIT),
        Err(_) => DEFAULT_LOG_LIMIT,
    }
}
446
447pub(crate) fn init_channels_state() -> &'static ChannelStatsState {
450 CHANNELS_STATE.get_or_init(|| {
451 START_TIME.get_or_init(Instant::now);
452
453 let (tx, rx) = unbounded::<ChannelEvent>();
454 let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
455 let stats_map_clone = Arc::clone(&stats_map);
456
457 std::thread::Builder::new()
458 .name("channel-stats-collector".into())
459 .spawn(move || {
460 while let Ok(event) = rx.recv() {
461 let mut stats = stats_map_clone.write().unwrap();
462 match event {
463 ChannelEvent::Created {
464 id,
465 source,
466 display_label,
467 channel_type,
468 type_name,
469 type_size,
470 } => {
471 let iter = stats.values().filter(|s| s.source == source).count() as u32;
473
474 stats.insert(
475 id,
476 ChannelStats::new(
477 id,
478 source,
479 display_label,
480 channel_type,
481 type_name,
482 type_size,
483 iter,
484 ),
485 );
486 }
487 ChannelEvent::MessageSent { id, log, timestamp } => {
488 if let Some(channel_stats) = stats.get_mut(&id) {
489 channel_stats.sent_count += 1;
490 channel_stats.update_state();
491
492 let limit = get_log_limit();
493 if channel_stats.sent_logs.len() >= limit {
494 channel_stats.sent_logs.pop_front();
495 }
496 channel_stats.sent_logs.push_back(LogEntry::new(
497 channel_stats.sent_count,
498 timestamp,
499 log,
500 ));
501 }
502 }
503 ChannelEvent::MessageReceived { id, timestamp } => {
504 if let Some(channel_stats) = stats.get_mut(&id) {
505 channel_stats.received_count += 1;
506 channel_stats.update_state();
507
508 let limit = get_log_limit();
509 if channel_stats.received_logs.len() >= limit {
510 channel_stats.received_logs.pop_front();
511 }
512 channel_stats.received_logs.push_back(LogEntry::new(
513 channel_stats.received_count,
514 timestamp,
515 None,
516 ));
517 }
518 }
519 ChannelEvent::Closed { id } => {
520 if let Some(channel_stats) = stats.get_mut(&id) {
521 channel_stats.state = ChannelState::Closed;
522 }
523 }
524 ChannelEvent::Notified { id } => {
525 if let Some(channel_stats) = stats.get_mut(&id) {
526 channel_stats.state = ChannelState::Notified;
527 }
528 }
529 }
530 }
531 })
532 .expect("Failed to spawn channel-stats-collector thread");
533
534 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
537 .ok()
538 .and_then(|p| p.parse::<u16>().ok())
539 .unwrap_or(6770);
540 let addr = format!("127.0.0.1:{}", port);
541
542 std::thread::spawn(move || {
543 start_metrics_server(&addr);
544 });
545
546 (tx, stats_map)
547 })
548}
549
550pub(crate) fn init_streams_state() -> &'static StreamStatsState {
553 STREAMS_STATE.get_or_init(|| {
554 START_TIME.get_or_init(Instant::now);
555
556 let (tx, rx) = unbounded::<StreamEvent>();
557 let stats_map = Arc::new(RwLock::new(HashMap::<u64, StreamStats>::new()));
558 let stats_map_clone = Arc::clone(&stats_map);
559
560 std::thread::Builder::new()
561 .name("stream-stats-collector".into())
562 .spawn(move || {
563 while let Ok(event) = rx.recv() {
564 let mut stats = stats_map_clone.write().unwrap();
565 match event {
566 StreamEvent::Created {
567 id,
568 source,
569 display_label,
570 type_name,
571 type_size,
572 } => {
573 let iter = stats.values().filter(|s| s.source == source).count() as u32;
575
576 stats.insert(
577 id,
578 StreamStats::new(
579 id,
580 source,
581 display_label,
582 type_name,
583 type_size,
584 iter,
585 ),
586 );
587 }
588 StreamEvent::Yielded { id, log, timestamp } => {
589 if let Some(stream_stats) = stats.get_mut(&id) {
590 stream_stats.items_yielded += 1;
591
592 let limit = get_log_limit();
593 if stream_stats.logs.len() >= limit {
594 stream_stats.logs.pop_front();
595 }
596 stream_stats.logs.push_back(LogEntry::new(
597 stream_stats.items_yielded,
598 timestamp,
599 log,
600 ));
601 }
602 }
603 StreamEvent::Completed { id } => {
604 if let Some(stream_stats) = stats.get_mut(&id) {
605 stream_stats.state = ChannelState::Closed;
606 }
607 }
608 }
609 }
610 })
611 .expect("Failed to spawn stream-stats-collector thread");
612
613 (tx, stats_map)
614 })
615}
616
617fn resolve_label(id: &'static str, provided: Option<&str>, iter: u32) -> String {
618 let base_label = if let Some(l) = provided {
619 l.to_string()
620 } else if let Some(pos) = id.rfind(':') {
621 let (path, line_part) = id.split_at(pos);
622 let line = &line_part[1..];
623 format!("{}:{}", extract_filename(path), line)
624 } else {
625 extract_filename(id)
626 };
627
628 if iter > 0 {
629 format!("{}-{}", base_label, iter + 1)
630 } else {
631 base_label
632 }
633}
634
/// Shortens a path to its last two components, joined with `/`.
///
/// Both `/` and `\` are treated as separators so that source paths produced
/// on Windows are shortened the same way as Unix-style ones. A path with no
/// separator is returned unchanged.
fn extract_filename(path: &str) -> String {
    // Walk from the end: the first piece is the file name, the second (if
    // any) is its parent directory.
    let mut components = path.rsplit(|c| c == '/' || c == '\\');
    let file = components.next().unwrap_or(path);

    match components.next() {
        Some(parent) => format!("{}/{}", parent, file),
        None => path.to_string(),
    }
}
647
/// Formats a byte count using binary (1024-based) units up to `TB`.
///
/// Values below 1024 are printed as whole bytes (`"512 B"`); larger values
/// are printed with one decimal place (`"1.5 KB"`). Anything beyond the
/// terabyte range stays expressed in `TB`.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];

    // Sub-kilobyte values (including zero) keep their exact integer form.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut unit = 0usize;
    while unit + 1 < UNITS.len() && scaled >= 1024.0 {
        scaled /= 1024.0;
        unit += 1;
    }

    format!("{:.1} {}", scaled, UNITS[unit])
}
669
/// Conversion of a value into its instrumented counterpart.
///
/// This is the trait the `channel!` macro calls when logging is not requested.
#[doc(hidden)]
pub trait Instrument {
    /// The instrumented value produced by [`Instrument::instrument`].
    type Output;

    /// Consumes `self` and returns the instrumented value.
    ///
    /// * `source` - source location string of the call site.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity supplied by the caller.
    fn instrument(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
683
/// Conversion of a value into its instrumented counterpart with logging.
///
/// This is the trait the `channel!` macro calls when `log = true` is given.
#[doc(hidden)]
pub trait InstrumentLog {
    /// The instrumented value produced by [`InstrumentLog::instrument_log`].
    type Output;

    /// Consumes `self` and returns the instrumented value.
    ///
    /// * `source` - source location string of the call site.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity supplied by the caller.
    fn instrument_log(
        self,
        source: &'static str,
        label: Option<String>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
697
/// Conversion of a stream into its instrumented counterpart.
///
/// This is the trait the `stream!` macro calls when logging is not requested.
#[doc(hidden)]
pub trait InstrumentStream {
    /// The instrumented stream produced by [`InstrumentStream::instrument_stream`].
    type Output;

    /// Consumes `self` and returns the instrumented stream.
    ///
    /// * `source` - source location string of the call site.
    /// * `label` - optional custom label.
    fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output;
}
706
/// Conversion of a stream into its instrumented counterpart with logging.
///
/// This is the trait the `stream!` macro calls when `log = true` is given.
#[doc(hidden)]
pub trait InstrumentStreamLog {
    /// The instrumented stream produced by
    /// [`InstrumentStreamLog::instrument_stream_log`].
    type Output;

    /// Consumes `self` and returns the instrumented stream.
    ///
    /// * `source` - source location string of the call site.
    /// * `label` - optional custom label.
    fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output;
}
715
716impl<S> InstrumentStream for S
718where
719 S: futures_util::Stream,
720{
721 type Output = stream_wrappers::InstrumentedStream<S>;
722
723 fn instrument_stream(self, source: &'static str, label: Option<String>) -> Self::Output {
724 stream_wrappers::InstrumentedStream::new(self, source, label)
725 }
726}
727
728impl<S> InstrumentStreamLog for S
730where
731 S: futures_util::Stream,
732 S::Item: std::fmt::Debug,
733{
734 type Output = stream_wrappers::InstrumentedStreamLog<S>;
735
736 fn instrument_stream_log(self, source: &'static str, label: Option<String>) -> Self::Output {
737 stream_wrappers::InstrumentedStreamLog::new(self, source, label)
738 }
739}
740
741cfg_if::cfg_if! {
742 if #[cfg(any(feature = "tokio", feature = "futures"))] {
743 use std::sync::LazyLock;
744 pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
745 tokio::runtime::Builder::new_multi_thread()
746 .enable_time()
747 .build()
748 .unwrap()
749 });
750 }
751}
752
/// Instruments a channel expression.
///
/// Forms (options may appear in any order after the expression):
///
/// * `channel!(expr)`
/// * `channel!(expr, label = "name")`
/// * `channel!(expr, capacity = N)`
/// * `channel!(expr, log = true)`
/// * any combination of `label`, `capacity` and `log = true`
///
/// Each arm builds a `file:line` identifier for the call site and forwards to
/// `Instrument::instrument` or, when `log = true` is present, to
/// `InstrumentLog::instrument_log`. The label is converted with
/// `to_string()`. A `capacity` must be a constant `usize` expression; this is
/// enforced at compile time by the `const _: usize = ...;` item in each arm.
#[macro_export]
macro_rules! channel {
    // ---- Without logging: forwards to `Instrument::instrument` ----
    ($expr:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    // ---- With logging: forwards to `InstrumentLog::instrument_log` ----
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    // ---- Logging with both label and capacity, in every ordering ----
    ($expr:expr, label = $label:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, label = $label:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:expr, log = true) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, label = $label:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:expr) => {{
        const CHANNEL_ID: &str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log(
            $expr,
            CHANNEL_ID,
            Some($label.to_string()),
            Some($capacity),
        )
    }};
}
901
/// Instruments a stream expression.
///
/// Forms:
///
/// * `stream!(expr)`
/// * `stream!(expr, label = "name")`
/// * `stream!(expr, log = true)`
/// * `stream!(expr, label = "name", log = true)` (either order)
///
/// Each arm builds a `file:line` identifier for the call site and forwards to
/// `InstrumentStream::instrument_stream` or, when `log = true` is present, to
/// `InstrumentStreamLog::instrument_stream_log`. The label is converted with
/// `to_string()`.
#[macro_export]
macro_rules! stream {
    // ---- Without logging ----
    ($expr:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, None)
    }};

    ($expr:expr, label = $label:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStream::instrument_stream($expr, STREAM_ID, Some($label.to_string()))
    }};

    // ---- With logging ----
    ($expr:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log($expr, STREAM_ID, None)
    }};

    ($expr:expr, label = $label:expr, log = true) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            STREAM_ID,
            Some($label.to_string()),
        )
    }};

    ($expr:expr, log = true, label = $label:expr) => {{
        const STREAM_ID: &str = concat!(file!(), ":", line!());
        $crate::InstrumentStreamLog::instrument_stream_log(
            $expr,
            STREAM_ID,
            Some($label.to_string()),
        )
    }};
}
959
960fn get_all_channel_stats() -> HashMap<u64, ChannelStats> {
961 if let Some((_, stats_map)) = CHANNELS_STATE.get() {
962 stats_map.read().unwrap().clone()
963 } else {
964 HashMap::new()
965 }
966}
967
968fn get_all_stream_stats() -> HashMap<u64, StreamStats> {
969 if let Some((_, stats_map)) = STREAMS_STATE.get() {
970 stats_map.read().unwrap().clone()
971 } else {
972 HashMap::new()
973 }
974}
975
976fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
979 let a_has_label = a.label.is_some();
980 let b_has_label = b.label.is_some();
981
982 match (a_has_label, b_has_label) {
983 (true, false) => std::cmp::Ordering::Less,
984 (false, true) => std::cmp::Ordering::Greater,
985 (true, true) => a
986 .label
987 .as_ref()
988 .unwrap()
989 .cmp(b.label.as_ref().unwrap())
990 .then_with(|| a.iter.cmp(&b.iter)),
991 (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
992 }
993}
994
995fn compare_stream_stats(a: &StreamStats, b: &StreamStats) -> std::cmp::Ordering {
998 let a_has_label = a.label.is_some();
999 let b_has_label = b.label.is_some();
1000
1001 match (a_has_label, b_has_label) {
1002 (true, false) => std::cmp::Ordering::Less,
1003 (false, true) => std::cmp::Ordering::Greater,
1004 (true, true) => a
1005 .label
1006 .as_ref()
1007 .unwrap()
1008 .cmp(b.label.as_ref().unwrap())
1009 .then_with(|| a.iter.cmp(&b.iter)),
1010 (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
1011 }
1012}
1013
1014pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
1015 let mut stats: Vec<ChannelStats> = get_all_channel_stats().into_values().collect();
1016 stats.sort_by(compare_channel_stats);
1017 stats
1018}
1019
1020pub(crate) fn get_sorted_stream_stats() -> Vec<StreamStats> {
1021 let mut stats: Vec<StreamStats> = get_all_stream_stats().into_values().collect();
1022 stats.sort_by(compare_stream_stats);
1023 stats
1024}
1025
1026pub(crate) fn get_channels_json() -> ChannelsJson {
1027 let channels = get_sorted_channel_stats()
1028 .iter()
1029 .map(SerializableChannelStats::from)
1030 .collect();
1031
1032 let current_elapsed_ns = START_TIME
1033 .get()
1034 .expect("START_TIME must be initialized")
1035 .elapsed()
1036 .as_nanos() as u64;
1037
1038 ChannelsJson {
1039 current_elapsed_ns,
1040 channels,
1041 }
1042}
1043
1044pub(crate) fn get_streams_json() -> StreamsJson {
1045 let streams = get_sorted_stream_stats()
1046 .iter()
1047 .map(SerializableStreamStats::from)
1048 .collect();
1049
1050 let current_elapsed_ns = START_TIME
1051 .get()
1052 .expect("START_TIME must be initialized")
1053 .elapsed()
1054 .as_nanos() as u64;
1055
1056 StreamsJson {
1057 current_elapsed_ns,
1058 streams,
1059 }
1060}
1061
1062pub(crate) fn get_combined_json() -> CombinedJson {
1063 let channels = get_sorted_channel_stats()
1064 .iter()
1065 .map(SerializableChannelStats::from)
1066 .collect();
1067
1068 let streams = get_sorted_stream_stats()
1069 .iter()
1070 .map(SerializableStreamStats::from)
1071 .collect();
1072
1073 let current_elapsed_ns = START_TIME
1074 .get()
1075 .expect("START_TIME must be initialized")
1076 .elapsed()
1077 .as_nanos() as u64;
1078
1079 CombinedJson {
1080 current_elapsed_ns,
1081 channels,
1082 streams,
1083 }
1084}
1085
1086#[derive(Debug, Clone, Serialize, Deserialize)]
1088pub struct ChannelLogs {
1089 pub id: String,
1090 pub sent_logs: Vec<LogEntry>,
1091 pub received_logs: Vec<LogEntry>,
1092}
1093
1094#[derive(Debug, Clone, Serialize, Deserialize)]
1096pub struct StreamLogs {
1097 pub id: String,
1098 pub logs: Vec<LogEntry>,
1099}
1100
1101pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
1102 let id = channel_id.parse::<u64>().ok()?;
1103 let stats = get_all_channel_stats();
1104 stats.get(&id).map(|channel_stats| {
1105 let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
1106 let mut received_logs: Vec<LogEntry> =
1107 channel_stats.received_logs.iter().cloned().collect();
1108
1109 sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
1111 received_logs.sort_by(|a, b| b.index.cmp(&a.index));
1112
1113 ChannelLogs {
1114 id: channel_id.to_string(),
1115 sent_logs,
1116 received_logs,
1117 }
1118 })
1119}
1120
1121pub(crate) fn get_stream_logs(stream_id: &str) -> Option<StreamLogs> {
1122 let id = stream_id.parse::<u64>().ok()?;
1123 let stats = get_all_stream_stats();
1124 stats.get(&id).map(|stream_stats| {
1125 let mut yielded_logs: Vec<LogEntry> = stream_stats.logs.iter().cloned().collect();
1126
1127 yielded_logs.sort_by(|a, b| b.index.cmp(&a.index));
1129
1130 StreamLogs {
1131 id: stream_id.to_string(),
1132 logs: yielded_logs,
1133 }
1134 })
1135}