1use async_trait::async_trait;
19use celers_core::{Broker, BrokerMessage, CelersError, Result, SerializedTask, TaskId};
20use chrono::{DateTime, Datelike, Timelike, Utc};
21use serde::{Deserialize, Serialize};
22use serde_json::json;
23use sqlx::{mysql::MySqlPoolOptions, MySqlPool, Row};
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::{Arc, RwLock};
26use std::time::Duration;
27use uuid::Uuid;
28
29#[cfg(feature = "metrics")]
30use celers_metrics::{
31 DLQ_SIZE, PROCESSING_QUEUE_SIZE, QUEUE_SIZE, TASKS_ENQUEUED_BY_TYPE, TASKS_ENQUEUED_TOTAL,
32};
33
34pub mod monitoring;
35pub mod utilities;
36
/// Lifecycle state of a task row in `celers_tasks`.
///
/// Serde (de)serializes variants in lowercase (`"pending"`, `"processing"`, ...), the same
/// spelling produced by the `Display` impl and accepted (case-insensitively) by `FromStr`.
/// `MysqlBroker::get_task` parses the `state` column into this type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DbTaskState {
    Pending,
    Processing,
    Completed,
    Failed,
    Cancelled,
}
47
48impl std::fmt::Display for DbTaskState {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 DbTaskState::Pending => write!(f, "pending"),
52 DbTaskState::Processing => write!(f, "processing"),
53 DbTaskState::Completed => write!(f, "completed"),
54 DbTaskState::Failed => write!(f, "failed"),
55 DbTaskState::Cancelled => write!(f, "cancelled"),
56 }
57 }
58}
59
60impl std::str::FromStr for DbTaskState {
61 type Err = CelersError;
62
63 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
64 match s.to_lowercase().as_str() {
65 "pending" => Ok(DbTaskState::Pending),
66 "processing" => Ok(DbTaskState::Processing),
67 "completed" => Ok(DbTaskState::Completed),
68 "failed" => Ok(DbTaskState::Failed),
69 "cancelled" => Ok(DbTaskState::Cancelled),
70 _ => Err(CelersError::Other(format!("Unknown task state: {}", s))),
71 }
72 }
73}
74
/// Detailed view of one task, as returned by `MysqlBroker::get_task`.
///
/// Each field is read from the `celers_tasks` column of the same name; `id` is parsed from
/// its string form and `state` via `DbTaskState::from_str`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskInfo {
    pub id: Uuid,
    pub task_name: String,
    pub state: DbTaskState,
    pub priority: i32,
    pub retry_count: i32,
    pub max_retries: i32,
    pub created_at: DateTime<Utc>,
    pub scheduled_at: DateTime<Utc>,
    /// `None` while the column is NULL (presumably before a worker starts the task).
    pub started_at: Option<DateTime<Utc>>,
    /// `None` while the column is NULL (presumably until the task finishes).
    pub completed_at: Option<DateTime<Utc>>,
    pub worker_id: Option<String>,
    pub error_message: Option<String>,
}
91
/// An entry in the dead-letter queue (DLQ).
///
/// `id` identifies the DLQ entry itself and `task_id` the original task. Presumably populated
/// from the table that the `move_to_dlq` stored procedure writes to (not visible here).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqTaskInfo {
    pub id: Uuid,
    pub task_id: Uuid,
    pub task_name: String,
    pub retry_count: i32,
    pub error_message: Option<String>,
    pub failed_at: DateTime<Utc>,
}
102
/// Health snapshot of the broker.
///
/// It contains:
/// - a `healthy` flag;
/// - connection-pool figures;
/// - task counts for the pending, processing and DLQ areas;
/// - the reported database server version string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    pub healthy: bool,
    pub connection_pool_size: u32,
    pub idle_connections: u32,
    pub pending_tasks: i64,
    pub processing_tasks: i64,
    pub dlq_tasks: i64,
    pub database_version: String,
}
114
/// Task counts per state for a queue, plus the DLQ size and a total.
///
/// `Default` yields all-zero counts. Whether `total` includes `dlq` depends on the query that
/// fills this struct (not visible here) — confirm before relying on it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStatistics {
    pub pending: i64,
    pub processing: i64,
    pub completed: i64,
    pub failed: i64,
    pub cancelled: i64,
    pub dlq: i64,
    pub total: i64,
}
126
/// Stored outcome of a task execution.
///
/// `result` holds an arbitrary JSON value. `error` and `traceback` describe failures.
/// `runtime_ms` is the execution time in milliseconds, per the field suffix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    pub task_id: Uuid,
    pub task_name: String,
    pub status: TaskResultStatus,
    pub result: Option<serde_json::Value>,
    pub error: Option<String>,
    pub traceback: Option<String>,
    pub created_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    pub runtime_ms: Option<i64>,
}
140
/// Status of a task result.
///
/// The names mirror Celery's `PENDING` / `STARTED` / `SUCCESS` / ... states.
///
/// Note the two spellings:
/// - serde (de)serializes variants in lowercase (`"success"`);
/// - `Display` emits uppercase (`"SUCCESS"`);
/// - `FromStr` accepts either, because it upper-cases its input before matching.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskResultStatus {
    Pending,
    Started,
    Success,
    Failure,
    Retry,
    Revoked,
}
152
153impl std::fmt::Display for TaskResultStatus {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 match self {
156 TaskResultStatus::Pending => write!(f, "PENDING"),
157 TaskResultStatus::Started => write!(f, "STARTED"),
158 TaskResultStatus::Success => write!(f, "SUCCESS"),
159 TaskResultStatus::Failure => write!(f, "FAILURE"),
160 TaskResultStatus::Retry => write!(f, "RETRY"),
161 TaskResultStatus::Revoked => write!(f, "REVOKED"),
162 }
163 }
164}
165
166impl std::str::FromStr for TaskResultStatus {
167 type Err = CelersError;
168
169 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
170 match s.to_uppercase().as_str() {
171 "PENDING" => Ok(TaskResultStatus::Pending),
172 "STARTED" => Ok(TaskResultStatus::Started),
173 "SUCCESS" => Ok(TaskResultStatus::Success),
174 "FAILURE" => Ok(TaskResultStatus::Failure),
175 "RETRY" => Ok(TaskResultStatus::Retry),
176 "REVOKED" => Ok(TaskResultStatus::Revoked),
177 _ => Err(CelersError::Other(format!("Unknown result status: {}", s))),
178 }
179 }
180}
181
/// Size information for one database table.
///
/// Gives the row count and the data and index sizes in bytes, per the field suffixes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSizeInfo {
    pub table_name: String,
    pub row_count: i64,
    pub data_size_bytes: i64,
    pub index_size_bytes: i64,
}
190
/// Task counts broken down by state for a single `task_name`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskNameCount {
    pub task_name: String,
    pub pending: i64,
    pub processing: i64,
    pub completed: i64,
    pub failed: i64,
    pub total: i64,
}
201
/// A task scheduled to run at `scheduled_at`.
///
/// `delay_remaining_secs` is the number of seconds until that time, per its name. How it is
/// computed (and whether it can be negative) is decided by the producing query — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTaskInfo {
    pub id: Uuid,
    pub task_name: String,
    pub priority: i32,
    pub scheduled_at: DateTime<Utc>,
    pub created_at: DateTime<Utc>,
    pub delay_remaining_secs: i64,
}
212
/// Connection-pool settings passed to `MySqlPoolOptions` by the `MysqlBroker` constructors.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on open connections (`MySqlPoolOptions::max_connections`).
    pub max_connections: u32,
    /// Connections the pool tries to keep open (`MySqlPoolOptions::min_connections`).
    pub min_connections: u32,
    /// Seconds to wait when acquiring a connection (`MySqlPoolOptions::acquire_timeout`).
    pub acquire_timeout_secs: u64,
    /// Maximum connection lifetime in seconds; `None` leaves the sqlx default untouched.
    pub max_lifetime_secs: Option<u64>,
    /// Idle timeout in seconds; `None` leaves the sqlx default untouched.
    pub idle_timeout_secs: Option<u64>,
}
227
228impl Default for PoolConfig {
229 fn default() -> Self {
230 Self {
231 max_connections: 20,
232 min_connections: 2,
233 acquire_timeout_secs: 5,
234 max_lifetime_secs: Some(1800), idle_timeout_secs: Some(600), }
237 }
238}
239
/// Aggregated timing statistics for a named query.
///
/// Times are in milliseconds, per the field suffixes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStats {
    pub query_name: String,
    pub execution_count: i64,
    pub total_time_ms: i64,
    pub avg_time_ms: f64,
    pub min_time_ms: i64,
    pub max_time_ms: i64,
}
250
/// Statistics for one index of a table.
///
/// Presumably `cardinality` and `unique_values` are read from MySQL index metadata — confirm
/// against the producing query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStats {
    pub table_name: String,
    pub index_name: String,
    pub cardinality: i64,
    pub unique_values: bool,
}
259
/// One row of a query execution plan.
///
/// The fields appear to mirror MySQL `EXPLAIN` columns:
/// - `id`, `select_type`, `table`;
/// - `query_type` corresponds to `type`;
/// - `possible_keys`;
/// - `key_used` corresponds to `key`;
/// - `key_length` corresponds to `key_len`;
/// - `rows_examined` corresponds to `rows`;
/// - `filtered`;
/// - `extra` corresponds to `Extra`.
///
/// This mapping is TODO: confirm against the code that builds it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
    pub id: i32,
    pub select_type: String,
    pub table: Option<String>,
    pub query_type: Option<String>,
    pub possible_keys: Option<String>,
    pub key_used: Option<String>,
    pub key_length: Option<String>,
    pub rows_examined: Option<i64>,
    pub filtered: Option<f64>,
    pub extra: Option<String>,
}
274
/// A migration recorded as applied.
///
/// `version` and `name` correspond to the values `MysqlBroker` inserts into
/// `celers_migrations` when it marks a migration as applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationInfo {
    pub version: String,
    pub name: String,
    pub applied_at: DateTime<Utc>,
}
282
/// Snapshot of connection-pool usage.
///
/// `pool_utilization_percent` is a percentage, per its name; `connection_wait_time_ms` is
/// optional and in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionDiagnostics {
    pub total_connections: u32,
    pub idle_connections: u32,
    pub active_connections: u32,
    pub max_connections: u32,
    pub connection_wait_time_ms: Option<i64>,
    pub pool_utilization_percent: f64,
}
293
/// Point-in-time performance snapshot: throughput, average enqueue/dequeue latency (ms),
/// queue/processing/DLQ sizes and pool diagnostics, stamped with `timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    pub timestamp: DateTime<Utc>,
    pub tasks_per_second: f64,
    pub avg_dequeue_time_ms: f64,
    pub avg_enqueue_time_ms: f64,
    pub queue_depth: i64,
    pub processing_tasks: i64,
    pub dlq_size: i64,
    pub connection_pool: ConnectionDiagnostics,
}
306
/// An ordered list of tasks with an optional delay (in seconds) between consecutive tasks.
///
/// Built with `TaskChain::new().then(..).with_delay(..)`. How the chain is enqueued is
/// decided by the broker code that consumes it (not visible here).
#[derive(Debug, Clone)]
pub struct TaskChain {
    tasks: Vec<SerializedTask>,
    delay_between_secs: Option<u64>,
}
313
314impl TaskChain {
315 pub fn new() -> Self {
317 Self {
318 tasks: Vec::new(),
319 delay_between_secs: None,
320 }
321 }
322
323 pub fn then(mut self, task: SerializedTask) -> Self {
325 self.tasks.push(task);
326 self
327 }
328
329 pub fn with_delay(mut self, delay_secs: u64) -> Self {
331 self.delay_between_secs = Some(delay_secs);
332 self
333 }
334
335 pub fn tasks(&self) -> &[SerializedTask] {
337 &self.tasks
338 }
339
340 pub fn delay_between_secs(&self) -> Option<u64> {
342 self.delay_between_secs
343 }
344}
345
346impl Default for TaskChain {
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
/// Per-worker activity summary.
///
/// It contains active/completed/failed task counts, when the worker was last seen, and its
/// average task duration in seconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatistics {
    pub worker_id: String,
    pub active_tasks: i64,
    pub completed_tasks: i64,
    pub failed_tasks: i64,
    pub last_seen: DateTime<Utc>,
    pub avg_task_duration_secs: f64,
}
362
/// One bucket of a task-age histogram.
///
/// Holds the bucket label, the number of tasks in it, and the age in seconds of the oldest
/// task in it. The bucket boundaries are defined by the producing query (not visible here).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskAgeDistribution {
    pub bucket_label: String,
    pub task_count: i64,
    pub oldest_task_age_secs: i64,
}
370
/// Retry counts aggregated per `task_name`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryStatistics {
    pub task_name: String,
    pub total_retries: i64,
    pub unique_tasks: i64,
    pub avg_retries_per_task: f64,
    pub max_retries_observed: i32,
}
380
/// Summary health view of the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueHealth {
    /// Free-form status label; the set of possible values is defined by the code that builds
    /// this struct (not visible here) — TODO document.
    pub overall_status: String,
    pub pending_tasks: i64,
    pub processing_tasks: i64,
    pub oldest_pending_age_secs: i64,
    pub active_workers: i64,
    pub queue_backlog_minutes: f64,
}
391
/// Completed/failed task counts over the last minute and hour, plus a tasks-per-second rate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskThroughput {
    pub completed_last_minute: i64,
    pub completed_last_hour: i64,
    pub failed_last_minute: i64,
    pub failed_last_hour: i64,
    pub tasks_per_second: f64,
}
401
/// Dead-letter-queue summary: total entries and a per-task-name breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqStatistics {
    pub total_tasks: i64,
    pub by_task_name: Vec<DlqTaskStats>,
}
408
/// DLQ counts for one `task_name`.
///
/// `avg_retries` is optional (presumably `None` when the aggregate is NULL).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqTaskStats {
    pub task_name: String,
    pub count: i64,
    pub avg_retries: Option<f64>,
    pub max_retries: i32,
}
417
/// Progress report for a running task.
///
/// It contains a percentage, an optional current step label, an optional total step count,
/// and the time of the last update.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskProgress {
    pub task_id: Uuid,
    pub progress_percent: f64,
    pub current_step: Option<String>,
    pub total_steps: Option<i32>,
    pub updated_at: DateTime<Utc>,
}
427
/// Rate-limit configuration for one `task_name`: per-second, per-minute and per-hour maxima.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimit {
    pub task_name: String,
    pub max_per_second: f64,
    pub max_per_minute: i64,
    pub max_per_hour: i64,
}
436
/// Observed rates for one `task_name`, and whether any configured limit is exceeded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitStatus {
    pub task_name: String,
    pub current_per_second: f64,
    pub current_per_minute: i64,
    pub current_per_hour: i64,
    pub limit_exceeded: bool,
}
446
/// Definition of a recurring task.
///
/// It contains the schedule, the raw payload bytes, the priority, an enabled flag, and the
/// last and next run times. `next_run` can be derived with `RecurringSchedule::next_run_from`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecurringTaskConfig {
    pub task_name: String,
    pub schedule: RecurringSchedule,
    pub payload: Vec<u8>,
    pub priority: i32,
    pub enabled: bool,
    pub last_run: Option<DateTime<Utc>>,
    pub next_run: DateTime<Utc>,
}
458
/// How often a recurring task runs; see `RecurringSchedule::next_run_from`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecurringSchedule {
    /// Fixed interval in seconds.
    EverySeconds(u64),
    /// Fixed interval in minutes.
    EveryMinutes(u64),
    /// Fixed interval in hours.
    EveryHours(u64),
    /// `(days, hour, minute)`: every `days` days at `hour:minute` UTC.
    EveryDays(u64, u32, u32),
    /// `(day_of_week, hour, minute)`: weekly at `hour:minute` UTC.
    ///
    /// `day_of_week` counts from Sunday = 0, matching `num_days_from_sunday`.
    Weekly(u32, u32, u32),
    /// `(day_of_month, hour, minute)`: monthly at `hour:minute` UTC.
    Monthly(u32, u32, u32),
}
475
476impl RecurringSchedule {
477 pub fn next_run_from(&self, from: DateTime<Utc>) -> DateTime<Utc> {
479 match self {
480 RecurringSchedule::EverySeconds(secs) => from + chrono::Duration::seconds(*secs as i64),
481 RecurringSchedule::EveryMinutes(mins) => from + chrono::Duration::minutes(*mins as i64),
482 RecurringSchedule::EveryHours(hours) => from + chrono::Duration::hours(*hours as i64),
483 RecurringSchedule::EveryDays(days, hour, minute) => {
484 let mut next = from + chrono::Duration::days(*days as i64);
485 next = next
486 .with_hour(*hour)
487 .and_then(|dt| dt.with_minute(*minute))
488 .and_then(|dt| dt.with_second(0))
489 .unwrap_or(next);
490 if next <= from {
491 next += chrono::Duration::days(1);
492 }
493 next
494 }
495 RecurringSchedule::Weekly(day_of_week, hour, minute) => {
496 let mut next = from;
497 let current_weekday = from.weekday().num_days_from_sunday();
498 let days_until = ((*day_of_week + 7 - current_weekday) % 7) as i64;
499 next += chrono::Duration::days(if days_until == 0 { 7 } else { days_until });
500 next = next
501 .with_hour(*hour)
502 .and_then(|dt| dt.with_minute(*minute))
503 .and_then(|dt| dt.with_second(0))
504 .unwrap_or(next);
505 next
506 }
507 RecurringSchedule::Monthly(day, hour, minute) => {
508 let mut next = from;
509 if let Some(dt) = next
510 .with_day(*day)
511 .and_then(|dt| dt.with_hour(*hour))
512 .and_then(|dt| dt.with_minute(*minute))
513 .and_then(|dt| dt.with_second(0))
514 {
515 next = dt;
516 if next <= from {
517 next += chrono::Duration::days(30);
519 next = next
520 .with_day(*day)
521 .and_then(|dt| dt.with_hour(*hour))
522 .and_then(|dt| dt.with_minute(*minute))
523 .and_then(|dt| dt.with_second(0))
524 .unwrap_or(next);
525 }
526 }
527 next
528 }
529 }
530 }
531}
532
/// Retry behavior for a task: how many retries are allowed and how long to wait between them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retries.
    pub max_retries: u32,
    /// Delay strategy; see `RetryStrategy::calculate_delay`.
    pub strategy: RetryStrategy,
}
541
/// Strategy for computing the delay (in seconds) before a retry.
///
/// See `RetryStrategy::calculate_delay`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryStrategy {
    /// Always wait the given number of seconds.
    Fixed(u64),
    /// Wait `base_delay_secs * (attempt + 1)` seconds.
    Linear { base_delay_secs: u64 },
    /// Wait `base_delay_secs * multiplier^attempt` seconds, capped at `max_delay_secs`.
    Exponential {
        base_delay_secs: u64,
        multiplier: f64,
        max_delay_secs: u64,
    },
    /// Like `Exponential`, with a jitter adjustment applied by `calculate_delay`.
    ExponentialWithJitter {
        base_delay_secs: u64,
        multiplier: f64,
        max_delay_secs: u64,
    },
}
562
563impl RetryStrategy {
564 pub fn calculate_delay(&self, attempt: u32) -> u64 {
566 match self {
567 RetryStrategy::Fixed(delay) => *delay,
568 RetryStrategy::Linear { base_delay_secs } => base_delay_secs * (attempt as u64 + 1),
569 RetryStrategy::Exponential {
570 base_delay_secs,
571 multiplier,
572 max_delay_secs,
573 } => {
574 let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
575 delay.min(*max_delay_secs as f64) as u64
576 }
577 RetryStrategy::ExponentialWithJitter {
578 base_delay_secs,
579 multiplier,
580 max_delay_secs,
581 } => {
582 let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
583 let max_delay = delay.min(*max_delay_secs as f64);
584 let jitter = (max_delay * 0.25 * (attempt as f64 % 1.0).abs()) as u64;
586 (max_delay as u64).saturating_sub(jitter)
587 }
588 }
589 }
590}
591
592impl Default for RetryPolicy {
593 fn default() -> Self {
594 Self {
595 max_retries: 3,
596 strategy: RetryStrategy::ExponentialWithJitter {
597 base_delay_secs: 1,
598 multiplier: 2.0,
599 max_delay_secs: 300, },
601 }
602 }
603}
604
/// State of the broker's circuit breaker.
///
/// `CircuitBreakerStateInternal::new` starts in `Closed`. The transition logic is not visible
/// here; presumably the conventional semantics apply (Closed = normal, Open = rejecting,
/// HalfOpen = probing) — TODO confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}
612
/// Public snapshot of the circuit breaker.
///
/// Its fields mirror `CircuitBreakerStateInternal`, without the configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitBreakerStats {
    pub state: CircuitBreakerState,
    pub failure_count: u64,
    pub success_count: u64,
    pub last_failure_time: Option<DateTime<Utc>>,
    pub last_state_change: DateTime<Utc>,
}
622
/// Circuit-breaker tuning.
///
/// The state machine that uses these values is not visible here; the field notes below
/// describe presumed semantics — TODO confirm.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Presumably the number of failures that opens the breaker.
    pub failure_threshold: u64,
    /// Presumably seconds to stay open before probing again.
    pub timeout_secs: u64,
    /// Presumably successes needed in half-open state to close again.
    pub success_threshold: u64,
}
633
634impl Default for CircuitBreakerConfig {
635 fn default() -> Self {
636 Self {
637 failure_threshold: 5,
638 timeout_secs: 60,
639 success_threshold: 2,
640 }
641 }
642}
643
/// Mutable circuit-breaker bookkeeping kept inside `MysqlBroker`.
///
/// It is shared behind `Arc<std::sync::RwLock<_>>`, together with the config that governs it.
#[derive(Debug, Clone)]
struct CircuitBreakerStateInternal {
    state: CircuitBreakerState,
    failure_count: u64,
    success_count: u64,
    last_failure_time: Option<DateTime<Utc>>,
    last_state_change: DateTime<Utc>,
    config: CircuitBreakerConfig,
}
654
655impl CircuitBreakerStateInternal {
656 fn new(config: CircuitBreakerConfig) -> Self {
657 Self {
658 state: CircuitBreakerState::Closed,
659 failure_count: 0,
660 success_count: 0,
661 last_failure_time: None,
662 last_state_change: Utc::now(),
663 config,
664 }
665 }
666}
667
/// A stored idempotency key.
///
/// Each record maps an `idempotency_key` for a `task_name` to the `task_id` it produced. It
/// is valid until `expires_at` and may carry optional JSON metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotencyRecord {
    pub id: Uuid,
    pub idempotency_key: String,
    pub task_name: String,
    pub task_id: Uuid,
    pub created_at: DateTime<Utc>,
    pub expires_at: DateTime<Utc>,
    pub metadata: Option<serde_json::Value>,
}
679
/// Idempotency-key counts for one `task_name`.
///
/// It holds total, unique, active and expired counts, plus the oldest and newest key
/// timestamps when present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotencyStats {
    pub task_name: String,
    pub total_keys: i64,
    pub unique_keys: i64,
    pub active_keys: i64,
    pub expired_keys: i64,
    pub oldest_key: Option<DateTime<Utc>>,
    pub newest_key: Option<DateTime<Utc>>,
}
691
/// Idempotency-key settings.
#[derive(Debug, Clone)]
pub struct IdempotencyConfig {
    /// Default key time-to-live in seconds (per the field name).
    pub default_ttl_secs: u64,
    /// Presumably whether expired keys are purged automatically — TODO confirm with the
    /// code that reads it.
    pub auto_cleanup: bool,
}
700
701impl Default for IdempotencyConfig {
702 fn default() -> Self {
703 Self {
704 default_ttl_secs: 86400, auto_cleanup: true,
706 }
707 }
708}
709
/// Lifecycle state of a workflow.
///
/// Serde uses the same lowercase names that `Display` writes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkflowState {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
}
720
721impl std::fmt::Display for WorkflowState {
722 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723 match self {
724 WorkflowState::Pending => write!(f, "pending"),
725 WorkflowState::Running => write!(f, "running"),
726 WorkflowState::Completed => write!(f, "completed"),
727 WorkflowState::Failed => write!(f, "failed"),
728 WorkflowState::Cancelled => write!(f, "cancelled"),
729 }
730 }
731}
732
/// Lifecycle state of a single workflow stage.
///
/// Serde uses the same lowercase names that `Display` writes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StageState {
    Pending,
    Running,
    Completed,
    Failed,
    Skipped,
}
743
744impl std::fmt::Display for StageState {
745 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
746 match self {
747 StageState::Pending => write!(f, "pending"),
748 StageState::Running => write!(f, "running"),
749 StageState::Completed => write!(f, "completed"),
750 StageState::Failed => write!(f, "failed"),
751 StageState::Skipped => write!(f, "skipped"),
752 }
753 }
754}
755
/// A workflow instance.
///
/// It contains the workflow's name, state, JSON config, lifecycle timestamps, and an optional
/// error message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Workflow {
    pub id: Uuid,
    pub workflow_name: String,
    pub state: WorkflowState,
    pub config: serde_json::Value,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error_message: Option<String>,
}
768
/// One stage of a workflow, identified by `stage_number` within `workflow_id`.
///
/// It tracks the stage's task count and how many of those tasks completed or failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStage {
    pub id: Uuid,
    pub workflow_id: Uuid,
    pub stage_number: i32,
    pub stage_name: String,
    pub state: StageState,
    pub task_count: i32,
    pub completed_count: i32,
    pub failed_count: i32,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
}
783
/// Dependency edge: `task_id` depends on `parent_task_id`.
///
/// The edge is optionally scoped to a workflow and stage. `satisfied` / `satisfied_at` record
/// when the dependency was met.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDependency {
    pub id: Uuid,
    pub task_id: Uuid,
    pub parent_task_id: Uuid,
    pub workflow_id: Option<Uuid>,
    pub stage_id: Option<Uuid>,
    pub satisfied: bool,
    pub created_at: DateTime<Utc>,
    pub satisfied_at: Option<DateTime<Utc>>,
}
796
/// Aggregated progress of one workflow.
///
/// It holds stage and task counts by outcome, plus an optional duration in seconds
/// (presumably `None` until the workflow has both started and completed — TODO confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStatistics {
    pub workflow_id: Uuid,
    pub workflow_name: String,
    pub workflow_state: WorkflowState,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub total_stages: i64,
    pub completed_stages: i64,
    pub failed_stages: i64,
    pub running_stages: i64,
    pub total_tasks: i64,
    pub completed_tasks: i64,
    pub failed_tasks: i64,
    pub duration_secs: Option<i64>,
}
815
/// Fluent builder for a multi-stage workflow.
///
/// Stages are appended in order; tasks and dependencies are attached to the most recently
/// added stage.
#[derive(Debug, Clone)]
pub struct WorkflowBuilder {
    workflow_name: String,
    stages: Vec<WorkflowStageBuilder>,
}
822
/// A stage under construction inside a `WorkflowBuilder`.
#[derive(Debug, Clone)]
pub struct WorkflowStageBuilder {
    stage_name: String,
    tasks: Vec<SerializedTask>,
    /// Names of stages this stage depends on (presumably other `stage_name`s — TODO confirm).
    dependencies: Vec<String>,
}
830
831impl WorkflowBuilder {
832 pub fn new(workflow_name: String) -> Self {
834 Self {
835 workflow_name,
836 stages: Vec::new(),
837 }
838 }
839
840 pub fn add_stage(mut self, stage_name: String) -> Self {
842 self.stages.push(WorkflowStageBuilder {
843 stage_name,
844 tasks: Vec::new(),
845 dependencies: Vec::new(),
846 });
847 self
848 }
849
850 pub fn add_task_to_stage(mut self, task: SerializedTask) -> Self {
852 if let Some(stage) = self.stages.last_mut() {
853 stage.tasks.push(task);
854 }
855 self
856 }
857
858 pub fn add_stage_dependencies(mut self, dependencies: Vec<String>) -> Self {
860 if let Some(stage) = self.stages.last_mut() {
861 stage.dependencies = dependencies;
862 }
863 self
864 }
865
866 pub fn workflow_name(&self) -> &str {
868 &self.workflow_name
869 }
870
871 pub fn stages(&self) -> &[WorkflowStageBuilder] {
873 &self.stages
874 }
875}
876
877impl WorkflowStageBuilder {
878 pub fn stage_name(&self) -> &str {
880 &self.stage_name
881 }
882
883 pub fn tasks(&self) -> &[SerializedTask] {
885 &self.tasks
886 }
887
888 pub fn dependencies(&self) -> &[String] {
890 &self.dependencies
891 }
892}
893
/// An asynchronous lifecycle hook.
///
/// It is called with the hook context and the task, and returns a boxed `Send` future that
/// resolves to `Result<()>`. Hooks are shared via `Arc` and must be `Send + Sync`.
/// `TaskHooks` awaits hooks one at a time and stops a stage at the first `Err`.
pub type HookFn = Arc<
    dyn Fn(
            &HookContext,
            &SerializedTask,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
        + Send
        + Sync,
>;
907
/// Context passed to every hook invocation.
#[derive(Debug, Clone)]
pub struct HookContext {
    /// Name of the queue the hook fires for.
    pub queue_name: String,
    /// Task ID, when known at this point (presumably `None` before one is assigned).
    pub task_id: Option<Uuid>,
    /// When the hook was triggered.
    pub timestamp: DateTime<Utc>,
    /// Free-form JSON metadata.
    pub metadata: serde_json::Value,
}
920
/// A hook tagged with the lifecycle stage it belongs to.
///
/// `TaskHooks::add` routes each variant to the matching list. `Debug` is not derived because
/// `HookFn` (a trait object) does not implement it.
#[derive(Clone)]
pub enum TaskHook {
    BeforeEnqueue(HookFn),
    AfterEnqueue(HookFn),
    BeforeDequeue(HookFn),
    AfterDequeue(HookFn),
    BeforeAck(HookFn),
    AfterAck(HookFn),
    BeforeReject(HookFn),
    AfterReject(HookFn),
}
941
/// Registry of hooks, one ordered list per lifecycle stage.
///
/// `Default` yields an empty registry. Cloning copies the `Arc`s, not the closures.
#[derive(Clone, Default)]
pub struct TaskHooks {
    before_enqueue: Vec<HookFn>,
    after_enqueue: Vec<HookFn>,
    before_dequeue: Vec<HookFn>,
    after_dequeue: Vec<HookFn>,
    before_ack: Vec<HookFn>,
    after_ack: Vec<HookFn>,
    before_reject: Vec<HookFn>,
    after_reject: Vec<HookFn>,
}
954
955impl TaskHooks {
956 pub fn new() -> Self {
958 Self::default()
959 }
960
961 pub fn add(&mut self, hook: TaskHook) {
963 match hook {
964 TaskHook::BeforeEnqueue(f) => self.before_enqueue.push(f),
965 TaskHook::AfterEnqueue(f) => self.after_enqueue.push(f),
966 TaskHook::BeforeDequeue(f) => self.before_dequeue.push(f),
967 TaskHook::AfterDequeue(f) => self.after_dequeue.push(f),
968 TaskHook::BeforeAck(f) => self.before_ack.push(f),
969 TaskHook::AfterAck(f) => self.after_ack.push(f),
970 TaskHook::BeforeReject(f) => self.before_reject.push(f),
971 TaskHook::AfterReject(f) => self.after_reject.push(f),
972 }
973 }
974
975 pub fn clear(&mut self) {
977 self.before_enqueue.clear();
978 self.after_enqueue.clear();
979 self.before_dequeue.clear();
980 self.after_dequeue.clear();
981 self.before_ack.clear();
982 self.after_ack.clear();
983 self.before_reject.clear();
984 self.after_reject.clear();
985 }
986
987 async fn run_before_enqueue(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
989 for hook in &self.before_enqueue {
990 hook(ctx, task).await?;
991 }
992 Ok(())
993 }
994
995 async fn run_after_enqueue(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
997 for hook in &self.after_enqueue {
998 hook(ctx, task).await?;
999 }
1000 Ok(())
1001 }
1002
1003 #[allow(dead_code)]
1005 async fn run_before_ack(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
1006 for hook in &self.before_ack {
1007 hook(ctx, task).await?;
1008 }
1009 Ok(())
1010 }
1011
1012 #[allow(dead_code)]
1014 async fn run_after_ack(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
1015 for hook in &self.after_ack {
1016 hook(ctx, task).await?;
1017 }
1018 Ok(())
1019 }
1020
1021 #[allow(dead_code)]
1023 async fn run_before_reject(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
1024 for hook in &self.before_reject {
1025 hook(ctx, task).await?;
1026 }
1027 Ok(())
1028 }
1029
1030 #[allow(dead_code)]
1032 async fn run_after_reject(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
1033 for hook in &self.after_reject {
1034 hook(ctx, task).await?;
1035 }
1036 Ok(())
1037 }
1038
1039 #[allow(dead_code)]
1041 async fn run_after_dequeue(&self, ctx: &HookContext, task: &SerializedTask) -> Result<()> {
1042 for hook in &self.after_dequeue {
1043 hook(ctx, task).await?;
1044 }
1045 Ok(())
1046 }
1047}
1048
/// Distributed-tracing context in W3C Trace Context form.
///
/// It round-trips through `traceparent` strings (`00-<trace_id>-<span_id>-<trace_flags>`)
/// via `from_traceparent` / `to_traceparent`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TraceContext {
    /// Trace identifier (hex string).
    pub trace_id: String,
    /// Span (parent) identifier (hex string).
    pub span_id: String,
    /// Trace flags as a hex string; defaults to `"01"` when missing during deserialization.
    #[serde(default = "default_trace_flags")]
    pub trace_flags: String,
    /// Optional vendor `tracestate`; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_state: Option<String>,
}
1093
/// Default trace flags, `"01"`.
///
/// Used by serde when `trace_flags` is absent and by `TraceContext::new`.
fn default_trace_flags() -> String {
    String::from("01")
}
1097
1098impl TraceContext {
1099 pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
1116 Self {
1117 trace_id: trace_id.into(),
1118 span_id: span_id.into(),
1119 trace_flags: default_trace_flags(),
1120 trace_state: None,
1121 }
1122 }
1123
1124 pub fn from_traceparent(traceparent: &str) -> Result<Self> {
1139 let parts: Vec<&str> = traceparent.split('-').collect();
1140 if parts.len() != 4 || parts[0] != "00" {
1141 return Err(CelersError::Other(format!(
1142 "Invalid traceparent format: {}",
1143 traceparent
1144 )));
1145 }
1146
1147 Ok(Self {
1148 trace_id: parts[1].to_string(),
1149 span_id: parts[2].to_string(),
1150 trace_flags: parts[3].to_string(),
1151 trace_state: None,
1152 })
1153 }
1154
1155 pub fn to_traceparent(&self) -> String {
1171 format!("00-{}-{}-{}", self.trace_id, self.span_id, self.trace_flags)
1172 }
1173
1174 pub fn is_sampled(&self) -> bool {
1176 self.trace_flags == "01"
1177 }
1178
1179 pub fn create_child_span(&self) -> Self {
1196 let span_id = format!(
1197 "{:016x}",
1198 uuid::Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF
1199 );
1200 Self {
1201 trace_id: self.trace_id.clone(),
1202 span_id,
1203 trace_flags: self.trace_flags.clone(),
1204 trace_state: self.trace_state.clone(),
1205 }
1206 }
1207}
1208
/// MySQL-backed task broker bound to one named queue.
pub struct MysqlBroker {
    /// sqlx connection pool used for every query.
    pool: MySqlPool,
    /// Queue this broker operates on (`"default"` when built via `new`).
    queue_name: String,
    /// Pause flag toggled by `pause` / `resume` and read by `is_paused`.
    paused: AtomicBool,
    /// Circuit-breaker bookkeeping behind a `std::sync::RwLock`.
    circuit_breaker: Arc<RwLock<CircuitBreakerStateInternal>>,
    /// Lifecycle hooks behind an async (`tokio`) `RwLock`.
    hooks: Arc<tokio::sync::RwLock<TaskHooks>>,
}
1217
1218impl MysqlBroker {
1219 pub async fn new(database_url: &str) -> Result<Self> {
1225 Self::with_queue(database_url, "default").await
1226 }
1227
1228 pub async fn with_queue(database_url: &str, queue_name: &str) -> Result<Self> {
1230 Self::with_config(database_url, queue_name, PoolConfig::default()).await
1231 }
1232
1233 pub async fn with_config(
1235 database_url: &str,
1236 queue_name: &str,
1237 config: PoolConfig,
1238 ) -> Result<Self> {
1239 let mut pool_options = MySqlPoolOptions::new()
1240 .max_connections(config.max_connections)
1241 .min_connections(config.min_connections)
1242 .acquire_timeout(Duration::from_secs(config.acquire_timeout_secs));
1243
1244 if let Some(max_lifetime) = config.max_lifetime_secs {
1245 pool_options = pool_options.max_lifetime(Duration::from_secs(max_lifetime));
1246 }
1247
1248 if let Some(idle_timeout) = config.idle_timeout_secs {
1249 pool_options = pool_options.idle_timeout(Duration::from_secs(idle_timeout));
1250 }
1251
1252 let pool = pool_options
1253 .connect(database_url)
1254 .await
1255 .map_err(|e| CelersError::Other(format!("Failed to connect to database: {}", e)))?;
1256
1257 Ok(Self {
1258 pool,
1259 queue_name: queue_name.to_string(),
1260 paused: AtomicBool::new(false),
1261 circuit_breaker: Arc::new(RwLock::new(CircuitBreakerStateInternal::new(
1262 CircuitBreakerConfig::default(),
1263 ))),
1264 hooks: Arc::new(tokio::sync::RwLock::new(TaskHooks::new())),
1265 })
1266 }
1267
1268 pub async fn with_circuit_breaker_config(
1270 database_url: &str,
1271 queue_name: &str,
1272 pool_config: PoolConfig,
1273 circuit_breaker_config: CircuitBreakerConfig,
1274 ) -> Result<Self> {
1275 let mut pool_options = MySqlPoolOptions::new()
1276 .max_connections(pool_config.max_connections)
1277 .min_connections(pool_config.min_connections)
1278 .acquire_timeout(Duration::from_secs(pool_config.acquire_timeout_secs));
1279
1280 if let Some(max_lifetime) = pool_config.max_lifetime_secs {
1281 pool_options = pool_options.max_lifetime(Duration::from_secs(max_lifetime));
1282 }
1283
1284 if let Some(idle_timeout) = pool_config.idle_timeout_secs {
1285 pool_options = pool_options.idle_timeout(Duration::from_secs(idle_timeout));
1286 }
1287
1288 let pool = pool_options
1289 .connect(database_url)
1290 .await
1291 .map_err(|e| CelersError::Other(format!("Failed to connect to database: {}", e)))?;
1292
1293 Ok(Self {
1294 pool,
1295 queue_name: queue_name.to_string(),
1296 paused: AtomicBool::new(false),
1297 circuit_breaker: Arc::new(RwLock::new(CircuitBreakerStateInternal::new(
1298 circuit_breaker_config,
1299 ))),
1300 hooks: Arc::new(tokio::sync::RwLock::new(TaskHooks::new())),
1301 })
1302 }
1303
1304 pub async fn migrate(&self) -> Result<()> {
1306 self.run_migration_untracked(include_str!("../migrations/000_migrations.sql"))
1308 .await?;
1309
1310 self.run_migration_tracked(
1312 "001",
1313 "initial_schema",
1314 include_str!("../migrations/001_init.sql"),
1315 )
1316 .await?;
1317
1318 self.run_migration_tracked(
1319 "002",
1320 "results_table",
1321 include_str!("../migrations/002_results.sql"),
1322 )
1323 .await?;
1324
1325 self.run_migration_tracked(
1326 "003",
1327 "performance_indexes",
1328 include_str!("../migrations/003_performance_indexes.sql"),
1329 )
1330 .await?;
1331
1332 self.run_migration_tracked(
1333 "006",
1334 "idempotency_keys",
1335 include_str!("../migrations/006_idempotency.sql"),
1336 )
1337 .await?;
1338
1339 self.run_migration_tracked(
1340 "007",
1341 "workflow_dag",
1342 include_str!("../migrations/007_workflow.sql"),
1343 )
1344 .await?;
1345
1346 self.run_migration_tracked(
1347 "008",
1348 "production_features",
1349 include_str!("../migrations/008_production_features.sql"),
1350 )
1351 .await?;
1352
1353 Ok(())
1354 }
1355
1356 async fn is_migration_applied(&self, version: &str) -> Result<bool> {
1358 let count: i64 =
1359 sqlx::query_scalar("SELECT COUNT(*) FROM celers_migrations WHERE version = ?")
1360 .bind(version)
1361 .fetch_one(&self.pool)
1362 .await
1363 .map_err(|e| {
1364 CelersError::Other(format!("Failed to check migration status: {}", e))
1365 })?;
1366
1367 Ok(count > 0)
1368 }
1369
1370 async fn mark_migration_applied(&self, version: &str, name: &str) -> Result<()> {
1372 sqlx::query("INSERT INTO celers_migrations (version, name) VALUES (?, ?)")
1373 .bind(version)
1374 .bind(name)
1375 .execute(&self.pool)
1376 .await
1377 .map_err(|e| {
1378 CelersError::Other(format!("Failed to mark migration as applied: {}", e))
1379 })?;
1380
1381 tracing::info!(version = %version, name = %name, "Migration applied");
1382 Ok(())
1383 }
1384
1385 async fn run_migration_tracked(
1387 &self,
1388 version: &str,
1389 name: &str,
1390 migration_sql: &str,
1391 ) -> Result<()> {
1392 if self.is_migration_applied(version).await? {
1394 tracing::debug!(version = %version, name = %name, "Migration already applied, skipping");
1395 return Ok(());
1396 }
1397
1398 self.run_migration_untracked(migration_sql).await?;
1400
1401 self.mark_migration_applied(version, name).await?;
1403
1404 Ok(())
1405 }
1406
1407 async fn run_migration_untracked(&self, migration_sql: &str) -> Result<()> {
1409 self.run_migration(migration_sql).await
1410 }
1411
1412 async fn run_migration(&self, migration_sql: &str) -> Result<()> {
1414 let statements: Vec<&str> = migration_sql.split("DELIMITER //").collect();
1417
1418 if let Some(main_sql) = statements.first() {
1420 for statement in main_sql.split(';') {
1421 let trimmed = statement.trim();
1422 if !trimmed.is_empty() && !trimmed.starts_with("--") {
1423 sqlx::query(trimmed)
1424 .execute(&self.pool)
1425 .await
1426 .map_err(|e| CelersError::Other(format!("Migration failed: {}", e)))?;
1427 }
1428 }
1429 }
1430
1431 if statements.len() > 1 {
1433 let proc_section = statements[1];
1434 if let Some(proc_sql) = proc_section.split("DELIMITER ;").next() {
1435 let trimmed = proc_sql.trim();
1436 if !trimmed.is_empty() {
1437 sqlx::query(trimmed)
1438 .execute(&self.pool)
1439 .await
1440 .map_err(|e| {
1441 CelersError::Other(format!("Stored procedure creation failed: {}", e))
1442 })?;
1443 }
1444 }
1445 }
1446
1447 Ok(())
1448 }
1449
1450 pub fn pool(&self) -> &MySqlPool {
1452 &self.pool
1453 }
1454
1455 async fn move_to_dlq(&self, task_id: &TaskId) -> Result<()> {
1457 sqlx::query("CALL move_to_dlq(?)")
1458 .bind(task_id.to_string())
1459 .execute(&self.pool)
1460 .await
1461 .map_err(|e| CelersError::Other(format!("Failed to move task to DLQ: {}", e)))?;
1462
1463 Ok(())
1464 }
1465
1466 pub fn pause(&self) {
1470 self.paused.store(true, Ordering::SeqCst);
1471 tracing::info!(queue = %self.queue_name, "Queue paused");
1472 }
1473
1474 pub fn resume(&self) {
1476 self.paused.store(false, Ordering::SeqCst);
1477 tracing::info!(queue = %self.queue_name, "Queue resumed");
1478 }
1479
1480 pub fn is_paused(&self) -> bool {
1482 self.paused.load(Ordering::SeqCst)
1483 }
1484
1485 pub async fn get_task(&self, task_id: &TaskId) -> Result<Option<TaskInfo>> {
1489 let row = sqlx::query(
1490 r#"
1491 SELECT id, task_name, state, priority, retry_count, max_retries,
1492 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
1493 FROM celers_tasks
1494 WHERE id = ?
1495 "#,
1496 )
1497 .bind(task_id.to_string())
1498 .fetch_optional(&self.pool)
1499 .await
1500 .map_err(|e| CelersError::Other(format!("Failed to get task: {}", e)))?;
1501
1502 match row {
1503 Some(row) => {
1504 let task_id_str: String = row.get("id");
1505 let state_str: String = row.get("state");
1506 Ok(Some(TaskInfo {
1507 id: Uuid::parse_str(&task_id_str)
1508 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
1509 task_name: row.get("task_name"),
1510 state: state_str.parse()?,
1511 priority: row.get("priority"),
1512 retry_count: row.get("retry_count"),
1513 max_retries: row.get("max_retries"),
1514 created_at: row.get("created_at"),
1515 scheduled_at: row.get("scheduled_at"),
1516 started_at: row.get("started_at"),
1517 completed_at: row.get("completed_at"),
1518 worker_id: row.get("worker_id"),
1519 error_message: row.get("error_message"),
1520 }))
1521 }
1522 None => Ok(None),
1523 }
1524 }
1525
1526 pub async fn list_tasks(
1528 &self,
1529 state: Option<DbTaskState>,
1530 limit: i64,
1531 offset: i64,
1532 ) -> Result<Vec<TaskInfo>> {
1533 let rows = match state {
1534 Some(s) => {
1535 sqlx::query(
1536 r#"
1537 SELECT id, task_name, state, priority, retry_count, max_retries,
1538 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
1539 FROM celers_tasks
1540 WHERE state = ?
1541 ORDER BY created_at DESC
1542 LIMIT ? OFFSET ?
1543 "#,
1544 )
1545 .bind(s.to_string())
1546 .bind(limit)
1547 .bind(offset)
1548 .fetch_all(&self.pool)
1549 .await
1550 }
1551 None => {
1552 sqlx::query(
1553 r#"
1554 SELECT id, task_name, state, priority, retry_count, max_retries,
1555 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
1556 FROM celers_tasks
1557 ORDER BY created_at DESC
1558 LIMIT ? OFFSET ?
1559 "#,
1560 )
1561 .bind(limit)
1562 .bind(offset)
1563 .fetch_all(&self.pool)
1564 .await
1565 }
1566 }
1567 .map_err(|e| CelersError::Other(format!("Failed to list tasks: {}", e)))?;
1568
1569 let mut tasks = Vec::with_capacity(rows.len());
1570 for row in rows {
1571 let task_id_str: String = row.get("id");
1572 let state_str: String = row.get("state");
1573 tasks.push(TaskInfo {
1574 id: Uuid::parse_str(&task_id_str)
1575 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
1576 task_name: row.get("task_name"),
1577 state: state_str.parse()?,
1578 priority: row.get("priority"),
1579 retry_count: row.get("retry_count"),
1580 max_retries: row.get("max_retries"),
1581 created_at: row.get("created_at"),
1582 scheduled_at: row.get("scheduled_at"),
1583 started_at: row.get("started_at"),
1584 completed_at: row.get("completed_at"),
1585 worker_id: row.get("worker_id"),
1586 error_message: row.get("error_message"),
1587 });
1588 }
1589 Ok(tasks)
1590 }
1591
1592 pub async fn get_statistics(&self) -> Result<QueueStatistics> {
1594 let row = sqlx::query(
1596 r#"
1597 SELECT
1598 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending,
1599 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing,
1600 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed,
1601 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed,
1602 SUM(CASE WHEN state = 'cancelled' THEN 1 ELSE 0 END) as cancelled,
1603 COUNT(*) as total
1604 FROM celers_tasks
1605 "#,
1606 )
1607 .fetch_one(&self.pool)
1608 .await
1609 .map_err(|e| CelersError::Other(format!("Failed to get statistics: {}", e)))?;
1610
1611 let dlq_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM celers_dead_letter_queue")
1612 .fetch_one(&self.pool)
1613 .await
1614 .map_err(|e| CelersError::Other(format!("Failed to get DLQ count: {}", e)))?;
1615
1616 let pending: Option<rust_decimal::Decimal> = row.get("pending");
1618 let processing: Option<rust_decimal::Decimal> = row.get("processing");
1619 let completed: Option<rust_decimal::Decimal> = row.get("completed");
1620 let failed: Option<rust_decimal::Decimal> = row.get("failed");
1621 let cancelled: Option<rust_decimal::Decimal> = row.get("cancelled");
1622 let total: i64 = row.get("total");
1623
1624 Ok(QueueStatistics {
1625 pending: pending
1626 .map(|d| d.to_string().parse().unwrap_or(0))
1627 .unwrap_or(0),
1628 processing: processing
1629 .map(|d| d.to_string().parse().unwrap_or(0))
1630 .unwrap_or(0),
1631 completed: completed
1632 .map(|d| d.to_string().parse().unwrap_or(0))
1633 .unwrap_or(0),
1634 failed: failed
1635 .map(|d| d.to_string().parse().unwrap_or(0))
1636 .unwrap_or(0),
1637 cancelled: cancelled
1638 .map(|d| d.to_string().parse().unwrap_or(0))
1639 .unwrap_or(0),
1640 dlq: dlq_count,
1641 total,
1642 })
1643 }
1644
1645 pub async fn list_dlq(&self, limit: i64, offset: i64) -> Result<Vec<DlqTaskInfo>> {
1649 let rows = sqlx::query(
1650 r#"
1651 SELECT id, task_id, task_name, retry_count, error_message, failed_at
1652 FROM celers_dead_letter_queue
1653 ORDER BY failed_at DESC
1654 LIMIT ? OFFSET ?
1655 "#,
1656 )
1657 .bind(limit)
1658 .bind(offset)
1659 .fetch_all(&self.pool)
1660 .await
1661 .map_err(|e| CelersError::Other(format!("Failed to list DLQ: {}", e)))?;
1662
1663 let mut tasks = Vec::with_capacity(rows.len());
1664 for row in rows {
1665 let id_str: String = row.get("id");
1666 let task_id_str: String = row.get("task_id");
1667 tasks.push(DlqTaskInfo {
1668 id: Uuid::parse_str(&id_str)
1669 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
1670 task_id: Uuid::parse_str(&task_id_str)
1671 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
1672 task_name: row.get("task_name"),
1673 retry_count: row.get("retry_count"),
1674 error_message: row.get("error_message"),
1675 failed_at: row.get("failed_at"),
1676 });
1677 }
1678 Ok(tasks)
1679 }
1680
1681 pub async fn requeue_from_dlq(&self, dlq_id: &Uuid) -> Result<TaskId> {
1685 let mut tx = self
1686 .pool
1687 .begin()
1688 .await
1689 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
1690
1691 let row = sqlx::query(
1693 r#"
1694 SELECT task_id, task_name, payload, metadata
1695 FROM celers_dead_letter_queue
1696 WHERE id = ?
1697 "#,
1698 )
1699 .bind(dlq_id.to_string())
1700 .fetch_optional(&mut *tx)
1701 .await
1702 .map_err(|e| CelersError::Other(format!("Failed to fetch DLQ task: {}", e)))?;
1703
1704 let row = row.ok_or_else(|| CelersError::Other("DLQ task not found".to_string()))?;
1705
1706 let task_id_str: String = row.get("task_id");
1707 let task_id = Uuid::parse_str(&task_id_str)
1708 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
1709 let task_name: String = row.get("task_name");
1710 let payload: Vec<u8> = row.get("payload");
1711 let metadata: Option<String> = row.get("metadata");
1712
1713 let new_task_id = Uuid::new_v4();
1715 sqlx::query(
1716 r#"
1717 INSERT INTO celers_tasks
1718 (id, task_name, payload, state, priority, retry_count, max_retries, metadata, created_at, scheduled_at)
1719 VALUES (?, ?, ?, 'pending', 0, 0, 3, ?, NOW(), NOW())
1720 "#,
1721 )
1722 .bind(new_task_id.to_string())
1723 .bind(&task_name)
1724 .bind(&payload)
1725 .bind(metadata)
1726 .execute(&mut *tx)
1727 .await
1728 .map_err(|e| CelersError::Other(format!("Failed to requeue task: {}", e)))?;
1729
1730 sqlx::query("DELETE FROM celers_dead_letter_queue WHERE id = ?")
1732 .bind(dlq_id.to_string())
1733 .execute(&mut *tx)
1734 .await
1735 .map_err(|e| CelersError::Other(format!("Failed to delete from DLQ: {}", e)))?;
1736
1737 tx.commit()
1738 .await
1739 .map_err(|e| CelersError::Other(format!("Failed to commit requeue: {}", e)))?;
1740
1741 tracing::info!(original_task_id = %task_id, new_task_id = %new_task_id, task_name = %task_name, "Requeued task from DLQ");
1742
1743 Ok(new_task_id)
1744 }
1745
1746 pub async fn purge_dlq(&self, dlq_id: &Uuid) -> Result<bool> {
1748 let result = sqlx::query("DELETE FROM celers_dead_letter_queue WHERE id = ?")
1749 .bind(dlq_id.to_string())
1750 .execute(&self.pool)
1751 .await
1752 .map_err(|e| CelersError::Other(format!("Failed to purge DLQ task: {}", e)))?;
1753
1754 Ok(result.rows_affected() > 0)
1755 }
1756
1757 pub async fn purge_all_dlq(&self) -> Result<u64> {
1759 let result = sqlx::query("DELETE FROM celers_dead_letter_queue")
1760 .execute(&self.pool)
1761 .await
1762 .map_err(|e| CelersError::Other(format!("Failed to purge all DLQ: {}", e)))?;
1763
1764 tracing::info!(count = result.rows_affected(), "Purged all DLQ tasks");
1765 Ok(result.rows_affected())
1766 }
1767
1768 pub async fn check_health(&self) -> Result<HealthStatus> {
1772 let version: String = sqlx::query_scalar("SELECT VERSION()")
1774 .fetch_one(&self.pool)
1775 .await
1776 .map_err(|e| CelersError::Other(format!("Health check failed: {}", e)))?;
1777
1778 let stats = self.get_statistics().await?;
1780
1781 Ok(HealthStatus {
1782 healthy: true,
1783 connection_pool_size: self.pool.options().get_max_connections(),
1784 idle_connections: self.pool.num_idle() as u32,
1785 pending_tasks: stats.pending,
1786 processing_tasks: stats.processing,
1787 dlq_tasks: stats.dlq,
1788 database_version: version,
1789 })
1790 }
1791
1792 pub async fn archive_completed_tasks(&self, older_than: Duration) -> Result<u64> {
1796 let cutoff = Utc::now() - chrono::Duration::seconds(older_than.as_secs() as i64);
1797 let cutoff_str = cutoff.format("%Y-%m-%d %H:%M:%S").to_string();
1798
1799 let result = sqlx::query(
1800 r#"
1801 DELETE FROM celers_tasks
1802 WHERE state IN ('completed', 'failed', 'cancelled')
1803 AND completed_at < ?
1804 "#,
1805 )
1806 .bind(cutoff_str)
1807 .execute(&self.pool)
1808 .await
1809 .map_err(|e| CelersError::Other(format!("Failed to archive tasks: {}", e)))?;
1810
1811 tracing::info!(count = result.rows_affected(), cutoff = %cutoff, "Archived completed tasks");
1812 Ok(result.rows_affected())
1813 }
1814
1815 pub async fn recover_stuck_tasks(&self, stuck_threshold: Duration) -> Result<u64> {
1819 let cutoff = Utc::now() - chrono::Duration::seconds(stuck_threshold.as_secs() as i64);
1820 let cutoff_str = cutoff.format("%Y-%m-%d %H:%M:%S").to_string();
1821
1822 let result = sqlx::query(
1823 r#"
1824 UPDATE celers_tasks
1825 SET state = 'pending',
1826 started_at = NULL,
1827 worker_id = NULL,
1828 error_message = 'Recovered from stuck processing state'
1829 WHERE state = 'processing'
1830 AND started_at < ?
1831 "#,
1832 )
1833 .bind(cutoff_str)
1834 .execute(&self.pool)
1835 .await
1836 .map_err(|e| CelersError::Other(format!("Failed to recover stuck tasks: {}", e)))?;
1837
1838 if result.rows_affected() > 0 {
1839 tracing::warn!(
1840 count = result.rows_affected(),
1841 "Recovered stuck processing tasks"
1842 );
1843 }
1844 Ok(result.rows_affected())
1845 }
1846
1847 pub async fn purge_all(&self) -> Result<u64> {
1849 let result = sqlx::query("DELETE FROM celers_tasks")
1850 .execute(&self.pool)
1851 .await
1852 .map_err(|e| CelersError::Other(format!("Failed to purge all tasks: {}", e)))?;
1853
1854 tracing::warn!(count = result.rows_affected(), "Purged all tasks");
1855 Ok(result.rows_affected())
1856 }
1857
1858 #[allow(clippy::too_many_arguments)]
1864 pub async fn store_result(
1865 &self,
1866 task_id: &TaskId,
1867 task_name: &str,
1868 status: TaskResultStatus,
1869 result: Option<serde_json::Value>,
1870 error: Option<&str>,
1871 traceback: Option<&str>,
1872 runtime_ms: Option<i64>,
1873 ) -> Result<()> {
1874 let completed_at = match status {
1875 TaskResultStatus::Success | TaskResultStatus::Failure | TaskResultStatus::Revoked => {
1876 Some(Utc::now().format("%Y-%m-%d %H:%M:%S").to_string())
1877 }
1878 _ => None,
1879 };
1880
1881 sqlx::query(
1883 r#"
1884 INSERT INTO celers_task_results
1885 (task_id, task_name, status, result, error, traceback, created_at, completed_at, runtime_ms)
1886 VALUES (?, ?, ?, ?, ?, ?, NOW(), ?, ?)
1887 ON DUPLICATE KEY UPDATE
1888 status = VALUES(status),
1889 result = VALUES(result),
1890 error = VALUES(error),
1891 traceback = VALUES(traceback),
1892 completed_at = VALUES(completed_at),
1893 runtime_ms = VALUES(runtime_ms)
1894 "#,
1895 )
1896 .bind(task_id.to_string())
1897 .bind(task_name)
1898 .bind(status.to_string())
1899 .bind(result.map(|v| serde_json::to_string(&v).unwrap_or_else(|_| "null".to_string())))
1900 .bind(error)
1901 .bind(traceback)
1902 .bind(completed_at)
1903 .bind(runtime_ms)
1904 .execute(&self.pool)
1905 .await
1906 .map_err(|e| CelersError::Other(format!("Failed to store result: {}", e)))?;
1907
1908 Ok(())
1909 }
1910
1911 pub async fn get_result(&self, task_id: &TaskId) -> Result<Option<TaskResult>> {
1913 let row = sqlx::query(
1914 r#"
1915 SELECT task_id, task_name, status, result, error, traceback,
1916 created_at, completed_at, runtime_ms
1917 FROM celers_task_results
1918 WHERE task_id = ?
1919 "#,
1920 )
1921 .bind(task_id.to_string())
1922 .fetch_optional(&self.pool)
1923 .await
1924 .map_err(|e| CelersError::Other(format!("Failed to get result: {}", e)))?;
1925
1926 match row {
1927 Some(row) => {
1928 let task_id_str: String = row.get("task_id");
1929 let status_str: String = row.get("status");
1930 let result_str: Option<String> = row.get("result");
1931 Ok(Some(TaskResult {
1932 task_id: Uuid::parse_str(&task_id_str)
1933 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
1934 task_name: row.get("task_name"),
1935 status: status_str.parse()?,
1936 result: result_str.and_then(|s| serde_json::from_str(&s).ok()),
1937 error: row.get("error"),
1938 traceback: row.get("traceback"),
1939 created_at: row.get("created_at"),
1940 completed_at: row.get("completed_at"),
1941 runtime_ms: row.get("runtime_ms"),
1942 }))
1943 }
1944 None => Ok(None),
1945 }
1946 }
1947
1948 pub async fn delete_result(&self, task_id: &TaskId) -> Result<bool> {
1950 let result = sqlx::query("DELETE FROM celers_task_results WHERE task_id = ?")
1951 .bind(task_id.to_string())
1952 .execute(&self.pool)
1953 .await
1954 .map_err(|e| CelersError::Other(format!("Failed to delete result: {}", e)))?;
1955
1956 Ok(result.rows_affected() > 0)
1957 }
1958
1959 pub async fn archive_results(&self, older_than: Duration) -> Result<u64> {
1963 let cutoff = Utc::now() - chrono::Duration::seconds(older_than.as_secs() as i64);
1964 let cutoff_str = cutoff.format("%Y-%m-%d %H:%M:%S").to_string();
1965
1966 let result = sqlx::query(
1967 r#"
1968 DELETE FROM celers_task_results
1969 WHERE completed_at < ?
1970 "#,
1971 )
1972 .bind(cutoff_str)
1973 .execute(&self.pool)
1974 .await
1975 .map_err(|e| CelersError::Other(format!("Failed to archive results: {}", e)))?;
1976
1977 tracing::info!(count = result.rows_affected(), cutoff = %cutoff, "Archived old results");
1978 Ok(result.rows_affected())
1979 }
1980
1981 pub async fn get_table_sizes(&self) -> Result<Vec<TableSizeInfo>> {
1985 let rows = sqlx::query(
1986 r#"
1987 SELECT
1988 TABLE_NAME as table_name,
1989 TABLE_ROWS as row_count,
1990 DATA_LENGTH as data_size_bytes,
1991 INDEX_LENGTH as index_size_bytes
1992 FROM information_schema.TABLES
1993 WHERE TABLE_SCHEMA = DATABASE()
1994 AND TABLE_NAME LIKE 'celers_%'
1995 ORDER BY DATA_LENGTH DESC
1996 "#,
1997 )
1998 .fetch_all(&self.pool)
1999 .await
2000 .map_err(|e| CelersError::Other(format!("Failed to get table sizes: {}", e)))?;
2001
2002 let mut tables = Vec::with_capacity(rows.len());
2003 for row in rows {
2004 let row_count: Option<i64> = row.get("row_count");
2005 let data_size: Option<i64> = row.get("data_size_bytes");
2006 let index_size: Option<i64> = row.get("index_size_bytes");
2007 tables.push(TableSizeInfo {
2008 table_name: row.get("table_name"),
2009 row_count: row_count.unwrap_or(0),
2010 data_size_bytes: data_size.unwrap_or(0),
2011 index_size_bytes: index_size.unwrap_or(0),
2012 });
2013 }
2014 Ok(tables)
2015 }
2016
2017 pub async fn optimize_tables(&self) -> Result<()> {
2021 sqlx::query("OPTIMIZE TABLE celers_tasks")
2022 .execute(&self.pool)
2023 .await
2024 .map_err(|e| CelersError::Other(format!("Failed to optimize celers_tasks: {}", e)))?;
2025
2026 sqlx::query("OPTIMIZE TABLE celers_dead_letter_queue")
2027 .execute(&self.pool)
2028 .await
2029 .map_err(|e| {
2030 CelersError::Other(format!(
2031 "Failed to optimize celers_dead_letter_queue: {}",
2032 e
2033 ))
2034 })?;
2035
2036 sqlx::query("OPTIMIZE TABLE celers_task_results")
2037 .execute(&self.pool)
2038 .await
2039 .map_err(|e| {
2040 CelersError::Other(format!("Failed to optimize celers_task_results: {}", e))
2041 })?;
2042
2043 tracing::info!("Optimized all CeleRS tables");
2044 Ok(())
2045 }
2046
2047 pub async fn analyze_tables(&self) -> Result<()> {
2049 sqlx::query("ANALYZE TABLE celers_tasks")
2050 .execute(&self.pool)
2051 .await
2052 .map_err(|e| CelersError::Other(format!("Failed to analyze celers_tasks: {}", e)))?;
2053
2054 sqlx::query("ANALYZE TABLE celers_dead_letter_queue")
2055 .execute(&self.pool)
2056 .await
2057 .map_err(|e| {
2058 CelersError::Other(format!("Failed to analyze celers_dead_letter_queue: {}", e))
2059 })?;
2060
2061 sqlx::query("ANALYZE TABLE celers_task_results")
2062 .execute(&self.pool)
2063 .await
2064 .map_err(|e| {
2065 CelersError::Other(format!("Failed to analyze celers_task_results: {}", e))
2066 })?;
2067
2068 tracing::info!("Analyzed all CeleRS tables");
2069 Ok(())
2070 }
2071
2072 pub async fn count_by_task_name(&self) -> Result<Vec<TaskNameCount>> {
2078 let rows = sqlx::query(
2079 r#"
2080 SELECT
2081 task_name,
2082 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending,
2083 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing,
2084 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed,
2085 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed,
2086 COUNT(*) as total
2087 FROM celers_tasks
2088 GROUP BY task_name
2089 ORDER BY total DESC
2090 "#,
2091 )
2092 .fetch_all(&self.pool)
2093 .await
2094 .map_err(|e| CelersError::Other(format!("Failed to count by task name: {}", e)))?;
2095
2096 let mut counts = Vec::with_capacity(rows.len());
2097 for row in rows {
2098 let pending: Option<rust_decimal::Decimal> = row.get("pending");
2099 let processing: Option<rust_decimal::Decimal> = row.get("processing");
2100 let completed: Option<rust_decimal::Decimal> = row.get("completed");
2101 let failed: Option<rust_decimal::Decimal> = row.get("failed");
2102 let total: i64 = row.get("total");
2103
2104 counts.push(TaskNameCount {
2105 task_name: row.get("task_name"),
2106 pending: pending
2107 .map(|d| d.to_string().parse().unwrap_or(0))
2108 .unwrap_or(0),
2109 processing: processing
2110 .map(|d| d.to_string().parse().unwrap_or(0))
2111 .unwrap_or(0),
2112 completed: completed
2113 .map(|d| d.to_string().parse().unwrap_or(0))
2114 .unwrap_or(0),
2115 failed: failed
2116 .map(|d| d.to_string().parse().unwrap_or(0))
2117 .unwrap_or(0),
2118 total,
2119 });
2120 }
2121 Ok(counts)
2122 }
2123
2124 pub async fn get_processing_tasks(&self, limit: i64, offset: i64) -> Result<Vec<TaskInfo>> {
2128 self.list_tasks(Some(DbTaskState::Processing), limit, offset)
2129 .await
2130 }
2131
2132 pub async fn get_tasks_by_worker(&self, worker_id: &str) -> Result<Vec<TaskInfo>> {
2134 let rows = sqlx::query(
2135 r#"
2136 SELECT id, task_name, state, priority, retry_count, max_retries,
2137 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
2138 FROM celers_tasks
2139 WHERE worker_id = ?
2140 ORDER BY started_at DESC
2141 "#,
2142 )
2143 .bind(worker_id)
2144 .fetch_all(&self.pool)
2145 .await
2146 .map_err(|e| CelersError::Other(format!("Failed to get tasks by worker: {}", e)))?;
2147
2148 let mut tasks = Vec::with_capacity(rows.len());
2149 for row in rows {
2150 let task_id_str: String = row.get("id");
2151 let state_str: String = row.get("state");
2152 tasks.push(TaskInfo {
2153 id: Uuid::parse_str(&task_id_str)
2154 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
2155 task_name: row.get("task_name"),
2156 state: state_str.parse()?,
2157 priority: row.get("priority"),
2158 retry_count: row.get("retry_count"),
2159 max_retries: row.get("max_retries"),
2160 created_at: row.get("created_at"),
2161 scheduled_at: row.get("scheduled_at"),
2162 started_at: row.get("started_at"),
2163 completed_at: row.get("completed_at"),
2164 worker_id: row.get("worker_id"),
2165 error_message: row.get("error_message"),
2166 });
2167 }
2168 Ok(tasks)
2169 }
2170
2171 pub async fn list_scheduled_tasks(
2173 &self,
2174 limit: i64,
2175 offset: i64,
2176 ) -> Result<Vec<ScheduledTaskInfo>> {
2177 let rows = sqlx::query(
2178 r#"
2179 SELECT id, task_name, priority, scheduled_at, created_at,
2180 TIMESTAMPDIFF(SECOND, NOW(), scheduled_at) as delay_remaining_secs
2181 FROM celers_tasks
2182 WHERE state = 'pending'
2183 AND scheduled_at > NOW()
2184 ORDER BY scheduled_at ASC
2185 LIMIT ? OFFSET ?
2186 "#,
2187 )
2188 .bind(limit)
2189 .bind(offset)
2190 .fetch_all(&self.pool)
2191 .await
2192 .map_err(|e| CelersError::Other(format!("Failed to list scheduled tasks: {}", e)))?;
2193
2194 let mut tasks = Vec::with_capacity(rows.len());
2195 for row in rows {
2196 let task_id_str: String = row.get("id");
2197 let delay: Option<i64> = row.get("delay_remaining_secs");
2198 tasks.push(ScheduledTaskInfo {
2199 id: Uuid::parse_str(&task_id_str)
2200 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
2201 task_name: row.get("task_name"),
2202 priority: row.get("priority"),
2203 scheduled_at: row.get("scheduled_at"),
2204 created_at: row.get("created_at"),
2205 delay_remaining_secs: delay.unwrap_or(0),
2206 });
2207 }
2208 Ok(tasks)
2209 }
2210
2211 pub async fn count_scheduled_tasks(&self) -> Result<i64> {
2213 let count: i64 = sqlx::query_scalar(
2214 r#"
2215 SELECT COUNT(*) FROM celers_tasks
2216 WHERE state = 'pending' AND scheduled_at > NOW()
2217 "#,
2218 )
2219 .fetch_one(&self.pool)
2220 .await
2221 .map_err(|e| CelersError::Other(format!("Failed to count scheduled tasks: {}", e)))?;
2222
2223 Ok(count)
2224 }
2225
2226 pub async fn update_error_message(
2232 &self,
2233 task_id: &TaskId,
2234 error_message: &str,
2235 ) -> Result<bool> {
2236 let result = sqlx::query(
2237 r#"
2238 UPDATE celers_tasks
2239 SET error_message = ?
2240 WHERE id = ?
2241 "#,
2242 )
2243 .bind(error_message)
2244 .bind(task_id.to_string())
2245 .execute(&self.pool)
2246 .await
2247 .map_err(|e| CelersError::Other(format!("Failed to update error message: {}", e)))?;
2248
2249 Ok(result.rows_affected() > 0)
2250 }
2251
2252 pub async fn set_worker_id(&self, task_id: &TaskId, worker_id: &str) -> Result<bool> {
2256 let result = sqlx::query(
2257 r#"
2258 UPDATE celers_tasks
2259 SET worker_id = ?
2260 WHERE id = ? AND state = 'processing'
2261 "#,
2262 )
2263 .bind(worker_id)
2264 .bind(task_id.to_string())
2265 .execute(&self.pool)
2266 .await
2267 .map_err(|e| CelersError::Other(format!("Failed to set worker ID: {}", e)))?;
2268
2269 Ok(result.rows_affected() > 0)
2270 }
2271
2272 pub async fn dequeue_with_worker_id(&self, worker_id: &str) -> Result<Option<BrokerMessage>> {
2277 if self.paused.load(Ordering::SeqCst) {
2279 return Ok(None);
2280 }
2281
2282 let mut tx = self
2283 .pool
2284 .begin()
2285 .await
2286 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
2287
2288 let row = sqlx::query(
2289 r#"
2290 SELECT id, task_name, payload, retry_count
2291 FROM celers_tasks
2292 WHERE state = 'pending'
2293 AND scheduled_at <= NOW()
2294 ORDER BY priority DESC, created_at ASC
2295 FOR UPDATE SKIP LOCKED
2296 LIMIT 1
2297 "#,
2298 )
2299 .fetch_optional(&mut *tx)
2300 .await
2301 .map_err(|e| CelersError::Other(format!("Failed to dequeue task: {}", e)))?;
2302
2303 if let Some(row) = row {
2304 let task_id_str: String = row.get("id");
2305 let _task_id = Uuid::parse_str(&task_id_str)
2306 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
2307 let task_name: String = row.get("task_name");
2308 let payload: Vec<u8> = row.get("payload");
2309 let retry_count: i32 = row.get("retry_count");
2310
2311 sqlx::query(
2313 r#"
2314 UPDATE celers_tasks
2315 SET state = 'processing',
2316 started_at = NOW(),
2317 retry_count = retry_count + 1,
2318 worker_id = ?
2319 WHERE id = ?
2320 "#,
2321 )
2322 .bind(worker_id)
2323 .bind(&task_id_str)
2324 .execute(&mut *tx)
2325 .await
2326 .map_err(|e| CelersError::Other(format!("Failed to mark task as processing: {}", e)))?;
2327
2328 tx.commit()
2329 .await
2330 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
2331
2332 Ok(Some(BrokerMessage {
2333 task: SerializedTask::new(task_name, payload),
2334 receipt_handle: Some(retry_count.to_string()),
2335 }))
2336 } else {
2337 tx.rollback().await.map_err(|e| {
2338 CelersError::Other(format!("Failed to rollback transaction: {}", e))
2339 })?;
2340 Ok(None)
2341 }
2342 }
2343
2344 pub async fn purge_by_state(&self, state: DbTaskState) -> Result<u64> {
2350 let result = sqlx::query("DELETE FROM celers_tasks WHERE state = ?")
2351 .bind(state.to_string())
2352 .execute(&self.pool)
2353 .await
2354 .map_err(|e| CelersError::Other(format!("Failed to purge tasks by state: {}", e)))?;
2355
2356 tracing::info!(state = %state, count = result.rows_affected(), "Purged tasks by state");
2357 Ok(result.rows_affected())
2358 }
2359
2360 pub async fn purge_completed(&self) -> Result<u64> {
2362 self.purge_by_state(DbTaskState::Completed).await
2363 }
2364
2365 pub async fn purge_failed(&self) -> Result<u64> {
2367 self.purge_by_state(DbTaskState::Failed).await
2368 }
2369
2370 pub async fn purge_cancelled(&self) -> Result<u64> {
2372 self.purge_by_state(DbTaskState::Cancelled).await
2373 }
2374
2375 pub async fn purge_by_task_name(&self, task_name: &str) -> Result<u64> {
2379 let result = sqlx::query("DELETE FROM celers_tasks WHERE task_name = ?")
2380 .bind(task_name)
2381 .execute(&self.pool)
2382 .await
2383 .map_err(|e| CelersError::Other(format!("Failed to purge tasks by name: {}", e)))?;
2384
2385 tracing::info!(task_name = %task_name, count = result.rows_affected(), "Purged tasks by name");
2386 Ok(result.rows_affected())
2387 }
2388
2389 pub async fn list_migrations(&self) -> Result<Vec<MigrationInfo>> {
2393 let rows = sqlx::query(
2394 r#"
2395 SELECT version, name, applied_at
2396 FROM celers_migrations
2397 ORDER BY applied_at ASC
2398 "#,
2399 )
2400 .fetch_all(&self.pool)
2401 .await
2402 .map_err(|e| CelersError::Other(format!("Failed to list migrations: {}", e)))?;
2403
2404 let mut migrations = Vec::with_capacity(rows.len());
2405 for row in rows {
2406 migrations.push(MigrationInfo {
2407 version: row.get("version"),
2408 name: row.get("name"),
2409 applied_at: row.get("applied_at"),
2410 });
2411 }
2412 Ok(migrations)
2413 }
2414
2415 pub async fn get_query_stats(&self) -> Result<Vec<QueryStats>> {
2421 let rows = sqlx::query(
2422 r#"
2423 SELECT
2424 DIGEST_TEXT as query_name,
2425 COUNT_STAR as execution_count,
2426 SUM_TIMER_WAIT / 1000000000 as total_time_ms,
2427 AVG_TIMER_WAIT / 1000000000 as avg_time_ms,
2428 MIN_TIMER_WAIT / 1000000000 as min_time_ms,
2429 MAX_TIMER_WAIT / 1000000000 as max_time_ms
2430 FROM performance_schema.events_statements_summary_by_digest
2431 WHERE SCHEMA_NAME = DATABASE()
2432 AND DIGEST_TEXT LIKE '%celers%'
2433 ORDER BY SUM_TIMER_WAIT DESC
2434 LIMIT 50
2435 "#,
2436 )
2437 .fetch_all(&self.pool)
2438 .await
2439 .map_err(|e| CelersError::Other(format!("Failed to get query stats: {}", e)))?;
2440
2441 let mut stats = Vec::with_capacity(rows.len());
2442 for row in rows {
2443 let query_name: String = row.get("query_name");
2444 let execution_count: i64 = row.get("execution_count");
2445 let total_time: Option<rust_decimal::Decimal> = row.get("total_time_ms");
2446 let avg_time: Option<rust_decimal::Decimal> = row.get("avg_time_ms");
2447 let min_time: Option<rust_decimal::Decimal> = row.get("min_time_ms");
2448 let max_time: Option<rust_decimal::Decimal> = row.get("max_time_ms");
2449
2450 stats.push(QueryStats {
2451 query_name,
2452 execution_count,
2453 total_time_ms: total_time
2454 .map(|d| d.to_string().parse().unwrap_or(0))
2455 .unwrap_or(0),
2456 avg_time_ms: avg_time
2457 .map(|d| d.to_string().parse().unwrap_or(0.0))
2458 .unwrap_or(0.0),
2459 min_time_ms: min_time
2460 .map(|d| d.to_string().parse().unwrap_or(0))
2461 .unwrap_or(0),
2462 max_time_ms: max_time
2463 .map(|d| d.to_string().parse().unwrap_or(0))
2464 .unwrap_or(0),
2465 });
2466 }
2467 Ok(stats)
2468 }
2469
2470 pub async fn reset_query_stats(&self) -> Result<()> {
2474 sqlx::query(
2475 r#"
2476 CALL sys.ps_truncate_all_tables(FALSE)
2477 "#,
2478 )
2479 .execute(&self.pool)
2480 .await
2481 .map_err(|e| CelersError::Other(format!("Failed to reset query stats: {}", e)))?;
2482
2483 tracing::info!("Reset query performance statistics");
2484 Ok(())
2485 }
2486
2487 pub async fn get_index_stats(&self) -> Result<Vec<IndexStats>> {
2494 let rows = sqlx::query(
2495 r#"
2496 SELECT
2497 TABLE_NAME as table_name,
2498 INDEX_NAME as index_name,
2499 CARDINALITY as cardinality,
2500 NON_UNIQUE as non_unique
2501 FROM information_schema.STATISTICS
2502 WHERE TABLE_SCHEMA = DATABASE()
2503 AND TABLE_NAME LIKE 'celers_%'
2504 ORDER BY TABLE_NAME, INDEX_NAME
2505 "#,
2506 )
2507 .fetch_all(&self.pool)
2508 .await
2509 .map_err(|e| CelersError::Other(format!("Failed to get index stats: {}", e)))?;
2510
2511 let mut stats = Vec::with_capacity(rows.len());
2512 for row in rows {
2513 let cardinality: Option<i64> = row.get("cardinality");
2514 let non_unique: i32 = row.get("non_unique");
2515 stats.push(IndexStats {
2516 table_name: row.get("table_name"),
2517 index_name: row.get("index_name"),
2518 cardinality: cardinality.unwrap_or(0),
2519 unique_values: non_unique == 0,
2520 });
2521 }
2522 Ok(stats)
2523 }
2524
2525 pub async fn explain_dequeue(&self) -> Result<Vec<QueryPlan>> {
2530 let rows = sqlx::query(
2531 r#"
2532 EXPLAIN
2533 SELECT id, task_name, payload, retry_count
2534 FROM celers_tasks
2535 WHERE state = 'pending'
2536 AND scheduled_at <= NOW()
2537 ORDER BY priority DESC, created_at ASC
2538 FOR UPDATE SKIP LOCKED
2539 LIMIT 1
2540 "#,
2541 )
2542 .fetch_all(&self.pool)
2543 .await
2544 .map_err(|e| CelersError::Other(format!("Failed to explain query: {}", e)))?;
2545
2546 let mut plans = Vec::with_capacity(rows.len());
2547 for row in rows {
2548 let rows_examined: Option<i64> = row.try_get("rows").ok();
2549 let filtered: Option<rust_decimal::Decimal> = row.try_get("filtered").ok();
2550 plans.push(QueryPlan {
2551 id: row.get("id"),
2552 select_type: row.get("select_type"),
2553 table: row.try_get("table").ok(),
2554 query_type: row.try_get("type").ok(),
2555 possible_keys: row.try_get("possible_keys").ok(),
2556 key_used: row.try_get("key").ok(),
2557 key_length: row.try_get("key_len").ok(),
2558 rows_examined,
2559 filtered: filtered.map(|d| d.to_string().parse().unwrap_or(0.0)),
2560 extra: row.try_get("Extra").ok(),
2561 });
2562 }
2563 Ok(plans)
2564 }
2565
2566 pub async fn explain_query(&self, query: &str) -> Result<Vec<QueryPlan>> {
2570 let explain_query = format!("EXPLAIN {}", query);
2571 let rows = sqlx::query(&explain_query)
2572 .fetch_all(&self.pool)
2573 .await
2574 .map_err(|e| CelersError::Other(format!("Failed to explain query: {}", e)))?;
2575
2576 let mut plans = Vec::with_capacity(rows.len());
2577 for row in rows {
2578 let rows_examined: Option<i64> = row.try_get("rows").ok();
2579 let filtered: Option<rust_decimal::Decimal> = row.try_get("filtered").ok();
2580 plans.push(QueryPlan {
2581 id: row.get("id"),
2582 select_type: row.get("select_type"),
2583 table: row.try_get("table").ok(),
2584 query_type: row.try_get("type").ok(),
2585 possible_keys: row.try_get("possible_keys").ok(),
2586 key_used: row.try_get("key").ok(),
2587 key_length: row.try_get("key_len").ok(),
2588 rows_examined,
2589 filtered: filtered.map(|d| d.to_string().parse().unwrap_or(0.0)),
2590 extra: row.try_get("Extra").ok(),
2591 });
2592 }
2593 Ok(plans)
2594 }
2595
2596 pub async fn check_index_usage(&self) -> Result<Vec<String>> {
2601 let mut warnings = Vec::new();
2602
2603 let dequeue_plan = self.explain_dequeue().await?;
2605 for plan in dequeue_plan {
2606 if plan.key_used.is_none() {
2607 warnings.push(format!(
2608 "Dequeue query on table {:?} is not using an index (full table scan)",
2609 plan.table
2610 ));
2611 }
2612 if let Some(extra) = &plan.extra {
2613 if extra.contains("Using filesort") {
2614 warnings.push("Dequeue query requires filesort - consider adding composite index on (state, priority, created_at)".to_string());
2615 }
2616 }
2617 }
2618
2619 let index_stats = self.get_index_stats().await?;
2621 for stat in index_stats {
2622 if stat.cardinality == 0 && !stat.index_name.eq("PRIMARY") {
2623 warnings.push(format!(
2624 "Index {} on table {} has zero cardinality - consider running ANALYZE TABLE",
2625 stat.index_name, stat.table_name
2626 ));
2627 }
2628 }
2629
2630 Ok(warnings)
2631 }
2632
2633 pub fn get_connection_diagnostics(&self) -> ConnectionDiagnostics {
2639 let max_conns = self.pool.options().get_max_connections();
2640 let idle_conns = self.pool.num_idle() as u32;
2641 let min_conns = self.pool.options().get_min_connections();
2642
2643 let total_conns = idle_conns.max(min_conns);
2645 let active_conns = total_conns.saturating_sub(idle_conns);
2646
2647 let utilization = if max_conns > 0 {
2648 (total_conns as f64 / max_conns as f64) * 100.0
2649 } else {
2650 0.0
2651 };
2652
2653 ConnectionDiagnostics {
2654 total_connections: total_conns,
2655 idle_connections: idle_conns,
2656 active_connections: active_conns,
2657 max_connections: max_conns,
2658 connection_wait_time_ms: None, pool_utilization_percent: utilization,
2660 }
2661 }
2662
2663 pub async fn get_performance_metrics(&self) -> Result<PerformanceMetrics> {
2668 let stats = self.get_statistics().await?;
2669 let conn_diag = self.get_connection_diagnostics();
2670
2671 let tasks_per_second = 0.0;
2674
2675 let (avg_dequeue_ms, avg_enqueue_ms) = match self.get_query_stats().await {
2677 Ok(stats) => {
2678 let dequeue_stat = stats
2679 .iter()
2680 .find(|s| {
2681 s.query_name.contains("SELECT") && s.query_name.contains("celers_tasks")
2682 })
2683 .map(|s| s.avg_time_ms)
2684 .unwrap_or(0.0);
2685
2686 let enqueue_stat = stats
2687 .iter()
2688 .find(|s| {
2689 s.query_name.contains("INSERT") && s.query_name.contains("celers_tasks")
2690 })
2691 .map(|s| s.avg_time_ms)
2692 .unwrap_or(0.0);
2693
2694 (dequeue_stat, enqueue_stat)
2695 }
2696 Err(_) => (0.0, 0.0),
2697 };
2698
2699 Ok(PerformanceMetrics {
2700 timestamp: Utc::now(),
2701 tasks_per_second,
2702 avg_dequeue_time_ms: avg_dequeue_ms,
2703 avg_enqueue_time_ms: avg_enqueue_ms,
2704 queue_depth: stats.pending,
2705 processing_tasks: stats.processing,
2706 dlq_size: stats.dlq,
2707 connection_pool: conn_diag,
2708 })
2709 }
2710
2711 pub async fn is_ready(&self) -> bool {
2718 let version_check = sqlx::query_scalar::<_, String>("SELECT VERSION()")
2720 .fetch_one(&self.pool)
2721 .await;
2722
2723 if version_check.is_err() {
2724 return false;
2725 }
2726
2727 let idle = self.pool.num_idle();
2729 if idle == 0 {
2730 let max_conns = self.pool.options().get_max_connections();
2731 if max_conns > 0 && self.pool.size() >= max_conns {
2733 return false;
2734 }
2735 }
2736
2737 true
2738 }
2739
2740 pub async fn get_server_variables(&self) -> Result<std::collections::HashMap<String, String>> {
2744 let rows = sqlx::query(
2745 r#"
2746 SHOW VARIABLES WHERE Variable_name IN (
2747 'max_connections',
2748 'innodb_buffer_pool_size',
2749 'innodb_log_file_size',
2750 'query_cache_size',
2751 'query_cache_type',
2752 'innodb_flush_log_at_trx_commit',
2753 'innodb_flush_method',
2754 'binlog_format',
2755 'expire_logs_days'
2756 )
2757 "#,
2758 )
2759 .fetch_all(&self.pool)
2760 .await
2761 .map_err(|e| CelersError::Other(format!("Failed to get server variables: {}", e)))?;
2762
2763 let mut variables = std::collections::HashMap::new();
2764 for row in rows {
2765 let var_name: String = row.get("Variable_name");
2766 let var_value: String = row.get("Value");
2767 variables.insert(var_name, var_value);
2768 }
2769
2770 Ok(variables)
2771 }
2772
2773 pub async fn enqueue_batch_impl(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
2781 if tasks.is_empty() {
2782 return Ok(Vec::new());
2783 }
2784
2785 let mut tx = self
2786 .pool
2787 .begin()
2788 .await
2789 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
2790
2791 let mut task_ids = Vec::with_capacity(tasks.len());
2792
2793 for task in &tasks {
2794 let task_id = task.metadata.id;
2795 let mut db_metadata = json!({
2796 "queue": self.queue_name,
2797 "enqueued_at": chrono::Utc::now().to_rfc3339(),
2798 });
2799
2800 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
2802 if let Some(obj) = db_metadata.as_object_mut() {
2803 if let Some(meta_obj) = task_meta.as_object() {
2804 for (k, v) in meta_obj {
2805 obj.insert(k.clone(), v.clone());
2806 }
2807 }
2808 }
2809 }
2810
2811 sqlx::query(
2812 r#"
2813 INSERT INTO celers_tasks
2814 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
2815 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
2816 "#,
2817 )
2818 .bind(task_id.to_string())
2819 .bind(&task.metadata.name)
2820 .bind(&task.payload)
2821 .bind(task.metadata.priority)
2822 .bind(task.metadata.max_retries as i32)
2823 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
2824 .execute(&mut *tx)
2825 .await
2826 .map_err(|e| CelersError::Other(format!("Failed to enqueue task in batch: {}", e)))?;
2827
2828 task_ids.push(task_id);
2829 }
2830
2831 tx.commit()
2832 .await
2833 .map_err(|e| CelersError::Other(format!("Failed to commit batch enqueue: {}", e)))?;
2834
2835 #[cfg(feature = "metrics")]
2836 {
2837 TASKS_ENQUEUED_TOTAL.inc_by(tasks.len() as f64);
2838
2839 for task in &tasks {
2841 TASKS_ENQUEUED_BY_TYPE
2842 .with_label_values(&[&task.metadata.name])
2843 .inc();
2844 }
2845 }
2846
2847 Ok(task_ids)
2848 }
2849
2850 pub async fn dequeue_batch_impl(&self, limit: usize) -> Result<Vec<BrokerMessage>> {
2861 if limit == 0 || self.paused.load(Ordering::SeqCst) {
2862 return Ok(Vec::new());
2863 }
2864
2865 let mut tx = self
2866 .pool
2867 .begin()
2868 .await
2869 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
2870
2871 let rows = sqlx::query(
2872 r#"
2873 SELECT id, task_name, payload, retry_count
2874 FROM celers_tasks
2875 WHERE state = 'pending'
2876 AND scheduled_at <= NOW()
2877 ORDER BY priority DESC, created_at ASC
2878 FOR UPDATE SKIP LOCKED
2879 LIMIT ?
2880 "#,
2881 )
2882 .bind(limit as i64)
2883 .fetch_all(&mut *tx)
2884 .await
2885 .map_err(|e| CelersError::Other(format!("Failed to dequeue batch: {}", e)))?;
2886
2887 if rows.is_empty() {
2888 tx.rollback().await.map_err(|e| {
2889 CelersError::Other(format!("Failed to rollback transaction: {}", e))
2890 })?;
2891 return Ok(Vec::new());
2892 }
2893
2894 let mut messages = Vec::with_capacity(rows.len());
2895 let mut task_id_strings = Vec::with_capacity(rows.len());
2896
2897 for row in rows {
2898 let task_id_str: String = row.get("id");
2899 let _task_id = Uuid::parse_str(&task_id_str)
2900 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
2901 let task_name: String = row.get("task_name");
2902 let payload: Vec<u8> = row.get("payload");
2903 let retry_count: i32 = row.get("retry_count");
2904
2905 messages.push(BrokerMessage {
2906 task: SerializedTask::new(task_name, payload),
2907 receipt_handle: Some(retry_count.to_string()),
2908 });
2909
2910 task_id_strings.push(task_id_str);
2911 }
2912
2913 if !task_id_strings.is_empty() {
2917 let placeholders = task_id_strings
2918 .iter()
2919 .map(|_| "?")
2920 .collect::<Vec<_>>()
2921 .join(", ");
2922 let update_query = format!(
2923 r#"
2924 UPDATE celers_tasks
2925 SET state = 'processing',
2926 started_at = NOW(),
2927 retry_count = retry_count + 1
2928 WHERE id IN ({})
2929 "#,
2930 placeholders
2931 );
2932
2933 let mut query = sqlx::query(&update_query);
2934 for task_id in task_id_strings {
2935 query = query.bind(task_id);
2936 }
2937
2938 query.execute(&mut *tx).await.map_err(|e| {
2939 CelersError::Other(format!("Failed to mark batch as processing: {}", e))
2940 })?;
2941 }
2942
2943 tx.commit()
2944 .await
2945 .map_err(|e| CelersError::Other(format!("Failed to commit batch dequeue: {}", e)))?;
2946
2947 Ok(messages)
2948 }
2949}
2950
2951impl MysqlBroker {
2953 pub async fn enqueue_with_trace_context(
2980 &self,
2981 task: SerializedTask,
2982 trace_ctx: TraceContext,
2983 ) -> Result<TaskId> {
2984 let task_id = task.metadata.id;
2985
2986 let hook_ctx = HookContext {
2988 queue_name: self.queue_name.clone(),
2989 task_id: Some(task_id),
2990 timestamp: Utc::now(),
2991 metadata: json!({}),
2992 };
2993 {
2994 let hooks = self.hooks.read().await;
2995 hooks.run_before_enqueue(&hook_ctx, &task).await?;
2996 }
2997
2998 let mut db_metadata = json!({
2999 "queue": self.queue_name,
3000 "enqueued_at": chrono::Utc::now().to_rfc3339(),
3001 "trace_context": {
3002 "trace_id": trace_ctx.trace_id,
3003 "span_id": trace_ctx.span_id,
3004 "trace_flags": trace_ctx.trace_flags,
3005 "trace_state": trace_ctx.trace_state,
3006 }
3007 });
3008
3009 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
3011 if let Some(obj) = db_metadata.as_object_mut() {
3012 if let Some(meta_obj) = task_meta.as_object() {
3013 for (k, v) in meta_obj {
3014 if k != "trace_context" {
3015 obj.insert(k.clone(), v.clone());
3017 }
3018 }
3019 }
3020 }
3021 }
3022
3023 sqlx::query(
3024 r#"
3025 INSERT INTO celers_tasks
3026 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
3027 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
3028 "#,
3029 )
3030 .bind(task_id)
3031 .bind(&task.metadata.name)
3032 .bind(&task.payload)
3033 .bind(task.metadata.priority)
3034 .bind(task.metadata.max_retries as i32)
3035 .bind(db_metadata)
3036 .execute(&self.pool)
3037 .await
3038 .map_err(|e| CelersError::Other(format!("Failed to enqueue task with trace: {}", e)))?;
3039
3040 #[cfg(feature = "metrics")]
3041 {
3042 TASKS_ENQUEUED_TOTAL.inc();
3043 TASKS_ENQUEUED_BY_TYPE
3044 .with_label_values(&[&task.metadata.name])
3045 .inc();
3046 }
3047
3048 {
3050 let hooks = self.hooks.read().await;
3051 hooks.run_after_enqueue(&hook_ctx, &task).await?;
3052 }
3053
3054 Ok(task_id)
3055 }
3056
3057 pub async fn extract_trace_context(&self, task_id: &TaskId) -> Result<Option<TraceContext>> {
3088 let row = sqlx::query(
3089 r#"
3090 SELECT metadata
3091 FROM celers_tasks
3092 WHERE id = ?
3093 "#,
3094 )
3095 .bind(task_id)
3096 .fetch_optional(&self.pool)
3097 .await
3098 .map_err(|e| CelersError::Other(format!("Failed to fetch task metadata: {}", e)))?;
3099
3100 if let Some(row) = row {
3101 let metadata: serde_json::Value = row.get("metadata");
3102 if let Some(trace_value) = metadata.get("trace_context") {
3103 let trace_ctx: TraceContext =
3104 serde_json::from_value(trace_value.clone()).map_err(|e| {
3105 CelersError::Other(format!("Failed to deserialize trace context: {}", e))
3106 })?;
3107 return Ok(Some(trace_ctx));
3108 }
3109 }
3110 Ok(None)
3111 }
3112
3113 pub async fn enqueue_with_parent_trace(
3138 &self,
3139 parent_task_id: &TaskId,
3140 child_task: SerializedTask,
3141 ) -> Result<TaskId> {
3142 if let Some(parent_ctx) = self.extract_trace_context(parent_task_id).await? {
3143 let child_ctx = parent_ctx.create_child_span();
3145 self.enqueue_with_trace_context(child_task, child_ctx).await
3146 } else {
3147 self.enqueue(child_task).await
3149 }
3150 }
3151
3152 pub async fn add_hook(&self, hook: TaskHook) {
3200 let mut hooks = self.hooks.write().await;
3201 hooks.add(hook);
3202 }
3203
3204 pub async fn clear_hooks(&self) {
3219 let mut hooks = self.hooks.write().await;
3220 hooks.clear();
3221 }
3222}
3223
3224impl MysqlBroker {
3226 pub async fn cancel_batch(&self, task_ids: &[TaskId]) -> Result<u64> {
3256 if task_ids.is_empty() {
3257 return Ok(0);
3258 }
3259
3260 let task_id_strings: Vec<String> = task_ids.iter().map(|id| id.to_string()).collect();
3261 let placeholders = vec!["?"; task_ids.len()].join(", ");
3262
3263 let query = format!(
3264 r#"
3265 UPDATE celers_tasks
3266 SET state = 'cancelled', completed_at = NOW()
3267 WHERE id IN ({}) AND state IN ('pending', 'processing')
3268 "#,
3269 placeholders
3270 );
3271
3272 let mut query_builder = sqlx::query(&query);
3273 for task_id_str in task_id_strings {
3274 query_builder = query_builder.bind(task_id_str);
3275 }
3276
3277 let result = query_builder
3278 .execute(&self.pool)
3279 .await
3280 .map_err(|e| CelersError::Other(format!("Failed to cancel batch: {}", e)))?;
3281
3282 let cancelled = result.rows_affected();
3283 tracing::info!(count = cancelled, "Cancelled tasks in batch");
3284
3285 Ok(cancelled)
3286 }
3287
3288 pub async fn get_worker_statistics(&self, worker_id: &str) -> Result<WorkerStatistics> {
3314 let row = sqlx::query(
3315 r#"
3316 SELECT
3317 worker_id,
3318 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as active_tasks,
3319 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
3320 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
3321 MAX(started_at) as last_seen,
3322 AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as avg_duration
3323 FROM celers_tasks
3324 WHERE worker_id = ?
3325 GROUP BY worker_id
3326 "#,
3327 )
3328 .bind(worker_id)
3329 .fetch_optional(&self.pool)
3330 .await
3331 .map_err(|e| CelersError::Other(format!("Failed to get worker statistics: {}", e)))?;
3332
3333 if let Some(row) = row {
3334 let active: Option<rust_decimal::Decimal> = row.get("active_tasks");
3335 let completed: Option<rust_decimal::Decimal> = row.get("completed_tasks");
3336 let failed: Option<rust_decimal::Decimal> = row.get("failed_tasks");
3337 let last_seen: Option<DateTime<Utc>> = row.get("last_seen");
3338 let avg_duration: Option<rust_decimal::Decimal> = row.get("avg_duration");
3339
3340 Ok(WorkerStatistics {
3341 worker_id: worker_id.to_string(),
3342 active_tasks: active
3343 .map(|d| d.to_string().parse().unwrap_or(0))
3344 .unwrap_or(0),
3345 completed_tasks: completed
3346 .map(|d| d.to_string().parse().unwrap_or(0))
3347 .unwrap_or(0),
3348 failed_tasks: failed
3349 .map(|d| d.to_string().parse().unwrap_or(0))
3350 .unwrap_or(0),
3351 last_seen: last_seen.unwrap_or_else(Utc::now),
3352 avg_task_duration_secs: avg_duration
3353 .and_then(|d| d.to_string().parse::<f64>().ok())
3354 .unwrap_or(0.0),
3355 })
3356 } else {
3357 Ok(WorkerStatistics {
3359 worker_id: worker_id.to_string(),
3360 active_tasks: 0,
3361 completed_tasks: 0,
3362 failed_tasks: 0,
3363 last_seen: Utc::now(),
3364 avg_task_duration_secs: 0.0,
3365 })
3366 }
3367 }
3368
3369 pub async fn count_by_state_quick(&self, state: DbTaskState) -> Result<i64> {
3394 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM celers_tasks WHERE state = ?")
3395 .bind(state.to_string())
3396 .fetch_one(&self.pool)
3397 .await
3398 .map_err(|e| CelersError::Other(format!("Failed to count tasks by state: {}", e)))?;
3399
3400 Ok(count)
3401 }
3402
3403 pub async fn get_task_age_distribution(&self) -> Result<Vec<TaskAgeDistribution>> {
3435 let rows = sqlx::query(
3436 r#"
3437 SELECT
3438 CASE
3439 WHEN TIMESTAMPDIFF(SECOND, created_at, NOW()) < 60 THEN '< 1 min'
3440 WHEN TIMESTAMPDIFF(SECOND, created_at, NOW()) < 300 THEN '1-5 min'
3441 WHEN TIMESTAMPDIFF(SECOND, created_at, NOW()) < 900 THEN '5-15 min'
3442 WHEN TIMESTAMPDIFF(SECOND, created_at, NOW()) < 3600 THEN '15-60 min'
3443 ELSE '> 60 min'
3444 END as bucket,
3445 COUNT(*) as task_count,
3446 MAX(TIMESTAMPDIFF(SECOND, created_at, NOW())) as oldest_age
3447 FROM celers_tasks
3448 WHERE state = 'pending'
3449 GROUP BY bucket
3450 ORDER BY
3451 CASE bucket
3452 WHEN '< 1 min' THEN 1
3453 WHEN '1-5 min' THEN 2
3454 WHEN '5-15 min' THEN 3
3455 WHEN '15-60 min' THEN 4
3456 ELSE 5
3457 END
3458 "#,
3459 )
3460 .fetch_all(&self.pool)
3461 .await
3462 .map_err(|e| CelersError::Other(format!("Failed to get task age distribution: {}", e)))?;
3463
3464 let mut distribution = Vec::with_capacity(rows.len());
3465 for row in rows {
3466 let bucket: String = row.get("bucket");
3467 let task_count: i64 = row.get("task_count");
3468 let oldest_age: Option<i64> = row.get("oldest_age");
3469
3470 distribution.push(TaskAgeDistribution {
3471 bucket_label: bucket,
3472 task_count,
3473 oldest_task_age_secs: oldest_age.unwrap_or(0),
3474 });
3475 }
3476
3477 Ok(distribution)
3478 }
3479
3480 pub async fn get_retry_statistics(&self) -> Result<Vec<RetryStatistics>> {
3507 let rows = sqlx::query(
3508 r#"
3509 SELECT
3510 task_name,
3511 SUM(retry_count) as total_retries,
3512 COUNT(*) as unique_tasks,
3513 AVG(retry_count) as avg_retries,
3514 MAX(retry_count) as max_retries
3515 FROM celers_tasks
3516 WHERE retry_count > 0
3517 GROUP BY task_name
3518 ORDER BY total_retries DESC
3519 "#,
3520 )
3521 .fetch_all(&self.pool)
3522 .await
3523 .map_err(|e| CelersError::Other(format!("Failed to get retry statistics: {}", e)))?;
3524
3525 let mut stats = Vec::with_capacity(rows.len());
3526 for row in rows {
3527 let task_name: String = row.get("task_name");
3528 let total_retries: Option<rust_decimal::Decimal> = row.get("total_retries");
3529 let unique_tasks: i64 = row.get("unique_tasks");
3530 let avg_retries: Option<rust_decimal::Decimal> = row.get("avg_retries");
3531 let max_retries: i32 = row.get("max_retries");
3532
3533 stats.push(RetryStatistics {
3534 task_name,
3535 total_retries: total_retries
3536 .map(|d| d.to_string().parse().unwrap_or(0))
3537 .unwrap_or(0),
3538 unique_tasks,
3539 avg_retries_per_task: avg_retries
3540 .and_then(|d| d.to_string().parse::<f64>().ok())
3541 .unwrap_or(0.0),
3542 max_retries_observed: max_retries,
3543 });
3544 }
3545
3546 Ok(stats)
3547 }
3548
3549 pub async fn list_active_workers(&self) -> Result<Vec<String>> {
3569 let rows = sqlx::query(
3570 r#"
3571 SELECT DISTINCT worker_id
3572 FROM celers_tasks
3573 WHERE worker_id IS NOT NULL AND state = 'processing'
3574 ORDER BY worker_id
3575 "#,
3576 )
3577 .fetch_all(&self.pool)
3578 .await
3579 .map_err(|e| CelersError::Other(format!("Failed to list active workers: {}", e)))?;
3580
3581 let workers: Vec<String> = rows.into_iter().map(|row| row.get("worker_id")).collect();
3582
3583 Ok(workers)
3584 }
3585
3586 pub async fn get_all_worker_statistics(&self) -> Result<Vec<WorkerStatistics>> {
3613 let worker_ids = self.list_active_workers().await?;
3614 let mut all_stats = Vec::with_capacity(worker_ids.len());
3615
3616 for worker_id in worker_ids {
3617 if let Ok(stats) = self.get_worker_statistics(&worker_id).await {
3618 all_stats.push(stats);
3619 }
3620 }
3621
3622 Ok(all_stats)
3623 }
3624
3625 pub async fn get_queue_health(&self) -> Result<QueueHealth> {
3653 let stats = self.get_statistics().await?;
3654 let workers = self.list_active_workers().await?;
3655
3656 let oldest_age: Option<i64> = sqlx::query_scalar(
3658 r#"
3659 SELECT TIMESTAMPDIFF(SECOND, created_at, NOW())
3660 FROM celers_tasks
3661 WHERE state = 'pending'
3662 ORDER BY created_at ASC
3663 LIMIT 1
3664 "#,
3665 )
3666 .fetch_optional(&self.pool)
3667 .await
3668 .map_err(|e| CelersError::Other(format!("Failed to get oldest task: {}", e)))?;
3669
3670 let oldest_age_secs = oldest_age.unwrap_or(0);
3671 let oldest_age_minutes = oldest_age_secs as f64 / 60.0;
3672
3673 let overall_status = if stats.pending < 100 && oldest_age_minutes < 5.0 {
3675 "healthy"
3676 } else if stats.pending < 1000 && oldest_age_minutes < 15.0 {
3677 "degraded"
3678 } else {
3679 "critical"
3680 };
3681
3682 let avg_processing_rate = if !workers.is_empty() {
3684 workers.len() as f64
3685 } else {
3686 1.0
3687 };
3688 let backlog_minutes = if avg_processing_rate > 0.0 {
3689 stats.pending as f64 / avg_processing_rate
3690 } else {
3691 0.0
3692 };
3693
3694 Ok(QueueHealth {
3695 overall_status: overall_status.to_string(),
3696 pending_tasks: stats.pending,
3697 processing_tasks: stats.processing,
3698 oldest_pending_age_secs: oldest_age_secs,
3699 active_workers: workers.len() as i64,
3700 queue_backlog_minutes: backlog_minutes,
3701 })
3702 }
3703
3704 pub async fn get_task_throughput(&self) -> Result<TaskThroughput> {
3727 let row = sqlx::query(
3728 r#"
3729 SELECT
3730 SUM(CASE WHEN state = 'completed' AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 MINUTE) THEN 1 ELSE 0 END) as completed_1min,
3731 SUM(CASE WHEN state = 'completed' AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR) THEN 1 ELSE 0 END) as completed_1hour,
3732 SUM(CASE WHEN state = 'failed' AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 MINUTE) THEN 1 ELSE 0 END) as failed_1min,
3733 SUM(CASE WHEN state = 'failed' AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR) THEN 1 ELSE 0 END) as failed_1hour
3734 FROM celers_tasks
3735 "#,
3736 )
3737 .fetch_one(&self.pool)
3738 .await
3739 .map_err(|e| CelersError::Other(format!("Failed to get throughput: {}", e)))?;
3740
3741 let completed_1min: Option<rust_decimal::Decimal> = row.get("completed_1min");
3742 let completed_1hour: Option<rust_decimal::Decimal> = row.get("completed_1hour");
3743 let failed_1min: Option<rust_decimal::Decimal> = row.get("failed_1min");
3744 let failed_1hour: Option<rust_decimal::Decimal> = row.get("failed_1hour");
3745
3746 let completed_last_minute = completed_1min
3747 .map(|d| d.to_string().parse().unwrap_or(0))
3748 .unwrap_or(0);
3749 let completed_last_hour = completed_1hour
3750 .map(|d| d.to_string().parse().unwrap_or(0))
3751 .unwrap_or(0);
3752
3753 let tasks_per_second = completed_last_minute as f64 / 60.0;
3754
3755 Ok(TaskThroughput {
3756 completed_last_minute,
3757 completed_last_hour,
3758 failed_last_minute: failed_1min
3759 .map(|d| d.to_string().parse().unwrap_or(0))
3760 .unwrap_or(0),
3761 failed_last_hour: failed_1hour
3762 .map(|d| d.to_string().parse().unwrap_or(0))
3763 .unwrap_or(0),
3764 tasks_per_second,
3765 })
3766 }
3767
3768 pub async fn requeue_stuck_tasks_by_worker(&self, worker_id: &str) -> Result<u64> {
3792 let result = sqlx::query(
3793 r#"
3794 UPDATE celers_tasks
3795 SET state = 'pending', worker_id = NULL, started_at = NULL
3796 WHERE worker_id = ? AND state = 'processing'
3797 "#,
3798 )
3799 .bind(worker_id)
3800 .execute(&self.pool)
3801 .await
3802 .map_err(|e| CelersError::Other(format!("Failed to requeue tasks for worker: {}", e)))?;
3803
3804 let requeued = result.rows_affected();
3805 tracing::warn!(worker_id = %worker_id, count = requeued, "Requeued stuck tasks");
3806
3807 Ok(requeued)
3808 }
3809
3810 pub async fn with_transaction<F, T, Fut>(&self, f: F) -> Result<T>
3838 where
3839 F: FnOnce(sqlx::Transaction<'_, sqlx::MySql>) -> Fut,
3840 Fut: std::future::Future<Output = Result<T>>,
3841 {
3842 let tx = self
3843 .pool
3844 .begin()
3845 .await
3846 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
3847
3848 let result = f(tx).await?;
3849
3850 Ok(result)
3851 }
3852
3853 pub async fn query_tasks_by_metadata(
3879 &self,
3880 json_path: &str,
3881 value: &str,
3882 limit: i64,
3883 offset: i64,
3884 ) -> Result<Vec<TaskInfo>> {
3885 let rows = sqlx::query(
3886 r#"
3887 SELECT id, task_name, state, priority, retry_count, max_retries,
3888 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
3889 FROM celers_tasks
3890 WHERE JSON_EXTRACT(metadata, ?) = ?
3891 ORDER BY created_at DESC
3892 LIMIT ? OFFSET ?
3893 "#,
3894 )
3895 .bind(json_path)
3896 .bind(value)
3897 .bind(limit)
3898 .bind(offset)
3899 .fetch_all(&self.pool)
3900 .await
3901 .map_err(|e| CelersError::Other(format!("Failed to query tasks by metadata: {}", e)))?;
3902
3903 let mut tasks = Vec::new();
3904 for row in rows {
3905 let id_str: String = row
3906 .try_get("id")
3907 .map_err(|e| CelersError::Other(format!("Failed to get id: {}", e)))?;
3908 let state_str: String = row
3909 .try_get("state")
3910 .map_err(|e| CelersError::Other(format!("Failed to get state: {}", e)))?;
3911
3912 tasks.push(TaskInfo {
3913 id: Uuid::parse_str(&id_str)
3914 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
3915 task_name: row
3916 .try_get("task_name")
3917 .map_err(|e| CelersError::Other(format!("Failed to get task_name: {}", e)))?,
3918 state: state_str.parse()?,
3919 priority: row
3920 .try_get("priority")
3921 .map_err(|e| CelersError::Other(format!("Failed to get priority: {}", e)))?,
3922 retry_count: row
3923 .try_get("retry_count")
3924 .map_err(|e| CelersError::Other(format!("Failed to get retry_count: {}", e)))?,
3925 max_retries: row
3926 .try_get("max_retries")
3927 .map_err(|e| CelersError::Other(format!("Failed to get max_retries: {}", e)))?,
3928 created_at: row
3929 .try_get("created_at")
3930 .map_err(|e| CelersError::Other(format!("Failed to get created_at: {}", e)))?,
3931 scheduled_at: row.try_get("scheduled_at").map_err(|e| {
3932 CelersError::Other(format!("Failed to get scheduled_at: {}", e))
3933 })?,
3934 started_at: row
3935 .try_get("started_at")
3936 .map_err(|e| CelersError::Other(format!("Failed to get started_at: {}", e)))?,
3937 completed_at: row.try_get("completed_at").map_err(|e| {
3938 CelersError::Other(format!("Failed to get completed_at: {}", e))
3939 })?,
3940 worker_id: row
3941 .try_get("worker_id")
3942 .map_err(|e| CelersError::Other(format!("Failed to get worker_id: {}", e)))?,
3943 error_message: row.try_get("error_message").map_err(|e| {
3944 CelersError::Other(format!("Failed to get error_message: {}", e))
3945 })?,
3946 });
3947 }
3948
3949 Ok(tasks)
3950 }
3951
3952 pub async fn enqueue_deduplicated(
3981 &self,
3982 task: SerializedTask,
3983 dedup_key: &str,
3984 ) -> Result<TaskId> {
3985 let existing = sqlx::query(
3987 r#"
3988 SELECT id FROM celers_tasks
3989 WHERE JSON_EXTRACT(metadata, '$.dedup_key') = ?
3990 AND state IN ('pending', 'processing')
3991 LIMIT 1
3992 "#,
3993 )
3994 .bind(dedup_key)
3995 .fetch_optional(&self.pool)
3996 .await
3997 .map_err(|e| CelersError::Other(format!("Failed to check for duplicate task: {}", e)))?;
3998
3999 if let Some(row) = existing {
4000 let id_str: String = row
4001 .try_get("id")
4002 .map_err(|e| CelersError::Other(format!("Failed to get id: {}", e)))?;
4003 let task_id = Uuid::parse_str(&id_str)
4004 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
4005 tracing::info!(task_id = %task_id, dedup_key = %dedup_key, "Task already exists, skipping");
4006 return Ok(task_id);
4007 }
4008
4009 let task_id = task.metadata.id;
4011 let mut db_metadata = json!({
4012 "queue": self.queue_name,
4013 "enqueued_at": chrono::Utc::now().to_rfc3339(),
4014 "dedup_key": dedup_key,
4015 });
4016
4017 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
4019 if let Some(obj) = db_metadata.as_object_mut() {
4020 if let Some(meta_obj) = task_meta.as_object() {
4021 for (k, v) in meta_obj {
4022 obj.insert(k.clone(), v.clone());
4023 }
4024 }
4025 }
4026 }
4027
4028 sqlx::query(
4029 r#"
4030 INSERT INTO celers_tasks
4031 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
4032 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
4033 "#,
4034 )
4035 .bind(task_id.to_string())
4036 .bind(&task.metadata.name)
4037 .bind(&task.payload)
4038 .bind(task.metadata.priority)
4039 .bind(task.metadata.max_retries as i32)
4040 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
4041 .execute(&self.pool)
4042 .await
4043 .map_err(|e| CelersError::Other(format!("Failed to enqueue deduplicated task: {}", e)))?;
4044
4045 tracing::info!(task_id = %task_id, dedup_key = %dedup_key, "Enqueued new deduplicated task");
4046
4047 #[cfg(feature = "metrics")]
4048 {
4049 TASKS_ENQUEUED_TOTAL.inc();
4050 TASKS_ENQUEUED_BY_TYPE
4051 .with_label_values(&[&task.metadata.name])
4052 .inc();
4053 }
4054
4055 Ok(task_id)
4056 }
4057
4058 pub async fn update_batch_state(
4084 &self,
4085 task_ids: &[TaskId],
4086 new_state: DbTaskState,
4087 ) -> Result<u64> {
4088 if task_ids.is_empty() {
4089 return Ok(0);
4090 }
4091
4092 let task_id_strings: Vec<String> = task_ids.iter().map(|id| id.to_string()).collect();
4093 let placeholders = vec!["?"; task_ids.len()].join(", ");
4094
4095 let completed_at_clause = match new_state {
4096 DbTaskState::Completed | DbTaskState::Failed | DbTaskState::Cancelled => {
4097 ", completed_at = NOW()"
4098 }
4099 _ => "",
4100 };
4101
4102 let query = format!(
4103 r#"
4104 UPDATE celers_tasks
4105 SET state = ?{}
4106 WHERE id IN ({})
4107 "#,
4108 completed_at_clause, placeholders
4109 );
4110
4111 let mut query_builder = sqlx::query(&query);
4112 query_builder = query_builder.bind(new_state.to_string());
4113 for task_id_str in task_id_strings {
4114 query_builder = query_builder.bind(task_id_str);
4115 }
4116
4117 let result = query_builder
4118 .execute(&self.pool)
4119 .await
4120 .map_err(|e| CelersError::Other(format!("Failed to update batch state: {}", e)))?;
4121
4122 let updated = result.rows_affected();
4123 tracing::info!(count = updated, state = %new_state, "Updated task states in batch");
4124
4125 Ok(updated)
4126 }
4127
4128 pub async fn has_capacity(&self, max_size: i64) -> Result<bool> {
4157 let current_size = self.count_by_state_quick(DbTaskState::Pending).await?;
4158 Ok(current_size < max_size)
4159 }
4160
4161 pub async fn enqueue_with_capacity(
4189 &self,
4190 task: SerializedTask,
4191 max_size: i64,
4192 ) -> Result<TaskId> {
4193 if !self.has_capacity(max_size).await? {
4194 return Err(CelersError::Other(
4195 "Queue is at capacity, cannot enqueue".to_string(),
4196 ));
4197 }
4198
4199 self.enqueue(task).await
4200 }
4201
4202 pub async fn expire_pending_tasks(&self, ttl: Duration) -> Result<u64> {
4228 let ttl_seconds = ttl.as_secs() as i64;
4229
4230 let result = sqlx::query(
4231 r#"
4232 UPDATE celers_tasks
4233 SET state = 'cancelled',
4234 completed_at = NOW(),
4235 error_message = CONCAT('Task expired after ', ?, ' seconds')
4236 WHERE state = 'pending'
4237 AND TIMESTAMPDIFF(SECOND, created_at, NOW()) > ?
4238 "#,
4239 )
4240 .bind(ttl_seconds)
4241 .bind(ttl_seconds)
4242 .execute(&self.pool)
4243 .await
4244 .map_err(|e| CelersError::Other(format!("Failed to expire pending tasks: {}", e)))?;
4245
4246 let expired = result.rows_affected();
4247 if expired > 0 {
4248 tracing::warn!(
4249 count = expired,
4250 ttl_seconds = ttl_seconds,
4251 "Expired pending tasks"
4252 );
4253 }
4254
4255 Ok(expired)
4256 }
4257
4258 #[allow(clippy::too_many_arguments)]
4286 pub async fn delete_tasks_by_criteria(
4287 &self,
4288 state: Option<DbTaskState>,
4289 older_than: Duration,
4290 ) -> Result<u64> {
4291 let seconds_ago = older_than.as_secs() as i64;
4292
4293 let query = if let Some(state) = state {
4294 sqlx::query(
4295 r#"
4296 DELETE FROM celers_tasks
4297 WHERE state = ?
4298 AND TIMESTAMPDIFF(SECOND, created_at, NOW()) > ?
4299 "#,
4300 )
4301 .bind(state.to_string())
4302 .bind(seconds_ago)
4303 } else {
4304 sqlx::query(
4305 r#"
4306 DELETE FROM celers_tasks
4307 WHERE TIMESTAMPDIFF(SECOND, created_at, NOW()) > ?
4308 "#,
4309 )
4310 .bind(seconds_ago)
4311 };
4312
4313 let result = query
4314 .execute(&self.pool)
4315 .await
4316 .map_err(|e| CelersError::Other(format!("Failed to delete tasks: {}", e)))?;
4317
4318 let deleted = result.rows_affected();
4319 tracing::info!(count = deleted, "Deleted tasks by criteria");
4320
4321 Ok(deleted)
4322 }
4323
4324 pub async fn update_task_metadata(
4353 &self,
4354 task_id: &TaskId,
4355 json_path: &str,
4356 value: &str,
4357 ) -> Result<bool> {
4358 let result = sqlx::query(
4359 r#"
4360 UPDATE celers_tasks
4361 SET metadata = JSON_SET(metadata, ?, ?)
4362 WHERE id = ?
4363 "#,
4364 )
4365 .bind(json_path)
4366 .bind(value)
4367 .bind(task_id.to_string())
4368 .execute(&self.pool)
4369 .await
4370 .map_err(|e| CelersError::Other(format!("Failed to update task metadata: {}", e)))?;
4371
4372 Ok(result.rows_affected() > 0)
4373 }
4374
4375 #[allow(clippy::too_many_arguments)]
4412 pub async fn search_tasks_by_date_range(
4413 &self,
4414 from: DateTime<Utc>,
4415 to: DateTime<Utc>,
4416 state: Option<DbTaskState>,
4417 limit: i64,
4418 offset: i64,
4419 ) -> Result<Vec<TaskInfo>> {
4420 let query = if let Some(state) = state {
4421 sqlx::query(
4422 r#"
4423 SELECT id, task_name, state, priority, retry_count, max_retries,
4424 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
4425 FROM celers_tasks
4426 WHERE created_at >= ? AND created_at <= ?
4427 AND state = ?
4428 ORDER BY created_at DESC
4429 LIMIT ? OFFSET ?
4430 "#,
4431 )
4432 .bind(from)
4433 .bind(to)
4434 .bind(state.to_string())
4435 .bind(limit)
4436 .bind(offset)
4437 } else {
4438 sqlx::query(
4439 r#"
4440 SELECT id, task_name, state, priority, retry_count, max_retries,
4441 created_at, scheduled_at, started_at, completed_at, worker_id, error_message
4442 FROM celers_tasks
4443 WHERE created_at >= ? AND created_at <= ?
4444 ORDER BY created_at DESC
4445 LIMIT ? OFFSET ?
4446 "#,
4447 )
4448 .bind(from)
4449 .bind(to)
4450 .bind(limit)
4451 .bind(offset)
4452 };
4453
4454 let rows = query
4455 .fetch_all(&self.pool)
4456 .await
4457 .map_err(|e| CelersError::Other(format!("Failed to search tasks by date: {}", e)))?;
4458
4459 let mut tasks = Vec::new();
4460 for row in rows {
4461 let id_str: String = row
4462 .try_get("id")
4463 .map_err(|e| CelersError::Other(format!("Failed to get id: {}", e)))?;
4464 let state_str: String = row
4465 .try_get("state")
4466 .map_err(|e| CelersError::Other(format!("Failed to get state: {}", e)))?;
4467
4468 tasks.push(TaskInfo {
4469 id: Uuid::parse_str(&id_str)
4470 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
4471 task_name: row
4472 .try_get("task_name")
4473 .map_err(|e| CelersError::Other(format!("Failed to get task_name: {}", e)))?,
4474 state: state_str.parse()?,
4475 priority: row
4476 .try_get("priority")
4477 .map_err(|e| CelersError::Other(format!("Failed to get priority: {}", e)))?,
4478 retry_count: row
4479 .try_get("retry_count")
4480 .map_err(|e| CelersError::Other(format!("Failed to get retry_count: {}", e)))?,
4481 max_retries: row
4482 .try_get("max_retries")
4483 .map_err(|e| CelersError::Other(format!("Failed to get max_retries: {}", e)))?,
4484 created_at: row
4485 .try_get("created_at")
4486 .map_err(|e| CelersError::Other(format!("Failed to get created_at: {}", e)))?,
4487 scheduled_at: row.try_get("scheduled_at").map_err(|e| {
4488 CelersError::Other(format!("Failed to get scheduled_at: {}", e))
4489 })?,
4490 started_at: row
4491 .try_get("started_at")
4492 .map_err(|e| CelersError::Other(format!("Failed to get started_at: {}", e)))?,
4493 completed_at: row.try_get("completed_at").map_err(|e| {
4494 CelersError::Other(format!("Failed to get completed_at: {}", e))
4495 })?,
4496 worker_id: row
4497 .try_get("worker_id")
4498 .map_err(|e| CelersError::Other(format!("Failed to get worker_id: {}", e)))?,
4499 error_message: row.try_get("error_message").map_err(|e| {
4500 CelersError::Other(format!("Failed to get error_message: {}", e))
4501 })?,
4502 });
4503 }
4504
4505 Ok(tasks)
4506 }
4507
4508 pub async fn get_dlq_statistics(&self) -> Result<DlqStatistics> {
4528 let total_row = sqlx::query(
4530 r#"
4531 SELECT COUNT(*) as total
4532 FROM celers_dead_letter_queue
4533 "#,
4534 )
4535 .fetch_one(&self.pool)
4536 .await
4537 .map_err(|e| CelersError::Other(format!("Failed to get DLQ total: {}", e)))?;
4538
4539 let total_tasks: i64 = total_row
4540 .try_get("total")
4541 .map_err(|e| CelersError::Other(format!("Failed to get total: {}", e)))?;
4542
4543 let rows = sqlx::query(
4545 r#"
4546 SELECT task_name,
4547 COUNT(*) as count,
4548 AVG(retry_count) as avg_retries,
4549 MAX(retry_count) as max_retries
4550 FROM celers_dead_letter_queue
4551 GROUP BY task_name
4552 ORDER BY count DESC
4553 "#,
4554 )
4555 .fetch_all(&self.pool)
4556 .await
4557 .map_err(|e| CelersError::Other(format!("Failed to get DLQ stats: {}", e)))?;
4558
4559 let mut by_task_name = Vec::new();
4560 for row in rows {
4561 let count: i64 = row
4562 .try_get("count")
4563 .map_err(|e| CelersError::Other(format!("Failed to get count: {}", e)))?;
4564 let avg_retries: Option<rust_decimal::Decimal> = row
4565 .try_get("avg_retries")
4566 .map_err(|e| CelersError::Other(format!("Failed to get avg_retries: {}", e)))?;
4567 let max_retries: i32 = row
4568 .try_get("max_retries")
4569 .map_err(|e| CelersError::Other(format!("Failed to get max_retries: {}", e)))?;
4570
4571 by_task_name.push(DlqTaskStats {
4572 task_name: row
4573 .try_get("task_name")
4574 .map_err(|e| CelersError::Other(format!("Failed to get task_name: {}", e)))?,
4575 count,
4576 avg_retries: avg_retries.map(|d| d.to_string().parse::<f64>().unwrap_or(0.0)),
4577 max_retries,
4578 });
4579 }
4580
4581 Ok(DlqStatistics {
4582 total_tasks,
4583 by_task_name,
4584 })
4585 }
4586
4587 pub async fn recover_timed_out_tasks(&self, timeout: Duration) -> Result<u64> {
4613 let timeout_seconds = timeout.as_secs() as i64;
4614
4615 let result = sqlx::query(
4616 r#"
4617 UPDATE celers_tasks
4618 SET state = 'pending',
4619 worker_id = NULL,
4620 started_at = NULL,
4621 error_message = CONCAT(
4622 COALESCE(error_message, ''),
4623 IF(error_message IS NOT NULL, '; ', ''),
4624 'Task timed out after ', ?, ' seconds and was requeued'
4625 )
4626 WHERE state = 'processing'
4627 AND started_at IS NOT NULL
4628 AND TIMESTAMPDIFF(SECOND, started_at, NOW()) > ?
4629 "#,
4630 )
4631 .bind(timeout_seconds)
4632 .bind(timeout_seconds)
4633 .execute(&self.pool)
4634 .await
4635 .map_err(|e| CelersError::Other(format!("Failed to recover timed-out tasks: {}", e)))?;
4636
4637 let recovered = result.rows_affected();
4638 if recovered > 0 {
4639 tracing::warn!(
4640 count = recovered,
4641 timeout_seconds = timeout_seconds,
4642 "Recovered timed-out tasks"
4643 );
4644 }
4645
4646 Ok(recovered)
4647 }
4648}
4649
4650#[async_trait]
4651impl Broker for MysqlBroker {
4652 async fn enqueue(&self, task: SerializedTask) -> Result<TaskId> {
4653 let task_id = task.metadata.id;
4654 let mut db_metadata = json!({
4655 "queue": self.queue_name,
4656 "enqueued_at": chrono::Utc::now().to_rfc3339(),
4657 });
4658
4659 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
4661 if let Some(obj) = db_metadata.as_object_mut() {
4662 if let Some(meta_obj) = task_meta.as_object() {
4663 for (k, v) in meta_obj {
4664 obj.insert(k.clone(), v.clone());
4665 }
4666 }
4667 }
4668 }
4669
4670 sqlx::query(
4671 r#"
4672 INSERT INTO celers_tasks
4673 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
4674 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
4675 "#,
4676 )
4677 .bind(task_id.to_string())
4678 .bind(&task.metadata.name)
4679 .bind(&task.payload)
4680 .bind(task.metadata.priority)
4681 .bind(task.metadata.max_retries as i32)
4682 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
4683 .execute(&self.pool)
4684 .await
4685 .map_err(|e| CelersError::Other(format!("Failed to enqueue task: {}", e)))?;
4686
4687 #[cfg(feature = "metrics")]
4688 {
4689 TASKS_ENQUEUED_TOTAL.inc();
4690 TASKS_ENQUEUED_BY_TYPE
4691 .with_label_values(&[&task.metadata.name])
4692 .inc();
4693 }
4694
4695 Ok(task_id)
4696 }
4697
4698 async fn dequeue(&self) -> Result<Option<BrokerMessage>> {
4699 if self.paused.load(Ordering::SeqCst) {
4701 return Ok(None);
4702 }
4703
4704 let mut tx = self
4707 .pool
4708 .begin()
4709 .await
4710 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
4711
4712 let row = sqlx::query(
4713 r#"
4714 SELECT id, task_name, payload, retry_count
4715 FROM celers_tasks
4716 WHERE state = 'pending'
4717 AND scheduled_at <= NOW()
4718 ORDER BY priority DESC, created_at ASC
4719 FOR UPDATE SKIP LOCKED
4720 LIMIT 1
4721 "#,
4722 )
4723 .fetch_optional(&mut *tx)
4724 .await
4725 .map_err(|e| CelersError::Other(format!("Failed to dequeue task: {}", e)))?;
4726
4727 if let Some(row) = row {
4728 let task_id_str: String = row.get("id");
4729 let _task_id = Uuid::parse_str(&task_id_str)
4730 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
4731 let task_name: String = row.get("task_name");
4732 let payload: Vec<u8> = row.get("payload");
4733 let retry_count: i32 = row.get("retry_count");
4734
4735 sqlx::query(
4737 r#"
4738 UPDATE celers_tasks
4739 SET state = 'processing',
4740 started_at = NOW(),
4741 retry_count = retry_count + 1
4742 WHERE id = ?
4743 "#,
4744 )
4745 .bind(&task_id_str)
4746 .execute(&mut *tx)
4747 .await
4748 .map_err(|e| CelersError::Other(format!("Failed to mark task as processing: {}", e)))?;
4749
4750 tx.commit()
4751 .await
4752 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
4753
4754 Ok(Some(BrokerMessage {
4755 task: SerializedTask::new(task_name, payload),
4756 receipt_handle: Some(retry_count.to_string()),
4757 }))
4758 } else {
4759 tx.rollback().await.map_err(|e| {
4760 CelersError::Other(format!("Failed to rollback transaction: {}", e))
4761 })?;
4762 Ok(None)
4763 }
4764 }
4765
4766 async fn ack(&self, task_id: &TaskId, _receipt_handle: Option<&str>) -> Result<()> {
4767 sqlx::query(
4768 r#"
4769 UPDATE celers_tasks
4770 SET state = 'completed',
4771 completed_at = NOW()
4772 WHERE id = ?
4773 "#,
4774 )
4775 .bind(task_id.to_string())
4776 .execute(&self.pool)
4777 .await
4778 .map_err(|e| CelersError::Other(format!("Failed to ack task: {}", e)))?;
4779
4780 Ok(())
4784 }
4785
4786 async fn reject(
4787 &self,
4788 task_id: &TaskId,
4789 _receipt_handle: Option<&str>,
4790 requeue: bool,
4791 ) -> Result<()> {
4792 if requeue {
4793 let row = sqlx::query(
4795 r#"
4796 SELECT retry_count, max_retries
4797 FROM celers_tasks
4798 WHERE id = ?
4799 "#,
4800 )
4801 .bind(task_id.to_string())
4802 .fetch_one(&self.pool)
4803 .await
4804 .map_err(|e| CelersError::Other(format!("Failed to fetch task: {}", e)))?;
4805
4806 let retry_count: i32 = row.get("retry_count");
4807 let max_retries: i32 = row.get("max_retries");
4808
4809 if retry_count >= max_retries {
4810 self.move_to_dlq(task_id).await?;
4812 } else {
4813 let backoff_seconds = 2_i64.pow(retry_count as u32).min(3600); sqlx::query(
4817 r#"
4818 UPDATE celers_tasks
4819 SET state = 'pending',
4820 scheduled_at = DATE_ADD(NOW(), INTERVAL ? SECOND),
4821 started_at = NULL,
4822 worker_id = NULL
4823 WHERE id = ?
4824 "#,
4825 )
4826 .bind(backoff_seconds)
4827 .bind(task_id.to_string())
4828 .execute(&self.pool)
4829 .await
4830 .map_err(|e| CelersError::Other(format!("Failed to requeue task: {}", e)))?;
4831 }
4832 } else {
4833 sqlx::query(
4835 r#"
4836 UPDATE celers_tasks
4837 SET state = 'failed',
4838 completed_at = NOW()
4839 WHERE id = ?
4840 "#,
4841 )
4842 .bind(task_id.to_string())
4843 .execute(&self.pool)
4844 .await
4845 .map_err(|e| CelersError::Other(format!("Failed to mark task as failed: {}", e)))?;
4846 }
4847
4848 Ok(())
4849 }
4850
4851 async fn queue_size(&self) -> Result<usize> {
4852 let row = sqlx::query(
4853 r#"
4854 SELECT COUNT(*) as count
4855 FROM celers_tasks
4856 WHERE state = 'pending'
4857 "#,
4858 )
4859 .fetch_one(&self.pool)
4860 .await
4861 .map_err(|e| CelersError::Other(format!("Failed to get queue size: {}", e)))?;
4862
4863 let count: i64 = row.get("count");
4864 Ok(count as usize)
4865 }
4866
4867 async fn cancel(&self, task_id: &TaskId) -> Result<bool> {
4868 let result = sqlx::query(
4869 r#"
4870 UPDATE celers_tasks
4871 SET state = 'cancelled',
4872 completed_at = NOW()
4873 WHERE id = ? AND state IN ('pending', 'processing')
4874 "#,
4875 )
4876 .bind(task_id.to_string())
4877 .execute(&self.pool)
4878 .await
4879 .map_err(|e| CelersError::Other(format!("Failed to cancel task: {}", e)))?;
4880
4881 Ok(result.rows_affected() > 0)
4882 }
4883
4884 async fn enqueue_at(&self, task: SerializedTask, execute_at: i64) -> Result<TaskId> {
4886 let task_id = task.metadata.id;
4887 let mut db_metadata = json!({
4888 "queue": self.queue_name,
4889 "enqueued_at": chrono::Utc::now().to_rfc3339(),
4890 "scheduled_for": execute_at,
4891 });
4892
4893 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
4895 if let Some(obj) = db_metadata.as_object_mut() {
4896 if let Some(meta_obj) = task_meta.as_object() {
4897 for (k, v) in meta_obj {
4898 obj.insert(k.clone(), v.clone());
4899 }
4900 }
4901 }
4902 }
4903
4904 let scheduled_at = chrono::DateTime::from_timestamp(execute_at, 0)
4906 .ok_or_else(|| CelersError::Other("Invalid timestamp".to_string()))?
4907 .format("%Y-%m-%d %H:%M:%S")
4908 .to_string();
4909
4910 sqlx::query(
4911 r#"
4912 INSERT INTO celers_tasks
4913 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
4914 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), ?)
4915 "#,
4916 )
4917 .bind(task_id.to_string())
4918 .bind(&task.metadata.name)
4919 .bind(&task.payload)
4920 .bind(task.metadata.priority)
4921 .bind(task.metadata.max_retries as i32)
4922 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
4923 .bind(scheduled_at)
4924 .execute(&self.pool)
4925 .await
4926 .map_err(|e| CelersError::Other(format!("Failed to enqueue delayed task: {}", e)))?;
4927
4928 #[cfg(feature = "metrics")]
4929 {
4930 TASKS_ENQUEUED_TOTAL.inc();
4931 TASKS_ENQUEUED_BY_TYPE
4932 .with_label_values(&[&task.metadata.name])
4933 .inc();
4934 }
4935
4936 Ok(task_id)
4937 }
4938
4939 async fn enqueue_after(&self, task: SerializedTask, delay_secs: u64) -> Result<TaskId> {
4941 let task_id = task.metadata.id;
4942 let mut db_metadata = json!({
4943 "queue": self.queue_name,
4944 "enqueued_at": chrono::Utc::now().to_rfc3339(),
4945 "delay_seconds": delay_secs,
4946 });
4947
4948 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
4950 if let Some(obj) = db_metadata.as_object_mut() {
4951 if let Some(meta_obj) = task_meta.as_object() {
4952 for (k, v) in meta_obj {
4953 obj.insert(k.clone(), v.clone());
4954 }
4955 }
4956 }
4957 }
4958
4959 sqlx::query(
4960 r#"
4961 INSERT INTO celers_tasks
4962 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
4963 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), DATE_ADD(NOW(), INTERVAL ? SECOND))
4964 "#,
4965 )
4966 .bind(task_id.to_string())
4967 .bind(&task.metadata.name)
4968 .bind(&task.payload)
4969 .bind(task.metadata.priority)
4970 .bind(task.metadata.max_retries as i32)
4971 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
4972 .bind(delay_secs as i64)
4973 .execute(&self.pool)
4974 .await
4975 .map_err(|e| CelersError::Other(format!("Failed to enqueue delayed task: {}", e)))?;
4976
4977 #[cfg(feature = "metrics")]
4978 {
4979 TASKS_ENQUEUED_TOTAL.inc();
4980 TASKS_ENQUEUED_BY_TYPE
4981 .with_label_values(&[&task.metadata.name])
4982 .inc();
4983 }
4984
4985 Ok(task_id)
4986 }
4987
4988 async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
4992 self.enqueue_batch_impl(tasks).await
4993 }
4994
4995 async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>> {
4997 self.dequeue_batch_impl(count).await
4998 }
4999
5000 async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()> {
5002 if tasks.is_empty() {
5003 return Ok(());
5004 }
5005
5006 let task_ids: Vec<String> = tasks.iter().map(|(id, _)| id.to_string()).collect();
5007
5008 let placeholders = task_ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
5009 let query_str = format!(
5010 r#"
5011 UPDATE celers_tasks
5012 SET state = 'completed',
5013 completed_at = NOW()
5014 WHERE id IN ({})
5015 "#,
5016 placeholders
5017 );
5018
5019 let mut query = sqlx::query(&query_str);
5020 for task_id in task_ids {
5021 query = query.bind(task_id);
5022 }
5023
5024 query
5025 .execute(&self.pool)
5026 .await
5027 .map_err(|e| CelersError::Other(format!("Failed to batch ack tasks: {}", e)))?;
5028
5029 Ok(())
5030 }
5031}
5032
5033impl MysqlBroker {
5034 pub async fn enqueue_chain(&self, chain: TaskChain) -> Result<Vec<TaskId>> {
5058 if chain.tasks().is_empty() {
5059 return Ok(Vec::new());
5060 }
5061
5062 let mut task_ids = Vec::with_capacity(chain.tasks().len());
5063 let base_time = std::time::SystemTime::now()
5064 .duration_since(std::time::UNIX_EPOCH)
5065 .map_err(|e| CelersError::Other(format!("Failed to get system time: {}", e)))?
5066 .as_secs() as i64;
5067
5068 for (idx, task) in chain.tasks().iter().enumerate() {
5069 let execute_at = if idx == 0 {
5070 base_time
5072 } else {
5073 let delay = chain.delay_between_secs().unwrap_or(0) * idx as u64;
5075 base_time + delay as i64
5076 };
5077
5078 let task_id = self.enqueue_at(task.clone(), execute_at).await?;
5079 task_ids.push(task_id);
5080 }
5081
5082 tracing::info!(
5083 chain_length = chain.tasks().len(),
5084 delay_secs = chain.delay_between_secs().unwrap_or(0),
5085 "Enqueued task chain"
5086 );
5087
5088 Ok(task_ids)
5089 }
5090
5091 pub async fn reject_batch(&self, tasks: &[(TaskId, Option<String>, bool)]) -> Result<u64> {
5101 if tasks.is_empty() {
5102 return Ok(0);
5103 }
5104
5105 let mut tx = self
5106 .pool
5107 .begin()
5108 .await
5109 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
5110
5111 let mut rejected_count = 0u64;
5112
5113 for (task_id, _receipt_handle, requeue) in tasks {
5114 if *requeue {
5115 let row = sqlx::query(
5117 r#"
5118 SELECT retry_count, max_retries
5119 FROM celers_tasks
5120 WHERE id = ?
5121 "#,
5122 )
5123 .bind(task_id.to_string())
5124 .fetch_optional(&mut *tx)
5125 .await
5126 .map_err(|e| CelersError::Other(format!("Failed to fetch task: {}", e)))?;
5127
5128 if let Some(row) = row {
5129 let retry_count: i32 = row.get("retry_count");
5130 let max_retries: i32 = row.get("max_retries");
5131
5132 if retry_count >= max_retries {
5133 sqlx::query("CALL move_to_dlq(?)")
5135 .bind(task_id.to_string())
5136 .execute(&mut *tx)
5137 .await
5138 .map_err(|e| {
5139 CelersError::Other(format!("Failed to move task to DLQ: {}", e))
5140 })?;
5141 } else {
5142 let backoff_seconds = 2_i64.pow(retry_count as u32).min(3600); sqlx::query(
5146 r#"
5147 UPDATE celers_tasks
5148 SET state = 'pending',
5149 scheduled_at = DATE_ADD(NOW(), INTERVAL ? SECOND),
5150 started_at = NULL,
5151 worker_id = NULL
5152 WHERE id = ?
5153 "#,
5154 )
5155 .bind(backoff_seconds)
5156 .bind(task_id.to_string())
5157 .execute(&mut *tx)
5158 .await
5159 .map_err(|e| {
5160 CelersError::Other(format!("Failed to requeue task: {}", e))
5161 })?;
5162 }
5163 rejected_count += 1;
5164 }
5165 } else {
5166 let result = sqlx::query(
5168 r#"
5169 UPDATE celers_tasks
5170 SET state = 'failed',
5171 completed_at = NOW()
5172 WHERE id = ?
5173 "#,
5174 )
5175 .bind(task_id.to_string())
5176 .execute(&mut *tx)
5177 .await
5178 .map_err(|e| CelersError::Other(format!("Failed to mark task as failed: {}", e)))?;
5179
5180 rejected_count += result.rows_affected();
5181 }
5182 }
5183
5184 tx.commit()
5185 .await
5186 .map_err(|e| CelersError::Other(format!("Failed to commit batch reject: {}", e)))?;
5187
5188 Ok(rejected_count)
5189 }
5190}
5191
5192impl MysqlBroker {
/// Refreshes the queue gauges from the database.
///
/// Sets these gauges after all three counts have been read successfully:
/// - `QUEUE_SIZE` to the number of `pending` tasks;
/// - `PROCESSING_QUEUE_SIZE` to the number of `processing` tasks;
/// - `DLQ_SIZE` to the row count of `celers_dead_letter_queue`.
///
/// # Errors
/// Returns `CelersError::Other` for the first count query that fails; in that
/// case no gauge is touched.
#[cfg(feature = "metrics")]
pub async fn update_metrics(&self) -> Result<()> {
    const PENDING_SQL: &str = "SELECT COUNT(*) FROM celers_tasks WHERE state = 'pending'";
    const PROCESSING_SQL: &str = "SELECT COUNT(*) FROM celers_tasks WHERE state = 'processing'";
    const DLQ_SQL: &str = "SELECT COUNT(*) FROM celers_dead_letter_queue";

    let pending: i64 = sqlx::query_scalar(PENDING_SQL)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| CelersError::Other(format!("Failed to get pending count: {}", err)))?;

    let processing: i64 = sqlx::query_scalar(PROCESSING_SQL)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| CelersError::Other(format!("Failed to get processing count: {}", err)))?;

    let dead_lettered: i64 = sqlx::query_scalar(DLQ_SQL)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| CelersError::Other(format!("Failed to get DLQ count: {}", err)))?;

    QUEUE_SIZE.set(pending as f64);
    PROCESSING_QUEUE_SIZE.set(processing as f64);
    DLQ_SIZE.set(dead_lettered as f64);

    Ok(())
}
5228
5229 pub async fn apply_dlq_retention(&self, retention_period: Duration) -> Result<u64> {
5254 let retention_seconds = retention_period.as_secs() as i64;
5255
5256 if retention_seconds < 3600 {
5258 return Err(CelersError::Other(
5259 "DLQ retention period must be at least 1 hour to prevent accidental deletion"
5260 .to_string(),
5261 ));
5262 }
5263 if retention_seconds < 86400 {
5264 tracing::warn!(
5265 retention_hours = retention_seconds / 3600,
5266 "DLQ retention period is less than 24 hours"
5267 );
5268 }
5269
5270 let result = sqlx::query(
5271 r#"
5272 DELETE FROM celers_dead_letter_queue
5273 WHERE TIMESTAMPDIFF(SECOND, failed_at, NOW()) > ?
5274 "#,
5275 )
5276 .bind(retention_seconds)
5277 .execute(&self.pool)
5278 .await
5279 .map_err(|e| CelersError::Other(format!("Failed to apply DLQ retention: {}", e)))?;
5280
5281 let deleted = result.rows_affected();
5282 if deleted > 0 {
5283 tracing::info!(
5284 count = deleted,
5285 retention_days = retention_seconds / 86400,
5286 "Applied DLQ retention policy"
5287 );
5288 }
5289
5290 Ok(deleted)
5291 }
5292
5293 pub async fn get_optimal_batch_size(&self, max_batch_size: Option<i64>) -> Result<i64> {
5320 if let Some(max) = max_batch_size {
5322 if max <= 0 {
5323 return Err(CelersError::Other(
5324 "max_batch_size must be positive".to_string(),
5325 ));
5326 }
5327 if max > 10000 {
5328 tracing::warn!(
5329 max_batch_size = max,
5330 "Very large max_batch_size may impact performance"
5331 );
5332 }
5333 }
5334
5335 let max_size = max_batch_size.unwrap_or(200);
5336
5337 let pending: i64 = sqlx::query_scalar(
5339 "SELECT COUNT(*) FROM celers_tasks WHERE state = 'pending' AND scheduled_at <= NOW()",
5340 )
5341 .fetch_one(&self.pool)
5342 .await
5343 .map_err(|e| CelersError::Other(format!("Failed to get pending count: {}", e)))?;
5344
5345 let optimal_size = if pending < 10 {
5347 std::cmp::min(pending.max(1), 5)
5349 } else if pending < 100 {
5350 std::cmp::min(pending / 2, 50)
5352 } else {
5353 std::cmp::min(pending / 4, max_size)
5355 };
5356
5357 Ok(optimal_size.max(1))
5358 }
5359
5360 pub async fn get_pool_health(&self) -> Result<ConnectionDiagnostics> {
5385 let total = self.pool.size();
5386 let idle = self.pool.num_idle() as u32;
5387 let active = total - idle;
5388 let max = self.pool.options().get_max_connections();
5389
5390 let utilization = if max > 0 {
5391 (total as f64 / max as f64) * 100.0
5392 } else {
5393 0.0
5394 };
5395
5396 Ok(ConnectionDiagnostics {
5397 total_connections: total,
5398 idle_connections: idle,
5399 active_connections: active,
5400 max_connections: max,
5401 connection_wait_time_ms: None, pool_utilization_percent: utilization,
5403 })
5404 }
5405
5406 #[allow(dead_code)]
5420 fn compress_payload(payload: &[u8]) -> Result<Vec<u8>> {
5421 use std::io::Write;
5422
5423 if payload.len() < 1024 {
5425 return Ok(payload.to_vec());
5426 }
5427
5428 let mut encoder =
5429 flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::fast());
5430 encoder
5431 .write_all(payload)
5432 .map_err(|e| CelersError::Other(format!("Compression failed: {}", e)))?;
5433
5434 encoder
5435 .finish()
5436 .map_err(|e| CelersError::Other(format!("Compression finalization failed: {}", e)))
5437 }
5438
5439 #[allow(dead_code)]
5447 fn decompress_payload(compressed: &[u8]) -> Result<Vec<u8>> {
5448 use std::io::Read;
5449
5450 let mut decoder = flate2::read::DeflateDecoder::new(compressed);
5451 let mut decompressed = Vec::new();
5452
5453 decoder
5454 .read_to_end(&mut decompressed)
5455 .map_err(|e| CelersError::Other(format!("Decompression failed: {}", e)))?;
5456
5457 Ok(decompressed)
5458 }
5459
5460 pub async fn vacuum_analyze(&self) -> Result<u64> {
5486 let tables = vec![
5487 "celers_tasks",
5488 "celers_dead_letter_queue",
5489 "celers_task_history",
5490 "celers_task_results",
5491 ];
5492
5493 let mut optimized = 0u64;
5494
5495 for table in &tables {
5496 sqlx::query(&format!("OPTIMIZE TABLE {}", table))
5498 .execute(&self.pool)
5499 .await
5500 .map_err(|e| {
5501 CelersError::Other(format!("Failed to optimize table {}: {}", table, e))
5502 })?;
5503
5504 sqlx::query(&format!("ANALYZE TABLE {}", table))
5506 .execute(&self.pool)
5507 .await
5508 .map_err(|e| {
5509 CelersError::Other(format!("Failed to analyze table {}: {}", table, e))
5510 })?;
5511
5512 optimized += 1;
5513 }
5514
5515 tracing::info!(tables_count = optimized, "Completed vacuum analyze");
5516 Ok(optimized)
5517 }
5518
5519 pub async fn get_slow_queries(&self, limit: i64) -> Result<Vec<SlowQueryInfo>> {
5532 let ps_enabled: String = sqlx::query_scalar("SELECT @@performance_schema")
5534 .fetch_one(&self.pool)
5535 .await
5536 .map_err(|e| {
5537 CelersError::Other(format!("Failed to check performance_schema: {}", e))
5538 })?;
5539
5540 if ps_enabled != "1" {
5541 return Ok(Vec::new());
5542 }
5543
5544 let rows = sqlx::query(
5546 r#"
5547 SELECT
5548 DIGEST_TEXT as query_text,
5549 COUNT_STAR as execution_count,
5550 AVG_TIMER_WAIT / 1000000000 as avg_time_ms,
5551 MAX_TIMER_WAIT / 1000000000 as max_time_ms,
5552 SUM_TIMER_WAIT / 1000000000 as total_time_ms
5553 FROM performance_schema.events_statements_summary_by_digest
5554 WHERE DIGEST_TEXT LIKE '%celers_%'
5555 ORDER BY SUM_TIMER_WAIT DESC
5556 LIMIT ?
5557 "#,
5558 )
5559 .bind(limit)
5560 .fetch_all(&self.pool)
5561 .await
5562 .map_err(|e| CelersError::Other(format!("Failed to query slow queries: {}", e)))?;
5563
5564 let mut slow_queries = Vec::new();
5565 for row in rows {
5566 slow_queries.push(SlowQueryInfo {
5567 query_text: row.try_get("query_text").unwrap_or_default(),
5568 execution_count: row.try_get("execution_count").unwrap_or(0),
5569 avg_time_ms: row.try_get("avg_time_ms").unwrap_or(0.0),
5570 max_time_ms: row.try_get("max_time_ms").unwrap_or(0.0),
5571 total_time_ms: row.try_get("total_time_ms").unwrap_or(0.0),
5572 });
5573 }
5574
5575 Ok(slow_queries)
5576 }
5577
5578 pub async fn apply_priority_aging(
5604 &self,
5605 age_threshold_secs: i64,
5606 priority_boost: i32,
5607 ) -> Result<u64> {
5608 if age_threshold_secs <= 0 {
5610 return Err(CelersError::Other(
5611 "age_threshold_secs must be positive".to_string(),
5612 ));
5613 }
5614 if priority_boost <= 0 {
5615 return Err(CelersError::Other(
5616 "priority_boost must be positive".to_string(),
5617 ));
5618 }
5619 if priority_boost > 100 {
5620 tracing::warn!(
5621 priority_boost = priority_boost,
5622 "Large priority boost may cause priority inversion"
5623 );
5624 }
5625
5626 let result = sqlx::query(
5627 r#"
5628 UPDATE celers_tasks
5629 SET priority = priority + ?
5630 WHERE state = 'pending'
5631 AND TIMESTAMPDIFF(SECOND, created_at, NOW()) > ?
5632 AND priority < 1000
5633 "#,
5634 )
5635 .bind(priority_boost)
5636 .bind(age_threshold_secs)
5637 .execute(&self.pool)
5638 .await
5639 .map_err(|e| CelersError::Other(format!("Failed to apply priority aging: {}", e)))?;
5640
5641 let updated = result.rows_affected();
5642 if updated > 0 {
5643 tracing::info!(
5644 count = updated,
5645 age_threshold_secs = age_threshold_secs,
5646 priority_boost = priority_boost,
5647 "Applied priority aging"
5648 );
5649 }
5650
5651 Ok(updated)
5652 }
5653
5654 pub async fn update_task_progress(
5681 &self,
5682 task_id: &TaskId,
5683 progress_percent: f64,
5684 current_step: Option<&str>,
5685 ) -> Result<bool> {
5686 if !(0.0..=100.0).contains(&progress_percent) {
5688 return Err(CelersError::Other(format!(
5689 "progress_percent must be between 0.0 and 100.0, got {}",
5690 progress_percent
5691 )));
5692 }
5693
5694 let progress_json = serde_json::json!({
5695 "progress_percent": progress_percent,
5696 "current_step": current_step,
5697 "updated_at": chrono::Utc::now().to_rfc3339(),
5698 });
5699
5700 let result = sqlx::query(
5701 r#"
5702 UPDATE celers_tasks
5703 SET metadata = JSON_SET(
5704 metadata,
5705 '$.progress', ?
5706 )
5707 WHERE id = ? AND state = 'processing'
5708 "#,
5709 )
5710 .bind(serde_json::to_string(&progress_json).unwrap_or_else(|_| "{}".to_string()))
5711 .bind(task_id.to_string())
5712 .execute(&self.pool)
5713 .await
5714 .map_err(|e| CelersError::Other(format!("Failed to update task progress: {}", e)))?;
5715
5716 Ok(result.rows_affected() > 0)
5717 }
5718
5719 pub async fn get_task_progress(&self, task_id: &TaskId) -> Result<Option<TaskProgress>> {
5727 let row = sqlx::query(
5728 r#"
5729 SELECT
5730 id,
5731 JSON_EXTRACT(metadata, '$.progress.progress_percent') as progress_percent,
5732 JSON_UNQUOTE(JSON_EXTRACT(metadata, '$.progress.current_step')) as current_step,
5733 JSON_UNQUOTE(JSON_EXTRACT(metadata, '$.progress.updated_at')) as updated_at
5734 FROM celers_tasks
5735 WHERE id = ?
5736 "#,
5737 )
5738 .bind(task_id.to_string())
5739 .fetch_optional(&self.pool)
5740 .await
5741 .map_err(|e| CelersError::Other(format!("Failed to get task progress: {}", e)))?;
5742
5743 if let Some(row) = row {
5744 let progress_percent: Option<f64> = row.try_get("progress_percent").ok();
5745 let current_step: Option<String> = row.try_get("current_step").ok();
5746 let updated_at_str: Option<String> = row.try_get("updated_at").ok();
5747
5748 if let Some(percent) = progress_percent {
5749 let updated_at = updated_at_str
5750 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
5751 .map(|dt| dt.with_timezone(&Utc))
5752 .unwrap_or_else(Utc::now);
5753
5754 return Ok(Some(TaskProgress {
5755 task_id: *task_id,
5756 progress_percent: percent,
5757 current_step,
5758 total_steps: None,
5759 updated_at,
5760 }));
5761 }
5762 }
5763
5764 Ok(None)
5765 }
5766
5767 pub async fn check_rate_limit(
5793 &self,
5794 task_name: &str,
5795 max_per_minute: i64,
5796 ) -> Result<RateLimitStatus> {
5797 let completed_last_minute: i64 = sqlx::query_scalar(
5799 r#"
5800 SELECT COUNT(*)
5801 FROM celers_tasks
5802 WHERE task_name = ?
5803 AND state = 'completed'
5804 AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 MINUTE)
5805 "#,
5806 )
5807 .bind(task_name)
5808 .fetch_one(&self.pool)
5809 .await
5810 .map_err(|e| CelersError::Other(format!("Failed to check rate limit: {}", e)))?;
5811
5812 let completed_last_hour: i64 = sqlx::query_scalar(
5814 r#"
5815 SELECT COUNT(*)
5816 FROM celers_tasks
5817 WHERE task_name = ?
5818 AND state = 'completed'
5819 AND completed_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
5820 "#,
5821 )
5822 .bind(task_name)
5823 .fetch_one(&self.pool)
5824 .await
5825 .unwrap_or(0);
5826
5827 let per_second = completed_last_minute as f64 / 60.0;
5828 let limit_exceeded = completed_last_minute >= max_per_minute;
5829
5830 Ok(RateLimitStatus {
5831 task_name: task_name.to_string(),
5832 current_per_second: per_second,
5833 current_per_minute: completed_last_minute,
5834 current_per_hour: completed_last_hour,
5835 limit_exceeded,
5836 })
5837 }
5838
5839 pub async fn enqueue_deduplicated_window(
5866 &self,
5867 task: SerializedTask,
5868 dedup_key: &str,
5869 window_secs: i64,
5870 ) -> Result<TaskId> {
5871 let existing: Option<String> = sqlx::query_scalar(
5873 r#"
5874 SELECT id
5875 FROM celers_tasks
5876 WHERE JSON_EXTRACT(metadata, '$.dedup_key') = ?
5877 AND created_at >= DATE_SUB(NOW(), INTERVAL ? SECOND)
5878 AND state IN ('pending', 'processing')
5879 LIMIT 1
5880 "#,
5881 )
5882 .bind(dedup_key)
5883 .bind(window_secs)
5884 .fetch_optional(&self.pool)
5885 .await
5886 .map_err(|e| CelersError::Other(format!("Failed to check for duplicates: {}", e)))?;
5887
5888 if let Some(id_str) = existing {
5889 let task_id = Uuid::parse_str(&id_str)
5891 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?;
5892 tracing::debug!(
5893 task_id = %task_id,
5894 dedup_key = dedup_key,
5895 "Found duplicate task within window"
5896 );
5897 return Ok(task_id);
5898 }
5899
5900 let task_id = task.metadata.id;
5902 let mut db_metadata = json!({
5903 "queue": self.queue_name,
5904 "enqueued_at": chrono::Utc::now().to_rfc3339(),
5905 "dedup_key": dedup_key,
5906 });
5907
5908 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
5910 if let Some(obj) = db_metadata.as_object_mut() {
5911 if let Some(meta_obj) = task_meta.as_object() {
5912 for (k, v) in meta_obj {
5913 obj.insert(k.clone(), v.clone());
5914 }
5915 }
5916 }
5917 }
5918
5919 sqlx::query(
5920 r#"
5921 INSERT INTO celers_tasks
5922 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
5923 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
5924 "#,
5925 )
5926 .bind(task_id.to_string())
5927 .bind(&task.metadata.name)
5928 .bind(&task.payload)
5929 .bind(task.metadata.priority)
5930 .bind(task.metadata.max_retries as i32)
5931 .bind(serde_json::to_string(&db_metadata).unwrap_or_else(|_| "{}".to_string()))
5932 .execute(&self.pool)
5933 .await
5934 .map_err(|e| CelersError::Other(format!("Failed to enqueue task: {}", e)))?;
5935
5936 Ok(task_id)
5937 }
5938
5939 pub async fn cancel_cascade(&self, task_id: &TaskId) -> Result<u64> {
5965 let parent_result = sqlx::query(
5967 r#"
5968 UPDATE celers_tasks
5969 SET state = 'cancelled',
5970 completed_at = NOW()
5971 WHERE id = ?
5972 AND state IN ('pending', 'processing')
5973 "#,
5974 )
5975 .bind(task_id.to_string())
5976 .execute(&self.pool)
5977 .await
5978 .map_err(|e| CelersError::Other(format!("Failed to cancel parent task: {}", e)))?;
5979
5980 let mut total_cancelled = parent_result.rows_affected();
5981
5982 let dependent_result = sqlx::query(
5984 r#"
5985 UPDATE celers_tasks
5986 SET state = 'cancelled',
5987 completed_at = NOW(),
5988 error_message = CONCAT(
5989 COALESCE(error_message, ''),
5990 'Cancelled due to parent task cancellation'
5991 )
5992 WHERE JSON_EXTRACT(metadata, '$.parent_task_id') = ?
5993 AND state IN ('pending', 'processing')
5994 "#,
5995 )
5996 .bind(task_id.to_string())
5997 .execute(&self.pool)
5998 .await
5999 .map_err(|e| CelersError::Other(format!("Failed to cancel dependent tasks: {}", e)))?;
6000
6001 total_cancelled += dependent_result.rows_affected();
6002
6003 if total_cancelled > 0 {
6004 tracing::info!(
6005 parent_task_id = %task_id,
6006 total_cancelled = total_cancelled,
6007 "Cascade cancelled tasks"
6008 );
6009 }
6010
6011 Ok(total_cancelled)
6012 }
6013
6014 pub async fn enqueue_with_retry_policy(
6047 &self,
6048 task: SerializedTask,
6049 retry_policy: RetryPolicy,
6050 ) -> Result<TaskId> {
6051 let task_id = task.metadata.id;
6052 let retry_policy_json = serde_json::to_value(&retry_policy)
6053 .map_err(|e| CelersError::Other(format!("Failed to serialize retry policy: {}", e)))?;
6054
6055 let mut metadata = json!({
6056 "queue": self.queue_name,
6057 "enqueued_at": chrono::Utc::now().to_rfc3339(),
6058 "retry_policy": retry_policy_json,
6059 });
6060
6061 if let Ok(task_meta) = serde_json::to_value(&task.metadata) {
6063 if let Some(obj) = metadata.as_object_mut() {
6064 if let Some(meta_obj) = task_meta.as_object() {
6065 for (k, v) in meta_obj {
6066 obj.insert(k.clone(), v.clone());
6067 }
6068 }
6069 }
6070 }
6071
6072 sqlx::query(
6073 r#"
6074 INSERT INTO celers_tasks
6075 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
6076 VALUES (?, ?, ?, 'pending', ?, ?, ?, NOW(), NOW())
6077 "#,
6078 )
6079 .bind(task_id.to_string())
6080 .bind(&task.metadata.name)
6081 .bind(&task.payload)
6082 .bind(task.metadata.priority)
6083 .bind(retry_policy.max_retries as i32)
6084 .bind(serde_json::to_string(&metadata).unwrap_or_else(|_| "{}".to_string()))
6085 .execute(&self.pool)
6086 .await
6087 .map_err(|e| CelersError::Other(format!("Failed to enqueue task: {}", e)))?;
6088
6089 #[cfg(feature = "metrics")]
6090 {
6091 TASKS_ENQUEUED_TOTAL.inc();
6092 TASKS_ENQUEUED_BY_TYPE
6093 .with_label_values(&[&task.metadata.name])
6094 .inc();
6095 }
6096
6097 tracing::debug!(task_id = %task_id, task_name = %task.metadata.name, "Task enqueued with custom retry policy");
6098 Ok(task_id)
6099 }
6100
6101 pub async fn reject_with_retry_policy(
6113 &self,
6114 task_id: &TaskId,
6115 error_message: Option<String>,
6116 requeue: bool,
6117 ) -> Result<bool> {
6118 let task_info = self.get_task(task_id).await?;
6120 let task_info = match task_info {
6121 Some(info) => info,
6122 None => return Ok(false),
6123 };
6124
6125 if !requeue || task_info.retry_count >= task_info.max_retries {
6126 self.reject(task_id, None, false).await?;
6128 return Ok(true);
6129 }
6130
6131 let retry_delay_secs = if let Ok(Some(task)) = self.get_task(task_id).await {
6133 let metadata_str =
6135 sqlx::query_scalar::<_, String>("SELECT metadata FROM celers_tasks WHERE id = ?")
6136 .bind(task_id.to_string())
6137 .fetch_optional(&self.pool)
6138 .await
6139 .ok()
6140 .flatten();
6141
6142 if let Some(meta_str) = metadata_str {
6143 if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&meta_str) {
6144 if let Some(policy_value) = meta.get("retry_policy") {
6145 if let Ok(policy) =
6146 serde_json::from_value::<RetryPolicy>(policy_value.clone())
6147 {
6148 policy.strategy.calculate_delay(task.retry_count as u32)
6149 } else {
6150 60 }
6152 } else {
6153 60
6154 }
6155 } else {
6156 60
6157 }
6158 } else {
6159 60
6160 }
6161 } else {
6162 60
6163 };
6164
6165 let result = sqlx::query(
6167 r#"
6168 UPDATE celers_tasks
6169 SET state = 'pending',
6170 retry_count = retry_count + 1,
6171 error_message = ?,
6172 scheduled_at = DATE_ADD(NOW(), INTERVAL ? SECOND),
6173 started_at = NULL
6174 WHERE id = ? AND state = 'processing'
6175 "#,
6176 )
6177 .bind(error_message)
6178 .bind(retry_delay_secs as i64)
6179 .bind(task_id.to_string())
6180 .execute(&self.pool)
6181 .await
6182 .map_err(|e| CelersError::Other(format!("Failed to reject task: {}", e)))?;
6183
6184 Ok(result.rows_affected() > 0)
6185 }
6186
6187 pub async fn register_recurring_task(&self, config: RecurringTaskConfig) -> Result<String> {
6220 let config_id = Uuid::new_v4().to_string();
6221 let config_json = serde_json::to_string(&config)
6222 .map_err(|e| CelersError::Other(format!("Failed to serialize config: {}", e)))?;
6223
6224 sqlx::query(
6225 r#"
6226 INSERT INTO celers_task_results
6227 (task_id, task_name, status, result, created_at)
6228 VALUES (?, ?, 'PENDING', ?, NOW())
6229 ON DUPLICATE KEY UPDATE
6230 result = VALUES(result),
6231 created_at = NOW()
6232 "#,
6233 )
6234 .bind(&config_id)
6235 .bind(format!("__recurring__{}", config.task_name))
6236 .bind(&config_json)
6237 .execute(&self.pool)
6238 .await
6239 .map_err(|e| CelersError::Other(format!("Failed to register recurring task: {}", e)))?;
6240
6241 tracing::info!(
6242 config_id = config_id,
6243 task_name = config.task_name,
6244 "Recurring task registered"
6245 );
6246 Ok(config_id)
6247 }
6248
6249 pub async fn process_recurring_tasks(&self) -> Result<u64> {
6270 let rows = sqlx::query(
6272 r#"
6273 SELECT task_id, result
6274 FROM celers_task_results
6275 WHERE task_name LIKE '__recurring__%'
6276 AND status = 'PENDING'
6277 "#,
6278 )
6279 .fetch_all(&self.pool)
6280 .await
6281 .map_err(|e| CelersError::Other(format!("Failed to fetch recurring tasks: {}", e)))?;
6282
6283 let mut enqueued = 0u64;
6284 let now = Utc::now();
6285
6286 for row in rows {
6287 let config_id: String = row.get("task_id");
6288 let config_json: String = row.get("result");
6289
6290 let mut config: RecurringTaskConfig =
6291 serde_json::from_str(&config_json).map_err(|e| {
6292 CelersError::Other(format!("Failed to parse recurring config: {}", e))
6293 })?;
6294
6295 if !config.enabled || config.next_run > now {
6297 continue;
6298 }
6299
6300 let task = SerializedTask::new(config.task_name.clone(), config.payload.clone());
6302 match self.enqueue(task).await {
6303 Ok(_) => {
6304 enqueued += 1;
6305
6306 config.last_run = Some(now);
6308 config.next_run = config.schedule.next_run_from(now);
6309
6310 let updated_json = serde_json::to_string(&config).unwrap_or(config_json);
6311
6312 let _ = sqlx::query(
6314 r#"
6315 UPDATE celers_task_results
6316 SET result = ?
6317 WHERE task_id = ?
6318 "#,
6319 )
6320 .bind(&updated_json)
6321 .bind(&config_id)
6322 .execute(&self.pool)
6323 .await;
6324
6325 tracing::debug!(
6326 config_id = config_id,
6327 task_name = config.task_name,
6328 "Recurring task enqueued"
6329 );
6330 }
6331 Err(e) => {
6332 tracing::error!(
6333 config_id = config_id,
6334 task_name = config.task_name,
6335 error = %e,
6336 "Failed to enqueue recurring task"
6337 );
6338 }
6339 }
6340 }
6341
6342 Ok(enqueued)
6343 }
6344
6345 pub async fn list_recurring_tasks(&self) -> Result<Vec<(String, RecurringTaskConfig)>> {
6350 let rows = sqlx::query(
6351 r#"
6352 SELECT task_id, result
6353 FROM celers_task_results
6354 WHERE task_name LIKE '__recurring__%'
6355 AND status = 'PENDING'
6356 "#,
6357 )
6358 .fetch_all(&self.pool)
6359 .await
6360 .map_err(|e| CelersError::Other(format!("Failed to fetch recurring tasks: {}", e)))?;
6361
6362 let mut configs = Vec::new();
6363 for row in rows {
6364 let config_id: String = row.get("task_id");
6365 let config_json: String = row.get("result");
6366
6367 if let Ok(config) = serde_json::from_str::<RecurringTaskConfig>(&config_json) {
6368 configs.push((config_id, config));
6369 }
6370 }
6371
6372 Ok(configs)
6373 }
6374
6375 pub async fn delete_recurring_task(&self, config_id: &str) -> Result<bool> {
6383 let result = sqlx::query(
6384 r#"
6385 DELETE FROM celers_task_results
6386 WHERE task_id = ? AND task_name LIKE '__recurring__%'
6387 "#,
6388 )
6389 .bind(config_id)
6390 .execute(&self.pool)
6391 .await
6392 .map_err(|e| CelersError::Other(format!("Failed to delete recurring task: {}", e)))?;
6393
6394 Ok(result.rows_affected() > 0)
6395 }
6396
6397 pub async fn export_tasks(
6422 &self,
6423 state: Option<DbTaskState>,
6424 limit: Option<i64>,
6425 ) -> Result<String> {
6426 let mut query = String::from(
6427 r#"
6428 SELECT id, task_name, payload, state, priority, retry_count, max_retries,
6429 created_at, scheduled_at, started_at, completed_at, worker_id, error_message, metadata
6430 FROM celers_tasks
6431 "#,
6432 );
6433
6434 if let Some(s) = &state {
6435 query.push_str(&format!(" WHERE state = '{}'", s));
6436 }
6437
6438 query.push_str(" ORDER BY created_at ASC");
6439
6440 if let Some(l) = limit {
6441 query.push_str(&format!(" LIMIT {}", l));
6442 }
6443
6444 let rows = sqlx::query(&query)
6445 .fetch_all(&self.pool)
6446 .await
6447 .map_err(|e| CelersError::Other(format!("Failed to export tasks: {}", e)))?;
6448
6449 let mut tasks = Vec::new();
6450 for row in rows {
6451 let task = serde_json::json!({
6452 "id": row.get::<String, _>("id"),
6453 "task_name": row.get::<String, _>("task_name"),
6454 "payload": row.get::<Vec<u8>, _>("payload"),
6455 "state": row.get::<String, _>("state"),
6456 "priority": row.get::<i32, _>("priority"),
6457 "retry_count": row.get::<i32, _>("retry_count"),
6458 "max_retries": row.get::<i32, _>("max_retries"),
6459 "created_at": row.get::<DateTime<Utc>, _>("created_at"),
6460 "scheduled_at": row.get::<DateTime<Utc>, _>("scheduled_at"),
6461 "started_at": row.get::<Option<DateTime<Utc>>, _>("started_at"),
6462 "completed_at": row.get::<Option<DateTime<Utc>>, _>("completed_at"),
6463 "worker_id": row.get::<Option<String>, _>("worker_id"),
6464 "error_message": row.get::<Option<String>, _>("error_message"),
6465 "metadata": row.get::<String, _>("metadata"),
6466 });
6467 tasks.push(task);
6468 }
6469
6470 serde_json::to_string_pretty(&tasks)
6471 .map_err(|e| CelersError::Other(format!("Failed to serialize tasks: {}", e)))
6472 }
6473
6474 pub async fn import_tasks(&self, json_data: &str, skip_existing: bool) -> Result<u64> {
6500 let tasks: Vec<serde_json::Value> = serde_json::from_str(json_data)
6501 .map_err(|e| CelersError::Other(format!("Failed to parse JSON: {}", e)))?;
6502
6503 let mut imported = 0u64;
6504
6505 for task in tasks {
6506 let id = task["id"]
6507 .as_str()
6508 .ok_or_else(|| CelersError::Other("Missing task id".to_string()))?;
6509
6510 if skip_existing {
6512 let exists: i64 =
6513 sqlx::query_scalar("SELECT COUNT(*) FROM celers_tasks WHERE id = ?")
6514 .bind(id)
6515 .fetch_one(&self.pool)
6516 .await
6517 .map_err(|e| {
6518 CelersError::Other(format!("Failed to check task existence: {}", e))
6519 })?;
6520
6521 if exists > 0 {
6522 tracing::debug!(task_id = id, "Skipping existing task");
6523 continue;
6524 }
6525 }
6526
6527 let result = sqlx::query(
6529 r#"
6530 INSERT INTO celers_tasks
6531 (id, task_name, payload, state, priority, retry_count, max_retries,
6532 created_at, scheduled_at, started_at, completed_at, worker_id, error_message, metadata)
6533 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
6534 "#,
6535 )
6536 .bind(id)
6537 .bind(task["task_name"].as_str().unwrap_or(""))
6538 .bind(task["payload"].as_array().map(|a| {
6539 a.iter()
6540 .filter_map(|v| v.as_u64().map(|n| n as u8))
6541 .collect::<Vec<u8>>()
6542 }).unwrap_or_default())
6543 .bind(task["state"].as_str().unwrap_or("pending"))
6544 .bind(task["priority"].as_i64().unwrap_or(0) as i32)
6545 .bind(task["retry_count"].as_i64().unwrap_or(0) as i32)
6546 .bind(task["max_retries"].as_i64().unwrap_or(3) as i32)
6547 .bind(task["created_at"].as_str().and_then(|s| DateTime::parse_from_rfc3339(s).ok()).map(|dt| dt.with_timezone(&Utc)).unwrap_or_else(Utc::now))
6548 .bind(task["scheduled_at"].as_str().and_then(|s| DateTime::parse_from_rfc3339(s).ok()).map(|dt| dt.with_timezone(&Utc)).unwrap_or_else(Utc::now))
6549 .bind(task["started_at"].as_str().and_then(|s| DateTime::parse_from_rfc3339(s).ok()).map(|dt| dt.with_timezone(&Utc)))
6550 .bind(task["completed_at"].as_str().and_then(|s| DateTime::parse_from_rfc3339(s).ok()).map(|dt| dt.with_timezone(&Utc)))
6551 .bind(task["worker_id"].as_str())
6552 .bind(task["error_message"].as_str())
6553 .bind(task["metadata"].as_str().unwrap_or("{}"))
6554 .execute(&self.pool)
6555 .await;
6556
6557 match result {
6558 Ok(_) => {
6559 imported += 1;
6560 tracing::debug!(task_id = id, "Imported task");
6561 }
6562 Err(e) => {
6563 if !skip_existing {
6564 return Err(CelersError::Other(format!(
6565 "Failed to import task {}: {}",
6566 id, e
6567 )));
6568 }
6569 tracing::warn!(task_id = id, error = %e, "Failed to import task, skipping");
6570 }
6571 }
6572 }
6573
6574 tracing::info!(imported = imported, "Task import completed");
6575 Ok(imported)
6576 }
6577
6578 pub async fn export_dlq(&self, limit: Option<i64>) -> Result<String> {
6601 let mut query = String::from(
6602 r#"
6603 SELECT id, task_id, task_name, payload, retry_count, error_message, failed_at, metadata
6604 FROM celers_dead_letter_queue
6605 ORDER BY failed_at DESC
6606 "#,
6607 );
6608
6609 if let Some(l) = limit {
6610 query.push_str(&format!(" LIMIT {}", l));
6611 }
6612
6613 let rows = sqlx::query(&query)
6614 .fetch_all(&self.pool)
6615 .await
6616 .map_err(|e| CelersError::Other(format!("Failed to export DLQ: {}", e)))?;
6617
6618 let mut dlq_entries = Vec::new();
6619 for row in rows {
6620 let entry = serde_json::json!({
6621 "id": row.get::<String, _>("id"),
6622 "task_id": row.get::<String, _>("task_id"),
6623 "task_name": row.get::<String, _>("task_name"),
6624 "payload": row.get::<Vec<u8>, _>("payload"),
6625 "retry_count": row.get::<i32, _>("retry_count"),
6626 "error_message": row.get::<Option<String>, _>("error_message"),
6627 "failed_at": row.get::<DateTime<Utc>, _>("failed_at"),
6628 "metadata": row.get::<String, _>("metadata"),
6629 });
6630 dlq_entries.push(entry);
6631 }
6632
6633 serde_json::to_string_pretty(&dlq_entries)
6634 .map_err(|e| CelersError::Other(format!("Failed to serialize DLQ: {}", e)))
6635 }
6636
6637 pub fn get_circuit_breaker_stats(&self) -> CircuitBreakerStats {
6654 let cb = self
6655 .circuit_breaker
6656 .read()
6657 .expect("lock should not be poisoned");
6658 CircuitBreakerStats {
6659 state: cb.state,
6660 failure_count: cb.failure_count,
6661 success_count: cb.success_count,
6662 last_failure_time: cb.last_failure_time,
6663 last_state_change: cb.last_state_change,
6664 }
6665 }
6666
6667 pub fn reset_circuit_breaker(&self) {
6685 let mut cb = self
6686 .circuit_breaker
6687 .write()
6688 .expect("lock should not be poisoned");
6689 cb.state = CircuitBreakerState::Closed;
6690 cb.failure_count = 0;
6691 cb.success_count = 0;
6692 cb.last_failure_time = None;
6693 cb.last_state_change = Utc::now();
6694 tracing::info!("Circuit breaker manually reset to Closed state");
6695 }
6696
6697 fn record_success(&self) {
6702 let mut cb = self
6703 .circuit_breaker
6704 .write()
6705 .expect("lock should not be poisoned");
6706
6707 match cb.state {
6708 CircuitBreakerState::HalfOpen => {
6709 cb.success_count += 1;
6710 if cb.success_count >= cb.config.success_threshold {
6711 cb.state = CircuitBreakerState::Closed;
6712 cb.failure_count = 0;
6713 cb.success_count = 0;
6714 cb.last_state_change = Utc::now();
6715 tracing::info!(
6716 "Circuit breaker transitioned to Closed after successful recovery"
6717 );
6718 }
6719 }
6720 CircuitBreakerState::Closed => {
6721 cb.failure_count = 0;
6723 }
6724 CircuitBreakerState::Open => {
6725 }
6727 }
6728 }
6729
6730 fn record_failure(&self) {
6735 let mut cb = self
6736 .circuit_breaker
6737 .write()
6738 .expect("lock should not be poisoned");
6739 cb.failure_count += 1;
6740 cb.last_failure_time = Some(Utc::now());
6741
6742 match cb.state {
6743 CircuitBreakerState::Closed => {
6744 if cb.failure_count >= cb.config.failure_threshold {
6745 cb.state = CircuitBreakerState::Open;
6746 cb.last_state_change = Utc::now();
6747 tracing::warn!(
6748 failure_count = cb.failure_count,
6749 "Circuit breaker opened due to consecutive failures"
6750 );
6751 }
6752 }
6753 CircuitBreakerState::HalfOpen => {
6754 cb.state = CircuitBreakerState::Open;
6756 cb.success_count = 0;
6757 cb.last_state_change = Utc::now();
6758 tracing::warn!("Circuit breaker reopened after failure in HalfOpen state");
6759 }
6760 CircuitBreakerState::Open => {
6761 }
6763 }
6764 }
6765
6766 fn check_circuit(&self) -> Result<()> {
6771 let mut cb = self
6772 .circuit_breaker
6773 .write()
6774 .expect("lock should not be poisoned");
6775
6776 match cb.state {
6777 CircuitBreakerState::Closed | CircuitBreakerState::HalfOpen => Ok(()),
6778 CircuitBreakerState::Open => {
6779 let elapsed = Utc::now()
6781 .signed_duration_since(cb.last_state_change)
6782 .num_seconds();
6783
6784 if elapsed >= cb.config.timeout_secs as i64 {
6785 cb.state = CircuitBreakerState::HalfOpen;
6787 cb.success_count = 0;
6788 cb.last_state_change = Utc::now();
6789 tracing::info!("Circuit breaker transitioned to HalfOpen, testing recovery");
6790 Ok(())
6791 } else {
6792 Err(CelersError::Other(format!(
6793 "Circuit breaker is open (will retry in {} seconds)",
6794 cb.config.timeout_secs as i64 - elapsed
6795 )))
6796 }
6797 }
6798 }
6799 }
6800
6801 pub async fn with_circuit_breaker<F, T, Fut>(&self, operation: F) -> Result<T>
6813 where
6814 F: FnOnce() -> Fut,
6815 Fut: std::future::Future<Output = Result<T>>,
6816 {
6817 self.check_circuit()?;
6819
6820 match operation().await {
6822 Ok(result) => {
6823 self.record_success();
6824 Ok(result)
6825 }
6826 Err(e) => {
6827 self.record_failure();
6828 Err(e)
6829 }
6830 }
6831 }
6832
6833 pub async fn enqueue_with_idempotency(
6882 &self,
6883 task: SerializedTask,
6884 idempotency_key: &str,
6885 ttl_secs: u64,
6886 metadata: Option<serde_json::Value>,
6887 ) -> Result<TaskId> {
6888 let existing: Option<(String,)> = sqlx::query_as(
6890 r#"
6891 SELECT task_id
6892 FROM celers_task_idempotency
6893 WHERE idempotency_key = ?
6894 AND task_name = ?
6895 AND expires_at > NOW()
6896 "#,
6897 )
6898 .bind(idempotency_key)
6899 .bind(&task.metadata.name)
6900 .fetch_optional(&self.pool)
6901 .await
6902 .map_err(|e| CelersError::Other(format!("Failed to check idempotency record: {}", e)))?;
6903
6904 if let Some((task_id_str,)) = existing {
6905 let task_id = Uuid::parse_str(&task_id_str)
6907 .map_err(|e| CelersError::Other(format!("Invalid task UUID: {}", e)))?;
6908
6909 tracing::debug!(
6910 idempotency_key = %idempotency_key,
6911 task_id = %task_id,
6912 "Duplicate task detected, returning existing task ID"
6913 );
6914
6915 return Ok(task_id);
6916 }
6917
6918 let task_id = Uuid::new_v4();
6920 let idempotency_id = Uuid::new_v4();
6921
6922 let mut tx = self
6924 .pool
6925 .begin()
6926 .await
6927 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
6928
6929 sqlx::query(
6931 r#"
6932 INSERT INTO celers_tasks
6933 (id, task_name, payload, state, priority, max_retries, metadata, created_at, scheduled_at)
6934 VALUES (?, ?, ?, 'pending', ?, ?, '{}', NOW(), NOW())
6935 "#,
6936 )
6937 .bind(task_id.to_string())
6938 .bind(&task.metadata.name)
6939 .bind(&task.payload)
6940 .bind(task.metadata.priority)
6941 .bind(task.metadata.max_retries as i32)
6942 .execute(&mut *tx)
6943 .await
6944 .map_err(|e| CelersError::Other(format!("Failed to enqueue task: {}", e)))?;
6945
6946 sqlx::query(
6948 r#"
6949 INSERT INTO celers_task_idempotency
6950 (id, idempotency_key, task_name, task_id, created_at, expires_at, metadata)
6951 VALUES (?, ?, ?, ?, NOW(), DATE_ADD(NOW(), INTERVAL ? SECOND), ?)
6952 "#,
6953 )
6954 .bind(idempotency_id.to_string())
6955 .bind(idempotency_key)
6956 .bind(&task.metadata.name)
6957 .bind(task_id.to_string())
6958 .bind(ttl_secs as i64)
6959 .bind(metadata.map(|m| serde_json::to_string(&m).unwrap_or_else(|_| "{}".to_string())))
6960 .execute(&mut *tx)
6961 .await
6962 .map_err(|e| CelersError::Other(format!("Failed to insert idempotency record: {}", e)))?;
6963
6964 tx.commit()
6966 .await
6967 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
6968
6969 tracing::info!(
6970 task_id = %task_id,
6971 idempotency_key = %idempotency_key,
6972 ttl_secs = ttl_secs,
6973 "Task enqueued with idempotency key"
6974 );
6975
6976 #[cfg(feature = "metrics")]
6977 {
6978 TASKS_ENQUEUED_TOTAL.inc();
6979 TASKS_ENQUEUED_BY_TYPE
6980 .with_label_values(&[&task.metadata.name])
6981 .inc();
6982 }
6983
6984 Ok(task_id)
6985 }
6986
6987 #[allow(clippy::type_complexity)]
7010 pub async fn get_idempotency_record(
7011 &self,
7012 idempotency_key: &str,
7013 task_name: &str,
7014 ) -> Result<Option<IdempotencyRecord>> {
7015 let record: Option<(
7016 String,
7017 String,
7018 String,
7019 String,
7020 DateTime<Utc>,
7021 DateTime<Utc>,
7022 Option<String>,
7023 )> = sqlx::query_as(
7024 r#"
7025 SELECT id, idempotency_key, task_name, task_id, created_at, expires_at, metadata
7026 FROM celers_task_idempotency
7027 WHERE idempotency_key = ?
7028 AND task_name = ?
7029 ORDER BY created_at DESC
7030 LIMIT 1
7031 "#,
7032 )
7033 .bind(idempotency_key)
7034 .bind(task_name)
7035 .fetch_optional(&self.pool)
7036 .await
7037 .map_err(|e| CelersError::Other(format!("Failed to fetch idempotency record: {}", e)))?;
7038
7039 if let Some((id, key, task_name, task_id, created_at, expires_at, metadata)) = record {
7040 Ok(Some(IdempotencyRecord {
7041 id: Uuid::parse_str(&id)
7042 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
7043 idempotency_key: key,
7044 task_name,
7045 task_id: Uuid::parse_str(&task_id)
7046 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
7047 created_at,
7048 expires_at,
7049 metadata: metadata.and_then(|m| serde_json::from_str(&m).ok()),
7050 }))
7051 } else {
7052 Ok(None)
7053 }
7054 }
7055
7056 pub async fn cleanup_expired_idempotency_keys(&self) -> Result<u64> {
7076 let result = sqlx::query(
7077 r#"
7078 DELETE FROM celers_task_idempotency
7079 WHERE expires_at <= NOW()
7080 "#,
7081 )
7082 .execute(&self.pool)
7083 .await
7084 .map_err(|e| {
7085 CelersError::Other(format!("Failed to cleanup expired idempotency keys: {}", e))
7086 })?;
7087
7088 let deleted = result.rows_affected();
7089
7090 if deleted > 0 {
7091 tracing::info!(count = deleted, "Cleaned up expired idempotency keys");
7092 }
7093
7094 Ok(deleted)
7095 }
7096
7097 #[allow(clippy::type_complexity)]
7119 pub async fn get_idempotency_statistics(&self) -> Result<Vec<IdempotencyStats>> {
7120 let rows: Vec<(
7121 String,
7122 i64,
7123 i64,
7124 i64,
7125 i64,
7126 Option<DateTime<Utc>>,
7127 Option<DateTime<Utc>>,
7128 )> = sqlx::query_as(
7129 r#"
7130 SELECT
7131 task_name,
7132 COUNT(*) as total_keys,
7133 COUNT(DISTINCT idempotency_key) as unique_keys,
7134 SUM(CASE WHEN expires_at > NOW() THEN 1 ELSE 0 END) as active_keys,
7135 SUM(CASE WHEN expires_at <= NOW() THEN 1 ELSE 0 END) as expired_keys,
7136 MIN(created_at) as oldest_key,
7137 MAX(created_at) as newest_key
7138 FROM celers_task_idempotency
7139 GROUP BY task_name
7140 ORDER BY task_name
7141 "#,
7142 )
7143 .fetch_all(&self.pool)
7144 .await
7145 .map_err(|e| {
7146 CelersError::Other(format!("Failed to fetch idempotency statistics: {}", e))
7147 })?;
7148
7149 Ok(rows
7150 .into_iter()
7151 .map(
7152 |(
7153 task_name,
7154 total_keys,
7155 unique_keys,
7156 active_keys,
7157 expired_keys,
7158 oldest_key,
7159 newest_key,
7160 )| {
7161 IdempotencyStats {
7162 task_name,
7163 total_keys,
7164 unique_keys,
7165 active_keys,
7166 expired_keys,
7167 oldest_key,
7168 newest_key,
7169 }
7170 },
7171 )
7172 .collect())
7173 }
7174}
7175
/// Execution statistics for a single query text.
///
/// NOTE(review): the code that fills this struct is not visible here; the
/// field descriptions below follow the field names. Confirm against the
/// producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlowQueryInfo {
    /// The query text the statistics describe.
    pub query_text: String,
    /// How many executions were observed.
    pub execution_count: i64,
    /// Average execution time, in milliseconds (per the field name).
    pub avg_time_ms: f64,
    /// Slowest observed execution, in milliseconds (per the field name).
    pub max_time_ms: f64,
    /// Sum of execution times, in milliseconds (per the field name).
    pub total_time_ms: f64,
}
7185
/// Liveness information reported for a worker.
///
/// Presumably mirrors a row of `celers_worker_heartbeat`, the table
/// `register_worker` upserts into. Verify against the reader.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerHeartbeat {
    /// Identifier the worker registered with.
    pub worker_id: String,
    /// Time of the most recent heartbeat.
    pub last_heartbeat: DateTime<Utc>,
    /// Last reported status.
    pub status: WorkerStatus,
    /// Task counter stored alongside the heartbeat.
    pub task_count: i64,
    /// Free-form capabilities JSON supplied at registration, if any.
    pub capabilities: Option<serde_json::Value>,
}
7195
/// Reported state of a worker.
///
/// Serialized with serde as the lowercase variant name (e.g. `"busy"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerStatus {
    Active,
    Idle,
    Busy,
    Offline,
}
7205
7206impl std::fmt::Display for WorkerStatus {
7207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
7208 match self {
7209 WorkerStatus::Active => write!(f, "active"),
7210 WorkerStatus::Idle => write!(f, "idle"),
7211 WorkerStatus::Busy => write!(f, "busy"),
7212 WorkerStatus::Offline => write!(f, "offline"),
7213 }
7214 }
7215}
7216
/// A set of task ids collected under one group identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGroup {
    /// Identifier of the group.
    pub group_id: String,
    /// Ids of the tasks belonging to the group.
    pub task_ids: Vec<Uuid>,
    /// When the group was created.
    pub created_at: DateTime<Utc>,
    /// Optional caller-supplied JSON attached to the group.
    pub metadata: Option<serde_json::Value>,
}
7225
/// Per-state task counts for one task group.
///
/// The per-state fields line up with the `DbTaskState` variants (`pending`,
/// `processing`, `completed`, `failed`, `cancelled`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGroupStatus {
    /// Identifier of the group.
    pub group_id: String,
    /// Number of tasks in the group.
    pub total_tasks: i64,
    pub pending_tasks: i64,
    pub processing_tasks: i64,
    pub completed_tasks: i64,
    pub failed_tasks: i64,
    pub cancelled_tasks: i64,
}
7237
/// One task result to upsert with `MysqlBroker::store_result_batch`.
#[derive(Debug, Clone)]
pub struct BatchResultInput {
    /// Task the result belongs to; the upsert key in `celers_task_results`.
    pub task_id: Uuid,
    /// Task name stored with the result.
    pub task_name: String,
    /// Outcome; stored using its `Display` form.
    pub status: TaskResultStatus,
    /// Result value; serialized to JSON text, with `None` stored as `"null"`.
    pub result: Option<serde_json::Value>,
    /// Error message, if any.
    pub error: Option<String>,
    /// Traceback text, if any.
    pub traceback: Option<String>,
    /// Runtime in milliseconds, if known.
    pub runtime_ms: Option<i64>,
}
7249
7250impl MysqlBroker {
7251 pub async fn store_result_batch(&self, results: &[BatchResultInput]) -> Result<u64> {
7295 if results.is_empty() {
7296 return Ok(0);
7297 }
7298
7299 let mut tx = self
7300 .pool
7301 .begin()
7302 .await
7303 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
7304
7305 let mut stored = 0u64;
7306 for result in results {
7307 let result_json = result
7308 .result
7309 .as_ref()
7310 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
7311 .unwrap_or_else(|| "null".to_string());
7312
7313 let rows_affected = sqlx::query(
7314 r#"
7315 INSERT INTO celers_task_results
7316 (task_id, task_name, status, result, error, traceback, runtime_ms, created_at, completed_at)
7317 VALUES (?, ?, ?, ?, ?, ?, ?, NOW(), NOW())
7318 ON DUPLICATE KEY UPDATE
7319 task_name = VALUES(task_name),
7320 status = VALUES(status),
7321 result = VALUES(result),
7322 error = VALUES(error),
7323 traceback = VALUES(traceback),
7324 runtime_ms = VALUES(runtime_ms),
7325 completed_at = NOW()
7326 "#,
7327 )
7328 .bind(result.task_id.to_string())
7329 .bind(&result.task_name)
7330 .bind(result.status.to_string())
7331 .bind(result_json)
7332 .bind(&result.error)
7333 .bind(&result.traceback)
7334 .bind(result.runtime_ms)
7335 .execute(&mut *tx)
7336 .await
7337 .map_err(|e| {
7338 CelersError::Other(format!("Failed to store result for task {}: {}", result.task_id, e))
7339 })?
7340 .rows_affected();
7341
7342 stored += rows_affected;
7343 }
7344
7345 tx.commit()
7346 .await
7347 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
7348
7349 Ok(stored)
7350 }
7351
7352 pub async fn get_result_batch(&self, task_ids: &[Uuid]) -> Result<Vec<TaskResult>> {
7376 if task_ids.is_empty() {
7377 return Ok(Vec::new());
7378 }
7379
7380 let placeholders = task_ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
7381 let query_str = format!(
7382 r#"
7383 SELECT task_id, task_name, status, result, error, traceback, created_at, completed_at, runtime_ms
7384 FROM celers_task_results
7385 WHERE task_id IN ({})
7386 "#,
7387 placeholders
7388 );
7389
7390 let mut query = sqlx::query_as::<
7391 _,
7392 (
7393 String,
7394 String,
7395 String,
7396 String,
7397 Option<String>,
7398 Option<String>,
7399 DateTime<Utc>,
7400 Option<DateTime<Utc>>,
7401 Option<i64>,
7402 ),
7403 >(&query_str);
7404 for task_id in task_ids {
7405 query = query.bind(task_id.to_string());
7406 }
7407
7408 let rows = query
7409 .fetch_all(&self.pool)
7410 .await
7411 .map_err(|e| CelersError::Other(format!("Failed to fetch results: {}", e)))?;
7412
7413 rows.into_iter()
7414 .map(
7415 |(
7416 task_id,
7417 task_name,
7418 status,
7419 result,
7420 error,
7421 traceback,
7422 created_at,
7423 completed_at,
7424 runtime_ms,
7425 )| {
7426 Ok(TaskResult {
7427 task_id: Uuid::parse_str(&task_id)
7428 .map_err(|e| CelersError::Other(format!("Invalid UUID: {}", e)))?,
7429 task_name,
7430 status: status.parse()?,
7431 result: serde_json::from_str(&result).ok(),
7432 error,
7433 traceback,
7434 created_at,
7435 completed_at,
7436 runtime_ms,
7437 })
7438 },
7439 )
7440 .collect()
7441 }
7442
7443 pub async fn enable_drain_mode(&self) -> Result<()> {
7469 sqlx::query(
7470 r#"
7471 INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
7472 VALUES (?, 'drain_mode', 'true', NOW())
7473 ON DUPLICATE KEY UPDATE config_value = 'true', updated_at = NOW()
7474 "#,
7475 )
7476 .bind(&self.queue_name)
7477 .execute(&self.pool)
7478 .await
7479 .map_err(|e| CelersError::Other(format!("Failed to enable drain mode: {}", e)))?;
7480
7481 Ok(())
7482 }
7483
7484 pub async fn disable_drain_mode(&self) -> Result<()> {
7486 sqlx::query(
7487 r#"
7488 INSERT INTO celers_queue_config (queue_name, config_key, config_value, updated_at)
7489 VALUES (?, 'drain_mode', 'false', NOW())
7490 ON DUPLICATE KEY UPDATE config_value = 'false', updated_at = NOW()
7491 "#,
7492 )
7493 .bind(&self.queue_name)
7494 .execute(&self.pool)
7495 .await
7496 .map_err(|e| CelersError::Other(format!("Failed to disable drain mode: {}", e)))?;
7497
7498 Ok(())
7499 }
7500
7501 pub async fn is_drain_mode(&self) -> Result<bool> {
7503 let row: Option<(String,)> = sqlx::query_as(
7504 r#"
7505 SELECT config_value
7506 FROM celers_queue_config
7507 WHERE queue_name = ? AND config_key = 'drain_mode'
7508 "#,
7509 )
7510 .bind(&self.queue_name)
7511 .fetch_optional(&self.pool)
7512 .await
7513 .map_err(|e| CelersError::Other(format!("Failed to check drain mode: {}", e)))?;
7514
7515 Ok(row.map(|(val,)| val == "true").unwrap_or(false))
7516 }
7517
7518 pub async fn register_worker(
7542 &self,
7543 worker_id: &str,
7544 status: WorkerStatus,
7545 capabilities: Option<serde_json::Value>,
7546 ) -> Result<()> {
7547 let capabilities_json = capabilities
7548 .as_ref()
7549 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
7550 .unwrap_or_else(|| "null".to_string());
7551
7552 sqlx::query(
7553 r#"
7554 INSERT INTO celers_worker_heartbeat
7555 (worker_id, queue_name, last_heartbeat, status, capabilities, task_count, updated_at)
7556 VALUES (?, ?, NOW(), ?, ?, 0, NOW())
7557 ON DUPLICATE KEY UPDATE
7558 last_heartbeat = NOW(),
7559 status = VALUES(status),
7560 capabilities = VALUES(capabilities),
7561 updated_at = NOW()
7562 "#,
7563 )
7564 .bind(worker_id)
7565 .bind(&self.queue_name)
7566 .bind(status.to_string())
7567 .bind(capabilities_json)
7568 .execute(&self.pool)
7569 .await
7570 .map_err(|e| {
7571 CelersError::Other(format!("Failed to register worker: {}", e))
7572 })?;
7573
7574 Ok(())
7575 }
7576
7577 pub async fn update_worker_heartbeat(
7579 &self,
7580 worker_id: &str,
7581 status: WorkerStatus,
7582 ) -> Result<()> {
7583 let rows_affected = sqlx::query(
7584 r#"
7585 UPDATE celers_worker_heartbeat
7586 SET last_heartbeat = NOW(), status = ?, updated_at = NOW()
7587 WHERE worker_id = ? AND queue_name = ?
7588 "#,
7589 )
7590 .bind(status.to_string())
7591 .bind(worker_id)
7592 .bind(&self.queue_name)
7593 .execute(&self.pool)
7594 .await
7595 .map_err(|e| CelersError::Other(format!("Failed to update worker heartbeat: {}", e)))?
7596 .rows_affected();
7597
7598 if rows_affected == 0 {
7599 return Err(CelersError::Other(format!(
7600 "Worker {} not found",
7601 worker_id
7602 )));
7603 }
7604
7605 Ok(())
7606 }
7607
7608 pub async fn get_all_worker_heartbeats(
7632 &self,
7633 stale_threshold_secs: i64,
7634 ) -> Result<Vec<WorkerHeartbeat>> {
7635 let rows: Vec<(String, DateTime<Utc>, String, i64, String)> = sqlx::query_as(
7636 r#"
7637 SELECT
7638 worker_id,
7639 last_heartbeat,
7640 CASE
7641 WHEN TIMESTAMPDIFF(SECOND, last_heartbeat, NOW()) > ? THEN 'offline'
7642 ELSE status
7643 END as status,
7644 task_count,
7645 COALESCE(capabilities, 'null') as capabilities
7646 FROM celers_worker_heartbeat
7647 WHERE queue_name = ?
7648 ORDER BY last_heartbeat DESC
7649 "#,
7650 )
7651 .bind(stale_threshold_secs)
7652 .bind(&self.queue_name)
7653 .fetch_all(&self.pool)
7654 .await
7655 .map_err(|e| CelersError::Other(format!("Failed to fetch worker heartbeats: {}", e)))?;
7656
7657 rows.into_iter()
7658 .map(
7659 |(worker_id, last_heartbeat, status, task_count, capabilities)| {
7660 let status = match status.as_str() {
7661 "active" => WorkerStatus::Active,
7662 "idle" => WorkerStatus::Idle,
7663 "busy" => WorkerStatus::Busy,
7664 _ => WorkerStatus::Offline,
7665 };
7666 let capabilities = serde_json::from_str(&capabilities).ok();
7667 Ok(WorkerHeartbeat {
7668 worker_id,
7669 last_heartbeat,
7670 status,
7671 task_count,
7672 capabilities,
7673 })
7674 },
7675 )
7676 .collect()
7677 }
7678
7679 pub async fn enqueue_group(
7715 &self,
7716 group_id: &str,
7717 tasks: Vec<SerializedTask>,
7718 metadata: Option<serde_json::Value>,
7719 ) -> Result<Vec<TaskId>> {
7720 if tasks.is_empty() {
7721 return Ok(Vec::new());
7722 }
7723
7724 let mut tx = self
7725 .pool
7726 .begin()
7727 .await
7728 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
7729
7730 let mut task_ids = Vec::new();
7731
7732 for task in tasks {
7733 let task_id = Uuid::new_v4();
7734
7735 sqlx::query(
7736 r#"
7737 INSERT INTO celers_tasks
7738 (id, task_name, payload, state, priority, retry_count, max_retries, created_at, scheduled_at, metadata)
7739 VALUES (?, ?, ?, 'pending', ?, 0, ?, NOW(), NOW(), ?)
7740 "#,
7741 )
7742 .bind(task_id.to_string())
7743 .bind(&task.metadata.name)
7744 .bind(&task.payload)
7745 .bind(task.metadata.priority)
7746 .bind(task.metadata.max_retries as i32)
7747 .bind(serde_json::to_string(&json!({"group_id": group_id})).unwrap_or_else(|_| "{}".to_string()))
7748 .execute(&mut *tx)
7749 .await
7750 .map_err(|e| {
7751 CelersError::Other(format!("Failed to insert task: {}", e))
7752 })?;
7753
7754 task_ids.push(task_id);
7755 }
7756
7757 let metadata_json = metadata
7759 .as_ref()
7760 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
7761 .unwrap_or_else(|| "null".to_string());
7762
7763 sqlx::query(
7764 r#"
7765 INSERT INTO celers_task_groups
7766 (group_id, queue_name, task_count, created_at, metadata)
7767 VALUES (?, ?, ?, NOW(), ?)
7768 "#,
7769 )
7770 .bind(group_id)
7771 .bind(&self.queue_name)
7772 .bind(task_ids.len() as i64)
7773 .bind(metadata_json)
7774 .execute(&mut *tx)
7775 .await
7776 .map_err(|e| CelersError::Other(format!("Failed to insert task group: {}", e)))?;
7777
7778 tx.commit()
7779 .await
7780 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
7781
7782 Ok(task_ids)
7783 }
7784
7785 pub async fn get_group_status(&self, group_id: &str) -> Result<TaskGroupStatus> {
7808 let row: (i64, i64, i64, i64, i64, i64) = sqlx::query_as(
7809 r#"
7810 SELECT
7811 COUNT(*) as total_tasks,
7812 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending_tasks,
7813 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing_tasks,
7814 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed_tasks,
7815 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed_tasks,
7816 SUM(CASE WHEN state = 'cancelled' THEN 1 ELSE 0 END) as cancelled_tasks
7817 FROM celers_tasks
7818 WHERE JSON_UNQUOTE(JSON_EXTRACT(metadata, '$.group_id')) = ?
7819 "#,
7820 )
7821 .bind(group_id)
7822 .fetch_one(&self.pool)
7823 .await
7824 .map_err(|e| CelersError::Other(format!("Failed to fetch group status: {}", e)))?;
7825
7826 Ok(TaskGroupStatus {
7827 group_id: group_id.to_string(),
7828 total_tasks: row.0,
7829 pending_tasks: row.1,
7830 processing_tasks: row.2,
7831 completed_tasks: row.3,
7832 failed_tasks: row.4,
7833 cancelled_tasks: row.5,
7834 })
7835 }
7836
7837 pub async fn check_connection_health(&self) -> Result<bool> {
7864 let start = std::time::Instant::now();
7866 let conn_result =
7867 tokio::time::timeout(std::time::Duration::from_secs(5), self.pool.acquire()).await;
7868
7869 match conn_result {
7870 Err(_) => {
7871 tracing::error!("Connection pool timeout: failed to acquire connection within 5s");
7872 Err(CelersError::Other(
7873 "Connection pool exhausted: timeout acquiring connection".to_string(),
7874 ))
7875 }
7876 Ok(Err(e)) => {
7877 tracing::error!(error = %e, "Connection pool error");
7878 Err(CelersError::Other(format!("Connection pool error: {}", e)))
7879 }
7880 Ok(Ok(mut conn)) => {
7881 let acquire_time = start.elapsed();
7882
7883 if acquire_time > std::time::Duration::from_secs(1) {
7885 tracing::warn!(
7886 acquire_time_ms = acquire_time.as_millis(),
7887 "Slow connection acquisition indicates pool pressure"
7888 );
7889 }
7890
7891 let query_start = std::time::Instant::now();
7893 let result = sqlx::query_scalar::<_, i64>("SELECT 1")
7894 .fetch_one(&mut *conn)
7895 .await;
7896
7897 match result {
7898 Err(e) => {
7899 tracing::error!(error = %e, "Database health check query failed");
7900 Err(CelersError::Other(format!("Database unresponsive: {}", e)))
7901 }
7902 Ok(_) => {
7903 let query_time = query_start.elapsed();
7904
7905 if query_time > std::time::Duration::from_millis(100) {
7907 tracing::warn!(
7908 query_time_ms = query_time.as_millis(),
7909 "Slow database response indicates potential issues"
7910 );
7911 return Ok(false); }
7913
7914 let pool_size = self.pool.size();
7916 let idle_conns = self.pool.num_idle() as u32;
7917 let active_conns = pool_size.saturating_sub(idle_conns);
7918 let utilization = (active_conns as f64 / pool_size as f64) * 100.0;
7919
7920 if utilization > 90.0 {
7921 tracing::warn!(
7922 utilization_percent = utilization,
7923 pool_size = pool_size,
7924 active = active_conns,
7925 idle = idle_conns,
7926 "High connection pool utilization"
7927 );
7928 return Ok(false); }
7930
7931 tracing::debug!(
7932 acquire_time_ms = acquire_time.as_millis(),
7933 query_time_ms = query_time.as_millis(),
7934 utilization_percent = utilization,
7935 "Connection pool health check passed"
7936 );
7937
7938 Ok(true) }
7940 }
7941 }
7942 }
7943 }
7944
7945 pub async fn replay_dlq_batch(
7978 &self,
7979 task_name_filter: Option<&str>,
7980 min_retry_count: Option<i32>,
7981 limit: i64,
7982 ) -> Result<u64> {
7983 let mut query = String::from("SELECT id FROM celers_dead_letter_queue WHERE 1=1");
7984
7985 if task_name_filter.is_some() {
7986 query.push_str(" AND task_name LIKE ?");
7987 }
7988
7989 if min_retry_count.is_some() {
7990 query.push_str(" AND retry_count >= ?");
7991 }
7992
7993 query.push_str(" ORDER BY failed_at ASC LIMIT ?");
7994
7995 let mut q = sqlx::query_scalar::<_, String>(&query);
7996
7997 if let Some(filter) = task_name_filter {
7998 q = q.bind(format!("%{}%", filter));
7999 }
8000
8001 if let Some(min_retries) = min_retry_count {
8002 q = q.bind(min_retries);
8003 }
8004
8005 q = q.bind(limit);
8006
8007 let dlq_ids = q
8008 .fetch_all(&self.pool)
8009 .await
8010 .map_err(|e| CelersError::Other(format!("Failed to fetch DLQ IDs: {}", e)))?;
8011
8012 let mut replayed = 0u64;
8013 for dlq_id in dlq_ids {
8014 match Uuid::parse_str(&dlq_id) {
8015 Ok(id) => {
8016 if self.requeue_from_dlq(&id).await.is_ok() {
8017 replayed += 1;
8018 }
8019 }
8020 Err(e) => {
8021 tracing::warn!(dlq_id = %dlq_id, error = %e, "Failed to parse DLQ ID");
8022 }
8023 }
8024 }
8025
8026 tracing::info!(
8027 replayed = replayed,
8028 task_filter = ?task_name_filter,
8029 min_retries = ?min_retry_count,
8030 "Batch replay from DLQ completed"
8031 );
8032
8033 Ok(replayed)
8034 }
8035
8036 pub async fn generate_load(
8078 &self,
8079 task_count: usize,
8080 task_name: &str,
8081 payload_size_bytes: usize,
8082 priority_range: Option<(i32, i32)>,
8083 ) -> Result<Vec<Uuid>> {
8084 use rand::Rng;
8085
8086 let mut tasks = Vec::with_capacity(task_count);
8087 let mut rng = rand::rng();
8088
8089 for _i in 0..task_count {
8090 let payload: Vec<u8> = (0..payload_size_bytes)
8092 .map(|_| rng.random::<u8>())
8093 .collect();
8094
8095 let mut task = SerializedTask::new(task_name.to_string(), payload);
8096
8097 if let Some((min_prio, max_prio)) = priority_range {
8099 task.metadata.priority = rng.random_range(min_prio..=max_prio);
8100 }
8101
8102 tasks.push(task);
8103 }
8104
8105 let task_ids = self.enqueue_batch(tasks).await?;
8106
8107 tracing::info!(
8108 count = task_count,
8109 task_name = task_name,
8110 payload_size = payload_size_bytes,
8111 "Generated synthetic load"
8112 );
8113
8114 Ok(task_ids)
8115 }
8116
8117 pub async fn verify_migrations(&self) -> Result<MigrationVerification> {
8147 let table_exists = sqlx::query_scalar::<_, i64>(
8149 "SELECT COUNT(*) FROM information_schema.tables
8150 WHERE table_schema = DATABASE()
8151 AND table_name = 'celers_migrations'",
8152 )
8153 .fetch_one(&self.pool)
8154 .await
8155 .map_err(|e| CelersError::Other(format!("Failed to check migrations table: {}", e)))?;
8156
8157 if table_exists == 0 {
8158 return Ok(MigrationVerification {
8159 is_complete: false,
8160 applied_count: 0,
8161 missing_count: 8, applied_migrations: vec![],
8163 missing_migrations: vec![
8164 "001_init.sql".to_string(),
8165 "002_results.sql".to_string(),
8166 "003_performance_indexes.sql".to_string(),
8167 "004_partitioning_guide.sql".to_string(),
8168 "005_uuid_optimization.sql".to_string(),
8169 "006_idempotency.sql".to_string(),
8170 "007_workflow.sql".to_string(),
8171 "008_production_features.sql".to_string(),
8172 ],
8173 schema_valid: false,
8174 });
8175 }
8176
8177 let applied: Vec<String> =
8179 sqlx::query_scalar("SELECT version FROM celers_migrations ORDER BY version")
8180 .fetch_all(&self.pool)
8181 .await
8182 .map_err(|e| {
8183 CelersError::Other(format!("Failed to fetch applied migrations: {}", e))
8184 })?;
8185
8186 let expected = [
8187 "001_init.sql",
8188 "002_results.sql",
8189 "003_performance_indexes.sql",
8190 "006_idempotency.sql",
8191 "008_production_features.sql",
8192 ];
8193
8194 let missing: Vec<String> = expected
8195 .iter()
8196 .filter(|&v| !applied.contains(&v.to_string()))
8197 .map(|s| s.to_string())
8198 .collect();
8199
8200 let core_tables = vec![
8202 "celers_tasks",
8203 "celers_dead_letter_queue",
8204 "celers_task_results",
8205 "celers_task_idempotency",
8206 "celers_queue_config",
8207 "celers_worker_heartbeat",
8208 "celers_task_groups",
8209 ];
8210
8211 let mut schema_valid = true;
8212 for table in &core_tables {
8213 let exists = sqlx::query_scalar::<_, i64>(
8214 "SELECT COUNT(*) FROM information_schema.tables
8215 WHERE table_schema = DATABASE() AND table_name = ?",
8216 )
8217 .bind(table)
8218 .fetch_one(&self.pool)
8219 .await
8220 .map_err(|e| CelersError::Other(format!("Failed to check table {}: {}", table, e)))?;
8221
8222 if exists == 0 {
8223 schema_valid = false;
8224 tracing::warn!(table = table, "Core table missing");
8225 }
8226 }
8227
8228 Ok(MigrationVerification {
8229 is_complete: missing.is_empty() && schema_valid,
8230 applied_count: applied.len(),
8231 missing_count: missing.len(),
8232 applied_migrations: applied,
8233 missing_migrations: missing,
8234 schema_valid,
8235 })
8236 }
8237
8238 pub async fn profile_query_performance(
8270 &self,
8271 min_execution_time_ms: f64,
8272 limit: i64,
8273 ) -> Result<Vec<QueryPerformanceProfile>> {
8274 let rows = sqlx::query(
8275 "SELECT
8276 DIGEST_TEXT as query_digest,
8277 COUNT_STAR as execution_count,
8278 AVG_TIMER_WAIT / 1000000000000 as avg_execution_time_ms,
8279 SUM_ROWS_EXAMINED as total_rows_examined,
8280 SUM_ROWS_SENT as total_rows_sent,
8281 SUM_NO_INDEX_USED as no_index_used_count,
8282 SUM_NO_GOOD_INDEX_USED as no_good_index_used_count
8283 FROM performance_schema.events_statements_summary_by_digest
8284 WHERE DIGEST_TEXT IS NOT NULL
8285 AND SCHEMA_NAME = DATABASE()
8286 AND AVG_TIMER_WAIT / 1000000000000 >= ?
8287 ORDER BY AVG_TIMER_WAIT DESC
8288 LIMIT ?",
8289 )
8290 .bind(min_execution_time_ms)
8291 .bind(limit)
8292 .fetch_all(&self.pool)
8293 .await;
8294
8295 let rows = match rows {
8296 Ok(r) => r,
8297 Err(e) => {
8298 tracing::warn!(error = %e, "Failed to query performance_schema");
8300 return Ok(vec![]);
8301 }
8302 };
8303
8304 let mut profiles = Vec::new();
8305 for row in rows {
8306 let query_digest: String = row.try_get("query_digest").unwrap_or_default();
8307 let execution_count: i64 = row.try_get("execution_count").unwrap_or(0);
8308 let avg_time: rust_decimal::Decimal =
8309 row.try_get("avg_execution_time_ms").unwrap_or_default();
8310 let rows_examined: i64 = row.try_get("total_rows_examined").unwrap_or(0);
8311 let rows_sent: i64 = row.try_get("total_rows_sent").unwrap_or(0);
8312 let no_index: i64 = row.try_get("no_index_used_count").unwrap_or(0);
8313 let no_good_index: i64 = row.try_get("no_good_index_used_count").unwrap_or(0);
8314
8315 if query_digest.is_empty() {
8316 continue; }
8318
8319 profiles.push(QueryPerformanceProfile {
8320 query_digest,
8321 execution_count,
8322 avg_execution_time_ms: avg_time.to_string().parse().unwrap_or(0.0),
8323 total_rows_examined: rows_examined,
8324 total_rows_sent: rows_sent,
8325 no_index_used_count: no_index,
8326 no_good_index_used_count: no_good_index,
8327 needs_optimization: no_index > 0 || no_good_index > 0,
8328 });
8329 }
8330
8331 Ok(profiles)
8332 }
8333
8334 #[allow(dead_code)]
8370 pub async fn ack_batch_with_results(
8371 &self,
8372 tasks_with_results: &[(TaskId, Option<String>, BatchResultInput)],
8373 ) -> Result<()> {
8374 if tasks_with_results.is_empty() {
8375 return Ok(());
8376 }
8377
8378 let mut tx = self
8379 .pool
8380 .begin()
8381 .await
8382 .map_err(|e| CelersError::Other(format!("Failed to begin transaction: {}", e)))?;
8383
8384 for (task_id, _receipt_handle, _) in tasks_with_results {
8386 sqlx::query(
8387 "UPDATE celers_tasks
8388 SET state = 'completed', completed_at = NOW()
8389 WHERE id = ? AND state = 'processing'",
8390 )
8391 .bind(task_id.to_string())
8392 .execute(&mut *tx)
8393 .await
8394 .map_err(|e| {
8395 CelersError::Other(format!("Failed to acknowledge task {}: {}", task_id, e))
8396 })?;
8397 }
8398
8399 for (_, _, result) in tasks_with_results {
8401 let result_json = result
8402 .result
8403 .as_ref()
8404 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
8405 .unwrap_or_else(|| "null".to_string());
8406
8407 sqlx::query(
8408 r#"
8409 INSERT INTO celers_task_results
8410 (task_id, task_name, status, result, error, traceback, runtime_ms, created_at, completed_at)
8411 VALUES (?, ?, ?, ?, ?, ?, ?, NOW(), NOW())
8412 ON DUPLICATE KEY UPDATE
8413 task_name = VALUES(task_name),
8414 status = VALUES(status),
8415 result = VALUES(result),
8416 error = VALUES(error),
8417 traceback = VALUES(traceback),
8418 runtime_ms = VALUES(runtime_ms),
8419 completed_at = NOW()
8420 "#,
8421 )
8422 .bind(result.task_id.to_string())
8423 .bind(&result.task_name)
8424 .bind(result.status.to_string())
8425 .bind(result_json)
8426 .bind(&result.error)
8427 .bind(&result.traceback)
8428 .bind(result.runtime_ms)
8429 .execute(&mut *tx)
8430 .await
8431 .map_err(|e| {
8432 CelersError::Other(format!("Failed to store result for task {}: {}", result.task_id, e))
8433 })?;
8434 }
8435
8436 tx.commit()
8437 .await
8438 .map_err(|e| CelersError::Other(format!("Failed to commit transaction: {}", e)))?;
8439
8440 tracing::info!(
8441 count = tasks_with_results.len(),
8442 "Acknowledged tasks with results"
8443 );
8444 Ok(())
8445 }
8446
8447 #[allow(dead_code)]
8465 pub async fn warmup_connection_pool(&self) -> Result<()> {
8466 let pool_options = self.pool.options();
8468 let min_connections = pool_options.get_min_connections();
8469
8470 tracing::info!(
8471 min_connections = min_connections,
8472 "Warming up connection pool"
8473 );
8474
8475 for i in 0..min_connections {
8477 let _ = sqlx::query("SELECT 1")
8478 .fetch_one(&self.pool)
8479 .await
8480 .map_err(|e| {
8481 CelersError::Other(format!("Failed to warm up connection {}: {}", i, e))
8482 })?;
8483 }
8484
8485 tracing::info!("Connection pool warmup complete");
8486 Ok(())
8487 }
8488
8489 #[allow(dead_code)]
8511 pub async fn get_task_latency_stats(&self) -> Result<TaskLatencyStats> {
8512 let row = sqlx::query(
8513 "SELECT
8514 COUNT(*) as task_count,
8515 MIN(TIMESTAMPDIFF(SECOND, created_at, started_at)) as min_latency,
8516 MAX(TIMESTAMPDIFF(SECOND, created_at, started_at)) as max_latency,
8517 AVG(TIMESTAMPDIFF(SECOND, created_at, started_at)) as avg_latency,
8518 STDDEV(TIMESTAMPDIFF(SECOND, created_at, started_at)) as stddev_latency
8519 FROM celers_tasks
8520 WHERE state IN ('processing', 'completed')
8521 AND started_at IS NOT NULL",
8522 )
8523 .fetch_one(&self.pool)
8524 .await
8525 .map_err(|e| CelersError::Other(format!("Failed to get task latency stats: {}", e)))?;
8526
8527 let task_count: i64 = row.try_get("task_count").unwrap_or(0);
8528 let min_latency: Option<i64> = row.try_get("min_latency").ok();
8529 let max_latency: Option<i64> = row.try_get("max_latency").ok();
8530 let avg_latency: Option<rust_decimal::Decimal> = row.try_get("avg_latency").ok();
8531 let stddev_latency: Option<rust_decimal::Decimal> = row.try_get("stddev_latency").ok();
8532
8533 Ok(TaskLatencyStats {
8534 task_count,
8535 min_latency_secs: min_latency.unwrap_or(0) as f64,
8536 max_latency_secs: max_latency.unwrap_or(0) as f64,
8537 avg_latency_secs: avg_latency
8538 .map(|d| d.to_string().parse().unwrap_or(0.0))
8539 .unwrap_or(0.0),
8540 stddev_latency_secs: stddev_latency
8541 .map(|d| d.to_string().parse().unwrap_or(0.0))
8542 .unwrap_or(0.0),
8543 })
8544 }
8545
8546 #[allow(dead_code)]
8570 pub async fn get_priority_queue_stats(&self) -> Result<Vec<PriorityQueueStats>> {
8571 let rows = sqlx::query(
8572 "SELECT
8573 priority,
8574 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending_count,
8575 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing_count,
8576 SUM(CASE WHEN state = 'completed' THEN 1 ELSE 0 END) as completed_count,
8577 SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) as failed_count,
8578 AVG(CASE WHEN started_at IS NOT NULL
8579 THEN TIMESTAMPDIFF(SECOND, created_at, started_at)
8580 ELSE NULL END) as avg_wait_time_secs
8581 FROM celers_tasks
8582 GROUP BY priority
8583 ORDER BY priority DESC",
8584 )
8585 .fetch_all(&self.pool)
8586 .await
8587 .map_err(|e| CelersError::Other(format!("Failed to get priority queue stats: {}", e)))?;
8588
8589 let mut stats = Vec::new();
8590 for row in rows {
8591 let priority: i32 = row.try_get("priority").unwrap_or(0);
8592 let pending_count: i64 = row.try_get("pending_count").unwrap_or(0);
8593 let processing_count: i64 = row.try_get("processing_count").unwrap_or(0);
8594 let completed_count: i64 = row.try_get("completed_count").unwrap_or(0);
8595 let failed_count: i64 = row.try_get("failed_count").unwrap_or(0);
8596 let avg_wait_time: Option<rust_decimal::Decimal> =
8597 row.try_get("avg_wait_time_secs").ok();
8598
8599 stats.push(PriorityQueueStats {
8600 priority,
8601 pending_count,
8602 processing_count,
8603 completed_count,
8604 failed_count,
8605 avg_wait_time_secs: avg_wait_time
8606 .map(|d| d.to_string().parse().unwrap_or(0.0))
8607 .unwrap_or(0.0),
8608 });
8609 }
8610
8611 Ok(stats)
8612 }
8613
8614 #[allow(dead_code)]
8636 pub async fn get_task_execution_stats(&self) -> Result<TaskExecutionStats> {
8637 let row = sqlx::query(
8638 "SELECT
8639 COUNT(*) as task_count,
8640 MIN(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as min_execution,
8641 MAX(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as max_execution,
8642 AVG(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as avg_execution,
8643 STDDEV(TIMESTAMPDIFF(SECOND, started_at, completed_at)) as stddev_execution
8644 FROM celers_tasks
8645 WHERE state = 'completed'
8646 AND started_at IS NOT NULL
8647 AND completed_at IS NOT NULL",
8648 )
8649 .fetch_one(&self.pool)
8650 .await
8651 .map_err(|e| CelersError::Other(format!("Failed to get task execution stats: {}", e)))?;
8652
8653 let task_count: i64 = row.try_get("task_count").unwrap_or(0);
8654 let min_execution: Option<i64> = row.try_get("min_execution").ok();
8655 let max_execution: Option<i64> = row.try_get("max_execution").ok();
8656 let avg_execution: Option<rust_decimal::Decimal> = row.try_get("avg_execution").ok();
8657 let stddev_execution: Option<rust_decimal::Decimal> = row.try_get("stddev_execution").ok();
8658
8659 let p95_row = sqlx::query(
8661 "SELECT TIMESTAMPDIFF(SECOND, started_at, completed_at) as execution_time
8662 FROM celers_tasks
8663 WHERE state = 'completed'
8664 AND started_at IS NOT NULL
8665 AND completed_at IS NOT NULL
8666 ORDER BY execution_time DESC
8667 LIMIT 1 OFFSET ?",
8668 )
8669 .bind((task_count as f64 * 0.05).ceil() as i64)
8670 .fetch_optional(&self.pool)
8671 .await
8672 .map_err(|e| CelersError::Other(format!("Failed to calculate P95: {}", e)))?;
8673
8674 let p95_execution = p95_row
8675 .and_then(|r| r.try_get::<i64, _>("execution_time").ok())
8676 .unwrap_or(0);
8677
8678 Ok(TaskExecutionStats {
8679 task_count,
8680 min_execution_secs: min_execution.unwrap_or(0) as f64,
8681 max_execution_secs: max_execution.unwrap_or(0) as f64,
8682 avg_execution_secs: avg_execution
8683 .map(|d| d.to_string().parse().unwrap_or(0.0))
8684 .unwrap_or(0.0),
8685 stddev_execution_secs: stddev_execution
8686 .map(|d| d.to_string().parse().unwrap_or(0.0))
8687 .unwrap_or(0.0),
8688 p95_execution_secs: p95_execution as f64,
8689 })
8690 }
8691
8692 #[allow(dead_code)]
8720 pub async fn get_queue_saturation(&self, capacity_threshold: i64) -> Result<QueueSaturation> {
8721 if capacity_threshold <= 0 {
8722 return Err(CelersError::Other(
8723 "Capacity threshold must be positive".to_string(),
8724 ));
8725 }
8726
8727 let row = sqlx::query(
8728 "SELECT
8729 SUM(CASE WHEN state = 'pending' THEN 1 ELSE 0 END) as pending_count,
8730 SUM(CASE WHEN state = 'processing' THEN 1 ELSE 0 END) as processing_count,
8731 COUNT(*) as total_tasks
8732 FROM celers_tasks",
8733 )
8734 .fetch_one(&self.pool)
8735 .await
8736 .map_err(|e| CelersError::Other(format!("Failed to get queue saturation: {}", e)))?;
8737
8738 let pending_count: i64 = row.try_get("pending_count").unwrap_or(0);
8739 let processing_count: i64 = row.try_get("processing_count").unwrap_or(0);
8740 let total_tasks: i64 = row.try_get("total_tasks").unwrap_or(0);
8741
8742 let utilization_percent =
8743 (pending_count as f64 / capacity_threshold as f64 * 100.0).min(100.0);
8744 let is_saturated = pending_count >= (capacity_threshold as f64 * 0.8) as i64; let is_critical = pending_count >= (capacity_threshold as f64 * 0.95) as i64; let status = if is_critical {
8748 "critical".to_string()
8749 } else if is_saturated {
8750 "warning".to_string()
8751 } else {
8752 "healthy".to_string()
8753 };
8754
8755 Ok(QueueSaturation {
8756 pending_count,
8757 processing_count,
8758 total_tasks,
8759 capacity_threshold,
8760 utilization_percent,
8761 is_saturated,
8762 is_critical,
8763 status,
8764 })
8765 }
8766
8767 #[allow(dead_code)]
8790 pub async fn get_task_latency_percentiles(&self) -> Result<TaskLatencyPercentiles> {
8791 let count_row = sqlx::query(
8793 "SELECT COUNT(*) as task_count
8794 FROM celers_tasks
8795 WHERE state IN ('processing', 'completed')
8796 AND started_at IS NOT NULL",
8797 )
8798 .fetch_one(&self.pool)
8799 .await
8800 .map_err(|e| CelersError::Other(format!("Failed to count tasks: {}", e)))?;
8801
8802 let task_count: i64 = count_row.try_get("task_count").unwrap_or(0);
8803
8804 if task_count == 0 {
8805 return Ok(TaskLatencyPercentiles {
8806 task_count: 0,
8807 p50_latency_secs: 0.0,
8808 p95_latency_secs: 0.0,
8809 p99_latency_secs: 0.0,
8810 });
8811 }
8812
8813 let p50_offset = (task_count as f64 * 0.5) as i64;
8815 let p50_row = sqlx::query(
8816 "SELECT TIMESTAMPDIFF(SECOND, created_at, started_at) as latency
8817 FROM celers_tasks
8818 WHERE state IN ('processing', 'completed')
8819 AND started_at IS NOT NULL
8820 ORDER BY latency
8821 LIMIT 1 OFFSET ?",
8822 )
8823 .bind(p50_offset)
8824 .fetch_optional(&self.pool)
8825 .await
8826 .map_err(|e| CelersError::Other(format!("Failed to calculate P50: {}", e)))?;
8827
8828 let p50_latency = p50_row
8829 .and_then(|r| r.try_get::<i64, _>("latency").ok())
8830 .unwrap_or(0);
8831
8832 let p95_offset = (task_count as f64 * 0.95) as i64;
8834 let p95_row = sqlx::query(
8835 "SELECT TIMESTAMPDIFF(SECOND, created_at, started_at) as latency
8836 FROM celers_tasks
8837 WHERE state IN ('processing', 'completed')
8838 AND started_at IS NOT NULL
8839 ORDER BY latency
8840 LIMIT 1 OFFSET ?",
8841 )
8842 .bind(p95_offset)
8843 .fetch_optional(&self.pool)
8844 .await
8845 .map_err(|e| CelersError::Other(format!("Failed to calculate P95: {}", e)))?;
8846
8847 let p95_latency = p95_row
8848 .and_then(|r| r.try_get::<i64, _>("latency").ok())
8849 .unwrap_or(0);
8850
8851 let p99_offset = (task_count as f64 * 0.99) as i64;
8853 let p99_row = sqlx::query(
8854 "SELECT TIMESTAMPDIFF(SECOND, created_at, started_at) as latency
8855 FROM celers_tasks
8856 WHERE state IN ('processing', 'completed')
8857 AND started_at IS NOT NULL
8858 ORDER BY latency
8859 LIMIT 1 OFFSET ?",
8860 )
8861 .bind(p99_offset)
8862 .fetch_optional(&self.pool)
8863 .await
8864 .map_err(|e| CelersError::Other(format!("Failed to calculate P99: {}", e)))?;
8865
8866 let p99_latency = p99_row
8867 .and_then(|r| r.try_get::<i64, _>("latency").ok())
8868 .unwrap_or(0);
8869
8870 Ok(TaskLatencyPercentiles {
8871 task_count,
8872 p50_latency_secs: p50_latency as f64,
8873 p95_latency_secs: p95_latency as f64,
8874 p99_latency_secs: p99_latency as f64,
8875 })
8876 }
8877
8878 #[allow(dead_code)]
8908 pub async fn get_task_state_transitions(
8909 &self,
8910 task_id: &TaskId,
8911 ) -> Result<Vec<TaskStateTransition>> {
8912 let row = sqlx::query(
8915 "SELECT state, created_at, started_at, completed_at
8916 FROM celers_tasks
8917 WHERE id = ?",
8918 )
8919 .bind(task_id.to_string())
8920 .fetch_optional(&self.pool)
8921 .await
8922 .map_err(|e| CelersError::Other(format!("Failed to get task state transitions: {}", e)))?;
8923
8924 let Some(row) = row else {
8925 return Ok(vec![]);
8926 };
8927
8928 let current_state: String = row.try_get("state").unwrap_or_default();
8929 let created_at: chrono::DateTime<Utc> =
8930 row.try_get("created_at").unwrap_or_else(|_| Utc::now());
8931 let started_at: Option<chrono::DateTime<Utc>> = row.try_get("started_at").ok();
8932 let completed_at: Option<chrono::DateTime<Utc>> = row.try_get("completed_at").ok();
8933
8934 let mut transitions = Vec::new();
8935
8936 transitions.push(TaskStateTransition {
8938 task_id: *task_id,
8939 from_state: None,
8940 to_state: "pending".to_string(),
8941 transitioned_at: created_at,
8942 });
8943
8944 if let Some(started) = started_at {
8945 transitions.push(TaskStateTransition {
8946 task_id: *task_id,
8947 from_state: Some("pending".to_string()),
8948 to_state: "processing".to_string(),
8949 transitioned_at: started,
8950 });
8951 }
8952
8953 if let Some(completed) = completed_at {
8954 transitions.push(TaskStateTransition {
8955 task_id: *task_id,
8956 from_state: Some("processing".to_string()),
8957 to_state: current_state,
8958 transitioned_at: completed,
8959 });
8960 }
8961
8962 Ok(transitions)
8963 }
8964}
8965
/// Execution-time statistics for completed tasks, measured from `started_at`
/// to `completed_at`.
///
/// Returned by `get_task_execution_stats`. Durations are whole-second values
/// converted to `f64`, and are `0.0` when there is no data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionStats {
    /// Number of completed tasks that have both `started_at` and `completed_at`.
    pub task_count: i64,
    /// Shortest execution time, in seconds.
    pub min_execution_secs: f64,
    /// Longest execution time, in seconds.
    pub max_execution_secs: f64,
    /// Mean execution time, in seconds.
    pub avg_execution_secs: f64,
    /// Standard deviation of execution time (SQL `STDDEV`), in seconds.
    pub stddev_execution_secs: f64,
    /// 95th-percentile execution time, in seconds.
    pub p95_execution_secs: f64,
}
8982
/// How full the queue is relative to a caller-supplied capacity threshold.
///
/// Returned by `get_queue_saturation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueSaturation {
    /// Tasks currently in state `pending`.
    pub pending_count: i64,
    /// Tasks currently in state `processing`.
    pub processing_count: i64,
    /// All rows in `celers_tasks`, in any state.
    pub total_tasks: i64,
    /// The capacity the pending count was measured against.
    pub capacity_threshold: i64,
    /// `pending_count` as a percentage of `capacity_threshold`, capped at 100.
    pub utilization_percent: f64,
    /// Pending count has reached the ~80% warning level.
    pub is_saturated: bool,
    /// Pending count has reached the ~95% critical level.
    pub is_critical: bool,
    /// One of `"healthy"`, `"warning"` or `"critical"`.
    pub status: String,
}
9003
/// Percentiles of queue latency (`created_at` to `started_at`), in seconds.
///
/// Returned by `get_task_latency_percentiles`; all values are `0.0` when
/// `task_count` is zero.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskLatencyPercentiles {
    /// Number of `processing`/`completed` tasks with a `started_at`.
    pub task_count: i64,
    /// Median latency, in seconds.
    pub p50_latency_secs: f64,
    /// 95th-percentile latency, in seconds.
    pub p95_latency_secs: f64,
    /// 99th-percentile latency, in seconds.
    pub p99_latency_secs: f64,
}
9016
/// One step in a task's state history, as reconstructed by
/// `get_task_state_transitions` from the task's timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskStateTransition {
    /// The task this transition belongs to.
    pub task_id: TaskId,
    /// State before the transition; `None` for the initial `pending` entry.
    pub from_state: Option<String>,
    /// State after the transition (the stored state string, e.g. `"pending"`).
    pub to_state: String,
    /// When the transition happened.
    pub transitioned_at: chrono::DateTime<Utc>,
}
9029
/// Summary statistics of queue latency (`created_at` to `started_at`).
///
/// Returned by `get_task_latency_stats`. Values are whole-second latencies
/// converted to `f64`, and are `0.0` when there is no data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskLatencyStats {
    /// Number of `processing`/`completed` tasks with a `started_at`.
    pub task_count: i64,
    /// Shortest latency, in seconds.
    pub min_latency_secs: f64,
    /// Longest latency, in seconds.
    pub max_latency_secs: f64,
    /// Mean latency, in seconds.
    pub avg_latency_secs: f64,
    /// Standard deviation of latency (SQL `STDDEV`), in seconds.
    pub stddev_latency_secs: f64,
}
9044
/// Task counts and average wait time for a single priority level.
///
/// `get_priority_queue_stats` returns one of these per distinct priority.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriorityQueueStats {
    /// The priority value these counts belong to.
    pub priority: i32,
    /// Tasks at this priority in state `pending`.
    pub pending_count: i64,
    /// Tasks at this priority in state `processing`.
    pub processing_count: i64,
    /// Tasks at this priority in state `completed`.
    pub completed_count: i64,
    /// Tasks at this priority in state `failed`.
    pub failed_count: i64,
    /// Mean `created_at` → `started_at` wait of started tasks, in seconds
    /// (`0.0` if none have started).
    pub avg_wait_time_secs: f64,
}
9061
9062#[derive(Debug, Clone, Serialize, Deserialize)]
9064pub struct MigrationVerification {
9065 pub is_complete: bool,
9067 pub applied_count: usize,
9069 pub missing_count: usize,
9071 pub applied_migrations: Vec<String>,
9073 pub missing_migrations: Vec<String>,
9075 pub schema_valid: bool,
9077}
9078
9079#[derive(Debug, Clone, Serialize, Deserialize)]
9081pub struct QueryPerformanceProfile {
9082 pub query_digest: String,
9084 pub execution_count: i64,
9086 pub avg_execution_time_ms: f64,
9088 pub total_rows_examined: i64,
9090 pub total_rows_sent: i64,
9092 pub no_index_used_count: i64,
9094 pub no_good_index_used_count: i64,
9096 pub needs_optimization: bool,
9098}
9099
#[cfg(test)]
mod tests {
    use super::*;

