use crate::execution::BoxError;
use crate::execution::{
progress::Observation, repository::Execution, BoxFuture, BoxStream, JobRepository, Progress,
Runner,
};
use crate::job::JobId;
use core::{pin::Pin, task::{Context, Poll}};
use futures_lite::{FutureExt, Stream, StreamExt as _};
use std::collections::VecDeque;
pub struct Simple;
impl Runner for Simple {
fn run<'f, 'j: 'f, J: 'j + JobRepository + Send + Unpin>(
&self,
jobs: J,
goals: &[Box<str>],
) -> BoxStream<'f, (JobId, Progress)> {
let goals = goals.to_vec();
let ready = vec![].into();
Box::pin(SimpleEvents {
jobs,
goals,
running: vec![],
ready,
reload: false,
})
}
}
struct SimpleEvents<J> {
jobs: J,
goals: Vec<Box<str>>,
running: Vec<BoxFuture<'static, Execution>>,
ready: VecDeque<(JobId, Progress)>,
reload: bool,
}
impl<J> Stream for SimpleEvents<J>
where
J: JobRepository + Send + Unpin,
{
type Item = (JobId, Progress);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
jobs,
goals,
running,
ready,
reload,
} = self.as_mut().get_mut();
loop {
if let Some(ready) = ready.pop_front() {
break Poll::Ready(Some(ready));
}
if *reload || running.is_empty() {
*reload = false;
let runnable = jobs.execute(goals);
ready.extend(
runnable
.iter()
.map(|e| (e.job().id.clone(), Progress::Start)),
);
running.extend(runnable.into_iter().map(|mut e| {
Box::pin(async move {
run(&mut e).await;
e
}) as BoxFuture<'static, Execution>
}));
if running.is_empty() {
break Poll::Ready(None);
} else {
continue;
}
}
let mut completed = None;
for (i, v) in running.iter_mut().enumerate() {
match v.poll(cx) {
Poll::Ready(execution) => {
completed = Some((i, execution));
break;
}
Poll::Pending => continue,
}
}
let Some((i, execution)) = completed else {
return Poll::Pending;
};
*reload = true;
drop(running.remove(i));
ready.push_back((
execution.job().id.clone(),
Progress::Observation(Observation::Completed(
execution.progress().get_status().unwrap_or(1),
)),
));
}
}
}
async fn run(execution: &mut Execution) {
if let Err(e) = run_job_fallible(execution).await {
execution
.progress()
.update(Progress::Observation(Observation::Failure {
message: "job execution failed".into(),
source: Some(e.into()),
}));
}
}
async fn run_job_fallible(
execution: &mut Execution,
) -> Result<(), BoxError> {
let script = &execution.job().script.clone();
let mut interpretter = execution.commander().start()?;
execution.progress().update(Progress::Start);
for command in script {
execution
.progress()
.update(Progress::Command(command.clone()));
let mut logs = interpretter.execute(command).await?;
while let Some(observation) = logs.next().await {
execution
.progress()
.update(Progress::Observation(observation));
}
if execution.progress().is_failed() {
break;
}
}
execution.progress().update(Progress::Shutdown);
let mut shutdown = interpretter.shutdown().await?;
while let Some(log) = shutdown.next().await {
execution.progress().update(Progress::Observation(log));
}
Ok(())
}