hiasync 0.2.2

Supports only single-threaded asynchronous runtime
Documentation
use crate::{BTreeMap, BTreeSet, HashMap, HashSet, JoinHandle, Task, TaskImpl, Waker};
use alloc::boxed::Box;
use alloc::rc::Rc;
use alloc::vec;
use alloc::vec::Vec;
use core::any::Any;
use core::cell::RefCell;
use core::future::Future;
use core::ptr::NonNull;
use core::task::Context;

/// 事件传递数据到异步任务,和`wait_event`配套使用.
#[derive(Clone, Debug)]
pub struct Event<'a> {
    pub id: u64,
    pub data: &'a dyn Any,
}

impl<'a> Event<'a> {
    pub fn new(id: u64, data: &'a dyn Any) -> Self {
        Self { id, data }
    }
}

pub(crate) struct SchedInfo {
    task_id: u64,
    event_id: u64,
    // 返回是Box<dyn Task>而非Rc<dyn Task>的主要目的是希望Task调度结束时所有内存能立即释放.
    // 如果使用Rc内存可能因为Rc::clone或者Weak等引用导致其内存空间的释放时间完全不可控.
    // 涉及到Task的状态修改, 数据通信全部通过task_id来实现,
    // 就是后续的waked/events_map/events/temp_ewvents/aborted等.
    running: Vec<Box<dyn Task>>,
    waked: Vec<u64>,
    events_map: HashMap<u64, u64>,
    events: HashMap<u64, &'static dyn Any>,
    temp_events: HashMap<u64, NonNull<dyn Any>>,
    aborted: HashSet<u64>,
}

impl SchedInfo {
    fn new() -> Self {
        Self {
            task_id: 0,
            event_id: 0,
            running: vec![],
            waked: vec![],
            events_map: BTreeMap::new(),
            events: BTreeMap::new(),
            temp_events: BTreeMap::new(),
            aborted: BTreeSet::new(),
        }
    }

    // 每次调用生成不同的event_id.
    pub fn new_event_id(&mut self) -> u64 {
        let id = self.event_id;
        self.event_id += 1;
        id
    }

    // 每次调用生成不同的task_id.
    pub fn new_task_id(&mut self) -> u64 {
        let id = self.task_id;
        self.task_id += 1;
        id
    }

    // 调用非阻塞API之后,非阻塞API的返回结果以事件数据的形式到来,这里需要将事假和task关联,这样事件到来后才能唤醒此task
    // 调用者保证事件ID唯一
    pub fn event_register(&mut self, event_id: u64, task_id: u64) {
        self.events_map.insert(event_id, task_id);
    }

    // 用于忽略事件, 调用此接口保证不会有资源泄露.
    pub fn event_unregister(&mut self, event_id: u64) {
        let _ = self.events_map.remove(&event_id);
        let _ = self.events.remove(&event_id);
    }

