use crate::error::Result;
pub(crate) mod autoscaler;
pub mod capability;
pub(crate) mod consumer;
pub(crate) mod options_inner;
pub(crate) mod publisher;
pub(crate) mod registry;
pub(crate) mod topology;
pub(crate) use autoscaler::{AutoscalerBackendImpl, QueueStatsProviderImpl};
pub(crate) use consumer::ConsumerImpl;
pub(crate) use options_inner::ConsumerOptionsInner;
pub(crate) use publisher::PublisherImpl;
pub(crate) use registry::RegistryImpl;
pub(crate) use topology::TopologyImpl;
pub(crate) mod sealed {
pub trait Sealed {}
}
#[allow(private_interfaces, private_bounds)]
pub trait Backend: sealed::Sealed + Sized + Send + Sync + 'static {
type Config: Send;
type Client: Clone + Send + Sync + 'static;
type PublisherImpl: PublisherImpl + Clone + Send + Sync + 'static;
type ConsumerImpl: ConsumerImpl + Clone + Send + Sync + 'static;
type TopologyImpl: TopologyImpl + Send + Sync + 'static;
type AutoscalerImpl: AutoscalerBackendImpl + Send + Sync + 'static;
type QueueStatsImpl: QueueStatsProviderImpl + Send + Sync + 'static;
fn connect(config: Self::Config) -> impl Future<Output = Result<Self::Client>> + Send;
fn make_publisher(
client: &Self::Client,
) -> impl Future<Output = Result<Self::PublisherImpl>> + Send;
fn make_consumer(client: &Self::Client) -> Self::ConsumerImpl;
fn make_declarer(client: &Self::Client) -> Self::TopologyImpl;
fn make_autoscaler(client: &Self::Client) -> Self::AutoscalerImpl;
fn make_stats_provider(client: &Self::Client) -> Self::QueueStatsImpl;
fn close(client: &Self::Client) -> impl Future<Output = ()> + Send;
}
#[cfg(test)]
mod bounds_smoke {
use super::*;
use crate::backend::capability::HasCoordinatedGroups;
use crate::markers::*;
use crate::{MessageMetadata, QueueTopology, SupervisorOutcome};
use tokio_util::sync::CancellationToken;
fn _require_backend<B: Backend>() {
fn needs_send_sync_static<T: Send + Sync + 'static>() {}
needs_send_sync_static::<B>();
needs_send_sync_static::<B::Client>();
needs_send_sync_static::<B::PublisherImpl>();
needs_send_sync_static::<B::ConsumerImpl>();
needs_send_sync_static::<B::TopologyImpl>();
needs_send_sync_static::<B::AutoscalerImpl>();
needs_send_sync_static::<B::QueueStatsImpl>();
}
fn _require_has_coordinated_groups<B: HasCoordinatedGroups>() {
fn needs_default_clone<T: Default + Clone + Send + 'static>() {}
needs_default_clone::<B::ConsumerGroupConfig>();
}
#[cfg(feature = "inmemory")]
fn _anchor_inmemory() {
_require_backend::<InMemory>();
_require_has_coordinated_groups::<InMemory>();
}
#[cfg(feature = "kafka")]
fn _anchor_kafka() {
_require_backend::<Kafka>();
_require_has_coordinated_groups::<Kafka>();
}
#[cfg(feature = "nats")]
fn _anchor_nats() {
_require_backend::<Nats>();
_require_has_coordinated_groups::<Nats>();
}
#[cfg(feature = "rabbitmq")]
fn _anchor_rabbitmq() {
_require_backend::<RabbitMq>();
_require_has_coordinated_groups::<RabbitMq>();
}
#[cfg(feature = "aws-sns-sqs")]
fn _anchor_sqs() {
_require_backend::<Sqs>();
}
#[cfg(feature = "inmemory")]
async fn _anchor_publisher_impl(p: &<InMemory as Backend>::PublisherImpl) {
use crate::topic::Topic;
struct Dummy;
impl Topic for Dummy {
type Message = ();
fn topology() -> &'static QueueTopology {
unreachable!("anchor only")
}
}
let _ = <_ as PublisherImpl>::publish::<Dummy>(p, &()).await;
let _ = <_ as PublisherImpl>::publish_with_headers::<Dummy>(
p,
&(),
std::collections::HashMap::new(),
)
.await;
let _ = <_ as PublisherImpl>::publish_batch::<Dummy>(p, &[]).await;
}
#[cfg(feature = "inmemory")]
async fn _anchor_consumer_impl(c: &<InMemory as Backend>::ConsumerImpl) {
use crate::handler::MessageHandler;
use crate::outcome::Outcome;
use crate::topic::{SequencedTopic, Topic};
struct Dummy;
impl Topic for Dummy {
type Message = ();
fn topology() -> &'static QueueTopology {
unreachable!("anchor only")
}
}
impl SequencedTopic for Dummy {
fn sequence_key(_m: &Self::Message) -> String {
unreachable!("anchor only")
}
}
struct DummyHandler;
impl MessageHandler<Dummy> for DummyHandler {
type Context = ();
async fn handle(&self, _m: (), _meta: MessageMetadata, _ctx: &()) -> Outcome {
Outcome::Ack
}
}
let options = ConsumerOptionsInner {
max_retries: 0,
prefetch_count: 1,
handler_timeout: None,
max_pending_per_key: None,
max_message_size: None,
shutdown: CancellationToken::new(),
processing: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
#[cfg(feature = "rabbitmq-transactional")]
exactly_once: false,
#[cfg(feature = "aws-sns-sqs")]
receive_batch_size: 0,
#[cfg(feature = "nats")]
max_ack_pending: None,
};
let _ =
<_ as ConsumerImpl>::run::<Dummy, DummyHandler>(c, DummyHandler, (), options.clone())
.await;
let _ = <_ as ConsumerImpl>::run_fifo::<Dummy, DummyHandler>(c, DummyHandler, (), options)
.await;
let _ = <_ as ConsumerImpl>::run_dlq::<Dummy, DummyHandler>(c, DummyHandler, ()).await;
}
#[cfg(feature = "inmemory")]
async fn _anchor_topology_impl(t: &<InMemory as Backend>::TopologyImpl) {
use crate::topic::Topic;
struct Dummy;
impl Topic for Dummy {
type Message = ();
fn topology() -> &'static QueueTopology {
unreachable!("anchor only")
}
}
let _ = <_ as TopologyImpl>::declare::<Dummy>(t).await;
}
#[cfg(feature = "inmemory")]
async fn _anchor_autoscaler_impl(_a: &<InMemory as Backend>::AutoscalerImpl) {
}
#[cfg(feature = "inmemory")]
async fn _anchor_stats_impl(s: &<InMemory as Backend>::QueueStatsImpl) {
let _ = <_ as QueueStatsProviderImpl>::snapshot(s, "q").await;
}
#[cfg(feature = "inmemory")]
async fn _anchor_registry_impl(r: &mut <InMemory as HasCoordinatedGroups>::RegistryImpl) {
use crate::handler::MessageHandler;
use crate::outcome::Outcome;
use crate::topic::Topic;
struct Dummy;
impl Topic for Dummy {
type Message = ();
fn topology() -> &'static QueueTopology {
unreachable!("anchor only")
}
}
struct DummyHandler;
impl MessageHandler<Dummy> for DummyHandler {
type Context = ();
async fn handle(&self, _m: (), _meta: MessageMetadata, _ctx: &()) -> Outcome {
Outcome::Ack
}
}
let _ = <_ as RegistryImpl>::register::<Dummy, DummyHandler>(
r,
<<InMemory as HasCoordinatedGroups>::ConsumerGroupConfig>::default(),
|| DummyHandler,
(),
)
.await;
let _: CancellationToken = <_ as RegistryImpl>::cancellation_token(r);
}
#[cfg(feature = "inmemory")]
async fn _anchor_registry_run_until_timeout(
r: <InMemory as HasCoordinatedGroups>::RegistryImpl,
) {
let _: SupervisorOutcome =
<_ as RegistryImpl>::run_until_timeout::<std::future::Pending<()>>(
r,
std::future::pending(),
std::time::Duration::ZERO,
)
.await;
}
}