1use crate::error::*;
20use std::collections::HashMap;
21use std::ops::Range;
22use std::sync::Arc;
23
24use bytes::Bytes;
25use jiff::Timestamp;
26use opendal::Operator;
27
28use url::Url;
29
30use super::Storage;
31
32use crate::error::Result;
33
34#[derive(Clone, Debug)]
35pub struct FileIO {
36 storage: Arc<Storage>,
37}
38
39impl FileIO {
40 pub fn from_url(path: &str) -> Result<FileIOBuilder> {
42 let url = Url::parse(path).map_err(|e| Error::IllegalArgument {
43 message: format!("Invalid URL '{path}': {e}"),
44 })?;
45 Ok(FileIOBuilder::new(url.scheme()))
46 }
47
48 pub fn new_input(&self, path: &str) -> Result<InputFile> {
50 let (op, relative_path) = self.storage.create(path)?;
51 let path = path.to_string();
52 let relative_path_pos = path.len() - relative_path.len();
53 Ok(InputFile {
54 op,
55 path,
56 relative_path_pos,
57 })
58 }
59}
60
/// Builder collecting the scheme and configuration properties for a [`FileIO`].
#[derive(Debug)]
pub struct FileIOBuilder {
    /// Scheme identifying the storage backend; `None` is turned into an empty string
    /// by `into_parts`.
    scheme_str: Option<String>,
    /// Backend configuration properties, keyed by property name.
    props: HashMap<String, String>,
}
66
67impl FileIOBuilder {
68 pub fn new(scheme_str: impl ToString) -> Self {
69 Self {
70 scheme_str: Some(scheme_str.to_string()),
71 props: HashMap::default(),
72 }
73 }
74
75 pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
76 (self.scheme_str.unwrap_or_default(), self.props)
77 }
78
79 pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
80 self.props.insert(key.to_string(), value.to_string());
81 self
82 }
83
84 pub fn with_props(
85 mut self,
86 args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
87 ) -> Self {
88 self.props
89 .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
90 self
91 }
92
93 pub fn build(self) -> Result<FileIO> {
94 let storage = Storage::build(self)?;
95 Ok(FileIO {
96 storage: Arc::new(storage),
97 })
98 }
99}
100
101pub trait FileRead: Send + Unpin + 'static {
102 fn read(&self, range: Range<u64>) -> impl Future<Output = Result<Bytes>> + Send;
103}
104
105impl FileRead for opendal::Reader {
106 async fn read(&self, range: Range<u64>) -> Result<Bytes> {
107 Ok(opendal::Reader::read(self, range).await?.to_bytes())
108 }
109}
110
111#[derive(Debug)]
112pub struct InputFile {
113 op: Operator,
114 path: String,
115 relative_path_pos: usize,
116}
117
118impl InputFile {
119 pub fn location(&self) -> &str {
120 &self.path
121 }
122
123 pub async fn exists(&self) -> Result<bool> {
124 Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
125 }
126
127 pub async fn metadata(&self) -> Result<FileStatus> {
128 let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
129
130 Ok(FileStatus {
131 size: meta.content_length(),
132 is_dir: meta.is_dir(),
133 path: self.path.clone(),
134 last_modified: meta.last_modified().map(Into::into),
135 })
136 }
137
138 pub async fn read(&self) -> Result<Bytes> {
139 Ok(self
140 .op
141 .read(&self.path[self.relative_path_pos..])
142 .await?
143 .to_bytes())
144 }
145
146 pub async fn reader(&self) -> Result<impl FileRead> {
147 Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
148 }
149}
150
151#[derive(Clone, Debug)]
152pub struct FileStatus {
153 pub size: u64,
154 pub is_dir: bool,
155 pub path: String,
156 pub last_modified: Option<Timestamp>,
157}