use crate::runtime::execution::ExecutionState;
use crate::runtime::storage::{AlreadyDestructedError, StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::thread;
use crate::runtime::thread::continuation::{ContinuationPool, PooledContinuation};
use crate::thread::LocalKey;
use bitvec::prelude::*;
use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::future::Future;
use std::rc::Rc;
use std::task::{Context, Waker};
pub(crate) mod clock;
pub(crate) mod waker;
use waker::make_waker;
pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;
#[derive(Debug)]
pub(crate) struct Task {
pub(super) id: TaskId,
pub(super) state: TaskState,
pub(super) detached: bool,
park_state: ParkState,
pub(super) continuation: Rc<RefCell<PooledContinuation>>,
pub(crate) clock: VectorClock,
waiter: Option<TaskId>,
waker: Waker,
woken: bool,
name: Option<String>,
local_storage: StorageMap,
}
impl Task {
fn new<F>(f: F, stack_size: usize, id: TaskId, name: Option<String>, clock: VectorClock) -> Self
where
F: FnOnce() + Send + 'static,
{
assert!(id.0 < clock.time.len());
let mut continuation = ContinuationPool::acquire(stack_size);
continuation.initialize(Box::new(f));
let waker = make_waker(id);
let continuation = Rc::new(RefCell::new(continuation));
Self {
id,
state: TaskState::Runnable,
continuation,
clock,
waiter: None,
waker,
woken: false,
detached: false,
park_state: ParkState::Unavailable,
name,
local_storage: StorageMap::new(),
}
}
pub(crate) fn from_closure<F>(f: F, stack_size: usize, id: TaskId, name: Option<String>, clock: VectorClock) -> Self
where
F: FnOnce() + Send + 'static,
{
Self::new(f, stack_size, id, name, clock)
}
pub(crate) fn from_future<F>(
future: F,
stack_size: usize,
id: TaskId,
name: Option<String>,
clock: VectorClock,
) -> Self
where
F: Future<Output = ()> + Send + 'static,
{
let mut future = Box::pin(future);
Self::new(
move || {
let waker = ExecutionState::with(|state| state.current_mut().waker());
let cx = &mut Context::from_waker(&waker);
while future.as_mut().poll(cx).is_pending() {
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
thread::switch();
}
},
stack_size,
id,
name,
clock,
)
}
pub(crate) fn id(&self) -> TaskId {
self.id
}
pub(crate) fn runnable(&self) -> bool {
self.state == TaskState::Runnable
}
pub(crate) fn blocked(&self) -> bool {
self.state == TaskState::Blocked
}
pub(crate) fn sleeping(&self) -> bool {
self.state == TaskState::Sleeping
}
pub(crate) fn finished(&self) -> bool {
self.state == TaskState::Finished
}
pub(crate) fn detach(&mut self) {
self.detached = true;
}
pub(crate) fn waker(&self) -> Waker {
self.waker.clone()
}
pub(crate) fn block(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Blocked;
}
pub(crate) fn sleep(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Sleeping;
}
pub(crate) fn unblock(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Runnable;
}
pub(crate) fn finish(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Finished;
}
pub(crate) fn sleep_unless_woken(&mut self) {
let was_woken = std::mem::replace(&mut self.woken, false);
if !was_woken {
self.sleep();
}
}
pub(super) fn wake(&mut self) {
self.woken = true;
if self.state == TaskState::Sleeping {
self.unblock();
}
}
pub(crate) fn set_waiter(&mut self, waiter: TaskId) -> bool {
assert!(
self.waiter.is_none() || self.waiter == Some(waiter),
"Task cannot have more than one waiter"
);
if self.finished() {
false
} else {
self.waiter = Some(waiter);
true
}
}
pub(crate) fn take_waiter(&mut self) -> Option<TaskId> {
self.waiter.take()
}
pub(crate) fn name(&self) -> Option<String> {
self.name.clone()
}
pub(crate) fn local<T: 'static>(&self, key: &'static LocalKey<T>) -> Option<Result<&T, AlreadyDestructedError>> {
self.local_storage.get(key.into())
}
pub(crate) fn init_local<T: 'static>(&mut self, key: &'static LocalKey<T>, value: T) {
self.local_storage.init(key.into(), value)
}
pub(crate) fn pop_local(&mut self) -> Option<Box<dyn Any>> {
self.local_storage.pop()
}
pub(crate) fn park(&mut self) -> bool {
match self.park_state {
ParkState::Unparked => {
self.park_state = ParkState::Unavailable;
false
}
ParkState::Unavailable => {
self.park_state = ParkState::Parked;
self.block();
true
}
ParkState::Parked => unreachable!("cannot park a task that's already parked"),
}
}
pub(crate) fn unpark(&mut self) {
if std::mem::replace(&mut self.park_state, ParkState::Unparked) == ParkState::Parked {
assert!(self.blocked());
self.unblock();
}
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum TaskState {
Runnable,
Blocked,
Sleeping,
Finished,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum ParkState {
Parked,
Unparked,
Unavailable,
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord, Debug)]
pub struct TaskId(pub(super) usize);
impl From<usize> for TaskId {
fn from(id: usize) -> Self {
TaskId(id)
}
}
impl From<TaskId> for usize {
fn from(tid: TaskId) -> usize {
tid.0
}
}
#[derive(PartialEq, Eq)]
pub(crate) struct TaskSet {
tasks: BitVec,
}
impl TaskSet {
pub fn new() -> Self {
Self {
tasks: BitVec::from_bitslice(bits![0; DEFAULT_INLINE_TASKS]),
}
}
pub fn contains(&self, tid: TaskId) -> bool {
(tid.0 < self.tasks.len()) && self.tasks[tid.0]
}
pub fn is_empty(&self) -> bool {
self.tasks.iter().all(|b| !*b)
}
pub fn insert(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
self.tasks.resize(1 + tid.0, false);
}
!std::mem::replace(&mut *self.tasks.get_mut(tid.0).unwrap(), true)
}
pub fn remove(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
return false;
}
std::mem::replace(&mut self.tasks.get_mut(tid.0).unwrap(), false)
}
pub fn iter(&self) -> impl Iterator<Item = TaskId> + '_ {
self.tasks
.iter()
.enumerate()
.filter(|(_, b)| **b)
.map(|(i, _)| TaskId(i))
}
}
impl Debug for TaskSet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TaskSet {{ ")?;
for t in self.iter() {
write!(f, "{} ", t.0)?;
}
write!(f, "}}")
}
}
impl<T: 'static> From<&'static LocalKey<T>> for StorageKey {
fn from(key: &'static LocalKey<T>) -> Self {
Self(key as *const _ as usize, 0x1)
}
}