kevy_rt/replication_gate.rs
1//! Dispatch-without-emit gate — used by the server-as-replica path
2//! (Phase 1.F, T1.29) to apply frames pulled from an upstream primary
3//! without immediately re-pushing them into this shard's own
4//! `ReplicationSource`. Without the gate, a server with both an
5//! upstream link AND its own primary listener (chain replication, or
6//! the brief overlap during `REPLICAOF NO ONE` promotion) would emit
7//! every applied frame to its own downstream replicas, double-counting
8//! the offset and creating infinite chains.
9//!
10//! v1.18 explicitly forbids chain replication (see Anti-scope in the
11//! v3-cluster plan), so the gate is defensive: it documents intent
12//! and prevents the misconfig — primary + REPLICAOF together — from
13//! silently corrupting the downstream offset stream.
14//!
15//! Usage:
16//!
17//! ```ignore
18//! let _g = kevy_rt::ReplicatedApplyGuard::enter();
19//! // dispatch frame here — any post_write_housekeeping that hits
20//! // this shard's ReplicationSource is suppressed for the duration
21//! // of `_g`.
22//! ```
23//!
24//! Scope: the gate suppresses ONLY the `ReplicationSource::push_mutation`
25//! call inside [`crate::shard::Shard::post_write_housekeeping`]. AOF
26//! append, WATCH version bump, keyspace notifications, and BLOCK wakes
27//! all still fire — the local store state must remain correct for
28//! anyone reading from this server.
29
30use std::cell::Cell;
31
thread_local! {
    /// Per-thread replicated-apply flag. Starts out `false`;
    /// [`ReplicatedApplyGuard::enter`] raises it, and dropping the guard
    /// writes back whatever value the guard observed on entry.
    static APPLYING_REPLICATED: Cell<bool> = const { Cell::new(false) };
}
37
/// Scope guard for the replicated-apply gate: from `enter` until the
/// guard is dropped, the thread-local gate flag reads `true`. The
/// replica runner (T1.29(b)) holds one around each `dispatch` call so
/// the applied frame is not re-pushed into this shard's own backlog.
///
/// NOTE(review): the flag is thread-local, but this struct holds only a
/// `bool` and is therefore auto-`Send`. Dropping a guard on a thread
/// other than the one that created it would write to the wrong thread's
/// flag — confirm no caller moves a guard across threads.
pub struct ReplicatedApplyGuard {
    /// Flag value observed immediately before this guard raised it.
    /// Written back on drop, so an enclosing guard's scope stays active
    /// when a nested guard ends.
    prev: bool,
}
47
48impl ReplicatedApplyGuard {
49 /// Enter a replicated-apply scope on the current thread. Nestable
50 /// — the inner guard restores the outer state on drop.
51 #[must_use = "ReplicatedApplyGuard is RAII — drop it at scope end"]
52 pub fn enter() -> Self {
53 let prev = APPLYING_REPLICATED.with(Cell::get);
54 APPLYING_REPLICATED.with(|c| c.set(true));
55 Self { prev }
56 }
57}
58
59impl Drop for ReplicatedApplyGuard {
60 fn drop(&mut self) {
61 APPLYING_REPLICATED.with(|c| c.set(self.prev));
62 }
63}
64
65/// Read the current gate value. `post_write_housekeeping` calls this
66/// inside the `Some(src)` arm to decide whether to skip the
67/// `push_mutation`.
68pub(crate) fn is_applying_replicated() -> bool {
69 APPLYING_REPLICATED.with(Cell::get)
70}
71
#[cfg(test)]
mod tests {
    use super::*;

    /// A thread that never entered a scope sees the gate off.
    #[test]
    fn default_is_off() {
        assert!(!is_applying_replicated());
    }

    /// A single guard raises the gate for exactly its own lifetime.
    #[test]
    fn guard_sets_then_clears() {
        assert!(!is_applying_replicated());
        {
            let _g = ReplicatedApplyGuard::enter();
            assert!(is_applying_replicated());
        }
        assert!(!is_applying_replicated());
    }

    /// Nested guards: the inner drop keeps the outer scope active, and
    /// the outer drop finally clears the gate.
    #[test]
    fn guard_nests_correctly() {
        assert!(!is_applying_replicated());
        {
            let _outer = ReplicatedApplyGuard::enter();
            assert!(is_applying_replicated());
            {
                let _inner = ReplicatedApplyGuard::enter();
                assert!(is_applying_replicated());
            }
            // Outer scope still active after inner drops.
            assert!(is_applying_replicated());
        }
        // With the outermost guard gone the gate must be fully released;
        // otherwise this thread would keep suppressing pushes.
        assert!(!is_applying_replicated());
    }

    /// The gate is thread-local: holding a guard on this thread must not
    /// make another thread report an active scope.
    #[test]
    fn gate_is_per_thread() {
        let _g = ReplicatedApplyGuard::enter();
        let seen_elsewhere = std::thread::spawn(is_applying_replicated)
            .join()
            .expect("probe thread panicked");
        assert!(!seen_elsewhere);
        assert!(is_applying_replicated());
    }
}