use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::{Mutex, mpsc};
use crate::config::Config;
use crate::consumer::consumer::ConsumeAttemptResult;
use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator};
use crate::database::Database;
use crate::emitter::emitter::EmissionState;
use crate::processor::Processor;
use crate::processor::processor::ProcessorHandles;
use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
use crate::worker::worker_manager::WorkerManagerResult;
/// Channel-driven processor that sits between an emitter, a worker manager and a consumer.
///
/// The struct owns one end of every channel it uses; the opposite ends are returned to the
/// caller from `new` inside `ProcessorHandles`.
pub struct SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
where
    CFG: Config,
    TA: TransformAttempt,
    CA: ConsumeAttempt,
{
    /// Incoming transform requests; each one is handled by `process_emitter_output`.
    emitter_output_recv: mpsc::Receiver<TR>,
    /// Sender half of the emission-state channel. Created in `new` but not used by the
    /// methods visible in this file.
    _emitter_hints: mpsc::Sender<EmissionState>,
    /// Emission state, initialised to `EmissionState::Operational` and not read afterwards
    /// by the methods visible in this file.
    _emitter_state: EmissionState,
    /// Counter bumped (saturating) every time a new attempt is created for an emitted request.
    current_in_process_transform_attempts: u32,
    /// Outgoing transform attempts (new attempts and reattempts).
    worker_manager_input_sender: mpsc::Sender<TA>,
    /// Incoming transform results; each one is handled by `process_worker_output`.
    worker_manager_output_recv: mpsc::Receiver<WorkerManagerResult<TA>>,
    /// Outgoing consume attempts (new attempts and reattempts).
    consumer_input_sender: mpsc::Sender<CA>,
    /// Incoming consume results; each one is handled by `process_consumer_output`.
    consumer_output_recv: mpsc::Receiver<ConsumeAttemptResult<CA>>,
    /// A message on this channel makes `processor_loop` return.
    kill_signal_receiver: mpsc::Receiver<()>,
    /// Store in which requests and attempts are registered, updated and archived.
    database: DB,
    /// Factory for transform attempts and transform reattempts.
    transform_attempt_creator: TAC,
    /// Factory for consume attempts and consume reattempts.
    consume_attempt_creator: CAC,
    /// Value of `processor.max_in_process_transform_attempts` read in `new`.
    /// NOTE(review): stored but never compared against the counter above - confirm whether
    /// enforcement is still planned.
    _max_in_process_transform_attempts: u32,
    /// Shared configuration handle the processor was built from.
    _config: Arc<Mutex<CFG>>,
}
#[derive(Error, Debug)]
pub enum ProcessorError {
#[error("max in-process transform attempts reached")]
MaxInProcessTransformAttemptsReached,
#[error("failed creating transform attempt: {0}")]
TransformAttemptCreationFailed(String),
#[error("failed sending to worker manager input channel")]
WorkerManagerInputSendFailed(String),
#[error("failed creating consumption attempt: {0}")]
ConsumeAttemptCreationFailed(String),
#[error("failed sending to consumer input channel")]
ConsumerInputSendFailed(String),
#[error("database error: {0}")]
DatabaseError(String),
#[error("unknown error occurred")]
Unknown,
}
#[async_trait]
impl<CFG, TR, TA, TAC, CA, CAC, DB> Processor for SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
where
CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
TR: TransformRequest,
TA: TransformAttempt<
TransformRequestIdentifier = TR::Identifier,
CallArgsType = TR::Input,
ReturnType = TR::Output,
>,
TAC: TransformAttemptCreator<
TransformRequest = TR,
TransformAttempt = TA,
Input = TR::Input,
Output = TR::Output,
>,
CA: ConsumeAttempt<
TransformRequestIdentifier = TR::Identifier,
TransformAttemptIdentifier = TA::Identifier,
ConsumeVal = TR::Output,
>,
CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
DB: Database<
TransformRequest = TR,
TransformAttempt = TA,
ConsumeAttempt = CA,
Input = TR::Input,
Output = TR::Output,
>,
{
type Config = CFG;
type ConsumeAttempt = CA;
type ConsumeAttemptCreator = CAC;
type Database = DB;
type Input = TR::Input;
type Output = TR::Output;
type ProcessorError = ProcessorError;
type TransformAttempt = TA;
type TransformAttemptCreator = TAC;
type TransformRequest = TR;
async fn new(
init_config: Arc<Mutex<CFG>>,
database: Self::Database,
transform_attempt_creator: Self::TransformAttemptCreator,
consume_attempt_creator: Self::ConsumeAttemptCreator,
) -> (Self, ProcessorHandles<TR, TA, CA>) {
let init_config_mutex_guard = init_config.lock().await;
let transform_request_channel_size = init_config_mutex_guard
.get("processor.transform_request_channel_size".to_string())
.await
.expect("Failed to get transform request channel size");
let size: toml::Value =
serde_json::from_slice(&transform_request_channel_size).expect("Failed to parse size");
let transform_request_channel_size =
size.as_integer().expect("Failed to parse channel size") as usize;
let transform_attempt_channel_size = init_config_mutex_guard
.get("processor.transform_attempt_channel_size".to_string())
.await
.expect("Failed to get transform attempt channel size");
let size: toml::Value = serde_json::from_slice(&transform_attempt_channel_size)
.expect("Failed to parse transform attempt channel size");
let transform_attempt_channel_size =
size.as_integer().expect("Failed to parse channel size") as usize;
let consume_attempt_channel_size = init_config_mutex_guard
.get("processor.consume_attempt_channel_size".to_string())
.await
.expect("Failed to get consume attempt channel size");
let size: toml::Value = serde_json::from_slice(&consume_attempt_channel_size)
.expect("failed to parse consume attempt channel size");
let consume_attempt_channel_size =
size.as_integer().expect("Failed to parse channel size") as usize;
let max_in_process_transform_attempts = init_config_mutex_guard
.get("processor.max_in_process_transform_attempts".to_string())
.await
.expect("Failed to get max_in_process_transform_attempts");
let size: toml::Value = serde_json::from_slice(&max_in_process_transform_attempts)
.expect("Failed to parse max in process transform attempts");
let max_in_process_transform_attempts =
size.as_integer().expect("Failed to parse channel size") as u32;
drop(init_config_mutex_guard);
let (emitter_output_sender, emitter_output_recv) =
mpsc::channel(transform_request_channel_size);
let (emitter_hints, emitter_hints_recv) = mpsc::channel(1);
let (worker_manager_input_sender, worker_manager_input_recv) =
mpsc::channel(transform_attempt_channel_size);
let (worker_manager_output_sender, worker_manager_output_recv) =
mpsc::channel(transform_attempt_channel_size);
let (consumer_input_sender, consumer_input_recv) =
mpsc::channel(consume_attempt_channel_size);
let (consumer_output_sender, consumer_output_recv) =
mpsc::channel(consume_attempt_channel_size);
let (kill_signal_sender, kill_signal_receiver) = mpsc::channel(1);
(
Self {
emitter_output_recv,
_emitter_hints: emitter_hints,
_emitter_state: EmissionState::Operational,
current_in_process_transform_attempts: 0,
worker_manager_input_sender,
worker_manager_output_recv,
consumer_input_sender,
consumer_output_recv,
kill_signal_receiver,
database,
transform_attempt_creator,
consume_attempt_creator,
_max_in_process_transform_attempts: max_in_process_transform_attempts,
_config: init_config.clone(),
},
ProcessorHandles {
emitter_output_sender,
emitter_hints_recv,
worker_manager_input_recv,
worker_manager_output_sender,
consumer_input_recv,
consumer_output_sender,
kill_signal_sender,
},
)
}
async fn processor_loop(&mut self) {
loop {
let _res = tokio::select! {
Some(transform_request) = self.emitter_output_recv.recv() => {
self.process_emitter_output(transform_request).await
},
Some(worker_output) = self.worker_manager_output_recv.recv() => {
self.process_worker_output(worker_output).await
},
Some(consume_output) = self.consumer_output_recv.recv() => {
self.process_consumer_output(consume_output).await
},
Some(_) = self.kill_signal_receiver.recv() => {
return;
},
};
}
}
}
impl<CFG, TR, TA, TAC, CA, CAC, DB> SimpleProcessor<CFG, TR, TA, TAC, CA, CAC, DB>
where
    CFG: Config<KeyType = String, ValueType = Vec<u8>> + Send + Sync + 'static,
    TR: TransformRequest,
    TA: TransformAttempt<
        TransformRequestIdentifier = TR::Identifier,
        CallArgsType = TR::Input,
        ReturnType = TR::Output,
    >,
    TAC: TransformAttemptCreator<
        TransformRequest = TR,
        TransformAttempt = TA,
        Input = TR::Input,
        Output = TR::Output,
    >,
    CA: ConsumeAttempt<
        TransformRequestIdentifier = TR::Identifier,
        TransformAttemptIdentifier = TA::Identifier,
        ConsumeVal = TR::Output,
    >,
    CAC: ConsumeAttemptCreator<TransformAttempt = TA, ConsumeAttempt = CA, Output = TR::Output>,
    DB: Database<
        TransformRequest = TR,
        TransformAttempt = TA,
        ConsumeAttempt = CA,
        Input = TR::Input,
        Output = TR::Output,
    >,
{
    /// Handles a transform request received from the emitter.
    ///
    /// Steps, in order: register the request, create a first attempt for it, bump the
    /// in-process counter, register the attempt, and send the attempt to the worker manager.
    ///
    /// # Errors
    ///
    /// Returns the first failure among the database calls, the attempt creation and the
    /// channel send; later steps are skipped.
    async fn process_emitter_output(
        &mut self,
        transform_request: TR,
    ) -> Result<(), <Self as Processor>::ProcessorError> {
        self.database
            .register_transform_request(&transform_request)
            .await
            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;

        let first_attempt = self
            .transform_attempt_creator
            .create_new_attempt(&transform_request)
            .await
            .map_err(|e| ProcessorError::TransformAttemptCreationFailed(e.to_string()))?;

        // NOTE(review): this counter is only ever incremented in this file - confirm where
        // (or whether) it is supposed to be decremented.
        let in_process = self.current_in_process_transform_attempts;
        self.current_in_process_transform_attempts = in_process.saturating_add(1);

        self.database
            .register_transform_attempt(&first_attempt)
            .await
            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;

        self.worker_manager_input_sender
            .send(first_attempt)
            .await
            .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))
    }

    /// Handles a transform result received from the worker manager.
    ///
    /// The result is first written to the database. On success a consume attempt is created,
    /// registered and sent to the consumer. On failure a transform reattempt is created,
    /// registered and sent back to the worker manager. If the follow-up attempt cannot be
    /// created, the failure is logged, the request is archived and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when a database call or a channel send fails.
    async fn process_worker_output(
        &mut self,
        worker_output: WorkerManagerResult<TA>,
    ) -> Result<(), <Self as Processor>::ProcessorError> {
        self.database
            .update_transform_attempt(&worker_output)
            .await
            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;

        match worker_output {
            WorkerManagerResult::Success(attempt_id, return_package) => {
                let creation = self
                    .consume_attempt_creator
                    .create_new_attempt(&TA::from_return_package(
                        attempt_id.clone(),
                        return_package,
                    ))
                    .await;
                let consume_attempt = match creation {
                    Ok(attempt) => attempt,
                    Err(creation_error) => {
                        log::error!(
                            "Failed to create consume attempt for transform attempt {:?}: {}",
                            attempt_id,
                            creation_error
                        );
                        // Nothing more can be done for this request, so retire it.
                        self.database
                            .archive_request_with_id(&attempt_id.into())
                            .await
                            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                        return Ok(());
                    }
                };

                self.database
                    .register_consume_attempt(&consume_attempt)
                    .await
                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                self.consumer_input_sender
                    .send(consume_attempt)
                    .await
                    .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
            }
            WorkerManagerResult::Failure(attempt_id, return_package) => {
                let creation = self
                    .transform_attempt_creator
                    .create_new_reattempt(attempt_id.clone(), return_package)
                    .await;
                let retry = match creation {
                    Ok(attempt) => attempt,
                    Err(creation_error) => {
                        log::error!(
                            "Failed to create reattempt for transform attempt {:?}: {}",
                            attempt_id,
                            creation_error
                        );
                        // Nothing more can be done for this request, so retire it.
                        self.database
                            .archive_request_with_id(&attempt_id.into())
                            .await
                            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                        return Ok(());
                    }
                };

                self.database
                    .register_transform_attempt(&retry)
                    .await
                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                self.worker_manager_input_sender
                    .send(retry)
                    .await
                    .map_err(|e| ProcessorError::WorkerManagerInputSendFailed(e.to_string()))?;
            }
        }

        Ok(())
    }

    /// Handles a consume result received from the consumer.
    ///
    /// The result is first written to the database. On success the request is archived. On
    /// failure a consume reattempt is created, registered and sent back to the consumer; if
    /// the reattempt cannot be created, the failure is logged, the request is archived and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when a database call or a channel send fails.
    async fn process_consumer_output(
        &mut self,
        consume_output: ConsumeAttemptResult<CA>,
    ) -> Result<(), <Self as Processor>::ProcessorError> {
        self.database
            .update_consume_attempt(consume_output.clone())
            .await
            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;

        match consume_output {
            ConsumeAttemptResult::Success(consume_id, _return_ctx) => {
                self.database
                    .archive_request_with_id(&consume_id.into())
                    .await
                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
            }
            ConsumeAttemptResult::Failure(consume_id, return_ctx) => {
                let creation = self
                    .consume_attempt_creator
                    .create_new_reattempt(consume_id.clone().into(), return_ctx)
                    .await;
                let retry = match creation {
                    Ok(attempt) => attempt,
                    Err(creation_error) => {
                        log::error!(
                            "Failed to create reattempt for consume attempt {:?}: {}",
                            consume_id,
                            creation_error
                        );
                        // Nothing more can be done for this request, so retire it.
                        self.database
                            .archive_request_with_id(&consume_id.into())
                            .await
                            .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                        return Ok(());
                    }
                };

                self.database
                    .register_consume_attempt(&retry)
                    .await
                    .map_err(|e| ProcessorError::DatabaseError(e.to_string()))?;
                self.consumer_input_sender
                    .send(retry)
                    .await
                    .map_err(|e| ProcessorError::ConsumerInputSendFailed(e.to_string()))?;
            }
        }

        Ok(())
    }
}