astro_run/execution_context/
mod.rs1mod builder;
2mod condition_matcher;
3mod context_payload;
4
5pub use self::builder::ExecutionContextBuilder;
6use crate::{
7 AstroRunSignal, Condition, Context, Error, Job, JobId, JobRunResult, Result, RunResult,
8 RunStepEvent, Runner, SharedPluginDriver, Signal, SignalManager, Step, StepRunResult, StreamExt,
9 Workflow, WorkflowLog, WorkflowRunResult, WorkflowState, WorkflowStateEvent,
10};
11pub use context_payload::*;
12use std::sync::Arc;
13use tokio::time;
14
15#[derive(Clone)]
16pub struct ExecutionContext {
17 runner: Arc<Box<dyn Runner>>,
18 plugin_driver: SharedPluginDriver,
19 signal_manager: SignalManager,
20 condition_matcher: condition_matcher::ConditionMatcher,
21 payload: Option<ContextPayload>,
22}
23
24impl ExecutionContext {
25 pub async fn run(&self, step: Step) -> StepRunResult {
26 let step = self.call_on_before_run_step(step).await;
27
28 let step_id = step.id.clone();
29 let timeout = step.timeout;
30
31 let started_at = chrono::Utc::now();
32
33 let event = crate::RunStepEvent {
34 source: step.clone(),
35 trigger_event: self.condition_matcher.event.clone(),
36 payload: self.payload.clone(),
37 };
38
39 self.call_on_run_step(event.clone()).await;
40
41 let event = WorkflowStateEvent::StepStateUpdated {
43 id: step_id.clone(),
44 state: WorkflowState::Queued,
45 };
46
47 self.call_on_state_change(event).await;
48
49 let job_signal = self
51 .signal_manager
52 .get_signal(&step.id.job_id())
53 .expect("Missing job signal");
54
55 let signal = AstroRunSignal::new();
57
58 let mut receiver = match self
59 .runner
60 .run(Context {
61 id: step_id.clone(),
62 signal: signal.clone(),
63 command: step.into(),
64 event: self.condition_matcher.event.clone(),
65 payload: self.payload_string(),
66 })
67 .await
68 {
69 Ok(receiver) => receiver,
70 Err(err) => {
71 let completed_at = chrono::Utc::now();
72 let duration = completed_at - started_at;
73 log::error!(
74 "Step {:?} failed with error {:?} in {} seconds",
75 step_id,
76 err,
77 duration.num_seconds()
78 );
79
80 let event = WorkflowStateEvent::StepStateUpdated {
81 id: step_id.clone(),
82 state: WorkflowState::Failed,
83 };
84
85 self.call_on_state_change(event).await;
86
87 let result = StepRunResult {
88 id: step_id,
89 state: WorkflowState::Failed,
90 exit_code: Some(1),
91 started_at: Some(started_at),
92 completed_at: Some(completed_at),
93 };
94
95 self.call_on_step_completed(result.clone()).await;
96
97 return result;
98 }
99 };
100
101 let event = WorkflowStateEvent::StepStateUpdated {
102 id: step_id.clone(),
103 state: WorkflowState::InProgress,
104 };
105
106 self.call_on_state_change(event).await;
107
108 loop {
109 tokio::select! {
110 _ = time::sleep(timeout) => {
112 signal.timeout().ok();
114 }
115 s = job_signal.recv() => {
116 match s {
117 Signal::Cancel => {
118 signal.cancel().ok();
119 }
120 Signal::Timeout => {
121 signal.timeout().ok();
122 }
123 }
124 }
125 received = receiver.next() => {
126 if let Some(log) = received {
127 let log = WorkflowLog {
128 step_id: step_id.clone(),
129 log_type: log.log_type,
130 message: log.message,
131 time: chrono::Utc::now(),
132 };
133
134 self.call_on_log(log.clone()).await;
135 } else {
136 break;
137 }
138 }
139 }
140 }
141
142 let res = receiver
143 .result()
144 .ok_or(Error::internal_runtime_error(
146 "Missing result from runner. This is a bug in the runner implementation.",
147 ))
148 .unwrap();
149
150 let completed_at = chrono::Utc::now();
151 let duration = completed_at - started_at;
152
153 log::trace!(
154 "Step {:?} finished with result {:?} in {} seconds",
155 step_id,
156 res,
157 duration.num_seconds()
158 );
159
160 let res = match res {
161 RunResult::Succeeded => StepRunResult {
162 id: step_id.clone(),
163 state: WorkflowState::Succeeded,
164 exit_code: None,
165 started_at: Some(started_at),
166 completed_at: Some(completed_at),
167 },
168 RunResult::Failed { exit_code } => StepRunResult {
169 id: step_id.clone(),
170 state: WorkflowState::Failed,
171 exit_code: Some(exit_code),
172 started_at: Some(started_at),
173 completed_at: Some(completed_at),
174 },
175 RunResult::Cancelled => StepRunResult {
176 id: step_id.clone(),
177 state: WorkflowState::Cancelled,
178 exit_code: None,
179 started_at: Some(started_at),
180 completed_at: Some(completed_at),
181 },
182 };
183
184 let event = WorkflowStateEvent::StepStateUpdated {
185 id: step_id.clone(),
186 state: res.state.clone(),
187 };
188
189 self.call_on_state_change(event).await;
190
191 self.call_on_step_completed(res.clone()).await;
192
193 log::trace!("Step {:?} completed", step_id);
194
195 res
196 }
197
198 pub fn cancel_job(&self, job_id: &JobId) -> Result<()> {
199 self.signal_manager.cancel_job(job_id)
200 }
201
202 pub(crate) async fn is_match(&self, condition: &Condition) -> bool {
203 self.condition_matcher.is_match(condition).await
204 }
205
206 pub(crate) async fn call_on_run_workflow(&self, workflow: Workflow) {
207 let event = crate::RunWorkflowEvent {
208 source: workflow,
209 trigger_event: self.condition_matcher.event.clone(),
210 payload: self.payload.clone(),
211 };
212 self.plugin_driver.on_run_workflow(event.clone()).await;
213 if let Err(err) = self.runner.on_run_workflow(event).await {
214 log::error!("Failed to run workflow: {:?}", err);
215 }
216 }
217
218 pub(crate) async fn call_on_run_job(&self, job: Job) {
219 self
220 .signal_manager
221 .register_signal(job.id.clone(), AstroRunSignal::new());
222
223 let event = crate::RunJobEvent {
224 source: job,
225 trigger_event: self.condition_matcher.event.clone(),
226 payload: self.payload.clone(),
227 };
228
229 self.plugin_driver.on_run_job(event.clone()).await;
230 if let Err(err) = self.runner.on_run_job(event).await {
231 log::error!("Failed to run job: {:?}", err);
232 }
233 }
234
235 pub(crate) async fn call_on_state_change(&self, event: WorkflowStateEvent) {
236 self.plugin_driver.on_state_change(event.clone()).await;
237
238 if let Err(err) = self.runner.on_state_change(event).await {
239 log::error!("Failed to handle state change: {:?}", err);
240 }
241 }
242
243 pub(crate) async fn call_on_job_completed(&self, result: JobRunResult) {
244 self.signal_manager.unregister_signal(&result.id);
245 self.plugin_driver.on_job_completed(result.clone()).await;
246
247 if let Err(err) = self.runner.on_job_completed(result).await {
248 log::error!("Failed to handle job completed: {:?}", err);
249 }
250 }
251
252 pub(crate) async fn call_on_run_step(&self, event: RunStepEvent) {
253 self.plugin_driver.on_run_step(event.clone()).await;
254
255 if let Err(err) = self.runner.on_run_step(event).await {
256 log::error!("Failed to run step: {:?}", err);
257 }
258 }
259
260 pub(crate) async fn call_on_step_completed(&self, result: StepRunResult) {
261 self.plugin_driver.on_step_completed(result.clone()).await;
262
263 if let Err(err) = self.runner.on_step_completed(result.clone()).await {
264 log::error!("Failed to handle step completed: {:?}", err);
265 }
266 }
267
268 pub(crate) async fn call_on_before_run_step(&self, step: Step) -> Step {
269 let step = self.plugin_driver.on_before_run_step(step).await;
270
271 if let Ok(step) = self.runner.on_before_run_step(step.clone()).await {
272 step
273 } else {
274 step
275 }
276 }
277
278 pub(crate) async fn call_on_workflow_completed(&self, result: WorkflowRunResult) {
279 self
280 .plugin_driver
281 .on_workflow_completed(result.clone())
282 .await;
283
284 if let Err(err) = self.runner.on_workflow_completed(result).await {
285 log::error!("Failed to handle workflow completed: {:?}", err);
286 }
287 }
288
289 pub(crate) async fn call_on_log(&self, log: WorkflowLog) {
290 self.plugin_driver.on_log(log.clone()).await;
291
292 if let Err(err) = self.runner.on_log(log).await {
293 log::error!("Failed to handle log: {:?}", err);
294 }
295 }
296
297 pub fn builder() -> ExecutionContextBuilder {
298 ExecutionContextBuilder::new()
299 }
300
301 pub fn payload<P>(&self) -> Option<&P>
302 where
303 P: ContextPayloadExt + 'static,
304 {
305 self.payload.as_ref().and_then(|p| p.payload())
306 }
307
308 fn payload_string(&self) -> Option<String> {
309 self
310 .payload
311 .as_ref()
312 .map(|p| serde_json::to_string(&p).expect("Failed to serialize payload"))
313 }
314}