use std::collections::{HashMap, HashSet};
use std::ops::{Deref, Drop};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use nydus_api::http::MetricsError;
use crate::logger::ErrorHolder;
use crate::InodeBitmap;
/// Inode number as handed to us by the filesystem layer.
pub type Inode = u64;
/// File-system operations distinguished by the metrics framework.
#[derive(PartialEq, Copy, Clone)]
pub enum StatsFop {
    Getattr,
    Readlink,
    Open,
    Release,
    Read,
    Statfs,
    Getxattr,
    Listxattr,
    Opendir,
    Lookup,
    Readdir,
    Readdirplus,
    Access,
    Forget,
    BatchForget,
    /// Sentinel: variant count, used only to size the per-fop counter arrays.
    Max,
}
/// Shorthand for results produced by the metrics exporters.
type IoStatsResult<T> = Result<T, MetricsError>;

/// Number of buckets in the request-size histograms (see `request_size_index`).
const BLOCK_READ_SIZES_MAX: usize = 8;
/// Map a request size in bytes to its histogram bucket index (0..=7).
///
/// Buckets: [0,1K) -> 0, [1K,4K) -> 1, [4K,16K) -> 2, [16K,64K) -> 3,
/// [64K,128K) -> 4, [128K,512K) -> 5, [512K,1M) -> 6, >=1M -> 7.
/// The bucket table is packed into the nibbles of `0x0112_2334_5567`.
#[inline]
fn request_size_index(size: usize) -> usize {
    // Count leading zeros on u64, not usize: with a 32-bit usize the count
    // tops out at 32, `max(ceil, 53)` is always 53, and every size would be
    // misfiled into bucket 7.
    let ceil = ((size as u64) >> 10).leading_zeros();
    let shift = (std::cmp::max(ceil, 53) - 53) << 2;
    (0x0112_2334_5567u64 >> shift) as usize & 0xf
}
/// Number of buckets in the latency histograms.
const READ_LATENCY_RANGE_MAX: usize = 8;

/// Bucket a millisecond latency into one of eight histogram slots.
fn latency_millis_range_index(elapsed: u64) -> usize {
    // Inclusive upper bounds for buckets 0..=6; anything larger is bucket 7.
    const UPPER_BOUNDS_MS: [u64; 7] = [1, 20, 50, 100, 500, 1000, 2000];
    UPPER_BOUNDS_MS
        .iter()
        .position(|&bound| elapsed <= bound)
        .unwrap_or(7)
}
/// Bucket a microsecond latency into one of eight histogram slots.
fn latency_micros_range_index(elapsed: u64) -> usize {
    // Inclusive upper bounds for buckets 0..=6; anything larger is bucket 7.
    const UPPER_BOUNDS_US: [u64; 7] = [
        200, 1_000, 20_000, 50_000, 500_000, 1_000_000, 2_000_000,
    ];
    UPPER_BOUNDS_US
        .iter()
        .position(|&bound| elapsed <= bound)
        .unwrap_or(7)
}
// Global registries mapping an identifier to its metrics object; entries are
// inserted by the corresponding `new()` constructors and removed by `release()`.
lazy_static! {
    static ref FS_METRICS: RwLock<HashMap<String, Arc<FsIoStats>>> = Default::default();
}
lazy_static! {
    static ref BACKEND_METRICS: RwLock<HashMap<String, Arc<BackendMetrics>>> = Default::default();
}
lazy_static! {
    static ref BLOBCACHE_METRICS: RwLock<HashMap<String, Arc<BlobcacheMetrics>>> =
        Default::default();
}
// Process-wide holder of recent error events, exported via `export_events`.
// The arguments (500, 50 * 1024) presumably cap entry count and total bytes —
// TODO confirm against `ErrorHolder::new` in the logger module.
lazy_static! {
    pub static ref ERROR_HOLDER: Arc<Mutex<ErrorHolder>> =
        Arc::new(Mutex::new(ErrorHolder::new(500, 50 * 1024)));
}
/// Counters that can be recorded against a single inode.
pub trait InodeStatsCounter {
    /// Record one successful invocation of `fop`.
    fn stats_fop_inc(&self, fop: StatsFop);
    /// Record one failed invocation of `fop`.
    fn stats_fop_err_inc(&self, fop: StatsFop);
    /// Accumulate `value` (bytes) for `fop` where that is meaningful.
    fn stats_cumulative(&self, fop: StatsFop, value: usize);
}
/// Per-inode fop statistics, kept only when per-file accounting is enabled.
#[derive(Default, Debug, Serialize)]
pub struct InodeIoStats {
    /// Total fops recorded against this inode.
    total_fops: BasicMetric,
    /// Bytes read from this inode.
    data_read: BasicMetric,
    /// Read-size histogram; buckets defined by `request_size_index`.
    block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
    /// Per-fop success and error counters, indexed by `StatsFop`.
    fop_hits: [BasicMetric; StatsFop::Max as usize],
    fop_errors: [BasicMetric; StatsFop::Max as usize],
}
impl InodeStatsCounter for InodeIoStats {
    /// Count one successful `fop` against this inode.
    fn stats_fop_inc(&self, fop: StatsFop) {
        // The two counters are independent atomics; order is irrelevant.
        self.total_fops.inc();
        self.fop_hits[fop as usize].inc();
    }

