use std::{future, time::Duration};
use async_trait::async_trait;
use bollard::container::{LogOutput, LogsOptions};
use futures::StreamExt;
use crate::container::Container;
use crate::probe::Probe;
use crate::TestError;
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageSource {
StdErr,
StdOut,
Any,
}
#[derive(Debug, Clone)]
pub struct MessageProbe {
message: String,
times: usize,
timeout: Duration,
source: MessageSource,
}
#[allow(dead_code)]
impl MessageProbe {
pub fn message(&self) -> &str {
&self.message
}
pub fn times(&self) -> usize {
self.times
}
pub fn timeout(&self) -> Duration {
self.timeout
}
pub fn source(&self) -> MessageSource {
self.source
}
pub fn new(message: String) -> Self {
Self {
message,
timeout: Duration::from_secs(60),
times: 1,
source: MessageSource::StdOut,
}
}
pub fn builder<T: Into<String>>(message: T) -> MessageProbeBuilder {
MessageProbeBuilder::new(message)
}
}
#[derive(Debug, Clone)]
pub struct MessageProbeBuilder {
probe: MessageProbe,
}
impl MessageProbeBuilder {
fn new<T: Into<String>>(message: T) -> Self {
Self {
probe: MessageProbe::new(message.into()),
}
}
pub fn times(self, times: usize) -> Self {
Self {
probe: MessageProbe {
times,
..self.probe
},
}
}
pub fn source(self, source: MessageSource) -> Self {
Self {
probe: MessageProbe {
source,
..self.probe
},
}
}
pub fn timeout(self, timeout: Duration) -> Self {
Self {
probe: MessageProbe {
timeout,
..self.probe
},
}
}
pub fn build(self) -> MessageProbe {
self.probe
}
}
#[async_trait(?Send)]
impl Probe for MessageProbe {
async fn probe(&self, container: &Container) -> Result<(), TestError> {
MessageProbe::probe(self, container).await
}
}
impl MessageProbe {
async fn probe(&self, container: &Container) -> Result<(), TestError> {
let mut log_options = LogsOptions::<&str> {
follow: true,
..Default::default()
};
match self.source {
MessageSource::StdOut => log_options.stdout = true,
MessageSource::StdErr => log_options.stderr = true,
MessageSource::Any => {
log_options.stdout = true;
log_options.stderr = true;
}
};
let log_options = Some(log_options);
let logs = container.docker.logs(container.id(), log_options);
let task = async {
logs.take_while(|chunk| future::ready(!chunk.is_err()))
.filter(|chunk| {
match chunk {
Ok(chunk) => {
let content = match chunk {
LogOutput::StdErr { message } => Some(message),
LogOutput::StdOut { message } => Some(message),
LogOutput::StdIn { message: _ } => None,
LogOutput::Console { message: _ } => None,
};
match content {
Some(content)
if String::from_utf8(content.to_vec())
.unwrap()
.contains(&self.message) =>
{
futures::future::ready(true)
}
_ => futures::future::ready(false),
}
}
_ => futures::future::ready(false),
}
})
.take(self.times)
.collect::<Vec<_>>()
.await
};
match tokio::time::timeout(self.timeout, task).await {
Ok(values) => {
if values.len() == self.times {
Ok(())
} else {
let name = container.name();
let err = format!("Container `{name}` ended log before message triggered");
Err(TestError::Startup(err))
}
}
Err(_) => {
log::warn!("Awaiting container message timed out");
Err(TestError::Startup(
"Awaiting container message timed out".to_string(),
))
}
}
}
}