use super::Task;
use crate::error::ProtocolError;
use crate::protocol::Message;
use crate::Celery;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::time::Duration;
#[derive(Clone)]
pub struct Request<T>
where
T: Task,
{
pub app: Arc<Celery>,
pub id: String,
pub group: Option<String>,
pub chord: Option<String>,
pub correlation_id: String,
pub params: T::Params,
pub origin: Option<String>,
pub retries: u32,
pub eta: Option<DateTime<Utc>>,
pub expires: Option<DateTime<Utc>>,
pub hostname: Option<String>,
pub reply_to: Option<String>,
pub time_limit: Option<u32>,
}
impl<T> Request<T>
where
T: Task,
{
pub fn new(app: Arc<Celery>, m: Message, p: T::Params) -> Self {
let time_limit = match m.headers.timelimit {
(Some(soft_timelimit), Some(hard_timelimit)) => {
Some(std::cmp::min(soft_timelimit, hard_timelimit))
}
(Some(soft_timelimit), None) => Some(soft_timelimit),
(None, Some(hard_timelimit)) => Some(hard_timelimit),
_ => None,
};
Self {
app,
id: m.headers.id,
group: m.headers.group,
chord: None,
correlation_id: m.properties.correlation_id,
params: p,
origin: m.headers.origin,
retries: m.headers.retries.unwrap_or(0),
eta: m.headers.eta,
expires: m.headers.expires,
hostname: None,
reply_to: m.properties.reply_to,
time_limit,
}
}
pub fn is_delayed(&self) -> bool {
self.eta.is_some()
}
pub fn countdown(&self) -> Option<Duration> {
if let Some(eta) = self.eta {
let now = DateTime::<Utc>::from(SystemTime::now());
let countdown = (eta - now).num_milliseconds();
if countdown < 0 {
None
} else {
Some(Duration::from_millis(countdown as u64))
}
} else {
None
}
}
pub fn is_expired(&self) -> bool {
if let Some(expires) = self.expires {
let now = DateTime::<Utc>::from(SystemTime::now());
(now - expires).num_milliseconds() >= 0
} else {
false
}
}
pub fn try_from_message(app: Arc<Celery>, m: Message) -> Result<Self, ProtocolError> {
let body = m.body::<T>()?;
let (task_params, _) = body.parts();
Ok(Self::new(app, m, task_params))
}
}