// lava_api/job.rs

1//! Retrieve jobs
2
3use bytes::Bytes;
4use chrono::{DateTime, Utc};
5use futures::future::BoxFuture;
6use futures::stream::{self, Stream, StreamExt};
7use futures::{FutureExt, TryStreamExt};
8use reqwest::StatusCode;
9use serde::{Deserialize, Serialize};
10use serde_with::DeserializeFromStr;
11use std::fmt;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
15use thiserror::Error;
16
17use crate::paginator::{PaginationError, Paginator};
18use crate::queryset::{QuerySet, QuerySetMember};
19use crate::tag::Tag;
20use crate::Lava;
21
/// The progress of a job through the system.
#[derive(
    Copy, Clone, Debug, Hash, PartialEq, Eq, EnumIter, Display, EnumString, DeserializeFromStr,
)]
pub enum State {
    /// The job has been submitted but not yet picked up by the scheduler.
    Submitted,
    /// The scheduler is in the process of assigning the job to a device.
    Scheduling,
    /// The job has been assigned a device but has not started running.
    Scheduled,
    /// The job is currently executing.
    Running,
    /// Cancellation has been requested but has not yet completed.
    Canceling,
    /// The job has finished; see [`Health`] for the outcome.
    Finished,
}
34
/// Lets [`State`] be used in [`QuerySet`] include/exclude filters;
/// `all()` yields every variant via the strum-derived iterator.
impl QuerySetMember for State {
    type Iter = StateIter;
    fn all() -> Self::Iter {
        Self::iter()
    }
}
41
/// The completion state of a job.
#[derive(
    Copy, Clone, Debug, PartialEq, Eq, Hash, EnumIter, EnumString, Display, DeserializeFromStr,
)]
pub enum Health {
    /// Unknown is the usual state before the job has finished.
    Unknown,
    /// Complete is used as the success state in general.
    Complete,
    /// Incomplete is used as the error state.
    Incomplete,
    /// Canceled indicates the job was canceled before completing.
    Canceled,
}
55
/// Lets [`Health`] be used in [`QuerySet`] include/exclude filters;
/// `all()` yields every variant via the strum-derived iterator.
impl QuerySetMember for Health {
    type Iter = HealthIter;
    fn all() -> Self::Iter {
        Self::iter()
    }
}
62
/// The possible orderings in which jobs can be returned
///
/// These are usually combined with a [`bool`] in use, indicating
/// whether the order is to be ascending or descending.
#[derive(Debug, Clone)]
pub enum Ordering {
    Id,
    StartTime,
    EndTime,
    SubmitTime,
}

