Skip to main content

zlayer_secrets/
node_effects.rs

//! Node-local side-effect channel fired by the Raft apply wrapper.
//!
//! The Raft state machine apply is pure/deterministic — every replica must
//! reach the same `SecretsState` on the same op sequence. Filesystem and
//! network effects that should happen on every node when a particular op
//! applies cannot live inside `SecretsState::apply` itself. Instead the
//! apply *wrapper* (the closure handed to the openraft `RaftStateMachine`)
//! inspects the op post-apply and fires the corresponding handle here;
//! the daemon owns watcher tasks that await each handle and execute the
//! local effect (idempotently).
//!
//! New ops needing per-node effects should add a [`tokio::sync::Notify`]
//! field plus matching `fire_…` / `wait_…` methods rather than fanning
//! out into per-op types — the handle is a long-lived `Arc` shared with
//! the apply wrapper and the daemon's watcher tasks.
16
17use std::sync::Arc;
18use tokio::sync::Notify;
19
20/// Node-local side-effect channel.
21///
22/// Held as an `Arc` shared between the Raft apply wrapper (which fires
23/// notifies post-apply) and the daemon's watcher tasks (which await them
24/// and run the local effect). All methods are cheap and lock-free.
25#[derive(Debug, Default)]
26pub struct NodeSideEffects {
27    /// Fired when `SecretsRaftOp::WipeJoinSecret` applies successfully.
28    /// The daemon's watcher deletes `{data_dir}/join_secret` on every
29    /// wake. Idempotent if the file is already absent.
30    wipe_join_secret: Notify,
31}
32
33impl NodeSideEffects {
34    /// Construct a fresh `Arc<NodeSideEffects>` ready to share with the
35    /// apply wrapper and watcher tasks.
36    #[must_use]
37    pub fn new() -> Arc<Self> {
38        Arc::new(Self::default())
39    }
40
41    /// Wake every current waiter on the `WipeJoinSecret` notify.
42    ///
43    /// If no task is currently awaiting, the fire is dropped on the
44    /// floor — the boot-time reconcile in `zlayer serve` is what
45    /// guarantees the wipe eventually happens for a node that wasn't
46    /// awaiting at apply time (e.g. snapshot install before the
47    /// watcher spawned).
48    pub fn fire_wipe_join_secret(&self) {
49        self.wipe_join_secret.notify_waiters();
50    }
51
52    /// Await the next `WipeJoinSecret` fire. Multiple concurrent
53    /// awaiters are all woken when [`Self::fire_wipe_join_secret`]
54    /// is called.
55    pub async fn wait_wipe_join_secret(&self) {
56        self.wipe_join_secret.notified().await;
57    }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Upper bound on how long a woken waiter task may take to finish.
    const WAKE_DEADLINE: Duration = Duration::from_secs(1);

    /// Join a spawned waiter task, failing the test if it does not finish
    /// within [`WAKE_DEADLINE`] or if the task itself panicked.
    async fn join_within_deadline(waiter: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(WAKE_DEADLINE, waiter)
            .await
            .expect("waiter woke within timeout")
            .expect("waiter task did not panic");
    }

    /// A single parked waiter is released by one fire.
    #[tokio::test]
    async fn fire_wakes_single_waiter() {
        let hub = NodeSideEffects::new();
        let waiter = tokio::spawn({
            let hub = Arc::clone(&hub);
            async move {
                hub.wait_wipe_join_secret().await;
            }
        });
        // Give the spawned task a turn so it is parked on the notify
        // before the fire; a fire with no waiter would be lost.
        tokio::task::yield_now().await;
        hub.fire_wipe_join_secret();
        join_within_deadline(waiter).await;
    }

    /// One fire releases every waiter that is parked at that moment.
    #[tokio::test]
    async fn fire_wakes_multiple_waiters() {
        let hub = NodeSideEffects::new();
        let waiters: Vec<_> = (0..4)
            .map(|_| {
                let hub = Arc::clone(&hub);
                tokio::spawn(async move {
                    hub.wait_wipe_join_secret().await;
                })
            })
            .collect();
        // A single yield is relied on to let all four tasks park: on the
        // current-thread runtime the spawned tasks run before this task
        // is polled again.
        tokio::task::yield_now().await;
        hub.fire_wipe_join_secret();
        for waiter in waiters {
            join_within_deadline(waiter).await;
        }
    }

    /// A fire with nobody waiting leaves nothing behind for later waiters.
    #[tokio::test]
    async fn fire_without_waiter_is_harmless() {
        let hub = NodeSideEffects::new();
        // Nobody is waiting yet, so this fire is dropped. A waiter that
        // arrives afterwards must stay parked — this pins down the
        // trade-off that the boot-time reconcile exists to cover.
        hub.fire_wipe_join_secret();
        let late_wait = hub.wait_wipe_join_secret();
        let outcome = tokio::time::timeout(Duration::from_millis(50), late_wait).await;
        assert!(
            outcome.is_err(),
            "Notify::notify_waiters only wakes current waiters; \
             a post-fire await must NOT complete without a fresh fire",
        );
    }
}