1use std::collections::{HashMap, HashSet};
14use std::ops::{Deref, Drop};
15use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex, RwLock};
17use std::time::{Duration, SystemTime};
18
19use nydus_api::http::MetricsError;
20
21use crate::logger::ErrorHolder;
22use crate::InodeBitmap;
23
24pub type Inode = u64;
26
27#[derive(PartialEq, Copy, Clone)]
29pub enum StatsFop {
30 Getattr,
31 Readlink,
32 Open,
33 Release,
34 Read,
35 Statfs,
36 Getxattr,
37 Listxattr,
38 Opendir,
39 Lookup,
40 Readdir,
41 Readdirplus,
42 Access,
43 Forget,
44 BatchForget,
45 Max,
46}
47
48type IoStatsResult<T> = Result<T, MetricsError>;
49
50const BLOCK_READ_SIZES_MAX: usize = 8;
54
55#[inline]
56fn request_size_index(size: usize) -> usize {
57 let ceil = (size >> 10).leading_zeros();
58 let shift = (std::cmp::max(ceil, 53) - 53) << 2;
59
60 (0x0112_2334_5567u64 >> shift) as usize & 0xf
61}
62
63const READ_LATENCY_RANGE_MAX: usize = 8;
65
66fn latency_millis_range_index(elapsed: u64) -> usize {
67 match elapsed {
68 _ if elapsed <= 1 => 0,
69 _ if elapsed <= 20 => 1,
70 _ if elapsed <= 50 => 2,
71 _ if elapsed <= 100 => 3,
72 _ if elapsed <= 500 => 4,
73 _ if elapsed <= 1000 => 5,
74 _ if elapsed <= 2000 => 6,
75 _ => 7,
76 }
77}
78
79fn latency_micros_range_index(elapsed: u64) -> usize {
81 match elapsed {
82 _ if elapsed <= 200 => 0,
83 _ if elapsed <= 1_000 => 1,
84 _ if elapsed <= 20_000 => 2,
85 _ if elapsed <= 50_000 => 3,
86 _ if elapsed <= 500_000 => 4,
87 _ if elapsed <= 1_000_000 => 5,
88 _ if elapsed <= 2_000_000 => 6,
89 _ => 7,
90 }
91}
92
93lazy_static! {
97 static ref FS_METRICS: RwLock<HashMap<String, Arc<FsIoStats>>> = Default::default();
98}
99
100lazy_static! {
101 static ref BACKEND_METRICS: RwLock<HashMap<String, Arc<BackendMetrics>>> = Default::default();
102}
103
104lazy_static! {
105 static ref BLOBCACHE_METRICS: RwLock<HashMap<String, Arc<BlobcacheMetrics>>> =
106 Default::default();
107}
108
109lazy_static! {
110 pub static ref ERROR_HOLDER: Arc<Mutex<ErrorHolder>> =
111 Arc::new(Mutex::new(ErrorHolder::new(500, 50 * 1024)));
112}
113
114pub trait InodeStatsCounter {
116 fn stats_fop_inc(&self, fop: StatsFop);
117 fn stats_fop_err_inc(&self, fop: StatsFop);
118 fn stats_cumulative(&self, fop: StatsFop, value: usize);
119}
120
121#[derive(Default, Debug, Serialize)]
123pub struct InodeIoStats {
124 total_fops: BasicMetric,
125 data_read: BasicMetric,
126 block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
128 fop_hits: [BasicMetric; StatsFop::Max as usize],
129 fop_errors: [BasicMetric; StatsFop::Max as usize],
130}
131
132impl InodeStatsCounter for InodeIoStats {
133 fn stats_fop_inc(&self, fop: StatsFop) {
134 self.fop_hits[fop as usize].inc();
135 self.total_fops.inc();
136 }
137
138 fn stats_fop_err_inc(&self, fop: StatsFop) {
139 self.fop_errors[fop as usize].inc();
140 }
141
142 fn stats_cumulative(&self, fop: StatsFop, value: usize) {
143 if fop == StatsFop::Read {
144 self.data_read.add(value as u64);
145 let idx = request_size_index(value);
148 self.block_count_read[idx].inc();
149 }
150 }
151}
152
153#[derive(Default, Debug, Serialize)]
166pub struct AccessPattern {
167 ino: u64,
168 nr_read: BasicMetric,
169 first_access_time_secs: AtomicU64,
171 first_access_time_nanos: AtomicU32,
172}
173
174impl AccessPattern {
175 fn record_access_time(&self) {
176 if self.first_access_time_secs.load(Ordering::Relaxed) == 0 {
177 let t = SystemTime::now()
178 .duration_since(SystemTime::UNIX_EPOCH)
179 .unwrap();
180 self.first_access_time_secs
181 .store(t.as_secs(), Ordering::Relaxed);
182 self.first_access_time_nanos
183 .store(t.subsec_nanos(), Ordering::Relaxed);
184 }
185 }
186}
187
188#[derive(Default, Debug, Serialize)]
192pub struct FsIoStats {
193 files_account_enabled: AtomicBool,
197 access_pattern_enabled: AtomicBool,
198 record_latest_read_files_enabled: AtomicBool,
199 measure_latency: AtomicBool,
201
202 id: String,
203 nr_opens: BasicMetric,
205 data_read: BasicMetric,
207 block_count_read: [BasicMetric; BLOCK_READ_SIZES_MAX],
209 fop_hits: [BasicMetric; StatsFop::Max as usize],
211 fop_errors: [BasicMetric; StatsFop::Max as usize],
213
214 fop_cumulative_latency_total: [BasicMetric; StatsFop::Max as usize],
219 read_latency_dist: [BasicMetric; READ_LATENCY_RANGE_MAX],
222
223 #[serde(skip_serializing, skip_deserializing)]
225 file_counters: RwLock<HashMap<Inode, Arc<InodeIoStats>>>,
226 #[serde(skip_serializing, skip_deserializing)]
227 access_patterns: RwLock<HashMap<Inode, Arc<AccessPattern>>>,
228 #[serde(skip_serializing, skip_deserializing)]
230 recent_read_files: InodeBitmap,
231}
232
233macro_rules! impl_iostat_option {
234 ($get:ident, $set:ident, $opt:ident) => {
235 #[inline]
236 fn $get(&self) -> bool {
237 self.$opt.load(Ordering::Relaxed)
238 }
239
240 #[inline]
241 pub fn $set(&self, switch: bool) {
242 self.$opt.store(switch, Ordering::Relaxed)
243 }
244 };
245}
246
247impl FsIoStats {
248 pub fn new(id: &str) -> Arc<FsIoStats> {
250 let c = Arc::new(FsIoStats {
251 id: id.to_string(),
252 ..Default::default()
253 });
254 FS_METRICS
255 .write()
256 .unwrap()
257 .insert(id.to_string(), c.clone());
258 c.init();
259 c
260 }
261
262 pub fn init(&self) {
264 self.files_account_enabled.store(false, Ordering::Relaxed);
265 self.measure_latency.store(true, Ordering::Relaxed);
266 }
267
268 impl_iostat_option!(files_enabled, toggle_files_recording, files_account_enabled);
269 impl_iostat_option!(
270 access_pattern_enabled,
271 toggle_access_pattern,
272 access_pattern_enabled
273 );
274 impl_iostat_option!(
275 record_latest_read_files_enabled,
276 toggle_latest_read_files_recording,
277 record_latest_read_files_enabled
278 );
279
280 pub fn new_file_counter(&self, ino: Inode) {
282 if self.files_enabled() {
283 let mut counters = self.file_counters.write().unwrap();
284 if counters.get(&ino).is_none() {
285 counters.insert(ino, Arc::new(InodeIoStats::default()));
286 }
287 }
288
289 if self.access_pattern_enabled() {
290 let mut records = self.access_patterns.write().unwrap();
291 if records.get(&ino).is_none() {
292 records.insert(
293 ino,
294 Arc::new(AccessPattern {
295 ino,
296 ..Default::default()
297 }),
298 );
299 }
300 }
301 }
302
303 fn file_stats_update(&self, ino: Inode, fop: StatsFop, bsize: usize, success: bool) {
304 self.fop_update(fop, bsize, success);
305
306 if self.files_enabled() {
307 let counters = self.file_counters.read().unwrap();
308 match counters.get(&ino) {
309 Some(c) => {
310 c.stats_fop_inc(fop);
311 c.stats_cumulative(fop, bsize);
312 }
313 None => warn!("No iostats counter for file {}", ino),
314 }
315 }
316
317 if self.access_pattern_enabled() && fop == StatsFop::Read {
318 let records = self.access_patterns.read().unwrap();
319 match records.get(&ino) {
320 Some(r) => {
321 r.nr_read.inc();
322 r.record_access_time();
323 }
324 None => warn!("No pattern record for file {}", ino),
325 }
326 }
327
328 if self.record_latest_read_files_enabled() && fop == StatsFop::Read && success {
329 self.recent_read_files.set(ino);
330 }
331 }
332
333 fn fop_update(&self, fop: StatsFop, value: usize, success: bool) {
334 if fop == StatsFop::Read {
338 let idx = request_size_index(value);
339 self.block_count_read[idx].inc()
340 }
341
342 if success {
343 self.fop_hits[fop as usize].inc();
344 match fop {
345 StatsFop::Read => self.data_read.add(value as u64),
346 StatsFop::Open => self.nr_opens.inc(),
347 StatsFop::Release => self.nr_opens.dec(),
348 _ => (),
349 };
350 } else {
351 self.fop_errors[fop as usize].inc();
352 }
353 }
354
355 pub fn latency_start(&self) -> Option<SystemTime> {
357 if !self.measure_latency.load(Ordering::Relaxed) {
358 return None;
359 }
360
361 Some(SystemTime::now())
362 }
363
364 pub fn latency_end(&self, start: &Option<SystemTime>, fop: StatsFop) {
366 if let Some(start) = start {
367 if let Ok(d) = SystemTime::elapsed(start) {
368 let elapsed = saturating_duration_micros(&d);
369 self.read_latency_dist[latency_micros_range_index(elapsed)].inc();
370 self.fop_cumulative_latency_total[fop as usize].add(elapsed);
371 }
372 }
373 }
374
375 fn export_files_stats(&self) -> Result<String, MetricsError> {
376 serde_json::to_string(
377 self.file_counters
378 .read()
379 .expect("Not expect poisoned lock")
380 .deref(),
381 )
382 .map_err(MetricsError::Serialize)
383 }
384
385 fn export_latest_read_files(&self) -> String {
386 serde_json::json!(self.recent_read_files.bitmap_to_array_and_clear()).to_string()
387 }
388
389 fn export_files_access_patterns(&self) -> Result<String, MetricsError> {
390 serde_json::to_string(
391 &self
392 .access_patterns
393 .read()
394 .expect("Not poisoned lock")
395 .deref()
396 .values()
397 .filter(|r| r.nr_read.count() != 0)
398 .collect::<Vec<&Arc<AccessPattern>>>(),
399 )
400 .map_err(MetricsError::Serialize)
401 }
402
403 fn export_fs_stats(&self) -> Result<String, MetricsError> {
404 serde_json::to_string(self).map_err(MetricsError::Serialize)
405 }
406}
407
408pub struct FopRecorder<'a> {
415 fop: StatsFop,
416 inode: u64,
417 success: bool,
418 size: usize,
420 ios: &'a FsIoStats,
421}
422
423impl Drop for FopRecorder<'_> {
424 fn drop(&mut self) {
425 self.ios
426 .file_stats_update(self.inode, self.fop, self.size, self.success);
427 }
428}
429
430impl<'a> FopRecorder<'a> {
431 pub fn settle<T>(fop: StatsFop, inode: u64, ios: &'a T) -> Self
433 where
434 T: AsRef<FsIoStats>,
435 {
436 FopRecorder {
437 fop,
438 inode,
439 success: false,
440 size: 0,
441 ios: ios.as_ref(),
442 }
443 }
444
445 pub fn mark_success(&mut self, size: usize) {
447 self.success = true;
448 self.size = size;
449 }
450}
451
452pub fn export_files_stats(
454 name: &Option<String>,
455 latest_read_files: bool,
456) -> Result<String, MetricsError> {
457 let fs_metrics = FS_METRICS.read().unwrap();
458
459 match name {
460 Some(k) => fs_metrics.get(k).ok_or(MetricsError::NoCounter).map(|v| {
461 if !latest_read_files {
462 v.export_files_stats()
463 } else {
464 Ok(v.export_latest_read_files())
465 }
466 })?,
467 None => {
468 if fs_metrics.len() == 1 {
469 if let Some(ios) = fs_metrics.values().next() {
470 return if !latest_read_files {
471 ios.export_files_stats()
472 } else {
473 Ok(ios.export_latest_read_files())
474 };
475 }
476 }
477 Err(MetricsError::NoCounter)
478 }
479 }
480}
481
482pub fn export_files_access_pattern(name: &Option<String>) -> Result<String, MetricsError> {
484 let fs_metrics = FS_METRICS.read().unwrap();
485 match name {
486 Some(k) => fs_metrics
487 .get(k)
488 .ok_or(MetricsError::NoCounter)
489 .map(|v| v.export_files_access_patterns())?,
490 None => {
491 if fs_metrics.len() == 1 {
492 if let Some(ios) = fs_metrics.values().next() {
493 return ios.export_files_access_patterns();
494 }
495 }
496 Err(MetricsError::NoCounter)
497 }
498 }
499}
500
501pub fn export_global_stats(name: &Option<String>) -> Result<String, MetricsError> {
503 let fs_metrics = FS_METRICS.read().unwrap();
505
506 match name {
507 Some(k) => fs_metrics
508 .get(k)
509 .ok_or(MetricsError::NoCounter)
510 .map(|v| v.export_fs_stats())?,
511 None => {
512 if fs_metrics.len() == 1 {
513 if let Some(ios) = fs_metrics.values().next() {
514 return ios.export_fs_stats();
515 }
516 }
517 Err(MetricsError::NoCounter)
518 }
519 }
520}
521
522pub fn export_backend_metrics(name: &Option<String>) -> IoStatsResult<String> {
524 let metrics = BACKEND_METRICS.read().unwrap();
525
526 match name {
527 Some(k) => metrics
528 .get(k)
529 .ok_or(MetricsError::NoCounter)
530 .map(|v| v.export_metrics())?,
531 None => {
532 if metrics.len() == 1 {
533 if let Some(m) = metrics.values().next() {
534 return m.export_metrics();
535 }
536 }
537 Err(MetricsError::NoCounter)
538 }
539 }
540}
541
542pub fn export_blobcache_metrics(id: &Option<String>) -> IoStatsResult<String> {
544 let metrics = BLOBCACHE_METRICS.read().unwrap();
545
546 match id {
547 Some(k) => metrics
548 .get(k)
549 .ok_or(MetricsError::NoCounter)
550 .map(|v| v.export_metrics())?,
551 None => {
552 if metrics.len() == 1 {
553 if let Some(m) = metrics.values().next() {
554 return m.export_metrics();
555 }
556 }
557 Err(MetricsError::NoCounter)
558 }
559 }
560}
561
562pub fn export_events() -> IoStatsResult<String> {
564 serde_json::to_string(ERROR_HOLDER.lock().unwrap().deref()).map_err(MetricsError::Serialize)
565}
566
567pub trait Metric {
569 fn add(&self, value: u64);
571 fn inc(&self) {
573 self.add(1);
574 }
575 fn count(&self) -> u64;
577 fn sub(&self, value: u64);
579 fn dec(&self) {
581 self.sub(1);
582 }
583
584 fn set(&self, value: u64);
585}
586
587#[derive(Default, Serialize, Debug)]
589pub struct BasicMetric(AtomicU64);
590
591impl Metric for BasicMetric {
592 fn add(&self, value: u64) {
593 self.0.fetch_add(value, Ordering::Relaxed);
594 }
595
596 fn count(&self) -> u64 {
597 self.0.load(Ordering::Relaxed)
598 }
599
600 fn sub(&self, value: u64) {
601 self.0.fetch_sub(value, Ordering::Relaxed);
602 }
603
604 fn set(&self, value: u64) {
605 self.0.store(value, Ordering::Relaxed);
606 }
607}
608
609#[derive(Default, Serialize, Debug)]
611pub struct BackendMetrics {
612 #[serde(skip_serializing, skip_deserializing)]
613 id: String,
614 backend_type: String,
616 read_count: BasicMetric,
618 read_errors: BasicMetric,
620 read_amount_total: BasicMetric,
623 read_cumulative_latency_millis_total: BasicMetric,
625 read_cumulative_latency_millis_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
626 read_count_block_size_dist: [BasicMetric; BLOCK_READ_SIZES_MAX],
627 read_latency_sizes_dist: [[BasicMetric; READ_LATENCY_RANGE_MAX]; BLOCK_READ_SIZES_MAX],
629}
630
631impl BackendMetrics {
632 pub fn new(id: &str, backend_type: &str) -> Arc<Self> {
634 let backend_metrics = Arc::new(Self {
635 id: id.to_string(),
636 backend_type: backend_type.to_string(),
637 ..Default::default()
638 });
639
640 BACKEND_METRICS
641 .write()
642 .unwrap()
643 .insert(id.to_string(), backend_metrics.clone());
644
645 backend_metrics
646 }
647
648 pub fn release(&self) -> IoStatsResult<()> {
650 BACKEND_METRICS
651 .write()
652 .unwrap()
653 .remove(&self.id)
654 .map(|_| ())
655 .ok_or(MetricsError::NoCounter)
656 }
657
658 pub fn begin(&self) -> SystemTime {
660 SystemTime::now()
661 }
662
663 pub fn end(&self, begin: &SystemTime, size: usize, error: bool) {
665 if let Ok(d) = SystemTime::elapsed(begin) {
666 let elapsed = saturating_duration_millis(&d);
667
668 self.read_count.inc();
669 if error {
670 self.read_errors.inc();
671 }
672
673 self.read_cumulative_latency_millis_total.add(elapsed);
674 self.read_amount_total.add(size as u64);
675 let lat_idx = latency_millis_range_index(elapsed);
676 let size_idx = request_size_index(size);
677 self.read_cumulative_latency_millis_dist[size_idx].add(elapsed);
678 self.read_count_block_size_dist[size_idx].inc();
679 self.read_latency_sizes_dist[size_idx][lat_idx].inc();
680 }
681 }
682
683 fn export_metrics(&self) -> IoStatsResult<String> {
684 serde_json::to_string(self).map_err(MetricsError::Serialize)
685 }
686}
687
688fn saturating_duration_millis(d: &Duration) -> u64 {
690 let d_secs = d.as_secs();
691 if d_secs == 0 {
692 d.subsec_millis() as u64
693 } else {
694 d_secs
695 .saturating_mul(1000)
696 .saturating_add(d.subsec_millis() as u64)
697 }
698}
699
700fn saturating_duration_micros(d: &Duration) -> u64 {
701 let d_secs = d.as_secs();
702 if d_secs == 0 {
703 d.subsec_micros() as u64
704 } else {
705 d_secs
706 .saturating_mul(1_000_000)
707 .saturating_add(d.subsec_micros() as u64)
708 }
709}
710
711#[derive(Debug, Default, Serialize)]
712pub struct BlobcacheMetrics {
713 #[serde(skip_serializing, skip_deserializing)]
714 id: String,
715 pub underlying_files: Mutex<HashSet<String>>,
720 pub store_path: String,
721 pub partial_hits: BasicMetric,
723 pub whole_hits: BasicMetric,
724 pub total: BasicMetric,
727 pub entries_count: BasicMetric,
730 pub prefetch_data_amount: BasicMetric,
734 pub prefetch_requests_count: BasicMetric,
736 pub prefetch_workers: AtomicUsize,
737 pub prefetch_unmerged_chunks: BasicMetric,
738 pub prefetch_cumulative_time_millis: BasicMetric,
743 pub prefetch_begin_time_secs: BasicMetric,
747 pub prefetch_begin_time_millis: BasicMetric,
749 pub prefetch_end_time_secs: BasicMetric,
751 pub prefetch_end_time_millis: BasicMetric,
753 pub buffered_backend_size: BasicMetric,
754 pub data_all_ready: AtomicBool,
755}
756
757impl BlobcacheMetrics {
758 pub fn new(id: &str, store_path: &str) -> Arc<Self> {
760 let metrics = Arc::new(Self {
761 id: id.to_string(),
762 store_path: store_path.to_string(),
763 ..Default::default()
764 });
765
766 BLOBCACHE_METRICS
770 .write()
771 .unwrap()
772 .insert(id.to_string(), metrics.clone());
773
774 metrics
775 }
776
777 pub fn release(&self) -> IoStatsResult<()> {
779 BLOBCACHE_METRICS
780 .write()
781 .unwrap()
782 .remove(&self.id)
783 .map(|_| ())
784 .ok_or(MetricsError::NoCounter)
785 }
786
787 pub fn export_metrics(&self) -> IoStatsResult<String> {
789 serde_json::to_string(self).map_err(MetricsError::Serialize)
790 }
791
792 pub fn calculate_prefetch_metrics(&self, begin_time: SystemTime) {
793 let now = SystemTime::now();
794 if let Ok(ref t) = now.duration_since(SystemTime::UNIX_EPOCH) {
795 self.prefetch_end_time_secs.set(t.as_secs());
796 self.prefetch_end_time_millis.set(t.subsec_millis() as u64);
797 }
798 if let Ok(ref t) = now.duration_since(begin_time) {
799 let elapsed = saturating_duration_millis(t);
800 self.prefetch_cumulative_time_millis.add(elapsed);
801 }
802 }
803}
804
805#[cfg(test)]
806mod tests {
807 use super::*;
808
809 #[test]
810 fn test_request_size_index() {
811 assert_eq!(request_size_index(0x0), 0);
812 assert_eq!(request_size_index(0x3ff), 0);
813 assert_eq!(request_size_index(0x400), 1);
814 assert_eq!(request_size_index(0xfff), 1);
815 assert_eq!(request_size_index(0x1000), 2);
816 assert_eq!(request_size_index(0x3fff), 2);
817 assert_eq!(request_size_index(0x4000), 3);
818 assert_eq!(request_size_index(0xffff), 3);
819 assert_eq!(request_size_index(0x1_0000), 4);
820 assert_eq!(request_size_index(0x1_ffff), 4);
821 assert_eq!(request_size_index(0x2_0000), 5);
822 assert_eq!(request_size_index(0x7_ffff), 5);
823 assert_eq!(request_size_index(0x8_0000), 6);
824 assert_eq!(request_size_index(0xf_ffff), 6);
825 assert_eq!(request_size_index(0x10_0000), 7);
826 assert_eq!(request_size_index(usize::MAX), 7);
827 }
828
829 #[test]
830 fn test_block_read_count() {
831 let g = FsIoStats::default();
832 g.init();
833 g.fop_update(StatsFop::Read, 4000, true);
834 assert_eq!(g.block_count_read[1].count(), 1);
835
836 g.fop_update(StatsFop::Read, 4096, true);
837 assert_eq!(g.block_count_read[2].count(), 1);
838
839 g.fop_update(StatsFop::Read, 65535, true);
840 assert_eq!(g.block_count_read[3].count(), 1);
841
842 g.fop_update(StatsFop::Read, 131072, true);
843 assert_eq!(g.block_count_read[5].count(), 1);
844
845 g.fop_update(StatsFop::Read, 65520, true);
846 assert_eq!(g.block_count_read[3].count(), 2);
847
848 g.fop_update(StatsFop::Read, 2015520, true);
849 assert_eq!(g.block_count_read[3].count(), 2);
850 }
851
852 #[test]
853 fn test_latency_millis_range_index() {
854 assert_eq!(latency_millis_range_index(0), 0);
855 assert_eq!(latency_millis_range_index(1), 0);
856 assert_eq!(latency_millis_range_index(10), 1);
857 assert_eq!(latency_millis_range_index(20), 1);
858 assert_eq!(latency_millis_range_index(40), 2);
859 assert_eq!(latency_millis_range_index(80), 3);
860 assert_eq!(latency_millis_range_index(160), 4);
861 assert_eq!(latency_millis_range_index(320), 4);
862 assert_eq!(latency_millis_range_index(640), 5);
863 assert_eq!(latency_millis_range_index(1280), 6);
864 assert_eq!(latency_millis_range_index(2560), 7);
865 }
866
867 #[test]
868 fn test_latency_micros_range_index() {
869 assert_eq!(latency_micros_range_index(100), 0);
870 assert_eq!(latency_micros_range_index(500), 1);
871 assert_eq!(latency_micros_range_index(10_000), 2);
872 assert_eq!(latency_micros_range_index(30_000), 3);
873 assert_eq!(latency_micros_range_index(100_000), 4);
874 assert_eq!(latency_micros_range_index(1_000_000), 5);
875 assert_eq!(latency_micros_range_index(1_500_000), 6);
876 assert_eq!(latency_micros_range_index(3_000_000), 7);
877 }
878
879 #[test]
880 fn test_inode_stats() {
881 let stat = InodeIoStats::default();
882 stat.stats_fop_inc(StatsFop::Read);
883 stat.stats_fop_inc(StatsFop::Open);
884 assert_eq!(stat.fop_hits[StatsFop::Read as usize].count(), 1);
885 assert_eq!(stat.total_fops.count(), 2);
886
887 stat.stats_cumulative(StatsFop::Open, 1000);
888 stat.stats_cumulative(StatsFop::Read, 4000);
889 stat.stats_cumulative(StatsFop::Read, 5000);
890
891 assert_eq!(stat.block_count_read[0].count(), 0);
892 assert_eq!(stat.block_count_read[1].count(), 1);
893 assert_eq!(stat.block_count_read[2].count(), 1);
894 }
895
896 #[test]
897 fn test_access_pattern() {
898 let ap = AccessPattern::default();
899 ap.record_access_time();
900 assert_ne!(ap.first_access_time_secs.load(Ordering::Relaxed), 0);
901 assert_ne!(ap.first_access_time_nanos.load(Ordering::Relaxed), 0);
902 }
903
904 #[test]
905 fn test_file_stats_update() {
906 let f = FsIoStats::default();
907 let node1: Inode = 1;
908 let node2: Inode = 2;
909 let node3: Inode = 3;
910
911 f.new_file_counter(node1);
912 f.new_file_counter(node2);
913 assert!(f.access_patterns.read().unwrap().is_empty());
914 assert!(f.file_counters.read().unwrap().is_empty());
915
916 f.access_pattern_enabled.store(true, Ordering::Relaxed);
917 f.files_account_enabled.store(true, Ordering::Relaxed);
918 f.record_latest_read_files_enabled
919 .store(true, Ordering::Relaxed);
920 f.new_file_counter(node1);
921 f.new_file_counter(node2);
922 f.file_stats_update(node1, StatsFop::Read, 4000, true);
923 f.file_stats_update(node1, StatsFop::Read, 5000, true);
924 f.file_stats_update(node1, StatsFop::Open, 0, true);
925 f.file_stats_update(node3, StatsFop::Open, 0, true);
926 assert_eq!(
927 f.access_patterns
928 .read()
929 .unwrap()
930 .get(&node1)
931 .unwrap()
932 .nr_read
933 .count(),
934 2
935 );
936 assert_eq!(
937 f.file_counters
938 .read()
939 .unwrap()
940 .get(&node1)
941 .unwrap()
942 .fop_hits[StatsFop::Read as usize]
943 .count(),
944 2
945 );
946 assert!(f.recent_read_files.is_set(node1 as u64));
947 }
948
949 #[test]
950 fn test_fop_update() {
951 let f = FsIoStats::default();
952 assert_eq!(f.nr_opens.count(), 0);
953 f.fop_update(StatsFop::Open, 0, true);
954 assert_eq!(f.nr_opens.count(), 1);
955 f.fop_update(StatsFop::Release, 0, true);
956 assert_eq!(f.nr_opens.count(), 0);
957 f.fop_update(StatsFop::Opendir, 0, true);
958 assert_eq!(f.fop_errors[StatsFop::Opendir as usize].count(), 0);
959 f.fop_update(StatsFop::Opendir, 0, false);
960 assert_eq!(f.fop_errors[StatsFop::Opendir as usize].count(), 1);
961 }
962
963 #[test]
964 fn test_latecny() {
965 let f = FsIoStats::default();
966 assert_eq!(f.latency_start(), None);
967 f.measure_latency.store(true, Ordering::Relaxed);
968 let s = f.latency_start().unwrap();
969 let d = Duration::new(1, 500_000_000);
970 f.latency_end(&s.checked_sub(d), StatsFop::Read);
972 assert_eq!(
973 f.read_latency_dist[latency_micros_range_index(1_500_000)].count(),
974 1
975 );
976 assert!(
978 f.fop_cumulative_latency_total[StatsFop::Read as usize].count()
979 - saturating_duration_micros(&d)
980 <= 1000
981 );
982 }
983
984 #[test]
985 fn test_fs_io_stats_new_and_export() {
986 let id0: Option<String> = Some("id-0".to_string());
987 let id1: Option<String> = Some("id-1".to_string());
988 let none: Option<String> = None;
989
990 let _f1 = FsIoStats::new("id-0");
991 assert!(export_files_stats(&id0, true).is_ok());
992 assert!(export_files_stats(&none, true).is_ok());
993 assert!(export_global_stats(&id0).is_ok());
994 assert!(export_global_stats(&id1).is_err());
995 assert!(export_global_stats(&none).is_ok());
996
997 let _f2 = FsIoStats::new("id-1");
998 assert!(export_files_stats(&none, false).is_err());
999 assert!(export_files_stats(&id0, true).is_ok());
1000 assert!(export_files_stats(&id0, false).is_ok());
1001 assert!(export_global_stats(&none).is_err());
1002 assert!(export_files_access_pattern(&id0).is_ok());
1003 assert!(export_files_access_pattern(&none).is_err());
1004
1005 let ios = FsIoStats::default();
1006 assert!(ios.export_files_access_patterns().is_ok());
1007 assert!(ios.export_files_stats().is_ok());
1008 assert!(ios.export_fs_stats().is_ok());
1009 ios.export_latest_read_files();
1010
1011 test_fop_record();
1012 }
1013
1014 fn test_fop_record() {
1015 let ios = FsIoStats::new("0");
1016 let mut recorder = FopRecorder::settle(StatsFop::Read, 0, &ios);
1017 assert!(!recorder.success);
1018 assert_eq!(recorder.size, 0);
1019
1020 recorder.mark_success(10);
1021 assert!(recorder.success);
1022 assert_eq!(recorder.size, 10);
1023 drop(recorder);
1024 }
1025
1026 #[test]
1027 fn test_saturating_duration() {
1028 assert_eq!(
1029 saturating_duration_millis(&Duration::from_millis(1234)),
1030 1234
1031 );
1032 assert_eq!(
1033 saturating_duration_micros(&Duration::from_millis(888)),
1034 888_000
1035 );
1036 assert_eq!(
1037 saturating_duration_micros(&Duration::from_millis(1888)),
1038 1_888_000
1039 );
1040 }
1041
1042 #[test]
1043 fn test_blob_cache_metric() {
1044 let m1: Arc<BlobcacheMetrics> = BlobcacheMetrics::new("id", "path");
1045 {
1046 let metrics = BLOBCACHE_METRICS.read().unwrap();
1047 assert_eq!(metrics.len(), 1);
1048 }
1049 assert!(m1.export_metrics().is_ok());
1050 assert!(m1.release().is_ok());
1051 {
1052 let metrics = BLOBCACHE_METRICS.read().unwrap();
1053 assert_eq!(metrics.len(), 0);
1054 }
1055
1056 let now = SystemTime::now();
1057 let prev = now.checked_sub(Duration::new(10, 0)).unwrap();
1058 m1.calculate_prefetch_metrics(prev);
1059 assert_eq!(m1.prefetch_cumulative_time_millis.count(), 10_000);
1060 assert_eq!(
1061 m1.prefetch_end_time_secs.count(),
1062 now.duration_since(SystemTime::UNIX_EPOCH)
1063 .expect("No error")
1064 .as_secs()
1065 );
1066
1067 let id0: Option<String> = Some("id-0".to_string());
1068 let none: Option<String> = None;
1069 BlobcacheMetrics::new("id-0", "t0");
1070 assert!(export_blobcache_metrics(&id0).is_ok());
1071 assert!(export_blobcache_metrics(&none).is_ok());
1072 BlobcacheMetrics::new("id-1", "t1");
1073 assert!(export_blobcache_metrics(&none).is_err());
1074 assert!(export_events().is_ok());
1075 }
1076
1077 #[test]
1078 fn test_backend_metric() {
1079 let id0: Option<String> = Some("id-0".to_string());
1080 let id1: Option<String> = Some("id-1".to_string());
1081 let none: Option<String> = None;
1082 let b0 = BackendMetrics::new("id-0", "t0");
1083 assert!(export_backend_metrics(&id0).is_ok());
1084 assert!(export_backend_metrics(&id1).is_err());
1085 assert!(export_backend_metrics(&none).is_ok());
1086 let b1 = BackendMetrics::new("id-1", "t1");
1087 assert!(export_backend_metrics(&id0).is_ok());
1088 assert!(export_backend_metrics(&id1).is_ok());
1089 assert!(export_backend_metrics(&none).is_err());
1090 assert!(b0.release().is_ok());
1091 assert!(b1.release().is_ok());
1092 }
1093}