use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tracing::Instrument;
use crate::dal::FilesystemRegistryStorage;
use crate::dal::UnifiedRegistryStorage;
use crate::dal::DAL;
use crate::executor::pipeline_executor::PipelineError;
use crate::registry::traits::WorkflowRegistry;
use crate::registry::{ReconcilerConfig, RegistryReconciler, WorkflowRegistryImpl};
use crate::{Scheduler, SchedulerConfig};
use super::DefaultRunner;
impl DefaultRunner {
    /// Builds the tracing span used to instrument one background task of this runner.
    ///
    /// The span is always named `runner_task` and carries the `operation` and
    /// `component = "cloacina_runner"` fields. When the configuration provides BOTH a
    /// runner id and a runner name, they are attached as `runner_id` / `runner_name`.
    ///
    /// # Arguments
    /// * `operation` - Name of the background operation, e.g. `"task_scheduler"`.
    pub(super) fn create_runner_span(&self, operation: &str) -> tracing::Span {
        match (self.config.runner_id(), self.config.runner_name()) {
            (Some(runner_id), Some(runner_name)) => tracing::info_span!(
                "runner_task",
                runner_id = %runner_id,
                runner_name = %runner_name,
                operation = operation,
                component = "cloacina_runner"
            ),
            // NOTE(review): if only one of id/name is configured, neither is recorded.
            _ => tracing::info_span!(
                "runner_task",
                operation = operation,
                component = "cloacina_runner"
            ),
        }
    }

    /// Starts the task scheduler loop plus every optional background service enabled
    /// in the runner configuration.
    ///
    /// A single `broadcast` channel is created here. Its sender is stored in
    /// `RuntimeHandles::shutdown_sender`, and each spawned service subscribes to it, so
    /// one send on that sender reaches all of them. The `runtime_handles` write lock is
    /// held for the whole startup sequence.
    ///
    /// Optional services, in start order:
    /// * unified scheduler — when cron OR trigger scheduling is enabled;
    /// * cron recovery — when cron scheduling AND cron recovery are enabled;
    /// * registry reconciler — when the registry reconciler is enabled;
    /// * stale claim sweeper — when claiming is enabled.
    ///
    /// # Errors
    /// Propagates the first `PipelineError` returned while starting an optional
    /// service. Tasks spawned before the failure keep running; the scheduler handle
    /// and shutdown sender are already registered at that point.
    pub(super) async fn start_background_services(&self) -> Result<(), PipelineError> {
        let mut handles = self.runtime_handles.write().await;
        tracing::info!("Starting scheduler and executor background services");

        // Every background service subscribes to this channel; the first receiver is
        // used by the task scheduler below.
        let (shutdown_tx, mut scheduler_shutdown_rx) = broadcast::channel(1);

        let scheduler = self.scheduler.clone();
        let scheduler_span = self.create_runner_span("task_scheduler");
        let scheduler_handle = tokio::spawn(
            async move {
                // `select!` pins its branch futures itself, so no manual `Box::pin`.
                tokio::select! {
                    result = scheduler.run_scheduling_loop() => {
                        if let Err(e) = result {
                            tracing::error!("Scheduler loop failed: {}", e);
                        } else {
                            tracing::info!("Scheduler loop completed");
                        }
                    }
                    _ = scheduler_shutdown_rx.recv() => {
                        tracing::info!("Scheduler shutdown requested");
                    }
                }
            }
            .instrument(scheduler_span),
        );

        handles.scheduler_handle = Some(scheduler_handle);
        // This method spawns no separate executor task, so there is no handle to keep.
        handles.executor_handle = None;
        handles.shutdown_sender = Some(shutdown_tx.clone());

        if self.config.enable_cron_scheduling() || self.config.enable_trigger_scheduling() {
            self.start_unified_scheduler(&mut handles, &shutdown_tx)
                .await?;
        }
        if self.config.enable_cron_scheduling() && self.config.cron_enable_recovery() {
            self.start_cron_recovery(&mut handles, &shutdown_tx).await?;
        }
        if self.config.enable_registry_reconciler() {
            self.start_registry_reconciler(&mut handles, &shutdown_tx)
                .await?;
        }
        if self.config.enable_claiming() {
            self.start_stale_claim_sweeper(&mut handles, &shutdown_tx)
                .await?;
        }
        Ok(())
    }

