1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::{Arc, OnceLock, RwLock};
5use std::time::Instant;
6
7pub mod channels_guard;
8pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
9
10use crate::http_api::start_metrics_server;
11mod http_api;
12mod wrappers;
13
/// One recorded send or receive event for an instrumented channel.
///
/// Entries are built with `LogEntry::new` and exposed through `ChannelLogs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Sequence number of the event. The collector thread passes the channel's
    /// running sent/received counter (after incrementing it) as this value.
    pub index: u64,
    /// Nanoseconds elapsed between the process-wide `START_TIME` and the event.
    pub timestamp: u64,
    /// Optional text attached to the event. Receive events are always recorded
    /// with `None`.
    pub message: Option<String>,
}
21
22impl LogEntry {
23 pub(crate) fn new(index: u64, timestamp: Instant, message: Option<String>) -> Self {
24 let start_time = START_TIME.get().copied().unwrap_or(timestamp);
25 let timestamp_nanos = timestamp.duration_since(start_time).as_nanos() as u64;
26 Self {
27 index,
28 timestamp: timestamp_nanos,
29 message,
30 }
31 }
32}
33
/// Kind of channel being tracked.
///
/// The textual form (`bounded[N]`, `unbounded`, `oneshot`) is produced by the
/// `Display` impl and is also what the serde impls read and write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelType {
    /// Channel with a fixed capacity; the payload is that capacity, which
    /// `ChannelStats::update_state` compares against the queued count.
    Bounded(usize),
    /// Channel without a capacity limit; never reported as full.
    Unbounded,
    /// Oneshot channel; treated as full once one message is queued.
    Oneshot,
}
41
42impl std::fmt::Display for ChannelType {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
46 ChannelType::Unbounded => write!(f, "unbounded"),
47 ChannelType::Oneshot => write!(f, "oneshot"),
48 }
49 }
50}
51
52impl Serialize for ChannelType {
53 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
54 where
55 S: serde::Serializer,
56 {
57 serializer.serialize_str(&self.to_string())
58 }
59}
60
61impl<'de> Deserialize<'de> for ChannelType {
62 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
63 where
64 D: serde::Deserializer<'de>,
65 {
66 let s = String::deserialize(deserializer)?;
67
68 match s.as_str() {
69 "unbounded" => Ok(ChannelType::Unbounded),
70 "oneshot" => Ok(ChannelType::Oneshot),
71 _ => {
72 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
74 let size = inner
75 .parse()
76 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
77 Ok(ChannelType::Bounded(size))
78 } else {
79 Err(serde::de::Error::custom("invalid channel type"))
80 }
81 }
82 }
83 }
84}
85
/// Output format selector; defaults to [`Format::Table`].
// NOTE(review): the code that consumes this enum is outside this file view —
// presumably it picks how collected stats are rendered; confirm at the call sites.
#[derive(Clone, Copy, Debug, Default)]
pub enum Format {
    /// Tabular output (the default variant).
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
94
/// Lifecycle state of a tracked channel; defaults to [`ChannelState::Active`].
///
/// `Active` and `Full` are recomputed by `ChannelStats::update_state` after each
/// send/receive event. `Closed` and `Notified` are set by dedicated events and,
/// once set, are left untouched by `update_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChannelState {
    /// The channel is open and below its capacity threshold.
    #[default]
    Active,
    /// A `StatsEvent::Closed` event was recorded for the channel.
    Closed,
    /// The queued count reached the capacity of a bounded channel, or one
    /// message is queued on a oneshot channel.
    Full,
    /// A `StatsEvent::Notified` event was recorded for the channel.
    Notified,
}
104
105impl std::fmt::Display for ChannelState {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "{}", self.as_str())
108 }
109}
110
111impl ChannelState {
112 pub fn as_str(&self) -> &'static str {
113 match self {
114 ChannelState::Active => "active",
115 ChannelState::Closed => "closed",
116 ChannelState::Full => "full",
117 ChannelState::Notified => "notified",
118 }
119 }
120}
121
122impl Serialize for ChannelState {
123 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
124 where
125 S: serde::Serializer,
126 {
127 serializer.serialize_str(self.as_str())
128 }
129}
130
131impl<'de> Deserialize<'de> for ChannelState {
132 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
133 where
134 D: serde::Deserializer<'de>,
135 {
136 let s = String::deserialize(deserializer)?;
137 match s.as_str() {
138 "active" => Ok(ChannelState::Active),
139 "closed" => Ok(ChannelState::Closed),
140 "full" => Ok(ChannelState::Full),
141 "notified" => Ok(ChannelState::Notified),
142 _ => Err(serde::de::Error::custom("invalid channel state")),
143 }
144 }
145}
146
/// Mutable bookkeeping kept by the collector thread for one instrumented channel.
#[derive(Debug, Clone)]
pub(crate) struct ChannelStats {
    /// Key of the channel in the stats map. The `instrument!` macro builds it as
    /// `file:line` of the call site.
    pub(crate) id: &'static str,
    /// Optional user-supplied label; when absent, `resolve_label` derives one
    /// from `id`.
    pub(crate) label: Option<&'static str>,
    /// Kind of channel (bounded / unbounded / oneshot).
    pub(crate) channel_type: ChannelType,
    /// Current lifecycle state.
    pub(crate) state: ChannelState,
    /// Number of `MessageSent` events recorded so far.
    pub(crate) sent_count: u64,
    /// Number of `MessageReceived` events recorded so far.
    pub(crate) received_count: u64,
    /// Name of the message type, as reported in the `Created` event.
    pub(crate) type_name: &'static str,
    /// Per-message size used for the byte totals, as reported in the `Created` event.
    pub(crate) type_size: usize,
    /// Most recent send events, oldest first; trimmed by the collector to the
    /// configured log limit.
    pub(crate) sent_logs: VecDeque<LogEntry>,
    /// Most recent receive events, oldest first; trimmed by the collector to the
    /// configured log limit.
    pub(crate) received_logs: VecDeque<LogEntry>,
}
161
162impl ChannelStats {
163 pub fn queued(&self) -> u64 {
164 self.sent_count
165 .saturating_sub(self.received_count)
166 .saturating_sub(1)
167 }
168
169 pub fn total_bytes(&self) -> u64 {
170 self.sent_count * self.type_size as u64
171 }
172
173 pub fn queued_bytes(&self) -> u64 {
174 self.queued() * self.type_size as u64
175 }
176}
177
/// Owned, serde-friendly snapshot of one channel's statistics.
///
/// Built from a `ChannelStats` via the `From<&ChannelStats>` impl; the derived
/// values (`queued`, `total_bytes`, `queued_bytes`) are computed at conversion time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableChannelStats {
    /// Channel identifier (copied from `ChannelStats::id`).
    pub id: String,
    /// Display label: the user-supplied label if any, otherwise one derived
    /// from `id` by `resolve_label`.
    pub label: String,
    /// Kind of channel.
    pub channel_type: ChannelType,
    /// Lifecycle state at snapshot time.
    pub state: ChannelState,
    /// Number of messages sent.
    pub sent_count: u64,
    /// Number of messages received.
    pub received_count: u64,
    /// Value of `ChannelStats::queued()` at snapshot time.
    pub queued: u64,
    /// Name of the message type.
    pub type_name: String,
    /// Per-message size used for the byte totals.
    pub type_size: usize,
    /// Value of `ChannelStats::total_bytes()` at snapshot time.
    pub total_bytes: u64,
    /// Value of `ChannelStats::queued_bytes()` at snapshot time.
    pub queued_bytes: u64,
}
193
194impl From<&ChannelStats> for SerializableChannelStats {
195 fn from(stats: &ChannelStats) -> Self {
196 let label = resolve_label(stats.id, stats.label);
197 Self {
198 id: stats.id.to_string(),
199 label,
200 channel_type: stats.channel_type,
201 state: stats.state,
202 sent_count: stats.sent_count,
203 received_count: stats.received_count,
204 queued: stats.queued(),
205 type_name: stats.type_name.to_string(),
206 type_size: stats.type_size,
207 total_bytes: stats.total_bytes(),
208 queued_bytes: stats.queued_bytes(),
209 }
210 }
211}
212
213impl ChannelStats {
214 fn new(
215 id: &'static str,
216 label: Option<&'static str>,
217 channel_type: ChannelType,
218 type_name: &'static str,
219 type_size: usize,
220 ) -> Self {
221 Self {
222 id,
223 label,
224 channel_type,
225 state: ChannelState::default(),
226 sent_count: 0,
227 received_count: 0,
228 type_name,
229 type_size,
230 sent_logs: VecDeque::new(),
231 received_logs: VecDeque::new(),
232 }
233 }
234
235 fn update_state(&mut self) {
236 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
237 return;
238 }
239
240 let queued = self.queued();
241 let is_full = match self.channel_type {
242 ChannelType::Bounded(cap) => queued >= cap as u64,
243 ChannelType::Oneshot => queued >= 1,
244 ChannelType::Unbounded => false,
245 };
246
247 if is_full {
248 self.state = ChannelState::Full;
249 } else {
250 self.state = ChannelState::Active;
251 }
252 }
253}
254
/// Messages sent to the `channel-stats-collector` thread, which applies them to
/// the shared stats map.
#[derive(Debug)]
pub(crate) enum StatsEvent {
    /// Registers a channel; the collector inserts a fresh `ChannelStats` under
    /// `id`, replacing any existing entry with that key.
    Created {
        id: &'static str,
        display_label: Option<&'static str>,
        channel_type: ChannelType,
        type_name: &'static str,
        type_size: usize,
    },
    /// A message was sent on channel `id`. `log` is the optional text stored in
    /// the resulting log entry; `timestamp` is when the send happened.
    MessageSent {
        id: &'static str,
        log: Option<String>,
        timestamp: Instant,
    },
    /// A message was received on channel `id` at `timestamp`.
    MessageReceived {
        id: &'static str,
        timestamp: Instant,
    },
    /// Marks channel `id` as `ChannelState::Closed`.
    Closed {
        id: &'static str,
    },
    /// Marks channel `id` as `ChannelState::Notified`.
    // NOTE(review): dead-code lint is silenced here — presumably the variant is
    // only constructed under some feature combinations; confirm in the wrappers.
    #[allow(dead_code)]
    Notified {
        id: &'static str,
    },
}
282
/// Global collector state: the sender used to submit `StatsEvent`s to the
/// collector thread, and the shared map of per-channel stats keyed by channel id.
type StatsState = (
    CbSender<StatsEvent>,
    Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
);

