// feldera_adapterlib: transport.rs

1use anyhow::{Error as AnyError, Result as AnyResult};
2use chrono::{DateTime, Utc};
3use dyn_clone::DynClone;
4use feldera_types::adapter_stats::ConnectorHealth;
5use feldera_types::config::FtModel;
6use feldera_types::coordination::Completion;
7use feldera_types::program_schema::Relation;
8use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value as JsonValue;
12use std::collections::VecDeque;
13use std::fmt::Display;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::mpsc::UnboundedReceiver;
18use tokio::sync::mpsc::error::TryRecvError;
19use xxhash_rust::xxh3::Xxh3Default;
20
21use crate::PipelineState;
22use crate::catalog::InputCollectionHandle;
23use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
24use crate::metrics::ConnectorMetrics;
25
/// Step number for fault-tolerant circuits.
///
/// The step number increases by 1 each time the circuit runs; that is, it
/// tracks the global clock for the outermost circuit.  The first step is
/// numbered zero.
///
/// A [fault-tolerant](crate#fault-tolerance) output transport divides output
/// into steps numbered sequentially.  If a given step is written multiple
/// times, the endpoint must discard the later writes (only the first write of
/// each step counts).
pub type Step = u64;
31///
32/// A [fault-tolerant](crate#fault-tolerance) output transport divides output
33/// into steps numbered sequentially.  If a given step is written multiple
34/// times, the endpoint must discard the later writes.
35pub type Step = u64;
36
37/// A configured input endpoint.
38pub trait InputEndpoint: Send {
39    /// This endpoint's level of fault tolerance, if any:
40    ///
41    /// - An endpoint that returns `None` does not support suspend and resume or
42    ///   any kind of fault tolerance and has no further constraints.
43    ///
44    /// - An endpoint that returns `Some(FtModel::AtLeastOnce)` can support
45    ///   suspend and resume and at-least-once fault tolerance.  Such an
46    ///   endpoint must pass `Some(Resume::*)` to [InputConsumer::extended] for
47    ///   at least some steps (see [Resume] for details).
48    ///
49    /// - An endpoint that returns `Some(FtModel::ExactlyOnce)` can support
50    ///   suspend and resume, at-least-once fault tolerance, and exactly once
51    ///   fault tolerance.  Such an endpoint must pass `Some(Resume::Replay
52    ///   {..})` to [InputConsumer::extended] for every step (see [Resume] for
53    ///   details).
54    fn fault_tolerance(&self) -> Option<FtModel>;
55}
56
57pub trait TransportInputEndpoint: InputEndpoint {
58    /// Creates a new input endpoint. The endpoint should use `parser` to parse
59    /// data into records. Returns an [`InputReader`] for reading the endpoint's
60    /// data.  The endpoint will use `consumer` to report its progress.
61    ///
62    /// The `resume_info` parameter is used when resuming the pipeline from a
63    /// checkpoint. It contains resume metadata that the endpoint returned via the
64    /// [`InputConsumer::extended`] function before suspending. When specified,
65    /// it tells a fault-tolerant input reader to seek past the data already read
66    /// in the step whose metadata is given by the value.
67    ///
68    /// The reader is initially paused.
69    fn open(
70        &self,
71        consumer: Box<dyn InputConsumer>,
72        parser: Box<dyn Parser>,
73        schema: Relation,
74        resume_info: Option<JsonValue>,
75    ) -> AnyResult<Box<dyn InputReader>>;
76}
77
78#[doc(hidden)]
79pub trait IntegratedInputEndpoint: InputEndpoint {
80    fn open(
81        self: Box<Self>,
82        input_handle: &InputCollectionHandle,
83        resume_info: Option<JsonValue>,
84    ) -> AnyResult<Box<dyn InputReader>>;
85}
86
87/// Commands for an [InputReader] to execute.
88///
89/// # Transitions
90///
91/// The following diagram shows the possible order in which the controller can
92/// issue commands to [InputReader]s:
93///
94/// ```text
95///   ┌─⯇─ (start) ─⯈──┐
96///   │      │         │
97///   │      │  ┌───┐  │
98///   │      ▼  ▼   │  │
99///   ├─⯇─ Replay ──┘  │
100///   │      │         │
101///   │      ▼         │
102///   ├─⯇─ Extend⯇─────┤
103///   │      │         │
104///   │      │ ┌───┐   │
105///   │      ▼ ▼   │   │
106///   ├─⯇─ Queue ──┘   │
107///   │      │         │
108///   │      ▼         │
109///   ├─⯇─ Pause ─⯈────┘
110///   │      │
111///   │      ▼
112///   └───⯈Disconnect
113///          │
114///          ▼
115///        (end)
116/// ```
117///
118/// # Stalls
119///
120/// When the controller issues a [InputReaderCommand::Replay] or
121/// [InputReaderCommand::Queue] command to an input adapter, it waits for the
122/// input adapter to respond to them.  Until it receives a reply, the next step
123/// cannot proceed. An input adapter that does not respond to one of these
124/// commands will stall the entire pipeline.  However, the controller also uses
125/// [InputReader::is_closed] to detect that an input adapter has died due to an
126/// error or reaching end-of-input, so input adapters for which it is difficult
127/// to handle errors gracefully can report that they have died using
128/// `is_closed`, if necessary, as described in more detail below.
129///
130/// ## End-of-input handling
131///
132/// If an input adapter reaches the end of its input, and it isn't implemented
133/// to wait for and pass along further input, then it should:
134///
135/// - Make sure that it has already indicated that it has buffered all of its
136///   data, via [InputConsumer::buffered].
137///
138/// - Call [InputConsumer::eoi] to indicate that it has reached end of input.
139///
140/// - Respond to [InputReaderCommand::Queue] until it has queued all of its
141///   input and has none left.
142///
143/// - Optionally, at this point, it may exit and start returning `true` from
144///   `InputReader::is_closed`.
145///
146/// ## Error handling
147///
148/// If an input adapter encounters a fatal error that keeps it from continuing
149/// to obtain input, then it should report the error via [InputConsumer::error]
150/// with `true` for `fatal`.  Afterward, it may exit and start returning `true`
151/// from `InputReader::is_closed`.
152///
153/// ## Additional requirement
154///
155/// An input adapter should ensure that, if it flushes any records to the
156/// circuit in response to [InputReaderCommand::Replay] or
157/// [InputReaderCommand::Queue], then it finishes up and responds to the
158/// consumer using [InputConsumer::replayed] or [InputConsumer::extended],
159/// respectively.  If it instead dies mid-way, then the controller will not
160/// record the step properly and fault tolerance replay will be incorrect.
161#[derive(Debug)]
162pub enum InputReaderCommand {
163    /// Tells the input reader to replay the step described by `metadata` and
164    /// `data` by reading and flushing buffers for the data in the step, and
165    /// then [InputConsumer::replayed] to signal completion.
166    ///
167    /// The input reader should report the data that it queues to
168    /// [InputConsumer::buffered] as it does the replay.
169    ///
170    /// The input reader doesn't have to process other commands while it does
171    /// the replay.
172    ///
173    /// # Constraints
174    ///
175    /// Only fault-tolerant input readers need to accept this. It will be issued
176    /// zero or more times, before any other command.
177    Replay { metadata: JsonValue, data: RmpValue },
178
179    /// Tells the input reader to accept further input. The first time it
180    /// receives this command, the reader should start from the resume point
181    /// passed as `resume_info` when the endpoint was opened, if any, and
182    /// otherwise from the beginning of input.
183    ///
184    /// The input reader should report the data that it queues to
185    /// [InputConsumer::buffered] as it queues it.
186    ///
187    /// # Constraints
188    ///
189    /// The controller will not call this function:
190    ///
191    /// - Twice on a given reader without an intervening
192    ///   [InputReaderCommand::Pause].
193    ///
194    /// - If it requested a replay (with [InputReaderCommand::Replay]) and the reader
195    ///   hasn't yet reported that the replay is complete.
196    Extend,
197
198    /// Tells the input reader to stop reading more input.
199    ///
200    /// The controller uses this to limit the number of buffered records and to
201    /// respond to user requests to pause the pipeline.
202    ///
203    /// # Constraints
204    ///
205    /// The controller issues this only after a paired
206    /// [InputReaderCommand::Extend].
207    Pause,
208
209    /// Tells the input reader to flush input buffers to the circuit.
210    ///
211    /// The input reader can call [InputConsumer::max_batch_size] to find out
212    /// how many records it should flush. When it's done, it must call
213    /// [InputConsumer::extended] to report it.
214    ///
215    /// The `checkpoint_requested` flag indicates that the controller is trying
216    /// to checkpoint or suspend the pipeline. This serves as a hint to the reader
217    /// to try to clear the checkpoint barrier by returning [Resume::Seek] or
218    /// [Resume::Replay] if possible. For instance, if the reader has multiple
219    /// buffers queued, it can choose to stop flushing them after reaching the first
220    /// buffer that corresponds to a seekable position in the input stream.
221    ///
222    /// # Constraints
223    ///
224    /// The controller won't issue this command before it first issues [InputReaderCommand::Extend].
225    Queue { checkpoint_requested: bool },
226
227    /// Tells the reader it's going to be dropped soon and should clean up.
228    ///
229    /// The reader can continue to queue some data buffers afterward if that's
230    /// the easiest implementation.
231    ///
232    /// # Constraints
233    ///
234    /// The controller calls this only once and won't call any other functions
235    /// for a given reader after it calls this one.
236    Disconnect,
237}
238
239impl InputReaderCommand {
240    /// Returns this command translated to a [NonFtInputReaderCommand], or
241    /// `None` if that is not possible (because this command is only for
242    /// fault-tolerant endpoints).
243    pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
244        match self {
245            InputReaderCommand::Replay { .. } => None,
246            InputReaderCommand::Queue { .. } => Some(NonFtInputReaderCommand::Queue),
247            InputReaderCommand::Extend => {
248                Some(NonFtInputReaderCommand::Transition(PipelineState::Running))
249            }
250            InputReaderCommand::Pause => {
251                Some(NonFtInputReaderCommand::Transition(PipelineState::Paused))
252            }
253            InputReaderCommand::Disconnect => Some(NonFtInputReaderCommand::Transition(
254                PipelineState::Terminated,
255            )),
256        }
257    }
258}
259
260/// A subset of [InputReaderCommand] that only includes the commands for
261/// non-fault-tolerant connectors.
262#[derive(Debug)]
263pub enum NonFtInputReaderCommand {
264    /// Equivalent to [InputReaderCommand::Queue].
265    Queue,
266
267    /// Equivalencies:
268    ///
269    /// - `Transition(PipelineState::Paused)`: [InputReaderCommand::Pause].
270    ///
271    /// - `Transition(PipelineState::Running)`: [InputReaderCommand::Extend].
272    ///
273    /// - `Transition(PipelineState::Terminated)`: [InputReaderCommand::Disconnect].
274    Transition(PipelineState),
275}
276
277#[doc(hidden)]
278pub struct InputQueueEntry<A, B> {
279    /// Data buffer to push to the circuit.
280    buffer: Option<B>,
281
282    /// Time when data in this buffer was received from the transport endpoint.
283    /// It is used to track the processing latency in different stages of the pipeline.
284    timestamp: DateTime<Utc>,
285
286    /// Start a transaction with the given label before pushing the buffer to the circuit
287    /// unless a transaction is already in progress.
288    start_transaction: Option<Option<String>>,
289
290    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
291    commit_transaction: bool,
292
293    /// Auxiliary data associated with the buffer.
294    aux: A,
295}
296
297impl<A, B> InputQueueEntry<A, B> {
298    #[doc(hidden)]
299    pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
300        Self {
301            buffer: None,
302            timestamp,
303            start_transaction: None,
304            commit_transaction: false,
305            aux,
306        }
307    }
308
309    #[doc(hidden)]
310    pub fn with_buffer(self, buffer: Option<B>) -> Self {
311        Self { buffer, ..self }
312    }
313
314    /// Start a transaction with the given label before pushing the buffer to the circuit
315    /// unless a transaction is already in progress.
316    pub fn with_start_transaction(self, start_transaction: Option<Option<String>>) -> Self {
317        Self {
318            start_transaction,
319            ..self
320        }
321    }
322
323    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
324    pub fn with_commit_transaction(self, commit_transaction: bool) -> Self {
325        Self {
326            commit_transaction,
327            ..self
328        }
329    }
330}
331
332/// A thread-safe queue for collecting and flushing input buffers.
333///
334/// Commonly used by `InputReader` implementations for staging buffers from
335/// worker threads.
336pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
337    #[allow(clippy::type_complexity)]
338    pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
339    pub consumer: Box<dyn InputConsumer>,
340    pub transaction_in_progress: AtomicBool,
341}
342
343impl<A, B: InputBuffer> InputQueue<A, B> {
344    pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
345        Self {
346            queue: Mutex::new(VecDeque::new()),
347            consumer,
348            transaction_in_progress: AtomicBool::new(false),
349        }
350    }
351
352    pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
353        self.consumer.parse_errors(errors);
354        let len = entry
355            .buffer
356            .as_ref()
357            .map_or(BufferSize::empty(), |buffer| buffer.len());
358
359        let mut queue = self.queue.lock().unwrap();
360        queue.push_back(entry);
361        self.consumer.buffered(len);
362
363        // The endpoint pushed an empty buffer. This likely indicates that the accompanying aux data
364        // needs to be processed by the endpoint after preceding buffers have been flushed. However,
365        // since we didn't report any buffered records, the controller may never perform another step,
366        // so we nudge it to do it.
367        if len.records == 0 {
368            self.consumer.request_step();
369        }
370    }
371
372    /// Appends `buffer`, to the queue, and associates it with `aux`.  Reports
373    /// to the controller that `errors` have occurred during parsing.
374    pub fn push_with_aux(
375        &self,
376        (buffer, errors): (Option<B>, Vec<ParseError>),
377        timestamp: DateTime<Utc>,
378        aux: A,
379    ) {
380        let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);
381
382        self.push_entry(entry, errors);
383    }
384
385    /// Flushes a batch of records to the circuit and returns the auxiliary data
386    /// that was associated with those records.
387    ///
388    /// This always flushes whole buffers to the circuit (with `flush`),
389    /// since auxiliary data is associated with a whole buffer rather than with
390    /// individual records. If the auxiliary data type `A` is `()`, then
391    /// [InputQueue<()>::flush] avoids that and so is a better choice.
392    #[allow(clippy::type_complexity)]
393    pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
394        self.flush_with_aux_until(&|_| false)
395    }
396
397    /// Flushes a batch of records to the circuit and returns the auxiliary data
398    /// that was associated with those records.
399    ///
400    /// Stops after flushing at least `max_batch_size` records or after flushing a
401    /// buffer whose auxiliary data satisfies the `stop_at` predicate, whichever
402    /// happens first.
403    ///
404    /// This always flushes whole buffers to the circuit (with `flush`),
405    /// since auxiliary data is associated with a whole buffer rather than with
406    /// individual records. If the auxiliary data type `A` is `()`, then
407    /// [InputQueue<()>::flush] avoids that and so is a better choice.
408    #[allow(clippy::type_complexity)]
409    pub fn flush_with_aux_until(
410        &self,
411        stop_at: &dyn Fn(&A) -> bool,
412    ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
413        let mut total = BufferSize::empty();
414        let mut hasher = self.consumer.hasher();
415        let n = self.consumer.max_batch_size();
416        let mut consumed_aux = Vec::new();
417
418        let mut stop = false;
419
420        while !stop && total.records < n {
421            let Some(InputQueueEntry {
422                buffer,
423                timestamp,
424                aux,
425                start_transaction,
426                commit_transaction,
427            }) = self.queue.lock().unwrap().pop_front()
428            else {
429                break;
430            };
431
432            if let Some(label) = start_transaction {
433                self.start_transaction(label.as_deref());
434            }
435
436            if let Some(mut buffer) = buffer {
437                total += buffer.len();
438                if let Some(hasher) = hasher.as_mut() {
439                    buffer.hash(hasher);
440                }
441                buffer.flush();
442            }
443
444            stop = stop_at(&aux);
445            consumed_aux.push((timestamp, aux));
446
447            if commit_transaction && self.commit_transaction() {
448                break;
449            }
450        }
451
452        // Process any entries with aux data only.
453        let mut queue = self.queue.lock().unwrap();
454        while !stop
455            && queue
456                .front()
457                .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
458        {
459            let Some(InputQueueEntry {
460                timestamp,
461                aux,
462                start_transaction,
463                commit_transaction,
464                ..
465            }) = queue.pop_front()
466            else {
467                break;
468            };
469
470            if let Some(label) = start_transaction {
471                self.start_transaction(label.as_deref());
472            }
473
474            stop = stop_at(&aux);
475            consumed_aux.push((timestamp, aux));
476
477            if commit_transaction && self.commit_transaction() {
478                break;
479            }
480        }
481
482        (total, hasher, consumed_aux)
483    }
484
485    pub fn len(&self) -> usize {
486        self.queue.lock().unwrap().len()
487    }
488
489    pub fn is_empty(&self) -> bool {
490        self.len() == 0
491    }
492
493    fn start_transaction(&self, label: Option<&str>) -> bool {
494        if self
495            .transaction_in_progress
496            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
497            .is_ok()
498        {
499            self.consumer.start_transaction(label);
500            true
501        } else {
502            false
503        }
504    }
505
506    fn commit_transaction(&self) -> bool {
507        if self
508            .transaction_in_progress
509            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
510            .is_ok()
511        {
512            self.consumer.commit_transaction();
513            true
514        } else {
515            false
516        }
517    }
518}
519
520impl InputQueue<(), Box<dyn InputBuffer>> {
521    /// Appends `buffer`, if nonempty,` to the queue.  Reports to the controller
522    /// that `errors` occurred during parsing.
523    pub fn push(
524        &self,
525        (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
526        timestamp: DateTime<Utc>,
527    ) {
528        self.push_with_aux((buffer, errors), timestamp, ())
529    }
530
531    /// Flushes a batch of records to the circuit and reports to the consumer
532    /// that it was done.
533    ///
534    /// Only non-fault-tolerant input adapters can use this.
535    pub fn queue(&self) {
536        let mut total = BufferSize::empty();
537        let n = self.consumer.max_batch_size();
538        let mut consumed = Vec::new();
539
540        while total.records < n {
541            let Some(InputQueueEntry {
542                buffer,
543                timestamp,
544                start_transaction,
545                commit_transaction,
546                ..
547            }) = self.queue.lock().unwrap().pop_front()
548            else {
549                break;
550            };
551
552            if let Some(label) = start_transaction
553                && self
554                    .transaction_in_progress
555                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
556                    .is_ok()
557            {
558                self.consumer.start_transaction(label.as_deref());
559            }
560
561            if let Some(mut buffer) = buffer {
562                let mut taken = buffer.take_some(n - total.records);
563                total += taken.len();
564                consumed.push(Watermark::new(timestamp, None));
565                taken.flush();
566                drop(taken);
567                if !buffer.is_empty() {
568                    self.queue.lock().unwrap().push_front(InputQueueEntry {
569                        buffer: Some(buffer),
570                        timestamp,
571                        start_transaction: None,
572                        commit_transaction,
573                        aux: (),
574                    });
575                    break;
576                }
577            }
578
579            if commit_transaction {
580                if self
581                    .transaction_in_progress
582                    .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
583                    .is_ok()
584                {
585                    self.consumer.commit_transaction();
586                }
587                break;
588            }
589        }
590        self.consumer.extended(total, None, consumed);
591    }
592}
593
594/// Reads data from an endpoint.
595///
596/// Use [`TransportInputEndpoint::open`] to obtain an [`InputReader`].
597pub trait InputReader: Send + Sync {
598    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
599
600    /// Requests the input reader to execute `command`.
601    fn request(&self, command: InputReaderCommand);
602
603    /// Returns true if the endpoint is closed, meaning that it has already
604    /// acted on all of the commands that it ever will. A closed endpoint can be
605    /// one that came to the end of its input (and is not waiting for more to
606    /// arrive) or one that encountered a fatal error and cannot continue.
607    ///
608    /// An endpoint is often implemented in terms of a channel to a thread. In
609    /// such a case, this can be implemented in terms of `is_closed` on the
610    /// channel's sender.
611    fn is_closed(&self) -> bool;
612
613    fn replay(&self, metadata: JsonValue, data: RmpValue) {
614        self.request(InputReaderCommand::Replay { metadata, data });
615    }
616
617    fn extend(&self) {
618        self.request(InputReaderCommand::Extend);
619    }
620
621    fn pause(&self) {
622        self.request(InputReaderCommand::Pause);
623    }
624
625    fn queue(&self, checkpoint_requested: bool) {
626        self.request(InputReaderCommand::Queue {
627            checkpoint_requested,
628        });
629    }
630
631    fn disconnect(&self) {
632        self.request(InputReaderCommand::Disconnect);
633    }
634
635    /// Returns the approximate amount of memory used by the connector's
636    /// underlying implementation.  For the Kafka connectors, for example, this
637    /// is the amount of memory used by librdkafka.  Not all connectors use a
638    /// substantial amount of memory, so the default implementation returns 0.
639    fn memory(&self) -> usize {
640        0
641    }
642}
643
644/// Position in an input stream, including the timestamp when the data was ingested
645/// from the transport endpoint and transport-specific metadata such as delta table
646/// version or Kafka partition offsets.
647#[derive(Clone, Debug)]
648pub struct Watermark {
649    pub timestamp: DateTime<Utc>,
650    pub metadata: Option<JsonValue>,
651}
652
653impl Watermark {
654    pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
655        Self {
656            timestamp,
657            metadata,
658        }
659    }
660}
661
662/// Input stream consumer.
663///
664/// A transport endpoint pushes binary data downstream via an instance of this
665/// trait.
666pub trait InputConsumer: Send + Sync + DynClone {
667    /// Returns the maximum number of records that an `InputReader` should queue
668    /// in response to a [InputReaderCommand::Queue] command.
669    ///
670    /// Nothing keeps the endpoint from queuing more than this if necessary (for
671    /// example, if for the sake of lateness it needs to group more than this
672    /// number of records together).
673    fn max_batch_size(&self) -> usize;
674
675    /// Returns the level of fault tolerance that the pipeline supports, if any.
676    ///
677    /// An endpoint only needs to implement `min(endpoint_ft, pipeline_ft)`
678    /// fault tolerance, where `endpoint_ft` is what the endpoint returns from
679    /// `InputEndpoint::fault_tolerance` and `pipeline_ft` is what this function
680    /// returns.  For example, if an input adapter supports
681    /// `Some(FtModel::ExactlyOnce)`, but the pipeline's fault tolerance level
682    /// is `None`, then the input adapter can simply pass `None` as `resume` to
683    /// [InputConsumer::extended].  This optimization is, probably, worthwhile
684    /// only to input adapters that log a copy of all of their data, instead of
685    /// just metadata.
686    fn pipeline_fault_tolerance(&self) -> Option<FtModel>;
687
688    /// Returns a hasher, if the fault tolerance model calls for hashing, and
689    /// `None` otherwise.
690    ///
691    /// This is just a convenience method.  Connectors can do hashing any way
692    /// they like, as long as they do it the same way for new data and for
693    /// replays.
694    fn hasher(&self) -> Option<Xxh3Default> {
695        match self.pipeline_fault_tolerance() {
696            Some(FtModel::ExactlyOnce) => Some(Xxh3Default::new()),
697            _ => None,
698        }
699    }
700
701    /// Reports `errors` as parse errors.
702    fn parse_errors(&self, errors: Vec<ParseError>);
703
704    /// Reports that the input adapter has internally buffered `amt` records and
705    /// bytes.
706    ///
707    /// Fault-tolerant input adapters should report buffered data during replay
708    /// as well as in normal operation.
709    fn buffered(&self, amt: BufferSize);
710
711    /// Reports that the input adapter has completed flushing `amt` data to the
712    /// circuit, that hash to `hash`, in response to an
713    /// [InputReaderCommand::Replay] request.
714    ///
715    /// Only a fault-tolerant input adapter will invoke this.
716    fn replayed(&self, amt: BufferSize, hash: u64);
717
718    /// Reports that the input adapter has completed flushing `amt` data to the
719    /// circuit, that hash to `hash`, in response to an
720    /// [InputReaderCommand::Queue] request.
721    ///
722    /// If the step is one that the input adapter can restart after, or replay,
723    /// then it should supply that as `resume` (see [Resume] for details).
724    fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);
725
726    /// Reports that the endpoint has reached end of input and that no more data
727    /// will be received from the endpoint.
728    ///
729    /// If the endpoint has already indicated that it has buffered records then
730    /// the controller will request them in future [InputReaderCommand::Queue]
731    /// messages. The endpoint must not make further calls to
732    /// [InputConsumer::buffered] or [InputConsumer::parse_errors].
733    fn eoi(&self);
734
735    /// Request the controller to schedule a step even if the connector hasn't queued
736    /// any records.
737    fn request_step(&self);
738
739    /// The connector is initiating a transaction. `label` is an optional label for
740    /// the transaction for debugging purposes.
741    ///
742    /// This function can be invoked in response to a `Queue` command.
743    ///
744    /// Any updates pushed by the connector after this function is invoked will be part
745    /// of the transaction.
746    ///
747    /// The connector _must_ perform a matching call to `commit_transaction` to commit the
748    /// transaction.
749    ///
750    /// Multiple connectors can initiate a transaction concurrently, in which case their
751    /// updates will be combined into a single transaction. The transaction will be committed
752    /// when all the connectors have committed it.
753    fn start_transaction(&self, label: Option<&str>);
754
755    /// The connector is committing a transaction started by a previous `start_transaction` call.
756    ///
757    /// This function can be invoked in response to a `Queue` command after pushing all updates that
758    /// belong to the the transaction, immediately before calling `extended`. The connector cannot
759    /// queue any more updates after this function is invoked, until the next `Queue` command.
760    fn commit_transaction(&self);
761
762    /// Register connector-specific metrics for Prometheus export.
763    ///
764    /// A connector may call this once during [`TransportInputEndpoint::open`]
765    /// to provide an [`Arc<dyn ConnectorMetrics>`] whose [`ConnectorMetrics::metrics`]
766    /// will be polled on every scrape.  The default implementation is a no-op,
767    /// so connectors that have no custom metrics need not override it.
768    fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}
769
770    /// Returns a watch receiver that fires each time pipeline step processing
771    /// completes.  The value is the count of fully-processed steps
772    /// (`total_completed_steps`): a record ingested in step `n` is done when
773    /// the value exceeds `n`.
774    ///
775    /// Input adapters can use this to defer acknowledgment (e.g. a CDC
776    /// connector holding a replication slot) until their data has been
777    /// processed by the circuit and all output connectors.
778    ///
779    /// Returns `None` if the consumer does not support completion tracking.
780    fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;
781
    /// Returns a watch receiver that fires each time a durable checkpoint
    /// completes.  The value is the count of checkpointed steps: a record
    /// ingested in step `n` is durably stored when the value exceeds `n`.
    ///
    /// Input adapters that require at-least-once delivery stronger than step
    /// completion (e.g. a CDC connector that must not advance its replication
    /// slot past the last checkpoint) can wait on this rather than on
    /// [`completion_watcher`][Self::completion_watcher].
    ///
    /// Returns `Some` only when fault tolerance is enabled for the pipeline
    /// (which implies storage is configured and checkpoints are scheduled).
    /// Returns `None` otherwise, in which case the connector should fall back
    /// to [`completion_watcher`][Self::completion_watcher].
    ///
    /// The default implementation always returns `None`.
    fn checkpoint_watcher(&self) -> Option<tokio::sync::watch::Receiver<u64>> {
        None
    }
