feldera_adapterlib/transport.rs
1use anyhow::{Error as AnyError, Result as AnyResult};
2use chrono::{DateTime, Utc};
3use dyn_clone::DynClone;
4use feldera_types::adapter_stats::ConnectorHealth;
5use feldera_types::config::FtModel;
6use feldera_types::coordination::Completion;
7use feldera_types::program_schema::Relation;
8use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::Value as JsonValue;
12use std::collections::VecDeque;
13use std::fmt::Display;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::mpsc::UnboundedReceiver;
18use tokio::sync::mpsc::error::TryRecvError;
19use xxhash_rust::xxh3::Xxh3Default;
20
21use crate::PipelineState;
22use crate::catalog::InputCollectionHandle;
23use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
24use crate::metrics::ConnectorMetrics;
25
/// Sequence number of a circuit step in a fault-tolerant pipeline.
///
/// Numbering starts at zero and advances by one every time the circuit runs,
/// so the value follows the global clock of the outermost circuit.
///
/// A [fault-tolerant](crate#fault-tolerance) output transport splits its
/// output into consecutively numbered steps. When the same step is written
/// more than once, the endpoint must discard every write after the first.
pub type Step = u64;
36
37/// A configured input endpoint.
38pub trait InputEndpoint: Send {
39 /// This endpoint's level of fault tolerance, if any:
40 ///
41 /// - An endpoint that returns `None` does not support suspend and resume or
42 /// any kind of fault tolerance and has no further constraints.
43 ///
44 /// - An endpoint that returns `Some(FtModel::AtLeastOnce)` can support
45 /// suspend and resume and at-least-once fault tolerance. Such an
46 /// endpoint must pass `Some(Resume::*)` to [InputConsumer::extended] for
47 /// at least some steps (see [Resume] for details).
48 ///
49 /// - An endpoint that returns `Some(FtModel::ExactlyOnce)` can support
50 /// suspend and resume, at-least-once fault tolerance, and exactly once
51 /// fault tolerance. Such an endpoint must pass `Some(Resume::Replay
52 /// {..})` to [InputConsumer::extended] for every step (see [Resume] for
53 /// details).
54 fn fault_tolerance(&self) -> Option<FtModel>;
55}
56
57pub trait TransportInputEndpoint: InputEndpoint {
58 /// Creates a new input endpoint. The endpoint should use `parser` to parse
59 /// data into records. Returns an [`InputReader`] for reading the endpoint's
60 /// data. The endpoint will use `consumer` to report its progress.
61 ///
62 /// The `resume_info` parameter is used when resuming the pipeline from a
63 /// checkpoint. It contains resume metadata that the endpoint returned via the
64 /// [`InputConsumer::extended`] function before suspending. When specified,
65 /// it tells a fault-tolerant input reader to seek past the data already read
66 /// in the step whose metadata is given by the value.
67 ///
68 /// The reader is initially paused.
69 fn open(
70 &self,
71 consumer: Box<dyn InputConsumer>,
72 parser: Box<dyn Parser>,
73 schema: Relation,
74 resume_info: Option<JsonValue>,
75 ) -> AnyResult<Box<dyn InputReader>>;
76}
77
78#[doc(hidden)]
79pub trait IntegratedInputEndpoint: InputEndpoint {
80 fn open(
81 self: Box<Self>,
82 input_handle: &InputCollectionHandle,
83 resume_info: Option<JsonValue>,
84 ) -> AnyResult<Box<dyn InputReader>>;
85}
86
87/// Commands for an [InputReader] to execute.
88///
89/// # Transitions
90///
91/// The following diagram shows the possible order in which the controller can
92/// issue commands to [InputReader]s:
93///
94/// ```text
95/// ┌─⯇─ (start) ─⯈──┐
96/// │ │ │
97/// │ │ ┌───┐ │
98/// │ ▼ ▼ │ │
99/// ├─⯇─ Replay ──┘ │
100/// │ │ │
101/// │ ▼ │
102/// ├─⯇─ Extend⯇─────┤
103/// │ │ │
104/// │ │ ┌───┐ │
105/// │ ▼ ▼ │ │
106/// ├─⯇─ Queue ──┘ │
107/// │ │ │
108/// │ ▼ │
109/// ├─⯇─ Pause ─⯈────┘
110/// │ │
111/// │ ▼
112/// └───⯈Disconnect
113/// │
114/// ▼
115/// (end)
116/// ```
117///
118/// # Stalls
119///
120/// When the controller issues a [InputReaderCommand::Replay] or
121/// [InputReaderCommand::Queue] command to an input adapter, it waits for the
122/// input adapter to respond to them. Until it receives a reply, the next step
123/// cannot proceed. An input adapter that does not respond to one of these
124/// commands will stall the entire pipeline. However, the controller also uses
125/// [InputReader::is_closed] to detect that an input adapter has died due to an
126/// error or reaching end-of-input, so input adapters for which it is difficult
127/// to handle errors gracefully can report that they have died using
128/// `is_closed`, if necessary, as described in more detail below.
129///
130/// ## End-of-input handling
131///
132/// If an input adapter reaches the end of its input, and it isn't implemented
133/// to wait for and pass along further input, then it should:
134///
135/// - Make sure that it has already indicated that it has buffered all of its
136/// data, via [InputConsumer::buffered].
137///
138/// - Call [InputConsumer::eoi] to indicate that it has reached end of input.
139///
140/// - Respond to [InputReaderCommand::Queue] until it has queued all of its
141/// input and has none left.
142///
143/// - Optionally, at this point, it may exit and start returning `true` from
144/// `InputReader::is_closed`.
145///
146/// ## Error handling
147///
148/// If an input adapter encounters a fatal error that keeps it from continuing
149/// to obtain input, then it should report the error via [InputConsumer::error]
150/// with `true` for `fatal`. Afterward, it may exit and start returning `true`
151/// from `InputReader::is_closed`.
152///
153/// ## Additional requirement
154///
155/// An input adapter should ensure that, if it flushes any records to the
156/// circuit in response to [InputReaderCommand::Replay] or
157/// [InputReaderCommand::Queue], then it finishes up and responds to the
158/// consumer using [InputConsumer::replayed] or [InputConsumer::extended],
159/// respectively. If it instead dies mid-way, then the controller will not
160/// record the step properly and fault tolerance replay will be incorrect.
161#[derive(Debug)]
162pub enum InputReaderCommand {
163 /// Tells the input reader to replay the step described by `metadata` and
164 /// `data` by reading and flushing buffers for the data in the step, and
165 /// then [InputConsumer::replayed] to signal completion.
166 ///
167 /// The input reader should report the data that it queues to
168 /// [InputConsumer::buffered] as it does the replay.
169 ///
170 /// The input reader doesn't have to process other commands while it does
171 /// the replay.
172 ///
173 /// # Constraints
174 ///
175 /// Only fault-tolerant input readers need to accept this. It will be issued
176 /// zero or more times, before any other command.
177 Replay { metadata: JsonValue, data: RmpValue },
178
179 /// Tells the input reader to accept further input. The first time it
180 /// receives this command, the reader should start from the resume point
181 /// passed as `resume_info` when the endpoint was opened, if any, and
182 /// otherwise from the beginning of input.
183 ///
184 /// The input reader should report the data that it queues to
185 /// [InputConsumer::buffered] as it queues it.
186 ///
187 /// # Constraints
188 ///
189 /// The controller will not call this function:
190 ///
191 /// - Twice on a given reader without an intervening
192 /// [InputReaderCommand::Pause].
193 ///
194 /// - If it requested a replay (with [InputReaderCommand::Replay]) and the reader
195 /// hasn't yet reported that the replay is complete.
196 Extend,
197
198 /// Tells the input reader to stop reading more input.
199 ///
200 /// The controller uses this to limit the number of buffered records and to
201 /// respond to user requests to pause the pipeline.
202 ///
203 /// # Constraints
204 ///
205 /// The controller issues this only after a paired
206 /// [InputReaderCommand::Extend].
207 Pause,
208
209 /// Tells the input reader to flush input buffers to the circuit.
210 ///
211 /// The input reader can call [InputConsumer::max_batch_size] to find out
212 /// how many records it should flush. When it's done, it must call
213 /// [InputConsumer::extended] to report it.
214 ///
215 /// The `checkpoint_requested` flag indicates that the controller is trying
216 /// to checkpoint or suspend the pipeline. This serves as a hint to the reader
217 /// to try to clear the checkpoint barrier by returning [Resume::Seek] or
218 /// [Resume::Replay] if possible. For instance, if the reader has multiple
219 /// buffers queued, it can choose to stop flushing them after reaching the first
220 /// buffer that corresponds to a seekable position in the input stream.
221 ///
222 /// # Constraints
223 ///
224 /// The controller won't issue this command before it first issues [InputReaderCommand::Extend].
225 Queue { checkpoint_requested: bool },
226
227 /// Tells the reader it's going to be dropped soon and should clean up.
228 ///
229 /// The reader can continue to queue some data buffers afterward if that's
230 /// the easiest implementation.
231 ///
232 /// # Constraints
233 ///
234 /// The controller calls this only once and won't call any other functions
235 /// for a given reader after it calls this one.
236 Disconnect,
237}
238
239impl InputReaderCommand {
240 /// Returns this command translated to a [NonFtInputReaderCommand], or
241 /// `None` if that is not possible (because this command is only for
242 /// fault-tolerant endpoints).
243 pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
244 match self {
245 InputReaderCommand::Replay { .. } => None,
246 InputReaderCommand::Queue { .. } => Some(NonFtInputReaderCommand::Queue),
247 InputReaderCommand::Extend => {
248 Some(NonFtInputReaderCommand::Transition(PipelineState::Running))
249 }
250 InputReaderCommand::Pause => {
251 Some(NonFtInputReaderCommand::Transition(PipelineState::Paused))
252 }
253 InputReaderCommand::Disconnect => Some(NonFtInputReaderCommand::Transition(
254 PipelineState::Terminated,
255 )),
256 }
257 }
258}
259
260/// A subset of [InputReaderCommand] that only includes the commands for
261/// non-fault-tolerant connectors.
262#[derive(Debug)]
263pub enum NonFtInputReaderCommand {
264 /// Equivalent to [InputReaderCommand::Queue].
265 Queue,
266
267 /// Equivalencies:
268 ///
269 /// - `Transition(PipelineState::Paused)`: [InputReaderCommand::Pause].
270 ///
271 /// - `Transition(PipelineState::Running)`: [InputReaderCommand::Extend].
272 ///
273 /// - `Transition(PipelineState::Terminated)`: [InputReaderCommand::Disconnect].
274 Transition(PipelineState),
275}
276
277#[doc(hidden)]
278pub struct InputQueueEntry<A, B> {
279 /// Data buffer to push to the circuit.
280 buffer: Option<B>,
281
282 /// Time when data in this buffer was received from the transport endpoint.
283 /// It is used to track the processing latency in different stages of the pipeline.
284 timestamp: DateTime<Utc>,
285
286 /// Start a transaction with the given label before pushing the buffer to the circuit
287 /// unless a transaction is already in progress.
288 start_transaction: Option<Option<String>>,
289
290 /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
291 commit_transaction: bool,
292
293 /// Auxiliary data associated with the buffer.
294 aux: A,
295}
296
297impl<A, B> InputQueueEntry<A, B> {
298 #[doc(hidden)]
299 pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
300 Self {
301 buffer: None,
302 timestamp,
303 start_transaction: None,
304 commit_transaction: false,
305 aux,
306 }
307 }
308
309 #[doc(hidden)]
310 pub fn with_buffer(self, buffer: Option<B>) -> Self {
311 Self { buffer, ..self }
312 }
313
314 /// Start a transaction with the given label before pushing the buffer to the circuit
315 /// unless a transaction is already in progress.
316 pub fn with_start_transaction(self, start_transaction: Option<Option<String>>) -> Self {
317 Self {
318 start_transaction,
319 ..self
320 }
321 }
322
323 /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
324 pub fn with_commit_transaction(self, commit_transaction: bool) -> Self {
325 Self {
326 commit_transaction,
327 ..self
328 }
329 }
330}
331
332/// A thread-safe queue for collecting and flushing input buffers.
333///
334/// Commonly used by `InputReader` implementations for staging buffers from
335/// worker threads.
336pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
337 #[allow(clippy::type_complexity)]
338 pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
339 pub consumer: Box<dyn InputConsumer>,
340 pub transaction_in_progress: AtomicBool,
341}
342
343impl<A, B: InputBuffer> InputQueue<A, B> {
344 pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
345 Self {
346 queue: Mutex::new(VecDeque::new()),
347 consumer,
348 transaction_in_progress: AtomicBool::new(false),
349 }
350 }
351
352 pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
353 self.consumer.parse_errors(errors);
354 let len = entry
355 .buffer
356 .as_ref()
357 .map_or(BufferSize::empty(), |buffer| buffer.len());
358
359 let mut queue = self.queue.lock().unwrap();
360 queue.push_back(entry);
361 self.consumer.buffered(len);
362
363 // The endpoint pushed an empty buffer. This likely indicates that the accompanying aux data
364 // needs to be processed by the endpoint after preceding buffers have been flushed. However,
365 // since we didn't report any buffered records, the controller may never perform another step,
366 // so we nudge it to do it.
367 if len.records == 0 {
368 self.consumer.request_step();
369 }
370 }
371
372 /// Appends `buffer`, to the queue, and associates it with `aux`. Reports
373 /// to the controller that `errors` have occurred during parsing.
374 pub fn push_with_aux(
375 &self,
376 (buffer, errors): (Option<B>, Vec<ParseError>),
377 timestamp: DateTime<Utc>,
378 aux: A,
379 ) {
380 let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);
381
382 self.push_entry(entry, errors);
383 }
384
385 /// Flushes a batch of records to the circuit and returns the auxiliary data
386 /// that was associated with those records.
387 ///
388 /// This always flushes whole buffers to the circuit (with `flush`),
389 /// since auxiliary data is associated with a whole buffer rather than with
390 /// individual records. If the auxiliary data type `A` is `()`, then
391 /// [InputQueue<()>::flush] avoids that and so is a better choice.
392 #[allow(clippy::type_complexity)]
393 pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
394 self.flush_with_aux_until(&|_| false)
395 }
396
397 /// Flushes a batch of records to the circuit and returns the auxiliary data
398 /// that was associated with those records.
399 ///
400 /// Stops after flushing at least `max_batch_size` records or after flushing a
401 /// buffer whose auxiliary data satisfies the `stop_at` predicate, whichever
402 /// happens first.
403 ///
404 /// This always flushes whole buffers to the circuit (with `flush`),
405 /// since auxiliary data is associated with a whole buffer rather than with
406 /// individual records. If the auxiliary data type `A` is `()`, then
407 /// [InputQueue<()>::flush] avoids that and so is a better choice.
408 #[allow(clippy::type_complexity)]
409 pub fn flush_with_aux_until(
410 &self,
411 stop_at: &dyn Fn(&A) -> bool,
412 ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
413 let mut total = BufferSize::empty();
414 let mut hasher = self.consumer.hasher();
415 let n = self.consumer.max_batch_size();
416 let mut consumed_aux = Vec::new();
417
418 let mut stop = false;
419
420 while !stop && total.records < n {
421 let Some(InputQueueEntry {
422 buffer,
423 timestamp,
424 aux,
425 start_transaction,
426 commit_transaction,
427 }) = self.queue.lock().unwrap().pop_front()
428 else {
429 break;
430 };
431
432 if let Some(label) = start_transaction {
433 self.start_transaction(label.as_deref());
434 }
435
436 if let Some(mut buffer) = buffer {
437 total += buffer.len();
438 if let Some(hasher) = hasher.as_mut() {
439 buffer.hash(hasher);
440 }
441 buffer.flush();
442 }
443
444 stop = stop_at(&aux);
445 consumed_aux.push((timestamp, aux));
446
447 if commit_transaction && self.commit_transaction() {
448 break;
449 }
450 }
451
452 // Process any entries with aux data only.
453 let mut queue = self.queue.lock().unwrap();
454 while !stop
455 && queue
456 .front()
457 .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
458 {
459 let Some(InputQueueEntry {
460 timestamp,
461 aux,
462 start_transaction,
463 commit_transaction,
464 ..
465 }) = queue.pop_front()
466 else {
467 break;
468 };
469
470 if let Some(label) = start_transaction {
471 self.start_transaction(label.as_deref());
472 }
473
474 stop = stop_at(&aux);
475 consumed_aux.push((timestamp, aux));
476
477 if commit_transaction && self.commit_transaction() {
478 break;
479 }
480 }
481
482 (total, hasher, consumed_aux)
483 }
484
485 pub fn len(&self) -> usize {
486 self.queue.lock().unwrap().len()
487 }
488
489 pub fn is_empty(&self) -> bool {
490 self.len() == 0
491 }
492
493 fn start_transaction(&self, label: Option<&str>) -> bool {
494 if self
495 .transaction_in_progress
496 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
497 .is_ok()
498 {
499 self.consumer.start_transaction(label);
500 true
501 } else {
502 false
503 }
504 }
505
506 fn commit_transaction(&self) -> bool {
507 if self
508 .transaction_in_progress
509 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
510 .is_ok()
511 {
512 self.consumer.commit_transaction();
513 true
514 } else {
515 false
516 }
517 }
518}
519
520impl InputQueue<(), Box<dyn InputBuffer>> {
521 /// Appends `buffer`, if nonempty,` to the queue. Reports to the controller
522 /// that `errors` occurred during parsing.
523 pub fn push(
524 &self,
525 (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
526 timestamp: DateTime<Utc>,
527 ) {
528 self.push_with_aux((buffer, errors), timestamp, ())
529 }
530
531 /// Flushes a batch of records to the circuit and reports to the consumer
532 /// that it was done.
533 ///
534 /// Only non-fault-tolerant input adapters can use this.
535 pub fn queue(&self) {
536 let mut total = BufferSize::empty();
537 let n = self.consumer.max_batch_size();
538 let mut consumed = Vec::new();
539
540 while total.records < n {
541 let Some(InputQueueEntry {
542 buffer,
543 timestamp,
544 start_transaction,
545 commit_transaction,
546 ..
547 }) = self.queue.lock().unwrap().pop_front()
548 else {
549 break;
550 };
551
552 if let Some(label) = start_transaction
553 && self
554 .transaction_in_progress
555 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
556 .is_ok()
557 {
558 self.consumer.start_transaction(label.as_deref());
559 }
560
561 if let Some(mut buffer) = buffer {
562 let mut taken = buffer.take_some(n - total.records);
563 total += taken.len();
564 consumed.push(Watermark::new(timestamp, None));
565 taken.flush();
566 drop(taken);
567 if !buffer.is_empty() {
568 self.queue.lock().unwrap().push_front(InputQueueEntry {
569 buffer: Some(buffer),
570 timestamp,
571 start_transaction: None,
572 commit_transaction,
573 aux: (),
574 });
575 break;
576 }
577 }
578
579 if commit_transaction {
580 if self
581 .transaction_in_progress
582 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
583 .is_ok()
584 {
585 self.consumer.commit_transaction();
586 }
587 break;
588 }
589 }
590 self.consumer.extended(total, None, consumed);
591 }
592}
593
594/// Reads data from an endpoint.
595///
596/// Use [`TransportInputEndpoint::open`] to obtain an [`InputReader`].
597pub trait InputReader: Send + Sync {
598 fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
599
600 /// Requests the input reader to execute `command`.
601 fn request(&self, command: InputReaderCommand);
602
603 /// Returns true if the endpoint is closed, meaning that it has already
604 /// acted on all of the commands that it ever will. A closed endpoint can be
605 /// one that came to the end of its input (and is not waiting for more to
606 /// arrive) or one that encountered a fatal error and cannot continue.
607 ///
608 /// An endpoint is often implemented in terms of a channel to a thread. In
609 /// such a case, this can be implemented in terms of `is_closed` on the
610 /// channel's sender.
611 fn is_closed(&self) -> bool;
612
613 fn replay(&self, metadata: JsonValue, data: RmpValue) {
614 self.request(InputReaderCommand::Replay { metadata, data });
615 }
616
617 fn extend(&self) {
618 self.request(InputReaderCommand::Extend);
619 }
620
621 fn pause(&self) {
622 self.request(InputReaderCommand::Pause);
623 }
624
625 fn queue(&self, checkpoint_requested: bool) {
626 self.request(InputReaderCommand::Queue {
627 checkpoint_requested,
628 });
629 }
630
631 fn disconnect(&self) {
632 self.request(InputReaderCommand::Disconnect);
633 }
634
635 /// Returns the approximate amount of memory used by the connector's
636 /// underlying implementation. For the Kafka connectors, for example, this
637 /// is the amount of memory used by librdkafka. Not all connectors use a
638 /// substantial amount of memory, so the default implementation returns 0.
639 fn memory(&self) -> usize {
640 0
641 }
642}
643
644/// Position in an input stream, including the timestamp when the data was ingested
645/// from the transport endpoint and transport-specific metadata such as delta table
646/// version or Kafka partition offsets.
647#[derive(Clone, Debug)]
648pub struct Watermark {
649 pub timestamp: DateTime<Utc>,
650 pub metadata: Option<JsonValue>,
651}
652
653impl Watermark {
654 pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
655 Self {
656 timestamp,
657 metadata,
658 }
659 }
660}
661
662/// Input stream consumer.
663///
664/// A transport endpoint pushes binary data downstream via an instance of this
665/// trait.
666pub trait InputConsumer: Send + Sync + DynClone {
667 /// Returns the maximum number of records that an `InputReader` should queue
668 /// in response to a [InputReaderCommand::Queue] command.
669 ///
670 /// Nothing keeps the endpoint from queuing more than this if necessary (for
671 /// example, if for the sake of lateness it needs to group more than this
672 /// number of records together).
673 fn max_batch_size(&self) -> usize;
674
675 /// Returns the level of fault tolerance that the pipeline supports, if any.
676 ///
677 /// An endpoint only needs to implement `min(endpoint_ft, pipeline_ft)`
678 /// fault tolerance, where `endpoint_ft` is what the endpoint returns from
679 /// `InputEndpoint::fault_tolerance` and `pipeline_ft` is what this function
680 /// returns. For example, if an input adapter supports
681 /// `Some(FtModel::ExactlyOnce)`, but the pipeline's fault tolerance level
682 /// is `None`, then the input adapter can simply pass `None` as `resume` to
683 /// [InputConsumer::extended]. This optimization is, probably, worthwhile
684 /// only to input adapters that log a copy of all of their data, instead of
685 /// just metadata.
686 fn pipeline_fault_tolerance(&self) -> Option<FtModel>;
687
688 /// Returns a hasher, if the fault tolerance model calls for hashing, and
689 /// `None` otherwise.
690 ///
691 /// This is just a convenience method. Connectors can do hashing any way
692 /// they like, as long as they do it the same way for new data and for
693 /// replays.
694 fn hasher(&self) -> Option<Xxh3Default> {
695 match self.pipeline_fault_tolerance() {
696 Some(FtModel::ExactlyOnce) => Some(Xxh3Default::new()),
697 _ => None,
698 }
699 }
700
701 /// Reports `errors` as parse errors.
702 fn parse_errors(&self, errors: Vec<ParseError>);
703
704 /// Reports that the input adapter has internally buffered `amt` records and
705 /// bytes.
706 ///
707 /// Fault-tolerant input adapters should report buffered data during replay
708 /// as well as in normal operation.
709 fn buffered(&self, amt: BufferSize);
710
711 /// Reports that the input adapter has completed flushing `amt` data to the
712 /// circuit, that hash to `hash`, in response to an
713 /// [InputReaderCommand::Replay] request.
714 ///
715 /// Only a fault-tolerant input adapter will invoke this.
716 fn replayed(&self, amt: BufferSize, hash: u64);
717
718 /// Reports that the input adapter has completed flushing `amt` data to the
719 /// circuit, that hash to `hash`, in response to an
720 /// [InputReaderCommand::Queue] request.
721 ///
722 /// If the step is one that the input adapter can restart after, or replay,
723 /// then it should supply that as `resume` (see [Resume] for details).
724 fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);
725
726 /// Reports that the endpoint has reached end of input and that no more data
727 /// will be received from the endpoint.
728 ///
729 /// If the endpoint has already indicated that it has buffered records then
730 /// the controller will request them in future [InputReaderCommand::Queue]
731 /// messages. The endpoint must not make further calls to
732 /// [InputConsumer::buffered] or [InputConsumer::parse_errors].
733 fn eoi(&self);
734
735 /// Request the controller to schedule a step even if the connector hasn't queued
736 /// any records.
737 fn request_step(&self);
738
739 /// The connector is initiating a transaction. `label` is an optional label for
740 /// the transaction for debugging purposes.
741 ///
742 /// This function can be invoked in response to a `Queue` command.
743 ///
744 /// Any updates pushed by the connector after this function is invoked will be part
745 /// of the transaction.
746 ///
747 /// The connector _must_ perform a matching call to `commit_transaction` to commit the
748 /// transaction.
749 ///
750 /// Multiple connectors can initiate a transaction concurrently, in which case their
751 /// updates will be combined into a single transaction. The transaction will be committed
752 /// when all the connectors have committed it.
753 fn start_transaction(&self, label: Option<&str>);
754
755 /// The connector is committing a transaction started by a previous `start_transaction` call.
756 ///
757 /// This function can be invoked in response to a `Queue` command after pushing all updates that
758 /// belong to the the transaction, immediately before calling `extended`. The connector cannot
759 /// queue any more updates after this function is invoked, until the next `Queue` command.
760 fn commit_transaction(&self);
761
762 /// Register connector-specific metrics for Prometheus export.
763 ///
764 /// A connector may call this once during [`TransportInputEndpoint::open`]
765 /// to provide an [`Arc<dyn ConnectorMetrics>`] whose [`ConnectorMetrics::metrics`]
766 /// will be polled on every scrape. The default implementation is a no-op,
767 /// so connectors that have no custom metrics need not override it.
768 fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}
769
770 /// Returns a watch receiver that fires each time pipeline step processing
771 /// completes. The value is the count of fully-processed steps
772 /// (`total_completed_steps`): a record ingested in step `n` is done when
773 /// the value exceeds `n`.
774 ///
775 /// Input adapters can use this to defer acknowledgment (e.g. a CDC
776 /// connector holding a replication slot) until their data has been
777 /// processed by the circuit and all output connectors.
778 ///
779 /// Returns `None` if the consumer does not support completion tracking.
780 fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;
781
782 /// Returns a watch receiver that fires each time a durable checkpoint
783 /// completes. The value is the count of checkpointed steps: a record
784 /// ingested in step `n` is durably stored when the value exceeds `n`.
785 ///
786 /// Input adapters that require at-least-once delivery stronger than step
787 /// completion (e.g. a CDC connector that must not advance its replication
788 /// slot past the last checkpoint) can wait on this rather than on
789 /// [`completion_watcher`][Self::completion_watcher].
790 ///
791 /// Returns `Some` only when fault tolerance is enabled for the pipeline
792 /// (which implies storage is configured and checkpoints are scheduled).
793 /// Returns `None` otherwise, in which case the connector should fall back
794 /// to [`completion_watcher`][Self::completion_watcher].
795 fn checkpoint_watcher(&self) -> Option<tokio::sync::watch::Receiver<u64>> {
796 None
797 }
798
799 /// Endpoint failed.
800 ///
801 /// Reports that the endpoint failed and that it will not queue any more
802 /// data.
803 ///
804 /// Optional tag that can be used for additional context
805 /// e.g. for rate limiting
806 fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);
807
808 /// Updates the health status of the connector.
809 fn update_connector_health(&self, health: ConnectorHealth);
810}
811
812/// Information needed to restart after or replay input.
813///
814/// Feldera supports a few ways to checkpoint and resume a pipeline. These
815/// operations in turn require support from the pipeline's input adapters:
816///
817/// 1. To support suspend and resume, or at-least-once fault tolerance, the
818/// input adapter must indicate, per step, how to restart from just after
819/// that step, by passing `Some(Resume::*)` to [InputConsumer::extended].
820///
821/// Such input adapters might have steps for which seeking would be
822/// impractical. Such an input adapter may skip over those steps by passing
823/// `Some(Resume::Barrier)` instead; the controller will not try to
824/// checkpoint after them.
825///
826/// 2. To additionally support exactly once fault tolerance, the input adapter
827/// must indicate, per step, both how to restart after the step and how to
828/// replay exactly that step, by passing `Some(Resume::Replay { .. })` to
829/// [InputConsumer::extended].
830///
831/// An input adapter that supports fault tolerance may not skip steps; that
832/// is, it must supply `Some(Resume::Replay { .. })` for every step.
#[derive(Clone, Debug)]
pub enum Resume {
    /// The input adapter does not support resuming after this step.
    Barrier,

