wasmflow_output/
component_output.rs

1use std::marker::PhantomData;
2
3use serde::de::DeserializeOwned;
4use tokio_stream::StreamExt;
5use wasmflow_packet::{Packet, PacketWrapper};
6use wasmflow_streams::PacketStream;
7use wasmflow_transport::TransportStream;
8
9use crate::error::Error;
10
/// A wrapper object for the packets returned from the collection call.
///
/// Holds the [`PacketStream`] built by [`ComponentOutput::new`] or
/// [`ComponentOutput::new_from_ts`].
// NOTE(review): `Debug` is opted out here, presumably because `PacketStream`
// does not implement it — confirm before removing the allow.
#[allow(missing_debug_implementations)]
pub struct ComponentOutput {
  /// The stream of packets this output reads from.
  packets: PacketStream,
}
16
17impl ComponentOutput {
18  /// Initialize a [ComponentOutput] with a [Vec<TransportWrapper>]
19  #[must_use]
20  pub fn new<S>(packets: S) -> Self
21  where
22    S: tokio_stream::Stream<Item = PacketWrapper> + Unpin + Send + Sync + 'static,
23    S: Sized,
24  {
25    Self {
26      packets: PacketStream::new(Box::new(packets)),
27    }
28  }
29
30  /// Initialize a [ComponentOutput] with a [TransportStream]
31  #[must_use]
32  pub fn new_from_ts(packets: TransportStream) -> Self {
33    Self {
34      packets: PacketStream::new(Box::new(
35        packets.map(|a| PacketWrapper::new_raw(a.port, a.payload.into())),
36      )),
37    }
38  }
39  /// Get a list of [wasmflow_transport::MessageTransport] from the specified port.
40  pub async fn drain_port(&mut self, port: &str) -> Result<Vec<Packet>, Error> {
41    self
42      .packets
43      .take_port(port)
44      .await
45      .ok_or_else(|| Error::PortNotFound(port.to_owned()))
46  }
47}
48
/// Iterator wrapper for a named list of [`Packet`]s.
///
/// `T` is the type that `deserialize_next` decodes each packet into. No value of
/// `T` is stored, hence the [`PhantomData`] marker field.
#[must_use]
pub struct PortOutput<T: DeserializeOwned> {
  /// Label for this output (presumably the port name — confirm against callers).
  /// It is what the `Debug` impl prints and what the end-of-output error carries.
  name: String,
  /// The remaining packets, type-erased behind a boxed iterator.
  iter: Box<dyn Iterator<Item = Packet>>,
  /// Ties the struct to `T` without holding a `T`.
  _data: PhantomData<T>,
}
56
57impl<T: DeserializeOwned> std::fmt::Debug for PortOutput<T> {
58  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59    f.debug_struct("PortOutput").field("iter", &self.name).finish()
60  }
61}
62
63impl<T: DeserializeOwned> PortOutput<T> {
64  /// Constructor for [PortOutput] that takes a list of [wasmflow_transport::MessageTransport]
65  pub fn new(name: String, packets: Vec<Packet>) -> Self {
66    Self {
67      name,
68      iter: Box::new(packets.into_iter()),
69      _data: PhantomData,
70    }
71  }
72
73  /// Grab the next value and deserialize it in one method.
74  pub fn deserialize_next(&mut self) -> Result<T, Error> {
75    match self.iter.next() {
76      Some(val) => Ok(
77        val
78          .deserialize()
79          .map_err(|e| crate::error::Error::Codec(e.to_string()))?,
80      ),
81      None => Err(crate::error::Error::EndOfOutput(self.name.clone())),
82    }
83  }
84}
85
86impl<T: DeserializeOwned> Iterator for PortOutput<T> {
87  type Item = Packet;
88
89  fn next(&mut self) -> Option<Self::Item> {
90    self.iter.next()
91  }
92}