use crate::{BTreeMap, BTreeSet, HashMap, HashSet, JoinHandle, Task, TaskImpl, Waker};
use alloc::boxed::Box;
use alloc::rc::Rc;
use alloc::vec;
use alloc::vec::Vec;
use core::any::Any;
use core::cell::RefCell;
use core::future::Future;
use core::ptr::NonNull;
use core::task::Context;
/// An event handed to the scheduler: a numeric id paired with a borrowed,
/// type-erased payload.
#[derive(Clone, Debug)]
pub struct Event<'a> {
    /// Identifier of the event.
    pub id: u64,
    /// Type-erased payload, borrowed for `'a`.
    pub data: &'a dyn Any,
}

impl<'a> Event<'a> {
    /// Builds an event from its id and a reference to its payload.
    pub fn new(id: u64, data: &'a dyn Any) -> Self {
        Event { id, data }
    }
}
/// Scheduler bookkeeping: id counters, the run queue, wake-ups, event
/// registrations/payloads and abort requests.
pub(crate) struct SchedInfo {
    /// Next value returned by `new_task_id`.
    task_id: u64,
    /// Next value returned by `new_event_id`.
    event_id: u64,
    /// Tasks queued to be run; `task_push` appends to it.
    running: Vec<Box<dyn Task>>,
    /// Ids of tasks that were woken (`task_wake` / `event_notify`).
    waked: Vec<u64>,
    /// Event id -> id of the task registered for that event.
    events_map: HashMap<u64, u64>,
    /// Event id -> `'static` payload stored by `events_push` until it is
    /// taken by `event_remove` or dropped by `event_unregister`.
    events: HashMap<u64, &'static dyn Any>,
    /// Event id -> lifetime-erased payload pointer stored by
    /// `temp_events_push`; emptied by `temp_events_clear`.
    temp_events: HashMap<u64, NonNull<dyn Any>>,
    /// Ids of tasks for which `task_abort` was requested.
    aborted: HashSet<u64>,
}
impl SchedInfo {
    /// Creates an empty scheduler state with both id counters at zero.
    fn new() -> Self {
        Self {
            task_id: 0,
            event_id: 0,
            running: vec![],
            waked: vec![],
            events_map: BTreeMap::new(),
            events: BTreeMap::new(),
            temp_events: BTreeMap::new(),
            aborted: BTreeSet::new(),
        }
    }

    /// Returns the current event id and advances the counter by one.
    pub fn new_event_id(&mut self) -> u64 {
        let id = self.event_id;
        self.event_id += 1;
        id
    }

    /// Returns the current task id and advances the counter by one.
    pub fn new_task_id(&mut self) -> u64 {
        let id = self.task_id;
        self.task_id += 1;
        id
    }

    /// Records that `task_id` is the task to wake when `event_id` is notified.
    ///
    /// A previous registration for the same event id is replaced.
    pub fn event_register(&mut self, event_id: u64, task_id: u64) {
        self.events_map.insert(event_id, task_id);
    }

    /// Drops the registration for `event_id` together with any `'static`
    /// payload stored for it.
    pub fn event_unregister(&mut self, event_id: u64) {
        let _ = self.events_map.remove(&event_id);
        let _ = self.events.remove(&event_id);
    }

