use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::dispatcher::{DefaultDispatcher, Dispatcher, RoutingConfig, TaskExecutor};
use crate::executor::types::ExecutorConfig;
use crate::executor::workflow_executor::WorkflowExecutionError;
use crate::executor::ThreadTaskExecutor;
use crate::Database;
use crate::Runtime;
use crate::TaskScheduler;
use super::service_manager::ServiceManager;
use super::DefaultRunner;
/// Error produced when runner configuration fails validation, e.g. by
/// [`DefaultRunnerConfigBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// A setting was rejected. The payload is a human-readable description of
    /// the violated constraint; it is rendered after the
    /// `"Invalid configuration: "` prefix by `Display`.
    #[error("Invalid configuration: {0}")]
    Invalid(String),
}
/// Tunable settings for a [`DefaultRunner`].
///
/// Obtain one through [`DefaultRunnerConfig::builder`] (validated) or
/// `DefaultRunnerConfig::default()`. Fields are private; each one is read via
/// the accessor method of the same name. `#[non_exhaustive]` prevents code
/// outside this crate from constructing the struct literally, so settings can
/// be added later without a breaking change.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct DefaultRunnerConfig {
    // ---- Task execution -------------------------------------------------
    /// Concurrency cap forwarded to the executor; the builder requires > 0.
    max_concurrent_tasks: usize,
    /// Poll interval handed to `TaskScheduler::with_poll_interval`; the
    /// builder requires >= 10ms.
    scheduler_poll_interval: Duration,
    /// Per-task timeout forwarded to the executor.
    task_timeout: Duration,
    /// Optional workflow-level timeout (its consumer is not in this module).
    workflow_timeout: Option<Duration>,
    /// Pool size passed to `Database::new_with_schema`; the builder requires > 0.
    db_pool_size: u32,
    /// Recovery toggle (its consumer is not in this module).
    enable_recovery: bool,

    // ---- Cron scheduling (consumed outside this module) -----------------
    enable_cron_scheduling: bool,
    cron_poll_interval: Duration,
    /// The builder caps this at 1000.
    cron_max_catchup_executions: usize,
    cron_enable_recovery: bool,
    cron_recovery_interval: Duration,
    cron_lost_threshold_minutes: i32,
    cron_max_recovery_age: Duration,
    cron_max_recovery_attempts: usize,

    // ---- Trigger scheduling (consumed outside this module) --------------
    enable_trigger_scheduling: bool,
    trigger_base_poll_interval: Duration,
    trigger_poll_timeout: Duration,

    // ---- Workflow registry (consumed outside this module) ---------------
    enable_registry_reconciler: bool,
    registry_reconcile_interval: Duration,
    registry_enable_startup_reconciliation: bool,
    registry_storage_path: Option<std::path::PathBuf>,
    /// Free-form backend name, e.g. `"filesystem"`.
    registry_storage_backend: String,

    // ---- Task claiming --------------------------------------------------
    /// Forwarded to the executor as `ExecutorConfig::enable_claiming`.
    enable_claiming: bool,
    /// Forwarded to the executor; must stay below `stale_claim_threshold`.
    heartbeat_interval: Duration,
    stale_claim_sweep_interval: Duration,
    /// The builder requires this to be strictly greater than `heartbeat_interval`.
    stale_claim_threshold: Duration,

    // ---- Identity & routing ---------------------------------------------
    runner_id: Option<String>,
    runner_name: Option<String>,
    /// Dispatcher routing; `None` means `RoutingConfig::default` is used at build time.
    routing_config: Option<RoutingConfig>,
}
impl DefaultRunnerConfig {
pub fn builder() -> DefaultRunnerConfigBuilder {
DefaultRunnerConfigBuilder::default()
}
pub fn max_concurrent_tasks(&self) -> usize {
self.max_concurrent_tasks
}
pub fn scheduler_poll_interval(&self) -> Duration {
self.scheduler_poll_interval
}
pub fn task_timeout(&self) -> Duration {
self.task_timeout
}
pub fn workflow_timeout(&self) -> Option<Duration> {
self.workflow_timeout
}
pub fn db_pool_size(&self) -> u32 {
self.db_pool_size
}
pub fn enable_recovery(&self) -> bool {
self.enable_recovery
}
pub fn enable_cron_scheduling(&self) -> bool {
self.enable_cron_scheduling
}
pub fn cron_poll_interval(&self) -> Duration {
self.cron_poll_interval
}
pub fn cron_max_catchup_executions(&self) -> usize {
self.cron_max_catchup_executions
}
pub fn cron_enable_recovery(&self) -> bool {
self.cron_enable_recovery
}
pub fn cron_recovery_interval(&self) -> Duration {
self.cron_recovery_interval
}
pub fn cron_lost_threshold_minutes(&self) -> i32 {
self.cron_lost_threshold_minutes
}
pub fn cron_max_recovery_age(&self) -> Duration {
self.cron_max_recovery_age
}
pub fn cron_max_recovery_attempts(&self) -> usize {
self.cron_max_recovery_attempts
}
pub fn enable_trigger_scheduling(&self) -> bool {
self.enable_trigger_scheduling
}
pub fn trigger_base_poll_interval(&self) -> Duration {
self.trigger_base_poll_interval
}
pub fn trigger_poll_timeout(&self) -> Duration {
self.trigger_poll_timeout
}
pub fn enable_registry_reconciler(&self) -> bool {
self.enable_registry_reconciler
}
pub fn registry_reconcile_interval(&self) -> Duration {
self.registry_reconcile_interval
}
pub fn registry_enable_startup_reconciliation(&self) -> bool {
self.registry_enable_startup_reconciliation
}
pub fn registry_storage_path(&self) -> Option<&std::path::Path> {
self.registry_storage_path.as_deref()
}
pub fn registry_storage_backend(&self) -> &str {
&self.registry_storage_backend
}
pub fn enable_claiming(&self) -> bool {
self.enable_claiming
}
pub fn heartbeat_interval(&self) -> Duration {
self.heartbeat_interval
}
pub fn stale_claim_sweep_interval(&self) -> Duration {
self.stale_claim_sweep_interval
}
pub fn stale_claim_threshold(&self) -> Duration {
self.stale_claim_threshold
}
pub fn runner_id(&self) -> Option<&str> {
self.runner_id.as_deref()
}
pub fn runner_name(&self) -> Option<&str> {
self.runner_name.as_deref()
}
pub fn routing_config(&self) -> Option<&RoutingConfig> {
self.routing_config.as_ref()
}
}
/// Fluent builder for [`DefaultRunnerConfig`].
///
/// It wraps a config that starts with the default values. Each setter
/// overwrites one field, and `build` validates the result.
#[derive(Debug, Clone)]
pub struct DefaultRunnerConfigBuilder {
    /// The config being assembled; returned as-is by `build` once validated.
    config: DefaultRunnerConfig,
}
impl Default for DefaultRunnerConfigBuilder {
    /// Returns a builder seeded with the stock runner settings.
    ///
    /// These values satisfy every check in `build`. Note that
    /// `stale_claim_threshold` (60s) is strictly greater than
    /// `heartbeat_interval` (10s).
    fn default() -> Self {
        const MINUTE: u64 = 60;
        const HOUR: u64 = 60 * MINUTE;
        const DAY: u64 = 24 * HOUR;

        let config = DefaultRunnerConfig {
            // Task execution
            max_concurrent_tasks: 4,
            scheduler_poll_interval: Duration::from_millis(100),
            task_timeout: Duration::from_secs(5 * MINUTE),
            workflow_timeout: Some(Duration::from_secs(HOUR)),
            db_pool_size: 10,
            enable_recovery: true,
            // Cron scheduling
            enable_cron_scheduling: true,
            cron_poll_interval: Duration::from_secs(30),
            cron_max_catchup_executions: 100,
            cron_enable_recovery: true,
            cron_recovery_interval: Duration::from_secs(5 * MINUTE),
            cron_lost_threshold_minutes: 10,
            cron_max_recovery_age: Duration::from_secs(DAY),
            cron_max_recovery_attempts: 3,
            // Trigger scheduling
            enable_trigger_scheduling: true,
            trigger_base_poll_interval: Duration::from_secs(1),
            trigger_poll_timeout: Duration::from_secs(30),
            // Workflow registry
            enable_registry_reconciler: true,
            registry_reconcile_interval: Duration::from_secs(5),
            registry_enable_startup_reconciliation: true,
            registry_storage_path: None,
            registry_storage_backend: String::from("filesystem"),
            // Task claiming
            enable_claiming: true,
            heartbeat_interval: Duration::from_secs(10),
            stale_claim_sweep_interval: Duration::from_secs(30),
            stale_claim_threshold: Duration::from_secs(MINUTE),
            // Identity & routing
            runner_id: None,
            runner_name: None,
            routing_config: None,
        };

        Self { config }
    }
}
/// Setters and validation for [`DefaultRunnerConfigBuilder`].
///
/// Every setter consumes and returns the builder so calls can be chained.
/// Nothing is validated until [`DefaultRunnerConfigBuilder::build`].
impl DefaultRunnerConfigBuilder {
    // ---- Task execution -------------------------------------------------

