1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::max;
4use std::fmt::{Display, Error as FmtError, Formatter};
5use std::fs::File;
6use std::hash::Hasher;
7use std::io::{Error as IoError, Read};
8use std::ops::{Add, AddAssign, Range};
9use std::sync::Arc;
10
11use actix_web::HttpRequest;
12use anyhow::Result as AnyResult;
13use dbsp::operator::input::StagedBuffers;
14use erased_serde::Serialize as ErasedSerialize;
15use feldera_types::config::ConnectorConfig;
16use feldera_types::program_schema::Relation;
17use feldera_types::serde_with_context::FieldParseError;
18use serde::Serialize;
19use serde::de::StdError;
20
21use crate::ConnectorMetadata;
22use crate::catalog::{InputCollectionHandle, SerBatchReader};
23use crate::errors::controller::ControllerError;
24use crate::postprocess::Postprocessor;
25use crate::preprocess::Preprocessor;
26use crate::transport::{OutputBatchType, Step};
27
/// Factory for parsers of one input data format.
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait InputFormat: Send + Sync {
    /// Returns the name of this format.
    fn name(&self) -> Cow<'static, str>;

    /// Builds a format configuration from an HTTP request.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the request is addressed to
    ///   (presumably used for error reporting; confirm against implementors).
    /// * `request` - the incoming HTTP request.
    ///
    /// # Returns
    ///
    /// The configuration as a type-erased serializable value.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] when no configuration can be produced
    /// from `request`.
    fn config_from_http_request(
        &self,
        endpoint_name: &str,
        request: &HttpRequest,
    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;

    /// Creates a new parser for this format.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the parser is created for.
    /// * `input_stream` - handle to the input collection the parser is
    ///   associated with (presumably where parsed records end up; confirm).
    /// * `config` - format-specific configuration as a JSON value.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] when the parser cannot be created.
    fn new_parser(
        &self,
        endpoint_name: &str,
        input_stream: &InputCollectionHandle,
        config: &serde_json::Value,
    ) -> Result<Box<dyn Parser>, ControllerError>;
}
68
/// A buffer of parsed input records.
///
/// Buffers can report their size, be flushed, be hashed, and have a prefix of
/// their records split off into a new buffer.
pub trait InputBuffer: Any + Send {
    /// Flushes the buffered records.
    ///
    /// NOTE(review): presumably this pushes the records downstream and leaves
    /// the buffer empty; confirm against implementors.
    fn flush(&mut self);

    /// Returns the number of records and bytes currently held.
    fn len(&self) -> BufferSize;

    /// Feeds the buffer's contents into `hasher`.
    fn hash(&self, hasher: &mut dyn Hasher);

    /// Returns `true` when [`InputBuffer::len`] reports an empty size.
    fn is_empty(&self) -> bool {
        self.len().is_empty()
    }

    /// Removes up to `n` records from this buffer and returns them as a new
    /// buffer, or `None` if nothing was removed.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;

    /// Removes all records; equivalent to `take_some(usize::MAX)`.
    fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
        self.take_some(usize::MAX)
    }
}

/// Size of an [`InputBuffer`], measured in both records and bytes.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// Number of records.
    pub records: usize,

    /// Number of bytes.
    pub bytes: usize,
}

impl BufferSize {
    /// Returns a size of zero records and zero bytes.
    pub fn empty() -> Self {
        Self::default()
    }

    /// Returns `true` only if both the record and the byte count are zero.
    pub fn is_empty(&self) -> bool {
        self.records == 0 && self.bytes == 0
    }
}

impl Add for BufferSize {
    type Output = Self;

    /// Adds the two sizes component-wise.
    fn add(self, rhs: Self) -> Self::Output {
        BufferSize {
            records: self.records + rhs.records,
            bytes: self.bytes + rhs.bytes,
        }
    }
}

impl AddAssign for BufferSize {
    /// Adds `rhs` to `self` component-wise.
    fn add_assign(&mut self, rhs: Self) {
        self.records += rhs.records;
        self.bytes += rhs.bytes;
    }
}

/// An optional buffer behaves like the buffer it holds; `None` behaves like
/// an empty buffer.
impl InputBuffer for Option<Box<dyn InputBuffer>> {
    fn len(&self) -> BufferSize {
        match self {
            Some(buffer) => buffer.len(),
            None => BufferSize::empty(),
        }
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        if let Some(buffer) = self {
            buffer.hash(hasher)
        }
    }

