use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::autoscale_metrics::AutoscaleMetrics;
use crate::backend::{
AutoscalerBackendImpl, Backend, QueueStatsProviderImpl, RegistryImpl, TopologyImpl,
capability::HasCoordinatedGroups, sealed,
};
use crate::consumer_supervisor::SupervisorOutcome;
use crate::error::Result;
use crate::handler::MessageHandler;
use crate::markers::Redis;
use crate::topic::{SequencedTopic, Topic};
use super::autoscaler::{RedisAutoscalerBackend, XlenStatsProvider};
use super::client::{RedisClient, RedisConfig};
use super::consumer::RedisConsumer;
use super::consumer_group::{RedisConsumerGroupConfig, RedisConsumerGroupRegistry};
use super::publisher::RedisPublisher;
use super::topology::RedisTopologyDeclarer;
impl sealed::Sealed for Redis {}
impl Backend for Redis {
type Config = RedisConfig;
type Client = RedisClient;
type PublisherImpl = RedisPublisher;
type ConsumerImpl = RedisConsumer;
type TopologyImpl = RedisTopologyDeclarer;
type AutoscalerImpl = RedisAutoscalerBackend<XlenStatsProvider>;
type QueueStatsImpl = XlenStatsProvider;
async fn connect(config: Self::Config) -> Result<Self::Client> {
RedisClient::connect(config).await
}
async fn make_publisher(client: &Self::Client) -> Result<Self::PublisherImpl> {
Ok(RedisPublisher::new(client.clone()))
}
fn make_consumer(client: &Self::Client) -> Self::ConsumerImpl {
RedisConsumer::new(client.clone())
}
fn make_declarer(client: &Self::Client) -> Self::TopologyImpl {
RedisTopologyDeclarer::new(client.clone())
}
fn make_autoscaler(client: &Self::Client) -> Self::AutoscalerImpl {
use std::sync::Arc;
use tokio::sync::Mutex;
let registry = Arc::new(Mutex::new(RedisConsumerGroupRegistry::new(client.clone())));
RedisAutoscalerBackend::new(client.clone(), registry)
}
fn make_stats_provider(client: &Self::Client) -> Self::QueueStatsImpl {
XlenStatsProvider::new(client.clone())
}
async fn close(_client: &Self::Client) {
}
async fn ping(client: &Self::Client, timeout: std::time::Duration) -> Result<()> {
client.ping(timeout).await
}
}
impl HasCoordinatedGroups for Redis {
type ConsumerGroupConfig = RedisConsumerGroupConfig;
type RegistryImpl = RedisConsumerGroupRegistry;
fn make_registry(client: &Self::Client) -> Self::RegistryImpl {
RedisConsumerGroupRegistry::new(client.clone())
}
}
impl TopologyImpl for RedisTopologyDeclarer {
async fn declare<T: Topic>(&self) -> Result<()> {
RedisTopologyDeclarer::declare(self, T::topology()).await
}
}
impl RegistryImpl for RedisConsumerGroupRegistry {
type GroupConfig = RedisConsumerGroupConfig;
async fn register<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
RedisConsumerGroupRegistry::register::<T, H>(self, config, factory, ctx).await
}
async fn register_fifo<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
RedisConsumerGroupRegistry::register_fifo::<T, H>(self, config, factory, ctx).await
}
fn cancellation_token(&self) -> CancellationToken {
self.broker_shutdown_token()
}
fn set_default_handler_timeout(&mut self, timeout: std::time::Duration) {
self.default_handler_timeout = Some(timeout);
}
async fn run_until_timeout<S>(self, signal: S, drain_timeout: Duration) -> SupervisorOutcome
where
S: Future<Output = ()> + Send + 'static,
{
RedisConsumerGroupRegistry::run_until_timeout(self, signal, drain_timeout).await
}
}