Skip to main content

hermes_core/directories/
directory.rs

1//! Async Directory abstraction for IO operations
2//!
3//! Supports network, local filesystem, and in-memory storage.
4//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10#[cfg(feature = "native")]
11use std::io::Write;
12use std::ops::Range;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16/// A slice of bytes that can be read asynchronously
17#[derive(Debug, Clone)]
18pub struct FileSlice {
19    data: OwnedBytes,
20    range: Range<u64>,
21}
22
23impl FileSlice {
24    pub fn new(data: OwnedBytes) -> Self {
25        let len = data.len() as u64;
26        Self {
27            data,
28            range: 0..len,
29        }
30    }
31
32    pub fn empty() -> Self {
33        Self {
34            data: OwnedBytes::empty(),
35            range: 0..0,
36        }
37    }
38
39    pub fn slice(&self, range: Range<u64>) -> Self {
40        let start = self.range.start + range.start;
41        let end = self.range.start + range.end;
42        Self {
43            data: self.data.clone(),
44            range: start..end,
45        }
46    }
47
48    pub fn len(&self) -> u64 {
49        self.range.end - self.range.start
50    }
51
52    pub fn is_empty(&self) -> bool {
53        self.range.start == self.range.end
54    }
55
56    /// Read the entire slice (async for network compatibility)
57    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
58        Ok(self
59            .data
60            .slice(self.range.start as usize..self.range.end as usize))
61    }
62
63    /// Read a specific range within this slice
64    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
65        let start = self.range.start + range.start;
66        let end = self.range.start + range.end;
67        if end > self.range.end {
68            return Err(io::Error::new(
69                io::ErrorKind::InvalidInput,
70                "Range out of bounds",
71            ));
72        }
73        Ok(self.data.slice(start as usize..end as usize))
74    }
75}
76
77/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle
78#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
79#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
80#[cfg(not(target_arch = "wasm32"))]
81pub trait AsyncFileRead: Send + Sync {
82    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
83    fn len(&self) -> u64;
84
85    /// Check if empty
86    fn is_empty(&self) -> bool {
87        self.len() == 0
88    }
89
90    /// Read a specific byte range
91    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
92
93    /// Read all bytes
94    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
95        self.read_bytes_range(0..self.len()).await
96    }
97}
98
99/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle (WASM version)
100#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
101#[cfg(target_arch = "wasm32")]
102pub trait AsyncFileRead {
103    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
104    fn len(&self) -> u64;
105
106    /// Check if empty
107    fn is_empty(&self) -> bool {
108        self.len() == 0
109    }
110
111    /// Read a specific byte range
112    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
113
114    /// Read all bytes
115    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
116        self.read_bytes_range(0..self.len()).await
117    }
118}
119
120#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
121#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
122impl AsyncFileRead for FileSlice {
123    fn len(&self) -> u64 {
124        self.range.end - self.range.start
125    }
126
127    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
128        let start = self.range.start + range.start;
129        let end = self.range.start + range.end;
130        if end > self.range.end {
131            return Err(io::Error::new(
132                io::ErrorKind::InvalidInput,
133                "Range out of bounds",
134            ));
135        }
136        Ok(self.data.slice(start as usize..end as usize))
137    }
138}
139
140/// Callback type for lazy range reading
141#[cfg(not(target_arch = "wasm32"))]
142pub type RangeReadFn = Arc<
143    dyn Fn(
144            Range<u64>,
145        )
146            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
147        + Send
148        + Sync,
149>;
150
151#[cfg(target_arch = "wasm32")]
152pub type RangeReadFn = Arc<
153    dyn Fn(
154        Range<u64>,
155    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
156>;
157
158/// Lazy file handle that fetches ranges on demand via HTTP range requests
159/// Does NOT load the entire file into memory
160pub struct LazyFileHandle {
161    /// Total file size (u64 to support >4GB files on 32-bit platforms like WASM)
162    file_size: u64,
163    /// Callback to read a range from the underlying directory
164    read_fn: RangeReadFn,
165}
166
167impl std::fmt::Debug for LazyFileHandle {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("LazyFileHandle")
170            .field("file_size", &self.file_size)
171            .finish()
172    }
173}
174
175impl Clone for LazyFileHandle {
176    fn clone(&self) -> Self {
177        Self {
178            file_size: self.file_size,
179            read_fn: Arc::clone(&self.read_fn),
180        }
181    }
182}
183
184impl LazyFileHandle {
185    /// Create a new lazy file handle
186    pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
187        Self { file_size, read_fn }
188    }
189
190    /// Get file size
191    pub fn file_size(&self) -> u64 {
192        self.file_size
193    }
194
195    /// Create a sub-slice view (still lazy)
196    pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
197        LazyFileSlice {
198            handle: self.clone(),
199            offset: range.start,
200            len: range.end - range.start,
201        }
202    }
203}
204
205#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
206#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
207impl AsyncFileRead for LazyFileHandle {
208    fn len(&self) -> u64 {
209        self.file_size
210    }
211
212    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
213        if range.end > self.file_size {
214            return Err(io::Error::new(
215                io::ErrorKind::InvalidInput,
216                format!(
217                    "Range {:?} out of bounds (file size: {})",
218                    range, self.file_size
219                ),
220            ));
221        }
222        (self.read_fn)(range).await
223    }
224}
225
226/// A slice view into a LazyFileHandle
227#[derive(Clone)]
228pub struct LazyFileSlice {
229    handle: LazyFileHandle,
230    offset: u64,
231    len: u64,
232}
233
234impl std::fmt::Debug for LazyFileSlice {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        f.debug_struct("LazyFileSlice")
237            .field("offset", &self.offset)
238            .field("len", &self.len)
239            .finish()
240    }
241}
242
243impl LazyFileSlice {
244    /// Create a sub-slice
245    pub fn slice(&self, range: Range<u64>) -> Self {
246        Self {
247            handle: self.handle.clone(),
248            offset: self.offset + range.start,
249            len: range.end - range.start,
250        }
251    }
252}
253
254#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
255#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
256impl AsyncFileRead for LazyFileSlice {
257    fn len(&self) -> u64 {
258        self.len
259    }
260
261    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
262        if range.end > self.len {
263            return Err(io::Error::new(
264                io::ErrorKind::InvalidInput,
265                format!("Range {:?} out of bounds (slice len: {})", range, self.len),
266            ));
267        }
268        let abs_start = self.offset + range.start;
269        let abs_end = self.offset + range.end;
270        self.handle.read_bytes_range(abs_start..abs_end).await
271    }
272}
273
/// Immutable bytes backed by a shared `Arc<Vec<u8>>`.
///
/// Cloning and slicing are O(1). Both bump the reference count and adjust the
/// visible `range`; the underlying buffer is never copied.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    data: Arc<Vec<u8>>,
    /// Visible window into `data`. It must satisfy
    /// `range.start <= range.end <= data.len()`; otherwise `as_slice` panics.
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` and exposes all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: Arc::new(data),
            range: 0..len,
        }
    }

    /// Returns an empty buffer.
    pub fn empty() -> Self {
        Self {
            data: Arc::new(Vec::new()),
            range: 0..0,
        }
    }

    /// Returns the number of visible bytes.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Returns `true` if no bytes are visible.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns a sub-view sharing the same buffer; `range` is relative to this view.
    ///
    /// # Panics
    /// If `range.start > range.end` or `range.end > self.len()`. Checking here
    /// reports the bad range where it was created, rather than at a later
    /// `as_slice`/deref.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds (len {})",
            range,
            self.len()
        );
        let start = self.range.start + range.start;
        let end = self.range.start + range.end;
        Self {
            data: Arc::clone(&self.data),
            range: start..end,
        }
    }

    /// Borrows the visible bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }

    /// Copies the visible bytes into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}

impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
336
337/// Async directory trait for reading index files
338#[cfg(not(target_arch = "wasm32"))]
339#[async_trait]
340pub trait Directory: Send + Sync + 'static {
341    /// Check if a file exists
342    async fn exists(&self, path: &Path) -> io::Result<bool>;
343
344    /// Get file size
345    async fn file_size(&self, path: &Path) -> io::Result<u64>;
346
347    /// Open a file for reading, returns a FileSlice (loads entire file)
348    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
349
350    /// Read a specific byte range from a file (optimized for network)
351    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
352
353    /// List files in directory
354    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
355
356    /// Open a lazy file handle that fetches ranges on demand
357    /// This is more efficient for large files over network
358    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
359}
360
361/// Async directory trait for reading index files (WASM version - no Send requirement)
362#[cfg(target_arch = "wasm32")]
363#[async_trait(?Send)]
364pub trait Directory: 'static {
365    /// Check if a file exists
366    async fn exists(&self, path: &Path) -> io::Result<bool>;
367
368    /// Get file size
369    async fn file_size(&self, path: &Path) -> io::Result<u64>;
370
371    /// Open a file for reading, returns a FileSlice (loads entire file)
372    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
373
374    /// Read a specific byte range from a file (optimized for network)
375    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
376
377    /// List files in directory
378    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
379
380    /// Open a lazy file handle that fetches ranges on demand
381    /// This is more efficient for large files over network
382    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
383}
384
385/// A writer for incrementally writing data to a directory file.
386///
387/// Avoids buffering entire files in memory during merge. File-backed
388/// directories write directly to disk; memory directories collect to Vec.
389pub trait StreamingWriter: io::Write + Send {
390    /// Finalize the write, making data available for reading.
391    fn finish(self: Box<Self>) -> io::Result<()>;
392
393    /// Bytes written so far.
394    fn bytes_written(&self) -> u64;
395}
396
397/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
398/// Used as default/fallback and for RamDirectory.
399struct BufferedStreamingWriter {
400    path: PathBuf,
401    buffer: Vec<u8>,
402    /// Callback to write the buffer to the directory on finish.
403    /// We store the files Arc directly for RamDirectory.
404    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
405}
406
407impl io::Write for BufferedStreamingWriter {
408    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
409        self.buffer.extend_from_slice(buf);
410        Ok(buf.len())
411    }
412
413    fn flush(&mut self) -> io::Result<()> {
414        Ok(())
415    }
416}
417
418impl StreamingWriter for BufferedStreamingWriter {
419    fn finish(self: Box<Self>) -> io::Result<()> {
420        self.files.write().insert(self.path, Arc::new(self.buffer));
421        Ok(())
422    }
423
424    fn bytes_written(&self) -> u64 {
425        self.buffer.len() as u64
426    }
427}
428
/// [`StreamingWriter`] that writes straight through to a `std::fs::File`;
/// used by filesystem directories.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    /// Destination file, already opened for writing.
    pub(crate) file: std::fs::File,
    /// Running total of bytes accepted by `write`.
    pub(crate) written: u64,
}

#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = self.file.write(buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}

#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes the file, then `sync_all`s it before returning.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let mut this = *self;
        this.file.flush()?;
        this.file.sync_all()
    }

    fn bytes_written(&self) -> u64 {
        self.written
    }
}
461
462/// Async directory trait for writing index files
463#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
464#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
465pub trait DirectoryWriter: Directory {
466    /// Create/overwrite a file with data
467    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
468
469    /// Delete a file
470    async fn delete(&self, path: &Path) -> io::Result<()>;
471
472    /// Atomic rename
473    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
474
475    /// Sync all pending writes
476    async fn sync(&self) -> io::Result<()>;
477
478    /// Create a streaming writer for incremental file writes.
479    /// Call finish() on the returned writer to finalize.
480    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
481}
482
483/// In-memory directory for testing and small indexes
484#[derive(Debug, Default)]
485pub struct RamDirectory {
486    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
487}
488
489impl Clone for RamDirectory {
490    fn clone(&self) -> Self {
491        Self {
492            files: Arc::clone(&self.files),
493        }
494    }
495}
496
497impl RamDirectory {
498    pub fn new() -> Self {
499        Self::default()
500    }
501}
502
503#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
504#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
505impl Directory for RamDirectory {
506    async fn exists(&self, path: &Path) -> io::Result<bool> {
507        Ok(self.files.read().contains_key(path))
508    }
509
510    async fn file_size(&self, path: &Path) -> io::Result<u64> {
511        self.files
512            .read()
513            .get(path)
514            .map(|data| data.len() as u64)
515            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
516    }
517
518    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
519        let files = self.files.read();
520        let data = files
521            .get(path)
522            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
523
524        Ok(FileSlice::new(OwnedBytes {
525            data: Arc::clone(data),
526            range: 0..data.len(),
527        }))
528    }
529
530    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
531        let files = self.files.read();
532        let data = files
533            .get(path)
534            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
535
536        let start = range.start as usize;
537        let end = range.end as usize;
538
539        if end > data.len() {
540            return Err(io::Error::new(
541                io::ErrorKind::InvalidInput,
542                "Range out of bounds",
543            ));
544        }
545
546        Ok(OwnedBytes {
547            data: Arc::clone(data),
548            range: start..end,
549        })
550    }
551
552    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
553        let files = self.files.read();
554        Ok(files
555            .keys()
556            .filter(|p| p.starts_with(prefix))
557            .cloned()
558            .collect())
559    }
560
561    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
562        let files = Arc::clone(&self.files);
563        let path = path.to_path_buf();
564
565        let file_size = {
566            let files_guard = files.read();
567            files_guard
568                .get(&path)
569                .map(|data| data.len() as u64)
570                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
571        };
572
573        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
574            let files = Arc::clone(&files);
575            let path = path.clone();
576            Box::pin(async move {
577                let files_guard = files.read();
578                let data = files_guard
579                    .get(&path)
580                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
581
582                let start = range.start as usize;
583                let end = range.end as usize;
584                if end > data.len() {
585                    return Err(io::Error::new(
586                        io::ErrorKind::InvalidInput,
587                        "Range out of bounds",
588                    ));
589                }
590                Ok(OwnedBytes {
591                    data: Arc::clone(data),
592                    range: start..end,
593                })
594            })
595        });
596
597        Ok(LazyFileHandle::new(file_size, read_fn))
598    }
599}
600
601#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
602#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
603impl DirectoryWriter for RamDirectory {
604    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
605        self.files
606            .write()
607            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
608        Ok(())
609    }
610
611    async fn delete(&self, path: &Path) -> io::Result<()> {
612        self.files.write().remove(path);
613        Ok(())
614    }
615
616    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
617        let mut files = self.files.write();
618        if let Some(data) = files.remove(from) {
619            files.insert(to.to_path_buf(), data);
620        }
621        Ok(())
622    }
623
624    async fn sync(&self) -> io::Result<()> {
625        Ok(())
626    }
627
628    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
629        Ok(Box::new(BufferedStreamingWriter {
630            path: path.to_path_buf(),
631            buffer: Vec::new(),
632            files: Arc::clone(&self.files),
633        }))
634    }
635}
636
/// Directory rooted at a local filesystem path; IO is performed through tokio.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base that every directory-relative path is resolved against.
    root: PathBuf,
}

#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory view rooted at `root`. The path is neither checked
    /// nor created here.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = PathBuf::from(root.as_ref());
        Self { root }
    }

    /// Maps a directory-relative path onto the filesystem.
    ///
    /// As with `Path::join`, an absolute `path` replaces the root entirely.
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
656
/// Reads exactly `range` from the file at `full_path` (open, seek, `read_exact`).
///
/// # Errors
/// `InvalidInput` for an inverted range or for a length that does not fit in
/// `usize`. Otherwise, any error from opening, seeking or reading; a range
/// extending past EOF fails inside `read_exact`.
#[cfg(feature = "native")]
async fn fs_read_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    // Validate before touching the file: `end - start` would underflow on an
    // inverted range (a panic in debug builds, a huge allocation in release builds).
    if range.start > range.end {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid range {:?}: start > end", range),
        ));
    }
    let len = usize::try_from(range.end - range.start).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "Range too large for this platform",
        )
    })?;

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(io::SeekFrom::Start(range.start)).await?;
    let mut buffer = vec![0u8; len];
    file.read_exact(&mut buffer).await?;
    Ok(OwnedBytes::new(buffer))
}

#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// Reports whether `path` exists under the root.
    ///
    /// Errors from `try_exists` (for example, permission problems) are
    /// propagated instead of being reported as "does not exist".
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        tokio::fs::try_exists(self.resolve(path)).await
    }

    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let metadata = tokio::fs::metadata(self.resolve(path)).await?;
        Ok(metadata.len())
    }

    /// Reads the whole file into memory.
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let data = tokio::fs::read(self.resolve(path)).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        fs_read_range(&self.resolve(path), range).await
    }

    /// Lists the regular files directly inside `prefix` (non-recursive).
    ///
    /// Returned paths are relative to the root when they lie under it.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let mut entries = tokio::fs::read_dir(self.resolve(prefix)).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_file() {
                continue;
            }
            let full = entry.path();
            // If `prefix` is absolute and outside the root, `strip_prefix`
            // fails. Keep the absolute path, which `resolve` maps back to
            // itself, instead of panicking.
            let listed = match full.strip_prefix(&self.root) {
                Ok(relative) => relative.to_path_buf(),
                Err(_) => full.clone(),
            };
            files.push(listed);
        }

        Ok(files)
    }

    /// Returns a lazy handle. The size is taken from metadata now, and each read
    /// reopens the file and reads only the requested range.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let file_size = tokio::fs::metadata(&full_path).await?.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { fs_read_range(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}
730
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to `path`, creating any missing parent directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    /// Removes the file. A missing file is an error.
    async fn delete(&self, path: &Path) -> io::Result<()> {
        tokio::fs::remove_file(self.resolve(path)).await
    }

    /// Renames via the filesystem's rename call.
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// fsyncs the root directory itself (not the files inside it).
    ///
    /// Uses tokio's async file API so the executor thread is not blocked on
    /// the sync, which can be slow.
    async fn sync(&self) -> io::Result<()> {
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Creates (or truncates) `path`, creating parent directories as needed, and
    /// returns a writer that writes through to the file.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Create the file asynchronously, then hand the std handle to the
        // synchronous writer. This avoids a blocking `std::fs::File::create`
        // call in async code.
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter { file, written: 0 }))
    }
}
772
773/// Caching wrapper for any Directory - caches file reads
774pub struct CachingDirectory<D: Directory> {
775    inner: D,
776    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
777    max_cached_bytes: usize,
778    current_bytes: RwLock<usize>,
779}
780
781impl<D: Directory> CachingDirectory<D> {
782    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
783        Self {
784            inner,
785            cache: RwLock::new(HashMap::new()),
786            max_cached_bytes,
787            current_bytes: RwLock::new(0),
788        }
789    }
790
791    fn try_cache(&self, path: &Path, data: &[u8]) {
792        let mut current = self.current_bytes.write();
793        if *current + data.len() <= self.max_cached_bytes {
794            self.cache
795                .write()
796                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
797            *current += data.len();
798        }
799    }
800}
801
802#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
803#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
804impl<D: Directory> Directory for CachingDirectory<D> {
805    async fn exists(&self, path: &Path) -> io::Result<bool> {
806        if self.cache.read().contains_key(path) {
807            return Ok(true);
808        }
809        self.inner.exists(path).await
810    }
811
812    async fn file_size(&self, path: &Path) -> io::Result<u64> {
813        if let Some(data) = self.cache.read().get(path) {
814            return Ok(data.len() as u64);
815        }
816        self.inner.file_size(path).await
817    }
818
819    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
820        // Check cache first
821        if let Some(data) = self.cache.read().get(path) {
822            return Ok(FileSlice::new(OwnedBytes {
823                data: Arc::clone(data),
824                range: 0..data.len(),
825            }));
826        }
827
828        // Read from inner and potentially cache
829        let slice = self.inner.open_read(path).await?;
830        let bytes = slice.read_bytes().await?;
831
832        self.try_cache(path, bytes.as_slice());
833
834        Ok(FileSlice::new(bytes))
835    }
836
837    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
838        // Check cache first
839        if let Some(data) = self.cache.read().get(path) {
840            let start = range.start as usize;
841            let end = range.end as usize;
842            return Ok(OwnedBytes {
843                data: Arc::clone(data),
844                range: start..end,
845            });
846        }
847
848        self.inner.read_range(path, range).await
849    }
850
851    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
852        self.inner.list_files(prefix).await
853    }
854
855    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
856        // For caching directory, delegate to inner - caching happens at read_range level
857        self.inner.open_lazy(path).await
858    }
859}
860
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a file through RamDirectory: write, existence checks, full
    /// and ranged reads, then delete.
    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();
        let file = Path::new("test.txt");
        let missing = Path::new("nonexistent.txt");

        dir.write(file, b"hello world").await.unwrap();

        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(missing).await.unwrap());

        let opened = dir.open_read(file).await.unwrap();
        let whole = opened.read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), b"hello world");

        let head = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(head.as_slice(), b"hello");

        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    /// Sub-slices of a FileSlice read back exactly their window.
    #[tokio::test]
    async fn test_file_slice() {
        let slice = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(slice.len(), 11);

        for (window, expected) in [(0..5, &b"hello"[..]), (6..11, &b"world"[..])] {
            let bytes = slice.slice(window).read_bytes().await.unwrap();
            assert_eq!(bytes.as_slice(), expected);
        }
    }

    /// Slicing OwnedBytes yields the expected view and leaves the source intact.
    #[tokio::test]
    async fn test_owned_bytes() {
        let source = OwnedBytes::new(vec![1, 2, 3, 4, 5]);
        assert_eq!(source.len(), 5);
        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = source.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);
    }
}