    fn flush(&mut self) {
        if let Some(buffer) = self {
            buffer.flush()
        }
    }

    /// Forwards to the inner buffer. The `Option` itself stays `Some` even if
    /// the inner buffer becomes empty.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        match self {
            Some(buffer) => buffer.take_some(n),
            None => None,
        }
    }
}

/// A boxed buffer forwards every operation to the buffer it owns.
impl InputBuffer for Box<dyn InputBuffer> {
    fn len(&self) -> BufferSize {
        // Double dereference reaches the `dyn InputBuffer` itself; calling
        // `self.len()` here would recurse into this impl.
        (**self).len()
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        (**self).hash(hasher)
    }

    fn flush(&mut self) {
        (**self).flush()
    }

    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        (**self).take_some(n)
    }
}

/// A vector of buffers behaves like the concatenation of its elements.
impl InputBuffer for Vec<Box<dyn InputBuffer>> {
    fn flush(&mut self) {
        for buffer in self.iter_mut() {
            buffer.flush();
        }
    }

    /// Returns the sum of the element sizes.
    fn len(&self) -> BufferSize {
        self.iter()
            .fold(BufferSize::empty(), |total, buffer| total + buffer.len())
    }

    /// Hashes the elements in order.
    fn hash(&self, hasher: &mut dyn Hasher) {
        for buffer in self.iter() {
            buffer.hash(hasher);
        }
    }

    /// Takes up to `n` records, walking the elements front to back.
    ///
    /// Elements that are empty afterwards are removed from the vector. An
    /// element that was only partially consumed stays in place with its
    /// remaining records, so a later call can pick them up.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        let mut taken: Vec<Box<dyn InputBuffer>> = Vec::new();
        let mut remaining = n;

        for buffer in self.iter_mut() {
            if remaining == 0 {
                break;
            }
            if let Some(part) = buffer.take_some(remaining) {
                // Saturating: an element is allowed to hand back more records
                // than requested without underflowing the quota.
                remaining = remaining.saturating_sub(part.len().records);
                taken.push(part);
            }
        }

        // Drop only buffers that are verifiably drained. Deciding this from
        // the number of records taken would also discard a buffer that
        // returned exactly the requested amount while still holding more.
        self.retain(|buffer| !buffer.is_empty());

        if taken.is_empty() {
            None
        } else {
            Some(Box::new(taken))
        }
    }
}
274
275pub fn flatten_nested<T>(buffers: Vec<Box<dyn InputBuffer>>) -> Vec<Box<T>>
278where
279 T: Any,
280{
281 fn inner<T>(input: Vec<Box<dyn InputBuffer>>, output: &mut Vec<Box<T>>)
282 where
283 T: Any,
284 {
285 for buffer in input {
286 let any = buffer as Box<dyn Any>;
287 match any.downcast::<Vec<Box<dyn InputBuffer>>>() {
288 Ok(vec) => inner(*vec, output),
289 Err(any) => output.push(any.downcast().unwrap()),
290 }
291 }
292 }
293
294 let mut output = Vec::new();
295 inner(buffers, &mut output);
296 output
297}
298
299pub struct StagedInputBuffer {
315 buffer: Box<dyn StagedBuffers>,
316 size: BufferSize,
317}
318
319impl StagedInputBuffer {
320 pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
321 Self { buffer, size }
322 }
323}
324
325impl InputBuffer for StagedInputBuffer {
326 fn flush(&mut self) {
327 self.buffer.flush()
328 }
329
330 fn len(&self) -> BufferSize {
331 self.size
332 }
333
334 fn hash(&self, _hasher: &mut dyn Hasher) {
335 unimplemented!()
336 }
337
338 fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
339 unimplemented!()
340 }
341}
342
/// Turns raw bytes of one data format into [`InputBuffer`]s.
pub trait Parser: Send + Sync {
    /// Parses `data`.
    ///
    /// # Arguments
    ///
    /// * `data` - raw bytes to parse.
    /// * `metadata` - optional connector metadata accompanying the data
    ///   (how it is used is up to the implementation).
    ///
    /// # Returns
    ///
    /// A pair of the parsed records, if any, and the errors encountered while
    /// parsing. Both can be non-empty at the same time.
    fn parse(
        &mut self,
        data: &[u8],
        metadata: Option<ConnectorMetadata>,
    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);

