1use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14#[derive(Debug, Clone)]
16pub struct FileSlice {
17 data: OwnedBytes,
18 range: Range<u64>,
19}
20
21impl FileSlice {
22 pub fn new(data: OwnedBytes) -> Self {
23 let len = data.len() as u64;
24 Self {
25 data,
26 range: 0..len,
27 }
28 }
29
30 pub fn empty() -> Self {
31 Self {
32 data: OwnedBytes::empty(),
33 range: 0..0,
34 }
35 }
36
37 pub fn slice(&self, range: Range<u64>) -> Self {
38 let start = self.range.start + range.start;
39 let end = self.range.start + range.end;
40 Self {
41 data: self.data.clone(),
42 range: start..end,
43 }
44 }
45
46 pub fn len(&self) -> u64 {
47 self.range.end - self.range.start
48 }
49
50 pub fn is_empty(&self) -> bool {
51 self.range.start == self.range.end
52 }
53
54 pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
56 Ok(self
57 .data
58 .slice(self.range.start as usize..self.range.end as usize))
59 }
60
61 pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
63 let start = self.range.start + range.start;
64 let end = self.range.start + range.end;
65 if end > self.range.end {
66 return Err(io::Error::new(
67 io::ErrorKind::InvalidInput,
68 "Range out of bounds",
69 ));
70 }
71 Ok(self.data.slice(start as usize..end as usize))
72 }
73}
74
75#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg(not(target_arch = "wasm32"))]
79pub trait AsyncFileRead: Send + Sync {
80 fn len(&self) -> u64;
82
83 fn is_empty(&self) -> bool {
85 self.len() == 0
86 }
87
88 async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;
90
91 async fn read_bytes(&self) -> io::Result<OwnedBytes> {
93 self.read_bytes_range(0..self.len()).await
94 }
95}
96
/// Asynchronous random-access reader over a byte source of known length (wasm32 targets).
///
/// Same contract as the native variant, but without `Send`/`Sync` requirements:
/// `async_trait(?Send)` lets implementations and their futures stay single-threaded.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait AsyncFileRead {
    /// Size in bytes of the readable data.
    fn len(&self) -> u64;

    /// Whether the reader holds zero bytes.
    fn is_empty(&self) -> bool {
        0 == self.len()
    }

    /// Fetches the bytes of `range`, measured from the start of this reader.
    async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Fetches the whole content by requesting the range `0..len()`.
    async fn read_bytes(&self) -> io::Result<OwnedBytes> {
        let whole = 0..self.len();
        self.read_bytes_range(whole).await
    }
}
117
118#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
119#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
120impl AsyncFileRead for FileSlice {
121 fn len(&self) -> u64 {
122 self.range.end - self.range.start
123 }
124
125 async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
126 let start = self.range.start + range.start;
127 let end = self.range.start + range.end;
128 if end > self.range.end {
129 return Err(io::Error::new(
130 io::ErrorKind::InvalidInput,
131 "Range out of bounds",
132 ));
133 }
134 Ok(self.data.slice(start as usize..end as usize))
135 }
136}
137
138#[cfg(not(target_arch = "wasm32"))]
140pub type RangeReadFn = Arc<
141 dyn Fn(
142 Range<u64>,
143 )
144 -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
145 + Send
146 + Sync,
147>;
148
149#[cfg(target_arch = "wasm32")]
150pub type RangeReadFn = Arc<
151 dyn Fn(
152 Range<u64>,
153 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>>,
154>;
155
156pub struct LazyFileHandle {
159 file_size: u64,
161 read_fn: RangeReadFn,
163}
164
165impl std::fmt::Debug for LazyFileHandle {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 f.debug_struct("LazyFileHandle")
168 .field("file_size", &self.file_size)
169 .finish()
170 }
171}
172
173impl Clone for LazyFileHandle {
174 fn clone(&self) -> Self {
175 Self {
176 file_size: self.file_size,
177 read_fn: Arc::clone(&self.read_fn),
178 }
179 }
180}
181
182impl LazyFileHandle {
183 pub fn new(file_size: u64, read_fn: RangeReadFn) -> Self {
185 Self { file_size, read_fn }
186 }
187
188 pub fn file_size(&self) -> u64 {
190 self.file_size
191 }
192
193 pub fn slice(&self, range: Range<u64>) -> LazyFileSlice {
195 LazyFileSlice {
196 handle: self.clone(),
197 offset: range.start,
198 len: range.end - range.start,
199 }
200 }
201}
202
203#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
204#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
205impl AsyncFileRead for LazyFileHandle {
206 fn len(&self) -> u64 {
207 self.file_size
208 }
209
210 async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
211 if range.end > self.file_size {
212 return Err(io::Error::new(
213 io::ErrorKind::InvalidInput,
214 format!(
215 "Range {:?} out of bounds (file size: {})",
216 range, self.file_size
217 ),
218 ));
219 }
220 (self.read_fn)(range).await
221 }
222}
223
224#[derive(Clone)]
226pub struct LazyFileSlice {
227 handle: LazyFileHandle,
228 offset: u64,
229 len: u64,
230}
231
232impl std::fmt::Debug for LazyFileSlice {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("LazyFileSlice")
235 .field("offset", &self.offset)
236 .field("len", &self.len)
237 .finish()
238 }
239}
240
241impl LazyFileSlice {
242 pub fn from_handle_range(handle: &LazyFileHandle, offset: u64, len: u64) -> Self {
244 Self {
245 handle: handle.clone(),
246 offset,
247 len,
248 }
249 }
250
251 pub fn slice(&self, range: Range<u64>) -> Self {
253 Self {
254 handle: self.handle.clone(),
255 offset: self.offset + range.start,
256 len: range.end - range.start,
257 }
258 }
259}
260
261#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
262#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
263impl AsyncFileRead for LazyFileSlice {
264 fn len(&self) -> u64 {
265 self.len
266 }
267
268 async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
269 if range.end > self.len {
270 return Err(io::Error::new(
271 io::ErrorKind::InvalidInput,
272 format!("Range {:?} out of bounds (slice len: {})", range, self.len),
273 ));
274 }
275 let abs_start = self.offset + range.start;
276 let abs_end = self.offset + range.end;
277 self.handle.read_bytes_range(abs_start..abs_end).await
278 }
279}
280
/// Reference-counted storage behind [`OwnedBytes`]; cloning only bumps a reference count.
#[derive(Clone)]
enum SharedBytes {
    /// Heap buffer.
    Vec(Arc<Vec<u8>>),
    /// Memory-mapped file (only with the `native` feature).
    #[cfg(feature = "native")]
    Mmap(Arc<memmap2::Mmap>),
}

