// lava_api/job.rs
1//! Retrieve jobs
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use futures::future::BoxFuture;
6use futures::stream::{self, Stream, StreamExt};
7use futures::{FutureExt, TryStreamExt};
8use reqwest::StatusCode;
9use serde::{Deserialize, Serialize};
10use serde_with::DeserializeFromStr;
11use std::fmt;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
15use thiserror::Error;
16
17use crate::Lava;
18use crate::paginator::{PaginationError, Paginator};
19use crate::queryset::{QuerySet, QuerySetMember};
20use crate::tag::Tag;
21
/// The progress of a job through the system.
#[derive(
    Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter, Display, EnumString, DeserializeFromStr,
)]
pub enum State {
    // Variants render as (and parse from) their exact names via the
    // `Display`/`EnumString`/`DeserializeFromStr` derives, matching the
    // server's `state` field values.
    Submitted,
    Scheduling,
    Scheduled,
    Running,
    Canceling,
    Finished,
}
34
// Lets `State` be used in a [`QuerySet`] for include/exclude filtering
// (see [`JobsBuilder::state`] and [`JobsBuilder::state_not`]).
impl QuerySetMember for State {
    type Iter = StateIter;
    fn all() -> Self::Iter {
        // `StateIter` is generated by the `EnumIter` derive on `State`.
        Self::iter()
    }
}
41
/// The completion state of a job.
#[derive(
    Copy, Clone, Debug, PartialEq, Eq, Hash, EnumIter, EnumString, Display, DeserializeFromStr,
)]
pub enum Health {
    /// Unknown is the usual state before the job has finished.
    Unknown,
    /// Complete is used as the success state in general.
    Complete,
    /// Incomplete is used as the error state.
    Incomplete,
    /// Canceled is used when the job was canceled rather than run to
    /// completion.
    Canceled,
}
55
// Lets `Health` be used in a [`QuerySet`] for include/exclude filtering
// (see [`JobsBuilder::health`] and [`JobsBuilder::health_not`]).
impl QuerySetMember for Health {
    type Iter = HealthIter;
    fn all() -> Self::Iter {
        // `HealthIter` is generated by the `EnumIter` derive on `Health`.
        Self::iter()
    }
}
62
/// The possible orderings in which jobs can be returned
///
/// These are usually combined with a [`bool`] in use, indicating
/// whether the order is to be ascending or descending.
#[derive(Debug, Clone)]
pub enum Ordering {
    /// Order by job id (serialized as `id` in the query).
    Id,
    /// Order by start time (serialized as `start_time`).
    StartTime,
    /// Order by end time (serialized as `end_time`).
    EndTime,
    /// Order by submission time (serialized as `submit_time`).
    SubmitTime,
}
74
75impl fmt::Display for Ordering {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        match self {
78            Ordering::Id => write!(f, "id"),
79            Ordering::StartTime => write!(f, "start_time"),
80            Ordering::EndTime => write!(f, "end_time"),
81            Ordering::SubmitTime => write!(f, "submit_time"),
82        }
83    }
84}
85
// The raw job record as deserialized from the LAVA REST API.
//
// This is the wire format; it is turned into the public [`Job`] type
// by `transform_job`, which resolves the numeric `tags` and
// `failure_tags` ids into full [`Tag`] objects.
#[derive(Clone, Deserialize, Debug)]
struct LavaJob {
    id: i64,
    submitter: String,
    viewing_groups: Vec<i64>,
    description: String,
    health_check: bool,
    requested_device_type: Option<String>,
    // Tag ids, not yet resolved into `Tag` objects.
    tags: Vec<u32>,
    actual_device: Option<String>,
    submit_time: DateTime<Utc>,
    start_time: Option<DateTime<Utc>>,
    end_time: Option<DateTime<Utc>>,
    state: State,
    health: Health,
    priority: i64,
    definition: String,
    original_definition: String,
    multinode_definition: String,
    // Tag ids, not yet resolved into `Tag` objects.
    failure_tags: Vec<u32>,
    failure_comment: Option<String>,
}
108
/// The data available for a job from the LAVA API
///
/// Note that [`tags`](Job::tags) have been resolved into [`Tag`]
/// objects, rather than tag ids, but that
/// [`viewing_groups`](Job::viewing_groups) and
/// [`failure_tags`](Job::failure_tags) have not.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Job {
    /// The unique id of the job on the server.
    pub id: i64,
    /// The username of the submitter.
    pub submitter: String,
    /// Ids of the groups that may view this job (not resolved).
    pub viewing_groups: Vec<i64>,
    pub description: String,
    /// Whether this job is a health check.
    pub health_check: bool,
    /// The name of the requested device type, if one was requested.
    pub requested_device_type: Option<String>,
    /// The tags on this job, resolved into [`Tag`] objects.
    pub tags: Vec<Tag>,
    /// The hostname of the device the job ran on, if one was assigned.
    pub actual_device: Option<String>,
    pub submit_time: DateTime<Utc>,
    /// `None` if the job has not yet started.
    pub start_time: Option<DateTime<Utc>>,
    /// `None` if the job has not yet ended.
    pub end_time: Option<DateTime<Utc>>,
    pub state: State,
    pub health: Health,
    pub priority: i64,
    pub definition: String,
    pub original_definition: String,
    pub multinode_definition: String,
    /// The failure tags on this job, resolved into [`Tag`] objects.
    pub failure_tags: Vec<Tag>,
    pub failure_comment: Option<String>,
}
137
// Internal state machine for the `Jobs` stream: either pulling the
// next raw job from the paginator, or awaiting the transformation
// (tag-id resolution) of a raw job into a public `Job`.
enum PagingState<'a> {
    Paging,
    Transforming(BoxFuture<'a, Job>),
}
142
/// A [`Stream`] that yields a selected subset of the [`Job`]
/// instances on a LAVA server.
///
/// These are constructed using a [`JobsBuilder`]; there is no `new`
/// method on this struct.
pub struct Jobs<'a> {
    // Needed to resolve tag ids while transforming each raw job.
    lava: &'a Lava,
    // Source of raw `LavaJob` records from the server.
    paginator: Paginator<LavaJob>,
    // Which phase of the paginate-then-transform loop we are in.
    state: PagingState<'a>,
}
153
impl Jobs<'_> {
    /// The server's latest report of how many [`Job`] instances are
    /// in the result set.
    ///
    /// Note that this is not the total number of instances, only the
    /// total number matching the query. Also, note that the number is
    /// subject to change as the stream is read, owing to pagination;
    /// this number will always be the number of results most recently
    /// reported, and can be an over- or under-estimate by the time
    /// the stream is drained.
    pub fn reported_items(&self) -> Option<u32> {
        // Delegates to the paginator, which holds the count most
        // recently reported by the server.
        self.paginator.reported_items()
    }
}
168
169/// Select a set of [`Job`] instances to return from the LAVA server.
170///
171/// This is the way to construct a [`Jobs`] object, which can stream
172/// the actual data. It allows customisation of which jobs to return,
173/// and in what order.
174///
175/// Example:
176/// ```rust
177/// use futures::stream::TryStreamExt;
178/// # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState};
179/// use lava_api::{Lava, job::State, job::Ordering};
180/// #
181/// # tokio_test::block_on( async {
182/// # let limits = PaginationLimits::new();
183/// # let population = PopulationParams::new();
184/// # let mock = LavaMock::new(SharedState::new_populated(population), limits).await;
185/// # let service_uri = mock.uri();
186/// # let lava_token = None;
187///
188/// let lava = Lava::new(&service_uri, lava_token).expect("failed to make lava");
189///
190/// let mut lj = lava
191///     .jobs()
192///     .state(State::Submitted)
193///     .ordering(Ordering::StartTime, true)
194///     .query();
195///
196/// while let Some(job) = lj
197///     .try_next()
198///     .await
199///     .expect("failed to get job")
200/// {
201///     println!("Got job {:?}", job);
202/// }
203/// # });
204/// ```
#[derive(Debug, Clone)]
pub struct JobsBuilder<'a> {
    // The client used to issue the query.
    lava: &'a Lava,
    // Include/exclude filters serialized as the `state` query field.
    states: QuerySet<State>,
    // Include/exclude filters serialized as the `health` query field.
    healths: QuerySet<Health>,
    // Page size for the underlying pagination, if overridden.
    limit: Option<u32>,
    // Sort key for the returned jobs.
    ordering: Ordering,
    // Explicit job ids to select; empty means no id filter.
    ids: Vec<i64>,
    // Lower (exclusive) bound on job id, if any.
    id_after: Option<i64>,
    // Lower (exclusive) bounds on the respective timestamps, if any.
    started_after: Option<DateTime<Utc>>,
    submitted_after: Option<DateTime<Utc>>,
    ended_after: Option<DateTime<Utc>>,
    // Whether `ordering` is applied ascending (true) or descending.
    ascending: bool,
}
219
220impl<'a> JobsBuilder<'a> {
221    /// Create a new [`JobsBuilder`]
222    ///
223    /// The default query is:
224    /// - order by [`Ordering::Id`]
225    /// - no filtering
226    /// - default result pagination
227    pub fn new(lava: &'a Lava) -> Self {
228        Self {
229            lava,
230            states: QuerySet::new(String::from("state")),
231            healths: QuerySet::new(String::from("health")),
232            limit: None,
233            ordering: Ordering::Id,
234            ids: Vec::new(),
235            id_after: None,
236            started_after: None,
237            submitted_after: None,
238            ended_after: None,
239            ascending: true,
240        }
241    }
242
243    /// Return jobs in this state.
244    pub fn state(mut self, state: State) -> Self {
245        self.states.include(state);
246        self
247    }
248
249    /// Exclude jobs in this state.
250    pub fn state_not(mut self, state: State) -> Self {
251        self.states.exclude(&state);
252        self
253    }
254
255    /// Set the number of jobs retrieved at a time while the query is
256    /// running. The query will be processed transparently as a
257    /// sequence of requests that return all matching responses. This
258    /// setting governs the size of each of the (otherwise
259    /// transparent) requests, so this number is really a page size.
260    ///
261    /// Note that you will see artifacts on queries that are split
262    /// into many requests, especially when responses are slow. This
263    /// makes setting the limit much smaller than the response size
264    /// unattractive when accurate data is required. However, the
265    /// server will need to return records in chunks of this size,
266    /// regardless of how many are consumed from the response stream,
267    /// which makes setting the limit much higher than the response
268    /// size wasteful. In practice, it is probably best to set this
269    /// limit to the expected response size for most use cases.
270    ///
271    /// Artifacts occur when paging occurs, because paging is entirely
272    /// client side. Each page contains a section of the query
273    /// begining with the job at some multiple of the limit count into
274    /// the result set.  However the result set is evolving while the
275    /// paging is occurring, and this is not currently compensated
276    /// for, which leads to jobs being returned multiple times at the
277    /// boundaries between pages - or even omitted depending on the
278    /// query. In general, query sets that can shrink are not safe to
279    /// use with paging, because results can be lost rather than
280    /// duplicated.
281    pub fn limit(mut self, limit: u32) -> Self {
282        self.limit = Some(limit);
283        self
284    }
285
286    /// Return jobs with this health.
287    pub fn health(mut self, health: Health) -> Self {
288        self.healths.include(health);
289        self
290    }
291
292    /// Exclude jobs with this health.
293    pub fn health_not(mut self, health: Health) -> Self {
294        self.healths.exclude(&health);
295        self
296    }
297
298    /// Return only jobs whose id is `id`.
299    pub fn id(mut self, id: i64) -> Self {
300        self.ids.push(id);
301        self
302    }
303
304    /// Return only jobs whose id is strictly greater than `id`.
305    pub fn id_after(mut self, id: i64) -> Self {
306        self.id_after = Some(id);
307        self
308    }
309
310    /// Return only jobs whose start time is strictly after the given
311    /// instant.
312    pub fn started_after(mut self, when: chrono::DateTime<Utc>) -> Self {
313        self.started_after = Some(when);
314        self
315    }
316
317    /// Return only jobs whose submission time is strictly after the
318    /// given instant.
319    pub fn submitted_after(mut self, when: chrono::DateTime<Utc>) -> Self {
320        self.submitted_after = Some(when);
321        self
322    }
323
324    /// Return only jobs which ended strictly after the given instant.
325    pub fn ended_after(mut self, when: chrono::DateTime<Utc>) -> Self {
326        self.ended_after = Some(when);
327        self
328    }
329
330    /// Order returned jobs by the given key.
331    pub fn ordering(mut self, ordering: Ordering, ascending: bool) -> Self {
332        self.ordering = ordering;
333        self.ascending = ascending;
334        self
335    }
336
337    /// Begin querying for jobs, returning a [`Jobs`] instance
338    pub fn query(self) -> Jobs<'a> {
339        let mut url = self
340            .lava
341            .base
342            .join("jobs/")
343            .expect("Failed to append to base url");
344        url.query_pairs_mut().append_pair(
345            "ordering",
346            &format!(
347                "{}{}",
348                match self.ascending {
349                    true => "",
350                    false => "-",
351                },
352                self.ordering
353            ),
354        );
355        if let Some(pair) = self.states.query() {
356            url.query_pairs_mut().append_pair(&pair.0, &pair.1);
357        }
358        if let Some(limit) = self.limit {
359            url.query_pairs_mut()
360                .append_pair("limit", &limit.to_string());
361        };
362        if let Some(pair) = self.healths.query() {
363            url.query_pairs_mut().append_pair(&pair.0, &pair.1);
364        }
365
366        match self.ids.len() {
367            0 => (),
368            1 => {
369                url.query_pairs_mut()
370                    .append_pair("id", &self.ids[0].to_string());
371            }
372            _ => {
373                let ids: Vec<_> = self.ids.iter().map(i64::to_string).collect();
374                url.query_pairs_mut().append_pair("id__in", &ids.join(","));
375            }
376        }
377
378        if let Some(id_after) = self.id_after {
379            url.query_pairs_mut()
380                .append_pair("id__gt", &id_after.to_string());
381        };
382        if let Some(started_after) = self.started_after {
383            url.query_pairs_mut()
384                .append_pair("start_time__gt", &started_after.to_rfc3339());
385        };
386        if let Some(submitted_after) = self.submitted_after {
387            url.query_pairs_mut()
388                .append_pair("submit_time__gt", &submitted_after.to_rfc3339());
389        };
390        if let Some(ended_after) = self.ended_after {
391            url.query_pairs_mut()
392                .append_pair("end_time__gt", &ended_after.to_rfc3339());
393        };
394
395        let paginator = Paginator::new(self.lava.client.clone(), url);
396        Jobs {
397            lava: self.lava,
398            paginator,
399            state: PagingState::Paging,
400        }
401    }
402}
403
404async fn transform_job(job: LavaJob, lava: &Lava) -> Job {
405    let t = stream::iter(job.tags.iter());
406    let tags = t
407        .filter_map(|i| async move { lava.tag(*i).await })
408        .collect()
409        .await;
410
411    let t = stream::iter(job.failure_tags.iter());
412    let failure_tags = t
413        .filter_map(|i| async move { lava.tag(*i).await })
414        .collect()
415        .await;
416
417    Job {
418        id: job.id,
419        submitter: job.submitter,
420        viewing_groups: job.viewing_groups,
421        description: job.description,
422        health_check: job.health_check,
423        requested_device_type: job.requested_device_type,
424        tags,
425        actual_device: job.actual_device,
426        submit_time: job.submit_time,
427        start_time: job.start_time,
428        end_time: job.end_time,
429        state: job.state,
430        health: job.health,
431        priority: job.priority,
432        definition: job.definition,
433        original_definition: job.original_definition,
434        multinode_definition: job.multinode_definition,
435        failure_tags,
436        failure_comment: job.failure_comment,
437    }
438}
439
impl Stream for Jobs<'_> {
    type Item = Result<Job, PaginationError>;

    // Drives the two-phase state machine: fetch the next raw job from
    // the paginator, then await its transformation into a `Job`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // `get_mut` requires `Jobs: Unpin`, which holds here.
        let me = self.get_mut();

        loop {
            return match &mut me.state {
                // Waiting for the next raw job from the paginator.
                PagingState::Paging => {
                    let p = Pin::new(&mut me.paginator);
                    match p.poll_next(cx) {
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                        Poll::Ready(Some(Ok(d))) => {
                            // Got a raw job: switch to transforming it
                            // (tag resolution) and poll again immediately.
                            me.state = PagingState::Transforming(transform_job(d, me.lava).boxed());
                            continue;
                        }
                        Poll::Pending => Poll::Pending,
                    }
                }
                // Waiting for tag resolution of the current job.
                PagingState::Transforming(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(d) => {
                        // Transformation complete: yield the job and go
                        // back to paging for the next one.
                        me.state = PagingState::Paging;
                        Poll::Ready(Some(Ok(d)))
                    }
                    Poll::Pending => Poll::Pending,
                },
            };
        }
    }
}
471
/// Errors that can occur when submitting a job via [`submit_job`].
#[derive(Error, Debug)]
pub enum SubmissionError {
    /// The HTTP request (or reply decoding) failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server rejected the definition (`400 Bad Request`);
    /// carries the server's error message.
    #[error("Invalid job: {0}")]
    InvalidJob(String),
    /// The server replied with a status other than `201` or `400`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
481
// Request body for job submission, serialized as
// `{"definition": "..."}`.
#[derive(Debug, Serialize)]
struct Submission<'a> {
    definition: &'a str,
}
486
// Reply body from job submission. `message` carries the error text on
// rejection; `job_ids` lists the created jobs on success (defaulting
// to empty when absent from the reply).
#[derive(Debug, Deserialize)]
struct SubmissionReply {
    message: String,
    #[serde(default)]
    job_ids: Vec<i64>,
}
493
/// Submit a new job to the LAVA server.
///
/// `definition` is the job definition to submit; on success the ids
/// of the created jobs are returned (the reply may contain more than
/// one id).
///
/// # Errors
///
/// Returns [`SubmissionError::InvalidJob`] with the server's message
/// when the server answers `400 Bad Request`,
/// [`SubmissionError::UnexpectedReply`] for any status other than
/// `201 Created`, and [`SubmissionError::Request`] if the request
/// itself or reply decoding fails.
pub async fn submit_job(lava: &Lava, definition: &str) -> Result<Vec<i64>, SubmissionError> {
    let url = lava
        .base
        .join("jobs/")
        .expect("Failed to append to base url");
    let sub = Submission { definition };

    let post = lava.client.post(url).json(&sub).send().await?;

    match post.status() {
        // 201: the body lists the ids of the jobs that were created.
        StatusCode::CREATED => {
            let reply: SubmissionReply = post.json().await?;
            Ok(reply.job_ids)
        }
        // 400: the body carries an explanatory message.
        StatusCode::BAD_REQUEST => {
            let reply: SubmissionReply = post.json().await?;
            Err(SubmissionError::InvalidJob(reply.message))
        }
        s => Err(SubmissionError::UnexpectedReply(s)),
    }
}
515
/// Errors that can occur when cancelling a job via [`cancel_job`].
#[derive(Error, Debug)]
pub enum CancellationError {
    /// The HTTP request failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server replied with a status other than `200 OK`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
523
/// Cancel the job with the given `id` on the LAVA server.
///
/// # Errors
///
/// Returns [`CancellationError::UnexpectedReply`] if the server
/// responds with anything other than `200 OK`, and
/// [`CancellationError::Request`] if the request itself fails.
pub async fn cancel_job(lava: &Lava, id: i64) -> Result<(), CancellationError> {
    // Build `.../jobs/<id>/cancel/` relative to the API base url; the
    // trailing empty segment produces the trailing slash.
    let mut url = lava.base.clone();
    url.path_segments_mut()
        .unwrap()
        .pop_if_empty()
        .push("jobs")
        .push(&id.to_string())
        .push("cancel")
        .push("");

    // NOTE(review): cancellation is requested with GET here rather
    // than POST/DELETE — confirm this matches the server's REST API.
    let res = lava.client.get(url).send().await?;

    match res.status() {
        StatusCode::OK => Ok(()),
        s => Err(CancellationError::UnexpectedReply(s)),
    }
}
541
/// Errors that can occur when fetching job results via
/// [`job_results_as_junit`].
#[derive(Error, Debug)]
pub enum ResultsError {
    /// The HTTP request (or body streaming) failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server replied with a status other than `200 OK`.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
549
/// Retrieve the results of job `id` from the LAVA server in JUnit
/// format, as a stream of bytes.
///
/// # Errors
///
/// Returns [`ResultsError::UnexpectedReply`] if the server responds
/// with anything other than `200 OK`, and [`ResultsError::Request`]
/// if the request fails; transport errors that occur mid-stream are
/// yielded as `Err` items by the returned stream.
pub async fn job_results_as_junit(
    lava: &Lava,
    id: i64,
) -> Result<impl Stream<Item = Result<Bytes, ResultsError>> + Send + Unpin + '_, ResultsError> {
    // Build `.../jobs/<id>/junit/` relative to the API base url.
    let mut url = lava.base.clone();
    url.path_segments_mut()
        .unwrap()
        .pop_if_empty()
        .push("jobs")
        .push(&id.to_string())
        .push("junit")
        .push("");

