Skip to main content

reifydb_core/interface/catalog/
config.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_type::value::{
7	Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8	r#type::Type, uint::Uint,
9};
10
11use crate::common::CommitVersion;
12
13/// Error returned by `ConfigKey::accept`. Callers map this to their domain error.
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum AcceptError {
16	/// The value's Type is not in `expected_types()` and no lossless coercion succeeded.
17	TypeMismatch {
18		expected: Vec<Type>,
19		actual: Type,
20	},
21	/// Coercion succeeded (or wasn't needed) but the canonical value violated
22	/// the key's domain rules (e.g., zero where positive is required).
23	InvalidValue(String),
24}
25
26impl fmt::Display for AcceptError {
27	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28		match self {
29			Self::TypeMismatch {
30				expected,
31				actual,
32			} => {
33				write!(f, "expected one of {:?}, got {:?}", expected, actual)
34			}
35			Self::InvalidValue(reason) => write!(f, "{reason}"),
36		}
37	}
38}
39
40#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
41pub enum ConfigKey {
42	OracleWindowSize,
43	OracleWaterMark,
44	RowTtlScanBatchSize,
45	RowTtlScanInterval,
46	CdcTtlDuration,
47	CdcCompactInterval,
48	CdcCompactBlockSize,
49	CdcCompactSafetyLag,
50	CdcCompactMaxBlocksPerTick,
51	CdcCompactBlockCacheCapacity,
52	CdcCompactZstdLevel,
53}
54
55impl ConfigKey {
56	pub fn all() -> &'static [Self] {
57		&[
58			Self::OracleWindowSize,
59			Self::OracleWaterMark,
60			Self::RowTtlScanBatchSize,
61			Self::RowTtlScanInterval,
62			Self::CdcTtlDuration,
63			Self::CdcCompactInterval,
64			Self::CdcCompactBlockSize,
65			Self::CdcCompactSafetyLag,
66			Self::CdcCompactMaxBlocksPerTick,
67			Self::CdcCompactBlockCacheCapacity,
68			Self::CdcCompactZstdLevel,
69		]
70	}
71
72	pub fn default_value(&self) -> Value {
73		match self {
74			Self::OracleWindowSize => Value::Uint8(500),
75			Self::OracleWaterMark => Value::Uint8(20),
76			Self::RowTtlScanBatchSize => Value::Uint8(10000),
77			Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
78			Self::CdcTtlDuration => Value::None {
79				inner: Type::Duration,
80			},
81			Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
82			Self::CdcCompactBlockSize => Value::Uint8(1024),
83			Self::CdcCompactSafetyLag => Value::Uint8(1024),
84			Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
85			Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
86			Self::CdcCompactZstdLevel => Value::Uint1(7),
87		}
88	}
89
90	pub fn description(&self) -> &'static str {
91		match self {
92			Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
93			Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
94			Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
95			Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
96			Self::CdcTtlDuration => {
97				"Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
98				 when set, must be > 0 and entries older than this duration are evicted regardless \
99				 of consumer state."
100			}
101			Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
102			Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
103			Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
104			Self::CdcCompactMaxBlocksPerTick => {
105				"Upper bound on consecutive blocks produced per actor tick."
106			}
107			Self::CdcCompactBlockCacheCapacity => {
108				"Number of decompressed CDC blocks held in the in-memory LRU cache."
109			}
110			Self::CdcCompactZstdLevel => {
111				"Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
112				 slower compression. Decompression cost is independent of level."
113			}
114		}
115	}
116
117	pub fn requires_restart(&self) -> bool {
118		match self {
119			Self::OracleWindowSize => false,
120			Self::OracleWaterMark => false,
121			Self::RowTtlScanBatchSize => false,
122			Self::RowTtlScanInterval => false,
123			Self::CdcTtlDuration => false,
124			Self::CdcCompactInterval => false,
125			Self::CdcCompactBlockSize => false,
126			Self::CdcCompactSafetyLag => false,
127			Self::CdcCompactMaxBlocksPerTick => false,
128			Self::CdcCompactBlockCacheCapacity => true,
129			Self::CdcCompactZstdLevel => false,
130		}
131	}
132
133	pub fn expected_types(&self) -> &'static [Type] {
134		match self {
135			Self::OracleWindowSize => &[Type::Uint8],
136			Self::OracleWaterMark => &[Type::Uint8],
137			Self::RowTtlScanBatchSize => &[Type::Uint8],
138			Self::RowTtlScanInterval => &[Type::Duration],
139			Self::CdcTtlDuration => &[Type::Duration],
140			Self::CdcCompactInterval => &[Type::Duration],
141			Self::CdcCompactBlockSize => &[Type::Uint8],
142			Self::CdcCompactSafetyLag => &[Type::Uint8],
143			Self::CdcCompactMaxBlocksPerTick => &[Type::Uint8],
144			Self::CdcCompactBlockCacheCapacity => &[Type::Uint8],
145			Self::CdcCompactZstdLevel => &[Type::Uint1],
146		}
147	}
148
149	/// Whether this key may be unset to a typed-null `Value::None`.
150	///
151	/// Optional keys treat `Value::None { inner }` as valid as long as `inner` matches
152	/// `expected_types`. Non-optional keys reject any `Value::None`.
153	pub fn is_optional(&self) -> bool {
154		match self {
155			Self::OracleWindowSize => false,
156			Self::OracleWaterMark => false,
157			Self::RowTtlScanBatchSize => false,
158			Self::RowTtlScanInterval => false,
159			Self::CdcTtlDuration => true,
160			Self::CdcCompactInterval => false,
161			Self::CdcCompactBlockSize => false,
162			Self::CdcCompactSafetyLag => false,
163			Self::CdcCompactMaxBlocksPerTick => false,
164			Self::CdcCompactBlockCacheCapacity => false,
165			Self::CdcCompactZstdLevel => false,
166		}
167	}
168
169	/// Domain-rule check that assumes the value is already in canonical form
170	/// (its Type is in `expected_types()`). Called only by `accept` after
171	/// coercion. Bespoke variant matches such as `Value::Uint8(0)` are safe
172	/// here because non-canonical inputs cannot reach this point.
173	fn validate_canonical(&self, value: &Value) -> Result<(), String> {
174		match self {
175			Self::CdcTtlDuration => match value {
176				Value::None {
177					..
178				} => Ok(()),
179				Value::Duration(d) => {
180					if d.is_positive() {
181						Ok(())
182					} else {
183						Err("CDC_TTL_DURATION must be greater than zero".to_string())
184					}
185				}
186				_ => Ok(()),
187			},
188			Self::CdcCompactInterval => match value {
189				Value::Duration(d) => {
190					if d.is_positive() {
191						Ok(())
192					} else {
193						Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
194					}
195				}
196				_ => Ok(()),
197			},
198			Self::CdcCompactBlockSize => match value {
199				Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
200				_ => Ok(()),
201			},
202			Self::CdcCompactBlockCacheCapacity => match value {
203				Value::Uint8(0) => {
204					Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
205				}
206				_ => Ok(()),
207			},
208			Self::CdcCompactZstdLevel => match value {
209				Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
210				Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
211				_ => Ok(()),
212			},
213			_ => Ok(()),
214		}
215	}
216
217	pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
218		if let Value::None {
219			inner,
220		} = &value
221		{
222			if self.is_optional() && self.expected_types().contains(inner) {
223				return Ok(value);
224			}
225			return Err(AcceptError::TypeMismatch {
226				expected: self.expected_types().to_vec(),
227				actual: value.get_type(),
228			});
229		}
230
231		let canonical = if self.expected_types().contains(&value.get_type()) {
232			value
233		} else {
234			try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
235				expected: self.expected_types().to_vec(),
236				actual: value.get_type(),
237			})?
238		};
239
240		self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
241		Ok(canonical)
242	}
243}
244
245fn try_coerce_numeric(value: &Value, expected: &[Type]) -> Option<Value> {
246	for target in expected {
247		let coerced = match target {
248			Type::Uint1 => {
249				value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
250			}
251			Type::Uint2 => {
252				value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
253			}
254			Type::Uint4 => {
255				value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
256			}
257			Type::Uint8 => {
258				value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
259			}
260			Type::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
261			Type::Int1 => value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8)),
262			Type::Int2 => {
263				value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
264			}
265			Type::Int4 => {
266				value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
267			}
268			Type::Int8 => {
269				value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
270			}
271			Type::Int16 => {
272				value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
273			}
274			Type::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
275			Type::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
276			Type::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
277			Type::Float4 => {
278				value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
279			}
280			Type::Float8 => {
281				value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
282			}
283			Type::Duration => value
284				.to_usize()
285				.and_then(|v| Duration::from_seconds(v as i64).ok())
286				.map(Value::Duration),
287			_ => None,
288		};
289		if coerced.is_some() {
290			return coerced;
291		}
292	}
293	None
294}
295
296impl fmt::Display for ConfigKey {
297	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298		match self {
299			Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
300			Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
301			Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
302			Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
303			Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
304			Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
305			Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
306			Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
307			Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
308			Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
309			Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
310		}
311	}
312}
313
314impl FromStr for ConfigKey {
315	type Err = String;
316
317	fn from_str(s: &str) -> Result<Self, Self::Err> {
318		match s {
319			"ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
320			"ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
321			"ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
322			"ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
323			"CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
324			"CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
325			"CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
326			"CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
327			"CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
328			"CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
329			"CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
330			_ => Err(format!("Unknown system configuration key: {}", s)),
331		}
332	}
333}
334
335/// A configuration definition for a runtime-tunable database setting.
336///
337/// `value` is the currently active value (either the persisted override or the default).
338/// `default_value`, `description`, and `requires_restart` are compile-time constants
339/// provided at registration time - they are never stored to disk.
340#[derive(Debug, Clone)]
341pub struct Config {
342	/// System configuration key
343	pub key: ConfigKey,
344	/// Currently active value (persisted override or default)
345	pub value: Value,
346	/// Compile-time default value
347	pub default_value: Value,
348	/// Human-readable description
349	pub description: &'static str,
350	/// Whether changing this setting requires a database restart
351	pub requires_restart: bool,
352}
353
354/// Trait for fetching configuration values.
355pub trait GetConfig: Send + Sync {
356	/// Get the configuration value at the current snapshot.
357	fn get_config(&self, key: ConfigKey) -> Value;
358	/// Get the configuration value at a specific snapshot version.
359	fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
360
361	/// Get the current value as a u64. Panics if the value is not Value::Uint8.
362	fn get_config_uint8(&self, key: ConfigKey) -> u64 {
363		let val = self.get_config(key);
364		match val {
365			Value::Uint8(v) => v,
366			v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
367		}
368	}
369
370	/// Get the current value as a u8. Panics if the value is not Value::Uint1.
371	fn get_config_uint1(&self, key: ConfigKey) -> u8 {
372		let val = self.get_config(key);
373		match val {
374			Value::Uint1(v) => v,
375			v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
376		}
377	}
378
379	/// Get the current value as a std::time::Duration. Panics if the value is not Value::Duration.
380	fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
381		let val = self.get_config(key);
382		match val {
383			Value::Duration(v) => {
384				let total_nanos =
385					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
386				StdDuration::from_nanos(total_nanos.max(0) as u64)
387			}
388			v => panic!("config key '{}' expected Duration, got {:?}", key, v),
389		}
390	}
391
392	/// Get the current value as an `Option<StdDuration>` for keys that may be unset.
393	/// `None` for `Value::None`, `Some(d)` for `Value::Duration(d)`. Panics on any other variant.
394	fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
395		match self.get_config(key) {
396			Value::None {
397				..
398			} => None,
399			Value::Duration(v) => {
400				let total_nanos =
401					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
402				Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
403			}
404			v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
405		}
406	}
407}
408
409#[cfg(test)]
410mod tests {
411	use super::*;
412
413	#[test]
414	fn test_cdc_ttl_default_is_typed_null() {
415		// Defaulting to Value::None means "TTL not configured" - producer skips cleanup.
416		let default = ConfigKey::CdcTtlDuration.default_value();
417		assert!(matches!(
418			default,
419			Value::None {
420				inner: Type::Duration
421			}
422		));
423	}
424
425	#[test]
426	fn test_cdc_ttl_accept_passes_typed_null() {
427		let none = Value::None {
428			inner: Type::Duration,
429		};
430		let v = ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap();
431		assert_eq!(v, none);
432	}
433
434	#[test]
435	fn test_cdc_ttl_accept_passes_positive_duration() {
436		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
437		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_sec.clone()).unwrap(), one_sec);
438
439		let one_hour = Value::Duration(Duration::from_seconds(3600).unwrap());
440		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_hour.clone()).unwrap(), one_hour);
441	}
442
443	#[test]
444	fn test_cdc_ttl_accept_rejects_zero() {
445		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
446		match ConfigKey::CdcTtlDuration.accept(zero).unwrap_err() {
447			AcceptError::InvalidValue(reason) => {
448				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
449			}
450			other => panic!("expected InvalidValue, got {other:?}"),
451		}
452	}
453
454	#[test]
455	fn test_cdc_ttl_accept_rejects_negative() {
456		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
457		assert!(matches!(ConfigKey::CdcTtlDuration.accept(negative), Err(AcceptError::InvalidValue(_))));
458	}
459
460	#[test]
461	fn test_other_keys_accept_in_type_values() {
462		// Keys without bespoke validation should accept any in-type value.
463		assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
464		assert!(ConfigKey::RowTtlScanInterval
465			.accept(Value::Duration(Duration::from_seconds(0).unwrap()))
466			.is_ok());
467	}
468
469	#[test]
470	fn test_cdc_ttl_round_trips_through_display_and_from_str() {
471		let key: ConfigKey = "CDC_TTL_DURATION".parse().unwrap();
472		assert_eq!(key, ConfigKey::CdcTtlDuration);
473		assert_eq!(format!("{}", ConfigKey::CdcTtlDuration), "CDC_TTL_DURATION");
474	}
475
476	#[test]
477	fn test_cdc_ttl_in_all() {
478		assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
479	}
480
481	#[test]
482	fn test_all_contains_every_compact_key_and_has_expected_len() {
483		let all = ConfigKey::all();
484		assert_eq!(all.len(), 11);
485		assert!(all.contains(&ConfigKey::CdcCompactInterval));
486		assert!(all.contains(&ConfigKey::CdcCompactBlockSize));
487		assert!(all.contains(&ConfigKey::CdcCompactSafetyLag));
488		assert!(all.contains(&ConfigKey::CdcCompactMaxBlocksPerTick));
489		assert!(all.contains(&ConfigKey::CdcCompactBlockCacheCapacity));
490		assert!(all.contains(&ConfigKey::CdcCompactZstdLevel));
491	}
492
493	#[test]
494	fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
495		let key: ConfigKey = "CDC_COMPACT_INTERVAL".parse().unwrap();
496		assert_eq!(key, ConfigKey::CdcCompactInterval);
497		assert_eq!(format!("{}", ConfigKey::CdcCompactInterval), "CDC_COMPACT_INTERVAL");
498	}
499
500	#[test]
501	fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
502		let key: ConfigKey = "CDC_COMPACT_BLOCK_SIZE".parse().unwrap();
503		assert_eq!(key, ConfigKey::CdcCompactBlockSize);
504		assert_eq!(format!("{}", ConfigKey::CdcCompactBlockSize), "CDC_COMPACT_BLOCK_SIZE");
505	}
506
507	#[test]
508	fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
509		let key: ConfigKey = "CDC_COMPACT_SAFETY_LAG".parse().unwrap();
510		assert_eq!(key, ConfigKey::CdcCompactSafetyLag);
511		assert_eq!(format!("{}", ConfigKey::CdcCompactSafetyLag), "CDC_COMPACT_SAFETY_LAG");
512	}
513
514	#[test]
515	fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
516		let key: ConfigKey = "CDC_COMPACT_MAX_BLOCKS_PER_TICK".parse().unwrap();
517		assert_eq!(key, ConfigKey::CdcCompactMaxBlocksPerTick);
518		assert_eq!(format!("{}", ConfigKey::CdcCompactMaxBlocksPerTick), "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
519	}
520
521	#[test]
522	fn test_cdc_compact_interval_default_is_duration() {
523		assert!(matches!(ConfigKey::CdcCompactInterval.default_value(), Value::Duration(_)));
524	}
525
526	#[test]
527	fn test_cdc_compact_block_size_default_is_uint8_1024() {
528		assert_eq!(ConfigKey::CdcCompactBlockSize.default_value(), Value::Uint8(1024));
529	}
530
531	#[test]
532	fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
533		assert_eq!(ConfigKey::CdcCompactSafetyLag.default_value(), Value::Uint8(1024));
534	}
535
536	#[test]
537	fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
538		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.default_value(), Value::Uint8(16));
539	}
540
541	#[test]
542	fn test_cdc_compact_interval_accept_passes_positive_duration() {
543		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
544		assert_eq!(ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap(), one_sec);
545	}
546
547	#[test]
548	fn test_cdc_compact_interval_accept_rejects_zero() {
549		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
550		match ConfigKey::CdcCompactInterval.accept(zero).unwrap_err() {
551			AcceptError::InvalidValue(reason) => {
552				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
553			}
554			other => panic!("expected InvalidValue, got {other:?}"),
555		}
556	}
557
558	#[test]
559	fn test_cdc_compact_interval_accept_rejects_negative() {
560		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
561		assert!(matches!(ConfigKey::CdcCompactInterval.accept(negative), Err(AcceptError::InvalidValue(_))));
562	}
563
564	#[test]
565	fn test_cdc_compact_block_size_accept_rejects_zero() {
566		match ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)).unwrap_err() {
567			AcceptError::InvalidValue(reason) => {
568				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
569			}
570			other => panic!("expected InvalidValue, got {other:?}"),
571		}
572	}
573
574	#[test]
575	fn test_cdc_compact_block_size_accept_passes_positive() {
576		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1)).unwrap(), Value::Uint8(1));
577		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1024)).unwrap(), Value::Uint8(1024));
578	}
579
580	#[test]
581	fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
582		assert_eq!(ConfigKey::CdcCompactSafetyLag.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
583		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
584	}
585
586	#[test]
587	fn test_accept_coerces_int4_to_uint8_for_block_size() {
588		// SET CONFIG CDC_COMPACT_BLOCK_SIZE = 1024 (parsed as Int4) becomes Uint8(1024).
589		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
590		assert_eq!(v, Value::Uint8(1024));
591	}
592
593	#[test]
594	fn test_accept_coerces_int8_to_uint8_for_block_size() {
595		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
596		assert_eq!(v, Value::Uint8(2048));
597	}
598
599	#[test]
600	fn test_accept_rejects_zero_after_coercion() {
601		// Int4(0) coerces to Uint8(0), then validate_canonical rejects it.
602		match ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err() {
603			AcceptError::InvalidValue(reason) => {
604				assert!(reason.contains("greater than zero"));
605			}
606			other => panic!("expected InvalidValue, got {other:?}"),
607		}
608	}
609
610	#[test]
611	fn test_accept_rejects_negative_int_for_uint8_key() {
612		// to_usize() returns None for negatives -> all coercion arms fail -> TypeMismatch.
613		assert!(matches!(
614			ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1)),
615			Err(AcceptError::TypeMismatch { .. })
616		));
617	}
618
619	#[test]
620	fn test_accept_coerces_int_to_duration_via_seconds() {
621		// SET CONFIG CDC_COMPACT_INTERVAL = 60 (Int4) -> Duration(60s).
622		let v = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
623		assert!(matches!(v, Value::Duration(_)));
624	}
625
626	#[test]
627	fn test_accept_idempotent_on_canonical_uint8() {
628		let canonical = Value::Uint8(42);
629		assert_eq!(ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap(), canonical);
630	}
631
632	#[test]
633	fn test_accept_idempotent_on_canonical_duration() {
634		let canonical = Value::Duration(Duration::from_seconds(5).unwrap());
635		assert_eq!(ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap(), canonical);
636	}
637
638	#[test]
639	fn test_accept_rejects_typed_null_for_non_optional_key() {
640		let err = ConfigKey::CdcCompactBlockSize
641			.accept(Value::None {
642				inner: Type::Uint8,
643			})
644			.unwrap_err();
645		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
646	}
647
648	#[test]
649	fn test_accept_passes_typed_null_for_optional_key() {
650		let none = Value::None {
651			inner: Type::Duration,
652		};
653		assert_eq!(ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap(), none);
654	}
655
656	#[test]
657	fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
658		// Optional key still rejects typed-null whose inner doesn't match expected_types.
659		let err = ConfigKey::CdcTtlDuration
660			.accept(Value::None {
661				inner: Type::Uint8,
662			})
663			.unwrap_err();
664		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
665	}
666}