buswatch_tui/source/channel.rs
1//! Channel-based data source.
2//!
3//! Receives monitor snapshots via a tokio watch channel.
4//! This is useful for message bus integration where snapshots
5//! are pushed rather than polled from a file.
6
7use tokio::sync::watch;
8
9use super::{DataSource, Snapshot};
10
11/// A data source that receives monitor snapshots via a channel.
12///
13/// This source is designed for integration with message bus systems.
14/// The producer (e.g., a message bus subscriber) sends snapshots
15/// through the channel, and this source provides them to the TUI.
16///
17/// # Example
18///
19/// ```
20/// use buswatch_tui::ChannelSource;
21///
22/// // Create a channel pair
23/// let (tx, source) = ChannelSource::create("rabbitmq://localhost");
24/// ```
25#[derive(Debug)]
26pub struct ChannelSource {
27 receiver: watch::Receiver<Snapshot>,
28 description: String,
29 /// Track if we've returned the initial value yet
30 initial_returned: bool,
31}
32
33impl ChannelSource {
34 /// Create a new channel source.
35 ///
36 /// # Arguments
37 ///
38 /// * `receiver` - The receiving end of a watch channel
39 /// * `source_description` - A description of where snapshots come from
40 /// (e.g., "rabbitmq://localhost", "nats://broker:4222")
41 pub fn new(receiver: watch::Receiver<Snapshot>, source_description: &str) -> Self {
42 let description = format!("channel: {}", source_description);
43 Self {
44 receiver,
45 description,
46 initial_returned: false,
47 }
48 }
49
50 /// Create a channel pair for sending snapshots to a ChannelSource.
51 ///
52 /// Returns (sender, source) where the sender can be used to push
53 /// snapshots and the source can be used with the Doctor TUI.
54 pub fn create(source_description: &str) -> (watch::Sender<Snapshot>, Self) {
55 let (tx, rx) = watch::channel(Snapshot::default());
56 let source = Self::new(rx, source_description);
57 (tx, source)
58 }
59}
60
61impl DataSource for ChannelSource {
62 fn poll(&mut self) -> Option<Snapshot> {
63 // Return the initial value on first poll
64 if !self.initial_returned {
65 self.initial_returned = true;
66 self.receiver.mark_changed();
67 }
68
69 // Check if there's a new value without blocking
70 if self.receiver.has_changed().unwrap_or(false) {
71 let snapshot = self.receiver.borrow_and_update().clone();
72 Some(snapshot)
73 } else {
74 None
75 }
76 }
77
78 fn description(&self) -> &str {
79 &self.description
80 }
81
82 fn error(&self) -> Option<&str> {
83 // Channel sources don't have file-based errors
84 // Connection errors would be handled by the message bus layer
85 None
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one source through: initial value, no change, then a pushed update.
    #[test]
    fn test_channel_source_poll() {
        let (sender, mut source) = ChannelSource::create("test");

        // The very first poll hands out the default (empty) snapshot
        let first = source.poll();
        assert!(first.is_some());
        assert!(first.unwrap().is_empty());

        // Nothing was sent in between, so there is nothing new to report
        let second = source.poll();
        assert!(second.is_none());

        // Publish a snapshot containing a single module
        let pushed = Snapshot::builder().module("TestModule", |m| m).build();
        sender.send(pushed).unwrap();

        // The next poll picks up the published snapshot
        let third = source.poll();
        assert!(third.is_some());
        assert_eq!(third.unwrap().len(), 1);
    }
}
114}