Skip to main content

feldera_adapterlib/
catalog.rs

1use std::any::Any;
2use std::collections::HashSet;
3use std::fmt::{Debug, Formatter};
4use std::sync::atomic::AtomicUsize;
5use std::sync::{Arc, Mutex};
6
7use anyhow::Result as AnyResult;
8#[cfg(feature = "with-avro")]
9use apache_avro::{
10    Schema as AvroSchema,
11    schema::{Name as AvroName, NamesRef},
12    types::Value as AvroValue,
13};
14use arrow::record_batch::RecordBatch;
15use dbsp::circuit::NodeId;
16use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Factory};
17use dbsp::operator::StagedBuffers;
18use dyn_clone::DynClone;
19use feldera_sqllib::Variant;
20use feldera_types::format::csv::CsvParserConfig;
21use feldera_types::format::json::JsonFlavor;
22use feldera_types::program_schema::{Relation, SqlIdentifier};
23use feldera_types::serde_with_context::SqlSerdeConfig;
24use serde_arrow::ArrayBuilder;
25#[cfg(feature = "with-avro")]
26use std::collections::HashMap;
27
28use crate::errors::controller::ControllerError;
29use crate::format::InputBuffer;
30use crate::preprocess::PreprocessorRegistry;
31
32/// Descriptor that specifies the format in which records are received
33/// or into which they should be encoded before sending.
34#[derive(Clone)]
35pub enum RecordFormat {
36    // TODO: Support different JSON encodings:
37    // * Map - the default encoding
38    // * Array - allow the subset and the order of columns to be configurable
39    // * Raw - Only applicable to single-column tables.  Input records contain
40    // raw encoding of this column only.  This is particularly useful for
41    // tables that store raw JSON or binary data to be parsed using SQL.
42    Json(JsonFlavor),
43    Csv(CsvParserConfig),
44    Parquet(SqlSerdeConfig),
45    #[cfg(feature = "with-avro")]
46    Avro,
47    Raw(String),
48}
49
50/// An input handle that deserializes and buffers records.
51///
52/// A trait for a type that wraps a [`ZSetHandle`](`dbsp::ZSetHandle`) or an
53/// [`MapHandle`](`dbsp::MapHandle`) and collects serialized relational data for
54/// the associated input stream.  The client passes a byte array with a
55/// serialized data record (e.g., in JSON or CSV format) to
56/// [`insert`](`Self::insert`), [`delete`](`Self::delete`), and
57/// [`update`](`Self::update`) methods. The record gets deserialized into the
58/// strongly typed representation expected by the input stream.
59///
60/// Instances of this trait are created by calling
61/// [`DeCollectionHandle::configure_deserializer`].
62/// The data format accepted by the handle is determined
63/// by the `record_format` argument passed to this method.
64///
65/// The input handle internally buffers the deserialized records. Use the
66/// `InputBuffer` supertrait to push them to the circuit or extract them for
67/// later use.
68pub trait DeCollectionStream: Send + Sync + InputBuffer {
69    /// Buffer a new insert update.
70    ///
71    /// `metadata` contains optional metadata attached by the transport adapter or parser,
72    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
73    /// `DeserializeWithContext::deserialize_with_context_aux`.
74    ///
75    /// Returns an error if deserialization fails, i.e., the serialized
76    /// representation is corrupted or does not match the value type of
77    /// the underlying input stream.
78    fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
79
80    /// Buffer a new delete update.
81    ///
82    /// The `data` argument contains a serialized record whose
83    /// type depends on the underlying input stream: streams created by
84    /// [`RootCircuit::add_input_zset`](`dbsp::RootCircuit::add_input_zset`)
85    /// and [`RootCircuit::add_input_set`](`dbsp::RootCircuit::add_input_set`)
86    /// methods support deletion by value, hence the serialized record must
87    /// match the value type of the stream.  Streams created with
88    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
89    /// support deletion by key, so the serialized record must match the key
90    /// type of the stream.
91    ///
92    /// The record gets deserialized and pushed to the underlying input stream
93    /// handle as a delete update.
94    ///
95    /// `metadata` contains optional metadata attached by the transport adapter or parser,
96    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
97    /// `DeserializeWithContext::deserialize_with_context_aux`.
98    ///
99    /// Returns an error if deserialization fails, i.e., the serialized
100    /// representation is corrupted or does not match the value or key
101    /// type of the underlying input stream.
102    fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
103
104    /// Buffer a new update that will modify an existing record.
105    ///
106    /// `metadata` contains optional metadata attached by the transport adapter or parser,
107    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
108    /// `DeserializeWithContext::deserialize_with_context_aux`.
109    ///
110    /// This method can only be called on streams created with
111    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
112    /// and will fail on other streams.  The serialized record must match
113    /// the update type of this stream, specified as a type argument to
114    /// `Catalog::register_input_map`.
115    fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
116
117    /// Reserve space for at least `reservation` more updates in the
118    /// internal input buffer.
119    ///
120    /// Reservations are not required but can be used when the number
121    /// of inputs is known ahead of time to reduce reallocations.
122    fn reserve(&mut self, reservation: usize);
123
124    /// Removes any updates beyond the first `len`.
125    fn truncate(&mut self, len: usize);
126
127    /// Stages all of the `buffers`, which must have been obtained from a
128    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
129    /// to push the collected data into the circuit.  See [StagedBuffers] for
130    /// more information.
131    ///
132    /// [Parser]: crate::format::Parser
133    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
134
135    /// Create a new deserializer with the same configuration connected to the
136    /// same input stream. The new deserializer has an independent buffer that
137    /// is initially empty.
138    fn fork(&self) -> Box<dyn DeCollectionStream>;
139}
140
141/// Like `DeCollectionStream`, but deserializes Arrow-encoded records before pushing them to a
142/// stream.
143pub trait ArrowStream: InputBuffer + Send + Sync {
144    /// Buffer a new batch of insert updates.
145    ///
146    /// `metadata` contains optional metadata attached by the transport adapter or parser,
147    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
148    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
149    fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
150
151    /// Buffer a new batch of delete updates.
152    ///
153    /// `metadata` contains optional metadata attached by the transport adapter or parser,
154    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
155    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
156    fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
157
158    /// Insert records in `data` with polarities from the `polarities` array.
159    ///
160    /// `polarities` must be the same length as `data`.
161    fn insert_with_polarities(
162        &mut self,
163        data: &RecordBatch,
164        polarities: &[bool],
165        metadata: &Option<Variant>,
166    ) -> AnyResult<()>;
167
168    /// Create a new deserializer with the same configuration connected to
169    /// the same input stream.
170    fn fork(&self) -> Box<dyn ArrowStream>;
171
172    /// Stages all of the `buffers`, which must have been obtained from a
173    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
174    /// to push the collected data into the circuit.  See [StagedBuffers] for
175    /// more information.
176    ///
177    /// [Parser]: crate::format::Parser
178    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
179}
180
/// Named Avro schemas, keyed by name, that can be used to resolve references
/// appearing inside another schema.
#[cfg(feature = "with-avro")]
pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
183
/// Like `DeCollectionStream`, but deserializes Avro-encoded records before pushing them to a
/// stream.
#[cfg(feature = "with-avro")]
pub trait AvroStream: InputBuffer + Send + Sync {
    /// Buffer a new insert update.
    ///
    /// # Arguments
    ///
    /// * `data` - The Avro value holding the record to deserialize.
    /// * `schema` - The Avro schema to use for deserialization.
    /// * `refs` - A map of named schema references that may be used to resolve references within `schema`.
    /// * `n_bytes` - NOTE(review): presumably the size in bytes of the encoded record, used for
    ///   input accounting — confirm against the implementations.
    /// * `metadata` - Optional metadata attached by the transport adapter or parser,
    ///   such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    ///   `DeserializeWithContext::deserialize_with_context_aux`.
    fn insert(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Buffer a new delete update.
    ///
    /// # Arguments
    ///
    /// * `data` - The Avro value holding the record to deserialize and delete.
    /// * `schema` - The Avro schema to use for deserialization.
    /// * `refs` - A map of named schema references that may be used to resolve references within `schema`.
    /// * `n_bytes` - NOTE(review): presumably the size in bytes of the encoded record, used for
    ///   input accounting — confirm against the implementations.
    /// * `metadata` - Optional metadata attached by the transport adapter or parser,
    ///   such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    ///   `DeserializeWithContext::deserialize_with_context_aux`.
    fn delete(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Create a new deserializer with the same configuration connected to
    /// the same input stream.
    fn fork(&self) -> Box<dyn AvroStream>;

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit.  See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
227
228/// A handle to an input collection that can be used to feed serialized data
229/// to the collection.
230pub trait DeCollectionHandle: Send + Sync {
231    /// Create a [`DeCollectionStream`] object to parse input data encoded
232    /// using the format specified in `RecordFormat`.
233    fn configure_deserializer(
234        &self,
235        record_format: RecordFormat,
236    ) -> Result<Box<dyn DeCollectionStream>, ControllerError>;
237
238    /// Create an `ArrowStream` object to parse Arrow-encoded input data.
239    fn configure_arrow_deserializer(
240        &self,
241        config: SqlSerdeConfig,
242    ) -> Result<Box<dyn ArrowStream>, ControllerError>;
243
244    /// Create an `AvroStream` object to parse Avro-encoded input data.
245    #[cfg(feature = "with-avro")]
246    fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;
247
248    fn fork(&self) -> Box<dyn DeCollectionHandle>;
249}
250
251/// A type-erased batch whose contents can be serialized.
252///
253/// This is a wrapper around the DBSP `BatchReader` trait that returns a cursor that
254/// yields `erased_serde::Serialize` trait objects that can be used to serialize
255/// the contents of the batch without knowing its key and value types.
256// The reason we need the `Sync` trait below is so that we can wrap batches
257// in `Arc` and send the same batch to multiple output endpoint threads.
258pub trait SerBatchReader: 'static + Send + Sync {
259    /// Number of keys in the batch.
260    fn key_count(&self) -> usize;
261
262    /// Number of tuples in the batch.
263    fn len(&self) -> usize;
264
265    fn is_empty(&self) -> bool {
266        self.len() == 0
267    }
268
269    /// Create a cursor over the batch that yields records
270    /// formatted using the specified format.
271    fn cursor<'a>(
272        &'a self,
273        record_format: RecordFormat,
274    ) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;
275
276    /// Returns all batches in this reader.
277    ///
278    /// A reader can wrap a single batch or a spine or a spine snapshot. This method extracts
279    /// all batches from the reader.
280    fn batches(&self) -> Vec<Arc<dyn SerBatch>>;
281
282    fn snapshot(&self) -> Arc<dyn SerBatchReader>;
283
284    fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
285
286    fn key_factory(&self) -> &'static dyn Factory<DynData>;
287
288    fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
289
290    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
291}
292
293impl Debug for dyn SerBatchReader {
294    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
295        let mut cursor = self
296            .cursor(RecordFormat::Json(Default::default()))
297            .map_err(|_| std::fmt::Error)?;
298        let mut key = Vec::new();
299        let mut val = Vec::new();
300        while cursor.key_valid() {
301            cursor
302                .serialize_key(&mut key)
303                .map_err(|_| std::fmt::Error)?;
304            write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
305
306            while cursor.val_valid() {
307                cursor
308                    .serialize_val(&mut val)
309                    .map_err(|_| std::fmt::Error)?;
310                write!(
311                    f,
312                    "{}=>{}, ",
313                    String::from_utf8_lossy(&val),
314                    cursor.weight()
315                )?;
316
317                val.clear();
318                cursor.step_val();
319            }
320
321            write!(f, "}}, ")?;
322            key.clear();
323            cursor.step_key();
324        }
325
326        Ok(())
327    }
328}
329
330impl Debug for dyn SerBatch {
331    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332        self.as_batch_reader().fmt(f)
333    }
334}
335
336/// A type-erased `Batch`.
337pub trait SerBatch: SerBatchReader {
338    /// Convert to `Arc<Any>`, which can then be downcast to a reference
339    /// to a concrete batch type.
340    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;
341
342    /// Merge `self` with all batches in `other`.
343    fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;
344
345    fn as_batch_reader(&self) -> &dyn SerBatchReader;
346
347    fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;
348
349    /// Convert batch into a trace with identical contents.
350    fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
351}
352
353/// A type-erased `Trace`.
354pub trait SerTrace: SerBatchReader {
355    /// Insert a batch into the trace.
356    fn insert(&mut self, batch: Arc<dyn SerBatch>);
357
358    fn as_batch_reader(&self) -> &dyn SerBatchReader;
359}
360
361#[doc(hidden)]
362pub struct SplitCursorBuilder {
363    batch: Arc<dyn SerBatchReader>,
364    start_key: Box<DynData>,
365    end_key: Option<Box<DynData>>,
366    format: RecordFormat,
367}
368
369impl SplitCursorBuilder {
370    /// Create a [`SplitCursorBuilder`] for partition `index` given a batch,
371    /// pre-computed partition `bounds` (as returned by
372    /// [`SerBatchReader::partition_keys`]), and a record `format`.
373    ///
374    /// `bounds` contains `N-1` boundary keys for `N` partitions.
375    /// Partition 0 spans from the start of the batch to `bounds[0]`,
376    /// partition `i` spans from `bounds[i-1]` to `bounds[i]`, and the last
377    /// partition spans from `bounds[N-2]` to the end of the batch.
378    ///
379    /// Returns `None` if the partition is empty (the cursor has no key at the
380    /// start position).
381    pub fn from_bounds(
382        batch: Arc<dyn SerBatchReader>,
383        bounds: &DynVec<DynData>,
384        index: usize,
385        format: RecordFormat,
386    ) -> Option<Self> {
387        let start_bound = if index == 0 {
388            None
389        } else if index <= bounds.len() {
390            Some(bounds.index(index - 1).as_data())
391        } else {
392            None
393        };
394
395        let end_bound = if index < bounds.len() {
396            Some(bounds.index(index).as_data())
397        } else {
398            None
399        };
400
401        let start_key = {
402            let mut cursor = batch.cursor(format.clone()).unwrap();
403
404            // Seek to start. If None, the cursor starts at the beginning.
405            if let Some(start_bound) = start_bound {
406                cursor.seek_key_exact(start_bound);
407            }
408
409            // Clone the actual key the cursor landed on.
410            cursor.get_key().map(|s| {
411                let mut key = batch.key_factory().default_box();
412                s.clone_to(key.as_mut());
413                key
414            })
415        }?;
416
417        let end_key = end_bound.map(|e| {
418            let mut key = batch.key_factory().default_box();
419            e.clone_to(key.as_mut());
420            key
421        });
422
423        Some(SplitCursorBuilder {
424            batch,
425            start_key,
426            end_key,
427            format,
428        })
429    }
430
431    pub fn build<'a>(&'a self) -> SplitCursor<'a> {
432        let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
433
434        // Cannot use `seek_key_exact` here, so we can single-step the cursor afterward.
435        cursor.seek_key(self.start_key.as_data());
436
437        SplitCursor {
438            cursor,
439            start_key: self.start_key.clone(),
440            end_key: self.end_key.clone(),
441        }
442    }
443}
444
445#[doc(hidden)]
446pub struct SplitCursor<'a> {
447    cursor: Box<dyn SerCursor + 'a>,
448    start_key: Box<DynData>,
449    end_key: Option<Box<DynData>>,
450}
451
452impl SplitCursor<'_> {
453    fn finished(&self) -> bool {
454        if let Some(ref end_key) = self.end_key
455            && let Some(current_key) = self.cursor.get_key()
456        {
457            return current_key >= end_key.as_data();
458        }
459
460        false
461    }
462}
463
464impl SerCursor for SplitCursor<'_> {
465    fn key_valid(&self) -> bool {
466        self.cursor.key_valid() && !self.finished()
467    }
468
469    fn val_valid(&self) -> bool {
470        self.cursor.val_valid()
471    }
472
473    fn key(&self) -> &DynData {
474        self.cursor.key()
475    }
476
477    fn get_key(&self) -> Option<&DynData> {
478        if !self.key_valid() {
479            return None;
480        }
481
482        self.cursor.get_key()
483    }
484
485    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
486        self.cursor.serialize_key(dst)
487    }
488
489    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
490        self.cursor.key_to_json()
491    }
492
493    fn serialize_key_fields(
494        &mut self,
495        fields: &HashSet<String>,
496        dst: &mut Vec<u8>,
497    ) -> AnyResult<()> {
498        self.cursor.serialize_key_fields(fields, dst)
499    }
500
501    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
502        self.cursor.serialize_key_to_arrow(dst)
503    }
504
505    fn serialize_key_to_arrow_with_metadata(
506        &mut self,
507        metadata: &dyn erased_serde::Serialize,
508        dst: &mut ArrayBuilder,
509    ) -> AnyResult<()> {
510        self.cursor
511            .serialize_key_to_arrow_with_metadata(metadata, dst)
512    }
513
514    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
515        self.cursor.serialize_val_to_arrow(dst)
516    }
517
518    fn serialize_val_to_arrow_with_metadata(
519        &mut self,
520        metadata: &dyn erased_serde::Serialize,
521        dst: &mut ArrayBuilder,
522    ) -> AnyResult<()> {
523        self.cursor
524            .serialize_val_to_arrow_with_metadata(metadata, dst)
525    }
526
527    #[cfg(feature = "with-avro")]
528    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
529        self.cursor.key_to_avro(schema, refs)
530    }
531
532    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
533        self.cursor.serialize_key_weight(dst)
534    }
535
536    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
537        self.cursor.serialize_val(dst)
538    }
539
540    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
541        self.cursor.val_to_json()
542    }
543
544    #[cfg(feature = "with-avro")]
545    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
546        self.cursor.val_to_avro(schema, refs)
547    }
548
549    fn weight(&mut self) -> i64 {
550        self.cursor.weight()
551    }
552
553    fn step_key(&mut self) {
554        self.cursor.step_key();
555    }
556
557    fn step_val(&mut self) {
558        self.cursor.step_val();
559    }
560
561    fn rewind_keys(&mut self) {
562        self.cursor.rewind_keys();
563        self.cursor.seek_key(self.start_key.as_data());
564    }
565
566    fn rewind_vals(&mut self) {
567        self.cursor.rewind_vals();
568    }
569
570    fn seek_key_exact(&mut self, key: &DynData) -> bool {
571        if let Some(ref end_key) = self.end_key
572            && key >= end_key.as_data()
573        {
574            return false;
575        }
576
577        self.cursor.seek_key_exact(key)
578    }
579
580    fn seek_key(&mut self, key: &DynData) {
581        self.cursor.seek_key(key);
582    }
583}
584
585/// Cursor that allows serializing the contents of a type-erased batch.
586///
587/// This is a wrapper around the DBSP `Cursor` trait that yields keys and values
588/// of the underlying batch as `erased_serde::Serialize` trait objects.
589pub trait SerCursor: Send {
590    /// Indicates if the current key is valid.
591    ///
592    /// A value of `false` indicates that the cursor has exhausted all keys.
593    fn key_valid(&self) -> bool;
594
595    /// Indicates if the current value is valid.
596    ///
597    /// A value of `false` indicates that the cursor has exhausted all values
598    /// for this key.
599    fn val_valid(&self) -> bool;
600
601    fn key(&self) -> &DynData;
602
603    fn get_key(&self) -> Option<&DynData>;
604
605    /// Serialize current key. Panics if invalid.
606    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
607
608    /// Convert key to JSON. Used for error reporting to generate a human-readable
609    /// representation of the key.
610    fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;
611
612    /// Like `serialize_key`, but only serializes the specified fields of the key.
613    fn serialize_key_fields(
614        &mut self,
615        fields: &HashSet<String>,
616        dst: &mut Vec<u8>,
617    ) -> AnyResult<()>;
618
619    /// Serialize current key into arrow format. Panics if invalid.
620    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
621
622    /// Serialize current key into arrow format, adding additional metadata columns.
623    /// `metadata` must be a struct or a map.
624    fn serialize_key_to_arrow_with_metadata(
625        &mut self,
626        metadata: &dyn erased_serde::Serialize,
627        dst: &mut ArrayBuilder,
628    ) -> AnyResult<()>;
629
630    /// Serialize current value into arrow format. Panics if invalid.
631    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
632
633    /// Serialize current value into arrow format, adding additional metadata columns.
634    /// `metadata` must be a struct or a map.
635    fn serialize_val_to_arrow_with_metadata(
636        &mut self,
637        metadata: &dyn erased_serde::Serialize,
638        dst: &mut ArrayBuilder,
639    ) -> AnyResult<()>;
640
641    #[cfg(feature = "with-avro")]
642    /// Convert current key to an Avro value.
643    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
644
645    /// Serialize the `(key, weight)` tuple.
646    ///
647    /// FIXME: This only exists to support the CSV serializer, which outputs
648    /// key and weight in the same CSV record.
649    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
650
651    /// Serialize current value. Panics if invalid.
652    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
653
654    /// Convert value to JSON. Used for error reporting to generate a human-readable
655    /// representation of the value.
656    fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;
657
658    #[cfg(feature = "with-avro")]
659    /// Convert current value to Avro.
660    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
661
662    /// Returns the weight associated with the current key/value pair.
663    fn weight(&mut self) -> i64;
664
665    /// Advances the cursor to the next key.
666    fn step_key(&mut self);
667
668    /// Advances the cursor to the next value.
669    fn step_val(&mut self);
670
671    /// Rewinds the cursor to the first key.
672    fn rewind_keys(&mut self);
673
674    /// Rewinds the cursor to the first value for current key.
675    fn rewind_vals(&mut self);
676
677    fn count_keys(&mut self) -> usize {
678        let mut count = 0;
679
680        while self.key_valid() {
681            count += 1;
682            self.step_key()
683        }
684
685        count
686    }
687
688    fn seek_key_exact(&mut self, key: &DynData) -> bool;
689
690    fn seek_key(&mut self, key: &DynData);
691}
692
693/// A handle to an output stream of a circuit that yields type-erased
694/// read-only batches.
695///
696/// A trait for a type that wraps around an
697/// [`OutputHandle`](`dbsp::OutputHandle`) and yields output batches produced by
698/// the circuit as [`SerBatchReader`]s.
699pub trait SerBatchReaderHandle: Send + Sync + DynClone {
700    /// See [`OutputHandle::num_nonempty_mailboxes`](`dbsp::OutputHandle::num_nonempty_mailboxes`)
701    fn num_nonempty_mailboxes(&self) -> usize;
702
703    /// Like [`OutputHandle::take_from_worker`](`dbsp::OutputHandle::take_from_worker`),
704    /// but returns output batch as a [`SerBatchReader`] trait object.
705    fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
706
707    /// Like [`OutputHandle::take_from_all`](`dbsp::OutputHandle::take_from_all`),
708    /// but returns output batches as [`SerBatchReader`] trait objects.
709    fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
710
711    /// Concatenate outputs from all workers into a single batch reader.
712    fn concat(&self) -> Arc<dyn SerBatchReader>;
713}
714
715dyn_clone::clone_trait_object!(SerBatchReaderHandle);
716
717/// Cursor that iterates over deletions before insertions.
718///
719/// Most consumers don't understand Z-sets and expect a stream of upserts
720/// instead, which means that the order of updates matters. For a table
721/// with a primary key or unique constraint we must delete an existing record
722/// before creating a new one with the same key.  DBSP may not know about
723/// these constraints, so the safe thing to do is to output deletions before
724/// insertions.  This cursor helps by iterating over all deletions in
725/// the batch before insertions.
726pub struct CursorWithPolarity<'a> {
727    cursor: Box<dyn SerCursor + 'a>,
728    second_pass: bool,
729}
730
731impl<'a> CursorWithPolarity<'a> {
732    pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
733        let mut result = Self {
734            cursor,
735            second_pass: false,
736        };
737
738        if result.key_valid() {
739            result.advance_val();
740        }
741
742        result
743    }
744
745    fn advance_val(&mut self) {
746        while self.cursor.val_valid()
747            && ((!self.second_pass && self.cursor.weight() >= 0)
748                || (self.second_pass && self.cursor.weight() <= 0))
749        {
750            self.step_val();
751        }
752    }
753}
754
755impl SerCursor for CursorWithPolarity<'_> {
756    fn key_valid(&self) -> bool {
757        self.cursor.key_valid()
758    }
759
760    fn val_valid(&self) -> bool {
761        self.cursor.val_valid()
762    }
763
764    fn key(&self) -> &DynData {
765        self.cursor.key()
766    }
767
768    fn get_key(&self) -> Option<&DynData> {
769        self.cursor.get_key()
770    }
771
772    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
773        self.cursor.serialize_key(dst)
774    }
775
776    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
777        self.cursor.key_to_json()
778    }
779
780    fn serialize_key_fields(
781        &mut self,
782        fields: &HashSet<String>,
783        dst: &mut Vec<u8>,
784    ) -> AnyResult<()> {
785        self.cursor.serialize_key_fields(fields, dst)
786    }
787
788    #[cfg(feature = "with-avro")]
789    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
790        self.cursor.key_to_avro(schema, refs)
791    }
792
793    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
794        self.cursor.serialize_key_weight(dst)
795    }
796
797    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
798        self.cursor.serialize_key_to_arrow(dst)
799    }
800
801    fn serialize_key_to_arrow_with_metadata(
802        &mut self,
803        metadata: &dyn erased_serde::Serialize,
804        dst: &mut ArrayBuilder,
805    ) -> AnyResult<()> {
806        self.cursor
807            .serialize_key_to_arrow_with_metadata(metadata, dst)
808    }
809
810    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
811        self.cursor.serialize_val_to_arrow(dst)
812    }
813
814    fn serialize_val_to_arrow_with_metadata(
815        &mut self,
816        metadata: &dyn erased_serde::Serialize,
817        dst: &mut ArrayBuilder,
818    ) -> AnyResult<()> {
819        self.cursor
820            .serialize_val_to_arrow_with_metadata(metadata, dst)
821    }
822
823    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
824        self.cursor.serialize_val(dst)
825    }
826
827    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
828        self.cursor.val_to_json()
829    }
830
831    #[cfg(feature = "with-avro")]
832    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
833        self.cursor.val_to_avro(schema, refs)
834    }
835
836    fn weight(&mut self) -> i64 {
837        self.cursor.weight()
838    }
839
840    fn step_key(&mut self) {
841        self.cursor.step_key();
842        if !self.cursor.key_valid() && !self.second_pass {
843            self.cursor.rewind_keys();
844            self.second_pass = true;
845        }
846
847        if self.cursor.key_valid() {
848            self.advance_val();
849        }
850    }
851
852    fn step_val(&mut self) {
853        self.cursor.step_val();
854        self.advance_val();
855    }
856
857    fn rewind_keys(&mut self) {
858        self.cursor.rewind_keys();
859        self.second_pass = false;
860        if self.cursor.key_valid() {
861            self.advance_val();
862        }
863    }
864
865    fn rewind_vals(&mut self) {
866        self.cursor.rewind_vals();
867        self.advance_val();
868    }
869
870    fn seek_key_exact(&mut self, key: &DynData) -> bool {
871        self.cursor.seek_key_exact(key)
872    }
873
874    fn seek_key(&mut self, key: &DynData) {
875        self.cursor.seek_key(key);
876    }
877}
878
879/// A catalog of input and output stream handles of a circuit.
880pub trait CircuitCatalog: Send + Sync {
881    /// Look up an input stream handle by name.
882    fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
883
884    fn output_iter(
885        &self,
886    ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
887
888    /// Look up output stream handles by name.
889    fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
890
891    fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
892
893    /// The registry used to insert new user-defined preprocessors
894    fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
895}
896
897#[doc(hidden)]
898pub struct InputCollectionHandle {
899    pub schema: Relation,
900    pub handle: Box<dyn DeCollectionHandle>,
901
902    /// Node id of the input stream in the circuit.
903    ///
904    /// Used to check whether the input stream needs to be backfilled during bootstrapping,
905    /// i.e., whether attached input connectors should be reset to their initial offsets or
906    /// continue from the checkpointed offsets.
907    pub node_id: NodeId,
908}
909
910impl InputCollectionHandle {
911    #[doc(hidden)]
912    pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
913    where
914        H: DeCollectionHandle + 'static,
915    {
916        Self {
917            schema,
918            handle: Box::new(handle),
919            node_id,
920        }
921    }
922}
923
924/// A set of stream handles associated with each output collection.
925#[derive(Clone)]
926pub struct OutputCollectionHandles {
927    pub key_schema: Option<Relation>,
928    pub value_schema: Relation,
929
930    pub index_of: Option<SqlIdentifier>,
931
932    /// Whether the integrate handle is an indexed Z-set.
933    pub integrate_handle_is_indexed: bool,
934
935    /// A handle to a snapshot of a materialized table/view.
936    pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
937
938    /// A stream of changes to the collection.
939    pub delta_handle: Box<dyn SerBatchReaderHandle>,
940
941    /// Reference to the enable count of the accumulator used to collect updates to this stream.
942    /// Incremented every time an output connector is attached to this stream; decremented when
943    /// the output connector is detached.
944    pub enable_count: Arc<AtomicUsize>,
945}