#![allow(deprecated)]
#![allow(clippy::all)]
use std::cell::UnsafeCell;
use std::os::windows::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{fmt, io};
use windows_sys::Win32::Foundation::WAIT_TIMEOUT;
use windows_sys::Win32::System::IO::OVERLAPPED;
use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;
use lazycell::AtomicLazyCell;
use miow;
use miow::iocp::{CompletionPort, CompletionStatus};
use event_imp::{Event, Evented, Ready};
use poll::{self, Poll};
use sys::windows::buffer_pool::BufferPool;
use {PollOpt, Token};
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
pub struct Selector {
inner: Arc<SelectorInner>,
}
struct SelectorInner {
id: usize,
port: CompletionPort,
buffers: Mutex<BufferPool>,
}
impl Selector {
pub fn new() -> io::Result<Selector> {
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
CompletionPort::new(0).map(|port| Selector {
inner: Arc::new(SelectorInner {
id,
port,
buffers: Mutex::new(BufferPool::new(256)),
}),
})
}
pub fn select(
&self,
events: &mut Events,
awakener: Token,
timeout: Option<Duration>,
) -> io::Result<bool> {
trace!("select; timeout={:?}", timeout);
events.clear();
trace!("polling IOCP");
let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
Ok(statuses) => statuses.len(),
Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
Err(e) => return Err(e),
};
let mut ret = false;
for status in events.statuses[..n].iter() {
if status.overlapped() as usize == 0 {
assert_eq!(status.token(), usize::from(awakener));
ret = true;
continue;
}
let callback =
unsafe { (*(status.overlapped() as *mut Overlapped)).callback };
trace!("select; -> got overlapped");
callback(status.entry());
}
trace!("returning");
Ok(ret)
}
pub fn port(&self) -> &CompletionPort {
&self.inner.port
}
pub fn clone_ref(&self) -> Selector {
Selector {
inner: self.inner.clone(),
}
}
pub fn id(&self) -> usize {
self.inner.id
}
}
impl SelectorInner {
fn identical(&self, other: &SelectorInner) -> bool {
std::ptr::eq(self, other)
}
}
pub struct Binding {
selector: AtomicLazyCell<Arc<SelectorInner>>,
}
impl Default for Binding {
fn default() -> Self {
Self::new()
}
}
impl Binding {
pub fn new() -> Binding {
Binding {
selector: AtomicLazyCell::new(),
}
}
pub unsafe fn register_handle(
&self,
handle: &dyn AsRawHandle,
token: Token,
poll: &Poll,
) -> io::Result<()> {
let selector = poll::selector(poll);
drop(self.selector.fill(selector.inner.clone()));
self.check_same_selector(poll)?;
selector.inner.port.add_handle(usize::from(token), handle)
}
pub unsafe fn register_socket(
&self,
handle: &dyn AsRawSocket,
token: Token,
poll: &Poll,
) -> io::Result<()> {
let selector = poll::selector(poll);
drop(self.selector.fill(selector.inner.clone()));
self.check_same_selector(poll)?;
selector.inner.port.add_socket(usize::from(token), handle)
}
pub unsafe fn reregister_handle(
&self,
_handle: &dyn AsRawHandle,
_token: Token,
poll: &Poll,
) -> io::Result<()> {
self.check_same_selector(poll)
}
pub unsafe fn reregister_socket(
&self,
_socket: &dyn AsRawSocket,
_token: Token,
poll: &Poll,
) -> io::Result<()> {
self.check_same_selector(poll)
}
pub unsafe fn deregister_handle(
&self,
_handle: &dyn AsRawHandle,
poll: &Poll,
) -> io::Result<()> {
self.check_same_selector(poll)
}
pub unsafe fn deregister_socket(
&self,
_socket: &dyn AsRawSocket,
poll: &Poll,
) -> io::Result<()> {
self.check_same_selector(poll)
}
fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
let selector = poll::selector(poll);
match self.selector.borrow() {
Some(prev) if prev.identical(&selector.inner) => Ok(()),
Some(_) | None => Err(other("socket already registered")),
}
}
}
impl fmt::Debug for Binding {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Binding").finish()
}
}
pub struct ReadyBinding {
binding: Binding,
readiness: Option<poll::SetReadiness>,
}
impl ReadyBinding {
#[allow(unused)]
pub fn new() -> ReadyBinding {
ReadyBinding {
binding: Binding::new(),
readiness: None,
}
}
#[allow(unused)]
pub fn registered(&self) -> bool {
self.readiness.is_some()
}
pub fn get_buffer(&self, size: usize) -> Vec<u8> {
match self.binding.selector.borrow() {
Some(i) => i.buffers.lock().unwrap().get(size),
None => Vec::with_capacity(size),
}
}
pub fn put_buffer(&self, buf: Vec<u8>) {
if let Some(i) = self.binding.selector.borrow() {
i.buffers.lock().unwrap().put(buf);
}
}
pub fn set_readiness(&self, set: Ready) {
if let Some(ref i) = self.readiness {
trace!("set readiness to {:?}", set);
i.set_readiness(set).expect("event loop disappeared?");
}
}
pub fn readiness(&self) -> Ready {
match self.readiness {
Some(ref i) => i.readiness(),
None => Ready::empty(),
}
}
pub fn register_socket(
&mut self,
socket: &dyn AsRawSocket,
poll: &Poll,
token: Token,
events: Ready,
opts: PollOpt,
registration: &Mutex<Option<poll::Registration>>,
) -> io::Result<()> {
trace!("register {:?} {:?}", token, events);
unsafe {
self.binding.register_socket(socket, token, poll)?;
}
let (r, s) = poll::new_registration(poll, token, events, opts);
self.readiness = Some(s);
*registration.lock().unwrap() = Some(r);
Ok(())
}
pub fn reregister_socket(
&mut self,
socket: &dyn AsRawSocket,
poll: &Poll,
token: Token,
events: Ready,
opts: PollOpt,
registration: &Mutex<Option<poll::Registration>>,
) -> io::Result<()> {
trace!("reregister {:?} {:?}", token, events);
unsafe {
self.binding.reregister_socket(socket, token, poll)?;
}
registration
.lock()
.unwrap()
.as_mut()
.unwrap()
.reregister(poll, token, events, opts)
}
pub fn deregister(
&mut self,
socket: &dyn AsRawSocket,
poll: &Poll,
registration: &Mutex<Option<poll::Registration>>,
) -> io::Result<()> {
trace!("deregistering");
unsafe {
self.binding.deregister_socket(socket, poll)?;
}
registration
.lock()
.unwrap()
.as_ref()
.unwrap()
.deregister(poll)
}
}
fn other(s: &str) -> io::Error {
io::Error::new(io::ErrorKind::Other, s)
}
#[derive(Debug)]
pub struct Events {
statuses: Box<[CompletionStatus]>,
events: Vec<Event>,
}
impl Events {
pub fn with_capacity(cap: usize) -> Events {
Events {
statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
events: Vec::with_capacity(cap),
}
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn len(&self) -> usize {
self.events.len()
}
pub fn capacity(&self) -> usize {
self.events.capacity()
}
pub fn get(&self, idx: usize) -> Option<Event> {
self.events.get(idx).copied()
}
pub fn push_event(&mut self, event: Event) {
self.events.push(event);
}
pub fn clear(&mut self) {
self.events.truncate(0);
}
}
macro_rules! overlapped2arc {
($e:expr, $t:ty, $($field:ident).+) => ({
let offset = offset_of!($t, $($field).+);
debug_assert!(offset < mem::size_of::<$t>());
FromRawArc::from_raw(($e as usize - offset) as *mut $t)
})
}
#[allow(deref_nullptr)]
macro_rules! offset_of {
($t:ty, $($field:ident).+) => (
&(*(0 as *const $t)).$($field).+ as *const _ as usize
)
}
#[repr(C)]
pub struct Overlapped {
inner: UnsafeCell<miow::Overlapped>,
callback: fn(&OVERLAPPED_ENTRY),
}
impl Overlapped {
pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
Overlapped {
inner: UnsafeCell::new(miow::Overlapped::zero()),
callback: cb,
}
}
pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
unsafe { (*self.inner.get()).raw() }
}
}
impl fmt::Debug for Overlapped {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Overlapped").finish()
}
}
unsafe impl Send for Overlapped {}
unsafe impl Sync for Overlapped {}