    /// Sets the executor concurrency cap (must be > 0).
    pub fn max_concurrent_tasks(mut self, value: usize) -> Self {
        self.config.max_concurrent_tasks = value;
        self
    }

    /// Sets the scheduler poll interval (must be >= 10ms).
    pub fn scheduler_poll_interval(mut self, value: Duration) -> Self {
        self.config.scheduler_poll_interval = value;
        self
    }

    /// Sets the per-task timeout.
    pub fn task_timeout(mut self, value: Duration) -> Self {
        self.config.task_timeout = value;
        self
    }

    /// Sets (or clears, with `None`) the workflow timeout.
    pub fn workflow_timeout(mut self, value: Option<Duration>) -> Self {
        self.config.workflow_timeout = value;
        self
    }

    /// Sets the database pool size (must be > 0).
    pub fn db_pool_size(mut self, value: u32) -> Self {
        self.config.db_pool_size = value;
        self
    }

    /// Enables or disables recovery.
    pub fn enable_recovery(mut self, value: bool) -> Self {
        self.config.enable_recovery = value;
        self
    }

    // ---- Cron scheduling ------------------------------------------------

    /// Enables or disables cron scheduling.
    pub fn enable_cron_scheduling(mut self, value: bool) -> Self {
        self.config.enable_cron_scheduling = value;
        self
    }

