1use std::time::{Duration, SystemTime, UNIX_EPOCH};
2
3use log::info;
4use rusoto_s3::S3;
5use rusoto_sqs::{SendMessageRequest, Sqs, SqsClient};
6
7use crate::cache::Cache;
8use crate::completion_event_serializer::CompletionEventSerializer;
9use crate::event_decoder::PayloadDecoder;
10
11use crate::event_handler::EventHandler;
12use crate::event_processor::{EventProcessor, EventProcessorActor};
13use crate::event_retriever::S3PayloadRetriever;
14use crate::s3_event_emitter::S3EventEmitter;
15use crate::sqs_completion_handler::{
16 CompletionPolicy, SqsCompletionHandler, SqsCompletionHandlerActor,
17};
18use crate::sqs_consumer::{ConsumePolicy, IntoDeadline, SqsConsumer, SqsConsumerActor};
19
20use std::error::Error;
21use std::future::Future;
22
23fn time_based_key_fn(_event: &[u8]) -> String {
24 let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
25 Ok(n) => n.as_millis(),
26 Err(_) => panic!("SystemTime before UNIX EPOCH!"),
27 };
28
29 let cur_day = cur_ms - (cur_ms % 86400);
30
31 format!("{}/{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
32}
33
34pub async fn sqs_service<
35 S3T,
36 SInit,
37 SqsT,
38 EventT,
39 CompletedEventT,
40 EventDecoderT,
41 EventEncoderT,
42 EventHandlerT,
43 CacheT,
44 OnAck,
45 EmissionResult,
46 OnEmission,
47>(
48 queue_url: impl Into<String>,
49 initial_messages: impl IntoIterator<Item = rusoto_sqs::Message>,
50 dest_bucket: impl Into<String>,
51 consume_policy: ConsumePolicy,
52 completion_policy: CompletionPolicy,
53 s3_init: SInit,
54 s3_client: S3T,
55 sqs_client: SqsT,
56 event_decoder: EventDecoderT,
57 event_encoder: EventEncoderT,
58 event_handler: EventHandlerT,
59 cache: CacheT,
60 on_ack: OnAck,
61 on_emit: OnEmission,
62) -> Result<(), Box<dyn std::error::Error>>
63where
64 S3T: S3 + Clone + Send + Sync + 'static,
65 SInit: (Fn(String) -> S3T) + Clone + Send + Sync + 'static,
66 SqsT: Sqs + Clone + Send + Sync + 'static,
67 CompletedEventT: Clone + Send + Sync + 'static,
68 EventT: Clone + Send + Sync + 'static,
69 EventDecoderT: PayloadDecoder<EventT> + Clone + Send + Sync + 'static,
70 EventEncoderT: CompletionEventSerializer<
71 CompletedEvent = CompletedEventT,
72 Output = Vec<u8>,
73 Error = <EventHandlerT as EventHandler>::Error,
74 > + Clone
75 + Send
76 + Sync
77 + 'static,
78 EventHandlerT: EventHandler<
79 InputEvent = EventT,
80 OutputEvent = CompletedEventT,
81 Error = crate::error::Error,
82 > + Clone
83 + Send
84 + Sync
85 + 'static,
86 CacheT: Cache + Clone + Send + Sync + 'static,
87 OnAck: Fn(
88 SqsCompletionHandlerActor<CompletedEventT, <EventHandlerT as EventHandler>::Error, SqsT>,
89 Result<String, String>,
90 ) + Send
91 + Sync
92 + 'static,
93 OnEmission: Fn(String, String) -> EmissionResult + Send + Sync + 'static,
94 EmissionResult:
95 Future<Output = Result<(), Box<dyn Error + Send + Sync + 'static>>> + Send + 'static,
96{
97 info!("sqs service init");
98 let queue_url = queue_url.into();
99 let dest_bucket = dest_bucket.into();
100
101 let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
102
103 let (sqs_completion_handler, _sqs_completion_handle) =
104 SqsCompletionHandlerActor::new(SqsCompletionHandler::new(
105 sqs_client.clone(),
106 queue_url.clone(),
107 event_encoder,
108 S3EventEmitter::new(
109 s3_client.clone(),
110 dest_bucket.clone(),
111 time_based_key_fn,
112 on_emit,
113 ),
114 completion_policy,
115 on_ack,
116 cache,
117 ));
118
119 let (sqs_consumer, sqs_consumer_handle) = SqsConsumerActor::new(SqsConsumer::new(
120 sqs_client.clone(),
121 queue_url.clone(),
122 consume_policy,
123 sqs_completion_handler.clone(),
124 tx,
125 ))
126 .await;
127
128 let event_processors: Vec<_> = (0..40)
129 .into_iter()
130 .map(|_| {
131 EventProcessorActor::new(EventProcessor::new(
132 sqs_consumer.clone(),
133 sqs_completion_handler.clone(),
134 event_handler.clone(),
135 S3PayloadRetriever::new(s3_init.clone(), event_decoder.clone()),
136 ))
137 })
138 .collect();
139
140 futures::future::join_all(event_processors.iter().map(|ep| ep.0.start_processing())).await;
141
142 let mut proc_iter = event_processors.iter().cycle();
143 for message in initial_messages.into_iter() {
144 let next_proc = proc_iter.next().unwrap();
145 next_proc.0.process_event(message).await;
146 }
147
148 drop(event_processors);
149 drop(sqs_consumer);
150 drop(sqs_completion_handler);
151
152 sqs_consumer_handle.await;
153 shutdown_notify.await;
154 Ok(())
155}