    /// Count one failed `fop` against this inode.
    fn stats_fop_err_inc(&self, fop: StatsFop) {
        self.fop_errors[fop as usize].inc();
    }

    /// Accumulate byte counts; only read operations are tracked here.
    fn stats_cumulative(&self, fop: StatsFop, value: usize) {
        if fop != StatsFop::Read {
            return;
        }
        self.data_read.add(value as u64);
        self.block_count_read[request_size_index(value)].inc();
    }
}
/// First-access record for a single file, serialized by the access-pattern export.
#[derive(Default, Debug, Serialize)]
pub struct AccessPattern {
    ino: u64,
    /// Number of read operations observed on this inode.
    nr_read: BasicMetric,
    /// Wall-clock time of the first access, split into seconds plus
    /// sub-second nanoseconds since the Unix epoch; zero means "never read".
    first_access_time_secs: AtomicU64,
    first_access_time_nanos: AtomicU32,
}
impl AccessPattern {
    /// Stamp the first-access time once; subsequent calls are no-ops.
    ///
    /// NOTE(review): the load/store pair is not atomic as a whole, so two
    /// racing first reads may both store — they write nearly identical
    /// timestamps, which is harmless for statistics.
    fn record_access_time(&self) {
        if self.first_access_time_secs.load(Ordering::Relaxed) != 0 {
            return;
        }
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        self.first_access_time_secs
            .store(since_epoch.as_secs(), Ordering::Relaxed);
        self.first_access_time_nanos
            .store(since_epoch.subsec_nanos(), Ordering::Relaxed);
    }
}
/// Per-filesystem-instance I/O statistics, registered in `FS_METRICS`.
#[derive(Default, Debug, Serialize)]
pub struct FsIoStats {
    // Runtime feature switches, toggled via the impl_iostat_option accessors.
    files_account_enabled: AtomicBool,
    access_pattern_enabled: AtomicBool,
    record_latest_read_files_enabled: AtomicBool,
    measure_latency: AtomicBool,
    /// Identifier this instance is registered under in `FS_METRICS`.
    id: String,
    /// Files currently open (Open increments, Release decrements).
    nr_opens: BasicMetric,
    /// Total bytes successfully read.
    data_read: BasicMetric,
    /// Read-size histogram; buckets defined by `request_size_index`.
    block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
    /// Per-fop success and error counters, indexed by `StatsFop`.
    fop_hits: [BasicMetric; StatsFop::Max as usize],
    fop_errors: [BasicMetric; StatsFop::Max as usize],
    /// Cumulative latency per fop, in microseconds (see `latency_end`).
    fop_cumulative_latency_total: [BasicMetric; StatsFop::Max as usize],
    /// Latency histogram; buckets defined by `latency_micros_range_index`.
    read_latency_dist: [BasicMetric; READ_LATENCY_RANGE_MAX],
    /// Per-inode counters, populated only while file accounting is enabled.
    #[serde(skip_serializing, skip_deserializing)]
    file_counters: RwLock<HashMap<Inode, Arc<InodeIoStats>>>,
    /// Per-inode first-access records, populated only while pattern tracking is on.
    #[serde(skip_serializing, skip_deserializing)]
    access_patterns: RwLock<HashMap<Inode, Arc<AccessPattern>>>,
    /// Bitmap of inodes read since the last export-and-clear.
    #[serde(skip_serializing, skip_deserializing)]
    recent_read_files: InodeBitmap,
}
/// Generate a pair of accessors for an `AtomicBool` switch on `FsIoStats`:
/// a private `$get()` reading the current state and a public `$set(bool)`
/// flipping it at runtime.
macro_rules! impl_iostat_option {
    ($get:ident, $set:ident, $opt:ident) => {
        #[inline]
        fn $get(&self) -> bool {
            self.$opt.load(Ordering::Relaxed)
        }
        #[inline]
        pub fn $set(&self, switch: bool) {
            self.$opt.store(switch, Ordering::Relaxed)
        }
    };
}
impl FsIoStats {
    /// Create an `FsIoStats` for filesystem `id` and register it in the global
    /// `FS_METRICS` table, replacing any previous entry with the same id.
    pub fn new(id: &str) -> Arc<FsIoStats> {
        let c = Arc::new(FsIoStats {
            id: id.to_string(),
            ..Default::default()
        });
        FS_METRICS
            .write()
            .unwrap()
            .insert(id.to_string(), c.clone());
        c.init();
        c
    }

