1use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_type::value::{
7 Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8 r#type::Type, uint::Uint,
9};
10
11use crate::common::CommitVersion;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum AcceptError {
15 TypeMismatch {
16 expected: Vec<Type>,
17 actual: Type,
18 },
19
20 InvalidValue(String),
21}
22
23impl fmt::Display for AcceptError {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 match self {
26 Self::TypeMismatch {
27 expected,
28 actual,
29 } => {
30 write!(f, "expected one of {:?}, got {:?}", expected, actual)
31 }
32 Self::InvalidValue(reason) => write!(f, "{reason}"),
33 }
34 }
35}
36
37#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
38pub enum ConfigKey {
39 OracleWindowSize,
40 OracleWaterMark,
41 QueryRowBatchSize,
42 RowTtlScanBatchSize,
43 RowTtlScanInterval,
44 OperatorTtlScanBatchSize,
45 OperatorTtlScanInterval,
46 HistoricalGcBatchSize,
47 HistoricalGcInterval,
48 CdcTtlDuration,
49 CdcCompactInterval,
50 CdcCompactBlockSize,
51 CdcCompactSafetyLag,
52 CdcCompactMaxBlocksPerTick,
53 CdcCompactBlockCacheCapacity,
54 CdcCompactZstdLevel,
55 ThreadsAsync,
56 ThreadsSystem,
57 ThreadsQuery,
58}
59
60impl ConfigKey {
61 pub fn all() -> &'static [Self] {
62 &[
63 Self::OracleWindowSize,
64 Self::OracleWaterMark,
65 Self::QueryRowBatchSize,
66 Self::RowTtlScanBatchSize,
67 Self::RowTtlScanInterval,
68 Self::OperatorTtlScanBatchSize,
69 Self::OperatorTtlScanInterval,
70 Self::HistoricalGcBatchSize,
71 Self::HistoricalGcInterval,
72 Self::CdcTtlDuration,
73 Self::CdcCompactInterval,
74 Self::CdcCompactBlockSize,
75 Self::CdcCompactSafetyLag,
76 Self::CdcCompactMaxBlocksPerTick,
77 Self::CdcCompactBlockCacheCapacity,
78 Self::CdcCompactZstdLevel,
79 Self::ThreadsAsync,
80 Self::ThreadsSystem,
81 Self::ThreadsQuery,
82 ]
83 }
84
85 pub fn default_value(&self) -> Value {
86 match self {
87 Self::OracleWindowSize => Value::Uint8(500),
88 Self::OracleWaterMark => Value::Uint8(20),
89 Self::QueryRowBatchSize => Value::Uint2(32),
90 Self::RowTtlScanBatchSize => Value::Uint8(10000),
91 Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
92 Self::OperatorTtlScanBatchSize => Value::Uint8(10000),
93 Self::OperatorTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
94 Self::HistoricalGcBatchSize => Value::Uint8(50_000),
95 Self::HistoricalGcInterval => Value::Duration(Duration::from_seconds(30).unwrap()),
96 Self::CdcTtlDuration => Value::None {
97 inner: Type::Duration,
98 },
99 Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
100 Self::CdcCompactBlockSize => Value::Uint8(1024),
101 Self::CdcCompactSafetyLag => Value::Uint8(1024),
102 Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
103 Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
104 Self::CdcCompactZstdLevel => Value::Uint1(7),
105 Self::ThreadsAsync => Value::Uint2(1),
106 Self::ThreadsSystem => Value::Uint2(2),
107 Self::ThreadsQuery => Value::Uint2(1),
108 }
109 }
110
111 pub fn description(&self) -> &'static str {
112 match self {
113 Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
114 Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
115 Self::QueryRowBatchSize => {
116 "Number of rows produced per batch by query / DML pipeline operators."
117 }
118 Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
119 Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
120 Self::OperatorTtlScanBatchSize => {
121 "Max rows to examine per batch during an operator-state TTL scan."
122 }
123 Self::OperatorTtlScanInterval => {
124 "How often the operator-state TTL actor should scan for expired rows."
125 }
126 Self::HistoricalGcBatchSize => {
127 "Max historical (key, version) pairs scanned per shape per historical GC tick."
128 }
129 Self::HistoricalGcInterval => {
130 "How often the historical-version GC actor sweeps __historical for versions older than the oracle read watermark."
131 }
132 Self::CdcTtlDuration => {
133 "Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
134 when set, must be > 0 and entries older than this duration are evicted regardless \
135 of consumer state."
136 }
137 Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
138 Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
139 Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
140 Self::CdcCompactMaxBlocksPerTick => {
141 "Upper bound on consecutive blocks produced per actor tick."
142 }
143 Self::CdcCompactBlockCacheCapacity => {
144 "Number of decompressed CDC blocks held in the in-memory LRU cache."
145 }
146 Self::CdcCompactZstdLevel => {
147 "Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
148 slower compression. Decompression cost is independent of level."
149 }
150 Self::ThreadsAsync => {
151 "Number of worker threads for the async runtime. Must be >= 1. \
152 Read at boot before the runtime starts; changes require restart."
153 }
154 Self::ThreadsSystem => {
155 "Number of worker threads for the system pool (lightweight actors). \
156 Must be >= 1. Changes require restart."
157 }
158 Self::ThreadsQuery => {
159 "Number of worker threads for the query pool (execution-heavy actors). \
160 Must be >= 1. Changes require restart."
161 }
162 }
163 }
164
165 pub fn requires_restart(&self) -> bool {
166 match self {
167 Self::OracleWindowSize => false,
168 Self::OracleWaterMark => false,
169 Self::QueryRowBatchSize => false,
170 Self::RowTtlScanBatchSize => false,
171 Self::RowTtlScanInterval => false,
172 Self::OperatorTtlScanBatchSize => false,
173 Self::OperatorTtlScanInterval => false,
174 Self::HistoricalGcBatchSize => false,
175 Self::HistoricalGcInterval => false,
176 Self::CdcTtlDuration => false,
177 Self::CdcCompactInterval => false,
178 Self::CdcCompactBlockSize => false,
179 Self::CdcCompactSafetyLag => false,
180 Self::CdcCompactMaxBlocksPerTick => false,
181 Self::CdcCompactBlockCacheCapacity => true,
182 Self::CdcCompactZstdLevel => false,
183 Self::ThreadsAsync => true,
184 Self::ThreadsSystem => true,
185 Self::ThreadsQuery => true,
186 }
187 }
188
189 pub fn expected_types(&self) -> &'static [Type] {
190 match self {
191 Self::OracleWindowSize => &[Type::Uint8],
192 Self::OracleWaterMark => &[Type::Uint8],
193 Self::QueryRowBatchSize => &[Type::Uint2],
194 Self::RowTtlScanBatchSize => &[Type::Uint8],
195 Self::RowTtlScanInterval => &[Type::Duration],
196 Self::OperatorTtlScanBatchSize => &[Type::Uint8],
197 Self::OperatorTtlScanInterval => &[Type::Duration],
198 Self::HistoricalGcBatchSize => &[Type::Uint8],
199 Self::HistoricalGcInterval => &[Type::Duration],
200 Self::CdcTtlDuration => &[Type::Duration],
201 Self::CdcCompactInterval => &[Type::Duration],
202 Self::CdcCompactBlockSize => &[Type::Uint8],
203 Self::CdcCompactSafetyLag => &[Type::Uint8],
204 Self::CdcCompactMaxBlocksPerTick => &[Type::Uint8],
205 Self::CdcCompactBlockCacheCapacity => &[Type::Uint8],
206 Self::CdcCompactZstdLevel => &[Type::Uint1],
207 Self::ThreadsAsync => &[Type::Uint2],
208 Self::ThreadsSystem => &[Type::Uint2],
209 Self::ThreadsQuery => &[Type::Uint2],
210 }
211 }
212
213 pub fn is_optional(&self) -> bool {
214 match self {
215 Self::OracleWindowSize => false,
216 Self::OracleWaterMark => false,
217 Self::QueryRowBatchSize => false,
218 Self::RowTtlScanBatchSize => false,
219 Self::RowTtlScanInterval => false,
220 Self::OperatorTtlScanBatchSize => false,
221 Self::OperatorTtlScanInterval => false,
222 Self::HistoricalGcBatchSize => false,
223 Self::HistoricalGcInterval => false,
224 Self::CdcTtlDuration => true,
225 Self::CdcCompactInterval => false,
226 Self::CdcCompactBlockSize => false,
227 Self::CdcCompactSafetyLag => false,
228 Self::CdcCompactMaxBlocksPerTick => false,
229 Self::CdcCompactBlockCacheCapacity => false,
230 Self::CdcCompactZstdLevel => false,
231 Self::ThreadsAsync => false,
232 Self::ThreadsSystem => false,
233 Self::ThreadsQuery => false,
234 }
235 }
236
237 fn validate_canonical(&self, value: &Value) -> Result<(), String> {
238 match self {
239 Self::CdcTtlDuration => match value {
240 Value::None {
241 ..
242 } => Ok(()),
243 Value::Duration(d) => {
244 if d.is_positive() {
245 Ok(())
246 } else {
247 Err("CDC_TTL_DURATION must be greater than zero".to_string())
248 }
249 }
250 _ => Ok(()),
251 },
252 Self::CdcCompactInterval => match value {
253 Value::Duration(d) => {
254 if d.is_positive() {
255 Ok(())
256 } else {
257 Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
258 }
259 }
260 _ => Ok(()),
261 },
262 Self::CdcCompactBlockSize => match value {
263 Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
264 _ => Ok(()),
265 },
266 Self::QueryRowBatchSize => match value {
267 Value::Uint2(0) => Err("QUERY_ROW_BATCH_SIZE must be greater than zero".to_string()),
268 _ => Ok(()),
269 },
270 Self::CdcCompactBlockCacheCapacity => match value {
271 Value::Uint8(0) => {
272 Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
273 }
274 _ => Ok(()),
275 },
276 Self::CdcCompactZstdLevel => match value {
277 Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
278 Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
279 _ => Ok(()),
280 },
281 Self::HistoricalGcBatchSize => match value {
282 Value::Uint8(0) => {
283 Err("HISTORICAL_GC_BATCH_SIZE must be greater than zero".to_string())
284 }
285 _ => Ok(()),
286 },
287 Self::HistoricalGcInterval => match value {
288 Value::Duration(d) => {
289 if d.is_positive() {
290 Ok(())
291 } else {
292 Err("HISTORICAL_GC_INTERVAL must be greater than zero".to_string())
293 }
294 }
295 _ => Ok(()),
296 },
297 Self::ThreadsAsync => match value {
298 Value::Uint2(0) => Err("THREADS_ASYNC must be greater than zero".to_string()),
299 _ => Ok(()),
300 },
301 Self::ThreadsSystem => match value {
302 Value::Uint2(0) => Err("THREADS_SYSTEM must be greater than zero".to_string()),
303 _ => Ok(()),
304 },
305 Self::ThreadsQuery => match value {
306 Value::Uint2(0) => Err("THREADS_QUERY must be greater than zero".to_string()),
307 _ => Ok(()),
308 },
309 _ => Ok(()),
310 }
311 }
312
313 pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
314 if let Value::None {
315 inner,
316 } = &value
317 {
318 if self.is_optional() && self.expected_types().contains(inner) {
319 return Ok(value);
320 }
321 return Err(AcceptError::TypeMismatch {
322 expected: self.expected_types().to_vec(),
323 actual: value.get_type(),
324 });
325 }
326
327 let canonical = if self.expected_types().contains(&value.get_type()) {
328 value
329 } else {
330 try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
331 expected: self.expected_types().to_vec(),
332 actual: value.get_type(),
333 })?
334 };
335
336 self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
337 Ok(canonical)
338 }
339}
340
341fn try_coerce_numeric(value: &Value, expected: &[Type]) -> Option<Value> {
342 for target in expected {
343 let coerced = match target {
344 Type::Uint1 => {
345 value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
346 }
347 Type::Uint2 => {
348 value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
349 }
350 Type::Uint4 => {
351 value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
352 }
353 Type::Uint8 => {
354 value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
355 }
356 Type::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
357 Type::Int1 => value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8)),
358 Type::Int2 => {
359 value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
360 }
361 Type::Int4 => {
362 value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
363 }
364 Type::Int8 => {
365 value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
366 }
367 Type::Int16 => {
368 value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
369 }
370 Type::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
371 Type::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
372 Type::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
373 Type::Float4 => {
374 value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
375 }
376 Type::Float8 => {
377 value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
378 }
379 Type::Duration => value
380 .to_usize()
381 .and_then(|v| Duration::from_seconds(v as i64).ok())
382 .map(Value::Duration),
383 _ => None,
384 };
385 if coerced.is_some() {
386 return coerced;
387 }
388 }
389 None
390}
391
392impl fmt::Display for ConfigKey {
393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394 match self {
395 Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
396 Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
397 Self::QueryRowBatchSize => write!(f, "QUERY_ROW_BATCH_SIZE"),
398 Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
399 Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
400 Self::OperatorTtlScanBatchSize => write!(f, "OPERATOR_TTL_SCAN_BATCH_SIZE"),
401 Self::OperatorTtlScanInterval => write!(f, "OPERATOR_TTL_SCAN_INTERVAL"),
402 Self::HistoricalGcBatchSize => write!(f, "HISTORICAL_GC_BATCH_SIZE"),
403 Self::HistoricalGcInterval => write!(f, "HISTORICAL_GC_INTERVAL"),
404 Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
405 Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
406 Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
407 Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
408 Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
409 Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
410 Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
411 Self::ThreadsAsync => write!(f, "THREADS_ASYNC"),
412 Self::ThreadsSystem => write!(f, "THREADS_SYSTEM"),
413 Self::ThreadsQuery => write!(f, "THREADS_QUERY"),
414 }
415 }
416}
417
418impl FromStr for ConfigKey {
419 type Err = String;
420
421 fn from_str(s: &str) -> Result<Self, Self::Err> {
422 match s {
423 "ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
424 "ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
425 "QUERY_ROW_BATCH_SIZE" => Ok(Self::QueryRowBatchSize),
426 "ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
427 "ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
428 "OPERATOR_TTL_SCAN_BATCH_SIZE" => Ok(Self::OperatorTtlScanBatchSize),
429 "OPERATOR_TTL_SCAN_INTERVAL" => Ok(Self::OperatorTtlScanInterval),
430 "HISTORICAL_GC_BATCH_SIZE" => Ok(Self::HistoricalGcBatchSize),
431 "HISTORICAL_GC_INTERVAL" => Ok(Self::HistoricalGcInterval),
432 "CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
433 "CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
434 "CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
435 "CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
436 "CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
437 "CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
438 "CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
439 "THREADS_ASYNC" => Ok(Self::ThreadsAsync),
440 "THREADS_SYSTEM" => Ok(Self::ThreadsSystem),
441 "THREADS_QUERY" => Ok(Self::ThreadsQuery),
442 _ => Err(format!("Unknown system configuration key: {}", s)),
443 }
444 }
445}
446
447#[derive(Debug, Clone)]
448pub struct Config {
449 pub key: ConfigKey,
450
451 pub value: Value,
452
453 pub default_value: Value,
454
455 pub description: &'static str,
456
457 pub requires_restart: bool,
458}
459
460pub trait GetConfig: Send + Sync {
461 fn get_config(&self, key: ConfigKey) -> Value;
462
463 fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
464
465 fn get_config_uint8(&self, key: ConfigKey) -> u64 {
466 let val = self.get_config(key);
467 match val {
468 Value::Uint8(v) => v,
469 v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
470 }
471 }
472
473 fn get_config_uint1(&self, key: ConfigKey) -> u8 {
474 let val = self.get_config(key);
475 match val {
476 Value::Uint1(v) => v,
477 v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
478 }
479 }
480
481 fn get_config_uint2(&self, key: ConfigKey) -> u16 {
482 let val = self.get_config(key);
483 match val {
484 Value::Uint2(v) => v,
485 v => panic!("config key '{}' expected Uint2, got {:?}", key, v),
486 }
487 }
488
489 fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
490 let val = self.get_config(key);
491 match val {
492 Value::Duration(v) => {
493 let total_nanos =
494 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
495 StdDuration::from_nanos(total_nanos.max(0) as u64)
496 }
497 v => panic!("config key '{}' expected Duration, got {:?}", key, v),
498 }
499 }
500
501 fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
502 match self.get_config(key) {
503 Value::None {
504 ..
505 } => None,
506 Value::Duration(v) => {
507 let total_nanos =
508 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
509 Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
510 }
511 v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
512 }
513 }
514}
515
516#[cfg(test)]
517mod tests {
518 use super::*;
519
520 #[test]
521 fn test_cdc_ttl_default_is_typed_null() {
522 let default = ConfigKey::CdcTtlDuration.default_value();
524 assert!(matches!(
525 default,
526 Value::None {
527 inner: Type::Duration
528 }
529 ));
530 }
531
532 #[test]
533 fn test_cdc_ttl_accept_passes_typed_null() {
534 let none = Value::None {
535 inner: Type::Duration,
536 };
537 let v = ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap();
538 assert_eq!(v, none);
539 }
540
541 #[test]
542 fn test_cdc_ttl_accept_passes_positive_duration() {
543 let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
544 assert_eq!(ConfigKey::CdcTtlDuration.accept(one_sec.clone()).unwrap(), one_sec);
545
546 let one_hour = Value::Duration(Duration::from_seconds(3600).unwrap());
547 assert_eq!(ConfigKey::CdcTtlDuration.accept(one_hour.clone()).unwrap(), one_hour);
548 }
549
550 #[test]
551 fn test_cdc_ttl_accept_rejects_zero() {
552 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
553 match ConfigKey::CdcTtlDuration.accept(zero).unwrap_err() {
554 AcceptError::InvalidValue(reason) => {
555 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
556 }
557 other => panic!("expected InvalidValue, got {other:?}"),
558 }
559 }
560
561 #[test]
562 fn test_cdc_ttl_accept_rejects_negative() {
563 let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
564 assert!(matches!(ConfigKey::CdcTtlDuration.accept(negative), Err(AcceptError::InvalidValue(_))));
565 }
566
567 #[test]
568 fn test_other_keys_accept_in_type_values() {
569 assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
571 assert!(ConfigKey::RowTtlScanInterval
572 .accept(Value::Duration(Duration::from_seconds(0).unwrap()))
573 .is_ok());
574 }
575
576 #[test]
577 fn test_cdc_ttl_round_trips_through_display_and_from_str() {
578 let key: ConfigKey = "CDC_TTL_DURATION".parse().unwrap();
579 assert_eq!(key, ConfigKey::CdcTtlDuration);
580 assert_eq!(format!("{}", ConfigKey::CdcTtlDuration), "CDC_TTL_DURATION");
581 }
582
583 #[test]
584 fn test_cdc_ttl_in_all() {
585 assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
586 }
587
588 #[test]
589 fn test_all_contains_every_compact_key_and_has_expected_len() {
590 let all = ConfigKey::all();
591 assert_eq!(all.len(), 19);
592 assert!(all.contains(&ConfigKey::CdcCompactInterval));
593 assert!(all.contains(&ConfigKey::CdcCompactBlockSize));
594 assert!(all.contains(&ConfigKey::CdcCompactSafetyLag));
595 assert!(all.contains(&ConfigKey::CdcCompactMaxBlocksPerTick));
596 assert!(all.contains(&ConfigKey::CdcCompactBlockCacheCapacity));
597 assert!(all.contains(&ConfigKey::CdcCompactZstdLevel));
598 assert!(all.contains(&ConfigKey::QueryRowBatchSize));
599 assert!(all.contains(&ConfigKey::ThreadsAsync));
600 assert!(all.contains(&ConfigKey::ThreadsSystem));
601 assert!(all.contains(&ConfigKey::ThreadsQuery));
602 }
603
604 #[test]
605 fn test_threads_keys_round_trip() {
606 assert_eq!("THREADS_ASYNC".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsAsync);
607 assert_eq!("THREADS_SYSTEM".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsSystem);
608 assert_eq!("THREADS_QUERY".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsQuery);
609 assert_eq!(format!("{}", ConfigKey::ThreadsAsync), "THREADS_ASYNC");
610 assert_eq!(format!("{}", ConfigKey::ThreadsSystem), "THREADS_SYSTEM");
611 assert_eq!(format!("{}", ConfigKey::ThreadsQuery), "THREADS_QUERY");
612 }
613
614 #[test]
615 fn test_threads_defaults() {
616 assert_eq!(ConfigKey::ThreadsAsync.default_value(), Value::Uint2(1));
617 assert_eq!(ConfigKey::ThreadsSystem.default_value(), Value::Uint2(2));
618 assert_eq!(ConfigKey::ThreadsQuery.default_value(), Value::Uint2(1));
619 }
620
621 #[test]
622 fn test_threads_reject_zero() {
623 for key in [ConfigKey::ThreadsAsync, ConfigKey::ThreadsSystem, ConfigKey::ThreadsQuery] {
624 match key.accept(Value::Uint2(0)).unwrap_err() {
625 AcceptError::InvalidValue(reason) => {
626 assert!(
627 reason.contains("greater than zero"),
628 "{key}: unexpected reason: {reason}"
629 );
630 }
631 other => panic!("{key}: expected InvalidValue, got {other:?}"),
632 }
633 }
634 }
635
636 #[test]
637 fn test_threads_accept_positive() {
638 assert_eq!(ConfigKey::ThreadsAsync.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
639 assert_eq!(ConfigKey::ThreadsSystem.accept(Value::Uint2(8)).unwrap(), Value::Uint2(8));
640 assert_eq!(ConfigKey::ThreadsQuery.accept(Value::Uint2(16)).unwrap(), Value::Uint2(16));
641 }
642
643 #[test]
644 fn test_threads_coerce_int4_to_uint2() {
645 let v = ConfigKey::ThreadsQuery.accept(Value::Int4(8)).unwrap();
646 assert_eq!(v, Value::Uint2(8));
647 }
648
649 #[test]
650 fn test_threads_require_restart() {
651 assert!(ConfigKey::ThreadsAsync.requires_restart());
652 assert!(ConfigKey::ThreadsSystem.requires_restart());
653 assert!(ConfigKey::ThreadsQuery.requires_restart());
654 }
655
656 #[test]
657 fn test_query_row_batch_size_default_is_uint2_32() {
658 assert_eq!(ConfigKey::QueryRowBatchSize.default_value(), Value::Uint2(32));
659 }
660
661 #[test]
662 fn test_query_row_batch_size_round_trips_through_display_and_from_str() {
663 let key: ConfigKey = "QUERY_ROW_BATCH_SIZE".parse().unwrap();
664 assert_eq!(key, ConfigKey::QueryRowBatchSize);
665 assert_eq!(format!("{}", ConfigKey::QueryRowBatchSize), "QUERY_ROW_BATCH_SIZE");
666 }
667
668 #[test]
669 fn test_query_row_batch_size_accept_rejects_zero() {
670 match ConfigKey::QueryRowBatchSize.accept(Value::Uint2(0)).unwrap_err() {
671 AcceptError::InvalidValue(reason) => {
672 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
673 }
674 other => panic!("expected InvalidValue, got {other:?}"),
675 }
676 }
677
678 #[test]
679 fn test_query_row_batch_size_accept_passes_positive() {
680 assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1)).unwrap(), Value::Uint2(1));
681 assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1024)).unwrap(), Value::Uint2(1024));
682 }
683
684 #[test]
685 fn test_query_row_batch_size_accept_rejects_zero_after_coercion() {
686 match ConfigKey::QueryRowBatchSize.accept(Value::Int4(0)).unwrap_err() {
687 AcceptError::InvalidValue(reason) => {
688 assert!(reason.contains("greater than zero"));
689 }
690 other => panic!("expected InvalidValue, got {other:?}"),
691 }
692 }
693
694 #[test]
695 fn test_query_row_batch_size_coerces_int4_to_uint2() {
696 let v = ConfigKey::QueryRowBatchSize.accept(Value::Int4(64)).unwrap();
697 assert_eq!(v, Value::Uint2(64));
698 }
699
700 #[test]
701 fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
702 let key: ConfigKey = "CDC_COMPACT_INTERVAL".parse().unwrap();
703 assert_eq!(key, ConfigKey::CdcCompactInterval);
704 assert_eq!(format!("{}", ConfigKey::CdcCompactInterval), "CDC_COMPACT_INTERVAL");
705 }
706
707 #[test]
708 fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
709 let key: ConfigKey = "CDC_COMPACT_BLOCK_SIZE".parse().unwrap();
710 assert_eq!(key, ConfigKey::CdcCompactBlockSize);
711 assert_eq!(format!("{}", ConfigKey::CdcCompactBlockSize), "CDC_COMPACT_BLOCK_SIZE");
712 }
713
714 #[test]
715 fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
716 let key: ConfigKey = "CDC_COMPACT_SAFETY_LAG".parse().unwrap();
717 assert_eq!(key, ConfigKey::CdcCompactSafetyLag);
718 assert_eq!(format!("{}", ConfigKey::CdcCompactSafetyLag), "CDC_COMPACT_SAFETY_LAG");
719 }
720
721 #[test]
722 fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
723 let key: ConfigKey = "CDC_COMPACT_MAX_BLOCKS_PER_TICK".parse().unwrap();
724 assert_eq!(key, ConfigKey::CdcCompactMaxBlocksPerTick);
725 assert_eq!(format!("{}", ConfigKey::CdcCompactMaxBlocksPerTick), "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
726 }
727
728 #[test]
729 fn test_cdc_compact_interval_default_is_duration() {
730 assert!(matches!(ConfigKey::CdcCompactInterval.default_value(), Value::Duration(_)));
731 }
732
733 #[test]
734 fn test_cdc_compact_block_size_default_is_uint8_1024() {
735 assert_eq!(ConfigKey::CdcCompactBlockSize.default_value(), Value::Uint8(1024));
736 }
737
738 #[test]
739 fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
740 assert_eq!(ConfigKey::CdcCompactSafetyLag.default_value(), Value::Uint8(1024));
741 }
742
743 #[test]
744 fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
745 assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.default_value(), Value::Uint8(16));
746 }
747
748 #[test]
749 fn test_cdc_compact_interval_accept_passes_positive_duration() {
750 let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
751 assert_eq!(ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap(), one_sec);
752 }
753
754 #[test]
755 fn test_cdc_compact_interval_accept_rejects_zero() {
756 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
757 match ConfigKey::CdcCompactInterval.accept(zero).unwrap_err() {
758 AcceptError::InvalidValue(reason) => {
759 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
760 }
761 other => panic!("expected InvalidValue, got {other:?}"),
762 }
763 }
764
765 #[test]
766 fn test_cdc_compact_interval_accept_rejects_negative() {
767 let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
768 assert!(matches!(ConfigKey::CdcCompactInterval.accept(negative), Err(AcceptError::InvalidValue(_))));
769 }
770
771 #[test]
772 fn test_cdc_compact_block_size_accept_rejects_zero() {
773 match ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)).unwrap_err() {
774 AcceptError::InvalidValue(reason) => {
775 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
776 }
777 other => panic!("expected InvalidValue, got {other:?}"),
778 }
779 }
780
781 #[test]
782 fn test_cdc_compact_block_size_accept_passes_positive() {
783 assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1)).unwrap(), Value::Uint8(1));
784 assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1024)).unwrap(), Value::Uint8(1024));
785 }
786
787 #[test]
788 fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
789 assert_eq!(ConfigKey::CdcCompactSafetyLag.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
790 assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
791 }
792
793 #[test]
794 fn test_accept_coerces_int4_to_uint8_for_block_size() {
795 let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
797 assert_eq!(v, Value::Uint8(1024));
798 }
799
800 #[test]
801 fn test_accept_coerces_int8_to_uint8_for_block_size() {
802 let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
803 assert_eq!(v, Value::Uint8(2048));
804 }
805
806 #[test]
807 fn test_accept_rejects_zero_after_coercion() {
808 match ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err() {
810 AcceptError::InvalidValue(reason) => {
811 assert!(reason.contains("greater than zero"));
812 }
813 other => panic!("expected InvalidValue, got {other:?}"),
814 }
815 }
816
817 #[test]
818 fn test_accept_rejects_negative_int_for_uint8_key() {
819 assert!(matches!(
821 ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1)),
822 Err(AcceptError::TypeMismatch { .. })
823 ));
824 }
825
826 #[test]
827 fn test_accept_coerces_int_to_duration_via_seconds() {
828 let v = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
830 assert!(matches!(v, Value::Duration(_)));
831 }
832
833 #[test]
834 fn test_accept_idempotent_on_canonical_uint8() {
835 let canonical = Value::Uint8(42);
836 assert_eq!(ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap(), canonical);
837 }
838
839 #[test]
840 fn test_accept_idempotent_on_canonical_duration() {
841 let canonical = Value::Duration(Duration::from_seconds(5).unwrap());
842 assert_eq!(ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap(), canonical);
843 }
844
845 #[test]
846 fn test_accept_rejects_typed_null_for_non_optional_key() {
847 let err = ConfigKey::CdcCompactBlockSize
848 .accept(Value::None {
849 inner: Type::Uint8,
850 })
851 .unwrap_err();
852 assert!(matches!(err, AcceptError::TypeMismatch { .. }));
853 }
854
855 #[test]
856 fn test_accept_passes_typed_null_for_optional_key() {
857 let none = Value::None {
858 inner: Type::Duration,
859 };
860 assert_eq!(ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap(), none);
861 }
862
863 #[test]
864 fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
865 let err = ConfigKey::CdcTtlDuration
867 .accept(Value::None {
868 inner: Type::Uint8,
869 })
870 .unwrap_err();
871 assert!(matches!(err, AcceptError::TypeMismatch { .. }));
872 }
873
874 #[test]
875 fn test_historical_gc_keys_round_trip() {
876 assert_eq!("HISTORICAL_GC_BATCH_SIZE".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcBatchSize);
877 assert_eq!("HISTORICAL_GC_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcInterval);
878 assert_eq!(format!("{}", ConfigKey::HistoricalGcBatchSize), "HISTORICAL_GC_BATCH_SIZE");
879 assert_eq!(format!("{}", ConfigKey::HistoricalGcInterval), "HISTORICAL_GC_INTERVAL");
880 }
881
882 #[test]
883 fn test_historical_gc_defaults() {
884 assert_eq!(ConfigKey::HistoricalGcBatchSize.default_value(), Value::Uint8(50_000));
885 assert!(matches!(ConfigKey::HistoricalGcInterval.default_value(), Value::Duration(_)));
886 }
887
888 #[test]
889 fn test_historical_gc_batch_size_rejects_zero() {
890 match ConfigKey::HistoricalGcBatchSize.accept(Value::Uint8(0)).unwrap_err() {
891 AcceptError::InvalidValue(reason) => {
892 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
893 }
894 other => panic!("expected InvalidValue, got {other:?}"),
895 }
896 }
897
898 #[test]
899 fn test_historical_gc_interval_rejects_zero() {
900 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
901 match ConfigKey::HistoricalGcInterval.accept(zero).unwrap_err() {
902 AcceptError::InvalidValue(reason) => {
903 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
904 }
905 other => panic!("expected InvalidValue, got {other:?}"),
906 }
907 }
908}