    /// Sets the cron poll interval.
    pub fn cron_poll_interval(mut self, value: Duration) -> Self {
        self.config.cron_poll_interval = value;
        self
    }

    /// Sets the cron catch-up cap (must be <= 1000).
    pub fn cron_max_catchup_executions(mut self, value: usize) -> Self {
        self.config.cron_max_catchup_executions = value;
        self
    }

    /// Enables or disables cron recovery.
    pub fn cron_enable_recovery(mut self, value: bool) -> Self {
        self.config.cron_enable_recovery = value;
        self
    }

    /// Sets the cron recovery interval.
    pub fn cron_recovery_interval(mut self, value: Duration) -> Self {
        self.config.cron_recovery_interval = value;
        self
    }

    /// Sets the cron "lost" threshold in minutes.
    pub fn cron_lost_threshold_minutes(mut self, value: i32) -> Self {
        self.config.cron_lost_threshold_minutes = value;
        self
    }

    /// Sets the maximum cron recovery age.
    pub fn cron_max_recovery_age(mut self, value: Duration) -> Self {
        self.config.cron_max_recovery_age = value;
        self
    }

    /// Sets the maximum number of cron recovery attempts.
    pub fn cron_max_recovery_attempts(mut self, value: usize) -> Self {
        self.config.cron_max_recovery_attempts = value;
        self
    }

    // ---- Trigger scheduling ---------------------------------------------

    /// Enables or disables trigger scheduling.
    pub fn enable_trigger_scheduling(mut self, value: bool) -> Self {
        self.config.enable_trigger_scheduling = value;
        self
    }

    /// Sets the base trigger poll interval.
    pub fn trigger_base_poll_interval(mut self, value: Duration) -> Self {
        self.config.trigger_base_poll_interval = value;
        self
    }

    /// Sets the trigger poll timeout.
    pub fn trigger_poll_timeout(mut self, value: Duration) -> Self {
        self.config.trigger_poll_timeout = value;
        self
    }

    // ---- Workflow registry ----------------------------------------------

