mod config;
mod cron_api;
mod service_manager;
mod services;
mod workflow_executor_impl;
mod workflow_result;
pub use config::{DefaultRunnerBuilder, DefaultRunnerConfig, DefaultRunnerConfigBuilder};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::dal::DAL;
use crate::dispatcher::{DefaultDispatcher, Dispatcher, RoutingConfig, TaskExecutor};
use crate::executor::types::ExecutorConfig;
use crate::executor::workflow_executor::WorkflowExecutionError;
use crate::executor::ThreadTaskExecutor;
use crate::Database;
use crate::Runtime;
use crate::Scheduler;
use crate::TaskScheduler;
use service_manager::ServiceManager;
/// Handle that bundles a database, a task scheduler and the background
/// services started for it by [`DefaultRunner::with_config`].
///
/// The type is `Clone`; clones share the same runtime, scheduler and service
/// manager through `Arc`s. Dropping a value only logs a reminder, so callers
/// are expected to call [`DefaultRunner::shutdown`] explicitly.
#[must_use = "DefaultRunner runs background tasks; call shutdown() before dropping"]
pub struct DefaultRunner {
/// Shared runtime; the same instance is handed to the scheduler and to the
/// task executor when the runner is built.
pub(super) runtime: Arc<Runtime>,
/// Database handle used for migrations and cloned into the scheduler,
/// executor and every `DAL` created by this runner.
pub(super) database: Database,
/// Configuration the runner was built with.
pub(super) config: DefaultRunnerConfig,
/// Task scheduler, already wired to the runtime and the dispatcher.
pub(super) scheduler: Arc<TaskScheduler>,
/// Background-service state behind an async `RwLock`; it holds the
/// `unified_scheduler` and `graph_scheduler` slots read by the accessors and
/// is asked to `shutdown_all()` during shutdown.
pub(super) service_manager: Arc<RwLock<ServiceManager>>,
}
impl DefaultRunner {
    /// Creates a runner for `database_url` using `DefaultRunnerConfig::default()`.
    ///
    /// Delegates to [`DefaultRunner::with_config`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`DefaultRunner::with_config`] produces.
    pub async fn new(database_url: &str) -> Result<Self, WorkflowExecutionError> {
        Self::with_config(database_url, DefaultRunnerConfig::default()).await
    }

    /// Returns a new [`DefaultRunnerBuilder`] for configuring a runner step by step.
    pub fn builder() -> DefaultRunnerBuilder {
        DefaultRunnerBuilder::new()
    }

    /// Creates a runner through the builder, setting the database URL and schema.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the builder's `build()` call.
    pub async fn with_schema(
        database_url: &str,
        schema: &str,
    ) -> Result<Self, WorkflowExecutionError> {
        Self::builder()
            .database_url(database_url)
            .schema(schema)
            .build()
            .await
    }

