use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use zeroize::Zeroizing;
use super::INSTALL_TOKEN_LOCK;
use crate::error::AppError;
use vti_common::store::KeyspaceHandle;
pub const ENROLLMENT_CLAIM_WINDOW_SECS: u64 = 300;
const TOKEN_KEY_PREFIX: &[u8] = b"install:token:";
const EMERGENCY_PENDING_KEY: &[u8] = b"install:emergency_pending";
fn token_key(jti: &Uuid) -> Vec<u8> {
let mut out = TOKEN_KEY_PREFIX.to_vec();
out.extend_from_slice(jti.to_string().as_bytes());
out
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum InstallTokenState {
Issued {
exp: DateTime<Utc>,
#[serde(with = "raw_bytes_b64")]
cnonce: [u8; 32],
#[serde(with = "raw_bytes_b64")]
ephemeral_signing_key: [u8; 32],
claimed_at: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
claim_secret_hash: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
admin_did: Option<String>,
},
Consumed {
at: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
admin_did: Option<String>,
},
}
mod raw_bytes_b64 {
use base64::Engine;
use serde::{Deserialize, Deserializer, Serializer};
const B64: base64::engine::general_purpose::GeneralPurpose =
base64::engine::general_purpose::URL_SAFE_NO_PAD;
pub fn serialize<S: Serializer>(bytes: &[u8; 32], s: S) -> Result<S::Ok, S::Error> {
s.serialize_str(&B64.encode(bytes))
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<[u8; 32], D::Error> {
let s = String::deserialize(d)?;
let v = B64.decode(&s).map_err(serde::de::Error::custom)?;
v.try_into()
.map_err(|_| serde::de::Error::custom("expected 32 bytes"))
}
}
#[derive(Debug)]
pub struct StartClaimOutcome {
pub ephemeral_signing_key: Zeroizing<[u8; 32]>,
pub cnonce: [u8; 32],
pub claim_secret_hash: Option<String>,
}
#[derive(Clone)]
pub struct InstallTokenStore {
ks: KeyspaceHandle,
}
impl InstallTokenStore {
pub fn new(ks: KeyspaceHandle) -> Self {
Self { ks }
}
pub async fn record_issued(
&self,
jti: &Uuid,
cnonce: [u8; 32],
ephemeral_signing_key: [u8; 32],
exp: DateTime<Utc>,
claim_secret_hash: Option<String>,
admin_did: Option<String>,
) -> Result<(), AppError> {
let _guard = INSTALL_TOKEN_LOCK.lock().await;
let state = InstallTokenState::Issued {
exp,
cnonce,
ephemeral_signing_key,
claimed_at: None,
claim_secret_hash,
admin_did,
};
self.ks.insert(token_key(jti), &state).await
}
pub async fn start_claim(&self, jti: &Uuid) -> Result<StartClaimOutcome, AppError> {
let _guard = INSTALL_TOKEN_LOCK.lock().await;
let key = token_key(jti);
let state: Option<InstallTokenState> = self.ks.get(key.clone()).await?;
let state =
state.ok_or_else(|| AppError::Unauthorized("install token not found".into()))?;
match state {
InstallTokenState::Consumed { .. } => {
Err(AppError::Unauthorized("install token consumed".into()))
}
InstallTokenState::Issued {
exp,
cnonce,
ephemeral_signing_key,
claimed_at,
claim_secret_hash,
admin_did,
} => {
let now = Utc::now();
if now >= exp {
return Err(AppError::Unauthorized("install token expired".into()));
}
if let Some(prev) = claimed_at {
let elapsed = now - prev;
if elapsed < Duration::seconds(ENROLLMENT_CLAIM_WINDOW_SECS as i64) {
return Err(AppError::Conflict(
"install ceremony already in progress".into(),
));
}
}
let next = InstallTokenState::Issued {
exp,
cnonce,
ephemeral_signing_key,
claimed_at: Some(now),
claim_secret_hash: claim_secret_hash.clone(),
admin_did,
};
self.ks.insert(key, &next).await?;
Ok(StartClaimOutcome {
cnonce,
ephemeral_signing_key: Zeroizing::new(ephemeral_signing_key),
claim_secret_hash,
})
}
}
}
pub async fn finish_claim(&self, jti: &Uuid) -> Result<(), AppError> {
let _guard = INSTALL_TOKEN_LOCK.lock().await;
let key = token_key(jti);
let state: Option<InstallTokenState> = self.ks.get(key.clone()).await?;
let state =
state.ok_or_else(|| AppError::Unauthorized("install token not found".into()))?;
match state {
InstallTokenState::Consumed { .. } => {
Err(AppError::Unauthorized("install token consumed".into()))
}
InstallTokenState::Issued { exp, admin_did, .. } => {
let now = Utc::now();
if now >= exp {
return Err(AppError::Unauthorized("install token expired".into()));
}
let next = InstallTokenState::Consumed { at: now, admin_did };
self.ks.insert(key, &next).await
}
}
}
pub async fn mark_emergency_pending(
&self,
pending: PendingEmergencyBootstrap,
) -> Result<(), AppError> {
self.ks
.insert(EMERGENCY_PENDING_KEY.to_vec(), &pending)
.await
}
pub async fn take_pending_emergency(
&self,
) -> Result<Option<PendingEmergencyBootstrap>, AppError> {
let _guard = INSTALL_TOKEN_LOCK.lock().await;
let key = EMERGENCY_PENDING_KEY.to_vec();
let value: Option<PendingEmergencyBootstrap> = self.ks.get(key.clone()).await?;
if value.is_some() {
self.ks.remove(key).await?;
}
Ok(value)
}
pub async fn list_tokens(&self) -> Result<Vec<(Uuid, InstallTokenState)>, AppError> {
let raw = self.ks.prefix_iter_raw(TOKEN_KEY_PREFIX.to_vec()).await?;
let mut out = Vec::with_capacity(raw.len());
for (k, v) in raw {
let Some(suffix) = k.strip_prefix(TOKEN_KEY_PREFIX) else {
continue;
};
let Ok(jti_str) = std::str::from_utf8(suffix) else {
continue;
};
let Ok(jti) = jti_str.parse::<Uuid>() else {
continue;
};
match serde_json::from_slice::<InstallTokenState>(&v) {
Ok(state) => out.push((jti, state)),
Err(e) => {
tracing::warn!(error = %e, %jti, "skipping unparseable install token state")
}
}
}
Ok(out)
}
pub async fn get_token(&self, jti: &Uuid) -> Result<Option<InstallTokenState>, AppError> {
self.ks.get(token_key(jti)).await
}
pub async fn delete_token(&self, jti: &Uuid) -> Result<bool, AppError> {
let _guard = INSTALL_TOKEN_LOCK.lock().await;
let key = token_key(jti);
let existed: Option<InstallTokenState> = self.ks.get(key.clone()).await?;
if existed.is_some() {
self.ks.remove(key).await?;
Ok(true)
} else {
Ok(false)
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PendingEmergencyBootstrap {
pub operator_hostname: String,
pub invoked_at: DateTime<Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
use vti_common::config::StoreConfig;
use vti_common::store::Store;
fn temp_store() -> (InstallTokenStore, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let cfg = StoreConfig {
data_dir: dir.path().to_path_buf(),
};
let store = Store::open(&cfg).expect("store");
let ks = store.keyspace("install-test").expect("ks");
(InstallTokenStore::new(ks), dir)
}
async fn issue(store: &InstallTokenStore, ttl: i64) -> Uuid {
issue_with_hash(store, ttl, None).await
}
async fn issue_with_hash(
store: &InstallTokenStore,
ttl: i64,
claim_secret_hash: Option<String>,
) -> Uuid {
let jti = Uuid::new_v4();
let exp = Utc::now() + Duration::seconds(ttl);
store
.record_issued(
&jti,
[0xAB; 32],
[0xCD; 32],
exp,
claim_secret_hash,
Some("did:key:zTestAdmin".to_string()),
)
.await
.unwrap();
jti
}
#[tokio::test]
async fn start_then_finish_consumes_token() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
let outcome = store.start_claim(&jti).await.unwrap();
assert_eq!(outcome.cnonce, [0xAB; 32]);
assert_eq!(*outcome.ephemeral_signing_key, [0xCD; 32]);
store.finish_claim(&jti).await.unwrap();
let err = store.finish_claim(&jti).await.expect_err("second finish");
assert!(matches!(err, AppError::Unauthorized(_)));
}
#[tokio::test]
async fn finish_preserves_admin_did_in_consumed_row() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
store.start_claim(&jti).await.unwrap();
store.finish_claim(&jti).await.unwrap();
let row = store.get_token(&jti).await.unwrap().expect("token row");
match row {
InstallTokenState::Consumed { admin_did, .. } => {
assert_eq!(admin_did.as_deref(), Some("did:key:zTestAdmin"));
}
_ => panic!("expected Consumed state after finish_claim"),
}
}
#[tokio::test]
async fn second_concurrent_start_within_window_is_rejected() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
let _first = store.start_claim(&jti).await.unwrap();
let err = store.start_claim(&jti).await.expect_err("conflict");
assert!(matches!(err, AppError::Conflict(_)));
}
#[tokio::test]
async fn retry_after_window_succeeds() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
let _first = store.start_claim(&jti).await.unwrap();
let key = token_key(&jti);
let mut state: InstallTokenState = store.ks.get(key.clone()).await.unwrap().unwrap();
if let InstallTokenState::Issued {
ref mut claimed_at, ..
} = state
{
*claimed_at =
Some(Utc::now() - Duration::seconds((ENROLLMENT_CLAIM_WINDOW_SECS as i64) + 10));
}
store.ks.insert(key, &state).await.unwrap();
let _retry = store.start_claim(&jti).await.expect("retry after window");
}
#[tokio::test]
async fn expired_token_rejected_on_start() {
let (store, _dir) = temp_store();
let jti = issue(&store, -1).await;
let err = store.start_claim(&jti).await.expect_err("expired");
assert!(matches!(err, AppError::Unauthorized(_)));
}
#[tokio::test]
async fn expired_token_rejected_on_finish() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
let _ = store.start_claim(&jti).await.unwrap();
let key = token_key(&jti);
let mut state: InstallTokenState = store.ks.get(key.clone()).await.unwrap().unwrap();
if let InstallTokenState::Issued { ref mut exp, .. } = state {
*exp = Utc::now() - Duration::seconds(1);
}
store.ks.insert(key, &state).await.unwrap();
let err = store.finish_claim(&jti).await.expect_err("expired");
assert!(matches!(err, AppError::Unauthorized(_)));
}
#[tokio::test]
async fn claim_secret_hash_is_surfaced_in_outcome() {
let (store, _dir) = temp_store();
let hash = "$argon2id$test-stub".to_string();
let jti = issue_with_hash(&store, 600, Some(hash.clone())).await;
let outcome = store.start_claim(&jti).await.unwrap();
assert_eq!(outcome.claim_secret_hash.as_deref(), Some(hash.as_str()));
}
#[tokio::test]
async fn no_hash_when_token_minted_without_secret() {
let (store, _dir) = temp_store();
let jti = issue(&store, 600).await;
let outcome = store.start_claim(&jti).await.unwrap();
assert!(outcome.claim_secret_hash.is_none());
}
#[tokio::test]
async fn missing_token_returns_unauthorized() {
let (store, _dir) = temp_store();
let phantom_jti = Uuid::new_v4();
let err = store
.start_claim(&phantom_jti)
.await
.expect_err("not found");
assert!(matches!(err, AppError::Unauthorized(_)));
}
#[tokio::test]
async fn state_machine_serde_round_trip() {
let state = InstallTokenState::Issued {
exp: Utc::now() + Duration::seconds(60),
cnonce: [0xAB; 32],
ephemeral_signing_key: [0xCD; 32],
claimed_at: Some(Utc::now()),
claim_secret_hash: Some("$argon2id$stub".into()),
admin_did: Some("did:key:zSerdeRoundTrip".into()),
};
let s = serde_json::to_string(&state).unwrap();
let back: InstallTokenState = serde_json::from_str(&s).unwrap();
assert_eq!(back, state);
}
#[tokio::test]
async fn issued_row_without_hash_field_deserializes() {
let legacy_json = serde_json::json!({
"status": "issued",
"exp": "2099-01-01T00:00:00Z",
"cnonce": "qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqo",
"ephemeral_signing_key": "zc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc3Nzc0",
"claimed_at": null
});
let state: InstallTokenState = serde_json::from_value(legacy_json).unwrap();
match state {
InstallTokenState::Issued {
claim_secret_hash, ..
} => {
assert!(claim_secret_hash.is_none());
}
_ => panic!("expected Issued"),
}
}
}