gitlab_runner/
job.rs

1//! This module describes a single gitlab job
2use crate::artifact::Artifact;
3use crate::client::{Client, JobArtifactFile, JobDependency, JobResponse, JobVariable};
4use bytes::{Bytes, BytesMut};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use tracing::info;
9
10use crate::client::Error as ClientError;
11
/// Gitlab job environment variable
///
/// To get the underlying value [`Variable::value`] should be used, however this should not be
/// directly displayed in the log. For displaying to the user the Variable's
/// [`Display`](`std::fmt::Display`) implementation should be used (e.g. via `{}`), as that will
/// show the value taking the masked status into account.
pub struct Variable<'a> {
    // Borrowed variable record that the accessors and the `Display` implementation read from.
    v: &'a JobVariable,
}
22
23impl<'a> Variable<'a> {
24    /// Get the key of the variable
25    pub fn key(&self) -> &'a str {
26        &self.v.key
27    }
28
29    /// Get the value of the variable
30    pub fn value(&self) -> &'a str {
31        &self.v.value
32    }
33
34    /// Whether or not the variable is masked
35    pub fn masked(&self) -> bool {
36        self.v.masked
37    }
38
39    /// Whether or not the variable is public
40    pub fn public(&self) -> bool {
41        self.v.public
42    }
43}
44
45impl std::fmt::Display for Variable<'_> {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
47        if self.v.masked {
48            write!(f, "<MASKED>")
49        } else {
50            write!(f, "{}", self.v.value)
51        }
52    }
53}
54
/// A dependency of a gitlab job
///
/// Dependencies in gitlab are the jobs that had to be run before this job could run.
/// This can happen either implicitly in the pipeline definition via stages or directly
/// via the `needs` keyword
#[derive(Debug)]
pub struct Dependency<'a> {
    // The job this dependency belongs to; its client, build dir and artifact cache are used
    // when downloading the dependency's artifact.
    job: &'a Job,
    // The dependency record itself (id, name, token and optional artifact file description).
    dependency: &'a JobDependency,
}
65
66impl Dependency<'_> {
67    /// The id of the dependency
68    ///
69    /// This id matches the job id of the generated this depenency
70    pub fn id(&self) -> u64 {
71        self.dependency.id
72    }
73
74    /// The name job that creaof the dependency
75    ///
76    /// This name matches the job name of the job that generated this depenency
77    pub fn name(&self) -> &str {
78        &self.dependency.name
79    }
80
81    /// The filename of the dependencies artifact if it has one
82    pub fn artifact_filename(&self) -> Option<&str> {
83        self.dependency
84            .artifacts_file
85            .as_ref()
86            .map(|a| a.filename.as_str())
87    }
88
89    /// The size of the dependencies artifact if it has one
90    pub fn artifact_size(&self) -> Option<usize> {
91        self.dependency.artifacts_file.as_ref().map(|a| a.size)
92    }
93
94    async fn download_to_file(&self, _file: &JobArtifactFile) -> Result<(), ClientError> {
95        let mut path = self.job.build_dir.join("artifacts");
96        if let Err(e) = tokio::fs::create_dir(&path).await {
97            if e.kind() != std::io::ErrorKind::AlreadyExists {
98                return Err(ClientError::WriteFailure(e));
99            }
100        }
101
102        // TODO this assumes it's all zip artifacts
103        path.push(format!("{}.zip", self.id()));
104        let mut f = tokio::fs::File::create(&path)
105            .await
106            .map_err(ClientError::WriteFailure)?;
107        self.job
108            .client
109            .download_artifact(self.dependency.id, &self.dependency.token, &mut f)
110            .await?;
111        self.job.artifacts.insert_file(self.dependency.id, path);
112        Ok(())
113    }
114
115    async fn download_to_mem(&self, file: &JobArtifactFile) -> Result<(), ClientError> {
116        let mut bytes = Vec::with_capacity(file.size);
117        self.job
118            .client
119            .download_artifact(self.dependency.id, &self.dependency.token, &mut bytes)
120            .await?;
121        self.job.artifacts.insert_data(self.dependency.id, bytes);
122        Ok(())
123    }
124
125    /// Download dependencies artifact
126    ///
127    /// This downloads the actual artifact file from gitlab if it hadn't been downloaded yet.
128    /// Bigger files get saved on the filesystem while small ones are simply cached in memory
129    pub async fn download(&self) -> Result<Option<Artifact>, ClientError> {
130        if let Some(file) = &self.dependency.artifacts_file {
131            let cached = self.job.artifacts.get(self.dependency.id).await?;
132            if cached.is_some() {
133                return Ok(cached);
134            }
135
136            // Load up to 64 kilobyte directly into memory; bigger files to storage to not bloat
137            // the memory usage
138            if file.size > 64 * 1024 {
139                info!("Downloading dependency {} to file", self.dependency.id);
140                self.download_to_file(file).await?
141            } else {
142                info!("Downloading dependency {} to mem", self.dependency.id);
143                self.download_to_mem(file).await?
144            }
145            self.job.artifacts.get(self.dependency.id).await
146        } else {
147            Ok(None)
148        }
149    }
150}
151
/// Shared, cheaply clonable byte buffer
///
/// The bytes live behind an [`Arc`], so clones refer to the same allocation. The contents are
/// exposed through `AsRef<[u8]>`.
#[derive(Debug, Clone)]
struct ArcU8(Arc<Vec<u8>>);