    /// Enables or disables the registry reconciler.
    pub fn enable_registry_reconciler(mut self, value: bool) -> Self {
        self.config.enable_registry_reconciler = value;
        self
    }

    /// Sets the registry reconcile interval.
    pub fn registry_reconcile_interval(mut self, value: Duration) -> Self {
        self.config.registry_reconcile_interval = value;
        self
    }

    /// Enables or disables reconciliation at startup.
    pub fn registry_enable_startup_reconciliation(mut self, value: bool) -> Self {
        self.config.registry_enable_startup_reconciliation = value;
        self
    }

    /// Sets (or clears) the registry storage path.
    pub fn registry_storage_path(mut self, value: Option<std::path::PathBuf>) -> Self {
        self.config.registry_storage_path = value;
        self
    }

    /// Sets the registry storage backend name.
    pub fn registry_storage_backend(mut self, value: impl Into<String>) -> Self {
        self.config.registry_storage_backend = value.into();
        self
    }

    // ---- Identity & routing ---------------------------------------------

    /// Sets (or clears) the runner id.
    pub fn runner_id(mut self, value: Option<String>) -> Self {
        self.config.runner_id = value;
        self
    }

    /// Sets (or clears) the human-readable runner name.
    pub fn runner_name(mut self, value: Option<String>) -> Self {
        self.config.runner_name = value;
        self
    }

    /// Sets (or clears) the dispatcher routing configuration.
    pub fn routing_config(mut self, value: Option<RoutingConfig>) -> Self {
        self.config.routing_config = value;
        self
    }

    // ---- Task claiming --------------------------------------------------

    /// Enables or disables task claiming.
    pub fn enable_claiming(mut self, value: bool) -> Self {
        self.config.enable_claiming = value;
        self
    }

    /// Sets the heartbeat interval (must be < `stale_claim_threshold`).
    pub fn heartbeat_interval(mut self, value: Duration) -> Self {
        self.config.heartbeat_interval = value;
        self
    }

    /// Sets how often stale claims are swept.
    ///
    /// Previously this field could only hold its default value, because the
    /// builder had no setter for it.
    pub fn stale_claim_sweep_interval(mut self, value: Duration) -> Self {
        self.config.stale_claim_sweep_interval = value;
        self
    }

    /// Sets the stale-claim threshold (must be > `heartbeat_interval`).
    ///
    /// Without this setter the threshold was fixed at its default, so any
    /// `heartbeat_interval` at or above that default made `build` fail with no
    /// way for the caller to fix it.
    pub fn stale_claim_threshold(mut self, value: Duration) -> Self {
        self.config.stale_claim_threshold = value;
        self
    }

    // ---- Validation -----------------------------------------------------

    /// Validates the accumulated settings and returns the finished config.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::Invalid`] when any of these hold:
    /// - `max_concurrent_tasks` is 0;
    /// - `db_pool_size` is 0;
    /// - `scheduler_poll_interval` is shorter than 10ms;
    /// - `stale_claim_threshold` is not strictly greater than `heartbeat_interval`;
    /// - `cron_max_catchup_executions` exceeds 1000.
    pub fn build(self) -> Result<DefaultRunnerConfig, ConfigError> {
        let config = self.config;

        if config.max_concurrent_tasks == 0 {
            return Err(ConfigError::Invalid(
                "max_concurrent_tasks must be > 0".into(),
            ));
        }
        if config.db_pool_size == 0 {
            return Err(ConfigError::Invalid("db_pool_size must be > 0".into()));
        }
        if config.scheduler_poll_interval < Duration::from_millis(10) {
            return Err(ConfigError::Invalid(
                "scheduler_poll_interval must be >= 10ms".into(),
            ));
        }
        // A claim must not look stale while its owner is still heartbeating.
        if config.stale_claim_threshold <= config.heartbeat_interval {
            return Err(ConfigError::Invalid(format!(
                "stale_claim_threshold ({:?}) must be greater than heartbeat_interval ({:?})",
                config.stale_claim_threshold, config.heartbeat_interval
            )));
        }
        if config.cron_max_catchup_executions > 1000 {
            return Err(ConfigError::Invalid(format!(
                "cron_max_catchup_executions ({}) must be <= 1000",
                config.cron_max_catchup_executions
            )));
        }

        Ok(config)
    }
}
impl Default for DefaultRunnerConfig {
fn default() -> Self {
DefaultRunnerConfigBuilder::default()
.build()
.expect("default config must be valid")
}
}
/// Async builder that assembles a fully wired [`DefaultRunner`].
///
/// Fields are `pub(super)`, i.e. visible to the parent runner module.
pub struct DefaultRunnerBuilder {
    /// Database connection URL; `build` fails without it.
    pub(super) database_url: Option<String>,
    /// Optional schema for PostgreSQL schema isolation.
    pub(super) schema: Option<String>,
    /// Runner settings; starts as `DefaultRunnerConfig::default()`.
    pub(super) config: DefaultRunnerConfig,
    /// Owned runtime; ignored when `runtime_arc` is also set.
    pub(super) runtime: Option<Runtime>,
    /// Shared runtime; takes precedence over `runtime`.
    pub(super) runtime_arc: Option<Arc<Runtime>>,
}
impl Default for DefaultRunnerBuilder {
fn default() -> Self {
Self::new()
}
}
impl DefaultRunnerBuilder {
    /// Longest identifier PostgreSQL stores without truncation (NAMEDATALEN - 1
    /// in a stock build). The server silently truncates longer names, so two
    /// distinct long schema names could otherwise collapse into one schema and
    /// break tenant isolation.
    const MAX_SCHEMA_NAME_LEN: usize = 63;

