// hermes_core/directories/directory.rs
1//! Async Directory abstraction for IO operations
2//!
3//! Supports network, local filesystem, and in-memory storage.
4//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
/// Callback type for lazy range reading.
///
/// Takes an absolute byte range within the underlying file and resolves to
/// the bytes. The future is boxed/pinned so the callback can be stored behind
/// a trait object; `Send + Sync` bounds let handles cross threads off-wasm.
#[cfg(not(target_arch = "wasm32"))]
pub type RangeReadFn = Arc<
    dyn Fn(
            Range<u64>,
        )
            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
        + Send
        + Sync,
>;
24
/// Callback type for lazy range reading (wasm variant).
///
/// Identical to the native alias except the closure and returned future need
/// no `Send`/`Sync` bounds: wasm execution is single-threaded.
#[cfg(target_arch = "wasm32")]
pub type RangeReadFn = Arc<
    dyn Fn(
        Range<u64>,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
>;
31
/// Unified file handle for both inline (mmap/RAM) and lazy (HTTP/filesystem) access.
///
/// Replaces the previous `FileSlice`, `LazyFileHandle`, and `LazyFileSlice` types.
/// - **Inline**: data is available synchronously (mmap, RAM). Sync reads via `read_bytes_range_sync`.
/// - **Lazy**: data is fetched on-demand via async callback (HTTP, filesystem).
///
/// Use `.slice()` to create sub-range views (zero-copy for Inline, offset-adjusted for Lazy).
/// Cloning is cheap for both variants (Arc-backed payloads).
#[derive(Clone)]
pub struct FileHandle {
    // Variant-specific state; see `FileHandleInner` for the two layouts.
    inner: FileHandleInner,
}
43
/// Internal representation of a [`FileHandle`].
///
/// Both variants describe a `[offset, offset + len)` window into the
/// underlying resource, which is how `slice()` produces cheap sub-views.
#[derive(Clone)]
enum FileHandleInner {
    /// Data available inline — sync reads possible (mmap, RAM)
    Inline {
        // Full backing buffer; the visible window is data[offset..offset + len].
        data: OwnedBytes,
        // Start of the window within `data`, in bytes.
        offset: u64,
        // Window length in bytes.
        len: u64,
    },
    /// Data fetched on-demand via async callback (HTTP, filesystem)
    Lazy {
        // Fetches absolute byte ranges from the underlying resource.
        read_fn: RangeReadFn,
        // Added to requested ranges before invoking `read_fn`.
        offset: u64,
        // Window length in bytes.
        len: u64,
    },
}
59
60impl std::fmt::Debug for FileHandle {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match &self.inner {
63            FileHandleInner::Inline { len, offset, .. } => f
64                .debug_struct("FileHandle::Inline")
65                .field("offset", offset)
66                .field("len", len)
67                .finish(),
68            FileHandleInner::Lazy { len, offset, .. } => f
69                .debug_struct("FileHandle::Lazy")
70                .field("offset", offset)
71                .field("len", len)
72                .finish(),
73        }
74    }
75}
76
77impl FileHandle {
78    /// Create an inline file handle from owned bytes (mmap, RAM).
79    /// Sync reads are available.
80    pub fn from_bytes(data: OwnedBytes) -> Self {
81        let len = data.len() as u64;
82        Self {
83            inner: FileHandleInner::Inline {
84                data,
85                offset: 0,
86                len,
87            },
88        }
89    }
90
91    /// Create an empty file handle.
92    pub fn empty() -> Self {
93        Self::from_bytes(OwnedBytes::empty())
94    }
95
96    /// Create a lazy file handle from an async range-read callback.
97    /// Only async reads are available.
98    pub fn lazy(len: u64, read_fn: RangeReadFn) -> Self {
99        Self {
100            inner: FileHandleInner::Lazy {
101                read_fn,
102                offset: 0,
103                len,
104            },
105        }
106    }
107
108    /// Total length in bytes.
109    #[inline]
110    pub fn len(&self) -> u64 {
111        match &self.inner {
112            FileHandleInner::Inline { len, .. } => *len,
113            FileHandleInner::Lazy { len, .. } => *len,
114        }
115    }
116
117    /// Check if empty.
118    #[inline]
119    pub fn is_empty(&self) -> bool {
120        self.len() == 0
121    }
122
123    /// Whether synchronous reads are available (inline/mmap data).
124    #[inline]
125    pub fn is_sync(&self) -> bool {
126        matches!(&self.inner, FileHandleInner::Inline { .. })
127    }
128
129    /// Create a sub-range view. Zero-copy for Inline, offset-adjusted for Lazy.
130    pub fn slice(&self, range: Range<u64>) -> Self {
131        match &self.inner {
132            FileHandleInner::Inline { data, offset, len } => {
133                let new_offset = offset + range.start;
134                let new_len = range.end - range.start;
135                debug_assert!(
136                    new_offset + new_len <= offset + len,
137                    "slice out of bounds: {}+{} > {}+{}",
138                    new_offset,
139                    new_len,
140                    offset,
141                    len
142                );
143                Self {
144                    inner: FileHandleInner::Inline {
145                        data: data.clone(),
146                        offset: new_offset,
147                        len: new_len,
148                    },
149                }
150            }
151            FileHandleInner::Lazy {
152                read_fn,
153                offset,
154                len,
155            } => {
156                let new_offset = offset + range.start;
157                let new_len = range.end - range.start;
158                debug_assert!(
159                    new_offset + new_len <= offset + len,
160                    "slice out of bounds: {}+{} > {}+{}",
161                    new_offset,
162                    new_len,
163                    offset,
164                    len
165                );
166                Self {
167                    inner: FileHandleInner::Lazy {
168                        read_fn: Arc::clone(read_fn),
169                        offset: new_offset,
170                        len: new_len,
171                    },
172                }
173            }
174        }
175    }
176
177    /// Async range read — works for both Inline and Lazy.
178    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
179        match &self.inner {
180            FileHandleInner::Inline { data, offset, len } => {
181                if range.end > *len {
182                    return Err(io::Error::new(
183                        io::ErrorKind::InvalidInput,
184                        format!("Range {:?} out of bounds (len: {})", range, len),
185                    ));
186                }
187                let start = (*offset + range.start) as usize;
188                let end = (*offset + range.end) as usize;
189                Ok(data.slice(start..end))
190            }
191            FileHandleInner::Lazy {
192                read_fn,
193                offset,
194                len,
195            } => {
196                if range.end > *len {
197                    return Err(io::Error::new(
198                        io::ErrorKind::InvalidInput,
199                        format!("Range {:?} out of bounds (len: {})", range, len),
200                    ));
201                }
202                let abs_start = offset + range.start;
203                let abs_end = offset + range.end;
204                (read_fn)(abs_start..abs_end).await
205            }
206        }
207    }
208
209    /// Read all bytes.
210    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
211        self.read_bytes_range(0..self.len()).await
212    }
213
214    /// Synchronous range read — only works for Inline handles.
215    /// Returns `Err` if the handle is Lazy.
216    #[inline]
217    pub fn read_bytes_range_sync(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
218        match &self.inner {
219            FileHandleInner::Inline { data, offset, len } => {
220                if range.end > *len {
221                    return Err(io::Error::new(
222                        io::ErrorKind::InvalidInput,
223                        format!("Range {:?} out of bounds (len: {})", range, len),
224                    ));
225                }
226                let start = (*offset + range.start) as usize;
227                let end = (*offset + range.end) as usize;
228                Ok(data.slice(start..end))
229            }
230            FileHandleInner::Lazy { .. } => Err(io::Error::new(
231                io::ErrorKind::Unsupported,
232                "Synchronous read not available on lazy file handle",
233            )),
234        }
235    }
236
237    /// Synchronous read of all bytes — only works for Inline handles.
238    #[inline]
239    pub fn read_bytes_sync(&self) -> io::Result<OwnedBytes> {
240        self.read_bytes_range_sync(0..self.len())
241    }
242}
243
/// Backing store for OwnedBytes — supports both heap Vec and mmap.
///
/// Clones are cheap: both variants hold an `Arc`, so only a refcount bumps.
#[derive(Clone)]
enum SharedBytes {
    /// Heap-allocated, shared buffer.
    Vec(Arc<Vec<u8>>),
    /// Memory-mapped file region (native builds only).
    #[cfg(feature = "native")]
    Mmap(Arc<memmap2::Mmap>),
}
251
impl SharedBytes {
    /// Borrow the full backing buffer, regardless of variant.
    #[inline]
    fn as_bytes(&self) -> &[u8] {
        match self {
            SharedBytes::Vec(v) => v.as_slice(),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => m.as_ref(),
        }
    }
}
262
263impl std::fmt::Debug for SharedBytes {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        match self {
266            SharedBytes::Vec(v) => write!(f, "Vec(len={})", v.len()),
267            #[cfg(feature = "native")]
268            SharedBytes::Mmap(m) => write!(f, "Mmap(len={})", m.len()),
269        }
270    }
271}
272
/// Owned bytes with cheap cloning (Arc-backed)
///
/// Supports two backing stores:
/// - `Vec<u8>` for owned data (RamDirectory, FsDirectory, decompressed blocks)
/// - `Mmap` for zero-copy memory-mapped files (MmapDirectory, native only)
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    // Shared backing buffer (heap or mmap).
    data: SharedBytes,
    // The window of `data` this value exposes; `slice()` narrows it.
    range: Range<usize>,
}
283
284impl OwnedBytes {
285    pub fn new(data: Vec<u8>) -> Self {
286        let len = data.len();
287        Self {
288            data: SharedBytes::Vec(Arc::new(data)),
289            range: 0..len,
290        }
291    }
292
293    pub fn empty() -> Self {
294        Self {
295            data: SharedBytes::Vec(Arc::new(Vec::new())),
296            range: 0..0,
297        }
298    }
299
300    /// Create from a pre-existing Arc<Vec<u8>> with a sub-range.
301    /// Used by RamDirectory and CachingDirectory to share data without copying.
302    pub(crate) fn from_arc_vec(data: Arc<Vec<u8>>, range: Range<usize>) -> Self {
303        Self {
304            data: SharedBytes::Vec(data),
305            range,
306        }
307    }
308
309    /// Create from a memory-mapped file (zero-copy).
310    #[cfg(feature = "native")]
311    pub(crate) fn from_mmap(mmap: Arc<memmap2::Mmap>) -> Self {
312        let len = mmap.len();
313        Self {
314            data: SharedBytes::Mmap(mmap),
315            range: 0..len,
316        }
317    }
318
319    /// Create from a memory-mapped file with a sub-range (zero-copy).
320    #[cfg(feature = "native")]
321    pub(crate) fn from_mmap_range(mmap: Arc<memmap2::Mmap>, range: Range<usize>) -> Self {
322        Self {
323            data: SharedBytes::Mmap(mmap),
324            range,
325        }
326    }
327
328    pub fn len(&self) -> usize {
329        self.range.len()
330    }
331
332    pub fn is_empty(&self) -> bool {
333        self.range.is_empty()
334    }
335
336    pub fn slice(&self, range: Range<usize>) -> Self {
337        let start = self.range.start + range.start;
338        let end = self.range.start + range.end;
339        Self {
340            data: self.data.clone(),
341            range: start..end,
342        }
343    }
344
345    pub fn as_slice(&self) -> &[u8] {
346        &self.data.as_bytes()[self.range.clone()]
347    }
348
349    /// Returns `true` if the backing store is a memory-mapped file.
350    ///
351    /// Used to guard `madvise` calls: `MADV_DONTNEED` on heap memory
352    /// zeroes pages on Linux and corrupts allocator metadata.
353    #[cfg(feature = "native")]
354    #[inline]
355    pub fn is_mmap(&self) -> bool {
356        matches!(self.data, SharedBytes::Mmap(_))
357    }
358
359    pub fn to_vec(&self) -> Vec<u8> {
360        self.as_slice().to_vec()
361    }
362}
363
// Lets OwnedBytes be passed anywhere a byte slice is accepted.
impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
369
// Deref to `[u8]` so slice methods (indexing, iteration, `len`) work directly.
impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
377
/// Async directory trait for reading index files
///
/// Paths are interpreted relative to the directory root. Implementations must
/// be `Send + Sync + 'static` so readers can be shared across threads.
/// A wasm twin of this trait (without the `Send` bounds) is defined below —
/// keep the two method sets in sync.
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
pub trait Directory: Send + Sync + 'static {
    /// Check if a file exists
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Get file size
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Open a file for reading (loads entire file into an inline FileHandle)
    async fn open_read(&self, path: &Path) -> io::Result<FileHandle>;

    /// Read a specific byte range from a file (optimized for network)
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// List files in directory
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Open a file handle that fetches ranges on demand.
    /// For mmap directories this returns an Inline handle (sync-capable).
    /// For HTTP/filesystem directories this returns a Lazy handle.
    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle>;
}
402
/// Async directory trait for reading index files (WASM version - no Send requirement)
///
/// Mirror of the native `Directory` trait above; wasm is single-threaded so
/// the `Send + Sync` bounds are dropped. Keep the method sets in sync.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Directory: 'static {
    /// Check if a file exists
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Get file size
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Open a file for reading (loads entire file into an inline FileHandle)
    async fn open_read(&self, path: &Path) -> io::Result<FileHandle>;

    /// Read a specific byte range from a file (optimized for network)
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// List files in directory
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Open a file handle that fetches ranges on demand.
    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle>;
}
425
/// A writer for incrementally writing data to a directory file.
///
/// Avoids buffering entire files in memory during merge. File-backed
/// directories write directly to disk; memory directories collect to Vec.
pub trait StreamingWriter: io::Write + Send {
    /// Finalize the write, making data available for reading.
    /// Consumes the writer; data may not be visible to readers until this
    /// returns `Ok`.
    fn finish(self: Box<Self>) -> io::Result<()>;

