use alloc::collections::vec_deque::VecDeque;
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem;
use core::ptr::{self, NonNull};
pub use async_task::Task;
use async_task::{Runnable, ScheduleInfo, WithInfo};
use nginx_sys::{
ngx_del_timer, ngx_delete_posted_event, ngx_event_t, ngx_post_event, ngx_posted_next_events,
};
use crate::log::ngx_cycle_log;
use crate::{ngx_container_of, ngx_log_debug};
/// Process-wide scheduler instance; the `schedule` callback in this file queues deferred
/// wakeups on it.
static SCHEDULER: Scheduler = Scheduler::new();
/// Gives shared (`&self`) access to a mutable [`SchedulerInner`] through an [`UnsafeCell`].
///
/// No locking or borrow tracking is performed; exclusive access has to be guaranteed by the
/// way the type is used.
struct Scheduler(UnsafeCell<SchedulerInner>);
// `Sync` is what allows `Scheduler` to be stored in a `static`.
// SAFETY: NOTE(review): nothing in this type enforces thread confinement. These impls are
// presumably sound only because the scheduler is touched from a single thread (the NGINX
// worker's event loop) -- confirm against how the module is embedded.
unsafe impl Send for Scheduler {}
unsafe impl Sync for Scheduler {}
impl Scheduler {
const fn new() -> Self {
Self(SchedulerInner::new())
}
pub fn schedule(&self, runnable: Runnable) {
let inner = unsafe { &mut *UnsafeCell::raw_get(&self.0) };
inner.send(runnable)
}
}
/// Mutable state of the scheduler: the NGINX event used to get a callback from the event
/// loop, and the queue of runnables waiting for that callback.
///
/// `#[repr(C)]` keeps the fields in declaration order, so `_ident` is laid out directly in
/// front of `event`.
#[repr(C)]
struct SchedulerInner {
// `_ident`: four machine words placed ahead of the event; never read in this file.
// NOTE(review): looks like an identification header for NGINX-side tooling that inspects
// event owners (cf. `ngx_event_ident`) -- confirm.
// `event`: the event posted to NGINX; its handler is `Self::scheduler_event_handler`.
_ident: [usize; 4], event: ngx_event_t,
// Runnables queued by `send` and not yet picked up by the event handler (FIFO).
queue: VecDeque<Runnable>,
}
impl SchedulerInner {
/// Builds the initial scheduler state, already wrapped in an [`UnsafeCell`].
///
/// The event starts zero-initialized with only its handler set; the queue starts empty.
/// `const fn` so the result can be used in a `static` initializer.
const fn new() -> UnsafeCell<Self> {
// SAFETY: NOTE(review): assumes the all-zero bit pattern is a valid `ngx_event_t`
// (null pointers, cleared flags, `None` handler) -- confirm against the bindings.
let mut event: ngx_event_t = unsafe { mem::zeroed() };
event.handler = Some(Self::scheduler_event_handler);
UnsafeCell::new(Self {
_ident: [
// 0x4153594e spells the ASCII bytes 'A' 'S' 'Y' 'N'.
0, 0, 0, 0x4153594e, ],
event,
queue: VecDeque::new(),
})
}
/// Queues `runnable` and posts the scheduler event so that
/// `scheduler_event_handler` gets invoked by NGINX.
pub fn send(&mut self, runnable: Runnable) {
// The log pointer is refreshed on every call rather than cached.
// NOTE(review): presumably because the cycle log can be replaced at runtime
// (e.g. on reconfiguration) -- confirm.
self.event.log = ngx_cycle_log().as_ptr();
// Point `data` back at this struct, once. The stored address stays valid only as long
// as `self` does not move afterwards.
if self.event.data.is_null() {
self.event.data = ptr::from_mut(self).cast();
}
// `push_back` may need to grow the buffer; an allocation failure is not handled here.
self.queue.push_back(runnable);
// The event is posted to the `ngx_posted_next_events` queue.
// NOTE(review): `send` can run several times before the handler fires; this relies on
// `ngx_post_event` tolerating an already-posted event -- confirm.
// SAFETY: NOTE(review): `self.event` is a live, initialized event; assumes this is called
// on the thread that owns the NGINX posted-event queues -- confirm.
unsafe { ngx_post_event(&mut self.event, ptr::addr_of_mut!(ngx_posted_next_events)) }
}
/// Event handler installed on `event`: runs every runnable that was queued at the moment
/// the handler was entered.
///
/// Runnables queued while this handler is executing end up in a fresh queue and are left
/// for a later invocation, so a task that keeps rescheduling itself cannot keep this loop
/// spinning.
extern "C" fn scheduler_event_handler(ev: *mut ngx_event_t) {
let mut runnables = {
// SAFETY: NOTE(review): assumes `ev` is non-null and is the `event` field of a
// `SchedulerInner`, which is how this handler is installed in `new` -- confirm nothing
// else registers it. `UnsafeCell<Self>` has the same layout as `Self`, so the cast is
// layout-compatible.
let cell: NonNull<UnsafeCell<Self>> =
unsafe { ngx_container_of!(NonNull::new_unchecked(ev), Self, event).cast() };
// SAFETY: NOTE(review): uniqueness of this `&mut` is not checked; it is confined to
// this inner block so that it is gone before any runnable executes.
let this = unsafe { &mut *UnsafeCell::raw_get(cell.as_ptr()) };
ngx_log_debug!(
this.event.log,
"async: processing {} deferred wakeups",
this.queue.len()
);
// Take ownership of the whole queue and leave an empty one behind, so the loop below
// does not borrow from the scheduler state while tasks run (and possibly call `send`).
mem::take(&mut this.queue)
};
// Run in the order the runnables were queued (front to back).
for runnable in runnables.drain(..) {
runnable.run();
}
}
}
impl Drop for SchedulerInner {
    /// Withdraws the event from NGINX before the memory holding it goes away.
    ///
    /// The posted-event registration is removed first (if the `posted` flag is set), then
    /// the timer (if the `timer_set` flag is set). Nothing in this file arms a timer on the
    /// event, so the second check looks purely defensive.
    fn drop(&mut self) {
        let event = &mut self.event;

        let is_posted = event.posted() != 0;
        if is_posted {
            // SAFETY: NOTE(review): `event` is a live event that NGINX currently has in a
            // posted queue, as indicated by its `posted` flag -- confirm.
            unsafe { ngx_delete_posted_event(&mut *event) };
        }

        let has_timer = event.timer_set() != 0;
        if has_timer {
            // SAFETY: NOTE(review): `event` is a live event with an armed timer, as
            // indicated by its `timer_set` flag -- confirm.
            unsafe { ngx_del_timer(&mut *event) };
        }
    }
}
/// Scheduling callback handed to `async_task` (wrapped in `WithInfo` by `spawn`).
///
/// * If the task was woken while it was running (`info.woken_while_running`), the runnable
///   is queued on the global `SCHEDULER` instead of being run here.
///   NOTE(review): presumably this avoids re-entering a task that is still executing --
///   confirm against the `async_task` documentation.
/// * Otherwise the runnable is executed immediately, inline in the caller.
fn schedule(runnable: Runnable, info: ScheduleInfo) {
    if !info.woken_while_running {
        runnable.run();
        return;
    }

    SCHEDULER.schedule(runnable);
    ngx_log_debug!(
        ngx_cycle_log().as_ptr(),
        "async: task scheduled while running"
    );
}
/// Spawns `future` as a new task and schedules its first run right away.
///
/// Returns the [`Task`] handle for the spawned future. Neither the future nor its output
/// has to be `Send`; both must be `'static`.
pub fn spawn<F, T>(future: F) -> Task<T>
where
    F: Future<Output = T> + 'static,
    T: 'static,
{
    ngx_log_debug!(ngx_cycle_log().as_ptr(), "async: spawning new task");

    // SAFETY: `future` is `'static` per the bound above, and `schedule` is a plain function
    // item, so it captures nothing and is `Send + Sync + 'static`.
    // NOTE(review): the remaining `spawn_unchecked` requirement -- a non-`Send` future must
    // only be run and dropped on the thread that spawned it -- is not enforced here; it
    // presumably holds because all of this runs on a single event-loop thread -- confirm.
    let (first_run, handle) =
        unsafe { async_task::spawn_unchecked(future, WithInfo(schedule)) };

    first_run.schedule();
    handle
}