Skip to main content

reifydb_core/interface/catalog/
config.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_type::value::{
7	Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8	r#type::Type, uint::Uint,
9};
10
11use crate::common::CommitVersion;
12
/// Reasons [`ConfigKey::accept`] can refuse a candidate configuration value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcceptError {
	/// The value's type is not one of the key's expected types and it could
	/// not be coerced into one of them.
	TypeMismatch {
		/// Types the key accepts.
		expected: Vec<Type>,
		/// Type of the value that was offered.
		actual: Type,
	},

	/// The value has an acceptable type but failed the key's validation;
	/// the payload is the human-readable reason.
	InvalidValue(String),
}
22
23impl fmt::Display for AcceptError {
24	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25		match self {
26			Self::TypeMismatch {
27				expected,
28				actual,
29			} => {
30				write!(f, "expected one of {:?}, got {:?}", expected, actual)
31			}
32			Self::InvalidValue(reason) => write!(f, "{reason}"),
33		}
34	}
35}
36
/// Identifiers of the system configuration settings.
///
/// Each key has a default value, a description, a restart requirement, a set
/// of expected value types and an optionality flag, all exposed through the
/// methods of `impl ConfigKey`. The textual name of a key (e.g.
/// `ORACLE_WINDOW_SIZE`) is produced by its `Display` impl and parsed back by
/// its `FromStr` impl.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ConfigKey {
	// Oracle (conflict detection)
	OracleWindowSize,
	OracleWaterMark,
	// Query execution
	QueryRowBatchSize,
	// Row TTL scanning
	RowTtlScanBatchSize,
	RowTtlScanInterval,
	// Operator-state TTL scanning
	OperatorTtlScanBatchSize,
	OperatorTtlScanInterval,
	// Historical-version GC
	HistoricalGcBatchSize,
	HistoricalGcInterval,
	// CDC retention and compaction
	CdcTtlDuration,
	CdcCompactInterval,
	CdcCompactBlockSize,
	CdcCompactSafetyLag,
	CdcCompactMaxBlocksPerTick,
	CdcCompactBlockCacheCapacity,
	CdcCompactZstdLevel,
	// Thread pools
	ThreadsAsync,
	ThreadsSystem,
	ThreadsQuery,
}
59
60impl ConfigKey {
61	pub fn all() -> &'static [Self] {
62		&[
63			Self::OracleWindowSize,
64			Self::OracleWaterMark,
65			Self::QueryRowBatchSize,
66			Self::RowTtlScanBatchSize,
67			Self::RowTtlScanInterval,
68			Self::OperatorTtlScanBatchSize,
69			Self::OperatorTtlScanInterval,
70			Self::HistoricalGcBatchSize,
71			Self::HistoricalGcInterval,
72			Self::CdcTtlDuration,
73			Self::CdcCompactInterval,
74			Self::CdcCompactBlockSize,
75			Self::CdcCompactSafetyLag,
76			Self::CdcCompactMaxBlocksPerTick,
77			Self::CdcCompactBlockCacheCapacity,
78			Self::CdcCompactZstdLevel,
79			Self::ThreadsAsync,
80			Self::ThreadsSystem,
81			Self::ThreadsQuery,
82		]
83	}
84
85	pub fn default_value(&self) -> Value {
86		match self {
87			Self::OracleWindowSize => Value::Uint8(500),
88			Self::OracleWaterMark => Value::Uint8(20),
89			Self::QueryRowBatchSize => Value::Uint2(32),
90			Self::RowTtlScanBatchSize => Value::Uint8(10000),
91			Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
92			Self::OperatorTtlScanBatchSize => Value::Uint8(10000),
93			Self::OperatorTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
94			Self::HistoricalGcBatchSize => Value::Uint8(50_000),
95			Self::HistoricalGcInterval => Value::Duration(Duration::from_seconds(30).unwrap()),
96			Self::CdcTtlDuration => Value::None {
97				inner: Type::Duration,
98			},
99			Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
100			Self::CdcCompactBlockSize => Value::Uint8(1024),
101			Self::CdcCompactSafetyLag => Value::Uint8(1024),
102			Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
103			Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
104			Self::CdcCompactZstdLevel => Value::Uint1(7),
105			Self::ThreadsAsync => Value::Uint2(1),
106			Self::ThreadsSystem => Value::Uint2(2),
107			Self::ThreadsQuery => Value::Uint2(1),
108		}
109	}
110
111	pub fn description(&self) -> &'static str {
112		match self {
113			Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
114			Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
115			Self::QueryRowBatchSize => {
116				"Number of rows produced per batch by query / DML pipeline operators."
117			}
118			Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
119			Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
120			Self::OperatorTtlScanBatchSize => {
121				"Max rows to examine per batch during an operator-state TTL scan."
122			}
123			Self::OperatorTtlScanInterval => {
124				"How often the operator-state TTL actor should scan for expired rows."
125			}
126			Self::HistoricalGcBatchSize => {
127				"Max historical (key, version) pairs scanned per shape per historical GC tick."
128			}
129			Self::HistoricalGcInterval => {
130				"How often the historical-version GC actor sweeps __historical for versions older than the oracle read watermark."
131			}
132			Self::CdcTtlDuration => {
133				"Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
134				 when set, must be > 0 and entries older than this duration are evicted regardless \
135				 of consumer state."
136			}
137			Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
138			Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
139			Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
140			Self::CdcCompactMaxBlocksPerTick => {
141				"Upper bound on consecutive blocks produced per actor tick."
142			}
143			Self::CdcCompactBlockCacheCapacity => {
144				"Number of decompressed CDC blocks held in the in-memory LRU cache."
145			}
146			Self::CdcCompactZstdLevel => {
147				"Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
148				 slower compression. Decompression cost is independent of level."
149			}
150			Self::ThreadsAsync => {
151				"Number of worker threads for the async runtime. Must be >= 1. \
152				 Read at boot before the runtime starts; changes require restart."
153			}
154			Self::ThreadsSystem => {
155				"Number of worker threads for the system pool (lightweight actors). \
156				 Must be >= 1. Changes require restart."
157			}
158			Self::ThreadsQuery => {
159				"Number of worker threads for the query pool (execution-heavy actors). \
160				 Must be >= 1. Changes require restart."
161			}
162		}
163	}
164
165	pub fn requires_restart(&self) -> bool {
166		match self {
167			Self::OracleWindowSize => false,
168			Self::OracleWaterMark => false,
169			Self::QueryRowBatchSize => false,
170			Self::RowTtlScanBatchSize => false,
171			Self::RowTtlScanInterval => false,
172			Self::OperatorTtlScanBatchSize => false,
173			Self::OperatorTtlScanInterval => false,
174			Self::HistoricalGcBatchSize => false,
175			Self::HistoricalGcInterval => false,
176			Self::CdcTtlDuration => false,
177			Self::CdcCompactInterval => false,
178			Self::CdcCompactBlockSize => false,
179			Self::CdcCompactSafetyLag => false,
180			Self::CdcCompactMaxBlocksPerTick => false,
181			Self::CdcCompactBlockCacheCapacity => true,
182			Self::CdcCompactZstdLevel => false,
183			Self::ThreadsAsync => true,
184			Self::ThreadsSystem => true,
185			Self::ThreadsQuery => true,
186		}
187	}
188
189	pub fn expected_types(&self) -> &'static [Type] {
190		match self {
191			Self::OracleWindowSize => &[Type::Uint8],
192			Self::OracleWaterMark => &[Type::Uint8],
193			Self::QueryRowBatchSize => &[Type::Uint2],
194			Self::RowTtlScanBatchSize => &[Type::Uint8],
195			Self::RowTtlScanInterval => &[Type::Duration],
196			Self::OperatorTtlScanBatchSize => &[Type::Uint8],
197			Self::OperatorTtlScanInterval => &[Type::Duration],
198			Self::HistoricalGcBatchSize => &[Type::Uint8],
199			Self::HistoricalGcInterval => &[Type::Duration],
200			Self::CdcTtlDuration => &[Type::Duration],
201			Self::CdcCompactInterval => &[Type::Duration],
202			Self::CdcCompactBlockSize => &[Type::Uint8],
203			Self::CdcCompactSafetyLag => &[Type::Uint8],
204			Self::CdcCompactMaxBlocksPerTick => &[Type::Uint8],
205			Self::CdcCompactBlockCacheCapacity => &[Type::Uint8],
206			Self::CdcCompactZstdLevel => &[Type::Uint1],
207			Self::ThreadsAsync => &[Type::Uint2],
208			Self::ThreadsSystem => &[Type::Uint2],
209			Self::ThreadsQuery => &[Type::Uint2],
210		}
211	}
212
213	pub fn is_optional(&self) -> bool {
214		match self {
215			Self::OracleWindowSize => false,
216			Self::OracleWaterMark => false,
217			Self::QueryRowBatchSize => false,
218			Self::RowTtlScanBatchSize => false,
219			Self::RowTtlScanInterval => false,
220			Self::OperatorTtlScanBatchSize => false,
221			Self::OperatorTtlScanInterval => false,
222			Self::HistoricalGcBatchSize => false,
223			Self::HistoricalGcInterval => false,
224			Self::CdcTtlDuration => true,
225			Self::CdcCompactInterval => false,
226			Self::CdcCompactBlockSize => false,
227			Self::CdcCompactSafetyLag => false,
228			Self::CdcCompactMaxBlocksPerTick => false,
229			Self::CdcCompactBlockCacheCapacity => false,
230			Self::CdcCompactZstdLevel => false,
231			Self::ThreadsAsync => false,
232			Self::ThreadsSystem => false,
233			Self::ThreadsQuery => false,
234		}
235	}
236
237	fn validate_canonical(&self, value: &Value) -> Result<(), String> {
238		match self {
239			Self::CdcTtlDuration => match value {
240				Value::None {
241					..
242				} => Ok(()),
243				Value::Duration(d) => {
244					if d.is_positive() {
245						Ok(())
246					} else {
247						Err("CDC_TTL_DURATION must be greater than zero".to_string())
248					}
249				}
250				_ => Ok(()),
251			},
252			Self::CdcCompactInterval => match value {
253				Value::Duration(d) => {
254					if d.is_positive() {
255						Ok(())
256					} else {
257						Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
258					}
259				}
260				_ => Ok(()),
261			},
262			Self::CdcCompactBlockSize => match value {
263				Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
264				_ => Ok(()),
265			},
266			Self::QueryRowBatchSize => match value {
267				Value::Uint2(0) => Err("QUERY_ROW_BATCH_SIZE must be greater than zero".to_string()),
268				_ => Ok(()),
269			},
270			Self::CdcCompactBlockCacheCapacity => match value {
271				Value::Uint8(0) => {
272					Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
273				}
274				_ => Ok(()),
275			},
276			Self::CdcCompactZstdLevel => match value {
277				Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
278				Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
279				_ => Ok(()),
280			},
281			Self::HistoricalGcBatchSize => match value {
282				Value::Uint8(0) => {
283					Err("HISTORICAL_GC_BATCH_SIZE must be greater than zero".to_string())
284				}
285				_ => Ok(()),
286			},
287			Self::HistoricalGcInterval => match value {
288				Value::Duration(d) => {
289					if d.is_positive() {
290						Ok(())
291					} else {
292						Err("HISTORICAL_GC_INTERVAL must be greater than zero".to_string())
293					}
294				}
295				_ => Ok(()),
296			},
297			Self::ThreadsAsync => match value {
298				Value::Uint2(0) => Err("THREADS_ASYNC must be greater than zero".to_string()),
299				_ => Ok(()),
300			},
301			Self::ThreadsSystem => match value {
302				Value::Uint2(0) => Err("THREADS_SYSTEM must be greater than zero".to_string()),
303				_ => Ok(()),
304			},
305			Self::ThreadsQuery => match value {
306				Value::Uint2(0) => Err("THREADS_QUERY must be greater than zero".to_string()),
307				_ => Ok(()),
308			},
309			_ => Ok(()),
310		}
311	}
312
313	pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
314		if let Value::None {
315			inner,
316		} = &value
317		{
318			if self.is_optional() && self.expected_types().contains(inner) {
319				return Ok(value);
320			}
321			return Err(AcceptError::TypeMismatch {
322				expected: self.expected_types().to_vec(),
323				actual: value.get_type(),
324			});
325		}
326
327		let canonical = if self.expected_types().contains(&value.get_type()) {
328			value
329		} else {
330			try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
331				expected: self.expected_types().to_vec(),
332				actual: value.get_type(),
333			})?
334		};
335
336		self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
337		Ok(canonical)
338	}
339}
340
341fn try_coerce_numeric(value: &Value, expected: &[Type]) -> Option<Value> {
342	for target in expected {
343		let coerced = match target {
344			Type::Uint1 => {
345				value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
346			}
347			Type::Uint2 => {
348				value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
349			}
350			Type::Uint4 => {
351				value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
352			}
353			Type::Uint8 => {
354				value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
355			}
356			Type::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
357			Type::Int1 => value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8)),
358			Type::Int2 => {
359				value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
360			}
361			Type::Int4 => {
362				value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
363			}
364			Type::Int8 => {
365				value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
366			}
367			Type::Int16 => {
368				value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
369			}
370			Type::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
371			Type::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
372			Type::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
373			Type::Float4 => {
374				value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
375			}
376			Type::Float8 => {
377				value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
378			}
379			Type::Duration => value
380				.to_usize()
381				.and_then(|v| Duration::from_seconds(v as i64).ok())
382				.map(Value::Duration),
383			_ => None,
384		};
385		if coerced.is_some() {
386			return coerced;
387		}
388	}
389	None
390}
391
392impl fmt::Display for ConfigKey {
393	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394		match self {
395			Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
396			Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
397			Self::QueryRowBatchSize => write!(f, "QUERY_ROW_BATCH_SIZE"),
398			Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
399			Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
400			Self::OperatorTtlScanBatchSize => write!(f, "OPERATOR_TTL_SCAN_BATCH_SIZE"),
401			Self::OperatorTtlScanInterval => write!(f, "OPERATOR_TTL_SCAN_INTERVAL"),
402			Self::HistoricalGcBatchSize => write!(f, "HISTORICAL_GC_BATCH_SIZE"),
403			Self::HistoricalGcInterval => write!(f, "HISTORICAL_GC_INTERVAL"),
404			Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
405			Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
406			Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
407			Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
408			Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
409			Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
410			Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
411			Self::ThreadsAsync => write!(f, "THREADS_ASYNC"),
412			Self::ThreadsSystem => write!(f, "THREADS_SYSTEM"),
413			Self::ThreadsQuery => write!(f, "THREADS_QUERY"),
414		}
415	}
416}
417
418impl FromStr for ConfigKey {
419	type Err = String;
420
421	fn from_str(s: &str) -> Result<Self, Self::Err> {
422		match s {
423			"ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
424			"ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
425			"QUERY_ROW_BATCH_SIZE" => Ok(Self::QueryRowBatchSize),
426			"ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
427			"ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
428			"OPERATOR_TTL_SCAN_BATCH_SIZE" => Ok(Self::OperatorTtlScanBatchSize),
429			"OPERATOR_TTL_SCAN_INTERVAL" => Ok(Self::OperatorTtlScanInterval),
430			"HISTORICAL_GC_BATCH_SIZE" => Ok(Self::HistoricalGcBatchSize),
431			"HISTORICAL_GC_INTERVAL" => Ok(Self::HistoricalGcInterval),
432			"CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
433			"CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
434			"CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
435			"CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
436			"CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
437			"CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
438			"CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
439			"THREADS_ASYNC" => Ok(Self::ThreadsAsync),
440			"THREADS_SYSTEM" => Ok(Self::ThreadsSystem),
441			"THREADS_QUERY" => Ok(Self::ThreadsQuery),
442			_ => Err(format!("Unknown system configuration key: {}", s)),
443		}
444	}
445}
446
/// A configuration entry: a key together with its value and the metadata
/// fields that mirror the corresponding `ConfigKey` accessors.
#[derive(Debug, Clone)]
pub struct Config {
	/// Which setting this entry describes.
	pub key: ConfigKey,

