use std::mem;
use std::any::Any;
use std::pin::Pin;
use std::future::Future;
use nohash::IntMap;
use crate::{
JoinHandle,
platform::Platform,
error::InitError
};
/// Identifier of a task inside the runtime.
///
/// Id `0` is the root task: `Runtime::get_woken_task` reports it as
/// `WokenTask::Root` instead of looking it up in the task map.
pub type TaskId = u32;
/// A spawned task as stored by the runtime: a pinned, heap-allocated future
/// whose output has been type-erased into a `Box<dyn Any>`.
type Task = Pin<Box<dyn Future<Output = Box<dyn Any>>>>;
/// The task picked to run next by `Runtime::get_woken_task`.
pub enum WokenTask {
/// The root task (id `0`). No future is carried for it.
Root,
/// A spawned task's future, taken out of the runtime's task map.
Child(Task)
}
/// Bookkeeping kept for the join handle of one spawned task.
struct JoinHandleInfo {
/// The task's type-erased output, stored by `Runtime::task_finished`;
/// `None` until then.
result: Option<Box<dyn Any>>,
/// Task to wake once the result is stored, as registered through
/// `Runtime::register_join_handle_wakeup`.
waiting_task: Option<TaskId>
}
/// State of the task runtime: task storage, join-handle bookkeeping, the
/// wakeup queue and the platform backend.
pub struct Runtime {
/// Next id handed out by `new_task_id`; starts at `1` because `0` is the root task.
task_id_counter: TaskId,
/// Id most recently popped from the wakeup queue by `get_woken_task`.
pub current_task: TaskId,
/// Futures of spawned tasks, keyed by task id. A future is removed while it is
/// handed out by `get_woken_task` and re-inserted by `return_task`.
tasks: IntMap<TaskId, Task>,
/// Join-handle bookkeeping, keyed by the id of the spawned task.
join_handles: IntMap<TaskId, JoinHandleInfo>,
/// Ids of tasks waiting to be run; `get_woken_task` pops from the end.
task_wakeups: Vec<TaskId>,
/// Platform backend, used here for `reset` and `wait_for_io`.
pub plat: Platform,
}
impl Runtime {
    /// Creates a runtime with a freshly initialised platform backend.
    ///
    /// The root task (id `0`) starts as the current task and is the only entry
    /// in the wakeup queue; ids for spawned tasks start at `1`.
    ///
    /// # Errors
    ///
    /// Propagates the [`InitError`] returned by `Platform::new`.
    pub fn new() -> Result<Self, InitError> {
        let plat = Platform::new()?;
        Ok(Self {
            task_id_counter: 1,
            current_task: 0,
            tasks: IntMap::default(),
            join_handles: IntMap::default(),
            task_wakeups: vec![0],
            plat,
        })
    }

    /// Hands out an id for a new task.
    ///
    /// Ids are taken from a wrapping counter that never yields `0` (reserved
    /// for the root task). Once the counter has wrapped around, an id may
    /// still belong to a live task or an outstanding join handle; such ids are
    /// skipped so that a new task can never overwrite existing state.
    fn new_task_id(&mut self) -> TaskId {
        loop {
            let id = self.task_id_counter;
            // Advance the counter, stepping over the reserved root id.
            self.task_id_counter = match id.wrapping_add(1) {
                0 => 1,
                next => next,
            };
            // A task that is currently running has been taken out of `tasks`,
            // so it has to be checked through `current_task`.
            let in_use = id == self.current_task
                || self.tasks.contains_key(&id)
                || self.join_handles.contains_key(&id);
            if !in_use {
                return id;
            }
        }
    }

    /// Puts the runtime back into its initial state and returns the tasks that
    /// were still stored, so the caller decides when they are dropped.
    ///
    /// All join-handle bookkeeping is discarded, the wakeup queue again holds
    /// only the root task, and the platform backend is reset as well.
    pub fn reset(&mut self) -> IntMap<TaskId, Task> {
        self.task_id_counter = 1;
        self.current_task = 0;
        self.join_handles = IntMap::default();
        self.task_wakeups.clear();
        self.task_wakeups.push(0);
        self.plat.reset();
        mem::take(&mut self.tasks)
    }

