// Project:   hyperi-rustlib
// File:      src/kafka_config.rs
// Purpose:   Shared Kafka librdkafka defaults, profiles, and file config loader
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Shared Kafka librdkafka configuration profiles, merge helper, and file loader.
//!
//! This module is always available (no feature gate). The core profile constants
//! and merge helper have zero external dependencies. File loading supports
//! `.properties` without any feature gate; YAML and JSON require the
//! `directory-config` and `config` features respectively.
//!
//! ## Loading from Config Git Directory
//!
//! Services store librdkafka settings in their config git directory and load
//! them with [`config_from_file`]:
//!
//! ```rust,ignore
//! use hyperi_rustlib::kafka_config::{config_from_file, merge_with_overrides, CONSUMER_PRODUCTION};
//!
//! let overrides = config_from_file("/config/kafka.properties")?;
//! let rdkafka_config = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
//! ```
28use std::collections::HashMap;
29use std::path::Path;
30
31use thiserror::Error;
32
33// ============================================================================
34// Error Type
35// ============================================================================
36
/// Error loading librdkafka configuration from a file.
#[derive(Debug, Error)]
pub enum KafkaConfigError {
    /// File does not exist.
    #[error("kafka config file not found: {path}")]
    FileNotFound { path: std::path::PathBuf },

    /// File extension is not supported (or feature is not enabled).
    #[error("unsupported kafka config format: {ext}. Supported: .properties, .yaml, .yml, .json")]
    UnsupportedFormat { ext: String },

    /// File content could not be parsed.
    ///
    /// Also reused for invalid derived values elsewhere in this module
    /// (e.g. `DfeSource::consumer_group` reports it with an empty `path`).
    #[error("parse error in {path}: {message}")]
    ParseError { path: String, message: String },

