gitlab_runner/
lib.rs

1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3pub mod artifact;
4mod client;
5mod logging;
6use crate::client::Client;
7mod run;
8use crate::run::Run;
9pub mod job;
10use client::ClientMetadata;
11use hmac::Hmac;
12use hmac::Mac;
13use job::{Job, JobLog};
14pub mod uploader;
15pub use logging::GitlabLayer;
16use rand::distr::Alphanumeric;
17use rand::distr::SampleString;
18use runlist::JobRunList;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio_util::sync::CancellationToken;
22use tracing::debug;
23use tracing::instrument::WithSubscriber;
24
25mod runlist;
26use crate::runlist::RunList;
27
28use futures::prelude::*;
29use futures::AsyncRead;
30use std::borrow::Cow;
31use std::fmt::Write;
32use std::path::PathBuf;
33use tokio::time::{sleep, Duration};
34use tracing::warn;
35use url::Url;
36
37#[doc(hidden)]
38pub use ::tracing;
39
/// Emit one line of output to the gitlab log.
///
/// Takes a format string, optionally followed by format arguments. The line is
/// forwarded to `tracing` as a `trace!` event carrying the `gitlab.output = true`
/// field.
#[macro_export]
macro_rules! outputln {
    // A single arm covers both the bare format string and the form with arguments;
    // the optional group re-emits the separating comma together with the arguments.
    ($f:expr $(, $($arg:tt)*)?) => {
        $crate::tracing::trace!(gitlab.output = true, $f $(, $($arg)*)?)
    };
}
50
/// Result type for the various stages of a job
///
/// Both the success and the failure case carry no payload.
pub type JobResult = Result<(), ()>;
53pub use client::Phase;
54
/// A file that a [`JobHandler`] is willing to upload to the server
///
/// A [`JobHandler`] will be queried for the set of files it is able
/// to upload to the server. These might be on disk, or they might be
/// generated or downloaded from some other source on demand. The
/// `get_path` method is required so that the globbing Gitlab expects
/// can be performed without the handler needing to be involved.
#[async_trait::async_trait]
pub trait UploadableFile: Eq {
    /// The type of the data stream returned by
    /// [`get_data`](Self::get_data)
    ///
    /// The stream may borrow from the file it was obtained from, hence the
    /// lifetime parameter.
    type Data<'a>: AsyncRead + Send + Unpin
    where
        Self: 'a;

    /// Get the logical path of the file.
    ///
    /// This is the path on disk from the root of the checkout for
    /// Gitlab's own runner. It should match the paths that users are
    /// expected to specify when requesting artifacts in their jobs.
    fn get_path(&self) -> Cow<'_, str>;

    /// Get something that can provide the data for this file.
    ///
    /// This can be any implementor of [`AsyncRead`]. An `Err(())` return
    /// signals that the data could not be provided.
    async fn get_data(&self) -> Result<Self::Data<'_>, ()>;
}
82
/// An [`UploadableFile`] type for JobHandlers that expose no files.
///
/// Its [`UploadableFile`] methods panic when called, so a value of this type
/// must never actually be handed to the crate for uploading. If you don't want
/// or need to return artifacts to Gitlab it provides a sensible default for the
/// file type parameter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NoFiles {}
90
// Placeholder implementation: both methods panic, as no `NoFiles` value is ever
// expected to reach the upload code.
#[async_trait::async_trait]
impl UploadableFile for NoFiles {
    type Data<'a> = &'a [u8];