    /// Converts a collection of buffers into staged buffers.
    ///
    /// NOTE(review): presumably `buffers` must have been produced by this
    /// parser (or a fork of it); confirm against implementors.
    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;

    /// Returns a [`Splitter`] that finds record boundaries in raw input for
    /// this parser.
    fn splitter(&self) -> Box<dyn Splitter>;

    /// Creates another parser with the same configuration as this one.
    fn fork(&self) -> Box<dyn Parser>;
}
372
373pub struct StreamingPreprocessedParser {
375 preprocessor: Box<dyn Preprocessor>,
376 stream_splitter: StreamSplitter,
377 parser: Box<dyn Parser>,
378}
379
380impl StreamingPreprocessedParser {
381 pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
382 Self {
383 preprocessor,
384 stream_splitter: StreamSplitter::new(parser.splitter()),
385 parser,
386 }
387 }
388}
389
390impl Parser for StreamingPreprocessedParser {
391 fn parse(
392 &mut self,
393 data: &[u8],
394 metadata: Option<ConnectorMetadata>,
395 ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
396 let (pre_data, mut pre_errors) = self.preprocessor.process(data);
397 self.stream_splitter.append(&pre_data);
398 let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
399 while let Some(chunk) = self.stream_splitter.next(true) {
400 let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
401 pre_errors.append(&mut parse_errors);
402 if let Some(data) = parsed_data {
403 parsed.push(data);
404 }
405 }
406 if parsed.is_empty() {
407 (None, pre_errors)
408 } else {
409 (Some(Box::new(parsed)), pre_errors)
410 }
411 }
412
413 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
414 self.parser.stage(buffers)
415 }
416
417 fn splitter(&self) -> Box<dyn Splitter> {
418 let pre_splitter = self.preprocessor.splitter();
419 if let Some(splitter) = pre_splitter {
420 return splitter;
421 }
422 self.parser.splitter()
423 }
424
425 fn fork(&self) -> Box<dyn Parser> {
426 Box::new(StreamingPreprocessedParser::new(
427 self.preprocessor.fork(),
428 self.parser.fork(),
429 ))
430 }
431}
432
433pub struct MessageOrientedPreprocessedParser {
435 preprocessor: Box<dyn Preprocessor>,
436 parser: Box<dyn Parser>,
437}
438
439impl MessageOrientedPreprocessedParser {
440 pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
441 Self {
442 preprocessor,
443 parser,
444 }
445 }
446}
447
448impl Parser for MessageOrientedPreprocessedParser {
449 fn parse(
450 &mut self,
451 data: &[u8],
452 metadata: Option<ConnectorMetadata>,
453 ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
454 let (pre_data, mut pre_errors) = self.preprocessor.process(data);
455 let mut parser_splitter = self.parser.splitter();
456 let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
457 let mut remaining = pre_data.as_slice();
458 while !remaining.is_empty() {
460 let chunk;
461 let split_offset = parser_splitter.input(remaining).unwrap_or(remaining.len());
462 (chunk, remaining) = remaining.split_at(split_offset);
463 let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
464 pre_errors.append(&mut parse_errors);
465 if let Some(data) = parsed_data {
466 parsed.push(data);
467 }
468 }
469 if parsed.is_empty() {
470 (None, pre_errors)
471 } else {
472 (Some(Box::new(parsed)), pre_errors)
473 }
474 }
475
476 fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
477 self.parser.stage(buffers)
478 }
479
480 fn splitter(&self) -> Box<dyn Splitter> {
481 let pre_splitter = self.preprocessor.splitter();
482 if let Some(splitter) = pre_splitter {
483 return splitter;
484 }
485 self.parser.splitter()
486 }
487
488 fn fork(&self) -> Box<dyn Parser> {
489 Box::new(MessageOrientedPreprocessedParser::new(
490 self.preprocessor.fork(),
491 self.parser.fork(),
492 ))
493 }
494}
495
/// Finds boundaries in a stream of raw bytes.
pub trait Splitter: Send + Sync {
    /// Examines the next piece of the stream.
    ///
    /// Returns `Some(n)` when a boundary lies `n` bytes into `data`, or
    /// `None` when `data` contains no boundary. [`StreamSplitter`] passes
    /// each byte to this method only once, so an implementation that needs
    /// earlier bytes must remember them itself.
    fn input(&mut self, data: &[u8]) -> Option<usize>;

