// dbsp/operator/input.rs
1use crate::{
2 Circuit, DBData, DynZWeight, RootCircuit, Runtime, Stream, TypedBox, ZWeight,
3 circuit::{
4 LocalStoreMarker, Scope,
5 metadata::OperatorLocation,
6 operator_traits::{Operator, SourceOperator},
7 },
8 dynamic::{DowncastTrait, DynBool, DynData, DynPair, DynPairs, DynUnit, Erase, LeanVec},
9 operator::dynamic::{
10 input::{
11 AddInputIndexedZSetFactories, AddInputMapFactories, AddInputMapWithWaterlineFactories,
12 AddInputSetFactories, AddInputZSetFactories, CollectionHandle, UpsertHandle,
13 },
14 input_upsert::DynUpdate,
15 },
16 typed_batch::{OrdIndexedZSet, OrdZSet},
17 utils::Tup2,
18};
19use std::{
20 borrow::{Borrow, Cow},
21 collections::VecDeque,
22 fmt::Debug,
23 hash::{Hash, Hasher},
24 marker::PhantomData,
25 mem::{replace, take, transmute},
26 ops::{Deref, Range},
27 panic::Location,
28 sync::{Arc, Mutex},
29};
30use typedmap::TypedMapKey;
31
32pub use crate::operator::dynamic::input_upsert::{PatchFunc, Update};
33
/// A stream in the root circuit carrying `OrdIndexedZSet<K, V>` values.
pub type IndexedZSetStream<K, V> = Stream<RootCircuit, OrdIndexedZSet<K, V>>;
/// A stream in the root circuit carrying `OrdZSet<K>` values.
pub type ZSetStream<K> = Stream<RootCircuit, OrdZSet<K>>;
36
37/// Input prepared for flushing into an input handle.
38///
39/// [ZSetHandle], [IndexedZSetHandle], [SetHandle], and [MapHandle] all support
40/// similar ways to push data into a circuit. The following discussion just
41/// talks about `ZSetHandle`, for clarity.
42///
43/// There are two ways to push data into a circuit with [ZSetHandle]:
44///
45/// - Immediately, either one data point at a time with
46/// [push](ZSetHandle::push), or a vector at a time with, e.g.,
47/// [append](ZSetHandle::append).
48///
49/// - Preparing data in advance into [StagedBuffers] using
50/// `stage`. Then, later, calling [StagedBuffers::flush] pushes
51/// the input buffers into the circuit.
52///
53/// Both approaches are equally correct. They can differ in performance,
54/// because [push](ZSetHandle::push) and [append](ZSetHandle::append) have a
55/// significant cost for a large number of records. Using [StagedBuffers] has a
56/// similar cost, but it incurs it in the call to `stage` rather
57/// than in [StagedBuffers::flush]. This means that, if the code driving the
58/// circuit can buffer data ahead of the circuit's demand for it, the cost can
59/// be hidden and data processing as a whole runs faster.
pub trait StagedBuffers: Send + Sync {
    /// Flushes the data gathered into this buffer to the circuit.
    ///
    /// The implementations in this module drain their internal buffers, so a
    /// second call to `flush` on the same value is a no-op.
    fn flush(&mut self);
}
64
65pub struct ZSetStagedBuffers {
66 input_handle: InputHandle<Vec<Box<DynPairs<DynPair<DynData, DynUnit>, DynZWeight>>>>,
67 vals: Vec<Box<DynPairs<DynPair<DynData, DynUnit>, DynZWeight>>>,
68}
69
70impl StagedBuffers for ZSetStagedBuffers {
71 fn flush(&mut self) {
72 for (worker, vals) in self.vals.drain(..).enumerate() {
73 self.input_handle.update_for_worker(worker, |tuples| {
74 tuples.push(vals);
75 });
76 }
77 }
78}
79
/// Input handle for pushing `(key, weight)` updates into a stream created by
/// [`RootCircuit::add_input_zset`].
///
/// Strongly typed wrapper around a dynamically typed [`CollectionHandle`].
#[repr(transparent)]
pub struct ZSetHandle<K> {
    handle: CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>,
    // `fn(&K)` marker: ties the handle to `K` without owning a `K`.
    phantom: PhantomData<fn(&K)>,
}

// Manual `Clone` impl: `#[derive(Clone)]` would add a spurious `K: Clone`
// bound even though the phantom marker never stores a `K`.
impl<K> Clone for ZSetHandle<K> {
    fn clone(&self) -> Self {
        Self {
            handle: self.handle.clone(),
            phantom: PhantomData,
        }
    }
}

// Expose the underlying dynamically typed handle's API directly.
impl<K> Deref for ZSetHandle<K> {
    type Target = CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>;

    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}

