Skip to main content

net/adapter/net/redex/
replication_config.rs

1//! Per-channel replication configuration — Phase B opt-in for
2//! `docs/plans/REDEX_DISTRIBUTED_PLAN.md` §1.
3//!
4//! `ReplicationConfig` is the opt-in surface that turns a v1 / v2
5//! single-node `RedexFile` into a replicated channel. It lives on
6//! [`RedexFileConfig::replication`](super::RedexFileConfig) as an
7//! `Option`; the default `None` keeps every existing channel single-
8//! node (no observable change). Phase C wires the `ReplicationCoordinator`
9//! daemon's spawn path to consult this field on `Redex::open_file`.
10//!
11//! Validation is fail-fast at config-build time — pin invariants here
12//! so a malformed config can't escape into the coordinator's hot
13//! loop. The `validate()` method returns a typed `ReplicationConfigError`
14//! covering every reject path; the `with_*` builder methods are
15//! validation-free for ergonomic chaining and pair with a final
16//! `validate()` call before the config is committed to a `Redex`.
17
18use crate::adapter::net::behavior::placement::NodeId;
19
20/// Replication factor lower bound. `1` collapses to single-node-with-
21/// coordinator (the daemon runs but there's only one replica) — useful
22/// for testing and the brief moment between channel-open and the first
23/// peer joining; below `1` is meaningless.
24pub const REPLICATION_FACTOR_MIN: u8 = 1;
25/// Replication factor upper bound. The protocol allows up to 255
26/// replicas per channel (u8), but anything beyond ~16 stops scaling
27/// (heartbeat fanout dominates) and we clamp here so a misconfig
28/// can't accidentally fanout to hundreds. Operators with genuine
29/// 16+-replica workloads can plumb the override; v1 keeps the
30/// ceiling conservative.
31pub const REPLICATION_FACTOR_MAX: u8 = 16;
32
33/// Default replication factor — three is the conventional minimum
34/// for a single-leader log to tolerate one replica failure while
35/// staying in single-partition quorum-irrelevant configuration.
36pub const REPLICATION_FACTOR_DEFAULT: u8 = 3;
37
38/// Minimum heartbeat cadence. Below 100 ms heartbeat traffic
39/// dominates the channel's effective throughput; pin a floor here so
40/// a misconfig can't accidentally turn the heartbeat path into a
41/// busy-loop.
42pub const HEARTBEAT_MS_MIN: u64 = 100;
43
44/// Maximum heartbeat cadence. Five minutes is well past the longest
45/// reasonable failover detection window; values above this risk
46/// silently disabling silence detection via the
47/// `heartbeat_ms.saturating_mul(miss_threshold)` arithmetic in the
48/// heartbeat tracker, which saturates to `u64::MAX` and makes
49/// `is_leader_silent` always return `false`. Pin a sane ceiling so
50/// a typo / unit-confusion (passing μs instead of ms) can't disable
51/// the failure detector.
52pub const HEARTBEAT_MS_MAX: u64 = 300_000;
53
54/// Default heartbeat cadence — 500 ms matches the plan §1 default and
55/// the §6 "3 × heartbeat" lag bound; with three-missed hysteresis the
56/// effective failover detection window is ~1.5 s, well under the
57/// activation-gate's "< 5 s RTO" target.
58pub const HEARTBEAT_MS_DEFAULT: u64 = 500;
59
60/// Default replication-sync I/O budget as a fraction of measured NIC
61/// peak. `0.5` lets replication consume half the link without
62/// starving foreground publish traffic; tune per channel via
63/// [`ReplicationConfig::with_replication_budget_fraction`].
64pub const REPLICATION_BUDGET_FRACTION_DEFAULT: f32 = 0.5;
65
66/// v0.3 Phase D2 default: fraction of bucket capacity reserved
67/// against `Background` admission. With the default 0.3, a
68/// `Background` request needs `available >= 0.7 * capacity` to
69/// pass the gate — preserving ~70% headroom for Foreground bursts
70/// against sustained Background load.
71pub const BACKGROUND_FRACTION_DEFAULT: f32 = 0.3;
72
73/// Where replicas live and how they're chosen at channel-open time
74/// and on roster change. Mirrors the four-axis intent the plan §1
75/// pins:
76///
77/// - [`PlacementStrategy::Standard`] — let `PlacementFilter` decide
78///   (default for new channels). Reads `metadata.intent`,
79///   `metadata.colocate-with`, `scope:`, proximity, and resource-
80///   availability axes.
81/// - [`PlacementStrategy::Pinned`] — manual placement on a fixed
82///   `NodeId` set. Used for special-case topologies and tests.
83/// - [`PlacementStrategy::ColocationStrict`] — every replica must
84///   live on a node already holding the chain referenced by
85///   `metadata.colocate-with-strict`. Refuses placement on
86///   insufficient-coverage nodes.
87#[derive(Debug, Clone, Default, PartialEq, Eq)]
88pub enum PlacementStrategy {
89    /// Spread across nodes per the `PlacementFilter` primitive shipped
90    /// in The Warriors. Reads `metadata.intent`, `metadata.colocate-
91    /// with`, `scope:`, proximity, and resource-availability axes.
92    /// Default for new channels.
93    #[default]
94    Standard,
95    /// Manual pinning. Used for special-case topologies and tests.
96    /// The vector lists the exact `NodeId` set that should run
97    /// replicas — its length pins the effective replication factor
98    /// regardless of [`ReplicationConfig::factor`].
99    Pinned(Vec<NodeId>),
100    /// Strict colocation — all replicas must be on nodes already
101    /// holding the chain referenced by `metadata.colocate-with-
102    /// strict`. Refuses placement on insufficient-coverage nodes.
103    ColocationStrict,
104}
105
106/// Behavior when a replica falls below the channel's retention
107/// requirement due to local disk pressure. The leader's replication
108/// factor is a hard guarantee; replicas are best-effort under
109/// pressure.
110#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
111pub enum UnderCapacity {
112    /// Drop the replica role; fall through to greedy LRU if also
113    /// enabled. The channel's capability tag for this node is
114    /// withdrawn and reads re-route to the leader. **Default.**
115    #[default]
116    Withdraw,
117    /// Aggressively evict the oldest local data to maintain the
118    /// channel's retention even if total data exceeds disk. Trades
119    /// older data for keeping the replication factor intact.
120    EvictOldest,
121}
122
123/// Per-channel replication opt-in. The default-when-set
124/// [`ReplicationConfig::new`] gives sensible values (factor 3,
125/// 500 ms heartbeat, standard placement, withdraw-under-capacity,
126/// 0.5 NIC peak budget); the `with_*` builders adjust individual
127/// fields.
128///
129/// `validate()` runs the full set of invariants pinned at module top
130/// — callers should invoke it before committing the config to a
131/// `Redex` so a malformed config can't leak into the coordinator's
132/// hot loop.
133#[derive(Debug, Clone, PartialEq)]
134pub struct ReplicationConfig {
135    /// Replication factor — number of replicas (including the leader)
136    /// maintained. Must satisfy
137    /// `REPLICATION_FACTOR_MIN <= factor <= REPLICATION_FACTOR_MAX`.
138    /// Default [`REPLICATION_FACTOR_DEFAULT`].
139    pub factor: u8,
140    /// How replicas are chosen when first instantiated and on roster
141    /// change. Default [`PlacementStrategy::Standard`].
142    pub placement: PlacementStrategy,
143    /// Heartbeat interval between leader and replicas, in
144    /// milliseconds. Must satisfy
145    /// `heartbeat_ms >= HEARTBEAT_MS_MIN`. Default
146    /// [`HEARTBEAT_MS_DEFAULT`].
147    pub heartbeat_ms: u64,
148    /// Optional override pinning the leader to a specific node.
149    /// `None` = leader is the channel's natural publisher (the
150    /// `ChannelPublisher` home). When `Some(node)`, the override
151    /// applies on every leader election; the deterministic
152    /// nearest-RTT election picks `node` whenever it's healthy.
153    pub leader_pinned: Option<NodeId>,
154    /// Behavior when a replica falls below the channel's retention
155    /// requirement due to local disk pressure. Default
156    /// [`UnderCapacity::Withdraw`].
157    pub on_under_capacity: UnderCapacity,
158    /// Bandwidth budget for replication-sync I/O, as a fraction of
159    /// measured NIC peak. Must lie in `(0.0, 1.0]` and be finite.
160    /// Default [`REPLICATION_BUDGET_FRACTION_DEFAULT`].
161    pub replication_budget_fraction: f32,
162    /// v0.3 Phase D2: per-channel default
163    /// [`BandwidthClass`](super::bandwidth::BandwidthClass) the
164    /// runtime stamps on every emitted `SyncRequest` unless the
165    /// caller explicitly overrides it. Receivers honor the
166    /// per-request value in preference to this default. Default
167    /// [`BandwidthClass::Foreground`](super::bandwidth::BandwidthClass::Foreground).
168    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
169    /// v0.3 Phase D2: admission-gate parameter — the fraction of
170    /// the bandwidth bucket reserved against `Background`
171    /// requests. A `Background` request is admitted only when
172    /// `available >= (1 - background_fraction) * capacity`.
173    /// Operators tune per channel: hot channels set this low
174    /// (tight Background bound, more Foreground headroom);
175    /// archival channels set it high (give Background more
176    /// room). Must lie in `[0.0, 1.0)` and be finite (1.0
177    /// would deny every Background request unconditionally and
178    /// is rejected; the v0.3 Phase D4 anti-starvation hatch one-
179    /// shot bypasses the gate after 60 s starve regardless of
180    /// the configured value). Default
181    /// [`BACKGROUND_FRACTION_DEFAULT`].
182    pub background_fraction: f32,
183}
184
185impl Default for ReplicationConfig {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191impl ReplicationConfig {
192    /// Construct a [`ReplicationConfig`] with all defaults — factor
193    /// 3, 500 ms heartbeat, standard placement, withdraw-under-
194    /// capacity, 0.5 NIC peak budget, leader = natural publisher.
195    pub fn new() -> Self {
196        Self {
197            factor: REPLICATION_FACTOR_DEFAULT,
198            placement: PlacementStrategy::default(),
199            heartbeat_ms: HEARTBEAT_MS_DEFAULT,
200            leader_pinned: None,
201            on_under_capacity: UnderCapacity::default(),
202            replication_budget_fraction: REPLICATION_BUDGET_FRACTION_DEFAULT,
203            default_bandwidth_class: super::bandwidth::BandwidthClass::Foreground,
204            background_fraction: BACKGROUND_FRACTION_DEFAULT,
205        }
206    }
207
208    /// Set the replication factor. Validate via [`Self::validate`]
209    /// before committing the config to a `Redex`.
210    pub fn with_factor(mut self, factor: u8) -> Self {
211        self.factor = factor;
212        self
213    }
214
215    /// Set the placement strategy.
216    pub fn with_placement(mut self, placement: PlacementStrategy) -> Self {
217        self.placement = placement;
218        self
219    }
220
221    /// Set the heartbeat cadence in milliseconds.
222    pub fn with_heartbeat_ms(mut self, heartbeat_ms: u64) -> Self {
223        self.heartbeat_ms = heartbeat_ms;
224        self
225    }
226
227    /// Pin the leader to a specific `NodeId`. Pass `None` to fall
228    /// back to "leader = natural publisher."
229    pub fn with_leader_pinned(mut self, leader: Option<NodeId>) -> Self {
230        self.leader_pinned = leader;
231        self
232    }
233
234    /// Set the under-capacity policy.
235    pub fn with_on_under_capacity(mut self, on_under_capacity: UnderCapacity) -> Self {
236        self.on_under_capacity = on_under_capacity;
237        self
238    }
239
240    /// Set the replication-sync I/O budget as a fraction of measured
241    /// NIC peak.
242    pub fn with_replication_budget_fraction(mut self, fraction: f32) -> Self {
243        self.replication_budget_fraction = fraction;
244        self
245    }
246
247    /// Set the per-channel default
248    /// [`BandwidthClass`](super::bandwidth::BandwidthClass) the
249    /// runtime stamps on emitted `SyncRequest` frames. v0.3 Phase
250    /// D2 — defaults to `Foreground`.
251    pub fn with_default_bandwidth_class(mut self, class: super::bandwidth::BandwidthClass) -> Self {
252        self.default_bandwidth_class = class;
253        self
254    }
255
256    /// Set the per-channel `background_fraction` — the share of
257    /// the bandwidth bucket reserved against `Background`
258    /// admission. Must lie in `[0.0, 1.0)` and be finite (1.0 is
259    /// rejected as it would deny every Background request
260    /// unconditionally). Default
261    /// [`BACKGROUND_FRACTION_DEFAULT`] (0.3).
262    pub fn with_background_fraction(mut self, fraction: f32) -> Self {
263        self.background_fraction = fraction;
264        self
265    }
266
267    /// Effective replica count — `placement` may override `factor`:
268    /// [`PlacementStrategy::Pinned`] pins the count to the length of
269    /// its `Vec<NodeId>` regardless of the configured factor (the
270    /// operator's explicit list wins over the numeric hint). All
271    /// other strategies honor `factor`.
272    pub fn effective_factor(&self) -> u8 {
273        match &self.placement {
274            PlacementStrategy::Pinned(nodes) => {
275                u8::try_from(nodes.len()).unwrap_or(REPLICATION_FACTOR_MAX)
276            }
277            _ => self.factor,
278        }
279    }
280
281    /// Run every documented invariant. Returns `Ok(())` when the
282    /// config is safe to commit; otherwise a typed
283    /// [`ReplicationConfigError`] naming the first violation. Pin
284    /// in tests; surface to operators on `Redex::open_file`.
285    pub fn validate(&self) -> Result<(), ReplicationConfigError> {
286        if self.factor < REPLICATION_FACTOR_MIN {
287            return Err(ReplicationConfigError::FactorBelowMin {
288                got: self.factor,
289                min: REPLICATION_FACTOR_MIN,
290            });
291        }
292        if self.factor > REPLICATION_FACTOR_MAX {
293            return Err(ReplicationConfigError::FactorAboveMax {
294                got: self.factor,
295                max: REPLICATION_FACTOR_MAX,
296            });
297        }
298        if self.heartbeat_ms < HEARTBEAT_MS_MIN {
299            return Err(ReplicationConfigError::HeartbeatTooLow {
300                got: self.heartbeat_ms,
301                min: HEARTBEAT_MS_MIN,
302            });
303        }
304        if self.heartbeat_ms > HEARTBEAT_MS_MAX {
305            return Err(ReplicationConfigError::HeartbeatTooHigh {
306                got: self.heartbeat_ms,
307                max: HEARTBEAT_MS_MAX,
308            });
309        }
310        if !self.replication_budget_fraction.is_finite()
311            || self.replication_budget_fraction <= 0.0
312            || self.replication_budget_fraction > 1.0
313        {
314            return Err(ReplicationConfigError::BudgetFractionOutOfRange {
315                got: self.replication_budget_fraction,
316            });
317        }
318        // v0.3 Phase D2: background_fraction lives in [0.0, 1.0).
319        // 1.0 would deny every Background request unconditionally
320        // (the gate check would require `available >= 0` which
321        // every empty bucket satisfies — but the math overflows
322        // the intent; reject explicitly).
323        if !self.background_fraction.is_finite()
324            || self.background_fraction < 0.0
325            || self.background_fraction >= 1.0
326        {
327            return Err(ReplicationConfigError::BackgroundFractionOutOfRange {
328                got: self.background_fraction,
329            });
330        }
331        if let PlacementStrategy::Pinned(nodes) = &self.placement {
332            if nodes.is_empty() {
333                return Err(ReplicationConfigError::PinnedSetEmpty);
334            }
335            if nodes.len() > REPLICATION_FACTOR_MAX as usize {
336                return Err(ReplicationConfigError::PinnedSetTooLarge {
337                    got: nodes.len(),
338                    max: REPLICATION_FACTOR_MAX as usize,
339                });
340            }
341            // Reject duplicate `NodeId`s — pinning a node twice is
342            // a misconfig (would the coordinator try to run two
343            // local instances? Worse: half the count is silent).
344            let mut sorted = nodes.clone();
345            sorted.sort_unstable();
346            for w in sorted.windows(2) {
347                if w[0] == w[1] {
348                    return Err(ReplicationConfigError::PinnedDuplicate { node_id: w[0] });
349                }
350            }
351            // If `leader_pinned` is set, it must live in the pinned
352            // set — otherwise the operator pinned a leader on a
353            // node that won't even run a replica.
354            if let Some(leader) = self.leader_pinned {
355                if !nodes.contains(&leader) {
356                    return Err(ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader });
357                }
358            }
359        }
360        Ok(())
361    }
362}
363
364/// Reject reasons surfaced by [`ReplicationConfig::validate`].
365///
366/// Not `Eq` — the [`Self::BudgetFractionOutOfRange`] variant carries
367/// an `f32`, which is `PartialEq` but not `Eq`.
368#[derive(Debug, thiserror::Error, PartialEq)]
369pub enum ReplicationConfigError {
370    /// `factor` is below the [`REPLICATION_FACTOR_MIN`] floor.
371    #[error("replication factor {got} below minimum {min}")]
372    FactorBelowMin {
373        /// Configured factor value.
374        got: u8,
375        /// Minimum permitted factor.
376        min: u8,
377    },
378    /// `factor` is above the [`REPLICATION_FACTOR_MAX`] ceiling.
379    #[error("replication factor {got} above maximum {max}")]
380    FactorAboveMax {
381        /// Configured factor value.
382        got: u8,
383        /// Maximum permitted factor.
384        max: u8,
385    },
386    /// `heartbeat_ms` is below the [`HEARTBEAT_MS_MIN`] floor.
387    #[error("heartbeat_ms {got} below minimum {min} ms")]
388    HeartbeatTooLow {
389        /// Configured heartbeat cadence.
390        got: u64,
391        /// Minimum permitted heartbeat cadence.
392        min: u64,
393    },
394    /// `heartbeat_ms` is above the [`HEARTBEAT_MS_MAX`] ceiling.
395    /// Pathological values let
396    /// `heartbeat_ms.saturating_mul(miss_threshold)` saturate to
397    /// `u64::MAX`, silently disabling silence detection.
398    #[error("heartbeat_ms {got} above maximum {max} ms")]
399    HeartbeatTooHigh {
400        /// Configured heartbeat cadence.
401        got: u64,
402        /// Maximum permitted heartbeat cadence.
403        max: u64,
404    },
405    /// `replication_budget_fraction` is outside `(0.0, 1.0]` or
406    /// non-finite (NaN / ±inf).
407    #[error("replication_budget_fraction {got} outside (0.0, 1.0] or non-finite")]
408    BudgetFractionOutOfRange {
409        /// Configured budget fraction.
410        got: f32,
411    },
412    /// `background_fraction` is outside `[0.0, 1.0)` or non-finite.
413    /// 1.0 is rejected because it would deny every Background
414    /// request unconditionally.
415    #[error("background_fraction {got} outside [0.0, 1.0) or non-finite")]
416    BackgroundFractionOutOfRange {
417        /// Configured background fraction.
418        got: f32,
419    },
420    /// [`PlacementStrategy::Pinned`] supplied an empty `Vec<NodeId>`.
421    /// Pinned placement needs at least one node; if the operator
422    /// wanted "no replication" they should leave
423    /// `RedexFileConfig::replication` at `None` instead.
424    #[error("PlacementStrategy::Pinned must list at least one NodeId")]
425    PinnedSetEmpty,
426    /// [`PlacementStrategy::Pinned`] supplied more than
427    /// `REPLICATION_FACTOR_MAX` nodes.
428    #[error("PlacementStrategy::Pinned has {got} nodes; ceiling is {max}")]
429    PinnedSetTooLarge {
430        /// Number of nodes in the pinned set.
431        got: usize,
432        /// Maximum permitted pinned-set size.
433        max: usize,
434    },
435    /// A `NodeId` appears more than once in the pinned set.
436    #[error("PlacementStrategy::Pinned contains duplicate NodeId {node_id:#x}")]
437    PinnedDuplicate {
438        /// The duplicated `NodeId`.
439        node_id: NodeId,
440    },
441    /// `leader_pinned` names a node that isn't in the pinned set —
442    /// the operator pinned a leader on a node that won't even run a
443    /// replica.
444    #[error("leader_pinned {leader:#x} is not in the PlacementStrategy::Pinned set")]
445    LeaderPinnedOutsidePinnedSet {
446        /// The leader `NodeId` that lies outside the pinned set.
447        leader: NodeId,
448    },
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454
455    #[test]
456    fn default_config_validates() {
457        let cfg = ReplicationConfig::new();
458        assert_eq!(cfg.factor, REPLICATION_FACTOR_DEFAULT);
459        assert_eq!(cfg.heartbeat_ms, HEARTBEAT_MS_DEFAULT);
460        assert_eq!(cfg.placement, PlacementStrategy::Standard);
461        assert_eq!(cfg.on_under_capacity, UnderCapacity::Withdraw);
462        assert!(cfg.leader_pinned.is_none());
463        assert!((cfg.replication_budget_fraction - 0.5).abs() < f32::EPSILON);
464        cfg.validate().expect("defaults must validate");
465    }
466
467    #[test]
468    fn builder_chain_threads_through() {
469        let cfg = ReplicationConfig::new()
470            .with_factor(5)
471            .with_heartbeat_ms(250)
472            .with_placement(PlacementStrategy::ColocationStrict)
473            .with_on_under_capacity(UnderCapacity::EvictOldest)
474            .with_leader_pinned(Some(0xDEAD_BEEF))
475            .with_replication_budget_fraction(0.75);
476        assert_eq!(cfg.factor, 5);
477        assert_eq!(cfg.heartbeat_ms, 250);
478        assert_eq!(cfg.placement, PlacementStrategy::ColocationStrict);
479        assert_eq!(cfg.on_under_capacity, UnderCapacity::EvictOldest);
480        assert_eq!(cfg.leader_pinned, Some(0xDEAD_BEEF));
481        assert!((cfg.replication_budget_fraction - 0.75).abs() < f32::EPSILON);
482        cfg.validate().expect("built config must validate");
483    }
484
485    #[test]
486    fn factor_below_min_rejected() {
487        let cfg = ReplicationConfig::new().with_factor(0);
488        let err = cfg.validate().expect_err("factor=0 must fail");
489        assert!(matches!(
490            err,
491            ReplicationConfigError::FactorBelowMin { got: 0, min: 1 }
492        ));
493    }
494
495    #[test]
496    fn factor_above_max_rejected() {
497        let cfg = ReplicationConfig::new().with_factor(REPLICATION_FACTOR_MAX + 1);
498        let err = cfg.validate().expect_err("factor>max must fail");
499        assert!(matches!(
500            err,
501            ReplicationConfigError::FactorAboveMax { got: 17, max: 16 }
502        ));
503    }
504
505    #[test]
506    fn heartbeat_below_min_rejected() {
507        let cfg = ReplicationConfig::new().with_heartbeat_ms(50);
508        let err = cfg.validate().expect_err("heartbeat=50ms must fail");
509        assert!(matches!(
510            err,
511            ReplicationConfigError::HeartbeatTooLow { got: 50, min: 100 }
512        ));
513    }
514
515    #[test]
516    fn heartbeat_above_max_rejected() {
517        // Pathological config: heartbeat_ms = u64::MAX would let the
518        // tracker's `saturating_mul(heartbeat_ms, miss_threshold)`
519        // saturate to u64::MAX, making `is_leader_silent` always
520        // return false. Pin the ceiling so unit-confusion
521        // (passing microseconds instead of milliseconds) cannot
522        // disable silence detection.
523        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX + 1);
524        let err = cfg.validate().expect_err("heartbeat above max must fail");
525        match err {
526            ReplicationConfigError::HeartbeatTooHigh { got, max } => {
527                assert_eq!(got, HEARTBEAT_MS_MAX + 1);
528                assert_eq!(max, HEARTBEAT_MS_MAX);
529            }
530            other => panic!("expected HeartbeatTooHigh, got {other:?}"),
531        }
532    }
533
534    #[test]
535    fn heartbeat_at_max_accepted() {
536        let cfg = ReplicationConfig::new().with_heartbeat_ms(HEARTBEAT_MS_MAX);
537        cfg.validate()
538            .expect("heartbeat at the ceiling is permitted (inclusive)");
539    }
540
541    #[test]
542    fn budget_fraction_out_of_range_rejected() {
543        for bad in [0.0, -0.5, 1.5, f32::NAN, f32::INFINITY, f32::NEG_INFINITY] {
544            let cfg = ReplicationConfig::new().with_replication_budget_fraction(bad);
545            let err = cfg
546                .validate()
547                .expect_err(&format!("budget={bad} must fail but didn't"));
548            assert!(
549                matches!(err, ReplicationConfigError::BudgetFractionOutOfRange { .. }),
550                "budget={bad} produced wrong error: {err:?}"
551            );
552        }
553
554        // Inverse: 1.0 (the inclusive upper) is fine.
555        ReplicationConfig::new()
556            .with_replication_budget_fraction(1.0)
557            .validate()
558            .expect("budget=1.0 is the inclusive upper bound");
559    }
560
561    #[test]
562    fn pinned_empty_rejected() {
563        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(vec![]));
564        let err = cfg.validate().expect_err("empty pinned set must fail");
565        assert_eq!(err, ReplicationConfigError::PinnedSetEmpty);
566    }
567
568    #[test]
569    fn pinned_too_large_rejected() {
570        let nodes = (0..(REPLICATION_FACTOR_MAX as u64 + 1)).collect();
571        let cfg = ReplicationConfig::new().with_placement(PlacementStrategy::Pinned(nodes));
572        let err = cfg.validate().expect_err("oversized pinned set must fail");
573        assert!(matches!(
574            err,
575            ReplicationConfigError::PinnedSetTooLarge { got: 17, max: 16 }
576        ));
577    }
578
579    #[test]
580    fn pinned_duplicate_rejected() {
581        let cfg = ReplicationConfig::new()
582            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xAA]));
583        let err = cfg.validate().expect_err("duplicate NodeId must fail");
584        assert!(matches!(
585            err,
586            ReplicationConfigError::PinnedDuplicate { node_id: 0xAA }
587        ));
588    }
589
590    #[test]
591    fn pinned_leader_outside_set_rejected() {
592        let cfg = ReplicationConfig::new()
593            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
594            .with_leader_pinned(Some(0xDD));
595        let err = cfg.validate().expect_err("leader outside set must fail");
596        assert!(matches!(
597            err,
598            ReplicationConfigError::LeaderPinnedOutsidePinnedSet { leader: 0xDD }
599        ));
600    }
601
602    #[test]
603    fn pinned_leader_inside_set_validates() {
604        let cfg = ReplicationConfig::new()
605            .with_placement(PlacementStrategy::Pinned(vec![0xAA, 0xBB, 0xCC]))
606            .with_leader_pinned(Some(0xBB));
607        cfg.validate().expect("leader in set must validate");
608    }
609
610    #[test]
611    fn pinned_leader_with_standard_placement_validates() {
612        // `leader_pinned` with `PlacementStrategy::Standard` is the
613        // explicit-publisher-elsewhere shape — leader runs on a
614        // specific node while replicas spread via the standard
615        // filter. Plan §10 calls this out as a supported topology.
616        let cfg = ReplicationConfig::new().with_leader_pinned(Some(0x1234_5678));
617        cfg.validate()
618            .expect("standard + leader_pinned must validate");
619    }
620
621    #[test]
622    fn effective_factor_honors_pinned_length() {
623        let cfg = ReplicationConfig::new()
624            .with_factor(7) // ignored
625            .with_placement(PlacementStrategy::Pinned(vec![1, 2, 3, 4]));
626        assert_eq!(cfg.effective_factor(), 4);
627    }
628
629    #[test]
630    fn effective_factor_falls_back_to_factor_for_non_pinned() {
631        let cfg = ReplicationConfig::new()
632            .with_factor(7)
633            .with_placement(PlacementStrategy::Standard);
634        assert_eq!(cfg.effective_factor(), 7);
635        let cfg = ReplicationConfig::new()
636            .with_factor(5)
637            .with_placement(PlacementStrategy::ColocationStrict);
638        assert_eq!(cfg.effective_factor(), 5);
639    }
640
641    #[test]
642    fn factor_boundary_min_and_max_validate() {
643        ReplicationConfig::new()
644            .with_factor(REPLICATION_FACTOR_MIN)
645            .validate()
646            .expect("factor=min must validate");
647        ReplicationConfig::new()
648            .with_factor(REPLICATION_FACTOR_MAX)
649            .validate()
650            .expect("factor=max must validate");
651    }
652
653    #[test]
654    fn heartbeat_at_min_validates() {
655        ReplicationConfig::new()
656            .with_heartbeat_ms(HEARTBEAT_MS_MIN)
657            .validate()
658            .expect("heartbeat=min must validate");
659    }
660}