1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
use anyhow::{Error as AnyError, Result as AnyResult};
use serde::de::{Error, SeqAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value as JsonValue;
use std::fmt::Formatter;
use std::num::NonZeroUsize;
use std::thread::available_parallelism;
use std::{collections::BTreeMap, env};
use utoipa::ToSchema;
use uuid::Uuid;
/// Configuration for reading data from Kafka topics with `InputTransport`.
// `Deserialize` for this type is implemented by hand through
// `compat::KafkaInputConfigCompat`, so the `#[serde(...)]` attributes below
// only drive `Serialize` (and presumably the `ToSchema` output).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, ToSchema)]
pub struct KafkaInputConfig {
    /// Options passed directly to `rdkafka`.
    ///
    /// [`librdkafka` options](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
    /// used to configure the Kafka consumer.
    ///
    /// This input connector does not use consumer groups, so options related to
    /// consumer groups are restricted:
    ///
    /// * `group.id`, if present, is ignored.
    /// * `auto.offset.reset` should not be used (use `start_from` instead).
    /// * `enable.auto.commit`, if present, must be set to `false`.
    /// * `enable.auto.offset.store`, if present, must be set to `false`.
    // NOTE(review): `validate()` in this file only fills in `group.id`,
    // `enable.auto.commit` and `enable.auto.offset.store` when they are
    // missing, and its comments say users may override the auto-commit
    // options; confirm where (if anywhere) the restrictions above are enforced.
    #[serde(flatten)]
    pub kafka_options: BTreeMap<String, String>,
    /// Topic to subscribe to.
    pub topic: String,
    /// The log level of the client.
    ///
    /// If not specified, the log level will be calculated based on the global
    /// log level of the `log` crate.
    pub log_level: Option<KafkaLogLevel>,
    /// Maximum timeout in seconds to wait for the endpoint to join the Kafka
    /// consumer group during initialization.
    ///
    /// Defaults to 10.
    #[serde(default = "default_group_join_timeout_secs")]
    pub group_join_timeout_secs: u32,
    /// Set to 1 or more to fix the number of threads used to poll
    /// `rdkafka`. Multiple threads can increase performance with small Kafka
    /// messages; for large messages, one thread is enough. In either case, too
    /// many threads can harm performance. If unset, the default is 3, which
    /// helps with small messages but will not harm performance with large
    /// messages.
    ///
    /// The effective value is clamped to between 1 and the number of
    /// available CPUs.
    pub poller_threads: Option<usize>,
    /// Where to begin reading the topic.
    #[serde(default, with = "crate::serde_via_value")]
    pub start_from: KafkaStartFromConfig,
    /// The AWS region to use while connecting to AWS Managed Streaming for Kafka (MSK).
    pub region: Option<String>,
    /// The list of Kafka partitions to read from.
    ///
    /// Only the specified partitions will be consumed. If this field is not set,
    /// the connector will consume from all available partitions.
    ///
    /// If `start_from` is set to `offsets` and this field is provided, the
    /// number of partitions must exactly match the number of offsets, and the
    /// order of partitions must correspond to the order of offsets.
    ///
    /// If offsets are provided for all partitions, this field can be omitted.
    pub partitions: Option<Vec<i32>>,
    /// By default, if the input connector resumes from a checkpoint and the
    /// data where it needs to resume has expired from the Kafka topic, the
    /// input connector fails initialization and the pipeline will fail to start.
    ///
    /// Set this to true to change the behavior so that, if data is not
    /// available on resume, the input connector starts from the earliest
    /// offsets that are now available.
    pub resume_earliest_if_data_expires: bool,
    /// Whether to include Kafka headers in the record metadata.
    ///
    /// When `true`, Kafka message headers are available via the `CONNECTOR_METADATA()` function.
    /// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
    #[serde(default)]
    pub include_headers: Option<bool>,
    /// Whether to include Kafka message timestamp in the record metadata.
    ///
    /// When `true`, Kafka message timestamp is available via the `CONNECTOR_METADATA()` function.
    /// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
    #[serde(default)]
    pub include_timestamp: Option<bool>,
    /// Whether to include Kafka partition in the record metadata.
    ///
    /// When `true`, Kafka partition from which the message was read is available via the `CONNECTOR_METADATA()` function.
    /// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
    #[serde(default)]
    pub include_partition: Option<bool>,
    /// Whether to include Kafka message offset in the record metadata.
    ///
    /// When `true`, Kafka message offset is available via the `CONNECTOR_METADATA()` function.
    /// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
    #[serde(default)]
    pub include_offset: Option<bool>,
    /// Whether to include Kafka topic in the record metadata.
    ///
    /// When `true`, Kafka topic from which the message was read is available via the `CONNECTOR_METADATA()` function.
    /// See <https://docs.feldera.com/connectors/sources/kafka#metadata> for details.
    #[serde(default)]
    pub include_topic: Option<bool>,
    /// When lateness is enabled on a Feldera table, Feldera only produces
    /// correct output if input arrives approximately in order within the bounds
    /// of the lateness. The Feldera Kafka input connector can reorder input
    /// when there are multiple partitions:
    ///
    /// - If partitions start at different times, then reading all the
    ///   partitions in parallel will naturally consume data out of order.
    ///
    /// - Even if they start at the same time, partitions might contain events
    ///   at different rates.
    ///
    /// - Even if the partitions start at the same time and have the same number
    ///   of events per unit time, if partitions are spread across brokers,
    ///   different brokers may fetch data at different rates.
    ///
    /// - Even if all of the partitions are on a single broker, one cannot
    ///   expect all of the partitions to naturally remain exactly in sync
    ///   forever.
    ///
    /// Setting this option to `true` addresses the issue by synchronizing
    /// ingestion across partitions, ingesting records in order of their Kafka
    /// event timestamps.
    ///
    /// Pitfalls of this solution include:
    ///
    /// - Kafka event timestamps are not necessarily monotonically increasing
    ///   even within a single partition. If timestamps jump backward beyond
    ///   the lateness, then this can also cause correctness problems.
    ///
    ///   (This can be avoided by keeping clocks on Kafka producers and brokers
    ///   synchronized.)
    ///
    /// - If an event with a timestamp far in the future is added to a
    ///   partition, that event, and all those that follow it, will never be
    ///   processed.
    ///
    /// - If one or a few partitions have timestamps far behind the others, only
    ///   those partitions will be processed until all the old events are
    ///   processed. (This is the flip side of the previous pitfall.)
    ///
    /// - One or more empty partitions will prevent any data from being
    ///   processed at all, because there is no way to know the timestamp for
    ///   the first event that will be added to that partition.
    ///
    /// - In a topic with `N` nonempty partitions, at least `N - 1` events will
    ///   always be left unprocessed (one in each of `N - 1` partitions), because
    ///   there is no way to know the timestamp for the next event to be added to
    ///   the partition whose events have been completely processed.
    #[serde(default)]
    pub synchronize_partitions: bool,
}
impl KafkaInputConfig {
    /// Builds a [KafkaInputConfig] that reads `topic`, forwards `kafka_options`
    /// to librdkafka, and leaves every other setting at its default value.
    ///
    /// For the configuration to be usable, `kafka_options` should contain at
    /// least `bootstrap.servers`.
    pub fn default(kafka_options: BTreeMap<String, String>, topic: impl Into<String>) -> Self {
        let topic: String = topic.into();
        Self {
            topic,
            kafka_options,
            start_from: KafkaStartFromConfig::default(),
            group_join_timeout_secs: default_group_join_timeout_secs(),
            log_level: None,
            poller_threads: None,
            region: None,
            partitions: None,
            include_headers: None,
            include_timestamp: None,
            include_partition: None,
            include_offset: None,
            include_topic: None,
            resume_earliest_if_data_expires: false,
            synchronize_partitions: false,
        }
    }

    /// Returns the number of threads that should poll `rdkafka`.
    ///
    /// This is the configured `poller_threads` value, or 3 when unset. The
    /// result is clamped to at least 1 and at most the available parallelism.
    /// If the available parallelism cannot be determined, the upper bound is 16.
    pub fn poller_threads(&self) -> usize {
        let requested = self.poller_threads.unwrap_or(3);
        let ceiling = available_parallelism()
            .map(NonZeroUsize::get)
            .unwrap_or(16);
        requested.clamp(1, ceiling)
    }

    /// Returns `true` if at least one of the `include_*` options is
    /// explicitly `Some(true)`, i.e. some connector metadata was requested.
    pub fn metadata_requested(&self) -> bool {
        [
            self.include_topic,
            self.include_timestamp,
            self.include_partition,
            self.include_offset,
            self.include_headers,
        ]
        .contains(&Some(true))
    }
}
/// Deserializes through the permissive compatibility form, which also accepts
/// legacy settings. The result is then converted and checked with `TryFrom`.
/// Conversion failures are reported as custom deserializer errors.
impl<'de> Deserialize<'de> for KafkaInputConfig {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        compat::KafkaInputConfigCompat::deserialize(deserializer)
            .and_then(|raw| Self::try_from(raw).map_err(D::Error::custom))
    }
}
/// Where to begin reading a Kafka topic.
///
/// Defaults to `latest`.
#[derive(Debug, Clone, Default, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum KafkaStartFromConfig {
    /// Start from the beginning of the topic.
    Earliest,
    /// Start from the current end of the topic.
    ///
    /// This only reads data that is added to the topic after the
    /// connector initializes.
    #[default]
    Latest,
    /// Start from particular offsets in the topic.
    ///
    /// If the connector's `partitions` list is specified, there must be
    /// exactly one offset per listed partition, in the same order. Otherwise,
    /// the number of offsets must match the number of partitions in the topic.
    Offsets(Vec<i64>),
    /// Start from a particular timestamp in the topic.
    ///
    /// Kafka timestamps are in milliseconds since the epoch.
    Timestamp(i64),
}
/// Kafka logging levels.
///
/// In configuration the levels are spelled in lowercase: `emerg`, `alert`,
/// `critical`, `error`, `warning`, `notice`, `info`, `debug`.
// The derived `PartialOrd`/`Ord` follow declaration order, so `Emerg` is the
// smallest value and `Debug` the largest; keep the variants in this order.
#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum KafkaLogLevel {
    Emerg,
    Alert,
    Critical,
    Error,
    Warning,
    Notice,
    Info,
    Debug,
}
/// Default number of seconds an input endpoint waits during startup to join
/// the Kafka consumer group (`group_join_timeout_secs`).
pub const fn default_group_join_timeout_secs() -> u32 {
    const DEFAULT_GROUP_JOIN_TIMEOUT_SECS: u32 = 10;
    DEFAULT_GROUP_JOIN_TIMEOUT_SECS
}
impl KafkaInputConfig {
    /// Sets `option` to `val` in `kafka_options`, inserting it if absent.
    ///
    /// # Errors
    ///
    /// Returns an error if `option` is already set to a value other than
    /// `val`. The existing value is left in place in that case.
    // `dead_code` is allowed because this helper may currently have no callers.
    #[allow(dead_code)]
    fn enforce_option(&mut self, option: &str, val: &str) -> AnyResult<()> {
        let current = self
            .kafka_options
            .entry(option.to_string())
            .or_insert_with(|| val.to_string());
        if current != val {
            // `AnyError::msg` does not interpolate `{...}` placeholders, so the
            // message must be built with `format!` to include the actual
            // option name and value.
            return Err(AnyError::msg(format!(
                "cannot override '{option}' option: the Kafka transport adapter sets this option to '{val}'"
            )));
        }
        Ok(())
    }

