sqs-lambda 0.21.1

SQS Service helper library
Documentation
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use log::info;
use rusoto_s3::S3;
use rusoto_sqs::{SendMessageRequest, Sqs, SqsClient};

use crate::cache::Cache;
use crate::completion_event_serializer::CompletionEventSerializer;
use crate::event_decoder::PayloadDecoder;

use crate::event_handler::EventHandler;
use crate::event_processor::{EventProcessor, EventProcessorActor};
use crate::event_retriever::S3PayloadRetriever;
use crate::local_sqs_service_options::{LocalSqsServiceOptions, LocalSqsServiceOptionsBuilder};
use crate::s3_event_emitter::S3EventEmitter;
use crate::sqs_completion_handler::{SqsCompletionHandler, SqsCompletionHandlerActor};
use crate::sqs_consumer::{ConsumePolicy, IntoDeadline, SqsConsumer, SqsConsumerActor};

use std::error::Error;
use std::future::Future;

fn time_based_key_fn(_event: &[u8]) -> String {
    let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(n) => n.as_millis(),
        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
    };

    let cur_day = cur_ms - (cur_ms % 86400);

    format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
}

#[tracing::instrument(skip(
    queue_url,
    dest_bucket,
    deadline,
    s3_init,
    s3_client,
    sqs_client,
    event_decoder,
    event_encoder,
    event_handler,
    cache,
    on_ack,
    on_emit,
    options
))]
pub async fn local_sqs_service_with_options<
    S3T,
    SInit,
    SqsT,
    EventT,
    CompletedEventT,
    EventDecoderT,
    EventEncoderT,
    EventHandlerT,
    CacheT,
    OnAck,
    EmissionResult,
    OnEmission,
>(
    queue_url: impl Into<String>,
    dest_bucket: impl Into<String>,
    deadline: impl IntoDeadline,
    s3_init: SInit,
    s3_client: S3T,
    sqs_client: SqsT,
    event_decoder: EventDecoderT,
    event_encoder: EventEncoderT,
    event_handler: EventHandlerT,
    cache: CacheT,
    on_ack: OnAck,
    on_emit: OnEmission,
    options: LocalSqsServiceOptions,
) -> Result<(), Box<dyn std::error::Error>>
where
    SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
    S3T: S3 + Clone + Send + Sync + 'static,
    SqsT: Sqs + Clone + Send + Sync + 'static,
    CompletedEventT: Clone + Send + Sync + 'static,
    EventT: Clone + Send + Sync + 'static,
    EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
    EventEncoderT: CompletionEventSerializer<
            CompletedEvent = CompletedEventT,
            Output = Vec<u8>,
            Error = <EventHandlerT as EventHandler>::Error,
        > + Clone
        + Send
        + Sync
        + 'static,
    EventHandlerT: EventHandler<
            InputEvent = EventT,
            OutputEvent = CompletedEventT,
            Error = crate::error::Error,
        > + Clone
        + Send
        + Sync
        + 'static,
    CacheT: Cache + Clone + Send + Sync + 'static,
    OnAck: Fn(
            SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
            Result<String, String>,
        ) + Send
        + Sync
        + 'static,
    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
    EmissionResult:
        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
{
    let queue_url = queue_url.into();
    let dest_bucket = dest_bucket.into();

    let (tx, shutdown_notify) = tokio::sync::oneshot::channel();

    let (sqs_completion_handler, sqs_completion_handle) =
        SqsCompletionHandlerActor::new(SqsCompletionHandler::new(
            sqs_client.clone(),
            queue_url.clone(),
            event_encoder,
            S3EventEmitter::new(
                s3_client.clone(),
                dest_bucket.clone(),
                time_based_key_fn,
                on_emit,
            ),
            options.completion_policy,
            on_ack,
            cache,
        ));

    let (sqs_consumer, sqs_consumer_handle) = SqsConsumerActor::new(SqsConsumer::new(
        sqs_client.clone(),
        queue_url.clone(),
        options.consume_policy,
        sqs_completion_handler.clone(),
        tx,
    ))
    .await;

    let event_processors: Vec<_> = (0..1)
        .into_iter()
        .map(|_| {
            EventProcessorActor::new(EventProcessor::new(
                sqs_consumer.clone(),
                sqs_completion_handler.clone(),
                event_handler.clone(),
                S3PayloadRetriever::new(s3_init.clone(), event_decoder.clone()),
            ))
        })
        .collect();
    info!("created event_processors");

    futures::future::join_all(event_processors.iter().map(|ep| ep.0.start_processing())).await;

    drop(event_processors);
    drop(sqs_consumer);
    drop(sqs_completion_handler);

    sqs_consumer_handle.await;
    sqs_completion_handle.await;

    shutdown_notify.await;

    info!("Delaying");
    tokio::time::delay_for(Duration::from_secs(15)).await;
    Ok(())
}

pub async fn local_sqs_service<
    S3T,
    SInit,
    SqsT,
    EventT,
    CompletedEventT,
    EventDecoderT,
    EventEncoderT,
    EventHandlerT,
    CacheT,
    OnAck,
    EmissionResult,
    OnEmission,
>(
    queue_url: impl Into<String>,
    dest_bucket: impl Into<String>,
    deadline: impl IntoDeadline,
    s3_init: SInit,
    s3_client: S3T,
    sqs_client: SqsT,
    event_decoder: EventDecoderT,
    event_encoder: EventEncoderT,
    event_handler: EventHandlerT,
    cache: CacheT,
    on_ack: OnAck,
    on_emit: OnEmission,
) -> Result<(), Box<dyn std::error::Error>>
where
    SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
    S3T: S3 + Clone + Send + Sync + 'static,
    SqsT: Sqs + Clone + Send + Sync + 'static,
    CompletedEventT: Clone + Send + Sync + 'static,
    EventT: Clone + Send + Sync + 'static,
    EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
    EventEncoderT: CompletionEventSerializer<
            CompletedEvent = CompletedEventT,
            Output = Vec<u8>,
            Error = <EventHandlerT as EventHandler>::Error,
        > + Clone
        + Send
        + Sync
        + 'static,
    EventHandlerT: EventHandler<
            InputEvent = EventT,
            OutputEvent = CompletedEventT,
            Error = crate::error::Error,
        > + Clone
        + Send
        + Sync
        + 'static,
    CacheT: Cache + Clone + Send + Sync + 'static,
    OnAck: Fn(
            SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
            Result<String, String>,
        ) + Send
        + Sync
        + 'static,
    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
    EmissionResult:
        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
{
    let options = LocalSqsServiceOptionsBuilder::default().build();
    local_sqs_service_with_options(
        queue_url,
        dest_bucket,
        deadline,
        s3_init,
        s3_client,
        sqs_client,
        event_decoder,
        event_encoder,
        event_handler,
        cache,
        on_ack,
        on_emit,
        options,
    )
    .await
}