Skip to main content

feldera_adapterlib/
format.rs

1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::max;
4use std::fmt::{Display, Error as FmtError, Formatter};
5use std::fs::File;
6use std::hash::Hasher;
7use std::io::{Error as IoError, Read};
8use std::ops::{Add, AddAssign, Range};
9use std::sync::Arc;
10
11use actix_web::HttpRequest;
12use anyhow::Result as AnyResult;
13use dbsp::operator::input::StagedBuffers;
14use erased_serde::Serialize as ErasedSerialize;
15use feldera_types::config::ConnectorConfig;
16use feldera_types::program_schema::Relation;
17use feldera_types::serde_with_context::FieldParseError;
18use serde::Serialize;
19use serde::de::StdError;
20
21use crate::ConnectorMetadata;
22use crate::catalog::{InputCollectionHandle, SerBatchReader};
23use crate::errors::controller::ControllerError;
24use crate::preprocess::Preprocessor;
25use crate::transport::Step;
26
27/// Trait that represents a specific data format.
28///
29/// This is a factory trait that creates parsers for a specific data format.
30pub trait InputFormat: Send + Sync {
31    /// Unique name of the data format.
32    fn name(&self) -> Cow<'static, str>;
33
34    /// Extract parser configuration from an HTTP request.
35    ///
36    /// Returns the extracted configuration cast to the `ErasedSerialize` trait
37    /// object (to keep this trait object-safe).
38    ///
39    /// # Discussion
40    ///
41    /// We could rely on the `serde_urlencoded` crate to deserialize the config
42    /// from the HTTP request, which is what most implementations will do
43    /// internally; however allowing the implementation to override this
44    /// method enables additional flexibility. For example, an
45    /// implementation may use `Content-Type` and other request headers, set
46    /// HTTP-specific defaults for config fields, etc.
47    fn config_from_http_request(
48        &self,
49        endpoint_name: &str,
50        request: &HttpRequest,
51    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
52
53    /// Create a new parser for the format.
54    ///
55    /// # Arguments
56    ///
57    /// * `input_stream` - Input stream of the circuit to push parsed data to.
58    ///
59    /// * `config` - Format-specific configuration.
60    fn new_parser(
61        &self,
62        endpoint_name: &str,
63        input_stream: &InputCollectionHandle,
64        config: &serde_json::Value,
65    ) -> Result<Box<dyn Parser>, ControllerError>;
66}
67
/// A collection of records associated with an input handle.
///
/// A [Parser] holds and adds records to an [InputBuffer].  The client, which is
/// typically an [InputReader](crate::transport::InputReader), collects one or
/// more [InputBuffer]s and pushes them to the circuit when the controller
/// requests it.
///
/// # Pushing buffers into a circuit
///
/// There are two ways to push `InputBuffer`s into a circuit:
///
/// - With [InputBuffer::flush].  This immediately pushes the input buffer into
///   the DBSP input handle.
///
/// - By passing buffers to [Parser::stage], which collects all of them into a
///   [StagedBuffers].  Then, later, call [StagedBuffers::flush] to push the
///   input buffers into the circuit.
///
/// Both approaches are equivalent in terms of correctness.  They can differ in
/// performance, because [InputBuffer::flush] has a significant cost for a large
/// number of records.  Using [StagedBuffers] has a similar cost, but it is paid
/// in the call to [Parser::stage] rather than in [StagedBuffers::flush].  So if
/// the input connector can buffer data ahead of the circuit's demand for it,
/// the cost can be hidden and the circuit as a whole runs faster.
pub trait InputBuffer: Any + Send {
    /// Pushes all of the records into the circuit input handle and discards
    /// them from this buffer.
    fn flush(&mut self);

    /// Returns the number of buffered records and bytes.
    fn len(&self) -> BufferSize;

    /// Hashes the records in the input buffer into `hasher`, in order.
    ///
    /// This is used to check that input data remains the same for replay, so
    /// for equal data the hash must be the same from one program run to the
    /// next.  Data may be divided into `InputBuffer`s differently between runs.
    /// For example, records 0..10 then 10..20 in one run, and 0..5, 5..15,
    /// 15..20 in another.  The data fed into `hasher` must be identical in both
    /// cases.
    fn hash(&self, hasher: &mut dyn Hasher);

    /// Returns true if the buffer holds neither records nor bytes.
    fn is_empty(&self) -> bool {
        self.len().is_empty()
    }

    /// Removes the first `n` records from this input buffer and returns a new
    /// [InputBuffer] that holds them.
    ///
    /// If fewer than `n` records are available, returns all of them.  May
    /// return `None` if this input buffer is empty (or if `n` is 0).
    ///
    /// Some implementations can't implement `n` with full granularity. These
    /// implementations might return more than `n` records.
    ///
    /// This is useful for extracting the records from one of several parser
    /// threads to send to a single common thread to be pushed later.
    ///
    /// # Byte accounting
    ///
    /// This function must not increase or decrease the total number of bytes.
    /// If the returned buffer is named `head`, then `self.len().bytes` before
    /// the call must equal `self.len().bytes + head.len().bytes` after the
    /// call.  Violating this invariant causes the number of buffered bytes
    /// reported by a pipeline never to fall to zero (or to wrap around to
    /// `u64::MAX`).
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;

    /// Removes every record from this buffer and returns them as a new
    /// [InputBuffer].
    ///
    /// Like [InputBuffer::take_some], this may return `None` if the buffer is
    /// empty.
    fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
        self.take_some(usize::MAX)
    }
}

