intrepid_core/action/active/active_action.rs
1use futures::{channel::mpsc, Stream, StreamExt};
2use tower::{Service, ServiceExt};
3
4use crate::{Action, ActionService, Frame, FrameFuture, FrameOutbox};
5
/// A convenience type alias for an [`Action`] parameterized by the
/// `Active` state type from the parent module.
pub type ActiveAction = Action<super::Active>;
8
9impl ActiveAction {
10 /// Create an active actionable from a given action service.
11 pub fn active(service: ActionService) -> Self {
12 Self {
13 action_state: service.into(),
14 }
15 }
16
17 /// Get a reference to the inner action service.
18 pub fn into_inner(self) -> ActionService {
19 self.action_state.0
20 }
21
22 /// Handle a frame using an active actionable.
23 pub fn handle_frame(&mut self, frame: Frame) -> FrameFuture {
24 FrameFuture::new(self.action_state.0.call(frame))
25 }
26
27 /// Turn the active actionable into a stream that emits frames returned
28 /// by the interior service.
29 ///
30 /// Take an active actionable into a stream and a stream handle. The stream handle can be used
31 /// in one part of an application to send frames to the actionable wherever it is being used, and
32 /// the stream will yield any frames returned from the actionable. This is useful for creating
33 /// background tasks out of either the handle or the stream itself, for example to create a frame
34 /// emitter out of an external event source.
35 ///
36 /// In order for the stream to be consumed, it needs to be pinned first. This can be done by
37 /// calling `tokio::pin!` on the stream before consuming it.
38 ///
39 /// # Example
40 ///
41 /// ```rust
42 /// use futures::StreamExt;
43 /// use intrepid::{Action, Frame, IntoFrame, Service};
44 ///
45 /// #[tokio::main]
46 /// async fn main() {
47 /// let action = |name: String| async move { format!("Hello, {}!", name).into_frame() };
48 ///
49 /// let actionable = action.as_into_actionable().into_actionable(());
50 ///
51 /// let (mut stream, mut handle) = actionable.into_stream();
52 ///
53 /// tokio::spawn(async move {
54 /// handle.send("Alice".to_string()).await.unwrap();
55 /// handle.send("Bob".to_string()).await.unwrap();
56 /// });
57 ///
58 /// tokio::pin!(stream);
59 ///
60 /// while let Some(frame) = stream.next().await {
61 /// println!("{:?}", frame);
62 /// }
63 /// }
64 /// ```
65 ///
66 pub fn into_stream(self) -> (impl Stream<Item = crate::Result<Frame>>, FrameOutbox) {
67 let (tx, rx) = mpsc::channel(16);
68 let mut stream = self.call_all(rx);
69
70 let handle = FrameOutbox::new(tx);
71
72 let stream = async_stream::stream! {
73 while let Some(frame) = stream.next().await {
74 yield Ok(frame.map_err(crate::errors::StreamError)?);
75 }
76 };
77
78 (stream, handle)
79 }
80}
81
82impl From<ActionService> for ActiveAction {
83 fn from(action_service: ActionService) -> Self {
84 Self::active(action_service)
85 }
86}
87
88impl<IntoFrame> Service<IntoFrame> for ActiveAction
89where
90 IntoFrame: Into<Frame> + Clone + Send + 'static,
91{
92 type Response = Frame;
93 type Error = crate::Error;
94 type Future = FrameFuture;
95
96 fn poll_ready(
97 &mut self,
98 cx: &mut std::task::Context<'_>,
99 ) -> std::task::Poll<Result<(), Self::Error>> {
100 self.action_state.0.poll_ready(cx)
101 }
102
103 fn call(&mut self, frame: IntoFrame) -> Self::Future {
104 self.handle_frame(frame.into())
105 }
106}