1use async_trait::async_trait;
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::io;
10use std::ops::Range;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14#[cfg(not(target_arch = "wasm32"))]
16pub type RangeReadFn = Arc<
17 dyn Fn(
18 Range<u64>,
19 )
20 -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<OwnedBytes>> + Send>>
21 + Send
22 + Sync,
23>;
24
/// Shared callback that asynchronously reads one byte range.
///
/// This is the wasm32 definition: neither the callback nor the future it
/// returns carries a `Send`/`Sync` bound.
#[cfg(target_arch = "wasm32")]
pub type RangeReadFn = Arc<
    dyn Fn(Range<u64>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = io::Result<OwnedBytes>>>,
    >,
>;
31
32#[derive(Clone)]
40pub struct FileHandle {
41 inner: FileHandleInner,
42}
43
44#[derive(Clone)]
45enum FileHandleInner {
46 Inline {
48 data: OwnedBytes,
49 offset: u64,
50 len: u64,
51 },
52 Lazy {
54 read_fn: RangeReadFn,
55 offset: u64,
56 len: u64,
57 },
58}
59
60impl std::fmt::Debug for FileHandle {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match &self.inner {
63 FileHandleInner::Inline { len, offset, .. } => f
64 .debug_struct("FileHandle::Inline")
65 .field("offset", offset)
66 .field("len", len)
67 .finish(),
68 FileHandleInner::Lazy { len, offset, .. } => f
69 .debug_struct("FileHandle::Lazy")
70 .field("offset", offset)
71 .field("len", len)
72 .finish(),
73 }
74 }
75}
76
77impl FileHandle {
78 pub fn from_bytes(data: OwnedBytes) -> Self {
81 let len = data.len() as u64;
82 Self {
83 inner: FileHandleInner::Inline {
84 data,
85 offset: 0,
86 len,
87 },
88 }
89 }
90
91 pub fn empty() -> Self {
93 Self::from_bytes(OwnedBytes::empty())
94 }
95
96 pub fn lazy(len: u64, read_fn: RangeReadFn) -> Self {
99 Self {
100 inner: FileHandleInner::Lazy {
101 read_fn,
102 offset: 0,
103 len,
104 },
105 }
106 }
107
108 #[inline]
110 pub fn len(&self) -> u64 {
111 match &self.inner {
112 FileHandleInner::Inline { len, .. } => *len,
113 FileHandleInner::Lazy { len, .. } => *len,
114 }
115 }
116
117 #[inline]
119 pub fn is_empty(&self) -> bool {
120 self.len() == 0
121 }
122
123 #[inline]
125 pub fn is_sync(&self) -> bool {
126 matches!(&self.inner, FileHandleInner::Inline { .. })
127 }
128
129 pub fn slice(&self, range: Range<u64>) -> Self {
131 match &self.inner {
132 FileHandleInner::Inline { data, offset, len } => {
133 let new_offset = offset + range.start;
134 let new_len = range.end - range.start;
135 debug_assert!(
136 new_offset + new_len <= offset + len,
137 "slice out of bounds: {}+{} > {}+{}",
138 new_offset,
139 new_len,
140 offset,
141 len
142 );
143 Self {
144 inner: FileHandleInner::Inline {
145 data: data.clone(),
146 offset: new_offset,
147 len: new_len,
148 },
149 }
150 }
151 FileHandleInner::Lazy {
152 read_fn,
153 offset,
154 len,
155 } => {
156 let new_offset = offset + range.start;
157 let new_len = range.end - range.start;
158 debug_assert!(
159 new_offset + new_len <= offset + len,
160 "slice out of bounds: {}+{} > {}+{}",
161 new_offset,
162 new_len,
163 offset,
164 len
165 );
166 Self {
167 inner: FileHandleInner::Lazy {
168 read_fn: Arc::clone(read_fn),
169 offset: new_offset,
170 len: new_len,
171 },
172 }
173 }
174 }
175 }
176
177 pub async fn read_bytes_range(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
179 match &self.inner {
180 FileHandleInner::Inline { data, offset, len } => {
181 if range.end > *len {
182 return Err(io::Error::new(
183 io::ErrorKind::InvalidInput,
184 format!("Range {:?} out of bounds (len: {})", range, len),
185 ));
186 }
187 let start = (*offset + range.start) as usize;
188 let end = (*offset + range.end) as usize;
189 Ok(data.slice(start..end))
190 }
191 FileHandleInner::Lazy {
192 read_fn,
193 offset,
194 len,
195 } => {
196 if range.end > *len {
197 return Err(io::Error::new(
198 io::ErrorKind::InvalidInput,
199 format!("Range {:?} out of bounds (len: {})", range, len),
200 ));
201 }
202 let abs_start = offset + range.start;
203 let abs_end = offset + range.end;
204 (read_fn)(abs_start..abs_end).await
205 }
206 }
207 }
208
209 pub async fn read_bytes(&self) -> io::Result<OwnedBytes> {
211 self.read_bytes_range(0..self.len()).await
212 }
213
214 #[inline]
217 pub fn read_bytes_range_sync(&self, range: Range<u64>) -> io::Result<OwnedBytes> {
218 match &self.inner {
219 FileHandleInner::Inline { data, offset, len } => {
220 if range.end > *len {
221 return Err(io::Error::new(
222 io::ErrorKind::InvalidInput,
223 format!("Range {:?} out of bounds (len: {})", range, len),
224 ));
225 }
226 let start = (*offset + range.start) as usize;
227 let end = (*offset + range.end) as usize;
228 Ok(data.slice(start..end))
229 }
230 FileHandleInner::Lazy { .. } => Err(io::Error::new(
231 io::ErrorKind::Unsupported,
232 "Synchronous read not available on lazy file handle",
233 )),
234 }
235 }
236
237 #[inline]
239 pub fn read_bytes_sync(&self) -> io::Result<OwnedBytes> {
240 self.read_bytes_range_sync(0..self.len())
241 }
242}
243
/// Reference-counted storage behind an [`OwnedBytes`] view.
#[derive(Clone)]
enum SharedBytes {
    /// Heap buffer shared through an `Arc`.
    Vec(Arc<Vec<u8>>),
    /// Memory-mapped file shared through an `Arc`.
    #[cfg(feature = "native")]
    Mmap(Arc<memmap2::Mmap>),
}

