use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString};
const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024;
const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1;
const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = 1024 * 1024;
const DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET: usize = 5;
const DEFAULT_WRITER_BUFFER_MEMORY_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS: u64 = u64::MAX;
const MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE: usize = 5;
const DEFAULT_ACKS: &str = "all";
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
const DEFAULT_SASL_MECHANISM: &str = "PLAIN";
#[derive(
Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize, EnumString, Display,
)]
#[serde(rename_all = "snake_case")]
#[strum(ascii_case_insensitive)]
pub enum NoKeyAssigner {
#[strum(serialize = "sticky")]
Sticky,
#[strum(serialize = "round_robin")]
RoundRobin,
}
#[derive(Parser, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {
#[arg(long, default_value_t = String::from(DEFAULT_BOOTSTRAP_SERVER))]
pub bootstrap_servers: String,
#[arg(long, default_value_t = DEFAULT_REQUEST_MAX_SIZE)]
pub writer_request_max_size: i32,
#[arg(long, default_value_t = String::from(DEFAULT_ACKS))]
pub writer_acks: String,
#[arg(long, default_value_t = DEFAULT_RETRIES)]
pub writer_retries: i32,
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
pub writer_batch_size: i32,
#[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
pub writer_bucket_no_key_assigner: NoKeyAssigner,
#[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
pub scanner_remote_log_prefetch_num: usize,
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
pub remote_file_download_thread_num: usize,
#[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
pub scanner_remote_log_read_concurrency: usize,
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
pub scanner_log_max_poll_records: usize,
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
pub scanner_log_fetch_max_bytes: i32,
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)]
pub scanner_log_fetch_min_bytes: i32,
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)]
pub scanner_log_fetch_wait_max_time_ms: i32,
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
pub writer_batch_timeout_ms: i64,
#[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)]
pub scanner_log_fetch_max_bytes_for_bucket: i32,
#[arg(long, default_value_t = true)]
pub writer_enable_idempotence: bool,
#[arg(long, default_value_t = DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET)]
pub writer_max_inflight_requests_per_bucket: usize,
#[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_MEMORY_SIZE)]
pub writer_buffer_memory_size: usize,
#[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS)]
pub writer_buffer_wait_timeout_ms: u64,
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
pub connect_timeout_ms: u64,
#[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
pub security_protocol: String,
#[arg(long, default_value_t = String::from(DEFAULT_SASL_MECHANISM))]
pub security_sasl_mechanism: String,
#[arg(long, default_value_t = String::new())]
pub security_sasl_username: String,
#[arg(long, default_value_t = String::new())]
#[serde(skip_serializing)]
pub security_sasl_password: String,
}
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config")
.field("bootstrap_servers", &self.bootstrap_servers)
.field("writer_request_max_size", &self.writer_request_max_size)
.field("writer_acks", &self.writer_acks)
.field("writer_retries", &self.writer_retries)
.field("writer_batch_size", &self.writer_batch_size)
.field(
"writer_bucket_no_key_assigner",
&self.writer_bucket_no_key_assigner,
)
.field(
"scanner_remote_log_prefetch_num",
&self.scanner_remote_log_prefetch_num,
)
.field(
"remote_file_download_thread_num",
&self.remote_file_download_thread_num,
)
.field(
"scanner_log_max_poll_records",
&self.scanner_log_max_poll_records,
)
.field(
"scanner_log_fetch_max_bytes",
&self.scanner_log_fetch_max_bytes,
)
.field(
"scanner_log_fetch_min_bytes",
&self.scanner_log_fetch_min_bytes,
)
.field(
"scanner_log_fetch_max_bytes_for_bucket",
&self.scanner_log_fetch_max_bytes_for_bucket,
)
.field(
"scanner_log_fetch_wait_max_time_ms",
&self.scanner_log_fetch_wait_max_time_ms,
)
.field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
.field("writer_enable_idempotence", &self.writer_enable_idempotence)
.field(
"writer_max_inflight_requests_per_bucket",
&self.writer_max_inflight_requests_per_bucket,
)
.field("writer_buffer_memory_size", &self.writer_buffer_memory_size)
.field(
"writer_buffer_wait_timeout_ms",
&self.writer_buffer_wait_timeout_ms,
)
.field("connect_timeout_ms", &self.connect_timeout_ms)
.field("security_protocol", &self.security_protocol)
.field("security_sasl_mechanism", &self.security_sasl_mechanism)
.field("security_sasl_username", &self.security_sasl_username)
.field("security_sasl_password", &"[REDACTED]")
.finish()
}
}
impl Default for Config {
fn default() -> Self {
Self {
bootstrap_servers: String::from(DEFAULT_BOOTSTRAP_SERVER),
writer_request_max_size: DEFAULT_REQUEST_MAX_SIZE,
writer_acks: String::from(DEFAULT_ACKS),
writer_retries: i32::MAX,
writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES,
scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS,
scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
writer_enable_idempotence: true,
writer_max_inflight_requests_per_bucket:
DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET,
writer_buffer_memory_size: DEFAULT_WRITER_BUFFER_MEMORY_SIZE,
writer_buffer_wait_timeout_ms: DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS,
connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
security_sasl_username: String::new(),
security_sasl_password: String::new(),
}
}
}
impl Config {
pub fn is_sasl_enabled(&self) -> bool {
self.security_protocol.eq_ignore_ascii_case("sasl")
}
pub fn validate_idempotence(&self) -> Result<(), String> {
if !self.writer_enable_idempotence {
return Ok(());
}
let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
if !acks_is_all {
return Err(format!(
"Idempotent writes require acks='all' (-1), but got acks='{}'",
self.writer_acks
));
}
if self.writer_retries <= 0 {
return Err(format!(
"Idempotent writes require retries > 0, but got retries={}",
self.writer_retries
));
}
if self.writer_max_inflight_requests_per_bucket
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
{
return Err(format!(
"Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
self.writer_max_inflight_requests_per_bucket
));
}
Ok(())
}
pub fn validate_security(&self) -> Result<(), String> {
if !self.is_sasl_enabled() {
return Ok(());
}
if !self.security_sasl_mechanism.eq_ignore_ascii_case("PLAIN") {
return Err(format!(
"Unsupported SASL mechanism: '{}'. Only 'PLAIN' is supported.",
self.security_sasl_mechanism
));
}
if self.security_sasl_username.is_empty() {
return Err(
"security_sasl_username must be set when security_protocol is 'sasl'".to_string(),
);
}
if self.security_sasl_password.is_empty() {
return Err(
"security_sasl_password must be set when security_protocol is 'sasl'".to_string(),
);
}
Ok(())
}
pub fn validate_scanner_fetch(&self) -> Result<(), String> {
if self.scanner_log_fetch_min_bytes <= 0 {
return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
}
if self.scanner_log_fetch_max_bytes <= 0 {
return Err("scanner_log_fetch_max_bytes must be > 0".to_string());
}
if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes {
return Err(
"scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(),
);
}
if self.scanner_log_fetch_wait_max_time_ms < 0 {
return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string());
}
if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string());
}
if self.scanner_log_fetch_max_bytes_for_bucket > self.scanner_log_fetch_max_bytes {
return Err(
"scanner_log_fetch_max_bytes_for_bucket must be <= scanner_log_fetch_max_bytes"
.to_string(),
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_is_not_sasl() {
let config = Config::default();
assert!(!config.is_sasl_enabled());
assert!(config.validate_security().is_ok());
}
#[test]
fn test_sasl_enabled_valid() {
let config = Config {
security_protocol: "sasl".to_string(),
security_sasl_mechanism: "PLAIN".to_string(),
security_sasl_username: "admin".to_string(),
security_sasl_password: "secret".to_string(),
..Config::default()
};
assert!(config.is_sasl_enabled());
assert!(config.validate_security().is_ok());
}
#[test]
fn test_sasl_enabled_case_insensitive() {
let config = Config {
security_protocol: "SASL".to_string(),
security_sasl_username: "admin".to_string(),
security_sasl_password: "secret".to_string(),
..Config::default()
};
assert!(config.is_sasl_enabled());
assert!(config.validate_security().is_ok());
}
#[test]
fn test_sasl_missing_username() {
let config = Config {
security_protocol: "sasl".to_string(),
security_sasl_password: "secret".to_string(),
..Config::default()
};
assert!(config.validate_security().is_err());
}
#[test]
fn test_sasl_missing_password() {
let config = Config {
security_protocol: "sasl".to_string(),
security_sasl_username: "admin".to_string(),
..Config::default()
};
assert!(config.validate_security().is_err());
}
#[test]
fn test_sasl_unsupported_mechanism() {
let config = Config {
security_protocol: "sasl".to_string(),
security_sasl_mechanism: "SCRAM-SHA-256".to_string(),
security_sasl_username: "admin".to_string(),
security_sasl_password: "secret".to_string(),
..Config::default()
};
assert!(config.validate_security().is_err());
}
#[test]
fn test_scanner_fetch_defaults_valid() {
let config = Config::default();
assert!(config.validate_scanner_fetch().is_ok());
assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
assert_eq!(config.scanner_log_fetch_min_bytes, 1);
assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
}
#[test]
fn test_scanner_fetch_invalid_ranges() {
let config = Config {
scanner_log_fetch_min_bytes: 2,
scanner_log_fetch_max_bytes: 1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
}
#[test]
fn test_scanner_fetch_negative_wait() {
let config = Config {
scanner_log_fetch_wait_max_time_ms: -1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
}
#[test]
fn test_idempotence_default_is_valid() {
let config = Config::default();
assert!(config.validate_idempotence().is_ok());
}
#[test]
fn test_idempotence_disabled_skips_validation() {
let config = Config {
writer_enable_idempotence: false,
writer_acks: "0".to_string(),
writer_retries: 0,
writer_max_inflight_requests_per_bucket: 100,
..Config::default()
};
assert!(config.validate_idempotence().is_ok());
}
#[test]
fn test_idempotence_requires_acks_all() {
let config = Config {
writer_enable_idempotence: true,
writer_acks: "1".to_string(),
..Config::default()
};
assert!(config.validate_idempotence().is_err());
}
#[test]
fn test_idempotence_requires_retries() {
let config = Config {
writer_enable_idempotence: true,
writer_retries: 0,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
}
#[test]
fn test_idempotence_requires_bounded_inflight() {
let config = Config {
writer_enable_idempotence: true,
writer_max_inflight_requests_per_bucket: 10,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
}
}