use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::backend::{Backend, ConsumerOptionsInner};
use crate::error::{Result, ShoveError};
#[cfg(feature = "kafka-schema-registry")]
use crate::markers::Kafka;
#[cfg(feature = "nats")]
use crate::markers::Nats;
#[cfg(feature = "rabbitmq")]
use crate::markers::RabbitMq;
#[cfg(feature = "aws-sns-sqs")]
use crate::markers::Sqs;
#[cfg(feature = "kafka-schema-registry")]
use crate::schema_registry::{SchemaEnforcement, SchemaRegistry};
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
pub const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::from_secs(30);
#[allow(dead_code)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) enum HandlerTimeoutConfig {
#[default]
Inherit,
Set(Duration),
}
#[allow(dead_code)] pub(crate) fn resolve_handler_timeout(
config: HandlerTimeoutConfig,
registry_default: Option<Duration>,
) -> Duration {
match config {
HandlerTimeoutConfig::Set(d) => d,
HandlerTimeoutConfig::Inherit => registry_default.unwrap_or(DEFAULT_HANDLER_TIMEOUT),
}
}
pub const DEFAULT_MAX_PENDING_PER_KEY: usize = 1_000;
pub(crate) fn validate_message_size(len: usize, max: Option<usize>) -> Result<()> {
match max {
Some(max) if len > max => Err(ShoveError::Validation(format!(
"message size {len} exceeds max_message_size {max}"
))),
_ => Ok(()),
}
}
pub struct ConsumerOptions<B: Backend> {
pub max_retries: u32,
pub prefetch_count: u16,
pub concurrent_processing: bool,
pub handler_timeout: Option<Duration>,
pub max_pending_per_key: Option<usize>,
pub max_message_size: Option<usize>,
pub max_reconnect_attempts: Option<u32>,
#[cfg(feature = "rabbitmq")]
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))]
pub hold_queue_timeout: Option<Duration>,
#[cfg(feature = "rabbitmq-transactional")]
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq-transactional")))]
pub exactly_once: bool,
#[cfg(feature = "aws-sns-sqs")]
#[cfg_attr(docsrs, doc(cfg(feature = "aws-sns-sqs")))]
pub receive_batch_size: u16,
#[cfg(feature = "nats")]
#[cfg_attr(docsrs, doc(cfg(feature = "nats")))]
pub max_ack_pending: Option<i64>,
#[cfg(feature = "kafka-schema-registry")]
#[cfg_attr(docsrs, doc(cfg(feature = "kafka-schema-registry")))]
pub schema_registry: Option<Arc<SchemaRegistry>>,
#[cfg(feature = "kafka-schema-registry")]
#[cfg_attr(docsrs, doc(cfg(feature = "kafka-schema-registry")))]
pub schema_enforcement: SchemaEnforcement,
#[cfg(feature = "kafka-schema-registry")]
#[cfg_attr(docsrs, doc(cfg(feature = "kafka-schema-registry")))]
pub schema_accepted_subjects: Option<Vec<Arc<str>>>,
pub(crate) shutdown: Option<CancellationToken>,
pub(crate) processing: Arc<AtomicBool>,
pub(crate) consumer_group: Option<Arc<str>>,
_backend: PhantomData<fn() -> B>,
}
impl<B: Backend> ConsumerOptions<B> {
pub fn new() -> Self {
Self {
max_retries: 10,
prefetch_count: 10,
concurrent_processing: true,
handler_timeout: Some(DEFAULT_HANDLER_TIMEOUT),
max_pending_per_key: Some(DEFAULT_MAX_PENDING_PER_KEY),
max_message_size: Some(DEFAULT_MAX_MESSAGE_SIZE),
max_reconnect_attempts: None,
#[cfg(feature = "rabbitmq")]
hold_queue_timeout: None,
#[cfg(feature = "rabbitmq-transactional")]
exactly_once: false,
#[cfg(feature = "aws-sns-sqs")]
receive_batch_size: 0,
#[cfg(feature = "nats")]
max_ack_pending: None,
#[cfg(feature = "kafka-schema-registry")]
schema_registry: None,
#[cfg(feature = "kafka-schema-registry")]
schema_enforcement: SchemaEnforcement::Enforce,
#[cfg(feature = "kafka-schema-registry")]
schema_accepted_subjects: None,
shutdown: None,
processing: Arc::new(AtomicBool::new(false)),
consumer_group: None,
_backend: PhantomData,
}
}
pub fn preset(prefetch: u16) -> Self {
Self::new().with_prefetch_count(prefetch)
}
pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = max_retries;
self
}
pub fn with_prefetch_count(mut self, prefetch_count: u16) -> Self {
self.prefetch_count = prefetch_count;
self
}
pub fn with_concurrent_processing(mut self, on: bool) -> Self {
self.concurrent_processing = on;
self
}
pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
assert!(!timeout.is_zero(), "handler_timeout must be positive");
self.handler_timeout = Some(timeout);
self
}
pub fn without_handler_timeout(mut self) -> Self {
self.handler_timeout = None;
self
}
pub fn with_max_pending_per_key(mut self, limit: usize) -> Self {
self.max_pending_per_key = Some(limit);
self
}
pub fn without_max_pending_per_key(mut self) -> Self {
self.max_pending_per_key = None;
self
}
pub fn with_max_message_size(mut self, max: usize) -> Self {
self.max_message_size = Some(max);
self
}
pub fn without_message_size_limit(mut self) -> Self {
self.max_message_size = None;
self
}
pub fn with_max_reconnect_attempts(mut self, n: u32) -> Self {
self.max_reconnect_attempts = Some(n);
self
}
pub fn with_shutdown(mut self, shutdown: CancellationToken) -> Self {
self.shutdown = Some(shutdown);
self
}
pub fn with_consumer_group(mut self, name: impl Into<Arc<str>>) -> Self {
self.consumer_group = Some(name.into());
self
}
pub fn processing_handle(&self) -> Arc<AtomicBool> {
self.processing.clone()
}
pub(crate) fn into_inner(self) -> ConsumerOptionsInner {
let effective_prefetch = if self.concurrent_processing {
self.prefetch_count
} else {
1
};
ConsumerOptionsInner {
max_retries: self.max_retries,
prefetch_count: effective_prefetch,
handler_timeout: self.handler_timeout,
max_pending_per_key: self.max_pending_per_key,
max_message_size: self.max_message_size,
max_reconnect_attempts: self.max_reconnect_attempts,
#[cfg(feature = "rabbitmq")]
hold_queue_timeout: self.hold_queue_timeout,
shutdown: self.shutdown.unwrap_or_default(),
processing: self.processing,
consumer_group: self.consumer_group,
#[cfg(feature = "kafka")]
kafka_group_id: None,
#[cfg(feature = "kafka")]
kafka_auto_offset_reset: None,
#[cfg(feature = "kafka-schema-registry")]
schema_registry: self.schema_registry,
#[cfg(feature = "kafka-schema-registry")]
schema_enforcement: self.schema_enforcement,
#[cfg(feature = "kafka-schema-registry")]
schema_accepted_subjects: self.schema_accepted_subjects,
#[cfg(feature = "rabbitmq-transactional")]
exactly_once: self.exactly_once,
#[cfg(feature = "aws-sns-sqs")]
receive_batch_size: self.receive_batch_size,
#[cfg(feature = "nats")]
max_ack_pending: self.max_ack_pending,
}
}
}
impl<B: Backend> Default for ConsumerOptions<B> {
fn default() -> Self {
Self::new()
}
}
impl<B: Backend> Clone for ConsumerOptions<B> {
fn clone(&self) -> Self {
Self {
max_retries: self.max_retries,
prefetch_count: self.prefetch_count,
concurrent_processing: self.concurrent_processing,
handler_timeout: self.handler_timeout,
max_pending_per_key: self.max_pending_per_key,
max_message_size: self.max_message_size,
max_reconnect_attempts: self.max_reconnect_attempts,
#[cfg(feature = "rabbitmq")]
hold_queue_timeout: self.hold_queue_timeout,
#[cfg(feature = "rabbitmq-transactional")]
exactly_once: self.exactly_once,
#[cfg(feature = "aws-sns-sqs")]
receive_batch_size: self.receive_batch_size,
#[cfg(feature = "nats")]
max_ack_pending: self.max_ack_pending,
#[cfg(feature = "kafka-schema-registry")]
schema_registry: self.schema_registry.clone(),
#[cfg(feature = "kafka-schema-registry")]
schema_enforcement: self.schema_enforcement,
#[cfg(feature = "kafka-schema-registry")]
schema_accepted_subjects: self.schema_accepted_subjects.clone(),
shutdown: self.shutdown.clone(),
processing: self.processing.clone(),
consumer_group: self.consumer_group.clone(),
_backend: PhantomData,
}
}
}
#[cfg(feature = "aws-sns-sqs")]
#[cfg_attr(docsrs, doc(cfg(feature = "aws-sns-sqs")))]
impl ConsumerOptions<Sqs> {
pub fn with_receive_batch_size(mut self, n: u16) -> Self {
self.receive_batch_size = n;
self
}
}
#[cfg(feature = "nats")]
#[cfg_attr(docsrs, doc(cfg(feature = "nats")))]
impl ConsumerOptions<Nats> {
pub fn with_max_ack_pending(mut self, n: i64) -> Self {
self.max_ack_pending = Some(n);
self
}
}
#[cfg(feature = "rabbitmq-transactional")]
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq-transactional")))]
impl ConsumerOptions<RabbitMq> {
pub fn with_exactly_once(mut self) -> Self {
self.exactly_once = true;
self
}
}
#[cfg(feature = "rabbitmq")]
#[cfg_attr(docsrs, doc(cfg(feature = "rabbitmq")))]
impl ConsumerOptions<RabbitMq> {
pub fn with_hold_queue_timeout(mut self, timeout: Duration) -> Self {
assert!(!timeout.is_zero(), "hold_queue_timeout must be positive");
self.hold_queue_timeout = Some(timeout);
self
}
}
#[cfg(feature = "kafka-schema-registry")]
#[cfg_attr(docsrs, doc(cfg(feature = "kafka-schema-registry")))]
impl ConsumerOptions<Kafka> {
pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
self.schema_registry = Some(registry);
self
}
pub fn with_schema_enforcement(mut self, enforcement: SchemaEnforcement) -> Self {
self.schema_enforcement = enforcement;
self
}
pub fn accept_schema_subjects<I, S>(mut self, subjects: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<Arc<str>>,
{
self.schema_accepted_subjects = Some(subjects.into_iter().map(Into::into).collect());
self
}
}
#[cfg(test)]
#[allow(clippy::absolute_paths)]
mod tests {
use super::*;
#[allow(unused_imports)]
use crate::markers::*;
#[cfg(feature = "inmemory")]
type TestBackend = InMemory;
#[cfg(all(not(feature = "inmemory"), feature = "kafka"))]
type TestBackend = Kafka;
#[cfg(all(not(feature = "inmemory"), not(feature = "kafka"), feature = "nats"))]
type TestBackend = Nats;
#[cfg(all(
not(feature = "inmemory"),
not(feature = "kafka"),
not(feature = "nats"),
not(feature = "redis-streams"),
feature = "rabbitmq"
))]
type TestBackend = RabbitMq;
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn defaults_are_correct() {
let opts = ConsumerOptions::<TestBackend>::new();
assert_eq!(opts.max_retries, 10);
assert_eq!(opts.prefetch_count, 10);
assert!(opts.concurrent_processing);
assert_eq!(opts.handler_timeout, Some(DEFAULT_HANDLER_TIMEOUT));
assert_eq!(opts.max_pending_per_key, Some(DEFAULT_MAX_PENDING_PER_KEY));
assert_eq!(opts.max_message_size, Some(DEFAULT_MAX_MESSAGE_SIZE));
assert_eq!(opts.max_reconnect_attempts, None);
assert!(!opts.processing.load(std::sync::atomic::Ordering::Acquire));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_concurrent_processing_toggles_flag() {
let opts = ConsumerOptions::<TestBackend>::new().with_concurrent_processing(false);
assert!(!opts.concurrent_processing);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn into_inner_clamps_prefetch_when_non_concurrent() {
let inner = ConsumerOptions::<TestBackend>::new()
.with_prefetch_count(32)
.with_concurrent_processing(false)
.into_inner();
assert_eq!(
inner.prefetch_count, 1,
"prefetch must clamp to 1 when concurrent_processing=false"
);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn into_inner_preserves_prefetch_when_concurrent() {
let inner = ConsumerOptions::<TestBackend>::new()
.with_prefetch_count(32)
.into_inner();
assert_eq!(inner.prefetch_count, 32);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_max_retries_overrides() {
let opts = ConsumerOptions::<TestBackend>::new().with_max_retries(5);
assert_eq!(opts.max_retries, 5);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_prefetch_count_overrides() {
let opts = ConsumerOptions::<TestBackend>::new().with_prefetch_count(50);
assert_eq!(opts.prefetch_count, 50);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_handler_timeout_sets_timeout() {
let opts =
ConsumerOptions::<TestBackend>::new().with_handler_timeout(Duration::from_secs(30));
assert_eq!(opts.handler_timeout, Some(Duration::from_secs(30)));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn builder_chains() {
let opts = ConsumerOptions::<TestBackend>::new()
.with_max_retries(3)
.with_prefetch_count(20)
.with_handler_timeout(Duration::from_secs(5))
.with_max_pending_per_key(100)
.with_max_message_size(5 * 1024 * 1024)
.with_max_reconnect_attempts(50);
assert_eq!(opts.max_retries, 3);
assert_eq!(opts.prefetch_count, 20);
assert_eq!(opts.handler_timeout, Some(Duration::from_secs(5)));
assert_eq!(opts.max_pending_per_key, Some(100));
assert_eq!(opts.max_message_size, Some(5 * 1024 * 1024));
assert_eq!(opts.max_reconnect_attempts, Some(50));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn preset_sets_prefetch() {
let opts = ConsumerOptions::<TestBackend>::preset(42);
assert_eq!(opts.prefetch_count, 42);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_max_pending_per_key_sets_value() {
let opts = ConsumerOptions::<TestBackend>::new().with_max_pending_per_key(50);
assert_eq!(opts.max_pending_per_key, Some(50));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_max_message_size_overrides_default() {
let opts = ConsumerOptions::<TestBackend>::new().with_max_message_size(1024 * 1024);
assert_eq!(opts.max_message_size, Some(1024 * 1024));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_max_reconnect_attempts_sets_value() {
let opts = ConsumerOptions::<TestBackend>::new().with_max_reconnect_attempts(25);
assert_eq!(opts.max_reconnect_attempts, Some(25));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn without_message_size_limit_disables_check() {
let opts = ConsumerOptions::<TestBackend>::new().without_message_size_limit();
assert_eq!(opts.max_message_size, None);
}
#[cfg(feature = "rabbitmq-transactional")]
#[test]
fn exactly_once_defaults_to_false() {
let opts = ConsumerOptions::<RabbitMq>::new();
assert!(!opts.exactly_once);
}
#[cfg(feature = "rabbitmq-transactional")]
#[test]
fn with_exactly_once_sets_flag() {
let opts = ConsumerOptions::<RabbitMq>::new().with_exactly_once();
assert!(opts.exactly_once);
}
#[cfg(feature = "rabbitmq-transactional")]
#[test]
fn exactly_once_chains_with_other_builders() {
let opts = ConsumerOptions::<RabbitMq>::new()
.with_max_retries(5)
.with_exactly_once()
.with_prefetch_count(1);
assert!(opts.exactly_once);
assert_eq!(opts.max_retries, 5);
assert_eq!(opts.prefetch_count, 1);
}
#[test]
fn validate_message_size_accepts_payload_at_limit() {
assert!(validate_message_size(100, Some(100)).is_ok());
}
#[test]
fn validate_message_size_accepts_payload_under_limit() {
assert!(validate_message_size(99, Some(100)).is_ok());
}
#[test]
fn validate_message_size_rejects_oversize_payload() {
let err = validate_message_size(101, Some(100)).unwrap_err();
let ShoveError::Validation(msg) = err else {
panic!("expected Validation variant");
};
assert!(msg.contains("101"));
assert!(msg.contains("100"));
}
#[test]
fn validate_message_size_skips_check_when_limit_absent() {
assert!(validate_message_size(usize::MAX, None).is_ok());
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn without_handler_timeout_clears_default() {
let opts = ConsumerOptions::<TestBackend>::new().without_handler_timeout();
assert_eq!(opts.handler_timeout, None);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn without_max_pending_per_key_clears_default() {
let opts = ConsumerOptions::<TestBackend>::new().without_max_pending_per_key();
assert_eq!(opts.max_pending_per_key, None);
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_consumer_group_sets_label() {
let opts = ConsumerOptions::<TestBackend>::new().with_consumer_group("orders-worker");
let inner = opts.into_inner();
assert_eq!(inner.consumer_group.as_deref(), Some("orders-worker"));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn with_shutdown_stores_token() {
let token = CancellationToken::new();
let opts = ConsumerOptions::<TestBackend>::new().with_shutdown(token.clone());
let inner = opts.into_inner();
token.cancel();
assert!(inner.shutdown.is_cancelled());
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn into_inner_without_shutdown_yields_fresh_token() {
let inner = ConsumerOptions::<TestBackend>::new().into_inner();
assert!(!inner.shutdown.is_cancelled());
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn processing_handle_is_a_shared_view() {
use std::sync::atomic::Ordering;
let opts = ConsumerOptions::<TestBackend>::new();
let handle = opts.processing_handle();
handle.store(true, Ordering::Release);
let inner = opts.into_inner();
assert!(inner.processing.load(Ordering::Acquire));
}
#[cfg(any(
feature = "inmemory",
feature = "kafka",
feature = "nats",
feature = "rabbitmq"
))]
#[test]
fn clone_preserves_all_settings() {
let opts = ConsumerOptions::<TestBackend>::new()
.with_max_retries(7)
.with_prefetch_count(13)
.with_concurrent_processing(false)
.with_handler_timeout(Duration::from_secs(11))
.with_max_pending_per_key(99)
.with_max_message_size(4096)
.with_max_reconnect_attempts(42);
let copy = opts.clone();
assert_eq!(copy.max_retries, 7);
assert_eq!(copy.prefetch_count, 13);
assert!(!copy.concurrent_processing);
assert_eq!(copy.handler_timeout, Some(Duration::from_secs(11)));
assert_eq!(copy.max_pending_per_key, Some(99));
assert_eq!(copy.max_message_size, Some(4096));
assert_eq!(copy.max_reconnect_attempts, Some(42));
}
#[cfg(feature = "aws-sns-sqs")]
#[test]
fn sqs_with_receive_batch_size_sets_value() {
let opts = ConsumerOptions::<Sqs>::new().with_receive_batch_size(7);
assert_eq!(opts.receive_batch_size, 7);
}
#[cfg(feature = "nats")]
#[test]
fn nats_with_max_ack_pending_sets_value() {
let opts = ConsumerOptions::<Nats>::new().with_max_ack_pending(128);
assert_eq!(opts.max_ack_pending, Some(128));
}
#[test]
fn resolve_set_returns_explicit_duration() {
assert_eq!(
resolve_handler_timeout(
HandlerTimeoutConfig::Set(Duration::from_secs(5)),
Some(Duration::from_secs(60)),
),
Duration::from_secs(5),
);
assert_eq!(
resolve_handler_timeout(HandlerTimeoutConfig::Set(Duration::from_secs(5)), None),
Duration::from_secs(5),
);
}
#[test]
fn resolve_inherit_uses_registry_default_then_library_default() {
assert_eq!(
resolve_handler_timeout(HandlerTimeoutConfig::Inherit, Some(Duration::from_secs(45))),
Duration::from_secs(45),
);
assert_eq!(
resolve_handler_timeout(HandlerTimeoutConfig::Inherit, None),
DEFAULT_HANDLER_TIMEOUT,
);
}
#[test]
fn handler_timeout_config_default_is_inherit() {
assert_eq!(
HandlerTimeoutConfig::default(),
HandlerTimeoutConfig::Inherit
);
}
#[cfg(feature = "kafka-schema-registry")]
mod kafka_schema_registry_options {
use std::sync::Arc;
use crate::consumer::ConsumerOptions;
use crate::markers::Kafka;
use crate::schema_registry::{SchemaEnforcement, SchemaRegistry};
#[test]
fn schema_enforcement_default_is_enforce() {
let inner = ConsumerOptions::<Kafka>::new().into_inner();
assert_eq!(inner.schema_enforcement, SchemaEnforcement::Enforce);
assert!(inner.schema_registry.is_none());
assert!(inner.schema_accepted_subjects.is_none());
}
#[test]
fn with_schema_enforcement_permissive_propagates() {
let inner = ConsumerOptions::<Kafka>::new()
.with_schema_enforcement(SchemaEnforcement::Permissive)
.into_inner();
assert_eq!(inner.schema_enforcement, SchemaEnforcement::Permissive);
}
#[test]
fn accept_schema_subjects_propagates() {
let inner = ConsumerOptions::<Kafka>::new()
.accept_schema_subjects(["orders-value", "orders-key"])
.into_inner();
assert_eq!(
inner.schema_accepted_subjects,
Some(vec![Arc::from("orders-value"), Arc::from("orders-key")])
);
}
#[test]
fn with_schema_registry_propagates() {
let registry = SchemaRegistry::builder("http://localhost:8081").build();
let inner = ConsumerOptions::<Kafka>::new()
.with_schema_registry(registry)
.into_inner();
assert!(inner.schema_registry.is_some());
}
}
}