1use std::any::Any;
2use std::collections::HashSet;
3use std::fmt::{Debug, Formatter};
4use std::sync::atomic::AtomicUsize;
5use std::sync::{Arc, Mutex};
6
7use anyhow::Result as AnyResult;
8#[cfg(feature = "with-avro")]
9use apache_avro::{
10 Schema as AvroSchema,
11 schema::{Name as AvroName, NamesRef},
12 types::Value as AvroValue,
13};
14use arrow::record_batch::RecordBatch;
15use dbsp::circuit::NodeId;
16use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Erase, Factory};
17use dbsp::operator::StagedBuffers;
18use dyn_clone::DynClone;
19use feldera_sqllib::Variant;
20use feldera_types::format::csv::CsvParserConfig;
21use feldera_types::format::json::JsonFlavor;
22use feldera_types::program_schema::{Relation, SqlIdentifier};
23use feldera_types::serde_with_context::SqlSerdeConfig;
24use serde_arrow::ArrayBuilder;
25#[cfg(feature = "with-avro")]
26use std::collections::HashMap;
27
28use crate::errors::controller::ControllerError;
29use crate::format::InputBuffer;
30use crate::postprocess::PostprocessorRegistry;
31use crate::preprocess::PreprocessorRegistry;
32
33#[derive(Clone)]
36pub enum RecordFormat {
37 Json(JsonFlavor),
44 Csv(CsvParserConfig),
45 Parquet(SqlSerdeConfig),
46 #[cfg(feature = "with-avro")]
47 Avro,
48 Raw(String),
49}
50
51pub trait DeCollectionStream: Send + Sync + InputBuffer {
70 fn insert(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
80
81 fn delete(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
104
105 fn update(&mut self, data: &[u8], metadata: &Option<Variant>) -> AnyResult<()>;
117
118 fn reserve(&mut self, reservation: usize);
124
125 fn truncate(&mut self, len: usize);
127
128 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
135
136 fn fork(&self) -> Box<dyn DeCollectionStream>;
140}
141
142pub trait ArrowStream: InputBuffer + Send + Sync {
145 fn insert(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
151
152 fn delete(&mut self, data: &RecordBatch, metadata: &Option<Variant>) -> AnyResult<()>;
158
159 fn insert_with_polarities(
163 &mut self,
164 data: &RecordBatch,
165 polarities: &[bool],
166 metadata: &Option<Variant>,
167 ) -> AnyResult<()>;
168
169 fn fork(&self) -> Box<dyn ArrowStream>;
172
173 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
180}
181
182#[cfg(feature = "with-avro")]
183pub type AvroSchemaRefs = HashMap<AvroName, AvroSchema>;
184
185#[cfg(feature = "with-avro")]
188pub trait AvroStream: InputBuffer + Send + Sync {
189 fn insert(
199 &mut self,
200 data: &AvroValue,
201 schema: &AvroSchema,
202 refs: &AvroSchemaRefs,
203 n_bytes: usize,
204 metadata: &Option<Variant>,
205 ) -> AnyResult<()>;
206
207 fn delete(
208 &mut self,
209 data: &AvroValue,
210 schema: &AvroSchema,
211 refs: &AvroSchemaRefs,
212 n_bytes: usize,
213 metadata: &Option<Variant>,
214 ) -> AnyResult<()>;
215
216 fn fork(&self) -> Box<dyn AvroStream>;
219
220 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
227}
228
229pub trait DeCollectionHandle: Send + Sync {
232 fn configure_deserializer(
235 &self,
236 record_format: RecordFormat,
237 ) -> Result<Box<dyn DeCollectionStream>, ControllerError>;
238
239 fn configure_arrow_deserializer(
241 &self,
242 config: SqlSerdeConfig,
243 ) -> Result<Box<dyn ArrowStream>, ControllerError>;
244
245 #[cfg(feature = "with-avro")]
247 fn configure_avro_deserializer(&self) -> Result<Box<dyn AvroStream>, ControllerError>;
248
249 fn fork(&self) -> Box<dyn DeCollectionHandle>;
250}
251
252pub trait SerBatchReader: 'static + Send + Sync {
260 fn key_count(&self) -> usize;
262
263 fn len(&self) -> usize;
265
266 fn is_empty(&self) -> bool {
267 self.len() == 0
268 }
269
270 fn cursor<'a>(
273 &'a self,
274 record_format: RecordFormat,
275 ) -> Result<Box<dyn SerCursor + Send + 'a>, ControllerError>;
276
277 fn batches(&self) -> Vec<Arc<dyn SerBatch>>;
282
283 fn snapshot(&self) -> Arc<dyn SerBatchReader>;
284
285 fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
286
287 fn key_factory(&self) -> &'static dyn Factory<DynData>;
288
289 fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
290
291 fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
292}
293
294impl Debug for dyn SerBatchReader {
295 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
296 let mut cursor = self
297 .cursor(RecordFormat::Json(Default::default()))
298 .map_err(|_| std::fmt::Error)?;
299 let mut key = Vec::new();
300 let mut val = Vec::new();
301 while cursor.key_valid() {
302 cursor
303 .serialize_key(&mut key)
304 .map_err(|_| std::fmt::Error)?;
305 write!(f, "{}=>{{", String::from_utf8_lossy(&key))?;
306
307 while cursor.val_valid() {
308 cursor
309 .serialize_val(&mut val)
310 .map_err(|_| std::fmt::Error)?;
311 write!(
312 f,
313 "{}=>{}, ",
314 String::from_utf8_lossy(&val),
315 cursor.weight()
316 )?;
317
318 val.clear();
319 cursor.step_val();
320 }
321
322 write!(f, "}}, ")?;
323 key.clear();
324 cursor.step_key();
325 }
326
327 Ok(())
328 }
329}
330
331impl Debug for dyn SerBatch {
332 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
333 self.as_batch_reader().fmt(f)
334 }
335}
336
337pub trait SerBatch: SerBatchReader {
339 fn as_any(self: Arc<Self>) -> Arc<dyn Any + Sync + Send>;
342
343 fn merge(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatch>;
345
346 fn concat(self: Arc<Self>, other: Vec<Arc<dyn SerBatch>>) -> Arc<dyn SerBatchReader>;
349
350 fn as_batch_reader(&self) -> &dyn SerBatchReader;
351
352 fn arc_as_batch_reader(self: Arc<Self>) -> Arc<dyn SerBatchReader>;
353
354 fn into_trace(self: Arc<Self>) -> Box<dyn SerTrace>;
356}
357
358pub trait SerTrace: SerBatchReader {
360 fn insert(&mut self, batch: Arc<dyn SerBatch>);
362
363 fn insert_without_blocking(&mut self, batch: Arc<dyn SerBatch>) -> bool;
364
365 fn backpressure_wait(&self);
366
367 fn as_batch_reader(&self) -> &dyn SerBatchReader;
368}
369
370#[doc(hidden)]
371pub struct SplitCursorBuilder {
372 batch: Arc<dyn SerBatchReader>,
373 start_key: Box<DynData>,
374 end_key: Option<Box<DynData>>,
375 format: RecordFormat,
376}
377
378impl SplitCursorBuilder {
379 pub fn from_bounds(
391 batch: Arc<dyn SerBatchReader>,
392 bounds: &DynVec<DynData>,
393 index: usize,
394 format: RecordFormat,
395 ) -> Option<Self> {
396 let start_bound = if index == 0 {
397 None
398 } else if index <= bounds.len() {
399 Some(bounds.index(index - 1).as_data())
400 } else {
401 None
402 };
403
404 let end_bound = if index < bounds.len() {
405 Some(bounds.index(index).as_data())
406 } else {
407 None
408 };
409
410 let start_key = {
411 let mut cursor = batch.cursor(format.clone()).unwrap();
412
413 if let Some(start_bound) = start_bound {
415 cursor.seek_key_exact(start_bound);
416 }
417
418 cursor.get_key().map(|s| {
420 let mut key = batch.key_factory().default_box();
421 s.clone_to(key.as_mut());
422 key
423 })
424 }?;
425
426 let end_key = end_bound.map(|e| {
427 let mut key = batch.key_factory().default_box();
428 e.clone_to(key.as_mut());
429 key
430 });
431
432 Some(SplitCursorBuilder {
433 batch,
434 start_key,
435 end_key,
436 format,
437 })
438 }
439
440 pub fn build<'a>(&'a self) -> SplitCursor<'a> {
441 let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
442
443 cursor.seek_key(self.start_key.as_data());
445
446 SplitCursor {
447 cursor,
448 start_key: self.start_key.clone(),
449 end_key: self.end_key.clone(),
450 }
451 }
452}
453
454#[doc(hidden)]
455pub struct SplitCursor<'a> {
456 cursor: Box<dyn SerCursor + 'a>,
457 start_key: Box<DynData>,
458 end_key: Option<Box<DynData>>,
459}
460
461impl SplitCursor<'_> {
462 fn finished(&self) -> bool {
463 if let Some(ref end_key) = self.end_key
464 && let Some(current_key) = self.cursor.get_key()
465 {
466 return current_key >= end_key.as_data();
467 }
468
469 false
470 }
471}
472
473impl SerCursor for SplitCursor<'_> {
474 fn key_valid(&self) -> bool {
475 self.cursor.key_valid() && !self.finished()
476 }
477
478 fn val_valid(&self) -> bool {
479 self.cursor.val_valid()
480 }
481
482 fn key(&self) -> &DynData {
483 self.cursor.key()
484 }
485
486 fn val(&self) -> &DynData {
487 self.cursor.val()
488 }
489
490 fn get_key(&self) -> Option<&DynData> {
491 if !self.key_valid() {
492 return None;
493 }
494
495 self.cursor.get_key()
496 }
497
498 fn get_val(&self) -> Option<&DynData> {
499 self.cursor.get_val()
500 }
501
502 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
503 self.cursor.serialize_key(dst)
504 }
505
506 fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
507 self.cursor.key_to_json()
508 }
509
510 fn serialize_key_fields(
511 &mut self,
512 fields: &HashSet<String>,
513 dst: &mut Vec<u8>,
514 ) -> AnyResult<()> {
515 self.cursor.serialize_key_fields(fields, dst)
516 }
517
518 fn serialize_val_fields(
519 &mut self,
520 fields: &HashSet<String>,
521 dst: &mut Vec<u8>,
522 ) -> AnyResult<()> {
523 self.cursor.serialize_val_fields(fields, dst)
524 }
525
526 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
527 self.cursor.serialize_key_to_arrow(dst)
528 }
529
530 fn serialize_key_to_arrow_with_metadata(
531 &mut self,
532 metadata: &dyn erased_serde::Serialize,
533 dst: &mut ArrayBuilder,
534 ) -> AnyResult<()> {
535 self.cursor
536 .serialize_key_to_arrow_with_metadata(metadata, dst)
537 }
538
539 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
540 self.cursor.serialize_val_to_arrow(dst)
541 }
542
543 fn serialize_val_to_arrow_with_metadata(
544 &mut self,
545 metadata: &dyn erased_serde::Serialize,
546 dst: &mut ArrayBuilder,
547 ) -> AnyResult<()> {
548 self.cursor
549 .serialize_val_to_arrow_with_metadata(metadata, dst)
550 }
551
552 #[cfg(feature = "with-avro")]
553 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
554 self.cursor.key_to_avro(schema, refs)
555 }
556
557 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
558 self.cursor.serialize_key_weight(dst)
559 }
560
561 fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
562 self.cursor.serialize_val_weight(dst)
563 }
564
565 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
566 self.cursor.serialize_val(dst)
567 }
568
569 fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
570 self.cursor.val_to_json()
571 }
572
573 #[cfg(feature = "with-avro")]
574 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
575 self.cursor.val_to_avro(schema, refs)
576 }
577
578 fn weight(&mut self) -> i64 {
579 self.cursor.weight()
580 }
581
582 fn step_key(&mut self) {
583 self.cursor.step_key();
584 }
585
586 fn step_val(&mut self) {
587 self.cursor.step_val();
588 }
589
590 fn rewind_keys(&mut self) {
591 self.cursor.rewind_keys();
592 self.cursor.seek_key(self.start_key.as_data());
593 }
594
595 fn rewind_vals(&mut self) {
596 self.cursor.rewind_vals();
597 }
598
599 fn seek_key_exact(&mut self, key: &DynData) -> bool {
600 if let Some(ref end_key) = self.end_key
601 && key >= end_key.as_data()
602 {
603 return false;
604 }
605
606 self.cursor.seek_key_exact(key)
607 }
608
609 fn seek_key(&mut self, key: &DynData) {
610 self.cursor.seek_key(key);
611 }
612}
613
614pub struct SerCursorFlattened<'a> {
621 val_valid: bool,
622 cursor: Box<dyn SerCursor + 'a>,
623}
624
625impl<'a> SerCursorFlattened<'a> {
626 pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
627 Self {
628 cursor,
629 val_valid: true,
630 }
631 }
632}
633
634impl<'a> SerCursor for SerCursorFlattened<'a> {
635 fn key_valid(&self) -> bool {
636 self.cursor.key_valid() && self.cursor.val_valid()
637 }
638
639 fn val_valid(&self) -> bool {
640 self.val_valid
641 }
642
643 fn key(&self) -> &DynData {
644 self.cursor.val()
645 }
646
647 fn get_key(&self) -> Option<&DynData> {
648 self.cursor.get_val()
649 }
650
651 fn val(&self) -> &DynData {
652 ().erase()
653 }
654
655 fn get_val(&self) -> Option<&DynData> {
656 if self.val_valid {
657 Some(().erase())
658 } else {
659 None
660 }
661 }
662
663 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
664 self.cursor.serialize_val(dst)
665 }
666
667 fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
668 self.cursor.val_to_json()
669 }
670
671 fn serialize_key_fields(
672 &mut self,
673 fields: &HashSet<String>,
674 dst: &mut Vec<u8>,
675 ) -> AnyResult<()> {
676 self.cursor.serialize_val_fields(fields, dst)
677 }
678
679 fn serialize_val_fields(
680 &mut self,
681 _fields: &HashSet<String>,
682 _dst: &mut Vec<u8>,
683 ) -> AnyResult<()> {
684 panic!("serialize_val_fields is not supported for flattened cursors");
685 }
686
687 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
688 self.cursor.serialize_val_to_arrow(dst)
689 }
690
691 fn serialize_key_to_arrow_with_metadata(
692 &mut self,
693 metadata: &dyn erased_serde::Serialize,
694 dst: &mut ArrayBuilder,
695 ) -> AnyResult<()> {
696 self.cursor
697 .serialize_val_to_arrow_with_metadata(metadata, dst)
698 }
699
700 fn serialize_val_to_arrow(&mut self, _dst: &mut ArrayBuilder) -> AnyResult<()> {
701 panic!("serialize_val_to_arrow is not supported for flattened cursors");
702 }
703
704 fn serialize_val_to_arrow_with_metadata(
705 &mut self,
706 _metadata: &dyn erased_serde::Serialize,
707 _dst: &mut ArrayBuilder,
708 ) -> AnyResult<()> {
709 panic!("serialize_val_to_arrow_with_metadata is not supported for flattened cursors");
710 }
711
712 #[cfg(feature = "with-avro")]
713 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
714 self.cursor.val_to_avro(schema, refs)
715 }
716
717 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
718 self.cursor.serialize_val_weight(dst)
719 }
720
721 fn serialize_val_weight(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
722 panic!("serialize_val_weight is not supported for flattened cursors");
723 }
724
725 fn serialize_val(&mut self, _dst: &mut Vec<u8>) -> AnyResult<()> {
726 panic!("serialize_val is not supported for flattened cursors");
727 }
728
729 fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
730 panic!("val_to_json is not supported for flattened cursors");
731 }
732
733 #[cfg(feature = "with-avro")]
734 fn val_to_avro(&mut self, _schema: &AvroSchema, _refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
735 panic!("val_to_avro is not supported for flattened cursors");
736 }
737
738 fn weight(&mut self) -> i64 {
739 self.cursor.weight()
740 }
741
742 fn step_key(&mut self) {
743 debug_assert!(self.cursor.key_valid() && self.cursor.val_valid());
744 self.cursor.step_val();
745 while !self.cursor.val_valid() && self.cursor.key_valid() {
746 self.cursor.step_key();
747 }
748 self.val_valid = true;
749 }
750
751 fn step_val(&mut self) {
752 self.val_valid = false;
753 }
754
755 fn rewind_keys(&mut self) {
756 self.cursor.rewind_keys();
757 self.val_valid = true;
758 }
759
760 fn rewind_vals(&mut self) {
761 self.val_valid = true;
762 }
763
764 fn seek_key_exact(&mut self, _key: &DynData) -> bool {
765 panic!("seek_key_exact is not supported for flattened cursors");
766 }
767
768 fn seek_key(&mut self, _key: &DynData) {
769 panic!("seek_key is not supported for flattened cursors");
770 }
771}
772
773pub trait SerCursor: Send {
778 fn key_valid(&self) -> bool;
782
783 fn val_valid(&self) -> bool;
788
789 fn key(&self) -> &DynData;
790
791 fn get_key(&self) -> Option<&DynData>;
792
793 fn val(&self) -> &DynData;
794
795 fn get_val(&self) -> Option<&DynData>;
796
797 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
799
800 fn key_to_json(&mut self) -> AnyResult<serde_json::Value>;
803
804 fn serialize_key_fields(
806 &mut self,
807 fields: &HashSet<String>,
808 dst: &mut Vec<u8>,
809 ) -> AnyResult<()>;
810
811 fn serialize_val_fields(
812 &mut self,
813 fields: &HashSet<String>,
814 dst: &mut Vec<u8>,
815 ) -> AnyResult<()>;
816
817 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
819
820 fn serialize_key_to_arrow_with_metadata(
823 &mut self,
824 metadata: &dyn erased_serde::Serialize,
825 dst: &mut ArrayBuilder,
826 ) -> AnyResult<()>;
827
828 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()>;
830
831 fn serialize_val_to_arrow_with_metadata(
834 &mut self,
835 metadata: &dyn erased_serde::Serialize,
836 dst: &mut ArrayBuilder,
837 ) -> AnyResult<()>;
838
839 #[cfg(feature = "with-avro")]
840 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
842
843 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
848
849 fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
850
851 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()>;
853
854 fn val_to_json(&mut self) -> AnyResult<serde_json::Value>;
857
858 #[cfg(feature = "with-avro")]
859 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue>;
861
862 fn weight(&mut self) -> i64;
864
865 fn step_key(&mut self);
867
868 fn step_val(&mut self);
870
871 fn rewind_keys(&mut self);
873
874 fn rewind_vals(&mut self);
876
877 fn count_keys(&mut self) -> usize {
878 let mut count = 0;
879
880 while self.key_valid() {
881 count += 1;
882 self.step_key()
883 }
884
885 count
886 }
887
888 fn seek_key_exact(&mut self, key: &DynData) -> bool;
889
890 fn seek_key(&mut self, key: &DynData);
891}
892
893pub trait SerBatchReaderHandle: Send + Sync + DynClone {
900 fn num_nonempty_mailboxes(&self) -> usize;
902
903 fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
906
907 fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
910
911 fn concat(&self) -> Arc<dyn SerBatchReader>;
913}
914
915dyn_clone::clone_trait_object!(SerBatchReaderHandle);
916
917pub struct CursorWithPolarity<'a> {
927 cursor: Box<dyn SerCursor + 'a>,
928 second_pass: bool,
929}
930
931impl<'a> CursorWithPolarity<'a> {
932 pub fn new(cursor: Box<dyn SerCursor + 'a>) -> Self {
933 let mut result = Self {
934 cursor,
935 second_pass: false,
936 };
937
938 if result.key_valid() {
939 result.advance_val();
940 }
941
942 result
943 }
944
945 fn advance_val(&mut self) {
946 while self.cursor.val_valid()
947 && ((!self.second_pass && self.cursor.weight() >= 0)
948 || (self.second_pass && self.cursor.weight() <= 0))
949 {
950 self.step_val();
951 }
952 }
953}
954
955impl SerCursor for CursorWithPolarity<'_> {
956 fn key_valid(&self) -> bool {
957 self.cursor.key_valid()
958 }
959
960 fn val_valid(&self) -> bool {
961 self.cursor.val_valid()
962 }
963
964 fn key(&self) -> &DynData {
965 self.cursor.key()
966 }
967
968 fn get_key(&self) -> Option<&DynData> {
969 self.cursor.get_key()
970 }
971
972 fn val(&self) -> &DynData {
973 self.cursor.val()
974 }
975
976 fn get_val(&self) -> Option<&DynData> {
977 self.cursor.get_val()
978 }
979
980 fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
981 self.cursor.serialize_key(dst)
982 }
983
984 fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
985 self.cursor.key_to_json()
986 }
987
988 fn serialize_key_fields(
989 &mut self,
990 fields: &HashSet<String>,
991 dst: &mut Vec<u8>,
992 ) -> AnyResult<()> {
993 self.cursor.serialize_key_fields(fields, dst)
994 }
995
996 fn serialize_val_fields(
997 &mut self,
998 fields: &HashSet<String>,
999 dst: &mut Vec<u8>,
1000 ) -> AnyResult<()> {
1001 self.cursor.serialize_val_fields(fields, dst)
1002 }
1003
1004 #[cfg(feature = "with-avro")]
1005 fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
1006 self.cursor.key_to_avro(schema, refs)
1007 }
1008
1009 fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1010 self.cursor.serialize_key_weight(dst)
1011 }
1012
1013 fn serialize_val_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1014 self.cursor.serialize_val_weight(dst)
1015 }
1016
1017 fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
1018 self.cursor.serialize_key_to_arrow(dst)
1019 }
1020
1021 fn serialize_key_to_arrow_with_metadata(
1022 &mut self,
1023 metadata: &dyn erased_serde::Serialize,
1024 dst: &mut ArrayBuilder,
1025 ) -> AnyResult<()> {
1026 self.cursor
1027 .serialize_key_to_arrow_with_metadata(metadata, dst)
1028 }
1029
1030 fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
1031 self.cursor.serialize_val_to_arrow(dst)
1032 }
1033
1034 fn serialize_val_to_arrow_with_metadata(
1035 &mut self,
1036 metadata: &dyn erased_serde::Serialize,
1037 dst: &mut ArrayBuilder,
1038 ) -> AnyResult<()> {
1039 self.cursor
1040 .serialize_val_to_arrow_with_metadata(metadata, dst)
1041 }
1042
1043 fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
1044 self.cursor.serialize_val(dst)
1045 }
1046
1047 fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
1048 self.cursor.val_to_json()
1049 }
1050
1051 #[cfg(feature = "with-avro")]
1052 fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
1053 self.cursor.val_to_avro(schema, refs)
1054 }
1055
1056 fn weight(&mut self) -> i64 {
1057 self.cursor.weight()
1058 }
1059
1060 fn step_key(&mut self) {
1061 self.cursor.step_key();
1062 if !self.cursor.key_valid() && !self.second_pass {
1063 self.cursor.rewind_keys();
1064 self.second_pass = true;
1065 }
1066
1067 if self.cursor.key_valid() {
1068 self.advance_val();
1069 }
1070 }
1071
1072 fn step_val(&mut self) {
1073 self.cursor.step_val();
1074 self.advance_val();
1075 }
1076
1077 fn rewind_keys(&mut self) {
1078 self.cursor.rewind_keys();
1079 self.second_pass = false;
1080 if self.cursor.key_valid() {
1081 self.advance_val();
1082 }
1083 }
1084
1085 fn rewind_vals(&mut self) {
1086 self.cursor.rewind_vals();
1087 self.advance_val();
1088 }
1089
1090 fn seek_key_exact(&mut self, key: &DynData) -> bool {
1091 self.cursor.seek_key_exact(key)
1092 }
1093
1094 fn seek_key(&mut self, key: &DynData) {
1095 self.cursor.seek_key(key);
1096 }
1097}
1098
1099pub trait CircuitCatalog: Send + Sync {
1101 fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
1103
1104 fn output_iter(
1105 &self,
1106 ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
1107
1108 fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
1110
1111 fn index_handles(
1113 &self,
1114 endpoint_name: &str,
1115 stream: &SqlIdentifier,
1116 index: &SqlIdentifier,
1117 ) -> Result<&OutputCollectionHandles, ControllerError>;
1118
1119 fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles>;
1120
1121 fn preprocessor_registry(&self) -> Arc<Mutex<PreprocessorRegistry>>;
1123
1124 fn postprocessor_registry(&self) -> Arc<Mutex<PostprocessorRegistry>>;
1126}
1127
1128#[doc(hidden)]
1129pub struct InputCollectionHandle {
1130 pub schema: Relation,
1131 pub handle: Box<dyn DeCollectionHandle>,
1132
1133 pub node_id: NodeId,
1139}
1140
1141impl InputCollectionHandle {
1142 #[doc(hidden)]
1143 pub fn new<H>(schema: Relation, handle: H, node_id: NodeId) -> Self
1144 where
1145 H: DeCollectionHandle + 'static,
1146 {
1147 Self {
1148 schema,
1149 handle: Box::new(handle),
1150 node_id,
1151 }
1152 }
1153}
1154
1155#[derive(Clone)]
1157pub struct OutputCollectionHandles {
1158 pub key_schema: Option<Relation>,
1162
1163 pub value_schema: Relation,
1168
1169 pub index_of: Option<SqlIdentifier>,
1171
1172 pub alias_as_index: Option<SqlIdentifier>,
1174
1175 pub integrate_handle: Option<Arc<dyn SerBatchReaderHandle>>,
1177
1178 pub delta_handle: Box<dyn SerBatchReaderHandle>,
1180
1181 pub enable_count: Arc<AtomicUsize>,
1185}
1186
1187impl OutputCollectionHandles {
1188 pub fn is_indexed(&self) -> bool {
1190 self.key_schema.is_some()
1191 }
1192}