// palimpsest_dataflow/palimpsest/materialization.rs
use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};

/// Residency state of a single key as reported by [`MaterializedKeys::status`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum KeyStatus {
    /// The key has been marked materialized.
    Materialized,
    /// A lookup requested an upquery for the key, and it has not yet been
    /// marked materialized or failed.
    Pending,
    /// The key is neither materialized nor pending.
    Missing,
}

/// Result of [`MaterializedKeys::lookup`]; every variant hands the looked-up
/// key back to the caller.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LookupOutcome<K>
where
    K: Ord + Clone,
{
    /// The key was already materialized.
    Hit(K),
    /// The key was missing; it is now pending with the caller counted as its
    /// first waiter.
    Upquery(K),
    /// The key was already pending; the caller was counted as one more waiter.
    AlreadyPending(K),
}

impl<K> LookupOutcome<K>
where
    K: Ord + Clone,
{
    /// Borrows the key carried by this outcome, whichever variant it is.
    #[must_use]
    pub const fn key(&self) -> &K {
        match self {
            LookupOutcome::Hit(inner) => inner,
            LookupOutcome::Upquery(inner) => inner,
            LookupOutcome::AlreadyPending(inner) => inner,
        }
    }
}

/// Tracks, for one arrangement, which keys are materialized, which have an
/// upquery in flight, and how many lookups are waiting on each pending key.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MaterializedKeys<K>
where
    K: Ord + Clone,
{
    /// Keys that have been marked materialized.
    materialized: BTreeSet<K>,
    /// Keys whose upquery has been requested but not yet resolved.
    pending: BTreeSet<K>,
    /// Waiter count per pending key; its key set mirrors `pending`.
    pending_waiters: BTreeMap<K, usize>,
}

62impl<K> Default for MaterializedKeys<K>
63where
64 K: Ord + Clone,
65{
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl<K> MaterializedKeys<K>
72where
73 K: Ord + Clone,
74{
75 #[must_use]
77 pub const fn new() -> Self {
78 Self {
79 materialized: BTreeSet::new(),
80 pending: BTreeSet::new(),
81 pending_waiters: BTreeMap::new(),
82 }
83 }
84
85 #[must_use]
87 pub fn status(&self, key: &K) -> KeyStatus {
88 if self.materialized.contains(key) {
89 KeyStatus::Materialized
90 } else if self.pending.contains(key) {
91 KeyStatus::Pending
92 } else {
93 KeyStatus::Missing
94 }
95 }
96
97 pub fn lookup(&mut self, key: K) -> LookupOutcome<K> {
100 if self.materialized.contains(&key) {
101 return LookupOutcome::Hit(key);
102 }
103 if self.pending.contains(&key) {
104 *self.pending_waiters.entry(key.clone()).or_insert(0) += 1;
105 return LookupOutcome::AlreadyPending(key);
106 }
107 self.pending.insert(key.clone());
108 self.pending_waiters.insert(key.clone(), 1);
109 LookupOutcome::Upquery(key)
110 }
111
112 pub fn mark_materialized(&mut self, key: K) -> usize {
115 let waiters = self.pending_waiters.remove(&key).unwrap_or(0);
116 self.pending.remove(&key);
117 self.materialized.insert(key);
118 waiters
119 }
120
121 pub fn mark_many_materialized<I>(&mut self, keys: I) -> usize
123 where
124 I: IntoIterator<Item = K>,
125 {
126 keys.into_iter()
127 .map(|key| self.mark_materialized(key))
128 .sum()
129 }
130
131 pub fn forget(&mut self, key: &K) -> bool {
133 self.materialized.remove(key)
134 }
135
136 pub fn fail_pending(&mut self, key: &K) -> usize {
138 let waiters = self.pending_waiters.remove(key).unwrap_or(0);
139 self.pending.remove(key);
140 waiters
141 }
142
143 #[must_use]
145 pub const fn materialized(&self) -> &BTreeSet<K> {
146 &self.materialized
147 }
148
149 #[must_use]
151 pub const fn pending(&self) -> &BTreeSet<K> {
152 &self.pending
153 }
154
155 #[must_use]
157 pub fn materialized_len(&self) -> usize {
158 self.materialized.len()
159 }
160
161 #[must_use]
163 pub fn pending_len(&self) -> usize {
164 self.pending.len()
165 }
166
167 #[must_use]
169 pub fn is_empty(&self) -> bool {
170 self.materialized.is_empty() && self.pending.is_empty()
171 }
172}
173
/// Keys from one [`MaterializedKeys::lookup_batch`] call, grouped by outcome.
///
/// Each vector keeps the keys in the order they were looked up.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BatchLookup<K>
where
    K: Ord + Clone,
{
    /// Keys that were already materialized.
    pub hits: Vec<K>,
    /// Keys that were missing and became pending during this batch.
    pub upqueries: Vec<K>,
    /// Keys that were already pending when looked up.
    pub already_pending: Vec<K>,
}

188impl<K> MaterializedKeys<K>
189where
190 K: Ord + Clone,
191{
192 pub fn lookup_batch<I>(&mut self, keys: I) -> BatchLookup<K>
194 where
195 I: IntoIterator<Item = K>,
196 {
197 let mut batch = BatchLookup {
198 hits: Vec::new(),
199 upqueries: Vec::new(),
200 already_pending: Vec::new(),
201 };
202 for key in keys {
203 match self.lookup(key) {
204 LookupOutcome::Hit(key) => batch.hits.push(key),
205 LookupOutcome::Upquery(key) => batch.upqueries.push(key),
206 LookupOutcome::AlreadyPending(key) => batch.already_pending.push(key),
207 }
208 }
209 batch
210 }
211}
212
213#[derive(Debug, Clone, Default)]
215pub struct MaterializationTracker<K>
216where
217 K: Ord + Clone,
218{
219 arrangements: BTreeMap<String, MaterializedKeys<K>>,
220}
221
222impl<K> MaterializationTracker<K>
223where
224 K: Ord + Clone,
225{
226 #[must_use]
228 pub const fn new() -> Self {
229 Self {
230 arrangements: BTreeMap::new(),
231 }
232 }
233
234 pub fn register(&mut self, name: impl Into<String>) -> Result<(), ArrangementAlreadyTracked> {
236 let name = name.into();
237 match self.arrangements.entry(name) {
238 Entry::Vacant(slot) => {
239 slot.insert(MaterializedKeys::new());
240 Ok(())
241 }
242 Entry::Occupied(slot) => Err(ArrangementAlreadyTracked {
243 name: slot.key().clone(),
244 }),
245 }
246 }
247
248 #[must_use]
250 pub fn arrangement_mut(&mut self, name: &str) -> Option<&mut MaterializedKeys<K>> {
251 self.arrangements.get_mut(name)
252 }
253
254 #[must_use]
256 pub fn arrangement(&self, name: &str) -> Option<&MaterializedKeys<K>> {
257 self.arrangements.get(name)
258 }
259
260 #[must_use]
262 pub fn len(&self) -> usize {
263 self.arrangements.len()
264 }
265
266 #[must_use]
268 pub fn is_empty(&self) -> bool {
269 self.arrangements.is_empty()
270 }
271
272 pub fn forget_arrangement(&mut self, name: &str) -> Option<MaterializedKeys<K>> {
274 self.arrangements.remove(name)
275 }
276}
277
/// Error returned by [`MaterializationTracker::register`] when an arrangement
/// with the requested name is already registered.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ArrangementAlreadyTracked {
    /// Name of the arrangement that was already registered.
    pub name: String,
}