    /// Creates a builder with no database URL, no schema, the default
    /// [`DefaultRunnerConfig`], and no runtime override.
    pub fn new() -> Self {
        Self {
            database_url: None,
            schema: None,
            config: DefaultRunnerConfig::default(),
            runtime: None,
            runtime_arc: None,
        }
    }

    /// Sets the database connection URL (required by `build`).
    pub fn database_url(mut self, url: &str) -> Self {
        self.database_url = Some(url.to_string());
        self
    }

    /// Requests schema isolation under `schema`. This is only accepted with a
    /// `postgres://` or `postgresql://` URL; `build` checks it.
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Replaces the whole runner configuration.
    pub fn with_config(mut self, config: DefaultRunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Supplies an owned runtime. It is ignored if `runtime_arc` is also set.
    pub fn runtime(mut self, runtime: Runtime) -> Self {
        self.runtime = Some(runtime);
        self
    }

    /// Supplies a shared runtime. It takes precedence over `runtime`.
    pub fn runtime_arc(mut self, runtime: Arc<Runtime>) -> Self {
        self.runtime_arc = Some(runtime);
        self
    }

    /// Checks that `schema` is a usable schema name.
    ///
    /// A name is accepted when it is non-empty, consists only of ASCII letters,
    /// ASCII digits, and `_`, and is at most 63 bytes long.
    ///
    /// # Errors
    ///
    /// Returns `WorkflowExecutionError::Configuration` describing the first
    /// rule that is violated.
    pub(super) fn validate_schema_name(schema: &str) -> Result<(), WorkflowExecutionError> {
        // `all()` is vacuously true for "", so emptiness needs its own check.
        if schema.is_empty() {
            return Err(WorkflowExecutionError::Configuration {
                message: "Schema name must not be empty".to_string(),
            });
        }
        // ASCII only: `char::is_alphanumeric` also accepts non-ASCII Unicode
        // digits/letters, which PostgreSQL does not accept in unquoted identifiers.
        if !schema.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_') {
            return Err(WorkflowExecutionError::Configuration {
                message: "Schema name must contain only alphanumeric characters and underscores"
                    .to_string(),
            });
        }
        // The name is pure ASCII at this point, so byte length equals character count.
        if schema.len() > Self::MAX_SCHEMA_NAME_LEN {
            return Err(WorkflowExecutionError::Configuration {
                message: format!(
                    "Schema name must be at most {} characters (got {})",
                    Self::MAX_SCHEMA_NAME_LEN,
                    schema.len()
                ),
            });
        }
        Ok(())
    }

