Skip to main content

feldera_adapterlib/
catalog.rs

1use std::any::Any;
2use std::collections::HashSet;
3use std::fmt::{Debug, Formatter};
4use std::sync::atomic::AtomicUsize;
5use std::sync::{Arc, Mutex};
6
7use anyhow::Result as AnyResult;
8#[cfg(feature = "with-avro")]
9use apache_avro::{
10    Schema as AvroSchema,
11    schema::{Name as AvroName, NamesRef},
12    types::Value as AvroValue,
13};
14use arrow::record_batch::RecordBatch;
15use dbsp::circuit::NodeId;
16use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Erase, Factory};
17use dbsp::operator::StagedBuffers;
18use dyn_clone::DynClone;
19use feldera_sqllib::Variant;
20use feldera_types::format::csv::CsvParserConfig;
21use feldera_types::format::json::JsonFlavor;
22use feldera_types::program_schema::{Relation, SqlIdentifier};
23use feldera_types::serde_with_context::SqlSerdeConfig;
24use serde_arrow::ArrayBuilder;
25#[cfg(feature = "with-avro")]
26use std::collections::HashMap;
27
28use crate::errors::controller::ControllerError;
29use crate::format::InputBuffer;
30use crate::postprocess::PostprocessorRegistry;
31use crate::preprocess::PreprocessorRegistry;
32
/// Descriptor that specifies the format in which records are received
/// or into which they should be encoded before sending.
///
/// The same descriptor is used on the input side (see
/// [`DeCollectionHandle::configure_deserializer`]) and on the output side
/// (see [`SerBatchReader::cursor`]).
#[derive(Clone)]
pub enum RecordFormat {
    // TODO: Support different JSON encodings:
    // * Map - the default encoding
    // * Array - allow the subset and the order of columns to be configurable
    // * Raw - Only applicable to single-column tables.  Input records contain
    // raw encoding of this column only.  This is particularly useful for
    // tables that store raw JSON or binary data to be parsed using SQL.
    /// JSON records, parameterized by a [`JsonFlavor`].
    Json(JsonFlavor),
    /// CSV records, configured by a [`CsvParserConfig`].
    Csv(CsvParserConfig),
    /// Parquet records, (de)serialized with the given [`SqlSerdeConfig`].
    Parquet(SqlSerdeConfig),
    /// Avro records. Only available with the `with-avro` feature.
    #[cfg(feature = "with-avro")]
    Avro,
    /// Raw records.
    // NOTE(review): the meaning of the `String` payload is not visible in this
    // file — confirm against the code that constructs this variant and document it.
    Raw(String),
}
50
/// An input handle that deserializes and buffers records.
///
/// A trait for a type that wraps a [`ZSetHandle`](`dbsp::ZSetHandle`) or a
/// [`MapHandle`](`dbsp::MapHandle`) and collects serialized relational data for
/// the associated input stream.  The client passes a byte array with a
/// serialized data record (e.g., in JSON or CSV format) to
/// [`insert`](`Self::insert`), [`delete`](`Self::delete`), and
/// [`update`](`Self::update`) methods. The record gets deserialized into the
/// strongly typed representation expected by the input stream.
///
/// Instances of this trait are created by calling
/// [`DeCollectionHandle::configure_deserializer`].
/// The data format accepted by the handle is determined
/// by the `record_format` argument passed to this method.
///
/// The input handle internally buffers the deserialized records. Use the
/// `InputBuffer` supertrait to push them to the circuit or extract them for
/// later use.
pub trait DeCollectionStream: Send + Sync + InputBuffer {
    /// Buffer a new insert update.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// Returns an error if deserialization fails, i.e., the serialized
    /// representation is corrupted or does not match the value type of
    /// the underlying input stream.
    fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new delete update.
    ///
    /// The `data` argument contains a serialized record whose
    /// type depends on the underlying input stream: streams created by
    /// [`RootCircuit::add_input_zset`](`dbsp::RootCircuit::add_input_zset`)
    /// and [`RootCircuit::add_input_set`](`dbsp::RootCircuit::add_input_set`)
    /// methods support deletion by value, hence the serialized record must
    /// match the value type of the stream.  Streams created with
    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
    /// support deletion by key, so the serialized record must match the key
    /// type of the stream.
    ///
    /// The record gets deserialized and pushed to the underlying input stream
    /// handle as a delete update.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// Returns an error if deserialization fails, i.e., the serialized
    /// representation is corrupted or does not match the value or key
    /// type of the underlying input stream.
    fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new update that will modify an existing record.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux`.
    ///
    /// This method can only be called on streams created with
    /// [`RootCircuit::add_input_map`](`dbsp::RootCircuit::add_input_map`)
    /// and will fail on other streams.  The serialized record must match
    /// the update type of this stream, specified as a type argument to
    /// `Catalog::register_input_map`.
    ///
    /// # Errors
    ///
    /// Fails when called on a stream that was not created with
    /// `add_input_map`.
    fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;

    /// Reserve space for at least `reservation` more updates in the
    /// internal input buffer.
    ///
    /// Reservations are not required but can be used when the number
    /// of inputs is known ahead of time to reduce reallocations.
    fn reserve(&mut self, reservation: usize);

    /// Removes any buffered updates beyond the first `len`.
    fn truncate(&mut self, len: usize);

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit.  See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;

    /// Create a new deserializer with the same configuration connected to the
    /// same input stream. The new deserializer has an independent buffer that
    /// is initially empty.
    fn fork(&self) -> Box<dyn DeCollectionStream>;
}
141
/// Like `DeCollectionStream`, but deserializes Arrow-encoded records before pushing them to a
/// stream.
///
/// Instances are created by [`DeCollectionHandle::configure_arrow_deserializer`].
pub trait ArrowStream: InputBuffer + Send + Sync {
    /// Buffer a new batch of insert updates.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
    fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;

    /// Buffer a new batch of delete updates.
    ///
    /// `metadata` contains optional metadata attached by the transport adapter or parser,
    /// such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    /// `DeserializeWithContext::deserialize_with_context_aux` for each deserialized record.
    fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;

    /// Insert records in `data` with polarities from the `polarities` array.
    ///
    /// `polarities` must be the same length as `data`, i.e. contain one entry
    /// per row of the record batch.
    // NOTE(review): presumably `true` buffers the row as an insertion and
    // `false` as a deletion — confirm against the implementations.
    fn insert_with_polarities(
        &mut self,
        data: &RecordBatch,
        polarities: &[bool],
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Create a new deserializer with the same configuration connected to
    /// the same input stream.
    fn fork(&self) -> Box<dyn ArrowStream>;

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit.  See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
181
/// Named Avro schemas, keyed by [`AvroName`], that may be used to resolve
/// named references within another schema (see [`AvroStream::insert`]).
#[cfg(feature = "with-avro")]
pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
184
/// Like `DeCollectionStream`, but deserializes Avro-encoded records before pushing them to a
/// stream.
///
/// Instances are created by [`DeCollectionHandle::configure_avro_deserializer`].
#[cfg(feature = "with-avro")]
pub trait AvroStream: InputBuffer + Send + Sync {
    /// Buffer a new insert update.
    ///
    /// # Arguments
    ///
    /// * `data` - The decoded Avro value to deserialize into the stream's record type.
    /// * `schema` - The Avro schema to use for deserialization.
    /// * `refs` - A map of named schema references that may be used to resolve references within `schema`.
    /// * `n_bytes` - NOTE(review): presumably the size of the encoded record in bytes,
    ///   used for accounting — confirm against the implementations.
    /// * `metadata` - Optional metadata attached by the transport adapter or parser,
    ///   such as Kafka headers, topic name, etc. This metadata is passed as an aux argument to
    ///   `DeserializeWithContext::deserialize_with_context_aux`.
    fn insert(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Buffer a new delete update.
    ///
    /// Takes the same arguments as [`insert`](Self::insert).
    fn delete(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Create a new deserializer with the same configuration connected to
    /// the same input stream.
    fn fork(&self) -> Box<dyn AvroStream>;

    /// Stages all of the `buffers`, which must have been obtained from a
    /// [Parser] for this stream, into a [StagedBuffers] that may later be used
    /// to push the collected data into the circuit.  See [StagedBuffers] for
    /// more information.
    ///
    /// [Parser]: crate::format::Parser
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
228
/// A handle to an input collection that can be used to feed serialized data
/// to the collection.
pub trait DeCollectionHandle: Send + Sync {
    /// Create a [`DeCollectionStream`] object to parse input data encoded
    /// using the format specified in `RecordFormat`.
    ///
    /// Fails with a [`ControllerError`] if a deserializer cannot be configured
    /// for `record_format`.
    fn configure_deserializer(
        &self,
        record_format: RecordFormat,
    ) -> Result<Box<dyn DeCollectionStream>, ControllerError>;

    /// Create an `ArrowStream` object to parse Arrow-encoded input data.
    fn configure_arrow_deserializer(
        &self,
        config: SqlSerdeConfig,
    ) -> Result<Box<dyn ArrowStream>, ControllerError>;

    /// Create an `AvroStream` object to parse Avro-encoded input data.
    #[cfg(feature = "with-avro")]
    fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;

    /// Create another handle to the same input collection.
    // NOTE(review): whether the new handle shares any state with `self` is not
    // visible here — confirm against the implementations.
    fn fork(&self) -> Box<dyn DeCollectionHandle>;
}
251
/// A type-erased batch whose contents can be serialized.
///
/// This is a wrapper around the DBSP `BatchReader` trait that returns a cursor that
/// yields `erased_serde::Serialize` trait objects that can be used to serialize
/// the contents of the batch without knowing its key and value types.
// The reason we need the `Sync` trait below is so that we can wrap batches
// in `Arc` and send the same batch to multiple output endpoint threads.
pub trait SerBatchReader: 'static + Send + Sync {
    /// Number of keys in the batch.
    fn key_count(&self) -> usize;

    /// Number of tuples in the batch.
    fn len(&self) -> usize;

    /// Returns `true` if the batch contains no tuples.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Create a cursor over the batch that yields records
    /// formatted using the specified format.
    ///
    /// Fails with a [`ControllerError`] if no cursor can be created.
    // NOTE(review): presumably this happens when `record_format` is not
    // supported by the batch type — confirm against the implementations.
    fn cursor<'a>(
        &'a self,
        record_format: RecordFormat,
    ) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;

    /// Returns all batches in this reader.
    ///
    /// A reader can wrap a single batch or a spine or a spine snapshot. This method extracts
    /// all batches from the reader.
    fn batches(&self) -> Vec<Arc<dyn SerBatch>>;

    /// Returns a reader over the current contents of this reader.
    // NOTE(review): presumably an immutable snapshot that is unaffected by
    // later changes to `self` — confirm.
    fn snapshot(&self) -> Arc<dyn SerBatchReader>;

    /// Factory for vectors of keys of this batch's key type (e.g. the
    /// `sample` / `bounds` arguments of `sample_keys` and `partition_keys`).
    fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;

    /// Factory for individual keys of this batch's key type.
    fn key_factory(&self) -> &'static dyn Factory<DynData>;

    /// Samples keys of the batch into `sample`.
    // NOTE(review): presumably at most `sample_size` keys are added, and whether
    // `sample` is cleared first is implementation-defined — confirm.
    fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);

    /// Computes boundary keys that split the batch into `num_partitions`
    /// key ranges and stores them in `bounds`.
    ///
    /// `bounds` is expected to hold `num_partitions - 1` keys, consumed by
    /// [`SplitCursorBuilder::from_bounds`].
    // NOTE(review): a batch with few keys may presumably yield fewer bounds —
    // confirm.
    fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
}
293
294impl Debug for dyn SerBatchReader {
295    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
296        let mut cursor = self
297            .cursor(RecordFormat::Json(Default::default()))
298            .map_err(|_| std::fmt::Error)?;
299        let mut key = Vec::new();
300        let mut val = Vec::new();
301        while cursor.key_valid() {
302            cursor
303                .serialize_key(&mut key)
304                .map_err(|_| std::fmt::Error)?;
305            write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
306
307            while cursor.val_valid() {
308                cursor
309                    .serialize_val(&mut val)
310                    .map_err(|_| std::fmt::Error)?;
311                write!(
312                    f,
313                    "{}=>{}, ",
314                    String::from_utf8_lossy(&val),
315                    cursor.weight()
316                )?;
317
318                val.clear();
319                cursor.step_val();
320            }
321
322            write!(f, "}}, ")?;
323            key.clear();
324            cursor.step_key();
325        }
326
327        Ok(())
328    }
329}
330
331impl Debug for dyn SerBatch {
332    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
333        self.as_batch_reader().fmt(f)
334    }
335}
336
/// A type-erased `Batch`.
pub trait SerBatch: SerBatchReader {
    /// Convert to `Arc<Any>`, which can then be downcast to a reference
    /// to a concrete batch type.
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;

    /// Merge `self` with all batches in `other`, returning a single batch.
    fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;

    /// Concatenate `self.batches()` with `b.batches()` for all batches in `other`
    /// into a spine snapshot.
    ///
    /// Unlike [`merge`](Self::merge), the result is a reader over the
    /// concatenated batches rather than a single merged batch.
    fn concat(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatchReader>;

    /// Upcast to a [`SerBatchReader`] reference.
    fn as_batch_reader(&self) -> &dyn SerBatchReader;

    /// Upcast an `Arc`-wrapped batch to an `Arc<dyn SerBatchReader>`.
    fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;

    /// Convert batch into a trace with identical contents.
    fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
}
357
/// A type-erased `Trace`.
pub trait SerTrace: SerBatchReader {
    /// Insert a batch into the trace.
    fn insert(&mut self, batch: Arc<dyn SerBatch>);

    /// Like [`insert`](Self::insert), but (per its name) without blocking.
    // NOTE(review): the meaning of the returned flag is not visible here —
    // presumably it signals backpressure (see `backpressure_wait`); confirm.
    fn insert_without_blocking(&mut self, batch: Arc<dyn SerBatch>) -> bool;

    // NOTE(review): presumably blocks until the trace can accept more input
    // without exceeding its backpressure threshold — confirm.
    fn backpressure_wait(&self);

    /// Upcast to a [`SerBatchReader`] reference.
    fn as_batch_reader(&self) -> &dyn SerBatchReader;
}
369
/// Builder for [`SplitCursor`]s that iterate over one key-range partition of
/// a batch.
///
/// Create one with [`SplitCursorBuilder::from_bounds`] and call
/// [`SplitCursorBuilder::build`] to obtain a cursor.
#[doc(hidden)]
pub struct SplitCursorBuilder {
    // The batch being partitioned.
    batch: Arc<dyn SerBatchReader>,
    // First key of the partition (inclusive); an owned copy of a key of `batch`.
    start_key: Box<DynData>,
    // Exclusive upper bound of the partition; `None` for the last partition.
    end_key: Option<Box<DynData>>,
    // Record format used when creating cursors over `batch`.
    format: RecordFormat,
}
377
378impl SplitCursorBuilder {
379    /// Create a [`SplitCursorBuilder`] for partition `index` given a batch,
380    /// pre-computed partition `bounds` (as returned by
381    /// [`SerBatchReader::partition_keys`]), and a record `format`.
382    ///
383    /// `bounds` contains `N-1` boundary keys for `N` partitions.
384    /// Partition 0 spans from the start of the batch to `bounds[0]`,
385    /// partition `i` spans from `bounds[i-1]` to `bounds[i]`, and the last
386    /// partition spans from `bounds[N-2]` to the end of the batch.
387    ///
388    /// Returns `None` if the partition is empty (the cursor has no key at the
389    /// start position).
390    pub fn from_bounds(
391        batch: Arc<dyn SerBatchReader>,
392        bounds: &DynVec<DynData>,
393        index: usize,
394        format: RecordFormat,
395    ) -> Option<Self> {
396        let start_bound = if index == 0 {
397            None
398        } else if index <= bounds.len() {
399            Some(bounds.index(index - 1).as_data())
400        } else {
401            None
402        };
403
404        let end_bound = if index < bounds.len() {
405            Some(bounds.index(index).as_data())
406        } else {
407            None
408        };
409
410        let start_key = {
411            let mut cursor = batch.cursor(format.clone()).unwrap();
412
413            // Seek to start. If None, the cursor starts at the beginning.
414            if let Some(start_bound) = start_bound {
415                cursor.seek_key_exact(start_bound);
416            }
417
418            // Clone the actual key the cursor landed on.
419            cursor.get_key().map(|s| {
420                let mut key = batch.key_factory().default_box();
421                s.clone_to(key.as_mut());
422                key
423            })
424        }?;
425
426        let end_key = end_bound.map(|e| {
427            let mut key = batch.key_factory().default_box();
428            e.clone_to(key.as_mut());
429            key
430        });
431
432        Some(SplitCursorBuilder {
433            batch,
434            start_key,
435            end_key,
436            format,
437        })
438    }
439
440    pub fn build<'a>(&'a self) -> SplitCursor<'a> {
441        let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
442
443        // Cannot use `seek_key_exact` here, so we can single-step the cursor afterward.
444        cursor.seek_key(self.start_key.as_data());
445
446        SplitCursor {
447            cursor,
448            start_key: self.start_key.clone(),
449            end_key: self.end_key.clone(),
450        }
451    }
452}
453
/// A [`SerCursor`] restricted to the keys of one partition of a batch, i.e.
/// keys in `[start_key, end_key)`.
///
/// Created by [`SplitCursorBuilder::build`].
#[doc(hidden)]
pub struct SplitCursor<'a> {
    // Cursor over the whole batch.
    cursor: Box<dyn SerCursor + 'a>,
    // First key of the partition (inclusive); `rewind_keys` seeks back here.
    start_key: Box<DynData>,
    // Exclusive upper bound; `None` means the partition extends to the end of the batch.
    end_key: Option<Box<DynData>>,
}
460
461impl SplitCursor<'_> {
462    fn finished(&self) -> bool {
463        if let Some(ref end_key) = self.end_key
464            && let Some(current_key) = self.cursor.get_key()
465        {
466            return current_key >= end_key.as_data();
467        }
468
469        false
470    }
471}
472
473impl SerCursor for SplitCursor<'_> {
474    fn key_valid(&self) -> bool {
475        self.cursor.key_valid() && !self.finished()
476    }
477
478    fn val_valid(&self) -> bool {
479        self.cursor.val_valid()
480    }
481
482    fn key(&self) -> &DynData {
483        self.cursor.key()
484    }
485
486    fn val(&self) -> &DynData {
487        self.cursor.val()
488    }
489
490    fn get_key(&self) -> Option<&DynData> {
491        if !self.key_valid() {
492            return None;
493        }
494
495        self.cursor.get_key()
496    }
497
498    fn get_val(&self) -> Option<&DynData> {
499        self.cursor.get_val()
500    }
501
502    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
503        self.cursor.serialize_key(dst)
504    }
505
506    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
507        self.cursor.key_to_json()
508    }
509
510    fn serialize_key_fields(
511        &mut self,
512        fields: &HashSet<String>,
513        dst: &mut Vec<u8>,
514    ) -> AnyResult<()> {
515        self.cursor.serialize_key_fields(fields, dst)
516    }
517
518    fn serialize_val_fields(
519        &mut self,
520        fields: &HashSet<String>,
521        dst: &mut Vec<u8>,
522    ) -> AnyResult<()> {
523        self.cursor.serialize_val_fields(fields, dst)
524    }
525
526    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
527        self.cursor.serialize_key_to_arrow(dst)
528    }
529
530    fn serialize_key_to_arrow_with_metadata(
531        &mut self,
532        metadata: &dyn erased_serde::Serialize,
533        dst: &mut ArrayBuilder,
534    ) -> AnyResult<()> {
535        self.cursor
536            .serialize_key_to_arrow_with_metadata(metadata, dst)
537    }
538
539    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
540        self.cursor.serialize_val_to_arrow(dst)
541    }
542
543    fn serialize_val_to_arrow_with_metadata(
544        &mut self,
545        metadata: &dyn erased_serde::Serialize,
546        dst: &mut ArrayBuilder,
547    ) -> AnyResult<()> {
548        self.cursor
549            .serialize_val_to_arrow_with_metadata(metadata, dst)
550    }
551
552    #[cfg(feature = "with-avro")]
553    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
554        self.cursor.key_to_avro(schema, refs)
555    }
556
557    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
558        self.cursor.serialize_key_weight(dst)
559    }
560
561    fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
562        self.cursor.serialize_val_weight(dst)
563    }
564
565    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
566        self.cursor.serialize_val(dst)
567    }
568
569    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
570        self.cursor.val_to_json()
571    }
572
573    #[cfg(feature = "with-avro")]
574    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
575        self.cursor.val_to_avro(schema, refs)
576    }
577
578    fn weight(&mut self) -> i64 {
579        self.cursor.weight()
580    }
581
582    fn step_key(&mut self) {
583        self.cursor.step_key();
584    }
585
586    fn step_val(&mut self) {
587        self.cursor.step_val();
588    }
589
590    fn rewind_keys(&mut self) {
591        self.cursor.rewind_keys();
592        self.cursor.seek_key(self.start_key.as_data());
593    }
594
595    fn rewind_vals(&mut self) {
596        self.cursor.rewind_vals();
597    }
598
599    fn seek_key_exact(&mut self, key: &DynData) -> bool {
600        if let Some(ref end_key) = self.end_key
601            && key >= end_key.as_data()
602        {
603            return false;
604        }
605
606        self.cursor.seek_key_exact(key)
607    }
608
609    fn seek_key(&mut self, key: &DynData) {
610        self.cursor.seek_key(key);
611    }
612}
613
614/// A wrapper around a `SerCursor` that makes an `IndexedZSet<K, V>` look like a `ZSet<V>`
615/// by iterating over values only.
616///
617/// * Doesn't return values in order.
618/// * Panics when any operations over values are attempted.
619/// * Panics on seek_key_exact and seek_key.
620pub struct SerCursorFlattened<'a> {
621    val_valid: bool,
622    cursor: Box<dyn SerCursor + 'a>,
623}
624
625impl<'a> SerCursorFlattened<'a> {
626    pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
627        Self {
628            cursor,
629            val_valid: true,
630        }
631    }
632}
633
634impl<'a> SerCursor for SerCursorFlattened<'a> {
635    fn key_valid(&self) -> bool {
636        self.cursor.key_valid() && self.cursor.val_valid()
637    }
638
639    fn val_valid(&self) -> bool {
640        self.val_valid
641    }
642
643    fn key(&self) -> &DynData {
644        self.cursor.val()
645    }
646
647    fn get_key(&self) -> Option<&DynData> {
648        self.cursor.get_val()
649    }
650
651    fn val(&self) -> &DynData {
652        ().erase()
653    }
654
655    fn get_val(&self) -> Option<&DynData> {
656        if self.val_valid {
657            Some(().erase())
658        } else {
659            None
660        }
661    }
662
663    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
664        self.cursor.serialize_val(dst)
665    }
666
667    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
668        self.cursor.val_to_json()
669    }
670
671    fn serialize_key_fields(
672        &mut self,
673        fields: &HashSet<String>,
674        dst: &mut Vec<u8>,
675    ) -> AnyResult<()> {
676        self.cursor.serialize_val_fields(fields, dst)
677    }
678
679    fn serialize_val_fields(
680        &mut self,
681        _fields: &HashSet<String>,
682        _dst: &mut Vec<u8>,
683    ) -> AnyResult<()> {
684        panic!("serialize_val_fields is not supported for flattened cursors");
685    }
686
687    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
688        self.cursor.serialize_val_to_arrow(dst)
689    }
690
691    fn serialize_key_to_arrow_with_metadata(
692        &mut self,
693        metadata: &dyn erased_serde::Serialize,
694        dst: &mut ArrayBuilder,
695    ) -> AnyResult<()> {
696        self.cursor
697            .serialize_val_to_arrow_with_metadata(metadata, dst)
698    }
699
700    fn serialize_val_to_arrow(&mut self, _dst: &mut ArrayBuilder) -> AnyResult<()> {
701        panic!("serialize_val_to_arrow is not supported for flattened cursors");
702    }
703
704    fn serialize_val_to_arrow_with_metadata(
705        &mut self,
706        _metadata: &dyn erased_serde::Serialize,
707        _dst: &mut ArrayBuilder,
708    ) -> AnyResult<()> {
709        panic!("serialize_val_to_arrow_with_metadata is not supported for flattened cursors");
710    }
711
712    #[cfg(feature = "with-avro")]
713    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
714        self.cursor.val_to_avro(schema, refs)
715    }
716
717    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
718        self.cursor.serialize_val_weight(dst)
719    }
720
721    fn serialize_val_weight(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
722        panic!("serialize_val_weight is not supported for flattened cursors");
723    }
724
725    fn serialize_val(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
726        panic!("serialize_val is not supported for flattened cursors");
727    }
728
729    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
730        panic!("val_to_json is not supported for flattened cursors");
731    }
732
733    #[cfg(feature = "with-avro")]
734    fn val_to_avro(&mut self, _schema: &AvroSchema, _refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
735        panic!("val_to_avro is not supported for flattened cursors");
736    }
737
738    fn weight(&mut self) -> i64 {
739        self.cursor.weight()
740    }
741
742    fn step_key(&mut self) {
743        debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
744        self.cursor.step_val();
745        while !self.cursor.val_valid() && self.cursor.key_valid() {
746            self.cursor.step_key();
747        }
748        self.val_valid = true;
749    }
750
751    fn step_val(&mut self) {
752        self.val_valid = false;
753    }
754
755    fn rewind_keys(&mut self) {
756        self.cursor.rewind_keys();
757        self.val_valid = true;
758    }
759
760    fn rewind_vals(&mut self) {
761        self.val_valid = true;
762    }
763
764    fn seek_key_exact(&mut self, _key: &DynData) -> bool {
765        panic!("seek_key_exact is not supported for flattened cursors");
766    }
767
768    fn seek_key(&mut self, _key: &DynData) {
769        panic!("seek_key is not supported for flattened cursors");
770    }
771}
772
/// Cursor that allows serializing the contents of a type-erased batch.
///
/// This is a wrapper around the DBSP `Cursor` trait that yields keys and values
/// of the underlying batch as `erased_serde::Serialize` trait objects.
///
/// A cursor iterates over keys and, for each key, over the values associated
/// with it. A typical traversal is:
///
/// ```text
/// while cursor.key_valid() {
///     while cursor.val_valid() {
///         // ... serialize the current key/value, read cursor.weight() ...
///         cursor.step_val();
///     }
///     cursor.step_key();
/// }
/// ```
pub trait SerCursor: Send {
    /// Indicates if the current key is valid.
    ///
    /// A value of `false` indicates that the cursor has exhausted all keys.
    fn key_valid(&self) -> bool;