impl fmt::Display for Ordering {
    /// Render the ordering as the field name the server sorts by.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let field = match self {
            Ordering::Id => "id",
            Ordering::StartTime => "start_time",
            Ordering::EndTime => "end_time",
            Ordering::SubmitTime => "submit_time",
        };
        f.write_str(field)
    }
}
85
/// Wire representation of a job as deserialized from the LAVA API.
///
/// Tags and failure tags are plain numeric ids here; they are resolved
/// into [`Tag`] values by `transform_job` before being exposed as the
/// public [`Job`] type.
#[derive(Clone, Deserialize, Debug)]
struct LavaJob {
    id: i64,
    submitter: String,
    viewing_groups: Vec<i64>,
    description: String,
    health_check: bool,
    requested_device_type: Option<String>,
    // Unresolved tag ids.
    tags: Vec<u32>,
    actual_device: Option<String>,
    submit_time: DateTime<Utc>,
    start_time: Option<DateTime<Utc>>,
    end_time: Option<DateTime<Utc>>,
    state: State,
    health: Health,
    priority: i64,
    definition: String,
    original_definition: String,
    multinode_definition: String,
    // Unresolved tag ids.
    failure_tags: Vec<u32>,
    failure_comment: Option<String>,
}
108
/// The data available for a job from the LAVA API
///
/// Note that [`tags`](Job::tags) and
/// [`failure_tags`](Job::failure_tags) have been resolved into [`Tag`]
/// objects, rather than tag ids, but that
/// [`viewing_groups`](Job::viewing_groups) has not: it still holds raw
/// group ids.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Job {
    /// Server-assigned job id.
    pub id: i64,
    /// Username of the user that submitted the job.
    pub submitter: String,
    /// Ids of the groups allowed to view this job (unresolved ids).
    pub viewing_groups: Vec<i64>,
    pub description: String,
    /// Whether this job is a health check.
    pub health_check: bool,
    /// Name of the requested device type, if one was requested.
    pub requested_device_type: Option<String>,
    /// Tags on the job, resolved into full [`Tag`] objects.
    pub tags: Vec<Tag>,
    /// Hostname of the device the job actually ran on, if assigned.
    pub actual_device: Option<String>,
    pub submit_time: DateTime<Utc>,
    /// Set once the job has started.
    pub start_time: Option<DateTime<Utc>>,
    /// Set once the job has ended.
    pub end_time: Option<DateTime<Utc>>,
    pub state: State,
    pub health: Health,
    pub priority: i64,
    pub definition: String,
    pub original_definition: String,
    pub multinode_definition: String,
    /// Failure tags, resolved into full [`Tag`] objects.
    pub failure_tags: Vec<Tag>,
    /// Server-provided comment about the failure, if any.
    pub failure_comment: Option<String>,
}
137
/// Internal phase of the [`Jobs`] stream: either pulling raw jobs from
/// the paginator, or awaiting the tag-resolution future (produced by
/// `transform_job`) for the job most recently received.
enum PagingState<'a> {
    Paging,
    Transforming(BoxFuture<'a, Job>),
}
142
/// A [`Stream`] that yields a selected subset of the [`Job`]
/// instances on a LAVA server.
///
/// These are constructed using a [`JobsBuilder`]; there is no `new`
/// method on this struct.
pub struct Jobs<'a> {
    // Server handle, needed to resolve tag ids during transformation.
    lava: &'a Lava,
    // Underlying stream of raw, untransformed jobs.
    paginator: Paginator<LavaJob>,
    // Current phase of the paginate-then-transform state machine.
    state: PagingState<'a>,
}
153
impl<'a> Jobs<'a> {
    /// The server's latest report of how many [`Job`] instances are
    /// in the result set.
    ///
    /// Note that this is not the total number of instances, only the
    /// total number matching the query. Also, note that the number is
    /// subject to change as the stream is read, owing to pagination;
    /// this number will always be the number of results most recently
    /// reported, and can be an over- or under-estimate by the time
    /// the stream is drained.
    pub fn reported_items(&self) -> Option<u32> {
        // Delegates to the paginator, which tracks the server's count.
        self.paginator.reported_items()
    }
}
168
/// Select a set of [`Job`] instances to return from the LAVA server.
///
/// This is the way to construct a [`Jobs`] object, which can stream
/// the actual data. It allows customisation of which jobs to return,
/// and in what order.
///
/// Example:
/// ```rust
/// use futures::stream::TryStreamExt;
/// # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState};
/// use lava_api::{Lava, job::State, job::Ordering};
/// #
/// # tokio_test::block_on( async {
/// # let limits = PaginationLimits::new();
/// # let population = PopulationParams::new();
/// # let mock = LavaMock::new(SharedState::new_populated(population), limits).await;
/// # let service_uri = mock.uri();
/// # let lava_token = None;
///
/// let lava = Lava::new(&service_uri, lava_token).expect("failed to make lava");
///
/// let mut lj = lava
///     .jobs()
///     .state(State::Submitted)
///     .ordering(Ordering::StartTime, true)
///     .query();
///
/// while let Some(job) = lj
///     .try_next()
///     .await
///     .expect("failed to get job")
/// {
///     println!("Got job {:?}", job);
/// }
/// # });
/// ```
#[derive(Debug, Clone)]
pub struct JobsBuilder<'a> {
    // Handle used to run the eventual query.
    lava: &'a Lava,
    // Include/exclude filters emitted under the "state" query parameter.
    states: QuerySet<State>,
    // Include/exclude filters emitted under the "health" query parameter.
    healths: QuerySet<Health>,
    // Page size override for the server's pagination, if set.
    limit: Option<u32>,
    // Sort key for the result set.
    ordering: Ordering,
    // Explicit job ids to select, if any.
    ids: Vec<i64>,
    // Exclusive lower bound on job id.
    id_after: Option<i64>,
    // Exclusive lower bounds on the job timestamps.
    started_after: Option<DateTime<Utc>>,
    submitted_after: Option<DateTime<Utc>>,
    ended_after: Option<DateTime<Utc>>,
    // Sort direction applied to `ordering`.
    ascending: bool,
}
219
impl<'a> JobsBuilder<'a> {
    /// Create a new [`JobsBuilder`]
    ///
    /// The default query is:
    /// - order by [`Ordering::Id`]
    /// - no filtering
    /// - default result pagination
    pub fn new(lava: &'a Lava) -> Self {
        Self {
            lava,
            // "state" and "health" are the query parameter names the
            // filter sets will be rendered under in query().
            states: QuerySet::new(String::from("state")),
            healths: QuerySet::new(String::from("health")),
            limit: None,
            ordering: Ordering::Id,
            ids: Vec::new(),
            id_after: None,
            started_after: None,
            submitted_after: None,
            ended_after: None,
            ascending: true,
        }
    }

    /// Return jobs in this state.
    pub fn state(mut self, state: State) -> Self {
        self.states.include(state);
        self
    }

