1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::atomic::AtomicU64;
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::Instant;
7
8pub mod channels_guard;
9pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
10
11use crate::http_api::start_metrics_server;
12mod http_api;
13mod wrappers;
14
/// A single recorded send or receive event for one channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Sequence number passed to `LogEntry::new` by the caller.
    pub index: u64,
    /// Nanoseconds between the `START_TIME` baseline and the event, as computed by
    /// `LogEntry::new`.
    pub timestamp: u64,
    /// Optional text attached to the event.
    pub message: Option<String>,
}
22
23impl LogEntry {
24 pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
25 let start_time = START_TIME.get().copied().unwrap_or(timestamp);
26 let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
27 Self {
28 index,
29 timestamp: timestamp_nanos,
30 message,
31 }
32 }
33}
34
/// Kind of channel being tracked.
///
/// Rendered by `Display` (and therefore serialized) as `bounded[N]`, `unbounded` or
/// `oneshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelType {
    /// Channel with a fixed capacity (the wrapped value).
    Bounded(usize),
    /// Channel without a capacity limit.
    Unbounded,
    /// Oneshot channel.
    Oneshot,
}
42
43impl std::fmt::Display for ChannelType {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
47 ChannelType::Unbounded => write!(f, "unbounded"),
48 ChannelType::Oneshot => write!(f, "oneshot"),
49 }
50 }
51}
52
53impl Serialize for ChannelType {
54 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
55 where
56 S: serde::Serializer,
57 {
58 serializer.serialize_str(&self.to_string())
59 }
60}
61
62impl<'de> Deserialize<'de> for ChannelType {
63 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
64 where
65 D: serde::Deserializer<'de>,
66 {
67 let s = String::deserialize(deserializer)?;
68
69 match s.as_str() {
70 "unbounded" => Ok(ChannelType::Unbounded),
71 "oneshot" => Ok(ChannelType::Oneshot),
72 _ => {
73 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
75 let size = inner
76 .parse()
77 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
78 Ok(ChannelType::Bounded(size))
79 } else {
80 Err(serde::de::Error::custom("invalid channel type"))
81 }
82 }
83 }
84 }
85}
86
/// Output format selector; `Table` is the default.
///
/// NOTE(review): the code that consumes this enum is not visible in this file.
#[derive(Clone, Copy, Debug, Default)]
pub enum Format {
    /// Tabular output (default variant).
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
95
/// Lifecycle state of a tracked channel; `Active` is the default.
///
/// `ChannelStats::update_state` switches between `Active` and `Full` and never leaves
/// `Closed` or `Notified` once one of them has been set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChannelState {
    /// Channel is open and not at capacity.
    #[default]
    Active,
    /// Set when a `StatsEvent::Closed` is processed.
    Closed,
    /// Queued message count has reached the channel's capacity.
    Full,
    /// Set when a `StatsEvent::Notified` is processed.
    Notified,
}
105
106impl std::fmt::Display for ChannelState {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 write!(f, "{}", self.as_str())
109 }
110}
111
112impl ChannelState {
113 pub fn as_str(&self) -> &'static str {
114 match self {
115 ChannelState::Active => "active",
116 ChannelState::Closed => "closed",
117 ChannelState::Full => "full",
118 ChannelState::Notified => "notified",
119 }
120 }
121}
122
123impl Serialize for ChannelState {
124 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
125 where
126 S: serde::Serializer,
127 {
128 serializer.serialize_str(self.as_str())
129 }
130}
131
132impl<'de> Deserialize<'de> for ChannelState {
133 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
134 where
135 D: serde::Deserializer<'de>,
136 {
137 let s = String::deserialize(deserializer)?;
138 match s.as_str() {
139 "active" => Ok(ChannelState::Active),
140 "closed" => Ok(ChannelState::Closed),
141 "full" => Ok(ChannelState::Full),
142 "notified" => Ok(ChannelState::Notified),
143 _ => Err(serde::de::Error::custom("invalid channel state")),
144 }
145 }
146}
147
/// Per-channel statistics maintained by the collector thread.
#[derive(Debug, Clone)]
pub(crate) struct ChannelStats {
    /// Channel id; also the key under which this entry is stored in the stats map.
    pub(crate) id: u64,
    /// Creation-site identifier; the `instrument!` macro passes `file:line` here.
    pub(crate) source: &'static str,
    /// Custom label supplied at instrumentation time, if any.
    pub(crate) label: Option<&'static str>,
    /// Kind of channel (bounded / unbounded / oneshot).
    pub(crate) channel_type: ChannelType,
    /// Current lifecycle state.
    pub(crate) state: ChannelState,
    /// Number of `MessageSent` events processed for this channel.
    pub(crate) sent_count: u64,
    /// Number of `MessageReceived` events processed for this channel.
    pub(crate) received_count: u64,
    /// Name of the message type, as supplied in `StatsEvent::Created`.
    pub(crate) type_name: &'static str,
    /// Per-message size used by `queued_bytes` to estimate queued memory.
    pub(crate) type_size: usize,
    /// Most recent sent-message entries; the oldest is evicted once `get_log_limit()` is reached.
    pub(crate) sent_logs: VecDeque<LogEntry>,
    /// Most recent received-message entries; same eviction rule as `sent_logs`.
    pub(crate) received_logs: VecDeque<LogEntry>,
    /// Number of channels with the same `source` that were already registered when this one
    /// was created; `resolve_label` uses it to append a `-N` suffix.
    pub(crate) iter: u32,
}
164
165impl ChannelStats {
166 pub fn queued(&self) -> u64 {
167 self.sent_count
168 .saturating_sub(self.received_count)
169 .saturating_sub(1)
170 }
171
172 pub fn queued_bytes(&self) -> u64 {
173 self.queued() * self.type_size as u64
174 }
175}
176
/// Owned, serializable snapshot of a `ChannelStats` entry (built via `From<&ChannelStats>`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableChannelStats {
    /// Channel id.
    pub id: u64,
    /// Creation-site identifier copied from `ChannelStats::source`.
    pub source: String,
    /// Display label produced by `resolve_label`.
    pub label: String,
    /// `true` when the channel was given an explicit label.
    pub has_custom_label: bool,
    /// Kind of channel.
    pub channel_type: ChannelType,
    /// Lifecycle state at snapshot time.
    pub state: ChannelState,
    /// Messages sent so far.
    pub sent_count: u64,
    /// Messages received so far.
    pub received_count: u64,
    /// Value of `ChannelStats::queued()` at snapshot time.
    pub queued: u64,
    /// Name of the message type.
    pub type_name: String,
    /// Per-message size copied from `ChannelStats::type_size`.
    pub type_size: usize,
    /// Value of `ChannelStats::queued_bytes()` at snapshot time.
    pub queued_bytes: u64,
    /// Same-source creation index copied from `ChannelStats::iter`.
    pub iter: u32,
}
194
195impl From<&ChannelStats> for SerializableChannelStats {
196 fn from(stats: &ChannelStats) -> Self {
197 let label = resolve_label(stats.source, stats.label, stats.iter);
198 Self {
199 id: stats.id,
200 source: stats.source.to_string(),
201 label,
202 has_custom_label: stats.label.is_some(),
203 channel_type: stats.channel_type,
204 state: stats.state,
205 sent_count: stats.sent_count,
206 received_count: stats.received_count,
207 queued: stats.queued(),
208 type_name: stats.type_name.to_string(),
209 type_size: stats.type_size,
210 queued_bytes: stats.queued_bytes(),
211 iter: stats.iter,
212 }
213 }
214}
215
216impl ChannelStats {
217 fn new(
218 id: u64,
219 source: &'static str,
220 label: Option<&'static str>,
221 channel_type: ChannelType,
222 type_name: &'static str,
223 type_size: usize,
224 iter: u32,
225 ) -> Self {
226 Self {
227 id,
228 source,
229 label,
230 channel_type,
231 state: ChannelState::default(),
232 sent_count: 0,
233 received_count: 0,
234 type_name,
235 type_size,
236 sent_logs: VecDeque::new(),
237 received_logs: VecDeque::new(),
238 iter,
239 }
240 }
241
242 fn update_state(&mut self) {
243 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
244 return;
245 }
246
247 let queued = self.queued();
248 let is_full = match self.channel_type {
249 ChannelType::Bounded(cap) => queued >= cap as u64,
250 ChannelType::Oneshot => queued >= 1,
251 ChannelType::Unbounded => false,
252 };
253
254 if is_full {
255 self.state = ChannelState::Full;
256 } else {
257 self.state = ChannelState::Active;
258 }
259 }
260}
261
/// Events sent over the stats channel and applied to the stats map by the collector thread.
#[derive(Debug)]
pub(crate) enum StatsEvent {
    /// Registers a new channel under `id`.
    Created {
        id: u64,
        source: &'static str,
        display_label: Option<&'static str>,
        channel_type: ChannelType,
        type_name: &'static str,
        type_size: usize,
    },
    /// A message was sent on channel `id`; `log` is the optional text stored with the entry.
    MessageSent {
        id: u64,
        log: Option<String>,
        timestamp: Instant,
    },
    /// A message was received on channel `id`.
    MessageReceived {
        id: u64,
        timestamp: Instant,
    },
    /// Marks channel `id` as `ChannelState::Closed`.
    Closed {
        id: u64,
    },
    /// Marks channel `id` as `ChannelState::Notified`.
    #[allow(dead_code)]
    Notified {
        id: u64,
    },
}
290
/// Global stats state: the sender used to submit `StatsEvent`s, paired with the shared map
/// of per-channel statistics keyed by channel id.
type StatsState = (
    CbSender<StatsEvent>,
    Arc<RwLock<HashMap<u64, ChannelStats>>>,
);
295
/// Lazily initialised global state; populated by `init_stats_state`.
static STATS_STATE: OnceLock<StatsState> = OnceLock::new();

