nydus_utils/
metrics.rs

1// Copyright 2020 Ant Group. All rights reserved.
2//
3// SPDX-License-Identifier: Apache-2.0
4
5//! Nydus error events and performance related metrics.
6//!
7//! There are several types of metrics supported:
8//! - Global error events of type [`ErrorHolder`]
//! - Storage backend metrics of type [`BackendMetrics`]
//! - Blobcache metrics of type [`BlobcacheMetrics`]
//! - Filesystem metrics of type [`FsIoStats`], supported by Rafs in fuse/virtiofs only.
12
13use std::collections::{HashMap, HashSet};
14use std::ops::{Deref, Drop};
15use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex, RwLock};
17use std::time::{Duration, SystemTime};
18
19use nydus_api::http::MetricsError;
20
21use crate::logger::ErrorHolder;
22use crate::InodeBitmap;
23
/// Type of `inode`: the unique 64-bit identifier of a file within a filesystem.
pub type Inode = u64;
26
/// Type of file operation statistics counter.
///
/// Used as an index into the per-fop counter arrays, so the discriminants
/// must stay dense and `Max` must remain the last variant.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum StatsFop {
    Getattr,
    Readlink,
    Open,
    Release,
    Read,
    Statfs,
    Getxattr,
    Listxattr,
    Opendir,
    Lookup,
    Readdir,
    Readdirplus,
    Access,
    Forget,
    BatchForget,
    /// Number of defined operations; used only to size counter arrays.
    Max,
}
47
48type IoStatsResult<T> = Result<T, MetricsError>;
49
// Block size separated counters.
// [0-3]: <1K; 1K~; 4K~; 16K~
// [4-7]: 64K~; 128K~; 512K~; 1M~
const BLOCK_READ_SIZES_MAX: usize = 8;

/// Map a request size in bytes to its block-size histogram bucket.
///
/// Buckets: [0,1K)->0, [1K,4K)->1, [4K,16K)->2, [16K,64K)->3,
/// [64K,128K)->4, [128K,512K)->5, [512K,1M)->6, [1M,..)->7.
///
/// The previous implementation used a `leading_zeros` bit-trick on `usize`
/// that only produces correct indices on 64-bit targets (on 32-bit,
/// `leading_zeros` never exceeds 32 and every size landed in bucket 7).
/// An explicit range match is portable and self-documenting.
#[inline]
fn request_size_index(size: usize) -> usize {
    match size {
        _ if size < 0x400 => 0,      // < 1K
        _ if size < 0x1000 => 1,     // < 4K
        _ if size < 0x4000 => 2,     // < 16K
        _ if size < 0x1_0000 => 3,   // < 64K
        _ if size < 0x2_0000 => 4,   // < 128K
        _ if size < 0x8_0000 => 5,   // < 512K
        _ if size < 0x10_0000 => 6,  // < 1M
        _ => 7,                      // >= 1M
    }
}
62
// Latency buckets: <=1ms, <=20ms, <=50ms, <=100ms, <=500ms, <=1s, <=2s, >2s
const READ_LATENCY_RANGE_MAX: usize = 8;

/// Map an elapsed time in milliseconds to its latency-distribution bucket.
fn latency_millis_range_index(elapsed: u64) -> usize {
    // Inclusive upper bounds of the first seven buckets; anything beyond
    // the last bound falls into the final ">2s" bucket.
    const BOUNDS_MS: [u64; 7] = [1, 20, 50, 100, 500, 1000, 2000];
    BOUNDS_MS
        .iter()
        .position(|&bound| elapsed <= bound)
        .unwrap_or(BOUNDS_MS.len())
}
78
// Latency buckets: <=200us, <=1ms, <=20ms, <=50ms, <=500ms, <=1s, <=2s, >2s
/// Map an elapsed time in microseconds to its latency-distribution bucket.
fn latency_micros_range_index(elapsed: u64) -> usize {
    // Inclusive upper bounds of the first seven buckets; anything beyond
    // the last bound falls into the final ">2s" bucket.
    const BOUNDS_US: [u64; 7] = [
        200, 1_000, 20_000, 50_000, 500_000, 1_000_000, 2_000_000,
    ];
    BOUNDS_US
        .iter()
        .position(|&bound| elapsed <= bound)
        .unwrap_or(BOUNDS_US.len())
}
92
// Defining below global static metrics set so that a specific metrics counter can
// be found as per the rafs backend mountpoint/id. Remind that nydusd can have
// multiple backends mounted.
lazy_static! {
    // Per-filesystem (rafs instance) metrics, keyed by mountpoint/id.
    static ref FS_METRICS: RwLock<HashMap<String, Arc<FsIoStats>>> = Default::default();
}

lazy_static! {
    // Per-storage-backend metrics, keyed by backend id.
    static ref BACKEND_METRICS: RwLock<HashMap<String, Arc<BackendMetrics>>> = Default::default();
}

lazy_static! {
    // Per-blobcache-manager metrics, keyed by blobcache id.
    static ref BLOBCACHE_METRICS: RwLock<HashMap<String, Arc<BlobcacheMetrics>>> =
        Default::default();
}

