cloacina 0.6.0

A Rust library for resilient task execution and orchestration.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
/*
 *  Copyright 2025-2026 Colliery Software
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

//! Scheduler loop and workflow execution processing.
//!
//! This module contains the main scheduling loop that continuously processes
//! active workflow executions and manages task readiness.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use tokio::time;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use crate::dal::DAL;
use crate::database::universal_types::UniversalUuid;
use crate::dispatcher::{Dispatcher, TaskReadyEvent};
use crate::error::ValidationError;
use crate::models::task_execution::TaskExecution;
use crate::models::workflow_execution::WorkflowExecutionRecord;
use crate::Runtime;

use super::state_manager::StateManager;

/// Maximum backoff interval during sustained errors (30 seconds).
///
/// `SchedulerLoop::run` clamps its exponential error backoff to this value.
const MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Number of consecutive errors before logging a circuit-open warning.
///
/// In `SchedulerLoop::run`, failures below this count are each logged at
/// error level; the circuit-open warning is emitted when the count reaches
/// exactly this value.
const CIRCUIT_OPEN_THRESHOLD: u32 = 5;

/// Scheduler loop operations.
///
/// Holds everything one polling loop needs: data access, the runtime handed to
/// the state manager, an optional dispatcher, an optional shutdown signal and
/// the running count of consecutive failed passes.
pub struct SchedulerLoop<'a> {
    /// Data access layer used for every workflow/task query and update.
    dal: &'a DAL,
    /// Runtime handle passed to the `StateManager` built for each batch.
    runtime: Arc<Runtime>,
    /// Identifier of this scheduler instance; included in the startup log line.
    instance_id: Uuid,
    /// Period of the polling interval and base value of the error backoff.
    poll_interval: Duration,
    /// Optional dispatcher for push-based task execution
    dispatcher: Option<Arc<dyn Dispatcher>>,
    /// Shutdown signal — when the sender drops or sends, the loop exits cleanly.
    shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
    /// Consecutive error count for circuit breaker / backoff.
    consecutive_errors: u32,
}

impl<'a> SchedulerLoop<'a> {
    /// Creates a new SchedulerLoop with an optional dispatcher.
    pub fn with_dispatcher(
        dal: &'a DAL,
        runtime: Arc<Runtime>,
        instance_id: Uuid,
        poll_interval: Duration,
        dispatcher: Option<Arc<dyn Dispatcher>>,
    ) -> Self {
        Self {
            dal,
            runtime,
            instance_id,
            poll_interval,
            dispatcher,
            shutdown_rx: None,
            consecutive_errors: 0,
        }
    }

    /// Set the shutdown receiver for graceful termination.
    pub fn with_shutdown(mut self, shutdown_rx: tokio::sync::watch::Receiver<bool>) -> Self {
        self.shutdown_rx = Some(shutdown_rx);
        self
    }

    /// Runs the main scheduling loop that continuously processes active workflow executions.
    ///
    /// Each iteration:
    /// 1. Waits for the next poll tick (or for the shutdown signal, if one is attached)
    /// 2. Calls [`process_active_executions`](Self::process_active_executions)
    /// 3. On success, resets the consecutive-error counter
    /// 4. On failure, bumps the counter, logs (rate-limited once the circuit is
    ///    open) and backs off exponentially, capped at `MAX_BACKOFF`
    ///
    /// The shutdown signal is observed both while waiting for a tick and while
    /// backing off, so a shutdown request is never delayed by an error backoff.
    ///
    /// # Returns
    ///
    /// `Ok(())` once the shutdown receiver reports a change or its sender is
    /// dropped. Errors from individual passes are logged and never returned.
    /// Without a shutdown receiver the loop does not terminate.
    pub async fn run(&mut self) -> Result<(), ValidationError> {
        info!(
            "Starting task scheduler loop (instance: {}, poll_interval: {:?})",
            self.instance_id, self.poll_interval
        );
        let mut interval = time::interval(self.poll_interval);
        // The default `Burst` behaviour replays every tick missed during a
        // backoff sleep or a slow pass back-to-back, hammering the database
        // right after an outage. `Delay` drops the missed ticks instead.
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

        loop {
            if let Some(ref mut shutdown_rx) = self.shutdown_rx {
                tokio::select! {
                    _ = interval.tick() => {}
                    _ = shutdown_rx.changed() => {
                        info!("SchedulerLoop shutting down");
                        break;
                    }
                }
            } else {
                interval.tick().await;
            }

            match self.process_active_executions().await {
                Ok(()) => {
                    if self.consecutive_errors > 0 {
                        info!(
                            "Scheduler loop recovered after {} consecutive errors",
                            self.consecutive_errors
                        );
                        self.consecutive_errors = 0;
                    }
                    debug!("Scheduling loop completed successfully");
                }
                Err(e) => {
                    self.consecutive_errors += 1;

                    if self.consecutive_errors == CIRCUIT_OPEN_THRESHOLD {
                        warn!(
                            "Scheduler loop circuit open: {} consecutive errors — backing off (latest: {})",
                            self.consecutive_errors, e
                        );
                    } else if self.consecutive_errors.is_multiple_of(10) {
                        // Rate-limited logging: every 10th error after circuit opens
                        warn!(
                            "Scheduler loop still failing: {} consecutive errors (latest: {})",
                            self.consecutive_errors, e
                        );
                    } else if self.consecutive_errors < CIRCUIT_OPEN_THRESHOLD {
                        error!("Scheduling loop error: {}", e);
                    }

                    // Exponential backoff: poll_interval * 2^min(errors, 8) capped at MAX_BACKOFF
                    let backoff_exp = self.consecutive_errors.min(8);
                    let backoff = self
                        .poll_interval
                        .saturating_mul(1u32 << backoff_exp)
                        .min(MAX_BACKOFF);

                    // Sleep only the part of the backoff that exceeds one poll interval.
                    if backoff > self.poll_interval {
                        let extra_wait = backoff - self.poll_interval;
                        if let Some(shutdown_rx) = self.shutdown_rx.as_mut() {
                            // Stay responsive to shutdown while backing off;
                            // otherwise a stop request could be ignored for up
                            // to MAX_BACKOFF.
                            tokio::select! {
                                _ = time::sleep(extra_wait) => {}
                                _ = shutdown_rx.changed() => {
                                    info!("SchedulerLoop shutting down");
                                    break;
                                }
                            }
                        } else {
                            time::sleep(extra_wait).await;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Runs one scheduling pass over every active workflow execution.
    ///
    /// The pass loads the active executions, publishes their count on the
    /// `cloacina_active_workflows` gauge, evaluates task readiness for them (if
    /// there are any) and finally, when a dispatcher is configured, dispatches
    /// all Ready tasks. Dispatch also happens when no execution is active so
    /// that Ready tasks such as retries are still picked up.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading the executions, from the batch
    /// readiness pass, or from dispatching.
    pub async fn process_active_executions(&self) -> Result<(), ValidationError> {
        let active = self
            .dal
            .workflow_execution()
            .get_active_executions()
            .await?;

        // SQL-derived gauge: re-seeded on every pass from the freshly loaded
        // rows, so it cannot drift on crash, claim loss, or any path that
        // skips finalize_workflow_execution.
        metrics::gauge!("cloacina_active_workflows").set(active.len() as f64);

        // Evaluate pending tasks (marking them Ready) only when there is work.
        if !active.is_empty() {
            self.process_executions_batch(active).await?;
        }

        // Dispatch all Ready tasks — newly marked ones as well as retries.
        match self.dispatcher {
            Some(_) => self.dispatch_ready_tasks().await,
            None => Ok(()),
        }
    }

    /// Evaluates task readiness and completion for a batch of workflow executions.
    ///
    /// Steps:
    /// 1. Fetch the pending tasks of all given executions with a single query
    /// 2. Bucket those tasks by the execution they belong to
    /// 3. For each execution, update task readiness and, if the execution has
    ///    finished, finalise it
    ///
    /// A readiness failure for one execution is logged and that execution is
    /// skipped for this pass. Errors from the pending-task query, the completion
    /// check, or finalisation abort the batch and are returned.
    async fn process_executions_batch(
        &self,
        active_executions: Vec<WorkflowExecutionRecord>,
    ) -> Result<(), ValidationError> {
        let ids: Vec<UniversalUuid> = active_executions
            .iter()
            .map(|record| record.id)
            .collect();

        // One query for the pending tasks of every active execution.
        let pending = self
            .dal
            .task_execution()
            .get_pending_tasks_batch(ids)
            .await?;

        // Bucket the tasks by owning workflow execution.
        let mut pending_by_execution: HashMap<UniversalUuid, Vec<TaskExecution>> = HashMap::new();
        for task in pending {
            let bucket = pending_by_execution
                .entry(task.workflow_execution_id)
                .or_default();
            bucket.push(task);
        }

        let state_manager = StateManager::new(self.dal, self.runtime.clone());

        for execution in active_executions.iter() {
            // Readiness only needs updating when the execution has pending tasks.
            if let Some(tasks) = pending_by_execution.get(&execution.id) {
                let readiness = state_manager
                    .update_workflow_task_readiness(execution.id, tasks)
                    .await;
                match readiness {
                    Ok(_) => {}
                    Err(e) => {
                        error!(
                            "Failed to update task readiness for workflow execution {}: {}",
                            execution.id, e
                        );
                        continue;
                    }
                }
            }

            // Finalise the execution once all of its tasks are done.
            let is_complete = self
                .dal
                .task_execution()
                .check_workflow_completion(execution.id)
                .await?;
            if is_complete {
                self.complete_execution(execution).await?;
            }
        }

        Ok(())
    }

    /// Hands every Ready task to the configured dispatcher.
    ///
    /// Does nothing when no dispatcher is configured. Otherwise the tasks
    /// returned by `get_ready_for_retry` (Ready tasks whose `retry_at` has
    /// passed or is null) are each wrapped in a `TaskReadyEvent` and dispatched.
    /// A failed dispatch is logged as a warning and does not stop the remaining
    /// tasks from being dispatched.
    ///
    /// # Errors
    ///
    /// Only the query for Ready tasks can fail this method.
    async fn dispatch_ready_tasks(&self) -> Result<(), ValidationError> {
        let Some(dispatcher) = self.dispatcher.as_ref() else {
            return Ok(());
        };

        // Ready tasks whose retry_at has passed (or is null).
        let ready = self.dal.task_execution().get_ready_for_retry().await?;

        for task in ready {
            let event = TaskReadyEvent::new(
                task.id,
                task.workflow_execution_id,
                task.task_name.clone(),
                task.attempt,
            );

            let outcome = dispatcher.dispatch(event).await;
            if let Err(e) = outcome {
                warn!(
                    task_id = %task.id,
                    task_name = %task.task_name,
                    error = %e,
                    "Failed to dispatch ready task"
                );
            }
        }

        Ok(())
    }

    /// Finalises a workflow execution whose tasks have all finished.
    ///
    /// The execution is re-read first; if it is already "Completed" or "Failed"
    /// the call returns without doing anything, so that two scheduler ticks
    /// racing through the completion check do not both finalise it. Otherwise
    /// the final context is updated (best effort — a failure is only logged),
    /// the execution is marked failed when at least one task failed and
    /// completed otherwise, the `cloacina_workflows_total` counter is bumped
    /// and the run duration is recorded.
    ///
    /// # Errors
    ///
    /// Propagates errors from re-reading the execution, loading its tasks, or
    /// writing the final status.
    async fn complete_execution(
        &self,
        execution: &WorkflowExecutionRecord,
    ) -> Result<(), ValidationError> {
        let workflow_id = execution.id;

        // Guard against double finalisation: bail out if a terminal status has
        // already been written. This prevents duplicate WorkflowCompleted
        // events when two ticks pass check_workflow_completion concurrently.
        let latest = self.dal.workflow_execution().get_by_id(workflow_id).await?;
        let already_finalised = ["Completed", "Failed"]
            .iter()
            .any(|terminal| latest.status == *terminal);
        if already_finalised {
            debug!(
                "Workflow execution {} already in status '{}', skipping completion",
                workflow_id, latest.status
            );
            return Ok(());
        }

        // Tally task outcomes in a single pass; used for the status decision
        // and for the log lines below.
        let all_tasks = self
            .dal
            .task_execution()
            .get_all_tasks_for_workflow(workflow_id)
            .await?;
        let mut completed_count = 0usize;
        let mut failed_count = 0usize;
        let mut skipped_count = 0usize;
        for task in all_tasks.iter() {
            if task.status == "Completed" {
                completed_count += 1;
            } else if task.status == "Failed" {
                failed_count += 1;
            } else if task.status == "Skipped" {
                skipped_count += 1;
            }
        }

        // Point the execution at its final context before the status flips.
        let context_update = self
            .update_execution_final_context(workflow_id, &all_tasks)
            .await;
        if let Err(e) = context_update {
            warn!(
                "Failed to update final context for workflow execution {}: {}",
                workflow_id, e
            );
        }

        // The final status follows from the task outcomes.
        if failed_count == 0 {
            self.dal
                .workflow_execution()
                .mark_completed(workflow_id)
                .await?;
            metrics::counter!(
                "cloacina_workflows_total",
                "status" => "completed",
                "reason" => "ok",
            )
            .increment(1);
            info!(
                "Workflow execution completed: {} (name: {}, {} completed, {} skipped)",
                workflow_id, execution.workflow_name, completed_count, skipped_count
            );
        } else {
            let reason = format!(
                "{} task(s) failed, {} completed, {} skipped",
                failed_count, completed_count, skipped_count
            );
            self.dal
                .workflow_execution()
                .mark_failed(workflow_id, &reason)
                .await?;
            // Workflow-level failures are always downstream of task failures.
            metrics::counter!(
                "cloacina_workflows_total",
                "status" => "failed",
                "reason" => "dependency_failed",
            )
            .increment(1);
            info!(
                "Workflow execution failed: {} (name: {}, {})",
                workflow_id, execution.workflow_name, reason
            );
        }

        // Record how long the execution ran. The active-workflows gauge is
        // SQL-derived in process_active_executions, so no explicit decrement is
        // needed here — the next tick re-seeds it from the authoritative rows.
        let elapsed = chrono::Utc::now() - execution.started_at.0;
        if let Ok(elapsed_std) = elapsed.to_std() {
            metrics::histogram!("cloacina_workflow_duration_seconds")
                .record(elapsed_std.as_secs_f64());
        }

        Ok(())
    }

    /// Updates the workflow execution's final context when it completes.
    ///
    /// Among the tasks that are "Completed" or "Skipped", have a completion
    /// timestamp and have a context recorded in their execution metadata, the
    /// one with the latest completion time wins; its context becomes the
    /// execution's `context_id`, so that `WorkflowExecutionResult.final_context`
    /// returns that data. Completed and Skipped tasks are treated alike — only
    /// the completion time decides, and on a tie the task seen first is kept.
    /// If no task qualifies, the execution keeps its current context.
    ///
    /// Metadata lookups are best effort: a task whose metadata cannot be loaded
    /// is skipped (and the failure logged at debug level).
    ///
    /// # Errors
    ///
    /// Returns `ValidationError::InvalidTaskName` if a candidate task's name
    /// cannot be parsed into a `TaskNamespace`, and propagates any error from
    /// writing the final context.
    async fn update_execution_final_context(
        &self,
        workflow_execution_id: UniversalUuid,
        all_tasks: &[TaskExecution],
    ) -> Result<(), ValidationError> {
        // Best candidate so far: (context id, completion time of its task).
        let mut latest: Option<(UniversalUuid, chrono::DateTime<chrono::Utc>)> = None;

        for task in all_tasks {
            // Only tasks that finished execution can have produced output context.
            if task.status != "Completed" && task.status != "Skipped" {
                continue;
            }
            let Some(completed_at) = task.completed_at else {
                continue;
            };

            let task_namespace = crate::task::TaskNamespace::from_string(&task.task_name)
                .map_err(|_| {
                    crate::error::ValidationError::InvalidTaskName(task.task_name.clone())
                })?;

            // Look up the context stored for this task, if any.
            let task_metadata = match self
                .dal
                .task_execution_metadata()
                .get_by_workflow_and_task(workflow_execution_id, &task_namespace)
                .await
            {
                Ok(metadata) => metadata,
                Err(e) => {
                    // Best effort: skip this task, but leave a trace of why.
                    debug!(
                        "Could not load execution metadata for task '{}' in workflow execution {}: {}",
                        task.task_name, workflow_execution_id, e
                    );
                    continue;
                }
            };
            let Some(context_id) = task_metadata.context_id else {
                continue;
            };

            // Strictly later completion replaces the current candidate.
            let is_newer = match &latest {
                None => true,
                Some((_, seen_at)) => completed_at.0 > *seen_at,
            };
            if is_newer {
                latest = Some((context_id, completed_at.0));
            }
        }

        match latest {
            Some((context_id, _)) => {
                debug!(
                    "Updating workflow execution {} final context to context_id: {}",
                    workflow_execution_id, context_id
                );
                self.dal
                    .workflow_execution()
                    .update_final_context(workflow_execution_id, context_id)
                    .await?;
            }
            None => {
                debug!(
                    "No final context found for workflow execution {} - keeping initial context",
                    workflow_execution_id
                );
            }
        }

        Ok(())
    }
}