hyphae/map_query/share.rs
1//! Clone-able multicast handle that subscribes a [`MapQuery`] upstream lazily
2//! on the first downstream install, then fans diffs out to N consumers.
3//!
4//! Mirrors [`crate::pipeline::SharedPipeline`] for value pipelines, but the
5//! signal type is [`MapDiff`] and the upstream may produce multiple
6//! [`SubscriptionGuard`]s (one per root source).
7//!
8//! # Hot-path emission cost
9//!
10//! Each upstream diff costs:
11//! * one Mutex-acquire to apply the diff to a small in-memory snapshot
12//! (needed so late-binding subscribers can be replayed an Initial), and
13//! * one [`ArcSwap::load`] to pull the subscriber list, plus N callback
14//! invocations to fan the diff out.
15//!
16//! No mutex on the subscriber list during fanout, no per-emission allocation
17//! beyond what the in-place state mutation already needs. Compared to
18//! materialize-then-clone-cell — where each shared consumer would walk the
19//! full per-key cell + diffs_cell pipeline — this saves the per-share-point
20//! `CellMap` allocation and most of its wiring.
21//!
22//! # Lifecycle
23//!
24//! On the first downstream install, the share point consumes the wrapped
25//! upstream plan and installs once. The returned upstream guards are kept
26//! until the last shared subscriber drops, at which point they are released.
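//! The upstream plan was consumed on first install, so a fully drained
//! `SharedMapQuery` does not reactivate: an install that arrives after the
//! drain is only replayed the last snapshot and is not attached to a new
//! upstream subscription. The upstream guards are released when the last
//! subscriber drops, regardless of how many handles are still alive — keep
//! at least one installed consumer (for example one materialized leaf) for
//! as long as you want the shared work to run.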
30
31use std::{
32 collections::HashMap,
33 hash::Hash,
34 sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use uuid::Uuid;
39
40use crate::{
41 cell_map::MapDiff,
42 map_query::{MapDiffSink, MapQuery, MapQueryInstall},
43 subscription::SubscriptionGuard,
44 traits::CellValue,
45};
46
/// Callback stored in the subscriber registry: a shared, thread-safe closure
/// that receives each diff by reference. `install` registers the
/// [`MapDiffSink`] it is handed directly as a `DiffSubscriber`.
type DiffSubscriber<K, V> = Arc<dyn Fn(&MapDiff<K, V>) + Send + Sync>;
48
/// Type-erased one-shot installer for the wrapped upstream plan.
///
/// `SharedMapQuery::new` builds it as a closure that moves the wrapped query
/// in and calls its `install` with the supplied sink, returning the upstream
/// subscription guards. Because that `install` consumes the query, the
/// closure is an `FnOnce`; it is kept in an `Option` slot and taken on the
/// first downstream install.
type UpstreamInstall<K, V> =
    Box<dyn FnOnce(MapDiffSink<K, V>) -> Vec<SubscriptionGuard> + Send + Sync>;
54
/// State shared by every clone of a [`SharedMapQuery`] handle.
///
/// Holds the not-yet-installed upstream plan, the guards returned once it has
/// been installed, a snapshot of the current key/value state used to replay
/// an `Initial` diff to late subscribers, and the subscriber registry the
/// fanout closure iterates on every emission.
pub(crate) struct SharedMapQueryInner<K, V>
where
    K: CellValue + Hash + Eq,
    V: CellValue,
{
    /// One-shot upstream installer. Taken on the first downstream install;
    /// `None` thereafter. Drained share points do not reactivate.
    upstream: Mutex<Option<UpstreamInstall<K, V>>>,
    /// Subscriptions held on the upstream plan once it has been installed.
    /// Taken out and dropped when the last shared subscriber is removed.
    upstream_guards: Mutex<Vec<SubscriptionGuard>>,
    /// In-memory snapshot of the joined / projected state. Every diff is
    /// applied to it before fanout so a late-binding subscriber can be
    /// replayed an `Initial` without re-running the upstream plan.
    state: Mutex<HashMap<K, V>>,
    /// Subscriber registry. Emission reads it with `.load()` and never takes
    /// `subs_writer`; subscribe / unsubscribe build a new vector and swap it
    /// in while holding `subs_writer`.
    subscribers: ArcSwap<Vec<(Uuid, DiffSubscriber<K, V>)>>,
    /// Serializes mutation of `subscribers`. Emission readers do not acquire.
    subs_writer: Mutex<()>,
}
76
77impl<K, V> SharedMapQueryInner<K, V>
78where
79 K: CellValue + Hash + Eq,
80 V: CellValue,
81{
82 fn add_subscriber(&self, id: Uuid, cb: DiffSubscriber<K, V>) {
83 let _w = self.subs_writer.lock().expect("share subs_writer poisoned");
84 let current = self.subscribers.load();
85 let mut next = (**current).clone();
86 next.push((id, cb));
87 self.subscribers.store(Arc::new(next));
88 }
89
90 /// Returns the remaining subscriber count after removal.
91 fn remove_subscriber(&self, id: Uuid) -> usize {
92 let _w = self.subs_writer.lock().expect("share subs_writer poisoned");
93 let current = self.subscribers.load();
94 let mut next: Vec<(Uuid, DiffSubscriber<K, V>)> = (**current)
95 .iter()
96 .filter(|(i, _)| *i != id)
97 .cloned()
98 .collect();
99 let remaining = next.len();
100 next.shrink_to_fit();
101 self.subscribers.store(Arc::new(next));
102 remaining
103 }
104
105 /// Apply a diff to the in-memory snapshot. Called from the fanout closure
106 /// before downstream callbacks fire so that any concurrent late-binding
107 /// install sees a consistent state.
108 fn apply_diff(&self, diff: &MapDiff<K, V>) {
109 let mut state = self.state.lock().expect("share state poisoned");
110 Self::apply_diff_into(&mut state, diff);
111 }
112
113 fn apply_diff_into(state: &mut HashMap<K, V>, diff: &MapDiff<K, V>) {
114 match diff {
115 MapDiff::Initial { entries } => {
116 state.clear();
117 state.reserve(entries.len());
118 for (k, v) in entries {
119 state.insert(k.clone(), v.clone());
120 }
121 }
122 MapDiff::Insert { key, value } => {
123 state.insert(key.clone(), value.clone());
124 }
125 MapDiff::Remove { key, .. } => {
126 state.remove(key);
127 }
128 MapDiff::Update { key, new_value, .. } => {
129 state.insert(key.clone(), new_value.clone());
130 }
131 MapDiff::Batch { changes } => {
132 for c in changes {
133 Self::apply_diff_into(state, c);
134 }
135 }
136 }
137 }
138
139 fn snapshot_initial(&self) -> MapDiff<K, V> {
140 let state = self.state.lock().expect("share state poisoned");
141 let entries: Vec<(K, V)> = state.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
142 MapDiff::Initial { entries }
143 }
144}
145
/// Clone-able multicast handle for fanning a [`MapQuery`] out to many
/// consumers without an intermediate [`crate::CellMap`].
///
/// Cloning is an `Arc` bump and does not subscribe upstream. Subscription
/// happens lazily on the first downstream `materialize()` (or other install).
/// Once installed, the upstream subscription stays live until the last
/// shared subscriber drops; the upstream plan is consumed by that first
/// install, so it is not installed again afterwards.
pub struct SharedMapQuery<K, V>
where
    K: CellValue + Hash + Eq,
    V: CellValue,
{
    /// State shared by all clones of this handle.
    inner: Arc<SharedMapQueryInner<K, V>>,
}
160
161impl<K, V> Clone for SharedMapQuery<K, V>
162where
163 K: CellValue + Hash + Eq,
164 V: CellValue,
165{
166 fn clone(&self) -> Self {
167 Self {
168 inner: Arc::clone(&self.inner),
169 }
170 }
171}
172
173impl<K, V> SharedMapQuery<K, V>
174where
175 K: CellValue + Hash + Eq,
176 V: CellValue,
177{
178 /// Wrap a map query in a shared, multicast handle.
179 ///
180 /// Prefer the [`MapQueryShareExt::share`] extension method — it reads as
181 /// `query.share()` at the call site.
182 pub fn new<Q: MapQuery<K, V>>(q: Q) -> Self {
183 let upstream: UpstreamInstall<K, V> = Box::new(move |sink| q.install(sink));
184 Self {
185 inner: Arc::new(SharedMapQueryInner {
186 upstream: Mutex::new(Some(upstream)),
187 upstream_guards: Mutex::new(Vec::new()),
188 state: Mutex::new(HashMap::new()),
189 subscribers: ArcSwap::from_pointee(Vec::new()),
190 subs_writer: Mutex::new(()),
191 }),
192 }
193 }
194}
195
196impl<K, V> MapQueryInstall<K, V> for SharedMapQuery<K, V>
197where
198 K: CellValue + Hash + Eq,
199 V: CellValue,
200{
201 fn install(self, sink: MapDiffSink<K, V>) -> Vec<SubscriptionGuard> {
202 let id = Uuid::new_v4();
203
204 // Decide whether this is the first install. If the upstream slot is
205 // still populated, we will:
206 // 1. Register `sink` first so the synchronous Initial diff emitted
207 // by upstream during install reaches it.
208 // 2. Take and run the upstream installer.
209 // 3. Stash the returned guards.
210 //
211 // If the upstream has already been installed by an earlier consumer,
212 // we instead:
213 // 1. Register `sink`.
214 // 2. Synthesize an Initial from our state snapshot and deliver it
215 // to the new `sink` so it sees a coherent starting state.
216 let upstream_take = {
217 let mut slot = self.inner.upstream.lock().expect("share upstream poisoned");
218 slot.take()
219 };
220
221 if let Some(install_fn) = upstream_take {
222 self.inner.add_subscriber(id, sink);
223 let weak = Arc::downgrade(&self.inner);
224 let fanout: MapDiffSink<K, V> = Arc::new(move |diff: &MapDiff<K, V>| {
225 let Some(inner) = weak.upgrade() else {
226 return;
227 };
228 // 1. Update internal state under its own Mutex.
229 inner.apply_diff(diff);
230 // 2. Lock-free fanout to the subscriber snapshot.
231 let subs = inner.subscribers.load();
232 for (_, cb) in subs.iter() {
233 cb(diff);
234 }
235 });
236 let guards = install_fn(fanout);
237 let mut slot = self
238 .inner
239 .upstream_guards
240 .lock()
241 .expect("share upstream_guards poisoned");
242 slot.extend(guards);
243 } else {
244 // Replay current state to the late-binding subscriber, then
245 // register so it picks up subsequent diffs. Order matters: if we
246 // registered first, an upstream emission landing between
247 // registration and our manual Initial would arrive before the
248 // Initial — out-of-order.
249 //
250 // Synthesize the Initial under the state lock so a concurrent
251 // upstream emission cannot mutate state while we read it.
252 let initial = self.inner.snapshot_initial();
253 sink(&initial);
254 self.inner.add_subscriber(id, sink);
255 }
256
257 // Guard whose Drop removes this subscriber and, if last, releases all
258 // upstream guards.
259 let weak = Arc::downgrade(&self.inner);
260 vec![SubscriptionGuard::from_callback(move || {
261 let Some(inner) = weak.upgrade() else {
262 return;
263 };
264 let remaining = inner.remove_subscriber(id);
265 if remaining == 0 {
266 // Drain upstream guards outside any other lock: dropping a
267 // guard may itself acquire locks.
268 let drained: Vec<SubscriptionGuard> = {
269 let mut slot = inner
270 .upstream_guards
271 .lock()
272 .expect("share upstream_guards poisoned");
273 std::mem::take(&mut *slot)
274 };
275 drop(drained);
276 }
277 })]
278 }
279}
280
// NOTE(review): the reason for `private_bounds` is not visible in this file —
// presumably `MapQuery` has a crate-private supertrait or bound; confirm and
// record the reason here.
#[allow(private_bounds)]
impl<K, V> MapQuery<K, V> for SharedMapQuery<K, V>
where
    K: CellValue + Hash + Eq,
    V: CellValue,
{
    // Intentionally empty: no trait items are overridden.
    //
    // Default `materialize` is correct: it allocates a CellMap that subscribes
    // through `install()` above. Each materialized leaf adds one fan-out
    // subscriber; the share point's upstream plan installs exactly once.
}
291
/// Extension trait that adds [`share`](MapQueryShareExt::share) to any
/// [`MapQuery`].
///
/// `share()` consumes the query and returns a Clone-able [`SharedMapQuery`]
/// handle. Each clone of the handle, when materialized (or otherwise
/// installed), adds one fan-out subscriber but does NOT add another upstream
/// subscription — the share point installs upstream exactly once.
pub trait MapQueryShareExt<K, V>: MapQuery<K, V>
where
    K: CellValue + Hash + Eq,
    V: CellValue,
{
    /// Convert this map query into a Clone-able multicast handle.
    ///
    /// Equivalent to [`SharedMapQuery::new`]: the query is only wrapped, not
    /// installed, so calling `share()` does not by itself subscribe upstream.
    fn share(self) -> SharedMapQuery<K, V> {
        SharedMapQuery::new(self)
    }
}
309
/// Blanket implementation: every type implementing [`MapQuery<K, V>`] gets
/// `share()` through the trait's default method; nothing is overridden.
impl<K, V, Q> MapQueryShareExt<K, V> for Q
where
    K: CellValue + Hash + Eq,
    V: CellValue,
    Q: MapQuery<K, V>,
{
}