Skip to main content

tibba_opendal/
lib.rs

1// Copyright 2026 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use opendal::Operator;
16use opendal::layers::MimeGuessLayer;
17use path_absolutize::Absolutize;
18use serde::Deserialize;
19use snafu::{ResultExt, Snafu};
20use std::path::PathBuf;
21use tibba_config::Config;
22use tibba_error::Error as BaseError;
23use tibba_util::parse_uri;
24use validator::Validate;
25
26mod storage;
27
28pub use storage::*;
29
/// URL prefix of MySQL storage URLs.
const MYSQL_PREFIX: &str = "mysql://";
/// URL prefix of local file-system storage URLs.
const FS_PREFIX: &str = "file://";
34
/// OpenDAL storage configuration.
///
/// `url` determines the backend type; `schema` can explicitly name the
/// protocol (for example "http").
#[derive(Debug, Clone, Validate, Deserialize, Default)]
pub struct OpenDalConfig {
    /// Storage URL. Validation requires a length of at least 10.
    #[validate(length(min = 10))]
    pub url: String,
    /// Optional protocol hint. Deserializes to the empty string when the
    /// field is missing from the input.
    #[serde(default)]
    pub schema: String,
}
43
44/// 从应用配置中读取并校验 OpenDalConfig。
45fn new_opendal_config(config: &Config) -> Result<OpenDalConfig> {
46    let open_dal_config = config
47        .try_deserialize::<OpenDalConfig>()
48        .context(ConfigSnafu)?;
49    open_dal_config.validate().context(ValidateSnafu)?;
50    Ok(open_dal_config)
51}
52
/// Errors produced by this crate. Source errors are stored boxed.
#[derive(Debug, Snafu)]
pub enum Error {
    /// An error reported by OpenDAL.
    #[snafu(display("open dal {source}"))]
    OpenDal {
        #[snafu(source(from(opendal::Error, Box::new)))]
        source: Box<opendal::Error>,
    },
    /// An error reported by `tibba_config`.
    #[snafu(display("config error: {source}"))]
    Config {
        #[snafu(source(from(tibba_config::Error, Box::new)))]
        source: Box<tibba_config::Error>,
    },
    /// An error reported by `tibba_util` while parsing a URI.
    #[snafu(display("parse uri error: {source}"))]
    ParseUri {
        #[snafu(source(from(tibba_util::Error, Box::new)))]
        source: Box<tibba_util::Error>,
    },
    /// Validation errors reported by `validator`.
    #[snafu(display("validate {source}"))]
    Validate {
        #[snafu(source(from(validator::ValidationErrors, Box::new)))]
        source: Box<validator::ValidationErrors>,
    },
    /// Any other invalid parameter or configuration error.
    #[snafu(display("{message}"))]
    Invalid { message: String },
}
79
/// Result alias whose error type defaults to this crate's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
81
82impl From<Error> for BaseError {
83    fn from(val: Error) -> Self {
84        let err = match val {
85            Error::OpenDal { source } => BaseError::new(source).with_exception(true),
86            Error::Config { source } => BaseError::new(*source).with_sub_category("config"),
87            Error::ParseUri { source } => BaseError::new(*source)
88                .with_sub_category("parse_uri")
89                .with_exception(true),
90            Error::Validate { source } => BaseError::new(*source)
91                .with_sub_category("validate")
92                .with_exception(true),
93            Error::Invalid { message } => BaseError::new(message).with_exception(true),
94        };
95        err.with_category("open_dal")
96    }
97}
98
/// S3 connection parameters, parsed from the URL query string.
#[derive(Deserialize, Debug, PartialEq)]
struct S3Params {
    /// Bucket name (the only required parameter).
    bucket: String,
    /// Optional region.
    region: Option<String>,
    /// Optional access key id.
    access_key_id: Option<String>,
    /// Optional secret access key.
    secret_access_key: Option<String>,
}
107
108/// 从 S3 兼容 URL 创建 S3 存储后端。
109fn new_s3_dal(url: &str) -> Result<Storage> {
110    let parsed = parse_uri::<S3Params>(url).context(ParseUriSnafu)?;
111    let mut builder = opendal::services::S3::default().endpoint(&parsed.endpoint());
112    if let Some(path) = parsed.path {
113        builder = builder.root(path);
114    }
115    let query = parsed.query;
116    builder = builder.bucket(&query.bucket);
117    if let Some(region) = &query.region {
118        builder = builder.region(region);
119    }
120    if let Some(access_key_id) = &query.access_key_id {
121        builder = builder.access_key_id(access_key_id);
122    }
123    if let Some(secret_access_key) = &query.secret_access_key {
124        builder = builder.secret_access_key(secret_access_key);
125    }
126    let dal = opendal::Operator::new(builder)
127        .context(OpenDalSnafu)?
128        .layer(MimeGuessLayer::default())
129        .finish();
130    Ok(Storage::new(dal))
131}
132
133/// 从 MySQL 连接字符串创建 MySQL 存储后端,使用 `objects` 表存储对象数据。
134fn new_mysql_dal(url: &str) -> Result<Storage> {
135    let builder = opendal::services::Mysql::default()
136        .connection_string(url)
137        .table("objects");
138    let dal = Operator::new(builder)
139        .context(OpenDalSnafu)?
140        .layer(MimeGuessLayer::default())
141        .finish();
142    Ok(Storage::new(dal))
143}
144
145/// 将路径字符串规范化为绝对路径,支持 `~/` 家目录前缀展开。
146#[inline]
147fn resolve_path(path_str: &str) -> String {
148    if path_str.is_empty() {
149        return String::new();
150    }
151    let path = if let Some(stripped) = path_str.strip_prefix("~/") {
152        dirs::home_dir()
153            .map(|home| home.join(stripped))
154            .unwrap_or_else(|| PathBuf::from(path_str))
155    } else {
156        PathBuf::from(path_str)
157    };
158
159    path.absolutize().map_or_else(
160        |_| path.to_string_lossy().into_owned(),
161        |p| p.to_string_lossy().into_owned(),
162    )
163}
164
165/// 从 `file://` URL 创建本地文件系统存储后端,根路径需至少 2 个字符。
166fn new_fs_dal(url: &str) -> Result<Storage> {
167    let root = url.strip_prefix(FS_PREFIX).unwrap_or_default();
168    if root.len() < 2 {
169        return Err(Error::Invalid {
170            message: "root is empty".to_string(),
171        });
172    }
173    let builder = opendal::services::Fs::default().root(&resolve_path(root));
174    let dal = Operator::new(builder)
175        .context(OpenDalSnafu)?
176        .layer(MimeGuessLayer::default())
177        .finish();
178    Ok(Storage::new(dal))
179}
180
181/// 从 HTTP URL 创建只读 HTTP 存储后端。
182fn new_http_dal(url: &str) -> Result<Storage> {
183    let builder = opendal::services::Http::default().endpoint(url);
184    let dal = Operator::new(builder)
185        .context(OpenDalSnafu)?
186        .layer(MimeGuessLayer::default())
187        .finish();
188    Ok(Storage::new(dal))
189}
190
191/// 根据配置 URL 自动选择存储后端并创建 Storage 实例。
192/// - `mysql://` → MySQL 后端
193/// - `file://`  → 本地文件系统后端
194/// - `schema = "http"` → HTTP 只读后端
195/// - 其余 → S3 兼容后端
196pub fn new_opendal_storage(config: &Config) -> Result<Storage> {
197    let opendal_config = new_opendal_config(config)?;
198    let url = opendal_config.url.as_str();
199    new_opendal_storage_from_url(url, Some(&opendal_config.schema))
200}
201
202/// 根据 URL 和 schema 自动选择存储后端并创建 Storage 实例。
203pub fn new_opendal_storage_from_url(url: &str, schema: Option<&str>) -> Result<Storage> {
204    match url {
205        url if url.starts_with(MYSQL_PREFIX) => new_mysql_dal(url),
206        url if url.starts_with(FS_PREFIX) => new_fs_dal(url),
207        url if schema == Some("http") => new_http_dal(url),
208        _ => new_s3_dal(url),
209    }
210}