/// Lazily initialised collector state; populated by `init_stats_state`.
static STATS_STATE: OnceLock<StatsState> = OnceLock::new();

/// Reference instant against which log timestamps are measured; set when the
/// collector state is first initialised.
static START_TIME: OnceLock<Instant> = OnceLock::new();

/// Number of log entries kept per channel and direction when the
/// `CHANNELS_CONSOLE_LOG_LIMIT` environment variable is unset or unparsable.
const DEFAULT_LOG_LIMIT: usize = 50;
294
295fn get_log_limit() -> usize {
296 std::env::var("CHANNELS_CONSOLE_LOG_LIMIT")
297 .ok()
298 .and_then(|s| s.parse().ok())
299 .unwrap_or(DEFAULT_LOG_LIMIT)
300}
301
302fn init_stats_state() -> &'static StatsState {
305 STATS_STATE.get_or_init(|| {
306 START_TIME.get_or_init(Instant::now);
307
308 let (tx, rx) = unbounded::<StatsEvent>();
309 let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
310 let stats_map_clone = Arc::clone(&stats_map);
311
312 std::thread::Builder::new()
313 .name("channel-stats-collector".into())
314 .spawn(move || {
315 while let Ok(event) = rx.recv() {
316 let mut stats = stats_map_clone.write().unwrap();
317 match event {
318 StatsEvent::Created {
319 id: key,
320 display_label,
321 channel_type,
322 type_name,
323 type_size,
324 } => {
325 stats.insert(
326 key,
327 ChannelStats::new(
328 key,
329 display_label,
330 channel_type,
331 type_name,
332 type_size,
333 ),
334 );
335 }
336 StatsEvent::MessageSent { id, log, timestamp } => {
337 if let Some(channel_stats) = stats.get_mut(id) {
338 channel_stats.sent_count += 1;
339 channel_stats.update_state();
340
341 let limit = get_log_limit();
342 if channel_stats.sent_logs.len() >= limit {
343 channel_stats.sent_logs.pop_front();
344 }
345 channel_stats.sent_logs.push_back(LogEntry::new(
346 channel_stats.sent_count,
347 timestamp,
348 log,
349 ));
350 }
351 }
352 StatsEvent::MessageReceived { id, timestamp } => {
353 if let Some(channel_stats) = stats.get_mut(id) {
354 channel_stats.received_count += 1;
355 channel_stats.update_state();
356
357 let limit = get_log_limit();
358 if channel_stats.received_logs.len() >= limit {
359 channel_stats.received_logs.pop_front();
360 }
361 channel_stats.received_logs.push_back(LogEntry::new(
362 channel_stats.received_count,
363 timestamp,
364 None,
365 ));
366 }
367 }
368 StatsEvent::Closed { id } => {
369 if let Some(channel_stats) = stats.get_mut(id) {
370 channel_stats.state = ChannelState::Closed;
371 }
372 }
373 StatsEvent::Notified { id } => {
374 if let Some(channel_stats) = stats.get_mut(id) {
375 channel_stats.state = ChannelState::Notified;
376 }
377 }
378 }
379 }
380 })
381 .expect("Failed to spawn channel-stats-collector thread");
382
383 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
386 .ok()
387 .and_then(|p| p.parse::<u16>().ok())
388 .unwrap_or(6770);
389 let addr = format!("127.0.0.1:{}", port);
390
391 std::thread::spawn(move || {
392 start_metrics_server(&addr);
393 });
394
395 (tx, stats_map)
396 })
397}
398
399fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
400 if let Some(l) = provided {
401 return l.to_string();
402 }
403 if let Some(pos) = id.rfind(':') {
404 let (path, line_part) = id.split_at(pos);
405 let line = &line_part[1..];
406 format!("{}:{}", extract_filename(path), line)
407 } else {
408 extract_filename(id)
409 }
410}
411
/// Shortens a `/`-separated path to its last two components (`dir/file`).
///
/// A path without any `/` is returned unchanged. Only `/` is treated as a
/// separator.
fn extract_filename(path: &str) -> String {
    // Walk the components from the end: the first is the file, the second its parent.
    let mut from_end = path.rsplit('/');
    match (from_end.next(), from_end.next()) {
        (Some(file), Some(parent)) => format!("{}/{}", parent, file),
        _ => path.to_string(),
    }
}
424
/// Formats a byte count for display using 1024-based units (`B`, `KB`, `MB`,
/// `GB`, `TB`).
///
/// Values below 1024 are printed as an exact integer (`"512 B"`); larger values
/// are scaled and printed with one decimal place (`"1.5 KB"`). `TB` is the
/// largest unit, so very large values keep growing in that unit.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];

    // Below one kilobyte the count is shown exactly, without a fraction.
    if bytes < 1024 {
        return format!("{} {}", bytes, UNITS[0]);
    }

    let mut scaled = bytes as f64;
    let mut unit = UNITS[0];
    for &next in &UNITS[1..] {
        if scaled < 1024.0 {
            break;
        }
        scaled /= 1024.0;
        unit = next;
    }

    format!("{:.1} {}", scaled, unit)
}
446
/// Hook used by the `instrument!` macro for arms without `log = true`.
///
/// Hidden from the public docs; call it through the macro rather than directly.
// NOTE(review): the implementations are not in this file — presumably they live
// in the `wrappers` module and return an instrumented version of `self`; confirm.
#[doc(hidden)]
pub trait Instrument {
    /// Value handed back to the macro caller.
    type Output;
    /// Wraps `self` for tracking.
    ///
    /// * `channel_id` - identifier of the channel; the macro passes
    ///   `file:line` of its call site.
    /// * `label` - optional display label from the macro's `label = ...` argument.
    /// * `capacity` - optional capacity from the macro's `capacity = ...` argument.
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
460
/// Hook used by the `instrument!` macro for arms that pass `log = true`.
///
/// Hidden from the public docs; call it through the macro rather than directly.
// NOTE(review): the implementations are not in this file — presumably they
// additionally record message contents in the send log; confirm in `wrappers`.
#[doc(hidden)]
pub trait InstrumentLog {
    /// Value handed back to the macro caller.
    type Output;
    /// Wraps `self` for tracking with logging requested.
    ///
    /// * `channel_id` - identifier of the channel; the macro passes
    ///   `file:line` of its call site.
    /// * `label` - optional display label from the macro's `label = ...` argument.
    /// * `capacity` - optional capacity from the macro's `capacity = ...` argument.
    fn instrument_log(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
474
// The shared runtime only exists when the `tokio` or `futures` feature is enabled.
cfg_if::cfg_if! {
    if #[cfg(any(feature = "tokio", feature = "futures"))] {
        use std::sync::LazyLock;
        /// Shared multi-threaded Tokio runtime, built on first access.
        ///
        /// Only the time driver is enabled (`enable_time`). Building the runtime
        /// is unwrapped, so first access panics if construction fails.
        // NOTE(review): the users of this runtime are outside this file —
        // presumably the async channel wrappers spawn their tasks on it; confirm.
        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
            tokio::runtime::Builder::new_multi_thread()
                .enable_time()
                .build()
                .unwrap()
        });
    }
}
486
/// Wraps a channel expression so that its traffic is tracked.
///
/// The first argument is the channel expression. It may be followed by any
/// combination of the named options below, in any order (every supported
/// ordering has its own arm):
///
/// * `label = "<literal>"` - display label for the channel.
/// * `capacity = <expr>` - channel capacity; must be a constant `usize`
///   expression (enforced by a `const _: usize = ...` item in the expansion).
/// * `log = true` - dispatch to `InstrumentLog::instrument_log` instead of
///   `Instrument::instrument`.
///
/// In every arm the channel id is `concat!(file!(), ":", line!())`, i.e. the
/// `file:line` of the macro call site.
#[macro_export]
macro_rules! instrument {
    // --- Without logging: dispatch to `Instrument::instrument` ---
    ($expr:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        // Compile-time check that the capacity is a constant `usize`.
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    // --- With `log = true`: dispatch to `InstrumentLog::instrument_log` ---
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    // --- All three options, one arm per ordering ---
    ($expr:expr, label = $label:literal, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, label = $label:literal, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};
}
650
651fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
652 if let Some((_, stats_map)) = STATS_STATE.get() {
653 stats_map.read().unwrap().clone()
654 } else {
655 HashMap::new()
656 }
657}
658
659fn get_serializable_stats() -> Vec<SerializableChannelStats> {
660 let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
661 .values()
662 .map(SerializableChannelStats::from)
663 .collect();
664
665 stats.sort_by(|a, b| a.id.cmp(&b.id));
666 stats
667}
668
/// Retained send and receive logs of a single channel, as returned by
/// `get_channel_logs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelLogs {
    /// Identifier of the channel the logs belong to.
    pub id: String,
    /// Retained send events, sorted by descending `index`.
    pub sent_logs: Vec<LogEntry>,
    /// Retained receive events, sorted by descending `index`.
    pub received_logs: Vec<LogEntry>,
}
676
677pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
678 let stats = get_channel_stats();
679 stats.get(channel_id).map(|channel_stats| {
680 let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
681
682 let mut received_logs: Vec<LogEntry> =
683 channel_stats.received_logs.iter().cloned().collect();
684
685 sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
687 received_logs.sort_by(|a, b| b.index.cmp(&a.index));
688
689 ChannelLogs {
690 id: channel_id.to_string(),
691 sent_logs,
692 received_logs,
693 }
694 })
695}