    /// Indicates if the current value is valid.
    ///
    /// A value of `false` indicates that the cursor has exhausted all values
    /// for this key.
    fn val_valid(&self) -> bool;

    /// Returns the current key as a type-erased value.
    ///
    /// Callers should check [`key_valid`](Self::key_valid) first, or use
    /// [`get_key`](Self::get_key) instead.
    fn key(&self) -> &DynData;

    /// Returns the current key, or `None` if there is no valid key.
    fn get_key(&self) -> Option<&DynData>;

    /// Returns the current value as a type-erased value.
    ///
    /// Callers should check [`val_valid`](Self::val_valid) first, or use
    /// [`get_val`](Self::get_val) instead.
    fn val(&self) -> &DynData;

    /// Returns the current value, or `None` if there is no valid value.
    fn get_val(&self) -> Option<&DynData>;

    /// Serialize current key. Panics if invalid.
    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Convert key to JSON. Used for error reporting to generate a human-readable
    /// representation of the key.
    fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;

    /// Like `serialize_key`, but only serializes the specified fields of the key.
    fn serialize_key_fields(
        &mut self,
        fields: &HashSet<String>,
        dst: &mut Vec<u8>,
    ) -> AnyResult<()>;

    /// Like `serialize_val`, but only serializes the specified fields of the value.
    fn serialize_val_fields(
        &mut self,
        fields: &HashSet<String>,
        dst: &mut Vec<u8>,
    ) -> AnyResult<()>;