    /// Exclude jobs in this state.
    pub fn state_not(mut self, state: State) -> Self {
        self.states.exclude(&state);
        self
    }

    /// Set the number of jobs retrieved at a time while the query is
    /// running. The query will be processed transparently as a
    /// sequence of requests that return all matching responses. This
    /// setting governs the size of each of the (otherwise
    /// transparent) requests, so this number is really a page size.
    ///
    /// Note that you will see artifacts on queries that are split
    /// into many requests, especially when responses are slow. This
    /// makes setting the limit much smaller than the response size
    /// unattractive when accurate data is required. However, the
    /// server will need to return records in chunks of this size,
    /// regardless of how many are consumed from the response stream,
    /// which makes setting the limit much higher than the response
    /// size wasteful. In practice, it is probably best to set this
    /// limit to the expected response size for most use cases.
    ///
    /// Artifacts occur when paging occurs, because paging is entirely
    /// client side. Each page contains a section of the query
    /// begining with the job at some multiple of the limit count into
    /// the result set.  However the result set is evolving while the
    /// paging is occurring, and this is not currently compensated
    /// for, which leads to jobs being returned multiple times at the
    /// boundaries between pages - or even omitted depending on the
    /// query. In general, query sets that can shrink are not safe to
    /// use with paging, because results can be lost rather than
    /// duplicated.
    pub fn limit(mut self, limit: u32) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Return jobs with this health.
    pub fn health(mut self, health: Health) -> Self {
        self.healths.include(health);
        self
    }

    /// Exclude jobs with this health.
    pub fn health_not(mut self, health: Health) -> Self {
        self.healths.exclude(&health);
        self
    }

    /// Return only jobs whose id is `id`.
    ///
    /// May be called repeatedly to select a set of ids.
    pub fn id(mut self, id: i64) -> Self {
        self.ids.push(id);
        self
    }

    /// Return only jobs whose id is strictly greater than `id`.
    pub fn id_after(mut self, id: i64) -> Self {
        self.id_after = Some(id);
        self
    }

    /// Return only jobs whose start time is strictly after the given
    /// instant.
    pub fn started_after(mut self, when: chrono::DateTime<Utc>) -> Self {
        self.started_after = Some(when);
        self
    }

    /// Return only jobs whose submission time is strictly after the
    /// given instant.
    pub fn submitted_after(mut self, when: chrono::DateTime<Utc>) -> Self {
        self.submitted_after = Some(when);
        self
    }

    /// Return only jobs which ended strictly after the given instant.
    pub fn ended_after(mut self, when: chrono::DateTime<Utc>) -> Self {
        self.ended_after = Some(when);
        self
    }

    /// Order returned jobs by the given key.
    pub fn ordering(mut self, ordering: Ordering, ascending: bool) -> Self {
        self.ordering = ordering;
        self.ascending = ascending;
        self
    }

    /// Begin querying for jobs, returning a [`Jobs`] instance
    pub fn query(self) -> Jobs<'a> {
        let mut url = self
            .lava
            .base
            .join("jobs/")
            .expect("Failed to append to base url");
        // A leading '-' on the ordering field requests descending order.
        url.query_pairs_mut().append_pair(
            "ordering",
            &format!(
                "{}{}",
                match self.ascending {
                    true => "",
                    false => "-",
                },
                self.ordering
            ),
        );
        if let Some(pair) = self.states.query() {
            url.query_pairs_mut().append_pair(&pair.0, &pair.1);
        }
        if let Some(limit) = self.limit {
            url.query_pairs_mut()
                .append_pair("limit", &limit.to_string());
        };
        if let Some(pair) = self.healths.query() {
            url.query_pairs_mut().append_pair(&pair.0, &pair.1);
        }

        // A single id uses the exact-match "id" parameter; several ids
        // use the comma-separated "id__in" form.
        match self.ids.len() {
            0 => (),
            1 => {
                url.query_pairs_mut()
                    .append_pair("id", &self.ids[0].to_string());
            }
            _ => {
                let ids: Vec<_> = self.ids.iter().map(i64::to_string).collect();
                url.query_pairs_mut().append_pair("id__in", &ids.join(","));
            }
        }

        if let Some(id_after) = self.id_after {
            url.query_pairs_mut()
                .append_pair("id__gt", &id_after.to_string());
        };
        // Timestamp filters are serialized as RFC 3339 strings.
        if let Some(started_after) = self.started_after {
            url.query_pairs_mut()
                .append_pair("start_time__gt", &started_after.to_rfc3339());
        };
        if let Some(submitted_after) = self.submitted_after {
            url.query_pairs_mut()
                .append_pair("submit_time__gt", &submitted_after.to_rfc3339());
        };
        if let Some(ended_after) = self.ended_after {
            url.query_pairs_mut()
                .append_pair("end_time__gt", &ended_after.to_rfc3339());
        };

