Skip to main content

palimpsest_dataflow/palimpsest/
materialization.rs

//! Partial materialization key-set tracking.
//!
//! When an arrangement is materialized partially — only for the subset of
//! keys that have been requested — consumers must know whether a key is
//! already covered. If it is not, an *upquery* is issued to fetch the missing
//! rows from the upstream system before the consumer can serve a result.
//!
//! This module owns the key-set bookkeeping. The actual upquery dispatch
//! lives in `crate::palimpsest::upquery`.
10
11use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};
12
/// Where a key currently stands relative to a materialized arrangement.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum KeyStatus {
    /// The key is fully materialized, so the consumer can serve it immediately.
    Materialized,
    /// An upquery for the key has been issued and has not completed yet.
    Pending,
    /// The key is absent from the arrangement and no upquery has been issued.
    Missing,
}
23
/// What a single probe of the tracker produced for a key.
///
/// Every variant carries the probed key back to the caller.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LookupOutcome<K>
where
    K: Ord + Clone,
{
    /// The key is already materialized and may be read right away.
    Hit(K),
    /// The probe just enqueued an upquery; the caller must wait for it to land.
    Upquery(K),
    /// An upquery for the key was already pending — wait, do not re-issue.
    AlreadyPending(K),
}

impl<K> LookupOutcome<K>
where
    K: Ord + Clone,
{
    /// Borrows the probed key, whichever variant holds it.
    #[must_use]
    pub const fn key(&self) -> &K {
        // Each variant wraps exactly one key, so one irrefutable or-pattern
        // covers the whole enum.
        let (Self::Hit(inner) | Self::Upquery(inner) | Self::AlreadyPending(inner)) = self;
        inner
    }
}
50
51/// Tracks which keys are materialized (and which are pending an upquery).
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct MaterializedKeys<K>
54where
55    K: Ord + Clone,
56{
57    materialized: BTreeSet<K>,
58    pending: BTreeSet<K>,
59    pending_waiters: BTreeMap<K, usize>,
60}
61
62impl<K> Default for MaterializedKeys<K>
63where
64    K: Ord + Clone,
65{
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl<K> MaterializedKeys<K>
72where
73    K: Ord + Clone,
74{
75    /// Creates an empty tracker.
76    #[must_use]
77    pub const fn new() -> Self {
78        Self {
79            materialized: BTreeSet::new(),
80            pending: BTreeSet::new(),
81            pending_waiters: BTreeMap::new(),
82        }
83    }
84
85    /// Reports a key's status without mutating the tracker.
86    #[must_use]
87    pub fn status(&self, key: &K) -> KeyStatus {
88        if self.materialized.contains(key) {
89            KeyStatus::Materialized
90        } else if self.pending.contains(key) {
91            KeyStatus::Pending
92        } else {
93            KeyStatus::Missing
94        }
95    }
96
97    /// Probes the tracker for `key`. Missing keys are transitioned to pending and
98    /// an `Upquery` outcome is returned so the caller can dispatch the upquery.
99    pub fn lookup(&mut self, key: K) -> LookupOutcome<K> {
100        if self.materialized.contains(&key) {
101            return LookupOutcome::Hit(key);
102        }
103        if self.pending.contains(&key) {
104            *self.pending_waiters.entry(key.clone()).or_insert(0) += 1;
105            return LookupOutcome::AlreadyPending(key);
106        }
107        self.pending.insert(key.clone());
108        self.pending_waiters.insert(key.clone(), 1);
109        LookupOutcome::Upquery(key)
110    }
111
112    /// Marks `key` as fully materialized. Returns the number of waiters that
113    /// had been blocked on the upquery (so callers can wake them).
114    pub fn mark_materialized(&mut self, key: K) -> usize {
115        let waiters = self.pending_waiters.remove(&key).unwrap_or(0);
116        self.pending.remove(&key);
117        self.materialized.insert(key);
118        waiters
119    }
120
121    /// Bulk-marks several keys as materialized; returns total waiter count.
122    pub fn mark_many_materialized<I>(&mut self, keys: I) -> usize
123    where
124        I: IntoIterator<Item = K>,
125    {
126        keys.into_iter()
127            .map(|key| self.mark_materialized(key))
128            .sum()
129    }
130
131    /// Drops `key` from the materialized set (for example, after eviction).
132    pub fn forget(&mut self, key: &K) -> bool {
133        self.materialized.remove(key)
134    }
135
136    /// Aborts a pending upquery for `key`, transitioning it back to `Missing`.
137    pub fn fail_pending(&mut self, key: &K) -> usize {
138        let waiters = self.pending_waiters.remove(key).unwrap_or(0);
139        self.pending.remove(key);
140        waiters
141    }
142
143    /// Returns the materialized key set.
144    #[must_use]
145    pub const fn materialized(&self) -> &BTreeSet<K> {
146        &self.materialized
147    }
148
149    /// Returns the pending key set.
150    #[must_use]
151    pub const fn pending(&self) -> &BTreeSet<K> {
152        &self.pending
153    }
154
155    /// Returns the number of materialized keys.
156    #[must_use]
157    pub fn materialized_len(&self) -> usize {
158        self.materialized.len()
159    }
160
161    /// Returns the number of pending keys.
162    #[must_use]
163    pub fn pending_len(&self) -> usize {
164        self.pending.len()
165    }
166
167    /// Returns true when nothing is materialized or pending.
168    #[must_use]
169    pub fn is_empty(&self) -> bool {
170        self.materialized.is_empty() && self.pending.is_empty()
171    }
172}
173
/// Keys from a batch probe, grouped by what each key's probe produced.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BatchLookup<K>
where
    K: Ord + Clone,
{
    /// Keys that were found materialized.
    pub hits: Vec<K>,
    /// Keys for which the caller should now issue an upquery.
    pub upqueries: Vec<K>,
    /// Keys whose upquery was already in flight.
    pub already_pending: Vec<K>,
}
187
188impl<K> MaterializedKeys<K>
189where
190    K: Ord + Clone,
191{
192    /// Probes a batch of keys, splitting them into hit / upquery / pending sets.
193    pub fn lookup_batch<I>(&mut self, keys: I) -> BatchLookup<K>
194    where
195        I: IntoIterator<Item = K>,
196    {
197        let mut batch = BatchLookup {
198            hits: Vec::new(),
199            upqueries: Vec::new(),
200            already_pending: Vec::new(),
201        };
202        for key in keys {
203            match self.lookup(key) {
204                LookupOutcome::Hit(key) => batch.hits.push(key),
205                LookupOutcome::Upquery(key) => batch.upqueries.push(key),
206                LookupOutcome::AlreadyPending(key) => batch.already_pending.push(key),
207            }
208        }
209        batch
210    }
211}
212
213/// Multi-arrangement tracker keyed by arrangement name.
214#[derive(Debug, Clone, Default)]
215pub struct MaterializationTracker<K>
216where
217    K: Ord + Clone,
218{
219    arrangements: BTreeMap<String, MaterializedKeys<K>>,
220}
221
222impl<K> MaterializationTracker<K>
223where
224    K: Ord + Clone,
225{
226    /// Creates an empty multi-arrangement tracker.
227    #[must_use]
228    pub const fn new() -> Self {
229        Self {
230            arrangements: BTreeMap::new(),
231        }
232    }
233
234    /// Registers a fresh arrangement, returning an error if `name` is taken.
235    pub fn register(&mut self, name: impl Into<String>) -> Result<(), ArrangementAlreadyTracked> {
236        let name = name.into();
237        match self.arrangements.entry(name) {
238            Entry::Vacant(slot) => {
239                slot.insert(MaterializedKeys::new());
240                Ok(())
241            }
242            Entry::Occupied(slot) => Err(ArrangementAlreadyTracked {
243                name: slot.key().clone(),
244            }),
245        }
246    }
247
248    /// Returns a mutable view of an arrangement's tracker.
249    #[must_use]
250    pub fn arrangement_mut(&mut self, name: &str) -> Option<&mut MaterializedKeys<K>> {
251        self.arrangements.get_mut(name)
252    }
253
254    /// Returns an immutable view of an arrangement's tracker.
255    #[must_use]
256    pub fn arrangement(&self, name: &str) -> Option<&MaterializedKeys<K>> {
257        self.arrangements.get(name)
258    }
259
260    /// Returns the number of arrangements being tracked.
261    #[must_use]
262    pub fn len(&self) -> usize {
263        self.arrangements.len()
264    }
265
266    /// Returns true when no arrangements are tracked.
267    #[must_use]
268    pub fn is_empty(&self) -> bool {
269        self.arrangements.is_empty()
270    }
271
272    /// Drops `name` from the tracker.
273    pub fn forget_arrangement(&mut self, name: &str) -> Option<MaterializedKeys<K>> {
274        self.arrangements.remove(name)
275    }
276}
277
/// Error reported when an arrangement name is registered while it is already
/// tracked.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ArrangementAlreadyTracked {
    /// The arrangement name that was already registered.
    pub name: String,
}

