use super::MessageStream;
use super::ShutdownBehavior;
use super::transport::Transport;
use std::sync::Arc;
use std::time::Duration;
// Number of bytes in one mebibyte (1024 * 1024); used below to express the
// default `max_outstanding_bytes` limit.
const MIB: i64 = 1024 * 1024;
pub use super::client_builder::ClientBuilder;
/// Builder that collects the settings for a subscription and turns them into
/// a [`MessageStream`] via [`Subscribe::build`].
///
/// A builder starts with the defaults chosen in `Subscribe::new`; every
/// `set_*` method consumes the builder and returns it with one field replaced.
pub struct Subscribe {
/// Shared handle to the transport.
/// NOTE(review): presumably the client used to open the stream; confirm in `transport`.
pub(super) inner: Arc<Transport>,
/// Subscription identifier exactly as passed to `new`. The tests in this file
/// use the `projects/{project}/subscriptions/{subscription}` form.
pub(super) subscription: String,
/// Client identifier exactly as passed to `new`.
/// NOTE(review): how it is used is not visible in this file.
pub(super) client_id: String,
/// Value passed to `new`.
/// NOTE(review): presumably the number of gRPC subchannels to use; confirm against the consumer.
pub(super) grpc_subchannel_count: usize,
/// Deadline in whole seconds. Defaults to 60 and is only changed through
/// `set_max_lease_extension`, which keeps it within 10..=600.
pub(super) ack_deadline_seconds: i32,
/// Maximum lease duration. Defaults to one hour; changed through `set_max_lease`.
pub(super) max_lease: Duration,
/// Limit on outstanding messages. Defaults to 1000.
/// NOTE(review): presumably a flow-control bound; confirm where it is enforced.
pub(super) max_outstanding_messages: i64,
/// Limit on outstanding bytes. Defaults to 100 MiB.
/// NOTE(review): presumably a flow-control bound; confirm where it is enforced.
pub(super) max_outstanding_bytes: i64,
/// What to do on shutdown. Defaults to `ShutdownBehavior::WaitForProcessing`.
pub(super) shutdown_behavior: ShutdownBehavior,
}
impl Subscribe {
    /// Creates a builder from the caller-supplied transport and identifiers,
    /// filling every tunable with its default.
    ///
    /// Defaults: 60-second ack deadline, one-hour maximum lease, 1000
    /// outstanding messages, 100 MiB of outstanding bytes, and
    /// [`ShutdownBehavior::WaitForProcessing`].
    pub(super) fn new(
        inner: Arc<Transport>,
        subscription: String,
        client_id: String,
        grpc_subchannel_count: usize,
    ) -> Self {
        let one_hour = Duration::from_secs(60 * 60);
        let default_byte_limit = 100 * MIB;
        Self {
            inner,
            subscription,
            client_id,
            grpc_subchannel_count,
            ack_deadline_seconds: 60,
            max_lease: one_hour,
            max_outstanding_messages: 1000,
            max_outstanding_bytes: default_byte_limit,
            shutdown_behavior: ShutdownBehavior::WaitForProcessing,
        }
    }

    /// Consumes the builder and hands it to [`MessageStream::new`].
    pub fn build(self) -> MessageStream {
        MessageStream::new(self)
    }

    /// Replaces the maximum lease duration.
    ///
    /// Accepts anything convertible into a [`Duration`]; the value is stored
    /// as given, without clamping.
    pub fn set_max_lease<T: Into<Duration>>(self, v: T) -> Self {
        Self {
            max_lease: v.into(),
            ..self
        }
    }

    /// Replaces the ack deadline, expressed as a duration.
    ///
    /// Only the whole-second part of `v` is used, and the result is clamped
    /// to the inclusive range 10..=600 seconds before being stored in
    /// `ack_deadline_seconds`.
    pub fn set_max_lease_extension<T: Into<Duration>>(self, v: T) -> Self {
        const MIN_SECONDS: u64 = 10;
        const MAX_SECONDS: u64 = 600;

        let extension: Duration = v.into();
        let seconds = extension.as_secs().clamp(MIN_SECONDS, MAX_SECONDS);
        Self {
            // The cast cannot truncate: `seconds` is at most 600 after the clamp.
            ack_deadline_seconds: seconds as i32,
            ..self
        }
    }

    /// Replaces the limit on outstanding messages. The value is stored as given.
    pub fn set_max_outstanding_messages<T: Into<i64>>(self, v: T) -> Self {
        Self {
            max_outstanding_messages: v.into(),
            ..self
        }
    }

