hiasync 0.1.2

Supports only single-threaded asynchronous runtime
Documentation
use crate::{JoinHandle, Task, TaskImpl, Waker};
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::Context;

/// 事件传递数据到异步任务,和`wait_event`配套使用.
#[derive(Clone, Debug)]
pub struct Event<'a> {
    pub id: u64,
    pub data: &'a dyn Any,
}

impl<'a> Event<'a> {
    pub fn new(id: u64, data: &'a dyn Any) -> Self {
        Self { id, data }
    }
}

#[derive(Debug)]
pub(crate) struct SchedInfo {
    event_id: u64,
    waked: Vec<u64>,
    events_map: HashMap<u64, u64>,
    events: HashMap<u64, &'static dyn Any>,
    temp_events: HashMap<u64, NonNull<dyn Any>>,
}

impl SchedInfo {
    fn new() -> Self {
        Self {
            event_id: 0,
            waked: vec![],
            events_map: HashMap::new(),
            events: HashMap::new(),
            temp_events: HashMap::new(),
        }
    }

    // 每次调用生成不同的event_id
    pub fn new_event_id(&mut self) -> u64 {
        let id = self.event_id;
        self.event_id += 1;
        id
    }

    // 调用非阻塞API之后,非阻塞API的返回结果以事件数据的形式到来,这里需要将事假和task关联,这样事件到来后才能唤醒此task
    // 调用者保证事件ID唯一
    pub fn event_register(&mut self, event_id: u64, task_id: u64) {
        self.events_map.insert(event_id, task_id);
    }

    // 用于忽略事件, 调用此接口保证不会有资源泄露.
    pub fn event_unregister(&mut self, event_id: u64) {
        let _ = self.events_map.remove(&event_id);
        let _ = self.events.remove(&event_id);
    }

    // 消费事件
    pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
        if let Some(data) = self.events.remove(&event_id) {
            return Some(data);
        }
        if let Some(data) = self.temp_events.remove(&event_id) {
            return Some(unsafe { data.as_ref() });
        }
        None
    }

    // 唤醒task
    pub fn task_wake(&mut self, task_id: u64) {
        self.waked.push(task_id);
    }

    // 事件通知,唤醒注册事件的task
    pub fn event_notify(&mut self, event_id: u64) {
        if let Some(task_id) = self.events_map.remove(&event_id) {
            self.waked.push(task_id);
        }
    }

    fn events_push(&mut self, events: &[Event<'static>]) {
        for event in events {
            self.events.insert(event.id, event.data);
            self.event_notify(event.id);
        }
    }

    fn temp_events_push(&mut self, events: &[Event<'_>]) {
        for event in events {
            let data = event.data as *const dyn Any;
            self.temp_events
                .insert(event.id, NonNull::new(data.cast_mut()).unwrap());
            self.event_notify(event.id);
        }
    }

    // 清空临时事件
    fn temp_events_clear(&mut self) {
        self.temp_events.clear();
    }
}

/// 单线程下的异步运行时.
pub struct Runtime {
    id: u64,
    pending: HashMap<u64, Box<dyn Task>>,
    running: Vec<Box<dyn Task>>,
    info: Rc<RefCell<SchedInfo>>,
}

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            id: 0,
            pending: HashMap::new(),
            running: vec![],
            info: Rc::new(RefCell::new(SchedInfo::new())),
        }
    }

    /// 返回等待外部事件激活的异步任务数量
    pub fn pending_count(&self) -> usize {
        self.pending.len() + self.running.len()
    }

    /// 启动调度,直到没有可运行的任务后返回
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched(&mut self) -> usize {
        let mut cnt = 0;
        loop {
            self.collect_waked();
            if self.running.is_empty() {
                break;
            }
            let mut task = self.running.remove(0);
            let mut local_waker = Waker::new(self.info.clone(), task.get_id(), self.id);
            local_waker.tasks_swap(&mut self.running);
            let waker = local_waker.task_waker();
            let mut ctx = Context::from_waker(&waker);
            if task.run(&mut ctx).is_pending() {
                self.pending.insert(task.get_id(), task);
            } else {
                cnt += 1;
            }
            local_waker.tasks_swap(&mut self.running);
            self.id = local_waker.task_id();
        }
        cnt
    }

    /// 创建异步任务,还需要调用sched或者sched_events才执行此任务
    pub fn spawn<T: Future + 'static>(&mut self, future: T) -> JoinHandle<T> {
        let (task, handle) = TaskImpl::with_id(future, self.id);
        self.id += 1;
        self.running.push(task);
        handle
    }

    /// 通知事件之后, 还需要调用sched或者sched_events才唤醒等待此事件的任务
    pub fn notify_events(&mut self, events: &[Event<'static>]) {
        self.info.borrow_mut().events_push(events);
    }

    /// 临时事件通知并立即启动调度.
    /// 本次调度结束字后,无论events是否被消费,都将被清空.
    /// 如果需要留待后续消费,需要调用notify_events.
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
        self.info.borrow_mut().temp_events_push(events);
        let cnt = self.sched();
        self.info.borrow_mut().temp_events_clear();
        cnt
    }

    fn collect_waked(&mut self) {
        let mut info = self.info.borrow_mut();
        for id in info.waked.iter() {
            if let Some(task) = self.pending.remove(id) {
                self.running.push(task);
            }
        }
        info.waked.clear();
    }
}