termichart_data/
stream_adapter.rs1use termichart_core::Candle;
2use tokio::sync::mpsc;
3
4pub struct StreamAdapter {
11 rx: mpsc::UnboundedReceiver<Candle>,
12}
13
14impl StreamAdapter {
15 pub fn new() -> (Self, mpsc::UnboundedSender<Candle>) {
18 let (tx, rx) = mpsc::unbounded_channel();
19 (Self { rx }, tx)
20 }
21
22 pub async fn recv(&mut self) -> Option<Candle> {
26 self.rx.recv().await
27 }
28
29 pub fn try_recv(&mut self) -> Option<Candle> {
33 self.rx.try_recv().ok()
34 }
35}
36
#[cfg(test)]
mod tests {
    use super::*;

    /// Polling an adapter that has been sent nothing yields `None`.
    #[test]
    fn try_recv_returns_none_when_empty() {
        // Bound to a name (not `_`) so the sender stays alive during the poll.
        let (mut adapter, _sender) = StreamAdapter::new();
        let polled = adapter.try_recv();
        assert!(polled.is_none());
    }

    /// A candle pushed through the sender is visible to a non-blocking poll.
    #[test]
    fn try_recv_returns_sent_candle() {
        let (mut adapter, sender) = StreamAdapter::new();
        sender
            .send(Candle {
                time: 1.0,
                open: 10.0,
                high: 15.0,
                low: 9.0,
                close: 14.0,
                volume: 100.0,
            })
            .unwrap();

        let close = adapter.try_recv().map(|candle| candle.close);
        assert_eq!(close, Some(14.0));
    }

    /// A candle pushed through the sender is delivered by the async receive.
    #[tokio::test]
    async fn recv_returns_sent_candle() {
        let (mut adapter, sender) = StreamAdapter::new();
        sender
            .send(Candle {
                time: 2.0,
                open: 20.0,
                high: 25.0,
                low: 19.0,
                close: 24.0,
                volume: 200.0,
            })
            .unwrap();

        let time = adapter.recv().await.map(|candle| candle.time);
        assert_eq!(time, Some(2.0));
    }

    /// Once the only sender is gone, the async receive resolves to `None`.
    #[tokio::test]
    async fn recv_returns_none_when_sender_dropped() {
        let (mut adapter, sender) = StreamAdapter::new();
        drop(sender);

        let outcome = adapter.recv().await;
        assert!(outcome.is_none());
    }
}