    // 消费事件
    pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
        if let Some(data) = self.events.remove(&event_id) {
            return Some(data);
        }
        if let Some(data) = self.temp_events.remove(&event_id) {
            return Some(unsafe { data.as_ref() });
        }
        None
    }

    // 唤醒task
    pub fn task_wake(&mut self, task_id: u64) {
        self.waked.push(task_id);
    }

    // new task
    pub fn task_push(&mut self, task: Box<dyn Task>) {
        self.running.push(task);
    }

    pub fn task_abort(&mut self, task_id: u64) {
        self.aborted.insert(task_id);
    }

    pub fn task_aborted(&mut self, task_id: u64) -> bool {
        self.aborted.remove(&task_id)
    }

    // 事件通知,唤醒注册事件的task
    pub fn event_notify(&mut self, event_id: u64) {
        if let Some(task_id) = self.events_map.remove(&event_id) {
            self.waked.push(task_id);
        }
    }

    pub fn events_push(&mut self, events: &[Event<'static>]) {
        for event in events {
            self.events.insert(event.id, event.data);
            self.event_notify(event.id);
        }
    }

    fn temp_events_push(&mut self, events: &[Event<'_>]) {
        for event in events {
            let data = event.data as *const dyn Any;
            self.temp_events
                .insert(event.id, NonNull::new(data.cast_mut()).unwrap());
            self.event_notify(event.id);
        }
    }

    // 清空临时事件
    fn temp_events_clear(&mut self) {
        self.temp_events.clear();
    }
}

/// 单线程下的异步运行时.
pub struct Runtime {
    pending: HashMap<u64, Box<dyn Task>>,
    info: Rc<RefCell<SchedInfo>>,
}

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            pending: BTreeMap::new(),
            info: Rc::new(RefCell::new(SchedInfo::new())),
        }
    }

    /// 返回等待外部事件激活的异步任务数量
    pub fn pending_count(&self) -> usize {
        self.pending.len() + self.info.borrow_mut().running.len()
    }

    /// 启动调度,直到没有可运行的任务后返回
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched(&mut self) -> usize {
        let mut cnt = 0;
        loop {
            self.collect_waked();
            if self.info.borrow().running.is_empty() {
                break;
            }
            let mut task = self.info.borrow_mut().running.remove(0);
            let task_id = task.get_id();
            if self.info.borrow_mut().task_aborted(task_id) {
                task.abort();
                continue;
            }
            let local_waker = Waker::new(self.info.clone(), task_id);
            let waker = local_waker.task_waker();
            let mut ctx = Context::from_waker(waker);
            if task.run(&mut ctx).is_pending() {
                self.pending.insert(task.get_id(), task);
            } else {
                cnt += 1;
            }
        }
        self.release_aborted();
        cnt
    }

    /// 创建异步任务,还需要调用sched或者sched_events才执行此任务
    pub fn spawn<T>(&mut self, future: T) -> JoinHandle<T::Output>
    where
        T: Future + 'static,
        T::Output: 'static,
    {
        let (task, handle) = TaskImpl::with_info(future, &self.info);
        self.info.borrow_mut().task_push(task);
        handle
    }

    /// 通知事件之后, 还需要调用sched或者sched_events才唤醒等待此事件的任务
    pub fn notify_events(&mut self, events: &[Event<'static>]) {
        self.info.borrow_mut().events_push(events);
    }

    /// 临时事件通知并立即启动调度.
    /// 本次调度结束字后,无论events是否被消费,都将被清空.
    /// 如果需要留待后续消费,需要调用notify_events.
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
        self.info.borrow_mut().temp_events_push(events);
        let cnt = self.sched();
        self.info.borrow_mut().temp_events_clear();
        cnt
    }

    /// 唤醒指定task.
    ///
    /// 和异步任务内部的`task_self/task_suspend`配合使用.
    pub fn task_resume(&mut self, task_id: u64) {
        self.info.borrow_mut().task_wake(task_id);
    }

    /// 强制退出指定task
    ///
    /// 强制退出是可能导致资源泄露的. 比如
    /// ```text
    /// let resp = wait_response(&key).await;
    /// // 后续代码可能因为`task_abort`而不会被调用.
    /// // 清理系统资源
    /// stop_sys_timer(htimer);
    /// //...
    /// ```
    ///
    /// 以上`stop_sys_timer(htime)`可能因为`task_force_abort`而不会被调用导致系统资源泄露.
    ///
    /// 对于系统资源,应该在析构函数中释放,比如:
    /// ```text
    /// struct SysTimer(Handle);
    /// impl Drop for SysTimer {
    ///     fn drop(&mut self) {
    ///         stop_sys_timer(self.0);
    ///     }
    /// }
    /// // 利用析构函数释放系统资源,即便调用task_force_abort也可以保证资源被正确释放.
    /// let timer = SysTimer(htimer);
    /// let resp = wait_response(&key).await;
    /// // 后续代码可能因为`task_abort`而不会被调用.
    /// ...
    /// ```
    pub fn task_force_abort(&mut self, task_id: u64) {
        if let Some(mut task) = self.pending.remove(&task_id) {
            task.abort();
        } else {
            self.info.borrow_mut().task_abort(task_id);
        }
    }

    fn collect_waked(&mut self) {
        let info: &mut SchedInfo = &mut self.info.borrow_mut();
        for id in info.waked.iter() {
            if let Some(task) = self.pending.remove(id) {
                info.running.push(task);
            }
        }
        info.waked.clear();
    }

    fn release_aborted(&mut self) {
        loop {
            let mut info = self.info.borrow_mut();
            let mut aborted = vec![];
            for task_id in &info.aborted {
                if let Some(task) = self.pending.remove(task_id) {
                    aborted.push(task);
                }
            }
            info.aborted.clear();
            if aborted.is_empty() {
                return;
            }
            // task资源析构的过程中可能会访问`self.info`.
            // 也可能强制释放task.
            core::mem::drop(info);
            for mut task in aborted {
                task.abort();
            }
        }
    }
}