    let res = lava.client.get(url).send().await?;
    match res.status() {
        // Stream the response body, converting transport errors into
        // `ResultsError` as they occur.
        StatusCode::OK => Ok(res.bytes_stream().map_err(ResultsError::from)),
        s => Err(ResultsError::UnexpectedReply(s)),
    }
}
569
570#[cfg(test)]
571mod tests {
572    use super::{Health, Job, Ordering, State, Tag};
573    use crate::Lava;
574
575    use boulder::{
576        Buildable, Builder, GeneratableWithPersianRug, GeneratorWithPersianRugMutIterator, Repeat,
577        Some as GSome, SubsetsFromPersianRug, Time,
578    };
579    use chrono::{DateTime, Duration, Utc};
580    use futures::{AsyncReadExt, TryStreamExt};
581    use lava_api_mock::{
582        Device as MockDevice, DeviceType as MockDeviceType, Group as MockGroup, Job as MockJob,
583        JobHealth as MockJobHealth, JobState as MockJobState, LavaMock, PaginationLimits, PassFail,
584        PopulationParams, SharedState, Tag as MockTag, User as MockUser,
585    };
586    use persian_rug::{Accessor, Context, Proxy};
587    use std::collections::{BTreeMap, BTreeSet};
588    use std::str::FromStr;
589    use test_log::test;
590
    impl Job {
        /// Construct the [`Job`] value the client is expected to
        /// produce for the given mock-server job, resolving the
        /// proxied submitter, groups, tags and devices through
        /// `context`, for comparison in tests.
        #[persian_rug::constraints(
            context = C,
            access(
                MockUser<C>,
                MockGroup<C>,
                MockTag<C>,
                MockDevice<C>,
                MockDeviceType<C>
            )
        )]
        pub fn from_mock<'b, B, C>(job: &MockJob<C>, context: B) -> Job
        where
            B: 'b + Accessor<Context = C>,
            C: Context + 'static,
        {
            Self {
                id: job.id,
                submitter: context.get(&job.submitter).username.clone(),
                viewing_groups: job
                    .viewing_groups
                    .iter()
                    .map(|g| context.get(g).id)
                    .collect::<Vec<_>>(),
                description: job.description.clone(),
                health_check: job.health_check,
                requested_device_type: job
                    .requested_device_type
                    .map(|d| context.get(&d).name.to_string()),
                tags: job
                    .tags
                    .iter()
                    .map(|t| Tag::from_mock(context.get(t), context.clone()))
                    .collect::<Vec<_>>(),
                actual_device: job
                    .actual_device
                    .as_ref()
                    .map(|d| context.get(d).hostname.to_string()),
                // NOTE(review): assumes generated mock jobs always set
                // submit_time — confirm against the mock generators.
                submit_time: job.submit_time.unwrap(),
                start_time: job.start_time,
                end_time: job.end_time,
                state: job.state.into(),
                health: job.health.into(),
                priority: job.priority,
                definition: job.definition.clone(),
                original_definition: job.original_definition.clone(),
                multinode_definition: job.multinode_definition.clone(),
                failure_tags: job
                    .failure_tags
                    .iter()
                    .map(|t| Tag::from_mock(context.get(t), context.clone()))
                    .collect::<Vec<_>>(),
                failure_comment: job.failure_comment.clone(),
            }
        }
    }
