feldera-adapterlib 0.304.0

Connector support for the Feldera streaming engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
use anyhow::{Error as AnyError, Result as AnyResult};
use chrono::{DateTime, Utc};
use dyn_clone::DynClone;
use feldera_types::adapter_stats::ConnectorHealth;
use feldera_types::config::FtModel;
use feldera_types::coordination::Completion;
use feldera_types::program_schema::Relation;
use rmpv::{Value as RmpValue, ext::Error as RmpDecodeError};
use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
use std::collections::VecDeque;
use std::fmt::Display;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::error::TryRecvError;
use xxhash_rust::xxh3::Xxh3Default;

use crate::PipelineState;
use crate::catalog::InputCollectionHandle;
use crate::format::{BufferSize, InputBuffer, ParseError, Parser};
use crate::metrics::ConnectorMetrics;

/// Step number for fault-tolerant circuits.
///
/// The step number increases by 1 each time the circuit runs; that is, it
/// tracks the global clock for the outermost circuit.  The first step is
/// numbered zero.
///
/// A [fault-tolerant](crate#fault-tolerance) output transport divides output
/// into steps numbered sequentially.  If a given step is written multiple
/// times, the endpoint must discard the later writes.
///
/// This is a plain alias for `u64`, so step numbers can be compared and
/// incremented with ordinary integer arithmetic.
pub type Step = u64;

/// A configured input endpoint.
///
/// This base trait only describes the endpoint's fault-tolerance capability;
/// the `open` operation lives on the sub-traits [TransportInputEndpoint] and
/// [IntegratedInputEndpoint].
pub trait InputEndpoint: Send {
    /// Returns this endpoint's level of fault tolerance, if any:
    ///
    /// - An endpoint that returns `None` does not support suspend and resume or
    ///   any kind of fault tolerance and has no further constraints.
    ///
    /// - An endpoint that returns `Some(FtModel::AtLeastOnce)` can support
    ///   suspend and resume and at-least-once fault tolerance.  Such an
    ///   endpoint must pass `Some(Resume::*)` to [InputConsumer::extended] for
    ///   at least some steps (see [Resume] for details).
    ///
    /// - An endpoint that returns `Some(FtModel::ExactlyOnce)` can support
    ///   suspend and resume, at-least-once fault tolerance, and exactly once
    ///   fault tolerance.  Such an endpoint must pass `Some(Resume::Replay
    ///   {..})` to [InputConsumer::extended] for every step (see [Resume] for
    ///   details).
    fn fault_tolerance(&self) -> Option<FtModel>;
}

/// An [`InputEndpoint`] that is opened with a separate [`Parser`] and
/// [`InputConsumer`] supplied by the caller.
pub trait TransportInputEndpoint: InputEndpoint {
    /// Opens this input endpoint. The endpoint should use `parser` to parse
    /// data into records. Returns an [`InputReader`] for reading the endpoint's
    /// data.  The endpoint will use `consumer` to report its progress.
    ///
    /// `schema` is the [`Relation`] supplied by the caller (presumably the
    /// schema of the relation that this endpoint feeds; confirm against the
    /// controller).
    ///
    /// The `resume_info` parameter is used when resuming the pipeline from a
    /// checkpoint. It contains resume metadata that the endpoint returned via the
    /// [`InputConsumer::extended`] function before suspending. When specified,
    /// it tells a fault-tolerant input reader to seek past the data already read
    /// in the step whose metadata is given by the value.
    ///
    /// The reader is initially paused.
    ///
    /// # Errors
    ///
    /// Returns an error if the endpoint cannot be opened.
    fn open(
        &self,
        consumer: Box<dyn InputConsumer>,
        parser: Box<dyn Parser>,
        schema: Relation,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}

/// An [`InputEndpoint`] that is opened directly against an
/// [`InputCollectionHandle`] instead of being handed a separate parser and
/// consumer.
#[doc(hidden)]
pub trait IntegratedInputEndpoint: InputEndpoint {
    /// Opens the endpoint, consuming the boxed endpoint object, and returns an
    /// [`InputReader`] for it.
    ///
    /// `resume_info`, if present, is resume metadata for the reader to start
    /// from (NOTE(review): presumably with the same meaning as in
    /// [`TransportInputEndpoint::open`]; confirm).
    fn open(
        self: Box<Self>,
        input_handle: &InputCollectionHandle,
        resume_info: Option<JsonValue>,
    ) -> AnyResult<Box<dyn InputReader>>;
}

/// Commands for an [InputReader] to execute.
///
/// # Transitions
///
/// The following diagram shows the possible order in which the controller can
/// issue commands to [InputReader]s:
///
/// ```text
///   ┌─⯇─ (start) ─⯈──┐
///   │      │         │
///   │      │  ┌───┐  │
///   │      ▼  ▼   │  │
///   ├─⯇─ Replay ──┘  │
///   │      │         │
///   │      ▼         │
///   ├─⯇─ Extend⯇─────┤
///   │      │         │
///   │      │ ┌───┐   │
///   │      ▼ ▼   │   │
///   ├─⯇─ Queue ──┘   │
///   │      │         │
///   │      ▼         │
///   ├─⯇─ Pause ─⯈────┘
///   │      │
///   │      ▼
///   └───⯈Disconnect
///        (end)
/// ```
///
/// # Stalls
///
/// When the controller issues a [InputReaderCommand::Replay] or
/// [InputReaderCommand::Queue] command to an input adapter, it waits for the
/// input adapter to respond to them.  Until it receives a reply, the next step
/// cannot proceed. An input adapter that does not respond to one of these
/// commands will stall the entire pipeline.  However, the controller also uses
/// [InputReader::is_closed] to detect that an input adapter has died due to an
/// error or reaching end-of-input, so input adapters for which it is difficult
/// to handle errors gracefully can report that they have died using
/// `is_closed`, if necessary, as described in more detail below.
///
/// ## End-of-input handling
///
/// If an input adapter reaches the end of its input, and it isn't implemented
/// to wait for and pass along further input, then it should:
///
/// - Make sure that it has already indicated that it has buffered all of its
///   data, via [InputConsumer::buffered].
///
/// - Call [InputConsumer::eoi] to indicate that it has reached end of input.
///
/// - Respond to [InputReaderCommand::Queue] until it has queued all of its
///   input and has none left.
///
/// - Optionally, at this point, it may exit and start returning `true` from
///   `InputReader::is_closed`.
///
/// ## Error handling
///
/// If an input adapter encounters a fatal error that keeps it from continuing
/// to obtain input, then it should report the error via [InputConsumer::error]
/// with `true` for `fatal`.  Afterward, it may exit and start returning `true`
/// from `InputReader::is_closed`.
///
/// ## Additional requirement
///
/// An input adapter should ensure that, if it flushes any records to the
/// circuit in response to [InputReaderCommand::Replay] or
/// [InputReaderCommand::Queue], then it finishes up and responds to the
/// consumer using [InputConsumer::replayed] or [InputConsumer::extended],
/// respectively.  If it instead dies mid-way, then the controller will not
/// record the step properly and fault tolerance replay will be incorrect.
#[derive(Debug)]
pub enum InputReaderCommand {
    /// Tells the input reader to replay the step described by `metadata` and
    /// `data` by reading and flushing buffers for the data in the step, and
    /// then calling [InputConsumer::replayed] to signal completion.
    ///
    /// The input reader should report the data that it queues to
    /// [InputConsumer::buffered] as it does the replay.
    ///
    /// The input reader doesn't have to process other commands while it does
    /// the replay.
    ///
    /// # Constraints
    ///
    /// Only fault-tolerant input readers need to accept this. It will be issued
    /// zero or more times, before any other command.
    Replay { metadata: JsonValue, data: RmpValue },