	/// The value held by this entry.
	pub value: Value,

	/// Default for the key; presumably populated from
	/// `ConfigKey::default_value` (construction is not shown in this file).
	pub default_value: Value,

	/// Human-readable description of the setting.
	pub description: &'static str,

	/// Whether a change to this setting requires a restart.
	pub requires_restart: bool,
}
459
460pub trait GetConfig: Send + Sync {
461	fn get_config(&self, key: ConfigKey) -> Value;
462
463	fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
464
465	fn get_config_uint8(&self, key: ConfigKey) -> u64 {
466		let val = self.get_config(key);
467		match val {
468			Value::Uint8(v) => v,
469			v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
470		}
471	}
472
473	fn get_config_uint1(&self, key: ConfigKey) -> u8 {
474		let val = self.get_config(key);
475		match val {
476			Value::Uint1(v) => v,
477			v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
478		}
479	}
480
481	fn get_config_uint2(&self, key: ConfigKey) -> u16 {
482		let val = self.get_config(key);
483		match val {
484			Value::Uint2(v) => v,
485			v => panic!("config key '{}' expected Uint2, got {:?}", key, v),
486		}
487	}
488
489	fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
490		let val = self.get_config(key);
491		match val {
492			Value::Duration(v) => {
493				let total_nanos =
494					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
495				StdDuration::from_nanos(total_nanos.max(0) as u64)
496			}
497			v => panic!("config key '{}' expected Duration, got {:?}", key, v),
498		}
499	}
500
501	fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
502		match self.get_config(key) {
503			Value::None {
504				..
505			} => None,
506			Value::Duration(v) => {
507				let total_nanos =
508					(v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
509				Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
510			}
511			v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
512		}
513	}
514}
515
// Unit tests for `ConfigKey`: defaults, name round-trips, `accept` coercion
// and per-key validation.
#[cfg(test)]
mod tests {
	use super::*;

