rusty_leveldb/
disk_env.rs

1use crate::env::{path_to_str, Env, FileLock, Logger, RandomAccess};
2use crate::env_common::{micros, sleep_for};
3use crate::error::{err, Result, Status, StatusCode};
4use crate::types::{share, Shared};
5use fs2::FileExt;
6
7use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{self, ErrorKind, Read, Write};
10use std::iter::FromIterator;
11use std::path::{Path, PathBuf};
12
/// Alias for a 32-bit signed integer; presumably meant to name a raw file descriptor.
// NOTE(review): nothing in the visible part of this file refers to this alias — confirm whether
// it is still needed before relying on it.
type FileDescriptor = i32;
14
15#[derive(Clone)]
16pub struct PosixDiskEnv {
17    locks: Shared<HashMap<String, File>>,
18}
19
20impl Default for PosixDiskEnv {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl PosixDiskEnv {
27    pub fn new() -> PosixDiskEnv {
28        PosixDiskEnv {
29            locks: share(HashMap::new()),
30        }
31    }
32}
33
34/// map_err_with_name annotates an io::Error with information about the operation and the file.
35fn map_err_with_name(method: &'static str, f: &Path, e: io::Error) -> Status {
36    let mut s = Status::from(e);
37    s.err = format!("{}: {}: {}", method, s.err, path_to_str(f));
38    s
39}
40
41// Note: We're using Ok(f()?) in several locations below in order to benefit from the automatic
42// error conversion using std::convert::From.
43impl Env for PosixDiskEnv {
44    fn open_sequential_file(&self, p: &Path) -> Result<Box<dyn Read>> {
45        Ok(Box::new(
46            fs::OpenOptions::new()
47                .read(true)
48                .open(p)
49                .map_err(|e| map_err_with_name("open (seq)", p, e))?,
50        ))
51    }
52    fn open_random_access_file(&self, p: &Path) -> Result<Box<dyn RandomAccess>> {
53        fs::OpenOptions::new()
54            .read(true)
55            .open(p)
56            .map(|f| {
57                let b: Box<dyn RandomAccess> = Box::new(f);
58                b
59            })
60            .map_err(|e| map_err_with_name("open (randomaccess)", p, e))
61    }
62    fn open_writable_file(&self, p: &Path) -> Result<Box<dyn Write>> {
63        Ok(Box::new(
64            fs::OpenOptions::new()
65                .create(true)
66                .truncate(true)
67                .write(true)
68                .append(false)
69                .open(p)
70                .map_err(|e| map_err_with_name("open (write)", p, e))?,
71        ))
72    }
73    fn open_appendable_file(&self, p: &Path) -> Result<Box<dyn Write>> {
74        Ok(Box::new(
75            fs::OpenOptions::new()
76                .create(true)
77                .append(true)
78                .open(p)
79                .map_err(|e| map_err_with_name("open (append)", p, e))?,
80        ))
81    }
82
83    fn exists(&self, p: &Path) -> Result<bool> {
84        Ok(p.exists())
85    }
86    fn children(&self, p: &Path) -> Result<Vec<PathBuf>> {
87        let dir_reader = fs::read_dir(p).map_err(|e| map_err_with_name("children", p, e))?;
88        let filenames = dir_reader
89            .map(|r| match r {
90                Ok(_) => {
91                    let direntry = r.unwrap();
92                    Path::new(&direntry.file_name()).to_owned()
93                }
94                Err(_) => Path::new("").to_owned(),
95            })
96            .filter(|s| !s.as_os_str().is_empty());
97        Ok(Vec::from_iter(filenames))
98    }
99    fn size_of(&self, p: &Path) -> Result<usize> {
100        let meta = fs::metadata(p).map_err(|e| map_err_with_name("size_of", p, e))?;
101        Ok(meta.len() as usize)
102    }
103
104    fn delete(&self, p: &Path) -> Result<()> {
105        fs::remove_file(p).map_err(|e| map_err_with_name("delete", p, e))
106    }
107    fn mkdir(&self, p: &Path) -> Result<()> {
108        fs::create_dir_all(p).map_err(|e| map_err_with_name("mkdir", p, e))
109    }
110    fn rmdir(&self, p: &Path) -> Result<()> {
111        fs::remove_dir_all(p).map_err(|e| map_err_with_name("rmdir", p, e))
112    }
113    fn rename(&self, old: &Path, new: &Path) -> Result<()> {
114        fs::rename(old, new).map_err(|e| map_err_with_name("rename", old, e))
115    }
116
117    fn lock(&self, p: &Path) -> Result<FileLock> {
118        let mut locks = self.locks.borrow_mut();
119
120        if let std::collections::hash_map::Entry::Vacant(e) =
121            locks.entry(p.to_str().unwrap().to_string())
122        {
123            let f = fs::OpenOptions::new()
124                .write(true)
125                .create(true)
126                .truncate(true)
127                .open(p)
128                .map_err(|e| map_err_with_name("lock", p, e))?;
129
130            match f.try_lock_exclusive() {
131                Err(err) if err.kind() == ErrorKind::WouldBlock => {
132                    return Err(Status::new(
133                        StatusCode::LockError,
134                        "lock on database is already held by different process",
135                    ))
136                }
137                Err(_) => {
138                    return Err(Status::new(
139                        StatusCode::Errno(errno::errno()),
140                        &format!("unknown lock error on file {:?} (file {})", f, p.display()),
141                    ))
142                }
143                _ => (),
144            };
145
146            e.insert(f);
147            let lock = FileLock {
148                id: p.to_str().unwrap().to_string(),
149            };
150            Ok(lock)
151        } else {
152            Err(Status::new(StatusCode::AlreadyExists, "Lock is held"))
153        }
154    }
155    fn unlock(&self, l: FileLock) -> Result<()> {
156        let mut locks = self.locks.borrow_mut();
157        if !locks.contains_key(&l.id) {
158            err(
159                StatusCode::LockError,
160                &format!("unlocking a file that is not locked: {}", l.id),
161            )
162        } else {
163            let f = locks.remove(&l.id).unwrap();
164            if f.unlock().is_err() {
165                return err(StatusCode::LockError, &format!("unlock failed: {}", l.id));
166            }
167            Ok(())
168        }
169    }
170
171    fn new_logger(&self, p: &Path) -> Result<Logger> {
172        self.open_appendable_file(p)
173            .map(|dst| Logger::new(Box::new(dst)))
174    }
175
176    fn micros(&self) -> u64 {
177        micros()
178    }
179
180    fn sleep_for(&self, micros: u32) {
181        sleep_for(micros);
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;

    use std::io::Write;

    /// Exercises create, exists, size_of, delete, write, rename and the read-only open calls
    /// on a scratch file in the current working directory.
    #[test]
    fn test_files() {
        let name = Path::new("testfile.xyz");
        let env = PosixDiskEnv::new();

        // exists, size_of, delete
        assert!(env.open_appendable_file(name).is_ok());
        assert!(env.exists(name).unwrap_or(false));
        assert_eq!(env.size_of(name).unwrap_or(1), 0);
        assert!(env.delete(name).is_ok());

        assert!(env.open_writable_file(name).is_ok());
        assert!(env.exists(name).unwrap_or(false));
        assert_eq!(env.size_of(name).unwrap_or(1), 0);
        assert!(env.delete(name).is_ok());

        {
            // write; `write_all` fails the test on an I/O error or a short write.
            let mut f = env.open_writable_file(name).unwrap();
            f.write_all(b"123xyz").unwrap();
            assert_eq!(6, env.size_of(name).unwrap_or(0));

            // rename
            let newname = Path::new("testfile2.xyz");
            assert!(env.rename(name, newname).is_ok());
            assert_eq!(6, env.size_of(newname).unwrap());
            assert!(!env.exists(name).unwrap());
            // rename back so that the remaining checks can use the file.
            assert!(env.rename(newname, name).is_ok());
        }

        assert!(env.open_sequential_file(name).is_ok());
        assert!(env.open_random_access_file(name).is_ok());

        assert!(env.delete(name).is_ok());
    }

    /// Checks that a lock can be taken and released, and that a second `lock()` on the same
    /// path fails while the first lock is held.
    #[test]
    fn test_locking() {
        let env = PosixDiskEnv::new();
        let name = Path::new("testfile.123");

        {
            let mut f = env.open_writable_file(name).unwrap();
            f.write_all(b"123xyz").unwrap();
            assert_eq!(env.size_of(name).unwrap_or(0), 6);
        }

        {
            let r = env.lock(name);
            assert!(r.is_ok());
            env.unlock(r.unwrap()).unwrap();
        }

        {
            let r = env.lock(name);
            assert!(r.is_ok());
            let s = env.lock(name);
            assert!(s.is_err());
            env.unlock(r.unwrap()).unwrap();
        }

        assert!(env.delete(name).is_ok());
    }

    /// Creates a directory, puts one file into it, and checks `children` and `rmdir`.
    #[test]
    fn test_dirs() {
        let dirname = Path::new("subdir/");
        let env = PosixDiskEnv::new();

        assert!(env.mkdir(dirname).is_ok());
        assert!(env.open_writable_file(&dirname.join("f1.txt")).is_ok());
        assert_eq!(env.children(dirname).unwrap().len(), 1);
        assert!(env.rmdir(dirname).is_ok());
    }
}