ora_storage/
lib.rs

1//! Storage interface for the Ora server.
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6    common::{
7        self,
8        v1::{
9            self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10            JobDefinition, TimeRange,
11        },
12    },
13    server::{self, v1::Job},
14    snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
/// Alias for [`indexmap::IndexMap`] that uses [`ahash::RandomState`] as its hasher.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// Alias for [`indexmap::IndexSet`] that uses [`ahash::RandomState`] as its hasher.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
/// An interface for storing and querying job and schedule data used
/// by the Ora server.
///
/// Every method is asynchronous and reports failures through [`eyre::Result`].
#[async_trait]
pub trait Storage: Send + Sync + 'static + Clone {
    /// Add or update the given job types.
    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
    /// Persist the given new jobs.
    async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
    /// Cancel the given jobs.
    ///
    /// For each job:
    ///
    /// - Mark the job as cancelled.
    /// - Mark the job as unschedulable.
    ///
    /// `timestamp` is the time of the cancellation.
    ///
    /// Returns the cancelled jobs with their active executions, if any.
    async fn jobs_cancelled(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledJob>>;
    /// Add new executions.
    async fn executions_added(
        &self,
        executions: Vec<NewExecution>,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Mark the given executions as ready to be executed.
    async fn executions_ready(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Record that an execution was assigned to the given executor.
    async fn execution_assigned(
        &self,
        execution_id: Uuid,
        executor_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Set an execution as started.
    async fn execution_started(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Set an execution as succeeded and store its output payload.
    async fn execution_succeeded(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
        output_payload_json: String,
    ) -> eyre::Result<()>;
    /// Set the given executions as failed with the given reason.
    ///
    /// If `mark_job_unschedulable` is `false`, the job is not marked
    /// as unschedulable; this happens when the job is retried.
    async fn executions_failed(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
        reason: String,
        mark_job_unschedulable: bool,
    ) -> eyre::Result<()>;

    /// Return the IDs of assigned executions that are not assigned to
    /// any of the given executor IDs.
    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;

    /// Mark the given jobs as unschedulable.
    ///
    /// No more executions will be created for the jobs and the given IDs must not be
    /// returned by the `pending_jobs` method. However, existing executions are not affected.
    async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
        -> eyre::Result<()>;
    /// Executions that are not yet ready to be executed.
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// If the `after` parameter is given, the result must not include
    /// the execution with the given UUID and any IDs that are smaller.
    ///
    /// The implementation may choose to return a limited
    /// number of executions in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
    /// Unassigned executions that are ready at the time of the call.
    ///
    /// The returned executions must be ordered by their ID in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of executions in a single call in order to avoid
    /// resource exhaustion.
    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
    /// Jobs that satisfy the following conditions:
    ///
    /// - have no executions in progress
    /// - are active
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of jobs in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;

    /// Query and return a list of jobs with the given parameters.
    ///
    /// A `cursor` can be provided to continue the query from the last result.
    async fn query_jobs(
        &self,
        cursor: Option<String>,
        limit: usize,
        order: JobQueryOrder,
        filters: JobQueryFilters,
    ) -> eyre::Result<JobQueryResult>;

    /// Query and return the IDs of the jobs that satisfy the given filters.
    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Count the number of jobs that satisfy the given filters.
    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;

    /// Query and return a list of job types.
    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;

    /// Remove jobs and all related data, returns the removed job IDs.
    ///
    /// This should also remove all executions created by the jobs.
    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Persist the given new schedules.
    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;

    /// Cancel the given schedules.
    ///
    /// For each schedule:
    ///
    /// - Mark the schedule as cancelled.
    /// - Mark the schedule as unschedulable.
    ///
    /// Returns the cancelled schedules.
    async fn schedules_cancelled(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledSchedule>>;

    /// Mark the given schedules as unschedulable.
    async fn schedules_unschedulable(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;

    /// Return all pending schedules.
    ///
    /// A pending schedule is a schedule that
    /// satisfies all the following conditions:
    /// - is active
    /// - has not been cancelled
    /// - has no active jobs
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of schedules in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;

    /// Query and return a list of schedules with the given parameters.
    ///
    /// A `cursor` can be provided to continue the query from the last result.
    async fn query_schedules(
        &self,
        cursor: Option<String>,
        limit: usize,
        filters: ScheduleQueryFilters,
        order: ScheduleQueryOrder,
    ) -> eyre::Result<ScheduleQueryResult>;

    /// Query and return the IDs of the schedules that satisfy the given filters.
    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Count the number of schedules that satisfy the given filters.
    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;

    /// Remove schedules and all related data, returns the removed schedule IDs.
    ///
    /// This should also remove all jobs created by the schedules.
    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
}
224
/// A trait for storages that support
/// exporting and importing snapshots of their data.
#[async_trait]
pub trait StorageSnapshot {
    /// Export a snapshot of the storage as a stream of
    /// [`snapshot::v1::SnapshotData`] items.
    ///
    /// Each item of the stream is a `Result`, so a failure may be
    /// reported at any point while the stream is consumed.
    fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;

    /// Import a snapshot of the storage from the given stream.
    ///
    /// The snapshot stream must be consumed to completion.
    ///
    /// Whether data is overwritten or merged is up to the implementation.
    async fn import_snapshot(
        &self,
        snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
    ) -> eyre::Result<()>;
}
242
/// Essential data for a new job, as passed to [`Storage::jobs_added`].
#[derive(Debug, Clone)]
pub struct NewJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The schedule ID of the job, if any.
    pub schedule_id: Option<Uuid>,
    /// The time the job was created.
    pub created_at: SystemTime,
    /// The job type ID.
    pub job_type_id: String,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// The input payload of the job in JSON format.
    pub input_payload_json: String,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// Labels of the job.
    pub labels: IndexMap<String, String>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
267
/// A pending job, as returned by [`Storage::pending_jobs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// The number of previous executions of the job.
    pub execution_count: u64,
    /// The retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// The timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
}
282
/// Job timeout policy.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobTimeoutPolicy {
    /// The timeout duration, if any.
    pub timeout: Option<Duration>,
    /// The base time for the timeout.
    ///
    /// The timeout is calculated from this time.
    pub base_time: JobTimeoutBaseTime,
}
293
294impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
295    fn from(proto: v1::JobTimeoutPolicy) -> Self {
296        Self {
297            timeout: proto.timeout.and_then(|d| d.try_into().ok()),
298            base_time: proto.base_time().into(),
299        }
300    }
301}
302
303impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
304    fn from(policy: JobTimeoutPolicy) -> Self {
305        Self {
306            timeout: policy.timeout.and_then(|d| d.try_into().ok()),
307            base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
308        }
309    }
310}
311
312impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
313    fn from(proto: v1::JobTimeoutBaseTime) -> Self {
314        match proto {
315            v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
316            v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
317                Self::StartTime
318            }
319        }
320    }
321}
322
323impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
324    fn from(base_time: JobTimeoutBaseTime) -> Self {
325        match base_time {
326            JobTimeoutBaseTime::StartTime => Self::StartTime,
327            JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
328        }
329    }
330}
331
/// The base time for the timeout.
///
/// Defaults to [`JobTimeoutBaseTime::StartTime`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum JobTimeoutBaseTime {
    /// The base time is the start time of the job.
    #[default]
    StartTime,
    /// The base time is the target execution time of the job.
    ///
    /// Note that if the target execution time is not set,
    /// the timeout is calculated from the start time of the job.
    ///
    /// If the target execution time is in the past,
    /// the job may be timed out immediately.
    TargetExecutionTime,
}
347
/// Job retry policy.
///
/// The default policy has zero retries.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobRetryPolicy {
    /// The number of retries for the job.
    ///
    /// If the number of retries is zero, the job is not retried.
    pub retries: u64,
}
356
357impl From<v1::JobRetryPolicy> for JobRetryPolicy {
358    fn from(proto: v1::JobRetryPolicy) -> Self {
359        Self {
360            retries: proto.retries,
361        }
362    }
363}
364
365impl From<JobRetryPolicy> for v1::JobRetryPolicy {
366    fn from(policy: JobRetryPolicy) -> Self {
367        Self {
368            retries: policy.retries,
369        }
370    }
371}
372
/// A new pending execution, as passed to [`Storage::executions_added`].
#[derive(Debug, Clone)]
pub struct NewExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The unique identifier of the job the execution belongs to.
    pub job_id: Uuid,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
}
385
/// A pending execution, as returned by [`Storage::pending_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
}
394
/// An execution that is ready to be executed,
/// as returned by [`Storage::ready_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The unique identifier of the job.
    pub job_id: Uuid,
    /// The input payload of the job in JSON format.
    pub input_payload_json: String,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// The job type ID.
    pub job_type_id: String,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
}
413
/// A job type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobType {
    /// The ID of the job type.
    pub id: String,
    /// The name of the job type.
    pub name: String,
    /// The description of the job type.
    pub description: String,
    /// The input schema of the job type in JSON format, if any.
    pub input_schema_json: Option<String>,
    /// The output schema of the job type in JSON format, if any.
    pub output_schema_json: Option<String>,
}
428
/// Filters for querying jobs.
///
/// Every filter is optional; the default value has no filter set.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct JobQueryFilters {
    /// Job IDs to filter the query results by.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to filter the query results by.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Execution IDs to filter the query results by.
    pub execution_ids: Option<IndexSet<Uuid>>,
    /// Schedule IDs to filter the query results by.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Execution statuses to filter the query results by.
    pub execution_status: Option<IndexSet<JobExecutionStatus>>,
    /// Labels to filter the query results by, keyed by label name.
    pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
    /// Whether to return active or inactive jobs only.
    pub active: Option<bool>,
    /// Jobs created after the given time, inclusive.
    pub created_after: Option<SystemTime>,
    /// Jobs created before the given time, exclusive.
    pub created_before: Option<SystemTime>,
    /// Jobs with target execution time after the given time, inclusive.
    pub target_execution_time_after: Option<SystemTime>,
    /// Jobs with target execution time before the given time, exclusive.
    pub target_execution_time_before: Option<SystemTime>,
}
455
/// The order of the jobs returned by a job query.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobQueryOrder {
    /// Order by the time the job was created in ascending order.
    CreatedAtAsc,
    /// Order by the time the job was created in descending order.
    CreatedAtDesc,
    /// Order by the target execution time in ascending order.
    TargetExecutionTimeAsc,
    /// Order by the target execution time in descending order.
    TargetExecutionTimeDesc,
}
468
/// The results of a job query, as returned by [`Storage::query_jobs`].
#[derive(Debug, Clone)]
pub struct JobQueryResult {
    /// The jobs that satisfy the query.
    pub jobs: Vec<JobDetails>,
    /// A cursor that can be used to continue the query, if any.
    pub cursor: Option<String>,
    /// Whether there are more results to query.
    pub has_more: bool,
}
479
/// Job execution status, used for querying jobs and
/// for describing the state of an execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobExecutionStatus {
    /// The job execution is pending and is not ready to run.
    Pending,
    /// The job execution is ready to run.
    Ready,
    /// The job execution is assigned to an executor.
    Assigned,
    /// The job execution is running.
    Running,
    /// The job execution has completed successfully.
    Succeeded,
    /// The job execution has failed.
    Failed,
}
496
/// Job label filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobLabelFilterValue {
    /// The label exists with any value.
    Exists,
    /// The label has the given value.
    Equals(String),
}
505
/// All core information about a job, including its executions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDetails {
    /// Whether the job is active.
    pub active: bool,
    /// Whether the job was cancelled.
    pub cancelled: bool,
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The ID of the job type.
    pub job_type_id: String,
    /// The schedule ID of the job, if any.
    pub schedule_id: Option<Uuid>,
    /// The target execution time of the job.
    ///
    /// If not provided, it should be set to the current time.
    pub target_execution_time: SystemTime,
    /// The job input payload JSON that is passed to the executor.
    pub input_payload_json: String,
    /// The labels of the job.
    pub labels: IndexMap<String, String>,
    /// The timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// The retry policy of the job.
    pub retry_policy: JobRetryPolicy,
    /// The creation time of the job.
    pub created_at: SystemTime,
    /// A list of executions for the job.
    pub executions: Vec<ExecutionDetails>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