        let paginator = Paginator::new(self.lava.client.clone(), url);
        Jobs {
            lava: self.lava,
            paginator,
            state: PagingState::Paging,
        }
    }
}
403
404async fn transform_job(job: LavaJob, lava: &Lava) -> Job {
405    let t = stream::iter(job.tags.iter());
406    let tags = t
407        .filter_map(|i| async move { lava.tag(*i).await })
408        .collect()
409        .await;
410
411    let t = stream::iter(job.failure_tags.iter());
412    let failure_tags = t
413        .filter_map(|i| async move { lava.tag(*i).await })
414        .collect()
415        .await;
416
417    Job {
418        id: job.id,
419        submitter: job.submitter,
420        viewing_groups: job.viewing_groups,
421        description: job.description,
422        health_check: job.health_check,
423        requested_device_type: job.requested_device_type,
424        tags,
425        actual_device: job.actual_device,
426        submit_time: job.submit_time,
427        start_time: job.start_time,
428        end_time: job.end_time,
429        state: job.state,
430        health: job.health,
431        priority: job.priority,
432        definition: job.definition,
433        original_definition: job.original_definition,
434        multinode_definition: job.multinode_definition,
435        failure_tags,
436        failure_comment: job.failure_comment,
437    }
438}
439
impl<'a> Stream for Jobs<'a> {
    type Item = Result<Job, PaginationError>;

    /// Drive the two-phase state machine: pull the next raw job from
    /// the paginator, then await its transformation into a [`Job`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // get_mut is available because Jobs is Unpin.
        let me = self.get_mut();

