use crate::constants::DEFAULT_SVID;
use crate::workload_api::pb::workload::{
X509BundlesRequest, X509BundlesResponse, X509svidRequest, X509svidResponse,
};
use crate::workload_api::x509_context::X509Context;
use crate::{
TrustDomain, WorkloadApiClient, WorkloadApiError, X509Bundle, X509BundleSet, X509Svid,
};
use futures::{Stream, StreamExt as _};
use std::sync::Arc;
impl WorkloadApiClient {
    /// Fetches a single X.509 SVID from the Workload API.
    ///
    /// Opens the X.509 SVID response stream, takes the message produced by
    /// `Self::first_message`, and parses the entry found at index
    /// `DEFAULT_SVID` of that message.
    ///
    /// # Errors
    ///
    /// Returns a [`WorkloadApiError`] if the RPC fails, if
    /// `Self::first_message` fails, if the message has no entry at
    /// `DEFAULT_SVID` ([`WorkloadApiError::EmptyResponse`]), or if the SVID
    /// cannot be parsed.
    pub async fn fetch_x509_svid(&self) -> Result<X509Svid, WorkloadApiError> {
        let stream = self.open_x509_svid_stream().await?;
        let message = Self::first_message(stream).await?;
        Self::parse_x509_svid_from_grpc_response(&message)
    }

    /// Fetches every X.509 SVID carried by one Workload API message.
    ///
    /// Unlike [`Self::fetch_x509_svid`], all entries of the message are
    /// parsed, in the order they appear. A message without any SVID entries
    /// yields an empty vector rather than an error.
    ///
    /// # Errors
    ///
    /// Returns a [`WorkloadApiError`] if the RPC fails, if
    /// `Self::first_message` fails, or if any SVID cannot be parsed.
    pub async fn fetch_all_x509_svids(&self) -> Result<Vec<X509Svid>, WorkloadApiError> {
        let stream = self.open_x509_svid_stream().await?;
        let message = Self::first_message(stream).await?;
        Self::parse_x509_svids_from_grpc_response(&message)
    }

    /// Fetches the X.509 bundles as an [`X509BundleSet`].
    ///
    /// Opens the X.509 bundles response stream, takes the message produced by
    /// `Self::first_message`, and parses every bundle it contains.
    ///
    /// # Errors
    ///
    /// Returns a [`WorkloadApiError`] if the RPC fails, if
    /// `Self::first_message` fails, or if a trust domain name or bundle
    /// cannot be parsed.
    pub async fn fetch_x509_bundles(&self) -> Result<X509BundleSet, WorkloadApiError> {
        let stream = self.open_x509_bundles_stream().await?;
        let message = Self::first_message(stream).await?;
        Self::parse_x509_bundle_set_from_grpc_response(message)
    }

    /// Fetches an [`X509Context`] (SVIDs together with their bundles).
    ///
    /// Only compiled when the `x509` feature is enabled.
    ///
    /// # Errors
    ///
    /// Returns a [`WorkloadApiError`] if the RPC fails, if
    /// `Self::first_message` fails, or if any SVID, trust domain name or
    /// bundle in the message cannot be parsed.
    #[cfg(feature = "x509")]
    pub async fn fetch_x509_context(&self) -> Result<X509Context, WorkloadApiError> {
        let stream = self.open_x509_svid_stream().await?;
        let message = Self::first_message(stream).await?;
        Self::parse_x509_context_from_grpc_response(message)
    }

    /// Opens a stream of [`X509Context`] values.
    ///
    /// Every message received on the X.509 SVID response stream is mapped to
    /// one stream item: transport errors are converted with
    /// `WorkloadApiError::from`, successful messages are parsed into a
    /// context.
    ///
    /// # Errors
    ///
    /// The outer `Result` is an error if the RPC itself cannot be started.
    /// Per-message failures are reported as `Err` items of the stream.
    pub async fn stream_x509_contexts(
        &self,
    ) -> Result<
        impl Stream<Item = Result<X509Context, WorkloadApiError>> + Send + 'static + use<>,
        WorkloadApiError,
    > {
        let contexts = self.open_x509_svid_stream().await?.map(|message| {
            message
                .map_err(WorkloadApiError::from)
                .and_then(Self::parse_x509_context_from_grpc_response)
        });
        // Pinned on the heap so the returned stream can be polled directly.
        Ok(Box::pin(contexts))
    }

