1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::{HashMap, VecDeque};
4use std::sync::{Arc, OnceLock, RwLock};
5use std::time::SystemTime;
6
7pub mod channels_guard;
8pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
9
10use crate::http_api::start_metrics_server;
11mod http_api;
12mod wrappers;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct LogEntry {
17 pub index: u64,
18 pub timestamp: u64,
19 pub message: Option<String>,
20}
21
22impl LogEntry {
23 pub(crate) fn new(index: u64, timestamp: SystemTime, message: Option<String>) -> Self {
24 let timestamp_secs = timestamp
25 .duration_since(SystemTime::UNIX_EPOCH)
26 .unwrap_or_default()
27 .as_secs();
28 Self {
29 index,
30 timestamp: timestamp_secs,
31 message,
32 }
33 }
34}
35
/// Kind of channel being tracked.
///
/// Rendered and (de)serialized as `bounded[N]`, `unbounded` or `oneshot`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelType {
    /// Bounded channel; the payload is the `N` shown in `bounded[N]`
    /// (presumably the channel capacity — confirm against the wrappers).
    Bounded(usize),
    /// Unbounded channel.
    Unbounded,
    /// Oneshot channel.
    Oneshot,
}
43
44impl std::fmt::Display for ChannelType {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
48 ChannelType::Unbounded => write!(f, "unbounded"),
49 ChannelType::Oneshot => write!(f, "oneshot"),
50 }
51 }
52}
53
54impl Serialize for ChannelType {
55 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56 where
57 S: serde::Serializer,
58 {
59 serializer.serialize_str(&self.to_string())
60 }
61}
62
63impl<'de> Deserialize<'de> for ChannelType {
64 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
65 where
66 D: serde::Deserializer<'de>,
67 {
68 let s = String::deserialize(deserializer)?;
69
70 match s.as_str() {
71 "unbounded" => Ok(ChannelType::Unbounded),
72 "oneshot" => Ok(ChannelType::Oneshot),
73 _ => {
74 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
76 let size = inner
77 .parse()
78 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
79 Ok(ChannelType::Bounded(size))
80 } else {
81 Err(serde::de::Error::custom("invalid channel type"))
82 }
83 }
84 }
85 }
86}
87
/// Output format selector; defaults to [`Format::Table`].
///
/// NOTE(review): the code that consumes this enum is not in this file.
#[derive(Debug, Default, Clone, Copy)]
pub enum Format {
    /// Tabular output (the default).
    #[default]
    Table,
    /// JSON output.
    Json,
    /// Pretty-printed JSON output.
    JsonPretty,
}
96
/// Lifecycle/occupancy state reported for a tracked channel; defaults to `Active`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ChannelState {
    /// Initial state; also set when received count has caught up with sent count.
    #[default]
    Active,
    /// Set when a `Closed` stats event arrives for the channel.
    Closed,
    /// Set while more messages have been sent than received.
    Full,
    /// Set when a `Notified` stats event arrives for the channel.
    Notified,
}
106
107impl std::fmt::Display for ChannelState {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "{}", self.as_str())
110 }
111}
112
113impl ChannelState {
114 pub fn as_str(&self) -> &'static str {
115 match self {
116 ChannelState::Active => "active",
117 ChannelState::Closed => "closed",
118 ChannelState::Full => "full",
119 ChannelState::Notified => "notified",
120 }
121 }
122}
123
124impl Serialize for ChannelState {
125 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
126 where
127 S: serde::Serializer,
128 {
129 serializer.serialize_str(self.as_str())
130 }
131}
132
133impl<'de> Deserialize<'de> for ChannelState {
134 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135 where
136 D: serde::Deserializer<'de>,
137 {
138 let s = String::deserialize(deserializer)?;
139 match s.as_str() {
140 "active" => Ok(ChannelState::Active),
141 "closed" => Ok(ChannelState::Closed),
142 "full" => Ok(ChannelState::Full),
143 "notified" => Ok(ChannelState::Notified),
144 _ => Err(serde::de::Error::custom("invalid channel state")),
145 }
146 }
147}
148
149#[derive(Debug, Clone)]
151pub(crate) struct ChannelStats {
152 pub(crate) id: &'static str,
153 pub(crate) label: Option<&'static str>,
154 pub(crate) channel_type: ChannelType,
155 pub(crate) state: ChannelState,
156 pub(crate) sent_count: u64,
157 pub(crate) received_count: u64,
158 pub(crate) type_name: &'static str,
159 pub(crate) type_size: usize,
160 pub(crate) sent_logs: VecDeque<LogEntry>,
161 pub(crate) received_logs: VecDeque<LogEntry>,
162}
163
164impl ChannelStats {
165 pub fn queued(&self) -> u64 {
166 self.sent_count.saturating_sub(self.received_count)
167 }
168
169 pub fn total_bytes(&self) -> u64 {
170 self.sent_count * self.type_size as u64
171 }
172
173 pub fn queued_bytes(&self) -> u64 {
174 self.queued() * self.type_size as u64
175 }
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct SerializableChannelStats {
181 pub id: String,
182 pub label: String,
183 pub channel_type: ChannelType,
184 pub state: ChannelState,
185 pub sent_count: u64,
186 pub received_count: u64,
187 pub queued: u64,
188 pub type_name: String,
189 pub type_size: usize,
190 pub total_bytes: u64,
191 pub queued_bytes: u64,
192}
193
194impl From<&ChannelStats> for SerializableChannelStats {
195 fn from(stats: &ChannelStats) -> Self {
196 let label = resolve_label(stats.id, stats.label);
197 Self {
198 id: stats.id.to_string(),
199 label,
200 channel_type: stats.channel_type,
201 state: stats.state,
202 sent_count: stats.sent_count,
203 received_count: stats.received_count,
204 queued: stats.queued(),
205 type_name: stats.type_name.to_string(),
206 type_size: stats.type_size,
207 total_bytes: stats.total_bytes(),
208 queued_bytes: stats.queued_bytes(),
209 }
210 }
211}
212
213impl ChannelStats {
214 fn new(
215 id: &'static str,
216 label: Option<&'static str>,
217 channel_type: ChannelType,
218 type_name: &'static str,
219 type_size: usize,
220 ) -> Self {
221 Self {
222 id,
223 label,
224 channel_type,
225 state: ChannelState::default(),
226 sent_count: 0,
227 received_count: 0,
228 type_name,
229 type_size,
230 sent_logs: VecDeque::new(),
231 received_logs: VecDeque::new(),
232 }
233 }
234
235 fn update_state(&mut self) {
238 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
239 return;
240 }
241
242 if self.sent_count > self.received_count {
243 self.state = ChannelState::Full;
244 } else {
245 self.state = ChannelState::Active;
246 }
247 }
248}
249
/// Events sent over the stats channel and applied by the collector thread.
///
/// Every variant carries the `id` of the channel it refers to.
#[derive(Debug)]
pub(crate) enum StatsEvent {
    /// A channel was instrumented; the collector inserts a fresh stats entry for `id`.
    Created {
        id: &'static str,
        /// Optional user-provided label.
        display_label: Option<&'static str>,
        channel_type: ChannelType,
        type_name: &'static str,
        type_size: usize,
    },
    /// A message was sent; `log` is the optional text stored in the send log.
    MessageSent {
        id: &'static str,
        log: Option<String>,
        timestamp: SystemTime,
    },
    /// A message was received.
    MessageReceived {
        id: &'static str,
        timestamp: SystemTime,
    },
    /// The channel was closed; the collector sets its state to `Closed`.
    Closed {
        id: &'static str,
    },
    /// The collector sets the channel's state to `Notified`.
    // NOTE(review): nothing in this file constructs this variant; presumably it is
    // only built by wrappers behind some feature flags — confirm.
    #[allow(dead_code)]
    Notified {
        id: &'static str,
    },
}
277
/// Shared stats state: the sender used to submit [`StatsEvent`]s and the map of
/// per-channel stats, keyed by channel id, that the collector thread updates.
type StatsState = (
    CbSender<StatsEvent>,
    Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
);

