#![warn(missing_docs)]
#![doc = include_str!("../README.md")]
pub mod artifact;
mod client;
mod logging;
use crate::client::Client;
mod run;
use crate::run::Run;
pub mod job;
use client::ClientMetadata;
use hmac::Hmac;
use hmac::Mac;
use job::{Job, JobLog};
pub mod uploader;
pub use logging::GitlabLayer;
use rand::distr::Alphanumeric;
use rand::distr::SampleString;
use runlist::JobRunList;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::instrument::WithSubscriber;
mod runlist;
use crate::runlist::RunList;
use futures::AsyncRead;
use futures::prelude::*;
use std::borrow::Cow;
use std::fmt::Write;
use std::path::PathBuf;
use tokio::time::{Duration, sleep};
use tracing::warn;
use url::Url;
#[doc(hidden)]
pub use ::tracing;
/// Emits a `trace`-level [`tracing`] event tagged with the field `gitlab.output = true`.
///
/// Accepts a format expression optionally followed by arguments, which are
/// forwarded verbatim to `tracing::trace!`.
///
/// NOTE(review): the `gitlab.output` field is presumably what the crate's
/// `GitlabLayer` uses to route the message into the job output; confirm
/// against the logging module.
#[macro_export]
macro_rules! outputln {
    // Message without further arguments.
    ($fmt:expr) => {
        $crate::tracing::trace!(gitlab.output = true, $fmt)
    };
    // Message followed by arbitrary trailing tokens (format arguments).
    ($fmt:expr, $($rest:tt)*) => {
        $crate::tracing::trace!(gitlab.output = true, $fmt, $($rest)*)
    };
}
/// Result of running a job step: `Ok(())` on success, `Err(())` on failure.
///
/// Neither variant carries a payload.
pub type JobResult = Result<(), ()>;
pub use client::Phase;
/// A file that a job handler offers for upload.
///
/// Implementors expose a path and an asynchronously readable data source.
#[async_trait::async_trait]
pub trait UploadableFile {
/// Reader type yielding the file's contents; it may borrow from `self`.
///
/// Must implement the `futures` flavour of `AsyncRead` and be `Send + Unpin`.
type Data<'a>: AsyncRead + Send + Unpin
where
Self: 'a;
/// Returns the path of the file.
///
/// NOTE(review): how the path is interpreted (for example as a location
/// inside an uploaded archive) is decided by the uploader, which is not
/// visible in this file.
fn get_path(&self) -> Cow<'_, str>;
/// Opens the file's data for reading; `Err(())` signals that it is unavailable.
async fn get_data(&self) -> Result<Self::Data<'_>, ()>;
}
/// Placeholder [`UploadableFile`] for handlers that never upload anything.
///
/// It is the default file type parameter of [`JobHandler`] and
/// [`CancellableJobHandler`]; its trait methods must never be called.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NoFiles {}
// `NoFiles` only exists to satisfy the `UploadableFile` bound; no instance is
// expected to be asked for a path or data, so both methods panic if reached.
#[async_trait::async_trait]
impl UploadableFile for NoFiles {
// Any reader type works here since `get_data` never returns.
type Data<'a> = &'a [u8];
/// Always panics: a `NoFiles` value has no path.
fn get_path(&self) -> Cow<'_, str> {
unreachable!("tried to get path of NoFiles")
}
/// Always panics: a `NoFiles` value has no data.
async fn get_data(&self) -> Result<Self::Data<'_>, ()> {
unreachable!("tried to read data from NoFiles");
}
}
/// Handler for a single job that is informed about cancellation through a token.
///
/// `U` is the type of files the handler can offer for upload; it defaults to
/// [`NoFiles`].
#[async_trait::async_trait]
pub trait CancellableJobHandler<U = NoFiles>: Send
where
U: UploadableFile + Send + 'static,
{
/// Runs the commands in `script` for the given `phase`.
///
/// `cancel_token` is handed to the implementation so it can observe
/// cancellation itself and stop early.
///
/// NOTE(review): when and how often this is invoked is determined by the
/// job runner code outside this file.
async fn step(
&mut self,
script: &[String],
phase: Phase,
cancel_token: &CancellationToken,
) -> JobResult;
/// Returns the files this handler wants uploaded.
///
/// The default implementation yields no files.
async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
Ok(Box::new(core::iter::empty()))
}
/// Cleanup hook; the default implementation does nothing.
async fn cleanup(&mut self) {}
}
/// Handler for a single job that does not deal with cancellation itself.
///
/// Every `JobHandler` automatically implements [`CancellableJobHandler`]
/// through a blanket implementation in this file. `U` is the type of files the
/// handler can offer for upload; it defaults to [`NoFiles`].
#[async_trait::async_trait]
pub trait JobHandler<U = NoFiles>: Send
where
U: UploadableFile + Send + 'static,
{
/// Runs the commands in `script` for the given `phase`.
async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;
/// Returns the files this handler wants uploaded.
///
/// The default implementation yields no files.
async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
Ok(Box::new(core::iter::empty()))
}
/// Cleanup hook; the default implementation does nothing.
async fn cleanup(&mut self) {}
}
// Adapts every plain `JobHandler` into a `CancellableJobHandler`.
//
// Calls are written in fully-qualified form so it is unmistakable that they
// forward to the `JobHandler` methods rather than recursing into this impl.
#[async_trait::async_trait]
impl<J, U> CancellableJobHandler<U> for J
where
    J: JobHandler<U>,
    U: UploadableFile + Send + 'static,
{
    /// Runs the wrapped handler's step, abandoning it once `cancel_token` is cancelled.
    ///
    /// If the step finishes first its result is returned. If cancellation
    /// wins, the in-flight step future is dropped and `Ok(())` is returned.
    async fn step(
        &mut self,
        script: &[String],
        phase: Phase,
        cancel_token: &CancellationToken,
    ) -> JobResult {
        let handler_step = <J as JobHandler<U>>::step(self, script, phase);
        tokio::select! {
            outcome = handler_step => outcome,
            _ = cancel_token.cancelled() => Ok(()),
        }
    }

    /// Forwards to the wrapped handler's `get_uploadable_files`.
    async fn get_uploadable_files(&mut self) -> Result<Box<dyn Iterator<Item = U> + Send>, ()> {
        <J as JobHandler<U>>::get_uploadable_files(self).await
    }

    /// Forwards to the wrapped handler's `cleanup`.
    async fn cleanup(&mut self) {
        <J as JobHandler<U>>::cleanup(self).await;
    }
}
/// Builder for [`Runner`].
///
/// Collects the connection settings and optional metadata, then creates the
/// runner via [`RunnerBuilder::build`].
pub struct RunnerBuilder {
// URL of the server, handed to the client on build.
server: Url,
// Token handed to the client on build.
token: String,
// Base directory stored in the built `Runner`.
build_dir: PathBuf,
// Explicit system id; when `None`, one is generated during `build`.
system_id: Option<String>,
// Run list taken from the `JobRunList` passed to `new`.
run_list: RunList<u64, JobLog>,
// Version / revision / platform / architecture handed to the client.
metadata: ClientMetadata,
}
impl RunnerBuilder {
    /// Number of characters that follow the `s_` / `r_` prefix of a generated system id.
    const DEFAULT_ID_LEN: usize = 12;

