solana_metrics/
metrics.rs

1//! The `metrics` module enables sending measurements to an `InfluxDB` instance
2
3use {
4    crate::{counter::CounterPoint, datapoint::DataPoint},
5    crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError},
6    gethostname::gethostname,
7    log::*,
8    solana_cluster_type::ClusterType,
9    solana_sha256_hasher::hash,
10    std::{
11        cmp,
12        collections::HashMap,
13        convert::Into,
14        env,
15        fmt::Write,
16        sync::{Arc, Barrier, Mutex, Once, RwLock},
17        thread,
18        time::{Duration, Instant, UNIX_EPOCH},
19    },
20    thiserror::Error,
21};
22
/// Counters waiting to be written, keyed by `(counter name, bucket)`.  The worker thread sums
/// the `count` of every counter submitted under the same key.
type CounterMap = HashMap<(&'static str, u64), CounterPoint>;
24
/// Errors produced while reading the metrics configuration or talking to the metrics server.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Reading an environment variable failed.
    #[error(transparent)]
    VarError(#[from] env::VarError),
    /// An HTTP request issued through `reqwest` failed.
    #[error(transparent)]
    ReqwestError(#[from] reqwest::Error),
    /// `SOLANA_METRICS_CONFIG` contains an entry that could not be understood; the payload is
    /// the offending entry.
    #[error("SOLANA_METRICS_CONFIG is invalid: '{0}'")]
    ConfigInvalid(String),
    /// `SOLANA_METRICS_CONFIG` was parsed but at least one of `host`, `db`, `u` or `p` is empty.
    #[error("SOLANA_METRICS_CONFIG is incomplete")]
    ConfigIncomplete,
    /// The configured database does not belong to the cluster type being checked.
    #[error("SOLANA_METRICS_CONFIG database mismatch: {0}")]
    DbMismatch(String),
}
38
39impl From<MetricsError> for String {
40    fn from(error: MetricsError) -> Self {
41        error.to_string()
42    }
43}
44
45impl From<&CounterPoint> for DataPoint {
46    fn from(counter_point: &CounterPoint) -> Self {
47        let mut point = Self::new(counter_point.name);
48        point.timestamp = counter_point.timestamp;
49        point.add_field_i64("count", counter_point.count);
50        point
51    }
52}
53
/// Messages sent from a `MetricsAgent` handle to its worker thread.
#[derive(Debug)]
enum MetricsCommand {
    /// Write everything collected so far, then wait on the barrier so the sender can resume.
    Flush(Arc<Barrier>),
    /// Queue a data point; the level is the log level at which the point is logged.
    Submit(DataPoint, log::Level),
    /// Add a counter to the pending counters under the given bucket (the trailing `u64`).
    SubmitCounter(CounterPoint, log::Level, u64),
}
60
/// Handle to a background worker thread that collects submitted points and counters and hands
/// them to a `MetricsWriter` in batches.  Dropping the handle flushes it.
pub struct MetricsAgent {
    /// Command channel to the worker thread.
    sender: Sender<MetricsCommand>,
}
64
/// Destination for the batches of data points assembled by a `MetricsAgent`.
pub trait MetricsWriter {
    /// Writes one batch of points, taking ownership of it.  Called on the internal
    /// `MetricsAgent` worker thread.
    fn write(&self, points: Vec<DataPoint>);
}
70
/// `MetricsWriter` that POSTs serialized points to the URL derived from `SOLANA_METRICS_CONFIG`.
struct InfluxDbMetricsWriter {
    /// Fully built write endpoint, or `None` when no usable configuration was found, in which
    /// case every write is a no-op.
    write_url: Option<String>,
}
74
75impl InfluxDbMetricsWriter {
76    fn new() -> Self {
77        Self {
78            write_url: Self::build_write_url().ok(),
79        }
80    }
81
82    fn build_write_url() -> Result<String, MetricsError> {
83        let config = get_metrics_config().map_err(|err| {
84            info!("metrics disabled: {err}");
85            err
86        })?;
87
88        info!(
89            "metrics configuration: host={} db={} username={}",
90            config.host, config.db, config.username
91        );
92
93        let write_url = format!(
94            "{}/write?db={}&u={}&p={}&precision=n",
95            &config.host, &config.db, &config.username, &config.password
96        );
97
98        Ok(write_url)
99    }
100}
101
102pub fn serialize_points(points: &Vec<DataPoint>, host_id: &str) -> String {
103    const TIMESTAMP_LEN: usize = 20;
104    const HOST_ID_LEN: usize = 8; // "host_id=".len()
105    const EXTRA_LEN: usize = 2; // "=,".len()
106    let mut len = 0;
107    for point in points {
108        for (name, value) in &point.fields {
109            len += name.len() + value.len() + EXTRA_LEN;
110        }
111        for (name, value) in &point.tags {
112            len += name.len() + value.len() + EXTRA_LEN;
113        }
114        len += point.name.len();
115        len += TIMESTAMP_LEN;
116        len += host_id.len() + HOST_ID_LEN;
117    }
118    let mut line = String::with_capacity(len);
119    for point in points {
120        let _ = write!(line, "{},host_id={}", &point.name, host_id);
121        for (name, value) in point.tags.iter() {
122            let _ = write!(line, ",{name}={value}");
123        }
124
125        let mut first = true;
126        for (name, value) in point.fields.iter() {
127            let _ = write!(line, "{}{}={}", if first { ' ' } else { ',' }, name, value);
128            first = false;
129        }
130        let timestamp = point.timestamp.duration_since(UNIX_EPOCH);
131        let nanos = timestamp.unwrap().as_nanos();
132        let _ = writeln!(line, " {nanos}");
133    }
134    line
135}
136
137impl MetricsWriter for InfluxDbMetricsWriter {
138    fn write(&self, points: Vec<DataPoint>) {
139        if let Some(ref write_url) = self.write_url {
140            debug!("submitting {} points", points.len());
141
142            let host_id = HOST_ID.read().unwrap();
143
144            let line = serialize_points(&points, &host_id);
145
146            let client = reqwest::blocking::Client::builder()
147                .timeout(Duration::from_secs(5))
148                .build();
149            let client = match client {
150                Ok(client) => client,
151                Err(err) => {
152                    warn!("client instantiation failed: {err}");
153                    return;
154                }
155            };
156
157            let response = client.post(write_url.as_str()).body(line).send();
158            if let Ok(resp) = response {
159                let status = resp.status();
160                if !status.is_success() {
161                    let text = resp
162                        .text()
163                        .unwrap_or_else(|_| "[text body empty]".to_string());
164                    warn!("submit response unsuccessful: {status} {text}",);
165                }
166            } else {
167                warn!("submit error: {}", response.unwrap_err());
168            }
169        }
170    }
171}
172
173impl Default for MetricsAgent {
174    fn default() -> Self {
175        let max_points_per_sec = env::var("SOLANA_METRICS_MAX_POINTS_PER_SECOND")
176            .map(|x| {
177                x.parse()
178                    .expect("Failed to parse SOLANA_METRICS_MAX_POINTS_PER_SECOND")
179            })
180            .unwrap_or(4000);
181
182        Self::new(
183            Arc::new(InfluxDbMetricsWriter::new()),
184            Duration::from_secs(10),
185            max_points_per_sec,
186        )
187    }
188}
189
190impl MetricsAgent {
191    pub fn new(
192        writer: Arc<dyn MetricsWriter + Send + Sync>,
193        write_frequency: Duration,
194        max_points_per_sec: usize,
195    ) -> Self {
196        let (sender, receiver) = unbounded::<MetricsCommand>();
197
198        thread::Builder::new()
199            .name("solMetricsAgent".into())
200            .spawn(move || Self::run(&receiver, &writer, write_frequency, max_points_per_sec))
201            .unwrap();
202
203        Self { sender }
204    }
205
206    // Combines `points` and `counters` into a single array of `DataPoint`s, appending a data point
207    // with the metrics stats at the end.
208    //
209    // Limits the number of produced points to the `max_points` value.  Takes `points` followed by
210    // `counters`, dropping `counters` first.
211    //
212    // `max_points_per_sec` is only used in a warning message.
213    // `points_buffered` is used in the stats.
214    fn combine_points(
215        max_points: usize,
216        max_points_per_sec: usize,
217        secs_since_last_write: u64,
218        points_buffered: usize,
219        points: &mut Vec<DataPoint>,
220        counters: &mut CounterMap,
221    ) -> Vec<DataPoint> {
222        // Reserve one slot for the stats point we will add at the end.
223        let max_points = max_points.saturating_sub(1);
224
225        let num_points = points.len().saturating_add(counters.len());
226        let fit_counters = max_points.saturating_sub(points.len());
227        let points_written = cmp::min(num_points, max_points);
228
229        debug!("run: attempting to write {num_points} points");
230
231        if num_points > max_points {
232            warn!(
233                "Max submission rate of {max_points_per_sec} datapoints per second exceeded. Only \
234                 the first {max_points} of {num_points} points will be submitted."
235            );
236        }
237
238        let mut combined = std::mem::take(points);
239        combined.truncate(points_written);
240
241        combined.extend(counters.values().take(fit_counters).map(|v| v.into()));
242        counters.clear();
243
244        combined.push(
245            DataPoint::new("metrics")
246                .add_field_i64("points_written", points_written as i64)
247                .add_field_i64("num_points", num_points as i64)
248                .add_field_i64("points_lost", (num_points - points_written) as i64)
249                .add_field_i64("points_buffered", points_buffered as i64)
250                .add_field_i64("secs_since_last_write", secs_since_last_write as i64)
251                .to_owned(),
252        );
253
254        combined
255    }
256
257    // Consumes provided `points`, sending up to `max_points` of them into the `writer`.
258    //
259    // Returns an updated value for `last_write_time`.  Which is equal to `Instant::now()`, just
260    // before `write` in updated.
261    fn write(
262        writer: &Arc<dyn MetricsWriter + Send + Sync>,
263        max_points: usize,
264        max_points_per_sec: usize,
265        last_write_time: Instant,
266        points_buffered: usize,
267        points: &mut Vec<DataPoint>,
268        counters: &mut CounterMap,
269    ) -> Instant {
270        let now = Instant::now();
271        let secs_since_last_write = now.duration_since(last_write_time).as_secs();
272
273        writer.write(Self::combine_points(
274            max_points,
275            max_points_per_sec,
276            secs_since_last_write,
277            points_buffered,
278            points,
279            counters,
280        ));
281
282        now
283    }
284
285    fn run(
286        receiver: &Receiver<MetricsCommand>,
287        writer: &Arc<dyn MetricsWriter + Send + Sync>,
288        write_frequency: Duration,
289        max_points_per_sec: usize,
290    ) {
291        trace!("run: enter");
292        let mut last_write_time = Instant::now();
293        let mut points = Vec::<DataPoint>::new();
294        let mut counters = CounterMap::new();
295
296        let max_points = write_frequency.as_secs() as usize * max_points_per_sec;
297
298        // Bind common arguments in the `Self::write()` call.
299        let write = |last_write_time: Instant,
300                     points: &mut Vec<DataPoint>,
301                     counters: &mut CounterMap|
302         -> Instant {
303            Self::write(
304                writer,
305                max_points,
306                max_points_per_sec,
307                last_write_time,
308                receiver.len(),
309                points,
310                counters,
311            )
312        };
313
314        loop {
315            match receiver.try_recv() {
316                Ok(cmd) => match cmd {
317                    MetricsCommand::Flush(barrier) => {
318                        debug!("metrics_thread: flush");
319                        last_write_time = write(last_write_time, &mut points, &mut counters);
320                        barrier.wait();
321                    }
322                    MetricsCommand::Submit(point, level) => {
323                        log!(level, "{point}");
324                        points.push(point);
325                    }
326                    MetricsCommand::SubmitCounter(counter, _level, bucket) => {
327                        debug!("{counter:?}");
328                        let key = (counter.name, bucket);
329                        if let Some(value) = counters.get_mut(&key) {
330                            value.count += counter.count;
331                        } else {
332                            counters.insert(key, counter);
333                        }
334                    }
335                },
336                Err(TryRecvError::Empty) => {
337                    std::thread::sleep(Duration::from_millis(5));
338                }
339                Err(TryRecvError::Disconnected) => {
340                    debug!("run: sender disconnected");
341                    break;
342                }
343            };
344
345            let now = Instant::now();
346            if now.duration_since(last_write_time) >= write_frequency {
347                last_write_time = write(last_write_time, &mut points, &mut counters);
348            }
349        }
350
351        debug_assert!(
352            points.is_empty() && counters.is_empty(),
353            "Controlling `MetricsAgent` is expected to call `flush()` from the `Drop` \
354             implementation, before exiting. So both `points` and `counters` must be empty at \
355             this point. `points`: {points:?}, `counters`: {counters:?}",
356        );
357
358        trace!("run: exit");
359    }
360
361    pub fn submit(&self, point: DataPoint, level: log::Level) {
362        self.sender
363            .send(MetricsCommand::Submit(point, level))
364            .unwrap();
365    }
366
367    pub fn submit_counter(&self, counter: CounterPoint, level: log::Level, bucket: u64) {
368        self.sender
369            .send(MetricsCommand::SubmitCounter(counter, level, bucket))
370            .unwrap();
371    }
372
373    pub fn flush(&self) {
374        debug!("Flush");
375        let barrier = Arc::new(Barrier::new(2));
376        self.sender
377            .send(MetricsCommand::Flush(Arc::clone(&barrier)))
378            .unwrap();
379
380        barrier.wait();
381    }
382}
383
384impl Drop for MetricsAgent {
385    fn drop(&mut self) {
386        self.flush();
387    }
388}
389
390fn get_singleton_agent() -> &'static MetricsAgent {
391    static AGENT: std::sync::LazyLock<MetricsAgent> =
392        std::sync::LazyLock::new(MetricsAgent::default);
393    &AGENT
394}
395
396static HOST_ID: std::sync::LazyLock<RwLock<String>> = std::sync::LazyLock::new(|| {
397    RwLock::new({
398        let hostname: String = gethostname()
399            .into_string()
400            .unwrap_or_else(|_| "".to_string());
401        format!("{}", hash(hostname.as_bytes()))
402    })
403});
404
405pub fn set_host_id(host_id: String) {
406    info!("host id: {host_id}");
407    *HOST_ID.write().unwrap() = host_id;
408}
409
410pub fn get_host_id() -> String {
411    HOST_ID.read().unwrap().clone()
412}
413
414/// Submits a new point from any thread.  Note that points are internally queued
415/// and transmitted periodically in batches.
416pub fn submit(point: DataPoint, level: log::Level) {
417    let agent = get_singleton_agent();
418    agent.submit(point, level);
419}
420
421/// Submits a new counter or updates an existing counter from any thread.  Note that points are
422/// internally queued and transmitted periodically in batches.
423pub(crate) fn submit_counter(point: CounterPoint, level: log::Level, bucket: u64) {
424    let agent = get_singleton_agent();
425    agent.submit_counter(point, level, bucket);
426}
427
/// Connection settings parsed from the `SOLANA_METRICS_CONFIG` environment variable.
#[derive(Debug, Default)]
struct MetricsConfig {
    /// Value of the `host` key; used as the prefix of the write and query URLs.
    pub host: String,
    /// Value of the `db` key.
    pub db: String,
    /// Value of the `u` key.
    pub username: String,
    /// Value of the `p` key.
    pub password: String,
}
435
436impl MetricsConfig {
437    fn complete(&self) -> bool {
438        !(self.host.is_empty()
439            || self.db.is_empty()
440            || self.username.is_empty()
441            || self.password.is_empty())
442    }
443}
444
445fn get_metrics_config() -> Result<MetricsConfig, MetricsError> {
446    let mut config = MetricsConfig::default();
447    let config_var = env::var("SOLANA_METRICS_CONFIG")?;
448    if config_var.is_empty() {
449        Err(env::VarError::NotPresent)?;
450    }
451
452    for pair in config_var.split(',') {
453        let nv: Vec<_> = pair.split('=').collect();
454        if nv.len() != 2 {
455            return Err(MetricsError::ConfigInvalid(pair.to_string()));
456        }
457        let v = nv[1].to_string();
458        match nv[0] {
459            "host" => config.host = v,
460            "db" => config.db = v,
461            "u" => config.username = v,
462            "p" => config.password = v,
463            _ => return Err(MetricsError::ConfigInvalid(pair.to_string())),
464        }
465    }
466
467    if !config.complete() {
468        return Err(MetricsError::ConfigIncomplete);
469    }
470
471    Ok(config)
472}
473
474pub fn metrics_config_sanity_check(cluster_type: ClusterType) -> Result<(), MetricsError> {
475    let config = match get_metrics_config() {
476        Ok(config) => config,
477        Err(MetricsError::VarError(env::VarError::NotPresent)) => return Ok(()),
478        Err(e) => return Err(e),
479    };
480    match &config.db[..] {
481        "mainnet-beta" if cluster_type != ClusterType::MainnetBeta => (),
482        "tds" if cluster_type != ClusterType::Testnet => (),
483        "devnet" if cluster_type != ClusterType::Devnet => (),
484        _ => return Ok(()),
485    };
486    let (host, db) = (&config.host, &config.db);
487    let msg = format!("cluster_type={cluster_type:?} host={host} database={db}");
488    Err(MetricsError::DbMismatch(msg))
489}
490
491pub fn query(q: &str) -> Result<String, MetricsError> {
492    let config = get_metrics_config()?;
493    let query_url = format!(
494        "{}/query?u={}&p={}&q={}",
495        &config.host, &config.username, &config.password, &q
496    );
497
498    let response = reqwest::blocking::get(query_url.as_str())?.text()?;
499
500    Ok(response)
501}
502
503/// Blocks until all pending points from previous calls to `submit` have been
504/// transmitted.
505pub fn flush() {
506    let agent = get_singleton_agent();
507    agent.flush();
508}
509
510/// Hook the panic handler to generate a data point on each panic
511pub fn set_panic_hook(program: &'static str, version: Option<String>) {
512    static SET_HOOK: Once = Once::new();
513    SET_HOOK.call_once(|| {
514        let default_hook = std::panic::take_hook();
515        std::panic::set_hook(Box::new(move |ono| {
516            default_hook(ono);
517            let location = match ono.location() {
518                Some(location) => location.to_string(),
519                None => "?".to_string(),
520            };
521            submit(
522                DataPoint::new("panic")
523                    .add_field_str("program", program)
524                    .add_field_str("thread", thread::current().name().unwrap_or("?"))
525                    // The 'one' field exists to give Kapacitor Alerts a numerical value
526                    // to filter on
527                    .add_field_i64("one", 1)
528                    .add_field_str("message", &ono.to_string())
529                    .add_field_str("location", &location)
530                    .add_field_str("version", version.as_ref().unwrap_or(&"".to_string()))
531                    .to_owned(),
532                Level::Error,
533            );
534            // Flush metrics immediately
535            flush();
536
537            // Exit cleanly so the process don't limp along in a half-dead state
538            std::process::exit(1);
539        }));
540    });
541}
542
543pub mod test_mocks {
544    use super::*;
545
546    pub struct MockMetricsWriter {
547        pub points_written: Arc<Mutex<Vec<DataPoint>>>,
548    }
549    impl MockMetricsWriter {
550        pub fn new() -> Self {
551            MockMetricsWriter {
552                points_written: Arc::new(Mutex::new(Vec::new())),
553            }
554        }
555
556        pub fn points_written(&self) -> usize {
557            self.points_written.lock().unwrap().len()
558        }
559    }
560
561    impl Default for MockMetricsWriter {
562        fn default() -> Self {
563            Self::new()
564        }
565    }
566
567    impl MetricsWriter for MockMetricsWriter {
568        fn write(&self, points: Vec<DataPoint>) {
569            assert!(!points.is_empty());
570
571            let new_points = points.len();
572            self.points_written.lock().unwrap().extend(points);
573
574            info!(
575                "Writing {} points ({} total)",
576                new_points,
577                self.points_written(),
578            );
579        }
580    }
581}
582
#[cfg(test)]
mod test {
    use {super::*, test_mocks::MockMetricsWriter};

    /// Spawns an agent writing into `writer` with a limit of 1000 points per second.
    fn spawn_agent(writer: &Arc<MockMetricsWriter>, write_frequency_secs: u64) -> MetricsAgent {
        MetricsAgent::new(
            writer.clone(),
            Duration::from_secs(write_frequency_secs),
            1000,
        )
    }

    /// Builds a `measurement` point carrying a single integer field `i`.
    fn measurement(i: i64) -> DataPoint {
        let mut point = DataPoint::new("measurement");
        point.add_field_i64("i", i);
        point
    }

    // Every flush appends one `metrics` stats point, hence the "+ 1" in the expectations below.

    #[test]
    fn test_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = spawn_agent(&writer, 10);

        (0..42).for_each(|i| agent.submit(measurement(i), Level::Info));

        agent.flush();
        assert_eq!(writer.points_written(), 42 + 1);
    }

    #[test]
    fn test_submit_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = spawn_agent(&writer, 10);

        // Ten distinct buckets for each of the two counters: nothing gets merged.
        for bucket in 0..10 {
            for name in ["counter 1", "counter 2"] {
                agent.submit_counter(CounterPoint::new(name), Level::Info, bucket);
            }
        }

        agent.flush();
        assert_eq!(writer.points_written(), 20 + 1);
    }

    #[test]
    fn test_submit_counter_increment() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = spawn_agent(&writer, 10);

        // Same name and same bucket every time, so all ten submissions are summed.
        let same_bucket = 0;
        for _ in 0..10 {
            let counter = CounterPoint {
                name: "counter",
                count: 10,
                timestamp: UNIX_EPOCH,
            };
            agent.submit_counter(counter, Level::Info, same_bucket);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 1 + 1);

        let first_point = writer.points_written.lock().unwrap()[0].clone();
        assert_eq!(first_point.fields[0], ("count", "100i".to_string()));
    }

    #[test]
    fn test_submit_bucketed_counter() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = spawn_agent(&writer, 10);

        // Fifty submissions per counter spread over five buckets.
        for i in 0..50 {
            let bucket = i / 10;
            agent.submit_counter(CounterPoint::new("counter 1"), Level::Info, bucket);
            agent.submit_counter(CounterPoint::new("counter 2"), Level::Info, bucket);
        }

        agent.flush();
        assert_eq!(writer.points_written(), 2 * 5 + 1);
    }

    #[test]
    fn test_submit_with_delay() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = spawn_agent(&writer, 1);

        agent.submit(DataPoint::new("point 1"), Level::Info);

        // No explicit flush: the periodic write has to pick the point up.
        thread::sleep(Duration::from_secs(2));
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_submit_exceed_max_rate() {
        let max_points_per_sec = 100;
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = MetricsAgent::new(writer.clone(), Duration::from_secs(1), max_points_per_sec);

        let submitted = max_points_per_sec + 20;
        for i in 0..submitted {
            agent.submit(measurement(i.try_into().unwrap()), Level::Info);
        }

        agent.flush();

        // `max_points_per_sec - 1` of the submitted points fit, and the stats point takes the
        // remaining slot.
        assert_eq!(writer.points_written(), max_points_per_sec);
    }

    #[test]
    fn test_multithread_submit() {
        let writer = Arc::new(MockMetricsWriter::new());
        let agent = Arc::new(Mutex::new(spawn_agent(&writer, 10)));

        // Each point is submitted from its own thread.
        let submitters: Vec<_> = (0..42)
            .map(|i| {
                let point = measurement(i);
                let agent = Arc::clone(&agent);
                thread::spawn(move || {
                    agent.lock().unwrap().submit(point, Level::Info);
                })
            })
            .collect();

        for submitter in submitters {
            submitter.join().unwrap();
        }

        agent.lock().unwrap().flush();
        assert_eq!(writer.points_written(), 42 + 1);
    }

    #[test]
    fn test_flush_before_drop() {
        let writer = Arc::new(MockMetricsWriter::new());
        {
            // The write frequency is long enough that only the drop can trigger a write.
            let agent = spawn_agent(&writer, 9_999_999);
            agent.submit(DataPoint::new("point 1"), Level::Info);
        }

        // Expected points:
        // 1. `point 1` submitted above.
        // 2. The `metrics` stats point produced by the `Flush` that dropping `agent` sends.
        assert_eq!(writer.points_written(), 2);
    }

    #[test]
    fn test_live_submit() {
        let agent = MetricsAgent::default();

        let mut point = DataPoint::new("live_submit_test");
        point
            .add_field_bool("true", true)
            .add_field_bool("random_bool", rand::random::<u8>() < 128)
            .add_field_i64("random_int", rand::random::<u8>() as i64);
        agent.submit(point, Level::Info);
    }

    #[test]
    fn test_host_id() {
        let expected = "test_host_123".to_string();
        set_host_id(expected.clone());
        assert_eq!(get_host_id(), expected);
    }
}