use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use crabka_metadata::{DeleteDelegationTokenRecord, MetadataImage, MetadataRecord};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
#[async_trait]
pub(crate) trait DelegationTokenController: Send + Sync {
fn current_image(&self) -> Arc<MetadataImage>;
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), String>;
}
pub(crate) async fn run(
controller: Arc<dyn DelegationTokenController>,
interval: Duration,
shutdown: CancellationToken,
) {
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = tick.tick() => sweep(&*controller).await,
() = shutdown.cancelled() => {
info!("delegation-token cleanup shutting down");
return;
}
}
}
}
pub(crate) async fn sweep(controller: &dyn DelegationTokenController) {
let now = crate::time_util::now_ms();
let expired: Vec<String> = controller
.current_image()
.all_delegation_tokens()
.filter(|t| t.expiry_timestamp_ms <= now)
.map(|t| t.token_id.clone())
.collect();
if expired.is_empty() {
return;
}
let records: Vec<MetadataRecord> = expired
.iter()
.map(|id| {
MetadataRecord::V1DeleteDelegationToken(DeleteDelegationTokenRecord {
token_id: id.clone(),
})
})
.collect();
let count = expired.len();
if let Err(e) = controller.submit_change(records).await {
warn!(
error = %e,
count,
"failed to submit delegation-token tombstone batch"
);
} else {
for id in &expired {
debug!(token_id = %id, "delegation token expired and tombstoned");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::sync::Mutex;
use crabka_metadata::{DelegationTokenRecord, MetadataImage, MetadataRecord};
use crabka_security::KafkaPrincipal;
use uuid::Uuid;
struct MockController {
image: Mutex<Arc<MetadataImage>>,
submitted: Mutex<Vec<MetadataRecord>>,
}
#[async_trait]
impl DelegationTokenController for MockController {
fn current_image(&self) -> Arc<MetadataImage> {
self.image.lock().unwrap().clone()
}
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), String> {
let mut img: MetadataImage = (**self.image.lock().unwrap()).clone();
for r in &records {
img.apply(r);
}
*self.image.lock().unwrap() = Arc::new(img);
self.submitted.lock().unwrap().extend(records);
Ok(())
}
}
fn principal(t: &str, n: &str) -> KafkaPrincipal {
KafkaPrincipal {
principal_type: t.into(),
name: n.into(),
}
}
fn dt_record(token_id: &str, expiry_ms: i64) -> MetadataRecord {
MetadataRecord::V1DelegationToken(DelegationTokenRecord {
token_id: token_id.into(),
owner: principal("User", "alice"),
hmac: vec![0xAB; 32],
issue_timestamp_ms: 0,
expiry_timestamp_ms: expiry_ms,
max_timestamp_ms: i64::MAX,
renewers: vec![],
})
}
#[tokio::test]
async fn sweep_emits_tombstones_for_expired_tokens_only() {
let mut img = MetadataImage::new(Uuid::nil());
img.apply(&dt_record("expired-1", 1_000));
img.apply(&dt_record("expired-2", 2_000));
img.apply(&dt_record("fresh", i64::MAX));
let mock = Arc::new(MockController {
image: Mutex::new(Arc::new(img)),
submitted: Mutex::new(Vec::new()),
});
sweep(&*mock).await;
let submitted = mock.submitted.lock().unwrap();
assert!(submitted.len() == 2);
let mut tombstone_ids: Vec<String> = submitted
.iter()
.map(|r| match r {
MetadataRecord::V1DeleteDelegationToken(d) => d.token_id.clone(),
other => panic!("unexpected record type: {other:?}"),
})
.collect();
tombstone_ids.sort();
assert!(tombstone_ids == vec!["expired-1".to_string(), "expired-2".to_string()]);
drop(submitted);
let after = mock.current_image();
assert!(after.all_delegation_tokens().count() == 1);
assert!(after.delegation_token_by_id("fresh").is_some());
assert!(after.delegation_token_by_id("expired-1").is_none());
assert!(after.delegation_token_by_id("expired-2").is_none());
}
#[tokio::test]
async fn sweep_with_no_expired_tokens_is_a_no_op() {
let mut img = MetadataImage::new(Uuid::nil());
img.apply(&dt_record("fresh", i64::MAX));
let mock = Arc::new(MockController {
image: Mutex::new(Arc::new(img)),
submitted: Mutex::new(Vec::new()),
});
sweep(&*mock).await;
assert!(mock.submitted.lock().unwrap().is_empty());
assert!(mock.current_image().all_delegation_tokens().count() == 1);
}
}