use super::base_publisher::BasePublisher;
use gaxi::options::ClientConfig;
use google_cloud_gax::backoff_policy::BackoffPolicyArg;
use google_cloud_gax::client_builder::Result as BuilderResult;
use google_cloud_gax::retry_policy::RetryPolicyArg;
use google_cloud_gax::retry_throttler::RetryThrottlerArg;
/// Builder that accumulates client configuration for a [`BasePublisher`].
///
/// Each `with_*` method consumes the builder, updates one setting in the
/// wrapped [`ClientConfig`], and returns the builder so calls can be chained.
/// Calling `build` hands the finished builder to `BasePublisher::new`.
#[derive(Clone, Debug)]
pub struct BasePublisherBuilder {
// Settings gathered so far; visible to the parent module, which is where
// the builder is constructed (`new` is `pub(super)` as well).
pub(super) config: ClientConfig,
}
impl BasePublisherBuilder {
    /// Creates a builder whose configuration carries this module's default
    /// backoff and retry policies.
    ///
    /// The configuration starts from [`ClientConfig::default`]; the backoff
    /// and retry policies are then replaced with the values produced by
    /// `super::backoff_policy::default_backoff_policy` and
    /// `super::retry_policy::default_retry_policy`. Every other setting keeps
    /// the `ClientConfig` default.
    pub(super) fn new() -> Self {
        use std::sync::Arc;

        let mut config = ClientConfig::default();

        let backoff = super::backoff_policy::default_backoff_policy();
        config.backoff_policy = Some(Arc::new(backoff));

        let retry = super::retry_policy::default_retry_policy();
        config.retry_policy = Some(Arc::new(retry));

        Self { config }
    }

    /// Consumes the builder and constructs the [`BasePublisher`].
    ///
    /// The builder is passed as-is to `BasePublisher::new`, and whatever that
    /// constructor returns (success or error) is returned unchanged.
    pub async fn build(self) -> BuilderResult<BasePublisher> {
        BasePublisher::new(self).await
    }

    /// Stores `v`, converted to a `String`, as the configured endpoint.
    pub fn with_endpoint<V>(mut self, v: V) -> Self
    where
        V: Into<String>,
    {
        let endpoint: String = v.into();
        self.config.endpoint = Some(endpoint);
        self
    }

    /// Turns on the `tracing` flag in the configuration.
    ///
    /// There is no argument: the flag is set to `true` unconditionally.
    pub fn with_tracing(mut self) -> Self {
        self.config.tracing = true;
        self
    }

    /// Stores `v`, converted to `gaxi::options::Credentials`, as the
    /// configured credentials.
    pub fn with_credentials<V>(mut self, v: V) -> Self
    where
        V: Into<gaxi::options::Credentials>,
    {
        let credentials: gaxi::options::Credentials = v.into();
        self.config.cred = Some(credentials);
        self
    }

    /// Replaces the configured retry policy.
    ///
    /// `v` is first converted to a [`RetryPolicyArg`], which is in turn
    /// converted to the representation stored in the configuration.
    pub fn with_retry_policy<V>(mut self, v: V) -> Self
    where
        V: Into<RetryPolicyArg>,
    {
        let policy: RetryPolicyArg = v.into();
        self.config.retry_policy = Some(policy.into());
        self
    }

    /// Replaces the configured backoff policy.
    ///
    /// `v` is first converted to a [`BackoffPolicyArg`], which is in turn
    /// converted to the representation stored in the configuration.
    pub fn with_backoff_policy<V>(mut self, v: V) -> Self
    where
        V: Into<BackoffPolicyArg>,
    {
        let policy: BackoffPolicyArg = v.into();
        self.config.backoff_policy = Some(policy.into());
        self
    }

    /// Replaces the configured retry throttler.
    ///
    /// Unlike the retry and backoff policies, this setting is not wrapped in
    /// an `Option`: the converted value is assigned directly.
    pub fn with_retry_throttler<V>(mut self, v: V) -> Self
    where
        V: Into<RetryThrottlerArg>,
    {
        let throttler: RetryThrottlerArg = v.into();
        self.config.retry_throttler = throttler.into();
        self
    }

    /// Records `v` as the requested gRPC subchannel count.
    pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
        self.config.grpc_subchannel_count = Some(v);
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;

    /// A freshly created builder has no endpoint, credentials, or subchannel
    /// count, has tracing off, and carries the default throttler, backoff
    /// policy, and retry policy.
    #[test]
    fn defaults() -> anyhow::Result<()> {
        let builder = BasePublisherBuilder::new();
        let config = &builder.config;
        // The policies are only inspectable through their Debug output, so
        // render the configuration once and search the text.
        let rendered = format!("{config:?}");

        assert!(config.endpoint.is_none(), "{builder:?}");
        assert!(config.cred.is_none(), "{builder:?}");
        assert!(!config.tracing);
        assert!(rendered.contains("AdaptiveThrottler"), "{rendered}");

        assert!(config.backoff_policy.is_some(), "{builder:?}");
        for expected in ["initial_delay: 100ms", "maximum_delay: 60s", "scaling: 4.0"] {
            assert!(rendered.contains(expected), "actual: {rendered}");
        }

        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.grpc_subchannel_count.is_none(), "{builder:?}");
        Ok(())
    }

    /// Every `with_*` setter stores its argument in the configuration.
    #[tokio::test]
    async fn setters() -> anyhow::Result<()> {
        use google_cloud_gax::exponential_backoff::ExponentialBackoff;
        use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
        use google_cloud_gax::retry_throttler::CircuitBreaker;

        let builder = BasePublisherBuilder::new()
            .with_endpoint("test-endpoint.com")
            .with_credentials(Anonymous::new().build())
            .with_tracing()
            .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
            .with_backoff_policy(ExponentialBackoff::default())
            .with_retry_throttler(CircuitBreaker::default())
            .with_grpc_subchannel_count(16);
        let config = &builder.config;

        assert_eq!(config.endpoint, Some(String::from("test-endpoint.com")));
        assert!(config.cred.is_some(), "{builder:?}");
        assert!(config.tracing);

        // The throttler is checked through its Debug output, as in `defaults`.
        let rendered = format!("{config:?}");
        assert!(rendered.contains("CircuitBreaker"), "{rendered}");

        assert!(config.retry_policy.is_some(), "{builder:?}");
        assert!(config.backoff_policy.is_some(), "{builder:?}");
        assert_eq!(config.grpc_subchannel_count, Some(16));
        Ok(())
    }
}