    /// Tells the input reader to accept further input. The first time it
    /// receives this command, the reader should start from the resume point
    /// passed as `resume_info` when the endpoint was opened, if any, and
    /// otherwise from the beginning of input.
    ///
    /// The input reader should report the data that it queues to
    /// [InputConsumer::buffered] as it queues it.
    ///
    /// # Constraints
    ///
    /// The controller will not issue this command:
    ///
    /// - Twice on a given reader without an intervening
    ///   [InputReaderCommand::Pause].
    ///
    /// - If it requested a replay (with [InputReaderCommand::Replay]) and the reader
    ///   hasn't yet reported that the replay is complete.
    Extend,

    /// Tells the input reader to stop reading more input.
    ///
    /// The controller uses this to limit the number of buffered records and to
    /// respond to user requests to pause the pipeline.
    ///
    /// # Constraints
    ///
    /// The controller issues this only after a paired
    /// [InputReaderCommand::Extend].
    Pause,

    /// Tells the input reader to flush input buffers to the circuit.
    ///
    /// The input reader can call [InputConsumer::max_batch_size] to find out
    /// how many records it should flush. When it's done, it must call
    /// [InputConsumer::extended] to report it.
    ///
    /// The `checkpoint_requested` flag indicates that the controller is trying
    /// to checkpoint or suspend the pipeline. This serves as a hint to the reader
    /// to try to clear the checkpoint barrier by returning [Resume::Seek] or
    /// [Resume::Replay] if possible. For instance, if the reader has multiple
    /// buffers queued, it can choose to stop flushing them after reaching the first
    /// buffer that corresponds to a seekable position in the input stream.
    ///
    /// # Constraints
    ///
    /// The controller won't issue this command before it first issues [InputReaderCommand::Extend].
    Queue { checkpoint_requested: bool },

    /// Tells the reader it's going to be dropped soon and should clean up.
    ///
    /// The reader can continue to queue some data buffers afterward if that's
    /// the easiest implementation.
    ///
    /// # Constraints
    ///
    /// The controller issues this only once and won't issue any other command
    /// to a given reader after it issues this one.
    Disconnect,
}

impl InputReaderCommand {
    /// Translates this command into the equivalent [NonFtInputReaderCommand].
    ///
    /// Returns `None` for [InputReaderCommand::Replay], which exists only for
    /// fault-tolerant endpoints and therefore has no non-fault-tolerant
    /// counterpart.
    pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand> {
        // `Replay` and `Queue` are handled directly; every remaining command
        // maps onto a pipeline state transition.
        let target_state = match self {
            Self::Replay { .. } => return None,
            Self::Queue { .. } => return Some(NonFtInputReaderCommand::Queue),
            Self::Extend => PipelineState::Running,
            Self::Pause => PipelineState::Paused,
            Self::Disconnect => PipelineState::Terminated,
        };
        Some(NonFtInputReaderCommand::Transition(target_state))
    }
}

/// A subset of [InputReaderCommand] that only includes the commands for
/// non-fault-tolerant connectors.
///
/// Use [InputReaderCommand::as_nonft] to convert from the full command set.
#[derive(Debug)]
pub enum NonFtInputReaderCommand {
    /// Equivalent to [InputReaderCommand::Queue], without the
    /// `checkpoint_requested` hint.
    Queue,

    /// A change of pipeline state.  Equivalencies:
    ///
    /// - `Transition(PipelineState::Paused)`: [InputReaderCommand::Pause].
    ///
    /// - `Transition(PipelineState::Running)`: [InputReaderCommand::Extend].
    ///
    /// - `Transition(PipelineState::Terminated)`: [InputReaderCommand::Disconnect].
    Transition(PipelineState),
}

/// One element of an [`InputQueue`]: an optional data buffer plus the
/// timestamp, transaction markers, and auxiliary data that travel with it.
#[doc(hidden)]
pub struct InputQueueEntry<A, B> {
    /// Data buffer to push to the circuit.  `None` for an entry that carries
    /// only auxiliary data and/or transaction markers.
    buffer: Option<B>,

    /// Time when data in this buffer was received from the transport endpoint.
    /// It is used to track the processing latency in different stages of the pipeline.
    timestamp: DateTime<Utc>,

    /// Start a transaction with the given label before pushing the buffer to the circuit
    /// unless a transaction is already in progress.
    ///
    /// The outer `Option` says whether to start a transaction at all; the
    /// inner `Option` is the (optional) transaction label.
    start_transaction: Option<Option<String>>,

    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
    commit_transaction: bool,

    /// Auxiliary data associated with the buffer.
    aux: A,
}

impl<A, B> InputQueueEntry<A, B> {
    #[doc(hidden)]
    pub fn new_with_aux(timestamp: DateTime<Utc>, aux: A) -> Self {
        Self {
            buffer: None,
            timestamp,
            start_transaction: None,
            commit_transaction: false,
            aux,
        }
    }

    #[doc(hidden)]
    pub fn with_buffer(self, buffer: Option<B>) -> Self {
        Self { buffer, ..self }
    }

