Skip to main content

iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
/// A subscription event.
///
/// Values of this type are the items of an [`EventStream`], which is what a
/// [`Recipe`] receives as input when its stream is built.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A user interacted with a user interface in a window.
    Interaction {
        /// The window holding the interface of the interaction.
        window: window::Id,
        /// The [`Event`] describing the interaction.
        ///
        /// [`Event`]: event::Event
        event: event::Event,

        /// The [`event::Status`] of the interaction.
        status: event::Status,
    },

    /// The system theme has changed.
    ///
    /// Carries the new [`theme::Mode`].
    SystemThemeChanged(theme::Mode),
}
34
/// A stream of runtime events.
///
/// It is the input of a [`Subscription`]: every [`Recipe`] receives one when
/// [`Recipe::stream`] is called. It is a boxed stream of [`Event`] values.
pub type EventStream = BoxStream<Event>;
39
/// The hasher used for identifying subscriptions.
///
/// This is the state handed to [`Recipe::hash`]; it is an alias for
/// `rustc_hash::FxHasher`.
pub type Hasher = rustc_hash::FxHasher;
42
/// A request to listen to external events.
///
/// Besides performing async actions on demand with `Task`, most
/// applications also need to listen to external events passively.
///
/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
/// and it will generate events as long as the user keeps requesting it.
///
/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// # The Lifetime of a [`Subscription`]
/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
/// in the `subscription` function of an `application` or a `daemon`.
///
/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
/// start running it asynchronously. Running a [`Subscription`] consists of building its underlying
/// [`Stream`] and executing it in an async runtime.
///
/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
/// to build a certain [`Stream`] together with some way to _identify_ it.
///
/// Identification is important because when a specific [`Subscription`] stops being returned to the
/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
/// [`Subscription`] to keep track of it.
///
/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
/// and whenever necessary.
///
/// ```
/// # mod iced {
/// #     pub mod time {
/// #         pub use iced_futures::backend::default::time::every;
/// #         pub use std::time::{Duration, Instant};
/// #     }
/// #
/// #     pub use iced_futures::Subscription;
/// # }
/// use iced::time::{self, Duration, Instant};
/// use iced::Subscription;
///
/// struct State {
///     timer_enabled: bool,
/// }
///
/// fn subscription(state: &State) -> Subscription<Instant> {
///     if state.timer_enabled {
///         time::every(Duration::from_secs(1))
///     } else {
///         Subscription::none()
///     }
/// }
/// ```
///
/// [`Future`]: std::future::Future
#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
pub struct Subscription<T> {
    // One boxed `Recipe` per unit of this subscription. Empty for
    // `Subscription::none`; `Subscription::batch` concatenates these lists.
    recipes: Vec<Box<dyn Recipe<Output = T>>>,
}
103
104impl<T> Subscription<T> {
105    /// Returns an empty [`Subscription`] that will not produce any output.
106    pub fn none() -> Self {
107        Self {
108            recipes: Vec::new(),
109        }
110    }
111
112    /// Returns a [`Subscription`] that will call the given function to create and
113    /// asynchronously run the given [`Stream`].
114    ///
115    /// # Creating an asynchronous worker with bidirectional communication
116    /// You can leverage this helper to create a [`Subscription`] that spawns
117    /// an asynchronous worker in the background and establish a channel of
118    /// communication with an `iced` application.
119    ///
120    /// You can achieve this by creating an `mpsc` channel inside the closure
121    /// and returning the `Sender` as a `Message` for the `Application`:
122    ///
123    /// ```
124    /// # mod iced {
125    /// #     pub use iced_futures::Subscription;   
126    /// #     pub use iced_futures::futures;
127    /// #     pub use iced_futures::stream;
128    /// # }
129    /// use iced::futures::channel::mpsc;
130    /// use iced::futures::sink::SinkExt;
131    /// use iced::futures::Stream;
132    /// use iced::stream;
133    /// use iced::Subscription;
134    ///
135    /// pub enum Event {
136    ///     Ready(mpsc::Sender<Input>),
137    ///     WorkFinished,
138    ///     // ...
139    /// }
140    ///
141    /// enum Input {
142    ///     DoSomeWork,
143    ///     // ...
144    /// }
145    ///
146    /// fn some_worker() -> impl Stream<Item = Event> {
147    ///     stream::channel(100, async |mut output| {
148    ///         // Create channel
149    ///         let (sender, mut receiver) = mpsc::channel(100);
150    ///
151    ///         // Send the sender back to the application
152    ///         output.send(Event::Ready(sender)).await;
153    ///
154    ///         loop {
155    ///             use iced_futures::futures::StreamExt;
156    ///
157    ///             // Read next input sent from `Application`
158    ///             let input = receiver.select_next_some().await;
159    ///
160    ///             match input {
161    ///                 Input::DoSomeWork => {
162    ///                     // Do some async work...
163    ///
164    ///                     // Finally, we can optionally produce a message to tell the
165    ///                     // `Application` the work is done
166    ///                     output.send(Event::WorkFinished).await;
167    ///                 }
168    ///             }
169    ///         }
170    ///     })
171    /// }
172    ///
173    /// fn subscription() -> Subscription<Event> {
174    ///     Subscription::run(some_worker)
175    /// }
176    /// ```
177    ///
178    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
179    /// connection open.
180    ///
181    /// [`websocket`]: https://github.com/iced-rs/iced/tree/master/examples/websocket
182    pub fn run<S>(builder: fn() -> S) -> Self
183    where
184        S: Stream<Item = T> + MaybeSend + 'static,
185        T: 'static,
186    {
187        from_recipe(Runner {
188            data: builder,
189            spawn: |builder, _| builder(),
190        })
191    }
192
193    /// Returns a [`Subscription`] that will create and asynchronously run the
194    /// given [`Stream`].
195    ///
196    /// Both the `data` and the function pointer will be used to uniquely identify
197    /// the [`Subscription`].
198    pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
199    where
200        D: Hash + 'static,
201        S: Stream<Item = T> + MaybeSend + 'static,
202        T: 'static,
203    {
204        from_recipe(Runner {
205            data: (data, builder),
206            spawn: |(data, builder), _| builder(data),
207        })
208    }
209
210    /// Batches all the provided subscriptions and returns the resulting
211    /// [`Subscription`].
212    pub fn batch(subscriptions: impl IntoIterator<Item = Subscription<T>>) -> Self {
213        Self {
214            recipes: subscriptions
215                .into_iter()
216                .flat_map(|subscription| subscription.recipes)
217                .collect(),
218        }
219    }
220
221    /// Adds a value to the [`Subscription`] context.
222    ///
223    /// The value will be part of the identity of a [`Subscription`].
224    pub fn with<A>(self, value: A) -> Subscription<(A, T)>
225    where
226        T: 'static,
227        A: std::hash::Hash + Clone + Send + Sync + 'static,
228    {
229        struct With<A, B> {
230            recipe: Box<dyn Recipe<Output = A>>,
231            value: B,
232        }
233
234        impl<A, B> Recipe for With<A, B>
235        where
236            A: 'static,
237            B: 'static + std::hash::Hash + Clone + Send + Sync,
238        {
239            type Output = (B, A);
240
241            fn hash(&self, state: &mut Hasher) {
242                std::any::TypeId::of::<B>().hash(state);
243                self.value.hash(state);
244                self.recipe.hash(state);
245            }
246
247            fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
248                use futures::StreamExt;
249
250                let value = self.value;
251
252                Box::pin(
253                    self.recipe
254                        .stream(input)
255                        .map(move |element| (value.clone(), element)),
256                )
257            }
258        }
259
260        Subscription {
261            recipes: self
262                .recipes
263                .into_iter()
264                .map(|recipe| {
265                    Box::new(With {
266                        recipe,
267                        value: value.clone(),
268                    }) as Box<dyn Recipe<Output = (A, T)>>
269                })
270                .collect(),
271        }
272    }
273
274    /// Transforms the [`Subscription`] output with the given function.
275    ///
276    /// The closure provided must be a non-capturing closure.
277    pub fn map<F, A>(self, f: F) -> Subscription<A>
278    where
279        T: 'static,
280        F: Fn(T) -> A + MaybeSend + Clone + 'static,
281        A: 'static,
282    {
283        const {
284            check_zero_sized::<F>();
285        }
286
287        struct Map<A, B, F>
288        where
289            F: Fn(A) -> B + 'static,
290        {
291            recipe: Box<dyn Recipe<Output = A>>,
292            mapper: F,
293        }
294
295        impl<A, B, F> Recipe for Map<A, B, F>
296        where
297            A: 'static,
298            B: 'static,
299            F: Fn(A) -> B + 'static + MaybeSend,
300        {
301            type Output = B;
302
303            fn hash(&self, state: &mut Hasher) {
304                TypeId::of::<F>().hash(state);
305                self.recipe.hash(state);
306            }
307
308            fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
309                use futures::StreamExt;
310
311                Box::pin(self.recipe.stream(input).map(self.mapper))
312            }
313        }
314
315        Subscription {
316            recipes: self
317                .recipes
318                .into_iter()
319                .map(|recipe| {
320                    Box::new(Map {
321                        recipe,
322                        mapper: f.clone(),
323                    }) as Box<dyn Recipe<Output = A>>
324                })
325                .collect(),
326        }
327    }
328
329    /// Transforms the [`Subscription`] output with the given function, yielding only
330    /// values only when the function returns `Some(A)`.
331    ///
332    /// The closure provided must be a non-capturing closure.
333    pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
334    where
335        T: MaybeSend + 'static,
336        F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
337        A: MaybeSend + 'static,
338    {
339        const {
340            check_zero_sized::<F>();
341        }
342
343        struct FilterMap<A, B, F>
344        where
345            F: Fn(A) -> Option<B> + 'static,
346        {
347            recipe: Box<dyn Recipe<Output = A>>,
348            mapper: F,
349        }
350
351        impl<A, B, F> Recipe for FilterMap<A, B, F>
352        where
353            A: 'static,
354            B: 'static + MaybeSend,
355            F: Fn(A) -> Option<B> + MaybeSend,
356        {
357            type Output = B;
358
359            fn hash(&self, state: &mut Hasher) {
360                TypeId::of::<F>().hash(state);
361                self.recipe.hash(state);
362            }
363
364            fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
365                use futures::StreamExt;
366                use futures::future;
367
368                let mapper = self.mapper;
369
370                Box::pin(
371                    self.recipe
372                        .stream(input)
373                        .filter_map(move |a| future::ready(mapper(a))),
374                )
375            }
376        }
377
378        Subscription {
379            recipes: self
380                .recipes
381                .drain(..)
382                .map(|recipe| {
383                    Box::new(FilterMap {
384                        recipe,
385                        mapper: f.clone(),
386                    }) as Box<dyn Recipe<Output = A>>
387                })
388                .collect(),
389        }
390    }
391
392    /// Returns the amount of recipe units in this [`Subscription`].
393    pub fn units(&self) -> usize {
394        self.recipes.len()
395    }
396}
397
398/// Creates a [`Subscription`] from a [`Recipe`] describing it.
399pub fn from_recipe<T>(recipe: impl Recipe<Output = T> + 'static) -> Subscription<T> {
400    Subscription {
401        recipes: vec![Box::new(recipe)],
402    }
403}
404
405/// Returns the different recipes of the [`Subscription`].
406pub fn into_recipes<T>(subscription: Subscription<T>) -> Vec<Box<dyn Recipe<Output = T>>> {
407    subscription.recipes
408}
409
410impl<T> std::fmt::Debug for Subscription<T> {
411    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412        f.debug_struct("Subscription").finish()
413    }
414}
415
/// The description of a [`Subscription`].
///
/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
/// Turn a custom [`Recipe`] into a [`Subscription`] with [`from_recipe`].
pub trait Recipe {
    /// The events that will be produced by a [`Subscription`] with this
    /// [`Recipe`].
    type Output;

