Skip to main content

feldera_adapterlib/
format.rs

1use std::any::Any;
2use std::borrow::Cow;
3use std::cmp::max;
4use std::fmt::{Display, Error as FmtError, Formatter};
5use std::fs::File;
6use std::hash::Hasher;
7use std::io::{Error as IoError, Read};
8use std::ops::{Add, AddAssign, Range};
9use std::sync::Arc;
10
11use actix_web::HttpRequest;
12use anyhow::Result as AnyResult;
13use dbsp::operator::input::StagedBuffers;
14use erased_serde::Serialize as ErasedSerialize;
15use feldera_types::config::ConnectorConfig;
16use feldera_types::program_schema::Relation;
17use feldera_types::serde_with_context::FieldParseError;
18use serde::Serialize;
19use serde::de::StdError;
20
21use crate::ConnectorMetadata;
22use crate::catalog::{InputCollectionHandle, SerBatchReader};
23use crate::errors::controller::ControllerError;
24use crate::postprocess::Postprocessor;
25use crate::preprocess::Preprocessor;
26use crate::transport::{OutputBatchType, Step};
27
28/// Trait that represents a specific data format.
29///
30/// This is a factory trait that creates parsers for a specific data format.
31pub trait InputFormat: Send + Sync {
32    /// Unique name of the data format.
33    fn name(&self) -> Cow<'static, str>;
34
35    /// Extract parser configuration from an HTTP request.
36    ///
37    /// Returns the extracted configuration cast to the `ErasedSerialize` trait
38    /// object (to keep this trait object-safe).
39    ///
40    /// # Discussion
41    ///
42    /// We could rely on the `serde_urlencoded` crate to deserialize the config
43    /// from the HTTP request, which is what most implementations will do
44    /// internally; however allowing the implementation to override this
45    /// method enables additional flexibility. For example, an
46    /// implementation may use `Content-Type` and other request headers, set
47    /// HTTP-specific defaults for config fields, etc.
48    fn config_from_http_request(
49        &self,
50        endpoint_name: &str,
51        request: &HttpRequest,
52    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
53
54    /// Create a new parser for the format.
55    ///
56    /// # Arguments
57    ///
58    /// * `input_stream` - Input stream of the circuit to push parsed data to.
59    ///
60    /// * `config` - Format-specific configuration.
61    fn new_parser(
62        &self,
63        endpoint_name: &str,
64        input_stream: &InputCollectionHandle,
65        config: &serde_json::Value,
66    ) -> Result<Box<dyn Parser>, ControllerError>;
67}
68
69/// A collection of records associated with an input handle.
70///
71/// A [Parser] holds and adds records to an [InputBuffer].  The client, which is
72/// typically an [InputReader](crate::transport::InputReader), collects one or
73/// more [InputBuffer]s and pushes them to the circuit when the controller
74/// requests it.
75///
76/// # Pushing buffers into a circuit
77///
78/// There are two ways to push `InputBuffer`s into a circuit:
79///
80/// - With [InputBuffer::flush].  This immediately pushes the input buffer into
81///   the DBSP input handle.
82///
83/// - By passing buffers to [Parser::stage], which collects all of them into a
84///   [StagedBuffers].  Then, later, call [StagedBuffers::flush] to push the
85///   input buffers into the circuit.
86///
87/// Both approaches are equivalent in terms of correctness.  There can be a
88/// difference in performance, because [InputBuffer::flush] has a significant
89/// cost for a large number of records.  Using [StagedBuffers] has a similar
90/// cost, but it incurs it in the call to [Parser::stage] rather than in
91/// [StagedBuffers::flush].  This means that, if the input connector can buffer
92/// data ahead of the circuit's demand for it, the cost can be hidden and the
93/// circuit as a whole runs faster.
94pub trait InputBuffer: Any + Send {
95    /// Pushes all of the records into the circuit input handle, and discards
96    /// those records.
97    fn flush(&mut self);
98
99    /// Returns the number of buffered records and bytes.
100    fn len(&self) -> BufferSize;
101
102    /// Hashes the records in the input buffer into `hasher`, in order.  This is
103    /// used to ensure that input data remains the same for replay, so, for
104    /// equal data, it should remain the same from one program run to the next.
105    /// Data might be divided into `InputBuffer`s differently from one run to
106    /// the next, so the data fed into `hasher` should be the same if, for
107    /// example, records 0..10 then 10..20 are fed in one run and 0..5, 5..15,
108    /// 15..20 in another.
109    fn hash(&self, hasher: &mut dyn Hasher);
110
111    fn is_empty(&self) -> bool {
112        self.len().is_empty()
113    }
114
115    /// Removes the first `n` records from this input buffer and returns a new
116    /// [InputBuffer] that holds them. If fewer than `n` records are available,
117    /// returns all of them. May return `None` if this input buffer is empty (or
118    /// if `n` is 0).
119    ///
120    /// Some implementations can't implement `n` with full granularity. These
121    /// implementations might return more than `n` records.
122    ///
123    /// This is useful for extracting the records from one of several parser
124    /// threads to send to a single common thread to be pushed later.
125    ///
126    /// # Byte accounting
127    ///
128    /// This function must not increase or decrease the total number of bytes.
129    /// That is, if the returned buffer is named `head`, `self.len().bytes`
130    /// before the call must equal `self.len().bytes + head.len().bytes`
131    /// following the call.  Violating this invariant will cause the number of
132    /// buffered bytes reported by a pipeline never to fall to zero (or to wrap
133    /// around to `u64::MAX`).
134    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>>;
135
136    fn take_all(&mut self) -> Option<Box<dyn InputBuffer>> {
137        self.take_some(usize::MAX)
138    }
139}
140
/// The size of an [InputBuffer], counted in records and in bytes.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct BufferSize {
    /// Exact number of records in the buffer.
    pub records: usize,

    /// Number of bytes attributed to the buffer.
    ///
    /// This is an approximation: it is usually inexact after a buffer has
    /// been split with [InputBuffer::take_some].
    pub bytes: usize,
}