/// The size of an [InputBuffer].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// The exact number of records in the buffer.
    pub records: usize,

    /// The number of bytes attributable to the buffer.
    ///
    /// This need not be exact.  (When a buffer is split with
    /// [InputBuffer::take_some], it will usually not be exact.)
    pub bytes: usize,
}

impl BufferSize {
    /// The size of an empty buffer.
    pub fn empty() -> Self {
        Self::default()
    }

    /// Returns true if both the record and the byte count are zero.
    pub fn is_empty(&self) -> bool {
        self.records == 0 && self.bytes == 0
    }
}

impl Add for BufferSize {
    type Output = Self;

    /// Adds record and byte counts component-wise.
    fn add(self, rhs: Self) -> Self::Output {
        BufferSize {
            records: self.records + rhs.records,
            bytes: self.bytes + rhs.bytes,
        }
    }
}

impl AddAssign for BufferSize {
    /// Adds `rhs`'s record and byte counts to `self` in place.
    fn add_assign(&mut self, rhs: Self) {
        self.records += rhs.records;
        self.bytes += rhs.bytes;
    }
}

/// An optional buffer: `None` behaves like an empty buffer.
impl InputBuffer for Option<Box<dyn InputBuffer>> {
    fn len(&self) -> BufferSize {
        self.as_ref()
            .map_or(BufferSize::empty(), |buffer| buffer.len())
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        if let Some(buffer) = self {
            buffer.hash(hasher)
        }
    }

    fn flush(&mut self) {
        if let Some(buffer) = self.as_mut() {
            buffer.flush()
        }
    }

    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        self.as_mut().and_then(|buffer| buffer.take_some(n))
    }
}

/// A boxed buffer forwards every required method to the buffer it holds.
impl InputBuffer for Box<dyn InputBuffer> {
    fn len(&self) -> BufferSize {
        // Explicitly dereference to the trait object so this cannot recurse
        // into this impl.
        (**self).len()
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        (**self).hash(hasher)
    }

    fn flush(&mut self) {
        (**self).flush()
    }

    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        (**self).take_some(n)
    }
}

/// A list of buffers behaves like the concatenation of its elements, in order.
impl InputBuffer for Vec<Box<dyn InputBuffer>> {
    fn flush(&mut self) {
        for buffer in self.iter_mut() {
            buffer.flush();
        }
    }

    fn len(&self) -> BufferSize {
        self.iter()
            .map(|buffer| buffer.len())
            .fold(BufferSize::empty(), Add::add)
    }

    fn hash(&self, hasher: &mut dyn Hasher) {
        for buffer in self.iter() {
            buffer.hash(hasher);
        }
    }

