mod builder;
mod condition_matcher;
mod context_payload;
pub use self::builder::ExecutionContextBuilder;
use crate::{
AstroRunSignal, Condition, Context, Error, Job, JobId, JobRunResult, Result, RunResult,
RunStepEvent, Runner, SharedPluginDriver, Signal, SignalManager, Step, StepRunResult, StreamExt,
Workflow, WorkflowLog, WorkflowRunResult, WorkflowState, WorkflowStateEvent,
};
pub use context_payload::*;
use std::sync::Arc;
use tokio::time;
#[derive(Clone)]
pub struct ExecutionContext {
runner: Arc<Box<dyn Runner>>,
plugin_driver: SharedPluginDriver,
signal_manager: SignalManager,
condition_matcher: condition_matcher::ConditionMatcher,
payload: Option<ContextPayload>,
}
impl ExecutionContext {
pub async fn run(&self, step: Step) -> StepRunResult {
let step = self.call_on_before_run_step(step).await;
let step_id = step.id.clone();
let timeout = step.timeout;
let started_at = chrono::Utc::now();
let event = crate::RunStepEvent {
source: step.clone(),
trigger_event: self.condition_matcher.event.clone(),
payload: self.payload.clone(),
};
self.call_on_run_step(event.clone()).await;
let event = WorkflowStateEvent::StepStateUpdated {
id: step_id.clone(),
state: WorkflowState::Queued,
};
self.call_on_state_change(event).await;
let job_signal = self
.signal_manager
.get_signal(&step.id.job_id())
.expect("Missing job signal");
let signal = AstroRunSignal::new();
let mut receiver = match self
.runner
.run(Context {
id: step_id.clone(),
signal: signal.clone(),
command: step.into(),
event: self.condition_matcher.event.clone(),
payload: self.payload_string(),
})
.await
{
Ok(receiver) => receiver,
Err(err) => {
let completed_at = chrono::Utc::now();
let duration = completed_at - started_at;
log::error!(
"Step {:?} failed with error {:?} in {} seconds",
step_id,
err,
duration.num_seconds()
);
let event = WorkflowStateEvent::StepStateUpdated {
id: step_id.clone(),
state: WorkflowState::Failed,
};
self.call_on_state_change(event).await;
let result = StepRunResult {
id: step_id,
state: WorkflowState::Failed,
exit_code: Some(1),
started_at: Some(started_at),
completed_at: Some(completed_at),
};
self.call_on_step_completed(result.clone()).await;
return result;
}
};
let event = WorkflowStateEvent::StepStateUpdated {
id: step_id.clone(),
state: WorkflowState::InProgress,
};
self.call_on_state_change(event).await;
loop {
tokio::select! {
_ = time::sleep(timeout) => {
signal.timeout().ok();
}
s = job_signal.recv() => {
match s {
Signal::Cancel => {
signal.cancel().ok();
}
Signal::Timeout => {
signal.timeout().ok();
}
}
}
received = receiver.next() => {
if let Some(log) = received {
let log = WorkflowLog {
step_id: step_id.clone(),
log_type: log.log_type,
message: log.message,
time: chrono::Utc::now(),
};
self.call_on_log(log.clone()).await;
} else {
break;
}
}
}
}
let res = receiver
.result()
.ok_or(Error::internal_runtime_error(
"Missing result from runner. This is a bug in the runner implementation.",
))
.unwrap();
let completed_at = chrono::Utc::now();
let duration = completed_at - started_at;
log::trace!(
"Step {:?} finished with result {:?} in {} seconds",
step_id,
res,
duration.num_seconds()
);
let res = match res {
RunResult::Succeeded => StepRunResult {
id: step_id.clone(),
state: WorkflowState::Succeeded,
exit_code: None,
started_at: Some(started_at),
completed_at: Some(completed_at),
},
RunResult::Failed { exit_code } => StepRunResult {
id: step_id.clone(),
state: WorkflowState::Failed,
exit_code: Some(exit_code),
started_at: Some(started_at),
completed_at: Some(completed_at),
},
RunResult::Cancelled => StepRunResult {
id: step_id.clone(),
state: WorkflowState::Cancelled,
exit_code: None,
started_at: Some(started_at),
completed_at: Some(completed_at),
},
};
let event = WorkflowStateEvent::StepStateUpdated {
id: step_id.clone(),
state: res.state.clone(),
};
self.call_on_state_change(event).await;
self.call_on_step_completed(res.clone()).await;
log::trace!("Step {:?} completed", step_id);
res
}
pub fn cancel_job(&self, job_id: &JobId) -> Result<()> {
self.signal_manager.cancel_job(job_id)
}
pub(crate) async fn is_match(&self, condition: &Condition) -> bool {
self.condition_matcher.is_match(condition).await
}
pub(crate) async fn call_on_run_workflow(&self, workflow: Workflow) {
let event = crate::RunWorkflowEvent {
source: workflow,
trigger_event: self.condition_matcher.event.clone(),
payload: self.payload.clone(),
};
self.plugin_driver.on_run_workflow(event.clone()).await;
if let Err(err) = self.runner.on_run_workflow(event).await {
log::error!("Failed to run workflow: {:?}", err);
}
}
pub(crate) async fn call_on_run_job(&self, job: Job) {
self
.signal_manager
.register_signal(job.id.clone(), AstroRunSignal::new());
let event = crate::RunJobEvent {
source: job,
trigger_event: self.condition_matcher.event.clone(),
payload: self.payload.clone(),
};
self.plugin_driver.on_run_job(event.clone()).await;
if let Err(err) = self.runner.on_run_job(event).await {
log::error!("Failed to run job: {:?}", err);
}
}
pub(crate) async fn call_on_state_change(&self, event: WorkflowStateEvent) {
self.plugin_driver.on_state_change(event.clone()).await;
if let Err(err) = self.runner.on_state_change(event).await {
log::error!("Failed to handle state change: {:?}", err);
}
}
pub(crate) async fn call_on_job_completed(&self, result: JobRunResult) {
self.signal_manager.unregister_signal(&result.id);
self.plugin_driver.on_job_completed(result.clone()).await;
if let Err(err) = self.runner.on_job_completed(result).await {
log::error!("Failed to handle job completed: {:?}", err);
}
}
pub(crate) async fn call_on_run_step(&self, event: RunStepEvent) {
self.plugin_driver.on_run_step(event.clone()).await;
if let Err(err) = self.runner.on_run_step(event).await {
log::error!("Failed to run step: {:?}", err);
}
}
pub(crate) async fn call_on_step_completed(&self, result: StepRunResult) {
self.plugin_driver.on_step_completed(result.clone()).await;
if let Err(err) = self.runner.on_step_completed(result.clone()).await {
log::error!("Failed to handle step completed: {:?}", err);
}
}
pub(crate) async fn call_on_before_run_step(&self, step: Step) -> Step {
let step = self.plugin_driver.on_before_run_step(step).await;
if let Ok(step) = self.runner.on_before_run_step(step.clone()).await {
step
} else {
step
}
}
pub(crate) async fn call_on_workflow_completed(&self, result: WorkflowRunResult) {
self
.plugin_driver
.on_workflow_completed(result.clone())
.await;
if let Err(err) = self.runner.on_workflow_completed(result).await {
log::error!("Failed to handle workflow completed: {:?}", err);
}
}
pub(crate) async fn call_on_log(&self, log: WorkflowLog) {
self.plugin_driver.on_log(log.clone()).await;
if let Err(err) = self.runner.on_log(log).await {
log::error!("Failed to handle log: {:?}", err);
}
}
pub fn builder() -> ExecutionContextBuilder {
ExecutionContextBuilder::new()
}
pub fn payload<P>(&self) -> Option<&P>
where
P: ContextPayloadExt + 'static,
{
self.payload.as_ref().and_then(|p| p.payload())
}
fn payload_string(&self) -> Option<String> {
self
.payload
.as_ref()
.map(|p| serde_json::to_string(&p).expect("Failed to serialize payload"))
}
}