// hyperi_rustlib/kafka_config.rs
1// Project: hyperi-rustlib
2// File: src/kafka_config.rs
3// Purpose: Shared Kafka librdkafka defaults, profiles, and file config loader
4// Language: Rust
5//
6// License: FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Shared Kafka librdkafka configuration profiles, merge helper, and file loader.
10//!
11//! This module is always available (no feature gate). The core profile constants
12//! and merge helper have zero external dependencies. File loading supports
13//! `.properties` without any feature gate; YAML and JSON require the
14//! `directory-config` and `config` features respectively.
15//!
16//! ## Loading from Config Git Directory
17//!
18//! Services store librdkafka settings in their config git directory and load
19//! them with [`config_from_file`]:
20//!
21//! ```rust,ignore
22//! use hyperi_rustlib::kafka_config::{config_from_file, merge_with_overrides, CONSUMER_PRODUCTION};
23//!
24//! let overrides = config_from_file("/config/kafka.properties")?;
25//! let rdkafka_config = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
26//! ```
27
28use std::collections::HashMap;
29use std::path::Path;
30
31use thiserror::Error;
32
33// ============================================================================
34// Error Type
35// ============================================================================
36
/// Error loading librdkafka configuration from a file.
///
/// Also reused by [`DfeSource::consumer_group`] (with an empty `path`) to
/// report an invalid consumer-group derivation.
#[derive(Debug, Error)]
pub enum KafkaConfigError {
    /// File does not exist.
    #[error("kafka config file not found: {path}")]
    FileNotFound { path: std::path::PathBuf },

    /// File extension is not supported (or the required feature is not enabled).
    ///
    /// `ext` carries the offending extension; when a format's feature flag is
    /// disabled, it also names the feature to enable.
    #[error("unsupported kafka config format: {ext}. Supported: .properties, .yaml, .yml, .json")]
    UnsupportedFormat { ext: String },

    /// File content could not be parsed.
    #[error("parse error in {path}: {message}")]
    ParseError { path: String, message: String },