    /// Takes up to `n` records from the front of the list, possibly spanning
    /// several buffers, and returns them as a single (vector) buffer.
    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
        let mut taken: Vec<Box<dyn InputBuffer>> = Vec::new();
        let mut remaining = n;
        for buffer in self.iter_mut() {
            if remaining == 0 {
                break;
            }
            if let Some(head) = buffer.take_some(remaining) {
                // Implementations may return more records than requested, hence
                // the saturating subtraction.
                remaining = remaining.saturating_sub(head.len().records);
                taken.push(head);
            }
        }

        // Drop only the leading buffers that are now truly empty.  A buffer
        // that handed back exactly `remaining` records may still hold more, so
        // the count taken from it says nothing about whether it is exhausted.
        // Removing it anyway would silently lose its remaining records.
        let exhausted = self
            .iter()
            .take_while(|buffer| buffer.is_empty())
            .count();
        self.drain(..exhausted);

        if taken.is_empty() {
            None
        } else {
            Some(Box::new(taken))
        }
    }
}
273
274/// If any of the InputBuffer elements is a Vec itself, flatten it recursively.
275/// Return the concatenated input buffers all downcast to T.
276pub fn flatten_nested<T>(buffers: Vec<Box<dyn InputBuffer>>) -> Vec<Box<T>>
277where
278    T: Any,
279{
280    fn inner<T>(input: Vec<Box<dyn InputBuffer>>, output: &mut Vec<Box<T>>)
281    where
282        T: Any,
283    {
284        for buffer in input {
285            let any = buffer as Box<dyn Any>;
286            match any.downcast::<Vec<Box<dyn InputBuffer>>>() {
287                Ok(vec) => inner(*vec, output),
288                Err(any) => output.push(any.downcast().unwrap()),
289            }
290        }
291    }
292
293    let mut output = Vec::new();
294    inner(buffers, &mut output);
295    output
296}
297
298/// A wrapper around a [StagedBuffers] that implements the [InputBuffer] trait.
299///
300/// The `StagedBuffers` trait is similar to `InputBuffer` in that it supports flushing
301/// a set of tuples to the circuit. Unlike `InputBuffer`, it doesn't support returning
302/// its size as a `BufferSize`. It also doesn't support hashing and taking a subset of
303/// records.
304///
305/// This wrapper adds the former by storing `BufferSize` collected from `InputBuffer`s
306/// used to create the `StagedBuffers`.
307///
308/// `hash()` and `take_some()` methods are unimplemented. Therefore this wrapper can only be
309/// safely used in contexts where these methods are not needed.
310///
311// FIXME: It would be better to encode the unimplemented functionality in the type system,
312// e.g., as an extension trait.
313pub struct StagedInputBuffer {
314    buffer: Box<dyn StagedBuffers>,
315    size: BufferSize,
316}
317
318impl StagedInputBuffer {
319    pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
320        Self { buffer, size }
321    }
322}
323
324impl InputBuffer for StagedInputBuffer {
325    fn flush(&mut self) {
326        self.buffer.flush()
327    }
328
329    fn len(&self) -> BufferSize {
330        self.size
331    }
332
333    fn hash(&self, _hasher: &mut dyn Hasher) {
334        unimplemented!()
335    }
336
337    fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
338        unimplemented!()
339    }
340}
341
342/// Parses raw bytes into database records.
343pub trait Parser: Send + Sync {
344    /// Parses `data` into records and returns the records and any parse errors
345    /// that occurred.
346    ///
347    /// XXX it would be even better if this were `&self` and avoided keeping
348    /// state entirely.
349    fn parse(
350        &mut self,
351        data: &[u8],
352        metadata: Option<ConnectorMetadata>,
353    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);
354
355    /// Stages all of the `buffers`, which must have been obtained from this
356    /// [Parser] or one forked from this one, into a [StagedBuffers] that may
357    /// later be used to push the collected data into the circuit.  See
358    /// [StagedBuffers] for more information.
359    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
360
361    /// Returns an object that can be used to break a stream of incoming data
362    /// into complete records to pass to [Parser::parse].
363    fn splitter(&self) -> Box<dyn Splitter>;
364
365    /// Create a new parser with the same configuration as `self`.
366    ///
367    /// Used by multithreaded transport endpoints to create multiple parallel
368    /// input pipelines.
369    fn fork(&self) -> Box<dyn Parser>;
370}
371
372/// A parser with preprocessing for a streaming preprocessor
373pub struct StreamingPreprocessedParser {
374    preprocessor: Box<dyn Preprocessor>,
375    stream_splitter: StreamSplitter,
376    parser: Box<dyn Parser>,
377}
378
379impl StreamingPreprocessedParser {
380    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
381        Self {
382            preprocessor,
383            stream_splitter: StreamSplitter::new(parser.splitter()),
384            parser,
385        }
386    }
387}
388
389impl Parser for StreamingPreprocessedParser {
390    fn parse(
391        &mut self,
392        data: &[u8],
393        metadata: Option<ConnectorMetadata>,
394    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
395        let (pre_data, mut pre_errors) = self.preprocessor.process(data);
396        self.stream_splitter.append(&pre_data);
397        let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
398        while let Some(chunk) = self.stream_splitter.next(true) {
399            let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
400            pre_errors.append(&mut parse_errors);
401            if let Some(data) = parsed_data {
402                parsed.push(data);
403            }
404        }
405        if parsed.is_empty() {
406            (None, pre_errors)
407        } else {
408            (Some(Box::new(parsed)), pre_errors)
409        }
410    }
411
412    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
413        self.parser.stage(buffers)
414    }
415
416    fn splitter(&self) -> Box<dyn Splitter> {
417        let pre_splitter = self.preprocessor.splitter();
418        if let Some(splitter) = pre_splitter {
419            return splitter;
420        }
421        self.parser.splitter()
422    }
423
424    fn fork(&self) -> Box<dyn Parser> {
425        Box::new(StreamingPreprocessedParser::new(
426            self.preprocessor.fork(),
427            self.parser.fork(),
428        ))
429    }
430}
431
432/// A parser with preprocessing for a message-oriented preprocessor
433pub struct MessageOrientedPreprocessedParser {
434    preprocessor: Box<dyn Preprocessor>,
435    parser: Box<dyn Parser>,
436}
437
438impl MessageOrientedPreprocessedParser {
439    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
440        Self {
441            preprocessor,
442            parser,
443        }
444    }
445}
446
447impl Parser for MessageOrientedPreprocessedParser {
448    fn parse(
449        &mut self,
450        data: &[u8],
451        metadata: Option<ConnectorMetadata>,
452    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
453        let (pre_data, mut pre_errors) = self.preprocessor.process(data);
454        let mut parser_splitter = self.parser.splitter();
455        let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
456        let mut remaining = pre_data.as_slice();
457        // Use the parser to divide the message received from the preprocessor into chunks and parse each of them
458        while !remaining.is_empty() {
459            let chunk;
460            let split_offset = parser_splitter.input(remaining).unwrap_or(remaining.len());
461            (chunk, remaining) = remaining.split_at(split_offset);
462            let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
463            pre_errors.append(&mut parse_errors);
464            if let Some(data) = parsed_data {
465                parsed.push(data);
466            }
467        }
468        if parsed.is_empty() {
469            (None, pre_errors)
470        } else {
471            (Some(Box::new(parsed)), pre_errors)
472        }
473    }
474
475    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
476        self.parser.stage(buffers)
477    }
478
479    fn splitter(&self) -> Box<dyn Splitter> {
480        let pre_splitter = self.preprocessor.splitter();
481        if let Some(splitter) = pre_splitter {
482            return splitter;
483        }
484        self.parser.splitter()
485    }
486
487    fn fork(&self) -> Box<dyn Parser> {
488        Box::new(MessageOrientedPreprocessedParser::new(
489            self.preprocessor.fork(),
490            self.parser.fork(),
491        ))
492    }
493}
494
/// Finds record boundaries in a data stream.
///
/// [Parser::parse] and [Preprocessor::process] only accept complete records.
/// Transports that deliver a raw byte stream therefore use a format-specific
/// [Splitter] to decide where records end.
pub trait Splitter: Send + Sync {
    /// Scans `data` for a record boundary.
    ///
    /// - Returns `None` when `data` does not necessarily complete a record.
    ///
    /// - Returns `Some(n)` when the first `n` bytes of `data`, together with
    ///   all earlier input for which `None` was returned, make up one or more
    ///   complete records.  When `n < data.len()`, the caller should present
    ///   `data[n..]` again to continue splitting.
    fn input(&mut self, data: &[u8]) -> Option<usize>;

