Skip to main content

zest_core/application/
subscription.rs

1//! Long-lived message sources composed of one or more [`Recipe`]s.
2
3pub mod recipe;
4pub mod slot;
5pub mod subscription_gen;
6
7pub use recipe::Recipe;
8pub use slot::Slot;
9pub use subscription_gen::SubscriptionGen;
10
11use recipe::recipe_id;
12
13use alloc::{boxed::Box, vec::Vec};
14use core::{
15    future::{pending, poll_fn},
16    pin::Pin,
17    task::Poll,
18};
19
20/// A long-running message source built from one or more [`Recipe`]s.
21///
22/// Use [`Subscription::none`] for "no subscription". For active sources:
23/// - [`Subscription::from_recipe`] wraps a single Recipe.
24/// - [`Subscription::batch`] combines multiple Subscriptions into one
25///   whose recipes fire concurrently.
26///
27/// The runtime refreshes subscriptions after every processed message and
28/// diffs slot-by-slot by Recipe identity: recipes whose identity matches
29/// across refreshes keep their pending futures intact; new recipes spawn;
30/// missing recipes are cancelled.
31pub struct Subscription<M: Clone + 'static> {
32    pub(crate) slots: Vec<Slot<M>>,
33}
34
35impl<M: Clone + 'static> Subscription<M> {
36    /// No subscription. Empty `slots`.
37    #[must_use]
38    pub fn none() -> Self {
39        Self { slots: Vec::new() }
40    }
41
42    /// Build a subscription from a single [`Recipe`]. Identity =
43    /// `TypeId::of::<R>()` ⊕ `Hash` of `recipe`.
44    #[must_use]
45    pub fn from_recipe<R>(mut recipe: R) -> Self
46    where
47        R: Recipe<Message = M>,
48    {
49        let id = recipe_id(&recipe);
50        let spawn: SubscriptionGen<M> = Box::new(move || recipe.next());
51        let mut slots = Vec::with_capacity(1);
52        slots.push(Slot {
53            id,
54            spawn: Some(spawn),
55            pending: None,
56        });
57        Self { slots }
58    }
59
60    /// Combine multiple subscriptions; each underlying recipe keeps its
61    /// individual identity. Nested batches flatten.
62    #[must_use]
63    pub fn batch(subs: impl IntoIterator<Item = Subscription<M>>) -> Self {
64        let mut slots = Vec::new();
65        for sub in subs {
66            slots.extend(sub.slots);
67        }
68        Self { slots }
69    }
70
71    /// Wait for the next message from any active recipe. Lazy-spawns
72    /// each slot's pending future on first poll. If empty (or all
73    /// recipes exhausted), pends forever — safe to use as a `select`
74    /// arm.
75    pub(crate) async fn next(&mut self) -> M {
76        if self.slots.is_empty() {
77            return pending().await;
78        }
79
80        poll_fn(|cx| {
81            for slot in &mut self.slots {
82                if slot.pending.is_none() {
83                    let Some(spawn_fn) = slot.spawn.as_mut() else {
84                        continue;
85                    };
86                    slot.pending = spawn_fn();
87                    if slot.pending.is_none() {
88                        slot.spawn = None; // recipe exhausted; mark dead
89                        continue;
90                    }
91                }
92                let fut = slot.pending.as_mut().expect("just spawned above");
93                if let Poll::Ready(msg) = Pin::as_mut(fut).poll(cx) {
94                    slot.pending = None;
95                    return Poll::Ready(msg);
96                }
97            }
98            Poll::Pending
99        })
100        .await
101    }
102
103    /// Diff `new` against `self` slot-by-slot. Slots with matching ids
104    /// keep their pending futures intact; slots only in `new` start
105    /// fresh; slots only in `self` are dropped (cancelled).
106    pub(crate) fn refresh(&mut self, new: Subscription<M>) {
107        let mut new_slots = new.slots;
108        for new_slot in new_slots.iter_mut() {
109            if let Some(pos) = self.slots.iter().position(|slot| slot.id == new_slot.id) {
110                let old = self.slots.swap_remove(pos);
111                new_slot.pending = old.pending;
112            }
113        }
114        // Anything left in self.slots had no match in new; dropped.
115        self.slots = new_slots;
116    }
117}
118
119impl<M: Clone + 'static> Default for Subscription<M> {
120    fn default() -> Self {
121        Self::none()
122    }
123}