use std::pin::Pin;
use gitlab_runner::job::Job;
use gitlab_runner::{GitlabLayer, JobHandler, JobResult, Phase, RunnerBuilder};
use gitlab_runner_mock::{GitlabRunnerMock, MockJob, MockJobState};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::{Duration, sleep};
use tracing::instrument::WithSubscriber;
use tracing_subscriber::Registry;
use tracing_subscriber::layer::SubscriberExt;
/// Future handed back by [`LoggerControl`] that completes once the job
/// handler has acknowledged the corresponding control message.
///
/// It wraps the receiving half of a oneshot channel; any receive error is
/// collapsed to `Err(())`.
struct Completion(oneshot::Receiver<()>);

impl std::future::Future for Completion {
    type Output = Result<(), ()>;

    /// Polls the wrapped oneshot receiver, discarding the error payload.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let Completion(receiver) = self.get_mut();
        match Pin::new(receiver).poll(cx) {
            std::task::Poll::Ready(Ok(())) => std::task::Poll::Ready(Ok(())),
            std::task::Poll::Ready(Err(_closed)) => std::task::Poll::Ready(Err(())),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}
/// Commands sent from the test body to the `Logger` job handler over an
/// mpsc channel.
#[derive(Debug)]
enum Control {
/// Trace the given string on the job, then signal the oneshot sender.
Log(String, oneshot::Sender<()>),
/// Sleep for the given number of seconds (tokio timer), then signal the
/// oneshot sender.
// No code visible in this file constructs this variant except the equally
// unused `LoggerControl::sleep`, hence the lint suppression.
#[allow(dead_code)]
Sleep(u64, oneshot::Sender<()>),
/// Make the handler's current step return the carried result.
Finish(JobResult),
}
/// Test-side handle for steering a [`Logger`] job handler.
///
/// Wraps the sending half of the control channel; every public method queues
/// exactly one [`Control`] message for the handler to act on.
struct LoggerControl(mpsc::Sender<Control>);

impl LoggerControl {
    /// Creates the handle together with the receiving half of its control
    /// channel (bounded to 8 queued messages), which is meant to be given to
    /// [`Logger::new`].
    fn new() -> (Self, mpsc::Receiver<Control>) {
        let (tx, rx) = mpsc::channel(8);
        (LoggerControl(tx), rx)
    }

    /// Queues a single control message for the handler.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be sent on the control channel.
    async fn send(&self, control: Control) {
        self.0.send(control).await.expect("Failed to send control");
    }

    /// Asks the handler to trace `msg` on the job.
    ///
    /// Returns once the request is queued; the returned [`Completion`]
    /// resolves when the handler has acknowledged it.
    async fn log(&self, msg: String) -> Completion {
        let (tx, rx) = oneshot::channel();
        self.send(Control::Log(msg, tx)).await;
        Completion(rx)
    }

    /// Like [`LoggerControl::log`], but also waits for the acknowledgement.
    ///
    /// # Panics
    ///
    /// Panics if the acknowledgement never arrives (the completion resolves
    /// to an error).
    async fn log_wait(&self, msg: String) {
        let completion = self.log(msg).await;
        completion
            .await
            .expect("Failed to wait for log completion");
    }

    /// Asks the handler to sleep for `time` seconds.
    ///
    /// The returned [`Completion`] resolves when the handler has finished
    /// sleeping and acknowledged the request.
    // Not called by any test visible in this file, hence the lint suppression.
    #[allow(dead_code)]
    async fn sleep(&self, time: u64) -> Completion {
        let (tx, rx) = oneshot::channel();
        self.send(Control::Sleep(time, tx)).await;
        Completion(rx)
    }

