intrepid_core/action/active/active_action.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
use futures::{channel::mpsc, Stream, StreamExt};
use tower::{Service, ServiceExt};
use crate::{Action, ActionService, Frame, FrameFuture, FrameOutbox};
/// A convenience type alias for an [`Action`] whose type parameter is
/// `super::Active`, i.e. an action in its active form.
pub type ActiveAction = Action<super::Active>;
impl ActiveAction {
    /// Build an active actionable around the given action service.
    ///
    /// The service is converted into the action's state with `Into`.
    pub fn active(service: ActionService) -> Self {
        let action_state = service.into();
        Self { action_state }
    }

    /// Consume the active actionable and hand back the action service it wraps.
    pub fn into_inner(self) -> ActionService {
        let state = self.action_state;
        state.0
    }

    /// Pass a single frame to the wrapped service and return the pending
    /// response as a [`FrameFuture`].
    pub fn handle_frame(&mut self, frame: Frame) -> FrameFuture {
        let response = self.action_state.0.call(frame);
        FrameFuture::new(response)
    }

    /// Convert the active actionable into a stream of the frames produced by
    /// the interior service, together with a handle for feeding it input.
    ///
    /// The returned [`FrameOutbox`] wraps the sending half of a bounded channel
    /// (capacity 16); the receiving half is driven through the service with
    /// `call_all`. The handle can live in one part of an application and be used
    /// to send frames to the actionable, while the stream yields whatever the
    /// actionable returns. This is useful for building background tasks out of
    /// either the handle or the stream, for example to create a frame emitter
    /// from an external event source.
    ///
    /// Each successful response is yielded as `Ok(frame)`. A failed response is
    /// wrapped in `crate::errors::StreamError` before being surfaced.
    ///
    /// The stream must be pinned before it can be consumed. This can be done by
    /// calling `tokio::pin!` on the stream.
    ///
    /// # Example
    ///
    /// ```rust
    /// use futures::StreamExt;
    /// use intrepid::{Action, Frame, IntoFrame, Service};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let action = |name: String| async move { format!("Hello, {}!", name).into_frame() };
    ///
    ///     let actionable = action.as_into_actionable().into_actionable(());
    ///
    ///     let (mut stream, mut handle) = actionable.into_stream();
    ///
    ///     tokio::spawn(async move {
    ///         handle.send("Alice".to_string()).await.unwrap();
    ///         handle.send("Bob".to_string()).await.unwrap();
    ///     });
    ///
    ///     tokio::pin!(stream);
    ///
    ///     while let Some(frame) = stream.next().await {
    ///         println!("{:?}", frame);
    ///     }
    /// }
    /// ```
    ///
    pub fn into_stream(self) -> (impl Stream<Item = crate::Result<Frame>>, FrameOutbox) {
        let (sender, receiver) = mpsc::channel(16);
        let handle = FrameOutbox::new(sender);

        // Every item received on the channel becomes one call to the service.
        let mut responses = self.call_all(receiver);

        let frames = async_stream::stream! {
            while let Some(outcome) = responses.next().await {
                let frame = outcome.map_err(crate::errors::StreamError)?;
                yield Ok(frame);
            }
        };

        (frames, handle)
    }
}
impl From<ActionService> for ActiveAction {
fn from(action_service: ActionService) -> Self {
Self::active(action_service)
}
}
/// Tower `Service` implementation for any request type that can be
/// converted into a [`Frame`].
impl<Request: Into<Frame> + Clone + Send + 'static> Service<Request> for ActiveAction {
    type Response = Frame;
    type Error = crate::Error;
    type Future = FrameFuture;

    /// Report readiness by forwarding to the wrapped action service.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        let service = &mut self.action_state.0;
        service.poll_ready(cx)
    }

    /// Convert the request into a [`Frame`] and dispatch it through
    /// `handle_frame`.
    fn call(&mut self, frame: Request) -> Self::Future {
        let frame: Frame = frame.into();
        self.handle_frame(frame)
    }
}