impl BufferSize {
    /// Returns the size of a buffer that holds nothing.
    pub fn empty() -> Self {
        BufferSize {
            records: 0,
            bytes: 0,
        }
    }

    /// Returns true if both the record count and the byte count are zero.
    pub fn is_empty(&self) -> bool {
        matches!(
            *self,
            BufferSize {
                records: 0,
                bytes: 0
            }
        )
    }
}

impl Add for BufferSize {
    type Output = Self;

    /// Adds the record and byte counts field by field.
    fn add(mut self, rhs: Self) -> Self::Output {
        self += rhs;
        self
    }
}

impl AddAssign for BufferSize {
    /// Accumulates `rhs` into `self`, field by field.
    fn add_assign(&mut self, rhs: Self) {
        let BufferSize { records, bytes } = rhs;
        self.records += records;
        self.bytes += bytes;
    }
}
183
184impl InputBuffer for Option<Box<dyn InputBuffer>> {
185    fn len(&self) -> BufferSize {
186        self.as_ref()
187            .map_or(BufferSize::empty(), |buffer| buffer.len())
188    }
189
190    fn hash(&self, hasher: &mut dyn Hasher) {
191        if let Some(buffer) = self {
192            buffer.hash(hasher)
193        }
194    }
195
196    fn flush(&mut self) {
197        if let Some(buffer) = self.as_mut() {
198            buffer.flush()
199        }
200    }
201
202    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
203        self.as_mut().and_then(|buffer| buffer.take_some(n))
204    }
205}
206
207impl InputBuffer for Box<dyn InputBuffer> {
208    fn len(&self) -> BufferSize {
209        self.as_ref().len()
210    }
211
212    fn hash(&self, hasher: &mut dyn Hasher) {
213        self.as_ref().hash(hasher)
214    }
215
216    fn flush(&mut self) {
217        self.as_mut().flush()
218    }
219
220    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
221        self.as_mut().take_some(n)
222    }
223}
224
225impl InputBuffer for Vec<Box<dyn InputBuffer>> {
226    fn flush(&mut self) {
227        for v in self.iter_mut() {
228            v.flush();
229        }
230    }
231
232    fn len(&self) -> BufferSize {
233        let mut size = BufferSize::empty();
234        for v in self.iter() {
235            size += v.len();
236        }
237        size
238    }
239
240    fn hash(&self, hasher: &mut dyn Hasher) {
241        for v in self.iter() {
242            v.hash(hasher);
243        }
244    }
245
246    fn take_some(&mut self, n: usize) -> Option<Box<dyn InputBuffer>> {
247        let mut result = Vec::new();
248        let mut remaining = n;
249        // Index of first buffer that should be preserved
250        let mut index = 0;
251        for v in self.iter_mut() {
252            if remaining == 0 {
253                break;
254            }
255            let buf = v.take_some(remaining);
256            if let Some(ib) = buf {
257                let len = ib.len().records;
258                if remaining >= len {
259                    // This buffer will be completely used
260                    index += 1;
261                }
262                remaining = remaining.saturating_sub(len);
263                result.push(ib);
264            }
265        }
266        self.drain(0..index);
267        if result.is_empty() {
268            None
269        } else {
270            Some(Box::new(result))
271        }
272    }
273}
274
275/// If any of the InputBuffer elements is a Vec itself, flatten it recursively.
276/// Return the concatenated input buffers all downcast to T.
277pub fn flatten_nested<T>(buffers: Vec<Box<dyn InputBuffer>>) -> Vec<Box<T>>
278where
279    T: Any,
280{
281    fn inner<T>(input: Vec<Box<dyn InputBuffer>>, output: &mut Vec<Box<T>>)
282    where
283        T: Any,
284    {
285        for buffer in input {
286            let any = buffer as Box<dyn Any>;
287            match any.downcast::<Vec<Box<dyn InputBuffer>>>() {
288                Ok(vec) => inner(*vec, output),
289                Err(any) => output.push(any.downcast().unwrap()),
290            }
291        }
292    }
293
294    let mut output = Vec::new();
295    inner(buffers, &mut output);
296    output
297}
298
299/// A wrapper around a [StagedBuffers] that implements the [InputBuffer] trait.
300///
301/// The `StagedBuffers` trait is similar to `InputBuffer` in that it supports flushing
302/// a set of tuples to the circuit. Unlike `InputBuffer`, it doesn't support returning
303/// its size as a `BufferSize`. It also doesn't support hashing and taking a subset of
304/// records.
305///
306/// This wrapper adds the former by storing `BufferSize` collected from `InputBuffer`s
307/// used to create the `StagedBuffers`.
308///
309/// `hash()` and `take_some()` methods are unimplemented. Therefore this wrapper can only be
310/// safely used in contexts where these methods are not needed.
311///
312// FIXME: It would be better to encode the unimplemented functionality in the type system,
313// e.g., as an extension trait.
314pub struct StagedInputBuffer {
315    buffer: Box<dyn StagedBuffers>,
316    size: BufferSize,
317}
318
319impl StagedInputBuffer {
320    pub fn new(buffer: Box<dyn StagedBuffers>, size: BufferSize) -> Self {
321        Self { buffer, size }
322    }
323}
324
325impl InputBuffer for StagedInputBuffer {
326    fn flush(&mut self) {
327        self.buffer.flush()
328    }
329
330    fn len(&self) -> BufferSize {
331        self.size
332    }
333
334    fn hash(&self, _hasher: &mut dyn Hasher) {
335        unimplemented!()
336    }
337
338    fn take_some(&mut self, _n: usize) -> Option<Box<dyn InputBuffer>> {
339        unimplemented!()
340    }
341}
342
343/// Parses raw bytes into database records.
344pub trait Parser: Send + Sync {
345    /// Parses `data` into records and returns the records and any parse errors
346    /// that occurred.
347    ///
348    /// XXX it would be even better if this were `&self` and avoided keeping
349    /// state entirely.
350    fn parse(
351        &mut self,
352        data: &[u8],
353        metadata: Option<ConnectorMetadata>,
354    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>);
355
356    /// Stages all of the `buffers`, which must have been obtained from this
357    /// [Parser] or one forked from this one, into a [StagedBuffers] that may
358    /// later be used to push the collected data into the circuit.  See
359    /// [StagedBuffers] for more information.
360    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers>;
361
362    /// Returns an object that can be used to break a stream of incoming data
363    /// into complete records to pass to [Parser::parse].
364    fn splitter(&self) -> Box<dyn Splitter>;
365
366    /// Create a new parser with the same configuration as `self`.
367    ///
368    /// Used by multithreaded transport endpoints to create multiple parallel
369    /// input pipelines.
370    fn fork(&self) -> Box<dyn Parser>;
371}
372
373/// A parser with preprocessing for a streaming preprocessor
374pub struct StreamingPreprocessedParser {
375    preprocessor: Box<dyn Preprocessor>,
376    stream_splitter: StreamSplitter,
377    parser: Box<dyn Parser>,
378}
379
380impl StreamingPreprocessedParser {
381    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
382        Self {
383            preprocessor,
384            stream_splitter: StreamSplitter::new(parser.splitter()),
385            parser,
386        }
387    }
388}
389
390impl Parser for StreamingPreprocessedParser {
391    fn parse(
392        &mut self,
393        data: &[u8],
394        metadata: Option<ConnectorMetadata>,
395    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
396        let (pre_data, mut pre_errors) = self.preprocessor.process(data);
397        self.stream_splitter.append(&pre_data);
398        let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
399        while let Some(chunk) = self.stream_splitter.next(true) {
400            let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
401            pre_errors.append(&mut parse_errors);
402            if let Some(data) = parsed_data {
403                parsed.push(data);
404            }
405        }
406        if parsed.is_empty() {
407            (None, pre_errors)
408        } else {
409            (Some(Box::new(parsed)), pre_errors)
410        }
411    }
412
413    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
414        self.parser.stage(buffers)
415    }
416
417    fn splitter(&self) -> Box<dyn Splitter> {
418        let pre_splitter = self.preprocessor.splitter();
419        if let Some(splitter) = pre_splitter {
420            return splitter;
421        }
422        self.parser.splitter()
423    }
424
425    fn fork(&self) -> Box<dyn Parser> {
426        Box::new(StreamingPreprocessedParser::new(
427            self.preprocessor.fork(),
428            self.parser.fork(),
429        ))
430    }
431}
432
433/// A parser with preprocessing for a message-oriented preprocessor
434pub struct MessageOrientedPreprocessedParser {
435    preprocessor: Box<dyn Preprocessor>,
436    parser: Box<dyn Parser>,
437}
438
439impl MessageOrientedPreprocessedParser {
440    pub fn new(preprocessor: Box<dyn Preprocessor>, parser: Box<dyn Parser>) -> Self {
441        Self {
442            preprocessor,
443            parser,
444        }
445    }
446}
447
448impl Parser for MessageOrientedPreprocessedParser {
449    fn parse(
450        &mut self,
451        data: &[u8],
452        metadata: Option<ConnectorMetadata>,
453    ) -> (Option<Box<dyn InputBuffer>>, Vec<ParseError>) {
454        let (pre_data, mut pre_errors) = self.preprocessor.process(data);
455        let mut parser_splitter = self.parser.splitter();
456        let mut parsed: Vec<Box<dyn InputBuffer>> = Vec::new();
457        let mut remaining = pre_data.as_slice();
458        // Use the parser to divide the message received from the preprocessor into chunks and parse each of them
459        while !remaining.is_empty() {
460            let chunk;
461            let split_offset = parser_splitter.input(remaining).unwrap_or(remaining.len());
462            (chunk, remaining) = remaining.split_at(split_offset);
463            let (parsed_data, mut parse_errors) = self.parser.parse(chunk, metadata.clone());
464            pre_errors.append(&mut parse_errors);
465            if let Some(data) = parsed_data {
466                parsed.push(data);
467            }
468        }
469        if parsed.is_empty() {
470            (None, pre_errors)
471        } else {
472            (Some(Box::new(parsed)), pre_errors)
473        }
474    }
475
476    fn stage(&self, buffers: Vec<Box<dyn InputBuffer>>) -> Box<dyn StagedBuffers> {
477        self.parser.stage(buffers)
478    }
479
480    fn splitter(&self) -> Box<dyn Splitter> {
481        let pre_splitter = self.preprocessor.splitter();
482        if let Some(splitter) = pre_splitter {
483            return splitter;
484        }
485        self.parser.splitter()
486    }
487
488    fn fork(&self) -> Box<dyn Parser> {
489        Box::new(MessageOrientedPreprocessedParser::new(
490            self.preprocessor.fork(),
491            self.parser.fork(),
492        ))
493    }
494}
495
/// Finds the boundaries between records in a stream of bytes.
///
/// [Parser::parse] and [Preprocessor::process] accept only complete records.
/// A transport that reads a raw byte stream therefore uses a format-specific
/// [Splitter] to decide where to cut it.
pub trait Splitter: Send + Sync {
    /// Searches `data` for a record boundary.
    ///
    /// - Returns `Some(n)` when the first `n` bytes of `data`, together with
    ///   any earlier input for which `None` was returned, make up one or more
    ///   complete records.  If `n < data.len()`, the caller should present
    ///   `data[n..]` again for further splitting.
    ///
    /// - Returns `None` when `data` does not necessarily complete a record.
    fn input(&mut self, data: &[u8]) -> Option<usize>;