647
648    impl From<MockJobState> for State {
649        fn from(state: MockJobState) -> State {
650            use State::*;
651
652            match state {
653                MockJobState::Submitted => Submitted,
654                MockJobState::Scheduling => Scheduling,
655                MockJobState::Scheduled => Scheduled,
656                MockJobState::Running => Running,
657                MockJobState::Canceling => Canceling,
658                MockJobState::Finished => Finished,
659            }
660        }
661    }
662
663    impl From<MockJobHealth> for Health {
664        fn from(health: MockJobHealth) -> Health {
665            use Health::*;
666
667            match health {
668                MockJobHealth::Unknown => Unknown,
669                MockJobHealth::Complete => Complete,
670                MockJobHealth::Incomplete => Incomplete,
671                MockJobHealth::Canceled => Canceled,
672            }
673        }
674    }
675
676    #[test]
677    fn test_display() {
678        assert_eq!(State::Submitted.to_string(), "Submitted");
679        assert_eq!(State::Scheduling.to_string(), "Scheduling");
680        assert_eq!(State::Scheduled.to_string(), "Scheduled");
681        assert_eq!(State::Running.to_string(), "Running");
682        assert_eq!(State::Canceling.to_string(), "Canceling");
683        assert_eq!(State::Finished.to_string(), "Finished");
684
685        assert_eq!(Health::Unknown.to_string(), "Unknown");
686        assert_eq!(Health::Complete.to_string(), "Complete");
687        assert_eq!(Health::Incomplete.to_string(), "Incomplete");
688        assert_eq!(Health::Canceled.to_string(), "Canceled");
689    }
690
691    #[test]
692    fn test_from_str() {
693        assert_eq!(Ok(State::Submitted), State::from_str("Submitted"));
694        assert_eq!(Ok(State::Scheduling), State::from_str("Scheduling"));
695        assert_eq!(Ok(State::Scheduled), State::from_str("Scheduled"));
696        assert_eq!(Ok(State::Running), State::from_str("Running"));
697        assert_eq!(Ok(State::Canceling), State::from_str("Canceling"));
698        assert_eq!(Ok(State::Finished), State::from_str("Finished"));
699        assert_eq!(
700            Err(strum::ParseError::VariantNotFound),
701            State::from_str("womble")
702        );
703
704        assert_eq!(Ok(Health::Unknown), Health::from_str("Unknown"));
705        assert_eq!(Ok(Health::Complete), Health::from_str("Complete"));
706        assert_eq!(Ok(Health::Incomplete), Health::from_str("Incomplete"));
707        assert_eq!(Ok(Health::Canceled), Health::from_str("Canceled"));
708        assert_eq!(
709            Err(strum::ParseError::VariantNotFound),
710            Health::from_str("")
711        );
712    }
713
714    /// Stream 50 jobs with a page limit of 7 from the server
715    /// checking that we correctly reconstruct their tags and that
716    /// they are all accounted for (that pagination is handled
717    /// properly)
718    #[test(tokio::test)]
719    async fn test_basic() {
720        let state = SharedState::new_populated(PopulationParams::builder().jobs(50usize).build());
721        let server = LavaMock::new(
722            state.clone(),
723            PaginationLimits::builder().jobs(Some(7)).build(),
724        )
725        .await;
726
727        let mut map = BTreeMap::new();
728        let start = state.access();
729        for j in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
730            map.insert(j.id, j);
731        }
732
733        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
734
735        let mut lj = lava.jobs().query();
736
737        let mut seen = BTreeMap::new();
738        while let Some(job) = lj.try_next().await.expect("failed to get job") {
739            assert!(!seen.contains_key(&job.id));
740            assert!(map.contains_key(&job.id));
741            let jj = map.get(&job.id).unwrap();
742            assert_eq!(job.submitter, start.get(&jj.submitter).username);
743            assert_eq!(job.viewing_groups.len(), jj.viewing_groups.len());
744            for i in 0..job.viewing_groups.len() {
745                assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
746            }
747            assert_eq!(job.description, jj.description);
748            assert_eq!(job.health_check, jj.health_check);
749            assert_eq!(
750                job.requested_device_type.as_ref(),
751                jj.requested_device_type
752                    .as_ref()
753                    .map(|t| &start.get(t).name)
754            );
755
756            assert_eq!(job.tags.len(), jj.tags.len());
757            for i in 0..job.tags.len() {
758                assert_eq!(job.tags[i].id, start.get(&jj.tags[i]).id);
759                assert_eq!(job.tags[i].name, start.get(&jj.tags[i]).name);
760                assert_eq!(job.tags[i].description, start.get(&jj.tags[i]).description);
761            }
762
763            assert_eq!(
764                job.actual_device.as_ref(),
765                jj.actual_device.as_ref().map(|t| &start.get(t).hostname)
766            );
767            assert_eq!(Some(job.submit_time), jj.submit_time);
768            assert_eq!(job.start_time, jj.start_time);
769            assert_eq!(job.end_time, jj.end_time);
770            assert_eq!(job.state.to_string(), jj.state.to_string());
771            assert_eq!(job.health.to_string(), jj.health.to_string());
772            assert_eq!(job.priority, jj.priority);
773            assert_eq!(job.definition, jj.definition);
774            assert_eq!(job.original_definition, jj.original_definition);
775            assert_eq!(job.multinode_definition, jj.multinode_definition);
776
777            assert_eq!(job.failure_tags.len(), jj.failure_tags.len());
778            for i in 0..job.failure_tags.len() {
779                assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
780            }
781            assert_eq!(job.failure_comment, jj.failure_comment);
782
783            seen.insert(job.id, job.clone());
784        }
785        assert_eq!(seen.len(), 50);
786    }
787
    /// Generate 50 jobs cycling through every state and health, with
    /// regularly spaced submit, start and end times, and check that
    /// each [`JobsBuilder`] filter (state, health, id bounds, time
    /// bounds) and ordering selects the expected subset.
