use std::time::Duration;
pub use async_nats::jetstream::consumer::DeliverPolicy;
use ruststream::SubscriptionSource;
use crate::{NatsBroker, error::NatsError, subscriber::NatsSubscriber};
/// Builder-style description of a single NATS subscription.
///
/// A value starts from [`SubscribeOptions::new`] with only the subject set and
/// is refined through the chained setter methods. Whether the subscription is
/// treated as JetStream is decided solely by `stream` being set; `validate`
/// rejects JetStream-only settings when no stream was selected, and rejects
/// `queue_group` when one was.
#[derive(Debug, Clone)]
#[must_use]
pub struct SubscribeOptions {
/// Subject to subscribe to; `validate` requires it to be non-empty.
subject: String,
/// Queue group name. Core NATS only: cannot be combined with `stream`.
queue_group: Option<String>,
/// JetStream stream name; `Some` is what marks the subscription as JetStream.
stream: Option<String>,
/// Durable name (JetStream only).
durable: Option<String>,
/// Filter subject (JetStream only); falls back to `subject` when unset.
filter_subject: Option<String>,
/// Ack wait (JetStream only); falls back to 30 seconds when unset.
ack_wait: Option<Duration>,
/// Max ack pending (JetStream only); falls back to 1024 when unset.
max_ack_pending: Option<i64>,
/// Deliver policy (JetStream only); falls back to `DeliverPolicy::All` when unset.
deliver_policy: Option<DeliverPolicy>,
/// Pull batch size in messages (JetStream only); falls back to 100 when unset.
pull_batch: Option<usize>,
/// Pull expiry (JetStream only); falls back to 5 seconds when unset.
pull_expires: Option<Duration>,
}
impl SubscribeOptions {
    /// Ack wait used when [`Self::ack_wait`] was never called.
    const DEFAULT_ACK_WAIT: Duration = Duration::from_secs(30);
    /// Max ack pending used when [`Self::max_ack_pending`] was never called.
    const DEFAULT_MAX_ACK_PENDING: i64 = 1024;
    /// Pull batch size used when [`Self::pull_batch`] was never called.
    const DEFAULT_PULL_BATCH: usize = 100;
    /// Pull expiry used when [`Self::pull_expires`] was never called.
    const DEFAULT_PULL_EXPIRES: Duration = Duration::from_secs(5);

    /// Creates options for `subject` with every optional setting left unset.
    ///
    /// Without further configuration the options describe a non-JetStream
    /// subscription ([`Self::is_jetstream`] returns `false`).
    pub fn new(subject: impl Into<String>) -> Self {
        Self {
            subject: subject.into(),
            queue_group: None,
            stream: None,
            durable: None,
            filter_subject: None,
            ack_wait: None,
            max_ack_pending: None,
            deliver_policy: None,
            pull_batch: None,
            pull_expires: None,
        }
    }

    /// Sets the queue group name.
    ///
    /// Core NATS only: [`Self::validate`] rejects combining it with
    /// [`Self::jetstream`].
    pub fn queue_group(mut self, name: impl Into<String>) -> Self {
        self.queue_group = Some(name.into());
        self
    }

    /// Selects the JetStream stream, turning this into a JetStream subscription.
    pub fn jetstream(mut self, stream: impl Into<String>) -> Self {
        self.stream = Some(stream.into());
        self
    }

    /// Sets the durable name. Requires [`Self::jetstream`].
    pub fn durable(mut self, name: impl Into<String>) -> Self {
        self.durable = Some(name.into());
        self
    }

    /// Sets the filter subject. Requires [`Self::jetstream`].
    ///
    /// When never set, the subscription subject is used as the filter.
    pub fn filter_subject(mut self, subject: impl Into<String>) -> Self {
        self.filter_subject = Some(subject.into());
        self
    }

    /// Sets the ack wait (default: 30 seconds). Requires [`Self::jetstream`].
    pub const fn ack_wait(mut self, ack_wait: Duration) -> Self {
        self.ack_wait = Some(ack_wait);
        self
    }

