Skip to main content

co_actor/epic/
action_dispatch.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{Actions, TaskSpawner};
5use futures::{channel::mpsc::UnboundedSender, Stream};
6use std::future::Future;
7
8/// Action Dispatch.
9/// Encasulate epic logic in a single future.
10pub struct ActionDispatch<A, S, C> {
11	actions: Actions<A, S, C>,
12	tx: UnboundedSender<Result<A, anyhow::Error>>,
13}
14impl<A, S, C> Clone for ActionDispatch<A, S, C> {
15	fn clone(&self) -> Self {
16		Self { actions: self.actions.clone(), tx: self.tx.clone() }
17	}
18}
19impl<A, S, C> ActionDispatch<A, S, C>
20where
21	A: Clone + Send + 'static,
22	S: Send + 'static,
23	C: Send + 'static,
24{
25	/// Execute a stateful futures that dispatches/reacts to actions.
26	pub fn execute<F, Fut>(
27		actions: Actions<A, S, C>,
28		spawner: TaskSpawner,
29		f: F,
30	) -> impl Stream<Item = Result<A, anyhow::Error>>
31	where
32		F: FnOnce(Self) -> Fut,
33		Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
34	{
35		let (tx, rx) = futures::channel::mpsc::unbounded();
36		let dispatch = ActionDispatch { actions, tx };
37		let fut = f(dispatch.clone());
38		spawner.spawn(async move {
39			match fut.await {
40				Ok(_) => {},
41				Err(err) => {
42					dispatch.dispatch_result(Err(err));
43				},
44			}
45		});
46		rx
47	}
48
49	/// Execute a stateful futures that dispatches/reacts to actions with an final result action.
50	pub fn execute_with_response<F, Fut, R, O, E>(
51		actions: Actions<A, S, C>,
52		spawner: TaskSpawner,
53		f: F,
54		response: R,
55	) -> impl Stream<Item = Result<A, anyhow::Error>>
56	where
57		F: FnOnce(Self) -> Fut,
58		Fut: Future<Output = Result<O, E>> + Send + 'static,
59		R: FnOnce(Result<O, E>) -> A + Send + 'static,
60	{
61		let (tx, rx) = futures::channel::mpsc::unbounded();
62		let dispatch = ActionDispatch { actions, tx };
63		let fut = f(dispatch.clone());
64		spawner.spawn(async move {
65			dispatch.dispatch(response(fut.await));
66		});
67		rx
68	}
69
70	/// Dispatch an action.
71	/// Actions are dispatched immediately.
72	pub fn dispatch(&self, item: A) -> bool {
73		self.dispatch_result(Ok(item))
74	}
75
76	/// Dispatch an action result.
77	/// Actions are dispatched immediately.
78	pub fn dispatch_result(&self, item: Result<A, anyhow::Error>) -> bool {
79		self.tx.unbounded_send(item).is_ok()
80	}
81
82	/// Request/Response.
83	/// Dispatch `request` and wait for first `response`.
84	pub async fn request<F, O>(&self, request: A, response: F) -> Result<O, anyhow::Error>
85	where
86		F: (for<'a> Fn(&'a A) -> Option<O>) + Clone + Send + 'static,
87	{
88		let response_fut = self.actions.once_map(response);
89		self.dispatch(request);
90		response_fut.await
91	}
92}