lava_api/
lib.rs

//! Provide access to the data held by a
//! [LAVA](https://docs.lavasoftware.org/lava/) server via the data
//! export REST interface.
//!
//! # Overview
//!
//! The central object in this crate is a [`Lava`], which represents
//! a local proxy for a LAVA server. The coverage of the data exposed
//! by LAVA is not complete. It is, however, possible to read back:
//! - jobs
//! - test results
//! - devices
//! - workers
//! - tags (which apply to both jobs and devices)
//!
//! Pagination is handled transparently. Since this crate is async
//! and built on the [`tokio`] runtime, you will likely want to use
//! [`TryStreamExt`] to iterate over the returned streams of objects.
//!
//! Example:
//! ```rust
//! use futures::stream::TryStreamExt;
//! # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState};
//! use lava_api::Lava;
//! #
//! # tokio_test::block_on( async {
//! # let limits = PaginationLimits::new();
//! # let population = PopulationParams::new();
//! # let mock = LavaMock::new(SharedState::new_populated(population), limits).await;
//! # let service_uri = mock.uri();
//! # let lava_token = None;
//!
//! let lava = Lava::new(&service_uri, lava_token).expect("failed to create Lava object");
//!
//! // Read back the device data from the server
//! let mut ld = lava.devices();
//! while let Some(device) = ld
//!     .try_next()
//!     .await
//!     .expect("failed to read devices from server")
//! {
//!     println!("Got device {:?}", device);
//! }
//! # });
//! ```
pub mod device;
pub mod job;
pub mod joblog;
pub mod paginator;
mod queryset;
pub mod tag;
pub mod test;
pub mod worker;

use bytes::Bytes;
use futures::stream::{Stream, TryStreamExt};
use joblog::JobLogBuilder;
use log::debug;
use reqwest::{header, redirect::Policy, Client};
use std::collections::HashMap;
use std::convert::TryInto;
use tokio::sync::RwLock;
use url::Url;

use device::Devices;
use job::JobsBuilder;
use paginator::{PaginationError, Paginator};
use tag::Tag;
use test::TestCase;
use thiserror::Error;
use worker::Worker;

/// Errors in construction of a [`Lava`] instance
#[derive(Error, Debug)]
pub enum LavaError {
    #[error("Could not parse url")]
    ParseUrlError(#[from] url::ParseError),
    #[error("Invalid token format")]
    InvalidToken(#[from] header::InvalidHeaderValue),
    #[error("Failed to build reqwest client")]
    ReqwestError(#[from] reqwest::Error),
}

/// A local proxy for a LAVA server
///
/// This provides convenient access to some of the data
/// stored on a LAVA server, including jobs, devices, tags and
/// workers.
#[derive(Debug)]
pub struct Lava {
    client: Client,
    base: Url,
    tags: RwLock<HashMap<u32, Tag>>,
}

impl Lava {
    /// Create a new Lava proxy
    ///
    /// Here `url` is the address of the server, and `token` is an
    /// optional LAVA security token used to validate access.
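    ///
    /// A minimal construction sketch (not run as a doctest; the
    /// server URL and token here are placeholders):
    /// ```no_run
    /// use lava_api::Lava;
    ///
    /// let lava = Lava::new(
    ///     "https://lava.example.com",
    ///     Some("my-secret-token".to_string()),
    /// )
    /// .expect("failed to create Lava object");
    /// ```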
    pub fn new(url: &str, token: Option<String>) -> Result<Lava, LavaError> {
        let host: Url = url.parse()?;
        let base = host.join("api/v0.2/")?;
        let tags = RwLock::new(HashMap::new());
        let mut headers = header::HeaderMap::new();

        if let Some(t) = token {
            headers.insert(
                reqwest::header::AUTHORIZATION,
                format!("Token {}", t).try_into()?,
            );
        }

        // Force the redirect policy to none, since following a
        // redirect would drop sensitive headers, in particular the
        // authorization token.
        let client = Client::builder()
            .redirect(Policy::none())
            .default_headers(headers)
            .build()?;

        Ok(Lava { client, base, tags })
    }

    /// Refresh the tag cache
    ///
    /// Tags are cached to make lookup cheaper, and because the number
    /// of jobs can be very large: resolving tags individually for
    /// each job would be extremely slow. The cache has to be
    /// periodically refreshed to account for changes.
    ///
    /// Note that tags are automatically refreshed by calling
    /// [`tag`](Self::tag) or [`tags`](Self::tags), but not by calling
    /// [`devices`](Self::devices) or [`jobs`](Self::jobs).
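    ///
    /// A usage sketch (assumes a reachable server; not run as a
    /// doctest):
    /// ```no_run
    /// # async fn refresh(lava: &lava_api::Lava) -> Result<(), lava_api::paginator::PaginationError> {
    /// // Refresh once up front so that subsequent queries resolve
    /// // tags from a warm cache.
    /// lava.refresh_tags().await?;
    /// # Ok(())
    /// # }
    /// ```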
    pub async fn refresh_tags(&self) -> Result<(), PaginationError> {
        debug!("Refreshing tags cache");
        let mut tags = self.tags.write().await;
        let url = self.base.join("tags/")?;
        let mut new_tags: Paginator<Tag> = Paginator::new(self.client.clone(), url);
        while let Some(t) = new_tags.try_next().await? {
            tags.insert(t.id, t);
        }

        Ok(())
    }

    /// Retrieve the [`Tag`] for the given tag id
    ///
    /// The cached value is returned if one is present; otherwise the
    /// cache is refreshed from the server before the lookup is
    /// retried.
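    ///
    /// A lookup sketch (assumes a reachable server; the tag id is a
    /// placeholder; not run as a doctest):
    /// ```no_run
    /// # async fn lookup(lava: &lava_api::Lava) {
    /// if let Some(tag) = lava.tag(1).await {
    ///     println!("found tag {:?}", tag);
    /// }
    /// # }
    /// ```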
    pub async fn tag(&self, tag: u32) -> Option<Tag> {
        debug!("Checking for tag id: {}", tag);
        {
            let tags = self.tags.read().await;
            if let Some(t) = tags.get(&tag) {
                return Some(t.clone());
            }
        }
        let _ = self.refresh_tags().await;

        let tags = self.tags.read().await;
        tags.get(&tag).cloned()
    }

    /// Retrieve all the tags from the server
    ///
    /// Unlike the other collection methods, this returns a flat
    /// vector rather than a stream on success. It also refreshes the
    /// tag cache.
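    ///
    /// A usage sketch (assumes a reachable server; not run as a
    /// doctest):
    /// ```no_run
    /// # async fn list(lava: &lava_api::Lava) -> Result<(), lava_api::paginator::PaginationError> {
    /// let tags = lava.tags().await?;
    /// println!("the server has {} tags", tags.len());
    /// # Ok(())
    /// # }
    /// ```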
    pub async fn tags(&self) -> Result<Vec<Tag>, PaginationError> {
        self.refresh_tags().await?;
        let tags = self.tags.read().await;
        Ok(tags.values().cloned().collect())
    }

    /// Obtain a [`Stream`](futures::stream::Stream) of all the
    /// [`Device`](device::Device) instances on the server.
    pub fn devices(&self) -> Devices {
        Devices::new(self)
    }

    /// Obtain a [`JobLogBuilder`] for the log of the job with the
    /// given id
    ///
    /// The returned builder can be used to customise, and then
    /// retrieve, the job log.
    pub fn log(&self, id: i64) -> JobLogBuilder {
        JobLogBuilder::new(self, id)
    }

    /// Obtain a customisable query object for [`Job`](job::Job)
    /// instances on the server.
    ///
    /// The returned [`JobsBuilder`] can be used first to select the
    /// subset of jobs that will be returned, and then to obtain a
    /// stream of the matching jobs. The default query is the same as
    /// that for [`JobsBuilder::new`].
    pub fn jobs(&self) -> JobsBuilder {
        JobsBuilder::new(self)
    }

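    /// Submit a new job to the server
    ///
    /// Here `definition` is the job definition in LAVA's YAML format.
    /// On success, the ids of the newly created jobs are returned (a
    /// single definition can create more than one job, for example a
    /// multinode job).
    ///
    /// A submission sketch (assumes a reachable server; the
    /// definition path is a placeholder; not run as a doctest):
    /// ```no_run
    /// # async fn submit(lava: &lava_api::Lava) {
    /// let definition = std::fs::read_to_string("job.yaml")
    ///     .expect("failed to read job definition");
    /// let ids = lava
    ///     .submit_job(&definition)
    ///     .await
    ///     .expect("failed to submit job");
    /// println!("submitted job(s) {:?}", ids);
    /// # }
    /// ```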
    pub async fn submit_job(&self, definition: &str) -> Result<Vec<i64>, job::SubmissionError> {
        job::submit_job(self, definition).await
    }

    /// Cancel the job with the given id
    pub async fn cancel_job(&self, id: i64) -> Result<(), job::CancellationError> {
        job::cancel_job(self, id).await
    }

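    /// Retrieve the results of the given job in JUnit format
    ///
    /// On success, the document is returned as a stream of [`Bytes`]
    /// chunks.
    ///
    /// A sketch that collects the stream into memory (assumes a
    /// reachable server; the job id is a placeholder; not run as a
    /// doctest):
    /// ```no_run
    /// # async fn results(lava: &lava_api::Lava) {
    /// use futures::stream::TryStreamExt;
    ///
    /// let stream = lava
    ///     .job_results_as_junit(1234)
    ///     .await
    ///     .expect("failed to request results");
    /// futures::pin_mut!(stream);
    ///
    /// let mut junit = Vec::new();
    /// while let Some(chunk) = stream
    ///     .try_next()
    ///     .await
    ///     .expect("failed to read results")
    /// {
    ///     junit.extend_from_slice(&chunk);
    /// }
    /// # }
    /// ```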
    pub async fn job_results_as_junit(
        &self,
        id: i64,
    ) -> Result<impl Stream<Item = Result<Bytes, job::ResultsError>> + '_, job::ResultsError> {
        job::job_results_as_junit(self, id).await
    }

    /// Obtain a [`Stream`](futures::stream::Stream) of all the
    /// [`Worker`] instances on the server.
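    ///
    /// A usage sketch (assumes a reachable server; not run as a
    /// doctest):
    /// ```no_run
    /// # async fn list(lava: &lava_api::Lava) {
    /// use futures::stream::TryStreamExt;
    ///
    /// let mut workers = lava.workers();
    /// let mut count = 0;
    /// while let Some(_worker) = workers
    ///     .try_next()
    ///     .await
    ///     .expect("failed to read workers")
    /// {
    ///     count += 1;
    /// }
    /// println!("the server has {} workers", count);
    /// # }
    /// ```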
    pub fn workers(&self) -> Paginator<Worker> {
        let url = self
            .base
            .join("workers/")
            .expect("Failed to append to base url");
        Paginator::new(self.client.clone(), url)
    }

    /// Obtain a [`Stream`](futures::stream::Stream) of all the
    /// [`TestCase`] instances for a given job id.
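    ///
    /// A usage sketch (assumes a reachable server; the job id is a
    /// placeholder; not run as a doctest):
    /// ```no_run
    /// # async fn cases(lava: &lava_api::Lava) {
    /// use futures::stream::TryStreamExt;
    ///
    /// let cases: Vec<lava_api::test::TestCase> = lava
    ///     .test_cases(1234)
    ///     .try_collect()
    ///     .await
    ///     .expect("failed to read test cases");
    /// println!("job 1234 has {} test cases", cases.len());
    /// # }
    /// ```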
    pub fn test_cases(&self, job_id: i64) -> Paginator<TestCase> {
        let url = self
            .base
            .join("jobs/")
            .and_then(|x| x.join(&format!("{}/", job_id)))
            .and_then(|x| x.join("tests/"))
            .expect("Failed to build test case url");
        Paginator::new(self.client.clone(), url)
    }
}