impl SharedBytes {
    /// Borrows the complete underlying storage, ignoring any view range.
    #[inline]
    fn as_bytes(&self) -> &[u8] {
        match self {
            SharedBytes::Vec(v) => v.as_slice(),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => m.as_ref(),
        }
    }
}

/// Prints only the storage kind and its total length, never the bytes.
impl std::fmt::Debug for SharedBytes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SharedBytes::Vec(v) => write!(f, "Vec(len={})", v.len()),
            #[cfg(feature = "native")]
            SharedBytes::Mmap(m) => write!(f, "Mmap(len={})", m.len()),
        }
    }
}

/// Cheaply cloneable view over a range of shared, immutable bytes.
///
/// Cloning and slicing only bump a reference count and adjust `range`.
#[derive(Debug, Clone)]
pub struct OwnedBytes {
    /// Shared storage the view points into.
    data: SharedBytes,
    /// Portion of `data` this view exposes.
    range: Range<usize>,
}

impl OwnedBytes {
    /// Takes ownership of `data` and exposes all of it.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self {
            data: SharedBytes::Vec(Arc::new(data)),
            range: 0..len,
        }
    }

    /// Returns a view of length zero.
    pub fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Builds a view of `range` over an already shared vector.
    ///
    /// `range` is not validated here; an out-of-bounds range panics when the
    /// bytes are borrowed via [`as_slice`](Self::as_slice).
    pub(crate) fn from_arc_vec(data: Arc<Vec<u8>>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Vec(data),
            range,
        }
    }

    /// Builds a view over the whole memory map.
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap(mmap: Arc<memmap2::Mmap>) -> Self {
        let len = mmap.len();
        Self {
            data: SharedBytes::Mmap(mmap),
            range: 0..len,
        }
    }

    /// Builds a view of `range` over a memory map; `range` is not validated.
    #[cfg(feature = "native")]
    pub(crate) fn from_mmap_range(mmap: Arc<memmap2::Mmap>, range: Range<usize>) -> Self {
        Self {
            data: SharedBytes::Mmap(mmap),
            range,
        }
    }

    /// Number of bytes exposed by this view.
    pub fn len(&self) -> usize {
        self.range.len()
    }

    /// `true` when the view exposes no bytes.
    pub fn is_empty(&self) -> bool {
        self.range.is_empty()
    }

    /// Returns the sub-view `range`, expressed relative to this view, without
    /// copying.
    ///
    /// # Panics
    /// Panics when `range` is inverted or ends past [`len`](Self::len), like
    /// indexing a slice does. Without this check a sub-view could expose
    /// bytes of the shared storage that lie outside its parent view.
    pub fn slice(&self, range: Range<usize>) -> Self {
        assert!(
            range.start <= range.end && range.end <= self.len(),
            "OwnedBytes::slice: range {:?} out of bounds (len: {})",
            range,
            self.len()
        );
        let base = self.range.start;
        Self {
            data: self.data.clone(),
            range: (base + range.start)..(base + range.end),
        }
    }

    /// Borrows the bytes exposed by this view.
    ///
    /// # Panics
    /// Panics if the view's range does not fit the underlying storage.
    pub fn as_slice(&self) -> &[u8] {
        &self.data.as_bytes()[self.range.clone()]
    }

    /// `true` when the view is backed by a memory map.
    #[cfg(feature = "native")]
    #[inline]
    pub fn is_mmap(&self) -> bool {
        matches!(self.data, SharedBytes::Mmap(_))
    }

    /// Copies the exposed bytes into a fresh `Vec`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_slice().to_vec()
    }
}

