1use std::{
2 future::Future,
3 mem,
4 pin::Pin,
5 task::{self, Poll},
6};
7
8use futures::{self, channel::mpsc, sink::SinkExt as _, stream, stream::StreamExt as _};
9use parking_lot::Mutex;
10use sealed::sealed;
11
12use crate::{
13 addr::Addr,
14 envelope::{Envelope, MessageKind},
15 message::Message,
16 tracing::TraceId,
17};
18
19pub struct Stream<S>(Mutex<StreamState<S>>);
25
26enum StreamState<S> {
27 Active(Pin<Box<S>>),
28 Closed,
29}
30
31impl<S> Stream<S> {
32 pub fn new(stream: S) -> Self {
34 Self(Mutex::new(StreamState::Active(Box::pin(stream))))
35 }
36
37 pub fn set(&self, stream: S) {
39 *self.0.lock() = StreamState::Active(Box::pin(stream));
40 }
41
42 pub fn replace(&self, stream: S) -> Option<S>
44 where
45 S: Unpin,
46 {
47 let new_state = StreamState::Active(Box::pin(stream));
48 match mem::replace(&mut *self.0.lock(), new_state) {
49 StreamState::Active(stream) => Some(*Pin::into_inner(stream)),
50 StreamState::Closed => None,
51 }
52 }
53
54 pub fn close(&self) -> bool {
58 !matches!(
59 mem::replace(&mut *self.0.lock(), StreamState::Closed),
60 StreamState::Closed
61 )
62 }
63}
64
65impl Stream<()> {
66 pub fn generate<G, F>(generator: G) -> Stream<impl futures::Stream<Item = Envelope>>
88 where
89 G: FnOnce(Yielder) -> F,
90 F: Future<Output = ()>,
91 {
92 let (tx, rx) = mpsc::channel(0);
94 let gen = generator(Yielder(tx));
95 let fake = stream::once(gen).filter_map(|_| async { None });
96 Stream::new(stream::select(fake, rx))
97 }
98}
99
100#[sealed]
101impl<S> crate::source::Source for Stream<S>
102where
103 S: futures::Stream,
104 S::Item: StreamItem,
105{
106 fn poll_recv(&self, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
107 let mut state = self.0.lock();
108
109 let stream = match &mut *state {
110 StreamState::Active(stream) => stream,
111 StreamState::Closed => return Poll::Pending, };
113
114 match stream.as_mut().poll_next(cx) {
116 Poll::Ready(Some(item)) => Poll::Ready(Some(item.unify())),
117 Poll::Ready(None) => {
118 *state = StreamState::Closed;
119 Poll::Ready(None)
120 }
121 Poll::Pending => Poll::Pending,
122 }
123 }
124}
125
126pub struct Yielder(mpsc::Sender<Envelope>);
130
131impl Yielder {
132 pub async fn emit<M: Message>(&mut self, message: M) {
134 let _ = self.0.send(message.unify()).await;
135 }
136}
137
138#[doc(hidden)]
141#[sealed]
142pub trait StreamItem {
143 #[doc(hidden)]
144 fn unify(self) -> Envelope;
145}
146
147#[doc(hidden)]
149#[sealed]
150impl StreamItem for Envelope {
151 #[doc(hidden)]
152 fn unify(self) -> Envelope {
153 self
154 }
155}
156
157#[doc(hidden)]
159#[sealed]
160impl<M: Message> StreamItem for (TraceId, M) {
161 #[doc(hidden)]
162 fn unify(self) -> Envelope {
163 let kind = MessageKind::Regular { sender: Addr::NULL };
164 Envelope::with_trace_id(self.1, kind, self.0).upcast()
165 }
166}
167
168#[doc(hidden)]
169#[sealed]
170impl<M: Message> StreamItem for M {
171 #[doc(hidden)]
172 fn unify(self) -> Envelope {
173 (TraceId::generate(), self).unify()
174 }
175}