lading/
captures.rs

1//! Capture and record lading's internal metrics
2//!
//! The manner in which lading instruments its target is pretty simple: we use
//! the [`metrics`] library to record facts about lading's interaction with the
//! target and then write all of that out to disk for later analysis. This means
//! that the generator, blackhole, etc. code is unaware of anything other than
//! its [`metrics`] integration, while [`CaptureManager`] need only hook into
//! that same crate.
9
10use std::{
11    borrow::Cow,
12    collections::HashMap,
13    ffi::OsStr,
14    io,
15    path::PathBuf,
16    sync::{atomic::Ordering, Arc},
17    time::{SystemTime, UNIX_EPOCH},
18};
19
20use lading_capture::json;
21use metrics_util::registry::{AtomicStorage, Registry};
22use tokio::{
23    fs::File,
24    io::{AsyncWriteExt, BufWriter},
25    time::{self, Duration},
26};
27use tracing::{debug, info};
28use uuid::Uuid;
29
30use crate::signals::Shutdown;
31
32struct Inner {
33    registry: Registry<metrics::Key, AtomicStorage>,
34}
35
36#[allow(missing_debug_implementations)]
37/// Wrangles internal metrics into capture files
38///
39/// This struct is responsible for capturing all internal metrics sent through
40/// [`metrics`] and periodically writing them to disk with format
41/// [`json::Line`].
42pub struct CaptureManager {
43    fetch_index: u64,
44    run_id: Uuid,
45    capture_fp: BufWriter<File>,
46    capture_path: PathBuf,
47    shutdown: Shutdown,
48    inner: Arc<Inner>,
49    global_labels: HashMap<String, String>,
50}
51
52impl CaptureManager {
53    /// Create a new [`CaptureManager`]
54    ///
55    /// # Panics
56    ///
57    /// Function will panic if the underlying capture file cannot be opened.
58    pub async fn new(capture_path: PathBuf, shutdown: Shutdown) -> Self {
59        let fp = File::create(&capture_path).await.unwrap();
60        Self {
61            run_id: Uuid::new_v4(),
62            fetch_index: 0,
63            capture_fp: BufWriter::new(fp),
64            capture_path,
65            shutdown,
66            inner: Arc::new(Inner {
67                registry: Registry::atomic(),
68            }),
69            global_labels: HashMap::new(),
70        }
71    }
72
73    /// Install the [`CaptureManager`] as global [`metrics::Recorder`]
74    ///
75    /// # Panics
76    ///
77    /// Function will panic if there is already a global recorder set.
78    pub fn install(&self) {
79        let recorder = CaptureRecorder {
80            inner: Arc::clone(&self.inner),
81        };
82        metrics::set_boxed_recorder(Box::new(recorder)).unwrap();
83    }
84
85    /// Add a global label to all metrics managed by [`CaptureManager`].
86    pub fn add_global_label<K, V>(&mut self, key: K, value: V)
87    where
88        K: Into<String>,
89        V: Into<String>,
90    {
91        self.global_labels.insert(key.into(), value.into());
92    }
93
94    async fn record_captures(&mut self) {
95        let now_ms: u128 = SystemTime::now()
96            .duration_since(UNIX_EPOCH)
97            .unwrap()
98            .as_millis();
99        let mut lines = Vec::new();
100        self.inner
101            .registry
102            .visit_counters(|key: &metrics::Key, counter| {
103                let mut labels = self.global_labels.clone();
104                for lbl in key.labels() {
105                    // TODO we're allocating the same small strings over and over most likely
106                    labels.insert(lbl.key().into(), lbl.value().into());
107                }
108                let line = json::Line {
109                    run_id: Cow::Borrowed(&self.run_id),
110                    time: now_ms,
111                    fetch_index: self.fetch_index,
112                    metric_name: key.name().into(),
113                    metric_kind: json::MetricKind::Counter,
114                    value: json::LineValue::Int(counter.load(Ordering::Relaxed)),
115                    labels,
116                };
117                lines.push(line);
118            });
119        self.inner
120            .registry
121            .visit_gauges(|key: &metrics::Key, gauge| {
122                let mut labels = self.global_labels.clone();
123                for lbl in key.labels() {
124                    // TODO we're allocating the same small strings over and over most likely
125                    labels.insert(lbl.key().into(), lbl.value().into());
126                }
127                let value: f64 = f64::from_bits(gauge.load(Ordering::Relaxed));
128                let line = json::Line {
129                    run_id: Cow::Borrowed(&self.run_id),
130                    time: now_ms,
131                    fetch_index: self.fetch_index,
132                    metric_name: key.name().into(),
133                    metric_kind: json::MetricKind::Gauge,
134                    value: json::LineValue::Float(value),
135                    labels,
136                };
137                lines.push(line);
138            });
139        debug!(
140            "Recording {} captures to {}",
141            lines.len(),
142            self.capture_path
143                .file_name()
144                .and_then(OsStr::to_str)
145                .unwrap()
146        );
147        for line in lines.drain(..) {
148            let pyld = serde_json::to_string(&line).unwrap();
149            self.capture_fp.write_all(pyld.as_bytes()).await.unwrap();
150            self.capture_fp.write_all(b"\n").await.unwrap();
151        }
152    }
153
154    /// Run [`CaptureManager`] to completion
155    ///
156    /// Once a second any metrics produced by this program are flushed to disk
157    /// and this process only stops once an error occurs or a shutdown signal is
158    /// received.
159    ///
160    /// # Errors
161    ///
162    /// Function will error if underlying IO writes do not succeed.
163    ///
164    /// # Panics
165    ///
166    /// None known.
167    pub async fn run(mut self) -> Result<(), io::Error> {
168        let mut write_delay = time::interval(Duration::from_secs(1));
169
170        loop {
171            tokio::select! {
172                _ = write_delay.tick() => {
173                    self.record_captures().await;
174                    self.fetch_index += 1;
175                }
176                _ = self.shutdown.recv() => {
177                    self.record_captures().await;
178                    info!("shutdown signal received");
179                    return Ok(())
180                }
181            }
182        }
183    }
184}
185
186struct CaptureRecorder {
187    inner: Arc<Inner>,
188}
189
190impl metrics::Recorder for CaptureRecorder {
191    fn describe_counter(
192        &self,
193        _key: metrics::KeyName,
194        _unit: Option<metrics::Unit>,
195        _description: metrics::SharedString,
196    ) {
197        // nothing, intentionally
198    }
199
200    fn describe_gauge(
201        &self,
202        _key: metrics::KeyName,
203        _unit: Option<metrics::Unit>,
204        _description: metrics::SharedString,
205    ) {
206        // nothing, intentionally
207    }
208
209    fn describe_histogram(
210        &self,
211        _key: metrics::KeyName,
212        _unit: Option<metrics::Unit>,
213        _description: metrics::SharedString,
214    ) {
215        // nothing, intentionally
216    }
217
218    fn register_counter(&self, key: &metrics::Key) -> metrics::Counter {
219        self.inner
220            .registry
221            .get_or_create_counter(key, |c| c.clone().into())
222    }
223
224    fn register_gauge(&self, key: &metrics::Key) -> metrics::Gauge {
225        self.inner
226            .registry
227            .get_or_create_gauge(key, |c| c.clone().into())
228    }
229
230    fn register_histogram(&self, _key: &metrics::Key) -> metrics::Histogram {
231        // nothing, intentionally
232        unimplemented!()
233    }
234}