impl AsRef<[u8]> for OwnedBytes {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

/// Lets an `OwnedBytes` be used wherever a `&[u8]` is expected.
impl std::ops::Deref for OwnedBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
377
378#[cfg(not(target_arch = "wasm32"))]
380#[async_trait]
381pub trait Directory: Send + Sync + 'static {
382 async fn exists(&self, path: &Path) -> io::Result<bool>;
384
385 async fn file_size(&self, path: &Path) -> io::Result<u64>;
387
388 async fn open_read(&self, path: &Path) -> io::Result<FileHandle>;
390
391 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;
393
394 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;
396
397 async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle>;
401}
402
/// Read-only, asynchronous access to a set of files addressed by path
/// (wasm32 definition: no `Send`/`Sync` requirement, futures are `?Send`).
#[cfg(target_arch = "wasm32")]
#[async_trait(?Send)]
pub trait Directory: 'static {
    /// Reports whether `path` exists in this directory.
    async fn exists(&self, path: &Path) -> io::Result<bool>;

    /// Returns the size of the file at `path` in bytes.
    async fn file_size(&self, path: &Path) -> io::Result<u64>;

    /// Opens the file at `path` and returns a handle onto its contents.
    async fn open_read(&self, path: &Path) -> io::Result<FileHandle>;

    /// Reads the byte range `range` of the file at `path`.
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes>;

    /// Lists the files found under `prefix`.
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>>;

    /// Opens the file at `path`, returning a handle that an implementation
    /// may back with on-demand range reads instead of loading the contents.
    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle>;
}
425
/// An `io::Write` sink for one file that must be explicitly finalized.
pub trait StreamingWriter: io::Write + Send {
    /// Consumes the writer and commits everything written so far.
    fn finish(self: Box<Self>) -> io::Result<()>;

