Skip to main content

dbsp/operator/dynamic/time_series/
partitioned.rs

//! Traits and types that represent partitioned collections.
//!
//! Time series data typically uses two levels of indexing: by entity id (e.g.,
//! user, tenant, session, etc.) and by time.  We refer to the former as
//! partitioning, as it partitions a single collection into multiple
//! indexed collections.  Our basic batch API doesn't directly support
//! two-level indexing, so we emulate it by storing the secondary key
//! together with the data.  The resulting collection is efficiently
//! searchable first by the partition key and within each partition by the
//! secondary key, e.g., timestamp.
11
12use dyn_clone::clone_box;
13
14use crate::{
15    algebra::{IndexedZSet, OrdIndexedZSet},
16    dynamic::{DataTrait, DynPair, Factory, WeightTrait},
17    trace::{Batch, BatchReader, Cursor, cursor::Position},
18};
19use std::marker::PhantomData;
20
21/// Read interface to collections with two levels of indexing.
22///
23/// Models a partitioned collection as a `BatchReader` indexed
24/// (partitioned) by `BatchReader::Key` and by `K` within each partition.
25pub trait PartitionedBatchReader<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
26    BatchReader<Val = DynPair<K, V>, Time = ()>
27{
28}
29impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedBatchReader<K, V> for B where
30    B: BatchReader<Val = DynPair<K, V>, Time = ()>
31{
32}
33
34/// Read/write API to partitioned data (see [`PartitionedBatchReader`]).
35pub trait PartitionedBatch<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
36    Batch<Val = DynPair<K, V>, Time = ()>
37{
38}
39impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedBatch<K, V> for B where
40    B: Batch<Val = DynPair<K, V>, Time = ()>
41{
42}
43
44pub trait PartitionedIndexedZSet<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
45    IndexedZSet<Val = DynPair<K, V>>
46{
47}
48
49impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedIndexedZSet<K, V> for B where
50    B: IndexedZSet<Val = DynPair<K, V>>
51{
52}
53
54/// Cursor over a single partition of a partitioned batch.
55///
56/// Iterates over a single partition of a partitioned collection.
57pub struct PartitionCursor<'b, PK, K, V, R, C>
58where
59    PK: DataTrait + ?Sized,
60    K: DataTrait + ?Sized,
61    V: DataTrait + ?Sized,
62    R: WeightTrait + ?Sized,
63{
64    cursor: &'b mut C,
65    key: Box<K>,
66    weight: Box<R>,
67    phantom: PhantomData<fn(&PK, &V, &R)>,
68}
69
70impl<'b, PK, K, V, R, C> PartitionCursor<'b, PK, K, V, R, C>
71where
72    C: Cursor<PK, DynPair<K, V>, (), R>,
73    PK: DataTrait + ?Sized,
74    K: DataTrait + ?Sized,
75    V: DataTrait + ?Sized,
76    R: WeightTrait + ?Sized,
77{
78    pub fn new(cursor: &'b mut C) -> Self {
79        let key = clone_box(cursor.val().fst());
80        let weight = clone_box(cursor.weight());
81        Self {
82            cursor,
83            key,
84            weight,
85            phantom: PhantomData,
86        }
87    }
88}
89
90impl<C, PK, K, V, R> Cursor<K, V, (), R> for PartitionCursor<'_, PK, K, V, R, C>
91where
92    C: Cursor<PK, DynPair<K, V>, (), R>,
93    PK: DataTrait + ?Sized,
94    K: DataTrait + ?Sized,
95    V: DataTrait + ?Sized,
96    R: WeightTrait + ?Sized,
97{
98    fn weight_factory(&self) -> &'static dyn Factory<R> {
99        self.cursor.weight_factory()
100    }
101
102    fn key_valid(&self) -> bool {
103        self.cursor.val_valid()
104    }
105
106    fn val_valid(&self) -> bool {
107        self.cursor.val_valid() && self.cursor.val().fst() == self.key.as_ref()
108    }
109
110    fn key(&self) -> &K {
111        &self.key
112    }
113
114    fn val(&self) -> &V {
115        self.cursor.val().snd()
116    }
117
118    fn map_times(&mut self, logic: &mut dyn FnMut(&(), &R)) {
119        self.cursor.map_times(logic)
120    }
121
122    fn map_times_through(&mut self, upper: &(), logic: &mut dyn FnMut(&(), &R)) {
123        self.cursor.map_times_through(upper, logic)
124    }
125
126    fn weight(&mut self) -> &R {
127        self.cursor.weight()
128    }
129
130    fn weight_checked(&mut self) -> &R {
131        self.weight()
132    }
133
134    fn map_values(&mut self, logic: &mut dyn FnMut(&V, &R)) {
135        while self.cursor.val_valid() {
136            if self.cursor.val().fst() == self.key.as_ref() {
137                self.cursor.weight().clone_to(&mut self.weight);
138                logic(self.cursor.val().snd(), &self.weight);
139                self.cursor.step_val();
140            } else {
141                self.cursor.val().fst().clone_to(&mut self.key);
142                break;
143            }
144        }
145    }
146
147    fn step_key(&mut self) {
148        while self.cursor.val_valid() {
149            if self.cursor.val().fst() == self.key.as_ref() {
150                self.cursor.step_val();
151            } else {
152                self.cursor.val().fst().clone_to(&mut self.key);
153                break;
154            }
155        }
156    }
157
158    fn step_key_reverse(&mut self) {
159        while self.cursor.val_valid() {
160            if self.cursor.val().fst() == self.key.as_ref() {
161                self.cursor.step_val_reverse();
162            } else {
163                self.cursor.val().fst().clone_to(&mut self.key);
164                break;
165            }
166        }
167    }
168
169    fn seek_key(&mut self, key: &K) {
170        self.cursor.seek_val_with(&|kv| kv.fst() >= key);
171        if self.cursor.val_valid() {
172            self.cursor.val().fst().clone_to(&mut self.key);
173        }
174    }
175
176    fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
177        self.seek_key(key);
178        self.key_valid() && self.key().eq(key)
179    }
180
181    fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
182        self.cursor.seek_val_with(&|kv| predicate(kv.fst()));
183        if self.cursor.val_valid() {
184            self.cursor.val().fst().clone_to(&mut self.key);
185        }
186    }
187
188    fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
189        self.cursor.seek_val_with_reverse(&|kv| predicate(kv.fst()));
190        if self.cursor.val_valid() {
191            self.cursor.val().fst().clone_to(&mut self.key);
192        }
193    }
194
195    fn seek_key_reverse(&mut self, key: &K) {
196        self.cursor.seek_val_with_reverse(&|kv| kv.fst() <= key);
197        if self.cursor.val_valid() {
198            self.cursor.val().fst().clone_to(&mut *self.key);
199        }
200    }
201
202    fn step_val(&mut self) {
203        self.cursor.step_val();
204    }
205
206    fn seek_val(&mut self, _val: &V) {
207        unimplemented!()
208    }
209
210    fn seek_val_with(&mut self, _predicate: &dyn Fn(&V) -> bool) {
211        unimplemented!()
212    }
213
214    fn rewind_keys(&mut self) {
215        debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
216
217        self.cursor.rewind_vals();
218        self.cursor.val().fst().clone_to(&mut self.key);
219    }
220
221    fn fast_forward_keys(&mut self) {
222        debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
223        self.cursor.fast_forward_vals();
224        self.cursor.val().fst().clone_to(&mut self.key);
225    }
226
227    fn rewind_vals(&mut self) {
228        unimplemented!()
229    }
230
231    fn step_val_reverse(&mut self) {
232        self.cursor.step_val_reverse();
233    }
234
235    fn seek_val_reverse(&mut self, _val: &V) {
236        unimplemented!()
237    }
238
239    fn seek_val_with_reverse(&mut self, _predicate: &dyn Fn(&V) -> bool) {
240        unimplemented!()
241    }
242
243    fn fast_forward_vals(&mut self) {
244        unimplemented!()
245    }
246
247    fn position(&self) -> Option<Position> {
248        None
249    }
250}
251
252pub type OrdPartitionedIndexedZSet<PK, TS, V> = OrdIndexedZSet<PK, DynPair<TS, V>>;