    /// Sets `option` to `val` in `kafka_options` unless it is already present.
    fn set_option_if_missing(&mut self, option: &str, val: &str) {
        self.kafka_options
            .entry(option.to_string())
            .or_insert_with(|| val.to_string());
    }

    /// Validates the configuration and fills in option defaults required by
    /// this adapter.
    ///
    /// Each option below is set only if the user has not already set it:
    ///
    /// * `bootstrap.servers`: [`default_redpanda_server()`]
    /// * `enable.auto.commit`: `false`
    /// * `enable.auto.offset.store`: `false`
    /// * `group.id`: a freshly generated UUIDv7
    /// * `enable.partition.eof`: `false`
    /// * `ssl.ca.location`: `probe`
    /// * `statistics.interval.ms`: `10000`
    ///
    /// # Errors
    ///
    /// Returns an error if both `partitions` and `start_from: offsets` are
    /// given and they have different lengths.
    pub fn validate(&mut self) -> AnyResult<()> {
        self.set_option_if_missing("bootstrap.servers", &default_redpanda_server());
        // These options will prevent librdkafka from automatically committing offsets
        // of consumed messages to the broker, meaning that next time the
        // connector is instantiated it will start reading from the offset
        // specified in `auto.offset.reset`. We used to set these to
        // `true`, which caused `rdkafka` to hang in some circumstances
        // (https://github.com/confluentinc/librdkafka/issues/3954). Besides, the new behavior
        // is probably more correct given that circuit state currently does not survive
        // across pipeline restarts, so it makes sense to start feeding messages
        // from the start rather than from the last offset consumed by the
        // previous instance of the pipeline, whose state is lost. Once we add
        // fault tolerance, we will likely use explicit commits, which also do
        // not require these options.
        //
        // See https://docs.confluent.io/platform/current/clients/consumer.html#offset-management
        //
        // Note: we allow the user to override the options, so they can still enable
        // auto commit if they know what they are doing, e.g., the secops demo
        // requires the pipeline to commit its offset for the generator to know
        // when to resume sending.
        self.set_option_if_missing("enable.auto.commit", "false");
        self.set_option_if_missing("enable.auto.offset.store", "false");
        // Generate the random group id lazily, only when the user did not
        // provide one.
        self.kafka_options
            .entry("group.id".to_string())
            .or_insert_with(|| Uuid::now_v7().to_string());
        self.set_option_if_missing("enable.partition.eof", "false");
        // We link with openssl statically, which means that the default OPENSSLDIR location
        // baked into openssl is not correct (see https://github.com/fede1024/rust-rdkafka/issues/594).
        // We set the ssl.ca.location to "probe" so that librdkafka can find the CA certificates in a
        // standard location (e.g., /etc/ssl/).
        self.set_option_if_missing("ssl.ca.location", "probe");
        // Enable client context `stats` callback so we can periodically check
        // up on librdkafka memory usage.
        self.set_option_if_missing("statistics.interval.ms", "10000");
        if let (Some(partitions), KafkaStartFromConfig::Offsets(offsets)) =
            (&self.partitions, &self.start_from)
            && partitions.len() != offsets.len()
        {
            anyhow::bail!(
                "the number of partitions ('{partitions:?}') should be equal to the number of offsets '{offsets:?}' specified"
            )
        }
        Ok(())
    }
}
/// Returns the broker list used when a connector does not set
/// `bootstrap.servers`.
///
/// This is the value of the `REDPANDA_BROKERS` environment variable. If that
/// variable is unset or not valid Unicode, the result is `localhost`.
pub fn default_redpanda_server() -> String {
    match env::var("REDPANDA_BROKERS") {
        Ok(brokers) => brokers,
        Err(_) => String::from("localhost"),
    }
}
/// Default number of seconds an output endpoint waits during initialization
/// to connect to a Kafka broker (`initialization_timeout_secs`).
const fn default_initialization_timeout_secs() -> u32 {
    const DEFAULT_INITIALIZATION_TIMEOUT_SECS: u32 = 60;
    DEFAULT_INITIALIZATION_TIMEOUT_SECS
}
/// Kafka header value encoded as a UTF-8 string or a byte array.
///
/// When deserialized, a string is stored as its UTF-8 bytes, and an array of
/// byte values (e.g. `[1, 2, 3]`) is stored as-is.
// `Serialize` is derived, so the value is always written as a sequence of
// byte values, even if it was originally given as a string.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, ToSchema)]
#[repr(transparent)]
pub struct KafkaHeaderValue(pub Vec<u8>);
/// Visitor that builds a [`KafkaHeaderValue`] from a string, a native byte
/// buffer, or a sequence of byte values.
struct HeaderVisitor;

