1use std::{borrow::Cow, time::SystemTime};
4
5use schemars::JsonSchema;
6use serde::{de::DeserializeOwned, Serialize};
7
8use crate::{
9 job_definition::{JobDefinition, RetryPolicy, TimeoutPolicy},
10 schedule_definition::ScheduleDefinition,
11 IndexMap,
12};
13
14pub trait JobType: Serialize + DeserializeOwned + JsonSchema + Send + Sync + 'static {
19 type Output: Serialize + DeserializeOwned + JsonSchema + Send + Sync + 'static;
21
22 fn id() -> &'static str;
27
28 #[must_use]
30 fn default_timeout_policy() -> TimeoutPolicy {
31 TimeoutPolicy::default()
32 }
33
34 #[must_use]
36 fn default_retry_policy() -> RetryPolicy {
37 RetryPolicy::default()
38 }
39
40 #[must_use]
42 fn default_labels() -> IndexMap<String, String> {
43 IndexMap::default()
44 }
45}
46
47pub trait JobTypeExt: JobType {
49 fn job(&self) -> TypedJobDefinition<Self>;
56
57 #[must_use]
59 fn metadata() -> JobTypeMetadata;
60}
61
62impl<J> JobTypeExt for J
63where
64 J: JobType,
65{
66 fn job(&self) -> TypedJobDefinition<Self> {
67 JobDefinition {
68 job_type_id: Self::id().into(),
69 target_execution_time: SystemTime::now(),
70 input_payload_json: serde_json::to_string(self).unwrap(),
71 labels: Self::default_labels(),
72 timeout_policy: Self::default_timeout_policy(),
73 retry_policy: Self::default_retry_policy(),
74 metadata_json: None,
75 }
76 .into()
77 }
78
79 fn metadata() -> JobTypeMetadata {
80 let input_schema = schemars::schema_for!(J);
81
82 JobTypeMetadata {
83 id: Self::id().into(),
84 name: input_schema
85 .schema
86 .metadata
87 .as_ref()
88 .and_then(|m| m.title.clone()),
89 description: input_schema
90 .schema
91 .metadata
92 .as_ref()
93 .and_then(|m| m.description.clone()),
94 input_schema_json: Some(serde_json::to_string(&input_schema).unwrap()),
95 output_schema_json: Some(
96 serde_json::to_string(&schemars::schema_for!(J::Output)).unwrap(),
97 ),
98 }
99 }
100}
101
102#[derive(Debug, Clone)]
107#[must_use]
108pub struct TypedJobDefinition<J = ()> {
109 pub(crate) inner: JobDefinition,
110 _job_type: std::marker::PhantomData<fn() -> J>,
111}
112
113impl<J> TypedJobDefinition<J> {
114 pub fn at(mut self, target_execution_time: impl Into<std::time::SystemTime>) -> Self {
116 self.inner.target_execution_time = target_execution_time.into();
117 self
118 }
119
120 pub fn now(mut self) -> Self {
122 self.inner.target_execution_time = std::time::SystemTime::now();
123 self
124 }
125
126 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
128 self.inner.labels.insert(key.into(), value.into());
129 self
130 }
131
132 pub fn with_timeout(mut self, timeout: impl Into<std::time::Duration>) -> Self {
137 self.inner.timeout_policy.timeout = Some(timeout.into());
138 self
139 }
140
141 pub fn with_retries(mut self, retries: u64) -> Self {
143 self.inner.retry_policy.retries = retries;
144 self
145 }
146
147 pub fn into_inner(self) -> JobDefinition {
149 self.inner
150 }
151
152 pub fn cast_type<T>(self) -> TypedJobDefinition<T> {
154 TypedJobDefinition {
155 inner: self.inner,
156 _job_type: std::marker::PhantomData,
157 }
158 }
159}
160
161impl<J> TypedJobDefinition<J> {
162 pub fn repeat_every(self, interval: std::time::Duration) -> ScheduleDefinition {
165 self.inner.repeat_every(interval)
166 }
167
168 pub fn repeat_cron(
175 self,
176 cron_expression: impl Into<String>,
177 ) -> eyre::Result<ScheduleDefinition> {
178 self.inner.repeat_cron(cron_expression)
179 }
180}
181
182impl<J> TypedJobDefinition<J>
183where
184 J: JobType,
185{
186 #[must_use]
192 pub fn input(&self) -> J {
193 serde_json::from_str(&self.inner.input_payload_json).unwrap()
194 }
195
196 pub fn try_input(&self) -> Result<J, serde_json::Error> {
198 serde_json::from_str(&self.inner.input_payload_json)
199 }
200
201 pub fn with_input(mut self, input: impl AsRef<J>) -> Self {
207 self.inner.input_payload_json = serde_json::to_string(input.as_ref()).unwrap();
208 self
209 }
210
211 #[must_use]
214 pub fn is_valid(&self) -> bool {
215 self.inner.job_type_id == J::id() && self.try_input().is_ok()
216 }
217}
218
219impl<J> From<JobDefinition> for TypedJobDefinition<J> {
220 fn from(job_definition: JobDefinition) -> Self {
221 Self {
222 inner: job_definition,
223 _job_type: std::marker::PhantomData,
224 }
225 }
226}
227
228impl<J> From<TypedJobDefinition<J>> for JobDefinition {
229 fn from(typed: TypedJobDefinition<J>) -> Self {
230 typed.inner
231 }
232}
233
234impl JobDefinition {
235 pub fn into_typed_unknown(self) -> TypedJobDefinition {
238 TypedJobDefinition {
239 inner: self,
240 _job_type: std::marker::PhantomData,
241 }
242 }
243
244 pub fn into_typed<J>(self) -> TypedJobDefinition<J> {
247 TypedJobDefinition {
248 inner: self,
249 _job_type: std::marker::PhantomData,
250 }
251 }
252}
253
254#[derive(Debug, Clone)]
256pub struct JobTypeMetadata {
257 pub id: Cow<'static, str>,
259 pub name: Option<String>,
261 pub description: Option<String>,
263 pub input_schema_json: Option<String>,
265 pub output_schema_json: Option<String>,
267}