Skip to main content

hermes_core/directories/
directory.rs

//! Async Directory abstraction for IO operations
//!
//! Supports network, local filesystem, and in-memory storage.
//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10#[cfg(feature = "native")]
11use std::io::Write;
12use std::ops::Range;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16/// A slice of bytes that can be read asynchronously
17#[derive(Debug, Clone)]
18pub struct FileSlice {
19    data: OwnedBytes,
20    range: Range<u64>,
21}
22
23impl FileSlice {
24    pub fn new(data: OwnedBytes) -> Self {
25        let len = data.len() as u64;
26        Self {
27            data,
28            range: 0..len,
29        }
30    }
31
32    pub fn empty() -> Self {
33        Self {
34            data: OwnedBytes::empty(),
35            range: 0..0,
36        }
37    }
38
39    pub fn slice(&self, range: Range<u64>) -> Self {
40        let start = self.range.start + range.start;
41        let end = self.range.start + range.end;
42        Self {
43            data: self.data.clone(),
44            range: start..end,
45        }
46    }
47
48    pub fn len(&self) -> u64 {
49        self.range.end - self.range.start
50    }
51
52    pub fn is_empty(&self) -> bool {
53        self.range.start == self.range.end
54    }
55
56    /// Read the entire slice (async for network compatibility)
57    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
58        Ok(self
59            .data
60            .slice(self.range.start as usize..self.range.end as usize))
61    }
62
63    /// Read a specific range within this slice
64    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
65        let start = self.range.start + range.start;
66        let end = self.range.start + range.end;
67        if end > self.range.end {
68            return Err(io::Error::new(
69                io::ErrorKind::InvalidInput,
70                "Range out of bounds",
71            ));
72        }
73        Ok(self.data.slice(start as usize..end as usize))
74    }
75}
76
77/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle
78#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
79#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
80#[cfg(not(target_arch = "wasm32"))]
81pub trait AsyncFileRead: Send + Sync {
82    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
83    fn len(&self) -> u64;
84
85    /// Check if empty
86    fn is_empty(&self) -> bool {
87        self.len() == 0
88    }
89
90    /// Read a specific byte range
91    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
92
93    /// Read all bytes
94    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
95        self.read_bytes_range(0..self.len()).await
96    }
97}
98
/// Asynchronous random-access reads over a file or a view of one (WASM version).
///
/// Same contract as the native definition, but without `Send`/`Sync`
/// requirements on implementors or on the returned futures.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait AsyncFileRead {
    /// Total number of readable bytes (`u64` so files over 4GB work on
    /// 32-bit platforms).
    fn len(&self) -> u64;

    /// Whether there are no bytes to read.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Fetch the bytes in `range`.
    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Fetch every byte, i.e. the range `0..len()`.
    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
        let everything = 0..self.len();
        self.read_bytes_range(everything).await
    }
}
119
120#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
121#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
122impl AsyncFileRead for FileSlice {
123    fn len(&self) -> u64 {
124        self.range.end - self.range.start
125    }
126
127    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
128        let start = self.range.start + range.start;
129        let end = self.range.start + range.end;
130        if end > self.range.end {
131            return Err(io::Error::new(
132                io::ErrorKind::InvalidInput,
133                "Range out of bounds",
134            ));
135        }
136        Ok(self.data.slice(start as usize..end as usize))
137    }
138}
139
140/// Callback type for lazy range reading
141#[cfg(not(target_arch = "wasm32"))]
142pub type RangeReadFn = Arc<
143    dyn Fn(
144            Range<u64>,
145        )
146            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
147        + Send
148        + Sync,
149>;
150
151#[cfg(target_arch = "wasm32")]
152pub type RangeReadFn = Arc<
153    dyn Fn(
154        Range<u64>,
155    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
156>;
157
158/// Lazy file handle that fetches ranges on demand via HTTP range requests
159/// Does NOT load the entire file into memory
160pub struct LazyFileHandle {
161    /// Total file size (u64 to support >4GB files on 32-bit platforms like WASM)
162    file_size: u64,
163    /// Callback to read a range from the underlying directory
164    read_fn: RangeReadFn,
165}
166
167impl std::fmt::Debug for LazyFileHandle {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("LazyFileHandle")
170            .field("file_size", &self.file_size)
171            .finish()
172    }
173}
174
175impl Clone for LazyFileHandle {
176    fn clone(&self) -> Self {
177        Self {
178            file_size: self.file_size,
179            read_fn: Arc::clone(&self.read_fn),
180        }
181    }
182}
183
184impl LazyFileHandle {
185    /// Create a new lazy file handle
186    pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
187        Self { file_size, read_fn }
188    }
189
190    /// Get file size
191    pub fn file_size(&self) -> u64 {
192        self.file_size
193    }
194
195    /// Create a sub-slice view (still lazy)
196    pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
197        LazyFileSlice {
198            handle: self.clone(),
199            offset: range.start,
200            len: range.end - range.start,
201        }
202    }
203}
204
205#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
206#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
207impl AsyncFileRead for LazyFileHandle {
208    fn len(&self) -> u64 {
209        self.file_size
210    }
211
212    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
213        if range.end > self.file_size {
214            return Err(io::Error::new(
215                io::ErrorKind::InvalidInput,
216                format!(
217                    "Range {:?} out of bounds (file size: {})",
218                    range, self.file_size
219                ),
220            ));
221        }
222        (self.read_fn)(range).await
223    }
224}
225
226/// A slice view into a LazyFileHandle
227#[derive(Clone)]
228pub struct LazyFileSlice {
229    handle: LazyFileHandle,
230    offset: u64,
231    len: u64,
232}
233
234impl std::fmt::Debug for LazyFileSlice {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        f.debug_struct("LazyFileSlice")
237            .field("offset", &self.offset)
238            .field("len", &self.len)
239            .finish()
240    }
241}
242
243impl LazyFileSlice {
244    /// Create a slice from a handle with absolute offset and length
245    pub fn from_handle_range(handle: &LazyFileHandle, offset: u64, len: u64) -> Self {
246        Self {
247            handle: handle.clone(),
248            offset,
249            len,
250        }
251    }
252
253    /// Create a sub-slice
254    pub fn slice(&self, range: Range<u64>) -> Self {
255        Self {
256            handle: self.handle.clone(),
257            offset: self.offset + range.start,
258            len: range.end - range.start,
259        }
260    }
261}
262
263#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
264#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
265impl AsyncFileRead for LazyFileSlice {
266    fn len(&self) -> u64 {
267        self.len
268    }
269
270    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
271        if range.end > self.len {
272            return Err(io::Error::new(
273                io::ErrorKind::InvalidInput,
274                format!("Range {:?} out of bounds (slice len: {})", range, self.len),
275            ));
276        }
277        let abs_start = self.offset + range.start;
278        let abs_end = self.offset + range.end;
279        self.handle.read_bytes_range(abs_start..abs_end).await
280    }
281}
282
/// Owned bytes with cheap cloning (Arc-backed).
///
/// A value is a view (`range`) into a shared buffer (`data`); cloning and
/// slicing bump the reference count instead of copying bytes.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    data: Arc<Vec<u8>>,
    range: Range<usize>,
}

