#[derive(Debug)]
pub enum TransientSource<T> {
Keep(T),
Register(T),
Disable(T),
Remove(T),
None,
}
impl<T> TransientSource<T> {
pub fn map<F, U>(&mut self, f: F) -> Option<U>
where
F: FnOnce(&mut T) -> U,
{
match self {
TransientSource::Keep(source)
| TransientSource::Register(source)
| TransientSource::Disable(source)
| TransientSource::Remove(source) => Some(f(source)),
TransientSource::None => None,
}
}
fn replace<F>(&mut self, replacer: F)
where
F: FnOnce(T) -> Self,
{
*self = match std::mem::replace(self, TransientSource::None) {
TransientSource::Keep(source)
| TransientSource::Register(source)
| TransientSource::Remove(source)
| TransientSource::Disable(source) => replacer(source),
TransientSource::None => return,
};
}
}
impl<T: crate::EventSource> From<T> for TransientSource<T> {
fn from(source: T) -> Self {
Self::Register(source)
}
}
impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
type Event = T::Event;
type Metadata = T::Metadata;
type Ret = T::Ret;
type Error = T::Error;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<crate::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let reregister = if let TransientSource::Keep(ref mut source) = self {
let child_post_action = source.process_events(readiness, token, callback)?;
match child_post_action {
crate::PostAction::Continue => false,
crate::PostAction::Reregister => true,
crate::PostAction::Disable => {
self.replace(TransientSource::Disable);
true
}
crate::PostAction::Remove => {
self.replace(TransientSource::Remove);
true
}
}
} else {
false
};
let post_action = if reregister {
crate::PostAction::Reregister
} else {
crate::PostAction::Continue
};
Ok(post_action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
match self {
TransientSource::Keep(source) => {
source.register(poll, token_factory)?;
}
TransientSource::Register(source) | TransientSource::Disable(source) => {
source.register(poll, token_factory)?;
self.replace(TransientSource::Keep);
}
TransientSource::Remove(_source) => {
*self = TransientSource::None;
}
TransientSource::None => (),
}
Ok(())
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
match self {
TransientSource::Keep(source) => source.reregister(poll, token_factory)?,
TransientSource::Register(source) => {
source.register(poll, token_factory)?;
self.replace(TransientSource::Keep);
}
TransientSource::Disable(source) => {
source.unregister(poll)?;
}
TransientSource::Remove(source) => {
source.unregister(poll)?;
*self = TransientSource::None;
}
TransientSource::None => (),
}
Ok(())
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
match self {
TransientSource::Keep(source)
| TransientSource::Register(source)
| TransientSource::Disable(source) => source.unregister(poll)?,
TransientSource::Remove(source) => {
source.unregister(poll)?;
*self = TransientSource::None;
}
TransientSource::None => (),
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
channel::{channel, Event},
ping::{make_ping, PingSource},
Dispatcher, EventSource, PostAction,
};
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
#[test]
fn test_transient_drop() {
struct TestSource<'a> {
dropped: &'a AtomicBool,
ping: PingSource,
}
impl<'a> Drop for TestSource<'a> {
fn drop(&mut self) {
self.dropped.store(true, Ordering::Relaxed)
}
}
impl<'a> crate::EventSource for TestSource<'a> {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<crate::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.ping.process_events(readiness, token, callback)?;
Ok(PostAction::Remove)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.ping.unregister(poll)
}
}
let mut fired = false;
let dropped = false.into();
let (pinger, ping) = make_ping().unwrap();
let inner = TestSource {
dropped: &dropped,
ping,
};
let outer: TransientSource<_> = inner.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let _token = handle
.insert_source(outer, |_, _, fired| {
*fired = true;
})
.unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
assert!(dropped.load(Ordering::Relaxed));
fired = false;
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(!fired);
}
#[test]
fn test_transient_passthrough() {
let (sender, receiver) = channel();
let outer: TransientSource<_> = receiver.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let mut msg_queue = vec![];
let _token = handle
.insert_source(outer, |msg, _, queue: &mut Vec<_>| {
queue.push(msg);
})
.unwrap();
sender.send(0u32).unwrap();
sender.send(1u32).unwrap();
sender.send(2u32).unwrap();
sender.send(3u32).unwrap();
drop(sender);
event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();
assert!(matches!(
msg_queue.as_slice(),
&[
Event::Msg(0u32),
Event::Msg(1u32),
Event::Msg(2u32),
Event::Msg(3u32),
Event::Closed
]
));
}
#[test]
fn test_transient_map() {
struct IdSource {
id: u32,
ping: PingSource,
}
impl EventSource for IdSource {
type Event = u32;
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let id = self.id;
self.ping
.process_events(readiness, token, |_, md| callback(id, md))?;
let action = if self.id > 2 {
PostAction::Remove
} else {
PostAction::Continue
};
Ok(action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.ping.unregister(poll)
}
}
struct WrapperSource(TransientSource<IdSource>);
impl EventSource for WrapperSource {
type Event = <IdSource as EventSource>::Event;
type Metadata = <IdSource as EventSource>::Metadata;
type Ret = <IdSource as EventSource>::Ret;
type Error = <IdSource as EventSource>::Error;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let action = self.0.process_events(readiness, token, callback);
self.0.map(|inner| inner.id += 1);
action
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.unregister(poll)
}
}
let mut id = 0;
let (pinger, ping) = make_ping().unwrap();
let inner = IdSource { id, ping };
let outer: TransientSource<_> = inner.into();
let top = WrapperSource(outer);
let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
*test_id = got_id;
});
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let token = handle.register_dispatcher(dispatcher.clone()).unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 1);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 2);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 3);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 3);
handle.remove(token);
let mut top_after = dispatcher.into_source_inner();
assert!(top_after.0.map(|_| unreachable!()).is_none());
}
#[test]
fn test_transient_disable() {
struct DisablingSource(PingSource);
impl EventSource for DisablingSource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.0.process_events(readiness, token, callback)?;
Ok(PostAction::Disable)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.0.unregister(poll)
}
}
let mut fired = false;
let (pinger, ping) = make_ping().unwrap();
let inner = DisablingSource(ping);
let outer: TransientSource<_> = inner.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let token = handle
.insert_source(outer, |_, _, fired| {
*fired = true;
})
.unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
pinger.ping();
fired = false;
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(!fired);
handle.enable(&token).unwrap();
pinger.ping();
fired = false;
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
}
}