Skip to main content

hermes_core/directories/
directory.rs

1//! Async Directory abstraction for IO operations
2//!
3//! Supports network, local filesystem, and in-memory storage.
4//! All reads are async to minimize blocking on network latency.
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14/// A slice of bytes that can be read asynchronously
15#[derive(Debug, Clone)]
16pub struct FileSlice {
17    data: OwnedBytes,
18    range: Range<u64>,
19}
20
21impl FileSlice {
22    pub fn new(data: OwnedBytes) -> Self {
23        let len = data.len() as u64;
24        Self {
25            data,
26            range: 0..len,
27        }
28    }
29
30    pub fn empty() -> Self {
31        Self {
32            data: OwnedBytes::empty(),
33            range: 0..0,
34        }
35    }
36
37    pub fn slice(&self, range: Range<u64>) -> Self {
38        let start = self.range.start + range.start;
39        let end = self.range.start + range.end;
40        Self {
41            data: self.data.clone(),
42            range: start..end,
43        }
44    }
45
46    pub fn len(&self) -> u64 {
47        self.range.end - self.range.start
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.range.start == self.range.end
52    }
53
54    /// Read the entire slice (async for network compatibility)
55    pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
56        Ok(self
57            .data
58            .slice(self.range.start as usize..self.range.end as usize))
59    }
60
61    /// Read a specific range within this slice
62    pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
63        let start = self.range.start + range.start;
64        let end = self.range.start + range.end;
65        if end > self.range.end {
66            return Err(io::Error::new(
67                io::ErrorKind::InvalidInput,
68                "Range out of bounds",
69            ));
70        }
71        Ok(self.data.slice(start as usize..end as usize))
72    }
73}
74
75/// Trait for async range reading - implemented by both FileSlice and LazyFileHandle
76#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg(not(target_arch = "wasm32"))]
79pub trait AsyncFileRead: Send + Sync {
80    /// Get the total length of the file/slice (u64 to support >4GB files on 32-bit platforms)
81    fn len(&self) -> u64;
82
83    /// Check if empty
84    fn is_empty(&self) -> bool {
85        self.len() == 0
86    }
87
88    /// Read a specific byte range
89    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
90
91    /// Read all bytes
92    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
93        self.read_bytes_range(0..self.len()).await
94    }
95}
96
/// Common async read interface shared by in-memory slices and lazy handles
/// (WASM version: no `Send`/`Sync` requirement on implementors or futures).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait AsyncFileRead {
    /// Total length in bytes of the file/slice.
    ///
    /// Reported as `u64` so files larger than 4 GB stay addressable on 32-bit platforms.
    fn len(&self) -> u64;

    /// Whether the file/slice holds zero bytes.
    fn is_empty(&self) -> bool {
        0 == self.len()
    }

    /// Fetch the bytes in `range`, where `range` is relative to the start of this file/slice.
    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Fetch everything; by default this is a range read over `0..len()`.
    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
        let whole = 0..self.len();
        self.read_bytes_range(whole).await
    }
}
117
118#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
119#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
120impl AsyncFileRead for FileSlice {
121    fn len(&self) -> u64 {
122        self.range.end - self.range.start
123    }
124
125    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
126        let start = self.range.start + range.start;
127        let end = self.range.start + range.end;
128        if end > self.range.end {
129            return Err(io::Error::new(
130                io::ErrorKind::InvalidInput,
131                "Range out of bounds",
132            ));
133        }
134        Ok(self.data.slice(start as usize..end as usize))
135    }
136}
137
138/// Callback type for lazy range reading
139#[cfg(not(target_arch = "wasm32"))]
140pub type RangeReadFn = Arc<
141    dyn Fn(
142            Range<u64>,
143        )
144            -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
145        + Send
146        + Sync,
147>;
148
/// Shared callback used by [`LazyFileHandle`] to fetch one byte range.
///
/// WASM flavour: neither the callback nor its future is required to be `Send`.
#[cfg(target_arch = "wasm32")]
pub type RangeReadFn = Arc<
    dyn Fn(
        Range<u64>,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
>;
155
156/// Lazy file handle that fetches ranges on demand via HTTP range requests
157/// Does NOT load the entire file into memory
158pub struct LazyFileHandle {
159    /// Total file size (u64 to support >4GB files on 32-bit platforms like WASM)
160    file_size: u64,
161    /// Callback to read a range from the underlying directory
162    read_fn: RangeReadFn,
163}
164
165impl std::fmt::Debug for LazyFileHandle {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        f.debug_struct("LazyFileHandle")
168            .field("file_size", &self.file_size)
169            .finish()
170    }
171}
172
173impl Clone for LazyFileHandle {
174    fn clone(&self) -> Self {
175        Self {
176            file_size: self.file_size,
177            read_fn: Arc::clone(&self.read_fn),
178        }
179    }
180}
181
182impl LazyFileHandle {
183    /// Create a new lazy file handle
184    pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
185        Self { file_size, read_fn }
186    }
187
188    /// Get file size
189    pub fn file_size(&self) -> u64 {
190        self.file_size
191    }
192
193    /// Create a sub-slice view (still lazy)
194    pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
195        LazyFileSlice {
196            handle: self.clone(),
197            offset: range.start,
198            len: range.end - range.start,
199        }
200    }
201}
202
203#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
204#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
205impl AsyncFileRead for LazyFileHandle {
206    fn len(&self) -> u64 {
207        self.file_size
208    }
209
210    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
211        if range.end > self.file_size {
212            return Err(io::Error::new(
213                io::ErrorKind::InvalidInput,
214                format!(
215                    "Range {:?} out of bounds (file size: {})",
216                    range, self.file_size
217                ),
218            ));
219        }
220        (self.read_fn)(range).await
221    }
222}
223
224/// A slice view into a LazyFileHandle
225#[derive(Clone)]
226pub struct LazyFileSlice {
227    handle: LazyFileHandle,
228    offset: u64,
229    len: u64,
230}
231
232impl std::fmt::Debug for LazyFileSlice {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("LazyFileSlice")
235            .field("offset", &self.offset)
236            .field("len", &self.len)
237            .finish()
238    }
239}
240
241impl LazyFileSlice {
242    /// Create a slice from a handle with absolute offset and length
243    pub fn from_handle_range(handle: &LazyFileHandle, offset: u64, len: u64) -> Self {
244        Self {
245            handle: handle.clone(),
246            offset,
247            len,
248        }
249    }
250
251    /// Create a sub-slice
252    pub fn slice(&self, range: Range<u64>) -> Self {
253        Self {
254            handle: self.handle.clone(),
255            offset: self.offset + range.start,
256            len: range.end - range.start,
257        }
258    }
259}
260
261#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
262#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
263impl AsyncFileRead for LazyFileSlice {
264    fn len(&self) -> u64 {
265        self.len
266    }
267
268    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
269        if range.end > self.len {
270            return Err(io::Error::new(
271                io::ErrorKind::InvalidInput,
272                format!("Range {:?} out of bounds (slice len: {})", range, self.len),
273            ));
274        }
275        let abs_start = self.offset + range.start;
276        let abs_end = self.offset + range.end;
277        self.handle.read_bytes_range(abs_start..abs_end).await
278    }
279}
280
/// Owned bytes with cheap cloning (Arc-backed).
///
/// A value is a view (`range`) into a shared, immutable `Vec<u8>`. Cloning and
/// slicing only copy the `Arc` pointer and the range, never the bytes.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    /// Shared backing buffer.
    data: Arc<Vec<u8>>,
    /// Portion of `data` visible through this value.
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` and exposes all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: Arc::new(data),
            range: 0..len,
        }
    }

    /// Creates a value that holds no bytes.
    pub fn empty() -> Self {
        Self {
            data: Arc::new(Vec::new()),
            range: 0..0,
        }
    }

    /// Number of visible bytes.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Returns `true` when no bytes are visible.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns the sub-view `range`, expressed relative to the start of this view.
    ///
    /// # Panics
    ///
    /// Panics if `range` is inverted (`start > end`) or reaches past the end of
    /// this view. Checking here keeps a sub-view from exposing bytes of the
    /// backing buffer that lie outside its parent, and reports a bad range at
    /// the call that created it rather than at a later `as_slice`.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "slice range {:?} out of bounds (len: {})",
            range,
            self.len()
        );
        // Both bounds are <= len(), so neither addition can overflow.
        let base = self.range.start;
        Self {
            data: Arc::clone(&self.data),
            range: (base + range.start)..(base + range.end),
        }
    }

    /// Borrows the visible bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.range.clone()]
    }

    /// Copies the visible bytes into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}
329
330impl AsRef<[u8]> for OwnedBytes {
331    fn as_ref(&self) -> &[u8] {
332        self.as_slice()
333    }
334}
335
336impl std::ops::Deref for OwnedBytes {
337    type Target = [u8];
338
339    fn deref(&self) -> &Self::Target {
340        self.as_slice()
341    }
342}
343
344/// Async directory trait for reading index files
345#[cfg(not(target_arch = "wasm32"))]
346#[async_trait]
347pub trait Directory: Send + Sync + 'static {
348    /// Check if a file exists
349    async fn exists(&self, path: &Path) -> io::Result<bool>;
350
351    /// Get file size
352    async fn file_size(&self, path: &Path) -> io::Result<u64>;
353
354    /// Open a file for reading, returns a FileSlice (loads entire file)
355    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
356
357    /// Read a specific byte range from a file (optimized for network)
358    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
359
360    /// List files in directory
361    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
362
363    /// Open a lazy file handle that fetches ranges on demand
364    /// This is more efficient for large files over network
365    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
366}
367
/// Read-side async directory abstraction for index files (WASM version).
///
/// Same methods as the native trait, but without any `Send`/`Sync` requirement.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Directory: 'static {
    /// Reports whether `path` exists in this directory.
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Returns the size of the file at `path`, in bytes.
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Opens `path` for reading and returns its full contents as a [`FileSlice`].
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;

    /// Reads only the bytes in `range` from `path` (the network-friendly access path).
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Lists the files found under `prefix`.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Opens `path` as a [`LazyFileHandle`] that fetches ranges on demand.
    ///
    /// Preferable to [`Directory::open_read`] for large files on network storage.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
}
391
/// Incremental writer for a single file in a directory.
///
/// Lets large outputs (e.g. during merge) be produced piece by piece instead of
/// being assembled in memory first. Implementations decide where the bytes go:
/// file-backed directories write to disk, memory directories collect into a `Vec`.
pub trait StreamingWriter: io::Write + Send {
    /// Number of bytes written through this writer so far.
    fn bytes_written(&self) -> u64;

    /// Consumes the writer and finalizes the file so its data becomes readable.
    fn finish(self: Box<Self>) -> io::Result<()>;
}
403
404/// StreamingWriter backed by Vec<u8>, finalized via DirectoryWriter::write.
405/// Used as default/fallback and for RamDirectory.
406struct BufferedStreamingWriter {
407    path: PathBuf,
408    buffer: Vec<u8>,
409    /// Callback to write the buffer to the directory on finish.
410    /// We store the files Arc directly for RamDirectory.
411    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
412}
413
414impl io::Write for BufferedStreamingWriter {
415    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
416        self.buffer.extend_from_slice(buf);
417        Ok(buf.len())
418    }
419
420    fn flush(&mut self) -> io::Result<()> {
421        Ok(())
422    }
423}
424
425impl StreamingWriter for BufferedStreamingWriter {
426    fn finish(self: Box<Self>) -> io::Result<()> {
427        self.files.write().insert(self.path, Arc::new(self.buffer));
428        Ok(())
429    }
430
431    fn bytes_written(&self) -> u64 {
432        self.buffer.len() as u64
433    }
434}
435
/// Capacity of the `BufWriter` inside `FileStreamingWriter`: 8 MB.
///
/// Chosen large so that very many tiny writes (e.g. per-vector doc_id writes)
/// are coalesced into sequential I/O.
#[cfg(feature = "native")]
const FILE_STREAMING_BUF_SIZE: usize = 8 << 20;
441
/// [`StreamingWriter`] for filesystem directories, writing through a buffered `std::fs::File`.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    /// Buffered handle to the destination file.
    pub(crate) file: io::BufWriter<std::fs::File>,
    /// Running total of bytes accepted by `write`.
    pub(crate) written: u64,
}
448
#[cfg(feature = "native")]
impl FileStreamingWriter {
    /// Wraps `file` in a `BufWriter` of `FILE_STREAMING_BUF_SIZE` bytes with a zeroed byte counter.
    pub(crate) fn new(file: std::fs::File) -> Self {
        let buffered = io::BufWriter::with_capacity(FILE_STREAMING_BUF_SIZE, file);
        Self {
            file: buffered,
            written: 0,
        }
    }
}
458
#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    /// Forwards to the buffered file and adds the accepted byte count to `written`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.file.write(buf) {
            Ok(accepted) => {
                self.written += accepted as u64;
                Ok(accepted)
            }
            Err(err) => Err(err),
        }
    }

    /// Flushes the `BufWriter` to the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        self.file.flush()
    }
}
471
#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes buffered data into the file, then `sync_all`s it.
    ///
    /// A failed flush is reported as the underlying I/O error.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let Self { file: buffered, .. } = *self;
        match buffered.into_inner() {
            Ok(file) => file.sync_all(),
            Err(flush_failure) => Err(flush_failure.into_error()),
        }
    }

    /// Total bytes accepted by `write` so far.
    fn bytes_written(&self) -> u64 {
        self.written
    }
}
484
485/// Async directory trait for writing index files
486#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
487#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
488pub trait DirectoryWriter: Directory {
489    /// Create/overwrite a file with data
490    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
491
492    /// Delete a file
493    async fn delete(&self, path: &Path) -> io::Result<()>;
494
495    /// Atomic rename
496    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
497
498    /// Sync all pending writes
499    async fn sync(&self) -> io::Result<()>;
500
501    /// Create a streaming writer for incremental file writes.
502    /// Call finish() on the returned writer to finalize.
503    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
504}
505
506/// In-memory directory for testing and small indexes
507#[derive(Debug, Default)]
508pub struct RamDirectory {
509    files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
510}
511
512impl Clone for RamDirectory {
513    fn clone(&self) -> Self {
514        Self {
515            files: Arc::clone(&self.files),
516        }
517    }
518}
519
520impl RamDirectory {
521    pub fn new() -> Self {
522        Self::default()
523    }
524}
525
526#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
527#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
528impl Directory for RamDirectory {
529    async fn exists(&self, path: &Path) -> io::Result<bool> {
530        Ok(self.files.read().contains_key(path))
531    }
532
533    async fn file_size(&self, path: &Path) -> io::Result<u64> {
534        self.files
535            .read()
536            .get(path)
537            .map(|data| data.len() as u64)
538            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
539    }
540
541    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
542        let files = self.files.read();
543        let data = files
544            .get(path)
545            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
546
547        Ok(FileSlice::new(OwnedBytes {
548            data: Arc::clone(data),
549            range: 0..data.len(),
550        }))
551    }
552
553    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
554        let files = self.files.read();
555        let data = files
556            .get(path)
557            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
558
559        let start = range.start as usize;
560        let end = range.end as usize;
561
562        if end > data.len() {
563            return Err(io::Error::new(
564                io::ErrorKind::InvalidInput,
565                "Range out of bounds",
566            ));
567        }
568
569        Ok(OwnedBytes {
570            data: Arc::clone(data),
571            range: start..end,
572        })
573    }
574
575    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
576        let files = self.files.read();
577        Ok(files
578            .keys()
579            .filter(|p| p.starts_with(prefix))
580            .cloned()
581            .collect())
582    }
583
584    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
585        let files = Arc::clone(&self.files);
586        let path = path.to_path_buf();
587
588        let file_size = {
589            let files_guard = files.read();
590            files_guard
591                .get(&path)
592                .map(|data| data.len() as u64)
593                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
594        };
595
596        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
597            let files = Arc::clone(&files);
598            let path = path.clone();
599            Box::pin(async move {
600                let files_guard = files.read();
601                let data = files_guard
602                    .get(&path)
603                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
604
605                let start = range.start as usize;
606                let end = range.end as usize;
607                if end > data.len() {
608                    return Err(io::Error::new(
609                        io::ErrorKind::InvalidInput,
610                        "Range out of bounds",
611                    ));
612                }
613                Ok(OwnedBytes {
614                    data: Arc::clone(data),
615                    range: start..end,
616                })
617            })
618        });
619
620        Ok(LazyFileHandle::new(file_size, read_fn))
621    }
622}
623
624#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
625#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
626impl DirectoryWriter for RamDirectory {
627    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
628        self.files
629            .write()
630            .insert(path.to_path_buf(), Arc::new(data.to_vec()));
631        Ok(())
632    }
633
634    async fn delete(&self, path: &Path) -> io::Result<()> {
635        self.files.write().remove(path);
636        Ok(())
637    }
638
639    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
640        let mut files = self.files.write();
641        if let Some(data) = files.remove(from) {
642            files.insert(to.to_path_buf(), data);
643        }
644        Ok(())
645    }
646
647    async fn sync(&self) -> io::Result<()> {
648        Ok(())
649    }
650
651    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
652        Ok(Box::new(BufferedStreamingWriter {
653            path: path.to_path_buf(),
654            buffer: Vec::new(),
655            files: Arc::clone(&self.files),
656        }))
657    }
658}
659
/// Directory backed by the local filesystem, using tokio for async I/O.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base directory that all paths handed to this directory are joined onto.
    root: PathBuf,
}
666
#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory rooted at `root`. The path is stored as given;
    /// nothing is created or checked on disk.
    pub fn new(root: impl AsRef<Path>) -> Self {
        Self {
            root: PathBuf::from(root.as_ref()),
        }
    }

    /// Joins `path` onto the root to obtain the on-disk location.
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
679
/// Read access to files under the directory root via `tokio::fs`.
#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// Reports whether `path` exists under the root.
    ///
    /// Errors from the existence probe are reported as `Ok(false)`.
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        let full_path = self.resolve(path);
        Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
    }

    /// Size of the file as reported by its metadata.
    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    /// Reads the whole file into memory and wraps it in a [`FileSlice`].
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    /// Reads exactly the bytes in `range` from the file.
    ///
    /// Errors with `InvalidInput` for an inverted or unaddressable range, and
    /// with the underlying I/O error (e.g. `UnexpectedEof`) when the file is
    /// shorter than `range.end`.
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        let full_path = self.resolve(path);
        fs_read_range(&full_path, range).await
    }

    /// Lists the regular files directly inside `prefix` (not recursive),
    /// as paths relative to the root.
    ///
    /// Errors with `InvalidInput` when an entry does not lie under the root,
    /// which happens when `prefix` is absolute and therefore replaces the root
    /// when joined; previously this case panicked.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_file() {
                let entry_path = entry.path();
                let relative = entry_path
                    .strip_prefix(&self.root)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
                files.push(relative.to_path_buf());
            }
        }

        Ok(files)
    }

    /// Builds a lazy handle whose callback reopens the file for every range read.
    ///
    /// The file size is taken from metadata at the time of this call.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        let file_size = metadata.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { fs_read_range(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}

/// Opens `full_path`, seeks to `range.start`, and reads exactly `range.end - range.start` bytes.
///
/// The range is validated before any subtraction or allocation, so an inverted
/// range yields `InvalidInput` instead of an arithmetic underflow.
#[cfg(feature = "native")]
async fn fs_read_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    if range.start > range.end {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Range out of bounds",
        ));
    }
    let len = range.end - range.start;
    // A buffer cannot be larger than usize::MAX; refuse instead of truncating.
    if len > usize::MAX as u64 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Range out of bounds",
        ));
    }

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(std::io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; len as usize];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}
753
/// Write access to files under the directory root.
///
/// All filesystem calls made directly by these methods go through `tokio::fs`,
/// so none of them blocks the async executor thread.
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to `path`, creating missing parent directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        // Ensure parent directory exists
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    /// Removes the file at `path`; errors if the removal fails (e.g. missing file).
    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    /// Renames `from` to `to`, both resolved against the root.
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// fsyncs the root directory itself.
    async fn sync(&self) -> io::Result<()> {
        // Open and sync through tokio so the potentially slow fsync runs off
        // the executor thread instead of stalling other tasks.
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Creates (or truncates) `path` and returns a buffered streaming writer for it.
    ///
    /// Missing parent directories are created first.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Create the file without blocking, then convert to the std handle
        // that the synchronous `FileStreamingWriter` operates on.
        let file = tokio::fs::File::create(&full_path)
            .await?
            .into_std()
            .await;
        Ok(Box::new(FileStreamingWriter::new(file)))
    }
}
795
796/// Caching wrapper for any Directory - caches file reads
797pub struct CachingDirectory<D: Directory> {
798    inner: D,
799    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
800    max_cached_bytes: usize,
801    current_bytes: RwLock<usize>,
802}
803
804impl<D: Directory> CachingDirectory<D> {
805    pub fn new(inner: D, max_cached_bytes: usize) -> Self {
806        Self {
807            inner,
808            cache: RwLock::new(HashMap::new()),
809            max_cached_bytes,
810            current_bytes: RwLock::new(0),
811        }
812    }
813
814    fn try_cache(&self, path: &Path, data: &[u8]) {
815        let mut current = self.current_bytes.write();
816        if *current + data.len() <= self.max_cached_bytes {
817            self.cache
818                .write()
819                .insert(path.to_path_buf(), Arc::new(data.to_vec()));
820            *current += data.len();
821        }
822    }
823}
824
825#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
826#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
827impl<D: Directory> Directory for CachingDirectory<D> {
828    async fn exists(&self, path: &Path) -> io::Result<bool> {
829        if self.cache.read().contains_key(path) {
830            return Ok(true);
831        }
832        self.inner.exists(path).await
833    }
834
835    async fn file_size(&self, path: &Path) -> io::Result<u64> {
836        if let Some(data) = self.cache.read().get(path) {
837            return Ok(data.len() as u64);
838        }
839        self.inner.file_size(path).await
840    }
841
842    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
843        // Check cache first
844        if let Some(data) = self.cache.read().get(path) {
845            return Ok(FileSlice::new(OwnedBytes {
846                data: Arc::clone(data),
847                range: 0..data.len(),
848            }));
849        }
850
851        // Read from inner and potentially cache
852        let slice = self.inner.open_read(path).await?;
853        let bytes = slice.read_bytes().await?;
854
855        self.try_cache(path, bytes.as_slice());
856
857        Ok(FileSlice::new(bytes))
858    }
859
860    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
861        // Check cache first
862        if let Some(data) = self.cache.read().get(path) {
863            let start = range.start as usize;
864            let end = range.end as usize;
865            return Ok(OwnedBytes {
866                data: Arc::clone(data),
867                range: start..end,
868            });
869        }
870
871        self.inner.read_range(path, range).await
872    }
873
874    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
875        self.inner.list_files(prefix).await
876    }
877
878    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
879        // For caching directory, delegate to inner - caching happens at read_range level
880        self.inner.open_lazy(path).await
881    }
882}
883
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips one file through `RamDirectory`: write, exists, full read,
    /// range read, delete.
    #[tokio::test]
    async fn test_ram_directory() {
        let file = Path::new("test.txt");
        let missing = Path::new("nonexistent.txt");
        let dir = RamDirectory::new();

        // Store the file.
        dir.write(file, b"hello world").await.unwrap();

        // Only the written path is reported as existing.
        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(missing).await.unwrap());

        // Full contents come back unchanged.
        let whole = dir.open_read(file).await.unwrap();
        let contents = whole.read_bytes().await.unwrap();
        assert_eq!(contents.as_slice(), b"hello world");

        // A range read returns just the requested prefix.
        let head = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(head.as_slice(), b"hello");

        // After deletion the file is gone.
        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    /// Sub-slices of a `FileSlice` expose the expected windows of the data.
    #[tokio::test]
    async fn test_file_slice() {
        let whole = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(whole.len(), 11);

        let first_word = whole.slice(0..5).read_bytes().await.unwrap();
        assert_eq!(first_word.as_slice(), b"hello");

        let second_word = whole.slice(6..11).read_bytes().await.unwrap();
        assert_eq!(second_word.as_slice(), b"world");
    }

    /// Slicing `OwnedBytes` yields a new view and leaves the source untouched.
    #[tokio::test]
    async fn test_owned_bytes() {
        let source = OwnedBytes::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(source.len(), 5);
        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = source.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        // Original unchanged
        assert_eq!(source.as_slice(), &[1, 2, 3, 4, 5]);
    }
}