Skip to main content

elfo_core/
stream.rs

1use std::{
2    any::Any,
3    future::Future,
4    pin::Pin,
5    task::{self, Poll},
6};
7
8use futures::{self, channel::mpsc, sink::SinkExt as _, stream, stream::StreamExt as _};
9use pin_project::pin_project;
10use sealed::sealed;
11
12use crate::{
13    envelope::{Envelope, MessageKind},
14    message::{AnyMessage, Message},
15    scope::{self, Scope},
16    source::{SourceArc, SourceStream, UnattachedSource, UntypedSourceArc},
17    tracing::TraceId,
18    Addr,
19};
20
21// === Stream ===
22
23/// A source that emits messages from a provided stream or future.
24///
25/// Possible items of a stream (the `M` parameter):
26/// * Any instance of [`Message`].
27/// * `Result<impl Message, impl Message>`.
28///
29/// Note: the `new()` constructor is reserved until `AsyncIterator` is
30/// [stabilized](https://github.com/rust-lang/rust/issues/79024).
31///
32/// All wrapped streams and futures are fused by the implementation.
33///
34/// Note: [`Stream::is_terminated()`] and [`Stream::terminate()`]
35/// cannot be called *inside* the stream, because it leads to a deadlock.
36///
37/// [`Stream::is_terminated()`]: crate::source::SourceHandle::is_terminated()
38/// [`Stream::terminate()`]: crate::source::SourceHandle::terminate()
39///
40/// # Tracing
41///
42/// * If the stream is created using [`Stream::from_futures03()`], every message
43///   starts a new trace.
44/// * If created using [`Stream::once()`], the current trace is preserved.
45/// * If created using [`Stream::generate()`], the current trace is preserved.
46///
47/// You can always use [`scope::set_trace_id()`] to override the current trace.
48///
49/// # Examples
50///
51/// Create a stream based on [`futures::Stream`]:
52/// ```
53/// # use elfo_core as elfo;
54/// # async fn exec(mut ctx: elfo::Context) {
55/// # use elfo::{message, msg};
56/// use elfo::stream::Stream;
57///
58/// #[message]
59/// struct MyItem(u32);
60///
61/// let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
62/// ctx.attach(Stream::from_futures03(stream));
63///
64/// while let Some(envelope) = ctx.recv().await {
65///     msg!(match envelope {
66///         MyItem => { /* ... */ },
67///     });
68/// }
69/// # }
70/// ```
71///
72/// Perform a background request:
73/// ```
74/// # use elfo_core as elfo;
75/// # async fn exec(mut ctx: elfo::Context) {
76/// # use elfo::{message, msg};
77/// # #[message]
78/// # struct SomeEvent;
79/// use elfo::stream::Stream;
80///
81/// #[message]
82/// struct DataFetched(u32);
83///
84/// #[message]
85/// struct FetchDataFailed(String);
86///
87/// async fn fetch_data() -> Result<DataFetched, FetchDataFailed> {
88///     // ...
89/// # todo!()
90/// }
91///
92/// while let Some(envelope) = ctx.recv().await {
93///     msg!(match envelope {
94///         SomeEvent => {
95///             ctx.attach(Stream::once(fetch_data()));
96///         },
97///         DataFetched => { /* ... */ },
98///         FetchDataFailed => { /* ... */ },
99///     });
100/// }
101/// # }
102/// ```
103///
104/// Generate a stream (an alternative to `async-stream`):
105/// ```
106/// # use elfo_core as elfo;
107/// # async fn exec(mut ctx: elfo::Context) {
108/// # use elfo::{message, msg};
109/// use elfo::stream::Stream;
110///
111/// #[message]
112/// struct SomeMessage(u32);
113///
114/// #[message]
115/// struct AnotherMessage;
116///
117/// ctx.attach(Stream::generate(|mut e| async move {
118///     e.emit(SomeMessage(42)).await;
119///     e.emit(AnotherMessage).await;
120/// }));
121///
122/// while let Some(envelope) = ctx.recv().await {
123///     msg!(match envelope {
124///         SomeMessage(no) | AnotherMessage => { /* ... */ },
125///     });
126/// }
127/// # }
128/// ```
129pub struct Stream<M = AnyMessage> {
130    source: SourceArc<StreamSource<dyn futures::Stream<Item = M> + Send + 'static>>,
131}
132
133#[sealed]
134impl<M: StreamItem> crate::source::SourceHandle for Stream<M> {
135    fn is_terminated(&self) -> bool {
136        self.source.is_terminated()
137    }
138
139    fn terminate_by_ref(&self) -> bool {
140        self.source.terminate_by_ref()
141    }
142}
143
144impl<M: StreamItem> Stream<M> {
145    /// Creates an unattached source based on the provided [`futures::Stream`].
146    pub fn from_futures03<S>(stream: S) -> UnattachedSource<Self>
147    where
148        S: futures::Stream<Item = M> + Send + 'static,
149    {
150        Self::from_futures03_inner(stream, true, false)
151    }
152
153    /// Creates an uattached source based on the provided future.
154    pub fn once<F>(future: F) -> UnattachedSource<Self>
155    where
156        F: Future<Output = M> + Send + 'static,
157    {
158        Self::from_futures03_inner(stream::once(future), false, true)
159    }
160
161    fn from_futures03_inner(
162        stream: impl futures::Stream<Item = M> + Send + 'static,
163        rewrite_trace_id: bool,
164        oneshot: bool,
165    ) -> UnattachedSource<Self> {
166        // TODO: should it be ok to create a stream outside the actor system?
167        // However, it requires some sort of `on_attach()` to get a scope inside.
168        #[cfg(not(feature = "test-util"))]
169        let scope = scope::expose();
170        #[cfg(feature = "test-util")]
171        let scope = scope::try_expose().unwrap_or_else(|| {
172            Scope::test(
173                Addr::NULL,
174                // XXX
175                std::sync::Arc::new(crate::actor::ActorMeta {
176                    group: "test".into(),
177                    key: "test".into(),
178                }),
179            )
180        });
181
182        let source = StreamSource {
183            scope,
184            rewrite_trace_id,
185            inner: stream,
186        };
187
188        if rewrite_trace_id {
189            source.scope.set_trace_id(TraceId::generate());
190        }
191
192        // See comments for `from_untyped` to get details why we use it directly here.
193        let source = SourceArc::from_untyped(UntypedSourceArc::new(source, oneshot));
194        UnattachedSource::new(source, |source| Self { source })
195    }
196}
197
198impl Stream<AnyMessage> {
199    /// Generates a stream from the provided generator.
200    ///
201    /// The generator receives [`Emitter`] as an argument and should return a
202    /// future that will produce messages by using [`Emitter::emit`].
203    pub fn generate<G, F>(generator: G) -> UnattachedSource<Self>
204    where
205        G: FnOnce(Emitter) -> F,
206        F: Future<Output = ()> + Send + 'static,
207    {
208        // Highly inspired by https://github.com/Riateche/stream_generator.
209        // TODO: `mpsc::channel` produces overhead here, replace with a custom slot.
210        let (tx, rx) = mpsc::channel(0);
211        let gen = generator(Emitter(tx));
212        let gen = stream::once(gen).filter_map(|_| async { None });
213        let stream = stream::select(gen, rx);
214
215        Self::from_futures03_inner(stream, false, false)
216    }
217}
218
219#[pin_project]
220struct StreamSource<S: ?Sized> {
221    scope: Scope,
222    rewrite_trace_id: bool,
223    #[pin]
224    inner: S,
225}
226
227impl<S, M> SourceStream for StreamSource<S>
228where
229    S: futures::Stream<Item = M> + ?Sized + Send + 'static,
230    M: StreamItem,
231{
232    fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
233        // We never call `SourceArc::lock().stream()`, so it can be unimplemented.
234        // Anyway, it cannot be implemented because `StreamSource<_>` is DST.
235        unreachable!()
236    }
237
238    fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
239        let this = self.project();
240
241        // TODO: get rid of cloning here.
242        // tokio's `LocalKey` API forces us to clone the current scope on every poll.
243        // Usually, it's not a problem, but `Scope` contains a shared `Arc` among
244        // all actors inside a group, which can lead to a high contention.
245        // We can avoid it by implementing a custom `LocalKey`.
246        let scope = this.scope.clone();
247
248        scope.sync_within(|| match this.inner.poll_next(cx) {
249            Poll::Ready(Some(message)) => {
250                let trace_id = scope::trace_id();
251
252                this.scope.set_trace_id(if *this.rewrite_trace_id {
253                    TraceId::generate()
254                } else {
255                    trace_id
256                });
257
258                Poll::Ready(Some(message.pack(trace_id)))
259            }
260            Poll::Ready(None) => Poll::Ready(None),
261            Poll::Pending => {
262                this.scope.set_trace_id(scope::trace_id());
263                Poll::Pending
264            }
265        })
266    }
267}
268
269// === Emitter ===
270
271/// A handle for emitting messages from [`Stream::generate`].
272pub struct Emitter(mpsc::Sender<AnyMessage>);
273
274impl Emitter {
275    /// Emits a message from the generated stream.
276    pub async fn emit<M: Message>(&mut self, message: M) {
277        // TODO: create `Envelope` to avoid extra allocation.
278        let _ = self.0.send(AnyMessage::new(message)).await;
279    }
280}
281
282// === StreamItem ===
283
284#[sealed]
285pub trait StreamItem: 'static {
286    /// This method is private.
287    #[doc(hidden)]
288    fn pack(self, trace_id: TraceId) -> Envelope;
289}
290
291#[sealed]
292impl<M: Message> StreamItem for M {
293    /// This method is private.
294    #[doc(hidden)]
295    fn pack(self, trace_id: TraceId) -> Envelope {
296        let kind = MessageKind::regular(Addr::NULL);
297        Envelope::with_trace_id(self, kind, trace_id)
298    }
299}
300
301#[sealed]
302impl<M1: Message, M2: Message> StreamItem for Result<M1, M2> {
303    /// This method is private.
304    #[doc(hidden)]
305    fn pack(self, trace_id: TraceId) -> Envelope {
306        match self {
307            Ok(msg) => msg.pack(trace_id),
308            Err(msg) => msg.pack(trace_id),
309        }
310    }
311}