Skip to main content

hyphae/map_query/
share.rs

1//! Clone-able multicast handle that subscribes a [`MapQuery`] upstream lazily
2//! on the first downstream install, then fans diffs out to N consumers.
3//!
4//! Mirrors [`crate::pipeline::SharedPipeline`] for value pipelines, but the
5//! signal type is [`MapDiff`] and the upstream may produce multiple
6//! [`SubscriptionGuard`]s (one per root source).
7//!
8//! # Hot-path emission cost
9//!
10//! Each upstream diff costs:
11//!  * one Mutex-acquire to apply the diff to a small in-memory snapshot
12//!    (needed so late-binding subscribers can be replayed an Initial), and
13//!  * one [`ArcSwap::load`] to pull the subscriber list, plus N callback
14//!    invocations to fan the diff out.
15//!
16//! No mutex on the subscriber list during fanout, no per-emission allocation
17//! beyond what the in-place state mutation already needs. Compared to
18//! materialize-then-clone-cell — where each shared consumer would walk the
19//! full per-key cell + diffs_cell pipeline — this saves the per-share-point
20//! `CellMap` allocation and most of its wiring.
21//!
22//! # Lifecycle
23//!
24//! On the first downstream install, the share point consumes the wrapped
25//! upstream plan and installs once. The returned upstream guards are kept
26//! until the last shared subscriber drops, at which point they are released.
27//! The upstream plan was consumed on first install, so a fully drained
28//! `SharedMapQuery` does not reactivate — hold the handle (or one
29//! materialized leaf) for as long as you want the shared work to run.
30
31use std::{
32    collections::HashMap,
33    hash::Hash,
34    sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use uuid::Uuid;
39
40use crate::{
41    cell_map::MapDiff,
42    map_query::{MapDiffSink, MapQuery, MapQueryInstall},
43    subscription::SubscriptionGuard,
44    traits::CellValue,
45};
46
47type DiffSubscriber<K, V> = Arc<dyn Fn(&MapDiff<K, V>) + Send + Sync>;
48
49/// Type-erased one-shot installer for the wrapped upstream plan. `MapQuery`'s
50/// `install` consumes `self`, so we wrap a one-shot `FnOnce` in a slot and
51/// take it on the first downstream install.
52type UpstreamInstall<K, V> =
53    Box<dyn FnOnce(MapDiffSink<K, V>) -> Vec<SubscriptionGuard> + Send + Sync>;
54
55pub(crate) struct SharedMapQueryInner<K, V>
56where
57    K: CellValue + Hash + Eq,
58    V: CellValue,
59{
60    /// One-shot upstream installer. Taken on the first downstream install;
61    /// `None` thereafter. Drained share points do not reactivate.
62    upstream: Mutex<Option<UpstreamInstall<K, V>>>,
63    /// Held subscriptions on the upstream plan once installed. Released when
64    /// the last shared subscriber drops.
65    upstream_guards: Mutex<Vec<SubscriptionGuard>>,
66    /// In-memory snapshot of the joined / projected state. Applied on every
67    /// diff so a late-binding subscriber can be replayed an Initial without
68    /// re-running the upstream plan.
69    state: Mutex<HashMap<K, V>>,
70    /// Lock-free subscriber registry. Read on every emission (`.load()`),
71    /// rebuilt on subscribe / unsubscribe under the writer mutex.
72    subscribers: ArcSwap<Vec<(Uuid, DiffSubscriber<K, V>)>>,
73    /// Serializes mutation of `subscribers`. Emission readers do not acquire.
74    subs_writer: Mutex<()>,
75}
76
77impl<K, V> SharedMapQueryInner<K, V>
78where
79    K: CellValue + Hash + Eq,
80    V: CellValue,
81{
82    fn add_subscriber(&self, id: Uuid, cb: DiffSubscriber<K, V>) {
83        let _w = self.subs_writer.lock().expect("share subs_writer poisoned");
84        let current = self.subscribers.load();
85        let mut next = (**current).clone();
86        next.push((id, cb));
87        self.subscribers.store(Arc::new(next));
88    }
89
90    /// Returns the remaining subscriber count after removal.
91    fn remove_subscriber(&self, id: Uuid) -> usize {
92        let _w = self.subs_writer.lock().expect("share subs_writer poisoned");
93        let current = self.subscribers.load();
94        let mut next: Vec<(Uuid, DiffSubscriber<K, V>)> = (**current)
95            .iter()
96            .filter(|(i, _)| *i != id)
97            .cloned()
98            .collect();
99        let remaining = next.len();
100        next.shrink_to_fit();
101        self.subscribers.store(Arc::new(next));
102        remaining
103    }
104
105    /// Apply a diff to the in-memory snapshot. Called from the fanout closure
106    /// before downstream callbacks fire so that any concurrent late-binding
107    /// install sees a consistent state.
108    fn apply_diff(&self, diff: &MapDiff<K, V>) {
109        let mut state = self.state.lock().expect("share state poisoned");
110        Self::apply_diff_into(&mut state, diff);
111    }
112
113    fn apply_diff_into(state: &mut HashMap<K, V>, diff: &MapDiff<K, V>) {
114        match diff {
115            MapDiff::Initial { entries } => {
116                state.clear();
117                state.reserve(entries.len());
118                for (k, v) in entries {
119                    state.insert(k.clone(), v.clone());
120                }
121            }
122            MapDiff::Insert { key, value } => {
123                state.insert(key.clone(), value.clone());
124            }
125            MapDiff::Remove { key, .. } => {
126                state.remove(key);
127            }
128            MapDiff::Update { key, new_value, .. } => {
129                state.insert(key.clone(), new_value.clone());
130            }
131            MapDiff::Batch { changes } => {
132                for c in changes {
133                    Self::apply_diff_into(state, c);
134                }
135            }
136        }
137    }
138
139    fn snapshot_initial(&self) -> MapDiff<K, V> {
140        let state = self.state.lock().expect("share state poisoned");
141        let entries: Vec<(K, V)> = state.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
142        MapDiff::Initial { entries }
143    }
144}
145
146/// Clone-able multicast handle for fanning a [`MapQuery`] out to many
147/// consumers without an intermediate [`crate::CellMap`].
148///
149/// Cloning is an `Arc` bump and does not subscribe upstream. Subscription
150/// happens lazily on the first downstream `materialize()` (or other install).
151/// Once installed, the upstream subscription stays live until the last
152/// shared subscriber drops.
153pub struct SharedMapQuery<K, V>
154where
155    K: CellValue + Hash + Eq,
156    V: CellValue,
157{
158    inner: Arc<SharedMapQueryInner<K, V>>,
159}
160
161impl<K, V> Clone for SharedMapQuery<K, V>
162where
163    K: CellValue + Hash + Eq,
164    V: CellValue,
165{
166    fn clone(&self) -> Self {
167        Self {
168            inner: Arc::clone(&self.inner),
169        }
170    }
171}
172
173impl<K, V> SharedMapQuery<K, V>
174where
175    K: CellValue + Hash + Eq,
176    V: CellValue,
177{
178    /// Wrap a map query in a shared, multicast handle.
179    ///
180    /// Prefer the [`MapQueryShareExt::share`] extension method — it reads as
181    /// `query.share()` at the call site.
182    pub fn new<Q: MapQuery<K, V>>(q: Q) -> Self {
183        let upstream: UpstreamInstall<K, V> = Box::new(move |sink| q.install(sink));
184        Self {
185            inner: Arc::new(SharedMapQueryInner {
186                upstream: Mutex::new(Some(upstream)),
187                upstream_guards: Mutex::new(Vec::new()),
188                state: Mutex::new(HashMap::new()),
189                subscribers: ArcSwap::from_pointee(Vec::new()),
190                subs_writer: Mutex::new(()),
191            }),
192        }
193    }
194}
195
196impl<K, V> MapQueryInstall<K, V> for SharedMapQuery<K, V>
197where
198    K: CellValue + Hash + Eq,
199    V: CellValue,
200{
201    fn install(self, sink: MapDiffSink<K, V>) -> Vec<SubscriptionGuard> {
202        let id = Uuid::new_v4();
203
204        // Decide whether this is the first install. If the upstream slot is
205        // still populated, we will:
206        //   1. Register `sink` first so the synchronous Initial diff emitted
207        //      by upstream during install reaches it.
208        //   2. Take and run the upstream installer.
209        //   3. Stash the returned guards.
210        //
211        // If the upstream has already been installed by an earlier consumer,
212        // we instead:
213        //   1. Register `sink`.
214        //   2. Synthesize an Initial from our state snapshot and deliver it
215        //      to the new `sink` so it sees a coherent starting state.
216        let upstream_take = {
217            let mut slot = self.inner.upstream.lock().expect("share upstream poisoned");
218            slot.take()
219        };
220
221        if let Some(install_fn) = upstream_take {
222            self.inner.add_subscriber(id, sink);
223            let weak = Arc::downgrade(&self.inner);
224            let fanout: MapDiffSink<K, V> = Arc::new(move |diff: &MapDiff<K, V>| {
225                let Some(inner) = weak.upgrade() else {
226                    return;
227                };
228                // 1. Update internal state under its own Mutex.
229                inner.apply_diff(diff);
230                // 2. Lock-free fanout to the subscriber snapshot.
231                let subs = inner.subscribers.load();
232                for (_, cb) in subs.iter() {
233                    cb(diff);
234                }
235            });
236            let guards = install_fn(fanout);
237            let mut slot = self
238                .inner
239                .upstream_guards
240                .lock()
241                .expect("share upstream_guards poisoned");
242            slot.extend(guards);
243        } else {
244            // Replay current state to the late-binding subscriber, then
245            // register so it picks up subsequent diffs. Order matters: if we
246            // registered first, an upstream emission landing between
247            // registration and our manual Initial would arrive before the
248            // Initial — out-of-order.
249            //
250            // Synthesize the Initial under the state lock so a concurrent
251            // upstream emission cannot mutate state while we read it.
252            let initial = self.inner.snapshot_initial();
253            sink(&initial);
254            self.inner.add_subscriber(id, sink);
255        }
256
257        // Guard whose Drop removes this subscriber and, if last, releases all
258        // upstream guards.
259        let weak = Arc::downgrade(&self.inner);
260        vec![SubscriptionGuard::from_callback(move || {
261            let Some(inner) = weak.upgrade() else {
262                return;
263            };
264            let remaining = inner.remove_subscriber(id);
265            if remaining == 0 {
266                // Drain upstream guards outside any other lock: dropping a
267                // guard may itself acquire locks.
268                let drained: Vec<SubscriptionGuard> = {
269                    let mut slot = inner
270                        .upstream_guards
271                        .lock()
272                        .expect("share upstream_guards poisoned");
273                    std::mem::take(&mut *slot)
274                };
275                drop(drained);
276            }
277        })]
278    }
279}
280
281#[allow(private_bounds)]
282impl<K, V> MapQuery<K, V> for SharedMapQuery<K, V>
283where
284    K: CellValue + Hash + Eq,
285    V: CellValue,
286{
287    // Default `materialize` is correct: it allocates a CellMap that subscribes
288    // through `install()` above. Each materialized leaf adds one fan-out
289    // subscriber; the share point's upstream plan installs exactly once.
290}
291
292/// Extension trait that adds [`share`](MapQueryShareExt::share) to any
293/// [`MapQuery`].
294///
295/// `share()` consumes the query and returns a Clone-able [`SharedMapQuery`]
296/// handle. Each clone of the handle, when materialized (or otherwise
297/// installed), adds one fan-out subscriber but does NOT add another upstream
298/// subscription — the share point installs upstream exactly once.
299pub trait MapQueryShareExt<K, V>: MapQuery<K, V>
300where
301    K: CellValue + Hash + Eq,
302    V: CellValue,
303{
304    /// Convert this map query into a Clone-able multicast handle.
305    fn share(self) -> SharedMapQuery<K, V> {
306        SharedMapQuery::new(self)
307    }
308}
309
310impl<K, V, Q> MapQueryShareExt<K, V> for Q
311where
312    K: CellValue + Hash + Eq,
313    V: CellValue,
314    Q: MapQuery<K, V>,
315{
316}