    /// Reset switches to their defaults: per-file accounting off,
    /// latency measurement on.
    pub fn init(&self) {
        self.files_account_enabled.store(false, Ordering::Relaxed);
        self.measure_latency.store(true, Ordering::Relaxed);
    }

    // Feature switches: each expands to a private getter plus a public setter.
    impl_iostat_option!(files_enabled, toggle_files_recording, files_account_enabled);
    impl_iostat_option!(
        access_pattern_enabled,
        toggle_access_pattern,
        access_pattern_enabled
    );
    impl_iostat_option!(
        record_latest_read_files_enabled,
        toggle_latest_read_files_recording,
        record_latest_read_files_enabled
    );

    /// Lazily allocate per-inode counter/pattern records for `ino`, but only
    /// for the accounting modes currently enabled.
    pub fn new_file_counter(&self, ino: Inode) {
        if self.files_enabled() {
            let mut counters = self.file_counters.write().unwrap();
            if counters.get(&ino).is_none() {
                counters.insert(ino, Arc::new(InodeIoStats::default()));
            }
        }
        if self.access_pattern_enabled() {
            let mut records = self.access_patterns.write().unwrap();
            if records.get(&ino).is_none() {
                records.insert(
                    ino,
                    Arc::new(AccessPattern {
                        ino,
                        ..Default::default()
                    }),
                );
            }
        }
    }

    /// Account one finished fop against the global counters and, if enabled,
    /// the per-file counters. `bsize` is the number of bytes transferred.
    fn file_stats_update(&self, ino: Inode, fop: StatsFop, bsize: usize, success: bool) {
        self.fop_update(fop, bsize, success);
        if self.files_enabled() {
            let counters = self.file_counters.read().unwrap();
            match counters.get(&ino) {
                Some(c) => {
                    c.stats_fop_inc(fop);
                    c.stats_cumulative(fop, bsize);
                }
                // Happens when accounting was enabled after the file's
                // counter-creation window (new_file_counter) had passed.
                None => warn!("No iostats counter for file {}", ino),
            }
        }
        if self.access_pattern_enabled() && fop == StatsFop::Read {
            let records = self.access_patterns.read().unwrap();
            match records.get(&ino) {
                Some(r) => {
                    r.nr_read.inc();
                    r.record_access_time();
                }
                None => warn!("No pattern record for file {}", ino),
            }
        }
        // Only successful reads mark the "recently read" bitmap.
        if self.record_latest_read_files_enabled() && fop == StatsFop::Read && success {
            self.recent_read_files.set(ino);
        }
    }

    /// Update the filesystem-wide counters for one fop.
    fn fop_update(&self, fop: StatsFop, value: usize, success: bool) {
        // Read sizes are histogrammed whether or not the read succeeded.
        if fop == StatsFop::Read {
            let idx = request_size_index(value);
            self.block_count_read[idx].inc()
        }
        if success {
            self.fop_hits[fop as usize].inc();
            match fop {
                StatsFop::Read => self.data_read.add(value as u64),
                // Open/Release together track the number of open files.
                StatsFop::Open => self.nr_opens.inc(),
                StatsFop::Release => self.nr_opens.dec(),
                _ => (),
            };
        } else {
            self.fop_errors[fop as usize].inc();
        }
    }

