Skip to main content

feldera_adapterlib/
transport.rs

1use anyhow::{Error as AnyError, Result as AnyResult};
2use chrono::{DateTime, Utc};
3use dyn_clone::DynClone;
4use feldera_types::adapter_stats::ConnectorHealth;
5use feldera_types::config::FtModel;
6use feldera_types::coordination::Completion;
7use feldera_types::program_schema::Relation;
8use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value as JsonValue;
12use std::collections::VecDeque;
13use std::fmt::Display;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::mpsc::UnboundedReceiver;
18use tokio::sync::mpsc::error::TryRecvError;
19use xxhash_rust::xxh3::Xxh3Default;
20
21use crate::PipelineState;
22use crate::catalog::InputCollectionHandle;
23use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
24use crate::metrics::ConnectorMetrics;
25
/// Step number for fault-tolerant circuits.
///
/// The step number increases by 1 each time the circuit runs; that is, it
/// tracks the global clock for the outermost circuit.  The first step is
/// numbered zero.
///
/// A [fault-tolerant](crate#fault-tolerance) output transport divides output
/// into steps numbered sequentially.  If a given step is written multiple
/// times, the endpoint must keep the first write and discard the later ones.
pub type Step = u64;
36
/// A configured input endpoint.
///
/// This is the common supertrait of `TransportInputEndpoint` (endpoints that
/// feed raw data through a [Parser]) and `IntegratedInputEndpoint` (endpoints
/// that write directly into an input collection).
pub trait InputEndpoint: Send {
    /// This endpoint's level of fault tolerance, if any:
    ///
    /// - An endpoint that returns `None` does not support suspend and resume or
    ///   any kind of fault tolerance and has no further constraints.
    ///
    /// - An endpoint that returns `Some(FtModel::AtLeastOnce)` can support
    ///   suspend and resume and at-least-once fault tolerance.  Such an
    ///   endpoint must pass `Some(Resume::*)` to [InputConsumer::extended] for
    ///   at least some steps (see [Resume] for details).
    ///
    /// - An endpoint that returns `Some(FtModel::ExactlyOnce)` can support
    ///   suspend and resume, at-least-once fault tolerance, and exactly once
    ///   fault tolerance.  Such an endpoint must pass `Some(Resume::Replay
    ///   {..})` to [InputConsumer::extended] for every step (see [Resume] for
    ///   details).
    fn fault_tolerance(&self) -> Option<FtModel>;
}
56
/// An [InputEndpoint] whose data is turned into records by a caller-supplied
/// [Parser].
pub trait TransportInputEndpoint: InputEndpoint {
    /// Creates a new input endpoint. The endpoint should use `parser` to parse
    /// data into records. Returns an [`InputReader`] for reading the endpoint's
    /// data.  The endpoint will use `consumer` to report its progress.
    ///
    /// `schema` is the [Relation] associated with this endpoint's input.
    ///
    /// The `resume_info` parameter is used when resuming the pipeline from a
    /// checkpoint. It contains resume metadata that the endpoint returned via the
    /// [`InputConsumer::extended`] function before suspending. When specified,
    /// it tells a fault-tolerant input reader to seek past the data already read
    /// in the step whose metadata is given by the value.
    ///
    /// The reader is initially paused.
    fn open(
        &self,
        consumer: Box<dyn InputConsumer>,
        parser: Box<dyn Parser>,
        schema: Relation,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}
77
/// An [InputEndpoint] that writes directly into an input collection instead
/// of going through a [Parser].
#[doc(hidden)]
pub trait IntegratedInputEndpoint: InputEndpoint {
    /// Opens the endpoint, consuming it, and returns an [InputReader] for it.
    ///
    /// `input_handle` is the collection the endpoint feeds.  `resume_info` is
    /// presumably the same checkpoint resume metadata described for
    /// [TransportInputEndpoint::open] — confirm against implementations.
    fn open(
        self: Box<Self>,
        input_handle: &InputCollectionHandle,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}
86
/// Commands for an [InputReader] to execute.
///
/// # Transitions
///
/// The following diagram shows the possible order in which the controller can
/// issue commands to [InputReader]s:
///
/// ```text
///   ┌─⯇─ (start) ─⯈──┐
///   │      │         │
///   │      │  ┌───┐  │
///   │      ▼  ▼   │  │
///   ├─⯇─ Replay ──┘  │
///   │      │         │
///   │      ▼         │
///   ├─⯇─ Extend⯇─────┤
///   │      │         │
///   │      │ ┌───┐   │
///   │      ▼ ▼   │   │
///   ├─⯇─ Queue ──┘   │
///   │      │         │
///   │      ▼         │
///   ├─⯇─ Pause ─⯈────┘
///   │      │
///   │      ▼
///   └───⯈Disconnect
///          │
///          ▼
///        (end)
/// ```
///
/// # Stalls
///
/// When the controller issues a [InputReaderCommand::Replay] or
/// [InputReaderCommand::Queue] command to an input adapter, it waits for the
/// input adapter to respond to it.  Until it receives a reply, the next step
/// cannot proceed. An input adapter that does not respond to one of these
/// commands will stall the entire pipeline.  However, the controller also uses
/// [InputReader::is_closed] to detect that an input adapter has died due to an
/// error or reaching end-of-input, so input adapters for which it is difficult
/// to handle errors gracefully can report that they have died using
/// `is_closed`, if necessary, as described in more detail below.
///
/// ## End-of-input handling
///
/// If an input adapter reaches the end of its input, and it isn't implemented
/// to wait for and pass along further input, then it should:
///
/// - Make sure that it has already indicated that it has buffered all of its
///   data, via [InputConsumer::buffered].
///
/// - Call [InputConsumer::eoi] to indicate that it has reached end of input.
///
/// - Respond to [InputReaderCommand::Queue] until it has queued all of its
///   input and has none left.
///
/// - Optionally, at this point, it may exit and start returning `true` from
///   `InputReader::is_closed`.
///
/// ## Error handling
///
/// If an input adapter encounters a fatal error that keeps it from continuing
/// to obtain input, then it should report the error via [InputConsumer::error]
/// with `true` for `fatal`.  Afterward, it may exit and start returning `true`
/// from `InputReader::is_closed`.
///
/// ## Additional requirement
///
/// An input adapter should ensure that, if it flushes any records to the
/// circuit in response to [InputReaderCommand::Replay] or
/// [InputReaderCommand::Queue], then it finishes up and responds to the
/// consumer using [InputConsumer::replayed] or [InputConsumer::extended],
/// respectively.  If it instead dies mid-way, then the controller will not
/// record the step properly and fault tolerance replay will be incorrect.
#[derive(Debug)]
pub enum InputReaderCommand {
    /// Tells the input reader to replay the step described by `metadata` and
    /// `data` by reading and flushing buffers for the data in the step, and
    /// then calling [InputConsumer::replayed] to signal completion.
    ///
    /// The input reader should report the data that it queues to
    /// [InputConsumer::buffered] as it does the replay.
    ///
    /// The input reader doesn't have to process other commands while it does
    /// the replay.
    ///
    /// # Constraints
    ///
    /// Only fault-tolerant input readers need to accept this. It will be issued
    /// zero or more times, before any other command.
    Replay { metadata: JsonValue, data: RmpValue },