    // Always panics.
    fn get_path(&self) -> Cow<'_, str> {
        unreachable!("tried to get path of NoFiles")
    }
    // Always panics.
    async fn get_data(&self) -> Result<Self::Data<'_>, ()> {
        unreachable!("tried to read data from NoFiles");
    }
}
102
/// Async trait for handling a single Job that handles its own cancellation
///
/// This trait is largely identical to [`JobHandler`], but its methods explicitly take a
/// `CancellationToken`, which will be triggered if GitLab cancels the job, after which the method
/// is responsible for handling the cancellation appropriately. In most cases, the entire execution
/// should simply be cancelled, in which case [`JobHandler`]'s default behavior is desirable
/// instead. (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
/// performed.)
///
/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
/// crate. However this also means the rustdoc documentation is interesting...
#[async_trait::async_trait]
pub trait CancellableJobHandler<U = NoFiles>: Send
where
    U: UploadableFile + Send + 'static,
{
    /// Do a single step of a job
    ///
    /// This gets called for each phase of the job (e.g. script and after_script). The passed
    /// string array is the same array as was passed for a given step in the job definition. If the
    /// job is cancelled while this is running, the given `cancel_token` will be triggered.
    ///
    /// Note that gitlab concatenates the `before_script` and `script` arrays into a single
    /// [Phase::Script] step
    async fn step(
        &mut self,
        script: &[String],
        phase: Phase,
        cancel_token: &CancellationToken,
    ) -> JobResult;

    /// Get a list of the files available to upload
    ///
    /// See the description of [`UploadableFile`] for more information. The default
    /// implementation reports no files.
    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
        Ok(Box::new(core::iter::empty()))
    }

    /// Cleanup after the job is finished
    ///
    /// This method always gets called, whether the job succeeded, failed or was cancelled,
    /// allowing the job handler to clean up as necessary. The default implementation does
    /// nothing.
    async fn cleanup(&mut self) {}
}
146
/// Async trait for handling a single Job
///
/// In the event of being cancelled by GitLab, the `step` function will have its future dropped
/// instantly. If manual handling of cancellation is required, use `CancellableJobHandler` instead.
/// (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
/// performed.)
///
/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
/// crate. However this also means the rustdoc documentation is interesting...
#[async_trait::async_trait]
pub trait JobHandler<U = NoFiles>: Send
where
    U: UploadableFile + Send + 'static,
{
    /// Do a single step of a job
    ///
    /// This gets called for each phase of the job (e.g. script and after_script). The passed
    /// string array is the same array as was passed for a given step in the job definition. If the
    /// job is cancelled while this is running, its future will be dropped, resulting in the
    /// function's termination.
    ///
    /// Note that gitlab concatenates the `before_script` and `script` arrays into a single
    /// [Phase::Script] step
    async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;

    /// Get a list of the files available to upload
    ///
    /// See the description of [`UploadableFile`] for more information. The default
    /// implementation reports no files.
    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
        Ok(Box::new(core::iter::empty()))
    }

    /// Cleanup after the job is finished
    ///
    /// This method always gets called, whether the job succeeded, failed or was cancelled,
    /// allowing the job handler to clean up as necessary. The default implementation does
    /// nothing.
    async fn cleanup(&mut self) {}
}
185
186#[async_trait::async_trait]
187impl<J, U> CancellableJobHandler<U> for J
188where
189    J: JobHandler<U>,
190    U: UploadableFile + Send + 'static,
191{
192    async fn step(
193        &mut self,
194        script: &[String],
195        phase: Phase,
196        cancel_token: &CancellationToken,
197    ) -> JobResult {
198        tokio::select! {
199            r = self.step(script, phase) => r,
200            _ = cancel_token.cancelled() => Ok(()),
201        }
202    }
203
204    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
205        self.get_uploadable_files().await
206    }
207
208    async fn cleanup(&mut self) {
209        self.cleanup().await;
210    }
211}
212
/// Builder for [`Runner`]
///
/// Created with [`RunnerBuilder::new`]; the optional settings are applied with the
/// chained setter methods before calling [`RunnerBuilder::build`].
pub struct RunnerBuilder {
    // Url of the gitlab server, handed to the client on build
    server: Url,
    // Runner token, handed to the client on build
    token: String,
    // Directory used to store temporary files during a job run
    build_dir: PathBuf,
    // Explicit system id; when `None` one is generated on build
    system_id: Option<String>,
    // Shared list of running jobs, taken from the `JobRunList` passed to `new`
    run_list: RunList<u64, JobLog>,
    // Version, revision, platform and architecture reported by the runner
    metadata: ClientMetadata,
}
222
223impl RunnerBuilder {
224    // The official gitlab runner uses 2 char prefixes (s_ or r_) followed
225    // by 12 characters for a unique identifier
226    const DEFAULT_ID_LEN: usize = 12;
227    /// Create a new [`RunnerBuilder`] for the given server url, runner token,
228    /// build dir and job list (as created by GitlabLayer::new).
229    ///
230    /// The build_dir is used to store temporary files during a job run.
231    /// ```
232    /// # use tracing_subscriber::{prelude::*, Registry};
233    /// # use gitlab_runner::{RunnerBuilder, GitlabLayer};
234    /// # use url::Url;
235    /// #
236    /// #[tokio::main]
237    /// # async fn main() {
238    /// let dir = tempfile::tempdir().unwrap();
239    /// let (layer, jobs) = GitlabLayer::new();
240    /// let subscriber = Registry::default().with(layer).init();
241    /// let runner = RunnerBuilder::new(
242    ///         Url::parse("https://gitlab.com/").unwrap(),
243    ///         "RunnerToken",
244    ///         dir.path(),
245    ///         jobs
246    ///     )
247    ///     .build()
248    ///     .await;
249    /// # }
250    /// ```
251    pub fn new<P: Into<PathBuf>, S: Into<String>>(
252        server: Url,
253        token: S,
254        build_dir: P,
255        jobs: JobRunList,
256    ) -> Self {
257        RunnerBuilder {
258            server,
259            token: token.into(),
260            build_dir: build_dir.into(),
261            system_id: None,
262            run_list: jobs.inner(),
263            metadata: ClientMetadata::default(),
264        }
265    }
266
267    /// Set the [system_id](https://docs.gitlab.com/runner/fleet_scaling/#generation-of-system_id-identifiers) for this runner
268    ///
269    /// The system_id will be truncated to 64 characters to match gitlabs limit,
270    /// but no further validation will be done. It's up to the caller to ensure the
271    /// system_id is valid for gitlab
272    pub fn system_id<S: Into<String>>(mut self, system_id: S) -> Self {
273        let mut system_id = system_id.into();
274        system_id.truncate(64);
275        self.system_id = Some(system_id);
276        self
277    }
278
279    /// Set the version reported by the gitlab runner
280    ///
281    /// The version will be truncated to 2048 characters
282    pub fn version<S: Into<String>>(mut self, version: S) -> Self {
283        let mut version = version.into();
284        version.truncate(2048);
285        self.metadata.version = Some(version);
286        self
287    }
288
289    /// Set the revision reported by the gitlab runner
290    ///
291    /// The revision will be truncated to 255 characters
292    pub fn revision<S: Into<String>>(mut self, revision: S) -> Self {
293        let mut revision = revision.into();
294        revision.truncate(255);
295        self.metadata.revision = Some(revision);
296        self
297    }
298
299    /// Set the platform reported by the gitlab runner
300    ///
301    /// The platform will be truncated to 255 characters
302    pub fn platform<S: Into<String>>(mut self, platform: S) -> Self {
303        let mut platform = platform.into();
304        platform.truncate(255);
305        self.metadata.platform = Some(platform);
306        self
307    }
308
309    /// Set the architecture reported by the gitlab runner
310    ///
311    /// The architecture will be truncated to 255 characters
312    pub fn architecture<S: Into<String>>(mut self, architecture: S) -> Self {
313        let mut architecture = architecture.into();
314        architecture.truncate(255);
315        self.metadata.architecture = Some(architecture);
316        self
317    }
318
319    async fn generate_system_id_from_machine_id() -> Option<String> {
320        let mut f = match File::open("/etc/machine-id").await {
321            Ok(f) => f,
322            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
323                debug!("/etc/machine-id not found, not generate systemd id based on it");
324                return None;
325            }
326            Err(e) => {
327                warn!("Failed to open machine-id: {e}");
328                return None;
329            }
330        };
331
332        let mut id = [0u8; 32];
333        if let Err(e) = f.read_exact(&mut id).await {
334            warn!("Failed to read from machine-id: {e}");
335            return None;
336        };
337
338        // Infallible as a hmac can take a key of any size
339        let mut mac = Hmac::<sha2::Sha256>::new_from_slice(&id).unwrap();
340        mac.update(b"gitlab-runner");
341
342        let mut system_id = String::from("s_");
343        // 2 hex chars for each byte
344        for b in &mac.finalize().into_bytes()[0..Self::DEFAULT_ID_LEN / 2] {
345            // Infallible: writing to a string
346            write!(&mut system_id, "{:02x}", b).unwrap();
347        }
348        Some(system_id)
349    }
350
351    async fn generate_system_id() -> String {
352        if let Some(system_id) = Self::generate_system_id_from_machine_id().await {
353            system_id
354        } else {
355            let mut system_id = String::from("r_");
356            Alphanumeric.append_string(&mut rand::rng(), &mut system_id, Self::DEFAULT_ID_LEN);
357            system_id
358        }
359    }
360
361    /// Build the runner.
362    pub async fn build(self) -> Runner {
363        let system_id = match self.system_id {
364            Some(system_id) => system_id,
365            None => Self::generate_system_id().await,
366        };
367        let client = Client::new(self.server, self.token, system_id, self.metadata);
368        Runner {
369            client,
370            build_dir: self.build_dir,
371            run_list: self.run_list,
372        }
373    }
374}
375
/// Runner for gitlab
///
/// The runner is responsible for communicating with gitlab to request new jobs and spawn them.
#[derive(Debug)]
pub struct Runner {
    // Client used to talk to the gitlab server
    client: Client,
    // Base directory; each job gets a sub-directory named after its id
    build_dir: PathBuf,
    // List of the currently running jobs
    run_list: RunList<u64, JobLog>,
}
385
386impl Runner {
387    /// The number of jobs currently running
388    pub fn running(&self) -> usize {
389        self.run_list.size()
390    }
391
392    /// Try to request a single job from gitlab
393    ///
394    /// This does a single poll of gitlab for a new job. If a new job received a new asynchronous
395    /// task is spawned for processing the job. The passed `process` function is called to create a
396    /// the actual job handler. Returns whether or not a job was received or an error if polling
397    /// gitlab failed.
398    ///
399    /// Note that this function is not cancel safe. If the future gets cancelled gitlab might have
400    /// provided a job for which processing didn't start yet.
401    pub async fn request_job<F, J, U, Ret>(&mut self, process: F) -> Result<bool, client::Error>
402    where
403        F: FnOnce(Job) -> Ret + Sync + Send + 'static,
404        J: CancellableJobHandler<U> + Send + 'static,
405        U: UploadableFile + Send + 'static,
406        Ret: Future<Output = Result<J, ()>> + Send + 'static,
407    {
408        let response = self.client.request_job().await?;
409        if let Some(response) = response {
410            let mut build_dir = self.build_dir.clone();
411            build_dir.push(format!("{}", response.id));
412            let mut run = Run::new(self.client.clone(), response, &mut self.run_list);
413            tokio::spawn(
414                async move { run.run(process, build_dir).await }.with_current_subscriber(),
415            );
416            Ok(true)
417        } else {
418            Ok(false)
419        }
420    }
421
422    /// Wait untill there are less then max jobs running
423    pub async fn wait_for_space(&mut self, max: usize) {
424        self.run_list.wait_for_space(max).await;
425    }
426
427    /// Wait untill there are no more jobs running
428    pub async fn drain(&mut self) {
429        self.run_list.wait_for_space(1).await;
430    }
431
432    /// Run continously, processing at most `maximum` jobs concurrently
433    ///
434    /// This essentially calls [`Runner::request_job`] requesting jobs until at most `maximum` jobs are
435    /// running in parallel.
436    pub async fn run<F, U, J, Ret>(
437        &mut self,
438        process: F,
439        maximum: usize,
440    ) -> Result<(), client::Error>
441    where
442        F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
443        J: CancellableJobHandler<U> + Send + 'static,
444        U: UploadableFile + Send + 'static,
445        Ret: Future<Output = Result<J, ()>> + Send + 'static,
446    {
447        loop {
448            self.wait_for_space(maximum).await;
449            match self.request_job(process.clone()).await {
450                /* continue to pick up a new job straight away */
451                Ok(true) => continue,
452                Ok(false) => (),
453                Err(e) => warn!("Couldn't get a job from gitlab: {:?}", e),
454            }
455            sleep(Duration::from_secs(5)).await;
456        }
457    }
458}