1use super::Task;
2use crate::error::ProtocolError;
3use crate::protocol::Message;
4use crate::Celery;
5use chrono::{DateTime, Utc};
6use std::sync::Arc;
7use std::time::SystemTime;
8use tokio::time::Duration;
9
10#[derive(Clone)]
12pub struct Request<T>
13where
14 T: Task,
15{
16 pub app: Arc<Celery>,
17 pub id: String,
19
20 pub group: Option<String>,
22
23 pub chord: Option<String>,
25
26 pub correlation_id: String,
28
29 pub params: T::Params,
31
32 pub origin: Option<String>,
34
35 pub retries: u32,
37
38 pub eta: Option<DateTime<Utc>>,
40
41 pub expires: Option<DateTime<Utc>>,
43
44 pub hostname: Option<String>,
46
47 pub reply_to: Option<String>,
49
50 pub time_limit: Option<u32>,
52}
53
54impl<T> Request<T>
55where
56 T: Task,
57{
58 pub fn new(app: Arc<Celery>, m: Message, p: T::Params) -> Self {
59 let time_limit = match m.headers.timelimit {
60 (Some(soft_timelimit), Some(hard_timelimit)) => {
61 Some(std::cmp::min(soft_timelimit, hard_timelimit))
62 }
63 (Some(soft_timelimit), None) => Some(soft_timelimit),
64 (None, Some(hard_timelimit)) => Some(hard_timelimit),
65 _ => None,
66 };
67 Self {
68 app,
69 id: m.headers.id,
70 group: m.headers.group,
71 chord: None,
72 correlation_id: m.properties.correlation_id,
73 params: p,
74 origin: m.headers.origin,
75 retries: m.headers.retries.unwrap_or(0),
76 eta: m.headers.eta,
77 expires: m.headers.expires,
78 hostname: None,
79 reply_to: m.properties.reply_to,
80 time_limit,
81 }
82 }
83
84 pub fn is_delayed(&self) -> bool {
86 self.eta.is_some()
87 }
88
89 pub fn countdown(&self) -> Option<Duration> {
91 if let Some(eta) = self.eta {
92 let now = DateTime::<Utc>::from(SystemTime::now());
93 let countdown = (eta - now).num_milliseconds();
94 if countdown < 0 {
95 None
96 } else {
97 Some(Duration::from_millis(countdown as u64))
98 }
99 } else {
100 None
101 }
102 }
103
104 pub fn is_expired(&self) -> bool {
106 if let Some(expires) = self.expires {
107 let now = DateTime::<Utc>::from(SystemTime::now());
108 (now - expires).num_milliseconds() >= 0
109 } else {
110 false
111 }
112 }
113 pub fn try_from_message(app: Arc<Celery>, m: Message) -> Result<Self, ProtocolError> {
114 let body = m.body::<T>()?;
115 let (task_params, _) = body.parts();
116 Ok(Self::new(app, m, task_params))
117 }
118}