hermes_core/directories/
mmap.rs1use std::collections::HashMap;
6use std::io;
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use memmap2::Mmap;
13use parking_lot::RwLock;
14
15use super::{Directory, DirectoryWriter, FileSlice, LazyFileHandle, OwnedBytes, RangeReadFn};
16
17pub struct MmapDirectory {
30 root: PathBuf,
31 mmap_cache: Arc<RwLock<HashMap<PathBuf, Arc<Mmap>>>>,
33}
34
35impl MmapDirectory {
36 pub fn new(root: impl AsRef<Path>) -> Self {
38 Self {
39 root: root.as_ref().to_path_buf(),
40 mmap_cache: Arc::new(RwLock::new(HashMap::new())),
41 }
42 }
43
44 fn resolve(&self, path: &Path) -> PathBuf {
45 self.root.join(path)
46 }
47
48 async fn get_mmap(&self, path: &Path) -> io::Result<Arc<Mmap>> {
50 let full_path = self.resolve(path);
51
52 {
54 let cache = self.mmap_cache.read();
55 if let Some(mmap) = cache.get(&full_path) {
56 return Ok(Arc::clone(mmap));
57 }
58 }
59
60 let file = std::fs::File::open(&full_path)?;
62 let mmap = unsafe { Mmap::map(&file)? };
63 let mmap = Arc::new(mmap);
64
65 {
67 let mut cache = self.mmap_cache.write();
68 cache.insert(full_path, Arc::clone(&mmap));
69 }
70
71 Ok(mmap)
72 }
73
74 pub fn clear_cache(&self) {
76 self.mmap_cache.write().clear();
77 }
78
79 fn invalidate_cache(&self, path: &Path) {
81 let full_path = self.resolve(path);
82 self.mmap_cache.write().remove(&full_path);
83 }
84}
85
86impl Clone for MmapDirectory {
87 fn clone(&self) -> Self {
88 Self {
89 root: self.root.clone(),
90 mmap_cache: Arc::clone(&self.mmap_cache),
91 }
92 }
93}
94
95#[async_trait]
96impl Directory for MmapDirectory {
97 async fn exists(&self, path: &Path) -> io::Result<bool> {
98 let full_path = self.resolve(path);
99 Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
100 }
101
102 async fn file_size(&self, path: &Path) -> io::Result<u64> {
103 let full_path = self.resolve(path);
104 let metadata = tokio::fs::metadata(&full_path).await?;
105 Ok(metadata.len())
106 }
107
108 async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
109 let mmap = self.get_mmap(path).await?;
110 let bytes = mmap.to_vec(); Ok(FileSlice::new(OwnedBytes::new(bytes)))
114 }
115
116 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
117 let mmap = self.get_mmap(path).await?;
118 let start = range.start as usize;
119 let end = range.end as usize;
120
121 if end > mmap.len() {
122 return Err(io::Error::new(
123 io::ErrorKind::InvalidInput,
124 format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
125 ));
126 }
127
128 Ok(OwnedBytes::new(mmap[start..end].to_vec()))
129 }
130
131 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
132 let full_path = self.resolve(prefix);
133 let mut entries = tokio::fs::read_dir(&full_path).await?;
134 let mut files = Vec::new();
135
136 while let Some(entry) = entries.next_entry().await? {
137 if entry.file_type().await?.is_file() {
138 files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
139 }
140 }
141
142 Ok(files)
143 }
144
145 async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
146 let mmap = self.get_mmap(path).await?;
147 let file_size = mmap.len() as u64;
148
149 let mmap_clone = Arc::clone(&mmap);
151
152 let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
153 let mmap = Arc::clone(&mmap_clone);
154 Box::pin(async move {
155 let start = range.start as usize;
156 let end = range.end as usize;
157
158 if end > mmap.len() {
159 return Err(io::Error::new(
160 io::ErrorKind::InvalidInput,
161 format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
162 ));
163 }
164
165 Ok(OwnedBytes::new(mmap[start..end].to_vec()))
166 })
167 });
168
169 Ok(LazyFileHandle::new(file_size, read_fn))
170 }
171}
172
173#[async_trait]
174impl DirectoryWriter for MmapDirectory {
175 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
176 let full_path = self.resolve(path);
177
178 if let Some(parent) = full_path.parent() {
180 tokio::fs::create_dir_all(parent).await?;
181 }
182
183 self.invalidate_cache(path);
185
186 tokio::fs::write(&full_path, data).await
187 }
188
189 async fn delete(&self, path: &Path) -> io::Result<()> {
190 self.invalidate_cache(path);
192
193 let full_path = self.resolve(path);
194 tokio::fs::remove_file(&full_path).await
195 }
196
197 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
198 self.invalidate_cache(from);
200 self.invalidate_cache(to);
201
202 let from_path = self.resolve(from);
203 let to_path = self.resolve(to);
204 tokio::fs::rename(&from_path, &to_path).await
205 }
206
207 async fn sync(&self) -> io::Result<()> {
208 let dir = std::fs::File::open(&self.root)?;
210 dir.sync_all()?;
211 Ok(())
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates an `MmapDirectory` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well. The caller must keep it bound,
    /// because dropping it deletes the directory.
    fn fresh_dir() -> (TempDir, MmapDirectory) {
        let tmp = TempDir::new().unwrap();
        let dir = MmapDirectory::new(tmp.path());
        (tmp, dir)
    }

    #[tokio::test]
    async fn test_mmap_directory_basic() {
        let (_tmp, dir) = fresh_dir();
        let name = Path::new("test.txt");
        let payload = b"Hello, mmap world!";

        dir.write(name, payload).await.unwrap();

        // Existence checks for a written and a never-written file.
        assert!(dir.exists(name).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());

        let expected_len = payload.len() as u64;
        assert_eq!(dir.file_size(name).await.unwrap(), expected_len);

        // A whole-file read returns exactly what was written.
        let whole_slice = dir.open_read(name).await.unwrap();
        let whole = whole_slice.read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), payload);

        // A sub-range read returns just that window.
        let window = dir.read_range(name, 7..12).await.unwrap();
        assert_eq!(window.as_slice(), b"mmap ");
    }

    #[tokio::test]
    async fn test_mmap_directory_cache() {
        let (_tmp, dir) = fresh_dir();
        let name = Path::new("cached.txt");

        dir.write(name, b"cached content").await.unwrap();

        // Opening the same file twice must reuse a single cache entry.
        for _ in 0..2 {
            let _ = dir.open_read(name).await.unwrap();
        }
        assert_eq!(dir.mmap_cache.read().len(), 1);

        dir.clear_cache();
        assert_eq!(dir.mmap_cache.read().len(), 0);
    }

    #[tokio::test]
    async fn test_mmap_directory_lazy_handle() {
        use crate::directories::AsyncFileRead;

        let (_tmp, dir) = fresh_dir();
        let name = Path::new("large.bin");
        // 1000 bytes cycling through 0..=255.
        let data: Vec<u8> = (0..1000u32).map(|i| i as u8).collect();

        dir.write(name, &data).await.unwrap();

        let handle = dir.open_lazy(name).await.unwrap();
        assert_eq!(handle.len(), 1000);

        let head = handle.read_bytes_range(0..100).await.unwrap();
        assert_eq!(head.len(), 100);
        assert_eq!(head.as_slice(), &data[..100]);

        let middle = handle.read_bytes_range(500..600).await.unwrap();
        assert_eq!(middle.as_slice(), &data[500..600]);
    }
}