varpulis_runtime/
stream.rs1use std::collections::VecDeque;
4
5use tokio::sync::mpsc;
6
7use crate::event::Event;
8
9#[derive(Debug)]
11pub struct Stream {
12 pub name: String,
13 receiver: mpsc::Receiver<Event>,
14 buffer: VecDeque<Event>,
15}
16
17impl Stream {
18 pub fn new(name: impl Into<String>, receiver: mpsc::Receiver<Event>) -> Self {
19 Self {
20 name: name.into(),
21 receiver,
22 buffer: VecDeque::new(),
23 }
24 }
25
26 pub async fn next(&mut self) -> Option<Event> {
27 if let Some(event) = self.buffer.pop_front() {
28 return Some(event);
29 }
30 self.receiver.recv().await
31 }
32
33 pub fn push_back(&mut self, event: Event) {
34 self.buffer.push_back(event);
35 }
36}
37
38#[derive(Debug)]
40pub struct StreamSender {
41 pub name: String,
42 sender: mpsc::Sender<Event>,
43}
44
45impl StreamSender {
46 pub fn new(name: impl Into<String>, sender: mpsc::Sender<Event>) -> Self {
47 Self {
48 name: name.into(),
49 sender,
50 }
51 }
52
53 pub async fn send(&self, event: Event) -> Result<(), mpsc::error::SendError<Event>> {
54 self.sender.send(event).await
55 }
56}
57
58pub fn channel(name: impl Into<String>, buffer: usize) -> (StreamSender, Stream) {
60 let name = name.into();
61 let (tx, rx) = mpsc::channel(buffer);
62 (StreamSender::new(name.clone(), tx), Stream::new(name, rx))
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    /// Both halves produced by `channel` carry the requested name.
    #[tokio::test]
    async fn test_stream_channel() {
        let (tx, rx) = channel("test_stream", 10);

        assert_eq!(tx.name, "test_stream");
        assert_eq!(rx.name, "test_stream");
    }

    /// An event sent through the sender arrives intact on the stream.
    #[tokio::test]
    async fn test_stream_send_receive() {
        let (tx, mut rx) = channel("test", 10);

        let outgoing = Event::new("TestEvent").with_field("id", 1i64);
        tx.send(outgoing).await.unwrap();

        let incoming = rx.next().await.unwrap();
        assert_eq!(&*incoming.event_type, "TestEvent");
        assert_eq!(incoming.get_int("id"), Some(1));
    }

    /// Events pushed into the local buffer come back in insertion order.
    #[tokio::test]
    async fn test_stream_push_back() {
        let (_tx, mut rx) = channel("test", 10);

        rx.push_back(Event::new("First"));
        rx.push_back(Event::new("Second"));

        let head = rx.next().await.unwrap();
        assert_eq!(&*head.event_type, "First");

        let tail = rx.next().await.unwrap();
        assert_eq!(&*tail.event_type, "Second");
    }

    /// A buffered event is served before one waiting in the channel.
    #[tokio::test]
    async fn test_stream_buffer_then_channel() {
        let (tx, mut rx) = channel("test", 10);

        rx.push_back(Event::new("Buffered"));

        tx.send(Event::new("FromChannel")).await.unwrap();

        let from_buffer = rx.next().await.unwrap();
        assert_eq!(&*from_buffer.event_type, "Buffered");

        let from_channel = rx.next().await.unwrap();
        assert_eq!(&*from_channel.event_type, "FromChannel");
    }

    /// Ten events sent in sequence are received in the same sequence.
    #[tokio::test]
    async fn test_stream_multiple_events() {
        let (tx, mut rx) = channel("test", 100);

        for id in 0..10_i64 {
            let outgoing = Event::new("Event").with_field("id", id);
            tx.send(outgoing).await.unwrap();
        }

        for expected in 0..10 {
            let incoming = rx.next().await.unwrap();
            assert_eq!(incoming.get_int("id"), Some(expected));
        }
    }

    /// After the sender is dropped, pending events drain and then `None` follows.
    #[tokio::test]
    async fn test_stream_closed() {
        let (tx, mut rx) = channel("test", 10);

        tx.send(Event::new("Last")).await.unwrap();
        drop(tx);

        let last = rx.next().await.unwrap();
        assert_eq!(&*last.event_type, "Last");

        assert!(rx.next().await.is_none());
    }

    /// The sender half exposes the name passed to `channel`.
    #[tokio::test]
    async fn test_stream_sender_name() {
        let (tx, _rx) = channel("sender_test", 10);
        assert_eq!(tx.name, "sender_test");
    }

    /// The local buffer behaves as a FIFO queue.
    #[tokio::test]
    async fn test_stream_buffer_order() {
        let (_tx, mut rx) = channel("test", 10);

        rx.push_back(Event::new("A"));
        rx.push_back(Event::new("B"));
        rx.push_back(Event::new("C"));

        for expected in ["A", "B", "C"].iter() {
            assert_eq!(&*rx.next().await.unwrap().event_type, *expected);
        }
    }

    /// Sending after the stream half is dropped reports an error.
    #[tokio::test]
    async fn test_stream_sender_closed_error() {
        let (tx, rx) = channel("test", 10);
        drop(rx);

        let orphan = Event::new("Test");
        let outcome = tx.send(orphan).await;
        assert!(outcome.is_err());
    }

    /// A large channel capacity still preserves ordering for 100 events.
    #[tokio::test]
    async fn test_stream_large_buffer() {
        let (tx, mut rx) = channel("test", 1000);

        for seq in 0..100_i64 {
            let outgoing = Event::new("Event").with_field("seq", seq);
            tx.send(outgoing).await.unwrap();
        }

        for expected in 0..100 {
            let incoming = rx.next().await.unwrap();
            assert_eq!(incoming.get_int("seq"), Some(expected));
        }
    }

    /// All buffered events are served before any channel event, regardless of
    /// the order in which they were queued.
    #[tokio::test]
    async fn test_stream_interleaved_buffer_and_channel() {
        let (tx, mut rx) = channel("test", 10);

        rx.push_back(Event::new("Buf1"));

        tx.send(Event::new("Chan1")).await.unwrap();

        rx.push_back(Event::new("Buf2"));

        tx.send(Event::new("Chan2")).await.unwrap();

        // Buffer contents first, then channel contents, each in FIFO order.
        for expected in ["Buf1", "Buf2", "Chan1", "Chan2"].iter() {
            assert_eq!(&*rx.next().await.unwrap().event_type, *expected);
        }
    }

    /// `StreamSender::new` can be used without going through `channel`.
    #[test]
    fn test_stream_sender_new_directly() {
        let (raw_tx, _raw_rx) = mpsc::channel(10);
        let tx = StreamSender::new("direct", raw_tx);
        assert_eq!(tx.name, "direct");
    }

    /// `Stream::new` can be used without going through `channel`.
    #[test]
    fn test_stream_new_directly() {
        let (_raw_tx, raw_rx) = mpsc::channel(10);
        let rx = Stream::new("direct_stream", raw_rx);
        assert_eq!(rx.name, "direct_stream");
    }
}