1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use log::info;
4use rusoto_s3::S3;
5use rusoto_sqs::{SendMessageRequest, Sqs, SqsClient};
6
7use crate::cache::Cache;
8use crate::completion_event_serializer::CompletionEventSerializer;
9use crate::event_decoder::PayloadDecoder;
10
11use crate::event_handler::EventHandler;
12use crate::event_processor::{EventProcessor, EventProcessorActor};
13use crate::event_retriever::S3PayloadRetriever;
14use crate::local_sqs_service_options::{LocalSqsServiceOptions, LocalSqsServiceOptionsBuilder};
15use crate::s3_event_emitter::S3EventEmitter;
16use crate::sqs_completion_handler::{SqsCompletionHandler, SqsCompletionHandlerActor};
17use crate::sqs_consumer::{ConsumePolicy, IntoDeadline, SqsConsumer, SqsConsumerActor};
18
19use std::error::Error;
20use std::future::Future;
21
22fn time_based_key_fn(_event: &[u8]) -> String {
23 let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
24 Ok(n) => n.as_millis(),
25 Err(_) => panic!("SystemTime before UNIX EPOCH!"),
26 };
27
28 let cur_day = cur_ms - (cur_ms % 86400);
29
30 format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
31}
32
33#[tracing::instrument(skip(
34 queue_url,
35 dest_bucket,
36 deadline,
37 s3_init,
38 s3_client,
39 sqs_client,
40 event_decoder,
41 event_encoder,
42 event_handler,
43 cache,
44 on_ack,
45 on_emit,
46 options
47))]
48pub async fn local_sqs_service_with_options<
49 S3T,
50 SInit,
51 SqsT,
52 EventT,
53 CompletedEventT,
54 EventDecoderT,
55 EventEncoderT,
56 EventHandlerT,
57 CacheT,
58 OnAck,
59 EmissionResult,
60 OnEmission,
61>(
62 queue_url: impl Into<String>,
63 dest_bucket: impl Into<String>,
64 deadline: impl IntoDeadline,
65 s3_init: SInit,
66 s3_client: S3T,
67 sqs_client: SqsT,
68 event_decoder: EventDecoderT,
69 event_encoder: EventEncoderT,
70 event_handler: EventHandlerT,
71 cache: CacheT,
72 on_ack: OnAck,
73 on_emit: OnEmission,
74 options: LocalSqsServiceOptions,
75) -> Result<(), Box<dyn std::error::Error>>
76where
77 SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
78 S3T: S3 + Clone + Send + Sync + 'static,
79 SqsT: Sqs + Clone + Send + Sync + 'static,
80 CompletedEventT: Clone + Send + Sync + 'static,
81 EventT: Clone + Send + Sync + 'static,
82 EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
83 EventEncoderT: CompletionEventSerializer<
84 CompletedEvent = CompletedEventT,
85 Output = Vec<u8>,
86 Error = <EventHandlerT as EventHandler>::Error,
87 > + Clone
88 + Send
89 + Sync
90 + 'static,
91 EventHandlerT: EventHandler<
92 InputEvent = EventT,
93 OutputEvent = CompletedEventT,
94 Error = crate::error::Error,
95 > + Clone
96 + Send
97 + Sync
98 + 'static,
99 CacheT: Cache + Clone + Send + Sync + 'static,
100 OnAck: Fn(
101 SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
102 Result<String, String>,
103 ) + Send
104 + Sync
105 + 'static,
106 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
107 EmissionResult:
108 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
109{
110 let queue_url = queue_url.into();
111 let dest_bucket = dest_bucket.into();
112
113 let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
114
115 let (sqs_completion_handler, sqs_completion_handle) =
116 SqsCompletionHandlerActor::new(SqsCompletionHandler::new(
117 sqs_client.clone(),
118 queue_url.clone(),
119 event_encoder,
120 S3EventEmitter::new(
121 s3_client.clone(),
122 dest_bucket.clone(),
123 time_based_key_fn,
124 on_emit,
125 ),
126 options.completion_policy,
127 on_ack,
128 cache,
129 ));
130
131 let (sqs_consumer, sqs_consumer_handle) = SqsConsumerActor::new(SqsConsumer::new(
132 sqs_client.clone(),
133 queue_url.clone(),
134 options.consume_policy,
135 sqs_completion_handler.clone(),
136 tx,
137 ))
138 .await;
139
140 let event_processors: Vec<_> = (0..1)
141 .into_iter()
142 .map(|_| {
143 EventProcessorActor::new(EventProcessor::new(
144 sqs_consumer.clone(),
145 sqs_completion_handler.clone(),
146 event_handler.clone(),
147 S3PayloadRetriever::new(s3_init.clone(), event_decoder.clone()),
148 ))
149 })
150 .collect();
151 info!("created event_processors");
152
153 futures::future::join_all(event_processors.iter().map(|ep| ep.0.start_processing())).await;
154
155 drop(event_processors);
156 drop(sqs_consumer);
157 drop(sqs_completion_handler);
158
159 sqs_consumer_handle.await;
160 sqs_completion_handle.await;
161
162 shutdown_notify.await;
163
164 info!("Delaying");
165 tokio::time::delay_for(Duration::from_secs(15)).await;
166 Ok(())
167}
168
169pub async fn local_sqs_service<
170 S3T,
171 SInit,
172 SqsT,
173 EventT,
174 CompletedEventT,
175 EventDecoderT,
176 EventEncoderT,
177 EventHandlerT,
178 CacheT,
179 OnAck,
180 EmissionResult,
181 OnEmission,
182>(
183 queue_url: impl Into<String>,
184 dest_bucket: impl Into<String>,
185 deadline: impl IntoDeadline,
186 s3_init: SInit,
187 s3_client: S3T,
188 sqs_client: SqsT,
189 event_decoder: EventDecoderT,
190 event_encoder: EventEncoderT,
191 event_handler: EventHandlerT,
192 cache: CacheT,
193 on_ack: OnAck,
194 on_emit: OnEmission,
195) -> Result<(), Box<dyn std::error::Error>>
196where
197 SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
198 S3T: S3 + Clone + Send + Sync + 'static,
199 SqsT: Sqs + Clone + Send + Sync + 'static,
200 CompletedEventT: Clone + Send + Sync + 'static,
201 EventT: Clone + Send + Sync + 'static,
202 EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
203 EventEncoderT: CompletionEventSerializer<
204 CompletedEvent = CompletedEventT,
205 Output = Vec<u8>,
206 Error = <EventHandlerT as EventHandler>::Error,
207 > + Clone
208 + Send
209 + Sync
210 + 'static,
211 EventHandlerT: EventHandler<
212 InputEvent = EventT,
213 OutputEvent = CompletedEventT,
214 Error = crate::error::Error,
215 > + Clone
216 + Send
217 + Sync
218 + 'static,
219 CacheT: Cache + Clone + Send + Sync + 'static,
220 OnAck: Fn(
221 SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
222 Result<String, String>,
223 ) + Send
224 + Sync
225 + 'static,
226 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
227 EmissionResult:
228 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
229{
230 let options = LocalSqsServiceOptionsBuilder::default().build();
231 local_sqs_service_with_options(
232 queue_url,
233 dest_bucket,
234 deadline,
235 s3_init,
236 s3_client,
237 sqs_client,
238 event_decoder,
239 event_encoder,
240 event_handler,
241 cache,
242 on_ack,
243 on_emit,
244 options,
245 )
246 .await
247}