zksync_node_jemalloc/
lib.rs

1//! Jemalloc tooling used by ZKsync node.
2
3use std::time::Duration;
4
5use anyhow::Context;
6use serde::Serialize;
7use tikv_jemalloc_ctl::stats_print;
8use tokio::sync::watch;
9use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
10
11pub use self::node::JemallocMonitorLayer;
12use crate::{
13    json::{GeneralStats, JemallocStats},
14    metrics::METRICS,
15};
16
17mod json;
18mod metrics;
19mod node;
20
21#[derive(Debug, Serialize)]
22struct JemallocHealthDetails {
23    version: &'static str,
24    compile_time_config: &'static str,
25    runtime_config: serde_json::Value,
26    stats: GeneralStats,
27}
28
29/// Monitors general Jemalloc statistics and reports them as a health check, logs and metrics.
30#[derive(Debug)]
31pub(crate) struct JemallocMonitor {
32    version: &'static str,
33    compile_time_config: &'static str,
34    runtime_config: Option<serde_json::Value>,
35    health: HealthUpdater,
36    update_interval: Duration,
37}
38
39impl JemallocMonitor {
40    const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
41
42    pub(crate) fn new() -> anyhow::Result<Self> {
43        let version =
44            tikv_jemalloc_ctl::version::read().context("failed reading Jemalloc version")?;
45        let version = version.strip_suffix('\0').unwrap_or(version);
46        let config = tikv_jemalloc_ctl::config::malloc_conf::read()
47            .context("failed reading compile-time Jemalloc config")?;
48        let config = config.strip_suffix('\0').unwrap_or(config);
49
50        Ok(Self {
51            version,
52            compile_time_config: config,
53            runtime_config: None, // to be overwritten on the first `update()`
54            health: ReactiveHealthCheck::new("jemalloc").1,
55            update_interval: Self::DEFAULT_UPDATE_INTERVAL,
56        })
57    }
58
59    pub fn health_check(&self) -> ReactiveHealthCheck {
60        self.health.subscribe()
61    }
62
63    fn update(&mut self) -> anyhow::Result<()> {
64        let mut options = stats_print::Options::default();
65        options.json_format = true;
66        options.skip_constants = self.runtime_config.is_some();
67        options.skip_per_arena = true;
68        options.skip_bin_size_classes = true;
69        options.skip_large_size_classes = true;
70        options.skip_mutex_statistics = true;
71
72        let mut buffer = vec![];
73        stats_print::stats_print(&mut buffer, options)
74            .context("failed collecting Jemalloc stats")?;
75        let JemallocStats::Jemalloc {
76            opt,
77            stats,
78            arena_stats,
79        } = serde_json::from_slice(&buffer).context("failed deserializing Jemalloc stats")?;
80        METRICS.observe_general_stats(&stats);
81        METRICS.observe_arena_stats(&arena_stats);
82
83        let runtime_config = &*self.runtime_config.get_or_insert_with(|| {
84            let opt = opt.unwrap_or_else(|| serde_json::json!({}));
85            tracing::info!(%opt, "Read Jemalloc runtime config");
86            opt
87        });
88
89        let health = Health::from(HealthStatus::Ready).with_details(JemallocHealthDetails {
90            version: self.version,
91            compile_time_config: self.compile_time_config,
92            runtime_config: runtime_config.clone(),
93            stats,
94        });
95        self.health.update(health);
96        Ok(())
97    }
98
99    pub(crate) async fn run(
100        mut self,
101        mut stop_receiver: watch::Receiver<bool>,
102    ) -> anyhow::Result<()> {
103        tracing::info!(
104            version = self.version,
105            config = self.compile_time_config,
106            "Initializing Jemalloc monitor"
107        );
108
109        while !*stop_receiver.borrow() {
110            if let Err(err) = self.update() {
111                tracing::warn!("Error updating Jemalloc stats: {err:#}");
112            }
113
114            if tokio::time::timeout(self.update_interval, stop_receiver.changed())
115                .await
116                .is_ok()
117            {
118                break;
119            }
120        }
121        tracing::info!("Received stop signal, Jemalloc monitor is shutting down");
122        Ok(())
123    }
124}