    /// I/O error reading the file (other than not-found, which maps to
    /// [`KafkaConfigError::FileNotFound`]).
    #[error("io error reading kafka config: {0}")]
    Io(#[from] std::io::Error),
}
56
/// Result type for kafka config file operations.
///
/// Shorthand for `Result<T, KafkaConfigError>` used throughout this module.
pub type KafkaConfigResult<T> = Result<T, KafkaConfigError>;
59
60// ============================================================================
61// File Loading
62// ============================================================================
63
64/// Load librdkafka configuration from a file in the config git directory.
65///
66/// Detects format from file extension:
67///
68/// | Extension | Format | Requires |
69/// |-----------|--------|---------|
70/// | `.properties` | Java-style `key=value` | nothing (always available) |
71/// | `.yaml`, `.yml` | YAML flat mapping | `directory-config` feature |
72/// | `.json` | JSON object | `config` feature |
73///
74/// The returned map passes directly to [`merge_with_overrides`] or as
75/// `librdkafka_overrides` in `KafkaConfig`.
76///
77/// # Errors
78///
79/// Returns [`KafkaConfigError`] if the file is missing, the format is
80/// unsupported, or parsing fails.
81pub fn config_from_file(path: impl AsRef<Path>) -> KafkaConfigResult<HashMap<String, String>> {
82    let path = path.as_ref();
83
84    let content = std::fs::read_to_string(path).map_err(|e| {
85        if e.kind() == std::io::ErrorKind::NotFound {
86            KafkaConfigError::FileNotFound {
87                path: path.to_path_buf(),
88            }
89        } else {
90            KafkaConfigError::Io(e)
91        }
92    })?;
93
94    let ext = path
95        .extension()
96        .and_then(|s| s.to_str())
97        .unwrap_or("")
98        .to_lowercase();
99
100    let path_str = path.display().to_string();
101
102    match ext.as_str() {
103        "properties" => Ok(config_from_properties_str(&content)),
104        "yaml" | "yml" => parse_yaml(&content, path_str),
105        "json" => parse_json(&content, path_str),
106        other => Err(KafkaConfigError::UnsupportedFormat {
107            ext: other.to_string(),
108        }),
109    }
110}
111
/// Parse Java-style `.properties` content into a librdkafka config map.
///
/// Handles:
/// - `key=value` pairs (splits on first `=` only, so values may contain `=`)
/// - `#` and `!` comments
/// - Empty lines and surrounding whitespace
///
/// Lines without an `=` separator are silently skipped.
///
/// Always available with no feature gate.
#[must_use]
pub fn config_from_properties_str(content: &str) -> HashMap<String, String> {
    content
        .lines()
        .map(str::trim)
        // Drop blanks and both Java comment styles.
        .filter(|line| !line.is_empty() && !line.starts_with('#') && !line.starts_with('!'))
        // First '=' wins; anything after it (including more '=') is the value.
        .filter_map(|line| line.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect()
}
136
137fn parse_yaml(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
138    #[cfg(feature = "directory-config")]
139    {
140        serde_yaml_ng::from_str(content).map_err(|e| KafkaConfigError::ParseError {
141            path,
142            message: e.to_string(),
143        })
144    }
145    #[cfg(not(feature = "directory-config"))]
146    {
147        let _ = (content, path);
148        Err(KafkaConfigError::UnsupportedFormat {
149            ext: "yaml — enable the `directory-config` feature".to_string(),
150        })
151    }
152}
153
154fn parse_json(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
155    #[cfg(feature = "config")]
156    {
157        serde_json::from_str(content).map_err(|e| KafkaConfigError::ParseError {
158            path,
159            message: e.to_string(),
160        })
161    }
162    #[cfg(not(feature = "config"))]
163    {
164        let _ = (content, path);
165        Err(KafkaConfigError::UnsupportedFormat {
166            ext: "json — enable the `config` feature".to_string(),
167        })
168    }
169}
170
171// ============================================================================
172// Merge Helper
173// ============================================================================
174
/// Merge profile defaults with user overrides.
///
/// Starts with `profile` defaults, then applies `overrides` on top.
/// User overrides always win.
#[must_use]
pub fn merge_with_overrides<S: std::hash::BuildHasher>(
    profile: &[(&str, &str)],
    overrides: &HashMap<String, String, S>,
) -> HashMap<String, String> {
    // Seed with the profile defaults...
    let mut merged: HashMap<String, String> = profile
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect();

    // ...then layer the overrides on top; a later insert for the same key
    // replaces the profile value, so user settings always win.
    merged.reserve(overrides.len());
    merged.extend(
        overrides
            .iter()
            .map(|(key, value)| (key.clone(), value.clone())),
    );

    merged
}
195
196// ============================================================================
197// Consumer Profiles
198// ============================================================================
199
/// Production consumer baseline — lean, only non-defaults.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | KIP-429: avoids stop-the-world rebalances |
/// | `fetch.min.bytes` | 1 MiB | 1 byte | Batch fetches for throughput |
/// | `fetch.wait.max.ms` | 100 ms | 500 ms | Bound latency when fetch.min.bytes not met |
/// | `queued.min.messages` | 20000 | 100000 | 10-20K batches are most efficient |
/// | `enable.auto.commit` | false | true | DFE services manage offset commits |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable Prometheus metrics |
pub const CONSUMER_PRODUCTION: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"), // KIP-429 incremental rebalancing
    ("fetch.min.bytes", "1048576"),                          // 1 MiB
    ("fetch.wait.max.ms", "100"),
    ("queued.min.messages", "20000"),
    ("enable.auto.commit", "false"), // offsets committed explicitly by the service
    ("statistics.interval.ms", "1000"),
];
218
/// Development/test consumer baseline — fast iteration, low memory.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | Consistent across environments |
/// | `queued.min.messages` | 1000 | 100000 | Lower memory for dev machines |
/// | `enable.auto.commit` | false | true | DFE services manage commits |
/// | `reconnect.backoff.ms` | 10 ms | 100 ms | Fast reconnect for quick iteration |
/// | `reconnect.backoff.max.ms` | 100 ms | 10000 ms | Cap quickly |
/// | `log.connection.close` | true | false | Debug-friendly |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable metrics even in dev |
pub const CONSUMER_DEVTEST: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"),
    ("queued.min.messages", "1000"), // small pre-fetch queue for dev machines
    ("enable.auto.commit", "false"),
    ("reconnect.backoff.ms", "10"),
    ("reconnect.backoff.max.ms", "100"),
    ("log.connection.close", "true"), // surface broker disconnects while debugging
    ("statistics.interval.ms", "1000"),
];
239
/// Low-latency consumer — minimal fetch delay.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `partition.assignment.strategy` | `cooperative-sticky` | `range,roundrobin` | Consistent across envs |
/// | `fetch.wait.max.ms` | 10 ms | 500 ms | Return quickly |
/// | `queued.min.messages` | 1000 | 100000 | Smaller pre-fetch queue |
/// | `enable.auto.commit` | false | true | DFE manages commits |
/// | `reconnect.backoff.ms` | 10 ms | 100 ms | Fast reconnect |
/// | `reconnect.backoff.max.ms` | 100 ms | 10000 ms | Cap quickly |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const CONSUMER_LOW_LATENCY: &[(&str, &str)] = &[
    ("partition.assignment.strategy", "cooperative-sticky"),
    ("fetch.wait.max.ms", "10"), // return fetches fast rather than waiting to batch
    ("queued.min.messages", "1000"),
    ("enable.auto.commit", "false"),
    ("reconnect.backoff.ms", "10"),
    ("reconnect.backoff.max.ms", "100"),
    ("statistics.interval.ms", "1000"),
];
260
261// ============================================================================
262// Producer Profiles
263// ============================================================================
264
/// Production producer baseline — high throughput, zstd compression.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `linger.ms` | 100 ms | 5 ms | Accumulate larger batches |
/// | `compression.type` | zstd | none | Best ratio with good CPU |
/// | `socket.nagle.disable` | true | false | Kafka batches at app level |
/// | `statistics.interval.ms` | 1000 ms | 0 (disabled) | Enable Prometheus metrics |
pub const PRODUCER_PRODUCTION: &[(&str, &str)] = &[
    ("linger.ms", "100"), // wait up to 100 ms to fill larger batches
    ("compression.type", "zstd"),
    ("socket.nagle.disable", "true"), // no TCP coalescing; batching happens in Kafka
    ("statistics.interval.ms", "1000"),
];
279
/// Exactly-once producer — idempotence + ordering.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `enable.idempotence` | true | false | Exactly-once within partition |
/// | `acks` | all | all (-1) | Invariant for EOS (explicit) |
/// | `max.in.flight.requests.per.connection` | 5 | 1000000 | Max for idempotent producer |
/// | `linger.ms` | 20 ms | 5 ms | Moderate batching |
/// | `compression.type` | zstd | none | Best ratio |
/// | `socket.nagle.disable` | true | false | Kafka batches at app level |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const PRODUCER_EXACTLY_ONCE: &[(&str, &str)] = &[
    ("enable.idempotence", "true"),
    ("acks", "all"), // redundant with idempotence but stated explicitly
    ("max.in.flight.requests.per.connection", "5"), // ceiling for idempotent ordering
    ("linger.ms", "20"),
    ("compression.type", "zstd"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];
