1use futures::FutureExt;
2use futures::future::{BoxFuture, Shared};
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6pub use std::sync::Mutex as StdMutex;
7pub use tokio::sync::Mutex as TokioMutex; use tokio_util::sync::CancellationToken;
9use crate::metrics::{MetricLabel, REGISTRY};
11use tracing::Instrument;
12
13use tokio::time::{Duration, Instant};
14
15pub trait Clock: Send + Sync + 'static {
17 fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()>;
18 fn now(&self) -> Instant;
19}
20
21pub struct LiveClock;
22impl Clock for LiveClock {
23 fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()> {
24 Box::pin(async move {
25 tokio::time::sleep(duration).await;
26 })
27 }
28 fn now(&self) -> Instant {
29 Instant::now()
30 }
31}
32
/// Identifier carried by a [`Fiber`] handle and by the [`Ctx`] it runs in.
///
/// A plain newtype over `usize`; it is comparable, hashable and `Copy`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FiberId(pub usize);
36
/// Wrapper that hands the environment value `R` to an effect when it runs.
#[derive(Clone)]
pub struct EnvRef<R> {
    /// The environment value itself.
    pub value: R,
}
43
/// Type-indexed bag of shared values.
///
/// At most one value is stored per concrete type; values are kept behind
/// `Arc`, so cloning an `Env` clones the handles, not the values.
#[derive(Clone, Default)]
pub struct Env {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Env {
    /// Creates an empty environment.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `val` keyed by its own type, replacing any value of type `T`
    /// that was inserted earlier.
    pub fn insert<T: Send + Sync + 'static>(&mut self, val: T) {
        let key = TypeId::of::<T>();
        let shared: Arc<dyn Any + Send + Sync> = Arc::new(val);
        self.map.insert(key, shared);
    }