    /// Opens a stream of [`X509Svid`] values.
    ///
    /// Every message received on the X.509 SVID response stream is mapped to
    /// one stream item holding the SVID at index `DEFAULT_SVID` of that
    /// message.
    ///
    /// # Errors
    ///
    /// The outer `Result` is an error if the RPC itself cannot be started.
    /// Per-message failures (transport error, no entry at `DEFAULT_SVID`,
    /// parse failure) are reported as `Err` items of the stream.
    pub async fn stream_x509_svids(
        &self,
    ) -> Result<
        impl Stream<Item = Result<X509Svid, WorkloadApiError>> + Send + 'static + use<>,
        WorkloadApiError,
    > {
        let svids = self.open_x509_svid_stream().await?.map(|message| {
            message
                .map_err(WorkloadApiError::from)
                .and_then(|response| Self::parse_x509_svid_from_grpc_response(&response))
        });
        // Pinned on the heap so the returned stream can be polled directly.
        Ok(Box::pin(svids))
    }

    /// Opens a stream of [`X509BundleSet`] values.
    ///
    /// Every message received on the X.509 bundles response stream is mapped
    /// to one stream item holding all bundles of that message.
    ///
    /// # Errors
    ///
    /// The outer `Result` is an error if the RPC itself cannot be started.
    /// Per-message failures are reported as `Err` items of the stream.
    pub async fn stream_x509_bundles(
        &self,
    ) -> Result<
        impl Stream<Item = Result<X509BundleSet, WorkloadApiError>> + Send + 'static + use<>,
        WorkloadApiError,
    > {
        let bundle_sets = self.open_x509_bundles_stream().await?.map(|message| {
            message
                .map_err(WorkloadApiError::from)
                .and_then(Self::parse_x509_bundle_set_from_grpc_response)
        });
        // Pinned on the heap so the returned stream can be polled directly.
        Ok(Box::pin(bundle_sets))
    }

    /// Issues a default `X509svidRequest` on a clone of the client and
    /// returns the raw response stream.
    async fn open_x509_svid_stream(
        &self,
    ) -> Result<tonic::Streaming<X509svidResponse>, WorkloadApiError> {
        // The RPC method needs a mutable client, so work on a clone.
        let mut client = self.client.clone();
        let response = client.fetch_x509svid(X509svidRequest::default()).await?;
        Ok(response.into_inner())
    }

    /// Issues a default `X509BundlesRequest` on a clone of the client and
    /// returns the raw response stream.
    async fn open_x509_bundles_stream(
        &self,
    ) -> Result<tonic::Streaming<X509BundlesResponse>, WorkloadApiError> {
        // The RPC method needs a mutable client, so work on a clone.
        let mut client = self.client.clone();
        let response = client
            .fetch_x509_bundles(X509BundlesRequest::default())
            .await?;
        Ok(response.into_inner())
    }
}
impl WorkloadApiClient {
    /// Parses one SVID entry from its DER-encoded SVID bytes and key bytes.
    ///
    /// An empty `hint` is passed on as `None`; a non-empty one is copied into
    /// an `Arc<str>`.
    fn parse_x509_svid_entry(
        svid_der: &[u8],
        key_der: &[u8],
        hint: &str,
    ) -> Result<X509Svid, WorkloadApiError> {
        let label = (!hint.is_empty()).then(|| Arc::<str>::from(hint));
        X509Svid::parse_from_der_with_hint(svid_der, key_der, label)
            .map_err(WorkloadApiError::from)
    }