300
/// Low-latency producer — minimal delay, leader-ack only.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `acks` | 1 | all (-1) | Leader ack only for speed |
/// | `linger.ms` | 0 ms | 5 ms | Send immediately |
/// | `compression.type` | lz4 | none | LZ4 is fastest codec |
/// | `socket.nagle.disable` | true | false | No TCP coalescing |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics |
pub const PRODUCER_LOW_LATENCY: &[(&str, &str)] = &[
    ("acks", "1"), // leader-only ack: trades durability for latency
    ("linger.ms", "0"), // no batching delay
    ("compression.type", "lz4"),
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];
317
/// DevTest producer — fast acks, no compression.
///
/// | Setting | Value | librdkafka Default | Why |
/// |---|---|---|---|
/// | `acks` | 1 | all (-1) | Faster for dev |
/// | `socket.nagle.disable` | true | false | No TCP coalescing |
/// | `statistics.interval.ms` | 1000 ms | 0 | Enable metrics in dev |
pub const PRODUCER_DEVTEST: &[(&str, &str)] = &[
    ("acks", "1"), // leader-only ack is fine for throwaway dev data
    ("socket.nagle.disable", "true"),
    ("statistics.interval.ms", "1000"),
];
330
331// ============================================================================
332// DFE Source Convention
333// ============================================================================
334
/// Default topic suffix for landing zone (raw ingest), e.g. `syslog_land`.
pub const TOPIC_SUFFIX_LAND: &str = "_land";