    /// Start a transaction with the given label before pushing the buffer to the circuit
    /// unless a transaction is already in progress.
    pub fn with_start_transaction(self, start_transaction: Option<Option<String>>) -> Self {
        Self {
            start_transaction,
            ..self
        }
    }

    /// Commit the transaction after pushing the buffer to the circuit if there is a transaction in progress.
    pub fn with_commit_transaction(self, commit_transaction: bool) -> Self {
        Self {
            commit_transaction,
            ..self
        }
    }
}

/// A thread-safe queue for collecting and flushing input buffers.
///
/// Commonly used by `InputReader` implementations for staging buffers from
/// worker threads.
///
/// `A` is the type of auxiliary data stored alongside each buffer and `B` is
/// the buffer type; they default to `()` and `Box<dyn InputBuffer>`.
pub struct InputQueue<A = (), B = Box<dyn InputBuffer>> {
    /// Pending entries, oldest at the front.
    #[allow(clippy::type_complexity)]
    pub queue: Mutex<VecDeque<InputQueueEntry<A, B>>>,
    /// Consumer that is told about buffered data, parse errors, and
    /// transaction boundaries.
    pub consumer: Box<dyn InputConsumer>,
    /// True between the point where this queue started a transaction on
    /// `consumer` and the point where it committed it.
    pub transaction_in_progress: AtomicBool,
}

impl<A, B: InputBuffer> InputQueue<A, B> {
    /// Creates an empty queue that reports to `consumer`, with no transaction
    /// in progress.
    pub fn new(consumer: Box<dyn InputConsumer>) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            consumer,
            transaction_in_progress: AtomicBool::new(false),
        }
    }

    /// Appends `entry` to the back of the queue.
    ///
    /// Reports `errors` to the consumer as parse errors, then reports the size
    /// of the entry's buffer (or an empty size if it has no buffer) as
    /// buffered.  If the entry contains zero records, also asks the consumer
    /// to request a step.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn push_entry(&self, entry: InputQueueEntry<A, B>, errors: Vec<ParseError>) {
        self.consumer.parse_errors(errors);
        let len = entry
            .buffer
            .as_ref()
            .map_or(BufferSize::empty(), |buffer| buffer.len());

        // The lock stays held until the end of the function, so the consumer
        // callbacks below run while the queue is locked.
        let mut queue = self.queue.lock().unwrap();
        queue.push_back(entry);
        self.consumer.buffered(len);

        // The endpoint pushed an empty buffer. This likely indicates that the accompanying aux data
        // needs to be processed by the endpoint after preceding buffers have been flushed. However,
        // since we didn't report any buffered records, the controller may never perform another step,
        // so we nudge it to do it.
        if len.records == 0 {
            self.consumer.request_step();
        }
    }

    /// Appends `buffer` to the queue and associates it with `aux` and
    /// `timestamp`.  Reports to the controller that `errors` have occurred
    /// during parsing.
    ///
    /// The entry is queued even when `buffer` is `None`.
    pub fn push_with_aux(
        &self,
        (buffer, errors): (Option<B>, Vec<ParseError>),
        timestamp: DateTime<Utc>,
        aux: A,
    ) {
        let entry = InputQueueEntry::new_with_aux(timestamp, aux).with_buffer(buffer);

        self.push_entry(entry, errors);
    }

    /// Flushes a batch of records to the circuit and returns the auxiliary data
    /// that was associated with those records.
    ///
    /// Equivalent to [`InputQueue::flush_with_aux_until`] with a predicate
    /// that never stops early, so the batch is bounded only by the consumer's
    /// `max_batch_size`.
    ///
    /// This always flushes whole buffers to the circuit, since auxiliary data
    /// is associated with a whole buffer rather than with individual records.
    /// If the auxiliary data type `A` is `()`, then [`InputQueue::queue`] can
    /// split buffers instead and so may be a better choice.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux(&self) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        self.flush_with_aux_until(&|_| false)
    }

    /// Flushes a batch of records to the circuit and returns the auxiliary data
    /// that was associated with those records.
    ///
    /// Stops after flushing at least `max_batch_size` records or after flushing a
    /// buffer whose auxiliary data satisfies the `stop_at` predicate, whichever
    /// happens first.  It also stops after an entry that commits a transaction
    /// that this queue had in progress.
    ///
    /// Unless `stop_at` was satisfied, any entries without a buffer that
    /// immediately follow the flushed batch are consumed as well, so that
    /// their auxiliary data is returned in the same call.
    ///
    /// Returns the total size flushed, the hasher obtained from the consumer
    /// (updated with every flushed buffer, or `None` if the consumer supplied
    /// none), and the `(timestamp, aux)` pair of every consumed entry in queue
    /// order.
    ///
    /// This always flushes whole buffers to the circuit, since auxiliary data
    /// is associated with a whole buffer rather than with individual records.
    /// If the auxiliary data type `A` is `()`, then [`InputQueue::queue`] can
    /// split buffers instead and so may be a better choice.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    #[allow(clippy::type_complexity)]
    pub fn flush_with_aux_until(
        &self,
        stop_at: &dyn Fn(&A) -> bool,
    ) -> (BufferSize, Option<Xxh3Default>, Vec<(DateTime<Utc>, A)>) {
        let mut total = BufferSize::empty();
        let mut hasher = self.consumer.hasher();
        let n = self.consumer.max_batch_size();
        let mut consumed_aux = Vec::new();

        let mut stop = false;

        while !stop && total.records < n {
            // The lock guard is a temporary of this `let` statement, so the
            // queue is not locked while the buffer is hashed and flushed.
            let Some(InputQueueEntry {
                buffer,
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
            }) = self.queue.lock().unwrap().pop_front()
            else {
                break;
            };

            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }

            if let Some(mut buffer) = buffer {
                total += buffer.len();
                if let Some(hasher) = hasher.as_mut() {
                    buffer.hash(hasher);
                }
                buffer.flush();
            }

            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));

            // Only end the batch if a transaction was actually committed.
            if commit_transaction && self.commit_transaction() {
                break;
            }
        }

        // Process any entries with aux data only.
        //
        // Unlike the loop above, the queue stays locked for the whole of this
        // loop, including the transaction callbacks.
        let mut queue = self.queue.lock().unwrap();
        while !stop
            && queue
                .front()
                .is_some_and(|InputQueueEntry { buffer, .. }| buffer.is_none())
        {
            let Some(InputQueueEntry {
                timestamp,
                aux,
                start_transaction,
                commit_transaction,
                ..
            }) = queue.pop_front()
            else {
                break;
            };

            if let Some(label) = start_transaction {
                self.start_transaction(label.as_deref());
            }

            stop = stop_at(&aux);
            consumed_aux.push((timestamp, aux));

            if commit_transaction && self.commit_transaction() {
                break;
            }
        }

        (total, hasher, consumed_aux)
    }

    /// Returns the number of entries (not records) currently in the queue.
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    pub fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Returns true if the queue currently holds no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Marks a transaction as in progress and tells the consumer to start one
    /// with `label`, unless a transaction is already in progress.
    ///
    /// Returns true if this call started the transaction.
    fn start_transaction(&self, label: Option<&str>) -> bool {
        if self
            .transaction_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.start_transaction(label);
            true
        } else {
            false
        }
    }

    /// Clears the in-progress flag and tells the consumer to commit, if a
    /// transaction is in progress.
    ///
    /// Returns true if this call committed a transaction.
    fn commit_transaction(&self) -> bool {
        if self
            .transaction_in_progress
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.consumer.commit_transaction();
            true
        } else {
            false
        }
    }
}