    /// Builds and starts a [`DefaultRunner`].
    ///
    /// Steps:
    /// 1. Validate the URL and schema.
    /// 2. Open the database with the configured pool size.
    /// 3. Prepare the database: with the `postgres` feature and a schema,
    ///    `setup_schema` runs; otherwise `run_migrations` runs.
    /// 4. Create the scheduler, the thread executor, and a dispatcher that
    ///    routes to the executor under the name `"default"`.
    /// 5. Start the runner's background services.
    ///
    /// # Errors
    ///
    /// - `Configuration` if no database URL was set, the schema name is
    ///   invalid, a schema is requested with a non-PostgreSQL URL, or schema
    ///   setup fails.
    /// - `DatabaseConnection` if migrations fail.
    /// - `Executor` if the scheduler cannot be created.
    /// - Any error returned by `start_background_services`.
    pub async fn build(self) -> Result<DefaultRunner, WorkflowExecutionError> {
        let database_url =
            self.database_url
                .ok_or_else(|| WorkflowExecutionError::Configuration {
                    message: "Database URL is required".to_string(),
                })?;
        if let Some(ref schema) = self.schema {
            Self::validate_schema_name(schema)?;
            if !database_url.starts_with("postgresql://")
                && !database_url.starts_with("postgres://")
            {
                return Err(WorkflowExecutionError::Configuration {
                    message: "Schema isolation is only supported with PostgreSQL. \
                    For SQLite multi-tenancy, use separate database files instead."
                        .to_string(),
                });
            }
        }
        let database = Database::new_with_schema(
            &database_url,
            "cloacina",
            self.config.db_pool_size(),
            self.schema.as_deref(),
        );
        #[cfg(feature = "postgres")]
        {
            if let Some(ref schema) = self.schema {
                database.setup_schema(schema).await.map_err(|e| {
                    WorkflowExecutionError::Configuration {
                        message: format!("Failed to set up schema '{}': {}", schema, e),
                    }
                })?;
            } else {
                database
                    .run_migrations()
                    .await
                    .map_err(|e| WorkflowExecutionError::DatabaseConnection { message: e })?;
            }
        }
        #[cfg(not(feature = "postgres"))]
        {
            database
                .run_migrations()
                .await
                .map_err(|e| WorkflowExecutionError::DatabaseConnection { message: e })?;
        }
        // A shared runtime wins; otherwise wrap the owned one (or the default).
        let runtime = self
            .runtime_arc
            .unwrap_or_else(|| Arc::new(self.runtime.unwrap_or_default()));
        let scheduler = TaskScheduler::with_poll_interval(
            database.clone(),
            self.config.scheduler_poll_interval(),
        )
        .await
        .map_err(|e| WorkflowExecutionError::Executor(e.into()))?
        .with_runtime(runtime.clone());
        let executor_config = ExecutorConfig {
            max_concurrent_tasks: self.config.max_concurrent_tasks(),
            task_timeout: self.config.task_timeout(),
            enable_claiming: self.config.enable_claiming(),
            heartbeat_interval: self.config.heartbeat_interval(),
        };
        let executor = ThreadTaskExecutor::with_runtime_and_registry(
            database.clone(),
            Arc::new(crate::TaskRegistry::new()),
            runtime.clone(),
            executor_config,
        );
        let dal = crate::dal::DAL::new(database.clone());
        let routing_config = self
            .config
            .routing_config()
            .cloned()
            .unwrap_or_else(RoutingConfig::default);
        let dispatcher = DefaultDispatcher::new(dal, routing_config);
        dispatcher.register_executor("default", Arc::new(executor) as Arc<dyn TaskExecutor>);
        let scheduler = scheduler.with_dispatcher(Arc::new(dispatcher));
        let default_runner = DefaultRunner {
            runtime,
            database,
            // `self` is consumed by `build`, so the config can be moved
            // instead of cloned.
            config: self.config,
            scheduler: Arc::new(scheduler),
            service_manager: Arc::new(RwLock::new(ServiceManager::new())),
        };
        default_runner.start_background_services().await?;
        Ok(default_runner)
    }

