shepherd-rs 0.2.0

Shepherd is a resilient, non-blocking orchestrator that persistently transforms and delivers data—built for remote, compute-heavy workloads.
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::config::Config;
use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator, Consumer};
use crate::database::Database;
use crate::emitter::emitter::Emitter;
use crate::instrumentation::instrumentation::Instrumentation;
use crate::processor::Processor;
use crate::processor::processor::ProcessorHandles;
use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
use crate::worker::worker_manager::WorkerManager;

/// Wiring contract for one complete pipeline.
///
/// An implementor picks the concrete type of every component through the associated
/// types below. The provided [`Instance::start`] method then constructs those
/// components, connects them using the handles returned by the processor, and runs
/// each component loop on its own Tokio task.
#[async_trait]
pub trait Instance {
    /// Handle accepted by [`Instance::start`] and forwarded to `Self::Config::new`.
    type StaticConfigHandle: Send;
    /// Configuration type; `start` shares one value of it behind `Arc<Mutex<_>>`.
    type Config: Config<StaticConfigHandle = Self::StaticConfigHandle> + Send + Sync + 'static;
    /// Value type used as `Input` by the request, creator, database and processor
    /// bounds, and as `CallArgsType` by the transform attempt.
    type Input;
    /// Value type used as `Output` by the request, creators, database and processor
    /// bounds, as `ReturnType` by the transform attempt and as `ConsumeVal` by the
    /// consume attempt.
    type Output;
    /// Identifier type of [`Self::TransformRequest`].
    type TransformRequestIdentifier;
    /// Identifier type of [`Self::TransformAttempt`]; convertible into a
    /// `TransformRequestIdentifier`.
    type TransformAttemptIdentifier: Into<Self::TransformRequestIdentifier> + Send;
    /// Identifier type of [`Self::ConsumeAttempt`]; convertible into both a
    /// `TransformRequestIdentifier` and a `TransformAttemptIdentifier`.
    type ConsumeAttemptIdentifier: Into<Self::TransformRequestIdentifier>
        + Into<Self::TransformAttemptIdentifier>
        + Send;

    /// Transform request type, tied to this instance's identifier, input and output types.
    type TransformRequest: TransformRequest<
            Identifier = Self::TransformRequestIdentifier,
            Input = Self::Input,
            Output = Self::Output,
        >;
    /// Transform attempt type, tied to this instance's identifiers, input and output types.
    type TransformAttempt: TransformAttempt<
            Identifier = Self::TransformAttemptIdentifier,
            TransformRequestIdentifier = Self::TransformRequestIdentifier,
            CallArgsType = Self::Input,
            ReturnType = Self::Output,
        >;
    /// Consume attempt type, tied to this instance's identifiers and output type.
    type ConsumeAttempt: ConsumeAttempt<
            Identifier = Self::ConsumeAttemptIdentifier,
            TransformRequestIdentifier = Self::TransformRequestIdentifier,
            TransformAttemptIdentifier = Self::TransformAttemptIdentifier,
            ConsumeVal = Self::Output,
        >;
    /// Creator handed to the processor; bound to this instance's request and attempt types.
    type TransformAttemptCreator: TransformAttemptCreator<
            TransformRequest = Self::TransformRequest,
            TransformAttempt = Self::TransformAttempt,
            Input = Self::Input,
            Output = Self::Output,
        >;
    /// Creator handed to the processor; bound to this instance's consume and transform
    /// attempt types.
    type ConsumeAttemptCreator: ConsumeAttemptCreator<
            ConsumeAttempt = Self::ConsumeAttempt,
            TransformAttempt = Self::TransformAttempt,
            Output = Self::Output,
        >;

    /// Emitter component; `start` runs its `emitter_loop` on a spawned task.
    type Emitter: Emitter<TransformRequest = Self::TransformRequest, Config = Self::Config>
        + 'static;
    /// Consumer component; `start` runs its `consumer_loop` on a spawned task.
    type Consumer: Consumer<ConsumeAttempt = Self::ConsumeAttempt, Config = Self::Config> + 'static;
    /// Worker manager component; `start` runs its `wm_loop` on a spawned task.
    type WorkerManager: WorkerManager<TransformAttempt = Self::TransformAttempt, Config = Self::Config>
        + 'static;
    /// Database component; `start` reads the dynamic configs from it and then hands it
    /// to the processor.
    type Database: Database<
            Input = Self::Input,
            Output = Self::Output,
            Config = Self::Config,
            TransformRequest = Self::TransformRequest,
            TransformAttempt = Self::TransformAttempt,
            ConsumeAttempt = Self::ConsumeAttempt,
        >;
    /// Processor component; it is constructed from the database and both creators, and
    /// `start` runs its `processor_loop` on a spawned task.
    type Processor: Processor<
            Input = Self::Input,
            Output = Self::Output,
            Config = Self::Config,
            TransformRequest = Self::TransformRequest,
            TransformAttempt = Self::TransformAttempt,
            ConsumeAttempt = Self::ConsumeAttempt,
            TransformAttemptCreator = Self::TransformAttemptCreator,
            ConsumeAttemptCreator = Self::ConsumeAttemptCreator,
            Database = Self::Database,
        > + 'static;
    /// Instrumentation component; `start` runs its `instrumentation_loop` on a spawned task.
    type Instrumentation: Instrumentation<
            TransformRequestIdentifier = Self::TransformRequestIdentifier,
            TransformAttemptIdentifier = Self::TransformAttemptIdentifier,
            ConsumeAttemptIdentifier = Self::ConsumeAttemptIdentifier,
            Config = Self::Config,
        > + 'static;

