// dbsp/operator/input.rs
1use crate::{
2    Circuit, DBData, DynZWeight, RootCircuit, Runtime, Stream, TypedBox, ZWeight,
3    circuit::{
4        LocalStoreMarker, Scope,
5        metadata::OperatorLocation,
6        operator_traits::{Operator, SourceOperator},
7    },
8    dynamic::{DowncastTrait, DynBool, DynData, DynPair, DynPairs, DynUnit, Erase, LeanVec},
9    operator::dynamic::{
10        input::{
11            AddInputIndexedZSetFactories, AddInputMapFactories, AddInputMapWithWaterlineFactories,
12            AddInputSetFactories, AddInputZSetFactories, CollectionHandle, UpsertHandle,
13        },
14        input_upsert::DynUpdate,
15    },
16    typed_batch::{OrdIndexedZSet, OrdZSet},
17    utils::Tup2,
18};
19use std::{
20    borrow::{Borrow, Cow},
21    collections::VecDeque,
22    fmt::Debug,
23    hash::{Hash, Hasher},
24    marker::PhantomData,
25    mem::{replace, take, transmute},
26    ops::{Deref, Range},
27    panic::Location,
28    sync::{Arc, Mutex},
29};
30use typedmap::TypedMapKey;
31
32pub use crate::operator::dynamic::input_upsert::{PatchFunc, Update};
33
/// A stream of [`OrdIndexedZSet`] batches flowing through the root circuit.
pub type IndexedZSetStream<K, V> = Stream<RootCircuit, OrdIndexedZSet<K, V>>;
/// A stream of [`OrdZSet`] batches flowing through the root circuit.
pub type ZSetStream<K> = Stream<RootCircuit, OrdZSet<K>>;
36
/// Input prepared for flushing into an input handle.
///
/// [ZSetHandle], [IndexedZSetHandle], [SetHandle], and [MapHandle] all support
/// similar ways to push data into a circuit.  The following discussion just
/// talks about `ZSetHandle`, for clarity.
///
/// There are two ways to push data into a circuit with [ZSetHandle]:
///
/// - Immediately, either one data point at a time with
///   [push](ZSetHandle::push), or a vector at a time with
///   [append](ZSetHandle::append).
///
/// - Preparing data in advance into [StagedBuffers] using
///   `stage`.  Then, later, calling [StagedBuffers::flush] pushes
///   the input buffers into the circuit.
///
/// Both approaches are equally correct.  They can differ in performance,
/// because [push](ZSetHandle::push) and [append](ZSetHandle::append) have a
/// significant cost for a large number of records.  Using [StagedBuffers] has a
/// similar cost, but it incurs it in the call to `stage` rather
/// than in [StagedBuffers::flush].  This means that, if the code driving the
/// circuit can buffer data ahead of the circuit's demand for it, the cost can
/// be hidden and data processing as a whole runs faster.
pub trait StagedBuffers: Send + Sync {
    /// Flushes the data gathered into this buffer to the circuit.
    ///
    /// The implementations in this module drain their internal buffers, so a
    /// second call to `flush` pushes nothing.
    fn flush(&mut self);
}
64
/// Staged input buffers for a [`ZSetHandle`], produced by [`ZSetHandle::stage`].
pub struct ZSetStagedBuffers {
    // Handle shared with the circuit's input operator; `flush` pushes into it.
    input_handle: InputHandle<Vec<Box<DynPairs<DynPair<DynData, DynUnit>, DynZWeight>>>>,
    // One pre-partitioned buffer per worker, indexed by worker id.
    vals: Vec<Box<DynPairs<DynPair<DynData, DynUnit>, DynZWeight>>>,
}
69
70impl StagedBuffers for ZSetStagedBuffers {
71    fn flush(&mut self) {
72        for (worker, vals) in self.vals.drain(..).enumerate() {
73            self.input_handle.update_for_worker(worker, |tuples| {
74                tuples.push(vals);
75            });
76        }
77    }
78}
79
/// Typed input handle for pushing `(key, weight)` updates into a Z-set stream
/// created by [`RootCircuit::add_input_zset`].
#[repr(transparent)]
pub struct ZSetHandle<K> {
    // Untyped handle doing the actual work; the typed methods erase into it.
    handle: CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>,
    // `fn(&K)` keeps the handle `Send`/`Sync` independently of `K`'s auto
    // traits while still tying the type parameter to the struct.
    phantom: PhantomData<fn(&K)>,
}
85
86impl<K> Clone for ZSetHandle<K> {
87    fn clone(&self) -> Self {
88        Self {
89            handle: self.handle.clone(),
90            phantom: PhantomData,
91        }
92    }
93}
94
// Expose the untyped `CollectionHandle` API (e.g. `num_partitions`) directly
// on the typed wrapper, alongside the typed methods defined below.
impl<K> Deref for ZSetHandle<K> {
    type Target = CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>;

