ora_storage/
lib.rs

1//! Storage interface for the Ora server.
2
3use async_trait::async_trait;
4use futures::stream::BoxStream;
5use ora_proto::{
6    common::{
7        self,
8        v1::{
9            self, schedule_job_creation_policy::JobCreation, schedule_job_timing_policy,
10            JobDefinition, TimeRange,
11        },
12    },
13    server::{self, v1::Job},
14    snapshot,
15};
16use serde::{Deserialize, Serialize};
17use std::time::{Duration, SystemTime};
18use tonic::Status;
19use uuid::Uuid;
20
/// Alias for [`indexmap::IndexMap`] that uses [`ahash::RandomState`] as its hasher.
pub type IndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
/// Alias for [`indexmap::IndexSet`] that uses [`ahash::RandomState`] as its hasher.
pub type IndexSet<T> = indexmap::IndexSet<T, ahash::RandomState>;
25
/// An interface for storing and querying job and schedule data used
/// by the Ora server.
///
/// Implementors must be `Send + Sync + 'static + Clone`.
#[async_trait]
pub trait Storage: Send + Sync + 'static + Clone {
    /// Add or update the given job types.
    async fn job_types_added(&self, job_types: Vec<JobType>) -> eyre::Result<()>;
    /// Persist the given new jobs.
    async fn jobs_added(&self, jobs: Vec<NewJob>) -> eyre::Result<()>;
    /// Persist the given job only if no jobs match the filters.
    ///
    /// If any jobs match the filters, the first one in arbitrary order
    /// is returned instead (see [`ConditionalJobResult`]).
    async fn job_added_conditionally(
        &self,
        job: NewJob,
        filters: JobQueryFilters,
    ) -> eyre::Result<ConditionalJobResult>;
    /// Cancel the given jobs.
    ///
    /// - Mark the job as cancelled.
    /// - Mark the job as unschedulable.
    ///
    /// Returns the cancelled jobs with their active executions, if any.
    async fn jobs_cancelled(
        &self,
        job_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledJob>>;
    /// Add the given new executions.
    async fn executions_added(
        &self,
        executions: Vec<NewExecution>,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// The given executions are ready to be executed.
    async fn executions_ready(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// An execution was assigned to an executor.
    async fn execution_assigned(
        &self,
        execution_id: Uuid,
        executor_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Set an execution as started.
    async fn execution_started(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
    ) -> eyre::Result<()>;
    /// Set an execution as succeeded and store its output payload.
    async fn execution_succeeded(
        &self,
        execution_id: Uuid,
        timestamp: SystemTime,
        output_payload_json: String,
    ) -> eyre::Result<()>;
    /// Set the given executions as failed with the given reason.
    ///
    /// If `mark_job_unschedulable` is `false`, the job is not marked as
    /// unschedulable; this happens when the job is retried.
    async fn executions_failed(
        &self,
        execution_ids: &[Uuid],
        timestamp: SystemTime,
        reason: String,
        mark_job_unschedulable: bool,
    ) -> eyre::Result<()>;

    /// Return assigned executions that are not assigned to any of
    /// the given executor IDs.
    async fn orphan_execution_ids(&self, executor_ids: &[Uuid]) -> eyre::Result<Vec<Uuid>>;

    /// Mark the given jobs as unschedulable.
    ///
    /// No more executions will be created for the jobs, and the given IDs must not be
    /// returned by [`Storage::pending_jobs`] afterwards. Existing executions are not affected.
    async fn jobs_unschedulable(&self, job_ids: &[Uuid], timestamp: SystemTime)
        -> eyre::Result<()>;
    /// Executions that are not yet ready to be executed.
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// If the `after` parameter is given, the result must not include
    /// the execution with the given UUID or any IDs that are smaller.
    ///
    /// The implementation may choose to return a limited
    /// number of executions in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingExecution>>;
    /// Unassigned executions that are ready at the time of the call.
    ///
    /// The returned executions must be ordered by their ID in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of executions in a single call in order to avoid
    /// resource exhaustion.
    async fn ready_executions(&self, after: Option<Uuid>) -> eyre::Result<Vec<ReadyExecution>>;
    /// Jobs that satisfy the following conditions:
    ///
    /// - have no executions in progress
    /// - are active
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of jobs in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_jobs(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingJob>>;

    /// Query and return a list of jobs with the given parameters.
    ///
    /// A `cursor` can be provided to continue the query from the last result
    /// (see [`JobQueryResult::cursor`]).
    async fn query_jobs(
        &self,
        cursor: Option<String>,
        limit: usize,
        order: JobQueryOrder,
        filters: JobQueryFilters,
    ) -> eyre::Result<JobQueryResult>;

    /// Query and return a list of job IDs that satisfy the given filters.
    async fn query_job_ids(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Count the number of jobs that satisfy the given filters.
    async fn count_jobs(&self, filters: JobQueryFilters) -> eyre::Result<u64>;

    /// Query and return a list of job types.
    ///
    /// This method takes no filters.
    async fn query_job_types(&self) -> eyre::Result<Vec<JobType>>;

    /// Remove jobs and all related data, returns the removed job IDs.
    ///
    /// This should also remove all executions created by the jobs.
    async fn delete_jobs(&self, filters: JobQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Persist the given new schedules.
    async fn schedules_added(&self, schedules: Vec<NewSchedule>) -> eyre::Result<()>;

    /// Persist the given schedule only if no schedules match the filters.
    ///
    /// If any schedules match the filters, the first one in arbitrary order
    /// is returned instead (see [`ConditionalScheduleResult`]).
    async fn schedule_added_conditionally(
        &self,
        schedule: NewSchedule,
        filters: ScheduleQueryFilters,
    ) -> eyre::Result<ConditionalScheduleResult>;

    /// Cancel the given schedules.
    ///
    /// - Mark the schedule as cancelled.
    /// - Mark the schedule as unschedulable.
    ///
    /// Returns the cancelled schedules.
    async fn schedules_cancelled(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<Vec<CancelledSchedule>>;

    /// Mark the given schedules as unschedulable.
    async fn schedules_unschedulable(
        &self,
        schedule_ids: &[Uuid],
        timestamp: SystemTime,
    ) -> eyre::Result<()>;

    /// Return all pending schedules.
    ///
    /// A pending schedule is a schedule that
    /// satisfies all the following conditions:
    /// - is active
    /// - has not been cancelled
    /// - has no active jobs
    ///
    /// The returned values must be in ascending order,
    /// an optional `after` parameter can be used to filter out values
    /// that were created before the given UUID.
    ///
    /// The implementation may choose to return a limited
    /// number of schedules in a single call in order to avoid
    /// resource exhaustion.
    async fn pending_schedules(&self, after: Option<Uuid>) -> eyre::Result<Vec<PendingSchedule>>;

    /// Query and return a list of schedules with the given parameters.
    ///
    /// A `cursor` can be provided to continue the query from the last result
    /// (see [`ScheduleQueryResult::cursor`]).
    async fn query_schedules(
        &self,
        cursor: Option<String>,
        limit: usize,
        filters: ScheduleQueryFilters,
        order: ScheduleQueryOrder,
    ) -> eyre::Result<ScheduleQueryResult>;

    /// Query and return a list of schedule IDs that satisfy the given filters.
    async fn query_schedule_ids(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;

    /// Count the number of schedules that satisfy the given filters.
    async fn count_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<u64>;

    /// Remove schedules and all related data, returns the removed schedule IDs.
    ///
    /// This should also remove all jobs created by the schedules.
    async fn delete_schedules(&self, filters: ScheduleQueryFilters) -> eyre::Result<Vec<Uuid>>;
}
243
/// A trait for storages that support
/// exporting and importing snapshots of their data.
///
/// Snapshots are exchanged as streams of [`snapshot::v1::SnapshotData`] items,
/// each wrapped in an `eyre::Result`.
#[async_trait]
pub trait StorageSnapshot {
    /// Export a snapshot of the storage as a stream of snapshot data items.
    fn export_snapshot(&self) -> BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>;

    /// Import a snapshot of the storage.
    ///
    /// The snapshot stream must be consumed to completion.
    ///
    /// Whether data is overwritten or merged is up to the implementation.
    async fn import_snapshot(
        &self,
        snapshot: BoxStream<'static, eyre::Result<snapshot::v1::SnapshotData>>,
    ) -> eyre::Result<()>;
}
261
/// Essential data for a new job, as passed to [`Storage::jobs_added`]
/// and [`Storage::job_added_conditionally`].
#[derive(Debug, Clone)]
pub struct NewJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The schedule ID of the job, if any.
    pub schedule_id: Option<Uuid>,
    /// The time the job was created.
    pub created_at: SystemTime,
    /// The job type ID.
    pub job_type_id: String,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// The input payload of the job, in JSON format.
    pub input_payload_json: String,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// Labels of the job.
    pub labels: IndexMap<String, String>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
286
287/// A job that was added conditionally.
288pub enum ConditionalJobResult {
289    /// The job was added successfully.
290    Added,
291    /// A matching job already exists.
292    AlreadyExists {
293        /// The ID of the existing job.
294        job_id: Uuid,
295    },
296}
297
/// A pending job, as returned by [`Storage::pending_jobs`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// The number of previous executions.
    pub execution_count: u64,
    /// The retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// The timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
}
312
/// Job timeout policy.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobTimeoutPolicy {
    /// The timeout duration, if any.
    pub timeout: Option<Duration>,
    /// The base time for the timeout.
    ///
    /// The timeout is calculated from this time.
    pub base_time: JobTimeoutBaseTime,
}
323
324impl From<v1::JobTimeoutPolicy> for JobTimeoutPolicy {
325    fn from(proto: v1::JobTimeoutPolicy) -> Self {
326        Self {
327            timeout: proto.timeout.and_then(|d| d.try_into().ok()),
328            base_time: proto.base_time().into(),
329        }
330    }
331}
332
333impl From<JobTimeoutPolicy> for v1::JobTimeoutPolicy {
334    fn from(policy: JobTimeoutPolicy) -> Self {
335        Self {
336            timeout: policy.timeout.and_then(|d| d.try_into().ok()),
337            base_time: v1::JobTimeoutBaseTime::from(policy.base_time).into(),
338        }
339    }
340}
341
342impl From<v1::JobTimeoutBaseTime> for JobTimeoutBaseTime {
343    fn from(proto: v1::JobTimeoutBaseTime) -> Self {
344        match proto {
345            v1::JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
346            v1::JobTimeoutBaseTime::StartTime | v1::JobTimeoutBaseTime::Unspecified => {
347                Self::StartTime
348            }
349        }
350    }
351}
352
353impl From<JobTimeoutBaseTime> for v1::JobTimeoutBaseTime {
354    fn from(base_time: JobTimeoutBaseTime) -> Self {
355        match base_time {
356            JobTimeoutBaseTime::StartTime => Self::StartTime,
357            JobTimeoutBaseTime::TargetExecutionTime => Self::TargetExecutionTime,
358        }
359    }
360}
361
/// The base time for the timeout.
///
/// Defaults to [`JobTimeoutBaseTime::StartTime`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum JobTimeoutBaseTime {
    /// The base time is the start time of the job.
    #[default]
    StartTime,
    /// The base time is the target execution time of the job.
    ///
    /// Note that if the target execution time is not set,
    /// the timeout is calculated from the start time of the job.
    ///
    /// If the target execution time is in the past,
    /// the jobs may be immediately timed out.
    TargetExecutionTime,
}
377
/// Job retry policy.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct JobRetryPolicy {
    /// The number of retries for the job.
    ///
    /// If the number of retries is zero, the job is not retried.
    /// Zero is also the derived default.
    pub retries: u64,
}
386
387impl From<v1::JobRetryPolicy> for JobRetryPolicy {
388    fn from(proto: v1::JobRetryPolicy) -> Self {
389        Self {
390            retries: proto.retries,
391        }
392    }
393}
394
395impl From<JobRetryPolicy> for v1::JobRetryPolicy {
396    fn from(policy: JobRetryPolicy) -> Self {
397        Self {
398            retries: policy.retries,
399        }
400    }
401}
402
/// A new pending execution, as passed to [`Storage::executions_added`].
#[derive(Debug, Clone)]
pub struct NewExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The unique identifier of the job.
    pub job_id: Uuid,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
}
415
/// An execution that is not yet ready to be executed,
/// as returned by [`Storage::pending_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
}
424
/// An execution that is ready to be executed,
/// as returned by [`Storage::ready_executions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyExecution {
    /// The unique identifier of the execution.
    pub id: Uuid,
    /// The unique identifier of the job.
    pub job_id: Uuid,
    /// The input payload of the job, in JSON format.
    pub input_payload_json: String,
    /// Attempt number of the execution.
    pub attempt_number: u64,
    /// The job type ID.
    pub job_type_id: String,
    /// The target execution time of the job.
    pub target_execution_time: SystemTime,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
}
443
/// A job type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobType {
    /// The ID of the job type.
    pub id: String,
    /// The name of the job type.
    pub name: String,
    /// The description of the job type.
    pub description: String,
    /// The input schema of the job type in JSON format, if any.
    pub input_schema_json: Option<String>,
    /// The output schema of the job type in JSON format, if any.
    pub output_schema_json: Option<String>,
}
458
/// Filters for querying jobs.
///
/// Every filter is optional; the derived default leaves all of them unset.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct JobQueryFilters {
    /// Job IDs to filter the query results by.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to filter the query results by.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Execution IDs to filter the query results by.
    pub execution_ids: Option<IndexSet<Uuid>>,
    /// Schedule IDs to filter the query results by.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Execution statuses to filter the query results by.
    pub execution_status: Option<IndexSet<JobExecutionStatus>>,
    /// Labels to filter the query results by, keyed by label name.
    pub labels: Option<IndexMap<String, JobLabelFilterValue>>,
    /// Whether to return active or inactive jobs only.
    pub active: Option<bool>,
    /// Jobs created after the given time, inclusive.
    pub created_after: Option<SystemTime>,
    /// Jobs created before the given time, exclusive.
    pub created_before: Option<SystemTime>,
    /// Jobs with target execution time after the given time, inclusive.
    pub target_execution_time_after: Option<SystemTime>,
    /// Jobs with target execution time before the given time, exclusive.
    pub target_execution_time_before: Option<SystemTime>,
}
485
/// The order of jobs returned by [`Storage::query_jobs`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobQueryOrder {
    /// Order by the time the job was created in ascending order.
    CreatedAtAsc,
    /// Order by the time the job was created in descending order.
    CreatedAtDesc,
    /// Order by the target execution time in ascending order.
    TargetExecutionTimeAsc,
    /// Order by the target execution time in descending order.
    TargetExecutionTimeDesc,
}
498
/// The results of a job query (see [`Storage::query_jobs`]).
#[derive(Debug, Clone)]
pub struct JobQueryResult {
    /// The jobs that satisfy the query.
    pub jobs: Vec<JobDetails>,
    /// A cursor that can be used to continue the query.
    pub cursor: Option<String>,
    /// Whether there are more results to query.
    pub has_more: bool,
}
509
/// Job execution status, used for querying jobs and
/// reported in [`ExecutionDetails::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobExecutionStatus {
    /// The job execution is pending and is not ready to run.
    Pending,
    /// The job execution is ready to run.
    Ready,
    /// The job execution is assigned to an executor.
    Assigned,
    /// The job execution is running.
    Running,
    /// The job execution has completed successfully.
    Succeeded,
    /// The job execution has failed.
    Failed,
}
526
/// Job label filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobLabelFilterValue {
    /// The label exists with any value.
    Exists,
    /// The label exists and its value equals the given string.
    Equals(String),
}
535
/// All core information about a job, including its executions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JobDetails {
    /// Whether the job is active.
    pub active: bool,
    /// Whether the job was cancelled.
    pub cancelled: bool,
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The ID of the job type.
    pub job_type_id: String,
    /// The schedule ID of the job, if any.
    pub schedule_id: Option<Uuid>,
    /// The target execution time of the job.
    ///
    /// If not provided, it should be set to the current time.
    pub target_execution_time: SystemTime,
    /// The job input payload JSON that is passed to the executor.
    pub input_payload_json: String,
    /// The labels of the job.
    pub labels: IndexMap<String, String>,
    /// The timeout policy of the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// The creation time of the job.
    pub created_at: SystemTime,
    /// A list of executions for the job.
    pub executions: Vec<ExecutionDetails>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