impl OwnedBytes {
    /// Take ownership of `data` and expose all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: Arc::new(data),
            range: 0..len,
        }
    }

    /// Create a view that contains no bytes.
    pub fn empty() -> Self {
        Self {
            data: Arc::new(Vec::new()),
            range: 0..0,
        }
    }

    /// Number of bytes visible through this view.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Whether this view exposes no bytes.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Create a sub-view; `range` is relative to the start of this view.
    ///
    /// # Panics
    /// Panics if `range` is inverted or ends past the end of this view.
    /// Checking here keeps a sub-view from exposing bytes outside its parent
    /// and reports the mistake at the call that made it, not at a later
    /// `as_slice`.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds for view of length {}",
            range,
            self.len()
        );
        let base = self.range.start;
        Self {
            data: Arc::clone(&self.data),
            range: base + range.start..base + range.end,
        }
    }

    /// Borrow the bytes of this view.
    ///
    /// # Panics
    /// Panics if the view's range does not lie inside the underlying buffer.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }

    /// Copy the bytes of this view into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}
331
332impl AsRef<[u8]> for OwnedBytes {
333    fn as_ref(&self) -> &[u8] {
334        self.as_slice()
335    }
336}
337
338impl std::ops::Deref for OwnedBytes {
339    type Target = [u8];
340
341    fn deref(&self) -> &Self::Target {
342        self.as_slice()
343    }
344}
345
346/// Async directory trait for reading index files
347#[cfg(not(target_arch = "wasm32"))]
348#[async_trait]
349pub trait Directory: Send + Sync + 'static {
350    /// Check if a file exists
351    async fn exists(&self, path: &Path) -> io::Result<bool>;
352
353    /// Get file size
354    async fn file_size(&self, path: &Path) -> io::Result<u64>;
355
356    /// Open a file for reading, returns a FileSlice (loads entire file)
357    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
358
359    /// Read a specific byte range from a file (optimized for network)
360    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
361
362    /// List files in directory
363    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
364
365    /// Open a lazy file handle that fetches ranges on demand
366    /// This is more efficient for large files over network
367    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
368}
369
/// Read-only, asynchronous access to the files of an index directory
/// (WASM version - no `Send` requirement).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Directory: 'static {
    /// Report whether `path` exists.
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Size of the file at `path`, in bytes.
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Open `path` for reading as a [`FileSlice`] (loads the entire file).
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;

    /// Read only `range` from the file at `path` (optimized for network).
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// List the files under `prefix`.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Open a [`LazyFileHandle`] that fetches ranges on demand.
    ///
    /// More efficient than `open_read` for large files over a network.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
}
393
/// Incremental writer for a single file in a directory.
///
/// Lets callers emit a file piece by piece instead of assembling the whole
/// thing in memory first. Data is written through `io::Write`; `finish`
/// completes the file.
pub trait StreamingWriter: io::Write + Send {
    /// Finalize the write, consuming the writer and making the data
    /// available for reading.
    fn finish(self: Box<Self>) -> io::Result<()>;