    /// Creates a builder with the mandatory settings.
    ///
    /// * `server` - URL of the server the client talks to.
    /// * `token` - token passed to the client.
    /// * `build_dir` - base directory stored in the resulting [`Runner`].
    /// * `jobs` - job run list whose inner list the runner uses to track jobs.
    ///
    /// The system id is left unset (one is generated in [`RunnerBuilder::build`])
    /// and the metadata starts out as `ClientMetadata::default()`.
    pub fn new<P: Into<PathBuf>, S: Into<String>>(
        server: Url,
        token: S,
        build_dir: P,
        jobs: JobRunList,
    ) -> Self {
        RunnerBuilder {
            server,
            token: token.into(),
            build_dir: build_dir.into(),
            system_id: None,
            run_list: jobs.inner(),
            metadata: ClientMetadata::default(),
        }
    }

    /// Shortens `value` to at most `max_len` bytes without splitting a UTF-8 character.
    ///
    /// `String::truncate` panics when the cut is not on a char boundary, so the
    /// cut point is moved back to the nearest boundary first.
    fn truncated(mut value: String, max_len: usize) -> String {
        if value.len() > max_len {
            let mut end = max_len;
            // Index 0 is always a char boundary, so this loop terminates.
            while !value.is_char_boundary(end) {
                end -= 1;
            }
            value.truncate(end);
        }
        value
    }

