pub struct Stream<M = AnyMessage> { /* private fields */ }Expand description
A source that emits messages from a provided stream or future.
Possible items of a stream (the M parameter):
- Any instance of
Message. Result<impl Message, impl Message>.
Note: the new() constructor is reserved until AsyncIterator is
stabilized.
All wrapped streams and futures are fused by the implementation.
Note: Stream::is_terminated() and Stream::terminate()
cannot be called inside the stream, because it leads to a deadlock.
§Tracing
- If the stream created using
Stream::from_futures03(), every message starts a new trace. - If created using
Stream::once(), the current trace is preserved. - If created using
Stream::generate(), the current trace is preserved.
You can always use scope::set_trace_id() to override the current trace.
§Examples
Create a stream based on futures::Stream:
use elfo::stream::Stream;
#[message]
struct MyItem(u32);
let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
ctx.attach(Stream::from_futures03(stream));
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
MyItem => { /* ... */ },
});
}Perform a background request:
use elfo::stream::Stream;
#[message]
struct DataFetched(u32);
#[message]
struct FetchDataFailed(String);
async fn fetch_data() -> Result<DataFetched, FetchDataFailed> {
// ...
}
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeEvent => {
ctx.attach(Stream::once(fetch_data()));
},
DataFetched => { /* ... */ },
FetchDataFailed => { /* ... */ },
});
}Generate a stream (an alternative to async-stream):
use elfo::stream::Stream;
#[message]
struct SomeMessage(u32);
#[message]
struct AnotherMessage;
ctx.attach(Stream::generate(|mut e| async move {
e.emit(SomeMessage(42)).await;
e.emit(AnotherMessage).await;
}));
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
SomeMessage(no) | AnotherMessage => { /* ... */ },
});
}Implementations§
Source§impl<M: StreamItem> Stream<M>
impl<M: StreamItem> Stream<M>
Sourcepub fn from_futures03<S>(stream: S) -> UnattachedSource<Self>
pub fn from_futures03<S>(stream: S) -> UnattachedSource<Self>
Creates an unattached source based on the provided futures::Stream.
Sourcepub fn once<F>(future: F) -> UnattachedSource<Self>
pub fn once<F>(future: F) -> UnattachedSource<Self>
Creates an uattached source based on the provided future.
Source§impl Stream<AnyMessage>
impl Stream<AnyMessage>
Sourcepub fn generate<G, F>(generator: G) -> UnattachedSource<Self>
pub fn generate<G, F>(generator: G) -> UnattachedSource<Self>
Generates a stream from the provided generator.
The generator receives Emitter as an argument and should return a
future that will produce messages by using Emitter::emit.
Trait Implementations§
Source§impl<M: StreamItem> SourceHandle for Stream<M>
impl<M: StreamItem> SourceHandle for Stream<M>
Source§fn is_terminated(&self) -> bool
fn is_terminated(&self) -> bool
true if the source has stopped producing messages.Source§fn terminate_by_ref(&self) -> bool
fn terminate_by_ref(&self) -> bool
Drop is called immediately. Read more