apalis_core/
request.rs

1use futures::{future::BoxFuture, Stream};
2use serde::{Deserialize, Serialize};
3use tower::layer::util::Identity;
4
5use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
6
7use crate::{
8    backend::Backend,
9    codec::NoopCodec,
10    data::Extensions,
11    error::Error,
12    poller::Poller,
13    task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
14    worker::{Context, Worker},
15};
16
17/// Represents a job which can be serialized and executed
18
19#[derive(Serialize, Debug, Deserialize, Clone, Default)]
20pub struct Request<Args, Ctx> {
21    /// The inner request part
22    pub args: Args,
23    /// Parts of the request eg id, attempts and context
24    pub parts: Parts<Ctx>,
25}
26
27/// Component parts of a `Request`
28#[non_exhaustive]
29#[derive(Serialize, Debug, Deserialize, Clone, Default)]
30pub struct Parts<Ctx> {
31    /// The request's id
32    pub task_id: TaskId,
33
34    /// The request's extensions
35    #[serde(skip)]
36    pub data: Extensions,
37
38    /// The request's attempts
39    pub attempt: Attempt,
40
41    /// The Context stored by the storage
42    pub context: Ctx,
43
44    /// Represents the namespace
45    #[serde(skip)]
46    pub namespace: Option<Namespace>,
47    //TODO: add State
48}
49
50impl<T, Ctx> Request<T, Ctx> {
51    /// Creates a new [Request]
52    pub fn new(args: T) -> Self
53    where
54        Ctx: Default,
55    {
56        Self::new_with_data(args, Extensions::default(), Ctx::default())
57    }
58
59    /// Creates a request with all parts provided
60    pub fn new_with_parts(args: T, parts: Parts<Ctx>) -> Self {
61        Self { args, parts }
62    }
63
64    /// Creates a request with context provided
65    pub fn new_with_ctx(req: T, ctx: Ctx) -> Self {
66        Self {
67            args: req,
68            parts: Parts {
69                context: ctx,
70                task_id: Default::default(),
71                attempt: Default::default(),
72                data: Default::default(),
73                namespace: Default::default(),
74            },
75        }
76    }
77
78    /// Creates a request with data and context provided
79    pub fn new_with_data(req: T, data: Extensions, ctx: Ctx) -> Self {
80        Self {
81            args: req,
82            parts: Parts {
83                context: ctx,
84                task_id: Default::default(),
85                attempt: Default::default(),
86                data,
87                namespace: Default::default(),
88            },
89        }
90    }
91
92    /// Take the parts
93    pub fn take_parts(self) -> (T, Parts<Ctx>) {
94        (self.args, self.parts)
95    }
96}
97
98impl<T, Ctx> std::ops::Deref for Request<T, Ctx> {
99    type Target = Extensions;
100    fn deref(&self) -> &Self::Target {
101        &self.parts.data
102    }
103}
104
105impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
106    fn deref_mut(&mut self) -> &mut Self::Target {
107        &mut self.parts.data
108    }
109}
110
111/// Represents the state of a job/task
112#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
113pub enum State {
114    /// Job is pending
115    #[serde(alias = "Latest")]
116    Pending,
117    /// Job is in the queue but not ready for execution
118    Scheduled,
119    /// Job is running
120    Running,
121    /// Job was done successfully
122    Done,
123    /// Job has failed. Check `last_error`
124    Failed,
125    /// Job has been killed
126    Killed,
127}
128
129impl Default for State {
130    fn default() -> Self {
131        State::Pending
132    }
133}
134
135impl FromStr for State {
136    type Err = Error;
137
138    fn from_str(s: &str) -> Result<Self, Self::Err> {
139        match s {
140            "Pending" | "Latest" => Ok(State::Pending),
141            "Running" => Ok(State::Running),
142            "Done" => Ok(State::Done),
143            "Failed" => Ok(State::Failed),
144            "Killed" => Ok(State::Killed),
145            "Scheduled" => Ok(State::Scheduled),
146            _ => Err(Error::MissingData("Invalid Job state".to_string())),
147        }
148    }
149}
150
151impl fmt::Display for State {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        match &self {
154            State::Pending => write!(f, "Pending"),
155            State::Running => write!(f, "Running"),
156            State::Done => write!(f, "Done"),
157            State::Failed => write!(f, "Failed"),
158            State::Killed => write!(f, "Killed"),
159            State::Scheduled => write!(f, "Scheduled"),
160        }
161    }
162}
163
164/// Represents a stream that is send
165pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
166
167/// Represents a result for a future that yields T
168pub type RequestFuture<T> = BoxFuture<'static, T>;
169/// Represents a stream for T.
170pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
171
172impl<T, Ctx> Backend<Request<T, Ctx>> for RequestStream<Request<T, Ctx>> {
173    type Stream = Self;
174
175    type Layer = Identity;
176
177    type Codec = NoopCodec<Request<T, Ctx>>;
178
179    fn poll(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
180        Poller {
181            stream: self,
182            heartbeat: Box::pin(futures::future::pending()),
183            layer: Identity::new(),
184            _priv: (),
185        }
186    }
187}