568
/// All core information about an execution
/// that is associated with a job.
///
/// The per-state timestamps are optional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionDetails {
    /// The ID of the job execution.
    pub id: Uuid,
    /// The ID of the job.
    pub job_id: Uuid,
    /// The ID of the associated executor, if any.
    pub executor_id: Option<Uuid>,
    /// The status of the job execution.
    pub status: JobExecutionStatus,
    /// The time the job execution was created.
    pub created_at: SystemTime,
    /// The time the job execution was marked as ready.
    pub ready_at: Option<SystemTime>,
    /// The time the job execution was assigned to an executor.
    pub assigned_at: Option<SystemTime>,
    /// The time the job execution started.
    pub started_at: Option<SystemTime>,
    /// The time the job execution succeeded.
    pub succeeded_at: Option<SystemTime>,
    /// The time the job execution failed.
    pub failed_at: Option<SystemTime>,
    /// The output payload of the execution in JSON format, if any.
    pub output_payload_json: Option<String>,
    /// The failure reason of the execution, if any.
    pub failure_reason: Option<String>,
}
598
/// A job that was cancelled, as returned by [`Storage::jobs_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledJob {
    /// The unique identifier of the job.
    pub id: Uuid,
    /// The ID of the active execution of the job, if any.
    pub active_execution: Option<Uuid>,
}
607
/// A new schedule, as passed to [`Storage::schedules_added`]
/// and [`Storage::schedule_added_conditionally`].
#[derive(Debug, Clone)]
pub struct NewSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// The time the schedule was created.
    pub created_at: SystemTime,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels of the schedule.
    pub labels: IndexMap<String, String>,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
