Skip to main content

hermes_core/directories/
directory.rs

1//! Async Directory abstraction for IO operations
2//!
3//! Supports network, local filesystem, and in-memory storage.
4//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io::{self, Write as _};
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14/// A slice of bytes that can be read asynchronously
15#[derive(Debug, Clone)]
16pub struct FileSlice {
17    data: OwnedBytes,
18    range: Range<u64>,
19}
20
21impl FileSlice {
22    pub fn new(data: OwnedBytes) -> Self {
23        let len = data.len() as u64;
24        Self {
25            data,
26            range: 0..len,
27        }
28    }
29
30    pub fn empty() -> Self {
31        Self {
32            data: OwnedBytes::empty(),
33            range: 0..0,
34        }
35    }
36
37    pub fn slice(&self, range: Range<u64>) -> Self {
38        let start = self.range.start + range.start;
39        let end = self.range.start + range.end;
40        Self {
41            data: self.data.clone(),
42            range: start..end,
43        }
44    }
45
46    pub fn len(&self) -> u64 {
47        self.range.end - self.range.start
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.range.start == self.range.end
52    }
53
54    /// Read the entire slice (async for network compatibility)
55    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
56        Ok(self
57            .data
58            .slice(self.range.start as usize..self.range.end as usize))
59    }
60
61    /// Read a specific range within this slice
62    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
63        let start = self.range.start + range.start;
64        let end = self.range.start + range.end;
65        if end > self.range.end {
66            return Err(io::Error::new(
67                io::ErrorKind::InvalidInput,
68                "Range out of bounds",
69            ));
70        }
71        Ok(self.data.slice(start as usize..end as usize))
72    }
73}
74
75/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle
76#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg(not(target_arch = "wasm32"))]
79pub trait AsyncFileRead: Send + Sync {
80    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
81    fn len(&self) -> u64;
82
83    /// Check if empty
84    fn is_empty(&self) -> bool {
85        self.len() == 0
86    }
87
88    /// Read a specific byte range
89    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
90
91    /// Read all bytes
92    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
93        self.read_bytes_range(0..self.len()).await
94    }
95}
96
97/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle (WASM version)
98#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
99#[cfg(target_arch = "wasm32")]
100pub trait AsyncFileRead {
101    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
102    fn len(&self) -> u64;
103
104    /// Check if empty
105    fn is_empty(&self) -> bool {
106        self.len() == 0
107    }
108
109    /// Read a specific byte range
110    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
111
112    /// Read all bytes
113    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
114        self.read_bytes_range(0..self.len()).await
115    }
116}
117
118#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
119#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
120impl AsyncFileRead for FileSlice {
121    fn len(&self) -> u64 {
122        self.range.end - self.range.start
123    }
124
125    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
126        let start = self.range.start + range.start;
127        let end = self.range.start + range.end;
128        if end > self.range.end {
129            return Err(io::Error::new(
130                io::ErrorKind::InvalidInput,
131                "Range out of bounds",
132            ));
133        }
134        Ok(self.data.slice(start as usize..end as usize))
135    }
136}
137
138/// Callback type for lazy range reading
139#[cfg(not(target_arch = "wasm32"))]
140pub type RangeReadFn = Arc<
141    dyn Fn(
142            Range<u64>,
143        )
144            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
145        + Send
146        + Sync,
147>;
148
149#[cfg(target_arch = "wasm32")]
150pub type RangeReadFn = Arc<
151    dyn Fn(
152        Range<u64>,
153    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
154>;
155
156/// Lazy file handle that fetches ranges on demand via HTTP range requests
157/// Does NOT load the entire file into memory
158pub struct LazyFileHandle {
159    /// Total file size (u64 to support >4GB files on 32-bit platforms like WASM)
160    file_size: u64,
161    /// Callback to read a range from the underlying directory
162    read_fn: RangeReadFn,
163}
164
165impl std::fmt::Debug for LazyFileHandle {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("LazyFileHandle")
168            .field("file_size", &self.file_size)
169            .finish()
170    }
171}
172
173impl Clone for LazyFileHandle {
174    fn clone(&self) -> Self {
175        Self {
176            file_size: self.file_size,
177            read_fn: Arc::clone(&self.read_fn),
178        }
179    }
180}
181
182impl LazyFileHandle {
183    /// Create a new lazy file handle
184    pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
185        Self { file_size, read_fn }
186    }
187
188    /// Get file size
189    pub fn file_size(&self) -> u64 {
190        self.file_size
191    }
192
193    /// Create a sub-slice view (still lazy)
194    pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
195        LazyFileSlice {
196            handle: self.clone(),
197            offset: range.start,
198            len: range.end - range.start,
199        }
200    }
201}
202
203#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
204#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
205impl AsyncFileRead for LazyFileHandle {
206    fn len(&self) -> u64 {
207        self.file_size
208    }
209
210    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
211        if range.end > self.file_size {
212            return Err(io::Error::new(
213                io::ErrorKind::InvalidInput,
214                format!(
215                    "Range {:?} out of bounds (file size: {})",
216                    range, self.file_size
217                ),
218            ));
219        }
220        (self.read_fn)(range).await
221    }
222}
223
224/// A slice view into a LazyFileHandle
225#[derive(Clone)]
226pub struct LazyFileSlice {
227    handle: LazyFileHandle,
228    offset: u64,
229    len: u64,
230}
231
232impl std::fmt::Debug for LazyFileSlice {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("LazyFileSlice")
235            .field("offset", &self.offset)
236            .field("len", &self.len)
237            .finish()
238    }
239}
240
241impl LazyFileSlice {
242    /// Create a sub-slice
243    pub fn slice(&self, range: Range<u64>) -> Self {
244        Self {
245            handle: self.handle.clone(),
246            offset: self.offset + range.start,
247            len: range.end - range.start,
248        }
249    }
250}
251
252#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
253#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
254impl AsyncFileRead for LazyFileSlice {
255    fn len(&self) -> u64 {
256        self.len
257    }
258
259    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
260        if range.end > self.len {
261            return Err(io::Error::new(
262                io::ErrorKind::InvalidInput,
263                format!("Range {:?} out of bounds (slice len: {})", range, self.len),
264            ));
265        }
266        let abs_start = self.offset + range.start;
267        let abs_end = self.offset + range.end;
268        self.handle.read_bytes_range(abs_start..abs_end).await
269    }
270}
271
/// Immutable bytes with cheap cloning (Arc-backed).
///
/// An `OwnedBytes` is an `Arc`-shared buffer plus a window into it. Cloning
/// and slicing only bump the reference count; the buffer is never copied.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    data: Arc<Vec<u8>>,
    /// Window into `data`. `as_slice` panics if it does not lie within `data`.
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` and exposes all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: Arc::new(data),
            range: 0..len,
        }
    }

    /// Returns an empty value.
    pub fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Number of bytes in the window.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Returns `true` if the window is empty.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns a zero-copy sub-view; `range` is relative to this view.
    ///
    /// # Panics
    /// Panics if `range.start > range.end` or `range.end > self.len()`, like
    /// slice indexing. Previously such a view was created silently and then
    /// panicked on first access.
    #[track_caller]
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds for length {}",
            range,
            self.len()
        );
        let start = self.range.start + range.start;
        let end = self.range.start + range.end;
        Self {
            data: Arc::clone(&self.data),
            range: start..end,
        }
    }

    /// Borrows the bytes in the window.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }

    /// Copies the bytes in the window into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}

impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
334
335/// Async directory trait for reading index files
336#[cfg(not(target_arch = "wasm32"))]
337#[async_trait]
338pub trait Directory: Send + Sync + 'static {
339    /// Check if a file exists
340    async fn exists(&self, path: &Path) -> io::Result<bool>;
341
342    /// Get file size
343    async fn file_size(&self, path: &Path) -> io::Result<u64>;
344
345    /// Open a file for reading, returns a FileSlice (loads entire file)
346    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
347
348    /// Read a specific byte range from a file (optimized for network)
349    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
350
351    /// List files in directory
352    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
353
354    /// Open a lazy file handle that fetches ranges on demand
355    /// This is more efficient for large files over network
356    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
357}
358
359/// Async directory trait for reading index files (WASM version - no Send requirement)
360#[cfg(target_arch = "wasm32")]
361#[async_trait(?Send)]
362pub trait Directory: 'static {
363    /// Check if a file exists
364    async fn exists(&self, path: &Path) -> io::Result<bool>;
365
366    /// Get file size
367    async fn file_size(&self, path: &Path) -> io::Result<u64>;
368
369    /// Open a file for reading, returns a FileSlice (loads entire file)
370    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
371
372    /// Read a specific byte range from a file (optimized for network)
373    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
374
375    /// List files in directory
376    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
377
378    /// Open a lazy file handle that fetches ranges on demand
379    /// This is more efficient for large files over network
380    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
381}
382
/// Sink for writing a directory file incrementally, so large outputs (e.g.
/// during merges) need not be assembled in memory first.
///
/// Bytes are accepted through [`io::Write`]. The file is complete and readable
/// once [`StreamingWriter::finish`] has returned `Ok`.
pub trait StreamingWriter: io::Write + Send {
    /// Total number of bytes accepted so far.
    fn bytes_written(&self) -> u64;

    /// Consumes the writer and finalizes the file.
    fn finish(self: Box<Self>) -> io::Result<()>;
}
394
395/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
396/// Used as default/fallback and for RamDirectory.
397struct BufferedStreamingWriter {
398    path: PathBuf,
399    buffer: Vec<u8>,
400    /// Callback to write the buffer to the directory on finish.
401    /// We store the files Arc directly for RamDirectory.
402    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
403}
404
405impl io::Write for BufferedStreamingWriter {
406    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
407        self.buffer.extend_from_slice(buf);
408        Ok(buf.len())
409    }
410
411    fn flush(&mut self) -> io::Result<()> {
412        Ok(())
413    }
414}
415
416impl StreamingWriter for BufferedStreamingWriter {
417    fn finish(self: Box<Self>) -> io::Result<()> {
418        self.files.write().insert(self.path, Arc::new(self.buffer));
419        Ok(())
420    }
421
422    fn bytes_written(&self) -> u64 {
423        self.buffer.len() as u64
424    }
425}
426
/// [`StreamingWriter`] that streams bytes straight into a file on disk.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    /// Destination file.
    pub(crate) file: std::fs::File,
    /// Running total of bytes accepted by `write`.
    pub(crate) written: u64,
}

#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = io::Write::write(&mut self.file, buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    fn flush(&mut self) -> io::Result<()> {
        io::Write::flush(&mut self.file)
    }
}

#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes and then fsyncs the file so its contents reach stable storage.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let Self { mut file, .. } = *self;
        io::Write::flush(&mut file)?;
        file.sync_all()
    }

    fn bytes_written(&self) -> u64 {
        self.written
    }
}
459
460/// Async directory trait for writing index files
461#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
462#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
463pub trait DirectoryWriter: Directory {
464    /// Create/overwrite a file with data
465    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
466
467    /// Delete a file
468    async fn delete(&self, path: &Path) -> io::Result<()>;
469
470    /// Atomic rename
471    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
472
473    /// Sync all pending writes
474    async fn sync(&self) -> io::Result<()>;
475
476    /// Create a streaming writer for incremental file writes.
477    /// Call finish() on the returned writer to finalize.
478    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
479}
480
481/// In-memory directory for testing and small indexes
482#[derive(Debug, Default)]
483pub struct RamDirectory {
484    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
485}
486
487impl Clone for RamDirectory {
488    fn clone(&self) -> Self {
489        Self {
490            files: Arc::clone(&self.files),
491        }
492    }
493}
494
495impl RamDirectory {
496    pub fn new() -> Self {
497        Self::default()
498    }
499}
500
501#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
502#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
503impl Directory for RamDirectory {
504    async fn exists(&self, path: &Path) -> io::Result<bool> {
505        Ok(self.files.read().contains_key(path))
506    }
507
508    async fn file_size(&self, path: &Path) -> io::Result<u64> {
509        self.files
510            .read()
511            .get(path)
512            .map(|data| data.len() as u64)
513            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
514    }
515
516    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
517        let files = self.files.read();
518        let data = files
519            .get(path)
520            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
521
522        Ok(FileSlice::new(OwnedBytes {
523            data: Arc::clone(data),
524            range: 0..data.len(),
525        }))
526    }
527
528    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
529        let files = self.files.read();
530        let data = files
531            .get(path)
532            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
533
534        let start = range.start as usize;
535        let end = range.end as usize;
536
537        if end > data.len() {
538            return Err(io::Error::new(
539                io::ErrorKind::InvalidInput,
540                "Range out of bounds",
541            ));
542        }
543
544        Ok(OwnedBytes {
545            data: Arc::clone(data),
546            range: start..end,
547        })
548    }
549
550    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
551        let files = self.files.read();
552        Ok(files
553            .keys()
554            .filter(|p| p.starts_with(prefix))
555            .cloned()
556            .collect())
557    }
558
559    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
560        let files = Arc::clone(&self.files);
561        let path = path.to_path_buf();
562
563        let file_size = {
564            let files_guard = files.read();
565            files_guard
566                .get(&path)
567                .map(|data| data.len() as u64)
568                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
569        };
570
571        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
572            let files = Arc::clone(&files);
573            let path = path.clone();
574            Box::pin(async move {
575                let files_guard = files.read();
576                let data = files_guard
577                    .get(&path)
578                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
579
580                let start = range.start as usize;
581                let end = range.end as usize;
582                if end > data.len() {
583                    return Err(io::Error::new(
584                        io::ErrorKind::InvalidInput,
585                        "Range out of bounds",
586                    ));
587                }
588                Ok(OwnedBytes {
589                    data: Arc::clone(data),
590                    range: start..end,
591                })
592            })
593        });
594
595        Ok(LazyFileHandle::new(file_size, read_fn))
596    }
597}
598
599#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
600#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
601impl DirectoryWriter for RamDirectory {
602    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
603        self.files
604            .write()
605            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
606        Ok(())
607    }
608
609    async fn delete(&self, path: &Path) -> io::Result<()> {
610        self.files.write().remove(path);
611        Ok(())
612    }
613
614    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
615        let mut files = self.files.write();
616        if let Some(data) = files.remove(from) {
617            files.insert(to.to_path_buf(), data);
618        }
619        Ok(())
620    }
621
622    async fn sync(&self) -> io::Result<()> {
623        Ok(())
624    }
625
626    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
627        Ok(Box::new(BufferedStreamingWriter {
628            path: path.to_path_buf(),
629            buffer: Vec::new(),
630            files: Arc::clone(&self.files),
631        }))
632    }
633}
634
/// Directory backed by the local filesystem, with async IO via tokio.
/// Every path is resolved relative to `root`.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base directory that relative paths are joined onto.
    root: PathBuf,
}

#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory rooted at `root`. The path is neither checked nor created here.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = PathBuf::from(root.as_ref());
        Self { root }
    }

    /// Joins `path` onto the root (`PathBuf::push` semantics: an absolute
    /// `path` replaces the root entirely).
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
654
/// Reads exactly the bytes in `range` from the file at `full_path`.
///
/// # Errors
/// Returns `InvalidInput` for an inverted range (instead of underflowing
/// `end - start`). A range past end-of-file fails from `read_exact`.
#[cfg(feature = "native")]
async fn read_file_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let len = range.end.checked_sub(range.start).ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid range {:?}: start is after end", range),
        )
    })?;

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(std::io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; len as usize];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}

#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// Reports whether `path` exists. I/O errors other than "not found" (e.g.
    /// permission denied) are propagated instead of being reported as `false`.
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        tokio::fs::try_exists(self.resolve(path)).await
    }

    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let metadata = tokio::fs::metadata(self.resolve(path)).await?;
        Ok(metadata.len())
    }

    /// Reads the whole file into memory.
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let data = tokio::fs::read(self.resolve(path)).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        read_file_range(&self.resolve(path), range).await
    }

    /// Lists regular files directly inside the directory `prefix` (not
    /// recursive). Returned paths are relative to the root.
    ///
    /// # Errors
    /// Returns `InvalidInput` if an entry does not lie under the root, which can
    /// happen when `prefix` is absolute. Previously this case panicked.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let mut entries = tokio::fs::read_dir(self.resolve(prefix)).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_file() {
                continue;
            }
            let path = entry.path();
            let relative = path.strip_prefix(&self.root).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!(
                        "{} is not under directory root {}",
                        path.display(),
                        self.root.display()
                    ),
                )
            })?;
            files.push(relative.to_path_buf());
        }

        Ok(files)
    }

    /// Opens a lazy handle; each read opens the file, seeks, and reads the range.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let file_size = tokio::fs::metadata(&full_path).await?.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { read_file_range(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}
728
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to `path` in place, creating missing parent directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        // Ensure parent directory exists
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// fsyncs the root directory itself; subdirectories are not synced.
    ///
    /// Goes through `tokio::fs` so the blocking open/fsync syscalls do not run
    /// on the async executor thread, as `std::fs` calls here previously did.
    async fn sync(&self) -> io::Result<()> {
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Creates (truncating) the file and any missing parent directories, then
    /// hands back a writer over a `std::fs::File`. Writes through the returned
    /// writer are synchronous.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Create via tokio so the open syscall doesn't block the executor.
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter { file, written: 0 }))
    }
}
770
771/// Caching wrapper for any Directory - caches file reads
772pub struct CachingDirectory<D: Directory> {
773    inner: D,
774    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
775    max_cached_bytes: usize,
776    current_bytes: RwLock<usize>,
777}
778
779impl<D: Directory> CachingDirectory<D> {
780    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
781        Self {
782            inner,
783            cache: RwLock::new(HashMap::new()),
784            max_cached_bytes,
785            current_bytes: RwLock::new(0),
786        }
787    }
788
789    fn try_cache(&self, path: &Path, data: &[u8]) {
790        let mut current = self.current_bytes.write();
791        if *current + data.len() <= self.max_cached_bytes {
792            self.cache
793                .write()
794                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
795            *current += data.len();
796        }
797    }
798}
799
800#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
801#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
802impl<D: Directory> Directory for CachingDirectory<D> {
803    async fn exists(&self, path: &Path) -> io::Result<bool> {
804        if self.cache.read().contains_key(path) {
805            return Ok(true);
806        }
807        self.inner.exists(path).await
808    }
809
810    async fn file_size(&self, path: &Path) -> io::Result<u64> {
811        if let Some(data) = self.cache.read().get(path) {
812            return Ok(data.len() as u64);
813        }
814        self.inner.file_size(path).await
815    }
816
817    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
818        // Check cache first
819        if let Some(data) = self.cache.read().get(path) {
820            return Ok(FileSlice::new(OwnedBytes {
821                data: Arc::clone(data),
822                range: 0..data.len(),
823            }));
824        }
825
826        // Read from inner and potentially cache
827        let slice = self.inner.open_read(path).await?;
828        let bytes = slice.read_bytes().await?;
829
830        self.try_cache(path, bytes.as_slice());
831
832        Ok(FileSlice::new(bytes))
833    }
834
835    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
836        // Check cache first
837        if let Some(data) = self.cache.read().get(path) {
838            let start = range.start as usize;
839            let end = range.end as usize;
840            return Ok(OwnedBytes {
841                data: Arc::clone(data),
842                range: start..end,
843            });
844        }
845
846        self.inner.read_range(path, range).await
847    }
848
849    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
850        self.inner.list_files(prefix).await
851    }
852
853    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
854        // For caching directory, delegate to inner - caching happens at read_range level
855        self.inner.open_lazy(path).await
856    }
857}
858
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a file through RamDirectory: write, exists, read, range read, delete.
    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();
        let file = Path::new("test.txt");
        let missing = Path::new("nonexistent.txt");

        dir.write(file, b"hello world").await.unwrap();

        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(missing).await.unwrap());

        let contents = dir
            .open_read(file)
            .await
            .unwrap()
            .read_bytes()
            .await
            .unwrap();
        assert_eq!(contents.as_slice(), b"hello world");

        let head = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(head.as_slice(), b"hello");

        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    /// Sub-slices of a FileSlice read back the expected bytes.
    #[tokio::test]
    async fn test_file_slice() {
        let slice = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(slice.len(), 11);

        for (range, expected) in [(0..5, &b"hello"[..]), (6..11, &b"world"[..])] {
            let bytes = slice.slice(range).read_bytes().await.unwrap();
            assert_eq!(bytes.as_slice(), expected);
        }
    }

    /// Slicing OwnedBytes yields the window and leaves the original view untouched.
    #[tokio::test]
    async fn test_owned_bytes() {
        let bytes = OwnedBytes::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(bytes.len(), 5);
        assert_eq!(bytes.as_slice(), &[1, 2, 3, 4, 5]);
        assert_eq!(bytes.slice(1..4).as_slice(), &[2, 3, 4]);
        assert_eq!(bytes.as_slice(), &[1, 2, 3, 4, 5]);
    }
}