use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

18#[derive(Debug, Error)]
20pub enum BackendError {
21 #[error("Redis error: {0}")]
22 Redis(#[from] redis::RedisError),
23
24 #[error("Serialization error: {0}")]
25 Serialization(String),
26
27 #[error("Result not found: {0}")]
28 NotFound(Uuid),
29
30 #[error("Connection error: {0}")]
31 Connection(String),
32}
33
34impl BackendError {
35 pub fn is_redis(&self) -> bool {
37 matches!(self, BackendError::Redis(_))
38 }
39
40 pub fn is_serialization(&self) -> bool {
42 matches!(self, BackendError::Serialization(_))
43 }
44
45 pub fn is_not_found(&self) -> bool {
47 matches!(self, BackendError::NotFound(_))
48 }
49
50 pub fn is_connection(&self) -> bool {
52 matches!(self, BackendError::Connection(_))
53 }
54
55 pub fn is_retryable(&self) -> bool {
60 matches!(self, BackendError::Redis(_) | BackendError::Connection(_))
61 }
62
63 pub fn category(&self) -> &'static str {
65 match self {
66 BackendError::Redis(_) => "redis",
67 BackendError::Serialization(_) => "serialization",
68 BackendError::NotFound(_) => "not_found",
69 BackendError::Connection(_) => "connection",
70 }
71 }
72}
73
74pub type Result<T> = std::result::Result<T, BackendError>;
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78pub enum TaskResult {
79 Pending,
81
82 Started,
84
85 Success(serde_json::Value),
87
88 Failure(String),
90
91 Revoked,
93
94 Retry(u32),
96}
97
98impl TaskResult {
99 pub fn is_pending(&self) -> bool {
101 matches!(self, TaskResult::Pending)
102 }
103
104 pub fn is_started(&self) -> bool {
106 matches!(self, TaskResult::Started)
107 }
108
109 pub fn is_success(&self) -> bool {
111 matches!(self, TaskResult::Success(_))
112 }
113
114 pub fn is_failure(&self) -> bool {
116 matches!(self, TaskResult::Failure(_))
117 }
118
119 pub fn is_revoked(&self) -> bool {
121 matches!(self, TaskResult::Revoked)
122 }
123
124 pub fn is_retry(&self) -> bool {
126 matches!(self, TaskResult::Retry(_))
127 }
128
129 pub fn is_terminal(&self) -> bool {
131 matches!(
132 self,
133 TaskResult::Success(_) | TaskResult::Failure(_) | TaskResult::Revoked
134 )
135 }
136
137 pub fn is_active(&self) -> bool {
139 !self.is_terminal()
140 }
141
142 pub fn same_variant(&self, other: &TaskResult) -> bool {
147 matches!(
148 (self, other),
149 (TaskResult::Pending, TaskResult::Pending)
150 | (TaskResult::Started, TaskResult::Started)
151 | (TaskResult::Success(_), TaskResult::Success(_))
152 | (TaskResult::Failure(_), TaskResult::Failure(_))
153 | (TaskResult::Revoked, TaskResult::Revoked)
154 | (TaskResult::Retry(_), TaskResult::Retry(_))
155 )
156 }
157
158 pub fn success_value(&self) -> Option<&serde_json::Value> {
160 match self {
161 TaskResult::Success(value) => Some(value),
162 _ => None,
163 }
164 }
165
166 pub fn failure_message(&self) -> Option<&str> {
168 match self {
169 TaskResult::Failure(msg) => Some(msg),
170 _ => None,
171 }
172 }
173
174 pub fn retry_count(&self) -> Option<u32> {
176 match self {
177 TaskResult::Retry(count) => Some(*count),
178 _ => None,
179 }
180 }
181}
182
183impl std::fmt::Display for TaskResult {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 match self {
186 TaskResult::Pending => write!(f, "PENDING"),
187 TaskResult::Started => write!(f, "STARTED"),
188 TaskResult::Success(_) => write!(f, "SUCCESS"),
189 TaskResult::Failure(err) => write!(f, "FAILURE: {}", err),
190 TaskResult::Revoked => write!(f, "REVOKED"),
191 TaskResult::Retry(count) => write!(f, "RETRY({})", count),
192 }
193 }
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct ProgressInfo {
199 pub current: u64,
201
202 pub total: u64,
204
205 pub message: Option<String>,
207
208 pub percent: f64,
210
211 pub updated_at: DateTime<Utc>,
213}
214
215impl ProgressInfo {
216 pub fn new(current: u64, total: u64) -> Self {
218 let percent = if total > 0 {
219 (current as f64 / total as f64 * 100.0).min(100.0)
220 } else {
221 0.0
222 };
223
224 Self {
225 current,
226 total,
227 message: None,
228 percent,
229 updated_at: Utc::now(),
230 }
231 }
232
233 pub fn with_message(mut self, message: String) -> Self {
235 self.message = Some(message);
236 self
237 }
238
239 pub fn is_complete(&self) -> bool {
241 self.percent >= 100.0
242 }
243
244 pub fn has_message(&self) -> bool {
246 self.message.is_some()
247 }
248
249 pub fn remaining(&self) -> u64 {
251 self.total.saturating_sub(self.current)
252 }
253
254 pub fn fraction(&self) -> f64 {
256 self.percent / 100.0
257 }
258}
259
260impl std::fmt::Display for ProgressInfo {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 write!(f, "{}/{} ({:.1}%)", self.current, self.total, self.percent)?;
263 if let Some(ref msg) = self.message {
264 write!(f, " - {}", msg)?;
265 }
266 Ok(())
267 }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct TaskMeta {
273 pub task_id: Uuid,
275
276 pub task_name: String,
278
279 pub result: TaskResult,
281
282 pub created_at: DateTime<Utc>,
284
285 pub started_at: Option<DateTime<Utc>>,
287
288 pub completed_at: Option<DateTime<Utc>>,
290
291 pub worker: Option<String>,
293
294 #[serde(skip_serializing_if = "Option::is_none")]
296 pub progress: Option<ProgressInfo>,
297
298 #[serde(default)]
300 pub version: u32,
301
302 #[serde(default, skip_serializing_if = "Vec::is_empty")]
304 pub tags: Vec<String>,
305
306 #[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")]
308 pub metadata: std::collections::HashMap<String, serde_json::Value>,
309
310 #[serde(default, skip_serializing_if = "Option::is_none")]
312 pub worker_hostname: Option<String>,
313
314 #[serde(default, skip_serializing_if = "Option::is_none")]
316 pub runtime_ms: Option<u64>,
317
318 #[serde(default, skip_serializing_if = "Option::is_none")]
320 pub memory_bytes: Option<u64>,
321
322 #[serde(default, skip_serializing_if = "Option::is_none")]
324 pub retries: Option<u32>,
325
326 #[serde(default, skip_serializing_if = "Option::is_none")]
328 pub queue: Option<String>,
329}
330
331impl TaskMeta {
332 pub fn new(task_id: Uuid, task_name: String) -> Self {
333 Self {
334 task_id,
335 task_name,
336 result: TaskResult::Pending,
337 created_at: Utc::now(),
338 started_at: None,
339 completed_at: None,
340 worker: None,
341 progress: None,
342 version: 0,
343 tags: Vec::new(),
344 metadata: std::collections::HashMap::new(),
345 worker_hostname: None,
346 runtime_ms: None,
347 memory_bytes: None,
348 retries: None,
349 queue: None,
350 }
351 }
352
353 pub fn has_started(&self) -> bool {
355 self.started_at.is_some()
356 }
357
358 pub fn has_completed(&self) -> bool {
360 self.completed_at.is_some()
361 }
362
363 pub fn has_progress(&self) -> bool {
365 self.progress.is_some()
366 }
367
368 pub fn duration(&self) -> Option<chrono::Duration> {
370 match (self.started_at, self.completed_at) {
371 (Some(start), Some(end)) => Some(end - start),
372 _ => None,
373 }
374 }
375
376 pub fn age(&self) -> chrono::Duration {
378 Utc::now() - self.created_at
379 }
380
381 pub fn execution_time(&self) -> Option<chrono::Duration> {
383 self.started_at.map(|start| Utc::now() - start)
384 }
385
386 pub fn is_terminal(&self) -> bool {
388 self.result.is_terminal()
389 }
390
391 pub fn is_active(&self) -> bool {
393 self.result.is_active()
394 }
395
396 pub fn add_tag(&mut self, tag: impl Into<String>) {
398 let tag = tag.into();
399 if !self.tags.contains(&tag) {
400 self.tags.push(tag);
401 }
402 }
403
404 pub fn remove_tag(&mut self, tag: &str) {
406 self.tags.retain(|t| t != tag);
407 }
408
409 pub fn has_tag(&self, tag: &str) -> bool {
411 self.tags.iter().any(|t| t == tag)
412 }
413
414 pub fn has_any_tag(&self, tags: &[String]) -> bool {
416 tags.iter().any(|tag| self.has_tag(tag))
417 }
418
419 pub fn has_all_tags(&self, tags: &[String]) -> bool {
421 tags.iter().all(|tag| self.has_tag(tag))
422 }
423
424 pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
426 self.metadata.insert(key.into(), value);
427 }
428
429 pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
431 self.metadata.get(key)
432 }
433
434 pub fn remove_metadata(&mut self, key: &str) -> Option<serde_json::Value> {
436 self.metadata.remove(key)
437 }
438
439 pub fn has_metadata(&self, key: &str) -> bool {
441 self.metadata.contains_key(key)
442 }
443}
444
445impl std::fmt::Display for TaskMeta {
446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
447 write!(
448 f,
449 "Task[{}] name={} result={}",
450 &self.task_id.to_string()[..8],
451 self.task_name,
452 self.result
453 )?;
454
455 if let Some(worker) = &self.worker {
456 write!(f, " worker={}", worker)?;
457 }
458
459 if let Some(progress) = &self.progress {
460 write!(f, " progress={}", progress)?;
461 }
462
463 Ok(())
464 }
465}
466
/// Time-to-live settings: an optional default plus per-task-name overrides.
///
/// The derived `Default` yields a configuration with no default TTL and no
/// overrides, the same value [`TaskTtlConfig::new`] returns.
#[derive(Debug, Clone, Default)]
pub struct TaskTtlConfig {
    /// TTL used for task names that have no override; `None` means no TTL.
    default_ttl: Option<Duration>,
    /// Overrides keyed by task name.
    task_ttls: HashMap<String, Duration>,
}

