Skip to main content

zlayer_secrets/
raft_store.rs

1//! Cluster-replicated secrets store backed by openraft.
2//!
3//! Reads from the local [`SecretsState`] (a follower view of the replicated
4//! state). Writes go through the leader via the [`RaftSecretsHandle`]
5//! abstraction, which the scheduler crate's `RaftCoordinator` implements.
6//! Decryption uses the local node's X25519 private key to unwrap this
7//! node's copy of the cluster DEK; the unwrapped DEK is cached and
8//! invalidated lazily when a read notices the local
9//! `wrapped_dek.dek_generation` has moved.
10//!
11//! This is the cluster-mode counterpart to [`crate::PersistentSecretsStore`].
12//! The daemon picks one or the other at startup based on whether
13//! `--cluster` is on (Task #18).
14//!
15//! # Why a trait instead of a direct dep on `zlayer-scheduler`?
16//!
17//! `zlayer-scheduler` already depends on `zlayer-secrets` for crypto
18//! primitives ([`crate::cluster_dek::ClusterDek`]) and the SM
19//! ([`crate::raft_sm::SecretsState`]). Adding the reverse edge would
20//! create a cycle. The [`RaftSecretsHandle`] trait inverts the
21//! dependency: this module names *the operations it needs* abstractly,
22//! and the scheduler implements the trait against its `RaftCoordinator`.
23//! This also makes the store trivially mockable for unit tests (see
24//! `mod tests` below).
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use async_trait::async_trait;
30use tokio::sync::RwLock;
31
32use zlayer_types::storage::{NodeAffinity, ReplicatedSecret, WrappedDek};
33
34use crate::cluster_dek::ClusterDek;
35use crate::raft_sm::SecretsState;
36use crate::sealed::RecipientPrivateKey;
37use crate::{Result, Secret, SecretMetadata, SecretsError, SecretsProvider, SecretsStore};
38
/// Operations [`RaftSecretsStore`] needs from a running consensus instance.
///
/// The scheduler crate's `RaftCoordinator` implements this trait; tests
/// substitute an in-memory mock that applies ops directly to a local
/// [`SecretsState`] without spinning up Raft. See
/// `tests::InMemoryRaftHandle` below.
///
/// # Why not just take `Arc<RaftCoordinator>`?
///
/// `zlayer-scheduler -> zlayer-secrets` already exists; adding the
/// reverse edge would close a dependency cycle. Inverting via a trait
/// keeps the layering one-directional.
#[async_trait]
pub trait RaftSecretsHandle: Send + Sync {
    /// Snapshot of the current cluster secrets state. Returned by clone
    /// so the caller can drop any internal locks held by the
    /// implementation before doing crypto / locking other things.
    ///
    /// The value is an owned, point-in-time copy: later Raft applies are
    /// not reflected in a snapshot that has already been returned.
    async fn secrets_state(&self) -> SecretsState;

    /// Propose a
    /// [`SecretsRaftOp::PutSecret`](zlayer_types::api::internal::SecretsRaftOp::PutSecret).
    /// Leader-only. Followers surface a `not leader; redirect to: <id>`
    /// error so the API layer can issue a redirect.
    ///
    /// # Errors
    /// - [`SecretsError::Provider`] when this node is not the leader, or
    ///   when the underlying Raft propose fails.
    async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()>;

    /// Propose a
    /// [`SecretsRaftOp::DeleteSecret`](zlayer_types::api::internal::SecretsRaftOp::DeleteSecret).
    /// Leader-only with the same redirect-on-follower semantics as
    /// [`Self::propose_put_secret`].
    ///
    /// # Errors
    /// - [`SecretsError::Provider`] when this node is not the leader, or
    ///   when the underlying Raft propose fails.
    async fn propose_delete_secret(&self, storage_key: &str) -> Result<()>;
}
76
/// Cluster-replicated secrets store.
///
/// Reads are served from the local Raft-replicated [`SecretsState`].
/// Writes are proposed through the leader via [`RaftSecretsHandle`].
/// The unwrapped cluster DEK is cached in memory and invalidated when
/// the observed `dek_generation` changes.
///
/// The type implements both [`SecretsProvider`] (read side) and
/// [`SecretsStore`] (write side).
pub struct RaftSecretsStore {
    /// Local node's X25519 private key, used to unwrap this node's copy
    /// of the cluster DEK from the per-generation [`WrappedDek`].
    node_priv: Arc<RecipientPrivateKey>,

