Skip to main content

fluss/io/
storage.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18use crate::error;
19use crate::error::Result;
20use crate::io::FileIOBuilder;
21use opendal::{Operator, Scheme};
22#[cfg(any(feature = "storage-s3", feature = "storage-oss"))]
23use std::collections::HashMap;
24
25/// The storage carries all supported storage services in fluss
26#[derive(Debug)]
27pub enum Storage {
28    #[cfg(feature = "storage-memory")]
29    Memory,
30    #[cfg(feature = "storage-fs")]
31    LocalFs,
32    #[cfg(feature = "storage-s3")]
33    S3 { props: HashMap<String, String> },
34    #[cfg(feature = "storage-oss")]
35    Oss { props: HashMap<String, String> },
36}
37
38impl Storage {
39    #[allow(unused_variables)]
40    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
41        let (scheme_str, props) = file_io_builder.into_parts();
42        let scheme = Self::parse_scheme(&scheme_str)?;
43
44        match scheme {
45            #[cfg(feature = "storage-memory")]
46            Scheme::Memory => Ok(Self::Memory),
47            #[cfg(feature = "storage-fs")]
48            Scheme::Fs => Ok(Self::LocalFs),
49            #[cfg(feature = "storage-s3")]
50            Scheme::S3 => Ok(Self::S3 { props }),
51            #[cfg(feature = "storage-oss")]
52            Scheme::Oss => Ok(Self::Oss { props }),
53            _ => Err(error::Error::IoUnsupported {
54                message: format!("Unsupported storage feature {scheme_str}"),
55            }),
56        }
57    }
58
59    pub(crate) fn create<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
60        match self {
61            #[cfg(feature = "storage-memory")]
62            Storage::Memory => {
63                let op = super::memory_config_build()?;
64
65                if let Some(stripped) = path.strip_prefix("memory:/") {
66                    Ok((op, stripped))
67                } else {
68                    Ok((op, &path[1..]))
69                }
70            }
71            #[cfg(feature = "storage-fs")]
72            Storage::LocalFs => {
73                let op = super::fs_config_build()?;
74                if let Some(stripped) = path.strip_prefix("file:/") {
75                    Ok((op, stripped))
76                } else {
77                    Ok((op, &path[1..]))
78                }
79            }
80            #[cfg(feature = "storage-s3")]
81            Storage::S3 { props } => {
82                let (bucket, key) = super::parse_s3_path(path);
83                let mut s3_props = props.clone();
84                s3_props.insert("bucket".to_string(), bucket.to_string());
85                let op = super::s3_config_build(&s3_props)?;
86                Ok((op, key))
87            }
88            #[cfg(feature = "storage-oss")]
89            Storage::Oss { props } => {
90                let (bucket, key) = super::parse_oss_path(path);
91                let mut oss_props = props.clone();
92                oss_props.insert("bucket".to_string(), bucket.to_string());
93                let op = super::oss_config_build(&oss_props)?;
94                Ok((op, key))
95            }
96        }
97    }
98
99    fn parse_scheme(scheme: &str) -> Result<Scheme> {
100        match scheme {
101            "memory" => Ok(Scheme::Memory),
102            "file" | "" => Ok(Scheme::Fs),
103            "s3" | "s3a" => Ok(Scheme::S3),
104            "oss" => Ok(Scheme::Oss),
105            s => Ok(s.parse::<Scheme>()?),
106        }
107    }
108}