1use std::any::Any;
2use std::collections::HashSet;
3use std::fmt::{Debug, Formatter};
4use std::sync::atomic::AtomicUsize;
5use std::sync::{Arc, Mutex};
6
7use anyhow::Result as AnyResult;
8#[cfg(feature = "with-avro")]
9use apache_avro::{
10 Schema as AvroSchema,
11 schema::{Name as AvroName, NamesRef},
12 types::Value as AvroValue,
13};
14use arrow::record_batch::RecordBatch;
15use dbsp::circuit::NodeId;
16use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Factory};
17use dbsp::operator::StagedBuffers;
18use dyn_clone::DynClone;
19use feldera_sqllib::Variant;
20use feldera_types::format::csv::CsvParserConfig;
21use feldera_types::format::json::JsonFlavor;
22use feldera_types::program_schema::{Relation, SqlIdentifier};
23use feldera_types::serde_with_context::SqlSerdeConfig;
24use serde_arrow::ArrayBuilder;
25#[cfg(feature = "with-avro")]
26use std::collections::HashMap;
27
28use crate::errors::controller::ControllerError;
29use crate::format::InputBuffer;
30use crate::preprocess::PreprocessorRegistry;
31
32#[derive(Clone)]
35pub enum RecordFormat {
36 Json(JsonFlavor),
43 Csv(CsvParserConfig),
44 Parquet(SqlSerdeConfig),
45 #[cfg(feature = "with-avro")]
46 Avro,
47 Raw(String),
48}
49
50pub trait DeCollectionStream: Send + Sync + InputBuffer {
69 fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
79
80 fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
103
104 fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
116
117 fn reserve(&mut self, reservation: usize);
123
124 fn truncate(&mut self, len: usize);
126
127 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
134
135 fn fork(&self) -> Box<dyn DeCollectionStream>;
139}
140
141pub trait ArrowStream: InputBuffer + Send + Sync {
144 fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
150
151 fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
157
158 fn insert_with_polarities(
162 &mut self,
163 data: &RecordBatch,
164 polarities: &[bool],
165 metadata: &Option<Variant>,
166 ) -> AnyResult<()>;
167
168 fn fork(&self) -> Box<dyn ArrowStream>;
171
172 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
179}
180
/// Named Avro schemas keyed by their name; passed as `refs` alongside the
/// record schema to `AvroStream` methods.
#[cfg(feature = "with-avro")]
pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
183
/// Input stream that receives already-decoded Avro values.
#[cfg(feature = "with-avro")]
pub trait AvroStream: Send + Sync + InputBuffer {
    /// Submits `data` as an insertion.
    ///
    /// * `schema` / `refs` — the value's schema and the named schemas it may
    ///   reference.
    /// * `n_bytes` — size attributed to this record.
    ///   NOTE(review): presumably its encoded size, for accounting — confirm.
    /// * `metadata` — optional connector metadata.
    fn insert(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Submits `data` as a deletion; the arguments mean the same as for
    /// [`AvroStream::insert`].
    fn delete(
        &mut self,
        data: &AvroValue,
        schema: &AvroSchema,
        refs: &AvroSchemaRefs,
        n_bytes: usize,
        metadata: &Option<Variant>,
    ) -> AnyResult<()>;

    /// Creates another stream of the same kind.
    fn fork(&self) -> Box<dyn AvroStream>;

    /// Combines `buffers` into a single [`StagedBuffers`].
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
}
227
228pub trait DeCollectionHandle: Send + Sync {
231 fn configure_deserializer(
234 &self,
235 record_format: RecordFormat,
236 ) -> Result<Box<dyn DeCollectionStream>, ControllerError>;
237
238 fn configure_arrow_deserializer(
240 &self,
241 config: SqlSerdeConfig,
242 ) -> Result<Box<dyn ArrowStream>, ControllerError>;
243
244 #[cfg(feature = "with-avro")]
246 fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;
247
248 fn fork(&self) -> Box<dyn DeCollectionHandle>;
249}
250
251pub trait SerBatchReader: 'static + Send + Sync {
259 fn key_count(&self) -> usize;
261
262 fn len(&self) -> usize;
264
265 fn is_empty(&self) -> bool {
266 self.len() == 0
267 }
268
269 fn cursor<'a>(
272 &'a self,
273 record_format: RecordFormat,
274 ) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;
275
276 fn batches(&self) -> Vec<Arc<dyn SerBatch>>;
281
282 fn snapshot(&self) -> Arc<dyn SerBatchReader>;
283
284 fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
285
286 fn key_factory(&self) -> &'static dyn Factory<DynData>;
287
288 fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
289
290 fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
291}
292
293impl Debug for dyn SerBatchReader {
294 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
295 let mut cursor = self
296 .cursor(RecordFormat::Json(Default::default()))
297 .map_err(|_| std::fmt::Error)?;
298 let mut key = Vec::new();
299 let mut val = Vec::new();
300 while cursor.key_valid() {
301 cursor
302 .serialize_key(&mut key)
303 .map_err(|_| std::fmt::Error)?;
304 write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
305
306 while cursor.val_valid() {
307 cursor
308 .serialize_val(&mut val)
309 .map_err(|_| std::fmt::Error)?;
310 write!(
311 f,
312 "{}=>{}, ",
313 String::from_utf8_lossy(&val),
314 cursor.weight()
315 )?;
316
317 val.clear();
318 cursor.step_val();
319 }
320
321 write!(f, "}}, ")?;
322 key.clear();
323 cursor.step_key();
324 }
325
326 Ok(())
327 }
328}
329
330impl Debug for dyn SerBatch {
331 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332 self.as_batch_reader().fmt(f)
333 }
334}
335
336pub trait SerBatch: SerBatchReader {
338 fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;
341
342 fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;
344
345 fn as_batch_reader(&self) -> &dyn SerBatchReader;
346
347 fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;
348
349 fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
351}
352
353pub trait SerTrace: SerBatchReader {
355 fn insert(&mut self, batch: Arc<dyn SerBatch>);
357
358 fn as_batch_reader(&self) -> &dyn SerBatchReader;
359}
360
361#[doc(hidden)]
362pub struct SplitCursorBuilder {
363 batch: Arc<dyn SerBatchReader>,
364 start_key: Box<DynData>,
365 end_key: Option<Box<DynData>>,
366 format: RecordFormat,
367}
368
369impl SplitCursorBuilder {
370 pub fn from_bounds(
382 batch: Arc<dyn SerBatchReader>,
383 bounds: &DynVec<DynData>,
384 index: usize,
385 format: RecordFormat,
386 ) -> Option<Self> {
387 let start_bound = if index == 0 {
388 None
389 } else if index <= bounds.len() {
390 Some(bounds.index(index - 1).as_data())
391 } else {
392 None
393 };
394
395 let end_bound = if index < bounds.len() {
396 Some(bounds.index(index).as_data())
397 } else {
398 None
399 };
400
401 let start_key = {
402 let mut cursor = batch.cursor(format.clone()).unwrap();
403
404 if let Some(start_bound) = start_bound {
406 cursor.seek_key_exact(start_bound);
407 }
408
409 cursor.get_key().map(|s| {
411 let mut key = batch.key_factory().default_box();
412 s.clone_to(key.as_mut());
413 key
414 })
415 }?;
416
417 let end_key = end_bound.map(|e| {
418 let mut key = batch.key_factory().default_box();
419 e.clone_to(key.as_mut());
420 key
421 });
422
423 Some(SplitCursorBuilder {
424 batch,
425 start_key,
426 end_key,
427 format,
428 })
429 }
430
431 pub fn build<'a>(&'a self) -> SplitCursor<'a> {
432 let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
433
434 cursor.seek_key(self.start_key.as_data());
436
437 SplitCursor {
438 cursor,
439 start_key: self.start_key.clone(),
440 end_key: self.end_key.clone(),
441 }
442 }
443}
444
445#[doc(hidden)]
446pub struct SplitCursor<'a> {
447 cursor: Box<dyn SerCursor + 'a>,
448 start_key: Box<DynData>,
449 end_key: Option<Box<DynData>>,
450}
451
452impl SplitCursor<'_> {
453 fn finished(&self) -> bool {
454 if let Some(ref end_key) = self.end_key
455 && let Some(current_key) = self.cursor.get_key()
456 {
457 return current_key >= end_key.as_data();
458 }
459
460 false
461 }
462}
463
464impl SerCursor for SplitCursor<'_> {
465 fn key_valid(&self) -> bool {
466 self.cursor.key_valid() && !self.finished()
467 }
468
469 fn val_valid(&self) -> bool {
470 self.cursor.val_valid()
471 }
472
473 fn key(&self) -> &DynData {
474 self.cursor.key()
475 }
476
477 fn get_key(&self) -> Option<&DynData> {
478 if !self.key_valid() {
479 return None;
480 }
481
482 self.cursor.get_key()
483 }
484
485 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
486 self.cursor.serialize_key(dst)
487 }
488
489 fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
490 self.cursor.key_to_json()
491 }
492
493 fn serialize_key_fields(
494 &mut self,
495 fields: &HashSet<String>,
496 dst: &mut Vec<u8>,
497 ) -> AnyResult<()> {
498 self.cursor.serialize_key_fields(fields, dst)
499 }
500
501 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
502 self.cursor.serialize_key_to_arrow(dst)
503 }
504
505 fn serialize_key_to_arrow_with_metadata(
506 &mut self,
507 metadata: &dyn erased_serde::Serialize,
508 dst: &mut ArrayBuilder,
509 ) -> AnyResult<()> {
510 self.cursor
511 .serialize_key_to_arrow_with_metadata(metadata, dst)
512 }
513
514 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
515 self.cursor.serialize_val_to_arrow(dst)
516 }
517
518 fn serialize_val_to_arrow_with_metadata(
519 &mut self,
520 metadata: &dyn erased_serde::Serialize,
521 dst: &mut ArrayBuilder,
522 ) -> AnyResult<()> {
523 self.cursor
524 .serialize_val_to_arrow_with_metadata(metadata, dst)
525 }
526
527 #[cfg(feature = "with-avro")]
528 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
529 self.cursor.key_to_avro(schema, refs)
530 }
531
532 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
533 self.cursor.serialize_key_weight(dst)
534 }
535
536 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
537 self.cursor.serialize_val(dst)
538 }
539
540 fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
541 self.cursor.val_to_json()
542 }
543
544 #[cfg(feature = "with-avro")]
545 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
546 self.cursor.val_to_avro(schema, refs)
547 }
548
549 fn weight(&mut self) -> i64 {
550 self.cursor.weight()
551 }
552
553 fn step_key(&mut self) {
554 self.cursor.step_key();
555 }
556
557 fn step_val(&mut self) {
558 self.cursor.step_val();
559 }
560
561 fn rewind_keys(&mut self) {
562 self.cursor.rewind_keys();
563 self.cursor.seek_key(self.start_key.as_data());
564 }
565
566 fn rewind_vals(&mut self) {
567 self.cursor.rewind_vals();
568 }
569
570 fn seek_key_exact(&mut self, key: &DynData) -> bool {
571 if let Some(ref end_key) = self.end_key
572 && key >= end_key.as_data()
573 {
574 return false;
575 }
576
577 self.cursor.seek_key_exact(key)
578 }
579
580 fn seek_key(&mut self, key: &DynData) {
581 self.cursor.seek_key(key);
582 }
583}
584
585pub trait SerCursor: Send {
590 fn key_valid(&self) -> bool;
594
595 fn val_valid(&self) -> bool;
600
601 fn key(&self) -> &DynData;
602
603 fn get_key(&self) -> Option<&DynData>;
604
605 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
607
608 fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;
611
612 fn serialize_key_fields(
614 &mut self,
615 fields: &HashSet<String>,
616 dst: &mut Vec<u8>,
617 ) -> AnyResult<()>;
618
619 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
621
622 fn serialize_key_to_arrow_with_metadata(
625 &mut self,
626 metadata: &dyn erased_serde::Serialize,
627 dst: &mut ArrayBuilder,
628 ) -> AnyResult<()>;
629
630 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
632
633 fn serialize_val_to_arrow_with_metadata(
636 &mut self,
637 metadata: &dyn erased_serde::Serialize,
638 dst: &mut ArrayBuilder,
639 ) -> AnyResult<()>;
640
641 #[cfg(feature = "with-avro")]
642 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
644
645 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
650
651 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
653
654 fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;
657
658 #[cfg(feature = "with-avro")]
659 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
661
662 fn weight(&mut self) -> i64;
664
665 fn step_key(&mut self);
667
668 fn step_val(&mut self);
670
671 fn rewind_keys(&mut self);
673
674 fn rewind_vals(&mut self);
676
677 fn count_keys(&mut self) -> usize {
678 let mut count = 0;
679
680 while self.key_valid() {
681 count += 1;
682 self.step_key()
683 }
684
685 count
686 }
687
688 fn seek_key_exact(&mut self, key: &DynData) -> bool;
689
690 fn seek_key(&mut self, key: &DynData);
691}
692
693pub trait SerBatchReaderHandle: Send + Sync + DynClone {
700 fn num_nonempty_mailboxes(&self) -> usize;
702
703 fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
706
707 fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
710
711 fn concat(&self) -> Arc<dyn SerBatchReader>;
713}
714
715dyn_clone::clone_trait_object!(SerBatchReaderHandle);
716
717pub struct CursorWithPolarity<'a> {
727 cursor: Box<dyn SerCursor + 'a>,
728 second_pass: bool,
729}
730
731impl<'a> CursorWithPolarity<'a> {
732 pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
733 let mut result = Self {
734 cursor,
735 second_pass: false,
736 };
737
738 if result.key_valid() {
739 result.advance_val();
740 }
741
742 result
743 }
744
745 fn advance_val(&mut self) {
746 while self.cursor.val_valid()
747 && ((!self.second_pass && self.cursor.weight() >= 0)
748 || (self.second_pass && self.cursor.weight() <= 0))
749 {
750 self.step_val();
751 }
752 }
753}
754
755impl SerCursor for CursorWithPolarity<'_> {
756 fn key_valid(&self) -> bool {
757 self.cursor.key_valid()
758 }
759
760 fn val_valid(&self) -> bool {
761 self.cursor.val_valid()
762 }
763
764 fn key(&self) -> &DynData {
765 self.cursor.key()
766 }
767
768 fn get_key(&self) -> Option<&DynData> {
769 self.cursor.get_key()
770 }
771
772 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
773 self.cursor.serialize_key(dst)
774 }
775
776 fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
777 self.cursor.key_to_json()
778 }
779
780 fn serialize_key_fields(
781 &mut self,
782 fields: &HashSet<String>,
783 dst: &mut Vec<u8>,
784 ) -> AnyResult<()> {
785 self.cursor.serialize_key_fields(fields, dst)
786 }
787
788 #[cfg(feature = "with-avro")]
789 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
790 self.cursor.key_to_avro(schema, refs)
791 }
792
793 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
794 self.cursor.serialize_key_weight(dst)
795 }
796
797 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
798 self.cursor.serialize_key_to_arrow(dst)
799 }
800
801 fn serialize_key_to_arrow_with_metadata(
802 &mut self,
803 metadata: &dyn erased_serde::Serialize,
804 dst: &mut ArrayBuilder,
805 ) -> AnyResult<()> {
806 self.cursor
807 .serialize_key_to_arrow_with_metadata(metadata, dst)
808 }
809
810 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
811 self.cursor.serialize_val_to_arrow(dst)
812 }
813
814 fn serialize_val_to_arrow_with_metadata(
815 &mut self,
816 metadata: &dyn erased_serde::Serialize,
817 dst: &mut ArrayBuilder,
818 ) -> AnyResult<()> {
819 self.cursor
820 .serialize_val_to_arrow_with_metadata(metadata, dst)
821 }
822
823 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
824 self.cursor.serialize_val(dst)
825 }
826
827 fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
828 self.cursor.val_to_json()
829 }
830
831 #[cfg(feature = "with-avro")]
832 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
833 self.cursor.val_to_avro(schema, refs)
834 }
835
836 fn weight(&mut self) -> i64 {
837 self.cursor.weight()
838 }
839
840 fn step_key(&mut self) {
841 self.cursor.step_key();
842 if !self.cursor.key_valid() && !self.second_pass {
843 self.cursor.rewind_keys();
844 self.second_pass = true;
845 }
846
847 if self.cursor.key_valid() {
848 self.advance_val();
849 }
850 }
851
852 fn step_val(&mut self) {
853 self.cursor.step_val();
854 self.advance_val();
855 }
856
857 fn rewind_keys(&mut self) {
858 self.cursor.rewind_keys();
859 self.second_pass = false;
860 if self.cursor.key_valid() {
861 self.advance_val();
862 }
863 }
864
865 fn rewind_vals(&mut self) {
866 self.cursor.rewind_vals();
867 self.advance_val();
868 }
869
870 fn seek_key_exact(&mut self, key: &DynData) -> bool {
871 self.cursor.seek_key_exact(key)
872 }
873
874 fn seek_key(&mut self, key: &DynData) {
875 self.cursor.seek_key(key);
876 }
877}
878
879pub trait CircuitCatalog: Send + Sync {
881 fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
883
884 fn output_iter(
885 &self,
886 ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
887
888 fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
890
891 fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
892
893 fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
895}
896
897#[doc(hidden)]
898pub struct InputCollectionHandle {
899 pub schema: Relation,
900 pub handle: Box<dyn DeCollectionHandle>,
901
902 pub node_id: NodeId,
908}
909
910impl InputCollectionHandle {
911 #[doc(hidden)]
912 pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
913 where
914 H: DeCollectionHandle + 'static,
915 {
916 Self {
917 schema,
918 handle: Box::new(handle),
919 node_id,
920 }
921 }
922}
923
924#[derive(Clone)]
926pub struct OutputCollectionHandles {
927 pub key_schema: Option<Relation>,
928 pub value_schema: Relation,
929
930 pub index_of: Option<SqlIdentifier>,
931
932 pub integrate_handle_is_indexed: bool,
934
935 pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
937
938 pub delta_handle: Box<dyn SerBatchReaderHandle>,
940
941 pub enable_count: Arc<AtomicUsize>,
945}