1use std::sync::Arc;
7
8use anyhow::{Context, Result};
9
10use crate::error::ValidatorError;
11use bollard::container::LogOutput;
12use bollard::exec::{CreateExecOptions, StartExecOptions, StartExecResults};
13use futures_util::StreamExt;
14use testcontainers::core::client::docker_client_instance;
15use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt};
16
17use crate::docker::{BollardDocker, DockerOperations};
18
19async fn collect_exec_output(
24 docker: &dyn DockerOperations,
25 exec_id: &str,
26 mut output: impl futures_util::Stream<Item = Result<LogOutput, bollard::errors::Error>> + Unpin,
27) -> Result<ValidationResult> {
28 let mut stdout = Vec::new();
29 let mut stderr = Vec::new();
30
31 while let Some(result) = output.next().await {
32 match result {
33 Ok(LogOutput::StdOut { message }) => {
34 stdout.extend_from_slice(&message);
35 }
36 Ok(LogOutput::StdErr { message }) => {
37 stderr.extend_from_slice(&message);
38 }
39 Ok(_) => {}
40 Err(e) => {
41 return Err(ValidatorError::ContainerExec {
42 message: format!("Output stream error: {e}"),
43 }
44 .into());
45 }
46 }
47 }
48
49 let inspect = docker.inspect_exec(exec_id).await?;
51 let exit_code = inspect.exit_code.unwrap_or(-1);
52
53 Ok(ValidationResult {
54 exit_code,
55 stdout: String::from_utf8_lossy(&stdout).to_string(),
56 stderr: String::from_utf8_lossy(&stderr).to_string(),
57 })
58}
59
/// Captured outcome of a command executed inside a container.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so results can be
/// duplicated and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct ValidationResult {
    /// Exit code of the exec; `collect_exec_output` stores `-1` when none was reported.
    pub exit_code: i64,
    /// Everything the command wrote to stdout, decoded as UTF-8 (lossily).
    pub stdout: String,
    /// Everything the command wrote to stderr, decoded as UTF-8 (lossily).
    pub stderr: String,
}
71
/// Handle to a started container together with the Docker backend used to run commands in it.
pub struct ValidatorContainer {
    /// Owned container handle. It is stored but never read by the methods in this file
    /// (hence the leading underscore).
    /// NOTE(review): presumably held so the container stays alive for as long as this
    /// value exists — confirm against testcontainers' drop behavior.
    _container: ContainerAsync<GenericImage>,
    /// Container id, copied from the container handle at construction time.
    container_id: String,
    /// Docker API abstraction used to create, start and inspect execs.
    docker: Arc<dyn DockerOperations>,
}
83
84impl ValidatorContainer {
85 pub fn with_docker(
96 container: ContainerAsync<GenericImage>,
97 docker: Arc<dyn DockerOperations>,
98 ) -> Self {
99 let container_id = container.id().to_owned();
100 Self {
101 _container: container,
102 container_id,
103 docker,
104 }
105 }
106
107 pub async fn start_with_image(image: &str, validator_script: &[u8]) -> Result<Self> {
121 let (name, tag) = image.rsplit_once(':').unwrap_or((image, "latest"));
122
123 let container = GenericImage::new(name, tag)
124 .with_copy_to("/validate.sh", validator_script.to_vec())
125 .with_cmd(["sleep", "infinity"])
126 .start()
127 .await
128 .context("Failed to start container. Is Docker running?")?;
129
130 let container_id = container.id().to_owned();
131
132 let docker_client = docker_client_instance()
134 .await
135 .context("Failed to get Docker client")?;
136 let docker: Arc<dyn DockerOperations> = Arc::new(BollardDocker::new(docker_client));
137
138 Ok(Self {
139 _container: container,
140 container_id,
141 docker,
142 })
143 }
144
145 pub async fn start(validator_script: &[u8]) -> Result<Self> {
154 Self::start_with_image("alpine:3", validator_script).await
155 }
156
157 pub async fn exec_with_env(
169 &self,
170 setup: Option<&str>,
171 content: &str,
172 assertions: Option<&str>,
173 expect: Option<&str>,
174 ) -> Result<ValidationResult> {
175 let mut env_vars = vec![format!("VALIDATOR_CONTENT={content}")];
176 if let Some(s) = setup {
177 env_vars.push(format!("VALIDATOR_SETUP={s}"));
178 }
179 if let Some(a) = assertions {
180 env_vars.push(format!("VALIDATOR_ASSERTIONS={a}"));
181 }
182 if let Some(e) = expect {
183 env_vars.push(format!("VALIDATOR_EXPECT={e}"));
184 }
185
186 let exec = self
187 .docker
188 .create_exec(
189 &self.container_id,
190 CreateExecOptions {
191 attach_stdout: Some(true),
192 attach_stderr: Some(true),
193 env: Some(env_vars),
194 cmd: Some(vec!["sh".to_owned(), "/validate.sh".to_owned()]),
195 ..Default::default()
196 },
197 )
198 .await?;
199
200 let exec_id = exec.id;
201
202 let start_result = self
203 .docker
204 .start_exec(&exec_id, Some(StartExecOptions::default()))
205 .await?;
206
207 let StartExecResults::Attached { output, .. } = start_result else {
208 return Err(ValidatorError::ContainerExec {
209 message: "Exec should be attached but wasn't".into(),
210 }
211 .into());
212 };
213
214 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
215 }
216
217 #[must_use]
219 pub fn id(&self) -> &str {
220 &self.container_id
221 }
222
223 pub async fn exec_raw(&self, cmd: &[&str]) -> Result<ValidationResult> {
236 let cmd_owned: Vec<String> = cmd.iter().map(|s| (*s).to_owned()).collect();
237
238 let exec = self
239 .docker
240 .create_exec(
241 &self.container_id,
242 CreateExecOptions {
243 attach_stdout: Some(true),
244 attach_stderr: Some(true),
245 cmd: Some(cmd_owned),
246 ..Default::default()
247 },
248 )
249 .await?;
250
251 let exec_id = exec.id;
252
253 let start_result = self
254 .docker
255 .start_exec(&exec_id, Some(StartExecOptions::default()))
256 .await?;
257
258 let StartExecResults::Attached { output, .. } = start_result else {
259 return Err(ValidatorError::ContainerExec {
260 message: "Exec should be attached but wasn't".into(),
261 }
262 .into());
263 };
264
265 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
266 }
267
268 pub async fn exec_with_stdin(
282 &self,
283 cmd: &[&str],
284 stdin_content: &str,
285 ) -> Result<ValidationResult> {
286 use tokio::io::AsyncWriteExt;
287
288 let cmd_owned: Vec<String> = cmd.iter().map(|s| (*s).to_owned()).collect();
289
290 let exec = self
291 .docker
292 .create_exec(
293 &self.container_id,
294 CreateExecOptions {
295 attach_stdin: Some(true),
296 attach_stdout: Some(true),
297 attach_stderr: Some(true),
298 cmd: Some(cmd_owned),
299 ..Default::default()
300 },
301 )
302 .await?;
303
304 let exec_id = exec.id;
305
306 let start_result = self
307 .docker
308 .start_exec(&exec_id, Some(StartExecOptions::default()))
309 .await?;
310
311 let StartExecResults::Attached { output, mut input } = start_result else {
312 return Err(ValidatorError::ContainerExec {
313 message: "Exec should be attached but wasn't".into(),
314 }
315 .into());
316 };
317
318 input
320 .write_all(stdin_content.as_bytes())
321 .await
322 .context("Failed to write to stdin")?;
323 input.shutdown().await.context("Failed to close stdin")?;
324
325 collect_exec_output(self.docker.as_ref(), &exec_id, output).await
326 }
327
328 pub async fn start_raw(image: &str) -> Result<Self> {
341 Self::start_raw_with_mount(image, None).await
342 }
343
344 pub async fn start_raw_with_mount(
358 image: &str,
359 mount: Option<(&std::path::Path, &str)>,
360 ) -> Result<Self> {
361 use testcontainers::core::Mount;
362
363 let (name, tag) = image.rsplit_once(':').unwrap_or((image, "latest"));
364
365 let base_image = GenericImage::new(name, tag).with_cmd(["sleep", "infinity"]);
366
367 let container = if let Some((host_path, container_path)) = mount {
368 let host_str = host_path.to_string_lossy().to_string();
369 base_image
370 .with_mount(Mount::bind_mount(host_str, container_path))
371 .start()
372 .await
373 .context("Failed to start container with mount. Is Docker running?")?
374 } else {
375 base_image
376 .start()
377 .await
378 .context("Failed to start container. Is Docker running?")?
379 };
380
381 let container_id = container.id().to_owned();
382
383 let docker_client = docker_client_instance()
385 .await
386 .context("Failed to get Docker client")?;
387 let docker: Arc<dyn DockerOperations> = Arc::new(BollardDocker::new(docker_client));
388
389 Ok(Self {
390 _container: container,
391 container_id,
392 docker,
393 })
394 }
395}