Skip to main content

fluss/io/
file_io.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

19use crate::error::*;
20use std::collections::HashMap;
21use std::ops::Range;
22use std::sync::Arc;
23
24use bytes::Bytes;
25use jiff::Timestamp;
26use opendal::Operator;
27
28use url::Url;
29
30use super::Storage;
31
32use crate::error::Result;
33
34#[derive(Clone, Debug)]
35pub struct FileIO {
36    storage: Arc<Storage>,
37}
38
39impl FileIO {
40    /// Try to infer file io scheme from path.
41    pub fn from_url(path: &str) -> Result<FileIOBuilder> {
42        let url = Url::parse(path).map_err(|e| Error::IllegalArgument {
43            message: format!("Invalid URL '{path}': {e}"),
44        })?;
45        Ok(FileIOBuilder::new(url.scheme()))
46    }
47
48    /// Create a new input file to read data.
49    pub fn new_input(&self, path: &str) -> Result<InputFile> {
50        let (op, relative_path) = self.storage.create(path)?;
51        let path = path.to_string();
52        let relative_path_pos = path.len() - relative_path.len();
53        Ok(InputFile {
54            op,
55            path,
56            relative_path_pos,
57        })
58    }
59}
60
/// Builder for [`FileIO`]: a storage scheme plus free-form string properties.
#[derive(Debug)]
pub struct FileIOBuilder {
    /// Storage scheme such as `"file"`; `into_parts` turns `None` into an empty string.
    scheme_str: Option<String>,
    /// Backend configuration properties, keyed by property name.
    props: HashMap<String, String>,
}

67impl FileIOBuilder {
68    pub fn new(scheme_str: impl ToString) -> Self {
69        Self {
70            scheme_str: Some(scheme_str.to_string()),
71            props: HashMap::default(),
72        }
73    }
74
75    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
76        (self.scheme_str.unwrap_or_default(), self.props)
77    }
78
79    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
80        self.props.insert(key.to_string(), value.to_string());
81        self
82    }
83
84    pub fn with_props(
85        mut self,
86        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
87    ) -> Self {
88        self.props
89            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
90        self
91    }
92
93    pub fn build(self) -> Result<FileIO> {
94        let storage = Storage::build(self)?;
95        Ok(FileIO {
96            storage: Arc::new(storage),
97        })
98    }
99}
100
101pub trait FileRead: Send + Unpin + 'static {
102    fn read(&self, range: Range<u64>) -> impl Future<Output = Result<Bytes>> + Send;
103}
104
105impl FileRead for opendal::Reader {
106    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
107        Ok(opendal::Reader::read(self, range).await?.to_bytes())
108    }
109}
110
111#[derive(Debug)]
112pub struct InputFile {
113    op: Operator,
114    path: String,
115    relative_path_pos: usize,
116}
117
118impl InputFile {
119    pub fn location(&self) -> &str {
120        &self.path
121    }
122
123    pub async fn exists(&self) -> Result<bool> {
124        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
125    }
126
127    pub async fn metadata(&self) -> Result<FileStatus> {
128        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
129
130        Ok(FileStatus {
131            size: meta.content_length(),
132            is_dir: meta.is_dir(),
133            path: self.path.clone(),
134            last_modified: meta.last_modified().map(Into::into),
135        })
136    }
137
138    pub async fn read(&self) -> Result<Bytes> {
139        Ok(self
140            .op
141            .read(&self.path[self.relative_path_pos..])
142            .await?
143            .to_bytes())
144    }
145
146    pub async fn reader(&self) -> Result<impl FileRead> {
147        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
148    }
149}
150
151#[derive(Clone, Debug)]
152pub struct FileStatus {
153    pub size: u64,
154    pub is_dir: bool,
155    pub path: String,
156    pub last_modified: Option<Timestamp>,
157}