buswatch_tui/source/
channel.rs

1//! Channel-based data source.
2//!
3//! Receives monitor snapshots via a tokio watch channel.
4//! This is useful for message bus integration where snapshots
5//! are pushed rather than polled from a file.
6
7use tokio::sync::watch;
8
9use super::{DataSource, Snapshot};
10
11/// A data source that receives monitor snapshots via a channel.
12///
13/// This source is designed for integration with message bus systems.
14/// The producer (e.g., a message bus subscriber) sends snapshots
15/// through the channel, and this source provides them to the TUI.
16///
17/// # Example
18///
19/// ```
20/// use buswatch_tui::ChannelSource;
21///
22/// // Create a channel pair
23/// let (tx, source) = ChannelSource::create("rabbitmq://localhost");
24/// ```
25#[derive(Debug)]
26pub struct ChannelSource {
27    receiver: watch::Receiver<Snapshot>,
28    description: String,
29    /// Track if we've returned the initial value yet
30    initial_returned: bool,
31}
32
33impl ChannelSource {
34    /// Create a new channel source.
35    ///
36    /// # Arguments
37    ///
38    /// * `receiver` - The receiving end of a watch channel
39    /// * `source_description` - A description of where snapshots come from
40    ///   (e.g., "rabbitmq://localhost", "nats://broker:4222")
41    pub fn new(receiver: watch::Receiver<Snapshot>, source_description: &str) -> Self {
42        let description = format!("channel: {}", source_description);
43        Self {
44            receiver,
45            description,
46            initial_returned: false,
47        }
48    }
49
50    /// Create a channel pair for sending snapshots to a ChannelSource.
51    ///
52    /// Returns (sender, source) where the sender can be used to push
53    /// snapshots and the source can be used with the Doctor TUI.
54    pub fn create(source_description: &str) -> (watch::Sender<Snapshot>, Self) {
55        let (tx, rx) = watch::channel(Snapshot::default());
56        let source = Self::new(rx, source_description);
57        (tx, source)
58    }
59}
60
61impl DataSource for ChannelSource {
62    fn poll(&mut self) -> Option<Snapshot> {
63        // Return the initial value on first poll
64        if !self.initial_returned {
65            self.initial_returned = true;
66            self.receiver.mark_changed();
67        }
68
69        // Check if there's a new value without blocking
70        if self.receiver.has_changed().unwrap_or(false) {
71            let snapshot = self.receiver.borrow_and_update().clone();
72            Some(snapshot)
73        } else {
74            None
75        }
76    }
77
78    fn description(&self) -> &str {
79        &self.description
80    }
81
82    fn error(&self) -> Option<&str> {
83        // Channel sources don't have file-based errors
84        // Connection errors would be handled by the message bus layer
85        None
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one source through: initial value, no-change poll, pushed update.
    #[test]
    fn test_channel_source_poll() {
        let (sender, mut source) = ChannelSource::create("test");

        // The very first poll hands back the default (empty) snapshot.
        assert!(matches!(source.poll(), Some(initial) if initial.is_empty()));

        // Nothing was sent in between, so there is nothing new to report.
        assert!(source.poll().is_none());

        // Push a snapshot containing a single module.
        let pushed = Snapshot::builder().module("TestModule", |m| m).build();
        sender.send(pushed).unwrap();

        // The next poll surfaces the pushed snapshot.
        assert_eq!(source.poll().map(|snapshot| snapshot.len()), Some(1));
    }
}