    /// Tells the input reader to accept further input. The first time it
    /// receives this command, the reader should start from the resume point
    /// passed as `resume_info` when the endpoint was opened, if any, and
    /// otherwise from the beginning of input.
    ///
    /// The input reader should report the data that it queues to
    /// [InputConsumer::buffered] as it queues it.
    ///
    /// # Constraints
    ///
    /// The controller will not issue this command:
    ///
    /// - Twice to a given reader without an intervening
    ///   [InputReaderCommand::Pause].
    ///
    /// - If it requested a replay (with [InputReaderCommand::Replay]) and the
    ///   reader hasn't yet reported that the replay is complete.
    Extend,

    /// Tells the input reader to stop reading more input.
    ///
    /// The controller uses this to limit the number of buffered records and to
    /// respond to user requests to pause the pipeline.
    ///
    /// # Constraints
    ///
    /// The controller issues this only after a paired
    /// [InputReaderCommand::Extend].
    Pause,

    /// Tells the input reader to flush input buffers to the circuit.
    ///
    /// The input reader can call [InputConsumer::max_batch_size] to find out
    /// how many records it should flush. When it's done, it must call
    /// [InputConsumer::extended] to report it.
    ///
    /// The `checkpoint_requested` flag indicates that the controller is trying
    /// to checkpoint or suspend the pipeline. This serves as a hint to the reader
    /// to try to clear the checkpoint barrier by returning [Resume::Seek] or
    /// [Resume::Replay] if possible. For instance, if the reader has multiple
    /// buffers queued, it can choose to stop flushing them after reaching the first
    /// buffer that corresponds to a seekable position in the input stream.
    ///
    /// # Constraints
    ///
    /// The controller won't issue this command before it first issues
    /// [InputReaderCommand::Extend].
    Queue { checkpoint_requested: bool },

    /// Tells the reader it's going to be dropped soon and should clean up.
    ///
    /// The reader can continue to queue some data buffers afterward if that's
    /// the easiest implementation.
    ///
    /// # Constraints
    ///
    /// The controller issues this command only once and won't issue any other
    /// command to a given reader afterward.
    Disconnect,
}
238
239impl InputReaderCommand {
240    /// Returns this command translated to a [NonFtInputReaderCommand], or
241    /// `None` if that is not possible (because this command is only for
242    /// fault-tolerant endpoints).
243    pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
244        match self {
245            InputReaderCommand::Replay { .. } => None,
246            InputReaderCommand::Queue { .. } => Some(NonFtInputReaderCommand::Queue),
247            InputReaderCommand::Extend => {
248                Some(NonFtInputReaderCommand::Transition(PipelineState::Running))
249            }
250            InputReaderCommand::Pause => {
251                Some(NonFtInputReaderCommand::Transition(PipelineState::Paused))
252            }
253            InputReaderCommand::Disconnect => Some(NonFtInputReaderCommand::Transition(
254                PipelineState::Terminated,
255            )),
256        }
257    }
258}
259
/// A subset of [InputReaderCommand] that only includes the commands for
/// non-fault-tolerant connectors.
///
/// Obtained from an [InputReaderCommand] via [InputReaderCommand::as_nonft].
#[derive(Debug)]
pub enum NonFtInputReaderCommand {
    /// Equivalent to [InputReaderCommand::Queue].
    Queue,

    /// Equivalencies:
    ///
    /// - `Transition(PipelineState::Paused)`: [InputReaderCommand::Pause].
    ///
    /// - `Transition(PipelineState::Running)`: [InputReaderCommand::Extend].
    ///
    /// - `Transition(PipelineState::Terminated)`: [InputReaderCommand::Disconnect].
    Transition(PipelineState),
}
276
/// One entry of an [InputQueue]: an optional data buffer together with its
/// ingestion timestamp, transaction markers, and auxiliary data.
#[doc(hidden)]
pub struct InputQueueEntry<A, B> {
    /// Data buffer to push to the circuit.  `None` for entries that carry only
    /// auxiliary data and/or transaction markers.
    buffer: Option<B>,

