use std::{fmt, io::BufRead, net::IpAddr, sync::Arc};
use crate::{
core::{env, error::Result, ports::Ports, ContainerPort, ExecCommand},
ContainerAsync, Image,
};
pub(super) mod exec;
mod sync_reader;
pub struct Container<I: Image> {
inner: Option<ActiveContainer<I>>,
}
struct ActiveContainer<I: Image> {
runtime: Arc<tokio::runtime::Runtime>,
async_impl: ContainerAsync<I>,
}
impl<I> fmt::Debug for Container<I>
where
I: fmt::Debug + Image,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Container")
.field("id", &self.id())
.field("image", &self.image())
.field("ports", &self.ports())
.field("command", &self.async_impl().docker_client.config.command())
.finish()
}
}
impl<I: Image> Container<I> {
pub(crate) fn new(
runtime: Arc<tokio::runtime::Runtime>,
async_impl: ContainerAsync<I>,
) -> Self {
Self {
inner: Some(ActiveContainer {
runtime,
async_impl,
}),
}
}
}
impl<I> Container<I>
where
I: Image,
{
pub fn id(&self) -> &str {
self.async_impl().id()
}
pub fn image(&self) -> &I {
self.async_impl().image()
}
pub fn ports(&self) -> Result<Ports> {
self.rt().block_on(self.async_impl().ports())
}
pub fn get_host_port_ipv4(&self, internal_port: impl Into<ContainerPort>) -> Result<u16> {
self.rt()
.block_on(self.async_impl().get_host_port_ipv4(internal_port))
}
pub fn get_host_port_ipv6(&self, internal_port: impl Into<ContainerPort>) -> Result<u16> {
self.rt()
.block_on(self.async_impl().get_host_port_ipv6(internal_port))
}
pub fn get_bridge_ip_address(&self) -> Result<IpAddr> {
self.rt()
.block_on(self.async_impl().get_bridge_ip_address())
}
pub fn get_host(&self) -> Result<url::Host> {
self.rt().block_on(self.async_impl().get_host())
}
pub fn exec(&self, cmd: ExecCommand) -> Result<exec::SyncExecResult> {
let async_exec = self.rt().block_on(self.async_impl().exec(cmd))?;
Ok(exec::SyncExecResult {
inner: async_exec,
runtime: self.rt().clone(),
})
}
pub fn stop(&self) -> Result<()> {
self.rt().block_on(self.async_impl().stop())
}
pub fn start(&self) -> Result<()> {
self.rt().block_on(self.async_impl().start())
}
pub fn rm(mut self) -> Result<()> {
if let Some(active) = self.inner.take() {
active.runtime.block_on(active.async_impl.rm())?;
}
Ok(())
}
pub fn stdout(&self, follow: bool) -> Box<dyn BufRead + Send> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stdout(follow),
self.rt().clone(),
))
}
pub fn stderr(&self, follow: bool) -> Box<dyn BufRead + Send> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stderr(follow),
self.rt().clone(),
))
}
pub fn stdout_to_vec(&self) -> Result<Vec<u8>> {
let mut stdout = Vec::new();
self.stdout(false).read_to_end(&mut stdout)?;
Ok(stdout)
}
pub fn stderr_to_vec(&self) -> Result<Vec<u8>> {
let mut stderr = Vec::new();
self.stderr(false).read_to_end(&mut stderr)?;
Ok(stderr)
}
fn rt(&self) -> &Arc<tokio::runtime::Runtime> {
&self.inner.as_ref().unwrap().runtime
}
fn async_impl(&self) -> &ContainerAsync<I> {
&self.inner.as_ref().unwrap().async_impl
}
}
impl<I: Image> Drop for Container<I> {
fn drop(&mut self) {
if let Some(active) = self.inner.take() {
active.runtime.block_on(async {
match active.async_impl.docker_client.config.command() {
env::Command::Remove => {
if let Err(e) = active.async_impl.rm().await {
log::error!("Failed to remove container on drop: {}", e);
}
}
env::Command::Keep => {}
}
});
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{core::WaitFor, runners::SyncRunner, GenericImage};
#[derive(Debug, Default)]
pub struct HelloWorld;
impl Image for HelloWorld {
fn name(&self) -> &str {
"hello-world"
}
fn tag(&self) -> &str {
"latest"
}
fn ready_conditions(&self) -> Vec<WaitFor> {
vec![WaitFor::message_on_stdout("Hello from Docker!")]
}
}
#[test]
fn container_should_be_send_and_sync() {
assert_send_and_sync::<Container<HelloWorld>>();
}
fn assert_send_and_sync<T: Send + Sync>() {}
#[test]
fn sync_logs_are_accessible() -> anyhow::Result<()> {
let image = GenericImage::new("testcontainers/helloworld", "1.1.0");
let container = image.start()?;
let stderr = container.stderr(true);
let log_follower_thread = std::thread::spawn(move || {
let mut stderr_lines = stderr.lines();
let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next().expect("line must exist")?;
if !line.contains(expected_message) {
anyhow::bail!(
"Log message ('{}') doesn't contain expected message ('{}')",
line,
expected_message
);
}
}
Ok(())
});
log_follower_thread
.join()
.map_err(|_| anyhow::anyhow!("failed to join log follower thread"))??;
container.stop()?;
let stdout = String::from_utf8(container.stdout_to_vec()?)?;
assert_eq!(stdout, "");
let stderr = String::from_utf8(container.stderr_to_vec()?)?;
assert_eq!(
stderr.lines().count(),
6,
"unexpected stderr size: {}",
stderr
);
Ok(())
}
}