use crate::metrics::{CpuMetrics, MetricsLog};
use bollard::container::{ListContainersOptions, Stats, StatsOptions};
use bollard::Docker;
use chrono::Utc;
use futures_util::stream::StreamExt;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use sysinfo::System;
use tracing::{debug, error, warn};
/// Polls Docker roughly once per second and records CPU metrics for the
/// running containers whose names match `container_names`.
///
/// The function connects to Docker once; if that fails it logs the error and
/// returns. Otherwise it loops forever, so callers are expected to cancel the
/// future externally (for example by racing it in `tokio::select!`).
///
/// On every iteration it:
/// 1. lists containers filtered by `status=running` and the given names,
/// 2. requests a single (non-streaming) stats sample per container,
/// 3. pushes the resulting [`CpuMetrics`] — or the stats error — into
///    `metrics_log`.
///
/// # Panics
///
/// Panics if the `metrics_log` mutex is poisoned.
pub async fn keep_logging(container_names: Vec<String>, metrics_log: Arc<Mutex<MetricsLog>>) {
    let docker = match Docker::connect_with_defaults() {
        Ok(docker) => {
            debug!("Successfully connected to Docker");
            docker
        }
        Err(e) => {
            error!("Failed to connect to Docker: {}", e);
            return;
        }
    };

    loop {
        let mut filter = HashMap::new();
        filter.insert(String::from("status"), vec![String::from("running")]);
        filter.insert(String::from("name"), container_names.clone());
        debug!("Listing containers with filter: {:?}", filter);

        let container_list = docker
            .list_containers(Some(ListContainersOptions {
                all: true,
                filters: filter,
                ..Default::default()
            }))
            .await;

        let containers = match container_list {
            Ok(containers) => {
                debug!(
                    "Successfully listed containers. Count: {}",
                    containers.len()
                );
                containers
            }
            Err(e) => {
                error!("Failed to list containers: {}", e);
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                continue;
            }
        };

        if containers.is_empty() {
            warn!("No running containers");
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            continue;
        }

        // NOTE(review): `sys` is refreshed here but never read in this
        // function; kept as-is so the existing side effects and the file's
        // `sysinfo` import stay untouched — confirm whether it can be removed.
        let mut sys = System::new_all();
        sys.refresh_cpu_all();
        let core_count = num_cpus::get();

        for container in containers {
            // Containers without an id cannot be queried for stats.
            let Some(container_id) = container.id.as_ref() else {
                continue;
            };

            // Listed names carry a leading '/'. Strip it only when present so
            // that the "unknown" fallback is not mangled and an empty name
            // cannot trigger an out-of-range slice panic.
            let container_name = container
                .names
                .as_deref()
                .and_then(|names| names.first())
                .map(|name| name.strip_prefix('/').unwrap_or(name.as_str()))
                .unwrap_or("unknown")
                .to_string();

            // `stream: false` asks for a single sample, so only the first
            // item of the stream is awaited.
            let docker_stats = docker
                .stats(
                    container_id,
                    Some(StatsOptions {
                        stream: false,
                        ..Default::default()
                    }),
                )
                .next()
                .await;

            match docker_stats {
                Some(Ok(stats)) => {
                    let cpu_metrics =
                        calculate_cpu_metrics(container_id, container_name, &stats, &core_count);
                    debug!(
                        "Pushing metrics to metrics log form container name/s {:?}",
                        container.names
                    );
                    metrics_log.lock().unwrap().push_metrics(cpu_metrics);
                    debug!("Logged metrics for container {}", container_id);
                }
                Some(Err(e)) => {
                    error!("Error getting stats for container {}: {}", container_id, e);
                    metrics_log.lock().unwrap().push_error(anyhow::anyhow!(
                        "Error getting stats for container {}: {}",
                        container_id,
                        e
                    ));
                }
                None => {
                    error!("No stats received for container {}", container_id);
                }
            }
        }

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}
/// Converts one Docker stats sample into a [`CpuMetrics`] record.
///
/// The usage is computed as
/// `(cpu_delta / system_delta) * 100 * online_cpus`, where the deltas are the
/// differences between the current (`cpu_stats`) and previous (`precpu_stats`)
/// counters, and is then divided once by `core_count`.
///
/// * `container_id` – stored as `process_id`.
/// * `container_name` – stored as `process_name`.
/// * `stats` – the stats sample to read the counters from.
/// * `core_count` – divisor used to normalise the usage value; a value of `0`
///   yields a usage of `0.0` instead of a NaN/infinite result.
///
/// The returned `core_count` field is the container's `online_cpus`
/// (defaulting to 1), and `timestamp` is the current UTC time in milliseconds.
fn calculate_cpu_metrics(
    container_id: &str,
    container_name: String,
    stats: &Stats,
    core_count: &usize,
) -> CpuMetrics {
    let online_cpus = stats.cpu_stats.online_cpus.unwrap_or(1);

    // Saturate instead of subtracting directly: if the previous counter is
    // larger than the current one (or one side is missing and defaults to 0)
    // a plain `-` would panic in debug builds and wrap in release builds.
    let cpu_delta = stats
        .cpu_stats
        .cpu_usage
        .total_usage
        .saturating_sub(stats.precpu_stats.cpu_usage.total_usage);
    let system_delta = stats
        .cpu_stats
        .system_cpu_usage
        .unwrap_or(0)
        .saturating_sub(stats.precpu_stats.system_cpu_usage.unwrap_or(0));

    let raw_percent = if system_delta > 0 {
        (cpu_delta as f64 / system_delta as f64) * 100.0 * online_cpus as f64
    } else {
        0.0
    };

    // Normalise by the core count exactly once. Previously the value was
    // divided a second time when building the struct, so the stored usage
    // disagreed with the value written to the debug log below.
    let cpu_usage = if *core_count > 0 {
        raw_percent / *core_count as f64
    } else {
        0.0
    };

    debug!(
        "Calculated CPU metrics for container {} ({}), cpu percentage: {}",
        container_id, container_name, cpu_usage
    );

    CpuMetrics {
        process_id: container_id.to_string(),
        process_name: container_name,
        cpu_usage,
        // Clamp rather than silently truncate if the value does not fit.
        core_count: i32::try_from(online_cpus).unwrap_or(i32::MAX),
        timestamp: Utc::now().timestamp_millis(),
    }
}
/// Returns the state string Docker reports for the container called
/// `container_name`.
///
/// All containers (running or not) are listed with a `name` filter. Because
/// that filter can also match containers whose name merely contains
/// `container_name`, a container whose name equals `container_name` exactly
/// (ignoring the leading `/` in listed names) is preferred; if there is no
/// exact match the first listed container is used.
///
/// Returns `"not_found"` when nothing matches and `"unknown"` when the chosen
/// container has no state.
///
/// # Errors
///
/// Returns an error if connecting to Docker or listing containers fails.
pub async fn get_container_status(container_name: &str) -> anyhow::Result<String> {
    let docker = Docker::connect_with_defaults().map_err(|e| {
        error!("Failed to connect to Docker: {}", e);
        anyhow::anyhow!("Failed to connect to Docker: {}", e)
    })?;
    debug!("Successfully connected to Docker");

    let mut filter = HashMap::new();
    filter.insert(String::from("name"), vec![container_name.to_string()]);
    debug!("Listing containers with filter: {:?}", filter);

    let containers = docker
        .list_containers(Some(ListContainersOptions {
            all: true,
            filters: filter,
            ..Default::default()
        }))
        .await
        .map_err(|e| {
            error!("Failed to list containers: {}", e);
            anyhow::anyhow!("Failed to list containers: {}", e)
        })?;
    debug!(
        "Successfully listed containers. Count: {}",
        containers.len()
    );

    // Prefer the container whose name is exactly the requested one, so that
    // asking for "web" does not report the state of e.g. "web-2". Fall back
    // to the first result to keep partial-name lookups working.
    let exact_match = containers.iter().find(|candidate| {
        candidate.names.as_deref().is_some_and(|names| {
            names
                .iter()
                .any(|name| name.strip_prefix('/').unwrap_or(name.as_str()) == container_name)
        })
    });

    let Some(container) = exact_match.or_else(|| containers.first()) else {
        return Ok(String::from("not_found"));
    };

    let status = container.state.as_deref().unwrap_or("unknown").to_string();
    debug!("Container '{}' status: {}", container_name, status);
    Ok(status)
}
// Unit and integration tests. The `#[tokio::test]` cases talk to a local
// Docker daemon: they build an image, start a container and remove both again.
#[cfg(test)]
mod tests {
    use crate::{
        metrics::{CpuMetrics, MetricsLog},
        metrics_logger::{
            docker::{get_container_status, keep_logging},
            StopHandle,
        },
    };
    use bollard::{
        container::{Config, CreateContainerOptions, RemoveContainerOptions},
        image::{BuildImageOptions, RemoveImageOptions},
        Docker,
    };
    use bytes::Bytes;
    use chrono::Utc;
    use core::time;
    use futures_util::StreamExt;
    use nanoid::nanoid;
    use std::{
        io::Cursor,
        sync::{Arc, Mutex},
    };
    use tar::{Builder, Header};
    use tokio::{task::JoinSet, time::sleep};
    use tokio_util::sync::CancellationToken;

    /// Pushing one metric and one error is reflected by the log's accessors.
    #[test]
    fn test_metrics_log() {
        let mut log = MetricsLog::new();
        let metrics = CpuMetrics {
            process_id: "123".to_string(),
            process_name: "test".to_string(),
            cpu_usage: 50.0,
            core_count: 4,
            timestamp: Utc::now().timestamp_millis(),
        };
        log.push_metrics(metrics);
        assert_eq!(log.get_metrics().len(), 1);
        log.push_error(anyhow::anyhow!("Error here"));
        assert!(log.has_errors());
        assert_eq!(log.get_errors().len(), 1);
    }

    /// Builds a throwaway busybox image from an in-memory tar archive, then
    /// creates and starts a container from it.
    ///
    /// Returns `(container id, container name, image name without tag)`.
    /// Both names get a random suffix.
    async fn create_and_start_container(docker: &Docker) -> (String, String, String) {
        // The raw string is kept flush-left on purpose: its exact bytes are
        // what gets written into the tar archive.
        let dockerfile = r#"
FROM busybox
CMD ["sleep", "infinity"]
"#;
        let tar_bytes = {
            let mut tar_buffer = Vec::new();
            {
                // Inner scope so the builder's borrow of `tar_buffer` ends
                // before the buffer is moved into `Bytes`.
                let mut tar_builder = Builder::new(&mut tar_buffer);
                let mut header = Header::new_gnu();
                header.set_path("Dockerfile").unwrap();
                header.set_size(dockerfile.len() as u64);
                header.set_mode(0o644);
                header.set_cksum();
                tar_builder
                    .append(&header, Cursor::new(dockerfile))
                    .unwrap();
                tar_builder.finish().unwrap();
            }
            Bytes::from(tar_buffer)
        };

        let image_id = nanoid!(10, &nanoid::alphabet::SAFE[2..]).to_lowercase();
        let image_id_latest = format!("{}:latest", image_id);
        println!("{}", image_id_latest);

        let options = BuildImageOptions {
            dockerfile: "Dockerfile",
            t: &image_id_latest,
            ..Default::default()
        };
        // Drain the build stream; any build error fails the test.
        let mut build_stream = docker.build_image(options, None, Some(tar_bytes));
        while let Some(output) = build_stream.next().await {
            output.unwrap();
        }

        let container_name = format!(
            "cardamon-test-container-{}",
            nanoid!(10, &nanoid::alphabet::SAFE[2..]).to_lowercase()
        );
        let container = docker
            .create_container(
                Some(CreateContainerOptions {
                    name: container_name.as_str(),
                    ..Default::default()
                }),
                Config {
                    image: Some(image_id_latest),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        docker
            .start_container::<String>(&container.id, None)
            .await
            .unwrap();

        (container.id, container_name, image_id)
    }

    /// A freshly started container is reported as "running".
    #[tokio::test]
    async fn test_container_status() {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let (container_id, container_name, image_id) = create_and_start_container(&docker).await;

        let status = get_container_status(&container_name).await;

        // Clean up before asserting so a failing check does not leave the
        // container and image behind on the Docker host.
        cleanup_container(&docker, &container_id, &image_id).await;

        assert_eq!(
            status.unwrap(),
            "running",
            "Container should be in 'running' state"
        );
    }

    /// Force-removes the container (including its volumes) and then the image.
    async fn cleanup_container(docker: &Docker, container_id: &str, image_id: &str) {
        docker
            .remove_container(
                container_id,
                Some(RemoveContainerOptions {
                    force: true,
                    v: true,
                    link: false,
                }),
            )
            .await
            .unwrap();
        docker
            .remove_image(
                image_id,
                Some(RemoveImageOptions {
                    force: true,
                    noprune: false,
                }),
                None,
            )
            .await
            .unwrap();
    }

    /// Running `keep_logging` for two seconds against a live container yields
    /// error-free metrics labelled with the container's name.
    #[tokio::test]
    async fn test_keep_logging() {
        let metrics_log = MetricsLog::new();
        let metrics_log_mutex = Mutex::new(metrics_log);
        let shared_metrics_log = Arc::new(metrics_log_mutex);

        let docker = Docker::connect_with_local_defaults().unwrap();
        let (container_id, container_name, image_id) = create_and_start_container(&docker).await;

        let token = CancellationToken::new();
        let mut join_set = JoinSet::new();
        let task_token = token.clone();
        let task_metrics_log = shared_metrics_log.clone();
        let task_container_name = container_name.clone();
        join_set.spawn(async move {
            println!("starting to record metrics");
            // `keep_logging` loops until cancelled, so race it against the token.
            tokio::select! {
                _ = task_token.cancelled() => {}
                _ = keep_logging(vec![task_container_name], task_metrics_log)=> {}
            }
        });
        let stop_handle = StopHandle::new(token, join_set, shared_metrics_log);

        sleep(time::Duration::new(2, 0)).await;
        let stop_result = stop_handle.stop().await;

        // Clean up before unwrapping/asserting so a failure does not leave
        // the container and image behind on the Docker host.
        cleanup_container(&docker, &container_id, &image_id).await;

        let metrics_log = stop_result.unwrap();
        assert!(!metrics_log.has_errors());
        assert!(!metrics_log.get_metrics().is_empty());
        assert_eq!(
            container_name,
            metrics_log.get_metrics().first().unwrap().process_name
        );
    }
}