#[derive(Debug, Default)]
pub struct TransientSource<T> {
state: TransientSourceState<T>,
}
#[derive(Debug)]
enum TransientSourceState<T> {
Keep(T),
Register(T),
Disable(T),
Remove(T),
Replace {
new: T,
old: T,
},
None,
}
impl<T> Default for TransientSourceState<T> {
fn default() -> Self {
Self::None
}
}
impl<T> TransientSourceState<T> {
fn replace_state<F>(&mut self, replacer: F)
where
F: FnOnce(T) -> Self,
{
*self = match std::mem::take(self) {
Self::Keep(source)
| Self::Register(source)
| Self::Remove(source)
| Self::Disable(source)
| Self::Replace { new: source, .. } => replacer(source),
Self::None => return,
};
}
}
impl<T> TransientSource<T> {
pub fn map<F, U>(&mut self, f: F) -> Option<U>
where
F: FnOnce(&mut T) -> U,
{
match &mut self.state {
TransientSourceState::Keep(source)
| TransientSourceState::Register(source)
| TransientSourceState::Disable(source)
| TransientSourceState::Replace { new: source, .. } => Some(f(source)),
TransientSourceState::Remove(_) | TransientSourceState::None => None,
}
}
pub fn is_none(&self) -> bool {
matches!(self.state, TransientSourceState::None)
}
pub fn remove(&mut self) {
self.state.replace_state(TransientSourceState::Remove);
}
pub fn replace(&mut self, new: T) {
self.state
.replace_state(|old| TransientSourceState::Replace { new, old });
}
}
impl<T: crate::EventSource> From<T> for TransientSource<T> {
fn from(source: T) -> Self {
Self {
state: TransientSourceState::Register(source),
}
}
}
impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
type Event = T::Event;
type Metadata = T::Metadata;
type Ret = T::Ret;
type Error = T::Error;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<crate::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let reregister = if let TransientSourceState::Keep(source) = &mut self.state {
let child_post_action = source.process_events(readiness, token, callback)?;
match child_post_action {
crate::PostAction::Continue => false,
crate::PostAction::Reregister => true,
crate::PostAction::Disable => {
self.state.replace_state(TransientSourceState::Disable);
true
}
crate::PostAction::Remove => {
self.state.replace_state(TransientSourceState::Remove);
true
}
}
} else {
false
};
let post_action = if reregister {
crate::PostAction::Reregister
} else {
crate::PostAction::Continue
};
Ok(post_action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
match &mut self.state {
TransientSourceState::Keep(source) => {
source.register(poll, token_factory)?;
}
TransientSourceState::Register(source)
| TransientSourceState::Disable(source)
| TransientSourceState::Replace { new: source, .. } => {
source.register(poll, token_factory)?;
self.state.replace_state(TransientSourceState::Keep);
}
TransientSourceState::Remove(_source) => {
self.state.replace_state(|_| TransientSourceState::None);
}
TransientSourceState::None => (),
}
Ok(())
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
match &mut self.state {
TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?,
TransientSourceState::Register(source) => {
source.register(poll, token_factory)?;
self.state.replace_state(TransientSourceState::Keep);
}
TransientSourceState::Disable(source) => {
source.unregister(poll)?;
}
TransientSourceState::Remove(source) => {
source.unregister(poll)?;
self.state.replace_state(|_| TransientSourceState::None);
}
TransientSourceState::Replace { new, old } => {
old.unregister(poll)?;
new.register(poll, token_factory)?;
self.state.replace_state(TransientSourceState::Keep);
}
TransientSourceState::None => (),
}
Ok(())
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
match &mut self.state {
TransientSourceState::Keep(source)
| TransientSourceState::Register(source)
| TransientSourceState::Disable(source) => source.unregister(poll)?,
TransientSourceState::Remove(source) => {
source.unregister(poll)?;
self.state.replace_state(|_| TransientSourceState::None);
}
TransientSourceState::Replace { new, old } => {
old.unregister(poll)?;
new.unregister(poll)?;
self.state.replace_state(TransientSourceState::Register);
}
TransientSourceState::None => (),
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
channel::{channel, Channel, Event},
ping::{make_ping, PingSource},
Dispatcher, EventSource, PostAction,
};
use std::{
rc::Rc,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
#[test]
fn test_transient_drop() {
struct TestSource<'a> {
dropped: &'a AtomicBool,
ping: PingSource,
}
impl<'a> Drop for TestSource<'a> {
fn drop(&mut self) {
self.dropped.store(true, Ordering::Relaxed)
}
}
impl<'a> crate::EventSource for TestSource<'a> {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<crate::PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.ping.process_events(readiness, token, callback)?;
Ok(PostAction::Remove)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.ping.unregister(poll)
}
}
let mut fired = false;
let dropped = false.into();
let (pinger, ping) = make_ping().unwrap();
let inner = TestSource {
dropped: &dropped,
ping,
};
let outer: TransientSource<_> = inner.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let _token = handle
.insert_source(outer, |_, _, fired| {
*fired = true;
})
.unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
assert!(dropped.load(Ordering::Relaxed));
fired = false;
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(!fired);
}
#[test]
fn test_transient_passthrough() {
let (sender, receiver) = channel();
let outer: TransientSource<_> = receiver.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let mut msg_queue = vec![];
let _token = handle
.insert_source(outer, |msg, _, queue: &mut Vec<_>| {
queue.push(msg);
})
.unwrap();
sender.send(0u32).unwrap();
sender.send(1u32).unwrap();
sender.send(2u32).unwrap();
sender.send(3u32).unwrap();
drop(sender);
event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();
assert!(matches!(
msg_queue.as_slice(),
&[
Event::Msg(0u32),
Event::Msg(1u32),
Event::Msg(2u32),
Event::Msg(3u32),
Event::Closed
]
));
}
#[test]
fn test_transient_map() {
struct IdSource {
id: u32,
ping: PingSource,
}
impl EventSource for IdSource {
type Event = u32;
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let id = self.id;
self.ping
.process_events(readiness, token, |_, md| callback(id, md))?;
let action = if self.id > 2 {
PostAction::Remove
} else {
PostAction::Continue
};
Ok(action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.ping.unregister(poll)
}
}
struct WrapperSource(TransientSource<IdSource>);
impl EventSource for WrapperSource {
type Event = <IdSource as EventSource>::Event;
type Metadata = <IdSource as EventSource>::Metadata;
type Ret = <IdSource as EventSource>::Ret;
type Error = <IdSource as EventSource>::Error;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let action = self.0.process_events(readiness, token, callback);
self.0.map(|inner| inner.id += 1);
action
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.0.map(|inner| inner.id += 1);
self.0.unregister(poll)
}
}
let mut id = 0;
let (pinger, ping) = make_ping().unwrap();
let inner = IdSource { id, ping };
let outer: TransientSource<_> = inner.into();
let top = WrapperSource(outer);
let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
*test_id = got_id;
});
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let token = handle.register_dispatcher(dispatcher.clone()).unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 1);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 2);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 3);
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
assert_eq!(id, 3);
handle.remove(token);
let mut top_after = dispatcher.into_source_inner();
assert!(top_after.0.map(|_| unreachable!()).is_none());
}
#[test]
fn test_transient_disable() {
struct DisablingSource(PingSource);
impl EventSource for DisablingSource {
type Event = ();
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
self.0.process_events(readiness, token, callback)?;
Ok(PostAction::Disable)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.0.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.0.unregister(poll)
}
}
let mut fired = false;
let (pinger, ping) = make_ping().unwrap();
let inner = DisablingSource(ping);
let outer: TransientSource<_> = inner.into();
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let token = handle
.insert_source(outer, |_, _, fired| {
*fired = true;
})
.unwrap();
pinger.ping();
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
pinger.ping();
fired = false;
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(!fired);
handle.enable(&token).unwrap();
pinger.ping();
fired = false;
event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
assert!(fired);
}
#[test]
fn test_transient_replace_unregister() {
struct FinishImmediatelySource {
source: PingSource,
data: Option<i32>,
registered: bool,
dropped: Rc<AtomicBool>,
}
impl FinishImmediatelySource {
fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) {
let dropped = Rc::new(false.into());
(
Self {
source,
data: Some(data),
registered: false,
dropped: Rc::clone(&dropped),
},
dropped,
)
}
}
impl EventSource for FinishImmediatelySource {
type Event = i32;
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let mut data = self.data.take();
self.source.process_events(readiness, token, |_, _| {
if let Some(data) = data.take() {
callback(data, &mut ())
}
})?;
self.data = data;
Ok(if self.data.is_none() {
PostAction::Remove
} else {
PostAction::Continue
})
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.registered = true;
self.source.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.source.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.registered = false;
self.source.unregister(poll)
}
}
impl Drop for FinishImmediatelySource {
fn drop(&mut self) {
assert!(!self.registered, "source dropped while still registered");
self.dropped.store(true, Ordering::Relaxed);
}
}
struct WrapperSource {
current: TransientSource<FinishImmediatelySource>,
replacement: Option<FinishImmediatelySource>,
dropped: Rc<AtomicBool>,
}
impl WrapperSource {
fn new(
first: FinishImmediatelySource,
second: FinishImmediatelySource,
) -> (Self, Rc<AtomicBool>) {
let dropped = Rc::new(false.into());
(
Self {
current: first.into(),
replacement: second.into(),
dropped: Rc::clone(&dropped),
},
dropped,
)
}
}
impl EventSource for WrapperSource {
type Event = i32;
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let mut fired = false;
let post_action = self.current.process_events(readiness, token, |data, _| {
callback(data, &mut ());
fired = true;
})?;
if fired {
if let Some(replacement) = self.replacement.take() {
self.current.replace(replacement);
}
assert_eq!(post_action, PostAction::Reregister);
}
Ok(post_action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.current.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.current.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.current.unregister(poll)
}
}
impl Drop for WrapperSource {
fn drop(&mut self) {
self.dropped.store(true, Ordering::Relaxed);
}
}
let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap();
let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap();
let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0);
let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1);
let (outer, outer_dropped) = WrapperSource::new(inner0, inner1);
let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> =
crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let signal = event_loop.get_signal();
let mut context = (None, signal);
let _token = handle
.insert_source(outer, |data, _, (evt, sig)| {
*evt = Some(data);
sig.stop();
})
.unwrap();
ping0_tx.ping();
ping1_tx.ping();
event_loop.run(None, &mut context, |_| {}).unwrap();
assert_eq!(context.0.take(), Some(0), "first inner source did not fire");
assert!(
!outer_dropped.load(Ordering::Relaxed),
"outer source already dropped"
);
assert!(
inner0_dropped.load(Ordering::Relaxed),
"first inner source not dropped"
);
assert!(
!inner1_dropped.load(Ordering::Relaxed),
"replacement inner source dropped"
);
event_loop.run(None, &mut context, |_| {}).unwrap();
assert_eq!(context.0.take(), Some(1), "replacement source did not fire");
}
#[test]
fn test_transient_remove() {
const STOP_AT: i32 = 2;
struct WrapperSource {
inner: TransientSource<Channel<i32>>,
}
impl EventSource for WrapperSource {
type Event = i32;
type Metadata = ();
type Ret = ();
type Error = Box<dyn std::error::Error + Sync + Send>;
fn process_events<F>(
&mut self,
readiness: crate::Readiness,
token: crate::Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let mut remove = false;
let mut post_action = self.inner.process_events(readiness, token, |evt, _| {
if let Event::Msg(num) = evt {
callback(num, &mut ());
remove = num >= STOP_AT;
}
})?;
if remove {
self.inner.remove();
post_action |= PostAction::Reregister;
}
Ok(post_action)
}
fn register(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.inner.register(poll, token_factory)
}
fn reregister(
&mut self,
poll: &mut crate::Poll,
token_factory: &mut crate::TokenFactory,
) -> crate::Result<()> {
self.inner.reregister(poll, token_factory)
}
fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
self.inner.unregister(poll)
}
}
let (sender, receiver) = channel();
let wrapper = WrapperSource {
inner: receiver.into(),
};
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
handle
.insert_source(wrapper, |num, _, out: &mut Option<_>| {
*out = Some(num);
})
.unwrap();
let mut out = None;
for num in 0..=STOP_AT {
sender.send(num).unwrap();
event_loop.dispatch(Duration::ZERO, &mut out).unwrap();
assert_eq!(out.take(), Some(num));
}
assert!(matches!(
sender.send(STOP_AT + 1),
Err(std::sync::mpsc::SendError { .. })
));
}
}