1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::max;
4use std::fmt::{Display, Error as FmtError, Formatter};
5use std::fs::File;
6use std::hash::Hasher;
7use std::io::{Error as IoError, Read};
8use std::ops::{Add, AddAssign, Range};
9use std::sync::Arc;
10
11use actix_web::HttpRequest;
12use anyhow::Result as AnyResult;
13use dbsp::operator::input::StagedBuffers;
14use erased_serde::Serialize as ErasedSerialize;
15use feldera_types::config::ConnectorConfig;
16use feldera_types::program_schema::Relation;
17use feldera_types::serde_with_context::FieldParseError;
18use serde::Serialize;
19use serde::de::StdError;
20
21use crate::ConnectorMetadata;
22use crate::catalog::{InputCollectionHandle, SerBatchReader};
23use crate::errors::controller::ControllerError;
24use crate::preprocess::Preprocessor;
25use crate::transport::Step;
26
/// Factory for parsers of one input data format.
///
/// An implementation names the format, can derive a parser configuration from
/// an HTTP request, and can instantiate a [`Parser`] bound to an input
/// collection.
pub trait InputFormat: Send + Sync {
    /// Returns the name of this format.
    fn name(&self) -> Cow<'static, str>;

    /// Builds a parser configuration for `endpoint_name` from an HTTP request.
    ///
    /// On success the configuration is returned as a type-erased serializable
    /// value; on failure a [`ControllerError`] is returned.
    // NOTE(review): which parts of the request (query string, headers, ...) are
    // consulted is implementation-specific and not visible here.
    fn config_from_http_request(
        &self,
        endpoint_name: &str,
        request: &HttpRequest,
    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;

    /// Creates a new parser for this format.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the parser is created for.
    /// * `input_stream` - handle of the input collection associated with the
    ///   parser.
    /// * `config` - format-specific configuration as a JSON value.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] if the parser cannot be created.
    fn new_parser(
        &self,
        endpoint_name: &str,
        input_stream: &InputCollectionHandle,
        config: &serde_json::Value,
    ) -> Result<Box<dyn Parser>, ControllerError>;
}
67
68pub trait InputBuffer: Any + Send {
94 fn flush(&mut self);
97
98 fn len(&self) -> BufferSize;
100
101 fn hash(&self, hasher: &mut dyn Hasher);
109
110 fn is_empty(&self) -> bool {
111 self.len().is_empty()
112 }
113
114 fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;
134
135 fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
136 self.take_some(usize::MAX)
137 }
138}
139
/// Size of an input buffer, measured both in records and in bytes.
///
/// Sizes can be combined with `+` and `+=`, which add the two counters
/// component-wise.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// Number of records.
    pub records: usize,

    /// Number of bytes.
    pub bytes: usize,
}

impl BufferSize {
    /// Returns a size with zero records and zero bytes.
    pub fn empty() -> Self {
        BufferSize {
            records: 0,
            bytes: 0,
        }
    }

    /// Returns `true` only if both the record count and the byte count are zero.
    pub fn is_empty(&self) -> bool {
        matches!(
            *self,
            BufferSize {
                records: 0,
                bytes: 0
            }
        )
    }
}

impl Add for BufferSize {
    type Output = Self;