impl TaskTtlConfig {
    /// Creates a configuration with no default TTL and no overrides.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a configuration whose default TTL is `ttl` and that has no overrides.
    pub fn with_default(ttl: Duration) -> Self {
        Self {
            default_ttl: Some(ttl),
            ..Self::default()
        }
    }

    /// Sets the override for `task_name`, replacing any previous one.
    pub fn set_task_ttl(&mut self, task_name: &str, ttl: Duration) {
        self.task_ttls.insert(task_name.to_string(), ttl);
    }

    /// Returns the TTL for `task_name`: its override when one exists,
    /// otherwise the default TTL, otherwise `None`.
    pub fn get_ttl(&self, task_name: &str) -> Option<Duration> {
        self.task_ttls.get(task_name).copied().or(self.default_ttl)
    }

    /// Returns `true` when neither a default TTL nor any override is set.
    pub fn is_empty(&self) -> bool {
        self.default_ttl.is_none() && self.task_ttls.is_empty()
    }

    /// Returns the default TTL, if one is set.
    pub fn default_ttl(&self) -> Option<Duration> {
        self.default_ttl
    }

    /// Sets the default TTL, replacing any previous one.
    pub fn set_default_ttl(&mut self, ttl: Duration) {
        self.default_ttl = Some(ttl);
    }