/// Baseline instant for `LogEntry` timestamps; set when the stats state is first initialised.
static START_TIME: OnceLock<Instant> = OnceLock::new();

/// Global counter starting at zero.
/// NOTE(review): presumably used to allocate channel ids; its users are not visible in this file.
pub(crate) static CHANNEL_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Log limit used when `CHANNELS_CONSOLE_LOG_LIMIT` is unset or cannot be parsed.
const DEFAULT_LOG_LIMIT: usize = 50;
305
306fn get_log_limit() -> usize {
307 std::env::var("CHANNELS_CONSOLE_LOG_LIMIT")
308 .ok()
309 .and_then(|s| s.parse().ok())
310 .unwrap_or(DEFAULT_LOG_LIMIT)
311}
312
313fn init_stats_state() -> &'static StatsState {
316 STATS_STATE.get_or_init(|| {
317 START_TIME.get_or_init(Instant::now);
318
319 let (tx, rx) = unbounded::<StatsEvent>();
320 let stats_map = Arc::new(RwLock::new(HashMap::<u64, ChannelStats>::new()));
321 let stats_map_clone = Arc::clone(&stats_map);
322
323 std::thread::Builder::new()
324 .name("channel-stats-collector".into())
325 .spawn(move || {
326 while let Ok(event) = rx.recv() {
327 let mut stats = stats_map_clone.write().unwrap();
328 match event {
329 StatsEvent::Created {
330 id,
331 source,
332 display_label,
333 channel_type,
334 type_name,
335 type_size,
336 } => {
337 let iter =
339 stats.values().filter(|cs| cs.source == source).count() as u32;
340
341 stats.insert(
342 id,
343 ChannelStats::new(
344 id,
345 source,
346 display_label,
347 channel_type,
348 type_name,
349 type_size,
350 iter,
351 ),
352 );
353 }
354 StatsEvent::MessageSent { id, log, timestamp } => {
355 if let Some(channel_stats) = stats.get_mut(&id) {
356 channel_stats.sent_count += 1;
357 channel_stats.update_state();
358
359 let limit = get_log_limit();
360 if channel_stats.sent_logs.len() >= limit {
361 channel_stats.sent_logs.pop_front();
362 }
363 channel_stats.sent_logs.push_back(LogEntry::new(
364 channel_stats.sent_count,
365 timestamp,
366 log,
367 ));
368 }
369 }
370 StatsEvent::MessageReceived { id, timestamp } => {
371 if let Some(channel_stats) = stats.get_mut(&id) {
372 channel_stats.received_count += 1;
373 channel_stats.update_state();
374
375 let limit = get_log_limit();
376 if channel_stats.received_logs.len() >= limit {
377 channel_stats.received_logs.pop_front();
378 }
379 channel_stats.received_logs.push_back(LogEntry::new(
380 channel_stats.received_count,
381 timestamp,
382 None,
383 ));
384 }
385 }
386 StatsEvent::Closed { id } => {
387 if let Some(channel_stats) = stats.get_mut(&id) {
388 channel_stats.state = ChannelState::Closed;
389 }
390 }
391 StatsEvent::Notified { id } => {
392 if let Some(channel_stats) = stats.get_mut(&id) {
393 channel_stats.state = ChannelState::Notified;
394 }
395 }
396 }
397 }
398 })
399 .expect("Failed to spawn channel-stats-collector thread");
400
401 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
404 .ok()
405 .and_then(|p| p.parse::<u16>().ok())
406 .unwrap_or(6770);
407 let addr = format!("127.0.0.1:{}", port);
408
409 std::thread::spawn(move || {
410 start_metrics_server(&addr);
411 });
412
413 (tx, stats_map)
414 })
415}
416
417fn resolve_label(id: &'static str, provided: Option<&'static str>, iter: u32) -> String {
418 let base_label = if let Some(l) = provided {
419 l.to_string()
420 } else if let Some(pos) = id.rfind(':') {
421 let (path, line_part) = id.split_at(pos);
422 let line = &line_part[1..];
423 format!("{}:{}", extract_filename(path), line)
424 } else {
425 extract_filename(id)
426 };
427
428 if iter > 0 {
429 format!("{}-{}", base_label, iter + 1)
430 } else {
431 base_label
432 }
433}
434
/// Shortens a `/`-separated path to its last two components (`parent/file`).
///
/// A path without any `/` is returned unchanged.
fn extract_filename(path: &str) -> String {
    // Walk the components from the end: the first is the file, the second its parent.
    let mut from_end = path.rsplit('/');
    match (from_end.next(), from_end.next()) {
        (Some(file), Some(parent)) => format!("{parent}/{file}"),
        _ => path.to_string(),
    }
}
447
/// Formats a byte count using binary (1024-based) units from `B` up to `TB`.
///
/// Values below 1024 are printed as whole bytes (`"512 B"`); larger values are scaled and
/// printed with one decimal place (`"1.5 KB"`). Anything beyond the terabyte range stays
/// in `TB`.
pub fn format_bytes(bytes: u64) -> String {
    const LARGER_UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];
    const STEP: f64 = 1024.0;

    // Below one kilobyte no scaling happens, so print the exact integer.
    if bytes < 1024 {
        return format!("{bytes} B");
    }

    let mut scaled = bytes as f64;
    let mut unit = "B";
    for next_unit in LARGER_UNITS {
        if scaled < STEP {
            break;
        }
        scaled /= STEP;
        unit = next_unit;
    }

    format!("{scaled:.1} {unit}")
}
469
/// Hook called by the `instrument!` macro when `log = true` is not given.
///
/// NOTE(review): implementors are not visible in this file (presumably in `wrappers`).
#[doc(hidden)]
pub trait Instrument {
    /// Type returned in place of `self` after instrumentation.
    type Output;
    /// Instruments `self`.
    ///
    /// * `source` - creation-site identifier; the macro passes `file:line`.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity given via `capacity = ...` in the macro.
    fn instrument(
        self,
        source: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
483
/// Hook called by the `instrument!` macro when `log = true` is given.
///
/// NOTE(review): implementors are not visible in this file (presumably in `wrappers`).
#[doc(hidden)]
pub trait InstrumentLog {
    /// Type returned in place of `self` after instrumentation.
    type Output;
    /// Instruments `self` with logging requested.
    ///
    /// * `source` - creation-site identifier; the macro passes `file:line`.
    /// * `label` - optional custom label.
    /// * `capacity` - optional capacity given via `capacity = ...` in the macro.
    fn instrument_log(
        self,
        source: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
497
cfg_if::cfg_if! {
    if #[cfg(any(feature = "tokio", feature = "futures"))] {
        use std::sync::LazyLock;
        /// Shared Tokio runtime, only compiled when the `tokio` or `futures` feature is on.
        ///
        /// Built lazily on first access as a multi-threaded runtime with the time driver
        /// enabled; initialisation panics if the runtime cannot be built.
        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
            tokio::runtime::Builder::new_multi_thread()
                .enable_time()
                .build()
                .unwrap()
        });
    }
}
509
/// Instruments a channel expression, tagging it with the call site (`file:line`).
///
/// The first argument is the expression to instrument. It may be followed by any
/// combination of the options below, in any order:
///
/// * `label = "<literal>"` - custom label, forwarded as `Some(label)`.
/// * `capacity = <expr>` - forwarded as `Some(capacity)`; the expression must be a constant
///   `usize`, because it is also bound to a `const _: usize`.
/// * `log = true` - dispatches to `InstrumentLog::instrument_log` instead of
///   `Instrument::instrument`.
///
/// Options that are not given are forwarded as `None`.
#[macro_export]
macro_rules! instrument {
    // Without `log = true`: dispatch to `Instrument::instrument`.
    ($expr:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        // Compile-time check that the capacity is a constant `usize`.
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    // With `log = true`: dispatch to `InstrumentLog::instrument_log`.
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    // All three options: every ordering is spelled out as its own arm.
    ($expr:expr, label = $label:literal, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, label = $label:literal, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};
}
673
674fn get_channel_stats() -> HashMap<u64, ChannelStats> {
675 if let Some((_, stats_map)) = STATS_STATE.get() {
676 stats_map.read().unwrap().clone()
677 } else {
678 HashMap::new()
679 }
680}
681
682fn compare_channel_stats(a: &ChannelStats, b: &ChannelStats) -> std::cmp::Ordering {
685 match (a.label.is_some(), b.label.is_some()) {
686 (true, false) => std::cmp::Ordering::Less,
687 (false, true) => std::cmp::Ordering::Greater,
688 (true, true) => a
689 .label
690 .unwrap()
691 .cmp(b.label.unwrap())
692 .then_with(|| a.iter.cmp(&b.iter)),
693 (false, false) => a.source.cmp(b.source).then_with(|| a.iter.cmp(&b.iter)),
694 }
695}
696
697pub(crate) fn get_sorted_channel_stats() -> Vec<ChannelStats> {
698 let mut stats: Vec<ChannelStats> = get_channel_stats().into_values().collect();
699 stats.sort_by(compare_channel_stats);
700 stats
701}
702
703fn get_serializable_stats() -> Vec<SerializableChannelStats> {
704 get_sorted_channel_stats()
705 .iter()
706 .map(SerializableChannelStats::from)
707 .collect()
708}
709
/// Sent and received log entries for one channel, as returned by `get_channel_logs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelLogs {
    /// The channel id string exactly as it was passed to `get_channel_logs`.
    pub id: String,
    /// Sent-message entries, highest `index` first.
    pub sent_logs: Vec<LogEntry>,
    /// Received-message entries, highest `index` first.
    pub received_logs: Vec<LogEntry>,
}
717
718pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
719 let id = channel_id.parse::<u64>().ok()?;
720 let stats = get_channel_stats();
721 stats.get(&id).map(|channel_stats| {
722 let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
723
724 let mut received_logs: Vec<LogEntry> =
725 channel_stats.received_logs.iter().cloned().collect();
726
727 sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
729 received_logs.sort_by(|a, b| b.index.cmp(&a.index));
730
731 ChannelLogs {
732 id: channel_id.to_string(),
733 sent_logs,
734 received_logs,
735 }
736 })
737}