1use super::ActorHandle;
5use crate::{epic::actions::ActionsEpic, TaskSpawner};
6use co_primitives::Tags;
7use futures::{
8 pin_mut,
9 stream::{self, BoxStream, Empty},
10 Stream, StreamExt,
11};
12use std::{
13 any::type_name,
14 fmt::Debug,
15 marker::{PhantomData, Send},
16 sync::Arc,
17};
18use tokio_util::sync::CancellationToken;
19
20mod action_dispatch;
21mod actions;
22
23pub use action_dispatch::ActionDispatch;
24pub use actions::Actions;
25
/// A handler that reacts to an action and may produce a stream of further actions.
///
/// `A` is the action type, `S` the state type and `C` the context type; all three are
/// handed to [`Epic::epic`] by reference.
pub trait Epic<A, S, C> {
    /// Handles `action` given `state` and `context`.
    ///
    /// Returns `Some(stream)` whose items are either a new action (`Ok`) or an error
    /// (`Err`), or `None` when this epic produces nothing for `action`.
    fn epic(
        &mut self,
        actions: &Actions<A, S, C>,
        action: &A,
        state: &S,
        context: &C,
    ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static>;

    /// Whether this epic has terminated. The default implementation always returns `false`.
    ///
    /// `JoinEpic` and `MergeEpic` do not call [`Epic::epic`] on an epic that reports `true`.
    fn is_terminated(&self) -> bool {
        false
    }
}
47
48impl<A, S, C, O, F> Epic<A, S, C> for F
50where
51 O: Stream<Item = Result<A, anyhow::Error>> + Send + 'static,
52 F: FnMut(&Actions<A, S, C>, &A, &S, &C) -> Option<O>,
53{
54 fn epic(
55 &mut self,
56 actions: &Actions<A, S, C>,
57 action: &A,
58 state: &S,
59 context: &C,
60 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
61 self(actions, action, state, context)
62 }
63}
64
/// Combinator methods available on every [`Epic`].
pub trait EpicExt<A, S, C>: Epic<A, S, C> {
    /// Pairs this epic with `other` in a [`JoinEpic`], which runs both for each action.
    fn join<E>(self, other: E) -> JoinEpic<Self, E>
    where
        Self: Sized,
        A: Send + 'static,
    {
        JoinEpic(self, other)
    }

    /// Wraps this epic in a [`SwitchEpic`], which cancels the previously returned stream
    /// whenever the wrapped epic returns a new one.
    fn switch(self) -> SwitchEpic<Self>
    where
        Self: Sized + Send + 'static,
    {
        // No stream has been handed out yet, so there is no cancellation token.
        SwitchEpic(self, None)
    }

    /// Moves this epic onto the heap as a type-erased [`BoxEpic`].
    fn boxed(self) -> BoxEpic<'static, A, S, C>
    where
        Self: Sized + Send + 'static,
    {
        Box::new(self)
    }
}
// Every `Send + 'static` epic (sized or not) gets the `EpicExt` combinators.
impl<T, A, S, C> EpicExt<A, S, C> for T where
    T: Epic<A, S, C> + ?Sized + Send + 'static {
}
100
/// A heap-allocated, type-erased epic: a boxed [`BoxStreamEpic`] trait object that is `Send`
/// and bounded by the lifetime `'a`.
pub type BoxEpic<'a, A, S, C> = Box<dyn BoxStreamEpic<A, S, C> + Send + 'a>;
102
/// Counterpart of [`Epic`] that returns its stream as a [`BoxStream`] rather than an
/// `impl Stream`. This is the trait behind the [`BoxEpic`] trait object.
pub trait BoxStreamEpic<A, S, C> {
    /// Same contract as [`Epic::epic`], with the resulting stream boxed.
    fn box_epic(
        &mut self,
        actions: &Actions<A, S, C>,
        action: &A,
        state: &S,
        context: &C,
    ) -> Option<BoxStream<'static, Result<A, anyhow::Error>>>;

