elfo_core/
stream.rs

1use std::{
2    future::Future,
3    mem,
4    pin::Pin,
5    task::{self, Poll},
6};
7
8use futures::{self, channel::mpsc, sink::SinkExt as _, stream, stream::StreamExt as _};
9use parking_lot::Mutex;
10use sealed::sealed;
11
12use crate::{
13    addr::Addr,
14    envelope::{Envelope, MessageKind},
15    message::Message,
16    tracing::TraceId,
17};
18
19// === Stream ===
20
21/// A wrapper around [`futures::Stream`] implementing `Source` trait.
22///
23/// Stream items must implement [`Message`].
24pub struct Stream<S>(Mutex<StreamState<S>>);
25
/// Internal state guarded by the mutex inside [`Stream`].
enum StreamState<S> {
    /// A stream is installed; it is pinned on the heap so it can be polled.
    Active(Pin<Box<S>>),
    /// No stream is installed: it was closed explicitly or has finished.
    Closed,
}
30
31impl<S> Stream<S> {
32    /// Wraps [`futures::Stream`] into the source.
33    pub fn new(stream: S) -> Self {
34        Self(Mutex::new(StreamState::Active(Box::pin(stream))))
35    }
36
37    /// Drops the inner stream and uses the provided one instead.
38    pub fn set(&self, stream: S) {
39        *self.0.lock() = StreamState::Active(Box::pin(stream));
40    }
41
42    /// Replaces the inner stream with the provided one.
43    pub fn replace(&self, stream: S) -> Option<S>
44    where
45        S: Unpin,
46    {
47        let new_state = StreamState::Active(Box::pin(stream));
48        match mem::replace(&mut *self.0.lock(), new_state) {
49            StreamState::Active(stream) => Some(*Pin::into_inner(stream)),
50            StreamState::Closed => None,
51        }
52    }
53
54    /// Drops the inner stream and stops emitting messages.
55    ///
56    /// [`Stream::set`] and [`Stream::replace`] can be used after this method.
57    pub fn close(&self) -> bool {
58        !matches!(
59            mem::replace(&mut *self.0.lock(), StreamState::Closed),
60            StreamState::Closed
61        )
62    }
63}
64
65impl Stream<()> {
66    /// Generates a stream from the provided generator.
67    ///
68    /// The generator receives [`Yielder`] as an argument and should return a
69    /// future that will produce messages by using [`Yielder::emit`].
70    ///
71    /// # Examples
72    ///
73    /// ```ignore
74    /// #[message]
75    /// struct SomeMessage(u32);
76    ///
77    /// #[message]
78    /// struct AnotherMessage;
79    ///
80    /// let stream = Stream::generate(|mut y| async move {
81    ///     y.emit(SomeMessage(42)).await;
82    ///     y.emit(AnotherMessage).await;
83    /// });
84    ///
85    /// let mut ctx = ctx.with(&stream);
86    /// ```
87    pub fn generate<G, F>(generator: G) -> Stream<impl futures::Stream<Item = Envelope>>
88    where
89        G: FnOnce(Yielder) -> F,
90        F: Future<Output = ()>,
91    {
92        // Highly inspired by https://github.com/Riateche/stream_generator.
93        let (tx, rx) = mpsc::channel(0);
94        let gen = generator(Yielder(tx));
95        let fake = stream::once(gen).filter_map(|_| async { None });
96        Stream::new(stream::select(fake, rx))
97    }
98}
99
100#[sealed]
101impl<S> crate::source::Source for Stream<S>
102where
103    S: futures::Stream,
104    S::Item: StreamItem,
105{
106    fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
107        let mut state = self.0.lock();
108
109        let stream = match &mut *state {
110            StreamState::Active(stream) => stream,
111            StreamState::Closed => return Poll::Pending, // TODO: `Poll::Ready(None)`?
112        };
113
114        // TODO: should we poll streams in a separate scope?
115        match stream.as_mut().poll_next(cx) {
116            Poll::Ready(Some(item)) => Poll::Ready(Some(item.unify())),
117            Poll::Ready(None) => {
118                *state = StreamState::Closed;
119                Poll::Ready(None)
120            }
121            Poll::Pending => Poll::Pending,
122        }
123    }
124}
125
126// === Yielder ===
127
128/// A handle for emitting messages from [`Stream::generate`].
129pub struct Yielder(mpsc::Sender<Envelope>);
130
131impl Yielder {
132    /// Emits a message from the generated stream.
133    pub async fn emit<M: Message>(&mut self, message: M) {
134        let _ = self.0.send(message.unify()).await;
135    }
136}
137
138// === StreamItem ===
139
140#[doc(hidden)]
141#[sealed]
142pub trait StreamItem {
143    #[doc(hidden)]
144    fn unify(self) -> Envelope;
145}
146
147// TODO(v0.2): it's inconsistent with `ctx.send()` that accepts only messages.
148#[doc(hidden)]
149#[sealed]
150impl StreamItem for Envelope {
151    #[doc(hidden)]
152    fn unify(self) -> Envelope {
153        self
154    }
155}
156
157// TODO(v0.2): remove it, use explicit `scope::set_trace_id()` instead.
158#[doc(hidden)]
159#[sealed]
160impl<M: Message> StreamItem for (TraceId, M) {
161    #[doc(hidden)]
162    fn unify(self) -> Envelope {
163        let kind = MessageKind::Regular { sender: Addr::NULL };
164        Envelope::with_trace_id(self.1, kind, self.0).upcast()
165    }
166}
167
168#[doc(hidden)]
169#[sealed]
170impl<M: Message> StreamItem for M {
171    #[doc(hidden)]
172    fn unify(self) -> Envelope {
173        (TraceId::generate(), self).unify()
174    }
175}