Skip to main content

reifydb_core/interface/catalog/
config.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_value::value::{
7	Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8	uint::Uint, value_type::ValueType,
9};
10
11use crate::common::CommitVersion;
12
/// Reasons [`ConfigKey::accept`] can refuse a candidate value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcceptError {
	/// The offered value's type is not acceptable for the key.
	TypeMismatch {
		/// Types the key accepts.
		expected: Vec<ValueType>,
		/// Type of the value that was offered.
		actual: ValueType,
	},

	/// The value has an acceptable type but breaks a key-specific rule; carries the reason text.
	InvalidValue(String),
}
22
23impl fmt::Display for AcceptError {
24	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25		match self {
26			Self::TypeMismatch {
27				expected,
28				actual,
29			} => {
30				write!(f, "expected one of {:?}, got {:?}", expected, actual)
31			}
32			Self::InvalidValue(reason) => write!(f, "{reason}"),
33		}
34	}
35}
36
/// Identifies one system configuration setting.
///
/// Each key has a default value, a description, a set of expected value types and a
/// restart flag, all exposed through the inherent methods on this type. The `Display`
/// and `FromStr` impls map a key to and from its SCREAMING_SNAKE_CASE name.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ConfigKey {
	OracleWindowSize,
	OracleWaterMark,
	QueryRowBatchSize,
	RowTtlScanBatchSize,
	RowTtlScanInterval,
	OperatorTtlScanBatchSize,
	OperatorTtlScanInterval,
	HistoricalGcBatchSize,
	HistoricalGcInterval,
	CdcTtlDuration,
	CdcCompactInterval,
	CdcCompactBlockSize,
	CdcCompactSafetyLag,
	CdcCompactMaxBlocksPerTick,
	CdcCompactBlockCacheCapacity,
	CdcCompactZstdLevel,
	CdcRecentCacheCapacity,
	MultiReadBufferCapacity,
	FlowTick,
	ThreadsAsync,
	ThreadsSystem,
	ThreadsQuery,
	ThreadsCommit,
	ThreadsBackground,
	RuntimeMetricsInterval,
	MetricFlushInterval,
}
66
67impl ConfigKey {
68	pub fn all() -> &'static [Self] {
69		&[
70			Self::OracleWindowSize,
71			Self::OracleWaterMark,
72			Self::QueryRowBatchSize,
73			Self::RowTtlScanBatchSize,
74			Self::RowTtlScanInterval,
75			Self::OperatorTtlScanBatchSize,
76			Self::OperatorTtlScanInterval,
77			Self::HistoricalGcBatchSize,
78			Self::HistoricalGcInterval,
79			Self::CdcTtlDuration,
80			Self::CdcCompactInterval,
81			Self::CdcCompactBlockSize,
82			Self::CdcCompactSafetyLag,
83			Self::CdcCompactMaxBlocksPerTick,
84			Self::CdcCompactBlockCacheCapacity,
85			Self::CdcCompactZstdLevel,
86			Self::CdcRecentCacheCapacity,
87			Self::MultiReadBufferCapacity,
88			Self::FlowTick,
89			Self::ThreadsAsync,
90			Self::ThreadsSystem,
91			Self::ThreadsQuery,
92			Self::ThreadsCommit,
93			Self::ThreadsBackground,
94			Self::RuntimeMetricsInterval,
95			Self::MetricFlushInterval,
96		]
97	}
98
99	pub fn default_value(&self) -> Value {
100		match self {
101			Self::OracleWindowSize => Value::Uint8(500),
102			Self::OracleWaterMark => Value::Uint8(20),
103			Self::QueryRowBatchSize => Value::Uint2(32),
104			Self::RowTtlScanBatchSize => Value::Uint8(10000),
105			Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
106			Self::OperatorTtlScanBatchSize => Value::Uint8(10000),
107			Self::OperatorTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
108			Self::HistoricalGcBatchSize => Value::Uint8(50_000),
109			Self::HistoricalGcInterval => Value::Duration(Duration::from_seconds(30).unwrap()),
110			Self::CdcTtlDuration => Value::None {
111				inner: ValueType::Duration,
112			},
113			Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
114			Self::CdcCompactBlockSize => Value::Uint8(1024),
115			Self::CdcCompactSafetyLag => Value::Uint8(1024),
116			Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
117			Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
118			Self::CdcCompactZstdLevel => Value::Uint1(7),
119			Self::CdcRecentCacheCapacity => Value::Uint8(128),
120			Self::MultiReadBufferCapacity => Value::Uint8(4096),
121			Self::FlowTick => Value::Duration(Duration::from_seconds(1).unwrap()),
122			Self::ThreadsAsync => Value::Uint2(1),
123			Self::ThreadsSystem => Value::Uint2(2),
124			Self::ThreadsQuery => Value::Uint2(1),
125			Self::ThreadsCommit => Value::Uint2(2),
126			Self::ThreadsBackground => Value::Uint2(1),
127			Self::RuntimeMetricsInterval => Value::Duration(Duration::from_seconds(5).unwrap()),
128			Self::MetricFlushInterval => Value::Duration(Duration::from_seconds(10).unwrap()),
129		}
130	}
131
132	pub fn description(&self) -> &'static str {
133		match self {
134			Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
135			Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
136			Self::QueryRowBatchSize => {
137				"Number of rows produced per batch by query / DML pipeline operators."
138			}
139			Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
140			Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
141			Self::OperatorTtlScanBatchSize => {
142				"Max rows to examine per batch during an operator-state TTL scan."
143			}
144			Self::OperatorTtlScanInterval => {
145				"How often the operator-state TTL actor should scan for expired rows."
146			}
147			Self::HistoricalGcBatchSize => {
148				"Max historical (key, version) pairs scanned per shape per historical GC tick."
149			}
150			Self::HistoricalGcInterval => {
151				"How often the historical-version GC actor sweeps __historical for versions older than the oracle read watermark."
152			}
153			Self::CdcTtlDuration => {
154				"Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
155				 when set, must be > 0 and entries older than this duration are evicted regardless \
156				 of consumer state."
157			}
158			Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
159			Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
160			Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
161			Self::CdcCompactMaxBlocksPerTick => {
162				"Upper bound on consecutive blocks produced per actor tick."
163			}
164			Self::CdcCompactBlockCacheCapacity => {
165				"Number of decompressed CDC blocks held in the in-memory LRU cache."
166			}
167			Self::CdcCompactZstdLevel => {
168				"Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
169				 slower compression. Decompression cost is independent of level."
170			}
171			Self::CdcRecentCacheCapacity => {
172				"Number of most-recent decoded CDC entries held in memory so a caught-up consumer \
173				 is served without re-reading and re-deserializing from the backend."
174			}
175			Self::MultiReadBufferCapacity => {
176				"Number of keys held in the multi-version store read buffer so cold reads \
177				 evicted from the commit buffer are served without a persistent-tier lookup."
178			}
179			Self::FlowTick => {
180				"How often the deferred and transactional flow tick coordinators wake up to dispatch \
181				 due flows."
182			}
183			Self::ThreadsAsync => {
184				"Number of worker threads for the async runtime. Must be >= 1. \
185				 Read at boot before the runtime starts; changes require restart."
186			}
187			Self::ThreadsSystem => {
188				"Number of worker threads for the system pool (lightweight actors). \
189				 Must be >= 1. Changes require restart."
190			}
191			Self::ThreadsQuery => {
192				"Number of worker threads for the query pool (execution-heavy actors). \
193				 Must be >= 1. Changes require restart."
194			}
195			Self::ThreadsCommit => {
196				"Number of worker threads for the commit pool (synchronous pre-commit flow execution). \
197				 Must be >= 1. Changes require restart."
198			}
199			Self::ThreadsBackground => {
200				"Number of worker threads for the background pool (non-critical cleanup and metrics actors). \
201				 Must be >= 1. Changes require restart."
202			}
203			Self::RuntimeMetricsInterval => {
204				"How often the runtime-metrics sampler records a memory snapshot into \
205				 system::metrics::runtime::memory::snapshots. When unset, the history sampler is \
206				 dormant and only the live ::current view is available; when set, must be > 0."
207			}
208			Self::MetricFlushInterval => {
209				"How often the metric collector flushes accumulated storage and CDC stats into the \
210				 system::metrics views. Must be > 0."
211			}
212		}
213	}
214
215	pub fn requires_restart(&self) -> bool {
216		match self {
217			Self::OracleWindowSize => false,
218			Self::OracleWaterMark => false,
219			Self::QueryRowBatchSize => false,
220			Self::RowTtlScanBatchSize => false,
221			Self::RowTtlScanInterval => false,
222			Self::OperatorTtlScanBatchSize => false,
223			Self::OperatorTtlScanInterval => false,
224			Self::HistoricalGcBatchSize => false,
225			Self::HistoricalGcInterval => false,
226			Self::CdcTtlDuration => false,
227			Self::CdcCompactInterval => false,
228			Self::CdcCompactBlockSize => false,
229			Self::CdcCompactSafetyLag => false,
230			Self::CdcCompactMaxBlocksPerTick => false,
231			Self::CdcCompactBlockCacheCapacity => true,
232			Self::CdcCompactZstdLevel => false,
233			Self::CdcRecentCacheCapacity => true,
234			Self::MultiReadBufferCapacity => true,
235			Self::FlowTick => false,
236			Self::ThreadsAsync => true,
237			Self::ThreadsSystem => true,
238			Self::ThreadsQuery => true,
239			Self::ThreadsCommit => true,
240			Self::ThreadsBackground => true,
241			Self::RuntimeMetricsInterval => false,
242			Self::MetricFlushInterval => false,
243		}
244	}
245
246	pub fn expected_types(&self) -> &'static [ValueType] {
247		match self {
248			Self::OracleWindowSize => &[ValueType::Uint8],
249			Self::OracleWaterMark => &[ValueType::Uint8],
250			Self::QueryRowBatchSize => &[ValueType::Uint2],
251			Self::RowTtlScanBatchSize => &[ValueType::Uint8],
252			Self::RowTtlScanInterval => &[ValueType::Duration],
253			Self::OperatorTtlScanBatchSize => &[ValueType::Uint8],
254			Self::OperatorTtlScanInterval => &[ValueType::Duration],
255			Self::HistoricalGcBatchSize => &[ValueType::Uint8],
256			Self::HistoricalGcInterval => &[ValueType::Duration],
257			Self::CdcTtlDuration => &[ValueType::Duration],
258			Self::CdcCompactInterval => &[ValueType::Duration],
259			Self::CdcCompactBlockSize => &[ValueType::Uint8],
260			Self::CdcCompactSafetyLag => &[ValueType::Uint8],
261			Self::CdcCompactMaxBlocksPerTick => &[ValueType::Uint8],
262			Self::CdcCompactBlockCacheCapacity => &[ValueType::Uint8],
263			Self::CdcCompactZstdLevel => &[ValueType::Uint1],
264			Self::CdcRecentCacheCapacity => &[ValueType::Uint8],
265			Self::MultiReadBufferCapacity => &[ValueType::Uint8],
266			Self::FlowTick => &[ValueType::Duration],
267			Self::ThreadsAsync => &[ValueType::Uint2],
268			Self::ThreadsSystem => &[ValueType::Uint2],
269			Self::ThreadsQuery => &[ValueType::Uint2],
270			Self::ThreadsCommit => &[ValueType::Uint2],
271			Self::ThreadsBackground => &[ValueType::Uint2],
272			Self::RuntimeMetricsInterval => &[ValueType::Duration],
273			Self::MetricFlushInterval => &[ValueType::Duration],
274		}
275	}
276
277	pub fn is_optional(&self) -> bool {
278		match self {
279			Self::OracleWindowSize => false,
280			Self::OracleWaterMark => false,
281			Self::QueryRowBatchSize => false,
282			Self::RowTtlScanBatchSize => false,
283			Self::RowTtlScanInterval => false,
284			Self::OperatorTtlScanBatchSize => false,
285			Self::OperatorTtlScanInterval => false,
286			Self::HistoricalGcBatchSize => false,
287			Self::HistoricalGcInterval => false,
288			Self::CdcTtlDuration => true,
289			Self::CdcCompactInterval => false,
290			Self::CdcCompactBlockSize => false,
291			Self::CdcCompactSafetyLag => false,
292			Self::CdcCompactMaxBlocksPerTick => false,
293			Self::CdcCompactBlockCacheCapacity => false,
294			Self::CdcCompactZstdLevel => false,
295			Self::CdcRecentCacheCapacity => false,
296			Self::MultiReadBufferCapacity => false,
297			Self::FlowTick => false,
298			Self::ThreadsAsync => false,
299			Self::ThreadsSystem => false,
300			Self::ThreadsQuery => false,
301			Self::ThreadsCommit => false,
302			Self::ThreadsBackground => false,
303			Self::RuntimeMetricsInterval => true,
304			Self::MetricFlushInterval => false,
305		}
306	}
307
308	fn validate_canonical(&self, value: &Value) -> Result<(), String> {
309		match self {
310			Self::CdcTtlDuration => match value {
311				Value::None {
312					..
313				} => Ok(()),
314				Value::Duration(d) => {
315					if d.is_positive() {
316						Ok(())
317					} else {
318						Err("CDC_TTL_DURATION must be greater than zero".to_string())
319					}
320				}
321				_ => Ok(()),
322			},
323			Self::CdcCompactInterval => match value {
324				Value::Duration(d) => {
325					if d.is_positive() {
326						Ok(())
327					} else {
328						Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
329					}
330				}
331				_ => Ok(()),
332			},
333			Self::CdcCompactBlockSize => match value {
334				Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
335				_ => Ok(()),
336			},
337			Self::QueryRowBatchSize => match value {
338				Value::Uint2(0) => Err("QUERY_ROW_BATCH_SIZE must be greater than zero".to_string()),
339				_ => Ok(()),
340			},
341			Self::CdcCompactBlockCacheCapacity => match value {
342				Value::Uint8(0) => {
343					Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
344				}
345				_ => Ok(()),
346			},
347			Self::MultiReadBufferCapacity => match value {
348				Value::Uint8(0) => {
349					Err("MULTI_READ_BUFFER_CAPACITY must be greater than zero".to_string())
350				}
351				_ => Ok(()),
352			},
353			Self::CdcCompactZstdLevel => match value {
354				Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
355				Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
356				_ => Ok(()),
357			},
358			Self::HistoricalGcBatchSize => match value {
359				Value::Uint8(0) => {
360					Err("HISTORICAL_GC_BATCH_SIZE must be greater than zero".to_string())
361				}
362				_ => Ok(()),
363			},
364			Self::HistoricalGcInterval => match value {
365				Value::Duration(d) => {
366					if d.is_positive() {
367						Ok(())
368					} else {
369						Err("HISTORICAL_GC_INTERVAL must be greater than zero".to_string())
370					}
371				}
372				_ => Ok(()),
373			},
374			Self::FlowTick => match value {
375				Value::Duration(d) => {
376					if d.is_positive() {
377						Ok(())
378					} else {
379						Err("FLOW_TICK must be greater than zero".to_string())
380					}
381				}
382				_ => Ok(()),
383			},
384			Self::ThreadsAsync => match value {
385				Value::Uint2(0) => Err("THREADS_ASYNC must be greater than zero".to_string()),
386				_ => Ok(()),
387			},
388			Self::ThreadsSystem => match value {
389				Value::Uint2(0) => Err("THREADS_SYSTEM must be greater than zero".to_string()),
390				_ => Ok(()),
391			},
392			Self::ThreadsQuery => match value {
393				Value::Uint2(0) => Err("THREADS_QUERY must be greater than zero".to_string()),
394				_ => Ok(()),
395			},
396			Self::ThreadsCommit => match value {
397				Value::Uint2(0) => Err("THREADS_COMMIT must be greater than zero".to_string()),
398				_ => Ok(()),
399			},
400			Self::ThreadsBackground => match value {
401				Value::Uint2(0) => Err("THREADS_BACKGROUND must be greater than zero".to_string()),
402				_ => Ok(()),
403			},
404			Self::RuntimeMetricsInterval => match value {
405				Value::None {
406					..
407				} => Ok(()),
408				Value::Duration(d) => {
409					if d.is_positive() {
410						Ok(())
411					} else {
412						Err("RUNTIME_METRICS_INTERVAL must be greater than zero".to_string())
413					}
414				}
415				_ => Ok(()),
416			},
417			Self::MetricFlushInterval => match value {
418				Value::Duration(d) => {
419					if d.is_positive() {
420						Ok(())
421					} else {
422						Err("METRIC_FLUSH_INTERVAL must be greater than zero".to_string())
423					}
424				}
425				_ => Ok(()),
426			},
427			_ => Ok(()),
428		}
429	}
430
431	pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
432		if let Value::None {
433			inner,
434		} = &value
435		{
436			if self.is_optional() && self.expected_types().contains(inner) {
437				return Ok(value);
438			}
439			return Err(AcceptError::TypeMismatch {
440				expected: self.expected_types().to_vec(),
441				actual: value.get_type(),
442			});
443		}
444
445		let canonical = if self.expected_types().contains(&value.get_type()) {
446			value
447		} else {
448			try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
449				expected: self.expected_types().to_vec(),
450				actual: value.get_type(),
451			})?
452		};
453
454		self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
455		Ok(canonical)
456	}
457}
458
459fn try_coerce_numeric(value: &Value, expected: &[ValueType]) -> Option<Value> {
460	for target in expected {
461		let coerced = match target {
462			ValueType::Uint1 => {
463				value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
464			}
465			ValueType::Uint2 => {
466				value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
467			}
468			ValueType::Uint4 => {
469				value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
470			}
471			ValueType::Uint8 => {
472				value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
473			}
474			ValueType::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
475			ValueType::Int1 => {
476				value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8))
477			}
478			ValueType::Int2 => {
479				value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
480			}
481			ValueType::Int4 => {
482				value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
483			}
484			ValueType::Int8 => {
485				value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
486			}
487			ValueType::Int16 => {
488				value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
489			}
490			ValueType::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
491			ValueType::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
492			ValueType::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
493			ValueType::Float4 => {
494				value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
495			}
496			ValueType::Float8 => {
497				value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
498			}
499			ValueType::Duration => value
500				.to_usize()
501				.and_then(|v| Duration::from_seconds(v as i64).ok())
502				.map(Value::Duration),
503			_ => None,
504		};
505		if coerced.is_some() {
506			return coerced;
507		}
508	}
509	None
510}
511
512impl fmt::Display for ConfigKey {
513	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
514		match self {
515			Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
516			Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
517			Self::QueryRowBatchSize => write!(f, "QUERY_ROW_BATCH_SIZE"),
518			Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
519			Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
520			Self::OperatorTtlScanBatchSize => write!(f, "OPERATOR_TTL_SCAN_BATCH_SIZE"),
521			Self::OperatorTtlScanInterval => write!(f, "OPERATOR_TTL_SCAN_INTERVAL"),
522			Self::HistoricalGcBatchSize => write!(f, "HISTORICAL_GC_BATCH_SIZE"),
523			Self::HistoricalGcInterval => write!(f, "HISTORICAL_GC_INTERVAL"),
524			Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
525			Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
526			Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
527			Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
528			Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
529			Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
530			Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
531			Self::CdcRecentCacheCapacity => write!(f, "CDC_RECENT_CACHE_CAPACITY"),
532			Self::MultiReadBufferCapacity => write!(f, "MULTI_READ_BUFFER_CAPACITY"),
533			Self::FlowTick => write!(f, "FLOW_TICK"),
534			Self::ThreadsAsync => write!(f, "THREADS_ASYNC"),
535			Self::ThreadsSystem => write!(f, "THREADS_SYSTEM"),
536			Self::ThreadsQuery => write!(f, "THREADS_QUERY"),
537			Self::ThreadsCommit => write!(f, "THREADS_COMMIT"),
538			Self::ThreadsBackground => write!(f, "THREADS_BACKGROUND"),
539			Self::RuntimeMetricsInterval => write!(f, "RUNTIME_METRICS_INTERVAL"),
540			Self::MetricFlushInterval => write!(f, "METRIC_FLUSH_INTERVAL"),
541		}
542	}
543}
544
545impl FromStr for ConfigKey {
546	type Err = String;
547
548	fn from_str(s: &str) -> Result<Self, Self::Err> {
549		match s {
550			"ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
551			"ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
552			"QUERY_ROW_BATCH_SIZE" => Ok(Self::QueryRowBatchSize),
553			"ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
554			"ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
555			"OPERATOR_TTL_SCAN_BATCH_SIZE" => Ok(Self::OperatorTtlScanBatchSize),
556			"OPERATOR_TTL_SCAN_INTERVAL" => Ok(Self::OperatorTtlScanInterval),
557			"HISTORICAL_GC_BATCH_SIZE" => Ok(Self::HistoricalGcBatchSize),
558			"HISTORICAL_GC_INTERVAL" => Ok(Self::HistoricalGcInterval),
559			"CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
560			"CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
561			"CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
562			"CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
563			"CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
564			"CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
565			"CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
566			"CDC_RECENT_CACHE_CAPACITY" => Ok(Self::CdcRecentCacheCapacity),
567			"MULTI_READ_BUFFER_CAPACITY" => Ok(Self::MultiReadBufferCapacity),
568			"FLOW_TICK" => Ok(Self::FlowTick),
569			"THREADS_ASYNC" => Ok(Self::ThreadsAsync),
570			"THREADS_SYSTEM" => Ok(Self::ThreadsSystem),
571			"THREADS_QUERY" => Ok(Self::ThreadsQuery),
572			"THREADS_COMMIT" => Ok(Self::ThreadsCommit),
573			"THREADS_BACKGROUND" => Ok(Self::ThreadsBackground),
574			"RUNTIME_METRICS_INTERVAL" => Ok(Self::RuntimeMetricsInterval),
575			"METRIC_FLUSH_INTERVAL" => Ok(Self::MetricFlushInterval),
576			_ => Err(format!("Unknown system configuration key: {}", s)),
577		}
578	}
579}
580
/// One configuration entry: a [`ConfigKey`] together with a value and the key's metadata.
#[derive(Debug, Clone)]
pub struct Config {
	/// The setting this entry describes.
	pub key: ConfigKey,

