1use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use futures::future::BoxFuture;
6use futures::stream::{self, Stream, StreamExt};
7use futures::{FutureExt, TryStreamExt};
8use reqwest::StatusCode;
9use serde::{Deserialize, Serialize};
10use serde_with::DeserializeFromStr;
11use std::fmt;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
15use thiserror::Error;
16
17use crate::Lava;
18use crate::paginator::{PaginationError, Paginator};
19use crate::queryset::{QuerySet, QuerySetMember};
20use crate::tag::Tag;
21
22#[derive(
24 Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter, Display, EnumString, DeserializeFromStr,
25)]
26pub enum State {
27 Submitted,
28 Scheduling,
29 Scheduled,
30 Running,
31 Canceling,
32 Finished,
33}
34
35impl QuerySetMember for State {
36 type Iter = StateIter;
37 fn all() -> Self::Iter {
38 Self::iter()
39 }
40}
41
42#[derive(
44 Copy, Clone, Debug, PartialEq, Eq, Hash, EnumIter, EnumString, Display, DeserializeFromStr,
45)]
46pub enum Health {
47 Unknown,
49 Complete,
51 Incomplete,
53 Canceled,
54}
55
56impl QuerySetMember for Health {
57 type Iter = HealthIter;
58 fn all() -> Self::Iter {
59 Self::iter()
60 }
61}
62
/// The job field used to sort the results of a jobs query.
///
/// The [`fmt::Display`] form of each variant is the field name that
/// `JobsBuilder::query` places in the `ordering` query parameter.
///
/// Like the other field-less enums in this module, this type is `Copy` and
/// comparable, so callers can store, compare and reuse a chosen ordering
/// without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Ordering {
    Id,
    StartTime,
    EndTime,
    SubmitTime,
}

