1use clap::{Parser, ValueEnum};
19use serde::{Deserialize, Serialize};
20use strum_macros::{Display, EnumString};
21
// Default values for the fields of `Config`.

/// 1024 * 1024; one mebibyte when the quantity being sized is a byte count.
const MIB: i32 = 1024 * 1024;

/// Default for `Config::bootstrap_servers`.
const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
/// Default for `Config::writer_request_max_size` (10 * 1024 * 1024).
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * MIB;
/// Default for `Config::writer_batch_size` (2 * 1024 * 1024).
const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * MIB;
/// Default for `Config::writer_retries`: the largest `i32`.
const DEFAULT_RETRIES: i32 = i32::MAX;
/// Default for `Config::scanner_remote_log_prefetch_num`.
const DEFAULT_PREFETCH_NUM: usize = 4;
/// Default for `Config::remote_file_download_thread_num`.
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
/// Default for `Config::scanner_remote_log_read_concurrency`.
const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
/// Default for `Config::scanner_log_max_poll_records`.
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
/// Default for `Config::scanner_log_fetch_max_bytes` (16 * 1024 * 1024).
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * MIB;
/// Default for `Config::scanner_log_fetch_min_bytes`.
const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1;
/// Default for `Config::scanner_log_fetch_wait_max_time_ms`.
const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500;
/// Default for `Config::writer_batch_timeout_ms`.
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
/// Default for `Config::scanner_log_fetch_max_bytes_for_bucket` (1024 * 1024).
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = MIB;
/// Default for `Config::writer_max_inflight_requests_per_bucket`.
const DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET: usize = 5;
/// Default for `Config::writer_buffer_memory_size` (64 * 1024 * 1024).
const DEFAULT_WRITER_BUFFER_MEMORY_SIZE: usize = 64 * (MIB as usize);
/// Default for `Config::writer_buffer_wait_timeout_ms`: the largest `u64`.
const DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS: u64 = u64::MAX;
38
/// Largest `writer_max_inflight_requests_per_bucket` that `Config::validate_idempotence`
/// accepts while idempotent writes are enabled.
const MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE: usize = 5;

/// Default for `Config::writer_acks`.
const DEFAULT_ACKS: &str = "all";

/// Default for `Config::connect_timeout_ms`: two minutes expressed in milliseconds.
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 2 * 60 * 1_000;

/// Default for `Config::security_protocol`; `Config::is_sasl_enabled` is `false` for this value.
const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";

