hiasync 0.1.4

Supports only single-threaded asynchronous runtime
Documentation
use crate::{JoinHandle, Task, TaskImpl, TaskState, Waker};
use std::any::Any;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::Context;

/// 事件传递数据到异步任务,和`wait_event`配套使用.
#[derive(Clone, Debug)]
pub struct Event<'a> {
    pub id: u64,
    pub data: &'a dyn Any,
}

impl<'a> Event<'a> {
    pub fn new(id: u64, data: &'a dyn Any) -> Self {
        Self { id, data }
    }
}

pub(crate) struct SchedInfo {
    task_id: u64,
    event_id: u64,
    // 返回是Box<dyn Task>而非Rc<dyn Task>的主要目的是希望Task调度结束时所有内存能立即释放.
    // 如果使用Rc内存可能因为Rc::clone或者Weak等引用导致其内存空间的释放时间完全不可控.
    // 涉及到Task的状态修改, 数据通信全部通过task_id来实现,
    // 就是后续的waked/events_map/events/temp_ewvents/aborted等.
    running: Vec<Box<dyn Task>>,
    waked: Vec<u64>,
    events_map: HashMap<u64, u64>,
    events: HashMap<u64, &'static dyn Any>,
    temp_events: HashMap<u64, NonNull<dyn Any>>,
    aborted: HashSet<u64>,
}

impl SchedInfo {
    fn new() -> Self {
        Self {
            task_id: 0,
            event_id: 0,
            running: vec![],
            waked: vec![],
            events_map: HashMap::new(),
            events: HashMap::new(),
            temp_events: HashMap::new(),
            aborted: HashSet::new(),
        }
    }

    // 每次调用生成不同的event_id.
    pub fn new_event_id(&mut self) -> u64 {
        let id = self.event_id;
        self.event_id += 1;
        id
    }

    // 每次调用生成不同的task_id.
    pub fn new_task_id(&mut self) -> u64 {
        let id = self.task_id;
        self.task_id += 1;
        id
    }

    // 调用非阻塞API之后,非阻塞API的返回结果以事件数据的形式到来,这里需要将事假和task关联,这样事件到来后才能唤醒此task
    // 调用者保证事件ID唯一
    pub fn event_register(&mut self, event_id: u64, task_id: u64) {
        self.events_map.insert(event_id, task_id);
    }

    // 用于忽略事件, 调用此接口保证不会有资源泄露.
    pub fn event_unregister(&mut self, event_id: u64) {
        let _ = self.events_map.remove(&event_id);
        let _ = self.events.remove(&event_id);
    }

    // 消费事件
    pub fn event_remove<'a>(&mut self, event_id: u64) -> Option<&'a dyn Any> {
        if let Some(data) = self.events.remove(&event_id) {
            return Some(data);
        }
        if let Some(data) = self.temp_events.remove(&event_id) {
            return Some(unsafe { data.as_ref() });
        }
        None
    }

    // 唤醒task
    pub fn task_wake(&mut self, task_id: u64) {
        self.waked.push(task_id);
    }

    // new task
    pub fn task_push(&mut self, task: Box<dyn Task>) {
        self.running.push(task);
    }

    pub fn task_abort(&mut self, task_id: u64) {
        self.aborted.insert(task_id);
    }

    pub fn task_aborted(&mut self, task_id: u64) -> bool {
        self.aborted.remove(&task_id)
    }

    // 事件通知,唤醒注册事件的task
    pub fn event_notify(&mut self, event_id: u64) {
        if let Some(task_id) = self.events_map.remove(&event_id) {
            self.waked.push(task_id);
        }
    }

    fn events_push(&mut self, events: &[Event<'static>]) {
        for event in events {
            self.events.insert(event.id, event.data);
            self.event_notify(event.id);
        }
    }

    fn temp_events_push(&mut self, events: &[Event<'_>]) {
        for event in events {
            let data = event.data as *const dyn Any;
            self.temp_events
                .insert(event.id, NonNull::new(data.cast_mut()).unwrap());
            self.event_notify(event.id);
        }
    }

    // 清空临时事件
    fn temp_events_clear(&mut self) {
        self.temp_events.clear();
    }
}

/// 单线程下的异步运行时.
pub struct Runtime {
    pending: HashMap<u64, Box<dyn Task>>,
    info: Rc<RefCell<SchedInfo>>,
}

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

impl Runtime {
    pub fn new() -> Self {
        Self {
            pending: HashMap::new(),
            info: Rc::new(RefCell::new(SchedInfo::new())),
        }
    }

    /// 返回等待外部事件激活的异步任务数量
    pub fn pending_count(&self) -> usize {
        self.pending.len() + self.info.borrow_mut().running.len()
    }

    /// 启动调度,直到没有可运行的任务后返回
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched(&mut self) -> usize {
        let mut cnt = 0;
        loop {
            self.collect_waked();
            if self.info.borrow_mut().running.is_empty() {
                break;
            }
            let mut task = self.info.borrow_mut().running.remove(0);
            let task_id = task.get_id();
            if let TaskState::Pending = task.get_status() {
                if self.info.borrow_mut().task_aborted(task_id) {
                    continue;
                }
                task.set_running();
            }
            let local_waker = Waker::new(self.info.clone(), task_id);
            let waker = local_waker.task_waker();
            let mut ctx = Context::from_waker(&waker);
            if task.run(&mut ctx).is_pending() {
                self.pending.insert(task.get_id(), task);
            } else {
                cnt += 1;
            }
        }
        cnt
    }

    /// 创建异步任务,还需要调用sched或者sched_events才执行此任务
    pub fn spawn<T>(&mut self, future: T) -> JoinHandle<T>
    where
        T: Future + 'static,
        T::Output: 'static,
    {
        let (task, handle) = TaskImpl::with_info(future, &self.info);
        self.info.borrow_mut().task_push(task);
        handle
    }

    /// 通知事件之后, 还需要调用sched或者sched_events才唤醒等待此事件的任务
    pub fn notify_events(&mut self, events: &[Event<'static>]) {
        self.info.borrow_mut().events_push(events);
    }

    /// 临时事件通知并立即启动调度.
    /// 本次调度结束字后,无论events是否被消费,都将被清空.
    /// 如果需要留待后续消费,需要调用notify_events.
    ///
    /// 返回本次调度过程结束的任务数量.
    pub fn sched_events(&mut self, events: &[Event<'_>]) -> usize {
        self.info.borrow_mut().temp_events_push(events);
        let cnt = self.sched();
        self.info.borrow_mut().temp_events_clear();
        cnt
    }

    fn collect_waked(&mut self) {
        let info: &mut SchedInfo = &mut self.info.borrow_mut();
        for id in info.waked.iter() {
            if let Some(task) = self.pending.remove(id) {
                info.running.push(task);
            }
        }
        info.waked.clear();
    }
}