    /// Adds the record and byte counts of `rhs` to those of `self`.
    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

impl AddAssign for BufferSize {
    /// Adds the record and byte counts of `rhs` to `self` in place.
    fn add_assign(&mut self, rhs: Self) {
        let BufferSize { records, bytes } = rhs;
        self.records += records;
        self.bytes += bytes;
    }
}
182
183impl InputBuffer for Option<Box<dyn InputBuffer>> {
184 fn len(&self) -> BufferSize {
185 self.as_ref()
186 .map_or(BufferSize::empty(), |buffer| buffer.len())
187 }
188
189 fn hash(&self, hasher: &mut dyn Hasher) {
190 if let Some(buffer) = self {
191 buffer.hash(hasher)
192 }
193 }
194
195 fn flush(&mut self) {
196 if let Some(buffer) = self.as_mut() {
197 buffer.flush()
198 }
199 }
200
201 fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
202 self.as_mut().and_then(|buffer| buffer.take_some(n))
203 }
204}
205
206impl InputBuffer for Box<dyn InputBuffer> {
207 fn len(&self) -> BufferSize {
208 self.as_ref().len()
209 }
210
211 fn hash(&self, hasher: &mut dyn Hasher) {
212 self.as_ref().hash(hasher)
213 }
214
215 fn flush(&mut self) {
216 self.as_mut().flush()
217 }
218
219 fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
220 self.as_mut().take_some(n)
221 }
222}
223
224impl InputBuffer for Vec<Box<dyn InputBuffer>> {
225 fn flush(&mut self) {
226 for v in self.iter_mut() {
227 v.flush();
228 }
229 }
230
231 fn len(&self) -> BufferSize {
232 let mut size = BufferSize::empty();
233 for v in self.iter() {
234 size += v.len();
235 }
236 size
237 }
238
239 fn hash(&self, hasher: &mut dyn Hasher) {
240 for v in self.iter() {
241 v.hash(hasher);
242 }
243 }
244
245 fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
246 let mut result = Vec::new();
247 let mut remaining = n;
248 let mut index = 0;
250 for v in self.iter_mut() {
251 if remaining == 0 {
252 break;
253 }
254 let buf = v.take_some(remaining);
255 if let Some(ib) = buf {
256 let len = ib.len().records;
257 if remaining >= len {
258 index += 1;
260 }
261 remaining = remaining.saturating_sub(len);
262 result.push(ib);
263 }
264 }
265 self.drain(0..index);
266 if result.is_empty() {
267 None
268 } else {
269 Some(Box::new(result))
270 }
271 }
272}
273
274pub fn flatten_nested<T>(buffers: Vec<Box<dyn InputBuffer>>) -> Vec<Box<T>>
277where
278 T: Any,
279{
280 fn inner<T>(input: Vec<Box<dyn InputBuffer>>, output: &mut Vec<Box<T>>)
281 where
282 T: Any,
283 {
284 for buffer in input {
285 let any = buffer as Box<dyn Any>;
286 match any.downcast::<Vec<Box<dyn InputBuffer>>>() {
287 Ok(vec) => inner(*vec, output),
288 Err(any) => output.push(any.downcast().unwrap()),
289 }
290 }
291 }
292
293 let mut output = Vec::new();
294 inner(buffers, &mut output);
295 output
296}
297
298pub struct StagedInputBuffer {
314 buffer: Box<dyn StagedBuffers>,
315 size: BufferSize,
316}
317
318impl StagedInputBuffer {
319 pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
320 Self { buffer, size }
321 }
322}
323
324impl InputBuffer for StagedInputBuffer {
325 fn flush(&mut self) {
326 self.buffer.flush()
327 }
328
329 fn len(&self) -> BufferSize {
330 self.size
331 }
332
333 fn hash(&self, _hasher: &mut dyn Hasher) {
334 unimplemented!()
335 }
336
337 fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
338 unimplemented!()
339 }
340}
341
/// Parser that turns raw bytes into [`InputBuffer`]s.
pub trait Parser: Send + Sync {
    /// Parses `data`.
    ///
    /// Returns the parsed records as an optional input buffer (`None` when
    /// nothing was produced) together with the errors encountered while
    /// parsing.
    // NOTE(review): how `metadata` is attached to the parsed records is
    // implementation-specific and not visible here.
    fn parse(
        &mut self,
        data: &[u8],
        metadata: Option<ConnectorMetadata>,
    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);

    /// Converts previously parsed `buffers` into staged buffers.
    // NOTE(review): presumably `buffers` must have been produced by this
    // parser (or a fork of it) — confirm against implementations.
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;

    /// Returns a [`Splitter`] that finds chunk boundaries in the byte stream
    /// consumed by this parser.
    fn splitter(&self) -> Box<dyn Splitter>;