impl InputQueue<(), Box<dyn InputBuffer>> {
    /// Appends `buffer`, if nonempty,` to the queue.  Reports to the controller
    /// that `errors` occurred during parsing.
    pub fn push(
        &self,
        (buffer, errors): (Option<Box<dyn InputBuffer>>, Vec<ParseError>),
        timestamp: DateTime<Utc>,
    ) {
        self.push_with_aux((buffer, errors), timestamp, ())
    }

    /// Flushes a batch of records to the circuit and reports to the consumer
    /// that it was done.
    ///
    /// Only non-fault-tolerant input adapters can use this.
    pub fn queue(&self) {
        let mut total = BufferSize::empty();
        let n = self.consumer.max_batch_size();
        let mut consumed = Vec::new();

        while total.records < n {
            let Some(InputQueueEntry {
                buffer,
                timestamp,
                start_transaction,
                commit_transaction,
                ..
            }) = self.queue.lock().unwrap().pop_front()
            else {
                break;
            };

            if let Some(label) = start_transaction
                && self
                    .transaction_in_progress
                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
            {
                self.consumer.start_transaction(label.as_deref());
            }

            if let Some(mut buffer) = buffer {
                let mut taken = buffer.take_some(n - total.records);
                total += taken.len();
                consumed.push(Watermark::new(timestamp, None));
                taken.flush();
                drop(taken);
                if !buffer.is_empty() {
                    self.queue.lock().unwrap().push_front(InputQueueEntry {
                        buffer: Some(buffer),
                        timestamp,
                        start_transaction: None,
                        commit_transaction,
                        aux: (),
                    });
                    break;
                }
            }

            if commit_transaction {
                if self
                    .transaction_in_progress
                    .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    self.consumer.commit_transaction();
                }
                break;
            }
        }
        self.consumer.extended(total, None, consumed);
    }
}

/// Reads data from an endpoint.
///
/// Use [`TransportInputEndpoint::open`] to obtain an [`InputReader`].
///
/// Implementations only need to provide [`InputReader::as_any`],
/// [`InputReader::request`], and [`InputReader::is_closed`]; the remaining
/// methods are convenience wrappers that build an [`InputReaderCommand`] and
/// pass it to `request`.
pub trait InputReader: Send + Sync {
    /// Converts this reader into an `Any` trait object (for example, so that
    /// a caller can downcast it to the concrete reader type).
    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;

    /// Requests the input reader to execute `command`.
    fn request(&self, command: InputReaderCommand);

    /// Returns true if the endpoint is closed, meaning that it has already
    /// acted on all of the commands that it ever will. A closed endpoint can be
    /// one that came to the end of its input (and is not waiting for more to
    /// arrive) or one that encountered a fatal error and cannot continue.
    ///
    /// An endpoint is often implemented in terms of a channel to a thread. In
    /// such a case, this can be implemented in terms of `is_closed` on the
    /// channel's sender.
    fn is_closed(&self) -> bool;

    /// Issues [`InputReaderCommand::Replay`] with `metadata` and `data`.
    fn replay(&self, metadata: JsonValue, data: RmpValue) {
        self.request(InputReaderCommand::Replay { metadata, data });
    }

    /// Issues [`InputReaderCommand::Extend`].
    fn extend(&self) {
        self.request(InputReaderCommand::Extend);
    }

    /// Issues [`InputReaderCommand::Pause`].
    fn pause(&self) {
        self.request(InputReaderCommand::Pause);
    }

    /// Issues [`InputReaderCommand::Queue`] with the given
    /// `checkpoint_requested` hint.
    fn queue(&self, checkpoint_requested: bool) {
        self.request(InputReaderCommand::Queue {
            checkpoint_requested,
        });
    }

    /// Issues [`InputReaderCommand::Disconnect`].
    fn disconnect(&self) {
        self.request(InputReaderCommand::Disconnect);
    }

    /// Returns the approximate amount of memory used by the connector's
    /// underlying implementation.  For the Kafka connectors, for example, this
    /// is the amount of memory used by librdkafka.  Not all connectors use a
    /// substantial amount of memory, so the default implementation returns 0.
    fn memory(&self) -> usize {
        0
    }
}

/// Position in an input stream, including the timestamp when the data was ingested
/// from the transport endpoint and transport-specific metadata such as delta table
/// version or Kafka partition offsets.
#[derive(Clone, Debug)]
pub struct Watermark {
    /// Time when the data was ingested from the transport endpoint.
    pub timestamp: DateTime<Utc>,
    /// Transport-specific position metadata, if the transport provides any.
    pub metadata: Option<JsonValue>,
}

impl Watermark {
    pub fn new(timestamp: DateTime<Utc>, metadata: Option<JsonValue>) -> Self {
        Self {
            timestamp,
            metadata,
        }
    }
}

/// Input stream consumer.
///
/// A transport endpoint pushes binary data downstream via an instance of this
/// trait.
pub trait InputConsumer: Send + Sync + DynClone {
    /// Returns the maximum number of records that an `InputReader` should queue
    /// in response to a [InputReaderCommand::Queue] command.
    ///
    /// Nothing keeps the endpoint from queuing more than this if necessary (for
    /// example, if for the sake of lateness it needs to group more than this
    /// number of records together).
    fn max_batch_size(&self) -> usize;