    /// Parses the SVID at index `DEFAULT_SVID` of `response`.
    ///
    /// # Errors
    ///
    /// Returns [`WorkloadApiError::EmptyResponse`] if `response` has no entry
    /// at that index, or the converted parse error if the entry is invalid.
    fn parse_x509_svid_from_grpc_response(
        response: &X509svidResponse,
    ) -> Result<X509Svid, WorkloadApiError> {
        let entry = response
            .svids
            .get(DEFAULT_SVID)
            .ok_or(WorkloadApiError::EmptyResponse)?;
        Self::parse_x509_svid_entry(
            entry.x509_svid.as_ref(),
            entry.x509_svid_key.as_ref(),
            entry.hint.as_str(),
        )
    }

    /// Parses every SVID entry of `response`, preserving their order.
    ///
    /// An empty `svids` list produces an empty vector, not an error.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first parse failure.
    fn parse_x509_svids_from_grpc_response(
        response: &X509svidResponse,
    ) -> Result<Vec<X509Svid>, WorkloadApiError> {
        response
            .svids
            .iter()
            .map(|entry| {
                Self::parse_x509_svid_entry(
                    entry.x509_svid.as_ref(),
                    entry.x509_svid_key.as_ref(),
                    entry.hint.as_str(),
                )
            })
            .collect()
    }

    /// Builds an [`X509BundleSet`] from the `bundles` of `response`.
    ///
    /// Each entry's key is converted to a [`TrustDomain`] and its value is
    /// parsed as a DER-encoded bundle for that trust domain.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first trust-domain conversion or bundle
    /// parse failure.
    fn parse_x509_bundle_set_from_grpc_response(
        response: X509BundlesResponse,
    ) -> Result<X509BundleSet, WorkloadApiError> {
        let mut bundle_set = X509BundleSet::new();
        for (name, bundle_der) in response.bundles {
            let trust_domain = TrustDomain::try_from(name)?;
            let bundle = X509Bundle::parse_from_der(trust_domain, &bundle_der)
                .map_err(WorkloadApiError::from)?;
            bundle_set.add_bundle(bundle);
        }
        Ok(bundle_set)
    }

    /// Builds an [`X509Context`] from `response`.
    ///
    /// For every SVID entry, the SVID is parsed first; the bundle carried by
    /// the same entry is then parsed under the trust domain taken from the
    /// SVID's SPIFFE ID and added to the bundle set. Afterwards every entry of
    /// `federated_bundles` is parsed under its own trust-domain name and added
    /// to the same set.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first SVID, trust-domain or bundle parse
    /// failure.
    fn parse_x509_context_from_grpc_response(
        response: X509svidResponse,
    ) -> Result<X509Context, WorkloadApiError> {
        let mut svids: Vec<Arc<X509Svid>> = Vec::with_capacity(response.svids.len());
        let mut bundle_set = X509BundleSet::new();

        for entry in response.svids {
            let x509_svid = Self::parse_x509_svid_entry(
                entry.x509_svid.as_ref(),
                entry.x509_svid_key.as_ref(),
                entry.hint.as_str(),
            )?;
            // Clone the trust domain before the SVID is moved into the Arc.
            let trust_domain = x509_svid.spiffe_id().trust_domain().clone();
            svids.push(Arc::new(x509_svid));

            let bundle = X509Bundle::parse_from_der(trust_domain, entry.bundle.as_ref())
                .map_err(WorkloadApiError::from)?;
            bundle_set.add_bundle(bundle);
        }

        for (name, bundle_der) in response.federated_bundles {
            let trust_domain = TrustDomain::try_from(name)?;
            let federated = X509Bundle::parse_from_der(trust_domain, bundle_der.as_ref())
                .map_err(WorkloadApiError::from)?;
            bundle_set.add_bundle(federated);
        }

        Ok(X509Context::new(svids, Arc::new(bundle_set)))
    }
}