zest_core/application/subscription.rs
1//! Long-lived message sources composed of one or more [`Recipe`]s.
2
3pub mod recipe;
4pub mod slot;
5pub mod subscription_gen;
6
7pub use recipe::Recipe;
8pub use slot::Slot;
9pub use subscription_gen::SubscriptionGen;
10
11use recipe::recipe_id;
12
13use alloc::{boxed::Box, vec::Vec};
14use core::{
15 future::{pending, poll_fn},
16 pin::Pin,
17 task::Poll,
18};
19
20/// A long-running message source built from one or more [`Recipe`]s.
21///
22/// Use [`Subscription::none`] for "no subscription". For active sources:
23/// - [`Subscription::from_recipe`] wraps a single Recipe.
24/// - [`Subscription::batch`] combines multiple Subscriptions into one
25/// whose recipes fire concurrently.
26///
27/// The runtime refreshes subscriptions after every processed message and
28/// diffs slot-by-slot by Recipe identity: recipes whose identity matches
29/// across refreshes keep their pending futures intact; new recipes spawn;
30/// missing recipes are cancelled.
31pub struct Subscription<M: Clone + 'static> {
32 pub(crate) slots: Vec<Slot<M>>,
33}
34
35impl<M: Clone + 'static> Subscription<M> {
36 /// No subscription. Empty `slots`.
37 #[must_use]
38 pub fn none() -> Self {
39 Self { slots: Vec::new() }
40 }
41
42 /// Build a subscription from a single [`Recipe`]. Identity =
43 /// `TypeId::of::<R>()` ⊕ `Hash` of `recipe`.
44 #[must_use]
45 pub fn from_recipe<R>(mut recipe: R) -> Self
46 where
47 R: Recipe<Message = M>,
48 {
49 let id = recipe_id(&recipe);
50 let spawn: SubscriptionGen<M> = Box::new(move || recipe.next());
51 let mut slots = Vec::with_capacity(1);
52 slots.push(Slot {
53 id,
54 spawn: Some(spawn),
55 pending: None,
56 });
57 Self { slots }
58 }
59
60 /// Combine multiple subscriptions; each underlying recipe keeps its
61 /// individual identity. Nested batches flatten.
62 #[must_use]
63 pub fn batch(subs: impl IntoIterator<Item = Subscription<M>>) -> Self {
64 let mut slots = Vec::new();
65 for sub in subs {
66 slots.extend(sub.slots);
67 }
68 Self { slots }
69 }
70
71 /// Wait for the next message from any active recipe. Lazy-spawns
72 /// each slot's pending future on first poll. If empty (or all
73 /// recipes exhausted), pends forever — safe to use as a `select`
74 /// arm.
75 pub(crate) async fn next(&mut self) -> M {
76 if self.slots.is_empty() {
77 return pending().await;
78 }
79
80 poll_fn(|cx| {
81 for slot in &mut self.slots {
82 if slot.pending.is_none() {
83 let Some(spawn_fn) = slot.spawn.as_mut() else {
84 continue;
85 };
86 slot.pending = spawn_fn();
87 if slot.pending.is_none() {
88 slot.spawn = None; // recipe exhausted; mark dead
89 continue;
90 }
91 }
92 let fut = slot.pending.as_mut().expect("just spawned above");
93 if let Poll::Ready(msg) = Pin::as_mut(fut).poll(cx) {
94 slot.pending = None;
95 return Poll::Ready(msg);
96 }
97 }
98 Poll::Pending
99 })
100 .await
101 }
102
103 /// Diff `new` against `self` slot-by-slot. Slots with matching ids
104 /// keep their pending futures intact; slots only in `new` start
105 /// fresh; slots only in `self` are dropped (cancelled).
106 pub(crate) fn refresh(&mut self, new: Subscription<M>) {
107 let mut new_slots = new.slots;
108 for new_slot in new_slots.iter_mut() {
109 if let Some(pos) = self.slots.iter().position(|slot| slot.id == new_slot.id) {
110 let old = self.slots.swap_remove(pos);
111 new_slot.pending = old.pending;
112 }
113 }
114 // Anything left in self.slots had no match in new; dropped.
115 self.slots = new_slots;
116 }
117}
118
119impl<M: Clone + 'static> Default for Subscription<M> {
120 fn default() -> Self {
121 Self::none()
122 }
123}