#![cfg(feature = "aws-sns-sqs")]
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::autoscale_metrics::AutoscaleMetrics;
use crate::backend::{
AutoscalerBackendImpl, Backend, ConsumerImpl, ConsumerOptionsInner, QueueStatsProviderImpl,
TopologyImpl, sealed,
};
use crate::error::Result;
use crate::handler::MessageHandler;
use crate::markers::Sqs;
use crate::topic::{SequencedTopic, Topic};
use super::autoscaler::SqsAutoscalerBackend;
use super::client::{SnsClient, SnsConfig};
use super::consumer::SqsConsumer;
use super::publisher::SnsPublisher;
use super::registry::SqsConsumerGroupRegistry;
use super::stats::SqsQueueStatsProvider;
use super::topology::SnsTopologyDeclarer;
// Implements the `sealed::Sealed` marker trait for the `Sqs` backend marker type.
// The body is empty: this impl contributes no items, only the trait membership.
// NOTE(review): presumably `Sealed` is a supertrait of `Backend` used to keep
// downstream crates from adding their own backends — confirm in `crate::backend`.
impl sealed::Sealed for Sqs {}
// Binds the `Sqs` marker type to the concrete SNS/SQS building blocks.
//
// Every factory below clones the shared `SnsClient` handle (and, where needed,
// one of its registries) into the component it constructs.
impl Backend for Sqs {
    /// Configuration consumed by [`Backend::connect`].
    type Config = SnsConfig;
    /// Client handle that every factory method receives by reference.
    type Client = SnsClient;
    /// Publisher component.
    type PublisherImpl = SnsPublisher;
    /// Consumer component.
    type ConsumerImpl = SqsConsumer;
    /// Topology declarer component.
    type TopologyImpl = SnsTopologyDeclarer;
    /// Autoscaler backend, fed by the SQS queue stats provider.
    type AutoscalerImpl = SqsAutoscalerBackend<SqsQueueStatsProvider>;
    /// Queue statistics provider component.
    type QueueStatsImpl = SqsQueueStatsProvider;

    /// Creates a client from `config` by awaiting `SnsClient::new`.
    ///
    /// # Errors
    /// Returns whatever error `SnsClient::new` reports.
    async fn connect(config: Self::Config) -> Result<Self::Client> {
        let connecting = SnsClient::new(&config);
        connecting.await
    }

    /// Builds a publisher from a clone of `client` and a clone of its topic registry.
    ///
    /// This implementation always returns `Ok`.
    async fn make_publisher(client: &Self::Client) -> Result<Self::PublisherImpl> {
        let handle = client.clone();
        let topics = client.topic_registry().clone();
        Ok(SnsPublisher::new(handle, topics))
    }

    /// Builds a consumer from a clone of `client` and a clone of its queue registry.
    fn make_consumer(client: &Self::Client) -> Self::ConsumerImpl {
        let handle = client.clone();
        let queues = client.queue_registry().clone();
        SqsConsumer::new(handle, queues)
    }

    /// Builds a topology declarer that owns a clone of `client`.
    fn make_declarer(client: &Self::Client) -> Self::TopologyImpl {
        let handle = client.clone();
        SnsTopologyDeclarer::new(handle)
    }

    /// Builds the autoscaler backend from a fresh stats provider and a fresh
    /// consumer-group registry wrapped in `Arc<Mutex<_>>`.
    fn make_autoscaler(client: &Self::Client) -> Self::AutoscalerImpl {
        // Same construction as `make_stats_provider`; reused instead of repeated.
        let provider = <Self as Backend>::make_stats_provider(client);
        let group_registry = SqsConsumerGroupRegistry::new(client.clone());
        let shared_registry = Arc::new(Mutex::new(group_registry));
        SqsAutoscalerBackend::new(provider, shared_registry)
    }

    /// Builds a queue stats provider from a clone of `client` and a clone of
    /// its queue registry.
    fn make_stats_provider(client: &Self::Client) -> Self::QueueStatsImpl {
        let handle = client.clone();
        let queues = client.queue_registry().clone();
        SqsQueueStatsProvider::new(handle, queues)
    }

    /// Awaits `client.shutdown()`; any value it yields is discarded.
    async fn close(client: &Self::Client) {
        let shutting_down = client.shutdown();
        shutting_down.await;
    }

