dbsp/operator/dynamic/time_series/
partitioned.rs1use dyn_clone::clone_box;
13
14use crate::{
15 algebra::{IndexedZSet, OrdIndexedZSet},
16 dynamic::{DataTrait, DynPair, Factory, WeightTrait},
17 trace::{Batch, BatchReader, Cursor, cursor::Position},
18};
19use std::marker::PhantomData;
20
21pub trait PartitionedBatchReader<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
26 BatchReader<Val = DynPair<K, V>, Time = ()>
27{
28}
29impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedBatchReader<K, V> for B where
30 B: BatchReader<Val = DynPair<K, V>, Time = ()>
31{
32}
33
34pub trait PartitionedBatch<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
36 Batch<Val = DynPair<K, V>, Time = ()>
37{
38}
39impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedBatch<K, V> for B where
40 B: Batch<Val = DynPair<K, V>, Time = ()>
41{
42}
43
44pub trait PartitionedIndexedZSet<K: DataTrait + ?Sized, V: DataTrait + ?Sized>:
45 IndexedZSet<Val = DynPair<K, V>>
46{
47}
48
49impl<K: DataTrait + ?Sized, V: DataTrait + ?Sized, B> PartitionedIndexedZSet<K, V> for B where
50 B: IndexedZSet<Val = DynPair<K, V>>
51{
52}
53
54pub struct PartitionCursor<'b, PK, K, V, R, C>
58where
59 PK: DataTrait + ?Sized,
60 K: DataTrait + ?Sized,
61 V: DataTrait + ?Sized,
62 R: WeightTrait + ?Sized,
63{
64 cursor: &'b mut C,
65 key: Box<K>,
66 weight: Box<R>,
67 phantom: PhantomData<fn(&PK, &V, &R)>,
68}
69
70impl<'b, PK, K, V, R, C> PartitionCursor<'b, PK, K, V, R, C>
71where
72 C: Cursor<PK, DynPair<K, V>, (), R>,
73 PK: DataTrait + ?Sized,
74 K: DataTrait + ?Sized,
75 V: DataTrait + ?Sized,
76 R: WeightTrait + ?Sized,
77{
78 pub fn new(cursor: &'b mut C) -> Self {
79 let key = clone_box(cursor.val().fst());
80 let weight = clone_box(cursor.weight());
81 Self {
82 cursor,
83 key,
84 weight,
85 phantom: PhantomData,
86 }
87 }
88}
89
90impl<C, PK, K, V, R> Cursor<K, V, (), R> for PartitionCursor<'_, PK, K, V, R, C>
91where
92 C: Cursor<PK, DynPair<K, V>, (), R>,
93 PK: DataTrait + ?Sized,
94 K: DataTrait + ?Sized,
95 V: DataTrait + ?Sized,
96 R: WeightTrait + ?Sized,
97{
98 fn weight_factory(&self) -> &'static dyn Factory<R> {
99 self.cursor.weight_factory()
100 }
101
102 fn key_valid(&self) -> bool {
103 self.cursor.val_valid()
104 }
105
106 fn val_valid(&self) -> bool {
107 self.cursor.val_valid() && self.cursor.val().fst() == self.key.as_ref()
108 }
109
110 fn key(&self) -> &K {
111 &self.key
112 }
113
114 fn val(&self) -> &V {
115 self.cursor.val().snd()
116 }
117
118 fn map_times(&mut self, logic: &mut dyn FnMut(&(), &R)) {
119 self.cursor.map_times(logic)
120 }
121
122 fn map_times_through(&mut self, upper: &(), logic: &mut dyn FnMut(&(), &R)) {
123 self.cursor.map_times_through(upper, logic)
124 }
125
126 fn weight(&mut self) -> &R {
127 self.cursor.weight()
128 }
129
130 fn weight_checked(&mut self) -> &R {
131 self.weight()
132 }
133
134 fn map_values(&mut self, logic: &mut dyn FnMut(&V, &R)) {
135 while self.cursor.val_valid() {
136 if self.cursor.val().fst() == self.key.as_ref() {
137 self.cursor.weight().clone_to(&mut self.weight);
138 logic(self.cursor.val().snd(), &self.weight);
139 self.cursor.step_val();
140 } else {
141 self.cursor.val().fst().clone_to(&mut self.key);
142 break;
143 }
144 }
145 }
146
147 fn step_key(&mut self) {
148 while self.cursor.val_valid() {
149 if self.cursor.val().fst() == self.key.as_ref() {
150 self.cursor.step_val();
151 } else {
152 self.cursor.val().fst().clone_to(&mut self.key);
153 break;
154 }
155 }
156 }
157
158 fn step_key_reverse(&mut self) {
159 while self.cursor.val_valid() {
160 if self.cursor.val().fst() == self.key.as_ref() {
161 self.cursor.step_val_reverse();
162 } else {
163 self.cursor.val().fst().clone_to(&mut self.key);
164 break;
165 }
166 }
167 }
168
169 fn seek_key(&mut self, key: &K) {
170 self.cursor.seek_val_with(&|kv| kv.fst() >= key);
171 if self.cursor.val_valid() {
172 self.cursor.val().fst().clone_to(&mut self.key);
173 }
174 }
175
176 fn seek_key_exact(&mut self, key: &K, _hash: Option<u64>) -> bool {
177 self.seek_key(key);
178 self.key_valid() && self.key().eq(key)
179 }
180
181 fn seek_key_with(&mut self, predicate: &dyn Fn(&K) -> bool) {
182 self.cursor.seek_val_with(&|kv| predicate(kv.fst()));
183 if self.cursor.val_valid() {
184 self.cursor.val().fst().clone_to(&mut self.key);
185 }
186 }
187
188 fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&K) -> bool) {
189 self.cursor.seek_val_with_reverse(&|kv| predicate(kv.fst()));
190 if self.cursor.val_valid() {
191 self.cursor.val().fst().clone_to(&mut self.key);
192 }
193 }
194
195 fn seek_key_reverse(&mut self, key: &K) {
196 self.cursor.seek_val_with_reverse(&|kv| kv.fst() <= key);
197 if self.cursor.val_valid() {
198 self.cursor.val().fst().clone_to(&mut *self.key);
199 }
200 }
201
202 fn step_val(&mut self) {
203 self.cursor.step_val();
204 }
205
206 fn seek_val(&mut self, _val: &V) {
207 unimplemented!()
208 }
209
210 fn seek_val_with(&mut self, _predicate: &dyn Fn(&V) -> bool) {
211 unimplemented!()
212 }
213
214 fn rewind_keys(&mut self) {
215 debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
216
217 self.cursor.rewind_vals();
218 self.cursor.val().fst().clone_to(&mut self.key);
219 }
220
221 fn fast_forward_keys(&mut self) {
222 debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
223 self.cursor.fast_forward_vals();
224 self.cursor.val().fst().clone_to(&mut self.key);
225 }
226
227 fn rewind_vals(&mut self) {
228 unimplemented!()
229 }
230
231 fn step_val_reverse(&mut self) {
232 self.cursor.step_val_reverse();
233 }
234
235 fn seek_val_reverse(&mut self, _val: &V) {
236 unimplemented!()
237 }
238
239 fn seek_val_with_reverse(&mut self, _predicate: &dyn Fn(&V) -> bool) {
240 unimplemented!()
241 }
242
243 fn fast_forward_vals(&mut self) {
244 unimplemented!()
245 }
246
247 fn position(&self) -> Option<Position> {
248 None
249 }
250}
251
252pub type OrdPartitionedIndexedZSet<PK, TS, V> = OrdIndexedZSet<PK, DynPair<TS, V>>;