sqs_lambda/
sqs_service.rs

1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use log::info;
4use rusoto_s3::S3;
5use rusoto_sqs::{SendMessageRequest, Sqs, SqsClient};
6
7use crate::cache::Cache;
8use crate::completion_event_serializer::CompletionEventSerializer;
9use crate::event_decoder::PayloadDecoder;
10
11use crate::event_handler::EventHandler;
12use crate::event_processor::{EventProcessor, EventProcessorActor};
13use crate::event_retriever::S3PayloadRetriever;
14use crate::s3_event_emitter::S3EventEmitter;
15use crate::sqs_completion_handler::{
16    CompletionPolicy, SqsCompletionHandler, SqsCompletionHandlerActor,
17};
18use crate::sqs_consumer::{ConsumePolicy, IntoDeadline, SqsConsumer, SqsConsumerActor};
19
20use std::error::Error;
21use std::future::Future;
22
23fn time_based_key_fn(_event: &[u8]) -> String {
24    let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
25        Ok(n) => n.as_millis(),
26        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
27    };
28
29    let cur_day = cur_ms - (cur_ms % 86400);
30
31    format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
32}
33
34pub async fn sqs_service<
35    S3T,
36    SInit,
37    SqsT,
38    EventT,
39    CompletedEventT,
40    EventDecoderT,
41    EventEncoderT,
42    EventHandlerT,
43    CacheT,
44    OnAck,
45    EmissionResult,
46    OnEmission,
47>(
48    queue_url: impl Into<String>,
49    initial_messages: impl IntoIterator<Item = rusoto_sqs::Message>,
50    dest_bucket: impl Into<String>,
51    consume_policy: ConsumePolicy,
52    completion_policy: CompletionPolicy,
53    s3_init: SInit,
54    s3_client: S3T,
55    sqs_client: SqsT,
56    event_decoder: EventDecoderT,
57    event_encoder: EventEncoderT,
58    event_handler: EventHandlerT,
59    cache: CacheT,
60    on_ack: OnAck,
61    on_emit: OnEmission,
62) -> Result<(), Box<dyn std::error::Error>>
63where
64    S3T: S3 + Clone + Send + Sync + 'static,
65    SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
66    SqsT: Sqs + Clone + Send + Sync + 'static,
67    CompletedEventT: Clone + Send + Sync + 'static,
68    EventT: Clone + Send + Sync + 'static,
69    EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
70    EventEncoderT: CompletionEventSerializer<
71            CompletedEvent = CompletedEventT,
72            Output = Vec<u8>,
73            Error = <EventHandlerT as EventHandler>::Error,
74        > + Clone
75        + Send
76        + Sync
77        + 'static,
78    EventHandlerT: EventHandler<
79            InputEvent = EventT,
80            OutputEvent = CompletedEventT,
81            Error = crate::error::Error,
82        > + Clone
83        + Send
84        + Sync
85        + 'static,
86    CacheT: Cache + Clone + Send + Sync + 'static,
87    OnAck: Fn(
88            SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
89            Result<String, String>,
90        ) + Send
91        + Sync
92        + 'static,
93    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
94    EmissionResult:
95        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
96{
97    info!("sqs service init");
98    let queue_url = queue_url.into();
99    let dest_bucket = dest_bucket.into();
100
101    let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
102
103    let (sqs_completion_handler, _sqs_completion_handle) =
104        SqsCompletionHandlerActor::new(SqsCompletionHandler::new(
105            sqs_client.clone(),
106            queue_url.clone(),
107            event_encoder,
108            S3EventEmitter::new(
109                s3_client.clone(),
110                dest_bucket.clone(),
111                time_based_key_fn,
112                on_emit,
113            ),
114            completion_policy,
115            on_ack,
116            cache,
117        ));
118
119    let (sqs_consumer, sqs_consumer_handle) = SqsConsumerActor::new(SqsConsumer::new(
120        sqs_client.clone(),
121        queue_url.clone(),
122        consume_policy,
123        sqs_completion_handler.clone(),
124        tx,
125    ))
126    .await;
127
128    let event_processors: Vec<_> = (0..40)
129        .into_iter()
130        .map(|_| {
131            EventProcessorActor::new(EventProcessor::new(
132                sqs_consumer.clone(),
133                sqs_completion_handler.clone(),
134                event_handler.clone(),
135                S3PayloadRetriever::new(s3_init.clone(), event_decoder.clone()),
136            ))
137        })
138        .collect();
139
140    futures::future::join_all(event_processors.iter().map(|ep| ep.0.start_processing())).await;
141
142    let mut proc_iter = event_processors.iter().cycle();
143    for message in initial_messages.into_iter() {
144        let next_proc = proc_iter.next().unwrap();
145        next_proc.0.process_event(message).await;
146    }
147
148    drop(event_processors);
149    drop(sqs_consumer);
150    drop(sqs_completion_handler);
151
152    sqs_consumer_handle.await;
153    shutdown_notify.await;
154    Ok(())
155}