use std::{
io,
io::{Read, Write},
sync::{Arc, Mutex},
};
#[cfg(not(target_os = "windows"))]
use std::os::fd::OwnedFd;
#[cfg(target_os = "windows")]
use std::{thread::sleep, time::Duration};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
#[cfg(target_os = "windows")]
use mio_08::net::TcpListener;
#[cfg(not(target_os = "windows"))]
use socketpair::*;
use mio_08::{net::TcpStream, *};
pub struct PollEventSource {
rec_mio_socket: Mutex<mio_08::net::TcpStream>,
}
#[derive(Clone)]
pub struct PollEventSender {
send_mio_socket: Arc<Mutex<mio_08::net::TcpStream>>,
}
#[cfg(not(target_os = "windows"))]
fn set_non_blocking(s: SocketpairStream) -> io::Result<SocketpairStream> {
let owned_fd = OwnedFd::from(s);
let std_socket = std::net::TcpStream::from(owned_fd);
std_socket.set_nonblocking(true)?;
Ok(SocketpairStream::from(OwnedFd::from(std_socket)))
}
#[cfg(not(target_os = "windows"))]
pub fn make_poll_channel() -> io::Result<(PollEventSource, PollEventSender)> {
let (rec_sps, send_sps) = socketpair_stream()?;
let rec_sps = set_non_blocking(rec_sps)?;
let send_sps = set_non_blocking(send_sps)?;
let rec_mio_socket = TcpStream::from_std(std::net::TcpStream::from(OwnedFd::from(rec_sps)));
let send_mio_socket = TcpStream::from_std(std::net::TcpStream::from(OwnedFd::from(send_sps)));
Ok((
PollEventSource {
rec_mio_socket: Mutex::new(rec_mio_socket),
},
PollEventSender {
send_mio_socket: Arc::new(Mutex::new(send_mio_socket)),
},
))
}
#[cfg(target_os = "windows")]
pub fn make_poll_channel() -> io::Result<(PollEventSource, PollEventSender)> {
let listener = match TcpListener::bind("127.0.0.1:0".parse().unwrap()) {
Ok(listener) => listener,
Err(err) => {
error!("Failed to make listener!: {err}");
return Err(err);
}
};
let addr = match listener.local_addr() {
Ok(listener) => listener,
Err(err) => {
error!("Failed to retrieve local address: {err}");
return Err(err);
}
};
let rec_mio_socket = match TcpStream::connect(addr) {
Ok(tcp_stream) => tcp_stream,
Err(err) => {
error!("Failed to connect tcp stream: {err}");
return Err(err);
}
};
let mut send_mio_socket = listener.accept();
while send_mio_socket.is_err() {
sleep(Duration::from_millis(100));
send_mio_socket = listener.accept();
}
let (send_mio_socket, _addr_send) = send_mio_socket?;
debug!("IPC socket: {addr:#?} <-> {_addr_send:#?}");
Ok((
PollEventSource {
rec_mio_socket: Mutex::new(rec_mio_socket),
},
PollEventSender {
send_mio_socket: Arc::new(Mutex::new(send_mio_socket)),
},
))
}
impl PollEventSender {
pub fn send(&self) {
match self.send_mio_socket.lock().unwrap().write(&[0xcc]) {
Ok(_b) => { }
Err(e) => {
info!("PollEventSender.send: {e}");
}
}
}
}
impl PollEventSource {
pub fn drain(&self) {
let mut buf = Vec::with_capacity(16);
match self.rec_mio_socket.lock().unwrap().read_to_end(&mut buf) {
Ok(_) => (),
Err(err) => {
match err.kind() {
io::ErrorKind::WouldBlock => {} other_kind => {
info!("PollEventSource.drain(): {other_kind}");
}
}
}
}
}
}
impl event::Source for PollEventSource {
fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> io::Result<()> {
self
.rec_mio_socket
.lock()
.unwrap()
.register(registry, token, interests)
}
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
self
.rec_mio_socket
.lock()
.unwrap()
.reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
self.rec_mio_socket.lock().unwrap().deregister(registry)
}
}