/// Default for `Config::security_sasl_mechanism`; the only mechanism `Config::validate_security`
/// accepts.
const DEFAULT_SASL_MECHANISM: &str = "PLAIN";
44
45#[derive(
48 Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize, EnumString, Display,
49)]
50#[serde(rename_all = "snake_case")]
51#[strum(ascii_case_insensitive)]
52pub enum NoKeyAssigner {
53 #[strum(serialize = "sticky")]
55 Sticky,
56 #[strum(serialize = "round_robin")]
58 RoundRobin,
59}
60
61#[derive(Parser, Clone, Deserialize, Serialize)]
62#[command(author, version, about, long_about = None)]
63pub struct Config {
64 #[arg(long, default_value_t = String::from(DEFAULT_BOOTSTRAP_SERVER))]
65 pub bootstrap_servers: String,
66
67 #[arg(long, default_value_t = DEFAULT_REQUEST_MAX_SIZE)]
68 pub writer_request_max_size: i32,
69
70 #[arg(long, default_value_t = String::from(DEFAULT_ACKS))]
71 pub writer_acks: String,
72
73 #[arg(long, default_value_t = DEFAULT_RETRIES)]
74 pub writer_retries: i32,
75
76 #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
77 pub writer_batch_size: i32,
78
79 #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
80 pub writer_bucket_no_key_assigner: NoKeyAssigner,
81
82 #[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
85 pub scanner_remote_log_prefetch_num: usize,
86
87 #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
90 pub remote_file_download_thread_num: usize,
91
92 #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
95 pub scanner_remote_log_read_concurrency: usize,
96
97 #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
100 pub scanner_log_max_poll_records: usize,
101
102 #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
105 pub scanner_log_fetch_max_bytes: i32,
106
107 #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)]
110 pub scanner_log_fetch_min_bytes: i32,
111
112 #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)]
115 pub scanner_log_fetch_wait_max_time_ms: i32,
116
117 #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
120 pub writer_batch_timeout_ms: i64,
121
122 #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)]
125 pub scanner_log_fetch_max_bytes_for_bucket: i32,
126
127 #[arg(long, default_value_t = true)]
132 pub writer_enable_idempotence: bool,
133
134 #[arg(long, default_value_t = DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET)]
137 pub writer_max_inflight_requests_per_bucket: usize,
138
139 #[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_MEMORY_SIZE)]
144 pub writer_buffer_memory_size: usize,
145
146 #[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS)]
149 pub writer_buffer_wait_timeout_ms: u64,
150
151 #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
154 pub connect_timeout_ms: u64,
155
156 #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
157 pub security_protocol: String,
158
159 #[arg(long, default_value_t = String::from(DEFAULT_SASL_MECHANISM))]
160 pub security_sasl_mechanism: String,
161
162 #[arg(long, default_value_t = String::new())]
163 pub security_sasl_username: String,
164
165 #[arg(long, default_value_t = String::new())]
166 #[serde(skip_serializing)]
167 pub security_sasl_password: String,
168}
169
170impl std::fmt::Debug for Config {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("Config")
173 .field("bootstrap_servers", &self.bootstrap_servers)
174 .field("writer_request_max_size", &self.writer_request_max_size)
175 .field("writer_acks", &self.writer_acks)
176 .field("writer_retries", &self.writer_retries)
177 .field("writer_batch_size", &self.writer_batch_size)
178 .field(
179 "writer_bucket_no_key_assigner",
180 &self.writer_bucket_no_key_assigner,
181 )
182 .field(
183 "scanner_remote_log_prefetch_num",
184 &self.scanner_remote_log_prefetch_num,
185 )
186 .field(
187 "remote_file_download_thread_num",
188 &self.remote_file_download_thread_num,
189 )
190 .field(
191 "scanner_log_max_poll_records",
192 &self.scanner_log_max_poll_records,
193 )
194 .field(
195 "scanner_log_fetch_max_bytes",
196 &self.scanner_log_fetch_max_bytes,
197 )
198 .field(
199 "scanner_log_fetch_min_bytes",
200 &self.scanner_log_fetch_min_bytes,
201 )
202 .field(
203 "scanner_log_fetch_max_bytes_for_bucket",
204 &self.scanner_log_fetch_max_bytes_for_bucket,
205 )
206 .field(
207 "scanner_log_fetch_wait_max_time_ms",
208 &self.scanner_log_fetch_wait_max_time_ms,
209 )
210 .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
211 .field("writer_enable_idempotence", &self.writer_enable_idempotence)
212 .field(
213 "writer_max_inflight_requests_per_bucket",
214 &self.writer_max_inflight_requests_per_bucket,
215 )
216 .field("writer_buffer_memory_size", &self.writer_buffer_memory_size)
217 .field(
218 "writer_buffer_wait_timeout_ms",
219 &self.writer_buffer_wait_timeout_ms,
220 )
221 .field("connect_timeout_ms", &self.connect_timeout_ms)
222 .field("security_protocol", &self.security_protocol)
223 .field("security_sasl_mechanism", &self.security_sasl_mechanism)
224 .field("security_sasl_username", &self.security_sasl_username)
225 .field("security_sasl_password", &"[REDACTED]")
226 .finish()
227 }
228}
229
230impl Default for Config {
231 fn default() -> Self {
232 Self {
233 bootstrap_servers: String::from(DEFAULT_BOOTSTRAP_SERVER),
234 writer_request_max_size: DEFAULT_REQUEST_MAX_SIZE,
235 writer_acks: String::from(DEFAULT_ACKS),
236 writer_retries: i32::MAX,
237 writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
238 writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
239 scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
240 remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
241 scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
242 scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
243 scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
244 scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES,
245 scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS,
246 scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
247 writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
248 writer_enable_idempotence: true,
249 writer_max_inflight_requests_per_bucket:
250 DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET,
251 writer_buffer_memory_size: DEFAULT_WRITER_BUFFER_MEMORY_SIZE,
252 writer_buffer_wait_timeout_ms: DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS,
253 connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
254 security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
255 security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
256 security_sasl_username: String::new(),
257 security_sasl_password: String::new(),
258 }
259 }
260}
261
262impl Config {
263 pub fn is_sasl_enabled(&self) -> bool {
267 self.security_protocol.eq_ignore_ascii_case("sasl")
268 }
269
270 pub fn validate_idempotence(&self) -> Result<(), String> {
274 if !self.writer_enable_idempotence {
275 return Ok(());
276 }
277 let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
278 if !acks_is_all {
279 return Err(format!(
280 "Idempotent writes require acks='all' (-1), but got acks='{}'",
281 self.writer_acks
282 ));
283 }
284 if self.writer_retries <= 0 {
285 return Err(format!(
286 "Idempotent writes require retries > 0, but got retries={}",
287 self.writer_retries
288 ));
289 }
290 if self.writer_max_inflight_requests_per_bucket
291 > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
292 {
293 return Err(format!(
294 "Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
295 MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
296 self.writer_max_inflight_requests_per_bucket
297 ));
298 }
299 Ok(())
300 }
301
302 pub fn validate_security(&self) -> Result<(), String> {
306 if !self.is_sasl_enabled() {
307 return Ok(());
308 }
309 if !self.security_sasl_mechanism.eq_ignore_ascii_case("PLAIN") {
310 return Err(format!(
311 "Unsupported SASL mechanism: '{}'. Only 'PLAIN' is supported.",
312 self.security_sasl_mechanism
313 ));
314 }
315 if self.security_sasl_username.is_empty() {
316 return Err(
317 "security_sasl_username must be set when security_protocol is 'sasl'".to_string(),
318 );
319 }
320 if self.security_sasl_password.is_empty() {
321 return Err(
322 "security_sasl_password must be set when security_protocol is 'sasl'".to_string(),
323 );
324 }
325 Ok(())
326 }
327 pub fn validate_scanner_fetch(&self) -> Result<(), String> {
328 if self.scanner_log_fetch_min_bytes <= 0 {
329 return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
330 }
331 if self.scanner_log_fetch_max_bytes <= 0 {
332 return Err("scanner_log_fetch_max_bytes must be > 0".to_string());
333 }
334 if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes {
335 return Err(
336 "scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(),
337 );
338 }
339 if self.scanner_log_fetch_wait_max_time_ms < 0 {
340 return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string());
341 }
342 if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
343 return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string());
344 }
345 if self.scanner_log_fetch_max_bytes_for_bucket > self.scanner_log_fetch_max_bytes {
346 return Err(
347 "scanner_log_fetch_max_bytes_for_bucket must be <= scanner_log_fetch_max_bytes"
348 .to_string(),
349 );
350 }
351 Ok(())
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    // ---- validate_security / is_sasl_enabled ----

    /// The default protocol is not SASL, so validation passes without credentials.
    #[test]
    fn test_default_is_not_sasl() {
        let defaults = Config::default();
        assert!(!defaults.is_sasl_enabled());
        assert_eq!(defaults.validate_security(), Ok(()));
    }

    /// `sasl` + `PLAIN` + both credentials is a valid combination.
    #[test]
    fn test_sasl_enabled_valid() {
        let cfg = Config {
            security_protocol: String::from("sasl"),
            security_sasl_mechanism: String::from("PLAIN"),
            security_sasl_username: String::from("admin"),
            security_sasl_password: String::from("secret"),
            ..Default::default()
        };
        assert!(cfg.is_sasl_enabled());
        assert_eq!(cfg.validate_security(), Ok(()));
    }

    /// The protocol name is matched without regard to ASCII case.
    #[test]
    fn test_sasl_enabled_case_insensitive() {
        let cfg = Config {
            security_protocol: String::from("SASL"),
            security_sasl_username: String::from("admin"),
            security_sasl_password: String::from("secret"),
            ..Default::default()
        };
        assert!(cfg.is_sasl_enabled());
        assert_eq!(cfg.validate_security(), Ok(()));
    }

    /// SASL with the default (empty) username is rejected.
    #[test]
    fn test_sasl_missing_username() {
        let cfg = Config {
            security_protocol: String::from("sasl"),
            security_sasl_password: String::from("secret"),
            ..Default::default()
        };
        let outcome = cfg.validate_security();
        assert!(outcome.is_err());
    }

    /// SASL with the default (empty) password is rejected.
    #[test]
    fn test_sasl_missing_password() {
        let cfg = Config {
            security_protocol: String::from("sasl"),
            security_sasl_username: String::from("admin"),
            ..Default::default()
        };
        let outcome = cfg.validate_security();
        assert!(outcome.is_err());
    }

    /// Any mechanism other than `PLAIN` is rejected.
    #[test]
    fn test_sasl_unsupported_mechanism() {
        let cfg = Config {
            security_protocol: String::from("sasl"),
            security_sasl_mechanism: String::from("SCRAM-SHA-256"),
            security_sasl_username: String::from("admin"),
            security_sasl_password: String::from("secret"),
            ..Default::default()
        };
        let outcome = cfg.validate_security();
        assert!(outcome.is_err());
    }

    // ---- validate_scanner_fetch ----

    /// The default fetch limits are valid and have the expected values.
    #[test]
    fn test_scanner_fetch_defaults_valid() {
        let defaults = Config::default();
        assert_eq!(defaults.validate_scanner_fetch(), Ok(()));
        assert_eq!(defaults.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
        assert_eq!(defaults.scanner_log_fetch_min_bytes, 1);
        assert_eq!(defaults.scanner_log_fetch_wait_max_time_ms, 500);
    }

    /// A maximum below the minimum is rejected.
    #[test]
    fn test_scanner_fetch_invalid_ranges() {
        let cfg = Config {
            scanner_log_fetch_min_bytes: 2,
            scanner_log_fetch_max_bytes: 1,
            ..Default::default()
        };
        let outcome = cfg.validate_scanner_fetch();
        assert!(outcome.is_err());
    }

    /// A negative wait time is rejected.
    #[test]
    fn test_scanner_fetch_negative_wait() {
        let cfg = Config {
            scanner_log_fetch_wait_max_time_ms: -1,
            ..Default::default()
        };
        let outcome = cfg.validate_scanner_fetch();
        assert!(outcome.is_err());
    }

    // ---- validate_idempotence ----

    /// The defaults satisfy every idempotence rule.
    #[test]
    fn test_idempotence_default_is_valid() {
        let defaults = Config::default();
        assert_eq!(defaults.validate_idempotence(), Ok(()));
    }

    /// With idempotence off, otherwise-invalid writer settings are not checked.
    #[test]
    fn test_idempotence_disabled_skips_validation() {
        let cfg = Config {
            writer_enable_idempotence: false,
            writer_acks: String::from("0"),
            writer_retries: 0,
            writer_max_inflight_requests_per_bucket: 100,
            ..Default::default()
        };
        assert_eq!(cfg.validate_idempotence(), Ok(()));
    }

    /// `acks` other than `all` / `-1` is rejected.
    #[test]
    fn test_idempotence_requires_acks_all() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_acks: String::from("1"),
            ..Default::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }

    /// Zero retries is rejected.
    #[test]
    fn test_idempotence_requires_retries() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_retries: 0,
            ..Default::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }

    /// More in-flight requests per bucket than the idempotence limit is rejected.
    #[test]
    fn test_idempotence_requires_bounded_inflight() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_max_inflight_requests_per_bucket: 10,
            ..Default::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }
}