    /// Builds every component and runs the pipeline until all component tasks finish.
    ///
    /// Steps, in order:
    /// 1. Create the config from `config_handle` and wrap it in `Arc<Mutex<_>>`.
    /// 2. Create the database, fetch the dynamic configs from it and apply them to the
    ///    shared config with `set_bulk`.
    /// 3. Create the processor (which takes ownership of the database and both attempt
    ///    creators) and take apart the handles it returns.
    /// 4. Create the emitter, instrumentation, consumer and worker manager.
    /// 5. Spawn one Tokio task per component loop and await the five join handles.
    ///
    /// # Panics
    ///
    /// Panics if any fallible construction or configuration step returns an error, or
    /// if awaiting any of the join handles yields an error.
    async fn start(config_handle: Self::StaticConfigHandle) {
        // --- Configuration -----------------------------------------------------------
        let composite_config = Self::Config::new(config_handle)
            .await
            .expect("Failed to create composite config");
        let shared_config = Arc::new(Mutex::new(composite_config));

        // --- Database and dynamic settings --------------------------------------------
        let mut db = Self::Database::new(Arc::clone(&shared_config))
            .await
            .expect("Failed to create database instance");

        let dynamic_settings = db
            .get_dyn_configs()
            .await
            .expect("Failed to fetch dynamic configurations");

        // The lock guard is a temporary of this statement, so it is released as soon as
        // the statement completes.
        shared_config
            .lock()
            .await
            .set_bulk(dynamic_settings)
            .await
            .expect("Failed to set dynamic configurations in config");

        // --- Processor ------------------------------------------------------------------
        let transform_attempt_creator =
            Self::TransformAttemptCreator::new(Arc::clone(&shared_config)).await;
        let consume_attempt_creator =
            Self::ConsumeAttemptCreator::new(Arc::clone(&shared_config)).await;

        let (mut processor_task, handles) = Self::Processor::new(
            Arc::clone(&shared_config),
            db,
            transform_attempt_creator,
            consume_attempt_creator,
        )
        .await;

        // The `_` pattern does not move `kill_signal_sender`; it stays owned by `handles`
        // and is therefore not dropped by this statement.
        let ProcessorHandles {
            emitter_output_sender: emitter_tx,
            emitter_hints_recv: emitter_hints_rx,
            worker_manager_input_recv: wm_rx,
            worker_manager_output_sender: wm_tx,
            consumer_input_recv: consumer_rx,
            consumer_output_sender: consumer_tx,
            kill_signal_sender: _,
        } = handles;

        // --- Remaining components (emitter, instrumentation, consumer, worker manager) --
        let mut emitter_task =
            Self::Emitter::new(Arc::clone(&shared_config), emitter_tx, emitter_hints_rx)
                .await
                .expect("Failed to create emitter instance");

        let mut instrumentation_task = Self::Instrumentation::new(Arc::clone(&shared_config))
            .await
            .expect("Failed to create instrumentation instance");

        let mut consumer_task =
            Self::Consumer::new(Arc::clone(&shared_config), consumer_rx, consumer_tx)
                .await
                .expect("Failed to create consumer instance");

        // Last user of the config handle: the `Arc` itself is moved in, not cloned.
        let mut worker_manager_task = Self::WorkerManager::new(shared_config, wm_rx, wm_tx)
            .await
            .expect("Failed to create worker manager instance");

        // --- Launch: one task per component loop ------------------------------------------
        let processor_jh = tokio::spawn(async move { processor_task.processor_loop().await });

        let instrumentation_jh =
            tokio::spawn(async move { instrumentation_task.instrumentation_loop().await });

        let worker_manager_jh = tokio::spawn(async move { worker_manager_task.wm_loop().await });

        let consumer_jh = tokio::spawn(async move { consumer_task.consumer_loop().await });

        let emitter_jh = tokio::spawn(async move { emitter_task.emitter_loop().await });

        // --- Join ---------------------------------------------------------------------------
        // The handles are awaited one after another in this fixed order, so a failure in
        // a later task is only observed here once the earlier tasks have completed.
        // Each loop's own return value is discarded.
        let _ = processor_jh.await.expect("Processor task panicked");
        let _ = instrumentation_jh
            .await
            .expect("Instrumentation task panicked");
        let _ = consumer_jh.await.expect("Consumer task panicked");
        let _ = worker_manager_jh
            .await
            .expect("Worker manager task panicked");
        let _ = emitter_jh.await.expect("Emitter task panicked");
    }
}