Skip to main content

co_actor/
epic.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use super::ActorHandle;
5use crate::{epic::actions::ActionsEpic, TaskSpawner};
6use co_primitives::Tags;
7use futures::{
8	pin_mut,
9	stream::{self, BoxStream, Empty},
10	Stream, StreamExt,
11};
12use std::{
13	any::type_name,
14	fmt::Debug,
15	marker::{PhantomData, Send},
16	sync::Arc,
17};
18use tokio_util::sync::CancellationToken;
19
20mod action_dispatch;
21mod actions;
22
23pub use action_dispatch::ActionDispatch;
24pub use actions::Actions;
25
26/// Epic.
27///
28/// Defines side effects for actions which will produce other actions over time.
29pub trait Epic<A, S, C> {
30	/// Run the epic.
31	///
32	/// # Arguments
33	/// - `state`: The state after the action has been applied.
34	fn epic(
35		&mut self,
36		actions: &Actions<A, S, C>,
37		action: &A,
38		state: &S,
39		context: &C,
40	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static>;
41
42	/// Whether this epic is terminated and should be not be called futher.
43	fn is_terminated(&self) -> bool {
44		false
45	}
46}
47
48/// Fn impl for epics.
49impl<A, S, C, O, F> Epic<A, S, C> for F
50where
51	O: Stream<Item = Result<A, anyhow::Error>> + Send + 'static,
52	F: FnMut(&Actions<A, S, C>, &A, &S, &C) -> Option<O>,
53{
54	fn epic(
55		&mut self,
56		actions: &Actions<A, S, C>,
57		action: &A,
58		state: &S,
59		context: &C,
60	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
61		self(actions, action, state, context)
62	}
63}
64
65pub trait EpicExt<A, S, C>: Epic<A, S, C> {
66	/// Join two Epics.
67	///
68	/// # Notes
69	/// This will join on the stack.
70	/// If you want to join dozens of epics the heap should be used.
71	/// See: [`MergeEpic`].
72	fn join<E>(self, other: E) -> JoinEpic<Self, E>
73	where
74		Self: Sized,
75		A: Send + 'static,
76	{
77		JoinEpic(self, other)
78	}
79
80	fn switch(self) -> SwitchEpic<Self>
81	where
82		Self: Sized + Send + 'static,
83	{
84		SwitchEpic(self, None)
85	}
86
87	fn boxed(self) -> BoxEpic<'static, A, S, C>
88	where
89		Self: Sized + Send + 'static,
90	{
91		Box::new(self)
92	}
93}
94impl<T, A, S, C> EpicExt<A, S, C> for T where
95	T: Epic<A, S, C> + ?Sized + Send + 'static /* A: Send + Clone + 'static,
96	                                            * S: Send + Clone + 'static,
97	                                            * C: Send + Clone + 'static, */
98{
99}
100
101pub type BoxEpic<'a, A, S, C> = Box<dyn BoxStreamEpic<A, S, C> + Send + 'a>;
102
103/// Dynamic dispatchable epic.
104pub trait BoxStreamEpic<A, S, C> {
105	fn box_epic(
106		&mut self,
107		actions: &Actions<A, S, C>,
108		action: &A,
109		state: &S,
110		context: &C,
111	) -> Option<BoxStream<'static, Result<A, anyhow::Error>>>;
112
113	fn box_is_terminated(&self) -> bool;
114}
115impl<T, A, S, C> BoxStreamEpic<A, S, C> for T
116where
117	T: Epic<A, S, C>,
118{
119	fn box_epic(
120		&mut self,
121		actions: &Actions<A, S, C>,
122		action: &A,
123		state: &S,
124		context: &C,
125	) -> Option<BoxStream<'static, Result<A, anyhow::Error>>> {
126		self.epic(actions, action, state, context).map(|stream| stream.boxed())
127	}
128
129	fn box_is_terminated(&self) -> bool {
130		self.is_terminated()
131	}
132}
133
134/// Epic runtime to be uses as actor state.
135/// Expected to be called after the message has been applied to the state.
136pub struct EpicRuntime<M, A, S, C> {
137	actions: Actions<A, S, C>,
138	epic: BoxEpic<'static, A, S, C>,
139	error: Arc<dyn Fn(anyhow::Error) -> Option<A> + Sync + Send + 'static>,
140	_actor: PhantomData<fn(M, A, S, C)>,
141}
142impl<M, A, S, C> EpicRuntime<M, A, S, C>
143where
144	M: Send + 'static,
145	A: Clone + Send + 'static + Into<M>,
146	S: Send + 'static,
147	C: Send + 'static,
148{
149	pub fn new(
150		epic: impl EpicExt<A, S, C> + Send + 'static,
151		error: impl Fn(anyhow::Error) -> Option<A> + Sync + Send + 'static,
152	) -> Self {
153		let actions_epic = ActionsEpic::default();
154		let actions = actions_epic.actions();
155		Self { actions, epic: epic.join(actions_epic).boxed(), _actor: Default::default(), error: Arc::new(error) }
156	}
157
158	pub fn handle(&mut self, spawner: &TaskSpawner, actor: &ActorHandle<M>, action: &A, state: &S, context: &C) {
159		let stream = self.epic.box_epic(&self.actions, action, state, context);
160		if let Some(stream) = stream {
161			let actor = actor.clone();
162			let error = self.error.clone();
163			spawner.spawn_named(type_name::<A>(), async move {
164				let stream = stream.take_until(actor.closed());
165				pin_mut!(stream);
166				while let Some(action) = stream.next().await {
167					match action {
168						Ok(action) => {
169							actor.dispatch(action).ok();
170						},
171						Err(err) => {
172							if let Some(action) = (error)(err) {
173								actor.dispatch(action).ok();
174							}
175						},
176					}
177				}
178			});
179		}
180	}
181}
182impl<M, A, S, C> Debug for EpicRuntime<M, A, S, C> {
183	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184		f.debug_struct("EpicRuntime").field("_actor", &self._actor).finish()
185	}
186}
187
188/// Joins two epics into one.
189pub struct JoinEpic<E1, E2>(E1, E2);
190impl<E1, E2, A, S, C> Epic<A, S, C> for JoinEpic<E1, E2>
191where
192	A: Send + 'static,
193	E1: Epic<A, S, C>,
194	E2: Epic<A, S, C>,
195{
196	fn epic(
197		&mut self,
198		actions: &Actions<A, S, C>,
199		action: &A,
200		state: &S,
201		context: &C,
202	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
203		let s0 = if self.0.is_terminated() { None } else { self.0.epic(actions, action, state, context) };
204		let s1 = if self.1.is_terminated() { None } else { self.1.epic(actions, action, state, context) };
205		let s0 = async_stream::stream! {
206			if let Some(stream) = s0 {
207				for await item in stream {
208					yield item;
209				}
210			}
211		};
212		let s1 = async_stream::stream! {
213			if let Some(stream) = s1 {
214				for await item in stream {
215					yield item;
216				}
217			}
218		};
219		Some(futures::stream::select(s0, s1))
220	}
221}
222
223/// Merge BoxEpic into one.
224pub struct MergeEpic<A, S, C>(Vec<BoxEpic<'static, A, S, C>>);
225impl<A, S, C> Default for MergeEpic<A, S, C> {
226	fn default() -> Self {
227		Self(Default::default())
228	}
229}
230impl<A, S, C> MergeEpic<A, S, C> {
231	pub fn new() -> Self {
232		Self::default()
233	}
234
235	pub fn join(mut self, epic: impl EpicExt<A, S, C> + Send + 'static) -> Self {
236		self.0.push(epic.boxed());
237		self
238	}
239
240	pub fn push(&mut self, epic: impl EpicExt<A, S, C> + Send + 'static) {
241		self.0.push(epic.boxed());
242	}
243
244	pub fn box_push(&mut self, epic: BoxEpic<'static, A, S, C>) {
245		self.0.push(epic);
246	}
247
248	pub fn drain_terminated(&mut self) {
249		self.0.retain(|epic| !epic.box_is_terminated());
250	}
251}
252impl<A, S, C> Epic<A, S, C> for MergeEpic<A, S, C>
253where
254	A: Send + 'static,
255{
256	fn epic(
257		&mut self,
258		actions: &Actions<A, S, C>,
259		action: &A,
260		state: &S,
261		context: &C,
262	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
263		let streams: Vec<_> = self
264			.0
265			.iter_mut()
266			.filter(|epic| !epic.box_is_terminated())
267			.filter_map(|epic| epic.box_epic(actions, action, state, context))
268			.collect();
269		if !streams.is_empty() {
270			Some(stream::iter(streams).flatten_unordered(None))
271		} else {
272			None
273		}
274	}
275}
276
277/// Trace actions and state as debug messages.
278pub struct TracingEpic(Tags);
279impl TracingEpic {
280	pub fn new(tags: Tags) -> Self {
281		Self(tags)
282	}
283}
284impl<A, S, C> Epic<A, S, C> for TracingEpic
285where
286	A: Debug + Send + 'static,
287	S: Debug + Send + 'static,
288{
289	fn epic(
290		&mut self,
291		_actions: &Actions<A, S, C>,
292		action: &A,
293		state: &S,
294		_context: &C,
295	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + 'static> {
296		tracing::debug!(?action, ?state, tags = ?self.0, "action");
297		Option::<Empty<_>>::None
298	}
299}
300
301/// Only allow to run epic once.
302/// Once the epic returns another stream the previous will be dropped.
303pub struct SwitchEpic<E>(E, Option<CancellationToken>);
304impl<E, A, S, C> Epic<A, S, C> for SwitchEpic<E>
305where
306	E: Epic<A, S, C>,
307	A: Debug + Send + 'static,
308	S: Debug + Send + 'static,
309{
310	fn epic(
311		&mut self,
312		actions: &Actions<A, S, C>,
313		action: &A,
314		state: &S,
315		context: &C,
316	) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + 'static> {
317		let next = self.0.epic(actions, action, state, context);
318		match next {
319			Some(stream) => {
320				// cancel previous
321				if let Some(cancel) = self.1.take() {
322					cancel.cancel();
323				}
324
325				// create next
326				let token = CancellationToken::new();
327				self.1 = Some(token.clone());
328				Some(stream.take_until(token.cancelled_owned()))
329			},
330			None => None,
331		}
332	}
333}
334
#[cfg(test)]
mod tests {
	use crate::{epic::Actions, Epic, EpicExt};
	use futures::{stream, Stream, TryStreamExt};

