use crate::error::{Error, Result};
use crate::material::{
cert_chain_from_der_bytes, certified_key_from_chain_and_key, certs_from_der_bytes,
roots_from_certs, MaterialSnapshot,
};
use crate::prelude::{debug, error, info, warn};
use spiffe::X509Source;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
/// Cloneable handle onto the latest [`MaterialSnapshot`] published through a watch channel.
///
/// Every clone shares the same [`WatcherGuard`] through an `Arc`, so the guard's `Drop`
/// (which cancels and aborts the background task) runs only once the last clone is gone.
#[derive(Clone, Debug)]
pub(crate) struct MaterialWatcher {
/// Receiving half of the watch channel carrying the current snapshot.
rx: watch::Receiver<Arc<MaterialSnapshot>>,
/// Never read; held only so the guard is dropped together with the last handle.
_guard: Arc<WatcherGuard>,
}
/// Owns the cancellation token and join handle of a spawned task so that both can be
/// acted on from this type's `Drop` implementation.
#[derive(Debug)]
struct WatcherGuard {
/// Token cancelled on drop.
cancel: CancellationToken,
/// Handle of the spawned task; aborted on drop.
task: tokio::task::JoinHandle<()>,
}
impl Drop for WatcherGuard {
    /// Shuts the owned task down: the cancellation token is triggered first, then the
    /// join handle is aborted.
    fn drop(&mut self) {
        let Self { cancel, task } = self;
        cancel.cancel();
        task.abort();
    }
}
impl MaterialWatcher {
    /// Builds the initial [`MaterialSnapshot`] (generation `1`) from `source` and spawns a
    /// task on the current Tokio runtime that rebuilds the snapshot each time the source
    /// signals an update.
    ///
    /// A failed rebuild is logged and the previously published snapshot stays in place.
    /// The task stops when it is cancelled, when the source's update channel closes, or
    /// when publishing fails because no receiver is left.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NoTokioRuntime`] when no Tokio runtime is current, or the error
    /// reported by `build_material` for the initial snapshot.
    pub(crate) fn spawn(source: Arc<X509Source>) -> Result<Self> {
        let handle = tokio::runtime::Handle::try_current()
            .map_err(|tokio::runtime::TryCurrentError { .. }| Error::NoTokioRuntime)?;

        // Subscribe *before* taking the initial snapshot. If the subscription were taken
        // afterwards, a rotation landing between the two steps could be treated as already
        // seen and the stale initial material would persist until the next rotation. With
        // this order the worst case is one redundant rebuild.
        let mut updates = source.updated();

        let initial = Arc::new(build_material(source.as_ref(), 1)?);
        let (tx, rx) = watch::channel(initial);

        let cancel = CancellationToken::new();
        let token = cancel.clone();
        // Generation of the snapshot most recently published on `tx`.
        let mut generation = 1u64;

        let task = handle.spawn(async move {
            loop {
                tokio::select! {
                    () = token.cancelled() => {
                        debug!("material watcher cancelled; stopping");
                        break;
                    }
                    res = updates.changed() => {
                        if res.is_err() {
                            info!("x509 source update channel closed; stopping material watcher");
                            break;
                        }

                        // Only advance `generation` once the new snapshot is published, so
                        // a failed rebuild does not leave a gap in the numbering.
                        let next_generation = generation + 1;
                        match build_material(source.as_ref(), next_generation) {
                            Ok(mat) => match tx.send(Arc::new(mat)) {
                                Ok(()) => {
                                    generation = next_generation;
                                    debug!("updated rustls material from X509Source rotation (generation={generation})");
                                }
                                Err(watch::error::SendError(material)) => {
                                    // The rejected snapshot is handed back; drop it explicitly.
                                    let _unused: Arc<MaterialSnapshot> = material;
                                    info!("material watcher has no receivers; stopping");
                                    break;
                                }
                            },
                            Err(e) => {
                                error!("failed rebuilding rustls material; keeping previous: {e}");
                            }
                        }
                    }
                }
            }
        });

        Ok(Self {
            rx,
            _guard: Arc::new(WatcherGuard { cancel, task }),
        })
    }

