calloop/sources/transient.rs
1//! Wrapper for a transient Calloop event source.
2//!
3//! If you have high level event source that you expect to remain in the event
4//! loop indefinitely, and another event source nested inside that one that you
5//! expect to require removal or disabling from time to time, this module can
6//! handle it for you.
7
8/// A [`TransientSource`] wraps a Calloop event source and manages its
9/// registration. A user of this type only needs to perform the usual Calloop
10/// calls (`process_events()` and `*register()`) and the return value of
11/// [`process_events()`](crate::EventSource::process_events).
12///
13/// Rather than needing to check for the full set of
14/// [`PostAction`](crate::PostAction) values returned from `process_events()`,
15/// you can just check for `Continue` or `Reregister` and pass that back out
16/// through your own `process_events()` implementation. In your registration
17/// functions, you then only need to call the same function on this type ie.
18/// `register()` inside `register()` etc.
19///
20/// For example, say you have a source that contains a channel along with some
21/// other logic. If the channel's sending end has been dropped, it needs to be
22/// removed from the loop. So to manage this, you use this in your struct:
23///
24/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
25/// struct CompositeSource {
26/// // Event source for channel.
27/// mpsc_receiver: TransientSource<calloop::channel::Channel<T>>,
28///
29/// // Any other fields go here...
30/// }
31/// ```
32///
33/// To create the transient source, you can simply use the `Into`
34/// implementation:
35///
36/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
37/// let (sender, source) = channel();
38/// let mpsc_receiver: TransientSource<Channel> = source.into();
39/// ```
40///
41/// (If you want to start off with an empty `TransientSource`, you can just use
42/// `Default::default()` instead.)
43///
44/// `TransientSource` implements [`EventSource`](crate::EventSource) and passes
45/// through `process_events()` calls, so in the parent's `process_events()`
46/// implementation you can just do this:
47///
48/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
49/// fn process_events<F>(
50/// &mut self,
51/// readiness: calloop::Readiness,
52/// token: calloop::Token,
53/// callback: F,
54/// ) -> Result<calloop::PostAction, Self::Error>
55/// where
56/// F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
57/// {
58/// let channel_return = self.mpsc_receiver.process_events(readiness, token, callback)?;
59///
60/// // Perform other logic here...
61///
62/// Ok(channel_return)
63/// }
64/// ```
65///
66/// Note that:
67///
68/// - You can call `process_events()` on the `TransientSource<Channel>` even
69/// if the channel has been unregistered and dropped. All that will happen
70/// is that you won't get any events from it.
71///
72/// - The [`PostAction`](crate::PostAction) returned from `process_events()`
73/// will only ever be `PostAction::Continue` or `PostAction::Reregister`.
74/// You will still need to combine this with the result of any other sources
75/// (transient or not).
76///
77/// Once you return `channel_return` from your `process_events()` method (and
78/// assuming it propagates all the way up to the event loop itself through any
79/// other event sources), the event loop might call `reregister()` on your
80/// source. All your source has to do is:
81///
82/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
83/// fn reregister(
84/// &mut self,
85/// poll: &mut calloop::Poll,
86/// token_factory: &mut calloop::TokenFactory,
87/// ) -> crate::Result<()> {
88/// self.mpsc_receiver.reregister(poll, token_factory)?;
89///
90/// // Other registration actions...
91///
92/// Ok(())
93/// }
94/// ```
95///
96/// The `TransientSource` will take care of updating the registration of the
97/// inner source, even if it actually needs to be unregistered or initially
98/// registered.
99///
100/// ## Replacing or removing `TransientSource`s
101///
102/// Not properly removing or replacing `TransientSource`s can cause spurious
103/// wakeups of the event loop, and in some cases can leak file descriptors or
104/// fail to free entries in Calloop's internal data structures. No unsoundness
105/// or undefined behaviour will result, but leaking file descriptors can result
106/// in errors or panics.
107///
108/// If you want to remove a source before it returns `PostAction::Remove`, use
109/// the [`TransientSource::remove()`] method. If you want to replace a source
110/// with another one, use the [`TransientSource::replace()`] method. Either of
111/// these may be called at any time during processing or from outside the event
112/// loop. Both require either returning `PostAction::Reregister` from the
113/// `process_event()` call that does this, or reregistering the event source
114/// some other way eg. via the top-level loop handle.
115///
116/// If, instead, you directly assign a new source to the variable holding the
117/// `TransientSource`, the inner source will be dropped before it can be
118/// unregistered. For example:
119///
120/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
121/// self.mpsc_receiver = Default::default();
122/// self.mpsc_receiver = new_channel.into();
123/// ```
124#[derive(Debug, Default)]
125pub struct TransientSource<T> {
126 state: TransientSourceState<T>,
127}
128
129/// This is the internal state of the [`TransientSource`], as a separate type so
130/// it's not exposed.
131#[derive(Debug)]
132enum TransientSourceState<T> {
133 /// The source should be kept in the loop.
134 Keep(T),
135 /// The source needs to be registered with the loop.
136 Register(T),
137 /// The source needs to be disabled but kept.
138 Disable(T),
139 /// The source needs to be removed from the loop.
140 Remove(T),
141 /// The source is being replaced by another. For most API purposes (eg.
142 /// `map()`), this will be treated as the `Register` state enclosing the new
143 /// source.
144 Replace {
145 /// The new source, which will be registered and used from now on.
146 new: T,
147 /// The old source, which will be unregistered and dropped.
148 old: T,
149 },
150 /// The source has been removed from the loop and dropped (this might also
151 /// be observed if there is a panic while changing states).
152 None,
153}
154
155#[allow(clippy::derivable_impls)]
156impl<T> Default for TransientSourceState<T> {
157 fn default() -> Self {
158 Self::None
159 }
160}
161
162impl<T> TransientSourceState<T> {
163 /// If a caller needs to flag the contained source for removal or
164 /// registration, we need to replace the enum variant safely. This requires
165 /// having a `None` value in there temporarily while we do the swap.
166 ///
167 /// If the variant is `None` the value will not change and `replacer` will
168 /// not be called. If the variant is `Replace` then `replacer` will be
169 /// called **on the new source**, which may cause the old source to leak
170 /// registration in the event loop if it has not yet been unregistered.
171 ///
172 /// The `replacer` function here is expected to be one of the enum variant
173 /// constructors eg. `replace(TransientSource::Remove)`.
174 fn replace_state<F>(&mut self, replacer: F)
175 where
176 F: FnOnce(T) -> Self,
177 {
178 *self = match std::mem::take(self) {
179 Self::Keep(source)
180 | Self::Register(source)
181 | Self::Remove(source)
182 | Self::Disable(source)
183 | Self::Replace { new: source, .. } => replacer(source),
184 Self::None => return,
185 };
186 }
187}
188
189impl<T> TransientSource<T> {
190 /// Apply a function to the enclosed source, if it exists and is not about
191 /// to be removed.
192 pub fn map<F, U>(&mut self, f: F) -> Option<U>
193 where
194 F: FnOnce(&mut T) -> U,
195 {
196 match &mut self.state {
197 TransientSourceState::Keep(source)
198 | TransientSourceState::Register(source)
199 | TransientSourceState::Disable(source)
200 | TransientSourceState::Replace { new: source, .. } => Some(f(source)),
201 TransientSourceState::Remove(_) | TransientSourceState::None => None,
202 }
203 }
204
205 /// Returns `true` if there is no wrapped event source.
206 pub fn is_none(&self) -> bool {
207 matches!(self.state, TransientSourceState::None)
208 }
209
210 /// Removes the wrapped event source from the event loop and this wrapper.
211 ///
212 /// If this is called from outside of the event loop, you will need to wake
213 /// up the event loop for any changes to take place. If it is called from
214 /// within the event loop, you must return `PostAction::Reregister` from
215 /// your own event source's `process_events()`, and the source will be
216 /// unregistered as needed after it exits.
217 pub fn remove(&mut self) {
218 self.state.replace_state(TransientSourceState::Remove);
219 }
220
221 /// Replace the currently wrapped source with the given one. No more events
222 /// will be generated from the old source after this point. The old source
223 /// will not be dropped immediately, it will be kept so that it can be
224 /// deregistered.
225 ///
226 /// If this is called from outside of the event loop, you will need to wake
227 /// up the event loop for any changes to take place. If it is called from
228 /// within the event loop, you must return `PostAction::Reregister` from
229 /// your own event source's `process_events()`, and the sources will be
230 /// registered and unregistered as needed after it exits.
231 pub fn replace(&mut self, new: T) {
232 self.state
233 .replace_state(|old| TransientSourceState::Replace { new, old });
234 }
235}
236
237impl<T: crate::EventSource> From<T> for TransientSource<T> {
238 fn from(source: T) -> Self {
239 Self {
240 state: TransientSourceState::Register(source),
241 }
242 }
243}
244
245impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
246 type Event = T::Event;
247 type Metadata = T::Metadata;
248 type Ret = T::Ret;
249 type Error = T::Error;
250
251 fn process_events<F>(
252 &mut self,
253 readiness: crate::Readiness,
254 token: crate::Token,
255 callback: F,
256 ) -> Result<crate::PostAction, Self::Error>
257 where
258 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
259 {
260 let reregister = if let TransientSourceState::Keep(source) = &mut self.state {
261 let child_post_action = source.process_events(readiness, token, callback)?;
262
263 match child_post_action {
264 // Nothing needs to change.
265 crate::PostAction::Continue => false,
266
267 // Our child source needs re-registration, therefore this
268 // wrapper needs re-registration.
269 crate::PostAction::Reregister => true,
270
271 // If our nested source needs to be removed or disabled, we need
272 // to swap it out for the "Remove" or "Disable" variant.
273 crate::PostAction::Disable => {
274 self.state.replace_state(TransientSourceState::Disable);
275 true
276 }
277
278 crate::PostAction::Remove => {
279 self.state.replace_state(TransientSourceState::Remove);
280 true
281 }
282 }
283 } else {
284 false
285 };
286
287 let post_action = if reregister {
288 crate::PostAction::Reregister
289 } else {
290 crate::PostAction::Continue
291 };
292
293 Ok(post_action)
294 }
295
296 fn register(
297 &mut self,
298 poll: &mut crate::Poll,
299 token_factory: &mut crate::TokenFactory,
300 ) -> crate::Result<()> {
301 match &mut self.state {
302 TransientSourceState::Keep(source) => {
303 source.register(poll, token_factory)?;
304 }
305 TransientSourceState::Register(source)
306 | TransientSourceState::Disable(source)
307 | TransientSourceState::Replace { new: source, .. } => {
308 source.register(poll, token_factory)?;
309 self.state.replace_state(TransientSourceState::Keep);
310 // Drops the disposed source in the Replace case.
311 }
312 TransientSourceState::Remove(_source) => {
313 self.state.replace_state(|_| TransientSourceState::None);
314 }
315 TransientSourceState::None => (),
316 }
317 Ok(())
318 }
319
320 fn reregister(
321 &mut self,
322 poll: &mut crate::Poll,
323 token_factory: &mut crate::TokenFactory,
324 ) -> crate::Result<()> {
325 match &mut self.state {
326 TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?,
327 TransientSourceState::Register(source) => {
328 source.register(poll, token_factory)?;
329 self.state.replace_state(TransientSourceState::Keep);
330 }
331 TransientSourceState::Disable(source) => {
332 source.unregister(poll)?;
333 }
334 TransientSourceState::Remove(source) => {
335 source.unregister(poll)?;
336 self.state.replace_state(|_| TransientSourceState::None);
337 }
338 TransientSourceState::Replace { new, old } => {
339 old.unregister(poll)?;
340 new.register(poll, token_factory)?;
341 self.state.replace_state(TransientSourceState::Keep);
342 // Drops 'dispose'.
343 }
344 TransientSourceState::None => (),
345 }
346 Ok(())
347 }
348
349 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
350 match &mut self.state {
351 TransientSourceState::Keep(source)
352 | TransientSourceState::Register(source)
353 | TransientSourceState::Disable(source) => source.unregister(poll)?,
354 TransientSourceState::Remove(source) => {
355 source.unregister(poll)?;
356 self.state.replace_state(|_| TransientSourceState::None);
357 }
358 TransientSourceState::Replace { new, old } => {
359 old.unregister(poll)?;
360 new.unregister(poll)?;
361 self.state.replace_state(TransientSourceState::Register);
362 }
363 TransientSourceState::None => (),
364 }
365 Ok(())
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372 use crate::{
373 channel::{channel, Channel, Event},
374 ping::{make_ping, PingSource},
375 Dispatcher, EventSource, PostAction,
376 };
377 use std::{
378 rc::Rc,
379 sync::atomic::{AtomicBool, Ordering},
380 time::Duration,
381 };
382
383 #[test]
384 fn test_transient_drop() {
385 // A test source that sets a flag when it's dropped.
386 struct TestSource<'a> {
387 dropped: &'a AtomicBool,
388 ping: PingSource,
389 }
390
391 impl Drop for TestSource<'_> {
392 fn drop(&mut self) {
393 self.dropped.store(true, Ordering::Relaxed)
394 }
395 }
396
397 impl crate::EventSource for TestSource<'_> {
398 type Event = ();
399 type Metadata = ();
400 type Ret = ();
401 type Error = Box<dyn std::error::Error + Sync + Send>;
402
403 fn process_events<F>(
404 &mut self,
405 readiness: crate::Readiness,
406 token: crate::Token,
407 callback: F,
408 ) -> Result<crate::PostAction, Self::Error>
409 where
410 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
411 {
412 self.ping.process_events(readiness, token, callback)?;
413 Ok(PostAction::Remove)
414 }
415
416 fn register(
417 &mut self,
418 poll: &mut crate::Poll,
419 token_factory: &mut crate::TokenFactory,
420 ) -> crate::Result<()> {
421 self.ping.register(poll, token_factory)
422 }
423
424 fn reregister(
425 &mut self,
426 poll: &mut crate::Poll,
427 token_factory: &mut crate::TokenFactory,
428 ) -> crate::Result<()> {
429 self.ping.reregister(poll, token_factory)
430 }
431
432 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
433 self.ping.unregister(poll)
434 }
435 }
436
437 // Test that the inner source is actually dropped when it asks to be
438 // removed from the loop, while the TransientSource remains. We use two
439 // flags for this:
440 // - fired: should be set only when the inner event source has an event
441 // - dropped: set by the drop handler for the inner source (it's an
442 // AtomicBool becaues it requires a longer lifetime than the fired
443 // flag)
444 let mut fired = false;
445 let dropped = false.into();
446
447 // The inner source that should be dropped after the first loop run.
448 let (pinger, ping) = make_ping().unwrap();
449 let inner = TestSource {
450 dropped: &dropped,
451 ping,
452 };
453
454 // The TransientSource wrapper.
455 let outer: TransientSource<_> = inner.into();
456
457 let mut event_loop = crate::EventLoop::try_new().unwrap();
458 let handle = event_loop.handle();
459
460 let _token = handle
461 .insert_source(outer, |_, _, fired| {
462 *fired = true;
463 })
464 .unwrap();
465
466 // First loop run: the ping generates an event for the inner source.
467 pinger.ping();
468
469 event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
470
471 assert!(fired);
472 assert!(dropped.load(Ordering::Relaxed));
473
474 // Second loop run: the ping does nothing because the receiver has been
475 // dropped.
476 fired = false;
477
478 pinger.ping();
479
480 event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
481 assert!(!fired);
482 }
483
484 #[test]
485 fn test_transient_passthrough() {
486 // Test that event processing works when a source is nested inside a
487 // TransientSource. In particular, we want to ensure that the final
488 // event is received even if it corresponds to that same event source
489 // returning `PostAction::Remove`.
490 let (sender, receiver) = channel();
491 let outer: TransientSource<_> = receiver.into();
492
493 let mut event_loop = crate::EventLoop::try_new().unwrap();
494 let handle = event_loop.handle();
495
496 // Our callback puts the receied events in here for us to check later.
497 let mut msg_queue = vec![];
498
499 let _token = handle
500 .insert_source(outer, |msg, _, queue: &mut Vec<_>| {
501 queue.push(msg);
502 })
503 .unwrap();
504
505 // Send some data and drop the sender. We specifically want to test that
506 // we get the "closed" message.
507 sender.send(0u32).unwrap();
508 sender.send(1u32).unwrap();
509 sender.send(2u32).unwrap();
510 sender.send(3u32).unwrap();
511 drop(sender);
512
513 // Run loop once to process events.
514 event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();
515
516 assert!(matches!(
517 msg_queue.as_slice(),
518 &[
519 Event::Msg(0u32),
520 Event::Msg(1u32),
521 Event::Msg(2u32),
522 Event::Msg(3u32),
523 Event::Closed
524 ]
525 ));
526 }
527
528 #[test]
529 fn test_transient_map() {
530 struct IdSource {
531 id: u32,
532 ping: PingSource,
533 }
534
535 impl EventSource for IdSource {
536 type Event = u32;
537 type Metadata = ();
538 type Ret = ();
539 type Error = Box<dyn std::error::Error + Sync + Send>;
540
541 fn process_events<F>(
542 &mut self,
543 readiness: crate::Readiness,
544 token: crate::Token,
545 mut callback: F,
546 ) -> Result<PostAction, Self::Error>
547 where
548 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
549 {
550 let id = self.id;
551 self.ping
552 .process_events(readiness, token, |_, md| callback(id, md))?;
553
554 let action = if self.id > 2 {
555 PostAction::Remove
556 } else {
557 PostAction::Continue
558 };
559
560 Ok(action)
561 }
562
563 fn register(
564 &mut self,
565 poll: &mut crate::Poll,
566 token_factory: &mut crate::TokenFactory,
567 ) -> crate::Result<()> {
568 self.ping.register(poll, token_factory)
569 }
570
571 fn reregister(
572 &mut self,
573 poll: &mut crate::Poll,
574 token_factory: &mut crate::TokenFactory,
575 ) -> crate::Result<()> {
576 self.ping.reregister(poll, token_factory)
577 }
578
579 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
580 self.ping.unregister(poll)
581 }
582 }
583
584 struct WrapperSource(TransientSource<IdSource>);
585
586 impl EventSource for WrapperSource {
587 type Event = <IdSource as EventSource>::Event;
588 type Metadata = <IdSource as EventSource>::Metadata;
589 type Ret = <IdSource as EventSource>::Ret;
590 type Error = <IdSource as EventSource>::Error;
591
592 fn process_events<F>(
593 &mut self,
594 readiness: crate::Readiness,
595 token: crate::Token,
596 callback: F,
597 ) -> Result<PostAction, Self::Error>
598 where
599 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
600 {
601 let action = self.0.process_events(readiness, token, callback);
602 self.0.map(|inner| inner.id += 1);
603 action
604 }
605
606 fn register(
607 &mut self,
608 poll: &mut crate::Poll,
609 token_factory: &mut crate::TokenFactory,
610 ) -> crate::Result<()> {
611 self.0.map(|inner| inner.id += 1);
612 self.0.register(poll, token_factory)
613 }
614
615 fn reregister(
616 &mut self,
617 poll: &mut crate::Poll,
618 token_factory: &mut crate::TokenFactory,
619 ) -> crate::Result<()> {
620 self.0.map(|inner| inner.id += 1);
621 self.0.reregister(poll, token_factory)
622 }
623
624 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
625 self.0.map(|inner| inner.id += 1);
626 self.0.unregister(poll)
627 }
628 }
629
630 // To test the id later.
631 let mut id = 0;
632
633 // Create our source.
634 let (pinger, ping) = make_ping().unwrap();
635 let inner = IdSource { id, ping };
636
637 // The TransientSource wrapper.
638 let outer: TransientSource<_> = inner.into();
639
640 // The top level source.
641 let top = WrapperSource(outer);
642
643 // Create a dispatcher so we can check the source afterwards.
644 let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
645 *test_id = got_id;
646 });
647
648 let mut event_loop = crate::EventLoop::try_new().unwrap();
649 let handle = event_loop.handle();
650
651 let token = handle.register_dispatcher(dispatcher.clone()).unwrap();
652
653 // First loop run: the ping generates an event for the inner source.
654 // The ID should be 1 after the increment in register().
655 pinger.ping();
656 event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
657 assert_eq!(id, 1);
658
659 // Second loop run: the ID should be 2 after the previous
660 // process_events().
661 pinger.ping();
662 event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
663 assert_eq!(id, 2);
664
665 // Third loop run: the ID should be 3 after another process_events().
666 pinger.ping();
667 event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
668 assert_eq!(id, 3);
669
670 // Fourth loop run: the callback is no longer called by the inner
671 // source, so our local ID is not incremented.
672 pinger.ping();
673 event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
674 assert_eq!(id, 3);
675
676 // Remove the dispatcher so we can inspect the sources.
677 handle.remove(token);
678
679 let mut top_after = dispatcher.into_source_inner();
680
681 // I expect the inner source to be dropped, so the TransientSource
682 // variant is None (its version of None, not Option::None), so its map()
683 // won't call the passed-in function (hence the unreachable!()) and its
684 // return value should be Option::None.
685 assert!(top_after.0.map(|_| unreachable!()).is_none());
686 }
687
688 #[test]
689 fn test_transient_disable() {
690 // Test that disabling and enabling is handled properly.
691 struct DisablingSource(PingSource);
692
693 impl EventSource for DisablingSource {
694 type Event = ();
695 type Metadata = ();
696 type Ret = ();
697 type Error = Box<dyn std::error::Error + Sync + Send>;
698
699 fn process_events<F>(
700 &mut self,
701 readiness: crate::Readiness,
702 token: crate::Token,
703 callback: F,
704 ) -> Result<PostAction, Self::Error>
705 where
706 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
707 {
708 self.0.process_events(readiness, token, callback)?;
709 Ok(PostAction::Disable)
710 }
711
712 fn register(
713 &mut self,
714 poll: &mut crate::Poll,
715 token_factory: &mut crate::TokenFactory,
716 ) -> crate::Result<()> {
717 self.0.register(poll, token_factory)
718 }
719
720 fn reregister(
721 &mut self,
722 poll: &mut crate::Poll,
723 token_factory: &mut crate::TokenFactory,
724 ) -> crate::Result<()> {
725 self.0.reregister(poll, token_factory)
726 }
727
728 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
729 self.0.unregister(poll)
730 }
731 }
732
733 // Flag for checking when the source fires.
734 let mut fired = false;
735
736 // Create our source.
737 let (pinger, ping) = make_ping().unwrap();
738
739 let inner = DisablingSource(ping);
740
741 // The TransientSource wrapper.
742 let outer: TransientSource<_> = inner.into();
743
744 let mut event_loop = crate::EventLoop::try_new().unwrap();
745 let handle = event_loop.handle();
746 let token = handle
747 .insert_source(outer, |_, _, fired| {
748 *fired = true;
749 })
750 .unwrap();
751
752 // Ping here and not later, to check that disabling after an event is
753 // triggered but not processed does not discard the event.
754 pinger.ping();
755 event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
756 assert!(fired);
757
758 // Source should now be disabled.
759 pinger.ping();
760 fired = false;
761 event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
762 assert!(!fired);
763
764 // Re-enable the source.
765 handle.enable(&token).unwrap();
766
767 // Trigger another event.
768 pinger.ping();
769 fired = false;
770 event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
771 assert!(fired);
772 }
773
774 #[test]
775 fn test_transient_replace_unregister() {
776 // This is a bit of a complex test, but it essentially boils down to:
777 // how can a "parent" event source containing a TransientSource replace
778 // the "child" source without leaking the source's registration?
779
780 // First, a source that finishes immediately. This is so we cover the
781 // edge case of replacing a source as soon as it wants to be removed.
782 struct FinishImmediatelySource {
783 source: PingSource,
784 data: Option<i32>,
785 registered: bool,
786 dropped: Rc<AtomicBool>,
787 }
788
789 impl FinishImmediatelySource {
790 // The constructor passes out the drop flag so we can check that
791 // this source was or wasn't dropped.
792 fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) {
793 let dropped = Rc::new(false.into());
794
795 (
796 Self {
797 source,
798 data: Some(data),
799 registered: false,
800 dropped: Rc::clone(&dropped),
801 },
802 dropped,
803 )
804 }
805 }
806
807 impl EventSource for FinishImmediatelySource {
808 type Event = i32;
809 type Metadata = ();
810 type Ret = ();
811 type Error = Box<dyn std::error::Error + Sync + Send>;
812
813 fn process_events<F>(
814 &mut self,
815 readiness: crate::Readiness,
816 token: crate::Token,
817 mut callback: F,
818 ) -> Result<PostAction, Self::Error>
819 where
820 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
821 {
822 let mut data = self.data.take();
823
824 self.source.process_events(readiness, token, |_, _| {
825 if let Some(data) = data.take() {
826 callback(data, &mut ())
827 }
828 })?;
829
830 self.data = data;
831
832 Ok(if self.data.is_none() {
833 PostAction::Remove
834 } else {
835 PostAction::Continue
836 })
837 }
838
839 fn register(
840 &mut self,
841 poll: &mut crate::Poll,
842 token_factory: &mut crate::TokenFactory,
843 ) -> crate::Result<()> {
844 self.registered = true;
845 self.source.register(poll, token_factory)
846 }
847
848 fn reregister(
849 &mut self,
850 poll: &mut crate::Poll,
851 token_factory: &mut crate::TokenFactory,
852 ) -> crate::Result<()> {
853 self.source.reregister(poll, token_factory)
854 }
855
856 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
857 self.registered = false;
858 self.source.unregister(poll)
859 }
860 }
861
862 // The drop handler sets a flag we can check for debugging (we want to
863 // know that the source itself was dropped), and also checks that the
864 // source was unregistered. Ultimately neither the source nor its
865 // registration should be leaked.
866
867 impl Drop for FinishImmediatelySource {
868 fn drop(&mut self) {
869 assert!(!self.registered, "source dropped while still registered");
870 self.dropped.store(true, Ordering::Relaxed);
871 }
872 }
873
874 // Our wrapper source handles detecting when the child source finishes,
875 // and replacing that child source with another one that will generate
876 // more events. This is one intended use case of the TransientSource.
877
878 struct WrapperSource {
879 current: TransientSource<FinishImmediatelySource>,
880 replacement: Option<FinishImmediatelySource>,
881 dropped: Rc<AtomicBool>,
882 }
883
884 impl WrapperSource {
885 // The constructor passes out the drop flag so we can check that
886 // this source was or wasn't dropped.
887 fn new(
888 first: FinishImmediatelySource,
889 second: FinishImmediatelySource,
890 ) -> (Self, Rc<AtomicBool>) {
891 let dropped = Rc::new(false.into());
892
893 (
894 Self {
895 current: first.into(),
896 replacement: second.into(),
897 dropped: Rc::clone(&dropped),
898 },
899 dropped,
900 )
901 }
902 }
903
904 impl EventSource for WrapperSource {
905 type Event = i32;
906 type Metadata = ();
907 type Ret = ();
908 type Error = Box<dyn std::error::Error + Sync + Send>;
909
910 fn process_events<F>(
911 &mut self,
912 readiness: crate::Readiness,
913 token: crate::Token,
914 mut callback: F,
915 ) -> Result<PostAction, Self::Error>
916 where
917 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
918 {
919 // Did our event source generate an event?
920 let mut fired = false;
921
922 let post_action = self.current.process_events(readiness, token, |data, _| {
923 callback(data, &mut ());
924 fired = true;
925 })?;
926
927 if fired {
928 // The event source will be unregistered after the current
929 // process_events() iteration is finished. The replace()
930 // method will handle doing that even while we've added a
931 // new source.
932 if let Some(replacement) = self.replacement.take() {
933 self.current.replace(replacement);
934 }
935
936 // Parent source is responsible for flagging this, but it's
937 // already set.
938 assert_eq!(post_action, PostAction::Reregister);
939 }
940
941 Ok(post_action)
942 }
943
944 fn register(
945 &mut self,
946 poll: &mut crate::Poll,
947 token_factory: &mut crate::TokenFactory,
948 ) -> crate::Result<()> {
949 self.current.register(poll, token_factory)
950 }
951
952 fn reregister(
953 &mut self,
954 poll: &mut crate::Poll,
955 token_factory: &mut crate::TokenFactory,
956 ) -> crate::Result<()> {
957 self.current.reregister(poll, token_factory)
958 }
959
960 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
961 self.current.unregister(poll)
962 }
963 }
964
965 impl Drop for WrapperSource {
966 fn drop(&mut self) {
967 self.dropped.store(true, Ordering::Relaxed);
968 }
969 }
970
971 // Construct the various nested sources - FinishImmediatelySource inside
972 // TransientSource inside WrapperSource. The numbers let us verify which
973 // event source fires first.
974 let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap();
975 let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap();
976 let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0);
977 let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1);
978 let (outer, outer_dropped) = WrapperSource::new(inner0, inner1);
979
980 // Now the actual test starts.
981
982 let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> =
983 crate::EventLoop::try_new().unwrap();
984 let handle = event_loop.handle();
985 let signal = event_loop.get_signal();
986
987 // This is how we communicate with the event sources.
988 let mut context = (None, signal);
989
990 let _token = handle
991 .insert_source(outer, |data, _, (evt, sig)| {
992 *evt = Some(data);
993 sig.stop();
994 })
995 .unwrap();
996
997 // Ensure our sources fire.
998 ping0_tx.ping();
999 ping1_tx.ping();
1000
1001 // Use run() rather than dispatch() because it's not strictly part of
1002 // any API contract as to how many runs of the event loop it takes to
1003 // replace the nested source.
1004 event_loop.run(None, &mut context, |_| {}).unwrap();
1005
1006 // First, make sure the inner source actually did fire.
1007 assert_eq!(context.0.take(), Some(0), "first inner source did not fire");
1008
1009 // Make sure that the outer source is still alive.
1010 assert!(
1011 !outer_dropped.load(Ordering::Relaxed),
1012 "outer source already dropped"
1013 );
1014
1015 // Make sure that the inner child source IS dropped now.
1016 assert!(
1017 inner0_dropped.load(Ordering::Relaxed),
1018 "first inner source not dropped"
1019 );
1020
1021 // Make sure that, in between the first event and second event, the
1022 // replacement child source still exists.
1023 assert!(
1024 !inner1_dropped.load(Ordering::Relaxed),
1025 "replacement inner source dropped"
1026 );
1027
1028 // Run the event loop until we get a second event.
1029 event_loop.run(None, &mut context, |_| {}).unwrap();
1030
1031 // Ensure the replacement source fired (which checks that it was
1032 // registered and is being processed by the TransientSource).
1033 assert_eq!(context.0.take(), Some(1), "replacement source did not fire");
1034 }
1035
1036 #[test]
1037 fn test_transient_remove() {
1038 // This tests that calling remove(), even before an event source has
1039 // requested its own removal, results in the event source being removed.
1040
1041 const STOP_AT: i32 = 2;
1042
1043 // A wrapper source to automate the removal of the inner source.
1044 struct WrapperSource {
1045 inner: TransientSource<Channel<i32>>,
1046 }
1047
1048 impl EventSource for WrapperSource {
1049 type Event = i32;
1050 type Metadata = ();
1051 type Ret = ();
1052 type Error = Box<dyn std::error::Error + Sync + Send>;
1053
1054 fn process_events<F>(
1055 &mut self,
1056 readiness: crate::Readiness,
1057 token: crate::Token,
1058 mut callback: F,
1059 ) -> Result<PostAction, Self::Error>
1060 where
1061 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1062 {
1063 let mut remove = false;
1064
1065 let mut post_action = self.inner.process_events(readiness, token, |evt, _| {
1066 if let Event::Msg(num) = evt {
1067 callback(num, &mut ());
1068 remove = num >= STOP_AT;
1069 }
1070 })?;
1071
1072 if remove {
1073 self.inner.remove();
1074 post_action |= PostAction::Reregister;
1075 }
1076
1077 Ok(post_action)
1078 }
1079
1080 fn register(
1081 &mut self,
1082 poll: &mut crate::Poll,
1083 token_factory: &mut crate::TokenFactory,
1084 ) -> crate::Result<()> {
1085 self.inner.register(poll, token_factory)
1086 }
1087
1088 fn reregister(
1089 &mut self,
1090 poll: &mut crate::Poll,
1091 token_factory: &mut crate::TokenFactory,
1092 ) -> crate::Result<()> {
1093 self.inner.reregister(poll, token_factory)
1094 }
1095
1096 fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
1097 self.inner.unregister(poll)
1098 }
1099 }
1100
1101 // Create our sources and loop.
1102
1103 let (sender, receiver) = channel();
1104 let wrapper = WrapperSource {
1105 inner: receiver.into(),
1106 };
1107
1108 let mut event_loop = crate::EventLoop::try_new().unwrap();
1109 let handle = event_loop.handle();
1110
1111 handle
1112 .insert_source(wrapper, |num, _, out: &mut Option<_>| {
1113 *out = Some(num);
1114 })
1115 .unwrap();
1116
1117 // Storage for callback data.
1118 let mut out = None;
1119
1120 // Send some data we expect to get callbacks for.
1121 for num in 0..=STOP_AT {
1122 sender.send(num).unwrap();
1123 event_loop.dispatch(Duration::ZERO, &mut out).unwrap();
1124 assert_eq!(out.take(), Some(num));
1125 }
1126
1127 // Now we expect the receiver to be gone.
1128 assert!(matches!(
1129 sender.send(STOP_AT + 1),
1130 Err(std::sync::mpsc::SendError { .. })
1131 ));
1132 }
1133}