#![cfg(feature = "persistent")]
#![allow(clippy::expect_used)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex as StdMutex};
use async_trait::async_trait;
use chrono::Utc;
use zlayer_secrets::cluster_dek::ClusterDek;
use zlayer_secrets::sealed::{RecipientPrivateKey, RecipientPublicKey};
use zlayer_secrets::{
RaftSecretsHandle, RaftSecretsStore, Secret, SecretsError, SecretsProvider, SecretsState,
SecretsStore,
};
use zlayer_types::api::internal::SecretsRaftOp;
use zlayer_types::storage::{NodeIdentity, ReplicatedSecret};
/// One simulated cluster member used by the tests in this file.
struct TestNode {
/// Identifier of the node; also the key under which its DEK wrap is looked up.
node_id: String,
/// Private key returned by `RecipientPrivateKey::generate`; used to unwrap DEK wraps.
sk: RecipientPrivateKey,
/// Public key from the same `generate` call; its bytes are published as `secrets_pubkey`.
pk: RecipientPublicKey,
/// Secrets store for this node, constructed with a clone of `sk` and the shared handle.
store: RaftSecretsStore,
}
/// In-memory `RaftSecretsHandle` used by the tests: every proposal is applied
/// directly to a single mutex-guarded `SecretsState`.
struct SharedRaftHandle {
/// The one state instance every test node reads from and writes to.
state: StdMutex<SecretsState>,
}
impl SharedRaftHandle {
    /// Builds a handle around a default `SecretsState`, already wrapped in an
    /// `Arc` so it can be shared between stores.
    fn new() -> Arc<Self> {
        let state = StdMutex::new(SecretsState::default());
        Arc::new(Self { state })
    }

    /// Applies `op` to the shared state while holding the lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned or if `SecretsState::apply` returns an error.
    fn apply_local(&self, op: SecretsRaftOp) {
        // The guard is a temporary that lives until the end of this statement,
        // so the state stays locked for the whole apply.
        self.state
            .lock()
            .expect("state poisoned")
            .apply(op)
            .expect("apply ok");
    }

    /// Returns a clone of the current state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn snapshot(&self) -> SecretsState {
        let guard = self.state.lock().expect("state poisoned");
        SecretsState::clone(&guard)
    }
}
/// Trait implementation that skips any real consensus: reads return a
/// snapshot of the shared state and proposals are applied to it immediately.
#[async_trait]
impl RaftSecretsHandle for SharedRaftHandle {
    /// Returns a clone of the shared state.
    async fn secrets_state(&self) -> SecretsState {
        self.snapshot()
    }

    /// Applies a `PutSecret` op for `secret` and reports success.
    async fn propose_put_secret(&self, secret: ReplicatedSecret) -> zlayer_secrets::Result<()> {
        let op = SecretsRaftOp::PutSecret { secret };
        self.apply_local(op);
        Ok(())
    }

    /// Applies a `DeleteSecret` op for `storage_key` and reports success.
    async fn propose_delete_secret(&self, storage_key: &str) -> zlayer_secrets::Result<()> {
        let op = SecretsRaftOp::DeleteSecret {
            storage_key: storage_key.to_owned(),
        };
        self.apply_local(op);
        Ok(())
    }
}
/// Test fixture: three `TestNode`s whose stores all share one `SharedRaftHandle`.
struct ThreeNodeCluster {
/// Shared state handle; tests also use it directly to apply ops and take snapshots.
handle: Arc<SharedRaftHandle>,
/// The nodes in creation order; index 0 is what `leader()` returns.
nodes: Vec<TestNode>,
}
impl ThreeNodeCluster {
    /// Creates `node-a`, `node-b` and `node-c`, each with a freshly generated
    /// keypair and a store backed by the same shared handle.
    ///
    /// No nodes are registered and no DEK is installed yet; see
    /// `bootstrap_with_initial_dek`.
    fn new() -> Self {
        let handle = SharedRaftHandle::new();
        // Every store talks to the same handle, type-erased behind the trait object.
        let store_handle: Arc<dyn RaftSecretsHandle> = handle.clone();
        let nodes = ["node-a", "node-b", "node-c"]
            .into_iter()
            .map(|label| {
                let (sk, pk) = RecipientPrivateKey::generate();
                let store = RaftSecretsStore::new(
                    sk.clone(),
                    label.to_string(),
                    Arc::clone(&store_handle),
                );
                TestNode {
                    node_id: label.to_string(),
                    sk,
                    pk,
                    store,
                }
            })
            .collect();
        Self { handle, nodes }
    }