    /// Sets the dispatcher routing configuration on the wrapped runner config.
    pub fn routing_config(mut self, config: RoutingConfig) -> Self {
        self.config.routing_config = Some(config);
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_runner_config() {
        let config = DefaultRunnerConfig::default();
        assert_eq!(config.max_concurrent_tasks(), 4);
        assert_eq!(config.scheduler_poll_interval(), Duration::from_millis(100));
        assert_eq!(config.task_timeout(), Duration::from_secs(300));
        assert_eq!(config.workflow_timeout(), Some(Duration::from_secs(3600)));
        assert!(config.enable_recovery());
        assert!(config.enable_cron_scheduling());
        assert!(config.enable_registry_reconciler());
        assert_eq!(config.registry_storage_backend(), "filesystem");
        assert!(config.registry_storage_path().is_none());
        assert!(config.runner_id().is_none());
        assert!(config.runner_name().is_none());
    }

    #[test]
    fn test_registry_storage_backend_configuration() {
        let config = DefaultRunnerConfig::default();
        assert_eq!(config.registry_storage_backend(), "filesystem");
        let config = DefaultRunnerConfig::builder()
            .registry_storage_backend("sqlite")
            .build()
            .unwrap();
        assert_eq!(config.registry_storage_backend(), "sqlite");
        let config = DefaultRunnerConfig::builder()
            .registry_storage_backend("postgres")
            .build()
            .unwrap();
        assert_eq!(config.registry_storage_backend(), "postgres");
        let custom_path = std::path::PathBuf::from("/custom/registry/path");
        let config = DefaultRunnerConfig::builder()
            .registry_storage_path(Some(custom_path.clone()))
            .build()
            .unwrap();
        assert_eq!(config.registry_storage_path(), Some(custom_path.as_path()));
    }

    #[test]
    fn test_runner_identification() {
        let config = DefaultRunnerConfig::builder()
            .runner_id(Some("test-runner-123".to_string()))
            .runner_name(Some("Test Runner".to_string()))
            .build()
            .unwrap();
        assert_eq!(config.runner_id(), Some("test-runner-123"));
        assert_eq!(config.runner_name(), Some("Test Runner"));
    }

    #[test]
    fn test_registry_configuration_options() {
        let config = DefaultRunnerConfig::builder()
            .enable_registry_reconciler(false)
            .build()
            .unwrap();
        assert!(!config.enable_registry_reconciler());
        let config = DefaultRunnerConfig::builder()
            .registry_reconcile_interval(Duration::from_secs(30))
            .build()
            .unwrap();
        assert_eq!(
            config.registry_reconcile_interval(),
            Duration::from_secs(30)
        );
        let config = DefaultRunnerConfig::builder()
            .registry_enable_startup_reconciliation(false)
            .build()
            .unwrap();
        assert!(!config.registry_enable_startup_reconciliation());
    }

    #[test]
    fn test_cron_configuration() {
        let config = DefaultRunnerConfig::builder()
            .cron_poll_interval(Duration::from_secs(60))
            .cron_recovery_interval(Duration::from_secs(300))
            .cron_lost_threshold_minutes(15)
            .cron_max_recovery_age(Duration::from_secs(86400))
            .cron_max_recovery_attempts(5)
            .build()
            .unwrap();
        assert_eq!(config.cron_poll_interval(), Duration::from_secs(60));
        assert_eq!(config.cron_recovery_interval(), Duration::from_secs(300));
        assert_eq!(config.cron_lost_threshold_minutes(), 15);
        assert_eq!(config.cron_max_recovery_age(), Duration::from_secs(86400));
        assert_eq!(config.cron_max_recovery_attempts(), 5);
    }

    #[test]
    fn test_db_pool_size_default() {
        let config = DefaultRunnerConfig::default();
        assert_eq!(config.db_pool_size(), 10);
    }

    #[test]
    fn test_config_clone() {
        let config = DefaultRunnerConfig::default();
        let cloned = config.clone();
        assert_eq!(
            config.registry_storage_backend(),
            cloned.registry_storage_backend()
        );
        assert_eq!(config.max_concurrent_tasks(), cloned.max_concurrent_tasks());
        assert_eq!(
            config.enable_registry_reconciler(),
            cloned.enable_registry_reconciler()
        );
    }

    #[test]
    fn test_config_debug() {
        let config = DefaultRunnerConfig::default();
        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("registry_storage_backend"));
        assert!(debug_str.contains("filesystem"));
        assert!(debug_str.contains("max_concurrent_tasks"));
    }