    /// The input adapter can resume just after this step, but it can't replay
    /// the step exactly.
    Seek {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,
    },

    /// The input adapter can replay this step exactly, or resume just after the
    /// step.
    ///
    /// Input adapters can use `seek` and `replay` in different combinations:
    ///
    /// - Some kinds of input adapters, for example the ones for files, or for
    ///   Kafka, can reread the input data that they used before.  These will
    ///   ordinarily just use `seek`, filling it with a pointer just past the
    ///   end of the data to be read.  (Ordinarily, the adapter would already
    ///   know where the start is from the previous step, so the start pointer
    ///   isn't usually needed.)
    ///
    ///   These input adapters can just set `replay` to [RmpValue::Nil].
    ///
    /// - Other kinds of input connectors can't seek back and reread the input
    ///   data that they used before.  The best example is the HTTP input
    ///   connector, because it can't ask whatever client connected before to
    ///   repeat the same exact data that it input before.  These input
    ///   connectors have to save all the input data for replay, by putting it
    ///   into the `replay` field.
    ///
    ///   These input adapters can just set `seek` to [JsonValue::Null].
    ///
    /// (In theory, any input connector could substitute data for metadata,
    /// but if the data can simply be reread using the metadata, we usually
    /// consider that better because it avoids spending time and space on
    /// saving all the data when in most cases it will never be reread.)
    Replay {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,

        /// The data needed for the controller to replay exactly this input step
        /// using [InputReaderCommand::Replay].
        replay: RmpValue,

        /// Hash of the input records in this step, for verification on replay.
        ///
        /// The input adapter can compute this in any way convenient to it, as
        /// long as it does so the same way for reading data initially and on
        /// replay.  On replay, the controller checks that the replayed value
        /// matches the original one and fails the circuit if it differs.
        hash: u64,
    },
}
891
892impl Resume {
893 pub fn is_barrier(&self) -> bool {
894 matches!(self, Self::Barrier)
895 }
896
897 /// Returns the `seek` value, if any, in this [Resume].
898 pub fn seek(&self) -> Option<&JsonValue> {
899 match self {
900 Resume::Barrier => None,
901 Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
902 }
903 }
904
905 /// Consumes this [Resume] and returns just the `seek` value, if any.
906 pub fn into_seek(self) -> Option<JsonValue> {
907 match self {
908 Resume::Barrier => None,
909 Resume::Seek { seek } | Resume::Replay { seek, .. } => Some(seek),
910 }
911 }
912
913 /// Returns the maximum fault tolerance level that this [Resume] can
914 /// support.
915 pub fn fault_tolerance(&self) -> FtModel {
916 match self {
917 &Resume::Barrier | Resume::Seek { .. } => FtModel::AtLeastOnce,
918 Resume::Replay { .. } => FtModel::ExactlyOnce,
919 }
920 }
921
922 /// If `hash` is provided, returns `Resume::Replay` with its hash value and
923 /// `seek`; otherwise, returns `Resume::Seek` with `seek`.
924 ///
925 /// This is convenient for endpoints that only need to use metadata to
926 /// support journaling. [InputConsumer::hasher] can be a convenient way to
927 /// get a hasher.
928 pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
929 match hash {
930 Some(hash) => Self::Replay {
931 seek,
932 replay: RmpValue::Nil,
933 hash,
934 },
935 None => Self::Seek { seek },
936 }
937 }
938
939 /// If `hash` is provided, returns `Resume::Replay` with its hash value and
940 /// whatever `replay` returns; otherwise, returns `Resume::Seek`.
941 ///
942 /// This is convenient for endpoints that support journaling by journaling
943 /// all the data (and that don't need to journal any metadata).
944 /// [InputConsumer::hasher] can be a convenient way to get a hasher.
945 pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
946 where
947 F: FnOnce() -> RmpValue,
948 {
949 let seek = JsonValue::Null;
950 match hash {
951 Some(hash) => Self::Replay {
952 seek,
953 replay: replay(),
954 hash,
955 },
956 None => Self::Seek { seek },
957 }
958 }
959}
960
// Implements `Clone` for boxed `dyn InputConsumer` trait objects, so that a
// `Box<dyn InputConsumer>` can be cloned like any other value.
dyn_clone::clone_trait_object!(InputConsumer);
962
963/// Helper function to parse resume info passed to [`InputConsumer::extended`].
964pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
965where
966 M: DeserializeOwned,
967{
968 serde_json_path_to_error::from_value::<M>(metadata.clone())
969 .map_err(|e| anyhow::anyhow!("unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"))
970}
971
/// Callback that an output endpoint receives in [`OutputEndpoint::connect`]
/// and invokes to report errors that occur asynchronously.
///
/// Arguments, in order:
///
/// 1. Whether the error is fatal, i.e. the endpoint cannot recover from it.
/// 2. The error itself.
/// 3. An optional static string.  NOTE(review): its meaning is not visible
///    from this definition; confirm against the callers.
#[doc(hidden)]
pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;
974
/// Command handler API exposed by connectors.
///
/// Connectors can support arbitrary connector-specific commands that can be
/// invoked via the `/command` endpoint.  These commands take and return
/// arbitrary JSON values.
///
/// This API is not part of the `OutputEndpoint`/`InputEndpoint` traits because
/// it can be invoked from any thread, and requires `Send + Sync`, while the
/// `OutputEndpoint` API is not `Sync` and is meant to be called from the
/// controller thread only.
///
/// The idea is that connectors that support custom commands create separate
/// command handler objects that implement this trait and are returned by
/// `OutputEndpoint::command_handler`.
pub trait CommandHandler: Send + Sync {
    /// Handle a command specified by the JSON object.
    ///
    /// On success, returns the command's result as a JSON value.
    ///
    /// Fails if the connector does not support the command, the command is
    /// invalid, or command execution fails.
    fn command(&self, command: serde_json::Value) -> AnyResult<serde_json::Value>;
}
995
/// Kind of batch pushed to an output connector: either an incremental delta
/// or a snapshot of the full materialized view.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OutputBatchType {
    /// An incremental delta.
    Delta,

    /// A snapshot of the full materialized view.
    Snapshot,
}
1003
/// A configured output transport endpoint.
///
/// Output endpoints come in two flavors:
///
/// * A [fault-tolerant](crate#fault-tolerance) endpoint accepts output that has
///   been divided into numbered steps.  If it is given output associated with a
///   step number that has already been output, then it discards the duplicate.
///   It must also keep data written to the output transport from becoming
///   visible to downstream readers until `batch_end` is called.  (This works
///   for output to Kafka, which supports transactional output.  If it is
///   difficult for some future fault-tolerant output endpoint, then the API
///   could be adjusted to support writing output only after it can become
///   immediately visible.)
///
/// * A non-fault-tolerant endpoint does not have a concept of steps and ignores
///   them.
pub trait OutputEndpoint: Send {
    /// Returns the [CommandHandler] for this endpoint's connector-specific
    /// commands, if it has one.
    ///
    /// The default implementation returns `None`.
    fn command_handler(&self) -> Option<Arc<dyn CommandHandler>> {
        None
    }

