feldera_adapterlib/transport.rs
1use anyhow::{Error as AnyError, Result as AnyResult};
2use chrono::{DateTime, Utc};
3use dyn_clone::DynClone;
4use feldera_types::adapter_stats::ConnectorHealth;
5use feldera_types::config::FtModel;
6use feldera_types::coordination::Completion;
7use feldera_types::program_schema::Relation;
8use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value as JsonValue;
12use std::collections::VecDeque;
13use std::fmt::Display;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::mpsc::UnboundedReceiver;
18use tokio::sync::mpsc::error::TryRecvError;
19use xxhash_rust::xxh3::Xxh3Default;
20
21use crate::PipelineState;
22use crate::catalog::InputCollectionHandle;
23use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
24use crate::metrics::ConnectorMetrics;
25
/// Step number for fault-tolerant circuits.
///
/// The step number increases by 1 each time the circuit runs; that is, it
/// tracks the global clock for the outermost circuit. The first step is
/// numbered zero.
///
/// A [fault-tolerant](crate#fault-tolerance) output transport divides output
/// into steps numbered sequentially. If a given step is written multiple
/// times, the endpoint must discard the later writes (only the first write of
/// a step counts).
pub type Step = u64;
36
/// A configured input endpoint.
///
/// Both [TransportInputEndpoint] and `IntegratedInputEndpoint` build on this
/// trait.
pub trait InputEndpoint: Send {
    /// This endpoint's level of fault tolerance, if any:
    ///
    /// - An endpoint that returns `None` does not support suspend and resume or
    ///   any kind of fault tolerance and has no further constraints.
    ///
    /// - An endpoint that returns `Some(FtModel::AtLeastOnce)` can support
    ///   suspend and resume and at-least-once fault tolerance. Such an
    ///   endpoint must pass `Some(Resume::*)` to [InputConsumer::extended] for
    ///   at least some steps (see [Resume] for details).
    ///
    /// - An endpoint that returns `Some(FtModel::ExactlyOnce)` can support
    ///   suspend and resume, at-least-once fault tolerance, and exactly once
    ///   fault tolerance. Such an endpoint must pass `Some(Resume::Replay
    ///   {..})` to [InputConsumer::extended] for every step (see [Resume] for
    ///   details).
    fn fault_tolerance(&self) -> Option<FtModel>;
}
55}
56
57pub trait TransportInputEndpoint: InputEndpoint {
58 /// Creates a new input endpoint. The endpoint should use `parser` to parse
59 /// data into records. Returns an [`InputReader`] for reading the endpoint's
60 /// data. The endpoint will use `consumer` to report its progress.
61 ///
62 /// The `resume_info` parameter is used when resuming the pipeline from a
63 /// checkpoint. It contains resume metadata that the endpoint returned via the
64 /// [`InputConsumer::extended`] function before suspending. When specified,
65 /// it tells a fault-tolerant input reader to seek past the data already read
66 /// in the step whose metadata is given by the value.
67 ///
68 /// The reader is initially paused.
69 fn open(
70 &self,
71 consumer: Box<dyn InputConsumer>,
72 parser: Box<dyn Parser>,
73 schema: Relation,
74 resume_info: Option<JsonValue>,
75 ) -> AnyResult<Box<dyn InputReader>>;
76}
77
78#[doc(hidden)]
79pub trait IntegratedInputEndpoint: InputEndpoint {
80 fn open(
81 self: Box<Self>,
82 input_handle: &InputCollectionHandle,
83 resume_info: Option<JsonValue>,
84 ) -> AnyResult<Box<dyn InputReader>>;
85}
86
87/// Commands for an [InputReader] to execute.
88///
89/// # Transitions
90///
91/// The following diagram shows the possible order in which the controller can
92/// issue commands to [InputReader]s:
93///
94/// ```text
95/// ┌─⯇─ (start) ─⯈──┐
96/// │ │ │
97/// │ │ ┌───┐ │
98/// │ ▼ ▼ │ │
99/// ├─⯇─ Replay ──┘ │
100/// │ │ │
101/// │ ▼ │
102/// ├─⯇─ Extend⯇─────┤
103/// │ │ │
104/// │ │ ┌───┐ │
105/// │ ▼ ▼ │ │
106/// ├─⯇─ Queue ──┘ │
107/// │ │ │
108/// │ ▼ │
109/// ├─⯇─ Pause ─⯈────┘
110/// │ │
111/// │ ▼
112/// └───⯈Disconnect
113/// │
114/// ▼
115/// (end)
116/// ```
117///
118/// # Stalls
119///
120/// When the controller issues a [InputReaderCommand::Replay] or
121/// [InputReaderCommand::Queue] command to an input adapter, it waits for the
122/// input adapter to respond to them. Until it receives a reply, the next step
123/// cannot proceed. An input adapter that does not respond to one of these
124/// commands will stall the entire pipeline. However, the controller also uses
125/// [InputReader::is_closed] to detect that an input adapter has died due to an
126/// error or reaching end-of-input, so input adapters for which it is difficult
127/// to handle errors gracefully can report that they have died using
128/// `is_closed`, if necessary, as described in more detail below.
129///
130/// ## End-of-input handling
131///
132/// If an input adapter reaches the end of its input, and it isn't implemented
133/// to wait for and pass along further input, then it should:
134///
135/// - Make sure that it has already indicated that it has buffered all of its
136/// data, via [InputConsumer::buffered].
137///
138/// - Call [InputConsumer::eoi] to indicate that it has reached end of input.
139///
140/// - Respond to [InputReaderCommand::Queue] until it has queued all of its
141/// input and has none left.
142///
143/// - Optionally, at this point, it may exit and start returning `true` from
144/// `InputReader::is_closed`.
145///
146/// ## Error handling
147///
148/// If an input adapter encounters a fatal error that keeps it from continuing
149/// to obtain input, then it should report the error via [InputConsumer::error]
150/// with `true` for `fatal`. Afterward, it may exit and start returning `true`
151/// from `InputReader::is_closed`.
152///
153/// ## Additional requirement
154///
155/// An input adapter should ensure that, if it flushes any records to the
156/// circuit in response to [InputReaderCommand::Replay] or
157/// [InputReaderCommand::Queue], then it finishes up and responds to the
158/// consumer using [InputConsumer::replayed] or [InputConsumer::extended],
159/// respectively. If it instead dies mid-way, then the controller will not
160/// record the step properly and fault tolerance replay will be incorrect.
161#[derive(Debug)]
162pub enum InputReaderCommand {
163 /// Tells the input reader to replay the step described by `metadata` and
164 /// `data` by reading and flushing buffers for the data in the step, and
165 /// then [InputConsumer::replayed] to signal completion.
166 ///
167 /// The input reader should report the data that it queues to
168 /// [InputConsumer::buffered] as it does the replay.
169 ///
170 /// The input reader doesn't have to process other commands while it does
171 /// the replay.
172 ///
173 /// # Constraints
174 ///
175 /// Only fault-tolerant input readers need to accept this. It will be issued
176 /// zero or more times, before any other command.
177 Replay { metadata: JsonValue, data: RmpValue },
178
179 /// Tells the input reader to accept further input. The first time it
180 /// receives this command, the reader should start from the resume point
181 /// passed as `resume_info` when the endpoint was opened, if any, and
182 /// otherwise from the beginning of input.
183 ///
184 /// The input reader should report the data that it queues to
185 /// [InputConsumer::buffered] as it queues it.
186 ///
187 /// # Constraints
188 ///
189 /// The controller will not call this function:
190 ///
191 /// - Twice on a given reader without an intervening
192 /// [InputReaderCommand::Pause].
193 ///
194 /// - If it requested a replay (with [InputReaderCommand::Replay]) and the reader
195 /// hasn't yet reported that the replay is complete.
196 Extend,
197
198 /// Tells the input reader to stop reading more input.
199 ///
200 /// The controller uses this to limit the number of buffered records and to
201 /// respond to user requests to pause the pipeline.
202 ///
203 /// # Constraints
204 ///
205 /// The controller issues this only after a paired
206 /// [InputReaderCommand::Extend].
207 Pause,
208
209 /// Tells the input reader to flush input buffers to the circuit.
210 ///
211 /// The input reader can call [InputConsumer::max_batch_size] to find out
212 /// how many records it should flush. When it's done, it must call
213 /// [InputConsumer::extended] to report it.
214 ///
215 /// The `checkpoint_requested` flag indicates that the controller is trying
216 /// to checkpoint or suspend the pipeline. This serves as a hint to the reader
217 /// to try to clear the checkpoint barrier by returning [Resume::Seek] or
218 /// [Resume::Replay] if possible. For instance, if the reader has multiple
219 /// buffers queued, it can choose to stop flushing them after reaching the first
220 /// buffer that corresponds to a seekable position in the input stream.
221 ///
222 /// # Constraints
223 ///
224 /// The controller won't issue this command before it first issues [InputReaderCommand::Extend].
225 Queue { checkpoint_requested: bool },
226
227 /// Tells the reader it's going to be dropped soon and should clean up.
228 ///
229 /// The reader can continue to queue some data buffers afterward if that's
230 /// the easiest implementation.
231 ///
232 /// # Constraints
233 ///
234 /// The controller calls this only once and won't call any other functions
235 /// for a given reader after it calls this one.
236 Disconnect,
237}
238
239impl InputReaderCommand {
240 /// Returns this command translated to a [NonFtInputReaderCommand], or
241 /// `None` if that is not possible (because this command is only for
242 /// fault-tolerant endpoints).
243 pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
244 match self {
245 InputReaderCommand::Replay { .. } => None,
246 InputReaderCommand::Queue { .. } => Some(NonFtInputReaderCommand::Queue),
247 InputReaderCommand::Extend => {
248 Some(NonFtInputReaderCommand::Transition(PipelineState::Running))
249 }
250 InputReaderCommand::Pause => {
251 Some(NonFtInputReaderCommand::Transition(PipelineState::Paused))
252 }
253 InputReaderCommand::Disconnect => Some(NonFtInputReaderCommand::Transition(
254 PipelineState::Terminated,
255 )),
256 }
257 }
258}
259
260/// A subset of [InputReaderCommand] that only includes the commands for
261/// non-fault-tolerant connectors.
262#[derive(Debug)]
263pub enum NonFtInputReaderCommand {
264 /// Equivalent to [InputReaderCommand::Queue].
265 Queue,
266
267 /// Equivalencies:
268 ///
269 /// - `Transition(PipelineState::Paused)`: [InputReaderCommand::Pause].
270 ///
271 /// - `Transition(PipelineState::Running)`: [InputReaderCommand::Extend].
272 ///
273 /// - `Transition(PipelineState::Terminated)`: [InputReaderCommand::Disconnect].
274 Transition(PipelineState),
275}
276
277#[doc(hidden)]
278pub struct InputQueueEntry<A, B> {
279 /// Data buffer to push to the circuit.
280 buffer: Option<B>,
281
282 /// Time when data in this buffer was received from the transport endpoint.
283 /// It is used to track the processing latency in different stages of the pipeline.
284 timestamp: DateTime<Utc>,
285
286 /// Start a transaction with the given label before pushing the buffer to the circuit
287 /// unless a transaction is already in progress.
288 start_transaction: Option<Option<String>>,
289
290 /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
291 commit_transaction: bool,
292
293 /// Auxiliary data associated with the buffer.
294 aux: A,
295}
296
297impl<A, B> InputQueueEntry<A, B> {
298 #[doc(hidden)]
299 pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
300 Self {
301 buffer: None,
302 timestamp,
303 start_transaction: None,
304 commit_transaction: false,
305 aux,
306 }
307 }
308
309 #[doc(hidden)]
310 pub fn with_buffer(self, buffer: Option<B>) -> Self {
311 Self { buffer, ..self }
312 }
313
314 /// Start a transaction with the given label before pushing the buffer to the circuit
315 /// unless a transaction is already in progress.
316 pub fn with_start_transaction(self, start_transaction: Option<Option<String>>) -> Self {
317 Self {
318 start_transaction,
319 ..self
320 }
321 }
322
323 /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
324 pub fn with_commit_transaction(self, commit_transaction: bool) -> Self {
325 Self {
326 commit_transaction,
327 ..self
328 }
329 }
330}
331
332/// A thread-safe queue for collecting and flushing input buffers.
333///
334/// Commonly used by `InputReader` implementations for staging buffers from
335/// worker threads.
336pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
337 #[allow(clippy::type_complexity)]
338 pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
339 pub consumer: Box<dyn InputConsumer>,
340 pub transaction_in_progress: AtomicBool,
341}
342
343impl<A, B: InputBuffer> InputQueue<A, B> {
344 pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
345 Self {
346 queue: Mutex::new(VecDeque::new()),
347 consumer,
348 transaction_in_progress: AtomicBool::new(false),
349 }
350 }
351
352 pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
353 self.consumer.parse_errors(errors);
354 let len = entry
355 .buffer
356 .as_ref()
357 .map_or(BufferSize::empty(), |buffer| buffer.len());
358
359 let mut queue = self.queue.lock().unwrap();
360 queue.push_back(entry);
361 self.consumer.buffered(len);
362
363 // The endpoint pushed an empty buffer. This likely indicates that the accompanying aux data
364 // needs to be processed by the endpoint after preceding buffers have been flushed. However,
365 // since we didn't report any buffered records, the controller may never perform another step,
366 // so we nudge it to do it.
367 if len.records == 0 {
368 self.consumer.request_step();
369 }
370 }
371
372 /// Appends `buffer`, to the queue, and associates it with `aux`. Reports
373 /// to the controller that `errors` have occurred during parsing.
374 pub fn push_with_aux(
375 &self,
376 (buffer, errors): (Option<B>, Vec<ParseError>),
377 timestamp: DateTime<Utc>,
378 aux: A,
379 ) {
380 let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);
381
382 self.push_entry(entry, errors);
383 }
384
385 /// Flushes a batch of records to the circuit and returns the auxiliary data
386 /// that was associated with those records.
387 ///
388 /// This always flushes whole buffers to the circuit (with `flush`),
389 /// since auxiliary data is associated with a whole buffer rather than with
390 /// individual records. If the auxiliary data type `A` is `()`, then
391 /// [InputQueue<()>::flush] avoids that and so is a better choice.
392 #[allow(clippy::type_complexity)]
393 pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
394 self.flush_with_aux_until(&|_| false)
395 }
396
397 /// Flushes a batch of records to the circuit and returns the auxiliary data
398 /// that was associated with those records.
399 ///
400 /// Stops after flushing at least `max_batch_size` records or after flushing a
401 /// buffer whose auxiliary data satisfies the `stop_at` predicate, whichever
402 /// happens first.
403 ///
404 /// This always flushes whole buffers to the circuit (with `flush`),
405 /// since auxiliary data is associated with a whole buffer rather than with
406 /// individual records. If the auxiliary data type `A` is `()`, then
407 /// [InputQueue<()>::flush] avoids that and so is a better choice.
408 #[allow(clippy::type_complexity)]
409 pub fn flush_with_aux_until(
410 &self,
411 stop_at: &dyn Fn(&A) -> bool,
412 ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
413 let mut total = BufferSize::empty();
414 let mut hasher = self.consumer.hasher();
415 let n = self.consumer.max_batch_size();
416 let mut consumed_aux = Vec::new();
417
418 let mut stop = false;
419
420 while !stop && total.records < n {
421 let Some(InputQueueEntry {
422 buffer,
423 timestamp,
424 aux,
425 start_transaction,
426 commit_transaction,
427 }) = self.queue.lock().unwrap().pop_front()
428 else {
429 break;
430 };
431
432 if let Some(label) = start_transaction {
433 self.start_transaction(label.as_deref());
434 }
435
436 if let Some(mut buffer) = buffer {
437 total += buffer.len();
438 if let Some(hasher) = hasher.as_mut() {
439 buffer.hash(hasher);
440 }
441 buffer.flush();
442 }
443
444 stop = stop_at(&aux);
445 consumed_aux.push((timestamp, aux));
446
447 if commit_transaction && self.commit_transaction() {
448 break;
449 }
450 }
451
452 // Process any entries with aux data only.
453 let mut queue = self.queue.lock().unwrap();
454 while !stop
455 && queue
456 .front()
457 .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
458 {
459 let Some(InputQueueEntry {
460 timestamp,
461 aux,
462 start_transaction,
463 commit_transaction,
464 ..
465 }) = queue.pop_front()
466 else {
467 break;
468 };
469
470 if let Some(label) = start_transaction {
471 self.start_transaction(label.as_deref());
472 }
473
474 stop = stop_at(&aux);
475 consumed_aux.push((timestamp, aux));
476
477 if commit_transaction && self.commit_transaction() {
478 break;
479 }
480 }
481
482 (total, hasher, consumed_aux)
483 }
484
485 pub fn len(&self) -> usize {
486 self.queue.lock().unwrap().len()
487 }
488
489 pub fn is_empty(&self) -> bool {
490 self.len() == 0
491 }
492
493 fn start_transaction(&self, label: Option<&str>) -> bool {
494 if self
495 .transaction_in_progress
496 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
497 .is_ok()
498 {
499 self.consumer.start_transaction(label);
500 true
501 } else {
502 false
503 }
504 }
505
506 fn commit_transaction(&self) -> bool {
507 if self
508 .transaction_in_progress
509 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
510 .is_ok()
511 {
512 self.consumer.commit_transaction();
513 true
514 } else {
515 false
516 }
517 }
518}
519
520impl InputQueue<(), Box<dyn InputBuffer>> {
521 /// Appends `buffer`, if nonempty,` to the queue. Reports to the controller
522 /// that `errors` occurred during parsing.
523 pub fn push(
524 &self,
525 (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
526 timestamp: DateTime<Utc>,
527 ) {
528 self.push_with_aux((buffer, errors), timestamp, ())
529 }
530
531 /// Flushes a batch of records to the circuit and reports to the consumer
532 /// that it was done.
533 ///
534 /// Only non-fault-tolerant input adapters can use this.
535 pub fn queue(&self) {
536 let mut total = BufferSize::empty();
537 let n = self.consumer.max_batch_size();
538 let mut consumed = Vec::new();
539
540 while total.records < n {
541 let Some(InputQueueEntry {
542 buffer,
543 timestamp,
544 start_transaction,
545 commit_transaction,
546 ..
547 }) = self.queue.lock().unwrap().pop_front()
548 else {
549 break;
550 };
551
552 if let Some(label) = start_transaction
553 && self
554 .transaction_in_progress
555 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
556 .is_ok()
557 {
558 self.consumer.start_transaction(label.as_deref());
559 }
560
561 if let Some(mut buffer) = buffer {
562 let mut taken = buffer.take_some(n - total.records);
563 total += taken.len();
564 consumed.push(Watermark::new(timestamp, None));
565 taken.flush();
566 drop(taken);
567 if !buffer.is_empty() {
568 self.queue.lock().unwrap().push_front(InputQueueEntry {
569 buffer: Some(buffer),
570 timestamp,
571 start_transaction: None,
572 commit_transaction,
573 aux: (),
574 });
575 break;
576 }
577 }
578
579 if commit_transaction {
580 if self
581 .transaction_in_progress
582 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
583 .is_ok()
584 {
585 self.consumer.commit_transaction();
586 }
587 break;
588 }
589 }
590 self.consumer.extended(total, None, consumed);
591 }
592}
593
594/// Reads data from an endpoint.
595///
596/// Use [`TransportInputEndpoint::open`] to obtain an [`InputReader`].
597pub trait InputReader: Send + Sync {
598 fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
599
600 /// Requests the input reader to execute `command`.
601 fn request(&self, command: InputReaderCommand);
602
603 /// Returns true if the endpoint is closed, meaning that it has already
604 /// acted on all of the commands that it ever will. A closed endpoint can be
605 /// one that came to the end of its input (and is not waiting for more to
606 /// arrive) or one that encountered a fatal error and cannot continue.
607 ///
608 /// An endpoint is often implemented in terms of a channel to a thread. In
609 /// such a case, this can be implemented in terms of `is_closed` on the
610 /// channel's sender.
611 fn is_closed(&self) -> bool;
612
613 fn replay(&self, metadata: JsonValue, data: RmpValue) {
614 self.request(InputReaderCommand::Replay { metadata, data });
615 }
616
617 fn extend(&self) {
618 self.request(InputReaderCommand::Extend);
619 }
620
621 fn pause(&self) {
622 self.request(InputReaderCommand::Pause);
623 }
624
625 fn queue(&self, checkpoint_requested: bool) {
626 self.request(InputReaderCommand::Queue {
627 checkpoint_requested,
628 });
629 }
630
631 fn disconnect(&self) {
632 self.request(InputReaderCommand::Disconnect);
633 }
634
635 /// Returns the approximate amount of memory used by the connector's
636 /// underlying implementation. For the Kafka connectors, for example, this
637 /// is the amount of memory used by librdkafka. Not all connectors use a
638 /// substantial amount of memory, so the default implementation returns 0.
639 fn memory(&self) -> usize {
640 0
641 }
642}
643
644/// Position in an input stream, including the timestamp when the data was ingested
645/// from the transport endpoint and transport-specific metadata such as delta table
646/// version or Kafka partition offsets.
647#[derive(Clone, Debug)]
648pub struct Watermark {
649 pub timestamp: DateTime<Utc>,
650 pub metadata: Option<JsonValue>,
651}
652
653impl Watermark {
654 pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
655 Self {
656 timestamp,
657 metadata,
658 }
659 }
660}
661
662/// Input stream consumer.
663///
664/// A transport endpoint pushes binary data downstream via an instance of this
665/// trait.
666pub trait InputConsumer: Send + Sync + DynClone {
667 /// Returns the maximum number of records that an `InputReader` should queue
668 /// in response to a [InputReaderCommand::Queue] command.
669 ///
670 /// Nothing keeps the endpoint from queuing more than this if necessary (for
671 /// example, if for the sake of lateness it needs to group more than this
672 /// number of records together).
673 fn max_batch_size(&self) -> usize;
674
675 /// Returns the level of fault tolerance that the pipeline supports, if any.
676 ///
677 /// An endpoint only needs to implement `min(endpoint_ft, pipeline_ft)`
678 /// fault tolerance, where `endpoint_ft` is what the endpoint returns from
679 /// `InputEndpoint::fault_tolerance` and `pipeline_ft` is what this function
680 /// returns. For example, if an input adapter supports
681 /// `Some(FtModel::ExactlyOnce)`, but the pipeline's fault tolerance level
682 /// is `None`, then the input adapter can simply pass `None` as `resume` to
683 /// [InputConsumer::extended]. This optimization is, probably, worthwhile
684 /// only to input adapters that log a copy of all of their data, instead of
685 /// just metadata.
686 fn pipeline_fault_tolerance(&self) -> Option<FtModel>;
687
688 /// Returns a hasher, if the fault tolerance model calls for hashing, and
689 /// `None` otherwise.
690 ///
691 /// This is just a convenience method. Connectors can do hashing any way
692 /// they like, as long as they do it the same way for new data and for
693 /// replays.
694 fn hasher(&self) -> Option<Xxh3Default> {
695 match self.pipeline_fault_tolerance() {
696 Some(FtModel::ExactlyOnce) => Some(Xxh3Default::new()),
697 _ => None,
698 }
699 }
700
701 /// Reports `errors` as parse errors.
702 fn parse_errors(&self, errors: Vec<ParseError>);
703
704 /// Reports that the input adapter has internally buffered `amt` records and
705 /// bytes.
706 ///
707 /// Fault-tolerant input adapters should report buffered data during replay
708 /// as well as in normal operation.
709 fn buffered(&self, amt: BufferSize);
710
711 /// Reports that the input adapter has completed flushing `amt` data to the
712 /// circuit, that hash to `hash`, in response to an
713 /// [InputReaderCommand::Replay] request.
714 ///
715 /// Only a fault-tolerant input adapter will invoke this.
716 fn replayed(&self, amt: BufferSize, hash: u64);
717
718 /// Reports that the input adapter has completed flushing `amt` data to the
719 /// circuit, that hash to `hash`, in response to an
720 /// [InputReaderCommand::Queue] request.
721 ///
722 /// If the step is one that the input adapter can restart after, or replay,
723 /// then it should supply that as `resume` (see [Resume] for details).
724 fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);
725
726 /// Reports that the endpoint has reached end of input and that no more data
727 /// will be received from the endpoint.
728 ///
729 /// If the endpoint has already indicated that it has buffered records then
730 /// the controller will request them in future [InputReaderCommand::Queue]
731 /// messages. The endpoint must not make further calls to
732 /// [InputConsumer::buffered] or [InputConsumer::parse_errors].
733 fn eoi(&self);
734
735 /// Request the controller to schedule a step even if the connector hasn't queued
736 /// any records.
737 fn request_step(&self);
738
739 /// The connector is initiating a transaction. `label` is an optional label for
740 /// the transaction for debugging purposes.
741 ///
742 /// This function can be invoked in response to a `Queue` command.
743 ///
744 /// Any updates pushed by the connector after this function is invoked will be part
745 /// of the transaction.
746 ///
747 /// The connector _must_ perform a matching call to `commit_transaction` to commit the
748 /// transaction.
749 ///
750 /// Multiple connectors can initiate a transaction concurrently, in which case their
751 /// updates will be combined into a single transaction. The transaction will be committed
752 /// when all the connectors have committed it.
753 fn start_transaction(&self, label: Option<&str>);
754
755 /// The connector is committing a transaction started by a previous `start_transaction` call.
756 ///
757 /// This function can be invoked in response to a `Queue` command after pushing all updates that
758 /// belong to the the transaction, immediately before calling `extended`. The connector cannot
759 /// queue any more updates after this function is invoked, until the next `Queue` command.
760 fn commit_transaction(&self);
761
762 /// Register connector-specific metrics for Prometheus export.
763 ///
764 /// A connector may call this once during [`TransportInputEndpoint::open`]
765 /// to provide an [`Arc<dyn ConnectorMetrics>`] whose [`ConnectorMetrics::metrics`]
766 /// will be polled on every scrape. The default implementation is a no-op,
767 /// so connectors that have no custom metrics need not override it.
768 fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}
769
770 /// Returns a watch receiver that tracks completion of pipeline steps.
771 ///
772 /// The receiver yields [`Completion`] values whose `total_completed_steps`
773 /// field indicates how many steps have been fully processed (circuit
774 /// execution + all output connectors).
775 ///
776 /// Input adapters that need to defer acknowledgment until data is durably
777 /// processed (e.g., CDC adapters controlling a replication slot) can use
778 /// this to detect when their data has been consumed.
779 ///
780 /// Return `None` if the consumer does not support completion tracking.
781 fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;
782
783 /// Endpoint failed.
784 ///
785 /// Reports that the endpoint failed and that it will not queue any more
786 /// data.
787 ///
788 /// Optional tag that can be used for additional context
789 /// e.g. for rate limiting
790 fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);
791
792 /// Updates the health status of the connector.
793 fn update_connector_health(&self, health: ConnectorHealth);
794}
795
796/// Information needed to restart after or replay input.
797///
798/// Feldera supports a few ways to checkpoint and resume a pipeline. These
799/// operations in turn require support from the pipeline's input adapters:
800///
801/// 1. To support suspend and resume, or at-least-once fault tolerance, the
802/// input adapter must indicate, per step, how to restart from just after
803/// that step, by passing `Some(Resume::*)` to [InputConsumer::extended].
804///
805/// Such input adapters might have steps for which seeking would be
806/// impractical. Such an input adapter may skip over those steps by passing
807/// `Some(Resume::Barrier)` instead; the controller will not try to
808/// checkpoint after them.
809///
810/// 2. To additionally support exactly once fault tolerance, the input adapter
811/// must indicate, per step, both how to restart after the step and how to
812/// replay exactly that step, by passing `Some(Resume::Replay { .. })` to
813/// [InputConsumer::extended].
814///
815/// An input adapter that supports fault tolerance may not skip steps; that
816/// is, it must supply `Some(Resume::Replay { .. })` for every step.
817#[derive(Clone, Debug)]
818pub enum Resume {
819 /// The input adapter does not support resuming after this step.
820 Barrier,
821
822 /// The input adapter can resume just after this step, but it can't replay
823 /// the step exactly.
824 Seek {
825 /// Metadata needed for the controller to restart the input adapter from
826 /// just after this input step.
827 seek: JsonValue,
828 },
829
830 /// The input adapter can replay this step exactly, or resume just after the
831 /// step.
832 ///
833 /// Input adapters can use `seek` and `replay` in different combinations:
834 ///
835 /// - Some kinds of input adapters, for example the ones for files, or for
836 /// Kafka, can reread the input data that they used before. These will
837 /// ordinarily just use `seek`, filling it with a pointer just past the
838 /// end of the data to be read. (Ordinarily, it would already know where
839 /// the start is from the previous step, so the start pointer isn't
840 /// usually needed.)
841 ///
842 /// These input adapters can just set `replay` to [RmpValue::Nil].
843 ///
844 /// - Other kinds of input connectors can't seek back and reread the input
845 /// data that they used before. The best example is the HTTP input
846 /// connector, because which can't ask whatever client connected before to
847 /// repeat the same exact data that it input before. These input
848 /// connectors have to save all the input data for replay, by putting into
849 /// the `replay` field.
850 ///
851 /// These input adapters can just set `seek` to [JsonValue::Null].
852 ///
853 /// (In theory, any input connector could substitute data for metadata,
854 /// but if the data can simply be reread using the metadata, we usually
855 /// consider that better because it saves time and space saving all the
856 /// data when in most cases it will never be reread.)
857 Replay {
858 /// Metadata needed for the controller to restart the input adapter from
859 /// just after this input step.
860 seek: JsonValue,
861
862 /// The data needed for the controller to replay exactly this input step
863 /// using [InputReaderCommand::Replay].
864 replay: RmpValue,
865
866 /// Hash of the input records in this step, for verification on replay.
867 ///
868 /// The input adapter can compute this in any way convenient to it, as
869 /// long as it does so the same way for reading data initially and on
870 /// replay. On replay, the controller checks that the replayed value
871 /// matches the original one and fails the circuit if it differs.
872 hash: u64,
873 },
874}
875
876impl Resume {
877 pub fn is_barrier(&self) -> bool {
878 matches!(self, Self::Barrier)
879 }
880
881 /// Returns the `seek` value, if any, in this [Resume].
882 pub fn seek(&self) -> Option<&JsonValue> {
883 match self {
884 Resume::Barrier => None,
885 Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
886 }
887 }
888
889 /// Consumes this [Resume] and returns just the `seek` value, if any.
890 pub fn into_seek(self) -> Option<JsonValue> {
891 match self {
892 Resume::Barrier => None,
893 Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
894 }
895 }
896
897 /// Returns the maximum fault tolerance level that this [Resume] can
898 /// support.
899 pub fn fault_tolerance(&self) -> FtModel {
900 match self {
901 &Resume::Barrier | Resume::Seek { .. } => FtModel::AtLeastOnce,
902 Resume::Replay { .. } => FtModel::ExactlyOnce,
903 }
904 }
905
906 /// If `hash` is provided, returns `Resume::Replay` with its hash value and
907 /// `seek`; otherwise, returns `Resume::Seek` with `seek`.
908 ///
909 /// This is convenient for endpoints that only need to use metadata to
910 /// support journaling. [InputConsumer::hasher] can be a convenient way to
911 /// get a hasher.
912 pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
913 match hash {
914 Some(hash) => Self::Replay {
915 seek,
916 replay: RmpValue::Nil,
917 hash,
918 },
919 None => Self::Seek { seek },
920 }
921 }
922
923 /// If `hash` is provided, returns `Resume::Replay` with its hash value and
924 /// whatever `replay` returns; otherwise, returns `Resume::Seek`.
925 ///
926 /// This is convenient for endpoints that support journaling by journaling
927 /// all the data (and that don't need to journal any metadata).
928 /// [InputConsumer::hasher] can be a convenient way to get a hasher.
929 pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
930 where
931 F: FnOnce() -> RmpValue,
932 {
933 let seek = JsonValue::Null;
934 match hash {
935 Some(hash) => Self::Replay {
936 seek,
937 replay: replay(),
938 hash,
939 },
940 None => Self::Seek { seek },
941 }
942 }
943}
944
945dyn_clone::clone_trait_object!(InputConsumer);
946
947/// Helper function to parse resume info passed to [`InputConsumer::extended`].
948pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
949where
950 M: DeserializeOwned,
951{
952 serde_json_path_to_error::from_value::<M>(metadata.clone())
953 .map_err(|e| anyhow::anyhow!("unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"))
954}
955
956#[doc(hidden)]
957pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;
958
959/// Command handler API exposed by connectors.
960///
961/// Connectors can support arbitrary connector-specific commands that can be
962/// invoked via the `/command` endpoint. These commands take and return arbitrary
963/// JSON values.
964///
965/// This API is not part of trait `Output[Input]Endpoint` because it can be invoked
966/// from any thread, and requires `Send + Sync`, while the `OutputEndpoint` API is
967/// not `Sync` and is meant to be called from the controller thread only.
968///
969/// The idea is that connectors that support custom commands create separate command
970/// handler objects that implement this trait and are returned by
971/// `OutputEndpoint::command_handler`.
972pub trait CommandHandler: Send + Sync {
973 /// Handle a command specified by the JSON objest.
974 ///
975 /// Fails if the connector does not support the command, the command is invalid,
976 /// or command execution fails.
977 fn command(&self, command: serde_json::Value) -> AnyResult<serde_json::Value>;
978}
979
980/// A configured output transport endpoint.
981///
982/// Output endpoints come in two flavors:
983///
984/// * A [fault-tolerant](crate#fault-tolerance) endpoint accepts output that has
985/// been divided into numbered steps. If it is given output associated with a
986/// step number that has already been output, then it discards the duplicate.
987/// It must also keep data written to the output transport from becoming
988/// visible to downstream readers until `batch_end` is called. (This works
989/// for output to Kafka, which supports transactional output. If it is
990/// difficult for some future fault-tolerant output endpoint, then the API
991/// could be adjusted to support writing output only after it can become
992/// immediately visible.)
993///
994/// * A non-fault-tolerant endpoint does not have a concept of steps and ignores
995/// them.
996pub trait OutputEndpoint: Send {
997 fn command_handler(&self) -> Option<Arc<dyn CommandHandler>> {
998 None
999 }
1000
1001 /// Finishes establishing the connection to the output endpoint.
1002 ///
1003 /// If the endpoint encounters any errors during output, now or later, it
1004 /// invokes `async_error_callback` to notify the client about asynchronous
1005 /// errors, i.e., errors that happen outside the context of the
1006 /// [`OutputEndpoint::push_buffer`] method. For instance, a reliable message
1007 /// bus like Kafka may notify the endpoint about a failure to deliver a
1008 /// previously sent message via an async callback. If the endpoint is unable
1009 /// to handle this error, it must forward it to the client via the
1010 /// `async_error_callback`. The first argument of the callback is a flag
1011 /// that indicates a fatal error that the endpoint cannot recover from.
1012 fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;
1013
1014 /// Maximum buffer size that this transport can transmit.
1015 /// The encoder should not generate buffers exceeding this size.
1016 fn max_buffer_size_bytes(&self) -> usize;
1017
1018 /// Notifies the output endpoint that data subsequently written by
1019 /// `push_buffer` belong to the given `step`.
1020 ///
1021 /// A [fault-tolerant](crate#fault-tolerance) endpoint has additional
1022 /// requirements:
1023 ///
1024 /// 1. If data for the given step has been written before, the endpoint
1025 /// should discard it.
1026 ///
1027 /// 2. The output batch must not be made visible to downstream readers
1028 /// before the next call to `batch_end`.
1029 fn batch_start(&mut self, _step: Step) -> AnyResult<()> {
1030 Ok(())
1031 }
1032
1033 fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;
1034
1035 /// Output a message consisting of a key/value pair, with optional headers.
1036 ///
1037 /// This API is implemented by Kafka and other transports that transmit
1038 /// messages consisting of key and value fields and is invoked by
1039 /// Kafka-specific data formats that rely on this message structure,
1040 /// e.g., Debezium. If a given transport does not implement this API, it
1041 /// should return an error.
1042 ///
1043 /// `headers` contains a list of key/optional_value pairs to be appended
1044 /// to Kafka message headers.
1045 fn push_key(
1046 &mut self,
1047 key: Option<&[u8]>,
1048 val: Option<&[u8]>,
1049 headers: &[(&str, Option<&[u8]>)],
1050 ) -> AnyResult<()>;
1051
1052 /// Notifies the output endpoint that output for the current step is
1053 /// complete.
1054 ///
1055 /// A fault-tolerant output endpoint may now make the output batch visible
1056 /// to readers.
1057 fn batch_end(&mut self) -> AnyResult<()> {
1058 Ok(())
1059 }
1060
1061 /// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
1062 fn is_fault_tolerant(&self) -> bool;
1063
1064 /// Returns the approximate amount of memory used by the connector's
1065 /// underlying implementation. For the Kafka connectors, for example, this
1066 /// is the amount of memory used by librdkafka. Not all connectors use a
1067 /// substantial amount of memory, so the default implementation returns 0.
1068 fn memory(&self) -> usize {
1069 0
1070 }
1071}
1072
1073/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.
1074///
1075/// A fault-tolerant connector wants to receive, in order:
1076///
1077/// - Zero or more [InputReaderCommand::Replay]s.
1078///
1079/// - Zero or more other commands.
1080///
1081/// This helps with that.
1082// This is used by Kafka and Nexmark but both of those are optional.
1083pub struct InputCommandReceiver<M, D> {
1084 receiver: UnboundedReceiver<InputReaderCommand>,
1085 buffer: Option<InputReaderCommand>,
1086 _phantom: PhantomData<(M, D)>,
1087}
1088
1089/// Error type returned by some [InputCommandReceiver] methods.
1090///
1091/// We could just use `anyhow` and that would probably be just as good though.
1092#[derive(Debug)]
1093pub enum InputCommandReceiverError {
1094 Disconnected,
1095 JsonDecodeError(serde_json_path_to_error::Error),
1096 RmpDecodeError(RmpDecodeError),
1097}
1098
1099impl std::error::Error for InputCommandReceiverError {}
1100
1101impl Display for InputCommandReceiverError {
1102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1103 match self {
1104 InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
1105 InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
1106 InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
1107 }
1108 }
1109}
1110
1111impl From<RmpDecodeError> for InputCommandReceiverError {
1112 fn from(value: RmpDecodeError) -> Self {
1113 Self::RmpDecodeError(value)
1114 }
1115}
1116
1117impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
1118 fn from(value: serde_json_path_to_error::Error) -> Self {
1119 Self::JsonDecodeError(value)
1120 }
1121}
1122
1123// This is used by Kafka and Nexmark but both of those are optional.
1124impl<M, D> InputCommandReceiver<M, D> {
1125 pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
1126 Self {
1127 receiver,
1128 buffer: None,
1129 _phantom: PhantomData,
1130 }
1131 }
1132
1133 #[doc(hidden)]
1134 pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1135 where
1136 M: for<'a> Deserialize<'a>,
1137 D: for<'a> Deserialize<'a>,
1138 {
1139 let command = self.blocking_recv()?;
1140 self.take_replay(command)
1141 }
1142
1143 #[doc(hidden)]
1144 pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1145 where
1146 M: for<'a> Deserialize<'a>,
1147 D: for<'a> Deserialize<'a>,
1148 {
1149 let command = self.recv().await?;
1150 self.take_replay(command)
1151 }
1152
1153 fn take_replay(
1154 &mut self,
1155 command: InputReaderCommand,
1156 ) -> Result<Option<(M, D)>, InputCommandReceiverError>
1157 where
1158 M: for<'a> Deserialize<'a>,
1159 D: for<'a> Deserialize<'a>,
1160 {
1161 match command {
1162 InputReaderCommand::Replay { metadata, data } => Ok(Some((
1163 serde_json_path_to_error::from_value::<M>(metadata)?,
1164 rmpv::ext::from_value::<D>(data)?,
1165 ))),
1166 other => {
1167 self.put_back(other);
1168 Ok(None)
1169 }
1170 }
1171 }
1172
1173 #[doc(hidden)]
1174 pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1175 match self.buffer.take() {
1176 Some(value) => Ok(value),
1177 None => self
1178 .receiver
1179 .recv()
1180 .await
1181 .ok_or(InputCommandReceiverError::Disconnected),
1182 }
1183 }
1184
1185 #[doc(hidden)]
1186 pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1187 match self.buffer.take() {
1188 Some(value) => Ok(value),
1189 None => self
1190 .receiver
1191 .blocking_recv()
1192 .ok_or(InputCommandReceiverError::Disconnected),
1193 }
1194 }
1195
1196 #[doc(hidden)]
1197 pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
1198 if let Some(command) = self.buffer.take() {
1199 Ok(Some(command))
1200 } else {
1201 match self.receiver.try_recv() {
1202 Ok(command) => Ok(Some(command)),
1203 Err(TryRecvError::Empty) => Ok(None),
1204 Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
1205 }
1206 }
1207 }
1208
1209 #[doc(hidden)]
1210 pub fn put_back(&mut self, value: InputReaderCommand) {
1211 assert!(self.buffer.is_none());
1212 self.buffer = Some(value);
1213 }
1214}