wick_packet/
packet_stream.rs

1use std::pin::Pin;
2use std::task::Poll;
3
4use pin_project_lite::pin_project;
5use tokio_stream::Stream;
6use tracing::{span_enabled, Span};
7use wasmrs_rx::FluxChannel;
8
9use crate::{ContextTransport, InherentData, Packet, PacketExt, Result, RuntimeConfig};
10
11pub type PacketSender = FluxChannel<Packet, crate::Error>;
12
13type ContextConfig = (RuntimeConfig, InherentData);
14
15#[cfg(target_family = "wasm")]
16/// A Pin<Box<Stream>> of `T`.
17pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
18#[cfg(not(target_family = "wasm"))]
19/// A Pin<Box<Stream>> of `T`.
20pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
21
#[cfg(target_family = "wasm")]
pin_project! {
  /// A stream of [Packet]s
  ///
  /// `wasm` variant: the boxed inner stream is not required to be `Send`.
  #[must_use]
  pub struct PacketStream {
    // The wrapped source of packets.
    #[pin]
    inner: Box<dyn Stream<Item = Result<Packet>> + Unpin>,
    // Context that has been set but not yet attached to a packet, if any.
    config: Option<ContextConfig>,
    // Span that packet trace events are emitted in.
    span: Span,
  }
}
33
34#[cfg(not(target_family = "wasm"))]
35pin_project! {
36  /// A stream of [Packet]s
37  #[must_use]
38  pub struct PacketStream {
39    #[pin]
40    inner: Box<dyn Stream<Item = Result<Packet>> + Send + Unpin>,
41    config: Option<ContextConfig>,
42    span: Span
43  }
44}
45
46impl Default for PacketStream {
47  fn default() -> Self {
48    PacketStream::empty()
49  }
50}
51
52impl From<BoxStream<Result<Packet>>> for PacketStream {
53  fn from(stream: BoxStream<Result<Packet>>) -> Self {
54    Self::new(stream)
55  }
56}
57
58impl From<Vec<Packet>> for PacketStream {
59  fn from(iter: Vec<Packet>) -> Self {
60    Self::new(Box::new(tokio_stream::iter(iter.into_iter().map(Ok))))
61  }
62}
63
64impl PacketStream {
65  #[cfg(target_family = "wasm")]
66  pub fn new(rx: impl Stream<Item = Result<Packet>> + Unpin + 'static) -> Self {
67    Self {
68      inner: Box::new(tokio_stream::StreamExt::fuse(rx)),
69      config: Default::default(),
70      span: Span::current(),
71    }
72  }
73
74  #[cfg(not(target_family = "wasm"))]
75  pub fn new<T: Stream<Item = Result<Packet>> + Unpin + Send + 'static>(rx: T) -> Self {
76    use tokio_stream::StreamExt;
77
78    Self {
79      inner: Box::new(rx.fuse()),
80      config: Default::default(),
81      span: Span::current(),
82    }
83  }
84
85  pub fn noop() -> Self {
86    Self::new(Box::new(tokio_stream::once(Ok(Packet::no_input()))))
87  }
88
89  pub fn set_span(&mut self, span: Span) {
90    self.span = span;
91  }
92
93  pub fn set_context(&mut self, context: RuntimeConfig, inherent: InherentData) {
94    self.config.replace((context, inherent));
95  }
96
97  pub fn new_channels() -> (PacketSender, Self) {
98    let (flux, rx) = FluxChannel::new_parts();
99    (flux, Self::new(Box::new(rx)))
100  }
101
102  pub fn empty() -> Self {
103    Self::new(Box::new(tokio_stream::empty()))
104  }
105}
106
107impl std::fmt::Debug for PacketStream {
108  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109    f.debug_tuple("PacketStream").finish()
110  }
111}
112
113impl Stream for PacketStream {
114  type Item = Result<Packet>;
115
116  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
117    let mut this = self;
118    let mut this = Pin::new(&mut this);
119    let config = this.config.take();
120    let poll = { Pin::new(&mut *this.inner).poll_next(cx) };
121
122    // Backwards compatibility note:
123    // This is a hack added when context & operation configuration was introduced.
124    // Rather than send it as a beginning packet, it's added as a sidecar to an existing packet and new
125    // components expect it to exist in the first packet they receive.
126    if let Some(config) = config {
127      match poll {
128        Poll::Ready(Some(Ok(mut packet))) => {
129          packet.set_context(
130            wasmrs_codec::messagepack::serialize(&ContextTransport::new(config.0, config.1))
131              .unwrap()
132              .into(),
133          );
134          tracing::trace!("attached context to packet on port '{}'", packet.port());
135          if cfg!(debug_assertions) {
136            this.span.in_scope(|| {
137              if span_enabled!(tracing::Level::TRACE) {
138                let debug_packet = packet
139                  .clone()
140                  .decode_value()
141                  .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
142                let until = std::cmp::min(debug_packet.len(), 2048);
143                this.span.in_scope(|| {
144                  tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
145                });
146              }
147            });
148          }
149          Poll::Ready(Some(Ok(packet)))
150        }
151        x => {
152          this.config.replace(config);
153          x
154        }
155      }
156    } else {
157      if let Poll::Ready(Some(Ok(packet))) = &poll {
158        if cfg!(debug_assertions) {
159          this.span.in_scope(|| {
160            if span_enabled!(tracing::Level::TRACE) {
161              let debug_packet = packet
162                .clone()
163                .decode_value()
164                .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
165              let until = std::cmp::min(debug_packet.len(), 2048);
166              this.span.in_scope(|| {
167                tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
168              });
169            }
170          });
171        }
172      }
173      poll
174    }
175  }
176}
177
178pub fn into_packet<N: Into<String>, T: serde::Serialize>(
179  name: N,
180) -> Box<dyn FnMut(anyhow::Result<T>) -> Result<Packet>> {
181  let name = name.into();
182  Box::new(move |x| Ok(x.map_or_else(|e| Packet::err(&name, e.to_string()), |x| Packet::encode(&name, &x))))
183}