1use crate::{BoxEpic, Epic, EpicExt, MergeEpic};
5use anyhow::anyhow;
6use futures::{
7 channel::oneshot,
8 pin_mut,
9 stream::{self},
10 FutureExt, Stream, StreamExt,
11};
12use std::{
13 future::{ready, Future},
14 mem::take,
15 ops::DerefMut,
16 sync::{Arc, Mutex},
17};
18
19pub struct Actions<A, S, C> {
20 pending: Arc<Mutex<Vec<BoxEpic<'static, A, S, C>>>>,
21}
22impl<A, S, C> Clone for Actions<A, S, C> {
23 fn clone(&self) -> Self {
24 Self { pending: self.pending.clone() }
25 }
26}
27impl<A, S, C> Default for Actions<A, S, C> {
28 fn default() -> Self {
29 Self { pending: Arc::new(Mutex::new(Default::default())) }
30 }
31}
32impl<A, S, C> Actions<A, S, C>
33where
34 A: Clone + Send + 'static,
35{
36 pub fn once_epic<E>(&self, epic: E) -> impl Future<Output = Result<A, anyhow::Error>> + use<A, S, C, E>
39 where
40 E: EpicExt<A, S, C> + Send + 'static,
41 {
42 let (tx, rx) = oneshot::channel();
43
44 {
46 self.pending
47 .lock()
48 .unwrap()
49 .push(OneshotEpic { epic, sender: Some(tx) }.boxed());
50 }
51
52 async move { rx.await? }
54 }
55
56 pub fn once<F>(&self, predicate: F) -> impl Future<Output = Result<A, anyhow::Error>> + use<A, S, C, F>
59 where
60 F: for<'a> Fn(&'a A) -> bool + Send + 'static,
61 {
62 self.once_epic(FilterEpic(predicate))
63 }
64
65 pub fn once_map<F, O>(&self, map: F) -> impl Future<Output = Result<O, anyhow::Error>> + use<A, S, C, F, O>
68 where
69 F: (for<'a> Fn(&'a A) -> Option<O>) + Clone + Send + 'static,
70 {
71 let action = self.once_epic(FilterEpic({
72 let map = map.clone();
73 move |action: &A| -> bool { map(action).is_some() }
74 }));
75 async move {
76 let action = action.await?;
77 map(&action).ok_or(anyhow!("Expected preficate to return some output"))
78 }
79 }
80}
81
82pub struct ActionsEpic<A, S, C> {
84 inner: MergeEpic<A, S, C>,
85 api: Actions<A, S, C>,
86}
87impl<A, S, C> Default for ActionsEpic<A, S, C> {
88 fn default() -> Self {
89 Self { inner: MergeEpic::new(), api: Default::default() }
90 }
91}
92impl<A, S, C> ActionsEpic<A, S, C>
93where
94 A: Clone + Send + 'static,
95{
96 pub fn actions(&self) -> Actions<A, S, C> {
97 self.api.clone()
98 }
99}
100impl<A, S, C> Epic<A, S, C> for ActionsEpic<A, S, C>
101where
102 A: Send + 'static,
103{
104 fn epic(
105 &mut self,
106 actions: &Actions<A, S, C>,
107 action: &A,
108 state: &S,
109 context: &C,
110 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
111 let pending = { take(self.api.pending.lock().unwrap().deref_mut()) };
113 for item in pending {
114 self.inner.box_push(item);
115 }
116
117 let stream = self.inner.epic(actions, action, state, context).map(|s| s.boxed());
119
120 self.inner.drain_terminated();
122
123 stream
125 }
126}
127
/// Wraps a predicate over actions so it can be used as an epic that echoes
/// matching actions.
struct FilterEpic<F>(F);
130impl<F, A, S, C> Epic<A, S, C> for FilterEpic<F>
131where
132 F: Fn(&A) -> bool + Send + 'static,
133 A: Clone + Send + 'static,
134{
135 fn epic(
136 &mut self,
137 _actions: &Actions<A, S, C>,
138 action: &A,
139 _state: &S,
140 _context: &C,
141 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
142 if (self.0)(action) {
143 Some(stream::iter([Ok(action.clone())]))
144 } else {
145 None
146 }
147 }
148}
149
150struct OneshotEpic<E, A> {
152 epic: E,
153 sender: Option<oneshot::Sender<Result<A, anyhow::Error>>>,
154}
155impl<E, A, S, C> Epic<A, S, C> for OneshotEpic<E, A>
156where
157 E: Epic<A, S, C>,
158 A: Clone + Send + 'static,
159{
160 fn epic(
161 &mut self,
162 actions: &Actions<A, S, C>,
163 action: &A,
164 state: &S,
165 context: &C,
166 ) -> Option<impl Stream<Item = Result<A, anyhow::Error>> + Send + 'static> {
167 if self.sender.is_some() {
168 if let Some(stream) = self.epic.epic(actions, action, state, context) {
169 if let Some(sender) = self.sender.take() {
170 return Some(
171 async move {
172 pin_mut!(stream);
173 if let Some(action) = stream.next().await {
174 sender.send(action).ok();
175 }
176 }
177 .into_stream()
178 .filter_map(|_| ready(None)),
179 );
180 }
181 }
182 }
183 None
184 }
185}
186
#[cfg(test)]
mod tests {
    use crate::{epic::actions::ActionsEpic, Actions, Epic, EpicExt};
    use futures::{pin_mut, stream::select, FutureExt, Stream, TryStreamExt};

    /// Actions exchanged in the greeting scenario.
    #[derive(Debug, Clone, PartialEq)]
    enum TestAction {
        Greet,
        Hello,
        World,
    }

    /// Epic under test: on `Greet` it emits `Hello`, and emits `World` once
    /// it has observed a `Hello` action through `Actions::once`.
    struct Test {}

    impl Epic<TestAction, (), ()> for Test {
        fn epic(
            &mut self,
            actions: &Actions<TestAction, (), ()>,
            action: &TestAction,
            _state: &(),
            _context: &(),
        ) -> Option<impl Stream<Item = Result<TestAction, anyhow::Error>> + Send + 'static> {
            // Only `Greet` triggers any output.
            if !matches!(action, TestAction::Greet) {
                return None;
            }

            let handle = actions.clone();
            // Waits for a `Hello` action to be seen, then answers `World`.
            let reply_world = async move {
                handle.once(|seen| matches!(seen, TestAction::Hello)).await?;

                Ok(TestAction::World)
            }
            .into_stream();
            let say_hello = async_stream::stream! { yield Ok(TestAction::Hello);};

            Some(select(reply_world, say_hello))
        }
    }

    /// Dispatches `Greet`, feeds every emitted action back into the epic, and
    /// checks that `Hello` is followed by `World`.
    #[tokio::test]
    async fn test_once() {
        let actions_epic = ActionsEpic::default();
        let actions = actions_epic.actions();
        let mut root = actions_epic.join(Test {});

        let emitted = root.epic(&actions, &TestAction::Greet, &(), &()).expect("a stream");
        let mut seen = Vec::new();
        pin_mut!(emitted);

        while let Some(next) = emitted.try_next().await.unwrap() {
            seen.push(next.clone());
            // Re-dispatch each emitted action so registered listeners see it.
            if let Some(follow_up) = root.epic(&actions, &next, &(), &()) {
                let collected: Vec<TestAction> = follow_up.try_collect().await.unwrap();
                seen.extend(collected);
            }
        }

        assert_eq!(seen, vec![TestAction::Hello, TestAction::World]);
    }
}