1use std::{fmt, str::FromStr, time::Duration as StdDuration};
5
6use reifydb_type::value::{
7 Value, decimal::Decimal, duration::Duration, int::Int, ordered_f32::OrderedF32, ordered_f64::OrderedF64,
8 r#type::Type, uint::Uint,
9};
10
11use crate::common::CommitVersion;
12
/// Reasons a candidate value can be rejected by [`ConfigKey::accept`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcceptError {
    /// The value's type is not among the key's expected types and it could
    /// not be coerced into any of them.
    TypeMismatch {
        /// The types the key accepts.
        expected: Vec<Type>,
        /// The type of the value that was supplied.
        actual: Type,
    },
    /// The value has an acceptable type but failed a key-specific check; the
    /// payload is the human-readable reason.
    InvalidValue(String),
}
25
26impl fmt::Display for AcceptError {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 match self {
29 Self::TypeMismatch {
30 expected,
31 actual,
32 } => {
33 write!(f, "expected one of {:?}, got {:?}", expected, actual)
34 }
35 Self::InvalidValue(reason) => write!(f, "{reason}"),
36 }
37 }
38}
39
/// Identifiers of the system configuration settings.
///
/// The variant order is significant: `PartialOrd`/`Ord` are derived, so
/// reordering variants changes how keys compare.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ConfigKey {
    /// Number of transactions per conflict-detection window.
    OracleWindowSize,
    /// Number of conflict windows retained before cleanup is triggered.
    OracleWaterMark,
    /// Max rows to examine per batch during a row TTL scan.
    RowTtlScanBatchSize,
    /// How often the row TTL actor should scan for expired rows.
    RowTtlScanInterval,
    /// Maximum age of CDC entries before eviction; optional (may be a typed null).
    CdcTtlDuration,
    /// How often the CDC compaction actor runs.
    CdcCompactInterval,
    /// Number of CDC entries packed into one compressed block.
    CdcCompactBlockSize,
    /// Versions newer than (max_version - lag) are never compacted.
    CdcCompactSafetyLag,
    /// Upper bound on consecutive blocks produced per actor tick.
    CdcCompactMaxBlocksPerTick,
    /// Number of decompressed CDC blocks held in the in-memory LRU cache.
    CdcCompactBlockCacheCapacity,
    /// Zstd compression level for CDC blocks (validated to be within 1-22).
    CdcCompactZstdLevel,
}
54
55impl ConfigKey {
56 pub fn all() -> &'static [Self] {
57 &[
58 Self::OracleWindowSize,
59 Self::OracleWaterMark,
60 Self::RowTtlScanBatchSize,
61 Self::RowTtlScanInterval,
62 Self::CdcTtlDuration,
63 Self::CdcCompactInterval,
64 Self::CdcCompactBlockSize,
65 Self::CdcCompactSafetyLag,
66 Self::CdcCompactMaxBlocksPerTick,
67 Self::CdcCompactBlockCacheCapacity,
68 Self::CdcCompactZstdLevel,
69 ]
70 }
71
72 pub fn default_value(&self) -> Value {
73 match self {
74 Self::OracleWindowSize => Value::Uint8(500),
75 Self::OracleWaterMark => Value::Uint8(20),
76 Self::RowTtlScanBatchSize => Value::Uint8(10000),
77 Self::RowTtlScanInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
78 Self::CdcTtlDuration => Value::None {
79 inner: Type::Duration,
80 },
81 Self::CdcCompactInterval => Value::Duration(Duration::from_seconds(60).unwrap()),
82 Self::CdcCompactBlockSize => Value::Uint8(1024),
83 Self::CdcCompactSafetyLag => Value::Uint8(1024),
84 Self::CdcCompactMaxBlocksPerTick => Value::Uint8(16),
85 Self::CdcCompactBlockCacheCapacity => Value::Uint8(8),
86 Self::CdcCompactZstdLevel => Value::Uint1(7),
87 }
88 }
89
90 pub fn description(&self) -> &'static str {
91 match self {
92 Self::OracleWindowSize => "Number of transactions per conflict-detection window.",
93 Self::OracleWaterMark => "Number of conflict windows retained before cleanup is triggered.",
94 Self::RowTtlScanBatchSize => "Max rows to examine per batch during a row TTL scan.",
95 Self::RowTtlScanInterval => "How often the row TTL actor should scan for expired rows.",
96 Self::CdcTtlDuration => {
97 "Maximum age of CDC entries before eviction. When unset, CDC is retained forever; \
98 when set, must be > 0 and entries older than this duration are evicted regardless \
99 of consumer state."
100 }
101 Self::CdcCompactInterval => "How often the CDC compaction actor runs.",
102 Self::CdcCompactBlockSize => "Number of CDC entries packed into one compressed block.",
103 Self::CdcCompactSafetyLag => "Versions newer than (max_version - lag) are never compacted.",
104 Self::CdcCompactMaxBlocksPerTick => {
105 "Upper bound on consecutive blocks produced per actor tick."
106 }
107 Self::CdcCompactBlockCacheCapacity => {
108 "Number of decompressed CDC blocks held in the in-memory LRU cache."
109 }
110 Self::CdcCompactZstdLevel => {
111 "Zstd compression level for CDC blocks. Range 1-22; higher means smaller blocks but \
112 slower compression. Decompression cost is independent of level."
113 }
114 }
115 }
116
117 pub fn requires_restart(&self) -> bool {
118 match self {
119 Self::OracleWindowSize => false,
120 Self::OracleWaterMark => false,
121 Self::RowTtlScanBatchSize => false,
122 Self::RowTtlScanInterval => false,
123 Self::CdcTtlDuration => false,
124 Self::CdcCompactInterval => false,
125 Self::CdcCompactBlockSize => false,
126 Self::CdcCompactSafetyLag => false,
127 Self::CdcCompactMaxBlocksPerTick => false,
128 Self::CdcCompactBlockCacheCapacity => true,
129 Self::CdcCompactZstdLevel => false,
130 }
131 }
132
133 pub fn expected_types(&self) -> &'static [Type] {
134 match self {
135 Self::OracleWindowSize => &[Type::Uint8],
136 Self::OracleWaterMark => &[Type::Uint8],
137 Self::RowTtlScanBatchSize => &[Type::Uint8],
138 Self::RowTtlScanInterval => &[Type::Duration],
139 Self::CdcTtlDuration => &[Type::Duration],
140 Self::CdcCompactInterval => &[Type::Duration],
141 Self::CdcCompactBlockSize => &[Type::Uint8],
142 Self::CdcCompactSafetyLag => &[Type::Uint8],
143 Self::CdcCompactMaxBlocksPerTick => &[Type::Uint8],
144 Self::CdcCompactBlockCacheCapacity => &[Type::Uint8],
145 Self::CdcCompactZstdLevel => &[Type::Uint1],
146 }
147 }
148
149 pub fn is_optional(&self) -> bool {
154 match self {
155 Self::OracleWindowSize => false,
156 Self::OracleWaterMark => false,
157 Self::RowTtlScanBatchSize => false,
158 Self::RowTtlScanInterval => false,
159 Self::CdcTtlDuration => true,
160 Self::CdcCompactInterval => false,
161 Self::CdcCompactBlockSize => false,
162 Self::CdcCompactSafetyLag => false,
163 Self::CdcCompactMaxBlocksPerTick => false,
164 Self::CdcCompactBlockCacheCapacity => false,
165 Self::CdcCompactZstdLevel => false,
166 }
167 }
168
169 fn validate_canonical(&self, value: &Value) -> Result<(), String> {
174 match self {
175 Self::CdcTtlDuration => match value {
176 Value::None {
177 ..
178 } => Ok(()),
179 Value::Duration(d) => {
180 if d.is_positive() {
181 Ok(())
182 } else {
183 Err("CDC_TTL_DURATION must be greater than zero".to_string())
184 }
185 }
186 _ => Ok(()),
187 },
188 Self::CdcCompactInterval => match value {
189 Value::Duration(d) => {
190 if d.is_positive() {
191 Ok(())
192 } else {
193 Err("CDC_COMPACT_INTERVAL must be greater than zero".to_string())
194 }
195 }
196 _ => Ok(()),
197 },
198 Self::CdcCompactBlockSize => match value {
199 Value::Uint8(0) => Err("CDC_COMPACT_BLOCK_SIZE must be greater than zero".to_string()),
200 _ => Ok(()),
201 },
202 Self::CdcCompactBlockCacheCapacity => match value {
203 Value::Uint8(0) => {
204 Err("CDC_COMPACT_BLOCK_CACHE_CAPACITY must be greater than zero".to_string())
205 }
206 _ => Ok(()),
207 },
208 Self::CdcCompactZstdLevel => match value {
209 Value::Uint1(v) if (1..=22).contains(v) => Ok(()),
210 Value::Uint1(_) => Err("CDC_COMPACT_ZSTD_LEVEL must be in [1, 22]".to_string()),
211 _ => Ok(()),
212 },
213 _ => Ok(()),
214 }
215 }
216
217 pub fn accept(&self, value: Value) -> Result<Value, AcceptError> {
218 if let Value::None {
219 inner,
220 } = &value
221 {
222 if self.is_optional() && self.expected_types().contains(inner) {
223 return Ok(value);
224 }
225 return Err(AcceptError::TypeMismatch {
226 expected: self.expected_types().to_vec(),
227 actual: value.get_type(),
228 });
229 }
230
231 let canonical = if self.expected_types().contains(&value.get_type()) {
232 value
233 } else {
234 try_coerce_numeric(&value, self.expected_types()).ok_or_else(|| AcceptError::TypeMismatch {
235 expected: self.expected_types().to_vec(),
236 actual: value.get_type(),
237 })?
238 };
239
240 self.validate_canonical(&canonical).map_err(AcceptError::InvalidValue)?;
241 Ok(canonical)
242 }
243}
244
245fn try_coerce_numeric(value: &Value, expected: &[Type]) -> Option<Value> {
246 for target in expected {
247 let coerced = match target {
248 Type::Uint1 => {
249 value.to_usize().filter(|&v| v <= u8::MAX as usize).map(|v| Value::Uint1(v as u8))
250 }
251 Type::Uint2 => {
252 value.to_usize().filter(|&v| v <= u16::MAX as usize).map(|v| Value::Uint2(v as u16))
253 }
254 Type::Uint4 => {
255 value.to_usize().filter(|&v| v <= u32::MAX as usize).map(|v| Value::Uint4(v as u32))
256 }
257 Type::Uint8 => {
258 value.to_usize().filter(|&v| v <= u64::MAX as usize).map(|v| Value::Uint8(v as u64))
259 }
260 Type::Uint16 => value.to_usize().map(|v| Value::Uint16(v as u128)),
261 Type::Int1 => value.to_usize().filter(|&v| v <= i8::MAX as usize).map(|v| Value::Int1(v as i8)),
262 Type::Int2 => {
263 value.to_usize().filter(|&v| v <= i16::MAX as usize).map(|v| Value::Int2(v as i16))
264 }
265 Type::Int4 => {
266 value.to_usize().filter(|&v| v <= i32::MAX as usize).map(|v| Value::Int4(v as i32))
267 }
268 Type::Int8 => {
269 value.to_usize().filter(|&v| v <= i64::MAX as usize).map(|v| Value::Int8(v as i64))
270 }
271 Type::Int16 => {
272 value.to_usize().filter(|&v| v <= i128::MAX as usize).map(|v| Value::Int16(v as i128))
273 }
274 Type::Uint => value.to_usize().map(|v| Value::Uint(Uint::from_u64(v as u64))),
275 Type::Int => value.to_usize().map(|v| Value::Int(Int::from_i64(v as i64))),
276 Type::Decimal => value.to_usize().map(|v| Value::Decimal(Decimal::from_i64(v as i64))),
277 Type::Float4 => {
278 value.to_usize().and_then(|v| OrderedF32::try_from(v as f32).ok()).map(Value::Float4)
279 }
280 Type::Float8 => {
281 value.to_usize().and_then(|v| OrderedF64::try_from(v as f64).ok()).map(Value::Float8)
282 }
283 Type::Duration => value
284 .to_usize()
285 .and_then(|v| Duration::from_seconds(v as i64).ok())
286 .map(Value::Duration),
287 _ => None,
288 };
289 if coerced.is_some() {
290 return coerced;
291 }
292 }
293 None
294}
295
296impl fmt::Display for ConfigKey {
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 match self {
299 Self::OracleWindowSize => write!(f, "ORACLE_WINDOW_SIZE"),
300 Self::OracleWaterMark => write!(f, "ORACLE_WATER_MARK"),
301 Self::RowTtlScanBatchSize => write!(f, "ROW_TTL_SCAN_BATCH_SIZE"),
302 Self::RowTtlScanInterval => write!(f, "ROW_TTL_SCAN_INTERVAL"),
303 Self::CdcTtlDuration => write!(f, "CDC_TTL_DURATION"),
304 Self::CdcCompactInterval => write!(f, "CDC_COMPACT_INTERVAL"),
305 Self::CdcCompactBlockSize => write!(f, "CDC_COMPACT_BLOCK_SIZE"),
306 Self::CdcCompactSafetyLag => write!(f, "CDC_COMPACT_SAFETY_LAG"),
307 Self::CdcCompactMaxBlocksPerTick => write!(f, "CDC_COMPACT_MAX_BLOCKS_PER_TICK"),
308 Self::CdcCompactBlockCacheCapacity => write!(f, "CDC_COMPACT_BLOCK_CACHE_CAPACITY"),
309 Self::CdcCompactZstdLevel => write!(f, "CDC_COMPACT_ZSTD_LEVEL"),
310 }
311 }
312}
313
314impl FromStr for ConfigKey {
315 type Err = String;
316
317 fn from_str(s: &str) -> Result<Self, Self::Err> {
318 match s {
319 "ORACLE_WINDOW_SIZE" => Ok(Self::OracleWindowSize),
320 "ORACLE_WATER_MARK" => Ok(Self::OracleWaterMark),
321 "ROW_TTL_SCAN_BATCH_SIZE" => Ok(Self::RowTtlScanBatchSize),
322 "ROW_TTL_SCAN_INTERVAL" => Ok(Self::RowTtlScanInterval),
323 "CDC_TTL_DURATION" => Ok(Self::CdcTtlDuration),
324 "CDC_COMPACT_INTERVAL" => Ok(Self::CdcCompactInterval),
325 "CDC_COMPACT_BLOCK_SIZE" => Ok(Self::CdcCompactBlockSize),
326 "CDC_COMPACT_SAFETY_LAG" => Ok(Self::CdcCompactSafetyLag),
327 "CDC_COMPACT_MAX_BLOCKS_PER_TICK" => Ok(Self::CdcCompactMaxBlocksPerTick),
328 "CDC_COMPACT_BLOCK_CACHE_CAPACITY" => Ok(Self::CdcCompactBlockCacheCapacity),
329 "CDC_COMPACT_ZSTD_LEVEL" => Ok(Self::CdcCompactZstdLevel),
330 _ => Err(format!("Unknown system configuration key: {}", s)),
331 }
332 }
333}
334
/// A configuration entry: a key together with its value and metadata.
///
/// NOTE(review): the metadata fields presumably mirror the corresponding
/// `ConfigKey` accessors (`default_value`, `description`,
/// `requires_restart`) — confirm where `Config` is constructed.
#[derive(Debug, Clone)]
pub struct Config {
    /// The setting this entry describes.
    pub key: ConfigKey,
    /// The value held for the setting.
    pub value: Value,
    /// The default value recorded for the setting.
    pub default_value: Value,
    /// Human-readable description of the setting.
    pub description: &'static str,
    /// Whether the setting is flagged as requiring a restart.
    pub requires_restart: bool,
}
353
354pub trait GetConfig: Send + Sync {
356 fn get_config(&self, key: ConfigKey) -> Value;
358 fn get_config_at(&self, key: ConfigKey, version: CommitVersion) -> Value;
360
361 fn get_config_uint8(&self, key: ConfigKey) -> u64 {
363 let val = self.get_config(key);
364 match val {
365 Value::Uint8(v) => v,
366 v => panic!("config key '{}' expected Uint8, got {:?}", key, v),
367 }
368 }
369
370 fn get_config_uint1(&self, key: ConfigKey) -> u8 {
372 let val = self.get_config(key);
373 match val {
374 Value::Uint1(v) => v,
375 v => panic!("config key '{}' expected Uint1, got {:?}", key, v),
376 }
377 }
378
379 fn get_config_duration(&self, key: ConfigKey) -> StdDuration {
381 let val = self.get_config(key);
382 match val {
383 Value::Duration(v) => {
384 let total_nanos =
385 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
386 StdDuration::from_nanos(total_nanos.max(0) as u64)
387 }
388 v => panic!("config key '{}' expected Duration, got {:?}", key, v),
389 }
390 }
391
392 fn get_config_duration_opt(&self, key: ConfigKey) -> Option<StdDuration> {
395 match self.get_config(key) {
396 Value::None {
397 ..
398 } => None,
399 Value::Duration(v) => {
400 let total_nanos =
401 (v.get_days() as i128 * 24 * 3600 * 1_000_000_000) + (v.get_nanos() as i128);
402 Some(StdDuration::from_nanos(total_nanos.max(0) as u64))
403 }
404 v => panic!("config key '{}' expected Duration or None, got {:?}", key, v),
405 }
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Value::Duration` of `n` whole seconds.
    fn seconds(n: i64) -> Value {
        Value::Duration(Duration::from_seconds(n).unwrap())
    }

    /// Builds a typed null whose inner type is `inner`.
    fn typed_null(inner: Type) -> Value {
        Value::None {
            inner,
        }
    }

    /// Asserts that `name` parses to `key` and that `key` displays as `name`.
    #[track_caller]
    fn assert_name_round_trips(key: ConfigKey, name: &str) {
        let parsed: ConfigKey = name.parse().unwrap();
        assert_eq!(parsed, key);
        assert_eq!(format!("{}", key), name);
    }

    /// Asserts that `outcome` is an `InvalidValue` error whose reason
    /// mentions "greater than zero".
    #[track_caller]
    fn assert_rejected_as_not_positive(outcome: Result<Value, AcceptError>) {
        match outcome.unwrap_err() {
            AcceptError::InvalidValue(reason) => {
                assert!(reason.contains("greater than zero"), "unexpected reason: {reason}");
            }
            other => panic!("expected InvalidValue, got {other:?}"),
        }
    }

    #[test]
    fn test_cdc_ttl_default_is_typed_null() {
        assert!(matches!(
            ConfigKey::CdcTtlDuration.default_value(),
            Value::None {
                inner: Type::Duration
            }
        ));
    }

    #[test]
    fn test_cdc_ttl_accept_passes_typed_null() {
        let null = typed_null(Type::Duration);
        let accepted = ConfigKey::CdcTtlDuration.accept(null.clone()).unwrap();
        assert_eq!(accepted, null);
    }

    #[test]
    fn test_cdc_ttl_accept_passes_positive_duration() {
        // One second and one hour.
        for secs in [1, 3600] {
            let positive = seconds(secs);
            assert_eq!(ConfigKey::CdcTtlDuration.accept(positive.clone()).unwrap(), positive);
        }
    }

    #[test]
    fn test_cdc_ttl_accept_rejects_zero() {
        assert_rejected_as_not_positive(ConfigKey::CdcTtlDuration.accept(seconds(0)));
    }

    #[test]
    fn test_cdc_ttl_accept_rejects_negative() {
        let outcome = ConfigKey::CdcTtlDuration.accept(seconds(-5));
        assert!(matches!(outcome, Err(AcceptError::InvalidValue(_))));
    }

    #[test]
    fn test_other_keys_accept_in_type_values() {
        // Keys without a range rule accept zero values of the right type.
        assert!(ConfigKey::OracleWindowSize.accept(Value::Uint8(0)).is_ok());
        assert!(ConfigKey::RowTtlScanInterval.accept(seconds(0)).is_ok());
    }

    #[test]
    fn test_cdc_ttl_round_trips_through_display_and_from_str() {
        assert_name_round_trips(ConfigKey::CdcTtlDuration, "CDC_TTL_DURATION");
    }

    #[test]
    fn test_cdc_ttl_in_all() {
        assert!(ConfigKey::all().contains(&ConfigKey::CdcTtlDuration));
    }

    #[test]
    fn test_all_contains_every_compact_key_and_has_expected_len() {
        let all = ConfigKey::all();
        assert_eq!(all.len(), 11);

        let compact_keys = [
            ConfigKey::CdcCompactInterval,
            ConfigKey::CdcCompactBlockSize,
            ConfigKey::CdcCompactSafetyLag,
            ConfigKey::CdcCompactMaxBlocksPerTick,
            ConfigKey::CdcCompactBlockCacheCapacity,
            ConfigKey::CdcCompactZstdLevel,
        ];
        for key in compact_keys {
            assert!(all.contains(&key));
        }
    }

    #[test]
    fn test_cdc_compact_interval_round_trips_through_display_and_from_str() {
        assert_name_round_trips(ConfigKey::CdcCompactInterval, "CDC_COMPACT_INTERVAL");
    }

    #[test]
    fn test_cdc_compact_block_size_round_trips_through_display_and_from_str() {
        assert_name_round_trips(ConfigKey::CdcCompactBlockSize, "CDC_COMPACT_BLOCK_SIZE");
    }

    #[test]
    fn test_cdc_compact_safety_lag_round_trips_through_display_and_from_str() {
        assert_name_round_trips(ConfigKey::CdcCompactSafetyLag, "CDC_COMPACT_SAFETY_LAG");
    }

    #[test]
    fn test_cdc_compact_max_blocks_per_tick_round_trips_through_display_and_from_str() {
        assert_name_round_trips(ConfigKey::CdcCompactMaxBlocksPerTick, "CDC_COMPACT_MAX_BLOCKS_PER_TICK");
    }

    #[test]
    fn test_cdc_compact_interval_default_is_duration() {
        let default = ConfigKey::CdcCompactInterval.default_value();
        assert!(matches!(default, Value::Duration(_)));
    }

    #[test]
    fn test_cdc_compact_block_size_default_is_uint8_1024() {
        let default = ConfigKey::CdcCompactBlockSize.default_value();
        assert_eq!(default, Value::Uint8(1024));
    }

    #[test]
    fn test_cdc_compact_safety_lag_default_is_uint8_1024() {
        let default = ConfigKey::CdcCompactSafetyLag.default_value();
        assert_eq!(default, Value::Uint8(1024));
    }

    #[test]
    fn test_cdc_compact_max_blocks_per_tick_default_is_uint8_16() {
        let default = ConfigKey::CdcCompactMaxBlocksPerTick.default_value();
        assert_eq!(default, Value::Uint8(16));
    }

    #[test]
    fn test_cdc_compact_interval_accept_passes_positive_duration() {
        let one_sec = seconds(1);
        let accepted = ConfigKey::CdcCompactInterval.accept(one_sec.clone()).unwrap();
        assert_eq!(accepted, one_sec);
    }

    #[test]
    fn test_cdc_compact_interval_accept_rejects_zero() {
        assert_rejected_as_not_positive(ConfigKey::CdcCompactInterval.accept(seconds(0)));
    }

    #[test]
    fn test_cdc_compact_interval_accept_rejects_negative() {
        let outcome = ConfigKey::CdcCompactInterval.accept(seconds(-5));
        assert!(matches!(outcome, Err(AcceptError::InvalidValue(_))));
    }

    #[test]
    fn test_cdc_compact_block_size_accept_rejects_zero() {
        assert_rejected_as_not_positive(ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(0)));
    }

    #[test]
    fn test_cdc_compact_block_size_accept_passes_positive() {
        for size in [1, 1024] {
            let accepted = ConfigKey::CdcCompactBlockSize.accept(Value::Uint8(size)).unwrap();
            assert_eq!(accepted, Value::Uint8(size));
        }
    }

    #[test]
    fn test_cdc_compact_safety_lag_and_max_blocks_accept_zero() {
        for key in [ConfigKey::CdcCompactSafetyLag, ConfigKey::CdcCompactMaxBlocksPerTick] {
            assert_eq!(key.accept(Value::Uint8(0)).unwrap(), Value::Uint8(0));
        }
    }

    #[test]
    fn test_accept_coerces_int4_to_uint8_for_block_size() {
        let coerced = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(1024)).unwrap();
        assert_eq!(coerced, Value::Uint8(1024));
    }

    #[test]
    fn test_accept_coerces_int8_to_uint8_for_block_size() {
        let coerced = ConfigKey::CdcCompactBlockSize.accept(Value::Int8(2048)).unwrap();
        assert_eq!(coerced, Value::Uint8(2048));
    }

    #[test]
    fn test_accept_rejects_zero_after_coercion() {
        // Validation runs on the coerced value, so Int4(0) fails the range
        // check rather than the type check.
        let err = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(0)).unwrap_err();
        match err {
            AcceptError::InvalidValue(reason) => {
                assert!(reason.contains("greater than zero"));
            }
            other => panic!("expected InvalidValue, got {other:?}"),
        }
    }

    #[test]
    fn test_accept_rejects_negative_int_for_uint8_key() {
        let outcome = ConfigKey::CdcCompactBlockSize.accept(Value::Int4(-1));
        assert!(matches!(
            outcome,
            Err(AcceptError::TypeMismatch { .. })
        ));
    }

    #[test]
    fn test_accept_coerces_int_to_duration_via_seconds() {
        let coerced = ConfigKey::CdcCompactInterval.accept(Value::Int4(60)).unwrap();
        assert!(matches!(coerced, Value::Duration(_)));
    }

    #[test]
    fn test_accept_idempotent_on_canonical_uint8() {
        let canonical = Value::Uint8(42);
        let accepted = ConfigKey::OracleWindowSize.accept(canonical.clone()).unwrap();
        assert_eq!(accepted, canonical);
    }

    #[test]
    fn test_accept_idempotent_on_canonical_duration() {
        let canonical = seconds(5);
        let accepted = ConfigKey::CdcCompactInterval.accept(canonical.clone()).unwrap();
        assert_eq!(accepted, canonical);
    }

    #[test]
    fn test_accept_rejects_typed_null_for_non_optional_key() {
        let err = ConfigKey::CdcCompactBlockSize.accept(typed_null(Type::Uint8)).unwrap_err();
        assert!(matches!(err, AcceptError::TypeMismatch { .. }));
    }

    #[test]
    fn test_accept_passes_typed_null_for_optional_key() {
        let null = typed_null(Type::Duration);
        assert_eq!(ConfigKey::CdcTtlDuration.accept(null.clone()).unwrap(), null);
    }

    #[test]
    fn test_accept_rejects_wrong_inner_type_typed_null_for_optional_key() {
        // The key is optional, but the null's inner type must still match.
        let err = ConfigKey::CdcTtlDuration.accept(typed_null(Type::Uint8)).unwrap_err();
        assert!(matches!(err, AcceptError::TypeMismatch { .. }));
    }
}