    fn deref(&self) -> &Self::Target {
        &self.handle
    }
}
102
impl<K> ZSetHandle<K>
where
    K: DBData,
{
    /// Wraps an untyped [`CollectionHandle`] in a typed `ZSetHandle`.
    fn new(handle: CollectionHandle<DynPair<DynData, DynUnit>, DynZWeight>) -> Self {
        Self {
            handle,
            phantom: PhantomData,
        }
    }

    /// Buffers a single `(key, weight)` update for the next clock cycle.
    pub fn push(&self, k: K, mut w: ZWeight) {
        self.handle.dyn_push(Tup2(k, ()).erase_mut(), w.erase_mut())
    }

    /// Buffers a batch of `(key, weight)` updates, leaving `vals` empty.
    pub fn append(&self, vals: &mut Vec<Tup2<K, ZWeight>>) {
        // SAFETY: `()` is a zero-sized type, more precisely it's a 1-ZST.
        // According to the Rust spec adding it to a tuple doesn't change
        // its memory layout.
        let vals: &mut Vec<Tup2<Tup2<K, ()>, ZWeight>> = unsafe { transmute(vals) };
        let vals = Box::new(LeanVec::from(take(vals)));

        self.handle.dyn_append(&mut vals.erase_box())
    }

    /// Pre-partitions `buffers` across workers, returning [`ZSetStagedBuffers`]
    /// that can later be pushed into the circuit with [`StagedBuffers::flush`].
    pub fn stage(
        &self,
        buffers: impl IntoIterator<Item = VecDeque<Tup2<K, ZWeight>>>,
    ) -> ZSetStagedBuffers {
        let num_partitions = self.handle.num_partitions();
        let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
        // `next_worker` persists across buffers, so successive buffers continue
        // the worker assignment where the previous one left off — presumably
        // round-robin; confirm against `dyn_stage`.
        let mut next_worker = 0;
        for vals in buffers {
            let mut vec = Vec::from(vals);
            // SAFETY: `()` is a zero-sized type, more precisely it's a 1-ZST.
            // According to the Rust spec adding it to a tuple doesn't change
            // its memory layout.
            let vals: &mut Vec<Tup2<Tup2<K, ()>, ZWeight>> = unsafe { transmute(&mut vec) };
            let vals = Box::new(LeanVec::from(take(vals)));

            self.handle
                .dyn_stage(&mut vals.erase_box(), &mut next_worker, &mut partitions);
        }
        ZSetStagedBuffers {
            input_handle: self.handle.input_handle.clone(),
            vals: partitions,
        }
    }
}
152
/// Staged input buffers for an [`IndexedZSetHandle`].
///
/// NOTE(review): nothing in this file constructs this type — there is no
/// `IndexedZSetHandle::stage` here, unlike the other handles.  Presumably it
/// is built elsewhere or the `stage` method is pending; verify.
pub struct IndexedZSetStagedBuffers {
    // Handle shared with the circuit's input operator; `flush` pushes into it.
    input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynPair<DynData, DynZWeight>>>>>,
    // One pre-partitioned buffer per worker, indexed by worker id.
    vals: Vec<Box<DynPairs<DynData, DynPair<DynData, DynZWeight>>>>,
}
157
158impl StagedBuffers for IndexedZSetStagedBuffers {
159    fn flush(&mut self) {
160        for (worker, vals) in self.vals.drain(..).enumerate() {
161            self.input_handle.update_for_worker(worker, |tuples| {
162                tuples.push(vals);
163            });
164        }
165    }
166}
167
168#[derive(Clone)]
169#[repr(transparent)]
170pub struct IndexedZSetHandle<K, V> {
171    handle: CollectionHandle<DynData, DynPair<DynData, DynZWeight>>,
172    phantom: PhantomData<fn(&K, &V)>,
173}
174
175impl<K, V> IndexedZSetHandle<K, V>
176where
177    K: DBData,
178    V: DBData,
179{
180    fn new(handle: CollectionHandle<DynData, DynPair<DynData, DynZWeight>>) -> Self {
181        Self {
182            handle,
183            phantom: PhantomData,
184        }
185    }
186
187    pub fn push(&self, mut k: K, (v, w): (V, ZWeight)) {
188        self.handle.dyn_push(k.erase_mut(), Tup2(v, w).erase_mut())
189    }
190
191    pub fn append(&self, vals: &mut Vec<Tup2<K, Tup2<V, ZWeight>>>) {
192        let vals = Box::new(LeanVec::from(take(vals)));
193        self.handle.dyn_append(&mut vals.erase_box())
194    }
195}
196
/// Staged input buffers for a [`SetHandle`], produced by [`SetHandle::stage`].
pub struct SetStagedBuffers {
    // Handle shared with the circuit's input operator; `flush` pushes into it.
    input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynBool>>>>,
    // One pre-partitioned buffer per worker, indexed by worker id.
    vals: Vec<Box<DynPairs<DynData, DynBool>>>,
}
201
202impl StagedBuffers for SetStagedBuffers {
203    fn flush(&mut self) {
204        for (worker, vals) in self.vals.drain(..).enumerate() {
205            self.input_handle.update_for_worker(worker, |tuples| {
206                tuples.push(vals);
207            });
208        }
209    }
210}
211
/// Typed input handle for pushing insert/delete commands into a set-semantics
/// input table created by [`RootCircuit::add_input_set`].
#[repr(transparent)]
pub struct SetHandle<K> {
    // Untyped upsert handle doing the actual work.
    handle: UpsertHandle<DynData, DynBool>,
    // Ties `K` to the struct without affecting its auto traits.
    phantom: PhantomData<fn(&K)>,
}
217
218impl<K> Clone for SetHandle<K> {
219    fn clone(&self) -> Self {
220        Self {
221            handle: self.handle.clone(),
222            phantom: PhantomData,
223        }
224    }
225}
226
227impl<K> SetHandle<K>
228where
229    K: DBData,
230{
231    fn new(handle: UpsertHandle<DynData, DynBool>) -> Self {
232        Self {
233            handle,
234            phantom: PhantomData,
235        }
236    }
237
238    pub fn push(&self, mut k: K, mut v: bool) {
239        self.handle.dyn_push(k.erase_mut(), v.erase_mut())
240    }
241
242    pub fn append(&mut self, vals: &mut Vec<Tup2<K, bool>>) {
243        let vals = Box::new(LeanVec::from(take(vals)));
244        self.handle.dyn_append(&mut vals.erase_box())
245    }
246
247    pub fn stage(
248        &self,
249        buffers: impl IntoIterator<Item = VecDeque<Tup2<K, bool>>>,
250    ) -> SetStagedBuffers {
251        let num_partitions = self.handle.num_partitions();
252        let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
253        for vals in buffers {
254            let vec = Vec::from(vals);
255            let vals = Box::new(LeanVec::from(vec));
256            self.handle
257                .dyn_stage(&mut vals.erase_box(), &mut partitions);
258        }
259        SetStagedBuffers {
260            input_handle: self.handle.input_handle.clone(),
261            vals: partitions,
262        }
263    }
264}
265
/// Staged input buffers for a [`MapHandle`], produced by [`MapHandle::stage`].
pub struct MapStagedBuffers {
    // Handle shared with the circuit's input operator; `flush` pushes into it.
    input_handle: InputHandle<Vec<Box<DynPairs<DynData, DynUpdate<DynData, DynData>>>>>,
    // One pre-partitioned buffer per worker, indexed by worker id.
    vals: Vec<Box<DynPairs<DynData, DynUpdate<DynData, DynData>>>>,
}
270
271impl StagedBuffers for MapStagedBuffers {
272    fn flush(&mut self) {
273        for (worker, vals) in self.vals.drain(..).enumerate() {
274            self.input_handle.update_for_worker(worker, |tuples| {
275                tuples.push(vals);
276            });
277        }
278    }
279}
280
/// Typed input handle for pushing upsert commands ([`Update`]) into a
/// map-semantics input table created by [`RootCircuit::add_input_map`].
#[repr(transparent)]
pub struct MapHandle<K, V, U> {
    // Untyped upsert handle doing the actual work.
    handle: UpsertHandle<DynData, DynUpdate<DynData, DynData>>,
    // Ties the type parameters to the struct without affecting auto traits.
    phantom: PhantomData<fn(&K, &V, &U)>,
}
286
287impl<K, V, U> Clone for MapHandle<K, V, U> {
288    fn clone(&self) -> Self {
289        Self {
290            handle: self.handle.clone(),
291            phantom: PhantomData,
292        }
293    }
294}
295
296impl<K, V, U> MapHandle<K, V, U>
297where
298    K: DBData,
299    V: DBData,
300    U: DBData,
301{
302    fn new(handle: UpsertHandle<DynData, DynUpdate<DynData, DynData>>) -> Self {
303        Self {
304            handle,
305            phantom: PhantomData,
306        }
307    }
308
309    pub fn push(&self, mut k: K, mut upd: Update<V, U>) {
310        self.handle.dyn_push(k.erase_mut(), upd.erase_mut())
311    }
312
313    pub fn append(&mut self, vals: &mut Vec<Tup2<K, Update<V, U>>>) {
314        let vals = Box::new(LeanVec::from(take(vals)));
315        self.handle.dyn_append(&mut vals.erase_box())
316    }
317
318    pub fn stage(
319        &self,
320        buffers: impl IntoIterator<Item = VecDeque<Tup2<K, Update<V, U>>>>,
321    ) -> MapStagedBuffers {
322        let num_partitions = self.handle.num_partitions();
323        let mut partitions = vec![self.handle.pairs_factory.default_box(); num_partitions];
324        for vals in buffers {
325            let vec = Vec::from(vals);
326            let vals = Box::new(LeanVec::from(vec));
327            self.handle
328                .dyn_stage(&mut vals.erase_box(), &mut partitions);
329        }
330        MapStagedBuffers {
331            input_handle: self.handle.input_handle.clone(),
332            vals: partitions,
333        }
334    }
335}
336
impl RootCircuit {
    /// Create an input stream that carries values of type `T`.
    ///
    /// Input streams are used to push data to the circuit from the outside
    /// world via the [`InputHandle`] object returned by this method:
    ///
    /// ```text
    ///                   ┌──────────────────────┐
    ///                   │Circuit               │
    ///                   │                      │
    /// ┌───────────┐     │   stream             │
    /// │InputHandle├──────────────────►         │
    /// └───────────┘     │                      │
    ///                   │                      │
    ///                   └──────────────────────┘
    /// ```
    ///
    /// At each clock cycle, the stream consumes the last value placed in it via
    /// the `InputHandle` (or `<T as Default>::default()` if no value was
    /// placed in the stream since the last clock cycle) and yields this
    /// value to all downstream operators connected to it.
    ///
    /// See [`InputHandle`] for more details.
    #[track_caller]
    pub fn add_input_stream<T>(&self) -> (Stream<Self, T>, InputHandle<T>)
    where
        T: Default + Debug + Clone + Send + 'static,
    {
        // NOTE(review): `|x| x` appears to pass pushed values through
        // unchanged, and the `Arc`ed closure supplies `T::default()` for
        // cycles with no input, matching the doc comment above — confirm
        // against `Input::new`'s contract.
        let (input, input_handle) =
            Input::new(Location::caller(), |x| x, Arc::new(|| Default::default()));
        let stream = self.add_source(input);
        (stream, input_handle)
    }

