cloacina 0.6.1

A Rust library for resilient task execution and orchestration.
/*
 *  Copyright 2025-2026 Colliery Software
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

//! # Task Scheduler
//!
//! The Task Scheduler converts Workflow definitions into persistent database execution plans
//! and manages task readiness based on dependencies and trigger rules.
//!
//! ## Overview
//!
//! The scheduler builds on existing Cloacina components:
//! - **Workflow**: Task definitions and dependency graphs
//! - **Context**: Type-safe serializable execution context
//! - **Database**: Persistent execution state tracking
//! - **DAL**: Data access layer for database operations
//!
//! ## Key Features
//!
//! - Convert Workflow instances into database execution plans
//! - Manage task state transitions based on dependencies
//! - Support advanced trigger rules for conditional execution
//! - Coordinate with executor through database state
//! - Context management and merging for task dependencies
//!
//! Recovery of orphaned tasks is handled out-of-band by
//! [`stale_claim_sweeper::StaleClaimSweeper`], which releases stale claims
//! based on heartbeat expiry rather than wall-clock task status.
//!
//! ## Task State Management
//!
//! Tasks transition through the following states:
//! - **NotStarted**: Initial state when task is created
//! - **Pending**: Waiting for dependencies to complete
//! - **Ready**: Dependencies satisfied, ready for execution
//! - **Running**: Currently being executed
//! - **Completed**: Successfully finished
//! - **Failed**: Execution failed
//! - **Skipped**: Skipped due to trigger rules
//! - **Abandoned**: Permanently failed after recovery attempts
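//!
//! The state names above can be modelled as a small enum. This is only an
//! illustrative sketch: `TaskState`, `as_str`, and `parse` are hypothetical
//! names that are not part of this module, and the string forms are assumed
//! to match the status strings stored in the database (such as `"NotStarted"`).
//!
//! ```rust
//! // Hypothetical mirror of the task states listed above.
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum TaskState {
//!     NotStarted,
//!     Pending,
//!     Ready,
//!     Running,
//!     Completed,
//!     Failed,
//!     Skipped,
//!     Abandoned,
//! }
//!
//! impl TaskState {
//!     // Returns the string form (assumed to equal the stored status value).
//!     fn as_str(self) -> &'static str {
//!         match self {
//!             TaskState::NotStarted => "NotStarted",
//!             TaskState::Pending => "Pending",
//!             TaskState::Ready => "Ready",
//!             TaskState::Running => "Running",
//!             TaskState::Completed => "Completed",
//!             TaskState::Failed => "Failed",
//!             TaskState::Skipped => "Skipped",
//!             TaskState::Abandoned => "Abandoned",
//!         }
//!     }
//!
//!     // Parses a status string; unknown values yield `None`.
//!     fn parse(s: &str) -> Option<Self> {
//!         Some(match s {
//!             "NotStarted" => TaskState::NotStarted,
//!             "Pending" => TaskState::Pending,
//!             "Ready" => TaskState::Ready,
//!             "Running" => TaskState::Running,
//!             "Completed" => TaskState::Completed,
//!             "Failed" => TaskState::Failed,
//!             "Skipped" => TaskState::Skipped,
//!             "Abandoned" => TaskState::Abandoned,
//!             _ => return None,
//!         })
//!     }
//! }
//! ```
//!
//! With this sketch, `TaskState::parse("Ready")` yields `Some(TaskState::Ready)`
//! and an unrecognised string yields `None`.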
//!
//! ## Error Handling & Recovery
//!
//! The scheduler relies on per-task retry policies for failure handling.
//! Orphaned tasks (claimed by a runner whose heartbeat has expired) are
//! reclaimed by the stale claim sweeper rather than by scheduler startup.
//!
//! ## Context Management
//!
//! Context handling follows these rules:
//! - Initial context provided at workflow execution
//! - Single dependency: inherits context directly
//! - Multiple dependencies: merges their contexts, with later values overriding earlier ones
//! - Type-safe serialization/deserialization
//! - Validation of context values in trigger rules
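//!
//! The override rule for multiple dependencies can be sketched with plain maps.
//! This is a simplified illustration, not the implementation in the private
//! `context_manager` module: `merge_contexts` is a hypothetical helper, and a
//! `HashMap<String, String>` stands in for `Context<serde_json::Value>`.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // Merges dependency contexts in the order given. When the same key
//! // appears more than once, the value from the later context wins.
//! fn merge_contexts(contexts: &[HashMap<String, String>]) -> HashMap<String, String> {
//!     let mut merged = HashMap::new();
//!     for ctx in contexts {
//!         for (key, value) in ctx {
//!             // `insert` replaces any value stored by an earlier context.
//!             merged.insert(key.clone(), value.clone());
//!         }
//!     }
//!     merged
//! }
//! ```
//!
//! Under this sketch, the order in which dependency contexts are passed
//! decides which value survives a key collision.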
//!
//! ## Performance Considerations
//!
//! - Scheduling loop polls at a configurable interval (100ms by default)
//! - Efficient database queries for task state updates
//! - Batch processing of task readiness checks
//! - Optimized context merging for multiple dependencies
//! - Minimal database locking for concurrent operations
//!
//! ## Thread Safety
//!
//! The scheduler is designed for concurrent operation:
//! - Thread-safe database operations
//! - Atomic task state transitions
//! - Safe context merging for parallel tasks
//! - Lock-free trigger rule evaluation
//!
//! ## Usage
//!
//! ```rust,ignore
//! use cloacina::{workflow, task, Context, Database, TaskError};
//! use cloacina::execution_planner::TaskScheduler;
//!
//! // Define tasks
//! #[task(id = "fetch-data", dependencies = [])]
//! async fn fetch_data(context: &mut Context<serde_json::Value>) -> Result<(), TaskError> {
//!     context.insert("data", serde_json::json!({"status": "fetched"}))?;
//!     Ok(())
//! }
//!
//! // Create workflow
//! let workflow = workflow! {
//!     name: "data-pipeline",
//!     description: "Simple data processing workflow",
//!     tasks: [fetch_data]
//! };
//!
//! // Schedule execution
//! let database = Database::new("postgresql://localhost/cloacina")?;
//! let scheduler = TaskScheduler::new(database).await?;
//! let input_context = Context::new();
//! let execution_id = scheduler.schedule_workflow_execution("data-pipeline", input_context).await?;
//!
//! // Run scheduling loop
//! scheduler.run_scheduling_loop().await?;
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```

mod context_manager;
mod scheduler_loop;
pub mod stale_claim_sweeper;
mod state_manager;
mod trigger_rules;

// Re-export public types
pub use trigger_rules::{TriggerCondition, TriggerRule, ValueOperator};

use std::sync::Arc;
use std::time::Duration;

use diesel::prelude::*;
use diesel::Connection;
use tracing::info;
use uuid::Uuid;

use crate::dal::unified::models::{NewUnifiedTaskExecution, NewUnifiedWorkflowExecution};
use crate::dal::DAL;
use crate::database::schema::unified::{task_executions, workflow_executions};
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::dispatcher::Dispatcher;
use crate::error::ValidationError;
use crate::task::TaskNamespace;
use crate::Runtime;
use crate::{Context, Database, Workflow};

use scheduler_loop::SchedulerLoop;

/// The main Task Scheduler that manages workflow execution and task readiness.
///
/// The TaskScheduler converts Workflow definitions into persistent database execution plans,
/// tracks task state transitions, and manages dependencies through trigger rules.
///
/// # Thread Safety
///
/// The TaskScheduler is designed to be thread-safe and can be shared across multiple threads.
/// All database operations are performed through a connection pool, and state transitions
/// are handled atomically.
///
/// # Error Handling
///
/// The scheduler implements comprehensive error handling:
/// - Database errors are wrapped in ValidationError
/// - Workflow validation errors are caught early
/// - Context evaluation errors are handled gracefully
///
/// # Performance
///
/// The scheduler is optimized for:
/// - Efficient database operations
/// - Minimal locking
/// - Batch processing where possible
/// - Memory-efficient context management
///
/// # Examples
///
/// ```rust,ignore
/// use cloacina::{Database, TaskScheduler};
/// use cloacina::workflow::Workflow;
///
/// // Create a new scheduler with the default poll interval
/// let database = Database::new("postgresql://localhost/cloacina")?;
/// let scheduler = TaskScheduler::new(database).await?;
///
/// // Run the scheduling loop
/// scheduler.run_scheduling_loop().await?;
/// ```
pub struct TaskScheduler {
    dal: DAL,
    runtime: Arc<Runtime>,
    instance_id: Uuid,
    poll_interval: Duration,
    /// Optional dispatcher for push-based task execution
    dispatcher: Option<Arc<dyn Dispatcher>>,
    /// Shutdown signal for graceful termination of the scheduling loop.
    shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
}

impl TaskScheduler {
    /// Creates a new TaskScheduler instance with default configuration using global workflow registry.
    ///
    /// This is the recommended constructor for most use cases. The TaskScheduler will:
    /// - Use all workflows registered in the global registry
    /// - Use default poll interval (100ms)
    ///
    /// # Arguments
    ///
    /// * `database` - Database instance for persistence
    ///
    /// # Returns
    ///
    /// A new TaskScheduler instance ready to schedule and manage workflow executions.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use cloacina::{Database, TaskScheduler};
    ///
    /// let database = Database::new("postgresql://localhost/cloacina")?;
    /// let scheduler = TaskScheduler::new(database).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// May return ValidationError if construction fails.
    pub async fn new(database: Database) -> Result<Self, ValidationError> {
        let scheduler = Self::with_poll_interval(database, Duration::from_millis(100)).await?;
        Ok(scheduler)
    }

    /// Creates a new TaskScheduler with custom poll interval using global workflow registry.
    ///
    /// # Arguments
    ///
    /// * `database` - Database instance for persistence
    /// * `poll_interval` - How often to check for ready tasks
    ///
    /// # Returns
    ///
    /// A new TaskScheduler instance ready to schedule and manage workflow executions.
    pub async fn with_poll_interval(
        database: Database,
        poll_interval: Duration,
    ) -> Result<Self, ValidationError> {
        Ok(Self::with_poll_interval_sync(database, poll_interval))
    }

    /// Creates a new TaskScheduler with custom poll interval (synchronous version).
    pub(crate) fn with_poll_interval_sync(database: Database, poll_interval: Duration) -> Self {
        let dal = DAL::new(database);

        Self {
            dal,
            runtime: Arc::new(Runtime::new()),
            instance_id: Uuid::new_v4(),
            poll_interval,
            dispatcher: None,
            shutdown_rx: None,
        }
    }

    /// Sets the runtime for this scheduler, replacing the default.
    pub fn with_runtime(mut self, runtime: Arc<Runtime>) -> Self {
        self.runtime = runtime;
        self
    }

    /// Returns a reference to the runtime used by this scheduler.
    pub fn runtime(&self) -> &Arc<Runtime> {
        &self.runtime
    }

    /// Sets the shutdown receiver for graceful termination of the scheduling loop.
    pub fn with_shutdown(mut self, shutdown_rx: tokio::sync::watch::Receiver<bool>) -> Self {
        self.shutdown_rx = Some(shutdown_rx);
        self
    }

    /// Sets the dispatcher for push-based task execution.
    ///
    /// When a dispatcher is configured, the scheduler will dispatch task events
    /// when tasks become ready, in addition to marking them Ready in the database.
    ///
    /// # Arguments
    ///
    /// * `dispatcher` - The dispatcher to use for task events
    ///
    /// # Returns
    ///
    /// Self for method chaining
    pub fn with_dispatcher(mut self, dispatcher: Arc<dyn Dispatcher>) -> Self {
        self.dispatcher = Some(dispatcher);
        self
    }

    /// Returns a reference to the dispatcher if configured.
    pub fn dispatcher(&self) -> Option<&Arc<dyn Dispatcher>> {
        self.dispatcher.as_ref()
    }

    /// Schedules a new workflow execution with the provided input context.
    ///
    /// This method:
    /// 1. Validates the workflow exists in the registry
    /// 2. Stores the input context in the database
    /// 3. Creates a new workflow execution record
    /// 4. Initializes task execution records for all workflow tasks
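    ///
    /// The task records in step 4 are created in the order returned by
    /// `workflow.topological_sort()`, whose implementation is not shown here.
    /// As a rough illustration of what such an ordering guarantees, the sketch
    /// below uses Kahn's algorithm. `topological_order` and its map-based input
    /// are hypothetical and are not this crate's API. It assumes every
    /// dependency is also a key of the map and that no dependency is listed twice.
    ///
    /// ```rust
    /// use std::collections::{BTreeMap, VecDeque};
    ///
    /// // Orders tasks so that each one appears after all of its dependencies.
    /// // `deps` maps a task name to the names it depends on.
    /// // Returns `None` if the graph contains a cycle.
    /// fn topological_order<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Option<Vec<String>> {
    ///     // Number of unmet dependencies per task.
    ///     let mut pending: BTreeMap<&'a str, usize> =
    ///         deps.iter().map(|(task, d)| (*task, d.len())).collect();
    ///     // Tasks with no dependencies can be emitted immediately.
    ///     let mut ready: VecDeque<&'a str> = pending
    ///         .iter()
    ///         .filter(|(_, unmet)| **unmet == 0)
    ///         .map(|(task, _)| *task)
    ///         .collect();
    ///     let mut order = Vec::with_capacity(deps.len());
    ///
    ///     while let Some(task) = ready.pop_front() {
    ///         order.push(task.to_string());
    ///         // Release every task that was waiting on `task`.
    ///         for (other, d) in deps {
    ///             if d.contains(&task) {
    ///                 let unmet = pending.get_mut(other)?;
    ///                 *unmet -= 1;
    ///                 if *unmet == 0 {
    ///                     ready.push_back(*other);
    ///                 }
    ///             }
    ///         }
    ///     }
    ///
    ///     // Tasks left over were never released, which means a cycle exists.
    ///     if order.len() == deps.len() {
    ///         Some(order)
    ///     } else {
    ///         None
    ///     }
    /// }
    /// ```
    ///
    /// For a chain where `transform` depends on `fetch` and `load` depends on
    /// `transform`, this sketch places `fetch` first and `load` last.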
    ///
    /// # Arguments
    ///
    /// * `workflow_name` - Name of the workflow to execute
    /// * `input_context` - Context containing input data for the workflow
    ///
    /// # Returns
    ///
    /// The UUID of the created workflow execution on success.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use cloacina::{Context, TaskScheduler};
    /// use serde_json::json;
    ///
    /// let scheduler = TaskScheduler::new(database).await?;
    /// let mut context = Context::new();
    /// context.insert("input", json!({"key": "value"}))?;
    ///
    /// let execution_id = scheduler.schedule_workflow_execution("my-workflow", context).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns `ValidationError::WorkflowNotFound` if the workflow doesn't exist in the registry,
    /// or other validation errors if database operations fail.
    ///
    /// # Performance
    ///
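    /// This operation performs two database writes:
    /// - The input context is stored first, in its own operation
    /// - The workflow execution record and all task execution records are then
    ///   inserted in a single atomic transaction
    ///
    /// Keeping the workflow and task inserts in one transaction prevents the scheduling
    /// loop from observing a workflow execution whose tasks do not exist yet.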
    pub async fn schedule_workflow_execution(
        &self,
        workflow_name: &str,
        input_context: Context<serde_json::Value>,
    ) -> Result<Uuid, ValidationError> {
        info!("Scheduling workflow execution: {}", workflow_name);

        // Look up workflow in scoped runtime registry
        let workflow = match self.runtime.get_workflow(workflow_name) {
            Some(wf) => wf,
            None => return Err(ValidationError::WorkflowNotFound(workflow_name.to_string())),
        };

        let current_version = workflow.metadata().version.clone();
        let last_version = self
            .dal
            .workflow_execution()
            .get_last_version(workflow_name)
            .await?;

        if last_version.as_ref() != Some(&current_version) {
            info!(
                "Workflow '{}' version changed: {} -> {}",
                workflow_name,
                last_version.unwrap_or_else(|| "none".to_string()),
                current_version
            );
        }

        // Store context first (separate operation - needed before main transaction)
        let stored_context = self.dal.context().create(&input_context).await?;

        // Build all task data BEFORE the transaction
        let task_ids = workflow.topological_sort()?;
        let mut task_data: Vec<(String, String, String, i32)> = Vec::with_capacity(task_ids.len());

        for task_id in &task_ids {
            let trigger_rules = self.get_task_trigger_rules(&workflow, task_id);
            let task_config = self.get_task_configuration(&workflow, task_id);
            let max_attempts = workflow
                .get_task(task_id)
                .map(|t| t.retry_policy().max_attempts)
                .unwrap_or(3);

            task_data.push((
                task_id.to_string(),
                trigger_rules.to_string(),
                task_config.to_string(),
                max_attempts,
            ));
        }

        // Prepare workflow execution data
        let workflow_execution_id = UniversalUuid::new_v4();
        let now = UniversalTimestamp::now();
        let wf_name = workflow_name.to_string();
        let wf_version = current_version.clone();

        // Create workflow execution AND tasks in a single atomic transaction
        // This prevents the race condition where the scheduler sees a workflow execution before tasks exist
        crate::dispatch_backend!(
            self.dal.backend(),
            self.create_workflow_execution_postgres(
                workflow_execution_id,
                now,
                wf_name,
                wf_version,
                stored_context,
                task_data,
            )
            .await?,
            self.create_workflow_execution_sqlite(
                workflow_execution_id,
                now,
                wf_name,
                wf_version,
                stored_context,
                task_data,
            )
            .await?
        );

        // NOTE: cloacina_active_workflows gauge is SQL-derived — re-seeded
        // each tick in SchedulerLoop::process_active_executions from the
        // workflow_executions row count. We intentionally do NOT increment
        // here; doing so would cause gauge drift on any code path that skips
        // finalize_workflow_execution (crash, claim loss, etc.).
        info!("Workflow execution scheduled: {}", workflow_execution_id);
        Ok(workflow_execution_id.into())
    }

    /// Creates workflow execution and tasks in PostgreSQL.
    #[cfg(feature = "postgres")]
    async fn create_workflow_execution_postgres(
        &self,
        workflow_execution_id: UniversalUuid,
        now: UniversalTimestamp,
        workflow_name: String,
        workflow_version: String,
        stored_context: Option<UniversalUuid>,
        task_data: Vec<(String, String, String, i32)>,
    ) -> Result<(), ValidationError> {
        let conn = self
            .dal
            .database()
            .get_postgres_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;

        conn.interact(move |conn| {
            conn.transaction(|conn| {
                // Insert workflow execution
                diesel::insert_into(workflow_executions::table)
                    .values(&NewUnifiedWorkflowExecution {
                        id: workflow_execution_id,
                        workflow_name,
                        workflow_version,
                        status: "Pending".to_string(),
                        context_id: stored_context,
                        started_at: now,
                        created_at: now,
                        updated_at: now,
                    })
                    .execute(conn)?;

                // Insert all tasks
                for (task_name, trigger_rules, task_config, max_attempts) in task_data {
                    diesel::insert_into(task_executions::table)
                        .values(&NewUnifiedTaskExecution {
                            id: UniversalUuid::new_v4(),
                            workflow_execution_id,
                            task_name,
                            status: "NotStarted".to_string(),
                            attempt: 1,
                            max_attempts,
                            trigger_rules,
                            task_configuration: task_config,
                            created_at: now,
                            updated_at: now,
                        })
                        .execute(conn)?;
                }

                Ok::<_, diesel::result::Error>(())
            })
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;

        Ok(())
    }

    /// Creates workflow execution and tasks in SQLite.
    #[cfg(feature = "sqlite")]
    async fn create_workflow_execution_sqlite(
        &self,
        workflow_execution_id: UniversalUuid,
        now: UniversalTimestamp,
        workflow_name: String,
        workflow_version: String,
        stored_context: Option<UniversalUuid>,
        task_data: Vec<(String, String, String, i32)>,
    ) -> Result<(), ValidationError> {
        let conn = self
            .dal
            .database()
            .get_sqlite_connection()
            .await
            .map_err(|e| ValidationError::ConnectionPool(e.to_string()))?;

        conn.interact(move |conn| {
            conn.transaction(|conn| {
                // Insert workflow execution
                diesel::insert_into(workflow_executions::table)
                    .values(&NewUnifiedWorkflowExecution {
                        id: workflow_execution_id,
                        workflow_name,
                        workflow_version,
                        status: "Pending".to_string(),
                        context_id: stored_context,
                        started_at: now,
                        created_at: now,
                        updated_at: now,
                    })
                    .execute(conn)?;

                // Insert all tasks
                for (task_name, trigger_rules, task_config, max_attempts) in task_data {
                    diesel::insert_into(task_executions::table)
                        .values(&NewUnifiedTaskExecution {
                            id: UniversalUuid::new_v4(),
                            workflow_execution_id,
                            task_name,
                            status: "NotStarted".to_string(),
                            attempt: 1,
                            max_attempts,
                            trigger_rules,
                            task_configuration: task_config,
                            created_at: now,
                            updated_at: now,
                        })
                        .execute(conn)?;
                }

                Ok::<_, diesel::result::Error>(())
            })
        })
        .await
        .map_err(|e| ValidationError::ConnectionPool(e.to_string()))??;

        Ok(())
    }

    /// Runs the main scheduling loop that continuously processes active workflow executions.
    ///
    /// This loop:
    /// 1. Checks for active workflow executions
    /// 2. Updates task readiness based on dependencies and trigger rules
    /// 3. Marks completed workflow executions
    /// 4. Repeats after each poll interval (100ms by default)
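    ///
    /// The cycle above can be pictured as a poll-and-sleep loop. The sketch below
    /// is a simplified, synchronous stand-in: the real loop is async, lives in
    /// `SchedulerLoop`, and takes its shutdown signal from a
    /// `tokio::sync::watch::Receiver<bool>`. `run_poll_loop`, the `tick` closure,
    /// and the `AtomicBool` flag are hypothetical. Unlike the real loop, which
    /// logs non-fatal errors and continues, this sketch stops at the first error.
    ///
    /// ```rust
    /// use std::sync::atomic::{AtomicBool, Ordering};
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// // Calls `tick` once per `poll_interval` until `shutdown` is set or
    /// // `tick` returns an error. Returns the number of completed ticks.
    /// fn run_poll_loop<F>(
    ///     poll_interval: Duration,
    ///     shutdown: &AtomicBool,
    ///     mut tick: F,
    /// ) -> Result<u32, String>
    /// where
    ///     F: FnMut() -> Result<(), String>,
    /// {
    ///     let mut ticks = 0;
    ///     while !shutdown.load(Ordering::SeqCst) {
    ///         // One pass over the active workflow executions.
    ///         tick()?;
    ///         ticks += 1;
    ///         // Wait before the next pass.
    ///         thread::sleep(poll_interval);
    ///     }
    ///     Ok(ticks)
    /// }
    /// ```
    ///
    /// A shorter interval lowers the delay before a task is marked ready, at the
    /// cost of more frequent database queries.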
    ///
    /// # Returns
    ///
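    /// This method runs until the loop exits: either on a shutdown signal configured
    /// through [`with_shutdown`](Self::with_shutdown), or when an error is returned.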
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use cloacina::TaskScheduler;
    ///
    /// let scheduler = TaskScheduler::new(database).await?;
    /// scheduler.run_scheduling_loop().await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns validation errors if database operations fail during the scheduling loop.
    /// The loop will continue running on non-fatal errors, with errors logged.
    ///
    /// # Performance
    ///
    /// The scheduling loop:
    /// - Polls at the configured interval (100ms by default)
    /// - Processes all active workflow executions in each iteration
    /// - Uses efficient batch queries where possible
    /// - Implements backoff for database errors
    ///
    /// # Thread Safety
    ///
    /// The scheduling loop is designed to be run in a separate thread or task.
    /// Multiple instances should not be run simultaneously.
    pub async fn run_scheduling_loop(&self) -> Result<(), ValidationError> {
        let mut scheduler_loop = SchedulerLoop::with_dispatcher(
            &self.dal,
            self.runtime.clone(),
            self.instance_id,
            self.poll_interval,
            self.dispatcher.clone(),
        );
        if let Some(ref shutdown_rx) = self.shutdown_rx {
            scheduler_loop = scheduler_loop.with_shutdown(shutdown_rx.clone());
        }
        scheduler_loop.run().await
    }

    /// Processes all active workflow executions to update task readiness.
    pub async fn process_active_executions(&self) -> Result<(), ValidationError> {
        let scheduler_loop = SchedulerLoop::with_dispatcher(
            &self.dal,
            self.runtime.clone(),
            self.instance_id,
            self.poll_interval,
            self.dispatcher.clone(),
        );
        scheduler_loop.process_active_executions().await
    }

    /// Gets trigger rules for a specific task from the task implementation.
    fn get_task_trigger_rules(
        &self,
        workflow: &Workflow,
        task_namespace: &TaskNamespace,
    ) -> serde_json::Value {
        workflow
            .get_task(task_namespace)
            .map(|task| task.trigger_rules())
            .unwrap_or_else(|_| serde_json::json!({"type": "Always"}))
    }

    /// Gets task configuration (currently returns empty object).
    fn get_task_configuration(
        &self,
        _workflow: &Workflow,
        _task_namespace: &TaskNamespace,
    ) -> serde_json::Value {
        // In the future, this could include task-specific configuration
        serde_json::json!({})
    }
}