1use crate::error::*;
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22use std::time::SystemTime;
23
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use opendal::raw::normalize_root;
27use opendal::Operator;
28use snafu::ResultExt;
29use tokio_util::compat::FuturesAsyncWriteCompatExt;
30use url::Url;
31
32use super::Storage;
33
34#[derive(Clone, Debug)]
35pub struct FileIO {
36 storage: Arc<Storage>,
37}
38
39impl FileIO {
40 pub fn from_url(path: &str) -> crate::Result<FileIOBuilder> {
44 let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
45 message: format!("Invalid URL: {path}"),
46 })?;
47
48 Ok(FileIOBuilder::new(url.scheme()))
49 }
50
51 pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
58 let path = path.as_ref();
59 let url = if looks_like_windows_drive_path(path) {
60 Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
61 message: format!("Input {path} is neither a valid url nor path"),
62 })?
63 } else {
64 Url::parse(path)
65 .map_err(|_| Error::ConfigInvalid {
66 message: format!("Invalid URL: {path}"),
67 })
68 .or_else(|_| {
69 Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
70 message: format!("Input {path} is neither a valid url nor path"),
71 })
72 })?
73 };
74 Ok(FileIOBuilder::new(url.scheme()))
75 }
76
77 pub fn new_input(&self, path: &str) -> crate::Result<InputFile> {
81 let (op, relative_path) = self.storage.create(path)?;
82 let path = path.to_string();
83 let relative_path_pos = path.len() - relative_path.len();
84 Ok(InputFile {
85 op,
86 path,
87 relative_path_pos,
88 })
89 }
90
91 pub fn new_output(&self, path: &str) -> Result<OutputFile> {
95 let (op, relative_path) = self.storage.create(path)?;
96 let path = path.to_string();
97 let relative_path_pos = path.len() - relative_path.len();
98 Ok(OutputFile {
99 op,
100 path,
101 relative_path_pos,
102 })
103 }
104
105 pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
109 let (op, relative_path) = self.storage.create(path)?;
110 let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu {
111 message: format!("Failed to get file status for '{path}'"),
112 })?;
113
114 Ok(FileStatus {
115 size: meta.content_length(),
116 is_dir: meta.is_dir(),
117 last_modified: meta
118 .last_modified()
119 .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
120 path: path.to_string(),
121 })
122 }
123
124 pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
130 let (op, relative_path) = self.storage.create(path)?;
131 let base_path = &path[..path.len() - relative_path.len()];
132 let list_path = normalize_root(relative_path);
135
136 let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu {
137 message: format!("Failed to list files in '{path}'"),
138 })?;
139
140 let mut statuses = Vec::new();
141 let list_path_normalized = list_path.trim_start_matches('/');
142 for entry in entries {
143 let entry_path = entry.path();
144 if entry_path.trim_start_matches('/') == list_path_normalized {
145 continue;
146 }
147 let meta = entry.metadata();
148 statuses.push(FileStatus {
149 size: meta.content_length(),
150 is_dir: meta.is_dir(),
151 path: format!("{base_path}{entry_path}"),
152 last_modified: meta
153 .last_modified()
154 .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
155 });
156 }
157
158 Ok(statuses)
159 }
160
161 pub async fn exists(&self, path: &str) -> Result<bool> {
165 let (op, relative_path) = self.storage.create(path)?;
166
167 op.exists(relative_path).await.context(IoUnexpectedSnafu {
168 message: format!("Failed to check existence of '{path}'"),
169 })
170 }
171
172 pub async fn delete_file(&self, path: &str) -> Result<()> {
176 let (op, relative_path) = self.storage.create(path)?;
177
178 op.delete(relative_path).await.context(IoUnexpectedSnafu {
179 message: format!("Failed to delete file '{path}'"),
180 })?;
181
182 Ok(())
183 }
184
185 pub async fn delete_dir(&self, path: &str) -> Result<()> {
189 let (op, relative_path) = self.storage.create(path)?;
190
191 op.remove_all(relative_path)
192 .await
193 .context(IoUnexpectedSnafu {
194 message: format!("Failed to delete directory '{path}'"),
195 })?;
196
197 Ok(())
198 }
199
200 pub async fn mkdirs(&self, path: &str) -> Result<()> {
206 let (op, relative_path) = self.storage.create(path)?;
207 let dir_path = normalize_root(relative_path);
209 op.create_dir(&dir_path).await.context(IoUnexpectedSnafu {
210 message: format!("Failed to create directory '{path}'"),
211 })?;
212
213 Ok(())
214 }
215
216 pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
220 let (op_src, relative_path_src) = self.storage.create(src)?;
221 let (_, relative_path_dst) = self.storage.create(dst)?;
222
223 op_src
224 .rename(relative_path_src, relative_path_dst)
225 .await
226 .context(IoUnexpectedSnafu {
227 message: format!("Failed to rename '{src}' to '{dst}'"),
228 })?;
229
230 Ok(())
231 }
232}
233
/// Returns `true` when `path` begins like a Windows drive path: an ASCII
/// letter, a colon, then a `\` or `/` separator (e.g. `C:\data`, `d:/x`).
fn looks_like_windows_drive_path(path: &str) -> bool {
    matches!(
        path.as_bytes(),
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic()
    )
}
241
242#[derive(Debug)]
243pub struct FileIOBuilder {
244 scheme_str: Option<String>,
245 props: HashMap<String, String>,
246}
247
248impl FileIOBuilder {
249 pub fn new(scheme_str: impl ToString) -> Self {
250 Self {
251 scheme_str: Some(scheme_str.to_string()),
252 props: HashMap::default(),
253 }
254 }
255
256 pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
257 (self.scheme_str.unwrap_or_default(), self.props)
258 }
259
260 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
261 self.props.insert(key.to_string(), value.to_string());
262 self
263 }
264
265 pub fn with_props(
266 mut self,
267 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
268 ) -> Self {
269 self.props
270 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
271 self
272 }
273
274 pub fn build(self) -> crate::Result<FileIO> {
275 let storage = Storage::build(self)?;
276 Ok(FileIO {
277 storage: Arc::new(storage),
278 })
279 }
280}
281
282#[async_trait::async_trait]
283pub trait FileRead: Send + Sync + Unpin + 'static {
284 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
285}
286
287#[async_trait::async_trait]
288impl FileRead for opendal::Reader {
289 async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
290 Ok(opendal::Reader::read(self, range).await?.to_bytes())
291 }
292}
293
294#[async_trait::async_trait]
295pub trait FileWrite: Send + Unpin + 'static {
296 async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
297
298 async fn close(&mut self) -> crate::Result<()>;
299}
300
301#[async_trait::async_trait]
302impl FileWrite for opendal::Writer {
303 async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
304 Ok(opendal::Writer::write(self, bs).await?)
305 }
306
307 async fn close(&mut self) -> crate::Result<()> {
308 opendal::Writer::close(self).await?;
309 Ok(())
310 }
311}
312
313pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {}
315
316impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWrite for T {}
317
318#[derive(Clone, Debug)]
319pub struct FileStatus {
320 pub size: u64,
321 pub is_dir: bool,
322 pub path: String,
323 pub last_modified: Option<DateTime<Utc>>,
324}
325
326#[derive(Debug)]
327pub struct InputFile {
328 op: Operator,
329 path: String,
330 relative_path_pos: usize,
331}
332
333impl InputFile {
334 pub fn location(&self) -> &str {
335 &self.path
336 }
337
338 pub async fn exists(&self) -> crate::Result<bool> {
339 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
340 }
341
342 pub async fn metadata(&self) -> crate::Result<FileStatus> {
343 let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
344
345 Ok(FileStatus {
346 size: meta.content_length(),
347 is_dir: meta.is_dir(),
348 path: self.path.clone(),
349 last_modified: meta
350 .last_modified()
351 .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
352 })
353 }
354
355 pub async fn read(&self) -> crate::Result<Bytes> {
356 Ok(self
357 .op
358 .read(&self.path[self.relative_path_pos..])
359 .await?
360 .to_bytes())
361 }
362
363 pub async fn reader(&self) -> crate::Result<impl FileRead> {
364 Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
365 }
366}
367
368#[derive(Debug, Clone)]
369pub struct OutputFile {
370 op: Operator,
371 path: String,
372 relative_path_pos: usize,
373}
374
375impl OutputFile {
376 pub fn location(&self) -> &str {
377 &self.path
378 }
379
380 pub async fn exists(&self) -> crate::Result<bool> {
381 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
382 }
383
384 pub fn to_input_file(self) -> InputFile {
385 InputFile {
386 op: self.op,
387 path: self.path,
388 relative_path_pos: self.relative_path_pos,
389 }
390 }
391
392 pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
393 let mut writer = self.writer().await?;
394 writer.write(bs).await?;
395 writer.close().await
396 }
397
398 pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
399 Ok(Box::new(self.opendal_writer().await?))
400 }
401
402 pub(crate) async fn async_writer(&self) -> crate::Result<Box<dyn AsyncFileWrite>> {
404 Ok(Box::new(
405 self.opendal_writer()
406 .await?
407 .into_futures_async_write()
408 .compat_write(),
409 ))
410 }
411
412 async fn opendal_writer(&self) -> crate::Result<opendal::Writer> {
413 Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
414 }
415}
416
#[cfg(test)]
mod file_action_test {
    use std::collections::BTreeSet;
    use tempfile::tempdir;

