gitlab_runner/lib.rs
1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
3pub mod artifact;
4mod client;
5mod logging;
6use crate::client::Client;
7mod run;
8use crate::run::Run;
9pub mod job;
10use client::ClientMetadata;
11use hmac::Hmac;
12use hmac::KeyInit;
13use hmac::Mac;
14use job::{Job, JobLog};
15pub mod uploader;
16pub use logging::GitlabLayer;
17use rand::distr::Alphanumeric;
18use rand::distr::SampleString;
19use runlist::JobRunList;
20use tokio::fs::File;
21use tokio::io::AsyncReadExt;
22use tokio_util::sync::CancellationToken;
23use tracing::debug;
24use tracing::instrument::WithSubscriber;
25
26mod runlist;
27use crate::runlist::RunList;
28
29use futures::AsyncRead;
30use futures::prelude::*;
31use std::borrow::Cow;
32use std::fmt::Write;
33use std::path::PathBuf;
34use tokio::time::{Duration, sleep};
35use tracing::warn;
36use url::Url;
37
38#[doc(hidden)]
39pub use ::tracing;
40
/// Output a line to the gitlab log
///
/// Takes a format string, optionally followed by format arguments, and forwards them to
/// `tracing::trace!` with the additional field `gitlab.output = true` set on the event.
#[macro_export]
macro_rules! outputln {
    ($f:expr) => {
        $crate::tracing::trace!(gitlab.output = true, $f)
    };
    ($f:expr, $($arg:tt)*) => {
        $crate::tracing::trace!(gitlab.output = true, $f, $($arg)*)
    };
}
51
/// Result type for the various stages of a job
53pub type JobResult = Result<(), ()>;
54pub use client::Phase;
55
56/// A file that a [`JobHandler`] is willing to upload to the server
57///
58/// A [`JobHandler`] will be queried for the set of files it is able
59/// to upload to the server. These might be on disk, or they might be
60/// generated or downloaded from some other source on demand. The
61/// `get_path` method is required so that the globbing Gitlab expects
62/// can be performed without the handler needing to be involved.
63#[async_trait::async_trait]
64pub trait UploadableFile {
65 /// The type of the data stream returned by
66 /// [`get_data`](Self::get_data)
67 type Data<'a>: AsyncRead + Send + Unpin
68 where
69 Self: 'a;
70
71 /// Get the logical path of the file.
72 ///
73 /// This is the path on disk from the root of the checkout for
74 /// Gitlab's own runner. It should match the paths that users are
75 /// expected to specify when requesting artifacts in their jobs.
76 fn get_path(&self) -> Cow<'_, str>;
77
78 /// Get something that can provide the data for this file.
79 ///
80 /// This can be any implementor of [`AsyncRead`].
81 async fn get_data(&self) -> Result<Self::Data<'_>, ()>;
82}
83
84/// An [`UploadableFile`] type for JobHandlers that expose no files.
85///
86/// This will panic on attempting to actually pass it to the crate,
87/// but if you don't want or need to return artifacts to Gitlab it
88/// provides a sensible default.
89#[derive(Clone, Debug, Eq, PartialEq)]
90pub struct NoFiles {}
91
92#[async_trait::async_trait]
93impl UploadableFile for NoFiles {
94 type Data<'a> = &'a [u8];
95
96 fn get_path(&self) -> Cow<'_, str> {
97 unreachable!("tried to get path of NoFiles")
98 }
99 async fn get_data(&self) -> Result<Self::Data<'_>, ()> {
100 unreachable!("tried to read data from NoFiles");
101 }
102}
103
104/// Async trait for handling a single Job that handles its own cancellation
105///
106/// This trait is largely identical to [`JobHandler`], but its methods explicitly take a
107/// `CancellationToken`, which will be triggered if GitLab cancels the job, after which the method
108/// will be responsible for cancellation appropriately. In most cases, the entire execution should
109/// simply be cancelled, in which case [`JobHandler`]'s default behavior is desirable instead. (Even
110/// when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be performed.)
111///
112/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
113/// crate. However this also means the rustdoc documentation is interesting...
114#[async_trait::async_trait]
115pub trait CancellableJobHandler<U = NoFiles>: Send
116where
117 U: UploadableFile + Send + 'static,
118{
119 /// Do a single step of a job
120 ///
121 /// This gets called for each phase of the job (e.g. script and after_script). The passed
122 /// string array is the same array as was passed for a given step in the job definition. If the
123 /// job is cancelled while this is running, the given `cancel_token` will be triggered.
124 ///
125 /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
126 /// [Phase::Script] step
127 async fn step(
128 &mut self,
129 script: &[String],
130 phase: Phase,
131 cancel_token: &CancellationToken,
132 ) -> JobResult;
133
134 /// Get a list of the files available to upload
135 ///
136 /// See the description [`UploadableFile`] for more information.
137 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
138 Ok(Box::new(core::iter::empty()))
139 }
140
141 /// Cleanup after the job is finished
142 ///
143 /// This method always get called whether or not the job succeeded or was acancelled, allowing
144 /// the job handler to clean up as necessary.
145 async fn cleanup(&mut self) {}
146}
147
148/// Async trait for handling a single Job
149///
150/// In the event of being cancelled by GitLab, the `step` function will have its future dropped
151/// instantly. If manual handling of cancellation is required, use `CancellableJobHandler` instead.
152/// (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
153/// performed.)
154///
155/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
156/// crate. However this also means the rustdoc documentation is interesting...
157#[async_trait::async_trait]
158pub trait JobHandler<U = NoFiles>: Send
159where
160 U: UploadableFile + Send + 'static,
161{
162 /// Do a single step of a job
163 ///
164 /// This gets called for each phase of the job (e.g. script and after_script). The passed
165 /// string array is the same array as was passed for a given step in the job definition. If the
166 /// job is cancelled while this is running, its future will dropped, resulting in the function's
167 /// termination.
168 ///
169 /// Note that gitlab concatinates the `before_script` and `script` arrays into a single
170 /// [Phase::Script] step
171 async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;
172
173 /// Get a list of the files available to upload
174 ///
175 /// See the description [`UploadableFile`] for more information.
176 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
177 Ok(Box::new(core::iter::empty()))
178 }
179
180 /// Cleanup after the job is finished
181 ///
182 /// This method always get called whether or not the job succeeded or was cancelled, allowing
183 /// the job handler to clean up as necessary.
184 async fn cleanup(&mut self) {}
185}
186
187#[async_trait::async_trait]
188impl<J, U> CancellableJobHandler<U> for J
189where
190 J: JobHandler<U>,
191 U: UploadableFile + Send + 'static,
192{
193 async fn step(
194 &mut self,
195 script: &[String],
196 phase: Phase,
197 cancel_token: &CancellationToken,
198 ) -> JobResult {
199 tokio::select! {
200 r = self.step(script, phase) => r,
201 _ = cancel_token.cancelled() => Ok(()),
202 }
203 }
204
205 async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
206 self.get_uploadable_files().await
207 }
208
209 async fn cleanup(&mut self) {
210 self.cleanup().await;
211 }
212}
213
214/// Builder for [`Runner`]
215pub struct RunnerBuilder {
216 server: Url,
217 token: String,
218 build_dir: PathBuf,
219 system_id: Option<String>,
220 run_list: RunList<u64, JobLog>,
221 metadata: ClientMetadata,
222}
223
224impl RunnerBuilder {
225 // The official gitlab runner uses 2 char prefixes (s_ or r_) followed
226 // by 12 characters for a unique identifier
227 const DEFAULT_ID_LEN: usize = 12;
228 /// Create a new [`RunnerBuilder`] for the given server url, runner token,
229 /// build dir and job list (as created by GitlabLayer::new).
230 ///
231 /// The build_dir is used to store temporary files during a job run.
232 /// ```
233 /// # use tracing_subscriber::{prelude::*, Registry};
234 /// # use gitlab_runner::{RunnerBuilder, GitlabLayer};
235 /// # use url::Url;
236 /// #
237 /// #[tokio::main]
238 /// # async fn main() {
239 /// let dir = tempfile::tempdir().unwrap();
240 /// let (layer, jobs) = GitlabLayer::new();
241 /// let subscriber = Registry::default().with(layer).init();
242 /// let runner = RunnerBuilder::new(
243 /// Url::parse("https://gitlab.com/").unwrap(),
244 /// "RunnerToken",
245 /// dir.path(),
246 /// jobs
247 /// )
248 /// .build()
249 /// .await;
250 /// # }
251 /// ```
252 pub fn new<P: Into<PathBuf>, S: Into<String>>(
253 server: Url,
254 token: S,
255 build_dir: P,
256 jobs: JobRunList,
257 ) -> Self {
258 RunnerBuilder {
259 server,
260 token: token.into(),
261 build_dir: build_dir.into(),
262 system_id: None,
263 run_list: jobs.inner(),
264 metadata: ClientMetadata::default(),
265 }
266 }
267
268 /// Set the [system_id](https://docs.gitlab.com/runner/fleet_scaling/#generation-of-system_id-identifiers) for this runner
269 ///
270 /// The system_id will be truncated to 64 characters to match gitlabs limit,
271 /// but no further validation will be done. It's up to the caller to ensure the
272 /// system_id is valid for gitlab
273 pub fn system_id<S: Into<String>>(mut self, system_id: S) -> Self {
274 let mut system_id = system_id.into();
275 system_id.truncate(64);
276 self.system_id = Some(system_id);
277 self
278 }
279
280 /// Set the version reported by the gitlab runner
281 ///
282 /// The version will be truncated to 2048 characters
283 pub fn version<S: Into<String>>(mut self, version: S) -> Self {
284 let mut version = version.into();
285 version.truncate(2048);
286 self.metadata.version = Some(version);
287 self
288 }
289
290 /// Set the revision reported by the gitlab runner
291 ///
292 /// The revision will be truncated to 255 characters
293 pub fn revision<S: Into<String>>(mut self, revision: S) -> Self {
294 let mut revision = revision.into();
295 revision.truncate(255);
296 self.metadata.revision = Some(revision);
297 self
298 }
299
300 /// Set the platform reported by the gitlab runner
301 ///
302 /// The platform will be truncated to 255 characters
303 pub fn platform<S: Into<String>>(mut self, platform: S) -> Self {
304 let mut platform = platform.into();
305 platform.truncate(255);
306 self.metadata.platform = Some(platform);
307 self
308 }
309
310 /// Set the architecture reported by the gitlab runner
311 ///
312 /// The architecture will be truncated to 255 characters
313 pub fn architecture<S: Into<String>>(mut self, architecture: S) -> Self {
314 let mut architecture = architecture.into();
315 architecture.truncate(255);
316 self.metadata.architecture = Some(architecture);
317 self
318 }
319
320 async fn generate_system_id_from_machine_id() -> Option<String> {
321 let mut f = match File::open("/etc/machine-id").await {
322 Ok(f) => f,
323 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
324 debug!("/etc/machine-id not found, not generate systemd id based on it");
325 return None;
326 }
327 Err(e) => {
328 warn!("Failed to open machine-id: {e}");
329 return None;
330 }
331 };
332
333 let mut id = [0u8; 32];
334 if let Err(e) = f.read_exact(&mut id).await {
335 warn!("Failed to read from machine-id: {e}");
336 return None;
337 };
338
339 // Infallible as a hmac can take a key of any size
340 let mut mac = Hmac::<sha2::Sha256>::new_from_slice(&id).unwrap();
341 mac.update(b"gitlab-runner");
342
343 let mut system_id = String::from("s_");
344 // 2 hex chars for each byte
345 for b in &mac.finalize().into_bytes()[0..Self::DEFAULT_ID_LEN / 2] {
346 // Infallible: writing to a string
347 write!(&mut system_id, "{b:02x}").unwrap();
348 }
349 Some(system_id)
350 }
351
352 async fn generate_system_id() -> String {
353 if let Some(system_id) = Self::generate_system_id_from_machine_id().await {
354 system_id
355 } else {
356 let mut system_id = String::from("r_");
357 Alphanumeric.append_string(&mut rand::rng(), &mut system_id, Self::DEFAULT_ID_LEN);
358 system_id
359 }
360 }
361
362 /// Build the runner.
363 pub async fn build(self) -> Runner {
364 let system_id = match self.system_id {
365 Some(system_id) => system_id,
366 None => Self::generate_system_id().await,
367 };
368 let client = Client::new(self.server, self.token, system_id, self.metadata);
369 Runner {
370 client,
371 build_dir: self.build_dir,
372 run_list: self.run_list,
373 }
374 }
375}
376
377/// Runner for gitlab
378///
379/// The runner is responsible for communicating with gitlab to request new job and spawn them.
380#[derive(Debug)]
381pub struct Runner {
382 client: Client,
383 build_dir: PathBuf,
384 run_list: RunList<u64, JobLog>,
385}
386
387impl Runner {
388 /// The number of jobs currently running
389 pub fn running(&self) -> usize {
390 self.run_list.size()
391 }
392
393 /// Try to request a single job from gitlab
394 ///
395 /// This does a single poll of gitlab for a new job. If a new job received a new asynchronous
396 /// task is spawned for processing the job. The passed `process` function is called to create a
397 /// the actual job handler. Returns whether or not a job was received or an error if polling
398 /// gitlab failed.
399 ///
400 /// Note that this function is not cancel safe. If the future gets cancelled gitlab might have
401 /// provided a job for which processing didn't start yet.
402 pub async fn request_job<F, J, U, Ret>(&mut self, process: F) -> Result<bool, client::Error>
403 where
404 F: FnOnce(Job) -> Ret + Sync + Send + 'static,
405 J: CancellableJobHandler<U> + Send + 'static,
406 U: UploadableFile + Send + 'static,
407 Ret: Future<Output = Result<J, ()>> + Send + 'static,
408 {
409 let response = self.client.request_job().await?;
410 if let Some(response) = response {
411 let mut build_dir = self.build_dir.clone();
412 build_dir.push(format!("{}", response.id));
413 let mut run = Run::new(self.client.clone(), response, &mut self.run_list);
414 tokio::spawn(
415 async move { run.run(process, build_dir).await }.with_current_subscriber(),
416 );
417 Ok(true)
418 } else {
419 Ok(false)
420 }
421 }
422
423 /// Wait untill there are less then max jobs running
424 pub async fn wait_for_space(&mut self, max: usize) {
425 self.run_list.wait_for_space(max).await;
426 }
427
428 /// Wait untill there are no more jobs running
429 pub async fn drain(&mut self) {
430 self.run_list.wait_for_space(1).await;
431 }
432
433 /// Run continously, processing at most `maximum` jobs concurrently
434 ///
435 /// This essentially calls [`Runner::request_job`] requesting jobs until at most `maximum` jobs are
436 /// running in parallel.
437 pub async fn run<F, U, J, Ret>(
438 &mut self,
439 process: F,
440 maximum: usize,
441 ) -> Result<(), client::Error>
442 where
443 F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
444 J: CancellableJobHandler<U> + Send + 'static,
445 U: UploadableFile + Send + 'static,
446 Ret: Future<Output = Result<J, ()>> + Send + 'static,
447 {
448 loop {
449 self.wait_for_space(maximum).await;
450 match self.request_job(process.clone()).await {
451 /* continue to pick up a new job straight away */
452 Ok(true) => continue,
453 Ok(false) => (),
454 Err(e) => warn!("Couldn't get a job from gitlab: {:?}", e),
455 }
456 sleep(Duration::from_secs(5)).await;
457 }
458 }
459}