use std::sync::Arc;
use std::time::Duration;
use koi_certmesh::{ApprovalDecision, ApprovalRequest, CertmeshCore};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
/// Shared, thread-safe callback that decides the outcome of an enrollment request.
///
/// It is invoked as `decider(hostname, requires_approval)` and returns the
/// [`ApprovalDecision`] to report back to the requester. The `Send + Sync`
/// bounds together with the `Arc` wrapper let one decider be cloned cheaply and
/// called from other tasks or threads.
pub type ApprovalDecider = Arc<dyn Fn(&str, bool) -> ApprovalDecision + Send + Sync>;
pub fn deny_and_log_decider() -> ApprovalDecider {
Arc::new(|hostname: &str, _requires_approval: bool| {
tracing::warn!(
hostname,
"Certmesh enrollment auto-denied (no interactive console to approve)"
);
ApprovalDecision::Denied
})
}
pub async fn spawn_enrollment_approval(
certmesh: &Arc<CertmeshCore>,
decider: ApprovalDecider,
cancel: &CancellationToken,
tasks: &mut Vec<JoinHandle<()>>,
) {
let (tx, mut rx) = mpsc::channel(8);
certmesh.set_approval_channel(tx).await;
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => break,
request = rx.recv() => {
let Some(request) = request else {
break;
};
dispatch_approval(request, decider.clone()).await;
}
}
}
}));
}
async fn dispatch_approval(request: ApprovalRequest, decider: ApprovalDecider) {
let ApprovalRequest {
hostname,
requires_approval,
respond_to,
} = request;
let decision = tokio::task::spawn_blocking(move || decider(&hostname, requires_approval))
.await
.unwrap_or(ApprovalDecision::Denied);
let _ = respond_to.send(decision);
}
pub fn spawn_certmesh_background_tasks(
certmesh: &Arc<CertmeshCore>,
cancel: &CancellationToken,
tasks: &mut Vec<JoinHandle<()>>,
) {
let cm = Arc::clone(certmesh);
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
let interval = Duration::from_secs(koi_certmesh::lifecycle::RENEWAL_CHECK_INTERVAL_SECS);
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(interval) => {
match cm.pull_trust_bundle().await {
Ok(koi_certmesh::BundleOutcome::Updated { seq, self_revoked }) => {
if self_revoked {
tracing::error!(seq, "Trust bundle marks this node REVOKED");
} else {
tracing::info!(seq, "Trust bundle updated");
}
}
Ok(_) => {}
Err(e) => tracing::debug!(error = %e, "Trust bundle pull skipped"),
}
match cm.renew_self_if_due().await {
Ok(koi_certmesh::RenewOutcome::Renewed { expires, hook }) => {
let hook_ok = hook.as_ref().map(|h| h.success).unwrap_or(true);
if hook_ok {
tracing::info!(%expires, "Certificate renewed (rotated key)");
} else {
tracing::warn!(%expires, "Certificate renewed but reload hook failed");
}
}
Ok(koi_certmesh::RenewOutcome::NotDue { .. })
| Ok(koi_certmesh::RenewOutcome::NotApplicable) => {}
Err(e) => {
tracing::warn!(error = %e, "Certificate renewal failed; will retry next cycle");
}
}
}
}
}
}));
tracing::debug!("Certmesh background tasks spawned");
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::oneshot;

    /// The fallback decider must deny whether or not approval is required.
    #[tokio::test]
    async fn deny_and_log_decider_denies_regardless_of_approval_flag() {
        let deny = deny_and_log_decider();
        for flag in [false, true] {
            let verdict = deny("host", flag);
            assert!(matches!(verdict, ApprovalDecision::Denied));
        }
    }

    /// An approving decider's answer reaches the requester unchanged.
    #[tokio::test]
    async fn dispatch_approval_routes_decider_approval() {
        let approver: ApprovalDecider = Arc::new(|_hostname, _requires_approval| {
            ApprovalDecision::Approved {
                operator: Some("alice".to_string()),
            }
        });
        let (reply_tx, reply_rx) = oneshot::channel();

        dispatch_approval(
            ApprovalRequest {
                hostname: "node-1".to_string(),
                requires_approval: true,
                respond_to: reply_tx,
            },
            approver,
        )
        .await;

        let decision = reply_rx.await.expect("decision delivered");
        let ApprovalDecision::Approved { operator } = decision else {
            panic!("expected approval")
        };
        assert_eq!(operator.as_deref(), Some("alice"));
    }

    /// The deny decider's answer reaches the requester as `Denied`.
    #[tokio::test]
    async fn dispatch_approval_routes_deny_decider() {
        let (reply_tx, reply_rx) = oneshot::channel();

        dispatch_approval(
            ApprovalRequest {
                hostname: "node-2".to_string(),
                requires_approval: false,
                respond_to: reply_tx,
            },
            deny_and_log_decider(),
        )
        .await;

        let decision = reply_rx.await.expect("decision delivered");
        assert!(matches!(decision, ApprovalDecision::Denied));
    }

    /// The pump task is registered exactly once and joins after cancellation.
    #[tokio::test]
    async fn spawn_enrollment_approval_pumps_until_cancel() {
        let data_dir =
            std::env::temp_dir().join(format!("koi-compose-approval-{}", std::process::id()));
        let core = Arc::new(koi_certmesh::CertmeshCore::uninitialized_with_paths(
            koi_certmesh::CertmeshPaths::with_data_dir(data_dir),
        ));
        let cancel = CancellationToken::new();
        let mut handles = Vec::new();

        spawn_enrollment_approval(&core, deny_and_log_decider(), &cancel, &mut handles).await;
        assert_eq!(handles.len(), 1);

        cancel.cancel();
        for handle in handles {
            handle.await.expect("pump task joins cleanly");
        }
    }
}