1use std::io;
2
3use futures::future::*;
4use http_body_util::BodyExt;
5use log::info;
6use octocrab::{params, Octocrab};
7use tokio::io::AsyncReadExt;
8
9use super::*;
10
/// Handle for a single docker container created by this backend.
///
/// Dropping the handle force-removes the container unless `keep` is set.
struct DockerContainerInstance {
    /// Container id as passed to `docker exec` and `docker rm`.
    id: String,
    /// When `true`, the container is left alone when the handle is dropped.
    keep: bool,
}
17
18impl Drop for DockerContainerInstance {
19 fn drop(&mut self) {
20 if !self.keep {
21 let mut cmd = std::process::Command::new("docker");
23 cmd.stdin(Stdio::null());
24 cmd.stdout(Stdio::null());
25 cmd.stderr(Stdio::null());
26 cmd.args(&["rm", "-f", &self.id]);
27 info!("Running {:?}", cmd);
28 let _ = cmd.status();
29 }
30 }
31}
32
33impl DockerContainerInstance {
34 fn new(id: String) -> DockerContainerInstance {
35 DockerContainerInstance {
36 id: id,
37 keep: false,
38 }
39 }
40 fn set_keep(&mut self, keep: bool) {
41 self.keep = keep;
42 }
43 fn id(&self) -> &str {
44 self.id.as_str()
45 }
46}
47
/// Boxed callback that is handed each chunk of raw output bytes as it is
/// read from a command running in a container.
pub type OutputHandler = Box<dyn FnMut(&[u8])>;
50
51pub struct LocalDockerBackend {
54 container_instances: Vec<DockerContainerInstance>,
56 stdout_handler: OutputHandler,
57 stderr_handler: OutputHandler,
58}
59
60impl RunnerBackend for LocalDockerBackend {
61 fn run<'a, F: FnMut(&[u8])>(
62 &'a mut self,
63 container: ContainerId,
64 command: &'a str,
65 stdout_filter: &'a mut F,
66 ) -> Pin<Box<dyn Future<Output = i32> + 'a>> {
67 Box::pin(async move {
68 let mut cmd = tokio::process::Command::new("docker");
69 cmd.stdout(Stdio::piped());
70 cmd.stderr(Stdio::piped());
71 cmd.args(&[
72 "exec",
73 "--tty",
74 self.container_instances[container.0].id(),
75 command,
76 ]);
77 info!("Running {:?}", cmd);
78 let mut child = match cmd.spawn() {
79 Ok(child) => child,
80 Err(e) => panic!("Failed to docker exec: {}", e),
81 };
82 let mut stdout = child.stdout.take().unwrap();
83 let mut stderr = child.stderr.take().unwrap();
84 let mut stdout_open = true;
85 let mut stderr_open = true;
86 let mut stdout_buf = Vec::new();
87 let mut stderr_buf = Vec::new();
88 loop {
89 tokio::select! {
90 r = stdout.read_buf(&mut stdout_buf), if stdout_open => {
91 if r.expect("reading stdout") > 0 {
92 stdout_filter(&stdout_buf);
93 (self.stdout_handler)(&stdout_buf);
94 stdout_buf.clear();
95 } else {
96 stdout_open = false;
97 }
98 },
99 r = stderr.read_buf(&mut stderr_buf), if stderr_open => {
100 if r.expect("reading stderr") > 0 {
101 (self.stderr_handler)(&stderr_buf);
102 stderr_buf.clear();
103 } else {
104 stderr_open = false;
105 }
106 },
107 else => break,
108 }
109 }
110 child
111 .wait()
112 .await
113 .expect("Should have waited successfully")
114 .code()
115 .unwrap_or(-1)
116 })
117 }
118}
119
120impl LocalDockerBackend {
121 pub fn new(
123 job_runner: &JobRunner,
124 github_dir: &Path,
125 stdout_handler: OutputHandler,
126 stderr_handler: OutputHandler,
127 ) -> io::Result<LocalDockerBackend> {
128 let bind_arg = format!(
129 "type=bind,src={},dst=/github",
130 github_dir.to_str().expect("Tempdir has non-UTF8 path?")
131 );
132 let mut container_instances = Vec::new();
136 let keep = std::env::var_os("KEEP_CONTAINERS").is_some();
137 for image in job_runner.container_images() {
138 let mut cmd = std::process::Command::new("docker");
139 cmd.stdin(Stdio::null());
140 cmd.args(&[
141 "run",
142 "--detach=true",
143 "--tmpfs",
144 "/tmp:exec",
145 "--init",
146 "--cap-add",
147 "LINUX_IMMUTABLE",
148 "--mount",
149 &bind_arg,
150 "--entrypoint",
152 "sleep",
153 &image.0,
154 "1000000",
155 ]);
156 info!("Running {:?}", cmd);
157 let output = cmd.output()?;
158 if !output.status.success() {
159 return Err(io::Error::new(
160 io::ErrorKind::Other,
161 format!(
162 "Failed to create docker container for {}:\n{}",
163 image.0,
164 String::from_utf8_lossy(&output.stderr)
165 ),
166 ));
167 }
168 let mut instance = DockerContainerInstance::new(
169 std::str::from_utf8(&output.stdout)
170 .unwrap()
171 .trim()
172 .to_string(),
173 );
174 instance.set_keep(keep);
175 container_instances.push(instance);
176 }
177 Ok(LocalDockerBackend {
178 container_instances: container_instances,
179 stdout_handler: stdout_handler,
180 stderr_handler: stderr_handler,
181 })
182 }
183
184 fn run_container_setup_command(&self, command: Vec<String>) -> io::Result<()> {
185 for instance in self.container_instances.iter() {
186 let mut cmd = std::process::Command::new("docker");
187 cmd.stdin(Stdio::null());
188 cmd.stdout(Stdio::null());
189 cmd.stderr(Stdio::null());
190 cmd.args(&["exec", "--tty", instance.id()]);
191 cmd.args(command.iter());
192 info!("Running {:?}", cmd);
193 if !cmd.status()?.success() {
194 return Err(io::Error::new(io::ErrorKind::Other, "Command failed"));
195 }
196 }
197 Ok(())
198 }
199}
200
/// Returns the access token that `LocalDockerOptions::default()` uses.
///
/// NOTE(review): the name suggests this token grants no access to anything —
/// confirm before relying on that.
pub fn zero_access_token() -> &'static str {
    // Assembled from two pieces rather than written as a single literal —
    // presumably so secret scanners do not flag it; TODO confirm.
    const TOKEN: &str = concat!("ghp_", "7EMsGDj8ZsOEJxDSXBoC9XsEjFgMWw2NvXQk");
    TOKEN
}
211
212pub async fn get_workflow(
216 github: &Octocrab,
217 owner: &str,
218 repo: &str,
219 sha: &str,
220 workflow: &str,
221) -> (Vec<u8>, String) {
222 if workflow.starts_with('/') {
223 let data = fs::read(&workflow).unwrap();
224 (data, format!(".github/workflows/{}", workflow))
225 } else {
226 let commit: params::repos::Commitish = sha.to_string().into();
227 let response = github
228 .repos(owner, repo)
229 .raw_file(commit, workflow)
230 .await
231 .unwrap()
232 .into_body();
233 let bytes = response.collect().await.unwrap().to_bytes();
234 info!("Fetched {}, {} bytes", workflow, bytes.len());
235 (bytes.to_vec(), workflow.to_string())
236 }
237}
238
/// Outcome of running a job's steps with `run_workflow_with_local_backend`.
///
/// `Debug` is derived so results can be printed with `{:?}` and compared with
/// `assert_eq!`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WorkflowResult {
    /// Every step of the job finished with exit code zero.
    AllStepsPassed,
    /// A step finished with a non-zero exit code; no further steps were run.
    StepFailed {
        /// Name of the failing step, or `None` if the step has no name.
        step_name: Option<String>,
        /// The non-zero exit code of the failing step.
        exit_code: i32,
    },
}
252
/// Knobs for `run_workflow_with_local_backend`.
pub struct LocalDockerOptions {
    /// Token placed in the runner context as its `token`.
    pub access_token: String,
    /// Receives every chunk of stdout produced by commands run in containers.
    pub stdout_handler: Box<dyn FnMut(&[u8])>,
    /// Receives every chunk of stderr produced by commands run in containers.
    pub stderr_handler: Box<dyn FnMut(&[u8])>,
    /// Commands (each an argument vector) run in every container, in order,
    /// before the first job step.
    pub container_setup_commands: Vec<Vec<String>>,
    /// Passed to the job runner for each step; it is given the step's
    /// command as a mutable argument vector.
    pub modify_step_command: Box<dyn FnMut(&mut Vec<String>)>,
    /// Called once with the temporary directory's path after the last step
    /// that ran, before `run_workflow_with_local_backend` returns.
    pub before_temp_dir_removal_hook: Box<dyn FnOnce(&Path)>,
}
270
271impl Default for LocalDockerOptions {
272 fn default() -> LocalDockerOptions {
273 LocalDockerOptions {
274 access_token: zero_access_token().to_string(),
275 stdout_handler: Box::new(|_| ()),
276 stderr_handler: Box::new(|_| ()),
277 container_setup_commands: Vec::new(),
278 modify_step_command: Box::new(|_| ()),
279 before_temp_dir_removal_hook: Box::new(|_| ()),
280 }
281 }
282}
283
284pub async fn run_workflow_with_local_backend(
289 owner: &str,
290 repo: &str,
291 sha: &str,
292 workflow: &str,
293 job_name: &str,
294 images: &DockerImageMapping,
295 mut options: LocalDockerOptions,
296) -> WorkflowResult {
297 let github = Octocrab::default();
298 let (workflow_data, workflow_repo_path) =
299 get_workflow(&github, owner, repo, sha, workflow).await;
300
301 let temp_dir = tempfile::tempdir().expect("Can't create tempdir");
302
303 let ctx = RunnerContext {
304 github: github,
305 owner: owner.to_string(),
306 repo: repo.to_string(),
307 commit_sha: hex::decode(sha.as_bytes()).unwrap(),
308 commit_ref: None,
310 global_dir_host: temp_dir.path().to_path_buf(),
311 workflow_repo_path: workflow_repo_path,
312 run_id: 1u64.into(),
313 run_number: 1,
314 job_id: 1u64.into(),
315 actor: "gha-runner".to_string(),
316 token: options.access_token,
317 override_env: Vec::new(),
318 };
319
320 let runner = Runner::new(ctx, &workflow_data).await.unwrap();
321
322 let job_description = if let Some(jd) = runner
323 .job_descriptions()
324 .iter()
325 .find(|jd| jd.name() == job_name)
326 {
327 jd
328 } else {
329 panic!("Can't find job '{}'", job_name);
330 };
331 let mut job_runner = runner
332 .job_runner(job_description, images)
333 .await
334 .expect("Failed to create JobRunner");
335 let mut backend = LocalDockerBackend::new(
336 &job_runner,
337 temp_dir.path(),
338 options.stdout_handler,
339 options.stderr_handler,
340 )
341 .expect("Failed to create docker backend");
342 for cmd in options.container_setup_commands {
343 backend.run_container_setup_command(cmd).unwrap();
344 }
345
346 loop {
347 let step = job_runner.next_step_index();
348 if step >= job_runner.step_count() {
349 break;
350 }
351 let step_name = job_runner
352 .next_step_name()
353 .expect("Failed to get step name");
354 info!(
355 "Running step {} {}",
356 step.0,
357 step_name.as_deref().unwrap_or("<anonymous>")
358 );
359 let exit_code = job_runner
360 .run_next_step(&mut options.modify_step_command, &mut backend)
361 .await
362 .expect("Failed to run step");
363 if exit_code != 0 {
364 (options.before_temp_dir_removal_hook)(temp_dir.path());
365 return WorkflowResult::StepFailed {
366 step_name: step_name,
367 exit_code: exit_code,
368 };
369 }
370 }
371 (options.before_temp_dir_removal_hook)(temp_dir.path());
372 WorkflowResult::AllStepsPassed
373}