    /// Database URL shared by the MySQL integration tests.
    ///
    /// Reads the `MYSQL_URL` environment variable and falls back to a local
    /// `celers_test` database when it is unset.
    fn test_database_url() -> String {
        std::env::var("MYSQL_URL")
            .unwrap_or_else(|_| "mysql://root:password@localhost/celers_test".to_string())
    }

    #[test]
    fn test_db_task_state_display() {
        assert_eq!(DbTaskState::Pending.to_string(), "pending");
        assert_eq!(DbTaskState::Processing.to_string(), "processing");
        assert_eq!(DbTaskState::Completed.to_string(), "completed");
        assert_eq!(DbTaskState::Failed.to_string(), "failed");
        assert_eq!(DbTaskState::Cancelled.to_string(), "cancelled");
    }

    #[test]
    fn test_db_task_state_from_str() {
        assert_eq!(
            "pending".parse::<DbTaskState>().unwrap(),
            DbTaskState::Pending
        );
        assert_eq!(
            "processing".parse::<DbTaskState>().unwrap(),
            DbTaskState::Processing
        );
        assert_eq!(
            "completed".parse::<DbTaskState>().unwrap(),
            DbTaskState::Completed
        );
        assert_eq!(
            "failed".parse::<DbTaskState>().unwrap(),
            DbTaskState::Failed
        );
        assert_eq!(
            "cancelled".parse::<DbTaskState>().unwrap(),
            DbTaskState::Cancelled
        );
        // Parsing is case-insensitive.
        assert_eq!(
            "PENDING".parse::<DbTaskState>().unwrap(),
            DbTaskState::Pending
        );
        assert_eq!(
            "Completed".parse::<DbTaskState>().unwrap(),
            DbTaskState::Completed
        );
    }