    /// Discards all internal state so the splitter can start on new data.
    fn clear(&mut self);
}

/// Breaks a stream of data into groups of complete records using a
/// [Splitter].
///
/// A [Splitter] only reports breakpoints in the bytes shown to it.  A
/// [StreamSplitter] additionally owns the data: callers feed input in with
/// [append](Self::append) or [read](Self::read) and take complete chunks out
/// with [next](Self::next).
pub struct StreamSplitter {
    /// Storage for input; only the `fragment` range holds live data.
    buffer: Vec<u8>,
    /// Logical stream offset of `buffer[0]`.
    start: u64,
    /// Range of `buffer` that has not yet been returned by `next`.
    fragment: Range<usize>,
    /// Offset in `buffer` up to which the splitter has accounted for the data.
    fed: usize,
    splitter: Box<dyn Splitter>,
}

impl StreamSplitter {
    /// Creates an empty stream splitter that locates breakpoints with
    /// `splitter`.
    pub fn new(splitter: Box<dyn Splitter>) -> Self {
        Self {
            splitter,
            buffer: Vec::new(),
            fragment: 0..0,
            fed: 0,
            start: 0,
        }
    }

    /// Returns the next complete chunk of input, if there is one.
    ///
    /// `eoi` says whether the input stream has ended.  In that case, bytes
    /// that the splitter could not terminate are returned as a final chunk.
    /// When `eoi` is true and this returns `None`, no chunks remain.
    pub fn next(&mut self, eoi: bool) -> Option<&[u8]> {
        match self
            .splitter
            .input(&self.buffer[self.fed..self.fragment.end])
        {
            Some(n) => {
                let begin = self.fragment.start;
                self.fed += n;
                self.fragment.start = self.fed;
                Some(&self.buffer[begin..self.fed])
            }
            None => {
                // Every pending byte has now been shown to the splitter.
                self.fed = self.fragment.end;
                if !eoi || self.fragment.is_empty() {
                    return None;
                }
                let begin = self.fragment.start;
                self.fragment.start = self.fragment.end;
                Some(&self.buffer[begin..self.fragment.end])
            }
        }
    }