    /// Sets the system id explicitly instead of having one generated.
    ///
    /// The value is truncated to at most 64 bytes, never splitting a character.
    pub fn system_id<S: Into<String>>(mut self, system_id: S) -> Self {
        self.system_id = Some(Self::truncated(system_id.into(), 64));
        self
    }

    /// Sets the version stored in the client metadata.
    ///
    /// The value is truncated to at most 2048 bytes, never splitting a character.
    pub fn version<S: Into<String>>(mut self, version: S) -> Self {
        self.metadata.version = Some(Self::truncated(version.into(), 2048));
        self
    }

    /// Sets the revision stored in the client metadata.
    ///
    /// The value is truncated to at most 255 bytes, never splitting a character.
    pub fn revision<S: Into<String>>(mut self, revision: S) -> Self {
        self.metadata.revision = Some(Self::truncated(revision.into(), 255));
        self
    }

    /// Sets the platform stored in the client metadata.
    ///
    /// The value is truncated to at most 255 bytes, never splitting a character.
    pub fn platform<S: Into<String>>(mut self, platform: S) -> Self {
        self.metadata.platform = Some(Self::truncated(platform.into(), 255));
        self
    }

    /// Sets the architecture stored in the client metadata.
    ///
    /// The value is truncated to at most 255 bytes, never splitting a character.
    pub fn architecture<S: Into<String>>(mut self, architecture: S) -> Self {
        self.metadata.architecture = Some(Self::truncated(architecture.into(), 255));
        self
    }

    /// Derives a stable system id from `/etc/machine-id`.
    ///
    /// The first 32 bytes of the file are used as the key of an HMAC-SHA256
    /// over `b"gitlab-runner"`; the id is `s_` followed by the hex encoding of
    /// the first `DEFAULT_ID_LEN / 2` bytes of the MAC.
    ///
    /// Returns `None` (after logging) when the file is missing, cannot be
    /// opened, or holds fewer than 32 bytes.
    async fn generate_system_id_from_machine_id() -> Option<String> {
        let mut f = match File::open("/etc/machine-id").await {
            Ok(f) => f,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                debug!("/etc/machine-id not found, not generate systemd id based on it");
                return None;
            }
            Err(e) => {
                warn!("Failed to open machine-id: {e}");
                return None;
            }
        };

        let mut id = [0u8; 32];
        if let Err(e) = f.read_exact(&mut id).await {
            warn!("Failed to read from machine-id: {e}");
            return None;
        }

        let mut mac =
            Hmac::<sha2::Sha256>::new_from_slice(&id).expect("HMAC accepts keys of any length");
        mac.update(b"gitlab-runner");

