spiffe-rs 0.1.0

Rust port of spiffe-go with SPIFFE IDs, bundles, SVIDs, Workload API client, federation helpers, and rustls-based SPIFFE TLS utilities.
Documentation
use crate::bundle::{jwtbundle, spiffebundle, x509bundle};
use crate::spiffeid::TrustDomain;
use crate::workloadapi::option::{BundleSourceConfig, BundleSourceOption};
use crate::workloadapi::{Context, Result, Watcher, X509Context};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

pub struct BundleSource {
    watcher: Watcher,
    x509_authorities: Arc<RwLock<HashMap<TrustDomain, Vec<Vec<u8>>>>>,
    jwt_authorities: Arc<RwLock<HashMap<TrustDomain, HashMap<String, jwtbundle::JwtKey>>>>,
    closed: std::sync::atomic::AtomicBool,
}

impl BundleSource {
    pub async fn new<I>(ctx: &Context, options: I) -> Result<BundleSource>
    where
        I: IntoIterator<Item = Arc<dyn BundleSourceOption>>,
    {
        let mut config = BundleSourceConfig::default();
        for opt in options {
            opt.configure_bundle_source(&mut config);
        }

        let x509_authorities = Arc::new(RwLock::new(HashMap::new()));
        let jwt_authorities = Arc::new(RwLock::new(HashMap::new()));

        let x509_authorities_clone = x509_authorities.clone();
        let jwt_authorities_clone = jwt_authorities.clone();
        let x509_handler = Arc::new(move |context: X509Context| {
            let mut new_auth = HashMap::new();
            for bundle in context.bundles.bundles() {
                new_auth.insert(bundle.trust_domain(), bundle.x509_authorities());
            }
            if let Ok(mut guard) = x509_authorities_clone.write() {
                *guard = new_auth;
            }
        });

        let jwt_handler = Arc::new(move |bundles: jwtbundle::Set| {
            let mut new_auth = HashMap::new();
            for bundle in bundles.bundles() {
                new_auth.insert(bundle.trust_domain(), bundle.jwt_authorities());
            }
            if let Ok(mut guard) = jwt_authorities_clone.write() {
                *guard = new_auth;
            }
        });

        let watcher = Watcher::new(ctx, config.watcher, Some(x509_handler), Some(jwt_handler)).await?;
        Ok(BundleSource {
            watcher,
            x509_authorities,
            jwt_authorities,
            closed: std::sync::atomic::AtomicBool::new(false),
        })
    }

    pub async fn close(&self) -> Result<()> {
        self.closed.store(true, std::sync::atomic::Ordering::SeqCst);
        self.watcher.close().await
    }

    pub fn get_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<spiffebundle::Bundle> {
        self.check_closed()?;
        let x509 = self
            .x509_authorities
            .read()
            .ok()
            .and_then(|guard| guard.get(&trust_domain).cloned());
        let jwt = self
            .jwt_authorities
            .read()
            .ok()
            .and_then(|guard| guard.get(&trust_domain).cloned());

        if x509.is_none() && jwt.is_none() {
            return Err(crate::workloadapi::Error::new(format!(
                "bundlesource: no SPIFFE bundle for trust domain {:?}",
                trust_domain
            )));
        }
        let bundle = spiffebundle::Bundle::new(trust_domain.clone());
        if let Some(x509) = x509 {
            bundle.set_x509_authorities(&x509);
        }
        if let Some(jwt) = jwt {
            bundle.set_jwt_authorities(&jwt);
        }
        Ok(bundle)
    }

    pub fn get_x509_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<x509bundle::Bundle> {
        self.check_closed()?;
        let x509 = self
            .x509_authorities
            .read()
            .ok()
            .and_then(|guard| guard.get(&trust_domain).cloned())
            .ok_or_else(|| {
                crate::workloadapi::Error::new(format!(
                    "bundlesource: no X.509 bundle for trust domain {:?}",
                    trust_domain
                ))
            })?;
        Ok(x509bundle::Bundle::from_x509_authorities(trust_domain, &x509))
    }

    pub fn get_jwt_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<jwtbundle::Bundle> {
        self.check_closed()?;
        let jwt = self
            .jwt_authorities
            .read()
            .ok()
            .and_then(|guard| guard.get(&trust_domain).cloned())
            .ok_or_else(|| {
                crate::workloadapi::Error::new(format!(
                    "bundlesource: no JWT bundle for trust domain {:?}",
                    trust_domain
                ))
            })?;
        Ok(jwtbundle::Bundle::from_jwt_authorities(trust_domain, &jwt))
    }

    pub async fn wait_until_updated(&self, ctx: &Context) -> Result<()> {
        self.watcher.wait_until_updated(ctx).await
    }

    pub fn updated(&self) -> tokio::sync::watch::Receiver<u64> {
        self.watcher.updated()
    }

    fn check_closed(&self) -> Result<()> {
        if self.closed.load(std::sync::atomic::Ordering::SeqCst) {
            return Err(crate::workloadapi::Error::new("bundlesource: source is closed"));
        }
        Ok(())
    }
}

impl spiffebundle::Source for BundleSource {
    fn get_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> spiffebundle::Result<spiffebundle::Bundle> {
        self.get_bundle_for_trust_domain(trust_domain)
            .map_err(|err| spiffebundle::Error::new(err.to_string()))
    }
}

impl x509bundle::Source for BundleSource {
    fn get_x509_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> x509bundle::Result<x509bundle::Bundle> {
        self.get_x509_bundle_for_trust_domain(trust_domain)
            .map_err(|err| x509bundle::Error::new(err.to_string()))
    }
}

impl jwtbundle::Source for BundleSource {
    fn get_jwt_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> jwtbundle::Result<jwtbundle::Bundle> {
        self.get_jwt_bundle_for_trust_domain(trust_domain)
            .map_err(|err| jwtbundle::Error::new(err.to_string()))
    }
}