1use futures::{future::BoxFuture, Stream};
2use serde::{Deserialize, Serialize};
3use tower::layer::util::Identity;
4
5use std::{fmt, fmt::Debug, pin::Pin, str::FromStr};
6
7use crate::{
8 backend::Backend,
9 data::Extensions,
10 error::Error,
11 poller::Poller,
12 task::{attempt::Attempt, namespace::Namespace, task_id::TaskId},
13 worker::{Context, Worker},
14};
15
16#[derive(Serialize, Debug, Deserialize, Clone, Default)]
19pub struct Request<Args, Ctx> {
20 pub args: Args,
22 pub parts: Parts<Ctx>,
24}
25
26#[non_exhaustive]
28#[derive(Serialize, Debug, Deserialize, Clone, Default)]
29pub struct Parts<Ctx> {
30 pub task_id: TaskId,
32
33 #[serde(skip)]
35 pub data: Extensions,
36
37 pub attempt: Attempt,
39
40 pub context: Ctx,
42
43 #[serde(skip)]
45 pub namespace: Option<Namespace>,
46 }
48
49impl<T, Ctx: Default> Request<T, Ctx> {
50 pub fn new(args: T) -> Self {
52 Self::new_with_data(args, Extensions::default(), Ctx::default())
53 }
54
55 pub fn new_with_parts(args: T, parts: Parts<Ctx>) -> Self {
57 Self { args, parts }
58 }
59
60 pub fn new_with_ctx(req: T, ctx: Ctx) -> Self {
62 Self {
63 args: req,
64 parts: Parts {
65 context: ctx,
66 ..Default::default()
67 },
68 }
69 }
70
71 pub fn new_with_data(req: T, data: Extensions, ctx: Ctx) -> Self {
73 Self {
74 args: req,
75 parts: Parts {
76 context: ctx,
77 data,
78 ..Default::default()
79 },
80 }
81 }
82
83 pub fn take_parts(self) -> (T, Parts<Ctx>) {
85 (self.args, self.parts)
86 }
87}
88
89impl<T, Ctx> std::ops::Deref for Request<T, Ctx> {
90 type Target = Extensions;
91 fn deref(&self) -> &Self::Target {
92 &self.parts.data
93 }
94}
95
96impl<T, Ctx> std::ops::DerefMut for Request<T, Ctx> {
97 fn deref_mut(&mut self) -> &mut Self::Target {
98 &mut self.parts.data
99 }
100}
101
102#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, std::cmp::Eq)]
104pub enum State {
105 #[serde(alias = "Latest")]
107 Pending,
108 Scheduled,
110 Running,
112 Done,
114 Failed,
116 Killed,
118}
119
120impl Default for State {
121 fn default() -> Self {
122 State::Pending
123 }
124}
125
126impl FromStr for State {
127 type Err = Error;
128
129 fn from_str(s: &str) -> Result<Self, Self::Err> {
130 match s {
131 "Pending" | "Latest" => Ok(State::Pending),
132 "Running" => Ok(State::Running),
133 "Done" => Ok(State::Done),
134 "Failed" => Ok(State::Failed),
135 "Killed" => Ok(State::Killed),
136 "Scheduled" => Ok(State::Scheduled),
137 _ => Err(Error::MissingData("Invalid Job state".to_string())),
138 }
139 }
140}
141
142impl fmt::Display for State {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 match &self {
145 State::Pending => write!(f, "Pending"),
146 State::Running => write!(f, "Running"),
147 State::Done => write!(f, "Done"),
148 State::Failed => write!(f, "Failed"),
149 State::Killed => write!(f, "Killed"),
150 State::Scheduled => write!(f, "Scheduled"),
151 }
152 }
153}
154
155pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
157
158pub type RequestFuture<T> = BoxFuture<'static, T>;
160pub type RequestStream<T> = BoxStream<'static, Result<Option<T>, Error>>;
162
163impl<T, Res, Ctx> Backend<Request<T, Ctx>, Res> for RequestStream<Request<T, Ctx>> {
164 type Stream = Self;
165
166 type Layer = Identity;
167
168 fn poll<Svc>(self, _worker: &Worker<Context>) -> Poller<Self::Stream> {
169 Poller {
170 stream: self,
171 heartbeat: Box::pin(futures::future::pending()),
172 layer: Identity::new(),
173 _priv: (),
174 }
175 }
176}