    /// Number of bytes accepted by this writer so far.
    fn bytes_written(&self) -> u64;
}
437
438struct BufferedStreamingWriter {
441 path: PathBuf,
442 buffer: Vec<u8>,
443 files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
446}
447
448impl io::Write for BufferedStreamingWriter {
449 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
450 self.buffer.extend_from_slice(buf);
451 Ok(buf.len())
452 }
453
454 fn flush(&mut self) -> io::Result<()> {
455 Ok(())
456 }
457}
458
459impl StreamingWriter for BufferedStreamingWriter {
460 fn finish(self: Box<Self>) -> io::Result<()> {
461 self.files.write().insert(self.path, Arc::new(self.buffer));
462 Ok(())
463 }
464
465 fn bytes_written(&self) -> u64 {
466 self.buffer.len() as u64
467 }
468}
469
/// Capacity, in bytes, of the `BufWriter` used by `FileStreamingWriter`
/// (8 MiB).
#[cfg(feature = "native")]
const FILE_STREAMING_BUF_SIZE: usize = 8 * (1 << 20);
475
/// Streaming writer that sends bytes to a file through a `BufWriter`.
#[cfg(feature = "native")]
pub(crate) struct FileStreamingWriter {
    /// Buffered destination file.
    pub(crate) file: io::BufWriter<std::fs::File>,
    /// Running total of bytes accepted by `write`.
    pub(crate) written: u64,
}
482
#[cfg(feature = "native")]
impl FileStreamingWriter {
    /// Wraps `file` in a `BufWriter` of `FILE_STREAMING_BUF_SIZE` bytes and
    /// starts the byte counter at zero.
    pub(crate) fn new(file: std::fs::File) -> Self {
        let buffered = io::BufWriter::with_capacity(FILE_STREAMING_BUF_SIZE, file);
        Self {
            file: buffered,
            written: 0,
        }
    }
}
492
#[cfg(feature = "native")]
impl io::Write for FileStreamingWriter {
    /// Forwards to the buffered file and adds the accepted byte count to
    /// `written`; a failed write leaves the counter untouched.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = self.file.write(buf)?;
        self.written += accepted as u64;
        Ok(accepted)
    }

    /// Flushes the `BufWriter` into the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        io::Write::flush(&mut self.file)
    }
}
505
#[cfg(feature = "native")]
impl StreamingWriter for FileStreamingWriter {
    /// Flushes the buffer by unwrapping the `BufWriter`, then calls
    /// `sync_all` on the file.
    fn finish(self: Box<Self>) -> io::Result<()> {
        let FileStreamingWriter { file: buffered, .. } = *self;
        match buffered.into_inner() {
            Ok(file) => file.sync_all(),
            // Surface the flush failure as the plain I/O error.
            Err(flush_failure) => Err(flush_failure.into_error()),
        }
    }

