use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tracing::Instrument;
use crate::dal::FilesystemRegistryStorage;
use crate::dal::UnifiedRegistryStorage;
use crate::dal::DAL;
use crate::executor::pipeline_executor::PipelineError;
use crate::registry::traits::WorkflowRegistry;
use crate::registry::{ReconcilerConfig, RegistryReconciler, WorkflowRegistryImpl};
use crate::{CronScheduler, CronSchedulerConfig};
use crate::{TriggerScheduler, TriggerSchedulerConfig};
use super::DefaultRunner;
impl DefaultRunner {
/// Builds the `runner_task` tracing span used to instrument a background task.
///
/// Every span carries the given `operation` and a fixed `component` of
/// `"cloacina_runner"`. If the configuration yields both a runner id and a
/// runner name, they are attached as `runner_id` / `runner_name` (recorded
/// with `Display` formatting); if either one is missing, neither is recorded.
pub(super) fn create_runner_span(&self, operation: &str) -> tracing::Span {
    match (self.config.runner_id(), self.config.runner_name()) {
        // Full identity available: tag the span with it.
        (Some(runner_id), Some(runner_name)) => tracing::info_span!(
            "runner_task",
            runner_id = %runner_id,
            runner_name = %runner_name,
            operation = operation,
            component = "cloacina_runner"
        ),
        // Partial or no identity: fall back to the anonymous span.
        _ => tracing::info_span!(
            "runner_task",
            operation = operation,
            component = "cloacina_runner"
        ),
    }
}
/// Spawns the runner's background tasks and records their handles.
///
/// The write lock on `runtime_handles` is taken first and held until this
/// function returns. A broadcast channel is created to carry the shutdown
/// signal; its sender is stored in `handles.shutdown_sender` and each task
/// started here subscribes to it.
///
/// The task-scheduler loop is always spawned. The cron scheduler, registry
/// reconciler and trigger scheduler are started only when the corresponding
/// configuration flag is enabled.
///
/// # Errors
///
/// Propagates any `PipelineError` returned while starting the optional
/// services. Tasks that were already spawned keep running in that case; their
/// handles and the shutdown sender have already been stored in `handles`.
pub(super) async fn start_background_services(&self) -> Result<(), PipelineError> {
    let mut handles = self.runtime_handles.write().await;

    tracing::info!("Starting scheduler and executor background services");

    // Single shutdown channel shared by every task started from here. The
    // initial receiver goes to the scheduler task; the other services call
    // `subscribe()` on the sender themselves.
    let (shutdown_tx, mut scheduler_shutdown_rx) = broadcast::channel::<()>(1);

    let scheduler = self.scheduler.clone();
    let scheduler_span = self.create_runner_span("task_scheduler");
    let scheduler_handle = tokio::spawn(
        async move {
            // Boxed and pinned so `select!` can poll it through `&mut`.
            let mut scheduler_future = Box::pin(scheduler.run_scheduling_loop());
            tokio::select! {
                result = &mut scheduler_future => {
                    if let Err(e) = result {
                        tracing::error!("Scheduler loop failed: {}", e);
                    } else {
                        tracing::info!("Scheduler loop completed");
                    }
                }
                // Any outcome of `recv()` (a message or a closed channel) ends the task.
                _ = scheduler_shutdown_rx.recv() => {
                    tracing::info!("Scheduler shutdown requested");
                }
            }
        }
        .instrument(scheduler_span),
    );

    handles.scheduler_handle = Some(scheduler_handle);
    // No dedicated executor task is spawned by this function.
    handles.executor_handle = None;
    handles.shutdown_sender = Some(shutdown_tx.clone());

    if self.config.enable_cron_scheduling() {
        self.start_cron_services(&mut handles, &shutdown_tx).await?;
    }

    if self.config.enable_registry_reconciler() {
        self.start_registry_reconciler(&mut handles, &shutdown_tx)
            .await?;
    }

    if self.config.enable_trigger_scheduling() {
        self.start_trigger_services(&mut handles, &shutdown_tx)
            .await?;
    }

    Ok(())
}
/// Starts the cron scheduler polling task and, if enabled, the cron recovery service.
///
/// A `CronScheduler` is built from the runner's database, a clone of this
/// runner and the cron settings in the configuration. A clone of it is moved
/// into a spawned task that runs `run_polling_loop()` until the loop returns
/// or the runner-wide broadcast shutdown fires. The scheduler itself is stored
/// in `self.cron_scheduler` and the task handle in
/// `handles.cron_scheduler_handle`.
///
/// # Errors
///
/// Returns whatever `start_cron_recovery` returns when cron recovery is enabled.
async fn start_cron_services(
    &self,
    handles: &mut super::RuntimeHandles,
    shutdown_tx: &broadcast::Sender<()>,
) -> Result<(), PipelineError> {
    tracing::info!("Starting cron scheduler");

    // Watch channel handed to the scheduler as its own shutdown flag.
    let (stop_tx, stop_rx) = watch::channel(false);

    let scheduler_config = CronSchedulerConfig {
        poll_interval: self.config.cron_poll_interval(),
        max_catchup_executions: self.config.cron_max_catchup_executions(),
        max_acceptable_delay: Duration::from_secs(300),
    };

    let scheduler = CronScheduler::new(
        Arc::new(DAL::new(self.database.clone())),
        Arc::new(self.clone()),
        scheduler_config,
        stop_rx,
    );

    let mut polling_scheduler = scheduler.clone();
    let mut runner_shutdown_rx = shutdown_tx.subscribe();
    let span = self.create_runner_span("cron_scheduler");

    // NOTE(review): when the broadcast branch wins, `select!` cancels the
    // polling future before the handler runs, so the watch signal below is
    // not seen by that loop. Confirm the abrupt cancellation is intended.
    let task = async move {
        tokio::select! {
            outcome = polling_scheduler.run_polling_loop() => {
                match outcome {
                    Ok(_) => tracing::info!("Cron scheduler completed"),
                    Err(e) => tracing::error!("Cron scheduler failed: {}", e),
                }
            }
            _ = runner_shutdown_rx.recv() => {
                tracing::info!("Cron scheduler shutdown requested via broadcast");
                let _ = stop_tx.send(true);
            }
        }
    };
    let task_handle = tokio::spawn(task.instrument(span));

    *self.cron_scheduler.write().await = Some(Arc::new(scheduler));
    handles.cron_scheduler_handle = Some(task_handle);

    if !self.config.cron_enable_recovery() {
        return Ok(());
    }
    self.start_cron_recovery(handles, shutdown_tx).await
}
/// Starts the cron recovery service task.
///
/// A `CronRecoveryService` is built from the runner's database, a clone of
/// this runner and the recovery settings in the configuration
/// (`recover_disabled_schedules` is always `false`). A clone of it is moved
/// into a spawned task that runs `run_recovery_loop()` until the loop returns
/// or the runner-wide broadcast shutdown fires. The service is stored in
/// `self.cron_recovery` and the task handle in `handles.cron_recovery_handle`.
///
/// This function always returns `Ok(())`.
async fn start_cron_recovery(
    &self,
    handles: &mut super::RuntimeHandles,
    shutdown_tx: &broadcast::Sender<()>,
) -> Result<(), PipelineError> {
    tracing::info!("Starting cron recovery service");

    // Watch channel handed to the service as its own shutdown flag.
    let (stop_tx, stop_rx) = watch::channel(false);

    let service_config = crate::CronRecoveryConfig {
        check_interval: self.config.cron_recovery_interval(),
        lost_threshold_minutes: self.config.cron_lost_threshold_minutes(),
        max_recovery_age: self.config.cron_max_recovery_age(),
        max_recovery_attempts: self.config.cron_max_recovery_attempts(),
        recover_disabled_schedules: false,
    };

    let service = crate::CronRecoveryService::new(
        Arc::new(DAL::new(self.database.clone())),
        Arc::new(self.clone()),
        service_config,
        stop_rx,
    );

    let mut looping_service = service.clone();
    let mut runner_shutdown_rx = shutdown_tx.subscribe();
    let span = self.create_runner_span("cron_recovery");

    let task = async move {
        tokio::select! {
            outcome = looping_service.run_recovery_loop() => {
                match outcome {
                    Ok(_) => tracing::info!("Cron recovery service completed"),
                    Err(e) => tracing::error!("Cron recovery service failed: {}", e),
                }
            }
            _ = runner_shutdown_rx.recv() => {
                tracing::info!("Cron recovery service shutdown requested via broadcast");
                let _ = stop_tx.send(true);
            }
        }
    };
    let task_handle = tokio::spawn(task.instrument(span));

    *self.cron_recovery.write().await = Some(Arc::new(service));
    handles.cron_recovery_handle = Some(task_handle);

    Ok(())
}
async fn start_registry_reconciler(
&self,
handles: &mut super::RuntimeHandles,
shutdown_tx: &broadcast::Sender<()>,
) -> Result<(), PipelineError> {
tracing::info!("Starting registry reconciler");
let (reconciler_shutdown_tx, reconciler_shutdown_rx) = watch::channel(false);
let reconciler_config = ReconcilerConfig {
reconcile_interval: self.config.registry_reconcile_interval(),
enable_startup_reconciliation: self.config.registry_enable_startup_reconciliation(),
package_operation_timeout: Duration::from_secs(30),
continue_on_package_error: true,
default_tenant_id: "public".to_string(),
};
let workflow_registry_result = match self.config.registry_storage_backend() {
"filesystem" => {
let storage_path = self
.config
.registry_storage_path()
.map(|p| p.to_path_buf())
.unwrap_or_else(|| std::env::temp_dir().join("cloacina_registry"));
match FilesystemRegistryStorage::new(storage_path) {
Ok(storage) => WorkflowRegistryImpl::new(storage, self.database.clone())
.map(|registry| Arc::new(registry) as Arc<dyn WorkflowRegistry>)
.map_err(|e| {
format!("Failed to create filesystem workflow registry: {}", e)
}),
Err(e) => Err(format!("Failed to create filesystem storage: {}", e)),
}
}
"sqlite" | "postgres" | "database" => {
let dal = crate::dal::DAL::new(self.database.clone());
let storage = UnifiedRegistryStorage::new(self.database.clone());
let registry_dal = dal.workflow_registry(storage);
Ok(Arc::new(registry_dal) as Arc<dyn WorkflowRegistry>)
}
backend => Err(format!(
"Unknown registry storage backend: {}. Valid options: filesystem, sqlite, postgres, database",
backend
)),
};
match workflow_registry_result {
Ok(workflow_registry_arc) => {
let registry_reconciler = RegistryReconciler::new(
workflow_registry_arc.clone(),
reconciler_config,
reconciler_shutdown_rx,
)
.map_err(|e| PipelineError::Configuration {
message: format!("Failed to create registry reconciler: {}", e),
})?;
let mut broadcast_shutdown_rx = shutdown_tx.subscribe();
let reconciler_span = self.create_runner_span("registry_reconciler");
let reconciler_handle = tokio::spawn(
async move {
tokio::select! {
result = registry_reconciler.start_reconciliation_loop() => {
if let Err(e) = result {
tracing::error!("Registry reconciler failed: {}", e);
} else {
tracing::info!("Registry reconciler completed");
}
}
_ = broadcast_shutdown_rx.recv() => {
tracing::info!("Registry reconciler shutdown requested via broadcast");
let _ = reconciler_shutdown_tx.send(true);
}
}
}
.instrument(reconciler_span),
);
*self.workflow_registry.write().await = Some(workflow_registry_arc);
handles.registry_reconciler_handle = Some(reconciler_handle);
}
Err(e) => {
tracing::error!("Failed to create workflow registry: {}", e);
}
}
Ok(())
}
/// Starts the trigger scheduler polling task.
///
/// A `TriggerScheduler` is built from the runner's database, a clone of this
/// runner and the trigger poll settings in the configuration. A clone of it is
/// moved into a spawned task that runs `run_polling_loop()` until the loop
/// returns or the runner-wide broadcast shutdown fires. The scheduler is
/// stored in `self.trigger_scheduler` and the task handle in
/// `handles.trigger_scheduler_handle`.
///
/// This function always returns `Ok(())`.
async fn start_trigger_services(
    &self,
    handles: &mut super::RuntimeHandles,
    shutdown_tx: &broadcast::Sender<()>,
) -> Result<(), PipelineError> {
    tracing::info!("Starting trigger scheduler");

    // Watch channel handed to the scheduler as its own shutdown flag.
    let (stop_tx, stop_rx) = watch::channel(false);

    let scheduler_config = TriggerSchedulerConfig {
        base_poll_interval: self.config.trigger_base_poll_interval(),
        poll_timeout: self.config.trigger_poll_timeout(),
    };

    let scheduler = TriggerScheduler::new(
        Arc::new(DAL::new(self.database.clone())),
        Arc::new(self.clone()),
        scheduler_config,
        stop_rx,
    );

    let mut polling_scheduler = scheduler.clone();
    let mut runner_shutdown_rx = shutdown_tx.subscribe();
    let span = self.create_runner_span("trigger_scheduler");

    let task = async move {
        tokio::select! {
            outcome = polling_scheduler.run_polling_loop() => {
                match outcome {
                    Ok(_) => tracing::info!("Trigger scheduler completed"),
                    Err(e) => tracing::error!("Trigger scheduler failed: {}", e),
                }
            }
            _ = runner_shutdown_rx.recv() => {
                tracing::info!("Trigger scheduler shutdown requested via broadcast");
                let _ = stop_tx.send(true);
            }
        }
    };
    let task_handle = tokio::spawn(task.instrument(span));

    *self.trigger_scheduler.write().await = Some(Arc::new(scheduler));
    handles.trigger_scheduler_handle = Some(task_handle);

    Ok(())
}
}