use crate::{JoinHandle, Task, TaskImpl, TaskState, Waker};
use std::any::Any;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::Context;
#[derive(Clone, Debug)]
pub struct Event<'a> {
pub id: u64,
pub data: &'a dyn Any,
}
impl<'a> Event<'a> {
pub fn new(id: u64, data: &'a dyn Any) -> Self {
Self { id, data }
}
}
pub(crate) struct SchedInfo {
task_id: u64,
event_id: u64,
running: Vec<Box<dyn Task>>,
waked: Vec<u64>,
events_map: HashMap<u64, u64>,
events: HashMap<u64, &'static dyn Any>,
temp_events: HashMap<u64, NonNull<dyn Any>>,
aborted: HashSet<u64>,
}
impl SchedInfo {
fn new() -> Self {
Self {
task_id: 0,
event_id: 0,
running: vec![],
waked: vec![],
events_map: HashMap::new(),
events: HashMap::new(),
temp_events: HashMap::new(),
aborted: HashSet::new(),
}
}
pub fn new_event_id(&mut self) -> u64 {
let id = self.event_id;
self.event_id += 1;
id
}
pub fn new_task_id(&mut self) -> u64 {
let id = self.task_id;
self.task_id += 1;
id
}
pub fn event_register(&mut self, event_id: u64, task_id: u64) {
self.events_map.insert(event_id, task_id);
}
pub fn event_unregister(&mut self, event_id: u64) {
let _ = self.events_map.remove(&event_id);
let _ = self.events.remove(&event_id);
}
pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
if let Some(data) = self.events.remove(&event_id) {
return Some(data);
}
if let Some(data) = self.temp_events.remove(&event_id) {
return Some(unsafe { data.as_ref() });
}
None
}
pub fn task_wake(&mut self, task_id: u64) {
self.waked.push(task_id);
}
pub fn task_push(&mut self, task: Box<dyn Task>) {
self.running.push(task);
}
pub fn task_abort(&mut self, task_id: u64) {
self.aborted.insert(task_id);
}
pub fn task_aborted(&mut self, task_id: u64) -> bool {
self.aborted.remove(&task_id)
}
pub fn event_notify(&mut self, event_id: u64) {
if let Some(task_id) = self.events_map.remove(&event_id) {
self.waked.push(task_id);
}
}
pub fn events_push(&mut self, events: &[Event<'static>]) {
for event in events {
self.events.insert(event.id, event.data);
self.event_notify(event.id);
}
}
fn temp_events_push(&mut self, events: &[Event<'_>]) {
for event in events {
let data = event.data as *const dyn Any;
self.temp_events
.insert(event.id, NonNull::new(data.cast_mut()).unwrap());
self.event_notify(event.id);
}
}
fn temp_events_clear(&mut self) {
self.temp_events.clear();
}
}
pub struct Runtime {
pending: HashMap<u64, Box<dyn Task>>,
info: Rc<RefCell<SchedInfo>>,
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Runtime {
pub fn new() -> Self {
Self {
pending: HashMap::new(),
info: Rc::new(RefCell::new(SchedInfo::new())),
}
}
pub fn pending_count(&self) -> usize {
self.pending.len() + self.info.borrow_mut().running.len()
}
pub fn sched(&mut self) -> usize {
let mut cnt = 0;
loop {
self.collect_waked();
if self.info.borrow_mut().running.is_empty() {
break;
}
let mut task = self.info.borrow_mut().running.remove(0);
let task_id = task.get_id();
if let TaskState::Pending = task.get_status() {
if self.info.borrow_mut().task_aborted(task_id) {
continue;
}
task.set_running();
}
let local_waker = Waker::new(self.info.clone(), task_id);
let waker = local_waker.task_waker();
let mut ctx = Context::from_waker(&waker);
if task.run(&mut ctx).is_pending() {
self.pending.insert(task.get_id(), task);
} else {
cnt += 1;
}
}
cnt
}
pub fn spawn<T>(&mut self, future: T) -> JoinHandle<T>
where
T: Future + 'static,
T::Output: 'static,
{
let (task, handle) = TaskImpl::with_info(future, &self.info);
self.info.borrow_mut().task_push(task);
handle
}
pub fn notify_events(&mut self, events: &[Event<'static>]) {
self.info.borrow_mut().events_push(events);
}
pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
self.info.borrow_mut().temp_events_push(events);
let cnt = self.sched();
self.info.borrow_mut().temp_events_clear();
cnt
}
pub fn task_resume(&mut self, task_id: u64) {
self.info.borrow_mut().task_wake(task_id);
}
fn collect_waked(&mut self) {
let info: &mut SchedInfo = &mut self.info.borrow_mut();
for id in info.waked.iter() {
if let Some(task) = self.pending.remove(id) {
info.running.push(task);
}
}
info.waked.clear();
}
}