shepherd-rs 0.2.0

Shepherd is a resilient, non-blocking orchestrator that persistently transforms and delivers data—built for remote, compute-heavy workloads.
Documentation
//! # Processor Trait
//!
//! This trait defines the behavior of processors in the shepherd framework.
//!
//! ## Overview
//! - **Processor**: Manages transformation and consumption attempts.
//! - **Error Handling**: Implementations name their error type through the `ProcessorError` associated type.
//!
//! ## Example
//! ```rust,ignore
//! struct MyProcessor;
//!
//! impl Processor for MyProcessor {
//!     // Implementation details...
//! }
//! ```

use std::error::Error;
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::{Mutex, mpsc};

use crate::config::Config;
use crate::consumer::consumer::ConsumeAttemptResult;
use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator};
use crate::database::Database;
use crate::emitter::emitter::EmissionState;
use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
use crate::worker::worker_manager::WorkerManagerResult;

/// Bundle of channel endpoints returned next to the processor by `Processor::new`.
///
/// Every field is a single half (sender or receiver) of a `tokio::sync::mpsc`
/// channel; the opposite halves are not stored in this struct.
/// NOTE(review): presumably the processor keeps the opposite halves and the
/// caller wires these ones to the emitter, worker manager and consumer —
/// confirm against a concrete `Processor` implementation.
///
/// Type parameters:
/// - `TR`: transform request type carried by `emitter_output_sender`.
/// - `TA`: transform attempt type carried by the worker-manager channels.
/// - `CA`: consume attempt type carried by the consumer channels.
pub struct ProcessorHandles<TR: TransformRequest, TA: TransformAttempt, CA: ConsumeAttempt> {
    /// Sending half of a channel of transform requests (`TR`).
    /// NOTE(review): the name suggests the emitter publishes its output here — confirm.
    pub emitter_output_sender: mpsc::Sender<TR>,
    /// Receiving half of a channel of `EmissionState` values.
    /// NOTE(review): presumably hints addressed to the emitter (e.g. to pause it) — confirm.
    pub emitter_hints_recv: mpsc::Receiver<EmissionState>,
    /// Receiving half of a channel of transform attempts (`TA`).
    /// NOTE(review): presumably the worker manager's input queue — confirm.
    pub worker_manager_input_recv: mpsc::Receiver<TA>,
    /// Sending half of a channel of `WorkerManagerResult<TA>` values.
    /// NOTE(review): presumably where the worker manager reports results — confirm.
    pub worker_manager_output_sender: mpsc::Sender<WorkerManagerResult<TA>>,
    /// Receiving half of a channel of consume attempts (`CA`).
    /// NOTE(review): presumably the consumer's input queue — confirm.
    pub consumer_input_recv: mpsc::Receiver<CA>,
    /// Sending half of a channel of `ConsumeAttemptResult<CA>` values.
    /// NOTE(review): presumably where the consumer reports results — confirm.
    pub consumer_output_sender: mpsc::Sender<ConsumeAttemptResult<CA>>,
    /// A sender for a kill signal to stop the processor gracefully.
    /// The signal itself carries no payload (`()`).
    pub kill_signal_sender: mpsc::Sender<()>,
}

/// Describes a processor: the family of types it works with, how it is
/// constructed, and its main loop.
///
/// The bounds on the associated types force the transform, consume and
/// database types to agree on the same `Input` and `Output` types.
/// Implementors must be `Send`.
#[async_trait]
pub trait Processor: Send {
    /// Value type passed into a transform; it is the `Input` of the transform
    /// request and the `CallArgsType` of the transform attempt.
    type Input: Send;
    /// Value type produced by a transform; it is the `ReturnType` of the
    /// transform attempt and the `ConsumeVal` of the consume attempt.
    type Output: Send;
    /// Configuration type, handed to [`Processor::new`] behind `Arc<Mutex<_>>`.
    type Config: Config + Send + Sync + 'static;
    /// Transform request type, tied to `Self::Input` and `Self::Output`.
    type TransformRequest: TransformRequest<Input = Self::Input, Output = Self::Output>;
    /// Transform attempt type that takes `Self::Input` as call arguments and
    /// returns `Self::Output`.
    type TransformAttempt: TransformAttempt<CallArgsType = Self::Input, ReturnType = Self::Output>;
    /// Creator type associated with `Self::TransformRequest` and
    /// `Self::TransformAttempt`.
    /// NOTE(review): presumably builds an attempt from a request — confirm
    /// against the `TransformAttemptCreator` trait.
    type TransformAttemptCreator: TransformAttemptCreator<
            TransformRequest = Self::TransformRequest,
            TransformAttempt = Self::TransformAttempt,
            Input = Self::Input,
            Output = Self::Output,
        >;
    /// Consume attempt type whose consumed value is `Self::Output`.
    type ConsumeAttempt: ConsumeAttempt<ConsumeVal = Self::Output>;
    /// Creator type associated with `Self::ConsumeAttempt` and
    /// `Self::TransformAttempt`.
    /// NOTE(review): presumably builds a consume attempt from a finished
    /// transform attempt — confirm against the `ConsumeAttemptCreator` trait.
    type ConsumeAttemptCreator: ConsumeAttemptCreator<
            ConsumeAttempt = Self::ConsumeAttempt,
            TransformAttempt = Self::TransformAttempt,
            Output = Self::Output,
        >;
    /// Database type bound to this processor's request, attempt, input and
    /// output types.
    type Database: Database<
            TransformRequest = Self::TransformRequest,
            TransformAttempt = Self::TransformAttempt,
            ConsumeAttempt = Self::ConsumeAttempt,
            Input = Self::Input,
            Output = Self::Output,
        >;
    /// Error type associated with this processor. No method declared in this
    /// trait returns it.
    type ProcessorError: Error + Send + Sync;

    /// Builds a processor together with its [`ProcessorHandles`].
    ///
    /// # Parameters
    /// - `config`: shared configuration behind an async mutex.
    /// - `database`: the database instance, taken by value.
    /// - `transform_attempt_creator`: the transform attempt creator, taken by value.
    /// - `consume_attempt_creator`: the consume attempt creator, taken by value.
    ///
    /// # Returns
    /// A tuple of the new processor and the channel endpoints bundled in
    /// [`ProcessorHandles`].
    async fn new(
        config: Arc<Mutex<Self::Config>>,
        database: Self::Database,
        transform_attempt_creator: Self::TransformAttemptCreator,
        consume_attempt_creator: Self::ConsumeAttemptCreator,
    ) -> (
        Self,
        ProcessorHandles<Self::TransformRequest, Self::TransformAttempt, Self::ConsumeAttempt>,
    )
    where
        // Required because `Self` is returned by value.
        Self: Sized;

    /// The main processing loop for the processor.
    ///
    /// Implementations put their main processing logic here. It typically
    /// involves receiving `TransformRequest`s from the emitter, processing
    /// them, halting the emitter when necessary, and sending results to the
    /// consumer.
    async fn processor_loop(&mut self);
}