sqs_lambda/
local_sqs_service.rs

1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use log::info;
4use rusoto_s3::S3;
5use rusoto_sqs::{SendMessageRequest, Sqs, SqsClient};
6
7use crate::cache::Cache;
8use crate::completion_event_serializer::CompletionEventSerializer;
9use crate::event_decoder::PayloadDecoder;
10
11use crate::event_handler::EventHandler;
12use crate::event_processor::{EventProcessor, EventProcessorActor};
13use crate::event_retriever::S3PayloadRetriever;
14use crate::local_sqs_service_options::{LocalSqsServiceOptions, LocalSqsServiceOptionsBuilder};
15use crate::s3_event_emitter::S3EventEmitter;
16use crate::sqs_completion_handler::{SqsCompletionHandler, SqsCompletionHandlerActor};
17use crate::sqs_consumer::{ConsumePolicy, IntoDeadline, SqsConsumer, SqsConsumerActor};
18
19use std::error::Error;
20use std::future::Future;
21
22fn time_based_key_fn(_event: &[u8]) -> String {
23    let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
24        Ok(n) => n.as_millis(),
25        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
26    };
27
28    let cur_day = cur_ms - (cur_ms % 86400);
29
30    format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
31}
32
33#[tracing::instrument(skip(
34    queue_url,
35    dest_bucket,
36    deadline,
37    s3_init,
38    s3_client,
39    sqs_client,
40    event_decoder,
41    event_encoder,
42    event_handler,
43    cache,
44    on_ack,
45    on_emit,
46    options
47))]
48pub async fn local_sqs_service_with_options<
49    S3T,
50    SInit,
51    SqsT,
52    EventT,
53    CompletedEventT,
54    EventDecoderT,
55    EventEncoderT,
56    EventHandlerT,
57    CacheT,
58    OnAck,
59    EmissionResult,
60    OnEmission,
61>(
62    queue_url: impl Into<String>,
63    dest_bucket: impl Into<String>,
64    deadline: impl IntoDeadline,
65    s3_init: SInit,
66    s3_client: S3T,
67    sqs_client: SqsT,
68    event_decoder: EventDecoderT,
69    event_encoder: EventEncoderT,
70    event_handler: EventHandlerT,
71    cache: CacheT,
72    on_ack: OnAck,
73    on_emit: OnEmission,
74    options: LocalSqsServiceOptions,
75) -> Result<(), Box<dyn std::error::Error>>
76where
77    SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
78    S3T: S3 + Clone + Send + Sync + 'static,
79    SqsT: Sqs + Clone + Send + Sync + 'static,
80    CompletedEventT: Clone + Send + Sync + 'static,
81    EventT: Clone + Send + Sync + 'static,
82    EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
83    EventEncoderT: CompletionEventSerializer<
84            CompletedEvent = CompletedEventT,
85            Output = Vec<u8>,
86            Error = <EventHandlerT as EventHandler>::Error,
87        > + Clone
88        + Send
89        + Sync
90        + 'static,
91    EventHandlerT: EventHandler<
92            InputEvent = EventT,
93            OutputEvent = CompletedEventT,
94            Error = crate::error::Error,
95        > + Clone
96        + Send
97        + Sync
98        + 'static,
99    CacheT: Cache + Clone + Send + Sync + 'static,
100    OnAck: Fn(
101            SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
102            Result<String, String>,
103        ) + Send
104        + Sync
105        + 'static,
106    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
107    EmissionResult:
108        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
109{
110    let queue_url = queue_url.into();
111    let dest_bucket = dest_bucket.into();
112
113    let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
114
115    let (sqs_completion_handler, sqs_completion_handle) =
116        SqsCompletionHandlerActor::new(SqsCompletionHandler::new(
117            sqs_client.clone(),
118            queue_url.clone(),
119            event_encoder,
120            S3EventEmitter::new(
121                s3_client.clone(),
122                dest_bucket.clone(),
123                time_based_key_fn,
124                on_emit,
125            ),
126            options.completion_policy,
127            on_ack,
128            cache,
129        ));
130
131    let (sqs_consumer, sqs_consumer_handle) = SqsConsumerActor::new(SqsConsumer::new(
132        sqs_client.clone(),
133        queue_url.clone(),
134        options.consume_policy,
135        sqs_completion_handler.clone(),
136        tx,
137    ))
138    .await;
139
140    let event_processors: Vec<_> = (0..1)
141        .into_iter()
142        .map(|_| {
143            EventProcessorActor::new(EventProcessor::new(
144                sqs_consumer.clone(),
145                sqs_completion_handler.clone(),
146                event_handler.clone(),
147                S3PayloadRetriever::new(s3_init.clone(), event_decoder.clone()),
148            ))
149        })
150        .collect();
151    info!("created event_processors");
152
153    futures::future::join_all(event_processors.iter().map(|ep| ep.0.start_processing())).await;
154
155    drop(event_processors);
156    drop(sqs_consumer);
157    drop(sqs_completion_handler);
158
159    sqs_consumer_handle.await;
160    sqs_completion_handle.await;
161
162    shutdown_notify.await;
163
164    info!("Delaying");
165    tokio::time::delay_for(Duration::from_secs(15)).await;
166    Ok(())
167}
168
169pub async fn local_sqs_service<
170    S3T,
171    SInit,
172    SqsT,
173    EventT,
174    CompletedEventT,
175    EventDecoderT,
176    EventEncoderT,
177    EventHandlerT,
178    CacheT,
179    OnAck,
180    EmissionResult,
181    OnEmission,
182>(
183    queue_url: impl Into<String>,
184    dest_bucket: impl Into<String>,
185    deadline: impl IntoDeadline,
186    s3_init: SInit,
187    s3_client: S3T,
188    sqs_client: SqsT,
189    event_decoder: EventDecoderT,
190    event_encoder: EventEncoderT,
191    event_handler: EventHandlerT,
192    cache: CacheT,
193    on_ack: OnAck,
194    on_emit: OnEmission,
195) -> Result<(), Box<dyn std::error::Error>>
196where
197    SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
198    S3T: S3 + Clone + Send + Sync + 'static,
199    SqsT: Sqs + Clone + Send + Sync + 'static,
200    CompletedEventT: Clone + Send + Sync + 'static,
201    EventT: Clone + Send + Sync + 'static,
202    EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
203    EventEncoderT: CompletionEventSerializer<
204            CompletedEvent = CompletedEventT,
205            Output = Vec<u8>,
206            Error = <EventHandlerT as EventHandler>::Error,
207        > + Clone
208        + Send
209        + Sync
210        + 'static,
211    EventHandlerT: EventHandler<
212            InputEvent = EventT,
213            OutputEvent = CompletedEventT,
214            Error = crate::error::Error,
215        > + Clone
216        + Send
217        + Sync
218        + 'static,
219    CacheT: Cache + Clone + Send + Sync + 'static,
220    OnAck: Fn(
221            SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
222            Result<String, String>,
223        ) + Send
224        + Sync
225        + 'static,
226    OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
227    EmissionResult:
228        Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
229{
230    let options = LocalSqsServiceOptionsBuilder::default().build();
231    local_sqs_service_with_options(
232        queue_url,
233        dest_bucket,
234        deadline,
235        s3_init,
236        s3_client,
237        sqs_client,
238        event_decoder,
239        event_encoder,
240        event_handler,
241        cache,
242        on_ack,
243        on_emit,
244        options,
245    )
246    .await
247}