1use crossbeam_channel::{unbounded, Sender as CbSender};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::{Arc, OnceLock, RwLock};
5
6pub mod channels_guard;
7pub use channels_guard::{ChannelsGuard, ChannelsGuardBuilder};
8
9use crate::http_api::start_metrics_server;
10mod http_api;
11mod wrappers;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum ChannelType {
16 Bounded(usize),
17 Unbounded,
18 Oneshot,
19}
20
21impl std::fmt::Display for ChannelType {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 match self {
24 ChannelType::Bounded(size) => write!(f, "bounded[{}]", size),
25 ChannelType::Unbounded => write!(f, "unbounded"),
26 ChannelType::Oneshot => write!(f, "oneshot"),
27 }
28 }
29}
30
31impl Serialize for ChannelType {
32 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
33 where
34 S: serde::Serializer,
35 {
36 serializer.serialize_str(&self.to_string())
37 }
38}
39
40impl<'de> Deserialize<'de> for ChannelType {
41 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42 where
43 D: serde::Deserializer<'de>,
44 {
45 let s = String::deserialize(deserializer)?;
46
47 match s.as_str() {
48 "unbounded" => Ok(ChannelType::Unbounded),
49 "oneshot" => Ok(ChannelType::Oneshot),
50 _ => {
51 if let Some(inner) = s.strip_prefix("bounded[").and_then(|x| x.strip_suffix(']')) {
53 let size = inner
54 .parse()
55 .map_err(|_| serde::de::Error::custom("invalid bounded size"))?;
56 Ok(ChannelType::Bounded(size))
57 } else {
58 Err(serde::de::Error::custom("invalid channel type"))
59 }
60 }
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug, Default)]
67pub enum Format {
68 #[default]
69 Table,
70 Json,
71 JsonPretty,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
76pub enum ChannelState {
77 #[default]
78 Active,
79 Closed,
80 Full,
81 Notified,
82}
83
84impl std::fmt::Display for ChannelState {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{}", self.as_str())
87 }
88}
89
90impl ChannelState {
91 pub fn as_str(&self) -> &'static str {
92 match self {
93 ChannelState::Active => "active",
94 ChannelState::Closed => "closed",
95 ChannelState::Full => "full",
96 ChannelState::Notified => "notified",
97 }
98 }
99}
100
101impl Serialize for ChannelState {
102 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
103 where
104 S: serde::Serializer,
105 {
106 serializer.serialize_str(self.as_str())
107 }
108}
109
110impl<'de> Deserialize<'de> for ChannelState {
111 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
112 where
113 D: serde::Deserializer<'de>,
114 {
115 let s = String::deserialize(deserializer)?;
116 match s.as_str() {
117 "active" => Ok(ChannelState::Active),
118 "closed" => Ok(ChannelState::Closed),
119 "full" => Ok(ChannelState::Full),
120 "notified" => Ok(ChannelState::Notified),
121 _ => Err(serde::de::Error::custom("invalid channel state")),
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
128pub(crate) struct ChannelStats {
129 pub(crate) id: &'static str,
130 pub(crate) label: Option<&'static str>,
131 pub(crate) channel_type: ChannelType,
132 pub(crate) state: ChannelState,
133 pub(crate) sent_count: u64,
134 pub(crate) received_count: u64,
135 pub(crate) type_name: &'static str,
136 pub(crate) type_size: usize,
137}
138
139impl ChannelStats {
140 pub fn queued(&self) -> u64 {
141 self.sent_count.saturating_sub(self.received_count)
142 }
143
144 pub fn total_bytes(&self) -> u64 {
145 self.sent_count * self.type_size as u64
146 }
147
148 pub fn queued_bytes(&self) -> u64 {
149 self.queued() * self.type_size as u64
150 }
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct SerializableChannelStats {
156 pub id: String,
157 pub label: String,
158 pub channel_type: ChannelType,
159 pub state: ChannelState,
160 pub sent_count: u64,
161 pub received_count: u64,
162 pub queued: u64,
163 pub type_name: String,
164 pub type_size: usize,
165 pub total_bytes: u64,
166 pub queued_bytes: u64,
167}
168
169impl From<&ChannelStats> for SerializableChannelStats {
170 fn from(stats: &ChannelStats) -> Self {
171 let label = resolve_label(stats.id, stats.label);
172 Self {
173 id: stats.id.to_string(),
174 label,
175 channel_type: stats.channel_type,
176 state: stats.state,
177 sent_count: stats.sent_count,
178 received_count: stats.received_count,
179 queued: stats.queued(),
180 type_name: stats.type_name.to_string(),
181 type_size: stats.type_size,
182 total_bytes: stats.total_bytes(),
183 queued_bytes: stats.queued_bytes(),
184 }
185 }
186}
187
188impl ChannelStats {
189 fn new(
190 id: &'static str,
191 label: Option<&'static str>,
192 channel_type: ChannelType,
193 type_name: &'static str,
194 type_size: usize,
195 ) -> Self {
196 Self {
197 id,
198 label,
199 channel_type,
200 state: ChannelState::default(),
201 sent_count: 0,
202 received_count: 0,
203 type_name,
204 type_size,
205 }
206 }
207
208 fn update_state(&mut self) {
211 if self.state == ChannelState::Closed || self.state == ChannelState::Notified {
212 return;
213 }
214
215 if self.sent_count > self.received_count {
216 self.state = ChannelState::Full;
217 } else {
218 self.state = ChannelState::Active;
219 }
220 }
221}
222
223#[derive(Debug)]
225pub(crate) enum StatsEvent {
226 Created {
227 id: &'static str,
228 display_label: Option<&'static str>,
229 channel_type: ChannelType,
230 type_name: &'static str,
231 type_size: usize,
232 },
233 MessageSent {
234 id: &'static str,
235 },
236 MessageReceived {
237 id: &'static str,
238 },
239 Closed {
240 id: &'static str,
241 },
242 #[allow(dead_code)]
243 Notified {
244 id: &'static str,
245 },
246}
247
248type StatsState = (
249 CbSender<StatsEvent>,
250 Arc<RwLock<HashMap<&'static str, ChannelStats>>>,
251);
252
253static STATS_STATE: OnceLock<StatsState> = OnceLock::new();
255
256fn init_stats_state() -> &'static StatsState {
259 STATS_STATE.get_or_init(|| {
260 let (tx, rx) = unbounded::<StatsEvent>();
261 let stats_map = Arc::new(RwLock::new(HashMap::<&'static str, ChannelStats>::new()));
262 let stats_map_clone = Arc::clone(&stats_map);
263
264 std::thread::Builder::new()
265 .name("channel-stats-collector".into())
266 .spawn(move || {
267 while let Ok(event) = rx.recv() {
268 let mut stats = stats_map_clone.write().unwrap();
269 match event {
270 StatsEvent::Created {
271 id: key,
272 display_label,
273 channel_type,
274 type_name,
275 type_size,
276 } => {
277 stats.insert(
278 key,
279 ChannelStats::new(
280 key,
281 display_label,
282 channel_type,
283 type_name,
284 type_size,
285 ),
286 );
287 }
288 StatsEvent::MessageSent { id } => {
289 if let Some(channel_stats) = stats.get_mut(id) {
290 channel_stats.sent_count += 1;
291 channel_stats.update_state();
292 }
293 }
294 StatsEvent::MessageReceived { id } => {
295 if let Some(channel_stats) = stats.get_mut(id) {
296 channel_stats.received_count += 1;
297 channel_stats.update_state();
298 }
299 }
300 StatsEvent::Closed { id } => {
301 if let Some(channel_stats) = stats.get_mut(id) {
302 channel_stats.state = ChannelState::Closed;
303 }
304 }
305 StatsEvent::Notified { id } => {
306 if let Some(channel_stats) = stats.get_mut(id) {
307 channel_stats.state = ChannelState::Notified;
308 }
309 }
310 }
311 }
312 })
313 .expect("Failed to spawn channel-stats-collector thread");
314
315 let port = std::env::var("CHANNELS_CONSOLE_METRICS_PORT")
318 .ok()
319 .and_then(|p| p.parse::<u16>().ok())
320 .unwrap_or(6770);
321 let addr = format!("127.0.0.1:{}", port);
322
323 std::thread::spawn(move || {
324 start_metrics_server(&addr);
325 });
326
327 (tx, stats_map)
328 })
329}
330
331fn resolve_label(id: &'static str, provided: Option<&'static str>) -> String {
332 if let Some(l) = provided {
333 return l.to_string();
334 }
335 if let Some(pos) = id.rfind(':') {
336 let (path, line_part) = id.split_at(pos);
337 let line = &line_part[1..];
338 format!("{}:{}", extract_filename(path), line)
339 } else {
340 extract_filename(id)
341 }
342}
343
344fn extract_filename(path: &str) -> String {
345 let components: Vec<&str> = path.split('/').collect();
346 if components.len() >= 2 {
347 format!(
348 "{}/{}",
349 components[components.len() - 2],
350 components[components.len() - 1]
351 )
352 } else {
353 path.to_string()
354 }
355}
356
357pub fn format_bytes(bytes: u64) -> String {
359 if bytes == 0 {
360 return "0 B".to_string();
361 }
362
363 const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
364 let mut size = bytes as f64;
365 let mut unit_idx = 0;
366
367 while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
368 size /= 1024.0;
369 unit_idx += 1;
370 }
371
372 if unit_idx == 0 {
373 format!("{} {}", bytes, UNITS[unit_idx])
374 } else {
375 format!("{:.1} {}", size, UNITS[unit_idx])
376 }
377}
378
379#[doc(hidden)]
383pub trait Instrument {
384 type Output;
385 fn instrument(
386 self,
387 channel_id: &'static str,
388 label: Option<&'static str>,
389 capacity: Option<usize>,
390 ) -> Self::Output;
391}
392
393cfg_if::cfg_if! {
394 if #[cfg(any(feature = "tokio", feature = "futures"))] {
395 use std::sync::LazyLock;
396 pub static RT: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
397 tokio::runtime::Builder::new_multi_thread()
398 .enable_time()
399 .build()
400 .unwrap()
401 });
402 }
403}
404
405#[macro_export]
460macro_rules! instrument {
461 ($expr:expr) => {{
462 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
463 $crate::Instrument::instrument($expr, CHANNEL_ID, None, None)
464 }};
465
466 ($expr:expr, label = $label:literal) => {{
467 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
468 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), None)
469 }};
470
471 ($expr:expr, capacity = $capacity:expr) => {{
472 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
473 const _: usize = $capacity;
474 $crate::Instrument::instrument($expr, CHANNEL_ID, None, Some($capacity))
475 }};
476
477 ($expr:expr, label = $label:literal, capacity = $capacity:expr) => {{
478 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
479 const _: usize = $capacity;
480 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
481 }};
482
483 ($expr:expr, capacity = $capacity:expr, label = $label:literal) => {{
484 const CHANNEL_ID: &'static str = concat!(file!(), ":", line!());
485 const _: usize = $capacity;
486 $crate::Instrument::instrument($expr, CHANNEL_ID, Some($label), Some($capacity))
487 }};
488}
489
490fn get_channel_stats() -> HashMap<&'static str, ChannelStats> {
491 if let Some((_, stats_map)) = STATS_STATE.get() {
492 stats_map.read().unwrap().clone()
493 } else {
494 HashMap::new()
495 }
496}
497
498fn get_serializable_stats() -> Vec<SerializableChannelStats> {
499 let mut stats: Vec<SerializableChannelStats> = get_channel_stats()
500 .values()
501 .map(SerializableChannelStats::from)
502 .collect();
503
504 stats.sort_by(|a, b| a.id.cmp(&b.id));
505 stats
506}