use crate::sources::EventSource;
use polling::os::iocp::{CompletionPacket, PollerIocpExt};
use polling::Poller;
use tracing::{trace, warn};
use std::fmt;
use std::io;
use std::sync::{Arc, Mutex, TryLockError};
#[inline]
pub fn make_ping() -> io::Result<(Ping, PingSource)> {
let state = Arc::new(State {
counter: Mutex::new(Counter {
notified: false,
poll_state: None,
}),
});
Ok((
Ping {
state: state.clone(),
},
PingSource { state },
))
}
#[derive(Clone)]
pub struct Ping {
state: Arc<State>,
}
impl fmt::Debug for Ping {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_ping(&self.state, "Ping", f)
}
}
pub struct PingSource {
state: Arc<State>,
}
impl fmt::Debug for PingSource {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_ping(&self.state, "PingSource", f)
}
}
impl Ping {
pub fn ping(&self) {
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
counter.notified = true;
let poll_state = match &mut counter.poll_state {
Some(ps) => ps,
None => {
warn!("ping was not registered with the event loop");
return;
}
};
if let Err(e) = poll_state.notify() {
warn!("failed to post packet to IOCP: {e}");
}
}
}
impl Drop for Ping {
fn drop(&mut self) {
if Arc::strong_count(&self.state) <= 2 {
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
if let Some(poll_state) = &mut counter.poll_state {
if let Err(e) = poll_state.notify() {
warn!("failed to post packet to IOCP during drop: {e}");
}
}
}
}
}
impl EventSource for PingSource {
type Error = super::PingError;
type Event = ();
type Metadata = ();
type Ret = ();
fn process_events<F>(
&mut self,
_readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<crate::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
let poll_state = match &mut counter.poll_state {
Some(ps) => ps,
None => {
return Ok(crate::PostAction::Disable);
}
};
poll_state.inserted = false;
let token: usize = token.inner.into();
if poll_state.packet.event().key != token {
warn!(
"token does not match; expected {:x}, got {:x}",
poll_state.packet.event().key,
token
);
return Ok(crate::PostAction::Continue);
}
if counter.notified {
counter.notified = false;
callback((), &mut ());
}
let action = if Arc::strong_count(&self.state) <= 1 {
crate::PostAction::Remove
} else {
crate::PostAction::Continue
};
Ok(action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
let token = token_factory.token();
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
if counter.poll_state.is_some() {
return Err(io::Error::from(io::ErrorKind::AlreadyExists).into());
}
let packet = {
let token = token.inner.into();
let event = polling::Event::readable(token);
CompletionPacket::new(event)
};
let poll_state = PollState::new(poll.poller(), packet, counter.notified)?;
counter.poll_state = Some(poll_state);
Ok(())
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
let token = token_factory.token();
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
let poll_state = match &mut counter.poll_state {
Some(ps) => ps,
None => return Err(io::Error::from(io::ErrorKind::NotFound).into()),
};
if !Arc::ptr_eq(&poll_state.poller, poll.poller()) {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"attempted to reregister() a PingSource with a different poller",
)
.into());
}
let token = token.inner.into();
let event = polling::Event::readable(token);
if event.key != poll_state.packet.event().key {
poll_state.packet = CompletionPacket::new(event);
if poll_state.inserted {
poll_state.inserted = false;
poll_state.notify()?;
}
}
Ok(())
}
fn unregister(&mut self, _poll: &mut crate::Poll) -> crate::Result<()> {
let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner());
if counter.poll_state.take().is_none() {
trace!("unregistered a source that wasn't registered");
}
Ok(())
}
}
struct State {
counter: Mutex<Counter>,
}
struct Counter {
notified: bool,
poll_state: Option<PollState>,
}
struct PollState {
poller: Arc<Poller>,
inserted: bool,
packet: CompletionPacket,
}
impl PollState {
fn new(poller: &Arc<Poller>, packet: CompletionPacket, notified: bool) -> io::Result<Self> {
let mut poll_state = Self {
poller: poller.clone(),
packet,
inserted: false,
};
if notified {
poll_state.notify()?;
}
Ok(poll_state)
}
fn notify(&mut self) -> io::Result<()> {
if !self.inserted {
self.poller.post(self.packet.clone())?;
self.inserted = true;
}
Ok(())
}
}
#[inline]
fn debug_ping(state: &State, name: &str, f: &mut fmt::Formatter) -> fmt::Result {
let counter = match state.counter.try_lock() {
Ok(counter) => counter,
Err(TryLockError::WouldBlock) => {
return f
.debug_tuple("Ping")
.field(&format_args!("<locked>"))
.finish()
}
Err(TryLockError::Poisoned(_)) => {
return f
.debug_tuple("Ping")
.field(&format_args!("<poisoned>"))
.finish()
}
};
let mut s = f.debug_struct(name);
s.field("notified", &counter.notified);
match &counter.poll_state {
Some(poll_state) => {
s.field("packet", poll_state.packet.event());
s.field("inserted", &poll_state.inserted);
}
None => {
s.field("packet", &format_args!("<not registered>"));
}
}
s.finish()
}