lava_api/lib.rs
1//! Provide access to the data held by a
2//! [LAVA](https://docs.lavasoftware.org/lava/) server via the data
3//! export REST interface.
4//!
5//! # Overview
6//!
7//! The central object in this crate is a [`Lava`], which represents
8//! a local proxy for a LAVA server. The coverage of the data exposed
9//! by LAVA is not complete. It is however possible to readback
10//! - jobs
11//! - test results
12//! - devices
13//! - workers
14//! - tags (which apply to both jobs and devices)
15//!
16//! Pagination is handled transparently, but you will likely want to
17//! use [`TryStreamExt`] to iterate over returned streams of objects,
18//! since this crate is async and built on the [`tokio`] runtime.
19//!
20//! Example:
21//! ```rust
22//! use futures::stream::TryStreamExt;
23//! # use lava_api_mock::{LavaMock, PaginationLimits, PopulationParams, SharedState};
24//! use lava_api::Lava;
25//! #
26//! # tokio_test::block_on( async {
27//! # let limits = PaginationLimits::new();
28//! # let population = PopulationParams::new();
29//! # let mock = LavaMock::new(SharedState::new_populated(population), limits).await;
30//! # let service_uri = mock.uri();
31//! # let lava_token = None;
32//!
33//! let lava = Lava::new(&service_uri, lava_token).expect("failed to create Lava object");
34//!
35//! // Read back the device data from the server
36//! let mut ld = lava.devices();
37//! while let Some(device) = ld
38//! .try_next()
39//! .await
40//! .expect("failed to read devices from server")
41//! {
42//! println!("Got device {:?}", device);
43//! }
44//! # });
45//! ```
46pub mod device;
47pub mod job;
48pub mod joblog;
49pub mod paginator;
50mod queryset;
51pub mod tag;
52pub mod test;
53pub mod worker;
54
55use bytes::Bytes;
56use futures::stream::{Stream, TryStreamExt};
57use joblog::JobLogBuilder;
58use log::debug;
59use reqwest::{header, redirect::Policy, Client};
60use std::collections::HashMap;
61use std::convert::TryInto;
62use tokio::sync::RwLock;
63use url::Url;
64
65use device::Devices;
66use job::JobsBuilder;
67use paginator::{PaginationError, Paginator};
68use tag::Tag;
69use test::TestCase;
70use thiserror::Error;
71use worker::Worker;
72
73/// Errors in construction of a [`Lava`] instance
74#[derive(Error, Debug)]
75pub enum LavaError {
76 #[error("Could not parse url")]
77 ParseUrlError(#[from] url::ParseError),
78 #[error("Invalid token format")]
79 InvalidToken(#[from] header::InvalidHeaderValue),
80 #[error("Failed to build reqwest client")]
81 ReqwestError(#[from] reqwest::Error),
82}
83
84/// A local proxy for a LAVA server
85///
86/// This provides convenient access to some of the data
87/// stored on a LAVA server, including jobs, devices, tags and
88/// workers.
89#[derive(Debug)]
90pub struct Lava {
91 client: Client,
92 base: Url,
93 tags: RwLock<HashMap<u32, Tag>>,
94}
95
96impl Lava {
97 /// Create a new Lava proxy
98 ///
99 /// Here `url` is the address of the server, and `token` is an
100 /// optional LAVA security token used to validate access.
101 pub fn new(url: &str, token: Option<String>) -> Result<Lava, LavaError> {
102 let host: Url = url.parse()?;
103 let base = host.join("api/v0.2/")?;
104 let tags = RwLock::new(HashMap::new());
105 let mut headers = header::HeaderMap::new();
106
107 if let Some(t) = token {
108 headers.insert(
109 reqwest::header::AUTHORIZATION,
110 format!("Token {}", t).try_into()?,
111 );
112 }
113
114 // Force redirect policy none as that will drop sensitive headers; in
115 // particular tokens
116 let client = Client::builder()
117 .redirect(Policy::none())
118 .default_headers(headers)
119 .build()?;
120
121 Ok(Lava { client, base, tags })
122 }
123
124 /// Refresh the tag cache
125 ///
126 /// Tags are cached to make lookup cheaper, and because the number
127 /// of jobs can be very large: resolving tags individually for
128 /// each job would be extremely slow. The cache has to be
129 /// periodically refreshed to account for changes.
130 ///
131 /// Note that tags are automatically refreshed by calling
132 /// [`tag`](Self::tag) or [`tags`](Self::tags), but not by calling
133 /// [`devices`](Self::devices) or [`jobs`](Self::jobs).
134 pub async fn refresh_tags(&self) -> Result<(), PaginationError> {
135 debug!("Refreshing tags cache");
136 let mut tags = self.tags.write().await;
137 let url = self.base.join("tags/")?;
138 let mut new_tags: Paginator<Tag> = Paginator::new(self.client.clone(), url);
139 while let Some(t) = new_tags.try_next().await? {
140 tags.insert(t.id, t);
141 }
142
143 Ok(())
144 }
145
146 /// Retrieve the [`Tag`] for the given tag id.
147 pub async fn tag(&self, tag: u32) -> Option<Tag> {
148 debug!("Checking for tag id: {}", tag);
149 {
150 let tags = self.tags.read().await;
151 if let Some(t) = tags.get(&tag) {
152 return Some(t.clone());
153 }
154 }
155 let _ = self.refresh_tags().await;
156
157 let tags = self.tags.read().await;
158 tags.get(&tag).cloned()
159 }
160
161 /// Retrieve all the tags from the server
162 ///
163 /// The returned data is not a stream, but a flat vector when the
164 /// method succeeds. This also updates the tag cache.
165 pub async fn tags(&self) -> Result<Vec<Tag>, PaginationError> {
166 self.refresh_tags().await?;
167 let tags = self.tags.read().await;
168 Ok(tags.values().cloned().collect())
169 }
170
171 /// Obtain a [`Stream`](futures::stream::Stream) of all the
172 /// [`Device`](device::Device) instances on the server.
173 pub fn devices(&self) -> Devices {
174 Devices::new(self)
175 }
176
177 pub fn log(&self, id: i64) -> JobLogBuilder {
178 JobLogBuilder::new(self, id)
179 }
180
181 /// Obtain a customisable query object for [`Job`](job::Job)
182 /// instances on the server.
183 ///
184 /// The returned [`JobsBuilder`] can be used first to select the
185 /// subset of jobs that will be returned, and then after that is
186 /// complete to obtain a stream of matching jobs. The default
187 /// query is the same as that for [`JobsBuilder::new`].
188 pub fn jobs(&self) -> JobsBuilder {
189 JobsBuilder::new(self)
190 }
191
192 pub async fn submit_job(&self, definition: &str) -> Result<Vec<i64>, job::SubmissionError> {
193 job::submit_job(self, definition).await
194 }
195
196 pub async fn cancel_job(&self, id: i64) -> Result<(), job::CancellationError> {
197 job::cancel_job(self, id).await
198 }
199
200 pub async fn job_results_as_junit(
201 &self,
202 id: i64,
203 ) -> Result<impl Stream<Item = Result<Bytes, job::ResultsError>> + '_, job::ResultsError> {
204 job::job_results_as_junit(self, id).await
205 }
206
207 /// Obtain a [`Stream`](futures::stream::Stream) of all the
208 /// [`Worker`] instances on the server.
209 pub fn workers(&self) -> Paginator<Worker> {
210 let url = self
211 .base
212 .join("workers/")
213 .expect("Failed to append to base url");
214 Paginator::new(self.client.clone(), url)
215 }
216
217 /// Obtain a [`Stream`](futures::stream::Stream) of all the
218 /// [`TestCase`] instances for a given job id.
219 pub fn test_cases(&self, job_id: i64) -> Paginator<TestCase> {
220 let url = self
221 .base
222 .join("jobs/")
223 .and_then(|x| x.join(&format!("{}/", job_id)))
224 .and_then(|x| x.join("tests/"))
225 .expect("Failed to build test case url");
226 Paginator::new(self.client.clone(), url)
227 }
228}