zksync_node_jemalloc/
lib.rs1use std::time::Duration;
4
5use anyhow::Context;
6use serde::Serialize;
7use tikv_jemalloc_ctl::stats_print;
8use tokio::sync::watch;
9use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
10
11pub use self::node::JemallocMonitorLayer;
12use crate::{
13 json::{GeneralStats, JemallocStats},
14 metrics::METRICS,
15};
16
17mod json;
18mod metrics;
19mod node;
20
21#[derive(Debug, Serialize)]
22struct JemallocHealthDetails {
23 version: &'static str,
24 compile_time_config: &'static str,
25 runtime_config: serde_json::Value,
26 stats: GeneralStats,
27}
28
29#[derive(Debug)]
31pub(crate) struct JemallocMonitor {
32 version: &'static str,
33 compile_time_config: &'static str,
34 runtime_config: Option<serde_json::Value>,
35 health: HealthUpdater,
36 update_interval: Duration,
37}
38
39impl JemallocMonitor {
40 const DEFAULT_UPDATE_INTERVAL: Duration = Duration::from_secs(60);
41
42 pub(crate) fn new() -> anyhow::Result<Self> {
43 let version =
44 tikv_jemalloc_ctl::version::read().context("failed reading Jemalloc version")?;
45 let version = version.strip_suffix('\0').unwrap_or(version);
46 let config = tikv_jemalloc_ctl::config::malloc_conf::read()
47 .context("failed reading compile-time Jemalloc config")?;
48 let config = config.strip_suffix('\0').unwrap_or(config);
49
50 Ok(Self {
51 version,
52 compile_time_config: config,
53 runtime_config: None, health: ReactiveHealthCheck::new("jemalloc").1,
55 update_interval: Self::DEFAULT_UPDATE_INTERVAL,
56 })
57 }
58
59 pub fn health_check(&self) -> ReactiveHealthCheck {
60 self.health.subscribe()
61 }
62
63 fn update(&mut self) -> anyhow::Result<()> {
64 let mut options = stats_print::Options::default();
65 options.json_format = true;
66 options.skip_constants = self.runtime_config.is_some();
67 options.skip_per_arena = true;
68 options.skip_bin_size_classes = true;
69 options.skip_large_size_classes = true;
70 options.skip_mutex_statistics = true;
71
72 let mut buffer = vec![];
73 stats_print::stats_print(&mut buffer, options)
74 .context("failed collecting Jemalloc stats")?;
75 let JemallocStats::Jemalloc {
76 opt,
77 stats,
78 arena_stats,
79 } = serde_json::from_slice(&buffer).context("failed deserializing Jemalloc stats")?;
80 METRICS.observe_general_stats(&stats);
81 METRICS.observe_arena_stats(&arena_stats);
82
83 let runtime_config = &*self.runtime_config.get_or_insert_with(|| {
84 let opt = opt.unwrap_or_else(|| serde_json::json!({}));
85 tracing::info!(%opt, "Read Jemalloc runtime config");
86 opt
87 });
88
89 let health = Health::from(HealthStatus::Ready).with_details(JemallocHealthDetails {
90 version: self.version,
91 compile_time_config: self.compile_time_config,
92 runtime_config: runtime_config.clone(),
93 stats,
94 });
95 self.health.update(health);
96 Ok(())
97 }
98
99 pub(crate) async fn run(
100 mut self,
101 mut stop_receiver: watch::Receiver<bool>,
102 ) -> anyhow::Result<()> {
103 tracing::info!(
104 version = self.version,
105 config = self.compile_time_config,
106 "Initializing Jemalloc monitor"
107 );
108
109 while !*stop_receiver.borrow() {
110 if let Err(err) = self.update() {
111 tracing::warn!("Error updating Jemalloc stats: {err:#}");
112 }
113
114 if tokio::time::timeout(self.update_interval, stop_receiver.changed())
115 .await
116 .is_ok()
117 {
118 break;
119 }
120 }
121 tracing::info!("Received stop signal, Jemalloc monitor is shutting down");
122 Ok(())
123 }
124}