	/// Actions used by the tests: `Hello` is answered with `World`.
	#[derive(Debug, Clone, PartialEq)]
	enum TestAction {
		Hello,
		World,
	}

	/// Epic implemented on a struct.
	struct Test {}
	impl Epic<TestAction, (), ()> for Test {
		fn epic(
			&mut self,
			_actions: &Actions<TestAction, (), ()>,
			action: &TestAction,
			_state: &(),
			_context: &(),
		) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
			match action {
				TestAction::Hello => Some(stream::once(async { Ok(TestAction::World) })),
				_ => None,
			}
		}
	}

	/// A struct epic answers `Hello` with exactly one `World`.
	#[tokio::test]
	async fn test_hello() {
		let actions = Actions::default();
		let mut epic = Test {};
		let stream = epic.epic(&actions, &TestAction::Hello, &(), &()).expect("a stream");
		let collected: Vec<TestAction> = stream.try_collect().await.expect("no error");
		assert_eq!(collected, vec![TestAction::World]);
	}

	/// A plain function can be used as an epic.
	#[tokio::test]
	async fn test_fn_epic() {
		fn test(
			_actions: &Actions<TestAction, (), ()>,
			action: &TestAction,
			_state: &(),
			_context: &(),
		) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
			match action {
				TestAction::Hello => Some(stream::once(async { Ok(TestAction::World) })),
				_ => None,
			}
		}
		let actions = Actions::default();
		let stream = test.epic(&actions, &TestAction::Hello, &(), &()).expect("a stream");
		let collected: Vec<TestAction> = stream.try_collect().await.expect("no error");
		assert_eq!(collected, vec![TestAction::World]);
	}

	/// A boxed epic behaves like the epic it wraps.
	#[tokio::test]
	async fn test_box_epic() {
		let actions = Actions::default();
		let mut epic = Test {}.boxed();
		let stream = epic.box_epic(&actions, &TestAction::Hello, &(), &()).expect("a stream");
		let collected: Vec<TestAction> = stream.try_collect().await.expect("no error");
		assert_eq!(collected, vec![TestAction::World]);
	}
}