Skip to main content

co_actor/epic/
actions.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{BoxEpic, Epic, EpicExt, MergeEpic};
5use anyhow::anyhow;
6use futures::{
7	channel::oneshot,
8	pin_mut,
9	stream::{self},
10	FutureExt, Stream, StreamExt,
11};
12use std::{
13	future::{ready, Future},
14	mem::take,
15	ops::DerefMut,
16	sync::{Arc, Mutex},
17};
18
19pub struct Actions<A, S, C> {
20	pending: Arc<Mutex<Vec<BoxEpic<'static, A, S, C>>>>,
21}
22impl<A, S, C> Clone for Actions<A, S, C> {
23	fn clone(&self) -> Self {
24		Self { pending: self.pending.clone() }
25	}
26}
27impl<A, S, C> Default for Actions<A, S, C> {
28	fn default() -> Self {
29		Self { pending: Arc::new(Mutex::new(Default::default())) }
30	}
31}
32impl<A, S, C> Actions<A, S, C>
33where
34	A: Clone + Send + 'static,
35{
36	/// Wait once the epic emits its first action, remove the epic and return the action.
37	/// This is guarantted to see all actions that are dispatched after this call has returned the future.
38	pub fn once_epic<E>(&self, epic: E) -> impl Future<Output = Result<A, anyhow::Error>> + use<A, S, C, E>
39	where
40		E: EpicExt<A, S, C> + Send + 'static,
41	{
42		let (tx, rx) = oneshot::channel();
43
44		// add
45		{
46			self.pending
47				.lock()
48				.unwrap()
49				.push(OneshotEpic { epic, sender: Some(tx) }.boxed());
50		}
51
52		// wait
53		async move { rx.await? }
54	}
55
56	/// Wait for predicate to match once and return the action it mached.
57	/// This is guarantted to see all actions that are dispatched after this call has returned the future.
58	pub fn once<F>(&self, predicate: F) -> impl Future<Output = Result<A, anyhow::Error>> + use<A, S, C, F>
59	where
60		F: for<'a> Fn(&'a A) -> bool + Send + 'static,
61	{
62		self.once_epic(FilterEpic(predicate))
63	}
64
65	/// Wait for map to match once and return the mapped value of the action.
66	/// This is guarantted to see all actions that are dispatched after this call has returned the future.
67	pub fn once_map<F, O>(&self, map: F) -> impl Future<Output = Result<O, anyhow::Error>> + use<A, S, C, F, O>
68	where
69		F: (for<'a> Fn(&'a A) -> Option<O>) + Clone + Send + 'static,
70	{
71		let action = self.once_epic(FilterEpic({
72			let map = map.clone();
73			move |action: &A| -> bool { map(action).is_some() }
74		}));
75		async move {
76			let action = action.await?;
77			map(&action).ok_or(anyhow!("Expected preficate to return some output"))
78		}
79	}
80}
81
82/// Action handle.
83pub struct ActionsEpic<A, S, C> {
84	inner: MergeEpic<A, S, C>,
85	api: Actions<A, S, C>,
86}
87impl<A, S, C> Default for ActionsEpic<A, S, C> {
88	fn default() -> Self {
89		Self { inner: MergeEpic::new(), api: Default::default() }
90	}
91}
92impl<A, S, C> ActionsEpic<A, S, C>
93where
94	A: Clone + Send + 'static,
95{
96	pub fn actions(&self) -> Actions<A, S, C> {
97		self.api.clone()
98	}
99}
100impl<A, S, C> Epic<A, S, C> for ActionsEpic<A, S, C>
101where
102	A: Send + 'static,
103{
104	fn epic(
105		&mut self,
106		actions: &Actions<A, S, C>,
107		action: &A,
108		state: &S,
109		context: &C,
110	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
111		// move
112		let pending = { take(self.api.pending.lock().unwrap().deref_mut()) };
113		for item in pending {
114			self.inner.box_push(item);
115		}
116
117		// execute
118		let stream = self.inner.epic(actions, action, state, context).map(|s| s.boxed());
119
120		// drain
121		self.inner.drain_terminated();
122
123		// result
124		stream
125	}
126}
127
128/// If predicate F matches emiit the action.
129struct FilterEpic<F>(F);
130impl<F, A, S, C> Epic<A, S, C> for FilterEpic<F>
131where
132	F: Fn(&A) -> bool + Send + 'static,
133	A: Clone + Send + 'static,
134{
135	fn epic(
136		&mut self,
137		_actions: &Actions<A, S, C>,
138		action: &A,
139		_state: &S,
140		_context: &C,
141	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
142		if (self.0)(action) {
143			Some(stream::iter([Ok(action.clone())]))
144		} else {
145			None
146		}
147	}
148}
149
150/// This epic will never emit but send to channel once the inner epic returns a stream.
151struct OneshotEpic<E, A> {
152	epic: E,
153	sender: Option<oneshot::Sender<Result<A, anyhow::Error>>>,
154}
155impl<E, A, S, C> Epic<A, S, C> for OneshotEpic<E, A>
156where
157	E: Epic<A, S, C>,
158	A: Clone + Send + 'static,
159{
160	fn epic(
161		&mut self,
162		actions: &Actions<A, S, C>,
163		action: &A,
164		state: &S,
165		context: &C,
166	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
167		if self.sender.is_some() {
168			if let Some(stream) = self.epic.epic(actions, action, state, context) {
169				if let Some(sender) = self.sender.take() {
170					return Some(
171						async move {
172							pin_mut!(stream);
173							if let Some(action) = stream.next().await {
174								sender.send(action).ok();
175							}
176						}
177						.into_stream()
178						.filter_map(|_| ready(None)),
179					);
180				}
181			}
182		}
183		None
184	}
185}
186
187#[cfg(test)]
188mod tests {
189	use crate::{epic::actions::ActionsEpic, Actions, Epic, EpicExt};
190	use futures::{pin_mut, stream::select, FutureExt, Stream, TryStreamExt};
191
192	#[derive(Debug, Clone, PartialEq)]
193	enum TestAction {
194		Greet,
195		Hello,
196		World,
197	}
198	struct Test {}
199	impl Epic<TestAction, (), ()> for Test {
200		fn epic(
201			&mut self,
202			actions: &Actions<TestAction, (), ()>,
203			action: &TestAction,
204			_state: &(),
205			_context: &(),
206		) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
207			match action {
208				TestAction::Greet => Some({
209					let actions = actions.clone();
210					let answer_with_world = async move {
211						let once_world = actions.once(|a| matches!(a, TestAction::Hello));
212
213						// wait for world action
214						once_world.await?;
215
216						// greet
217						Ok(TestAction::World)
218					}
219					.into_stream();
220					let hello = async_stream::stream! { yield Ok(TestAction::Hello);};
221					select(answer_with_world, hello)
222				}),
223				_ => None,
224			}
225		}
226	}
227
228	#[tokio::test]
229	async fn test_once() {
230		let actions_epic = ActionsEpic::default();
231		let actions = actions_epic.actions();
232		let test_epic = Test {};
233		let mut epic = actions_epic.join(test_epic);
234		let stream = epic.epic(&actions, &TestAction::Greet, &(), &()).expect("a stream");
235		let mut result = Vec::new();
236		pin_mut!(stream);
237		while let Some(action) = stream.try_next().await.unwrap() {
238			result.push(action.clone());
239			if let Some(epic_actions) = epic.epic(&actions, &action, &(), &()) {
240				let mut epic_actions = epic_actions.try_collect::<Vec<TestAction>>().await.unwrap();
241				result.append(&mut epic_actions);
242			}
243		}
244		assert_eq!(result, vec![TestAction::Hello, TestAction::World]);
245	}
246}