impl<K> ZSetHandle<K>
where
    K: DBData,
{
    /// Wraps a dynamically typed collection handle.
    fn new(handle: CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>) -> Self {
        Self {
            handle,
            phantom: PhantomData,
        }
    }

    /// Buffers a single `(key, weight)` update for the next clock cycle.
    pub fn push(&self, k: K, mut w: ZWeight) {
        self.handle.dyn_push(Tup2(k, ()).erase_mut(), w.erase_mut())
    }

    /// Buffers a batch of `(key, weight)` updates, draining `vals`.
    pub fn append(&self, vals: &mut Vec<Tup2<K, ZWeight>>) {
        // SAFETY: `()` is a zero-sized type, more precisely it's a 1-ZST.
        // According to the Rust spec adding it to a tuple doesn't change
        // its memory layout.
        let vals: &mut Vec<Tup2<Tup2<K, ()>, ZWeight>> = unsafe { transmute(vals) };
        let vals = Box::new(LeanVec::from(take(vals)));

        self.handle.dyn_append(&mut vals.erase_box())
    }

    /// Partitions `buffers` across workers ahead of time; the result can
    /// later be pushed into the circuit cheaply via [StagedBuffers::flush].
    /// See [StagedBuffers] for why this can be faster than [Self::append].
    pub fn stage(
        &self,
        buffers: impl IntoIterator<Item = VecDeque<Tup2<K, ZWeight>>>,
    ) -> ZSetStagedBuffers {
        let num_partitions = self.handle.num_partitions();
        let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
        // Round-robin cursor; it persists across `buffers` so records stay
        // evenly distributed even when buffers are small.
        let mut next_worker = 0;
        for vals in buffers {
            let mut vec = Vec::from(vals);
            // SAFETY: `()` is a zero-sized type, more precisely it's a 1-ZST.
            // According to the Rust spec adding it to a tuple doesn't change
            // its memory layout.
            let vals: &mut Vec<Tup2<Tup2<K, ()>, ZWeight>> = unsafe { transmute(&mut vec) };
            let vals = Box::new(LeanVec::from(take(vals)));

            self.handle
                .dyn_stage(&mut vals.erase_box(), &mut next_worker, &mut partitions);
        }
        ZSetStagedBuffers {
            input_handle: self.handle.input_handle.clone(),
            vals: partitions,
        }
    }
}
152
153pub struct IndexedZSetStagedBuffers {
154 input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynPair<DynData, DynZWeight>>>>>,
155 vals: Vec<Box<DynPairs<DynData, DynPair<DynData, DynZWeight>>>>,
156}
157
158impl StagedBuffers for IndexedZSetStagedBuffers {
159 fn flush(&mut self) {
160 for (worker, vals) in self.vals.drain(..).enumerate() {
161 self.input_handle.update_for_worker(worker, |tuples| {
162 tuples.push(vals);
163 });
164 }
165 }
166}
167
168#[derive(Clone)]
169#[repr(transparent)]
170pub struct IndexedZSetHandle<K, V> {
171 handle: CollectionHandle<DynData, DynPair<DynData, DynZWeight>>,
172 phantom: PhantomData<fn(&K, &V)>,
173}
174
175impl<K, V> IndexedZSetHandle<K, V>
176where
177 K: DBData,
178 V: DBData,
179{
180 fn new(handle: CollectionHandle<DynData, DynPair<DynData, DynZWeight>>) -> Self {
181 Self {
182 handle,
183 phantom: PhantomData,
184 }
185 }
186
187 pub fn push(&self, mut k: K, (v, w): (V, ZWeight)) {
188 self.handle.dyn_push(k.erase_mut(), Tup2(v, w).erase_mut())
189 }
190
191 pub fn append(&self, vals: &mut Vec<Tup2<K, Tup2<V, ZWeight>>>) {
192 let vals = Box::new(LeanVec::from(take(vals)));
193 self.handle.dyn_append(&mut vals.erase_box())
194 }
195}
196
197pub struct SetStagedBuffers {
198 input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynBool>>>>,
199 vals: Vec<Box<DynPairs<DynData, DynBool>>>,
200}
201
202impl StagedBuffers for SetStagedBuffers {
203 fn flush(&mut self) {
204 for (worker, vals) in self.vals.drain(..).enumerate() {
205 self.input_handle.update_for_worker(worker, |tuples| {
206 tuples.push(vals);
207 });
208 }
209 }
210}
211
212#[repr(transparent)]
213pub struct SetHandle<K> {
214 handle: UpsertHandle<DynData, DynBool>,
215 phantom: PhantomData<fn(&K)>,
216}
217
218impl<K> Clone for SetHandle<K> {
219 fn clone(&self) -> Self {
220 Self {
221 handle: self.handle.clone(),
222 phantom: PhantomData,
223 }
224 }
225}
226
227impl<K> SetHandle<K>
228where
229 K: DBData,
230{
231 fn new(handle: UpsertHandle<DynData, DynBool>) -> Self {
232 Self {
233 handle,
234 phantom: PhantomData,
235 }
236 }
237
238 pub fn push(&self, mut k: K, mut v: bool) {
239 self.handle.dyn_push(k.erase_mut(), v.erase_mut())
240 }
241
242 pub fn append(&mut self, vals: &mut Vec<Tup2<K, bool>>) {
243 let vals = Box::new(LeanVec::from(take(vals)));
244 self.handle.dyn_append(&mut vals.erase_box())
245 }
246
247 pub fn stage(
248 &self,
249 buffers: impl IntoIterator<Item = VecDeque<Tup2<K, bool>>>,
250 ) -> SetStagedBuffers {
251 let num_partitions = self.handle.num_partitions();
252 let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
253 for vals in buffers {
254 let vec = Vec::from(vals);
255 let vals = Box::new(LeanVec::from(vec));
256 self.handle
257 .dyn_stage(&mut vals.erase_box(), &mut partitions);
258 }
259 SetStagedBuffers {
260 input_handle: self.handle.input_handle.clone(),
261 vals: partitions,
262 }
263 }
264}
265
266pub struct MapStagedBuffers {
267 input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynUpdate<DynData, DynData>>>>>,
268 vals: Vec<Box<DynPairs<DynData, DynUpdate<DynData, DynData>>>>,
269}
270
271impl StagedBuffers for MapStagedBuffers {
272 fn flush(&mut self) {
273 for (worker, vals) in self.vals.drain(..).enumerate() {
274 self.input_handle.update_for_worker(worker, |tuples| {
275 tuples.push(vals);
276 });
277 }
278 }
279}
280
281#[repr(transparent)]
282pub struct MapHandle<K, V, U> {
283 handle: UpsertHandle<DynData, DynUpdate<DynData, DynData>>,
284 phantom: PhantomData<fn(&K, &V, &U)>,
285}
286
287impl<K, V, U> Clone for MapHandle<K, V, U> {
288 fn clone(&self) -> Self {
289 Self {
290 handle: self.handle.clone(),
291 phantom: PhantomData,
292 }
293 }
294}
295
296impl<K, V, U> MapHandle<K, V, U>
297where
298 K: DBData,
299 V: DBData,
300 U: DBData,
301{
302 fn new(handle: UpsertHandle<DynData, DynUpdate<DynData, DynData>>) -> Self {
303 Self {
304 handle,
305 phantom: PhantomData,
306 }
307 }
308
309 pub fn push(&self, mut k: K, mut upd: Update<V, U>) {
310 self.handle.dyn_push(k.erase_mut(), upd.erase_mut())
311 }
312
313 pub fn append(&mut self, vals: &mut Vec<Tup2<K, Update<V, U>>>) {
314 let vals = Box::new(LeanVec::from(take(vals)));
315 self.handle.dyn_append(&mut vals.erase_box())
316 }
317
318 pub fn stage(
319 &self,
320 buffers: impl IntoIterator<Item = VecDeque<Tup2<K, Update<V, U>>>>,
321 ) -> MapStagedBuffers {
322 let num_partitions = self.handle.num_partitions();
323 let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
324 for vals in buffers {
325 let vec = Vec::from(vals);
326 let vals = Box::new(LeanVec::from(vec));
327 self.handle
328 .dyn_stage(&mut vals.erase_box(), &mut partitions);
329 }
330 MapStagedBuffers {
331 input_handle: self.handle.input_handle.clone(),
332 vals: partitions,
333 }
334 }
335}
336
337impl RootCircuit {
338 /// Create an input stream that carries values of type `T`.
339 ///
340 /// Input streams are used to push data to the circuit from the outside
341 /// world via the [`InputHandle`] object returned by this method:
342 ///
343 /// ```text
344 /// ┌──────────────────────┐
345 /// │Circuit │
346 /// │ │
347 /// ┌───────────┐ │ stream │
348 /// │InputHandle├──────────────────► │
349 /// └───────────┘ │ │
350 /// │ │
351 /// └──────────────────────┘
352 /// ```
353 ///
354 /// At each clock cycle, the stream consumes the last value placed in it via
355 /// the `InputHandle` (or `<T as Default>::default()` if no value was
356 /// placed in the stream since the last clock cycle) and yields this
357 /// value to all downstream operators connected to it.
358 ///
359 /// See [`InputHandle`] for more details.
360 #[track_caller]
361 pub fn add_input_stream<T>(&self) -> (Stream<Self, T>, InputHandle<T>)
362 where
363 T: Default + Debug + Clone + Send + 'static,
364 {
365 let (input, input_handle) =
366 Input::new(Location::caller(), |x| x, Arc::new(|| Default::default()));
367 let stream = self.add_source(input);
368 (stream, input_handle)
369 }
370
371 /// Create an input stream that carries values of type
372 /// [`OrdZSet<K>`](`OrdZSet`).
373 ///
374 /// Creates an input stream that carries values of type `OrdZSet<K>` and
375 /// an input handle of type [`ZSetHandle<K>`](`ZSetHandle`)
376 /// used to construct input Z-sets out of individual elements. The
377 /// client invokes [`ZSetHandle::push`] and
378 /// [`ZSetHandle::append`] any number of times to add values to
379 /// the input Z-set. These values are distributed across all worker
380 /// threads (when running in a multithreaded [`Runtime`]) in a round-robin
381 /// fashion and buffered until the start of the next clock
382 /// cycle. At the start of a clock cycle (triggered by
383 /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
384 /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), the circuit
385 /// reads all buffered values and assembles them into an `OrdZSet`.
386 ///
387 /// See [`CollectionHandle`] for more details.
388 #[track_caller]
389 pub fn add_input_zset<K>(&self) -> (Stream<RootCircuit, OrdZSet<K>>, ZSetHandle<K>)
390 where
391 K: DBData,
392 {
393 let factories = AddInputZSetFactories::new::<K>();
394 let (stream, handle) = self.dyn_add_input_zset_mono(&factories);
395
396 (stream.typed(), ZSetHandle::new(handle))
397 }
398
399 /// Create an input stream that carries values of type
400 /// [`OrdIndexedZSet<K, V>`](`OrdIndexedZSet`).
401 ///
402 /// Creates an input stream that carries values of type `OrdIndexedZSet<K, V>`
403 /// and an input handle of type [`IndexedZSetHandle<K, V>`](`IndexedZSetHandle`)
404 /// used to construct input Z-sets out of individual elements. The client
405 /// invokes [`IndexedZSetHandle::push`] and [`IndexedZSetHandle::append`] any number
406 /// of times to add `key/value/weight` triples to the indexed Z-set. These triples
407 /// are distributed across all worker threads (when running in a
408 /// multithreaded [`Runtime`]) in a round-robin fashion, and
409 /// buffered until the start of the next clock cycle. At the start of a
410 /// clock cycle (triggered by
411 /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
412 /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), the circuit
413 /// reads all buffered values and assembles them into an `OrdIndexedZSet`.
414 ///
415 /// See [`CollectionHandle`] for more details.
416 #[allow(clippy::type_complexity)]
417 #[track_caller]
418 pub fn add_input_indexed_zset<K, V>(
419 &self,
420 ) -> (
421 Stream<RootCircuit, OrdIndexedZSet<K, V>>,
422 IndexedZSetHandle<K, V>,
423 )
424 where
425 K: DBData,
426 V: DBData,
427 {
428 let factories = AddInputIndexedZSetFactories::new::<K, V>();
429 let (stream, handle) = self.dyn_add_input_indexed_zset_mono(&factories);
430
431 (stream.typed(), IndexedZSetHandle::new(handle))
432 }
433
434 /// Create an input table with set semantics.
435 ///
436 /// # Motivation
437 ///
438 /// DBSP represents relational data using Z-sets, i.e., tables where each
439 /// record has a weight, which denotes the number of times the record occurs
440 /// in the table. Updates to Z-sets are also Z-sets, with
441 /// positive weights representing insertions and negative weights
442 /// representing deletions. The contents of the Z-set after an update
443 /// is computed by summing up the weights associated with each record.
444 /// Z-set updates are commutative, e.g., insert->insert->delete and
445 /// insert->delete->insert sequences are both equivalent to a single
446 /// insert. This internal representation enables efficient incremental
447 /// computation, but it does not always match the data model used by the
448 /// outside world, and may require a translation layer to eliminate this
449 /// mismatch when ingesting data into DBSP.
450 ///
451 /// In particular, input tables often behave as sets. A set is a special
452 /// case of a Z-set where all weights are equal to 1. Duplicate
453 /// insertions and deletions to sets are ignored, i.e., inserting an
454 /// existing element or deleting an element not in the set are both
455 /// no-ops. Set updates are not commutative, e.g., the
456 /// insert->delete->insert sequence is equivalent to a single insert,
457 /// while insert->insert->delete is equivalent to a delete.
458 ///
459 /// # Details
460 ///
461 /// The `add_input_set` operator creates an input table that internally
462 /// appears as a Z-set with unit weights, but that ingests input data
463 /// using set semantics. It returns a stream that carries values of type
    /// `OrdZSet<K>` and an input handle of type