    /// Returns the level of fault tolerance that the pipeline supports, if any.
    ///
    /// An endpoint only needs to implement `min(endpoint_ft, pipeline_ft)`
    /// fault tolerance, where `endpoint_ft` is what the endpoint returns from
    /// `InputEndpoint::fault_tolerance` and `pipeline_ft` is what this function
    /// returns.  For example, if an input adapter supports
    /// `Some(FtModel::ExactlyOnce)`, but the pipeline's fault tolerance level
    /// is `None`, then the input adapter can simply pass `None` as `resume` to
    /// [InputConsumer::extended].  This optimization is, probably, worthwhile
    /// only to input adapters that log a copy of all of their data, instead of
    /// just metadata.
    fn pipeline_fault_tolerance(&self) -> Option<FtModel>;

    /// Returns a hasher, if the fault tolerance model calls for hashing, and
    /// `None` otherwise.
    ///
    /// The default implementation returns a hasher only when
    /// [InputConsumer::pipeline_fault_tolerance] is
    /// `Some(FtModel::ExactlyOnce)`.
    ///
    /// This is just a convenience method.  Connectors can do hashing any way
    /// they like, as long as they do it the same way for new data and for
    /// replays.
    fn hasher(&self) -> Option<Xxh3Default> {
        match self.pipeline_fault_tolerance() {
            Some(FtModel::ExactlyOnce) => Some(Xxh3Default::new()),
            _ => None,
        }
    }

    /// Reports `errors` as parse errors.
    fn parse_errors(&self, errors: Vec<ParseError>);

    /// Reports that the input adapter has internally buffered `amt` records and
    /// bytes.
    ///
    /// Fault-tolerant input adapters should report buffered data during replay
    /// as well as in normal operation.
    fn buffered(&self, amt: BufferSize);

    /// Reports that the input adapter has completed flushing `amt` data to the
    /// circuit, that hash to `hash`, in response to an
    /// [InputReaderCommand::Replay] request.
    ///
    /// Only a fault-tolerant input adapter will invoke this.
    fn replayed(&self, amt: BufferSize, hash: u64);

    /// Reports that the input adapter has completed flushing `amt` data to the
    /// circuit in response to an [InputReaderCommand::Queue] request.
    ///
    /// If the step is one that the input adapter can restart after, or replay,
    /// then it should supply that as `resume` (see [Resume] for details).  The
    /// hash of the step's data, when there is one, is carried inside
    /// [Resume::Replay] rather than passed separately.
    ///
    /// `watermarks` is a list of [Watermark]s, each pairing an ingestion
    /// timestamp with optional transport-specific position metadata.
    /// Presumably these describe the data flushed in this step; confirm
    /// against the controller implementation.
    fn extended(&self, amt: BufferSize, resume: Option<Resume>, watermarks: Vec<Watermark>);

    /// Reports that the endpoint has reached end of input and that no more data
    /// will be received from the endpoint.
    ///
    /// If the endpoint has already indicated that it has buffered records then
    /// the controller will request them in future [InputReaderCommand::Queue]
    /// messages. The endpoint must not make further calls to
    /// [InputConsumer::buffered] or [InputConsumer::parse_errors].
    fn eoi(&self);

    /// Request the controller to schedule a step even if the connector hasn't queued
    /// any records.
    fn request_step(&self);

    /// The connector is initiating a transaction. `label` is an optional label for
    /// the transaction for debugging purposes.
    ///
    /// This function can be invoked in response to a `Queue` command.
    ///
    /// Any updates pushed by the connector after this function is invoked will be part
    /// of the transaction.
    ///
    /// The connector _must_ perform a matching call to `commit_transaction` to commit the
    /// transaction.
    ///
    /// Multiple connectors can initiate a transaction concurrently, in which case their
    /// updates will be combined into a single transaction. The transaction will be committed
    /// when all the connectors have committed it.
    fn start_transaction(&self, label: Option<&str>);

    /// The connector is committing a transaction started by a previous `start_transaction` call.
    ///
    /// This function can be invoked in response to a `Queue` command after pushing all updates that
    /// belong to the transaction, immediately before calling `extended`. The connector cannot
    /// queue any more updates after this function is invoked, until the next `Queue` command.
    fn commit_transaction(&self);

    /// Register connector-specific metrics for Prometheus export.
    ///
    /// A connector may call this once during [`TransportInputEndpoint::open`]
    /// to provide an [`Arc<dyn ConnectorMetrics>`] whose [`ConnectorMetrics::metrics`]
    /// will be polled on every scrape.  The default implementation is a no-op,
    /// so connectors that have no custom metrics need not override it.
    fn set_custom_metrics(&self, _metrics: Arc<dyn ConnectorMetrics>) {}

    /// Returns a watch receiver that fires each time pipeline step processing
    /// completes.  The value is the count of fully-processed steps
    /// (`total_completed_steps`): a record ingested in step `n` is done when
    /// the value exceeds `n`.
    ///
    /// Input adapters can use this to defer acknowledgment (e.g. a CDC
    /// connector holding a replication slot) until their data has been
    /// processed by the circuit and all output connectors.
    ///
    /// Returns `None` if the consumer does not support completion tracking.
    fn completion_watcher(&self) -> Option<tokio::sync::watch::Receiver<Completion>>;

    /// Returns a watch receiver that fires each time a durable checkpoint
    /// completes.  The value is the count of checkpointed steps: a record
    /// ingested in step `n` is durably stored when the value exceeds `n`.
    ///
    /// Input adapters that require at-least-once delivery stronger than step
    /// completion (e.g. a CDC connector that must not advance its replication
    /// slot past the last checkpoint) can wait on this rather than on
    /// [`completion_watcher`][Self::completion_watcher].
    ///
    /// Returns `Some` only when fault tolerance is enabled for the pipeline
    /// (which implies storage is configured and checkpoints are scheduled).
    /// Returns `None` otherwise, in which case the connector should fall back
    /// to [`completion_watcher`][Self::completion_watcher].
    ///
    /// The default implementation always returns `None`.
    fn checkpoint_watcher(&self) -> Option<tokio::sync::watch::Receiver<u64>> {
        None
    }