    /// Creates a new parser derived from this one.
    // NOTE(review): presumably the fork shares configuration but no parsing
    // state with `self` — confirm against implementations.
    fn fork(&self) -> Box<dyn Parser>;
}
371
372pub struct StreamingPreprocessedParser {
374 preprocessor: Box<dyn Preprocessor>,
375 stream_splitter: StreamSplitter,
376 parser: Box<dyn Parser>,
377}
378
379impl StreamingPreprocessedParser {
380 pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
381 Self {
382 preprocessor,
383 stream_splitter: StreamSplitter::new(parser.splitter()),
384 parser,
385 }
386 }
387}
388
389impl Parser for StreamingPreprocessedParser {
390 fn parse(
391 &mut self,
392 data: &[u8],
393 metadata: Option<ConnectorMetadata>,
394 ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
395 let (pre_data, mut pre_errors) = self.preprocessor.process(data);
396 self.stream_splitter.append(&pre_data);
397 let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
398 while let Some(chunk) = self.stream_splitter.next(true) {
399 let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
400 pre_errors.append(&mut parse_errors);
401 if let Some(data) = parsed_data {
402 parsed.push(data);
403 }
404 }
405 if parsed.is_empty() {
406 (None, pre_errors)
407 } else {
408 (Some(Box::new(parsed)), pre_errors)
409 }
410 }
411
412 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
413 self.parser.stage(buffers)
414 }
415
416 fn splitter(&self) -> Box<dyn Splitter> {
417 let pre_splitter = self.preprocessor.splitter();
418 if let Some(splitter) = pre_splitter {
419 return splitter;
420 }
421 self.parser.splitter()
422 }
423
424 fn fork(&self) -> Box<dyn Parser> {
425 Box::new(StreamingPreprocessedParser::new(
426 self.preprocessor.fork(),
427 self.parser.fork(),
428 ))
429 }
430}
431
432pub struct MessageOrientedPreprocessedParser {
434 preprocessor: Box<dyn Preprocessor>,
435 parser: Box<dyn Parser>,
436}
437
438impl MessageOrientedPreprocessedParser {
439 pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
440 Self {
441 preprocessor,
442 parser,
443 }
444 }
445}
446
447impl Parser for MessageOrientedPreprocessedParser {
448 fn parse(
449 &mut self,
450 data: &[u8],
451 metadata: Option<ConnectorMetadata>,
452 ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
453 let (pre_data, mut pre_errors) = self.preprocessor.process(data);
454 let mut parser_splitter = self.parser.splitter();
455 let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
456 let mut remaining = pre_data.as_slice();
457 while !remaining.is_empty() {
459 let chunk;
460 let split_offset = parser_splitter.input(remaining).unwrap_or(remaining.len());
461 (chunk, remaining) = remaining.split_at(split_offset);
462 let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
463 pre_errors.append(&mut parse_errors);
464 if let Some(data) = parsed_data {
465 parsed.push(data);
466 }
467 }
468 if parsed.is_empty() {
469 (None, pre_errors)
470 } else {
471 (Some(Box::new(parsed)), pre_errors)
472 }
473 }
474
475 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
476 self.parser.stage(buffers)
477 }
478
479 fn splitter(&self) -> Box<dyn Splitter> {
480 let pre_splitter = self.preprocessor.splitter();
481 if let Some(splitter) = pre_splitter {
482 return splitter;
483 }
484 self.parser.splitter()
485 }
486
487 fn fork(&self) -> Box<dyn Parser> {
488 Box::new(MessageOrientedPreprocessedParser::new(
489 self.preprocessor.fork(),
490 self.parser.fork(),
491 ))
492 }
493}
494
/// Finds chunk boundaries in a stream of bytes presented piece by piece.
pub trait Splitter: Send + Sync {
    /// Presents the next piece of the stream to the splitter.
    ///
    /// Returns `Some(n)` when a boundary lies `n` bytes into `data`, and
    /// `None` when no boundary was found in `data`. Callers in this file
    /// treat the first `n` bytes of `data`, together with any earlier data
    /// for which `None` was returned, as one chunk.
    fn input(&mut self, data: &[u8]) -> Option<usize>;