    /// Sets max ack pending (default: 1024). Requires [`Self::jetstream`].
    pub const fn max_ack_pending(mut self, max: i64) -> Self {
        self.max_ack_pending = Some(max);
        self
    }

    /// Sets the deliver policy (default: [`DeliverPolicy::All`]).
    /// Requires [`Self::jetstream`].
    pub const fn deliver_policy(mut self, policy: DeliverPolicy) -> Self {
        self.deliver_policy = Some(policy);
        self
    }

    /// Sets the pull batch size in messages (default: 100).
    /// Requires [`Self::jetstream`].
    pub const fn pull_batch(mut self, max_messages: usize) -> Self {
        self.pull_batch = Some(max_messages);
        self
    }

    /// Sets the pull expiry (default: 5 seconds). Requires [`Self::jetstream`].
    pub const fn pull_expires(mut self, expires: Duration) -> Self {
        self.pull_expires = Some(expires);
        self
    }

    /// Returns the subject these options subscribe to.
    #[must_use]
    pub fn subject(&self) -> &str {
        &self.subject
    }

    /// Returns `true` once a stream has been selected via [`Self::jetstream`].
    #[must_use]
    pub const fn is_jetstream(&self) -> bool {
        self.stream.is_some()
    }

    /// Queue group name, if one was set.
    pub(crate) fn queue_group_ref(&self) -> Option<&str> {
        self.queue_group.as_deref()
    }

    /// Stream name, if one was set.
    pub(crate) fn stream_ref(&self) -> Option<&str> {
        self.stream.as_deref()
    }

    /// Durable name, if one was set.
    pub(crate) fn durable_ref(&self) -> Option<&str> {
        self.durable.as_deref()
    }

    /// Explicit filter subject, or a copy of the subscription subject when unset.
    pub(crate) fn filter_subject_or_default(&self) -> String {
        self.filter_subject
            .as_deref()
            .unwrap_or(&self.subject)
            .to_owned()
    }

    /// Explicit ack wait, or 30 seconds when unset.
    pub(crate) fn ack_wait_or_default(&self) -> Duration {
        self.ack_wait.unwrap_or(Self::DEFAULT_ACK_WAIT)
    }

    /// Explicit max ack pending, or 1024 when unset.
    pub(crate) fn max_ack_pending_or_default(&self) -> i64 {
        self.max_ack_pending.unwrap_or(Self::DEFAULT_MAX_ACK_PENDING)
    }

    /// Explicit deliver policy, or [`DeliverPolicy::All`] when unset.
    pub(crate) fn deliver_policy_or_default(&self) -> DeliverPolicy {
        self.deliver_policy.unwrap_or(DeliverPolicy::All)
    }

    /// Explicit pull batch size, or 100 when unset.
    pub(crate) fn pull_batch_or_default(&self) -> usize {
        self.pull_batch.unwrap_or(Self::DEFAULT_PULL_BATCH)
    }

    /// Explicit pull expiry, or 5 seconds when unset.
    pub(crate) fn pull_expires_or_default(&self) -> Duration {
        self.pull_expires.unwrap_or(Self::DEFAULT_PULL_EXPIRES)
    }

