use std::cell::UnsafeCell;
use std::os::windows::io::AsRawSocket;
use std::time::Duration;
use std::{io, ptr};
use super::miow::{CompletionPort, CompletionStatus};
use crate::coroutine_impl::CoroutineImpl;
use crate::scheduler::Scheduler;
use crate::timeout_list::{now, TimeOutList, TimeoutHandle};
use crate::yield_now::set_co_para;
use smallvec::SmallVec;
use windows_sys::Win32::Foundation::*;
use windows_sys::Win32::System::IO::{CancelIoEx, GetOverlappedResult, OVERLAPPED};
/// Payload stored in each timer-list entry for an in-flight I/O operation.
///
/// `event_data` points back at the `EventData` whose I/O should be cancelled
/// when the timer fires. `Selector::select` overwrites it with a null pointer
/// once the I/O has completed, and `timeout_handler` treats a null pointer as
/// "nothing to cancel".
pub struct TimerData {
// Raw back-pointer to the I/O record; null once the I/O has already completed.
event_data: *mut EventData,
}
// Timeout list specialised for I/O timers; each `SingleSelector` owns one.
type TimerList = TimeOutList<TimerData>;
/// Handle to a single entry registered in a `TimerList`.
pub type TimerHandle = TimeoutHandle<TimerData>;
/// Per-operation record handed to overlapped Win32 I/O calls.
///
/// The struct is `#[repr(C)]` with `overlapped` as its first field because
/// `Selector::select` casts the `OVERLAPPED` pointer reported by the
/// completion port straight back to `*mut EventData`.
#[repr(C)]
pub struct EventData {
// Must remain the first field (see the pointer cast in `Selector::select`).
overlapped: UnsafeCell<OVERLAPPED>,
/// Handle the operation was issued on; passed to `CancelIoEx` and
/// `GetOverlappedResult`.
pub handle: HANDLE,
/// Timeout entry registered for this operation, if any.
pub timer: Option<TimerHandle>,
/// Coroutine to resume on completion; taken out by `Selector::select`.
pub co: Option<CoroutineImpl>,
}
impl EventData {
    /// Creates a record for an operation on `handle` with a zero-initialised
    /// `OVERLAPPED`, no timer and no parked coroutine.
    pub fn new(handle: HANDLE) -> EventData {
        // SAFETY: this relies on the all-zero bit pattern being a valid
        // initial value for the Win32 `OVERLAPPED` structure.
        let blank: OVERLAPPED = unsafe { ::std::mem::zeroed() };
        EventData {
            overlapped: UnsafeCell::new(blank),
            handle,
            timer: None,
            co: None,
        }
    }

    /// Returns the raw `OVERLAPPED` pointer to pass to overlapped I/O calls.
    #[inline]
    pub fn get_overlapped(&mut self) -> *mut OVERLAPPED {
        let cell = &self.overlapped;
        cell.get()
    }

    /// Builds the timer payload that points back at this record.
    ///
    /// The returned value holds a raw pointer to `self`; nothing in this
    /// method ties its lifetime to the record.
    #[cfg(feature = "io_timeout")]
    pub fn timer_data(&self) -> TimerData {
        let event_data = self as *const EventData as *mut EventData;
        TimerData { event_data }
    }