    /// Create an input stream that carries values of type
    /// [`OrdZSet<K>`](`OrdZSet`).
    ///
    /// Creates an input stream that carries values of type `OrdZSet<K>` and
    /// an input handle of type [`ZSetHandle<K>`](`ZSetHandle`)
    /// used to construct input Z-sets out of individual elements.  The
    /// client invokes [`ZSetHandle::push`] and
    /// [`ZSetHandle::append`] any number of times to add values to
    /// the input Z-set. These values are distributed across all worker
    /// threads (when running in a multithreaded [`Runtime`]) in a round-robin
    /// fashion and buffered until the start of the next clock
    /// cycle.  At the start of a clock cycle (triggered by
    /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
    /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), the circuit
    /// reads all buffered values and assembles them into an `OrdZSet`.
    ///
    /// See [`CollectionHandle`] for more details.
    #[track_caller]
    pub fn add_input_zset<K>(&self) -> (Stream<RootCircuit, OrdZSet<K>>, ZSetHandle<K>)
    where
        K: DBData,
    {
        let factories = AddInputZSetFactories::new::<K>();
        let (stream, handle) = self.dyn_add_input_zset_mono(&factories);

        (stream.typed(), ZSetHandle::new(handle))
    }

    /// Create an input stream that carries values of type
    /// [`OrdIndexedZSet<K, V>`](`OrdIndexedZSet`).
    ///
    /// Creates an input stream that carries values of type `OrdIndexedZSet<K, V>`
    /// and an input handle of type [`IndexedZSetHandle<K, V>`](`IndexedZSetHandle`)
    /// used to construct input Z-sets out of individual elements.  The client
    /// invokes [`IndexedZSetHandle::push`] and [`IndexedZSetHandle::append`] any number
    /// of times to add `key/value/weight` triples to the indexed Z-set. These triples
    /// are distributed across all worker threads (when running in a
    /// multithreaded [`Runtime`]) in a round-robin fashion, and
    /// buffered until the start of the next clock cycle.  At the start of a
    /// clock cycle (triggered by
    /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
    /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), the circuit
    /// reads all buffered values and assembles them into an `OrdIndexedZSet`.
    ///
    /// See [`CollectionHandle`] for more details.
    #[allow(clippy::type_complexity)]
    #[track_caller]
    pub fn add_input_indexed_zset<K, V>(
        &self,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        IndexedZSetHandle<K, V>,
    )
    where
        K: DBData,
        V: DBData,
    {
        let factories = AddInputIndexedZSetFactories::new::<K, V>();
        let (stream, handle) = self.dyn_add_input_indexed_zset_mono(&factories);

        (stream.typed(), IndexedZSetHandle::new(handle))
    }

    /// Create an input table with set semantics.
    ///
    /// # Motivation
    ///
    /// DBSP represents relational data using Z-sets, i.e., tables where each
    /// record has a weight, which denotes the number of times the record occurs
    /// in the table.  Updates to Z-sets are also Z-sets, with
    /// positive weights representing insertions and negative weights
    /// representing deletions.  The contents of the Z-set after an update
    /// is computed by summing up the weights associated with each record.
    /// Z-set updates are commutative, e.g., insert->insert->delete and
    /// insert->delete->insert sequences are both equivalent to a single
    /// insert.  This internal representation enables efficient incremental
    /// computation, but it does not always match the data model used by the
    /// outside world, and may require a translation layer to eliminate this
    /// mismatch when ingesting data into DBSP.
    ///
    /// In particular, input tables often behave as sets.  A set is a special
    /// case of a Z-set where all weights are equal to 1.  Duplicate
    /// insertions and deletions to sets are ignored, i.e., inserting an
    /// existing element or deleting an element not in the set are both
    /// no-ops.  Set updates are not commutative, e.g., the
    /// insert->delete->insert sequence is equivalent to a single insert,
    /// while insert->insert->delete is equivalent to a delete.
    ///
    /// # Details
    ///
    /// The `add_input_set` operator creates an input table that internally
    /// appears as a Z-set with unit weights, but that ingests input data
    /// using set semantics. It returns a stream that carries values of type
    /// `OrdZSet<K>` and an input handle of type
    /// [`SetHandle<K>`](`SetHandle`).  The client uses
    /// [`SetHandle::push`] and [`SetHandle::append`] to submit
    /// commands of the form `(val, true)` to insert an element to the set
    /// and `(val, false)` to delete `val` from the set.  These commands
    /// are buffered until the start of the next clock cycle.
    ///
    /// At the start of a clock cycle (triggered by
    /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
    /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)), DBSP applies
    /// buffered commands in order and computes an update to the input set as
    /// an `OrdZSet` with weights `+1` and `-1` representing set insertions and
    /// deletions respectively. The following table illustrates the
    /// relationship between input commands, the contents of the set and the
    /// contents of the stream produced by this operator:
    ///
    /// ```text
    /// time │      input commands          │content of the   │ stream returned by     │  comment
    ///      │                              │input set        │ `add_input_set`        │
    /// ─────┼──────────────────────────────┼─────────────────┼────────────────────────┼───────────────────────────────────────────────────────
    ///    1 │{("foo",true),("bar",true)}   │  {"foo","bar"}  │ {("foo",+1),("bar",+1)}│
    ///    2 │{("foo",true),("bar",false)}  │  {"foo"}        │ {("bar",-1)}           │ignore duplicate insert of "foo"
    ///    3 │{("foo",false),("foo",true)}  │  {"foo"}        │ {}                     │deleting and re-inserting "foo" is a no-op
    ///    4 │{("foo",false),("bar",false)} │  {}             │ {("foo",-1)}           │deleting value "bar" that is not in the set is a no-op
    /// ─────┴──────────────────────────────┴─────────────────┴────────────────────────┴────────────────────────────────────────────────────────
    /// ```
    ///
    /// Internally, this operator maintains the contents of the input set
    /// partitioned across all worker threads based on the hash of the
    /// value.  Insert/delete commands are routed to the worker in charge of
    /// the given value.
    ///
    /// # Data retention
    ///
    /// Applying the [`Stream::integrate_trace_retain_keys`] and
    /// [`Stream::integrate_trace_with_bound`] methods to the stream has the
    /// additional effect of filtering out all values that don't satisfy the
    /// retention policy configured by these methods from the stream.
    /// Specifically, retention conditions configured at logical time `t`
    /// are applied starting from logical time `t+1`.
    // TODO: Add a version that takes a custom hash function.
    #[track_caller]
    pub fn add_input_set<K>(&self) -> (Stream<RootCircuit, OrdZSet<K>>, SetHandle<K>)
    where
        K: DBData,
    {
        let factories = AddInputSetFactories::new::<K>();
        let (stream, handle) = self.dyn_add_input_set_mono(None, &factories);

        (stream.typed(), SetHandle::new(handle))
    }

    /// Create an input table as a key-value map with upsert update semantics.
    ///
    /// # Motivation
    ///
    /// DBSP represents indexed data using indexed Z-sets, i.e., sets
    /// of `(key, value, weight)` tuples, where `weight`
    /// denotes the number of times the key-value pair occurs in
    /// the table.  Updates to indexed Z-sets are also indexed Z-sets, with
    /// positive weights representing insertions and negative weights
    /// representing deletions.  The contents of the indexed Z-set after an
    /// update is computed by summing up weights associated with each
    /// key-value pair. This representation enables efficient incremental
    /// computation, but it does not always match the data model used by the
    /// outside world, and may require a translation layer to eliminate this
    /// mismatch when ingesting indexed data into DBSP.
    ///
    /// In particular, input tables often behave as key-value maps.
    /// A map is a special case of an indexed Z-set where each key has
    /// a unique value associated with it and where all weights are 1.
    /// Map updates follow the update-or-insert (*upsert*) semantics,
    /// where inserting a new key-value pair overwrites the old value
    /// associated with the key, if any.
    ///
    /// # Details
    ///
    /// The `add_input_map` operator creates an input table that internally
    /// appears as an indexed Z-set with all unit weights, but that ingests
    /// input data using upsert semantics. It returns a stream that carries
    /// values of type `OrdIndexedZSet<K, V>` and an input handle of type
    /// [`MapHandle<K, V, U>`](`MapHandle`).  The client uses
    /// [`MapHandle::push`] and [`MapHandle::append`] to submit
    /// commands of the form `(key, Update::Insert(val))` to insert a new
    /// key-value pair, `(key, Update::Delete)` to delete the value
    /// associated with `key`, and `(key, Update::Update(upd))` to modify the
    /// value associated with `key`, if it exists. These commands are
    /// buffered until the start of the next clock cycle.
    ///
    /// At the start of a clock cycle (triggered by
    /// [`DBSPHandle::step`](`crate::DBSPHandle::step`) or
    /// [`CircuitHandle::step`](`crate::CircuitHandle::step`)),
    /// DBSP applies buffered commands in order and
    /// computes an update to the input set as an `OrdIndexedZSet` with weights
    /// `+1` and `-1` representing insertions and deletions respectively.
    /// The following table illustrates the relationship between input commands,
    /// the contents of the map and the contents of the stream produced by this
    /// operator:
    ///
    /// ```text
    /// time │      input commands                     │content of the        │ stream returned by         │  comment
    ///      │                                         │input map             │ `add_input_map`            │
    /// ─────┼─────────────────────────────────────────┼──────────────────────┼────────────────────────────┼───────────────────────────────────────────────────────
    ///    1 │{(1,Insert("foo"), (2,Insert("bar"))}    │{(1,"foo"),(2,"bar")} │ {(1,"foo",+1),(2,"bar",+1)}│
    ///    2 │{(1,Insert("foo"), (2,Insert("baz"))}    │{(1,"foo"),(2,"baz")} │ {(2,"bar",-1),(2,"baz",+1)}│ Ignore duplicate insert of (1,"foo"). New value
    ///      |                                         |                      |                            | "baz" for key 2 overwrites the old value "bar".
    ///    3 │{(1,Delete),(2,Insert("bar")),(2,Delete)}│{}                    │ {(1,"foo",-1),(2,"baz",-1)}│ Delete both keys. Upsert (2,"bar") is overridden
    ///      |                                         |                      |                            | by subsequent delete command.
    ///    4 |{(1,Update("new")), (2,Update("bar"))}   |{(1,"foo")}           | {(1,"new")}                | Note that the second update is ignored because
    ///      |                                         |                      |                            | key 2 is not present in the map.
    /// ─────┴─────────────────────────────────────────┴──────────────────────┴────────────────────────────┴────────────────────────────────────────────────────────
    /// ```
    ///
    /// Note that upsert commands cannot fail.  Duplicate inserts and deletes
    /// are simply ignored.
    ///
    /// Internally, this operator maintains the contents of the map
    /// partitioned across all worker threads based on the hash of the
    /// key.  Upsert/delete commands are routed to the worker in charge of
    /// the given key.
    ///
    /// # Data retention
    ///
    /// Applying [`Stream::integrate_trace_retain_keys`] to the stream has the
    /// additional effect of filtering out all updates that don't satisfy the
    /// retention policy.
    /// In particular, this means that attempts to overwrite, update, or delete
    /// a key-value pair whose key doesn't satisfy current retention
    /// conditions are ignored, since all these operations involve deleting
    /// an existing tuple.
    ///
    /// Retention conditions configured at logical time `t`
    /// are applied starting from logical time `t+1`.
    ///
    /// FIXME: see <https://github.com/feldera/feldera/issues/2669>
    // TODO: Add a version that takes a custom hash function.
    #[track_caller]
    pub fn add_input_map<K, V, U, PF>(
        &self,
        patch_func: PF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
    {
        self.add_input_map_persistent(None, patch_func)
    }

    /// Like [`Self::add_input_map`], but also takes an optional persistent id
    /// that is forwarded to the underlying dynamic operator.
    #[track_caller]
    pub fn add_input_map_persistent<K, V, U, PF>(
        &self,
        persistent_id: Option<&str>,
        patch_func: PF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
    {
        let factories = AddInputMapFactories::new::<K, V, U>();
        let (stream, handle) = self.dyn_add_input_map_mono(
            persistent_id,
            &factories,
            // SAFETY: `factories` was constructed for `<K, V, U>` above, so
            // the erased values handed to this callback should have exactly
            // these concrete types — confirm against the contract of
            // `dyn_add_input_map_mono`.
            Box::new(move |v: &mut DynData, u: &DynData| unsafe {
                patch_func(v.downcast_mut::<V>(), u.downcast::<U>())
            }),
        );

        (stream.typed(), MapHandle::new(handle))
    }

    /// Like `add_input_map`, but additionally tracks a waterline of the input collection and
    /// rejects inputs that are below the waterline.
    ///
    /// An input is rejected if the input record itself is below the waterline or if the existing
    /// record it replaces is below the waterline.
    ///
    /// # Type arguments
    ///
    /// - `K`: The type of the key.
    /// - `V`: The type of the value.
    /// - `U`: The type that represents updates to values.
    /// - `W`: The type of the waterline.
    /// - `E`: The type of the error that is reported when the waterline is violated.
    ///
    /// # Arguments
    ///
    /// - `patch_func`: A function that applies the update to the value.
    /// - `init`: A function that initializes the waterline.
    /// - `extract_ts`: A function that extracts the timestamp from the key/value pair.
    /// - `least_upper_bound`: A function that computes the least upper bound of two waterlines.
    /// - `filter_func`: A function that filters out records below the waterline.
    /// - `report_func`: A function that reports errors when the waterline is violated.
    ///
    /// # Returns
    ///
    /// - Stream of changes to the collection.
    /// - Error stream that reports waterline violations.
    /// - Stream of waterline values.
    /// - Input handle that allows pushing updates to the collection.
    ///
    /// # Garbage collection
    ///
    /// This function supports waterlines over both key and values components of the tuple.
    /// In case the waterline is applied to the key component, the internal index maintained
    /// by this function can be GC'd by calling [`Stream::integrate_trace_retain_keys`]: on
    /// the output stream returned by this function:
    /// `stream.integrate_trace_retain_keys(&waterline, |key, wl| *key >= *wl)`, where
    /// `waterline` is the stream of waterline values returned by this function.
    #[track_caller]
    pub fn add_input_map_with_waterline<K, V, U, W, E, PF, IF, WF, LB, FF, RF>(
        &self,
        patch_func: PF,
        init: IF,
        extract_ts: WF,
        least_upper_bound: LB,
        filter_func: FF,
        report_func: RF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        Stream<RootCircuit, OrdZSet<E>>,
        Stream<RootCircuit, TypedBox<W, DynData>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        W: DBData,
        E: DBData,
        PF: Fn(&mut V, &U) + 'static,
        IF: Fn() -> W + 'static,
        WF: Fn(&K, &V) -> W + 'static,
        LB: Fn(&W, &W) -> W + Clone + 'static,
        FF: Fn(&W, &K, &V) -> bool + 'static,
        RF: Fn(&W, &K, &V, ZWeight) -> E + 'static,
    {
        self.add_input_map_with_waterline_persistent(
            None,
            patch_func,
            init,
            extract_ts,
            least_upper_bound,
            filter_func,
            report_func,
        )
    }

    /// Like [`Self::add_input_map_with_waterline`], but also takes an optional
    /// persistent id that is forwarded to the underlying dynamic operator.
    #[allow(clippy::too_many_arguments)]
    #[track_caller]
    pub fn add_input_map_with_waterline_persistent<K, V, U, W, E, PF, IF, WF, LB, FF, RF>(
        &self,
        persistent_id: Option<&str>,
        patch_func: PF,
        init: IF,
        extract_ts: WF,
        least_upper_bound: LB,
        filter_func: FF,
        report_func: RF,
    ) -> (
        Stream<RootCircuit, OrdIndexedZSet<K, V>>,
        Stream<RootCircuit, OrdZSet<E>>,
        Stream<RootCircuit, TypedBox<W, DynData>>,
        MapHandle<K, V, U>,
    )
    where
        K: DBData,
        V: DBData,
        U: DBData + Erase<DynData>,
        W: DBData + Erase<DynData>,
        E: DBData + Erase<DynData>,
        PF: Fn(&mut V, &U) + 'static,
        IF: Fn() -> W + 'static,
        WF: Fn(&K, &V) -> W + 'static,
        LB: Fn(&W, &W) -> W + Clone + 'static,
        FF: Fn(&W, &K, &V) -> bool + 'static,
        RF: Fn(&W, &K, &V, ZWeight) -> E + 'static,
    {
        // The closures below adapt the strongly typed user callbacks to the
        // type-erased dynamic operator.
        //
        // SAFETY (applies to every `unsafe` downcast below): `factories` was
        // constructed for `<K, V, U, E>`, and the waterline values are
        // produced by `init`/`least_upper_bound` as `W`, so each erased
        // argument should have the concrete type it is downcast to — confirm
        // against the contract of `dyn_add_input_map_with_waterline_mono`.
        let factories = AddInputMapWithWaterlineFactories::new::<K, V, U, E>();
        let (stream, errors, waterline, handle) = self.dyn_add_input_map_with_waterline_mono(
            persistent_id,
            &factories,
            // Applies a typed update `U` to a typed value `V` in place.
            Box::new(move |v: &mut DynData, u: &DynData| unsafe {
                patch_func(v.downcast_mut::<V>(), u.downcast::<U>())
            }),
            // Produces the initial (erased) waterline value.
            Box::new(move || Box::new(init())),
            // Extracts the timestamp of a `(key, value)` pair into `ts`.
            Box::new(move |k: &DynData, v: &DynData, ts: &mut DynData| {
                let k = unsafe { k.downcast::<K>() };
                let v = unsafe { v.downcast::<V>() };
                let w = unsafe { ts.downcast_mut::<W>() };

                *w = extract_ts(k, v);
            }),
            // Combines two waterlines into their least upper bound.
            Box::new(move |a: &DynData, b: &DynData, ts: &mut DynData| {
                let a = unsafe { a.downcast::<W>() };
                let b = unsafe { b.downcast::<W>() };
                let ts = unsafe { ts.downcast_mut::<W>() };
                *ts = least_upper_bound(a, b)
            }),
            // Decides whether a record passes the waterline filter.
            Box::new(move |wl: &DynData, k: &DynData, v: &DynData| {
                let wl = unsafe { wl.downcast::<W>() };
                let k = unsafe { k.downcast::<K>() };
                let v = unsafe { v.downcast::<V>() };

                filter_func(wl, k, v)
            }),
            // Builds the typed error value for a rejected record.
            Box::new(
                move |wl: &DynData, k: &DynData, v: &DynData, w: ZWeight, err: &mut DynData| {
                    let wl = unsafe { wl.downcast::<W>() };
                    let k = unsafe { k.downcast::<K>() };
                    let v = unsafe { v.downcast::<V>() };
                    let err = unsafe { err.downcast_mut::<E>() };

                    *err = report_func(wl, k, v, w);
                },
            ),
        );

        (
            stream.typed(),
            errors.typed(),
            // SAFETY: the waterline stream carries values produced as `W` by
            // the `init`/`least_upper_bound` closures above — confirm against
            // `typed_data`'s contract.
            unsafe { waterline.typed_data() },
            MapHandle::new(handle),
        )
    }
}
799
/// `TypedMapKey` entry used to share InputHandle objects across workers in a
/// runtime. The first worker to create the handle will store it in the map,
/// subsequent workers will get a clone of the same handle.
struct InputId<T> {
    // Sequence number identifying one logical input handle; obtained from
    // `Runtime::sequence_next` in `InputHandle::new`.
    id: usize,
    // Ties the key to the handle's value type without storing any `T` data.
    _marker: PhantomData<T>,
}
807
// SAFETY: `InputId` holds no `T` values (only `PhantomData<T>`) and its `id`
// field is a plain `usize`, so sharing a key across threads is sound even when
// `T` itself is not `Sync`.
unsafe impl<T> Sync for InputId<T> {}
809
810// Implement `Hash`, `Eq` manually to avoid `T: Hash` type bound.
811impl<T> Hash for InputId<T> {
812    fn hash<H>(&self, state: &mut H)
813    where
814        H: Hasher,
815    {
816        self.id.hash(state);
817    }
818}
819
820impl<T> PartialEq for InputId<T> {
821    fn eq(&self, other: &Self) -> bool {
822        self.id == other.id
823    }
824}
825
// `eq` above is a total equivalence on `id`, so `Eq` holds for any `T`.
impl<T> Eq for InputId<T> {}
827
828impl<T> InputId<T> {
829    fn new(id: usize) -> Self {
830        Self {
831            id,
832            _marker: PhantomData,
833        }
834    }
835}
836
// Looking up an `InputId<T>` in the runtime-local store yields the shared
// `InputHandle<T>` for that input.
impl<T> TypedMapKey<LocalStoreMarker> for InputId<T>
where
    T: 'static,
{
    type Value = InputHandle<T>;
}
843
/// Mailbox that buffers data between the circuit and the outside world.
/// It is used inside an `InputHandle` to store data sent to a worker
/// thread and inside an `OutputHandle` to store data sent by a worker
/// thread to the outside world.
#[derive(Clone)]
pub(crate) struct Mailbox<T> {
    // Produces the "empty" placeholder value used to initialize the mailbox
    // and to refill it when the contents are taken out.
    empty: Arc<dyn Fn() -> T + Send + Sync>,
    // Current contents, shared between the worker thread and outside writers.
    value: Arc<Mutex<T>>,
}
853
854impl<T: Clone> Mailbox<T> {
855    pub(in crate::operator) fn new(empty: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
856        let v = empty();
857        Self {
858            empty,
859            value: Arc::new(Mutex::new(v)),
860        }
861    }
862
863    pub(in crate::operator) fn take(&self) -> T {
864        replace(&mut *self.value.lock().unwrap(), (self.empty)())
865    }
866
867    pub(super) fn map<F, O: 'static>(&self, func: F) -> O
868    where
869        F: Fn(&T) -> O,
870    {
871        func(self.value.lock().unwrap().borrow())
872    }
873
874    fn update<F>(&self, f: F)
875    where
876        F: FnOnce(&mut T),
877    {
878        f(&mut *self.value.lock().unwrap());
879    }
880
881    pub(in crate::operator) fn set(&self, v: T) {
882        *self.value.lock().unwrap() = v;
883    }
884
885    pub(in crate::operator) fn clear(&self) {
886        *self.value.lock().unwrap() = (self.empty)();
887    }
888}
889
/// Shared state behind an `InputHandle`: one mailbox per local worker.
pub(crate) struct InputHandleInternal<T> {
    // Indexed by local worker number; never empty (see `new`).
    pub(crate) mailbox: Vec<Mailbox<T>>,
}
893
894impl<T> InputHandleInternal<T>
895where
896    T: Clone,
897{
898    // Returns a new `InputHandleInternal` for workers with indexes in the range
899    // of `workers`.
900    fn new(workers: Range<usize>, empty_val: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
901        assert!(!workers.is_empty());
902        Self {
903            mailbox: workers
904                .clone()
905                .map(move |_| Mailbox::new(empty_val.clone()))
906                .collect(),
907        }
908    }
909
910    fn set_for_worker(&self, worker: usize, v: T) {
911        self.mailbox(worker).set(v);
912    }
913
914    fn update_for_worker<F>(&self, worker: usize, f: F)
915    where
916        F: FnOnce(&mut T),
917    {
918        self.mailbox(worker).update(f);
919    }
920
921    /// Send the same value to all workers.
922    fn set_for_all(&self, v: T) {
923        for i in 0..self.mailbox.len() - 1 {
924            self.mailbox[i].set(v.clone());
925        }
926        self.mailbox[self.mailbox.len() - 1].set(v);
927    }
928
929    fn clear_for_all(&self) {
930        for mailbox in self.mailbox.iter() {
931            mailbox.clear();
932        }
933    }
934
935    fn mailbox(&self, worker: usize) -> &Mailbox<T> {
936        &self.mailbox[worker]
937    }
938}
939
/// A handle used to write data to an input stream created by
/// the [`RootCircuit::add_input_stream`] method.
///
/// Internally, the handle manages an array of mailboxes, one for
/// each worker thread.  At the start of each clock cycle, the
/// circuit reads the current value from each mailbox and writes
/// it to the input stream associated with the handle, leaving
/// the mailbox empty (more precisely, the mailbox will contain
/// the placeholder value produced by the `empty` callback the
/// handle was created with).  The handle is then used to write
/// new values to the mailboxes, which will be consumed at the
/// next logical clock tick.
pub struct InputHandle<T>(pub(crate) Arc<InputHandleInternal<T>>);
952
953impl<T> Clone for InputHandle<T> {
954    fn clone(&self) -> Self {
955        Self(self.0.clone())
956    }
957}
958
impl<T> InputHandle<T>
where
    T: Send + Clone + 'static,
{
    /// Creates (or, inside a multi-worker runtime, joins) the shared handle.
    ///
    /// Outside a runtime, the handle owns a single mailbox (worker 0).
    /// Inside a runtime, the first worker to reach this point stores a new
    /// handle in the runtime-local store under a fresh sequence number;
    /// subsequent workers find and clone that entry, so all workers end up
    /// sharing one handle.
    ///
    /// NOTE(review): correctness presumes all workers create their handles in
    /// the same order, so `sequence_next` yields matching ids across
    /// workers — confirm against `Runtime::sequence_next`'s contract.
    fn new(empty_val: Arc<dyn Fn() -> T + Send + Sync>) -> Self {
        match Runtime::runtime() {
            None => Self(Arc::new(InputHandleInternal::new(0..1, empty_val))),
            Some(runtime) => {
                let input_id = runtime.sequence_next();

                runtime
                    .local_store()
                    .entry(InputId::new(input_id))
                    .or_insert_with(|| {
                        Self(Arc::new(InputHandleInternal::new(
                            runtime.layout().local_workers(),
                            empty_val,
                        )))
                    })
                    .value()
                    .clone()
            }
        }
    }

    /// Returns the mailbox for the given `worker`.
    ///
    /// A `worker` of 0 is the first worker on the local host.
    fn mailbox(&self, worker: usize) -> &Mailbox<T> {
        self.0.mailbox(worker)
    }

    /// Write value `v` to the specified worker's mailbox,
    /// overwriting any previous value in the mailbox.
    ///
    /// A `worker` of 0 is the first worker on the local host.
    pub fn set_for_worker(&self, worker: usize, v: T) {
        self.0.set_for_worker(worker, v);
    }

    /// Mutate the contents of the specified worker's mailbox
    /// using closure `f`.
    ///
    /// A `worker` of 0 is the first worker on the local host.
    pub fn update_for_worker<F>(&self, worker: usize, f: F)
    where
        F: FnOnce(&mut T),
    {
        self.0.update_for_worker(worker, f);
    }

    /// Write value `v` to all worker mailboxes.
    pub fn set_for_all(&self, v: T) {
        self.0.set_for_all(v);
    }

    /// Reset all worker mailboxes to the empty value.
    pub fn clear_for_all(&self) {
        self.0.clear_for_all();
    }
}
1019
/// Source operator that injects data received via `InputHandle` to the circuit.
///
/// ```text
///                   ┌───────────────────┐
///                   │Circuit            │
///                   │                   │
/// ┌───────────┐     │    ┌─────┐        │
/// │InputHandle├─────────►│Input├─────►  │
/// └───────────┘     │    └─────┘        │
///                   │                   │
///                   └───────────────────┘
/// ```
pub struct Input<IT, OT, F> {
    // Source location of the call that created this operator, for metadata.
    location: &'static Location<'static>,
    // This worker's mailbox; drained once per clock cycle in `eval`.
    mailbox: Mailbox<IT>,
    // Converts the raw mailbox contents into the stream's output type.
    input_func: F,
    // `OT` appears only in the output of `input_func`; no `OT` is stored.
    phantom: PhantomData<OT>,
}
1038
1039impl<IT, OT, F> Input<IT, OT, F>
1040where
1041    IT: Clone + Send + 'static,
1042{
1043    pub fn new(
1044        location: &'static Location<'static>,
1045        input_func: F,
1046        default: Arc<dyn Fn() -> IT + Send + Sync>,
1047    ) -> (Self, InputHandle<IT>) {
1048        let handle = InputHandle::new(default);
1049        let mailbox = handle.mailbox(Runtime::local_worker_offset()).clone();
1050
1051        let input = Self {
1052            location,
1053            mailbox,
1054            input_func,
1055            phantom: PhantomData,
1056        };
1057
1058        (input, handle)
1059    }
1060}
1061
1062impl<IT, OT, F> Operator for Input<IT, OT, F>
1063where
1064    IT: 'static,
1065    OT: 'static,
1066    F: 'static,
1067{
1068    fn name(&self) -> Cow<'static, str> {
1069        Cow::from("Input")
1070    }
1071
1072    fn is_input(&self) -> bool {
1073        true
1074    }
1075
1076    fn location(&self) -> OperatorLocation {
1077        Some(self.location)
1078    }
1079
1080    fn fixedpoint(&self, _scope: Scope) -> bool {
1081        false
1082    }
1083}
1084
1085impl<IT, OT, F> SourceOperator<OT> for Input<IT, OT, F>
1086where
1087    IT: Clone + Debug + 'static,
1088    OT: 'static,
1089    F: Fn(IT) -> OT + 'static,
1090{
1091    async fn eval(&mut self) -> OT {
1092        let v = self.mailbox.take();
1093        (self.input_func)(v)
1094    }
1095}