Skip to main content

shepherd_rs/processor/
simple_processor.rs

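//! # Simple Processor
//!
//! This module provides a simple implementation of the `Processor` trait.
//!
//! ## Overview
//! - **SimpleProcessor**: Processes transformation and consumption attempts.
//! - **Error Handling**: Defines custom error types for processing operations.
//!
//! ## Example
//! ```rust,ignore
//! // `new` is async, takes the shared config, the database and both attempt
//! // creators, and returns the processor together with its channel handles.
//! let (mut processor, handles) = SimpleProcessor::new(
//!     config,
//!     database,
//!     transform_attempt_creator,
//!     consume_attempt_creator,
//! )
//! .await;
//! processor.processor_loop().await;
//! ```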
14
15use std::fmt::Debug;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use thiserror::Error;
20use tokio::sync::{Mutex, mpsc};
21
22use crate::config::Config;
23use crate::consumer::consumer::ConsumeAttemptResult;
24use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator};
25use crate::database::Database;
26use crate::emitter::emitter::EmissionState;
27use crate::processor::Processor;
28use crate::processor::processor::ProcessorHandles;
29use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
30use crate::worker::worker_manager::WorkerManagerResult;
31
32pub struct SimpleProcessor<CFG: Config, TR, TA: TransformAttempt, TAC, CA: ConsumeAttempt, CAC, DB>
33{
34    emitter_output_recv: mpsc::Receiver<TR>,
35    _emitter_hints: mpsc::Sender<EmissionState>,
36    _emitter_state: EmissionState,
37
38    current_in_process_transform_attempts: u32,
39    worker_manager_input_sender: mpsc::Sender<TA>,
40    worker_manager_output_recv: mpsc::Receiver<WorkerManagerResult<TA>>,
41
42    consumer_input_sender: mpsc::Sender<CA>,
43    consumer_output_recv: mpsc::Receiver<ConsumeAttemptResult<CA>>,
44
45    kill_signal_receiver: mpsc::Receiver<()>,
46
47    database: DB,
48
49    transform_attempt_creator: TAC,
50    consume_attempt_creator: CAC,
51
52    _max_in_process_transform_attempts: u32,
53    _config: Arc<Mutex<CFG>>,
54}
55
56#[derive(Error, Debug)]
57pub enum ProcessorError {
58    #[error("max in-process transform attempts reached")]
59    MaxInProcessTransformAttemptsReached,
60
61    #[error("failed creating transform attempt: {0}")]
62    TransformAttemptCreationFailed(String),
63
64    #[error("failed sending to worker manager input channel")]
65    WorkerManagerInputSendFailed(String),
66
67    #[error("failed creating consumption attempt: {0}")]
68    ConsumeAttemptCreationFailed(String),
69
70    #[error("failed sending to consumer input channel")]
71    ConsumerInputSendFailed(String),
72
73    #[error("database error: {0}")]
74    DatabaseError(String),
75
76    #[error("unknown error occurred")]
77    Unknown,
78}
79
80#[async_trait]
81impl<CFG, TR, TA, TAC, CA, CAC, DB> Processor for SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
82where
83    CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
84    TR: TransformRequest,
85    TA: TransformAttempt<
86            TransformRequestIdentifier = TR::Identifier,
87            CallArgsType = TR::Input,
88            ReturnType = TR::Output,
89        >,
90    TAC: TransformAttemptCreator<
91            TransformRequest = TR,
92            TransformAttempt = TA,
93            Input = TR::Input,
94            Output = TR::Output,
95        >,
96    CA: ConsumeAttempt<
97            TransformRequestIdentifier = TR::Identifier,
98            TransformAttemptIdentifier = TA::Identifier,
99            ConsumeVal = TR::Output,
100        >,
101    CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
102    DB: Database<
103            TransformRequest = TR,
104            TransformAttempt = TA,
105            ConsumeAttempt = CA,
106            Input = TR::Input,
107            Output = TR::Output,
108        >,
109{
110    type Config = CFG;
111    type ConsumeAttempt = CA;
112    type ConsumeAttemptCreator = CAC;
113    type Database = DB;
114    type Input = TR::Input;
115    type Output = TR::Output;
116    type ProcessorError = ProcessorError;
117    type TransformAttempt = TA;
118    type TransformAttemptCreator = TAC;
119    type TransformRequest = TR;
120
121    async fn new(
122        init_config: Arc<Mutex<CFG>>,
123        database: Self::Database,
124        transform_attempt_creator: Self::TransformAttemptCreator,
125        consume_attempt_creator: Self::ConsumeAttemptCreator,
126    ) -> (Self, ProcessorHandles<TR, TA, CA>) {
127        let init_config_mutex_guard = init_config.lock().await;
128        let transform_request_channel_size = init_config_mutex_guard
129            .get("processor.transform_request_channel_size".to_string())
130            .await
131            .expect("Failed to get transform request channel size");
132
133        let size: toml::Value =
134            serde_json::from_slice(&transform_request_channel_size).expect("Failed to parse size");
135
136        let transform_request_channel_size =
137            size.as_integer().expect("Failed to parse channel size") as usize;
138
139        let transform_attempt_channel_size = init_config_mutex_guard
140            .get("processor.transform_attempt_channel_size".to_string())
141            .await
142            .expect("Failed to get transform attempt channel size");
143
144        let size: toml::Value = serde_json::from_slice(&transform_attempt_channel_size)
145            .expect("Failed to parse transform attempt channel size");
146
147        let transform_attempt_channel_size =
148            size.as_integer().expect("Failed to parse channel size") as usize;
149
150        let consume_attempt_channel_size = init_config_mutex_guard
151            .get("processor.consume_attempt_channel_size".to_string())
152            .await
153            .expect("Failed to get consume attempt channel size");
154
155        let size: toml::Value = serde_json::from_slice(&consume_attempt_channel_size)
156            .expect("failed to parse consume attempt channel size");
157
158        let consume_attempt_channel_size =
159            size.as_integer().expect("Failed to parse channel size") as usize;
160
161        let max_in_process_transform_attempts = init_config_mutex_guard
162            .get("processor.max_in_process_transform_attempts".to_string())
163            .await
164            .expect("Failed to get max_in_process_transform_attempts");
165
166        let size: toml::Value = serde_json::from_slice(&max_in_process_transform_attempts)
167            .expect("Failed to parse max in process transform attempts");
168
169        let max_in_process_transform_attempts =
170            size.as_integer().expect("Failed to parse channel size") as u32;
171
172        drop(init_config_mutex_guard);
173
174        let (emitter_output_sender, emitter_output_recv) =
175            mpsc::channel(transform_request_channel_size);
176        let (emitter_hints, emitter_hints_recv) = mpsc::channel(1);
177
178        let (worker_manager_input_sender, worker_manager_input_recv) =
179            mpsc::channel(transform_attempt_channel_size);
180        let (worker_manager_output_sender, worker_manager_output_recv) =
181            mpsc::channel(transform_attempt_channel_size);
182
183        let (consumer_input_sender, consumer_input_recv) =
184            mpsc::channel(consume_attempt_channel_size);
185        let (consumer_output_sender, consumer_output_recv) =
186            mpsc::channel(consume_attempt_channel_size);
187        let (kill_signal_sender, kill_signal_receiver) = mpsc::channel(1);
188
189        (
190            Self {
191                emitter_output_recv,
192                _emitter_hints: emitter_hints,
193                _emitter_state: EmissionState::Operational,
194                current_in_process_transform_attempts: 0,
195                worker_manager_input_sender,
196                worker_manager_output_recv,
197                consumer_input_sender,
198                consumer_output_recv,
199                kill_signal_receiver,
200                database,
201                transform_attempt_creator,
202                consume_attempt_creator,
203                _max_in_process_transform_attempts: max_in_process_transform_attempts,
204                _config: init_config.clone(),
205            },
206            ProcessorHandles {
207                emitter_output_sender,
208                emitter_hints_recv,
209                worker_manager_input_recv,
210                worker_manager_output_sender,
211                consumer_input_recv,
212                consumer_output_sender,
213                kill_signal_sender,
214            },
215        )
216    }
217
218    async fn processor_loop(&mut self) {
219        loop {
220            let _res = tokio::select! {
221                Some(transform_request) = self.emitter_output_recv.recv() => {
222                    self.process_emitter_output(transform_request).await
223                },
224                Some(worker_output) = self.worker_manager_output_recv.recv() => {
225                    self.process_worker_output(worker_output).await
226                },
227                Some(consume_output) = self.consumer_output_recv.recv() => {
228                    self.process_consumer_output(consume_output).await
229                },
230                Some(_) = self.kill_signal_receiver.recv() => {
231                    // Handle the kill signal to stop processing
232                    return;
233                },
234            };
235        }
236    }
237}
238
239impl<CFG, TR, TA, TAC, CA, CAC, DB> SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
240where
241    CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
242    TR: TransformRequest,
243    TA: TransformAttempt<
244            TransformRequestIdentifier = TR::Identifier,
245            CallArgsType = TR::Input,
246            ReturnType = TR::Output,
247        >,
248    TAC: TransformAttemptCreator<
249            TransformRequest = TR,
250            TransformAttempt = TA,
251            Input = TR::Input,
252            Output = TR::Output,
253        >,
254    CA: ConsumeAttempt<
255            TransformRequestIdentifier = TR::Identifier,
256            TransformAttemptIdentifier = TA::Identifier,
257            ConsumeVal = TR::Output,
258        >,
259    CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
260    DB: Database<
261            TransformRequest = TR,
262            TransformAttempt = TA,
263            ConsumeAttempt = CA,
264            Input = TR::Input,
265            Output = TR::Output,
266        >,
267{
268    async fn process_emitter_output(
269        &mut self,
270        transform_request: TR,
271    ) -> Result<(), <Self as Processor>::ProcessorError> {
272        self.database
273            .register_transform_request(&transform_request)
274            .await
275            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
276
277        // // Check if the emitter is operational
278        // // If the emitter is operational, but the current in-process attempts
279        // // have reached the maximum allowed, we should halt the emitter.
280        // // If the emitter is already halted, we should not process any more
281        // // requests
282        // if self.current_in_process_transform_attempts >=
283        // self.max_in_process_transform_attempts {     if self.emitter_state !=
284        // EmissionState::Halt {         self.emitter_hints.
285        // send(EmissionState::Halt).await.unwrap();         self.emitter_state
286        // = EmissionState::Halt;     }
287        //     // TODO: Send to data layer regardless, add to backlog or similar
288        //     return Err(ProcessorError::MaxInProcessTransformAttemptsReached);
289        // }
290
291        // In case we still have capacity for transform attempts,
292        // create an attempt and send it to the worker
293        let new_attempt = self
294            .transform_attempt_creator
295            .create_new_attempt(&transform_request)
296            .await
297            .map_err(|e| ProcessorError::TransformAttemptCreationFailed(e.to_string()))?;
298
299        self.current_in_process_transform_attempts =
300            self.current_in_process_transform_attempts.saturating_add(1);
301
302        self.database
303            .register_transform_attempt(&new_attempt)
304            .await
305            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
306
307        self.worker_manager_input_sender
308            .send(new_attempt)
309            .await
310            .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))?;
311
312        Ok(())
313    }
314
315    async fn process_worker_output(
316        &mut self,
317        worker_output: WorkerManagerResult<TA>,
318    ) -> Result<(), <Self as Processor>::ProcessorError> {
319        self.database
320            .update_transform_attempt(&worker_output)
321            .await
322            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
323
324        match worker_output {
325            WorkerManagerResult::Success(transform_attempt_identifier, return_package) => {
326                let consume_attempt = self
327                    .consume_attempt_creator
328                    .create_new_attempt(&TA::from_return_package(
329                        transform_attempt_identifier.clone(),
330                        return_package,
331                    ))
332                    .await;
333
334                if let Err(e) = consume_attempt {
335                    log::error!(
336                        "Failed to create consume attempt for transform attempt {:?}: {}",
337                        transform_attempt_identifier,
338                        e
339                    );
340                    // Gracefully handle the error, archive since we can't do anything else
341                    self.database
342                        .archive_request_with_id(&transform_attempt_identifier.into())
343                        .await
344                        .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
345                    return Ok(());
346                }
347
348                let consume_attempt = consume_attempt.unwrap();
349
350                self.database
351                    .register_consume_attempt(&consume_attempt)
352                    .await
353                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
354
355                self.consumer_input_sender
356                    .send(consume_attempt)
357                    .await
358                    .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
359            }
360            WorkerManagerResult::Failure(transform_attempt_identifier, return_package) => {
361                let reattempt = self
362                    .transform_attempt_creator
363                    .create_new_reattempt(transform_attempt_identifier.clone(), return_package)
364                    .await;
365
366                if let Err(e) = reattempt {
367                    log::error!(
368                        "Failed to create reattempt for transform attempt {:?}: {}",
369                        transform_attempt_identifier,
370                        e
371                    );
372                    // Gracefully handle the error, archive since we can't do anything else
373
374                    self.database
375                        .archive_request_with_id(&transform_attempt_identifier.into())
376                        .await
377                        .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
378
379                    return Ok(());
380                }
381
382                let reattempt = reattempt.unwrap();
383
384                self.database
385                    .register_transform_attempt(&reattempt)
386                    .await
387                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
388
389                self.worker_manager_input_sender
390                    .send(reattempt)
391                    .await
392                    .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))?;
393            }
394        }
395        Ok(())
396    }
397
398    async fn process_consumer_output(
399        &mut self,
400        consume_output: ConsumeAttemptResult<CA>,
401    ) -> Result<(), <Self as Processor>::ProcessorError> {
402        self.database
403            .update_consume_attempt(consume_output.clone())
404            .await
405            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
406
407        match consume_output {
408            ConsumeAttemptResult::Success(consume_id, _return_ctx) => {
409                // just archive the request
410                self.database
411                    .archive_request_with_id(&consume_id.into())
412                    .await
413                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
414            }
415            ConsumeAttemptResult::Failure(consume_id, return_ctx) => {
416                let reattempt = self
417                    .consume_attempt_creator
418                    .create_new_reattempt(consume_id.clone().into(), return_ctx)
419                    .await;
420
421                if let Err(e) = reattempt {
422                    log::error!(
423                        "Failed to create reattempt for consume attempt {:?}: {}",
424                        consume_id,
425                        e
426                    );
427                    // Gracefully handle the error, archive since we can't do anything else
428                    self.database
429                        .archive_request_with_id(&consume_id.into())
430                        .await
431                        .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
432
433                    return Ok(());
434                }
435
436                let reattempt = reattempt.unwrap();
437
438                self.database
439                    .register_consume_attempt(&reattempt)
440                    .await
441                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
442
443                self.consumer_input_sender
444                    .send(reattempt)
445                    .await
446                    .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
447            }
448        }
449        Ok(())
450    }
451}