1use std::sync::Arc;
7use tracing::{debug, trace};
8
9use anyhow::{Context, Result};
10
11use crate::error::ValidatorError;
12use bollard::container::LogOutput;
13use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults};
14use futures_util::StreamExt;
15use testcontainers::core::client::docker_client_instance;
16use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt};
17
18use crate::docker::{BollardDocker, DockerOperations};
19
20async fn collect_exec_output(
25 docker: &dyn DockerOperations,
26 exec_id: &str,
27 mut output: impl futures_util::Stream<Item = Result<LogOutput, bollard::errors::Error>> + Unpin,
28) -> Result<ValidationResult> {
29 let mut stdout = Vec::new();
30 let mut stderr = Vec::new();
31
32 while let Some(result) = output.next().await {
33 match result {
34 Ok(LogOutput::StdOut { message }) => {
35 stdout.extend_from_slice(&message);
36 }
37 Ok(LogOutput::StdErr { message }) => {
38 stderr.extend_from_slice(&message);
39 }
40 Ok(_) => {}
41 Err(e) => {
42 return Err(ValidatorError::ContainerExec {
43 message: format!("Output stream error: {e}"),
44 }
45 .into());
46 }
47 }
48 }
49
50 let inspect = docker.inspect_exec(exec_id).await?;
52 let exit_code = inspect.exit_code.unwrap_or(-1);
53
54 Ok(ValidationResult {
55 exit_code,
56 stdout: String::from_utf8_lossy(&stdout).to_string(),
57 stderr: String::from_utf8_lossy(&stderr).to_string(),
58 })
59}
60
61#[derive(Debug)]
63#[must_use]
64pub struct ValidationResult {
65 pub exit_code: i64,
67 pub stdout: String,
69 pub stderr: String,
71}
72
73pub struct ValidatorContainer {
78 _container: ContainerAsync<GenericImage>,
80 container_id: String,
81 docker: Arc<dyn DockerOperations>,
83}
84
85impl ValidatorContainer {
86 pub fn with_docker(
97 container: ContainerAsync<GenericImage>,
98 docker: Arc<dyn DockerOperations>,
99 ) -> Self {
100 let container_id = container.id().to_owned();
101 Self {
102 _container: container,
103 container_id,
104 docker,
105 }
106 }
107
108 pub async fn start_with_image(image: &str, validator_script: &[u8]) -> Result<Self> {
122 debug!(image = %image, "Starting container");
123 let (name, tag) = image.rsplit_once(':').unwrap_or((image, "latest"));
124
125 let container = GenericImage::new(name, tag)
126 .with_copy_to("/validate.sh", validator_script.to_vec())
127 .with_cmd(["sleep", "infinity"])
128 .start()
129 .await
130 .context("Failed to start container. Is Docker running?")?;
131
132 let container_id = container.id().to_owned();
133 let short_id: String = container_id.chars().take(12).collect();
135 debug!(container_id = %short_id, "Container ready");
136
137 let docker_client = docker_client_instance()
139 .await
140 .context("Failed to get Docker client")?;
141 let docker: Arc<dyn DockerOperations> = Arc::new(BollardDocker::new(docker_client));
142
143 Ok(Self {
144 _container: container,
145 container_id,
146 docker,
147 })
148 }
149
150 pub async fn start(validator_script: &[u8]) -> Result<Self> {
159 Self::start_with_image("alpine:3", validator_script).await
160 }
161
162 pub async fn exec_with_env(
174 &self,
175 setup: Option<&str>,
176 content: &str,
177 assertions: Option<&str>,
178 expect: Option<&str>,
179 ) -> Result<ValidationResult> {
180 debug!("Executing with env vars");
181 trace!(content = %content, setup = ?setup, assertions = ?assertions, expect = ?expect, "Exec environment");
182 let mut env_vars = vec![format!("VALIDATOR_CONTENT={content}")];
183 if let Some(s) = setup {
184 env_vars.push(format!("VALIDATOR_SETUP={s}"));
185 }
186 if let Some(a) = assertions {
187 env_vars.push(format!("VALIDATOR_ASSERTIONS={a}"));
188 }
189 if let Some(e) = expect {
190 env_vars.push(format!("VALIDATOR_EXPECT={e}"));
191 }
192
193 let exec = self
194 .docker
195 .create_exec(
196 &self.container_id,
197 CreateExecOptions {
198 attach_stdout: Some(true),
199 attach_stderr: Some(true),
200 env: Some(env_vars),
201 cmd: Some(vec!["sh".to_owned(), "/validate.sh".to_owned()]),
202 ..Default::default()
203 },
204 )
205 .await?;
206
207 let exec_id = exec.id;
208
209 let start_result = self
210 .docker
211 .start_exec(&exec_id, Some(StartExecOptions::default()))
212 .await?;
213
214 let StartExecResults::Attached { output, .. } = start_result else {
215 return Err(ValidatorError::ContainerExec {
216 message: "Exec should be attached but wasn't".into(),
217 }
218 .into());
219 };
220
221 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
222 }
223
224 #[must_use]
226 pub fn id(&self) -> &str {
227 &self.container_id
228 }
229
230 pub async fn exec_raw(&self, cmd: &[&str]) -> Result<ValidationResult> {
243 debug!(command = ?cmd, "Executing raw command");
244 let cmd_owned: Vec<String> = cmd.iter().map(|s| (*s).to_owned()).collect();
245
246 let exec = self
247 .docker
248 .create_exec(
249 &self.container_id,
250 CreateExecOptions {
251 attach_stdout: Some(true),
252 attach_stderr: Some(true),
253 cmd: Some(cmd_owned),
254 ..Default::default()
255 },
256 )
257 .await?;
258
259 let exec_id = exec.id;
260
261 let start_result = self
262 .docker
263 .start_exec(&exec_id, Some(StartExecOptions::default()))
264 .await?;
265
266 let StartExecResults::Attached { output, .. } = start_result else {
267 return Err(ValidatorError::ContainerExec {
268 message: "Exec should be attached but wasn't".into(),
269 }
270 .into());
271 };
272
273 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
274 }
275
276 pub async fn exec_with_stdin(
290 &self,
291 cmd: &[&str],
292 stdin_content: &str,
293 ) -> Result<ValidationResult> {
294 use tokio::io::AsyncWriteExt;
295
296 debug!(command = ?cmd, "Executing with stdin");
297 trace!(stdin = %stdin_content, "Stdin content");
298 let cmd_owned: Vec<String> = cmd.iter().map(|s| (*s).to_owned()).collect();
299
300 let exec = self
301 .docker
302 .create_exec(
303 &self.container_id,
304 CreateExecOptions {
305 attach_stdin: Some(true),
306 attach_stdout: Some(true),
307 attach_stderr: Some(true),
308 cmd: Some(cmd_owned),
309 ..Default::default()
310 },
311 )
312 .await?;
313
314 let exec_id = exec.id;
315
316 let start_result = self
317 .docker
318 .start_exec(&exec_id, Some(StartExecOptions::default()))
319 .await?;
320
321 let StartExecResults::Attached { output, mut input } = start_result else {
322 return Err(ValidatorError::ContainerExec {
323 message: "Exec should be attached but wasn't".into(),
324 }
325 .into());
326 };
327
328 input
330 .write_all(stdin_content.as_bytes())
331 .await
332 .context("Failed to write to stdin")?;
333 input.shutdown().await.context("Failed to close stdin")?;
334
335 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
336 }
337
338 pub async fn start_raw(image: &str) -> Result<Self> {
351 Self::start_raw_with_mount(image, None).await
352 }
353
354 pub async fn start_raw_with_mount(
368 image: &str,
369 mount: Option<(&std::path::Path, &str)>,
370 ) -> Result<Self> {
371 use testcontainers::core::Mount;
372
373 debug!(image = %image, mount = ?mount.map(|(p, c)| (p.display().to_string(), c)), "Starting raw container");
374 let (name, tag) = image.rsplit_once(':').unwrap_or((image, "latest"));
375
376 let base_image = GenericImage::new(name, tag).with_cmd(["sleep", "infinity"]);
377
378 let container = if let Some((host_path, container_path)) = mount {
379 let host_str = host_path.to_string_lossy().to_string();
380 base_image
381 .with_mount(Mount::bind_mount(host_str, container_path))
382 .start()
383 .await
384 .context("Failed to start container with mount. Is Docker running?")?
385 } else {
386 base_image
387 .start()
388 .await
389 .context("Failed to start container. Is Docker running?")?
390 };
391
392 let container_id = container.id().to_owned();
393 let short_id: String = container_id.chars().take(12).collect();
395 debug!(container_id = %short_id, "Container ready");
396
397 let docker_client = docker_client_instance()
399 .await
400 .context("Failed to get Docker client")?;
401 let docker: Arc<dyn DockerOperations> = Arc::new(BollardDocker::new(docker_client));
402
403 Ok(Self {
404 _container: container,
405 container_id,
406 docker,
407 })
408 }
409}