626
627/// Conditionally added schedule result.
628pub enum ConditionalScheduleResult {
629    /// The schedule was added successfully.
630    Added,
631    /// A matching schedule already exists.
632    AlreadyExists {
633        /// The ID of the existing schedule.
634        schedule_id: Uuid,
635    },
636}
637
/// Scheduling policy for a schedule: either a fixed repeat interval
/// or a cron expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobTimingPolicy {
    /// A schedule that repeats.
    Repeat(SchedulingPolicyRepeat),
    /// A schedule based on a cron expression.
    Cron(SchedulingPolicyCron),
}
646
/// Scheduling policy for a schedule that repeats at a fixed interval.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyRepeat {
    /// The interval between each job.
    pub interval: Duration,
    /// Whether the schedule should create a job immediately.
    pub immediate: bool,
    /// The policy for missed jobs.
    pub missed_policy: ScheduleMissedTimePolicy,
}
657
/// Scheduling policy based on a cron expression.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulingPolicyCron {
    /// The cron expression, stored as text.
    pub cron_expression: String,
    /// Whether the schedule should create a job immediately.
    pub immediate: bool,
    /// The policy for missed jobs.
    pub missed_policy: ScheduleMissedTimePolicy,
}
668
/// Policy for jobs whose scheduled time was missed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleMissedTimePolicy {
    /// Skip any missed times.
    Skip,
    /// Create a job for each missed time.
    Create,
}
677
/// Policy for new jobs created by a schedule.
///
/// Currently the only variant creates jobs from a fixed job definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduleJobCreationPolicy {
    /// Create a new job from the given job definition.
    JobDefinition(ScheduleNewJobDefinition),
}
684
/// A job definition for a new job created by a schedule.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleNewJobDefinition {
    /// The job type ID.
    pub job_type_id: String,
    /// The input payload of the job, in JSON format.
    pub input_payload_json: String,
    /// Timeout policy for the job.
    pub timeout_policy: JobTimeoutPolicy,
    /// Retry policy for the job.
    pub retry_policy: JobRetryPolicy,
    /// Labels of the job.
    pub labels: IndexMap<String, String>,
}
699
/// A schedule that was cancelled, as returned by [`Storage::schedules_cancelled`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancelledSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
}
706
/// A pending schedule, as returned by [`Storage::pending_schedules`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSchedule {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// The target execution time of the last
    /// job created by the schedule, if any.
    pub last_target_execution_time: Option<SystemTime>,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
}
722
/// Filters for querying schedules.
///
/// Every filter is optional; the derived default leaves all of them unset.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ScheduleQueryFilters {
    /// Schedule IDs to filter the query results by.
    pub schedule_ids: Option<IndexSet<Uuid>>,
    /// Job IDs to filter the query results by.
    pub job_ids: Option<IndexSet<Uuid>>,
    /// Job type IDs to filter the query results by.
    pub job_type_ids: Option<IndexSet<String>>,
    /// Labels to filter the query results by, keyed by label name.
    pub labels: Option<IndexMap<String, ScheduleLabelFilterValue>>,
    /// Whether to return active or inactive schedules only.
    pub active: Option<bool>,
    /// Schedules created after the given time, inclusive.
    pub created_after: Option<SystemTime>,
    /// Schedules created before the given time, exclusive.
    pub created_before: Option<SystemTime>,
}
741
/// The order of schedules returned by [`Storage::query_schedules`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ScheduleQueryOrder {
    /// Order by the time the schedule was created in ascending order.
    CreatedAtAsc,
    /// Order by the time the schedule was created in descending order.
    CreatedAtDesc,
}
750
/// The results of a schedule query (see [`Storage::query_schedules`]).
#[derive(Debug, Clone)]
pub struct ScheduleQueryResult {
    /// The schedules that satisfy the query.
    pub schedules: Vec<ScheduleDetails>,
    /// A cursor that can be used to continue the query.
    pub cursor: Option<String>,
    /// Whether there are more results to query.
    pub has_more: bool,
}
761
/// Details of a schedule.
///
/// Associated jobs are not included on purpose
/// as there can be many jobs associated with a schedule,
/// additional queries can be made to get the jobs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleDetails {
    /// The unique identifier of the schedule.
    pub id: Uuid,
    /// The time the schedule was created.
    pub created_at: SystemTime,
    /// Scheduling policy for the schedule.
    pub job_timing_policy: ScheduleJobTimingPolicy,
    /// Policy for new jobs created by the schedule.
    pub job_creation_policy: ScheduleJobCreationPolicy,
    /// Labels of the schedule.
    pub labels: IndexMap<String, String>,
    /// Whether the schedule is active.
    pub active: bool,
    /// Whether the schedule was cancelled.
    pub cancelled: bool,
    /// The time range for the schedule, if any.
    pub time_range: Option<ScheduleTimeRange>,
    /// Arbitrary metadata in JSON format, if any.
    pub metadata_json: Option<String>,
}
788
/// Schedule label filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ScheduleLabelFilterValue {
    /// The label exists with any value.
    Exists,
    /// The label exists and its value equals the given string.
    Equals(String),
}
797
/// The time range for a schedule.
///
/// Both bounds are optional; the range is half-open, with `start`
/// inclusive and `end` exclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ScheduleTimeRange {
    /// The schedule must not start before this time.
    pub start: Option<SystemTime>,
    /// The schedule must end before this time.
    pub end: Option<SystemTime>,
}

