astro_runner/executors/
host.rs1use crate::{command::Command, executors::Executor, metadata::Metadata};
2use astro_run::{Context, Result, StreamSender, TriggerEvent};
3use std::path::PathBuf;
4use tokio::fs;
5
/// Executor that runs a step's command directly on the host machine.
#[derive(Debug, Clone)]
pub struct HostExecutor {
    /// Runner working directory handed to the metadata builder for every
    /// step this executor runs.
    pub working_directory: PathBuf,
}
9
10#[astro_run::async_trait]
11impl Executor for HostExecutor {
12 async fn execute(
13 &self,
14 ctx: Context,
15 sender: StreamSender,
16 event: Option<TriggerEvent>,
17 ) -> Result<()> {
18 let mut builder = Metadata::builder()
20 .runner_working_directory(self.working_directory.clone())
21 .step_id(ctx.command.id.clone());
22
23 if let Some(event) = event {
24 builder = builder.repository(event.repo_owner, event.repo_name);
25 }
26
27 let metadata = builder.build();
28
29 let mut command = Self::into_command(&ctx, &metadata);
31
32 let is_completed = ctx.signal.is_cancelled() || ctx.signal.is_timeout();
33
34 if !is_completed {
35 fs::create_dir_all(&metadata.job_data_directory).await?;
37
38 tokio::select! {
39 Err(err) = command.run(sender.clone()) => {
41 log::error!("Step run error: {}", err);
42 }
43 signal = ctx.signal.recv() => {
44 log::trace!("Step run received signal: {:?}", signal);
46 if let astro_run::Signal::Cancel = signal {
47 sender.cancelled();
48 } else {
49 sender.timeout();
50 }
51 }
52 }
53
54 fs::remove_dir_all(&metadata.job_data_directory).await?;
56
57 log::trace!("Step run finished");
58 } else {
59 log::trace!("Step run has been completed before it started");
60 }
61
62 Ok(())
63 }
64}
65
66impl HostExecutor {
67 fn into_command(ctx: &Context, metadata: &Metadata) -> Command {
68 let mut command = Command::new(ctx.command.run.clone());
69
70 command.dir(&metadata.job_data_directory);
71
72 for (key, env) in &ctx.command.environments {
73 command.env(key, env.to_string());
74 }
75
76 command
77 }
78}