    use super::*;
    use bytes::Bytes;

    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Formats a local filesystem path as a `file:` location with forward slashes.
    fn local_file_path(path: &std::path::Path) -> String {
        let normalized = path.to_string_lossy().replace('\\', "/");
        if normalized.starts_with('/') {
            format!("file:{normalized}")
        } else {
            format!("file:/{normalized}")
        }
    }

    /// Writes `"hello world"` (11 bytes) to `path` through the streaming writer API.
    async fn write_hello_world(file_io: &FileIO, path: &str) {
        let output = file_io.new_output(path).unwrap();
        let mut writer = output.writer().await.unwrap();
        writer.write(Bytes::from("hello world")).await.unwrap();
        writer.close().await.unwrap();
    }

    async fn common_test_get_status(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let status = file_io.get_status(path).await.unwrap();
        assert_eq!(status.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_exists(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let exists = file_io.exists(path).await.unwrap();
        assert!(exists);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_delete_file(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        file_io.delete_file(path).await.unwrap();

        let exists = file_io.exists(path).await.unwrap();
        assert!(!exists);
    }

    async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) {
        file_io.mkdirs(dir_path).await.unwrap();

        let exists = file_io.exists(dir_path).await.unwrap();
        assert!(exists);

        // Best-effort cleanup through `FileIO`, so the path is resolved exactly as it
        // was for `mkdirs` (stripping `file:/` by hand yields a CWD-relative path).
        let _ = file_io.delete_dir(dir_path).await;
    }

    async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) {
        write_hello_world(file_io, src).await;

        file_io.rename(src, dst).await.unwrap();

        let exists_old = file_io.exists(src).await.unwrap();
        let exists_new = file_io.exists(dst).await.unwrap();
        assert!(!exists_old);
        assert!(exists_new);

        file_io.delete_file(dst).await.unwrap();
    }

    async fn common_test_list_status_paths(file_io: &FileIO, dir_path: &str) {
        // Remove leftovers from an earlier aborted run; a missing directory is fine.
        let _ = file_io.delete_dir(dir_path).await;

        file_io.mkdirs(dir_path).await.unwrap();

        let file_a = format!("{dir_path}a.txt");
        let file_b = format!("{dir_path}b.txt");
        for file in [&file_a, &file_b] {
            file_io
                .new_output(file)
                .unwrap()
                .write(Bytes::from("test data"))
                .await
                .unwrap();
        }

        let statuses = file_io.list_status(dir_path).await.unwrap();
        assert_eq!(statuses.len(), 2);

        let expected_paths: BTreeSet<String> =
            [file_a.clone(), file_b.clone()].into_iter().collect();
        let actual_paths: BTreeSet<String> =
            statuses.iter().map(|status| status.path.clone()).collect();
        assert_eq!(
            actual_paths, expected_paths,
            "list_status should return exact entry paths"
        );

        file_io.delete_dir(dir_path).await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_file_memory() {
        let file_io = setup_memory_file_io();
        common_test_delete_file(&file_io, "memory:/test_file_delete_mem").await;
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_fs() {
        let file_io = setup_fs_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_memory() {
        let file_io = setup_memory_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_memory_operator_reuse_across_file_io_calls() {
        let file_io = setup_memory_file_io();
        let path = "memory:/tmp/reuse_case";
        let dir = "memory:/tmp/";

        file_io
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        assert!(file_io.exists(path).await.unwrap());
        assert_eq!(file_io.get_status(path).await.unwrap().size, 4);
        assert!(file_io
            .list_status(dir)
            .await
            .unwrap()
            .iter()
            .any(|status| status.path == path));

        file_io.delete_dir(dir).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_operator_not_shared_between_file_io_instances() {
        let file_io_1 = setup_memory_file_io();
        let file_io_2 = setup_memory_file_io();
        let path = "memory:/tmp/reuse_isolation_case";

        file_io_1
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        assert!(file_io_1.exists(path).await.unwrap());
        assert!(!file_io_2.exists(path).await.unwrap());
    }

    #[tokio::test]
    async fn test_get_status_fs() {
        let file_io = setup_fs_file_io();
        common_test_get_status(&file_io, "file:/tmp/test_file_get_status_fs").await;
    }

    #[tokio::test]
    async fn test_exists_fs() {
        let file_io = setup_fs_file_io();
        common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await;
    }

    #[tokio::test]
    async fn test_delete_file_fs() {
        let file_io = setup_fs_file_io();
        common_test_delete_file(&file_io, "file:/tmp/test_file_delete_fs").await;
    }

    #[tokio::test]
    async fn test_mkdirs_fs() {
        let file_io = setup_fs_file_io();
        common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await;
    }

    #[tokio::test]
    async fn test_rename_fs() {
        let file_io = setup_fs_file_io();
        common_test_rename(
            &file_io,
            "file:/tmp/test_file_fs_z",
            "file:/tmp/new_test_file_fs_o",
        )
        .await;
    }

    #[tokio::test]
    async fn test_list_status_fs_should_return_entry_paths() {
        let file_io = setup_fs_file_io();
        common_test_list_status_paths(&file_io, "file:/tmp/test_list_status_paths_fs/").await;
    }

    #[test]
    fn test_from_path_detects_local_fs_path() {
        let dir = tempdir().unwrap();
        let file_io = FileIO::from_path(dir.path().to_string_lossy())
            .unwrap()
            .build()
            .unwrap();
        let path = local_file_path(&dir.path().join("from_path_detects_local_fs_path.txt"));

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            file_io
                .new_output(&path)
                .unwrap()
                .write(Bytes::from("data"))
                .await
                .unwrap();
            assert!(file_io.exists(&path).await.unwrap());
        });
    }
}
661
#[cfg(test)]
mod input_output_test {
    use super::*;
    use bytes::Bytes;

    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Writes `"hello world"` (11 bytes) to `path` via the streaming writer API
    /// and returns the output handle for further checks.
    async fn write_hello_world(file_io: &FileIO, path: &str) -> OutputFile {
        let output = file_io.new_output(path).unwrap();
        let mut writer = output.writer().await.unwrap();
        writer.write(Bytes::from("hello world")).await.unwrap();
        writer.close().await.unwrap();
        output
    }

    async fn common_test_output_file_write_and_read(file_io: &FileIO, path: &str) {
        let input = write_hello_world(file_io, path).await.to_input_file();
        let content = input.read().await.unwrap();

        assert_eq!(&content[..], b"hello world");

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_output_file_exists(file_io: &FileIO, path: &str) {
        let output = write_hello_world(file_io, path).await;

        let exists = output.exists().await.unwrap();
        assert!(exists);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) {
        let input = write_hello_world(file_io, path).await.to_input_file();
        let metadata = input.metadata().await.unwrap();

        assert_eq!(metadata.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str) {
        let input = write_hello_world(file_io, path).await.to_input_file();
        let reader = input.reader().await.unwrap();
        let partial_content = reader.read(0..5).await.unwrap();
        assert_eq!(&partial_content[..], b"hello");

        file_io.delete_file(path).await.unwrap();
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_memory() {
        let file_io = setup_memory_file_io();
        common_test_output_file_write_and_read(&file_io, "memory:/test_file_rw_mem").await;
    }

    #[tokio::test]
    async fn test_output_file_exists_memory() {
        let file_io = setup_memory_file_io();
        common_test_output_file_exists(&file_io, "memory:/test_file_exist_mem").await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_memory() {
        let file_io = setup_memory_file_io();
        common_test_input_file_metadata(&file_io, "memory:/test_file_meta_mem").await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_memory() {
        let file_io = setup_memory_file_io();
        common_test_input_file_partial_read(&file_io, "memory:/test_file_part_read_mem").await;
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_fs() {
        let file_io = setup_fs_file_io();
        common_test_output_file_write_and_read(&file_io, "file:/tmp/test_file_fs_rw").await;
    }

    #[tokio::test]
    async fn test_output_file_exists_fs() {
        let file_io = setup_fs_file_io();
        common_test_output_file_exists(&file_io, "file:/tmp/test_file_exists").await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_fs() {
        let file_io = setup_fs_file_io();
        common_test_input_file_metadata(&file_io, "file:/tmp/test_file_meta").await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_fs() {
        let file_io = setup_fs_file_io();
        common_test_input_file_partial_read(&file_io, "file:/tmp/test_file_read_fs").await;
    }
}