    /// This node's cluster UUID — needed to look up
    /// `wrapped_dek.wraps[node_id]`. Also the identity checked against a
    /// secret's [`NodeAffinity`] allow-list on reads and listings.
    node_id: String,

    /// Handle to the running consensus instance. Reads pull state from
    /// it; writes propose through it.
    raft: Arc<dyn RaftSecretsHandle>,

    /// Cached unwrapped DEK + the generation it was unwrapped from.
    /// Lazily refreshed on read when the observed generation differs.
    /// `None` until the first read or write needs the DEK.
    dek_cache: RwLock<Option<CachedDek>>,
}
100
/// One slot in the DEK cache: the unwrapped DEK plus the generation it
/// was unwrapped from. The generation field is what
/// `RaftSecretsStore::ensure_dek_for_envelope` compares against to decide
/// whether to drop and re-unwrap.
struct CachedDek {
    /// `dek_generation` of the [`WrappedDek`] envelope this DEK was
    /// unwrapped from.
    generation: u64,
    /// The unwrapped cluster DEK used to encrypt and decrypt secret payloads.
    dek: ClusterDek,
}
108
109impl RaftSecretsStore {
110    /// Construct a new store bound to a running [`RaftSecretsHandle`].
111    ///
112    /// The DEK cache starts empty and is populated on first read.
113    #[must_use]
114    pub fn new(
115        node_priv: RecipientPrivateKey,
116        node_id: String,
117        raft: Arc<dyn RaftSecretsHandle>,
118    ) -> Self {
119        Self {
120            node_priv: Arc::new(node_priv),
121            node_id,
122            raft,
123            dek_cache: RwLock::new(None),
124        }
125    }
126
127    /// Construct a storage key in the same shape used by
128    /// [`crate::PersistentSecretsStore`] (`"{scope}:{name}"`), so a
129    /// secret written via `RaftSecretsStore` is findable via the same
130    /// key under the persistent store and vice versa.
131    #[inline]
132    #[must_use]
133    pub fn make_key(scope: &str, name: &str) -> String {
134        format!("{scope}:{name}")
135    }
136
137    /// Is the local node currently entitled to host this secret's
138    /// decryptable form?
139    ///
140    /// - `None` affinity: any node may host. Returns `true`.
141    /// - [`NodeAffinity::Nodes`]: returns `true` iff `node_id` is in the
142    ///   allow-list.
143    /// - [`NodeAffinity::Labels`]: node-label matching is not yet
144    ///   implemented in this layer (the SM doesn't carry labels). Returns
145    ///   `true` so the read is permitted; the API gate is the
146    ///   authoritative enforcement point until labels are wired in.
147    #[must_use]
148    pub fn node_allowed(node_id: &str, affinity: Option<&NodeAffinity>) -> bool {
149        match affinity {
150            // TODO Phase 1.5: label matching. The SM has no node-label
151            // table yet; until labels are wired in, conservatively allow
152            // for both `None` and `Labels` and let the API gate enforce.
153            None | Some(NodeAffinity::Labels { .. }) => true,
154            Some(NodeAffinity::Nodes { node_ids }) => node_ids.iter().any(|n| n == node_id),
155        }
156    }
157
158    /// Refresh the cached DEK from the current `wrapped_dek` if its
159    /// generation moved (or if the cache is empty). On success, the
160    /// cache holds an unwrapped DEK pinned to the current generation.
161    ///
162    /// `current_envelope` is the envelope read from
163    /// [`SecretsState::wrapped_dek`] by the caller. Passing it in
164    /// (instead of reaching into `self.raft.secrets_state()` again)
165    /// avoids a second clone of the entire state when the caller is
166    /// already iterating over secrets.
167    async fn ensure_dek_for_envelope(&self, current_envelope: &WrappedDek) -> Result<()> {
168        // Fast path: cache hit on the right generation.
169        {
170            let guard = self.dek_cache.read().await;
171            if let Some(cached) = guard.as_ref() {
172                if cached.generation == current_envelope.dek_generation {
173                    return Ok(());
174                }
175            }
176        }
177
178        // Slow path: take the wrap bytes for this node out, drop the
179        // read lock on the envelope reference, and unwrap.
180        let wrap = current_envelope
181            .wraps
182            .get(&self.node_id)
183            .ok_or_else(|| {
184                SecretsError::Provider(format!(
185                    "node {} has no wrap in current DEK (generation {}); \
186                     cannot decrypt cluster secrets — re-join the cluster \
187                     so the leader can re-wrap",
188                    self.node_id, current_envelope.dek_generation
189                ))
190            })?
191            .clone();
192
193        let dek = ClusterDek::unwrap(&self.node_priv, &wrap)?;
194
195        let mut guard = self.dek_cache.write().await;
196        *guard = Some(CachedDek {
197            generation: current_envelope.dek_generation,
198            dek,
199        });
200        Ok(())
201    }
202
203    /// Look up the secret in the local [`SecretsState`], filter by
204    /// [`NodeAffinity`] (returning `None` when this node isn't allowed —
205    /// existence is intentionally not leaked), then unwrap the DEK as
206    /// needed and decrypt the ciphertext.
207    ///
208    /// Returns `Ok(None)` for "no such secret" and "not allowed for this
209    /// node"; the [`SecretsProvider`] trait reserves
210    /// [`SecretsError::NotFound`] for callers that want a hard "not
211    /// here" signal — the cluster store prefers `None` so the API gate
212    /// can decide between 404 and 403.
213    async fn read_inner(&self, scope: &str, name: &str) -> Result<Option<Secret>> {
214        let storage_key = Self::make_key(scope, name);
215        let state = self.raft.secrets_state().await;
216
217        // Pull out everything we need from the state snapshot, then
218        // drop it before doing any crypto so we never hold a state
219        // reference across an await on the DEK cache lock.
220        let (ciphertext, dek_generation, envelope) = {
221            let Some(replicated) = state.secrets.get(&storage_key) else {
222                return Ok(None);
223            };
224            if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
225                return Ok(None);
226            }
227            let Some(envelope) = state.wrapped_dek.as_ref() else {
228                return Err(SecretsError::Provider(
229                    "cluster has no DEK yet; secret cannot be decrypted".to_string(),
230                ));
231            };
232            (
233                replicated.ciphertext.clone(),
234                replicated.dek_generation,
235                envelope.clone(),
236            )
237        };
238
239        if envelope.dek_generation < dek_generation {
240            // Should be impossible — secrets always reference a
241            // generation <= current. Surface as a Provider error so the
242            // operator can investigate.
243            return Err(SecretsError::Provider(format!(
244                "secret {storage_key} references DEK generation {dek_generation} \
245                 but current cluster DEK is older (generation {}); state is \
246                 inconsistent",
247                envelope.dek_generation
248            )));
249        }
250
251        // The encrypt-side puts every secret on the *current* DEK at
252        // write time, and rotations re-encrypt every existing secret on
253        // commit. So in steady state, `dek_generation == envelope.dek_generation`.
254        // We assert that here defensively — if a row is left on an old
255        // generation (mid-rotation crash before re-encrypt completes),
256        // we fail the read rather than silently decrypt with the wrong
257        // key. The leader's rotation walker is responsible for cleaning
258        // up stragglers.
259        if dek_generation != envelope.dek_generation {
260            return Err(SecretsError::Provider(format!(
261                "secret {storage_key} encrypted under DEK generation {dek_generation} \
262                 but current is {} — wait for rotation re-encrypt to finish",
263                envelope.dek_generation
264            )));
265        }
266
267        self.ensure_dek_for_envelope(&envelope).await?;
268
269        let guard = self.dek_cache.read().await;
270        let cached = guard.as_ref().ok_or_else(|| {
271            SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
272        })?;
273
274        let plaintext = cached.dek.decrypt(&ciphertext)?;
275        let value = std::str::from_utf8(plaintext.as_slice())
276            .map_err(|e| SecretsError::Decryption(format!("invalid UTF-8 in secret: {e}")))?;
277        Ok(Some(Secret::new(value)))
278    }
279
280    /// Encrypt `plaintext` under the current cluster DEK, returning
281    /// `(ciphertext, generation)`.
282    async fn encrypt_under_current(&self, plaintext: &[u8]) -> Result<(Vec<u8>, u64)> {
283        let state = self.raft.secrets_state().await;
284        let envelope = state.wrapped_dek.clone().ok_or_else(|| {
285            SecretsError::Provider(
286                "cluster has no DEK yet; cannot write secret — wait for the \
287                 first node to register via propose_register_node_and_rotate"
288                    .to_string(),
289            )
290        })?;
291        self.ensure_dek_for_envelope(&envelope).await?;
292        let guard = self.dek_cache.read().await;
293        let cached = guard.as_ref().ok_or_else(|| {
294            SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
295        })?;
296        let ciphertext = cached.dek.encrypt(plaintext)?;
297        Ok((ciphertext, cached.generation))
298    }
299}
300
301#[async_trait]
302impl SecretsProvider for RaftSecretsStore {
303    async fn get_secret(&self, scope: &str, name: &str) -> Result<Secret> {
304        match self.read_inner(scope, name).await? {
305            Some(secret) => Ok(secret),
306            None => Err(SecretsError::NotFound {
307                name: name.to_string(),
308            }),
309        }
310    }
311
312    async fn get_secrets(&self, scope: &str, names: &[&str]) -> Result<HashMap<String, Secret>> {
313        let mut out = HashMap::with_capacity(names.len());
314        for name in names {
315            // Per the trait docs: missing secrets are silently omitted
316            // from the batch result (vs. erroring). `read_inner` already
317            // returns `Ok(None)` for both "doesn't exist" and "not
318            // allowed for this node", which is exactly what we want.
319            if let Some(secret) = self.read_inner(scope, name).await? {
320                out.insert((*name).to_string(), secret);
321            }
322        }
323        Ok(out)
324    }
325
326    async fn list_secrets(&self, scope: &str) -> Result<Vec<SecretMetadata>> {
327        let state = self.raft.secrets_state().await;
328        let prefix = format!("{scope}:");
329        let mut results = Vec::new();
330        for replicated in state.secrets.values() {
331            if !replicated.storage_key.starts_with(&prefix) {
332                continue;
333            }
334            // Hide secrets this node isn't entitled to host. Don't leak
335            // existence via the metadata listing.
336            if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
337                continue;
338            }
339            // Strip ciphertext / dek_generation; only return the public
340            // metadata block.
341            results.push(replicated.metadata.clone());
342        }
343        results.sort_by(|a, b| a.name.cmp(&b.name));
344        Ok(results)
345    }
346
347    async fn exists(&self, scope: &str, name: &str) -> Result<bool> {
348        let state = self.raft.secrets_state().await;
349        let storage_key = Self::make_key(scope, name);
350        let Some(replicated) = state.secrets.get(&storage_key) else {
351            return Ok(false);
352        };
353        // Don't leak existence to nodes outside the affinity set.
354        Ok(Self::node_allowed(
355            &self.node_id,
356            replicated.node_affinity.as_ref(),
357        ))
358    }
359}
360
361#[async_trait]
362impl SecretsStore for RaftSecretsStore {
363    async fn set_secret(&self, scope: &str, name: &str, value: &Secret) -> Result<()> {
364        let storage_key = Self::make_key(scope, name);
365
366        // Look up any existing replicated row to preserve metadata
367        // (created_at, version) so set_secret semantics match
368        // PersistentSecretsStore (version increments on update, stable
369        // created_at, fresh updated_at).
370        let existing = {
371            let state = self.raft.secrets_state().await;
372            state.secrets.get(&storage_key).cloned()
373        };
374
375        let metadata = match existing.as_ref() {
376            Some(prev) => {
377                let mut m = prev.metadata.clone();
378                m.update();
379                m
380            }
381            None => SecretMetadata::new(name),
382        };
383
384        // Preserve any previously-set node_affinity. set_secret() is the
385        // value-rotation path — the affinity is configured separately via
386        // a dedicated API in handlers and shouldn't be cleared by a
387        // simple value update.
388        let node_affinity = existing.as_ref().and_then(|p| p.node_affinity.clone());
389
390        let (ciphertext, dek_generation) = self
391            .encrypt_under_current(value.expose().as_bytes())
392            .await?;
393
394        let secret = ReplicatedSecret {
395            storage_key,
396            ciphertext,
397            dek_generation,
398            metadata,
399            node_affinity,
400        };
401
402        // Surface leader-redirect / propose errors verbatim — the API
403        // layer parses the "not leader; redirect to: <id>" prefix to
404        // issue an HTTP redirect.
405        self.raft.propose_put_secret(secret).await
406    }
407
408    async fn delete_secret(&self, scope: &str, name: &str) -> Result<()> {
409        let storage_key = Self::make_key(scope, name);
410
411        // Pre-flight existence check so the caller gets a clean
412        // NotFound rather than a Raft "DeleteSecret for unknown
413        // storage_key" Provider error. Race with concurrent deletes is
414        // acceptable; the underlying SM apply will return Provider in
415        // that case which we surface verbatim.
416        let exists = {
417            let state = self.raft.secrets_state().await;
418            state.secrets.contains_key(&storage_key)
419        };
420        if !exists {
421            return Err(SecretsError::NotFound {
422                name: name.to_string(),
423            });
424        }
425
426        self.raft.propose_delete_secret(&storage_key).await
427    }
428
429    async fn rotate_secret(
430        &self,
431        scope: &str,
432        name: &str,
433        value: &Secret,
434    ) -> Result<crate::RotationResult> {
435        let storage_key = Self::make_key(scope, name);
436
437        // Look up the existing record so we can return a meaningful
438        // RotationResult and bump its version cleanly. Mirrors
439        // PersistentSecretsStore: rotation requires the secret to
440        // already exist.
441        let existing = {
442            let state = self.raft.secrets_state().await;
443            state.secrets.get(&storage_key).cloned()
444        };
445        let existing = existing.ok_or_else(|| SecretsError::NotFound {
446            name: name.to_string(),
447        })?;
448
449        let previous_version = existing.metadata.version;
450
451        let mut metadata = existing.metadata.clone();
452        metadata.update();
453        let new_version = metadata.version;
454
455        let (ciphertext, dek_generation) = self
456            .encrypt_under_current(value.expose().as_bytes())
457            .await?;
458
459        let secret = ReplicatedSecret {
460            storage_key,
461            ciphertext,
462            dek_generation,
463            metadata,
464            node_affinity: existing.node_affinity,
465        };
466
467        self.raft.propose_put_secret(secret).await?;
468
469        Ok(crate::RotationResult {
470            previous_version: Some(previous_version),
471            new_version,
472        })
473    }
474
475    async fn set_secret_with_affinity(
476        &self,
477        scope: &str,
478        name: &str,
479        value: &Secret,
480        node_affinity: Option<&NodeAffinity>,
481    ) -> Result<()> {
482        let storage_key = Self::make_key(scope, name);
483
484        // Preserve metadata semantics with [`Self::set_secret`]: bump
485        // version + updated_at on update, keep created_at stable.
486        let existing = {
487            let state = self.raft.secrets_state().await;
488            state.secrets.get(&storage_key).cloned()
489        };
490
491        let metadata = match existing.as_ref() {
492            Some(prev) => {
493                let mut m = prev.metadata.clone();
494                m.update();
495                m
496            }
497            None => SecretMetadata::new(name),
498        };
499
500        // Affinity precedence:
501        //   - Some(_) from caller -> overwrite (this is the explicit
502        //     write/update path used by the API on create/rotate).
503        //   - None from caller -> preserve any previously stored selector
504        //     (matches the "leave affinity unchanged" contract on the
505        //     rotate request DTO).
506        let resolved_affinity = match node_affinity {
507            Some(_) => node_affinity.cloned(),
508            None => existing.as_ref().and_then(|p| p.node_affinity.clone()),
509        };
510
511        let (ciphertext, dek_generation) = self
512            .encrypt_under_current(value.expose().as_bytes())
513            .await?;
514
515        let secret = ReplicatedSecret {
516            storage_key,
517            ciphertext,
518            dek_generation,
519            metadata,
520            node_affinity: resolved_affinity,
521        };
522
523        self.raft.propose_put_secret(secret).await
524    }
525
526    async fn rotate_secret_with_affinity(
527        &self,
528        scope: &str,
529        name: &str,
530        value: &Secret,
531        node_affinity: Option<&NodeAffinity>,
532    ) -> Result<crate::RotationResult> {
533        let storage_key = Self::make_key(scope, name);
534
535        let existing = {
536            let state = self.raft.secrets_state().await;
537            state.secrets.get(&storage_key).cloned()
538        };
539        let existing = existing.ok_or_else(|| SecretsError::NotFound {
540            name: name.to_string(),
541        })?;
542
543        let previous_version = existing.metadata.version;
544
545        let mut metadata = existing.metadata.clone();
546        metadata.update();
547        let new_version = metadata.version;
548
549        let (ciphertext, dek_generation) = self
550            .encrypt_under_current(value.expose().as_bytes())
551            .await?;
552
553        // Same precedence as set_secret_with_affinity above.
554        let resolved_affinity = match node_affinity {
555            Some(_) => node_affinity.cloned(),
556            None => existing.node_affinity,
557        };
558
559        let secret = ReplicatedSecret {
560            storage_key,
561            ciphertext,
562            dek_generation,
563            metadata,
564            node_affinity: resolved_affinity,
565        };
566
567        self.raft.propose_put_secret(secret).await?;
568
569        Ok(crate::RotationResult {
570            previous_version: Some(previous_version),
571            new_version,
572        })
573    }
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579    use std::sync::Mutex as StdMutex;
580
581    use chrono::Utc;
582    use zlayer_types::api::internal::SecretsRaftOp;
583    use zlayer_types::storage::NodeIdentity;
584
585    use crate::sealed::RecipientPublicKey;
586
    /// In-memory mock that applies ops directly to a local
    /// [`SecretsState`] without spinning up a real Raft cluster.
    /// Single-node "leader of one" for unit-test purposes: every propose
    /// is applied immediately and never reports a leader redirect.
    struct InMemoryRaftHandle {
        // StdMutex (not tokio::sync::Mutex) is fine because all the
        // mutations are short, synchronous applies and the guard is
        // never held across an `.await`. Wrapping in async methods
        // keeps the trait signatures.
        state: StdMutex<SecretsState>,
    }
596
597    impl InMemoryRaftHandle {
598        fn new() -> Self {
599            Self {
600                state: StdMutex::new(SecretsState::default()),
601            }
602        }
603
604        /// Apply an op directly (bypassing leader checks) so the test
605        /// fixture can register a node + rotate the DEK without going
606        /// through any propose path.
607        fn apply(&self, op: SecretsRaftOp) {
608            let mut guard = self.state.lock().expect("state poisoned");
609            guard.apply(op).expect("apply ok");
610        }
611    }
612
613    #[async_trait]
614    impl RaftSecretsHandle for InMemoryRaftHandle {
615        async fn secrets_state(&self) -> SecretsState {
616            self.state.lock().expect("state poisoned").clone()
617        }
618
619        async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()> {
620            self.apply(SecretsRaftOp::PutSecret { secret });
621            Ok(())
622        }
623
624        async fn propose_delete_secret(&self, storage_key: &str) -> Result<()> {
625            self.apply(SecretsRaftOp::DeleteSecret {
626                storage_key: storage_key.to_string(),
627            });
628            Ok(())
629        }
630    }
631
632    /// Build a fixture: in-memory handle pre-seeded with a single node
633    /// (`node-a`), a DEK at generation 1 wrapped to `node-a`, and a
634    /// `RaftSecretsStore` bound to that node's private key.
635    fn fixture() -> (
636        Arc<InMemoryRaftHandle>,
637        RaftSecretsStore,
638        RecipientPrivateKey,
639    ) {
640        let (sk, pk) = RecipientPrivateKey::generate();
641
642        let identity = NodeIdentity {
643            node_id: "node-a".to_string(),
644            secrets_pubkey: *pk.as_bytes(),
645            wg_pubkey: "wg-a".to_string(),
646            joined_at: Utc::now(),
647            revoked_at: None,
648        };
649
650        let dek = ClusterDek::generate();
651        let mut recipients: HashMap<String, RecipientPublicKey> = HashMap::new();
652        recipients.insert(
653            "node-a".to_string(),
654            RecipientPublicKey::from_bytes(*pk.as_bytes()),
655        );
656        let envelope = dek.rewrap_for_set(&recipients, 1).expect("rewrap");
657
658        let handle = Arc::new(InMemoryRaftHandle::new());
659        handle.apply(SecretsRaftOp::RegisterNode { identity });
660        handle.apply(SecretsRaftOp::RotateDek {
661            new_wraps: envelope,
662        });
663
664        let store_handle: Arc<dyn RaftSecretsHandle> = handle.clone();
665        let store = RaftSecretsStore::new(sk.clone(), "node-a".to_string(), store_handle);
666        (handle, store, sk)
667    }
668
669    #[tokio::test]
670    async fn round_trip_set_get() {
671        let (_handle, store, _sk) = fixture();
672        store
673            .set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
674            .await
675            .expect("set");
676        let got = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
677        assert_eq!(got.expose(), "hunter2");
678    }
679
680    #[tokio::test]
681    async fn get_unknown_returns_not_found() {
682        let (_handle, store, _sk) = fixture();
683        let err = store
684            .get_secret("dep:myapp", "missing")
685            .await
686            .expect_err("should error");
687        assert!(matches!(err, SecretsError::NotFound { .. }));
688    }
689
690    #[tokio::test]
691    async fn affinity_excluded_node_returns_not_found_without_leaking() {
692        let (handle, store, _sk) = fixture();
693        // Insert a secret with affinity restricted to a *different*
694        // node so the local node isn't allowed to see it.
695        let dek = ClusterDek::generate(); // DEK doesn't matter; we won't decrypt.
696        let cipher = dek.encrypt(b"top secret").expect("encrypt");
697        let secret = ReplicatedSecret {
698            storage_key: RaftSecretsStore::make_key("dep:myapp", "ALLOW_ELSEWHERE"),
699            ciphertext: cipher,
700            dek_generation: 1,
701            metadata: SecretMetadata::new("ALLOW_ELSEWHERE"),
702            node_affinity: Some(NodeAffinity::Nodes {
703                node_ids: vec!["node-other".to_string()],
704            }),
705        };
706        handle.apply(SecretsRaftOp::PutSecret { secret });
707
708        // get_secret -> NotFound (the get_secret wrapper turns the
709        // None from read_inner into NotFound).
710        let err = store
711            .get_secret("dep:myapp", "ALLOW_ELSEWHERE")
712            .await
713            .expect_err("should error");
714        assert!(matches!(err, SecretsError::NotFound { .. }));
715
716        // exists should also report false, not true.
717        let present = store
718            .exists("dep:myapp", "ALLOW_ELSEWHERE")
719            .await
720            .expect("exists");
721        assert!(!present, "node-a must not learn the secret exists");
722
723        // list_secrets should not include it either.
724        let listed = store.list_secrets("dep:myapp").await.expect("list");
725        assert!(
726            listed.iter().all(|m| m.name != "ALLOW_ELSEWHERE"),
727            "list must not leak affinity-excluded secrets",
728        );
729    }
730
731    #[tokio::test]
732    async fn rotate_increments_version_and_returns_correct_versions() {
733        let (_handle, store, _sk) = fixture();
734        store
735            .set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
736            .await
737            .expect("set v1");
738
739        let result = store
740            .rotate_secret("dep:myapp", "API_KEY", &Secret::new("v2"))
741            .await
742            .expect("rotate");
743        assert_eq!(result.previous_version, Some(1));
744        assert_eq!(result.new_version, 2);
745
746        let got = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
747        assert_eq!(got.expose(), "v2");
748    }
749
750    #[tokio::test]
751    async fn rotate_unknown_returns_not_found() {
752        let (_handle, store, _sk) = fixture();
753        let err = store
754            .rotate_secret("dep:myapp", "never-set", &Secret::new("v1"))
755            .await
756            .expect_err("should error");
757        assert!(matches!(err, SecretsError::NotFound { .. }));
758    }
759
760    #[tokio::test]
761    async fn delete_then_get_returns_not_found() {
762        let (_handle, store, _sk) = fixture();
763        store
764            .set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
765            .await
766            .expect("set");
767        store
768            .delete_secret("dep:myapp", "API_KEY")
769            .await
770            .expect("delete");
771        let err = store
772            .get_secret("dep:myapp", "API_KEY")
773            .await
774            .expect_err("should error");
775        assert!(matches!(err, SecretsError::NotFound { .. }));
776    }
777
778    #[tokio::test]
779    async fn delete_unknown_returns_not_found() {
780        let (_handle, store, _sk) = fixture();
781        let err = store
782            .delete_secret("dep:myapp", "missing")
783            .await
784            .expect_err("should error");
785        assert!(matches!(err, SecretsError::NotFound { .. }));
786    }
787
788    #[tokio::test]
789    async fn dek_rotation_is_picked_up_on_next_read() {
790        let (handle, store, _sk) = fixture();
791        store
792            .set_secret("dep:myapp", "API_KEY", &Secret::new("before"))
793            .await
794            .expect("set before");
795
796        // Prime the cache by reading once.
797        let got = store
798            .get_secret("dep:myapp", "API_KEY")
799            .await
800            .expect("read 1");
801        assert_eq!(got.expose(), "before");
802
803        // Simulate a rotation: generate a new DEK, re-wrap for the
804        // single existing node, and replay the rotation through the
805        // SM. Then re-encrypt the existing secret under the new DEK
806        // and put it back (this is what the leader's rotation walker
807        // does in propose_rotate_dek).
808        let pk_a = {
809            let s = handle.secrets_state().await;
810            RecipientPublicKey::from_bytes(s.nodes["node-a"].secrets_pubkey)
811        };
812        let new_dek = ClusterDek::generate();
813        let mut recipients: HashMap<String, RecipientPublicKey> = HashMap::new();
814        recipients.insert("node-a".to_string(), pk_a);
815        let envelope = new_dek.rewrap_for_set(&recipients, 2).expect("rewrap");
816        handle.apply(SecretsRaftOp::RotateDek {
817            new_wraps: envelope,
818        });
819
820        // Re-encrypt the existing secret under the new DEK so the read
821        // path's "row generation must match envelope generation" check
822        // is satisfied.
823        let new_cipher = new_dek.encrypt(b"before").expect("re-encrypt");
824        let updated = ReplicatedSecret {
825            storage_key: RaftSecretsStore::make_key("dep:myapp", "API_KEY"),
826            ciphertext: new_cipher,
827            dek_generation: 2,
828            metadata: SecretMetadata::new("API_KEY"),
829            node_affinity: None,
830        };
831        handle.apply(SecretsRaftOp::PutSecret { secret: updated });
832
833        // The cache still holds the generation-1 DEK; the next read
834        // must notice the gen mismatch, refresh, and decrypt under
835        // generation 2.
836        let got = store
837            .get_secret("dep:myapp", "API_KEY")
838            .await
839            .expect("read 2");
840        assert_eq!(got.expose(), "before");
841    }
842
843    #[tokio::test]
844    async fn list_secrets_filters_by_scope_prefix() {
845        let (_handle, store, _sk) = fixture();
846        store
847            .set_secret("dep:app1", "A", &Secret::new("1"))
848            .await
849            .expect("set 1");
850        store
851            .set_secret("dep:app1", "B", &Secret::new("2"))
852            .await
853            .expect("set 2");
854        store
855            .set_secret("dep:app2", "C", &Secret::new("3"))
856            .await
857            .expect("set 3");
858
859        let list = store.list_secrets("dep:app1").await.expect("list 1");
860        assert_eq!(list.len(), 2);
861        let names: Vec<_> = list.iter().map(|m| m.name.as_str()).collect();
862        assert_eq!(names, vec!["A", "B"]);
863
864        let list = store.list_secrets("dep:app2").await.expect("list 2");
865        assert_eq!(list.len(), 1);
866        assert_eq!(list[0].name, "C");
867    }
868
869    #[test]
870    fn node_allowed_unrestricted() {
871        assert!(RaftSecretsStore::node_allowed("node-a", None));
872    }
873
874    #[test]
875    fn node_allowed_explicit_nodes() {
876        let aff = NodeAffinity::Nodes {
877            node_ids: vec!["node-a".to_string(), "node-b".to_string()],
878        };
879        assert!(RaftSecretsStore::node_allowed("node-a", Some(&aff)));
880        assert!(RaftSecretsStore::node_allowed("node-b", Some(&aff)));
881        assert!(!RaftSecretsStore::node_allowed("node-c", Some(&aff)));
882    }
883
884    #[test]
885    fn node_allowed_labels_phase_15_permissive() {
886        // Labels are a Phase 1.5 follow-up; until then, any node passes.
887        let aff = NodeAffinity::Labels {
888            labels: HashMap::new(),
889        };
890        assert!(RaftSecretsStore::node_allowed("node-a", Some(&aff)));
891    }
892
893    #[test]
894    fn make_key_matches_persistent_shape() {
895        // Same `{scope}:{name}` shape PersistentSecretsStore uses, so
896        // a row written via either store is findable via the other.
897        assert_eq!(RaftSecretsStore::make_key("scope", "name"), "scope:name");
898        assert_eq!(
899            RaftSecretsStore::make_key("dep/myapp", "secret"),
900            "dep/myapp:secret"
901        );
902    }
903}