wasmflow_streams/streams/
packet_stream.rs1use std::collections::HashMap;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use pin_project_lite::pin_project;
6use wasmflow_packet::{Packet, PacketWrapper};
7
8pin_project! {
9 #[must_use]
11 pub struct PacketStream {
12 finished: bool,
13 buffer: Option<HashMap<String, Vec<Packet>>>,
14 addons: Vec<PacketWrapper>,
15 #[pin]
16 stream: Box<dyn Stream<Item = PacketWrapper> + Unpin + Send + Sync>,
17 }
18}
19
20impl std::fmt::Debug for PacketStream {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("PacketStream").finish()
23 }
24}
25
26impl PacketStream {
27 pub fn new(stream: Box<dyn Stream<Item = PacketWrapper> + Unpin + Send + Sync>) -> Self {
29 Self {
30 stream,
31 buffer: None,
32 addons: Vec::new(),
33 finished: false,
34 }
35 }
36
37 pub fn push(&mut self, packet: PacketWrapper) {
39 self.addons.push(packet);
40 }
41
42 async fn buffer(&mut self) {
43 if !self.finished {
44 let mut map = HashMap::new();
45 while let Some(next) = self.stream.next().await {
46 let entry = map.entry(next.port).or_insert_with(Vec::<Packet>::new);
47 if !next.payload.is_signal() {
48 entry.push(next.payload);
49 }
50 }
51 self.buffer = Some(map);
52 self.finished = true;
53 }
54 }
55
56 pub async fn take_port(&mut self, port: &str) -> Option<Vec<Packet>> {
58 self.buffer().await;
59 let buffer = self.buffer.as_mut().unwrap();
60 buffer.remove(port)
61 }
62
63 pub async fn as_map(&mut self) -> Result<HashMap<String, Vec<Packet>>, crate::error::Error> {
65 if self.finished {
66 Err(crate::error::Error::Closed)
67 } else {
68 self.buffer().await;
69 self.buffer.take().ok_or(crate::error::Error::Closed)
70 }
71 }
72}
73
74impl Stream for PacketStream {
75 type Item = PacketWrapper;
76
77 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
78 let this = self.project();
79 if !this.addons.is_empty() {
80 let packet = this.addons.pop().unwrap();
81 std::task::Poll::Ready(Some(packet))
82 } else {
83 this.stream.poll_next(cx)
84 }
85 }
86
87 fn size_hint(&self) -> (usize, Option<usize>) {
88 let (a, b) = self.stream.size_hint();
89 (a, b.map(|v| v + self.addons.len()))
90 }
91}