    /// I/O error reading the file.
    #[error("io error reading kafka config: {0}")]
    Io(#[from] std::io::Error),
}
56
/// Result type for kafka config file operations.
///
/// Shorthand for `Result<T, KafkaConfigError>`.
pub type KafkaConfigResult<T> = Result<T, KafkaConfigError>;
59
60// ============================================================================
61// File Loading
62// ============================================================================
63
64/// Load librdkafka configuration from a file in the config git directory.
65///
66/// Detects format from file extension:
67///
68/// | Extension | Format | Requires |
69/// |-----------|--------|---------|
70/// | `.properties` | Java-style `key=value` | nothing (always available) |
71/// | `.yaml`, `.yml` | YAML flat mapping | `directory-config` feature |
72/// | `.json` | JSON object | `config` feature |
73///
74/// The returned map passes directly to [`merge_with_overrides`] or as
75/// `librdkafka_overrides` in `KafkaConfig`.
76///
77/// # Errors
78///
79/// Returns [`KafkaConfigError`] if the file is missing, the format is
80/// unsupported, or parsing fails.
81pub fn config_from_file(path: impl AsRef<Path>) -> KafkaConfigResult<HashMap<String, String>> {
82 let path = path.as_ref();
83
84 let content = std::fs::read_to_string(path).map_err(|e| {
85 if e.kind() == std::io::ErrorKind::NotFound {
86 KafkaConfigError::FileNotFound {
87 path: path.to_path_buf(),
88 }
89 } else {
90 KafkaConfigError::Io(e)
91 }
92 })?;
93
94 let ext = path
95 .extension()
96 .and_then(|s| s.to_str())
97 .unwrap_or("")
98 .to_lowercase();
99
100 let path_str = path.display().to_string();
101
102 match ext.as_str() {
103 "properties" => Ok(config_from_properties_str(&content)),
104 "yaml" | "yml" => parse_yaml(&content, path_str),
105 "json" => parse_json(&content, path_str),
106 other => Err(KafkaConfigError::UnsupportedFormat {
107 ext: other.to_string(),
108 }),
109 }
110}
111
/// Parse Java-style `.properties` content into a librdkafka config map.
///
/// Supported subset:
/// - `key=value` pairs — only the first `=` splits, so values may themselves
///   contain `=` (e.g. base64 PEM blobs)
/// - comment lines starting with `#` or `!`
/// - blank lines and surrounding whitespace (ignored)
///
/// Lines without an `=` separator are skipped. Always available with no
/// feature gate.
#[must_use]
pub fn config_from_properties_str(content: &str) -> HashMap<String, String> {
    content
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.starts_with('!'))
        .filter_map(|line| line.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect()
}
136
/// Parse YAML content into a flat `String -> String` map.
///
/// Compiled in only with the `directory-config` feature; without it, returns
/// [`KafkaConfigError::UnsupportedFormat`] naming the feature to enable.
fn parse_yaml(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
    #[cfg(feature = "directory-config")]
    {
        serde_yaml_ng::from_str(content).map_err(|e| KafkaConfigError::ParseError {
            path,
            message: e.to_string(),
        })
    }
    #[cfg(not(feature = "directory-config"))]
    {
        // Suppress unused-variable warnings in the feature-off build.
        let _ = (content, path);
        Err(KafkaConfigError::UnsupportedFormat {
            ext: "yaml — enable the `directory-config` feature".to_string(),
        })
    }
}
153
/// Parse a JSON object into a flat `String -> String` map.
///
/// Compiled in only with the `config` feature; without it, returns
/// [`KafkaConfigError::UnsupportedFormat`] naming the feature to enable.
fn parse_json(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
    #[cfg(feature = "config")]
    {
        serde_json::from_str(content).map_err(|e| KafkaConfigError::ParseError {
            path,
            message: e.to_string(),
        })
    }
    #[cfg(not(feature = "config"))]
    {
        // Suppress unused-variable warnings in the feature-off build.
        let _ = (content, path);
        Err(KafkaConfigError::UnsupportedFormat {
            ext: "json — enable the `config` feature".to_string(),
        })
    }
}
170
171// ============================================================================
172// Merge Helper
173// ============================================================================
174
/// Merge profile defaults with user overrides.
///
/// The result contains every `(key, value)` pair from `profile`, with any key
/// that also appears in `overrides` replaced by the override value, plus all
/// override-only keys. User overrides always win.
#[must_use]
pub fn merge_with_overrides<S: std::hash::BuildHasher>(
    profile: &[(&str, &str)],
    overrides: &HashMap<String, String, S>,
) -> HashMap<String, String> {
    // Upper bound on final size; avoids rehashing during the two extends.
    let mut merged = HashMap::with_capacity(profile.len() + overrides.len());
    merged.extend(
        profile
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string())),
    );
    merged.extend(
        overrides
            .iter()
            .map(|(key, value)| (key.clone(), value.clone())),
    );
    merged
}
195
196// ============================================================================
197// Consumer Profiles
198// ============================================================================
199
/// Production consumer baseline — lean, only non-defaults.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | KIP-429: avoids stop-the-world rebalances |
/// | `fetch.min.bytes` | 1 MiB | 1 byte | Batch fetches for throughput |
/// | `fetch.wait.max.ms` | 100 ms | 500 ms | Bound latency when fetch.min.bytes not met |
/// | `queued.min.messages` | 20000 | 100000 | 10-20K batches are most efficient |
/// | `enable.auto.commit` | false | true | DFE services manage offset commits |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable Prometheus metrics |
pub const CONSUMER_PRODUCTION: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"),
    ("fetch.min.bytes", "1048576"),
    ("fetch.wait.max.ms", "100"),
    ("queued.min.messages", "20000"),
    ("enable.auto.commit", "false"),
    ("statistics.interval.ms", "1000"),
];