    /// Forwards to `client.ping(timeout)` and returns its result unchanged.
    async fn ping(client: &Self::Client, timeout: std::time::Duration) -> Result<()> {
        let probe = client.ping(timeout);
        probe.await
    }
}
// Adapts `SqsConsumer` to the backend-agnostic `ConsumerImpl` trait.
//
// Each method forwards to an associated function reached through the
// `SqsConsumer::` path. NOTE(review): `run_dlq` and `spawn_fifo_shards` share
// their names with the trait methods, so the path form presumably selects the
// inherent functions on `SqsConsumer` — confirm in `super::consumer`.
impl ConsumerImpl for SqsConsumer {
    /// Runs the consumer for topic `T`, forwarding `handler`, `ctx` and
    /// `options` to `SqsConsumer::run_with_inner`.
    async fn run<T: Topic, H: MessageHandler<T>>(
        &self,
        handler: H,
        ctx: H::Context,
        options: ConsumerOptionsInner,
    ) -> Result<()> {
        let consuming = SqsConsumer::run_with_inner::<T, H>(self, handler, ctx, options);
        consuming.await
    }

    /// Runs the consumer for sequenced topic `T`, forwarding `handler`, `ctx`
    /// and `options` to `SqsConsumer::run_fifo_with_inner`.
    async fn run_fifo<T: SequencedTopic, H: MessageHandler<T>>(
        &self,
        handler: H,
        ctx: H::Context,
        options: ConsumerOptionsInner,
    ) -> Result<()> {
        let consuming = SqsConsumer::run_fifo_with_inner::<T, H>(self, handler, ctx, options);
        consuming.await
    }

    /// Runs the dead-letter consumer for topic `T` via `SqsConsumer::run_dlq`.
    ///
    /// `_options` is accepted to satisfy the trait signature but is not
    /// forwarded: the callee is invoked with `handler` and `ctx` only.
    async fn run_dlq<T: Topic, H: MessageHandler<T>>(
        &self,
        handler: H,
        ctx: H::Context,
        _options: ConsumerOptionsInner,
    ) -> Result<()> {
        let consuming = SqsConsumer::run_dlq::<T, H>(self, handler, ctx);
        consuming.await
    }

    /// Forwards to `SqsConsumer::spawn_fifo_shards` and returns the join
    /// handles it produces.
    async fn spawn_fifo_shards<T: SequencedTopic, H: MessageHandler<T>>(
        &self,
        handler: H,
        ctx: H::Context,
        options: ConsumerOptionsInner,
    ) -> Result<Vec<tokio::task::JoinHandle<Result<()>>>> {
        let spawning = SqsConsumer::spawn_fifo_shards::<T, H>(self, handler, ctx, options);
        spawning.await
    }
}
// Adapts `SnsTopologyDeclarer` to the backend-agnostic `TopologyImpl` trait.
impl TopologyImpl for SnsTopologyDeclarer {
    /// Declares the topology of topic `T`.
    ///
    /// Obtains the value of `T::topology()` and passes it to
    /// `SnsTopologyDeclarer::declare`, returning that call's result unchanged.
    async fn declare<T>(&self) -> Result<()>
    where
        T: Topic,
    {
        let topology = T::topology();
        let declaring = SnsTopologyDeclarer::declare(self, topology);
        declaring.await
    }
}
// Registers the SQS autoscaler backend, specialised to `SqsQueueStatsProvider`,
// as an `AutoscalerBackendImpl`. The body is empty, so this impl supplies no
// items of its own.
impl AutoscalerBackendImpl for SqsAutoscalerBackend<SqsQueueStatsProvider> {}
// Adapts `SqsQueueStatsProvider` to the backend-agnostic `QueueStatsProviderImpl` trait.
impl QueueStatsProviderImpl for SqsQueueStatsProvider {
    /// Fetches the current stats for `queue` and converts them into
    /// [`AutoscaleMetrics`].
    ///
    /// `messages_ready` becomes `backlog` and `messages_not_visible` becomes
    /// `inflight`; throughput and processing latency are always `None`.
    ///
    /// # Errors
    /// Propagates any error from `get_queue_stats`.
    async fn snapshot(&self, queue: &str) -> Result<AutoscaleMetrics> {
        // NOTE(review): imported locally, presumably so that `get_queue_stats`
        // resolves as a method of this trait — confirm in `super::stats`.
        use super::stats::SqsQueueStatsProviderTrait;

        let queue_stats = self.get_queue_stats(queue).await?;
        let backlog = Some(queue_stats.messages_ready);
        let inflight = Some(queue_stats.messages_not_visible);

        Ok(AutoscaleMetrics {
            backlog,
            inflight,
            throughput_per_sec: None,
            processing_latency: None,
        })
    }
}