Skip to main content

gitlab_runner/
lib.rs

1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3pub mod artifact;
4mod client;
5mod logging;
6use crate::client::Client;
7mod run;
8use crate::run::Run;
9pub mod job;
10use client::ClientMetadata;
11use hmac::Hmac;
12use hmac::KeyInit;
13use hmac::Mac;
14use job::{Job, JobLog};
15pub mod uploader;
16pub use logging::GitlabLayer;
17use rand::distr::Alphanumeric;
18use rand::distr::SampleString;
19use runlist::JobRunList;
20use tokio::fs::File;
21use tokio::io::AsyncReadExt;
22use tokio_util::sync::CancellationToken;
23use tracing::debug;
24use tracing::instrument::WithSubscriber;
25
26mod runlist;
27use crate::runlist::RunList;
28
29use futures::AsyncRead;
30use futures::prelude::*;
31use std::borrow::Cow;
32use std::fmt::Write;
33use std::path::PathBuf;
34use tokio::time::{Duration, sleep};
35use tracing::warn;
36use url::Url;
37
38#[doc(hidden)]
39pub use ::tracing;
40
/// Output a line to the gitlab log
///
/// Takes a format string, optionally followed by format arguments, and forwards them to
/// [`tracing::trace!`] with the `gitlab.output = true` field set on the event.
#[macro_export]
macro_rules! outputln {
    ($fmt:expr $(, $($rest:tt)*)?) => {
        $crate::tracing::trace!(gitlab.output = true, $fmt $(, $($rest)*)?)
    };
}
51
/// Result type for the various stages of a job
///
/// Neither success nor failure carries a payload: `Ok(())` means the stage succeeded and
/// `Err(())` means it failed.
pub type JobResult = std::result::Result<(), ()>;
54pub use client::Phase;
55
56/// A file that a [`JobHandler`] is willing to upload to the server
57///
58/// A [`JobHandler`] will be queried for the set of files it is able
59/// to upload to the server. These might be on disk, or they might be
60/// generated or downloaded from some other source on demand. The
61/// `get_path` method is required so that the globbing Gitlab expects
62/// can be performed without the handler needing to be involved.
63#[async_trait::async_trait]
64pub trait UploadableFile {
65    /// The type of the data stream returned by
66    /// [`get_data`](Self::get_data)
67    type Data<'a>: AsyncRead + Send + Unpin
68    where
69        Self: 'a;
70
71    /// Get the logical path of the file.
72    ///
73    /// This is the path on disk from the root of the checkout for
74    /// Gitlab's own runner. It should match the paths that users are
75    /// expected to specify when requesting artifacts in their jobs.
76    fn get_path(&self) -> Cow<'_, str>;
77
78    /// Get something that can provide the data for this file.
79    ///
80    /// This can be any implementor of [`AsyncRead`].
81    async fn get_data(&self) -> Result<Self::Data<'_>, ()>;
82}
83
84/// An [`UploadableFile`] type for JobHandlers that expose no files.
85///
86/// This will panic on attempting to actually pass it to the crate,
87/// but if you don't want or need to return artifacts to Gitlab it
88/// provides a sensible default.
89#[derive(Clone, Debug, Eq, PartialEq)]
90pub struct NoFiles {}
91
92#[async_trait::async_trait]
93impl UploadableFile for NoFiles {
94    type Data<'a> = &'a [u8];
95
96    fn get_path(&self) -> Cow<'_, str> {
97        unreachable!("tried to get path of NoFiles")
98    }
99    async fn get_data(&self) -> Result<Self::Data<'_>, ()> {
100        unreachable!("tried to read data from NoFiles");
101    }
102}
103
104/// Async trait for handling a single Job that handles its own cancellation
105///
106/// This trait is largely identical to [`JobHandler`], but its methods explicitly take a
107/// `CancellationToken`, which will be triggered if GitLab cancels the job, after which the method
108/// will be responsible for cancellation appropriately. In most cases, the entire execution should
109/// simply be cancelled, in which case [`JobHandler`]'s default behavior is desirable instead. (Even
110/// when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be performed.)
111///
112/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
113/// crate. However this also means the rustdoc documentation is interesting...
114#[async_trait::async_trait]
115pub trait CancellableJobHandler<U = NoFiles>: Send
116where
117    U: UploadableFile + Send + 'static,
118{
119    /// Do a single step of a job
120    ///
121    /// This gets called for each phase of the job (e.g. script and after_script). The passed
122    /// string array is the same array as was passed for a given step in the job definition. If the
123    /// job is cancelled while this is running, the given `cancel_token` will be triggered.
124    ///
125    /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
126    /// [Phase::Script] step
127    async fn step(
128        &mut self,
129        script: &[String],
130        phase: Phase,
131        cancel_token: &CancellationToken,
132    ) -> JobResult;
133
134    /// Get a list of the files available to upload
135    ///
136    /// See the description [`UploadableFile`] for more information.
137    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
138        Ok(Box::new(core::iter::empty()))
139    }
140
141    /// Cleanup after the job is finished
142    ///
143    /// This method always get called whether or not the job succeeded or was acancelled, allowing
144    /// the job handler to clean up as necessary.
145    async fn cleanup(&mut self) {}
146}
147
148/// Async trait for handling a single Job
149///
150/// In the event of being cancelled by GitLab, the `step` function will have its future dropped
151/// instantly. If manual handling of cancellation is required, use `CancellableJobHandler` instead.
152/// (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
153/// performed.)
154///
155/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
156/// crate. However this also means the rustdoc documentation is interesting...
157#[async_trait::async_trait]
158pub trait JobHandler<U = NoFiles>: Send
159where
160    U: UploadableFile + Send + 'static,
161{
162    /// Do a single step of a job
163    ///
164    /// This gets called for each phase of the job (e.g. script and after_script). The passed
165    /// string array is the same array as was passed for a given step in the job definition. If the
166    /// job is cancelled while this is running, its future will dropped, resulting in the function's
167    /// termination.
168    ///
169    /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
170    /// [Phase::Script] step
171    async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;
172
173    /// Get a list of the files available to upload
174    ///
175    /// See the description [`UploadableFile`] for more information.
176    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
177        Ok(Box::new(core::iter::empty()))
178    }
179
180    /// Cleanup after the job is finished
181    ///
182    /// This method always get called whether or not the job succeeded or was cancelled, allowing
183    /// the job handler to clean up as necessary.
184    async fn cleanup(&mut self) {}
185}
186
187#[async_trait::async_trait]
188impl<J, U> CancellableJobHandler<U> for J
189where
190    J: JobHandler<U>,
191    U: UploadableFile + Send + 'static,
192{
193    async fn step(
194        &mut self,
195        script: &[String],
196        phase: Phase,
197        cancel_token: &CancellationToken,
198    ) -> JobResult {
199        tokio::select! {
200            r = self.step(script, phase) => r,
201            _ = cancel_token.cancelled() => Ok(()),
202        }
203    }
204
205    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
206        self.get_uploadable_files().await
207    }
208
209    async fn cleanup(&mut self) {
210        self.cleanup().await;
211    }
212}
213
214/// Builder for [`Runner`]
215pub struct RunnerBuilder {
216    server: Url,
217    token: String,
218    build_dir: PathBuf,
219    system_id: Option<String>,
220    run_list: RunList<u64, JobLog>,
221    metadata: ClientMetadata,
222}
223
224impl RunnerBuilder {
225    // The official gitlab runner uses 2 char prefixes (s_ or r_) followed
226    // by 12 characters for a unique identifier
227    const DEFAULT_ID_LEN: usize = 12;
228    /// Create a new [`RunnerBuilder`] for the given server url, runner token,
229    /// build dir and job list (as created by GitlabLayer::new).
230    ///
231    /// The build_dir is used to store temporary files during a job run.
232    /// ```
233    /// # use tracing_subscriber::{prelude::*, Registry};
234    /// # use gitlab_runner::{RunnerBuilder, GitlabLayer};
235    /// # use url::Url;
236    /// #
237    /// #[tokio::main]
238    /// # async fn main() {
239    /// let dir = tempfile::tempdir().unwrap();
240    /// let (layer, jobs) = GitlabLayer::new();
241    /// let subscriber = Registry::default().with(layer).init();
242    /// let runner = RunnerBuilder::new(
243    ///         Url::parse("https://gitlab.com/").unwrap(),
244    ///         "RunnerToken",
245    ///         dir.path(),
246    ///         jobs
247    ///     )
248    ///     .build()
249    ///     .await;
250    /// # }
251    /// ```
252    pub fn new<P: Into<PathBuf>, S: Into<String>>(
253        server: Url,
254        token: S,
255        build_dir: P,
256        jobs: JobRunList,
257    ) -> Self {
258        RunnerBuilder {
259            server,
260            token: token.into(),
261            build_dir: build_dir.into(),
262            system_id: None,
263            run_list: jobs.inner(),
264            metadata: ClientMetadata::default(),
265        }
266    }
267
268    /// Set the [system_id](https://docs.gitlab.com/runner/fleet_scaling/#generation-of-system_id-identifiers) for this runner
269    ///
270    /// The system_id will be truncated to 64 characters to match gitlabs limit,
271    /// but no further validation will be done. It's up to the caller to ensure the
272    /// system_id is valid for gitlab
273    pub fn system_id<S: Into<String>>(mut self, system_id: S) -> Self {
274        let mut system_id = system_id.into();
275        system_id.truncate(64);
276        self.system_id = Some(system_id);
277        self
278    }
279
280    /// Set the version reported by the gitlab runner
281    ///
282    /// The version will be truncated to 2048 characters
283    pub fn version<S: Into<String>>(mut self, version: S) -> Self {
284        let mut version = version.into();
285        version.truncate(2048);
286        self.metadata.version = Some(version);
287        self
288    }
289
290    /// Set the revision reported by the gitlab runner
291    ///
292    /// The revision will be truncated to 255 characters
293    pub fn revision<S: Into<String>>(mut self, revision: S) -> Self {
294        let mut revision = revision.into();
295        revision.truncate(255);
296        self.metadata.revision = Some(revision);
297        self
298    }
299
300    /// Set the platform reported by the gitlab runner
301    ///
302    /// The platform will be truncated to 255 characters
303    pub fn platform<S: Into<String>>(mut self, platform: S) -> Self {
304        let mut platform = platform.into();
305        platform.truncate(255);
306        self.metadata.platform = Some(platform);
307        self
308    }
309
310    /// Set the architecture reported by the gitlab runner
311    ///
312    /// The architecture will be truncated to 255 characters
313    pub fn architecture<S: Into<String>>(mut self, architecture: S) -> Self {
314        let mut architecture = architecture.into();
315        architecture.truncate(255);
316        self.metadata.architecture = Some(architecture);
317        self
318    }
319
320    async fn generate_system_id_from_machine_id() -> Option<String> {
321        let mut f = match File::open("/etc/machine-id").await {
322            Ok(f) => f,
323            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
324                debug!("/etc/machine-id not found, not generate systemd id based on it");
325                return None;
326            }
327            Err(e) => {
328                warn!("Failed to open machine-id: {e}");
329                return None;
330            }
331        };
332
333        let mut id = [0u8; 32];
334        if let Err(e) = f.read_exact(&mut id).await {
335            warn!("Failed to read from machine-id: {e}");
336            return None;
337        };
338
339        // Infallible as a hmac can take a key of any size
340        let mut mac = Hmac::<sha2::Sha256>::new_from_slice(&id).unwrap();
341        mac.update(b"gitlab-runner");
342
343        let mut system_id = String::from("s_");
344        // 2 hex chars for each byte
345        for b in &mac.finalize().into_bytes()[0..Self::DEFAULT_ID_LEN / 2] {
346            // Infallible: writing to a string
347            write!(&mut system_id, "{b:02x}").unwrap();
348        }
349        Some(system_id)
350    }
351
352    async fn generate_system_id() -> String {
353        if let Some(system_id) = Self::generate_system_id_from_machine_id().await {
354            system_id
355        } else {
356            let mut system_id = String::from("r_");
357            Alphanumeric.append_string(&mut rand::rng(), &mut system_id, Self::DEFAULT_ID_LEN);
358            system_id
359        }
360    }
361
362    /// Build the runner.
363    pub async fn build(self) -> Runner {
364        let system_id = match self.system_id {
365            Some(system_id) => system_id,
366            None => Self::generate_system_id().await,
367        };
368        let client = Client::new(self.server, self.token, system_id, self.metadata);
369        Runner {
370            client,
371            build_dir: self.build_dir,
372            run_list: self.run_list,
373        }
374    }
375}
376
377/// Runner for gitlab
378///
379/// The runner is responsible for communicating with gitlab to request new job and spawn them.
380#[derive(Debug)]
381pub struct Runner {
382    client: Client,
383    build_dir: PathBuf,
384    run_list: RunList<u64, JobLog>,
385}
386
387impl Runner {
388    /// The number of jobs currently running
389    pub fn running(&self) -> usize {
390        self.run_list.size()
391    }
392
393    /// Try to request a single job from gitlab
394    ///
395    /// This does a single poll of gitlab for a new job. If a new job received a new asynchronous
396    /// task is spawned for processing the job. The passed `process` function is called to create a
397    /// the actual job handler. Returns whether or not a job was received or an error if polling
398    /// gitlab failed.
399    ///
400    /// Note that this function is not cancel safe. If the future gets cancelled gitlab might have
401    /// provided a job for which processing didn't start yet.
402    pub async fn request_job<F, J, U, Ret>(&mut self, process: F) -> Result<bool, client::Error>
403    where
404        F: FnOnce(Job) -> Ret + Sync + Send + 'static,
405        J: CancellableJobHandler<U> + Send + 'static,
406        U: UploadableFile + Send + 'static,
407        Ret: Future<Output = Result<J, ()>> + Send + 'static,
408    {
409        let response = self.client.request_job().await?;
410        if let Some(response) = response {
411            let mut build_dir = self.build_dir.clone();
412            build_dir.push(format!("{}", response.id));
413            let mut run = Run::new(self.client.clone(), response, &mut self.run_list);
414            tokio::spawn(
415                async move { run.run(process, build_dir).await }.with_current_subscriber(),
416            );
417            Ok(true)
418        } else {
419            Ok(false)
420        }
421    }
422
423    /// Wait untill there are less then max jobs running
424    pub async fn wait_for_space(&mut self, max: usize) {
425        self.run_list.wait_for_space(max).await;
426    }
427
428    /// Wait untill there are no more jobs running
429    pub async fn drain(&mut self) {
430        self.run_list.wait_for_space(1).await;
431    }
432
433    /// Run continously, processing at most `maximum` jobs concurrently
434    ///
435    /// This essentially calls [`Runner::request_job`] requesting jobs until at most `maximum` jobs are
436    /// running in parallel.
437    pub async fn run<F, U, J, Ret>(
438        &mut self,
439        process: F,
440        maximum: usize,
441    ) -> Result<(), client::Error>
442    where
443        F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
444        J: CancellableJobHandler<U> + Send + 'static,
445        U: UploadableFile + Send + 'static,
446        Ret: Future<Output = Result<J, ()>> + Send + 'static,
447    {
448        loop {
449            self.wait_for_space(maximum).await;
450            match self.request_job(process.clone()).await {
451                /* continue to pick up a new job straight away */
452                Ok(true) => continue,
453                Ok(false) => (),
454                Err(e) => warn!("Couldn't get a job from gitlab: {:?}", e),
455            }
456            sleep(Duration::from_secs(5)).await;
457        }
458    }
459}