    /// Discards any state accumulated from earlier calls to `input`.
    fn clear(&mut self);
}

/// Buffers a byte stream and cuts it into chunks at the boundaries reported
/// by a [`Splitter`].
pub struct StreamSplitter {
    /// Backing storage; only `buffer[fragment]` holds meaningful data.
    buffer: Vec<u8>,
    /// Stream offset of `buffer[0]`.
    start: u64,
    /// Range of `buffer` that has been received but not yet returned.
    fragment: Range<usize>,
    /// Index in `buffer` up to which data has already been shown to
    /// `splitter`.
    fed: usize,
    /// Boundary detector.
    splitter: Box<dyn Splitter>,
}

impl StreamSplitter {
    /// Creates an empty stream splitter positioned at offset 0.
    pub fn new(splitter: Box<dyn Splitter>) -> Self {
        Self {
            splitter,
            buffer: Vec::new(),
            fragment: 0..0,
            fed: 0,
            start: 0,
        }
    }

    /// Returns the next chunk, if one is available.
    ///
    /// Data not yet shown to the splitter is offered to it. If it reports a
    /// boundary, everything up to the boundary is returned. Otherwise, when
    /// `eoi` is set and unreturned data remains, all of it is returned as a
    /// final chunk; in every other case the result is `None`.
    pub fn next(&mut self, eoi: bool) -> Option<&[u8]> {
        let unfed = &self.buffer[self.fed..self.fragment.end];
        let chunk_range = match self.splitter.input(unfed) {
            Some(n) => {
                let boundary = self.fed + n;
                let range = self.fragment.start..boundary;
                self.fed = boundary;
                self.fragment.start = boundary;
                range
            }
            None => {
                // Everything buffered has now been seen by the splitter.
                self.fed = self.fragment.end;
                if !eoi || self.fragment.is_empty() {
                    return None;
                }
                let range = self.fragment.clone();
                self.fragment.start = self.fragment.end;
                range
            }
        };
        Some(&self.buffer[chunk_range])
    }

    /// Appends `data` to the buffered stream.
    ///
    /// Already-returned bytes at the front of the buffer are discarded first,
    /// so afterwards the buffer holds exactly the unreturned data followed by
    /// `data`.
    pub fn append(&mut self, data: &[u8]) {
        let consumed = self.fragment.start;
        let kept = self.fragment.len();
        let needed = kept + data.len();
        if needed > self.buffer.len() {
            self.buffer.reserve(needed - self.buffer.len());
        }

        // Move the unreturned bytes to the front, then add the new ones.
        self.buffer.copy_within(self.fragment.clone(), 0);
        self.buffer.truncate(kept);
        self.buffer.extend_from_slice(data);

        self.fed -= consumed;
        self.start += consumed as u64;
        self.fragment = 0..self.buffer.len();
    }

    /// Reads more data from `file` into the buffer.
    ///
    /// # Arguments
    ///
    /// * `file` - source to read from.
    /// * `buffer_size` - minimum length the buffer is grown to when it is
    ///   full.
    /// * `limit` - maximum number of bytes to read in this call.
    ///
    /// # Returns
    ///
    /// The number of bytes read, as reported by the underlying `read` call.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from `file`; the buffered data is left
    /// unchanged in that case.
    pub fn read(
        &mut self,
        file: &mut File,
        buffer_size: usize,
        limit: usize,
    ) -> Result<usize, IoError> {
        let consumed = self.fragment.start;
        if consumed != 0 {
            // Compact: shift the unreturned bytes to the front of the buffer.
            let kept = self.fragment.len();
            self.buffer.copy_within(self.fragment.clone(), 0);
            self.fed -= consumed;
            self.start += consumed as u64;
            self.fragment = 0..kept;
        }

        let filled = self.fragment.len();
        if filled == self.buffer.len() {
            // No free space left: grow to at least `buffer_size`, otherwise
            // double the current capacity.
            let grown = max(buffer_size, self.buffer.capacity() * 2);
            self.buffer.resize(grown, 0);
        }

        let free = &mut self.buffer[filled..];
        let window = free.len().min(limit);
        let n = file.read(&mut free[..window])?;
        self.fragment.end += n;
        Ok(n)
    }

    /// Returns the stream offset of the first byte that has not yet been
    /// returned by [`StreamSplitter::next`].
    pub fn position(&self) -> u64 {
        self.start + self.fragment.start as u64
    }

