use crate::windows::spsc::*;
use corcovado::{
event::Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token,
};
use miow::pipe::{AnonRead, AnonWrite};
use parking_lot::{Condvar, Mutex};
use windows_sys::Win32::System::IO::CancelSynchronousIo;
use std::io;
use std::os::windows::io::AsRawHandle;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, TryRecvError},
Arc,
};
use std::thread::{spawn, JoinHandle};
/// Empty marker type stored inside the `Mutex` whose guard the worker threads hand to their
/// `Condvar`; it carries no data of its own.
struct WaitTag {}
/// State shared (through an `Arc`) between an `EventedAnonRead` and its worker thread.
struct EventedAnonReadInner {
/// Handle passed to `Poll` by the `Evented` implementation.
registration: Registration,
/// Set to readable by the worker after it buffers data; cleared by `read` when the buffer
/// is empty.
readiness: SetReadiness,
/// Set to `true` in `Drop`; the worker returns when it observes it.
done: AtomicBool,
/// Notified by `read` and by `Drop`; the worker waits on it while the buffer reports full.
sig_buffer_not_full: Condvar,
/// Mutex whose guard is handed to `sig_buffer_not_full.wait`.
wait_tag: Mutex<WaitTag>,
}
/// Read end of an anonymous pipe that can be registered with a `Poll`.
///
/// A worker thread reads from the pipe and copies the bytes into an SPSC buffer; `io::Read`
/// on this type drains that buffer.
pub struct EventedAnonRead {
/// Handle of the worker thread; becomes `None` once `read` or `Drop` has taken it.
thread: Option<JoinHandle<()>>,
/// Consuming half of the SPSC buffer that the worker fills.
consumer: SpscBufferReader,
/// State shared with the worker thread.
inner: Arc<EventedAnonReadInner>,
/// Receives the stringified error that made the worker stop, if any.
error_receiver: Receiver<String>,
}
/// Evaluates `$e`; yields the `Ok` value, or on `Err` sends the error's `Display` text through
/// `$sender` and returns from the enclosing function (which must return `()`).
///
/// Panics with "Could not send error" if the send itself fails.
macro_rules! try_or_send {
    ($e:expr, $sender:ident) => {
        match $e {
            Err(failure) => {
                let message = failure.to_string();
                $sender.send(message).expect("Could not send error");
                return;
            }
            Ok(value) => value,
        }
    };
}
impl EventedAnonRead {
/// Wraps `pipe` and spawns a worker thread that reads from it.
///
/// The worker copies everything it reads into a buffer created with `spsc_buffer(65536)` and
/// marks the registration readable after writing into that buffer. If the pipe read or
/// `set_readiness` fails, the error's text is sent over a channel and the worker exits.
pub fn new(mut pipe: AnonRead) -> Self {
let (registration, readiness) = Registration::new2();
let (mut producer, consumer) = spsc_buffer(65536);
let done = AtomicBool::new(false);
let sig_buffer_not_full = Condvar::new();
let wait_tag = Mutex::new(WaitTag {});
let (error_sender, error_receiver) = channel();
let inner = Arc::new(EventedAnonReadInner {
registration,
readiness,
done,
sig_buffer_not_full,
wait_tag,
});
let thread = {
// The worker gets its own `Arc` clone; `pipe`, `producer` and `error_sender` move into it.
let inner = inner.clone();
spawn(move || {
use std::io::Read;
let mut tmp_buf = [0u8; 65535];
loop {
// `Drop` sets `done`; checked once per outer iteration.
if inner.done.load(Ordering::SeqCst) {
return;
}
// A read error is forwarded as a string and ends the worker.
let nbytes = try_or_send!(pipe.read(&mut tmp_buf[..]), error_sender);
let mut written = 0usize;
// Keep pushing until every byte of this read has been accepted by the buffer.
while written < nbytes {
if producer.is_full() {
// Sleep until `read` or `Drop` notifies the condvar.
// NOTE(review): the fullness check happens before the lock is taken and the
// notifiers do not hold this lock, so a notification sent in between looks
// like it could be missed — confirm.
let mut wait_tag = inner.wait_tag.lock();
inner.sig_buffer_not_full.wait(&mut wait_tag);
if inner.done.load(Ordering::SeqCst) {
return;
}
}
// The return value is added to `written`, i.e. it is the count accepted by this call.
written += producer.write_from_slice(&tmp_buf[written..nbytes]);
// Only touch the readiness state when it is not already readable.
if !inner.readiness.readiness().is_readable() {
try_or_send!(
inner.readiness.set_readiness(Ready::readable()),
error_sender
);
}
}
}
})
};
Self {
thread: Some(thread),
consumer,
inner,
error_receiver,
}
}
}
impl io::Read for EventedAnonRead {
    /// Copies buffered bytes into `buf` and returns how many were copied.
    ///
    /// Returns a `BrokenPipe` error when the worker has already been joined, when the worker
    /// reported an error (its text becomes the error message and the worker is joined), or
    /// when the error channel is disconnected. After copying, readiness is cleared if the
    /// buffer is empty and the worker is notified that space may be available.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.thread.is_none() {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, ""));
        }

        match self.error_receiver.try_recv() {
            Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => {
                return Err(io::Error::new(io::ErrorKind::BrokenPipe, ""));
            }
            Ok(message) => {
                // Presence of the handle was checked at the top of the function.
                let worker = self.thread.take().unwrap();
                worker.join().unwrap();
                return Err(io::Error::new(io::ErrorKind::BrokenPipe, message));
            }
        }

        let copied = self.consumer.read_to_slice(buf);

        let readiness = &self.inner.readiness;
        if self.consumer.is_empty() {
            readiness.set_readiness(Ready::empty())?;
            // The worker may have pushed data between the check and the clear; restore the
            // readable state in that case.
            if !self.consumer.is_empty() {
                readiness.set_readiness(Ready::readable())?;
            }
        }

        self.inner.sig_buffer_not_full.notify_one();
        Ok(copied)
    }
}
/// Delegates every `Evented` operation to the `Registration` held in the shared state.
impl Evented for EventedAnonRead {
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.register(registration, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.reregister(registration, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.deregister(registration)
    }
}
impl Drop for EventedAnonRead {
    /// Signals the worker to stop, cancels its pending synchronous pipe read and joins it.
    ///
    /// # Panics
    ///
    /// Panics with "Could not close EventedAnonRead worker" if joining the worker fails.
    fn drop(&mut self) {
        self.inner.done.store(true, Ordering::SeqCst);
        // Wake the worker in case it is parked waiting for buffer space.
        self.inner.sig_buffer_not_full.notify_one();

        // `read` takes and joins the worker after it reports an error, so the handle may
        // already be gone; unwrapping here used to panic in that case.
        let thread = match self.thread.take() {
            Some(thread) => thread,
            None => return,
        };

        // SAFETY: the raw handle comes from a `JoinHandle` that is still owned here, so it
        // refers to a valid thread handle for the duration of the call.
        // NOTE(review): the return value is ignored; if the worker is not inside a blocking
        // read at this instant there may be nothing to cancel — confirm this cannot stall
        // the join below.
        unsafe {
            CancelSynchronousIo(thread.as_raw_handle() as isize);
        }

        thread
            .join()
            .expect("Could not close EventedAnonRead worker");
    }
}
/// State shared (through an `Arc`) between an `EventedAnonWrite` and its worker thread.
struct EventedAnonWriteInner {
/// Handle passed to `Poll` by the `Evented` implementation.
registration: Registration,
/// Set to writable by the worker; cleared by `write` when the buffer reports full.
readiness: SetReadiness,
/// Set to `true` in `Drop`; the worker returns when it observes it.
done: AtomicBool,
/// Notified by `write` and by `Drop`; the worker waits on it while the buffer is empty.
sig_buffer_not_empty: Condvar,
/// Mutex whose guard is handed to `sig_buffer_not_empty.wait`.
wait_tag: Mutex<WaitTag>,
}
/// Write end of an anonymous pipe that can be registered with a `Poll`.
///
/// `io::Write` on this type pushes bytes into an SPSC buffer; a worker thread drains that
/// buffer into the pipe.
pub struct EventedAnonWrite {
/// Handle of the worker thread; becomes `None` once `write` or `Drop` has taken it.
thread: Option<JoinHandle<()>>,
/// Producing half of the SPSC buffer that the worker drains.
producer: SpscBufferWriter,
/// State shared with the worker thread.
inner: Arc<EventedAnonWriteInner>,
/// Receives the stringified error that made the worker stop, if any.
error_receiver: Receiver<String>,
}
impl EventedAnonWrite {
/// Wraps `pipe` and spawns a worker thread that writes buffered data to it.
///
/// The worker marks the registration writable on start-up, then repeatedly drains a buffer
/// created with `spsc_buffer(65536)` into the pipe. If the pipe write or `set_readiness`
/// fails, the error's text is sent over a channel and the worker exits.
pub fn new(mut pipe: AnonWrite) -> Self {
let (registration, readiness) = Registration::new2();
let (producer, mut consumer) = spsc_buffer(65536);
let done = AtomicBool::new(false);
let sig_buffer_not_empty = Condvar::new();
let wait_tag = Mutex::new(WaitTag {});
let inner = Arc::new(EventedAnonWriteInner {
registration,
readiness,
done,
sig_buffer_not_empty,
wait_tag,
});
let (error_sender, error_receiver) = channel();
let thread = {
// The worker gets its own `Arc` clone; `pipe`, `consumer` and `error_sender` move into it.
let inner = inner.clone();
spawn(move || {
use std::io::Write;
let mut tmp_buf = [0u8; 65535];
// Report writable once before entering the loop.
try_or_send!(
inner.readiness.set_readiness(Ready::writable()),
error_sender
);
loop {
// `Drop` sets `done`; checked once per iteration.
if inner.done.load(Ordering::SeqCst) {
return;
}
let nbytes = {
if consumer.is_empty() {
// Sleep until `write` or `Drop` notifies the condvar.
// NOTE(review): the emptiness check happens before the lock is taken and the
// notifiers do not hold this lock, so a notification sent in between looks
// like it could be missed — confirm.
let mut wait_tag = inner.wait_tag.lock();
inner.sig_buffer_not_empty.wait(&mut wait_tag);
if inner.done.load(Ordering::SeqCst) {
return;
}
}
// Move pending bytes out of the shared buffer into the local scratch buffer.
let nbytes = consumer.read_to_slice(&mut tmp_buf);
// Only touch the readiness state when it is not already writable.
if !inner.readiness.readiness().is_writable() {
try_or_send!(
inner.readiness.set_readiness(Ready::writable()),
error_sender
);
}
nbytes
};
let mut written = 0usize;
// `write` may accept fewer bytes than offered, so loop until the chunk is fully written.
while written < nbytes {
written += try_or_send!(
pipe.write(&tmp_buf[written..nbytes]),
error_sender
);
}
}
})
};
Self {
thread: Some(thread),
producer,
inner,
error_receiver,
}
}
}
impl io::Write for EventedAnonWrite {
    /// Pushes bytes from `buf` into the buffer and returns how many were accepted.
    ///
    /// Returns a `BrokenPipe` error when the worker has already been joined, when the worker
    /// reported an error (its text becomes the error message and the worker is joined), or
    /// when the error channel is disconnected. After pushing, readiness is cleared if the
    /// buffer is full and the worker is notified that data may be available.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.thread.is_none() {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, ""));
        }