lazy_static! {
    // Global holder of recent error events. The arguments (500, 50 * 1024)
    // presumably bound the number of events and total bytes retained --
    // confirm against `ErrorHolder::new`.
    pub static ref ERROR_HOLDER: Arc<Mutex<ErrorHolder>> =
        Arc::new(Mutex::new(ErrorHolder::new(500, 50 * 1024)));
}
113
/// Trait to manipulate per inode statistics metrics.
pub trait InodeStatsCounter {
    /// Increment the success counter for operation `fop`.
    fn stats_fop_inc(&self, fop: StatsFop);
    /// Increment the failure counter for operation `fop`.
    fn stats_fop_err_inc(&self, fop: StatsFop);
    /// Accumulate `value` (bytes, for `Read`) for operation `fop`.
    fn stats_cumulative(&self, fop: StatsFop, value: usize);
}
120
/// Per inode io statistics metrics.
#[derive(Default, Debug, Serialize)]
pub struct InodeIoStats {
    // Total number of operations recorded against this inode.
    total_fops: BasicMetric,
    // Total bytes read from this inode.
    data_read: BasicMetric,
    // Cumulative bytes for different block size.
    block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
    // Per-operation success counters, indexed by `StatsFop`.
    fop_hits: [BasicMetric; StatsFop::Max as usize],
    // Per-operation failure counters, indexed by `StatsFop`.
    fop_errors: [BasicMetric; StatsFop::Max as usize],
}
131
132impl InodeStatsCounter for InodeIoStats {
133    fn stats_fop_inc(&self, fop: StatsFop) {
134        self.fop_hits[fop as usize].inc();
135        self.total_fops.inc();
136    }
137
138    fn stats_fop_err_inc(&self, fop: StatsFop) {
139        self.fop_errors[fop as usize].inc();
140    }
141
142    fn stats_cumulative(&self, fop: StatsFop, value: usize) {
143        if fop == StatsFop::Read {
144            self.data_read.add(value as u64);
145            // Put counters into $BLOCK_READ_COUNT_MAX catagories
146            // 1K; 4K; 16K; 64K, 128K, 512K, 1M
147            let idx = request_size_index(value);
148            self.block_count_read[idx].inc();
149        }
150    }
151}
152
/// Records how a file is accessed.
///
/// For security sake, each file can associate an access pattern recorder, which
/// is globally configured through nydusd configuration file.
/// For now, the pattern is composed of:
///     1. How many times a file is read regardless of io block size and request offset.
///        And this counter can not be cleared.
///     2. First time point at which this file is read. It's wall-time in unit of seconds.
///     3. File path relative to current rafs root.
///
/// Yes, we now don't have an abundant pattern recorder now. It can be negotiated in the
/// future about how to enrich it.
#[derive(Default, Debug, Serialize)]
pub struct AccessPattern {
    // Inode number of the tracked file.
    ino: u64,
    // How many times the file has been read; never cleared.
    nr_read: BasicMetric,
    /// In unit of seconds.
    first_access_time_secs: AtomicU64,
    // Sub-second part of the first access time, in nanoseconds.
    first_access_time_nanos: AtomicU32,
}
173
174impl AccessPattern {
175    fn record_access_time(&self) {
176        if self.first_access_time_secs.load(Ordering::Relaxed) == 0 {
177            let t = SystemTime::now()
178                .duration_since(SystemTime::UNIX_EPOCH)
179                .unwrap();
180            self.first_access_time_secs
181                .store(t.as_secs(), Ordering::Relaxed);
182            self.first_access_time_nanos
183                .store(t.subsec_nanos(), Ordering::Relaxed);
184        }
185    }
186}
187
/// Filesystem level statistics and metrics.
///
/// Currently only Rafs in Fuse/Virtiofs mode supports filesystem level statistics and metrics.
#[derive(Default, Debug, Serialize)]
pub struct FsIoStats {
    // Whether to enable each file accounting switch.
    // As fop accounting might consume much memory space, it is disabled by default.
    // But global fop accounting is always working within each Rafs.
    files_account_enabled: AtomicBool,
    // Whether to record per-file access patterns (read count, first access time).
    access_pattern_enabled: AtomicBool,
    // Whether to track recently read files in `recent_read_files`.
    record_latest_read_files_enabled: AtomicBool,
    // Flag to enable record operation latency.
    measure_latency: AtomicBool,

    // Identifier of this filesystem instance; key into the global `FS_METRICS`.
    id: String,
    // Total number of files that are currently open.
    nr_opens: BasicMetric,
    // Total bytes read against the filesystem.
    data_read: BasicMetric,
    // Cumulative bytes for different block size.
    block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
    // Counters for successful various file operations.
    fop_hits: [BasicMetric; StatsFop::Max as usize],
    // Counters for failed file operations.
    fop_errors: [BasicMetric; StatsFop::Max as usize],

    // Cumulative latency's life cycle is equivalent to Rafs, unlike incremental
    // latency which will be cleared each time dumped. Unit as micro-seconds.
    //   * @total means io_stats simply adds every fop latency to the counter which is never cleared.
    //     It is useful for other tools to calculate their metrics report.
    fop_cumulative_latency_total: [BasicMetric; StatsFop::Max as usize],
    // Record how many times read latency drops to the ranges.
    // This helps us to understand the io service time stability.
    read_latency_dist: [BasicMetric; READ_LATENCY_RANGE_MAX],

