ora_backend/test.rs

//! Tests for the Ora backend implementations.
#![allow(clippy::missing_panics_doc, missing_docs)]

use std::{
    pin::pin,
    time::{Duration, UNIX_EPOCH},
};

use futures::TryStreamExt;
use uuid::Uuid;

use crate::{
    Backend,
    common::{Label, TimeRange},
    executions::{ExecutionStatus, FailedExecution, StartedExecution, SucceededExecution},
    jobs::{JobDefinition, JobFilters, JobTypeId, NewJob, RetryPolicy, TimeoutPolicy},
    schedules::{MissedTimePolicy, SchedulingPolicy},
};

/// Run basic smoke tests for the given backend.
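///
/// A minimal usage sketch from a backend crate's test suite; `MyBackend` and
/// the `tokio::test` harness are placeholders, not part of this crate:
///
/// ```ignore
/// #[tokio::test]
/// async fn my_backend_smoke() {
///     // Construct whichever backend implementation is under test (hypothetical constructor).
///     let backend = MyBackend::connect_for_tests().await;
///     // Drive the shared smoke tests against it.
///     smoke(&backend).await;
/// }
/// ```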
pub async fn smoke(backend: &impl Backend) {
    job_execution(backend).await;
    job_cancellation(backend).await;
}

/// Smoke tests for jobs and executions.
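///
/// Walks the happy path end to end: add two jobs, list them back and check
/// their fields, claim the ready executions and mark them started, then drive
/// one execution to success and the other through a retry and a permanent
/// failure, verifying the ready and in-progress streams drain along the way.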
async fn job_execution(backend: &impl Backend) {
    let job_definitions = [
        JobDefinition {
            job_type_id: JobTypeId::new("DoSomething").unwrap(),
            target_execution_time: std::time::SystemTime::now(),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: [Label {
                key: "foo_id".to_string(),
                value: "2".to_string(),
            }]
            .to_vec(),
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        },
        JobDefinition {
            job_type_id: JobTypeId::new("DoSomething2").unwrap(),
            target_execution_time: std::time::SystemTime::now(),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: [
                Label {
                    key: "a_id".to_string(),
                    value: "3".to_string(),
                },
                Label {
                    key: "b_id".to_string(),
                    value: "4".to_string(),
                },
            ]
            .to_vec(),
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::TargetExecutionTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        },
    ]
    .to_vec();
    let result = backend
        .add_jobs(&new_jobs(&job_definitions), None)
        .await
        .expect("Failed to add jobs");
    assert_eq!(result.len(), job_definitions.len());

    let jobs = backend
        .list_jobs(crate::jobs::JobFilters::default(), None, 10, None)
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(jobs.len(), job_definitions.len());

    // Every listed job must match its definition field by field.
    for mut job in jobs {
        let job_definition = job_definitions
            .iter()
            .find(|def| def.job_type_id == job.job.job_type_id)
            .expect("Job definition not found");

        job.job.labels.sort_by(|a, b| a.key.cmp(&b.key));

        assert_eq!(
            job.job.input_payload_json,
            job_definition.input_payload_json
        );
        assert_eq!(job.job.labels, job_definition.labels);
        assert_eq!(
            job.job.timeout_policy.timeout,
            job_definition.timeout_policy.timeout
        );
        assert_eq!(
            job.job.retry_policy.retries,
            job_definition.retry_policy.retries
        );
    }

    // Both jobs should surface as ready executions, ordered by execution id.
    let ready_executions = pin!(backend.ready_executions())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(ready_executions.len(), job_definitions.len());
    assert!(
        ready_executions
            .windows(2)
            .all(|w| w[0].execution_id <= w[1].execution_id)
    );

    // Mark all executions as started; they must disappear from the ready stream.
    backend
        .executions_started(
            &ready_executions
                .iter()
                .map(|exec| StartedExecution {
                    execution_id: exec.execution_id,
                    executor_id: Uuid::nil().into(),
                    started_at: std::time::SystemTime::now(),
                })
                .collect::<Vec<_>>(),
        )
        .await
        .unwrap();

    let ready_executions_2 = pin!(backend.ready_executions()).try_next().await.unwrap();
    assert!(ready_executions_2.is_none());

    let in_progress_executions = pin!(backend.in_progress_executions())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(in_progress_executions.len(), job_definitions.len());

    let exec1 = &in_progress_executions[0];
    let exec2 = &in_progress_executions[1];

    // Complete the first execution successfully.
    backend
        .executions_succeeded(&[SucceededExecution {
            execution_id: exec1.execution_id,
            succeeded_at: std::time::SystemTime::now(),
            output_json: r#"{"result": "done"}"#.to_string(),
        }])
        .await
        .unwrap();

    let in_progress_executions = pin!(backend.in_progress_executions())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(in_progress_executions.len(), 1);

    assert_eq!(in_progress_executions[0].execution_id, exec2.execution_id);

    // Retrying the second execution should make it ready again.
    backend
        .executions_retried(&[FailedExecution {
            execution_id: exec2.execution_id,
            job_id: exec2.job_id,
            failed_at: std::time::SystemTime::now(),
            failure_reason: "Temporary failure".to_string(),
        }])
        .await
        .unwrap();

    let ready_executions = pin!(backend.ready_executions())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(ready_executions.len(), 1);

    // Fail it permanently; both streams should now be drained.
    backend
        .executions_failed(&[FailedExecution {
            execution_id: ready_executions[0].execution_id,
            job_id: ready_executions[0].job_id,
            failed_at: std::time::SystemTime::now(),
            failure_reason: "Permanent failure".to_string(),
        }])
        .await
        .unwrap();

    let in_progress_executions = pin!(backend.in_progress_executions())
        .try_next()
        .await
        .unwrap();
    assert!(in_progress_executions.is_none());

    let ready_executions = pin!(backend.ready_executions()).try_next().await.unwrap();
    assert!(ready_executions.is_none());
}

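/// Smoke tests for cancelling jobs and their pending executions.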
async fn job_cancellation(backend: &impl Backend) {
    let job_definitions = [JobDefinition {
        job_type_id: JobTypeId::new("ToBeCancelled").unwrap(),
        target_execution_time: std::time::SystemTime::now(),
        input_payload_json: r#"{"task": "clean"}"#.to_string(),
        labels: vec![],
        timeout_policy: TimeoutPolicy {
            timeout: Duration::from_secs(20),
            base_time: crate::jobs::TimeoutBaseTime::StartTime,
        },
        retry_policy: RetryPolicy { retries: 0 },
    }]
    .to_vec();
    let result = backend
        .add_jobs(&new_jobs(&job_definitions), None)
        .await
        .expect("Failed to add jobs");
    assert_eq!(result.len(), job_definitions.len());

    let ready_executions = pin!(backend.ready_executions())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(ready_executions.len(), job_definitions.len());

    let cancelled_jobs = backend.cancel_jobs(JobFilters::default()).await.unwrap();

    assert_eq!(cancelled_jobs.len(), job_definitions.len());

    // Cancelled executions must no longer show up as ready.
    let ready_executions = pin!(backend.ready_executions()).try_next().await.unwrap();
    assert!(ready_executions.is_none());

    let jobs = backend
        .list_jobs(
            JobFilters {
                execution_statuses: Some(vec![ExecutionStatus::Cancelled]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .unwrap()
        .0;

    assert_eq!(jobs.len(), 1);
    assert_eq!(jobs[0].job.job_type_id, job_definitions[0].job_type_id);
    assert_eq!(jobs[0].executions.len(), 1);
    assert_eq!(jobs[0].executions[0].status, ExecutionStatus::Cancelled);

    // Reporting success for a cancelled execution must not change its status.
    backend
        .executions_succeeded(&[SucceededExecution {
            execution_id: jobs[0].executions[0].id,
            succeeded_at: std::time::SystemTime::now(),
            output_json: r#"{"result": "should not happen"}"#.to_string(),
        }])
        .await
        .unwrap();

    let jobs = backend
        .list_jobs(
            JobFilters {
                execution_statuses: Some(vec![ExecutionStatus::Cancelled]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .unwrap()
        .0;

    assert_eq!(jobs.len(), 1);
    assert_eq!(jobs[0].job.job_type_id, job_definitions[0].job_type_id);
    assert_eq!(jobs[0].executions.len(), 1);
    assert_eq!(jobs[0].executions[0].status, ExecutionStatus::Cancelled);
}

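/// Tests for listing jobs with label, job type and time range filters.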
pub async fn job_queries(backend: &impl Backend) {
    let job_definitions = [
        JobDefinition {
            job_type_id: JobTypeId::new("QueryJob1").unwrap(),
            target_execution_time: std::time::SystemTime::now(),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: [Label {
                key: "foo".to_string(),
                value: "bar".to_string(),
            }]
            .to_vec(),
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        },
        JobDefinition {
            job_type_id: JobTypeId::new("QueryJob2").unwrap(),
            target_execution_time: std::time::SystemTime::now(),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: [Label {
                key: "foo".to_string(),
                value: "bar".to_string(),
            }]
            .to_vec(),
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        },
        JobDefinition {
            job_type_id: JobTypeId::new("QueryJob2").unwrap(),
            target_execution_time: UNIX_EPOCH + Duration::from_secs(5),
            input_payload_json: r#"{"task": "build"}"#.to_string(),
            labels: [
                Label {
                    key: "foo".to_string(),
                    value: "baz".to_string(),
                },
                Label {
                    key: "bar".to_string(),
                    value: "stuff".to_string(),
                },
            ]
            .to_vec(),
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(30),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 1 },
        },
    ]
    .to_vec();
    let result = backend
        .add_jobs(&new_jobs(&job_definitions), None)
        .await
        .expect("Failed to add jobs");
    assert_eq!(result.len(), job_definitions.len());

    let jobs = backend
        .list_jobs(crate::jobs::JobFilters::default(), None, 10, None)
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(jobs.len(), job_definitions.len());

    // Filter on an exact label value: matches the first two jobs only.
    let foo_bar_jobs = backend
        .list_jobs(
            JobFilters {
                labels: Some(vec![crate::common::LabelFilter {
                    key: "foo".to_string(),
                    value: Some("bar".to_string()),
                }]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(foo_bar_jobs.len(), 2);

    // Combining a label filter with a job type filter narrows it to one job.
    let foo_bar_jobs = backend
        .list_jobs(
            JobFilters {
                labels: Some(vec![crate::common::LabelFilter {
                    key: "foo".to_string(),
                    value: Some("bar".to_string()),
                }]),
                job_type_ids: Some(vec![JobTypeId::new("QueryJob1").unwrap()]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(foo_bar_jobs.len(), 1);

    // A key-only label filter matches every job carrying the "foo" key.
    let foo_jobs = backend
        .list_jobs(
            JobFilters {
                labels: Some(vec![crate::common::LabelFilter {
                    key: "foo".to_string(),
                    value: None,
                }]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(foo_jobs.len(), 3);

    // Multiple label filters combine conjunctively: "foo" present AND foo == "bar".
    let foo_bar_jobs = backend
        .list_jobs(
            JobFilters {
                labels: Some(vec![
                    crate::common::LabelFilter {
                        key: "foo".to_string(),
                        value: None,
                    },
                    crate::common::LabelFilter {
                        key: "foo".to_string(),
                        value: Some("bar".to_string()),
                    },
                ]),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(foo_bar_jobs.len(), 2);

    // Only the third job's target execution time falls inside this range.
    let jobs = backend
        .list_jobs(
            JobFilters {
                target_execution_time: Some(TimeRange {
                    start: Some(UNIX_EPOCH),
                    end: Some(UNIX_EPOCH + Duration::from_secs(10)),
                }),
                ..Default::default()
            },
            None,
            10,
            None,
        )
        .await
        .expect("Failed to list jobs")
        .0;

    assert_eq!(jobs.len(), 1);
}

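/// Tests for page tokens and the supported orderings of `list_jobs`.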
pub async fn pagination_and_ordering(backend: &impl Backend) {
    // Target execution times decrease as the creation order increases, so
    // `CreatedAtAsc` and `TargetExecutionTimeDesc` yield the definition order
    // while `CreatedAtDesc` yields the reverse.
    let job_definitions = (0..25)
        .map(|i| JobDefinition {
            job_type_id: JobTypeId::new(format!("Job{i}")).unwrap(),
            target_execution_time: UNIX_EPOCH + Duration::from_secs(200) - Duration::from_secs(i),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: vec![],
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        })
        .collect::<Vec<_>>();
    let result = backend
        .add_jobs(&new_jobs(&job_definitions), None)
        .await
        .expect("Failed to add jobs");
    assert_eq!(result.len(), job_definitions.len());

    // Default ordering: just check that paging visits every job exactly once.
    {
        let mut all_jobs = Vec::new();
        let mut next_page_token = None;

        loop {
            let (jobs, page_token) = backend
                .list_jobs(crate::jobs::JobFilters::default(), None, 2, next_page_token)
                .await
                .expect("Failed to list jobs");
            assert!(jobs.len() <= 2);
            all_jobs.extend(jobs);
            if let Some(token) = page_token {
                next_page_token = Some(token);
            } else {
                break;
            }
        }
        assert_eq!(all_jobs.len(), job_definitions.len());
    }

    // Ascending creation order matches the order the definitions were added in.
    {
        let mut all_jobs = Vec::new();
        let mut next_page_token = None;

        loop {
            let (jobs, page_token) = backend
                .list_jobs(
                    crate::jobs::JobFilters::default(),
                    Some(crate::jobs::JobOrderBy::CreatedAtAsc),
                    2,
                    next_page_token,
                )
                .await
                .expect("Failed to list jobs");
            assert!(jobs.len() <= 2);
            all_jobs.extend(jobs);
            if let Some(token) = page_token {
                next_page_token = Some(token);
            } else {
                break;
            }
        }
        assert_eq!(all_jobs.len(), job_definitions.len());

        let reference_jobs = job_definitions.clone();
        for (i, job) in all_jobs.iter().enumerate() {
            assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
        }
    }

    // Descending creation order is the reverse of the definition order.
    {
        let mut all_jobs = Vec::new();
        let mut next_page_token = None;

        loop {
            let (jobs, page_token) = backend
                .list_jobs(
                    crate::jobs::JobFilters::default(),
                    Some(crate::jobs::JobOrderBy::CreatedAtDesc),
                    2,
                    next_page_token,
                )
                .await
                .expect("Failed to list jobs");
            assert!(jobs.len() <= 2);
            all_jobs.extend(jobs);
            if let Some(token) = page_token {
                next_page_token = Some(token);
            } else {
                break;
            }
        }
        assert_eq!(all_jobs.len(), job_definitions.len());

        let mut reference_jobs = job_definitions.clone();
        reference_jobs.reverse();
        for (i, job) in all_jobs.iter().enumerate() {
            assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
        }
    }

    // Descending target execution time also matches the definition order.
    {
        let mut all_jobs = Vec::new();
        let mut next_page_token = None;

        loop {
            let (jobs, page_token) = backend
                .list_jobs(
                    crate::jobs::JobFilters::default(),
                    Some(crate::jobs::JobOrderBy::TargetExecutionTimeDesc),
                    2,
                    next_page_token,
                )
                .await
                .expect("Failed to list jobs");
            assert!(jobs.len() <= 2);
            all_jobs.extend(jobs);
            if let Some(token) = page_token {
                next_page_token = Some(token);
            } else {
                break;
            }
        }
        assert_eq!(all_jobs.len(), job_definitions.len());

        let reference_jobs = job_definitions.clone();
        for (i, job) in all_jobs.iter().enumerate() {
            assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
        }
    }
}

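/// Tests for adding schedules and turning pending schedules into jobs.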
pub async fn schedules(backend: &impl Backend) {
    use crate::schedules::{ScheduleDefinition, ScheduleFilters};

    let schedule_definitions = [ScheduleDefinition {
        job_template: JobDefinition {
            job_type_id: JobTypeId::new("ScheduledJob1").unwrap(),
            target_execution_time: std::time::SystemTime::now(),
            input_payload_json: r#"{"task": "clean"}"#.to_string(),
            labels: vec![],
            timeout_policy: TimeoutPolicy {
                timeout: Duration::from_secs(20),
                base_time: crate::jobs::TimeoutBaseTime::StartTime,
            },
            retry_policy: RetryPolicy { retries: 0 },
        },
        scheduling: SchedulingPolicy::FixedInterval {
            interval: Duration::from_secs(1),
            immediate: true,
            missed: MissedTimePolicy::Skip,
        },
        labels: vec![],
        time_range: TimeRange::default(),
    }]
    .to_vec();

    let result = backend
        .add_schedules(&schedule_definitions, None)
        .await
        .expect("Failed to add schedules");
    assert_eq!(result.len(), schedule_definitions.len());

    let schedules = backend
        .list_schedules(ScheduleFilters::default(), None, 10, None)
        .await
        .expect("Failed to list schedules")
        .0;

    assert_eq!(schedules.len(), schedule_definitions.len());

    // With `immediate: true` the schedule should be pending right away.
    let pending_schedules = pin!(backend.pending_schedules())
        .try_next()
        .await
        .unwrap()
        .unwrap();
    assert_eq!(pending_schedules.len(), schedule_definitions.len());

    // Materialise a job for each pending schedule; the pending stream should
    // then be empty.
    for schedule in pending_schedules {
        backend
            .add_jobs(
                &[NewJob {
                    job: schedule.job_template.clone(),
                    schedule_id: Some(schedule.schedule_id),
                }],
                None,
            )
            .await
            .unwrap();
    }

    let pending_schedules = pin!(backend.pending_schedules()).try_next().await.unwrap();
    assert!(pending_schedules.is_none());
}

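/// Wrap plain job definitions into `NewJob`s that are not tied to a schedule.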
fn new_jobs(jobs: &[JobDefinition]) -> Vec<NewJob> {
    jobs.iter()
        .map(|j| NewJob {
            job: j.clone(),
            schedule_id: None,
        })
        .collect()
}