warm_fs/
lib.rs

//! A file system warmer.
2//!
//! Cloud providers tend to restore volumes from snapshots in a cold state:
4//!
5//! > For volumes that were created from snapshots, the storage blocks must be pulled down from
6//! Amazon S3 and written to the volume before you can access them. This preliminary action takes
7//! time and can cause a significant increase in the latency of I/O operations the first time
8//! each block is accessed ([source][ebs-initialize]).
9//!
//! This crate provides methods to estimate the total size of a set of folders and files, and
//! then to recursively read those files in a thread pool so their blocks are fetched up front.
12//!
//! [`Warmer::iter_estimate`] and [`Warmer::iter_warm`] return an `Iterator` that gives access to
//! the intermediate state of the process. Refer to the [cli example] for a progress bar
//! implementation.
15//!
16//! [ebs-initialize]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-initialize.html
17//! [cli example]: https://github.com/imbolc/warm-fs/blob/main/examples/cli.rs
18
19#![warn(clippy::all, missing_docs, nonstandard_style, future_incompatible)]
20
21use std::fs::File;
22use std::io::{self, Read};
23use std::path::{Path, PathBuf};
24use std::sync::mpsc::{channel, Receiver, Sender};
25use threadpool::ThreadPool;
26use walkdir::WalkDir;
27
/// A file system warmer.
///
/// Collects directories (walked recursively) and individual files, then either estimates their
/// total size or reads them through a thread pool.
#[derive(Debug, Clone, Default)]
pub struct Warmer {
    /// Directories to walk recursively.
    dirs: Vec<PathBuf>,
    /// Files to read directly, without any directory traversal.
    files: Vec<PathBuf>,
    /// Requested number of worker threads for the thread pool.
    num_threads: usize,
    /// Whether symbolic links are followed while walking directories.
    follow_links: bool,
}
36
/// Iterator over byte counts produced by a running size estimation or warm-up.
///
/// Items are received from background workers through a channel. Each call to `next` blocks
/// until the next count arrives, and iteration ends once every sending side has been dropped.
#[derive(Debug)]
pub struct Iter {
    /// Receiving end of the channel the workers report byte counts on.
    rx: Receiver<u64>,
}
41
42impl Warmer {
43    /// Creates a new warmer
44    pub fn new(num_threads: usize, follow_links: bool) -> Self {
45        Self {
46            num_threads,
47            follow_links,
48            ..Default::default()
49        }
50    }
51
52    /// Adds folders to walk recursively
53    pub fn add_dirs(&mut self, paths: &[impl AsRef<Path>]) {
54        let mut paths: Vec<_> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
55        self.dirs.append(&mut paths);
56    }
57
58    /// Adds files directly to avoid folders traversal
59    pub fn add_files(&mut self, paths: &[impl AsRef<Path>]) {
60        let mut paths: Vec<_> = paths.iter().map(|p| p.as_ref().to_path_buf()).collect();
61        self.files.append(&mut paths);
62    }
63
64    /// Estimates total size to read, returns the total number of bytes
65    pub fn estimate(&self) -> u64 {
66        self.iter_estimate().sum()
67    }
68
69    /// Read files, returns the total number of read bytes
70    pub fn warm(&self) -> u64 {
71        self.iter_warm().sum()
72    }
73
74    /// Estimates total size to read, returns an iterator over file sizes
75    pub fn iter_estimate(&self) -> Iter {
76        let (tx, rx) = channel();
77        let dirs = self.dirs.clone();
78        let files = self.files.clone();
79        let num_threads = self.num_threads;
80        let follow_links = self.follow_links;
81        std::thread::spawn(move || {
82            let pool = ThreadPool::new(num_threads);
83            for file in files {
84                let tx = tx.clone();
85                pool.execute(move || {
86                    if let Ok(Some(file)) = resolve_file(file) {
87                        if let Ok(size) = file.metadata().map(|m| m.len()) {
88                            tx.send(size).ok();
89                        }
90                    }
91                });
92            }
93            for dir in dirs {
94                for entry in walker(dir, follow_links) {
95                    let tx = tx.clone();
96                    pool.execute(move || {
97                        if let Ok(size) = entry.metadata().map(|m| m.len()) {
98                            tx.send(size).ok();
99                        }
100                    });
101                }
102            }
103        });
104        Iter { rx }
105    }
106
107    /// Reads files, returns an iterator over the read number of bytes
108    pub fn iter_warm(&self) -> Iter {
109        let (tx, rx) = channel();
110        let dirs = self.dirs.clone();
111        let files = self.files.clone();
112        let num_threads = self.num_threads;
113        let follow_links = self.follow_links;
114        std::thread::spawn(move || {
115            let pool = ThreadPool::new(num_threads);
116            for file in files {
117                let tx = tx.clone();
118                pool.execute(move || {
119                    if let Ok(Some(file)) = resolve_file(file) {
120                        warm_file(file, tx);
121                    }
122                });
123            }
124            for dir in dirs {
125                for entry in walker(dir, follow_links) {
126                    let tx = tx.clone();
127                    pool.execute(move || warm_file(entry.path(), tx));
128                }
129            }
130        });
131        Iter { rx }
132    }
133}
134
/// Checks whether `path` points to a regular file, resolving a possible symlink.
///
/// Returns `Ok(Some(path))` for a regular file. For a symlink whose target is a regular file, it
/// returns `Ok(Some(target))` with the target canonicalized. Returns `Ok(None)` for anything
/// else: missing paths, directories, symlinks to directories and broken symlinks.
///
/// Note that `Path::is_file` already traverses symlinks. Checking `is_symlink` only after
/// `is_file` failed would therefore accept symlinks to directories, so the order here matters.
fn resolve_file(path: PathBuf) -> io::Result<Option<PathBuf>> {
    if !path.is_file() {
        return Ok(None);
    }
    if path.is_symlink() {
        path.canonicalize().map(Some)
    } else {
        Ok(Some(path))
    }
}
145
/// Warms a file by reading it to the end and sending the size of every successful read on `tx`.
///
/// The data itself is discarded; only the act of reading it matters. Errors are not reported:
/// - A file that cannot be opened sends nothing.
/// - A read error other than `ErrorKind::Interrupted` stops reading that file early.
/// - An interrupted read is simply retried.
///
/// Reading also stops as soon as the receiving side of `tx` has been dropped, since nobody is
/// consuming the progress anymore.
fn warm_file(path: impl AsRef<Path>, tx: Sender<u64>) {
    // 64 KiB per read cuts the number of read syscalls and channel messages per file by ~64x
    // compared with a 1 KiB buffer, without changing the total number of bytes reported.
    const BUFFER_SIZE: usize = 64 * 1024;

    let mut file = match File::open(path) {
        Ok(file) => file,
        Err(_) => return,
    };
    let mut buffer = vec![0u8; BUFFER_SIZE];
    loop {
        match file.read(&mut buffer) {
            Ok(0) => break,
            Ok(count) => {
                // `usize` is at most 64 bits on every supported target, so this cannot truncate.
                if tx.send(count as u64).is_err() {
                    break;
                }
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(_) => break,
        }
    }
}
159
160/// Initializes and returns a `walkdir::WalkDir` instance
161fn walker(path: impl AsRef<Path>, follow_links: bool) -> impl Iterator<Item = walkdir::DirEntry> {
162    let mut w = WalkDir::new(path);
163    if follow_links {
164        w = w.follow_links(true);
165    }
166    w.into_iter()
167        .filter_map(|e| e.ok())
168        .filter(|e| e.file_type().is_file())
169}
170
171impl Iterator for Iter {
172    type Item = u64;
173
174    /// Returns estimated / read number of bytes
175    fn next(&mut self) -> Option<Self::Item> {
176        self.rx.recv().ok()
177    }
178}