    /// Finishes establishing the connection to the output endpoint.
    ///
    /// If the endpoint encounters any errors during output, now or later, it
    /// invokes `async_error_callback` to notify the client about asynchronous
    /// errors, i.e., errors that happen outside the context of the
    /// [`OutputEndpoint::push_buffer`] method.  For instance, a reliable message
    /// bus like Kafka may notify the endpoint about a failure to deliver a
    /// previously sent message via an async callback.  If the endpoint is unable
    /// to handle this error, it must forward it to the client via the
    /// `async_error_callback`.  The first argument of the callback is a flag
    /// that indicates a fatal error that the endpoint cannot recover from.
    fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;

    /// Maximum buffer size, in bytes, that this transport can transmit.
    /// The encoder should not generate buffers exceeding this size.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Notifies the output endpoint that data subsequently written by
    /// `push_buffer` belong to the given `step`.  `_batch_type` tells the
    /// endpoint whether that data is an incremental delta or a snapshot.
    ///
    /// A [fault-tolerant](crate#fault-tolerance) endpoint has additional
    /// requirements:
    ///
    /// 1. If data for the given step has been written before, the endpoint
    ///    should discard it.
    ///
    /// 2. The output batch must not be made visible to downstream readers
    ///    before the next call to `batch_end`.
    ///
    /// The default implementation does nothing and succeeds.
    fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) -> AnyResult<()> {
        Ok(())
    }

    /// Writes the data in `buffer` to the output endpoint, as part of the
    /// batch most recently started with `batch_start`.
    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;

    /// Output a message consisting of a key/value pair, with optional headers.
    ///
    /// This API is implemented by Kafka and other transports that transmit
    /// messages consisting of key and value fields and is invoked by
    /// Kafka-specific data formats that rely on this message structure,
    /// e.g., Debezium.  If a given transport does not implement this API, it
    /// should return an error.
    ///
    /// `headers` contains a list of key/optional_value pairs to be appended
    /// to Kafka message headers.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
    ) -> AnyResult<()>;

    /// Notifies the output endpoint that output for the current step is
    /// complete.
    ///
    /// A fault-tolerant output endpoint may now make the output batch visible
    /// to readers.
    ///
    /// The default implementation does nothing and succeeds.
    fn batch_end(&mut self) -> AnyResult<()> {
        Ok(())
    }

    /// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
    fn is_fault_tolerant(&self) -> bool;

    /// Returns the approximate amount of memory used by the connector's
    /// underlying implementation.  For the Kafka connectors, for example, this
    /// is the amount of memory used by librdkafka.  Not all connectors use a
    /// substantial amount of memory, so the default implementation returns 0.
    fn memory(&self) -> usize {
        0
    }
}
1096
/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.
///
/// A fault-tolerant connector wants to receive, in order:
///
/// - Zero or more [InputReaderCommand::Replay]s.
///
/// - Zero or more other commands.
///
/// This helps with that.
///
/// The type parameters name the types that replay commands are decoded into:
/// `M` for the metadata and `D` for the data.
// This is used by Kafka and Nexmark but both of those are optional.
pub struct InputCommandReceiver<M, D> {
    /// Channel from which commands are received.
    receiver: UnboundedReceiver<InputReaderCommand>,
    /// A command that was received and then put back.  When present, it is
    /// returned by the next receive call, ahead of anything in `receiver`.
    buffer: Option<InputReaderCommand>,
    /// Ties `M` and `D` to the struct, which stores no values of those types.
    _phantom: PhantomData<(M, D)>,
}
1112
/// Error type returned by some [InputCommandReceiver] methods.
///
/// We could just use `anyhow` and that would probably be just as good though.
#[derive(Debug)]
pub enum InputCommandReceiverError {
    /// The sending side of the command channel disconnected.
    Disconnected,
    /// Replay metadata could not be decoded from JSON.
    JsonDecodeError(serde_json_path_to_error::Error),
    /// Replay data could not be decoded from its `rmpv` value.
    RmpDecodeError(RmpDecodeError),
}