    /// Serialize current key into arrow format. Panics if invalid.
    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;

    /// Serialize current key into arrow format, adding additional metadata columns.
    /// `metadata` must be a struct or a map.
    fn serialize_key_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()>;

    /// Serialize current value into arrow format. Panics if invalid.
    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;

    /// Serialize current value into arrow format, adding additional metadata columns.
    /// `metadata` must be a struct or a map.
    fn serialize_val_to_arrow_with_metadata(
        &mut self,
        metadata: &dyn erased_serde::Serialize,
        dst: &mut ArrayBuilder,
    ) -> AnyResult<()>;

    #[cfg(feature = "with-avro")]
    /// Convert current key to an Avro value.
    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;

    /// Serialize the `(key, weight)` tuple.
    ///
    /// FIXME: This only exists to support the CSV serializer, which outputs
    /// key and weight in the same CSV record.
    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Serialize the `(value, weight)` tuple; the value-side counterpart of
    /// [`serialize_key_weight`](Self::serialize_key_weight).
    fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Serialize current value. Panics if invalid.
    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;

    /// Convert value to JSON. Used for error reporting to generate a human-readable
    /// representation of the value.
    fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;

    #[cfg(feature = "with-avro")]
    /// Convert current value to Avro.
    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;

    /// Returns the weight associated with the current key/value pair.
    fn weight(&mut self) -> i64;