    /// Tells the handler to end its current step with `result`.
    ///
    /// Consumes the handle, so no further commands can be sent afterwards.
    async fn finish(self, result: JobResult) {
        self.send(Control::Finish(result)).await;
    }
}
/// Job handler whose behaviour is scripted entirely by [`Control`] messages
/// arriving on a channel.
struct Logger {
    /// The job this handler traces log output on.
    job: Job,
    /// Receiving half of the control channel driven by the test body.
    rx: mpsc::Receiver<Control>,
}

impl Logger {
    /// Builds a handler for `job` that takes its commands from `rx`.
    fn new(job: Job, rx: mpsc::Receiver<Control>) -> Self {
        Logger { job, rx }
    }
}
#[async_trait::async_trait]
impl JobHandler for Logger {
    /// Executes control messages until told to finish.
    ///
    /// The script and phase are ignored. Each `Log` and `Sleep` message is
    /// carried out and then acknowledged on its oneshot sender; a `Finish`
    /// message ends the step with the carried result. If the control channel
    /// closes before a `Finish` arrives, the step fails with `Err(())`.
    async fn step(&mut self, _script: &[String], _phase: Phase) -> JobResult {
        loop {
            let Some(message) = self.rx.recv().await else {
                // Channel closed without an explicit `Finish`.
                return Err(());
            };
            match message {
                Control::Finish(outcome) => return outcome,
                Control::Log(text, done) => {
                    self.job.trace(text);
                    // The requester may have dropped its `Completion`; a
                    // failed acknowledgement is deliberately ignored.
                    let _ = done.send(());
                }
                Control::Sleep(seconds, done) => {
                    sleep(Duration::from_secs(seconds)).await;
                    let _ = done.send(());
                }
            }
        }
    }
}
/// Spins until `f` returns `false`.
///
/// Between checks the current thread is blocked for 5 ms of wall-clock time
/// and then the task yields to the runtime once.
// NOTE(review): the blocking `std::thread::sleep` looks deliberate: the test
// in this file runs with tokio's clock paused, so a tokio timer would not
// correspond to real elapsed time. Confirm before replacing it.
async fn busy_wait<F>(f: F)
where
    F: Fn() -> bool,
{
    loop {
        if !f() {
            return;
        }
        std::thread::sleep(Duration::from_millis(5));
        tokio::task::yield_now().await;
    }
}
/// Busy-waits (see `busy_wait`) until at least `d` of wall-clock time has
/// elapsed, measured with `std::time::Instant` rather than tokio's clock.
async fn busy_delay(d: Duration) {
    let deadline = std::time::Instant::now() + d;
    let still_waiting = || std::time::Instant::now() <= deadline;
    busy_wait(still_waiting).await
}
/// Logs a single `"ping"` line, advances tokio's clock by 100 seconds and
/// waits for the mock job's log patch count to move away from `patches`.
///
/// Asserts that exactly one more patch was recorded and that the last logged
/// content is the ping text, then returns the new patch count.
async fn log_ping(job: &MockJob, control: &LoggerControl, patches: u32) -> u32 {
    const PING: &str = "ping";

    control.log_wait(PING.to_string()).await;
    tokio::time::advance(Duration::from_secs(100)).await;

    // Spin until the patch counter differs from its starting value.
    busy_wait(|| job.log_patches() == patches).await;

    let expected = patches + 1;
    assert_eq!(job.log_patches(), expected);
    assert_eq!(job.log_last(), Some(PING.as_bytes().to_vec()));
    expected
}
/// Checks that two lines logged within one `interval` arrive as one patch.
///
/// Logs `"1"` and `"2"`, advancing tokio's clock by a quarter of `interval`
/// after each and asserting (after 100 ms of real time) that the patch count
/// is still `patches`. It then advances the remaining half interval, waits
/// for the count to change, and asserts that exactly one new patch holding
/// `12` was recorded. Returns the new patch count.
async fn log_batch(
    job: &MockJob,
    control: &LoggerControl,
    patches: u32,
    interval: Duration,
) -> u32 {
    // Wall-clock grace period in which an unexpected patch would show up.
    let settle = Duration::from_millis(100);

    for line in ["1", "2"] {
        control.log_wait(line.to_string()).await;
        tokio::time::advance(interval / 4).await;
        busy_delay(settle).await;
        assert_eq!(job.log_patches(), patches, "Seen an unexpected log patch");
    }

    // Cross the interval boundary and wait for the counter to move.
    tokio::time::advance(interval / 2).await;
    busy_wait(|| job.log_patches() == patches).await;

    let flushed = patches + 1;
    assert_eq!(job.log_patches(), flushed);
    assert_eq!(job.log_last(), Some(vec![b'1', b'2']));
    flushed
}
/// Checks how often log patches and job state updates reach the mock server,
/// both with the initial timing and after `mock.set_update_interval(30)`.
///
/// The test starts with tokio's clock paused (`start_paused = true`); time
/// only moves through the explicit `tokio::time::advance` calls here and in
/// the `log_ping` / `log_batch` helpers, until `tokio::time::resume()` near
/// the end.
#[tokio::test(start_paused = true)]
async fn update_interval() {
let mock = GitlabRunnerMock::start().await;
let job = mock.add_dummy_job("logging".to_string());
let dir = tempfile::tempdir().unwrap();
// The layer is installed as the subscriber for the async block below.
let (layer, jobs) = GitlabLayer::new();
let subscriber = Registry::default().with(layer);
let mut runner = RunnerBuilder::new(mock.uri(), mock.runner_token(), dir.path(), jobs)
.build()
.await;
async {
// `control` stays with the test; `rx` is moved into the job handler.
let (control, rx) = LoggerControl::new();
let got_job = runner
.request_job(|job| async move { Ok(Logger::new(job, rx)) })
.await
.unwrap();
assert!(got_job);
// Running count of log patches the mock job is expected to have seen.
let mut patches = 0;
patches = log_ping(&job, &control, patches).await;
// NOTE(review): 4s is presumably the mock's initial update interval;
// confirm against gitlab_runner_mock.
let interval = Duration::from_secs(4);
patches = log_batch(&job, &control, patches, interval).await;
patches = log_batch(&job, &control, patches, interval).await;
patches = log_batch(&job, &control, patches, interval).await;
// Only one state update so far; after advancing 40s a second one is
// expected, without any additional log patch.
assert_eq!(job.state_updates(), 1);
tokio::time::advance(Duration::from_secs(40)).await;
busy_wait(|| job.state_updates() == 1).await;
assert_eq!(job.state_updates(), 2);
assert_eq!(job.log_patches(), patches);
patches = log_batch(&job, &control, patches, interval).await;
// Change the interval on the mock. The next batch is still checked against
// the old 4s value; only the batch after that uses 30s.
// NOTE(review): presumably the runner picks up the new interval from the
// response to its next request; confirm.
mock.set_update_interval(30);
patches = log_batch(&job, &control, patches, interval).await;
let interval = Duration::from_secs(30);
patches = log_batch(&job, &control, patches, interval).await;
assert_eq!(job.log_patches(), patches);
// Still two state updates; another 40s advance should yield a third, again
// without an extra log patch.
assert_eq!(job.state_updates(), 2);
tokio::time::advance(Duration::from_secs(40)).await;
busy_wait(|| job.state_updates() == 2).await;
assert_eq!(job.state_updates(), 3);
assert_eq!(job.log_patches(), patches);
// Unpause the clock before finishing the job.
tokio::time::resume();
// `Finish(Ok(()))` makes the handler's step return `Ok(())`.
control.finish(Ok(())).await;
// NOTE(review): presumably returns once the runner has room for one more
// job, i.e. the job above has completed; confirm.
runner.wait_for_space(1).await;
assert_eq!(MockJobState::Success, job.state());
}
.with_subscriber(subscriber)
.await;
}