    /// Delegates to the platform's `wait_for_io`, handing it the wakeup queue.
    // NOTE(review): presumably the platform pushes the ids of tasks whose I/O
    // became ready onto the queue — confirm against `Platform::wait_for_io`.
    pub fn wait_for_io(&mut self) {
        self.plat.wait_for_io(&mut self.task_wakeups)
    }
}
impl Runtime {
    /// Registers `task` as a new child task and returns the handle used to
    /// retrieve its output.
    ///
    /// The future is wrapped so that its output is boxed as `dyn Any`, stored
    /// under a fresh id together with empty join-handle bookkeeping, and the
    /// id is pushed onto the wakeup queue.
    pub fn spawn<F: Future + 'static>(&mut self, task: F) -> JoinHandle<F::Output> {
        let task_id = self.new_task_id();
        // Erase the output type so futures of any type fit into one map.
        let erased: Task = Box::pin(async move {
            let output = task.await;
            Box::new(output) as Box<dyn Any>
        });
        self.tasks.insert(task_id, erased);
        let info = JoinHandleInfo {
            result: None,
            waiting_task: None,
        };
        self.join_handles.insert(task_id, info);
        self.task_wakeups.push(task_id);
        JoinHandle::new(task_id)
    }

    /// Pops wakeups until one refers to a runnable task and returns that task.
    ///
    /// Every popped id becomes `current_task`. Id `0` yields
    /// [`WokenTask::Root`]; any other id yields the future removed from the
    /// task map. Ids without a stored future are skipped. Returns `None` once
    /// the wakeup queue is empty.
    pub fn get_woken_task(&mut self) -> Option<WokenTask> {
        while let Some(woken_id) = self.task_wakeups.pop() {
            self.current_task = woken_id;
            if woken_id == 0 {
                return Some(WokenTask::Root);
            }
            if let Some(future) = self.tasks.remove(&woken_id) {
                return Some(WokenTask::Child(future));
            }
            // No future stored under this id: ignore the wakeup and keep going.
        }
        None
    }

    /// Records `res` as the output of the current task.
    ///
    /// If a task registered itself as waiting on the join handle, it is pushed
    /// onto the wakeup queue. When no join-handle entry exists for the current
    /// task, `res` is simply dropped.
    pub fn task_finished(&mut self, res: Box<dyn Any>) {
        if let Some(info) = self.join_handles.get_mut(&self.current_task) {
            info.result = Some(res);
            if let Some(waiter) = info.waiting_task {
                self.task_wakeups.push(waiter);
            }
        }
    }

    /// Stores `task` back into the task map under the id of the current task.
    pub fn return_task(&mut self, task: Task) {
        let owner = self.current_task;
        self.tasks.insert(owner, task);
    }
}
impl Runtime {
    /// Takes the finished output of task `id`, if there is one.
    ///
    /// When the result is present, the whole join-handle entry is removed and
    /// the result returned. Otherwise the entry is left in place and `None` is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if no join-handle entry exists for `id`.
    pub fn pop_join_handle_result(&mut self, id: TaskId) -> Option<Box<dyn Any>> {
        let finished = self
            .join_handles
            .get(&id)
            .expect("Join handle info not found")
            .result
            .is_some();
        if !finished {
            return None;
        }
        self.join_handles.remove(&id).and_then(|info| info.result)
    }

    /// Registers the current task as the one to wake when task `id` finishes.
    ///
    /// # Panics
    ///
    /// Panics if no join-handle entry exists for `id`.
    pub fn register_join_handle_wakeup(&mut self, id: TaskId) {
        let waiter = self.current_task;
        self.join_handles
            .get_mut(&id)
            .expect("Join handle info not found")
            .waiting_task = Some(waiter);
    }

    /// Discards the join-handle entry of task `id`; a missing entry is not an error.
    pub fn drop_join_handle(&mut self, id: TaskId) {
        let _ = self.join_handles.remove(&id);
    }
}