gitlab_runner/lib.rs
1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3pub mod artifact;
4mod client;
5mod logging;
6use crate::client::Client;
7mod run;
8use crate::run::Run;
9pub mod job;
10use client::ClientMetadata;
11use hmac::Hmac;
12use hmac::Mac;
13use job::{Job, JobLog};
14pub mod uploader;
15pub use logging::GitlabLayer;
16use rand::distr::Alphanumeric;
17use rand::distr::SampleString;
18use runlist::JobRunList;
19use tokio::fs::File;
20use tokio::io::AsyncReadExt;
21use tokio_util::sync::CancellationToken;
22use tracing::debug;
23use tracing::instrument::WithSubscriber;
24
25mod runlist;
26use crate::runlist::RunList;
27
28use futures::prelude::*;
29use futures::AsyncRead;
30use std::borrow::Cow;
31use std::fmt::Write;
32use std::path::PathBuf;
33use tokio::time::{sleep, Duration};
34use tracing::warn;
35use url::Url;
36
37#[doc(hidden)]
38pub use ::tracing;
39
/// Emit a line of output into the gitlab job log.
///
/// Takes a format string, optionally followed by further arguments. Everything is forwarded
/// unchanged to `tracing::trace!` with the `gitlab.output = true` field set on the event.
#[macro_export]
macro_rules! outputln {
    // A format expression, then an optional comma-separated tail that is passed through as-is.
    ($fmt:expr $(, $($rest:tt)*)?) => {
        $crate::tracing::trace!(gitlab.output = true, $fmt $(, $($rest)*)?)
    };
}
50
/// Result type for the various stages of a job
52pub type JobResult = Result<(), ()>;
53pub use client::Phase;
54
55/// A file that a [`JobHandler`] is willing to upload to the server
56///
57/// A [`JobHandler`] will be queried for the set of files it is able
58/// to upload to the server. These might be on disk, or they might be
59/// generated or downloaded from some other source on demand. The
60/// `get_path` method is required so that the globbing Gitlab expects
61/// can be performed without the handler needing to be involved.
62#[async_trait::async_trait]
63pub trait UploadableFile: Eq {
64 /// The type of the data stream returned by
65 /// [`get_data`](Self::get_data)
66 type Data<'a>: AsyncRead + Send + Unpin
67 where
68 Self: 'a;
69
70 /// Get the logical path of the file.
71 ///
72 /// This is the path on disk from the root of the checkout for
73 /// Gitlab's own runner. It should match the paths that users are
74 /// expected to specify when requesting artifacts in their jobs.
75 fn get_path(&self) -> Cow<'_, str>;
76
77 /// Get something that can provide the data for this file.
78 ///
79 /// This can be any implementor of [`AsyncRead`].
80 async fn get_data(&self) -> Result<Self::Data<'_>, ()>;
81}
82
83/// An [`UploadableFile`] type for JobHandlers that expose no files.
84///
85/// This will panic on attempting to actually pass it to the crate,
86/// but if you don't want or need to return artifacts to Gitlab it
87/// provides a sensible default.
88#[derive(Clone, Debug, Eq, PartialEq)]
89pub struct NoFiles {}
90
91#[async_trait::async_trait]
92impl UploadableFile for NoFiles {
93 type Data<'a> = &'a [u8];
94
95 fn get_path(&self) -> Cow<'_, str> {
96 unreachable!("tried to get path of NoFiles")
97 }
98 async fn get_data(&self) -> Result<Self::Data<'_>, ()> {
99 unreachable!("tried to read data from NoFiles");
100 }
101}
102
103/// Async trait for handling a single Job that handles its own cancellation
104///
105/// This trait is largely identical to [`JobHandler`], but its methods explicitly take a
106/// `CancellationToken`, which will be triggered if GitLab cancels the job, after which the method
107/// will be responsible for cancellation appropriately. In most cases, the entire execution should
108/// simply be cancelled, in which case [`JobHandler`]'s default behavior is desirable instead. (Even
109/// when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be performed.)
110///
111/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
112/// crate. However this also means the rustdoc documentation is interesting...
113#[async_trait::async_trait]
114pub trait CancellableJobHandler<U = NoFiles>: Send
115where
116 U: UploadableFile + Send + 'static,
117{
118 /// Do a single step of a job
119 ///
120 /// This gets called for each phase of the job (e.g. script and after_script). The passed
121 /// string array is the same array as was passed for a given step in the job definition. If the
122 /// job is cancelled while this is running, the given `cancel_token` will be triggered.
123 ///
124 /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
125 /// [Phase::Script] step
126 async fn step(
127 &mut self,
128 script: &[String],
129 phase: Phase,
130 cancel_token: &CancellationToken,
131 ) -> JobResult;
132
133 /// Get a list of the files available to upload
134 ///
135 /// See the description [`UploadableFile`] for more information.
136 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
137 Ok(Box::new(core::iter::empty()))
138 }
139
140 /// Cleanup after the job is finished
141 ///
142 /// This method always get called whether or not the job succeeded or was acancelled, allowing
143 /// the job handler to clean up as necessary.
144 async fn cleanup(&mut self) {}
145}
146
147/// Async trait for handling a single Job
148///
149/// In the event of being cancelled by GitLab, the `step` function will have its future dropped
150/// instantly. If manual handling of cancellation is required, use `CancellableJobHandler` instead.
151/// (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
152/// performed.)
153///
154/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
155/// crate. However this also means the rustdoc documentation is interesting...
156#[async_trait::async_trait]
157pub trait JobHandler<U = NoFiles>: Send
158where
159 U: UploadableFile + Send + 'static,
160{
161 /// Do a single step of a job
162 ///
163 /// This gets called for each phase of the job (e.g. script and after_script). The passed
164 /// string array is the same array as was passed for a given step in the job definition. If the
165 /// job is cancelled while this is running, its future will dropped, resulting in the function's
166 /// termination.
167 ///
168 /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
169 /// [Phase::Script] step
170 async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;
171
172 /// Get a list of the files available to upload
173 ///
174 /// See the description [`UploadableFile`] for more information.
175 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
176 Ok(Box::new(core::iter::empty()))
177 }
178
179 /// Cleanup after the job is finished
180 ///
181 /// This method always get called whether or not the job succeeded or was cancelled, allowing
182 /// the job handler to clean up as necessary.
183 async fn cleanup(&mut self) {}
184}
185
186#[async_trait::async_trait]
187impl<J, U> CancellableJobHandler<U> for J
188where
189 J: JobHandler<U>,
190 U: UploadableFile + Send + 'static,
191{
192 async fn step(
193 &mut self,
194 script: &[String],
195 phase: Phase,
196 cancel_token: &CancellationToken,
197 ) -> JobResult {
198 tokio::select! {
199 r = self.step(script, phase) => r,
200 _ = cancel_token.cancelled() => Ok(()),
201 }
202 }
203
204 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
205 self.get_uploadable_files().await
206 }
207
208 async fn cleanup(&mut self) {
209 self.cleanup().await;
210 }
211}
212
213/// Builder for [`Runner`]
214pub struct RunnerBuilder {
215 server: Url,
216 token: String,
217 build_dir: PathBuf,
218 system_id: Option<String>,
219 run_list: RunList<u64, JobLog>,
220 metadata: ClientMetadata,
221}
222
223impl RunnerBuilder {
224 // The official gitlab runner uses 2 char prefixes (s_ or r_) followed
225 // by 12 characters for a unique identifier
226 const DEFAULT_ID_LEN: usize = 12;
227 /// Create a new [`RunnerBuilder`] for the given server url, runner token,
228 /// build dir and job list (as created by GitlabLayer::new).
229 ///
230 /// The build_dir is used to store temporary files during a job run.
231 /// ```
232 /// # use tracing_subscriber::{prelude::*, Registry};
233 /// # use gitlab_runner::{RunnerBuilder, GitlabLayer};
234 /// # use url::Url;
235 /// #
236 /// #[tokio::main]
237 /// # async fn main() {
238 /// let dir = tempfile::tempdir().unwrap();
239 /// let (layer, jobs) = GitlabLayer::new();
240 /// let subscriber = Registry::default().with(layer).init();
241 /// let runner = RunnerBuilder::new(
242 /// Url::parse("https://gitlab.com/").unwrap(),
243 /// "RunnerToken",
244 /// dir.path(),
245 /// jobs
246 /// )
247 /// .build()
248 /// .await;
249 /// # }
250 /// ```
251 pub fn new<P: Into<PathBuf>, S: Into<String>>(
252 server: Url,
253 token: S,
254 build_dir: P,
255 jobs: JobRunList,
256 ) -> Self {
257 RunnerBuilder {
258 server,
259 token: token.into(),
260 build_dir: build_dir.into(),
261 system_id: None,
262 run_list: jobs.inner(),
263 metadata: ClientMetadata::default(),
264 }
265 }
266
267 /// Set the [system_id](https://docs.gitlab.com/runner/fleet_scaling/#generation-of-system_id-identifiers) for this runner
268 ///
269 /// The system_id will be truncated to 64 characters to match gitlabs limit,
270 /// but no further validation will be done. It's up to the caller to ensure the
271 /// system_id is valid for gitlab
272 pub fn system_id<S: Into<String>>(mut self, system_id: S) -> Self {
273 let mut system_id = system_id.into();
274 system_id.truncate(64);
275 self.system_id = Some(system_id);
276 self
277 }
278
279 /// Set the version reported by the gitlab runner
280 ///
281 /// The version will be truncated to 2048 characters
282 pub fn version<S: Into<String>>(mut self, version: S) -> Self {
283 let mut version = version.into();
284 version.truncate(2048);
285 self.metadata.version = Some(version);
286 self
287 }
288
289 /// Set the revision reported by the gitlab runner
290 ///
291 /// The revision will be truncated to 255 characters
292 pub fn revision<S: Into<String>>(mut self, revision: S) -> Self {
293 let mut revision = revision.into();
294 revision.truncate(255);
295 self.metadata.revision = Some(revision);
296 self
297 }
298
299 /// Set the platform reported by the gitlab runner
300 ///
301 /// The platform will be truncated to 255 characters
302 pub fn platform<S: Into<String>>(mut self, platform: S) -> Self {
303 let mut platform = platform.into();
304 platform.truncate(255);
305 self.metadata.platform = Some(platform);
306 self
307 }
308
309 /// Set the architecture reported by the gitlab runner
310 ///
311 /// The architecture will be truncated to 255 characters
312 pub fn architecture<S: Into<String>>(mut self, architecture: S) -> Self {
313 let mut architecture = architecture.into();
314 architecture.truncate(255);
315 self.metadata.architecture = Some(architecture);
316 self
317 }
318
319 async fn generate_system_id_from_machine_id() -> Option<String> {
320 let mut f = match File::open("/etc/machine-id").await {
321 Ok(f) => f,
322 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
323 debug!("/etc/machine-id not found, not generate systemd id based on it");
324 return None;
325 }
326 Err(e) => {
327 warn!("Failed to open machine-id: {e}");
328 return None;
329 }
330 };
331
332 let mut id = [0u8; 32];
333 if let Err(e) = f.read_exact(&mut id).await {
334 warn!("Failed to read from machine-id: {e}");
335 return None;
336 };
337
338 // Infallible as a hmac can take a key of any size
339 let mut mac = Hmac::<sha2::Sha256>::new_from_slice(&id).unwrap();
340 mac.update(b"gitlab-runner");
341
342 let mut system_id = String::from("s_");
343 // 2 hex chars for each byte
344 for b in &mac.finalize().into_bytes()[0..Self::DEFAULT_ID_LEN / 2] {
345 // Infallible: writing to a string
346 write!(&mut system_id, "{:02x}", b).unwrap();
347 }
348 Some(system_id)
349 }
350
351 async fn generate_system_id() -> String {
352 if let Some(system_id) = Self::generate_system_id_from_machine_id().await {
353 system_id
354 } else {
355 let mut system_id = String::from("r_");
356 Alphanumeric.append_string(&mut rand::rng(), &mut system_id, Self::DEFAULT_ID_LEN);
357 system_id
358 }
359 }
360
361 /// Build the runner.
362 pub async fn build(self) -> Runner {
363 let system_id = match self.system_id {
364 Some(system_id) => system_id,
365 None => Self::generate_system_id().await,
366 };
367 let client = Client::new(self.server, self.token, system_id, self.metadata);
368 Runner {
369 client,
370 build_dir: self.build_dir,
371 run_list: self.run_list,
372 }
373 }
374}
375
376/// Runner for gitlab
377///
378/// The runner is responsible for communicating with gitlab to request new job and spawn them.
379#[derive(Debug)]
380pub struct Runner {
381 client: Client,
382 build_dir: PathBuf,
383 run_list: RunList<u64, JobLog>,
384}
385
386impl Runner {
387 /// The number of jobs currently running
388 pub fn running(&self) -> usize {
389 self.run_list.size()
390 }
391
392 /// Try to request a single job from gitlab
393 ///
394 /// This does a single poll of gitlab for a new job. If a new job received a new asynchronous
395 /// task is spawned for processing the job. The passed `process` function is called to create a
396 /// the actual job handler. Returns whether or not a job was received or an error if polling
397 /// gitlab failed.
398 ///
399 /// Note that this function is not cancel safe. If the future gets cancelled gitlab might have
400 /// provided a job for which processing didn't start yet.
401 pub async fn request_job<F, J, U, Ret>(&mut self, process: F) -> Result<bool, client::Error>
402 where
403 F: FnOnce(Job) -> Ret + Sync + Send + 'static,
404 J: CancellableJobHandler<U> + Send + 'static,
405 U: UploadableFile + Send + 'static,
406 Ret: Future<Output = Result<J, ()>> + Send + 'static,
407 {
408 let response = self.client.request_job().await?;
409 if let Some(response) = response {
410 let mut build_dir = self.build_dir.clone();
411 build_dir.push(format!("{}", response.id));
412 let mut run = Run::new(self.client.clone(), response, &mut self.run_list);
413 tokio::spawn(
414 async move { run.run(process, build_dir).await }.with_current_subscriber(),
415 );
416 Ok(true)
417 } else {
418 Ok(false)
419 }
420 }
421
422 /// Wait untill there are less then max jobs running
423 pub async fn wait_for_space(&mut self, max: usize) {
424 self.run_list.wait_for_space(max).await;
425 }
426
427 /// Wait untill there are no more jobs running
428 pub async fn drain(&mut self) {
429 self.run_list.wait_for_space(1).await;
430 }
431
432 /// Run continously, processing at most `maximum` jobs concurrently
433 ///
434 /// This essentially calls [`Runner::request_job`] requesting jobs until at most `maximum` jobs are
435 /// running in parallel.
436 pub async fn run<F, U, J, Ret>(
437 &mut self,
438 process: F,
439 maximum: usize,
440 ) -> Result<(), client::Error>
441 where
442 F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
443 J: CancellableJobHandler<U> + Send + 'static,
444 U: UploadableFile + Send + 'static,
445 Ret: Future<Output = Result<J, ()>> + Send + 'static,
446 {
447 loop {
448 self.wait_for_space(maximum).await;
449 match self.request_job(process.clone()).await {
450 /* continue to pick up a new job straight away */
451 Ok(true) => continue,
452 Ok(false) => (),
453 Err(e) => warn!("Couldn't get a job from gitlab: {:?}", e),
454 }
455 sleep(Duration::from_secs(5)).await;
456 }
457 }
458}