    /// Replaces the limit on outstanding bytes. The value is stored as given.
    pub fn set_max_outstanding_bytes<T: Into<i64>>(self, v: T) -> Self {
        Self {
            max_outstanding_bytes: v.into(),
            ..self
        }
    }

    /// Replaces the shutdown behavior.
    pub fn set_shutdown_behavior(self, v: ShutdownBehavior) -> Self {
        Self {
            shutdown_behavior: v,
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use gaxi::options::ClientConfig;
    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
    use test_case::test_case;

    const KIB: i64 = 1024;

    /// Builds a transport with anonymous credentials for use in the tests.
    async fn test_inner() -> anyhow::Result<Arc<Transport>> {
        let mut config = ClientConfig::default();
        config.cred = Some(Anonymous::new().build());
        let transport = Transport::new(config).await?;
        Ok(Arc::new(transport))
    }

    /// A freshly created builder keeps the constructor arguments and has
    /// defaults within sensible ranges.
    #[tokio::test]
    async fn reasonable_defaults() -> anyhow::Result<()> {
        let builder = Subscribe::new(
            test_inner().await?,
            "projects/my-project/subscriptions/my-subscription".to_string(),
            "client-id".to_string(),
            1_usize,
        );
        assert_eq!(
            builder.subscription,
            "projects/my-project/subscriptions/my-subscription"
        );
        assert_eq!(builder.client_id, "client-id");
        assert_eq!(builder.grpc_subchannel_count, 1);
        assert_eq!(builder.ack_deadline_seconds, 60);
        assert!(
            builder.max_lease >= Duration::from_secs(300),
            "max_lease={:?}",
            builder.max_lease
        );
        assert!(
            100_000 > builder.max_outstanding_messages && builder.max_outstanding_messages > 100,
            "max_outstanding_messages={}",
            builder.max_outstanding_messages
        );
        assert!(
            builder.max_outstanding_bytes > 100 * KIB,
            "max_outstanding_bytes={}",
            builder.max_outstanding_bytes
        );
        assert_eq!(
            builder.shutdown_behavior,
            ShutdownBehavior::WaitForProcessing
        );
        Ok(())
    }

    /// Every setter overrides its field. Each value used here differs from
    /// the corresponding default, so a setter that does nothing fails the test.
    #[tokio::test]
    async fn options() -> anyhow::Result<()> {
        let builder = Subscribe::new(
            test_inner().await?,
            "projects/my-project/subscriptions/my-subscription".to_string(),
            "client-id".to_string(),
            2_usize,
        )
        // Deliberately not 3600 seconds: that is the default set in `new`.
        .set_max_lease(Duration::from_secs(1800))
        .set_max_lease_extension(Duration::from_secs(20))
        .set_max_outstanding_messages(12345)
        .set_max_outstanding_bytes(6789 * KIB)
        .set_shutdown_behavior(ShutdownBehavior::NackImmediately);
        assert_eq!(
            builder.subscription,
            "projects/my-project/subscriptions/my-subscription"
        );
        assert_eq!(builder.client_id, "client-id");
        assert_eq!(builder.grpc_subchannel_count, 2);
        assert_eq!(builder.max_lease, Duration::from_secs(1800));
        assert_eq!(builder.ack_deadline_seconds, 20);
        assert_eq!(builder.max_outstanding_messages, 12345);
        assert_eq!(builder.max_outstanding_bytes, 6789 * KIB);
        assert_eq!(builder.shutdown_behavior, ShutdownBehavior::NackImmediately);
        Ok(())
    }

    /// `set_max_lease_extension` clamps to 10..=600 seconds; the exact
    /// boundaries are included alongside values on either side.
    #[test_case(Duration::ZERO, 10)]
    #[test_case(Duration::from_secs(10), 10)]
    #[test_case(Duration::from_secs(42), 42)]
    #[test_case(Duration::from_secs(600), 600)]
    #[test_case(Duration::from_secs(4200), 600)]
    #[tokio::test]
    async fn clamp_ack_deadline(v: Duration, want: i32) -> anyhow::Result<()> {
        let builder = Subscribe::new(
            test_inner().await?,
            "projects/my-project/subscriptions/my-subscription".to_string(),
            "client-id".to_string(),
            1_usize,
        )
        .set_max_lease_extension(v);
        assert_eq!(builder.ack_deadline_seconds, want);
        Ok(())
    }
}