538
/// All core information about an execution
/// that is associated with a job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionDetails {
    /// The ID of the job execution.
    pub id: Uuid,
    /// The ID of the job.
    pub job_id: Uuid,
    /// The ID of the associated executor, if any.
    pub executor_id: Option<Uuid>,
    /// The status of the job execution.
    pub status: JobExecutionStatus,
    /// The time the job execution was created.
    pub created_at: SystemTime,
    /// The time the job execution was marked as ready, if any.
    pub ready_at: Option<SystemTime>,
    /// The time the job execution was assigned to an executor, if any.
    pub assigned_at: Option<SystemTime>,
    /// The time the job execution started, if any.
    pub started_at: Option<SystemTime>,
    /// The time the job execution succeeded, if any.
    pub succeeded_at: Option<SystemTime>,
    /// The time the job execution failed, if any.
    pub failed_at: Option<SystemTime>,
    /// The output payload of the execution in JSON format, if any.
    pub output_payload_json: Option<String>,
    /// The error message of the execution, if any.
    pub failure_reason: Option<String>,
}
568
/// A job that was cancelled, as returned by [`Storage::jobs_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The ID of the active execution of the job, if any.
    pub active_execution: Option<Uuid>,
}
577
/// A new schedule, as passed to [`Storage::schedules_added`].
#[derive(Debug, Clone)]
pub struct NewSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// The time the schedule was created.
    pub created_at: SystemTime,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels of the schedule.
    pub labels: IndexMap<String, String>,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
