1use async_trait::async_trait;
2use log::warn;
3use once_cell::sync::OnceCell;
4use std::ffi::OsString;
5use std::fmt;
6use std::io::{self, ErrorKind, Read, Seek, Write};
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, RwLock};
9use std::time::SystemTime;
10
11#[cfg(not(target_arch = "wasm32"))]
12mod native;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod remote;
15#[cfg(not(target_arch = "wasm32"))]
16pub mod sandbox;
17#[cfg(target_arch = "wasm32")]
18mod wasm;
19
20#[cfg(not(target_arch = "wasm32"))]
21pub use native::NativeFsProvider;
22#[cfg(not(target_arch = "wasm32"))]
23pub use remote::{RemoteFsConfig, RemoteFsProvider};
24#[cfg(not(target_arch = "wasm32"))]
25pub use sandbox::SandboxFsProvider;
26#[cfg(target_arch = "wasm32")]
27pub use wasm::PlaceholderFsProvider;
28
29pub mod data_contract;
30
31use data_contract::{
32 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
33};
34
35#[async_trait(?Send)]
36pub trait FileHandle: Read + Write + Seek + Send + Sync {
37 async fn flush_async(&mut self) -> io::Result<()> {
38 self.flush()
39 }
40}
41
42#[async_trait(?Send)]
43impl FileHandle for std::fs::File {}
44
45#[derive(Clone, Debug, Default)]
46pub struct OpenFlags {
47 pub read: bool,
48 pub write: bool,
49 pub append: bool,
50 pub truncate: bool,
51 pub create: bool,
52 pub create_new: bool,
53}
54
55#[derive(Clone, Debug)]
56pub struct OpenOptions {
57 flags: OpenFlags,
58}
59
60impl OpenOptions {
61 pub fn new() -> Self {
62 Self {
63 flags: OpenFlags::default(),
64 }
65 }
66
67 pub fn read(&mut self, value: bool) -> &mut Self {
68 self.flags.read = value;
69 self
70 }
71
72 pub fn write(&mut self, value: bool) -> &mut Self {
73 self.flags.write = value;
74 self
75 }
76
77 pub fn append(&mut self, value: bool) -> &mut Self {
78 self.flags.append = value;
79 self
80 }
81
82 pub fn truncate(&mut self, value: bool) -> &mut Self {
83 self.flags.truncate = value;
84 self
85 }
86
87 pub fn create(&mut self, value: bool) -> &mut Self {
88 self.flags.create = value;
89 self
90 }
91
92 pub fn create_new(&mut self, value: bool) -> &mut Self {
93 self.flags.create_new = value;
94 self
95 }
96
97 pub fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
98 let resolved = resolve_path(path.as_ref());
99 with_provider(|provider| provider.open(&resolved, &self.flags)).map(File::from_handle)
100 }
101
102 pub async fn open_async(&self, path: impl AsRef<Path>) -> io::Result<File> {
103 let resolved = resolve_path(path.as_ref());
104 let provider = current_provider();
105 provider
106 .open_async(&resolved, &self.flags)
107 .await
108 .map(File::from_handle)
109 }
110
111 pub fn flags(&self) -> &OpenFlags {
112 &self.flags
113 }
114}
115
116impl Default for OpenOptions {
117 fn default() -> Self {
118 Self::new()
119 }
120}
121
122#[derive(Clone, Copy, Debug, PartialEq, Eq)]
123pub enum FsFileType {
124 Directory,
125 File,
126 Symlink,
127 Other,
128 Unknown,
129}
130
131#[derive(Clone, Debug)]
132pub struct FsMetadata {
133 file_type: FsFileType,
134 len: u64,
135 modified: Option<SystemTime>,
136 readonly: bool,
137 hash: Option<String>,
138}
139
140impl FsMetadata {
141 pub fn new(
142 file_type: FsFileType,
143 len: u64,
144 modified: Option<SystemTime>,
145 readonly: bool,
146 ) -> Self {
147 Self {
148 file_type,
149 len,
150 modified,
151 readonly,
152 hash: None,
153 }
154 }
155
156 pub fn new_with_hash(
157 file_type: FsFileType,
158 len: u64,
159 modified: Option<SystemTime>,
160 readonly: bool,
161 hash: Option<String>,
162 ) -> Self {
163 Self {
164 file_type,
165 len,
166 modified,
167 readonly,
168 hash,
169 }
170 }
171
172 pub fn file_type(&self) -> FsFileType {
173 self.file_type
174 }
175
176 pub fn is_dir(&self) -> bool {
177 matches!(self.file_type, FsFileType::Directory)
178 }
179
180 pub fn is_file(&self) -> bool {
181 matches!(self.file_type, FsFileType::File)
182 }
183
184 pub fn is_symlink(&self) -> bool {
185 matches!(self.file_type, FsFileType::Symlink)
186 }
187
188 pub fn len(&self) -> u64 {
189 self.len
190 }
191
192 pub fn hash(&self) -> Option<&str> {
193 self.hash.as_deref()
194 }
195
196 pub fn is_empty(&self) -> bool {
197 self.len == 0
198 }
199
200 pub fn modified(&self) -> Option<SystemTime> {
201 self.modified
202 }
203
204 pub fn is_readonly(&self) -> bool {
205 self.readonly
206 }
207}
208
209#[derive(Clone, Debug)]
210pub struct DirEntry {
211 path: PathBuf,
212 file_name: OsString,
213 file_type: FsFileType,
214}
215
216#[derive(Clone, Debug)]
217pub struct ReadManyEntry {
218 path: PathBuf,
219 bytes: Option<Vec<u8>>,
220 error: Option<String>,
221}
222
223impl ReadManyEntry {
224 pub fn new(path: PathBuf, bytes: Option<Vec<u8>>) -> Self {
225 Self {
226 path,
227 bytes,
228 error: None,
229 }
230 }
231
232 pub fn with_error(path: PathBuf, error: String) -> Self {
233 Self {
234 path,
235 bytes: None,
236 error: Some(error),
237 }
238 }
239
240 pub fn path(&self) -> &Path {
241 &self.path
242 }
243
244 pub fn bytes(&self) -> Option<&[u8]> {
245 self.bytes.as_deref()
246 }
247
248 pub fn into_bytes(self) -> Option<Vec<u8>> {
249 self.bytes
250 }
251
252 pub fn error(&self) -> Option<&str> {
253 self.error.as_deref()
254 }
255}
256
257impl DirEntry {
258 pub fn new(path: PathBuf, file_name: OsString, file_type: FsFileType) -> Self {
259 Self {
260 path,
261 file_name,
262 file_type,
263 }
264 }
265
266 pub fn path(&self) -> &Path {
267 &self.path
268 }
269
270 pub fn file_name(&self) -> &OsString {
271 &self.file_name
272 }
273
274 pub fn file_type(&self) -> FsFileType {
275 self.file_type
276 }
277
278 pub fn is_dir(&self) -> bool {
279 matches!(self.file_type, FsFileType::Directory)
280 }
281}
282
283#[async_trait(?Send)]
284pub trait FsProvider: Send + Sync + 'static {
285 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>>;
286 async fn open_async(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
287 self.open(path, flags)
288 }
289 async fn read(&self, path: &Path) -> io::Result<Vec<u8>>;
290 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
291 async fn remove_file(&self, path: &Path) -> io::Result<()>;
292 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata>;
293 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata>;
294 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>>;
295 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf>;
296 async fn create_dir(&self, path: &Path) -> io::Result<()>;
297 async fn create_dir_all(&self, path: &Path) -> io::Result<()>;
298 async fn remove_dir(&self, path: &Path) -> io::Result<()>;
299 async fn remove_dir_all(&self, path: &Path) -> io::Result<()>;
300 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
301 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()>;
302
303 async fn read_many(&self, paths: &[PathBuf]) -> io::Result<Vec<ReadManyEntry>> {
304 let mut entries = Vec::with_capacity(paths.len());
305 for path in paths {
306 let entry = match self.read(path).await {
307 Ok(payload) => ReadManyEntry::new(path.clone(), Some(payload)),
308 Err(error) => {
309 warn!(
310 "fs.read_many.miss path={} kind={:?} error={}",
311 path.to_string_lossy(),
312 error.kind(),
313 error
314 );
315 ReadManyEntry::with_error(
316 path.clone(),
317 format!("kind={:?}; error={}", error.kind(), error),
318 )
319 }
320 };
321 entries.push(entry);
322 }
323 Ok(entries)
324 }
325
326 async fn data_manifest_descriptor(
327 &self,
328 _request: &DataManifestRequest,
329 ) -> io::Result<DataManifestDescriptor> {
330 Err(io::Error::new(
331 ErrorKind::Unsupported,
332 "data manifest descriptor is unsupported by this provider",
333 ))
334 }
335
336 async fn data_chunk_upload_targets(
337 &self,
338 _request: &DataChunkUploadRequest,
339 ) -> io::Result<Vec<DataChunkUploadTarget>> {
340 Err(io::Error::new(
341 ErrorKind::Unsupported,
342 "data chunk upload targets are unsupported by this provider",
343 ))
344 }
345
346 async fn data_upload_chunk(
347 &self,
348 _target: &DataChunkUploadTarget,
349 _data: &[u8],
350 ) -> io::Result<()> {
351 Err(io::Error::new(
352 ErrorKind::Unsupported,
353 "data chunk upload is unsupported by this provider",
354 ))
355 }
356}
357
358pub struct File {
359 inner: Box<dyn FileHandle>,
360}
361
362impl File {
363 fn from_handle(handle: Box<dyn FileHandle>) -> Self {
364 Self { inner: handle }
365 }
366
367 pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
368 let mut opts = OpenOptions::new();
369 opts.read(true);
370 opts.open(path)
371 }
372
373 pub async fn open_async(path: impl AsRef<Path>) -> io::Result<Self> {
374 let mut opts = OpenOptions::new();
375 opts.read(true);
376 opts.open_async(path).await
377 }
378
379 pub fn create(path: impl AsRef<Path>) -> io::Result<Self> {
380 let mut opts = OpenOptions::new();
381 opts.write(true).create(true).truncate(true);
382 opts.open(path)
383 }
384
385 pub async fn create_async(path: impl AsRef<Path>) -> io::Result<Self> {
386 let mut opts = OpenOptions::new();
387 opts.write(true).create(true).truncate(true);
388 opts.open_async(path).await
389 }
390
391 pub async fn flush_async(&mut self) -> io::Result<()> {
392 self.inner.flush_async().await
393 }
394}
395
396impl fmt::Debug for File {
397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398 f.debug_struct("File").finish_non_exhaustive()
399 }
400}
401
402impl Read for File {
403 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
404 self.inner.read(buf)
405 }
406}
407
408impl Write for File {
409 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
410 self.inner.write(buf)
411 }
412
413 fn flush(&mut self) -> io::Result<()> {
414 self.inner.flush()
415 }
416}
417
418impl Seek for File {
419 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
420 self.inner.seek(pos)
421 }
422}
423
424static PROVIDER: OnceCell<RwLock<Arc<dyn FsProvider>>> = OnceCell::new();
425#[cfg(target_arch = "wasm32")]
426static CURRENT_DIR: OnceCell<RwLock<PathBuf>> = OnceCell::new();
427
428fn provider_lock() -> &'static RwLock<Arc<dyn FsProvider>> {
429 PROVIDER.get_or_init(|| RwLock::new(default_provider()))
430}
431
432#[cfg(target_arch = "wasm32")]
433fn current_dir_lock() -> &'static RwLock<PathBuf> {
434 CURRENT_DIR.get_or_init(|| RwLock::new(PathBuf::from("/")))
435}
436
437fn with_provider<T>(f: impl FnOnce(&dyn FsProvider) -> T) -> T {
438 let guard = provider_lock()
439 .read()
440 .expect("filesystem provider lock poisoned");
441 f(&**guard)
442}
443
444fn resolve_path(path: &Path) -> PathBuf {
445 #[cfg(target_arch = "wasm32")]
446 {
447 if path.is_absolute() {
448 return path.to_path_buf();
449 }
450 if let Ok(base) = current_dir() {
451 return base.join(path);
452 }
453 PathBuf::from("/").join(path)
454 }
455 #[cfg(not(target_arch = "wasm32"))]
456 {
457 path.to_path_buf()
458 }
459}
460
461pub fn set_provider(provider: Arc<dyn FsProvider>) {
462 let mut guard = provider_lock()
463 .write()
464 .expect("filesystem provider lock poisoned");
465 *guard = provider;
466}
467
468pub fn replace_provider(provider: Arc<dyn FsProvider>) -> ProviderGuard {
472 let mut guard = provider_lock()
473 .write()
474 .expect("filesystem provider lock poisoned");
475 let previous = guard.clone();
476 *guard = provider;
477 ProviderGuard { previous }
478}
479
480pub fn with_provider_override<R>(provider: Arc<dyn FsProvider>, f: impl FnOnce() -> R) -> R {
483 let guard = replace_provider(provider);
484 let result = f();
485 drop(guard);
486 result
487}
488
489pub fn current_provider() -> Arc<dyn FsProvider> {
491 provider_lock()
492 .read()
493 .expect("filesystem provider lock poisoned")
494 .clone()
495}
496
497pub fn current_dir() -> io::Result<PathBuf> {
498 #[cfg(target_arch = "wasm32")]
499 {
500 return Ok(current_dir_lock()
501 .read()
502 .expect("filesystem current dir lock poisoned")
503 .clone());
504 }
505 #[cfg(not(target_arch = "wasm32"))]
506 {
507 std::env::current_dir()
508 }
509}
510
511pub fn set_current_dir(path: impl AsRef<Path>) -> io::Result<()> {
512 #[cfg(target_arch = "wasm32")]
513 {
514 let mut target = PathBuf::from(path.as_ref());
515 if !target.is_absolute() {
516 let base = current_dir()?;
517 target = base.join(target);
518 }
519 let canonical =
520 futures::executor::block_on(canonicalize_async(&target)).unwrap_or(target.clone());
521 let metadata = futures::executor::block_on(metadata_async(&canonical))?;
522 if !metadata.is_dir() {
523 return Err(io::Error::new(
524 ErrorKind::NotFound,
525 format!("Not a directory: {}", canonical.display()),
526 ));
527 }
528 let mut guard = current_dir_lock()
529 .write()
530 .expect("filesystem current dir lock poisoned");
531 *guard = canonical;
532 Ok(())
533 }
534 #[cfg(not(target_arch = "wasm32"))]
535 {
536 std::env::set_current_dir(path)
537 }
538}
539
540pub struct ProviderGuard {
541 previous: Arc<dyn FsProvider>,
542}
543
544impl Drop for ProviderGuard {
545 fn drop(&mut self) {
546 set_provider(self.previous.clone());
547 }
548}
549
550pub async fn read_many_async(paths: &[PathBuf]) -> io::Result<Vec<ReadManyEntry>> {
551 let resolved = paths
552 .iter()
553 .map(|path| resolve_path(path.as_path()))
554 .collect::<Vec<_>>();
555 let provider = current_provider();
556 provider.read_many(&resolved).await
557}
558
559pub async fn read_async(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
560 let resolved = resolve_path(path.as_ref());
561 let provider = current_provider();
562 provider.read(&resolved).await
563}
564
565pub async fn read_to_string_async(path: impl AsRef<Path>) -> io::Result<String> {
566 let bytes = read_async(path).await?;
567 String::from_utf8(bytes).map_err(|err| io::Error::new(ErrorKind::InvalidData, err.utf8_error()))
568}
569
570pub async fn write_async(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> io::Result<()> {
571 let resolved = resolve_path(path.as_ref());
572 let provider = current_provider();
573 provider.write(&resolved, data.as_ref()).await
574}
575
576pub async fn remove_file_async(path: impl AsRef<Path>) -> io::Result<()> {
577 let resolved = resolve_path(path.as_ref());
578 let provider = current_provider();
579 provider.remove_file(&resolved).await
580}
581
582pub async fn metadata_async(path: impl AsRef<Path>) -> io::Result<FsMetadata> {
583 let resolved = resolve_path(path.as_ref());
584 let provider = current_provider();
585 provider.metadata(&resolved).await
586}
587
588pub async fn symlink_metadata_async(path: impl AsRef<Path>) -> io::Result<FsMetadata> {
589 let resolved = resolve_path(path.as_ref());
590 let provider = current_provider();
591 provider.symlink_metadata(&resolved).await
592}
593
594pub async fn read_dir_async(path: impl AsRef<Path>) -> io::Result<Vec<DirEntry>> {
595 let resolved = resolve_path(path.as_ref());
596 let provider = current_provider();
597 provider.read_dir(&resolved).await
598}
599
600pub async fn canonicalize_async(path: impl AsRef<Path>) -> io::Result<PathBuf> {
601 let resolved = resolve_path(path.as_ref());
602 let provider = current_provider();
603 provider.canonicalize(&resolved).await
604}
605
606pub async fn create_dir_async(path: impl AsRef<Path>) -> io::Result<()> {
607 let resolved = resolve_path(path.as_ref());
608 let provider = current_provider();
609 provider.create_dir(&resolved).await
610}
611
612pub async fn create_dir_all_async(path: impl AsRef<Path>) -> io::Result<()> {
613 let resolved = resolve_path(path.as_ref());
614 let provider = current_provider();
615 provider.create_dir_all(&resolved).await
616}
617
618pub async fn remove_dir_async(path: impl AsRef<Path>) -> io::Result<()> {
619 let resolved = resolve_path(path.as_ref());
620 let provider = current_provider();
621 provider.remove_dir(&resolved).await
622}
623
624pub async fn remove_dir_all_async(path: impl AsRef<Path>) -> io::Result<()> {
625 let resolved = resolve_path(path.as_ref());
626 let provider = current_provider();
627 provider.remove_dir_all(&resolved).await
628}
629
630pub async fn rename_async(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
631 let resolved_from = resolve_path(from.as_ref());
632 let resolved_to = resolve_path(to.as_ref());
633 let provider = current_provider();
634 provider.rename(&resolved_from, &resolved_to).await
635}
636
637pub async fn set_readonly_async(path: impl AsRef<Path>, readonly: bool) -> io::Result<()> {
638 let resolved = resolve_path(path.as_ref());
639 let provider = current_provider();
640 provider.set_readonly(&resolved, readonly).await
641}
642
643pub async fn data_manifest_descriptor_async(
644 request: &DataManifestRequest,
645) -> io::Result<DataManifestDescriptor> {
646 let provider = current_provider();
647 provider.data_manifest_descriptor(request).await
648}
649
650pub async fn data_chunk_upload_targets_async(
651 request: &DataChunkUploadRequest,
652) -> io::Result<Vec<DataChunkUploadTarget>> {
653 let provider = current_provider();
654 provider.data_chunk_upload_targets(request).await
655}
656
657pub async fn data_upload_chunk_async(
658 target: &DataChunkUploadTarget,
659 data: &[u8],
660) -> io::Result<()> {
661 let provider = current_provider();
662 provider.data_upload_chunk(target, data).await
663}
664
665pub fn copy_file(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<u64> {
668 let mut reader = OpenOptions::new().read(true).open(from.as_ref())?;
669 let mut writer = OpenOptions::new()
670 .write(true)
671 .create(true)
672 .truncate(true)
673 .open(to.as_ref())?;
674 io::copy(&mut reader, &mut writer)
675}
676
677fn default_provider() -> Arc<dyn FsProvider> {
678 #[cfg(not(target_arch = "wasm32"))]
679 {
680 Arc::new(NativeFsProvider)
681 }
682 #[cfg(target_arch = "wasm32")]
683 {
684 Arc::new(PlaceholderFsProvider)
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691 use once_cell::sync::Lazy;
692 use std::io::{Read, Seek, SeekFrom, Write};
693 use std::sync::Mutex;
694 use tempfile::tempdir;
695
696 static TEST_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
697
698 struct UnsupportedProvider;
699
700 struct AsyncOpenProvider {
701 opened_async: Arc<Mutex<bool>>,
702 flushed_async: Arc<Mutex<bool>>,
703 }
704
705 struct AsyncTestHandle {
706 cursor: usize,
707 data: Vec<u8>,
708 flushed_async: Arc<Mutex<bool>>,
709 }
710
711 impl Read for AsyncTestHandle {
712 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
713 let remaining = self.data.len().saturating_sub(self.cursor);
714 let to_read = remaining.min(buf.len());
715 buf[..to_read].copy_from_slice(&self.data[self.cursor..self.cursor + to_read]);
716 self.cursor += to_read;
717 Ok(to_read)
718 }
719 }
720
721 impl Write for AsyncTestHandle {
722 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
723 let end = self.cursor + buf.len();
724 if end > self.data.len() {
725 self.data.resize(end, 0);
726 }
727 self.data[self.cursor..end].copy_from_slice(buf);
728 self.cursor = end;
729 Ok(buf.len())
730 }
731
732 fn flush(&mut self) -> io::Result<()> {
733 Ok(())
734 }
735 }
736
737 impl Seek for AsyncTestHandle {
738 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
739 let next = match pos {
740 SeekFrom::Start(offset) => offset as i64,
741 SeekFrom::End(offset) => self.data.len() as i64 + offset,
742 SeekFrom::Current(offset) => self.cursor as i64 + offset,
743 };
744 if next < 0 {
745 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
746 }
747 self.cursor = next as usize;
748 Ok(self.cursor as u64)
749 }
750 }
751
752 #[async_trait(?Send)]
753 impl FileHandle for AsyncTestHandle {
754 async fn flush_async(&mut self) -> io::Result<()> {
755 *self.flushed_async.lock().unwrap() = true;
756 Ok(())
757 }
758 }
759
760 #[async_trait(?Send)]
761 impl FsProvider for UnsupportedProvider {
762 fn open(&self, _path: &Path, _flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
763 Err(unsupported())
764 }
765
766 async fn read(&self, _path: &Path) -> io::Result<Vec<u8>> {
767 Err(unsupported())
768 }
769
770 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
771 Err(unsupported())
772 }
773
774 async fn remove_file(&self, _path: &Path) -> io::Result<()> {
775 Err(unsupported())
776 }
777
778 async fn metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
779 Err(unsupported())
780 }
781
782 async fn symlink_metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
783 Err(unsupported())
784 }
785
786 async fn read_dir(&self, _path: &Path) -> io::Result<Vec<DirEntry>> {
787 Err(unsupported())
788 }
789
790 async fn canonicalize(&self, _path: &Path) -> io::Result<PathBuf> {
791 Err(unsupported())
792 }
793
794 async fn create_dir(&self, _path: &Path) -> io::Result<()> {
795 Err(unsupported())
796 }
797
798 async fn create_dir_all(&self, _path: &Path) -> io::Result<()> {
799 Err(unsupported())
800 }
801
802 async fn remove_dir(&self, _path: &Path) -> io::Result<()> {
803 Err(unsupported())
804 }
805
806 async fn remove_dir_all(&self, _path: &Path) -> io::Result<()> {
807 Err(unsupported())
808 }
809
810 async fn rename(&self, _from: &Path, _to: &Path) -> io::Result<()> {
811 Err(unsupported())
812 }
813
814 async fn set_readonly(&self, _path: &Path, _readonly: bool) -> io::Result<()> {
815 Err(unsupported())
816 }
817
818 async fn data_manifest_descriptor(
819 &self,
820 _request: &DataManifestRequest,
821 ) -> io::Result<DataManifestDescriptor> {
822 Err(unsupported())
823 }
824
825 async fn data_chunk_upload_targets(
826 &self,
827 _request: &DataChunkUploadRequest,
828 ) -> io::Result<Vec<DataChunkUploadTarget>> {
829 Err(unsupported())
830 }
831
832 async fn data_upload_chunk(
833 &self,
834 _target: &DataChunkUploadTarget,
835 _data: &[u8],
836 ) -> io::Result<()> {
837 Err(unsupported())
838 }
839 }
840
841 #[async_trait(?Send)]
842 impl FsProvider for AsyncOpenProvider {
843 fn open(&self, _path: &Path, _flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
844 Err(unsupported())
845 }
846
847 async fn open_async(
848 &self,
849 _path: &Path,
850 _flags: &OpenFlags,
851 ) -> io::Result<Box<dyn FileHandle>> {
852 *self.opened_async.lock().unwrap() = true;
853 Ok(Box::new(AsyncTestHandle {
854 cursor: 0,
855 data: b"async contents".to_vec(),
856 flushed_async: self.flushed_async.clone(),
857 }))
858 }
859
860 async fn read(&self, _path: &Path) -> io::Result<Vec<u8>> {
861 Err(unsupported())
862 }
863
864 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
865 Err(unsupported())
866 }
867
868 async fn remove_file(&self, _path: &Path) -> io::Result<()> {
869 Err(unsupported())
870 }
871
872 async fn metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
873 Err(unsupported())
874 }
875
876 async fn symlink_metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
877 Err(unsupported())
878 }
879
880 async fn read_dir(&self, _path: &Path) -> io::Result<Vec<DirEntry>> {
881 Err(unsupported())
882 }
883
884 async fn canonicalize(&self, _path: &Path) -> io::Result<PathBuf> {
885 Err(unsupported())
886 }
887
888 async fn create_dir(&self, _path: &Path) -> io::Result<()> {
889 Err(unsupported())
890 }
891
892 async fn create_dir_all(&self, _path: &Path) -> io::Result<()> {
893 Err(unsupported())
894 }
895
896 async fn remove_dir(&self, _path: &Path) -> io::Result<()> {
897 Err(unsupported())
898 }
899
900 async fn remove_dir_all(&self, _path: &Path) -> io::Result<()> {
901 Err(unsupported())
902 }
903
904 async fn rename(&self, _from: &Path, _to: &Path) -> io::Result<()> {
905 Err(unsupported())
906 }
907
908 async fn set_readonly(&self, _path: &Path, _readonly: bool) -> io::Result<()> {
909 Err(unsupported())
910 }
911 }
912
913 fn unsupported() -> io::Error {
914 io::Error::new(ErrorKind::Unsupported, "unsupported in test provider")
915 }
916
917 #[test]
918 fn copy_file_round_trip() {
919 let _guard = TEST_LOCK.lock().unwrap();
920 let dir = tempdir().expect("tempdir");
921 let src = dir.path().join("src.bin");
922 let dst = dir.path().join("dst.bin");
923 {
924 let mut file = std::fs::File::create(&src).expect("create src");
925 file.write_all(b"hello filesystem").expect("write src");
926 }
927
928 copy_file(&src, &dst).expect("copy");
929 let mut dst_file = File::open(&dst).expect("open dst");
930 let mut contents = Vec::new();
931 dst_file
932 .read_to_end(&mut contents)
933 .expect("read destination");
934 assert_eq!(contents, b"hello filesystem");
935 }
936
937 #[test]
938 fn set_readonly_flips_metadata_flag() {
939 let _guard = TEST_LOCK.lock().unwrap();
940 let dir = tempdir().expect("tempdir");
941 let path = dir.path().join("flag.txt");
942 futures::executor::block_on(write_async(&path, b"flag")).expect("write");
943
944 futures::executor::block_on(set_readonly_async(&path, true)).expect("set readonly");
945 let meta = futures::executor::block_on(metadata_async(&path)).expect("metadata");
946 assert!(meta.is_readonly());
947
948 futures::executor::block_on(set_readonly_async(&path, false)).expect("unset readonly");
949 let meta = futures::executor::block_on(metadata_async(&path)).expect("metadata");
950 assert!(!meta.is_readonly());
951 }
952
953 #[test]
954 fn replace_provider_restores_previous() {
955 let _guard = TEST_LOCK.lock().unwrap();
956 let original = current_provider();
957 let custom: Arc<dyn FsProvider> = Arc::new(UnsupportedProvider);
958 {
959 let _guard = replace_provider(custom.clone());
960 let active = current_provider();
961 assert!(Arc::ptr_eq(&active, &custom));
962 }
963 let final_provider = current_provider();
964 assert!(Arc::ptr_eq(&final_provider, &original));
965 }
966
967 #[test]
968 fn open_async_and_flush_async_use_provider_async_paths() {
969 let _guard = TEST_LOCK.lock().unwrap();
970 let opened_async = Arc::new(Mutex::new(false));
971 let flushed_async = Arc::new(Mutex::new(false));
972 let provider = Arc::new(AsyncOpenProvider {
973 opened_async: opened_async.clone(),
974 flushed_async: flushed_async.clone(),
975 });
976 let _provider_guard = replace_provider(provider);
977
978 let mut file =
979 futures::executor::block_on(OpenOptions::new().read(true).open_async("data.txt"))
980 .expect("async open");
981 let mut contents = String::new();
982 file.read_to_string(&mut contents).expect("read contents");
983 futures::executor::block_on(file.flush_async()).expect("async flush");
984
985 assert_eq!(contents, "async contents");
986 assert!(*opened_async.lock().unwrap());
987 assert!(*flushed_async.lock().unwrap());
988 }
989
990 #[test]
991 fn with_provider_restores_even_on_panic() {
992 let _guard = TEST_LOCK.lock().unwrap();
993 let original = current_provider();
994 let custom: Arc<dyn FsProvider> = Arc::new(UnsupportedProvider);
995 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
996 with_provider_override(custom.clone(), || {
997 let active = current_provider();
998 assert!(Arc::ptr_eq(&active, &custom));
999 panic!("boom");
1000 })
1001 }));
1002 assert!(result.is_err());
1003 let final_provider = current_provider();
1004 assert!(Arc::ptr_eq(&final_provider, &original));
1005 }
1006}