// effect_rs/core.rs

1use futures::FutureExt;
2use futures::future::{BoxFuture, Shared};
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6pub use std::sync::Mutex as StdMutex;
7pub use tokio::sync::Mutex as TokioMutex; // Export if needed, or just use locally
8use tokio_util::sync::CancellationToken;
9// use tokio::task::JoinHandle;
10use crate::metrics::{MetricLabel, REGISTRY};
11use tracing::Instrument;
12
13use tokio::time::{Duration, Instant};
14
/// Abstraction for time.
///
/// A clock is stored in [`Ctx`] as an `Arc<dyn Clock>`, which is why
/// implementors must be `Send + Sync + 'static`.
pub trait Clock: Send + Sync + 'static {
    /// Returns a future that completes after `duration` has passed on this clock.
    fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()>;
    /// Returns the current instant as seen by this clock.
    fn now(&self) -> Instant;
}
20
21pub struct LiveClock;
22impl Clock for LiveClock {
23    fn sleep(&self, duration: Duration) -> BoxFuture<'static, ()> {
24        Box::pin(async move {
25            tokio::time::sleep(duration).await;
26        })
27    }
28    fn now(&self) -> Instant {
29        Instant::now()
30    }
31}
32
/// Represents a fiber identifier.
///
/// A plain newtype over `usize`; it is `Copy` and hashable so it can be used
/// as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FiberId(pub usize);
36
/// The environment reference.
///
/// Currently a simple wrapper around the environment value `R` that is passed
/// to an effect's function on every run; it is intended to expand to a type
/// map later.
#[derive(Clone)]
pub struct EnvRef<R> {
    /// The wrapped environment value.
    pub value: R,
}
43
/// A generic environment container storing services by type.
///
/// At most one value is kept per concrete type. Values live behind an `Arc`,
/// so lookups hand out shared handles instead of copies.
#[derive(Clone, Default)]
pub struct Env {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Env {
    /// Creates an empty environment.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `val` under its own type, replacing any `T` that was already present.
    pub fn insert<T: Send + Sync + 'static>(&mut self, val: T) {
        let service: Arc<dyn Any + Send + Sync> = Arc::new(val);
        self.map.insert(TypeId::of::<T>(), service);
    }