    /// Hashes the [`Recipe`].
    ///
    /// This is used by runtimes to uniquely identify a [`Subscription`].
    /// Implementations should feed everything that distinguishes this recipe
    /// from others into `state`.
    fn hash(&self, state: &mut Hasher);

    /// Executes the [`Recipe`] and produces the stream of events of its
    /// [`Subscription`].
    ///
    /// The recipe is consumed. `input` is the stream of runtime [`Event`]s,
    /// which the recipe may use or ignore.
    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
}
435
436/// Creates a [`Subscription`] from a hashable id and a filter function.
437pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
438where
439    I: Hash + 'static,
440    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
441    T: 'static + MaybeSend,
442{
443    from_recipe(Runner {
444        data: id,
445        spawn: |_, events| {
446            use futures::future;
447            use futures::stream::StreamExt;
448
449            events.filter_map(move |event| future::ready(f(event)))
450        },
451    })
452}
453
/// A generic [`Recipe`] made of some hashable data and a function that
/// builds the stream.
///
/// The bounds are declared on the struct itself so that a closure written
/// directly in a `Runner { .. }` literal is checked against this signature.
struct Runner<I, F, S, T>
where
    F: FnOnce(&I, EventStream) -> S,
    S: Stream<Item = T>,
{
    // Hashed (together with the type `I`) to identify the recipe, and lent to
    // `spawn` when the stream is built.
    data: I,
    // Called once with `&data` and the runtime event stream to create the
    // stream of outputs.
    spawn: F,
}
462
463impl<I, F, S, T> Recipe for Runner<I, F, S, T>
464where
465    I: Hash + 'static,
466    F: FnOnce(&I, EventStream) -> S,
467    S: Stream<Item = T> + MaybeSend + 'static,
468{
469    type Output = T;
470
471    fn hash(&self, state: &mut Hasher) {
472        std::any::TypeId::of::<I>().hash(state);
473        self.data.hash(state);
474    }
475
476    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
477        crate::boxed_stream((self.spawn)(&self.data, input))
478    }
479}
480
/// Panics unless `T` is a zero-sized type.
///
/// It is meant to be evaluated inside a `const` block, where the panic
/// becomes a compilation error. A closure type has a size of zero only when
/// it captures nothing, which is what `Subscription::map` and
/// `Subscription::filter_map` require.
const fn check_zero_sized<T>() {
    assert!(
        std::mem::size_of::<T>() == 0,
        "The Subscription closure provided is not non-capturing. \
        Closures given to Subscription::map or filter_map cannot \
        capture external variables. If you need to capture state, \
        consider using Subscription::with."
    );
}