        match self.error_receiver.try_recv() {
            Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => {
                return Err(io::Error::new(io::ErrorKind::BrokenPipe, ""));
            }
            Ok(message) => {
                // Presence of the handle was checked at the top of the function.
                let worker = self.thread.take().unwrap();
                worker.join().unwrap();
                return Err(io::Error::new(io::ErrorKind::BrokenPipe, message));
            }
        }

        let accepted = self.producer.write_from_slice(buf);

        let readiness = &self.inner.readiness;
        if self.producer.is_full() {
            readiness.set_readiness(Ready::empty())?;
            // The worker may have drained data between the check and the clear; restore the
            // writable state in that case.
            if !self.producer.is_full() {
                readiness.set_readiness(Ready::writable())?;
            }
        }

        self.inner.sig_buffer_not_empty.notify_one();
        Ok(accepted)
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Delegates every `Evented` operation to the `Registration` held in the shared state.
impl Evented for EventedAnonWrite {
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.register(registration, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.reregister(registration, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        let registration = &self.inner.registration;
        poll.deregister(registration)
    }
}
impl Drop for EventedAnonWrite {
    /// Signals the worker to stop and joins it.
    ///
    /// # Panics
    ///
    /// Panics with "Could not close EventedAnonWrite worker" if joining the worker fails.
    fn drop(&mut self) {
        self.inner.done.store(true, Ordering::SeqCst);
        // Wake the worker in case it is parked waiting for data.
        self.inner.sig_buffer_not_empty.notify_one();

        // `write` takes and joins the worker after it reports an error, so the handle may
        // already be gone; unwrapping here used to panic in that case.
        if let Some(thread) = self.thread.take() {
            thread
                .join()
                .expect("Could not close EventedAnonWrite worker");
        }
    }
}