net/adapter/net/redex/replication_config.rs
1//! Per-channel replication configuration — Phase B opt-in for
2//! `docs/plans/REDEX_DISTRIBUTED_PLAN.md` §1.
3//!
4//! `ReplicationConfig` is the opt-in surface that turns a v1 / v2
5//! single-node `RedexFile` into a replicated channel. It lives on
6//! [`RedexFileConfig::replication`](super::RedexFileConfig) as an
7//! `Option`; the default `None` keeps every existing channel single-
8//! node (no observable change). Phase C wires the `ReplicationCoordinator`
9//! daemon's spawn path to consult this field on `Redex::open_file`.
10//!
11//! Validation is fail-fast at config-build time — pin invariants here
12//! so a malformed config can't escape into the coordinator's hot
13//! loop. The `validate()` method returns a typed `ReplicationConfigError`
14//! covering every reject path; the `with_*` builder methods are
15//! validation-free for ergonomic chaining and pair with a final
16//! `validate()` call before the config is committed to a `Redex`.
17
18use crate::adapter::net::behavior::placement::NodeId;
19
/// Replication factor lower bound (inclusive). `1` collapses to
/// single-node-with-coordinator (the daemon runs but there's only one
/// replica) — useful for testing and the brief moment between
/// channel-open and the first peer joining; below `1` is meaningless.
/// [`ReplicationConfig::validate`] rejects smaller values with
/// [`ReplicationConfigError::FactorBelowMin`].
pub const REPLICATION_FACTOR_MIN: u8 = 1;
/// Replication factor upper bound (inclusive). The protocol allows up
/// to 255 replicas per channel (u8), but anything beyond ~16 stops
/// scaling (heartbeat fanout dominates), so the ceiling is capped here
/// and a misconfig can't accidentally fan out to hundreds:
/// [`ReplicationConfig::validate`] rejects larger values with
/// [`ReplicationConfigError::FactorAboveMax`], and the same ceiling
/// bounds the size of a [`PlacementStrategy::Pinned`] set. Operators
/// with genuine 16+-replica workloads can plumb the override; v1 keeps
/// the ceiling conservative.
pub const REPLICATION_FACTOR_MAX: u8 = 16;

/// Default replication factor — three is the conventional minimum
/// for a single-leader log to tolerate one replica failure while
/// staying in single-partition quorum-irrelevant configuration.
pub const REPLICATION_FACTOR_DEFAULT: u8 = 3;

/// Minimum heartbeat cadence, in milliseconds (inclusive). Below
/// 100 ms heartbeat traffic dominates the channel's effective
/// throughput; pin a floor here so a misconfig can't accidentally turn
/// the heartbeat path into a busy-loop.
/// [`ReplicationConfig::validate`] rejects smaller values with
/// [`ReplicationConfigError::HeartbeatTooLow`].
pub const HEARTBEAT_MS_MIN: u64 = 100;

/// Maximum heartbeat cadence, in milliseconds (inclusive). Five
/// minutes is well past the longest reasonable failover detection
/// window; values above this risk silently disabling silence
/// detection via the `heartbeat_ms.saturating_mul(miss_threshold)`
/// arithmetic in the heartbeat tracker, which saturates to `u64::MAX`
/// and makes `is_leader_silent` always return `false`. Pin a sane
/// ceiling so a typo / unit-confusion (passing μs instead of ms) can't
/// disable the failure detector.
/// [`ReplicationConfig::validate`] rejects larger values with
/// [`ReplicationConfigError::HeartbeatTooHigh`].
pub const HEARTBEAT_MS_MAX: u64 = 300_000;

/// Default heartbeat cadence — 500 ms matches the plan §1 default and
/// the §6 "3 × heartbeat" lag bound; with three-missed hysteresis the
/// effective failover detection window is ~1.5 s, well under the
/// activation-gate's "< 5 s RTO" target.
pub const HEARTBEAT_MS_DEFAULT: u64 = 500;

