#![allow(deprecated)]
use crate::runtime::execution::ExecutionState;
use generator::{Generator, Gn};
use scoped_tls::scoped_thread_local;
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::ops::Deref;
use std::ops::DerefMut;
use std::rc::Rc;
// Scoped thread-local slot for the continuation pool. `ContinuationPool::acquire` draws from this
// pool when the slot is set, and falls back to a throwaway pool when it is not.
scoped_thread_local! {
pub(crate) static CONTINUATION_POOL: ContinuationPool
}
/// A resumable unit of execution built on top of a `generator::Generator`.
///
/// A continuation runs one boxed closure at a time: the closure is installed with `initialize`
/// and driven with `resume`. Once the closure returns, the continuation goes back to the
/// `NotReady` state and can be given another closure.
pub(crate) struct Continuation {
/// The underlying generator. It receives a `ContinuationInput` on each resume and produces a
/// `ContinuationOutput` each time it yields or returns.
generator: Generator<'static, ContinuationInput, ContinuationOutput>,
/// Slot shared with the generator's body, used to pass in the next closure to run.
function: ContinuationFunction,
/// Lifecycle state; checked by debug assertions and used to decide whether the continuation
/// can be reused.
state: ContinuationState,
}
/// A shared, takeable slot holding the closure a continuation should run next.
///
/// `Continuation::initialize` stores a closure in the slot, and the generator body created in
/// `Continuation::new` takes the closure back out right before calling it.
// The nested `Rc<Cell<Option<Box<..>>>>` trips clippy's `type_complexity` lint, hence the allow.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
struct ContinuationFunction(Rc<Cell<Option<Box<dyn FnOnce() + Send>>>>);
// SAFETY: NOTE(review) — no justification is recorded in this file; confirm before relying on it.
// `Rc` and `Cell` are not thread-safe, so this impl can only be sound if the two handles (the one
// stored in `Continuation` and the clone captured by its generator closure) are never used from
// different threads at the same time. The impl is presumably needed because the closure passed to
// `Gn::new_opt` must be `Send`.
unsafe impl Send for ContinuationFunction {}
/// Value handed to a continuation's generator each time it is resumed.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ContinuationInput {
/// Keep going: sent by `Continuation::resume`. The generator body reacts by running the installed
/// function; `switch` expects this value when it is resumed after yielding.
Resume,
/// Leave the generator's loop so the generator can return; sent when a reusable continuation is
/// dropped.
Exit,
}
/// Value a continuation's generator hands back each time control returns to the resumer.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ContinuationOutput {
/// The running function paused itself via `switch` and has more work to do.
Yielded,
/// The generator is parked at the top of its loop: either it was just primed, or the function it
/// was running has returned. `Continuation::resume` reports this as `true`.
Finished,
/// The generator left its loop and returned; this is its final output.
Exited,
}
/// Lifecycle state of a `Continuation`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum ContinuationState {
    /// No function is installed. This is the state right after `Continuation::new` and again
    /// whenever the generator reports `Finished`. It is the only state in which `initialize`
    /// passes its debug assertion and in which `reusable()` returns `true`.
    NotReady,
    /// A function has been installed by `initialize`; `resume` accepts this state.
    Ready,
    /// Also accepted by `resume`. Within this file the state is only ever assigned by
    /// `ContinuationPool`'s `Drop` impl, where it marks pooled continuations as non-reusable.
    Running,
}
impl Continuation {
    /// Creates a continuation whose generator is built with `stack_size` (the value is passed
    /// straight to `Gn::new_opt`) and primes it so that it is parked waiting for a function.
    ///
    /// The result starts out `NotReady`; install a function with `initialize` before calling
    /// `resume`.
    ///
    /// # Panics
    ///
    /// Panics if the priming `resume()` of the generator produces nothing to unwrap.
    pub fn new(stack_size: usize) -> Self {
        let function = ContinuationFunction(Rc::new(Cell::new(None)));
        let shared = function.clone();

        let mut coroutine = Gn::new_opt(stack_size, move || {
            // Mention the wrapper as a whole so the closure captures `shared` itself rather than
            // just the field `shared.0`: the `Send` impl lives on the wrapper, not on the inner
            // `Rc`. NOTE(review): relevant under edition-2021 disjoint closure captures — confirm
            // the crate's edition.
            let _ = &shared;

            // Every pass starts by yielding `Finished`. The very first yield is what the priming
            // `resume()` below observes; later ones report that the previously installed function
            // has returned. The loop keeps going only when resumed with `Resume`; `Exit` (or no
            // input at all) ends it.
            while let Some(ContinuationInput::Resume) =
                generator::yield_(ContinuationOutput::Finished)
            {
                let task = shared.0.take().expect("must have a function to run");
                task();
            }

            ContinuationOutput::Exited
        });

        // Run the generator up to its first yield so it sits inside the loop, awaiting a function.
        let primed = coroutine.resume().unwrap();
        debug_assert_eq!(primed, ContinuationOutput::Finished);

        Self {
            generator: coroutine,
            function,
            state: ContinuationState::NotReady,
        }
    }

