use crate::autoscale_metrics::AutoscaleMetrics;
use crate::backend::{
AutoscalerBackendImpl, Backend, ConsumerImpl, ConsumerOptionsInner, QueueStatsProviderImpl,
RegistryImpl, TopologyImpl, capability::HasCoordinatedGroups, sealed,
};
use crate::consumer_supervisor::SupervisorOutcome;
use crate::error::Result;
use crate::handler::MessageHandler;
use crate::markers::Kafka;
use crate::topic::{SequencedTopic, Topic};
use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use super::autoscaler::{KafkaAutoscalerBackend, KafkaLagStatsProvider, KafkaQueueStatsProvider};
use super::client::{KafkaClient, KafkaConfig};
use super::consumer::KafkaConsumer;
use super::consumer_group::{KafkaConsumerGroupConfig, KafkaConsumerGroupRegistry};
use super::publisher::KafkaPublisher;
use super::topology::KafkaTopologyDeclarer;
impl sealed::Sealed for Kafka {}
impl Backend for Kafka {
type Config = KafkaConfig;
type Client = KafkaClient;
type PublisherImpl = KafkaPublisher;
type ConsumerImpl = KafkaConsumer;
type TopologyImpl = KafkaTopologyDeclarer;
type AutoscalerImpl = KafkaAutoscalerBackend<KafkaLagStatsProvider>;
type QueueStatsImpl = KafkaLagStatsProvider;
async fn connect(config: Self::Config) -> Result<Self::Client> {
KafkaClient::connect(&config).await
}
async fn make_publisher(client: &Self::Client) -> Result<Self::PublisherImpl> {
KafkaPublisher::new(client.clone()).await
}
fn make_consumer(client: &Self::Client) -> Self::ConsumerImpl {
KafkaConsumer::new(client.clone())
}
fn make_declarer(client: &Self::Client) -> Self::TopologyImpl {
KafkaTopologyDeclarer::new(client.clone())
}
fn make_autoscaler(client: &Self::Client) -> Self::AutoscalerImpl {
use std::sync::Arc;
use tokio::sync::Mutex;
let registry = Arc::new(Mutex::new(KafkaConsumerGroupRegistry::new(client.clone())));
KafkaAutoscalerBackend::new(client.clone(), registry)
}
fn make_stats_provider(client: &Self::Client) -> Self::QueueStatsImpl {
KafkaLagStatsProvider::new(client.clone())
}
async fn close(client: &Self::Client) {
client.shutdown().await;
}
}
impl HasCoordinatedGroups for Kafka {
type ConsumerGroupConfig = KafkaConsumerGroupConfig;
type RegistryImpl = KafkaConsumerGroupRegistry;
fn make_registry(client: &Self::Client) -> Self::RegistryImpl {
KafkaConsumerGroupRegistry::new(client.clone())
}
}
impl ConsumerImpl for KafkaConsumer {
async fn run<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
KafkaConsumer::run_with_inner::<T, H>(self, handler, ctx, options).await
}
async fn run_fifo<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<()>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
KafkaConsumer::run_fifo_with_inner::<T, H>(self, handler, ctx, options).await
}
async fn run_dlq<T, H>(&self, handler: H, ctx: H::Context) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
KafkaConsumer::run_dlq::<T, H>(self, handler, ctx).await
}
async fn spawn_fifo_shards<T, H>(
&self,
handler: H,
ctx: H::Context,
options: ConsumerOptionsInner,
) -> Result<Vec<tokio::task::JoinHandle<Result<()>>>>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
KafkaConsumer::spawn_fifo_shards::<T, H>(self, handler, ctx, options)
}
}
impl TopologyImpl for KafkaTopologyDeclarer {
async fn declare<T: Topic>(&self) -> Result<()> {
KafkaTopologyDeclarer::declare(self, T::topology()).await
}
}
impl AutoscalerBackendImpl for KafkaAutoscalerBackend<KafkaLagStatsProvider> {}
impl QueueStatsProviderImpl for KafkaLagStatsProvider {
async fn snapshot(&self, queue: &str) -> Result<AutoscaleMetrics> {
let stats = self.get_queue_stats(queue).await?;
Ok(AutoscaleMetrics {
backlog: Some(stats.messages_pending),
inflight: Some(stats.messages_in_flight),
throughput_per_sec: None,
processing_latency: None,
})
}
}
impl RegistryImpl for KafkaConsumerGroupRegistry {
type GroupConfig = KafkaConsumerGroupConfig;
async fn register<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: Topic,
H: MessageHandler<T>,
{
KafkaConsumerGroupRegistry::register::<T, H>(self, config, factory, ctx).await
}
async fn register_fifo<T, H>(
&mut self,
config: Self::GroupConfig,
factory: impl Fn() -> H + Send + Sync + 'static,
ctx: H::Context,
) -> Result<()>
where
T: SequencedTopic,
H: MessageHandler<T>,
{
KafkaConsumerGroupRegistry::register_fifo::<T, H>(self, config, factory, ctx).await
}
fn cancellation_token(&self) -> CancellationToken {
self.client_shutdown_token()
}
fn set_default_handler_timeout(&mut self, timeout: std::time::Duration) {
self.default_handler_timeout = Some(timeout);
}
async fn run_until_timeout<S>(mut self, signal: S, drain_timeout: Duration) -> SupervisorOutcome
where
S: Future<Output = ()> + Send + 'static,
{
self.start_all();
let broker_token = self.client_shutdown_token();
let signal_handle = tokio::spawn(signal);
tokio::select! {
_ = broker_token.cancelled() => {}
res = signal_handle => {
let _ = res;
broker_token.cancel();
}
}
let drain = self.shutdown_all_with_tally();
match tokio::time::timeout(drain_timeout, drain).await {
Ok(tally) => SupervisorOutcome {
errors: tally.errors,
panics: tally.panics,
timed_out: false,
},
Err(_) => SupervisorOutcome {
errors: 0,
panics: 0,
timed_out: true,
},
}
}
}