    /// Discards all buffered data and splitter state, and sets the stream
    /// position to `offset`.
    pub fn seek(&mut self, offset: u64) {
        self.splitter.clear();
        self.fragment = 0..0;
        self.fed = 0;
        self.start = offset;
    }

    /// Equivalent to `seek(0)`.
    pub fn reset(&mut self) {
        self.seek(0);
    }
}
639
/// Factory for encoders of one output data format.
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait OutputFormat: Send + Sync {
    /// Returns the name of this format.
    fn name(&self) -> Cow<'static, str>;

    /// Builds a format configuration from an HTTP request.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the request is addressed to.
    /// * `request` - the incoming HTTP request.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] when no configuration can be produced
    /// from `request`.
    fn config_from_http_request(
        &self,
        endpoint_name: &str,
        request: &HttpRequest,
    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;

    /// Creates a new encoder for this format.
    ///
    /// # Arguments
    ///
    /// * `endpoint_name` - name of the endpoint the encoder is created for.
    /// * `config` - connector configuration.
    /// * `key_schema` - optional schema of the key.
    /// * `value_schema` - schema of the value.
    /// * `consumer` - destination the encoder pushes encoded data to.
    /// * `is_index` - NOTE(review): presumably indicates that the output
    ///   comes from an index rather than a plain relation; confirm.
    ///
    /// # Errors
    ///
    /// Returns a [`ControllerError`] when the encoder cannot be created.
    fn new_encoder(
        &self,
        endpoint_name: &str,
        config: &ConnectorConfig,
        key_schema: &Option<Relation>,
        value_schema: &Relation,
        consumer: Box<dyn OutputConsumer>,
        is_index: bool,
    ) -> Result<Box<dyn Encoder>, ControllerError>;
}
678
/// Encodes output batches and hands the encoded data to an
/// [`OutputConsumer`].
pub trait Encoder: Send {
    /// Returns a mutable reference to the consumer associated with this
    /// encoder.
    fn consumer(&mut self) -> &mut dyn OutputConsumer;

    /// Encodes `batch`.
    ///
    /// # Errors
    ///
    /// Returns an error when the batch cannot be encoded.
    fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
}
693
/// Receiver of encoded output data.
#[doc(hidden)]
pub trait OutputConsumer: Send {
    /// Returns the maximum buffer size, in bytes, for this consumer.
    ///
    /// NOTE(review): presumably encoders should not push buffers larger than
    /// this; confirm against callers.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Signals the start of a batch of the given type for `step`.
    fn batch_start(&mut self, step: Step, batch_type: OutputBatchType);

    /// Pushes an encoded buffer that contains `num_records` records.
    fn push_buffer(&mut self, buffer: &[u8], num_records: usize);

    /// Pushes a key/value pair with optional headers, accounting for
    /// `num_records` records.
    ///
    /// `key`, `val`, and each header value may be absent.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
        num_records: usize,
    );

    /// Signals the end of the current batch.
    fn batch_end(&mut self);

    /// Returns the consumer's memory usage; the default implementation
    /// reports 0.
    ///
    /// NOTE(review): unit is presumably bytes; confirm.
    fn memory(&self) -> usize {
        0
    }
}
723
724pub type PostprocessorErrorCallback = Box<dyn Fn(anyhow::Error) + Send + Sync>;
730
731pub struct PostprocessedConsumer {
738 inner: Box<dyn OutputConsumer>,
739 postprocessor: Box<dyn Postprocessor>,
740 error_cb: PostprocessorErrorCallback,
741}
742
743impl PostprocessedConsumer {
744 pub fn new(
745 inner: Box<dyn OutputConsumer>,
746 postprocessor: Box<dyn Postprocessor>,
747 error_cb: PostprocessorErrorCallback,
748 ) -> Self {
749 Self {
750 inner,
751 postprocessor,
752 error_cb,
753 }
754 }
755}
756
757impl OutputConsumer for PostprocessedConsumer {
758 fn max_buffer_size_bytes(&self) -> usize {
759 self.inner.max_buffer_size_bytes()
760 }
761
762 fn batch_start(&mut self, step: Step, batch_type: OutputBatchType) {
763 self.postprocessor.batch_start(step, batch_type);
764 self.inner.batch_start(step, batch_type);
765 }
766
767 fn push_buffer(&mut self, buffer: &[u8], num_records: usize) {
768 match self.postprocessor.push_buffer(buffer) {
769 Ok(transformed) => self.inner.push_buffer(&transformed, num_records),
770 Err(e) => (self.error_cb)(e),
771 }
772 }
773
774 fn push_key(
775 &mut self,
776 key: Option<&[u8]>,
777 val: Option<&[u8]>,
778 headers: &[(&str, Option<&[u8]>)],
779 num_records: usize,
780 ) {
781 match self.postprocessor.push_key(key, val, headers) {
782 Ok((k, v, h)) => {
783 let h_refs: Vec<(&str, Option<&[u8]>)> =
784 h.iter().map(|(k, v)| (k.as_str(), v.as_deref())).collect();
785 self.inner
786 .push_key(k.as_deref(), v.as_deref(), &h_refs, num_records);
787 }
788 Err(e) => (self.error_cb)(e),
789 }
790 }
791
792 fn batch_end(&mut self) {
793 self.postprocessor.batch_end();
794 self.inner.batch_end();
795 }
796
797 fn memory(&self) -> usize {
798 self.inner.memory() + self.postprocessor.memory()
799 }
800}
801
/// Upper bound on duplicates.
///
/// NOTE(review): no usage is visible in this file. Presumably this caps how
/// many times a single record may be repeated when a format has to expand a
/// weighted record into duplicates; confirm against the encoders.
pub const MAX_DUPLICATES: i64 = 1_000_000;