    /// Total number of bytes accepted by `write` so far.
    fn bytes_written(&self) -> u64 {
        self.written
    }
}
518
519#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
521#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
522pub trait DirectoryWriter: Directory {
523 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
525
526 async fn delete(&self, path: &Path) -> io::Result<()>;
528
529 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
531
532 async fn sync(&self) -> io::Result<()>;
534
535 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>>;
538}
539
540#[derive(Debug, Default)]
542pub struct RamDirectory {
543 files: Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>,
544}
545
546impl Clone for RamDirectory {
547 fn clone(&self) -> Self {
548 Self {
549 files: Arc::clone(&self.files),
550 }
551 }
552}
553
554impl RamDirectory {
555 pub fn new() -> Self {
556 Self::default()
557 }
558
559 pub fn list_files_sync(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
561 let files = self.files.read();
562 Ok(files
563 .keys()
564 .filter(|p| p.starts_with(prefix))
565 .cloned()
566 .collect())
567 }
568
569 pub fn read_file_sync(&self, path: &Path) -> io::Result<Vec<u8>> {
571 let files = self.files.read();
572 files
573 .get(path)
574 .map(|data| data.as_ref().clone())
575 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
576 }
577
578 pub fn write_sync(&self, path: &Path, data: &[u8]) -> io::Result<()> {
580 self.files
581 .write()
582 .insert(path.to_path_buf(), Arc::new(data.to_vec()));
583 Ok(())
584 }
585}
586
587#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
588#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
589impl Directory for RamDirectory {
590 async fn exists(&self, path: &Path) -> io::Result<bool> {
591 Ok(self.files.read().contains_key(path))
592 }
593
594 async fn file_size(&self, path: &Path) -> io::Result<u64> {
595 self.files
596 .read()
597 .get(path)
598 .map(|data| data.len() as u64)
599 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))
600 }
601
602 async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
603 let files = self.files.read();
604 let data = files
605 .get(path)
606 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
607
608 Ok(FileHandle::from_bytes(OwnedBytes::from_arc_vec(
609 Arc::clone(data),
610 0..data.len(),
611 )))
612 }
613
614 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
615 let files = self.files.read();
616 let data = files
617 .get(path)
618 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "File not found"))?;
619
620 let start = range.start as usize;
621 let end = range.end as usize;
622
623 if end > data.len() {
624 return Err(io::Error::new(
625 io::ErrorKind::InvalidInput,
626 "Range out of bounds",
627 ));
628 }
629
630 Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end))
631 }
632
633 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
634 let files = self.files.read();
635 Ok(files
636 .keys()
637 .filter(|p| p.starts_with(prefix))
638 .cloned()
639 .collect())
640 }
641
642 async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
643 self.open_read(path).await
645 }
646}
647
648#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
649#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
650impl DirectoryWriter for RamDirectory {
651 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
652 self.files
653 .write()
654 .insert(path.to_path_buf(), Arc::new(data.to_vec()));
655 Ok(())
656 }
657
658 async fn delete(&self, path: &Path) -> io::Result<()> {
659 self.files.write().remove(path);
660 Ok(())
661 }
662
663 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
664 let mut files = self.files.write();
665 if let Some(data) = files.remove(from) {
666 files.insert(to.to_path_buf(), data);
667 }
668 Ok(())
669 }
670
671 async fn sync(&self) -> io::Result<()> {
672 Ok(())
673 }
674
675 async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
676 Ok(Box::new(BufferedStreamingWriter {
677 path: path.to_path_buf(),
678 buffer: Vec::new(),
679 files: Arc::clone(&self.files),
680 }))
681 }
682}
683
/// Directory backed by the local filesystem; all paths are joined onto
/// `root`.
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base path every relative path is joined onto.
    root: PathBuf,
}
690
#[cfg(feature = "native")]
impl FsDirectory {
    /// Creates a directory rooted at `root`. The path is only recorded;
    /// nothing is created or checked on disk.
    pub fn new(root: impl AsRef<Path>) -> Self {
        let root = PathBuf::from(root.as_ref());
        Self { root }
    }

    /// Joins `path` onto the root (`Path::join` semantics).
    fn resolve(&self, path: &Path) -> PathBuf {
        let base: &Path = &self.root;
        base.join(path)
    }
}
703
#[cfg(feature = "native")]
#[async_trait]
impl Directory for FsDirectory {
    /// `true` when the resolved path exists. An error while probing is
    /// reported as "does not exist" rather than propagated.
    async fn exists(&self, path: &Path) -> io::Result<bool> {
        let full_path = self.resolve(path);
        Ok(tokio::fs::try_exists(&full_path).await.unwrap_or(false))
    }

    /// Size reported by the file's metadata.
    async fn file_size(&self, path: &Path) -> io::Result<u64> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        Ok(metadata.len())
    }

    /// Reads the whole file into memory and returns an in-memory handle.
    async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
        let full_path = self.resolve(path);
        let data = tokio::fs::read(&full_path).await?;
        Ok(FileHandle::from_bytes(OwnedBytes::new(data)))
    }

    /// Opens the file, seeks to `range.start` and reads exactly the
    /// requested number of bytes.
    ///
    /// # Errors
    /// `InvalidInput` when `range` is inverted or its length does not fit in
    /// `usize`; otherwise any error from opening, seeking or reading
    /// (including `UnexpectedEof` when the file is shorter than `range.end`).
    async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
        let full_path = self.resolve(path);
        read_fs_range(&full_path, range).await
    }

    /// Lists the regular files directly inside the directory `prefix`,
    /// returned relative to the root.
    ///
    /// # Errors
    /// Propagates directory-reading errors, and returns `InvalidInput` when
    /// an entry does not live under the root (for example because `prefix`
    /// was absolute, so joining it replaced the root).
    async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
        let full_path = self.resolve(prefix);
        let mut entries = tokio::fs::read_dir(&full_path).await?;
        let mut files = Vec::new();

        while let Some(entry) = entries.next_entry().await? {
            if entry.file_type().await?.is_file() {
                let relative = entry
                    .path()
                    .strip_prefix(&self.root)
                    .map(Path::to_path_buf)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
                files.push(relative);
            }
        }

        Ok(files)
    }

    /// Returns a lazy handle: only the file size is read now, and every read
    /// on the handle opens the file and fetches just the requested range.
    async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
        let full_path = self.resolve(path);
        let metadata = tokio::fs::metadata(&full_path).await?;
        let file_size = metadata.len();

        let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
            // Each call gets its own copy of the path to move into the future.
            let full_path = full_path.clone();
            Box::pin(async move { read_fs_range(&full_path, range).await })
        });

        Ok(FileHandle::lazy(file_size, read_fn))
    }
}

