wasmflow_streams/streams/
packet_stream.rs

1use std::collections::HashMap;
2use std::pin::Pin;
3
4use futures::{Stream, StreamExt};
5use pin_project_lite::pin_project;
6use wasmflow_packet::{Packet, PacketWrapper};
7
pin_project! {
  /// A stream of [PacketWrapper]s
  ///
  /// Wraps a boxed packet stream, lets callers add extra packets by hand, and can
  /// drain the wrapped stream into a per-port buffer.
  #[must_use]
  pub struct PacketStream {
    // Set once the wrapped stream has been drained into `buffer`.
    finished: bool,
    // Payloads grouped by port name. `None` until the wrapped stream has been
    // drained, and `None` again after `as_map` has taken the map.
    buffer: Option<HashMap<String, Vec<Packet>>>,
    // Packets added manually through `push`.
    addons: Vec<PacketWrapper>,
    // The wrapped source of packets.
    #[pin]
    stream: Box<dyn Stream<Item = PacketWrapper> + Unpin + Send + Sync>,
  }
}
19
20impl std::fmt::Debug for PacketStream {
21  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22    f.debug_struct("PacketStream").finish()
23  }
24}
25
26impl PacketStream {
27  /// Instantiate a new [PacketStream]
28  pub fn new(stream: Box<dyn Stream<Item = PacketWrapper> + Unpin + Send + Sync>) -> Self {
29    Self {
30      stream,
31      buffer: None,
32      addons: Vec::new(),
33      finished: false,
34    }
35  }
36
37  /// Manually add a packet to the stream.
38  pub fn push(&mut self, packet: PacketWrapper) {
39    self.addons.push(packet);
40  }
41
42  async fn buffer(&mut self) {
43    if !self.finished {
44      let mut map = HashMap::new();
45      while let Some(next) = self.stream.next().await {
46        let entry = map.entry(next.port).or_insert_with(Vec::<Packet>::new);
47        if !next.payload.is_signal() {
48          entry.push(next.payload);
49        }
50      }
51      self.buffer = Some(map);
52      self.finished = true;
53    }
54  }
55
56  /// Wait for a packet stream to complete and return all packets for the specified port.
57  pub async fn take_port(&mut self, port: &str) -> Option<Vec<Packet>> {
58    self.buffer().await;
59    let buffer = self.buffer.as_mut().unwrap();
60    buffer.remove(port)
61  }
62
63  /// Wait for a packet stream to complete and return the packets mapped to their output port.
64  pub async fn as_map(&mut self) -> Result<HashMap<String, Vec<Packet>>, crate::error::Error> {
65    if self.finished {
66      Err(crate::error::Error::Closed)
67    } else {
68      self.buffer().await;
69      self.buffer.take().ok_or(crate::error::Error::Closed)
70    }
71  }
72}
73
74impl Stream for PacketStream {
75  type Item = PacketWrapper;
76
77  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
78    let this = self.project();
79    if !this.addons.is_empty() {
80      let packet = this.addons.pop().unwrap();
81      std::task::Poll::Ready(Some(packet))
82    } else {
83      this.stream.poll_next(cx)
84    }
85  }
86
87  fn size_hint(&self) -> (usize, Option<usize>) {
88    let (a, b) = self.stream.size_hint();
89    (a, b.map(|v| v + self.addons.len()))
90  }
91}