1use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_value::value::{
7 Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8 uint::Uint, value_type::ValueType,
9};
10
11use crate::common::CommitVersion;
12
/// Reason a candidate value was rejected for a configuration key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcceptError {
	/// The value's type is not one of the types the key accepts.
	TypeMismatch {
		/// Types the key accepts.
		expected: Vec<ValueType>,
		/// Type of the value that was supplied.
		actual: ValueType,
	},

	/// The value has an acceptable type but breaks a key-specific constraint;
	/// the payload is the human-readable reason.
	InvalidValue(String),
}
22
23impl fmt::Display for AcceptError {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 match self {
26 Self::TypeMismatch {
27 expected,
28 actual,
29 } => {
30 write!(f, "expected one of {:?}, got {:?}", expected, actual)
31 }
32 Self::InvalidValue(reason) => write!(f, "{reason}"),
33 }
34 }
35}
36
/// Identifier of a system configuration setting.
///
/// Each variant has a default value, a description, a set of accepted value
/// types and a textual name (the `SCREAMING_SNAKE_CASE` form used by
/// `Display` / `FromStr`).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ConfigKey {
	/// Number of transactions per conflict-detection window.
	OracleWindowSize,
	/// Number of conflict windows retained before cleanup is triggered.
	OracleWaterMark,
	/// Number of rows produced per batch by query / DML pipeline operators.
	QueryRowBatchSize,
	/// Max rows to examine per batch during a row TTL scan.
	RowTtlScanBatchSize,
	/// How often the row TTL actor scans for expired rows.
	RowTtlScanInterval,
	/// Max rows to examine per batch during an operator-state TTL scan.
	OperatorTtlScanBatchSize,
	/// How often the operator-state TTL actor scans for expired rows.
	OperatorTtlScanInterval,
	/// Max historical (key, version) pairs scanned per shape per historical GC tick.
	HistoricalGcBatchSize,
	/// How often the historical-version GC actor sweeps old versions.
	HistoricalGcInterval,
	/// Maximum age of CDC entries before eviction; optional (unset means retain forever).
	CdcTtlDuration,
	/// How often the CDC compaction actor runs.
	CdcCompactInterval,
	/// Number of CDC entries packed into one compressed block.
	CdcCompactBlockSize,
	/// Versions newer than (max_version - lag) are never compacted.
	CdcCompactSafetyLag,
	/// Upper bound on consecutive blocks produced per actor tick.
	CdcCompactMaxBlocksPerTick,
	/// Number of decompressed CDC blocks held in the in-memory LRU cache.
	CdcCompactBlockCacheCapacity,
	/// Zstd compression level for CDC blocks (1-22).
	CdcCompactZstdLevel,
	/// Number of most-recent decoded CDC entries held in memory.
	CdcRecentCacheCapacity,
	/// Number of keys held in the multi-version store read buffer.
	MultiReadBufferCapacity,
	/// How often the flow tick coordinators wake up to dispatch due flows.
	FlowTick,
	/// Number of worker threads for the async runtime.
	ThreadsAsync,
	/// Number of worker threads for the system pool.
	ThreadsSystem,
	/// Number of worker threads for the query pool.
	ThreadsQuery,
	/// Number of worker threads for the commit pool.
	ThreadsCommit,
	/// Number of worker threads for the background pool.
	ThreadsBackground,
	/// How often the runtime-metrics sampler records a memory snapshot; optional.
	RuntimeMetricsInterval,
	/// How often the metric collector flushes accumulated stats.
	MetricFlushInterval,
}
66
67impl ConfigKey {
68 pub fn all() -> &'static [Self] {
69 &[
70 Self::OracleWindowSize,
71 Self::OracleWaterMark,
72 Self::QueryRowBatchSize,
73 Self::RowTtlScanBatchSize,
74 Self::RowTtlScanInterval,
75 Self::OperatorTtlScanBatchSize,
76 Self::OperatorTtlScanInterval,
77 Self::HistoricalGcBatchSize,
78 Self::HistoricalGcInterval,
79 Self::CdcTtlDuration,
80 Self::CdcCompactInterval,
81 Self::CdcCompactBlockSize,
82 Self::CdcCompactSafetyLag,
83 Self::CdcCompactMaxBlocksPerTick,
84 Self::CdcCompactBlockCacheCapacity,
85 Self::CdcCompactZstdLevel,
86 Self::CdcRecentCacheCapacity,
87 Self::MultiReadBufferCapacity,
88 Self::FlowTick,
89 Self::ThreadsAsync,
90 Self::ThreadsSystem,
91 Self::ThreadsQuery,
92 Self::ThreadsCommit,
93 Self::ThreadsBackground,
94 Self::RuntimeMetricsInterval,
95 Self::MetricFlushInterval,
96 ]
97 }
98
99 pub fn default_value(&self) -> Value {
100 match self {
101 Self::OracleWindowSize => Value::Uint8(500),
102 Self::OracleWaterMark => Value::Uint8(20),
103 Self::QueryRowBatchSize => Value::Uint2(32),
104 Self::RowTtlScanBatchSize => Value::Uint8(10000),
105 Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
106 Self::OperatorTtlScanBatchSize => Value::Uint8(10000),
107 Self::OperatorTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
108 Self::HistoricalGcBatchSize => Value::Uint8(50_000),
109 Self::HistoricalGcInterval => Value::Duration(Duration::from_seconds(30).unwrap()),
110 Self::CdcTtlDuration => Value::None {
111 inner: ValueType::Duration,
112 },
113 Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
114 Self::CdcCompactBlockSize => Value::Uint8(1024),
115 Self::CdcCompactSafetyLag => Value::Uint8(1024),
116 Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
117 Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
118 Self::CdcCompactZstdLevel => Value::Uint1(7),
119 Self::CdcRecentCacheCapacity => Value::Uint8(128),
120 Self::MultiReadBufferCapacity => Value::Uint8(4096),
121 Self::FlowTick => Value::Duration(Duration::from_seconds(1).unwrap()),
122 Self::ThreadsAsync => Value::Uint2(1),
123 Self::ThreadsSystem => Value::Uint2(2),
124 Self::ThreadsQuery => Value::Uint2(1),
125 Self::ThreadsCommit => Value::Uint2(2),
126 Self::ThreadsBackground => Value::Uint2(1),
127 Self::RuntimeMetricsInterval => Value::Duration(Duration::from_seconds(5).unwrap()),
128 Self::MetricFlushInterval => Value::Duration(Duration::from_seconds(10).unwrap()),
129 }
130 }
131
132 pub fn description(&self) -> &'static str {
133 match self {
134 Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
135 Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
136 Self::QueryRowBatchSize => {
137 "Number of rows produced per batch by query / DML pipeline operators."
138 }
139 Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
140 Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
141 Self::OperatorTtlScanBatchSize => {
142 "Max rows to examine per batch during an operator-state TTL scan."
143 }
144 Self::OperatorTtlScanInterval => {
145 "How often the operator-state TTL actor should scan for expired rows."
146 }
147 Self::HistoricalGcBatchSize => {
148 "Max historical (key, version) pairs scanned per shape per historical GC tick."
149 }
150 Self::HistoricalGcInterval => {
151 "How often the historical-version GC actor sweeps __historical for versions older than the oracle read watermark."
152 }
153 Self::CdcTtlDuration => {
154 "Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
155 when set, must be > 0 and entries older than this duration are evicted regardless \
156 of consumer state."
157 }
158 Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
159 Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
160 Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
161 Self::CdcCompactMaxBlocksPerTick => {
162 "Upper bound on consecutive blocks produced per actor tick."
163 }
164 Self::CdcCompactBlockCacheCapacity => {
165 "Number of decompressed CDC blocks held in the in-memory LRU cache."
166 }
167 Self::CdcCompactZstdLevel => {
168 "Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
169 slower compression. Decompression cost is independent of level."
170 }
171 Self::CdcRecentCacheCapacity => {
172 "Number of most-recent decoded CDC entries held in memory so a caught-up consumer \
173 is served without re-reading and re-deserializing from the backend."
174 }
175 Self::MultiReadBufferCapacity => {
176 "Number of keys held in the multi-version store read buffer so cold reads \
177 evicted from the commit buffer are served without a persistent-tier lookup."
178 }
179 Self::FlowTick => {
180 "How often the deferred and transactional flow tick coordinators wake up to dispatch \
181 due flows."
182 }
183 Self::ThreadsAsync => {
184 "Number of worker threads for the async runtime. Must be >= 1. \
185 Read at boot before the runtime starts; changes require restart."
186 }
187 Self::ThreadsSystem => {
188 "Number of worker threads for the system pool (lightweight actors). \
189 Must be >= 1. Changes require restart."
190 }
191 Self::ThreadsQuery => {
192 "Number of worker threads for the query pool (execution-heavy actors). \
193 Must be >= 1. Changes require restart."
194 }
195 Self::ThreadsCommit => {
196 "Number of worker threads for the commit pool (synchronous pre-commit flow execution). \
197 Must be >= 1. Changes require restart."
198 }
199 Self::ThreadsBackground => {
200 "Number of worker threads for the background pool (non-critical cleanup and metrics actors). \
201 Must be >= 1. Changes require restart."
202 }
203 Self::RuntimeMetricsInterval => {
204 "How often the runtime-metrics sampler records a memory snapshot into \
205 system::metrics::runtime::memory::snapshots. When unset, the history sampler is \
206 dormant and only the live ::current view is available; when set, must be > 0."
207 }
208 Self::MetricFlushInterval => {
209 "How often the metric collector flushes accumulated storage and CDC stats into the \
210 system::metrics views. Must be > 0."
211 }
212 }
213 }
214
215 pub fn requires_restart(&self) -> bool {
216 match self {
217 Self::OracleWindowSize => false,
218 Self::OracleWaterMark => false,
219 Self::QueryRowBatchSize => false,
220 Self::RowTtlScanBatchSize => false,
221 Self::RowTtlScanInterval => false,
222 Self::OperatorTtlScanBatchSize => false,
223 Self::OperatorTtlScanInterval => false,
224 Self::HistoricalGcBatchSize => false,
225 Self::HistoricalGcInterval => false,
226 Self::CdcTtlDuration => false,
227 Self::CdcCompactInterval => false,
228 Self::CdcCompactBlockSize => false,
229 Self::CdcCompactSafetyLag => false,
230 Self::CdcCompactMaxBlocksPerTick => false,
231 Self::CdcCompactBlockCacheCapacity => true,
232 Self::CdcCompactZstdLevel => false,
233 Self::CdcRecentCacheCapacity => true,
234 Self::MultiReadBufferCapacity => true,
235 Self::FlowTick => false,
236 Self::ThreadsAsync => true,
237 Self::ThreadsSystem => true,
238 Self::ThreadsQuery => true,
239 Self::ThreadsCommit => true,
240 Self::ThreadsBackground => true,
241 Self::RuntimeMetricsInterval => false,
242 Self::MetricFlushInterval => false,
243 }
244 }
245
246 pub fn expected_types(&self) -> &'static [ValueType] {
247 match self {
248 Self::OracleWindowSize => &[ValueType::Uint8],
249 Self::OracleWaterMark => &[ValueType::Uint8],
250 Self::QueryRowBatchSize => &[ValueType::Uint2],
251 Self::RowTtlScanBatchSize => &[ValueType::Uint8],
252 Self::RowTtlScanInterval => &[ValueType::Duration],
253 Self::OperatorTtlScanBatchSize => &[ValueType::Uint8],
254 Self::OperatorTtlScanInterval => &[ValueType::Duration],
255 Self::HistoricalGcBatchSize => &[ValueType::Uint8],
256 Self::HistoricalGcInterval => &[ValueType::Duration],
257 Self::CdcTtlDuration => &[ValueType::Duration],
258 Self::CdcCompactInterval => &[ValueType::Duration],
259 Self::CdcCompactBlockSize => &[ValueType::Uint8],
260 Self::CdcCompactSafetyLag => &[ValueType::Uint8],
261 Self::CdcCompactMaxBlocksPerTick => &[ValueType::Uint8],
262 Self::CdcCompactBlockCacheCapacity => &[ValueType::Uint8],
263 Self::CdcCompactZstdLevel => &[ValueType::Uint1],
264 Self::CdcRecentCacheCapacity => &[ValueType::Uint8],
265 Self::MultiReadBufferCapacity => &[ValueType::Uint8],
266 Self::FlowTick => &[ValueType::Duration],
267 Self::ThreadsAsync => &[ValueType::Uint2],
268 Self::ThreadsSystem => &[ValueType::Uint2],
269 Self::ThreadsQuery => &[ValueType::Uint2],
270 Self::ThreadsCommit => &[ValueType::Uint2],
271 Self::ThreadsBackground => &[ValueType::Uint2],
272 Self::RuntimeMetricsInterval => &[ValueType::Duration],
273 Self::MetricFlushInterval => &[ValueType::Duration],
274 }
275 }
276
277 pub fn is_optional(&self) -> bool {
278 match self {
279 Self::OracleWindowSize => false,
280 Self::OracleWaterMark => false,
281 Self::QueryRowBatchSize => false,
282 Self::RowTtlScanBatchSize => false,
283 Self::RowTtlScanInterval => false,
284 Self::OperatorTtlScanBatchSize => false,
285 Self::OperatorTtlScanInterval => false,
286 Self::HistoricalGcBatchSize => false,
287 Self::HistoricalGcInterval => false,
288 Self::CdcTtlDuration => true,
289 Self::CdcCompactInterval => false,
290 Self::CdcCompactBlockSize => false,
291 Self::CdcCompactSafetyLag => false,
292 Self::CdcCompactMaxBlocksPerTick => false,
293 Self::CdcCompactBlockCacheCapacity => false,
294 Self::CdcCompactZstdLevel => false,
295 Self::CdcRecentCacheCapacity => false,
296 Self::MultiReadBufferCapacity => false,
297 Self::FlowTick => false,
298 Self::ThreadsAsync => false,
299 Self::ThreadsSystem => false,
300 Self::ThreadsQuery => false,
301 Self::ThreadsCommit => false,
302 Self::ThreadsBackground => false,
303 Self::RuntimeMetricsInterval => true,
304 Self::MetricFlushInterval => false,
305 }
306 }
307
308 fn validate_canonical(&self, value: &Value) -> Result<(), String> {
309 match self {
310 Self::CdcTtlDuration => match value {
311 Value::None {
312 ..
313 } => Ok(()),
314 Value::Duration(d) => {
315 if d.is_positive() {
316 Ok(())
317 } else {
318 Err("CDC_TTL_DURATION must be greater than zero".to_string())
319 }
320 }
321 _ => Ok(()),
322 },
323 Self::CdcCompactInterval => match value {
324 Value::Duration(d) => {
325 if d.is_positive() {
326 Ok(())
327 } else {
328 Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
329 }
330 }
331 _ => Ok(()),
332 },
333 Self::CdcCompactBlockSize => match value {
334 Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
335 _ => Ok(()),
336 },
337 Self::QueryRowBatchSize => match value {
338 Value::Uint2(0) => Err("QUERY_ROW_BATCH_SIZE must be greater than zero".to_string()),
339 _ => Ok(()),
340 },
341 Self::CdcCompactBlockCacheCapacity => match value {
342 Value::Uint8(0) => {
343 Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
344 }
345 _ => Ok(()),
346 },
347 Self::MultiReadBufferCapacity => match value {
348 Value::Uint8(0) => {
349 Err("MULTI_READ_BUFFER_CAPACITY must be greater than zero".to_string())
350 }
351 _ => Ok(()),
352 },
353 Self::CdcCompactZstdLevel => match value {
354 Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
355 Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
356 _ => Ok(()),
357 },
358 Self::HistoricalGcBatchSize => match value {
359 Value::Uint8(0) => {
360 Err("HISTORICAL_GC_BATCH_SIZE must be greater than zero".to_string())
361 }
362 _ => Ok(()),
363 },
364 Self::HistoricalGcInterval => match value {
365 Value::Duration(d) => {
366 if d.is_positive() {
367 Ok(())
368 } else {
369 Err("HISTORICAL_GC_INTERVAL must be greater than zero".to_string())
370 }
371 }
372 _ => Ok(()),
373 },
374 Self::FlowTick => match value {
375 Value::Duration(d) => {
376 if d.is_positive() {
377 Ok(())
378 } else {
379 Err("FLOW_TICK must be greater than zero".to_string())
380 }
381 }
382 _ => Ok(()),
383 },
384 Self::ThreadsAsync => match value {
385 Value::Uint2(0) => Err("THREADS_ASYNC must be greater than zero".to_string()),
386 _ => Ok(()),
387 },
388 Self::ThreadsSystem => match value {
389 Value::Uint2(0) => Err("THREADS_SYSTEM must be greater than zero".to_string()),
390 _ => Ok(()),
391 },
392 Self::ThreadsQuery => match value {
393 Value::Uint2(0) => Err("THREADS_QUERY must be greater than zero".to_string()),
394 _ => Ok(()),
395 },
396 Self::ThreadsCommit => match value {
397 Value::Uint2(0) => Err("THREADS_COMMIT must be greater than zero".to_string()),
398 _ => Ok(()),
399 },
400 Self::ThreadsBackground => match value {
401 Value::Uint2(0) => Err("THREADS_BACKGROUND must be greater than zero".to_string()),
402 _ => Ok(()),
403 },
404 Self::RuntimeMetricsInterval => match value {
405 Value::None {
406 ..
407 } => Ok(()),
408 Value::Duration(d) => {
409 if d.is_positive() {
410 Ok(())
411 } else {
412 Err("RUNTIME_METRICS_INTERVAL must be greater than zero".to_string())
413 }
414 }
415 _ => Ok(()),
416 },
417 Self::MetricFlushInterval => match value {
418 Value::Duration(d) => {
419 if d.is_positive() {
420 Ok(())
421 } else {
422 Err("METRIC_FLUSH_INTERVAL must be greater than zero".to_string())
423 }
424 }
425 _ => Ok(()),
426 },
427 _ => Ok(()),
428 }
429 }
430
431 pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
432 if let Value::None {
433 inner,
434 } = &value
435 {
436 if self.is_optional() && self.expected_types().contains(inner) {
437 return Ok(value);
438 }
439 return Err(AcceptError::TypeMismatch {
440 expected: self.expected_types().to_vec(),
441 actual: value.get_type(),
442 });
443 }
444
445 let canonical = if self.expected_types().contains(&value.get_type()) {
446 value
447 } else {
448 try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
449 expected: self.expected_types().to_vec(),
450 actual: value.get_type(),
451 })?
452 };
453
454 self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
455 Ok(canonical)
456 }
457}
458
459fn try_coerce_numeric(value: &Value, expected: &[ValueType]) -> Option<Value> {
460 for target in expected {
461 let coerced = match target {
462 ValueType::Uint1 => {
463 value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
464 }
465 ValueType::Uint2 => {
466 value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
467 }
468 ValueType::Uint4 => {
469 value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
470 }
471 ValueType::Uint8 => {
472 value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
473 }
474 ValueType::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
475 ValueType::Int1 => {
476 value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8))
477 }
478 ValueType::Int2 => {
479 value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
480 }
481 ValueType::Int4 => {
482 value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
483 }
484 ValueType::Int8 => {
485 value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
486 }
487 ValueType::Int16 => {
488 value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
489 }
490 ValueType::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
491 ValueType::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
492 ValueType::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
493 ValueType::Float4 => {
494 value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
495 }
496 ValueType::Float8 => {
497 value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
498 }
499 ValueType::Duration => value
500 .to_usize()
501 .and_then(|v| Duration::from_seconds(v as i64).ok())
502 .map(Value::Duration),
503 _ => None,
504 };
505 if coerced.is_some() {
506 return coerced;
507 }
508 }
509 None
510}
511
512impl fmt::Display for ConfigKey {
513 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514 match self {
515 Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
516 Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
517 Self::QueryRowBatchSize => write!(f, "QUERY_ROW_BATCH_SIZE"),
518 Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
519 Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
520 Self::OperatorTtlScanBatchSize => write!(f, "OPERATOR_TTL_SCAN_BATCH_SIZE"),
521 Self::OperatorTtlScanInterval => write!(f, "OPERATOR_TTL_SCAN_INTERVAL"),
522 Self::HistoricalGcBatchSize => write!(f, "HISTORICAL_GC_BATCH_SIZE"),
523 Self::HistoricalGcInterval => write!(f, "HISTORICAL_GC_INTERVAL"),
524 Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
525 Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
526 Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
527 Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
528 Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
529 Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
530 Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
531 Self::CdcRecentCacheCapacity => write!(f, "CDC_RECENT_CACHE_CAPACITY"),
532 Self::MultiReadBufferCapacity => write!(f, "MULTI_READ_BUFFER_CAPACITY"),
533 Self::FlowTick => write!(f, "FLOW_TICK"),
534 Self::ThreadsAsync => write!(f, "THREADS_ASYNC"),
535 Self::ThreadsSystem => write!(f, "THREADS_SYSTEM"),
536 Self::ThreadsQuery => write!(f, "THREADS_QUERY"),
537 Self::ThreadsCommit => write!(f, "THREADS_COMMIT"),
538 Self::ThreadsBackground => write!(f, "THREADS_BACKGROUND"),
539 Self::RuntimeMetricsInterval => write!(f, "RUNTIME_METRICS_INTERVAL"),
540 Self::MetricFlushInterval => write!(f, "METRIC_FLUSH_INTERVAL"),
541 }
542 }
543}
544
545impl FromStr for ConfigKey {
546 type Err = String;
547
548 fn from_str(s: &str) -> Result<Self, Self::Err> {
549 match s {
550 "ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
551 "ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
552 "QUERY_ROW_BATCH_SIZE" => Ok(Self::QueryRowBatchSize),
553 "ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
554 "ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
555 "OPERATOR_TTL_SCAN_BATCH_SIZE" => Ok(Self::OperatorTtlScanBatchSize),
556 "OPERATOR_TTL_SCAN_INTERVAL" => Ok(Self::OperatorTtlScanInterval),
557 "HISTORICAL_GC_BATCH_SIZE" => Ok(Self::HistoricalGcBatchSize),
558 "HISTORICAL_GC_INTERVAL" => Ok(Self::HistoricalGcInterval),
559 "CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
560 "CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
561 "CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
562 "CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
563 "CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
564 "CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
565 "CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
566 "CDC_RECENT_CACHE_CAPACITY" => Ok(Self::CdcRecentCacheCapacity),
567 "MULTI_READ_BUFFER_CAPACITY" => Ok(Self::MultiReadBufferCapacity),
568 "FLOW_TICK" => Ok(Self::FlowTick),
569 "THREADS_ASYNC" => Ok(Self::ThreadsAsync),
570 "THREADS_SYSTEM" => Ok(Self::ThreadsSystem),
571 "THREADS_QUERY" => Ok(Self::ThreadsQuery),
572 "THREADS_COMMIT" => Ok(Self::ThreadsCommit),
573 "THREADS_BACKGROUND" => Ok(Self::ThreadsBackground),
574 "RUNTIME_METRICS_INTERVAL" => Ok(Self::RuntimeMetricsInterval),
575 "METRIC_FLUSH_INTERVAL" => Ok(Self::MetricFlushInterval),
576 _ => Err(format!("Unknown system configuration key: {}", s)),
577 }
578 }
579}
580
/// A configuration entry: a key together with a value and descriptive metadata.
#[derive(Debug, Clone)]
pub struct Config {
	/// The key this entry describes.
	pub key: ConfigKey,