/// Reads the absolute byte range `range` from the file at `full_path`.
///
/// The range is validated before any I/O so an inverted range yields
/// `InvalidInput` instead of underflowing the buffer length.
#[cfg(feature = "native")]
async fn read_fs_range(full_path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    let byte_count = range
        .end
        .checked_sub(range.start)
        .filter(|n| *n <= usize::MAX as u64)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("Invalid byte range {:?}", range),
            )
        })? as usize;

    let mut file = tokio::fs::File::open(full_path).await?;
    file.seek(io::SeekFrom::Start(range.start)).await?;

    let mut buffer = vec![0u8; byte_count];
    file.read_exact(&mut buffer).await?;

    Ok(OwnedBytes::new(buffer))
}
777
#[cfg(feature = "native")]
#[async_trait]
impl DirectoryWriter for FsDirectory {
    /// Writes `data` to the resolved path, creating missing parent
    /// directories first.
    async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()> {
        let target = self.resolve(path);

        match target.parent() {
            Some(parent_dir) => tokio::fs::create_dir_all(parent_dir).await?,
            None => {}
        }

        tokio::fs::write(&target, data).await
    }

    /// Removes the file at the resolved path.
    async fn delete(&self, path: &Path) -> io::Result<()> {
        let target = self.resolve(path);
        tokio::fs::remove_file(&target).await
    }

    /// Renames the resolved `from` path to the resolved `to` path.
    async fn rename(&self, from: &Path, to: &Path) -> io::Result<()> {
        let source = self.resolve(from);
        let destination = self.resolve(to);
        tokio::fs::rename(&source, &destination).await
    }

    /// Opens the root directory and calls `sync_all` on it.
    async fn sync(&self) -> io::Result<()> {
        let root_dir = std::fs::File::open(&self.root)?;
        root_dir.sync_all()
    }

