use super::errors::X509SourceError;
use super::metrics::MetricsRecorder;
use super::source::X509Source;
use crate::workload_api::WorkloadApiClient;
use crate::x509_source::types::{ClientFactory, SvidPicker};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
/// Lower and upper bounds for the reconnect backoff delay.
///
/// The fields are stored exactly as supplied: constructing this struct does
/// not enforce `min_backoff <= max_backoff`. How the delay progresses between
/// the two bounds is decided by the code that consumes this configuration.
///
/// Derives `PartialEq`/`Eq` (like `ResourceLimits`) so that whole
/// configurations can be compared directly, for example in assertions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReconnectConfig {
    /// Lower bound of the backoff delay.
    pub min_backoff: Duration,
    /// Upper bound of the backoff delay.
    pub max_backoff: Duration,
}
impl Default for ReconnectConfig {
    /// Returns the default bounds: a 200 ms minimum and a 10 s maximum.
    fn default() -> Self {
        let min_backoff = Duration::from_millis(200);
        let max_backoff = Duration::from_secs(10);
        Self {
            min_backoff,
            max_backoff,
        }
    }
}
impl ReconnectConfig {
    /// Returns a configuration whose bounds satisfy `min_backoff <= max_backoff`.
    ///
    /// If the bounds are inverted they are exchanged; otherwise they are
    /// returned as they are. The values themselves are never clamped or
    /// otherwise modified.
    pub(crate) fn normalize(self) -> Self {
        let (lower, upper) = if self.min_backoff > self.max_backoff {
            (self.max_backoff, self.min_backoff)
        } else {
            (self.min_backoff, self.max_backoff)
        };
        Self {
            min_backoff: lower,
            max_backoff: upper,
        }
    }
}
pub(super) fn normalize_reconnect(reconnect: ReconnectConfig) -> ReconnectConfig {
reconnect.normalize()
}
/// Optional upper bounds on the resources handled by the source.
///
/// Each field is independent. `None` leaves that dimension unbounded (this is
/// what `ResourceLimits::unlimited()` sets for every field). Where and how
/// the limits are enforced is not part of this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ResourceLimits {
    /// Limit on the number of SVIDs, if any.
    pub max_svids: Option<usize>,
    /// Limit on the number of bundles, if any.
    pub max_bundles: Option<usize>,
    /// Limit on bundle DER size in bytes, if any.
    // NOTE(review): presumably counts DER-encoded bundle bytes — confirm
    // against the code that enforces it.
    pub max_bundle_der_bytes: Option<usize>,
}
impl Default for ResourceLimits {
    /// Returns the default limits: 100 SVIDs, 200 bundles, and 4 MiB of
    /// bundle DER bytes.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            max_svids: Some(100),
            max_bundles: Some(200),
            max_bundle_der_bytes: Some(4 * MIB),
        }
    }
}
impl ResourceLimits {
    /// Returns limits with every field set to `None`, i.e. no bound at all.
    pub const fn unlimited() -> Self {
        const NO_LIMIT: Option<usize> = None;

        Self {
            max_svids: NO_LIMIT,
            max_bundles: NO_LIMIT,
            max_bundle_der_bytes: NO_LIMIT,
        }
    }

    /// Returns the same value as the `Default` implementation.
    pub fn default_limits() -> Self {
        <Self as Default>::default()
    }
}
/// Builder that collects the configuration for an `X509Source`.
///
/// Create it with `X509SourceBuilder::new()`, adjust it through the chained
/// setter methods, and finish with `build()`.
pub struct X509SourceBuilder {
    /// Optional custom SVID picker; `None` until `picker()` is called.
    svid_picker: Option<Box<dyn SvidPicker>>,
    /// Reconnect backoff bounds, stored exactly as supplied.
    reconnect: ReconnectConfig,
    /// Optional client factory, set by `endpoint()` or `client_factory()`.
    /// When it is `None`, `build()` falls back to
    /// `WorkloadApiClient::connect_env()`.
    make_client: Option<ClientFactory>,
    /// Resource limits forwarded to the source.
    limits: ResourceLimits,
    /// Optional metrics recorder; `None` until `metrics()` is called.
    metrics: Option<Arc<dyn MetricsRecorder>>,
    /// Optional shutdown timeout forwarded to the source.
    shutdown_timeout: Option<Duration>,
}
impl Debug for X509SourceBuilder {
    /// Formats the builder for diagnostics.
    ///
    /// The picker, client factory, and metrics recorder are rendered as fixed
    /// placeholder strings wrapped in `Some(..)` when present (or `None` when
    /// absent); the remaining fields use their own `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Opaque members: only record whether each one is set.
        let svid_picker = self.svid_picker.is_some().then_some("<SvidPicker>");
        let make_client = self.make_client.is_some().then_some("<ClientFactory>");
        let metrics = self.metrics.is_some().then_some("<MetricsRecorder>");

        let mut out = f.debug_struct("X509SourceBuilder");
        out.field("svid_picker", &svid_picker);
        out.field("reconnect", &self.reconnect);
        out.field("limits", &self.limits);
        out.field("make_client", &make_client);
        out.field("metrics", &metrics);
        out.field("shutdown_timeout", &self.shutdown_timeout);
        out.finish()
    }
}
impl Default for X509SourceBuilder {
fn default() -> Self {
Self::new()
}
}
impl X509SourceBuilder {
    /// Creates a builder with the default configuration.
    ///
    /// No SVID picker, client factory, or metrics recorder is set. The
    /// reconnect settings and resource limits take their `Default` values and
    /// the shutdown timeout is 30 seconds.
    pub fn new() -> Self {
        let default_shutdown = Duration::from_secs(30);
        Self {
            svid_picker: None,
            make_client: None,
            metrics: None,
            reconnect: ReconnectConfig::default(),
            limits: ResourceLimits::default(),
            shutdown_timeout: Some(default_shutdown),
        }
    }