    /// Adds `data` to the input that is to be broken into chunks.
    pub fn append(&mut self, data: &[u8]) {
        let needed = self.fragment.len() + data.len();
        if needed > self.buffer.len() {
            self.buffer.reserve(needed - self.buffer.len());
        }
        self.compact();
        self.buffer.resize(self.fragment.len(), 0);
        self.buffer.extend_from_slice(data);
        self.fragment = 0..self.buffer.len();
    }

    /// Reads at most `limit` bytes from `file` into the splitter.
    ///
    /// When the buffer is full it is grown to at least `buffer_size` bytes
    /// (or to twice its capacity, whichever is larger).  Returns the number of
    /// bytes read, or the I/O error reported by `file`.
    pub fn read(
        &mut self,
        file: &mut File,
        buffer_size: usize,
        limit: usize,
    ) -> Result<usize, IoError> {
        self.compact();

        // Guarantee some free space after the pending data.
        let filled = self.fragment.len();
        if filled == self.buffer.len() {
            let grown = max(buffer_size, self.buffer.capacity() * 2);
            self.buffer.resize(grown, 0);
        }

        // Computed as `filled + min(available, limit)` so that a huge `limit`
        // cannot overflow.
        let available = self.buffer.len() - filled;
        let window = filled..filled + available.min(limit);
        let outcome = file.read(&mut self.buffer[window]);
        if let Ok(count) = outcome {
            self.fragment.end += count;
        }
        outcome
    }

    /// Returns the logical stream position of the next byte to be returned.
    pub fn position(&self) -> u64 {
        self.start + self.fragment.start as u64
    }