    /// Time when data in this buffer was received from the transport endpoint.
    /// It is used to track the processing latency in different stages of the pipeline.
    timestamp: DateTime<Utc>,

    /// Start a transaction with the given label before pushing the buffer to the circuit
    /// unless a transaction is already in progress.
    ///
    /// The outer `Option` says whether to start a transaction at all; the inner
    /// `Option<String>` is the (optional) transaction label.
    start_transaction: Option<Option<String>>,

    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
    commit_transaction: bool,

    /// Auxiliary data associated with the buffer.
    aux: A,
}
296
297impl<A, B> InputQueueEntry<A, B> {
298    #[doc(hidden)]
299    pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
300        Self {
301            buffer: None,
302            timestamp,
303            start_transaction: None,
304            commit_transaction: false,
305            aux,
306        }
307    }
308
309    #[doc(hidden)]
310    pub fn with_buffer(self, buffer: Option<B>) -> Self {
311        Self { buffer, ..self }
312    }
313
314    /// Start a transaction with the given label before pushing the buffer to the circuit
315    /// unless a transaction is already in progress.
316    pub fn with_start_transaction(self, start_transaction: Option<Option<String>>) -> Self {
317        Self {
318            start_transaction,
319            ..self
320        }
321    }
322
323    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
324    pub fn with_commit_transaction(self, commit_transaction: bool) -> Self {
325        Self {
326            commit_transaction,
327            ..self
328        }
329    }
330}
331
/// A thread-safe queue for collecting and flushing input buffers.
///
/// Commonly used by `InputReader` implementations for staging buffers from
/// worker threads.
///
/// `A` is per-entry auxiliary data returned by the `flush_with_aux*` methods;
/// `B` is the buffer type.
pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
    /// Pending entries, in the order they were pushed.
    #[allow(clippy::type_complexity)]
    pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
    /// Consumer that receives progress reports and transaction calls.
    pub consumer: Box<dyn InputConsumer>,
    /// Whether this queue has started a transaction on `consumer` that it has
    /// not committed yet.
    pub transaction_in_progress: AtomicBool,
}
342
impl<A, B: InputBuffer> InputQueue<A, B> {
    /// Creates an empty queue that reports to `consumer`, with no transaction
    /// in progress.
    pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            consumer,
            transaction_in_progress: AtomicBool::new(false),
        }
    }

    /// Appends `entry` to the back of the queue after reporting `errors` via
    /// [InputConsumer::parse_errors].
    ///
    /// The size of the entry's buffer (an empty size if it has no buffer) is
    /// reported via [InputConsumer::buffered].  If that size has zero records,
    /// [InputConsumer::request_step] is called as well.  Both calls are made
    /// while the queue lock is still held.
    pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
        self.consumer.parse_errors(errors);
        let len = entry
            .buffer
            .as_ref()
            .map_or(BufferSize::empty(), |buffer| buffer.len());

        let mut queue = self.queue.lock().unwrap();
        queue.push_back(entry);
        self.consumer.buffered(len);

        // The endpoint pushed an empty buffer. This likely indicates that the accompanying aux data
        // needs to be processed by the endpoint after preceding buffers have been flushed. However,
        // since we didn't report any buffered records, the controller may never perform another step,
        // so we nudge it to do it.
        if len.records == 0 {
            self.consumer.request_step();
        }
    }

    /// Appends `buffer` to the queue and associates it with `timestamp` and
    /// `aux`.  Reports to the controller that `errors` have occurred during
    /// parsing.  An entry is queued even if `buffer` is `None`.
    pub fn push_with_aux(
        &self,
        (buffer, errors): (Option<B>, Vec<ParseError>),
        timestamp: DateTime<Utc>,
        aux: A,
    ) {
        let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);

        self.push_entry(entry, errors);
    }

    /// Flushes a batch of records to the circuit and returns the auxiliary data
    /// that was associated with those records.
    ///
    /// Equivalent to [InputQueue::flush_with_aux_until] with a predicate that
    /// never stops early.
    ///
    /// This always flushes whole buffers to the circuit (with `flush`),
    /// since auxiliary data is associated with a whole buffer rather than with
    /// individual records. If the auxiliary data type `A` is `()`, then
    /// [InputQueue::queue] avoids that and so is a better choice.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        self.flush_with_aux_until(&|_| false)
    }

    /// Flushes a batch of records to the circuit and returns the auxiliary data
    /// that was associated with those records.
    ///
    /// Stops after flushing at least `max_batch_size` records, after flushing a
    /// buffer whose auxiliary data satisfies the `stop_at` predicate, or after
    /// committing a transaction that was in progress, whichever happens first.
    /// Afterward, unless `stop_at` fired, any entries at the front of the queue
    /// that carry only auxiliary data (no buffer) are consumed as well.
    ///
    /// Returns `(total, hasher, consumed)`: the total size of the flushed
    /// buffers; the hasher from [InputConsumer::hasher], fed with every flushed
    /// buffer (or `None` if the consumer supplied no hasher); and the
    /// `(timestamp, aux)` pairs of every consumed entry, in queue order.
    ///
    /// This does not call [InputConsumer::extended]; the caller reports the
    /// step itself.
    ///
    /// This always flushes whole buffers to the circuit (with `flush`),
    /// since auxiliary data is associated with a whole buffer rather than with
    /// individual records. If the auxiliary data type `A` is `()`, then
    /// [InputQueue::queue] avoids that and so is a better choice.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux_until(
        &self,
        stop_at: &dyn Fn(&A) -> bool,
    ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        let mut total = BufferSize::empty();
        let mut hasher = self.consumer.hasher();
        let n = self.consumer.max_batch_size();
        let mut consumed_aux = Vec::new();

        let mut stop = false;

        // Pop one entry at a time; the lock is released between pops so that
        // producers can keep pushing while buffers are being flushed.
        while !stop && total.records < n {
            let Some(InputQueueEntry {
                buffer,
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
            }) = self.queue.lock().unwrap().pop_front()
            else {
                break;
            };

            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }

            if let Some(mut buffer) = buffer {
                total += buffer.len();
                // Hash before flushing so that the hash covers exactly the
                // data pushed to the circuit.
                if let Some(hasher) = hasher.as_mut() {
                    buffer.hash(hasher);
                }
                buffer.flush();
            }

            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));

            // A committed transaction ends the batch.
            if commit_transaction && self.commit_transaction() {
                break;
            }
        }

        // Process any entries with aux data only.
        // NOTE(review): this loop also runs after the commit-triggered `break`
        // above, so an aux-only entry here may start a new transaction in the
        // same flush — confirm that is intended.
        let mut queue = self.queue.lock().unwrap();
        while !stop
            && queue
                .front()
                .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
        {
            let Some(InputQueueEntry {
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
                ..
            }) = queue.pop_front()
            else {
                break;
            };

            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }

            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));

            if commit_transaction && self.commit_transaction() {
                break;
            }
        }

        (total, hasher, consumed_aux)
    }

    /// Returns the number of entries currently queued (not the number of
    /// records).
    pub fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Returns `true` if no entries are queued.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Starts a transaction on the consumer with `label`, unless this queue
    /// already has one in progress.  Returns whether a transaction was started.
    fn start_transaction(&self, label: Option<&str>) -> bool {
        if self
            .transaction_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.start_transaction(label);
            true
        } else {
            false
        }
    }

    /// Commits the consumer transaction if this queue has one in progress.
    /// Returns whether a transaction was committed.
    fn commit_transaction(&self) -> bool {
        if self
            .transaction_in_progress
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.commit_transaction();
            true
        } else {
            false
        }
    }
}
519
520impl InputQueue<(), Box<dyn InputBuffer>> {
521    /// Appends `buffer`, if nonempty,` to the queue.  Reports to the controller
522    /// that `errors` occurred during parsing.
523    pub fn push(
524        &self,
525        (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
526        timestamp: DateTime<Utc>,
527    ) {
528        self.push_with_aux((buffer, errors), timestamp, ())
529    }
530
531    /// Flushes a batch of records to the circuit and reports to the consumer
532    /// that it was done.
533    ///
534    /// Only non-fault-tolerant input adapters can use this.
535    pub fn queue(&self) {
536        let mut total = BufferSize::empty();
537        let n = self.consumer.max_batch_size();
538        let mut consumed = Vec::new();
539
540        while total.records < n {
541            let Some(InputQueueEntry {
542                buffer,
543                timestamp,
544                start_transaction,
545                commit_transaction,
546                ..
547            }) = self.queue.lock().unwrap().pop_front()
548            else {
549                break;
550            };
551
552            if let Some(label) = start_transaction
553                && self
554                    .transaction_in_progress
555                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
556                    .is_ok()
557            {
558                self.consumer.start_transaction(label.as_deref());
559            }
560
561            if let Some(mut buffer) = buffer {
562                let mut taken = buffer.take_some(n - total.records);
563                total += taken.len();
564                consumed.push(Watermark::new(timestamp, None));
565                taken.flush();
566                drop(taken);
567                if !buffer.is_empty() {
568                    self.queue.lock().unwrap().push_front(InputQueueEntry {
569                        buffer: Some(buffer),
570                        timestamp,
571                        start_transaction: None,
572                        commit_transaction,
573                        aux: (),
574                    });
575                    break;
576                }
577            }
578
579            if commit_transaction {
580                if self
581                    .transaction_in_progress
582                    .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
583                    .is_ok()
584                {
585                    self.consumer.commit_transaction();
586                }
587                break;
588            }
589        }
590        self.consumer.extended(total, None, consumed);
591    }
592}
593
/// Reads data from an endpoint.
///
/// Use [`TransportInputEndpoint::open`] to obtain an [`InputReader`].
///
/// Implementors must provide [InputReader::request] and
/// [InputReader::is_closed].  The command helpers (`replay`, `extend`,
/// `pause`, `queue`, `disconnect`) are conveniences that forward the
/// corresponding [InputReaderCommand] to `request`.
pub trait InputReader: Send + Sync {
    /// Converts this reader into an `Arc<dyn Any + Send + Sync>`, presumably so
    /// callers can downcast to the concrete reader type.
    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;

    /// Requests the input reader to execute `command`.
    fn request(&self, command: InputReaderCommand);

    /// Returns true if the endpoint is closed, meaning that it has already
    /// acted on all of the commands that it ever will. A closed endpoint can be
    /// one that came to the end of its input (and is not waiting for more to
    /// arrive) or one that encountered a fatal error and cannot continue.
    ///
    /// An endpoint is often implemented in terms of a channel to a thread. In
    /// such a case, this can be implemented in terms of `is_closed` on the
    /// channel's sender.
    fn is_closed(&self) -> bool;

    /// Issues [InputReaderCommand::Replay] with `metadata` and `data`.
    fn replay(&self, metadata: JsonValue, data: RmpValue) {
        self.request(InputReaderCommand::Replay { metadata, data });
    }

    /// Issues [InputReaderCommand::Extend].
    fn extend(&self) {
        self.request(InputReaderCommand::Extend);
    }

    /// Issues [InputReaderCommand::Pause].
    fn pause(&self) {
        self.request(InputReaderCommand::Pause);
    }

    /// Issues [InputReaderCommand::Queue] with the given
    /// `checkpoint_requested` hint.
    fn queue(&self, checkpoint_requested: bool) {
        self.request(InputReaderCommand::Queue {
            checkpoint_requested,
        });
    }

    /// Issues [InputReaderCommand::Disconnect].
    fn disconnect(&self) {
        self.request(InputReaderCommand::Disconnect);
    }

    /// Returns the approximate amount of memory used by the connector's
    /// underlying implementation.  For the Kafka connectors, for example, this
    /// is the amount of memory used by librdkafka.  Not all connectors use a
    /// substantial amount of memory, so the default implementation returns 0.
    fn memory(&self) -> usize {
        0
    }
}
643
/// Position in an input stream, including the timestamp when the data was ingested
/// from the transport endpoint and transport-specific metadata such as delta table
/// version or Kafka partition offsets.
#[derive(Clone, Debug)]
pub struct Watermark {
    /// When the data was received from the transport endpoint.
    pub timestamp: DateTime<Utc>,
    /// Transport-specific position metadata, if the transport provides any.
    pub metadata: Option<JsonValue>,
}
652
653impl Watermark {
654    pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
655        Self {
656            timestamp,
657            metadata,
658        }
659    }
660}
661
/// Input stream consumer.
///
/// A transport endpoint pushes binary data downstream via an instance of this
/// trait.  It is also how an input adapter reports its progress back to the
/// controller: buffered and flushed data, end of input, errors, and
/// transactions.
pub trait InputConsumer: Send + Sync + DynClone {
    /// Returns the maximum number of records that an `InputReader` should queue
    /// in response to a [InputReaderCommand::Queue] command.
    ///
    /// Nothing keeps the endpoint from queuing more than this if necessary (for
    /// example, if for the sake of lateness it needs to group more than this
    /// number of records together).
    fn max_batch_size(&self) -> usize;

