hermes_core/directories/
mmap.rs1use std::io;
6use std::ops::Range;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use memmap2::Mmap;
12
13use super::{
14 Directory, DirectoryWriter, FileSlice, FileStreamingWriter, LazyFileHandle, OwnedBytes,
15 RangeReadFn, StreamingWriter,
16};
17
/// Directory backend rooted at a path on the local filesystem.
///
/// The value itself only stores the root path; every path passed to its
/// methods is joined onto that root before touching the filesystem.
#[derive(Debug)]
pub struct MmapDirectory {
    /// Base path that relative paths are resolved against.
    root: PathBuf,
}
34
35impl MmapDirectory {
36 pub fn new(root: impl AsRef<Path>) -> Self {
38 Self {
39 root: root.as_ref().to_path_buf(),
40 }
41 }
42
43 fn resolve(&self, path: &Path) -> PathBuf {
44 self.root.join(path)
45 }
46
47 fn mmap_file(&self, path: &Path) -> io::Result<Arc<Mmap>> {
49 let full_path = self.resolve(path);
50 let file = std::fs::File::open(&full_path)?;
51 let mmap = unsafe { Mmap::map(&file)? };
52 Ok(Arc::new(mmap))
53 }
54}
55
56impl Clone for MmapDirectory {
57 fn clone(&self) -> Self {
58 Self {
59 root: self.root.clone(),
60 }
61 }
62}
63
64#[async_trait]
65impl Directory for MmapDirectory {
66 async fn exists(&self, path: &Path) -> io::Result<bool> {
67 let full_path = self.resolve(path);
68 Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
69 }
70
71 async fn file_size(&self, path: &Path) -> io::Result<u64> {
72 let full_path = self.resolve(path);
73 let metadata = tokio::fs::metadata(&full_path).await?;
74 Ok(metadata.len())
75 }
76
77 async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
78 let mmap = self.mmap_file(path)?;
79 let bytes = mmap.to_vec();
81 Ok(FileSlice::new(OwnedBytes::new(bytes)))
82 }
83
84 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
85 let mmap = self.mmap_file(path)?;
86 let start = range.start as usize;
87 let end = range.end as usize;
88
89 if end > mmap.len() {
90 return Err(io::Error::new(
91 io::ErrorKind::InvalidInput,
92 format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
93 ));
94 }
95
96 Ok(OwnedBytes::new(mmap[start..end].to_vec()))
97 }
98
99 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
100 let full_path = self.resolve(prefix);
101 let mut entries = tokio::fs::read_dir(&full_path).await?;
102 let mut files = Vec::new();
103
104 while let Some(entry) = entries.next_entry().await? {
105 if entry.file_type().await?.is_file() {
106 files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
107 }
108 }
109
110 Ok(files)
111 }
112
113 async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
114 let mmap = self.mmap_file(path)?;
115 let file_size = mmap.len() as u64;
116
117 let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
118 let mmap = Arc::clone(&mmap);
119 Box::pin(async move {
120 let start = range.start as usize;
121 let end = range.end as usize;
122
123 if end > mmap.len() {
124 return Err(io::Error::new(
125 io::ErrorKind::InvalidInput,
126 format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
127 ));
128 }
129
130 #[cfg(unix)]
132 let _ = mmap.advise_range(memmap2::Advice::WillNeed, start, end - start);
133
134 Ok(OwnedBytes::new(mmap[start..end].to_vec()))
135 })
136 });
137
138 Ok(LazyFileHandle::new(file_size, read_fn))
139 }
140}
141
142#[async_trait]
143impl DirectoryWriter for MmapDirectory {
144 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
145 let full_path = self.resolve(path);
146
147 if let Some(parent) = full_path.parent() {
149 tokio::fs::create_dir_all(parent).await?;
150 }
151
152 tokio::fs::write(&full_path, data).await
153 }
154
155 async fn delete(&self, path: &Path) -> io::Result<()> {
156 let full_path = self.resolve(path);
157 tokio::fs::remove_file(&full_path).await
158 }
159
160 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
161 let from_path = self.resolve(from);
162 let to_path = self.resolve(to);
163 tokio::fs::rename(&from_path, &to_path).await
164 }
165
166 async fn sync(&self) -> io::Result<()> {
167 let dir = std::fs::File::open(&self.root)?;
169 dir.sync_all()?;
170 Ok(())
171 }
172
173 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
174 let full_path = self.resolve(path);
175 if let Some(parent) = full_path.parent() {
176 tokio::fs::create_dir_all(parent).await?;
177 }
178 let file = std::fs::File::create(&full_path)?;
179 Ok(Box::new(FileStreamingWriter { file, written: 0 }))
180 }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Round-trips a small file through write, exists, file_size, open_read and read_range.
    #[tokio::test]
    async fn test_mmap_directory_basic() {
        let scratch = TempDir::new().unwrap();
        let dir = MmapDirectory::new(scratch.path());
        let name = Path::new("test.txt");
        let payload = b"Hello, mmap world!";

        // Write the fixture.
        dir.write(name, payload).await.unwrap();

        // Existence is reported for the written file only.
        let present = dir.exists(name).await.unwrap();
        assert!(present);
        let absent = dir.exists(Path::new("nonexistent.txt")).await.unwrap();
        assert!(!absent);

        // Size matches the payload length.
        let size = dir.file_size(name).await.unwrap();
        assert_eq!(size, payload.len() as u64);

        // Full read returns the payload unchanged.
        let slice = dir.open_read(name).await.unwrap();
        let whole = slice.read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), payload);

        // Partial read returns exactly the requested window.
        let window = dir.read_range(name, 7..12).await.unwrap();
        assert_eq!(window.as_slice(), b"mmap ");
    }

    /// Reads two disjoint ranges through a lazy handle and checks them against the source data.
    #[tokio::test]
    async fn test_mmap_directory_lazy_handle() {
        use crate::directories::AsyncFileRead;

        let scratch = TempDir::new().unwrap();
        let dir = MmapDirectory::new(scratch.path());
        let name = Path::new("large.bin");

        // 1000 bytes cycling through 0..=255.
        let data: Vec<u8> = (0u8..=255).cycle().take(1000).collect();
        dir.write(name, &data).await.unwrap();

        let handle = dir.open_lazy(name).await.unwrap();
        assert_eq!(handle.len(), 1000);

        let head = handle.read_bytes_range(0..100).await.unwrap();
        assert_eq!(head.len(), 100);
        assert_eq!(head.as_slice(), &data[0..100]);

        let middle = handle.read_bytes_range(500..600).await.unwrap();
        assert_eq!(middle.as_slice(), &data[500..600]);
    }
}