    /// Start a latency measurement; returns `None` when measurement is
    /// disabled, which makes the matching `latency_end` a no-op.
    pub fn latency_start(&self) -> Option<SystemTime> {
        if !self.measure_latency.load(Ordering::Relaxed) {
            return None;
        }
        Some(SystemTime::now())
    }

    /// Finish a latency measurement started by `latency_start`; skipped
    /// silently if the clock went backwards in between.
    ///
    /// NOTE(review): `read_latency_dist` is bumped for every fop, not just
    /// reads, despite its name — confirm whether that is intentional.
    pub fn latency_end(&self, start: &Option<SystemTime>, fop: StatsFop) {
        if let Some(start) = start {
            if let Ok(d) = SystemTime::elapsed(start) {
                let elapsed = saturating_duration_micros(&d);
                self.read_latency_dist[latency_micros_range_index(elapsed)].inc();
                self.fop_cumulative_latency_total[fop as usize].add(elapsed);
            }
        }
    }

    /// Serialize the per-file counter table to JSON.
    fn export_files_stats(&self) -> Result<String, MetricsError> {
        serde_json::to_string(
            self.file_counters
                .read()
                .expect("Not expect poisoned lock")
                .deref(),
        )
        .map_err(MetricsError::Serialize)
    }

    /// Serialize and clear the "recently read files" bitmap.
    fn export_latest_read_files(&self) -> String {
        serde_json::json!(self.recent_read_files.bitmap_to_array_and_clear()).to_string()
    }

    /// Serialize access patterns of every file read at least once.
    fn export_files_access_patterns(&self) -> Result<String, MetricsError> {
        serde_json::to_string(
            &self
                .access_patterns
                .read()
                .expect("Not poisoned lock")
                .deref()
                .values()
                .filter(|r| r.nr_read.count() != 0)
                .collect::<Vec<&Arc<AccessPattern>>>(),
        )
        .map_err(MetricsError::Serialize)
    }