    /// Endpoint failed.
    ///
    /// Reports that the endpoint failed and that it will not queue any more
    /// data.
    ///
    /// `tag` is an optional label that can be used for additional context,
    /// e.g. for rate limiting.
    fn error(&self, fatal: bool, error: AnyError, tag: Option<&'static str>);

    /// Updates the health status of the connector.
    fn update_connector_health(&self, health: ConnectorHealth);
}

/// Information needed to restart after or replay input.
///
/// Feldera supports a few ways to checkpoint and resume a pipeline.  These
/// operations in turn require support from the pipeline's input adapters:
///
/// 1. To support suspend and resume, or at-least-once fault tolerance, the
///    input adapter must indicate, per step, how to restart from just after
///    that step, by passing `Some(Resume::*)` to [InputConsumer::extended].
///
///    Such input adapters might have steps for which seeking would be
///    impractical.  Such an input adapter may skip over those steps by passing
///    `Some(Resume::Barrier)` instead; the controller will not try to
///    checkpoint after them.
///
/// 2. To additionally support exactly once fault tolerance, the input adapter
///    must indicate, per step, both how to restart after the step and how to
///    replay exactly that step, by passing `Some(Resume::Replay { .. })` to
///    [InputConsumer::extended].
///
///    An input adapter that supports fault tolerance may not skip steps; that
///    is, it must supply `Some(Resume::Replay { .. })` for every step.
#[derive(Clone, Debug)]
pub enum Resume {
    /// The input adapter does not support resuming after this step.
    Barrier,

    /// The input adapter can resume just after this step, but it can't replay
    /// the step exactly.
    Seek {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,
    },

    /// The input adapter can replay this step exactly, or resume just after the
    /// step.
    ///
    /// Input adapters can use `seek` and `replay` in different combinations:
    ///
    /// - Some kinds of input adapters, for example the ones for files, or for
    ///   Kafka, can reread the input data that they used before.  These will
    ///   ordinarily just use `seek`, filling it with a pointer just past the
    ///   end of the data to be read. (Ordinarily, it would already know where
    ///   the start is from the previous step, so the start pointer isn't
    ///   usually needed.)
    ///
    ///   These input adapters can just set `replay` to [RmpValue::Nil].
    ///
    /// - Other kinds of input connectors can't seek back and reread the input
    ///   data that they used before. The best example is the HTTP input
    ///   connector, because it can't ask whatever client connected before to
    ///   repeat the exact same data that it sent before.  These input
    ///   connectors have to save all the input data for replay, by putting it
    ///   into the `replay` field.
    ///
    ///   These input adapters can just set `seek` to [JsonValue::Null].
    ///
    ///   (In theory, any input connector could substitute data for metadata,
    ///   but if the data can simply be reread using the metadata, we usually
    ///   consider that better because it saves time and space saving all the
    ///   data when in most cases it will never be reread.)
    Replay {
        /// Metadata needed for the controller to restart the input adapter from
        /// just after this input step.
        seek: JsonValue,

        /// The data needed for the controller to replay exactly this input step
        /// using [InputReaderCommand::Replay].
        replay: RmpValue,

        /// Hash of the input records in this step, for verification on replay.
        ///
        /// The input adapter can compute this in any way convenient to it, as
        /// long as it does so the same way for reading data initially and on
        /// replay.  On replay, the controller checks that the replayed value
        /// matches the original one and fails the circuit if it differs.
        hash: u64,
    },
}

impl Resume {
    /// Returns `true` if this is [Resume::Barrier], that is, a step after
    /// which the input adapter cannot resume.
    pub fn is_barrier(&self) -> bool {
        matches!(self, Resume::Barrier)
    }

    /// Borrows the `seek` metadata carried by [Resume::Seek] or
    /// [Resume::Replay]; returns `None` for [Resume::Barrier].
    pub fn seek(&self) -> Option<&JsonValue> {
        match self {
            Self::Seek { seek } | Self::Replay { seek, .. } => Some(seek),
            Self::Barrier => None,
        }
    }

    /// Consumes this [Resume], keeping only the `seek` metadata carried by
    /// [Resume::Seek] or [Resume::Replay]; returns `None` for
    /// [Resume::Barrier].
    pub fn into_seek(self) -> Option<JsonValue> {
        match self {
            Self::Seek { seek } | Self::Replay { seek, .. } => Some(seek),
            Self::Barrier => None,
        }
    }

    /// Returns the maximum fault tolerance level that this [Resume] can
    /// support: [FtModel::ExactlyOnce] for [Resume::Replay], and
    /// [FtModel::AtLeastOnce] for the other variants.
    pub fn fault_tolerance(&self) -> FtModel {
        match self {
            Self::Replay { .. } => FtModel::ExactlyOnce,
            Self::Barrier | Self::Seek { .. } => FtModel::AtLeastOnce,
        }
    }

    /// Builds a [Resume] for an endpoint that journals only metadata.
    ///
    /// With `Some(hash)`, the result is `Resume::Replay` holding `seek`, the
    /// hash, and [RmpValue::Nil] as the replay data; with `None`, the result
    /// is `Resume::Seek` holding `seek`.
    ///
    /// [InputConsumer::hasher] can be a convenient way to get a hasher.
    pub fn new_metadata_only(seek: JsonValue, hash: Option<u64>) -> Self {
        if let Some(hash) = hash {
            Self::Replay {
                seek,
                replay: RmpValue::Nil,
                hash,
            }
        } else {
            Self::Seek { seek }
        }
    }

    /// Builds a [Resume] for an endpoint that journals all of its data and
    /// needs no metadata.
    ///
    /// With `Some(hash)`, the result is `Resume::Replay` holding the hash,
    /// whatever `replay` returns, and [JsonValue::Null] as `seek`; with
    /// `None`, the result is `Resume::Seek` with a null `seek` and `replay`
    /// is never called.
    ///
    /// [InputConsumer::hasher] can be a convenient way to get a hasher.
    pub fn new_data_only<F>(replay: F, hash: Option<u64>) -> Self
    where
        F: FnOnce() -> RmpValue,
    {
        match hash {
            None => Self::Seek {
                seek: JsonValue::Null,
            },
            Some(hash) => Self::Replay {
                seek: JsonValue::Null,
                replay: replay(),
                hash,
            },
        }
    }
}

// Makes boxed `dyn InputConsumer` trait objects cloneable, building on the
// `DynClone` supertrait of `InputConsumer`.
dyn_clone::clone_trait_object!(InputConsumer);

/// Helper function to parse resume info passed to [`InputConsumer::extended`].
///
/// Deserializes a copy of `metadata` into `M`.
///
/// # Errors
///
/// Fails if `metadata` cannot be deserialized into `M`; the error message
/// includes both the checkpointed state and the underlying parse error.
pub fn parse_resume_info<M>(metadata: &JsonValue) -> AnyResult<M>
where
    M: DeserializeOwned,
{
    // The original value is kept so that it can be quoted in the error.
    match serde_json_path_to_error::from_value::<M>(metadata.clone()) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(anyhow::anyhow!(
            "unable to parse checkpointed connector state (checkpointed state: {metadata}; parse error: {e})"
        )),
    }
}

