1use std::fmt::Debug;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use thiserror::Error;
20use tokio::sync::{Mutex, mpsc};
21
22use crate::config::Config;
23use crate::consumer::consumer::ConsumeAttemptResult;
24use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator};
25use crate::database::Database;
26use crate::emitter::emitter::EmissionState;
27use crate::processor::Processor;
28use crate::processor::processor::ProcessorHandles;
29use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
30use crate::worker::worker_manager::WorkerManagerResult;
31
32pub struct SimpleProcessor<CFG: Config, TR, TA: TransformAttempt, TAC, CA: ConsumeAttempt, CAC, DB>
33{
34 emitter_output_recv: mpsc::Receiver<TR>,
35 _emitter_hints: mpsc::Sender<EmissionState>,
36 _emitter_state: EmissionState,
37
38 current_in_process_transform_attempts: u32,
39 worker_manager_input_sender: mpsc::Sender<TA>,
40 worker_manager_output_recv: mpsc::Receiver<WorkerManagerResult<TA>>,
41
42 consumer_input_sender: mpsc::Sender<CA>,
43 consumer_output_recv: mpsc::Receiver<ConsumeAttemptResult<CA>>,
44
45 kill_signal_receiver: mpsc::Receiver<()>,
46
47 database: DB,
48
49 transform_attempt_creator: TAC,
50 consume_attempt_creator: CAC,
51
52 _max_in_process_transform_attempts: u32,
53 _config: Arc<Mutex<CFG>>,
54}
55
56#[derive(Error, Debug)]
57pub enum ProcessorError {
58 #[error("max in-process transform attempts reached")]
59 MaxInProcessTransformAttemptsReached,
60
61 #[error("failed creating transform attempt: {0}")]
62 TransformAttemptCreationFailed(String),
63
64 #[error("failed sending to worker manager input channel")]
65 WorkerManagerInputSendFailed(String),
66
67 #[error("failed creating consumption attempt: {0}")]
68 ConsumeAttemptCreationFailed(String),
69
70 #[error("failed sending to consumer input channel")]
71 ConsumerInputSendFailed(String),
72
73 #[error("database error: {0}")]
74 DatabaseError(String),
75
76 #[error("unknown error occurred")]
77 Unknown,
78}
79
80#[async_trait]
81impl<CFG, TR, TA, TAC, CA, CAC, DB> Processor for SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
82where
83 CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
84 TR: TransformRequest,
85 TA: TransformAttempt<
86 TransformRequestIdentifier = TR::Identifier,
87 CallArgsType = TR::Input,
88 ReturnType = TR::Output,
89 >,
90 TAC: TransformAttemptCreator<
91 TransformRequest = TR,
92 TransformAttempt = TA,
93 Input = TR::Input,
94 Output = TR::Output,
95 >,
96 CA: ConsumeAttempt<
97 TransformRequestIdentifier = TR::Identifier,
98 TransformAttemptIdentifier = TA::Identifier,
99 ConsumeVal = TR::Output,
100 >,
101 CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
102 DB: Database<
103 TransformRequest = TR,
104 TransformAttempt = TA,
105 ConsumeAttempt = CA,
106 Input = TR::Input,
107 Output = TR::Output,
108 >,
109{
110 type Config = CFG;
111 type ConsumeAttempt = CA;
112 type ConsumeAttemptCreator = CAC;
113 type Database = DB;
114 type Input = TR::Input;
115 type Output = TR::Output;
116 type ProcessorError = ProcessorError;
117 type TransformAttempt = TA;
118 type TransformAttemptCreator = TAC;
119 type TransformRequest = TR;
120
121 async fn new(
122 init_config: Arc<Mutex<CFG>>,
123 database: Self::Database,
124 transform_attempt_creator: Self::TransformAttemptCreator,
125 consume_attempt_creator: Self::ConsumeAttemptCreator,
126 ) -> (Self, ProcessorHandles<TR, TA, CA>) {
127 let init_config_mutex_guard = init_config.lock().await;
128 let transform_request_channel_size = init_config_mutex_guard
129 .get("processor.transform_request_channel_size".to_string())
130 .await
131 .expect("Failed to get transform request channel size");
132
133 let size: toml::Value =
134 serde_json::from_slice(&transform_request_channel_size).expect("Failed to parse size");
135
136 let transform_request_channel_size =
137 size.as_integer().expect("Failed to parse channel size") as usize;
138
139 let transform_attempt_channel_size = init_config_mutex_guard
140 .get("processor.transform_attempt_channel_size".to_string())
141 .await
142 .expect("Failed to get transform attempt channel size");
143
144 let size: toml::Value = serde_json::from_slice(&transform_attempt_channel_size)
145 .expect("Failed to parse transform attempt channel size");
146
147 let transform_attempt_channel_size =
148 size.as_integer().expect("Failed to parse channel size") as usize;
149
150 let consume_attempt_channel_size = init_config_mutex_guard
151 .get("processor.consume_attempt_channel_size".to_string())
152 .await
153 .expect("Failed to get consume attempt channel size");
154
155 let size: toml::Value = serde_json::from_slice(&consume_attempt_channel_size)
156 .expect("failed to parse consume attempt channel size");
157
158 let consume_attempt_channel_size =
159 size.as_integer().expect("Failed to parse channel size") as usize;
160
161 let max_in_process_transform_attempts = init_config_mutex_guard
162 .get("processor.max_in_process_transform_attempts".to_string())
163 .await
164 .expect("Failed to get max_in_process_transform_attempts");
165
166 let size: toml::Value = serde_json::from_slice(&max_in_process_transform_attempts)
167 .expect("Failed to parse max in process transform attempts");
168
169 let max_in_process_transform_attempts =
170 size.as_integer().expect("Failed to parse channel size") as u32;
171
172 drop(init_config_mutex_guard);
173
174 let (emitter_output_sender, emitter_output_recv) =
175 mpsc::channel(transform_request_channel_size);
176 let (emitter_hints, emitter_hints_recv) = mpsc::channel(1);
177
178 let (worker_manager_input_sender, worker_manager_input_recv) =
179 mpsc::channel(transform_attempt_channel_size);
180 let (worker_manager_output_sender, worker_manager_output_recv) =
181 mpsc::channel(transform_attempt_channel_size);
182
183 let (consumer_input_sender, consumer_input_recv) =
184 mpsc::channel(consume_attempt_channel_size);
185 let (consumer_output_sender, consumer_output_recv) =
186 mpsc::channel(consume_attempt_channel_size);
187 let (kill_signal_sender, kill_signal_receiver) = mpsc::channel(1);
188
189 (
190 Self {
191 emitter_output_recv,
192 _emitter_hints: emitter_hints,
193 _emitter_state: EmissionState::Operational,
194 current_in_process_transform_attempts: 0,
195 worker_manager_input_sender,
196 worker_manager_output_recv,
197 consumer_input_sender,
198 consumer_output_recv,
199 kill_signal_receiver,
200 database,
201 transform_attempt_creator,
202 consume_attempt_creator,
203 _max_in_process_transform_attempts: max_in_process_transform_attempts,
204 _config: init_config.clone(),
205 },
206 ProcessorHandles {
207 emitter_output_sender,
208 emitter_hints_recv,
209 worker_manager_input_recv,
210 worker_manager_output_sender,
211 consumer_input_recv,
212 consumer_output_sender,
213 kill_signal_sender,
214 },
215 )
216 }
217
218 async fn processor_loop(&mut self) {
219 loop {
220 let _res = tokio::select! {
221 Some(transform_request) = self.emitter_output_recv.recv() => {
222 self.process_emitter_output(transform_request).await
223 },
224 Some(worker_output) = self.worker_manager_output_recv.recv() => {
225 self.process_worker_output(worker_output).await
226 },
227 Some(consume_output) = self.consumer_output_recv.recv() => {
228 self.process_consumer_output(consume_output).await
229 },
230 Some(_) = self.kill_signal_receiver.recv() => {
231 return;
233 },
234 };
235 }
236 }
237}
238
239impl<CFG, TR, TA, TAC, CA, CAC, DB> SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
240where
241 CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
242 TR: TransformRequest,
243 TA: TransformAttempt<
244 TransformRequestIdentifier = TR::Identifier,
245 CallArgsType = TR::Input,
246 ReturnType = TR::Output,
247 >,
248 TAC: TransformAttemptCreator<
249 TransformRequest = TR,
250 TransformAttempt = TA,
251 Input = TR::Input,
252 Output = TR::Output,
253 >,
254 CA: ConsumeAttempt<
255 TransformRequestIdentifier = TR::Identifier,
256 TransformAttemptIdentifier = TA::Identifier,
257 ConsumeVal = TR::Output,
258 >,
259 CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
260 DB: Database<
261 TransformRequest = TR,
262 TransformAttempt = TA,
263 ConsumeAttempt = CA,
264 Input = TR::Input,
265 Output = TR::Output,
266 >,
267{
268 async fn process_emitter_output(
269 &mut self,
270 transform_request: TR,
271 ) -> Result<(), <Self as Processor>::ProcessorError> {
272 self.database
273 .register_transform_request(&transform_request)
274 .await
275 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
276
277 let new_attempt = self
294 .transform_attempt_creator
295 .create_new_attempt(&transform_request)
296 .await
297 .map_err(|e| ProcessorError::TransformAttemptCreationFailed(e.to_string()))?;
298
299 self.current_in_process_transform_attempts =
300 self.current_in_process_transform_attempts.saturating_add(1);
301
302 self.database
303 .register_transform_attempt(&new_attempt)
304 .await
305 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
306
307 self.worker_manager_input_sender
308 .send(new_attempt)
309 .await
310 .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))?;
311
312 Ok(())
313 }
314
315 async fn process_worker_output(
316 &mut self,
317 worker_output: WorkerManagerResult<TA>,
318 ) -> Result<(), <Self as Processor>::ProcessorError> {
319 self.database
320 .update_transform_attempt(&worker_output)
321 .await
322 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
323
324 match worker_output {
325 WorkerManagerResult::Success(transform_attempt_identifier, return_package) => {
326 let consume_attempt = self
327 .consume_attempt_creator
328 .create_new_attempt(&TA::from_return_package(
329 transform_attempt_identifier.clone(),
330 return_package,
331 ))
332 .await;
333
334 if let Err(e) = consume_attempt {
335 log::error!(
336 "Failed to create consume attempt for transform attempt {:?}: {}",
337 transform_attempt_identifier,
338 e
339 );
340 self.database
342 .archive_request_with_id(&transform_attempt_identifier.into())
343 .await
344 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
345 return Ok(());
346 }
347
348 let consume_attempt = consume_attempt.unwrap();
349
350 self.database
351 .register_consume_attempt(&consume_attempt)
352 .await
353 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
354
355 self.consumer_input_sender
356 .send(consume_attempt)
357 .await
358 .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
359 }
360 WorkerManagerResult::Failure(transform_attempt_identifier, return_package) => {
361 let reattempt = self
362 .transform_attempt_creator
363 .create_new_reattempt(transform_attempt_identifier.clone(), return_package)
364 .await;
365
366 if let Err(e) = reattempt {
367 log::error!(
368 "Failed to create reattempt for transform attempt {:?}: {}",
369 transform_attempt_identifier,
370 e
371 );
372 self.database
375 .archive_request_with_id(&transform_attempt_identifier.into())
376 .await
377 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
378
379 return Ok(());
380 }
381
382 let reattempt = reattempt.unwrap();
383
384 self.database
385 .register_transform_attempt(&reattempt)
386 .await
387 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
388
389 self.worker_manager_input_sender
390 .send(reattempt)
391 .await
392 .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))?;
393 }
394 }
395 Ok(())
396 }
397
398 async fn process_consumer_output(
399 &mut self,
400 consume_output: ConsumeAttemptResult<CA>,
401 ) -> Result<(), <Self as Processor>::ProcessorError> {
402 self.database
403 .update_consume_attempt(consume_output.clone())
404 .await
405 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
406
407 match consume_output {
408 ConsumeAttemptResult::Success(consume_id, _return_ctx) => {
409 self.database
411 .archive_request_with_id(&consume_id.into())
412 .await
413 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
414 }
415 ConsumeAttemptResult::Failure(consume_id, return_ctx) => {
416 let reattempt = self
417 .consume_attempt_creator
418 .create_new_reattempt(consume_id.clone().into(), return_ctx)
419 .await;
420
421 if let Err(e) = reattempt {
422 log::error!(
423 "Failed to create reattempt for consume attempt {:?}: {}",
424 consume_id,
425 e
426 );
427 self.database
429 .archive_request_with_id(&consume_id.into())
430 .await
431 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
432
433 return Ok(());
434 }
435
436 let reattempt = reattempt.unwrap();
437
438 self.database
439 .register_consume_attempt(&reattempt)
440 .await
441 .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
442
443 self.consumer_input_sender
444 .send(reattempt)
445 .await
446 .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
447 }
448 }
449 Ok(())
450 }
451}