1#![allow(clippy::missing_panics_doc, missing_docs)]
3
4use std::{
5 pin::pin,
6 time::{Duration, UNIX_EPOCH},
7};
8
9use futures::TryStreamExt;
10use uuid::Uuid;
11
12use crate::{
13 Backend,
14 common::{Label, TimeRange},
15 executions::{ExecutionStatus, FailedExecution, StartedExecution, SucceededExecution},
16 jobs::{JobDefinition, JobFilters, JobTypeId, NewJob, RetryPolicy, TimeoutPolicy},
17 schedules::{MissedTimePolicy, SchedulingPolicy},
18};
19
20pub async fn smoke(backend: &impl Backend) {
22 job_execution(backend).await;
23 job_cancellation(backend).await;
24}
25
26async fn job_execution(backend: &impl Backend) {
28 let job_definitions = [
29 JobDefinition {
30 job_type_id: JobTypeId::new("DoSomething").unwrap(),
31 target_execution_time: std::time::SystemTime::now(),
32 input_payload_json: r#"{"task": "clean"}"#.to_string(),
33 labels: [Label {
34 key: "foo_id".to_string(),
35 value: "2".to_string(),
36 }]
37 .to_vec(),
38 timeout_policy: TimeoutPolicy {
39 timeout: Duration::from_secs(20),
40 base_time: crate::jobs::TimeoutBaseTime::StartTime,
41 },
42 retry_policy: RetryPolicy { retries: 0 },
43 },
44 JobDefinition {
45 job_type_id: JobTypeId::new("DoSomething2").unwrap(),
46 target_execution_time: std::time::SystemTime::now(),
47 input_payload_json: r#"{"task": "clean"}"#.to_string(),
48 labels: [
49 Label {
50 key: "a_id".to_string(),
51 value: "3".to_string(),
52 },
53 Label {
54 key: "b_id".to_string(),
55 value: "4".to_string(),
56 },
57 ]
58 .to_vec(),
59 timeout_policy: TimeoutPolicy {
60 timeout: Duration::from_secs(20),
61 base_time: crate::jobs::TimeoutBaseTime::TargetExecutionTime,
62 },
63 retry_policy: RetryPolicy { retries: 0 },
64 },
65 ]
66 .to_vec();
67 let result = backend
68 .add_jobs(&new_jobs(&job_definitions), None)
69 .await
70 .expect("Failed to add jobs");
71 assert_eq!(result.len(), job_definitions.len());
72
73 let jobs = backend
74 .list_jobs(crate::jobs::JobFilters::default(), None, 10, None)
75 .await
76 .expect("Failed to list jobs")
77 .0;
78
79 assert_eq!(jobs.len(), job_definitions.len());
80
81 for mut job in jobs {
82 let job_definition = job_definitions
83 .iter()
84 .find(|def| def.job_type_id == job.job.job_type_id)
85 .expect("Job definition not found");
86
87 job.job.labels.sort_by(|a, b| a.key.cmp(&b.key));
88
89 assert_eq!(
90 job.job.input_payload_json,
91 job_definition.input_payload_json
92 );
93 assert_eq!(job.job.labels, job_definition.labels);
94 assert_eq!(
95 job.job.timeout_policy.timeout,
96 job_definition.timeout_policy.timeout
97 );
98 assert_eq!(
99 job.job.retry_policy.retries,
100 job_definition.retry_policy.retries
101 );
102 }
103
104 let ready_executions = pin!(backend.ready_executions())
105 .try_next()
106 .await
107 .unwrap()
108 .unwrap();
109 assert_eq!(ready_executions.len(), job_definitions.len());
110 assert!(
111 ready_executions
112 .windows(2)
113 .all(|w| w[0].execution_id <= w[1].execution_id)
114 );
115
116 backend
117 .executions_started(
118 &ready_executions
119 .iter()
120 .map(|exec| StartedExecution {
121 execution_id: exec.execution_id,
122 executor_id: Uuid::nil().into(),
123 started_at: std::time::SystemTime::now(),
124 })
125 .collect::<Vec<_>>(),
126 )
127 .await
128 .unwrap();
129
130 let ready_executions_2 = pin!(backend.ready_executions()).try_next().await.unwrap();
131 assert!(ready_executions_2.is_none());
132
133 let in_progress_executions = pin!(backend.in_progress_executions())
134 .try_next()
135 .await
136 .unwrap()
137 .unwrap();
138 assert_eq!(in_progress_executions.len(), job_definitions.len());
139
140 let exec1 = &in_progress_executions[0];
141 let exec2 = &in_progress_executions[1];
142
143 backend
144 .executions_succeeded(&[SucceededExecution {
145 execution_id: exec1.execution_id,
146 succeeded_at: std::time::SystemTime::now(),
147 output_json: r#"{"result": "done"}"#.to_string(),
148 }])
149 .await
150 .unwrap();
151
152 let in_progress_executions = pin!(backend.in_progress_executions())
153 .try_next()
154 .await
155 .unwrap()
156 .unwrap();
157 assert_eq!(in_progress_executions.len(), 1);
158
159 assert_eq!(in_progress_executions[0].execution_id, exec2.execution_id);
160
161 backend
162 .executions_retried(&[FailedExecution {
163 execution_id: exec2.execution_id,
164 job_id: exec2.job_id,
165 failed_at: std::time::SystemTime::now(),
166 failure_reason: "Temporary failure".to_string(),
167 }])
168 .await
169 .unwrap();
170
171 let ready_executions = pin!(backend.ready_executions())
172 .try_next()
173 .await
174 .unwrap()
175 .unwrap();
176 assert_eq!(ready_executions.len(), 1);
177
178 backend
179 .executions_failed(&[FailedExecution {
180 execution_id: ready_executions[0].execution_id,
181 job_id: ready_executions[0].job_id,
182 failed_at: std::time::SystemTime::now(),
183 failure_reason: "Permanent failure".to_string(),
184 }])
185 .await
186 .unwrap();
187
188 let in_progress_executions = pin!(backend.in_progress_executions())
189 .try_next()
190 .await
191 .unwrap();
192 assert!(in_progress_executions.is_none());
193
194 let ready_executions = pin!(backend.ready_executions()).try_next().await.unwrap();
195 assert!(ready_executions.is_none());
196}
197
198async fn job_cancellation(backend: &impl Backend) {
199 let job_definitions = [JobDefinition {
200 job_type_id: JobTypeId::new("ToBeCancelled").unwrap(),
201 target_execution_time: std::time::SystemTime::now(),
202 input_payload_json: r#"{"task": "clean"}"#.to_string(),
203 labels: vec![],
204 timeout_policy: TimeoutPolicy {
205 timeout: Duration::from_secs(20),
206 base_time: crate::jobs::TimeoutBaseTime::StartTime,
207 },
208 retry_policy: RetryPolicy { retries: 0 },
209 }]
210 .to_vec();
211 let result = backend
212 .add_jobs(&new_jobs(&job_definitions), None)
213 .await
214 .expect("Failed to add jobs");
215 assert_eq!(result.len(), job_definitions.len());
216
217 let ready_executions = pin!(backend.ready_executions())
218 .try_next()
219 .await
220 .unwrap()
221 .unwrap();
222 assert_eq!(ready_executions.len(), job_definitions.len());
223
224 let cancelled_jobs = backend.cancel_jobs(JobFilters::default()).await.unwrap();
225
226 assert_eq!(cancelled_jobs.len(), job_definitions.len());
227
228 let ready_executions = pin!(backend.ready_executions()).try_next().await.unwrap();
229 assert!(ready_executions.is_none());
230
231 let jobs = backend
232 .list_jobs(
233 JobFilters {
234 execution_statuses: Some(vec![ExecutionStatus::Cancelled]),
235 ..Default::default()
236 },
237 None,
238 10,
239 None,
240 )
241 .await
242 .unwrap()
243 .0;
244
245 assert_eq!(jobs.len(), 1);
246 assert_eq!(jobs[0].job.job_type_id, job_definitions[0].job_type_id);
247 assert_eq!(jobs[0].executions.len(), 1);
248 assert_eq!(jobs[0].executions[0].status, ExecutionStatus::Cancelled);
249
250 backend
251 .executions_succeeded(&[SucceededExecution {
252 execution_id: jobs[0].executions[0].id,
253 succeeded_at: std::time::SystemTime::now(),
254 output_json: r#"{"result": "should not happen"}"#.to_string(),
255 }])
256 .await
257 .unwrap();
258
259 let jobs = backend
260 .list_jobs(
261 JobFilters {
262 execution_statuses: Some(vec![ExecutionStatus::Cancelled]),
263 ..Default::default()
264 },
265 None,
266 10,
267 None,
268 )
269 .await
270 .unwrap()
271 .0;
272
273 assert_eq!(jobs.len(), 1);
274 assert_eq!(jobs[0].job.job_type_id, job_definitions[0].job_type_id);
275 assert_eq!(jobs[0].executions.len(), 1);
276 assert_eq!(jobs[0].executions[0].status, ExecutionStatus::Cancelled);
277}
278
279pub async fn job_queries(backend: &impl Backend) {
280 let job_definitions = [
281 JobDefinition {
282 job_type_id: JobTypeId::new("QueryJob1").unwrap(),
283 target_execution_time: std::time::SystemTime::now(),
284 input_payload_json: r#"{"task": "clean"}"#.to_string(),
285 labels: [Label {
286 key: "foo".to_string(),
287 value: "bar".to_string(),
288 }]
289 .to_vec(),
290 timeout_policy: TimeoutPolicy {
291 timeout: Duration::from_secs(20),
292 base_time: crate::jobs::TimeoutBaseTime::StartTime,
293 },
294 retry_policy: RetryPolicy { retries: 0 },
295 },
296 JobDefinition {
297 job_type_id: JobTypeId::new("QueryJob2").unwrap(),
298 target_execution_time: std::time::SystemTime::now(),
299 input_payload_json: r#"{"task": "clean"}"#.to_string(),
300 labels: [Label {
301 key: "foo".to_string(),
302 value: "bar".to_string(),
303 }]
304 .to_vec(),
305 timeout_policy: TimeoutPolicy {
306 timeout: Duration::from_secs(20),
307 base_time: crate::jobs::TimeoutBaseTime::StartTime,
308 },
309 retry_policy: RetryPolicy { retries: 0 },
310 },
311 JobDefinition {
312 job_type_id: JobTypeId::new("QueryJob2").unwrap(),
313 target_execution_time: UNIX_EPOCH + Duration::from_secs(5),
314 input_payload_json: r#"{"task": "build"}"#.to_string(),
315 labels: [
316 Label {
317 key: "foo".to_string(),
318 value: "baz".to_string(),
319 },
320 Label {
321 key: "bar".to_string(),
322 value: "stuff".to_string(),
323 },
324 ]
325 .to_vec(),
326 timeout_policy: TimeoutPolicy {
327 timeout: Duration::from_secs(30),
328 base_time: crate::jobs::TimeoutBaseTime::StartTime,
329 },
330 retry_policy: RetryPolicy { retries: 1 },
331 },
332 ]
333 .to_vec();
334 let result = backend
335 .add_jobs(&new_jobs(&job_definitions), None)
336 .await
337 .expect("Failed to add jobs");
338 assert_eq!(result.len(), job_definitions.len());
339
340 let jobs = backend
341 .list_jobs(crate::jobs::JobFilters::default(), None, 10, None)
342 .await
343 .expect("Failed to list jobs")
344 .0;
345
346 assert_eq!(jobs.len(), job_definitions.len());
347
348 let foo_bar_jobs = backend
349 .list_jobs(
350 JobFilters {
351 labels: Some(vec![crate::common::LabelFilter {
352 key: "foo".to_string(),
353 value: Some("bar".to_string()),
354 }]),
355 ..Default::default()
356 },
357 None,
358 10,
359 None,
360 )
361 .await
362 .expect("Failed to list jobs")
363 .0;
364
365 assert_eq!(foo_bar_jobs.len(), 2);
366
367 let foo_bar_jobs = backend
368 .list_jobs(
369 JobFilters {
370 labels: Some(vec![crate::common::LabelFilter {
371 key: "foo".to_string(),
372 value: Some("bar".to_string()),
373 }]),
374 job_type_ids: Some(vec![JobTypeId::new("QueryJob1").unwrap()]),
375 ..Default::default()
376 },
377 None,
378 10,
379 None,
380 )
381 .await
382 .expect("Failed to list jobs")
383 .0;
384
385 assert_eq!(foo_bar_jobs.len(), 1);
386
387 let foo_jobs = backend
388 .list_jobs(
389 JobFilters {
390 labels: Some(vec![crate::common::LabelFilter {
391 key: "foo".to_string(),
392 value: None,
393 }]),
394 ..Default::default()
395 },
396 None,
397 10,
398 None,
399 )
400 .await
401 .expect("Failed to list jobs")
402 .0;
403
404 assert_eq!(foo_jobs.len(), 3);
405
406 let foo_bar_jobs = backend
407 .list_jobs(
408 JobFilters {
409 labels: Some(vec![
410 crate::common::LabelFilter {
411 key: "foo".to_string(),
412 value: None,
413 },
414 crate::common::LabelFilter {
415 key: "foo".to_string(),
416 value: Some("bar".to_string()),
417 },
418 ]),
419 ..Default::default()
420 },
421 None,
422 10,
423 None,
424 )
425 .await
426 .expect("Failed to list jobs")
427 .0;
428
429 assert_eq!(foo_bar_jobs.len(), 2);
430
431 let jobs = backend
432 .list_jobs(
433 JobFilters {
434 target_execution_time: Some(TimeRange {
435 start: Some(UNIX_EPOCH),
436 end: Some(UNIX_EPOCH + Duration::from_secs(10)),
437 }),
438 ..Default::default()
439 },
440 None,
441 10,
442 None,
443 )
444 .await
445 .expect("Failed to list jobs")
446 .0;
447
448 assert_eq!(jobs.len(), 1);
449}
450
451pub async fn pagination_and_ordering(backend: &impl Backend) {
452 let job_definitions = (0..25)
453 .map(|i| JobDefinition {
454 job_type_id: JobTypeId::new(format!("Job{i}")).unwrap(),
455 target_execution_time: UNIX_EPOCH + Duration::from_secs(200) - Duration::from_secs(i),
456 input_payload_json: r#"{"task": "clean"}"#.to_string(),
457 labels: vec![],
458 timeout_policy: TimeoutPolicy {
459 timeout: Duration::from_secs(20),
460 base_time: crate::jobs::TimeoutBaseTime::StartTime,
461 },
462 retry_policy: RetryPolicy { retries: 0 },
463 })
464 .collect::<Vec<_>>();
465 let result = backend
466 .add_jobs(&new_jobs(&job_definitions), None)
467 .await
468 .expect("Failed to add jobs");
469 assert_eq!(result.len(), job_definitions.len());
470
471 {
472 let mut all_jobs = Vec::new();
473 let mut next_page_token = None;
474
475 loop {
476 let (jobs, page_token) = backend
477 .list_jobs(crate::jobs::JobFilters::default(), None, 2, next_page_token)
478 .await
479 .expect("Failed to list jobs");
480 assert!(jobs.len() <= 2);
481 all_jobs.extend(jobs);
482 if let Some(token) = page_token {
483 next_page_token = Some(token);
484 } else {
485 break;
486 }
487 }
488 assert_eq!(all_jobs.len(), job_definitions.len());
489 }
490
491 {
492 let mut all_jobs = Vec::new();
493 let mut next_page_token = None;
494
495 loop {
496 let (jobs, page_token) = backend
497 .list_jobs(
498 crate::jobs::JobFilters::default(),
499 Some(crate::jobs::JobOrderBy::CreatedAtAsc),
500 2,
501 next_page_token,
502 )
503 .await
504 .expect("Failed to list jobs");
505 assert!(jobs.len() <= 2);
506 all_jobs.extend(jobs);
507 if let Some(token) = page_token {
508 next_page_token = Some(token);
509 } else {
510 break;
511 }
512 }
513 assert_eq!(all_jobs.len(), job_definitions.len());
514
515 let reference_jobs = job_definitions.clone();
516 for (i, job) in all_jobs.iter().enumerate() {
517 assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
518 }
519 }
520
521 {
522 let mut all_jobs = Vec::new();
523 let mut next_page_token = None;
524
525 loop {
526 let (jobs, page_token) = backend
527 .list_jobs(
528 crate::jobs::JobFilters::default(),
529 Some(crate::jobs::JobOrderBy::CreatedAtDesc),
530 2,
531 next_page_token,
532 )
533 .await
534 .expect("Failed to list jobs");
535 assert!(jobs.len() <= 2);
536 all_jobs.extend(jobs);
537 if let Some(token) = page_token {
538 next_page_token = Some(token);
539 } else {
540 break;
541 }
542 }
543 assert_eq!(all_jobs.len(), job_definitions.len());
544
545 let mut reference_jobs = job_definitions.clone();
546 reference_jobs.reverse();
547 for (i, job) in all_jobs.iter().enumerate() {
548 assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
549 }
550 }
551
552 {
553 let mut all_jobs = Vec::new();
554 let mut next_page_token = None;
555
556 loop {
557 let (jobs, page_token) = backend
558 .list_jobs(
559 crate::jobs::JobFilters::default(),
560 Some(crate::jobs::JobOrderBy::TargetExecutionTimeDesc),
561 2,
562 next_page_token,
563 )
564 .await
565 .expect("Failed to list jobs");
566 assert!(jobs.len() <= 2);
567 all_jobs.extend(jobs);
568 if let Some(token) = page_token {
569 next_page_token = Some(token);
570 } else {
571 break;
572 }
573 }
574 assert_eq!(all_jobs.len(), job_definitions.len());
575
576 let reference_jobs = job_definitions.clone();
577 for (i, job) in all_jobs.iter().enumerate() {
578 assert_eq!(job.job.job_type_id, reference_jobs[i].job_type_id);
579 }
580 }
581}
582
583pub async fn schedules(backend: &impl Backend) {
584 use crate::schedules::{ScheduleDefinition, ScheduleFilters};
585
586 let schedule_definitions = [ScheduleDefinition {
587 job_template: JobDefinition {
588 job_type_id: JobTypeId::new("ScheduledJob1").unwrap(),
589 target_execution_time: std::time::SystemTime::now(),
590 input_payload_json: r#"{"task": "clean"}"#.to_string(),
591 labels: vec![],
592 timeout_policy: TimeoutPolicy {
593 timeout: Duration::from_secs(20),
594 base_time: crate::jobs::TimeoutBaseTime::StartTime,
595 },
596 retry_policy: RetryPolicy { retries: 0 },
597 },
598 scheduling: SchedulingPolicy::FixedInterval {
599 interval: Duration::from_secs(1),
600 immediate: true,
601 missed: MissedTimePolicy::Skip,
602 },
603 labels: vec![],
604 time_range: TimeRange::default(),
605 }]
606 .to_vec();
607
608 let result = backend
609 .add_schedules(&schedule_definitions, None)
610 .await
611 .expect("Failed to add schedules");
612 assert_eq!(result.len(), schedule_definitions.len());
613
614 let schedules = backend
615 .list_schedules(ScheduleFilters::default(), None, 10, None)
616 .await
617 .expect("Failed to list schedules")
618 .0;
619
620 assert_eq!(schedules.len(), schedule_definitions.len());
621
622 let pending_schedules = pin!(backend.pending_schedules())
623 .try_next()
624 .await
625 .unwrap()
626 .unwrap();
627 assert_eq!(pending_schedules.len(), schedule_definitions.len());
628
629 for schedule in pending_schedules {
630 backend
631 .add_jobs(
632 &[NewJob {
633 job: schedule.job_template.clone(),
634 schedule_id: Some(schedule.schedule_id),
635 }],
636 None,
637 )
638 .await
639 .unwrap();
640 }
641
642 let pending_schedules = pin!(backend.pending_schedules()).try_next().await.unwrap();
643 assert!(pending_schedules.is_none());
644}
645
646fn new_jobs(jobs: &[JobDefinition]) -> Vec<NewJob> {
647 jobs.iter()
648 .map(|j| NewJob {
649 job: j.clone(),
650 schedule_id: None,
651 })
652 .collect()
653}