    /// Takes the payload stored for `event_id`, if any.
    ///
    /// `'static` payloads (from `events_push`) are looked up first, then the
    /// temporary ones (from `temp_events_push`). The entry is removed, so a
    /// second call for the same id returns `None`.
    ///
    /// NOTE: the returned lifetime `'a` is chosen by the caller. For a
    /// temporary payload the reference is only valid while the slice given to
    /// `temp_events_push` is alive, so callers must not keep it longer.
    pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
        if let Some(data) = self.events.remove(&event_id) {
            return Some(data);
        }
        if let Some(data) = self.temp_events.remove(&event_id) {
            // SAFETY: every pointer in `temp_events` was created from a live
            // reference in `temp_events_push`, so it is non-null, aligned and
            // points to an initialized value. `Runtime::sched_events` empties
            // the table before the borrowed events can go out of scope; the
            // caller of this method is responsible for not retaining the
            // reference past that point.
            return Some(unsafe { data.as_ref() });
        }
        None
    }

    /// Marks `task_id` as woken so it is moved back to the run queue.
    pub fn task_wake(&mut self, task_id: u64) {
        self.waked.push(task_id);
    }

    /// Appends `task` to the run queue.
    pub fn task_push(&mut self, task: Box<dyn Task>) {
        self.running.push(task);
    }

    /// Requests that `task_id` be aborted.
    pub fn task_abort(&mut self, task_id: u64) {
        self.aborted.insert(task_id);
    }

    /// Consumes a pending abort request for `task_id`.
    ///
    /// Returns `true` if an abort had been requested; the request is removed.
    pub fn task_aborted(&mut self, task_id: u64) -> bool {
        self.aborted.remove(&task_id)
    }

    /// Wakes the task registered for `event_id`, if there is one, and removes
    /// that registration.
    pub fn event_notify(&mut self, event_id: u64) {
        if let Some(task_id) = self.events_map.remove(&event_id) {
            self.waked.push(task_id);
        }
    }

    /// Stores each `'static` payload under its event id and notifies the
    /// task registered for it.
    pub fn events_push(&mut self, events: &[Event<'static>]) {
        for event in events {
            self.events.insert(event.id, event.data);
            self.event_notify(event.id);
        }
    }

    /// Stores a lifetime-erased pointer to each payload under its event id
    /// and notifies the task registered for it.
    ///
    /// The pointers are only valid while `events` is borrowed; the caller
    /// must invoke `temp_events_clear` before that borrow ends.
    fn temp_events_push(&mut self, events: &[Event<'_>]) {
        for event in events {
            // A reference is never null, so `NonNull::from` needs neither a
            // raw-pointer round trip nor an `unwrap`.
            self.temp_events
                .insert(event.id, NonNull::from(event.data));
            self.event_notify(event.id);
        }
    }

    /// Forgets every temporary payload pointer.
    fn temp_events_clear(&mut self) {
        self.temp_events.clear();
    }
}
/// Task runtime: owns parked tasks and drives the run queue kept in the
/// shared `SchedInfo`.
pub struct Runtime {
    /// Tasks whose last `run` returned pending, keyed by task id.
    pending: HashMap<u64, Box<dyn Task>>,
    /// Scheduler state; the `Rc` is also handed to the wakers and tasks
    /// created by this runtime.
    info: Rc<RefCell<SchedInfo>>,
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Runtime {
pub fn new() -> Self {
Self {
pending: BTreeMap::new(),
info: Rc::new(RefCell::new(SchedInfo::new())),
}
}
pub fn pending_count(&self) -> usize {
self.pending.len() + self.info.borrow_mut().running.len()
}
pub fn sched(&mut self) -> usize {
let mut cnt = 0;
loop {
self.collect_waked();
if self.info.borrow().running.is_empty() {
break;
}
let mut task = self.info.borrow_mut().running.remove(0);
let task_id = task.get_id();
if self.info.borrow_mut().task_aborted(task_id) {
task.abort();
continue;
}
let local_waker = Waker::new(self.info.clone(), task_id);
let waker = local_waker.task_waker();
let mut ctx = Context::from_waker(waker);
if task.run(&mut ctx).is_pending() {
self.pending.insert(task.get_id(), task);
} else {
cnt += 1;
}
}
self.release_aborted();
cnt
}
pub fn spawn<T>(&mut self, future: T) -> JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: 'static,
{
let (task, handle) = TaskImpl::with_info(future, &self.info);
self.info.borrow_mut().task_push(task);
handle
}
pub fn notify_events(&mut self, events: &[Event<'static>]) {
self.info.borrow_mut().events_push(events);
}
pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
self.info.borrow_mut().temp_events_push(events);
let cnt = self.sched();
self.info.borrow_mut().temp_events_clear();
cnt
}
pub fn task_resume(&mut self, task_id: u64) {
self.info.borrow_mut().task_wake(task_id);
}
pub fn task_force_abort(&mut self, task_id: u64) {
if let Some(mut task) = self.pending.remove(&task_id) {
task.abort();
} else {
self.info.borrow_mut().task_abort(task_id);
}
}
fn collect_waked(&mut self) {
let info: &mut SchedInfo = &mut self.info.borrow_mut();
for id in info.waked.iter() {
if let Some(task) = self.pending.remove(id) {
info.running.push(task);
}
}
info.waked.clear();
}
fn release_aborted(&mut self) {
loop {
let mut info = self.info.borrow_mut();
let mut aborted = vec![];
for task_id in &info.aborted {
if let Some(task) = self.pending.remove(task_id) {
aborted.push(task);
}
}
info.aborted.clear();
if aborted.is_empty() {
return;
}
core::mem::drop(info);
for mut task in aborted {
task.abort();
}
}
}
}