    /// Number of bytes written so far.
    fn bytes_written(&self) -> u64;
}
405
406/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
407/// Used as default/fallback and for RamDirectory.
408struct BufferedStreamingWriter {
409    path: PathBuf,
410    buffer: Vec<u8>,
411    /// Callback to write the buffer to the directory on finish.
412    /// We store the files Arc directly for RamDirectory.
413    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
414}
415
416impl io::Write for BufferedStreamingWriter {
417    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
418        self.buffer.extend_from_slice(buf);
419        Ok(buf.len())
420    }
421
422    fn flush(&mut self) -> io::Result<()> {
423        Ok(())
424    }
425}
426
427impl StreamingWriter for BufferedStreamingWriter {
428    fn finish(self: Box<Self>) -> io::Result<()> {
429        self.files.write().insert(self.path, Arc::new(self.buffer));
430        Ok(())
431    }
432
433    fn bytes_written(&self) -> u64 {
434        self.buffer.len() as u64
435    }
436}
437
/// `StreamingWriter` that writes straight into a `std::fs::File`, for
/// filesystem directories.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    /// Destination file.
    pub(crate) file: std::fs::File,
    /// Running total of bytes accepted by `write`.
    pub(crate) written: u64,
}

#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    /// Forward `buf` to the file and add the accepted byte count to `written`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.file.write(buf).map(|count| {
            self.written += count as u64;
            count
        })
    }

    /// Flush the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}