        let mut system_id = String::from("s_");
        // Each byte becomes two hex digits, hence half of DEFAULT_ID_LEN bytes.
        for b in &mac.finalize().into_bytes()[0..Self::DEFAULT_ID_LEN / 2] {
            write!(&mut system_id, "{b:02x}").expect("writing to a String cannot fail");
        }
        Some(system_id)
    }

    /// Produces a system id, preferring the machine-id based one.
    ///
    /// Falls back to `r_` followed by `DEFAULT_ID_LEN` random alphanumeric
    /// characters when no machine-id based id can be derived.
    async fn generate_system_id() -> String {
        if let Some(system_id) = Self::generate_system_id_from_machine_id().await {
            system_id
        } else {
            let mut system_id = String::from("r_");
            Alphanumeric.append_string(&mut rand::rng(), &mut system_id, Self::DEFAULT_ID_LEN);
            system_id
        }
    }

    /// Consumes the builder and creates the [`Runner`].
    ///
    /// Uses the explicitly configured system id if there is one, otherwise
    /// generates one, then constructs the client from server, token, system id
    /// and metadata.
    pub async fn build(self) -> Runner {
        let system_id = match self.system_id {
            Some(system_id) => system_id,
            None => Self::generate_system_id().await,
        };
        let client = Client::new(self.server, self.token, system_id, self.metadata);
        Runner {
            client,
            build_dir: self.build_dir,
            run_list: self.run_list,
        }
    }
}
/// Runner that requests jobs from the server and executes them on spawned tasks.
///
/// Created through [`RunnerBuilder`].
#[derive(Debug)]
pub struct Runner {
// Client used to request jobs; cloned into every spawned run.
client: Client,
// Base directory; each job's directory is this path joined with the job id.
build_dir: PathBuf,
// Tracks the jobs that are currently running.
run_list: RunList<u64, JobLog>,
}
impl Runner {
    /// Returns the current size of the run list, i.e. the number of tracked jobs.
    pub fn running(&self) -> usize {
        self.run_list.size()
    }

    /// Requests a single job from the server and, if one is received, starts it
    /// on a background task.
    ///
    /// `process` and a per-job build directory (the base build directory joined
    /// with the job id) are passed on to the spawned run.
    ///
    /// Returns `Ok(true)` when a job was received and spawned, `Ok(false)` when
    /// the server had no job, and an error when the request itself failed.
    pub async fn request_job<F, J, U, Ret>(&mut self, process: F) -> Result<bool, client::Error>
    where
        F: FnOnce(Job) -> Ret + Sync + Send + 'static,
        J: CancellableJobHandler<U> + Send + 'static,
        U: UploadableFile + Send + 'static,
        Ret: Future<Output = Result<J, ()>> + Send + 'static,
    {
        let Some(response) = self.client.request_job().await? else {
            return Ok(false);
        };

        let build_dir = self.build_dir.join(response.id.to_string());
        let mut run = Run::new(self.client.clone(), response, &mut self.run_list);

        // Keep the caller's tracing subscriber active inside the detached task.
        let task = async move { run.run(process, build_dir).await }.with_current_subscriber();
        tokio::spawn(task);

        Ok(true)
    }

    /// Waits until the run list reports space for the given maximum of `max` jobs.
    pub async fn wait_for_space(&mut self, max: usize) {
        self.run_list.wait_for_space(max).await;
    }

    /// Waits until the run list reports space for a maximum of one job.
    ///
    /// NOTE(review): this presumably means no job is running any more; confirm
    /// against `RunList::wait_for_space`.
    pub async fn drain(&mut self) {
        self.run_list.wait_for_space(1).await;
    }

    /// Polls the server for jobs forever, keeping at most `maximum` running.
    ///
    /// After a job has been picked up the next request follows immediately.
    /// When no job was available, or the request failed (logged as a warning),
    /// the loop sleeps five seconds before trying again. The loop has no exit,
    /// so this function does not return normally.
    pub async fn run<F, U, J, Ret>(
        &mut self,
        process: F,
        maximum: usize,
    ) -> Result<(), client::Error>
    where
        F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
        J: CancellableJobHandler<U> + Send + 'static,
        U: UploadableFile + Send + 'static,
        Ret: Future<Output = Result<J, ()>> + Send + 'static,
    {
        loop {
            self.wait_for_space(maximum).await;

            let picked_up = match self.request_job(process.clone()).await {
                Ok(got_job) => got_job,
                Err(e) => {
                    warn!("Couldn't get a job from gitlab: {:?}", e);
                    false
                }
            };

            if !picked_up {
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
}