Skip to main content

hermes_core/directories/
mmap.rs

1//! Memory-mapped directory for efficient access to large indices
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::io;
6use std::ops::Range;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use memmap2::Mmap;
12
13use super::{
14    Directory, DirectoryWriter, FileSlice, FileStreamingWriter, LazyFileHandle, OwnedBytes,
15    RangeReadFn, StreamingWriter,
16};
17
/// Memory-mapped directory for efficient access to large index files.
///
/// Paths given to the [`Directory`] / [`DirectoryWriter`] methods are joined onto the root
/// passed to [`MmapDirectory::new`].
///
/// Range reads (`read_range`, `open_lazy`) memory-map the file and copy out only the requested
/// bytes, so the OS decides which pages are resident. This makes the type suitable for indices
/// larger than RAM:
/// - a range read does not load the whole file
/// - the OS page cache handles caching and paging
/// - multiple processes can share the same cached pages
/// - random access is cheap
///
/// `open_read` is the exception: it returns the entire file as an owned buffer.
///
/// Writes use regular file I/O. There is no application-level cache; the OS page cache is
/// relied on instead.
#[derive(Debug)]
pub struct MmapDirectory {
    /// Base path that every relative path is joined onto.
    root: PathBuf,
}
34
35impl MmapDirectory {
36    /// Create a new MmapDirectory rooted at the given path
37    pub fn new(root: impl AsRef<Path>) -> Self {
38        Self {
39            root: root.as_ref().to_path_buf(),
40        }
41    }
42
43    fn resolve(&self, path: &Path) -> PathBuf {
44        self.root.join(path)
45    }
46
47    /// Memory-map a file (no application cache - OS page cache handles this)
48    fn mmap_file(&self, path: &Path) -> io::Result<Arc<Mmap>> {
49        let full_path = self.resolve(path);
50        let file = std::fs::File::open(&full_path)?;
51        let mmap = unsafe { Mmap::map(&file)? };
52        Ok(Arc::new(mmap))
53    }
54}
55
56impl Clone for MmapDirectory {
57    fn clone(&self) -> Self {
58        Self {
59            root: self.root.clone(),
60        }
61    }
62}
63
64#[async_trait]
65impl Directory for MmapDirectory {
66    async fn exists(&self, path: &Path) -> io::Result<bool> {
67        let full_path = self.resolve(path);
68        Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
69    }
70
71    async fn file_size(&self, path: &Path) -> io::Result<u64> {
72        let full_path = self.resolve(path);
73        let metadata = tokio::fs::metadata(&full_path).await?;
74        Ok(metadata.len())
75    }
76
77    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
78        let mmap = self.mmap_file(path)?;
79        // Copy data - mmap will be dropped after this, OS page cache handles rest
80        let bytes = mmap.to_vec();
81        Ok(FileSlice::new(OwnedBytes::new(bytes)))
82    }
83
84    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
85        let mmap = self.mmap_file(path)?;
86        let start = range.start as usize;
87        let end = range.end as usize;
88
89        if end > mmap.len() {
90            return Err(io::Error::new(
91                io::ErrorKind::InvalidInput,
92                format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
93            ));
94        }
95
96        Ok(OwnedBytes::new(mmap[start..end].to_vec()))
97    }
98
99    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
100        let full_path = self.resolve(prefix);
101        let mut entries = tokio::fs::read_dir(&full_path).await?;
102        let mut files = Vec::new();
103
104        while let Some(entry) = entries.next_entry().await? {
105            if entry.file_type().await?.is_file() {
106                files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
107            }
108        }
109
110        Ok(files)
111    }
112
113    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
114        let mmap = self.mmap_file(path)?;
115        let file_size = mmap.len() as u64;
116
117        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
118            let mmap = Arc::clone(&mmap);
119            Box::pin(async move {
120                let start = range.start as usize;
121                let end = range.end as usize;
122
123                if end > mmap.len() {
124                    return Err(io::Error::new(
125                        io::ErrorKind::InvalidInput,
126                        format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
127                    ));
128                }
129
130                // Hint the OS to prefetch these pages before the memcpy
131                #[cfg(unix)]
132                let _ = mmap.advise_range(memmap2::Advice::WillNeed, start, end - start);
133
134                Ok(OwnedBytes::new(mmap[start..end].to_vec()))
135            })
136        });
137
138        Ok(LazyFileHandle::new(file_size, read_fn))
139    }
140}
141
142#[async_trait]
143impl DirectoryWriter for MmapDirectory {
144    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
145        let full_path = self.resolve(path);
146
147        // Ensure parent directory exists
148        if let Some(parent) = full_path.parent() {
149            tokio::fs::create_dir_all(parent).await?;
150        }
151
152        tokio::fs::write(&full_path, data).await
153    }
154
155    async fn delete(&self, path: &Path) -> io::Result<()> {
156        let full_path = self.resolve(path);
157        tokio::fs::remove_file(&full_path).await
158    }
159
160    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
161        let from_path = self.resolve(from);
162        let to_path = self.resolve(to);
163        tokio::fs::rename(&from_path, &to_path).await
164    }
165
166    async fn sync(&self) -> io::Result<()> {
167        // fsync the directory
168        let dir = std::fs::File::open(&self.root)?;
169        dir.sync_all()?;
170        Ok(())
171    }
172
173    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
174        let full_path = self.resolve(path);
175        if let Some(parent) = full_path.parent() {
176            tokio::fs::create_dir_all(parent).await?;
177        }
178        let file = std::fs::File::create(&full_path)?;
179        Ok(Box::new(FileStreamingWriter { file, written: 0 }))
180    }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_mmap_directory_basic() {
        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());

        // Write a file
        let test_data = b"Hello, mmap world!";
        dir.write(Path::new("test.txt"), test_data).await.unwrap();

        // Check exists
        assert!(dir.exists(Path::new("test.txt")).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());

        // Check file size
        assert_eq!(
            dir.file_size(Path::new("test.txt")).await.unwrap(),
            test_data.len() as u64
        );

        // Read full file
        let slice = dir.open_read(Path::new("test.txt")).await.unwrap();
        let bytes = slice.read_bytes().await.unwrap();
        assert_eq!(bytes.as_slice(), test_data);

        // Read range
        let range_bytes = dir.read_range(Path::new("test.txt"), 7..12).await.unwrap();
        assert_eq!(range_bytes.as_slice(), b"mmap ");
    }

    #[tokio::test]
    async fn test_mmap_directory_lazy_handle() {
        use crate::directories::AsyncFileRead;

        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());

        // Write a larger file
        let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
        dir.write(Path::new("large.bin"), &data).await.unwrap();

        // Open lazy handle
        let handle = dir.open_lazy(Path::new("large.bin")).await.unwrap();
        assert_eq!(handle.len(), 1000);

        // Read ranges
        let range1 = handle.read_bytes_range(0..100).await.unwrap();
        assert_eq!(range1.len(), 100);
        assert_eq!(range1.as_slice(), &data[0..100]);

        let range2 = handle.read_bytes_range(500..600).await.unwrap();
        assert_eq!(range2.as_slice(), &data[500..600]);
    }

    // A range extending past EOF must be reported as InvalidInput, not panic.
    #[tokio::test]
    async fn test_mmap_directory_read_range_out_of_bounds() {
        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());
        dir.write(Path::new("small.bin"), b"0123456789").await.unwrap();

        match dir.read_range(Path::new("small.bin"), 5..20).await {
            Ok(_) => panic!("range past end of file should be rejected"),
            Err(err) => assert_eq!(err.kind(), io::ErrorKind::InvalidInput),
        }
    }

    // list_files is non-recursive, skips directories, and returns root-relative paths.
    #[tokio::test]
    async fn test_mmap_directory_list_files() {
        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());
        dir.write(Path::new("a.bin"), b"1").await.unwrap();
        dir.write(Path::new("b.bin"), b"2").await.unwrap();
        dir.write(Path::new("sub/c.bin"), b"3").await.unwrap();

        let mut top = dir.list_files(Path::new("")).await.unwrap();
        top.sort();
        assert_eq!(top, vec![PathBuf::from("a.bin"), PathBuf::from("b.bin")]);

        let nested = dir.list_files(Path::new("sub")).await.unwrap();
        assert_eq!(nested, vec![PathBuf::from("sub").join("c.bin")]);
    }

    #[tokio::test]
    async fn test_mmap_directory_rename_and_delete() {
        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());

        dir.write(Path::new("nested/old.bin"), b"abc").await.unwrap();
        dir.rename(Path::new("nested/old.bin"), Path::new("nested/new.bin"))
            .await
            .unwrap();
        assert!(!dir.exists(Path::new("nested/old.bin")).await.unwrap());
        assert!(dir.exists(Path::new("nested/new.bin")).await.unwrap());

        let moved = dir.read_range(Path::new("nested/new.bin"), 0..3).await.unwrap();
        assert_eq!(moved.as_slice(), b"abc");

        dir.delete(Path::new("nested/new.bin")).await.unwrap();
        assert!(!dir.exists(Path::new("nested/new.bin")).await.unwrap());
    }

    #[tokio::test]
    async fn test_mmap_directory_missing_file_errors() {
        let temp_dir = TempDir::new().unwrap();
        let dir = MmapDirectory::new(temp_dir.path());

        assert!(dir.open_read(Path::new("missing.bin")).await.is_err());
        assert!(dir.read_range(Path::new("missing.bin"), 0..1).await.is_err());
        assert!(dir.file_size(Path::new("missing.bin")).await.is_err());
    }
}