    /// Returns an `Arc` clone of the snapshot currently held by the watch channel.
    pub(crate) fn current(&self) -> Arc<MaterialSnapshot> {
        self.rx.borrow().clone()
    }
}
/// Assembles a [`MaterialSnapshot`] tagged with `generation` from the SVID and bundle set
/// currently exposed by `source`.
///
/// A trust domain whose root store cannot be built is skipped with a warning rather than
/// failing the whole snapshot.
///
/// # Errors
///
/// Propagates failures from reading the SVID or bundle set and from building the certified
/// key. Returns [`Error::NoUsableRootStores`] when no trust domain yields a root store.
fn build_material<S: X509MaterialSource>(source: &S, generation: u64) -> Result<MaterialSnapshot> {
    let svid = source.current_svid()?;
    let bundles = source.bundle_set()?;

    let leaf_chain_der = svid
        .cert_chain()
        .iter()
        .map(spiffe::cert::Certificate::as_bytes);
    let certified_key = certified_key_from_chain_and_key(
        cert_chain_from_der_bytes(leaf_chain_der),
        svid.private_key().as_bytes(),
    )?;

    // One root store per trust domain; domains that fail are logged and left out.
    let roots_by_td: BTreeMap<_, _> = bundles
        .iter()
        .filter_map(|(trust_domain, bundle)| {
            let authorities = certs_from_der_bytes(
                bundle
                    .authorities()
                    .iter()
                    .map(spiffe::cert::Certificate::as_bytes),
            );
            match roots_from_certs(&authorities) {
                Ok(roots) => Some((trust_domain.clone(), roots)),
                Err(e) => {
                    warn!("Failed to build root cert store for trust domain {trust_domain}: {e}");
                    None
                }
            }
        })
        .collect();

    if roots_by_td.is_empty() {
        return Err(Error::NoUsableRootStores);
    }

    Ok(MaterialSnapshot {
        generation,
        certified_key,
        roots_by_td,
    })
}
/// The two reads `build_material` needs from an X.509 source.
///
/// Implemented for [`X509Source`]; the test module of this file also implements it for
/// an in-memory fake.
trait X509MaterialSource {
/// Returns the SVID to build the certified key from.
fn current_svid(&self) -> Result<Arc<spiffe::X509Svid>>;
/// Returns the bundle set to build the per-trust-domain root stores from.
fn bundle_set(&self) -> Result<Arc<spiffe::X509BundleSet>>;
}
impl X509MaterialSource for X509Source {
    /// Forwards to `X509Source::svid`, converting its error into the crate [`Error`].
    fn current_svid(&self) -> Result<Arc<spiffe::X509Svid>> {
        Ok(self.svid()?)
    }