impl ScheduleTimeRange {
    /// Check if the given time is within the time range.
    ///
    /// A missing bound places no restriction on that side.
    #[must_use]
    pub fn contains(&self, time: SystemTime) -> bool {
        let at_or_after_start = self.start.map_or(true, |start| time >= start);
        let before_end = self.end.map_or(true, |end| time < end);

        at_or_after_start && before_end
    }

    /// Check if the time range is valid.
    ///
    /// A range with both bounds set is valid only when `start` is strictly
    /// before `end`; a range with a missing bound is always valid.
    #[must_use]
    pub fn is_valid(&self) -> bool {
        match (self.start, self.end) {
            (Some(start), Some(end)) => start < end,
            _ => true,
        }
    }
}
834
835impl From<ScheduleJobTimingPolicy> for common::v1::ScheduleJobTimingPolicy {
836    fn from(value: ScheduleJobTimingPolicy) -> Self {
837        match value {
838            ScheduleJobTimingPolicy::Repeat(policy) => common::v1::ScheduleJobTimingPolicy {
839                job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
840                    common::v1::ScheduleJobTimingPolicyRepeat {
841                        interval: Some(policy.interval.try_into().unwrap()),
842                        immediate: policy.immediate,
843                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
844                            policy.missed_policy,
845                        )
846                        .into(),
847                    },
848                )),
849            },
850            ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
851                job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
852                    common::v1::ScheduleJobTimingPolicyCron {
853                        cron_expression: policy.cron_expression,
854                        immediate: policy.immediate,
855                        missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
856                            policy.missed_policy,
857                        )
858                        .into(),
859                    },
860                )),
861            },
862        }
863    }
864}
865
866impl TryFrom<common::v1::ScheduleJobTimingPolicy> for ScheduleJobTimingPolicy {
867    type Error = Status;
868
869    fn try_from(value: common::v1::ScheduleJobTimingPolicy) -> Result<Self, Self::Error> {
870        let value = value.job_timing.ok_or_else(|| {
871            Status::invalid_argument("missing job timing policy in schedule definition")
872        })?;
873
874        match value {
875            schedule_job_timing_policy::JobTiming::Repeat(policy) => {
876                Ok(Self::Repeat(SchedulingPolicyRepeat {
877                    interval: policy
878                        .interval
879                        .ok_or_else(|| {
880                            Status::invalid_argument("missing interval in repeat policy")
881                        })?
882                        .try_into()
883                        .map_err(|_| Status::invalid_argument("invalid interval"))?,
884                    immediate: policy.immediate,
885                    missed_policy: policy.missed_time_policy().into(),
886                }))
887            }
888            schedule_job_timing_policy::JobTiming::Cron(policy) => {
889                Ok(Self::Cron(SchedulingPolicyCron {
890                    missed_policy: policy.missed_time_policy().into(),
891                    cron_expression: {
892                        let mut parse_options = cronexpr::ParseOptions::default();
893                        parse_options.fallback_timezone_option =
894                            cronexpr::FallbackTimezoneOption::UTC;
895
896                        // Validate the cron expression.
897                        cronexpr::parse_crontab_with(&policy.cron_expression, parse_options)
898                            .map_err(|err| {
899                                Status::invalid_argument(format!("invalid cron expression: {err}"))
900                            })?;
901
902                        policy.cron_expression
903                    },
904                    immediate: policy.immediate,
905                }))
906            }
907        }
908    }
909}
910
911impl From<ScheduleJobCreationPolicy> for common::v1::ScheduleJobCreationPolicy {
912    fn from(value: ScheduleJobCreationPolicy) -> Self {
913        match value {
914            ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
915                common::v1::ScheduleJobCreationPolicy {
916                    job_creation: Some(JobCreation::JobDefinition(common::v1::JobDefinition {
917                        job_type_id: job_definition.job_type_id,
918                        input_payload_json: job_definition.input_payload_json,
919                        target_execution_time: None,
920                        retry_policy: Some(common::v1::JobRetryPolicy {
921                            retries: job_definition.retry_policy.retries,
922                        }),
923                        timeout_policy: Some(common::v1::JobTimeoutPolicy {
924                            timeout: job_definition
925                                .timeout_policy
926                                .timeout
927                                .and_then(|d| d.try_into().ok()),
928                            base_time: match job_definition.timeout_policy.base_time {
929                                JobTimeoutBaseTime::StartTime => {
930                                    common::v1::JobTimeoutBaseTime::StartTime.into()
931                                }
932                                JobTimeoutBaseTime::TargetExecutionTime => {
933                                    common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
934                                }
935                            },
936                        }),
937                        labels: job_definition
938                            .labels
939                            .into_iter()
940                            .map(|(key, value)| common::v1::JobLabel { key, value })
941                            .collect(),
942                        metadata_json: None,
943                    })),
944                }
945            }
946        }
947    }
948}
949
950impl TryFrom<common::v1::ScheduleJobCreationPolicy> for ScheduleJobCreationPolicy {
951    type Error = Status;
952
953    fn try_from(value: common::v1::ScheduleJobCreationPolicy) -> Result<Self, Self::Error> {
954        let value = value.job_creation.ok_or_else(|| {
955            Status::invalid_argument("missing job creation policy in schedule definition")
956        })?;
957
958        match value {
959            JobCreation::JobDefinition(job_definition) => {
960                Ok(Self::JobDefinition(ScheduleNewJobDefinition {
961                    job_type_id: job_definition.job_type_id,
962                    input_payload_json: job_definition.input_payload_json,
963                    timeout_policy: job_definition
964                        .timeout_policy
965                        .map(Into::into)
966                        .unwrap_or_default(),
967                    retry_policy: job_definition
968                        .retry_policy
969                        .map(Into::into)
970                        .unwrap_or_default(),
971                    labels: job_definition
972                        .labels
973                        .into_iter()
974                        .map(|label| (label.key, label.value))
975                        .collect(),
976                }))
977            }
978        }
979    }
980}
981
982impl From<ScheduleTimeRange> for TimeRange {
983    fn from(value: ScheduleTimeRange) -> Self {
984        Self {
985            start: value.start.map(Into::into),
986            end: value.end.map(Into::into),
987        }
988    }
989}
990
991impl TryFrom<TimeRange> for ScheduleTimeRange {
992    type Error = Status;
993
994    fn try_from(value: TimeRange) -> Result<Self, Self::Error> {
995        Ok(Self {
996            start: value
997                .start
998                .map(SystemTime::try_from)
999                .transpose()
1000                .map_err(|error| {
1001                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
1002                })?,
1003            end: value
1004                .end
1005                .map(SystemTime::try_from)
1006                .transpose()
1007                .map_err(|error| {
1008                    Status::invalid_argument(format!("unsupported timestamp: {error}"))
1009                })?,
1010        })
1011    }
1012}
1013
1014impl From<common::v1::ScheduleMissedTimePolicy> for ScheduleMissedTimePolicy {
1015    fn from(value: common::v1::ScheduleMissedTimePolicy) -> Self {
1016        match value {
1017            common::v1::ScheduleMissedTimePolicy::Skip
1018            | common::v1::ScheduleMissedTimePolicy::Unspecified => Self::Skip,
1019            common::v1::ScheduleMissedTimePolicy::Create => Self::Create,
1020        }
1021    }
1022}
1023
1024impl From<ScheduleMissedTimePolicy> for common::v1::ScheduleMissedTimePolicy {
1025    fn from(value: ScheduleMissedTimePolicy) -> Self {
1026        match value {
1027            ScheduleMissedTimePolicy::Skip => Self::Skip,
1028            ScheduleMissedTimePolicy::Create => Self::Create,
1029        }
1030    }
1031}
1032
1033impl From<server::v1::JobQueryOrder> for JobQueryOrder {
1034    fn from(value: server::v1::JobQueryOrder) -> Self {
1035        match value {
1036            server::v1::JobQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1037            server::v1::JobQueryOrder::CreatedAtDesc | server::v1::JobQueryOrder::Unspecified => {
1038                Self::CreatedAtDesc
1039            }
1040            server::v1::JobQueryOrder::TargetExecutionTimeAsc => Self::TargetExecutionTimeAsc,
1041            server::v1::JobQueryOrder::TargetExecutionTimeDesc => Self::TargetExecutionTimeDesc,
1042        }
1043    }
1044}
1045
1046impl From<JobDetails> for Job {
1047    fn from(job: JobDetails) -> Self {
1048        Job {
1049            id: job.id.to_string(),
1050            schedule_id: job.schedule_id.map(|t| t.to_string()),
1051            cancelled: job.cancelled,
1052            active: job.active,
1053            definition: Some(JobDefinition {
1054                job_type_id: job.job_type_id,
1055                input_payload_json: job.input_payload_json,
1056                target_execution_time: Some(job.target_execution_time.into()),
1057                retry_policy: Some(common::v1::JobRetryPolicy {
1058                    retries: job.retry_policy.retries,
1059                }),
1060                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1061                    timeout: job.timeout_policy.timeout.and_then(|d| d.try_into().ok()),
1062                    base_time: match job.timeout_policy.base_time {
1063                        JobTimeoutBaseTime::StartTime => {
1064                            common::v1::JobTimeoutBaseTime::StartTime.into()
1065                        }
1066                        JobTimeoutBaseTime::TargetExecutionTime => {
1067                            common::v1::JobTimeoutBaseTime::TargetExecutionTime.into()
1068                        }
1069                    },
1070                }),
1071                labels: job
1072                    .labels
1073                    .into_iter()
1074                    .map(|(key, value)| common::v1::JobLabel { key, value })
1075                    .collect(),
1076                metadata_json: job.metadata_json,
1077            }),
1078            created_at: Some(job.created_at.into()),
1079            executions: job.executions.into_iter().map(Into::into).collect(),
1080        }
1081    }
1082}
1083
1084impl From<ExecutionDetails> for server::v1::JobExecution {
1085    fn from(value: ExecutionDetails) -> Self {
1086        Self {
1087            id: value.id.to_string(),
1088            job_id: value.job_id.to_string(),
1089            executor_id: value.executor_id.map(|t| t.to_string()),
1090            status: match value.status {
1091                JobExecutionStatus::Pending => server::v1::JobExecutionStatus::Pending,
1092                JobExecutionStatus::Ready => server::v1::JobExecutionStatus::Ready,
1093                JobExecutionStatus::Assigned => server::v1::JobExecutionStatus::Assigned,
1094                JobExecutionStatus::Running => server::v1::JobExecutionStatus::Running,
1095                JobExecutionStatus::Succeeded => server::v1::JobExecutionStatus::Succeeded,
1096                JobExecutionStatus::Failed => server::v1::JobExecutionStatus::Failed,
1097            }
1098            .into(),
1099            created_at: Some(value.created_at.into()),
1100            ready_at: value.ready_at.map(Into::into),
1101            assigned_at: value.assigned_at.map(Into::into),
1102            started_at: value.started_at.map(Into::into),
1103            succeeded_at: value.succeeded_at.map(Into::into),
1104            failed_at: value.failed_at.map(Into::into),
1105            output_payload_json: value.output_payload_json,
1106            failure_reason: value.failure_reason,
1107        }
1108    }
1109}
1110
1111impl From<JobType> for ora_proto::common::v1::JobType {
1112    fn from(value: JobType) -> Self {
1113        Self {
1114            id: value.id,
1115            name: if value.name.is_empty() {
1116                None
1117            } else {
1118                Some(value.name)
1119            },
1120            description: if value.description.is_empty() {
1121                None
1122            } else {
1123                Some(value.description)
1124            },
1125            input_schema_json: value.input_schema_json,
1126            output_schema_json: value.output_schema_json,
1127        }
1128    }
1129}
1130
1131impl TryFrom<server::v1::JobQueryFilter> for JobQueryFilters {
1132    type Error = Status;
1133
1134    fn try_from(filter: server::v1::JobQueryFilter) -> Result<Self, Self::Error> {
1135        Ok(Self {
1136            execution_status: {
1137                let status = filter
1138                    .status()
1139                    .filter_map(|status| match status {
1140                        server::v1::JobExecutionStatus::Unspecified => None,
1141                        server::v1::JobExecutionStatus::Pending => {
1142                            Some(JobExecutionStatus::Pending)
1143                        }
1144                        server::v1::JobExecutionStatus::Ready => Some(JobExecutionStatus::Ready),
1145                        server::v1::JobExecutionStatus::Assigned => {
1146                            Some(JobExecutionStatus::Assigned)
1147                        }
1148                        server::v1::JobExecutionStatus::Running => {
1149                            Some(JobExecutionStatus::Running)
1150                        }
1151                        server::v1::JobExecutionStatus::Succeeded => {
1152                            Some(JobExecutionStatus::Succeeded)
1153                        }
1154                        server::v1::JobExecutionStatus::Failed => Some(JobExecutionStatus::Failed),
1155                    })
1156                    .collect::<IndexSet<_>>();
1157
1158                if status.is_empty() {
1159                    None
1160                } else {
1161                    Some(status)
1162                }
1163            },
1164            job_ids: if filter.job_ids.is_empty() {
1165                None
1166            } else {
1167                Some(
1168                    filter
1169                        .job_ids
1170                        .iter()
1171                        .map(|f| {
1172                            f.parse().map_err(|err| {
1173                                Status::invalid_argument(format!("invalid job ID: {err}"))
1174                            })
1175                        })
1176                        .collect::<Result<_, _>>()?,
1177                )
1178            },
1179            job_type_ids: if filter.job_type_ids.is_empty() {
1180                None
1181            } else {
1182                Some(filter.job_type_ids.into_iter().collect())
1183            },
1184            execution_ids: if filter.execution_ids.is_empty() {
1185                None
1186            } else {
1187                Some(
1188                    filter
1189                        .execution_ids
1190                        .iter()
1191                        .map(|f| {
1192                            f.parse().map_err(|err| {
1193                                Status::invalid_argument(format!("invalid execution ID: {err}"))
1194                            })
1195                        })
1196                        .collect::<Result<_, _>>()?,
1197                )
1198            },
1199            schedule_ids: if filter.schedule_ids.is_empty() {
1200                None
1201            } else {
1202                Some(
1203                    filter
1204                        .schedule_ids
1205                        .iter()
1206                        .map(|f| {
1207                            f.parse().map_err(|err| {
1208                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1209                            })
1210                        })
1211                        .collect::<Result<_, _>>()?,
1212                )
1213            },
1214            labels: if filter.labels.is_empty() {
1215                None
1216            } else {
1217                Some(
1218                    filter
1219                        .labels
1220                        .into_iter()
1221                        .filter_map(|label| {
1222                            Some((
1223                                label.key,
1224                                match label.value? {
1225                                    server::v1::job_label_filter::Value::Exists(_) => {
1226                                        JobLabelFilterValue::Exists
1227                                    }
1228                                    server::v1::job_label_filter::Value::Equals(value) => {
1229                                        JobLabelFilterValue::Equals(value)
1230                                    }
1231                                },
1232                            ))
1233                        })
1234                        .collect(),
1235                )
1236            },
1237            active: filter.active,
1238            created_after: filter
1239                .created_at
1240                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1241            created_before: filter
1242                .created_at
1243                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1244            target_execution_time_after: filter
1245                .target_execution_time
1246                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1247            target_execution_time_before: filter
1248                .target_execution_time
1249                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1250        })
1251    }
1252}
1253
1254impl From<ScheduleDetails> for server::v1::Schedule {
1255    fn from(value: ScheduleDetails) -> Self {
1256        Self {
1257            id: value.id.to_string(),
1258            definition: Some(common::v1::ScheduleDefinition {
1259                job_timing_policy: Some(match value.job_timing_policy {
1260                    ScheduleJobTimingPolicy::Repeat(policy) => {
1261                        common::v1::ScheduleJobTimingPolicy {
1262                            job_timing: Some(schedule_job_timing_policy::JobTiming::Repeat(
1263                                common::v1::ScheduleJobTimingPolicyRepeat {
1264                                    interval: policy.interval.try_into().ok(),
1265                                    immediate: policy.immediate,
1266                                    missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1267                                        policy.missed_policy,
1268                                    )
1269                                    .into(),
1270                                },
1271                            )),
1272                        }
1273                    }
1274                    ScheduleJobTimingPolicy::Cron(policy) => common::v1::ScheduleJobTimingPolicy {
1275                        job_timing: Some(schedule_job_timing_policy::JobTiming::Cron(
1276                            common::v1::ScheduleJobTimingPolicyCron {
1277                                cron_expression: policy.cron_expression,
1278                                immediate: policy.immediate,
1279                                missed_time_policy: common::v1::ScheduleMissedTimePolicy::from(
1280                                    policy.missed_policy,
1281                                )
1282                                .into(),
1283                            },
1284                        )),
1285                    },
1286                }),
1287                job_creation_policy: Some(common::v1::ScheduleJobCreationPolicy {
1288                    job_creation: Some(match value.job_creation_policy {
1289                        ScheduleJobCreationPolicy::JobDefinition(job_definition) => {
1290                            JobCreation::JobDefinition(common::v1::JobDefinition {
1291                                job_type_id: job_definition.job_type_id,
1292                                input_payload_json: job_definition.input_payload_json,
1293                                target_execution_time: None,
1294                                retry_policy: Some(common::v1::JobRetryPolicy {
1295                                    retries: job_definition.retry_policy.retries,
1296                                }),
1297                                timeout_policy: Some(common::v1::JobTimeoutPolicy {
1298                                    timeout: job_definition
1299                                        .timeout_policy
1300                                        .timeout
1301                                        .and_then(|d| d.try_into().ok()),
1302                                    base_time: match job_definition.timeout_policy.base_time {
1303                                        JobTimeoutBaseTime::StartTime => {
1304                                            common::v1::JobTimeoutBaseTime::StartTime.into()
1305                                        }
1306                                        JobTimeoutBaseTime::TargetExecutionTime => {
1307                                            common::v1::JobTimeoutBaseTime::TargetExecutionTime
1308                                                .into()
1309                                        }
1310                                    },
1311                                }),
1312                                labels: job_definition
1313                                    .labels
1314                                    .into_iter()
1315                                    .map(|(key, value)| common::v1::JobLabel { key, value })
1316                                    .collect(),
1317                                metadata_json: None,
1318                            })
1319                        }
1320                    }),
1321                }),
1322                time_range: value.time_range.map(|range| TimeRange {
1323                    start: range.start.map(Into::into),
1324                    end: range.end.map(Into::into),
1325                }),
1326                labels: value
1327                    .labels
1328                    .into_iter()
1329                    .map(|(key, value)| common::v1::ScheduleLabel { key, value })
1330                    .collect(),
1331                metadata_json: value.metadata_json,
1332            }),
1333            created_at: Some(value.created_at.into()),
1334            active: value.active,
1335            cancelled: value.cancelled,
1336        }
1337    }
1338}
1339
1340impl From<server::v1::ScheduleQueryOrder> for ScheduleQueryOrder {
1341    fn from(value: server::v1::ScheduleQueryOrder) -> Self {
1342        match value {
1343            server::v1::ScheduleQueryOrder::CreatedAtAsc => Self::CreatedAtAsc,
1344            server::v1::ScheduleQueryOrder::CreatedAtDesc
1345            | server::v1::ScheduleQueryOrder::Unspecified => Self::CreatedAtDesc,
1346        }
1347    }
1348}
1349
1350impl TryFrom<server::v1::ScheduleQueryFilter> for ScheduleQueryFilters {
1351    type Error = Status;
1352
1353    fn try_from(value: server::v1::ScheduleQueryFilter) -> Result<Self, Self::Error> {
1354        Ok(Self {
1355            schedule_ids: if value.schedule_ids.is_empty() {
1356                None
1357            } else {
1358                Some(
1359                    value
1360                        .schedule_ids
1361                        .iter()
1362                        .map(|f| {
1363                            f.parse().map_err(|err| {
1364                                Status::invalid_argument(format!("invalid schedule ID: {err}"))
1365                            })
1366                        })
1367                        .collect::<Result<_, _>>()?,
1368                )
1369            },
1370            job_ids: if value.job_ids.is_empty() {
1371                None
1372            } else {
1373                Some(
1374                    value
1375                        .job_ids
1376                        .iter()
1377                        .map(|f| {
1378                            f.parse().map_err(|err| {
1379                                Status::invalid_argument(format!("invalid job ID: {err}"))
1380                            })
1381                        })
1382                        .collect::<Result<_, _>>()?,
1383                )
1384            },
1385            job_type_ids: if value.job_type_ids.is_empty() {
1386                None
1387            } else {
1388                Some(value.job_type_ids.into_iter().collect())
1389            },
1390            labels: if value.labels.is_empty() {
1391                None
1392            } else {
1393                Some(
1394                    value
1395                        .labels
1396                        .into_iter()
1397                        .filter_map(|label| {
1398                            Some((
1399                                label.key,
1400                                match label.value? {
1401                                    server::v1::schedule_label_filter::Value::Exists(_) => {
1402                                        ScheduleLabelFilterValue::Exists
1403                                    }
1404                                    server::v1::schedule_label_filter::Value::Equals(value) => {
1405                                        ScheduleLabelFilterValue::Equals(value)
1406                                    }
1407                                },
1408                            ))
1409                        })
1410                        .collect(),
1411                )
1412            },
1413            active: value.active,
1414            created_after: value
1415                .created_at
1416                .and_then(|c| c.start.and_then(|t| t.try_into().ok())),
1417            created_before: value
1418                .created_at
1419                .and_then(|c| c.end.and_then(|t| t.try_into().ok())),
1420        })
1421    }
1422}
1423
1424impl From<JobType> for snapshot::v1::ExportedJobType {
1425    fn from(job_type: JobType) -> Self {
1426        snapshot::v1::ExportedJobType {
1427            job_type: Some(common::v1::JobType {
1428                id: job_type.id,
1429                name: Some(job_type.name),
1430                description: Some(job_type.description),
1431                input_schema_json: job_type.input_schema_json,
1432                output_schema_json: job_type.output_schema_json,
1433            }),
1434        }
1435    }
1436}
1437
1438impl From<snapshot::v1::ExportedJobType> for JobType {
1439    fn from(job_type: snapshot::v1::ExportedJobType) -> Self {
1440        let job_type = job_type.job_type.unwrap();
1441
1442        Self {
1443            id: job_type.id,
1444            name: job_type.name.unwrap_or_default(),
1445            description: job_type.description.unwrap_or_default(),
1446            input_schema_json: job_type.input_schema_json,
1447            output_schema_json: job_type.output_schema_json,
1448        }
1449    }
1450}