co_actor/epic/
action_dispatch.rs1use crate::{Actions, TaskSpawner};
5use futures::{channel::mpsc::UnboundedSender, Stream};
6use std::future::Future;
7
8pub struct ActionDispatch<A, S, C> {
11 actions: Actions<A, S, C>,
12 tx: UnboundedSender<Result<A, anyhow::Error>>,
13}
14impl<A, S, C> Clone for ActionDispatch<A, S, C> {
15 fn clone(&self) -> Self {
16 Self { actions: self.actions.clone(), tx: self.tx.clone() }
17 }
18}
19impl<A, S, C> ActionDispatch<A, S, C>
20where
21 A: Clone + Send + 'static,
22 S: Send + 'static,
23 C: Send + 'static,
24{
25 pub fn execute<F, Fut>(
27 actions: Actions<A, S, C>,
28 spawner: TaskSpawner,
29 f: F,
30 ) -> impl Stream<Item = Result<A, anyhow::Error>>
31 where
32 F: FnOnce(Self) -> Fut,
33 Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
34 {
35 let (tx, rx) = futures::channel::mpsc::unbounded();
36 let dispatch = ActionDispatch { actions, tx };
37 let fut = f(dispatch.clone());
38 spawner.spawn(async move {
39 match fut.await {
40 Ok(_) => {},
41 Err(err) => {
42 dispatch.dispatch_result(Err(err));
43 },
44 }
45 });
46 rx
47 }
48
49 pub fn execute_with_response<F, Fut, R, O, E>(
51 actions: Actions<A, S, C>,
52 spawner: TaskSpawner,
53 f: F,
54 response: R,
55 ) -> impl Stream<Item = Result<A, anyhow::Error>>
56 where
57 F: FnOnce(Self) -> Fut,
58 Fut: Future<Output = Result<O, E>> + Send + 'static,
59 R: FnOnce(Result<O, E>) -> A + Send + 'static,
60 {
61 let (tx, rx) = futures::channel::mpsc::unbounded();
62 let dispatch = ActionDispatch { actions, tx };
63 let fut = f(dispatch.clone());
64 spawner.spawn(async move {
65 dispatch.dispatch(response(fut.await));
66 });
67 rx
68 }
69
70 pub fn dispatch(&self, item: A) -> bool {
73 self.dispatch_result(Ok(item))
74 }
75
76 pub fn dispatch_result(&self, item: Result<A, anyhow::Error>) -> bool {
79 self.tx.unbounded_send(item).is_ok()
80 }
81
82 pub async fn request<F, O>(&self, request: A, response: F) -> Result<O, anyhow::Error>
85 where
86 F: (for<'a> Fn(&'a A) -> Option<O>) + Clone + Send + 'static,
87 {
88 let response_fut = self.actions.once_map(response);
89 self.dispatch(request);
90 response_fut.await
91 }
92}