Skip to main content

varpulis_runtime/
stream.rs

1//! Stream abstraction for the runtime
2
3use std::collections::VecDeque;
4
5use tokio::sync::mpsc;
6
7use crate::event::Event;
8
9/// A stream of events
10#[derive(Debug)]
11pub struct Stream {
12    pub name: String,
13    receiver: mpsc::Receiver<Event>,
14    buffer: VecDeque<Event>,
15}
16
17impl Stream {
18    pub fn new(name: impl Into<String>, receiver: mpsc::Receiver<Event>) -> Self {
19        Self {
20            name: name.into(),
21            receiver,
22            buffer: VecDeque::new(),
23        }
24    }
25
26    pub async fn next(&mut self) -> Option<Event> {
27        if let Some(event) = self.buffer.pop_front() {
28            return Some(event);
29        }
30        self.receiver.recv().await
31    }
32
33    pub fn push_back(&mut self, event: Event) {
34        self.buffer.push_back(event);
35    }
36}
37
38/// Stream sender for producing events
39#[derive(Debug)]
40pub struct StreamSender {
41    pub name: String,
42    sender: mpsc::Sender<Event>,
43}
44
45impl StreamSender {
46    pub fn new(name: impl Into<String>, sender: mpsc::Sender<Event>) -> Self {
47        Self {
48            name: name.into(),
49            sender,
50        }
51    }
52
53    pub async fn send(&self, event: Event) -> Result<(), mpsc::error::SendError<Event>> {
54        self.sender.send(event).await
55    }
56}
57
58/// Create a stream channel pair
59pub fn channel(name: impl Into<String>, buffer: usize) -> (StreamSender, Stream) {
60    let name = name.into();
61    let (tx, rx) = mpsc::channel(buffer);
62    (StreamSender::new(name.clone(), tx), Stream::new(name, rx))
63}
64
65#[cfg(test)]
66mod tests {
67    use super::*;
68
69    #[tokio::test]
70    async fn test_stream_channel() {
71        let (sender, stream) = channel("test_stream", 10);
72
73        assert_eq!(sender.name, "test_stream");
74        assert_eq!(stream.name, "test_stream");
75    }
76
77    #[tokio::test]
78    async fn test_stream_send_receive() {
79        let (sender, mut stream) = channel("test", 10);
80
81        let event = Event::new("TestEvent").with_field("id", 1i64);
82        sender.send(event).await.unwrap();
83
84        let received = stream.next().await.unwrap();
85        assert_eq!(&*received.event_type, "TestEvent");
86        assert_eq!(received.get_int("id"), Some(1));
87    }
88
89    #[tokio::test]
90    async fn test_stream_push_back() {
91        let (_sender, mut stream) = channel("test", 10);
92
93        // Push events into buffer
94        stream.push_back(Event::new("First"));
95        stream.push_back(Event::new("Second"));
96
97        // Should receive from buffer first
98        let first = stream.next().await.unwrap();
99        assert_eq!(&*first.event_type, "First");
100
101        let second = stream.next().await.unwrap();
102        assert_eq!(&*second.event_type, "Second");
103    }
104
105    #[tokio::test]
106    async fn test_stream_buffer_then_channel() {
107        let (sender, mut stream) = channel("test", 10);
108
109        // Push to buffer
110        stream.push_back(Event::new("Buffered"));
111
112        // Send via channel
113        sender.send(Event::new("FromChannel")).await.unwrap();
114
115        // Buffer first
116        let first = stream.next().await.unwrap();
117        assert_eq!(&*first.event_type, "Buffered");
118
119        // Then channel
120        let second = stream.next().await.unwrap();
121        assert_eq!(&*second.event_type, "FromChannel");
122    }
123
124    #[tokio::test]
125    async fn test_stream_multiple_events() {
126        let (sender, mut stream) = channel("test", 100);
127
128        for i in 0..10 {
129            sender
130                .send(Event::new("Event").with_field("id", i as i64))
131                .await
132                .unwrap();
133        }
134
135        for i in 0..10 {
136            let event = stream.next().await.unwrap();
137            assert_eq!(event.get_int("id"), Some(i));
138        }
139    }
140
141    #[tokio::test]
142    async fn test_stream_closed() {
143        let (sender, mut stream) = channel("test", 10);
144
145        sender.send(Event::new("Last")).await.unwrap();
146        drop(sender); // Close the channel
147
148        let event = stream.next().await.unwrap();
149        assert_eq!(&*event.event_type, "Last");
150
151        // Next call should return None (channel closed)
152        assert!(stream.next().await.is_none());
153    }
154
155    #[tokio::test]
156    async fn test_stream_sender_name() {
157        let (sender, _stream) = channel("sender_test", 10);
158        assert_eq!(sender.name, "sender_test");
159    }
160
161    #[tokio::test]
162    async fn test_stream_buffer_order() {
163        let (_sender, mut stream) = channel("test", 10);
164
165        // Push events in specific order
166        stream.push_back(Event::new("A"));
167        stream.push_back(Event::new("B"));
168        stream.push_back(Event::new("C"));
169
170        // Should receive in FIFO order
171        assert_eq!(&*stream.next().await.unwrap().event_type, "A");
172        assert_eq!(&*stream.next().await.unwrap().event_type, "B");
173        assert_eq!(&*stream.next().await.unwrap().event_type, "C");
174    }
175
176    #[tokio::test]
177    async fn test_stream_sender_closed_error() {
178        let (sender, stream) = channel("test", 10);
179        drop(stream); // Close receiver
180
181        let event = Event::new("Test");
182        let result = sender.send(event).await;
183        assert!(result.is_err());
184    }
185
186    #[tokio::test]
187    async fn test_stream_large_buffer() {
188        let (sender, mut stream) = channel("test", 1000);
189
190        // Send many events
191        for i in 0..100 {
192            sender
193                .send(Event::new("Event").with_field("seq", i as i64))
194                .await
195                .unwrap();
196        }
197
198        // Receive all events
199        for i in 0..100 {
200            let event = stream.next().await.unwrap();
201            assert_eq!(event.get_int("seq"), Some(i));
202        }
203    }
204
205    #[tokio::test]
206    async fn test_stream_interleaved_buffer_and_channel() {
207        let (sender, mut stream) = channel("test", 10);
208
209        // Push one to buffer
210        stream.push_back(Event::new("Buf1"));
211
212        // Send one via channel
213        sender.send(Event::new("Chan1")).await.unwrap();
214
215        // Push another to buffer
216        stream.push_back(Event::new("Buf2"));
217
218        // Send another via channel
219        sender.send(Event::new("Chan2")).await.unwrap();
220
221        // Buffer events first (in order pushed)
222        assert_eq!(&*stream.next().await.unwrap().event_type, "Buf1");
223        assert_eq!(&*stream.next().await.unwrap().event_type, "Buf2");
224
225        // Then channel events (in order sent)
226        assert_eq!(&*stream.next().await.unwrap().event_type, "Chan1");
227        assert_eq!(&*stream.next().await.unwrap().event_type, "Chan2");
228    }
229
230    #[test]
231    fn test_stream_sender_new_directly() {
232        let (tx, _rx) = mpsc::channel(10);
233        let sender = StreamSender::new("direct", tx);
234        assert_eq!(sender.name, "direct");
235    }
236
237    #[test]
238    fn test_stream_new_directly() {
239        let (_tx, rx) = mpsc::channel(10);
240        let stream = Stream::new("direct_stream", rx);
241        assert_eq!(stream.name, "direct_stream");
242    }
243}