use crate::{JoinHandle, Task, TaskImpl, Waker};
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::Context;
/// An event delivered to the runtime: an id plus a borrowed, type-erased payload.
///
/// The scheduler uses `id` to find the task registered for the event and hands
/// `data` to whoever later removes the event. The value is only a `u64` and a
/// shared reference, so it is `Copy` and can be passed around by value.
#[derive(Clone, Copy, Debug)]
pub struct Event<'a> {
    /// Identifier of the event being signalled.
    pub id: u64,
    /// Type-erased payload; receivers recover the concrete type by downcasting.
    pub data: &'a dyn Any,
}

impl<'a> Event<'a> {
    /// Builds an event with the given `id` that borrows `data` for `'a`.
    pub fn new(id: u64, data: &'a dyn Any) -> Self {
        Self { id, data }
    }
}
/// Scheduler bookkeeping: pending wake requests and event payloads, keyed by id.
///
/// `Runtime` keeps it behind an `Rc<RefCell<..>>` and passes a clone of that
/// handle to every `Waker` it creates.
#[derive(Debug)]
pub(crate) struct SchedInfo {
/// Next value returned by `new_event_id`.
event_id: u64,
/// Ids of tasks that were woken (`task_wake` / `event_notify`) and have not
/// yet been collected by the runtime.
waked: Vec<u64>,
/// Event id -> id of the task registered for that event.
events_map: HashMap<u64, u64>,
/// Event id -> payload stored by `events_push` (`'static` data).
events: HashMap<u64, &'static dyn Any>,
/// Event id -> payload stored by `temp_events_push`. Kept as a raw pointer
/// because the payload's borrow lifetime is erased; emptied by
/// `temp_events_clear`.
temp_events: HashMap<u64, NonNull<dyn Any>>,
}
impl SchedInfo {
fn new() -> Self {
Self {
event_id: 0,
waked: vec![],
events_map: HashMap::new(),
events: HashMap::new(),
temp_events: HashMap::new(),
}
}
pub fn new_event_id(&mut self) -> u64 {
let id = self.event_id;
self.event_id += 1;
id
}
pub fn event_register(&mut self, event_id: u64, task_id: u64) {
self.events_map.insert(event_id, task_id);
}
pub fn event_unregister(&mut self, event_id: u64) {
let _ = self.events_map.remove(&event_id);
let _ = self.events.remove(&event_id);
}
pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
if let Some(data) = self.events.remove(&event_id) {
return Some(data);
}
if let Some(data) = self.temp_events.remove(&event_id) {
return Some(unsafe { data.as_ref() });
}
None
}
pub fn task_wake(&mut self, task_id: u64) {
self.waked.push(task_id);
}
pub fn event_notify(&mut self, event_id: u64) {
if let Some(task_id) = self.events_map.remove(&event_id) {
self.waked.push(task_id);
}
}
fn events_push(&mut self, events: &[Event<'static>]) {
for event in events {
self.events.insert(event.id, event.data);
self.event_notify(event.id);
}
}
fn temp_events_push(&mut self, events: &[Event<'_>]) {
for event in events {
let data = event.data as *const dyn Any;
self.temp_events
.insert(event.id, NonNull::new(data.cast_mut()).unwrap());
self.event_notify(event.id);
}
}
fn temp_events_clear(&mut self) {
self.temp_events.clear();
}
}
/// Task scheduler that owns its tasks and runs them when `sched` is called.
pub struct Runtime {
/// Id given to the next spawned task. `sched` also passes it to each `Waker`
/// and reads it back afterwards.
id: u64,
/// Tasks whose last run returned pending, keyed by task id, until they are
/// woken.
pending: HashMap<u64, Box<dyn Task>>,
/// Tasks queued to run; `sched` takes them from the front.
running: Vec<Box<dyn Task>>,
/// Wake and event state shared with the wakers created in `sched`.
info: Rc<RefCell<SchedInfo>>,
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl Runtime {
pub fn new() -> Self {
Self {
id: 0,
pending: HashMap::new(),
running: vec![],
info: Rc::new(RefCell::new(SchedInfo::new())),
}
}
pub fn pending_count(&self) -> usize {
self.pending.len() + self.running.len()
}
pub fn sched(&mut self) -> usize {
let mut cnt = 0;
loop {
self.collect_waked();
if self.running.is_empty() {
break;
}
let mut task = self.running.remove(0);
let mut local_waker = Waker::new(self.info.clone(), task.get_id(), self.id);
local_waker.tasks_swap(&mut self.running);
let waker = local_waker.task_waker();
let mut ctx = Context::from_waker(&waker);
if task.run(&mut ctx).is_pending() {
self.pending.insert(task.get_id(), task);
} else {
cnt += 1;
}
local_waker.tasks_swap(&mut self.running);
self.id = local_waker.task_id();
}
cnt
}
pub fn spawn<T: Future + 'static>(&mut self, future: T) -> JoinHandle<T> {
let (task, handle) = TaskImpl::with_id(future, self.id);
self.id += 1;
self.running.push(task);
handle
}
pub fn notify_events(&mut self, events: &[Event<'static>]) {
self.info.borrow_mut().events_push(events);
}
pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
self.info.borrow_mut().temp_events_push(events);
let cnt = self.sched();
self.info.borrow_mut().temp_events_clear();
cnt
}
fn collect_waked(&mut self) {
let mut info = self.info.borrow_mut();
for id in info.waked.iter() {
if let Some(task) = self.pending.remove(id) {
self.running.push(task);
}
}
info.waked.clear();
}
}