impl SharedBytes {
    /// Returns the entire backing buffer, independent of any view range.
    #[inline]
    fn as_bytes(&self) -> &[u8] {
        match self {
            SharedBytes::Vec(v) => v.as_slice(),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => m.as_ref(),
        }
    }
}

// Prints only the backing length so that large buffers do not flood debug output.
impl std::fmt::Debug for SharedBytes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SharedBytes::Vec(v) => write!(f, "Vec(len={})", v.len()),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => write!(f, "Mmap(len={})", m.len()),
        }
    }
}

/// An immutable, cheaply clonable view into shared backing bytes.
///
/// Clones and sub-slices share the same backing storage; bytes are only copied by `to_vec`.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    data: SharedBytes,
    /// Absolute range of `data` that this view exposes.
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` and exposes all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: SharedBytes::Vec(Arc::new(data)),
            range: 0..len,
        }
    }

    /// Returns a view containing no bytes.
    pub fn empty() -> Self {
        Self {
            data: SharedBytes::Vec(Arc::new(Vec::new())),
            range: 0..0,
        }
    }

    /// Creates a view of `range` (absolute indices into `data`) over an already shared buffer.
    ///
    /// The range is not validated here; an out-of-bounds range panics on first access.
    pub(crate) fn from_arc_vec(data: Arc<Vec<u8>>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Vec(data),
            range,
        }
    }

    /// Creates a view covering the whole memory map.
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap(mmap: Arc<memmap2::Mmap>) -> Self {
        let len = mmap.len();
        Self {
            data: SharedBytes::Mmap(mmap),
            range: 0..len,
        }
    }

    /// Creates a view of `range` (absolute indices) over a memory map.
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap_range(mmap: Arc<memmap2::Mmap>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Mmap(mmap),
            range,
        }
    }

    /// Number of bytes in this view.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// Returns `true` when the view contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns a sub-view covering `range`, expressed relative to this view. No bytes are copied.
    ///
    /// # Panics
    ///
    /// Panics if `range.start > range.end` or `range.end > self.len()`, mirroring slice
    /// indexing. Previously an over-long range was accepted and silently exposed bytes of the
    /// backing buffer that lie outside this view.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds for length {}",
            range,
            self.len()
        );
        let start = self.range.start + range.start;
        let end = self.range.start + range.end;
        Self {
            data: self.data.clone(),
            range: start..end,
        }
    }

    /// Borrows the bytes of this view.
    pub fn as_slice(&self) -> &[u8] {
        &self.data.as_bytes()[self.range.clone()]
    }

    /// Copies the bytes of this view into a new `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}

impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
404
405#[cfg(not(target_arch = "wasm32"))]
407#[async_trait]
408pub trait Directory: Send + Sync + 'static {
409 async fn exists(&self, path: &Path) -> io::Result<bool>;
411
412 async fn file_size(&self, path: &Path) -> io::Result<u64>;
414
415 async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;
417
418 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
420
421 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
423
424 async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
427}
428
/// Read-only access to a set of files addressed by relative paths (wasm32 targets).
///
/// Same operations as the native trait, but only `'static` is required: `async_trait(?Send)`
/// allows single-threaded implementations and futures.
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Directory: 'static {
    /// Reports whether a file exists at `path`.
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Returns the size in bytes of the file at `path`.
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Loads the whole file at `path` as a [`FileSlice`].
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice>;

    /// Reads the absolute byte `range` of the file at `path`.
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Lists files under `prefix`; recursion semantics are implementation-defined.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Returns a handle that fetches byte ranges of the file at `path` on demand.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle>;
}
452
/// A file being written incrementally through [`io::Write`].
///
/// Call [`StreamingWriter::finish`] to complete the file; what "complete" means is up to the
/// implementation (publishing an in-memory buffer, syncing a file to disk, ...).
pub trait StreamingWriter: Send + io::Write {
    /// Consumes the writer and completes the file.
    fn finish(self: Box<Self>) -> io::Result<()>;

    /// Number of bytes accepted by the writer so far.
    fn bytes_written(&self) -> u64;
}
464
465struct BufferedStreamingWriter {
468 path: PathBuf,
469 buffer: Vec<u8>,
470 files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
473}
474
475impl io::Write for BufferedStreamingWriter {
476 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
477 self.buffer.extend_from_slice(buf);
478 Ok(buf.len())
479 }
480
481 fn flush(&mut self) -> io::Result<()> {
482 Ok(())
483 }
484}
485
486impl StreamingWriter for BufferedStreamingWriter {
487 fn finish(self: Box<Self>) -> io::Result<()> {
488 self.files.write().insert(self.path, Arc::new(self.buffer));
489 Ok(())
490 }
491
492 fn bytes_written(&self) -> u64 {
493 self.buffer.len() as u64
494 }
495}
496
/// Capacity (8 MiB) of the `BufWriter` placed in front of each streamed file.
#[cfg(feature = "native")]
const FILE_STREAMING_BUF_SIZE: usize = 8 << 20;