    /// Returns the level of fault tolerance that the pipeline supports, if any.
    ///
    /// An endpoint only needs to implement `min(endpoint_ft, pipeline_ft)`
    /// fault tolerance, where `endpoint_ft` is what the endpoint returns from
    /// `InputEndpoint::fault_tolerance` and `pipeline_ft` is what this function
    /// returns.  For example, if an input adapter supports
    /// `Some(FtModel::ExactlyOnce)`, but the pipeline's fault tolerance level
    /// is `None`, then the input adapter can simply pass `None` as `resume` to
    /// [InputConsumer::extended].  This optimization is probably worthwhile
    /// only for input adapters that log a copy of all of their data, instead of
    /// just metadata.
    fn pipeline_fault_tolerance(&self) -> Option<FtModel>;

    /// Returns a fresh hasher if the pipeline's fault tolerance model is
    /// `Some(FtModel::ExactlyOnce)`, and `None` otherwise.
    ///
    /// This is just a convenience method.  Connectors can do hashing any way
    /// they like, as long as they do it the same way for new data and for
    /// replays.
    fn hasher(&self) -> Option<Xxh3Default> {
        match self.pipeline_fault_tolerance() {
            Some(FtModel::ExactlyOnce) => Some(Xxh3Default::new()),
            _ => None,
        }
    }