/// Maximum record length, as the name suggests, to include in an error
/// message.
///
/// NOTE(review): no usage is visible in this file. Presumably longer records
/// are truncated to this many bytes before being embedded in a
/// [`ParseError`]; confirm against the parsers.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4096;
812
813#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
815#[serde(transparent)]
816pub struct ParseError(Box<ParseErrorInner>);
819impl Display for ParseError {
820 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
821 self.0.fmt(f)
822 }
823}
824
825impl StdError for ParseError {}
826
827impl ParseError {
828 pub fn new(
829 description: String,
830 event_number: Option<u64>,
831 field: Option<String>,
832 invalid_text: Option<&str>,
833 invalid_bytes: Option<&[u8]>,
834 suggestion: Option<Cow<'static, str>>,
835 error_tag: Option<String>,
836 ) -> Self {
837 Self(Box::new(ParseErrorInner::new(
838 description,
839 event_number,
840 field,
841 invalid_text,
842 invalid_bytes,
843 suggestion,
844 error_tag,
845 )))
846 }
847
848 pub fn text_event_error<E>(
849 msg: &str,
850 error: E,
851 event_number: u64,
852 invalid_text: Option<&str>,
853 suggestion: Option<Cow<'static, str>>,
854 ) -> Self
855 where
856 E: ToString,
857 {
858 Self(Box::new(ParseErrorInner::text_event_error(
859 msg,
860 error,
861 event_number,
862 invalid_text,
863 suggestion,
864 )))
865 }
866
867 pub fn text_envelope_error(
868 description: String,
869 invalid_text: &str,
870 suggestion: Option<Cow<'static, str>>,
871 ) -> Self {
872 Self(Box::new(ParseErrorInner::text_envelope_error(
873 description,
874 invalid_text,
875 suggestion,
876 )))
877 }
878
879 pub fn bin_event_error(
880 description: String,
881 event_number: u64,
882 invalid_bytes: &[u8],
883 suggestion: Option<Cow<'static, str>>,
884 ) -> Self {
885 Self(Box::new(ParseErrorInner::bin_event_error(
886 description,
887 event_number,
888 invalid_bytes,
889 suggestion,
890 )))
891 }
892
893 pub fn bin_envelope_error(
894 description: String,
895 invalid_bytes: &[u8],
896 suggestion: Option<Cow<'static, str>>,
897 ) -> Self {
898 Self(Box::new(ParseErrorInner::bin_envelope_error(
899 description,
900 invalid_bytes,
901 suggestion,
902 )))
903 }
904
905 pub fn map_description<F>(self, f: F) -> Self
909 where
910 F: FnOnce(&str) -> String,
911 {
912 let mut inner = self.0;
913 let description = f(&inner.description);
914 inner.description = description;
915 Self(inner)
916 }
917
918 pub fn get_error_tag(&self) -> Option<String> {
919 self.0.get_error_tag()
920 }
921}
922
923#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
924pub struct ParseErrorInner {
925 description: String,
927
928 event_number: Option<u64>,
936
937 field: Option<String>,
942
943 invalid_bytes: Option<Vec<u8>>,
948
949 invalid_text: Option<String>,
953
954 suggestion: Option<Cow<'static, str>>,
957
958 tag: Option<String>,
960}
961
962impl Display for ParseErrorInner {
963 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
964 let event = if let Some(event_number) = self.event_number {
965 format!(" (event #{})", event_number)
966 } else {
967 String::new()
968 };
969
970 let invalid_fragment = if let Some(invalid_bytes) = &self.invalid_bytes {
971 format!("\nInvalid bytes: {invalid_bytes:?}")
972 } else if let Some(invalid_text) = &self.invalid_text {
973 format!("\nInvalid fragment: '{invalid_text}'")
974 } else {
975 String::new()
976 };
977
978 let suggestion = if let Some(suggestion) = &self.suggestion {
979 format!("\n{suggestion}")
980 } else {
981 String::new()
982 };
983
984 write!(
985 f,
986 "Parse error{event}: {}{invalid_fragment}{suggestion}",
987 self.description
988 )
989 }
990}
991
992impl ParseErrorInner {
993 pub fn new(
994 description: String,
995 event_number: Option<u64>,
996 field: Option<String>,
997 invalid_text: Option<&str>,
998 invalid_bytes: Option<&[u8]>,
999 suggestion: Option<Cow<'static, str>>,
1000 error_tag: Option<String>,
1001 ) -> Self {
1002 Self {
1003 description,
1004 event_number,
1005 field,
1006 invalid_text: invalid_text.map(str::to_string),
1007 invalid_bytes: invalid_bytes.map(ToOwned::to_owned),
1008 suggestion,
1009 tag: error_tag,
1010 }
1011 }
1012
1013 pub fn text_event_error<E>(
1016 msg: &str,
1017 error: E,
1018 event_number: u64,
1019 invalid_text: Option<&str>,
1020 suggestion: Option<Cow<'static, str>>,
1021 ) -> Self
1022 where
1023 E: ToString,
1024 {
1025 let err_str = error.to_string();
1026 let (descr, field) = if let Some(offset) = err_str.find("{\"field\":") {
1030 if let Some(Ok(err)) = serde_json::Deserializer::from_str(&err_str[offset..])
1031 .into_iter::<FieldParseError>()
1032 .next()
1033 {
1034 (err.description, Some(err.field))
1035 } else {
1036 (err_str, None)
1037 }
1038 } else {
1039 (err_str, None)
1040 };
1041 let column_name = if let Some(field) = &field {
1042 format!(": error parsing field '{field}'")
1043 } else {
1044 String::new()
1045 };
1046
1047 Self::new(
1048 format!("{msg}{column_name}: {descr}",),
1049 Some(event_number),
1050 field,
1051 invalid_text,
1052 None,
1053 suggestion,
1054 Some("text_event_err".to_string()),
1055 )
1056 }
1057
1058 pub fn text_envelope_error(
1062 description: String,
1063 invalid_text: &str,
1064 suggestion: Option<Cow<'static, str>>,
1065 ) -> Self {
1066 Self::new(
1067 description,
1068 None,
1069 None,
1070 Some(invalid_text),
1071 None,
1072 suggestion,
1073 Some("text_envelope_err".to_string()),
1074 )
1075 }
1076
1077 pub fn bin_event_error(
1080 description: String,
1081 event_number: u64,
1082 invalid_bytes: &[u8],
1083 suggestion: Option<Cow<'static, str>>,
1084 ) -> Self {
1085 Self::new(
1086 description,
1087 Some(event_number),
1088 None,
1089 None,
1090 Some(invalid_bytes),
1091 suggestion,
1092 Some("bin_event_err".to_string()),
1093 )
1094 }
1095
1096 pub fn bin_envelope_error(
1100 description: String,
1101 invalid_bytes: &[u8],
1102 suggestion: Option<Cow<'static, str>>,
1103 ) -> Self {
1104 Self::new(
1105 description,
1106 None,
1107 None,
1108 None,
1109 Some(invalid_bytes),
1110 suggestion,
1111 Some("bin_envelope_err".to_string()),
1112 )
1113 }
1114
1115 pub fn get_error_tag(&self) -> Option<String> {
1116 self.tag.clone()
1117 }
1118}