impl std::fmt::Display for ArrangementAlreadyTracked {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = &self.name;
        write!(formatter, "arrangement '{}' is already tracked", name)
    }
}

impl std::error::Error for ArrangementAlreadyTracked {}

#[cfg(test)]
mod tests {
    use super::{
        ArrangementAlreadyTracked, BatchLookup, KeyStatus, LookupOutcome, MaterializationTracker,
        MaterializedKeys,
    };

    #[test]
    fn lookup_promotes_missing_keys_to_pending_and_returns_upquery() {
        let mut keys = MaterializedKeys::<u64>::new();

        assert_eq!(keys.lookup(7), LookupOutcome::Upquery(7));
        assert_eq!(keys.status(&7), KeyStatus::Pending);
        assert_eq!(keys.lookup(7), LookupOutcome::AlreadyPending(7));
        assert_eq!(keys.pending_len(), 1);
    }

    #[test]
    fn mark_materialized_returns_waiter_count_and_clears_pending() {
        let mut keys = MaterializedKeys::<u64>::new();

        let _ = keys.lookup(7);
        let _ = keys.lookup(7);
        assert_eq!(keys.mark_materialized(7), 2);
        assert_eq!(keys.status(&7), KeyStatus::Materialized);
        assert_eq!(keys.pending_len(), 0);
    }

    #[test]
    fn fail_pending_resets_state_and_reports_waiters() {
        let mut keys = MaterializedKeys::<u64>::new();

        let _ = keys.lookup(7);
        let _ = keys.lookup(7);
        assert_eq!(keys.fail_pending(&7), 2);
        assert_eq!(keys.status(&7), KeyStatus::Missing);
    }

    #[test]
    fn forget_drops_materialized_keys() {
        let mut keys = MaterializedKeys::<u64>::new();
        let _ = keys.lookup(7);
        keys.mark_materialized(7);
        assert!(keys.forget(&7));
        assert_eq!(keys.status(&7), KeyStatus::Missing);
    }

