elfo_core/stream.rs
1use std::{
2 any::Any,
3 future::Future,
4 pin::Pin,
5 task::{self, Poll},
6};
7
8use futures::{self, channel::mpsc, sink::SinkExt as _, stream, stream::StreamExt as _};
9use pin_project::pin_project;
10use sealed::sealed;
11
12use crate::{
13 envelope::{Envelope, MessageKind},
14 message::{AnyMessage, Message},
15 scope::{self, Scope},
16 source::{SourceArc, SourceStream, UnattachedSource, UntypedSourceArc},
17 tracing::TraceId,
18 Addr,
19};
20
21// === Stream ===
22
23/// A source that emits messages from a provided stream or future.
24///
25/// Possible items of a stream (the `M` parameter):
26/// * Any instance of [`Message`].
27/// * `Result<impl Message, impl Message>`.
28///
29/// Note: the `new()` constructor is reserved until `AsyncIterator` is
30/// [stabilized](https://github.com/rust-lang/rust/issues/79024).
31///
32/// All wrapped streams and futures are fused by the implementation.
33///
34/// Note: [`Stream::is_terminated()`] and [`Stream::terminate()`]
35/// cannot be called *inside* the stream, because it leads to a deadlock.
36///
37/// [`Stream::is_terminated()`]: crate::source::SourceHandle::is_terminated()
38/// [`Stream::terminate()`]: crate::source::SourceHandle::terminate()
39///
40/// # Tracing
41///
42/// * If the stream created using [`Stream::from_futures03()`], every message
43/// starts a new trace.
44/// * If created using [`Stream::once()`], the current trace is preserved.
45/// * If created using [`Stream::generate()`], the current trace is preserved.
46///
47/// You can always use [`scope::set_trace_id()`] to override the current trace.
48///
49/// # Examples
50///
51/// Create a stream based on [`futures::Stream`]:
52/// ```
53/// # use elfo_core as elfo;
54/// # async fn exec(mut ctx: elfo::Context) {
55/// # use elfo::{message, msg};
56/// use elfo::stream::Stream;
57///
58/// #[message]
59/// struct MyItem(u32);
60///
61/// let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
62/// ctx.attach(Stream::from_futures03(stream));
63///
64/// while let Some(envelope) = ctx.recv().await {
65/// msg!(match envelope {
66/// MyItem => { /* ... */ },
67/// });
68/// }
69/// # }
70/// ```
71///
72/// Perform a background request:
73/// ```
74/// # use elfo_core as elfo;
75/// # async fn exec(mut ctx: elfo::Context) {
76/// # use elfo::{message, msg};
77/// # #[message]
78/// # struct SomeEvent;
79/// use elfo::stream::Stream;
80///
81/// #[message]
82/// struct DataFetched(u32);
83///
84/// #[message]
85/// struct FetchDataFailed(String);
86///
87/// async fn fetch_data() -> Result<DataFetched, FetchDataFailed> {
88/// // ...
89/// # todo!()
90/// }
91///
92/// while let Some(envelope) = ctx.recv().await {
93/// msg!(match envelope {
94/// SomeEvent => {
95/// ctx.attach(Stream::once(fetch_data()));
96/// },
97/// DataFetched => { /* ... */ },
98/// FetchDataFailed => { /* ... */ },
99/// });
100/// }
101/// # }
102/// ```
103///
104/// Generate a stream (an alternative to `async-stream`):
105/// ```
106/// # use elfo_core as elfo;
107/// # async fn exec(mut ctx: elfo::Context) {
108/// # use elfo::{message, msg};
109/// use elfo::stream::Stream;
110///
111/// #[message]
112/// struct SomeMessage(u32);
113///
114/// #[message]
115/// struct AnotherMessage;
116///
117/// ctx.attach(Stream::generate(|mut e| async move {
118/// e.emit(SomeMessage(42)).await;
119/// e.emit(AnotherMessage).await;
120/// }));
121///
122/// while let Some(envelope) = ctx.recv().await {
123/// msg!(match envelope {
124/// SomeMessage(no) | AnotherMessage => { /* ... */ },
125/// });
126/// }
127/// # }
128/// ```
129pub struct Stream<M = AnyMessage> {
130 source: SourceArc<StreamSource<dyn futures::Stream<Item = M> + Send + 'static>>,
131}
132
133#[sealed]
134impl<M: StreamItem> crate::source::SourceHandle for Stream<M> {
135 fn is_terminated(&self) -> bool {
136 self.source.is_terminated()
137 }
138
139 fn terminate_by_ref(&self) -> bool {
140 self.source.terminate_by_ref()
141 }
142}
143
144impl<M: StreamItem> Stream<M> {
145 /// Creates an unattached source based on the provided [`futures::Stream`].
146 pub fn from_futures03<S>(stream: S) -> UnattachedSource<Self>
147 where
148 S: futures::Stream<Item = M> + Send + 'static,
149 {
150 Self::from_futures03_inner(stream, true, false)
151 }
152
153 /// Creates an uattached source based on the provided future.
154 pub fn once<F>(future: F) -> UnattachedSource<Self>
155 where
156 F: Future<Output = M> + Send + 'static,
157 {
158 Self::from_futures03_inner(stream::once(future), false, true)
159 }
160
161 fn from_futures03_inner(
162 stream: impl futures::Stream<Item = M> + Send + 'static,
163 rewrite_trace_id: bool,
164 oneshot: bool,
165 ) -> UnattachedSource<Self> {
166 // TODO: should it be ok to create a stream outside the actor system?
167 // However, it requires some sort of `on_attach()` to get a scope inside.
168 #[cfg(not(feature = "test-util"))]
169 let scope = scope::expose();
170 #[cfg(feature = "test-util")]
171 let scope = scope::try_expose().unwrap_or_else(|| {
172 Scope::test(
173 Addr::NULL,
174 // XXX
175 std::sync::Arc::new(crate::actor::ActorMeta {
176 group: "test".into(),
177 key: "test".into(),
178 }),
179 )
180 });
181
182 let source = StreamSource {
183 scope,
184 rewrite_trace_id,
185 inner: stream,
186 };
187
188 if rewrite_trace_id {
189 source.scope.set_trace_id(TraceId::generate());
190 }
191
192 // See comments for `from_untyped` to get details why we use it directly here.
193 let source = SourceArc::from_untyped(UntypedSourceArc::new(source, oneshot));
194 UnattachedSource::new(source, |source| Self { source })
195 }
196}
197
198impl Stream<AnyMessage> {
199 /// Generates a stream from the provided generator.
200 ///
201 /// The generator receives [`Emitter`] as an argument and should return a
202 /// future that will produce messages by using [`Emitter::emit`].
203 pub fn generate<G, F>(generator: G) -> UnattachedSource<Self>
204 where
205 G: FnOnce(Emitter) -> F,
206 F: Future<Output = ()> + Send + 'static,
207 {
208 // Highly inspired by https://github.com/Riateche/stream_generator.
209 // TODO: `mpsc::channel` produces overhead here, replace with a custom slot.
210 let (tx, rx) = mpsc::channel(0);
211 let gen = generator(Emitter(tx));
212 let gen = stream::once(gen).filter_map(|_| async { None });
213 let stream = stream::select(gen, rx);
214
215 Self::from_futures03_inner(stream, false, false)
216 }
217}
218
219#[pin_project]
220struct StreamSource<S: ?Sized> {
221 scope: Scope,
222 rewrite_trace_id: bool,
223 #[pin]
224 inner: S,
225}
226
227impl<S, M> SourceStream for StreamSource<S>
228where
229 S: futures::Stream<Item = M> + ?Sized + Send + 'static,
230 M: StreamItem,
231{
232 fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
233 // We never call `SourceArc::lock().stream()`, so it can be unimplemented.
234 // Anyway, it cannot be implemented because `StreamSource<_>` is DST.
235 unreachable!()
236 }
237
238 fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
239 let this = self.project();
240
241 // TODO: get rid of cloning here.
242 // tokio's `LocalKey` API forces us to clone the current scope on every poll.
243 // Usually, it's not a problem, but `Scope` contains a shared `Arc` among
244 // all actors inside a group, which can lead to a high contention.
245 // We can avoid it by implementing a custom `LocalKey`.
246 let scope = this.scope.clone();
247
248 scope.sync_within(|| match this.inner.poll_next(cx) {
249 Poll::Ready(Some(message)) => {
250 let trace_id = scope::trace_id();
251
252 this.scope.set_trace_id(if *this.rewrite_trace_id {
253 TraceId::generate()
254 } else {
255 trace_id
256 });
257
258 Poll::Ready(Some(message.pack(trace_id)))
259 }
260 Poll::Ready(None) => Poll::Ready(None),
261 Poll::Pending => {
262 this.scope.set_trace_id(scope::trace_id());
263 Poll::Pending
264 }
265 })
266 }
267}
268
269// === Emitter ===
270
271/// A handle for emitting messages from [`Stream::generate`].
272pub struct Emitter(mpsc::Sender<AnyMessage>);
273
274impl Emitter {
275 /// Emits a message from the generated stream.
276 pub async fn emit<M: Message>(&mut self, message: M) {
277 // TODO: create `Envelope` to avoid extra allocation.
278 let _ = self.0.send(AnyMessage::new(message)).await;
279 }
280}
281
282// === StreamItem ===
283
284#[sealed]
285pub trait StreamItem: 'static {
286 /// This method is private.
287 #[doc(hidden)]
288 fn pack(self, trace_id: TraceId) -> Envelope;
289}
290
291#[sealed]
292impl<M: Message> StreamItem for M {
293 /// This method is private.
294 #[doc(hidden)]
295 fn pack(self, trace_id: TraceId) -> Envelope {
296 let kind = MessageKind::regular(Addr::NULL);
297 Envelope::with_trace_id(self, kind, trace_id)
298 }
299}
300
301#[sealed]
302impl<M1: Message, M2: Message> StreamItem for Result<M1, M2> {
303 /// This method is private.
304 #[doc(hidden)]
305 fn pack(self, trace_id: TraceId) -> Envelope {
306 match self {
307 Ok(msg) => msg.pack(trace_id),
308 Err(msg) => msg.pack(trace_id),
309 }
310 }
311}