1use std::{borrow::Cow, time::SystemTime};
4
5use eyre::{Context, OptionExt};
6use ora_proto::{
7 common::v1::{JobRetryPolicy, JobTimeoutPolicy},
8 server::v1::{Job, JobExecutionStatus},
9};
10use serde::Serialize;
11use uuid::Uuid;
12
13use crate::{
14 schedule_definition::{
15 ScheduleDefinition, ScheduleJobTimingPolicy, ScheduleJobTimingPolicyCron,
16 ScheduleJobTimingPolicyRepeat,
17 },
18 IndexMap,
19};
20
/// The definition of a single job.
///
/// Values of this type are refined with the consuming builder-style methods
/// defined on it (for example [`JobDefinition::at`] or
/// [`JobDefinition::with_label`]) and can be turned into a
/// [`ScheduleDefinition`] via [`JobDefinition::repeat_every`] or
/// [`JobDefinition::repeat_cron`].
///
/// The type is `#[must_use]`, so a definition that is built and then
/// discarded triggers a compiler warning.
#[derive(Debug, Clone)]
#[must_use]
pub struct JobDefinition {
    /// Identifier of the job type this definition belongs to.
    pub job_type_id: Cow<'static, str>,
    /// The point in time the job is targeted to execute at.
    ///
    /// Overwritten by [`JobDefinition::at`] and [`JobDefinition::now`].
    pub target_execution_time: std::time::SystemTime,
    /// The input payload of the job as a string.
    ///
    /// NOTE(review): the name implies serialized JSON; nothing in this type
    /// validates that — confirm where the payload is produced.
    pub input_payload_json: String,
    /// String key/value labels attached to the job.
    ///
    /// [`JobDefinition::with_label_json`] stores the JSON serialization of a
    /// value as the label value.
    pub labels: IndexMap<String, String>,
    /// Timeout configuration of the job.
    pub timeout_policy: TimeoutPolicy,
    /// Retry configuration of the job.
    pub retry_policy: RetryPolicy,
    /// Optional metadata, stored as a JSON string when set through
    /// [`JobDefinition::replace_metadata`].
    pub metadata_json: Option<String>,
}
43
44impl JobDefinition {
45 pub fn at(mut self, target_execution_time: impl Into<std::time::SystemTime>) -> Self {
47 self.target_execution_time = target_execution_time.into();
48 self
49 }
50
51 pub fn now(mut self) -> Self {
53 self.target_execution_time = std::time::SystemTime::now();
54 self
55 }
56
57 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
59 self.labels.insert(key.into(), value.into());
60 self
61 }
62
63 pub fn with_label_json(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
69 self.labels.insert(
70 key.into(),
71 serde_json::to_string(&value).expect("label serialization failed"),
72 );
73 self
74 }
75
76 pub fn with_timeout(mut self, timeout: impl Into<std::time::Duration>) -> Self {
81 self.timeout_policy.timeout = Some(timeout.into());
82 self
83 }
84
85 pub fn with_retries(mut self, retries: u64) -> Self {
87 self.retry_policy.retries = retries;
88 self
89 }
90
91 pub fn replace_metadata(mut self, metadata: impl Serialize) -> Self {
99 self.metadata_json =
100 Some(serde_json::to_string(&metadata).expect("metadata serialization failed"));
101 self
102 }
103}
104
105impl JobDefinition {
106 pub fn repeat_every(self, interval: std::time::Duration) -> ScheduleDefinition {
109 ScheduleDefinition {
110 job_timing_policy: ScheduleJobTimingPolicy::Repeat(ScheduleJobTimingPolicyRepeat {
111 interval,
112 immediate: false,
113 missed_time_policy: Default::default(),
114 }),
115 job_creation_policy:
116 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(self),
117 time_range: None,
118 labels: Default::default(),
119 metadata_json: None,
120 propagate_labels_to_jobs: true,
121 }
122 }
123
124 pub fn repeat_cron(
131 self,
132 cron_expression: impl Into<String>,
133 ) -> eyre::Result<ScheduleDefinition> {
134 let expr = cron_expression.into();
135
136 let mut parse_options = cronexpr::ParseOptions::default();
137 parse_options.fallback_timezone_option = cronexpr::FallbackTimezoneOption::UTC;
138
139 cronexpr::parse_crontab_with(&expr, parse_options)?;
140
141 Ok(ScheduleDefinition {
142 job_timing_policy: ScheduleJobTimingPolicy::Cron(ScheduleJobTimingPolicyCron {
143 cron_expression: expr,
144 immediate: false,
145 missed_time_policy: Default::default(),
146 }),
147 job_creation_policy:
148 crate::schedule_definition::ScheduleJobCreationPolicy::JobDefinition(self),
149 time_range: None,
150 labels: Default::default(),
151 metadata_json: None,
152 propagate_labels_to_jobs: true,
153 })
154 }
155}
156
/// Timeout configuration of a job.
///
/// The derived default has no timeout and uses the default
/// [`TimeoutBaseTime`].
#[derive(Debug, Default, Clone, Copy)]
pub struct TimeoutPolicy {
    /// The timeout duration, or `None` if no timeout is set.
    pub timeout: Option<std::time::Duration>,
    /// Which point in time the timeout relates to.
    ///
    /// NOTE(review): presumably the instant the timeout is measured from —
    /// confirm against the server's timeout handling.
    pub base_time: TimeoutBaseTime,
}
167
/// The base time selection of a [`TimeoutPolicy`].
///
/// NOTE(review): the variants presumably select the instant from which the
/// timeout is counted; the exact semantics are not visible in this file —
/// confirm against the server documentation.
#[derive(Debug, Default, Clone, Copy)]
pub enum TimeoutBaseTime {
    /// Base the timeout on the start time. This is the default variant.
    #[default]
    StartTime,
    /// Base the timeout on the target execution time.
    TargetExecutionTime,
}
183
/// Retry configuration of a job.
///
/// The derived default has `retries` set to `0`.
#[derive(Debug, Default, Clone, Copy)]
pub struct RetryPolicy {
    /// The configured number of retries.
    ///
    /// NOTE(review): presumably the number of additional attempts after a
    /// failed execution — confirm against the server's retry handling.
    pub retries: u64,
}
192
/// The status of a job, as reported for its executions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JobStatus {
    /// The job is pending.
    Pending,
    /// The job is ready.
    Ready,
    /// The job is assigned.
    Assigned,
    /// The job is running.
    Running,
    /// The job succeeded; this is a terminal status.
    Succeeded,
    /// The job failed; this is a terminal status.
    Failed,
}

