Skip to main content

hermes_core/directories/
mmap.rs

1//! Memory-mapped directory for efficient access to large indices
2//!
3//! This module is only compiled with the "native" feature.
4
5use std::collections::HashMap;
6use std::io;
7use std::ops::Range;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use memmap2::Mmap;
13use parking_lot::RwLock;
14
15use super::{Directory, DirectoryWriter, FileSlice, LazyFileHandle, OwnedBytes, RangeReadFn};
16
17/// Memory-mapped directory for efficient access to large index files
18///
19/// Uses memory-mapped files to avoid loading entire files into memory.
20/// The OS manages paging, making this ideal for indices larger than RAM.
21///
22/// Benefits:
23/// - Files are not fully loaded into memory
24/// - OS handles caching and paging automatically
25/// - Multiple processes can share the same mapped pages
26/// - Efficient random access patterns
27///
28/// Note: Write operations still use regular file I/O.
29pub struct MmapDirectory {
30    root: PathBuf,
31    /// Cache of memory-mapped files
32    mmap_cache: Arc<RwLock<HashMap<PathBuf, Arc<Mmap>>>>,
33}
34
35impl MmapDirectory {
36    /// Create a new MmapDirectory rooted at the given path
37    pub fn new(root: impl AsRef<Path>) -> Self {
38        Self {
39            root: root.as_ref().to_path_buf(),
40            mmap_cache: Arc::new(RwLock::new(HashMap::new())),
41        }
42    }
43
44    fn resolve(&self, path: &Path) -> PathBuf {
45        self.root.join(path)
46    }
47
48    /// Get or create a memory-mapped file
49    async fn get_mmap(&self, path: &Path) -> io::Result<Arc<Mmap>> {
50        let full_path = self.resolve(path);
51
52        // Check cache first
53        {
54            let cache = self.mmap_cache.read();
55            if let Some(mmap) = cache.get(&full_path) {
56                return Ok(Arc::clone(mmap));
57            }
58        }
59
60        // Create new mmap
61        let file = std::fs::File::open(&full_path)?;
62        let mmap = unsafe { Mmap::map(&file)? };
63        let mmap = Arc::new(mmap);
64
65        // Cache it
66        {
67            let mut cache = self.mmap_cache.write();
68            cache.insert(full_path, Arc::clone(&mmap));
69        }
70
71        Ok(mmap)
72    }
73
74    /// Clear the mmap cache (useful after writes)
75    pub fn clear_cache(&self) {
76        self.mmap_cache.write().clear();
77    }
78
79    /// Remove a specific file from the cache
80    fn invalidate_cache(&self, path: &Path) {
81        let full_path = self.resolve(path);
82        self.mmap_cache.write().remove(&full_path);
83    }
84}
85
86impl Clone for MmapDirectory {
87    fn clone(&self) -> Self {
88        Self {
89            root: self.root.clone(),
90            mmap_cache: Arc::clone(&self.mmap_cache),
91        }
92    }
93}
94
95#[async_trait]
96impl Directory for MmapDirectory {
97    async fn exists(&self, path: &Path) -> io::Result<bool> {
98        let full_path = self.resolve(path);
99        Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
100    }
101
102    async fn file_size(&self, path: &Path) -> io::Result<u64> {
103        let full_path = self.resolve(path);
104        let metadata = tokio::fs::metadata(&full_path).await?;
105        Ok(metadata.len())
106    }
107
108    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
109        let mmap = self.get_mmap(path).await?;
110        // Create OwnedBytes that references the mmap data
111        // The Arc<Mmap> keeps the mapping alive
112        let bytes = mmap.to_vec(); // Copy for now - could optimize with custom type
113        Ok(FileSlice::new(OwnedBytes::new(bytes)))
114    }
115
116    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
117        let mmap = self.get_mmap(path).await?;
118        let start = range.start as usize;
119        let end = range.end as usize;
120
121        if end > mmap.len() {
122            return Err(io::Error::new(
123                io::ErrorKind::InvalidInput,
124                format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
125            ));
126        }
127
128        Ok(OwnedBytes::new(mmap[start..end].to_vec()))
129    }
130
131    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
132        let full_path = self.resolve(prefix);
133        let mut entries = tokio::fs::read_dir(&full_path).await?;
134        let mut files = Vec::new();
135
136        while let Some(entry) = entries.next_entry().await? {
137            if entry.file_type().await?.is_file() {
138                files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
139            }
140        }
141
142        Ok(files)
143    }
144
145    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
146        let mmap = self.get_mmap(path).await?;
147        let file_size = mmap.len() as u64;
148
149        // Clone the mmap Arc for the closure
150        let mmap_clone = Arc::clone(&mmap);
151
152        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
153            let mmap = Arc::clone(&mmap_clone);
154            Box::pin(async move {
155                let start = range.start as usize;
156                let end = range.end as usize;
157
158                if end > mmap.len() {
159                    return Err(io::Error::new(
160                        io::ErrorKind::InvalidInput,
161                        format!("Range {}..{} exceeds file size {}", start, end, mmap.len()),
162                    ));
163                }
164
165                Ok(OwnedBytes::new(mmap[start..end].to_vec()))
166            })
167        });
168
169        Ok(LazyFileHandle::new(file_size, read_fn))
170    }
171}
172
173#[async_trait]
174impl DirectoryWriter for MmapDirectory {
175    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
176        let full_path = self.resolve(path);
177
178        // Ensure parent directory exists
179        if let Some(parent) = full_path.parent() {
180            tokio::fs::create_dir_all(parent).await?;
181        }
182
183        // Invalidate cache before writing
184        self.invalidate_cache(path);
185
186        tokio::fs::write(&full_path, data).await
187    }
188
189    async fn delete(&self, path: &Path) -> io::Result<()> {
190        // Invalidate cache before deleting
191        self.invalidate_cache(path);
192
193        let full_path = self.resolve(path);
194        tokio::fs::remove_file(&full_path).await
195    }
196
197    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
198        // Invalidate both paths
199        self.invalidate_cache(from);
200        self.invalidate_cache(to);
201
202        let from_path = self.resolve(from);
203        let to_path = self.resolve(to);
204        tokio::fs::rename(&from_path, &to_path).await
205    }
206
207    async fn sync(&self) -> io::Result<()> {
208        // fsync the directory
209        let dir = std::fs::File::open(&self.root)?;
210        dir.sync_all()?;
211        Ok(())
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A fresh temporary directory and an `MmapDirectory` rooted in it.
    ///
    /// The returned `TempDir` must be kept alive for as long as the directory
    /// is used; dropping it deletes the files.
    fn scratch_dir() -> (TempDir, MmapDirectory) {
        let tmp = TempDir::new().unwrap();
        let dir = MmapDirectory::new(tmp.path());
        (tmp, dir)
    }

    #[tokio::test]
    async fn test_mmap_directory_basic() {
        let (_tmp, dir) = scratch_dir();
        let file = Path::new("test.txt");
        let payload: &[u8] = b"Hello, mmap world!";

        dir.write(file, payload).await.unwrap();

        // Existence and size come straight from the filesystem.
        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());
        assert_eq!(dir.file_size(file).await.unwrap(), payload.len() as u64);

        // Whole-file read.
        let slice = dir.open_read(file).await.unwrap();
        let whole = slice.read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), payload);

        // Ranged read.
        let window = dir.read_range(file, 7..12).await.unwrap();
        assert_eq!(window.as_slice(), b"mmap ");
    }

    #[tokio::test]
    async fn test_mmap_directory_cache() {
        let (_tmp, dir) = scratch_dir();
        let file = Path::new("cached.txt");
        dir.write(file, b"cached content").await.unwrap();

        // Opening the same file twice must reuse a single cache entry.
        for _ in 0..2 {
            let _ = dir.open_read(file).await.unwrap();
        }
        assert_eq!(dir.mmap_cache.read().len(), 1);

        dir.clear_cache();
        assert_eq!(dir.mmap_cache.read().len(), 0);
    }

    #[tokio::test]
    async fn test_mmap_directory_lazy_handle() {
        use crate::directories::AsyncFileRead;

        let (_tmp, dir) = scratch_dir();
        let file = Path::new("large.bin");
        // 1000 bytes counting 0, 1, ..., 255, 0, 1, ...
        let data: Vec<u8> = (0..=255u8).cycle().take(1000).collect();
        dir.write(file, &data).await.unwrap();

        let handle = dir.open_lazy(file).await.unwrap();
        assert_eq!(handle.len(), 1000);

        let head = handle.read_bytes_range(0..100).await.unwrap();
        assert_eq!(head.len(), 100);
        assert_eq!(head.as_slice(), &data[..100]);

        let middle = handle.read_bytes_range(500..600).await.unwrap();
        assert_eq!(middle.as_slice(), &data[500..600]);
    }
}