1use opendal::Operator;
16use opendal::layers::MimeGuessLayer;
17use path_absolutize::Absolutize;
18use serde::Deserialize;
19use snafu::{ResultExt, Snafu};
20use std::path::PathBuf;
21use tibba_config::Config;
22use tibba_error::Error as BaseError;
23use tibba_util::parse_uri;
24use validator::Validate;
25
26mod storage;
27
28pub use storage::*;
29
30const MYSQL_PREFIX: &str = "mysql://";
32const FS_PREFIX: &str = "file://";
34
35#[derive(Debug, Clone, Validate, Deserialize, Default)]
37pub struct OpenDalConfig {
38 #[validate(length(min = 10))]
39 pub url: String,
40 #[serde(default)]
41 pub schema: String,
42}
43
44fn new_opendal_config(config: &Config) -> Result<OpenDalConfig> {
46 let open_dal_config = config
47 .try_deserialize::<OpenDalConfig>()
48 .context(ConfigSnafu)?;
49 open_dal_config.validate().context(ValidateSnafu)?;
50 Ok(open_dal_config)
51}
52
53#[derive(Debug, Snafu)]
54pub enum Error {
55 #[snafu(display("open dal {source}"))]
56 OpenDal {
57 #[snafu(source(from(opendal::Error, Box::new)))]
58 source: Box<opendal::Error>,
59 },
60 #[snafu(display("config error: {source}"))]
61 Config {
62 #[snafu(source(from(tibba_config::Error, Box::new)))]
63 source: Box<tibba_config::Error>,
64 },
65 #[snafu(display("parse uri error: {source}"))]
66 ParseUri {
67 #[snafu(source(from(tibba_util::Error, Box::new)))]
68 source: Box<tibba_util::Error>,
69 },
70 #[snafu(display("validate {source}"))]
71 Validate {
72 #[snafu(source(from(validator::ValidationErrors, Box::new)))]
73 source: Box<validator::ValidationErrors>,
74 },
75 #[snafu(display("{message}"))]
77 Invalid { message: String },
78}
79
80type Result<T, E = Error> = std::result::Result<T, E>;
81
82impl From<Error> for BaseError {
83 fn from(val: Error) -> Self {
84 let err = match val {
85 Error::OpenDal { source } => BaseError::new(source).with_exception(true),
86 Error::Config { source } => BaseError::new(*source).with_sub_category("config"),
87 Error::ParseUri { source } => BaseError::new(*source)
88 .with_sub_category("parse_uri")
89 .with_exception(true),
90 Error::Validate { source } => BaseError::new(*source)
91 .with_sub_category("validate")
92 .with_exception(true),
93 Error::Invalid { message } => BaseError::new(message).with_exception(true),
94 };
95 err.with_category("open_dal")
96 }
97}
98
99#[derive(Deserialize, Debug, PartialEq)]
101struct S3Params {
102 bucket: String,
103 region: Option<String>,
104 access_key_id: Option<String>,
105 secret_access_key: Option<String>,
106}
107
108fn new_s3_dal(url: &str) -> Result<Storage> {
110 let parsed = parse_uri::<S3Params>(url).context(ParseUriSnafu)?;
111 let mut builder = opendal::services::S3::default().endpoint(&parsed.endpoint());
112 if let Some(path) = parsed.path {
113 builder = builder.root(path);
114 }
115 let query = parsed.query;
116 builder = builder.bucket(&query.bucket);
117 if let Some(region) = &query.region {
118 builder = builder.region(region);
119 }
120 if let Some(access_key_id) = &query.access_key_id {
121 builder = builder.access_key_id(access_key_id);
122 }
123 if let Some(secret_access_key) = &query.secret_access_key {
124 builder = builder.secret_access_key(secret_access_key);
125 }
126 let dal = opendal::Operator::new(builder)
127 .context(OpenDalSnafu)?
128 .layer(MimeGuessLayer::default())
129 .finish();
130 Ok(Storage::new(dal))
131}
132
133fn new_mysql_dal(url: &str) -> Result<Storage> {
135 let builder = opendal::services::Mysql::default()
136 .connection_string(url)
137 .table("objects");
138 let dal = Operator::new(builder)
139 .context(OpenDalSnafu)?
140 .layer(MimeGuessLayer::default())
141 .finish();
142 Ok(Storage::new(dal))
143}
144
145#[inline]
147fn resolve_path(path_str: &str) -> String {
148 if path_str.is_empty() {
149 return String::new();
150 }
151 let path = if let Some(stripped) = path_str.strip_prefix("~/") {
152 dirs::home_dir()
153 .map(|home| home.join(stripped))
154 .unwrap_or_else(|| PathBuf::from(path_str))
155 } else {
156 PathBuf::from(path_str)
157 };
158
159 path.absolutize().map_or_else(
160 |_| path.to_string_lossy().into_owned(),
161 |p| p.to_string_lossy().into_owned(),
162 )
163}
164
165fn new_fs_dal(url: &str) -> Result<Storage> {
167 let root = url.strip_prefix(FS_PREFIX).unwrap_or_default();
168 if root.len() < 2 {
169 return Err(Error::Invalid {
170 message: "root is empty".to_string(),
171 });
172 }
173 let builder = opendal::services::Fs::default().root(&resolve_path(root));
174 let dal = Operator::new(builder)
175 .context(OpenDalSnafu)?
176 .layer(MimeGuessLayer::default())
177 .finish();
178 Ok(Storage::new(dal))
179}
180
181fn new_http_dal(url: &str) -> Result<Storage> {
183 let builder = opendal::services::Http::default().endpoint(url);
184 let dal = Operator::new(builder)
185 .context(OpenDalSnafu)?
186 .layer(MimeGuessLayer::default())
187 .finish();
188 Ok(Storage::new(dal))
189}
190
191pub fn new_opendal_storage(config: &Config) -> Result<Storage> {
197 let opendal_config = new_opendal_config(config)?;
198 let url = opendal_config.url.as_str();
199 new_opendal_storage_from_url(url, Some(&opendal_config.schema))
200}
201
202pub fn new_opendal_storage_from_url(url: &str, schema: Option<&str>) -> Result<Storage> {
204 match url {
205 url if url.starts_with(MYSQL_PREFIX) => new_mysql_dal(url),
206 url if url.starts_with(FS_PREFIX) => new_fs_dal(url),
207 url if schema == Some("http") => new_http_dal(url),
208 _ => new_s3_dal(url),
209 }
210}