465 /// [`SetHandle<K>`](`SetHandle`). The client uses
466 /// [`SetHandle::push`] and [`SetHandle::append`] to submit
467 /// commands of the form `(val, true)` to insert an element to the set
468 /// and `(val, false) ` to delete `val` from the set. These commands
469 /// are buffered until the start of the next clock cycle.
470 ///
471 /// At the start of a clock cycle (triggered by
472 /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
473 /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), DBSP applies
474 /// buffered commands in order and computes an update to the input set as
475 /// an `OrdZSet` with weights `+1` and `-1` representing set insertions and
476 /// deletions respectively. The following table illustrates the
477 /// relationship between input commands, the contents of the set and the
478 /// contents of the stream produced by this operator:
479 ///
480 /// ```text
481 /// time │ input commands │content of the │ stream returned by │ comment
482 /// │ │input set │ `add_input_set` │
483 /// ─────┼──────────────────────────────┼─────────────────┼────────────────────────┼───────────────────────────────────────────────────────
484 /// 1 │{("foo",true),("bar",true)} │ {"foo","bar"} │ {("foo",+1),("bar",+1)}│
485 /// 2 │{("foo",true),("bar",false)} │ {"foo"} │ {("bar",-1)} │ignore duplicate insert of "foo"
486 /// 3 │{("foo",false),("foo",true)} │ {"foo"} │ {} │deleting and re-inserting "foo" is a no-op
487 /// 4 │{("foo",false),("bar",false)} │ {} │ {("foo",-1)} │deleting value "bar" that is not in the set is a no-op
488 /// ─────┴──────────────────────────────┴─────────────────┴────────────────────────┴────────────────────────────────────────────────────────
489 /// ```
490 ///
491 /// Internally, this operator maintains the contents of the input set
492 /// partitioned across all worker threads based on the hash of the
493 /// value. Insert/delete commands are routed to the worker in charge of
494 /// the given value.
495 ///
496 /// # Data retention
497 ///
498 /// Applying [`Stream::integrate_trace_retain_keys`], and
499 /// [`Stream::integrate_trace_with_bound`] methods to the stream has the
500 /// additional effect of filtering out all values that don't satisfy the
501 /// retention policy configured by these methods from the stream.
502 /// Specifically, retention conditions configured at logical time `t`
503 /// are applied starting from logical time `t+1`.
504 // TODO: Add a version that takes a custom hash function.
505 #[track_caller]
506 pub fn add_input_set<K>(&self) -> (Stream<RootCircuit, OrdZSet<K>>, SetHandle<K>)
507 where
508 K: DBData,
509 {
510 let factories = AddInputSetFactories::new::<K>();
511 let (stream, handle) = self.dyn_add_input_set_mono(None, &factories);
512
513 (stream.typed(), SetHandle::new(handle))
514 }
515
516 /// Create an input table as a key-value map with upsert update semantics.
517 ///
518 /// # Motivation
519 ///
520 /// DBSP represents indexed data using indexed Z-sets, i.e., sets
521 /// of `(key, value, weight)` tuples, where `weight`
522 /// denotes the number of times the key-value pair occurs in
523 /// the table. Updates to indexed Z-sets are also indexed Z-sets, with
524 /// positive weights representing insertions and negative weights
525 /// representing deletions. The contents of the indexed Z-set after an
526 /// update is computed by summing up weights associated with each
527 /// key-value pair. This representation enables efficient incremental
528 /// computation, but it does not always match the data model used by the
529 /// outside world, and may require a translation layer to eliminate this
530 /// mismatch when ingesting indexed data into DBSP.
531 ///
532 /// In particular, input tables often behave as key-value maps.
533 /// A map is a special case of an indexed Z-set where each key has
534 /// a unique value associated with it and where all weights are 1.
535 /// Map updates follow the update-or-insert (*upsert*) semantics,
536 /// where inserting a new key-value pair overwrites the old value
537 /// associated with the key, if any.
538 ///
539 /// # Details
540 ///
541 /// The `add_input_map` operator creates an input table that internally
542 /// appears as an indexed Z-set with all unit weights, but that ingests
543 /// input data using upsert semantics. It returns a stream that carries
    /// values of type `OrdIndexedZSet<K, V>` and an input handle of type
    /// [`MapHandle<K, V, U>`](`MapHandle`). The client uses