    /// Resets the splitter.
    // NOTE(review): presumably discards any state carried over from earlier
    // `input` calls; `StreamSplitter::seek` calls this when repositioning.
    fn clear(&mut self);
}
515
516pub struct StreamSplitter {
523 buffer: Vec<u8>,
524 start: u64,
525 fragment: Range<usize>,
526 fed: usize,
527 splitter: Box<dyn Splitter>,
528}
529
530impl StreamSplitter {
531 pub fn new(splitter: Box<dyn Splitter>) -> Self {
533 Self {
534 buffer: Vec::new(),
535 start: 0,
536 fragment: 0..0,
537 fed: 0,
538 splitter,
539 }
540 }
541
542 pub fn next(&mut self, eoi: bool) -> Option<&[u8]> {
546 match self
547 .splitter
548 .input(&self.buffer[self.fed..self.fragment.end])
549 {
550 Some(n) => {
551 let chunk = &self.buffer[self.fragment.start..self.fed + n];
552 self.fed += n;
553 self.fragment.start = self.fed;
554 Some(chunk)
555 }
556 None => {
557 self.fed = self.fragment.end;
558 if eoi && !self.fragment.is_empty() {
559 let chunk = &self.buffer[self.fragment.clone()];
560 self.fragment.start = self.fragment.end;
561 Some(chunk)
562 } else {
563 None
564 }
565 }
566 }
567 }
568
569 pub fn append(&mut self, data: &[u8]) {
571 let final_len = self.fragment.len() + data.len();
572 if final_len > self.buffer.len() {
573 self.buffer.reserve(final_len - self.buffer.len());
574 }
575 self.buffer.copy_within(self.fragment.clone(), 0);
576 self.buffer.resize(self.fragment.len(), 0);
577 self.buffer.extend(data);
578 self.fed -= self.fragment.start;
579 self.start += self.fragment.start as u64;
580 self.fragment = 0..self.buffer.len();
581 }
582
583 pub fn read(
587 &mut self,
588 file: &mut File,
589 buffer_size: usize,
590 limit: usize,
591 ) -> Result<usize, IoError> {
592 if self.fragment.start != 0 {
594 self.buffer.copy_within(self.fragment.clone(), 0);
595 self.fed -= self.fragment.start;
596 self.start += self.fragment.start as u64;
597 self.fragment = 0..self.fragment.len();
598 }
599
600 if self.fragment.len() == self.buffer.len() {
602 self.buffer
603 .resize(max(buffer_size, self.buffer.capacity() * 2), 0);
604 }
605
606 let mut space = &mut self.buffer[self.fragment.len()..];
608 if space.len() > limit {
609 space = &mut space[..limit];
610 }
611 let result = file.read(space);
612 if let Ok(n) = result {
613 self.fragment.end += n;
614 }
615 result
616 }
617
618 pub fn position(&self) -> u64 {
621 self.start + self.fragment.start as u64
622 }
623
624 pub fn seek(&mut self, offset: u64) {
627 self.start = offset;
628 self.fragment = 0..0;
629 self.fed = 0;
630 self.splitter.clear();
631 }
632
633 pub fn reset(&mut self) {
635 self.seek(0);
636 }
637}
638
/// Factory for encoders of one output data format.
///
/// The output-side counterpart of [`InputFormat`]: an implementation names the
/// format, can derive an encoder configuration from an HTTP request, and can
/// instantiate an [`Encoder`].
pub trait OutputFormat: Send + Sync {
    /// Returns the name of this format.
    fn name(&self) -> Cow<'static, str>;

    /// Builds an encoder configuration for `endpoint_name` from an HTTP
    /// request.
    ///
    /// On success the configuration is returned as a type-erased serializable
    /// value; on failure a [`ControllerError`] is returned.
    fn config_from_http_request(
        &self,
        endpoint_name: &str,
        request: &HttpRequest,
    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;

    /// Creates a new encoder for this format.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the encoder is created for.
    /// * `config` - connector configuration.
    /// * `key_schema` - optional schema of the key relation.
    /// * `value_schema` - schema of the value relation.
    /// * `consumer` - consumer that the new encoder is given ownership of.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] if the encoder cannot be created.
    fn new_encoder(
        &self,
        endpoint_name: &str,
        config: &ConnectorConfig,
        key_schema: &Option<Relation>,
        value_schema: &Relation,
        consumer: Box<dyn OutputConsumer>,
    ) -> Result<Box<dyn Encoder>, ControllerError>;
}
669
/// Encodes batches of output records.
pub trait Encoder: Send {
    /// Returns a mutable reference to the consumer associated with this
    /// encoder.
    fn consumer(&mut self) -> &mut dyn OutputConsumer;