    /// Connects clients to the given `endpoint`.
    ///
    /// Installs a client factory that calls `WorkloadApiClient::connect_to`
    /// with this endpoint every time it is invoked, replacing any factory
    /// that was configured earlier.
    #[must_use]
    pub fn endpoint(self, endpoint: impl AsRef<str>) -> Self {
        // Share one copy of the text; each factory call only clones the pointer.
        let shared: Arc<str> = endpoint.as_ref().into();
        let connect: ClientFactory = Arc::new(move || {
            let target = Arc::clone(&shared);
            Box::pin(async move { WorkloadApiClient::connect_to(&target).await })
        });
        self.client_factory(connect)
    }

    /// Uses `factory` to create clients, replacing any factory that was
    /// configured earlier (including one installed by `endpoint()`).
    #[must_use]
    pub fn client_factory(mut self, factory: ClientFactory) -> Self {
        self.make_client = Some(factory);
        self
    }

    /// Sets the SVID picker, replacing any picker that was set earlier.
    #[must_use]
    pub fn picker<P: SvidPicker + 'static>(mut self, picker: P) -> Self {
        let boxed: Box<dyn SvidPicker> = Box::new(picker);
        self.svid_picker = Some(boxed);
        self
    }

    /// Sets the reconnect backoff bounds.
    ///
    /// The values are stored exactly as given; this setter does not reorder
    /// an inverted pair.
    #[must_use]
    pub const fn reconnect_backoff(mut self, min_backoff: Duration, max_backoff: Duration) -> Self {
        self.reconnect.min_backoff = min_backoff;
        self.reconnect.max_backoff = max_backoff;
        self
    }

    /// Replaces the resource limits.
    #[must_use]
    pub const fn resource_limits(mut self, limits: ResourceLimits) -> Self {
        self.limits = limits;
        self
    }

    /// Sets the metrics recorder, replacing any recorder that was set earlier.
    #[must_use]
    pub fn metrics(mut self, metrics: Arc<dyn MetricsRecorder>) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Sets the shutdown timeout; `None` clears it.
    #[must_use]
    pub const fn shutdown_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.shutdown_timeout = timeout;
        self
    }

    /// Consumes the builder and constructs the `X509Source`.
    ///
    /// When no client factory was configured, a factory that calls
    /// `WorkloadApiClient::connect_env()` is used. All other settings are
    /// forwarded to `X509Source::build_with` exactly as stored.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `X509Source::build_with`.
    pub async fn build(self) -> Result<X509Source, X509SourceError> {
        let Self {
            svid_picker,
            reconnect,
            make_client,
            limits,
            metrics,
            shutdown_timeout,
        } = self;

        // Fall back to the environment-configured connection.
        let make_client = make_client.unwrap_or_else(|| {
            Arc::new(|| Box::pin(async { WorkloadApiClient::connect_env().await }))
        });

        X509Source::build_with(
            make_client,
            svid_picker,
            reconnect,
            limits,
            metrics,
            shutdown_timeout,
        )
        .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a reconnect configuration from raw bounds, without normalizing.
    fn config(min_backoff: Duration, max_backoff: Duration) -> ReconnectConfig {
        ReconnectConfig {
            min_backoff,
            max_backoff,
        }
    }

    /// Asserts both bounds of `actual`, reporting failures at the call site.
    #[track_caller]
    fn assert_bounds(actual: ReconnectConfig, min_backoff: Duration, max_backoff: Duration) {
        assert_eq!(actual.min_backoff, min_backoff);
        assert_eq!(actual.max_backoff, max_backoff);
    }

    #[test]
    fn reconnect_config_normalization() {
        // Inverted bounds are exchanged.
        let inverted = config(Duration::from_secs(10), Duration::from_secs(1));
        assert_bounds(
            inverted.normalize(),
            Duration::from_secs(1),
            Duration::from_secs(10),
        );

        // Already ordered bounds pass through untouched.
        let ordered = config(Duration::from_millis(200), Duration::from_secs(10));
        assert_bounds(
            ordered.normalize(),
            Duration::from_millis(200),
            Duration::from_secs(10),
        );

        // Equal bounds are left as they are.
        let equal = config(Duration::from_secs(5), Duration::from_secs(5));
        assert_bounds(
            equal.normalize(),
            Duration::from_secs(5),
            Duration::from_secs(5),
        );
    }

    #[test]
    fn reconnect_config_setter_does_not_normalize() {
        // The setter must store the inverted pair verbatim.
        let builder = X509SourceBuilder::new()
            .reconnect_backoff(Duration::from_secs(10), Duration::from_secs(1));
        assert_bounds(
            builder.reconnect,
            Duration::from_secs(10),
            Duration::from_secs(1),
        );
    }

    #[test]
    fn normalize_reconnect_authoritative_boundary() {
        // The module-level helper exchanges inverted bounds.
        let inverted = config(Duration::from_secs(10), Duration::from_secs(1));
        assert_bounds(
            normalize_reconnect(inverted),
            Duration::from_secs(1),
            Duration::from_secs(10),
        );

        // Ordered bounds are returned unchanged.
        let valid = config(Duration::from_millis(200), Duration::from_secs(10));
        assert_bounds(
            normalize_reconnect(valid),
            Duration::from_millis(200),
            Duration::from_secs(10),
        );
    }
}