use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::RwLock;
use zlayer_types::storage::{NodeAffinity, ReplicatedSecret, WrappedDek};
use crate::cluster_dek::ClusterDek;
use crate::raft_sm::SecretsState;
use crate::sealed::RecipientPrivateKey;
use crate::{Result, Secret, SecretMetadata, SecretsError, SecretsProvider, SecretsStore};
#[async_trait]
pub trait RaftSecretsHandle: Send + Sync {
async fn secrets_state(&self) -> SecretsState;
async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()>;
async fn propose_delete_secret(&self, storage_key: &str) -> Result<()>;
}
pub struct RaftSecretsStore {
node_priv: Arc<RecipientPrivateKey>,
node_id: String,
raft: Arc<dyn RaftSecretsHandle>,
dek_cache: RwLock<Option<CachedDek>>,
}
struct CachedDek {
generation: u64,
dek: ClusterDek,
}
impl RaftSecretsStore {
#[must_use]
pub fn new(
node_priv: RecipientPrivateKey,
node_id: String,
raft: Arc<dyn RaftSecretsHandle>,
) -> Self {
Self {
node_priv: Arc::new(node_priv),
node_id,
raft,
dek_cache: RwLock::new(None),
}
}
#[inline]
#[must_use]
pub fn make_key(scope: &str, name: &str) -> String {
format!("{scope}:{name}")
}
#[must_use]
pub fn node_allowed(node_id: &str, affinity: Option<&NodeAffinity>) -> bool {
match affinity {
None | Some(NodeAffinity::Labels { .. }) => true,
Some(NodeAffinity::Nodes { node_ids }) => node_ids.iter().any(|n| n == node_id),
}
}
async fn ensure_dek_for_envelope(&self, current_envelope: &WrappedDek) -> Result<()> {
{
let guard = self.dek_cache.read().await;
if let Some(cached) = guard.as_ref() {
if cached.generation == current_envelope.dek_generation {
return Ok(());
}
}
}
let wrap = current_envelope
.wraps
.get(&self.node_id)
.ok_or_else(|| {
SecretsError::Provider(format!(
"node {} has no wrap in current DEK (generation {}); \
cannot decrypt cluster secrets — re-join the cluster \
so the leader can re-wrap",
self.node_id, current_envelope.dek_generation
))
})?
.clone();
let dek = ClusterDek::unwrap(&self.node_priv, &wrap)?;
let mut guard = self.dek_cache.write().await;
*guard = Some(CachedDek {
generation: current_envelope.dek_generation,
dek,
});
Ok(())
}
async fn read_inner(&self, scope: &str, name: &str) -> Result<Option<Secret>> {
let storage_key = Self::make_key(scope, name);
let state = self.raft.secrets_state().await;
let (ciphertext, dek_generation, envelope) = {
let Some(replicated) = state.secrets.get(&storage_key) else {
return Ok(None);
};
if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
return Ok(None);
}
let Some(envelope) = state.wrapped_dek.as_ref() else {
return Err(SecretsError::Provider(
"cluster has no DEK yet; secret cannot be decrypted".to_string(),
));
};
(
replicated.ciphertext.clone(),
replicated.dek_generation,
envelope.clone(),
)
};
if envelope.dek_generation < dek_generation {
return Err(SecretsError::Provider(format!(
"secret {storage_key} references DEK generation {dek_generation} \
but current cluster DEK is older (generation {}); state is \
inconsistent",
envelope.dek_generation
)));
}
if dek_generation != envelope.dek_generation {
return Err(SecretsError::Provider(format!(
"secret {storage_key} encrypted under DEK generation {dek_generation} \
but current is {} — wait for rotation re-encrypt to finish",
envelope.dek_generation
)));
}
self.ensure_dek_for_envelope(&envelope).await?;
let guard = self.dek_cache.read().await;
let cached = guard.as_ref().ok_or_else(|| {
SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
})?;
let plaintext = cached.dek.decrypt(&ciphertext)?;
let value = std::str::from_utf8(plaintext.as_slice())
.map_err(|e| SecretsError::Decryption(format!("invalid UTF-8 in secret: {e}")))?;
Ok(Some(Secret::new(value)))
}
async fn encrypt_under_current(&self, plaintext: &[u8]) -> Result<(Vec<u8>, u64)> {
let state = self.raft.secrets_state().await;
let envelope = state.wrapped_dek.clone().ok_or_else(|| {
SecretsError::Provider(
"cluster has no DEK yet; cannot write secret — wait for the \
first node to register via propose_register_node_and_rotate"
.to_string(),
)
})?;
self.ensure_dek_for_envelope(&envelope).await?;
let guard = self.dek_cache.read().await;
let cached = guard.as_ref().ok_or_else(|| {
SecretsError::Provider("DEK cache unexpectedly empty after refresh".to_string())
})?;
let ciphertext = cached.dek.encrypt(plaintext)?;
Ok((ciphertext, cached.generation))
}
}
#[async_trait]
impl SecretsProvider for RaftSecretsStore {
async fn get_secret(&self, scope: &str, name: &str) -> Result<Secret> {
match self.read_inner(scope, name).await? {
Some(secret) => Ok(secret),
None => Err(SecretsError::NotFound {
name: name.to_string(),
}),
}
}
async fn get_secrets(&self, scope: &str, names: &[&str]) -> Result<HashMap<String, Secret>> {
let mut out = HashMap::with_capacity(names.len());
for name in names {
if let Some(secret) = self.read_inner(scope, name).await? {
out.insert((*name).to_string(), secret);
}
}
Ok(out)
}
async fn list_secrets(&self, scope: &str) -> Result<Vec<SecretMetadata>> {
let state = self.raft.secrets_state().await;
let prefix = format!("{scope}:");
let mut results = Vec::new();
for replicated in state.secrets.values() {
if !replicated.storage_key.starts_with(&prefix) {
continue;
}
if !Self::node_allowed(&self.node_id, replicated.node_affinity.as_ref()) {
continue;
}
results.push(replicated.metadata.clone());
}
results.sort_by(|a, b| a.name.cmp(&b.name));
Ok(results)
}
async fn exists(&self, scope: &str, name: &str) -> Result<bool> {
let state = self.raft.secrets_state().await;
let storage_key = Self::make_key(scope, name);
let Some(replicated) = state.secrets.get(&storage_key) else {
return Ok(false);
};
Ok(Self::node_allowed(
&self.node_id,
replicated.node_affinity.as_ref(),
))
}
}
#[async_trait]
impl SecretsStore for RaftSecretsStore {
async fn set_secret(&self, scope: &str, name: &str, value: &Secret) -> Result<()> {
let storage_key = Self::make_key(scope, name);
let existing = {
let state = self.raft.secrets_state().await;
state.secrets.get(&storage_key).cloned()
};
let metadata = match existing.as_ref() {
Some(prev) => {
let mut m = prev.metadata.clone();
m.update();
m
}
None => SecretMetadata::new(name),
};
let node_affinity = existing.as_ref().and_then(|p| p.node_affinity.clone());
let (ciphertext, dek_generation) = self
.encrypt_under_current(value.expose().as_bytes())
.await?;
let secret = ReplicatedSecret {
storage_key,
ciphertext,
dek_generation,
metadata,
node_affinity,
};
self.raft.propose_put_secret(secret).await
}
async fn delete_secret(&self, scope: &str, name: &str) -> Result<()> {
let storage_key = Self::make_key(scope, name);
let exists = {
let state = self.raft.secrets_state().await;
state.secrets.contains_key(&storage_key)
};
if !exists {
return Err(SecretsError::NotFound {
name: name.to_string(),
});
}
self.raft.propose_delete_secret(&storage_key).await
}
async fn rotate_secret(
&self,
scope: &str,
name: &str,
value: &Secret,
) -> Result<crate::RotationResult> {
let storage_key = Self::make_key(scope, name);
let existing = {
let state = self.raft.secrets_state().await;
state.secrets.get(&storage_key).cloned()
};
let existing = existing.ok_or_else(|| SecretsError::NotFound {
name: name.to_string(),
})?;
let previous_version = existing.metadata.version;
let mut metadata = existing.metadata.clone();
metadata.update();
let new_version = metadata.version;
let (ciphertext, dek_generation) = self
.encrypt_under_current(value.expose().as_bytes())
.await?;
let secret = ReplicatedSecret {
storage_key,
ciphertext,
dek_generation,
metadata,
node_affinity: existing.node_affinity,
};
self.raft.propose_put_secret(secret).await?;
Ok(crate::RotationResult {
previous_version: Some(previous_version),
new_version,
})
}
async fn set_secret_with_affinity(
&self,
scope: &str,
name: &str,
value: &Secret,
node_affinity: Option<&NodeAffinity>,
) -> Result<()> {
let storage_key = Self::make_key(scope, name);
let existing = {
let state = self.raft.secrets_state().await;
state.secrets.get(&storage_key).cloned()
};
let metadata = match existing.as_ref() {
Some(prev) => {
let mut m = prev.metadata.clone();
m.update();
m
}
None => SecretMetadata::new(name),
};
let resolved_affinity = match node_affinity {
Some(_) => node_affinity.cloned(),
None => existing.as_ref().and_then(|p| p.node_affinity.clone()),
};
let (ciphertext, dek_generation) = self
.encrypt_under_current(value.expose().as_bytes())
.await?;
let secret = ReplicatedSecret {
storage_key,
ciphertext,
dek_generation,
metadata,
node_affinity: resolved_affinity,
};
self.raft.propose_put_secret(secret).await
}
async fn rotate_secret_with_affinity(
&self,
scope: &str,
name: &str,
value: &Secret,
node_affinity: Option<&NodeAffinity>,
) -> Result<crate::RotationResult> {
let storage_key = Self::make_key(scope, name);
let existing = {
let state = self.raft.secrets_state().await;
state.secrets.get(&storage_key).cloned()
};
let existing = existing.ok_or_else(|| SecretsError::NotFound {
name: name.to_string(),
})?;
let previous_version = existing.metadata.version;
let mut metadata = existing.metadata.clone();
metadata.update();
let new_version = metadata.version;
let (ciphertext, dek_generation) = self
.encrypt_under_current(value.expose().as_bytes())
.await?;
let resolved_affinity = match node_affinity {
Some(_) => node_affinity.cloned(),
None => existing.node_affinity,
};
let secret = ReplicatedSecret {
storage_key,
ciphertext,
dek_generation,
metadata,
node_affinity: resolved_affinity,
};
self.raft.propose_put_secret(secret).await?;
Ok(crate::RotationResult {
previous_version: Some(previous_version),
new_version,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex as StdMutex;
use chrono::Utc;
use zlayer_types::api::internal::SecretsRaftOp;
use zlayer_types::storage::NodeIdentity;
use crate::sealed::RecipientPublicKey;
struct InMemoryRaftHandle {
state: StdMutex<SecretsState>,
}
impl InMemoryRaftHandle {
fn new() -> Self {
Self {
state: StdMutex::new(SecretsState::default()),
}
}
fn apply(&self, op: SecretsRaftOp) {
let mut guard = self.state.lock().expect("state poisoned");
guard.apply(op).expect("apply ok");
}
}
#[async_trait]
impl RaftSecretsHandle for InMemoryRaftHandle {
async fn secrets_state(&self) -> SecretsState {
self.state.lock().expect("state poisoned").clone()
}
async fn propose_put_secret(&self, secret: ReplicatedSecret) -> Result<()> {
self.apply(SecretsRaftOp::PutSecret { secret });
Ok(())
}
async fn propose_delete_secret(&self, storage_key: &str) -> Result<()> {
self.apply(SecretsRaftOp::DeleteSecret {
storage_key: storage_key.to_string(),
});
Ok(())
}
}
fn fixture() -> (
Arc<InMemoryRaftHandle>,
RaftSecretsStore,
RecipientPrivateKey,
) {
let (sk, pk) = RecipientPrivateKey::generate();
let identity = NodeIdentity {
node_id: "node-a".to_string(),
secrets_pubkey: *pk.as_bytes(),
wg_pubkey: "wg-a".to_string(),
joined_at: Utc::now(),
revoked_at: None,
};
let dek = ClusterDek::generate();
let mut recipients: HashMap<String, RecipientPublicKey> = HashMap::new();
recipients.insert(
"node-a".to_string(),
RecipientPublicKey::from_bytes(*pk.as_bytes()),
);
let envelope = dek.rewrap_for_set(&recipients, 1).expect("rewrap");
let handle = Arc::new(InMemoryRaftHandle::new());
handle.apply(SecretsRaftOp::RegisterNode { identity });
handle.apply(SecretsRaftOp::RotateDek {
new_wraps: envelope,
});
let store_handle: Arc<dyn RaftSecretsHandle> = handle.clone();
let store = RaftSecretsStore::new(sk.clone(), "node-a".to_string(), store_handle);
(handle, store, sk)
}
#[tokio::test]
async fn round_trip_set_get() {
let (_handle, store, _sk) = fixture();
store
.set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
.await
.expect("set");
let got = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
assert_eq!(got.expose(), "hunter2");
}
#[tokio::test]
async fn get_unknown_returns_not_found() {
let (_handle, store, _sk) = fixture();
let err = store
.get_secret("dep:myapp", "missing")
.await
.expect_err("should error");
assert!(matches!(err, SecretsError::NotFound { .. }));
}
#[tokio::test]
async fn affinity_excluded_node_returns_not_found_without_leaking() {
let (handle, store, _sk) = fixture();
let dek = ClusterDek::generate(); let cipher = dek.encrypt(b"top secret").expect("encrypt");
let secret = ReplicatedSecret {
storage_key: RaftSecretsStore::make_key("dep:myapp", "ALLOW_ELSEWHERE"),
ciphertext: cipher,
dek_generation: 1,
metadata: SecretMetadata::new("ALLOW_ELSEWHERE"),
node_affinity: Some(NodeAffinity::Nodes {
node_ids: vec!["node-other".to_string()],
}),
};
handle.apply(SecretsRaftOp::PutSecret { secret });
let err = store
.get_secret("dep:myapp", "ALLOW_ELSEWHERE")
.await
.expect_err("should error");
assert!(matches!(err, SecretsError::NotFound { .. }));
let present = store
.exists("dep:myapp", "ALLOW_ELSEWHERE")
.await
.expect("exists");
assert!(!present, "node-a must not learn the secret exists");
let listed = store.list_secrets("dep:myapp").await.expect("list");
assert!(
listed.iter().all(|m| m.name != "ALLOW_ELSEWHERE"),
"list must not leak affinity-excluded secrets",
);
}
#[tokio::test]
async fn rotate_increments_version_and_returns_correct_versions() {
let (_handle, store, _sk) = fixture();
store
.set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
.await
.expect("set v1");
let result = store
.rotate_secret("dep:myapp", "API_KEY", &Secret::new("v2"))
.await
.expect("rotate");
assert_eq!(result.previous_version, Some(1));
assert_eq!(result.new_version, 2);
let got = store.get_secret("dep:myapp", "API_KEY").await.expect("get");
assert_eq!(got.expose(), "v2");
}
#[tokio::test]
async fn rotate_unknown_returns_not_found() {
let (_handle, store, _sk) = fixture();
let err = store
.rotate_secret("dep:myapp", "never-set", &Secret::new("v1"))
.await
.expect_err("should error");
assert!(matches!(err, SecretsError::NotFound { .. }));
}
#[tokio::test]
async fn delete_then_get_returns_not_found() {
let (_handle, store, _sk) = fixture();
store
.set_secret("dep:myapp", "API_KEY", &Secret::new("v1"))
.await
.expect("set");
store
.delete_secret("dep:myapp", "API_KEY")
.await
.expect("delete");
let err = store
.get_secret("dep:myapp", "API_KEY")
.await
.expect_err("should error");
assert!(matches!(err, SecretsError::NotFound { .. }));
}
#[tokio::test]
async fn delete_unknown_returns_not_found() {
let (_handle, store, _sk) = fixture();
let err = store
.delete_secret("dep:myapp", "missing")
.await
.expect_err("should error");
assert!(matches!(err, SecretsError::NotFound { .. }));
}
#[tokio::test]
async fn dek_rotation_is_picked_up_on_next_read() {
let (handle, store, _sk) = fixture();
store
.set_secret("dep:myapp", "API_KEY", &Secret::new("before"))
.await
.expect("set before");
let got = store
.get_secret("dep:myapp", "API_KEY")
.await
.expect("read 1");
assert_eq!(got.expose(), "before");
let pk_a = {
let s = handle.secrets_state().await;
RecipientPublicKey::from_bytes(s.nodes["node-a"].secrets_pubkey)
};
let new_dek = ClusterDek::generate();
let mut recipients: HashMap<String, RecipientPublicKey> = HashMap::new();
recipients.insert("node-a".to_string(), pk_a);
let envelope = new_dek.rewrap_for_set(&recipients, 2).expect("rewrap");
handle.apply(SecretsRaftOp::RotateDek {
new_wraps: envelope,
});
let new_cipher = new_dek.encrypt(b"before").expect("re-encrypt");
let updated = ReplicatedSecret {
storage_key: RaftSecretsStore::make_key("dep:myapp", "API_KEY"),
ciphertext: new_cipher,
dek_generation: 2,
metadata: SecretMetadata::new("API_KEY"),
node_affinity: None,
};
handle.apply(SecretsRaftOp::PutSecret { secret: updated });
let got = store
.get_secret("dep:myapp", "API_KEY")
.await
.expect("read 2");
assert_eq!(got.expose(), "before");
}
#[tokio::test]
async fn list_secrets_filters_by_scope_prefix() {
let (_handle, store, _sk) = fixture();
store
.set_secret("dep:app1", "A", &Secret::new("1"))
.await
.expect("set 1");
store
.set_secret("dep:app1", "B", &Secret::new("2"))
.await
.expect("set 2");
store
.set_secret("dep:app2", "C", &Secret::new("3"))
.await
.expect("set 3");
let list = store.list_secrets("dep:app1").await.expect("list 1");
assert_eq!(list.len(), 2);
let names: Vec<_> = list.iter().map(|m| m.name.as_str()).collect();
assert_eq!(names, vec!["A", "B"]);
let list = store.list_secrets("dep:app2").await.expect("list 2");
assert_eq!(list.len(), 1);
assert_eq!(list[0].name, "C");
}
#[test]
fn node_allowed_unrestricted() {
assert!(RaftSecretsStore::node_allowed("node-a", None));
}
#[test]
fn node_allowed_explicit_nodes() {
let aff = NodeAffinity::Nodes {
node_ids: vec!["node-a".to_string(), "node-b".to_string()],
};
assert!(RaftSecretsStore::node_allowed("node-a", Some(&aff)));
assert!(RaftSecretsStore::node_allowed("node-b", Some(&aff)));
assert!(!RaftSecretsStore::node_allowed("node-c", Some(&aff)));
}
#[test]
fn node_allowed_labels_phase_15_permissive() {
let aff = NodeAffinity::Labels {
labels: HashMap::new(),
};
assert!(RaftSecretsStore::node_allowed("node-a", Some(&aff)));
}
#[test]
fn make_key_matches_persistent_shape() {
assert_eq!(RaftSecretsStore::make_key("scope", "name"), "scope:name");
assert_eq!(
RaftSecretsStore::make_key("dep/myapp", "secret"),
"dep/myapp:secret"
);
}
}