1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod wrappers;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct LogEntry {
18 pub index: u64,
19 pub timestamp: u64,
20 pub message: Option<String>,
21}
22
23impl LogEntry {
24 pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
25 let start_time = START_TIME.get().copied().unwrap_or(timestamp);
26 let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
27 Self {
28 index,
29 timestamp: timestamp_nanos,
30 message,
31 }
32 }
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum ChannelType {
38 Bounded(usize),
39 Unbounded,
40 Oneshot,
41}
42
43impl std::fmt::Display for ChannelType {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
47 ChannelType::Unbounded => write!(f, "unbounded"),
48 ChannelType::Oneshot => write!(f, "oneshot"),
49 }
50 }
51}
52
53impl Serialize for ChannelType {
54 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: serde::Serializer,
57 {
58 serializer.serialize_str(&self.to_string())
59 }
60}
61
62impl<'de> Deserialize<'de> for ChannelType {
63 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64 where
65 D: serde::Deserializer<'de>,
66 {
67 let s = String::deserialize(deserializer)?;
68
69 match s.as_str() {
70 "unbounded" => Ok(ChannelType::Unbounded),
71 "oneshot" => Ok(ChannelType::Oneshot),
72 _ => {
73 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
75 let size = inner
76 .parse()
77 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
78 Ok(ChannelType::Bounded(size))
79 } else {
80 Err(serde::de::Error::custom("invalid channel type"))
81 }
82 }
83 }
84 }
85}
86
87#[derive(Clone, Copy, Debug, Default)]
89pub enum Format {
90 #[default]
91 Table,
92 Json,
93 JsonPretty,
94}
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
98pub enum ChannelState {
99 #[default]
100 Active,
101 Closed,
102 Full,
103 Notified,
104}
105
106impl std::fmt::Display for ChannelState {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 write!(f, "{}", self.as_str())
109 }
110}
111
112impl ChannelState {
113 pub fn as_str(&self) -> &'static str {
114 match self {
115 ChannelState::Active => "active",
116 ChannelState::Closed => "closed",
117 ChannelState::Full => "full",
118 ChannelState::Notified => "notified",
119 }
120 }
121}
122
123impl Serialize for ChannelState {
124 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
125 where
126 S: serde::Serializer,
127 {
128 serializer.serialize_str(self.as_str())
129 }
130}
131
132impl<'de> Deserialize<'de> for ChannelState {
133 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
134 where
135 D: serde::Deserializer<'de>,
136 {
137 let s = String::deserialize(deserializer)?;
138 match s.as_str() {
139 "active" => Ok(ChannelState::Active),
140 "closed" => Ok(ChannelState::Closed),
141 "full" => Ok(ChannelState::Full),
142 "notified" => Ok(ChannelState::Notified),
143 _ => Err(serde::de::Error::custom("invalid channel state")),
144 }
145 }
146}
147
148#[derive(Debug, Clone)]
150pub(crate) struct ChannelStats {
151 pub(crate) id: u64,
152 pub(crate) source: &'static str,
153 pub(crate) label: Option<String>,
154 pub(crate) channel_type: ChannelType,
155 pub(crate) state: ChannelState,
156 pub(crate) sent_count: u64,
157 pub(crate) received_count: u64,
158 pub(crate) type_name: &'static str,
159 pub(crate) type_size: usize,
160 pub(crate) sent_logs: VecDeque<LogEntry>,
161 pub(crate) received_logs: VecDeque<LogEntry>,
162 pub(crate) iter: u32,
163}
164
165impl ChannelStats {
166 pub fn queued(&self) -> u64 {
167 self.sent_count
168 .saturating_sub(self.received_count)
169 .saturating_sub(1)
170 }
171
172 pub fn queued_bytes(&self) -> u64 {
173 self.queued() * self.type_size as u64
174 }
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct MetricsJson {
180 pub current_elapsed_ns: u64,
182 pub stats: Vec<SerializableChannelStats>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct SerializableChannelStats {
189 pub id: u64,
190 pub source: String,
191 pub label: String,
192 pub has_custom_label: bool,
193 pub channel_type: ChannelType,
194 pub state: ChannelState,
195 pub sent_count: u64,
196 pub received_count: u64,
197 pub queued: u64,
198 pub type_name: String,
199 pub type_size: usize,
200 pub queued_bytes: u64,
201 pub iter: u32,
202}
203
204impl From<&ChannelStats> for SerializableChannelStats {
205 fn from(stats: &ChannelStats) -> Self {
206 let label = resolve_label(stats.source, stats.label.as_deref(), stats.iter);
207
208 Self {
209 id: stats.id,
210 source: stats.source.to_string(),
211 label,
212 has_custom_label: stats.label.is_some(),
213 channel_type: stats.channel_type,
214 state: stats.state,
215 sent_count: stats.sent_count,
216 received_count: stats.received_count,
217 queued: stats.queued(),
218 type_name: stats.type_name.to_string(),
219 type_size: stats.type_size,
220 queued_bytes: stats.queued_bytes(),
221 iter: stats.iter,
222 }
223 }
224}
225
226impl ChannelStats {
227 fn new(
228 id: u64,
229 source: &'static str,
230 label: Option<String>,
231 channel_type: ChannelType,
232 type_name: &'static str,
233 type_size: usize,
234 iter: u32,
235 ) -> Self {
236 Self {
237 id,
238 source,
239 label,
240 channel_type,
241 state: ChannelState::default(),
242 sent_count: 0,
243 received_count: 0,
244 type_name,
245 type_size,
246 sent_logs: VecDeque::new(),
247 received_logs: VecDeque::new(),
248 iter,
249 }
250 }
251
252 fn update_state(&mut self) {
253 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
254 return;
255 }
256
257 let queued = self.queued();
258 let is_full = match self.channel_type {
259 ChannelType::Bounded(cap) => queued >= cap as u64,
260 ChannelType::Oneshot => queued >= 1,
261 ChannelType::Unbounded => false,
262 };
263
264 if is_full {
265 self.state = ChannelState::Full;
266 } else {
267 self.state = ChannelState::Active;
268 }
269 }
270}
271
272#[derive(Debug)]
274pub(crate) enum StatsEvent {
275 Created {
276 id: u64,
277 source: &'static str,
278 display_label: Option<String>,
279 channel_type: ChannelType,
280 type_name: &'static str,
281 type_size: usize,
282 },
283 MessageSent {
284 id: u64,
285 log: Option<String>,
286 timestamp: Instant,
287 },
288 MessageReceived {
289 id: u64,
290 timestamp: Instant,
291 },
292 Closed {
293 id: u64,
294 },
295 #[allow(dead_code)]
296 Notified {
297 id: u64,
298 },
299}
300
301type StatsState = (
302 CbSender<StatsEvent>,
303 Arc<RwLock<HashMap<u64, ChannelStats>>>,
304);
305
306static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
308
309static START_TIME: OnceLock<Instant> = OnceLock::new();
310
311pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
313
314const DEFAULT_LOG_LIMIT: usize = 50;
315
316fn get_log_limit() -> usize {
317 std::env::var("CHANNELS_CONSOLE_LOG_LIMIT")
318 .ok()
319 .and_then(|s| s.parse().ok())
320 .unwrap_or(DEFAULT_LOG_LIMIT)
321}
322
323fn init_stats_state() -> &'static StatsState {
326 STATS_STATE.get_or_init(|| {
327 START_TIME.get_or_init(Instant::now);
328
329 let (tx, rx) = unbounded::<StatsEvent>();
330 let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
331 let stats_map_clone = Arc::clone(&stats_map);
332
333 std::thread::Builder::new()
334 .name("channel-stats-collector".into())
335 .spawn(move || {
336 while let Ok(event) = rx.recv() {
337 let mut stats = stats_map_clone.write().unwrap();
338 match event {
339 StatsEvent::Created {
340 id,
341 source,
342 display_label,
343 channel_type,
344 type_name,
345 type_size,
346 } => {
347 let iter =
349 stats.values().filter(|cs| cs.source == source).count() as u32;
350
351 stats.insert(
352 id,
353 ChannelStats::new(
354 id,
355 source,
356 display_label,
357 channel_type,
358 type_name,
359 type_size,
360 iter,
361 ),
362 );
363 }
364 StatsEvent::MessageSent { id, log, timestamp } => {
365 if let Some(channel_stats) = stats.get_mut(&id) {
366 channel_stats.sent_count += 1;
367 channel_stats.update_state();
368
369 let limit = get_log_limit();
370 if channel_stats.sent_logs.len() >= limit {
371 channel_stats.sent_logs.pop_front();
372 }
373 channel_stats.sent_logs.push_back(LogEntry::new(
374 channel_stats.sent_count,
375 timestamp,
376 log,
377 ));
378 }
379 }
380 StatsEvent::MessageReceived { id, timestamp } => {
381 if let Some(channel_stats) = stats.get_mut(&id) {
382 channel_stats.received_count += 1;
383 channel_stats.update_state();
384
385 let limit = get_log_limit();
386 if channel_stats.received_logs.len() >= limit {
387 channel_stats.received_logs.pop_front();
388 }
389 channel_stats.received_logs.push_back(LogEntry::new(
390 channel_stats.received_count,
391 timestamp,
392 None,
393 ));
394 }
395 }
396 StatsEvent::Closed { id } => {
397 if let Some(channel_stats) = stats.get_mut(&id) {
398 channel_stats.state = ChannelState::Closed;
399 }
400 }
401 StatsEvent::Notified { id } => {
402 if let Some(channel_stats) = stats.get_mut(&id) {
403 channel_stats.state = ChannelState::Notified;
404 }
405 }
406 }
407 }
408 })
409 .expect("Failed to spawn channel-stats-collector thread");
410
411 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
414 .ok()
415 .and_then(|p| p.parse::<u16>().ok())
416 .unwrap_or(6770);
417 let addr = format!("127.0.0.1:{}", port);
418
419 std::thread::spawn(move || {
420 start_metrics_server(&addr);
421 });
422
423 (tx, stats_map)
424 })
425}
426
427fn resolve_label(id: &'static str, provided: Option<&str>, iter: u32) -> String {
428 let base_label = if let Some(l) = provided {
429 l.to_string()
430 } else if let Some(pos) = id.rfind(':') {
431 let (path, line_part) = id.split_at(pos);
432 let line = &line_part[1..];
433 format!("{}:{}", extract_filename(path), line)
434 } else {
435 extract_filename(id)
436 };
437
438 if iter > 0 {
439 format!("{}-{}", base_label, iter + 1)
440 } else {
441 base_label
442 }
443}
444
445fn extract_filename(path: &str) -> String {
446 let components: Vec<&str> = path.split('/').collect();
447 if components.len() >= 2 {
448 format!(
449 "{}/{}",
450 components[components.len() - 2],
451 components[components.len() - 1]
452 )
453 } else {
454 path.to_string()
455 }
456}
457
458pub fn format_bytes(bytes: u64) -> String {
460 if bytes == 0 {
461 return "0 B".to_string();
462 }
463
464 const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
465 let mut size = bytes as f64;
466 let mut unit_idx = 0;
467
468 while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
469 size /= 1024.0;
470 unit_idx += 1;
471 }
472
473 if unit_idx == 0 {
474 format!("{} {}", bytes, UNITS[unit_idx])
475 } else {
476 format!("{:.1} {}", size, UNITS[unit_idx])
477 }
478}
479
480#[doc(hidden)]
484pub trait Instrument {
485 type Output;
486 fn instrument(
487 self,
488 source: &'static str,
489 label: Option<String>,
490 capacity: Option<usize>,
491 ) -> Self::Output;
492}
493
494#[doc(hidden)]
498pub trait InstrumentLog {
499 type Output;
500 fn instrument_log(
501 self,
502 source: &'static str,
503 label: Option<String>,
504 capacity: Option<usize>,
505 ) -> Self::Output;
506}
507
508cfg_if::cfg_if! {
509 if #[cfg(any(feature = "tokio", feature = "futures"))] {
510 use std::sync::LazyLock;
511 pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
512 tokio::runtime::Builder::new_multi_thread()
513 .enable_time()
514 .build()
515 .unwrap()
516 });
517 }
518}
519
520#[macro_export]
590macro_rules! instrument {
591 ($expr:expr) => {{
592 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
593 $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
594 }};
595
596 ($expr:expr, label = $label:expr) => {{
597 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
598 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), None)
599 }};
600
601 ($expr:expr, capacity = $capacity:expr) => {{
602 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
603 const _: usize = $capacity;
604 $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
605 }};
606
607 ($expr:expr, label = $label:expr, capacity = $capacity:expr) => {{
608 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
609 const _: usize = $capacity;
610 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
611 }};
612
613 ($expr:expr, capacity = $capacity:expr, label = $label:expr) => {{
614 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
615 const _: usize = $capacity;
616 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label.to_string()), Some($capacity))
617 }};
618
619 ($expr:expr, log = true) => {{
621 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
622 $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
623 }};
624
625 ($expr:expr, label = $label:expr, log = true) => {{
626 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
627 $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
628 }};
629
630 ($expr:expr, log = true, label = $label:expr) => {{
631 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
632 $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label.to_string()), None)
633 }};
634
635 ($expr:expr, capacity = $capacity:expr, log = true) => {{
636 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
637 const _: usize = $capacity;
638 $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
639 }};
640
641 ($expr:expr, log = true, capacity = $capacity:expr) => {{
642 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
643 const _: usize = $capacity;
644 $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
645 }};
646
647 ($expr:expr, label = $label:expr, capacity = $capacity:expr, log = true) => {{
648 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
649 const _: usize = $capacity;
650 $crate::InstrumentLog::instrument_log(
651 $expr,
652 CHANNEL_ID,
653 Some($label.to_string()),
654 Some($capacity),
655 )
656 }};
657
658 ($expr:expr, label = $label:expr, log = true, capacity = $capacity:expr) => {{
659 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
660 const _: usize = $capacity;
661 $crate::InstrumentLog::instrument_log(
662 $expr,
663 CHANNEL_ID,
664 Some($label.to_string()),
665 Some($capacity),
666 )
667 }};
668
669 ($expr:expr, capacity = $capacity:expr, label = $label:expr, log = true) => {{
670 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
671 const _: usize = $capacity;
672 $crate::InstrumentLog::instrument_log(
673 $expr,
674 CHANNEL_ID,
675 Some($label.to_string()),
676 Some($capacity),
677 )
678 }};
679
680 ($expr:expr, capacity = $capacity:expr, log = true, label = $label:expr) => {{
681 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
682 const _: usize = $capacity;
683 $crate::InstrumentLog::instrument_log(
684 $expr,
685 CHANNEL_ID,
686 Some($label.to_string()),
687 Some($capacity),
688 )
689 }};
690
691 ($expr:expr, log = true, label = $label:expr, capacity = $capacity:expr) => {{
692 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
693 const _: usize = $capacity;
694 $crate::InstrumentLog::instrument_log(
695 $expr,
696 CHANNEL_ID,
697 Some($label.to_string()),
698 Some($capacity),
699 )
700 }};
701
702 ($expr:expr, log = true, capacity = $capacity:expr, label = $label:expr) => {{
703 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
704 const _: usize = $capacity;
705 $crate::InstrumentLog::instrument_log(
706 $expr,
707 CHANNEL_ID,
708 Some($label.to_string()),
709 Some($capacity),
710 )
711 }};
712}
713
714fn get_channel_stats() -> HashMap<u64, ChannelStats> {
715 if let Some((_, stats_map)) = STATS_STATE.get() {
716 stats_map.read().unwrap().clone()
717 } else {
718 HashMap::new()
719 }
720}
721
722fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
725 match (a.label.is_some(), b.label.is_some()) {
726 (true, false) => std::cmp::Ordering::Less,
727 (false, true) => std::cmp::Ordering::Greater,
728 (true, true) => a
729 .label
730 .as_ref()
731 .unwrap()
732 .cmp(b.label.as_ref().unwrap())
733 .then_with(|| a.iter.cmp(&b.iter)),
734 (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
735 }
736}
737
738pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
739 let mut stats: Vec<ChannelStats> = get_channel_stats().into_values().collect();
740 stats.sort_by(compare_channel_stats);
741 stats
742}
743
744fn get_metrics_json() -> MetricsJson {
745 let stats = get_sorted_channel_stats()
746 .iter()
747 .map(SerializableChannelStats::from)
748 .collect();
749
750 let current_elapsed_ns = START_TIME
751 .get()
752 .expect("START_TIME must be initialized")
753 .elapsed()
754 .as_nanos() as u64;
755
756 MetricsJson {
757 current_elapsed_ns,
758 stats,
759 }
760}
761
762#[derive(Debug, Clone, Serialize, Deserialize)]
764pub struct ChannelLogs {
765 pub id: String,
766 pub sent_logs: Vec<LogEntry>,
767 pub received_logs: Vec<LogEntry>,
768}
769
770pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
771 let id = channel_id.parse::<u64>().ok()?;
772 let stats = get_channel_stats();
773 stats.get(&id).map(|channel_stats| {
774 let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
775
776 let mut received_logs: Vec<LogEntry> =
777 channel_stats.received_logs.iter().cloned().collect();
778
779 sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
781 received_logs.sort_by(|a, b| b.index.cmp(&a.index));
782
783 ChannelLogs {
784 id: channel_id.to_string(),
785 sent_logs,
786 received_logs,
787 }
788 })
789}