    #[test]
    fn lookup_batch_splits_by_outcome() {
        let mut keys = MaterializedKeys::<u64>::new();
        let _ = keys.lookup(1);
        keys.mark_materialized(1);
        let _ = keys.lookup(2);

        let BatchLookup {
            hits,
            upqueries,
            already_pending,
        } = keys.lookup_batch([1_u64, 2, 3]);
        assert_eq!(hits, [1]);
        assert_eq!(already_pending, [2]);
        assert_eq!(upqueries, [3]);
    }

    #[test]
    fn bulk_mark_returns_aggregate_waiter_count() {
        let mut keys = MaterializedKeys::<u64>::new();
        let _ = keys.lookup(1);
        let _ = keys.lookup(1);
        let _ = keys.lookup(2);
        assert_eq!(keys.mark_many_materialized([1, 2, 3]), 3);
    }

    #[test]
    fn tracker_routes_lookups_per_arrangement() {
        let mut tracker = MaterializationTracker::<u64>::new();
        tracker.register("posts-by-author").unwrap();
        let posts = tracker.arrangement_mut("posts-by-author").unwrap();
        assert_eq!(posts.lookup(7), LookupOutcome::Upquery(7));
        assert_eq!(
            tracker
                .arrangement("posts-by-author")
                .map(MaterializedKeys::pending_len),
            Some(1)
        );
    }

    #[test]
    fn duplicate_registration_errors() {
        let mut tracker = MaterializationTracker::<u64>::new();
        tracker.register("a").unwrap();
        let err = tracker.register("a").unwrap_err();
        assert_eq!(err, ArrangementAlreadyTracked { name: "a".into() });
    }

    #[test]
    fn lookup_outcome_key_exposes_the_inner_key() {
        assert_eq!(LookupOutcome::Hit(1_u64).key(), &1);
        assert_eq!(LookupOutcome::Upquery(2_u64).key(), &2);
        assert_eq!(LookupOutcome::AlreadyPending(3_u64).key(), &3);
    }

    #[test]
    fn fail_pending_allows_a_fresh_upquery() {
        let mut keys = MaterializedKeys::<u64>::new();
        let _ = keys.lookup(7);
        let _ = keys.lookup(7);
        assert_eq!(keys.fail_pending(&7), 2);
        // The waiter count restarts with the new upquery's initiator.
        assert_eq!(keys.lookup(7), LookupOutcome::Upquery(7));
        assert_eq!(keys.mark_materialized(7), 1);
    }

    #[test]
    fn forget_does_not_touch_pending_keys() {
        let mut keys = MaterializedKeys::<u64>::new();
        let _ = keys.lookup(7);
        assert!(!keys.forget(&7));
        assert_eq!(keys.status(&7), KeyStatus::Pending);
    }

    #[test]
    fn mark_materialized_without_pending_reports_no_waiters() {
        let mut keys = MaterializedKeys::<u64>::new();
        assert_eq!(keys.mark_materialized(9), 0);
        assert_eq!(keys.status(&9), KeyStatus::Materialized);
        assert_eq!(keys.materialized_len(), 1);
        assert_eq!(keys.lookup(9), LookupOutcome::Hit(9));
    }

    #[test]
    fn is_empty_considers_materialized_and_pending_keys() {
        let mut keys = MaterializedKeys::<u64>::new();
        assert!(keys.is_empty());
        let _ = keys.lookup(1);
        assert!(!keys.is_empty());
        keys.mark_materialized(1);
        assert!(!keys.is_empty());
        assert!(keys.forget(&1));
        assert!(keys.is_empty());
    }

    #[test]
    fn lookup_batch_counts_repeated_keys_sequentially() {
        let mut keys = MaterializedKeys::<u64>::new();
        let batch = keys.lookup_batch([5_u64, 5, 5]);
        assert_eq!(
            batch,
            BatchLookup {
                hits: vec![],
                upqueries: vec![5],
                already_pending: vec![5, 5],
            }
        );
        assert_eq!(keys.mark_materialized(5), 3);
    }

    #[test]
    fn tracker_default_is_empty_and_forget_arrangement_removes_entry() {
        let mut tracker = MaterializationTracker::<u64>::default();
        assert!(tracker.is_empty());
        tracker.register("a").unwrap();
        assert_eq!(tracker.len(), 1);
        assert!(tracker.forget_arrangement("a").is_some());
        assert!(tracker.is_empty());
        assert!(tracker.arrangement("a").is_none());
    }

    #[test]
    fn duplicate_registration_error_has_readable_message() {
        let err = ArrangementAlreadyTracked { name: "a".into() };
        assert_eq!(err.to_string(), "arrangement 'a' is already tracked");
    }
}