// Uses the default `Error` methods: the wrapped decode errors are surfaced
// through `Display` rather than through `source()`.
impl std::error::Error for InputCommandReceiverError {}
1124
1125impl Display for InputCommandReceiverError {
1126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1127 match self {
1128 InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
1129 InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
1130 InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
1131 }
1132 }
1133}
1134
1135impl From<RmpDecodeError> for InputCommandReceiverError {
1136 fn from(value: RmpDecodeError) -> Self {
1137 Self::RmpDecodeError(value)
1138 }
1139}
1140
1141impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
1142 fn from(value: serde_json_path_to_error::Error) -> Self {
1143 Self::JsonDecodeError(value)
1144 }
1145}
1146
1147// This is used by Kafka and Nexmark but both of those are optional.
1148impl<M, D> InputCommandReceiver<M, D> {
1149 pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
1150 Self {
1151 receiver,
1152 buffer: None,
1153 _phantom: PhantomData,
1154 }
1155 }
1156
1157 #[doc(hidden)]
1158 pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1159 where
1160 M: for<'a> Deserialize<'a>,
1161 D: for<'a> Deserialize<'a>,
1162 {
1163 let command = self.blocking_recv()?;
1164 self.take_replay(command)
1165 }
1166
1167 #[doc(hidden)]
1168 pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
1169 where
1170 M: for<'a> Deserialize<'a>,
1171 D: for<'a> Deserialize<'a>,
1172 {
1173 let command = self.recv().await?;
1174 self.take_replay(command)
1175 }
1176
1177 fn take_replay(
1178 &mut self,
1179 command: InputReaderCommand,
1180 ) -> Result<Option<(M, D)>, InputCommandReceiverError>
1181 where
1182 M: for<'a> Deserialize<'a>,
1183 D: for<'a> Deserialize<'a>,
1184 {
1185 match command {
1186 InputReaderCommand::Replay { metadata, data } => Ok(Some((
1187 serde_json_path_to_error::from_value::<M>(metadata)?,
1188 rmpv::ext::from_value::<D>(data)?,
1189 ))),
1190 other => {
1191 self.put_back(other);
1192 Ok(None)
1193 }
1194 }
1195 }
1196
1197 #[doc(hidden)]
1198 pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1199 match self.buffer.take() {
1200 Some(value) => Ok(value),
1201 None => self
1202 .receiver
1203 .recv()
1204 .await
1205 .ok_or(InputCommandReceiverError::Disconnected),
1206 }
1207 }
1208
1209 #[doc(hidden)]
1210 pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
1211 match self.buffer.take() {
1212 Some(value) => Ok(value),
1213 None => self
1214 .receiver
1215 .blocking_recv()
1216 .ok_or(InputCommandReceiverError::Disconnected),
1217 }
1218 }
1219
1220 #[doc(hidden)]
1221 pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
1222 if let Some(command) = self.buffer.take() {
1223 Ok(Some(command))
1224 } else {
1225 match self.receiver.try_recv() {
1226 Ok(command) => Ok(Some(command)),
1227 Err(TryRecvError::Empty) => Ok(None),
1228 Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
1229 }
1230 }
1231 }
1232
1233 #[doc(hidden)]
1234 pub fn put_back(&mut self, value: InputReaderCommand) {
1235 assert!(self.buffer.is_none());
1236 self.buffer = Some(value);
1237 }
1238}