	/// Value held for the key.
	/// NOTE(review): presumably the currently effective value — confirm where `Config` is built.
	pub value: Value,

	/// Default for the key; presumably mirrors [`ConfigKey::default_value`] — confirm.
	pub default_value: Value,

	/// Help text for the key; presumably mirrors [`ConfigKey::description`] — confirm.
	pub description: &'static str,

	/// Restart flag for the key; presumably mirrors [`ConfigKey::requires_restart`] — confirm.
	pub requires_restart: bool,
}
593
594pub trait GetConfig: Send + Sync {
595	fn get_config(&self, key: ConfigKey) -> Value;
596
597	fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
598
599	fn get_config_uint8(&self, key: ConfigKey) -> u64 {
600		let val = self.get_config(key);
601		match val {
602			Value::Uint8(v) => v,
603			v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
604		}
605	}
606
607	fn get_config_uint1(&self, key: ConfigKey) -> u8 {
608		let val = self.get_config(key);
609		match val {
610			Value::Uint1(v) => v,
611			v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
612		}
613	}
614
615	fn get_config_uint2(&self, key: ConfigKey) -> u16 {
616		let val = self.get_config(key);
617		match val {
618			Value::Uint2(v) => v,
619			v => panic!("config key '{}' expected Uint2, got {:?}", key, v),
620		}
621	}
622
623	fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
624		let val = self.get_config(key);
625		match val {
626			Value::Duration(v) => {
627				let total_nanos =
628					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
629				StdDuration::from_nanos(total_nanos.max(0) as u64)
630			}
631			v => panic!("config key '{}' expected Duration, got {:?}", key, v),
632		}
633	}
634
635	fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
636		match self.get_config(key) {
637			Value::None {
638				..
639			} => None,
640			Value::Duration(v) => {
641				let total_nanos =
642					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
643				Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
644			}
645			v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
646		}
647	}
648}
649
650#[cfg(test)]
651mod tests {
652	use super::*;
653
654	#[test]
655	fn test_cdc_ttl_default_is_typed_null() {
656		// Defaulting to Value::None means "TTL not configured" - producer skips cleanup.
657		let default = ConfigKey::CdcTtlDuration.default_value();
658		assert!(matches!(
659			default,
660			Value::None {
661				inner: ValueType::Duration
662			}
663		));
664	}
665
666	#[test]
667	fn test_cdc_ttl_accept_passes_typed_null() {
668		let none = Value::None {
669			inner: ValueType::Duration,
670		};
671		let v = ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap();
672		assert_eq!(v, none);
673	}
674
675	#[test]
676	fn test_cdc_ttl_accept_passes_positive_duration() {
677		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
678		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_sec.clone()).unwrap(), one_sec);
679
680		let one_hour = Value::Duration(Duration::from_seconds(3600).unwrap());
681		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_hour.clone()).unwrap(), one_hour);
682	}
683
684	#[test]
685	fn test_cdc_ttl_accept_rejects_zero() {
686		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
687		match ConfigKey::CdcTtlDuration.accept(zero).unwrap_err() {
688			AcceptError::InvalidValue(reason) => {
689				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
690			}
691			other => panic!("expected InvalidValue, got {other:?}"),
692		}
693	}
694
695	#[test]
696	fn test_cdc_ttl_accept_rejects_negative() {
697		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
698		assert!(matches!(ConfigKey::CdcTtlDuration.accept(negative), Err(AcceptError::InvalidValue(_))));
699	}
700
701	#[test]
702	fn test_other_keys_accept_in_type_values() {
703		// Keys without bespoke validation should accept any in-type value.
704		assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
705		assert!(ConfigKey::RowTtlScanInterval
706			.accept(Value::Duration(Duration::from_seconds(0).unwrap()))
707			.is_ok());
708	}
709
710	#[test]
711	fn test_cdc_ttl_round_trips_through_display_and_from_str() {
712		let key: ConfigKey = "CDC_TTL_DURATION".parse().unwrap();
713		assert_eq!(key, ConfigKey::CdcTtlDuration);
714		assert_eq!(format!("{}", ConfigKey::CdcTtlDuration), "CDC_TTL_DURATION");
715	}
716
717	#[test]
718	fn test_cdc_ttl_in_all() {
719		assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
720	}
721
722	#[test]
723	fn test_all_contains_every_compact_key_and_has_expected_len() {
724		let all = ConfigKey::all();
725		assert_eq!(all.len(), 26);
726		assert!(all.contains(&ConfigKey::CdcCompactInterval));
727		assert!(all.contains(&ConfigKey::CdcCompactBlockSize));
728		assert!(all.contains(&ConfigKey::CdcCompactSafetyLag));
729		assert!(all.contains(&ConfigKey::CdcCompactMaxBlocksPerTick));
730		assert!(all.contains(&ConfigKey::CdcCompactBlockCacheCapacity));
731		assert!(all.contains(&ConfigKey::CdcCompactZstdLevel));
732		assert!(all.contains(&ConfigKey::CdcRecentCacheCapacity));
733		assert!(all.contains(&ConfigKey::MultiReadBufferCapacity));
734		assert!(all.contains(&ConfigKey::QueryRowBatchSize));
735		assert!(all.contains(&ConfigKey::ThreadsAsync));
736		assert!(all.contains(&ConfigKey::ThreadsSystem));
737		assert!(all.contains(&ConfigKey::ThreadsQuery));
738		assert!(all.contains(&ConfigKey::ThreadsCommit));
739		assert!(all.contains(&ConfigKey::ThreadsBackground));
740		assert!(all.contains(&ConfigKey::RuntimeMetricsInterval));
741		assert!(all.contains(&ConfigKey::MetricFlushInterval));
742	}
743
744	#[test]
745	fn test_runtime_metrics_interval_metadata() {
746		// Single optional Duration knob: default on (5s), none disables the history sampler.
747		assert_eq!(
748			ConfigKey::RuntimeMetricsInterval.default_value(),
749			Value::Duration(Duration::from_seconds(5).unwrap())
750		);
751		assert_eq!(ConfigKey::RuntimeMetricsInterval.expected_types(), &[ValueType::Duration]);
752		assert!(ConfigKey::RuntimeMetricsInterval.is_optional());
753	}
754
755	#[test]
756	fn test_runtime_metrics_interval_round_trip() {
757		assert_eq!("RUNTIME_METRICS_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::RuntimeMetricsInterval);
758		assert_eq!(format!("{}", ConfigKey::RuntimeMetricsInterval), "RUNTIME_METRICS_INTERVAL");
759	}
760
761	#[test]
762	fn test_runtime_metrics_interval_accepts_none_and_positive_rejects_zero() {
763		let none = Value::None {
764			inner: ValueType::Duration,
765		};
766		assert_eq!(ConfigKey::RuntimeMetricsInterval.accept(none.clone()).unwrap(), none);
767
768		let five = Value::Duration(Duration::from_seconds(5).unwrap());
769		assert_eq!(ConfigKey::RuntimeMetricsInterval.accept(five.clone()).unwrap(), five);
770
771		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
772		assert!(matches!(ConfigKey::RuntimeMetricsInterval.accept(zero), Err(AcceptError::InvalidValue(_))));
773	}
774
775	#[test]
776	fn test_metric_flush_interval_metadata() {
777		// Always-on (non-optional) Duration knob defaulting to the historical 10s flush cadence.
778		assert_eq!(
779			ConfigKey::MetricFlushInterval.default_value(),
780			Value::Duration(Duration::from_seconds(10).unwrap())
781		);
782		assert_eq!(ConfigKey::MetricFlushInterval.expected_types(), &[ValueType::Duration]);
783		assert!(!ConfigKey::MetricFlushInterval.is_optional());
784		assert!(!ConfigKey::MetricFlushInterval.requires_restart());
785	}
786
787	#[test]
788	fn test_metric_flush_interval_round_trip() {
789		assert_eq!("METRIC_FLUSH_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::MetricFlushInterval);
790		assert_eq!(format!("{}", ConfigKey::MetricFlushInterval), "METRIC_FLUSH_INTERVAL");
791	}
792
793	#[test]
794	fn test_metric_flush_interval_accepts_positive_rejects_zero() {
795		let ten = Value::Duration(Duration::from_seconds(10).unwrap());
796		assert_eq!(ConfigKey::MetricFlushInterval.accept(ten.clone()).unwrap(), ten);
797
798		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
799		assert!(matches!(ConfigKey::MetricFlushInterval.accept(zero), Err(AcceptError::InvalidValue(_))));
800	}
801
802	#[test]
803	fn test_cdc_recent_cache_capacity_round_trip() {
804		assert_eq!(
805			"CDC_RECENT_CACHE_CAPACITY".parse::<ConfigKey>().unwrap(),
806			ConfigKey::CdcRecentCacheCapacity
807		);
808		assert_eq!(format!("{}", ConfigKey::CdcRecentCacheCapacity), "CDC_RECENT_CACHE_CAPACITY");
809	}
810
811	#[test]
812	fn test_cdc_recent_cache_capacity_metadata() {
813		assert_eq!(ConfigKey::CdcRecentCacheCapacity.default_value(), Value::Uint8(128));
814		assert_eq!(ConfigKey::CdcRecentCacheCapacity.expected_types(), &[ValueType::Uint8]);
815		assert!(ConfigKey::CdcRecentCacheCapacity.requires_restart());
816		assert!(!ConfigKey::CdcRecentCacheCapacity.is_optional());
817	}
818
819	#[test]
820	fn test_multi_read_cache_capacity_round_trip() {
821		assert_eq!(
822			"MULTI_READ_BUFFER_CAPACITY".parse::<ConfigKey>().unwrap(),
823			ConfigKey::MultiReadBufferCapacity
824		);
825		assert_eq!(format!("{}", ConfigKey::MultiReadBufferCapacity), "MULTI_READ_BUFFER_CAPACITY");
826	}
827
828	#[test]
829	fn test_multi_read_cache_capacity_metadata_and_rejects_zero() {
830		assert_eq!(ConfigKey::MultiReadBufferCapacity.default_value(), Value::Uint8(4096));
831		assert_eq!(ConfigKey::MultiReadBufferCapacity.expected_types(), &[ValueType::Uint8]);
832		assert!(ConfigKey::MultiReadBufferCapacity.requires_restart());
833		assert!(!ConfigKey::MultiReadBufferCapacity.is_optional());
834		match ConfigKey::MultiReadBufferCapacity.accept(Value::Uint8(0)).unwrap_err() {
835			AcceptError::InvalidValue(reason) => {
836				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
837			}
838			other => panic!("expected InvalidValue, got {other:?}"),
839		}
840	}
841
842	#[test]
843	fn test_threads_keys_round_trip() {
844		assert_eq!("THREADS_ASYNC".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsAsync);
845		assert_eq!("THREADS_SYSTEM".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsSystem);
846		assert_eq!("THREADS_QUERY".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsQuery);
847		assert_eq!("THREADS_COMMIT".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsCommit);
848		assert_eq!("THREADS_BACKGROUND".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsBackground);
849		assert_eq!(format!("{}", ConfigKey::ThreadsAsync), "THREADS_ASYNC");
850		assert_eq!(format!("{}", ConfigKey::ThreadsSystem), "THREADS_SYSTEM");
851		assert_eq!(format!("{}", ConfigKey::ThreadsQuery), "THREADS_QUERY");
852		assert_eq!(format!("{}", ConfigKey::ThreadsCommit), "THREADS_COMMIT");
853		assert_eq!(format!("{}", ConfigKey::ThreadsBackground), "THREADS_BACKGROUND");
854	}
855
856	#[test]
857	fn test_threads_defaults() {
858		assert_eq!(ConfigKey::ThreadsAsync.default_value(), Value::Uint2(1));
859		assert_eq!(ConfigKey::ThreadsSystem.default_value(), Value::Uint2(2));
860		assert_eq!(ConfigKey::ThreadsQuery.default_value(), Value::Uint2(1));
861		assert_eq!(ConfigKey::ThreadsCommit.default_value(), Value::Uint2(2));
862		assert_eq!(ConfigKey::ThreadsBackground.default_value(), Value::Uint2(1));
863	}
864
865	#[test]
866	fn test_threads_reject_zero() {
867		for key in [
868			ConfigKey::ThreadsAsync,
869			ConfigKey::ThreadsSystem,
870			ConfigKey::ThreadsQuery,
871			ConfigKey::ThreadsCommit,
872			ConfigKey::ThreadsBackground,
873		] {
874			match key.accept(Value::Uint2(0)).unwrap_err() {
875				AcceptError::InvalidValue(reason) => {
876					assert!(
877						reason.contains("greater than zero"),
878						"{key}: unexpected reason: {reason}"
879					);
880				}
881				other => panic!("{key}: expected InvalidValue, got {other:?}"),
882			}
883		}
884	}
885
886	#[test]
887	fn test_threads_accept_positive() {
888		assert_eq!(ConfigKey::ThreadsAsync.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
889		assert_eq!(ConfigKey::ThreadsSystem.accept(Value::Uint2(8)).unwrap(), Value::Uint2(8));
890		assert_eq!(ConfigKey::ThreadsQuery.accept(Value::Uint2(16)).unwrap(), Value::Uint2(16));
891		assert_eq!(ConfigKey::ThreadsCommit.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
892		assert_eq!(ConfigKey::ThreadsBackground.accept(Value::Uint2(2)).unwrap(), Value::Uint2(2));
893	}
894
895	#[test]
896	fn test_threads_coerce_int4_to_uint2() {
897		let v = ConfigKey::ThreadsQuery.accept(Value::Int4(8)).unwrap();
898		assert_eq!(v, Value::Uint2(8));
899	}
900
901	#[test]
902	fn test_threads_require_restart() {
903		assert!(ConfigKey::ThreadsAsync.requires_restart());
904		assert!(ConfigKey::ThreadsSystem.requires_restart());
905		assert!(ConfigKey::ThreadsQuery.requires_restart());
906		assert!(ConfigKey::ThreadsCommit.requires_restart());
907		assert!(ConfigKey::ThreadsBackground.requires_restart());
908	}
909
910	#[test]
911	fn test_query_row_batch_size_default_is_uint2_32() {
912		assert_eq!(ConfigKey::QueryRowBatchSize.default_value(), Value::Uint2(32));
913	}
914
915	#[test]
916	fn test_query_row_batch_size_round_trips_through_display_and_from_str() {
917		let key: ConfigKey = "QUERY_ROW_BATCH_SIZE".parse().unwrap();
918		assert_eq!(key, ConfigKey::QueryRowBatchSize);
919		assert_eq!(format!("{}", ConfigKey::QueryRowBatchSize), "QUERY_ROW_BATCH_SIZE");
920	}
921
922	#[test]
923	fn test_query_row_batch_size_accept_rejects_zero() {
924		match ConfigKey::QueryRowBatchSize.accept(Value::Uint2(0)).unwrap_err() {
925			AcceptError::InvalidValue(reason) => {
926				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
927			}
928			other => panic!("expected InvalidValue, got {other:?}"),
929		}
930	}
931
932	#[test]
933	fn test_query_row_batch_size_accept_passes_positive() {
934		assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1)).unwrap(), Value::Uint2(1));
935		assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1024)).unwrap(), Value::Uint2(1024));
936	}
937
938	#[test]
939	fn test_query_row_batch_size_accept_rejects_zero_after_coercion() {
940		match ConfigKey::QueryRowBatchSize.accept(Value::Int4(0)).unwrap_err() {
941			AcceptError::InvalidValue(reason) => {
942				assert!(reason.contains("greater than zero"));
943			}
944			other => panic!("expected InvalidValue, got {other:?}"),
945		}
946	}
947
948	#[test]
949	fn test_query_row_batch_size_coerces_int4_to_uint2() {
950		let v = ConfigKey::QueryRowBatchSize.accept(Value::Int4(64)).unwrap();
951		assert_eq!(v, Value::Uint2(64));
952	}
953
954	#[test]
955	fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
956		let key: ConfigKey = "CDC_COMPACT_INTERVAL".parse().unwrap();
957		assert_eq!(key, ConfigKey::CdcCompactInterval);
958		assert_eq!(format!("{}", ConfigKey::CdcCompactInterval), "CDC_COMPACT_INTERVAL");
959	}
960
961	#[test]
962	fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
963		let key: ConfigKey = "CDC_COMPACT_BLOCK_SIZE".parse().unwrap();
964		assert_eq!(key, ConfigKey::CdcCompactBlockSize);
965		assert_eq!(format!("{}", ConfigKey::CdcCompactBlockSize), "CDC_COMPACT_BLOCK_SIZE");
966	}
967
968	#[test]
969	fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
970		let key: ConfigKey = "CDC_COMPACT_SAFETY_LAG".parse().unwrap();
971		assert_eq!(key, ConfigKey::CdcCompactSafetyLag);
972		assert_eq!(format!("{}", ConfigKey::CdcCompactSafetyLag), "CDC_COMPACT_SAFETY_LAG");
973	}
974
975	#[test]
976	fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
977		let key: ConfigKey = "CDC_COMPACT_MAX_BLOCKS_PER_TICK".parse().unwrap();
978		assert_eq!(key, ConfigKey::CdcCompactMaxBlocksPerTick);
979		assert_eq!(format!("{}", ConfigKey::CdcCompactMaxBlocksPerTick), "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
980	}
981
982	#[test]
983	fn test_cdc_compact_interval_default_is_duration() {
984		assert!(matches!(ConfigKey::CdcCompactInterval.default_value(), Value::Duration(_)));
985	}
986
987	#[test]
988	fn test_cdc_compact_block_size_default_is_uint8_1024() {
989		assert_eq!(ConfigKey::CdcCompactBlockSize.default_value(), Value::Uint8(1024));
990	}
991
992	#[test]
993	fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
994		assert_eq!(ConfigKey::CdcCompactSafetyLag.default_value(), Value::Uint8(1024));
995	}
996
997	#[test]
998	fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
999		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.default_value(), Value::Uint8(16));
1000	}
1001
1002	#[test]
1003	fn test_cdc_compact_interval_accept_passes_positive_duration() {
1004		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
1005		assert_eq!(ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap(), one_sec);
1006	}
1007
1008	#[test]
1009	fn test_cdc_compact_interval_accept_rejects_zero() {
1010		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
1011		match ConfigKey::CdcCompactInterval.accept(zero).unwrap_err() {
1012			AcceptError::InvalidValue(reason) => {
1013				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1014			}
1015			other => panic!("expected InvalidValue, got {other:?}"),
1016		}
1017	}
1018
1019	#[test]
1020	fn test_cdc_compact_interval_accept_rejects_negative() {
1021		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
1022		assert!(matches!(ConfigKey::CdcCompactInterval.accept(negative), Err(AcceptError::InvalidValue(_))));
1023	}
1024
1025	#[test]
1026	fn test_cdc_compact_block_size_accept_rejects_zero() {
1027		match ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)).unwrap_err() {
1028			AcceptError::InvalidValue(reason) => {
1029				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1030			}
1031			other => panic!("expected InvalidValue, got {other:?}"),
1032		}
1033	}
1034
1035	#[test]
1036	fn test_cdc_compact_block_size_accept_passes_positive() {
1037		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1)).unwrap(), Value::Uint8(1));
1038		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1024)).unwrap(), Value::Uint8(1024));
1039	}
1040
1041	#[test]
1042	fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
1043		assert_eq!(ConfigKey::CdcCompactSafetyLag.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
1044		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
1045	}
1046
1047	#[test]
1048	fn test_accept_coerces_int4_to_uint8_for_block_size() {
1049		// SET CONFIG CDC_COMPACT_BLOCK_SIZE = 1024 (parsed as Int4) becomes Uint8(1024).
1050		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
1051		assert_eq!(v, Value::Uint8(1024));
1052	}
1053
1054	#[test]
1055	fn test_accept_coerces_int8_to_uint8_for_block_size() {
1056		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
1057		assert_eq!(v, Value::Uint8(2048));
1058	}
1059
1060	#[test]
1061	fn test_accept_rejects_zero_after_coercion() {
1062		// Int4(0) coerces to Uint8(0), then validate_canonical rejects it.
1063		match ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err() {
1064			AcceptError::InvalidValue(reason) => {
1065				assert!(reason.contains("greater than zero"));
1066			}
1067			other => panic!("expected InvalidValue, got {other:?}"),
1068		}
1069	}
1070
1071	#[test]
1072	fn test_accept_rejects_negative_int_for_uint8_key() {
1073		// to_usize() returns None for negatives -> all coercion arms fail -> TypeMismatch.
1074		assert!(matches!(
1075			ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1)),
1076			Err(AcceptError::TypeMismatch { .. })
1077		));
1078	}
1079
1080	#[test]
1081	fn test_accept_coerces_int_to_duration_via_seconds() {
1082		// SET CONFIG CDC_COMPACT_INTERVAL = 60 (Int4) -> Duration(60s).
1083		let v = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
1084		assert!(matches!(v, Value::Duration(_)));
1085	}
1086
1087	#[test]
1088	fn test_accept_idempotent_on_canonical_uint8() {
1089		let canonical = Value::Uint8(42);
1090		assert_eq!(ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap(), canonical);
1091	}
1092
1093	#[test]
1094	fn test_accept_idempotent_on_canonical_duration() {
1095		let canonical = Value::Duration(Duration::from_seconds(5).unwrap());
1096		assert_eq!(ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap(), canonical);
1097	}
1098
1099	#[test]
1100	fn test_accept_rejects_typed_null_for_non_optional_key() {
1101		let err = ConfigKey::CdcCompactBlockSize
1102			.accept(Value::None {
1103				inner: ValueType::Uint8,
1104			})
1105			.unwrap_err();
1106		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
1107	}
1108
1109	#[test]
1110	fn test_accept_passes_typed_null_for_optional_key() {
1111		let none = Value::None {
1112			inner: ValueType::Duration,
1113		};
1114		assert_eq!(ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap(), none);
1115	}
1116
1117	#[test]
1118	fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
1119		// Optional key still rejects typed-null whose inner doesn't match expected_types.
1120		let err = ConfigKey::CdcTtlDuration
1121			.accept(Value::None {
1122				inner: ValueType::Uint8,
1123			})
1124			.unwrap_err();
1125		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
1126	}
1127
1128	#[test]
1129	fn test_historical_gc_keys_round_trip() {
1130		assert_eq!("HISTORICAL_GC_BATCH_SIZE".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcBatchSize);
1131		assert_eq!("HISTORICAL_GC_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcInterval);
1132		assert_eq!(format!("{}", ConfigKey::HistoricalGcBatchSize), "HISTORICAL_GC_BATCH_SIZE");
1133		assert_eq!(format!("{}", ConfigKey::HistoricalGcInterval), "HISTORICAL_GC_INTERVAL");
1134	}
1135
1136	#[test]
1137	fn test_historical_gc_defaults() {
1138		assert_eq!(ConfigKey::HistoricalGcBatchSize.default_value(), Value::Uint8(50_000));
1139		assert!(matches!(ConfigKey::HistoricalGcInterval.default_value(), Value::Duration(_)));
1140	}
1141
1142	#[test]
1143	fn test_historical_gc_batch_size_rejects_zero() {
1144		match ConfigKey::HistoricalGcBatchSize.accept(Value::Uint8(0)).unwrap_err() {
1145			AcceptError::InvalidValue(reason) => {
1146				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1147			}
1148			other => panic!("expected InvalidValue, got {other:?}"),
1149		}
1150	}
1151
1152	#[test]
1153	fn test_historical_gc_interval_rejects_zero() {
1154		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
1155		match ConfigKey::HistoricalGcInterval.accept(zero).unwrap_err() {
1156			AcceptError::InvalidValue(reason) => {
1157				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
1158			}
1159			other => panic!("expected InvalidValue, got {other:?}"),
1160		}
1161	}
1162}