/// Default replication-sync I/O budget as a fraction of measured NIC
/// peak. `0.5` lets replication consume half the link without
/// starving foreground publish traffic; tune per channel via
/// [`ReplicationConfig::with_replication_budget_fraction`]. Valid
/// values lie in `(0.0, 1.0]`.
pub const REPLICATION_BUDGET_FRACTION_DEFAULT: f32 = 0.5;

/// v0.3 Phase D2 default: fraction of bucket capacity reserved
/// against `Background` admission. With the default 0.3, a
/// `Background` request needs `available >= 0.7 * capacity` to
/// pass the gate — preserving ~70% headroom for Foreground bursts
/// against sustained Background load. Valid values lie in
/// `[0.0, 1.0)`; tune per channel via
/// [`ReplicationConfig::with_background_fraction`].
pub const BACKGROUND_FRACTION_DEFAULT: f32 = 0.3;
72
/// Where replicas live and how they're chosen at channel-open time
/// and on roster change. Mirrors the four-axis intent the plan §1
/// pins:
///
/// - [`PlacementStrategy::Standard`] — let `PlacementFilter` decide
///   (default for new channels). Reads `metadata.intent`,
///   `metadata.colocate-with`, `scope:`, proximity, and resource-
///   availability axes.
/// - [`PlacementStrategy::Pinned`] — manual placement on a fixed
///   `NodeId` set. Used for special-case topologies and tests.
/// - [`PlacementStrategy::ColocationStrict`] — every replica must
///   live on a node already holding the chain referenced by
///   `metadata.colocate-with-strict`. Refuses placement on
///   insufficient-coverage nodes.
///
/// NOTE(review): the text above says "four-axis" while this enum
/// defines three variants — confirm against the plan that nothing is
/// missing here.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum PlacementStrategy {
    /// Spread across nodes per the `PlacementFilter` primitive shipped
    /// in The Warriors. Reads `metadata.intent`, `metadata.colocate-
    /// with`, `scope:`, proximity, and resource-availability axes.
    /// Default for new channels.
    #[default]
    Standard,
    /// Manual pinning. Used for special-case topologies and tests.
    /// The vector lists the exact `NodeId` set that should run
    /// replicas — its length pins the effective replication factor
    /// regardless of [`ReplicationConfig::factor`].
    ///
    /// [`ReplicationConfig::validate`] rejects an empty set, a set
    /// with more than [`REPLICATION_FACTOR_MAX`] entries, and a set
    /// containing the same `NodeId` twice.
    Pinned(Vec<NodeId>),
    /// Strict colocation — all replicas must be on nodes already
    /// holding the chain referenced by `metadata.colocate-with-
    /// strict`. Refuses placement on insufficient-coverage nodes.
    ColocationStrict,
}
105
/// Policy applied when a replica can no longer meet the channel's
/// retention requirement.
///
/// Behavior when a replica falls below the channel's retention
/// requirement due to local disk pressure. The leader's replication
/// factor is a hard guarantee; replicas are best-effort under
/// pressure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum UnderCapacity {
    /// Drop the replica role; fall through to greedy LRU if also
    /// enabled. The channel's capability tag for this node is
    /// withdrawn and reads re-route to the leader. **Default.**
    #[default]
    Withdraw,
    /// Aggressively evict the oldest local data to maintain the
    /// channel's retention even if total data exceeds disk. Trades
    /// older data for keeping the replication factor intact.
    EvictOldest,
}
122
/// Per-channel replication opt-in. The default-when-set
/// [`ReplicationConfig::new`] gives sensible values (factor 3,
/// 500 ms heartbeat, standard placement, withdraw-under-capacity,
/// 0.5 NIC peak budget, `Foreground` default bandwidth class,
/// 0.3 background fraction); the `with_*` builders adjust individual
/// fields.
///
/// `validate()` runs the full set of invariants pinned at module top
/// — callers should invoke it before committing the config to a
/// `Redex` so a malformed config can't leak into the coordinator's
/// hot loop. The fields are public and the builders do not validate,
/// so a value of this type is not guaranteed to be valid until
/// `validate()` has returned `Ok(())`.
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicationConfig {
    /// Replication factor — number of replicas (including the leader)
    /// maintained. Must satisfy
    /// `REPLICATION_FACTOR_MIN <= factor <= REPLICATION_FACTOR_MAX`.
    /// Default [`REPLICATION_FACTOR_DEFAULT`].
    ///
    /// With [`PlacementStrategy::Pinned`] the pinned set's length is
    /// what [`ReplicationConfig::effective_factor`] reports, but
    /// `validate()` still range-checks this field.
    pub factor: u8,
    /// How replicas are chosen when first instantiated and on roster
    /// change. Default [`PlacementStrategy::Standard`].
    pub placement: PlacementStrategy,
    /// Heartbeat interval between leader and replicas, in
    /// milliseconds. Must satisfy
    /// `HEARTBEAT_MS_MIN <= heartbeat_ms <= HEARTBEAT_MS_MAX`.
    /// Default [`HEARTBEAT_MS_DEFAULT`].
    pub heartbeat_ms: u64,
    /// Optional override pinning the leader to a specific node.
    /// `None` = leader is the channel's natural publisher (the
    /// `ChannelPublisher` home). When `Some(node)`, the override
    /// applies on every leader election; the deterministic
    /// nearest-RTT election picks `node` whenever it's healthy.
    ///
    /// When `placement` is [`PlacementStrategy::Pinned`], `validate()`
    /// requires `node` to be a member of the pinned set; with any
    /// other placement no membership check is applied.
    pub leader_pinned: Option<NodeId>,
    /// Behavior when a replica falls below the channel's retention
    /// requirement due to local disk pressure. Default
    /// [`UnderCapacity::Withdraw`].
    pub on_under_capacity: UnderCapacity,
    /// Bandwidth budget for replication-sync I/O, as a fraction of
    /// measured NIC peak. Must lie in `(0.0, 1.0]` and be finite.
    /// Default [`REPLICATION_BUDGET_FRACTION_DEFAULT`].
    pub replication_budget_fraction: f32,
    /// v0.3 Phase D2: per-channel default
    /// [`BandwidthClass`](super::bandwidth::BandwidthClass) the
    /// runtime stamps on every emitted `SyncRequest` unless the
    /// caller explicitly overrides it. Receivers honor the
    /// per-request value in preference to this default. Default
    /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
    /// v0.3 Phase D2: admission-gate parameter — the fraction of
    /// the bandwidth bucket reserved against `Background`
    /// requests. A `Background` request is admitted only when
    /// `available >= (1 - background_fraction) * capacity`.
    /// Operators tune per channel: hot channels set this low
    /// (tight Background bound, more Foreground headroom);
    /// archival channels set it high (give Background more
    /// room). Must lie in `[0.0, 1.0)` and be finite (1.0
    /// would deny every Background request unconditionally and
    /// is rejected; the v0.3 Phase D4 anti-starvation hatch one-
    /// shot bypasses the gate after 60 s starve regardless of
    /// the configured value). Default
    /// [`BACKGROUND_FRACTION_DEFAULT`].
    ///
    /// NOTE(review): by the formula above, 1.0 makes the threshold
    /// `0 * capacity`, which would admit (not deny) every Background
    /// request. The rejection of 1.0 is real (see `validate()`), but
    /// the stated rationale looks inverted — confirm against the gate
    /// implementation.
    pub background_fraction: f32,
}
184
185impl Default for ReplicationConfig {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191impl ReplicationConfig {
192 /// Construct a [`ReplicationConfig`] with all defaults — factor
193 /// 3, 500 ms heartbeat, standard placement, withdraw-under-
194 /// capacity, 0.5 NIC peak budget, leader = natural publisher.
195 pub fn new() -> Self {
196 Self {
197 factor: REPLICATION_FACTOR_DEFAULT,
198 placement: PlacementStrategy::default(),
199 heartbeat_ms: HEARTBEAT_MS_DEFAULT,
200 leader_pinned: None,
201 on_under_capacity: UnderCapacity::default(),
202 replication_budget_fraction: REPLICATION_BUDGET_FRACTION_DEFAULT,
203 default_bandwidth_class: super::bandwidth::BandwidthClass::Foreground,
204 background_fraction: BACKGROUND_FRACTION_DEFAULT,
205 }
206 }
207
208 /// Set the replication factor. Validate via [`Self::validate`]
209 /// before committing the config to a `Redex`.
210 pub fn with_factor(mut self, factor: u8) -> Self {
211 self.factor = factor;
212 self
213 }
214
215 /// Set the placement strategy.
216 pub fn with_placement(mut self, placement: PlacementStrategy) -> Self {
217 self.placement = placement;
218 self
219 }
220
221 /// Set the heartbeat cadence in milliseconds.
222 pub fn with_heartbeat_ms(mut self, heartbeat_ms: u64) -> Self {
223 self.heartbeat_ms = heartbeat_ms;
224 self
225 }
226
227 /// Pin the leader to a specific `NodeId`. Pass `None` to fall
228 /// back to "leader = natural publisher."
229 pub fn with_leader_pinned(mut self, leader: Option<NodeId>) -> Self {
230 self.leader_pinned = leader;
231 self
232 }
233
234 /// Set the under-capacity policy.
235 pub fn with_on_under_capacity(mut self, on_under_capacity: UnderCapacity) -> Self {
236 self.on_under_capacity = on_under_capacity;
237 self
238 }
239
240 /// Set the replication-sync I/O budget as a fraction of measured
241 /// NIC peak.
242 pub fn with_replication_budget_fraction(mut self, fraction: f32) -> Self {
243 self.replication_budget_fraction = fraction;
244 self
245 }
246
247 /// Set the per-channel default
248 /// [`BandwidthClass`](super::bandwidth::BandwidthClass) the
249 /// runtime stamps on emitted `SyncRequest` frames. v0.3 Phase
250 /// D2 — defaults to `Foreground`.
251 pub fn with_default_bandwidth_class(mut self, class: super::bandwidth::BandwidthClass) -> Self {
252 self.default_bandwidth_class = class;
253 self
254 }
255
256 /// Set the per-channel `background_fraction` — the share of
257 /// the bandwidth bucket reserved against `Background`
258 /// admission. Must lie in `[0.0, 1.0)` and be finite (1.0 is
259 /// rejected as it would deny every Background request
260 /// unconditionally). Default
261 /// [`BACKGROUND_FRACTION_DEFAULT`] (0.3).
262 pub fn with_background_fraction(mut self, fraction: f32) -> Self {
263 self.background_fraction = fraction;
264 self
265 }
266
267 /// Effective replica count — `placement` may override `factor`:
268 /// [`PlacementStrategy::Pinned`] pins the count to the length of
269 /// its `Vec<NodeId>` regardless of the configured factor (the
270 /// operator's explicit list wins over the numeric hint). All
271 /// other strategies honor `factor`.
272 pub fn effective_factor(&self) -> u8 {
273 match &self.placement {
274 PlacementStrategy::Pinned(nodes) => {
275 u8::try_from(nodes.len()).unwrap_or(REPLICATION_FACTOR_MAX)
276 }
277 _ => self.factor,
278 }
279 }
280
281 /// Run every documented invariant. Returns `Ok(())` when the
282 /// config is safe to commit; otherwise a typed
283 /// [`ReplicationConfigError`] naming the first violation. Pin
284 /// in tests; surface to operators on `Redex::open_file`.
285 pub fn validate(&self) -> Result<(), ReplicationConfigError> {
286 if self.factor < REPLICATION_FACTOR_MIN {
287 return Err(ReplicationConfigError::FactorBelowMin {
288 got: self.factor,
289 min: REPLICATION_FACTOR_MIN,
290 });
291 }
292 if self.factor > REPLICATION_FACTOR_MAX {
293 return Err(ReplicationConfigError::FactorAboveMax {
294 got: self.factor,
295 max: REPLICATION_FACTOR_MAX,
296 });
297 }
298 if self.heartbeat_ms < HEARTBEAT_MS_MIN {
299 return Err(ReplicationConfigError::HeartbeatTooLow {
300 got: self.heartbeat_ms,
301 min: HEARTBEAT_MS_MIN,
302 });
303 }
304 if self.heartbeat_ms > HEARTBEAT_MS_MAX {
305 return Err(ReplicationConfigError::HeartbeatTooHigh {
306 got: self.heartbeat_ms,
307 max: HEARTBEAT_MS_MAX,
308 });
309 }
310 if !self.replication_budget_fraction.is_finite()
311 || self.replication_budget_fraction <= 0.0
312 || self.replication_budget_fraction > 1.0
313 {
314 return Err(ReplicationConfigError::BudgetFractionOutOfRange {
315 got: self.replication_budget_fraction,
316 });
317 }
318 // v0.3 Phase D2: background_fraction lives in [0.0, 1.0).
319 // 1.0 would deny every Background request unconditionally
320 // (the gate check would require `available >= 0` which
321 // every empty bucket satisfies — but the math overflows
322 // the intent; reject explicitly).
323 if !self.background_fraction.is_finite()
324 || self.background_fraction < 0.0
325 || self.background_fraction >= 1.0
326 {
327 return Err(ReplicationConfigError::BackgroundFractionOutOfRange {
328 got: self.background_fraction,
329 });
330 }
331 if let PlacementStrategy::Pinned(nodes) = &self.placement {
332 if nodes.is_empty() {
333 return Err(ReplicationConfigError::PinnedSetEmpty);
334 }
335 if nodes.len() > REPLICATION_FACTOR_MAX as usize {
336 return Err(ReplicationConfigError::PinnedSetTooLarge {
337 got: nodes.len(),
338 max: REPLICATION_FACTOR_MAX as usize,
339 });
340 }
341 // Reject duplicate `NodeId`s — pinning a node twice is
342 // a misconfig (would the coordinator try to run two
343 // local instances? Worse: half the count is silent).
344 let mut sorted = nodes.clone();
345 sorted.sort_unstable();
346 for w in sorted.windows(2) {
347 if w[0] == w[1] {
348 return Err(ReplicationConfigError::PinnedDuplicate { node_id: w[0] });
349 }
350 }
351 // If `leader_pinned` is set, it must live in the pinned
352 // set — otherwise the operator pinned a leader on a
353 // node that won't even run a replica.
354 if let Some(leader) = self.leader_pinned {
355 if !nodes.contains(&leader) {
356 return Err(ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader });
357 }
358 }
359 }
360 Ok(())
361 }
362}
363
/// Reject reasons surfaced by [`ReplicationConfig::validate`].
///
/// Not `Eq` — the [`Self::BudgetFractionOutOfRange`] and
/// [`Self::BackgroundFractionOutOfRange`] variants carry an `f32`,
/// which is `PartialEq` but not `Eq`. Note that when `got` is NaN the
/// error does not compare equal to itself; match on the variant
/// instead of using `==` for those two cases.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum ReplicationConfigError {
    /// `factor` is below the [`REPLICATION_FACTOR_MIN`] floor.
    #[error("replication factor {got} below minimum {min}")]
    FactorBelowMin {
        /// Configured factor value.
        got: u8,
        /// Minimum permitted factor.
        min: u8,
    },
    /// `factor` is above the [`REPLICATION_FACTOR_MAX`] ceiling.
    #[error("replication factor {got} above maximum {max}")]
    FactorAboveMax {
        /// Configured factor value.
        got: u8,
        /// Maximum permitted factor.
        max: u8,
    },
    /// `heartbeat_ms` is below the [`HEARTBEAT_MS_MIN`] floor.
    #[error("heartbeat_ms {got} below minimum {min} ms")]
    HeartbeatTooLow {
        /// Configured heartbeat cadence.
        got: u64,
        /// Minimum permitted heartbeat cadence.
        min: u64,
    },
    /// `heartbeat_ms` is above the [`HEARTBEAT_MS_MAX`] ceiling.
    /// Pathological values let
    /// `heartbeat_ms.saturating_mul(miss_threshold)` saturate to
    /// `u64::MAX`, silently disabling silence detection.
    #[error("heartbeat_ms {got} above maximum {max} ms")]
    HeartbeatTooHigh {
        /// Configured heartbeat cadence.
        got: u64,
        /// Maximum permitted heartbeat cadence.
        max: u64,
    },
    /// `replication_budget_fraction` is outside `(0.0, 1.0]` or
    /// non-finite (NaN / ±inf).
    #[error("replication_budget_fraction {got} outside (0.0, 1.0] or non-finite")]
    BudgetFractionOutOfRange {
        /// Configured budget fraction.
        got: f32,
    },
    /// `background_fraction` is outside `[0.0, 1.0)` or non-finite
    /// (NaN / ±inf). The upper bound is exclusive: exactly 1.0 is
    /// rejected.
    #[error("background_fraction {got} outside [0.0, 1.0) or non-finite")]
    BackgroundFractionOutOfRange {
        /// Configured background fraction.
        got: f32,
    },
    /// [`PlacementStrategy::Pinned`] supplied an empty `Vec<NodeId>`.
    /// Pinned placement needs at least one node; if the operator
    /// wanted "no replication" they should leave
    /// `RedexFileConfig::replication` at `None` instead.
    #[error("PlacementStrategy::Pinned must list at least one NodeId")]
    PinnedSetEmpty,
    /// [`PlacementStrategy::Pinned`] supplied more than
    /// `REPLICATION_FACTOR_MAX` nodes.
    #[error("PlacementStrategy::Pinned has {got} nodes; ceiling is {max}")]
    PinnedSetTooLarge {
        /// Number of nodes in the pinned set.
        got: usize,
        /// Maximum permitted pinned-set size.
        max: usize,
    },
    /// A `NodeId` appears more than once in the pinned set.
    #[error("PlacementStrategy::Pinned contains duplicate NodeId {node_id:#x}")]
    PinnedDuplicate {
        /// The duplicated `NodeId`.
        node_id: NodeId,
    },
    /// `leader_pinned` names a node that isn't in the pinned set —
    /// the operator pinned a leader on a node that won't even run a
    /// replica. Only checked when placement is
    /// [`PlacementStrategy::Pinned`].
    #[error("leader_pinned {leader:#x} is not in the PlacementStrategy::Pinned set")]
    LeaderPinnedOutsidePinnedSet {
        /// The leader `NodeId` that lies outside the pinned set.
        leader: NodeId,
    },
}
450
/// Unit tests pinning the defaults, the builder plumbing, and every
/// accept / reject boundary enforced by `ReplicationConfig::validate`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_validates() {
        let cfg = ReplicationConfig::new();
        assert_eq!(cfg.factor, REPLICATION_FACTOR_DEFAULT);
        assert_eq!(cfg.heartbeat_ms, HEARTBEAT_MS_DEFAULT);
        assert_eq!(cfg.placement, PlacementStrategy::Standard);
        assert_eq!(cfg.on_under_capacity, UnderCapacity::Withdraw);
        assert!(cfg.leader_pinned.is_none());
        assert!((cfg.replication_budget_fraction - 0.5).abs() < f32::EPSILON);
        assert!((cfg.background_fraction - BACKGROUND_FRACTION_DEFAULT).abs() < f32::EPSILON);
        cfg.validate().expect("defaults must validate");
    }

    #[test]
    fn default_trait_matches_new() {
        // `Default` is documented as a thin alias for `new()`; pin it
        // so the two construction paths cannot drift apart.
        assert_eq!(ReplicationConfig::default(), ReplicationConfig::new());
    }

    #[test]
    fn builder_chain_threads_through() {
        let cfg = ReplicationConfig::new()
            .with_factor(5)
            .with_heartbeat_ms(250)
            .with_placement(PlacementStrategy::ColocationStrict)
            .with_on_under_capacity(UnderCapacity::EvictOldest)
            .with_leader_pinned(Some(0xDEAD_BEEF))
            .with_replication_budget_fraction(0.75)
            .with_background_fraction(0.1);
        assert_eq!(cfg.factor, 5);
        assert_eq!(cfg.heartbeat_ms, 250);
        assert_eq!(cfg.placement, PlacementStrategy::ColocationStrict);
        assert_eq!(cfg.on_under_capacity, UnderCapacity::EvictOldest);
        assert_eq!(cfg.leader_pinned, Some(0xDEAD_BEEF));
        assert!((cfg.replication_budget_fraction - 0.75).abs() < f32::EPSILON);
        assert!((cfg.background_fraction - 0.1).abs() < f32::EPSILON);
        cfg.validate().expect("built config must validate");
    }

    #[test]
    fn factor_below_min_rejected() {
        let cfg = ReplicationConfig::new().with_factor(0);
        let err = cfg.validate().expect_err("factor=0 must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::FactorBelowMin { got: 0, min: 1 }
        ));
    }

    #[test]
    fn factor_above_max_rejected() {
        let cfg = ReplicationConfig::new().with_factor(REPLICATION_FACTOR_MAX + 1);
        let err = cfg.validate().expect_err("factor>max must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::FactorAboveMax { got: 17, max: 16 }
        ));
    }

    #[test]
    fn heartbeat_below_min_rejected() {
        let cfg = ReplicationConfig::new().with_heartbeat_ms(50);
        let err = cfg.validate().expect_err("heartbeat=50ms must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::HeartbeatTooLow { got: 50, min: 100 }
        ));
    }

    #[test]
    fn heartbeat_above_max_rejected() {
        // Pathological config: heartbeat_ms = u64::MAX would let the
        // tracker's `saturating_mul(heartbeat_ms, miss_threshold)`
        // saturate to u64::MAX, making `is_leader_silent` always
        // return false. Pin the ceiling so unit-confusion
        // (passing microseconds instead of milliseconds) cannot
        // disable silence detection.
        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX + 1);
        let err = cfg.validate().expect_err("heartbeat above max must fail");
        match err {
            ReplicationConfigError::HeartbeatTooHigh { got, max } => {
                assert_eq!(got, HEARTBEAT_MS_MAX + 1);
                assert_eq!(max, HEARTBEAT_MS_MAX);
            }
            other => panic!("expected HeartbeatTooHigh, got {other:?}"),
        }
    }

    #[test]
    fn heartbeat_at_max_accepted() {
        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX);
        cfg.validate()
            .expect("heartbeat at the ceiling is permitted (inclusive)");
    }

    #[test]
    fn budget_fraction_out_of_range_rejected() {
        for bad in [0.0, -0.5, 1.5, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
            let outcome = ReplicationConfig::new()
                .with_replication_budget_fraction(bad)
                .validate();
            assert!(
                matches!(
                    outcome,
                    Err(ReplicationConfigError::BudgetFractionOutOfRange { .. })
                ),
                "budget={bad} must be rejected as BudgetFractionOutOfRange, got {outcome:?}"
            );
        }

        // Inverse: 1.0 (the inclusive upper) is fine.
        ReplicationConfig::new()
            .with_replication_budget_fraction(1.0)
            .validate()
            .expect("budget=1.0 is the inclusive upper bound");
    }

    #[test]
    fn background_fraction_out_of_range_rejected() {
        // The upper bound is exclusive, so exactly 1.0 must fail too.
        for bad in [-0.1, 1.0, 1.5, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
            let outcome = ReplicationConfig::new()
                .with_background_fraction(bad)
                .validate();
            assert!(
                matches!(
                    outcome,
                    Err(ReplicationConfigError::BackgroundFractionOutOfRange { .. })
                ),
                "background={bad} must be rejected as BackgroundFractionOutOfRange, got {outcome:?}"
            );
        }
    }

    #[test]
    fn background_fraction_in_range_accepted() {
        // 0.0 is the inclusive lower bound; values just under 1.0 pass.
        for ok in [0.0, BACKGROUND_FRACTION_DEFAULT, 0.999] {
            let outcome = ReplicationConfig::new()
                .with_background_fraction(ok)
                .validate();
            assert!(
                outcome.is_ok(),
                "background={ok} must validate, got {outcome:?}"
            );
        }
    }

    #[test]
    fn pinned_empty_rejected() {
        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(vec![]));
        let err = cfg.validate().expect_err("empty pinned set must fail");
        assert_eq!(err, ReplicationConfigError::PinnedSetEmpty);
    }

    #[test]
    fn pinned_too_large_rejected() {
        let nodes = (0..(u64::from(REPLICATION_FACTOR_MAX) + 1)).collect();
        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(nodes));
        let err = cfg.validate().expect_err("oversized pinned set must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::PinnedSetTooLarge { got: 17, max: 16 }
        ));
    }

    #[test]
    fn pinned_at_max_size_validates() {
        // Exactly REPLICATION_FACTOR_MAX distinct nodes sits on the
        // inclusive ceiling and must be accepted.
        let nodes = (0..u64::from(REPLICATION_FACTOR_MAX)).collect();
        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(nodes));
        cfg.validate()
            .expect("pinned set at the ceiling must validate");
        assert_eq!(cfg.effective_factor(), REPLICATION_FACTOR_MAX);
    }

    #[test]
    fn pinned_duplicate_rejected() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xAA]));
        let err = cfg.validate().expect_err("duplicate NodeId must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::PinnedDuplicate { node_id: 0xAA }
        ));
    }

    #[test]
    fn pinned_leader_outside_set_rejected() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
            .with_leader_pinned(Some(0xDD));
        let err = cfg.validate().expect_err("leader outside set must fail");
        assert!(matches!(
            err,
            ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader: 0xDD }
        ));
    }

    #[test]
    fn pinned_leader_inside_set_validates() {
        let cfg = ReplicationConfig::new()
            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
            .with_leader_pinned(Some(0xBB));
        cfg.validate().expect("leader in set must validate");
    }

    #[test]
    fn pinned_leader_with_standard_placement_validates() {
        // `leader_pinned` with `PlacementStrategy::Standard` is the
        // explicit-publisher-elsewhere shape — leader runs on a
        // specific node while replicas spread via the standard
        // filter. Plan §10 calls this out as a supported topology.
        let cfg = ReplicationConfig::new().with_leader_pinned(Some(0x1234_5678));
        cfg.validate()
            .expect("standard + leader_pinned must validate");
    }

    #[test]
    fn effective_factor_honors_pinned_length() {
        let cfg = ReplicationConfig::new()
            .with_factor(7) // ignored
            .with_placement(PlacementStrategy::Pinned(vec![1, 2, 3, 4]));
        assert_eq!(cfg.effective_factor(), 4);
    }

    #[test]
    fn effective_factor_falls_back_to_factor_for_non_pinned() {
        let cfg = ReplicationConfig::new()
            .with_factor(7)
            .with_placement(PlacementStrategy::Standard);
        assert_eq!(cfg.effective_factor(), 7);
        let cfg = ReplicationConfig::new()
            .with_factor(5)
            .with_placement(PlacementStrategy::ColocationStrict);
        assert_eq!(cfg.effective_factor(), 5);
    }

    #[test]
    fn factor_boundary_min_and_max_validate() {
        ReplicationConfig::new()
            .with_factor(REPLICATION_FACTOR_MIN)
            .validate()
            .expect("factor=min must validate");
        ReplicationConfig::new()
            .with_factor(REPLICATION_FACTOR_MAX)
            .validate()
            .expect("factor=max must validate");
    }

    #[test]
    fn heartbeat_at_min_validates() {
        ReplicationConfig::new()
            .with_heartbeat_ms(HEARTBEAT_MS_MIN)
            .validate()
            .expect("heartbeat=min must validate");
    }
}