    /// Encodes `batch`.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch cannot be encoded.
    // NOTE(review): presumably the encoded data is pushed to the consumer
    // returned by `consumer()` — confirm against implementations.
    fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
}
678
/// Receiver of encoded output data.
// NOTE(review): the calling protocol (presumably `batch_start`, then any number
// of `push_buffer`/`push_key` calls, then `batch_end`) is inferred from the
// method names — confirm against callers.
#[doc(hidden)]
pub trait OutputConsumer: Send {
    /// Returns the maximum buffer size, in bytes, for this consumer.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Signals the start of a batch for the given `step`.
    fn batch_start(&mut self, step: Step);

    /// Pushes an encoded `buffer` that contains `num_records` records.
    fn push_buffer(&mut self, buffer: &[u8], num_records: usize);

    /// Pushes an optional key and optional value with a list of headers.
    ///
    /// Each header is a name paired with an optional byte value;
    /// `num_records` is the number of records the pushed data represents.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
        num_records: usize,
    );

    /// Signals the end of the current batch.
    fn batch_end(&mut self);

    /// Returns the amount of memory used by this consumer.
    ///
    /// The default implementation reports 0.
    // NOTE(review): presumably measured in bytes — confirm.
    fn memory(&self) -> usize {
        0
    }
}
708
/// Limit of 1,000,000 on the number of duplicates.
// NOTE(review): not referenced in this part of the file; judging by the name it
// caps how many copies of a single record are handled — confirm at the use site.
pub const MAX_DUPLICATES: i64 = 1_000_000;
715
/// Limit of 4096 on the length of a record quoted in an error message.
// NOTE(review): not referenced in this part of the file; presumably longer
// records are truncated before being embedded in a `ParseError` — confirm at
// the use site, including whether the unit is bytes or characters.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4096;
719
720#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
722#[serde(transparent)]
723pub struct ParseError(Box<ParseErrorInner>);
726impl Display for ParseError {
727 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
728 self.0.fmt(f)
729 }
730}
731
732impl StdError for ParseError {}
733
734impl ParseError {
735 pub fn new(
736 description: String,
737 event_number: Option<u64>,
738 field: Option<String>,
739 invalid_text: Option<&str>,
740 invalid_bytes: Option<&[u8]>,
741 suggestion: Option<Cow<'static, str>>,
742 error_tag: Option<String>,
743 ) -> Self {
744 Self(Box::new(ParseErrorInner::new(
745 description,
746 event_number,
747 field,
748 invalid_text,
749 invalid_bytes,
750 suggestion,
751 error_tag,
752 )))
753 }
754
755 pub fn text_event_error<E>(
756 msg: &str,
757 error: E,
758 event_number: u64,
759 invalid_text: Option<&str>,
760 suggestion: Option<Cow<'static, str>>,
761 ) -> Self
762 where
763 E: ToString,
764 {
765 Self(Box::new(ParseErrorInner::text_event_error(
766 msg,
767 error,
768 event_number,
769 invalid_text,
770 suggestion,
771 )))
772 }
773
774 pub fn text_envelope_error(
775 description: String,
776 invalid_text: &str,
777 suggestion: Option<Cow<'static, str>>,
778 ) -> Self {
779 Self(Box::new(ParseErrorInner::text_envelope_error(
780 description,
781 invalid_text,
782 suggestion,
783 )))
784 }
785
786 pub fn bin_event_error(
787 description: String,
788 event_number: u64,
789 invalid_bytes: &[u8],
790 suggestion: Option<Cow<'static, str>>,
791 ) -> Self {
792 Self(Box::new(ParseErrorInner::bin_event_error(
793 description,
794 event_number,
795 invalid_bytes,
796 suggestion,
797 )))
798 }
799
800 pub fn bin_envelope_error(
801 description: String,
802 invalid_bytes: &[u8],
803 suggestion: Option<Cow<'static, str>>,
804 ) -> Self {
805 Self(Box::new(ParseErrorInner::bin_envelope_error(
806 description,
807 invalid_bytes,
808 suggestion,
809 )))
810 }
811
812 pub fn map_description<F>(self, f: F) -> Self
816 where
817 F: FnOnce(&str) -> String,
818 {
819 let mut inner = self.0;
820 let description = f(&inner.description);
821 inner.description = description;
822 Self(inner)
823 }
824
825 pub fn get_error_tag(&self) -> Option<String> {
826 self.0.get_error_tag()
827 }
828}
829
830#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
831pub struct ParseErrorInner {
832 description: String,
834
835 event_number: Option<u64>,
843
844 field: Option<String>,
849
850 invalid_bytes: Option<Vec<u8>>,
855
856 invalid_text: Option<String>,
860
861 suggestion: Option<Cow<'static, str>>,
864
865 tag: Option<String>,
867}
868
869impl Display for ParseErrorInner {
870 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
871 let event = if let Some(event_number) = self.event_number {
872 format!(" (event #{})", event_number)
873 } else {
874 String::new()
875 };
876
877 let invalid_fragment = if let Some(invalid_bytes) = &self.invalid_bytes {
878 format!("\nInvalid bytes: {invalid_bytes:?}")
879 } else if let Some(invalid_text) = &self.invalid_text {
880 format!("\nInvalid fragment: '{invalid_text}'")
881 } else {
882 String::new()
883 };
884
885 let suggestion = if let Some(suggestion) = &self.suggestion {
886 format!("\n{suggestion}")
887 } else {
888 String::new()
889 };
890
891 write!(
892 f,
893 "Parse error{event}: {}{invalid_fragment}{suggestion}",
894 self.description
895 )
896 }
897}
898
899impl ParseErrorInner {
900 pub fn new(
901 description: String,
902 event_number: Option<u64>,
903 field: Option<String>,
904 invalid_text: Option<&str>,
905 invalid_bytes: Option<&[u8]>,
906 suggestion: Option<Cow<'static, str>>,
907 error_tag: Option<String>,
908 ) -> Self {
909 Self {
910 description,
911 event_number,
912 field,
913 invalid_text: invalid_text.map(str::to_string),
914 invalid_bytes: invalid_bytes.map(ToOwned::to_owned),
915 suggestion,
916 tag: error_tag,
917 }
918 }
919
920 pub fn text_event_error<E>(
923 msg: &str,
924 error: E,
925 event_number: u64,
926 invalid_text: Option<&str>,
927 suggestion: Option<Cow<'static, str>>,
928 ) -> Self
929 where
930 E: ToString,
931 {
932 let err_str = error.to_string();
933 let (descr, field) = if let Some(offset) = err_str.find("{\"field\":") {
937 if let Some(Ok(err)) = serde_json::Deserializer::from_str(&err_str[offset..])
938 .into_iter::<FieldParseError>()
939 .next()
940 {
941 (err.description, Some(err.field))
942 } else {
943 (err_str, None)
944 }
945 } else {
946 (err_str, None)
947 };
948 let column_name = if let Some(field) = &field {
949 format!(": error parsing field '{field}'")
950 } else {
951 String::new()
952 };
953
954 Self::new(
955 format!("{msg}{column_name}: {descr}",),
956 Some(event_number),
957 field,
958 invalid_text,
959 None,
960 suggestion,
961 Some("text_event_err".to_string()),
962 )
963 }
964
965 pub fn text_envelope_error(
969 description: String,
970 invalid_text: &str,
971 suggestion: Option<Cow<'static, str>>,
972 ) -> Self {
973 Self::new(
974 description,
975 None,
976 None,
977 Some(invalid_text),
978 None,
979 suggestion,
980 Some("text_envelope_err".to_string()),
981 )
982 }
983
984 pub fn bin_event_error(
987 description: String,
988 event_number: u64,
989 invalid_bytes: &[u8],
990 suggestion: Option<Cow<'static, str>>,
991 ) -> Self {
992 Self::new(
993 description,
994 Some(event_number),
995 None,
996 None,
997 Some(invalid_bytes),
998 suggestion,
999 Some("bin_event_err".to_string()),
1000 )
1001 }
1002
1003 pub fn bin_envelope_error(
1007 description: String,
1008 invalid_bytes: &[u8],
1009 suggestion: Option<Cow<'static, str>>,
1010 ) -> Self {
1011 Self::new(
1012 description,
1013 None,
1014 None,
1015 None,
1016 Some(invalid_bytes),
1017 suggestion,
1018 Some("bin_envelope_err".to_string()),
1019 )
1020 }
1021
1022 pub fn get_error_tag(&self) -> Option<String> {
1023 self.tag.clone()
1024 }
1025}