impl<'de> Visitor<'de> for HeaderVisitor {
    type Value = KafkaHeaderValue;

    fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result {
        formatter.write_str("a string (e.g., \"xyz\") or a byte array (e.g., [1,2,3])")
    }

    /// Strings are stored as their UTF-8 encoding.
    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: Error,
    {
        Ok(KafkaHeaderValue(v.as_bytes().to_vec()))
    }

    /// Owned strings are converted without copying.
    fn visit_string<E>(self, v: String) -> Result<Self::Value, E>
    where
        E: Error,
    {
        Ok(KafkaHeaderValue(v.into_bytes()))
    }

    /// Native byte buffers, for formats that have them, are stored verbatim.
    fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
    where
        E: Error,
    {
        Ok(KafkaHeaderValue(v.to_vec()))
    }

    /// Owned byte buffers are stored without copying.
    fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
    where
        E: Error,
    {
        Ok(KafkaHeaderValue(v))
    }

    /// Sequences must consist of values that each fit in a `u8`.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        // The size hint is derived from the input, so cap the up-front
        // reservation instead of trusting it blindly; the vector still grows
        // as needed for longer sequences.
        const MAX_PREALLOCATED_HEADER_BYTES: usize = 4096;
        let capacity = seq
            .size_hint()
            .unwrap_or(0)
            .min(MAX_PREALLOCATED_HEADER_BYTES);
        let mut bytes = Vec::with_capacity(capacity);
        while let Some(byte) = seq.next_element::<u8>()? {
            bytes.push(byte);
        }
        Ok(KafkaHeaderValue(bytes))
    }
}
impl<'de> Deserialize<'de> for KafkaHeaderValue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(HeaderVisitor)
}
}
#[cfg(test)]
#[test]
fn test_kafka_header_value_deserialize() {
    // Strings are accepted and stored as their UTF-8 bytes.
    assert_eq!(
        serde_json::from_str::<KafkaHeaderValue>(r#""foobar""#).unwrap(),
        KafkaHeaderValue(b"foobar".to_vec())
    );
    assert_eq!(
        serde_json::from_str::<KafkaHeaderValue>(r#""""#).unwrap(),
        KafkaHeaderValue(Vec::new())
    );
    // Arrays of byte values are accepted as-is.
    assert_eq!(
        serde_json::from_str::<KafkaHeaderValue>(r#"[1,2,3,4,5]"#).unwrap(),
        KafkaHeaderValue(vec![1u8, 2, 3, 4, 5])
    );
    assert_eq!(
        serde_json::from_str::<KafkaHeaderValue>("[]").unwrap(),
        KafkaHeaderValue(Vec::new())
    );
    // Array elements that do not fit in a byte are rejected.
    assert!(serde_json::from_str::<KafkaHeaderValue>("[256]").is_err());
    assert!(serde_json::from_str::<KafkaHeaderValue>("[-1]").is_err());
    // Other JSON types are rejected. The object literal is valid JSON, so
    // this failure comes from the visitor rather than from the JSON parser.
    assert!(serde_json::from_str::<KafkaHeaderValue>("150").is_err());
    assert!(serde_json::from_str::<KafkaHeaderValue>(r#"{"foo": "bar"}"#).is_err());
    assert!(serde_json::from_str::<KafkaHeaderValue>("null").is_err());
}
/// Kafka message header.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct KafkaHeader {
    /// Header name.
    pub key: String,
    /// Header value. It is optional and may be `null` or omitted.
    pub value: Option<KafkaHeaderValue>,
}
/// Configuration for writing data to a Kafka topic with `OutputTransport`.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
pub struct KafkaOutputConfig {
    /// Options passed directly to `rdkafka`.
    ///
    /// See [`librdkafka` options](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
    /// used to configure the Kafka producer.
    ///
    /// Any configuration key that is not one of the other fields of this
    /// struct is collected here.
    #[serde(flatten)]
    pub kafka_options: BTreeMap<String, String>,
    /// Topic to write to.
    pub topic: String,
    /// Kafka headers to be added to each message produced by this connector.
    ///
    /// Defaults to no headers.
    #[serde(default)]
    pub headers: Vec<KafkaHeader>,
    /// The log level of the client.
    ///
    /// If not specified, the log level will be calculated based on the global
    /// log level of the `log` crate.
    pub log_level: Option<KafkaLogLevel>,
    /// Maximum timeout in seconds to wait for the endpoint to connect to
    /// a Kafka broker.
    ///
    /// Defaults to 60.
    #[serde(default = "default_initialization_timeout_secs")]
    pub initialization_timeout_secs: u32,
    /// Optional configuration for fault tolerance.
    pub fault_tolerance: Option<KafkaOutputFtConfig>,
    /// If specified, this service is used to provide defaults for the Kafka options.
    pub kafka_service: Option<String>,
    /// The AWS region to use while connecting to AWS Managed Streaming for Kafka (MSK).
    pub region: Option<String>,
}
/// Fault tolerance configuration for the Kafka output connector.
///
/// Both option maps default to empty when omitted.
#[derive(Debug, Clone, Default, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct KafkaOutputFtConfig {
    /// Options passed to `rdkafka` for consumers only, as documented at
    /// [`librdkafka`
    /// options](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
    ///
    /// These options override `kafka_options` for consumers, and may be empty.
    pub consumer_options: BTreeMap<String, String>,
    /// Options passed to `rdkafka` for producers only, as documented at
    /// [`librdkafka`
    /// options](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
    ///
    /// These options override `kafka_options` for producers, and may be empty.
    pub producer_options: BTreeMap<String, String>,
}
impl KafkaOutputConfig {
    /// Inserts `option` = `val` into `kafka_options` only when `option` is absent.
    // `dead_code` is allowed as before: no caller is visible alongside this impl.
    #[allow(dead_code)]
    fn set_option_if_missing(&mut self, option: &str, val: &str) {
        let _ = self
            .kafka_options
            .entry(String::from(option))
            .or_insert_with(|| String::from(val));
    }

    /// Validates the configuration and fills in option defaults required by
    /// this adapter.
    ///
    /// Each default is applied only when the user has not already set that
    /// option:
    ///
    /// * `bootstrap.servers`: [`default_redpanda_server()`]
    /// * `ssl.ca.location`: `probe`
    /// * `statistics.interval.ms`: `10000`
    ///
    /// This currently never returns an error.
    // `dead_code` is allowed as before: no caller is visible alongside this impl.
    #[allow(dead_code)]
    pub fn validate(&mut self) -> AnyResult<()> {
        let brokers = default_redpanda_server();
        let defaults: [(&str, &str); 3] = [
            ("bootstrap.servers", brokers.as_str()),
            // OpenSSL is linked statically, so its built-in OPENSSLDIR is wrong
            // (see https://github.com/fede1024/rust-rdkafka/issues/594); "probe"
            // makes librdkafka search standard CA locations such as /etc/ssl/.
            ("ssl.ca.location", "probe"),
            // Turn on the client-context `stats` callback so librdkafka memory
            // usage can be checked periodically.
            ("statistics.interval.ms", "10000"),
        ];
        for (option, val) in defaults {
            self.set_option_if_missing(option, val);
        }
        Ok(())
    }
}
/// A set of updates to a SQL table or view.
///
/// The `sequence_number` field stores the offset of the chunk relative to the
/// start of the stream and can be used to implement reliable delivery.
/// The payload is stored in the `bin_data`, `text_data`, or `json_data` field
/// depending on the data format used.
#[derive(Deserialize, ToSchema)]
pub struct Chunk {
    /// Offset of this chunk relative to the start of the stream.
    pub sequence_number: u64,
    // Exactly one of the following fields must be set.
    // NOTE(review): the derived `Deserialize` does not enforce this; presumably
    // the consumer of `Chunk` checks it. Confirm.
    // This should be an enum inlined with `#[serde(flatten)]`, but `utoipa`
    // struggles to generate a schema for that.
    /// Base64 encoded binary payload, e.g., bincode.
    // NOTE(review): `bin_data` uses the plain derived `Deserialize` for
    // `Vec<u8>`, which (e.g. with `serde_json`) expects a sequence of
    // integers rather than a base64 string. Confirm how this field is encoded.
    pub bin_data: Option<Vec<u8>>,
    /// Text payload, e.g., CSV.
    pub text_data: Option<String>,
    /// JSON payload.
    #[schema(value_type = Option<Object>)]
    pub json_data: Option<JsonValue>,
}
mod compat {
    use std::collections::BTreeMap;

    use serde::Deserialize;

    use crate::transport::kafka::{KafkaLogLevel, KafkaStartFromConfig};

    /// Permissive wire form of `super::KafkaInputConfig`.
    ///
    /// It accepts both the current layout and the legacy one: `topics`
    /// instead of `topic`, and `auto.offset.reset` instead of `start_from`.
    /// It is turned into the real configuration through `TryFrom`.
    #[derive(Deserialize)]
    pub struct KafkaInputConfigCompat {
        /// Current option: client log level.
        log_level: Option<KafkaLogLevel>,
        /// Current option. Defaults to `super::default_group_join_timeout_secs()`.
        #[serde(default = "super::default_group_join_timeout_secs")]
        group_join_timeout_secs: u32,
        /// Current option: number of `rdkafka` poller threads.
        poller_threads: Option<usize>,
        /// Current option. Its type changed incompatibly soon after it was
        /// introduced, and the initial form is not supported.
        #[serde(default, with = "crate::serde_via_value")]
        start_from: Option<KafkaStartFromConfig>,
        /// Current option that replaces the legacy `topics`. It is required
        /// unless `topics` is given.
        topic: Option<String>,
        /// Legacy form of `topic`. It is accepted only with exactly one element.
        #[serde(default)]
        topics: Vec<String>,
        /// Legacy option. It is ignored in the legacy (`topics`) form and
        /// rejected in the modern (`topic`) form.
        fault_tolerance: Option<String>,
        /// Legacy option. It is ignored in the legacy (`topics`) form and
        /// rejected in the modern (`topic`) form.
        kafka_service: Option<String>,
        /// All remaining keys, which are treated as `rdkafka` options.
        #[serde(flatten)]
        kafka_options: BTreeMap<String, String>,
        /// The AWS region to use while connecting to AWS Managed Streaming for Kafka (MSK).
        region: Option<String>,
        /// The Kafka partitions to read from.
        partitions: Option<Vec<i32>>,
        /// Whether to resume from the earliest available offsets when the
        /// checkpointed resume point has expired from the topic. If false,
        /// the input connector fails the pipeline instead.
        #[serde(default)]
        pub resume_earliest_if_data_expires: bool,
        include_headers: Option<bool>,
        include_timestamp: Option<bool>,
        include_partition: Option<bool>,
        include_offset: Option<bool>,
        include_topic: Option<bool>,
        #[serde(default)]
        synchronize_partitions: bool,
    }

    /// Option names that lack a `.` but are still accepted as librdkafka
    /// options. Every other undotted key is rejected.
    const UNDOTTED_KAFKA_OPTIONS: [&str; 3] = ["debug", "enabled_events", "retries"];

    /// Translates a legacy `auto.offset.reset` value into a start position.
    fn parse_auto_offset_reset(value: &str) -> Result<KafkaStartFromConfig, String> {
        match value {
            "smallest" | "earliest" | "beginning" => Ok(KafkaStartFromConfig::Earliest),
            "largest" | "latest" | "end" => Ok(KafkaStartFromConfig::Latest),
            unknown => Err(format!(
                "Unrecognized value {unknown:?} for `auto.offset.reset` in Kafka legacy input adapter configuration"
            )),
        }
    }

    impl TryFrom<KafkaInputConfigCompat> for super::KafkaInputConfig {
        type Error = String;

        fn try_from(mut compat: KafkaInputConfigCompat) -> Result<Self, Self::Error> {
            let uses_legacy_topics = !compat.topics.is_empty();
            let (topic, start_from) = match (uses_legacy_topics, compat.topic.take()) {
                (true, Some(_)) => {
                    return Err(
                        "Kafka input adapter may not have both (modern) `topic` and (legacy) `topics`."
                            .into(),
                    );
                }
                // Legacy form: convert to the modern representation.
                (true, None) => {
                    let count = compat.topics.len();
                    if count != 1 {
                        return Err(format!(
                            "Kafka input adapter must have exactly one topic (not {count})."
                        ));
                    }
                    let start_from = match compat.start_from.take() {
                        Some(explicit) => explicit,
                        None => match compat.kafka_options.get("auto.offset.reset") {
                            Some(reset) => parse_auto_offset_reset(reset)?,
                            None => KafkaStartFromConfig::default(),
                        },
                    };
                    let topic = compat
                        .topics
                        .pop()
                        .expect("`topics` was just checked to hold exactly one element");
                    (topic, start_from)
                }
                // Modern form: legacy-only settings are no longer allowed.
                (false, Some(topic)) => {
                    if compat.fault_tolerance.is_some() {
                        return Err(
                            "Kafka input adapter `fault_tolerance` setting is obsolete.".into()
                        );
                    }
                    if compat.kafka_service.is_some() {
                        return Err(
                            "Kafka input adapter `kafka_service` setting is obsolete.".into()
                        );
                    }
                    (topic, compat.start_from.take().unwrap_or_default())
                }
                (false, None) => {
                    return Err("Kafka input adapter is missing required `topic` setting.".into());
                }
            };

            // Reject (in key order) the first key that neither looks like a
            // librdkafka option nor is one of the known undotted exceptions.
            let unexpected_key = compat.kafka_options.keys().find(|key| {
                !key.contains('.') && !UNDOTTED_KAFKA_OPTIONS.contains(&key.as_str())
            });
            if let Some(key) = unexpected_key {
                return Err(format!(
                    "Invalid Kafka input connector configuration key {key:?}: it is not valid for the input connector, nor does it contain `.` as librdkafka configuration options generally do (nor is it one of the few special exceptions to that rule)."
                ));
            }

            Ok(Self {
                topic,
                start_from,
                kafka_options: compat.kafka_options,
                log_level: compat.log_level,
                group_join_timeout_secs: compat.group_join_timeout_secs,
                poller_threads: compat.poller_threads,
                region: compat.region,
                partitions: compat.partitions,
                resume_earliest_if_data_expires: compat.resume_earliest_if_data_expires,
                include_headers: compat.include_headers,
                include_timestamp: compat.include_timestamp,
                include_partition: compat.include_partition,
                include_offset: compat.include_offset,
                include_topic: compat.include_topic,
                synchronize_partitions: compat.synchronize_partitions,
            })
        }
    }
}