    /// Moves the logical stream position of the next byte to be returned to
    /// `offset`, discarding all buffered data and splitter state.
    pub fn seek(&mut self, offset: u64) {
        self.splitter.clear();
        self.fragment = 0..0;
        self.fed = 0;
        self.start = offset;
    }

    /// Restores the state of a freshly created splitter.
    pub fn reset(&mut self) {
        self.seek(0);
    }

    /// Moves the unreturned fragment to the front of `buffer`, shifting the
    /// bookkeeping offsets so the logical stream position is unchanged.
    fn compact(&mut self) {
        let shift = self.fragment.start;
        if shift != 0 {
            self.buffer.copy_within(self.fragment.clone(), 0);
            self.fed -= shift;
            self.start += shift as u64;
            self.fragment = 0..self.fragment.len();
        }
    }
}
638
639pub trait OutputFormat: Send + Sync {
640    /// Unique name of the data format.
641    fn name(&self) -> Cow<'static, str>;
642
643    /// Extract encoder configuration from an HTTP request.
644    ///
645    /// Returns the extracted configuration cast to the `ErasedSerialize` trait
646    /// object (to keep this trait object-safe).
647    fn config_from_http_request(
648        &self,
649        endpoint_name: &str,
650        request: &HttpRequest,
651    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
652
653    /// Create a new encoder for the format.
654    ///
655    /// # Arguments
656    ///
657    /// * `config` - Format-specific configuration.
658    ///
659    /// * `consumer` - Consumer to send encoded data batches to.
660    fn new_encoder(
661        &self,
662        endpoint_name: &str,
663        config: &ConnectorConfig,
664        key_schema: &Option<Relation>,
665        value_schema: &Relation,
666        consumer: Box<dyn OutputConsumer>,
667    ) -> Result<Box<dyn Encoder>, ControllerError>;
668}
669
670pub trait Encoder: Send {
671    /// Returns a reference to the consumer that the encoder is connected to.
672    fn consumer(&mut self) -> &mut dyn OutputConsumer;
673
674    /// Encode a batch of updates, push encoded buffers to the consumer
675    /// using [`OutputConsumer::push_buffer`].
676    fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
677}
678
679#[doc(hidden)]
680pub trait OutputConsumer: Send {
681    /// Maximum buffer size that this transport can transmit.
682    /// The encoder should not generate buffers exceeding this size.
683    fn max_buffer_size_bytes(&self) -> usize;
684
685    fn batch_start(&mut self, step: Step);
686
687    /// See OutputEndpoint::push_buffer.
688    fn push_buffer(&mut self, buffer: &[u8], num_records: usize);
689
690    /// See OutputEndpoint::push_key.
691    fn push_key(
692        &mut self,
693        key: Option<&[u8]>,
694        val: Option<&[u8]>,
695        headers: &[(&str, Option<&[u8]>)],
696        num_records: usize,
697    );
698    fn batch_end(&mut self);
699
700    /// Returns the approximate amount of memory used by the connector's
701    /// underlying implementation.  For the Kafka connectors, for example, this
702    /// is the amount of memory used by librdkafka.  Not all connectors use a
703    /// substantial amount of memory, so the default implementation returns 0.
704    fn memory(&self) -> usize {
705        0
706    }
707}
708
/// Upper bound on the weight of a record written in a format that has no
/// explicit weights.
///
/// Such formats express a weight `w` by repeating the record `w` times.  That
/// is expensive for large weights and most likely not what the user intends.
pub const MAX_DUPLICATES: i64 = 1_000_000;

/// Length, in bytes, to which a long JSON record is truncated when it is
/// included in an error message.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4 * 1024;
719
720/// Error parsing input data.
721#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
722#[serde(transparent)]
723// Box the internals of `ParseError` to avoid
724// "Error variant to large" clippy warnings".
725pub struct ParseError(Box<ParseErrorInner>);
726impl Display for ParseError {
727    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
728        self.0.fmt(f)
729    }
730}
731
732impl StdError for ParseError {}
733
734impl ParseError {
735    pub fn new(
736        description: String,
737        event_number: Option<u64>,
738        field: Option<String>,
739        invalid_text: Option<&str>,
740        invalid_bytes: Option<&[u8]>,
741        suggestion: Option<Cow<'static, str>>,
742        error_tag: Option<String>,
743    ) -> Self {
744        Self(Box::new(ParseErrorInner::new(
745            description,
746            event_number,
747            field,
748            invalid_text,
749            invalid_bytes,
750            suggestion,
751            error_tag,
752        )))
753    }
754
755    pub fn text_event_error<E>(
756        msg: &str,
757        error: E,
758        event_number: u64,
759        invalid_text: Option<&str>,
760        suggestion: Option<Cow<'static, str>>,
761    ) -> Self
762    where
763        E: ToString,
764    {
765        Self(Box::new(ParseErrorInner::text_event_error(
766            msg,
767            error,
768            event_number,
769            invalid_text,
770            suggestion,
771        )))
772    }
773
774    pub fn text_envelope_error(
775        description: String,
776        invalid_text: &str,
777        suggestion: Option<Cow<'static, str>>,
778    ) -> Self {
779        Self(Box::new(ParseErrorInner::text_envelope_error(
780            description,
781            invalid_text,
782            suggestion,
783        )))
784    }
785
786    pub fn bin_event_error(
787        description: String,
788        event_number: u64,
789        invalid_bytes: &[u8],
790        suggestion: Option<Cow<'static, str>>,
791    ) -> Self {
792        Self(Box::new(ParseErrorInner::bin_event_error(
793            description,
794            event_number,
795            invalid_bytes,
796            suggestion,
797        )))
798    }
799
800    pub fn bin_envelope_error(
801        description: String,
802        invalid_bytes: &[u8],
803        suggestion: Option<Cow<'static, str>>,
804    ) -> Self {
805        Self(Box::new(ParseErrorInner::bin_envelope_error(
806            description,
807            invalid_bytes,
808            suggestion,
809        )))
810    }
811
812    /// Returns a new `ParseError` with the description modified by `f`.
813    ///
814    /// Can be used, e.g., to prepend context to the description.
815    pub fn map_description<F>(self, f: F) -> Self
816    where
817        F: FnOnce(&str) -> String,
818    {
819        let mut inner = self.0;
820        let description = f(&inner.description);
821        inner.description = description;
822        Self(inner)
823    }
824
825    pub fn get_error_tag(&self) -> Option<String> {
826        self.0.get_error_tag()
827    }
828}
829
830#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
831pub struct ParseErrorInner {
832    /// Error description.
833    description: String,
834
835    /// Event number relative to the start of the stream.
836    ///
837    /// An input stream is a series data change events (row insertions,
838    /// deletions, and updates).  This field specifies the index (starting
839    /// from 1) of the event that caused the error, relative to the start of
840    /// the stream.  In some cases this index cannot be identified, e.g., if
841    /// the error makes an entire block of events unparseable.
842    event_number: Option<u64>,
843
844    /// Field that failed to parse.
845    ///
846    /// Only set when the parsing error can be attributed to a
847    /// specific field.
848    field: Option<String>,
849
850    /// Invalid fragment of input data.
851    ///
852    /// Used for binary data formats and for text-based formats when the input
853    /// is not valid UTF-8 string.
854    invalid_bytes: Option<Vec<u8>>,
855
856    /// Invalid fragment of the input text.
857    ///
858    /// Only used for text-based formats and in cases when input is valid UTF-8.
859    invalid_text: Option<String>,
860
861    /// Any additional information that may help fix the problem, e.g., example
862    /// of a valid input.
863    suggestion: Option<Cow<'static, str>>,
864
865    /// Used for rate limiting
866    tag: Option<String>,
867}
868
869impl Display for ParseErrorInner {
870    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
871        let event = if let Some(event_number) = self.event_number {
872            format!(" (event #{})", event_number)
873        } else {
874            String::new()
875        };
876
877        let invalid_fragment = if let Some(invalid_bytes) = &self.invalid_bytes {
878            format!("\nInvalid bytes: {invalid_bytes:?}")
879        } else if let Some(invalid_text) = &self.invalid_text {
880            format!("\nInvalid fragment: '{invalid_text}'")
881        } else {
882            String::new()
883        };
884
885        let suggestion = if let Some(suggestion) = &self.suggestion {
886            format!("\n{suggestion}")
887        } else {
888            String::new()
889        };
890
891        write!(
892            f,
893            "Parse error{event}: {}{invalid_fragment}{suggestion}",
894            self.description
895        )
896    }
897}
898
899impl ParseErrorInner {
900    pub fn new(
901        description: String,
902        event_number: Option<u64>,
903        field: Option<String>,
904        invalid_text: Option<&str>,
905        invalid_bytes: Option<&[u8]>,
906        suggestion: Option<Cow<'static, str>>,
907        error_tag: Option<String>,
908    ) -> Self {
909        Self {
910            description,
911            event_number,
912            field,
913            invalid_text: invalid_text.map(str::to_string),
914            invalid_bytes: invalid_bytes.map(ToOwned::to_owned),
915            suggestion,
916            tag: error_tag,
917        }
918    }
919
920    /// Error parsing an individual event in a text-based input format (e.g.,
921    /// JSON, CSV).
922    pub fn text_event_error<E>(
923        msg: &str,
924        error: E,
925        event_number: u64,
926        invalid_text: Option<&str>,
927        suggestion: Option<Cow<'static, str>>,
928    ) -> Self
929    where
930        E: ToString,
931    {
932        let err_str = error.to_string();
933        // Try to parse the error as `FieldParseError`.  If this is not a field-specific
934        // error or the error was not returned by the `deserialize_table_record`
935        // macro, this will fail and we'll store the error as is.
936        let (descr, field) = if let Some(offset) = err_str.find("{\"field\":") {
937            if let Some(Ok(err)) = serde_json::Deserializer::from_str(&err_str[offset..])
938                .into_iter::<FieldParseError>()
939                .next()
940            {
941                (err.description, Some(err.field))
942            } else {
943                (err_str, None)
944            }
945        } else {
946            (err_str, None)
947        };
948        let column_name = if let Some(field) = &field {
949            format!(": error parsing field '{field}'")
950        } else {
951            String::new()
952        };
953
954        Self::new(
955            format!("{msg}{column_name}: {descr}",),
956            Some(event_number),
957            field,
958            invalid_text,
959            None,
960            suggestion,
961            Some("text_event_err".to_string()),
962        )
963    }
964
965    /// Error parsing a container, e.g., a JSON array, with multiple events.
966    ///
967    /// Such errors cannot be attributed to an individual event numbers.
968    pub fn text_envelope_error(
969        description: String,
970        invalid_text: &str,
971        suggestion: Option<Cow<'static, str>>,
972    ) -> Self {
973        Self::new(
974            description,
975            None,
976            None,
977            Some(invalid_text),
978            None,
979            suggestion,
980            Some("text_envelope_err".to_string()),
981        )
982    }
983
984    /// Error parsing an individual event in a binary input format (e.g.,
985    /// bincode).
986    pub fn bin_event_error(
987        description: String,
988        event_number: u64,
989        invalid_bytes: &[u8],
990        suggestion: Option<Cow<'static, str>>,
991    ) -> Self {
992        Self::new(
993            description,
994            Some(event_number),
995            None,
996            None,
997            Some(invalid_bytes),
998            suggestion,
999            Some("bin_event_err".to_string()),
1000        )
1001    }
1002
1003    /// Error parsing a container with multiple events.
1004    ///
1005    /// Such errors cannot be attributed to an individual event numbers.
1006    pub fn bin_envelope_error(
1007        description: String,
1008        invalid_bytes: &[u8],
1009        suggestion: Option<Cow<'static, str>>,
1010    ) -> Self {
1011        Self::new(
1012            description,
1013            None,
1014            None,
1015            None,
1016            Some(invalid_bytes),
1017            suggestion,
1018            Some("bin_envelope_err".to_string()),
1019        )
1020    }
1021
1022    pub fn get_error_tag(&self) -> Option<String> {
1023        self.tag.clone()
1024    }
1025}