    /// Returns the `InternalHigh` member of the embedded `OVERLAPPED`
    /// (presumably the number of bytes transferred — confirm against the
    /// Win32 documentation).
    pub fn get_io_size(&self) -> usize {
        // SAFETY: the pointer comes from our own `UnsafeCell`, so it is
        // non-null and properly aligned; only a plain field read is performed.
        unsafe { (*self.overlapped.get()).InternalHigh }
    }
}
/// Event type that `Selector::select` receives completions into; here it is
/// an alias for the completion-port status.
pub type SysEvent = CompletionStatus;
// One completion port together with the timer list that the same
// `Selector::select` call services.
struct SingleSelector {
// Port polled by `Selector::select` and posted to by `Selector::wakeup`.
port: CompletionPort,
// Pending I/O timeouts; expired entries are run through `timeout_handler`.
timer_list: TimerList,
}
impl SingleSelector {
    /// Creates a completion port plus an empty timer list.
    ///
    /// The literal `1` is forwarded unchanged to `CompletionPort::new`
    /// (presumably the port's concurrency value — confirm in `miow`).
    ///
    /// # Errors
    /// Propagates any error returned while creating the completion port.
    pub fn new() -> io::Result<SingleSelector> {
        let port = CompletionPort::new(1)?;
        Ok(SingleSelector {
            port,
            timer_list: TimerList::new(),
        })
    }
}
/// Collection of per-worker selectors, indexed by I/O worker id.
pub(crate) struct Selector {
// One `SingleSelector` per I/O worker; up to 128 are stored inline by `SmallVec`.
vec: SmallVec<[SingleSelector; 128]>,
}
impl Selector {
    /// Builds a selector with `io_workers` independent completion ports, each
    /// paired with its own timer list.
    ///
    /// # Errors
    /// Returns the first error reported while creating a `SingleSelector`.
    pub fn new(io_workers: usize) -> io::Result<Self> {
        let mut s = Selector {
            vec: SmallVec::new(),
        };
        for _ in 0..io_workers {
            s.vec.push(SingleSelector::new()?);
        }
        Ok(s)
    }

    /// Waits for completions on worker `id`'s port, resumes the coroutines
    /// whose I/O finished, then runs the worker's expired timers.
    ///
    /// * `events`  - scratch buffer the completion statuses are received into.
    /// * `timeout` - maximum wait in nanoseconds; `None` is forwarded as "no
    ///   timeout" to the port.
    ///
    /// Returns whatever `schedule_timer` reports for the worker's timer list
    /// (presumably the delay until the next timer expires — confirm in
    /// `timeout_list`).
    ///
    /// # Errors
    /// Any port error other than `WAIT_TIMEOUT` is returned to the caller.
    ///
    /// # Panics
    /// Panics if `id` is out of range, or if a completion arrives for a record
    /// that has no coroutine attached.
    #[inline]
    pub fn select(
        &self,
        scheduler: &Scheduler,
        id: usize,
        events: &mut [SysEvent],
        timeout: Option<u64>,
    ) -> io::Result<Option<u64>> {
        assert!(id < self.vec.len());
        let timeout = timeout.map(Duration::from_nanos);
        let single_selector = &self.vec[id];

        let n = match single_selector.port.get_many(events, timeout) {
            Ok(statuses) => statuses.len(),
            // An expired wait is not a failure: there are simply no completions.
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
            Err(e) => return Err(e),
        };

        // SAFETY: relies on `get_many` handing back a sub-slice of `events`,
        // so that `n <= events.len()` (not verifiable from this file alone).
        for status in unsafe { events.get_unchecked(..n) } {
            let overlapped = status.overlapped();
            if overlapped.is_null() {
                // A null OVERLAPPED is the packet posted by `wakeup`.
                scheduler.collect_global(id);
                continue;
            }

            // SAFETY: `EventData` is `#[repr(C)]` with the OVERLAPPED as its
            // first field, so the OVERLAPPED pointer is also a pointer to the
            // enclosing record.
            let data = unsafe { &mut *(overlapped as *mut EventData) };
            let mut co = data.co.take().expect("can't get co in selector");

            // Detach the timeout. The payload pointer is nulled before the
            // entry is removed so that `timeout_handler` does nothing if the
            // entry still ends up firing.
            if let Some(h) = data.timer.take() {
                unsafe {
                    h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
                }
                h.remove();
            }

            let overlapped = unsafe { &*overlapped };
            const STATUS_CANCELLED_U32: u32 = STATUS_CANCELLED as u32;
            match overlapped.Internal as u32 {
                // The operation was cancelled (see `timeout_handler`); report
                // it to the coroutine as a timeout.
                ERROR_OPERATION_ABORTED | STATUS_CANCELLED_U32 => {
                    warn!("coroutine timeout, stat=0x{:x}", overlapped.Internal);
                    set_co_para(&mut co, io::Error::new(io::ErrorKind::TimedOut, "timeout"));
                }
                NO_ERROR => {
                    // Successful completion: nothing to attach to the coroutine.
                }
                err => {
                    error!("iocp err=0x{err:08x}");
                    unsafe {
                        // Called so that the thread's last-error value reflects
                        // this operation before it is read below.
                        let mut size: u32 = 0;
                        let o = overlapped as *const _ as *mut _;
                        // bWait must be FALSE (0): the operation has already
                        // completed. `S_FALSE` is an HRESULT equal to 1, which
                        // would request a blocking wait.
                        GetOverlappedResult(data.handle, o, &mut size, 0);
                    }
                    set_co_para(&mut co, io::Error::last_os_error());
                }
            }

            #[cfg(feature = "work_steal")]
            scheduler.schedule_with_id(co, id);
            #[cfg(not(feature = "work_steal"))]
            crate::coroutine_impl::run_coroutine(co);
        }

        scheduler.run_queued_tasks(id);

        // Fire expired timers for this worker and learn when to wake up next.
        let next_expire = single_selector
            .timer_list
            .schedule_timer(now(), &timeout_handler);
        Ok(next_expire)
    }