    /// Creates a fully wired runner from an explicit configuration.
    ///
    /// The steps, in order:
    /// 1. open the database (pool size taken from `config`) and run migrations;
    /// 2. create the shared [`Runtime`] and a [`TaskScheduler`] polling at
    ///    `config.scheduler_poll_interval()`;
    /// 3. build a [`ThreadTaskExecutor`] from the executor settings in `config`;
    /// 4. register that executor with a [`DefaultDispatcher`] under the key
    ///    `"default"` and attach the dispatcher to the scheduler;
    /// 5. start the background services.
    ///
    /// # Errors
    ///
    /// * [`WorkflowExecutionError::DatabaseConnection`] if migrations fail.
    /// * [`WorkflowExecutionError::Executor`] if the scheduler cannot be created.
    /// * Any error returned by `start_background_services()`.
    pub async fn with_config(
        database_url: &str,
        config: DefaultRunnerConfig,
    ) -> Result<Self, WorkflowExecutionError> {
        let database = Database::new(database_url, "cloacina", config.db_pool_size());
        database
            .run_migrations()
            .await
            .map_err(|e| WorkflowExecutionError::DatabaseConnection { message: e })?;

        // One runtime instance is shared by the scheduler and the executor.
        let runtime = Arc::new(Runtime::new());

        let scheduler =
            TaskScheduler::with_poll_interval(database.clone(), config.scheduler_poll_interval())
                .await
                .map_err(|e| WorkflowExecutionError::Executor(e.into()))?
                .with_runtime(runtime.clone());

        let executor_config = ExecutorConfig {
            max_concurrent_tasks: config.max_concurrent_tasks(),
            task_timeout: config.task_timeout(),
            enable_claiming: config.enable_claiming(),
            heartbeat_interval: config.heartbeat_interval(),
        };
        let executor = ThreadTaskExecutor::with_runtime_and_registry(
            database.clone(),
            Arc::new(crate::task::TaskRegistry::new()),
            runtime.clone(),
            executor_config,
        );

        // Fall back to the default routing when the config does not carry one.
        let dal = DAL::new(database.clone());
        let routing_config = config
            .routing_config()
            .cloned()
            .unwrap_or_else(RoutingConfig::default);
        let dispatcher = DefaultDispatcher::new(dal, routing_config);
        dispatcher.register_executor("default", Arc::new(executor) as Arc<dyn TaskExecutor>);
        let scheduler = scheduler.with_dispatcher(Arc::new(dispatcher));

        let default_runner = Self {
            runtime,
            database,
            config,
            scheduler: Arc::new(scheduler),
            service_manager: Arc::new(RwLock::new(ServiceManager::new())),
        };
        default_runner.start_background_services().await?;
        Ok(default_runner)
    }

    /// Borrows the database handle used by this runner.
    pub fn database(&self) -> &Database {
        &self.database
    }

    /// Creates a new [`DAL`] over a clone of the runner's database handle.
    pub fn dal(&self) -> DAL {
        DAL::new(self.database.clone())
    }

    /// Returns another `Arc` handle to the runner's shared [`Runtime`].
    pub fn runtime(&self) -> Arc<Runtime> {
        self.runtime.clone()
    }

    /// Returns a clone of the service manager's `unified_scheduler` value,
    /// read under the service manager's read lock.
    pub async fn unified_scheduler(&self) -> Option<Arc<Scheduler>> {
        self.service_manager.read().await.unified_scheduler.clone()
    }

    /// Stores `scheduler` in the service manager's `graph_scheduler` slot,
    /// replacing any previous value.
    pub async fn set_graph_scheduler(
        &self,
        scheduler: Arc<crate::computation_graph::scheduler::ComputationGraphScheduler>,
    ) {
        // The read guard on the service manager is a temporary and is released
        // at the end of this statement, before the slot's own lock is taken.
        let slot = self.service_manager.read().await.graph_scheduler.clone();
        *slot.write().await = Some(scheduler);
    }

    /// Stops all background services and closes the database.
    ///
    /// The database is closed even when stopping the services fails, so a
    /// failed shutdown does not leave it open.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the service manager's `shutdown_all()`,
    /// after the database has been closed.
    pub async fn shutdown(&self) -> Result<(), WorkflowExecutionError> {
        // Keep the outcome instead of returning early with `?`, so a failing
        // service shutdown can no longer skip closing the database.
        let services_stopped = self.service_manager.write().await.shutdown_all().await;
        self.database.close();
        services_stopped?;
        Ok(())
    }
}
impl Clone for DefaultRunner {
fn clone(&self) -> Self {
Self {
runtime: self.runtime.clone(),
database: self.database.clone(),
config: self.config.clone(),
scheduler: self.scheduler.clone(),
service_manager: self.service_manager.clone(),
}
}
}
/// Logs a reminder whenever a `DefaultRunner` value is dropped.
///
/// This destructor does no cleanup of its own: it neither stops services nor
/// closes the database, which is why `shutdown()` should be called explicitly.
/// Because the type is `Clone`, the message is emitted for each dropped clone.
impl Drop for DefaultRunner {
fn drop(&mut self) {
// Informational only; the fields are dropped by their own destructors afterwards.
tracing::info!("DefaultRunner dropping - consider calling shutdown() explicitly");
}
}