    /// Creates (or truncates) the file at the resolved path, creating missing
    /// parent directories first, and wraps it in a `FileStreamingWriter`.
    async fn streaming_writer(&self, path: &Path) -> io::Result<Box<dyn StreamingWriter>> {
        let target = self.resolve(path);
        match target.parent() {
            Some(parent_dir) => tokio::fs::create_dir_all(parent_dir).await?,
            None => {}
        }
        let created = std::fs::File::create(&target)?;
        let writer = FileStreamingWriter::new(created);
        Ok(Box::new(writer))
    }
}
819
820pub struct CachingDirectory<D: Directory> {
822 inner: D,
823 cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
824 max_cached_bytes: usize,
825 current_bytes: RwLock<usize>,
826}
827
828impl<D: Directory> CachingDirectory<D> {
829 pub fn new(inner: D, max_cached_bytes: usize) -> Self {
830 Self {
831 inner,
832 cache: RwLock::new(HashMap::new()),
833 max_cached_bytes,
834 current_bytes: RwLock::new(0),
835 }
836 }
837
838 fn try_cache(&self, path: &Path, data: &[u8]) {
839 let mut current = self.current_bytes.write();
840 if *current + data.len() <= self.max_cached_bytes {
841 self.cache
842 .write()
843 .insert(path.to_path_buf(), Arc::new(data.to_vec()));
844 *current += data.len();
845 }
846 }
847}
848
849#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
850#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
851impl<D: Directory> Directory for CachingDirectory<D> {
852 async fn exists(&self, path: &Path) -> io::Result<bool> {
853 if self.cache.read().contains_key(path) {
854 return Ok(true);
855 }
856 self.inner.exists(path).await
857 }
858
859 async fn file_size(&self, path: &Path) -> io::Result<u64> {
860 if let Some(data) = self.cache.read().get(path) {
861 return Ok(data.len() as u64);
862 }
863 self.inner.file_size(path).await
864 }
865
866 async fn open_read(&self, path: &Path) -> io::Result<FileHandle> {
867 if let Some(data) = self.cache.read().get(path) {
869 return Ok(FileHandle::from_bytes(OwnedBytes::from_arc_vec(
870 Arc::clone(data),
871 0..data.len(),
872 )));
873 }
874
875 let handle = self.inner.open_read(path).await?;
877 let bytes = handle.read_bytes().await?;
878
879 self.try_cache(path, bytes.as_slice());
880
881 Ok(FileHandle::from_bytes(bytes))
882 }
883
884 async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
885 if let Some(data) = self.cache.read().get(path) {
887 let start = range.start as usize;
888 let end = range.end as usize;
889 return Ok(OwnedBytes::from_arc_vec(Arc::clone(data), start..end));
890 }
891
892 self.inner.read_range(path, range).await
893 }
894
895 async fn list_files(&self, prefix: &Path) -> io::Result<Vec<PathBuf>> {
896 self.inner.list_files(prefix).await
897 }
898
899 async fn open_lazy(&self, path: &Path) -> io::Result<FileHandle> {
900 self.inner.open_lazy(path).await
902 }
903}
904
#[cfg(test)]
mod tests {
    use super::*;

    /// Write, probe, read, range-read and delete one file in a
    /// `RamDirectory`.
    #[tokio::test]
    async fn test_ram_directory() {
        let dir = RamDirectory::new();
        let file = Path::new("test.txt");

        dir.write(file, b"hello world").await.unwrap();

        // Existence is reported for the written file only.
        assert!(dir.exists(file).await.unwrap());
        assert!(!dir.exists(Path::new("nonexistent.txt")).await.unwrap());

        // A full read returns exactly what was written.
        let handle = dir.open_read(file).await.unwrap();
        let contents = handle.read_bytes().await.unwrap();
        assert_eq!(contents.as_slice(), b"hello world");

        // A range read returns the requested prefix.
        let prefix = dir.read_range(file, 0..5).await.unwrap();
        assert_eq!(prefix.as_slice(), b"hello");

        // After deletion the file is gone.
        dir.delete(file).await.unwrap();
        assert!(!dir.exists(file).await.unwrap());
    }

    /// Length, slicing and sync/async reads on an in-memory `FileHandle`.
    #[tokio::test]
    async fn test_file_handle() {
        let handle = FileHandle::from_bytes(OwnedBytes::new(b"hello world".to_vec()));

        assert_eq!(handle.len(), 11);
        assert!(handle.is_sync());

        let first_word = handle.slice(0..5).read_bytes().await.unwrap();
        assert_eq!(first_word.as_slice(), b"hello");

        let second_word = handle.slice(6..11).read_bytes().await.unwrap();
        assert_eq!(second_word.as_slice(), b"world");

        let sync_read = handle.read_bytes_range_sync(0..5).unwrap();
        assert_eq!(sync_read.as_slice(), b"hello");
    }

    /// Slicing an `OwnedBytes` yields the sub-range and leaves the original
    /// view untouched.
    #[tokio::test]
    async fn test_owned_bytes() {
        let original = OwnedBytes::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(original.len(), 5);
        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);

        let middle = original.slice(1..4);
        assert_eq!(middle.as_slice(), &[2, 3, 4]);

        assert_eq!(original.as_slice(), &[1, 2, 3, 4, 5]);
    }
}