    #[test]
    fn test_db_task_state_invalid() {
        assert!("invalid".parse::<DbTaskState>().is_err());
        assert!("".parse::<DbTaskState>().is_err());
    }

    #[test]
    fn test_queue_statistics_default() {
        let stats = QueueStatistics::default();
        assert_eq!(stats.pending, 0);
        assert_eq!(stats.processing, 0);
        assert_eq!(stats.completed, 0);
        assert_eq!(stats.failed, 0);
        assert_eq!(stats.cancelled, 0);
        assert_eq!(stats.dlq, 0);
        assert_eq!(stats.total, 0);
    }

    #[test]
    fn test_db_task_state_serialization() {
        let state = DbTaskState::Pending;
        let json = serde_json::to_string(&state).unwrap();
        assert_eq!(json, "\"pending\"");

        let deserialized: DbTaskState = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, state);
    }

    #[test]
    fn test_task_result_status_display() {
        assert_eq!(TaskResultStatus::Pending.to_string(), "PENDING");
        assert_eq!(TaskResultStatus::Started.to_string(), "STARTED");
        assert_eq!(TaskResultStatus::Success.to_string(), "SUCCESS");
        assert_eq!(TaskResultStatus::Failure.to_string(), "FAILURE");
        assert_eq!(TaskResultStatus::Retry.to_string(), "RETRY");
        assert_eq!(TaskResultStatus::Revoked.to_string(), "REVOKED");
    }

    #[test]
    fn test_task_result_status_from_str() {
        assert_eq!(
            "PENDING".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Pending
        );
        assert_eq!(
            "STARTED".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Started
        );
        assert_eq!(
            "SUCCESS".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Success
        );
        assert_eq!(
            "FAILURE".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Failure
        );
        assert_eq!(
            "RETRY".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Retry
        );
        assert_eq!(
            "REVOKED".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Revoked
        );
        // Lowercase and mixed-case input are accepted too.
        assert_eq!(
            "pending".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Pending
        );
        assert_eq!(
            "Success".parse::<TaskResultStatus>().unwrap(),
            TaskResultStatus::Success
        );
    }

    #[test]
    fn test_task_result_status_invalid() {
        assert!("invalid".parse::<TaskResultStatus>().is_err());
        assert!("".parse::<TaskResultStatus>().is_err());
    }

    #[test]
    fn test_task_result_status_serialization() {
        let status = TaskResultStatus::Success;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, "\"success\"");

        let deserialized: TaskResultStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, status);
    }

    // ---------------------------------------------------------------------
    // Integration tests: run with `cargo test -- --ignored` against a live
    // MySQL server. Several of them assert exact counts and therefore assume
    // a freshly migrated database with no leftover tasks.
    // ---------------------------------------------------------------------

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_mysql_broker_creation() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await;
        assert!(broker.is_ok());
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_mysql_broker_lifecycle() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let task = SerializedTask::new("test_task".to_string(), vec![1, 2, 3, 4]);
        let task_id = task.metadata.id;

        let returned_id = broker.enqueue(task.clone()).await.unwrap();
        assert_eq!(returned_id, task_id);

        let size = broker.queue_size().await.unwrap();
        assert!(size >= 1);

        let msg = broker
            .dequeue()
            .await
            .unwrap()
            .expect("an enqueued task should be dequeued");
        assert_eq!(msg.task.metadata.name, "test_task");

        broker
            .ack(&msg.task.metadata.id, msg.receipt_handle.as_deref())
            .await
            .unwrap();
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_mysql_queue_pause_resume() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        assert!(!broker.is_paused());

        broker.pause();
        assert!(broker.is_paused());

        let task = SerializedTask::new("pause_test".to_string(), vec![1, 2, 3]);
        broker.enqueue(task).await.unwrap();

        // A paused broker must not hand out work.
        let msg = broker.dequeue().await.unwrap();
        assert!(msg.is_none());

        broker.resume();
        assert!(!broker.is_paused());

        let msg = broker.dequeue().await.unwrap();
        assert!(msg.is_some());
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_mysql_statistics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let stats = broker.get_statistics().await.unwrap();
        assert!(stats.total >= 0);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_mysql_health_check() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();

        let health = broker.check_health().await.unwrap();
        assert!(health.healthy);
        assert!(!health.database_version.is_empty());
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_batch_operations() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let tasks: Vec<_> = (0..10)
            .map(|i| SerializedTask::new(format!("task_{}", i), vec![i as u8]))
            .collect();

        let task_ids = broker.enqueue_batch(tasks).await.unwrap();
        assert_eq!(task_ids.len(), 10);

        let messages = broker.dequeue_batch(5).await.unwrap();
        assert_eq!(messages.len(), 5);

        let ack_tasks: Vec<_> = messages
            .iter()
            .map(|m| (m.task.metadata.id, m.receipt_handle.clone()))
            .collect();
        broker.ack_batch(&ack_tasks).await.unwrap();

        // Exact count: assumes the queue was empty before this test.
        let remaining = broker.queue_size().await.unwrap();
        assert_eq!(remaining, 5);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_task_chain() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let chain = TaskChain::new()
            .then(SerializedTask::new("step1".to_string(), vec![1]))
            .then(SerializedTask::new("step2".to_string(), vec![2]))
            .then(SerializedTask::new("step3".to_string(), vec![3]))
            .with_delay(2);

        let task_ids = broker.enqueue_chain(chain).await.unwrap();
        assert_eq!(task_ids.len(), 3);

        // NOTE(review): expects at least the delayed follow-up steps to show up
        // as scheduled; confirm whether the first step is also delayed.
        let scheduled = broker.list_scheduled_tasks(10, 0).await.unwrap();
        assert!(scheduled.len() >= 2);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_connection_diagnostics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();

        let diag = broker.get_connection_diagnostics();
        assert!(diag.max_connections > 0);
        assert!(diag.pool_utilization_percent >= 0.0);
        assert!(diag.pool_utilization_percent <= 100.0);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_performance_metrics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let metrics = broker.get_performance_metrics().await.unwrap();
        assert!(metrics.queue_depth >= 0);
        assert!(metrics.processing_tasks >= 0);
        assert!(metrics.dlq_size >= 0);
        assert!(metrics.connection_pool.max_connections > 0);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_migration_tracking() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let migrations = broker.list_migrations().await.unwrap();
        assert!(migrations.len() >= 3);

        let versions: Vec<_> = migrations.iter().map(|m| m.version.as_str()).collect();
        assert!(versions.contains(&"001"));
        assert!(versions.contains(&"002"));
        assert!(versions.contains(&"003"));
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_is_ready() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();

        let ready = broker.is_ready().await;
        assert!(ready);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_concurrent_dequeue() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let tasks: Vec<_> = (0..20)
            .map(|i| SerializedTask::new(format!("concurrent_{}", i), vec![i as u8]))
            .collect();
        broker.enqueue_batch(tasks).await.unwrap();

        // Every worker gets its own broker instance so they contend for tasks
        // the way independent processes would.
        let mut handles = vec![];
        for worker_id in 0..5 {
            let db_url = database_url.clone();
            let handle = tokio::spawn(async move {
                let worker_broker = MysqlBroker::new(&db_url).await.unwrap();
                let mut dequeued = 0;

                for _ in 0..10 {
                    if let Ok(Some(msg)) = worker_broker.dequeue().await {
                        dequeued += 1;
                        let _ = worker_broker
                            .ack(&msg.task.metadata.id, msg.receipt_handle.as_deref())
                            .await;
                    }
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }

                (worker_id, dequeued)
            });
            handles.push(handle);
        }

        let results = futures::future::join_all(handles).await;

        // Propagate worker panics instead of silently dropping them, so a broken
        // worker is reported directly rather than as a confusing short count.
        let total_dequeued: usize = results
            .into_iter()
            .map(|r| r.expect("worker task panicked").1)
            .sum();

        // Exact count: assumes no other pending tasks were in the queue.
        assert_eq!(total_dequeued, 20);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_skip_locked_behavior() {
        let database_url = test_database_url();

        let broker1 = MysqlBroker::new(&database_url).await.unwrap();
        broker1.migrate().await.unwrap();

        for i in 0..10 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            broker1.enqueue(task).await.unwrap();
        }

        let broker2 = MysqlBroker::new(&database_url).await.unwrap();

        let (msg1, msg2) = tokio::join!(broker1.dequeue(), broker2.dequeue());

        let msg1 = msg1.expect("first concurrent dequeue failed");
        let msg2 = msg2.expect("second concurrent dequeue failed");

        // Two concurrent dequeues must never hand out the same task.
        if let (Some(m1), Some(m2)) = (msg1, msg2) {
            assert_ne!(m1.task.metadata.id, m2.task.metadata.id);
        }
    }

    #[test]
    fn test_pool_config_default() {
        let config = PoolConfig::default();
        assert_eq!(config.max_connections, 20);
        assert_eq!(config.min_connections, 2);
        assert_eq!(config.acquire_timeout_secs, 5);
        assert_eq!(config.max_lifetime_secs, Some(1800));
        assert_eq!(config.idle_timeout_secs, Some(600));
    }

    #[test]
    fn test_task_chain_builder() {
        let task1 = SerializedTask::new("task1".to_string(), vec![1]);
        let task2 = SerializedTask::new("task2".to_string(), vec![2]);

        let chain = TaskChain::new().then(task1).then(task2).with_delay(5);

        assert_eq!(chain.tasks().len(), 2);
        assert_eq!(chain.delay_between_secs(), Some(5));
    }

    #[test]
    fn test_worker_statistics_serialization() {
        let stats = WorkerStatistics {
            worker_id: "worker-123".to_string(),
            active_tasks: 5,
            completed_tasks: 100,
            failed_tasks: 3,
            last_seen: Utc::now(),
            avg_task_duration_secs: 2.5,
        };

        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: WorkerStatistics = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.worker_id, "worker-123");
        assert_eq!(deserialized.active_tasks, 5);
        assert_eq!(deserialized.completed_tasks, 100);
        assert_eq!(deserialized.failed_tasks, 3);
        assert_eq!(deserialized.avg_task_duration_secs, 2.5);
    }

    #[test]
    fn test_task_age_distribution_serialization() {
        let dist = TaskAgeDistribution {
            bucket_label: "< 1 min".to_string(),
            task_count: 42,
            oldest_task_age_secs: 55,
        };

        let json = serde_json::to_string(&dist).unwrap();
        let deserialized: TaskAgeDistribution = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.bucket_label, "< 1 min");
        assert_eq!(deserialized.task_count, 42);
        assert_eq!(deserialized.oldest_task_age_secs, 55);
    }

    #[test]
    fn test_retry_statistics_serialization() {
        let stats = RetryStatistics {
            task_name: "failing_task".to_string(),
            total_retries: 150,
            unique_tasks: 50,
            avg_retries_per_task: 3.0,
            max_retries_observed: 5,
        };

        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: RetryStatistics = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.task_name, "failing_task");
        assert_eq!(deserialized.total_retries, 150);
        assert_eq!(deserialized.unique_tasks, 50);
        assert_eq!(deserialized.avg_retries_per_task, 3.0);
        assert_eq!(deserialized.max_retries_observed, 5);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_cancel_batch() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let mut task_ids = Vec::new();
        for i in 0..10 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            let task_id = broker.enqueue(task).await.unwrap();
            task_ids.push(task_id);
        }

        let to_cancel = &task_ids[..5];
        let cancelled = broker.cancel_batch(to_cancel).await.unwrap();
        assert_eq!(cancelled, 5);

        // Exact counts: assumes the database held no other tasks.
        let stats = broker.get_statistics().await.unwrap();
        assert_eq!(stats.cancelled, 5);
        assert_eq!(stats.pending, 5);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_worker_statistics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        let task = SerializedTask::new("test_task".to_string(), vec![1, 2, 3]);
        broker.enqueue(task).await.unwrap();

        let msg = broker
            .dequeue_with_worker_id("test-worker-123")
            .await
            .unwrap()
            .expect("an enqueued task should be dequeued");

        let stats = broker
            .get_worker_statistics("test-worker-123")
            .await
            .unwrap();

        assert_eq!(stats.worker_id, "test-worker-123");
        assert_eq!(stats.active_tasks, 1);

        broker
            .ack(&msg.task_id(), msg.receipt_handle.as_deref())
            .await
            .unwrap();

        // After the ack the task should count as completed, not active.
        let stats = broker
            .get_worker_statistics("test-worker-123")
            .await
            .unwrap();
        assert_eq!(stats.active_tasks, 0);
        assert_eq!(stats.completed_tasks, 1);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_count_by_state_quick() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        for i in 0..5 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            broker.enqueue(task).await.unwrap();
        }

        let pending_count = broker
            .count_by_state_quick(DbTaskState::Pending)
            .await
            .unwrap();
        assert_eq!(pending_count, 5);

        broker.dequeue().await.unwrap();

        let processing_count = broker
            .count_by_state_quick(DbTaskState::Processing)
            .await
            .unwrap();
        assert_eq!(processing_count, 1);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_task_age_distribution() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        for i in 0..10 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            broker.enqueue(task).await.unwrap();
        }

        let distribution = broker.get_task_age_distribution().await.unwrap();

        // Every task was just enqueued, so all of them should land in the
        // youngest bucket (assumes no older tasks remain in the database).
        let youngest = distribution
            .first()
            .expect("age distribution should not be empty");
        assert_eq!(youngest.bucket_label, "< 1 min");
        assert_eq!(youngest.task_count, 10);
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_retry_statistics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        for i in 0..3 {
            let task = SerializedTask::new("failing_task".to_string(), vec![i as u8]);
            broker.enqueue(task).await.unwrap();

            let msg = broker
                .dequeue()
                .await
                .unwrap()
                .expect("an enqueued task should be dequeued");
            broker
                .reject(&msg.task_id(), msg.receipt_handle.as_deref(), true)
                .await
                .unwrap();
        }

        let stats = broker.get_retry_statistics().await.unwrap();

        // NOTE(review): only validated when retry rows exist; confirm whether a
        // requeueing reject is guaranteed to produce them.
        if let Some(task_stats) = stats.first() {
            assert_eq!(task_stats.task_name, "failing_task");
            assert!(task_stats.total_retries > 0);
        }
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_list_active_workers() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        for i in 0..3 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            broker.enqueue(task).await.unwrap();
        }

        let _msg1 = broker
            .dequeue_with_worker_id("worker-1")
            .await
            .unwrap()
            .expect("worker-1 should receive a task");
        let _msg2 = broker
            .dequeue_with_worker_id("worker-2")
            .await
            .unwrap()
            .expect("worker-2 should receive a task");

        // Exact count: assumes no other workers hold tasks in this database.
        let workers = broker.list_active_workers().await.unwrap();
        assert_eq!(workers.len(), 2);
        assert!(workers.contains(&"worker-1".to_string()));
        assert!(workers.contains(&"worker-2".to_string()));
    }

    #[tokio::test]
    #[ignore = "requires a running MySQL server (see MYSQL_URL)"]
    async fn test_get_all_worker_statistics() {
        let database_url = test_database_url();

        let broker = MysqlBroker::new(&database_url).await.unwrap();
        broker.migrate().await.unwrap();

        for i in 0..2 {
            let task = SerializedTask::new(format!("task_{}", i), vec![i as u8]);
            broker.enqueue(task).await.unwrap();
        }

        let _msg1 = broker
            .dequeue_with_worker_id("worker-alpha")
            .await
            .unwrap()
            .expect("worker-alpha should receive a task");
        let _msg2 = broker
            .dequeue_with_worker_id("worker-beta")
            .await
            .unwrap()
            .expect("worker-beta should receive a task");

        // Exact count: assumes no other workers hold tasks in this database.
        let all_stats = broker.get_all_worker_statistics().await.unwrap();
        assert_eq!(all_stats.len(), 2);

        for stats in &all_stats {
            assert!(stats.worker_id == "worker-alpha" || stats.worker_id == "worker-beta");
            assert_eq!(stats.active_tasks, 1);
        }
    }

    #[test]
    fn test_circuit_breaker_initial_state() {
        let config = CircuitBreakerConfig::default();
        let cb_internal = CircuitBreakerStateInternal::new(config);

        assert_eq!(cb_internal.state, CircuitBreakerState::Closed);
        assert_eq!(cb_internal.failure_count, 0);
        assert_eq!(cb_internal.success_count, 0);
        assert!(cb_internal.last_failure_time.is_none());
    }

    #[test]
    fn test_circuit_breaker_config_default() {
        let config = CircuitBreakerConfig::default();
        assert_eq!(config.failure_threshold, 5);
        assert_eq!(config.timeout_secs, 60);
        assert_eq!(config.success_threshold, 2);
    }

    #[tokio::test]
    async fn test_circuit_breaker_stats() {
        let database_url = "mysql://test:test@localhost/test";
        // Best effort: when no broker can be built for this URL there is
        // nothing to inspect, so the test returns early.
        let broker = match MysqlBroker::new(database_url).await {
            Ok(broker) => broker,
            Err(_) => return,
        };

        let stats = broker.get_circuit_breaker_stats();

        assert_eq!(stats.state, CircuitBreakerState::Closed);
        assert_eq!(stats.failure_count, 0);
        assert_eq!(stats.success_count, 0);
    }

    #[tokio::test]
    async fn test_circuit_breaker_reset() {
        let database_url = "mysql://test:test@localhost/test";
        // Best effort: when no broker can be built for this URL there is
        // nothing to inspect, so the test returns early.
        let broker = match MysqlBroker::new(database_url).await {
            Ok(broker) => broker,
            Err(_) => return,
        };

        for _ in 0..3 {
            broker.record_failure();
        }

        let stats_before = broker.get_circuit_breaker_stats();
        assert_eq!(stats_before.failure_count, 3);

        broker.reset_circuit_breaker();

        let stats_after = broker.get_circuit_breaker_stats();
        assert_eq!(stats_after.state, CircuitBreakerState::Closed);
        assert_eq!(stats_after.failure_count, 0);
        assert_eq!(stats_after.success_count, 0);
    }

    #[test]
    fn test_circuit_breaker_state_serialization() {
        // Every state must survive a JSON round trip unchanged.
        let state = CircuitBreakerState::Closed;
        let json = serde_json::to_string(&state).unwrap();
        let deserialized: CircuitBreakerState = serde_json::from_str(&json).unwrap();
        assert_eq!(state, deserialized);

        let state = CircuitBreakerState::Open;
        let json = serde_json::to_string(&state).unwrap();
        let deserialized: CircuitBreakerState = serde_json::from_str(&json).unwrap();
        assert_eq!(state, deserialized);

        let state = CircuitBreakerState::HalfOpen;
        let json = serde_json::to_string(&state).unwrap();
        let deserialized: CircuitBreakerState = serde_json::from_str(&json).unwrap();
        assert_eq!(state, deserialized);
    }

    #[test]
    fn test_circuit_breaker_stats_serialization() {
        let stats = CircuitBreakerStats {
            state: CircuitBreakerState::Open,
            failure_count: 5,
            success_count: 0,
            last_failure_time: Some(Utc::now()),
            last_state_change: Utc::now(),
        };

        let json = serde_json::to_string(&stats).unwrap();
        let deserialized: CircuitBreakerStats = serde_json::from_str(&json).unwrap();

        assert_eq!(stats.state, deserialized.state);
        assert_eq!(stats.failure_count, deserialized.failure_count);
        assert_eq!(stats.success_count, deserialized.success_count);
    }
}