1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//! An effectful computation with some effects handled

use super::{coproduct::Either, Context, Continue, Effectful, Poll};

use std::marker::PhantomData;
use std::pin::Pin;

/// The argument passed to a handler function.
///
/// The handler is invoked either with the final value of the source
/// computation or with an effect that the source computation performed.
#[derive(Debug)]
pub enum HandlerArgument<T, E> {
    /// The source computation finished with this value.
    Done(T),
    /// The source computation performed this (handled) effect.
    Effect(E),
}

/// An effectful computation with some effects handled
///
/// `C` is the source computation, `H` the handler function and `HC` the
/// computation that the handler function returns. `E` and `I` are only
/// carried as markers; the `Effectful` impl uses them as the handled effect
/// set and as the index parameter of its `Subset` bound.
#[derive(Debug)]
pub struct Handled<C, H, HC, E, I> {
    /// The computation whose effects are being handled.
    source: C,
    /// Called with a `HandlerArgument` to create a handler computation.
    handler: H,
    /// Handler computations that have been started and not yet finished;
    /// the last element is the one that gets polled.
    handler_stack: Vec<Box<HC>>,
    /// Whether the source or the top handler computation is polled next.
    state: ActiveComputation,
    /// Ties the otherwise unused `E` and `I` parameters to the type.
    phantom: PhantomData<(E, I)>,
}

impl<C, H, HC, E, I> Handled<C, H, HC, E, I> {
    /// Wraps `source` together with the `handler` function that will be
    /// consulted for its result and handled effects.
    ///
    /// The returned value starts out polling the source computation and has
    /// no handler computation running yet.
    pub(crate) fn new(source: C, handler: H) -> Self {
        // No handler has been invoked at construction time.
        let handler_stack = Vec::new();
        Self {
            state: ActiveComputation::Source,
            handler_stack,
            source,
            handler,
            phantom: PhantomData,
        }
    }
}

/// Selects which computation `Handled::poll` drives on its next step.
#[derive(Debug)]
enum ActiveComputation {
    /// Poll the source computation.
    Source,
    /// Poll the handler computation on top of the handler stack.
    Handler,
}

/// Runs the source computation `C` and routes its result and the effects in
/// `HandledEffect` through the handler `H`.
///
/// The output of the whole computation is the output of the handler
/// computation (`NewOutput`); the effects that remain visible to the caller
/// are the `Remainder` of the `Subset` split (`NewEffect`).
impl<C, Output, Effect, H, HC, HandledEffect, NewOutput, NewEffect, I> Effectful
    for Handled<C, H, HC, HandledEffect, I>
where
    C: Effectful<Output = Output, Effect = Effect>,
    H: FnMut(HandlerArgument<Output, HandledEffect>) -> HC,
    HC: Effectful<Output = NewOutput, Effect = Either<Continue<Output>, NewEffect>>,
    Effect: super::coproduct::Subset<HandledEffect, I, Remainder = NewEffect>,
{
    type Output = NewOutput;
    type Effect = NewEffect;

    /// Drives the source and handler computations until one of them has to
    /// hand control back to the caller.
    ///
    /// Returns `Done` when the last handler computation on the stack
    /// finishes, `Effect` when either computation performs an effect that is
    /// not handled here, and `Pending` when the polled computation is pending
    /// or when a handler asked to continue the source but `cx` holds no value
    /// yet.
    ///
    /// # Panics
    ///
    /// Panics on the `unwrap` of the handler stack if polled again after
    /// `Done` has been returned: that is the only path in this method that
    /// leaves the state at `Handler` with an empty stack.
    // I'm not sure if this inline improves performance;
    // this method is much larger than I expected
    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &Context) -> Poll<Self::Output, Self::Effect> {
        use Poll::*;

        // TODO: verify soundness
        // NOTE(review): the pin projections below rely on `source` never being
        // moved out of `this` in this method, and on every handler computation
        // living in its own `Box`, so growing or shrinking the `Vec` moves only
        // the box pointers and not the computations themselves. Whether that is
        // sufficient for soundness has not been verified (see the TODO above).
        unsafe {
            let this = self.as_mut().get_unchecked_mut();
            // Alternate between the source and the top handler computation
            // until one of the arms below returns.
            loop {
                match &mut this.state {
                    ActiveComputation::Source => {
                        match Pin::new_unchecked(&mut this.source).poll(cx) {
                            Done(v) => {
                                // In both cases below a handler computation is polled next.
                                this.state = ActiveComputation::Handler;
                                if this.handler_stack.is_empty() {
                                    // no handler is running: start one with the final value
                                    let comp = (this.handler)(HandlerArgument::Done(v));
                                    this.handler_stack.push(Box::new(comp));
                                } else {
                                    // fulfill Continue<Output> effect
                                    cx.set(v);
                                }
                            }
                            // Split the effect into the handled part and the remainder.
                            Effect(e) => match e.subset() {
                                Ok(e) => {
                                    // handled effect: start a new handler computation for it
                                    this.state = ActiveComputation::Handler;
                                    let comp = (this.handler)(HandlerArgument::Effect(e));
                                    this.handler_stack.push(Box::new(comp));
                                }
                                // not handled here: pass it on to the caller
                                Err(rem) => return Effect(rem),
                            },
                            Pending => return Pending,
                        }
                    }
                    ActiveComputation::Handler => {
                        // Only the most recently started handler computation is polled.
                        let handler = this.handler_stack.last_mut().unwrap();
                        match Pin::new_unchecked(&mut **handler).poll(cx) {
                            Done(v) => {
                                // This handler computation is finished; drop it.
                                this.handler_stack.pop();

                                // the last handler
                                if this.handler_stack.is_empty() {
                                    return Done(v);
                                } else {
                                    // Store the result in the context before polling the
                                    // handler below it on the next loop iteration.
                                    // NOTE(review): presumably that handler reads it from
                                    // `cx` - confirm against `Context::set`.
                                    cx.set(v);
                                }
                            }
                            Effect(Either::A(_, _)) => {
                                // continue the original computation
                                this.state = ActiveComputation::Source;

                                // if the handler has already woken the task, continue the computation
                                // otherwise, wait until wake() is called
                                if !cx.contains() {
                                    return Pending;
                                }
                            }
                            // An effect of the handler that is not `Continue`: pass it on.
                            Effect(Either::B(e)) => return Effect(e),
                            Pending => return Pending,
                        }
                    }
                }
            }
        }
    }
}