1use async_trait::async_trait;
2use log::warn;
3use once_cell::sync::OnceCell;
4use std::ffi::OsString;
5use std::fmt;
6use std::io::{self, ErrorKind, Read, Seek, Write};
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, RwLock};
9use std::time::SystemTime;
10
11#[cfg(not(target_arch = "wasm32"))]
12mod native;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod remote;
15#[cfg(not(target_arch = "wasm32"))]
16pub mod sandbox;
17#[cfg(target_arch = "wasm32")]
18mod wasm;
19
20#[cfg(not(target_arch = "wasm32"))]
21pub use native::NativeFsProvider;
22#[cfg(not(target_arch = "wasm32"))]
23pub use remote::{RemoteFsConfig, RemoteFsProvider};
24#[cfg(not(target_arch = "wasm32"))]
25pub use sandbox::SandboxFsProvider;
26#[cfg(target_arch = "wasm32")]
27pub use wasm::PlaceholderFsProvider;
28
29pub mod data_contract;
30
31use data_contract::{
32 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
33};
34
35#[async_trait(?Send)]
36pub trait FileHandle: Read + Write + Seek + Send + Sync {
37 async fn flush_async(&mut self) -> io::Result<()> {
38 self.flush()
39 }
40
41 async fn sync_all_async(&mut self) -> io::Result<()> {
42 self.flush_async().await
43 }
44}
45
46#[async_trait(?Send)]
47impl FileHandle for std::fs::File {
48 async fn sync_all_async(&mut self) -> io::Result<()> {
49 std::fs::File::sync_all(self)
50 }
51}
52
53#[derive(Clone, Debug, Default)]
54pub struct OpenFlags {
55 pub read: bool,
56 pub write: bool,
57 pub append: bool,
58 pub truncate: bool,
59 pub create: bool,
60 pub create_new: bool,
61}
62
63#[derive(Clone, Debug)]
64pub struct OpenOptions {
65 flags: OpenFlags,
66}
67
68impl OpenOptions {
69 pub fn new() -> Self {
70 Self {
71 flags: OpenFlags::default(),
72 }
73 }
74
75 pub fn read(&mut self, value: bool) -> &mut Self {
76 self.flags.read = value;
77 self
78 }
79
80 pub fn write(&mut self, value: bool) -> &mut Self {
81 self.flags.write = value;
82 self
83 }
84
85 pub fn append(&mut self, value: bool) -> &mut Self {
86 self.flags.append = value;
87 self
88 }
89
90 pub fn truncate(&mut self, value: bool) -> &mut Self {
91 self.flags.truncate = value;
92 self
93 }
94
95 pub fn create(&mut self, value: bool) -> &mut Self {
96 self.flags.create = value;
97 self
98 }
99
100 pub fn create_new(&mut self, value: bool) -> &mut Self {
101 self.flags.create_new = value;
102 self
103 }
104
105 pub fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
106 let resolved = resolve_path(path.as_ref());
107 with_provider(|provider| provider.open(&resolved, &self.flags)).map(File::from_handle)
108 }
109
110 pub async fn open_async(&self, path: impl AsRef<Path>) -> io::Result<File> {
111 let resolved = resolve_path(path.as_ref());
112 let provider = current_provider();
113 provider
114 .open_async(&resolved, &self.flags)
115 .await
116 .map(File::from_handle)
117 }
118
119 pub fn flags(&self) -> &OpenFlags {
120 &self.flags
121 }
122}
123
124impl Default for OpenOptions {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130#[derive(Clone, Copy, Debug, PartialEq, Eq)]
131pub enum FsFileType {
132 Directory,
133 File,
134 Symlink,
135 Other,
136 Unknown,
137}
138
139#[derive(Clone, Debug)]
140pub struct FsMetadata {
141 file_type: FsFileType,
142 len: u64,
143 modified: Option<SystemTime>,
144 readonly: bool,
145 hash: Option<String>,
146}
147
148impl FsMetadata {
149 pub fn new(
150 file_type: FsFileType,
151 len: u64,
152 modified: Option<SystemTime>,
153 readonly: bool,
154 ) -> Self {
155 Self {
156 file_type,
157 len,
158 modified,
159 readonly,
160 hash: None,
161 }
162 }
163
164 pub fn new_with_hash(
165 file_type: FsFileType,
166 len: u64,
167 modified: Option<SystemTime>,
168 readonly: bool,
169 hash: Option<String>,
170 ) -> Self {
171 Self {
172 file_type,
173 len,
174 modified,
175 readonly,
176 hash,
177 }
178 }
179
180 pub fn file_type(&self) -> FsFileType {
181 self.file_type
182 }
183
184 pub fn is_dir(&self) -> bool {
185 matches!(self.file_type, FsFileType::Directory)
186 }
187
188 pub fn is_file(&self) -> bool {
189 matches!(self.file_type, FsFileType::File)
190 }
191
192 pub fn is_symlink(&self) -> bool {
193 matches!(self.file_type, FsFileType::Symlink)
194 }
195
196 pub fn len(&self) -> u64 {
197 self.len
198 }
199
200 pub fn hash(&self) -> Option<&str> {
201 self.hash.as_deref()
202 }
203
204 pub fn is_empty(&self) -> bool {
205 self.len == 0
206 }
207
208 pub fn modified(&self) -> Option<SystemTime> {
209 self.modified
210 }
211
212 pub fn is_readonly(&self) -> bool {
213 self.readonly
214 }
215}
216
217#[derive(Clone, Debug)]
218pub struct DirEntry {
219 path: PathBuf,
220 file_name: OsString,
221 file_type: FsFileType,
222}
223
224#[derive(Clone, Debug)]
225pub struct ReadManyEntry {
226 path: PathBuf,
227 bytes: Option<Vec<u8>>,
228 error: Option<String>,
229}
230
231impl ReadManyEntry {
232 pub fn new(path: PathBuf, bytes: Option<Vec<u8>>) -> Self {
233 Self {
234 path,
235 bytes,
236 error: None,
237 }
238 }
239
240 pub fn with_error(path: PathBuf, error: String) -> Self {
241 Self {
242 path,
243 bytes: None,
244 error: Some(error),
245 }
246 }
247
248 pub fn path(&self) -> &Path {
249 &self.path
250 }
251
252 pub fn bytes(&self) -> Option<&[u8]> {
253 self.bytes.as_deref()
254 }
255
256 pub fn into_bytes(self) -> Option<Vec<u8>> {
257 self.bytes
258 }
259
260 pub fn error(&self) -> Option<&str> {
261 self.error.as_deref()
262 }
263}
264
265impl DirEntry {
266 pub fn new(path: PathBuf, file_name: OsString, file_type: FsFileType) -> Self {
267 Self {
268 path,
269 file_name,
270 file_type,
271 }
272 }
273
274 pub fn path(&self) -> &Path {
275 &self.path
276 }
277
278 pub fn file_name(&self) -> &OsString {
279 &self.file_name
280 }
281
282 pub fn file_type(&self) -> FsFileType {
283 self.file_type
284 }
285
286 pub fn is_dir(&self) -> bool {
287 matches!(self.file_type, FsFileType::Directory)
288 }
289}
290
291#[async_trait(?Send)]
292pub trait FsProvider: Send + Sync + 'static {
293 fn open(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>>;
294 async fn open_async(&self, path: &Path, flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
295 self.open(path, flags)
296 }
297 async fn read(&self, path: &Path) -> io::Result<Vec<u8>>;
298 async fn write(&self, path: &Path, data: &[u8]) -> io::Result<()>;
299 async fn remove_file(&self, path: &Path) -> io::Result<()>;
300 async fn metadata(&self, path: &Path) -> io::Result<FsMetadata>;
301 async fn symlink_metadata(&self, path: &Path) -> io::Result<FsMetadata>;
302 async fn read_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>>;
303 async fn canonicalize(&self, path: &Path) -> io::Result<PathBuf>;
304 async fn create_dir(&self, path: &Path) -> io::Result<()>;
305 async fn create_dir_all(&self, path: &Path) -> io::Result<()>;
306 async fn remove_dir(&self, path: &Path) -> io::Result<()>;
307 async fn remove_dir_all(&self, path: &Path) -> io::Result<()>;
308 async fn rename(&self, from: &Path, to: &Path) -> io::Result<()>;
309 async fn set_readonly(&self, path: &Path, readonly: bool) -> io::Result<()>;
310
311 async fn read_many(&self, paths: &[PathBuf]) -> io::Result<Vec<ReadManyEntry>> {
312 let mut entries = Vec::with_capacity(paths.len());
313 for path in paths {
314 let entry = match self.read(path).await {
315 Ok(payload) => ReadManyEntry::new(path.clone(), Some(payload)),
316 Err(error) => {
317 warn!(
318 "fs.read_many.miss path={} kind={:?} error={}",
319 path.to_string_lossy(),
320 error.kind(),
321 error
322 );
323 ReadManyEntry::with_error(
324 path.clone(),
325 format!("kind={:?}; error={}", error.kind(), error),
326 )
327 }
328 };
329 entries.push(entry);
330 }
331 Ok(entries)
332 }
333
334 async fn data_manifest_descriptor(
335 &self,
336 _request: &DataManifestRequest,
337 ) -> io::Result<DataManifestDescriptor> {
338 Err(io::Error::new(
339 ErrorKind::Unsupported,
340 "data manifest descriptor is unsupported by this provider",
341 ))
342 }
343
344 async fn data_chunk_upload_targets(
345 &self,
346 _request: &DataChunkUploadRequest,
347 ) -> io::Result<Vec<DataChunkUploadTarget>> {
348 Err(io::Error::new(
349 ErrorKind::Unsupported,
350 "data chunk upload targets are unsupported by this provider",
351 ))
352 }
353
354 async fn data_upload_chunk(
355 &self,
356 _target: &DataChunkUploadTarget,
357 _data: &[u8],
358 ) -> io::Result<()> {
359 Err(io::Error::new(
360 ErrorKind::Unsupported,
361 "data chunk upload is unsupported by this provider",
362 ))
363 }
364}
365
366pub struct File {
367 inner: Box<dyn FileHandle>,
368}
369
370impl File {
371 fn from_handle(handle: Box<dyn FileHandle>) -> Self {
372 Self { inner: handle }
373 }
374
375 pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
376 let mut opts = OpenOptions::new();
377 opts.read(true);
378 opts.open(path)
379 }
380
381 pub async fn open_async(path: impl AsRef<Path>) -> io::Result<Self> {
382 let mut opts = OpenOptions::new();
383 opts.read(true);
384 opts.open_async(path).await
385 }
386
387 pub fn create(path: impl AsRef<Path>) -> io::Result<Self> {
388 let mut opts = OpenOptions::new();
389 opts.write(true).create(true).truncate(true);
390 opts.open(path)
391 }
392
393 pub async fn create_async(path: impl AsRef<Path>) -> io::Result<Self> {
394 let mut opts = OpenOptions::new();
395 opts.write(true).create(true).truncate(true);
396 opts.open_async(path).await
397 }
398
399 pub async fn flush_async(&mut self) -> io::Result<()> {
400 self.inner.flush_async().await
401 }
402
403 pub async fn sync_all_async(&mut self) -> io::Result<()> {
404 self.inner.sync_all_async().await
405 }
406}
407
408impl fmt::Debug for File {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("File").finish_non_exhaustive()
411 }
412}
413
414impl Read for File {
415 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
416 self.inner.read(buf)
417 }
418}
419
420impl Write for File {
421 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
422 self.inner.write(buf)
423 }
424
425 fn flush(&mut self) -> io::Result<()> {
426 self.inner.flush()
427 }
428}
429
430impl Seek for File {
431 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
432 self.inner.seek(pos)
433 }
434}
435
436static PROVIDER: OnceCell<RwLock<Arc<dyn FsProvider>>> = OnceCell::new();
437#[cfg(target_arch = "wasm32")]
438static CURRENT_DIR: OnceCell<RwLock<PathBuf>> = OnceCell::new();
439
440fn provider_lock() -> &'static RwLock<Arc<dyn FsProvider>> {
441 PROVIDER.get_or_init(|| RwLock::new(default_provider()))
442}
443
444#[cfg(target_arch = "wasm32")]
445fn current_dir_lock() -> &'static RwLock<PathBuf> {
446 CURRENT_DIR.get_or_init(|| RwLock::new(PathBuf::from("/")))
447}
448
449fn with_provider<T>(f: impl FnOnce(&dyn FsProvider) -> T) -> T {
450 let guard = provider_lock()
451 .read()
452 .expect("filesystem provider lock poisoned");
453 f(&**guard)
454}
455
456fn resolve_path(path: &Path) -> PathBuf {
457 #[cfg(target_arch = "wasm32")]
458 {
459 if path.is_absolute() {
460 return path.to_path_buf();
461 }
462 if let Ok(base) = current_dir() {
463 return base.join(path);
464 }
465 PathBuf::from("/").join(path)
466 }
467 #[cfg(not(target_arch = "wasm32"))]
468 {
469 path.to_path_buf()
470 }
471}
472
473pub fn set_provider(provider: Arc<dyn FsProvider>) {
474 let mut guard = provider_lock()
475 .write()
476 .expect("filesystem provider lock poisoned");
477 *guard = provider;
478}
479
480pub fn replace_provider(provider: Arc<dyn FsProvider>) -> ProviderGuard {
484 let mut guard = provider_lock()
485 .write()
486 .expect("filesystem provider lock poisoned");
487 let previous = guard.clone();
488 *guard = provider;
489 ProviderGuard { previous }
490}
491
492pub fn with_provider_override<R>(provider: Arc<dyn FsProvider>, f: impl FnOnce() -> R) -> R {
495 let guard = replace_provider(provider);
496 let result = f();
497 drop(guard);
498 result
499}
500
501pub fn current_provider() -> Arc<dyn FsProvider> {
503 provider_lock()
504 .read()
505 .expect("filesystem provider lock poisoned")
506 .clone()
507}
508
509pub fn current_dir() -> io::Result<PathBuf> {
510 #[cfg(target_arch = "wasm32")]
511 {
512 return Ok(current_dir_lock()
513 .read()
514 .expect("filesystem current dir lock poisoned")
515 .clone());
516 }
517 #[cfg(not(target_arch = "wasm32"))]
518 {
519 std::env::current_dir()
520 }
521}
522
523pub fn set_current_dir(path: impl AsRef<Path>) -> io::Result<()> {
524 #[cfg(target_arch = "wasm32")]
525 {
526 let mut target = PathBuf::from(path.as_ref());
527 if !target.is_absolute() {
528 let base = current_dir()?;
529 target = base.join(target);
530 }
531 let canonical =
532 futures::executor::block_on(canonicalize_async(&target)).unwrap_or(target.clone());
533 let metadata = futures::executor::block_on(metadata_async(&canonical))?;
534 if !metadata.is_dir() {
535 return Err(io::Error::new(
536 ErrorKind::NotFound,
537 format!("Not a directory: {}", canonical.display()),
538 ));
539 }
540 let mut guard = current_dir_lock()
541 .write()
542 .expect("filesystem current dir lock poisoned");
543 *guard = canonical;
544 Ok(())
545 }
546 #[cfg(not(target_arch = "wasm32"))]
547 {
548 std::env::set_current_dir(path)
549 }
550}
551
552pub struct ProviderGuard {
553 previous: Arc<dyn FsProvider>,
554}
555
556impl Drop for ProviderGuard {
557 fn drop(&mut self) {
558 set_provider(self.previous.clone());
559 }
560}
561
562pub async fn read_many_async(paths: &[PathBuf]) -> io::Result<Vec<ReadManyEntry>> {
563 let resolved = paths
564 .iter()
565 .map(|path| resolve_path(path.as_path()))
566 .collect::<Vec<_>>();
567 let provider = current_provider();
568 provider.read_many(&resolved).await
569}
570
571pub async fn read_async(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
572 let resolved = resolve_path(path.as_ref());
573 let provider = current_provider();
574 provider.read(&resolved).await
575}
576
577pub async fn read_to_string_async(path: impl AsRef<Path>) -> io::Result<String> {
578 let bytes = read_async(path).await?;
579 String::from_utf8(bytes).map_err(|err| io::Error::new(ErrorKind::InvalidData, err.utf8_error()))
580}
581
582pub async fn write_async(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> io::Result<()> {
583 let resolved = resolve_path(path.as_ref());
584 let provider = current_provider();
585 provider.write(&resolved, data.as_ref()).await
586}
587
588pub async fn remove_file_async(path: impl AsRef<Path>) -> io::Result<()> {
589 let resolved = resolve_path(path.as_ref());
590 let provider = current_provider();
591 provider.remove_file(&resolved).await
592}
593
594pub async fn metadata_async(path: impl AsRef<Path>) -> io::Result<FsMetadata> {
595 let resolved = resolve_path(path.as_ref());
596 let provider = current_provider();
597 provider.metadata(&resolved).await
598}
599
600pub async fn symlink_metadata_async(path: impl AsRef<Path>) -> io::Result<FsMetadata> {
601 let resolved = resolve_path(path.as_ref());
602 let provider = current_provider();
603 provider.symlink_metadata(&resolved).await
604}
605
606pub async fn read_dir_async(path: impl AsRef<Path>) -> io::Result<Vec<DirEntry>> {
607 let resolved = resolve_path(path.as_ref());
608 let provider = current_provider();
609 provider.read_dir(&resolved).await
610}
611
612pub async fn canonicalize_async(path: impl AsRef<Path>) -> io::Result<PathBuf> {
613 let resolved = resolve_path(path.as_ref());
614 let provider = current_provider();
615 provider.canonicalize(&resolved).await
616}
617
618pub async fn create_dir_async(path: impl AsRef<Path>) -> io::Result<()> {
619 let resolved = resolve_path(path.as_ref());
620 let provider = current_provider();
621 provider.create_dir(&resolved).await
622}
623
624pub async fn create_dir_all_async(path: impl AsRef<Path>) -> io::Result<()> {
625 let resolved = resolve_path(path.as_ref());
626 let provider = current_provider();
627 provider.create_dir_all(&resolved).await
628}
629
630pub async fn remove_dir_async(path: impl AsRef<Path>) -> io::Result<()> {
631 let resolved = resolve_path(path.as_ref());
632 let provider = current_provider();
633 provider.remove_dir(&resolved).await
634}
635
636pub async fn remove_dir_all_async(path: impl AsRef<Path>) -> io::Result<()> {
637 let resolved = resolve_path(path.as_ref());
638 let provider = current_provider();
639 provider.remove_dir_all(&resolved).await
640}
641
642pub async fn rename_async(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<()> {
643 let resolved_from = resolve_path(from.as_ref());
644 let resolved_to = resolve_path(to.as_ref());
645 let provider = current_provider();
646 provider.rename(&resolved_from, &resolved_to).await
647}
648
649pub async fn set_readonly_async(path: impl AsRef<Path>, readonly: bool) -> io::Result<()> {
650 let resolved = resolve_path(path.as_ref());
651 let provider = current_provider();
652 provider.set_readonly(&resolved, readonly).await
653}
654
655pub async fn data_manifest_descriptor_async(
656 request: &DataManifestRequest,
657) -> io::Result<DataManifestDescriptor> {
658 let provider = current_provider();
659 provider.data_manifest_descriptor(request).await
660}
661
662pub async fn data_chunk_upload_targets_async(
663 request: &DataChunkUploadRequest,
664) -> io::Result<Vec<DataChunkUploadTarget>> {
665 let provider = current_provider();
666 provider.data_chunk_upload_targets(request).await
667}
668
669pub async fn data_upload_chunk_async(
670 target: &DataChunkUploadTarget,
671 data: &[u8],
672) -> io::Result<()> {
673 let provider = current_provider();
674 provider.data_upload_chunk(target, data).await
675}
676
677pub fn copy_file(from: impl AsRef<Path>, to: impl AsRef<Path>) -> io::Result<u64> {
680 let mut reader = OpenOptions::new().read(true).open(from.as_ref())?;
681 let mut writer = OpenOptions::new()
682 .write(true)
683 .create(true)
684 .truncate(true)
685 .open(to.as_ref())?;
686 io::copy(&mut reader, &mut writer)
687}
688
689fn default_provider() -> Arc<dyn FsProvider> {
690 #[cfg(not(target_arch = "wasm32"))]
691 {
692 Arc::new(NativeFsProvider)
693 }
694 #[cfg(target_arch = "wasm32")]
695 {
696 Arc::new(PlaceholderFsProvider)
697 }
698}
699
700#[cfg(test)]
701mod tests {
702 use super::*;
703 use once_cell::sync::Lazy;
704 use std::io::{Read, Seek, SeekFrom, Write};
705 use std::sync::Mutex;
706 use tempfile::tempdir;
707
708 static TEST_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
709
710 struct UnsupportedProvider;
711
712 struct AsyncOpenProvider {
713 opened_async: Arc<Mutex<bool>>,
714 flushed_async: Arc<Mutex<bool>>,
715 }
716
717 struct AsyncTestHandle {
718 cursor: usize,
719 data: Vec<u8>,
720 flushed_async: Arc<Mutex<bool>>,
721 }
722
723 impl Read for AsyncTestHandle {
724 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
725 let remaining = self.data.len().saturating_sub(self.cursor);
726 let to_read = remaining.min(buf.len());
727 buf[..to_read].copy_from_slice(&self.data[self.cursor..self.cursor + to_read]);
728 self.cursor += to_read;
729 Ok(to_read)
730 }
731 }
732
733 impl Write for AsyncTestHandle {
734 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
735 let end = self.cursor + buf.len();
736 if end > self.data.len() {
737 self.data.resize(end, 0);
738 }
739 self.data[self.cursor..end].copy_from_slice(buf);
740 self.cursor = end;
741 Ok(buf.len())
742 }
743
744 fn flush(&mut self) -> io::Result<()> {
745 Ok(())
746 }
747 }
748
749 impl Seek for AsyncTestHandle {
750 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
751 let next = match pos {
752 SeekFrom::Start(offset) => offset as i64,
753 SeekFrom::End(offset) => self.data.len() as i64 + offset,
754 SeekFrom::Current(offset) => self.cursor as i64 + offset,
755 };
756 if next < 0 {
757 return Err(io::Error::new(ErrorKind::InvalidInput, "seek before start"));
758 }
759 self.cursor = next as usize;
760 Ok(self.cursor as u64)
761 }
762 }
763
764 #[async_trait(?Send)]
765 impl FileHandle for AsyncTestHandle {
766 async fn flush_async(&mut self) -> io::Result<()> {
767 *self.flushed_async.lock().unwrap() = true;
768 Ok(())
769 }
770 }
771
772 #[async_trait(?Send)]
773 impl FsProvider for UnsupportedProvider {
774 fn open(&self, _path: &Path, _flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
775 Err(unsupported())
776 }
777
778 async fn read(&self, _path: &Path) -> io::Result<Vec<u8>> {
779 Err(unsupported())
780 }
781
782 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
783 Err(unsupported())
784 }
785
786 async fn remove_file(&self, _path: &Path) -> io::Result<()> {
787 Err(unsupported())
788 }
789
790 async fn metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
791 Err(unsupported())
792 }
793
794 async fn symlink_metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
795 Err(unsupported())
796 }
797
798 async fn read_dir(&self, _path: &Path) -> io::Result<Vec<DirEntry>> {
799 Err(unsupported())
800 }
801
802 async fn canonicalize(&self, _path: &Path) -> io::Result<PathBuf> {
803 Err(unsupported())
804 }
805
806 async fn create_dir(&self, _path: &Path) -> io::Result<()> {
807 Err(unsupported())
808 }
809
810 async fn create_dir_all(&self, _path: &Path) -> io::Result<()> {
811 Err(unsupported())
812 }
813
814 async fn remove_dir(&self, _path: &Path) -> io::Result<()> {
815 Err(unsupported())
816 }
817
818 async fn remove_dir_all(&self, _path: &Path) -> io::Result<()> {
819 Err(unsupported())
820 }
821
822 async fn rename(&self, _from: &Path, _to: &Path) -> io::Result<()> {
823 Err(unsupported())
824 }
825
826 async fn set_readonly(&self, _path: &Path, _readonly: bool) -> io::Result<()> {
827 Err(unsupported())
828 }
829
830 async fn data_manifest_descriptor(
831 &self,
832 _request: &DataManifestRequest,
833 ) -> io::Result<DataManifestDescriptor> {
834 Err(unsupported())
835 }
836
837 async fn data_chunk_upload_targets(
838 &self,
839 _request: &DataChunkUploadRequest,
840 ) -> io::Result<Vec<DataChunkUploadTarget>> {
841 Err(unsupported())
842 }
843
844 async fn data_upload_chunk(
845 &self,
846 _target: &DataChunkUploadTarget,
847 _data: &[u8],
848 ) -> io::Result<()> {
849 Err(unsupported())
850 }
851 }
852
853 #[async_trait(?Send)]
854 impl FsProvider for AsyncOpenProvider {
855 fn open(&self, _path: &Path, _flags: &OpenFlags) -> io::Result<Box<dyn FileHandle>> {
856 Err(unsupported())
857 }
858
859 async fn open_async(
860 &self,
861 _path: &Path,
862 _flags: &OpenFlags,
863 ) -> io::Result<Box<dyn FileHandle>> {
864 *self.opened_async.lock().unwrap() = true;
865 Ok(Box::new(AsyncTestHandle {
866 cursor: 0,
867 data: b"async contents".to_vec(),
868 flushed_async: self.flushed_async.clone(),
869 }))
870 }
871
872 async fn read(&self, _path: &Path) -> io::Result<Vec<u8>> {
873 Err(unsupported())
874 }
875
876 async fn write(&self, _path: &Path, _data: &[u8]) -> io::Result<()> {
877 Err(unsupported())
878 }
879
880 async fn remove_file(&self, _path: &Path) -> io::Result<()> {
881 Err(unsupported())
882 }
883
884 async fn metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
885 Err(unsupported())
886 }
887
888 async fn symlink_metadata(&self, _path: &Path) -> io::Result<FsMetadata> {
889 Err(unsupported())
890 }
891
892 async fn read_dir(&self, _path: &Path) -> io::Result<Vec<DirEntry>> {
893 Err(unsupported())
894 }
895
896 async fn canonicalize(&self, _path: &Path) -> io::Result<PathBuf> {
897 Err(unsupported())
898 }
899
900 async fn create_dir(&self, _path: &Path) -> io::Result<()> {
901 Err(unsupported())
902 }
903
904 async fn create_dir_all(&self, _path: &Path) -> io::Result<()> {
905 Err(unsupported())
906 }
907
908 async fn remove_dir(&self, _path: &Path) -> io::Result<()> {
909 Err(unsupported())
910 }
911
912 async fn remove_dir_all(&self, _path: &Path) -> io::Result<()> {
913 Err(unsupported())
914 }
915
916 async fn rename(&self, _from: &Path, _to: &Path) -> io::Result<()> {
917 Err(unsupported())
918 }
919
920 async fn set_readonly(&self, _path: &Path, _readonly: bool) -> io::Result<()> {
921 Err(unsupported())
922 }
923 }
924
925 fn unsupported() -> io::Error {
926 io::Error::new(ErrorKind::Unsupported, "unsupported in test provider")
927 }
928
929 #[test]
930 fn copy_file_round_trip() {
931 let _guard = TEST_LOCK.lock().unwrap();
932 let dir = tempdir().expect("tempdir");
933 let src = dir.path().join("src.bin");
934 let dst = dir.path().join("dst.bin");
935 {
936 let mut file = std::fs::File::create(&src).expect("create src");
937 file.write_all(b"hello filesystem").expect("write src");
938 }
939
940 copy_file(&src, &dst).expect("copy");
941 let mut dst_file = File::open(&dst).expect("open dst");
942 let mut contents = Vec::new();
943 dst_file
944 .read_to_end(&mut contents)
945 .expect("read destination");
946 assert_eq!(contents, b"hello filesystem");
947 }
948
949 #[test]
950 fn set_readonly_flips_metadata_flag() {
951 let _guard = TEST_LOCK.lock().unwrap();
952 let dir = tempdir().expect("tempdir");
953 let path = dir.path().join("flag.txt");
954 futures::executor::block_on(write_async(&path, b"flag")).expect("write");
955
956 futures::executor::block_on(set_readonly_async(&path, true)).expect("set readonly");
957 let meta = futures::executor::block_on(metadata_async(&path)).expect("metadata");
958 assert!(meta.is_readonly());
959
960 futures::executor::block_on(set_readonly_async(&path, false)).expect("unset readonly");
961 let meta = futures::executor::block_on(metadata_async(&path)).expect("metadata");
962 assert!(!meta.is_readonly());
963 }
964
965 #[test]
966 fn replace_provider_restores_previous() {
967 let _guard = TEST_LOCK.lock().unwrap();
968 let original = current_provider();
969 let custom: Arc<dyn FsProvider> = Arc::new(UnsupportedProvider);
970 {
971 let _guard = replace_provider(custom.clone());
972 let active = current_provider();
973 assert!(Arc::ptr_eq(&active, &custom));
974 }
975 let final_provider = current_provider();
976 assert!(Arc::ptr_eq(&final_provider, &original));
977 }
978
979 #[test]
980 fn open_async_and_flush_async_use_provider_async_paths() {
981 let _guard = TEST_LOCK.lock().unwrap();
982 let opened_async = Arc::new(Mutex::new(false));
983 let flushed_async = Arc::new(Mutex::new(false));
984 let provider = Arc::new(AsyncOpenProvider {
985 opened_async: opened_async.clone(),
986 flushed_async: flushed_async.clone(),
987 });
988 let _provider_guard = replace_provider(provider);
989
990 let mut file =
991 futures::executor::block_on(OpenOptions::new().read(true).open_async("data.txt"))
992 .expect("async open");
993 let mut contents = String::new();
994 file.read_to_string(&mut contents).expect("read contents");
995 futures::executor::block_on(file.flush_async()).expect("async flush");
996
997 assert_eq!(contents, "async contents");
998 assert!(*opened_async.lock().unwrap());
999 assert!(*flushed_async.lock().unwrap());
1000 }
1001
1002 #[test]
1003 fn with_provider_restores_even_on_panic() {
1004 let _guard = TEST_LOCK.lock().unwrap();
1005 let original = current_provider();
1006 let custom: Arc<dyn FsProvider> = Arc::new(UnsupportedProvider);
1007 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1008 with_provider_override(custom.clone(), || {
1009 let active = current_provider();
1010 assert!(Arc::ptr_eq(&active, &custom));
1011 panic!("boom");
1012 })
1013 }));
1014 assert!(result.is_err());
1015 let final_provider = current_provider();
1016 assert!(Arc::ptr_eq(&final_provider, &original));
1017 }
1018}