Skip to main content

gitlab_runner/
job.rs

1//! This module describes a single gitlab job
2use crate::artifact::Artifact;
3use crate::client::{Client, JobArtifactFile, JobDependency, JobResponse, JobVariable};
4use crate::outputln;
5use bytes::{Bytes, BytesMut};
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9use tokio::io::AsyncWrite;
10use tokio_retry2::strategy::{FibonacciBackoff, jitter};
11use tracing::info;
12
13use crate::client::Error as ClientError;
14
15/// Gitlab job environment variable
16///
17/// To get the underlying value [`Variable::value`] should be used, however this should not be
18/// directly displayed in the log. For displaying to the user the Variable's
19/// [`Display`](`std::fmt::Display`)
20/// implementation should be used (e.g. via `{}` as that will show the value taking the masked
21/// status into account
22pub struct Variable<'a> {
23    v: &'a JobVariable,
24}
25
26impl<'a> Variable<'a> {
27    /// Get the key of the variable
28    pub fn key(&self) -> &'a str {
29        &self.v.key
30    }
31
32    /// Get the value of the variable
33    pub fn value(&self) -> &'a str {
34        &self.v.value
35    }
36
37    /// Whether or not the variable is masked
38    pub fn masked(&self) -> bool {
39        self.v.masked
40    }
41
42    /// Whether or not the variable is public
43    pub fn public(&self) -> bool {
44        self.v.public
45    }
46}
47
48impl std::fmt::Display for Variable<'_> {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
50        if self.v.masked {
51            write!(f, "<MASKED>")
52        } else {
53            write!(f, "{}", self.v.value)
54        }
55    }
56}
57
58/// A dependency of a gitlab job
59///
60/// Dependencies in gitlab are the jobs that had to be run before this job could run.
61/// This can happen either implicitly in the pipeline definition via stages of directly
62/// via `needs` keyword
63#[derive(Debug)]
64pub struct Dependency<'a> {
65    job: &'a Job,
66    dependency: &'a JobDependency,
67}
68
69fn is_retriable_error(err: &ClientError) -> bool {
70    match err {
71        ClientError::Request(_) => true,
72        ClientError::UnexpectedStatus(status) => status.is_server_error(),
73        _ => false,
74    }
75}
76
77impl Dependency<'_> {
78    /// The id of the dependency
79    ///
80    /// This id matches the job id of the generated this depenency
81    pub fn id(&self) -> u64 {
82        self.dependency.id
83    }
84
85    /// The name job that creaof the dependency
86    ///
87    /// This name matches the job name of the job that generated this depenency
88    pub fn name(&self) -> &str {
89        &self.dependency.name
90    }
91
92    /// The filename of the dependencies artifact if it has one
93    pub fn artifact_filename(&self) -> Option<&str> {
94        self.dependency
95            .artifacts_file
96            .as_ref()
97            .map(|a| a.filename.as_str())
98    }
99
100    /// The size of the dependencies artifact if it has one
101    pub fn artifact_size(&self) -> Option<usize> {
102        self.dependency.artifacts_file.as_ref().map(|a| a.size)
103    }
104
105    async fn download_impl<A>(&self, mut writer: A) -> Result<(), ClientError>
106    where
107        A: AsyncWrite + Unpin,
108    {
109        let mut strategy = FibonacciBackoff::from_millis(900).map(jitter).take(4);
110        loop {
111            let r = self
112                .job
113                .client
114                .download_artifact(self.dependency.id, &self.dependency.token, &mut writer)
115                .await;
116
117            match (r, strategy.next()) {
118                (Err(err), Some(d)) if is_retriable_error(&err) => {
119                    outputln!(
120                        "Error getting artifacts from {}: {err}. Retrying",
121                        self.dependency.name
122                    );
123                    tokio::time::sleep(d).await;
124                }
125                (r, _) => return r,
126            }
127        }
128    }
129
130    async fn download_to_file(&self, _file: &JobArtifactFile) -> Result<(), ClientError> {
131        let mut path = self.job.build_dir.join("artifacts");
132        if let Err(e) = tokio::fs::create_dir(&path).await
133            && e.kind() != std::io::ErrorKind::AlreadyExists
134        {
135            return Err(ClientError::WriteFailure(e));
136        }
137
138        // TODO this assumes it's all zip artifacts
139        path.push(format!("{}.zip", self.id()));
140        let mut f = tokio::fs::File::create(&path)
141            .await
142            .map_err(ClientError::WriteFailure)?;
143        self.download_impl(&mut f).await?;
144        self.job.artifacts.insert_file(self.dependency.id, path);
145        Ok(())
146    }
147
148    async fn download_to_mem(&self, file: &JobArtifactFile) -> Result<(), ClientError> {
149        let mut bytes = Vec::with_capacity(file.size);
150        self.download_impl(&mut bytes).await?;
151        self.job.artifacts.insert_data(self.dependency.id, bytes);
152        Ok(())
153    }
154
155    /// Download dependencies artifact
156    ///
157    /// This downloads the actual artifact file from gitlab if it hadn't been downloaded yet.
158    /// Bigger files get saved on the filesystem while small ones are simply cached in memory
159    pub async fn download(&self) -> Result<Option<Artifact>, ClientError> {
160        if let Some(file) = &self.dependency.artifacts_file {
161            let cached = self.job.artifacts.get(self.dependency.id).await?;
162            if cached.is_some() {
163                return Ok(cached);
164            }
165
166            // Load up to 64 kilobyte directly into memory; bigger files to storage to not bloat
167            // the memory usage
168            if file.size > 64 * 1024 {
169                info!("Downloading dependency {} to file", self.dependency.id);
170                self.download_to_file(file).await?
171            } else {
172                info!("Downloading dependency {} to mem", self.dependency.id);
173                self.download_to_mem(file).await?
174            }
175            self.job.artifacts.get(self.dependency.id).await
176        } else {
177            Ok(None)
178        }
179    }
180}
181
/// Reference-counted, shared byte buffer.
///
/// The newtype lets the shared buffer implement `AsRef<[u8]>`; cloning it only bumps the
/// reference count instead of copying the bytes.
#[derive(Debug, Clone)]
struct ArcU8(Arc<Vec<u8>>);

