ckb_memory_tracker/
process.rs

1use std::{fs, io, str::FromStr, sync, thread, time};
2
3use ckb_logger::{error, info};
4use jemalloc_ctl::{epoch, stats};
5
6use crate::rocksdb::TrackRocksDBMemory;
7
// Resolves the jemalloc MIB for the key type `$key` and evaluates to it.
//
// When the lookup fails, the failure is logged and the *enclosing* function
// (or closure) returns immediately, so this macro can only be used where the
// surrounding body returns `()`.
macro_rules! je_mib {
    ($key:ty) => {
        match <$key>::mib() {
            Ok(mib) => mib,
            Err(_) => {
                error!("failed to lookup jemalloc mib for {}", stringify!($key));
                return;
            }
        }
    };
}
18
// Reads the current value behind the MIB handle `$mib` and evaluates to it
// as an `i64`.
//
// When the read fails, the failure is logged (naming the handle variable) and
// the *enclosing* function or closure returns immediately, so this macro can
// only be used where the surrounding body returns `()`.
macro_rules! mib_read {
    ($mib:ident) => {
        match $mib.read() {
            Ok(raw) => raw as i64,
            Err(_) => {
                error!("failed to read jemalloc stats for {}", stringify!($mib));
                return;
            }
        }
    };
}
29
30/// Track the memory usage of the CKB process, Jemalloc and RocksDB through [ckb-metrics](../../ckb_metrics/index.html).
31pub fn track_current_process<Tracker: 'static + TrackRocksDBMemory + Sync + Send>(
32    interval: u64,
33    tracker_opt: Option<sync::Arc<Tracker>>,
34) {
35    if interval == 0 {
36        info!("track current process: disable");
37    } else {
38        info!("track current process: enable");
39        let wait_secs = time::Duration::from_secs(interval);
40
41        let je_epoch = je_mib!(epoch);
42        // Bytes allocated by the application.
43        let allocated = je_mib!(stats::allocated);
44        // Bytes in physically resident data pages mapped by the allocator.
45        let resident = je_mib!(stats::resident);
46        // Bytes in active pages allocated by the application.
47        let active = je_mib!(stats::active);
48        // Bytes in active extents mapped by the allocator.
49        let mapped = je_mib!(stats::mapped);
50        // Bytes in virtual memory mappings that were retained
51        // rather than being returned to the operating system
52        let retained = je_mib!(stats::retained);
53        // Bytes dedicated to jemalloc metadata.
54        let metadata = je_mib!(stats::metadata);
55
56        if let Err(err) = thread::Builder::new()
57            .name("MemoryTracker".to_string())
58            .spawn(move || {
59                loop {
60                    if je_epoch.advance().is_err() {
61                        error!("Failed to refresh the jemalloc stats");
62                        return;
63                    }
64                    if let Ok(memory) = get_current_process_memory() {
65                        // Resident set size, amount of non-swapped physical memory.
66                        let rss = memory.resident as i64;
67                        // Virtual memory size, total amount of memory.
68                        let vms = memory.size as i64;
69
70                        if let Some(metrics) = ckb_metrics::handle() {
71                            metrics.ckb_sys_mem_process.rss.set(rss);
72                            metrics.ckb_sys_mem_process.vms.set(vms);
73                        }
74
75                        let allocated = mib_read!(allocated);
76                        let resident = mib_read!(resident);
77                        let active = mib_read!(active);
78                        let mapped = mib_read!(mapped);
79                        let retained = mib_read!(retained);
80                        let metadata = mib_read!(metadata);
81                        if let Some(metrics) = ckb_metrics::handle() {
82                            metrics.ckb_sys_mem_jemalloc.allocated.set(allocated);
83                            metrics.ckb_sys_mem_jemalloc.resident.set(resident);
84                            metrics.ckb_sys_mem_jemalloc.active.set(active);
85                            metrics.ckb_sys_mem_jemalloc.mapped.set(mapped);
86                            metrics.ckb_sys_mem_jemalloc.retained.set(retained);
87                            metrics.ckb_sys_mem_jemalloc.metadata.set(metadata);
88
89                            if let Some(tracker) = tracker_opt.clone() {
90                                tracker.gather_memory_stats();
91                            }
92                        }
93                    } else {
94                        error!("Failed to fetch the memory information of the current process");
95                    }
96                    thread::sleep(wait_secs);
97                }
98            })
99        {
100            error!(
101                "failed to spawn the thread to track current process: {}",
102                err
103            );
104        }
105    }
106}
107
108#[derive(Debug)]
109pub struct Memory {
110    // Virtual memory size
111    size: u64,
112    // Size of physical memory being used
113    resident: u64,
114    // Number of shared pages
115    _shared: u64,
116    // The size of executable virtual memory owned by the program
117    _text: u64,
118    // Size of the program data segment and the user state stack
119    _data: u64,
120}
121
122impl FromStr for Memory {
123    type Err = io::Error;
124    fn from_str(value: &str) -> Result<Memory, io::Error> {
125        static PAGE_SIZE: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
126        let page_size =
127            PAGE_SIZE.get_or_init(|| unsafe { libc::sysconf(libc::_SC_PAGESIZE) as u64 });
128        let mut parts = value.split_ascii_whitespace();
129        let size = parts
130            .next()
131            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
132            .and_then(|value| {
133                u64::from_str(value)
134                    .map(|value| value * *page_size)
135                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
136            })?;
137        let resident = parts
138            .next()
139            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
140            .and_then(|value| {
141                u64::from_str(value)
142                    .map(|value| value * *page_size)
143                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
144            })?;
145        let _shared = parts
146            .next()
147            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
148            .and_then(|value| {
149                u64::from_str(value)
150                    .map(|value| value * *page_size)
151                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
152            })?;
153        let _text = parts
154            .next()
155            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
156            .and_then(|value| {
157                u64::from_str(value)
158                    .map(|value| value * *page_size)
159                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
160            })?;
161        // ignore the size of the library in the virtual memory space of the task being imaged
162        let _lrs = parts.next();
163        let _data = parts
164            .next()
165            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
166            .and_then(|value| {
167                u64::from_str(value)
168                    .map(|value| value * *page_size)
169                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
170            })?;
171        Ok(Memory {
172            size,
173            resident,
174            _shared,
175            _text,
176            _data,
177        })
178    }
179}
180
181fn get_current_process_memory() -> Result<Memory, io::Error> {
182    static PID: std::sync::OnceLock<libc::pid_t> = std::sync::OnceLock::new();
183    let pid = PID.get_or_init(|| unsafe { libc::getpid() });
184    let content = fs::read_to_string(format!("/proc/{pid}/statm"))?;
185
186    Memory::from_str(&content)
187}