celery/task/
request.rs

1use super::Task;
2use crate::error::ProtocolError;
3use crate::protocol::Message;
4use crate::Celery;
5use chrono::{DateTime, Utc};
6use std::sync::Arc;
7use std::time::SystemTime;
8use tokio::time::Duration;
9
10/// A [`Request`] contains information and state related to the currently executing task.
11#[derive(Clone)]
12pub struct Request<T>
13where
14    T: Task,
15{
16    pub app: Arc<Celery>,
17    /// The unique ID of the executing task.
18    pub id: String,
19
20    /// The unique ID of the task's group, if this task is a member.
21    pub group: Option<String>,
22
23    /// The unique ID of the chord this task belongs to (if the task is part of the header).
24    pub chord: Option<String>,
25
26    /// Custom ID used for things like de-duplication. Usually the same as `id`.
27    pub correlation_id: String,
28
29    /// Parameters used to call this task.
30    pub params: T::Params,
31
32    /// Name of the host that sent this task.
33    pub origin: Option<String>,
34
35    /// How many times the current task has been retried.
36    pub retries: u32,
37
38    /// The original ETA of the task.
39    pub eta: Option<DateTime<Utc>>,
40
41    /// The original expiration time of the task.
42    pub expires: Option<DateTime<Utc>>,
43
44    /// Node name of the worker instance executing the task.
45    pub hostname: Option<String>,
46
47    /// Where to send reply to (queue name).
48    pub reply_to: Option<String>,
49
50    /// The time limit (in seconds) allocated for this task to execute.
51    pub time_limit: Option<u32>,
52}
53
54impl<T> Request<T>
55where
56    T: Task,
57{
58    pub fn new(app: Arc<Celery>, m: Message, p: T::Params) -> Self {
59        let time_limit = match m.headers.timelimit {
60            (Some(soft_timelimit), Some(hard_timelimit)) => {
61                Some(std::cmp::min(soft_timelimit, hard_timelimit))
62            }
63            (Some(soft_timelimit), None) => Some(soft_timelimit),
64            (None, Some(hard_timelimit)) => Some(hard_timelimit),
65            _ => None,
66        };
67        Self {
68            app,
69            id: m.headers.id,
70            group: m.headers.group,
71            chord: None,
72            correlation_id: m.properties.correlation_id,
73            params: p,
74            origin: m.headers.origin,
75            retries: m.headers.retries.unwrap_or(0),
76            eta: m.headers.eta,
77            expires: m.headers.expires,
78            hostname: None,
79            reply_to: m.properties.reply_to,
80            time_limit,
81        }
82    }
83
84    /// Check if the request has a future ETA.
85    pub fn is_delayed(&self) -> bool {
86        self.eta.is_some()
87    }
88
89    /// Get the TTL in seconds if the task has a future ETA.
90    pub fn countdown(&self) -> Option<Duration> {
91        if let Some(eta) = self.eta {
92            let now = DateTime::<Utc>::from(SystemTime::now());
93            let countdown = (eta - now).num_milliseconds();
94            if countdown < 0 {
95                None
96            } else {
97                Some(Duration::from_millis(countdown as u64))
98            }
99        } else {
100            None
101        }
102    }
103
104    /// Check if the request is expired.
105    pub fn is_expired(&self) -> bool {
106        if let Some(expires) = self.expires {
107            let now = DateTime::<Utc>::from(SystemTime::now());
108            (now - expires).num_milliseconds() >= 0
109        } else {
110            false
111        }
112    }
113    pub fn try_from_message(app: Arc<Celery>, m: Message) -> Result<Self, ProtocolError> {
114        let body = m.body::<T>()?;
115        let (task_params, _) = body.parts();
116        Ok(Self::new(app, m, task_params))
117    }
118}