    // Rwlock closes the race that more than one threads are creating counters concurrently.
    #[serde(skip_serializing, skip_deserializing)]
    file_counters: RwLock<HashMap<Inode, Arc<InodeIoStats>>>,
    #[serde(skip_serializing, skip_deserializing)]
    access_patterns: RwLock<HashMap<Inode, Arc<AccessPattern>>>,
    // record regular file read
    #[serde(skip_serializing, skip_deserializing)]
    recent_read_files: InodeBitmap,
}
232
// Generates a pair of accessors for a boolean (AtomicBool) option field of
// `FsIoStats`: a private getter `$get` and a public setter `$set`, both
// operating on field `$opt` with relaxed ordering.
macro_rules! impl_iostat_option {
    ($get:ident, $set:ident, $opt:ident) => {
        #[inline]
        fn $get(&self) -> bool {
            self.$opt.load(Ordering::Relaxed)
        }

        #[inline]
        pub fn $set(&self, switch: bool) {
            self.$opt.store(switch, Ordering::Relaxed)
        }
    };
}
246
247impl FsIoStats {
248    /// Create a new instance of [`FsIoStats`] for filesystem `id`.
249    pub fn new(id: &str) -> Arc<FsIoStats> {
250        let c = Arc::new(FsIoStats {
251            id: id.to_string(),
252            ..Default::default()
253        });
254        FS_METRICS
255            .write()
256            .unwrap()
257            .insert(id.to_string(), c.clone());
258        c.init();
259        c
260    }
261
262    /// Initialize the [`FsIoStats`] object.
263    pub fn init(&self) {
264        self.files_account_enabled.store(false, Ordering::Relaxed);
265        self.measure_latency.store(true, Ordering::Relaxed);
266    }
267
268    impl_iostat_option!(files_enabled, toggle_files_recording, files_account_enabled);
269    impl_iostat_option!(
270        access_pattern_enabled,
271        toggle_access_pattern,
272        access_pattern_enabled
273    );
274    impl_iostat_option!(
275        record_latest_read_files_enabled,
276        toggle_latest_read_files_recording,
277        record_latest_read_files_enabled
278    );
279
280    /// Prepare for recording statistics information about `ino`.
281    pub fn new_file_counter(&self, ino: Inode) {
282        if self.files_enabled() {
283            let mut counters = self.file_counters.write().unwrap();
284            if counters.get(&ino).is_none() {
285                counters.insert(ino, Arc::new(InodeIoStats::default()));
286            }
287        }
288
289        if self.access_pattern_enabled() {
290            let mut records = self.access_patterns.write().unwrap();
291            if records.get(&ino).is_none() {
292                records.insert(
293                    ino,
294                    Arc::new(AccessPattern {
295                        ino,
296                        ..Default::default()
297                    }),
298                );
299            }
300        }
301    }
302
303    fn file_stats_update(&self, ino: Inode, fop: StatsFop, bsize: usize, success: bool) {
304        self.fop_update(fop, bsize, success);
305
306        if self.files_enabled() {
307            let counters = self.file_counters.read().unwrap();
308            match counters.get(&ino) {
309                Some(c) => {
310                    c.stats_fop_inc(fop);
311                    c.stats_cumulative(fop, bsize);
312                }
313                None => warn!("No iostats counter for file {}", ino),
314            }
315        }
316
317        if self.access_pattern_enabled() && fop == StatsFop::Read {
318            let records = self.access_patterns.read().unwrap();
319            match records.get(&ino) {
320                Some(r) => {
321                    r.nr_read.inc();
322                    r.record_access_time();
323                }
324                None => warn!("No pattern record for file {}", ino),
325            }
326        }
327
328        if self.record_latest_read_files_enabled() && fop == StatsFop::Read && success {
329            self.recent_read_files.set(ino);
330        }
331    }
332
333    fn fop_update(&self, fop: StatsFop, value: usize, success: bool) {
334        // Linux kernel no longer splits IO into sizes smaller than 128K.
335        // So 512K and 1M is added.
336        // We put block count into 5 catagories e.g. 1K; 4K; 16K; 64K; 128K; 512K; 1M
337        if fop == StatsFop::Read {
338            let idx = request_size_index(value);
339            self.block_count_read[idx].inc()
340        }
341
342        if success {
343            self.fop_hits[fop as usize].inc();
344            match fop {
345                StatsFop::Read => self.data_read.add(value as u64),
346                StatsFop::Open => self.nr_opens.inc(),
347                StatsFop::Release => self.nr_opens.dec(),
348                _ => (),
349            };
350        } else {
351            self.fop_errors[fop as usize].inc();
352        }
353    }
354
355    /// Mark starting of filesystem operation.
356    pub fn latency_start(&self) -> Option<SystemTime> {
357        if !self.measure_latency.load(Ordering::Relaxed) {
358            return None;
359        }
360
361        Some(SystemTime::now())
362    }
363
364    /// Mark ending of filesystem operation and record statistics.
365    pub fn latency_end(&self, start: &Option<SystemTime>, fop: StatsFop) {
366        if let Some(start) = start {
367            if let Ok(d) = SystemTime::elapsed(start) {
368                let elapsed = saturating_duration_micros(&d);
369                self.read_latency_dist[latency_micros_range_index(elapsed)].inc();
370                self.fop_cumulative_latency_total[fop as usize].add(elapsed);
371            }
372        }
373    }
374
375    fn export_files_stats(&self) -> Result<String, MetricsError> {
376        serde_json::to_string(
377            self.file_counters
378                .read()
379                .expect("Not expect poisoned lock")
380                .deref(),
381        )
382        .map_err(MetricsError::Serialize)
383    }
384
385    fn export_latest_read_files(&self) -> String {
386        serde_json::json!(self.recent_read_files.bitmap_to_array_and_clear()).to_string()
387    }
388
389    fn export_files_access_patterns(&self) -> Result<String, MetricsError> {
390        serde_json::to_string(
391            &self
392                .access_patterns
393                .read()
394                .expect("Not poisoned lock")
395                .deref()
396                .values()
397                .filter(|r| r.nr_read.count() != 0)
398                .collect::<Vec<&Arc<AccessPattern>>>(),
399        )
400        .map_err(MetricsError::Serialize)
401    }
402
403    fn export_fs_stats(&self) -> Result<String, MetricsError> {
404        serde_json::to_string(self).map_err(MetricsError::Serialize)
405    }
406}
407
/// Guard object to record file operation metrics associated with an inode.
///
/// Call its `settle()` method to generate an on-stack recorder.
/// If the operation succeeds, call `mark_success()` to change the recorder's internal state.
/// If the operation fails, its internal state will not be changed.
/// Finally, when the recorder is being destroyed, iostats counter will be updated.
pub struct FopRecorder<'a> {
    // Which file operation this recorder tracks.
    fop: StatsFop,
    // Inode the operation targets.
    inode: u64,
    // Set by `mark_success()`; stays false on failure.
    success: bool,
    // Now, the size only makes sense for `Read` FOP.
    size: usize,
    // Metrics sink updated on drop.
    ios: &'a FsIoStats,
}
422
impl Drop for FopRecorder<'_> {
    // Flush the recorded operation into the owning `FsIoStats` when the
    // recorder goes out of scope, whether or not `mark_success` was called.
    fn drop(&mut self) {
        self.ios
            .file_stats_update(self.inode, self.fop, self.size, self.success);
    }
}
429
430impl<'a> FopRecorder<'a> {
431    /// Create a guard object for filesystem operation `fop` associated with `inode`.
432    pub fn settle<T>(fop: StatsFop, inode: u64, ios: &'a T) -> Self
433    where
434        T: AsRef<FsIoStats>,
435    {
436        FopRecorder {
437            fop,
438            inode,
439            success: false,
440            size: 0,
441            ios: ios.as_ref(),
442        }
443    }
444
445    /// Mark operation as success.
446    pub fn mark_success(&mut self, size: usize) {
447        self.success = true;
448        self.size = size;
449    }
450}
451
452/// Export file metrics of a filesystem.
453pub fn export_files_stats(
454    name: &Option<String>,
455    latest_read_files: bool,
456) -> Result<String, MetricsError> {
457    let fs_metrics = FS_METRICS.read().unwrap();
458
459    match name {
460        Some(k) => fs_metrics.get(k).ok_or(MetricsError::NoCounter).map(|v| {
461            if !latest_read_files {
462                v.export_files_stats()
463            } else {
464                Ok(v.export_latest_read_files())
465            }
466        })?,
467        None => {
468            if fs_metrics.len() == 1 {
469                if let Some(ios) = fs_metrics.values().next() {
470                    return if !latest_read_files {
471                        ios.export_files_stats()
472                    } else {
473                        Ok(ios.export_latest_read_files())
474                    };
475                }
476            }
477            Err(MetricsError::NoCounter)
478        }
479    }
480}
481
482/// Export file access pattern of a filesystem.
483pub fn export_files_access_pattern(name: &Option<String>) -> Result<String, MetricsError> {
484    let fs_metrics = FS_METRICS.read().unwrap();
485    match name {
486        Some(k) => fs_metrics
487            .get(k)
488            .ok_or(MetricsError::NoCounter)
489            .map(|v| v.export_files_access_patterns())?,
490        None => {
491            if fs_metrics.len() == 1 {
492                if let Some(ios) = fs_metrics.values().next() {
493                    return ios.export_files_access_patterns();
494                }
495            }
496            Err(MetricsError::NoCounter)
497        }
498    }
499}
500
501/// Export filesystem metrics.
502pub fn export_global_stats(name: &Option<String>) -> Result<String, MetricsError> {
503    // With only one rafs instance, we allow caller to ask for an unknown ios name.
504    let fs_metrics = FS_METRICS.read().unwrap();
505
506    match name {
507        Some(k) => fs_metrics
508            .get(k)
509            .ok_or(MetricsError::NoCounter)
510            .map(|v| v.export_fs_stats())?,
511        None => {
512            if fs_metrics.len() == 1 {
513                if let Some(ios) = fs_metrics.values().next() {
514                    return ios.export_fs_stats();
515                }
516            }
517            Err(MetricsError::NoCounter)
518        }
519    }
520}
521
522/// Export storage backend metrics.
523pub fn export_backend_metrics(name: &Option<String>) -> IoStatsResult<String> {
524    let metrics = BACKEND_METRICS.read().unwrap();
525
526    match name {
527        Some(k) => metrics
528            .get(k)
529            .ok_or(MetricsError::NoCounter)
530            .map(|v| v.export_metrics())?,
531        None => {
532            if metrics.len() == 1 {
533                if let Some(m) = metrics.values().next() {
534                    return m.export_metrics();
535                }
536            }
537            Err(MetricsError::NoCounter)
538        }
539    }
540}
541
542/// Export blob cache metircs.
543pub fn export_blobcache_metrics(id: &Option<String>) -> IoStatsResult<String> {
544    let metrics = BLOBCACHE_METRICS.read().unwrap();
545
546    match id {
547        Some(k) => metrics
548            .get(k)
549            .ok_or(MetricsError::NoCounter)
550            .map(|v| v.export_metrics())?,
551        None => {
552            if metrics.len() == 1 {
553                if let Some(m) = metrics.values().next() {
554                    return m.export_metrics();
555                }
556            }
557            Err(MetricsError::NoCounter)
558        }
559    }
560}
561
562/// Export global error events.
563pub fn export_events() -> IoStatsResult<String> {
564    serde_json::to_string(ERROR_HOLDER.lock().unwrap().deref()).map_err(MetricsError::Serialize)
565}
566
/// Trait to manipulate metric counters.
pub trait Metric {
    /// Adds `value` to the current counter.
    fn add(&self, value: u64);
    /// Increments by 1 unit the current counter.
    fn inc(&self) {
        self.add(1);
    }
    /// Returns current value of the counter.
    fn count(&self) -> u64;
    /// Subtract `value` from the current counter.
    fn sub(&self, value: u64);
    /// Decrease the current counter.
    fn dec(&self) {
        self.sub(1);
    }

    /// Set the counter to the absolute `value`.
    fn set(&self, value: u64);
}
586
/// Basic 64-bit metric counter backed by an atomic integer.
#[derive(Default, Serialize, Debug)]
pub struct BasicMetric(AtomicU64);
590
591impl Metric for BasicMetric {
592    fn add(&self, value: u64) {
593        self.0.fetch_add(value, Ordering::Relaxed);
594    }
595
596    fn count(&self) -> u64 {
597        self.0.load(Ordering::Relaxed)
598    }
599
600    fn sub(&self, value: u64) {
601        self.0.fetch_sub(value, Ordering::Relaxed);
602    }
603
604    fn set(&self, value: u64) {
605        self.0.store(value, Ordering::Relaxed);
606    }
607}
608
/// Metrics for storage backends.
#[derive(Default, Serialize, Debug)]
pub struct BackendMetrics {
    #[serde(skip_serializing, skip_deserializing)]
    // Backend identifier; key into the global `BACKEND_METRICS` table.
    id: String,
    // type of storage backend.
    backend_type: String,
    // Cumulative count of read request to backend
    read_count: BasicMetric,
    // Cumulative count of read failure to backend
    read_errors: BasicMetric,
    // Cumulative amount of data from to backend in unit of Byte. External tools
    // are responsible for calculating BPS from this field.
    read_amount_total: BasicMetric,
    // In unit of millisecond
    read_cumulative_latency_millis_total: BasicMetric,
    // Cumulative latency, partitioned by request-size bucket.
    read_cumulative_latency_millis_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
    // Read request count, partitioned by request-size bucket.
    read_count_block_size_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
    // Categorize metrics as per their latency and request size
    read_latency_sizes_dist: [[BasicMetric; READ_LATENCY_RANGE_MAX]; BLOCK_READ_SIZES_MAX],
}
630
631impl BackendMetrics {
632    /// Create a [`BackendMetrics`] object for a storage backend.
633    pub fn new(id: &str, backend_type: &str) -> Arc<Self> {
634        let backend_metrics = Arc::new(Self {
635            id: id.to_string(),
636            backend_type: backend_type.to_string(),
637            ..Default::default()
638        });
639
640        BACKEND_METRICS
641            .write()
642            .unwrap()
643            .insert(id.to_string(), backend_metrics.clone());
644
645        backend_metrics
646    }
647
648    /// Release a [`BackendMetrics`] object for a storage backend.
649    pub fn release(&self) -> IoStatsResult<()> {
650        BACKEND_METRICS
651            .write()
652            .unwrap()
653            .remove(&self.id)
654            .map(|_| ())
655            .ok_or(MetricsError::NoCounter)
656    }
657
658    /// Mark starting of an IO operations.
659    pub fn begin(&self) -> SystemTime {
660        SystemTime::now()
661    }
662
663    /// Mark ending of an IO operations.
664    pub fn end(&self, begin: &SystemTime, size: usize, error: bool) {
665        if let Ok(d) = SystemTime::elapsed(begin) {
666            let elapsed = saturating_duration_millis(&d);
667
668            self.read_count.inc();
669            if error {
670                self.read_errors.inc();
671            }
672
673            self.read_cumulative_latency_millis_total.add(elapsed);
674            self.read_amount_total.add(size as u64);
675            let lat_idx = latency_millis_range_index(elapsed);
676            let size_idx = request_size_index(size);
677            self.read_cumulative_latency_millis_dist[size_idx].add(elapsed);
678            self.read_count_block_size_dist[size_idx].inc();
679            self.read_latency_sizes_dist[size_idx][lat_idx].inc();
680        }
681    }
682
683    fn export_metrics(&self) -> IoStatsResult<String> {
684        serde_json::to_string(self).map_err(MetricsError::Serialize)
685    }
686}
687
/// Convert a [`Duration`] to whole milliseconds, clamping to `u64::MAX` on
/// overflow instead of wrapping.
///
/// The original hand-rolled the computation with a redundant `secs == 0`
/// special case and per-step saturation; `Duration::as_millis` computes
/// `secs * 1000 + subsec_millis` exactly in `u128`, so a single clamped
/// narrowing conversion is both simpler and precisely saturating.
fn saturating_duration_millis(d: &Duration) -> u64 {
    u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
}
699
/// Convert a [`Duration`] to whole microseconds, clamping to `u64::MAX` on
/// overflow instead of wrapping.
///
/// Same simplification as `saturating_duration_millis`: `Duration::as_micros`
/// computes the exact value in `u128`, so one clamped conversion replaces the
/// redundant `secs == 0` branch and per-step saturation.
fn saturating_duration_micros(d: &Duration) -> u64 {
    u64::try_from(d.as_micros()).unwrap_or(u64::MAX)
}
710
/// Metrics for a blob cache manager instance.
#[derive(Debug, Default, Serialize)]
pub struct BlobcacheMetrics {
    #[serde(skip_serializing, skip_deserializing)]
    // Blobcache identifier; key into the global `BLOBCACHE_METRICS` table.
    id: String,
    // Prefer to let external tool get file's state like file size and disk usage.
    // Because stat(2) file may get blocked.
    // It should include the real blob cache file names, so that the external GC
    // process can handle it directly.
    pub underlying_files: Mutex<HashSet<String>>,
    pub store_path: String,
    // Cache hit percentage = (partial_hits + whole_hits) / total
    pub partial_hits: BasicMetric,
    pub whole_hits: BasicMetric,
    // How many `read` requests are processed by the blobcache instance.
    // This metric will be helpful when comparing with cache hits times.
    pub total: BasicMetric,
    // Scale of blobcache. Blobcache does not evict entries.
    // Means the number of chunks in ready status.
    pub entries_count: BasicMetric,
    // Together with below two fields, we can figure out average merging size thus
    // to estimate the possibility to merge backend IOs.
    // In unit of Bytes
    pub prefetch_data_amount: BasicMetric,
    // Total prefetch requests issued from storage/blobs or rafs filesystem layer for each file that needs prefetch
    pub prefetch_requests_count: BasicMetric,
    pub prefetch_workers: AtomicUsize,
    pub prefetch_unmerged_chunks: BasicMetric,
    // Cumulative time latencies of each prefetch request which can be handled in parallel.
    // It starts when the request is born including nydusd processing and schedule and end when the chunk is downloaded and stored.
    // Then the average prefetch latency can be calculated by
    // `prefetch_cumulative_time_millis / prefetch_requests_count`
    pub prefetch_cumulative_time_millis: BasicMetric,
    // The time seconds part when nydusd begins to prefetch
    // We can calculate prefetch average bandwidth by
    // `prefetch_data_amount / (prefetch_end_time_secs - prefetch_begin_time_secs)`.
    // Note, it does not take milliseconds into account yet.
    pub prefetch_begin_time_secs: BasicMetric,
    // The time milliseconds part when nydusd begins to prefetch
    pub prefetch_begin_time_millis: BasicMetric,
    // The time seconds part when nydusd ends prefetching
    pub prefetch_end_time_secs: BasicMetric,
    // The time milliseconds part when nydusd ends prefetching
    pub prefetch_end_time_millis: BasicMetric,
    // Bytes buffered in memory waiting to be flushed to the backend cache file.
    pub buffered_backend_size: BasicMetric,
    // Set once every chunk of the blob is ready in the local cache.
    pub data_all_ready: AtomicBool,
}
756
757impl BlobcacheMetrics {
758    /// Create a [`BlobcacheMetrics`] object for a blob cache manager.
759    pub fn new(id: &str, store_path: &str) -> Arc<Self> {
760        let metrics = Arc::new(Self {
761            id: id.to_string(),
762            store_path: store_path.to_string(),
763            ..Default::default()
764        });
765
766        // Old metrics will be dropped when BlobCache is swapped. So we don't
767        // have to worry about swapping its metrics either which means it's
768        // not necessary to release metrics recorder when blobcache is dropped due to swapping.
769        BLOBCACHE_METRICS
770            .write()
771            .unwrap()
772            .insert(id.to_string(), metrics.clone());
773
774        metrics
775    }
776
777    /// Release a [`BlobcacheMetrics`] object for a blob cache manager.
778    pub fn release(&self) -> IoStatsResult<()> {
779        BLOBCACHE_METRICS
780            .write()
781            .unwrap()
782            .remove(&self.id)
783            .map(|_| ())
784            .ok_or(MetricsError::NoCounter)
785    }
786
787    /// Export blobcache metric information.
788    pub fn export_metrics(&self) -> IoStatsResult<String> {
789        serde_json::to_string(self).map_err(MetricsError::Serialize)
790    }
791
792    pub fn calculate_prefetch_metrics(&self, begin_time: SystemTime) {
793        let now = SystemTime::now();
794        if let Ok(ref t) = now.duration_since(SystemTime::UNIX_EPOCH) {
795            self.prefetch_end_time_secs.set(t.as_secs());
796            self.prefetch_end_time_millis.set(t.subsec_millis() as u64);
797        }
798        if let Ok(ref t) = now.duration_since(begin_time) {
799            let elapsed = saturating_duration_millis(t);
800            self.prefetch_cumulative_time_millis.add(elapsed);
801        }
802    }
803}
804
805#[cfg(test)]
806mod tests {
807    use super::*;
808
809    #[test]
810    fn test_request_size_index() {
811        assert_eq!(request_size_index(0x0), 0);
812        assert_eq!(request_size_index(0x3ff), 0);
813        assert_eq!(request_size_index(0x400), 1);
814        assert_eq!(request_size_index(0xfff), 1);
815        assert_eq!(request_size_index(0x1000), 2);
816        assert_eq!(request_size_index(0x3fff), 2);
817        assert_eq!(request_size_index(0x4000), 3);
818        assert_eq!(request_size_index(0xffff), 3);
819        assert_eq!(request_size_index(0x1_0000), 4);
820        assert_eq!(request_size_index(0x1_ffff), 4);
821        assert_eq!(request_size_index(0x2_0000), 5);
822        assert_eq!(request_size_index(0x7_ffff), 5);
823        assert_eq!(request_size_index(0x8_0000), 6);
824        assert_eq!(request_size_index(0xf_ffff), 6);
825        assert_eq!(request_size_index(0x10_0000), 7);
826        assert_eq!(request_size_index(usize::MAX), 7);
827    }
828
829    #[test]
830    fn test_block_read_count() {
831        let g = FsIoStats::default();
832        g.init();
833        g.fop_update(StatsFop::Read, 4000, true);
834        assert_eq!(g.block_count_read[1].count(), 1);
835
836        g.fop_update(StatsFop::Read, 4096, true);
837        assert_eq!(g.block_count_read[2].count(), 1);
838
839        g.fop_update(StatsFop::Read, 65535, true);
840        assert_eq!(g.block_count_read[3].count(), 1);
841
842        g.fop_update(StatsFop::Read, 131072, true);
843        assert_eq!(g.block_count_read[5].count(), 1);
844
845        g.fop_update(StatsFop::Read, 65520, true);
846        assert_eq!(g.block_count_read[3].count(), 2);
847
848        g.fop_update(StatsFop::Read, 2015520, true);
849        assert_eq!(g.block_count_read[3].count(), 2);
850    }
851
852    #[test]
853    fn test_latency_millis_range_index() {
854        assert_eq!(latency_millis_range_index(0), 0);
855        assert_eq!(latency_millis_range_index(1), 0);
856        assert_eq!(latency_millis_range_index(10), 1);
857        assert_eq!(latency_millis_range_index(20), 1);
858        assert_eq!(latency_millis_range_index(40), 2);
859        assert_eq!(latency_millis_range_index(80), 3);
860        assert_eq!(latency_millis_range_index(160), 4);
861        assert_eq!(latency_millis_range_index(320), 4);
862        assert_eq!(latency_millis_range_index(640), 5);
863        assert_eq!(latency_millis_range_index(1280), 6);
864        assert_eq!(latency_millis_range_index(2560), 7);
865    }
866
867    #[test]
868    fn test_latency_micros_range_index() {
869        assert_eq!(latency_micros_range_index(100), 0);
870        assert_eq!(latency_micros_range_index(500), 1);
871        assert_eq!(latency_micros_range_index(10_000), 2);
872        assert_eq!(latency_micros_range_index(30_000), 3);
873        assert_eq!(latency_micros_range_index(100_000), 4);
874        assert_eq!(latency_micros_range_index(1_000_000), 5);
875        assert_eq!(latency_micros_range_index(1_500_000), 6);
876        assert_eq!(latency_micros_range_index(3_000_000), 7);
877    }
878
879    #[test]
880    fn test_inode_stats() {
881        let stat = InodeIoStats::default();
882        stat.stats_fop_inc(StatsFop::Read);
883        stat.stats_fop_inc(StatsFop::Open);
884        assert_eq!(stat.fop_hits[StatsFop::Read as usize].count(), 1);
885        assert_eq!(stat.total_fops.count(), 2);
886
887        stat.stats_cumulative(StatsFop::Open, 1000);
888        stat.stats_cumulative(StatsFop::Read, 4000);
889        stat.stats_cumulative(StatsFop::Read, 5000);
890
891        assert_eq!(stat.block_count_read[0].count(), 0);
892        assert_eq!(stat.block_count_read[1].count(), 1);
893        assert_eq!(stat.block_count_read[2].count(), 1);
894    }
895
896    #[test]
897    fn test_access_pattern() {
898        let ap = AccessPattern::default();
899        ap.record_access_time();
900        assert_ne!(ap.first_access_time_secs.load(Ordering::Relaxed), 0);
901        assert_ne!(ap.first_access_time_nanos.load(Ordering::Relaxed), 0);
902    }
903
904    #[test]
905    fn test_file_stats_update() {
906        let f = FsIoStats::default();
907        let node1: Inode = 1;
908        let node2: Inode = 2;
909        let node3: Inode = 3;
910
911        f.new_file_counter(node1);
912        f.new_file_counter(node2);
913        assert!(f.access_patterns.read().unwrap().is_empty());
914        assert!(f.file_counters.read().unwrap().is_empty());
915
916        f.access_pattern_enabled.store(true, Ordering::Relaxed);
917        f.files_account_enabled.store(true, Ordering::Relaxed);
918        f.record_latest_read_files_enabled
919            .store(true, Ordering::Relaxed);
920        f.new_file_counter(node1);
921        f.new_file_counter(node2);
922        f.file_stats_update(node1, StatsFop::Read, 4000, true);
923        f.file_stats_update(node1, StatsFop::Read, 5000, true);
924        f.file_stats_update(node1, StatsFop::Open, 0, true);
925        f.file_stats_update(node3, StatsFop::Open, 0, true);
926        assert_eq!(
927            f.access_patterns
928                .read()
929                .unwrap()
930                .get(&node1)
931                .unwrap()
932                .nr_read
933                .count(),
934            2
935        );
936        assert_eq!(
937            f.file_counters
938                .read()
939                .unwrap()
940                .get(&node1)
941                .unwrap()
942                .fop_hits[StatsFop::Read as usize]
943                .count(),
944            2
945        );
946        assert!(f.recent_read_files.is_set(node1 as u64));
947    }
948
949    #[test]
950    fn test_fop_update() {
951        let f = FsIoStats::default();
952        assert_eq!(f.nr_opens.count(), 0);
953        f.fop_update(StatsFop::Open, 0, true);
954        assert_eq!(f.nr_opens.count(), 1);
955        f.fop_update(StatsFop::Release, 0, true);
956        assert_eq!(f.nr_opens.count(), 0);
957        f.fop_update(StatsFop::Opendir, 0, true);
958        assert_eq!(f.fop_errors[StatsFop::Opendir as usize].count(), 0);
959        f.fop_update(StatsFop::Opendir, 0, false);
960        assert_eq!(f.fop_errors[StatsFop::Opendir as usize].count(), 1);
961    }
962
    #[test]
    fn test_latecny() {
        // NOTE(review): the test name misspells "latency"; renaming would be
        // harmless (tests have no callers) but the name is kept here.
        let f = FsIoStats::default();
        // Latency measurement is off by default, so no start timestamp.
        assert_eq!(f.latency_start(), None);
        f.measure_latency.store(true, Ordering::Relaxed);
        let s = f.latency_start().unwrap();
        let d = Duration::new(1, 500_000_000);
        // Shift the start timestamp 1.5s into the past so the measured
        // elapsed time is ~1.5s; because of timer resolution the elapsed
        // value may be slightly greater than 1.5s.
        f.latency_end(&s.checked_sub(d), StatsFop::Read);
        assert_eq!(
            f.read_latency_dist[latency_micros_range_index(1_500_000)].count(),
            1
        );
        // Accept up to 1ms of measurement error in the cumulative latency;
        // the measured value can only exceed 1.5s, so the subtraction below
        // cannot underflow.
        assert!(
            f.fop_cumulative_latency_total[StatsFop::Read as usize].count()
                - saturating_duration_micros(&d)
                <= 1000
        );
    }
983
    #[test]
    fn test_fs_io_stats_new_and_export() {
        let id0: Option<String> = Some("id-0".to_string());
        let id1: Option<String> = Some("id-1".to_string());
        let none: Option<String> = None;

        // With exactly one registered instance, exporting without an explicit
        // id resolves unambiguously; an unknown id still fails.
        let _f1 = FsIoStats::new("id-0");
        assert!(export_files_stats(&id0, true).is_ok());
        assert!(export_files_stats(&none, true).is_ok());
        assert!(export_global_stats(&id0).is_ok());
        assert!(export_global_stats(&id1).is_err());
        assert!(export_global_stats(&none).is_ok());

        // Once a second instance is registered, id-less export becomes
        // ambiguous and must fail, while explicit ids keep working.
        let _f2 = FsIoStats::new("id-1");
        assert!(export_files_stats(&none, false).is_err());
        assert!(export_files_stats(&id0, true).is_ok());
        assert!(export_files_stats(&id0, false).is_ok());
        assert!(export_global_stats(&none).is_err());
        assert!(export_files_access_pattern(&id0).is_ok());
        assert!(export_files_access_pattern(&none).is_err());

        // A default (unregistered) instance can still export its own data.
        let ios = FsIoStats::default();
        assert!(ios.export_files_access_patterns().is_ok());
        assert!(ios.export_files_stats().is_ok());
        assert!(ios.export_fs_stats().is_ok());
        ios.export_latest_read_files();

        // Run serially after the registry assertions above: `test_fop_record`
        // registers yet another FsIoStats instance, which would break the
        // id-less export checks if it ran concurrently as its own #[test].
        test_fop_record();
    }
1013
1014    fn test_fop_record() {
1015        let ios = FsIoStats::new("0");
1016        let mut recorder = FopRecorder::settle(StatsFop::Read, 0, &ios);
1017        assert!(!recorder.success);
1018        assert_eq!(recorder.size, 0);
1019
1020        recorder.mark_success(10);
1021        assert!(recorder.success);
1022        assert_eq!(recorder.size, 10);
1023        drop(recorder);
1024    }
1025
1026    #[test]
1027    fn test_saturating_duration() {
1028        assert_eq!(
1029            saturating_duration_millis(&Duration::from_millis(1234)),
1030            1234
1031        );
1032        assert_eq!(
1033            saturating_duration_micros(&Duration::from_millis(888)),
1034            888_000
1035        );
1036        assert_eq!(
1037            saturating_duration_micros(&Duration::from_millis(1888)),
1038            1_888_000
1039        );
1040    }
1041
1042    #[test]
1043    fn test_blob_cache_metric() {
1044        let m1: Arc<BlobcacheMetrics> = BlobcacheMetrics::new("id", "path");
1045        {
1046            let metrics = BLOBCACHE_METRICS.read().unwrap();
1047            assert_eq!(metrics.len(), 1);
1048        }
1049        assert!(m1.export_metrics().is_ok());
1050        assert!(m1.release().is_ok());
1051        {
1052            let metrics = BLOBCACHE_METRICS.read().unwrap();
1053            assert_eq!(metrics.len(), 0);
1054        }
1055
1056        let now = SystemTime::now();
1057        let prev = now.checked_sub(Duration::new(10, 0)).unwrap();
1058        m1.calculate_prefetch_metrics(prev);
1059        assert_eq!(m1.prefetch_cumulative_time_millis.count(), 10_000);
1060        assert_eq!(
1061            m1.prefetch_end_time_secs.count(),
1062            now.duration_since(SystemTime::UNIX_EPOCH)
1063                .expect("No error")
1064                .as_secs()
1065        );
1066
1067        let id0: Option<String> = Some("id-0".to_string());
1068        let none: Option<String> = None;
1069        BlobcacheMetrics::new("id-0", "t0");
1070        assert!(export_blobcache_metrics(&id0).is_ok());
1071        assert!(export_blobcache_metrics(&none).is_ok());
1072        BlobcacheMetrics::new("id-1", "t1");
1073        assert!(export_blobcache_metrics(&none).is_err());
1074        assert!(export_events().is_ok());
1075    }
1076
    #[test]
    fn test_backend_metric() {
        let id0: Option<String> = Some("id-0".to_string());
        let id1: Option<String> = Some("id-1".to_string());
        let none: Option<String> = None;
        // With a single backend registered, an id-less export resolves to it;
        // an unknown id fails.
        let b0 = BackendMetrics::new("id-0", "t0");
        assert!(export_backend_metrics(&id0).is_ok());
        assert!(export_backend_metrics(&id1).is_err());
        assert!(export_backend_metrics(&none).is_ok());
        // With two backends, id-less export is ambiguous; explicit ids work.
        let b1 = BackendMetrics::new("id-1", "t1");
        assert!(export_backend_metrics(&id0).is_ok());
        assert!(export_backend_metrics(&id1).is_ok());
        assert!(export_backend_metrics(&none).is_err());
        // Unregister both so the global registry is left clean for other
        // tests that may run afterwards.
        assert!(b0.release().is_ok());
        assert!(b1.release().is_ok());
    }
1093}