1use std::collections::HashMap;
27use std::sync::Arc;
28
29use async_trait::async_trait;
30use tokio::sync::RwLock;
31
32use zlayer_types::storage::{NodeAffinity, ReplicatedSecret, WrappedDek};
33
34use crate::cluster_dek::ClusterDek;
35use crate::raft_sm::SecretsState;
36use crate::sealed::RecipientPrivateKey;
37use crate::{Result, Secret, SecretMetadata, SecretsError, SecretsProvider, SecretsStore};
38
39#[async_trait]
52pub trait RaftSecretsHandle: Send + Sync {
53 async fn secrets_state(&self) -> SecretsState;
57
58 async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()>;
66
67 async fn propose_delete_secret(&self, storage_key: &str) -> Result<()>;
75}
76
77pub struct RaftSecretsStore {
84 node_priv: Arc<RecipientPrivateKey>,
87
88 node_id: String,
91
92 raft: Arc<dyn RaftSecretsHandle>,
95
96 dek_cache: RwLock<Option<CachedDek>>,
99}
100
101struct CachedDek {
105 generation: u64,
106 dek: ClusterDek,
107}
108
109impl RaftSecretsStore {
110 #[must_use]
114 pub fn new(
115 node_priv: RecipientPrivateKey,
116 node_id: String,
117 raft: Arc<dyn RaftSecretsHandle>,
118 ) -> Self {
119 Self {
120 node_priv: Arc::new(node_priv),
121 node_id,
122 raft,
123 dek_cache: RwLock::new(None),
124 }
125 }
126
127 #[inline]
132 #[must_use]
133 pub fn make_key(scope: &str, name: &str) -> String {
134 format!("{scope}:{name}")
135 }
136
137 #[must_use]
148 pub fn node_allowed(node_id: &str, affinity: Option<&NodeAffinity>) -> bool {
149 match affinity {
150 None | Some(NodeAffinity::Labels { .. }) => true,
154 Some(NodeAffinity::Nodes { node_ids }) => node_ids.iter().any(|n| n == node_id),
155 }
156 }
157
158 async fn ensure_dek_for_envelope(&self, current_envelope: &WrappedDek) -> Result<()> {
168 {
170 let guard = self.dek_cache.read().await;
171 if let Some(cached) = guard.as_ref() {
172 if cached.generation == current_envelope.dek_generation {
173 return Ok(());
174 }
175 }
176 }
177
178 let wrap = current_envelope
181 .wraps
182 .get(&self.node_id)
183 .ok_or_else(|| {
184 SecretsError::Provider(format!(
185 "node {} has no wrap in current DEK (generation {}); \
186 cannot decrypt cluster secrets — re-join the cluster \
187 so the leader can re-wrap",
188 self.node_id, current_envelope.dek_generation
189 ))
190 })?
191 .clone();
192
193 let dek = ClusterDek::unwrap(&self.node_priv, &wrap)?;
194
195 let mut guard = self.dek_cache.write().await;
196 *guard = Some(CachedDek {
197 generation: current_envelope.dek_generation,
198 dek,
199 });
200 Ok(())
201 }
202
203 async fn read_inner(&self, scope: &str, name: &str) -> Result<Option<Secret>> {
214 let storage_key = Self::make_key(scope, name);
215 let state = self.raft.secrets_state().await;
216
217 let (ciphertext, dek_generation, envelope) = {
221 let Some(replicated) = state.secrets.get(&storage_key) else {
222 return Ok(None);
223 };
224 if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
225 return Ok(None);
226 }
227 let Some(envelope) = state.wrapped_dek.as_ref() else {
228 return Err(SecretsError::Provider(
229 "cluster has no DEK yet; secret cannot be decrypted".to_string(),
230 ));
231 };
232 (
233 replicated.ciphertext.clone(),
234 replicated.dek_generation,
235 envelope.clone(),
236 )
237 };
238
239 if envelope.dek_generation < dek_generation {
240 return Err(SecretsError::Provider(format!(
244 "secret {storage_key} references DEK generation {dek_generation} \
245 but current cluster DEK is older (generation {}); state is \
246 inconsistent",
247 envelope.dek_generation
248 )));
249 }
250
251 if dek_generation != envelope.dek_generation {
260 return Err(SecretsError::Provider(format!(
261 "secret {storage_key} encrypted under DEK generation {dek_generation} \
262 but current is {} — wait for rotation re-encrypt to finish",
263 envelope.dek_generation
264 )));
265 }
266
267 self.ensure_dek_for_envelope(&envelope).await?;
268
269 let guard = self.dek_cache.read().await;
270 let cached = guard.as_ref().ok_or_else(|| {
271 SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
272 })?;
273
274 let plaintext = cached.dek.decrypt(&ciphertext)?;
275 let value = std::str::from_utf8(plaintext.as_slice())
276 .map_err(|e| SecretsError::Decryption(format!("invalid UTF-8 in secret: {e}")))?;
277 Ok(Some(Secret::new(value)))
278 }
279
280 async fn encrypt_under_current(&self, plaintext: &[u8]) -> Result<(Vec<u8>, u64)> {
283 let state = self.raft.secrets_state().await;
284 let envelope = state.wrapped_dek.clone().ok_or_else(|| {
285 SecretsError::Provider(
286 "cluster has no DEK yet; cannot write secret — wait for the \
287 first node to register via propose_register_node_and_rotate"
288 .to_string(),
289 )
290 })?;
291 self.ensure_dek_for_envelope(&envelope).await?;
292 let guard = self.dek_cache.read().await;
293 let cached = guard.as_ref().ok_or_else(|| {
294 SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
295 })?;
296 let ciphertext = cached.dek.encrypt(plaintext)?;
297 Ok((ciphertext, cached.generation))
298 }
299}
300
301#[async_trait]
302impl SecretsProvider for RaftSecretsStore {
303 async fn get_secret(&self, scope: &str, name: &str) -> Result<Secret> {
304 match self.read_inner(scope, name).await? {
305 Some(secret) => Ok(secret),
306 None => Err(SecretsError::NotFound {
307 name: name.to_string(),
308 }),
309 }
310 }
311
312 async fn get_secrets(&self, scope: &str, names: &[&str]) -> Result<HashMap<String, Secret>> {
313 let mut out = HashMap::with_capacity(names.len());
314 for name in names {
315 if let Some(secret) = self.read_inner(scope, name).await? {
320 out.insert((*name).to_string(), secret);
321 }
322 }
323 Ok(out)
324 }
325
326 async fn list_secrets(&self, scope: &str) -> Result<Vec<SecretMetadata>> {
327 let state = self.raft.secrets_state().await;
328 let prefix = format!("{scope}:");
329 let mut results = Vec::new();
330 for replicated in state.secrets.values() {
331 if !replicated.storage_key.starts_with(&prefix) {
332 continue;
333 }
334 if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
337 continue;
338 }
339 results.push(replicated.metadata.clone());
342 }
343 results.sort_by(|a, b| a.name.cmp(&b.name));
344 Ok(results)
345 }
346
347 async fn exists(&self, scope: &str, name: &str) -> Result<bool> {
348 let state = self.raft.secrets_state().await;
349 let storage_key = Self::make_key(scope, name);
350 let Some(replicated) = state.secrets.get(&storage_key) else {
351 return Ok(false);
352 };
353 Ok(Self::node_allowed(
355 &self.node_id,
356 replicated.node_affinity.as_ref(),
357 ))
358 }
359}
360
361#[async_trait]
362impl SecretsStore for RaftSecretsStore {
363 async fn set_secret(&self, scope: &str, name: &str, value: &Secret) -> Result<()> {
364 let storage_key = Self::make_key(scope, name);
365
366 let existing = {
371 let state = self.raft.secrets_state().await;
372 state.secrets.get(&storage_key).cloned()
373 };
374
375 let metadata = match existing.as_ref() {
376 Some(prev) => {
377 let mut m = prev.metadata.clone();
378 m.update();
379 m
380 }
381 None => SecretMetadata::new(name),
382 };
383
384 let node_affinity = existing.as_ref().and_then(|p| p.node_affinity.clone());
389
390 let (ciphertext, dek_generation) = self
391 .encrypt_under_current(value.expose().as_bytes())
392 .await?;
393
394 let secret = ReplicatedSecret {
395 storage_key,
396 ciphertext,
397 dek_generation,
398 metadata,
399 node_affinity,
400 };
401
402 self.raft.propose_put_secret(secret).await
406 }
407
408 async fn delete_secret(&self, scope: &str, name: &str) -> Result<()> {
409 let storage_key = Self::make_key(scope, name);
410
411 let exists = {
417 let state = self.raft.secrets_state().await;
418 state.secrets.contains_key(&storage_key)
419 };
420 if !exists {
421 return Err(SecretsError::NotFound {
422 name: name.to_string(),
423 });
424 }
425
426 self.raft.propose_delete_secret(&storage_key).await
427 }
428
429 async fn rotate_secret(
430 &self,
431 scope: &str,
432 name: &str,
433 value: &Secret,
434 ) -> Result<crate::RotationResult> {
435 let storage_key = Self::make_key(scope, name);
436
437 let existing = {
442 let state = self.raft.secrets_state().await;
443 state.secrets.get(&storage_key).cloned()
444 };
445 let existing = existing.ok_or_else(|| SecretsError::NotFound {
446 name: name.to_string(),
447 })?;
448
449 let previous_version = existing.metadata.version;
450
451 let mut metadata = existing.metadata.clone();
452 metadata.update();
453 let new_version = metadata.version;
454
455 let (ciphertext, dek_generation) = self
456 .encrypt_under_current(value.expose().as_bytes())
457 .await?;
458
459 let secret = ReplicatedSecret {
460 storage_key,
461 ciphertext,
462 dek_generation,
463 metadata,
464 node_affinity: existing.node_affinity,
465 };
466
467 self.raft.propose_put_secret(secret).await?;
468
469 Ok(crate::RotationResult {
470 previous_version: Some(previous_version),
471 new_version,
472 })
473 }
474
475 async fn set_secret_with_affinity(
476 &self,
477 scope: &str,
478 name: &str,
479 value: &Secret,
480 node_affinity: Option<&NodeAffinity>,
481 ) -> Result<()> {
482 let storage_key = Self::make_key(scope, name);
483
484 let existing = {
487 let state = self.raft.secrets_state().await;
488 state.secrets.get(&storage_key).cloned()
489 };
490
491 let metadata = match existing.as_ref() {
492 Some(prev) => {
493 let mut m = prev.metadata.clone();
494 m.update();
495 m
496 }
497 None => SecretMetadata::new(name),
498 };
499
500 let resolved_affinity = match node_affinity {
507 Some(_) => node_affinity.cloned(),
508 None => existing.as_ref().and_then(|p| p.node_affinity.clone()),
509 };
510
511 let (ciphertext, dek_generation) = self
512 .encrypt_under_current(value.expose().as_bytes())
513 .await?;
514
515 let secret = ReplicatedSecret {
516 storage_key,
517 ciphertext,
518 dek_generation,
519 metadata,
520 node_affinity: resolved_affinity,
521 };
522
523 self.raft.propose_put_secret(secret).await
524 }
525
526 async fn rotate_secret_with_affinity(
527 &self,
528 scope: &str,
529 name: &str,
530 value: &Secret,
531 node_affinity: Option<&NodeAffinity>,
532 ) -> Result<crate::RotationResult> {
533 let storage_key = Self::make_key(scope, name);
534
535 let existing = {
536 let state = self.raft.secrets_state().await;
537 state.secrets.get(&storage_key).cloned()
538 };
539 let existing = existing.ok_or_else(|| SecretsError::NotFound {
540 name: name.to_string(),
541 })?;
542
543 let previous_version = existing.metadata.version;
544
545 let mut metadata = existing.metadata.clone();
546 metadata.update();
547 let new_version = metadata.version;
548
549 let (ciphertext, dek_generation) = self
550 .encrypt_under_current(value.expose().as_bytes())
551 .await?;
552
553 let resolved_affinity = match node_affinity {
555 Some(_) => node_affinity.cloned(),
556 None => existing.node_affinity,
557 };
558
559 let secret = ReplicatedSecret {
560 storage_key,
561 ciphertext,
562 dek_generation,
563 metadata,
564 node_affinity: resolved_affinity,
565 };
566
567 self.raft.propose_put_secret(secret).await?;
568
569 Ok(crate::RotationResult {
570 previous_version: Some(previous_version),
571 new_version,
572 })
573 }
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex as StdMutex;

    use chrono::Utc;
    use zlayer_types::api::internal::SecretsRaftOp;
    use zlayer_types::storage::NodeIdentity;

    use crate::sealed::RecipientPublicKey;

    /// Raft stand-in: proposals are applied straight to a mutex-guarded
    /// [`SecretsState`].
    struct InMemoryRaftHandle {
        state: StdMutex<SecretsState>,
    }

    impl InMemoryRaftHandle {
        /// Starts from `SecretsState::default()`.
        fn new() -> Self {
            Self {
                state: StdMutex::new(SecretsState::default()),
            }
        }

        /// Applies `op` synchronously; panics if the state machine rejects it.
        fn apply(&self, op: SecretsRaftOp) {
            self.state
                .lock()
                .expect("state poisoned")
                .apply(op)
                .expect("apply ok");
        }
    }

    #[async_trait]
    impl RaftSecretsHandle for InMemoryRaftHandle {
        async fn secrets_state(&self) -> SecretsState {
            SecretsState::clone(&self.state.lock().expect("state poisoned"))
        }

        async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()> {
            self.apply(SecretsRaftOp::PutSecret { secret });
            Ok(())
        }

        async fn propose_delete_secret(&self, storage_key: &str) -> Result<()> {
            let storage_key = storage_key.to_string();
            self.apply(SecretsRaftOp::DeleteSecret { storage_key });
            Ok(())
        }
    }

    /// One-node cluster: `node-a` is registered, a generation-1 DEK is
    /// wrapped for it, and a store for `node-a` is wired to the handle.
    fn fixture() -> (
        Arc<InMemoryRaftHandle>,
        RaftSecretsStore,
        RecipientPrivateKey,
    ) {
        let (node_key, node_pub) = RecipientPrivateKey::generate();
        let raft = Arc::new(InMemoryRaftHandle::new());

        raft.apply(SecretsRaftOp::RegisterNode {
            identity: NodeIdentity {
                node_id: "node-a".to_string(),
                secrets_pubkey: *node_pub.as_bytes(),
                wg_pubkey: "wg-a".to_string(),
                joined_at: Utc::now(),
                revoked_at: None,
            },
        });

        let recipients: HashMap<String, RecipientPublicKey> = HashMap::from([(
            "node-a".to_string(),
            RecipientPublicKey::from_bytes(*node_pub.as_bytes()),
        )]);
        let first_envelope = ClusterDek::generate()
            .rewrap_for_set(&recipients, 1)
            .expect("rewrap");
        raft.apply(SecretsRaftOp::RotateDek {
            new_wraps: first_envelope,
        });

        let as_dyn: Arc<dyn RaftSecretsHandle> = raft.clone();
        let store = RaftSecretsStore::new(node_key.clone(), "node-a".to_string(), as_dyn);
        (raft, store, node_key)
    }

    #[tokio::test]
    async fn round_trip_set_get() {
        let (_raft, store, _key) = fixture();

        store
            .set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
            .await
            .expect("set");

        let fetched = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
        assert_eq!(fetched.expose(), "hunter2");
    }

    #[tokio::test]
    async fn get_unknown_returns_not_found() {
        let (_raft, store, _key) = fixture();

        let failure = store
            .get_secret("dep:myapp", "missing")
            .await
            .expect_err("should error");
        assert!(matches!(failure, SecretsError::NotFound { .. }));
    }

    /// A record pinned to another node must be invisible to `node-a` through
    /// `get_secret`, `exists` and `list_secrets` alike.
    #[tokio::test]
    async fn affinity_excluded_node_returns_not_found_without_leaking() {
        let (raft, store, _key) = fixture();

        let foreign_dek = ClusterDek::generate();
        let ciphertext = foreign_dek.encrypt(b"top secret").expect("encrypt");
        let pinned_elsewhere = ReplicatedSecret {
            storage_key: RaftSecretsStore::make_key("dep:myapp", "ALLOW_ELSEWHERE"),
            ciphertext,
            dek_generation: 1,
            metadata: SecretMetadata::new("ALLOW_ELSEWHERE"),
            node_affinity: Some(NodeAffinity::Nodes {
                node_ids: vec!["node-other".to_string()],
            }),
        };
        raft.apply(SecretsRaftOp::PutSecret {
            secret: pinned_elsewhere,
        });

        let failure = store
            .get_secret("dep:myapp", "ALLOW_ELSEWHERE")
            .await
            .expect_err("should error");
        assert!(matches!(failure, SecretsError::NotFound { .. }));

        let visible = store
            .exists("dep:myapp", "ALLOW_ELSEWHERE")
            .await
            .expect("exists");
        assert!(!visible, "node-a must not learn the secret exists");

        let listing = store.list_secrets("dep:myapp").await.expect("list");
        assert!(
            listing.iter().all(|m| m.name != "ALLOW_ELSEWHERE"),
            "list must not leak affinity-excluded secrets",
        );
    }

    #[tokio::test]
    async fn rotate_increments_version_and_returns_correct_versions() {
        let (_raft, store, _key) = fixture();

        store
            .set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
            .await
            .expect("set v1");

        let outcome = store
            .rotate_secret("dep:myapp", "API_KEY", &Secret::new("v2"))
            .await
            .expect("rotate");
        assert_eq!(outcome.previous_version, Some(1));
        assert_eq!(outcome.new_version, 2);

        let fetched = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
        assert_eq!(fetched.expose(), "v2");
    }

    #[tokio::test]
    async fn rotate_unknown_returns_not_found() {
        let (_raft, store, _key) = fixture();

        let failure = store
            .rotate_secret("dep:myapp", "never-set", &Secret::new("v1"))
            .await
            .expect_err("should error");
        assert!(matches!(failure, SecretsError::NotFound { .. }));
    }

    #[tokio::test]
    async fn delete_then_get_returns_not_found() {
        let (_raft, store, _key) = fixture();

        store
            .set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
            .await
            .expect("set");
        store
            .delete_secret("dep:myapp", "API_KEY")
            .await
            .expect("delete");

        let failure = store
            .get_secret("dep:myapp", "API_KEY")
            .await
            .expect_err("should error");
        assert!(matches!(failure, SecretsError::NotFound { .. }));
    }

    #[tokio::test]
    async fn delete_unknown_returns_not_found() {
        let (_raft, store, _key) = fixture();

        let failure = store
            .delete_secret("dep:myapp", "missing")
            .await
            .expect_err("should error");
        assert!(matches!(failure, SecretsError::NotFound { .. }));
    }

    /// After a DEK rotation plus re-encryption of the record, the store must
    /// drop its cached generation-1 DEK and read under generation 2.
    #[tokio::test]
    async fn dek_rotation_is_picked_up_on_next_read() {
        let (raft, store, _key) = fixture();

        store
            .set_secret("dep:myapp", "API_KEY", &Secret::new("before"))
            .await
            .expect("set before");

        // First read populates the cache with the generation-1 DEK.
        let first = store
            .get_secret("dep:myapp", "API_KEY")
            .await
            .expect("read 1");
        assert_eq!(first.expose(), "before");

        // Rotate to generation 2, wrapped for node-a's registered key.
        let node_a_pub = {
            let snapshot = raft.secrets_state().await;
            RecipientPublicKey::from_bytes(snapshot.nodes["node-a"].secrets_pubkey)
        };
        let rotated_dek = ClusterDek::generate();
        let recipients: HashMap<String, RecipientPublicKey> =
            HashMap::from([("node-a".to_string(), node_a_pub)]);
        let second_envelope = rotated_dek.rewrap_for_set(&recipients, 2).expect("rewrap");
        raft.apply(SecretsRaftOp::RotateDek {
            new_wraps: second_envelope,
        });

        // Re-encrypt the record under the new DEK, as a rotation pass would.
        let reencrypted = ReplicatedSecret {
            storage_key: RaftSecretsStore::make_key("dep:myapp", "API_KEY"),
            ciphertext: rotated_dek.encrypt(b"before").expect("re-encrypt"),
            dek_generation: 2,
            metadata: SecretMetadata::new("API_KEY"),
            node_affinity: None,
        };
        raft.apply(SecretsRaftOp::PutSecret {
            secret: reencrypted,
        });

        let second = store
            .get_secret("dep:myapp", "API_KEY")
            .await
            .expect("read 2");
        assert_eq!(second.expose(), "before");
    }

    #[tokio::test]
    async fn list_secrets_filters_by_scope_prefix() {
        let (_raft, store, _key) = fixture();

        store
            .set_secret("dep:app1", "A", &Secret::new("1"))
            .await
            .expect("set 1");
        store
            .set_secret("dep:app1", "B", &Secret::new("2"))
            .await
            .expect("set 2");
        store
            .set_secret("dep:app2", "C", &Secret::new("3"))
            .await
            .expect("set 3");

        let app1 = store.list_secrets("dep:app1").await.expect("list 1");
        assert_eq!(app1.len(), 2);
        let app1_names: Vec<_> = app1.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(app1_names, vec!["A", "B"]);

        let app2 = store.list_secrets("dep:app2").await.expect("list 2");
        assert_eq!(app2.len(), 1);
        assert_eq!(app2[0].name, "C");
    }

    #[test]
    fn node_allowed_unrestricted() {
        assert!(RaftSecretsStore::node_allowed("node-a", None));
    }

    #[test]
    fn node_allowed_explicit_nodes() {
        let only_a_and_b = NodeAffinity::Nodes {
            node_ids: vec!["node-a".to_string(), "node-b".to_string()],
        };
        assert!(RaftSecretsStore::node_allowed("node-a", Some(&only_a_and_b)));
        assert!(RaftSecretsStore::node_allowed("node-b", Some(&only_a_and_b)));
        assert!(!RaftSecretsStore::node_allowed("node-c", Some(&only_a_and_b)));
    }

    /// Label affinity is not evaluated by `node_allowed`; it lets every node
    /// through.
    #[test]
    fn node_allowed_labels_phase_15_permissive() {
        let by_labels = NodeAffinity::Labels {
            labels: HashMap::new(),
        };
        assert!(RaftSecretsStore::node_allowed("node-a", Some(&by_labels)));
    }

    #[test]
    fn make_key_matches_persistent_shape() {
        assert_eq!(RaftSecretsStore::make_key("scope", "name"), "scope:name");
        assert_eq!(
            RaftSecretsStore::make_key("dep/myapp", "secret"),
            "dep/myapp:secret"
        );
    }
}