    /// Installs `fun` as the function the next `resume` will start running and marks the
    /// continuation `Ready`.
    ///
    /// In debug builds this asserts that the continuation is `NotReady` and that no function was
    /// already waiting in the slot.
    pub fn initialize(&mut self, fun: Box<dyn FnOnce() + Send>) {
        debug_assert_eq!(
            self.state,
            ContinuationState::NotReady,
            "shouldn't replace a function before it runs"
        );

        let previous = self.function.0.replace(Some(fun));
        debug_assert!(previous.is_none(), "shouldn't replace a function before it runs");

        self.state = ContinuationState::Ready;
    }

    /// Resumes the continuation with `ContinuationInput::Resume`.
    ///
    /// Returns `true` when the generator reports `Finished` (the installed function returned) and
    /// `false` otherwise. In debug builds, asserts that the state is `Ready` or `Running`
    /// beforehand and that the generator did not report `Exited`.
    ///
    /// # Panics
    ///
    /// Panics if the generator's `resume()` produces nothing to unwrap.
    pub fn resume(&mut self) -> bool {
        debug_assert!(
            self.state == ContinuationState::Ready || self.state == ContinuationState::Running
        );

        let output = self.resume_with_input(ContinuationInput::Resume);
        debug_assert_ne!(
            output,
            ContinuationOutput::Exited,
            "continuation should not exit if resumed from user code"
        );

        matches!(output, ContinuationOutput::Finished)
    }

    /// Passes `input` to the generator, resumes it, and returns what it produced.
    ///
    /// A `Finished` output resets the state to `NotReady`; any other output leaves the state
    /// untouched.
    fn resume_with_input(&mut self, input: ContinuationInput) -> ContinuationOutput {
        self.generator.set_para(input);
        let output = self.generator.resume().unwrap();

        if matches!(output, ContinuationOutput::Finished) {
            self.state = ContinuationState::NotReady;
        }

        output
    }

    /// Whether the continuation currently holds no function, i.e. its state is `NotReady`.
    fn reusable(&self) -> bool {
        matches!(self.state, ContinuationState::NotReady)
    }
}
impl Drop for Continuation {
    /// Lets a parked generator leave its loop before the continuation goes away.
    ///
    /// Only a reusable (`NotReady`) continuation is resumed with `Exit`; in debug builds the
    /// generator is then expected to answer `Exited`. Any other state is left alone here.
    /// NOTE(review): cleanup in that case is presumably handled when the `generator` field itself
    /// is dropped — confirm against the `generator` crate.
    fn drop(&mut self) {
        if !self.reusable() {
            return;
        }

        let output = self.resume_with_input(ContinuationInput::Exit);
        debug_assert_eq!(output, ContinuationOutput::Exited);
    }
}
/// A pool of idle continuations that can be handed out again instead of creating new ones.
pub(crate) struct ContinuationPool {
/// Queue of idle continuations. The `Rc` is cloned into every `PooledContinuation` handed out, so
/// each handle can push its continuation back onto the queue when it is dropped.
continuations: Rc<RefCell<VecDeque<Continuation>>>,
}
impl ContinuationPool {
    /// Creates a pool with no idle continuations.
    pub fn new() -> Self {
        let continuations = Rc::new(RefCell::new(VecDeque::new()));
        Self { continuations }
    }

    /// Hands out a continuation, taken from the `CONTINUATION_POOL` scoped thread-local when it is
    /// set.
    ///
    /// When the thread-local is not set, a temporary pool is created just for this call; the
    /// returned handle still keeps that pool's queue alive through its own `Rc`.
    ///
    /// `stack_size` is only used if a new continuation has to be created.
    pub fn acquire(stack_size: usize) -> PooledContinuation {
        if !CONTINUATION_POOL.is_set() {
            let temporary = Self::new();
            return temporary.acquire_inner(stack_size);
        }

        CONTINUATION_POOL.with(|pool| pool.acquire_inner(stack_size))
    }

