spire_api/agent/
delegated_identity.rs

1//! Delegate Identity (SPIRE Agent Admin API).
2//!
3//! Protobuf:
4//! - `https://github.com/spiffe/spire-api-sdk/blob/main/proto/spire/api/agent/delegatedidentity/v1/delegatedidentity.proto`
5//!
6//! Docs:
7//! - `https://spiffe.io/docs/latest/deploying/spire_agent/#delegated-identity-api`
8//!
9//! Notes:
10//! - This API must be used over the SPIRE Agent **admin** socket, not the Workload API socket.
11
12use crate::pb::spire::api::agent::delegatedidentity::v1::delegated_identity_client::DelegatedIdentityClient as DelegatedIdentityApiClient;
13use crate::pb::spire::api::agent::delegatedidentity::v1::{
14    FetchJwtsviDsRequest, SubscribeToJwtBundlesRequest, SubscribeToJwtBundlesResponse,
15    SubscribeToX509BundlesRequest, SubscribeToX509BundlesResponse, SubscribeToX509sviDsRequest,
16    SubscribeToX509sviDsResponse,
17};
18use crate::pb::spire::api::types::Jwtsvid as ProtoJwtSvid;
19
20use crate::selectors::Selector;
21
22use spiffe::constants::DEFAULT_SVID;
23use spiffe::transport::{Endpoint, TransportError};
24use spiffe::{
25    JwtBundle, JwtBundleError, JwtBundleSet, JwtSvid, JwtSvidError, SpiffeIdError, TrustDomain,
26    X509Bundle, X509BundleError, X509BundleSet, X509Svid, X509SvidError,
27};
28
29use std::str::FromStr;
30
31use tokio_stream::{Stream, StreamExt};
32
/// Environment variable that [`admin_endpoint_from_env`] reads to locate the
/// SPIRE Agent admin endpoint socket.
pub const ADMIN_SOCKET_ENV: &str = "SPIRE_ADMIN_ENDPOINT_SOCKET";
35
36/// Errors produced by the Delegated Identity API client.
37#[derive(Debug, thiserror::Error)]
38#[non_exhaustive]
39pub enum DelegatedIdentityError {
40    /// The environment variable for the admin endpoint socket is not set.
41    #[error("missing admin endpoint socket path ({ADMIN_SOCKET_ENV})")]
42    MissingEndpointSocket,
43
44    /// Failed to parse the endpoint URI.
45    #[error("invalid endpoint: {0}")]
46    Endpoint(#[from] spiffe::transport::EndpointError),
47
48    /// Transport error while connecting to the API.
49    #[error(transparent)]
50    Transport(#[from] TransportError),
51
52    /// The API returned an empty response.
53    #[error("empty response")]
54    EmptyResponse,
55
56    /// Failed to parse a JWT SVID.
57    #[error("JWT SVID error: {0}")]
58    JwtSvid(#[from] JwtSvidError),
59
60    /// Failed to parse an X.509 bundle.
61    #[error("X.509 bundle error: {0}")]
62    X509Bundle(#[from] X509BundleError),
63
64    /// Failed to parse an X.509 SVID.
65    #[error("X.509 SVID error: {0}")]
66    X509Svid(#[from] X509SvidError),
67
68    /// Failed to parse a JWT bundle.
69    #[error("JWT bundle error: {0}")]
70    JwtBundle(#[from] JwtBundleError),
71
72    /// Failed to parse a SPIFFE identifier.
73    #[error("SPIFFE ID error: {0}")]
74    SpiffeId(#[from] SpiffeIdError),
75}
76
77/// Load the admin endpoint socket URI from the environment.
78///
79/// ## Errors
80///
81/// Returns [`DelegatedIdentityError`] if the environment variable is not set or the value is invalid.
82pub fn admin_endpoint_from_env() -> Result<Endpoint, DelegatedIdentityError> {
83    let raw = std::env::var(ADMIN_SOCKET_ENV)
84        .map_err(|_| DelegatedIdentityError::MissingEndpointSocket)?;
85    Ok(Endpoint::parse(&raw)?)
86}
87
88/// Impl for `DelegatedIdentity` API
89#[derive(Debug, Clone)]
90pub struct DelegatedIdentityClient {
91    client: DelegatedIdentityApiClient<tonic::transport::Channel>,
92}
93
94/// Represents that a delegate attestation request can have one-of
95/// PID (let agent attest PID->selectors) or selectors (delegate has already attested a PID)
96#[derive(Debug, Clone)]
97pub enum DelegateAttestationRequest {
98    /// PID (let agent attest PID->selectors)
99    Pid(i32),
100    /// selectors (delegate has already attested a PID and generated full set of selectors)
101    Selectors(Vec<Selector>),
102}
103
104/// Constructors
105impl DelegatedIdentityClient {
106    /// Create a client by connecting to the given admin endpoint URI string (e.g. `unix:///...`).
107    ///
108    /// # Arguments
109    ///
110    /// * `endpoint` - The path to the UNIX domain socket, which can optionally start with "unix:".
111    ///
112    /// # Returns
113    ///
114    /// * `Result<Self, DelegatedIdentityError>` - Returns an instance of `DelegatedIdentityClient` if successful, otherwise returns an error.
115    ///
116    /// # Errors
117    ///
118    /// This function will return an error if the provided socket path is invalid or if there are issues connecting.
119    pub async fn connect_to(endpoint: impl AsRef<str>) -> Result<Self, DelegatedIdentityError> {
120        let endpoint = Endpoint::parse(endpoint.as_ref())?;
121        Self::connect(endpoint).await
122    }
123
124    /// Creates a new `DelegatedIdentityClient` using the default socket endpoint address.
125    ///
126    /// Requires that the environment variable `SPIFFE_ENDPOINT_SOCKET` be set with
127    /// the path to the Workload API endpoint socket.
128    ///
129    /// # Errors
130    ///
131    /// The function returns a variant of [`DelegatedIdentityError`] if environment variable is not set or if
132    /// the provided socket path is not valid.
133    pub async fn connect_env() -> Result<Self, DelegatedIdentityError> {
134        let endpoint = admin_endpoint_from_env()?;
135        Self::connect(endpoint).await
136    }
137
138    /// Create a client by connecting to a parsed SPIFFE [`Endpoint`].
139    ///
140    /// ## Errors
141    ///
142    /// Returns [`GrpcClientError`] if the connection fails or the endpoint is unsupported.
143    pub async fn connect(endpoint: Endpoint) -> Result<Self, DelegatedIdentityError> {
144        let channel = spiffe::transport::connector::connect(&endpoint).await?;
145        Ok(Self {
146            client: DelegatedIdentityApiClient::new(channel),
147        })
148    }
149
150    /// Creates a new [`DelegatedIdentityClient`] from an established gRPC channel.
151    ///
152    /// This constructor does not perform any network I/O. It only wraps the
153    /// provided [`tonic::transport::Channel`] and prepares the client for use.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`DelegatedIdentityError`] if the client could not be constructed from
158    /// the provided channel (for example, due to an invalid configuration).
159    pub fn new(conn: tonic::transport::Channel) -> Result<Self, DelegatedIdentityError> {
160        Ok(DelegatedIdentityClient {
161            client: DelegatedIdentityApiClient::new(conn),
162        })
163    }
164}
165
166impl DelegatedIdentityClient {
167    /// Fetches a single X509 SPIFFE Verifiable Identity Document (SVID).
168    ///
169    /// This method connects to the SPIFFE Workload API and returns the first X509 SVID in the response.
170    ///
171    /// # Arguments
172    ///
173    /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
174    ///
175    /// # Returns
176    ///
177    /// On success, it returns a valid [`X509Svid`] which represents the parsed SVID.
178    /// If the fetch operation or the parsing fails, it returns a [`DelegatedIdentityError`].
179    ///
180    /// # Errors
181    ///
182    /// Returns [`DelegatedIdentityError`] if the gRPC call fails or if the SVID could not be parsed from the gRPC response.
183    pub async fn fetch_x509_svid(
184        &self,
185        attest_type: DelegateAttestationRequest,
186    ) -> Result<X509Svid, DelegatedIdentityError> {
187        let request = make_x509svid_request(attest_type);
188
189        self.client
190            .clone()
191            .subscribe_to_x509svi_ds(request)
192            .await?
193            .into_inner()
194            .message()
195            .await?
196            .ok_or(DelegatedIdentityError::EmptyResponse)
197            .and_then(|resp| Self::parse_x509_svid_from_grpc_response(&resp))
198    }
199
200    /// Watches the stream of [`X509Svid`] updates.
201    ///
202    /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Svid`].
203    /// The returned stream can be used to asynchronously yield new `X509Svid` updates as they become available.
204    ///
205    /// # Arguments
206    ///
207    /// * `selectors` - A list of selectors to filter the stream of [`X509Svid`] updates.
208    ///
209    /// # Returns
210    ///
211    /// Returns a stream of `Result<X509Svid, DelegatedIdentityError>`. Each item represents an updated [`X509Svid`] or an error if
212    /// there was a problem processing an update from the stream.
213    ///
214    /// # Errors
215    ///
216    /// The function can return an error variant of [`DelegatedIdentityError`] in the following scenarios:
217    ///
218    /// * There's an issue connecting to the Workload API.
219    /// * An error occurs while setting up the stream.
220    ///
221    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
222    pub async fn stream_x509_svids(
223        &self,
224        attest_type: DelegateAttestationRequest,
225    ) -> Result<
226        impl Stream<Item = Result<X509Svid, DelegatedIdentityError>> + Send + '_,
227        DelegatedIdentityError,
228    > {
229        let request = match attest_type {
230            DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
231                selectors: selectors.into_iter().map(Into::into).collect(),
232                pid: 0,
233            },
234            DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
235                selectors: Vec::new(),
236                pid,
237            },
238        };
239
240        let response = self.client.clone().subscribe_to_x509svi_ds(request).await?;
241
242        let stream = response.into_inner().map(|message| {
243            message
244                .map_err(DelegatedIdentityError::from)
245                .and_then(|resp| Self::parse_x509_svid_from_grpc_response(&resp))
246        });
247
248        Ok(stream)
249    }
250
251    /// Fetches [`X509BundleSet`], that is a set of [`X509Bundle`] keyed by the trust domain to which they belong.
252    ///
253    /// # Errors
254    ///
255    /// The function returns a variant of [`DelegatedIdentityError`] if there is an error connecting to the Workload API or
256    /// there is a problem processing the response.
257    pub async fn fetch_x509_bundles(&self) -> Result<X509BundleSet, DelegatedIdentityError> {
258        let request = SubscribeToX509BundlesRequest::default();
259
260        let response = self
261            .client
262            .clone()
263            .subscribe_to_x509_bundles(request)
264            .await?;
265
266        let initial = response
267            .into_inner()
268            .message()
269            .await?
270            .ok_or(DelegatedIdentityError::EmptyResponse)?;
271
272        Self::parse_x509_bundle_set_from_grpc_response(initial)
273    }
274
275    /// Watches the stream of [`X509Bundle`] updates.
276    ///
277    /// This function establishes a stream with the Workload API to continuously receive updates for the [`X509Bundle`].
278    /// The returned stream can be used to asynchronously yield new `X509Bundle` updates as they become available.
279    ///
280    /// # Returns
281    ///
282    /// Returns a stream of `Result<X509BundleSet, DelegatedIdentityError>`. Each item represents an updated [`X509BundleSet`] or an error if
283    /// there was a problem processing an update from the stream.
284    ///
285    /// # Errors
286    ///
287    /// The function can return an error variant of [`DelegatedIdentityError`] in the following scenarios:
288    ///
289    /// * There's an issue connecting to the Admin API.
290    /// * An error occurs while setting up the stream.
291    ///
292    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
293    pub async fn stream_x509_bundles(
294        &self,
295    ) -> Result<
296        impl Stream<Item = Result<X509BundleSet, DelegatedIdentityError>> + Send + 'static,
297        DelegatedIdentityError,
298    > {
299        let request = SubscribeToX509BundlesRequest::default();
300
301        let response = self
302            .client
303            .clone()
304            .subscribe_to_x509_bundles(request)
305            .await?;
306
307        Ok(response.into_inner().map(|msg| {
308            msg.map_err(DelegatedIdentityError::from)
309                .and_then(Self::parse_x509_bundle_set_from_grpc_response)
310        }))
311    }
312
313    /// Fetches a list of [`JwtSvid`] parsing the JWT token in the Workload API response, for the given audience and selectors.
314    ///
315    /// # Arguments
316    ///
317    /// * `audience`  - A list of audiences to include in the JWT token. Cannot be empty nor contain only empty strings.
318    /// * `selectors` - A list of selectors to filter the list of [`JwtSvid`].
319    ///
320    /// # Errors
321    ///
322    /// The function returns a variant of [`DelegatedIdentityError`] if there is an error connecting to the Workload API or
323    /// there is a problem processing the response.
324    pub async fn fetch_jwt_svids<T: AsRef<str> + ToString>(
325        &self,
326        audience: &[T],
327        attest_type: DelegateAttestationRequest,
328    ) -> Result<Vec<JwtSvid>, DelegatedIdentityError> {
329        let request = make_jwtsvid_request(audience, attest_type);
330
331        let resp = self
332            .client
333            .clone()
334            .fetch_jwtsvi_ds(request)
335            .await?
336            .into_inner()
337            .svids;
338
339        Self::parse_jwt_svid_from_grpc_response(resp)
340    }
341
342    /// Watches the stream of [`JwtBundleSet`] updates.
343    ///
344    /// This function establishes a stream with the Workload API to continuously receive updates for the [`JwtBundleSet`].
345    /// The returned stream can be used to asynchronously yield new `JwtBundleSet` updates as they become available.
346    ///
347    /// # Returns
348    ///
349    /// Returns a stream of `Result<JwtBundleSet, DelegatedIdentityError>`. Each item represents an updated [`JwtBundleSet`] or an error if
350    /// there was a problem processing an update from the stream.
351    ///
352    /// # Errors
353    ///
354    /// The function can return an error variant of [`DelegatedIdentityError`] in the following scenarios:
355    ///
356    /// * There's an issue connecting to the Workload API.
357    /// * An error occurs while setting up the stream.
358    ///
359    /// Individual stream items might also be errors if there's an issue processing the response for a specific update.
360    pub async fn stream_jwt_bundles(
361        &self,
362    ) -> Result<
363        impl Stream<Item = Result<JwtBundleSet, DelegatedIdentityError>> + Send + 'static,
364        DelegatedIdentityError,
365    > {
366        let request = SubscribeToJwtBundlesRequest::default();
367
368        let response = self
369            .client
370            .clone()
371            .subscribe_to_jwt_bundles(request)
372            .await?;
373
374        Ok(response.into_inner().map(|msg| {
375            msg.map_err(DelegatedIdentityError::from)
376                .and_then(Self::parse_jwt_bundle_set_from_grpc_response)
377        }))
378    }
379
380    /// Fetches [`JwtBundleSet`] that is a set of [`JwtBundle`] keyed by the trust domain to which they belong.
381    ///
382    /// # Errors
383    ///
384    /// The function returns a variant of [`DelegatedIdentityError`] if there is an error connecting to the Workload API or
385    /// there is a problem processing the response.
386    pub async fn fetch_jwt_bundles(&self) -> Result<JwtBundleSet, DelegatedIdentityError> {
387        let request = SubscribeToJwtBundlesRequest::default();
388
389        let response = self
390            .client
391            .clone()
392            .subscribe_to_jwt_bundles(request)
393            .await?;
394
395        let initial = response
396            .into_inner()
397            .message()
398            .await?
399            .ok_or(DelegatedIdentityError::EmptyResponse)?;
400
401        Self::parse_jwt_bundle_set_from_grpc_response(initial)
402    }
403}
404
405impl DelegatedIdentityClient {
406    fn parse_x509_svid_from_grpc_response(
407        response: &SubscribeToX509sviDsResponse,
408    ) -> Result<X509Svid, DelegatedIdentityError> {
409        let svid = response
410            .x509_svids
411            .get(DEFAULT_SVID)
412            .ok_or(DelegatedIdentityError::EmptyResponse)?;
413
414        let x509_svid = svid
415            .x509_svid
416            .as_ref()
417            .ok_or(DelegatedIdentityError::EmptyResponse)?;
418
419        let total_length: usize = x509_svid
420            .cert_chain
421            .iter()
422            .map(prost::bytes::Bytes::len)
423            .sum();
424        let mut cert_chain_bytes = Vec::with_capacity(total_length);
425        for c in &x509_svid.cert_chain {
426            cert_chain_bytes.extend_from_slice(c);
427        }
428
429        X509Svid::parse_from_der(&cert_chain_bytes, svid.x509_svid_key.as_ref()).map_err(Into::into)
430    }
431
432    fn parse_jwt_svid_from_grpc_response(
433        svids: Vec<ProtoJwtSvid>,
434    ) -> Result<Vec<JwtSvid>, DelegatedIdentityError> {
435        svids
436            .into_iter()
437            .map(|r| JwtSvid::from_str(&r.token).map_err(DelegatedIdentityError::from))
438            .collect()
439    }
440
441    fn parse_jwt_bundle_set_from_grpc_response(
442        response: SubscribeToJwtBundlesResponse,
443    ) -> Result<JwtBundleSet, DelegatedIdentityError> {
444        let mut bundle_set = JwtBundleSet::new();
445
446        for (td, bundle_data) in response.bundles {
447            let trust_domain = TrustDomain::try_from(td)?;
448            let bundle = JwtBundle::from_jwt_authorities(trust_domain, &bundle_data)
449                .map_err(DelegatedIdentityError::from)?;
450            bundle_set.add_bundle(bundle);
451        }
452
453        Ok(bundle_set)
454    }
455
456    fn parse_x509_bundle_set_from_grpc_response(
457        response: SubscribeToX509BundlesResponse,
458    ) -> Result<X509BundleSet, DelegatedIdentityError> {
459        let mut bundle_set = X509BundleSet::new();
460
461        for (td, bundle) in response.ca_certificates {
462            let trust_domain = TrustDomain::try_from(td)?;
463            let parsed = X509Bundle::parse_from_der(trust_domain, &bundle)
464                .map_err(DelegatedIdentityError::from)?;
465            bundle_set.add_bundle(parsed);
466        }
467
468        Ok(bundle_set)
469    }
470}
471
472// Error conversions
473impl From<tonic::Status> for DelegatedIdentityError {
474    fn from(status: tonic::Status) -> Self {
475        Self::Transport(TransportError::Status(status))
476    }
477}
478
479impl From<tonic::transport::Error> for DelegatedIdentityError {
480    fn from(err: tonic::transport::Error) -> Self {
481        Self::Transport(TransportError::Tonic(err))
482    }
483}
484
485fn make_x509svid_request(attest_type: DelegateAttestationRequest) -> SubscribeToX509sviDsRequest {
486    match attest_type {
487        DelegateAttestationRequest::Selectors(selectors) => SubscribeToX509sviDsRequest {
488            selectors: selectors.into_iter().map(Into::into).collect(),
489            pid: 0,
490        },
491        DelegateAttestationRequest::Pid(pid) => SubscribeToX509sviDsRequest {
492            selectors: Vec::new(),
493            pid,
494        },
495    }
496}
497
498fn make_jwtsvid_request<T: AsRef<str> + ToString>(
499    audience: &[T],
500    attest_type: DelegateAttestationRequest,
501) -> FetchJwtsviDsRequest {
502    let audience = audience
503        .iter()
504        .map(std::string::ToString::to_string)
505        .collect();
506
507    match attest_type {
508        DelegateAttestationRequest::Selectors(selectors) => FetchJwtsviDsRequest {
509            audience,
510            selectors: selectors.into_iter().map(Into::into).collect(),
511            pid: 0,
512        },
513        DelegateAttestationRequest::Pid(pid) => FetchJwtsviDsRequest {
514            audience,
515            selectors: Vec::new(),
516            pid,
517        },
518    }
519}