    /// Checks that the configured combination of options is coherent.
    ///
    /// # Errors
    ///
    /// Returns [`NatsError::InvalidOptions`] when, checked in this order:
    ///
    /// 1. the subject is empty;
    /// 2. both a queue group and a JetStream stream are set;
    /// 3. no stream is set but a JetStream-only option (`durable`, `ack_wait`,
    ///    `max_ack_pending`, `deliver_policy`, `filter_subject`, `pull_batch`,
    ///    `pull_expires`) is — the first one in that order is named;
    /// 4. a queue group, stream or durable name was set to the empty string.
    pub fn validate(&self) -> Result<(), NatsError> {
        if self.subject.is_empty() {
            return Err(NatsError::InvalidOptions(
                "subject must be non-empty".into(),
            ));
        }
        if self.stream.is_some() && self.queue_group.is_some() {
            return Err(NatsError::InvalidOptions(
                "queue_group is Core NATS only and cannot be combined with jetstream(_)".into(),
            ));
        }
        if self.stream.is_none() {
            let js_only = [
                ("durable", self.durable.is_some()),
                ("ack_wait", self.ack_wait.is_some()),
                ("max_ack_pending", self.max_ack_pending.is_some()),
                ("deliver_policy", self.deliver_policy.is_some()),
                ("filter_subject", self.filter_subject.is_some()),
                ("pull_batch", self.pull_batch.is_some()),
                ("pull_expires", self.pull_expires.is_some()),
            ];
            if let Some((field, _)) = js_only.iter().find(|(_, set)| *set) {
                return Err(NatsError::InvalidOptions(format!(
                    "{field}(_) requires jetstream(stream_name)"
                )));
            }
        }
        // An explicitly supplied but empty name can never identify a queue
        // group, stream or durable; reject it here rather than letting it reach
        // the broker. This runs last so every input that was already rejected
        // keeps its original error message.
        // NOTE(review): `filter_subject` is deliberately not checked — an empty
        // filter may carry meaning for the server; confirm before tightening.
        let names = [
            ("queue_group", self.queue_group.as_deref()),
            ("jetstream", self.stream.as_deref()),
            ("durable", self.durable.as_deref()),
        ];
        if let Some((builder, _)) = names.iter().find(|(_, name)| *name == Some("")) {
            return Err(NatsError::InvalidOptions(format!(
                "{builder}(_) requires a non-empty name"
            )));
        }
        Ok(())
    }
}
/// Allows a [`SubscribeOptions`] value to act as a subscription source for
/// [`NatsBroker`], producing a [`NatsSubscriber`].
impl SubscriptionSource<NatsBroker> for SubscribeOptions {
    type Subscriber = NatsSubscriber;

    /// Reports the subscription subject as this source's name.
    fn name(&self) -> &str {
        self.subject.as_str()
    }

    /// Consumes the options and hands them to `broker.subscribe`, returning
    /// whatever subscriber or error the broker produces.
    async fn subscribe(self, broker: &NatsBroker) -> Result<Self::Subscriber, NatsError> {
        broker.subscribe(self).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `opts` fails validation with an `InvalidOptions` error whose
    /// message contains `needle`.
    ///
    /// `#[track_caller]` keeps panic locations pointing at the calling test.
    #[track_caller]
    fn assert_rejected(opts: &SubscribeOptions, needle: &str) {
        let err = opts.validate().unwrap_err();
        assert!(matches!(err, NatsError::InvalidOptions(msg) if msg.contains(needle)));
    }

    // --- accepted configurations -------------------------------------------

    #[test]
    fn core_subscription_validates() {
        let core = SubscribeOptions::new("orders.*");
        core.validate().expect("core ok");
        assert!(!core.is_jetstream());
    }

    #[test]
    fn core_with_queue_group_validates() {
        let with_group = SubscribeOptions::new("x").queue_group("workers");
        with_group.validate().expect("queue group ok on core");
    }

    #[test]
    fn jetstream_subscription_validates() {
        let js = SubscribeOptions::new("orders.*")
            .jetstream("ORDERS")
            .durable("worker")
            .ack_wait(Duration::from_secs(5))
            .max_ack_pending(64)
            .deliver_policy(DeliverPolicy::New);
        js.validate().expect("jetstream ok");
        assert!(js.is_jetstream());
    }

    // --- rejected configurations -------------------------------------------

    #[test]
    fn durable_without_jetstream_is_rejected() {
        assert_rejected(&SubscribeOptions::new("x").durable("worker"), "durable");
    }

    #[test]
    fn ack_wait_without_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").ack_wait(Duration::from_secs(1)),
            "ack_wait",
        );
    }

    #[test]
    fn queue_group_plus_jetstream_is_rejected() {
        assert_rejected(
            &SubscribeOptions::new("x").queue_group("w").jetstream("S"),
            "queue_group",
        );
    }

    #[test]
    fn pull_batch_without_jetstream_is_rejected() {
        assert_rejected(&SubscribeOptions::new("x").pull_batch(64), "pull_batch");
    }

    #[test]
    fn empty_subject_is_rejected() {
        assert_rejected(&SubscribeOptions::new(""), "subject");
    }
}