1use std::{
11 borrow::Cow,
12 collections::HashMap,
13 ffi::OsStr,
14 io,
15 path::PathBuf,
16 sync::{atomic::Ordering, Arc},
17 time::{SystemTime, UNIX_EPOCH},
18};
19
20use lading_capture::json;
21use metrics_util::registry::{AtomicStorage, Registry};
22use tokio::{
23 fs::File,
24 io::{AsyncWriteExt, BufWriter},
25 time::{self, Duration},
26};
27use tracing::{debug, info};
28use uuid::Uuid;
29
30use crate::signals::Shutdown;
31
32struct Inner {
33 registry: Registry<metrics::Key, AtomicStorage>,
34}
35
36#[allow(missing_debug_implementations)]
37pub struct CaptureManager {
43 fetch_index: u64,
44 run_id: Uuid,
45 capture_fp: BufWriter<File>,
46 capture_path: PathBuf,
47 shutdown: Shutdown,
48 inner: Arc<Inner>,
49 global_labels: HashMap<String, String>,
50}
51
52impl CaptureManager {
53 pub async fn new(capture_path: PathBuf, shutdown: Shutdown) -> Self {
59 let fp = File::create(&capture_path).await.unwrap();
60 Self {
61 run_id: Uuid::new_v4(),
62 fetch_index: 0,
63 capture_fp: BufWriter::new(fp),
64 capture_path,
65 shutdown,
66 inner: Arc::new(Inner {
67 registry: Registry::atomic(),
68 }),
69 global_labels: HashMap::new(),
70 }
71 }
72
73 pub fn install(&self) {
79 let recorder = CaptureRecorder {
80 inner: Arc::clone(&self.inner),
81 };
82 metrics::set_boxed_recorder(Box::new(recorder)).unwrap();
83 }
84
85 pub fn add_global_label<K, V>(&mut self, key: K, value: V)
87 where
88 K: Into<String>,
89 V: Into<String>,
90 {
91 self.global_labels.insert(key.into(), value.into());
92 }
93
94 async fn record_captures(&mut self) {
95 let now_ms: u128 = SystemTime::now()
96 .duration_since(UNIX_EPOCH)
97 .unwrap()
98 .as_millis();
99 let mut lines = Vec::new();
100 self.inner
101 .registry
102 .visit_counters(|key: &metrics::Key, counter| {
103 let mut labels = self.global_labels.clone();
104 for lbl in key.labels() {
105 labels.insert(lbl.key().into(), lbl.value().into());
107 }
108 let line = json::Line {
109 run_id: Cow::Borrowed(&self.run_id),
110 time: now_ms,
111 fetch_index: self.fetch_index,
112 metric_name: key.name().into(),
113 metric_kind: json::MetricKind::Counter,
114 value: json::LineValue::Int(counter.load(Ordering::Relaxed)),
115 labels,
116 };
117 lines.push(line);
118 });
119 self.inner
120 .registry
121 .visit_gauges(|key: &metrics::Key, gauge| {
122 let mut labels = self.global_labels.clone();
123 for lbl in key.labels() {
124 labels.insert(lbl.key().into(), lbl.value().into());
126 }
127 let value: f64 = f64::from_bits(gauge.load(Ordering::Relaxed));
128 let line = json::Line {
129 run_id: Cow::Borrowed(&self.run_id),
130 time: now_ms,
131 fetch_index: self.fetch_index,
132 metric_name: key.name().into(),
133 metric_kind: json::MetricKind::Gauge,
134 value: json::LineValue::Float(value),
135 labels,
136 };
137 lines.push(line);
138 });
139 debug!(
140 "Recording {} captures to {}",
141 lines.len(),
142 self.capture_path
143 .file_name()
144 .and_then(OsStr::to_str)
145 .unwrap()
146 );
147 for line in lines.drain(..) {
148 let pyld = serde_json::to_string(&line).unwrap();
149 self.capture_fp.write_all(pyld.as_bytes()).await.unwrap();
150 self.capture_fp.write_all(b"\n").await.unwrap();
151 }
152 }
153
154 pub async fn run(mut self) -> Result<(), io::Error> {
168 let mut write_delay = time::interval(Duration::from_secs(1));
169
170 loop {
171 tokio::select! {
172 _ = write_delay.tick() => {
173 self.record_captures().await;
174 self.fetch_index += 1;
175 }
176 _ = self.shutdown.recv() => {
177 self.record_captures().await;
178 info!("shutdown signal received");
179 return Ok(())
180 }
181 }
182 }
183 }
184}
185
186struct CaptureRecorder {
187 inner: Arc<Inner>,
188}
189
190impl metrics::Recorder for CaptureRecorder {
191 fn describe_counter(
192 &self,
193 _key: metrics::KeyName,
194 _unit: Option<metrics::Unit>,
195 _description: metrics::SharedString,
196 ) {
197 }
199
200 fn describe_gauge(
201 &self,
202 _key: metrics::KeyName,
203 _unit: Option<metrics::Unit>,
204 _description: metrics::SharedString,
205 ) {
206 }
208
209 fn describe_histogram(
210 &self,
211 _key: metrics::KeyName,
212 _unit: Option<metrics::Unit>,
213 _description: metrics::SharedString,
214 ) {
215 }
217
218 fn register_counter(&self, key: &metrics::Key) -> metrics::Counter {
219 self.inner
220 .registry
221 .get_or_create_counter(key, |c| c.clone().into())
222 }
223
224 fn register_gauge(&self, key: &metrics::Key) -> metrics::Gauge {
225 self.inner
226 .registry
227 .get_or_create_gauge(key, |c| c.clone().into())
228 }
229
230 fn register_histogram(&self, _key: &metrics::Key) -> metrics::Histogram {
231 unimplemented!()
233 }
234}