798
    /// Endpoint failed.
    ///
    /// Reports that the endpoint failed and that it will not queue any more
    /// data.  `fatal` flags an error that the endpoint cannot recover from;
    /// NOTE(review): presumably a non-fatal error (`fatal == false`) leaves the
    /// endpoint running and able to queue more data — confirm against callers.
    ///
    /// `error` is the error being reported.  `tag` is an optional tag that can
    /// be used for additional context, e.g. for rate limiting.
    fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);
807
    /// Updates the health status of the connector to `health`.
    fn update_connector_health(&self, health: ConnectorHealth);
810}
811
/// Information needed to restart after or replay input.
///
/// Feldera supports a few ways to checkpoint and resume a pipeline.  These
/// operations in turn require support from the pipeline's input adapters:
///
/// 1. To support suspend and resume, or at-least-once fault tolerance, the
///    input adapter must indicate, per step, how to restart from just after
///    that step, by passing `Some(Resume::*)` to [InputConsumer::extended].
///
///    Such input adapters might have steps for which seeking would be
///    impractical.  Such an input adapter may skip over those steps by passing
///    `Some(Resume::Barrier)` instead; the controller will not try to
///    checkpoint after them.
///
/// 2. To additionally support exactly-once fault tolerance, the input adapter
///    must indicate, per step, both how to restart after the step and how to
///    replay exactly that step, by passing `Some(Resume::Replay { .. })` to
///    [InputConsumer::extended].
///
///    An input adapter that supports exactly-once fault tolerance may not skip
///    steps; that is, it must supply `Some(Resume::Replay { .. })` for every
///    step.
#[derive(Clone, Debug)]
pub enum Resume {
    /// The input adapter does not support resuming after this step.
    ///
    /// Adapters that only provide at-least-once guarantees may use this for
    /// steps where seeking would be impractical; the controller will not try
    /// to checkpoint after such a step.
    Barrier,