    /// Serialize the filesystem-level counters themselves.
    fn export_fs_stats(&self) -> Result<String, MetricsError> {
        serde_json::to_string(self).map_err(MetricsError::Serialize)
    }
}
/// RAII recorder: created when a fop starts, commits its outcome to the
/// owning `FsIoStats` when dropped.
pub struct FopRecorder<'a> {
    fop: StatsFop,
    inode: u64,
    /// Defaults to failure; flipped by `mark_success`.
    success: bool,
    /// Bytes transferred by the operation.
    size: usize,
    ios: &'a FsIoStats,
}
impl<'a> Drop for FopRecorder<'a> {
    /// Commit the recorded outcome exactly once when the recorder leaves
    /// scope — including on early returns.
    fn drop(&mut self) {
        self.ios
            .file_stats_update(self.inode, self.fop, self.size, self.success);
    }
}
impl<'a> FopRecorder<'a> {
pub fn settle<T>(fop: StatsFop, inode: u64, ios: &'a T) -> Self
where
T: AsRef<FsIoStats>,
{
FopRecorder {
fop,
inode,
success: false,
size: 0,
ios: ios.as_ref(),
}
}
pub fn mark_success(&mut self, size: usize) {
self.success = true;
self.size = size;
}
}
/// Export per-file statistics for the filesystem named `name`, or for the
/// sole registered filesystem when `name` is `None`. With `latest_read_files`
/// set, export (and clear) the recently-read-files bitmap instead.
pub fn export_files_stats(
    name: &Option<String>,
    latest_read_files: bool,
) -> Result<String, MetricsError> {
    let fs_metrics = FS_METRICS.read().unwrap();
    let ios = match name {
        Some(k) => fs_metrics.get(k),
        // No name: fall back to the single entry iff exactly one exists.
        None if fs_metrics.len() == 1 => fs_metrics.values().next(),
        None => None,
    }
    .ok_or(MetricsError::NoCounter)?;
    if latest_read_files {
        Ok(ios.export_latest_read_files())
    } else {
        ios.export_files_stats()
    }
}
/// Export file access patterns for the filesystem named `name`, or for the
/// sole registered filesystem when `name` is `None`.
pub fn export_files_access_pattern(name: &Option<String>) -> Result<String, MetricsError> {
    let fs_metrics = FS_METRICS.read().unwrap();
    match name {
        Some(k) => fs_metrics.get(k),
        // No name: fall back to the single entry iff exactly one exists.
        None if fs_metrics.len() == 1 => fs_metrics.values().next(),
        None => None,
    }
    .ok_or(MetricsError::NoCounter)?
    .export_files_access_patterns()
}
/// Export filesystem-level statistics for `name`, or for the sole registered
/// filesystem when `name` is `None`.
pub fn export_global_stats(name: &Option<String>) -> Result<String, MetricsError> {
    let fs_metrics = FS_METRICS.read().unwrap();
    match name {
        Some(k) => fs_metrics.get(k),
        // No name: fall back to the single entry iff exactly one exists.
        None if fs_metrics.len() == 1 => fs_metrics.values().next(),
        None => None,
    }
    .ok_or(MetricsError::NoCounter)?
    .export_fs_stats()
}
/// Export backend statistics for `name`, or for the sole registered backend
/// when `name` is `None`.
pub fn export_backend_metrics(name: &Option<String>) -> IoStatsResult<String> {
    let metrics = BACKEND_METRICS.read().unwrap();
    match name {
        Some(k) => metrics.get(k),
        // No name: fall back to the single entry iff exactly one exists.
        None if metrics.len() == 1 => metrics.values().next(),
        None => None,
    }
    .ok_or(MetricsError::NoCounter)?
    .export_metrics()
}
/// Export blobcache statistics for `id`, or for the sole registered cache
/// when `id` is `None`.
pub fn export_blobcache_metrics(id: &Option<String>) -> IoStatsResult<String> {
    let metrics = BLOBCACHE_METRICS.read().unwrap();
    match id {
        Some(k) => metrics.get(k),
        // No id: fall back to the single entry iff exactly one exists.
        None if metrics.len() == 1 => metrics.values().next(),
        None => None,
    }
    .ok_or(MetricsError::NoCounter)?
    .export_metrics()
}
/// Serialize the global error-event holder to JSON.
pub fn export_events() -> IoStatsResult<String> {
    serde_json::to_string(ERROR_HOLDER.lock().unwrap().deref()).map_err(MetricsError::Serialize)
}
/// A counter interface with default increment/decrement helpers.
pub trait Metric {
    /// Add `value` to the metric.
    fn add(&self, value: u64);
    /// Add 1.
    fn inc(&self) {
        self.add(1);
    }
    /// Read the current value.
    fn count(&self) -> u64;
    /// Subtract `value` from the metric.
    fn sub(&self, value: u64);
    /// Subtract 1.
    fn dec(&self) {
        self.sub(1);
    }
    /// Overwrite the value.
    fn set(&self, value: u64);
}
/// Default `Metric` implementation backed by an `AtomicU64`.
#[derive(Default, Serialize, Debug)]
pub struct BasicMetric(AtomicU64);

impl Metric for BasicMetric {
    // Relaxed ordering throughout: counters are independent and are only
    // read for reporting, so no cross-counter ordering is required.
    fn add(&self, value: u64) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }
    fn count(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
    fn sub(&self, value: u64) {
        self.0.fetch_sub(value, Ordering::Relaxed);
    }
    fn set(&self, value: u64) {
        self.0.store(value, Ordering::Relaxed);
    }
}
/// Read statistics for one storage backend, registered in `BACKEND_METRICS`.
/// Latencies are recorded in milliseconds (see `end`).
#[derive(Default, Serialize, Debug)]
pub struct BackendMetrics {
    /// Registration key; excluded from the exported JSON.
    #[serde(skip_serializing, skip_deserializing)]
    id: String,
    /// Backend flavor string as passed to `BackendMetrics::new`.
    backend_type: String,
    read_count: BasicMetric,
    read_errors: BasicMetric,
    /// Total bytes read (accumulated even for failed reads — see `end`).
    read_amount_total: BasicMetric,
    read_cumulative_latency_millis_total: BasicMetric,
    /// Cumulative latency and request count, bucketed by request size.
    read_cumulative_latency_millis_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
    read_count_block_size_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
    /// 2-D histogram indexed as [size bucket][latency bucket].
    read_latency_sizes_dist: [[BasicMetric; READ_LATENCY_RANGE_MAX]; BLOCK_READ_SIZES_MAX],
}
impl BackendMetrics {
    /// Create a backend metrics object and register it in the global table
    /// under `id`, replacing any previous entry with the same id.
    pub fn new(id: &str, backend_type: &str) -> Arc<Self> {
        let metrics = Arc::new(Self {
            id: id.to_string(),
            backend_type: backend_type.to_string(),
            ..Default::default()
        });
        BACKEND_METRICS
            .write()
            .unwrap()
            .insert(id.to_string(), metrics.clone());
        metrics
    }