/// Callback through which an output endpoint reports asynchronous errors (see
/// [`OutputEndpoint::connect`]).
///
/// The first argument is a flag that indicates a fatal error that the
/// endpoint cannot recover from; the second is the error itself.  The third
/// is an optional static tag; presumably it plays the same role as the `tag`
/// argument of [`InputConsumer::error`] (additional context, e.g. for rate
/// limiting) -- confirm against callers.
#[doc(hidden)]
pub type AsyncErrorCallback = Box<dyn Fn(bool, AnyError, Option<&'static str>) + Send + Sync>;

/// Command handler API exposed by connectors.
///
/// Connectors can support arbitrary connector-specific commands that can be
/// invoked via the `/command` endpoint. These commands take and return arbitrary
/// JSON values.
///
/// This API is not part of trait `Output[Input]Endpoint` because it can be invoked
/// from any thread, and requires `Send + Sync`, while the `OutputEndpoint` API is
/// not `Sync` and is meant to be called from the controller thread only.
///
/// The idea is that connectors that support custom commands create separate command
/// handler objects that implement this trait and are returned by
/// `OutputEndpoint::command_handler`.
pub trait CommandHandler: Send + Sync {
    /// Handle a command specified by the JSON object `command` and return its
    /// result as a JSON value.
    ///
    /// Fails if the connector does not support the command, the command is invalid,
    /// or command execution fails.
    fn command(&self, command: serde_json::Value) -> AnyResult<serde_json::Value>;
}

/// Distinguishes a full-materialized-view snapshot from an incremental delta
/// when pushed to an output connector.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum OutputBatchType {
    /// The batch is an incremental delta.
    Delta,
    /// The batch is a snapshot of the full materialized view.
    Snapshot,
}

/// A configured output transport endpoint.
///
/// Output endpoints come in two flavors:
///
/// * A [fault-tolerant](crate#fault-tolerance) endpoint accepts output that has
///   been divided into numbered steps.  If it is given output associated with a
///   step number that has already been output, then it discards the duplicate.
///   It must also keep data written to the output transport from becoming
///   visible to downstream readers until `batch_end` is called.  (This works
///   for output to Kafka, which supports transactional output.  If it is
///   difficult for some future fault-tolerant output endpoint, then the API
///   could be adjusted to support writing output only after it can become
///   immediately visible.)
///
/// * A non-fault-tolerant endpoint does not have a concept of steps and ignores
///   them.
pub trait OutputEndpoint: Send {
    /// Returns the connector's [`CommandHandler`], if it supports custom
    /// commands.
    ///
    /// The default implementation returns `None`.
    fn command_handler(&self) -> Option<Arc<dyn CommandHandler>> {
        None
    }

    /// Finishes establishing the connection to the output endpoint.
    ///
    /// If the endpoint encounters any errors during output, now or later, it
    /// invokes `async_error_callback` to notify the client about asynchronous
    /// errors, i.e., errors that happen outside the context of the
    /// [`OutputEndpoint::push_buffer`] method. For instance, a reliable message
    /// bus like Kafka may notify the endpoint about a failure to deliver a
    /// previously sent message via an async callback. If the endpoint is unable
    /// to handle this error, it must forward it to the client via the
    /// `async_error_callback`.  The first argument of the callback is a flag
    /// that indicates a fatal error that the endpoint cannot recover from.
    fn connect(&mut self, async_error_callback: AsyncErrorCallback) -> AnyResult<()>;

    /// Maximum buffer size that this transport can transmit.
    /// The encoder should not generate buffers exceeding this size.
    fn max_buffer_size_bytes(&self) -> usize;

    /// Notifies the output endpoint that data subsequently written by
    /// `push_buffer` belong to the given `step`.  The batch type tells the
    /// endpoint whether the batch is a delta or a snapshot (see
    /// [`OutputBatchType`]).
    ///
    /// A [fault-tolerant](crate#fault-tolerance) endpoint has additional
    /// requirements:
    ///
    /// 1. If data for the given step has been written before, the endpoint
    ///    should discard it.
    ///
    /// 2. The output batch must not be made visible to downstream readers
    ///    before the next call to `batch_end`.
    ///
    /// The default implementation ignores both arguments and returns `Ok(())`.
    fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) -> AnyResult<()> {
        Ok(())
    }

    /// Writes `buffer` to the output transport.
    ///
    /// The data belongs to the step announced by the preceding `batch_start`
    /// call.  The encoder is expected to keep `buffer` within
    /// [`OutputEndpoint::max_buffer_size_bytes`].
    fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()>;

    /// Output a message consisting of a key/value pair, with optional headers.
    ///
    /// This API is implemented by Kafka and other transports that transmit
    /// messages consisting of key and value fields and is invoked by
    /// Kafka-specific data formats that rely on this message structure,
    /// e.g., Debezium. If a given transport does not implement this API, it
    /// should return an error.
    ///
    /// `headers` contains a list of key/optional_value pairs to be appended
    /// to Kafka message headers.
    fn push_key(
        &mut self,
        key: Option<&[u8]>,
        val: Option<&[u8]>,
        headers: &[(&str, Option<&[u8]>)],
    ) -> AnyResult<()>;

    /// Notifies the output endpoint that output for the current step is
    /// complete.
    ///
    /// A fault-tolerant output endpoint may now make the output batch visible
    /// to readers.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn batch_end(&mut self) -> AnyResult<()> {
        Ok(())
    }

    /// Whether this endpoint is [fault tolerant](crate#fault-tolerance).
    fn is_fault_tolerant(&self) -> bool;

    /// Returns the approximate amount of memory used by the connector's
    /// underlying implementation.  For the Kafka connectors, for example, this
    /// is the amount of memory used by librdkafka.  Not all connectors use a
    /// substantial amount of memory, so the default implementation returns 0.
    fn memory(&self) -> usize {
        0
    }
}