impl ArcU8 {
    /// Take ownership of `data` and place it behind a reference count
    fn new(data: Vec<u8>) -> Self {
        let shared = Arc::new(data);
        ArcU8(shared)
    }
}

impl AsRef<[u8]> for ArcU8 {
    /// View the shared bytes as a slice
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
166
/// Where the contents of a cached artifact are kept
#[derive(Clone, Debug)]
enum CacheData {
    /// The contents are held in memory
    MemoryBacked(ArcU8),
    /// The contents are stored in the file at this path
    FileBacked(PathBuf),
}
172
/// Cache of artifacts, keyed by the `u64` id they were inserted under
#[derive(Debug)]
struct ArtifactCache {
    // Behind a mutex so that entries can be inserted and looked up through `&self`.
    data: Mutex<HashMap<u64, CacheData>>,
}
177
178impl ArtifactCache {
179    fn new() -> Self {
180        ArtifactCache {
181            data: Mutex::new(HashMap::new()),
182        }
183    }
184
185    fn insert_file(&self, id: u64, path: PathBuf) {
186        let mut d = self.data.lock().unwrap();
187        d.insert(id, CacheData::FileBacked(path));
188    }
189
190    fn insert_data(&self, id: u64, data: Vec<u8>) {
191        let mut d = self.data.lock().unwrap();
192        d.insert(id, CacheData::MemoryBacked(ArcU8::new(data)));
193    }
194
195    fn lookup(&self, id: u64) -> Option<CacheData> {
196        let d = self.data.lock().unwrap();
197        d.get(&id).cloned()
198    }
199
200    async fn get(&self, id: u64) -> Result<Option<Artifact>, ClientError> {
201        if let Some(data) = self.lookup(id) {
202            match data {
203                CacheData::MemoryBacked(m) => {
204                    Ok(Some(Artifact::new(Box::new(std::io::Cursor::new(m)))?))
205                }
206                CacheData::FileBacked(p) => {
207                    let f = tokio::fs::File::open(p)
208                        .await
209                        .map_err(ClientError::WriteFailure)?;
210                    // Always succeeds as no operations have been started
211                    Ok(Some(Artifact::new(Box::new(f.try_into_std().unwrap()))?))
212                }
213            }
214        } else {
215            Ok(None)
216        }
217    }
218}
219
/// Buffer collecting trace data until it is split off
///
/// The buffer sits behind an `Arc`, so clones of a `JobLog` share the same buffer.
#[derive(Clone, Debug)]
pub(crate) struct JobLog {
    // Pending trace bytes; appended to by `trace` and split off by `split_trace`.
    trace: Arc<Mutex<BytesMut>>,
}
224
225impl JobLog {
226    pub(crate) fn new() -> Self {
227        let trace = Arc::new(Mutex::new(BytesMut::new()));
228        Self { trace }
229    }
230
231    pub(crate) fn trace(&self, data: &[u8]) {
232        let mut trace = self.trace.lock().unwrap();
233        trace.extend_from_slice(data);
234    }
235
236    pub(crate) fn split_trace(&self) -> Option<Bytes> {
237        let mut trace = self.trace.lock().unwrap();
238        if trace.is_empty() {
239            None
240        } else {
241            Some(trace.split().freeze())
242        }
243    }
244}
245
/// A running Gitlab Job
#[derive(Debug)]
pub struct Job {
    // Job description this job was created from; source of the id, variables and dependencies.
    response: Arc<JobResponse>,
    // Client used to download the artifacts of dependencies.
    client: Client,
    // Buffer that `trace` appends data for the gitlab log to.
    log: JobLog,
    // Directory returned by `build_dir`; downloaded dependency artifacts are stored below it.
    build_dir: PathBuf,
    // Artifacts of dependencies that have already been downloaded.
    artifacts: ArtifactCache,
}
255
256impl Job {
257    pub(crate) fn new(
258        client: Client,
259        response: Arc<JobResponse>,
260        build_dir: PathBuf,
261        log: JobLog,
262    ) -> Self {
263        Self {
264            client,
265            response,
266            log,
267            build_dir,
268            artifacts: ArtifactCache::new(),
269        }
270    }
271
272    /// Get the job id
273    pub fn id(&self) -> u64 {
274        self.response.id
275    }
276
277    /// Sent data to the gitlab log
278    ///
279    /// Normally [`outputln!`](crate::outputln) should be used. This function directly puts data in the queue for
280    /// the gitlab log and side-steps the tracing infrastructure
281    pub fn trace<D: AsRef<[u8]>>(&self, data: D) {
282        self.log.trace(data.as_ref());
283    }
284
285    /// Get the variable matching the given key
286    pub fn variable(&self, key: &str) -> Option<Variable> {
287        self.response.variables.get(key).map(|v| Variable { v })
288    }
289
290    /// Get an iterator over all the variables associated with this job.
291    pub fn variables(&self) -> impl Iterator<Item = Variable> {
292        self.response.variables.values().map(|v| Variable { v })
293    }
294
295    /// Get an iterator over the job dependencies
296    pub fn dependencies(&self) -> impl Iterator<Item = Dependency> {
297        self.response
298            .dependencies
299            .iter()
300            .map(move |dependency| Dependency {
301                job: self,
302                dependency,
303            })
304    }
305
306    /// Get a reference to the jobs build dir.
307    pub fn build_dir(&self) -> &Path {
308        &self.build_dir
309    }
310}