    /// Returns a shared handle to the stored `T`, or `None` when no value of
    /// that type has been inserted.
    pub fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        let entry = self.map.get(&TypeId::of::<T>())?;
        Arc::clone(entry).downcast::<T>().ok()
    }
}
68
69#[derive(Clone)]
71pub struct Fiber<E, A> {
72 pub id: FiberId,
73 pub join_future: Shared<BoxFuture<'static, Exit<E, A>>>,
74 pub token: CancellationToken,
75}
76
77impl<E, A> Fiber<E, A>
78where
79 E: Send + Sync + Clone + 'static,
80 A: Send + Sync + Clone + 'static,
81{
82 pub async fn join(self) -> Exit<E, A> {
84 self.join_future.await
85 }
86
87 pub async fn interrupt(self) -> Exit<E, A> {
89 self.token.cancel();
90 self.join().await
91 }
92}
93
94#[derive(Clone)]
96pub struct Ctx {
97 pub token: CancellationToken,
98 pub scope: ScopeHandle,
99 pub fiber_id: FiberId,
100 pub locals: Arc<TokioMutex<HashMap<usize, Arc<dyn Any + Send + Sync>>>>,
101 pub clock: Arc<dyn Clock>,
102}
103
104impl Default for Ctx {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl Ctx {
111 pub fn new() -> Self {
112 Self {
113 token: CancellationToken::new(),
114 scope: ScopeHandle::new(),
115 fiber_id: FiberId(0),
116 locals: Arc::new(TokioMutex::new(HashMap::new())),
117 clock: Arc::new(LiveClock),
118 }
119 }
120}
121
122#[derive(Clone)]
124pub struct FiberRef<T> {
125 id: usize,
126 initial: Arc<T>,
127}
128
129impl<T: Send + Sync + 'static + Clone> FiberRef<T> {
130 pub fn new(initial: T) -> Self {
131 static NEXT_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
135 let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136
137 Self {
138 id,
139 initial: Arc::new(initial),
140 }
141 }
142
143 pub fn get(&self) -> Effect<(), (), T> {
144 let id = self.id;
145 let initial = self.initial.clone();
146 Effect::access_async(move |_, ctx| {
147 let initial = initial.clone();
148 async move {
149 let locals = ctx.locals.lock().await;
150 if let Some(val) = locals.get(&id) {
151 val.downcast_ref::<T>().cloned().unwrap()
152 } else {
153 (*initial).clone()
154 }
155 }
156 })
157 }
158
159 pub fn set(&self, value: T) -> Effect<(), (), ()> {
160 let id = self.id;
161 Effect::<(), (), ()>::access_async(move |_, ctx| async move {
162 let mut locals = ctx.locals.lock().await;
163 locals.insert(id, Arc::new(value));
164 })
165 }
166}
167
168#[derive(Clone)]
171pub struct Ref<A> {
172 value: Arc<TokioMutex<A>>,
173}
174
175impl<A> Ref<A>
176where
177 A: Send + Sync + 'static + Clone,
178{
179 pub fn new(value: A) -> Self {
181 Self {
182 value: Arc::new(TokioMutex::new(value)),
183 }
184 }
185
186 pub fn get(&self) -> Effect<(), (), A> {
188 let value = self.value.clone();
189 Effect::<(), (), A>::async_effect(move || async move {
190 let guard = value.lock().await;
191 guard.clone()
192 })
193 }
194
195 pub fn set(&self, new_value: A) -> Effect<(), (), ()> {
197 let value = self.value.clone();
198 Effect::<(), (), ()>::async_effect(move || async move {
199 let mut guard = value.lock().await;
200 *guard = new_value;
201 })
202 }
203
204 pub fn update<F>(&self, f: F) -> Effect<(), (), A>
206 where
207 F: FnOnce(A) -> A + Send + Sync + 'static + Clone,
208 {
209 let value = self.value.clone();
210 Effect::<(), (), A>::async_effect(move || async move {
211 let mut guard = value.lock().await;
212 let new_val = f(guard.clone());
213 *guard = new_val.clone();
214 new_val
215 })
216 }
217}
218
219type Waiter<E, A> = tokio::sync::oneshot::Sender<Exit<E, A>>;
222
223#[derive(Clone)]
224pub struct Deferred<E, A> {
225 state: Arc<TokioMutex<Option<Exit<E, A>>>>,
226 waiters: Arc<TokioMutex<Vec<Waiter<E, A>>>>,
227}
228
229impl<E, A> Default for Deferred<E, A>
230where
231 E: Send + Sync + Clone + 'static,
232 A: Send + Sync + Clone + 'static,
233{
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
239impl<E, A> Deferred<E, A>
240where
241 E: Send + Sync + Clone + 'static,
242 A: Send + Sync + Clone + 'static,
243{
244 pub fn new() -> Self {
245 Self {
246 state: Arc::new(TokioMutex::new(None)),
247 waiters: Arc::new(TokioMutex::new(Vec::new())),
248 }
249 }
250
251 pub fn complete(&self, exit: Exit<E, A>) -> Effect<(), (), bool> {
253 let state = self.state.clone();
254 let waiters = self.waiters.clone();
255 Effect::<(), (), bool>::async_effect(move || async move {
256 let mut guard = state.lock().await;
257 if guard.is_some() {
258 false
259 } else {
260 *guard = Some(exit.clone());
261 let mut waiters = waiters.lock().await;
262 for sender in waiters.drain(..) {
263 let _ = sender.send(exit.clone());
264 }
265 true
266 }
267 })
268 }
269
270 pub fn succeed(&self, value: A) -> Effect<(), (), bool> {
272 self.complete(Exit::Success(value))
273 }
274
275 pub fn fail(&self, error: E) -> Effect<(), (), bool> {
277 self.complete(Exit::Failure(Cause::Fail(error)))
278 }
279
280 pub fn await_result(&self) -> Effect<(), E, A> {
282 let state = self.state.clone();
283 let waiters = self.waiters.clone();
284 Effect::<(), E, ()>::done(Exit::Success(())).flat_map(move |_| {
285 let state = state.clone();
286 let waiters = waiters.clone();
287 Effect::async_effect(move || async move {
288 {
290 let guard = state.lock().await;
291 if let Some(exit) = guard.as_ref() {
292 return exit.clone();
293 }
294 }
295
296 let (tx, rx) = tokio::sync::oneshot::channel();
298 {
299 let guard = state.lock().await;
300 if let Some(exit) = guard.as_ref() {
301 return exit.clone();
302 }
303 let mut waiters_guard = waiters.lock().await;
304 waiters_guard.push(tx);
305 }
306
307 rx.await.unwrap_or_else(|_| {
309 Exit::Failure(Cause::Die(Arc::new("Sender dropped".to_string())))
310 })
311 })
312 .flat_map(Effect::done)
313 })
314 }
315}
316
317#[derive(Clone)]
319pub struct Queue<A> {
320 sender: tokio::sync::mpsc::Sender<A>,
321 receiver: Arc<TokioMutex<tokio::sync::mpsc::Receiver<A>>>,
322}
323
324impl<A> Queue<A>
325where
326 A: Send + Sync + 'static + Clone,
327{
328 pub fn new(capacity: usize) -> Self {
330 let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
331 Self {
332 sender,
333 receiver: Arc::new(TokioMutex::new(receiver)),
334 }
335 }
336
337 pub fn offer(&self, value: A) -> Effect<(), (), bool> {
339 let sender = self.sender.clone();
340 Effect::<(), (), bool>::async_effect(
341 move || async move { (sender.send(value).await).is_ok() },
342 )
343 }
344
345 pub fn take(&self) -> Effect<(), (), Option<A>> {
347 let receiver = self.receiver.clone();
348 Effect::<(), (), Option<A>>::async_effect(move || async move {
349 let mut options = receiver.lock().await;
350 options.recv().await
351 })
352 }
353}
354
355#[derive(Clone, Debug)]
356pub enum Cause<E> {
357 Fail(E),
358 Die(Defect),
359 Interrupt,
360}
361
/// Coarse outcome passed to finalizers when a scope is closed.
#[derive(Debug, Clone, Copy)]
pub enum ScopeExit {
    /// The scoped computation succeeded.
    Success,
    /// The scoped computation failed for a reason other than interruption.
    Failure,
    /// The scoped computation was interrupted.
    Interrupt,
}
368
369type Finalizer = Box<dyn FnOnce(ScopeExit) -> BoxFuture<'static, ()> + Send>;
370
371#[derive(Clone)]
372pub struct ScopeHandle {
373 finalizers: Arc<TokioMutex<Vec<Finalizer>>>,
374}
375
376impl<E, A> From<&Exit<E, A>> for ScopeExit {
377 fn from(exit: &Exit<E, A>) -> Self {
378 match exit {
379 Exit::Success(_) => ScopeExit::Success,
380 Exit::Failure(Cause::Interrupt) => ScopeExit::Interrupt,
381 _ => ScopeExit::Failure,
382 }
383 }
384}
385
386impl Default for ScopeHandle {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392impl ScopeHandle {
393 pub fn new() -> Self {
394 Self {
395 finalizers: Arc::new(TokioMutex::new(Vec::new())),
396 }
397 }
398
399 pub async fn add_finalizer<F>(&self, f: F)
400 where
401 F: FnOnce(ScopeExit) -> BoxFuture<'static, ()> + Send + 'static,
402 {
403 let mut finalizers = self.finalizers.lock().await;
404 finalizers.push(Box::new(f));
405 }
406
407 pub async fn close(&self, exit: ScopeExit) {
408 let mut finalizers = self.finalizers.lock().await;
409 while let Some(f) = finalizers.pop() {
411 f(exit).await;
412 }
413 }
414}
415
416#[derive(Debug, Clone)]
418pub enum Exit<E, A> {
419 Success(A),
420 Failure(Cause<E>),
421}
422
423impl<E> Cause<E> {
424 pub fn map<E2, F>(self, f: &F) -> Cause<E2>
425 where
426 F: Fn(E) -> E2,
427 {
428 match self {
429 Cause::Fail(e) => Cause::Fail(f(e)),
430 Cause::Die(d) => Cause::Die(d),
431 Cause::Interrupt => Cause::Interrupt,
432 }
443 }
444}
445
446pub type Defect = Arc<dyn std::any::Any + Send + Sync>;
448
449type EffectFn<R, E, A> = dyn Fn(EnvRef<R>, Ctx) -> BoxFuture<'static, Exit<E, A>> + Send + Sync;
455
456pub struct Effect<R, E, A> {
457 pub(crate) inner: Arc<EffectFn<R, E, A>>,
458}
459
460impl<R, E, A> Clone for Effect<R, E, A> {
461 fn clone(&self) -> Self {
462 Self {
463 inner: self.inner.clone(),
464 }
465 }
466}
467
468impl<R, E, A> Effect<R, E, A>
470where
471 R: Clone + Send + Sync + 'static,
472 E: Send + Sync + 'static,
473 A: Send + Sync + 'static,
474{
475 pub fn succeed(value: A) -> Self
477 where
478 A: Send + Sync + Clone,
479 {
480 Self {
481 inner: Arc::new(move |_, _| {
482 let value = value.clone();
483 Box::pin(async move { Exit::Success(value) })
484 }),
485 }
486 }
487
488 pub fn fail(error: E) -> Self
490 where
491 E: Send + Sync + Clone,
492 {
493 Self {
494 inner: Arc::new(move |_, _| {
495 let error = error.clone();
496 Box::pin(async move { Exit::Failure(Cause::Fail(error)) })
497 }),
498 }
499 }
500
501 pub fn sync<F>(f: F) -> Self
503 where
504 F: FnOnce() -> A + Send + Sync + 'static + Clone,
505 A: Send,
506 {
507 Self {
508 inner: Arc::new(move |_, _| {
509 let f = f.clone();
510 Box::pin(async move { Exit::Success(f()) })
511 }),
512 }
513 }
514
515 pub fn async_effect<F, Fut>(f: F) -> Self
517 where
518 F: FnOnce() -> Fut + Send + Sync + 'static + Clone,
519 Fut: futures::Future<Output = A> + Send + 'static,
520 A: Send,
521 {
522 Self {
523 inner: Arc::new(move |_, _| {
524 let f = f.clone();
525 Box::pin(async move { Exit::Success(f().await) })
526 }),
527 }
528 }
529
530 pub fn sleep(duration: Duration) -> Self
532 where
533 A: From<()>, {
535 Self {
536 inner: Arc::new(move |_, ctx| {
537 Box::pin(async move {
538 ctx.clock.sleep(duration).await;
539 Exit::Success(A::from(()))
540 })
541 }),
542 }
543 }
544
545 pub fn with_metric_increment(self, name: &str, labels: Vec<MetricLabel>) -> Self
547 where
548 A: Send + Sync + 'static + Clone,
549 E: Send + Sync + 'static + Clone,
550 {
551 let name = name.to_string();
552 self.map(move |val| {
553 REGISTRY.get_counter(&name, labels.clone()).increment(1);
554 val
555 })
556 }
557
558 pub fn with_metric_duration(self, name: &str, labels: Vec<MetricLabel>) -> Self
560 where
561 A: Send + Sync + 'static + Clone,
562 E: Send + Sync + 'static + Clone,
563 R: 'static + Clone + Send + Sync,
564 {
565 self.timed(name, labels)
566 }
567
568 pub fn timed(self, name: &str, labels: Vec<MetricLabel>) -> Self
569 where
570 A: Send + Sync + 'static + Clone,
571 E: Send + Sync + 'static + Clone,
572 R: 'static + Clone + Send + Sync,
573 {
574 let name = name.to_string();
575 Effect::sync(Instant::now).flat_map(move |start| {
576 let labels = labels.clone();
577 let name = name.clone();
578 self.clone().map(move |res| {
579 let elapsed = start.elapsed().as_secs_f64();
580 REGISTRY
581 .get_histogram(&name, labels, vec![0.001, 0.01, 0.1, 1.0, 10.0])
582 .record(elapsed);
583 res
584 })
585 })
586 }
587
588 pub fn access_async<F, Fut>(f: F) -> Self
590 where
591 R: Send + Sync,
592 F: FnOnce(EnvRef<R>, Ctx) -> Fut + Send + Sync + 'static + Clone,
593 Fut: futures::Future<Output = A> + Send + 'static,
594 A: Send,
595 {
596 Self {
597 inner: Arc::new(move |env, ctx| {
598 let f = f.clone();
599 Box::pin(async move { Exit::Success(f(env, ctx).await) })
600 }),
601 }
602 }
603
604 pub fn provide(self, env: R) -> Effect<(), E, A>
606 where
607 R: Clone + Send + Sync + 'static,
608 E: Send + Sync + 'static,
609 A: Send + Sync + 'static,
610 {
611 Effect {
612 inner: Arc::new(move |_, ctx| {
613 let effect = self.clone();
614 let env = env.clone();
615 Box::pin(async move { (effect.inner)(EnvRef { value: env }, ctx).await })
616 }),
617 }
618 }
619
620 pub fn done(exit: Exit<E, A>) -> Self
622 where
623 E: Send + Sync + Clone,
624 A: Send + Sync + Clone,
625 {
626 Self {
627 inner: Arc::new(move |_, _| {
628 let exit = exit.clone();
629 Box::pin(async move { exit })
630 }),
631 }
632 }
633
634 pub fn map<B, F>(self, f: F) -> Effect<R, E, B>
635 where
636 F: FnOnce(A) -> B + Send + Sync + 'static + Clone,
637 B: Send + Sync + 'static + Clone,
638 R: Clone + Send + Sync + 'static,
639 E: Send + Sync + 'static,
640 A: Send + Sync + 'static,
641 {
642 self.flat_map(move |a| -> Effect<R, E, B> { Effect::<R, E, B>::succeed(f(a)) })
643 }
644
645 pub fn map_error<E2, F>(self, f: F) -> Effect<R, E2, A>
646 where
647 F: Fn(E) -> E2 + Send + Sync + 'static + Clone,
648 R: Send + Sync + 'static,
649 A: Send + Sync + 'static,
650 E: Send + Sync + 'static,
651 E2: Send + Sync + 'static,
652 {
653 Effect {
654 inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
655 let effect = self.clone();
656 let f = f.clone();
657 Box::pin(async move {
658 match (effect.inner)(env, ctx).await {
659 Exit::Success(a) => Exit::Success(a),
660 Exit::Failure(cause) => Exit::Failure(cause.map(&f)),
661 }
662 })
663 }),
664 }
665 }
666
667 pub fn flat_map<B, F>(self, f: F) -> Effect<R, E, B>
668 where
669 F: FnOnce(A) -> Effect<R, E, B> + Send + Sync + 'static + Clone,
670 B: Send + 'static,
671 R: Clone + Send + Sync + 'static,
672 E: Send + Sync + 'static,
673 A: Send + Sync,
674 {
675 Effect {
676 inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
677 let effect = self.clone();
678 let f = f.clone();
679 Box::pin(async move {
680 match (effect.inner)(env.clone(), ctx.clone()).await {
681 Exit::Success(a) => {
682 let next_effect = f(a);
683 (next_effect.inner)(env, ctx).await
684 }
685 Exit::Failure(c) => Exit::Failure(c),
686 }
687 })
688 }),
689 }
690 }
691
692 pub fn delay(self, duration: Duration) -> Effect<R, E, A>
694 where
695 R: Clone + Send + Sync + 'static,
696 E: Send + Sync + 'static,
697 A: Send + Sync + 'static,
698 {
699 Effect::<R, E, ()>::sleep(duration).flat_map(move |_| self)
700 }
701
702 pub fn trace(self, name: &'static str) -> Effect<R, E, A>
704 where
705 R: Clone + Send + Sync + 'static,
706 E: Send + Sync + 'static,
707 A: Send + Sync + 'static,
708 {
709 Effect {
710 inner: Arc::new(move |env, ctx| {
711 let effect = self.clone();
712 let span = tracing::info_span!("effect", name = name);
713
714 async move { (effect.inner)(env, ctx).await }
715 .instrument(span)
716 .boxed()
717 }),
718 }
719 }
720
721 pub fn on_interrupt<F, R2, E2, X>(self, cleanup: F) -> Effect<R, E, A>
723 where
724 F: Fn() -> Effect<R2, E2, X> + Send + Sync + 'static + Clone,
725 R2: From<R> + Send + Sync + 'static + Clone,
726 E2: Send + Sync + 'static,
727 X: Send + Sync + 'static,
728 {
729 Effect {
730 inner: Arc::new(move |env, ctx| {
731 let effect = self.clone();
732 let cleanup = cleanup.clone();
733 Box::pin(async move {
734 let env_for_cleanup = R2::from(env.value.clone());
735 let ctx_for_finalizer = ctx.clone();
736
737 let finalizer = move |exit: ScopeExit| {
738 let cleanup = cleanup.clone();
739 let env = env_for_cleanup.clone();
740 let ctx = ctx_for_finalizer.clone();
741 async move {
742 if let ScopeExit::Interrupt = exit {
743 let _ = (cleanup().inner)(EnvRef { value: env }, ctx).await;
744 }
745 }
746 .boxed()
747 };
748
749 ctx.scope.add_finalizer(finalizer).await;
750 (effect.inner)(env, ctx).await
751 })
752 }),
753 }
754 }
755
756 pub fn acquire_release<F, R2, E2, X>(self, release: F) -> Effect<R, E, A>
759 where
760 F: FnOnce(A, ScopeExit) -> Effect<R2, E2, X> + Send + Sync + 'static + Clone,
761 R: Clone + Send + Sync + 'static,
762 R2: From<R> + Send + Sync + 'static + Clone,
763 E: Send + Sync + 'static,
764 A: Send + Sync + Clone + 'static,
765 X: Send + Sync + 'static,
766 E2: Send + Sync + 'static,
767 {
768 Effect {
769 inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
770 let acquire = self.clone();
771 let release = release.clone();
772 let env_for_release = R2::from(env.value.clone());
773 Box::pin(async move {
774 let ctx_clone = ctx.clone();
775 let finalizer_env = env_for_release.clone();
776
777 let result: Exit<E, A> = (acquire.inner)(env.clone(), ctx.clone()).await;
778
779 if let Exit::Success(a) = &result {
780 let a_for_release = a.clone();
781 let release = release.clone();
782
783 let finalizer = move |exit| {
784 let release_effect = release(a_for_release, exit);
785 async move {
786 let _ = (release_effect.inner)(
787 EnvRef {
788 value: finalizer_env,
789 },
790 ctx_clone,
791 )
792 .await;
793 }
794 .boxed()
795 };
796 ctx.scope.add_finalizer(finalizer).await;
797 }
798
799 result
800 })
801 }),
802 }
803 }
804
805 pub fn fork(self) -> Effect<R, E, Fiber<E, A>>
807 where
808 R: Clone + Send + Sync + 'static,
809 E: Send + Sync + Clone + 'static,
810 A: Send + Sync + Clone + 'static,
811 {
812 Effect {
813 inner: Arc::new(move |env, ctx| {
814 let effect = self.clone();
815 let locals = ctx.locals.clone();
816 Box::pin(async move {
817 let child_token = CancellationToken::new();
819 let child_scope = ScopeHandle::new();
823
824 let child_ctx = Ctx {
825 token: child_token.clone(),
826 scope: child_scope.clone(),
827 fiber_id: FiberId(0), locals: locals.clone(), clock: ctx.clock.clone(), };
831
832 let env_for_child = EnvRef {
833 value: env.value.clone(),
834 };
835
836 let fut = async move {
837 let result = tokio::select! {
838 res = (effect.inner)(env_for_child, child_ctx.clone()) => res,
839 _ = child_ctx.token.cancelled() => Exit::Failure(Cause::Interrupt),
840 };
841
842 let scope_exit = match &result {
844 Exit::Success(_) => ScopeExit::Success,
845 Exit::Failure(Cause::Interrupt) => ScopeExit::Interrupt,
846 Exit::Failure(_) => ScopeExit::Failure,
847 };
848 child_ctx.scope.close(scope_exit).await;
849
850 result
851 };
852
853 let future = fut.boxed().shared();
855
856 let fiber = Fiber {
857 id: FiberId(0),
858 join_future: future,
859 token: child_token,
860 };
861
862 Exit::Success(fiber)
863 })
864 }),
865 }
866 }
867
868 pub fn zip_par<B>(self, other: Effect<R, E, B>) -> Effect<R, E, (A, B)>
871 where
872 R: Clone + Send + Sync + 'static,
873 E: Send + Sync + Clone + 'static,
874 A: Send + Sync + Clone + 'static,
875 B: Send + Sync + Clone + 'static,
876 {
877 self.fork().flat_map(move |f1: Fiber<E, A>| {
878 other.clone().fork().flat_map(move |f2: Fiber<E, B>| {
879 Effect::async_effect(move || async move {
880 let f1a = f1.clone();
881 let f2a = f2.clone();
882
883 tokio::select! {
884 e1 = f1a.join() => {
885 match e1 {
886 Exit::Success(a) => {
887 match f2.join().await {
888 Exit::Success(b) => Exit::Success((a, b)),
889 Exit::Failure(c) => Exit::Failure(c),
890 }
891 }
892 Exit::Failure(c) => {
893 let _ = f2.interrupt().await;
894 Exit::Failure(c)
895 }
896 }
897 }
898 e2 = f2a.join() => {
899 match e2 {
900 Exit::Success(b) => {
901 match f1.join().await {
902 Exit::Success(a) => Exit::Success((a, b)),
903 Exit::Failure(c) => Exit::Failure(c),
904 }
905 }
906 Exit::Failure(c) => {
907 let _ = f1.interrupt().await;
908 Exit::Failure(c)
909 }
910 }
911 }
912 }
913 })
914 .flat_map(Effect::done)
915 })
916 })
917 }
918
919 pub fn race(self, other: Effect<R, E, A>) -> Effect<R, E, A>
920 where
921 R: Clone + Send + Sync + 'static,
922 E: Send + Sync + Clone + 'static,
923 A: Send + Sync + Clone + 'static,
924 {
925 self.fork().flat_map(move |f1: Fiber<E, A>| {
926 other.clone().fork().flat_map(move |f2: Fiber<E, A>| {
927 Effect::async_effect(move || async move {
928 let f1a = f1.clone();
929 let f2a = f2.clone();
930
931 tokio::select! {
932 e1 = f1a.join() => {
933 let _ = f2.interrupt().await;
934 e1
935 }
936 e2 = f2a.join() => {
937 let _ = f1.interrupt().await;
938 e2
939 }
940 }
941 })
942 .flat_map(Effect::done)
943 })
944 })
945 }
946
947 pub fn collect_all_par<I>(effects: I) -> Effect<R, E, Vec<A>>
948 where
949 I: IntoIterator<Item = Effect<R, E, A>>,
950 I::IntoIter: Send,
951 R: Clone + Send + Sync + 'static,
952 E: Send + Sync + Clone + 'static,
953 A: Send + Sync + Clone + 'static,
954 {
955 let effects: Vec<_> = effects.into_iter().collect();
956 effects
960 .into_iter()
961 .fold(Effect::<R, E, Vec<A>>::succeed(Vec::new()), |acc, eff| {
962 acc.zip_par(eff).map(|(mut list, item): (Vec<A>, A)| {
963 list.push(item);
964 list
965 })
966 })
967 }
968}