1use celers_core::{CelersError, SerializedTask};
7use chrono::{DateTime, Datelike, Timelike, Utc};
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
/// State of a task as tracked by this module.
///
/// Serde serializes each variant as its lowercase name
/// (`rename_all = "lowercase"`), which is the same spelling the `Display`
/// impl writes and the `FromStr` impl accepts.
///
/// NOTE(review): the `Db` prefix suggests this mirrors a state column in the
/// task table — confirm against the schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DbTaskState {
    /// Written as `"pending"`.
    Pending,
    /// Written as `"processing"`.
    Processing,
    /// Written as `"completed"`.
    Completed,
    /// Written as `"failed"`.
    Failed,
    /// Written as `"cancelled"`.
    Cancelled,
}
21
22impl std::fmt::Display for DbTaskState {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 match self {
25 DbTaskState::Pending => write!(f, "pending"),
26 DbTaskState::Processing => write!(f, "processing"),
27 DbTaskState::Completed => write!(f, "completed"),
28 DbTaskState::Failed => write!(f, "failed"),
29 DbTaskState::Cancelled => write!(f, "cancelled"),
30 }
31 }
32}
33
34impl std::str::FromStr for DbTaskState {
35 type Err = CelersError;
36
37 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
38 match s.to_lowercase().as_str() {
39 "pending" => Ok(DbTaskState::Pending),
40 "processing" => Ok(DbTaskState::Processing),
41 "completed" => Ok(DbTaskState::Completed),
42 "failed" => Ok(DbTaskState::Failed),
43 "cancelled" => Ok(DbTaskState::Cancelled),
44 _ => Err(CelersError::Other(format!("Unknown task state: {}", s))),
45 }
46 }
47}
48
/// Summary of a single task: identity, state, retry counters, timestamps, and
/// the optional worker / error details.
///
/// NOTE(review): presumably populated from one row of the task table —
/// confirm against the query that builds it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskInfo {
    /// Task identifier.
    pub id: Uuid,
    /// Name of the task.
    pub task_name: String,
    /// Current state of the task.
    pub state: DbTaskState,
    /// Task priority (ordering direction is not visible here — confirm).
    pub priority: i32,
    /// Retry counter for this task.
    pub retry_count: i32,
    /// Retry limit for this task.
    pub max_retries: i32,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Scheduled timestamp (UTC).
    pub scheduled_at: DateTime<Utc>,
    /// Start timestamp (UTC); optional.
    pub started_at: Option<DateTime<Utc>>,
    /// Completion timestamp (UTC); optional.
    pub completed_at: Option<DateTime<Utc>>,
    /// Worker identifier; optional.
    pub worker_id: Option<String>,
    /// Error message; optional.
    pub error_message: Option<String>,
}
65
/// Summary of one dead-letter-queue (DLQ) entry.
///
/// Carries two identifiers: `id` for the entry itself and `task_id` for the
/// task it refers to (presumably the original task's id — confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqTaskInfo {
    /// Identifier of this entry.
    pub id: Uuid,
    /// Identifier of the associated task.
    pub task_id: Uuid,
    /// Name of the task.
    pub task_name: String,
    /// Retry counter recorded with the entry.
    pub retry_count: i32,
    /// Error message; optional.
    pub error_message: Option<String>,
    /// Failure timestamp (UTC).
    pub failed_at: DateTime<Utc>,
}
76
/// Health snapshot: an overall flag, connection-pool counters, per-state task
/// counts, and the database version string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Overall health flag (the criteria are decided by whoever fills this in).
    pub healthy: bool,
    /// Size of the connection pool.
    pub connection_pool_size: u32,
    /// Number of idle connections.
    pub idle_connections: u32,
    /// Count of pending tasks.
    pub pending_tasks: i64,
    /// Count of processing tasks.
    pub processing_tasks: i64,
    /// Count of DLQ tasks.
    pub dlq_tasks: i64,
    /// Database version, as a free-form string.
    pub database_version: String,
}
88
/// Task counts per state, plus a DLQ count and a total.
///
/// `Default` yields all counters at zero.
///
/// NOTE(review): whether `total` includes `dlq` is not visible here — confirm
/// with the code that computes it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueStatistics {
    /// Count of pending tasks.
    pub pending: i64,
    /// Count of processing tasks.
    pub processing: i64,
    /// Count of completed tasks.
    pub completed: i64,
    /// Count of failed tasks.
    pub failed: i64,
    /// Count of cancelled tasks.
    pub cancelled: i64,
    /// Count of DLQ entries.
    pub dlq: i64,
    /// Total count.
    pub total: i64,
}
100
/// Stored outcome of a task: status, optional JSON result, optional error
/// details, and timing information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Identifier of the task this result belongs to.
    pub task_id: Uuid,
    /// Name of the task.
    pub task_name: String,
    /// Result status.
    pub status: TaskResultStatus,
    /// Result payload as arbitrary JSON; optional.
    pub result: Option<serde_json::Value>,
    /// Error text; optional.
    pub error: Option<String>,
    /// Traceback text; optional.
    pub traceback: Option<String>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Completion timestamp (UTC); optional.
    pub completed_at: Option<DateTime<Utc>>,
    /// Runtime in milliseconds; optional.
    pub runtime_ms: Option<i64>,
}
114
/// Status attached to a [`TaskResult`].
///
/// Two spellings are in play: serde serializes variants in lowercase
/// (`rename_all = "lowercase"`, e.g. `"success"`), while the `Display` impl
/// writes uppercase (e.g. `SUCCESS`). `FromStr` upper-cases its input, so it
/// accepts either.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskResultStatus {
    /// Displayed as `PENDING`.
    Pending,
    /// Displayed as `STARTED`.
    Started,
    /// Displayed as `SUCCESS`.
    Success,
    /// Displayed as `FAILURE`.
    Failure,
    /// Displayed as `RETRY`.
    Retry,
    /// Displayed as `REVOKED`.
    Revoked,
}
126
127impl std::fmt::Display for TaskResultStatus {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 TaskResultStatus::Pending => write!(f, "PENDING"),
131 TaskResultStatus::Started => write!(f, "STARTED"),
132 TaskResultStatus::Success => write!(f, "SUCCESS"),
133 TaskResultStatus::Failure => write!(f, "FAILURE"),
134 TaskResultStatus::Retry => write!(f, "RETRY"),
135 TaskResultStatus::Revoked => write!(f, "REVOKED"),
136 }
137 }
138}
139
140impl std::str::FromStr for TaskResultStatus {
141 type Err = CelersError;
142
143 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
144 match s.to_uppercase().as_str() {
145 "PENDING" => Ok(TaskResultStatus::Pending),
146 "STARTED" => Ok(TaskResultStatus::Started),
147 "SUCCESS" => Ok(TaskResultStatus::Success),
148 "FAILURE" => Ok(TaskResultStatus::Failure),
149 "RETRY" => Ok(TaskResultStatus::Retry),
150 "REVOKED" => Ok(TaskResultStatus::Revoked),
151 _ => Err(CelersError::Other(format!("Unknown result status: {}", s))),
152 }
153 }
154}
155
/// Size figures for one database table: row count plus data and index sizes
/// in bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSizeInfo {
    /// Name of the table.
    pub table_name: String,
    /// Number of rows (whether exact or an estimate depends on the source query — confirm).
    pub row_count: i64,
    /// Data size in bytes.
    pub data_size_bytes: i64,
    /// Index size in bytes.
    pub index_size_bytes: i64,
}
164
/// Per-task-name breakdown of task counts by state, with a total.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskNameCount {
    /// Task name the counts apply to.
    pub task_name: String,
    /// Count of pending tasks.
    pub pending: i64,
    /// Count of processing tasks.
    pub processing: i64,
    /// Count of completed tasks.
    pub completed: i64,
    /// Count of failed tasks.
    pub failed: i64,
    /// Total count.
    pub total: i64,
}
175
/// Summary of a scheduled task, including how many seconds of delay remain.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTaskInfo {
    /// Task identifier.
    pub id: Uuid,
    /// Name of the task.
    pub task_name: String,
    /// Task priority.
    pub priority: i32,
    /// Scheduled timestamp (UTC).
    pub scheduled_at: DateTime<Utc>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Remaining delay in seconds (signed; presumably relative to the time the
    /// record was built — confirm).
    pub delay_remaining_secs: i64,
}
186
/// Connection-pool sizing and timeout settings. All durations are in seconds.
///
/// The `Default` impl in this file uses 20 max / 2 min connections, a
/// 5-second acquire timeout, a 1800-second max lifetime, and a 600-second
/// idle timeout.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on pool connections.
    pub max_connections: u32,
    /// Lower bound on pool connections.
    pub min_connections: u32,
    /// Acquire timeout, in seconds.
    pub acquire_timeout_secs: u64,
    /// Maximum connection lifetime, in seconds; optional.
    pub max_lifetime_secs: Option<u64>,
    /// Idle timeout, in seconds; optional.
    pub idle_timeout_secs: Option<u64>,
}
201
202impl Default for PoolConfig {
203 fn default() -> Self {
204 Self {
205 max_connections: 20,
206 min_connections: 2,
207 acquire_timeout_secs: 5,
208 max_lifetime_secs: Some(1800), idle_timeout_secs: Some(600), }
211 }
212}
213
/// Timing statistics for one named query. All times are in milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryStats {
    /// Name identifying the query.
    pub query_name: String,
    /// Number of executions.
    pub execution_count: i64,
    /// Total time, in milliseconds.
    pub total_time_ms: i64,
    /// Average time, in milliseconds.
    pub avg_time_ms: f64,
    /// Minimum time, in milliseconds.
    pub min_time_ms: i64,
    /// Maximum time, in milliseconds.
    pub max_time_ms: i64,
}
224
/// Statistics for one index on one table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexStats {
    /// Table the index belongs to.
    pub table_name: String,
    /// Name of the index.
    pub index_name: String,
    /// Index cardinality.
    pub cardinality: i64,
    /// Boolean flag; presumably marks a unique index — confirm with the code
    /// that fills it in.
    pub unique_values: bool,
}
233
/// One row of a query execution plan.
///
/// NOTE(review): the field set resembles the columns of MySQL `EXPLAIN`
/// output (`id`, `select_type`, `table`, `type`, `possible_keys`, `key`,
/// `key_len`, `rows`, `filtered`, `Extra`) — confirm against the code that
/// maps the result set. Most columns are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
    /// Plan row identifier.
    pub id: i32,
    /// Select type.
    pub select_type: String,
    /// Table name; optional.
    pub table: Option<String>,
    /// Query/access type; optional.
    pub query_type: Option<String>,
    /// Candidate keys; optional.
    pub possible_keys: Option<String>,
    /// Key actually used; optional.
    pub key_used: Option<String>,
    /// Key length, kept as a string; optional.
    pub key_length: Option<String>,
    /// Rows examined; optional.
    pub rows_examined: Option<i64>,
    /// Filtered figure; optional (presumably a percentage — confirm).
    pub filtered: Option<f64>,
    /// Extra information; optional.
    pub extra: Option<String>,
}
248
/// Record of one schema migration: version, name, and when it was applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationInfo {
    /// Migration version, as a string.
    pub version: String,
    /// Migration name.
    pub name: String,
    /// Application timestamp (UTC).
    pub applied_at: DateTime<Utc>,
}
256
/// Connection-pool diagnostics: connection counts, an optional wait time, and
/// a utilization percentage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionDiagnostics {
    /// Total connections.
    pub total_connections: u32,
    /// Idle connections.
    pub idle_connections: u32,
    /// Active connections.
    pub active_connections: u32,
    /// Maximum connections.
    pub max_connections: u32,
    /// Connection wait time in milliseconds; optional.
    pub connection_wait_time_ms: Option<i64>,
    /// Pool utilization as a percentage (the exact formula lives with the
    /// code that fills this in — confirm).
    pub pool_utilization_percent: f64,
}
267
/// Timestamped performance sample: throughput, average enqueue/dequeue
/// times, queue sizes, and an embedded [`ConnectionDiagnostics`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Sample timestamp (UTC).
    pub timestamp: DateTime<Utc>,
    /// Tasks per second.
    pub tasks_per_second: f64,
    /// Average dequeue time, in milliseconds.
    pub avg_dequeue_time_ms: f64,
    /// Average enqueue time, in milliseconds.
    pub avg_enqueue_time_ms: f64,
    /// Queue depth.
    pub queue_depth: i64,
    /// Count of processing tasks.
    pub processing_tasks: i64,
    /// DLQ size.
    pub dlq_size: i64,
    /// Connection-pool diagnostics for the same sample.
    pub connection_pool: ConnectionDiagnostics,
}
280
/// Ordered list of tasks with an optional delay, in seconds, between them.
///
/// Fields are private; the chain is built with `new`, `then`, and
/// `with_delay`, and read back through `tasks` and `delay_between_secs`.
#[derive(Debug, Clone)]
pub struct TaskChain {
    // Tasks in the order they were appended with `then`.
    tasks: Vec<SerializedTask>,
    // `None` until `with_delay` is called.
    delay_between_secs: Option<u64>,
}
287
288impl TaskChain {
289 pub fn new() -> Self {
291 Self {
292 tasks: Vec::new(),
293 delay_between_secs: None,
294 }
295 }
296
297 pub fn then(mut self, task: SerializedTask) -> Self {
299 self.tasks.push(task);
300 self
301 }
302
303 pub fn with_delay(mut self, delay_secs: u64) -> Self {
305 self.delay_between_secs = Some(delay_secs);
306 self
307 }
308
309 pub fn tasks(&self) -> &[SerializedTask] {
311 &self.tasks
312 }
313
314 pub fn delay_between_secs(&self) -> Option<u64> {
316 self.delay_between_secs
317 }
318}
319
320impl Default for TaskChain {
321 fn default() -> Self {
322 Self::new()
323 }
324}
325
/// Per-worker statistics: task counters, last-seen time, and average task
/// duration in seconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerStatistics {
    /// Worker identifier.
    pub worker_id: String,
    /// Count of active tasks.
    pub active_tasks: i64,
    /// Count of completed tasks.
    pub completed_tasks: i64,
    /// Count of failed tasks.
    pub failed_tasks: i64,
    /// Last-seen timestamp (UTC).
    pub last_seen: DateTime<Utc>,
    /// Average task duration, in seconds.
    pub avg_task_duration_secs: f64,
}
336
/// One bucket of a task-age histogram: a label, the number of tasks in the
/// bucket, and the age in seconds of the oldest one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskAgeDistribution {
    /// Free-form bucket label (the bucket boundaries are defined by the producer).
    pub bucket_label: String,
    /// Number of tasks in the bucket.
    pub task_count: i64,
    /// Age of the oldest task, in seconds.
    pub oldest_task_age_secs: i64,
}
344
/// Retry statistics for one task name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryStatistics {
    /// Task name the figures apply to.
    pub task_name: String,
    /// Total retries.
    pub total_retries: i64,
    /// Number of unique tasks.
    pub unique_tasks: i64,
    /// Average retries per task.
    pub avg_retries_per_task: f64,
    /// Largest retry count observed.
    pub max_retries_observed: i32,
}
354
/// Queue health summary: a status label plus the counters it accompanies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueHealth {
    /// Free-form status label. NOTE(review): the set of allowed values is not
    /// visible in this file — confirm with the producer before matching on it.
    pub overall_status: String,
    /// Count of pending tasks.
    pub pending_tasks: i64,
    /// Count of processing tasks.
    pub processing_tasks: i64,
    /// Age of the oldest pending task, in seconds.
    pub oldest_pending_age_secs: i64,
    /// Count of active workers.
    pub active_workers: i64,
    /// Queue backlog, in minutes.
    pub queue_backlog_minutes: f64,
}
365
/// Completed and failed task counts over the last minute and last hour, plus
/// a tasks-per-second rate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskThroughput {
    /// Tasks completed in the last minute.
    pub completed_last_minute: i64,
    /// Tasks completed in the last hour.
    pub completed_last_hour: i64,
    /// Tasks failed in the last minute.
    pub failed_last_minute: i64,
    /// Tasks failed in the last hour.
    pub failed_last_hour: i64,
    /// Tasks per second (the averaging window is chosen by the producer — confirm).
    pub tasks_per_second: f64,
}
375
/// Dead-letter-queue statistics: an overall count and a per-task-name
/// breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqStatistics {
    /// Total number of DLQ tasks.
    pub total_tasks: i64,
    /// One [`DlqTaskStats`] entry per task name.
    pub by_task_name: Vec<DlqTaskStats>,
}
382
/// Dead-letter-queue figures for one task name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqTaskStats {
    /// Task name the figures apply to.
    pub task_name: String,
    /// Number of entries.
    pub count: i64,
    /// Average retries; optional.
    pub avg_retries: Option<f64>,
    /// Maximum retries.
    pub max_retries: i32,
}
391
/// Progress report for one task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskProgress {
    /// Identifier of the task being reported on.
    pub task_id: Uuid,
    /// Progress as a percentage (presumably 0–100; the type does not enforce
    /// a range — confirm).
    pub progress_percent: f64,
    /// Current step description; optional.
    pub current_step: Option<String>,
    /// Total number of steps; optional.
    pub total_steps: Option<i32>,
    /// Last-update timestamp (UTC).
    pub updated_at: DateTime<Utc>,
}
401
/// Rate limits for one task name, expressed per second, per minute, and per
/// hour.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimit {
    /// Task name the limits apply to.
    pub task_name: String,
    /// Limit per second (fractional values are representable).
    pub max_per_second: f64,
    /// Limit per minute.
    pub max_per_minute: i64,
    /// Limit per hour.
    pub max_per_hour: i64,
}
410
/// Current rates for one task name, with a flag saying whether a limit is
/// exceeded. The fields parallel those of [`RateLimit`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RateLimitStatus {
    /// Task name the figures apply to.
    pub task_name: String,
    /// Current rate per second.
    pub current_per_second: f64,
    /// Current count per minute.
    pub current_per_minute: i64,
    /// Current count per hour.
    pub current_per_hour: i64,
    /// Whether a limit is exceeded (which limit triggers it is decided by the producer).
    pub limit_exceeded: bool,
}
420
/// Configuration of a recurring task: what to run, on which
/// [`RecurringSchedule`], and the bookkeeping timestamps for the last and
/// next run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecurringTaskConfig {
    /// Name of the task.
    pub task_name: String,
    /// Recurrence rule.
    pub schedule: RecurringSchedule,
    /// Task payload as raw bytes (the encoding is not visible here — confirm).
    pub payload: Vec<u8>,
    /// Task priority.
    pub priority: i32,
    /// Enabled flag.
    pub enabled: bool,
    /// Last run timestamp (UTC); optional.
    pub last_run: Option<DateTime<Utc>>,
    /// Next run timestamp (UTC).
    pub next_run: DateTime<Utc>,
}
432
/// Recurrence rule for a recurring task.
///
/// The tuple positions below follow how `next_run_from` destructures each
/// variant. Hours and minutes are applied with chrono's `with_hour` /
/// `with_minute` on a UTC timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecurringSchedule {
    /// Interval in seconds.
    EverySeconds(u64),
    /// Interval in minutes.
    EveryMinutes(u64),
    /// Interval in hours.
    EveryHours(u64),
    /// `(days, hour, minute)`: every `days` days at `hour:minute` UTC.
    EveryDays(u64, u32, u32),
    /// `(day_of_week, hour, minute)`: the weekday is compared against
    /// `num_days_from_sunday()`, so 0 is Sunday and 6 is Saturday.
    Weekly(u32, u32, u32),
    /// `(day, hour, minute)`: day of the month as passed to `with_day`.
    Monthly(u32, u32, u32),
}
449
450impl RecurringSchedule {
451 pub fn next_run_from(&self, from: DateTime<Utc>) -> DateTime<Utc> {
453 match self {
454 RecurringSchedule::EverySeconds(secs) => from + chrono::Duration::seconds(*secs as i64),
455 RecurringSchedule::EveryMinutes(mins) => from + chrono::Duration::minutes(*mins as i64),
456 RecurringSchedule::EveryHours(hours) => from + chrono::Duration::hours(*hours as i64),
457 RecurringSchedule::EveryDays(days, hour, minute) => {
458 let mut next = from + chrono::Duration::days(*days as i64);
459 next = next
460 .with_hour(*hour)
461 .and_then(|dt| dt.with_minute(*minute))
462 .and_then(|dt| dt.with_second(0))
463 .unwrap_or(next);
464 if next <= from {
465 next += chrono::Duration::days(1);
466 }
467 next
468 }
469 RecurringSchedule::Weekly(day_of_week, hour, minute) => {
470 let mut next = from;
471 let current_weekday = from.weekday().num_days_from_sunday();
472 let days_until = ((*day_of_week + 7 - current_weekday) % 7) as i64;
473 next += chrono::Duration::days(if days_until == 0 { 7 } else { days_until });
474 next = next
475 .with_hour(*hour)
476 .and_then(|dt| dt.with_minute(*minute))
477 .and_then(|dt| dt.with_second(0))
478 .unwrap_or(next);
479 next
480 }
481 RecurringSchedule::Monthly(day, hour, minute) => {
482 let mut next = from;
483 if let Some(dt) = next
484 .with_day(*day)
485 .and_then(|dt| dt.with_hour(*hour))
486 .and_then(|dt| dt.with_minute(*minute))
487 .and_then(|dt| dt.with_second(0))
488 {
489 next = dt;
490 if next <= from {
491 next += chrono::Duration::days(30);
493 next = next
494 .with_day(*day)
495 .and_then(|dt| dt.with_hour(*hour))
496 .and_then(|dt| dt.with_minute(*minute))
497 .and_then(|dt| dt.with_second(0))
498 .unwrap_or(next);
499 }
500 }
501 next
502 }
503 }
504 }
505}
506
/// Retry policy: a retry limit paired with the [`RetryStrategy`] used to
/// compute the delay before each retry.
///
/// The `Default` impl in this file uses 3 retries with
/// `ExponentialWithJitter` (base 1 s, multiplier 2.0, cap 300 s).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Retry limit.
    pub max_retries: u32,
    /// Strategy used to compute retry delays.
    pub strategy: RetryStrategy,
}
515
/// How the delay before a retry is computed; see `calculate_delay`.
/// All delays are in seconds, and `attempt` is the value passed to
/// `calculate_delay`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryStrategy {
    /// The same delay, in seconds, for every attempt.
    Fixed(u64),
    /// `base_delay_secs * (attempt + 1)`.
    Linear { base_delay_secs: u64 },
    /// `base_delay_secs * multiplier^attempt`, capped at `max_delay_secs`.
    Exponential {
        /// Delay for attempt 0, in seconds.
        base_delay_secs: u64,
        /// Growth factor applied once per attempt.
        multiplier: f64,
        /// Upper bound on the delay, in seconds.
        max_delay_secs: u64,
    },
    /// Like `Exponential`, minus a jitter term of at most 25% of the capped
    /// delay.
    ExponentialWithJitter {
        /// Delay for attempt 0, in seconds.
        base_delay_secs: u64,
        /// Growth factor applied once per attempt.
        multiplier: f64,
        /// Upper bound on the delay, in seconds.
        max_delay_secs: u64,
    },
}
536
537impl RetryStrategy {
538 pub fn calculate_delay(&self, attempt: u32) -> u64 {
540 match self {
541 RetryStrategy::Fixed(delay) => *delay,
542 RetryStrategy::Linear { base_delay_secs } => base_delay_secs * (attempt as u64 + 1),
543 RetryStrategy::Exponential {
544 base_delay_secs,
545 multiplier,
546 max_delay_secs,
547 } => {
548 let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
549 delay.min(*max_delay_secs as f64) as u64
550 }
551 RetryStrategy::ExponentialWithJitter {
552 base_delay_secs,
553 multiplier,
554 max_delay_secs,
555 } => {
556 let delay = (*base_delay_secs as f64) * multiplier.powi(attempt as i32);
557 let max_delay = delay.min(*max_delay_secs as f64);
558 let jitter = (max_delay * 0.25 * (attempt as f64 % 1.0).abs()) as u64;
560 (max_delay as u64).saturating_sub(jitter)
561 }
562 }
563 }
564}
565
566impl Default for RetryPolicy {
567 fn default() -> Self {
568 Self {
569 max_retries: 3,
570 strategy: RetryStrategy::ExponentialWithJitter {
571 base_delay_secs: 1,
572 multiplier: 2.0,
573 max_delay_secs: 300, },
575 }
576 }
577}