792    #[test(tokio::test)]
793    async fn test_jobs_builder() {
794        let mut server = lava_api_mock::LavaMock::new(
795            SharedState::new_populated(
796                PopulationParams::builder()
797                    .tags(5usize)
798                    .jobs(0usize)
799                    .build(),
800            ),
801            PaginationLimits::builder().jobs(Some(7)).build(),
802        )
803        .await;
804
805        let base_date = DateTime::parse_from_rfc3339("2022-04-10T16:30:00+01:00")
806            .unwrap()
807            .with_timezone(&Utc);
808
809        let mut item_gen = Proxy::<lava_api_mock::Job<lava_api_mock::State>>::generator()
810            .tags(SubsetsFromPersianRug::new())
811            .health(Repeat!(
812                MockJobHealth::Complete,
813                MockJobHealth::Incomplete,
814                MockJobHealth::Canceled,
815                MockJobHealth::Unknown
816            ))
817            .state(Repeat!(
818                MockJobState::Submitted,
819                MockJobState::Scheduling,
820                MockJobState::Scheduled,
821                MockJobState::Running,
822                MockJobState::Canceling,
823                MockJobState::Finished
824            ))
825            .submit_time(GSome(Time::new(
826                base_date - Duration::minutes(1),
827                Duration::minutes(-1),
828            )))
829            .start_time(GSome(Time::new(base_date, Duration::minutes(-1))))
830            .end_time(GSome(Time::new(base_date, Duration::seconds(-30))));
831
832        let _ = GeneratorWithPersianRugMutIterator::new(&mut item_gen, server.state_mut())
833            .take(50)
834            .collect::<Vec<_>>();
835
836        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
837
838        let mut lj = lava.jobs().state(State::Running).query();
839
840        let mut count = 0;
841        while let Some(job) = lj.try_next().await.expect("failed to get job") {
842            assert_eq!(job.state, State::Running);
843            count += 1;
844        }
845        assert_eq!(count, 8);
846
847        let mut lj = lava.jobs().state_not(State::Canceling).query();
848        let mut count = 0;
849        while let Some(job) = lj.try_next().await.expect("failed to get job") {
850            assert_ne!(job.state, State::Canceling);
851            count += 1;
852        }
853        assert_eq!(count, 42);
854
855        let mut lj = lava.jobs().health(Health::Incomplete).query();
856
857        let mut count = 0;
858        while let Some(job) = lj.try_next().await.expect("failed to get job") {
859            assert_eq!(job.health, Health::Incomplete);
860            count += 1;
861        }
862        assert_eq!(count, 13);
863
864        let mut lj = lava.jobs().health_not(Health::Canceled).query();
865        let mut count = 0;
866        while let Some(job) = lj.try_next().await.expect("failed to get job") {
867            assert_ne!(job.health, Health::Canceled);
868            count += 1;
869        }
870        assert_eq!(count, 38);
871
872        let mut lj = lava.jobs().id_after(9i64).query();
873        let mut count = 0;
874        while let Some(job) = lj.try_next().await.expect("failed to get job") {
875            assert!(job.id > 9i64);
876            count += 1;
877        }
878        assert_eq!(count, 40);
879
880        let job_35_start = DateTime::parse_from_rfc3339("2022-04-10T15:55:00+01:00")
881            .unwrap()
882            .with_timezone(&Utc);
883
884        let mut lj = lava.jobs().started_after(job_35_start).query();
885        let mut count = 0;
886        while let Some(job) = lj.try_next().await.expect("failed to get job") {
887            assert!(job.start_time.is_some() && job.start_time.unwrap() > job_35_start);
888            count += 1;
889        }
890        assert_eq!(count, 35);
891
892        let job_19_submit = DateTime::parse_from_rfc3339("2022-04-10T16:10:00+01:00")
893            .unwrap()
894            .with_timezone(&Utc);
895
896        let mut lj = lava.jobs().submitted_after(job_19_submit).query();
897        let mut count = 0;
898        while let Some(job) = lj.try_next().await.expect("failed to get job") {
899            assert!(job.submit_time > job_19_submit);
900            count += 1;
901        }
902        assert_eq!(count, 19);
903
904        let job_25_end = DateTime::parse_from_rfc3339("2022-04-10T16:17:30+01:00")
905            .unwrap()
906            .with_timezone(&Utc);
907
908        let mut lj = lava.jobs().ended_after(job_25_end).query();
909        let mut count = 0;
910        while let Some(job) = lj.try_next().await.expect("failed to get job") {
911            assert!(job.end_time.is_some() && job.end_time.unwrap() > job_25_end);
912            count += 1;
913        }
914        assert_eq!(count, 25);
915
916        let mut lj = lava.jobs().ordering(Ordering::SubmitTime, false).query();
917        let mut count = 0;
918        let mut prev = None;
919        while let Some(job) = lj.try_next().await.expect("failed to get job") {
920            if let Some(dt) = prev {
921                assert!(job.submit_time < dt);
922            }
923            prev = Some(job.submit_time);
924            count += 1;
925        }
926        assert_eq!(count, 50);
927
928        let mut lj = lava.jobs().ordering(Ordering::SubmitTime, true).query();
929        let mut count = 0;
930        let mut prev = None;
931        while let Some(job) = lj.try_next().await.expect("failed to get job") {
932            if let Some(dt) = prev {
933                assert!(job.submit_time > dt);
934            }
935            prev = Some(job.submit_time);
936            count += 1;
937        }
938        assert_eq!(count, 50);
939
940        let mut lj = lava.jobs().ordering(Ordering::StartTime, false).query();
941        let mut count = 0;
942        let mut prev = None;
943        while let Some(job) = lj.try_next().await.expect("failed to get job") {
944            if let Some(dt) = prev {
945                assert!(job.start_time < dt);
946            }
947            prev = Some(job.start_time);
948            count += 1;
949        }
950        assert_eq!(count, 50);
951
952        let mut lj = lava.jobs().ordering(Ordering::StartTime, true).query();
953        let mut count = 0;
954        let mut prev = None;
955        while let Some(job) = lj.try_next().await.expect("failed to get job") {
956            if let Some(dt) = prev {
957                assert!(job.start_time > dt);
958            }
959            prev = Some(job.start_time);
960            count += 1;
961        }
962        assert_eq!(count, 50);
963    }
964
965    #[test(tokio::test)]
966    async fn test_junit() {
967        let pop = PopulationParams::builder()
968            .jobs(3usize)
969            .test_suites(6usize)
970            .test_cases(20usize)
971            .build();
972        let state = SharedState::new_populated(pop);
973        let server = LavaMock::new(
974            state.clone(),
975            PaginationLimits::builder().test_cases(Some(6)).build(),
976        )
977        .await;
978
979        let mut map = BTreeMap::new();
980        let start = state.access();
981        for t in start.get_iter::<lava_api_mock::TestCase<lava_api_mock::State>>() {
982            map.insert(t.name.clone(), t.clone());
983        }
984
985        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
986        let mut seen = BTreeSet::new();
987
988        for job in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
989            let mut v = Vec::new();
990            lava.job_results_as_junit(job.id)
991                .await
992                .expect("failed to obtain junit output")
993                .map_err(std::io::Error::other)
994                .into_async_read()
995                .read_to_end(&mut v)
996                .await
997                .expect("failed to fully read junit output");
998
999            let report = junit_parser::from_reader(std::io::Cursor::new(v))
1000                .expect("failed to parse mock junit output");
1001
1002            for suite in report.suites.iter() {
1003                for test in suite.cases.iter() {
1004                    assert!(!seen.contains(&test.name));
1005                    assert!(map.contains_key(&test.name));
1006                    let tt = map.get(&test.name).unwrap();
1007                    match tt.result {
1008                        PassFail::Pass => {
1009                            assert!(test.status.is_success());
1010                        }
1011                        PassFail::Fail => {
1012                            assert!(test.status.is_failure());
1013                        }
1014                        PassFail::Skip => {
1015                            assert!(test.status.is_skipped());
1016                        }
1017                        PassFail::Unknown => {
1018                            assert!(test.status.is_error());
1019                        }
1020                    }
1021                    seen.insert(test.name.clone());
1022                }
1023            }
1024        }
1025        assert_eq!(seen.len(), 60);
1026    }
1027}