    /// The input adapter can resume just after this step, but it can't replay
    /// the step exactly.
    Seek {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,
    },

    /// The input adapter can replay this step exactly, or resume just after the
    /// step.
    ///
    /// Input adapters can use `seek` and `replay` in different combinations:
    ///
    /// - Some kinds of input adapters, for example the ones for files, or for
    ///   Kafka, can reread the input data that they used before.  These will
    ///   ordinarily just use `seek`, filling it with a pointer just past the
    ///   end of the data to be read.  (Ordinarily, the adapter already knows
    ///   where the step starts from the previous step, so a start pointer isn't
    ///   usually needed.)
    ///
    ///   These input adapters can just set `replay` to [RmpValue::Nil].
    ///
    /// - Other kinds of input connectors can't seek back and reread the input
    ///   data that they used before.  The best example is the HTTP input
    ///   connector, which can't ask whichever client connected before to
    ///   resend exactly the same data that it sent before.  These input
    ///   connectors have to save all the input data for replay, by putting it
    ///   into the `replay` field.
    ///
    ///   These input adapters can just set `seek` to [JsonValue::Null].
    ///
    ///   (In theory, any input connector could substitute data for metadata,
    ///   but if the data can simply be reread using the metadata, we usually
    ///   consider that better because it avoids the time and space cost of
    ///   saving all the data, which in most cases will never be reread.)
    Replay {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,

        /// The data needed for the controller to replay exactly this input step
        /// using [InputReaderCommand::Replay].
        replay: RmpValue,

        /// Hash of the input records in this step, for verification on replay.
        ///
        /// The input adapter can compute this in any way convenient to it, as
        /// long as it does so the same way for reading data initially and on
        /// replay.  On replay, the controller checks that the replayed value
        /// matches the original one and fails the circuit if it differs.
        hash: u64,
    },
}
891
892impl Resume {
893    pub fn is_barrier(&self) -> bool {
894        matches!(self, Self::Barrier)
895    }
896
897    /// Returns the `seek` value, if any, in this [Resume].
898    pub fn seek(&self) -> Option<&JsonValue> {
899        match self {
900            Resume::Barrier => None,
901            Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
902        }
903    }
904
905    /// Consumes this [Resume] and returns just the `seek` value, if any.
906    pub fn into_seek(self) -> Option<JsonValue> {
907        match self {
908            Resume::Barrier => None,
909            Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
910        }
911    }
912
913    /// Returns the maximum fault tolerance level that this [Resume] can
914    /// support.
915    pub fn fault_tolerance(&self) -> FtModel {
916        match self {
917            &Resume::Barrier | Resume::Seek { .. } => FtModel::AtLeastOnce,
918            Resume::Replay { .. } => FtModel::ExactlyOnce,
919        }
920    }
921
922    /// If `hash` is provided, returns `Resume::Replay` with its hash value and
923    /// `seek`; otherwise, returns `Resume::Seek` with `seek`.
924    ///
925    /// This is convenient for endpoints that only need to use metadata to
926    /// support journaling. [InputConsumer::hasher] can be a convenient way to
927    /// get a hasher.
928    pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
929        match hash {
930            Some(hash) => Self::Replay {
931                seek,
932                replay: RmpValue::Nil,
933                hash,
934            },
935            None => Self::Seek { seek },
936        }
937    }
938
939    /// If `hash` is provided, returns `Resume::Replay` with its hash value and
940    /// whatever `replay` returns; otherwise, returns `Resume::Seek`.
941    ///
942    /// This is convenient for endpoints that support journaling by journaling
943    /// all the data (and that don't need to journal any metadata).
944    /// [InputConsumer::hasher] can be a convenient way to get a hasher.
945    pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
946    where
947        F: FnOnce() -> RmpValue,
948    {
949        let seek = JsonValue::Null;
950        match hash {
951            Some(hash) => Self::Replay {
952                seek,
953                replay: replay(),
954                hash,
955            },
956            None => Self::Seek { seek },
957        }
958    }
959}
960
// Generates a `Clone` impl for `Box<dyn InputConsumer>` via `dyn_clone`, so
// boxed consumers can be cloned.  NOTE(review): this requires `InputConsumer`
// to have `DynClone` as a supertrait; the trait header is outside this view.
dyn_clone::clone_trait_object!(InputConsumer);
962
963/// Helper function to parse resume info passed to [`InputConsumer::extended`].
964pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
965where
966    M: DeserializeOwned,
967{
968    serde_json_path_to_error::from_value::<M>(metadata.clone())
969            .map_err(|e| anyhow::anyhow!("unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"))
970}
971
/// Callback through which an output endpoint reports errors that happen
/// outside [`OutputEndpoint::push_buffer`]; it is handed to the endpoint in
/// [`OutputEndpoint::connect`].
///
/// Arguments, in order: a flag indicating a fatal error that the endpoint
/// cannot recover from, the error itself, and an optional tag (presumably with
/// the same meaning as the `tag` argument of [`InputConsumer::error`] — confirm).
#[doc(hidden)]
pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;
974
/// Command handler API exposed by connectors.
///
/// Connectors can support arbitrary connector-specific commands that can be
/// invoked via the `/command` endpoint. These commands take and return arbitrary
/// JSON values.
///
/// This API is not part of the `OutputEndpoint`/`InputEndpoint` traits because
/// it can be invoked from any thread, and therefore requires `Send + Sync`,
/// while the `OutputEndpoint` API is not `Sync` and is meant to be called from
/// the controller thread only.
///
/// The idea is that connectors that support custom commands create separate command
/// handler objects that implement this trait and are returned by
/// `OutputEndpoint::command_handler`.
pub trait CommandHandler: Send + Sync {
    /// Handles the command specified by the JSON object `command` and returns
    /// the command's JSON result.
    ///
    /// Fails if the connector does not support the command, the command is invalid,
    /// or command execution fails.
    fn command(&self, command: serde_json::Value) -> AnyResult<serde_json::Value>;
}
995
/// Distinguishes a full materialized-view snapshot from an incremental delta
/// when pushed to an output connector.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum OutputBatchType {
    /// The batch contains an incremental delta.
    Delta,
    /// The batch contains a full snapshot of the materialized view.
    Snapshot,
}
1003
/// A configured output transport endpoint.
///
/// Output endpoints come in two flavors:
///
/// * A [fault-tolerant](crate#fault-tolerance) endpoint accepts output that has
///   been divided into numbered steps.  If it is given output associated with a
///   step number that has already been output, then it discards the duplicate.
///   It must also keep data written to the output transport from becoming
///   visible to downstream readers until `batch_end` is called.  (This works
///   for output to Kafka, which supports transactional output.  If it is
///   difficult for some future fault-tolerant output endpoint, then the API
///   could be adjusted to support writing output only after it can become
///   immediately visible.)
///
/// * A non-fault-tolerant endpoint does not have a concept of steps and ignores
///   them.
pub trait OutputEndpoint: Send {
    /// Returns a handler for connector-specific commands (see
    /// [`CommandHandler`]), or `None` if the endpoint supports no commands.
    ///
    /// The default implementation returns `None`.
    fn command_handler(&self) -> Option<Arc<dyn CommandHandler>> {
        None
    }