    #[test]
    fn test_builder_all_fields() {
        let config = DefaultRunnerConfig::builder()
            .max_concurrent_tasks(8)
            .scheduler_poll_interval(Duration::from_millis(200))
            .task_timeout(Duration::from_secs(600))
            .workflow_timeout(Some(Duration::from_secs(7200)))
            .db_pool_size(20)
            .enable_recovery(false)
            .enable_cron_scheduling(false)
            .cron_poll_interval(Duration::from_secs(60))
            .cron_max_catchup_executions(10)
            .cron_enable_recovery(false)
            .enable_trigger_scheduling(false)
            .trigger_base_poll_interval(Duration::from_secs(5))
            .trigger_poll_timeout(Duration::from_secs(60))
            .build()
            .unwrap();
        assert_eq!(config.max_concurrent_tasks(), 8);
        assert_eq!(config.scheduler_poll_interval(), Duration::from_millis(200));
        assert_eq!(config.task_timeout(), Duration::from_secs(600));
        assert_eq!(config.workflow_timeout(), Some(Duration::from_secs(7200)));
        assert_eq!(config.db_pool_size(), 20);
        assert!(!config.enable_recovery());
        assert!(!config.enable_cron_scheduling());
        assert_eq!(config.cron_poll_interval(), Duration::from_secs(60));
        assert_eq!(config.cron_max_catchup_executions(), 10);
        assert!(!config.cron_enable_recovery());
        assert!(!config.enable_trigger_scheduling());
        assert_eq!(config.trigger_base_poll_interval(), Duration::from_secs(5));
        assert_eq!(config.trigger_poll_timeout(), Duration::from_secs(60));
    }

    // ---- Validation in `DefaultRunnerConfigBuilder::build` ---------------

    #[test]
    fn test_build_rejects_zero_max_concurrent_tasks() {
        let err = DefaultRunnerConfig::builder()
            .max_concurrent_tasks(0)
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("max_concurrent_tasks"));
    }

    #[test]
    fn test_build_rejects_zero_db_pool_size() {
        let err = DefaultRunnerConfig::builder()
            .db_pool_size(0)
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("db_pool_size"));
    }

    #[test]
    fn test_build_scheduler_poll_interval_boundary() {
        let err = DefaultRunnerConfig::builder()
            .scheduler_poll_interval(Duration::from_millis(9))
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("scheduler_poll_interval"));
        assert!(DefaultRunnerConfig::builder()
            .scheduler_poll_interval(Duration::from_millis(10))
            .build()
            .is_ok());
    }

    #[test]
    fn test_build_heartbeat_must_be_below_stale_threshold() {
        // The default stale_claim_threshold is 60s; equal is rejected.
        let err = DefaultRunnerConfig::builder()
            .heartbeat_interval(Duration::from_secs(60))
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("stale_claim_threshold"));
        assert!(DefaultRunnerConfig::builder()
            .heartbeat_interval(Duration::from_secs(59))
            .build()
            .is_ok());
    }

    #[test]
    fn test_build_cron_catchup_boundary() {
        assert!(DefaultRunnerConfig::builder()
            .cron_max_catchup_executions(1000)
            .build()
            .is_ok());
        let err = DefaultRunnerConfig::builder()
            .cron_max_catchup_executions(1001)
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("cron_max_catchup_executions"));
    }

    // ---- DefaultRunnerBuilder (synchronous parts only) -------------------

    #[test]
    fn test_runner_builder_setters() {
        let builder = DefaultRunnerBuilder::new()
            .database_url("postgres://localhost/db")
            .schema("tenant_a");
        assert_eq!(
            builder.database_url.as_deref(),
            Some("postgres://localhost/db")
        );
        assert_eq!(builder.schema.as_deref(), Some("tenant_a"));
        assert!(builder.runtime.is_none());
        assert!(builder.runtime_arc.is_none());
    }

    #[test]
    fn test_validate_schema_name() {
        assert!(DefaultRunnerBuilder::validate_schema_name("tenant_1").is_ok());
        assert!(DefaultRunnerBuilder::validate_schema_name("Tenant_ABC_123").is_ok());
        assert!(DefaultRunnerBuilder::validate_schema_name("tenant-1").is_err());
        assert!(DefaultRunnerBuilder::validate_schema_name("a; DROP SCHEMA x").is_err());
        assert!(DefaultRunnerBuilder::validate_schema_name("a.b").is_err());
    }
}