/// Process-wide stats state, initialized on first use by `init_stats_state`.
static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
285
/// Number of log entries kept per direction when no valid override is configured.
const DEFAULT_LOG_LIMIT: usize = 50;

/// Returns the per-channel log limit.
///
/// Reads `CHANNELS_CONSOLE_LOG_LIMIT`; falls back to [`DEFAULT_LOG_LIMIT`] when the
/// variable is unset, not valid Unicode, or not parseable as `usize`.
fn get_log_limit() -> usize {
    match std::env::var("CHANNELS_CONSOLE_LOG_LIMIT") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_LOG_LIMIT),
        Err(_) => DEFAULT_LOG_LIMIT,
    }
}
294
295fn init_stats_state() -> &'static StatsState {
298 STATS_STATE.get_or_init(|| {
299 let (tx, rx) = unbounded::<StatsEvent>();
300 let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
301 let stats_map_clone = Arc::clone(&stats_map);
302
303 std::thread::Builder::new()
304 .name("channel-stats-collector".into())
305 .spawn(move || {
306 while let Ok(event) = rx.recv() {
307 let mut stats = stats_map_clone.write().unwrap();
308 match event {
309 StatsEvent::Created {
310 id: key,
311 display_label,
312 channel_type,
313 type_name,
314 type_size,
315 } => {
316 stats.insert(
317 key,
318 ChannelStats::new(
319 key,
320 display_label,
321 channel_type,
322 type_name,
323 type_size,
324 ),
325 );
326 }
327 StatsEvent::MessageSent { id, log, timestamp } => {
328 if let Some(channel_stats) = stats.get_mut(id) {
329 channel_stats.sent_count += 1;
330 channel_stats.update_state();
331
332 let limit = get_log_limit();
333 if channel_stats.sent_logs.len() >= limit {
334 channel_stats.sent_logs.pop_front();
335 }
336 channel_stats.sent_logs.push_back(LogEntry::new(
337 channel_stats.sent_count,
338 timestamp,
339 log,
340 ));
341 }
342 }
343 StatsEvent::MessageReceived { id, timestamp } => {
344 if let Some(channel_stats) = stats.get_mut(id) {
345 channel_stats.received_count += 1;
346 channel_stats.update_state();
347
348 let limit = get_log_limit();
349 if channel_stats.received_logs.len() >= limit {
350 channel_stats.received_logs.pop_front();
351 }
352 channel_stats.received_logs.push_back(LogEntry::new(
353 channel_stats.received_count,
354 timestamp,
355 None,
356 ));
357 }
358 }
359 StatsEvent::Closed { id } => {
360 if let Some(channel_stats) = stats.get_mut(id) {
361 channel_stats.state = ChannelState::Closed;
362 }
363 }
364 StatsEvent::Notified { id } => {
365 if let Some(channel_stats) = stats.get_mut(id) {
366 channel_stats.state = ChannelState::Notified;
367 }
368 }
369 }
370 }
371 })
372 .expect("Failed to spawn channel-stats-collector thread");
373
374 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
377 .ok()
378 .and_then(|p| p.parse::<u16>().ok())
379 .unwrap_or(6770);
380 let addr = format!("127.0.0.1:{}", port);
381
382 std::thread::spawn(move || {
383 start_metrics_server(&addr);
384 });
385
386 (tx, stats_map)
387 })
388}
389
390fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
391 if let Some(l) = provided {
392 return l.to_string();
393 }
394 if let Some(pos) = id.rfind(':') {
395 let (path, line_part) = id.split_at(pos);
396 let line = &line_part[1..];
397 format!("{}:{}", extract_filename(path), line)
398 } else {
399 extract_filename(id)
400 }
401}
402
/// Shortens a source path to `<parent>/<file>`.
///
/// Both `/` and `\` are treated as separators, because `file!()` yields
/// backslash-separated paths on Windows; the result always joins with `/`.
/// A path without any separator is returned unchanged.
fn extract_filename(path: &str) -> String {
    // Walk from the end so only the last two components are examined.
    let mut from_end = path.rsplit(|c: char| matches!(c, '/' | '\\'));
    match (from_end.next(), from_end.next()) {
        (Some(file), Some(parent)) => format!("{parent}/{file}"),
        _ => path.to_string(),
    }
}
415
/// Formats a byte count for display using binary (1024-based) units up to `TB`.
///
/// Values below 1024 are printed as an exact integer (`"512 B"`); larger values
/// are printed with one decimal place (`"1.5 KB"`). Values that would round up
/// to `1024.0` of a unit are shown as `1.0` of the next unit instead, so
/// `1_048_575` renders as `"1.0 MB"` rather than `"1024.0 KB"`.
pub fn format_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let last_unit = UNITS.len() - 1;

    let mut size = bytes as f64;
    let mut unit_idx = 0;
    while size >= 1024.0 && unit_idx < last_unit {
        size /= 1024.0;
        unit_idx += 1;
    }

    if unit_idx == 0 {
        // Plain bytes are exact; this also covers zero.
        return format!("{} {}", bytes, UNITS[unit_idx]);
    }

    let rendered = format!("{:.1}", size);
    // Rounding to one decimal can reach the unit boundary; compare the rendered
    // text so the check agrees exactly with what would be printed.
    if rendered == "1024.0" && unit_idx < last_unit {
        return format!("1.0 {}", UNITS[unit_idx + 1]);
    }
    format!("{} {}", rendered, UNITS[unit_idx])
}
437
/// Conversion from a channel value into its instrumented counterpart.
///
/// This is the trait the `instrument!` macro calls for invocations without
/// `log = true`; it is hidden from the docs and not meant to be called directly.
#[doc(hidden)]
pub trait Instrument {
    /// The instrumented value produced by [`Instrument::instrument`].
    type Output;
    /// Wraps `self` for stats collection.
    ///
    /// - `channel_id`: identifier for the channel; the macro passes `"<file>:<line>"`.
    /// - `label`: optional display label.
    /// - `capacity`: optional value from the macro's `capacity = …` argument
    ///   (how implementors use it is not visible in this file).
    fn instrument(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
451
/// Logging variant of `Instrument`.
///
/// This is the trait the `instrument!` macro calls for invocations that pass
/// `log = true`; it is hidden from the docs and not meant to be called directly.
// NOTE(review): presumably implementors record message contents in the send log —
// confirm against the wrapper implementations.
#[doc(hidden)]
pub trait InstrumentLog {
    /// The instrumented value produced by [`InstrumentLog::instrument_log`].
    type Output;
    /// Wraps `self` for stats collection with logging enabled.
    ///
    /// - `channel_id`: identifier for the channel; the macro passes `"<file>:<line>"`.
    /// - `label`: optional display label.
    /// - `capacity`: optional value from the macro's `capacity = …` argument
    ///   (how implementors use it is not visible in this file).
    fn instrument_log(
        self,
        channel_id: &'static str,
        label: Option<&'static str>,
        capacity: Option<usize>,
    ) -> Self::Output;
}
465
// Only compiled when the `tokio` or `futures` feature is enabled.
cfg_if::cfg_if! {
    if #[cfg(any(feature = "tokio", feature = "futures"))] {
        use std::sync::LazyLock;
        /// Shared multi-threaded Tokio runtime with the time driver enabled,
        /// built lazily on first access.
        ///
        /// # Panics
        ///
        /// The first access panics if the runtime cannot be built.
        pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
            tokio::runtime::Builder::new_multi_thread()
                .enable_time()
                .build()
                .unwrap()
        });
    }
}
477
/// Instruments a channel expression for stats collection.
///
/// The channel id is the call site, built as `concat!(file!(), ":", line!())`.
/// After the channel expression, the following optional arguments are accepted
/// in any order:
///
/// - `label = "<literal>"`: display label passed on as `Some(label)`.
/// - `capacity = <expr>`: passed on as `Some(capacity)`. The expression must be a
///   constant expression of type `usize`; this is enforced at compile time by the
///   `const _: usize = $capacity;` item in each arm.
/// - `log = true`: dispatches to `InstrumentLog::instrument_log` instead of
///   `Instrument::instrument`. Only the literal `true` is matched.
#[macro_export]
macro_rules! instrument {
    // ---- Without logging: dispatch to `Instrument::instrument` ----
    ($expr:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        // Compile-time check that the capacity is a constant `usize`.
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    // ---- With `log = true`: dispatch to `InstrumentLog::instrument_log` ----
    ($expr:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, None)
    }};

    ($expr:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), None)
    }};

    ($expr:expr, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, None, Some($capacity))
    }};

    // ---- `label`, `capacity` and `log = true` together, in every order ----
    ($expr:expr, label = $label:literal, capacity = $capacity:expr, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, label = $label:literal, log = true, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, label = $label:literal, log = true) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, capacity = $capacity:expr, log = true, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, label = $label:literal, capacity = $capacity:expr) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};

    ($expr:expr, log = true, capacity = $capacity:expr, label = $label:literal) => {{
        const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
        const _: usize = $capacity;
        $crate::InstrumentLog::instrument_log($expr, CHANNEL_ID, Some($label), Some($capacity))
    }};
}
641
642fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
643 if let Some((_, stats_map)) = STATS_STATE.get() {
644 stats_map.read().unwrap().clone()
645 } else {
646 HashMap::new()
647 }
648}
649
650fn get_serializable_stats() -> Vec<SerializableChannelStats> {
651 let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
652 .values()
653 .map(SerializableChannelStats::from)
654 .collect();
655
656 stats.sort_by(|a, b| a.id.cmp(&b.id));
657 stats
658}
659
660#[derive(Debug, Clone, Serialize, Deserialize)]
662pub struct ChannelLogs {
663 pub id: String,
664 pub sent_logs: Vec<LogEntry>,
665 pub received_logs: Vec<LogEntry>,
666}
667
668pub(crate) fn get_channel_logs(channel_id: &str) -> Option<ChannelLogs> {
669 let stats = get_channel_stats();
670 stats.get(channel_id).map(|channel_stats| {
671 let mut sent_logs: Vec<LogEntry> = channel_stats.sent_logs.iter().cloned().collect();
672
673 let mut received_logs: Vec<LogEntry> =
674 channel_stats.received_logs.iter().cloned().collect();
675
676 sent_logs.sort_by(|a, b| b.index.cmp(&a.index));
678 received_logs.sort_by(|a, b| b.index.cmp(&a.index));
679
680 ChannelLogs {
681 id: channel_id.to_string(),
682 sent_logs,
683 received_logs,
684 }
685 })
686}