    /// Spawns the unified scheduler polling loop, configured with the runner's cron
    /// and trigger polling settings.
    ///
    /// The scheduler is given its own `watch` shutdown receiver; the spawned task
    /// forwards the runner-wide broadcast shutdown onto that `watch` channel. The
    /// scheduler instance is stored in `self.unified_scheduler` and the task handle in
    /// `handles.unified_scheduler_handle`.
    ///
    /// NOTE(review): when the broadcast fires, `select!` drops the polling-loop future
    /// before `true` is sent on the `watch` channel, so the loop is cancelled at its
    /// current await point rather than observing the `watch` signal.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    async fn start_unified_scheduler(
        &self,
        handles: &mut super::RuntimeHandles,
        shutdown_tx: &broadcast::Sender<()>,
    ) -> Result<(), PipelineError> {
        tracing::info!("Starting unified scheduler");
        let (unified_shutdown_tx, unified_shutdown_rx) = watch::channel(false);

        let scheduler_config = SchedulerConfig {
            cron_poll_interval: self.config.cron_poll_interval(),
            max_catchup_executions: self.config.cron_max_catchup_executions(),
            // Fixed 300 s value; not read from the runner configuration.
            max_acceptable_delay: Duration::from_secs(300),
            trigger_base_poll_interval: self.config.trigger_base_poll_interval(),
            trigger_poll_timeout: self.config.trigger_poll_timeout(),
        };
        let dal = DAL::new(self.database.clone());
        let unified_scheduler = Scheduler::new(
            Arc::new(dal),
            Arc::new(self.clone()),
            scheduler_config,
            unified_shutdown_rx,
        );

        let mut scheduler_clone = unified_scheduler.clone();
        let mut broadcast_shutdown_rx = shutdown_tx.subscribe();
        let span = self.create_runner_span("unified_scheduler");
        let handle = tokio::spawn(
            async move {
                tokio::select! {
                    result = scheduler_clone.run_polling_loop() => {
                        if let Err(e) = result {
                            tracing::error!("Unified scheduler failed: {}", e);
                        } else {
                            tracing::info!("Unified scheduler completed");
                        }
                    }
                    _ = broadcast_shutdown_rx.recv() => {
                        tracing::info!("Unified scheduler shutdown requested via broadcast");
                        // `send` only fails when every receiver is gone; nothing to do then.
                        let _ = unified_shutdown_tx.send(true);
                    }
                }
            }
            .instrument(span),
        );

        *self.unified_scheduler.write().await = Some(Arc::new(unified_scheduler));
        handles.unified_scheduler_handle = Some(handle);
        Ok(())
    }

    /// Spawns the cron recovery loop, configured from the runner's cron recovery
    /// settings, with `recover_disabled_schedules` set to `false`.
    ///
    /// The service is given its own `watch` shutdown receiver; the spawned task
    /// forwards the runner-wide broadcast shutdown onto that `watch` channel. The
    /// service instance is stored in `self.cron_recovery` and the task handle in
    /// `handles.cron_recovery_handle`.
    ///
    /// NOTE(review): as with the unified scheduler, the recovery-loop future is dropped
    /// by `select!` before the `watch` signal is sent.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    async fn start_cron_recovery(
        &self,
        handles: &mut super::RuntimeHandles,
        shutdown_tx: &broadcast::Sender<()>,
    ) -> Result<(), PipelineError> {
        tracing::info!("Starting cron recovery service");
        let (recovery_shutdown_tx, recovery_shutdown_rx) = watch::channel(false);

        let recovery_config = crate::CronRecoveryConfig {
            check_interval: self.config.cron_recovery_interval(),
            lost_threshold_minutes: self.config.cron_lost_threshold_minutes(),
            max_recovery_age: self.config.cron_max_recovery_age(),
            max_recovery_attempts: self.config.cron_max_recovery_attempts(),
            recover_disabled_schedules: false,
        };
        let dal = DAL::new(self.database.clone());
        let recovery_service = crate::CronRecoveryService::new(
            Arc::new(dal),
            Arc::new(self.clone()),
            recovery_config,
            recovery_shutdown_rx,
        );

        let mut recovery_service_clone = recovery_service.clone();
        let mut broadcast_shutdown_rx = shutdown_tx.subscribe();
        let recovery_span = self.create_runner_span("cron_recovery");
        let recovery_handle = tokio::spawn(
            async move {
                tokio::select! {
                    result = recovery_service_clone.run_recovery_loop() => {
                        if let Err(e) = result {
                            tracing::error!("Cron recovery service failed: {}", e);
                        } else {
                            tracing::info!("Cron recovery service completed");
                        }
                    }
                    _ = broadcast_shutdown_rx.recv() => {
                        tracing::info!("Cron recovery service shutdown requested via broadcast");
                        // `send` only fails when every receiver is gone; nothing to do then.
                        let _ = recovery_shutdown_tx.send(true);
                    }
                }
            }
            .instrument(recovery_span),
        );

        *self.cron_recovery.write().await = Some(Arc::new(recovery_service));
        handles.cron_recovery_handle = Some(recovery_handle);
        Ok(())
    }

    /// Builds the workflow registry for the configured storage backend and spawns the
    /// registry reconciliation loop over it.
    ///
    /// Supported `registry_storage_backend` values are `"filesystem"` (path from
    /// `registry_storage_path`, falling back to `<temp_dir>/cloacina_registry`) and
    /// `"sqlite"` / `"postgres"` / `"database"` (unified database-backed storage).
    ///
    /// On success the registry is stored in `self.workflow_registry` and the task
    /// handle in `handles.registry_reconciler_handle`.
    ///
    /// # Errors
    /// Returns `PipelineError::Configuration` if the `RegistryReconciler` cannot be
    /// created. Failure to build the registry itself (including an unknown backend) is
    /// only logged, and the method returns `Ok(())`.
    async fn start_registry_reconciler(
        &self,
        handles: &mut super::RuntimeHandles,
        shutdown_tx: &broadcast::Sender<()>,
    ) -> Result<(), PipelineError> {
        tracing::info!("Starting registry reconciler");
        let (reconciler_shutdown_tx, reconciler_shutdown_rx) = watch::channel(false);
        let reconciler_config = ReconcilerConfig {
            reconcile_interval: self.config.registry_reconcile_interval(),
            enable_startup_reconciliation: self.config.registry_enable_startup_reconciliation(),
            package_operation_timeout: Duration::from_secs(30),
            continue_on_package_error: true,
            default_tenant_id: "public".to_string(),
        };

        let workflow_registry_result: Result<Arc<dyn WorkflowRegistry>, String> =
            match self.config.registry_storage_backend() {
                "filesystem" => {
                    // Without an explicit path, packages go under the OS temp directory.
                    let storage_path = self
                        .config
                        .registry_storage_path()
                        .map(|p| p.to_path_buf())
                        .unwrap_or_else(|| std::env::temp_dir().join("cloacina_registry"));
                    match FilesystemRegistryStorage::new(storage_path) {
                        Ok(storage) => WorkflowRegistryImpl::new(storage, self.database.clone())
                            .map(|registry| Arc::new(registry) as Arc<dyn WorkflowRegistry>)
                            .map_err(|e| {
                                format!("Failed to create filesystem workflow registry: {}", e)
                            }),
                        Err(e) => Err(format!("Failed to create filesystem storage: {}", e)),
                    }
                }
                "sqlite" | "postgres" | "database" => {
                    let dal = DAL::new(self.database.clone());
                    let storage = UnifiedRegistryStorage::new(self.database.clone());
                    let registry_dal = dal.workflow_registry(storage);
                    Ok(Arc::new(registry_dal) as Arc<dyn WorkflowRegistry>)
                }
                backend => Err(format!(
                    "Unknown registry storage backend: {}. Valid options: filesystem, sqlite, postgres, database",
                    backend
                )),
            };

        let workflow_registry_arc = match workflow_registry_result {
            Ok(registry) => registry,
            Err(e) => {
                // Best-effort: a registry setup failure is logged but does not abort the
                // remaining background-service startup in `start_background_services`.
                tracing::error!("Failed to create workflow registry: {}", e);
                return Ok(());
            }
        };

        let registry_reconciler = RegistryReconciler::new(
            workflow_registry_arc.clone(),
            reconciler_config,
            reconciler_shutdown_rx,
        )
        .map_err(|e| PipelineError::Configuration {
            message: format!("Failed to create registry reconciler: {}", e),
        })?;

        let mut broadcast_shutdown_rx = shutdown_tx.subscribe();
        let reconciler_span = self.create_runner_span("registry_reconciler");
        let reconciler_handle = tokio::spawn(
            async move {
                tokio::select! {
                    result = registry_reconciler.start_reconciliation_loop() => {
                        if let Err(e) = result {
                            tracing::error!("Registry reconciler failed: {}", e);
                        } else {
                            tracing::info!("Registry reconciler completed");
                        }
                    }
                    _ = broadcast_shutdown_rx.recv() => {
                        tracing::info!("Registry reconciler shutdown requested via broadcast");
                        // `send` only fails when every receiver is gone; nothing to do then.
                        let _ = reconciler_shutdown_tx.send(true);
                    }
                }
            }
            .instrument(reconciler_span),
        );

        *self.workflow_registry.write().await = Some(workflow_registry_arc);
        handles.registry_reconciler_handle = Some(reconciler_handle);
        Ok(())
    }

    /// Spawns the stale claim sweeper, configured with the runner's sweep interval and
    /// stale-claim threshold.
    ///
    /// The spawned task forwards the runner-wide broadcast shutdown onto the sweeper's
    /// own `watch` channel.
    ///
    /// NOTE(review): the task's `JoinHandle` is dropped, which detaches the task, and
    /// `_handles` is unused, so shutdown code cannot await this task's completion; it
    /// stops only via the broadcast signal. Whatever `sweeper.run()` returns is
    /// discarded. Consider adding a handle field to `RuntimeHandles`.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`.
    async fn start_stale_claim_sweeper(
        &self,
        _handles: &mut super::RuntimeHandles,
        shutdown_tx: &broadcast::Sender<()>,
    ) -> Result<(), PipelineError> {
        use crate::task_scheduler::stale_claim_sweeper::{
            StaleClaimSweeper, StaleClaimSweeperConfig,
        };

        tracing::info!("Starting stale claim sweeper");
        let (sweeper_shutdown_tx, sweeper_shutdown_rx) = watch::channel(false);
        let sweeper_config = StaleClaimSweeperConfig {
            sweep_interval: self.config.stale_claim_sweep_interval(),
            stale_threshold: self.config.stale_claim_threshold(),
        };
        let dal = DAL::new(self.database.clone());
        let mut sweeper =
            StaleClaimSweeper::new(Arc::new(dal), sweeper_config, sweeper_shutdown_rx);

        let mut broadcast_shutdown_rx = shutdown_tx.subscribe();
        let sweeper_span = self.create_runner_span("stale_claim_sweeper");
        let sweeper_handle = tokio::spawn(
            async move {
                tokio::select! {
                    _ = sweeper.run() => {
                        tracing::info!("Stale claim sweeper completed");
                    }
                    _ = broadcast_shutdown_rx.recv() => {
                        tracing::info!("Stale claim sweeper shutdown requested");
                        // `send` only fails when every receiver is gone; nothing to do then.
                        let _ = sweeper_shutdown_tx.send(true);
                    }
                }
            }
            .instrument(sweeper_span),
        );
        // Detaches the task (see NOTE above); it keeps running until shutdown.
        drop(sweeper_handle);
        Ok(())
    }
}