apalis_core/
request.rs

1use futures::{future::BoxFuture, Stream};
2use serde::{Deserialize, Serialize};
3use tower::layer::util::Identity;
4
5use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
6
7use crate::{
8    backend::Backend,
9    data::Extensions,
10    error::Error,
11    poller::Poller,
12    task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
13    worker::{Context, Worker},
14};
15
16/// Represents a job which can be serialized and executed
17
18#[derive(Serialize, Debug, Deserialize, Clone, Default)]
19pub struct Request<Args, Ctx> {
20    /// The inner request part
21    pub args: Args,
22    /// Parts of the request eg id, attempts and context
23    pub parts: Parts<Ctx>,
24}
25
26/// Component parts of a `Request`
27#[non_exhaustive]
28#[derive(Serialize, Debug, Deserialize, Clone, Default)]
29pub struct Parts<Ctx> {
30    /// The request's id
31    pub task_id: TaskId,
32
33    /// The request's extensions
34    #[serde(skip)]
35    pub data: Extensions,
36
37    /// The request's attempts
38    pub attempt: Attempt,
39
40    /// The Context stored by the storage
41    pub context: Ctx,
42
43    /// Represents the namespace
44    #[serde(skip)]
45    pub namespace: Option<Namespace>,
46    //TODO: add State
47}
48
49impl<T, Ctx: Default> Request<T, Ctx> {
50    /// Creates a new [Request]
51    pub fn new(args: T) -> Self {
52        Self::new_with_data(args, Extensions::default(), Ctx::default())
53    }
54
55    /// Creates a request with all parts provided
56    pub fn new_with_parts(args: T, parts: Parts<Ctx>) -> Self {
57        Self { args, parts }
58    }
59
60    /// Creates a request with context provided
61    pub fn new_with_ctx(req: T, ctx: Ctx) -> Self {
62        Self {
63            args: req,
64            parts: Parts {
65                context: ctx,
66                ..Default::default()
67            },
68        }
69    }
70
71    /// Creates a request with data and context provided
72    pub fn new_with_data(req: T, data: Extensions, ctx: Ctx) -> Self {
73        Self {
74            args: req,
75            parts: Parts {
76                context: ctx,
77                data,
78                ..Default::default()
79            },
80        }
81    }
82
83    /// Take the parts
84    pub fn take_parts(self) -> (T, Parts<Ctx>) {
85        (self.args, self.parts)
86    }
87}
88
89impl<T, Ctx> std::ops::Deref for Request<T, Ctx> {
90    type Target = Extensions;
91    fn deref(&self) -> &Self::Target {
92        &self.parts.data
93    }
94}
95
96impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
97    fn deref_mut(&mut self) -> &mut Self::Target {
98        &mut self.parts.data
99    }
100}
101
102/// Represents the state of a job/task
103#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
104pub enum State {
105    /// Job is pending
106    #[serde(alias = "Latest")]
107    Pending,
108    /// Job is in the queue but not ready for execution
109    Scheduled,
110    /// Job is running
111    Running,
112    /// Job was done successfully
113    Done,
114    /// Job has failed. Check `last_error`
115    Failed,
116    /// Job has been killed
117    Killed,
118}
119
120impl Default for State {
121    fn default() -> Self {
122        State::Pending
123    }
124}
125
126impl FromStr for State {
127    type Err = Error;
128
129    fn from_str(s: &str) -> Result<Self, Self::Err> {
130        match s {
131            "Pending" | "Latest" => Ok(State::Pending),
132            "Running" => Ok(State::Running),
133            "Done" => Ok(State::Done),
134            "Failed" => Ok(State::Failed),
135            "Killed" => Ok(State::Killed),
136            "Scheduled" => Ok(State::Scheduled),
137            _ => Err(Error::MissingData("Invalid Job state".to_string())),
138        }
139    }
140}
141
142impl fmt::Display for State {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        match &self {
145            State::Pending => write!(f, "Pending"),
146            State::Running => write!(f, "Running"),
147            State::Done => write!(f, "Done"),
148            State::Failed => write!(f, "Failed"),
149            State::Killed => write!(f, "Killed"),
150            State::Scheduled => write!(f, "Scheduled"),
151        }
152    }
153}
154
155/// Represents a stream that is send
156pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
157
158/// Represents a result for a future that yields T
159pub type RequestFuture<T> = BoxFuture<'static, T>;
160/// Represents a stream for T.
161pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
162
163impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx>> {
164    type Stream = Self;
165
166    type Layer = Identity;
167
168    fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
169        Poller {
170            stream: self,
171            heartbeat: Box::pin(futures::future::pending()),
172            layer: Identity::new(),
173            _priv: (),
174        }
175    }
176}