546 /// [`MapHandle::push`] and [`MapHandle::append`] to submit
547 /// commands of the form `(key, Update::Insert(val))` to insert a new
548 /// key-value pair, `(key, Update::Delete)` to delete the value
549 /// associated with `key`, and `(key, Update::Update)` to modify the
550 /// values associated with `key`, if it exists. These commands are
551 /// buffered until the start of the next clock cycle.
552 ///
553 /// At the start of a clock cycle (triggered by
554 /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
555 /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)),
556 /// DBSP applies buffered commands in order and
557 /// computes an update to the input set as an `OrdIndexedZSet` with weights
558 /// `+1` and `-1` representing insertions and deletions respectively.
559 /// The following table illustrates the relationship between input commands,
560 /// the contents of the map and the contents of the stream produced by this
561 /// operator:
562 ///
563 /// ```text
564 /// time │ input commands │content of the │ stream returned by │ comment
565 /// │ │input map │ `add_input_map` │
566 /// ─────┼─────────────────────────────────────────┼──────────────────────┼────────────────────────────┼───────────────────────────────────────────────────────
567 /// 1 │{(1,Insert("foo"), (2,Insert("bar"))} │{(1,"foo"),(2,"bar")} │ {(1,"foo",+1),(2,"bar",+1)}│
568 /// 2 │{(1,Insert("foo"), (2,Insert("baz"))} │{(1,"foo"),(2,"baz")} │ {(2,"bar",-1),(2,"baz",+1)}│ Ignore duplicate insert of (1,"foo"). New value
569 /// | | | | "baz" for key 2 overwrites the old value "bar".
570 /// 3 │{(1,Delete),(2,Insert("bar")),(2,Delete)}│{} │ {(1,"foo",-1),(2,"baz",-1)}│ Delete both keys. Upsert (2,"bar") is overridden
571 /// | | | | by subsequent delete command.
    /// 4    │{(1,Update("new")),(2,Update("bar"))}    │{}                    │ {}                         │ Both updates are ignored, because neither key 1
    ///      │                                         │                      │                            │ nor key 2 is present in the (empty) map.