    /// Same contract as [`Epic::is_terminated`].
    fn box_is_terminated(&self) -> bool;
}
115impl<T, A, S, C> BoxStreamEpic<A, S, C> for T
116where
117 T: Epic<A, S, C>,
118{
119 fn box_epic(
120 &mut self,
121 actions: &Actions<A, S, C>,
122 action: &A,
123 state: &S,
124 context: &C,
125 ) -> Option<BoxStream<'static, Result<A, anyhow::Error>>> {
126 self.epic(actions, action, state, context).map(|stream| stream.boxed())
127 }
128
129 fn box_is_terminated(&self) -> bool {
130 self.is_terminated()
131 }
132}
133
/// Runs an epic for incoming actions and dispatches whatever its streams yield to an actor.
///
/// `M` is the message type of the target actor; `A`, `S` and `C` are the action, state and
/// context types of the epic.
pub struct EpicRuntime<M, A, S, C> {
    /// Handle obtained from the internal `ActionsEpic`; passed to the epic on every call.
    actions: Actions<A, S, C>,
    /// The user epic joined with the internal `ActionsEpic`, type-erased.
    epic: BoxEpic<'static, A, S, C>,
    /// Converts an error yielded by a stream into an optional action to dispatch instead.
    error: Arc<dyn Fn(anyhow::Error) -> Option<A> + Sync + Send + 'static>,
    /// Marker for the type parameters; no value of these types is stored through it.
    _actor: PhantomData<fn(M, A, S, C)>,
}
142impl<M, A, S, C> EpicRuntime<M, A, S, C>
143where
144 M: Send + 'static,
145 A: Clone + Send + 'static + Into<M>,
146 S: Send + 'static,
147 C: Send + 'static,
148{
149 pub fn new(
150 epic: impl EpicExt<A, S, C> + Send + 'static,
151 error: impl Fn(anyhow::Error) -> Option<A> + Sync + Send + 'static,
152 ) -> Self {
153 let actions_epic = ActionsEpic::default();
154 let actions = actions_epic.actions();
155 Self { actions, epic: epic.join(actions_epic).boxed(), _actor: Default::default(), error: Arc::new(error) }
156 }
157
158 pub fn handle(&mut self, spawner: &TaskSpawner, actor: &ActorHandle<M>, action: &A, state: &S, context: &C) {
159 let stream = self.epic.box_epic(&self.actions, action, state, context);
160 if let Some(stream) = stream {
161 let actor = actor.clone();
162 let error = self.error.clone();
163 spawner.spawn_named(type_name::<A>(), async move {
164 let stream = stream.take_until(actor.closed());
165 pin_mut!(stream);
166 while let Some(action) = stream.next().await {
167 match action {
168 Ok(action) => {
169 actor.dispatch(action).ok();
170 },
171 Err(err) => {
172 if let Some(action) = (error)(err) {
173 actor.dispatch(action).ok();
174 }
175 },
176 }
177 }
178 });
179 }
180 }
181}
182impl<M, A, S, C> Debug for EpicRuntime<M, A, S, C> {
183 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 f.debug_struct("EpicRuntime").field("_actor", &self._actor).finish()
185 }
186}
187
188pub struct JoinEpic<E1, E2>(E1, E2);
190impl<E1, E2, A, S, C> Epic<A, S, C> for JoinEpic<E1, E2>
191where
192 A: Send + 'static,
193 E1: Epic<A, S, C>,
194 E2: Epic<A, S, C>,
195{
196 fn epic(
197 &mut self,
198 actions: &Actions<A, S, C>,
199 action: &A,
200 state: &S,
201 context: &C,
202 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
203 let s0 = if self.0.is_terminated() { None } else { self.0.epic(actions, action, state, context) };
204 let s1 = if self.1.is_terminated() { None } else { self.1.epic(actions, action, state, context) };
205 let s0 = async_stream::stream! {
206 if let Some(stream) = s0 {
207 for await item in stream {
208 yield item;
209 }
210 }
211 };
212 let s1 = async_stream::stream! {
213 if let Some(stream) = s1 {
214 for await item in stream {
215 yield item;
216 }
217 }
218 };
219 Some(futures::stream::select(s0, s1))
220 }
221}
222
223pub struct MergeEpic<A, S, C>(Vec<BoxEpic<'static, A, S, C>>);
225impl<A, S, C> Default for MergeEpic<A, S, C> {
226 fn default() -> Self {
227 Self(Default::default())
228 }
229}
230impl<A, S, C> MergeEpic<A, S, C> {
231 pub fn new() -> Self {
232 Self::default()
233 }
234
235 pub fn join(mut self, epic: impl EpicExt<A, S, C> + Send + 'static) -> Self {
236 self.0.push(epic.boxed());
237 self
238 }
239
240 pub fn push(&mut self, epic: impl EpicExt<A, S, C> + Send + 'static) {
241 self.0.push(epic.boxed());
242 }
243
244 pub fn box_push(&mut self, epic: BoxEpic<'static, A, S, C>) {
245 self.0.push(epic);
246 }
247
248 pub fn drain_terminated(&mut self) {
249 self.0.retain(|epic| !epic.box_is_terminated());
250 }
251}
252impl<A, S, C> Epic<A, S, C> for MergeEpic<A, S, C>
253where
254 A: Send + 'static,
255{
256 fn epic(
257 &mut self,
258 actions: &Actions<A, S, C>,
259 action: &A,
260 state: &S,
261 context: &C,
262 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
263 let streams: Vec<_> = self
264 .0
265 .iter_mut()
266 .filter(|epic| !epic.box_is_terminated())
267 .filter_map(|epic| epic.box_epic(actions, action, state, context))
268 .collect();
269 if !streams.is_empty() {
270 Some(stream::iter(streams).flatten_unordered(None))
271 } else {
272 None
273 }
274 }
275}
276
277pub struct TracingEpic(Tags);
279impl TracingEpic {
280 pub fn new(tags: Tags) -> Self {
281 Self(tags)
282 }
283}
284impl<A, S, C> Epic<A, S, C> for TracingEpic
285where
286 A: Debug + Send + 'static,
287 S: Debug + Send + 'static,
288{
289 fn epic(
290 &mut self,
291 _actions: &Actions<A, S, C>,
292 action: &A,
293 state: &S,
294 _context: &C,
295 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + 'static> {
296 tracing::debug!(?action, ?state, tags = ?self.0, "action");
297 Option::<Empty<_>>::None
298 }
299}
300
301pub struct SwitchEpic<E>(E, Option<CancellationToken>);
304impl<E, A, S, C> Epic<A, S, C> for SwitchEpic<E>
305where
306 E: Epic<A, S, C>,
307 A: Debug + Send + 'static,
308 S: Debug + Send + 'static,
309{
310 fn epic(
311 &mut self,
312 actions: &Actions<A, S, C>,
313 action: &A,
314 state: &S,
315 context: &C,
316 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + 'static> {
317 let next = self.0.epic(actions, action, state, context);
318 match next {
319 Some(stream) => {
320 if let Some(cancel) = self.1.take() {
322 cancel.cancel();
323 }
324
325 let token = CancellationToken::new();
327 self.1 = Some(token.clone());
328 Some(stream.take_until(token.cancelled_owned()))
329 },
330 None => None,
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    use crate::{epic::Actions, Epic, EpicExt};
    use futures::{stream, Stream, TryStreamExt};

    #[derive(Debug, Clone, PartialEq)]
    enum TestAction {
        Hello,
        World,
    }

    /// Epic under test: answers `Hello` with a single `World` and ignores everything else.
    struct Test {}
    impl Epic<TestAction, (), ()> for Test {
        fn epic(
            &mut self,
            _actions: &Actions<TestAction, (), ()>,
            action: &TestAction,
            _state: &(),
            _context: &(),
        ) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
            if let TestAction::Hello = action {
                Some(stream::once(async { Ok(TestAction::World) }))
            } else {
                None
            }
        }
    }

    /// Unwraps the optional stream and collects every action it yields, panicking if there
    /// is no stream or if the stream yields an error.
    async fn collect_actions(
        stream: Option<impl Stream<Item = Result<TestAction, anyhow::Error>>>,
    ) -> Vec<TestAction> {
        stream.expect("a stream").try_collect().await.expect("no error")
    }

    /// A struct implementing `Epic` directly.
    #[tokio::test]
    async fn test_hello() {
        let actions = Actions::default();
        let mut epic = Test {};
        let result = collect_actions(epic.epic(&actions, &TestAction::Hello, &(), &())).await;
        assert_eq!(result, vec![TestAction::World]);
    }

    /// A plain function used as an epic through the blanket `FnMut` implementation.
    #[tokio::test]
    async fn test_fn_epic() {
        fn test(
            _actions: &Actions<TestAction, (), ()>,
            action: &TestAction,
            _state: &(),
            _context: &(),
        ) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
            if let TestAction::Hello = action {
                Some(stream::once(async { Ok(TestAction::World) }))
            } else {
                None
            }
        }

        let actions = Actions::default();
        let result = collect_actions(test.epic(&actions, &TestAction::Hello, &(), &())).await;
        assert_eq!(result, vec![TestAction::World]);
    }

    /// The same epic after type erasure with `boxed`.
    #[tokio::test]
    async fn test_box_epic() {
        let actions = Actions::default();
        let mut epic = Test {}.boxed();
        let result = collect_actions(epic.box_epic(&actions, &TestAction::Hello, &(), &())).await;
        assert_eq!(result, vec![TestAction::World]);
    }
}