    /// Posts an empty completion packet (null OVERLAPPED) to worker `id`'s
    /// port so that a blocked `select` call returns.
    ///
    /// # Panics
    /// Panics if `id` is out of range or if posting to the port fails.
    #[inline]
    pub fn wakeup(&self, id: usize) {
        self.vec[id]
            .port
            .post(CompletionStatus::new(0, 0, ptr::null_mut()))
            .unwrap();
    }

    /// Associates the socket `t` with one of the completion ports.
    ///
    /// The worker index is `(raw_socket >> 2) % worker_count`;
    /// `add_io_timer` must use the same formula.
    ///
    /// # Errors
    /// Propagates the error returned by the port's `add_socket`.
    #[inline]
    pub fn add_socket<T: AsRawSocket + ?Sized>(&self, t: &T) -> io::Result<()> {
        let fd = (t.as_raw_socket() as usize) >> 2;
        let id = fd % self.vec.len();
        self.vec[id].port.add_socket(fd, t)
    }

    /// Registers a timeout for the I/O described by `io` and stores the
    /// resulting timer handle in `io.timer`.
    ///
    /// The timer is filed on the worker selected by the same formula as
    /// `add_socket`, i.e. the worker whose `select` call receives the
    /// completion and detaches the timer (this assumes `io.handle` is the raw
    /// handle of the registered socket — confirm at the call sites).
    #[inline]
    #[cfg(feature = "io_timeout")]
    pub fn add_io_timer(&self, io: &mut EventData, timeout: Duration) {
        // Shift first, then reduce: identical to the index used by `add_socket`.
        let id = ((io.handle as usize) >> 2) % self.vec.len();
        let (h, b_new) = self.vec[id].timer_list.add_timer(timeout, io.timer_data());
        if b_new {
            // Wake the worker that owns this timer list so it can take the
            // new entry into account for its next wait.
            self.wakeup(id);
        }
        io.timer.replace(h);
    }
}
/// Requests cancellation of the overlapped operation identified by
/// `overlapped` on `handle` via `CancelIoEx`.
///
/// # Safety
/// `handle` and `overlapped` are passed straight to the Win32 call; the
/// caller must ensure they are valid for it.
///
/// # Errors
/// Returns the thread's last OS error when `CancelIoEx` reports failure (0).
unsafe fn cancel_io(handle: HANDLE, overlapped: *mut OVERLAPPED) -> io::Result<()> {
    match CancelIoEx(handle, overlapped) {
        0 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
pub fn timeout_handler(data: TimerData) {
if data.event_data.is_null() {
return;
}
unsafe {
let event_data = &mut *data.event_data;
event_data.timer.take();
cancel_io(event_data.handle, event_data.get_overlapped())
.unwrap_or_else(|e| error!("CancelIoEx failed! e = {e}"));
}
}