use crate::backtrace_enabled;
use crate::current::get_name_for_task;
use crate::runtime::execution::{ExecutionState, TASK_ID_TO_TAGS};
use crate::runtime::storage::{AlreadyDestructedError, StorageKey, StorageMap};
use crate::runtime::task::clock::VectorClock;
use crate::runtime::task::labels::Labels;
use crate::runtime::thread;
use crate::runtime::thread::continuation::{
ContinuationInput, ContinuationOutput, ContinuationPool, PooledContinuation,
};
use crate::sync::{ResourceSignature, ResourceType};
use crate::thread::LocalKey;
use bitvec::prelude::*;
use corosensei::Yielder;
use std::any::Any;
use std::backtrace::Backtrace;
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::panic::Location;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Waker};
use tracing::{error_span, event, field, Level, Span};
pub(crate) mod clock;
pub(crate) mod labels;
pub(crate) mod waker;
use waker::make_waker;
pub(crate) const DEFAULT_INLINE_TASKS: usize = 16;
#[derive(Clone, PartialEq, Eq)]
pub struct TaskName(String);
impl From<String> for TaskName {
fn from(s: String) -> Self {
Self(s)
}
}
impl From<&str> for TaskName {
fn from(s: &str) -> Self {
Self(String::from(s))
}
}
impl std::fmt::Debug for TaskName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<TaskName> for String {
fn from(task_name: TaskName) -> Self {
task_name.0
}
}
impl<'a> From<&'a TaskName> for &'a String {
fn from(task_name: &'a TaskName) -> Self {
&task_name.0
}
}
#[derive(Clone)]
#[allow(clippy::type_complexity)]
pub struct ChildLabelFn(pub Arc<dyn Fn(TaskId, &mut Labels) + 'static>);
impl Debug for ChildLabelFn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ChildLabelFn")
}
}
#[deprecated]
#[allow(deprecated)]
pub trait Tag: Taggable {
fn as_any(&self) -> &dyn Any;
}
#[deprecated]
pub trait Taggable: Debug {}
#[allow(deprecated)]
impl<T> Tag for T
where
T: Taggable + Any,
{
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug, Clone)]
pub(crate) struct TaskSignature {
task_creation_stack: Vec<(&'static Location<'static>, u32)>,
spawn_call_site_hash: u64,
parent_signature_hash: u64,
signature_hash: u64,
child_counters: HashMap<&'static Location<'static>, u32>,
}
impl TaskSignature {
pub(crate) fn new_parentless(spawn_call_site: &'static Location<'static>) -> TaskSignature {
let mut hasher = DefaultHasher::new();
let task_creation_stack = vec![(spawn_call_site, 0)];
task_creation_stack.hash(&mut hasher);
let signature_hash = hasher.finish();
spawn_call_site.hash(&mut hasher);
Self {
task_creation_stack,
spawn_call_site_hash: hasher.finish(),
parent_signature_hash: 0,
signature_hash,
child_counters: HashMap::new(),
}
}
pub(crate) fn new_child(&mut self, spawn_call_site: &'static Location<'static>) -> Self {
let mut hasher = DefaultHasher::new();
let counter = self
.child_counters
.entry(spawn_call_site)
.and_modify(|c| *c += 1)
.or_insert(1);
let mut task_creation_stack = self.task_creation_stack.clone();
task_creation_stack.push((spawn_call_site, *counter));
spawn_call_site.hash(&mut hasher);
let spawn_call_site_hash = hasher.finish();
task_creation_stack.hash(&mut hasher);
Self {
task_creation_stack,
parent_signature_hash: self.signature_hash,
spawn_call_site_hash,
signature_hash: hasher.finish(),
child_counters: HashMap::new(),
}
}
#[track_caller]
pub(crate) fn new_resource(&mut self, resource_type: ResourceType) -> ResourceSignature {
let static_create_location = Location::caller();
let counter = self
.child_counters
.entry(static_create_location)
.and_modify(|c| *c += 1)
.or_insert(1);
ResourceSignature::new(resource_type, static_create_location, self.signature_hash, *counter)
}
pub(crate) fn static_create_location_hash(&self) -> u64 {
self.spawn_call_site_hash
}
pub(crate) fn signature_hash(&self) -> u64 {
self.signature_hash
}
pub(crate) fn parent_signature_hash(&self) -> u64 {
self.parent_signature_hash
}
}
impl Hash for TaskSignature {
fn hash<H: Hasher>(&self, state: &mut H) {
self.task_creation_stack.hash(state);
}
}
impl PartialEq for TaskSignature {
fn eq(&self, other: &Self) -> bool {
self.signature_hash == other.signature_hash
}
}
impl Eq for TaskSignature {}
#[derive(Debug)]
pub struct Task {
pub(super) id: TaskId,
pub(super) parent_task_id: Option<TaskId>,
pub(super) state: TaskState,
pub(super) detached: bool,
park_state: ParkState,
pub(super) continuation: Rc<RefCell<PooledContinuation>>,
pub(super) yielder: *const Yielder<ContinuationInput, ContinuationOutput>,
pub(crate) clock: VectorClock,
waiter: Option<TaskId>,
waker: Waker,
woken: bool,
name: Option<String>,
local_storage: StorageMap,
pub(crate) step_span: Span,
pub(super) span_stack: Vec<Span>,
#[allow(deprecated)]
tag: Option<Arc<dyn Tag>>,
pub(crate) backtrace: Option<Backtrace>,
pub(crate) signature: TaskSignature,
}
#[allow(deprecated)]
impl Task {
#[allow(clippy::too_many_arguments)]
fn new(
f: Box<dyn FnOnce() + 'static>,
stack_size: usize,
id: TaskId,
name: Option<String>,
clock: VectorClock,
parent_span_id: Option<tracing::span::Id>,
schedule_len: usize,
tag: Option<Arc<dyn Tag>>,
parent_task_id: Option<TaskId>,
signature: TaskSignature,
) -> Self {
#[cfg(all(any(test, feature = "vector-clocks"), not(feature = "bench-no-vector-clocks")))]
assert!(id.0 < clock.time.len());
let mut continuation = ContinuationPool::acquire(stack_size);
continuation.initialize(f);
let yielder = continuation.yielder;
let waker = make_waker(id);
let continuation = Rc::new(RefCell::new(continuation));
let step_span =
error_span!(parent: parent_span_id.clone(), "step", task = format!("{:?}", id), i = field::Empty);
let span_stack = vec![step_span.clone()];
let mut task = Self {
id,
parent_task_id,
state: TaskState::Runnable,
continuation,
yielder,
clock,
waiter: None,
waker,
woken: false,
detached: false,
park_state: ParkState::default(),
name,
step_span,
span_stack,
local_storage: StorageMap::new(),
tag: None,
backtrace: None,
signature,
};
if let Some(tag) = tag {
task.set_tag(tag);
}
error_span!(parent: parent_span_id, "new_task", parent = ?parent_task_id, i = schedule_len).in_scope(
|| event!(Level::DEBUG, task_id = ?task.id, signature = task.signature.signature_hash(), static_create_location = task.signature.static_create_location_hash(), "created task"),
);
task
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn from_closure(
f: Box<dyn FnOnce() + 'static>,
stack_size: usize,
id: TaskId,
name: Option<String>,
clock: VectorClock,
parent_span_id: Option<tracing::span::Id>,
schedule_len: usize,
tag: Option<Arc<dyn Tag>>,
parent_task_id: Option<TaskId>,
signature: TaskSignature,
) -> Self {
Self::new(
f,
stack_size,
id,
name,
clock,
parent_span_id,
schedule_len,
tag,
parent_task_id,
signature,
)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn from_future<F>(
future: F,
stack_size: usize,
id: TaskId,
name: Option<String>,
clock: VectorClock,
parent_span_id: Option<tracing::span::Id>,
schedule_len: usize,
tag: Option<Arc<dyn Tag>>,
parent_task_id: Option<TaskId>,
signature: TaskSignature,
) -> Self
where
F: Future<Output = ()> + 'static,
{
let mut future = Box::pin(future);
Self::new(
Box::new(move || {
let waker = ExecutionState::with(|state| state.current_mut().waker());
let cx = &mut Context::from_waker(&waker);
while future.as_mut().poll(cx).is_pending() {
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
thread::switch();
}
}),
stack_size,
id,
name,
clock,
parent_span_id,
schedule_len,
tag,
parent_task_id,
signature,
)
}
pub fn id(&self) -> TaskId {
self.id
}
pub fn parent_task_id(&self) -> Option<TaskId> {
self.parent_task_id
}
pub(crate) fn runnable(&self) -> bool {
self.state == TaskState::Runnable
}
pub(crate) fn blocked(&self) -> bool {
matches!(self.state, TaskState::Blocked { .. })
}
pub(crate) fn can_spuriously_wakeup(&self) -> bool {
match self.state {
TaskState::Blocked { allow_spurious_wakeups } => allow_spurious_wakeups,
_ => false,
}
}
pub(crate) fn sleeping(&self) -> bool {
self.state == TaskState::Sleeping
}
pub(crate) fn finished(&self) -> bool {
self.state == TaskState::Finished
}
pub(crate) fn is_detached(&self) -> bool {
self.detached
}
pub(crate) fn detach(&mut self) {
self.detached = true;
}
pub(crate) fn abort(&mut self) {
self.detach();
}
pub(crate) fn waker(&self) -> Waker {
self.waker.clone()
}
pub(crate) fn block(&mut self, allow_spurious_wakeups: bool) {
self.backtrace = if backtrace_enabled() {
Some(Backtrace::force_capture())
} else {
None
};
assert!(self.state != TaskState::Finished);
self.state = TaskState::Blocked { allow_spurious_wakeups };
}
pub(crate) fn sleep(&mut self) {
self.backtrace = if backtrace_enabled() {
Some(Backtrace::force_capture())
} else {
None
};
assert!(self.state != TaskState::Finished);
self.state = TaskState::Sleeping;
}
pub(crate) fn unblock(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Runnable;
self.park_state.blocked_in_park = false;
}
pub(crate) fn finish(&mut self) {
assert!(self.state != TaskState::Finished);
self.state = TaskState::Finished;
}
pub(crate) fn sleep_unless_woken(&mut self) {
let was_woken = std::mem::replace(&mut self.woken, false);
if !was_woken {
self.sleep();
}
}
pub(super) fn wake(&mut self) {
self.woken = true;
if self.state == TaskState::Sleeping {
self.unblock();
}
}
pub(crate) fn set_waiter(&mut self, waiter: TaskId) -> bool {
assert!(
self.waiter.is_none() || self.waiter == Some(waiter),
"Task cannot have more than one waiter"
);
if self.finished() {
false
} else {
self.waiter = Some(waiter);
true
}
}
pub(crate) fn take_waiter(&mut self) -> Option<TaskId> {
self.waiter.take()
}
pub(crate) fn name(&self) -> Option<String> {
self.name.clone()
}
pub(crate) fn local<T: 'static>(&self, key: &'static LocalKey<T>) -> Option<Result<&T, AlreadyDestructedError>> {
self.local_storage.get(key.into())
}
pub(crate) fn init_local<T: 'static>(&mut self, key: &'static LocalKey<T>, value: T) {
self.local_storage.init(key.into(), value)
}
pub(crate) fn pop_local(&mut self) -> Option<Box<dyn Any>> {
self.local_storage.pop()
}
pub(crate) fn park(&mut self) -> bool {
assert!(
!self.park_state.blocked_in_park,
"task cannot park while already parked"
);
assert!(!self.blocked(), "task cannot park while blocked by something else");
if self.park_state.token_available {
self.park_state.token_available = false;
false
} else {
self.park_state.blocked_in_park = true;
self.block(true);
true
}
}
pub(crate) fn unpark(&mut self) {
if self.park_state.blocked_in_park {
assert!(
self.blocked() && self.can_spuriously_wakeup(),
"parked tasks should be blocked"
);
assert!(
!self.park_state.token_available,
"token shouldn't be available for parked task"
);
self.unblock();
} else {
self.park_state.token_available = true;
}
}
pub(crate) fn get_tag(&self) -> Option<Arc<dyn Tag>> {
self.tag.clone()
}
pub(crate) fn set_tag(&mut self, tag: Arc<dyn Tag>) -> Option<Arc<dyn Tag>> {
TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().insert(self.id(), tag.clone()));
self.tag.replace(tag)
}
pub(crate) fn format_for_deadlock(&self) -> String {
use crate::backtrace_enabled;
format!(
"{} (task {:?}{}{}){}",
self.name().unwrap_or_else(|| "<unknown>".to_string()),
self.id(),
if self.detached { ", detached" } else { "" },
if self.sleeping() { ", pending future" } else { "" },
if backtrace_enabled() {
format!("\nBacktrace:\n{:#?}\n", self.backtrace)
} else {
"".into()
}
)
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum TaskState {
Runnable,
Blocked { allow_spurious_wakeups: bool },
Sleeping,
Finished,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Default)]
pub(crate) struct ParkState {
token_available: bool,
blocked_in_park: bool,
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct TaskId(pub(super) usize);
impl Debug for TaskId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(name) = get_name_for_task(*self) {
f.write_str(&format!("{:?}({})", name, self.0))
} else {
f.debug_tuple("TaskId").field(&self.0).finish()
}
}
}
impl From<usize> for TaskId {
fn from(id: usize) -> Self {
TaskId(id)
}
}
impl From<TaskId> for usize {
fn from(tid: TaskId) -> usize {
tid.0
}
}
#[derive(PartialEq, Eq)]
pub(crate) struct TaskSet {
tasks: BitVec,
}
impl TaskSet {
pub const fn new() -> Self {
Self { tasks: BitVec::EMPTY }
}
pub fn contains(&self, tid: TaskId) -> bool {
(tid.0 < self.tasks.len()) && self.tasks[tid.0]
}
pub fn is_empty(&self) -> bool {
self.tasks.iter().all(|b| !*b)
}
pub fn insert(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
self.tasks.resize(DEFAULT_INLINE_TASKS.max(1 + tid.0), false);
}
!std::mem::replace(&mut *self.tasks.get_mut(tid.0).unwrap(), true)
}
pub fn remove(&mut self, tid: TaskId) -> bool {
if tid.0 >= self.tasks.len() {
return false;
}
std::mem::replace(&mut self.tasks.get_mut(tid.0).unwrap(), false)
}
pub fn iter(&self) -> impl Iterator<Item = TaskId> + '_ {
self.tasks
.iter()
.enumerate()
.filter(|(_, b)| **b)
.map(|(i, _)| TaskId(i))
}
}
impl Debug for TaskSet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TaskSet {{ ")?;
for (i, t) in self.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{t:?}")?;
}
write!(f, " }}")
}
}
impl<T: 'static> From<&'static LocalKey<T>> for StorageKey {
fn from(key: &'static LocalKey<T>) -> Self {
Self(key as *const _ as usize, 0x1)
}
}