mod core;
use self::core::Cell;
use self::core::Header;
mod error;
pub use self::error::JoinError;
mod harness;
use self::harness::Harness;
mod id;
pub use id::{id, try_id, Id};
#[cfg(feature = "rt")]
mod abort;
mod join;
#[cfg(feature = "rt")]
pub use self::abort::AbortHandle;
pub use self::join::JoinHandle;
mod list;
pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks};
mod raw;
pub(crate) use self::raw::RawTask;
mod state;
use self::state::State;
#[cfg(feature = "rt-multi-thread")]
mod atomic_notified;
#[cfg(feature = "rt-multi-thread")]
pub(crate) use self::atomic_notified::AtomicNotified;
mod waker;
pub(crate) use self::spawn_location::SpawnLocation;
cfg_taskdump! {
pub(crate) mod trace;
}
use crate::future::Future;
use crate::util::linked_list;
use crate::util::sharded_list;
use crate::runtime::TaskCallback;
use std::marker::PhantomData;
use std::panic::Location;
use std::ptr::NonNull;
use std::{fmt, mem};
/// An owned handle to a task, parameterized by the scheduler type `S`.
///
/// The handle is a thin wrapper around a [`RawTask`]: `S` only appears as a
/// zero-sized `PhantomData` marker, and `#[repr(transparent)]` gives the
/// struct the same layout as its single non-zero-sized field. Dropping a
/// `Task` decrements the task's reference count once (see the `Drop` impl in
/// this file).
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
/// Raw handle to the underlying task.
raw: RawTask,
/// Ties the handle to its scheduler type without storing a value of it.
_p: PhantomData<S>,
}
// NOTE(review): `Send`/`Sync` are asserted for every `S`, with no bound on
// the scheduler type. The justification is not visible in this file;
// presumably it rests on the synchronization performed by the task state.
unsafe impl<S> Send for Task<S> {}
unsafe impl<S> Sync for Task<S> {}
/// A task handle in its "notified" form.
///
/// This is a transparent newtype around [`Task`]; it is the type handed to
/// `Schedule::schedule` and `Schedule::yield_now`.
///
/// NOTE(review): presumably a `Notified` represents a pending request to
/// poll the task — confirm against the state machine.
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);
impl<S> Notified<S> {
/// Returns the metadata of the wrapped task by delegating to
/// `Task::task_meta`.
///
/// Only compiled when `tokio_unstable` is set and the `rt-multi-thread`
/// feature is enabled.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
#[inline]
pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> {
self.0.task_meta()
}
}
// Unlike `Task`, these marker impls are only provided when `S: Schedule`
// (and `Schedule` itself requires `Sync`).
// NOTE(review): the soundness argument is not visible in this file.
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}
/// A notified task handle that cannot leave the thread it was created on.
///
/// The `PhantomData<*const ()>` marker opts the type out of the automatic
/// `Send` and `Sync` impls, because raw pointers are neither. The handle is
/// consumed by `LocalNotified::run`, which polls the task.
#[repr(transparent)]
pub(crate) struct LocalNotified<S: 'static> {
/// The wrapped task handle.
task: Task<S>,
/// Zero-sized marker that makes this type `!Send` and `!Sync`.
_not_send: PhantomData<*const ()>,
}
impl<S> LocalNotified<S> {
/// Returns the metadata of the wrapped task by delegating to
/// `Task::task_meta`.
///
/// Only compiled when `tokio_unstable` is set.
#[cfg(tokio_unstable)]
#[inline]
pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> {
self.task.task_meta()
}
}
/// A task handle that accounts for two references to the task at once.
///
/// It is produced by `unowned`, which folds the `Task` and `Notified`
/// handles returned by `new_task` into a single value. Dropping it calls
/// `ref_dec_twice` on the task state, and `into_task` gives one reference
/// back before returning a plain `Task`.
pub(crate) struct UnownedTask<S: 'static> {
/// Raw handle to the underlying task.
raw: RawTask,
/// Ties the handle to its scheduler type without storing a value of it.
_p: PhantomData<S>,
}
// NOTE(review): `Send`/`Sync` are asserted for every `S`; the justification
// is not visible in this file.
unsafe impl<S> Send for UnownedTask<S> {}
unsafe impl<S> Sync for UnownedTask<S> {}
/// Result alias for task outcomes: `Ok(T)` on success, or a [`JoinError`].
pub(crate) type Result<T> = std::result::Result<T, JoinError>;
/// Set of optional callbacks a scheduler exposes through `Schedule::hooks`.
#[derive(Clone)]
pub(crate) struct TaskHarnessScheduleHooks {
/// Optional callback; `None` means no callback is installed.
///
/// NOTE(review): judging by the name, this is presumably invoked when a
/// task terminates — confirm against the harness.
pub(crate) task_terminate_callback: Option<TaskCallback>,
}
/// Scheduler interface that task handles in this module are generic over.
///
/// Implementors must be `Sync`, `Sized` and `'static`.
pub(crate) trait Schedule: Sync + Sized + 'static {
/// Receives a borrowed task handle and may hand back an owned [`Task`].
///
/// NOTE(review): presumably called when a task finishes, so the scheduler
/// can remove it from its collection of owned tasks and return that
/// handle — confirm against the harness.
fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
/// Hands a notified task over to the scheduler.
fn schedule(&self, task: Notified<Self>);
/// Returns the hook set (see [`TaskHarnessScheduleHooks`]) of this scheduler.
fn hooks(&self) -> TaskHarnessScheduleHooks;
/// Schedules a task that is yielding.
///
/// The default implementation forwards to [`Schedule::schedule`].
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}
/// Hook for a panic that was not handled.
///
/// The default implementation does nothing.
fn unhandled_panic(&self) {
}
}
cfg_rt! {
/// Creates a new raw task and returns three handles that all refer to it:
/// an owned [`Task`], a [`Notified`] and a [`JoinHandle`] for the output.
///
/// `task` is the future to run, `scheduler` the scheduler it is bound to,
/// `id` its task id and `spawned_at` the recorded spawn location; all four
/// are forwarded unchanged to `RawTask::new`.
///
/// NOTE(review): presumably `RawTask::new` initializes the reference count
/// so that it already covers all three handles — confirm in `raw`/`state`.
fn new_task<T, S>(
    task: T,
    scheduler: S,
    id: Id,
    spawned_at: SpawnLocation,
) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
    S: Schedule,
    T: Future + 'static,
    T::Output: 'static,
{
    let raw = RawTask::new::<T, S>(task, scheduler, id, spawned_at);

    // `RawTask` is copied into each handle; every handle wraps the same task.
    let owned = Task {
        raw,
        _p: PhantomData,
    };
    let notified = Notified(Task {
        raw,
        _p: PhantomData,
    });

    (owned, notified, JoinHandle::new(raw))
}
/// Creates a new task and returns it as an [`UnownedTask`] together with
/// its [`JoinHandle`].
///
/// Unlike `new_task`, the future and its output must be `Send`. The `Task`
/// and `Notified` handles produced by `new_task` are not handed out; they
/// are forgotten so their destructors never run, and the single
/// `UnownedTask` stands in for both of them.
pub(crate) fn unowned<T, S>(
    task: T,
    scheduler: S,
    id: Id,
    spawned_at: SpawnLocation,
) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
    S: Schedule,
    T: Send + Future + 'static,
    T::Output: Send + 'static,
{
    let (task, notified, join) = new_task(task, scheduler, id, spawned_at);

    // Copy the raw handle out before giving up the two original handles.
    let raw = task.raw;
    let unowned = UnownedTask {
        raw,
        _p: PhantomData,
    };

    // Forget instead of drop: the references held by `task` and `notified`
    // now belong to `unowned`, whose `Drop` releases two at once.
    std::mem::forget(task);
    std::mem::forget(notified);

    (unowned, join)
}
}
impl<S: 'static> Task<S> {
    /// Wraps `raw` in a `Task` handle. The reference count is not touched.
    ///
    /// # Safety
    ///
    /// NOTE(review): nothing is validated here. Presumably the caller must
    /// guarantee that `raw` refers to a live task and that a reference is
    /// available for the new handle to own — confirm.
    unsafe fn new(raw: RawTask) -> Task<S> {
        Self {
            raw,
            _p: PhantomData,
        }
    }

    /// Builds a `Task` handle from a pointer to a task header.
    ///
    /// # Safety
    ///
    /// Same requirements as [`Task::new`]; the pointer is handed to
    /// `RawTask::from_raw` as-is.
    unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        unsafe {
            let raw = RawTask::from_raw(ptr);
            Self::new(raw)
        }
    }

    /// Returns a copy of the underlying raw task handle.
    #[cfg(all(
        tokio_unstable,
        feature = "taskdump",
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    pub(super) fn as_raw(&self) -> RawTask {
        self.raw
    }

    /// Borrows the task's header.
    fn header(&self) -> &Header {
        self.raw.header()
    }

    /// Returns a pointer to the task's header.
    fn header_ptr(&self) -> NonNull<Header> {
        self.raw.header_ptr()
    }

    /// Returns the task's id, read through the header pointer.
    #[cfg(tokio_unstable)]
    pub(crate) fn id(&self) -> crate::task::Id {
        let header = self.header_ptr();
        unsafe { Header::get_id(header) }
    }

    /// Returns the source location recorded for this task, read through
    /// the header pointer.
    #[cfg(tokio_unstable)]
    pub(crate) fn spawned_at(&self) -> &'static Location<'static> {
        let header = self.header_ptr();
        unsafe { Header::get_spawn_location(header) }
    }

    /// Bundles the task's id and spawn location into a `TaskMeta`.
    #[cfg(tokio_unstable)]
    pub(crate) fn task_meta<'meta>(&self) -> crate::runtime::TaskMeta<'meta> {
        let id = self.id();
        let location = self.spawned_at();
        crate::runtime::TaskMeta {
            id,
            spawned_at: location.into(),
            _phantom: PhantomData,
        }
    }

    cfg_taskdump! {
        /// Attempts the `transition_to_notified_for_tracing` state
        /// transition. Returns a fresh `Notified` handle for this task when
        /// the transition reports `true`, and `None` otherwise.
        pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
            let transitioned = self.as_raw().state().transition_to_notified_for_tracing();
            transitioned.then(|| Notified(unsafe { Self::new(self.raw) }))
        }
    }
}
impl<S: 'static> Notified<S> {
/// Borrows the header of the wrapped task.
fn header(&self) -> &Header {
self.0.header()
}
/// Returns the id of the wrapped task by delegating to `Task::id`.
///
/// Only compiled when `tokio_unstable` is set.
#[cfg(tokio_unstable)]
#[allow(dead_code)]
pub(crate) fn task_id(&self) -> crate::task::Id {
self.0.id()
}
}
impl<S: 'static> Notified<S> {
pub(crate) unsafe fn from_raw(ptr: RawTask) -> Notified<S> {
Notified(unsafe { Task::new(ptr) })
}
}
impl<S: 'static> Notified<S> {
    /// Consumes the handle and returns the raw task handle it wrapped.
    ///
    /// The destructor of the inner `Task` is suppressed, so the task's
    /// reference count is left unchanged.
    pub(crate) fn into_raw(self) -> RawTask {
        // `ManuallyDrop` prevents `Task::drop` from running for this handle.
        let this = mem::ManuallyDrop::new(self);
        this.0.raw
    }
}
impl<S: Schedule> Task<S> {
    /// Consumes the handle and calls `shutdown` on the raw task.
    ///
    /// The handle's destructor is suppressed first, so this method does not
    /// decrement the reference count itself.
    pub(crate) fn shutdown(self) {
        // Copy the raw handle out of a wrapper that never runs `Task::drop`.
        let raw = mem::ManuallyDrop::new(self).raw;
        raw.shutdown();
    }
}
impl<S: Schedule> LocalNotified<S> {
    /// Consumes the handle and polls the task.
    ///
    /// The destructor of the inner `Task` is suppressed before polling, so
    /// this method does not decrement the reference count itself.
    pub(crate) fn run(self) {
        // Copy the raw handle out of a wrapper that never runs `Task::drop`.
        let raw = mem::ManuallyDrop::new(self).task.raw;
        raw.poll();
    }
}
impl<S: Schedule> UnownedTask<S> {
/// Converts this handle into a [`Notified`] by way of `into_task`.
///
/// Only compiled for tests.
#[cfg(test)]
#[cfg_attr(target_family = "wasm", allow(dead_code))]
pub(super) fn into_notified(self) -> Notified<S> {
Notified(self.into_task())
}
/// Converts this handle into a plain [`Task`], giving up one of the two
/// references it accounts for.
fn into_task(self) -> Task<S> {
let task = Task {
raw: self.raw,
_p: PhantomData,
};
// Skip `UnownedTask::drop`, which would decrement the count twice.
mem::forget(self);
// The returned `Task` releases one reference when dropped, so release
// the other one here. The boolean returned by `ref_dec` is ignored.
task.header().state.ref_dec();
task
}
/// Consumes the handle and polls the task once.
pub(crate) fn run(self) {
let raw = self.raw;
// Skip `UnownedTask::drop`; both references are handled manually below.
mem::forget(self);
// One reference is moved into a `Task` so that it is released by
// `Task::drop` after the poll.
let task = Task::<S> {
raw,
_p: PhantomData,
};
// NOTE(review): presumably `poll` consumes the remaining reference —
// confirm in `raw`/`harness`.
raw.poll();
// Release the reference held by `task`, after polling has finished.
drop(task);
}
/// Consumes the handle and shuts the task down via `Task::shutdown`.
pub(crate) fn shutdown(self) {
self.into_task().shutdown();
}
}
impl<S: 'static> Drop for Task<S> {
    /// Decrements the task's reference count once and deallocates the task
    /// when `ref_dec` reports `true`.
    fn drop(&mut self) {
        let should_dealloc = self.header().state.ref_dec();
        if should_dealloc {
            self.raw.dealloc();
        }
    }
}
impl<S: 'static> Drop for UnownedTask<S> {
    /// Releases both references accounted for by this handle with a single
    /// `ref_dec_twice` call, and deallocates the task when that call
    /// reports `true`.
    fn drop(&mut self) {
        if !self.raw.header().state.ref_dec_twice() {
            // Nothing more to do; `ref_dec_twice` did not ask for deallocation.
            return;
        }
        self.raw.dealloc();
    }
}
impl<S> fmt::Debug for Task<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Task({:p})", self.header())
}
}
impl<S> fmt::Debug for Notified<S> {
    /// Formats the handle as `task::Notified(<address of the task header>)`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let task = &self.0;
        write!(fmt, "task::Notified({:p})", task.header())
    }
}
// Allows `Task` handles to be stored in a `linked_list`, with the task
// `Header` as the list's target type.
unsafe impl<S> linked_list::Link for Task<S> {
type Handle = Task<S>;
type Target = Header;
/// Returns the header pointer of `handle` without consuming it.
fn as_raw(handle: &Task<S>) -> NonNull<Header> {
handle.raw.header_ptr()
}
/// Rebuilds a `Task` handle from a header pointer via `Task::from_raw`.
unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
unsafe { Task::from_raw(ptr) }
}
/// Locates the list pointers belonging to `target`: the trailer is looked
/// up with `Header::get_trailer`, and `Trailer::addr_of_owned` yields the
/// address of the pointers stored in it.
unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
unsafe { self::core::Trailer::addr_of_owned(Header::get_trailer(target)) }
}
}
unsafe impl<S> sharded_list::ShardedListItem for Task<S> {
    /// Derives the shard id of a task from the numeric value of its task id.
    ///
    /// # Safety
    ///
    /// `target` is passed to `Header::get_id` unchecked, so it must satisfy
    /// that function's requirements.
    unsafe fn get_shard_id(target: NonNull<Self::Target>) -> usize {
        let raw_id = unsafe { Header::get_id(target) }.0.get();
        // NOTE(review): `as` may truncate where `usize` is narrower than the
        // id's integer type; presumably acceptable because the value is only
        // used to pick a shard — confirm.
        raw_id as usize
    }
}
// Variant of `spawn_location` compiled when `tokio_unstable` is set: the
// spawn location is actually stored.
#[cfg(tokio_unstable)]
mod spawn_location {
use std::panic::Location;
/// The source location recorded for a spawned task.
#[derive(Copy, Clone)]
pub(crate) struct SpawnLocation(pub &'static Location<'static>);
impl From<&'static Location<'static>> for SpawnLocation {
/// Wraps `location` without modifying it.
fn from(location: &'static Location<'static>) -> Self {
Self(location)
}
}
}
// Variant of `spawn_location` compiled when `tokio_unstable` is NOT set:
// the type keeps the same conversion API but stores nothing.
#[cfg(not(tokio_unstable))]
mod spawn_location {
    use std::panic::Location;

    /// Zero-sized stand-in for a spawn location; the location is discarded.
    #[derive(Copy, Clone)]
    pub(crate) struct SpawnLocation();

    impl From<&'static Location<'static>> for SpawnLocation {
        /// Ignores the given location and returns the empty marker value.
        fn from(_location: &'static Location<'static>) -> Self {
            SpawnLocation()
        }
    }

    /// Guards against the placeholder type accidentally gaining a field.
    #[cfg(test)]
    #[test]
    fn spawn_location_is_zero_sized() {
        let size = std::mem::size_of::<SpawnLocation>();
        assert_eq!(size, 0);
    }
}
impl SpawnLocation {
    /// Captures the source location of the caller as a `SpawnLocation`.
    ///
    /// Because of `#[track_caller]`, `Location::caller()` reports the
    /// location of the code calling `capture` rather than this function.
    #[track_caller]
    #[inline]
    pub(crate) fn capture() -> Self {
        let caller = Location::caller();
        caller.into()
    }
}