    /// Finishes establishing the connection to the output endpoint.
    ///
    /// If the endpoint encounters any errors during output, now or later, it
    /// invokes `async_error_callback` to notify the client about asynchronous
    /// errors, i.e., errors that happen outside the context of the
    /// [`OutputEndpoint::push_buffer`] method. For instance, a reliable message
    /// bus like Kafka may notify the endpoint about a failure to deliver a
    /// previously sent message via an async callback. If the endpoint is unable
    /// to handle this error, it must forward it to the client via the
    /// `async_error_callback`.  The first argument of the callback is a flag
    /// that indicates a fatal error that the endpoint cannot recover from.
    fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;

    /// Maximum buffer size that this transport can transmit.
    /// The encoder should not generate buffers exceeding this size.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Notifies the output endpoint that data subsequently written by
    /// `push_buffer` belong to the given `step`.
    ///
    /// `_batch_type` tells whether the batch is an incremental delta or a full
    /// snapshot (see [`OutputBatchType`]).
    ///
    /// A [fault-tolerant](crate#fault-tolerance) endpoint has additional
    /// requirements:
    ///
    /// 1. If data for the given step has been written before, the endpoint
    ///    should discard it.
    ///
    /// 2. The output batch must not be made visible to downstream readers
    ///    before the next call to `batch_end`.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) -> AnyResult<()> {
        Ok(())
    }

    /// Outputs one encoded buffer of data.
    ///
    /// Buffers are expected not to exceed
    /// [`max_buffer_size_bytes`](Self::max_buffer_size_bytes).  Errors returned
    /// here are synchronous; errors detected later are reported through the
    /// callback passed to [`connect`](Self::connect).
    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;

    /// Output a message consisting of a key/value pair, with optional headers.
    ///
    /// This API is implemented by Kafka and other transports that transmit
    /// messages consisting of key and value fields and is invoked by
    /// Kafka-specific data formats that rely on this message structure,
    /// e.g., Debezium. If a given transport does not implement this API, it
    /// should return an error.
    ///
    /// `headers` contains a list of key/optional_value pairs to be appended
    /// to Kafka message headers.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
    ) -> AnyResult<()>;

    /// Notifies the output endpoint that output for the current step is
    /// complete.
    ///
    /// A fault-tolerant output endpoint may now make the output batch visible
    /// to readers.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn batch_end(&mut self) -> AnyResult<()> {
        Ok(())
    }

    /// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
    fn is_fault_tolerant(&self) -> bool;

    /// Returns the approximate amount of memory used by the connector's
    /// underlying implementation.  For the Kafka connectors, for example, this
    /// is the amount of memory used by librdkafka.  Not all connectors use a
    /// substantial amount of memory, so the default implementation returns 0.
    fn memory(&self) -> usize {
        0
    }
}
1096
/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.
///
/// A fault-tolerant connector wants to receive, in order:
///
/// - Zero or more [InputReaderCommand::Replay]s.
///
/// - Zero or more other commands.
///
/// This helps with that: a receive that turns up a non-replay command can hand
/// it back with `put_back`, and the next receive call returns it again.
// This is used by Kafka and Nexmark but both of those are optional.
pub struct InputCommandReceiver<M, D> {
    /// The underlying command channel.
    receiver: UnboundedReceiver<InputReaderCommand>,
    /// At most one command handed back via `put_back`; it is returned before
    /// anything else is read from `receiver`.
    buffer: Option<InputReaderCommand>,
    /// `M` and `D` are the types that a replay command's metadata and data are
    /// deserialized into; no values of these types are stored.
    _phantom: PhantomData<(M, D)>,
}
1112
/// Error type returned by some [InputCommandReceiver] methods.
///
/// We could just use `anyhow` and that would probably be just as good though.
#[derive(Debug)]
pub enum InputCommandReceiverError {
    /// The sending half of the command channel was dropped, so no more
    /// commands will arrive.
    Disconnected,
    /// Deserializing the JSON metadata of a replay command failed.
    JsonDecodeError(serde_json_path_to_error::Error),
    /// Deserializing the MessagePack (`rmpv`) data of a replay command failed.
    RmpDecodeError(RmpDecodeError),
}
1122
// Marker impl: `source()` keeps its default of `None`.  The `Display` impl
// already forwards to the wrapped decode error's message, so chaining it as a
// source as well would print that message twice.
impl std::error::Error for InputCommandReceiverError {}
1124
1125impl Display for InputCommandReceiverError {
1126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1127        match self {
1128            InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
1129            InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
1130            InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
1131        }
1132    }
1133}
1134
1135impl From<RmpDecodeError> for InputCommandReceiverError {
1136    fn from(value: RmpDecodeError) -> Self {
1137        Self::RmpDecodeError(value)
1138    }
1139}
1140
1141impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
1142    fn from(value: serde_json_path_to_error::Error) -> Self {
1143        Self::JsonDecodeError(value)
1144    }
1145}
1146
1147// This is used by Kafka and Nexmark but both of those are optional.
1148impl<M, D> InputCommandReceiver<M, D> {
1149    pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
1150        Self {
1151            receiver,
1152            buffer: None,
1153            _phantom: PhantomData,
1154        }
1155    }
1156
1157    #[doc(hidden)]
1158    pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1159    where
1160        M: for<'a> Deserialize<'a>,
1161        D: for<'a> Deserialize<'a>,
1162    {
1163        let command = self.blocking_recv()?;
1164        self.take_replay(command)
1165    }
1166
1167    #[doc(hidden)]
1168    pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1169    where
1170        M: for<'a> Deserialize<'a>,
1171        D: for<'a> Deserialize<'a>,
1172    {
1173        let command = self.recv().await?;
1174        self.take_replay(command)
1175    }
1176
1177    fn take_replay(
1178        &mut self,
1179        command: InputReaderCommand,
1180    ) -> Result<Option<(M, D)>, InputCommandReceiverError>
1181    where
1182        M: for<'a> Deserialize<'a>,
1183        D: for<'a> Deserialize<'a>,
1184    {
1185        match command {
1186            InputReaderCommand::Replay { metadata, data } => Ok(Some((
1187                serde_json_path_to_error::from_value::<M>(metadata)?,
1188                rmpv::ext::from_value::<D>(data)?,
1189            ))),
1190            other => {
1191                self.put_back(other);
1192                Ok(None)
1193            }
1194        }
1195    }
1196
1197    #[doc(hidden)]
1198    pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1199        match self.buffer.take() {
1200            Some(value) => Ok(value),
1201            None => self
1202                .receiver
1203                .recv()
1204                .await
1205                .ok_or(InputCommandReceiverError::Disconnected),
1206        }
1207    }
1208
1209    #[doc(hidden)]
1210    pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1211        match self.buffer.take() {
1212            Some(value) => Ok(value),
1213            None => self
1214                .receiver
1215                .blocking_recv()
1216                .ok_or(InputCommandReceiverError::Disconnected),
1217        }
1218    }
1219
1220    #[doc(hidden)]
1221    pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
1222        if let Some(command) = self.buffer.take() {
1223            Ok(Some(command))
1224        } else {
1225            match self.receiver.try_recv() {
1226                Ok(command) => Ok(Some(command)),
1227                Err(TryRecvError::Empty) => Ok(None),
1228                Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
1229            }
1230        }
1231    }
1232
1233    #[doc(hidden)]
1234    pub fn put_back(&mut self, value: InputReaderCommand) {
1235        assert!(self.buffer.is_none());
1236        self.buffer = Some(value);
1237    }
1238}