        loop {
            return match &mut me.state {
                // Phase 1: fetch the next raw LavaJob from the paginator.
                PagingState::Paging => {
                    let p = Pin::new(&mut me.paginator);
                    match p.poll_next(cx) {
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                        Poll::Ready(Some(Ok(d))) => {
                            // Got a raw job: switch to the transforming
                            // phase and poll the new future immediately.
                            me.state = PagingState::Transforming(transform_job(d, me.lava).boxed());
                            continue;
                        }
                        Poll::Pending => Poll::Pending,
                    }
                }
                // Phase 2: drive the tag-resolution future to completion,
                // then fall back to paging for the next item.
                PagingState::Transforming(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(d) => {
                        me.state = PagingState::Paging;
                        Poll::Ready(Some(Ok(d)))
                    }
                    Poll::Pending => Poll::Pending,
                },
            };
        }
    }
}
471
/// Errors that can occur when submitting a job via [`submit_job`].
#[derive(Error, Debug)]
pub enum SubmissionError {
    /// The underlying HTTP request failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server rejected the job definition; the payload carries the
    /// server's explanation.
    #[error("Invalid job: {0}")]
    InvalidJob(String),
    /// The server replied with a status code this client does not handle.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
481
/// JSON request body sent to the `jobs/` endpoint when submitting.
#[derive(Debug, Serialize)]
struct Submission<'a> {
    definition: &'a str,
}
486
/// Body of the server's reply to a submission request.
#[derive(Debug, Deserialize)]
struct SubmissionReply {
    // Human-readable message from the server.
    message: String,
    // Ids of the created jobs; defaults to empty when the field is
    // absent from the reply.
    #[serde(default)]
    job_ids: Vec<i64>,
}
493
494pub async fn submit_job(lava: &Lava, definition: &str) -> Result<Vec<i64>, SubmissionError> {
495    let url = lava
496        .base
497        .join("jobs/")
498        .expect("Failed to append to base url");
499    let sub = Submission { definition };
500
501    let post = lava.client.post(url).json(&sub).send().await?;
502
503    match post.status() {
504        StatusCode::CREATED => {
505            let reply: SubmissionReply = post.json().await?;
506            Ok(reply.job_ids)
507        }
508        StatusCode::BAD_REQUEST => {
509            let reply: SubmissionReply = post.json().await?;
510            Err(SubmissionError::InvalidJob(reply.message))
511        }
512        s => Err(SubmissionError::UnexpectedReply(s)),
513    }
514}
515
/// Errors that can occur when canceling a job via [`cancel_job`].
#[derive(Error, Debug)]
pub enum CancellationError {
    /// The underlying HTTP request failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server replied with a status code this client does not handle.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
523
524pub async fn cancel_job(lava: &Lava, id: i64) -> Result<(), CancellationError> {
525    let mut url = lava.base.clone();
526    url.path_segments_mut()
527        .unwrap()
528        .pop_if_empty()
529        .push("jobs")
530        .push(&id.to_string())
531        .push("cancel")
532        .push("");
533
534    let res = lava.client.get(url).send().await?;
535
536    match res.status() {
537        StatusCode::OK => Ok(()),
538        s => Err(CancellationError::UnexpectedReply(s)),
539    }
540}
541
/// Errors that can occur when fetching job results via
/// [`job_results_as_junit`].
#[derive(Error, Debug)]
pub enum ResultsError {
    /// The underlying HTTP request failed.
    #[error("Request failed {0}")]
    Request(#[from] reqwest::Error),
    /// The server replied with a status code this client does not handle.
    #[error("Unexpected reply: {0}")]
    UnexpectedReply(reqwest::StatusCode),
}
549
550pub async fn job_results_as_junit(
551    lava: &Lava,
552    id: i64,
553) -> Result<impl Stream<Item = Result<Bytes, ResultsError>> + Send + Unpin + '_, ResultsError> {
554    let mut url = lava.base.clone();
555    url.path_segments_mut()
556        .unwrap()
557        .pop_if_empty()
558        .push("jobs")
559        .push(&id.to_string())
560        .push("junit")
561        .push("");
562
563    let res = lava.client.get(url).send().await?;
564    match res.status() {
565        StatusCode::OK => Ok(res.bytes_stream().map_err(ResultsError::from)),
566        s => Err(ResultsError::UnexpectedReply(s)),
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use super::{Health, Job, Ordering, State, Tag};
573    use crate::Lava;
574
575    use boulder::{
576        Buildable, Builder, GeneratableWithPersianRug, GeneratorWithPersianRugMutIterator, Repeat,
577        Some as GSome, SubsetsFromPersianRug, Time,
578    };
579    use chrono::{DateTime, Duration, Utc};
580    use futures::{AsyncReadExt, TryStreamExt};
581    use lava_api_mock::{
582        Device as MockDevice, DeviceType as MockDeviceType, Group as MockGroup, Job as MockJob,
583        JobHealth as MockJobHealth, JobState as MockJobState, LavaMock, PaginationLimits, PassFail,
584        PopulationParams, SharedState, Tag as MockTag, User as MockUser,
585    };
586    use persian_rug::{Accessor, Context, Proxy};
587    use std::collections::{BTreeMap, BTreeSet};
588    use std::convert::{Infallible, TryFrom, TryInto};
589    use std::str::FromStr;
590    use test_log::test;
591
    impl Job {
        /// Build the [`Job`] the client is expected to produce for a
        /// given mock-server job, resolving proxies through `context`.
        #[persian_rug::constraints(
            context = C,
            access(
                MockUser<C>,
                MockGroup<C>,
                MockTag<C>,
                MockDevice<C>,
                MockDeviceType<C>
            )
        )]
        pub fn from_mock<'b, B, C>(job: &MockJob<C>, context: B) -> Job
        where
            B: 'b + Accessor<Context = C>,
            C: Context + 'static,
        {
            Self {
                id: job.id,
                submitter: context.get(&job.submitter).username.clone(),
                viewing_groups: job
                    .viewing_groups
                    .iter()
                    .map(|g| context.get(g).id)
                    .collect::<Vec<_>>(),
                description: job.description.clone(),
                health_check: job.health_check,
                requested_device_type: job
                    .requested_device_type
                    .map(|d| context.get(&d).name.to_string()),
                tags: job
                    .tags
                    .iter()
                    .map(|t| Tag::from_mock(context.get(t), context.clone()))
                    .collect::<Vec<_>>(),
                actual_device: job
                    .actual_device
                    .as_ref()
                    .map(|d| context.get(d).hostname.to_string()),
                // NOTE(review): assumes the mock always sets submit_time;
                // panics otherwise.
                submit_time: job.submit_time.unwrap(),
                start_time: job.start_time,
                end_time: job.end_time,
                state: job.state.try_into().unwrap(),
                health: job.health.try_into().unwrap(),
                priority: job.priority,
                definition: job.definition.clone(),
                original_definition: job.original_definition.clone(),
                multinode_definition: job.multinode_definition.clone(),
                failure_tags: job
                    .failure_tags
                    .iter()
                    .map(|t| Tag::from_mock(context.get(t), context.clone()))
                    .collect::<Vec<_>>(),
                failure_comment: job.failure_comment.clone(),
            }
        }
    }
648
649    impl TryFrom<MockJobState> for State {
650        type Error = Infallible;
651        fn try_from(state: MockJobState) -> Result<State, Self::Error> {
652            use State::*;
653
654            match state {
655                MockJobState::Submitted => Ok(Submitted),
656                MockJobState::Scheduling => Ok(Scheduling),
657                MockJobState::Scheduled => Ok(Scheduled),
658                MockJobState::Running => Ok(Running),
659                MockJobState::Canceling => Ok(Canceling),
660                MockJobState::Finished => Ok(Finished),
661            }
662        }
663    }
664
665    impl TryFrom<MockJobHealth> for Health {
666        type Error = Infallible;
667        fn try_from(health: MockJobHealth) -> Result<Health, Self::Error> {
668            use Health::*;
669
670            match health {
671                MockJobHealth::Unknown => Ok(Unknown),
672                MockJobHealth::Complete => Ok(Complete),
673                MockJobHealth::Incomplete => Ok(Incomplete),
674                MockJobHealth::Canceled => Ok(Canceled),
675            }
676        }
677    }
678
679    #[test]
680    fn test_display() {
681        assert_eq!(State::Submitted.to_string(), "Submitted");
682        assert_eq!(State::Scheduling.to_string(), "Scheduling");
683        assert_eq!(State::Scheduled.to_string(), "Scheduled");
684        assert_eq!(State::Running.to_string(), "Running");
685        assert_eq!(State::Canceling.to_string(), "Canceling");
686        assert_eq!(State::Finished.to_string(), "Finished");
687
688        assert_eq!(Health::Unknown.to_string(), "Unknown");
689        assert_eq!(Health::Complete.to_string(), "Complete");
690        assert_eq!(Health::Incomplete.to_string(), "Incomplete");
691        assert_eq!(Health::Canceled.to_string(), "Canceled");
692    }
693
694    #[test]
695    fn test_from_str() {
696        assert_eq!(Ok(State::Submitted), State::from_str("Submitted"));
697        assert_eq!(Ok(State::Scheduling), State::from_str("Scheduling"));
698        assert_eq!(Ok(State::Scheduled), State::from_str("Scheduled"));
699        assert_eq!(Ok(State::Running), State::from_str("Running"));
700        assert_eq!(Ok(State::Canceling), State::from_str("Canceling"));
701        assert_eq!(Ok(State::Finished), State::from_str("Finished"));
702        assert_eq!(
703            Err(strum::ParseError::VariantNotFound),
704            State::from_str("womble")
705        );
706
707        assert_eq!(Ok(Health::Unknown), Health::from_str("Unknown"));
708        assert_eq!(Ok(Health::Complete), Health::from_str("Complete"));
709        assert_eq!(Ok(Health::Incomplete), Health::from_str("Incomplete"));
710        assert_eq!(Ok(Health::Canceled), Health::from_str("Canceled"));
711        assert_eq!(
712            Err(strum::ParseError::VariantNotFound),
713            Health::from_str("")
714        );
715    }
716
717    /// Stream 50 jobs with a page limit of 7 from the server
718    /// checking that we correctly reconstruct their tags and that
719    /// they are all accounted for (that pagination is handled
720    /// properly)
721    #[test(tokio::test)]
722    async fn test_basic() {
723        let state = SharedState::new_populated(PopulationParams::builder().jobs(50usize).build());
724        let server = LavaMock::new(
725            state.clone(),
726            PaginationLimits::builder().jobs(Some(7)).build(),
727        )
728        .await;
729
730        let mut map = BTreeMap::new();
731        let start = state.access();
732        for j in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
733            map.insert(j.id, j);
734        }
735
736        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
737
738        let mut lj = lava.jobs().query();
739
740        let mut seen = BTreeMap::new();
741        while let Some(job) = lj.try_next().await.expect("failed to get job") {
742            assert!(!seen.contains_key(&job.id));
743            assert!(map.contains_key(&job.id));
744            let jj = map.get(&job.id).unwrap();
745            assert_eq!(job.submitter, start.get(&jj.submitter).username);
746            assert_eq!(job.viewing_groups.len(), jj.viewing_groups.len());
747            for i in 0..job.viewing_groups.len() {
748                assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
749            }
750            assert_eq!(job.description, jj.description);
751            assert_eq!(job.health_check, jj.health_check);
752            assert_eq!(
753                job.requested_device_type.as_ref(),
754                jj.requested_device_type
755                    .as_ref()
756                    .map(|t| &start.get(t).name)
757            );
758
759            assert_eq!(job.tags.len(), jj.tags.len());
760            for i in 0..job.tags.len() {
761                assert_eq!(job.tags[i].id, start.get(&jj.tags[i]).id);
762                assert_eq!(job.tags[i].name, start.get(&jj.tags[i]).name);
763                assert_eq!(job.tags[i].description, start.get(&jj.tags[i]).description);
764            }
765
766            assert_eq!(
767                job.actual_device.as_ref(),
768                jj.actual_device.as_ref().map(|t| &start.get(t).hostname)
769            );
770            assert_eq!(Some(job.submit_time), jj.submit_time);
771            assert_eq!(job.start_time, jj.start_time);
772            assert_eq!(job.end_time, jj.end_time);
773            assert_eq!(job.state.to_string(), jj.state.to_string());
774            assert_eq!(job.health.to_string(), jj.health.to_string());
775            assert_eq!(job.priority, jj.priority);
776            assert_eq!(job.definition, jj.definition);
777            assert_eq!(job.original_definition, jj.original_definition);
778            assert_eq!(job.multinode_definition, jj.multinode_definition);
779
780            assert_eq!(job.failure_tags.len(), jj.failure_tags.len());
781            for i in 0..job.failure_tags.len() {
782                assert_eq!(job.viewing_groups[i], start.get(&jj.viewing_groups[i]).id);
783            }
784            assert_eq!(job.failure_comment, jj.failure_comment);
785
786            seen.insert(job.id, job.clone());
787        }
788        assert_eq!(seen.len(), 50);
789    }
790
791    /// Stream 50 jobs with a page limit of 7 from the server
792    /// checking that we correctly reconstruct their tags and that
793    /// they are all accounted for (that pagination is handled
794    /// properly)
795    #[test(tokio::test)]
796    async fn test_jobs_builder() {
797        let mut server = lava_api_mock::LavaMock::new(
798            SharedState::new_populated(
799                PopulationParams::builder()
800                    .tags(5usize)
801                    .jobs(0usize)
802                    .build(),
803            ),
804            PaginationLimits::builder().jobs(Some(7)).build(),
805        )
806        .await;
807
808        let base_date = DateTime::parse_from_rfc3339("2022-04-10T16:30:00+01:00")
809            .unwrap()
810            .with_timezone(&Utc);
811
812        let mut gen = Proxy::<lava_api_mock::Job<lava_api_mock::State>>::generator()
813            .tags(SubsetsFromPersianRug::new())
814            .health(Repeat!(
815                MockJobHealth::Complete,
816                MockJobHealth::Incomplete,
817                MockJobHealth::Canceled,
818                MockJobHealth::Unknown
819            ))
820            .state(Repeat!(
821                MockJobState::Submitted,
822                MockJobState::Scheduling,
823                MockJobState::Scheduled,
824                MockJobState::Running,
825                MockJobState::Canceling,
826                MockJobState::Finished
827            ))
828            .submit_time(GSome(Time::new(
829                base_date - Duration::minutes(1),
830                Duration::minutes(-1),
831            )))
832            .start_time(GSome(Time::new(base_date, Duration::minutes(-1))))
833            .end_time(GSome(Time::new(base_date, Duration::seconds(-30))));
834
835        let _ = GeneratorWithPersianRugMutIterator::new(&mut gen, server.state_mut())
836            .take(50)
837            .collect::<Vec<_>>();
838
839        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
840
841        let mut lj = lava.jobs().state(State::Running).query();
842
843        let mut count = 0;
844        while let Some(job) = lj.try_next().await.expect("failed to get job") {
845            assert_eq!(job.state, State::Running);
846            count += 1;
847        }
848        assert_eq!(count, 8);
849
850        let mut lj = lava.jobs().state_not(State::Canceling).query();
851        let mut count = 0;
852        while let Some(job) = lj.try_next().await.expect("failed to get job") {
853            assert_ne!(job.state, State::Canceling);
854            count += 1;
855        }
856        assert_eq!(count, 42);
857
858        let mut lj = lava.jobs().health(Health::Incomplete).query();
859
860        let mut count = 0;
861        while let Some(job) = lj.try_next().await.expect("failed to get job") {
862            assert_eq!(job.health, Health::Incomplete);
863            count += 1;
864        }
865        assert_eq!(count, 13);
866
867        let mut lj = lava.jobs().health_not(Health::Canceled).query();
868        let mut count = 0;
869        while let Some(job) = lj.try_next().await.expect("failed to get job") {
870            assert_ne!(job.health, Health::Canceled);
871            count += 1;
872        }
873        assert_eq!(count, 38);
874
875        let mut lj = lava.jobs().id_after(9i64).query();
876        let mut count = 0;
877        while let Some(job) = lj.try_next().await.expect("failed to get job") {
878            assert!(job.id > 9i64);
879            count += 1;
880        }
881        assert_eq!(count, 40);
882
883        let job_35_start = DateTime::parse_from_rfc3339("2022-04-10T15:55:00+01:00")
884            .unwrap()
885            .with_timezone(&Utc);
886
887        let mut lj = lava.jobs().started_after(job_35_start).query();
888        let mut count = 0;
889        while let Some(job) = lj.try_next().await.expect("failed to get job") {
890            assert!(job.start_time.is_some() && job.start_time.unwrap() > job_35_start);
891            count += 1;
892        }
893        assert_eq!(count, 35);
894
895        let job_19_submit = DateTime::parse_from_rfc3339("2022-04-10T16:10:00+01:00")
896            .unwrap()
897            .with_timezone(&Utc);
898
899        let mut lj = lava.jobs().submitted_after(job_19_submit).query();
900        let mut count = 0;
901        while let Some(job) = lj.try_next().await.expect("failed to get job") {
902            assert!(job.submit_time > job_19_submit);
903            count += 1;
904        }
905        assert_eq!(count, 19);
906
907        let job_25_end = DateTime::parse_from_rfc3339("2022-04-10T16:17:30+01:00")
908            .unwrap()
909            .with_timezone(&Utc);
910
911        let mut lj = lava.jobs().ended_after(job_25_end).query();
912        let mut count = 0;
913        while let Some(job) = lj.try_next().await.expect("failed to get job") {
914            assert!(job.end_time.is_some() && job.end_time.unwrap() > job_25_end);
915            count += 1;
916        }
917        assert_eq!(count, 25);
918
919        let mut lj = lava.jobs().ordering(Ordering::SubmitTime, false).query();
920        let mut count = 0;
921        let mut prev = None;
922        while let Some(job) = lj.try_next().await.expect("failed to get job") {
923            if let Some(dt) = prev {
924                assert!(job.submit_time < dt);
925            }
926            prev = Some(job.submit_time);
927            count += 1;
928        }
929        assert_eq!(count, 50);
930
931        let mut lj = lava.jobs().ordering(Ordering::SubmitTime, true).query();
932        let mut count = 0;
933        let mut prev = None;
934        while let Some(job) = lj.try_next().await.expect("failed to get job") {
935            if let Some(dt) = prev {
936                assert!(job.submit_time > dt);
937            }
938            prev = Some(job.submit_time);
939            count += 1;
940        }
941        assert_eq!(count, 50);
942
943        let mut lj = lava.jobs().ordering(Ordering::StartTime, false).query();
944        let mut count = 0;
945        let mut prev = None;
946        while let Some(job) = lj.try_next().await.expect("failed to get job") {
947            if let Some(dt) = prev {
948                assert!(job.start_time < dt);
949            }
950            prev = Some(job.start_time);
951            count += 1;
952        }
953        assert_eq!(count, 50);
954
955        let mut lj = lava.jobs().ordering(Ordering::StartTime, true).query();
956        let mut count = 0;
957        let mut prev = None;
958        while let Some(job) = lj.try_next().await.expect("failed to get job") {
959            if let Some(dt) = prev {
960                assert!(job.start_time > dt);
961            }
962            prev = Some(job.start_time);
963            count += 1;
964        }
965        assert_eq!(count, 50);
966    }
967
968    #[test(tokio::test)]
969    async fn test_junit() {
970        let pop = PopulationParams::builder()
971            .jobs(3usize)
972            .test_suites(6usize)
973            .test_cases(20usize)
974            .build();
975        let state = SharedState::new_populated(pop);
976        let server = LavaMock::new(
977            state.clone(),
978            PaginationLimits::builder().test_cases(Some(6)).build(),
979        )
980        .await;
981
982        let mut map = BTreeMap::new();
983        let start = state.access();
984        for t in start.get_iter::<lava_api_mock::TestCase<lava_api_mock::State>>() {
985            map.insert(t.name.clone(), t.clone());
986        }
987
988        let lava = Lava::new(&server.uri(), None).expect("failed to make lava server");
989        let mut seen = BTreeSet::new();
990
991        for job in start.get_iter::<lava_api_mock::Job<lava_api_mock::State>>() {
992            let mut v = Vec::new();
993            lava.job_results_as_junit(job.id)
994                .await
995                .expect("failed to obtain junit output")
996                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
997                .into_async_read()
998                .read_to_end(&mut v)
999                .await
1000                .expect("failed to fully read junit output");
1001
1002            let report = junit_parser::from_reader(std::io::Cursor::new(v))
1003                .expect("failed to parse mock junit output");
1004
1005            for suite in report.suites.iter() {
1006                for test in suite.cases.iter() {
1007                    assert!(!seen.contains(&test.name));
1008                    assert!(map.contains_key(&test.name));
1009                    let tt = map.get(&test.name).unwrap();
1010                    match tt.result {
1011                        PassFail::Pass => {
1012                            assert!(test.status.is_success());
1013                        }
1014                        PassFail::Fail => {
1015                            assert!(test.status.is_failure());
1016                        }
1017                        PassFail::Skip => {
1018                            assert!(test.status.is_skipped());
1019                        }
1020                        PassFail::Unknown => {
1021                            assert!(test.status.is_error());
1022                        }
1023                    }
1024                    seen.insert(test.name.clone());
1025                }
1026            }
1027        }
1028        assert_eq!(seen.len(), 60);
1029    }
1030}