/// Development/test consumer baseline — fast iteration, low memory.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | Consistent across environments |
/// | `queued.min.messages` | 1000 | 100000 | Lower memory for dev machines |
/// | `enable.auto.commit` | false | true | DFE services manage commits |
/// | `reconnect.backoff.ms` | 10 ms | 100 ms | Fast reconnect for quick iteration |
/// | `reconnect.backoff.max.ms` | 100 ms | 10000 ms | Cap quickly |
/// | `log.connection.close` | true | false | Debug-friendly |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable metrics even in dev |
pub const CONSUMER_DEVTEST: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"),
    ("queued.min.messages", "1000"),
    ("enable.auto.commit", "false"),
    ("reconnect.backoff.ms", "10"),
    ("reconnect.backoff.max.ms", "100"),
    ("log.connection.close", "true"),
    ("statistics.interval.ms", "1000"),
];

/// Low-latency consumer — minimal fetch delay.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | Consistent across envs |
/// | `fetch.wait.max.ms` | 10 ms | 500 ms | Return quickly |
/// | `queued.min.messages` | 1000 | 100000 | Smaller pre-fetch queue |
/// | `enable.auto.commit` | false | true | DFE manages commits |
/// | `reconnect.backoff.ms` | 10 ms | 100 ms | Fast reconnect |
/// | `reconnect.backoff.max.ms` | 100 ms | 10000 ms | Cap quickly |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const CONSUMER_LOW_LATENCY: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"),
    ("fetch.wait.max.ms", "10"),
    ("queued.min.messages", "1000"),
    ("enable.auto.commit", "false"),
    ("reconnect.backoff.ms", "10"),
    ("reconnect.backoff.max.ms", "100"),
    ("statistics.interval.ms", "1000"),
];
260
261// ============================================================================
262// Producer Profiles
263// ============================================================================
264
/// Production producer baseline — high throughput, zstd compression.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `linger.ms` | 100 ms | 5 ms | Accumulate larger batches |
/// | `compression.type` | zstd | none | Best ratio with good CPU |
/// | `socket.nagle.disable` | true | false | Kafka batches at app level |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable Prometheus metrics |
pub const PRODUCER_PRODUCTION: &[(&str, &str)] = &[
    ("linger.ms", "100"),
    ("compression.type", "zstd"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];

/// Exactly-once producer — idempotence + ordering.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `enable.idempotence` | true | false | Exactly-once within partition |
/// | `acks` | all | all (-1) | Invariant for EOS (explicit) |
/// | `max.in.flight.requests.per.connection` | 5 | 1000000 | Max for idempotent producer |
/// | `linger.ms` | 20 ms | 5 ms | Moderate batching |
/// | `compression.type` | zstd | none | Best ratio |
/// | `socket.nagle.disable` | true | false | Kafka batches at app level |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const PRODUCER_EXACTLY_ONCE: &[(&str, &str)] = &[
    ("enable.idempotence", "true"),
    ("acks", "all"),
    ("max.in.flight.requests.per.connection", "5"),
    ("linger.ms", "20"),
    ("compression.type", "zstd"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];

/// Low-latency producer — minimal delay, leader-ack only.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `acks` | 1 | all (-1) | Leader ack only for speed |
/// | `linger.ms` | 0 ms | 5 ms | Send immediately |
/// | `compression.type` | lz4 | none | LZ4 is fastest codec |
/// | `socket.nagle.disable` | true | false | No TCP coalescing |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const PRODUCER_LOW_LATENCY: &[(&str, &str)] = &[
    ("acks", "1"),
    ("linger.ms", "0"),
    ("compression.type", "lz4"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];

/// DevTest producer — fast acks, no compression.
///
/// Pass as the `profile` argument to [`merge_with_overrides`].
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `acks` | 1 | all (-1) | Faster for dev |
/// | `socket.nagle.disable` | true | false | No TCP coalescing |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics in dev |
pub const PRODUCER_DEVTEST: &[(&str, &str)] = &[
    ("acks", "1"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];
330
331// ============================================================================
332// DFE Source Convention
333// ============================================================================
334
/// Default topic suffix for landing zone (raw ingest).
///
/// Used by [`DfeSource::new`] and [`DfeSource::source_from_topic`].
pub const TOPIC_SUFFIX_LAND: &str = "_land";

/// Default topic suffix for load-ready data (post-transform).
///
/// Used by [`DfeSource::new`] and [`DfeSource::source_from_topic`].
pub const TOPIC_SUFFIX_LOAD: &str = "_load";
340
/// DFE service role — determines consumer group naming convention.
///
/// See [`DfeSource::consumer_group`] for how the role drives CG derivation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceRole {
    /// Transform services (middleware): CG = `dfe-{service}-{source}`.
    ///
    /// Transforms sit between `_land` and `_load` topics. Each source gets
    /// its own consumer group so multiple transform pipelines don't compete.
    Transform,

    /// Universal consumers (loader, archiver): CG = `dfe-{service}`.
    ///
    /// Universal services consume from whatever topics are configured or
    /// auto-discovered. The source name is not part of the consumer group.
    Universal,
}
356
/// DFE source-aware topic naming for transform services.
///
/// All DFE data flows follow the same topology:
///
/// ```text
/// receiver -> {source}_land -> transform -> {source}_load -> loader -> ClickHouse
/// ```
///
/// `DfeSource` is for **transform services** (middleware) that sit between
/// `_land` and `_load`. It derives input/output topic names and source-scoped
/// consumer group IDs from a source name.
///
/// Terminal consumers (loader, archiver) do not use `DfeSource` — they
/// consume from whatever topics are configured or auto-discovered, and their
/// consumer group is simply `dfe-{service}` without a source component.
///
/// # Examples
///
/// ```
/// use hyperi_rustlib::kafka_config::{DfeSource, ServiceRole};
///
/// let source = DfeSource::new("syslog");
/// assert_eq!(source.input_topic(), "syslog_land");
/// assert_eq!(source.output_topic(), "syslog_load");
///
/// // Transform: CG includes source name
/// assert_eq!(
///     source.consumer_group("transform-vector", ServiceRole::Transform, None, None).unwrap(),
///     "dfe-transform-vector-syslog"
/// );
///
/// // Terminal: CG is just the service name
/// assert_eq!(
///     source.consumer_group("loader", ServiceRole::Universal, None, None).unwrap(),
///     "dfe-loader"
/// );
///
/// // Override always wins
/// assert_eq!(
///     source.consumer_group("transform-vector", ServiceRole::Transform, None, Some("custom")).unwrap(),
///     "custom"
/// );
///
/// // Transform without source is an error
/// let empty = DfeSource::new("");
/// assert!(empty.consumer_group("transform-vector", ServiceRole::Transform, None, None).is_err());
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DfeSource {
    // Source name, e.g. "syslog"; may be empty (validated in consumer_group).
    name: String,
    // Suffix appended for the input/landing topic (default "_land").
    land_suffix: String,
    // Suffix appended for the output/load-ready topic (default "_load").
    load_suffix: String,
}
410
411impl DfeSource {
412 /// Create a new source with default suffixes (`_land`, `_load`).
413 #[must_use]
414 pub fn new(name: impl Into<String>) -> Self {
415 Self {
416 name: name.into(),
417 land_suffix: TOPIC_SUFFIX_LAND.to_string(),
418 load_suffix: TOPIC_SUFFIX_LOAD.to_string(),
419 }
420 }
421
422 /// Create a source with custom suffixes.
423 #[must_use]
424 pub fn with_suffixes(
425 name: impl Into<String>,
426 land_suffix: impl Into<String>,
427 load_suffix: impl Into<String>,
428 ) -> Self {
429 Self {
430 name: name.into(),
431 land_suffix: land_suffix.into(),
432 load_suffix: load_suffix.into(),
433 }
434 }
435
436 /// Source name (e.g. `"syslog"`, `"netflow"`).
437 #[must_use]
438 pub fn name(&self) -> &str {
439 &self.name
440 }
441
442 /// Landing zone topic: `{source}_land`.
443 #[must_use]
444 pub fn input_topic(&self) -> String {
445 format!("{}{}", self.name, self.land_suffix)
446 }
447
448 /// Load-ready topic: `{source}_load`.
449 #[must_use]
450 pub fn output_topic(&self) -> String {
451 format!("{}{}", self.name, self.load_suffix)
452 }
453
454 /// Consumer group ID following DFE naming conventions.
455 ///
456 /// The `cg_override` takes precedence when set — use it when the operator
457 /// explicitly configures a consumer group in YAML/env.
458 ///
459 /// When `cg_override` is `None`, the default pattern depends on the
460 /// service role:
461 ///
462 /// | Role | Pattern | Example |
463 /// |------|---------|---------|
464 /// | Transform | `dfe-{service}-{source}` | `dfe-transform-vector-syslog` |
465 /// | Universal (loader, archiver) | `dfe-{service}` | `dfe-loader` |
466 ///
467 /// For transforms, `pipeline` overrides the source component in the CG
468 /// (e.g. `syslog-enriched` instead of `syslog`). Either the `DfeSource`
469 /// name or `pipeline` must be non-empty — a bare service name is never
470 /// valid for transforms because multiple pipelines would compete.
471 ///
472 /// # Errors
473 ///
474 /// Returns `Err` if the role is `Transform` and neither the source name
475 /// nor `pipeline` provides a non-empty suffix.
476 pub fn consumer_group(
477 &self,
478 service: &str,
479 role: ServiceRole,
480 pipeline: Option<&str>,
481 cg_override: Option<&str>,
482 ) -> Result<String, KafkaConfigError> {
483 if let Some(cg) = cg_override {
484 return Ok(cg.to_string());
485 }
486
487 match role {
488 ServiceRole::Transform => {
489 let suffix = pipeline.unwrap_or(&self.name);
490 if suffix.is_empty() {
491 return Err(KafkaConfigError::ParseError {
492 path: String::new(),
493 message: format!(
494 "transform service '{service}' requires a source or pipeline \
495 name for its consumer group — a bare 'dfe-{service}' CG would \
496 cause multiple pipelines to compete for messages"
497 ),
498 });
499 }
500 Ok(format!("dfe-{service}-{suffix}"))
501 }
502 ServiceRole::Universal => Ok(format!("dfe-{service}")),
503 }
504 }
505
506 /// Derive the source name from a topic by stripping known suffixes.
507 ///
508 /// Returns `None` if the topic doesn't end with a known suffix.
509 ///
510 /// ```
511 /// use hyperi_rustlib::kafka_config::DfeSource;
512 ///
513 /// assert_eq!(DfeSource::source_from_topic("syslog_land"), Some("syslog"));
514 /// assert_eq!(DfeSource::source_from_topic("netflow_load"), Some("netflow"));
515 /// assert_eq!(DfeSource::source_from_topic("unknown"), None);
516 /// ```
517 #[must_use]
518 pub fn source_from_topic(topic: &str) -> Option<&str> {
519 topic
520 .strip_suffix(TOPIC_SUFFIX_LAND)
521 .or_else(|| topic.strip_suffix(TOPIC_SUFFIX_LOAD))
522 }
523}
524
525// ============================================================================
526// Tests
527// ============================================================================
528
#[cfg(test)]
mod tests {
    use super::*;

    // --- Profile content pinning ---------------------------------------

    #[test]
    fn consumer_production_only_non_defaults() {
        assert_eq!(CONSUMER_PRODUCTION.len(), 6);
        let map: HashMap<&str, &str> = CONSUMER_PRODUCTION.iter().copied().collect();
        assert_eq!(map["partition.assignment.strategy"], "cooperative-sticky");
        assert_eq!(map["fetch.min.bytes"], "1048576");
        assert_eq!(map["fetch.wait.max.ms"], "100");
        assert_eq!(map["queued.min.messages"], "20000");
        assert_eq!(map["enable.auto.commit"], "false");
        assert_eq!(map["statistics.interval.ms"], "1000");
    }

    #[test]
    fn producer_production_only_non_defaults() {
        assert_eq!(PRODUCER_PRODUCTION.len(), 4);
        let map: HashMap<&str, &str> = PRODUCER_PRODUCTION.iter().copied().collect();
        assert_eq!(map["linger.ms"], "100");
        assert_eq!(map["compression.type"], "zstd");
        assert_eq!(map["socket.nagle.disable"], "true");
        assert_eq!(map["statistics.interval.ms"], "1000");
    }

    // --- merge_with_overrides -------------------------------------------

    #[test]
    fn merge_user_overrides_win() {
        let mut overrides = HashMap::new();
        overrides.insert("fetch.min.bytes".to_string(), "2097152".to_string());
        overrides.insert("custom.setting".to_string(), "value".to_string());

        let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);

        assert_eq!(merged["fetch.min.bytes"], "2097152");
        assert_eq!(merged["custom.setting"], "value");
        assert_eq!(
            merged["partition.assignment.strategy"],
            "cooperative-sticky"
        );
    }

    #[test]
    fn merge_empty_overrides_returns_profile() {
        let overrides = HashMap::new();
        let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
        assert_eq!(merged.len(), CONSUMER_PRODUCTION.len());
    }

    #[test]
    fn merge_empty_profile_returns_overrides() {
        let mut overrides = HashMap::new();
        overrides.insert("key".to_string(), "value".to_string());
        let merged = merge_with_overrides(&[], &overrides);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged["key"], "value");
    }

    // --- .properties parsing --------------------------------------------

    #[test]
    fn properties_str_basic() {
        let content = "\
# This is a comment
bootstrap.servers=kafka1:9092,kafka2:9092
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
! Another comment style
";
        let config = config_from_properties_str(content);
        assert_eq!(config.len(), 3);
        assert_eq!(config["bootstrap.servers"], "kafka1:9092,kafka2:9092");
        assert_eq!(config["security.protocol"], "SASL_SSL");
        assert_eq!(config["sasl.mechanism"], "SCRAM-SHA-512");
    }

    #[test]
    fn properties_str_value_with_equals() {
        // Only the first '=' splits, so base64 padding survives in the value.
        let content = "ssl.certificate.pem=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI==\n";
        let config = config_from_properties_str(content);
        assert_eq!(
            config["ssl.certificate.pem"],
            "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI=="
        );
    }

    #[test]
    fn properties_str_empty_and_whitespace() {
        let config = config_from_properties_str(" \n# comment\n\n");
        assert!(config.is_empty());
    }

    // --- config_from_file -----------------------------------------------

    #[test]
    fn config_from_file_properties() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("kafka.properties");
        std::fs::write(
            &path,
            "bootstrap.servers=kafka:9092\ncompression.type=zstd\n",
        )
        .unwrap();

        let config = config_from_file(&path).unwrap();
        assert_eq!(config["bootstrap.servers"], "kafka:9092");
        assert_eq!(config["compression.type"], "zstd");
    }

    #[test]
    fn config_from_file_not_found() {
        let result = config_from_file("/nonexistent/kafka.properties");
        assert!(matches!(result, Err(KafkaConfigError::FileNotFound { .. })));
    }

    // ===================================================================
    // DfeSource tests
    // ===================================================================

    #[test]
    fn dfe_source_default_topics() {
        let source = DfeSource::new("syslog");
        assert_eq!(source.name(), "syslog");
        assert_eq!(source.input_topic(), "syslog_land");
        assert_eq!(source.output_topic(), "syslog_load");
    }

    #[test]
    fn dfe_source_custom_suffixes() {
        let source = DfeSource::with_suffixes("auth", "_raw", "_enriched");
        assert_eq!(source.input_topic(), "auth_raw");
        assert_eq!(source.output_topic(), "auth_enriched");
    }

    #[test]
    fn dfe_source_cg_transform_default() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group("transform-vector", ServiceRole::Transform, None, None)
                .unwrap(),
            "dfe-transform-vector-syslog"
        );
    }

    #[test]
    fn dfe_source_cg_transform_with_pipeline() {
        // pipeline replaces the source component in the CG.
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    Some("syslog-enriched"),
                    None
                )
                .unwrap(),
            "dfe-transform-vector-syslog-enriched"
        );
    }

    #[test]
    fn dfe_source_cg_transform_empty_source_errors() {
        let source = DfeSource::new("");
        assert!(
            source
                .consumer_group("transform-vector", ServiceRole::Transform, None, None)
                .is_err()
        );
    }

    #[test]
    fn dfe_source_cg_transform_empty_source_pipeline_rescues() {
        // A non-empty pipeline satisfies the suffix requirement even when
        // the source name is empty.
        let source = DfeSource::new("");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    Some("syslog"),
                    None
                )
                .unwrap(),
            "dfe-transform-vector-syslog"
        );
    }

    #[test]
    fn dfe_source_cg_universal() {
        let source = DfeSource::new("netflow");
        assert_eq!(
            source
                .consumer_group("loader", ServiceRole::Universal, None, None)
                .unwrap(),
            "dfe-loader"
        );
    }

    #[test]
    fn dfe_source_cg_universal_ignores_pipeline() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group("archiver", ServiceRole::Universal, Some("ignored"), None)
                .unwrap(),
            "dfe-archiver"
        );
    }

    #[test]
    fn dfe_source_cg_override_wins() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    None,
                    Some("my-custom-cg")
                )
                .unwrap(),
            "my-custom-cg"
        );
    }

    #[test]
    fn dfe_source_cg_override_wins_universal() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "loader",
                    ServiceRole::Universal,
                    None,
                    Some("custom-loader-cg")
                )
                .unwrap(),
            "custom-loader-cg"
        );
    }

    #[test]
    fn dfe_source_from_topic_land() {
        assert_eq!(DfeSource::source_from_topic("syslog_land"), Some("syslog"));
        assert_eq!(DfeSource::source_from_topic("auth_land"), Some("auth"));
    }

    #[test]
    fn dfe_source_from_topic_load() {
        assert_eq!(DfeSource::source_from_topic("syslog_load"), Some("syslog"));
        assert_eq!(
            DfeSource::source_from_topic("netflow_load"),
            Some("netflow")
        );
    }

    #[test]
    fn dfe_source_from_topic_unknown() {
        assert_eq!(DfeSource::source_from_topic("unknown"), None);
        assert_eq!(DfeSource::source_from_topic("events"), None);
        assert_eq!(DfeSource::source_from_topic(""), None);
    }

    #[test]
    fn dfe_source_from_topic_edge_cases() {
        // A bare suffix yields an empty source name rather than None.
        assert_eq!(DfeSource::source_from_topic("_land"), Some(""));
        assert_eq!(DfeSource::source_from_topic("a_load"), Some("a"));
    }

    #[test]
    fn config_from_file_unsupported_extension() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("kafka.toml");
        std::fs::write(&path, "key = value\n").unwrap();

        let result = config_from_file(&path);
        assert!(matches!(
            result,
            Err(KafkaConfigError::UnsupportedFormat { .. })
        ));
    }
}
806}