/// Streaming writer that sends bytes through a large `BufWriter` into a file on disk and
/// counts how many bytes the buffer accepted.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    pub(crate) file: io::BufWriter<std::fs::File>,
    pub(crate) written: u64,
}

#[cfg(feature = "native")]
impl FileStreamingWriter {
    /// Wraps `file` in a buffer of `FILE_STREAMING_BUF_SIZE` bytes with a zero byte count.
    pub(crate) fn new(file: std::fs::File) -> Self {
        let buffered = io::BufWriter::with_capacity(FILE_STREAMING_BUF_SIZE, file);
        Self {
            file: buffered,
            written: 0,
        }
    }
}

#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Count only what the buffered writer actually accepted (it may be a partial write).
        let accepted = io::Write::write(&mut self.file, buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    fn flush(&mut self) -> io::Result<()> {
        io::Write::flush(&mut self.file)
    }
}

#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes the buffer, then `sync_all`s the file so its contents reach durable storage.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let inner = match self.file.into_inner() {
            Ok(file) => file,
            Err(flush_failure) => return Err(flush_failure.into_error()),
        };
        inner.sync_all()
    }

    fn bytes_written(&self) -> u64 {
        self.written
    }
}
545
546#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
548#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
549pub trait DirectoryWriter: Directory {
550 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
552
553 async fn delete(&self, path: &Path) -> io::Result<()>;
555
556 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
558
559 async fn sync(&self) -> io::Result<()>;
561
562 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
565}
566
567#[derive(Debug, Default)]
569pub struct RamDirectory {
570 files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
571}
572
573impl Clone for RamDirectory {
574 fn clone(&self) -> Self {
575 Self {
576 files: Arc::clone(&self.files),
577 }
578 }
579}
580
581impl RamDirectory {
582 pub fn new() -> Self {
583 Self::default()
584 }
585}
586
587#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
588#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
589impl Directory for RamDirectory {
590 async fn exists(&self, path: &Path) -> io::Result<bool> {
591 Ok(self.files.read().contains_key(path))
592 }
593
594 async fn file_size(&self, path: &Path) -> io::Result<u64> {
595 self.files
596 .read()
597 .get(path)
598 .map(|data| data.len() as u64)
599 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
600 }
601
602 async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
603 let files = self.files.read();
604 let data = files
605 .get(path)
606 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
607
608 Ok(FileSlice::new(OwnedBytes::from_arc_vec(
609 Arc::clone(data),
610 0..data.len(),
611 )))
612 }
613
614 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
615 let files = self.files.read();
616 let data = files
617 .get(path)
618 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
619
620 let start = range.start as usize;
621 let end = range.end as usize;
622
623 if end > data.len() {
624 return Err(io::Error::new(
625 io::ErrorKind::InvalidInput,
626 "Range out of bounds",
627 ));
628 }
629
630 Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
631 }
632
633 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
634 let files = self.files.read();
635 Ok(files
636 .keys()
637 .filter(|p| p.starts_with(prefix))
638 .cloned()
639 .collect())
640 }
641
642 async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
643 let files = Arc::clone(&self.files);
644 let path = path.to_path_buf();
645
646 let file_size = {
647 let files_guard = files.read();
648 files_guard
649 .get(&path)
650 .map(|data| data.len() as u64)
651 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?
652 };
653
654 let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
655 let files = Arc::clone(&files);
656 let path = path.clone();
657 Box::pin(async move {
658 let files_guard = files.read();
659 let data = files_guard
660 .get(&path)
661 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
662
663 let start = range.start as usize;
664 let end = range.end as usize;
665 if end > data.len() {
666 return Err(io::Error::new(
667 io::ErrorKind::InvalidInput,
668 "Range out of bounds",
669 ));
670 }
671 Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
672 })
673 });
674
675 Ok(LazyFileHandle::new(file_size, read_fn))
676 }
677}
678
679#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
680#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
681impl DirectoryWriter for RamDirectory {
682 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
683 self.files
684 .write()
685 .insert(path.to_path_buf(), Arc::new(data.to_vec()));
686 Ok(())
687 }
688
689 async fn delete(&self, path: &Path) -> io::Result<()> {
690 self.files.write().remove(path);
691 Ok(())
692 }
693
694 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
695 let mut files = self.files.write();
696 if let Some(data) = files.remove(from) {
697 files.insert(to.to_path_buf(), data);
698 }
699 Ok(())
700 }
701
702 async fn sync(&self) -> io::Result<()> {
703 Ok(())
704 }
705
706 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
707 Ok(Box::new(BufferedStreamingWriter {
708 path: path.to_path_buf(),
709 buffer: Vec::new(),
710 files: Arc::clone(&self.files),
711 }))
712 }
713}
714
/// Directory rooted at a filesystem path; every relative path is resolved against `root`.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    root: PathBuf,
}