    /// Returns a shared handle to the stored `T`, or `None` when no `T` was inserted.
    pub fn get<T: Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        let entry = self.map.get(&TypeId::of::<T>())?;
        Arc::clone(entry).downcast::<T>().ok()
    }
}
68
69/// A handle to a running fiber.
70#[derive(Clone)]
71pub struct Fiber<E, A> {
72    pub id: FiberId,
73    pub join_future: Shared<BoxFuture<'static, Exit<E, A>>>,
74    pub token: CancellationToken,
75}
76
77impl<E, A> Fiber<E, A>
78where
79    E: Send + Sync + Clone + 'static,
80    A: Send + Sync + Clone + 'static,
81{
82    /// Awaits completion of the fiber.
83    pub async fn join(self) -> Exit<E, A> {
84        self.join_future.await
85    }
86
87    /// Interrupts the fiber.
88    pub async fn interrupt(self) -> Exit<E, A> {
89        self.token.cancel();
90        self.join().await
91    }
92}
93
94/// Runtime Context passed down the call stack.
95#[derive(Clone)]
96pub struct Ctx {
97    pub token: CancellationToken,
98    pub scope: ScopeHandle,
99    pub fiber_id: FiberId,
100    pub locals: Arc<TokioMutex<HashMap<usize, Arc<dyn Any + Send + Sync>>>>,
101    pub clock: Arc<dyn Clock>,
102}
103
104impl Default for Ctx {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl Ctx {
111    pub fn new() -> Self {
112        Self {
113            token: CancellationToken::new(),
114            scope: ScopeHandle::new(),
115            fiber_id: FiberId(0),
116            locals: Arc::new(TokioMutex::new(HashMap::new())),
117            clock: Arc::new(LiveClock),
118        }
119    }
120}
121
122/// A thread-local variable for fibers.
123#[derive(Clone)]
124pub struct FiberRef<T> {
125    id: usize,
126    initial: Arc<T>,
127}
128
129impl<T: Send + Sync + 'static + Clone> FiberRef<T> {
130    pub fn new(initial: T) -> Self {
131        // Simple ID generation using pointer address of global/static?
132        // Or just random? For now, we need an ID.
133        // Use a static atomic counter.
134        static NEXT_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
135        let id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136
137        Self {
138            id,
139            initial: Arc::new(initial),
140        }
141    }
142
143    pub fn get(&self) -> Effect<(), (), T> {
144        let id = self.id;
145        let initial = self.initial.clone();
146        Effect::access_async(move |_, ctx| {
147            let initial = initial.clone();
148            async move {
149                let locals = ctx.locals.lock().await;
150                if let Some(val) = locals.get(&id) {
151                    val.downcast_ref::<T>().cloned().unwrap()
152                } else {
153                    (*initial).clone()
154                }
155            }
156        })
157    }
158
159    pub fn set(&self, value: T) -> Effect<(), (), ()> {
160        let id = self.id;
161        Effect::<(), (), ()>::access_async(move |_, ctx| async move {
162            let mut locals = ctx.locals.lock().await;
163            locals.insert(id, Arc::new(value));
164        })
165    }
166}
167
168/// A generic mutable reference that is safe to share between fibers.
169/// Unlike FiberRef, this is shared state.
170#[derive(Clone)]
171pub struct Ref<A> {
172    value: Arc<TokioMutex<A>>,
173}
174
175impl<A> Ref<A>
176where
177    A: Send + Sync + 'static + Clone,
178{
179    /// Creates a new Ref with an initial value.
180    pub fn new(value: A) -> Self {
181        Self {
182            value: Arc::new(TokioMutex::new(value)),
183        }
184    }
185
186    /// Gets the current value.
187    pub fn get(&self) -> Effect<(), (), A> {
188        let value = self.value.clone();
189        Effect::<(), (), A>::async_effect(move || async move {
190            let guard = value.lock().await;
191            guard.clone()
192        })
193    }
194
195    /// Sets a new value.
196    pub fn set(&self, new_value: A) -> Effect<(), (), ()> {
197        let value = self.value.clone();
198        Effect::<(), (), ()>::async_effect(move || async move {
199            let mut guard = value.lock().await;
200            *guard = new_value;
201        })
202    }
203
204    /// Updates the value and returns the updated value.
205    pub fn update<F>(&self, f: F) -> Effect<(), (), A>
206    where
207        F: FnOnce(A) -> A + Send + Sync + 'static + Clone,
208    {
209        let value = self.value.clone();
210        Effect::<(), (), A>::async_effect(move || async move {
211            let mut guard = value.lock().await;
212            let new_val = f(guard.clone());
213            *guard = new_val.clone();
214            new_val
215        })
216    }
217}
218
219/// A promise that can be completed with an Exit value.
220/// Allows multiple waiters.
221type Waiter<E, A> = tokio::sync::oneshot::Sender<Exit<E, A>>;
222
223#[derive(Clone)]
224pub struct Deferred<E, A> {
225    state: Arc<TokioMutex<Option<Exit<E, A>>>>,
226    waiters: Arc<TokioMutex<Vec<Waiter<E, A>>>>,
227}
228
229impl<E, A> Default for Deferred<E, A>
230where
231    E: Send + Sync + Clone + 'static,
232    A: Send + Sync + Clone + 'static,
233{
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239impl<E, A> Deferred<E, A>
240where
241    E: Send + Sync + Clone + 'static,
242    A: Send + Sync + Clone + 'static,
243{
244    pub fn new() -> Self {
245        Self {
246            state: Arc::new(TokioMutex::new(None)),
247            waiters: Arc::new(TokioMutex::new(Vec::new())),
248        }
249    }
250
251    /// Completes the deferred with a value. Returns true if first to complete.
252    pub fn complete(&self, exit: Exit<E, A>) -> Effect<(), (), bool> {
253        let state = self.state.clone();
254        let waiters = self.waiters.clone();
255        Effect::<(), (), bool>::async_effect(move || async move {
256            let mut guard = state.lock().await;
257            if guard.is_some() {
258                false
259            } else {
260                *guard = Some(exit.clone());
261                let mut waiters = waiters.lock().await;
262                for sender in waiters.drain(..) {
263                    let _ = sender.send(exit.clone());
264                }
265                true
266            }
267        })
268    }
269
270    /// Completes with success.
271    pub fn succeed(&self, value: A) -> Effect<(), (), bool> {
272        self.complete(Exit::Success(value))
273    }
274
275    /// Completes with failure.
276    pub fn fail(&self, error: E) -> Effect<(), (), bool> {
277        self.complete(Exit::Failure(Cause::Fail(error)))
278    }
279
280    /// Awaits the result.
281    pub fn await_result(&self) -> Effect<(), E, A> {
282        let state = self.state.clone();
283        let waiters = self.waiters.clone();
284        Effect::<(), E, ()>::done(Exit::Success(())).flat_map(move |_| {
285            let state = state.clone();
286            let waiters = waiters.clone();
287            Effect::async_effect(move || async move {
288                // Fast path
289                {
290                    let guard = state.lock().await;
291                    if let Some(exit) = guard.as_ref() {
292                        return exit.clone();
293                    }
294                }
295
296                // Slow path
297                let (tx, rx) = tokio::sync::oneshot::channel();
298                {
299                    let guard = state.lock().await;
300                    if let Some(exit) = guard.as_ref() {
301                        return exit.clone();
302                    }
303                    let mut waiters_guard = waiters.lock().await;
304                    waiters_guard.push(tx);
305                }
306
307                // Wait for notification or oneshot
308                rx.await.unwrap_or_else(|_| {
309                    Exit::Failure(Cause::Die(Arc::new("Sender dropped".to_string())))
310                })
311            })
312            .flat_map(Effect::done)
313        })
314    }
315}
316
317/// A wrapper around a bounded Tokio channel.
318#[derive(Clone)]
319pub struct Queue<A> {
320    sender: tokio::sync::mpsc::Sender<A>,
321    receiver: Arc<TokioMutex<tokio::sync::mpsc::Receiver<A>>>,
322}
323
324impl<A> Queue<A>
325where
326    A: Send + Sync + 'static + Clone,
327{
328    /// Creates a bounded queue.
329    pub fn new(capacity: usize) -> Self {
330        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
331        Self {
332            sender,
333            receiver: Arc::new(TokioMutex::new(receiver)),
334        }
335    }
336
337    /// Offers a value to the queue.
338    pub fn offer(&self, value: A) -> Effect<(), (), bool> {
339        let sender = self.sender.clone();
340        Effect::<(), (), bool>::async_effect(
341            move || async move { (sender.send(value).await).is_ok() },
342        )
343    }
344
345    /// Takes a value from the queue.
346    pub fn take(&self) -> Effect<(), (), Option<A>> {
347        let receiver = self.receiver.clone();
348        Effect::<(), (), Option<A>>::async_effect(move || async move {
349            let mut options = receiver.lock().await;
350            options.recv().await
351        })
352    }
353}
354
/// The reason an effect did not succeed.
#[derive(Clone, Debug)]
pub enum Cause<E> {
    /// A typed failure carrying the error value `E`.
    Fail(E),
    /// An untyped defect (see [`Defect`]).
    Die(Defect),
    /// The effect was interrupted.
    Interrupt,
}
361
/// How a scope ended; passed to every finalizer when the scope is closed.
#[derive(Clone, Copy, Debug)]
pub enum ScopeExit {
    /// The scoped computation succeeded.
    Success,
    /// The scoped computation failed or died.
    Failure, // We don't have the error value here in generic scope
    /// The scoped computation was interrupted.
    Interrupt,
}
368
/// A one-shot cleanup action: called with the scope's exit status and
/// returning the future that performs the cleanup.
type Finalizer = Box<dyn FnOnce(ScopeExit) -> BoxFuture<'static, ()> + Send>;

/// Handle to a scope, i.e. a list of finalizers to run when the scope closes.
///
/// Clones share the same finalizer list.
#[derive(Clone)]
pub struct ScopeHandle {
    finalizers: Arc<TokioMutex<Vec<Finalizer>>>,
}
375
376impl<E, A> From<&Exit<E, A>> for ScopeExit {
377    fn from(exit: &Exit<E, A>) -> Self {
378        match exit {
379            Exit::Success(_) => ScopeExit::Success,
380            Exit::Failure(Cause::Interrupt) => ScopeExit::Interrupt,
381            _ => ScopeExit::Failure,
382        }
383    }
384}
385
386impl Default for ScopeHandle {
387    fn default() -> Self {
388        Self::new()
389    }
390}
391
392impl ScopeHandle {
393    pub fn new() -> Self {
394        Self {
395            finalizers: Arc::new(TokioMutex::new(Vec::new())),
396        }
397    }
398
399    pub async fn add_finalizer<F>(&self, f: F)
400    where
401        F: FnOnce(ScopeExit) -> BoxFuture<'static, ()> + Send + 'static,
402    {
403        let mut finalizers = self.finalizers.lock().await;
404        finalizers.push(Box::new(f));
405    }
406
407    pub async fn close(&self, exit: ScopeExit) {
408        let mut finalizers = self.finalizers.lock().await;
409        // Run in reverse order
410        while let Some(f) = finalizers.pop() {
411            f(exit).await;
412        }
413    }
414}
415
/// The result of an Effect execution.
#[derive(Debug, Clone)]
pub enum Exit<E, A> {
    /// The effect completed and produced a value.
    Success(A),
    /// The effect did not complete successfully; see [`Cause`] for why.
    Failure(Cause<E>),
}
422
impl<E> Cause<E> {
    /// Transforms the typed error of a `Cause::Fail` with `f`.
    ///
    /// `Die` and `Interrupt` carry no typed error and are passed through
    /// unchanged.
    pub fn map<E2, F>(self, f: &F) -> Cause<E2>
    where
        F: Fn(E) -> E2,
    {
        match self {
            Cause::Fail(e) => Cause::Fail(f(e)),
            Cause::Die(d) => Cause::Die(d),
            Cause::Interrupt => Cause::Interrupt,
            // `Cause` has exactly these three variants. The match has no
            // catch-all arm, so adding a variant forces this function to be
            // revisited.
        }
    }
}
445
/// A defect is an untyped, shareable payload carried by `Cause::Die`.
pub type Defect = Arc<dyn std::any::Any + Send + Sync>;
448
/// Signature of the function behind an [`Effect`]: given the environment and
/// the runtime context it returns a boxed future producing the `Exit`.
type EffectFn<R, E, A> = dyn Fn(EnvRef<R>, Ctx) -> BoxFuture<'static, Exit<E, A>> + Send + Sync;

/// The core Effect type.
///
/// * `R` - Environment (Dependencies)
/// * `E` - Error (Typed Failure)
/// * `A` - Success Value
///
/// An effect is a description: nothing runs until its inner function is
/// invoked and the returned future is awaited.
pub struct Effect<R, E, A> {
    pub(crate) inner: Arc<EffectFn<R, E, A>>,
}
459
460impl<R, E, A> Clone for Effect<R, E, A> {
461    fn clone(&self) -> Self {
462        Self {
463            inner: self.inner.clone(),
464        }
465    }
466}
467
468// Basic constructors
469impl<R, E, A> Effect<R, E, A>
470where
471    R: Clone + Send + Sync + 'static,
472    E: Send + Sync + 'static,
473    A: Send + Sync + 'static,
474{
475    /// Creates an effect that succeeds with the given value.
476    pub fn succeed(value: A) -> Self
477    where
478        A: Send + Sync + Clone,
479    {
480        Self {
481            inner: Arc::new(move |_, _| {
482                let value = value.clone();
483                Box::pin(async move { Exit::Success(value) })
484            }),
485        }
486    }
487
488    /// Creates an effect that fails with the given error.
489    pub fn fail(error: E) -> Self
490    where
491        E: Send + Sync + Clone,
492    {
493        Self {
494            inner: Arc::new(move |_, _| {
495                let error = error.clone();
496                Box::pin(async move { Exit::Failure(Cause::Fail(error)) })
497            }),
498        }
499    }
500
501    /// Creates an effect from a function that returns a value.
502    pub fn sync<F>(f: F) -> Self
503    where
504        F: FnOnce() -> A + Send + Sync + 'static + Clone,
505        A: Send,
506    {
507        Self {
508            inner: Arc::new(move |_, _| {
509                let f = f.clone();
510                Box::pin(async move { Exit::Success(f()) })
511            }),
512        }
513    }
514
515    /// Creates an effect from a future.
516    pub fn async_effect<F, Fut>(f: F) -> Self
517    where
518        F: FnOnce() -> Fut + Send + Sync + 'static + Clone,
519        Fut: futures::Future<Output = A> + Send + 'static,
520        A: Send,
521    {
522        Self {
523            inner: Arc::new(move |_, _| {
524                let f = f.clone();
525                Box::pin(async move { Exit::Success(f().await) })
526            }),
527        }
528    }
529
530    /// Creates an effect that sleeps for the specified duration.
531    pub fn sleep(duration: Duration) -> Self
532    where
533        A: From<()>, // Return Unit
534    {
535        Self {
536            inner: Arc::new(move |_, ctx| {
537                Box::pin(async move {
538                    ctx.clock.sleep(duration).await;
539                    Exit::Success(A::from(()))
540                })
541            }),
542        }
543    }
544
545    /// Records a metric increment when this effect runs.
546    pub fn with_metric_increment(self, name: &str, labels: Vec<MetricLabel>) -> Self
547    where
548        A: Send + Sync + 'static + Clone,
549        E: Send + Sync + 'static + Clone,
550    {
551        let name = name.to_string();
552        self.map(move |val| {
553            REGISTRY.get_counter(&name, labels.clone()).increment(1);
554            val
555        })
556    }
557
558    /// Records duration of this effect in a histogram.
559    pub fn with_metric_duration(self, name: &str, labels: Vec<MetricLabel>) -> Self
560    where
561        A: Send + Sync + 'static + Clone,
562        E: Send + Sync + 'static + Clone,
563        R: 'static + Clone + Send + Sync,
564    {
565        self.timed(name, labels)
566    }
567
568    pub fn timed(self, name: &str, labels: Vec<MetricLabel>) -> Self
569    where
570        A: Send + Sync + 'static + Clone,
571        E: Send + Sync + 'static + Clone,
572        R: 'static + Clone + Send + Sync,
573    {
574        let name = name.to_string();
575        Effect::sync(Instant::now).flat_map(move |start| {
576            let labels = labels.clone();
577            let name = name.clone();
578            self.clone().map(move |res| {
579                let elapsed = start.elapsed().as_secs_f64();
580                REGISTRY
581                    .get_histogram(&name, labels, vec![0.001, 0.01, 0.1, 1.0, 10.0])
582                    .record(elapsed);
583                res
584            })
585        })
586    }
587
588    /// Creates an effect with access to the environment.
589    pub fn access_async<F, Fut>(f: F) -> Self
590    where
591        R: Send + Sync,
592        F: FnOnce(EnvRef<R>, Ctx) -> Fut + Send + Sync + 'static + Clone,
593        Fut: futures::Future<Output = A> + Send + 'static,
594        A: Send,
595    {
596        Self {
597            inner: Arc::new(move |env, ctx| {
598                let f = f.clone();
599                Box::pin(async move { Exit::Success(f(env, ctx).await) })
600            }),
601        }
602    }
603
604    /// Provides the environment to the effect, eliminating the dependency R.
605    pub fn provide(self, env: R) -> Effect<(), E, A>
606    where
607        R: Clone + Send + Sync + 'static,
608        E: Send + Sync + 'static,
609        A: Send + Sync + 'static,
610    {
611        Effect {
612            inner: Arc::new(move |_, ctx| {
613                let effect = self.clone();
614                let env = env.clone();
615                Box::pin(async move { (effect.inner)(EnvRef { value: env }, ctx).await })
616            }),
617        }
618    }
619
620    /// Creates an effect from an Exit value.
621    pub fn done(exit: Exit<E, A>) -> Self
622    where
623        E: Send + Sync + Clone,
624        A: Send + Sync + Clone,
625    {
626        Self {
627            inner: Arc::new(move |_, _| {
628                let exit = exit.clone();
629                Box::pin(async move { exit })
630            }),
631        }
632    }
633
634    pub fn map<B, F>(self, f: F) -> Effect<R, E, B>
635    where
636        F: FnOnce(A) -> B + Send + Sync + 'static + Clone,
637        B: Send + Sync + 'static + Clone,
638        R: Clone + Send + Sync + 'static,
639        E: Send + Sync + 'static,
640        A: Send + Sync + 'static,
641    {
642        self.flat_map(move |a| -> Effect<R, E, B> { Effect::<R, E, B>::succeed(f(a)) })
643    }
644
645    pub fn map_error<E2, F>(self, f: F) -> Effect<R, E2, A>
646    where
647        F: Fn(E) -> E2 + Send + Sync + 'static + Clone,
648        R: Send + Sync + 'static,
649        A: Send + Sync + 'static,
650        E: Send + Sync + 'static,
651        E2: Send + Sync + 'static,
652    {
653        Effect {
654            inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
655                let effect = self.clone();
656                let f = f.clone();
657                Box::pin(async move {
658                    match (effect.inner)(env, ctx).await {
659                        Exit::Success(a) => Exit::Success(a),
660                        Exit::Failure(cause) => Exit::Failure(cause.map(&f)),
661                    }
662                })
663            }),
664        }
665    }
666
667    pub fn flat_map<B, F>(self, f: F) -> Effect<R, E, B>
668    where
669        F: FnOnce(A) -> Effect<R, E, B> + Send + Sync + 'static + Clone,
670        B: Send + 'static,
671        R: Clone + Send + Sync + 'static,
672        E: Send + Sync + 'static,
673        A: Send + Sync,
674    {
675        Effect {
676            inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
677                let effect = self.clone();
678                let f = f.clone();
679                Box::pin(async move {
680                    match (effect.inner)(env.clone(), ctx.clone()).await {
681                        Exit::Success(a) => {
682                            let next_effect = f(a);
683                            (next_effect.inner)(env, ctx).await
684                        }
685                        Exit::Failure(c) => Exit::Failure(c),
686                    }
687                })
688            }),
689        }
690    }
691
692    /// Delays the execution of this effect by the specified duration.
693    pub fn delay(self, duration: Duration) -> Effect<R, E, A>
694    where
695        R: Clone + Send + Sync + 'static,
696        E: Send + Sync + 'static,
697        A: Send + Sync + 'static,
698    {
699        Effect::<R, E, ()>::sleep(duration).flat_map(move |_| self)
700    }
701
702    /// Wraps the effect execution in a tracing span with the given name.
703    pub fn trace(self, name: &'static str) -> Effect<R, E, A>
704    where
705        R: Clone + Send + Sync + 'static,
706        E: Send + Sync + 'static,
707        A: Send + Sync + 'static,
708    {
709        Effect {
710            inner: Arc::new(move |env, ctx| {
711                let effect = self.clone();
712                let span = tracing::info_span!("effect", name = name);
713
714                async move { (effect.inner)(env, ctx).await }
715                    .instrument(span)
716                    .boxed()
717            }),
718        }
719    }
720
721    /// Runs a cleanup effect if this effect is interrupted.
722    pub fn on_interrupt<F, R2, E2, X>(self, cleanup: F) -> Effect<R, E, A>
723    where
724        F: Fn() -> Effect<R2, E2, X> + Send + Sync + 'static + Clone,
725        R2: From<R> + Send + Sync + 'static + Clone,
726        E2: Send + Sync + 'static,
727        X: Send + Sync + 'static,
728    {
729        Effect {
730            inner: Arc::new(move |env, ctx| {
731                let effect = self.clone();
732                let cleanup = cleanup.clone();
733                Box::pin(async move {
734                    let env_for_cleanup = R2::from(env.value.clone());
735                    let ctx_for_finalizer = ctx.clone();
736
737                    let finalizer = move |exit: ScopeExit| {
738                        let cleanup = cleanup.clone();
739                        let env = env_for_cleanup.clone();
740                        let ctx = ctx_for_finalizer.clone();
741                        async move {
742                            if let ScopeExit::Interrupt = exit {
743                                let _ = (cleanup().inner)(EnvRef { value: env }, ctx).await;
744                            }
745                        }
746                        .boxed()
747                    };
748
749                    ctx.scope.add_finalizer(finalizer).await;
750                    (effect.inner)(env, ctx).await
751                })
752            }),
753        }
754    }
755
756    /// Acquires a resource, then registers a finalizer to release it.
757    /// The release effect is guaranteed to run when the scope closes.
758    pub fn acquire_release<F, R2, E2, X>(self, release: F) -> Effect<R, E, A>
759    where
760        F: FnOnce(A, ScopeExit) -> Effect<R2, E2, X> + Send + Sync + 'static + Clone,
761        R: Clone + Send + Sync + 'static,
762        R2: From<R> + Send + Sync + 'static + Clone,
763        E: Send + Sync + 'static,
764        A: Send + Sync + Clone + 'static,
765        X: Send + Sync + 'static,
766        E2: Send + Sync + 'static,
767    {
768        Effect {
769            inner: Arc::new(move |env: EnvRef<R>, ctx: Ctx| {
770                let acquire = self.clone();
771                let release = release.clone();
772                let env_for_release = R2::from(env.value.clone());
773                Box::pin(async move {
774                    let ctx_clone = ctx.clone();
775                    let finalizer_env = env_for_release.clone();
776
777                    let result: Exit<E, A> = (acquire.inner)(env.clone(), ctx.clone()).await;
778
779                    if let Exit::Success(a) = &result {
780                        let a_for_release = a.clone();
781                        let release = release.clone();
782
783                        let finalizer = move |exit| {
784                            let release_effect = release(a_for_release, exit);
785                            async move {
786                                let _ = (release_effect.inner)(
787                                    EnvRef {
788                                        value: finalizer_env,
789                                    },
790                                    ctx_clone,
791                                )
792                                .await;
793                            }
794                            .boxed()
795                        };
796                        ctx.scope.add_finalizer(finalizer).await;
797                    }
798
799                    result
800                })
801            }),
802        }
803    }
804
805    /// Forks the effect into a new fiber.
806    pub fn fork(self) -> Effect<R, E, Fiber<E, A>>
807    where
808        R: Clone + Send + Sync + 'static,
809        E: Send + Sync + Clone + 'static,
810        A: Send + Sync + Clone + 'static,
811    {
812        Effect {
813            inner: Arc::new(move |env, ctx| {
814                let effect = self.clone();
815                let locals = ctx.locals.clone();
816                Box::pin(async move {
817                    // Create new token for child fiber
818                    let child_token = CancellationToken::new();
819                    // Link to parent token?
820                    // For now, detached fork.
821
822                    let child_scope = ScopeHandle::new();
823
824                    let child_ctx = Ctx {
825                        token: child_token.clone(),
826                        scope: child_scope.clone(),
827                        fiber_id: FiberId(0),     // TODO: Generate ID
828                        locals: locals.clone(),   // Copy locals
829                        clock: ctx.clock.clone(), // Share clock
830                    };
831
832                    let env_for_child = EnvRef {
833                        value: env.value.clone(),
834                    };
835
836                    let fut = async move {
837                        let result = tokio::select! {
838                            res = (effect.inner)(env_for_child, child_ctx.clone()) => res,
839                            _ = child_ctx.token.cancelled() => Exit::Failure(Cause::Interrupt),
840                        };
841
842                        // Close scope
843                        let scope_exit = match &result {
844                            Exit::Success(_) => ScopeExit::Success,
845                            Exit::Failure(Cause::Interrupt) => ScopeExit::Interrupt,
846                            Exit::Failure(_) => ScopeExit::Failure,
847                        };
848                        child_ctx.scope.close(scope_exit).await;
849
850                        result
851                    };
852
853                    // Convert handle to Shared future
854                    let future = fut.boxed().shared();
855
856                    let fiber = Fiber {
857                        id: FiberId(0),
858                        join_future: future,
859                        token: child_token,
860                    };
861
862                    Exit::Success(fiber)
863                })
864            }),
865        }
866    }
867
868    /// Runs this effect and another effect in parallel, returning both results.
869    /// If either effect fails, the other is interrupted.
870    pub fn zip_par<B>(self, other: Effect<R, E, B>) -> Effect<R, E, (A, B)>
871    where
872        R: Clone + Send + Sync + 'static,
873        E: Send + Sync + Clone + 'static,
874        A: Send + Sync + Clone + 'static,
875        B: Send + Sync + Clone + 'static,
876    {
877        self.fork().flat_map(move |f1: Fiber<E, A>| {
878            other.clone().fork().flat_map(move |f2: Fiber<E, B>| {
879                Effect::async_effect(move || async move {
880                    let f1a = f1.clone();
881                    let f2a = f2.clone();
882
883                    tokio::select! {
884                       e1 = f1a.join() => {
885                           match e1 {
886                               Exit::Success(a) => {
887                                   match f2.join().await {
888                                       Exit::Success(b) => Exit::Success((a, b)),
889                                       Exit::Failure(c) => Exit::Failure(c),
890                                   }
891                               }
892                               Exit::Failure(c) => {
893                                   let _ = f2.interrupt().await;
894                                   Exit::Failure(c)
895                               }
896                           }
897                       }
898                       e2 = f2a.join() => {
899                           match e2 {
900                               Exit::Success(b) => {
901                                   match f1.join().await {
902                                       Exit::Success(a) => Exit::Success((a, b)),
903                                       Exit::Failure(c) => Exit::Failure(c),
904                                   }
905                               }
906                               Exit::Failure(c) => {
907                                   let _ = f1.interrupt().await;
908                                   Exit::Failure(c)
909                               }
910                           }
911                       }
912                    }
913                })
914                .flat_map(Effect::done)
915            })
916        })
917    }
918
919    pub fn race(self, other: Effect<R, E, A>) -> Effect<R, E, A>
920    where
921        R: Clone + Send + Sync + 'static,
922        E: Send + Sync + Clone + 'static,
923        A: Send + Sync + Clone + 'static,
924    {
925        self.fork().flat_map(move |f1: Fiber<E, A>| {
926            other.clone().fork().flat_map(move |f2: Fiber<E, A>| {
927                Effect::async_effect(move || async move {
928                    let f1a = f1.clone();
929                    let f2a = f2.clone();
930
931                    tokio::select! {
932                        e1 = f1a.join() => {
933                            let _ = f2.interrupt().await;
934                            e1
935                        }
936                        e2 = f2a.join() => {
937                            let _ = f1.interrupt().await;
938                            e2
939                        }
940                    }
941                })
942                .flat_map(Effect::done)
943            })
944        })
945    }
946
947    pub fn collect_all_par<I>(effects: I) -> Effect<R, E, Vec<A>>
948    where
949        I: IntoIterator<Item = Effect<R, E, A>>,
950        I::IntoIter: Send,
951        R: Clone + Send + Sync + 'static,
952        E: Send + Sync + Clone + 'static,
953        A: Send + Sync + Clone + 'static,
954    {
955        let effects: Vec<_> = effects.into_iter().collect();
956        // Fold using zip_par to build parallel execution
957        // Start with Effect::succeed(Vec::new())
958        // Then zip_par each effect
959        effects
960            .into_iter()
961            .fold(Effect::<R, E, Vec<A>>::succeed(Vec::new()), |acc, eff| {
962                acc.zip_par(eff).map(|(mut list, item): (Vec<A>, A)| {
963                    list.push(item);
964                    list
965                })
966            })
967    }
968}