use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::consumer::{
DEFAULT_HANDLER_TIMEOUT, DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_PENDING_PER_KEY,
validate_message_size,
};
use crate::error::Result;
#[derive(Clone)]
#[allow(dead_code)] pub(crate) struct ConsumerOptionsInner {
pub max_retries: u32,
pub prefetch_count: u16,
pub handler_timeout: Option<Duration>,
pub max_pending_per_key: Option<usize>,
pub max_message_size: Option<usize>,
pub max_reconnect_attempts: Option<u32>,
#[cfg(feature = "rabbitmq")]
pub hold_queue_timeout: Option<Duration>,
pub shutdown: CancellationToken,
pub processing: Arc<AtomicBool>,
pub consumer_group: Option<Arc<str>>,
#[cfg(feature = "rabbitmq-transactional")]
pub exactly_once: bool,
#[cfg(feature = "aws-sns-sqs")]
pub receive_batch_size: u16,
#[cfg(feature = "nats")]
pub max_ack_pending: Option<i64>,
}
impl ConsumerOptionsInner {
#[allow(dead_code)] pub(crate) fn defaults_with_shutdown(shutdown: CancellationToken) -> Self {
Self {
max_retries: 10,
prefetch_count: 10,
handler_timeout: Some(DEFAULT_HANDLER_TIMEOUT),
max_pending_per_key: Some(DEFAULT_MAX_PENDING_PER_KEY),
max_message_size: Some(DEFAULT_MAX_MESSAGE_SIZE),
max_reconnect_attempts: None,
#[cfg(feature = "rabbitmq")]
hold_queue_timeout: None,
shutdown,
processing: Arc::new(AtomicBool::new(false)),
consumer_group: None,
#[cfg(feature = "rabbitmq-transactional")]
exactly_once: false,
#[cfg(feature = "aws-sns-sqs")]
receive_batch_size: 0,
#[cfg(feature = "nats")]
max_ack_pending: None,
}
}
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn test_shutdown() -> CancellationToken {
CancellationToken::new()
}
}
impl ConsumerOptionsInner {
#[allow(dead_code)]
pub(crate) fn validate_payload_message_size(&self, len: usize) -> Result<()> {
validate_message_size(len, self.max_message_size)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn defaults_with_shutdown_max_reconnect_attempts_is_none() {
let opts = ConsumerOptionsInner::defaults_with_shutdown(CancellationToken::new());
assert!(
opts.max_reconnect_attempts.is_none(),
"max_reconnect_attempts must default to None (unlimited)"
);
}
#[test]
fn defaults_with_shutdown_has_sensible_values() {
let opts = ConsumerOptionsInner::defaults_with_shutdown(CancellationToken::new());
assert_eq!(opts.max_retries, 10);
assert_eq!(opts.prefetch_count, 10);
assert!(opts.handler_timeout.is_some());
assert!(opts.max_pending_per_key.is_some());
assert!(opts.max_message_size.is_some());
assert!(opts.consumer_group.is_none());
}
}