1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//! Common storage and timer backend interfaces for the Ora server.
use std::future::Future;
use futures::Stream;
use crate::{
common::NextPageToken,
executions::{
ExecutionId, FailedExecution, InProgressExecution, ReadyExecution, RetriedExecution,
StartedExecution, SucceededExecution,
},
jobs::{AddedJobs, CancelledJob, JobDetails, JobFilters, JobOrderBy, JobType, NewJob},
schedules::{
AddedSchedules, PendingSchedule, ScheduleDefinition, ScheduleDetails, ScheduleFilters,
ScheduleOrderBy, StoppedSchedule,
},
};
pub mod common;
pub mod executions;
pub mod executors;
pub mod jobs;
pub mod schedules;
#[cfg(feature = "test")]
pub mod test;
/// Backend implementation for the Ora server.
///
/// A backend is responsible for all storage operations as well
/// as timing of executions.
pub trait Backend: Send + Sync + 'static {
/// The error type for backend operations.
type Error: core::error::Error + Send + Sync + 'static;
/// Add or update job types.
fn add_job_types(
&self,
job_types: &[JobType],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// List all job types.
fn list_job_types(&self) -> impl Future<Output = Result<Vec<JobType>, Self::Error>> + Send;
/// Add new jobs and return their IDs in the same order.
///
/// For each new job an execution must also be created.
///
/// If `if_not_exists` is provided and any jobs exist
/// matching the given filters, no new jobs must be added
/// and an empty list must be returned.
fn add_jobs(
&self,
jobs: &[NewJob],
if_not_exists: Option<JobFilters>,
) -> impl Future<Output = Result<AddedJobs, Self::Error>> + Send;
/// List jobs matching the given filters.
fn list_jobs(
&self,
filters: JobFilters,
order_by: Option<JobOrderBy>,
page_size: u32,
page_token: Option<NextPageToken>,
) -> impl Future<Output = Result<(Vec<JobDetails>, Option<NextPageToken>), Self::Error>> + Send;
/// Count the jobs matching the given filters.
fn count_jobs(
&self,
filters: JobFilters,
) -> impl Future<Output = Result<u64, Self::Error>> + Send;
/// Cancel the jobs matching the given filters, returning
/// the cancelled jobs.
fn cancel_jobs(
&self,
filters: JobFilters,
) -> impl Future<Output = Result<Vec<CancelledJob>, Self::Error>> + Send;
/// Add new schedules and return their IDs in the same order.
///
/// If `if_not_exists` is provided and any schedules exist
/// matching the given filters, no new schedules must be added
/// and an empty list must be returned.
fn add_schedules(
&self,
schedules: &[ScheduleDefinition],
if_not_exists: Option<ScheduleFilters>,
) -> impl Future<Output = Result<AddedSchedules, Self::Error>> + Send;
/// List schedules matching the given filters.
fn list_schedules(
&self,
filters: ScheduleFilters,
order_by: Option<ScheduleOrderBy>,
page_size: u32,
page_token: Option<NextPageToken>,
) -> impl Future<Output = Result<(Vec<ScheduleDetails>, Option<NextPageToken>), Self::Error>> + Send;
/// Count the schedules matching the given filters.
fn count_schedules(
&self,
filters: ScheduleFilters,
) -> impl Future<Output = Result<u64, Self::Error>> + Send;
/// Stop the schedules matching the given filters, returning
/// the IDs of the stopped schedules.
fn stop_schedules(
&self,
filters: ScheduleFilters,
) -> impl Future<Output = Result<Vec<StoppedSchedule>, Self::Error>> + Send;
/// Return ready executions.
///
/// The executions must be ordered by their ID.
///
/// The batch size of ready executions returned by the stream
/// is backend-dependent.
fn ready_executions(
&self,
) -> impl Stream<Item = Result<Vec<ReadyExecution>, Self::Error>> + Send;
/// Wait for executions to be ready.
///
/// This function should ignore the provided execution IDs.
fn wait_for_ready_executions(
&self,
ignore: &[ExecutionId],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// Return a stream of in-progress executions.
///
/// These are executions that have been started by executors
/// but have not yet completed, failed, or been cancelled.
fn in_progress_executions(
&self,
) -> impl Stream<Item = Result<Vec<InProgressExecution>, Self::Error>> + Send;
/// The given executions have started.
///
/// The backend must update the execution records accordingly.
fn executions_started(
&self,
executions: &[StartedExecution],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// The given executions have successfully completed.
fn executions_succeeded(
&self,
executions: &[SucceededExecution],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// The given executions have failed.
fn executions_failed(
&self,
executions: &[FailedExecution],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// The given executions have failed but need to be retried.
///
/// For each job, a new execution must be created.
fn executions_retried(
&self,
executions: &[RetriedExecution],
) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// Return a stream of active schedules that do not have
/// an active job.
fn pending_schedules(
&self,
) -> impl Stream<Item = Result<Vec<PendingSchedule>, Self::Error>> + Send;
/// Wait for pending schedules to be available.
fn wait_for_pending_schedules(&self) -> impl Future<Output = Result<(), Self::Error>> + Send;
/// Delete historical data for inactive jobs and schedules.
///
/// The following should be deleted:
/// - Jobs with executions that finished before the given time.
/// - Schedules that were stopped before the given time.
/// - Delete job types that are not referenced by any jobs.
fn delete_history(
&self,
before: std::time::SystemTime,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
}