596
/// Scheduling policy for a schedule, determining the timing of its jobs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobTimingPolicy {
    /// A schedule that repeats at a fixed interval.
    Repeat(SchedulingPolicyRepeat),
    /// A schedule based on a cron expression.
    Cron(SchedulingPolicyCron),
}
605
/// Scheduling policy for a schedule that repeats at a fixed interval.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyRepeat {
    /// The interval between each job.
    pub interval: Duration,
    /// Whether the schedule should create a job immediately.
    pub immediate: bool,
    /// The policy for missed jobs.
    pub missed_policy: ScheduleMissedTimePolicy,
}
616
/// Scheduling policy based on a cron expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyCron {
    /// The cron expression, stored as text.
    pub cron_expression: String,
    /// Whether the schedule should create a job immediately.
    pub immediate: bool,
    /// The policy for missed jobs.
    pub missed_policy: ScheduleMissedTimePolicy,
}
627
/// Policy for jobs whose scheduled time was missed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleMissedTimePolicy {
    /// Skip any missed times.
    Skip,
    /// Create a job for each missed time.
    Create,
}
636
/// Policy that determines how new jobs are created by a schedule.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobCreationPolicy {
    /// Create a new job from the given job definition.
    JobDefinition(ScheduleNewJobDefinition),
}
643
/// A job definition for a new job created by a schedule.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleNewJobDefinition {
    /// The job type ID.
    pub job_type_id: String,
    /// The input payload of the job in JSON format.
    pub input_payload_json: String,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// Labels of the job.
    pub labels: IndexMap<String, String>,
}
658
/// A schedule that was cancelled,
/// as returned by [`Storage::schedules_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
}
665
/// A pending schedule, as returned by [`Storage::pending_schedules`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// The target execution time of the last
    /// job created by the schedule, if any.
    pub last_target_execution_time: Option<SystemTime>,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
}
681
/// Filters for querying schedules.
///
/// Every filter is optional; the default value has no filter set.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ScheduleQueryFilters {
    /// Schedule IDs to filter the query results by.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Job IDs to filter the query results by.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to filter the query results by.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Labels to filter the query results by, keyed by label name.
    pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
    /// Whether to return active or inactive schedules only.
    pub active: Option<bool>,
    /// Schedules created after the given time, inclusive.
    pub created_after: Option<SystemTime>,
    /// Schedules created before the given time, exclusive.
    pub created_before: Option<SystemTime>,
}
700
/// The order of the schedules returned by a schedule query.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ScheduleQueryOrder {
    /// Order by the time the schedule was created in ascending order.
    CreatedAtAsc,
    /// Order by the time the schedule was created in descending order.
    CreatedAtDesc,
}
709
/// The results of a schedule query,
/// as returned by [`Storage::query_schedules`].
#[derive(Debug, Clone)]
pub struct ScheduleQueryResult {
    /// The schedules that satisfy the query.
    pub schedules: Vec<ScheduleDetails>,
    /// A cursor that can be used to continue the query, if any.
    pub cursor: Option<String>,
    /// Whether there are more results to query.
    pub has_more: bool,
}
720
/// Details of a schedule.
///
/// Associated jobs are not included on purpose,
/// as there can be many jobs associated with a schedule;
/// additional queries can be made to get the jobs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleDetails {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// The time the schedule was created.
    pub created_at: SystemTime,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels of the schedule.
    pub labels: IndexMap<String, String>,
    /// Whether the schedule is active.
    pub active: bool,
    /// Whether the schedule was cancelled.
    pub cancelled: bool,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
