use crate::autoscale_metrics::AutoscaleMetrics;
use crate::backend::{
AutoscalerBackendImpl, Backend, ConsumerImpl, ConsumerOptionsInner, QueueStatsProviderImpl,
RegistryImpl, TopologyImpl, capability::HasCoordinatedGroups, sealed,
};
use crate::consumer_supervisor::SupervisorOutcome;
use crate::error::{Result, ShoveError};
use crate::handler::MessageHandler;
use crate::markers::RabbitMq;
use crate::topic::{SequencedTopic, Topic};
use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use super::autoscaler::RabbitMqAutoscalerBackend;
use super::client::{RabbitMqClient, RabbitMqConfig};
use super::consumer::RabbitMqConsumer;
use super::consumer_group::ConsumerGroupConfig;
use super::publisher::RabbitMqPublisher;
use super::registry::ConsumerGroupRegistry;
use super::topology::RabbitMqTopologyDeclarer;
pub struct LazyRabbitMqTopologyDeclarer {
client: RabbitMqClient,
}
impl LazyRabbitMqTopologyDeclarer {
fn new(client: RabbitMqClient) -> Self {
Self { client }
}
}
pub struct NoopQueueStatsProvider;
impl sealed::Sealed for RabbitMq {}
impl Backend for RabbitMq {
type Config = RabbitMqConfig;
type Client = RabbitMqClient;
type PublisherImpl = RabbitMqPublisher;
type ConsumerImpl = RabbitMqConsumer;
type TopologyImpl = LazyRabbitMqTopologyDeclarer;
type AutoscalerImpl = RabbitMqAutoscalerBackend<NoopQueueStatsProvider>;
type QueueStatsImpl = NoopQueueStatsProvider;
async fn connect(config: Self::Config) -> Result<Self::Client> {
RabbitMqClient::connect(&config).await
}
async fn make_publisher(client: &Self::Client) -> Result<Self::PublisherImpl> {
RabbitMqPublisher::new(client.clone()).await
}
fn make_consumer(client: &Self::Client) -> Self::ConsumerImpl {
RabbitMqConsumer::new(client.clone())
}
fn make_declarer(client: &Self::Client) -> Self::TopologyImpl {
LazyRabbitMqTopologyDeclarer::new(client.clone())
}
fn make_autoscaler(client: &Self::Client) -> Self::AutoscalerImpl {
use std::sync::Arc;
use tokio::sync::Mutex;
let registry = Arc::new(Mutex::new(ConsumerGroupRegistry::new(client.clone())));
RabbitMqAutoscalerBackend::with_stats_provider(NoopQueueStatsProvider, registry)
}
fn make_stats_provider(_client: &Self::Client) -> Self::QueueStatsImpl {
NoopQueueStatsProvider
}
async fn close(client: &Self::Client) {
client.shutdown().await;
}
}
impl HasCoordinatedGroups for RabbitMq {
type ConsumerGroupConfig = ConsumerGroupConfig;
type RegistryImpl = ConsumerGroupRegistry;
fn make_registry(client: &Self::Client) -> Self::RegistryImpl {
ConsumerGroupRegistry::new(client.clone())
}
}
impl ConsumerImpl for RabbitMqConsumer {
async fn run<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
RabbitMqConsumer::run_with_inner::<T, H>(self, handler, ctx, options).await
}
async fn run_fifo<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<()>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
RabbitMqConsumer::run_fifo_with_inner::<T, H>(self, handler, ctx, options).await
}
async fn run_dlq<T, H>(&self, handler: H, ctx: H::Context) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
RabbitMqConsumer::run_dlq::<T, H>(self, handler, ctx).await
}
async fn spawn_fifo_shards<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<Vec<tokio::task::JoinHandle<Result<()>>>>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
RabbitMqConsumer::spawn_fifo_shards::<T, H>(self, handler, ctx, options)
}
}
impl TopologyImpl for LazyRabbitMqTopologyDeclarer {
async fn declare<T: Topic>(&self) -> Result<()> {
let channel = self.client.create_channel().await?;
let declarer = RabbitMqTopologyDeclarer::new(channel);
RabbitMqTopologyDeclarer::declare(&declarer, T::topology()).await
}
}
impl AutoscalerBackendImpl for RabbitMqAutoscalerBackend<NoopQueueStatsProvider> {}
impl super::management::QueueStatsProvider for NoopQueueStatsProvider {
async fn get_queue_stats(&self, _queue: &str) -> Result<super::management::QueueStats> {
Err(ShoveError::Topology(
"QueueStatsProvider requires ManagementConfig; \
construct a ManagementClient directly for real queue stats"
.into(),
))
}
}
impl QueueStatsProviderImpl for NoopQueueStatsProvider {
async fn snapshot(&self, _queue: &str) -> Result<AutoscaleMetrics> {
Err(ShoveError::Topology(
"QueueStatsProvider requires ManagementConfig; \
construct a ManagementClient directly for real queue stats"
.into(),
))
}
}
impl RegistryImpl for ConsumerGroupRegistry {
type GroupConfig = ConsumerGroupConfig;
async fn register<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
ConsumerGroupRegistry::register::<T, H>(self, config, factory, ctx).await
}
async fn register_fifo<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
ConsumerGroupRegistry::register_fifo::<T, H>(self, config, factory, ctx).await
}
fn cancellation_token(&self) -> CancellationToken {
self.client_shutdown_token()
}
fn set_default_handler_timeout(&mut self, timeout: std::time::Duration) {
self.default_handler_timeout = Some(timeout);
}
async fn run_until_timeout<S>(mut self, signal: S, drain_timeout: Duration) -> SupervisorOutcome
where
S: Future<Output = ()> + Send + 'static,
{
self.start_all();
let broker_token = self.client_shutdown_token();
let signal_handle = tokio::spawn(signal);
tokio::select! {
_ = broker_token.cancelled() => {}
res = signal_handle => {
let _ = res;
broker_token.cancel();
}
}
let drain = self.shutdown_all_with_tally();
match tokio::time::timeout(drain_timeout, drain).await {
Ok(tally) => SupervisorOutcome {
errors: tally.errors,
panics: tally.panics,
timed_out: false,
},
Err(_) => SupervisorOutcome {
errors: 0,
panics: 0,
timed_out: true,
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn noop_stats_provider_snapshot_errors() {
let provider = NoopQueueStatsProvider;
let err =
<NoopQueueStatsProvider as QueueStatsProviderImpl>::snapshot(&provider, "whatever")
.await
.expect_err("NoopQueueStatsProvider must refuse to produce metrics");
assert!(matches!(err, ShoveError::Topology(_)));
assert!(err.to_string().contains("ManagementConfig"));
}
#[tokio::test]
async fn noop_stats_provider_queue_stats_errors() {
use super::super::management::QueueStatsProvider as MgmtProvider;
let provider = NoopQueueStatsProvider;
let err = <NoopQueueStatsProvider as MgmtProvider>::get_queue_stats(&provider, "whatever")
.await
.expect_err("NoopQueueStatsProvider must refuse to produce stats");
assert!(matches!(err, ShoveError::Topology(_)));
}
}