/// An [UnboundedReceiver] wrapper for [InputReaderCommand] for fault-tolerant connectors.
///
/// A fault-tolerant connector wants to receive, in order:
///
/// - Zero or more [InputReaderCommand::Replay]s.
///
/// - Zero or more other commands.
///
/// This helps with that.
///
/// `M` and `D` are the types into which the `metadata` and `data` of a
/// replay command are deserialized.
// This is used by Kafka and Nexmark but both of those are optional.
pub struct InputCommandReceiver<M, D> {
    /// Channel from which commands are received.
    receiver: UnboundedReceiver<InputReaderCommand>,
    /// A command that was received but put back; the `recv` family of
    /// methods hands it out before reading from `receiver` again.
    buffer: Option<InputReaderCommand>,
    /// Ties the otherwise-unused `M` and `D` type parameters to the struct.
    _phantom: PhantomData<(M, D)>,
}

/// Error type returned by some [InputCommandReceiver] methods.
///
/// We could just use `anyhow` and that would probably be just as good though.
#[derive(Debug)]
pub enum InputCommandReceiverError {
    /// The sending side of the command channel disconnected.
    Disconnected,
    /// The `metadata` of a replay command could not be deserialized from JSON.
    JsonDecodeError(serde_json_path_to_error::Error),
    /// The `data` of a replay command could not be deserialized from its
    /// `RmpValue` form.
    RmpDecodeError(RmpDecodeError),
}

// Relies on the default `std::error::Error` methods, so no `source()` is
// exposed; the wrapped decode errors are rendered directly by the `Display`
// impl instead.
impl std::error::Error for InputCommandReceiverError {}

impl Display for InputCommandReceiverError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            InputCommandReceiverError::Disconnected => write!(f, "sender disconnected"),
            InputCommandReceiverError::RmpDecodeError(e) => e.fmt(f),
            InputCommandReceiverError::JsonDecodeError(e) => e.fmt(f),
        }
    }
}

impl From<RmpDecodeError> for InputCommandReceiverError {
    fn from(value: RmpDecodeError) -> Self {
        Self::RmpDecodeError(value)
    }
}

impl From<serde_json_path_to_error::Error> for InputCommandReceiverError {
    fn from(value: serde_json_path_to_error::Error) -> Self {
        Self::JsonDecodeError(value)
    }
}

// This is used by Kafka and Nexmark but both of those are optional.
impl<M, D> InputCommandReceiver<M, D> {
    /// Wraps `receiver`, with no put-back command pending.
    pub fn new(receiver: UnboundedReceiver<InputReaderCommand>) -> Self {
        Self {
            buffer: None,
            _phantom: PhantomData,
            receiver,
        }
    }

    /// Blocking variant of `recv_replay`: waits for the next command and, if
    /// it is a replay, returns its decoded metadata and data.  Any other
    /// command is put back and `Ok(None)` is returned.
    #[doc(hidden)]
    pub fn blocking_recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
    where
        M: for<'a> Deserialize<'a>,
        D: for<'a> Deserialize<'a>,
    {
        self.blocking_recv()
            .and_then(|command| self.take_replay(command))
    }

    /// Waits for the next command and, if it is a replay, returns its decoded
    /// metadata and data.  Any other command is put back and `Ok(None)` is
    /// returned.
    #[doc(hidden)]
    pub async fn recv_replay(&mut self) -> Result<Option<(M, D)>, InputCommandReceiverError>
    where
        M: for<'a> Deserialize<'a>,
        D: for<'a> Deserialize<'a>,
    {
        self.recv()
            .await
            .and_then(|command| self.take_replay(command))
    }

    /// Decodes `command` if it is a replay; otherwise stores it in the
    /// put-back buffer so that the next receive call returns it.
    fn take_replay(
        &mut self,
        command: InputReaderCommand,
    ) -> Result<Option<(M, D)>, InputCommandReceiverError>
    where
        M: for<'a> Deserialize<'a>,
        D: for<'a> Deserialize<'a>,
    {
        match command {
            InputReaderCommand::Replay { metadata, data } => {
                // Metadata is decoded before data, so a metadata error wins
                // if both are malformed.
                let metadata = serde_json_path_to_error::from_value::<M>(metadata)?;
                let data = rmpv::ext::from_value::<D>(data)?;
                Ok(Some((metadata, data)))
            }
            not_replay => {
                self.put_back(not_replay);
                Ok(None)
            }
        }
    }

    /// Returns the put-back command if there is one; otherwise waits for the
    /// next command from the channel.
    #[doc(hidden)]
    pub async fn recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
        if let Some(pending) = self.buffer.take() {
            return Ok(pending);
        }
        self.receiver
            .recv()
            .await
            .ok_or(InputCommandReceiverError::Disconnected)
    }

    /// Returns the put-back command if there is one; otherwise blocks until
    /// the next command arrives on the channel.
    #[doc(hidden)]
    pub fn blocking_recv(&mut self) -> Result<InputReaderCommand, InputCommandReceiverError> {
        if let Some(pending) = self.buffer.take() {
            return Ok(pending);
        }
        self.receiver
            .blocking_recv()
            .ok_or(InputCommandReceiverError::Disconnected)
    }

    /// Returns the put-back command if there is one; otherwise polls the
    /// channel without waiting, yielding `Ok(None)` when it is empty.
    #[doc(hidden)]
    pub fn try_recv(&mut self) -> Result<Option<InputReaderCommand>, InputCommandReceiverError> {
        if let Some(pending) = self.buffer.take() {
            return Ok(Some(pending));
        }
        match self.receiver.try_recv() {
            Ok(command) => Ok(Some(command)),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Err(InputCommandReceiverError::Disconnected),
        }
    }

    /// Stores `value` so that the next receive call returns it.
    ///
    /// # Panics
    ///
    /// Panics if a command has already been put back and not yet received.
    #[doc(hidden)]
    pub fn put_back(&mut self, value: InputReaderCommand) {
        assert!(self.buffer.is_none());
        self.buffer = Some(value);
    }
}