    /// Removes the override for `task_name` and returns it, if there was one.
    pub fn remove_task_ttl(&mut self, task_name: &str) -> Option<Duration> {
        self.task_ttls.remove(task_name)
    }

    /// Returns the number of per-task overrides.
    pub fn task_ttl_count(&self) -> usize {
        self.task_ttls.len()
    }
}

554#[derive(Debug, Clone, Serialize, Deserialize)]
556pub struct ChordState {
557 pub chord_id: Uuid,
559
560 pub total: usize,
562
563 pub completed: usize,
565
566 pub callback: Option<String>,
568
569 pub task_ids: Vec<Uuid>,
571
572 pub created_at: DateTime<Utc>,
574
575 #[serde(skip_serializing_if = "Option::is_none")]
577 pub timeout: Option<Duration>,
578
579 #[serde(default)]
581 pub cancelled: bool,
582
583 #[serde(skip_serializing_if = "Option::is_none")]
585 pub cancellation_reason: Option<String>,
586
587 #[serde(default)]
589 pub retry_count: u32,
590
591 #[serde(skip_serializing_if = "Option::is_none")]
593 pub max_retries: Option<u32>,
594}
595
596impl ChordState {
597 pub fn new(chord_id: Uuid, total: usize, task_ids: Vec<Uuid>) -> Self {
599 Self {
600 chord_id,
601 total,
602 completed: 0,
603 callback: None,
604 task_ids,
605 created_at: Utc::now(),
606 timeout: None,
607 cancelled: false,
608 cancellation_reason: None,
609 retry_count: 0,
610 max_retries: None,
611 }
612 }
613
614 pub fn with_timeout(mut self, timeout: Duration) -> Self {
616 self.timeout = Some(timeout);
617 self
618 }
619
620 pub fn with_callback(mut self, callback: String) -> Self {
622 self.callback = Some(callback);
623 self
624 }
625
626 pub fn is_complete(&self) -> bool {
628 self.completed >= self.total && !self.cancelled
629 }
630
631 pub fn is_cancelled(&self) -> bool {
633 self.cancelled
634 }
635
636 pub fn cancel(&mut self, reason: Option<String>) {
638 self.cancelled = true;
639 self.cancellation_reason = reason;
640 }
641
642 pub fn is_terminal(&self) -> bool {
644 self.is_complete() || self.is_cancelled() || self.is_timed_out()
645 }
646
647 pub fn is_timed_out(&self) -> bool {
649 if let Some(timeout) = self.timeout {
650 let age = Utc::now() - self.created_at;
651 age.num_milliseconds() > timeout.as_millis() as i64
652 } else {
653 false
654 }
655 }
656
657 pub fn remaining_timeout(&self) -> Option<Duration> {
659 self.timeout.and_then(|timeout| {
660 let age = Utc::now() - self.created_at;
661 let age_ms = age.num_milliseconds().max(0) as u64;
662 let timeout_ms = timeout.as_millis() as u64;
663
664 if age_ms < timeout_ms {
665 Some(Duration::from_millis(timeout_ms - age_ms))
666 } else {
667 None
668 }
669 })
670 }
671
672 pub fn remaining(&self) -> usize {
674 self.total.saturating_sub(self.completed)
675 }
676
677 pub fn percent_complete(&self) -> f64 {
679 if self.total > 0 {
680 (self.completed as f64 / self.total as f64 * 100.0).min(100.0)
681 } else {
682 0.0
683 }
684 }
685
686 pub fn has_callback(&self) -> bool {
688 self.callback.is_some()
689 }
690
691 pub fn has_timeout(&self) -> bool {
693 self.timeout.is_some()
694 }
695
696 pub fn task_count(&self) -> usize {
698 self.task_ids.len()
699 }
700
701 pub fn age(&self) -> chrono::Duration {
703 Utc::now() - self.created_at
704 }
705
706 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
708 self.max_retries = Some(max_retries);
709 self
710 }
711
712 pub fn can_retry(&self) -> bool {
714 if let Some(max_retries) = self.max_retries {
715 self.retry_count < max_retries
716 } else {
717 false
718 }
719 }
720
721 pub fn retry(&mut self) -> bool {
725 if !self.can_retry() {
726 return false;
727 }
728 self.retry_count += 1;
729 self.completed = 0;
730 self.cancelled = false;
731 self.cancellation_reason = None;
732 self.created_at = Utc::now();
733 true
734 }
735
736 pub fn remaining_retries(&self) -> Option<u32> {
738 self.max_retries
739 .map(|max| max.saturating_sub(self.retry_count))
740 }
741
742 pub fn is_retry(&self) -> bool {
744 self.retry_count > 0
745 }
746}
747
748impl std::fmt::Display for ChordState {
749 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
750 write!(
751 f,
752 "Chord[{}] {}/{} tasks ({:.1}%)",
753 &self.chord_id.to_string()[..8],
754 self.completed,
755 self.total,
756 self.percent_complete()
757 )?;
758
759 if let Some(ref callback) = self.callback {
760 write!(f, " callback={}", callback)?;
761 }
762
763 if self.is_cancelled() {
764 write!(f, " [CANCELLED")?;
765 if let Some(ref reason) = self.cancellation_reason {
766 write!(f, ": {}", reason)?;
767 }
768 write!(f, "]")?;
769 } else if let Some(timeout) = self.timeout {
770 if self.is_timed_out() {
771 write!(f, " [TIMED OUT]")?;
772 } else if let Some(remaining) = self.remaining_timeout() {
773 write!(f, " timeout={:?} remaining={:?}", timeout, remaining)?;
774 }
775 }
776
777 Ok(())
778 }
779}