/// Default topic suffix for load-ready data (post-transform), e.g. `syslog_load`.
pub const TOPIC_SUFFIX_LOAD: &str = "_load";
340
/// DFE service role — determines consumer group naming convention.
///
/// Used by [`DfeSource::consumer_group`] to choose between the
/// source-scoped and service-only patterns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceRole {
    /// Transform services (middleware): CG = `dfe-{service}-{source}`.
    ///
    /// Transforms sit between `_land` and `_load` topics. Each source gets
    /// its own consumer group so multiple transform pipelines don't compete.
    Transform,

    /// Universal consumers (loader, archiver): CG = `dfe-{service}`.
    ///
    /// Universal services consume from whatever topics are configured or
    /// auto-discovered. The source name is not part of the consumer group.
    Universal,
}
356
/// DFE source-aware topic naming for transform services.
///
/// All DFE data flows follow the same topology:
///
/// ```text
/// receiver -> {source}_land -> transform -> {source}_load -> loader -> ClickHouse
/// ```
///
/// `DfeSource` is for **transform services** (middleware) that sit between
/// `_land` and `_load`. It derives input/output topic names and source-scoped
/// consumer group IDs from a source name.
///
/// Terminal consumers (loader, archiver) do not use `DfeSource` — they
/// consume from whatever topics are configured or auto-discovered, and their
/// consumer group is simply `dfe-{service}` without a source component.
///
/// # Examples
///
/// ```
/// use hyperi_rustlib::kafka_config::{DfeSource, ServiceRole};
///
/// let source = DfeSource::new("syslog");
/// assert_eq!(source.input_topic(), "syslog_land");
/// assert_eq!(source.output_topic(), "syslog_load");
///
/// // Transform: CG includes source name
/// assert_eq!(
///     source.consumer_group("transform-vector", ServiceRole::Transform, None, None).unwrap(),
///     "dfe-transform-vector-syslog"
/// );
///
/// // Terminal: CG is just the service name
/// assert_eq!(
///     source.consumer_group("loader", ServiceRole::Universal, None, None).unwrap(),
///     "dfe-loader"
/// );
///
/// // Override always wins
/// assert_eq!(
///     source.consumer_group("transform-vector", ServiceRole::Transform, None, Some("custom")).unwrap(),
///     "custom"
/// );
///
/// // Transform without source is an error
/// let empty = DfeSource::new("");
/// assert!(empty.consumer_group("transform-vector", ServiceRole::Transform, None, None).is_err());
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DfeSource {
    /// Source name used as the topic prefix, e.g. `"syslog"`.
    name: String,
    /// Suffix for the landing-zone (input) topic; defaults to [`TOPIC_SUFFIX_LAND`].
    land_suffix: String,
    /// Suffix for the load-ready (output) topic; defaults to [`TOPIC_SUFFIX_LOAD`].
    load_suffix: String,
}
410
411impl DfeSource {
412    /// Create a new source with default suffixes (`_land`, `_load`).
413    #[must_use]
414    pub fn new(name: impl Into<String>) -> Self {
415        Self {
416            name: name.into(),
417            land_suffix: TOPIC_SUFFIX_LAND.to_string(),
418            load_suffix: TOPIC_SUFFIX_LOAD.to_string(),
419        }
420    }
421
422    /// Create a source with custom suffixes.
423    #[must_use]
424    pub fn with_suffixes(
425        name: impl Into<String>,
426        land_suffix: impl Into<String>,
427        load_suffix: impl Into<String>,
428    ) -> Self {
429        Self {
430            name: name.into(),
431            land_suffix: land_suffix.into(),
432            load_suffix: load_suffix.into(),
433        }
434    }
435
436    /// Source name (e.g. `"syslog"`, `"netflow"`).
437    #[must_use]
438    pub fn name(&self) -> &str {
439        &self.name
440    }
441
442    /// Landing zone topic: `{source}_land`.
443    #[must_use]
444    pub fn input_topic(&self) -> String {
445        format!("{}{}", self.name, self.land_suffix)
446    }
447
448    /// Load-ready topic: `{source}_load`.
449    #[must_use]
450    pub fn output_topic(&self) -> String {
451        format!("{}{}", self.name, self.load_suffix)
452    }
453
454    /// Consumer group ID following DFE naming conventions.
455    ///
456    /// The `cg_override` takes precedence when set — use it when the operator
457    /// explicitly configures a consumer group in YAML/env.
458    ///
459    /// When `cg_override` is `None`, the default pattern depends on the
460    /// service role:
461    ///
462    /// | Role | Pattern | Example |
463    /// |------|---------|---------|
464    /// | Transform | `dfe-{service}-{source}` | `dfe-transform-vector-syslog` |
465    /// | Universal (loader, archiver) | `dfe-{service}` | `dfe-loader` |
466    ///
467    /// For transforms, `pipeline` overrides the source component in the CG
468    /// (e.g. `syslog-enriched` instead of `syslog`). Either the `DfeSource`
469    /// name or `pipeline` must be non-empty — a bare service name is never
470    /// valid for transforms because multiple pipelines would compete.
471    ///
472    /// # Errors
473    ///
474    /// Returns `Err` if the role is `Transform` and neither the source name
475    /// nor `pipeline` provides a non-empty suffix.
476    pub fn consumer_group(
477        &self,
478        service: &str,
479        role: ServiceRole,
480        pipeline: Option<&str>,
481        cg_override: Option<&str>,
482    ) -> Result<String, KafkaConfigError> {
483        if let Some(cg) = cg_override {
484            return Ok(cg.to_string());
485        }
486
487        match role {
488            ServiceRole::Transform => {
489                let suffix = pipeline.unwrap_or(&self.name);
490                if suffix.is_empty() {
491                    return Err(KafkaConfigError::ParseError {
492                        path: String::new(),
493                        message: format!(
494                            "transform service '{service}' requires a source or pipeline \
495                             name for its consumer group — a bare 'dfe-{service}' CG would \
496                             cause multiple pipelines to compete for messages"
497                        ),
498                    });
499                }
500                Ok(format!("dfe-{service}-{suffix}"))
501            }
502            ServiceRole::Universal => Ok(format!("dfe-{service}")),
503        }
504    }
505
506    /// Derive the source name from a topic by stripping known suffixes.
507    ///
508    /// Returns `None` if the topic doesn't end with a known suffix.
509    ///
510    /// ```
511    /// use hyperi_rustlib::kafka_config::DfeSource;
512    ///
513    /// assert_eq!(DfeSource::source_from_topic("syslog_land"), Some("syslog"));
514    /// assert_eq!(DfeSource::source_from_topic("netflow_load"), Some("netflow"));
515    /// assert_eq!(DfeSource::source_from_topic("unknown"), None);
516    /// ```
517    #[must_use]
518    pub fn source_from_topic(topic: &str) -> Option<&str> {
519        topic
520            .strip_suffix(TOPIC_SUFFIX_LAND)
521            .or_else(|| topic.strip_suffix(TOPIC_SUFFIX_LOAD))
522    }
523}
524
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    // Pin the production consumer profile: exact entry count and values, so
    // accidental additions/changes to the baseline fail loudly.
    #[test]
    fn consumer_production_only_non_defaults() {
        assert_eq!(CONSUMER_PRODUCTION.len(), 6);
        let map: HashMap<&str, &str> = CONSUMER_PRODUCTION.iter().copied().collect();
        assert_eq!(map["partition.assignment.strategy"], "cooperative-sticky");
        assert_eq!(map["fetch.min.bytes"], "1048576");
        assert_eq!(map["fetch.wait.max.ms"], "100");
        assert_eq!(map["queued.min.messages"], "20000");
        assert_eq!(map["enable.auto.commit"], "false");
        assert_eq!(map["statistics.interval.ms"], "1000");
    }

    // Pin the production producer profile the same way.
    #[test]
    fn producer_production_only_non_defaults() {
        assert_eq!(PRODUCER_PRODUCTION.len(), 4);
        let map: HashMap<&str, &str> = PRODUCER_PRODUCTION.iter().copied().collect();
        assert_eq!(map["linger.ms"], "100");
        assert_eq!(map["compression.type"], "zstd");
        assert_eq!(map["socket.nagle.disable"], "true");
        assert_eq!(map["statistics.interval.ms"], "1000");
    }

    // Overrides replace profile values and may add new keys.
    #[test]
    fn merge_user_overrides_win() {
        let mut overrides = HashMap::new();
        overrides.insert("fetch.min.bytes".to_string(), "2097152".to_string());
        overrides.insert("custom.setting".to_string(), "value".to_string());

        let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);

        assert_eq!(merged["fetch.min.bytes"], "2097152");
        assert_eq!(merged["custom.setting"], "value");
        assert_eq!(
            merged["partition.assignment.strategy"],
            "cooperative-sticky"
        );
    }

    #[test]
    fn merge_empty_overrides_returns_profile() {
        let overrides = HashMap::new();
        let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
        assert_eq!(merged.len(), CONSUMER_PRODUCTION.len());
    }

    #[test]
    fn merge_empty_profile_returns_overrides() {
        let mut overrides = HashMap::new();
        overrides.insert("key".to_string(), "value".to_string());
        let merged = merge_with_overrides(&[], &overrides);
        assert_eq!(merged.len(), 1);
        assert_eq!(merged["key"], "value");
    }

    // Both Java comment styles (# and !) are skipped; k=v pairs parsed.
    #[test]
    fn properties_str_basic() {
        let content = "\
# This is a comment
bootstrap.servers=kafka1:9092,kafka2:9092
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
! Another comment style
";
        let config = config_from_properties_str(content);
        assert_eq!(config.len(), 3);
        assert_eq!(config["bootstrap.servers"], "kafka1:9092,kafka2:9092");
        assert_eq!(config["security.protocol"], "SASL_SSL");
        assert_eq!(config["sasl.mechanism"], "SCRAM-SHA-512");
    }

    // Split happens on the FIRST '=' only — base64-ish values keep trailing '='.
    #[test]
    fn properties_str_value_with_equals() {
        let content = "ssl.certificate.pem=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI==\n";
        let config = config_from_properties_str(content);
        assert_eq!(
            config["ssl.certificate.pem"],
            "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI=="
        );
    }

    #[test]
    fn properties_str_empty_and_whitespace() {
        let config = config_from_properties_str("   \n# comment\n\n");
        assert!(config.is_empty());
    }

    // Round-trip a real temp file through the extension-dispatching loader.
    #[test]
    fn config_from_file_properties() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("kafka.properties");
        std::fs::write(
            &path,
            "bootstrap.servers=kafka:9092\ncompression.type=zstd\n",
        )
        .unwrap();

        let config = config_from_file(&path).unwrap();
        assert_eq!(config["bootstrap.servers"], "kafka:9092");
        assert_eq!(config["compression.type"], "zstd");
    }

    // A missing file maps to FileNotFound, not a generic Io error.
    #[test]
    fn config_from_file_not_found() {
        let result = config_from_file("/nonexistent/kafka.properties");
        assert!(matches!(result, Err(KafkaConfigError::FileNotFound { .. })));
    }

    // ===================================================================
    // DfeSource tests
    // ===================================================================

    #[test]
    fn dfe_source_default_topics() {
        let source = DfeSource::new("syslog");
        assert_eq!(source.name(), "syslog");
        assert_eq!(source.input_topic(), "syslog_land");
        assert_eq!(source.output_topic(), "syslog_load");
    }

    #[test]
    fn dfe_source_custom_suffixes() {
        let source = DfeSource::with_suffixes("auth", "_raw", "_enriched");
        assert_eq!(source.input_topic(), "auth_raw");
        assert_eq!(source.output_topic(), "auth_enriched");
    }

    #[test]
    fn dfe_source_cg_transform_default() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group("transform-vector", ServiceRole::Transform, None, None)
                .unwrap(),
            "dfe-transform-vector-syslog"
        );
    }

    // An explicit pipeline replaces the source component in the CG.
    #[test]
    fn dfe_source_cg_transform_with_pipeline() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    Some("syslog-enriched"),
                    None
                )
                .unwrap(),
            "dfe-transform-vector-syslog-enriched"
        );
    }

    // Transform with no source and no pipeline must be rejected.
    #[test]
    fn dfe_source_cg_transform_empty_source_errors() {
        let source = DfeSource::new("");
        assert!(
            source
                .consumer_group("transform-vector", ServiceRole::Transform, None, None)
                .is_err()
        );
    }

    // A non-empty pipeline rescues an empty source name.
    #[test]
    fn dfe_source_cg_transform_empty_source_pipeline_rescues() {
        let source = DfeSource::new("");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    Some("syslog"),
                    None
                )
                .unwrap(),
            "dfe-transform-vector-syslog"
        );
    }

    #[test]
    fn dfe_source_cg_universal() {
        let source = DfeSource::new("netflow");
        assert_eq!(
            source
                .consumer_group("loader", ServiceRole::Universal, None, None)
                .unwrap(),
            "dfe-loader"
        );
    }

    // Universal CGs never include the pipeline, even when one is supplied.
    #[test]
    fn dfe_source_cg_universal_ignores_pipeline() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group("archiver", ServiceRole::Universal, Some("ignored"), None)
                .unwrap(),
            "dfe-archiver"
        );
    }

    #[test]
    fn dfe_source_cg_override_wins() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "transform-vector",
                    ServiceRole::Transform,
                    None,
                    Some("my-custom-cg")
                )
                .unwrap(),
            "my-custom-cg"
        );
    }

    #[test]
    fn dfe_source_cg_override_wins_universal() {
        let source = DfeSource::new("syslog");
        assert_eq!(
            source
                .consumer_group(
                    "loader",
                    ServiceRole::Universal,
                    None,
                    Some("custom-loader-cg")
                )
                .unwrap(),
            "custom-loader-cg"
        );
    }

    #[test]
    fn dfe_source_from_topic_land() {
        assert_eq!(DfeSource::source_from_topic("syslog_land"), Some("syslog"));
        assert_eq!(DfeSource::source_from_topic("auth_land"), Some("auth"));
    }

    #[test]
    fn dfe_source_from_topic_load() {
        assert_eq!(DfeSource::source_from_topic("syslog_load"), Some("syslog"));
        assert_eq!(
            DfeSource::source_from_topic("netflow_load"),
            Some("netflow")
        );
    }

    #[test]
    fn dfe_source_from_topic_unknown() {
        assert_eq!(DfeSource::source_from_topic("unknown"), None);
        assert_eq!(DfeSource::source_from_topic("events"), None);
        assert_eq!(DfeSource::source_from_topic(""), None);
    }

    // Documents current behavior: a bare "_land" yields an empty source name.
    #[test]
    fn dfe_source_from_topic_edge_cases() {
        assert_eq!(DfeSource::source_from_topic("_land"), Some(""));
        assert_eq!(DfeSource::source_from_topic("a_load"), Some("a"));
    }

    // Unknown extensions (e.g. .toml) are rejected before any parsing.
    #[test]
    fn config_from_file_unsupported_extension() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("kafka.toml");
        std::fs::write(&path, "key = value\n").unwrap();

        let result = config_from_file(&path);
        assert!(matches!(
            result,
            Err(KafkaConfigError::UnsupportedFormat { .. })
        ));
    }
}