iced_futures/subscription.rs
1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::theme;
8use crate::core::window;
9use crate::futures::Stream;
10use crate::{BoxStream, MaybeSend};
11
12use std::any::TypeId;
13use std::hash::Hash;
14
15/// A subscription event.
16#[derive(Debug, Clone, PartialEq)]
17pub enum Event {
18 /// A user interacted with a user interface in a window.
19 Interaction {
20 /// The window holding the interface of the interaction.
21 window: window::Id,
22 /// The [`Event`] describing the interaction.
23 ///
24 /// [`Event`]: event::Event
25 event: event::Event,
26
27 /// The [`event::Status`] of the interaction.
28 status: event::Status,
29 },
30
31 /// The system theme has changed.
32 SystemThemeChanged(theme::Mode),
33}
34
35/// A stream of runtime events.
36///
37/// It is the input of a [`Subscription`].
38pub type EventStream = BoxStream<Event>;
39
40/// The hasher used for identifying subscriptions.
41pub type Hasher = rustc_hash::FxHasher;
42
43/// A request to listen to external events.
44///
45/// Besides performing async actions on demand with `Task`, most
46/// applications also need to listen to external events passively.
47///
48/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
49/// and it will generate events as long as the user keeps requesting it.
50///
51/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
52/// connection, keyboard presses, mouse events, time ticks, etc.
53///
54/// # The Lifetime of a [`Subscription`]
55/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
56/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
57/// in the `subscription` function of an `application` or a `daemon`.
58///
59/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
60/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
61/// [`Stream`] and executing it in an async runtime.
62///
63/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
64/// to build a certain [`Stream`] together with some way to _identify_ it.
65///
66/// Identification is important because when a specific [`Subscription`] stops being returned to the
67/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
68/// [`Subscription`] to keep track of it.
69///
70/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
71/// and whenever necessary.
72///
73/// ```
74/// # mod iced {
75/// # pub mod time {
76/// # pub use iced_futures::backend::default::time::every;
77/// # pub use std::time::{Duration, Instant};
78/// # }
79/// #
80/// # pub use iced_futures::Subscription;
81/// # }
82/// use iced::time::{self, Duration, Instant};
83/// use iced::Subscription;
84///
85/// struct State {
86/// timer_enabled: bool,
87/// }
88///
89/// fn subscription(state: &State) -> Subscription<Instant> {
90/// if state.timer_enabled {
91/// time::every(Duration::from_secs(1))
92/// } else {
93/// Subscription::none()
94/// }
95/// }
96/// ```
97///
98/// [`Future`]: std::future::Future
99#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
100pub struct Subscription<T> {
101 recipes: Vec<Box<dyn Recipe<Output = T>>>,
102}
103
104impl<T> Subscription<T> {
105 /// Returns an empty [`Subscription`] that will not produce any output.
106 pub fn none() -> Self {
107 Self {
108 recipes: Vec::new(),
109 }
110 }
111
112 /// Returns a [`Subscription`] that will call the given function to create and
113 /// asynchronously run the given [`Stream`].
114 ///
115 /// # Creating an asynchronous worker with bidirectional communication
116 /// You can leverage this helper to create a [`Subscription`] that spawns
117 /// an asynchronous worker in the background and establish a channel of
118 /// communication with an `iced` application.
119 ///
120 /// You can achieve this by creating an `mpsc` channel inside the closure
121 /// and returning the `Sender` as a `Message` for the `Application`:
122 ///
123 /// ```
124 /// # mod iced {
125 /// # pub use iced_futures::Subscription;
126 /// # pub use iced_futures::futures;
127 /// # pub use iced_futures::stream;
128 /// # }
129 /// use iced::futures::channel::mpsc;
130 /// use iced::futures::sink::SinkExt;
131 /// use iced::futures::Stream;
132 /// use iced::stream;
133 /// use iced::Subscription;
134 ///
135 /// pub enum Event {
136 /// Ready(mpsc::Sender<Input>),
137 /// WorkFinished,
138 /// // ...
139 /// }
140 ///
141 /// enum Input {
142 /// DoSomeWork,
143 /// // ...
144 /// }
145 ///
146 /// fn some_worker() -> impl Stream<Item = Event> {
147 /// stream::channel(100, async |mut output| {
148 /// // Create channel
149 /// let (sender, mut receiver) = mpsc::channel(100);
150 ///
151 /// // Send the sender back to the application
152 /// output.send(Event::Ready(sender)).await;
153 ///
154 /// loop {
155 /// use iced_futures::futures::StreamExt;
156 ///
157 /// // Read next input sent from `Application`
158 /// let input = receiver.select_next_some().await;
159 ///
160 /// match input {
161 /// Input::DoSomeWork => {
162 /// // Do some async work...
163 ///
164 /// // Finally, we can optionally produce a message to tell the
165 /// // `Application` the work is done
166 /// output.send(Event::WorkFinished).await;
167 /// }
168 /// }
169 /// }
170 /// })
171 /// }
172 ///
173 /// fn subscription() -> Subscription<Event> {
174 /// Subscription::run(some_worker)
175 /// }
176 /// ```
177 ///
178 /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
179 /// connection open.
180 ///
181 /// [`websocket`]: https://github.com/iced-rs/iced/tree/master/examples/websocket
182 pub fn run<S>(builder: fn() -> S) -> Self
183 where
184 S: Stream<Item = T> + MaybeSend + 'static,
185 T: 'static,
186 {
187 from_recipe(Runner {
188 data: builder,
189 spawn: |builder, _| builder(),
190 })
191 }
192
193 /// Returns a [`Subscription`] that will create and asynchronously run the
194 /// given [`Stream`].
195 ///
196 /// Both the `data` and the function pointer will be used to uniquely identify
197 /// the [`Subscription`].
198 pub fn run_with<D, S>(data: D, builder: fn(&D) -> S) -> Self
199 where
200 D: Hash + 'static,
201 S: Stream<Item = T> + MaybeSend + 'static,
202 T: 'static,
203 {
204 from_recipe(Runner {
205 data: (data, builder),
206 spawn: |(data, builder), _| builder(data),
207 })
208 }
209
210 /// Batches all the provided subscriptions and returns the resulting
211 /// [`Subscription`].
212 pub fn batch(subscriptions: impl IntoIterator<Item = Subscription<T>>) -> Self {
213 Self {
214 recipes: subscriptions
215 .into_iter()
216 .flat_map(|subscription| subscription.recipes)
217 .collect(),
218 }
219 }
220
221 /// Adds a value to the [`Subscription`] context.
222 ///
223 /// The value will be part of the identity of a [`Subscription`].
224 pub fn with<A>(self, value: A) -> Subscription<(A, T)>
225 where
226 T: 'static,
227 A: std::hash::Hash + Clone + Send + Sync + 'static,
228 {
229 struct With<A, B> {
230 recipe: Box<dyn Recipe<Output = A>>,
231 value: B,
232 }
233
234 impl<A, B> Recipe for With<A, B>
235 where
236 A: 'static,
237 B: 'static + std::hash::Hash + Clone + Send + Sync,
238 {
239 type Output = (B, A);
240
241 fn hash(&self, state: &mut Hasher) {
242 std::any::TypeId::of::<B>().hash(state);
243 self.value.hash(state);
244 self.recipe.hash(state);
245 }
246
247 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
248 use futures::StreamExt;
249
250 let value = self.value;
251
252 Box::pin(
253 self.recipe
254 .stream(input)
255 .map(move |element| (value.clone(), element)),
256 )
257 }
258 }
259
260 Subscription {
261 recipes: self
262 .recipes
263 .into_iter()
264 .map(|recipe| {
265 Box::new(With {
266 recipe,
267 value: value.clone(),
268 }) as Box<dyn Recipe<Output = (A, T)>>
269 })
270 .collect(),
271 }
272 }
273
274 /// Transforms the [`Subscription`] output with the given function.
275 ///
276 /// The closure provided must be a non-capturing closure.
277 pub fn map<F, A>(self, f: F) -> Subscription<A>
278 where
279 T: 'static,
280 F: Fn(T) -> A + MaybeSend + Clone + 'static,
281 A: 'static,
282 {
283 const {
284 check_zero_sized::<F>();
285 }
286
287 struct Map<A, B, F>
288 where
289 F: Fn(A) -> B + 'static,
290 {
291 recipe: Box<dyn Recipe<Output = A>>,
292 mapper: F,
293 }
294
295 impl<A, B, F> Recipe for Map<A, B, F>
296 where
297 A: 'static,
298 B: 'static,
299 F: Fn(A) -> B + 'static + MaybeSend,
300 {
301 type Output = B;
302
303 fn hash(&self, state: &mut Hasher) {
304 TypeId::of::<F>().hash(state);
305 self.recipe.hash(state);
306 }
307
308 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
309 use futures::StreamExt;
310
311 Box::pin(self.recipe.stream(input).map(self.mapper))
312 }
313 }
314
315 Subscription {
316 recipes: self
317 .recipes
318 .into_iter()
319 .map(|recipe| {
320 Box::new(Map {
321 recipe,
322 mapper: f.clone(),
323 }) as Box<dyn Recipe<Output = A>>
324 })
325 .collect(),
326 }
327 }
328
329 /// Transforms the [`Subscription`] output with the given function, yielding only
330 /// values only when the function returns `Some(A)`.
331 ///
332 /// The closure provided must be a non-capturing closure.
333 pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
334 where
335 T: MaybeSend + 'static,
336 F: Fn(T) -> Option<A> + MaybeSend + Clone + 'static,
337 A: MaybeSend + 'static,
338 {
339 const {
340 check_zero_sized::<F>();
341 }
342
343 struct FilterMap<A, B, F>
344 where
345 F: Fn(A) -> Option<B> + 'static,
346 {
347 recipe: Box<dyn Recipe<Output = A>>,
348 mapper: F,
349 }
350
351 impl<A, B, F> Recipe for FilterMap<A, B, F>
352 where
353 A: 'static,
354 B: 'static + MaybeSend,
355 F: Fn(A) -> Option<B> + MaybeSend,
356 {
357 type Output = B;
358
359 fn hash(&self, state: &mut Hasher) {
360 TypeId::of::<F>().hash(state);
361 self.recipe.hash(state);
362 }
363
364 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
365 use futures::StreamExt;
366 use futures::future;
367
368 let mapper = self.mapper;
369
370 Box::pin(
371 self.recipe
372 .stream(input)
373 .filter_map(move |a| future::ready(mapper(a))),
374 )
375 }
376 }
377
378 Subscription {
379 recipes: self
380 .recipes
381 .drain(..)
382 .map(|recipe| {
383 Box::new(FilterMap {
384 recipe,
385 mapper: f.clone(),
386 }) as Box<dyn Recipe<Output = A>>
387 })
388 .collect(),
389 }
390 }
391
392 /// Returns the amount of recipe units in this [`Subscription`].
393 pub fn units(&self) -> usize {
394 self.recipes.len()
395 }
396}
397
398/// Creates a [`Subscription`] from a [`Recipe`] describing it.
399pub fn from_recipe<T>(recipe: impl Recipe<Output = T> + 'static) -> Subscription<T> {
400 Subscription {
401 recipes: vec![Box::new(recipe)],
402 }
403}
404
405/// Returns the different recipes of the [`Subscription`].
406pub fn into_recipes<T>(subscription: Subscription<T>) -> Vec<Box<dyn Recipe<Output = T>>> {
407 subscription.recipes
408}
409
410impl<T> std::fmt::Debug for Subscription<T> {
411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412 f.debug_struct("Subscription").finish()
413 }
414}
415
416/// The description of a [`Subscription`].
417///
418/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
419/// by runtimes to run and identify subscriptions. You can use it to create your
420/// own!
421pub trait Recipe {
422 /// The events that will be produced by a [`Subscription`] with this
423 /// [`Recipe`].
424 type Output;
425
426 /// Hashes the [`Recipe`].
427 ///
428 /// This is used by runtimes to uniquely identify a [`Subscription`].
429 fn hash(&self, state: &mut Hasher);
430
431 /// Executes the [`Recipe`] and produces the stream of events of its
432 /// [`Subscription`].
433 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
434}
435
436/// Creates a [`Subscription`] from a hashable id and a filter function.
437pub fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
438where
439 I: Hash + 'static,
440 F: Fn(Event) -> Option<T> + MaybeSend + 'static,
441 T: 'static + MaybeSend,
442{
443 from_recipe(Runner {
444 data: id,
445 spawn: |_, events| {
446 use futures::future;
447 use futures::stream::StreamExt;
448
449 events.filter_map(move |event| future::ready(f(event)))
450 },
451 })
452}
453
454struct Runner<I, F, S, T>
455where
456 F: FnOnce(&I, EventStream) -> S,
457 S: Stream<Item = T>,
458{
459 data: I,
460 spawn: F,
461}
462
463impl<I, F, S, T> Recipe for Runner<I, F, S, T>
464where
465 I: Hash + 'static,
466 F: FnOnce(&I, EventStream) -> S,
467 S: Stream<Item = T> + MaybeSend + 'static,
468{
469 type Output = T;
470
471 fn hash(&self, state: &mut Hasher) {
472 std::any::TypeId::of::<I>().hash(state);
473 self.data.hash(state);
474 }
475
476 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
477 crate::boxed_stream((self.spawn)(&self.data, input))
478 }
479}
480
481const fn check_zero_sized<T>() {
482 if std::mem::size_of::<T>() != 0 {
483 panic!(
484 "The Subscription closure provided is not non-capturing. \
485 Closures given to Subscription::map or filter_map cannot \
486 capture external variables. If you need to capture state, \
487 consider using Subscription::with."
488 );
489 }
490}