Skip to main content

kevy_rt/
replication_gate.rs

1//! Dispatch-without-emit gate — used by the server-as-replica path
2//! (Phase 1.F, T1.29) to apply frames pulled from an upstream primary
3//! without immediately re-pushing them into this shard's own
4//! `ReplicationSource`. Without the gate, a server with both an
5//! upstream link AND its own primary listener (chain replication, or
6//! the brief overlap during `REPLICAOF NO ONE` promotion) would emit
7//! every applied frame to its own downstream replicas, double-counting
8//! the offset and creating infinite chains.
9//!
10//! v1.18 explicitly forbids chain replication (see Anti-scope in the
11//! v3-cluster plan), so the gate is defensive: it documents intent
12//! and prevents the misconfig — primary + REPLICAOF together — from
13//! silently corrupting the downstream offset stream.
14//!
15//! Usage:
16//!
17//! ```ignore
18//! let _g = kevy_rt::ReplicatedApplyGuard::enter();
19//! // dispatch frame here — any post_write_housekeeping that hits
20//! // this shard's ReplicationSource is suppressed for the duration
21//! // of `_g`.
22//! ```
23//!
24//! Scope: the gate suppresses ONLY the `ReplicationSource::push_mutation`
25//! call inside [`crate::shard::Shard::post_write_housekeeping`]. AOF
26//! append, WATCH version bump, keyspace notifications, and BLOCK wakes
27//! all still fire — the local store state must remain correct for
28//! anyone reading from this server.
29
30use std::cell::Cell;
31
32thread_local! {
33    /// `true` while a replicated-apply scope is active on this thread.
34    /// Set by [`ReplicatedApplyGuard::enter`], cleared on drop.
35    static APPLYING_REPLICATED: Cell<bool> = const { Cell::new(false) };
36}
37
38/// RAII guard that marks the current thread as "applying a replicated
39/// frame" for the guard's lifetime. The replica runner (T1.29(b))
40/// enters this scope before each `dispatch` call so the apply doesn't
41/// re-push the frame into this shard's own backlog.
42pub struct ReplicatedApplyGuard {
43    /// Prior gate value — supports nesting (caller can enter a second
44    /// scope without losing the outer one's intent; drop restores).
45    prev: bool,
46}
47
48impl ReplicatedApplyGuard {
49    /// Enter a replicated-apply scope on the current thread. Nestable
50    /// — the inner guard restores the outer state on drop.
51    #[must_use = "ReplicatedApplyGuard is RAII — drop it at scope end"]
52    pub fn enter() -> Self {
53        let prev = APPLYING_REPLICATED.with(Cell::get);
54        APPLYING_REPLICATED.with(|c| c.set(true));
55        Self { prev }
56    }
57}
58
59impl Drop for ReplicatedApplyGuard {
60    fn drop(&mut self) {
61        APPLYING_REPLICATED.with(|c| c.set(self.prev));
62    }
63}
64
65/// Read the current gate value. `post_write_housekeeping` calls this
66/// inside the `Some(src)` arm to decide whether to skip the
67/// `push_mutation`.
68pub(crate) fn is_applying_replicated() -> bool {
69    APPLYING_REPLICATED.with(Cell::get)
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75
76    #[test]
77    fn default_is_off() {
78        assert!(!is_applying_replicated());
79    }
80
81    #[test]
82    fn guard_sets_then_clears() {
83        assert!(!is_applying_replicated());
84        {
85            let _g = ReplicatedApplyGuard::enter();
86            assert!(is_applying_replicated());
87        }
88        assert!(!is_applying_replicated());
89    }
90
91    #[test]
92    fn guard_nests_correctly() {
93        let _outer = ReplicatedApplyGuard::enter();
94        assert!(is_applying_replicated());
95        {
96            let _inner = ReplicatedApplyGuard::enter();
97            assert!(is_applying_replicated());
98        }
99        // Outer scope still active after inner drops.
100        assert!(is_applying_replicated());
101    }
102}