574 /// ─────┴─────────────────────────────────────────┴──────────────────────┴────────────────────────────┴────────────────────────────────────────────────────────
575 /// ```
576 ///
577 /// Note that upsert commands cannot fail. Duplicate inserts and deletes
578 /// are simply ignored.
579 ///
580 /// Internally, this operator maintains the contents of the map
581 /// partitioned across all worker threads based on the hash of the
582 /// key. Upsert/delete commands are routed to the worker in charge of
583 /// the given key.
584 ///
585 /// # Data retention
586 ///
587 /// Applying the [`Stream::integrate_trace_retain_keys`] to the stream has the
588 /// additional effect of filtering out all updates that don't satisfy the
589 /// retention policy.
590 /// In particular, this means that attempts to overwrite, update, or delete
591 /// a key-value pair whose key doesn't satisfy current retention
592 /// conditions are ignored, since all these operations involve deleting
593 /// an existing tuple.
594 ///
595 /// Retention conditions configured at logical time `t`
596 /// are applied starting from logical time `t+1`.
597 ///
598 /// FIXME: see <https://github.com/feldera/feldera/issues/2669>
599 // TODO: Add a version that takes a custom hash function.
    #[track_caller]
    pub fn add_input_map<K, V, U, PF>(
        &self,
        patch_func: PF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
    {
        // Non-persistent variant: delegate with no persistent id.
        self.add_input_map_persistent(None, patch_func)
    }
616
    /// Like [`Self::add_input_map`], but additionally takes a
    /// `persistent_id` that is forwarded to the underlying dynamic operator
    /// (presumably identifying its state across restarts — confirm with
    /// `dyn_add_input_map_mono`).
    #[track_caller]
    pub fn add_input_map_persistent<K, V, U, PF>(
        &self,
        persistent_id: Option<&str>,
        patch_func: PF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
    {
        let factories = AddInputMapFactories::new::<K, V, U>();
        let (stream, handle) = self.dyn_add_input_map_mono(
            persistent_id,
            &factories,
            // SAFETY: `factories` is instantiated for the concrete types
            // `K`, `V`, `U`, so this callback is expected to only ever
            // receive erased values of types `V` and `U`.
            Box::new(move |v: &mut DynData, u: &DynData| unsafe {
                patch_func(v.downcast_mut::<V>(), u.downcast::<U>())
            }),
        );

        // Re-attach static types to the erased outputs.
        (stream.typed(), MapHandle::new(handle))
    }
643
644 /// Like `add_input_map`, but additionally tracks a waterline of the input collection and
645 /// rejects inputs that are below the waterline.
646 ///
647 /// An input is rejected if the input record itself is below the waterline or if the existing
648 /// record it replaces is below the waterline.
649 ///
650 /// # Type arguments
651 ///
652 /// - `K`: The type of the key.
653 /// - `V`: The type of the value.
654 /// - `U`: The type that represents updates to values.
655 /// - `W`: The type of the waterline.
656 /// - `E`: The type of the error that is reported when the waterline is violated.
657 ///
658 /// # Arguments
659 ///
660 /// - `patch_func`: A function that applies the update to the value.
661 /// - `init`: A function that initializes the waterline.
662 /// - `extract_ts`: A function that extracts the timestamp from the key/value pair.
663 /// - `least_upper_bound`: A function that computes the least upper bound of two waterlines.
664 /// - `filter_func`: A function that filters out records below the waterline.
665 /// - `report_func`: A function that reports errors when the waterline is violated.
666 ///
667 /// # Returns
668 ///
669 /// - Stream of changes to the collection.
670 /// - Error stream that reports waterline violations.
671 /// - Stream of waterline values.
672 /// - Input handle that allows pushing updates to the collection.
673 ///
674 /// # Garbage collection
675 ///
676 /// This function supports waterlines over both key and values components of the tuple.
677 /// In case the waterline is applied to the key component, the internal index maintained
678 /// by this function can be GC'd by calling [`Stream::integrate_trace_retain_keys`]: on
679 /// the output stream returned by this function:
680 /// `stream.integrate_trace_retain_keys(&waterline, |key, wl| *key >= *wl)`, where
681 /// `waterline` is the stream of waterline values returned by this function.
    #[track_caller]
    pub fn add_input_map_with_waterline<K, V, U, W, E, PF, IF, WF, LB, FF, RF>(
        &self,
        patch_func: PF,
        init: IF,
        extract_ts: WF,
        least_upper_bound: LB,
        filter_func: FF,
        report_func: RF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        Stream<RootCircuit, OrdZSet<E>>,
        Stream<RootCircuit, TypedBox<W, DynData>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        W: DBData,
        E: DBData,
        PF: Fn(&mut V, &U) + 'static,
        IF: Fn() -> W + 'static,
        WF: Fn(&K, &V) -> W + 'static,
        LB: Fn(&W, &W) -> W + Clone + 'static,
        FF: Fn(&W, &K, &V) -> bool + 'static,
        RF: Fn(&W, &K, &V, ZWeight) -> E + 'static,
    {
        // Non-persistent variant: delegate with no persistent id.
        self.add_input_map_with_waterline_persistent(
            None,
            patch_func,
            init,
            extract_ts,
            least_upper_bound,
            filter_func,
            report_func,
        )
    }
720
    /// Like [`Self::add_input_map_with_waterline`], but additionally takes a
    /// `persistent_id` forwarded to the underlying dynamic operator.
    #[allow(clippy::too_many_arguments)]
    #[track_caller]
    pub fn add_input_map_with_waterline_persistent<K, V, U, W, E, PF, IF, WF, LB, FF, RF>(
        &self,
        persistent_id: Option<&str>,
        patch_func: PF,
        init: IF,
        extract_ts: WF,
        least_upper_bound: LB,
        filter_func: FF,
        report_func: RF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        Stream<RootCircuit, OrdZSet<E>>,
        Stream<RootCircuit, TypedBox<W, DynData>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        W: DBData + Erase<DynData>,
        E: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
        IF: Fn() -> W + 'static,
        WF: Fn(&K, &V) -> W + 'static,
        LB: Fn(&W, &W) -> W + Clone + 'static,
        FF: Fn(&W, &K, &V) -> bool + 'static,
        RF: Fn(&W, &K, &V, ZWeight) -> E + 'static,
    {
        let factories = AddInputMapWithWaterlineFactories::new::<K, V, U, E>();
        // Wrap each strongly typed callback into a type-erased one.
        //
        // SAFETY (applies to every `downcast` below): `factories` is
        // instantiated for the concrete types `K`, `V`, `U`, `E` (with `W`
        // for waterline values), so the dynamic operator is expected to only
        // ever invoke these callbacks with erased values of those types.
        let (stream, errors, waterline, handle) = self.dyn_add_input_map_with_waterline_mono(
            persistent_id,
            &factories,
            // Apply a typed update `U` to a value `V` in place.
            Box::new(move |v: &mut DynData, u: &DynData| unsafe {
                patch_func(v.downcast_mut::<V>(), u.downcast::<U>())
            }),
            // Initial waterline value.
            Box::new(move || Box::new(init())),
            // Extract the timestamp of a key/value pair into `ts`.
            Box::new(move |k: &DynData, v: &DynData, ts: &mut DynData| {
                let k = unsafe { k.downcast::<K>() };
                let v = unsafe { v.downcast::<V>() };
                let w = unsafe { ts.downcast_mut::<W>() };

                *w = extract_ts(k, v);
            }),
            // Combine two waterlines into their least upper bound.
            Box::new(move |a: &DynData, b: &DynData, ts: &mut DynData| {
                let a = unsafe { a.downcast::<W>() };
                let b = unsafe { b.downcast::<W>() };
                let ts = unsafe { ts.downcast_mut::<W>() };
                *ts = least_upper_bound(a, b)
            }),
            // Decide whether a record satisfies the waterline.
            Box::new(move |wl: &DynData, k: &DynData, v: &DynData| {
                let wl = unsafe { wl.downcast::<W>() };
                let k = unsafe { k.downcast::<K>() };
                let v = unsafe { v.downcast::<V>() };

                filter_func(wl, k, v)
            }),
            // Build the error value reported for a rejected record.
            Box::new(
                move |wl: &DynData, k: &DynData, v: &DynData, w: ZWeight, err: &mut DynData| {
                    let wl = unsafe { wl.downcast::<W>() };
                    let k = unsafe { k.downcast::<K>() };
                    let v = unsafe { v.downcast::<V>() };
                    let err = unsafe { err.downcast_mut::<E>() };

                    *err = report_func(wl, k, v, w);
                },
            ),
        );

        // Re-attach static types to the erased output streams.
        // SAFETY: the waterline stream carries erased `W` values, as
        // constructed by the closures above.
        (
            stream.typed(),
            errors.typed(),
            unsafe { waterline.typed_data() },
            MapHandle::new(handle),
        )
    }
798}
799
/// `TypedMapKey` entry used to share InputHandle objects across workers in a
/// runtime. The first worker to create the handle will store it in the map,
/// subsequent workers will get a clone of the same handle.
struct InputId<T> {
    // Unique id of the input operator within the runtime.
    id: usize,
    // Records the handle's value type `T`; no `T` value is ever stored.
    _marker: PhantomData<T>,
}

// SAFETY: `InputId` only contains a `usize`; the `PhantomData<T>` carries no
// value of type `T`, so sharing an `InputId` across threads cannot expose any
// `T` data regardless of whether `T: Sync`.
unsafe impl<T> Sync for InputId<T> {}

// Implement `Hash`, `Eq` manually to avoid `T: Hash` type bound.
impl<T> Hash for InputId<T> {
    fn hash<H>(&self, state: &mut H)
    where
        H: Hasher,
    {
        // Only `id` participates; `_marker` is a ZST.
        self.id.hash(state);
    }
}

impl<T> PartialEq for InputId<T> {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl<T> Eq for InputId<T> {}

impl<T> InputId<T> {
    /// Creates the store key for input operator number `id`.
    fn new(id: usize) -> Self {
        Self {
            id,
            _marker: PhantomData,
        }
    }
}

// Looking up an `InputId<T>` in the local store yields the shared
// `InputHandle<T>` for that input.
impl<T> TypedMapKey<LocalStoreMarker> for InputId<T>
where
    T: 'static,
{
    type Value = InputHandle<T>;
}
843
844/// Mailbox that buffers data between the circuit and the outside world.
845/// It is used inside an `InputHandle` to store data sent to a worker
846/// thread and inside an `OutputHandle` to store data sent by a worker
847/// thread to the outside world.
848#[derive(Clone)]
849pub(crate) struct Mailbox<T> {
850 empty: Arc<dyn Fn() -> T + Send + Sync>,
851 value: Arc<Mutex<T>>,
852}
853
854impl<T: Clone> Mailbox<T> {
855 pub(in crate::operator) fn new(empty: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
856 let v = empty();
857 Self {
858 empty,
859 value: Arc::new(Mutex::new(v)),
860 }
861 }
862
863 pub(in crate::operator) fn take(&self) -> T {
864 replace(&mut *self.value.lock().unwrap(), (self.empty)())
865 }
866
867 pub(super) fn map<F, O: 'static>(&self, func: F) -> O
868 where
869 F: Fn(&T) -> O,
870 {
871 func(self.value.lock().unwrap().borrow())
872 }
873
874 fn update<F>(&self, f: F)
875 where
876 F: FnOnce(&mut T),
877 {
878 f(&mut *self.value.lock().unwrap());
879 }
880
881 pub(in crate::operator) fn set(&self, v: T) {
882 *self.value.lock().unwrap() = v;
883 }
884
885 pub(in crate::operator) fn clear(&self) {
886 *self.value.lock().unwrap() = (self.empty)();
887 }
888}
889
/// Shared state behind an [`InputHandle`]: one mailbox per worker thread,
/// indexed by worker id.
pub(crate) struct InputHandleInternal<T> {
    pub(crate) mailbox: Vec<Mailbox<T>>,
}
893
894impl<T> InputHandleInternal<T>
895where
896 T: Clone,
897{
898 // Returns a new `InputHandleInternal` for workers with indexes in the range
899 // of `workers`.
900 fn new(workers: Range<usize>, empty_val: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
901 assert!(!workers.is_empty());
902 Self {
903 mailbox: workers
904 .clone()
905 .map(move |_| Mailbox::new(empty_val.clone()))
906 .collect(),
907 }
908 }
909
910 fn set_for_worker(&self, worker: usize, v: T) {
911 self.mailbox(worker).set(v);
912 }
913
914 fn update_for_worker<F>(&self, worker: usize, f: F)
915 where
916 F: FnOnce(&mut T),
917 {
918 self.mailbox(worker).update(f);
919 }
920
921 /// Send the same value to all workers.
922 fn set_for_all(&self, v: T) {
923 for i in 0..self.mailbox.len() - 1 {
924 self.mailbox[i].set(v.clone());
925 }
926 self.mailbox[self.mailbox.len() - 1].set(v);
927 }
928
929 fn clear_for_all(&self) {
930 for mailbox in self.mailbox.iter() {
931 mailbox.clear();
932 }
933 }
934
935 fn mailbox(&self, worker: usize) -> &Mailbox<T> {
936 &self.mailbox[worker]
937 }
938}
939
/// A handle used to write data to an input stream created by
/// the [`RootCircuit::add_input_stream`] method.
///
/// Internally, the handle manages an array of mailboxes, one for
/// each worker thread. At the start of each clock cycle, the
/// circuit reads the current value from each mailbox and writes
/// it to the input stream associated with the handle, leaving
/// the mailbox empty (more precisely, the mailbox will contain
/// the placeholder value produced by the handle's `empty` closure;
/// see [`Mailbox::take`]). The handle is then used to write new
/// values to the mailboxes, which will be consumed at the next
/// logical clock tick.
pub struct InputHandle<T>(pub(crate) Arc<InputHandleInternal<T>>);
952
953impl<T> Clone for InputHandle<T> {
954 fn clone(&self) -> Self {
955 Self(self.0.clone())
956 }
957}
958
959impl<T> InputHandle<T>
960where
961 T: Send + Clone + 'static,
962{
963 fn new(empty_val: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
964 match Runtime::runtime() {
965 None => Self(Arc::new(InputHandleInternal::new(0..1, empty_val))),
966 Some(runtime) => {
967 let input_id = runtime.sequence_next();
968
969 runtime
970 .local_store()
971 .entry(InputId::new(input_id))
972 .or_insert_with(|| {
973 Self(Arc::new(InputHandleInternal::new(
974 runtime.layout().local_workers(),
975 empty_val,
976 )))
977 })
978 .value()
979 .clone()
980 }
981 }
982 }
983
984 /// Returns the mailbox for the given `worker`.
985 ///
986 /// A `worker` of 0 is the first worker on the local host.
987 fn mailbox(&self, worker: usize) -> &Mailbox<T> {
988 self.0.mailbox(worker)
989 }
990
991 /// Write value `v` to the specified worker's mailbox,
992 /// overwriting any previous value in the mailbox.
993 ///
994 /// A `worker` of 0 is the first worker on the local host.
995 pub fn set_for_worker(&self, worker: usize, v: T) {
996 self.0.set_for_worker(worker, v);
997 }
998
999 /// Mutate the contents of the specified worker's mailbox
1000 /// using closure `f`.
1001 ///
1002 /// A `worker` of 0 is the first worker on the local host.
1003 pub fn update_for_worker<F>(&self, worker: usize, f: F)
1004 where
1005 F: FnOnce(&mut T),
1006 {
1007 self.0.update_for_worker(worker, f);
1008 }
1009
1010 /// Write value `v` to all worker mailboxes.
1011 pub fn set_for_all(&self, v: T) {
1012 self.0.set_for_all(v);
1013 }
1014
1015 pub fn clear_for_all(&self) {
1016 self.0.clear_for_all();
1017 }
1018}
1019
/// Source operator that injects data received via `InputHandle` to the circuit.
///
/// ```text
///                   ┌───────────────────┐
///                   │Circuit            │
///                   │                   │
///  ┌───────────┐    │    ┌─────┐        │
///  │InputHandle├─────────►Input├─────►  │
///  └───────────┘    │    └─────┘        │
///                   │                   │
///                   └───────────────────┘
/// ```
pub struct Input<IT, OT, F> {
    // Caller-provided source location, reported via `Operator::location`.
    location: &'static Location<'static>,
    // This worker's mailbox, shared with the `InputHandle` returned by
    // `Input::new`; drained on every `eval`.
    mailbox: Mailbox<IT>,
    // Converts the raw mailbox contents (`IT`) into the stream type (`OT`).
    input_func: F,
    // `OT` appears only in `input_func`'s output; anchor it here.
    phantom: PhantomData<OT>,
}
1038
1039impl<IT, OT, F> Input<IT, OT, F>
1040where
1041 IT: Clone + Send + 'static,
1042{
1043 pub fn new(
1044 location: &'static Location<'static>,
1045 input_func: F,
1046 default: Arc<dyn Fn() -> IT + Send + Sync>,
1047 ) -> (Self, InputHandle<IT>) {
1048 let handle = InputHandle::new(default);
1049 let mailbox = handle.mailbox(Runtime::local_worker_offset()).clone();
1050
1051 let input = Self {
1052 location,
1053 mailbox,
1054 input_func,
1055 phantom: PhantomData,
1056 };
1057
1058 (input, handle)
1059 }
1060}
1061
1062impl<IT, OT, F> Operator for Input<IT, OT, F>
1063where
1064 IT: 'static,
1065 OT: 'static,
1066 F: 'static,
1067{
1068 fn name(&self) -> Cow<'static, str> {
1069 Cow::from("Input")
1070 }
1071
1072 fn is_input(&self) -> bool {
1073 true
1074 }
1075
1076 fn location(&self) -> OperatorLocation {
1077 Some(self.location)
1078 }
1079
1080 fn fixedpoint(&self, _scope: Scope) -> bool {
1081 false
1082 }
1083}
1084
1085impl<IT, OT, F> SourceOperator<OT> for Input<IT, OT, F>
1086where
1087 IT: Clone + Debug + 'static,
1088 OT: 'static,
1089 F: Fn(IT) -> OT + 'static,
1090{
1091 async fn eval(&mut self) -> OT {
1092 let v = self.mailbox.take();
1093 (self.input_func)(v)
1094 }
1095}