// wick_packet/packet_stream.rs
use std::pin::Pin;
2use std::task::Poll;
3
4use pin_project_lite::pin_project;
5use tokio_stream::Stream;
6use tracing::{span_enabled, Span};
7use wasmrs_rx::FluxChannel;
8
9use crate::{ContextTransport, InherentData, Packet, PacketExt, Result, RuntimeConfig};
10
11pub type PacketSender = FluxChannel<Packet, crate::Error>;
12
13type ContextConfig = (RuntimeConfig, InherentData);
14
15#[cfg(target_family = "wasm")]
16pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
18#[cfg(not(target_family = "wasm"))]
19pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
21
#[cfg(target_family = "wasm")]
pin_project! {
  /// A stream of `Result<Packet>` items backed by a boxed inner stream (WebAssembly targets).
  ///
  /// Besides the inner stream it carries an optional pending context and the tracing span used
  /// when packets are logged.
  #[must_use]
  pub struct PacketStream {
    // The type-erased stream that packets are pulled from.
    #[pin]
    inner: Box<dyn Stream<Item = Result<Packet>> + Unpin>,
    // Context waiting to be attached to a packet; `None` when nothing is pending.
    config: Option<ContextConfig>,
    // Span that packet trace events are recorded in.
    span: Span
  }
}
33
34#[cfg(not(target_family = "wasm"))]
35pin_project! {
36 #[must_use]
38 pub struct PacketStream {
39 #[pin]
40 inner: Box<dyn Stream<Item = Result<Packet>> + Send + Unpin>,
41 config: Option<ContextConfig>,
42 span: Span
43 }
44}
45
46impl Default for PacketStream {
47 fn default() -> Self {
48 PacketStream::empty()
49 }
50}
51
52impl From<BoxStream<Result<Packet>>> for PacketStream {
53 fn from(stream: BoxStream<Result<Packet>>) -> Self {
54 Self::new(stream)
55 }
56}
57
58impl From<Vec<Packet>> for PacketStream {
59 fn from(iter: Vec<Packet>) -> Self {
60 Self::new(Box::new(tokio_stream::iter(iter.into_iter().map(Ok))))
61 }
62}
63
64impl PacketStream {
65 #[cfg(target_family = "wasm")]
66 pub fn new(rx: impl Stream<Item = Result<Packet>> + Unpin + 'static) -> Self {
67 Self {
68 inner: Box::new(tokio_stream::StreamExt::fuse(rx)),
69 config: Default::default(),
70 span: Span::current(),
71 }
72 }
73
74 #[cfg(not(target_family = "wasm"))]
75 pub fn new<T: Stream<Item = Result<Packet>> + Unpin + Send + 'static>(rx: T) -> Self {
76 use tokio_stream::StreamExt;
77
78 Self {
79 inner: Box::new(rx.fuse()),
80 config: Default::default(),
81 span: Span::current(),
82 }
83 }
84
85 pub fn noop() -> Self {
86 Self::new(Box::new(tokio_stream::once(Ok(Packet::no_input()))))
87 }
88
89 pub fn set_span(&mut self, span: Span) {
90 self.span = span;
91 }
92
93 pub fn set_context(&mut self, context: RuntimeConfig, inherent: InherentData) {
94 self.config.replace((context, inherent));
95 }
96
97 pub fn new_channels() -> (PacketSender, Self) {
98 let (flux, rx) = FluxChannel::new_parts();
99 (flux, Self::new(Box::new(rx)))
100 }
101
102 pub fn empty() -> Self {
103 Self::new(Box::new(tokio_stream::empty()))
104 }
105}
106
107impl std::fmt::Debug for PacketStream {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_tuple("PacketStream").finish()
110 }
111}
112
113impl Stream for PacketStream {
114 type Item = Result<Packet>;
115
116 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
117 let mut this = self;
118 let mut this = Pin::new(&mut this);
119 let config = this.config.take();
120 let poll = { Pin::new(&mut *this.inner).poll_next(cx) };
121
122 if let Some(config) = config {
127 match poll {
128 Poll::Ready(Some(Ok(mut packet))) => {
129 packet.set_context(
130 wasmrs_codec::messagepack::serialize(&ContextTransport::new(config.0, config.1))
131 .unwrap()
132 .into(),
133 );
134 tracing::trace!("attached context to packet on port '{}'", packet.port());
135 if cfg!(debug_assertions) {
136 this.span.in_scope(|| {
137 if span_enabled!(tracing::Level::TRACE) {
138 let debug_packet = packet
139 .clone()
140 .decode_value()
141 .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
142 let until = std::cmp::min(debug_packet.len(), 2048);
143 this.span.in_scope(|| {
144 tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
145 });
146 }
147 });
148 }
149 Poll::Ready(Some(Ok(packet)))
150 }
151 x => {
152 this.config.replace(config);
153 x
154 }
155 }
156 } else {
157 if let Poll::Ready(Some(Ok(packet))) = &poll {
158 if cfg!(debug_assertions) {
159 this.span.in_scope(|| {
160 if span_enabled!(tracing::Level::TRACE) {
161 let debug_packet = packet
162 .clone()
163 .decode_value()
164 .map_or_else(|_| format!("{:?}", packet.payload()), |j| j.to_string());
165 let until = std::cmp::min(debug_packet.len(), 2048);
166 this.span.in_scope(|| {
167 tracing::trace!(flags=packet.flags(), port=packet.port(), packet=%&debug_packet[..until], "packet");
168 });
169 }
170 });
171 }
172 }
173 poll
174 }
175 }
176}
177
178pub fn into_packet<N: Into<String>, T: serde::Serialize>(
179 name: N,
180) -> Box<dyn FnMut(anyhow::Result<T>) -> Result<Packet>> {
181 let name = name.into();
182 Box::new(move |x| Ok(x.map_or_else(|e| Packet::err(&name, e.to_string()), |x| Packet::encode(&name, &x))))
183}