Skip to main content

termichart_data/
stream_adapter.rs

1use termichart_core::Candle;
2use tokio::sync::mpsc;
3
4/// A streaming data adapter backed by a tokio unbounded MPSC channel.
5///
6/// Unlike [`JsonAdapter`](crate::JsonAdapter) and [`CsvAdapter`](crate::CsvAdapter),
7/// `StreamAdapter` does not implement the synchronous [`DataAdapter`](termichart_core::DataAdapter)
8/// trait. Instead it exposes async and try-recv methods for consuming candles
9/// one at a time from a live data feed.
10pub struct StreamAdapter {
11    rx: mpsc::UnboundedReceiver<Candle>,
12}
13
14impl StreamAdapter {
15    /// Creates a new `StreamAdapter` together with the sender half that can
16    /// be used to push candles into the stream.
17    pub fn new() -> (Self, mpsc::UnboundedSender<Candle>) {
18        let (tx, rx) = mpsc::unbounded_channel();
19        (Self { rx }, tx)
20    }
21
22    /// Asynchronously receives the next candle from the stream.
23    ///
24    /// Returns `None` when the sender has been dropped and the channel is empty.
25    pub async fn recv(&mut self) -> Option<Candle> {
26        self.rx.recv().await
27    }
28
29    /// Attempts to receive a candle without blocking.
30    ///
31    /// Returns `None` if the channel is empty or closed.
32    pub fn try_recv(&mut self) -> Option<Candle> {
33        self.rx.try_recv().ok()
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40
41    #[test]
42    fn try_recv_returns_none_when_empty() {
43        let (mut adapter, _tx) = StreamAdapter::new();
44        assert!(adapter.try_recv().is_none());
45    }
46
47    #[test]
48    fn try_recv_returns_sent_candle() {
49        let (mut adapter, tx) = StreamAdapter::new();
50        let candle = Candle {
51            time: 1.0,
52            open: 10.0,
53            high: 15.0,
54            low: 9.0,
55            close: 14.0,
56            volume: 100.0,
57        };
58        tx.send(candle).unwrap();
59        let received = adapter.try_recv().unwrap();
60        assert_eq!(received.close, 14.0);
61    }
62
63    #[tokio::test]
64    async fn recv_returns_sent_candle() {
65        let (mut adapter, tx) = StreamAdapter::new();
66        let candle = Candle {
67            time: 2.0,
68            open: 20.0,
69            high: 25.0,
70            low: 19.0,
71            close: 24.0,
72            volume: 200.0,
73        };
74        tx.send(candle).unwrap();
75        let received = adapter.recv().await.unwrap();
76        assert_eq!(received.time, 2.0);
77    }
78
79    #[tokio::test]
80    async fn recv_returns_none_when_sender_dropped() {
81        let (mut adapter, tx) = StreamAdapter::new();
82        drop(tx);
83        assert!(adapter.recv().await.is_none());
84    }
85}