    /// Registers every node in the shared state, then installs a freshly
    /// generated DEK wrapped for all of them at generation 1.
    ///
    /// # Panics
    ///
    /// Panics if applying an op or wrapping the DEK fails.
    fn bootstrap_with_initial_dek(&self) {
        let joined_at = Utc::now();
        for member in &self.nodes {
            let identity = NodeIdentity {
                node_id: member.node_id.clone(),
                secrets_pubkey: *member.pk.as_bytes(),
                wg_pubkey: format!("wg-{}", member.node_id),
                joined_at,
                revoked_at: None,
            };
            self.handle
                .apply_local(SecretsRaftOp::RegisterNode { identity });
        }

        let dek = ClusterDek::generate();
        let recipients: HashMap<String, RecipientPublicKey> = self
            .nodes
            .iter()
            .map(|member| {
                (
                    member.node_id.clone(),
                    RecipientPublicKey::from_bytes(*member.pk.as_bytes()),
                )
            })
            .collect();
        let new_wraps = dek
            .rewrap_for_set(&recipients, 1)
            .expect("initial rewrap to all 3 nodes");
        self.handle
            .apply_local(SecretsRaftOp::RotateDek { new_wraps });
    }

    /// Looks up a node by id.
    ///
    /// # Panics
    ///
    /// Panics if no node with that id exists in the fixture.
    fn node(&self, id: &str) -> &TestNode {
        for candidate in &self.nodes {
            if candidate.node_id == id {
                return candidate;
            }
        }
        panic!("node {id} not found in test cluster")
    }

    /// Returns the node acting as leader, which is always the first one created.
    fn leader(&self) -> &TestNode {
        &self.nodes[0]
    }

    /// Rotates the cluster DEK as the leader would.
    ///
    /// Steps, in order:
    /// 1. Unwrap the current DEK with the leader's private key.
    /// 2. Generate a new DEK and wrap it for every node that is neither
    ///    revoked nor equal to `revoke_node_id`.
    /// 3. Apply `RotateDek` with the previous generation plus one (saturating).
    /// 4. Re-encrypt every secret from the pre-rotation snapshot whose
    ///    generation differs from the new one and apply it via `PutSecret`.
    ///
    /// Returns the new generation.
    ///
    /// # Panics
    ///
    /// Panics if no DEK is installed, if the leader has no wrap in the
    /// current envelope, or if any crypto or apply step fails.
    fn leader_rotate_dek(&self, revoke_node_id: Option<&str>) -> u64 {
        let leader = self.leader();
        let snapshot = self.handle.snapshot();

        let current = snapshot
            .wrapped_dek
            .as_ref()
            .expect("no current DEK; bootstrap first");
        let prev_generation = current.dek_generation;
        let leader_wrap = current
            .wraps
            .get(&leader.node_id)
            .unwrap_or_else(|| panic!("leader {} has no wrap in current DEK", leader.node_id));
        let prev_dek =
            ClusterDek::unwrap(&leader.sk, leader_wrap).expect("leader unwraps prev DEK");

        // Recipient set: every non-revoked node except the one being revoked now.
        let recipients: HashMap<String, RecipientPublicKey> = snapshot
            .nodes
            .iter()
            .filter(|(node_id, identity)| {
                identity.revoked_at.is_none() && revoke_node_id != Some(node_id.as_str())
            })
            .map(|(node_id, identity)| {
                (
                    node_id.clone(),
                    RecipientPublicKey::from_bytes(identity.secrets_pubkey),
                )
            })
            .collect();

        let new_dek = ClusterDek::generate();
        let new_generation = prev_generation.saturating_add(1);
        let new_wraps = new_dek
            .rewrap_for_set(&recipients, new_generation)
            .expect("new DEK rewrap");
        self.handle
            .apply_local(SecretsRaftOp::RotateDek { new_wraps });

        // Walk the rows captured before the rotation and move them to the new DEK.
        let stale_rows = snapshot
            .secrets
            .values()
            .filter(|row| row.dek_generation != new_generation);
        for row in stale_rows {
            let plaintext = prev_dek
                .decrypt(&row.ciphertext)
                .expect("re-encrypt: decrypt under old DEK");
            let mut updated = row.clone();
            updated.ciphertext = new_dek
                .encrypt(plaintext.as_slice())
                .expect("re-encrypt: encrypt under new DEK");
            updated.dek_generation = new_generation;
            self.handle
                .apply_local(SecretsRaftOp::PutSecret { secret: updated });
        }

        new_generation
    }