impl JobStatus {
    /// Returns `true` if the status is final, that is either
    /// [`JobStatus::Succeeded`] or [`JobStatus::Failed`].
    #[must_use]
    pub fn is_terminal(&self) -> bool {
        match self {
            Self::Succeeded | Self::Failed => true,
            Self::Pending | Self::Ready | Self::Assigned | Self::Running => false,
        }
    }

    /// Returns `true` if the status is [`JobStatus::Pending`].
    #[must_use]
    pub fn is_pending(&self) -> bool {
        *self == Self::Pending
    }

    /// Returns `true` if the status is [`JobStatus::Ready`].
    #[must_use]
    pub fn is_ready(&self) -> bool {
        *self == Self::Ready
    }

    /// Returns `true` if the status is [`JobStatus::Assigned`].
    #[must_use]
    pub fn is_assigned(&self) -> bool {
        *self == Self::Assigned
    }

    /// Returns `true` if the status is [`JobStatus::Running`].
    #[must_use]
    pub fn is_running(&self) -> bool {
        *self == Self::Running
    }

    /// Returns `true` if the status is [`JobStatus::Succeeded`].
    #[must_use]
    pub fn is_succeeded(&self) -> bool {
        *self == Self::Succeeded
    }

    /// Returns `true` if the status is [`JobStatus::Failed`].
    #[must_use]
    pub fn is_failed(&self) -> bool {
        *self == Self::Failed
    }
}
265
266impl From<JobExecutionStatus> for JobStatus {
267 fn from(status: JobExecutionStatus) -> Self {
268 match status {
269 JobExecutionStatus::Pending | JobExecutionStatus::Unspecified => Self::Pending,
270 JobExecutionStatus::Ready => Self::Ready,
271 JobExecutionStatus::Assigned => Self::Assigned,
272 JobExecutionStatus::Running => Self::Running,
273 JobExecutionStatus::Succeeded => Self::Succeeded,
274 JobExecutionStatus::Failed => Self::Failed,
275 }
276 }
277}
278
279impl From<JobStatus> for JobExecutionStatus {
280 fn from(status: JobStatus) -> Self {
281 match status {
282 JobStatus::Pending => JobExecutionStatus::Pending,
283 JobStatus::Ready => JobExecutionStatus::Ready,
284 JobStatus::Assigned => JobExecutionStatus::Assigned,
285 JobStatus::Running => JobExecutionStatus::Running,
286 JobStatus::Succeeded => JobExecutionStatus::Succeeded,
287 JobStatus::Failed => JobExecutionStatus::Failed,
288 }
289 }
290}
291
/// Details of a job, converted from the server's `Job` message (see the
/// `TryFrom<Job>` implementation).
#[derive(Debug, Clone)]
pub struct JobDetails {
    /// The ID of the job, parsed from the message's `id` string.
    pub id: Uuid,
    /// The `active` flag as reported in the message.
    pub active: bool,
    /// The `cancelled` flag as reported in the message.
    pub cancelled: bool,
    /// Identifier of the job type, taken from the job's definition.
    pub job_type_id: String,
    /// The target execution time from the job's definition.
    ///
    /// The conversion from the message stores the Unix epoch here when the
    /// time is missing or cannot be converted.
    pub target_execution_time: SystemTime,
    /// The input payload string from the job's definition.
    pub input_payload_json: String,
    /// The labels from the job's definition as key/value pairs.
    pub labels: IndexMap<String, String>,
    /// The timeout policy from the job's definition; the conversion uses the
    /// default policy when the message carries none.
    pub timeout_policy: JobTimeoutPolicy,
    /// The retry policy from the job's definition; the conversion uses the
    /// default policy when the message carries none.
    pub retry_policy: JobRetryPolicy,
    /// The creation time of the job.
    pub created_at: SystemTime,
    /// The executions of the job, in the order they appear in the message.
    ///
    /// [`JobDetails::status`] reads the last entry.
    pub executions: Vec<ExecutionDetails>,
}
325
326impl JobDetails {
327 #[must_use]
329 pub fn status(&self) -> JobStatus {
330 let Some(last_execution) = self.executions.last() else {
331 return JobStatus::Pending;
332 };
333
334 last_execution.status
335 }
336}
337
338impl TryFrom<Job> for JobDetails {
339 type Error = eyre::Report;
340
341 fn try_from(value: Job) -> Result<Self, Self::Error> {
342 let Some(definition) = value.definition else {
343 return Err(eyre::eyre!("missing job definition"));
344 };
345
346 Ok(Self {
347 id: value.id.parse().wrap_err("invalid job ID")?,
348 active: value.active,
349 cancelled: value.cancelled,
350 job_type_id: definition.job_type_id,
351 target_execution_time: definition
352 .target_execution_time
353 .and_then(|t| t.try_into().ok())
354 .unwrap_or(SystemTime::UNIX_EPOCH),
355 input_payload_json: definition.input_payload_json,
356 labels: definition
357 .labels
358 .into_iter()
359 .map(|job_label| (job_label.key, job_label.value))
360 .collect(),
361 timeout_policy: definition.timeout_policy.unwrap_or_default(),
362 retry_policy: definition.retry_policy.unwrap_or_default(),
363 created_at: value
364 .created_at
365 .map(TryInto::try_into)
366 .transpose()?
367 .ok_or_eyre("missing creation time")?,
368 executions: value
369 .executions
370 .into_iter()
371 .map(TryInto::try_into)
372 .collect::<Result<_, _>>()?,
373 })
374 }
375}
376
/// Details of a single execution of a job, converted from the server's
/// `JobExecution` message (see the `TryFrom` implementation).
#[derive(Debug, Clone)]
pub struct ExecutionDetails {
    /// The ID of the execution, parsed from the message's `id` string.
    pub id: Uuid,
    /// The ID of the job the execution belongs to.
    pub job_id: Uuid,
    /// The ID of the executor, if the message carries one.
    pub executor_id: Option<Uuid>,
    /// The status of the execution.
    pub status: JobStatus,
    /// The creation time of the execution.
    pub created_at: SystemTime,
    /// The `ready_at` timestamp of the message, if present.
    pub ready_at: Option<SystemTime>,
    /// The `assigned_at` timestamp of the message, if present.
    pub assigned_at: Option<SystemTime>,
    /// The `started_at` timestamp of the message, if present.
    pub started_at: Option<SystemTime>,
    /// The `succeeded_at` timestamp of the message, if present.
    pub succeeded_at: Option<SystemTime>,
    /// The `failed_at` timestamp of the message, if present.
    pub failed_at: Option<SystemTime>,
    /// The output payload string of the execution, if present.
    pub output_payload_json: Option<String>,
    /// The failure reason of the execution, if present.
    pub failure_reason: Option<String>,
}
406
407impl TryFrom<ora_proto::server::v1::JobExecution> for ExecutionDetails {
408 type Error = eyre::Report;
409
410 fn try_from(value: ora_proto::server::v1::JobExecution) -> Result<Self, Self::Error> {
411 Ok(Self {
412 status: value.status().into(),
413 id: value.id.parse().wrap_err("invalid execution ID")?,
414 job_id: value.job_id.parse().wrap_err("invalid job ID")?,
415 executor_id: value
416 .executor_id
417 .map(|id| id.parse())
418 .transpose()
419 .wrap_err("invalid executor ID")?,
420 created_at: value
421 .created_at
422 .map(TryInto::try_into)
423 .transpose()?
424 .ok_or_eyre("missing creation time")?,
425 ready_at: value.ready_at.map(TryInto::try_into).transpose()?,
426 assigned_at: value.assigned_at.map(TryInto::try_into).transpose()?,
427 started_at: value.started_at.map(TryInto::try_into).transpose()?,
428 succeeded_at: value.succeeded_at.map(TryInto::try_into).transpose()?,
429 failed_at: value.failed_at.map(TryInto::try_into).transpose()?,
430 output_payload_json: value.output_payload_json,
431 failure_reason: value.failure_reason,
432 })
433 }
434}