intrepid_core/action/active/
active_action.rs

1use futures::{channel::mpsc, Stream, StreamExt};
2use tower::{Service, ServiceExt};
3
4use crate::{Action, ActionService, Frame, FrameFuture, FrameOutbox};
5
6/// A convenience type alias for an active action.
7pub type ActiveAction = Action<super::Active>;
8
9impl ActiveAction {
10    /// Create an active actionable from a given action service.
11    pub fn active(service: ActionService) -> Self {
12        Self {
13            action_state: service.into(),
14        }
15    }
16
17    /// Get a reference to the inner action service.
18    pub fn into_inner(self) -> ActionService {
19        self.action_state.0
20    }
21
22    /// Handle a frame using an active actionable.
23    pub fn handle_frame(&mut self, frame: Frame) -> FrameFuture {
24        FrameFuture::new(self.action_state.0.call(frame))
25    }
26
27    /// Turn the active actionable into a stream that emits frames returned
28    /// by the interior service.
29    ///
30    /// Take an active actionable into a stream and a stream handle. The stream handle can be used
31    /// in one part of an application to send frames to the actionable wherever it is being used, and
32    /// the stream will yield any frames returned from the actionable. This is useful for creating
33    /// background tasks out of either the handle or the stream itself, for example to create a frame
34    /// emitter out of an external event source.
35    ///
36    /// In order for the stream to be consumed, it needs to be pinned first. This can be done by
37    /// calling `tokio::pin!` on the stream before consuming it.
38    ///
39    /// # Example
40    ///
41    /// ```rust
42    /// use futures::StreamExt;
43    /// use intrepid::{Action, Frame, IntoFrame, Service};
44    ///
45    /// #[tokio::main]
46    /// async fn main() {
47    ///     let action = |name: String| async move { format!("Hello, {}!", name).into_frame() };
48    ///
49    ///     let actionable = action.as_into_actionable().into_actionable(());
50    ///
51    ///     let (mut stream, mut handle) = actionable.into_stream();
52    ///
53    ///     tokio::spawn(async move {
54    ///         handle.send("Alice".to_string()).await.unwrap();
55    ///         handle.send("Bob".to_string()).await.unwrap();
56    ///     });
57    ///
58    ///     tokio::pin!(stream);
59    ///
60    ///     while let Some(frame) = stream.next().await {
61    ///         println!("{:?}", frame);
62    ///     }
63    /// }
64    /// ```
65    ///
66    pub fn into_stream(self) -> (impl Stream<Item = crate::Result<Frame>>, FrameOutbox) {
67        let (tx, rx) = mpsc::channel(16);
68        let mut stream = self.call_all(rx);
69
70        let handle = FrameOutbox::new(tx);
71
72        let stream = async_stream::stream! {
73            while let Some(frame) = stream.next().await {
74                yield Ok(frame.map_err(crate::errors::StreamError)?);
75            }
76        };
77
78        (stream, handle)
79    }
80}
81
82impl From<ActionService> for ActiveAction {
83    fn from(action_service: ActionService) -> Self {
84        Self::active(action_service)
85    }
86}
87
88impl<IntoFrame> Service<IntoFrame> for ActiveAction
89where
90    IntoFrame: Into<Frame> + Clone + Send + 'static,
91{
92    type Response = Frame;
93    type Error = crate::Error;
94    type Future = FrameFuture;
95
96    fn poll_ready(
97        &mut self,
98        cx: &mut std::task::Context<'_>,
99    ) -> std::task::Poll<Result<(), Self::Error>> {
100        self.action_state.0.poll_ready(cx)
101    }
102
103    fn call(&mut self, frame: IntoFrame) -> Self::Future {
104        self.handle_frame(frame.into())
105    }
106}