mod config;
mod cron_api;
mod pipeline_executor_impl;
mod pipeline_result;
mod services;
pub use config::{DefaultRunnerBuilder, DefaultRunnerConfig, DefaultRunnerConfigBuilder};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use crate::dal::DAL;
use crate::dispatcher::{DefaultDispatcher, Dispatcher, RoutingConfig, TaskExecutor};
use crate::executor::pipeline_executor::PipelineError;
use crate::executor::types::ExecutorConfig;
use crate::executor::ThreadTaskExecutor;
use crate::registry::traits::WorkflowRegistry;
use crate::registry::RegistryReconciler;
use crate::Database;
use crate::Scheduler;
use crate::TaskScheduler;
#[must_use = "DefaultRunner runs background tasks; call shutdown() before dropping"]
pub struct DefaultRunner {
pub(super) database: Database,
pub(super) config: DefaultRunnerConfig,
pub(super) scheduler: Arc<TaskScheduler>,
pub(super) runtime_handles: Arc<RwLock<RuntimeHandles>>,
pub(super) cron_recovery: Arc<RwLock<Option<Arc<crate::CronRecoveryService>>>>,
pub(super) workflow_registry: Arc<RwLock<Option<Arc<dyn WorkflowRegistry>>>>,
pub(super) registry_reconciler: Arc<RwLock<Option<Arc<RegistryReconciler>>>>,
pub(super) unified_scheduler: Arc<RwLock<Option<Arc<Scheduler>>>>,
}
pub(super) struct RuntimeHandles {
pub(super) scheduler_handle: Option<tokio::task::JoinHandle<()>>,
pub(super) executor_handle: Option<tokio::task::JoinHandle<()>>,
pub(super) cron_recovery_handle: Option<tokio::task::JoinHandle<()>>,
pub(super) registry_reconciler_handle: Option<tokio::task::JoinHandle<()>>,
pub(super) unified_scheduler_handle: Option<tokio::task::JoinHandle<()>>,
pub(super) shutdown_sender: Option<broadcast::Sender<()>>,
}
impl DefaultRunner {
pub async fn new(database_url: &str) -> Result<Self, PipelineError> {
Self::with_config(database_url, DefaultRunnerConfig::default()).await
}
pub fn builder() -> DefaultRunnerBuilder {
DefaultRunnerBuilder::new()
}
pub async fn with_schema(database_url: &str, schema: &str) -> Result<Self, PipelineError> {
Self::builder()
.database_url(database_url)
.schema(schema)
.build()
.await
}
pub async fn with_config(
database_url: &str,
config: DefaultRunnerConfig,
) -> Result<Self, PipelineError> {
let database = Database::new(database_url, "cloacina", config.db_pool_size());
database
.run_migrations()
.await
.map_err(|e| PipelineError::DatabaseConnection { message: e })?;
let scheduler =
TaskScheduler::with_poll_interval(database.clone(), config.scheduler_poll_interval())
.await
.map_err(|e| PipelineError::Executor(e.into()))?;
let executor_config = ExecutorConfig {
max_concurrent_tasks: config.max_concurrent_tasks(),
task_timeout: config.task_timeout(),
enable_claiming: config.enable_claiming(),
heartbeat_interval: config.heartbeat_interval(),
};
let executor = ThreadTaskExecutor::with_global_registry(database.clone(), executor_config)
.map_err(|e| PipelineError::Configuration {
message: e.to_string(),
})?;
let dal = DAL::new(database.clone());
let routing_config = config
.routing_config()
.cloned()
.unwrap_or_else(RoutingConfig::default);
let dispatcher = DefaultDispatcher::new(dal, routing_config);
dispatcher.register_executor("default", Arc::new(executor) as Arc<dyn TaskExecutor>);
let scheduler = scheduler.with_dispatcher(Arc::new(dispatcher));
let default_runner = Self {
database,
config,
scheduler: Arc::new(scheduler),
runtime_handles: Arc::new(RwLock::new(RuntimeHandles {
scheduler_handle: None,
executor_handle: None,
cron_recovery_handle: None,
registry_reconciler_handle: None,
unified_scheduler_handle: None,
shutdown_sender: None,
})),
cron_recovery: Arc::new(RwLock::new(None)), workflow_registry: Arc::new(RwLock::new(None)), registry_reconciler: Arc::new(RwLock::new(None)), unified_scheduler: Arc::new(RwLock::new(None)), };
default_runner.start_background_services().await?;
Ok(default_runner)
}
pub fn database(&self) -> &Database {
&self.database
}
pub fn dal(&self) -> DAL {
DAL::new(self.database.clone())
}
pub async fn unified_scheduler(&self) -> Option<Arc<Scheduler>> {
self.unified_scheduler.read().await.clone()
}
pub async fn shutdown(&self) -> Result<(), PipelineError> {
let mut handles = self.runtime_handles.write().await;
if let Some(sender) = handles.shutdown_sender.take() {
let _ = sender.send(());
}
if let Some(handle) = handles.scheduler_handle.take() {
let _ = handle.await;
}
if let Some(handle) = handles.executor_handle.take() {
let _ = handle.await;
}
if let Some(handle) = handles.cron_recovery_handle.take() {
let _ = handle.await;
}
if let Some(handle) = handles.registry_reconciler_handle.take() {
let _ = handle.await;
}
if let Some(handle) = handles.unified_scheduler_handle.take() {
let _ = handle.await;
}
self.database.close();
Ok(())
}
}
impl Clone for DefaultRunner {
fn clone(&self) -> Self {
Self {
database: self.database.clone(),
config: self.config.clone(),
scheduler: self.scheduler.clone(),
runtime_handles: self.runtime_handles.clone(),
cron_recovery: self.cron_recovery.clone(),
workflow_registry: self.workflow_registry.clone(),
registry_reconciler: self.registry_reconciler.clone(),
unified_scheduler: self.unified_scheduler.clone(),
}
}
}
impl Drop for DefaultRunner {
fn drop(&mut self) {
tracing::info!("DefaultRunner dropping - consider calling shutdown() explicitly");
}
}