	// ---- CDC_TTL_DURATION (the only optional key) ----

	#[test]
	fn test_cdc_ttl_default_is_typed_null() {
		// Defaulting to Value::None means "TTL not configured" - producer skips cleanup.
		let default = ConfigKey::CdcTtlDuration.default_value();
		assert!(matches!(
			default,
			Value::None {
				inner: Type::Duration
			}
		));
	}

	#[test]
	fn test_cdc_ttl_accept_passes_typed_null() {
		let none = Value::None {
			inner: Type::Duration,
		};
		let v = ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap();
		assert_eq!(v, none);
	}

	#[test]
	fn test_cdc_ttl_accept_passes_positive_duration() {
		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_sec.clone()).unwrap(), one_sec);

		let one_hour = Value::Duration(Duration::from_seconds(3600).unwrap());
		assert_eq!(ConfigKey::CdcTtlDuration.accept(one_hour.clone()).unwrap(), one_hour);
	}

	#[test]
	fn test_cdc_ttl_accept_rejects_zero() {
		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
		match ConfigKey::CdcTtlDuration.accept(zero).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_cdc_ttl_accept_rejects_negative() {
		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
		assert!(matches!(ConfigKey::CdcTtlDuration.accept(negative), Err(AcceptError::InvalidValue(_))));
	}

	#[test]
	fn test_other_keys_accept_in_type_values() {
		// Keys without bespoke validation should accept any in-type value.
		assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
		assert!(ConfigKey::RowTtlScanInterval
			.accept(Value::Duration(Duration::from_seconds(0).unwrap()))
			.is_ok());
	}

	#[test]
	fn test_cdc_ttl_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "CDC_TTL_DURATION".parse().unwrap();
		assert_eq!(key, ConfigKey::CdcTtlDuration);
		assert_eq!(format!("{}", ConfigKey::CdcTtlDuration), "CDC_TTL_DURATION");
	}

	#[test]
	fn test_cdc_ttl_in_all() {
		assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
	}

	// ---- ConfigKey::all() completeness ----

	#[test]
	fn test_all_contains_every_compact_key_and_has_expected_len() {
		let all = ConfigKey::all();
		// Must be bumped whenever a variant is added to ConfigKey.
		assert_eq!(all.len(), 19);
		assert!(all.contains(&ConfigKey::CdcCompactInterval));
		assert!(all.contains(&ConfigKey::CdcCompactBlockSize));
		assert!(all.contains(&ConfigKey::CdcCompactSafetyLag));
		assert!(all.contains(&ConfigKey::CdcCompactMaxBlocksPerTick));
		assert!(all.contains(&ConfigKey::CdcCompactBlockCacheCapacity));
		assert!(all.contains(&ConfigKey::CdcCompactZstdLevel));
		assert!(all.contains(&ConfigKey::QueryRowBatchSize));
		assert!(all.contains(&ConfigKey::ThreadsAsync));
		assert!(all.contains(&ConfigKey::ThreadsSystem));
		assert!(all.contains(&ConfigKey::ThreadsQuery));
	}

	// ---- THREADS_* keys ----

	#[test]
	fn test_threads_keys_round_trip() {
		assert_eq!("THREADS_ASYNC".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsAsync);
		assert_eq!("THREADS_SYSTEM".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsSystem);
		assert_eq!("THREADS_QUERY".parse::<ConfigKey>().unwrap(), ConfigKey::ThreadsQuery);
		assert_eq!(format!("{}", ConfigKey::ThreadsAsync), "THREADS_ASYNC");
		assert_eq!(format!("{}", ConfigKey::ThreadsSystem), "THREADS_SYSTEM");
		assert_eq!(format!("{}", ConfigKey::ThreadsQuery), "THREADS_QUERY");
	}

	#[test]
	fn test_threads_defaults() {
		assert_eq!(ConfigKey::ThreadsAsync.default_value(), Value::Uint2(1));
		assert_eq!(ConfigKey::ThreadsSystem.default_value(), Value::Uint2(2));
		assert_eq!(ConfigKey::ThreadsQuery.default_value(), Value::Uint2(1));
	}

	#[test]
	fn test_threads_reject_zero() {
		for key in [ConfigKey::ThreadsAsync, ConfigKey::ThreadsSystem, ConfigKey::ThreadsQuery] {
			match key.accept(Value::Uint2(0)).unwrap_err() {
				AcceptError::InvalidValue(reason) => {
					assert!(
						reason.contains("greater than zero"),
						"{key}: unexpected reason: {reason}"
					);
				}
				other => panic!("{key}: expected InvalidValue, got {other:?}"),
			}
		}
	}

	#[test]
	fn test_threads_accept_positive() {
		assert_eq!(ConfigKey::ThreadsAsync.accept(Value::Uint2(4)).unwrap(), Value::Uint2(4));
		assert_eq!(ConfigKey::ThreadsSystem.accept(Value::Uint2(8)).unwrap(), Value::Uint2(8));
		assert_eq!(ConfigKey::ThreadsQuery.accept(Value::Uint2(16)).unwrap(), Value::Uint2(16));
	}

	#[test]
	fn test_threads_coerce_int4_to_uint2() {
		let v = ConfigKey::ThreadsQuery.accept(Value::Int4(8)).unwrap();
		assert_eq!(v, Value::Uint2(8));
	}

	#[test]
	fn test_threads_require_restart() {
		assert!(ConfigKey::ThreadsAsync.requires_restart());
		assert!(ConfigKey::ThreadsSystem.requires_restart());
		assert!(ConfigKey::ThreadsQuery.requires_restart());
	}

	// ---- QUERY_ROW_BATCH_SIZE ----

	#[test]
	fn test_query_row_batch_size_default_is_uint2_32() {
		assert_eq!(ConfigKey::QueryRowBatchSize.default_value(), Value::Uint2(32));
	}

	#[test]
	fn test_query_row_batch_size_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "QUERY_ROW_BATCH_SIZE".parse().unwrap();
		assert_eq!(key, ConfigKey::QueryRowBatchSize);
		assert_eq!(format!("{}", ConfigKey::QueryRowBatchSize), "QUERY_ROW_BATCH_SIZE");
	}

	#[test]
	fn test_query_row_batch_size_accept_rejects_zero() {
		match ConfigKey::QueryRowBatchSize.accept(Value::Uint2(0)).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_query_row_batch_size_accept_passes_positive() {
		assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1)).unwrap(), Value::Uint2(1));
		assert_eq!(ConfigKey::QueryRowBatchSize.accept(Value::Uint2(1024)).unwrap(), Value::Uint2(1024));
	}

	#[test]
	fn test_query_row_batch_size_accept_rejects_zero_after_coercion() {
		match ConfigKey::QueryRowBatchSize.accept(Value::Int4(0)).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"));
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_query_row_batch_size_coerces_int4_to_uint2() {
		let v = ConfigKey::QueryRowBatchSize.accept(Value::Int4(64)).unwrap();
		assert_eq!(v, Value::Uint2(64));
	}

	// ---- CDC_COMPACT_* keys: names, defaults, validation ----

	#[test]
	fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "CDC_COMPACT_INTERVAL".parse().unwrap();
		assert_eq!(key, ConfigKey::CdcCompactInterval);
		assert_eq!(format!("{}", ConfigKey::CdcCompactInterval), "CDC_COMPACT_INTERVAL");
	}

	#[test]
	fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "CDC_COMPACT_BLOCK_SIZE".parse().unwrap();
		assert_eq!(key, ConfigKey::CdcCompactBlockSize);
		assert_eq!(format!("{}", ConfigKey::CdcCompactBlockSize), "CDC_COMPACT_BLOCK_SIZE");
	}

	#[test]
	fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "CDC_COMPACT_SAFETY_LAG".parse().unwrap();
		assert_eq!(key, ConfigKey::CdcCompactSafetyLag);
		assert_eq!(format!("{}", ConfigKey::CdcCompactSafetyLag), "CDC_COMPACT_SAFETY_LAG");
	}

	#[test]
	fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
		let key: ConfigKey = "CDC_COMPACT_MAX_BLOCKS_PER_TICK".parse().unwrap();
		assert_eq!(key, ConfigKey::CdcCompactMaxBlocksPerTick);
		assert_eq!(format!("{}", ConfigKey::CdcCompactMaxBlocksPerTick), "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
	}

	#[test]
	fn test_cdc_compact_interval_default_is_duration() {
		assert!(matches!(ConfigKey::CdcCompactInterval.default_value(), Value::Duration(_)));
	}

	#[test]
	fn test_cdc_compact_block_size_default_is_uint8_1024() {
		assert_eq!(ConfigKey::CdcCompactBlockSize.default_value(), Value::Uint8(1024));
	}

	#[test]
	fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
		assert_eq!(ConfigKey::CdcCompactSafetyLag.default_value(), Value::Uint8(1024));
	}

	#[test]
	fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.default_value(), Value::Uint8(16));
	}

	#[test]
	fn test_cdc_compact_interval_accept_passes_positive_duration() {
		let one_sec = Value::Duration(Duration::from_seconds(1).unwrap());
		assert_eq!(ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap(), one_sec);
	}

	#[test]
	fn test_cdc_compact_interval_accept_rejects_zero() {
		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
		match ConfigKey::CdcCompactInterval.accept(zero).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_cdc_compact_interval_accept_rejects_negative() {
		let negative = Value::Duration(Duration::from_seconds(-5).unwrap());
		assert!(matches!(ConfigKey::CdcCompactInterval.accept(negative), Err(AcceptError::InvalidValue(_))));
	}

	#[test]
	fn test_cdc_compact_block_size_accept_rejects_zero() {
		match ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_cdc_compact_block_size_accept_passes_positive() {
		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1)).unwrap(), Value::Uint8(1));
		assert_eq!(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(1024)).unwrap(), Value::Uint8(1024));
	}

	#[test]
	fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
		// These two keys have no validation rule, so zero passes through unchanged.
		assert_eq!(ConfigKey::CdcCompactSafetyLag.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
		assert_eq!(ConfigKey::CdcCompactMaxBlocksPerTick.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
	}

	// ---- accept(): numeric coercion and typed-null handling ----

	#[test]
	fn test_accept_coerces_int4_to_uint8_for_block_size() {
		// SET CONFIG CDC_COMPACT_BLOCK_SIZE = 1024 (parsed as Int4) becomes Uint8(1024).
		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
		assert_eq!(v, Value::Uint8(1024));
	}

	#[test]
	fn test_accept_coerces_int8_to_uint8_for_block_size() {
		let v = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
		assert_eq!(v, Value::Uint8(2048));
	}

	#[test]
	fn test_accept_rejects_zero_after_coercion() {
		// Int4(0) coerces to Uint8(0), then validate_canonical rejects it.
		match ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"));
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_accept_rejects_negative_int_for_uint8_key() {
		// to_usize() returns None for negatives -> all coercion arms fail -> TypeMismatch.
		assert!(matches!(
			ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1)),
			Err(AcceptError::TypeMismatch { .. })
		));
	}

	#[test]
	fn test_accept_coerces_int_to_duration_via_seconds() {
		// SET CONFIG CDC_COMPACT_INTERVAL = 60 (Int4) -> Duration(60s).
		let v = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
		assert!(matches!(v, Value::Duration(_)));
	}

	#[test]
	fn test_accept_idempotent_on_canonical_uint8() {
		let canonical = Value::Uint8(42);
		assert_eq!(ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap(), canonical);
	}

	#[test]
	fn test_accept_idempotent_on_canonical_duration() {
		let canonical = Value::Duration(Duration::from_seconds(5).unwrap());
		assert_eq!(ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap(), canonical);
	}

	#[test]
	fn test_accept_rejects_typed_null_for_non_optional_key() {
		let err = ConfigKey::CdcCompactBlockSize
			.accept(Value::None {
				inner: Type::Uint8,
			})
			.unwrap_err();
		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
	}

	#[test]
	fn test_accept_passes_typed_null_for_optional_key() {
		let none = Value::None {
			inner: Type::Duration,
		};
		assert_eq!(ConfigKey::CdcTtlDuration.accept(none.clone()).unwrap(), none);
	}

	#[test]
	fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
		// Optional key still rejects typed-null whose inner doesn't match expected_types.
		let err = ConfigKey::CdcTtlDuration
			.accept(Value::None {
				inner: Type::Uint8,
			})
			.unwrap_err();
		assert!(matches!(err, AcceptError::TypeMismatch { .. }));
	}

	// ---- HISTORICAL_GC_* keys ----

	#[test]
	fn test_historical_gc_keys_round_trip() {
		assert_eq!("HISTORICAL_GC_BATCH_SIZE".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcBatchSize);
		assert_eq!("HISTORICAL_GC_INTERVAL".parse::<ConfigKey>().unwrap(), ConfigKey::HistoricalGcInterval);
		assert_eq!(format!("{}", ConfigKey::HistoricalGcBatchSize), "HISTORICAL_GC_BATCH_SIZE");
		assert_eq!(format!("{}", ConfigKey::HistoricalGcInterval), "HISTORICAL_GC_INTERVAL");
	}

	#[test]
	fn test_historical_gc_defaults() {
		assert_eq!(ConfigKey::HistoricalGcBatchSize.default_value(), Value::Uint8(50_000));
		assert!(matches!(ConfigKey::HistoricalGcInterval.default_value(), Value::Duration(_)));
	}

	#[test]
	fn test_historical_gc_batch_size_rejects_zero() {
		match ConfigKey::HistoricalGcBatchSize.accept(Value::Uint8(0)).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}

	#[test]
	fn test_historical_gc_interval_rejects_zero() {
		let zero = Value::Duration(Duration::from_seconds(0).unwrap());
		match ConfigKey::HistoricalGcInterval.accept(zero).unwrap_err() {
			AcceptError::InvalidValue(reason) => {
				assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
			}
			other => panic!("expected InvalidValue, got {other:?}"),
		}
	}
}