Skip to main content

hermes_core/directories/
directory.rs

1//! Async Directory abstraction for IO operations
2//!
3//! Supports network, local filesystem, and in-memory storage.
4//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14/// A slice of bytes that can be read asynchronously
15#[derive(Debug, Clone)]
16pub struct FileSlice {
17    data: OwnedBytes,
18    range: Range<u64>,
19}
20
21impl FileSlice {
22    pub fn new(data: OwnedBytes) -> Self {
23        let len = data.len() as u64;
24        Self {
25            data,
26            range: 0..len,
27        }
28    }
29
30    pub fn empty() -> Self {
31        Self {
32            data: OwnedBytes::empty(),
33            range: 0..0,
34        }
35    }
36
37    pub fn slice(&self, range: Range<u64>) -> Self {
38        let start = self.range.start + range.start;
39        let end = self.range.start + range.end;
40        Self {
41            data: self.data.clone(),
42            range: start..end,
43        }
44    }
45
46    pub fn len(&self) -> u64 {
47        self.range.end - self.range.start
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.range.start == self.range.end
52    }
53
54    /// Read the entire slice (async for network compatibility)
55    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
56        Ok(self
57            .data
58            .slice(self.range.start as usize..self.range.end as usize))
59    }
60
61    /// Read a specific range within this slice
62    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
63        let start = self.range.start + range.start;
64        let end = self.range.start + range.end;
65        if end > self.range.end {
66            return Err(io::Error::new(
67                io::ErrorKind::InvalidInput,
68                "Range out of bounds",
69            ));
70        }
71        Ok(self.data.slice(start as usize..end as usize))
72    }
73}
74
75/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle
76#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg(not(target_arch = "wasm32"))]
79pub trait AsyncFileRead: Send + Sync {
80    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
81    fn len(&self) -> u64;
82
83    /// Check if empty
84    fn is_empty(&self) -> bool {
85        self.len() == 0
86    }
87
88    /// Read a specific byte range
89    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
90
91    /// Read all bytes
92    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
93        self.read_bytes_range(0..self.len()).await
94    }
95}
96
/// Trait for async range reading (WASM version).
///
/// Same contract as the native trait, but without the `Send + Sync` bound and with
/// non-`Send` futures (`async_trait(?Send)`).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait AsyncFileRead {
    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
    fn len(&self) -> u64;

    /// Returns `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Reads `range`, expressed relative to the start of this file/slice.
    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Reads every byte, i.e. `0..len()`.
    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
        self.read_bytes_range(0..self.len()).await
    }
}
117
118#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
119#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
120impl AsyncFileRead for FileSlice {
121    fn len(&self) -> u64 {
122        self.range.end - self.range.start
123    }
124
125    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
126        let start = self.range.start + range.start;
127        let end = self.range.start + range.end;
128        if end > self.range.end {
129            return Err(io::Error::new(
130                io::ErrorKind::InvalidInput,
131                "Range out of bounds",
132            ));
133        }
134        Ok(self.data.slice(start as usize..end as usize))
135    }
136}
137
138/// Callback type for lazy range reading
139#[cfg(not(target_arch = "wasm32"))]
140pub type RangeReadFn = Arc<
141    dyn Fn(
142            Range<u64>,
143        )
144            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
145        + Send
146        + Sync,
147>;
148
149#[cfg(target_arch = "wasm32")]
150pub type RangeReadFn = Arc<
151    dyn Fn(
152        Range<u64>,
153    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
154>;
155
156/// Lazy file handle that fetches ranges on demand via HTTP range requests
157/// Does NOT load the entire file into memory
158pub struct LazyFileHandle {
159    /// Total file size (u64 to support >4GB files on 32-bit platforms like WASM)
160    file_size: u64,
161    /// Callback to read a range from the underlying directory
162    read_fn: RangeReadFn,
163}
164
165impl std::fmt::Debug for LazyFileHandle {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("LazyFileHandle")
168            .field("file_size", &self.file_size)
169            .finish()
170    }
171}
172
173impl Clone for LazyFileHandle {
174    fn clone(&self) -> Self {
175        Self {
176            file_size: self.file_size,
177            read_fn: Arc::clone(&self.read_fn),
178        }
179    }
180}
181
182impl LazyFileHandle {
183    /// Create a new lazy file handle
184    pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
185        Self { file_size, read_fn }
186    }
187
188    /// Get file size
189    pub fn file_size(&self) -> u64 {
190        self.file_size
191    }
192
193    /// Create a sub-slice view (still lazy)
194    pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
195        LazyFileSlice {
196            handle: self.clone(),
197            offset: range.start,
198            len: range.end - range.start,
199        }
200    }
201}
202
203#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
204#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
205impl AsyncFileRead for LazyFileHandle {
206    fn len(&self) -> u64 {
207        self.file_size
208    }
209
210    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
211        if range.end > self.file_size {
212            return Err(io::Error::new(
213                io::ErrorKind::InvalidInput,
214                format!(
215                    "Range {:?} out of bounds (file size: {})",
216                    range, self.file_size
217                ),
218            ));
219        }
220        (self.read_fn)(range).await
221    }
222}
223
224/// A slice view into a LazyFileHandle
225#[derive(Clone)]
226pub struct LazyFileSlice {
227    handle: LazyFileHandle,
228    offset: u64,
229    len: u64,
230}
231
232impl std::fmt::Debug for LazyFileSlice {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("LazyFileSlice")
235            .field("offset", &self.offset)
236            .field("len", &self.len)
237            .finish()
238    }
239}
240
241impl LazyFileSlice {
242    /// Create a slice from a handle with absolute offset and length
243    pub fn from_handle_range(handle: &LazyFileHandle, offset: u64, len: u64) -> Self {
244        Self {
245            handle: handle.clone(),
246            offset,
247            len,
248        }
249    }
250
251    /// Create a sub-slice
252    pub fn slice(&self, range: Range<u64>) -> Self {
253        Self {
254            handle: self.handle.clone(),
255            offset: self.offset + range.start,
256            len: range.end - range.start,
257        }
258    }
259}
260
261#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
262#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
263impl AsyncFileRead for LazyFileSlice {
264    fn len(&self) -> u64 {
265        self.len
266    }
267
268    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
269        if range.end > self.len {
270            return Err(io::Error::new(
271                io::ErrorKind::InvalidInput,
272                format!("Range {:?} out of bounds (slice len: {})", range, self.len),
273            ));
274        }
275        let abs_start = self.offset + range.start;
276        let abs_end = self.offset + range.end;
277        self.handle.read_bytes_range(abs_start..abs_end).await
278    }
279}
280
/// Backing store for [`OwnedBytes`]: a shared heap `Vec` or, with the `native`
/// feature, a shared memory map.
#[derive(Clone)]
enum SharedBytes {
    Vec(Arc<Vec<u8>>),
    #[cfg(feature = "native")]
    Mmap(Arc<memmap2::Mmap>),
}

