use std::{
borrow::Cow,
collections::HashMap,
ffi::OsStr,
io,
path::PathBuf,
sync::{atomic::Ordering, Arc},
time::{SystemTime, UNIX_EPOCH},
};
use lading_capture::json;
use metrics_util::registry::{AtomicStorage, Registry};
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
time::{self, Duration},
};
use tracing::{debug, info};
use uuid::Uuid;
use crate::signals::Shutdown;
struct Inner {
registry: Registry<metrics::Key, AtomicStorage>,
}
#[allow(missing_debug_implementations)]
pub struct CaptureManager {
fetch_index: u64,
run_id: Uuid,
capture_fp: BufWriter<File>,
capture_path: PathBuf,
shutdown: Shutdown,
inner: Arc<Inner>,
global_labels: HashMap<String, String>,
}
impl CaptureManager {
pub async fn new(capture_path: PathBuf, shutdown: Shutdown) -> Self {
let fp = File::create(&capture_path).await.unwrap();
Self {
run_id: Uuid::new_v4(),
fetch_index: 0,
capture_fp: BufWriter::new(fp),
capture_path,
shutdown,
inner: Arc::new(Inner {
registry: Registry::atomic(),
}),
global_labels: HashMap::new(),
}
}
pub fn install(&self) {
let recorder = CaptureRecorder {
inner: Arc::clone(&self.inner),
};
metrics::set_boxed_recorder(Box::new(recorder)).unwrap();
}
pub fn add_global_label<K, V>(&mut self, key: K, value: V)
where
K: Into<String>,
V: Into<String>,
{
self.global_labels.insert(key.into(), value.into());
}
async fn record_captures(&mut self) {
let now_ms: u128 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let mut lines = Vec::new();
self.inner
.registry
.visit_counters(|key: &metrics::Key, counter| {
let mut labels = self.global_labels.clone();
for lbl in key.labels() {
labels.insert(lbl.key().into(), lbl.value().into());
}
let line = json::Line {
run_id: Cow::Borrowed(&self.run_id),
time: now_ms,
fetch_index: self.fetch_index,
metric_name: key.name().into(),
metric_kind: json::MetricKind::Counter,
value: json::LineValue::Int(counter.load(Ordering::Relaxed)),
labels,
};
lines.push(line);
});
self.inner
.registry
.visit_gauges(|key: &metrics::Key, gauge| {
let mut labels = self.global_labels.clone();
for lbl in key.labels() {
labels.insert(lbl.key().into(), lbl.value().into());
}
let value: f64 = f64::from_bits(gauge.load(Ordering::Relaxed));
let line = json::Line {
run_id: Cow::Borrowed(&self.run_id),
time: now_ms,
fetch_index: self.fetch_index,
metric_name: key.name().into(),
metric_kind: json::MetricKind::Gauge,
value: json::LineValue::Float(value),
labels,
};
lines.push(line);
});
debug!(
"Recording {} captures to {}",
lines.len(),
self.capture_path
.file_name()
.and_then(OsStr::to_str)
.unwrap()
);
for line in lines.drain(..) {
let pyld = serde_json::to_string(&line).unwrap();
self.capture_fp.write_all(pyld.as_bytes()).await.unwrap();
self.capture_fp.write_all(b"\n").await.unwrap();
}
}
pub async fn run(mut self) -> Result<(), io::Error> {
let mut write_delay = time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = write_delay.tick() => {
self.record_captures().await;
self.fetch_index += 1;
}
_ = self.shutdown.recv() => {
self.record_captures().await;
info!("shutdown signal received");
return Ok(())
}
}
}
}
}
struct CaptureRecorder {
inner: Arc<Inner>,
}
impl metrics::Recorder for CaptureRecorder {
fn describe_counter(
&self,
_key: metrics::KeyName,
_unit: Option<metrics::Unit>,
_description: metrics::SharedString,
) {
}
fn describe_gauge(
&self,
_key: metrics::KeyName,
_unit: Option<metrics::Unit>,
_description: metrics::SharedString,
) {
}
fn describe_histogram(
&self,
_key: metrics::KeyName,
_unit: Option<metrics::Unit>,
_description: metrics::SharedString,
) {
}
fn register_counter(&self, key: &metrics::Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &metrics::Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, _key: &metrics::Key) -> metrics::Histogram {
unimplemented!()
}
}