pub mod bare_metal;
pub mod docker;
use crate::{metrics::MetricsLog, ProcessToObserve};
use itertools::Itertools;
use std::sync::{Arc, Mutex};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
/// Handle to the background logging tasks spawned by `start_logging`.
///
/// Consumed by `StopHandle::stop`, which cancels the tasks, waits for them to
/// finish and returns the collected metrics log.
pub struct StopHandle {
/// Token that `stop` cancels to tell every logging task to finish.
token: CancellationToken,
/// The spawned logging tasks; `stop` joins them until the set is empty.
join_set: JoinSet<()>,
/// Metrics log shared with the logging tasks; `stop` takes it back out of
/// the `Arc<Mutex<_>>` once the tasks have been joined.
shared_metrics_log: Arc<Mutex<MetricsLog>>,
}
impl StopHandle {
    /// Bundles the cancellation token, the set of spawned logging tasks and
    /// the shared metrics log into a single handle.
    fn new(
        token: CancellationToken,
        join_set: JoinSet<()>,
        shared_metrics_log: Arc<Mutex<MetricsLog>>,
    ) -> Self {
        Self {
            token,
            join_set,
            shared_metrics_log,
        }
    }

    /// Stops all logging tasks and returns the collected [`MetricsLog`].
    ///
    /// Cancels the shared token, waits for every task in the join set to
    /// finish, and then takes the log out of its `Arc<Mutex<_>>`.
    ///
    /// # Errors
    ///
    /// Returns an error if a logging task could not be joined (for example
    /// because it panicked), or if the collected log reports errors through
    /// `MetricsLog::has_errors`.
    ///
    /// # Panics
    ///
    /// Panics if the shared log still has another owner after all tasks have
    /// been joined, or if its mutex is poisoned. Both indicate a broken
    /// invariant rather than a recoverable condition.
    pub async fn stop(mut self) -> anyhow::Result<MetricsLog> {
        self.token.cancel();

        // Drain every task before reporting anything, so no task is left
        // running when we return. Only the first failure is kept.
        let mut first_join_error = None;
        while let Some(joined) = self.join_set.join_next().await {
            if let Err(err) = joined {
                first_join_error.get_or_insert(err);
            }
        }

        // A task that failed to join means the log is incomplete; surface
        // that instead of handing back truncated metrics as a success.
        if let Some(err) = first_join_error {
            return Err(anyhow::anyhow!(
                "Logging task failed before it could be stopped: {}",
                err
            ));
        }

        let metrics_log = Arc::try_unwrap(self.shared_metrics_log)
            .expect("Mutex guarding metrics_log shouldn't have multiple owners!")
            .into_inner()
            .expect("Should be able to take ownership of metrics_log");

        if metrics_log.has_errors() {
            return Err(anyhow::anyhow!(
                "Metrics log contains errors, please check trace"
            ));
        }

        Ok(metrics_log)
    }
}
/// Starts background metric logging for the given processes.
///
/// The processes are split into PIDs and container names. For each non-empty
/// group one task is spawned that runs the matching `keep_logging` function
/// (`bare_metal` for PIDs, `docker` for containers) until the returned
/// handle's token is cancelled. All tasks write into one shared
/// [`MetricsLog`].
///
/// If `processes_to_observe` is empty no task is spawned and the returned
/// handle simply owns an empty log.
///
/// # Errors
///
/// Currently always returns `Ok`; the `Result` is kept for interface
/// stability.
///
/// # Panics
///
/// Spawning onto the `JoinSet` requires a running Tokio runtime, so this
/// must be called from within one whenever there is something to observe.
pub fn start_logging(processes_to_observe: &[ProcessToObserve]) -> anyhow::Result<StopHandle> {
    let shared_metrics_log = Arc::new(Mutex::new(MetricsLog::new()));

    // The spawned futures must be `'static`, so they need owned data: PIDs
    // are copied and container names cloned rather than borrowed from the
    // caller's slice.
    let (pids, container_names): (Vec<_>, Vec<_>) =
        processes_to_observe
            .iter()
            .partition_map(|proc| match proc {
                ProcessToObserve::Pid(_, id) => itertools::Either::Left(*id),
                ProcessToObserve::ContainerName(name) => itertools::Either::Right(name.clone()),
            });

    let token = CancellationToken::new();
    let mut join_set = JoinSet::new();

    if !pids.is_empty() {
        let token = token.clone();
        let shared_metrics_log = Arc::clone(&shared_metrics_log);

        tracing::debug!("Spawning bare metal thread");
        join_set.spawn(async move {
            tracing::info!("Logging PIDs: {:?}", pids);
            // Whichever finishes first wins: cancellation drops the logger
            // future (and its clone of the shared log).
            tokio::select! {
                _ = token.cancelled() => {}
                _ = bare_metal::keep_logging(
                    pids,
                    shared_metrics_log,
                ) => {}
            }
        });
    }

    if !container_names.is_empty() {
        let token = token.clone();
        let shared_metrics_log = Arc::clone(&shared_metrics_log);

        join_set.spawn(async move {
            tracing::info!("Logging containers: {:?}", container_names);
            tokio::select! {
                _ = token.cancelled() => {}
                _ = docker::keep_logging(
                    container_names,
                    shared_metrics_log,
                ) => {}
            }
        });
    }

    Ok(StopHandle::new(token, join_set, shared_metrics_log))
}
/// Placeholder for live logging of the given processes into a shared log.
///
/// Not implemented yet: both parameters are unused and the body is a
/// `todo!`, so awaiting the returned future panics.
pub async fn log_live(
_processes_to_observe: Vec<ProcessToObserve>,
_metrics_log: Arc<Mutex<MetricsLog>>,
) {
todo!("implement this!")
}