use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::config::Config;
use crate::consumer::{ConsumeAttempt, ConsumeAttemptCreator, Consumer};
use crate::database::Database;
use crate::emitter::emitter::Emitter;
use crate::instrumentation::instrumentation::Instrumentation;
use crate::processor::Processor;
use crate::processor::processor::ProcessorHandles;
use crate::transform::{TransformAttempt, TransformAttemptCreator, TransformRequest};
use crate::worker::worker_manager::WorkerManager;
#[async_trait]
pub trait Instance {
type StaticConfigHandle: Send;
type Config: Config<StaticConfigHandle = Self::StaticConfigHandle> + Send + Sync + 'static;
type Input;
type Output;
type TransformRequestIdentifier;
type TransformAttemptIdentifier: Into<Self::TransformRequestIdentifier> + Send;
type ConsumeAttemptIdentifier: Into<Self::TransformRequestIdentifier>
+ Into<Self::TransformAttemptIdentifier>
+ Send;
type TransformRequest: TransformRequest<
Identifier = Self::TransformRequestIdentifier,
Input = Self::Input,
Output = Self::Output,
>;
type TransformAttempt: TransformAttempt<
Identifier = Self::TransformAttemptIdentifier,
TransformRequestIdentifier = Self::TransformRequestIdentifier,
CallArgsType = Self::Input,
ReturnType = Self::Output,
>;
type ConsumeAttempt: ConsumeAttempt<
Identifier = Self::ConsumeAttemptIdentifier,
TransformRequestIdentifier = Self::TransformRequestIdentifier,
TransformAttemptIdentifier = Self::TransformAttemptIdentifier,
ConsumeVal = Self::Output,
>;
type TransformAttemptCreator: TransformAttemptCreator<
TransformRequest = Self::TransformRequest,
TransformAttempt = Self::TransformAttempt,
Input = Self::Input,
Output = Self::Output,
>;
type ConsumeAttemptCreator: ConsumeAttemptCreator<
ConsumeAttempt = Self::ConsumeAttempt,
TransformAttempt = Self::TransformAttempt,
Output = Self::Output,
>;
type Emitter: Emitter<TransformRequest = Self::TransformRequest, Config = Self::Config>
+ 'static;
type Consumer: Consumer<ConsumeAttempt = Self::ConsumeAttempt, Config = Self::Config> + 'static;
type WorkerManager: WorkerManager<TransformAttempt = Self::TransformAttempt, Config = Self::Config>
+ 'static;
type Database: Database<
Input = Self::Input,
Output = Self::Output,
Config = Self::Config,
TransformRequest = Self::TransformRequest,
TransformAttempt = Self::TransformAttempt,
ConsumeAttempt = Self::ConsumeAttempt,
>;
type Processor: Processor<
Input = Self::Input,
Output = Self::Output,
Config = Self::Config,
TransformRequest = Self::TransformRequest,
TransformAttempt = Self::TransformAttempt,
ConsumeAttempt = Self::ConsumeAttempt,
TransformAttemptCreator = Self::TransformAttemptCreator,
ConsumeAttemptCreator = Self::ConsumeAttemptCreator,
Database = Self::Database,
> + 'static;
type Instrumentation: Instrumentation<
TransformRequestIdentifier = Self::TransformRequestIdentifier,
TransformAttemptIdentifier = Self::TransformAttemptIdentifier,
ConsumeAttemptIdentifier = Self::ConsumeAttemptIdentifier,
Config = Self::Config,
> + 'static;
async fn start(config_handle: Self::StaticConfigHandle) {
let config = Self::Config::new(config_handle)
.await
.expect("Failed to create composite config");
let config = Arc::new(Mutex::new(config));
let mut database = Self::Database::new(config.clone())
.await
.expect("Failed to create database instance");
let dyn_configs = database
.get_dyn_configs()
.await
.expect("Failed to fetch dynamic configurations");
config
.lock()
.await
.set_bulk(dyn_configs)
.await
.expect("Failed to set dynamic configurations in config");
let (mut processor, processor_handles) = Self::Processor::new(
config.clone(),
database,
Self::TransformAttemptCreator::new(config.clone()).await,
Self::ConsumeAttemptCreator::new(config.clone()).await,
)
.await;
let ProcessorHandles {
emitter_output_sender,
emitter_hints_recv,
worker_manager_input_recv,
worker_manager_output_sender,
consumer_input_recv,
consumer_output_sender,
kill_signal_sender: _,
} = processor_handles;
let mut emitter =
Self::Emitter::new(config.clone(), emitter_output_sender, emitter_hints_recv)
.await
.expect("Failed to create emitter instance");
let mut instrumentation = Self::Instrumentation::new(config.clone())
.await
.expect("Failed to create instrumentation instance");
let mut consumer =
Self::Consumer::new(config.clone(), consumer_input_recv, consumer_output_sender)
.await
.expect("Failed to create consumer instance");
let mut worker_manager = Self::WorkerManager::new(
config,
worker_manager_input_recv,
worker_manager_output_sender,
)
.await
.expect("Failed to create worker manager instance");
let processor_join_handle = tokio::spawn(async move { processor.processor_loop().await });
let instrumentation_join_handle =
tokio::spawn(async move { instrumentation.instrumentation_loop().await });
let worker_manager_join_handle =
tokio::spawn(async move { worker_manager.wm_loop().await });
let consumer_join_handle = tokio::spawn(async move { consumer.consumer_loop().await });
let emitter_join_handle = tokio::spawn(async move { emitter.emitter_loop().await });
let _ = processor_join_handle
.await
.expect("Processor task panicked");
let _ = instrumentation_join_handle
.await
.expect("Instrumentation task panicked");
let _ = consumer_join_handle.await.expect("Consumer task panicked");
let _ = worker_manager_join_handle
.await
.expect("Worker manager task panicked");
let _ = emitter_join_handle.await.expect("Emitter task panicked");
}
}