    /// Bytes written so far.
    fn bytes_written(&self) -> u64;
}
437
/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
/// Used as default/fallback and for RamDirectory.
struct BufferedStreamingWriter {
    // Destination key inside `files` once finished.
    path: PathBuf,
    // Accumulates all written bytes until `finish()`.
    buffer: Vec<u8>,
    /// Callback to write the buffer to the directory on finish.
    /// We store the files Arc directly for RamDirectory.
    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
}
447
448impl io::Write for BufferedStreamingWriter {
449    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
450        self.buffer.extend_from_slice(buf);
451        Ok(buf.len())
452    }
453
454    fn flush(&mut self) -> io::Result<()> {
455        Ok(())
456    }
457}
458
459impl StreamingWriter for BufferedStreamingWriter {
460    fn finish(self: Box<Self>) -> io::Result<()> {
461        self.files.write().insert(self.path, Arc::new(self.buffer));
462        Ok(())
463    }
464
465    fn bytes_written(&self) -> u64 {
466        self.buffer.len() as u64
467    }
468}
469
/// Buffer size for FileStreamingWriter (8 MB).
/// Large enough to coalesce millions of tiny writes (e.g. per-vector doc_id writes)
/// into efficient sequential I/O.
/// Note: each live FileStreamingWriter holds one buffer of this size in memory.
#[cfg(feature = "native")]
const FILE_STREAMING_BUF_SIZE: usize = 8 * 1024 * 1024;
475
/// StreamingWriter backed by a buffered std::fs::File for filesystem directories.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    // Buffered destination file (FILE_STREAMING_BUF_SIZE-byte buffer).
    pub(crate) file: io::BufWriter<std::fs::File>,
    // Running count of bytes accepted by `write`, reported by `bytes_written`.
    pub(crate) written: u64,
}
482
#[cfg(feature = "native")]
impl FileStreamingWriter {
    /// Wrap `file` in a large BufWriter so many tiny writes coalesce
    /// into sequential disk I/O.
    pub(crate) fn new(file: std::fs::File) -> Self {
        let buffered = io::BufWriter::with_capacity(FILE_STREAMING_BUF_SIZE, file);
        Self {
            file: buffered,
            written: 0,
        }
    }
}
492
#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    /// Write through the buffer, tracking accepted bytes for `bytes_written`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.file.write(buf).map(|n| {
            self.written += n as u64;
            n
        })
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
505
#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flush the buffer into the file, then fsync before declaring success.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let inner = self.file.into_inner().map_err(|e| e.into_error())?;
        inner.sync_all()
    }

    fn bytes_written(&self) -> u64 {
        self.written
    }
}
518
/// Async directory trait for writing index files
///
/// Extends `Directory` with mutation; implementations should make `rename`
/// atomic where the backing store allows it.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait DirectoryWriter: Directory {
    /// Create/overwrite a file with data
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;

    /// Delete a file
    async fn delete(&self, path: &Path) -> io::Result<()>;

    /// Atomic rename
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;

    /// Sync all pending writes
    async fn sync(&self) -> io::Result<()>;

    /// Create a streaming writer for incremental file writes.
    /// Call finish() on the returned writer to finalize.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
}
539
/// In-memory directory for testing and small indexes
#[derive(Debug, Default)]
pub struct RamDirectory {
    // Path → file contents; Arc values let readers share bytes zero-copy.
    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
}
545
546impl Clone for RamDirectory {
547    fn clone(&self) -> Self {
548        Self {
549            files: Arc::clone(&self.files),
550        }
551    }
552}
553
554impl RamDirectory {
555    pub fn new() -> Self {
556        Self::default()
557    }
558
559    /// Synchronous file listing (for serialization).
560    pub fn list_files_sync(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
561        let files = self.files.read();
562        Ok(files
563            .keys()
564            .filter(|p| p.starts_with(prefix))
565            .cloned()
566            .collect())
567    }
568
569    /// Synchronous file read (for serialization).
570    pub fn read_file_sync(&self, path: &Path) -> io::Result<Vec<u8>> {
571        let files = self.files.read();
572        files
573            .get(path)
574            .map(|data| data.as_ref().clone())
575            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
576    }
577
578    /// Synchronous file write (for deserialization).
579    pub fn write_sync(&self, path: &Path, data: &[u8]) -> io::Result<()> {
580        self.files
581            .write()
582            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
583        Ok(())
584    }
585}
586
587#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
588#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
589impl Directory for RamDirectory {
590    async fn exists(&self, path: &Path) -> io::Result<bool> {
591        Ok(self.files.read().contains_key(path))
592    }
593
594    async fn file_size(&self, path: &Path) -> io::Result<u64> {
595        self.files
596            .read()
597            .get(path)
598            .map(|data| data.len() as u64)
599            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
600    }
601
602    async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
603        let files = self.files.read();
604        let data = files
605            .get(path)
606            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
607
608        Ok(FileHandle::from_bytes(OwnedBytes::from_arc_vec(
609            Arc::clone(data),
610            0..data.len(),
611        )))
612    }
613
614    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
615        let files = self.files.read();
616        let data = files
617            .get(path)
618            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
619
620        let start = range.start as usize;
621        let end = range.end as usize;
622
623        if end > data.len() {
624            return Err(io::Error::new(
625                io::ErrorKind::InvalidInput,
626                "Range out of bounds",
627            ));
628        }
629
630        Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
631    }
632
633    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
634        let files = self.files.read();
635        Ok(files
636            .keys()
637            .filter(|p| p.starts_with(prefix))
638            .cloned()
639            .collect())
640    }
641
642    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
643        // RAM data is always available synchronously — return Inline handle
644        self.open_read(path).await
645    }
646}
647
648#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
649#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
650impl DirectoryWriter for RamDirectory {
651    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
652        self.files
653            .write()
654            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
655        Ok(())
656    }
657
658    async fn delete(&self, path: &Path) -> io::Result<()> {
659        self.files.write().remove(path);
660        Ok(())
661    }
662
663    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
664        let mut files = self.files.write();
665        if let Some(data) = files.remove(from) {
666            files.insert(to.to_path_buf(), data);
667        }
668        Ok(())
669    }
670
671    async fn sync(&self) -> io::Result<()> {
672        Ok(())
673    }
674
675    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
676        Ok(Box::new(BufferedStreamingWriter {
677            path: path.to_path_buf(),
678            buffer: Vec::new(),
679            files: Arc::clone(&self.files),
680        }))
681    }
682}
683
/// Local filesystem directory with async IO via tokio
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    // All relative paths are resolved against this root.
    root: PathBuf,
}
690
#[cfg(feature = "native")]
impl FsDirectory {
    /// Create a directory rooted at `root`; relative paths resolve under it.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = root.as_ref().to_path_buf();
        Self { root }
    }

    /// Join a directory-relative `path` onto the root.
    fn resolve(&self, path: &Path) -> PathBuf {
        self.root.join(path)
    }
}
703
#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        let full_path = self.resolve(path);
        // NOTE(review): IO errors (e.g. permission denied) are reported as
        // "does not exist" rather than propagated — confirm this is intended.
        Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
    }

    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
        // Loads the entire file into memory; use open_lazy for on-demand reads.
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileHandle::from_bytes(OwnedBytes::new(data)))
    }

    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        let full_path = self.resolve(path);
        let mut file = tokio::fs::File::open(&full_path).await?;

        file.seek(std::io::SeekFrom::Start(range.start)).await?;

        // read_exact fails with UnexpectedEof if the range extends past EOF.
        let len = (range.end - range.start) as usize;
        let mut buffer = vec![0u8; len];
        file.read_exact(&mut buffer).await?;

        Ok(OwnedBytes::new(buffer))
    }

    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        // Non-recursive: only direct children that are regular files.
        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_file() {
                // unwrap is safe: every entry path starts with self.root by construction.
                files.push(entry.path().strip_prefix(&self.root).unwrap().to_path_buf());
            }
        }

        Ok(files)
    }

    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        let file_size = metadata.len();

        // Each range request opens the file fresh: no shared state, so the
        // callback stays Send + Sync.
        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move {
                use tokio::io::{AsyncReadExt, AsyncSeekExt};

                let mut file = tokio::fs::File::open(&full_path).await?;
                file.seek(std::io::SeekFrom::Start(range.start)).await?;

                let len = (range.end - range.start) as usize;
                let mut buffer = vec![0u8; len];
                file.read_exact(&mut buffer).await?;

                Ok(OwnedBytes::new(buffer))
            })
        });

        Ok(FileHandle::lazy(file_size, read_fn))
    }
}
777
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        // Ensure parent directory exists
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    async fn sync(&self) -> io::Result<()> {
        // fsync the directory entry so creates/renames are durable.
        // Use tokio's async File: the previous std::fs::File::open +
        // blocking sync_all stalled the async executor thread.
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await?;
        Ok(())
    }

    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Create asynchronously (std::fs::File::create would block the
        // executor), then hand the std handle to the buffered writer, which
        // performs blocking writes by design.
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter::new(file)))
    }
}
819
/// Caching wrapper for any Directory - caches file reads
///
/// Whole files read via `open_read` are kept in memory up to a total byte
/// budget. There is no eviction: once the budget is reached, new files are
/// simply not cached.
pub struct CachingDirectory<D: Directory> {
    // Wrapped directory serving cache misses.
    inner: D,
    // Cached whole-file contents keyed by path.
    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
    // Upper bound on the total bytes held in `cache`.
    max_cached_bytes: usize,
    // Running total of cached bytes; only ever grows (no eviction).
    current_bytes: RwLock<usize>,
}
827
828impl<D: Directory> CachingDirectory<D> {
829    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
830        Self {
831            inner,
832            cache: RwLock::new(HashMap::new()),
833            max_cached_bytes,
834            current_bytes: RwLock::new(0),
835        }
836    }
837
838    fn try_cache(&self, path: &Path, data: &[u8]) {
839        let mut current = self.current_bytes.write();
840        if *current + data.len() <= self.max_cached_bytes {
841            self.cache
842                .write()
843                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
844            *current += data.len();
845        }
846    }
847}
848
849#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
850#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
851impl<D: Directory> Directory for CachingDirectory<D> {
852    async fn exists(&self, path: &Path) -> io::Result<bool> {
853        if self.cache.read().contains_key(path) {
854            return Ok(true);
855        }
856        self.inner.exists(path).await
857    }
858
859    async fn file_size(&self, path: &Path) -> io::Result<u64> {
860        if let Some(data) = self.cache.read().get(path) {
861            return Ok(data.len() as u64);
862        }
863        self.inner.file_size(path).await
864    }
865
866    async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
867        // Check cache first
868        if let Some(data) = self.cache.read().get(path) {
869            return Ok(FileHandle::from_bytes(OwnedBytes::from_arc_vec(
870                Arc::clone(data),
871                0..data.len(),
872            )));
873        }
874
875        // Read from inner and potentially cache
876        let handle = self.inner.open_read(path).await?;
877        let bytes = handle.read_bytes().await?;
878
879        self.try_cache(path, bytes.as_slice());
880
881        Ok(FileHandle::from_bytes(bytes))
882    }
883
884    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
885        // Check cache first
886        if let Some(data) = self.cache.read().get(path) {
887            let start = range.start as usize;
888            let end = range.end as usize;
889            return Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end));
890        }
891
892        self.inner.read_range(path, range).await
893    }
894
895    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
896        self.inner.list_files(prefix).await
897    }
898
899    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
900        // For caching directory, delegate to inner - caching happens at read_range level
901        self.inner.open_lazy(path).await
902    }
903}
904
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end exercise of the RamDirectory write/read/range/delete cycle.
    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();

        // Write file
        dir.write(Path::new("test.txt"), b"hello world")
            .await
            .unwrap();

        // Check exists
        assert!(dir.exists(Path::new("test.txt")).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());

        // Read file
        let slice = dir.open_read(Path::new("test.txt")).await.unwrap();
        let data = slice.read_bytes().await.unwrap();
        assert_eq!(data.as_slice(), b"hello world");

        // Read range
        let range_data = dir.read_range(Path::new("test.txt"), 0..5).await.unwrap();
        assert_eq!(range_data.as_slice(), b"hello");

        // Delete
        dir.delete(Path::new("test.txt")).await.unwrap();
        assert!(!dir.exists(Path::new("test.txt")).await.unwrap());
    }

    /// Inline FileHandle: length, sync capability, slicing, sync + async reads.
    #[tokio::test]
    async fn test_file_handle() {
        let data = OwnedBytes::new(b"hello world".to_vec());
        let handle = FileHandle::from_bytes(data);

        assert_eq!(handle.len(), 11);
        assert!(handle.is_sync());

        let sub = handle.slice(0..5);
        let bytes = sub.read_bytes().await.unwrap();
        assert_eq!(bytes.as_slice(), b"hello");

        let sub2 = handle.slice(6..11);
        let bytes2 = sub2.read_bytes().await.unwrap();
        assert_eq!(bytes2.as_slice(), b"world");

        // Sync reads work on inline handles
        let sync_bytes = handle.read_bytes_range_sync(0..5).unwrap();
        assert_eq!(sync_bytes.as_slice(), b"hello");
    }

    /// OwnedBytes slicing is a view: the original is left intact.
    #[tokio::test]
    async fn test_owned_bytes() {
        let bytes = OwnedBytes::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(bytes.len(), 5);
        assert_eq!(bytes.as_slice(), &[1, 2, 3, 4, 5]);

        let sliced = bytes.slice(1..4);
        assert_eq!(sliced.as_slice(), &[2, 3, 4]);

        // Original unchanged
        assert_eq!(bytes.as_slice(), &[1, 2, 3, 4, 5]);
    }
}