hermes_core/directories/
mmap.rs1use std::io;
6use std::ops::Range;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use memmap2::Mmap;
12
13use super::{
14 Directory, DirectoryWriter, FileHandle, FileStreamingWriter, OwnedBytes, StreamingWriter,
15};
16
/// Filesystem-backed directory whose files are read through memory maps.
///
/// All paths passed to the directory API are resolved relative to `root`.
#[derive(Debug)]
pub struct MmapDirectory {
    /// Base path that every relative file path is joined onto.
    root: PathBuf,
}
33
34impl MmapDirectory {
35 pub fn new(root: impl AsRef<Path>) -> Self {
37 Self {
38 root: root.as_ref().to_path_buf(),
39 }
40 }
41
42 pub fn root(&self) -> &Path {
44 &self.root
45 }
46
47 fn resolve(&self, path: &Path) -> PathBuf {
48 self.root.join(path)
49 }
50
51 fn mmap_file(&self, path: &Path) -> io::Result<Arc<Mmap>> {
53 let full_path = self.resolve(path);
54 let file = std::fs::File::open(&full_path)?;
55 let mmap = unsafe { Mmap::map(&file)? };
56 Ok(Arc::new(mmap))
57 }
58}
59
60impl Clone for MmapDirectory {
61 fn clone(&self) -> Self {
62 Self {
63 root: self.root.clone(),
64 }
65 }
66}
67
68#[async_trait]
69impl Directory for MmapDirectory {
70 async fn exists(&self, path: &Path) -> io::Result<bool> {
71 let full_path = self.resolve(path);
72 Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
73 }
74
75 async fn file_size(&self, path: &Path) -> io::Result<u64> {
76 let full_path = self.resolve(path);
77 let metadata = tokio::fs::metadata(&full_path).await?;
78 Ok(metadata.len())
79 }
80
81 async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
82 let mmap = self.mmap_file(path)?;
83 Ok(FileHandle::from_bytes(OwnedBytes::from_mmap(mmap)))
85 }
86
87 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
88 let mmap = self.mmap_file(path)?;
89 let start = range.start as usize;
90 let end = range.end as usize;
91
92 if end > mmap.len() {
93 return Err(io::Error::new(
94 io::ErrorKind::InvalidInput,
95 format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
96 ));
97 }
98
99 Ok(OwnedBytes::from_mmap_range(mmap, start..end))
101 }
102
103 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
104 let full_path = self.resolve(prefix);
105 let mut entries = tokio::fs::read_dir(&full_path).await?;
106 let mut files = Vec::new();
107
108 while let Some(entry) = entries.next_entry().await? {
109 if entry.file_type().await?.is_file() {
110 files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
111 }
112 }
113
114 Ok(files)
115 }
116
117 async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
118 self.open_read(path).await
121 }
122}
123
124#[async_trait]
125impl DirectoryWriter for MmapDirectory {
126 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
127 let full_path = self.resolve(path);
128
129 if let Some(parent) = full_path.parent() {
131 tokio::fs::create_dir_all(parent).await?;
132 }
133
134 tokio::fs::write(&full_path, data).await
135 }
136
137 async fn delete(&self, path: &Path) -> io::Result<()> {
138 let full_path = self.resolve(path);
139 tokio::fs::remove_file(&full_path).await
140 }
141
142 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
143 let from_path = self.resolve(from);
144 let to_path = self.resolve(to);
145 tokio::fs::rename(&from_path, &to_path).await
146 }
147
148 async fn sync(&self) -> io::Result<()> {
149 let dir = std::fs::File::open(&self.root)?;
151 dir.sync_all()?;
152 Ok(())
153 }
154
155 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
156 let full_path = self.resolve(path);
157 if let Some(parent) = full_path.parent() {
158 tokio::fs::create_dir_all(parent).await?;
159 }
160 let file = std::fs::File::create(&full_path)?;
161 Ok(Box::new(FileStreamingWriter::new(file)))
162 }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Round-trips a small file through write, exists, file_size, open_read
    /// and read_range.
    #[tokio::test]
    async fn test_mmap_directory_basic() {
        let tmp = TempDir::new().unwrap();
        let directory = MmapDirectory::new(tmp.path());
        let file_name = Path::new("test.txt");
        let payload = b"Hello, mmap world!";

        directory.write(file_name, payload).await.unwrap();

        // Existence checks for a present and an absent file.
        let present = directory.exists(file_name).await.unwrap();
        assert!(present);
        let missing = directory
            .exists(Path::new("nonexistent.txt"))
            .await
            .unwrap();
        assert!(!missing);

        // Reported size matches the number of bytes written.
        let size = directory.file_size(file_name).await.unwrap();
        assert_eq!(size, payload.len() as u64);

        // Full read through the handle returns the original bytes.
        let handle = directory.open_read(file_name).await.unwrap();
        let contents = handle.read_bytes().await.unwrap();
        assert_eq!(contents.as_slice(), payload);

        // Sub-range read returns exactly the requested window.
        let window = directory.read_range(file_name, 7..12).await.unwrap();
        assert_eq!(window.as_slice(), b"mmap ");
    }

    /// Checks that a lazily opened handle supports both async and sync range
    /// reads.
    #[tokio::test]
    async fn test_mmap_directory_lazy_handle() {
        let tmp = TempDir::new().unwrap();
        let directory = MmapDirectory::new(tmp.path());
        let file_name = Path::new("large.bin");

        // 1000 bytes cycling through 0..=255.
        let expected: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
        directory.write(file_name, &expected).await.unwrap();

        let handle = directory.open_lazy(file_name).await.unwrap();
        assert_eq!(handle.len(), 1000);
        assert!(handle.is_sync());

        let head = handle.read_bytes_range(0..100).await.unwrap();
        assert_eq!(head.len(), 100);
        assert_eq!(head.as_slice(), &expected[0..100]);

        let middle = handle.read_bytes_range_sync(500..600).unwrap();
        assert_eq!(middle.as_slice(), &expected[500..600]);
    }
}