wasmflow_output/
component_output.rs1use std::marker::PhantomData;
2
3use serde::de::DeserializeOwned;
4use tokio_stream::StreamExt;
5use wasmflow_packet::{Packet, PacketWrapper};
6use wasmflow_streams::PacketStream;
7use wasmflow_transport::TransportStream;
8
9use crate::error::Error;
10
11#[allow(missing_debug_implementations)]
13pub struct ComponentOutput {
14 packets: PacketStream,
15}
16
17impl ComponentOutput {
18 #[must_use]
20 pub fn new<S>(packets: S) -> Self
21 where
22 S: tokio_stream::Stream<Item = PacketWrapper> + Unpin + Send + Sync + 'static,
23 S: Sized,
24 {
25 Self {
26 packets: PacketStream::new(Box::new(packets)),
27 }
28 }
29
30 #[must_use]
32 pub fn new_from_ts(packets: TransportStream) -> Self {
33 Self {
34 packets: PacketStream::new(Box::new(
35 packets.map(|a| PacketWrapper::new_raw(a.port, a.payload.into())),
36 )),
37 }
38 }
39 pub async fn drain_port(&mut self, port: &str) -> Result<Vec<Packet>, Error> {
41 self
42 .packets
43 .take_port(port)
44 .await
45 .ok_or_else(|| Error::PortNotFound(port.to_owned()))
46 }
47}
48
49#[must_use]
51pub struct PortOutput<T: DeserializeOwned> {
52 name: String,
53 iter: Box<dyn Iterator<Item = Packet>>,
54 _data: PhantomData<T>,
55}
56
57impl<T: DeserializeOwned> std::fmt::Debug for PortOutput<T> {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 f.debug_struct("PortOutput").field("iter", &self.name).finish()
60 }
61}
62
63impl<T: DeserializeOwned> PortOutput<T> {
64 pub fn new(name: String, packets: Vec<Packet>) -> Self {
66 Self {
67 name,
68 iter: Box::new(packets.into_iter()),
69 _data: PhantomData,
70 }
71 }
72
73 pub fn deserialize_next(&mut self) -> Result<T, Error> {
75 match self.iter.next() {
76 Some(val) => Ok(
77 val
78 .deserialize()
79 .map_err(|e| crate::error::Error::Codec(e.to_string()))?,
80 ),
81 None => Err(crate::error::Error::EndOfOutput(self.name.clone())),
82 }
83 }
84}
85
86impl<T: DeserializeOwned> Iterator for PortOutput<T> {
87 type Item = Packet;
88
89 fn next(&mut self) -> Option<Self::Item> {
90 self.iter.next()
91 }
92}