spiffe_rs/workloadapi/
x509source.rs1use crate::bundle::x509bundle;
2use crate::svid::x509svid;
3use crate::workloadapi::option::{X509SourceConfig, X509SourceOption};
4use crate::workloadapi::{Context, Result, Watcher, X509Context};
5use std::sync::{Arc, RwLock};
6
7pub struct X509Source {
10 watcher: Watcher,
11 svid: Arc<RwLock<Option<x509svid::SVID>>>,
12 bundles: Arc<RwLock<Option<x509bundle::Set>>>,
13 closed: std::sync::atomic::AtomicBool,
14}
15
16impl X509Source {
17 pub async fn new<I>(ctx: &Context, options: I) -> Result<X509Source>
21 where
22 I: IntoIterator<Item = Arc<dyn X509SourceOption>>,
23 {
24 let mut config = X509SourceConfig::default();
25 for opt in options {
26 opt.configure_x509_source(&mut config);
27 }
28
29 let picker = config.picker.clone();
30 let svid_slot = Arc::new(RwLock::new(None));
31 let bundles_slot = Arc::new(RwLock::new(None));
32 let svid_slot_clone = svid_slot.clone();
33 let bundles_slot_clone = bundles_slot.clone();
34 let handler = Arc::new(move |context: X509Context| {
35 let svid = match &picker {
36 Some(picker) => picker(&context.svids),
37 None => match context.svids.first() {
38 Some(svid) => svid.clone(),
39 None => return,
40 },
41 };
42 if let Ok(mut guard) = svid_slot_clone.write() {
43 *guard = Some(svid);
44 }
45 if let Ok(mut guard) = bundles_slot_clone.write() {
46 *guard = Some(context.bundles);
47 }
48 });
49
50 let watcher = Watcher::new(ctx, config.watcher, Some(handler), None).await?;
51 Ok(X509Source {
52 watcher,
53 svid: svid_slot,
54 bundles: bundles_slot,
55 closed: std::sync::atomic::AtomicBool::new(false),
56 })
57 }
58
59 pub async fn close(&self) -> Result<()> {
61 self.closed.store(true, std::sync::atomic::Ordering::SeqCst);
62 self.watcher.close().await
63 }
64
65 pub fn get_x509_svid(&self) -> Result<x509svid::SVID> {
67 self.check_closed()?;
68 self.svid
69 .read()
70 .ok()
71 .and_then(|guard| guard.clone())
72 .ok_or_else(|| crate::workloadapi::Error::new("x509source: missing X509-SVID"))
73 }
74
75 pub fn get_x509_bundle_for_trust_domain(
77 &self,
78 trust_domain: crate::spiffeid::TrustDomain,
79 ) -> Result<x509bundle::Bundle> {
80 self.check_closed()?;
81 self.bundles
82 .read()
83 .ok()
84 .and_then(|guard| guard.as_ref().and_then(|b| b.get_x509_bundle_for_trust_domain(trust_domain).ok()))
85 .ok_or_else(|| crate::workloadapi::Error::new("x509source: no X.509 bundle found"))
86 }
87
88 pub async fn wait_until_updated(&self, ctx: &Context) -> Result<()> {
90 self.watcher.wait_until_updated(ctx).await
91 }
92
93 pub fn updated(&self) -> tokio::sync::watch::Receiver<u64> {
95 self.watcher.updated()
96 }
97
98 fn check_closed(&self) -> Result<()> {
99 if self.closed.load(std::sync::atomic::Ordering::SeqCst) {
100 return Err(crate::workloadapi::Error::new("x509source: source is closed"));
101 }
102 Ok(())
103 }
104}
105
106impl x509svid::Source for X509Source {
107 fn get_x509_svid(&self) -> x509svid::Result<x509svid::SVID> {
108 self.get_x509_svid()
109 .map_err(|err| x509svid::Error::new(err.to_string()))
110 }
111}
112
113impl x509bundle::Source for X509Source {
114 fn get_x509_bundle_for_trust_domain(
115 &self,
116 trust_domain: crate::spiffeid::TrustDomain,
117 ) -> x509bundle::Result<x509bundle::Bundle> {
118 self.get_x509_bundle_for_trust_domain(trust_domain)
119 .map_err(|err| x509bundle::Error::new(err.to_string()))
120 }
121}