	/// The value held for `key`.
	pub value: Value,

	/// The default for `key` — presumably `ConfigKey::default_value()`;
	/// confirm against the code that builds `Config`.
	pub default_value: Value,

	/// Human-readable description — presumably `ConfigKey::description()`; confirm.
	pub description: &'static str,

	/// Whether a change needs a restart — presumably `ConfigKey::requires_restart()`; confirm.
	pub requires_restart: bool,
}
593
594pub trait GetConfig: Send + Sync {
595 fn get_config(&self, key: ConfigKey) -> Value;
596
597 fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
598
599 fn get_config_uint8(&self, key: ConfigKey) -> u64 {
600 let val = self.get_config(key);
601 match val {
602 Value::Uint8(v) => v,
603 v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
604 }
605 }
606
607 fn get_config_uint1(&self, key: ConfigKey) -> u8 {
608 let val = self.get_config(key);
609 match val {
610 Value::Uint1(v) => v,
611 v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
612 }
613 }
614
615 fn get_config_uint2(&self, key: ConfigKey) -> u16 {
616 let val = self.get_config(key);
617 match val {
618 Value::Uint2(v) => v,
619 v => panic!("config key '{}' expected Uint2, got {:?}", key, v),
620 }
621 }
622
623 fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
624 let val = self.get_config(key);
625 match val {
626 Value::Duration(v) => {
627 let total_nanos =
628 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
629 StdDuration::from_nanos(total_nanos.max(0) as u64)
630 }
631 v => panic!("config key '{}' expected Duration, got {:?}", key, v),
632 }
633 }
634
635 fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
636 match self.get_config(key) {
637 Value::None {
638 ..
639 } => None,
640 Value::Duration(v) => {
641 let total_nanos =
642 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
643 Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
644 }
645 v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
646 }
647 }
648}
649
650#[cfg(test)]
651mod tests {
652 use super::*;
653
654 #[test]
655 fn test_cdc_ttl_default_is_typed_null() {
656 let default = ConfigKey::CdcTtlDuration.default_value();
658 assert!(matches!(
659 default,
660 Value::None {
661 inner: ValueType::Duration
662 }
663 ));
664 }
665
666 #[test]
667 fn test_cdc_ttl_accept_passes_typed_null() {
668 let none = Value::None {
669 inner: ValueType::Duration,
670 };
671 let v = ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap();
672 assert_eq!(v, none);
673 }
674
675 #[test]
676 fn test_cdc_ttl_accept_passes_positive_duration() {
677 let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
678 assert_eq!(ConfigKey::CdcTtlDuration.accept(one_sec.clone()).unwrap(), one_sec);
679
680 let one_hour = Value::Duration(Duration::from_seconds(3600).unwrap());
681 assert_eq!(ConfigKey::CdcTtlDuration.accept(one_hour.clone()).unwrap(), one_hour);
682 }
683
684 #[test]
685 fn test_cdc_ttl_accept_rejects_zero() {
686 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
687 match ConfigKey::CdcTtlDuration.accept(zero).unwrap_err() {
688 AcceptError::InvalidValue(reason) => {
689 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
690 }
691 other => panic!("expected InvalidValue, got {other:?}"),
692 }
693 }
694
695 #[test]
696 fn test_cdc_ttl_accept_rejects_negative() {
697 let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
698 assert!(matches!(ConfigKey::CdcTtlDuration.accept(negative), Err(AcceptError::InvalidValue(_))));
699 }
700
701 #[test]
702 fn test_other_keys_accept_in_type_values() {
703 assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
705 assert!(ConfigKey::RowTtlScanInterval
706 .accept(Value::Duration(Duration::from_seconds(0).unwrap()))
707 .is_ok());
708 }
709
710 #[test]
711 fn test_cdc_ttl_round_trips_through_display_and_from_str() {
712 let key: ConfigKey = "CDC_TTL_DURATION".parse().unwrap();
713 assert_eq!(key, ConfigKey::CdcTtlDuration);
714 assert_eq!(format!("{}", ConfigKey::CdcTtlDuration), "CDC_TTL_DURATION");
715 }
716
717 #[test]
718 fn test_cdc_ttl_in_all() {
719 assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
720 }
721
722 #[test]
723 fn test_all_contains_every_compact_key_and_has_expected_len() {
724 let all = ConfigKey::all();
725 assert_eq!(all.len(), 26);
726 assert!(all.contains(&ConfigKey::CdcCompactInterval));
727 assert!(all.contains(&ConfigKey::CdcCompactBlockSize));
728 assert!(all.contains(&ConfigKey::CdcCompactSafetyLag));
729 assert!(all.contains(&ConfigKey::CdcCompactMaxBlocksPerTick));
730 assert!(all.contains(&ConfigKey::CdcCompactBlockCacheCapacity));
731 assert!(all.contains(&ConfigKey::CdcCompactZstdLevel));
732 assert!(all.contains(&ConfigKey::CdcRecentCacheCapacity));
733 assert!(all.contains(&ConfigKey::MultiReadBufferCapacity));
734 assert!(all.contains(&ConfigKey::QueryRowBatchSize));
735 assert!(all.contains(&ConfigKey::ThreadsAsync));
736 assert!(all.contains(&ConfigKey::ThreadsSystem));
737 assert!(all.contains(&ConfigKey::ThreadsQuery));
738 assert!(all.contains(&ConfigKey::ThreadsCommit));
739 assert!(all.contains(&ConfigKey::ThreadsBackground));
740 assert!(all.contains(&ConfigKey::RuntimeMetricsInterval));
741 assert!(all.contains(&ConfigKey::MetricFlushInterval));
742 }
743
744 #[test]
745 fn test_runtime_metrics_interval_metadata() {
746 assert_eq!(
748 ConfigKey::RuntimeMetricsInterval.default_value(),
749 Value::Duration(Duration::from_seconds(5).unwrap())
750 );
751 assert_eq!(ConfigKey::RuntimeMetricsInterval.expected_types(), &[ValueType::Duration]);
752 assert!(ConfigKey::RuntimeMetricsInterval.is_optional());
753 }
754
755 #[test]
756 fn test_runtime_metrics_interval_round_trip() {
757 assert_eq!("RUNTIME_METRICS_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::RuntimeMetricsInterval);
758 assert_eq!(format!("{}", ConfigKey::RuntimeMetricsInterval), "RUNTIME_METRICS_INTERVAL");
759 }
760
761 #[test]
762 fn test_runtime_metrics_interval_accepts_none_and_positive_rejects_zero() {
763 let none = Value::None {
764 inner: ValueType::Duration,
765 };
766 assert_eq!(ConfigKey::RuntimeMetricsInterval.accept(none.clone()).unwrap(), none);
767
768 let five = Value::Duration(Duration::from_seconds(5).unwrap());
769 assert_eq!(ConfigKey::RuntimeMetricsInterval.accept(five.clone()).unwrap(), five);
770
771 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
772 assert!(matches!(ConfigKey::RuntimeMetricsInterval.accept(zero), Err(AcceptError::InvalidValue(_))));
773 }
774
775 #[test]
776 fn test_metric_flush_interval_metadata() {
777 assert_eq!(
779 ConfigKey::MetricFlushInterval.default_value(),
780 Value::Duration(Duration::from_seconds(10).unwrap())
781 );
782 assert_eq!(ConfigKey::MetricFlushInterval.expected_types(), &[ValueType::Duration]);
783 assert!(!ConfigKey::MetricFlushInterval.is_optional());
784 assert!(!ConfigKey::MetricFlushInterval.requires_restart());
785 }
786
787 #[test]
788 fn test_metric_flush_interval_round_trip() {
789 assert_eq!("METRIC_FLUSH_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::MetricFlushInterval);
790 assert_eq!(format!("{}", ConfigKey::MetricFlushInterval), "METRIC_FLUSH_INTERVAL");
791 }
792
793 #[test]
794 fn test_metric_flush_interval_accepts_positive_rejects_zero() {
795 let ten = Value::Duration(Duration::from_seconds(10).unwrap());
796 assert_eq!(ConfigKey::MetricFlushInterval.accept(ten.clone()).unwrap(), ten);
797
798 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
799 assert!(matches!(ConfigKey::MetricFlushInterval.accept(zero), Err(AcceptError::InvalidValue(_))));
800 }
801
802 #[test]
803 fn test_cdc_recent_cache_capacity_round_trip() {
804 assert_eq!(
805 "CDC_RECENT_CACHE_CAPACITY".parse::<ConfigKey>().unwrap(),
806 ConfigKey::CdcRecentCacheCapacity
807 );
808 assert_eq!(format!("{}", ConfigKey::CdcRecentCacheCapacity), "CDC_RECENT_CACHE_CAPACITY");
809 }
810
811 #[test]
812 fn test_cdc_recent_cache_capacity_metadata() {
813 assert_eq!(ConfigKey::CdcRecentCacheCapacity.default_value(), Value::Uint8(128));
814 assert_eq!(ConfigKey::CdcRecentCacheCapacity.expected_types(), &[ValueType::Uint8]);
815 assert!(ConfigKey::CdcRecentCacheCapacity.requires_restart());
816 assert!(!ConfigKey::CdcRecentCacheCapacity.is_optional());
817 }
818
819 #[test]
820 fn test_multi_read_cache_capacity_round_trip() {
821 assert_eq!(
822 "MULTI_READ_BUFFER_CAPACITY".parse::<ConfigKey>().unwrap(),
823 ConfigKey::MultiReadBufferCapacity
824 );
825 assert_eq!(format!("{}", ConfigKey::MultiReadBufferCapacity), "MULTI_READ_BUFFER_CAPACITY");
826 }
827
828 #[test]
829 fn test_multi_read_cache_capacity_metadata_and_rejects_zero() {
830 assert_eq!(ConfigKey::MultiReadBufferCapacity.default_value(), Value::Uint8(4096));
831 assert_eq!(ConfigKey::MultiReadBufferCapacity.expected_types(), &[ValueType::Uint8]);
832 assert!(ConfigKey::MultiReadBufferCapacity.requires_restart());
833 assert!(!ConfigKey::MultiReadBufferCapacity.is_optional());
834 match ConfigKey::MultiReadBufferCapacity.accept(Value::Uint8(0)).unwrap_err() {
835 AcceptError::InvalidValue(reason) => {
836 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
837 }
838 other => panic!("expected InvalidValue, got {other:?}"),
839 }
840 }
841
842 #[test]
843 fn test_threads_keys_round_trip() {
844 assert_eq!("THREADS_ASYNC".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsAsync);
845 assert_eq!("THREADS_SYSTEM".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsSystem);
846 assert_eq!("THREADS_QUERY".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsQuery);
847 assert_eq!("THREADS_COMMIT".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsCommit);
848 assert_eq!("THREADS_BACKGROUND".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsBackground);
849 assert_eq!(format!("{}", ConfigKey::ThreadsAsync), "THREADS_ASYNC");
850 assert_eq!(format!("{}", ConfigKey::ThreadsSystem), "THREADS_SYSTEM");
851 assert_eq!(format!("{}", ConfigKey::ThreadsQuery), "THREADS_QUERY");
852 assert_eq!(format!("{}", ConfigKey::ThreadsCommit), "THREADS_COMMIT");
853 assert_eq!(format!("{}", ConfigKey::ThreadsBackground), "THREADS_BACKGROUND");
854 }
855
856 #[test]
857 fn test_threads_defaults() {
858 assert_eq!(ConfigKey::ThreadsAsync.default_value(), Value::Uint2(1));
859 assert_eq!(ConfigKey::ThreadsSystem.default_value(), Value::Uint2(2));
860 assert_eq!(ConfigKey::ThreadsQuery.default_value(), Value::Uint2(1));
861 assert_eq!(ConfigKey::ThreadsCommit.default_value(), Value::Uint2(2));
862 assert_eq!(ConfigKey::ThreadsBackground.default_value(), Value::Uint2(1));
863 }
864
865 #[test]
866 fn test_threads_reject_zero() {
867 for key in [
868 ConfigKey::ThreadsAsync,
869 ConfigKey::ThreadsSystem,
870 ConfigKey::ThreadsQuery,
871 ConfigKey::ThreadsCommit,
872 ConfigKey::ThreadsBackground,
873 ] {
874 match key.accept(Value::Uint2(0)).unwrap_err() {
875 AcceptError::InvalidValue(reason) => {
876 assert!(
877 reason.contains("greater than zero"),
878 "{key}: unexpected reason: {reason}"
879 );
880 }
881 other => panic!("{key}: expected InvalidValue, got {other:?}"),
882 }
883 }
884 }
885
886 #[test]
887 fn test_threads_accept_positive() {
888 assert_eq!(ConfigKey::ThreadsAsync.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
889 assert_eq!(ConfigKey::ThreadsSystem.accept(Value::Uint2(8)).unwrap(), Value::Uint2(8));
890 assert_eq!(ConfigKey::ThreadsQuery.accept(Value::Uint2(16)).unwrap(), Value::Uint2(16));
891 assert_eq!(ConfigKey::ThreadsCommit.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
892 assert_eq!(ConfigKey::ThreadsBackground.accept(Value::Uint2(2)).unwrap(), Value::Uint2(2));
893 }
894
895 #[test]
896 fn test_threads_coerce_int4_to_uint2() {
897 let v = ConfigKey::ThreadsQuery.accept(Value::Int4(8)).unwrap();
898 assert_eq!(v, Value::Uint2(8));
899 }
900
901 #[test]
902 fn test_threads_require_restart() {
903 assert!(ConfigKey::ThreadsAsync.requires_restart());
904 assert!(ConfigKey::ThreadsSystem.requires_restart());
905 assert!(ConfigKey::ThreadsQuery.requires_restart());
906 assert!(ConfigKey::ThreadsCommit.requires_restart());
907 assert!(ConfigKey::ThreadsBackground.requires_restart());
908 }
909
910 #[test]
911 fn test_query_row_batch_size_default_is_uint2_32() {
912 assert_eq!(ConfigKey::QueryRowBatchSize.default_value(), Value::Uint2(32));
913 }
914
915 #[test]
916 fn test_query_row_batch_size_round_trips_through_display_and_from_str() {
917 let key: ConfigKey = "QUERY_ROW_BATCH_SIZE".parse().unwrap();
918 assert_eq!(key, ConfigKey::QueryRowBatchSize);
919 assert_eq!(format!("{}", ConfigKey::QueryRowBatchSize), "QUERY_ROW_BATCH_SIZE");
920 }
921
922 #[test]
923 fn test_query_row_batch_size_accept_rejects_zero() {
924 match ConfigKey::QueryRowBatchSize.accept(Value::Uint2(0)).unwrap_err() {
925 AcceptError::InvalidValue(reason) => {
926 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
927 }
928 other => panic!("expected InvalidValue, got {other:?}"),
929 }
930 }
931
932 #[test]
933 fn test_query_row_batch_size_accept_passes_positive() {
934 assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1)).unwrap(), Value::Uint2(1));
935 assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1024)).unwrap(), Value::Uint2(1024));
936 }
937
938 #[test]
939 fn test_query_row_batch_size_accept_rejects_zero_after_coercion() {
940 match ConfigKey::QueryRowBatchSize.accept(Value::Int4(0)).unwrap_err() {
941 AcceptError::InvalidValue(reason) => {
942 assert!(reason.contains("greater than zero"));
943 }
944 other => panic!("expected InvalidValue, got {other:?}"),
945 }
946 }
947
948 #[test]
949 fn test_query_row_batch_size_coerces_int4_to_uint2() {
950 let v = ConfigKey::QueryRowBatchSize.accept(Value::Int4(64)).unwrap();
951 assert_eq!(v, Value::Uint2(64));
952 }
953
954 #[test]
955 fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
956 let key: ConfigKey = "CDC_COMPACT_INTERVAL".parse().unwrap();
957 assert_eq!(key, ConfigKey::CdcCompactInterval);
958 assert_eq!(format!("{}", ConfigKey::CdcCompactInterval), "CDC_COMPACT_INTERVAL");
959 }
960
961 #[test]
962 fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
963 let key: ConfigKey = "CDC_COMPACT_BLOCK_SIZE".parse().unwrap();
964 assert_eq!(key, ConfigKey::CdcCompactBlockSize);
965 assert_eq!(format!("{}", ConfigKey::CdcCompactBlockSize), "CDC_COMPACT_BLOCK_SIZE");
966 }
967
968 #[test]
969 fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
970 let key: ConfigKey = "CDC_COMPACT_SAFETY_LAG".parse().unwrap();
971 assert_eq!(key, ConfigKey::CdcCompactSafetyLag);
972 assert_eq!(format!("{}", ConfigKey::CdcCompactSafetyLag), "CDC_COMPACT_SAFETY_LAG");
973 }
974
975 #[test]
976 fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
977 let key: ConfigKey = "CDC_COMPACT_MAX_BLOCKS_PER_TICK".parse().unwrap();
978 assert_eq!(key, ConfigKey::CdcCompactMaxBlocksPerTick);
979 assert_eq!(format!("{}", ConfigKey::CdcCompactMaxBlocksPerTick), "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
980 }
981
982 #[test]
983 fn test_cdc_compact_interval_default_is_duration() {
984 assert!(matches!(ConfigKey::CdcCompactInterval.default_value(), Value::Duration(_)));
985 }
986
987 #[test]
988 fn test_cdc_compact_block_size_default_is_uint8_1024() {
989 assert_eq!(ConfigKey::CdcCompactBlockSize.default_value(), Value::Uint8(1024));
990 }
991
992 #[test]
993 fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
994 assert_eq!(ConfigKey::CdcCompactSafetyLag.default_value(), Value::Uint8(1024));
995 }
996
997 #[test]
998 fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
999 assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.default_value(), Value::Uint8(16));
1000 }
1001
1002 #[test]
1003 fn test_cdc_compact_interval_accept_passes_positive_duration() {
1004 let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
1005 assert_eq!(ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap(), one_sec);
1006 }
1007
1008 #[test]
1009 fn test_cdc_compact_interval_accept_rejects_zero() {
1010 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
1011 match ConfigKey::CdcCompactInterval.accept(zero).unwrap_err() {
1012 AcceptError::InvalidValue(reason) => {
1013 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1014 }
1015 other => panic!("expected InvalidValue, got {other:?}"),
1016 }
1017 }
1018
1019 #[test]
1020 fn test_cdc_compact_interval_accept_rejects_negative() {
1021 let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
1022 assert!(matches!(ConfigKey::CdcCompactInterval.accept(negative), Err(AcceptError::InvalidValue(_))));
1023 }
1024
1025 #[test]
1026 fn test_cdc_compact_block_size_accept_rejects_zero() {
1027 match ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)).unwrap_err() {
1028 AcceptError::InvalidValue(reason) => {
1029 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1030 }
1031 other => panic!("expected InvalidValue, got {other:?}"),
1032 }
1033 }
1034
1035 #[test]
1036 fn test_cdc_compact_block_size_accept_passes_positive() {
1037 assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1)).unwrap(), Value::Uint8(1));
1038 assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1024)).unwrap(), Value::Uint8(1024));
1039 }
1040
1041 #[test]
1042 fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
1043 assert_eq!(ConfigKey::CdcCompactSafetyLag.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
1044 assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
1045 }
1046
1047 #[test]
1048 fn test_accept_coerces_int4_to_uint8_for_block_size() {
1049 let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
1051 assert_eq!(v, Value::Uint8(1024));
1052 }
1053
1054 #[test]
1055 fn test_accept_coerces_int8_to_uint8_for_block_size() {
1056 let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
1057 assert_eq!(v, Value::Uint8(2048));
1058 }
1059
1060 #[test]
1061 fn test_accept_rejects_zero_after_coercion() {
1062 match ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err() {
1064 AcceptError::InvalidValue(reason) => {
1065 assert!(reason.contains("greater than zero"));
1066 }
1067 other => panic!("expected InvalidValue, got {other:?}"),
1068 }
1069 }
1070
1071 #[test]
1072 fn test_accept_rejects_negative_int_for_uint8_key() {
1073 assert!(matches!(
1075 ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1)),
1076 Err(AcceptError::TypeMismatch { .. })
1077 ));
1078 }
1079
1080 #[test]
1081 fn test_accept_coerces_int_to_duration_via_seconds() {
1082 let v = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
1084 assert!(matches!(v, Value::Duration(_)));
1085 }
1086
1087 #[test]
1088 fn test_accept_idempotent_on_canonical_uint8() {
1089 let canonical = Value::Uint8(42);
1090 assert_eq!(ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap(), canonical);
1091 }
1092
1093 #[test]
1094 fn test_accept_idempotent_on_canonical_duration() {
1095 let canonical = Value::Duration(Duration::from_seconds(5).unwrap());
1096 assert_eq!(ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap(), canonical);
1097 }
1098
1099 #[test]
1100 fn test_accept_rejects_typed_null_for_non_optional_key() {
1101 let err = ConfigKey::CdcCompactBlockSize
1102 .accept(Value::None {
1103 inner: ValueType::Uint8,
1104 })
1105 .unwrap_err();
1106 assert!(matches!(err, AcceptError::TypeMismatch { .. }));
1107 }
1108
1109 #[test]
1110 fn test_accept_passes_typed_null_for_optional_key() {
1111 let none = Value::None {
1112 inner: ValueType::Duration,
1113 };
1114 assert_eq!(ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap(), none);
1115 }
1116
1117 #[test]
1118 fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
1119 let err = ConfigKey::CdcTtlDuration
1121 .accept(Value::None {
1122 inner: ValueType::Uint8,
1123 })
1124 .unwrap_err();
1125 assert!(matches!(err, AcceptError::TypeMismatch { .. }));
1126 }
1127
1128 #[test]
1129 fn test_historical_gc_keys_round_trip() {
1130 assert_eq!("HISTORICAL_GC_BATCH_SIZE".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcBatchSize);
1131 assert_eq!("HISTORICAL_GC_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcInterval);
1132 assert_eq!(format!("{}", ConfigKey::HistoricalGcBatchSize), "HISTORICAL_GC_BATCH_SIZE");
1133 assert_eq!(format!("{}", ConfigKey::HistoricalGcInterval), "HISTORICAL_GC_INTERVAL");
1134 }
1135
1136 #[test]
1137 fn test_historical_gc_defaults() {
1138 assert_eq!(ConfigKey::HistoricalGcBatchSize.default_value(), Value::Uint8(50_000));
1139 assert!(matches!(ConfigKey::HistoricalGcInterval.default_value(), Value::Duration(_)));
1140 }
1141
1142 #[test]
1143 fn test_historical_gc_batch_size_rejects_zero() {
1144 match ConfigKey::HistoricalGcBatchSize.accept(Value::Uint8(0)).unwrap_err() {
1145 AcceptError::InvalidValue(reason) => {
1146 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1147 }
1148 other => panic!("expected InvalidValue, got {other:?}"),
1149 }
1150 }
1151
1152 #[test]
1153 fn test_historical_gc_interval_rejects_zero() {
1154 let zero = Value::Duration(Duration::from_seconds(0).unwrap());
1155 match ConfigKey::HistoricalGcInterval.accept(zero).unwrap_err() {
1156 AcceptError::InvalidValue(reason) => {
1157 assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1158 }
1159 other => panic!("expected InvalidValue, got {other:?}"),
1160 }
1161 }
1162}