zlayer_secrets/node_effects.rs
1//! Node-local side-effect channel fired by the Raft apply wrapper.
2//!
3//! The Raft state machine apply is pure/deterministic — every replica must
4//! reach the same `SecretsState` on the same op sequence. Filesystem and
5//! network effects that should happen on every node when a particular op
6//! applies cannot live inside `SecretsState::apply` itself. Instead the
7//! apply *wrapper* (the closure handed to the openraft `RaftStateMachine`)
8//! inspects the op post-apply and fires the corresponding handle here;
9//! the daemon owns watcher tasks that await each handle and execute the
10//! local effect (idempotently).
11//!
12//! New ops needing per-node effects should add a [`tokio::sync::Notify`]
13//! field plus matching `fire_…` / `wait_…` methods rather than fanning
14//! out into per-op types — the handle is a long-lived `Arc` shared with
15//! the apply wrapper and the daemon's watcher tasks.
16
17use std::sync::Arc;
18use tokio::sync::Notify;
19
20/// Node-local side-effect channel.
21///
22/// Held as an `Arc` shared between the Raft apply wrapper (which fires
23/// notifies post-apply) and the daemon's watcher tasks (which await them
24/// and run the local effect). All methods are cheap and lock-free.
25#[derive(Debug, Default)]
26pub struct NodeSideEffects {
27 /// Fired when `SecretsRaftOp::WipeJoinSecret` applies successfully.
28 /// The daemon's watcher deletes `{data_dir}/join_secret` on every
29 /// wake. Idempotent if the file is already absent.
30 wipe_join_secret: Notify,
31}
32
33impl NodeSideEffects {
34 /// Construct a fresh `Arc<NodeSideEffects>` ready to share with the
35 /// apply wrapper and watcher tasks.
36 #[must_use]
37 pub fn new() -> Arc<Self> {
38 Arc::new(Self::default())
39 }
40
41 /// Wake every current waiter on the `WipeJoinSecret` notify.
42 ///
43 /// If no task is currently awaiting, the fire is dropped on the
44 /// floor — the boot-time reconcile in `zlayer serve` is what
45 /// guarantees the wipe eventually happens for a node that wasn't
46 /// awaiting at apply time (e.g. snapshot install before the
47 /// watcher spawned).
48 pub fn fire_wipe_join_secret(&self) {
49 self.wipe_join_secret.notify_waiters();
50 }
51
52 /// Await the next `WipeJoinSecret` fire. Multiple concurrent
53 /// awaiters are all woken when [`Self::fire_wipe_join_secret`]
54 /// is called.
55 pub async fn wait_wipe_join_secret(&self) {
56 self.wipe_join_secret.notified().await;
57 }
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Upper bound on how long a fired waiter may take to finish.
    const WAKE_DEADLINE: Duration = Duration::from_secs(1);

    /// Spawn a task that parks on the `WipeJoinSecret` handle and
    /// completes once it has been woken.
    fn spawn_waiter(effects: &Arc<NodeSideEffects>) -> tokio::task::JoinHandle<()> {
        let shared = Arc::clone(effects);
        tokio::spawn(async move { shared.wait_wipe_join_secret().await })
    }

    /// Require that `waiter` finishes cleanly within [`WAKE_DEADLINE`].
    async fn expect_wake(waiter: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(WAKE_DEADLINE, waiter)
            .await
            .expect("waiter woke within timeout")
            .expect("waiter task did not panic");
    }

    /// A single parked waiter is released by one fire.
    #[tokio::test]
    async fn fire_wakes_single_waiter() {
        let effects = NodeSideEffects::new();
        let waiter = spawn_waiter(&effects);

        // Give the spawned task a turn so it is parked on the handle
        // before the fire happens.
        tokio::task::yield_now().await;
        effects.fire_wipe_join_secret();

        expect_wake(waiter).await;
    }

    /// One fire releases every waiter that is parked at that moment.
    #[tokio::test]
    async fn fire_wakes_multiple_waiters() {
        let effects = NodeSideEffects::new();
        let waiters: Vec<_> = (0..4).map(|_| spawn_waiter(&effects)).collect();

        // A single yield is enough for all of them to register: on the
        // current-thread runtime the spawned tasks run before this
        // task is polled again.
        tokio::task::yield_now().await;
        effects.fire_wipe_join_secret();

        for waiter in waiters {
            expect_wake(waiter).await;
        }
    }

    /// A fire with nobody waiting is lost rather than stored, which is
    /// the trade-off the boot-time reconcile exists to cover.
    #[tokio::test]
    async fn fire_without_waiter_is_harmless() {
        let effects = NodeSideEffects::new();

        // Nobody is parked yet, so this fire has no one to wake.
        effects.fire_wipe_join_secret();

        // A waiter that arrives afterwards must still be pending when
        // the short timeout elapses.
        let late_wait = effects.wait_wipe_join_secret();
        let outcome = tokio::time::timeout(Duration::from_millis(50), late_wait).await;
        assert!(
            outcome.is_err(),
            "Notify::notify_waiters only wakes current waiters; \
             a post-fire await must NOT complete without a fresh fire",
        );
    }
}