747
/// Schedule label filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ScheduleLabelFilterValue {
    /// The label exists with any value.
    Exists,
    /// The label has the given value.
    Equals(String),
}
756
/// The time range for a schedule.
///
/// Both bounds are optional; a missing bound leaves that side unbounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScheduleTimeRange {
    /// The schedule must not start before this time.
    pub start: Option<SystemTime>,
    /// The schedule must end before this time.
    pub end: Option<SystemTime>,
}

impl ScheduleTimeRange {
    /// Check if the given time is within the time range.
    ///
    /// The start bound is inclusive and the end bound is exclusive.
    #[must_use]
    pub fn contains(&self, time: SystemTime) -> bool {
        let not_before_start = match self.start {
            Some(start) => time >= start,
            None => true,
        };
        let before_end = match self.end {
            Some(end) => time < end,
            None => true,
        };

        not_before_start && before_end
    }

    /// Check if the time range is valid.
    ///
    /// A range with both bounds set is valid only if the start is strictly
    /// before the end; a range with a missing bound is always valid.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        match (self.start, self.end) {
            (Some(start), Some(end)) => start < end,
            _ => true,
        }
    }
}
793
794impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
795    fn from(value: ScheduleJobTimingPolicy) -> Self {
796        match value {
797            ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
798                job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
799                    common::v1::ScheduleJobTimingPolicyRepeat {
800                        interval: Some(policy.interval.try_into().unwrap()),
801                        immediate: policy.immediate,
802                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
803                            policy.missed_policy,
804                        )
805                        .into(),
806                    },
807                )),
808            },
809            ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
810                job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
811                    common::v1::ScheduleJobTimingPolicyCron {
812                        cron_expression: policy.cron_expression,
813                        immediate: policy.immediate,
814                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
815                            policy.missed_policy,
816                        )
817                        .into(),
818                    },
819                )),
820            },
821        }
822    }
823}
824
825impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
826    type Error = Status;
827
828    fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
829        let value = value.job_timing.ok_or_else(|| {
830            Status::invalid_argument("missing job timing policy in schedule definition")
831        })?;
832
833        match value {
834            schedule_job_timing_policy::JobTiming::Repeat(policy) => {
835                Ok(Self::Repeat(SchedulingPolicyRepeat {
836                    interval: policy
837                        .interval
838                        .ok_or_else(|| {
839                            Status::invalid_argument("missing interval in repeat policy")
840                        })?
841                        .try_into()
842                        .map_err(|_| Status::invalid_argument("invalid interval"))?,
843                    immediate: policy.immediate,
844                    missed_policy: policy.missed_time_policy().into(),
845                }))
846            }
847            schedule_job_timing_policy::JobTiming::Cron(policy) => {
848                Ok(Self::Cron(SchedulingPolicyCron {
849                    missed_policy: policy.missed_time_policy().into(),
850                    cron_expression: {
851                        let mut parse_options = cronexpr::ParseOptions::default();
852                        parse_options.fallback_timezone_option =
853                            cronexpr::FallbackTimezoneOption::UTC;
854
855                        // Validate the cron expression.
856                        cronexpr::parse_crontab_with(&policy.cron_expression, parse_options)
857                            .map_err(|err| {
858                                Status::invalid_argument(format!("invalid cron expression: {err}"))
859                            })?;
860
861                        policy.cron_expression
862                    },
863                    immediate: policy.immediate,
864                }))
865            }
866        }
867    }
868}
869
870impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
871    fn from(value: ScheduleJobCreationPolicy) -> Self {
872        match value {
873            ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
874                common::v1::ScheduleJobCreationPolicy {
875                    job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
876                        job_type_id: job_definition.job_type_id,
877                        input_payload_json: job_definition.input_payload_json,
878                        target_execution_time: None,
879                        retry_policy: Some(common::v1::JobRetryPolicy {
880                            retries: job_definition.retry_policy.retries,
881                        }),
882                        timeout_policy: Some(common::v1::JobTimeoutPolicy {
883                            timeout: job_definition
884                                .timeout_policy
885                                .timeout
886                                .and_then(|d| d.try_into().ok()),
887                            base_time: match job_definition.timeout_policy.base_time {
888                                JobTimeoutBaseTime::StartTime => {
889                                    common::v1::JobTimeoutBaseTime::StartTime.into()
890                                }
891                                JobTimeoutBaseTime::TargetExecutionTime => {
892                                    common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
893                                }
894                            },
895                        }),
896                        labels: job_definition
897                            .labels
898                            .into_iter()
899                            .map(|(key, value)| common::v1::JobLabel { key, value })
900                            .collect(),
901                        metadata_json: None,
902                    })),
903                }
904            }
905        }
906    }
907}
908
909impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
910    type Error = Status;
911
912    fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
913        let value = value.job_creation.ok_or_else(|| {
914            Status::invalid_argument("missing job creation policy in schedule definition")
915        })?;
916
917        match value {
918            JobCreation::JobDefinition(job_definition) => {
919                Ok(Self::JobDefinition(ScheduleNewJobDefinition {
920                    job_type_id: job_definition.job_type_id,
921                    input_payload_json: job_definition.input_payload_json,
922                    timeout_policy: job_definition
923                        .timeout_policy
924                        .map(Into::into)
925                        .unwrap_or_default(),
926                    retry_policy: job_definition
927                        .retry_policy
928                        .map(Into::into)
929                        .unwrap_or_default(),
930                    labels: job_definition
931                        .labels
932                        .into_iter()
933                        .map(|label| (label.key, label.value))
934                        .collect(),
935                }))
936            }
937        }
938    }
939}
940
941impl From<ScheduleTimeRange> for TimeRange {
942    fn from(value: ScheduleTimeRange) -> Self {
943        Self {
944            start: value.start.map(Into::into),
945            end: value.end.map(Into::into),
946        }
947    }
948}
949
950impl TryFrom<TimeRange> for ScheduleTimeRange {
951    type Error = Status;
952
953    fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
954        Ok(Self {
955            start: value
956                .start
957                .map(SystemTime::try_from)
958                .transpose()
959                .map_err(|error| {
960                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
961                })?,
962            end: value
963                .end
964                .map(SystemTime::try_from)
965                .transpose()
966                .map_err(|error| {
967                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
968                })?,
969        })
970    }
971}
972
973impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
974    fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
975        match value {
976            common::v1::ScheduleMissedTimePolicy::Skip
977            | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
978            common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
979        }
980    }
981}
982
983impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
984    fn from(value: ScheduleMissedTimePolicy) -> Self {
985        match value {
986            ScheduleMissedTimePolicy::Skip => Self::Skip,
987            ScheduleMissedTimePolicy::Create => Self::Create,
988        }
989    }
990}
991
992impl From<server::v1::JobQueryOrder> for JobQueryOrder {
993    fn from(value: server::v1::JobQueryOrder) -> Self {
994        match value {
995            server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
996            server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
997                Self::CreatedAtDesc
998            }
999            server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
1000            server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
1001        }
1002    }
1003}
1004
1005impl From<JobDetails> for Job {
1006    fn from(job: JobDetails) -> Self {
1007        Job {
1008            id: job.id.to_string(),
1009            schedule_id: job.schedule_id.map(|t| t.to_string()),
1010            cancelled: job.cancelled,
1011            active: job.active,
1012            definition: Some(JobDefinition {
1013                job_type_id: job.job_type_id,
1014                input_payload_json: job.input_payload_json,
1015                target_execution_time: Some(job.target_execution_time.into()),
1016                retry_policy: Some(common::v1::JobRetryPolicy {
1017                    retries: job.retry_policy.retries,
1018                }),
1019                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1020                    timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1021                    base_time: match job.timeout_policy.base_time {
1022                        JobTimeoutBaseTime::StartTime => {
1023                            common::v1::JobTimeoutBaseTime::StartTime.into()
1024                        }
1025                        JobTimeoutBaseTime::TargetExecutionTime => {
1026                            common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1027                        }
1028                    },
1029                }),
1030                labels: job
1031                    .labels
1032                    .into_iter()
1033                    .map(|(key, value)| common::v1::JobLabel { key, value })
1034                    .collect(),
1035                metadata_json: job.metadata_json,
1036            }),
1037            created_at: Some(job.created_at.into()),
1038            executions: job.executions.into_iter().map(Into::into).collect(),
1039        }
1040    }
1041}
1042
1043impl From<ExecutionDetails> for server::v1::JobExecution {
1044    fn from(value: ExecutionDetails) -> Self {
1045        Self {
1046            id: value.id.to_string(),
1047            job_id: value.job_id.to_string(),
1048            executor_id: value.executor_id.map(|t| t.to_string()),
1049            status: match value.status {
1050                JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1051                JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1052                JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1053                JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1054                JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1055                JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1056            }
1057            .into(),
1058            created_at: Some(value.created_at.into()),
1059            ready_at: value.ready_at.map(Into::into),
1060            assigned_at: value.assigned_at.map(Into::into),
1061            started_at: value.started_at.map(Into::into),
1062            succeeded_at: value.succeeded_at.map(Into::into),
1063            failed_at: value.failed_at.map(Into::into),
1064            output_payload_json: value.output_payload_json,
1065            failure_reason: value.failure_reason,
1066        }
1067    }
1068}
1069
1070impl From<JobType> for ora_proto::common::v1::JobType {
1071    fn from(value: JobType) -> Self {
1072        Self {
1073            id: value.id,
1074            name: if value.name.is_empty() {
1075                None
1076            } else {
1077                Some(value.name)
1078            },
1079            description: if value.description.is_empty() {
1080                None
1081            } else {
1082                Some(value.description)
1083            },
1084            input_schema_json: value.input_schema_json,
1085            output_schema_json: value.output_schema_json,
1086        }
1087    }
1088}
1089
1090impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1091    type Error = Status;
1092
1093    fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1094        Ok(Self {
1095            execution_status: {
1096                let status = filter
1097                    .status()
1098                    .filter_map(|status| match status {
1099                        server::v1::JobExecutionStatus::Unspecified => None,
1100                        server::v1::JobExecutionStatus::Pending => {
1101                            Some(JobExecutionStatus::Pending)
1102                        }
1103                        server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1104                        server::v1::JobExecutionStatus::Assigned => {
1105                            Some(JobExecutionStatus::Assigned)
1106                        }
1107                        server::v1::JobExecutionStatus::Running => {
1108                            Some(JobExecutionStatus::Running)
1109                        }
1110                        server::v1::JobExecutionStatus::Succeeded => {
1111                            Some(JobExecutionStatus::Succeeded)
1112                        }
1113                        server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1114                    })
1115                    .collect::<IndexSet<_>>();
1116
1117                if status.is_empty() {
1118                    None
1119                } else {
1120                    Some(status)
1121                }
1122            },
1123            job_ids: if filter.job_ids.is_empty() {
1124                None
1125            } else {
1126                Some(
1127                    filter
1128                        .job_ids
1129                        .iter()
1130                        .map(|f| {
1131                            f.parse().map_err(|err| {
1132                                Status::invalid_argument(format!("invalid job ID: {err}"))
1133                            })
1134                        })
1135                        .collect::<Result<_, _>>()?,
1136                )
1137            },
1138            job_type_ids: if filter.job_type_ids.is_empty() {
1139                None
1140            } else {
1141                Some(filter.job_type_ids.into_iter().collect())
1142            },
1143            execution_ids: if filter.execution_ids.is_empty() {
1144                None
1145            } else {
1146                Some(
1147                    filter
1148                        .execution_ids
1149                        .iter()
1150                        .map(|f| {
1151                            f.parse().map_err(|err| {
1152                                Status::invalid_argument(format!("invalid execution ID: {err}"))
1153                            })
1154                        })
1155                        .collect::<Result<_, _>>()?,
1156                )
1157            },
1158            schedule_ids: if filter.schedule_ids.is_empty() {
1159                None
1160            } else {
1161                Some(
1162                    filter
1163                        .schedule_ids
1164                        .iter()
1165                        .map(|f| {
1166                            f.parse().map_err(|err| {
1167                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1168                            })
1169                        })
1170                        .collect::<Result<_, _>>()?,
1171                )
1172            },
1173            labels: if filter.labels.is_empty() {
1174                None
1175            } else {
1176                Some(
1177                    filter
1178                        .labels
1179                        .into_iter()
1180                        .filter_map(|label| {
1181                            Some((
1182                                label.key,
1183                                match label.value? {
1184                                    server::v1::job_label_filter::Value::Exists(_) => {
1185                                        JobLabelFilterValue::Exists
1186                                    }
1187                                    server::v1::job_label_filter::Value::Equals(value) => {
1188                                        JobLabelFilterValue::Equals(value)
1189                                    }
1190                                },
1191                            ))
1192                        })
1193                        .collect(),
1194                )
1195            },
1196            active: filter.active,
1197            created_after: filter
1198                .created_at
1199                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1200            created_before: filter
1201                .created_at
1202                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1203            target_execution_time_after: filter
1204                .target_execution_time
1205                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1206            target_execution_time_before: filter
1207                .target_execution_time
1208                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1209        })
1210    }
1211}
1212
1213impl From<ScheduleDetails> for server::v1::Schedule {
1214    fn from(value: ScheduleDetails) -> Self {
1215        Self {
1216            id: value.id.to_string(),
1217            definition: Some(common::v1::ScheduleDefinition {
1218                job_timing_policy: Some(match value.job_timing_policy {
1219                    ScheduleJobTimingPolicy::Repeat(policy) => {
1220                        common::v1::ScheduleJobTimingPolicy {
1221                            job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1222                                common::v1::ScheduleJobTimingPolicyRepeat {
1223                                    interval: policy.interval.try_into().ok(),
1224                                    immediate: policy.immediate,
1225                                    missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1226                                        policy.missed_policy,
1227                                    )
1228                                    .into(),
1229                                },
1230                            )),
1231                        }
1232                    }
1233                    ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1234                        job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1235                            common::v1::ScheduleJobTimingPolicyCron {
1236                                cron_expression: policy.cron_expression,
1237                                immediate: policy.immediate,
1238                                missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1239                                    policy.missed_policy,
1240                                )
1241                                .into(),
1242                            },
1243                        )),
1244                    },
1245                }),
1246                job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1247                    job_creation: Some(match value.job_creation_policy {
1248                        ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1249                            JobCreation::JobDefinition(common::v1::JobDefinition {
1250                                job_type_id: job_definition.job_type_id,
1251                                input_payload_json: job_definition.input_payload_json,
1252                                target_execution_time: None,
1253                                retry_policy: Some(common::v1::JobRetryPolicy {
1254                                    retries: job_definition.retry_policy.retries,
1255                                }),
1256                                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1257                                    timeout: job_definition
1258                                        .timeout_policy
1259                                        .timeout
1260                                        .and_then(|d| d.try_into().ok()),
1261                                    base_time: match job_definition.timeout_policy.base_time {
1262                                        JobTimeoutBaseTime::StartTime => {
1263                                            common::v1::JobTimeoutBaseTime::StartTime.into()
1264                                        }
1265                                        JobTimeoutBaseTime::TargetExecutionTime => {
1266                                            common::v1::JobTimeoutBaseTime::TargetExecutionTime
1267                                                .into()
1268                                        }
1269                                    },
1270                                }),
1271                                labels: job_definition
1272                                    .labels
1273                                    .into_iter()
1274                                    .map(|(key, value)| common::v1::JobLabel { key, value })
1275                                    .collect(),
1276                                metadata_json: None,
1277                            })
1278                        }
1279                    }),
1280                }),
1281                time_range: value.time_range.map(|range| TimeRange {
1282                    start: range.start.map(Into::into),
1283                    end: range.end.map(Into::into),
1284                }),
1285                labels: value
1286                    .labels
1287                    .into_iter()
1288                    .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1289                    .collect(),
1290                metadata_json: value.metadata_json,
1291            }),
1292            created_at: Some(value.created_at.into()),
1293            active: value.active,
1294            cancelled: value.cancelled,
1295        }
1296    }
1297}
1298
1299impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1300    fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1301        match value {
1302            server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1303            server::v1::ScheduleQueryOrder::CreatedAtDesc
1304            | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1305        }
1306    }
1307}
1308
1309impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1310    type Error = Status;
1311
1312    fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1313        Ok(Self {
1314            schedule_ids: if value.schedule_ids.is_empty() {
1315                None
1316            } else {
1317                Some(
1318                    value
1319                        .schedule_ids
1320                        .iter()
1321                        .map(|f| {
1322                            f.parse().map_err(|err| {
1323                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1324                            })
1325                        })
1326                        .collect::<Result<_, _>>()?,
1327                )
1328            },
1329            job_ids: if value.job_ids.is_empty() {
1330                None
1331            } else {
1332                Some(
1333                    value
1334                        .job_ids
1335                        .iter()
1336                        .map(|f| {
1337                            f.parse().map_err(|err| {
1338                                Status::invalid_argument(format!("invalid job ID: {err}"))
1339                            })
1340                        })
1341                        .collect::<Result<_, _>>()?,
1342                )
1343            },
1344            job_type_ids: if value.job_type_ids.is_empty() {
1345                None
1346            } else {
1347                Some(value.job_type_ids.into_iter().collect())
1348            },
1349            labels: if value.labels.is_empty() {
1350                None
1351            } else {
1352                Some(
1353                    value
1354                        .labels
1355                        .into_iter()
1356                        .filter_map(|label| {
1357                            Some((
1358                                label.key,
1359                                match label.value? {
1360                                    server::v1::schedule_label_filter::Value::Exists(_) => {
1361                                        ScheduleLabelFilterValue::Exists
1362                                    }
1363                                    server::v1::schedule_label_filter::Value::Equals(value) => {
1364                                        ScheduleLabelFilterValue::Equals(value)
1365                                    }
1366                                },
1367                            ))
1368                        })
1369                        .collect(),
1370                )
1371            },
1372            active: value.active,
1373            created_after: value
1374                .created_at
1375                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1376            created_before: value
1377                .created_at
1378                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1379        })
1380    }
1381}
1382
1383impl From<JobType> for snapshot::v1::ExportedJobType {
1384    fn from(job_type: JobType) -> Self {
1385        snapshot::v1::ExportedJobType {
1386            job_type: Some(common::v1::JobType {
1387                id: job_type.id,
1388                name: Some(job_type.name),
1389                description: Some(job_type.description),
1390                input_schema_json: job_type.input_schema_json,
1391                output_schema_json: job_type.output_schema_json,
1392            }),
1393        }
1394    }
1395}
1396
1397impl From<snapshot::v1::ExportedJobType> for JobType {
1398    fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1399        let job_type = job_type.job_type.unwrap();
1400
1401        Self {
1402            id: job_type.id,
1403            name: job_type.name.unwrap_or_default(),
1404            description: job_type.description.unwrap_or_default(),
1405            input_schema_json: job_type.input_schema_json,
1406            output_schema_json: job_type.output_schema_json,
1407        }
1408    }
1409}