    /// Advances the cursor to the next key.
    fn step_key(&mut self);

    /// Advances the cursor to the next value.
    fn step_val(&mut self);

    /// Rewinds the cursor to the first key.
    fn rewind_keys(&mut self);

    /// Rewinds the cursor to the first value for current key.
    fn rewind_vals(&mut self);

    /// Counts the keys from the current position to the end.
    ///
    /// The default implementation steps the cursor until `key_valid()` is
    /// `false`, so the cursor is left exhausted; call
    /// [`rewind_keys`](Self::rewind_keys) to traverse it again.
    fn count_keys(&mut self) -> usize {
        let mut count = 0;

        while self.key_valid() {
            count += 1;
            self.step_key()
        }

        count
    }

    /// Seeks to exactly `key`.
    // NOTE(review): presumably returns `true` iff `key` was found; the cursor
    // position after an unsuccessful seek is implementation-defined — confirm.
    fn seek_key_exact(&mut self, key: &DynData) -> bool;

    /// Seeks to `key`.
    // NOTE(review): presumably positions the cursor at the first key `>= key`
    // without moving backward — confirm against the DBSP `Cursor` contract.
    fn seek_key(&mut self, key: &DynData);
}
892
893/// A handle to an output stream of a circuit that yields type-erased
894/// read-only batches.
895///
896/// A trait for a type that wraps around an
897/// [`OutputHandle`](`dbsp::OutputHandle`) and yields output batches produced by
898/// the circuit as [`SerBatchReader`]s.
899pub trait SerBatchReaderHandle: Send + Sync + DynClone {
900    /// See [`OutputHandle::num_nonempty_mailboxes`](`dbsp::OutputHandle::num_nonempty_mailboxes`)
901    fn num_nonempty_mailboxes(&self) -> usize;
902
903    /// Like [`OutputHandle::take_from_worker`](`dbsp::OutputHandle::take_from_worker`),
904    /// but returns output batch as a [`SerBatchReader`] trait object.
905    fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
906
907    /// Like [`OutputHandle::take_from_all`](`dbsp::OutputHandle::take_from_all`),
908    /// but returns output batches as [`SerBatchReader`] trait objects.
909    fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
910
911    /// Concatenate outputs from all workers into a single batch reader.
912    fn concat(&self) -> Arc<dyn SerBatchReader>;
913}
914
915dyn_clone::clone_trait_object!(SerBatchReaderHandle);
916
917/// Cursor that iterates over deletions before insertions.
918///
919/// Most consumers don't understand Z-sets and expect a stream of upserts
920/// instead, which means that the order of updates matters. For a table
921/// with a primary key or unique constraint we must delete an existing record
922/// before creating a new one with the same key.  DBSP may not know about
923/// these constraints, so the safe thing to do is to output deletions before
924/// insertions.  This cursor helps by iterating over all deletions in
925/// the batch before insertions.
926pub struct CursorWithPolarity<'a> {
927    cursor: Box<dyn SerCursor + 'a>,
928    second_pass: bool,
929}
930
931impl<'a> CursorWithPolarity<'a> {
932    pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
933        let mut result = Self {
934            cursor,
935            second_pass: false,
936        };
937
938        if result.key_valid() {
939            result.advance_val();
940        }
941
942        result
943    }
944
945    fn advance_val(&mut self) {
946        while self.cursor.val_valid()
947            && ((!self.second_pass && self.cursor.weight() >= 0)
948                || (self.second_pass && self.cursor.weight() <= 0))
949        {
950            self.step_val();
951        }
952    }
953}
954
955impl SerCursor for CursorWithPolarity<'_> {
956    fn key_valid(&self) -> bool {
957        self.cursor.key_valid()
958    }
959
960    fn val_valid(&self) -> bool {
961        self.cursor.val_valid()
962    }
963
964    fn key(&self) -> &DynData {
965        self.cursor.key()
966    }
967
968    fn get_key(&self) -> Option<&DynData> {
969        self.cursor.get_key()
970    }
971
972    fn val(&self) -> &DynData {
973        self.cursor.val()
974    }
975
976    fn get_val(&self) -> Option<&DynData> {
977        self.cursor.get_val()
978    }
979
980    fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
981        self.cursor.serialize_key(dst)
982    }
983
984    fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
985        self.cursor.key_to_json()
986    }
987
988    fn serialize_key_fields(
989        &mut self,
990        fields: &HashSet<String>,
991        dst: &mut Vec<u8>,
992    ) -> AnyResult<()> {
993        self.cursor.serialize_key_fields(fields, dst)
994    }
995
996    fn serialize_val_fields(
997        &mut self,
998        fields: &HashSet<String>,
999        dst: &mut Vec<u8>,
1000    ) -> AnyResult<()> {
1001        self.cursor.serialize_val_fields(fields, dst)
1002    }
1003
1004    #[cfg(feature = "with-avro")]
1005    fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
1006        self.cursor.key_to_avro(schema, refs)
1007    }
1008
1009    fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1010        self.cursor.serialize_key_weight(dst)
1011    }
1012
1013    fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1014        self.cursor.serialize_val_weight(dst)
1015    }
1016
1017    fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
1018        self.cursor.serialize_key_to_arrow(dst)
1019    }
1020
1021    fn serialize_key_to_arrow_with_metadata(
1022        &mut self,
1023        metadata: &dyn erased_serde::Serialize,
1024        dst: &mut ArrayBuilder,
1025    ) -> AnyResult<()> {
1026        self.cursor
1027            .serialize_key_to_arrow_with_metadata(metadata, dst)
1028    }
1029
1030    fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
1031        self.cursor.serialize_val_to_arrow(dst)
1032    }
1033
1034    fn serialize_val_to_arrow_with_metadata(
1035        &mut self,
1036        metadata: &dyn erased_serde::Serialize,
1037        dst: &mut ArrayBuilder,
1038    ) -> AnyResult<()> {
1039        self.cursor
1040            .serialize_val_to_arrow_with_metadata(metadata, dst)
1041    }
1042
1043    fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1044        self.cursor.serialize_val(dst)
1045    }
1046
1047    fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
1048        self.cursor.val_to_json()
1049    }
1050
1051    #[cfg(feature = "with-avro")]
1052    fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
1053        self.cursor.val_to_avro(schema, refs)
1054    }
1055
1056    fn weight(&mut self) -> i64 {
1057        self.cursor.weight()
1058    }
1059
1060    fn step_key(&mut self) {
1061        self.cursor.step_key();
1062        if !self.cursor.key_valid() && !self.second_pass {
1063            self.cursor.rewind_keys();
1064            self.second_pass = true;
1065        }
1066
1067        if self.cursor.key_valid() {
1068            self.advance_val();
1069        }
1070    }
1071
1072    fn step_val(&mut self) {
1073        self.cursor.step_val();
1074        self.advance_val();
1075    }
1076
1077    fn rewind_keys(&mut self) {
1078        self.cursor.rewind_keys();
1079        self.second_pass = false;
1080        if self.cursor.key_valid() {
1081            self.advance_val();
1082        }
1083    }
1084
1085    fn rewind_vals(&mut self) {
1086        self.cursor.rewind_vals();
1087        self.advance_val();
1088    }
1089
1090    fn seek_key_exact(&mut self, key: &DynData) -> bool {
1091        self.cursor.seek_key_exact(key)
1092    }
1093
1094    fn seek_key(&mut self, key: &DynData) {
1095        self.cursor.seek_key(key);
1096    }
1097}
1098
1099/// A catalog of input and output stream handles of a circuit.
1100pub trait CircuitCatalog: Send + Sync {
1101    /// Look up an input stream handle by name.
1102    fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
1103
1104    fn output_iter(
1105        &self,
1106    ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
1107
1108    /// Look up output stream handles by name.
1109    fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
1110
1111    /// Look up handles for an index of a stream.
1112    fn index_handles(
1113        &self,
1114        endpoint_name: &str,
1115        stream: &SqlIdentifier,
1116        index: &SqlIdentifier,
1117    ) -> Result<&OutputCollectionHandles, ControllerError>;
1118
1119    fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
1120
1121    /// The registry used to insert new user-defined preprocessors
1122    fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
1123
1124    /// The registry used to insert new user-defined postprocessors
1125    fn postprocessor_registry(&self) -> Arc<Mutex<PostprocessorRegistry>>;
1126}
1127
1128#[doc(hidden)]
1129pub struct InputCollectionHandle {
1130    pub schema: Relation,
1131    pub handle: Box<dyn DeCollectionHandle>,
1132
1133    /// Node id of the input stream in the circuit.
1134    ///
1135    /// Used to check whether the input stream needs to be backfilled during bootstrapping,
1136    /// i.e., whether attached input connectors should be reset to their initial offsets or
1137    /// continue from the checkpointed offsets.
1138    pub node_id: NodeId,
1139}
1140
1141impl InputCollectionHandle {
1142    #[doc(hidden)]
1143    pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
1144    where
1145        H: DeCollectionHandle + 'static,
1146    {
1147        Self {
1148            schema,
1149            handle: Box::new(handle),
1150            node_id,
1151        }
1152    }
1153}
1154
1155/// A set of stream handles associated with each output collection.
1156#[derive(Clone)]
1157pub struct OutputCollectionHandles {
1158    /// Schema of the keys in the stream.
1159    ///
1160    /// Only set for indexed Z-sets.
1161    pub key_schema: Option<Relation>,
1162
1163    /// Schema of the values in the stream.
1164    ///
1165    /// If the stream is an indexed Z-set, this is the schema of the values in the stream;
1166    /// if it is a Z-set, this is the schema of the keys in the stream.
1167    pub value_schema: Relation,
1168
1169    /// Set when this stream is an index of another stream.
1170    pub index_of: Option<SqlIdentifier>,
1171
1172    /// Set when the same stream is used to represent a view and its index.
1173    pub alias_as_index: Option<SqlIdentifier>,
1174
1175    /// A handle to a snapshot of a materialized table/view.
1176    pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
1177
1178    /// A stream of changes to the collection.
1179    pub delta_handle: Box<dyn SerBatchReaderHandle>,
1180
1181    /// Reference to the enable count of the accumulator used to collect updates to this stream.
1182    /// Incremented every time an output connector is attached to this stream; decremented when
1183    /// the output connector is detached.
1184    pub enable_count: Arc<AtomicUsize>,
1185}
1186
1187impl OutputCollectionHandles {
1188    /// Stream is an indexed Z-set.
1189    pub fn is_indexed(&self) -> bool {
1190        self.key_schema.is_some()
1191    }
1192}