    /// Forwards to the `bundle_set` method of `X509Source`, converting its error into the
    /// crate [`Error`].
    fn bundle_set(&self) -> Result<Arc<spiffe::X509BundleSet>> {
        Ok(self.bundle_set()?)
    }
}
// Unit tests for `build_material` and the DER helpers it relies on, driven by an
// in-memory `FakeSource` and the DER fixtures under `tests/fixtures`.
#[cfg(test)]
mod tests {
use super::*;
use spiffe::{TrustDomain, X509BundleSet};
use std::sync::Mutex;
// Delegates to `crate::crypto::ensure_crypto_provider_installed`; called first by the
// tests that build key material.
fn ensure_provider() {
crate::crypto::ensure_crypto_provider_installed();
}
// Bytes of `tests/fixtures/spiffe_leaf.der`, embedded at compile time.
fn fixture_spiffe_leaf_der() -> &'static [u8] {
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/fixtures/spiffe_leaf.der"
))
}
// Bytes of `tests/fixtures/ca.der`, embedded at compile time.
fn fixture_ca_der() -> &'static [u8] {
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/fixtures/ca.der"
))
}
// Bytes of `tests/fixtures/leaf.key.pkcs8`, embedded at compile time.
fn fixture_leaf_key_pkcs8_der() -> &'static [u8] {
include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/fixtures/leaf.key.pkcs8"
))
}
// Parses the leaf certificate and key fixtures into an SVID; panics if parsing fails.
fn make_svid() -> spiffe::X509Svid {
spiffe::X509Svid::parse_from_der(fixture_spiffe_leaf_der(), fixture_leaf_key_pkcs8_der())
.unwrap()
}
// Parses the CA fixture into a bundle for trust domain `td`; panics if parsing fails.
fn make_bundle(td: TrustDomain) -> spiffe::X509Bundle {
spiffe::X509Bundle::parse_from_der(td, fixture_ca_der()).unwrap()
}
// In-memory `X509MaterialSource` whose SVID and bundle set are fixed at construction.
#[derive(Debug)]
struct FakeSource {
// `None` makes `current_svid` fail with `Error::NoSvid`.
pub svid: Mutex<Option<Arc<spiffe::X509Svid>>>,
pub bundle_set: Mutex<X509BundleSet>,
}
impl FakeSource {
// Creates a source holding `svid` and a bundle set containing `bundle` when one is
// given (an empty set otherwise).
fn new(
svid: Option<Arc<spiffe::X509Svid>>,
bundle: Option<Arc<spiffe::X509Bundle>>,
) -> Self {
let mut bundle_set = X509BundleSet::new();
if let Some(b) = bundle {
bundle_set.add_bundle((*b).clone());
}
Self {
svid: Mutex::new(svid),
bundle_set: Mutex::new(bundle_set),
}
}
}
// The `expect` calls on the mutex locks below sit inside `Result`-returning methods;
// the attribute's `reason` links the upstream clippy issue.
#[expect(
clippy::unwrap_in_result,
reason = "https://github.com/rust-lang/rust-clippy/issues/16476"
)]
impl X509MaterialSource for FakeSource {
// Returns the stored SVID, or `Error::NoSvid` when none was configured.
fn current_svid(&self) -> Result<Arc<spiffe::X509Svid>> {
self.svid
.lock()
.expect("FakeSource.svid mutex poisoned")
.clone()
.ok_or(Error::NoSvid)
}
// Returns a fresh `Arc` around a clone of the stored bundle set; never fails.
fn bundle_set(&self) -> Result<Arc<X509BundleSet>> {
Ok(Arc::new(
self.bundle_set
.lock()
.expect("FakeSource.bundle_set mutex poisoned")
.clone(),
))
}
}
// Happy path: an SVID plus one bundle yields a certified key and a root store keyed by
// the bundle's trust domain.
#[test]
fn build_material_ok() {
ensure_provider();
let td = TrustDomain::new("example.org").unwrap();
let src = FakeSource::new(
Some(Arc::new(make_svid())),
Some(Arc::new(make_bundle(td.clone()))),
);
let mat = build_material(&src, 1).unwrap();
assert!(!mat.certified_key.cert.is_empty());
assert!(!mat.roots_by_td.is_empty());
assert!(mat.roots_by_td.contains_key(&td));
}
// A source without an SVID surfaces `Error::NoSvid`.
#[test]
fn build_material_no_svid() {
ensure_provider();
let td = TrustDomain::new("example.org").unwrap();
let src = FakeSource::new(None, Some(Arc::new(make_bundle(td))));
let err = build_material(&src, 1).unwrap_err();
assert!(matches!(err, Error::NoSvid));
}
// An empty bundle set produces no root stores, so `Error::NoUsableRootStores` is returned.
#[test]
fn build_material_no_bundle() {
ensure_provider();
let src = FakeSource::new(Some(Arc::new(make_svid())), None);
let err = build_material(&src, 1).unwrap_err();
assert!(matches!(err, Error::NoUsableRootStores));
}
// The CA fixture alone is enough to build a non-empty root store.
#[test]
fn roots_from_bundle_der_builds_store() {
let certs = certs_from_der_bytes([fixture_ca_der()]);
let store = roots_from_certs(&certs).unwrap();
assert!(!store.is_empty());
}
// The leaf certificate and PKCS#8 key fixtures combine into a certified key with a
// non-empty chain.
#[test]
fn certified_key_from_der_builds_key() {
ensure_provider();
let chain = cert_chain_from_der_bytes([fixture_spiffe_leaf_der()]);
let key = fixture_leaf_key_pkcs8_der();
let ck = certified_key_from_chain_and_key(chain, key).unwrap();
assert!(!ck.cert.is_empty());
}
}