1use futures::{future::BoxFuture, Stream};
2use serde::{Deserialize, Serialize};
3use tower::layer::util::Identity;
4
5use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
6
7use crate::{
8 backend::Backend,
9 codec::NoopCodec,
10 data::Extensions,
11 error::Error,
12 poller::Poller,
13 task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
14 worker::{Context, Worker},
15};
16
17#[derive(Serialize, Debug, Deserialize, Clone, Default)]
20pub struct Request<Args, Ctx> {
21 pub args: Args,
23 pub parts: Parts<Ctx>,
25}
26
27#[non_exhaustive]
29#[derive(Serialize, Debug, Deserialize, Clone, Default)]
30pub struct Parts<Ctx> {
31 pub task_id: TaskId,
33
34 #[serde(skip)]
36 pub data: Extensions,
37
38 pub attempt: Attempt,
40
41 pub context: Ctx,
43
44 #[serde(skip)]
46 pub namespace: Option<Namespace>,
47 }
49
50impl<T, Ctx> Request<T, Ctx> {
51 pub fn new(args: T) -> Self
53 where
54 Ctx: Default,
55 {
56 Self::new_with_data(args, Extensions::default(), Ctx::default())
57 }
58
59 pub fn new_with_parts(args: T, parts: Parts<Ctx>) -> Self {
61 Self { args, parts }
62 }
63
64 pub fn new_with_ctx(req: T, ctx: Ctx) -> Self {
66 Self {
67 args: req,
68 parts: Parts {
69 context: ctx,
70 task_id: Default::default(),
71 attempt: Default::default(),
72 data: Default::default(),
73 namespace: Default::default(),
74 },
75 }
76 }
77
78 pub fn new_with_data(req: T, data: Extensions, ctx: Ctx) -> Self {
80 Self {
81 args: req,
82 parts: Parts {
83 context: ctx,
84 task_id: Default::default(),
85 attempt: Default::default(),
86 data,
87 namespace: Default::default(),
88 },
89 }
90 }
91
92 pub fn take_parts(self) -> (T, Parts<Ctx>) {
94 (self.args, self.parts)
95 }
96}
97
98impl<T, Ctx> std::ops::Deref for Request<T, Ctx> {
99 type Target = Extensions;
100 fn deref(&self) -> &Self::Target {
101 &self.parts.data
102 }
103}
104
105impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
106 fn deref_mut(&mut self) -> &mut Self::Target {
107 &mut self.parts.data
108 }
109}
110
111#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
113pub enum State {
114 #[serde(alias = "Latest")]
116 Pending,
117 Scheduled,
119 Running,
121 Done,
123 Failed,
125 Killed,
127}
128
129impl Default for State {
130 fn default() -> Self {
131 State::Pending
132 }
133}
134
135impl FromStr for State {
136 type Err = Error;
137
138 fn from_str(s: &str) -> Result<Self, Self::Err> {
139 match s {
140 "Pending" | "Latest" => Ok(State::Pending),
141 "Running" => Ok(State::Running),
142 "Done" => Ok(State::Done),
143 "Failed" => Ok(State::Failed),
144 "Killed" => Ok(State::Killed),
145 "Scheduled" => Ok(State::Scheduled),
146 _ => Err(Error::MissingData("Invalid Job state".to_string())),
147 }
148 }
149}
150
151impl fmt::Display for State {
152 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153 match &self {
154 State::Pending => write!(f, "Pending"),
155 State::Running => write!(f, "Running"),
156 State::Done => write!(f, "Done"),
157 State::Failed => write!(f, "Failed"),
158 State::Killed => write!(f, "Killed"),
159 State::Scheduled => write!(f, "Scheduled"),
160 }
161 }
162}
163
/// A pinned, boxed `Stream` trait object yielding `T` that is `Send` and valid
/// for the lifetime `'a`.
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;

/// A `'static` [`BoxFuture`] that resolves to `T`.
pub type RequestFuture<T> = BoxFuture<'static, T>;
/// A `'static` [`BoxStream`] whose items are `Result<Option<T>, Error>`.
// NOTE(review): presumably `Ok(None)` means "nothing available right now" — confirm with consumers.
pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
171
172impl<T, Ctx> Backend<Request<T, Ctx>> for RequestStream<Request<T, Ctx>> {
173 type Stream = Self;
174
175 type Layer = Identity;
176
177 type Codec = NoopCodec<Request<T, Ctx>>;
178
179 fn poll(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
180 Poller {
181 stream: self,
182 heartbeat: Box::pin(futures::future::pending()),
183 layer: Identity::new(),
184 _priv: (),
185 }
186 }
187}