impl ArcU8 {
    /// Take ownership of `data` and move it behind a shared pointer.
    fn new(data: Vec<u8>) -> Self {
        ArcU8(data.into())
    }
}

impl AsRef<[u8]> for ArcU8 {
    /// View the shared buffer as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let ArcU8(shared) = self;
        shared.as_slice()
    }
}
196
197#[derive(Clone, Debug)]
198enum CacheData {
199    MemoryBacked(ArcU8),
200    FileBacked(PathBuf),
201}
202
203#[derive(Debug)]
204struct ArtifactCache {
205    data: Mutex<HashMap<u64, CacheData>>,
206}
207
208impl ArtifactCache {
209    fn new() -> Self {
210        ArtifactCache {
211            data: Mutex::new(HashMap::new()),
212        }
213    }
214
215    fn insert_file(&self, id: u64, path: PathBuf) {
216        let mut d = self.data.lock().unwrap();
217        d.insert(id, CacheData::FileBacked(path));
218    }
219
220    fn insert_data(&self, id: u64, data: Vec<u8>) {
221        let mut d = self.data.lock().unwrap();
222        d.insert(id, CacheData::MemoryBacked(ArcU8::new(data)));
223    }
224
225    fn lookup(&self, id: u64) -> Option<CacheData> {
226        let d = self.data.lock().unwrap();
227        d.get(&id).cloned()
228    }
229
230    async fn get(&self, id: u64) -> Result<Option<Artifact>, ClientError> {
231        if let Some(data) = self.lookup(id) {
232            match data {
233                CacheData::MemoryBacked(m) => {
234                    Ok(Some(Artifact::new(Box::new(std::io::Cursor::new(m)))?))
235                }
236                CacheData::FileBacked(p) => {
237                    let f = tokio::fs::File::open(p)
238                        .await
239                        .map_err(ClientError::WriteFailure)?;
240                    // Always succeeds as no operations have been started
241                    Ok(Some(Artifact::new(Box::new(f.try_into_std().unwrap()))?))
242                }
243            }
244        } else {
245            Ok(None)
246        }
247    }
248}
249
250#[derive(Clone, Debug)]
251pub(crate) struct JobLog {
252    trace: Arc<Mutex<BytesMut>>,
253}
254
255impl JobLog {
256    pub(crate) fn new() -> Self {
257        let trace = Arc::new(Mutex::new(BytesMut::new()));
258        Self { trace }
259    }
260
261    pub(crate) fn trace(&self, data: &[u8]) {
262        let mut trace = self.trace.lock().unwrap();
263        trace.extend_from_slice(data);
264    }
265
266    pub(crate) fn split_trace(&self) -> Option<Bytes> {
267        let mut trace = self.trace.lock().unwrap();
268        if trace.is_empty() {
269            None
270        } else {
271            Some(trace.split().freeze())
272        }
273    }
274}
275
276/// A running Gitlab Job
277#[derive(Debug)]
278pub struct Job {
279    response: Arc<JobResponse>,
280    client: Client,
281    log: JobLog,
282    build_dir: PathBuf,
283    artifacts: ArtifactCache,
284}
285
286impl Job {
287    pub(crate) fn new(
288        client: Client,
289        response: Arc<JobResponse>,
290        build_dir: PathBuf,
291        log: JobLog,
292    ) -> Self {
293        Self {
294            client,
295            response,
296            log,
297            build_dir,
298            artifacts: ArtifactCache::new(),
299        }
300    }
301
302    /// Get the job id
303    pub fn id(&self) -> u64 {
304        self.response.id
305    }
306
307    /// Sent data to the gitlab log
308    ///
309    /// Normally [`outputln!`](crate::outputln) should be used. This function directly puts data in the queue for
310    /// the gitlab log and side-steps the tracing infrastructure
311    pub fn trace<D: AsRef<[u8]>>(&self, data: D) {
312        self.log.trace(data.as_ref());
313    }
314
315    /// Get the variable matching the given key
316    pub fn variable(&self, key: &str) -> Option<Variable<'_>> {
317        self.response.variables.get(key).map(|v| Variable { v })
318    }
319
320    /// Get an iterator over all the variables associated with this job.
321    pub fn variables(&self) -> impl Iterator<Item = Variable<'_>> {
322        self.response.variables.values().map(|v| Variable { v })
323    }
324
325    /// Get an iterator over the job dependencies
326    pub fn dependencies(&self) -> impl Iterator<Item = Dependency<'_>> {
327        self.response
328            .dependencies
329            .iter()
330            .map(move |dependency| Dependency {
331                job: self,
332                dependency,
333            })
334    }
335
336    /// Get a reference to the jobs build dir.
337    pub fn build_dir(&self) -> &Path {
338        &self.build_dir
339    }
340}