impl SharedBytes {
    /// Returns the entire backing buffer, ignoring any sub-range.
    #[inline]
    fn as_bytes(&self) -> &[u8] {
        match self {
            SharedBytes::Vec(v) => v.as_slice(),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => m.as_ref(),
        }
    }
}

impl std::fmt::Debug for SharedBytes {
    // Prints only the backing kind and its length, never the contents.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SharedBytes::Vec(v) => write!(f, "Vec(len={})", v.len()),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => write!(f, "Mmap(len={})", m.len()),
        }
    }
}

/// Immutable bytes with cheap cloning and slicing (Arc-backed).
///
/// Supports two backing stores:
/// - `Vec<u8>` for owned data (RamDirectory, FsDirectory, decompressed blocks)
/// - `Mmap` for zero-copy memory-mapped files (MmapDirectory, native only)
///
/// `range` selects the visible window of the backing buffer. Clones and slices share
/// the buffer rather than copying it.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    data: SharedBytes,
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` without copying it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: SharedBytes::Vec(Arc::new(data)),
            range: 0..len,
        }
    }

    /// Returns an empty value.
    pub fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Create from a pre-existing Arc<Vec<u8>> with a sub-range, without copying.
    /// Used by RamDirectory and CachingDirectory to share data.
    ///
    /// `range` is not validated here. Callers must keep it within `data`, otherwise
    /// accessing the bytes panics.
    pub(crate) fn from_arc_vec(data: Arc<Vec<u8>>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Vec(data),
            range,
        }
    }

    /// Create from a memory-mapped file (zero-copy).
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap(mmap: Arc<memmap2::Mmap>) -> Self {
        let len = mmap.len();
        Self {
            data: SharedBytes::Mmap(mmap),
            range: 0..len,
        }
    }

    /// Create from a memory-mapped file with a sub-range (zero-copy).
    ///
    /// `range` is not validated here. Callers must keep it within the mapping.
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap_range(mmap: Arc<memmap2::Mmap>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Mmap(mmap),
            range,
        }
    }

    /// Number of visible bytes.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Returns `true` when no bytes are visible.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns a zero-copy sub-view; `range` is relative to this view.
    ///
    /// # Panics
    /// Panics if `range` is inverted or extends past `self.len()`, mirroring slice
    /// indexing. Checking here, rather than on first access, reports the mistake where
    /// it is made and keeps `len()` consistent with the bytes actually visible.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds for length {}",
            range,
            self.len()
        );
        Self {
            data: self.data.clone(),
            range: self.range.start + range.start..self.range.start + range.end,
        }
    }

    /// Borrows the visible bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data.as_bytes()[self.range.clone()]
    }

    /// Copies the visible bytes into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}

impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
404
405/// Async directory trait for reading index files
406#[cfg(not(target_arch = "wasm32"))]
407#[async_trait]
408pub trait Directory: Send + Sync + 'static {
409    /// Check if a file exists
410    async fn exists(&self, path: &Path) -> io::Result<bool>;
411
412    /// Get file size
413    async fn file_size(&self, path: &Path) -> io::Result<u64>;
414
415    /// Open a file for reading, returns a FileSlice (loads entire file)
416    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
417
418    /// Read a specific byte range from a file (optimized for network)
419    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
420
421    /// List files in directory
422    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
423
424    /// Open a lazy file handle that fetches ranges on demand
425    /// This is more efficient for large files over network
426    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
427}
428
429/// Async directory trait for reading index files (WASM version - no Send requirement)
430#[cfg(target_arch = "wasm32")]
431#[async_trait(?Send)]
432pub trait Directory: 'static {
433    /// Check if a file exists
434    async fn exists(&self, path: &Path) -> io::Result<bool>;
435
436    /// Get file size
437    async fn file_size(&self, path: &Path) -> io::Result<u64>;
438
439    /// Open a file for reading, returns a FileSlice (loads entire file)
440    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
441
442    /// Read a specific byte range from a file (optimized for network)
443    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
444
445    /// List files in directory
446    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
447
448    /// Open a lazy file handle that fetches ranges on demand
449    /// This is more efficient for large files over network
450    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
451}
452
/// Incremental writer for a single directory file.
///
/// Lets large outputs (e.g. merge results) be written piece by piece instead of being
/// assembled in memory first. File-backed directories write through to disk; memory
/// directories collect into a `Vec`.
pub trait StreamingWriter: io::Write + Send {
    /// Number of bytes accepted by this writer so far.
    fn bytes_written(&self) -> u64;

    /// Consumes the writer and finalizes the file, making its data available for reading.
    fn finish(self: Box<Self>) -> io::Result<()>;
}
464
465/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
466/// Used as default/fallback and for RamDirectory.
467struct BufferedStreamingWriter {
468    path: PathBuf,
469    buffer: Vec<u8>,
470    /// Callback to write the buffer to the directory on finish.
471    /// We store the files Arc directly for RamDirectory.
472    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
473}
474
475impl io::Write for BufferedStreamingWriter {
476    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
477        self.buffer.extend_from_slice(buf);
478        Ok(buf.len())
479    }
480
481    fn flush(&mut self) -> io::Result<()> {
482        Ok(())
483    }
484}
485
486impl StreamingWriter for BufferedStreamingWriter {
487    fn finish(self: Box<Self>) -> io::Result<()> {
488        self.files.write().insert(self.path, Arc::new(self.buffer));
489        Ok(())
490    }
491
492    fn bytes_written(&self) -> u64 {
493        self.buffer.len() as u64
494    }
495}
496
/// Capacity of the `BufWriter` inside [`FileStreamingWriter`]: 8 MiB.
///
/// Large enough to coalesce millions of tiny writes (e.g. per-vector doc_id writes)
/// into efficient sequential I/O.
#[cfg(feature = "native")]
const FILE_STREAMING_BUF_SIZE: usize = 8 << 20;

/// [`StreamingWriter`] that writes through a large `BufWriter` to a `std::fs::File`.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    pub(crate) file: io::BufWriter<std::fs::File>,
    pub(crate) written: u64,
}

#[cfg(feature = "native")]
impl FileStreamingWriter {
    /// Wraps `file` in a `FILE_STREAMING_BUF_SIZE` buffer; the byte counter starts at 0.
    pub(crate) fn new(file: std::fs::File) -> Self {
        let buffered = io::BufWriter::with_capacity(FILE_STREAMING_BUF_SIZE, file);
        Self {
            file: buffered,
            written: 0,
        }
    }
}

#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = io::Write::write(&mut self.file, buf)?;
        // Count only what the buffer actually accepted (may be less than buf.len()).
        self.written += accepted as u64;
        Ok(accepted)
    }

    fn flush(&mut self) -> io::Result<()> {
        io::Write::flush(&mut self.file)
    }
}

#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes the buffer (via `BufWriter::into_inner`), then `sync_all`s the file.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let file = self
            .file
            .into_inner()
            .map_err(io::IntoInnerError::into_error)?;
        file.sync_all()
    }

    fn bytes_written(&self) -> u64 {
        self.written
    }
}
545
546/// Async directory trait for writing index files
547#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
548#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
549pub trait DirectoryWriter: Directory {
550    /// Create/overwrite a file with data
551    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
552
553    /// Delete a file
554    async fn delete(&self, path: &Path) -> io::Result<()>;
555
556    /// Atomic rename
557    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
558
559    /// Sync all pending writes
560    async fn sync(&self) -> io::Result<()>;
561
562    /// Create a streaming writer for incremental file writes.
563    /// Call finish() on the returned writer to finalize.
564    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
565}
566
567/// In-memory directory for testing and small indexes
568#[derive(Debug, Default)]
569pub struct RamDirectory {
570    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
571}
572
573impl Clone for RamDirectory {
574    fn clone(&self) -> Self {
575        Self {
576            files: Arc::clone(&self.files),
577        }
578    }
579}
580
581impl RamDirectory {
582    pub fn new() -> Self {
583        Self::default()
584    }
585}
586
587#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
588#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
589impl Directory for RamDirectory {
590    async fn exists(&self, path: &Path) -> io::Result<bool> {
591        Ok(self.files.read().contains_key(path))
592    }
593
594    async fn file_size(&self, path: &Path) -> io::Result<u64> {
595        self.files
596            .read()
597            .get(path)
598            .map(|data| data.len() as u64)
599            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
600    }
601
602    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
603        let files = self.files.read();
604        let data = files
605            .get(path)
606            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
607
608        Ok(FileSlice::new(OwnedBytes::from_arc_vec(
609            Arc::clone(data),
610            0..data.len(),
611        )))
612    }
613
614    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
615        let files = self.files.read();
616        let data = files
617            .get(path)
618            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
619
620        let start = range.start as usize;
621        let end = range.end as usize;
622
623        if end > data.len() {
624            return Err(io::Error::new(
625                io::ErrorKind::InvalidInput,
626                "Range out of bounds",
627            ));
628        }
629
630        Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
631    }
632
633    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
634        let files = self.files.read();
635        Ok(files
636            .keys()
637            .filter(|p| p.starts_with(prefix))
638            .cloned()
639            .collect())
640    }
641
642    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
643        let files = Arc::clone(&self.files);
644        let path = path.to_path_buf();
645
646        let file_size = {
647            let files_guard = files.read();
648            files_guard
649                .get(&path)
650                .map(|data| data.len() as u64)
651                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
652        };
653
654        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
655            let files = Arc::clone(&files);
656            let path = path.clone();
657            Box::pin(async move {
658                let files_guard = files.read();
659                let data = files_guard
660                    .get(&path)
661                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
662
663                let start = range.start as usize;
664                let end = range.end as usize;
665                if end > data.len() {
666                    return Err(io::Error::new(
667                        io::ErrorKind::InvalidInput,
668                        "Range out of bounds",
669                    ));
670                }
671                Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
672            })
673        });
674
675        Ok(LazyFileHandle::new(file_size, read_fn))
676    }
677}
678
679#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
680#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
681impl DirectoryWriter for RamDirectory {
682    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
683        self.files
684            .write()
685            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
686        Ok(())
687    }
688
689    async fn delete(&self, path: &Path) -> io::Result<()> {
690        self.files.write().remove(path);
691        Ok(())
692    }
693
694    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
695        let mut files = self.files.write();
696        if let Some(data) = files.remove(from) {
697            files.insert(to.to_path_buf(), data);
698        }
699        Ok(())
700    }
701
702    async fn sync(&self) -> io::Result<()> {
703        Ok(())
704    }
705
706    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
707        Ok(Box::new(BufferedStreamingWriter {
708            path: path.to_path_buf(),
709            buffer: Vec::new(),
710            files: Arc::clone(&self.files),
711        }))
712    }
713}
714
/// Directory backed by the local filesystem; reads use tokio's async fs APIs.
///
/// Every path handed to it is resolved against `root`.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    root: PathBuf,
}

#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory rooted at `root`. The path is neither checked nor created here.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = root.as_ref().to_path_buf();
        Self { root }
    }

    /// Maps a directory-relative `path` to its on-disk location.
    ///
    /// Follows `PathBuf::push` rules, so an absolute `path` replaces `root` entirely.
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
734
#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// Reports whether `path` exists under the root.
    ///
    /// Errors from the existence check itself (e.g. permission denied) are propagated
    /// instead of being silently reported as "does not exist".
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        let full_path = self.resolve(path);
        tokio::fs::try_exists(&full_path).await
    }

    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    /// Reads the whole file into memory.
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    /// Reads exactly `range` of the file (see `read_file_range` for error cases).
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        let full_path = self.resolve(path);
        read_file_range(&full_path, range).await
    }

    /// Lists regular files directly inside `prefix` (non-recursive), as paths relative
    /// to the root.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_file() {
                continue;
            }
            let entry_path = entry.path();
            // An absolute `prefix` resolves outside the root. Report that as an error
            // instead of panicking on `strip_prefix`.
            let relative = entry_path.strip_prefix(&self.root).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "{} is not inside directory root {}",
                        entry_path.display(),
                        self.root.display()
                    ),
                )
            })?;
            files.push(relative.to_path_buf());
        }

        Ok(files)
    }

    /// Returns a handle whose every read reopens the file and reads only the requested
    /// range.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let file_size = tokio::fs::metadata(&full_path).await?.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { read_file_range(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}

/// Reads exactly `range` from the file at `full_path` into a newly allocated buffer.
///
/// Fails with `InvalidInput` when `range` is inverted or its length does not fit in
/// `usize`. Fails with the `read_exact` error (`UnexpectedEof`) when the file ends before
/// `range.end`.
#[cfg(feature = "native")]
async fn read_file_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let invalid = || {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid byte range {:?}", range),
        )
    };
    // Validate before touching the file. An inverted range used to underflow the length
    // computation, and `as usize` could silently truncate on 32-bit targets.
    let len64 = range.end.checked_sub(range.start).ok_or_else(invalid)?;
    let len = len64 as usize;
    if len as u64 != len64 {
        return Err(invalid());
    }

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; len];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}
808
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to `path`, creating missing parent directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        // Ensure parent directory exists
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// fsyncs the root directory itself, so directory-entry changes directly under the
    /// root are persisted. Subdirectories are not synced here.
    ///
    /// Uses tokio's file API instead of blocking `std::fs` calls, which would stall the
    /// async executor thread.
    async fn sync(&self) -> io::Result<()> {
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Creates (or truncates) `path`, creating missing parent directories, and returns a
    /// buffered [`FileStreamingWriter`] over it.
    ///
    /// The file is created without blocking the executor, but the returned writer
    /// performs blocking `std::fs` writes. NOTE(review): async callers may want to drive
    /// it from a blocking task.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter::new(file)))
    }
}
850
851/// Caching wrapper for any Directory - caches file reads
852pub struct CachingDirectory<D: Directory> {
853    inner: D,
854    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
855    max_cached_bytes: usize,
856    current_bytes: RwLock<usize>,
857}
858
859impl<D: Directory> CachingDirectory<D> {
860    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
861        Self {
862            inner,
863            cache: RwLock::new(HashMap::new()),
864            max_cached_bytes,
865            current_bytes: RwLock::new(0),
866        }
867    }
868
869    fn try_cache(&self, path: &Path, data: &[u8]) {
870        let mut current = self.current_bytes.write();
871        if *current + data.len() <= self.max_cached_bytes {
872            self.cache
873                .write()
874                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
875            *current += data.len();
876        }
877    }
878}
879
880#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
881#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
882impl<D: Directory> Directory for CachingDirectory<D> {
883    async fn exists(&self, path: &Path) -> io::Result<bool> {
884        if self.cache.read().contains_key(path) {
885            return Ok(true);
886        }
887        self.inner.exists(path).await
888    }
889
890    async fn file_size(&self, path: &Path) -> io::Result<u64> {
891        if let Some(data) = self.cache.read().get(path) {
892            return Ok(data.len() as u64);
893        }
894        self.inner.file_size(path).await
895    }
896
897    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
898        // Check cache first
899        if let Some(data) = self.cache.read().get(path) {
900            return Ok(FileSlice::new(OwnedBytes::from_arc_vec(
901                Arc::clone(data),
902                0..data.len(),
903            )));
904        }
905
906        // Read from inner and potentially cache
907        let slice = self.inner.open_read(path).await?;
908        let bytes = slice.read_bytes().await?;
909
910        self.try_cache(path, bytes.as_slice());
911
912        Ok(FileSlice::new(bytes))
913    }
914
915    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
916        // Check cache first
917        if let Some(data) = self.cache.read().get(path) {
918            let start = range.start as usize;
919            let end = range.end as usize;
920            return Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end));
921        }
922
923        self.inner.read_range(path, range).await
924    }
925
926    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
927        self.inner.list_files(prefix).await
928    }
929
930    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
931        // For caching directory, delegate to inner - caching happens at read_range level
932        self.inner.open_lazy(path).await
933    }
934}
935
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();
        let file = Path::new("test.txt");
        let missing = Path::new("nonexistent.txt");

        dir.write(file, b"hello world").await.unwrap();

        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(missing).await.unwrap());

        // Whole-file read.
        let whole = dir
            .open_read(file)
            .await
            .unwrap()
            .read_bytes()
            .await
            .unwrap();
        assert_eq!(whole.as_slice(), b"hello world");

        // Ranged read.
        let prefix = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(prefix.as_slice(), b"hello");

        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    #[tokio::test]
    async fn test_file_slice() {
        let slice = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(slice.len(), 11);

        for (range, expected) in [(0..5, &b"hello"[..]), (6..11, &b"world"[..])] {
            let bytes = slice.slice(range).read_bytes().await.unwrap();
            assert_eq!(bytes.as_slice(), expected);
        }
    }

    #[tokio::test]
    async fn test_owned_bytes() {
        let original = OwnedBytes::new(vec![1, 2, 3, 4, 5]);
        assert_eq!(original.len(), 5);
        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = original.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        // Slicing shares the buffer; the source view is untouched.
        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);
    }
}