Skip to main content

calloop/sources/
transient.rs

1//! Wrapper for a transient Calloop event source.
2//!
3//! If you have high level event source that you expect to remain in the event
4//! loop indefinitely, and another event source nested inside that one that you
5//! expect to require removal or disabling from time to time, this module can
6//! handle it for you.
7
8/// A [`TransientSource`] wraps a Calloop event source and manages its
9/// registration. A user of this type only needs to perform the usual Calloop
10/// calls (`process_events()` and `*register()`) and the return value of
11/// [`process_events()`](crate::EventSource::process_events).
12///
13/// Rather than needing to check for the full set of
14/// [`PostAction`](crate::PostAction) values returned from `process_events()`,
15/// you can just check for `Continue` or `Reregister` and pass that back out
16/// through your own `process_events()` implementation. In your registration
17/// functions, you then only need to call the same function on this type ie.
18/// `register()` inside `register()` etc.
19///
20/// For example, say you have a source that contains a channel along with some
21/// other logic. If the channel's sending end has been dropped, it needs to be
22/// removed from the loop. So to manage this, you use this in your struct:
23///
24/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
25/// struct CompositeSource {
26///    // Event source for channel.
27///    mpsc_receiver: TransientSource<calloop::channel::Channel<T>>,
28///
29///    // Any other fields go here...
30/// }
31/// ```
32///
33/// To create the transient source, you can simply use the `Into`
34/// implementation:
35///
36/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
37/// let (sender, source) = channel();
38/// let mpsc_receiver: TransientSource<Channel> = source.into();
39/// ```
40///
41/// (If you want to start off with an empty `TransientSource`, you can just use
42/// `Default::default()` instead.)
43///
44/// `TransientSource` implements [`EventSource`](crate::EventSource) and passes
45/// through `process_events()` calls, so in the parent's `process_events()`
46/// implementation you can just do this:
47///
48/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
49/// fn process_events<F>(
50///     &mut self,
51///     readiness: calloop::Readiness,
52///     token: calloop::Token,
53///     callback: F,
54/// ) -> Result<calloop::PostAction, Self::Error>
55/// where
56///     F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
57/// {
58///     let channel_return = self.mpsc_receiver.process_events(readiness, token, callback)?;
59///
60///     // Perform other logic here...
61///
62///     Ok(channel_return)
63/// }
64/// ```
65///
66/// Note that:
67///
68///   - You can call `process_events()` on the `TransientSource<Channel>` even
69///     if the channel has been unregistered and dropped. All that will happen
70///     is that you won't get any events from it.
71///
72///   - The [`PostAction`](crate::PostAction) returned from `process_events()`
73///     will only ever be `PostAction::Continue` or `PostAction::Reregister`.
74///     You will still need to combine this with the result of any other sources
75///     (transient or not).
76///
77/// Once you return `channel_return` from your `process_events()` method (and
78/// assuming it propagates all the way up to the event loop itself through any
79/// other event sources), the event loop might call `reregister()` on your
80/// source. All your source has to do is:
81///
82/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
83/// fn reregister(
84///     &mut self,
85///     poll: &mut calloop::Poll,
86///     token_factory: &mut calloop::TokenFactory,
87/// ) -> crate::Result<()> {
88///     self.mpsc_receiver.reregister(poll, token_factory)?;
89///
90///     // Other registration actions...
91///
92///     Ok(())
93/// }
94/// ```
95///
96/// The `TransientSource` will take care of updating the registration of the
97/// inner source, even if it actually needs to be unregistered or initially
98/// registered.
99///
100/// ## Replacing or removing `TransientSource`s
101///
102/// Not properly removing or replacing `TransientSource`s can cause spurious
103/// wakeups of the event loop, and in some cases can leak file descriptors or
104/// fail to free entries in Calloop's internal data structures. No unsoundness
105/// or undefined behaviour will result, but leaking file descriptors can result
106/// in errors or panics.
107///
108/// If you want to remove a source before it returns `PostAction::Remove`, use
109/// the [`TransientSource::remove()`] method. If you want to replace a source
110/// with another one, use the [`TransientSource::replace()`] method. Either of
111/// these may be called at any time during processing or from outside the event
112/// loop. Both require either returning `PostAction::Reregister` from the
113/// `process_event()` call that does this, or reregistering the event source
114/// some other way eg. via the top-level loop handle.
115///
116/// If, instead, you directly assign a new source to the variable holding the
117/// `TransientSource`, the inner source will be dropped before it can be
118/// unregistered. For example:
119///
120/// ```none,actually-rust-but-see-https://github.com/rust-lang/rust/issues/63193
121/// self.mpsc_receiver = Default::default();
122/// self.mpsc_receiver = new_channel.into();
123/// ```
124#[derive(Debug, Default)]
125pub struct TransientSource<T> {
126    state: TransientSourceState<T>,
127}
128
129/// This is the internal state of the [`TransientSource`], as a separate type so
130/// it's not exposed.
131#[derive(Debug)]
132enum TransientSourceState<T> {
133    /// The source should be kept in the loop.
134    Keep(T),
135    /// The source needs to be registered with the loop.
136    Register(T),
137    /// The source needs to be disabled but kept.
138    Disable(T),
139    /// The source needs to be removed from the loop.
140    Remove(T),
141    /// The source is being replaced by another. For most API purposes (eg.
142    /// `map()`), this will be treated as the `Register` state enclosing the new
143    /// source.
144    Replace {
145        /// The new source, which will be registered and used from now on.
146        new: T,
147        /// The old source, which will be unregistered and dropped.
148        old: T,
149    },
150    /// The source has been removed from the loop and dropped (this might also
151    /// be observed if there is a panic while changing states).
152    None,
153}
154
155#[allow(clippy::derivable_impls)]
156impl<T> Default for TransientSourceState<T> {
157    fn default() -> Self {
158        Self::None
159    }
160}
161
162impl<T> TransientSourceState<T> {
163    /// If a caller needs to flag the contained source for removal or
164    /// registration, we need to replace the enum variant safely. This requires
165    /// having a `None` value in there temporarily while we do the swap.
166    ///
167    /// If the variant is `None` the value will not change and `replacer` will
168    /// not be called. If the variant is `Replace` then `replacer` will be
169    /// called **on the new source**, which may cause the old source to leak
170    /// registration in the event loop if it has not yet been unregistered.
171    ///
172    /// The `replacer` function here is expected to be one of the enum variant
173    /// constructors eg. `replace(TransientSource::Remove)`.
174    fn replace_state<F>(&mut self, replacer: F)
175    where
176        F: FnOnce(T) -> Self,
177    {
178        *self = match std::mem::take(self) {
179            Self::Keep(source)
180            | Self::Register(source)
181            | Self::Remove(source)
182            | Self::Disable(source)
183            | Self::Replace { new: source, .. } => replacer(source),
184            Self::None => return,
185        };
186    }
187}
188
189impl<T> TransientSource<T> {
190    /// Apply a function to the enclosed source, if it exists and is not about
191    /// to be removed.
192    pub fn map<F, U>(&mut self, f: F) -> Option<U>
193    where
194        F: FnOnce(&mut T) -> U,
195    {
196        match &mut self.state {
197            TransientSourceState::Keep(source)
198            | TransientSourceState::Register(source)
199            | TransientSourceState::Disable(source)
200            | TransientSourceState::Replace { new: source, .. } => Some(f(source)),
201            TransientSourceState::Remove(_) | TransientSourceState::None => None,
202        }
203    }
204
205    /// Returns `true` if there is no wrapped event source.
206    pub fn is_none(&self) -> bool {
207        matches!(self.state, TransientSourceState::None)
208    }
209
210    /// Removes the wrapped event source from the event loop and this wrapper.
211    ///
212    /// If this is called from outside of the event loop, you will need to wake
213    /// up the event loop for any changes to take place. If it is called from
214    /// within the event loop, you must return `PostAction::Reregister` from
215    /// your own event source's `process_events()`, and the source will be
216    /// unregistered as needed after it exits.
217    pub fn remove(&mut self) {
218        self.state.replace_state(TransientSourceState::Remove);
219    }
220
221    /// Replace the currently wrapped source with the given one.  No more events
222    /// will be generated from the old source after this point. The old source
223    /// will not be dropped immediately, it will be kept so that it can be
224    /// deregistered.
225    ///
226    /// If this is called from outside of the event loop, you will need to wake
227    /// up the event loop for any changes to take place. If it is called from
228    /// within the event loop, you must return `PostAction::Reregister` from
229    /// your own event source's `process_events()`, and the sources will be
230    /// registered and unregistered as needed after it exits.
231    pub fn replace(&mut self, new: T) {
232        self.state
233            .replace_state(|old| TransientSourceState::Replace { new, old });
234    }
235}
236
237impl<T: crate::EventSource> From<T> for TransientSource<T> {
238    fn from(source: T) -> Self {
239        Self {
240            state: TransientSourceState::Register(source),
241        }
242    }
243}
244
245impl<T: crate::EventSource> crate::EventSource for TransientSource<T> {
246    type Event = T::Event;
247    type Metadata = T::Metadata;
248    type Ret = T::Ret;
249    type Error = T::Error;
250
251    fn process_events<F>(
252        &mut self,
253        readiness: crate::Readiness,
254        token: crate::Token,
255        callback: F,
256    ) -> Result<crate::PostAction, Self::Error>
257    where
258        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
259    {
260        let reregister = if let TransientSourceState::Keep(source) = &mut self.state {
261            let child_post_action = source.process_events(readiness, token, callback)?;
262
263            match child_post_action {
264                // Nothing needs to change.
265                crate::PostAction::Continue => false,
266
267                // Our child source needs re-registration, therefore this
268                // wrapper needs re-registration.
269                crate::PostAction::Reregister => true,
270
271                // If our nested source needs to be removed or disabled, we need
272                // to swap it out for the "Remove" or "Disable" variant.
273                crate::PostAction::Disable => {
274                    self.state.replace_state(TransientSourceState::Disable);
275                    true
276                }
277
278                crate::PostAction::Remove => {
279                    self.state.replace_state(TransientSourceState::Remove);
280                    true
281                }
282            }
283        } else {
284            false
285        };
286
287        let post_action = if reregister {
288            crate::PostAction::Reregister
289        } else {
290            crate::PostAction::Continue
291        };
292
293        Ok(post_action)
294    }
295
296    fn register(
297        &mut self,
298        poll: &mut crate::Poll,
299        token_factory: &mut crate::TokenFactory,
300    ) -> crate::Result<()> {
301        match &mut self.state {
302            TransientSourceState::Keep(source) => {
303                source.register(poll, token_factory)?;
304            }
305            TransientSourceState::Register(source)
306            | TransientSourceState::Disable(source)
307            | TransientSourceState::Replace { new: source, .. } => {
308                source.register(poll, token_factory)?;
309                self.state.replace_state(TransientSourceState::Keep);
310                // Drops the disposed source in the Replace case.
311            }
312            TransientSourceState::Remove(_source) => {
313                self.state.replace_state(|_| TransientSourceState::None);
314            }
315            TransientSourceState::None => (),
316        }
317        Ok(())
318    }
319
320    fn reregister(
321        &mut self,
322        poll: &mut crate::Poll,
323        token_factory: &mut crate::TokenFactory,
324    ) -> crate::Result<()> {
325        match &mut self.state {
326            TransientSourceState::Keep(source) => source.reregister(poll, token_factory)?,
327            TransientSourceState::Register(source) => {
328                source.register(poll, token_factory)?;
329                self.state.replace_state(TransientSourceState::Keep);
330            }
331            TransientSourceState::Disable(source) => {
332                source.unregister(poll)?;
333            }
334            TransientSourceState::Remove(source) => {
335                source.unregister(poll)?;
336                self.state.replace_state(|_| TransientSourceState::None);
337            }
338            TransientSourceState::Replace { new, old } => {
339                old.unregister(poll)?;
340                new.register(poll, token_factory)?;
341                self.state.replace_state(TransientSourceState::Keep);
342                // Drops 'dispose'.
343            }
344            TransientSourceState::None => (),
345        }
346        Ok(())
347    }
348
349    fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
350        match &mut self.state {
351            TransientSourceState::Keep(source)
352            | TransientSourceState::Register(source)
353            | TransientSourceState::Disable(source) => source.unregister(poll)?,
354            TransientSourceState::Remove(source) => {
355                source.unregister(poll)?;
356                self.state.replace_state(|_| TransientSourceState::None);
357            }
358            TransientSourceState::Replace { new, old } => {
359                old.unregister(poll)?;
360                new.unregister(poll)?;
361                self.state.replace_state(TransientSourceState::Register);
362            }
363            TransientSourceState::None => (),
364        }
365        Ok(())
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use super::*;
372    use crate::{
373        channel::{channel, Channel, Event},
374        ping::{make_ping, PingSource},
375        Dispatcher, EventSource, PostAction,
376    };
377    use std::{
378        rc::Rc,
379        sync::atomic::{AtomicBool, Ordering},
380        time::Duration,
381    };
382
383    #[test]
384    fn test_transient_drop() {
385        // A test source that sets a flag when it's dropped.
386        struct TestSource<'a> {
387            dropped: &'a AtomicBool,
388            ping: PingSource,
389        }
390
391        impl Drop for TestSource<'_> {
392            fn drop(&mut self) {
393                self.dropped.store(true, Ordering::Relaxed)
394            }
395        }
396
397        impl crate::EventSource for TestSource<'_> {
398            type Event = ();
399            type Metadata = ();
400            type Ret = ();
401            type Error = Box<dyn std::error::Error + Sync + Send>;
402
403            fn process_events<F>(
404                &mut self,
405                readiness: crate::Readiness,
406                token: crate::Token,
407                callback: F,
408            ) -> Result<crate::PostAction, Self::Error>
409            where
410                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
411            {
412                self.ping.process_events(readiness, token, callback)?;
413                Ok(PostAction::Remove)
414            }
415
416            fn register(
417                &mut self,
418                poll: &mut crate::Poll,
419                token_factory: &mut crate::TokenFactory,
420            ) -> crate::Result<()> {
421                self.ping.register(poll, token_factory)
422            }
423
424            fn reregister(
425                &mut self,
426                poll: &mut crate::Poll,
427                token_factory: &mut crate::TokenFactory,
428            ) -> crate::Result<()> {
429                self.ping.reregister(poll, token_factory)
430            }
431
432            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
433                self.ping.unregister(poll)
434            }
435        }
436
437        // Test that the inner source is actually dropped when it asks to be
438        // removed from the loop, while the TransientSource remains. We use two
439        // flags for this:
440        // - fired: should be set only when the inner event source has an event
441        // - dropped: set by the drop handler for the inner source (it's an
442        //   AtomicBool becaues it requires a longer lifetime than the fired
443        //   flag)
444        let mut fired = false;
445        let dropped = false.into();
446
447        // The inner source that should be dropped after the first loop run.
448        let (pinger, ping) = make_ping().unwrap();
449        let inner = TestSource {
450            dropped: &dropped,
451            ping,
452        };
453
454        // The TransientSource wrapper.
455        let outer: TransientSource<_> = inner.into();
456
457        let mut event_loop = crate::EventLoop::try_new().unwrap();
458        let handle = event_loop.handle();
459
460        let _token = handle
461            .insert_source(outer, |_, _, fired| {
462                *fired = true;
463            })
464            .unwrap();
465
466        // First loop run: the ping generates an event for the inner source.
467        pinger.ping();
468
469        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
470
471        assert!(fired);
472        assert!(dropped.load(Ordering::Relaxed));
473
474        // Second loop run: the ping does nothing because the receiver has been
475        // dropped.
476        fired = false;
477
478        pinger.ping();
479
480        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
481        assert!(!fired);
482    }
483
484    #[test]
485    fn test_transient_passthrough() {
486        // Test that event processing works when a source is nested inside a
487        // TransientSource. In particular, we want to ensure that the final
488        // event is received even if it corresponds to that same event source
489        // returning `PostAction::Remove`.
490        let (sender, receiver) = channel();
491        let outer: TransientSource<_> = receiver.into();
492
493        let mut event_loop = crate::EventLoop::try_new().unwrap();
494        let handle = event_loop.handle();
495
496        // Our callback puts the receied events in here for us to check later.
497        let mut msg_queue = vec![];
498
499        let _token = handle
500            .insert_source(outer, |msg, _, queue: &mut Vec<_>| {
501                queue.push(msg);
502            })
503            .unwrap();
504
505        // Send some data and drop the sender. We specifically want to test that
506        // we get the "closed" message.
507        sender.send(0u32).unwrap();
508        sender.send(1u32).unwrap();
509        sender.send(2u32).unwrap();
510        sender.send(3u32).unwrap();
511        drop(sender);
512
513        // Run loop once to process events.
514        event_loop.dispatch(Duration::ZERO, &mut msg_queue).unwrap();
515
516        assert!(matches!(
517            msg_queue.as_slice(),
518            &[
519                Event::Msg(0u32),
520                Event::Msg(1u32),
521                Event::Msg(2u32),
522                Event::Msg(3u32),
523                Event::Closed
524            ]
525        ));
526    }
527
528    #[test]
529    fn test_transient_map() {
530        struct IdSource {
531            id: u32,
532            ping: PingSource,
533        }
534
535        impl EventSource for IdSource {
536            type Event = u32;
537            type Metadata = ();
538            type Ret = ();
539            type Error = Box<dyn std::error::Error + Sync + Send>;
540
541            fn process_events<F>(
542                &mut self,
543                readiness: crate::Readiness,
544                token: crate::Token,
545                mut callback: F,
546            ) -> Result<PostAction, Self::Error>
547            where
548                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
549            {
550                let id = self.id;
551                self.ping
552                    .process_events(readiness, token, |_, md| callback(id, md))?;
553
554                let action = if self.id > 2 {
555                    PostAction::Remove
556                } else {
557                    PostAction::Continue
558                };
559
560                Ok(action)
561            }
562
563            fn register(
564                &mut self,
565                poll: &mut crate::Poll,
566                token_factory: &mut crate::TokenFactory,
567            ) -> crate::Result<()> {
568                self.ping.register(poll, token_factory)
569            }
570
571            fn reregister(
572                &mut self,
573                poll: &mut crate::Poll,
574                token_factory: &mut crate::TokenFactory,
575            ) -> crate::Result<()> {
576                self.ping.reregister(poll, token_factory)
577            }
578
579            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
580                self.ping.unregister(poll)
581            }
582        }
583
584        struct WrapperSource(TransientSource<IdSource>);
585
586        impl EventSource for WrapperSource {
587            type Event = <IdSource as EventSource>::Event;
588            type Metadata = <IdSource as EventSource>::Metadata;
589            type Ret = <IdSource as EventSource>::Ret;
590            type Error = <IdSource as EventSource>::Error;
591
592            fn process_events<F>(
593                &mut self,
594                readiness: crate::Readiness,
595                token: crate::Token,
596                callback: F,
597            ) -> Result<PostAction, Self::Error>
598            where
599                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
600            {
601                let action = self.0.process_events(readiness, token, callback);
602                self.0.map(|inner| inner.id += 1);
603                action
604            }
605
606            fn register(
607                &mut self,
608                poll: &mut crate::Poll,
609                token_factory: &mut crate::TokenFactory,
610            ) -> crate::Result<()> {
611                self.0.map(|inner| inner.id += 1);
612                self.0.register(poll, token_factory)
613            }
614
615            fn reregister(
616                &mut self,
617                poll: &mut crate::Poll,
618                token_factory: &mut crate::TokenFactory,
619            ) -> crate::Result<()> {
620                self.0.map(|inner| inner.id += 1);
621                self.0.reregister(poll, token_factory)
622            }
623
624            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
625                self.0.map(|inner| inner.id += 1);
626                self.0.unregister(poll)
627            }
628        }
629
630        // To test the id later.
631        let mut id = 0;
632
633        // Create our source.
634        let (pinger, ping) = make_ping().unwrap();
635        let inner = IdSource { id, ping };
636
637        // The TransientSource wrapper.
638        let outer: TransientSource<_> = inner.into();
639
640        // The top level source.
641        let top = WrapperSource(outer);
642
643        // Create a dispatcher so we can check the source afterwards.
644        let dispatcher = Dispatcher::new(top, |got_id, _, test_id| {
645            *test_id = got_id;
646        });
647
648        let mut event_loop = crate::EventLoop::try_new().unwrap();
649        let handle = event_loop.handle();
650
651        let token = handle.register_dispatcher(dispatcher.clone()).unwrap();
652
653        // First loop run: the ping generates an event for the inner source.
654        // The ID should be 1 after the increment in register().
655        pinger.ping();
656        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
657        assert_eq!(id, 1);
658
659        // Second loop run: the ID should be 2 after the previous
660        // process_events().
661        pinger.ping();
662        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
663        assert_eq!(id, 2);
664
665        // Third loop run: the ID should be 3 after another process_events().
666        pinger.ping();
667        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
668        assert_eq!(id, 3);
669
670        // Fourth loop run: the callback is no longer called by the inner
671        // source, so our local ID is not incremented.
672        pinger.ping();
673        event_loop.dispatch(Duration::ZERO, &mut id).unwrap();
674        assert_eq!(id, 3);
675
676        // Remove the dispatcher so we can inspect the sources.
677        handle.remove(token);
678
679        let mut top_after = dispatcher.into_source_inner();
680
681        // I expect the inner source to be dropped, so the TransientSource
682        // variant is None (its version of None, not Option::None), so its map()
683        // won't call the passed-in function (hence the unreachable!()) and its
684        // return value should be Option::None.
685        assert!(top_after.0.map(|_| unreachable!()).is_none());
686    }
687
688    #[test]
689    fn test_transient_disable() {
690        // Test that disabling and enabling is handled properly.
691        struct DisablingSource(PingSource);
692
693        impl EventSource for DisablingSource {
694            type Event = ();
695            type Metadata = ();
696            type Ret = ();
697            type Error = Box<dyn std::error::Error + Sync + Send>;
698
699            fn process_events<F>(
700                &mut self,
701                readiness: crate::Readiness,
702                token: crate::Token,
703                callback: F,
704            ) -> Result<PostAction, Self::Error>
705            where
706                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
707            {
708                self.0.process_events(readiness, token, callback)?;
709                Ok(PostAction::Disable)
710            }
711
712            fn register(
713                &mut self,
714                poll: &mut crate::Poll,
715                token_factory: &mut crate::TokenFactory,
716            ) -> crate::Result<()> {
717                self.0.register(poll, token_factory)
718            }
719
720            fn reregister(
721                &mut self,
722                poll: &mut crate::Poll,
723                token_factory: &mut crate::TokenFactory,
724            ) -> crate::Result<()> {
725                self.0.reregister(poll, token_factory)
726            }
727
728            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
729                self.0.unregister(poll)
730            }
731        }
732
733        // Flag for checking when the source fires.
734        let mut fired = false;
735
736        // Create our source.
737        let (pinger, ping) = make_ping().unwrap();
738
739        let inner = DisablingSource(ping);
740
741        // The TransientSource wrapper.
742        let outer: TransientSource<_> = inner.into();
743
744        let mut event_loop = crate::EventLoop::try_new().unwrap();
745        let handle = event_loop.handle();
746        let token = handle
747            .insert_source(outer, |_, _, fired| {
748                *fired = true;
749            })
750            .unwrap();
751
752        // Ping here and not later, to check that disabling after an event is
753        // triggered but not processed does not discard the event.
754        pinger.ping();
755        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
756        assert!(fired);
757
758        // Source should now be disabled.
759        pinger.ping();
760        fired = false;
761        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
762        assert!(!fired);
763
764        // Re-enable the source.
765        handle.enable(&token).unwrap();
766
767        // Trigger another event.
768        pinger.ping();
769        fired = false;
770        event_loop.dispatch(Duration::ZERO, &mut fired).unwrap();
771        assert!(fired);
772    }
773
774    #[test]
775    fn test_transient_replace_unregister() {
776        // This is a bit of a complex test, but it essentially boils down to:
777        // how can a "parent" event source containing a TransientSource replace
778        // the "child" source without leaking the source's registration?
779
780        // First, a source that finishes immediately. This is so we cover the
781        // edge case of replacing a source as soon as it wants to be removed.
782        struct FinishImmediatelySource {
783            source: PingSource,
784            data: Option<i32>,
785            registered: bool,
786            dropped: Rc<AtomicBool>,
787        }
788
789        impl FinishImmediatelySource {
790            // The constructor passes out the drop flag so we can check that
791            // this source was or wasn't dropped.
792            fn new(source: PingSource, data: i32) -> (Self, Rc<AtomicBool>) {
793                let dropped = Rc::new(false.into());
794
795                (
796                    Self {
797                        source,
798                        data: Some(data),
799                        registered: false,
800                        dropped: Rc::clone(&dropped),
801                    },
802                    dropped,
803                )
804            }
805        }
806
807        impl EventSource for FinishImmediatelySource {
808            type Event = i32;
809            type Metadata = ();
810            type Ret = ();
811            type Error = Box<dyn std::error::Error + Sync + Send>;
812
813            fn process_events<F>(
814                &mut self,
815                readiness: crate::Readiness,
816                token: crate::Token,
817                mut callback: F,
818            ) -> Result<PostAction, Self::Error>
819            where
820                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
821            {
822                let mut data = self.data.take();
823
824                self.source.process_events(readiness, token, |_, _| {
825                    if let Some(data) = data.take() {
826                        callback(data, &mut ())
827                    }
828                })?;
829
830                self.data = data;
831
832                Ok(if self.data.is_none() {
833                    PostAction::Remove
834                } else {
835                    PostAction::Continue
836                })
837            }
838
839            fn register(
840                &mut self,
841                poll: &mut crate::Poll,
842                token_factory: &mut crate::TokenFactory,
843            ) -> crate::Result<()> {
844                self.registered = true;
845                self.source.register(poll, token_factory)
846            }
847
848            fn reregister(
849                &mut self,
850                poll: &mut crate::Poll,
851                token_factory: &mut crate::TokenFactory,
852            ) -> crate::Result<()> {
853                self.source.reregister(poll, token_factory)
854            }
855
856            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
857                self.registered = false;
858                self.source.unregister(poll)
859            }
860        }
861
862        // The drop handler sets a flag we can check for debugging (we want to
863        // know that the source itself was dropped), and also checks that the
864        // source was unregistered. Ultimately neither the source nor its
865        // registration should be leaked.
866
867        impl Drop for FinishImmediatelySource {
868            fn drop(&mut self) {
869                assert!(!self.registered, "source dropped while still registered");
870                self.dropped.store(true, Ordering::Relaxed);
871            }
872        }
873
874        // Our wrapper source handles detecting when the child source finishes,
875        // and replacing that child source with another one that will generate
876        // more events. This is one intended use case of the TransientSource.
877
878        struct WrapperSource {
879            current: TransientSource<FinishImmediatelySource>,
880            replacement: Option<FinishImmediatelySource>,
881            dropped: Rc<AtomicBool>,
882        }
883
884        impl WrapperSource {
885            // The constructor passes out the drop flag so we can check that
886            // this source was or wasn't dropped.
887            fn new(
888                first: FinishImmediatelySource,
889                second: FinishImmediatelySource,
890            ) -> (Self, Rc<AtomicBool>) {
891                let dropped = Rc::new(false.into());
892
893                (
894                    Self {
895                        current: first.into(),
896                        replacement: second.into(),
897                        dropped: Rc::clone(&dropped),
898                    },
899                    dropped,
900                )
901            }
902        }
903
904        impl EventSource for WrapperSource {
905            type Event = i32;
906            type Metadata = ();
907            type Ret = ();
908            type Error = Box<dyn std::error::Error + Sync + Send>;
909
910            fn process_events<F>(
911                &mut self,
912                readiness: crate::Readiness,
913                token: crate::Token,
914                mut callback: F,
915            ) -> Result<PostAction, Self::Error>
916            where
917                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
918            {
919                // Did our event source generate an event?
920                let mut fired = false;
921
922                let post_action = self.current.process_events(readiness, token, |data, _| {
923                    callback(data, &mut ());
924                    fired = true;
925                })?;
926
927                if fired {
928                    // The event source will be unregistered after the current
929                    // process_events() iteration is finished. The replace()
930                    // method will handle doing that even while we've added a
931                    // new source.
932                    if let Some(replacement) = self.replacement.take() {
933                        self.current.replace(replacement);
934                    }
935
936                    // Parent source is responsible for flagging this, but it's
937                    // already set.
938                    assert_eq!(post_action, PostAction::Reregister);
939                }
940
941                Ok(post_action)
942            }
943
944            fn register(
945                &mut self,
946                poll: &mut crate::Poll,
947                token_factory: &mut crate::TokenFactory,
948            ) -> crate::Result<()> {
949                self.current.register(poll, token_factory)
950            }
951
952            fn reregister(
953                &mut self,
954                poll: &mut crate::Poll,
955                token_factory: &mut crate::TokenFactory,
956            ) -> crate::Result<()> {
957                self.current.reregister(poll, token_factory)
958            }
959
960            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
961                self.current.unregister(poll)
962            }
963        }
964
965        impl Drop for WrapperSource {
966            fn drop(&mut self) {
967                self.dropped.store(true, Ordering::Relaxed);
968            }
969        }
970
971        // Construct the various nested sources - FinishImmediatelySource inside
972        // TransientSource inside WrapperSource. The numbers let us verify which
973        // event source fires first.
974        let (ping0_tx, ping0_rx) = crate::ping::make_ping().unwrap();
975        let (ping1_tx, ping1_rx) = crate::ping::make_ping().unwrap();
976        let (inner0, inner0_dropped) = FinishImmediatelySource::new(ping0_rx, 0);
977        let (inner1, inner1_dropped) = FinishImmediatelySource::new(ping1_rx, 1);
978        let (outer, outer_dropped) = WrapperSource::new(inner0, inner1);
979
980        // Now the actual test starts.
981
982        let mut event_loop: crate::EventLoop<(Option<i32>, crate::LoopSignal)> =
983            crate::EventLoop::try_new().unwrap();
984        let handle = event_loop.handle();
985        let signal = event_loop.get_signal();
986
987        // This is how we communicate with the event sources.
988        let mut context = (None, signal);
989
990        let _token = handle
991            .insert_source(outer, |data, _, (evt, sig)| {
992                *evt = Some(data);
993                sig.stop();
994            })
995            .unwrap();
996
997        // Ensure our sources fire.
998        ping0_tx.ping();
999        ping1_tx.ping();
1000
1001        // Use run() rather than dispatch() because it's not strictly part of
1002        // any API contract as to how many runs of the event loop it takes to
1003        // replace the nested source.
1004        event_loop.run(None, &mut context, |_| {}).unwrap();
1005
1006        // First, make sure the inner source actually did fire.
1007        assert_eq!(context.0.take(), Some(0), "first inner source did not fire");
1008
1009        // Make sure that the outer source is still alive.
1010        assert!(
1011            !outer_dropped.load(Ordering::Relaxed),
1012            "outer source already dropped"
1013        );
1014
1015        // Make sure that the inner child source IS dropped now.
1016        assert!(
1017            inner0_dropped.load(Ordering::Relaxed),
1018            "first inner source not dropped"
1019        );
1020
1021        // Make sure that, in between the first event and second event, the
1022        // replacement child source still exists.
1023        assert!(
1024            !inner1_dropped.load(Ordering::Relaxed),
1025            "replacement inner source dropped"
1026        );
1027
1028        // Run the event loop until we get a second event.
1029        event_loop.run(None, &mut context, |_| {}).unwrap();
1030
1031        // Ensure the replacement source fired (which checks that it was
1032        // registered and is being processed by the TransientSource).
1033        assert_eq!(context.0.take(), Some(1), "replacement source did not fire");
1034    }
1035
1036    #[test]
1037    fn test_transient_remove() {
1038        // This tests that calling remove(), even before an event source has
1039        // requested its own removal, results in the event source being removed.
1040
1041        const STOP_AT: i32 = 2;
1042
1043        // A wrapper source to automate the removal of the inner source.
1044        struct WrapperSource {
1045            inner: TransientSource<Channel<i32>>,
1046        }
1047
1048        impl EventSource for WrapperSource {
1049            type Event = i32;
1050            type Metadata = ();
1051            type Ret = ();
1052            type Error = Box<dyn std::error::Error + Sync + Send>;
1053
1054            fn process_events<F>(
1055                &mut self,
1056                readiness: crate::Readiness,
1057                token: crate::Token,
1058                mut callback: F,
1059            ) -> Result<PostAction, Self::Error>
1060            where
1061                F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
1062            {
1063                let mut remove = false;
1064
1065                let mut post_action = self.inner.process_events(readiness, token, |evt, _| {
1066                    if let Event::Msg(num) = evt {
1067                        callback(num, &mut ());
1068                        remove = num >= STOP_AT;
1069                    }
1070                })?;
1071
1072                if remove {
1073                    self.inner.remove();
1074                    post_action |= PostAction::Reregister;
1075                }
1076
1077                Ok(post_action)
1078            }
1079
1080            fn register(
1081                &mut self,
1082                poll: &mut crate::Poll,
1083                token_factory: &mut crate::TokenFactory,
1084            ) -> crate::Result<()> {
1085                self.inner.register(poll, token_factory)
1086            }
1087
1088            fn reregister(
1089                &mut self,
1090                poll: &mut crate::Poll,
1091                token_factory: &mut crate::TokenFactory,
1092            ) -> crate::Result<()> {
1093                self.inner.reregister(poll, token_factory)
1094            }
1095
1096            fn unregister(&mut self, poll: &mut crate::Poll) -> crate::Result<()> {
1097                self.inner.unregister(poll)
1098            }
1099        }
1100
1101        // Create our sources and loop.
1102
1103        let (sender, receiver) = channel();
1104        let wrapper = WrapperSource {
1105            inner: receiver.into(),
1106        };
1107
1108        let mut event_loop = crate::EventLoop::try_new().unwrap();
1109        let handle = event_loop.handle();
1110
1111        handle
1112            .insert_source(wrapper, |num, _, out: &mut Option<_>| {
1113                *out = Some(num);
1114            })
1115            .unwrap();
1116
1117        // Storage for callback data.
1118        let mut out = None;
1119
1120        // Send some data we expect to get callbacks for.
1121        for num in 0..=STOP_AT {
1122            sender.send(num).unwrap();
1123            event_loop.dispatch(Duration::ZERO, &mut out).unwrap();
1124            assert_eq!(out.take(), Some(num));
1125        }
1126
1127        // Now we expect the receiver to be gone.
1128        assert!(matches!(
1129            sender.send(STOP_AT + 1),
1130            Err(std::sync::mpsc::SendError { .. })
1131        ));
1132    }
1133}