1use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use futures::future::BoxFuture;
6use futures::stream::{self, Stream, StreamExt};
7use futures::{FutureExt, TryStreamExt};
8use reqwest::StatusCode;
9use serde::{Deserialize, Serialize};
10use serde_with::DeserializeFromStr;
11use std::fmt;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
15use thiserror::Error;
16
17use crate::paginator::{PaginationError, Paginator};
18use crate::queryset::{QuerySet, QuerySetMember};
19use crate::tag::Tag;
20use crate::Lava;
21
/// Value of the `state` field of a job record.
///
/// The variant names are also the textual representation: the derived
/// `Display` prints the variant name verbatim, and the derived `FromStr`
/// (which `DeserializeFromStr` uses for deserialization) parses it back.
/// `EnumIter` yields the variants in declaration order.
#[derive(
    Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter, Display, EnumString, DeserializeFromStr,
)]
pub enum State {
    Submitted,
    Scheduling,
    Scheduled,
    Running,
    Canceling,
    Finished,
}
34
35impl QuerySetMember for State {
36 type Iter = StateIter;
37 fn all() -> Self::Iter {
38 Self::iter()
39 }
40}
41
/// Value of the `health` field of a job record.
///
/// As with [`State`], the variant names are the textual representation used
/// by the derived `Display` and `FromStr` implementations (the latter also
/// drives deserialization through `DeserializeFromStr`).
#[derive(
    Copy, Clone, Debug, PartialEq, Eq, Hash, EnumIter, EnumString, Display, DeserializeFromStr,
)]
pub enum Health {
    Unknown,
    Complete,
    Incomplete,
    Canceled,
}
55
56impl QuerySetMember for Health {
57 type Iter = HealthIter;
58 fn all() -> Self::Iter {
59 Self::iter()
60 }
61}
62
/// Job field used to sort the results of a jobs query.
///
/// The `Display` implementation produces the field name that is sent as the
/// value of the `ordering` query parameter.
#[derive(Debug, Clone)]
pub enum Ordering {
    /// Sort by job id (`id`).
    Id,
    /// Sort by start time (`start_time`).
    StartTime,
    /// Sort by end time (`end_time`).
    EndTime,
    /// Sort by submission time (`submit_time`).
    SubmitTime,
}
74
75impl fmt::Display for Ordering {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 match self {
78 Ordering::Id => write!(f, "id"),
79 Ordering::StartTime => write!(f, "start_time"),
80 Ordering::EndTime => write!(f, "end_time"),
81 Ordering::SubmitTime => write!(f, "submit_time"),
82 }
83 }
84}
85
/// Job record exactly as deserialized from the server's paginated reply.
///
/// This is the wire-level counterpart of [`Job`]: the two have the same
/// fields, except that `tags` and `failure_tags` are still numeric ids here.
/// `transform_job` resolves them into `Tag` values.
#[derive(Clone, Deserialize, Debug)]
struct LavaJob {
    id: i64,
    submitter: String,
    viewing_groups: Vec<i64>,
    description: String,
    health_check: bool,
    requested_device_type: Option<String>,
    // Tag ids, not yet resolved to `Tag`.
    tags: Vec<u32>,
    actual_device: Option<String>,
    submit_time: DateTime<Utc>,
    start_time: Option<DateTime<Utc>>,
    end_time: Option<DateTime<Utc>>,
    state: State,
    health: Health,
    priority: i64,
    definition: String,
    original_definition: String,
    multinode_definition: String,
    // Tag ids, not yet resolved to `Tag`.
    failure_tags: Vec<u32>,
    failure_comment: Option<String>,
}
108
/// A job as exposed to users of this crate.
///
/// Values are produced by the [`Jobs`] stream. Every field is copied from the
/// record returned by the server, except `tags` and `failure_tags`, whose
/// numeric ids have been resolved into [`Tag`] values (ids that could not be
/// resolved are omitted).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Job {
    pub id: i64,
    pub submitter: String,
    pub viewing_groups: Vec<i64>,
    pub description: String,
    pub health_check: bool,
    pub requested_device_type: Option<String>,
    /// Resolved tags of the job.
    pub tags: Vec<Tag>,
    pub actual_device: Option<String>,
    pub submit_time: DateTime<Utc>,
    pub start_time: Option<DateTime<Utc>>,
    pub end_time: Option<DateTime<Utc>>,
    pub state: State,
    pub health: Health,
    pub priority: i64,
    pub definition: String,
    pub original_definition: String,
    pub multinode_definition: String,
    /// Resolved failure tags of the job.
    pub failure_tags: Vec<Tag>,
    pub failure_comment: Option<String>,
}
137
/// Internal state of the [`Jobs`] stream.
enum PagingState<'a> {
    /// Waiting for the paginator to yield the next raw job record.
    Paging,
    /// A raw record was received and is being converted into a [`Job`];
    /// the boxed future is the in-flight `transform_job` call.
    Transforming(BoxFuture<'a, Job>),
}
142
/// Stream of [`Job`]s matching a query built with [`JobsBuilder`].
///
/// Implements `Stream<Item = Result<Job, PaginationError>>`; items are
/// fetched through the paginator and converted one at a time.
pub struct Jobs<'a> {
    // Client used to resolve tag ids while converting records.
    lava: &'a Lava,
    // Source of raw job records for the query URL.
    paginator: Paginator<LavaJob>,
    // Whether we are waiting on the paginator or on a conversion.
    state: PagingState<'a>,
}
153
154impl<'a> Jobs<'a> {
155 pub fn reported_items(&self) -> Option<u32> {
165 self.paginator.reported_items()
166 }
167}
168
/// Builder for a jobs query.
///
/// Each setter consumes and returns the builder; [`JobsBuilder::query`]
/// turns the accumulated settings into URL query parameters and returns the
/// resulting [`Jobs`] stream.
#[derive(Debug, Clone)]
pub struct JobsBuilder<'a> {
    // Client whose base URL and HTTP client are used for the query.
    lava: &'a Lava,
    // Included/excluded job states (query key "state").
    states: QuerySet<State>,
    // Included/excluded job healths (query key "health").
    healths: QuerySet<Health>,
    // Value of the `limit` parameter, if any.
    limit: Option<u32>,
    // Field to sort by (`ordering` parameter).
    ordering: Ordering,
    // Explicit job ids: one id is sent as `id`, several as `id__in`.
    ids: Vec<i64>,
    // Exclusive lower bound on the job id (`id__gt`).
    id_after: Option<i64>,
    // Exclusive lower bound on the start time (`start_time__gt`).
    started_after: Option<DateTime<Utc>>,
    // Exclusive lower bound on the submit time (`submit_time__gt`).
    submitted_after: Option<DateTime<Utc>>,
    // Exclusive lower bound on the end time (`end_time__gt`).
    ended_after: Option<DateTime<Utc>>,
    // Sort direction; `false` prefixes the ordering field with `-`.
    ascending: bool,
}
219
220impl<'a> JobsBuilder<'a> {
221 pub fn new(lava: &'a Lava) -> Self {
228 Self {
229 lava,
230 states: QuerySet::new(String::from("state")),
231 healths: QuerySet::new(String::from("health")),
232 limit: None,
233 ordering: Ordering::Id,
234 ids: Vec::new(),
235 id_after: None,
236 started_after: None,
237 submitted_after: None,
238 ended_after: None,
239 ascending: true,
240 }
241 }
242
243 pub fn state(mut self, state: State) -> Self {
245 self.states.include(state);
246 self
247 }
248
249 pub fn state_not(mut self, state: State) -> Self {
251 self.states.exclude(&state);
252 self
253 }
254
255 pub fn limit(mut self, limit: u32) -> Self {
282 self.limit = Some(limit);
283 self
284 }
285
286 pub fn health(mut self, health: Health) -> Self {
288 self.healths.include(health);
289 self
290 }
291
292 pub fn health_not(mut self, health: Health) -> Self {
294 self.healths.exclude(&health);
295 self
296 }
297
298 pub fn id(mut self, id: i64) -> Self {
300 self.ids.push(id);
301 self
302 }
303
304 pub fn id_after(mut self, id: i64) -> Self {
306 self.id_after = Some(id);
307 self
308 }
309
310 pub fn started_after(mut self, when: chrono::DateTime<Utc>) -> Self {
313 self.started_after = Some(when);
314 self
315 }
316
317 pub fn submitted_after(mut self, when: chrono::DateTime<Utc>) -> Self {
320 self.submitted_after = Some(when);
321 self
322 }
323
324 pub fn ended_after(mut self, when: chrono::DateTime<Utc>) -> Self {
326 self.ended_after = Some(when);
327 self
328 }
329
330 pub fn ordering(mut self, ordering: Ordering, ascending: bool) -> Self {
332 self.ordering = ordering;
333 self.ascending = ascending;
334 self
335 }
336
337 pub fn query(self) -> Jobs<'a> {
339 let mut url = self
340 .lava
341 .base
342 .join("jobs/")
343 .expect("Failed to append to base url");
344 url.query_pairs_mut().append_pair(
345 "ordering",
346 &format!(
347 "{}{}",
348 match self.ascending {
349 true => "",
350 false => "-",
351 },
352 self.ordering
353 ),
354 );
355 if let Some(pair) = self.states.query() {
356 url.query_pairs_mut().append_pair(&pair.0, &pair.1);
357 }
358 if let Some(limit) = self.limit {
359 url.query_pairs_mut()
360 .append_pair("limit", &limit.to_string());
361 };
362 if let Some(pair) = self.healths.query() {
363 url.query_pairs_mut().append_pair(&pair.0, &pair.1);
364 }
365
366 match self.ids.len() {
367 0 => (),
368 1 => {
369 url.query_pairs_mut()
370 .append_pair("id", &self.ids[0].to_string());
371 }
372 _ => {
373 let ids: Vec<_> = self.ids.iter().map(i64::to_string).collect();
374 url.query_pairs_mut().append_pair("id__in", &ids.join(","));
375 }
376 }
377
378 if let Some(id_after) = self.id_after {
379 url.query_pairs_mut()
380 .append_pair("id__gt", &id_after.to_string());
381 };
382 if let Some(started_after) = self.started_after {
383 url.query_pairs_mut()
384 .append_pair("start_time__gt", &started_after.to_rfc3339());
385 };
386 if let Some(submitted_after) = self.submitted_after {
387 url.query_pairs_mut()
388 .append_pair("submit_time__gt", &submitted_after.to_rfc3339());
389 };
390 if let Some(ended_after) = self.ended_after {
391 url.query_pairs_mut()
392 .append_pair("end_time__gt", &ended_after.to_rfc3339());
393 };
394
395 let paginator = Paginator::new(self.lava.client.clone(), url);
396 Jobs {
397 lava: self.lava,
398 paginator,
399 state: PagingState::Paging,
400 }
401 }
402}
403
404async fn transform_job(job: LavaJob, lava: &Lava) -> Job {
405 let t = stream::iter(job.tags.iter());
406 let tags = t
407 .filter_map(|i| async move { lava.tag(*i).await })
408 .collect()
409 .await;
410
411 let t = stream::iter(job.failure_tags.iter());
412 let failure_tags = t
413 .filter_map(|i| async move { lava.tag(*i).await })
414 .collect()
415 .await;
416
417 Job {
418 id: job.id,
419 submitter: job.submitter,
420 viewing_groups: job.viewing_groups,
421 description: job.description,
422 health_check: job.health_check,
423 requested_device_type: job.requested_device_type,
424 tags,
425 actual_device: job.actual_device,
426 submit_time: job.submit_time,
427 start_time: job.start_time,
428 end_time: job.end_time,
429 state: job.state,
430 health: job.health,
431 priority: job.priority,
432 definition: job.definition,
433 original_definition: job.original_definition,
434 multinode_definition: job.multinode_definition,
435 failure_tags,
436 failure_comment: job.failure_comment,
437 }
438}
439
440impl<'a> Stream for Jobs<'a> {
441 type Item = Result<Job, PaginationError>;
442
443 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
444 let me = self.get_mut();
445
446 loop {
447 return match &mut me.state {
448 PagingState::Paging => {
449 let p = Pin::new(&mut me.paginator);
450 match p.poll_next(cx) {
451 Poll::Ready(None) => Poll::Ready(None),
452 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
453 Poll::Ready(Some(Ok(d))) => {
454 me.state = PagingState::Transforming(transform_job(d, me.lava).boxed());
455 continue;
456 }
457 Poll::Pending => Poll::Pending,
458 }
459 }
460 PagingState::Transforming(fut) => match fut.as_mut().poll(cx) {
461 Poll::Ready(d) => {
462 me.state = PagingState::Paging;
463 Poll::Ready(Some(Ok(d)))
464 }
465 Poll::Pending => Poll::Pending,
466 },
467 };
468 }
469 }
470}
471
/// Errors returned by [`submit_job`].
#[derive(Error, Debug)]
pub enum SubmissionError {
    /// Sending the request or decoding the reply body failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server answered `400 Bad Request`; carries the `message` field of
    /// its reply.
    #[error("Invalid job: {0}")]
    InvalidJob(String),
    /// The server answered with a status other than `201 Created` or
    /// `400 Bad Request`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
481
/// JSON body posted by [`submit_job`].
#[derive(Debug, Serialize)]
struct Submission<'a> {
    // The job definition text, passed through as given by the caller.
    definition: &'a str,
}
486
/// JSON reply to a job submission, parsed by [`submit_job`] for both the
/// `201 Created` and `400 Bad Request` cases.
#[derive(Debug, Deserialize)]
struct SubmissionReply {
    // Required in every reply that is parsed; reported to the caller on 400.
    message: String,
    // Ids of the created jobs; defaults to empty when absent from the reply.
    #[serde(default)]
    job_ids: Vec<i64>,
}
493
494pub async fn submit_job(lava: &Lava, definition: &str) -> Result<Vec<i64>, SubmissionError> {
495 let url = lava
496 .base
497 .join("jobs/")
498 .expect("Failed to append to base url");
499 let sub = Submission { definition };
500
501 let post = lava.client.post(url).json(&sub).send().await?;
502
503 match post.status() {
504 StatusCode::CREATED => {
505 let reply: SubmissionReply = post.json().await?;
506 Ok(reply.job_ids)
507 }
508 StatusCode::BAD_REQUEST => {
509 let reply: SubmissionReply = post.json().await?;
510 Err(SubmissionError::InvalidJob(reply.message))
511 }
512 s => Err(SubmissionError::UnexpectedReply(s)),
513 }
514}
515
/// Errors returned by [`cancel_job`].
#[derive(Error, Debug)]
pub enum CancellationError {
    /// Sending the request failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server answered with a status other than `200 OK`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
523
524pub async fn cancel_job(lava: &Lava, id: i64) -> Result<(), CancellationError> {
525 let mut url = lava.base.clone();
526 url.path_segments_mut()
527 .unwrap()
528 .pop_if_empty()
529 .push("jobs")
530 .push(&id.to_string())
531 .push("cancel")
532 .push("");
533
534 let res = lava.client.get(url).send().await?;
535
536 match res.status() {
537 StatusCode::OK => Ok(()),
538 s => Err(CancellationError::UnexpectedReply(s)),
539 }
540}
541
/// Errors returned by [`job_results_as_junit`], either up front or as items
/// of the returned byte stream.
#[derive(Error, Debug)]
pub enum ResultsError {
    /// Sending the request, or reading a chunk of the reply body, failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server answered with a status other than `200 OK`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
549
550pub async fn job_results_as_junit(
551 lava: &Lava,
552 id: i64,
553) -> Result<impl Stream<Item = Result<Bytes, ResultsError>> + Send + Unpin + '_, ResultsError> {
554 let mut url = lava.base.clone();
555 url.path_segments_mut()
556 .unwrap()
557 .pop_if_empty()
558 .push("jobs")
559 .push(&id.to_string())
560 .push("junit")
561 .push("");
562
563 let res = lava.client.get(url).send().await?;
564 match res.status() {
565 StatusCode::OK => Ok(res.bytes_stream().map_err(ResultsError::from)),
566 s => Err(ResultsError::UnexpectedReply(s)),
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use super::{Health, Job, Ordering, State, Tag};
573 use crate::Lava;
574
575 use boulder::{
576 Buildable, Builder, GeneratableWithPersianRug, GeneratorWithPersianRugMutIterator, Repeat,
577 Some as GSome, SubsetsFromPersianRug, Time,
578 };
579 use chrono::{DateTime, Duration, Utc};
580 use futures::{AsyncReadExt, TryStreamExt};
581 use lava_api_mock::{
582 Device as MockDevice, DeviceType as MockDeviceType, Group as MockGroup, Job as MockJob,
583 JobHealth as MockJobHealth, JobState as MockJobState, LavaMock, PaginationLimits, PassFail,
584 PopulationParams, SharedState, Tag as MockTag, User as MockUser,
585 };
586 use persian_rug::{Accessor, Context, Proxy};
587 use std::collections::{BTreeMap, BTreeSet};
588 use std::convert::{Infallible, TryFrom, TryInto};
589 use std::str::FromStr;
590 use test_log::test;
591
592 impl Job {
593 #[persian_rug::constraints(
594 context = C,
595 access(
596 MockUser<C>,
597 MockGroup<C>,
598 MockTag<C>,
599 MockDevice<C>,
600 MockDeviceType<C>
601 )
602 )]
603 pub fn from_mock<'b, B, C>(job: &MockJob<C>, context: B) -> Job
604 where
605 B: 'b + Accessor<Context = C>,
606 C: Context + 'static,
607 {
608 Self {
609 id: job.id,
610 submitter: context.get(&job.submitter).username.clone(),
611 viewing_groups: job
612 .viewing_groups
613 .iter()
614 .map(|g| context.get(g).id)
615 .collect::<Vec<_>>(),
616 description: job.description.clone(),
617 health_check: job.health_check,
618 requested_device_type: job
619 .requested_device_type
620 .map(|d| context.get(&d).name.to_string()),
621 tags: job
622 .tags
623 .iter()
624 .map(|t| Tag::from_mock(context.get(t), context.clone()))
625 .collect::<Vec<_>>(),
626 actual_device: job
627 .actual_device
628 .as_ref()
629 .map(|d| context.get(d).hostname.to_string()),
630 submit_time: job.submit_time.unwrap(),
631 start_time: job.start_time,
632 end_time: job.end_time,
633 state: job.state.try_into().unwrap(),
634 health: job.health.try_into().unwrap(),
635 priority: job.priority,
636 definition: job.definition.clone(),
637 original_definition: job.original_definition.clone(),
638 multinode_definition: job.multinode_definition.clone(),
639 failure_tags: job
640 .failure_tags
641 .iter()
642 .map(|t| Tag::from_mock(context.get(t), context.clone()))
643 .collect::<Vec<_>>(),
644 failure_comment: job.failure_comment.clone(),
645 }
646 }
647 }
648
649 impl TryFrom<MockJobState> for State {
650 type Error = Infallible;
651 fn try_from(state: MockJobState) -> Result<State, Self::Error> {
652 use State::*;
653
654 match state {
655 MockJobState::Submitted => Ok(Submitted),
656 MockJobState::Scheduling => Ok(Scheduling),
657 MockJobState::Scheduled => Ok(Scheduled),
658 MockJobState::Running => Ok(Running),
659 MockJobState::Canceling => Ok(Canceling),
660 MockJobState::Finished => Ok(Finished),
661 }
662 }
663 }
664
665 impl TryFrom<MockJobHealth> for Health {
666 type Error = Infallible;
667 fn try_from(health: MockJobHealth) -> Result<Health, Self::Error> {
668 use Health::*;
669
670 match health {
671 MockJobHealth::Unknown => Ok(Unknown),
672 MockJobHealth::Complete => Ok(Complete),
673 MockJobHealth::Incomplete => Ok(Incomplete),
674 MockJobHealth::Canceled => Ok(Canceled),
675 }
676 }
677 }
678
679 #[test]
680 fn test_display() {
681 assert_eq!(State::Submitted.to_string(), "Submitted");
682 assert_eq!(State::Scheduling.to_string(), "Scheduling");
683 assert_eq!(State::Scheduled.to_string(), "Scheduled");
684 assert_eq!(State::Running.to_string(), "Running");
685 assert_eq!(State::Canceling.to_string(), "Canceling");
686 assert_eq!(State::Finished.to_string(), "Finished");
687
688 assert_eq!(Health::Unknown.to_string(), "Unknown");
689 assert_eq!(Health::Complete.to_string(), "Complete");
690 assert_eq!(Health::Incomplete.to_string(), "Incomplete");
691 assert_eq!(Health::Canceled.to_string(), "Canceled");
692 }
693
694 #[test]
695 fn test_from_str() {
696 assert_eq!(Ok(State::Submitted), State::from_str("Submitted"));
697 assert_eq!(Ok(State::Scheduling), State::from_str("Scheduling"));
698 assert_eq!(Ok(State::Scheduled), State::from_str("Scheduled"));
699 assert_eq!(Ok(State::Running), State::from_str("Running"));
700 assert_eq!(Ok(State::Canceling), State::from_str("Canceling"));
701 assert_eq!(Ok(State::Finished), State::from_str("Finished"));
702 assert_eq!(
703 Err(strum::ParseError::VariantNotFound),
704 State::from_str("womble")
705 );
706
707 assert_eq!(Ok(Health::Unknown), Health::from_str("Unknown"));
708 assert_eq!(Ok(Health::Complete), Health::from_str("Complete"));
709 assert_eq!(Ok(Health::Incomplete), Health::from_str("Incomplete"));
710 assert_eq!(Ok(Health::Canceled), Health::from_str("Canceled"));
711 assert_eq!(
712 Err(strum::ParseError::VariantNotFound),
713 Health::from_str("")
714 );
715 }
716
717 #[test(tokio::test)]
722 async fn test_basic() {
723 let state = SharedState::new_populated(PopulationParams::builder().jobs(50usize).build());
724 let server = LavaMock::new(
725 state.clone(),
726 PaginationLimits::builder().jobs(Some(7)).build(),
727 )
728 .await;
729
730 let mut map = BTreeMap::new();
731 let start = state.access();
732 for j in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
733 map.insert(j.id, j);
734 }
735
736 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
737
738 let mut lj = lava.jobs().query();
739
740 let mut seen = BTreeMap::new();
741 while let Some(job) = lj.try_next().await.expect("failed to get job") {
742 assert!(!seen.contains_key(&job.id));
743 assert!(map.contains_key(&job.id));
744 let jj = map.get(&job.id).unwrap();
745 assert_eq!(job.submitter, start.get(&jj.submitter).username);
746 assert_eq!(job.viewing_groups.len(), jj.viewing_groups.len());
747 for i in 0..job.viewing_groups.len() {
748 assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
749 }
750 assert_eq!(job.description, jj.description);
751 assert_eq!(job.health_check, jj.health_check);
752 assert_eq!(
753 job.requested_device_type.as_ref(),
754 jj.requested_device_type
755 .as_ref()
756 .map(|t| &start.get(t).name)
757 );
758
759 assert_eq!(job.tags.len(), jj.tags.len());
760 for i in 0..job.tags.len() {
761 assert_eq!(job.tags[i].id, start.get(&jj.tags[i]).id);
762 assert_eq!(job.tags[i].name, start.get(&jj.tags[i]).name);
763 assert_eq!(job.tags[i].description, start.get(&jj.tags[i]).description);
764 }
765
766 assert_eq!(
767 job.actual_device.as_ref(),
768 jj.actual_device.as_ref().map(|t| &start.get(t).hostname)
769 );
770 assert_eq!(Some(job.submit_time), jj.submit_time);
771 assert_eq!(job.start_time, jj.start_time);
772 assert_eq!(job.end_time, jj.end_time);
773 assert_eq!(job.state.to_string(), jj.state.to_string());
774 assert_eq!(job.health.to_string(), jj.health.to_string());
775 assert_eq!(job.priority, jj.priority);
776 assert_eq!(job.definition, jj.definition);
777 assert_eq!(job.original_definition, jj.original_definition);
778 assert_eq!(job.multinode_definition, jj.multinode_definition);
779
780 assert_eq!(job.failure_tags.len(), jj.failure_tags.len());
781 for i in 0..job.failure_tags.len() {
782 assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
783 }
784 assert_eq!(job.failure_comment, jj.failure_comment);
785
786 seen.insert(job.id, job.clone());
787 }
788 assert_eq!(seen.len(), 50);
789 }
790
791 #[test(tokio::test)]
796 async fn test_jobs_builder() {
797 let mut server = lava_api_mock::LavaMock::new(
798 SharedState::new_populated(
799 PopulationParams::builder()
800 .tags(5usize)
801 .jobs(0usize)
802 .build(),
803 ),
804 PaginationLimits::builder().jobs(Some(7)).build(),
805 )
806 .await;
807
808 let base_date = DateTime::parse_from_rfc3339("2022-04-10T16:30:00+01:00")
809 .unwrap()
810 .with_timezone(&Utc);
811
812 let mut gen = Proxy::<lava_api_mock::Job<lava_api_mock::State>>::generator()
813 .tags(SubsetsFromPersianRug::new())
814 .health(Repeat!(
815 MockJobHealth::Complete,
816 MockJobHealth::Incomplete,
817 MockJobHealth::Canceled,
818 MockJobHealth::Unknown
819 ))
820 .state(Repeat!(
821 MockJobState::Submitted,
822 MockJobState::Scheduling,
823 MockJobState::Scheduled,
824 MockJobState::Running,
825 MockJobState::Canceling,
826 MockJobState::Finished
827 ))
828 .submit_time(GSome(Time::new(
829 base_date - Duration::minutes(1),
830 Duration::minutes(-1),
831 )))
832 .start_time(GSome(Time::new(base_date, Duration::minutes(-1))))
833 .end_time(GSome(Time::new(base_date, Duration::seconds(-30))));
834
835 let _ = GeneratorWithPersianRugMutIterator::new(&mut gen, server.state_mut())
836 .take(50)
837 .collect::<Vec<_>>();
838
839 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
840
841 let mut lj = lava.jobs().state(State::Running).query();
842
843 let mut count = 0;
844 while let Some(job) = lj.try_next().await.expect("failed to get job") {
845 assert_eq!(job.state, State::Running);
846 count += 1;
847 }
848 assert_eq!(count, 8);
849
850 let mut lj = lava.jobs().state_not(State::Canceling).query();
851 let mut count = 0;
852 while let Some(job) = lj.try_next().await.expect("failed to get job") {
853 assert_ne!(job.state, State::Canceling);
854 count += 1;
855 }
856 assert_eq!(count, 42);
857
858 let mut lj = lava.jobs().health(Health::Incomplete).query();
859
860 let mut count = 0;
861 while let Some(job) = lj.try_next().await.expect("failed to get job") {
862 assert_eq!(job.health, Health::Incomplete);
863 count += 1;
864 }
865 assert_eq!(count, 13);
866
867 let mut lj = lava.jobs().health_not(Health::Canceled).query();
868 let mut count = 0;
869 while let Some(job) = lj.try_next().await.expect("failed to get job") {
870 assert_ne!(job.health, Health::Canceled);
871 count += 1;
872 }
873 assert_eq!(count, 38);
874
875 let mut lj = lava.jobs().id_after(9i64).query();
876 let mut count = 0;
877 while let Some(job) = lj.try_next().await.expect("failed to get job") {
878 assert!(job.id > 9i64);
879 count += 1;
880 }
881 assert_eq!(count, 40);
882
883 let job_35_start = DateTime::parse_from_rfc3339("2022-04-10T15:55:00+01:00")
884 .unwrap()
885 .with_timezone(&Utc);
886
887 let mut lj = lava.jobs().started_after(job_35_start).query();
888 let mut count = 0;
889 while let Some(job) = lj.try_next().await.expect("failed to get job") {
890 assert!(job.start_time.is_some() && job.start_time.unwrap() > job_35_start);
891 count += 1;
892 }
893 assert_eq!(count, 35);
894
895 let job_19_submit = DateTime::parse_from_rfc3339("2022-04-10T16:10:00+01:00")
896 .unwrap()
897 .with_timezone(&Utc);
898
899 let mut lj = lava.jobs().submitted_after(job_19_submit).query();
900 let mut count = 0;
901 while let Some(job) = lj.try_next().await.expect("failed to get job") {
902 assert!(job.submit_time > job_19_submit);
903 count += 1;
904 }
905 assert_eq!(count, 19);
906
907 let job_25_end = DateTime::parse_from_rfc3339("2022-04-10T16:17:30+01:00")
908 .unwrap()
909 .with_timezone(&Utc);
910
911 let mut lj = lava.jobs().ended_after(job_25_end).query();
912 let mut count = 0;
913 while let Some(job) = lj.try_next().await.expect("failed to get job") {
914 assert!(job.end_time.is_some() && job.end_time.unwrap() > job_25_end);
915 count += 1;
916 }
917 assert_eq!(count, 25);
918
919 let mut lj = lava.jobs().ordering(Ordering::SubmitTime, false).query();
920 let mut count = 0;
921 let mut prev = None;
922 while let Some(job) = lj.try_next().await.expect("failed to get job") {
923 if let Some(dt) = prev {
924 assert!(job.submit_time < dt);
925 }
926 prev = Some(job.submit_time);
927 count += 1;
928 }
929 assert_eq!(count, 50);
930
931 let mut lj = lava.jobs().ordering(Ordering::SubmitTime, true).query();
932 let mut count = 0;
933 let mut prev = None;
934 while let Some(job) = lj.try_next().await.expect("failed to get job") {
935 if let Some(dt) = prev {
936 assert!(job.submit_time > dt);
937 }
938 prev = Some(job.submit_time);
939 count += 1;
940 }
941 assert_eq!(count, 50);
942
943 let mut lj = lava.jobs().ordering(Ordering::StartTime, false).query();
944 let mut count = 0;
945 let mut prev = None;
946 while let Some(job) = lj.try_next().await.expect("failed to get job") {
947 if let Some(dt) = prev {
948 assert!(job.start_time < dt);
949 }
950 prev = Some(job.start_time);
951 count += 1;
952 }
953 assert_eq!(count, 50);
954
955 let mut lj = lava.jobs().ordering(Ordering::StartTime, true).query();
956 let mut count = 0;
957 let mut prev = None;
958 while let Some(job) = lj.try_next().await.expect("failed to get job") {
959 if let Some(dt) = prev {
960 assert!(job.start_time > dt);
961 }
962 prev = Some(job.start_time);
963 count += 1;
964 }
965 assert_eq!(count, 50);
966 }
967
968 #[test(tokio::test)]
969 async fn test_junit() {
970 let pop = PopulationParams::builder()
971 .jobs(3usize)
972 .test_suites(6usize)
973 .test_cases(20usize)
974 .build();
975 let state = SharedState::new_populated(pop);
976 let server = LavaMock::new(
977 state.clone(),
978 PaginationLimits::builder().test_cases(Some(6)).build(),
979 )
980 .await;
981
982 let mut map = BTreeMap::new();
983 let start = state.access();
984 for t in start.get_iter::<lava_api_mock::TestCase<lava_api_mock::State>>() {
985 map.insert(t.name.clone(), t.clone());
986 }
987
988 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
989 let mut seen = BTreeSet::new();
990
991 for job in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
992 let mut v = Vec::new();
993 lava.job_results_as_junit(job.id)
994 .await
995 .expect("failed to obtain junit output")
996 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
997 .into_async_read()
998 .read_to_end(&mut v)
999 .await
1000 .expect("failed to fully read junit output");
1001
1002 let report = junit_parser::from_reader(std::io::Cursor::new(v))
1003 .expect("failed to parse mock junit output");
1004
1005 for suite in report.suites.iter() {
1006 for test in suite.cases.iter() {
1007 assert!(!seen.contains(&test.name));
1008 assert!(map.contains_key(&test.name));
1009 let tt = map.get(&test.name).unwrap();
1010 match tt.result {
1011 PassFail::Pass => {
1012 assert!(test.status.is_success());
1013 }
1014 PassFail::Fail => {
1015 assert!(test.status.is_failure());
1016 }
1017 PassFail::Skip => {
1018 assert!(test.status.is_skipped());
1019 }
1020 PassFail::Unknown => {
1021 assert!(test.status.is_error());
1022 }
1023 }
1024 seen.insert(test.name.clone());
1025 }
1026 }
1027 }
1028 assert_eq!(seen.len(), 60);
1029 }
1030}