    /// Reports `errors` as parse errors.
    fn parse_errors(&self, errors: Vec<ParseError>);

    /// Reports that the input adapter has internally buffered `amt` records and
    /// bytes.
    ///
    /// Fault-tolerant input adapters should report buffered data during replay
    /// as well as in normal operation.
    fn buffered(&self, amt: BufferSize);

    /// Reports that the input adapter has completed flushing `amt` data to the
    /// circuit, that hash to `hash`, in response to an
    /// [InputReaderCommand::Replay] request.
    ///
    /// Only a fault-tolerant input adapter will invoke this.
    fn replayed(&self, amt: BufferSize, hash: u64);

    /// Reports that the input adapter has completed flushing `amt` data to the
    /// circuit in response to an [InputReaderCommand::Queue] request.
    ///
    /// If the step is one that the input adapter can restart after, or replay,
    /// then it should supply that as `resume` (see [Resume] for details).
    ///
    /// `watermarks` describes the positions in the input stream, and the
    /// ingestion timestamps, of the flushed data (see [Watermark]).
    fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);

    /// Reports that the endpoint has reached end of input and that no more data
    /// will be received from the endpoint.
    ///
    /// If the endpoint has already indicated that it has buffered records then
    /// the controller will request them in future [InputReaderCommand::Queue]
    /// messages. The endpoint must not make further calls to
    /// [InputConsumer::buffered] or [InputConsumer::parse_errors].
    fn eoi(&self);

    /// Requests the controller to schedule a step even if the connector hasn't
    /// queued any records.
    fn request_step(&self);

    /// The connector is initiating a transaction. `label` is an optional label for
    /// the transaction for debugging purposes.
    ///
    /// This function can be invoked in response to a `Queue` command.
    ///
    /// Any updates pushed by the connector after this function is invoked will be part
    /// of the transaction.
    ///
    /// The connector _must_ perform a matching call to `commit_transaction` to commit the
    /// transaction.
    ///
    /// Multiple connectors can initiate a transaction concurrently, in which case their
    /// updates will be combined into a single transaction. The transaction will be committed
    /// when all the connectors have committed it.
    fn start_transaction(&self, label: Option<&str>);

    /// The connector is committing a transaction started by a previous `start_transaction` call.
    ///
    /// This function can be invoked in response to a `Queue` command after pushing all updates that
    /// belong to the transaction, immediately before calling `extended`. The connector cannot
    /// queue any more updates after this function is invoked, until the next `Queue` command.
    fn commit_transaction(&self);

    /// Register connector-specific metrics for Prometheus export.
    ///
    /// A connector may call this once during [`TransportInputEndpoint::open`]
    /// to provide an [`Arc<dyn ConnectorMetrics>`] whose [`ConnectorMetrics::metrics`]
    /// will be polled on every scrape.  The default implementation is a no-op,
    /// so connectors that have no custom metrics need not override it.
    fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}

    /// Returns a watch receiver that tracks completion of pipeline steps.
    ///
    /// The receiver yields [`Completion`] values whose `total_completed_steps`
    /// field indicates how many steps have been fully processed (circuit
    /// execution + all output connectors).
    ///
    /// Input adapters that need to defer acknowledgment until data is durably
    /// processed (e.g., CDC adapters controlling a replication slot) can use
    /// this to detect when their data has been consumed.
    ///
    /// Returns `None` if the consumer does not support completion tracking.
    fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;

    /// Reports that the endpoint encountered `error`.
    ///
    /// When `fatal` is `true`, the endpoint failed and will not queue any more
    /// data.  It may then exit and start returning `true` from
    /// [InputReader::is_closed].
    ///
    /// `tag` is an optional tag that provides additional context about the
    /// error, e.g. for rate limiting.
    fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);

    /// Updates the health status of the connector.
    fn update_connector_health(&self, health: ConnectorHealth);
}
795
796/// Information needed to restart after or replay input.
797///
798/// Feldera supports a few ways to checkpoint and resume a pipeline.  These
799/// operations in turn require support from the pipeline's input adapters:
800///
801/// 1. To support suspend and resume, or at-least-once fault tolerance, the
802///    input adapter must indicate, per step, how to restart from just after
803///    that step, by passing `Some(Resume::*)` to [InputConsumer::extended].
804///
805///    Such input adapters might have steps for which seeking would be
806///    impractical.  Such an input adapter may skip over those steps by passing
807///    `Some(Resume::Barrier)` instead; the controller will not try to
808///    checkpoint after them.
809///
810/// 2. To additionally support exactly once fault tolerance, the input adapter
811///    must indicate, per step, both how to restart after the step and how to
812///    replay exactly that step, by passing `Some(Resume::Replay { .. })` to
813///    [InputConsumer::extended].
814///
815///    An input adapter that supports fault tolerance may not skip steps; that
816///    is, it must supply `Some(Resume::Replay { .. })` for every step.
817#[derive(Clone, Debug)]
818pub enum Resume {
819    /// The input adapter does not support resuming after this step.
820    Barrier,
821
822    /// The input adapter can resume just after this step, but it can't replay
823    /// the step exactly.
824    Seek {
825        /// Metadata needed for the controller to restart the input adapter from
826        /// just after this input step.
827        seek: JsonValue,
828    },
829
830    /// The input adapter can replay this step exactly, or resume just after the
831    /// step.
832    ///
833    /// Input adapters can use `seek` and `replay` in different combinations:
834    ///
835    /// - Some kinds of input adapters, for example the ones for files, or for
836    ///   Kafka, can reread the input data that they used before.  These will
837    ///   ordinarily just use `seek`, filling it with a pointer just past the
838    ///   end of the data to be read. (Ordinarily, it would already know where
839    ///   the start is from the previous step, so the start pointer isn't
840    ///   usually needed.)
841    ///
842    ///   These input adapters can just set `replay` to [RmpValue::Nil].
843    ///
844    /// - Other kinds of input connectors can't seek back and reread the input
845    ///   data that they used before. The best example is the HTTP input
846    ///   connector, because which can't ask whatever client connected before to
847    ///   repeat the same exact data that it input before.  These input
848    ///   connectors have to save all the input data for replay, by putting into
849    ///   the `replay` field.
850    ///
851    ///   These input adapters can just set `seek` to [JsonValue::Null].
852    ///
853    ///   (In theory, any input connector could substitute data for metadata,
854    ///   but if the data can simply be reread using the metadata, we usually
855    ///   consider that better because it saves time and space saving all the
856    ///   data when in most cases it will never be reread.)
857    Replay {
858        /// Metadata needed for the controller to restart the input adapter from
859        /// just after this input step.
860        seek: JsonValue,
861
862        /// The data needed for the controller to replay exactly this input step
863        /// using [InputReaderCommand::Replay].
864        replay: RmpValue,
865
866        /// Hash of the input records in this step, for verification on replay.
867        ///
868        /// The input adapter can compute this in any way convenient to it, as
869        /// long as it does so the same way for reading data initially and on
870        /// replay.  On replay, the controller checks that the replayed value
871        /// matches the original one and fails the circuit if it differs.
872        hash: u64,
873    },
874}
875
876impl Resume {
877    pub fn is_barrier(&self) -> bool {
878        matches!(self, Self::Barrier)
879    }
880
881    /// Returns the `seek` value, if any, in this [Resume].
882    pub fn seek(&self) -> Option<&JsonValue> {
883        match self {
884            Resume::Barrier => None,
885            Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
886        }
887    }
888
889    /// Consumes this [Resume] and returns just the `seek` value, if any.
890    pub fn into_seek(self) -> Option<JsonValue> {
891        match self {
892            Resume::Barrier => None,
893            Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
894        }
895    }
896
897    /// Returns the maximum fault tolerance level that this [Resume] can
898    /// support.
899    pub fn fault_tolerance(&self) -> FtModel {
900        match self {
901            &Resume::Barrier | Resume::Seek { .. } => FtModel::AtLeastOnce,
902            Resume::Replay { .. } => FtModel::ExactlyOnce,
903        }
904    }
905
906    /// If `hash` is provided, returns `Resume::Replay` with its hash value and
907    /// `seek`; otherwise, returns `Resume::Seek` with `seek`.
908    ///
909    /// This is convenient for endpoints that only need to use metadata to
910    /// support journaling. [InputConsumer::hasher] can be a convenient way to
911    /// get a hasher.
912    pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
913        match hash {
914            Some(hash) => Self::Replay {
915                seek,
916                replay: RmpValue::Nil,
917                hash,
918            },
919            None => Self::Seek { seek },
920        }
921    }
922
923    /// If `hash` is provided, returns `Resume::Replay` with its hash value and
924    /// whatever `replay` returns; otherwise, returns `Resume::Seek`.
925    ///
926    /// This is convenient for endpoints that support journaling by journaling
927    /// all the data (and that don't need to journal any metadata).
928    /// [InputConsumer::hasher] can be a convenient way to get a hasher.
929    pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
930    where
931        F: FnOnce() -> RmpValue,
932    {
933        let seek = JsonValue::Null;
934        match hash {
935            Some(hash) => Self::Replay {
936                seek,
937                replay: replay(),
938                hash,
939            },
940            None => Self::Seek { seek },
941        }
942    }
943}
944
945dyn_clone::clone_trait_object!(InputConsumer);
946
947/// Helper function to parse resume info passed to [`InputConsumer::extended`].
948pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
949where
950    M: DeserializeOwned,
951{
952    serde_json_path_to_error::from_value::<M>(metadata.clone())
953            .map_err(|e| anyhow::anyhow!("unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"))
954}
955
956#[doc(hidden)]
957pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;
958
959/// Command handler API exposed by connectors.
960///
961/// Connectors can support arbitrary connector-specific commands that can be
962/// invoked via the `/command` endpoint. These commands take and return arbitrary
963/// JSON values.
964///
965/// This API is not part of trait `Output[Input]Endpoint` because it can be invoked
966/// from any thread, and requires `Send + Sync`, while the `OutputEndpoint` API is
967/// not `Sync` and is meant to be called from the controller thread only.
968///
969/// The idea is that connectors that support custom commands create separate command
970/// handler objects that implement this trait and are returned by
971/// `OutputEndpoint::command_handler`.
972pub trait CommandHandler: Send + Sync {
973    /// Handle a command specified by the JSON objest.
974    ///
975    /// Fails if the connector does not support the command, the command is invalid,
976    /// or command execution fails.
977    fn command(&self, command: serde_json::Value) -> AnyResult<serde_json::Value>;
978}
979
980/// A configured output transport endpoint.
981///
982/// Output endpoints come in two flavors:
983///
984/// * A [fault-tolerant](crate#fault-tolerance) endpoint accepts output that has
985///   been divided into numbered steps.  If it is given output associated with a
986///   step number that has already been output, then it discards the duplicate.
987///   It must also keep data written to the output transport from becoming
988///   visible to downstream readers until `batch_end` is called.  (This works
989///   for output to Kafka, which supports transactional output.  If it is
990///   difficult for some future fault-tolerant output endpoint, then the API
991///   could be adjusted to support writing output only after it can become
992///   immediately visible.)
993///
994/// * A non-fault-tolerant endpoint does not have a concept of steps and ignores
995///   them.
996pub trait OutputEndpoint: Send {
997    fn command_handler(&self) -> Option<Arc<dyn CommandHandler>> {
998        None
999    }
1000
1001    /// Finishes establishing the connection to the output endpoint.
1002    ///
1003    /// If the endpoint encounters any errors during output, now or later, it
1004    /// invokes `async_error_callback` to notify the client about asynchronous
1005    /// errors, i.e., errors that happen outside the context of the
1006    /// [`OutputEndpoint::push_buffer`] method. For instance, a reliable message
1007    /// bus like Kafka may notify the endpoint about a failure to deliver a
1008    /// previously sent message via an async callback. If the endpoint is unable
1009    /// to handle this error, it must forward it to the client via the
1010    /// `async_error_callback`.  The first argument of the callback is a flag
1011    /// that indicates a fatal error that the endpoint cannot recover from.
1012    fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;
1013
1014    /// Maximum buffer size that this transport can transmit.
1015    /// The encoder should not generate buffers exceeding this size.
1016    fn max_buffer_size_bytes(&self) -> usize;
1017
1018    /// Notifies the output endpoint that data subsequently written by
1019    /// `push_buffer` belong to the given `step`.
1020    ///
1021    /// A [fault-tolerant](crate#fault-tolerance) endpoint has additional
1022    /// requirements:
1023    ///
1024    /// 1. If data for the given step has been written before, the endpoint
1025    ///    should discard it.
1026    ///
1027    /// 2. The output batch must not be made visible to downstream readers
1028    ///    before the next call to `batch_end`.
1029    fn batch_start(&mut self, _step: Step) -> AnyResult<()> {
1030        Ok(())
1031    }
1032
1033    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;
1034
1035    /// Output a message consisting of a key/value pair, with optional headers.
1036    ///
1037    /// This API is implemented by Kafka and other transports that transmit
1038    /// messages consisting of key and value fields and is invoked by
1039    /// Kafka-specific data formats that rely on this message structure,
1040    /// e.g., Debezium. If a given transport does not implement this API, it
1041    /// should return an error.
1042    ///
1043    /// `headers` contains a list of key/optional_value pairs to be appended
1044    /// to Kafka message headers.
1045    fn push_key(
1046        &mut self,
1047        key: Option<&[u8]>,
1048        val: Option<&[u8]>,
1049        headers: &[(&str, Option<&[u8]>)],
1050    ) -> AnyResult<()>;
1051
1052    /// Notifies the output endpoint that output for the current step is
1053    /// complete.
1054    ///
1055    /// A fault-tolerant output endpoint may now make the output batch visible
1056    /// to readers.
1057    fn batch_end(&mut self) -> AnyResult<()> {
1058        Ok(())
1059    }
1060
1061    /// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
1062    fn is_fault_tolerant(&self) -> bool;
1063
1064    /// Returns the approximate amount of memory used by the connector's
1065    /// underlying implementation.  For the Kafka connectors, for example, this
1066    /// is the amount of memory used by librdkafka.  Not all connectors use a
1067    /// substantial amount of memory, so the default implementation returns 0.
1068    fn memory(&self) -> usize {
1069        0
1070    }
1071}
1072
1073/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.
1074///
1075/// A fault-tolerant connector wants to receive, in order:
1076///
1077/// - Zero or more [InputReaderCommand::Replay]s.
1078///
1079/// - Zero or more other commands.
1080///
1081/// This helps with that.
1082// This is used by Kafka and Nexmark but both of those are optional.
1083pub struct InputCommandReceiver<M, D> {
1084    receiver: UnboundedReceiver<InputReaderCommand>,
1085    buffer: Option<InputReaderCommand>,
1086    _phantom: PhantomData<(M, D)>,
1087}
1088
1089/// Error type returned by some [InputCommandReceiver] methods.
1090///
1091/// We could just use `anyhow` and that would probably be just as good though.
1092#[derive(Debug)]
1093pub enum InputCommandReceiverError {
1094    Disconnected,
1095    JsonDecodeError(serde_json_path_to_error::Error),
1096    RmpDecodeError(RmpDecodeError),
1097}
1098
1099impl std::error::Error for InputCommandReceiverError {}
1100
1101impl Display for InputCommandReceiverError {
1102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1103        match self {
1104            InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
1105            InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
1106            InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
1107        }
1108    }
1109}
1110
1111impl From<RmpDecodeError> for InputCommandReceiverError {
1112    fn from(value: RmpDecodeError) -> Self {
1113        Self::RmpDecodeError(value)
1114    }
1115}
1116
1117impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
1118    fn from(value: serde_json_path_to_error::Error) -> Self {
1119        Self::JsonDecodeError(value)
1120    }
1121}
1122
1123// This is used by Kafka and Nexmark but both of those are optional.
1124impl<M, D> InputCommandReceiver<M, D> {
1125    pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
1126        Self {
1127            receiver,
1128            buffer: None,
1129            _phantom: PhantomData,
1130        }
1131    }
1132
1133    #[doc(hidden)]
1134    pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1135    where
1136        M: for<'a> Deserialize<'a>,
1137        D: for<'a> Deserialize<'a>,
1138    {
1139        let command = self.blocking_recv()?;
1140        self.take_replay(command)
1141    }
1142
1143    #[doc(hidden)]
1144    pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1145    where
1146        M: for<'a> Deserialize<'a>,
1147        D: for<'a> Deserialize<'a>,
1148    {
1149        let command = self.recv().await?;
1150        self.take_replay(command)
1151    }
1152
1153    fn take_replay(
1154        &mut self,
1155        command: InputReaderCommand,
1156    ) -> Result<Option<(M, D)>, InputCommandReceiverError>
1157    where
1158        M: for<'a> Deserialize<'a>,
1159        D: for<'a> Deserialize<'a>,
1160    {
1161        match command {
1162            InputReaderCommand::Replay { metadata, data } => Ok(Some((
1163                serde_json_path_to_error::from_value::<M>(metadata)?,
1164                rmpv::ext::from_value::<D>(data)?,
1165            ))),
1166            other => {
1167                self.put_back(other);
1168                Ok(None)
1169            }
1170        }
1171    }
1172
1173    #[doc(hidden)]
1174    pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1175        match self.buffer.take() {
1176            Some(value) => Ok(value),
1177            None => self
1178                .receiver
1179                .recv()
1180                .await
1181                .ok_or(InputCommandReceiverError::Disconnected),
1182        }
1183    }
1184
1185    #[doc(hidden)]
1186    pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1187        match self.buffer.take() {
1188            Some(value) => Ok(value),
1189            None => self
1190                .receiver
1191                .blocking_recv()
1192                .ok_or(InputCommandReceiverError::Disconnected),
1193        }
1194    }
1195
1196    #[doc(hidden)]
1197    pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
1198        if let Some(command) = self.buffer.take() {
1199            Ok(Some(command))
1200        } else {
1201            match self.receiver.try_recv() {
1202                Ok(command) => Ok(Some(command)),
1203                Err(TryRecvError::Empty) => Ok(None),
1204                Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
1205            }
1206        }
1207    }
1208
1209    #[doc(hidden)]
1210    pub fn put_back(&mut self, value: InputReaderCommand) {
1211        assert!(self.buffer.is_none());
1212        self.buffer = Some(value);
1213    }
1214}