#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flush the file, then `sync_all` it; the first failure is returned.
    fn finish(mut self: Box<Self>) -> io::Result<()> {
        self.file.flush().and_then(|()| self.file.sync_all())
    }

    /// Number of bytes written so far.
    fn bytes_written(&self) -> u64 {
        self.written
    }
}
470
471/// Async directory trait for writing index files
472#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
473#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
474pub trait DirectoryWriter: Directory {
475    /// Create/overwrite a file with data
476    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
477
478    /// Delete a file
479    async fn delete(&self, path: &Path) -> io::Result<()>;
480
481    /// Atomic rename
482    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
483
484    /// Sync all pending writes
485    async fn sync(&self) -> io::Result<()>;
486
487    /// Create a streaming writer for incremental file writes.
488    /// Call finish() on the returned writer to finalize.
489    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
490}
491
492/// In-memory directory for testing and small indexes
493#[derive(Debug, Default)]
494pub struct RamDirectory {
495    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
496}
497
498impl Clone for RamDirectory {
499    fn clone(&self) -> Self {
500        Self {
501            files: Arc::clone(&self.files),
502        }
503    }
504}
505
506impl RamDirectory {
507    pub fn new() -> Self {
508        Self::default()
509    }
510}
511
512#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
513#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
514impl Directory for RamDirectory {
515    async fn exists(&self, path: &Path) -> io::Result<bool> {
516        Ok(self.files.read().contains_key(path))
517    }
518
519    async fn file_size(&self, path: &Path) -> io::Result<u64> {
520        self.files
521            .read()
522            .get(path)
523            .map(|data| data.len() as u64)
524            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
525    }
526
527    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
528        let files = self.files.read();
529        let data = files
530            .get(path)
531            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
532
533        Ok(FileSlice::new(OwnedBytes {
534            data: Arc::clone(data),
535            range: 0..data.len(),
536        }))
537    }
538
539    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
540        let files = self.files.read();
541        let data = files
542            .get(path)
543            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
544
545        let start = range.start as usize;
546        let end = range.end as usize;
547
548        if end > data.len() {
549            return Err(io::Error::new(
550                io::ErrorKind::InvalidInput,
551                "Range out of bounds",
552            ));
553        }
554
555        Ok(OwnedBytes {
556            data: Arc::clone(data),
557            range: start..end,
558        })
559    }
560
561    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
562        let files = self.files.read();
563        Ok(files
564            .keys()
565            .filter(|p| p.starts_with(prefix))
566            .cloned()
567            .collect())
568    }
569
570    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
571        let files = Arc::clone(&self.files);
572        let path = path.to_path_buf();
573
574        let file_size = {
575            let files_guard = files.read();
576            files_guard
577                .get(&path)
578                .map(|data| data.len() as u64)
579                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
580        };
581
582        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
583            let files = Arc::clone(&files);
584            let path = path.clone();
585            Box::pin(async move {
586                let files_guard = files.read();
587                let data = files_guard
588                    .get(&path)
589                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
590
591                let start = range.start as usize;
592                let end = range.end as usize;
593                if end > data.len() {
594                    return Err(io::Error::new(
595                        io::ErrorKind::InvalidInput,
596                        "Range out of bounds",
597                    ));
598                }
599                Ok(OwnedBytes {
600                    data: Arc::clone(data),
601                    range: start..end,
602                })
603            })
604        });
605
606        Ok(LazyFileHandle::new(file_size, read_fn))
607    }
608}
609
610#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
611#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
612impl DirectoryWriter for RamDirectory {
613    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
614        self.files
615            .write()
616            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
617        Ok(())
618    }
619
620    async fn delete(&self, path: &Path) -> io::Result<()> {
621        self.files.write().remove(path);
622        Ok(())
623    }
624
625    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
626        let mut files = self.files.write();
627        if let Some(data) = files.remove(from) {
628            files.insert(to.to_path_buf(), data);
629        }
630        Ok(())
631    }
632
633    async fn sync(&self) -> io::Result<()> {
634        Ok(())
635    }
636
637    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
638        Ok(Box::new(BufferedStreamingWriter {
639            path: path.to_path_buf(),
640            buffer: Vec::new(),
641            files: Arc::clone(&self.files),
642        }))
643    }
644}
645
/// Directory backed by the local filesystem, rooted at a fixed path.
/// File access goes through tokio's async filesystem API.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    root: PathBuf,
}

#[cfg(feature = "native")]
impl FsDirectory {
    /// Create a directory rooted at `root`. The path is only stored; nothing
    /// is created or checked on disk.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = PathBuf::from(root.as_ref());
        Self { root }
    }

    /// Map a directory-relative `path` to a filesystem path under the root.
    ///
    /// Uses `PathBuf::push` semantics, so an absolute `path` replaces the
    /// root instead of being appended to it.
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
665
/// Read exactly `range` from the file at `full_path`.
///
/// Shared by `read_range` and the lazy read callback. The range is validated
/// before any I/O: an inverted range, or a length that does not fit in
/// `usize`, is `InvalidInput`. A range that runs past the end of the file
/// fails in `read_exact`.
#[cfg(feature = "native")]
async fn fs_read_exact(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    if range.start > range.end {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Range out of bounds",
        ));
    }
    let len = usize::try_from(range.end - range.start).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "Range too large for this platform",
        )
    })?;

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(std::io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; len];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}

#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// Whether `path` exists under the root.
    ///
    /// # Errors
    /// I/O failures while probing (for example permission errors) are
    /// returned to the caller instead of being reported as "does not exist".
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        let full_path = self.resolve(path);
        tokio::fs::try_exists(&full_path).await
    }

    /// Size of the file according to its filesystem metadata.
    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    /// Read the whole file into memory and wrap it in a `FileSlice`.
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    /// Read only `range` from the file.
    ///
    /// # Errors
    /// `InvalidInput` for an inverted range; any I/O error from opening,
    /// seeking or reading, including a short read when `range` extends past
    /// the end of the file.
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        let full_path = self.resolve(path);
        fs_read_exact(&full_path, range).await
    }

    /// List the regular files directly inside the directory `prefix`
    /// (non-recursive). Returned paths are relative to the root.
    ///
    /// # Errors
    /// Any I/O error from reading the directory; `InvalidInput` if an entry
    /// does not lie under the root (possible when `prefix` is absolute).
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_file() {
                let entry_path = entry.path();
                let relative = entry_path
                    .strip_prefix(&self.root)
                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
                files.push(relative.to_path_buf());
            }
        }

        Ok(files)
    }

    /// Open a lazy handle: the size is read from metadata now, and every
    /// range read opens the file again and reads just that range.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        let file_size = metadata.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { fs_read_exact(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}
