use std::error::Error;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{Mutex, mpsc};
use crate::config::Config;
use crate::consumer::consumer::ConsumeAttemptResult;
use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator};
use crate::database::Database;
use crate::emitter::emitter::EmissionState;
use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
use crate::worker::worker_manager::WorkerManagerResult;
/// Bundle of channel endpoints handed out together with a processor.
///
/// [`Processor::new`] returns one of these next to the processor itself. Each
/// field is one half (sender or receiver) of a `tokio::sync::mpsc` channel.
///
/// Type parameters:
/// * `TR` - the transform request type.
/// * `TA` - the transform attempt type.
/// * `CA` - the consume attempt type.
// NOTE(review): the per-field roles below are inferred from the field names and
// the item types only; the code that owns the opposite channel halves is not
// visible here, so confirm against the implementor.
pub struct ProcessorHandles<TR, TA, CA>
where
    TR: TransformRequest,
    TA: TransformAttempt,
    CA: ConsumeAttempt,
{
    /// Sender of transform requests (`TR`); presumably where the emitter
    /// publishes its output.
    pub emitter_output_sender: mpsc::Sender<TR>,
    /// Receiver of [`EmissionState`] values; presumably hints addressed to the
    /// emitter.
    pub emitter_hints_recv: mpsc::Receiver<EmissionState>,
    /// Receiver of transform attempts (`TA`); presumably the worker manager's
    /// input queue.
    pub worker_manager_input_recv: mpsc::Receiver<TA>,
    /// Sender of [`WorkerManagerResult`] values wrapping `TA`; presumably where
    /// the worker manager reports results.
    pub worker_manager_output_sender: mpsc::Sender<WorkerManagerResult<TA>>,
    /// Receiver of consume attempts (`CA`); presumably the consumer's input
    /// queue.
    pub consumer_input_recv: mpsc::Receiver<CA>,
    /// Sender of [`ConsumeAttemptResult`] values wrapping `CA`; presumably
    /// where the consumer reports results.
    pub consumer_output_sender: mpsc::Sender<ConsumeAttemptResult<CA>>,
    /// Sender of unit values; by its name, used to request shutdown.
    pub kill_signal_sender: mpsc::Sender<()>,
}
/// Contract for a processor and the family of component types it works with.
///
/// The associated types tie every component (requests, attempts, creators and
/// the database) to the same [`Processor::Input`] and [`Processor::Output`],
/// so an implementor cannot mix components built for different payload types.
#[async_trait]
pub trait Processor: Send {
    /// Payload type fed into a transform.
    type Input: Send;

    /// Payload type produced by a transform and handed to consume attempts.
    type Output: Send;

    /// Configuration type; shared behind `Arc<Mutex<_>>` when passed to
    /// [`Processor::new`].
    type Config: Config + Send + Sync + 'static;

    /// Transform request type carrying this processor's input and output types.
    type TransformRequest: TransformRequest<Input = Self::Input, Output = Self::Output>;

    /// Transform attempt type whose call arguments are `Self::Input` and whose
    /// return value is `Self::Output`.
    type TransformAttempt: TransformAttempt<CallArgsType = Self::Input, ReturnType = Self::Output>;

    /// Creator type bound to this processor's transform request and transform
    /// attempt types.
    type TransformAttemptCreator: TransformAttemptCreator<
        TransformRequest = Self::TransformRequest,
        TransformAttempt = Self::TransformAttempt,
        Input = Self::Input,
        Output = Self::Output,
    >;

    /// Consume attempt type whose consumed value is `Self::Output`.
    type ConsumeAttempt: ConsumeAttempt<ConsumeVal = Self::Output>;

    /// Creator type bound to this processor's consume attempt and transform
    /// attempt types.
    type ConsumeAttemptCreator: ConsumeAttemptCreator<
        ConsumeAttempt = Self::ConsumeAttempt,
        TransformAttempt = Self::TransformAttempt,
        Output = Self::Output,
    >;

    /// Database type bound to this processor's request, attempt and payload
    /// types.
    type Database: Database<
        TransformRequest = Self::TransformRequest,
        TransformAttempt = Self::TransformAttempt,
        ConsumeAttempt = Self::ConsumeAttempt,
        Input = Self::Input,
        Output = Self::Output,
    >;

    /// Error type associated with the processor.
    // NOTE(review): no signature in this trait returns `ProcessorError`; it is
    // presumably used by implementors internally - confirm.
    type ProcessorError: Error + Send + Sync;

    /// Builds a processor from its collaborators.
    ///
    /// Takes the shared configuration, the database, and the two attempt
    /// creators, and returns the processor together with the
    /// [`ProcessorHandles`] channel endpoints typed for this processor's
    /// request and attempt types.
    ///
    /// Only callable on sized implementors (`Self: Sized`), since it returns
    /// `Self` by value.
    async fn new(
        config: Arc<Mutex<Self::Config>>,
        database: Self::Database,
        transform_attempt_creator: Self::TransformAttemptCreator,
        consume_attempt_creator: Self::ConsumeAttemptCreator,
    ) -> (
        Self,
        ProcessorHandles<Self::TransformRequest, Self::TransformAttempt, Self::ConsumeAttempt>,
    )
    where
        Self: Sized;

    /// Asynchronous entry point that takes exclusive access to the processor.
    // NOTE(review): by its name this is the long-running processing loop; when
    // and how it terminates is up to the implementor and not visible here.
    async fn processor_loop(&mut self);
}