pub mod bare_metal;
pub mod docker;
use crate::{metrics::MetricsLog, ProcessToObserve};
use std::sync::{Arc, Mutex};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
pub struct StopHandle {
pub token: CancellationToken,
pub join_set: JoinSet<()>,
pub shared_metrics_log: Arc<Mutex<MetricsLog>>,
}
impl StopHandle {
fn new(
token: CancellationToken,
join_set: JoinSet<()>,
shared_metrics_log: Arc<Mutex<MetricsLog>>,
) -> Self {
Self {
token,
join_set,
shared_metrics_log,
}
}
pub async fn stop(mut self) -> anyhow::Result<MetricsLog> {
self.token.cancel();
loop {
if self.join_set.join_next().await.is_none() {
break;
}
}
let metrics_log = Arc::try_unwrap(self.shared_metrics_log)
.expect("Mutex guarding metrics_log shouldn't have multiple owners!")
.into_inner()
.expect("Should be able to take ownership of metrics_log");
if metrics_log.has_errors() {
return Err(anyhow::anyhow!(
"Metrics log contains errors, please check trace"
));
}
Ok(metrics_log)
}
}
pub fn start_logging<'a>(
processes_to_observe: Vec<ProcessToObserve>,
) -> anyhow::Result<StopHandle> {
let metrics_log = MetricsLog::new();
let metrics_log_mutex = Mutex::new(metrics_log);
let shared_metrics_log = Arc::new(metrics_log_mutex);
let mut a: Vec<ProcessToObserve> = vec![];
let mut b: Vec<ProcessToObserve> = vec![];
for proc in processes_to_observe {
match proc {
p @ ProcessToObserve::ExternalPid(_) => a.push(p.clone()),
p @ ProcessToObserve::ExternalContainers(_) => b.push(p.clone()),
p @ ProcessToObserve::ManagedPid {
process_name: _,
pid: _,
down: _,
} => a.push(p.clone()),
p @ ProcessToObserve::ManagedContainers {
process_name: _,
container_names: _,
down: _,
} => b.push(p.clone()),
}
}
let token = CancellationToken::new();
let mut join_set = JoinSet::new();
if !a.is_empty() {
let token = token.clone();
let shared_metrics_log = shared_metrics_log.clone();
tracing::debug!("Spawning bare metal thread");
join_set.spawn(async move {
tracing::info!("Logging PIDs: {:?}", a);
tokio::select! {
_ = token.cancelled() => {}
_ = bare_metal::keep_logging(
a,
shared_metrics_log,
) => {}
}
});
}
if !b.is_empty() {
let token = token.clone();
let shared_metrics_log = shared_metrics_log.clone();
join_set.spawn(async move {
tracing::info!("Logging containers: {:?}", b);
tokio::select! {
_ = token.cancelled() => {}
_ = docker::keep_logging(
b,
shared_metrics_log,
) => {}
}
});
}
Ok(StopHandle::new(token, join_set, shared_metrics_log))
}