impl std::fmt::Display for ArrangementAlreadyTracked {
    /// Writes a one-line message that quotes the offending name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name } = self;
        f.write_fmt(format_args!("arrangement '{}' is already tracked", name))
    }
}

impl std::error::Error for ArrangementAlreadyTracked {}
292
#[cfg(test)]
mod tests {
    use super::{
        ArrangementAlreadyTracked, BatchLookup, KeyStatus, LookupOutcome, MaterializationTracker,
        MaterializedKeys,
    };

    /// Key used by every single-key scenario.
    const KEY: u64 = 7;

    /// Builds a tracker in which `KEY` has been probed twice and is still pending.
    fn probed_twice() -> MaterializedKeys<u64> {
        let mut tracker = MaterializedKeys::<u64>::new();
        let _ = tracker.lookup(KEY);
        let _ = tracker.lookup(KEY);
        tracker
    }

    #[test]
    fn lookup_promotes_missing_keys_to_pending_and_returns_upquery() {
        let mut tracker = MaterializedKeys::<u64>::new();

        let first = tracker.lookup(KEY);
        assert_eq!(first, LookupOutcome::Upquery(KEY));
        assert_eq!(tracker.status(&KEY), KeyStatus::Pending);

        let second = tracker.lookup(KEY);
        assert_eq!(second, LookupOutcome::AlreadyPending(KEY));
        assert_eq!(tracker.pending_len(), 1);
    }

