pub struct Stream<M = AnyMessage> { /* private fields */ }
Expand description
A source that emits messages from a provided stream or future.
Possible items of a stream (the M
parameter):
- Any instance of
Message
. Result<impl Message, impl Message>
.
Note: the new()
constructor is reserved until AsyncIterator
is
stabilized.
All wrapped streams and futures are fused by the implementation.
Note: [Stream::is_terminated()
] and [Stream::terminate()
] cannot be
called inside the stream, because it leads to a deadlock.
Tracing
- If the stream created using
Stream::from_futures03()
, every message starts a new trace. - If created using
Stream::once()
, the current trace is preserved. - If created using
Stream::generate()
, the current trace is preserved.
You can always use scope::set_trace_id()
to override the current trace.
Examples
Create a stream based on [futures::Stream
]:
use elfo::stream::Stream;
#[message]
struct MyItem(u32);
let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
ctx.attach(Stream::from_futures03(stream));
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
MyItem => { /* ... */ },
});
}
Perform a background request:
use elfo::stream::Stream;
#[message]
struct DataFetched(u32);
#[message]
struct FetchDataFailed(String);
async fn fetch_data() -> Result<DataFetched, FetchDataFailed> {
// ...
}
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeEvent => {
ctx.attach(Stream::once(fetch_data()));
},
DataFetched => { /* ... */ },
FetchDataFailed => { /* ... */ },
});
}
Generate a stream (an alternative to async-stream
):
use elfo::stream::Stream;
#[message]
struct SomeMessage(u32);
#[message]
struct AnotherMessage;
ctx.attach(Stream::generate(|mut e| async move {
e.emit(SomeMessage(42)).await;
e.emit(AnotherMessage).await;
}));
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeMessage(no) | AnotherMessage => { /* ... */ },
});
}
Implementations§
source§impl<M> Stream<M>where
M: StreamItem,
impl<M> Stream<M>where M: StreamItem,
sourcepub fn from_futures03<S>(stream: S) -> UnattachedSource<Stream<M>>where
S: Stream<Item = M> + Send + 'static,
pub fn from_futures03<S>(stream: S) -> UnattachedSource<Stream<M>>where S: Stream<Item = M> + Send + 'static,
Creates an unattached source based on the provided [futures::Stream
].
source§impl Stream<AnyMessage>
impl Stream<AnyMessage>
sourcepub fn generate<G, F>(generator: G) -> UnattachedSource<Stream<AnyMessage>>where
G: FnOnce(Emitter) -> F,
F: Future<Output = ()> + Send + 'static,
pub fn generate<G, F>(generator: G) -> UnattachedSource<Stream<AnyMessage>>where G: FnOnce(Emitter) -> F, F: Future<Output = ()> + Send + 'static,
Generates a stream from the provided generator.
The generator receives Emitter
as an argument and should return a
future that will produce messages by using Emitter::emit
.