Skip to main content

zlayer_secrets/
raft_sm.rs

1//! In-memory state and apply logic for the cluster secrets state machine.
2//!
3//! Pure synchronous logic — no IO, no crypto, no async. The leader-side
4//! orchestration that decides *when* to propose [`SecretsRaftOp`] variants
5//! lives in `zlayer-scheduler`'s Raft integration; the actual crypto for
6//! wrapping/encrypting lives in `crate::cluster_dek` (added in a sibling
7//! task). This module just takes ops off the Raft log and updates local
8//! state deterministically so every replica converges on the same view.
9
10use std::collections::HashMap;
11
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14
15use zlayer_types::api::internal::SecretsRaftOp;
16use zlayer_types::storage::{NodeIdentity, ReplicatedSecret, WrappedDek};
17
18use crate::SecretsError;
19
20/// Snapshot of the cluster secrets state on this node.
21///
22/// Followers and the leader hold identical content. Snapshots
23/// (de)serialize through serde for openraft.
24#[derive(Debug, Default, Clone, Serialize, Deserialize)]
25pub struct SecretsState {
26    /// Every node ever registered, keyed by `node_id`. Soft-revocation
27    /// is recorded inline (`NodeIdentity::revoked_at`); the entry is
28    /// kept so historical wraps in old `WrappedDek` generations can still
29    /// be referenced for audit.
30    pub nodes: HashMap<String, NodeIdentity>,
31
32    /// Current cluster DEK envelope (per-node sealed-box wraps + generation).
33    /// `None` until the first `RegisterNode` + `RotateDek` pair lands.
34    pub wrapped_dek: Option<WrappedDek>,
35
36    /// Replicated secrets, keyed by their `storage_key` (`"{scope}:{name}"`).
37    pub secrets: HashMap<String, ReplicatedSecret>,
38}
39
40impl SecretsState {
41    /// Apply a Raft op to local state.
42    ///
43    /// Deterministic — every replica that sees the same op sequence must
44    /// end up with the same `SecretsState`. Returns an error only on
45    /// genuinely impossible inputs (e.g. `DeleteSecret` for an unknown
46    /// key); the leader's orchestration is expected to ensure the inputs
47    /// are well-formed before proposing.
48    ///
49    /// # Errors
50    /// - [`SecretsError::Provider`] if the op references state that
51    ///   doesn't exist (revoke unknown node, delete unknown secret).
52    pub fn apply(&mut self, op: SecretsRaftOp) -> Result<(), SecretsError> {
53        match op {
54            SecretsRaftOp::RegisterNode { identity } => {
55                // Insert; overwriting is OK (e.g. re-join after a crash before revoke).
56                self.nodes.insert(identity.node_id.clone(), identity);
57                Ok(())
58            }
59            SecretsRaftOp::RevokeNode { node_id } => {
60                let entry = self.nodes.get_mut(&node_id).ok_or_else(|| {
61                    SecretsError::Provider(format!("RevokeNode for unknown node_id: {node_id}"))
62                })?;
63                if entry.revoked_at.is_none() {
64                    entry.revoked_at = Some(Utc::now());
65                }
66                Ok(())
67            }
68            SecretsRaftOp::RotateDek { new_wraps } => {
69                // Replace wholesale. The leader is responsible for
70                // emitting a sequence of `PutSecret` re-encrypts after
71                // the rotation; followers just store the new envelope
72                // and apply re-encrypts as they arrive.
73                self.wrapped_dek = Some(new_wraps);
74                Ok(())
75            }
76            SecretsRaftOp::PutSecret { secret } => {
77                self.secrets.insert(secret.storage_key.clone(), secret);
78                Ok(())
79            }
80            SecretsRaftOp::DeleteSecret { storage_key } => {
81                self.secrets.remove(&storage_key).ok_or_else(|| {
82                    SecretsError::Provider(format!(
83                        "DeleteSecret for unknown storage_key: {storage_key}"
84                    ))
85                })?;
86                Ok(())
87            }
88        }
89    }
90
91    /// Serialize the state for an openraft snapshot. Uses JSON for now;
92    /// the consensus wire-up task may swap this for a more compact codec
93    /// once it audits whatever the scheduler SM uses.
94    ///
95    /// # Errors
96    /// - [`SecretsError::Storage`] if serialization fails.
97    pub fn snapshot(&self) -> Result<Vec<u8>, SecretsError> {
98        serde_json::to_vec(self).map_err(|e| SecretsError::Storage(format!("snapshot: {e}")))
99    }
100
101    /// Restore from a snapshot blob produced by [`Self::snapshot`].
102    ///
103    /// # Errors
104    /// - [`SecretsError::Storage`] if deserialization fails.
105    pub fn restore(bytes: &[u8]) -> Result<Self, SecretsError> {
106        serde_json::from_slice(bytes).map_err(|e| SecretsError::Storage(format!("restore: {e}")))
107    }
108
109    /// Convenience: is this node currently in the active recipient set
110    /// for the current DEK generation?
111    #[must_use]
112    pub fn node_can_decrypt(&self, node_id: &str) -> bool {
113        self.wrapped_dek
114            .as_ref()
115            .is_some_and(|w| w.wraps.contains_key(node_id))
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use zlayer_types::secrets::SecretMetadata;

    /// Build an un-revoked identity with fixed key material and join time.
    fn make_identity(node_id: &str) -> NodeIdentity {
        let joined_at = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
        NodeIdentity {
            node_id: node_id.to_owned(),
            secrets_pubkey: [0u8; 32],
            wg_pubkey: format!("wg-{node_id}"),
            joined_at,
            revoked_at: None,
        }
    }

    /// Build an envelope of the given generation with a dummy wrap for
    /// each listed node.
    fn make_wrapped_dek(generation: u64, node_ids: &[&str]) -> WrappedDek {
        let wraps = node_ids
            .iter()
            .map(|id| ((*id).to_string(), vec![0xAB, 0xCD]))
            .collect();
        WrappedDek {
            dek_generation: generation,
            wraps,
        }
    }

    /// Build a secret stored under `dep:{name}` with placeholder ciphertext.
    fn make_secret(name: &str, generation: u64) -> ReplicatedSecret {
        let storage_key = format!("dep:{name}");
        ReplicatedSecret {
            storage_key,
            ciphertext: vec![1, 2, 3, 4],
            dek_generation: generation,
            metadata: SecretMetadata::new(name),
            node_affinity: None,
        }
    }

    #[test]
    fn apply_register_node_inserts() {
        let mut sm = SecretsState::default();
        let op = SecretsRaftOp::RegisterNode {
            identity: make_identity("node-a"),
        };
        sm.apply(op).expect("register should succeed");

        assert_eq!(sm.nodes.len(), 1);
        assert!(sm.nodes.contains_key("node-a"));
    }

    #[test]
    fn apply_register_node_overwrites_existing() {
        let mut sm = SecretsState::default();

        let original = NodeIdentity {
            wg_pubkey: "wg-original".to_string(),
            ..make_identity("node-a")
        };
        sm.apply(SecretsRaftOp::RegisterNode { identity: original })
            .expect("first register");

        let replacement = NodeIdentity {
            wg_pubkey: "wg-replaced".to_string(),
            ..make_identity("node-a")
        };
        sm.apply(SecretsRaftOp::RegisterNode {
            identity: replacement,
        })
        .expect("second register should not error");

        assert_eq!(sm.nodes.len(), 1);
        assert_eq!(sm.nodes["node-a"].wg_pubkey, "wg-replaced");
    }

    #[test]
    fn apply_revoke_node_marks_revoked_at() {
        let mut sm = SecretsState::default();
        let register = SecretsRaftOp::RegisterNode {
            identity: make_identity("node-a"),
        };
        sm.apply(register).expect("register");

        let revoke = SecretsRaftOp::RevokeNode {
            node_id: "node-a".to_string(),
        };
        sm.apply(revoke).expect("revoke");
        let first_ts = sm.nodes["node-a"].revoked_at;
        assert!(first_ts.is_some());

        // A second revoke must succeed and must leave the timestamp from
        // the first revoke untouched.
        let revoke_again = SecretsRaftOp::RevokeNode {
            node_id: "node-a".to_string(),
        };
        sm.apply(revoke_again).expect("revoke again");
        assert_eq!(sm.nodes["node-a"].revoked_at, first_ts);
    }

    #[test]
    fn apply_revoke_unknown_node_errors() {
        let mut sm = SecretsState::default();
        let outcome = sm.apply(SecretsRaftOp::RevokeNode {
            node_id: "missing".to_string(),
        });
        let err = outcome.expect_err("revoke unknown should fail");
        assert!(matches!(err, SecretsError::Provider(_)), "got: {err:?}");
    }

    #[test]
    fn apply_rotate_dek_replaces_wraps() {
        let mut sm = SecretsState::default();
        let rotations = [
            (1, &["node-a"][..], "rotate 1"),
            (2, &["node-a", "node-b"][..], "rotate 2"),
        ];
        for (generation, members, label) in rotations {
            sm.apply(SecretsRaftOp::RotateDek {
                new_wraps: make_wrapped_dek(generation, members),
            })
            .expect(label);
        }

        let envelope = sm.wrapped_dek.as_ref().expect("dek present");
        assert_eq!(envelope.dek_generation, 2);
        assert_eq!(envelope.wraps.len(), 2);
        for member in ["node-a", "node-b"] {
            assert!(envelope.wraps.contains_key(member));
        }
    }

    #[test]
    fn apply_put_secret_inserts_then_overwrites() {
        let mut sm = SecretsState::default();

        let initial = ReplicatedSecret {
            ciphertext: vec![0xDE, 0xAD],
            ..make_secret("api-key", 1)
        };
        let initial_key = initial.storage_key.clone();
        sm.apply(SecretsRaftOp::PutSecret { secret: initial })
            .expect("put 1");
        assert_eq!(sm.secrets.len(), 1);
        assert_eq!(sm.secrets[&initial_key].ciphertext, vec![0xDE, 0xAD]);

        let updated = ReplicatedSecret {
            ciphertext: vec![0xBE, 0xEF],
            ..make_secret("api-key", 2)
        };
        let updated_key = updated.storage_key.clone();
        sm.apply(SecretsRaftOp::PutSecret { secret: updated })
            .expect("put 2");
        assert_eq!(sm.secrets.len(), 1);

        let stored = &sm.secrets[&updated_key];
        assert_eq!(stored.ciphertext, vec![0xBE, 0xEF]);
        assert_eq!(stored.dek_generation, 2);
    }

    #[test]
    fn apply_delete_secret_removes() {
        let mut sm = SecretsState::default();
        let secret = make_secret("api-key", 1);
        let storage_key = secret.storage_key.clone();

        sm.apply(SecretsRaftOp::PutSecret { secret }).expect("put");
        sm.apply(SecretsRaftOp::DeleteSecret { storage_key })
            .expect("delete");

        assert!(sm.secrets.is_empty());
    }

    #[test]
    fn apply_delete_unknown_secret_errors() {
        let mut sm = SecretsState::default();
        let outcome = sm.apply(SecretsRaftOp::DeleteSecret {
            storage_key: "dep:nope".to_string(),
        });
        let err = outcome.expect_err("delete unknown should fail");
        assert!(matches!(err, SecretsError::Provider(_)), "got: {err:?}");
    }

    #[test]
    fn snapshot_round_trip() {
        let mut sm = SecretsState::default();
        let script = [
            (
                SecretsRaftOp::RegisterNode {
                    identity: make_identity("node-a"),
                },
                "register a",
            ),
            (
                SecretsRaftOp::RegisterNode {
                    identity: make_identity("node-b"),
                },
                "register b",
            ),
            (
                SecretsRaftOp::RotateDek {
                    new_wraps: make_wrapped_dek(7, &["node-a", "node-b"]),
                },
                "rotate",
            ),
            (
                SecretsRaftOp::PutSecret {
                    secret: make_secret("api-key", 7),
                },
                "put",
            ),
            (
                SecretsRaftOp::RevokeNode {
                    node_id: "node-b".to_string(),
                },
                "revoke b",
            ),
        ];
        for (op, label) in script {
            sm.apply(op).expect(label);
        }

        let encoded = sm.snapshot().expect("snapshot ok");
        let restored = SecretsState::restore(&encoded).expect("restore ok");

        // The storage types have no `PartialEq`, and `HashMap` iteration
        // order can differ between the original and the restored value,
        // so raw bytes are not comparable. Parsed JSON values compare by
        // object content, which keeps this assertion stable.
        let re_encoded = restored.snapshot().expect("snapshot restored ok");
        let before: serde_json::Value = serde_json::from_slice(&encoded).expect("parse v1");
        let after: serde_json::Value = serde_json::from_slice(&re_encoded).expect("parse v2");
        assert_eq!(before, after);

        // The restored value also reports the same sizes and generation.
        assert_eq!(restored.nodes.len(), sm.nodes.len());
        assert_eq!(restored.secrets.len(), sm.secrets.len());
        let generation_of = |s: &SecretsState| s.wrapped_dek.as_ref().map(|w| w.dek_generation);
        assert_eq!(generation_of(&restored), generation_of(&sm));
    }

    #[test]
    fn node_can_decrypt_reflects_wraps() {
        let mut sm = SecretsState::default();
        // No envelope yet: nobody can decrypt.
        assert!(!sm.node_can_decrypt("node-a"));

        let include_a = SecretsRaftOp::RotateDek {
            new_wraps: make_wrapped_dek(1, &["node-a"]),
        };
        sm.apply(include_a).expect("rotate include");
        assert!(sm.node_can_decrypt("node-a"));
        assert!(!sm.node_can_decrypt("node-b"));

        // Rotating to an envelope without node-a drops it from the set.
        let exclude_a = SecretsRaftOp::RotateDek {
            new_wraps: make_wrapped_dek(2, &["node-b"]),
        };
        sm.apply(exclude_a).expect("rotate exclude a");
        assert!(!sm.node_can_decrypt("node-a"));
        assert!(sm.node_can_decrypt("node-b"));
    }
}