    /// Registers `identity` and re-wraps the DEK so the new recipient set
    /// includes it.
    ///
    /// The existing DEK is reused (unwrapped with the leader's key) when one
    /// is installed; otherwise a new DEK is generated with base generation 0.
    /// The envelope is applied at the base generation plus one (saturating),
    /// which is also the return value. Existing secrets are not re-encrypted
    /// here.
    ///
    /// # Panics
    ///
    /// Panics if the leader has no wrap in the current envelope or if any
    /// crypto or apply step fails.
    #[allow(clippy::needless_pass_by_value)]
    fn leader_register_node_and_rotate(&self, identity: NodeIdentity) -> u64 {
        let leader = self.leader();
        let snapshot = self.handle.snapshot();

        let (dek, base_generation) = if let Some(envelope) = snapshot.wrapped_dek.as_ref() {
            let leader_wrap = envelope.wraps.get(&leader.node_id).unwrap_or_else(|| {
                panic!("leader {} has no wrap in current DEK", leader.node_id)
            });
            let current_dek = ClusterDek::unwrap(&leader.sk, leader_wrap)
                .expect("leader unwraps current DEK");
            (current_dek, envelope.dek_generation)
        } else {
            (ClusterDek::generate(), 0)
        };

        // Keep what is needed for the recipient set before the identity is
        // moved into the op.
        let rejoin_id = identity.node_id.clone();
        let rejoin_pubkey = identity.secrets_pubkey;
        self.handle
            .apply_local(SecretsRaftOp::RegisterNode { identity });

        let post_snapshot = self.handle.snapshot();
        let mut recipients: HashMap<String, RecipientPublicKey> = post_snapshot
            .nodes
            .iter()
            .filter(|(_, member)| member.revoked_at.is_none())
            .map(|(node_id, member)| {
                (
                    node_id.clone(),
                    RecipientPublicKey::from_bytes(member.secrets_pubkey),
                )
            })
            .collect();
        // Insert the registering node explicitly, whatever the snapshot says.
        recipients.insert(rejoin_id, RecipientPublicKey::from_bytes(rejoin_pubkey));

        let new_generation = base_generation.saturating_add(1);
        let new_wraps = dek
            .rewrap_for_set(&recipients, new_generation)
            .expect("rewrap for re-registration");
        self.handle
            .apply_local(SecretsRaftOp::RotateDek { new_wraps });

        new_generation
    }
}
/// Revoking `node-c` and rotating the DEK must bump the generation by one,
/// drop C from the wraps, keep A and B able to read, re-encrypt the stored
/// row, and leave C unable to read with either its old DEK or its store.
#[tokio::test]
async fn revoke_node_triggers_dek_rotation() {
    let cluster = ThreeNodeCluster::new();
    cluster.bootstrap_with_initial_dek();

    let leader_store = &cluster.leader().store;
    leader_store
        .set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
        .await
        .expect("set initial secret");

    // Pre-rotation state: generation 1 with node-c among the recipients.
    let pre = cluster.handle.snapshot();
    let pre_envelope = pre.wrapped_dek.as_ref().expect("DEK present");
    let pre_generation = pre_envelope.dek_generation;
    assert_eq!(pre_generation, 1, "fixture installs DEK at generation 1");
    let c_listed_before = pre
        .wrapped_dek
        .as_ref()
        .expect("DEK")
        .wraps
        .contains_key("node-c");
    assert!(c_listed_before, "node-c starts in the recipient set");

    // Capture the DEK that C holds before it is revoked.
    let c_old_wrap = pre
        .wrapped_dek
        .as_ref()
        .expect("DEK")
        .wraps
        .get("node-c")
        .expect("c has wrap pre-rotation")
        .clone();
    let c_secret_key = &cluster.node("node-c").sk;
    let c_old_dek = ClusterDek::unwrap(c_secret_key, &c_old_wrap)
        .expect("c can unwrap its own pre-rotation wrap");

    let revoke = SecretsRaftOp::RevokeNode {
        node_id: "node-c".to_string(),
    };
    cluster.handle.apply_local(revoke);
    let new_generation = cluster.leader_rotate_dek(Some("node-c"));
    assert_eq!(
        new_generation,
        pre_generation + 1,
        "rotate must bump dek_generation by 1",
    );

    // Post-rotation envelope: new generation, C gone, A and B kept.
    let post = cluster.handle.snapshot();
    let post_envelope = post.wrapped_dek.as_ref().expect("post DEK present");
    assert_eq!(post_envelope.dek_generation, new_generation);
    let c_listed_after = post_envelope.wraps.contains_key("node-c");
    assert!(!c_listed_after, "revoked node-c must not appear in new wraps");
    let a_listed_after = post_envelope.wraps.contains_key("node-a");
    assert!(a_listed_after, "node-a kept in new wraps");
    let b_listed_after = post_envelope.wraps.contains_key("node-b");
    assert!(b_listed_after, "node-b kept in new wraps");

    // The remaining nodes still read the original plaintext.
    let survivors = [
        ("node-a", "A still reads after rotation"),
        ("node-b", "B still reads after rotation"),
    ];
    for (survivor, why) in survivors {
        let value = cluster
            .node(survivor)
            .store
            .get_secret("dep:myapp", "API_KEY")
            .await
            .expect(why);
        assert_eq!(value.expose(), "hunter2");
    }

    // The stored row was moved to the new generation.
    let storage_key = RaftSecretsStore::make_key("dep:myapp", "API_KEY");
    let row = post
        .secrets
        .get(&storage_key)
        .expect("secret row replicated");
    assert_eq!(
        row.dek_generation, new_generation,
        "rotation walk re-encrypted the row to the new generation",
    );

    // C's retained DEK no longer decrypts the row.
    let decrypt_attempt = c_old_dek.decrypt(&row.ciphertext);
    assert!(
        matches!(decrypt_attempt, Err(SecretsError::Decryption(_))),
        "C's old DEK must NOT decrypt the new-generation ciphertext, got {decrypt_attempt:?}",
    );

    // C's own store is refused as well.
    let from_c = cluster
        .node("node-c")
        .store
        .get_secret("dep:myapp", "API_KEY")
        .await;
    assert!(
        matches!(from_c, Err(SecretsError::Provider(_))),
        "revoked node-c must not be able to read post-rotation, got {from_c:?}",
    );
}
/// Ciphertext and DEK captured before the rotation still decrypt to the
/// original plaintext afterwards: rotation does not protect material a
/// revoked node already held.
#[tokio::test]
async fn revoked_node_can_still_decrypt_old_ciphertext_with_old_dek() {
    let cluster = ThreeNodeCluster::new();
    cluster.bootstrap_with_initial_dek();

    let leader_store = &cluster.leader().store;
    leader_store
        .set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
        .await
        .expect("set initial secret");

    // Capture everything C could have copied before being revoked.
    let pre = cluster.handle.snapshot();
    let storage_key = RaftSecretsStore::make_key("dep:myapp", "API_KEY");
    let old_row = pre
        .secrets
        .get(&storage_key)
        .expect("secret present pre-rotation");
    let old_ciphertext = old_row.ciphertext.clone();
    let pre_envelope = pre.wrapped_dek.as_ref().expect("DEK");
    let c_old_wrap = pre_envelope
        .wraps
        .get("node-c")
        .expect("c has wrap pre-rotation")
        .clone();
    let c_secret_key = &cluster.node("node-c").sk;
    let c_old_dek =
        ClusterDek::unwrap(c_secret_key, &c_old_wrap).expect("c unwraps pre-rotation");

    let revoke = SecretsRaftOp::RevokeNode {
        node_id: "node-c".to_string(),
    };
    cluster.handle.apply_local(revoke);
    cluster.leader_rotate_dek(Some("node-c"));

    let plaintext = c_old_dek
        .decrypt(&old_ciphertext)
        .expect("old DEK decrypts old ciphertext (forward security is not retroactive)");
    assert_eq!(
        plaintext.as_slice(),
        b"hunter2",
        "old material decrypts to the original plaintext",
    );
}
/// Re-registering a soft-revoked node clears `revoked_at` on its existing
/// entry and puts it back in the recipient set with a wrap it can unwrap.
#[tokio::test]
async fn revoked_node_register_node_idempotent() {
    let cluster = ThreeNodeCluster::new();
    cluster.bootstrap_with_initial_dek();

    let leader_store = &cluster.leader().store;
    leader_store
        .set_secret("dep:myapp", "API_KEY", &Secret::new("hunter2"))
        .await
        .expect("set initial secret");

    let revoke = SecretsRaftOp::RevokeNode {
        node_id: "node-c".to_string(),
    };
    cluster.handle.apply_local(revoke);
    cluster.leader_rotate_dek(Some("node-c"));

    // After revoke and rotate: the row is kept but marked revoked, and C has no wrap.
    let mid = cluster.handle.snapshot();
    let c_mid_entry = mid
        .nodes
        .get("node-c")
        .expect("c entry present (soft-revocation keeps the row)");
    assert!(c_mid_entry.revoked_at.is_some(), "c is soft-revoked");
    let c_wrapped_mid = mid
        .wrapped_dek
        .as_ref()
        .expect("DEK")
        .wraps
        .contains_key("node-c");
    assert!(!c_wrapped_mid, "c is excluded from the post-revoke wraps");

    // Rejoin with the same node id and the same public key.
    let c = cluster.node("node-c");
    let rejoin_identity = NodeIdentity {
        node_id: c.node_id.clone(),
        secrets_pubkey: *c.pk.as_bytes(),
        wg_pubkey: format!("wg-{}", c.node_id),
        joined_at: Utc::now(),
        revoked_at: None,
    };
    let new_generation = cluster.leader_register_node_and_rotate(rejoin_identity);

    let post = cluster.handle.snapshot();
    let c_entry = post.nodes.get("node-c").expect("c entry present");
    assert!(
        c_entry.revoked_at.is_none(),
        "re-registering must clear revoked_at on the existing entry, got {:?}",
        c_entry.revoked_at,
    );

    let envelope = post.wrapped_dek.as_ref().expect("DEK");
    assert_eq!(envelope.dek_generation, new_generation);
    let c_wrapped_post = envelope.wraps.contains_key("node-c");
    assert!(c_wrapped_post, "rejoined C must be in the new recipient set");

    let c_new_wrap = envelope
        .wraps
        .get("node-c")
        .expect("c has a wrap in the post-rejoin envelope")
        .clone();
    let c_secret_key = &cluster.node("node-c").sk;
    let _c_new_dek = ClusterDek::unwrap(c_secret_key, &c_new_wrap)
        .expect("rejoined C unwraps the new envelope's wrap");
}
/// Full cycle: C is revoked and cannot read, the secret changes while C is
/// out, C rejoins, and after one more write all three nodes read the latest
/// value.
#[tokio::test]
async fn register_then_revoke_then_register_then_decrypt_works() {
    const SCOPE: &str = "dep:myapp";
    const KEY: &str = "API_KEY";

    let cluster = ThreeNodeCluster::new();
    cluster.bootstrap_with_initial_dek();

    let leader_store = &cluster.leader().store;
    leader_store
        .set_secret(SCOPE, KEY, &Secret::new("hunter2"))
        .await
        .expect("set initial secret");

    let revoke = SecretsRaftOp::RevokeNode {
        node_id: "node-c".to_string(),
    };
    cluster.handle.apply_local(revoke);
    cluster.leader_rotate_dek(Some("node-c"));

    // While revoked, C's store is refused.
    let denied = cluster.node("node-c").store.get_secret(SCOPE, KEY).await;
    assert!(
        matches!(denied, Err(SecretsError::Provider(_))),
        "C must not read post-revoke, got {denied:?}",
    );

    leader_store
        .set_secret(SCOPE, KEY, &Secret::new("hunter3"))
        .await
        .expect("update secret while C is out");

    // C rejoins under its original identity.
    let c = cluster.node("node-c");
    let rejoin = NodeIdentity {
        node_id: c.node_id.clone(),
        secrets_pubkey: *c.pk.as_bytes(),
        wg_pubkey: format!("wg-{}", c.node_id),
        joined_at: Utc::now(),
        revoked_at: None,
    };
    cluster.leader_register_node_and_rotate(rejoin);

    leader_store
        .set_secret(SCOPE, KEY, &Secret::new("hunter4"))
        .await
        .expect("write under new envelope generation");

    // Every node, the rejoined one first, reads the latest value.
    let readers = [
        ("node-c", "C reads the latest secret after rejoin"),
        ("node-a", "A reads after rejoin"),
        ("node-b", "B reads after rejoin"),
    ];
    for (reader, why) in readers {
        let value = cluster
            .node(reader)
            .store
            .get_secret(SCOPE, KEY)
            .await
            .expect(why);
        assert_eq!(value.expose(), "hunter4");
    }
}