ckb_memory_tracker/
process.rs1use std::{fs, io, str::FromStr, sync, thread, time};
2
3use ckb_logger::{error, info};
4use jemalloc_ctl::{epoch, stats};
5
6use crate::rocksdb::TrackRocksDBMemory;
7
/// Resolves the MIB handle for the jemalloc key type `$key` by calling its
/// associated `mib()` function.
///
/// Evaluates to the handle on success. On failure it logs an error naming the
/// key and executes a bare `return`, so it may only be used inside a function
/// or closure whose return type is `()`.
macro_rules! je_mib {
    ($key:ty) => {
        match <$key>::mib() {
            Ok(mib) => mib,
            Err(_) => {
                error!("failed to lookup jemalloc mib for {}", stringify!($key));
                return;
            }
        }
    };
}
18
/// Reads the current value behind the MIB handle bound to the identifier
/// `$mib` and converts it to `i64` with an `as` cast.
///
/// On a failed read it logs an error that contains the identifier's name and
/// executes a bare `return`, leaving the enclosing `()`-returning function or
/// closure.
macro_rules! mib_read {
    ($mib:ident) => {
        match $mib.read() {
            Ok(raw) => raw as i64,
            Err(_) => {
                error!("failed to read jemalloc stats for {}", stringify!($mib));
                return;
            }
        }
    };
}
29
30pub fn track_current_process<Tracker: 'static + TrackRocksDBMemory + Sync + Send>(
32 interval: u64,
33 tracker_opt: Option<sync::Arc<Tracker>>,
34) {
35 if interval == 0 {
36 info!("track current process: disable");
37 } else {
38 info!("track current process: enable");
39 let wait_secs = time::Duration::from_secs(interval);
40
41 let je_epoch = je_mib!(epoch);
42 let allocated = je_mib!(stats::allocated);
44 let resident = je_mib!(stats::resident);
46 let active = je_mib!(stats::active);
48 let mapped = je_mib!(stats::mapped);
50 let retained = je_mib!(stats::retained);
53 let metadata = je_mib!(stats::metadata);
55
56 if let Err(err) = thread::Builder::new()
57 .name("MemoryTracker".to_string())
58 .spawn(move || {
59 loop {
60 if je_epoch.advance().is_err() {
61 error!("Failed to refresh the jemalloc stats");
62 return;
63 }
64 if let Ok(memory) = get_current_process_memory() {
65 let rss = memory.resident as i64;
67 let vms = memory.size as i64;
69
70 if let Some(metrics) = ckb_metrics::handle() {
71 metrics.ckb_sys_mem_process.rss.set(rss);
72 metrics.ckb_sys_mem_process.vms.set(vms);
73 }
74
75 let allocated = mib_read!(allocated);
76 let resident = mib_read!(resident);
77 let active = mib_read!(active);
78 let mapped = mib_read!(mapped);
79 let retained = mib_read!(retained);
80 let metadata = mib_read!(metadata);
81 if let Some(metrics) = ckb_metrics::handle() {
82 metrics.ckb_sys_mem_jemalloc.allocated.set(allocated);
83 metrics.ckb_sys_mem_jemalloc.resident.set(resident);
84 metrics.ckb_sys_mem_jemalloc.active.set(active);
85 metrics.ckb_sys_mem_jemalloc.mapped.set(mapped);
86 metrics.ckb_sys_mem_jemalloc.retained.set(retained);
87 metrics.ckb_sys_mem_jemalloc.metadata.set(metadata);
88
89 if let Some(tracker) = tracker_opt.clone() {
90 tracker.gather_memory_stats();
91 }
92 }
93 } else {
94 error!("Failed to fetch the memory information of the current process");
95 }
96 thread::sleep(wait_secs);
97 }
98 })
99 {
100 error!(
101 "failed to spawn the thread to track current process: {}",
102 err
103 );
104 }
105 }
106}
107
/// Memory usage figures of a process, as parsed from a `statm`-formatted line.
///
/// Every field holds a byte count: the parser multiplies the page counts found
/// in the input by the system page size. The type is a plain bundle of
/// integers, so it is cheap to copy and can be compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Memory {
    /// First `statm` column; reported as the process' virtual memory size.
    size: u64,
    /// Second `statm` column; reported as the process' resident set size.
    resident: u64,
    /// Third `statm` column (shared pages per proc(5)); parsed but currently unused.
    _shared: u64,
    /// Fourth `statm` column (text/code per proc(5)); parsed but currently unused.
    _text: u64,
    /// Sixth `statm` column (data + stack per proc(5)); parsed but currently unused.
    _data: u64,
}
121
122impl FromStr for Memory {
123 type Err = io::Error;
124 fn from_str(value: &str) -> Result<Memory, io::Error> {
125 static PAGE_SIZE: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
126 let page_size =
127 PAGE_SIZE.get_or_init(|| unsafe { libc::sysconf(libc::_SC_PAGESIZE) as u64 });
128 let mut parts = value.split_ascii_whitespace();
129 let size = parts
130 .next()
131 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
132 .and_then(|value| {
133 u64::from_str(value)
134 .map(|value| value * *page_size)
135 .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
136 })?;
137 let resident = parts
138 .next()
139 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
140 .and_then(|value| {
141 u64::from_str(value)
142 .map(|value| value * *page_size)
143 .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
144 })?;
145 let _shared = parts
146 .next()
147 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
148 .and_then(|value| {
149 u64::from_str(value)
150 .map(|value| value * *page_size)
151 .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
152 })?;
153 let _text = parts
154 .next()
155 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
156 .and_then(|value| {
157 u64::from_str(value)
158 .map(|value| value * *page_size)
159 .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
160 })?;
161 let _lrs = parts.next();
163 let _data = parts
164 .next()
165 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
166 .and_then(|value| {
167 u64::from_str(value)
168 .map(|value| value * *page_size)
169 .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
170 })?;
171 Ok(Memory {
172 size,
173 resident,
174 _shared,
175 _text,
176 _data,
177 })
178 }
179}
180
181fn get_current_process_memory() -> Result<Memory, io::Error> {
182 static PID: std::sync::OnceLock<libc::pid_t> = std::sync::OnceLock::new();
183 let pid = PID.get_or_init(|| unsafe { libc::getpid() });
184 let content = fs::read_to_string(format!("/proc/{pid}/statm"))?;
185
186 Memory::from_str(&content)
187}