1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
use std::{fs, io, str::FromStr, sync, thread, time};

use ckb_logger::{error, info};
use jemalloc_ctl::{epoch, stats};

use crate::rocksdb::TrackRocksDBMemory;

macro_rules! je_mib {
    ($key:ty) => {
        if let Ok(value) = <$key>::mib() {
            value
        } else {
            error!("failed to lookup jemalloc mib for {}", stringify!($key));
            return;
        }
    };
}

macro_rules! mib_read {
    ($mib:ident) => {
        if let Ok(value) = $mib.read() {
            value as i64
        } else {
            error!("failed to read jemalloc stats for {}", stringify!($mib));
            return;
        }
    };
}

/// Track the memory usage of the CKB process, Jemalloc and RocksDB through [ckb-metrics](../../ckb_metrics/index.html).
pub fn track_current_process<Tracker: 'static + TrackRocksDBMemory + Sync + Send>(
    interval: u64,
    tracker_opt: Option<sync::Arc<Tracker>>,
) {
    if interval == 0 {
        info!("track current process: disable");
    } else {
        info!("track current process: enable");
        let wait_secs = time::Duration::from_secs(interval);

        let je_epoch = je_mib!(epoch);
        // Bytes allocated by the application.
        let allocated = je_mib!(stats::allocated);
        // Bytes in physically resident data pages mapped by the allocator.
        let resident = je_mib!(stats::resident);
        // Bytes in active pages allocated by the application.
        let active = je_mib!(stats::active);
        // Bytes in active extents mapped by the allocator.
        let mapped = je_mib!(stats::mapped);
        // Bytes in virtual memory mappings that were retained
        // rather than being returned to the operating system
        let retained = je_mib!(stats::retained);
        // Bytes dedicated to jemalloc metadata.
        let metadata = je_mib!(stats::metadata);

        if let Err(err) = thread::Builder::new()
            .name("MemoryTracker".to_string())
            .spawn(move || {
                loop {
                    if je_epoch.advance().is_err() {
                        error!("failed to refresh the jemalloc stats");
                        return;
                    }
                    if let Ok(memory) = get_current_process_memory() {
                        // Resident set size, amount of non-swapped physical memory.
                        let rss = memory.resident as i64;
                        // Virtual memory size, total amount of memory.
                        let vms = memory.size as i64;

                        if let Some(metrics) = ckb_metrics::handle() {
                            metrics.ckb_sys_mem_process.rss.set(rss);
                            metrics.ckb_sys_mem_process.vms.set(vms);
                        }

                        let allocated = mib_read!(allocated);
                        let resident = mib_read!(resident);
                        let active = mib_read!(active);
                        let mapped = mib_read!(mapped);
                        let retained = mib_read!(retained);
                        let metadata = mib_read!(metadata);
                        if let Some(metrics) = ckb_metrics::handle() {
                            metrics.ckb_sys_mem_jemalloc.allocated.set(allocated);
                            metrics.ckb_sys_mem_jemalloc.resident.set(resident);
                            metrics.ckb_sys_mem_jemalloc.active.set(active);
                            metrics.ckb_sys_mem_jemalloc.mapped.set(mapped);
                            metrics.ckb_sys_mem_jemalloc.retained.set(retained);
                            metrics.ckb_sys_mem_jemalloc.metadata.set(metadata);

                            if let Some(tracker) = tracker_opt.clone() {
                                tracker.gather_memory_stats();
                            }
                        }
                    } else {
                        error!("failed to fetch the memory information about current process");
                    }
                    thread::sleep(wait_secs);
                }
            })
        {
            error!(
                "failed to spawn the thread to track current process: {}",
                err
            );
        }
    }
}

#[derive(Debug)]
pub struct Memory {
    // Virtual memory size
    size: u64,
    // Size of physical memory being used
    resident: u64,
    // Number of shared pages
    _shared: u64,
    // The size of executable virtual memory owned by the program
    _text: u64,
    // Size of the program data segment and the user state stack
    _data: u64,
}

impl FromStr for Memory {
    type Err = io::Error;
    fn from_str(value: &str) -> Result<Memory, io::Error> {
        static PAGE_SIZE: once_cell::sync::OnceCell<u64> = once_cell::sync::OnceCell::new();
        let page_size =
            PAGE_SIZE.get_or_init(|| unsafe { libc::sysconf(libc::_SC_PAGESIZE) as u64 });
        let mut parts = value.split_ascii_whitespace();
        let size = parts
            .next()
            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
            .and_then(|value| {
                u64::from_str(value)
                    .map(|value| value * *page_size)
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
            })?;
        let resident = parts
            .next()
            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
            .and_then(|value| {
                u64::from_str(value)
                    .map(|value| value * *page_size)
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
            })?;
        let _shared = parts
            .next()
            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
            .and_then(|value| {
                u64::from_str(value)
                    .map(|value| value * *page_size)
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
            })?;
        let _text = parts
            .next()
            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
            .and_then(|value| {
                u64::from_str(value)
                    .map(|value| value * *page_size)
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
            })?;
        // ignore the size of the library in the virtual memory space of the task being imaged
        let _lrs = parts.next();
        let _data = parts
            .next()
            .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))
            .and_then(|value| {
                u64::from_str(value)
                    .map(|value| value * *page_size)
                    .map_err(|_| io::Error::from(io::ErrorKind::InvalidData))
            })?;
        Ok(Memory {
            size,
            resident,
            _shared,
            _text,
            _data,
        })
    }
}

fn get_current_process_memory() -> Result<Memory, io::Error> {
    static PID: once_cell::sync::OnceCell<libc::pid_t> = once_cell::sync::OnceCell::new();
    let pid = PID.get_or_init(|| unsafe { libc::getpid() });
    let content = fs::read_to_string(format!("/proc/{pid}/statm"))?;

    Memory::from_str(&content)
}