    #[test]
    fn mark_materialized_returns_waiter_count_and_clears_pending() {
        let mut tracker = probed_twice();

        let woken = tracker.mark_materialized(KEY);
        assert_eq!(woken, 2);
        assert_eq!(tracker.status(&KEY), KeyStatus::Materialized);
        assert_eq!(tracker.pending_len(), 0);
    }

    #[test]
    fn fail_pending_resets_state_and_reports_waiters() {
        let mut tracker = probed_twice();

        let abandoned = tracker.fail_pending(&KEY);
        assert_eq!(abandoned, 2);
        assert_eq!(tracker.status(&KEY), KeyStatus::Missing);
    }

    #[test]
    fn forget_drops_materialized_keys() {
        let mut tracker = MaterializedKeys::<u64>::new();
        let _ = tracker.lookup(KEY);
        tracker.mark_materialized(KEY);

        let was_present = tracker.forget(&KEY);
        assert!(was_present);
        assert_eq!(tracker.status(&KEY), KeyStatus::Missing);
    }

    #[test]
    fn lookup_batch_splits_by_outcome() {
        // Key 1 is materialized, key 2 is pending, key 3 has never been seen.
        let mut tracker = MaterializedKeys::<u64>::new();
        let _ = tracker.lookup(1);
        tracker.mark_materialized(1);
        let _ = tracker.lookup(2);

        let batch: BatchLookup<u64> = tracker.lookup_batch([1_u64, 2, 3]);
        assert_eq!(batch.hits, [1]);
        assert_eq!(batch.already_pending, [2]);
        assert_eq!(batch.upqueries, [3]);
    }

    #[test]
    fn bulk_mark_returns_aggregate_waiter_count() {
        // Two waiters on key 1, one on key 2, none on key 3.
        let mut tracker = MaterializedKeys::<u64>::new();
        for key in [1_u64, 1, 2] {
            let _ = tracker.lookup(key);
        }

        let total = tracker.mark_many_materialized([1, 2, 3]);
        assert_eq!(total, 3);
    }

    #[test]
    fn tracker_routes_lookups_per_arrangement() {
        const NAME: &str = "posts-by-author";

        let mut tracker = MaterializationTracker::<u64>::new();
        tracker.register(NAME).unwrap();

        let posts = tracker.arrangement_mut(NAME).unwrap();
        assert_eq!(posts.lookup(KEY), LookupOutcome::Upquery(KEY));

        let pending = tracker.arrangement(NAME).map(MaterializedKeys::pending_len);
        assert_eq!(pending, Some(1));
    }

    #[test]
    fn duplicate_registration_errors() {
        let mut tracker = MaterializationTracker::<u64>::new();
        tracker.register("a").unwrap();

        let expected = ArrangementAlreadyTracked { name: "a".into() };
        let actual = tracker.register("a").unwrap_err();
        assert_eq!(actual, expected);
    }
}