Skip to main content

fluss/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use clap::{Parser, ValueEnum};
19use serde::{Deserialize, Serialize};
20use strum_macros::{Display, EnumString};
21
// One mebibyte, in the two integer types used by the size defaults below.
const MIB_I32: i32 = 1 << 20;
const MIB_USIZE: usize = 1 << 20;

// ---- Connection and security defaults ----
const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 120_000;
const DEFAULT_SECURITY_PROTOCOL: &str = "PLAINTEXT";
const DEFAULT_SASL_MECHANISM: &str = "PLAIN";

// ---- Writer defaults ----
const DEFAULT_ACKS: &str = "all";
const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * MIB_I32;
const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * MIB_I32;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
const DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET: usize = 5;
const DEFAULT_WRITER_BUFFER_MEMORY_SIZE: usize = 64 * MIB_USIZE; // 64MB, matching Java
const DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS: u64 = u64::MAX;
// Largest in-flight-requests-per-bucket value accepted when idempotent
// writes are enabled (checked by `Config::validate_idempotence`).
const MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE: usize = 5;

// ---- Scanner / remote log defaults ----
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES: i32 = 1;
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES: i32 = 16 * MIB_I32;
const DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET: i32 = MIB_I32;
const DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS: i32 = 500;
44
/// Bucket assigner strategy for tables without bucket keys.
/// Matches Java `client.writer.bucket.no-key-assigner`.
///
/// The same enum is exposed through three derives: clap (`ValueEnum`, for the
/// command line), serde (`Serialize`/`Deserialize`) and strum
/// (`EnumString`/`Display`, for string conversion).
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize, EnumString, Display,
)]
// serde names the variants in snake_case (`sticky`, `round_robin`).
#[serde(rename_all = "snake_case")]
// strum string parsing ignores ASCII case.
#[strum(ascii_case_insensitive)]
pub enum NoKeyAssigner {
    /// Sticks to one bucket until the batch is full, then switches.
    // Explicit strum name so it matches the serde spelling.
    #[strum(serialize = "sticky")]
    Sticky,
    /// Assigns each record to the next bucket in a rotating sequence.
    // Explicit strum name so it matches the serde spelling.
    #[strum(serialize = "round_robin")]
    RoundRobin,
}
60
61#[derive(Parser, Clone, Deserialize, Serialize)]
62#[command(author, version, about, long_about = None)]
63pub struct Config {
64    #[arg(long, default_value_t = String::from(DEFAULT_BOOTSTRAP_SERVER))]
65    pub bootstrap_servers: String,
66
67    #[arg(long, default_value_t = DEFAULT_REQUEST_MAX_SIZE)]
68    pub writer_request_max_size: i32,
69
70    #[arg(long, default_value_t = String::from(DEFAULT_ACKS))]
71    pub writer_acks: String,
72
73    #[arg(long, default_value_t = DEFAULT_RETRIES)]
74    pub writer_retries: i32,
75
76    #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
77    pub writer_batch_size: i32,
78
79    #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
80    pub writer_bucket_no_key_assigner: NoKeyAssigner,
81
82    /// Maximum number of remote log segments to prefetch
83    /// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
84    #[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
85    pub scanner_remote_log_prefetch_num: usize,
86
87    /// Maximum concurrent remote log downloads
88    /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
89    #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
90    pub remote_file_download_thread_num: usize,
91
92    /// Intra-file remote log read concurrency for each remote segment download.
93    /// Download path always uses streaming reader.
94    #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
95    pub scanner_remote_log_read_concurrency: usize,
96
97    /// Maximum number of records returned in a single call to poll() for LogScanner.
98    /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
99    #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
100    pub scanner_log_max_poll_records: usize,
101
102    /// Maximum bytes per fetch response for LogScanner.
103    /// Default: 16777216 (16MB)
104    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES)]
105    pub scanner_log_fetch_max_bytes: i32,
106
107    /// Minimum bytes to accumulate before returning a fetch response.
108    /// Default: 1
109    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES)]
110    pub scanner_log_fetch_min_bytes: i32,
111
112    /// Maximum time the server may wait (ms) to satisfy min-bytes.
113    /// Default: 500
114    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS)]
115    pub scanner_log_fetch_wait_max_time_ms: i32,
116
117    /// The maximum time to wait for a batch to be completed in milliseconds.
118    /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
119    #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
120    pub writer_batch_timeout_ms: i64,
121
122    /// Maximum bytes per fetch response **per bucket** for LogScanner.
123    /// Default: 1048576 (1MB)
124    #[arg(long, default_value_t = DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET)]
125    pub scanner_log_fetch_max_bytes_for_bucket: i32,
126
127    /// Whether to enable idempotent writes. When enabled, each batch is tagged with
128    /// a server-allocated writer ID and per-bucket sequence number so the server can
129    /// detect and deduplicate retried batches.
130    /// Default: true (matching Java CLIENT_WRITER_ENABLE_IDEMPOTENCE)
131    #[arg(long, default_value_t = true)]
132    pub writer_enable_idempotence: bool,
133
134    /// Maximum number of in-flight requests per bucket for idempotent writes.
135    /// Default: 5 (matching Java client.writer.max-inflight-requests-per-bucket)
136    #[arg(long, default_value_t = DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET)]
137    pub writer_max_inflight_requests_per_bucket: usize,
138
139    /// Total memory available for buffering write batches across all buckets.
140    /// When this limit is reached, `upsert()`/`append()` will block until
141    /// in-flight batches complete and free memory.
142    /// Default: 64MB (matching Java's LazyMemorySegmentPool: 512 pages x 128KB)
143    #[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_MEMORY_SIZE)]
144    pub writer_buffer_memory_size: usize,
145
146    /// Maximum time in milliseconds to block waiting for buffer memory.
147    /// If the timeout is exceeded, the write call returns an error.
148    #[arg(long, default_value_t = DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS)]
149    pub writer_buffer_wait_timeout_ms: u64,
150
151    /// Connect timeout in milliseconds for TCP transport connect.
152    /// Default: 120000 (120 seconds).
153    #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT_MS)]
154    pub connect_timeout_ms: u64,
155
156    #[arg(long, default_value_t = String::from(DEFAULT_SECURITY_PROTOCOL))]
157    pub security_protocol: String,
158
159    #[arg(long, default_value_t = String::from(DEFAULT_SASL_MECHANISM))]
160    pub security_sasl_mechanism: String,
161
162    #[arg(long, default_value_t = String::new())]
163    pub security_sasl_username: String,
164
165    #[arg(long, default_value_t = String::new())]
166    #[serde(skip_serializing)]
167    pub security_sasl_password: String,
168}
169
170impl std::fmt::Debug for Config {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        f.debug_struct("Config")
173            .field("bootstrap_servers", &self.bootstrap_servers)
174            .field("writer_request_max_size", &self.writer_request_max_size)
175            .field("writer_acks", &self.writer_acks)
176            .field("writer_retries", &self.writer_retries)
177            .field("writer_batch_size", &self.writer_batch_size)
178            .field(
179                "writer_bucket_no_key_assigner",
180                &self.writer_bucket_no_key_assigner,
181            )
182            .field(
183                "scanner_remote_log_prefetch_num",
184                &self.scanner_remote_log_prefetch_num,
185            )
186            .field(
187                "remote_file_download_thread_num",
188                &self.remote_file_download_thread_num,
189            )
190            .field(
191                "scanner_log_max_poll_records",
192                &self.scanner_log_max_poll_records,
193            )
194            .field(
195                "scanner_log_fetch_max_bytes",
196                &self.scanner_log_fetch_max_bytes,
197            )
198            .field(
199                "scanner_log_fetch_min_bytes",
200                &self.scanner_log_fetch_min_bytes,
201            )
202            .field(
203                "scanner_log_fetch_max_bytes_for_bucket",
204                &self.scanner_log_fetch_max_bytes_for_bucket,
205            )
206            .field(
207                "scanner_log_fetch_wait_max_time_ms",
208                &self.scanner_log_fetch_wait_max_time_ms,
209            )
210            .field("writer_batch_timeout_ms", &self.writer_batch_timeout_ms)
211            .field("writer_enable_idempotence", &self.writer_enable_idempotence)
212            .field(
213                "writer_max_inflight_requests_per_bucket",
214                &self.writer_max_inflight_requests_per_bucket,
215            )
216            .field("writer_buffer_memory_size", &self.writer_buffer_memory_size)
217            .field(
218                "writer_buffer_wait_timeout_ms",
219                &self.writer_buffer_wait_timeout_ms,
220            )
221            .field("connect_timeout_ms", &self.connect_timeout_ms)
222            .field("security_protocol", &self.security_protocol)
223            .field("security_sasl_mechanism", &self.security_sasl_mechanism)
224            .field("security_sasl_username", &self.security_sasl_username)
225            .field("security_sasl_password", &"[REDACTED]")
226            .finish()
227    }
228}
229
230impl Default for Config {
231    fn default() -> Self {
232        Self {
233            bootstrap_servers: String::from(DEFAULT_BOOTSTRAP_SERVER),
234            writer_request_max_size: DEFAULT_REQUEST_MAX_SIZE,
235            writer_acks: String::from(DEFAULT_ACKS),
236            writer_retries: i32::MAX,
237            writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
238            writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
239            scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
240            remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
241            scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
242            scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
243            scanner_log_fetch_max_bytes: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES,
244            scanner_log_fetch_min_bytes: DEFAULT_SCANNER_LOG_FETCH_MIN_BYTES,
245            scanner_log_fetch_wait_max_time_ms: DEFAULT_SCANNER_LOG_FETCH_WAIT_MAX_TIME_MS,
246            scanner_log_fetch_max_bytes_for_bucket: DEFAULT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET,
247            writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
248            writer_enable_idempotence: true,
249            writer_max_inflight_requests_per_bucket:
250                DEFAULT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET,
251            writer_buffer_memory_size: DEFAULT_WRITER_BUFFER_MEMORY_SIZE,
252            writer_buffer_wait_timeout_ms: DEFAULT_WRITER_BUFFER_WAIT_TIMEOUT_MS,
253            connect_timeout_ms: DEFAULT_CONNECT_TIMEOUT_MS,
254            security_protocol: String::from(DEFAULT_SECURITY_PROTOCOL),
255            security_sasl_mechanism: String::from(DEFAULT_SASL_MECHANISM),
256            security_sasl_username: String::new(),
257            security_sasl_password: String::new(),
258        }
259    }
260}
261
262impl Config {
263    /// Returns true when the security protocol indicates SASL authentication
264    /// should be performed. Matches Java's `SaslAuthenticationPlugin` which
265    /// registers as `"sasl"` (case-insensitive).
266    pub fn is_sasl_enabled(&self) -> bool {
267        self.security_protocol.eq_ignore_ascii_case("sasl")
268    }
269
270    /// Validates idempotence configuration. Returns `Ok(())` when the config is
271    /// consistent, or an error message when idempotence is enabled but other
272    /// settings are incompatible.
273    pub fn validate_idempotence(&self) -> Result<(), String> {
274        if !self.writer_enable_idempotence {
275            return Ok(());
276        }
277        let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
278        if !acks_is_all {
279            return Err(format!(
280                "Idempotent writes require acks='all' (-1), but got acks='{}'",
281                self.writer_acks
282            ));
283        }
284        if self.writer_retries <= 0 {
285            return Err(format!(
286                "Idempotent writes require retries > 0, but got retries={}",
287                self.writer_retries
288            ));
289        }
290        if self.writer_max_inflight_requests_per_bucket
291            > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
292        {
293            return Err(format!(
294                "Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
295                MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
296                self.writer_max_inflight_requests_per_bucket
297            ));
298        }
299        Ok(())
300    }
301
302    /// Validates security configuration. Returns `Ok(())` when the config is
303    /// consistent, or an error message when SASL is enabled but the config is
304    /// incomplete or uses an unsupported mechanism.
305    pub fn validate_security(&self) -> Result<(), String> {
306        if !self.is_sasl_enabled() {
307            return Ok(());
308        }
309        if !self.security_sasl_mechanism.eq_ignore_ascii_case("PLAIN") {
310            return Err(format!(
311                "Unsupported SASL mechanism: '{}'. Only 'PLAIN' is supported.",
312                self.security_sasl_mechanism
313            ));
314        }
315        if self.security_sasl_username.is_empty() {
316            return Err(
317                "security_sasl_username must be set when security_protocol is 'sasl'".to_string(),
318            );
319        }
320        if self.security_sasl_password.is_empty() {
321            return Err(
322                "security_sasl_password must be set when security_protocol is 'sasl'".to_string(),
323            );
324        }
325        Ok(())
326    }
327    pub fn validate_scanner_fetch(&self) -> Result<(), String> {
328        if self.scanner_log_fetch_min_bytes <= 0 {
329            return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
330        }
331        if self.scanner_log_fetch_max_bytes <= 0 {
332            return Err("scanner_log_fetch_max_bytes must be > 0".to_string());
333        }
334        if self.scanner_log_fetch_max_bytes < self.scanner_log_fetch_min_bytes {
335            return Err(
336                "scanner_log_fetch_max_bytes must be >= scanner_log_fetch_min_bytes".to_string(),
337            );
338        }
339        if self.scanner_log_fetch_wait_max_time_ms < 0 {
340            return Err("scanner_log_fetch_wait_max_time_ms must be >= 0".to_string());
341        }
342        if self.scanner_log_fetch_max_bytes_for_bucket <= 0 {
343            return Err("scanner_log_fetch_max_bytes_for_bucket must be > 0".to_string());
344        }
345        if self.scanner_log_fetch_max_bytes_for_bucket > self.scanner_log_fetch_max_bytes {
346            return Err(
347                "scanner_log_fetch_max_bytes_for_bucket must be <= scanner_log_fetch_max_bytes"
348                    .to_string(),
349            );
350        }
351        Ok(())
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config with only the security protocol and SASL credentials
    /// replaced; the SASL mechanism keeps its default.
    fn sasl_config(protocol: &str, username: &str, password: &str) -> Config {
        Config {
            security_protocol: protocol.to_string(),
            security_sasl_username: username.to_string(),
            security_sasl_password: password.to_string(),
            ..Config::default()
        }
    }

    /// Same as `sasl_config` over the `sasl` protocol, with an explicit mechanism.
    fn sasl_config_with_mechanism(mechanism: &str, username: &str, password: &str) -> Config {
        Config {
            security_sasl_mechanism: mechanism.to_string(),
            ..sasl_config("sasl", username, password)
        }
    }

    // ---- security ----

    #[test]
    fn test_default_is_not_sasl() {
        let defaults = Config::default();
        assert!(!defaults.is_sasl_enabled());
        assert!(defaults.validate_security().is_ok());
    }

    #[test]
    fn test_sasl_enabled_valid() {
        let cfg = sasl_config_with_mechanism("PLAIN", "admin", "secret");
        assert!(cfg.is_sasl_enabled());
        assert!(cfg.validate_security().is_ok());
    }

    #[test]
    fn test_sasl_enabled_case_insensitive() {
        let cfg = sasl_config("SASL", "admin", "secret");
        assert!(cfg.is_sasl_enabled());
        assert!(cfg.validate_security().is_ok());
    }

    #[test]
    fn test_sasl_missing_username() {
        let outcome = sasl_config("sasl", "", "secret").validate_security();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_sasl_missing_password() {
        let outcome = sasl_config("sasl", "admin", "").validate_security();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_sasl_unsupported_mechanism() {
        let cfg = sasl_config_with_mechanism("SCRAM-SHA-256", "admin", "secret");
        assert!(cfg.validate_security().is_err());
    }

    // ---- scanner fetch ----

    #[test]
    fn test_scanner_fetch_defaults_valid() {
        let defaults = Config::default();
        assert!(defaults.validate_scanner_fetch().is_ok());
        assert_eq!(defaults.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
        assert_eq!(defaults.scanner_log_fetch_min_bytes, 1);
        assert_eq!(defaults.scanner_log_fetch_wait_max_time_ms, 500);
    }

    #[test]
    fn test_scanner_fetch_invalid_ranges() {
        // min > max must be rejected.
        let cfg = Config {
            scanner_log_fetch_max_bytes: 1,
            scanner_log_fetch_min_bytes: 2,
            ..Config::default()
        };
        let outcome = cfg.validate_scanner_fetch();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_scanner_fetch_negative_wait() {
        let cfg = Config {
            scanner_log_fetch_wait_max_time_ms: -1,
            ..Config::default()
        };
        let outcome = cfg.validate_scanner_fetch();
        assert!(outcome.is_err());
    }

    // ---- idempotence ----

    #[test]
    fn test_idempotence_default_is_valid() {
        assert!(Config::default().validate_idempotence().is_ok());
    }

    #[test]
    fn test_idempotence_disabled_skips_validation() {
        // Every other setting would be rejected if idempotence were on.
        let cfg = Config {
            writer_enable_idempotence: false,
            writer_acks: "0".to_string(),
            writer_retries: 0,
            writer_max_inflight_requests_per_bucket: 100,
            ..Config::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_idempotence_requires_acks_all() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_acks: "1".to_string(),
            ..Config::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_idempotence_requires_retries() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_retries: 0,
            ..Config::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_idempotence_requires_bounded_inflight() {
        let cfg = Config {
            writer_enable_idempotence: true,
            writer_max_inflight_requests_per_bucket: 10,
            ..Config::default()
        };
        let outcome = cfg.validate_idempotence();
        assert!(outcome.is_err());
    }
}