#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory rooted at `root`. The path is neither created nor checked here.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = root.as_ref().to_path_buf();
        Self { root }
    }

    /// Joins `path` onto the root.
    ///
    /// NOTE(review): as with `Path::join`, an absolute `path` replaces the root entirely, and
    /// `..` components are not rejected. Confirm that callers never pass untrusted paths.
    fn resolve(&self, path: &Path) -> PathBuf {
        let mut full = self.root.clone();
        full.push(path);
        full
    }
}
734
/// Reads the absolute byte `range` of the file at `full_path` into a freshly allocated buffer.
///
/// Returns `InvalidInput` for an inverted range. Previously `end - start` underflowed there
/// (a debug panic, or a huge allocation in release builds). A range extending past the end of
/// the file surfaces as the `UnexpectedEof` error from `read_exact`.
#[cfg(feature = "native")]
async fn fs_read_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    if range.start > range.end {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid range {:?}: start is after end", range),
        ));
    }

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; (range.end - range.start) as usize];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}

#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        // `try_exists` already reports a missing file as `Ok(false)`. Other failures
        // (e.g. permission denied) are propagated instead of being disguised as "absent".
        tokio::fs::try_exists(self.resolve(path)).await
    }

    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    /// Reads the entire file into memory.
    async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileSlice::new(OwnedBytes::new(data)))
    }

    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        fs_read_range(&self.resolve(path), range).await
    }

    /// Lists regular files directly inside `prefix` (non-recursive), relative to the root.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_file() {
                let path = entry.path();
                // An absolute `prefix` makes `resolve` escape the root; report that as an
                // error instead of panicking on `unwrap`.
                let relative = path
                    .strip_prefix(&self.root)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
                files.push(relative.to_path_buf());
            }
        }

        Ok(files)
    }

    /// Captures the file size now. Each read reopens the file, so reads reflect its
    /// contents at read time.
    async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
        let full_path = self.resolve(path);
        let file_size = tokio::fs::metadata(&full_path).await?.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            let full_path = full_path.clone();
            Box::pin(async move { fs_read_range(&full_path, range).await })
        });

        Ok(LazyFileHandle::new(file_size, read_fn))
    }
}
808
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to `path`, creating parent directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let full_path = self.resolve(path);

        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }

        tokio::fs::write(&full_path, data).await
    }

    /// Removes the file; fails if it does not exist.
    async fn delete(&self, path: &Path) -> io::Result<()> {
        let full_path = self.resolve(path);
        tokio::fs::remove_file(&full_path).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let from_path = self.resolve(from);
        let to_path = self.resolve(to);
        tokio::fs::rename(&from_path, &to_path).await
    }

    /// Opens and fsyncs the root directory.
    ///
    /// Uses `tokio::fs` rather than `std::fs`: the original ran the blocking open + fsync
    /// directly on the async executor thread.
    async fn sync(&self) -> io::Result<()> {
        let dir = tokio::fs::File::open(&self.root).await?;
        dir.sync_all().await
    }

    /// Creates (or truncates) `path`, creating parent directories first, and returns a buffered
    /// writer over it.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let full_path = self.resolve(path);
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        // Create the file through tokio so the blocking `open(2)` does not run on the executor,
        // then convert to `std::fs::File` for the synchronous `io::Write` based writer.
        let file = tokio::fs::File::create(&full_path).await?.into_std().await;
        Ok(Box::new(FileStreamingWriter::new(file)))
    }
}
850
851pub struct CachingDirectory<D: Directory> {
853 inner: D,
854 cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
855 max_cached_bytes: usize,
856 current_bytes: RwLock<usize>,
857}
858
859impl<D: Directory> CachingDirectory<D> {
860 pub fn new(inner: D, max_cached_bytes: usize) -> Self {
861 Self {
862 inner,
863 cache: RwLock::new(HashMap::new()),
864 max_cached_bytes,
865 current_bytes: RwLock::new(0),
866 }
867 }
868
869 fn try_cache(&self, path: &Path, data: &[u8]) {
870 let mut current = self.current_bytes.write();
871 if *current + data.len() <= self.max_cached_bytes {
872 self.cache
873 .write()
874 .insert(path.to_path_buf(), Arc::new(data.to_vec()));
875 *current += data.len();
876 }
877 }
878}
879
880#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
881#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
882impl<D: Directory> Directory for CachingDirectory<D> {
883 async fn exists(&self, path: &Path) -> io::Result<bool> {
884 if self.cache.read().contains_key(path) {
885 return Ok(true);
886 }
887 self.inner.exists(path).await
888 }
889
890 async fn file_size(&self, path: &Path) -> io::Result<u64> {
891 if let Some(data) = self.cache.read().get(path) {
892 return Ok(data.len() as u64);
893 }
894 self.inner.file_size(path).await
895 }
896
897 async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
898 if let Some(data) = self.cache.read().get(path) {
900 return Ok(FileSlice::new(OwnedBytes::from_arc_vec(
901 Arc::clone(data),
902 0..data.len(),
903 )));
904 }
905
906 let slice = self.inner.open_read(path).await?;
908 let bytes = slice.read_bytes().await?;
909
910 self.try_cache(path, bytes.as_slice());
911
912 Ok(FileSlice::new(bytes))
913 }
914
915 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
916 if let Some(data) = self.cache.read().get(path) {
918 let start = range.start as usize;
919 let end = range.end as usize;
920 return Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end));
921 }
922
923 self.inner.read_range(path, range).await
924 }
925
926 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
927 self.inner.list_files(prefix).await
928 }
929
930 async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
931 self.inner.open_lazy(path).await
933 }
934}
935
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trips a file through RamDirectory: write, exists, full read, range read, delete.
    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();
        let file = Path::new("test.txt");

        dir.write(file, b"hello world").await.unwrap();

        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());

        let opened = dir.open_read(file).await.unwrap();
        let whole = opened.read_bytes().await.unwrap();
        assert_eq!(whole.as_slice(), b"hello world");

        let prefix = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(prefix.as_slice(), b"hello");

        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    // Sub-slices of a FileSlice read back exactly the bytes they cover.
    #[tokio::test]
    async fn test_file_slice() {
        let slice = FileSlice::new(OwnedBytes::new(b"hello world".to_vec()));
        assert_eq!(slice.len(), 11);

        let head = slice.slice(0..5).read_bytes().await.unwrap();
        assert_eq!(head.as_slice(), b"hello");

        let tail = slice.slice(6..11).read_bytes().await.unwrap();
        assert_eq!(tail.as_slice(), b"world");
    }

    // Slicing OwnedBytes yields a view and leaves the original untouched.
    #[tokio::test]
    async fn test_owned_bytes() {
        let original = OwnedBytes::new(vec![1, 2, 3, 4, 5]);
        assert_eq!(original.len(), 5);
        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = original.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);
    }
}