use super::constants::*;
use super::options::BatchingOptions;
use crate::client::Publisher;
use crate::generated::gapic_dataplane::client::Publisher as GapicPublisher;
use crate::publisher::actor::Dispatcher;
use crate::publisher::base_publisher::BasePublisher;
use google_cloud_gax::{
backoff_policy::BackoffPolicyArg, retry_policy::RetryPolicyArg,
retry_throttler::RetryThrottlerArg,
};
use std::time::Duration;
pub use super::base_publisher::BasePublisherBuilder;
#[derive(Clone, Debug)]
pub struct PublisherBuilder {
topic: String,
batching_options: BatchingOptions,
base_builder: BasePublisherBuilder,
}
impl PublisherBuilder {
pub(crate) fn new(topic: String) -> Self {
Self {
topic,
batching_options: BatchingOptions::default(),
base_builder: BasePublisher::builder(),
}
}
pub async fn build(self) -> crate::ClientBuilderResult<Publisher> {
let base_publisher = self.base_builder.build().await?;
let publisher = base_publisher
.publisher(&self.topic)
.set_message_count_threshold(self.batching_options.message_count_threshold)
.set_byte_threshold(self.batching_options.byte_threshold)
.set_delay_threshold(self.batching_options.delay_threshold)
.build();
Ok(publisher)
}
pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherBuilder {
self.batching_options = self.batching_options.set_message_count_threshold(threshold);
self
}
pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherBuilder {
self.batching_options = self.batching_options.set_byte_threshold(threshold);
self
}
pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherBuilder {
self.batching_options = self.batching_options.set_delay_threshold(threshold);
self
}
pub fn with_endpoint<V: Into<String>>(mut self, v: V) -> Self {
self.base_builder = self.base_builder.with_endpoint(v);
self
}
pub fn with_tracing(mut self) -> Self {
self.base_builder = self.base_builder.with_tracing();
self
}
pub fn with_credentials<T: Into<gaxi::options::Credentials>>(mut self, v: T) -> Self {
self.base_builder = self.base_builder.with_credentials(v);
self
}
pub fn with_retry_policy<V: Into<RetryPolicyArg>>(mut self, v: V) -> Self {
self.base_builder = self.base_builder.with_retry_policy(v);
self
}
pub fn with_backoff_policy<V: Into<BackoffPolicyArg>>(mut self, v: V) -> Self {
self.base_builder = self.base_builder.with_backoff_policy(v);
self
}
pub fn with_retry_throttler<V: Into<RetryThrottlerArg>>(mut self, v: V) -> Self {
self.base_builder = self.base_builder.with_retry_throttler(v);
self
}
pub fn with_grpc_subchannel_count(mut self, v: usize) -> Self {
self.base_builder = self.base_builder.with_grpc_subchannel_count(v);
self
}
}
#[derive(Clone, Debug)]
pub struct PublisherPartialBuilder {
pub(crate) inner: GapicPublisher,
topic: String,
batching_options: BatchingOptions,
}
impl PublisherPartialBuilder {
pub(crate) fn new(client: GapicPublisher, topic: String) -> Self {
Self {
inner: client,
topic,
batching_options: BatchingOptions::default(),
}
}
pub fn set_message_count_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
self.batching_options = self.batching_options.set_message_count_threshold(threshold);
self
}
pub fn set_byte_threshold(mut self, threshold: u32) -> PublisherPartialBuilder {
self.batching_options = self.batching_options.set_byte_threshold(threshold);
self
}
pub fn set_delay_threshold(mut self, threshold: Duration) -> PublisherPartialBuilder {
self.batching_options = self.batching_options.set_delay_threshold(threshold);
self
}
pub fn build(self) -> Publisher {
self.build_return_handle().0
}
pub(crate) fn build_return_handle(self) -> (Publisher, tokio::task::JoinHandle<()>) {
let batching_options = BatchingOptions::new()
.set_delay_threshold(
self.batching_options
.delay_threshold
.clamp(Duration::ZERO, MAX_DELAY),
)
.set_message_count_threshold(
self.batching_options
.message_count_threshold
.clamp(0, MAX_MESSAGES),
)
.set_byte_threshold(self.batching_options.byte_threshold.clamp(0, MAX_BYTES));
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let dispatcher = Dispatcher::new(self.topic, self.inner, batching_options.clone(), rx);
let handle = tokio::spawn(dispatcher.run());
(
Publisher {
batching_options,
tx,
},
handle,
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn builder() -> anyhow::Result<()> {
let client: BasePublisher = BasePublisher::builder().build().await?;
let builder = client.publisher("projects/my-project/topics/my-topic");
let publisher = builder.set_message_count_threshold(1_u32).build();
assert_eq!(publisher.batching_options.message_count_threshold, 1_u32);
let publisher = Publisher::builder("projects/my-project/topics/my-topic")
.set_message_count_threshold(1_u32)
.build()
.await?;
assert_eq!(publisher.batching_options.message_count_threshold, 1_u32);
Ok(())
}
#[tokio::test]
async fn default_batching() -> anyhow::Result<()> {
let topic_name = "projects/my-project/topics/my-topic";
let publishers = vec![
BasePublisher::builder()
.build()
.await?
.publisher(topic_name)
.build(),
Publisher::builder(topic_name).build().await?,
];
for publisher in publishers {
assert_eq!(
publisher.batching_options.message_count_threshold,
BatchingOptions::default().message_count_threshold
);
assert_eq!(
publisher.batching_options.byte_threshold,
BatchingOptions::default().byte_threshold
);
assert_eq!(
publisher.batching_options.delay_threshold,
BatchingOptions::default().delay_threshold
);
}
Ok(())
}
fn assert_eq_client_config(
pub_config: &gaxi::options::ClientConfig,
base_config: &gaxi::options::ClientConfig,
) {
assert_eq!(pub_config.endpoint, base_config.endpoint);
assert_eq!(pub_config.cred.is_some(), base_config.cred.is_some());
assert_eq!(pub_config.tracing, base_config.tracing);
assert_eq!(
pub_config.retry_policy.is_some(),
base_config.retry_policy.is_some()
);
assert_eq!(
pub_config.backoff_policy.is_some(),
base_config.backoff_policy.is_some()
);
assert_eq!(
pub_config.grpc_subchannel_count,
base_config.grpc_subchannel_count
);
}
#[test]
fn publisher_has_default_client_config() {
let pub_builder = Publisher::builder("projects/my-project/topics/my-topic");
let base_builder = BasePublisher::builder();
let pub_config = &pub_builder.base_builder.config;
let base_config = &base_builder.config;
assert_eq_client_config(pub_config, base_config);
}
#[tokio::test]
async fn publisher_builder_sets_client_config() -> anyhow::Result<()> {
use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
let throttler = google_cloud_gax::retry_throttler::CircuitBreaker::default();
let pub_builder = Publisher::builder("projects/my-project/topics/my-topic")
.with_endpoint("test-endpoint.com")
.with_credentials(Anonymous::new().build())
.with_tracing()
.with_retry_policy(AlwaysRetry.with_attempt_limit(3))
.with_backoff_policy(
google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
)
.with_retry_throttler(throttler.clone())
.with_grpc_subchannel_count(16);
let base_builder = BasePublisher::builder()
.with_endpoint("test-endpoint.com")
.with_credentials(Anonymous::new().build())
.with_tracing()
.with_retry_policy(AlwaysRetry.with_attempt_limit(3))
.with_backoff_policy(
google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
)
.with_retry_throttler(throttler)
.with_grpc_subchannel_count(16);
let pub_config = &pub_builder.base_builder.config;
let base_config = &base_builder.config;
assert_eq_client_config(pub_config, base_config);
Ok(())
}
}