    /// Discards all internal state so that the splitter can start on new
    /// data.
    fn clear(&mut self);
}
516
517/// Helper for breaking a stream of data into groups of records using a
518/// [Splitter].
519///
520/// A [Splitter] finds breakpoints between records given data presented to
521/// it. This is a higher-level data structure that takes input data and breaks
522/// it into chunks.
523pub struct StreamSplitter {
524    buffer: Vec<u8>,
525    start: u64,
526    fragment: Range<usize>,
527    fed: usize,
528    splitter: Box<dyn Splitter>,
529}
530
531impl StreamSplitter {
532    /// Returns a new stream splitter that finds breakpoints with `splitter`.
533    pub fn new(splitter: Box<dyn Splitter>) -> Self {
534        Self {
535            buffer: Vec::new(),
536            start: 0,
537            fragment: 0..0,
538            fed: 0,
539            splitter,
540        }
541    }
542
543    /// Returns the next full chunk of input, if any.  `eoi` specifies whether
544    /// the input stream is complete. If `eoi` is true and this function returns
545    /// `None`, then there are no more chunks.
546    pub fn next(&mut self, eoi: bool) -> Option<&[u8]> {
547        match self
548            .splitter
549            .input(&self.buffer[self.fed..self.fragment.end])
550        {
551            Some(n) => {
552                let chunk = &self.buffer[self.fragment.start..self.fed + n];
553                self.fed += n;
554                self.fragment.start = self.fed;
555                Some(chunk)
556            }
557            None => {
558                self.fed = self.fragment.end;
559                if eoi && !self.fragment.is_empty() {
560                    let chunk = &self.buffer[self.fragment.clone()];
561                    self.fragment.start = self.fragment.end;
562                    Some(chunk)
563                } else {
564                    None
565                }
566            }
567        }
568    }
569
570    /// Appends `data` to the data to be broken into chunks.
571    pub fn append(&mut self, data: &[u8]) {
572        let final_len = self.fragment.len() + data.len();
573        if final_len > self.buffer.len() {
574            self.buffer.reserve(final_len - self.buffer.len());
575        }
576        self.buffer.copy_within(self.fragment.clone(), 0);
577        self.buffer.resize(self.fragment.len(), 0);
578        self.buffer.extend(data);
579        self.fed -= self.fragment.start;
580        self.start += self.fragment.start as u64;
581        self.fragment = 0..self.buffer.len();
582    }
583
584    // Reads no more than `limit` bytes of data from `file` into the splitter,
585    // with an initial minimum buffer size of `buffer_size`. Returns the number
586    // of bytes read or an I/O error.
587    pub fn read(
588        &mut self,
589        file: &mut File,
590        buffer_size: usize,
591        limit: usize,
592    ) -> Result<usize, IoError> {
593        // Move data to beginning of buffer.
594        if self.fragment.start != 0 {
595            self.buffer.copy_within(self.fragment.clone(), 0);
596            self.fed -= self.fragment.start;
597            self.start += self.fragment.start as u64;
598            self.fragment = 0..self.fragment.len();
599        }
600
601        // Make sure there's some space to read data.
602        if self.fragment.len() == self.buffer.len() {
603            self.buffer
604                .resize(max(buffer_size, self.buffer.capacity() * 2), 0);
605        }
606
607        // Read data.
608        let mut space = &mut self.buffer[self.fragment.len()..];
609        if space.len() > limit {
610            space = &mut space[..limit];
611        }
612        let result = file.read(space);
613        if let Ok(n) = result {
614            self.fragment.end += n;
615        }
616        result
617    }
618
619    /// Returns the logical stream position of the next byte to be returned by
620    /// the splitter.
621    pub fn position(&self) -> u64 {
622        self.start + self.fragment.start as u64
623    }
624
625    /// Sets the logical stream position of the next byte to be returned by
626    /// the splitter to `offset`, and discards other state.
627    pub fn seek(&mut self, offset: u64) {
628        self.start = offset;
629        self.fragment = 0..0;
630        self.fed = 0;
631        self.splitter.clear();
632    }
633
634    /// Resets the splitter's state as if it were newly created.
635    pub fn reset(&mut self) {
636        self.seek(0);
637    }
638}
639
640pub trait OutputFormat: Send + Sync {
641    /// Unique name of the data format.
642    fn name(&self) -> Cow<'static, str>;
643
644    /// Extract encoder configuration from an HTTP request.
645    ///
646    /// Returns the extracted configuration cast to the `ErasedSerialize` trait
647    /// object (to keep this trait object-safe).
648    fn config_from_http_request(
649        &self,
650        endpoint_name: &str,
651        request: &HttpRequest,
652    ) -> Result<Box<dyn ErasedSerialize>, ControllerError>;
653
654    /// Create a new encoder for the format.
655    ///
656    /// # Arguments
657    ///
658    /// * `config` - Format-specific configuration.
659    /// * `key_schema` - Schema of the keys in the stream; only set for indexed Z-sets.
660    /// * `value_schema` - Schema of the values in the stream. If the stream is an indexed Z-set,
661    ///   this is the schema of the values in the stream; if it is a Z-set, this is the schema of the keys in the stream.
662    /// * `consumer` - Consumer to send encoded data batches to.
663    /// * `is_index` - Whether the connector is configured with the `index` property.
664    ///
665    /// `is_index` implies that `key_schema` is set. The inverse is not true: the stream may be an indexed Z-set; but
666    /// the connector is not configured with the `index` property. The connector must iterate over the values in the
667    /// stream either directly or using `SerCursorFlattened`.
668    fn new_encoder(
669        &self,
670        endpoint_name: &str,
671        config: &ConnectorConfig,
672        key_schema: &Option<Relation>,
673        value_schema: &Relation,
674        consumer: Box<dyn OutputConsumer>,
675        is_index: bool,
676    ) -> Result<Box<dyn Encoder>, ControllerError>;
677}
678
679pub trait Encoder: Send {
680    /// Returns a reference to the consumer that the encoder is connected to.
681    fn consumer(&mut self) -> &mut dyn OutputConsumer;
682
683    /// Encode a batch of updates and push the results to the consumer.
684    ///
685    /// The encoder calls [`OutputConsumer::batch_start`] before emitting any
686    /// data, then delivers encoded records via [`OutputConsumer::push_buffer`]
687    /// or [`OutputConsumer::push_key`], and finally calls
688    /// [`OutputConsumer::batch_end`].
689    ///
690    /// Which of `push_buffer` or `push_key` is used depends on the kind of encoder used.
691    fn encode(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()>;
692}
693
694#[doc(hidden)]
695pub trait OutputConsumer: Send {
696    /// Maximum buffer size that this transport can transmit.
697    /// The encoder should not generate buffers exceeding this size.
698    fn max_buffer_size_bytes(&self) -> usize;
699
700    fn batch_start(&mut self, step: Step, batch_type: OutputBatchType);
701
702    /// See OutputEndpoint::push_buffer.
703    fn push_buffer(&mut self, buffer: &[u8], num_records: usize);
704
705    /// See OutputEndpoint::push_key.
706    fn push_key(
707        &mut self,
708        key: Option<&[u8]>,
709        val: Option<&[u8]>,
710        headers: &[(&str, Option<&[u8]>)],
711        num_records: usize,
712    );
713    fn batch_end(&mut self);
714
715    /// Returns the approximate amount of memory used by the connector's
716    /// underlying implementation.  For the Kafka connectors, for example, this
717    /// is the amount of memory used by librdkafka.  Not all connectors use a
718    /// substantial amount of memory, so the default implementation returns 0.
719    fn memory(&self) -> usize {
720        0
721    }
722}
723
724/// Callback invoked when a [`Postprocessor`] returns an error.
725///
726/// The argument is the error returned by the postprocessor.  The callback
727/// is responsible for reporting or logging the error; the record that
728/// triggered the error is dropped and processing continues.
729pub type PostprocessorErrorCallback = Box<dyn Fn(anyhow::Error) + Send + Sync>;
730
731/// Bridges a [`Postprocessor`] into the [`OutputConsumer`] interface.
732///
733/// Each [`OutputConsumer`] method calls the corresponding [`Postprocessor`]
734/// method, then forwards its return value to the inner consumer.
735/// When the postprocessor returns an error, `error_cb` is invoked and the
736/// affected record is dropped.
737pub struct PostprocessedConsumer {
738    inner: Box<dyn OutputConsumer>,
739    postprocessor: Box<dyn Postprocessor>,
740    error_cb: PostprocessorErrorCallback,
741}
742
743impl PostprocessedConsumer {
744    pub fn new(
745        inner: Box<dyn OutputConsumer>,
746        postprocessor: Box<dyn Postprocessor>,
747        error_cb: PostprocessorErrorCallback,
748    ) -> Self {
749        Self {
750            inner,
751            postprocessor,
752            error_cb,
753        }
754    }
755}
756
757impl OutputConsumer for PostprocessedConsumer {
758    fn max_buffer_size_bytes(&self) -> usize {
759        self.inner.max_buffer_size_bytes()
760    }
761
762    fn batch_start(&mut self, step: Step, batch_type: OutputBatchType) {
763        self.postprocessor.batch_start(step, batch_type);
764        self.inner.batch_start(step, batch_type);
765    }
766
767    fn push_buffer(&mut self, buffer: &[u8], num_records: usize) {
768        match self.postprocessor.push_buffer(buffer) {
769            Ok(transformed) => self.inner.push_buffer(&transformed, num_records),
770            Err(e) => (self.error_cb)(e),
771        }
772    }
773
774    fn push_key(
775        &mut self,
776        key: Option<&[u8]>,
777        val: Option<&[u8]>,
778        headers: &[(&str, Option<&[u8]>)],
779        num_records: usize,
780    ) {
781        match self.postprocessor.push_key(key, val, headers) {
782            Ok((k, v, h)) => {
783                let h_refs: Vec<(&str, Option<&[u8]>)> =
784                    h.iter().map(|(k, v)| (k.as_str(), v.as_deref())).collect();
785                self.inner
786                    .push_key(k.as_deref(), v.as_deref(), &h_refs, num_records);
787            }
788            Err(e) => (self.error_cb)(e),
789        }
790    }
791
792    fn batch_end(&mut self) {
793        self.postprocessor.batch_end();
794        self.inner.batch_end();
795    }
796
797    fn memory(&self) -> usize {
798        self.inner.memory() + self.postprocessor.memory()
799    }
800}
801
/// Largest record weight that a format without explicit weights can output.
///
/// Such formats express a weight `w` by repeating the record `w` times,
/// which is expensive for large weights and most likely not what the user
/// intends.
pub const MAX_DUPLICATES: i64 = 1_000 * 1_000;

/// Maximum length, in bytes, of a JSON record quoted in an error message;
/// longer records are truncated to this length.
pub const MAX_RECORD_LEN_IN_ERRMSG: usize = 4 * 1024;
812
813/// Error parsing input data.
814#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
815#[serde(transparent)]
816// Box the internals of `ParseError` to avoid
817// "Error variant to large" clippy warnings".
818pub struct ParseError(Box<ParseErrorInner>);
819impl Display for ParseError {
820    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
821        self.0.fmt(f)
822    }
823}
824
825impl StdError for ParseError {}
826
827impl ParseError {
828    pub fn new(
829        description: String,
830        event_number: Option<u64>,
831        field: Option<String>,
832        invalid_text: Option<&str>,
833        invalid_bytes: Option<&[u8]>,
834        suggestion: Option<Cow<'static, str>>,
835        error_tag: Option<String>,
836    ) -> Self {
837        Self(Box::new(ParseErrorInner::new(
838            description,
839            event_number,
840            field,
841            invalid_text,
842            invalid_bytes,
843            suggestion,
844            error_tag,
845        )))
846    }
847
848    pub fn text_event_error<E>(
849        msg: &str,
850        error: E,
851        event_number: u64,
852        invalid_text: Option<&str>,
853        suggestion: Option<Cow<'static, str>>,
854    ) -> Self
855    where
856        E: ToString,
857    {
858        Self(Box::new(ParseErrorInner::text_event_error(
859            msg,
860            error,
861            event_number,
862            invalid_text,
863            suggestion,
864        )))
865    }
866
867    pub fn text_envelope_error(
868        description: String,
869        invalid_text: &str,
870        suggestion: Option<Cow<'static, str>>,
871    ) -> Self {
872        Self(Box::new(ParseErrorInner::text_envelope_error(
873            description,
874            invalid_text,
875            suggestion,
876        )))
877    }
878
879    pub fn bin_event_error(
880        description: String,
881        event_number: u64,
882        invalid_bytes: &[u8],
883        suggestion: Option<Cow<'static, str>>,
884    ) -> Self {
885        Self(Box::new(ParseErrorInner::bin_event_error(
886            description,
887            event_number,
888            invalid_bytes,
889            suggestion,
890        )))
891    }
892
893    pub fn bin_envelope_error(
894        description: String,
895        invalid_bytes: &[u8],
896        suggestion: Option<Cow<'static, str>>,
897    ) -> Self {
898        Self(Box::new(ParseErrorInner::bin_envelope_error(
899            description,
900            invalid_bytes,
901            suggestion,
902        )))
903    }
904
905    /// Returns a new `ParseError` with the description modified by `f`.
906    ///
907    /// Can be used, e.g., to prepend context to the description.
908    pub fn map_description<F>(self, f: F) -> Self
909    where
910        F: FnOnce(&str) -> String,
911    {
912        let mut inner = self.0;
913        let description = f(&inner.description);
914        inner.description = description;
915        Self(inner)
916    }
917
918    pub fn get_error_tag(&self) -> Option<String> {
919        self.0.get_error_tag()
920    }
921}
922
/// Details of a parse error: what went wrong, where in the input stream it
/// happened, and (when available) the offending input fragment.
///
/// Stored boxed inside `ParseError`, which keeps the public error type
/// pointer-sized.
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub struct ParseErrorInner {
    /// Error description.
    description: String,

    /// Event number relative to the start of the stream.
    ///
    /// An input stream is a series of data change events (row insertions,
    /// deletions, and updates).  This field specifies the index (starting
    /// from 1) of the event that caused the error, relative to the start of
    /// the stream.  In some cases this index cannot be identified, e.g., if
    /// the error makes an entire block of events unparseable.
    event_number: Option<u64>,

    /// Field that failed to parse.
    ///
    /// Only set when the parsing error can be attributed to a
    /// specific field.
    field: Option<String>,

    /// Invalid fragment of input data.
    ///
    /// Used for binary data formats and for text-based formats when the input
    /// is not a valid UTF-8 string.
    invalid_bytes: Option<Vec<u8>>,

    /// Invalid fragment of the input text.
    ///
    /// Only used for text-based formats and in cases when input is valid UTF-8.
    invalid_text: Option<String>,

    /// Any additional information that may help fix the problem, e.g., an
    /// example of valid input.
    suggestion: Option<Cow<'static, str>>,

    /// Tag identifying the kind of parse error (the constructors set values
    /// such as `"text_event_err"` or `"bin_envelope_err"`); used for rate
    /// limiting.
    tag: Option<String>,
}
961
962impl Display for ParseErrorInner {
963    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
964        let event = if let Some(event_number) = self.event_number {
965            format!(" (event #{})", event_number)
966        } else {
967            String::new()
968        };
969
970        let invalid_fragment = if let Some(invalid_bytes) = &self.invalid_bytes {
971            format!("\nInvalid bytes: {invalid_bytes:?}")
972        } else if let Some(invalid_text) = &self.invalid_text {
973            format!("\nInvalid fragment: '{invalid_text}'")
974        } else {
975            String::new()
976        };
977
978        let suggestion = if let Some(suggestion) = &self.suggestion {
979            format!("\n{suggestion}")
980        } else {
981            String::new()
982        };
983
984        write!(
985            f,
986            "Parse error{event}: {}{invalid_fragment}{suggestion}",
987            self.description
988        )
989    }
990}
991
992impl ParseErrorInner {
993    pub fn new(
994        description: String,
995        event_number: Option<u64>,
996        field: Option<String>,
997        invalid_text: Option<&str>,
998        invalid_bytes: Option<&[u8]>,
999        suggestion: Option<Cow<'static, str>>,
1000        error_tag: Option<String>,
1001    ) -> Self {
1002        Self {
1003            description,
1004            event_number,
1005            field,
1006            invalid_text: invalid_text.map(str::to_string),
1007            invalid_bytes: invalid_bytes.map(ToOwned::to_owned),
1008            suggestion,
1009            tag: error_tag,
1010        }
1011    }
1012
1013    /// Error parsing an individual event in a text-based input format (e.g.,
1014    /// JSON, CSV).
1015    pub fn text_event_error<E>(
1016        msg: &str,
1017        error: E,
1018        event_number: u64,
1019        invalid_text: Option<&str>,
1020        suggestion: Option<Cow<'static, str>>,
1021    ) -> Self
1022    where
1023        E: ToString,
1024    {
1025        let err_str = error.to_string();
1026        // Try to parse the error as `FieldParseError`.  If this is not a field-specific
1027        // error or the error was not returned by the `deserialize_table_record`
1028        // macro, this will fail and we'll store the error as is.
1029        let (descr, field) = if let Some(offset) = err_str.find("{\"field\":") {
1030            if let Some(Ok(err)) = serde_json::Deserializer::from_str(&err_str[offset..])
1031                .into_iter::<FieldParseError>()
1032                .next()
1033            {
1034                (err.description, Some(err.field))
1035            } else {
1036                (err_str, None)
1037            }
1038        } else {
1039            (err_str, None)
1040        };
1041        let column_name = if let Some(field) = &field {
1042            format!(": error parsing field '{field}'")
1043        } else {
1044            String::new()
1045        };
1046
1047        Self::new(
1048            format!("{msg}{column_name}: {descr}",),
1049            Some(event_number),
1050            field,
1051            invalid_text,
1052            None,
1053            suggestion,
1054            Some("text_event_err".to_string()),
1055        )
1056    }
1057
1058    /// Error parsing a container, e.g., a JSON array, with multiple events.
1059    ///
1060    /// Such errors cannot be attributed to an individual event numbers.
1061    pub fn text_envelope_error(
1062        description: String,
1063        invalid_text: &str,
1064        suggestion: Option<Cow<'static, str>>,
1065    ) -> Self {
1066        Self::new(
1067            description,
1068            None,
1069            None,
1070            Some(invalid_text),
1071            None,
1072            suggestion,
1073            Some("text_envelope_err".to_string()),
1074        )
1075    }
1076
1077    /// Error parsing an individual event in a binary input format (e.g.,
1078    /// bincode).
1079    pub fn bin_event_error(
1080        description: String,
1081        event_number: u64,
1082        invalid_bytes: &[u8],
1083        suggestion: Option<Cow<'static, str>>,
1084    ) -> Self {
1085        Self::new(
1086            description,
1087            Some(event_number),
1088            None,
1089            None,
1090            Some(invalid_bytes),
1091            suggestion,
1092            Some("bin_event_err".to_string()),
1093        )
1094    }
1095
1096    /// Error parsing a container with multiple events.
1097    ///
1098    /// Such errors cannot be attributed to an individual event numbers.
1099    pub fn bin_envelope_error(
1100        description: String,
1101        invalid_bytes: &[u8],
1102        suggestion: Option<Cow<'static, str>>,
1103    ) -> Self {
1104        Self::new(
1105            description,
1106            None,
1107            None,
1108            None,
1109            Some(invalid_bytes),
1110            suggestion,
1111            Some("bin_envelope_err".to_string()),
1112        )
1113    }
1114
1115    pub fn get_error_tag(&self) -> Option<String> {
1116        self.tag.clone()
1117    }
1118}