impl fmt::Display for Ordering {
    /// Write the query-parameter field name for this ordering.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let field = match self {
            Ordering::Id => "id",
            Ordering::StartTime => "start_time",
            Ordering::EndTime => "end_time",
            Ordering::SubmitTime => "submit_time",
        };
        // No formatting arguments are involved, so write the literal directly.
        f.write_str(field)
    }
}
85
86#[derive(Clone, Deserialize, Debug)]
87struct LavaJob {
88 id: i64,
89 submitter: String,
90 viewing_groups: Vec<i64>,
91 description: String,
92 health_check: bool,
93 requested_device_type: Option<String>,
94 tags: Vec<u32>,
95 actual_device: Option<String>,
96 submit_time: DateTime<Utc>,
97 start_time: Option<DateTime<Utc>>,
98 end_time: Option<DateTime<Utc>>,
99 state: State,
100 health: Health,
101 priority: i64,
102 definition: String,
103 original_definition: String,
104 multinode_definition: String,
105 failure_tags: Vec<u32>,
106 failure_comment: Option<String>,
107}
108
109#[derive(Clone, Debug, PartialEq, Eq)]
116pub struct Job {
117 pub id: i64,
118 pub submitter: String,
119 pub viewing_groups: Vec<i64>,
120 pub description: String,
121 pub health_check: bool,
122 pub requested_device_type: Option<String>,
123 pub tags: Vec<Tag>,
124 pub actual_device: Option<String>,
125 pub submit_time: DateTime<Utc>,
126 pub start_time: Option<DateTime<Utc>>,
127 pub end_time: Option<DateTime<Utc>>,
128 pub state: State,
129 pub health: Health,
130 pub priority: i64,
131 pub definition: String,
132 pub original_definition: String,
133 pub multinode_definition: String,
134 pub failure_tags: Vec<Tag>,
135 pub failure_comment: Option<String>,
136}
137
138enum PagingState<'a> {
139 Paging,
140 Transforming(BoxFuture<'a, Job>),
141}
142
143pub struct Jobs<'a> {
149 lava: &'a Lava,
150 paginator: Paginator<LavaJob>,
151 state: PagingState<'a>,
152}
153
154impl Jobs<'_> {
155 pub fn reported_items(&self) -> Option<u32> {
165 self.paginator.reported_items()
166 }
167}
168
169#[derive(Debug, Clone)]
206pub struct JobsBuilder<'a> {
207 lava: &'a Lava,
208 states: QuerySet<State>,
209 healths: QuerySet<Health>,
210 limit: Option<u32>,
211 ordering: Ordering,
212 ids: Vec<i64>,
213 id_after: Option<i64>,
214 started_after: Option<DateTime<Utc>>,
215 submitted_after: Option<DateTime<Utc>>,
216 ended_after: Option<DateTime<Utc>>,
217 ascending: bool,
218}
219
220impl<'a> JobsBuilder<'a> {
221 pub fn new(lava: &'a Lava) -> Self {
228 Self {
229 lava,
230 states: QuerySet::new(String::from("state")),
231 healths: QuerySet::new(String::from("health")),
232 limit: None,
233 ordering: Ordering::Id,
234 ids: Vec::new(),
235 id_after: None,
236 started_after: None,
237 submitted_after: None,
238 ended_after: None,
239 ascending: true,
240 }
241 }
242
243 pub fn state(mut self, state: State) -> Self {
245 self.states.include(state);
246 self
247 }
248
249 pub fn state_not(mut self, state: State) -> Self {
251 self.states.exclude(&state);
252 self
253 }
254
255 pub fn limit(mut self, limit: u32) -> Self {
282 self.limit = Some(limit);
283 self
284 }
285
286 pub fn health(mut self, health: Health) -> Self {
288 self.healths.include(health);
289 self
290 }
291
292 pub fn health_not(mut self, health: Health) -> Self {
294 self.healths.exclude(&health);
295 self
296 }
297
298 pub fn id(mut self, id: i64) -> Self {
300 self.ids.push(id);
301 self
302 }
303
304 pub fn id_after(mut self, id: i64) -> Self {
306 self.id_after = Some(id);
307 self
308 }
309
310 pub fn started_after(mut self, when: chrono::DateTime<Utc>) -> Self {
313 self.started_after = Some(when);
314 self
315 }
316
317 pub fn submitted_after(mut self, when: chrono::DateTime<Utc>) -> Self {
320 self.submitted_after = Some(when);
321 self
322 }
323
324 pub fn ended_after(mut self, when: chrono::DateTime<Utc>) -> Self {
326 self.ended_after = Some(when);
327 self
328 }
329
330 pub fn ordering(mut self, ordering: Ordering, ascending: bool) -> Self {
332 self.ordering = ordering;
333 self.ascending = ascending;
334 self
335 }
336
337 pub fn query(self) -> Jobs<'a> {
339 let mut url = self
340 .lava
341 .base
342 .join("jobs/")
343 .expect("Failed to append to base url");
344 url.query_pairs_mut().append_pair(
345 "ordering",
346 &format!(
347 "{}{}",
348 match self.ascending {
349 true => "",
350 false => "-",
351 },
352 self.ordering
353 ),
354 );
355 if let Some(pair) = self.states.query() {
356 url.query_pairs_mut().append_pair(&pair.0, &pair.1);
357 }
358 if let Some(limit) = self.limit {
359 url.query_pairs_mut()
360 .append_pair("limit", &limit.to_string());
361 };
362 if let Some(pair) = self.healths.query() {
363 url.query_pairs_mut().append_pair(&pair.0, &pair.1);
364 }
365
366 match self.ids.len() {
367 0 => (),
368 1 => {
369 url.query_pairs_mut()
370 .append_pair("id", &self.ids[0].to_string());
371 }
372 _ => {
373 let ids: Vec<_> = self.ids.iter().map(i64::to_string).collect();
374 url.query_pairs_mut().append_pair("id__in", &ids.join(","));
375 }
376 }
377
378 if let Some(id_after) = self.id_after {
379 url.query_pairs_mut()
380 .append_pair("id__gt", &id_after.to_string());
381 };
382 if let Some(started_after) = self.started_after {
383 url.query_pairs_mut()
384 .append_pair("start_time__gt", &started_after.to_rfc3339());
385 };
386 if let Some(submitted_after) = self.submitted_after {
387 url.query_pairs_mut()
388 .append_pair("submit_time__gt", &submitted_after.to_rfc3339());
389 };
390 if let Some(ended_after) = self.ended_after {
391 url.query_pairs_mut()
392 .append_pair("end_time__gt", &ended_after.to_rfc3339());
393 };
394
395 let paginator = Paginator::new(self.lava.client.clone(), url);
396 Jobs {
397 lava: self.lava,
398 paginator,
399 state: PagingState::Paging,
400 }
401 }
402}
403
404async fn transform_job(job: LavaJob, lava: &Lava) -> Job {
405 let t = stream::iter(job.tags.iter());
406 let tags = t
407 .filter_map(|i| async move { lava.tag(*i).await })
408 .collect()
409 .await;
410
411 let t = stream::iter(job.failure_tags.iter());
412 let failure_tags = t
413 .filter_map(|i| async move { lava.tag(*i).await })
414 .collect()
415 .await;
416
417 Job {
418 id: job.id,
419 submitter: job.submitter,
420 viewing_groups: job.viewing_groups,
421 description: job.description,
422 health_check: job.health_check,
423 requested_device_type: job.requested_device_type,
424 tags,
425 actual_device: job.actual_device,
426 submit_time: job.submit_time,
427 start_time: job.start_time,
428 end_time: job.end_time,
429 state: job.state,
430 health: job.health,
431 priority: job.priority,
432 definition: job.definition,
433 original_definition: job.original_definition,
434 multinode_definition: job.multinode_definition,
435 failure_tags,
436 failure_comment: job.failure_comment,
437 }
438}
439
440impl Stream for Jobs<'_> {
441 type Item = Result<Job, PaginationError>;
442
443 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
444 let me = self.get_mut();
445
446 loop {
447 return match &mut me.state {
448 PagingState::Paging => {
449 let p = Pin::new(&mut me.paginator);
450 match p.poll_next(cx) {
451 Poll::Ready(None) => Poll::Ready(None),
452 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
453 Poll::Ready(Some(Ok(d))) => {
454 me.state = PagingState::Transforming(transform_job(d, me.lava).boxed());
455 continue;
456 }
457 Poll::Pending => Poll::Pending,
458 }
459 }
460 PagingState::Transforming(fut) => match fut.as_mut().poll(cx) {
461 Poll::Ready(d) => {
462 me.state = PagingState::Paging;
463 Poll::Ready(Some(Ok(d)))
464 }
465 Poll::Pending => Poll::Pending,
466 },
467 };
468 }
469 }
470}
471
472#[derive(Error, Debug)]
473pub enum SubmissionError {
474 #[error("Request failed {0}")]
475 Request(#[from] reqwest::Error),
476 #[error("Invalid job: {0}")]
477 InvalidJob(String),
478 #[error("Unexpected reply: {0}")]
479 UnexpectedReply(reqwest::StatusCode),
480}
481
482#[derive(Debug, Serialize)]
483struct Submission<'a> {
484 definition: &'a str,
485}
486
487#[derive(Debug, Deserialize)]
488struct SubmissionReply {
489 message: String,
490 #[serde(default)]
491 job_ids: Vec<i64>,
492}
493
494pub async fn submit_job(lava: &Lava, definition: &str) -> Result<Vec<i64>, SubmissionError> {
495 let url = lava
496 .base
497 .join("jobs/")
498 .expect("Failed to append to base url");
499 let sub = Submission { definition };
500
501 let post = lava.client.post(url).json(&sub).send().await?;
502
503 match post.status() {
504 StatusCode::CREATED => {
505 let reply: SubmissionReply = post.json().await?;
506 Ok(reply.job_ids)
507 }
508 StatusCode::BAD_REQUEST => {
509 let reply: SubmissionReply = post.json().await?;
510 Err(SubmissionError::InvalidJob(reply.message))
511 }
512 s => Err(SubmissionError::UnexpectedReply(s)),
513 }
514}
515
516#[derive(Error, Debug)]
517pub enum CancellationError {
518 #[error("Request failed {0}")]
519 Request(#[from] reqwest::Error),
520 #[error("Unexpected reply: {0}")]
521 UnexpectedReply(reqwest::StatusCode),
522}
523
524pub async fn cancel_job(lava: &Lava, id: i64) -> Result<(), CancellationError> {
525 let mut url = lava.base.clone();
526 url.path_segments_mut()
527 .unwrap()
528 .pop_if_empty()
529 .push("jobs")
530 .push(&id.to_string())
531 .push("cancel")
532 .push("");
533
534 let res = lava.client.get(url).send().await?;
535
536 match res.status() {
537 StatusCode::OK => Ok(()),
538 s => Err(CancellationError::UnexpectedReply(s)),
539 }
540}
541
542#[derive(Error, Debug)]
543pub enum ResultsError {
544 #[error("Request failed {0}")]
545 Request(#[from] reqwest::Error),
546 #[error("Unexpected reply: {0}")]
547 UnexpectedReply(reqwest::StatusCode),
548}
549
550pub async fn job_results_as_junit(
551 lava: &Lava,
552 id: i64,
553) -> Result<impl Stream<Item = Result<Bytes, ResultsError>> + Send + Unpin + '_, ResultsError> {
554 let mut url = lava.base.clone();
555 url.path_segments_mut()
556 .unwrap()
557 .pop_if_empty()
558 .push("jobs")
559 .push(&id.to_string())
560 .push("junit")
561 .push("");
562
563 let res = lava.client.get(url).send().await?;
564 match res.status() {
565 StatusCode::OK => Ok(res.bytes_stream().map_err(ResultsError::from)),
566 s => Err(ResultsError::UnexpectedReply(s)),
567 }
568}
569
570#[cfg(test)]
571mod tests {
572 use super::{Health, Job, Ordering, State, Tag};
573 use crate::Lava;
574
575 use boulder::{
576 Buildable, Builder, GeneratableWithPersianRug, GeneratorWithPersianRugMutIterator, Repeat,
577 Some as GSome, SubsetsFromPersianRug, Time,
578 };
579 use chrono::{DateTime, Duration, Utc};
580 use futures::{AsyncReadExt, TryStreamExt};
581 use lava_api_mock::{
582 Device as MockDevice, DeviceType as MockDeviceType, Group as MockGroup, Job as MockJob,
583 JobHealth as MockJobHealth, JobState as MockJobState, LavaMock, PaginationLimits, PassFail,
584 PopulationParams, SharedState, Tag as MockTag, User as MockUser,
585 };
586 use persian_rug::{Accessor, Context, Proxy};
587 use std::collections::{BTreeMap, BTreeSet};
588 use std::str::FromStr;
589 use test_log::test;
590
591 impl Job {
592 #[persian_rug::constraints(
593 context = C,
594 access(
595 MockUser<C>,
596 MockGroup<C>,
597 MockTag<C>,
598 MockDevice<C>,
599 MockDeviceType<C>
600 )
601 )]
602 pub fn from_mock<'b, B, C>(job: &MockJob<C>, context: B) -> Job
603 where
604 B: 'b + Accessor<Context = C>,
605 C: Context + 'static,
606 {
607 Self {
608 id: job.id,
609 submitter: context.get(&job.submitter).username.clone(),
610 viewing_groups: job
611 .viewing_groups
612 .iter()
613 .map(|g| context.get(g).id)
614 .collect::<Vec<_>>(),
615 description: job.description.clone(),
616 health_check: job.health_check,
617 requested_device_type: job
618 .requested_device_type
619 .map(|d| context.get(&d).name.to_string()),
620 tags: job
621 .tags
622 .iter()
623 .map(|t| Tag::from_mock(context.get(t), context.clone()))
624 .collect::<Vec<_>>(),
625 actual_device: job
626 .actual_device
627 .as_ref()
628 .map(|d| context.get(d).hostname.to_string()),
629 submit_time: job.submit_time.unwrap(),
630 start_time: job.start_time,
631 end_time: job.end_time,
632 state: job.state.into(),
633 health: job.health.into(),
634 priority: job.priority,
635 definition: job.definition.clone(),
636 original_definition: job.original_definition.clone(),
637 multinode_definition: job.multinode_definition.clone(),
638 failure_tags: job
639 .failure_tags
640 .iter()
641 .map(|t| Tag::from_mock(context.get(t), context.clone()))
642 .collect::<Vec<_>>(),
643 failure_comment: job.failure_comment.clone(),
644 }
645 }
646 }
647
648 impl From<MockJobState> for State {
649 fn from(state: MockJobState) -> State {
650 use State::*;
651
652 match state {
653 MockJobState::Submitted => Submitted,
654 MockJobState::Scheduling => Scheduling,
655 MockJobState::Scheduled => Scheduled,
656 MockJobState::Running => Running,
657 MockJobState::Canceling => Canceling,
658 MockJobState::Finished => Finished,
659 }
660 }
661 }
662
663 impl From<MockJobHealth> for Health {
664 fn from(health: MockJobHealth) -> Health {
665 use Health::*;
666
667 match health {
668 MockJobHealth::Unknown => Unknown,
669 MockJobHealth::Complete => Complete,
670 MockJobHealth::Incomplete => Incomplete,
671 MockJobHealth::Canceled => Canceled,
672 }
673 }
674 }
675
/// Every `State` and `Health` variant renders as its own name.
#[test]
fn test_display() {
    let states = [
        (State::Submitted, "Submitted"),
        (State::Scheduling, "Scheduling"),
        (State::Scheduled, "Scheduled"),
        (State::Running, "Running"),
        (State::Canceling, "Canceling"),
        (State::Finished, "Finished"),
    ];
    for (state, text) in states {
        assert_eq!(state.to_string(), text);
    }

    let healths = [
        (Health::Unknown, "Unknown"),
        (Health::Complete, "Complete"),
        (Health::Incomplete, "Incomplete"),
        (Health::Canceled, "Canceled"),
    ];
    for (health, text) in healths {
        assert_eq!(health.to_string(), text);
    }
}
690
/// Every `State` and `Health` variant parses from its own name, and
/// unrecognised input is rejected with `VariantNotFound`.
#[test]
fn test_from_str() {
    let states = [
        ("Submitted", State::Submitted),
        ("Scheduling", State::Scheduling),
        ("Scheduled", State::Scheduled),
        ("Running", State::Running),
        ("Canceling", State::Canceling),
        ("Finished", State::Finished),
    ];
    for (text, state) in states {
        assert_eq!(Ok(state), State::from_str(text));
    }
    assert_eq!(
        Err(strum::ParseError::VariantNotFound),
        State::from_str("womble")
    );

    let healths = [
        ("Unknown", Health::Unknown),
        ("Complete", Health::Complete),
        ("Incomplete", Health::Incomplete),
        ("Canceled", Health::Canceled),
    ];
    for (text, health) in healths {
        assert_eq!(Ok(health), Health::from_str(text));
    }
    assert_eq!(
        Err(strum::ParseError::VariantNotFound),
        Health::from_str("")
    );
}
713
714 #[test(tokio::test)]
719 async fn test_basic() {
720 let state = SharedState::new_populated(PopulationParams::builder().jobs(50usize).build());
721 let server = LavaMock::new(
722 state.clone(),
723 PaginationLimits::builder().jobs(Some(7)).build(),
724 )
725 .await;
726
727 let mut map = BTreeMap::new();
728 let start = state.access();
729 for j in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
730 map.insert(j.id, j);
731 }
732
733 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
734
735 let mut lj = lava.jobs().query();
736
737 let mut seen = BTreeMap::new();
738 while let Some(job) = lj.try_next().await.expect("failed to get job") {
739 assert!(!seen.contains_key(&job.id));
740 assert!(map.contains_key(&job.id));
741 let jj = map.get(&job.id).unwrap();
742 assert_eq!(job.submitter, start.get(&jj.submitter).username);
743 assert_eq!(job.viewing_groups.len(), jj.viewing_groups.len());
744 for i in 0..job.viewing_groups.len() {
745 assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
746 }
747 assert_eq!(job.description, jj.description);
748 assert_eq!(job.health_check, jj.health_check);
749 assert_eq!(
750 job.requested_device_type.as_ref(),
751 jj.requested_device_type
752 .as_ref()
753 .map(|t| &start.get(t).name)
754 );
755
756 assert_eq!(job.tags.len(), jj.tags.len());
757 for i in 0..job.tags.len() {
758 assert_eq!(job.tags[i].id, start.get(&jj.tags[i]).id);
759 assert_eq!(job.tags[i].name, start.get(&jj.tags[i]).name);
760 assert_eq!(job.tags[i].description, start.get(&jj.tags[i]).description);
761 }
762
763 assert_eq!(
764 job.actual_device.as_ref(),
765 jj.actual_device.as_ref().map(|t| &start.get(t).hostname)
766 );
767 assert_eq!(Some(job.submit_time), jj.submit_time);
768 assert_eq!(job.start_time, jj.start_time);
769 assert_eq!(job.end_time, jj.end_time);
770 assert_eq!(job.state.to_string(), jj.state.to_string());
771 assert_eq!(job.health.to_string(), jj.health.to_string());
772 assert_eq!(job.priority, jj.priority);
773 assert_eq!(job.definition, jj.definition);
774 assert_eq!(job.original_definition, jj.original_definition);
775 assert_eq!(job.multinode_definition, jj.multinode_definition);
776
777 assert_eq!(job.failure_tags.len(), jj.failure_tags.len());
778 for i in 0..job.failure_tags.len() {
779 assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
780 }
781 assert_eq!(job.failure_comment, jj.failure_comment);
782
783 seen.insert(job.id, job.clone());
784 }
785 assert_eq!(seen.len(), 50);
786 }
787
788 #[test(tokio::test)]
793 async fn test_jobs_builder() {
794 let mut server = lava_api_mock::LavaMock::new(
795 SharedState::new_populated(
796 PopulationParams::builder()
797 .tags(5usize)
798 .jobs(0usize)
799 .build(),
800 ),
801 PaginationLimits::builder().jobs(Some(7)).build(),
802 )
803 .await;
804
805 let base_date = DateTime::parse_from_rfc3339("2022-04-10T16:30:00+01:00")
806 .unwrap()
807 .with_timezone(&Utc);
808
809 let mut item_gen = Proxy::<lava_api_mock::Job<lava_api_mock::State>>::generator()
810 .tags(SubsetsFromPersianRug::new())
811 .health(Repeat!(
812 MockJobHealth::Complete,
813 MockJobHealth::Incomplete,
814 MockJobHealth::Canceled,
815 MockJobHealth::Unknown
816 ))
817 .state(Repeat!(
818 MockJobState::Submitted,
819 MockJobState::Scheduling,
820 MockJobState::Scheduled,
821 MockJobState::Running,
822 MockJobState::Canceling,
823 MockJobState::Finished
824 ))
825 .submit_time(GSome(Time::new(
826 base_date - Duration::minutes(1),
827 Duration::minutes(-1),
828 )))
829 .start_time(GSome(Time::new(base_date, Duration::minutes(-1))))
830 .end_time(GSome(Time::new(base_date, Duration::seconds(-30))));
831
832 let _ = GeneratorWithPersianRugMutIterator::new(&mut item_gen, server.state_mut())
833 .take(50)
834 .collect::<Vec<_>>();
835
836 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
837
838 let mut lj = lava.jobs().state(State::Running).query();
839
840 let mut count = 0;
841 while let Some(job) = lj.try_next().await.expect("failed to get job") {
842 assert_eq!(job.state, State::Running);
843 count += 1;
844 }
845 assert_eq!(count, 8);
846
847 let mut lj = lava.jobs().state_not(State::Canceling).query();
848 let mut count = 0;
849 while let Some(job) = lj.try_next().await.expect("failed to get job") {
850 assert_ne!(job.state, State::Canceling);
851 count += 1;
852 }
853 assert_eq!(count, 42);
854
855 let mut lj = lava.jobs().health(Health::Incomplete).query();
856
857 let mut count = 0;
858 while let Some(job) = lj.try_next().await.expect("failed to get job") {
859 assert_eq!(job.health, Health::Incomplete);
860 count += 1;
861 }
862 assert_eq!(count, 13);
863
864 let mut lj = lava.jobs().health_not(Health::Canceled).query();
865 let mut count = 0;
866 while let Some(job) = lj.try_next().await.expect("failed to get job") {
867 assert_ne!(job.health, Health::Canceled);
868 count += 1;
869 }
870 assert_eq!(count, 38);
871
872 let mut lj = lava.jobs().id_after(9i64).query();
873 let mut count = 0;
874 while let Some(job) = lj.try_next().await.expect("failed to get job") {
875 assert!(job.id > 9i64);
876 count += 1;
877 }
878 assert_eq!(count, 40);
879
880 let job_35_start = DateTime::parse_from_rfc3339("2022-04-10T15:55:00+01:00")
881 .unwrap()
882 .with_timezone(&Utc);
883
884 let mut lj = lava.jobs().started_after(job_35_start).query();
885 let mut count = 0;
886 while let Some(job) = lj.try_next().await.expect("failed to get job") {
887 assert!(job.start_time.is_some() && job.start_time.unwrap() > job_35_start);
888 count += 1;
889 }
890 assert_eq!(count, 35);
891
892 let job_19_submit = DateTime::parse_from_rfc3339("2022-04-10T16:10:00+01:00")
893 .unwrap()
894 .with_timezone(&Utc);
895
896 let mut lj = lava.jobs().submitted_after(job_19_submit).query();
897 let mut count = 0;
898 while let Some(job) = lj.try_next().await.expect("failed to get job") {
899 assert!(job.submit_time > job_19_submit);
900 count += 1;
901 }
902 assert_eq!(count, 19);
903
904 let job_25_end = DateTime::parse_from_rfc3339("2022-04-10T16:17:30+01:00")
905 .unwrap()
906 .with_timezone(&Utc);
907
908 let mut lj = lava.jobs().ended_after(job_25_end).query();
909 let mut count = 0;
910 while let Some(job) = lj.try_next().await.expect("failed to get job") {
911 assert!(job.end_time.is_some() && job.end_time.unwrap() > job_25_end);
912 count += 1;
913 }
914 assert_eq!(count, 25);
915
916 let mut lj = lava.jobs().ordering(Ordering::SubmitTime, false).query();
917 let mut count = 0;
918 let mut prev = None;
919 while let Some(job) = lj.try_next().await.expect("failed to get job") {
920 if let Some(dt) = prev {
921 assert!(job.submit_time < dt);
922 }
923 prev = Some(job.submit_time);
924 count += 1;
925 }
926 assert_eq!(count, 50);
927
928 let mut lj = lava.jobs().ordering(Ordering::SubmitTime, true).query();
929 let mut count = 0;
930 let mut prev = None;
931 while let Some(job) = lj.try_next().await.expect("failed to get job") {
932 if let Some(dt) = prev {
933 assert!(job.submit_time > dt);
934 }
935 prev = Some(job.submit_time);
936 count += 1;
937 }
938 assert_eq!(count, 50);
939
940 let mut lj = lava.jobs().ordering(Ordering::StartTime, false).query();
941 let mut count = 0;
942 let mut prev = None;
943 while let Some(job) = lj.try_next().await.expect("failed to get job") {
944 if let Some(dt) = prev {
945 assert!(job.start_time < dt);
946 }
947 prev = Some(job.start_time);
948 count += 1;
949 }
950 assert_eq!(count, 50);
951
952 let mut lj = lava.jobs().ordering(Ordering::StartTime, true).query();
953 let mut count = 0;
954 let mut prev = None;
955 while let Some(job) = lj.try_next().await.expect("failed to get job") {
956 if let Some(dt) = prev {
957 assert!(job.start_time > dt);
958 }
959 prev = Some(job.start_time);
960 count += 1;
961 }
962 assert_eq!(count, 50);
963 }
964
965 #[test(tokio::test)]
966 async fn test_junit() {
967 let pop = PopulationParams::builder()
968 .jobs(3usize)
969 .test_suites(6usize)
970 .test_cases(20usize)
971 .build();
972 let state = SharedState::new_populated(pop);
973 let server = LavaMock::new(
974 state.clone(),
975 PaginationLimits::builder().test_cases(Some(6)).build(),
976 )
977 .await;
978
979 let mut map = BTreeMap::new();
980 let start = state.access();
981 for t in start.get_iter::<lava_api_mock::TestCase<lava_api_mock::State>>() {
982 map.insert(t.name.clone(), t.clone());
983 }
984
985 let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
986 let mut seen = BTreeSet::new();
987
988 for job in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
989 let mut v = Vec::new();
990 lava.job_results_as_junit(job.id)
991 .await
992 .expect("failed to obtain junit output")
993 .map_err(std::io::Error::other)
994 .into_async_read()
995 .read_to_end(&mut v)
996 .await
997 .expect("failed to fully read junit output");
998
999 let report = junit_parser::from_reader(std::io::Cursor::new(v))
1000 .expect("failed to parse mock junit output");
1001
1002 for suite in report.suites.iter() {
1003 for test in suite.cases.iter() {
1004 assert!(!seen.contains(&test.name));
1005 assert!(map.contains_key(&test.name));
1006 let tt = map.get(&test.name).unwrap();
1007 match tt.result {
1008 PassFail::Pass => {
1009 assert!(test.status.is_success());
1010 }
1011 PassFail::Fail => {
1012 assert!(test.status.is_failure());
1013 }
1014 PassFail::Skip => {
1015 assert!(test.status.is_skipped());
1016 }
1017 PassFail::Unknown => {
1018 assert!(test.status.is_error());
1019 }
1020 }
1021 seen.insert(test.name.clone());
1022 }
1023 }
1024 }
1025 assert_eq!(seen.len(), 60);
1026 }
1027}