    /// Remove this object's entry from the global table.
    pub fn release(&self) -> IoStatsResult<()> {
        match BACKEND_METRICS.write().unwrap().remove(&self.id) {
            Some(_) => Ok(()),
            None => Err(MetricsError::NoCounter),
        }
    }

    /// Timestamp the start of a backend read; pair with [`Self::end`].
    pub fn begin(&self) -> SystemTime {
        SystemTime::now()
    }

    /// Record one finished backend read of `size` bytes. Skipped silently
    /// if the clock went backwards since `begin`.
    pub fn end(&self, begin: &SystemTime, size: usize, error: bool) {
        let duration = match SystemTime::elapsed(begin) {
            Ok(d) => d,
            Err(_) => return,
        };
        let elapsed = saturating_duration_millis(&duration);
        let size_idx = request_size_index(size);
        let lat_idx = latency_millis_range_index(elapsed);

        self.read_count.inc();
        if error {
            self.read_errors.inc();
        }
        // Bytes and latency are accumulated regardless of the error flag.
        self.read_amount_total.add(size as u64);
        self.read_cumulative_latency_millis_total.add(elapsed);
        self.read_cumulative_latency_millis_dist[size_idx].add(elapsed);
        self.read_count_block_size_dist[size_idx].inc();
        self.read_latency_sizes_dist[size_idx][lat_idx].inc();
    }

    /// Serialize all counters to a JSON string.
    fn export_metrics(&self) -> IoStatsResult<String> {
        serde_json::to_string(self).map_err(MetricsError::Serialize)
    }
}
/// Convert a `Duration` to whole milliseconds, saturating at `u64::MAX`
/// rather than overflowing for extreme durations.
fn saturating_duration_millis(d: &Duration) -> u64 {
    // Scale the whole seconds, then fold in the sub-second remainder; the
    // zero-seconds case needs no special handling with saturating arithmetic.
    d.as_secs()
        .saturating_mul(1000)
        .saturating_add(u64::from(d.subsec_millis()))
}
/// Convert a `Duration` to whole microseconds, saturating at `u64::MAX`
/// rather than overflowing for extreme durations.
fn saturating_duration_micros(d: &Duration) -> u64 {
    // Same shape as saturating_duration_millis, scaled to microseconds.
    d.as_secs()
        .saturating_mul(1_000_000)
        .saturating_add(u64::from(d.subsec_micros()))
}
/// Statistics for one blob cache instance, registered in `BLOBCACHE_METRICS`.
/// Fields are public because cache code updates them directly.
#[derive(Debug, Default, Serialize)]
pub struct BlobcacheMetrics {
    /// Registration key; excluded from the exported JSON.
    #[serde(skip_serializing, skip_deserializing)]
    id: String,
    /// Presumably the set of files backing this cache — TODO confirm in cache code.
    pub underlying_files: Mutex<HashSet<String>>,
    /// Directory the cache stores its data under, as passed to `new`.
    pub store_path: String,
    // Hit/miss accounting; exact semantics are defined by the cache code
    // that increments these.
    pub partial_hits: BasicMetric,
    pub whole_hits: BasicMetric,
    pub total: BasicMetric,
    pub entries_count: BasicMetric,
    // Prefetch bookkeeping. Begin/end wall-clock stamps are split into
    // seconds plus sub-second milliseconds (see calculate_prefetch_metrics).
    pub prefetch_data_amount: BasicMetric,
    pub prefetch_requests_count: BasicMetric,
    pub prefetch_workers: AtomicUsize,
    pub prefetch_unmerged_chunks: BasicMetric,
    pub prefetch_cumulative_time_millis: BasicMetric,
    pub prefetch_begin_time_secs: BasicMetric,
    pub prefetch_begin_time_millis: BasicMetric,
    pub prefetch_end_time_secs: BasicMetric,
    pub prefetch_end_time_millis: BasicMetric,
    pub buffered_backend_size: BasicMetric,
    pub data_all_ready: AtomicBool,
}
impl BlobcacheMetrics {
    /// Create a blobcache metrics object registered globally under `id`,
    /// replacing any previous entry with the same id.
    pub fn new(id: &str, store_path: &str) -> Arc<Self> {
        let metrics = Arc::new(Self {
            id: id.to_string(),
            store_path: store_path.to_string(),
            ..Default::default()
        });
        BLOBCACHE_METRICS
            .write()
            .unwrap()
            .insert(id.to_string(), metrics.clone());
        metrics
    }

