spiffe_rs/workloadapi/
bundlesource.rs1use crate::bundle::{jwtbundle, spiffebundle, x509bundle};
2use crate::spiffeid::TrustDomain;
3use crate::workloadapi::option::{BundleSourceConfig, BundleSourceOption};
4use crate::workloadapi::{Context, Result, Watcher, X509Context};
5use std::collections::HashMap;
6use std::sync::{Arc, RwLock};
7
8pub struct BundleSource {
9 watcher: Watcher,
10 x509_authorities: Arc<RwLock<HashMap<TrustDomain, Vec<Vec<u8>>>>>,
11 jwt_authorities: Arc<RwLock<HashMap<TrustDomain, HashMap<String, jwtbundle::JwtKey>>>>,
12 closed: std::sync::atomic::AtomicBool,
13}
14
15impl BundleSource {
16 pub async fn new<I>(ctx: &Context, options: I) -> Result<BundleSource>
17 where
18 I: IntoIterator<Item = Arc<dyn BundleSourceOption>>,
19 {
20 let mut config = BundleSourceConfig::default();
21 for opt in options {
22 opt.configure_bundle_source(&mut config);
23 }
24
25 let x509_authorities = Arc::new(RwLock::new(HashMap::new()));
26 let jwt_authorities = Arc::new(RwLock::new(HashMap::new()));
27
28 let x509_authorities_clone = x509_authorities.clone();
29 let jwt_authorities_clone = jwt_authorities.clone();
30 let x509_handler = Arc::new(move |context: X509Context| {
31 let mut new_auth = HashMap::new();
32 for bundle in context.bundles.bundles() {
33 new_auth.insert(bundle.trust_domain(), bundle.x509_authorities());
34 }
35 if let Ok(mut guard) = x509_authorities_clone.write() {
36 *guard = new_auth;
37 }
38 });
39
40 let jwt_handler = Arc::new(move |bundles: jwtbundle::Set| {
41 let mut new_auth = HashMap::new();
42 for bundle in bundles.bundles() {
43 new_auth.insert(bundle.trust_domain(), bundle.jwt_authorities());
44 }
45 if let Ok(mut guard) = jwt_authorities_clone.write() {
46 *guard = new_auth;
47 }
48 });
49
50 let watcher = Watcher::new(ctx, config.watcher, Some(x509_handler), Some(jwt_handler)).await?;
51 Ok(BundleSource {
52 watcher,
53 x509_authorities,
54 jwt_authorities,
55 closed: std::sync::atomic::AtomicBool::new(false),
56 })
57 }
58
59 pub async fn close(&self) -> Result<()> {
60 self.closed.store(true, std::sync::atomic::Ordering::SeqCst);
61 self.watcher.close().await
62 }
63
64 pub fn get_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<spiffebundle::Bundle> {
65 self.check_closed()?;
66 let x509 = self
67 .x509_authorities
68 .read()
69 .ok()
70 .and_then(|guard| guard.get(&trust_domain).cloned());
71 let jwt = self
72 .jwt_authorities
73 .read()
74 .ok()
75 .and_then(|guard| guard.get(&trust_domain).cloned());
76
77 if x509.is_none() && jwt.is_none() {
78 return Err(crate::workloadapi::Error::new(format!(
79 "bundlesource: no SPIFFE bundle for trust domain {:?}",
80 trust_domain
81 )));
82 }
83 let bundle = spiffebundle::Bundle::new(trust_domain.clone());
84 if let Some(x509) = x509 {
85 bundle.set_x509_authorities(&x509);
86 }
87 if let Some(jwt) = jwt {
88 bundle.set_jwt_authorities(&jwt);
89 }
90 Ok(bundle)
91 }
92
93 pub fn get_x509_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<x509bundle::Bundle> {
94 self.check_closed()?;
95 let x509 = self
96 .x509_authorities
97 .read()
98 .ok()
99 .and_then(|guard| guard.get(&trust_domain).cloned())
100 .ok_or_else(|| {
101 crate::workloadapi::Error::new(format!(
102 "bundlesource: no X.509 bundle for trust domain {:?}",
103 trust_domain
104 ))
105 })?;
106 Ok(x509bundle::Bundle::from_x509_authorities(trust_domain, &x509))
107 }
108
109 pub fn get_jwt_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> Result<jwtbundle::Bundle> {
110 self.check_closed()?;
111 let jwt = self
112 .jwt_authorities
113 .read()
114 .ok()
115 .and_then(|guard| guard.get(&trust_domain).cloned())
116 .ok_or_else(|| {
117 crate::workloadapi::Error::new(format!(
118 "bundlesource: no JWT bundle for trust domain {:?}",
119 trust_domain
120 ))
121 })?;
122 Ok(jwtbundle::Bundle::from_jwt_authorities(trust_domain, &jwt))
123 }
124
125 pub async fn wait_until_updated(&self, ctx: &Context) -> Result<()> {
126 self.watcher.wait_until_updated(ctx).await
127 }
128
129 pub fn updated(&self) -> tokio::sync::watch::Receiver<u64> {
130 self.watcher.updated()
131 }
132
133 fn check_closed(&self) -> Result<()> {
134 if self.closed.load(std::sync::atomic::Ordering::SeqCst) {
135 return Err(crate::workloadapi::Error::new("bundlesource: source is closed"));
136 }
137 Ok(())
138 }
139}
140
141impl spiffebundle::Source for BundleSource {
142 fn get_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> spiffebundle::Result<spiffebundle::Bundle> {
143 self.get_bundle_for_trust_domain(trust_domain)
144 .map_err(|err| spiffebundle::Error::new(err.to_string()))
145 }
146}
147
148impl x509bundle::Source for BundleSource {
149 fn get_x509_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> x509bundle::Result<x509bundle::Bundle> {
150 self.get_x509_bundle_for_trust_domain(trust_domain)
151 .map_err(|err| x509bundle::Error::new(err.to_string()))
152 }
153}
154
155impl jwtbundle::Source for BundleSource {
156 fn get_jwt_bundle_for_trust_domain(&self, trust_domain: TrustDomain) -> jwtbundle::Result<jwtbundle::Bundle> {
157 self.get_jwt_bundle_for_trust_domain(trust_domain)
158 .map_err(|err| jwtbundle::Error::new(err.to_string()))
159 }
160}