739
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Write `data` to `path`, creating missing parent directories and
    /// overwriting any existing file.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        // Ensure parent directory exists
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    /// Remove the file at `path`; fails if it cannot be removed (for example
    /// because it does not exist).
    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    /// Rename `from` to `to` with the filesystem's rename operation.
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// fsync the root directory.
    ///
    /// Goes through tokio's file API so the blocking open and fsync do not
    /// run on the async executor thread.
    async fn sync(&self) -> io::Result<()> {
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Create (or truncate) the file at `path` and return a writer for it,
    /// creating missing parent directories first.
    ///
    /// The file is created through tokio and then converted to a
    /// `std::fs::File`, so the blocking create does not run on the async
    /// executor thread.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter { file, written: 0 }))
    }
}
781
782/// Caching wrapper for any Directory - caches file reads
783pub struct CachingDirectory<D: Directory> {
784    inner: D,
785    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
786    max_cached_bytes: usize,
787    current_bytes: RwLock<usize>,
788}
789
790impl<D: Directory> CachingDirectory<D> {
791    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
792        Self {
793            inner,
794            cache: RwLock::new(HashMap::new()),
795            max_cached_bytes,
796            current_bytes: RwLock::new(0),
797        }
798    }
799
800    fn try_cache(&self, path: &Path, data: &[u8]) {
801        let mut current = self.current_bytes.write();
802        if *current + data.len() <= self.max_cached_bytes {
803            self.cache
804                .write()
805                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
806            *current += data.len();
807        }
808    }
809}
810
811#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
812#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
813impl<D: Directory> Directory for CachingDirectory<D> {
814    async fn exists(&self, path: &Path) -> io::Result<bool> {
815        if self.cache.read().contains_key(path) {
816            return Ok(true);
817        }
818        self.inner.exists(path).await
819    }
820
821    async fn file_size(&self, path: &Path) -> io::Result<u64> {
822        if let Some(data) = self.cache.read().get(path) {
823            return Ok(data.len() as u64);
824        }
825        self.inner.file_size(path).await
826    }
827
828    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
829        // Check cache first
830        if let Some(data) = self.cache.read().get(path) {
831            return Ok(FileSlice::new(OwnedBytes {
832                data: Arc::clone(data),
833                range: 0..data.len(),
834            }));
835        }
836
837        // Read from inner and potentially cache
838        let slice = self.inner.open_read(path).await?;
839        let bytes = slice.read_bytes().await?;
840
841        self.try_cache(path, bytes.as_slice());
842
843        Ok(FileSlice::new(bytes))
844    }
845
846    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
847        // Check cache first
848        if let Some(data) = self.cache.read().get(path) {
849            let start = range.start as usize;
850            let end = range.end as usize;
851            return Ok(OwnedBytes {
852                data: Arc::clone(data),
853                range: start..end,
854            });
855        }
856
857        self.inner.read_range(path, range).await
858    }
859
860    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
861        self.inner.list_files(prefix).await
862    }
863
864    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
865        // For caching directory, delegate to inner - caching happens at read_range level
866        self.inner.open_lazy(path).await
867    }
868}
869
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips one file through `RamDirectory`: write, existence checks,
    /// full read, range read, delete.
    #[tokio::test]
    async fn test_ram_directory() {
        let path = Path::new("test.txt");
        let missing = Path::new("nonexistent.txt");
        let ram = RamDirectory::new();

        ram.write(path, b"hello world").await.unwrap();

        assert!(ram.exists(path).await.unwrap());
        assert!(!ram.exists(missing).await.unwrap());

        let whole = ram.open_read(path).await.unwrap().read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), b"hello world");

        let head = ram.read_range(path, 0..5).await.unwrap();
        assert_eq!(head.as_slice(), b"hello");

        ram.delete(path).await.unwrap();
        assert!(!ram.exists(path).await.unwrap());
    }

    /// Sub-slices of a `FileSlice` read back exactly the requested windows.
    #[tokio::test]
    async fn test_file_slice() {
        let full = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(full.len(), 11);

        let first_word = full.slice(0..5).read_bytes().await.unwrap();
        assert_eq!(first_word.as_slice(), b"hello");

        let second_word = full.slice(6..11).read_bytes().await.unwrap();
        assert_eq!(second_word.as_slice(), b"world");
    }

    /// Slicing `OwnedBytes` yields the right window and leaves the source intact.
    #[tokio::test]
    async fn test_owned_bytes() {
        let source = OwnedBytes::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(source.len(), 5);
        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = source.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        // Original unchanged
        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);
    }
}