    /// Remove this object's entry from the global table.
    pub fn release(&self) -> IoStatsResult<()> {
        match BLOBCACHE_METRICS.write().unwrap().remove(&self.id) {
            Some(_) => Ok(()),
            None => Err(MetricsError::NoCounter),
        }
    }

    /// Serialize all counters to a JSON string.
    pub fn export_metrics(&self) -> IoStatsResult<String> {
        serde_json::to_string(self).map_err(MetricsError::Serialize)
    }

    /// Record the wall-clock end of a prefetch run and accumulate how long
    /// it took since `begin_time`. Either half is skipped if the relevant
    /// clock comparison fails (clock moved backwards).
    pub fn calculate_prefetch_metrics(&self, begin_time: SystemTime) {
        let now = SystemTime::now();
        if let Ok(since_epoch) = now.duration_since(SystemTime::UNIX_EPOCH) {
            self.prefetch_end_time_secs.set(since_epoch.as_secs());
            self.prefetch_end_time_millis
                .set(u64::from(since_epoch.subsec_millis()));
        }
        if let Ok(cost) = now.duration_since(begin_time) {
            self.prefetch_cumulative_time_millis
                .add(saturating_duration_millis(&cost));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Exhaustively check the bucket boundaries of `request_size_index`.
    #[test]
    fn test_request_size_index() {
        assert_eq!(request_size_index(0x0), 0);
        assert_eq!(request_size_index(0x3ff), 0);
        assert_eq!(request_size_index(0x400), 1);
        assert_eq!(request_size_index(0xfff), 1);
        assert_eq!(request_size_index(0x1000), 2);
        assert_eq!(request_size_index(0x3fff), 2);
        assert_eq!(request_size_index(0x4000), 3);
        assert_eq!(request_size_index(0xffff), 3);
        assert_eq!(request_size_index(0x1_0000), 4);
        assert_eq!(request_size_index(0x1_ffff), 4);
        assert_eq!(request_size_index(0x2_0000), 5);
        assert_eq!(request_size_index(0x7_ffff), 5);
        assert_eq!(request_size_index(0x8_0000), 6);
        assert_eq!(request_size_index(0xf_ffff), 6);
        assert_eq!(request_size_index(0x10_0000), 7);
        assert_eq!(request_size_index(usize::MAX), 7);
    }

    /// Verify reads land in the correct `block_count_read` bucket.
    #[test]
    fn test_block_read_count() {
        let g = FsIoStats::default();
        g.init();
        g.fop_update(StatsFop::Read, 4000, true);
        assert_eq!(g.block_count_read[1].count(), 1);
        g.fop_update(StatsFop::Read, 4096, true);
        assert_eq!(g.block_count_read[2].count(), 1);
        g.fop_update(StatsFop::Read, 65535, true);
        assert_eq!(g.block_count_read[3].count(), 1);
        g.fop_update(StatsFop::Read, 131072, true);
        assert_eq!(g.block_count_read[5].count(), 1);
        g.fop_update(StatsFop::Read, 65520, true);
        assert_eq!(g.block_count_read[3].count(), 2);
        g.fop_update(StatsFop::Read, 2015520, true);
        // A ~2 MiB read must land in the top bucket and leave the 16K-64K
        // bucket untouched; the original test only checked the latter, so it
        // never verified where the large read was actually counted.
        assert_eq!(g.block_count_read[3].count(), 2);
        assert_eq!(g.block_count_read[7].count(), 1);
    }
}