    /// Pops the oldest idle continuation from this pool, or builds a fresh one with `stack_size`
    /// when the queue is empty, and wraps it in a handle that knows which queue to return to.
    fn acquire_inner(&self, stack_size: usize) -> PooledContinuation {
        let recycled = self.continuations.borrow_mut().pop_front();
        let continuation = match recycled {
            Some(existing) => existing,
            None => Continuation::new(stack_size),
        };

        PooledContinuation {
            continuation: Some(continuation),
            queue: Rc::clone(&self.continuations),
        }
    }
}
impl Drop for ContinuationPool {
    /// Marks every continuation still sitting in the queue as `Running`.
    ///
    /// That makes `reusable()` false for them, so when they are eventually dropped their `Drop`
    /// impl does not resume them with `Exit`. The queue itself is not emptied here.
    /// NOTE(review): the motivation is not visible in this file — presumably resuming generators
    /// while the pool is being torn down is undesirable; confirm.
    fn drop(&mut self) {
        let mut idle = self.continuations.borrow_mut();
        idle.iter_mut().for_each(|continuation| {
            continuation.state = ContinuationState::Running;
        });
    }
}
/// Handle to a continuation obtained from a `ContinuationPool`.
///
/// Dereferences to the wrapped `Continuation`. On drop, the continuation is pushed back onto the
/// pool's queue if it is reusable.
pub(crate) struct PooledContinuation {
/// The wrapped continuation. Always `Some` while the handle is alive; it is only taken out in
/// `Drop`.
continuation: Option<Continuation>,
/// The queue of the pool this continuation came from, used to return it on drop.
queue: Rc<RefCell<VecDeque<Continuation>>>,
}
impl Drop for PooledContinuation {
    /// Returns the continuation to the originating queue when it can be reused.
    ///
    /// A continuation that is not reusable (its state is not `NotReady`) is simply dropped here
    /// instead of being queued.
    fn drop(&mut self) {
        let continuation = self.continuation.take().unwrap();
        if !continuation.reusable() {
            // Falls out of scope here, running `Continuation`'s own `Drop`.
            return;
        }

        self.queue.borrow_mut().push_back(continuation);
    }
}
/// Gives shared access to the wrapped `Continuation`.
impl Deref for PooledContinuation {
type Target = Continuation;
// The option is created as `Some` and only emptied in `Drop`, so the unwrap is not expected to
// fail while the handle is alive.
fn deref(&self) -> &Self::Target {
self.continuation.as_ref().unwrap()
}
}
/// Gives exclusive access to the wrapped `Continuation`, e.g. for `initialize` and `resume`.
impl DerefMut for PooledContinuation {
// The option is created as `Some` and only emptied in `Drop`, so the unwrap is not expected to
// fail while the handle is alive.
fn deref_mut(&mut self) -> &mut Self::Target {
self.continuation.as_mut().unwrap()
}
}
/// Opaque `Debug` output: only the type name is printed, none of the fields.
impl std::fmt::Debug for PooledContinuation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("PooledContinuation").finish()
}
}
// SAFETY: NOTE(review) — no justification is recorded in this file; confirm before relying on it.
// The handle holds an `Rc<RefCell<..>>` to the pool's queue, which is not thread-safe, so this is
// only sound if a handle and the pool it came from are never used from different threads at the
// same time.
unsafe impl Send for PooledContinuation {}
pub(crate) fn switch() {
if ExecutionState::maybe_yield() {
let r = generator::yield_(ContinuationOutput::Yielded).unwrap();
assert!(matches!(r, ContinuationInput::Resume));
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Config;
// Exercises the pooling rules around drop, in three phases that share one pool:
// 1. a continuation whose function ran to completion goes back to the pool;
// 2. a continuation dropped while its function is still suspended does not;
// 3. a handle may outlive the pool it was acquired from.
#[test]
fn reusable_continuation_drop() {
let pool = ContinuationPool::new();
let config: Config = Default::default();
// Phase 1: the function finishes on the first resume, so dropping the handle re-queues it.
let mut c = pool.acquire_inner(config.stack_size);
c.initialize(Box::new(|| {
let _ = 1 + 1;
}));
let r = c.resume();
assert!(r, "continuation only has one step");
drop(c);
assert_eq!(
pool.continuations.borrow().len(),
1,
"continuation should be reusable because the function finished"
);
// Phase 2: this acquire pops the continuation queued in phase 1. Its function yields once and
// is dropped before being resumed again, so it must not be returned to the pool.
let mut c = pool.acquire_inner(config.stack_size);
c.initialize(Box::new(|| {
generator::yield_with(ContinuationOutput::Yielded);
let _ = 1 + 1;
}));
let r = c.resume();
assert!(!r, "continuation yields once, shouldn't be finished yet");
drop(c);
assert_eq!(
pool.continuations.borrow().len(),
0,
"continuation should not be reusable because the function wasn't finished"
);
// Phase 3: the pool is empty, so this creates a fresh continuation. The pool is dropped before
// the handle; dropping the handle afterwards must still work.
let c = pool.acquire_inner(config.stack_size);
drop(pool);
drop(c);
}
}