Skip to main content

ora_backend/
lib.rs

1//! Common storage and timer backend interfaces for the Ora server.
2
3use std::future::Future;
4
5use futures::Stream;
6
7use crate::{
8    common::NextPageToken,
9    executions::{
10        ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, StartedExecution,
11        SucceededExecution,
12    },
13    jobs::{CancelledJob, JobDetails, JobFilters, JobId, JobOrderBy, JobType, NewJob},
14    schedules::{
15        PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters, ScheduleId,
16        ScheduleOrderBy, StoppedSchedule,
17    },
18};
19
20pub mod common;
21pub mod executions;
22pub mod executors;
23pub mod jobs;
24pub mod schedules;
25
26#[cfg(feature = "test")]
27pub mod test;
28
29/// Backend implementation for the Ora server.
30///
31/// A backend is responsible for all storage operations as well
32/// as timing of executions.
33pub trait Backend: Send + Sync + 'static {
34    /// The error type for backend operations.
35    type Error: core::error::Error + Send + Sync + 'static;
36
37    /// Add or update job types.
38    fn add_job_types(
39        &self,
40        job_types: &[JobType],
41    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
42    /// List all job types.
43    fn list_job_types(&self) -> impl Future<Output = Result<Vec<JobType>, Self::Error>> + Send;
44
45    /// Add new jobs and return their IDs in the same order.
46    ///
47    /// For each new job an execution must also be created.
48    ///
49    /// If `if_not_exists` is provided and any jobs exist
50    /// matching the given filters, no new jobs must be added
51    /// and an empty list must be returned.
52    fn add_jobs(
53        &self,
54        jobs: &[NewJob],
55        if_not_exists: Option<JobFilters>,
56    ) -> impl Future<Output = Result<Vec<JobId>, Self::Error>> + Send;
57
58    /// List jobs matching the given filters.
59    fn list_jobs(
60        &self,
61        filters: JobFilters,
62        order_by: Option<JobOrderBy>,
63        page_size: u32,
64        page_token: Option<NextPageToken>,
65    ) -> impl Future<Output = Result<(Vec<JobDetails>, Option<NextPageToken>), Self::Error>> + Send;
66
67    /// Count the jobs matching the given filters.
68    fn count_jobs(
69        &self,
70        filters: JobFilters,
71    ) -> impl Future<Output = Result<u64, Self::Error>> + Send;
72
73    /// Cancel the jobs matching the given filters, returning
74    /// the cancelled jobs.
75    fn cancel_jobs(
76        &self,
77        filters: JobFilters,
78    ) -> impl Future<Output = Result<Vec<CancelledJob>, Self::Error>> + Send;
79
80    /// Add new schedules and return their IDs in the same order.
81    ///
82    /// If `if_not_exists` is provided and any schedules exist
83    /// matching the given filters, no new schedules must be added
84    /// and an empty list must be returned.
85    fn add_schedules(
86        &self,
87        schedules: &[ScheduleDefinition],
88        if_not_exists: Option<ScheduleFilters>,
89    ) -> impl Future<Output = Result<Vec<ScheduleId>, Self::Error>> + Send;
90
91    /// List schedules matching the given filters.
92    fn list_schedules(
93        &self,
94        filters: ScheduleFilters,
95        order_by: Option<ScheduleOrderBy>,
96        page_size: u32,
97        page_token: Option<NextPageToken>,
98    ) -> impl Future<Output = Result<(Vec<ScheduleDetails>, Option<NextPageToken>), Self::Error>> + Send;
99
100    /// Count the schedules matching the given filters.
101    fn count_schedules(
102        &self,
103        filters: ScheduleFilters,
104    ) -> impl Future<Output = Result<u64, Self::Error>> + Send;
105
106    /// Stop the schedules matching the given filters, returning
107    /// the IDs of the stopped schedules.
108    fn stop_schedules(
109        &self,
110        filters: ScheduleFilters,
111    ) -> impl Future<Output = Result<Vec<StoppedSchedule>, Self::Error>> + Send;
112
113    /// Return ready executions.
114    ///
115    /// The executions must be ordered by their ID.
116    ///
117    /// The batch size of ready executions returned by the stream
118    /// is backend-dependent.
119    fn ready_executions(
120        &self,
121    ) -> impl Stream<Item = Result<Vec<ReadyExecution>, Self::Error>> + Send;
122
123    /// Wait for executions to be ready.
124    ///
125    /// This function should ignore the provided execution IDs.
126    fn wait_for_ready_executions(
127        &self,
128        ignore: &[ExecutionId],
129    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
130
131    /// Return a stream of in-progress executions.
132    ///
133    /// These are executions that have been started by executors
134    /// but have not yet completed, failed, or been cancelled.
135    fn in_progress_executions(
136        &self,
137    ) -> impl Stream<Item = Result<Vec<InProgressExecution>, Self::Error>> + Send;
138
139    /// The given executions have started.
140    ///
141    /// The backend must update the execution records accordingly.
142    fn executions_started(
143        &self,
144        executions: &[StartedExecution],
145    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
146
147    /// The given executions have successfully completed.
148    fn executions_succeeded(
149        &self,
150        executions: &[SucceededExecution],
151    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
152
153    /// The given executions have failed.
154    fn executions_failed(
155        &self,
156        executions: &[FailedExecution],
157    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
158
159    /// The given executions have failed but need to be retried.
160    ///
161    /// For each job, a new execution must be created.
162    fn executions_retried(
163        &self,
164        executions: &[FailedExecution],
165    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
166
167    /// Return a stream of active schedules that do not have
168    /// an active job.
169    fn pending_schedules(
170        &self,
171    ) -> impl Stream<Item = Result<Vec<PendingSchedule>, Self::Error>> + Send;
172
173    /// Wait for pending schedules to be available.
174    fn wait_for_pending_schedules(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
175
176    /// Delete historical data for inactive jobs and schedules.
177    ///
178    /// The following should be deleted:
179    /// - Jobs with executions that finished before the given time.
180    /// - Schedules that were stopped before the given time.
181    /// - Delete job types that are not referenced by any jobs.
182    fn delete_history(
183        &self,
184        before: std::time::SystemTime,
185    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
186}