intrepid_core/action/active/
active_action.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use futures::{channel::mpsc, Stream, StreamExt};
use tower::{Service, ServiceExt};

use crate::{Action, ActionService, Frame, FrameFuture, FrameOutbox};

/// A convenience type alias for an active action.
pub type ActiveAction = Action<super::Active>;

impl ActiveAction {
    /// Create an active actionable from a given action service.
    pub fn active(service: ActionService) -> Self {
        Self {
            action_state: service.into(),
        }
    }

    /// Get a reference to the inner action service.
    pub fn into_inner(self) -> ActionService {
        self.action_state.0
    }

    /// Handle a frame using an active actionable.
    pub fn handle_frame(&mut self, frame: Frame) -> FrameFuture {
        FrameFuture::new(self.action_state.0.call(frame))
    }

    /// Turn the active actionable into a stream that emits frames returned
    /// by the interior service.
    ///
    /// Take an active actionable into a stream and a stream handle. The stream handle can be used
    /// in one part of an application to send frames to the actionable wherever it is being used, and
    /// the stream will yield any frames returned from the actionable. This is useful for creating
    /// background tasks out of either the handle or the stream itself, for example to create a frame
    /// emitter out of an external event source.
    ///
    /// In order for the stream to be consumed, it needs to be pinned first. This can be done by
    /// calling `tokio::pin!` on the stream before consuming it.
    ///
    /// # Example
    ///
    /// ```rust
    /// use futures::StreamExt;
    /// use intrepid::{Action, Frame, IntoFrame, Service};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let action = |name: String| async move { format!("Hello, {}!", name).into_frame() };
    ///
    ///     let actionable = action.as_into_actionable().into_actionable(());
    ///
    ///     let (mut stream, mut handle) = actionable.into_stream();
    ///
    ///     tokio::spawn(async move {
    ///         handle.send("Alice".to_string()).await.unwrap();
    ///         handle.send("Bob".to_string()).await.unwrap();
    ///     });
    ///
    ///     tokio::pin!(stream);
    ///
    ///     while let Some(frame) = stream.next().await {
    ///         println!("{:?}", frame);
    ///     }
    /// }
    /// ```
    ///
    pub fn into_stream(self) -> (impl Stream<Item = crate::Result<Frame>>, FrameOutbox) {
        let (tx, rx) = mpsc::channel(16);
        let mut stream = self.call_all(rx);

        let handle = FrameOutbox::new(tx);

        let stream = async_stream::stream! {
            while let Some(frame) = stream.next().await {
                yield Ok(frame.map_err(crate::errors::StreamError)?);
            }
        };

        (stream, handle)
    }
}

impl From<ActionService> for ActiveAction {
    fn from(action_service: ActionService) -> Self {
        Self::active(action_service)
    }
}

impl<IntoFrame> Service<IntoFrame> for ActiveAction
where
    IntoFrame: Into<Frame> + Clone + Send + 'static,
{
    type Response = Frame;
    type Error = crate::Error;
    type Future = FrameFuture;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.action_state.0.poll_ready(cx)
    }

    fn call(&mut self, frame: IntoFrame) -> Self::Future {
        self.handle_frame(frame.into())
    }
}