Skip to main content

celers_broker_sql/
types.rs

1//! Core type definitions for the MySQL broker
2//!
3//! This module contains all public types, enums, and data structures
4//! used throughout the celers-broker-sql crate.
5
6use celers_core::{CelersError, SerializedTask};
7use chrono::{DateTime, Datelike, Timelike, Utc};
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11/// Task state in the database
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum DbTaskState {
15    Pending,
16    Processing,
17    Completed,
18    Failed,
19    Cancelled,
20}
21
22impl std::fmt::Display for DbTaskState {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            DbTaskState::Pending => write!(f, "pending"),
26            DbTaskState::Processing => write!(f, "processing"),
27            DbTaskState::Completed => write!(f, "completed"),
28            DbTaskState::Failed => write!(f, "failed"),
29            DbTaskState::Cancelled => write!(f, "cancelled"),
30        }
31    }
32}
33
34impl std::str::FromStr for DbTaskState {
35    type Err = CelersError;
36
37    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
38        match s.to_lowercase().as_str() {
39            "pending" => Ok(DbTaskState::Pending),
40            "processing" => Ok(DbTaskState::Processing),
41            "completed" => Ok(DbTaskState::Completed),
42            "failed" => Ok(DbTaskState::Failed),
43            "cancelled" => Ok(DbTaskState::Cancelled),
44            _ => Err(CelersError::Other(format!("Unknown task state: {}", s))),
45        }
46    }
47}
48
49/// Information about a task in the database
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct TaskInfo {
52    pub id: Uuid,
53    pub task_name: String,
54    pub state: DbTaskState,
55    pub priority: i32,
56    pub retry_count: i32,
57    pub max_retries: i32,
58    pub created_at: DateTime<Utc>,
59    pub scheduled_at: DateTime<Utc>,
60    pub started_at: Option<DateTime<Utc>>,
61    pub completed_at: Option<DateTime<Utc>>,
62    pub worker_id: Option<String>,
63    pub error_message: Option<String>,
64}
65
66/// Information about a dead-lettered task
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct DlqTaskInfo {
69    pub id: Uuid,
70    pub task_id: Uuid,
71    pub task_name: String,
72    pub retry_count: i32,
73    pub error_message: Option<String>,
74    pub failed_at: DateTime<Utc>,
75}
76
77/// Database health status
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct HealthStatus {
80    pub healthy: bool,
81    pub connection_pool_size: u32,
82    pub idle_connections: u32,
83    pub pending_tasks: i64,
84    pub processing_tasks: i64,
85    pub dlq_tasks: i64,
86    pub database_version: String,
87}
88
89/// Queue statistics
90#[derive(Debug, Clone, Default, Serialize, Deserialize)]
91pub struct QueueStatistics {
92    pub pending: i64,
93    pub processing: i64,
94    pub completed: i64,
95    pub failed: i64,
96    pub cancelled: i64,
97    pub dlq: i64,
98    pub total: i64,
99}
100
101/// Task result stored in the database
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct TaskResult {
104    pub task_id: Uuid,
105    pub task_name: String,
106    pub status: TaskResultStatus,
107    pub result: Option<serde_json::Value>,
108    pub error: Option<String>,
109    pub traceback: Option<String>,
110    pub created_at: DateTime<Utc>,
111    pub completed_at: Option<DateTime<Utc>>,
112    pub runtime_ms: Option<i64>,
113}
114
115/// Task result status
116#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
117#[serde(rename_all = "lowercase")]
118pub enum TaskResultStatus {
119    Pending,
120    Started,
121    Success,
122    Failure,
123    Retry,
124    Revoked,
125}
126
127impl std::fmt::Display for TaskResultStatus {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        match self {
130            TaskResultStatus::Pending => write!(f, "PENDING"),
131            TaskResultStatus::Started => write!(f, "STARTED"),
132            TaskResultStatus::Success => write!(f, "SUCCESS"),
133            TaskResultStatus::Failure => write!(f, "FAILURE"),
134            TaskResultStatus::Retry => write!(f, "RETRY"),
135            TaskResultStatus::Revoked => write!(f, "REVOKED"),
136        }
137    }
138}
139
140impl std::str::FromStr for TaskResultStatus {
141    type Err = CelersError;
142
143    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
144        match s.to_uppercase().as_str() {
145            "PENDING" => Ok(TaskResultStatus::Pending),
146            "STARTED" => Ok(TaskResultStatus::Started),
147            "SUCCESS" => Ok(TaskResultStatus::Success),
148            "FAILURE" => Ok(TaskResultStatus::Failure),
149            "RETRY" => Ok(TaskResultStatus::Retry),
150            "REVOKED" => Ok(TaskResultStatus::Revoked),
151            _ => Err(CelersError::Other(format!("Unknown result status: {}", s))),
152        }
153    }
154}
155
156/// Table size information
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct TableSizeInfo {
159    pub table_name: String,
160    pub row_count: i64,
161    pub data_size_bytes: i64,
162    pub index_size_bytes: i64,
163}
164
165/// Task count by task name
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct TaskNameCount {
168    pub task_name: String,
169    pub pending: i64,
170    pub processing: i64,
171    pub completed: i64,
172    pub failed: i64,
173    pub total: i64,
174}
175
176/// Scheduled task information
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct ScheduledTaskInfo {
179    pub id: Uuid,
180    pub task_name: String,
181    pub priority: i32,
182    pub scheduled_at: DateTime<Utc>,
183    pub created_at: DateTime<Utc>,
184    pub delay_remaining_secs: i64,
185}
186
187/// Connection pool configuration
188#[derive(Debug, Clone)]
189pub struct PoolConfig {
190    /// Maximum number of connections in the pool
191    pub max_connections: u32,
192    /// Minimum number of idle connections
193    pub min_connections: u32,
194    /// Connection timeout in seconds
195    pub acquire_timeout_secs: u64,
196    /// Maximum lifetime of a connection in seconds
197    pub max_lifetime_secs: Option<u64>,
198    /// Idle timeout for connections in seconds
199    pub idle_timeout_secs: Option<u64>,
200}
201
202impl Default for PoolConfig {
203    fn default() -> Self {
204        Self {
205            max_connections: 20,
206            min_connections: 2,
207            acquire_timeout_secs: 5,
208            max_lifetime_secs: Some(1800), // 30 minutes
209            idle_timeout_secs: Some(600),  // 10 minutes
210        }
211    }
212}
213
214/// Query performance statistics
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct QueryStats {
217    pub query_name: String,
218    pub execution_count: i64,
219    pub total_time_ms: i64,
220    pub avg_time_ms: f64,
221    pub min_time_ms: i64,
222    pub max_time_ms: i64,
223}
224
225/// Index usage statistics
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct IndexStats {
228    pub table_name: String,
229    pub index_name: String,
230    pub cardinality: i64,
231    pub unique_values: bool,
232}
233
234/// Query execution plan information
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct QueryPlan {
237    pub id: i32,
238    pub select_type: String,
239    pub table: Option<String>,
240    pub query_type: Option<String>,
241    pub possible_keys: Option<String>,
242    pub key_used: Option<String>,
243    pub key_length: Option<String>,
244    pub rows_examined: Option<i64>,
245    pub filtered: Option<f64>,
246    pub extra: Option<String>,
247}
248
249/// Migration information
250#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct MigrationInfo {
252    pub version: String,
253    pub name: String,
254    pub applied_at: DateTime<Utc>,
255}
256
257/// Connection pool diagnostics
258#[derive(Debug, Clone, Serialize, Deserialize)]
259pub struct ConnectionDiagnostics {
260    pub total_connections: u32,
261    pub idle_connections: u32,
262    pub active_connections: u32,
263    pub max_connections: u32,
264    pub connection_wait_time_ms: Option<i64>,
265    pub pool_utilization_percent: f64,
266}
267
268/// Performance metrics snapshot
269#[derive(Debug, Clone, Serialize, Deserialize)]
270pub struct PerformanceMetrics {
271    pub timestamp: DateTime<Utc>,
272    pub tasks_per_second: f64,
273    pub avg_dequeue_time_ms: f64,
274    pub avg_enqueue_time_ms: f64,
275    pub queue_depth: i64,
276    pub processing_tasks: i64,
277    pub dlq_size: i64,
278    pub connection_pool: ConnectionDiagnostics,
279}
280
281/// Task chain builder for creating dependent task sequences
282#[derive(Debug, Clone)]
283pub struct TaskChain {
284    tasks: Vec<SerializedTask>,
285    delay_between_secs: Option<u64>,
286}
287
288impl TaskChain {
289    /// Create a new task chain
290    pub fn new() -> Self {
291        Self {
292            tasks: Vec::new(),
293            delay_between_secs: None,
294        }
295    }
296
297    /// Add a task to the chain
298    pub fn then(mut self, task: SerializedTask) -> Self {
299        self.tasks.push(task);
300        self
301    }
302
303    /// Set delay between tasks in the chain (in seconds)
304    pub fn with_delay(mut self, delay_secs: u64) -> Self {
305        self.delay_between_secs = Some(delay_secs);
306        self
307    }
308
309    /// Get the tasks in the chain
310    pub fn tasks(&self) -> &[SerializedTask] {
311        &self.tasks
312    }
313
314    /// Get the delay between tasks
315    pub fn delay_between_secs(&self) -> Option<u64> {
316        self.delay_between_secs
317    }
318}
319
320impl Default for TaskChain {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326/// Worker statistics for monitoring distributed workers
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct WorkerStatistics {
329    pub worker_id: String,
330    pub active_tasks: i64,
331    pub completed_tasks: i64,
332    pub failed_tasks: i64,
333    pub last_seen: DateTime<Utc>,
334    pub avg_task_duration_secs: f64,
335}
336
337/// Task age distribution for queue health monitoring
338#[derive(Debug, Clone, Serialize, Deserialize)]
339pub struct TaskAgeDistribution {
340    pub bucket_label: String,
341    pub task_count: i64,
342    pub oldest_task_age_secs: i64,
343}
344
345/// Retry statistics for understanding task failure patterns
346#[derive(Debug, Clone, Serialize, Deserialize)]
347pub struct RetryStatistics {
348    pub task_name: String,
349    pub total_retries: i64,
350    pub unique_tasks: i64,
351    pub avg_retries_per_task: f64,
352    pub max_retries_observed: i32,
353}
354
355/// Queue health summary combining multiple metrics
356#[derive(Debug, Clone, Serialize, Deserialize)]
357pub struct QueueHealth {
358    pub overall_status: String, // "healthy", "degraded", "critical"
359    pub pending_tasks: i64,
360    pub processing_tasks: i64,
361    pub oldest_pending_age_secs: i64,
362    pub active_workers: i64,
363    pub queue_backlog_minutes: f64,
364}
365
366/// Task throughput metrics
367#[derive(Debug, Clone, Serialize, Deserialize)]
368pub struct TaskThroughput {
369    pub completed_last_minute: i64,
370    pub completed_last_hour: i64,
371    pub failed_last_minute: i64,
372    pub failed_last_hour: i64,
373    pub tasks_per_second: f64,
374}
375
376/// Dead Letter Queue statistics
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct DlqStatistics {
379    pub total_tasks: i64,
380    pub by_task_name: Vec<DlqTaskStats>,
381}
382
383/// DLQ statistics per task name
384#[derive(Debug, Clone, Serialize, Deserialize)]
385pub struct DlqTaskStats {
386    pub task_name: String,
387    pub count: i64,
388    pub avg_retries: Option<f64>,
389    pub max_retries: i32,
390}
391
392/// Task progress information for long-running tasks
393#[derive(Debug, Clone, Serialize, Deserialize)]
394pub struct TaskProgress {
395    pub task_id: Uuid,
396    pub progress_percent: f64,
397    pub current_step: Option<String>,
398    pub total_steps: Option<i32>,
399    pub updated_at: DateTime<Utc>,
400}
401
402/// Rate limit configuration per task type
403#[derive(Debug, Clone, Serialize, Deserialize)]
404pub struct RateLimit {
405    pub task_name: String,
406    pub max_per_second: f64,
407    pub max_per_minute: i64,
408    pub max_per_hour: i64,
409}
410
411/// Rate limit status showing current usage
412#[derive(Debug, Clone, Serialize, Deserialize)]
413pub struct RateLimitStatus {
414    pub task_name: String,
415    pub current_per_second: f64,
416    pub current_per_minute: i64,
417    pub current_per_hour: i64,
418    pub limit_exceeded: bool,
419}
420
421/// Recurring task schedule configuration
422#[derive(Debug, Clone, Serialize, Deserialize)]
423pub struct RecurringTaskConfig {
424    pub task_name: String,
425    pub schedule: RecurringSchedule,
426    pub payload: Vec<u8>,
427    pub priority: i32,
428    pub enabled: bool,
429    pub last_run: Option<DateTime<Utc>>,
430    pub next_run: DateTime<Utc>,
431}
432
433/// Recurring schedule types
434#[derive(Debug, Clone, Serialize, Deserialize)]
435pub enum RecurringSchedule {
436    /// Run every N seconds
437    EverySeconds(u64),
438    /// Run every N minutes
439    EveryMinutes(u64),
440    /// Run every N hours
441    EveryHours(u64),
442    /// Run every N days at specific time (hour, minute)
443    EveryDays(u64, u32, u32),
444    /// Run on specific day of week (0=Sunday) at specific time
445    Weekly(u32, u32, u32),
446    /// Run on specific day of month at specific time
447    Monthly(u32, u32, u32),
448}
449
450impl RecurringSchedule {
451    /// Calculate next run time from a given timestamp
452    pub fn next_run_from(&self, from: DateTime<Utc>) -> DateTime<Utc> {
453        match self {
454            RecurringSchedule::EverySeconds(secs) => from + chrono::Duration::seconds(*secs as i64),
455            RecurringSchedule::EveryMinutes(mins) => from + chrono::Duration::minutes(*mins as i64),
456            RecurringSchedule::EveryHours(hours) => from + chrono::Duration::hours(*hours as i64),
457            RecurringSchedule::EveryDays(days, hour, minute) => {
458                let mut next = from + chrono::Duration::days(*days as i64);
459                next = next
460                    .with_hour(*hour)
461                    .and_then(|dt| dt.with_minute(*minute))
462                    .and_then(|dt| dt.with_second(0))
463                    .unwrap_or(next);
464                if next <= from {
465                    next += chrono::Duration::days(1);
466                }
467                next
468            }
469            RecurringSchedule::Weekly(day_of_week, hour, minute) => {
470                let mut next = from;
471                let current_weekday = from.weekday().num_days_from_sunday();
472                let days_until = ((*day_of_week + 7 - current_weekday) % 7) as i64;
473                next += chrono::Duration::days(if days_until == 0 { 7 } else { days_until });
474                next = next
475                    .with_hour(*hour)
476                    .and_then(|dt| dt.with_minute(*minute))
477                    .and_then(|dt| dt.with_second(0))
478                    .unwrap_or(next);
479                next
480            }
481            RecurringSchedule::Monthly(day, hour, minute) => {
482                let mut next = from;
483                if let Some(dt) = next
484                    .with_day(*day)
485                    .and_then(|dt| dt.with_hour(*hour))
486                    .and_then(|dt| dt.with_minute(*minute))
487                    .and_then(|dt| dt.with_second(0))
488                {
489                    next = dt;
490                    if next <= from {
491                        // Move to next month
492                        next += chrono::Duration::days(30);
493                        next = next
494                            .with_day(*day)
495                            .and_then(|dt| dt.with_hour(*hour))
496                            .and_then(|dt| dt.with_minute(*minute))
497                            .and_then(|dt| dt.with_second(0))
498                            .unwrap_or(next);
499                    }
500                }
501                next
502            }
503        }
504    }
505}
506
507/// Advanced retry policy configuration
508#[derive(Debug, Clone, Serialize, Deserialize)]
509pub struct RetryPolicy {
510    /// Maximum number of retries
511    pub max_retries: u32,
512    /// Retry strategy
513    pub strategy: RetryStrategy,
514}
515
516/// Retry strategy types
517#[derive(Debug, Clone, Serialize, Deserialize)]
518pub enum RetryStrategy {
519    /// Fixed delay between retries (seconds)
520    Fixed(u64),
521    /// Linear backoff: delay = attempt * base_delay
522    Linear { base_delay_secs: u64 },
523    /// Exponential backoff: delay = base * (multiplier ^ attempt)
524    Exponential {
525        base_delay_secs: u64,
526        multiplier: f64,
527        max_delay_secs: u64,
528    },
529    /// Exponential backoff with jitter to avoid thundering herd
530    ExponentialWithJitter {
531        base_delay_secs: u64,
532        multiplier: f64,
533        max_delay_secs: u64,
534    },
535}
536
537impl RetryStrategy {
538    /// Calculate delay in seconds for a given retry attempt
539    pub fn calculate_delay(&self, attempt: u32) -> u64 {
540        match self {
541            RetryStrategy::Fixed(delay) => *delay,
542            RetryStrategy::Linear { base_delay_secs } => base_delay_secs * (attempt as u64 + 1),
543            RetryStrategy::Exponential {
544                base_delay_secs,
545                multiplier,
546                max_delay_secs,
547            } => {
548                let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
549                delay.min(*max_delay_secs as f64) as u64
550            }
551            RetryStrategy::ExponentialWithJitter {
552                base_delay_secs,
553                multiplier,
554                max_delay_secs,
555            } => {
556                let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
557                let max_delay = delay.min(*max_delay_secs as f64);
558                // Add random jitter (0-25% of delay)
559                let jitter = (max_delay * 0.25 * (attempt as f64 % 1.0).abs()) as u64;
560                (max_delay as u64).saturating_sub(jitter)
561            }
562        }
563    }
564}
565
566impl Default for RetryPolicy {
567    fn default() -> Self {
568        Self {
569            max_retries: 3,
570            strategy: RetryStrategy::ExponentialWithJitter {
571                base_delay_secs: 1,
572                multiplier: 2.0,
573                max_delay_secs: 300, // 5 minutes max
574            },
575        }
576    }
577}