1mod utils;
26
27use std::sync::Arc;
28
29use async_trait::async_trait;
30use bytes::Bytes;
31use cfg_if::cfg_if;
32use iceberg::io::{
33 FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
34 StorageFactory,
35};
36use iceberg::{Error, ErrorKind, Result};
37use opendal::Operator;
38use opendal::layers::RetryLayer;
39use serde::{Deserialize, Serialize};
40use utils::from_opendal_error;
41
42cfg_if! {
43 if #[cfg(feature = "opendal-azdls")] {
44 mod azdls;
45 use azdls::AzureStorageScheme;
46 use azdls::*;
47 use opendal::services::AzdlsConfig;
48 }
49}
50
51cfg_if! {
52 if #[cfg(feature = "opendal-fs")] {
53 mod fs;
54 use fs::*;
55 }
56}
57
58cfg_if! {
59 if #[cfg(feature = "opendal-gcs")] {
60 mod gcs;
61 use gcs::*;
62 use opendal::services::GcsConfig;
63 }
64}
65
66cfg_if! {
67 if #[cfg(feature = "opendal-memory")] {
68 mod memory;
69 use memory::*;
70 }
71}
72
73cfg_if! {
74 if #[cfg(feature = "opendal-oss")] {
75 mod oss;
76 use opendal::services::OssConfig;
77 use oss::*;
78 }
79}
80
81cfg_if! {
82 if #[cfg(feature = "opendal-s3")] {
83 mod s3;
84 use opendal::services::S3Config;
85 pub use s3::*;
86 }
87}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
94pub enum OpenDalStorageFactory {
95 #[cfg(feature = "opendal-memory")]
97 Memory,
98 #[cfg(feature = "opendal-fs")]
100 Fs,
101 #[cfg(feature = "opendal-s3")]
103 S3 {
104 configured_scheme: String,
107 #[serde(skip)]
109 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
110 },
111 #[cfg(feature = "opendal-gcs")]
113 Gcs,
114 #[cfg(feature = "opendal-oss")]
116 Oss,
117 #[cfg(feature = "opendal-azdls")]
119 Azdls {
120 configured_scheme: AzureStorageScheme,
122 },
123}
124
125#[typetag::serde(name = "OpenDalStorageFactory")]
126impl StorageFactory for OpenDalStorageFactory {
127 #[allow(unused_variables)]
128 fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
129 match self {
130 #[cfg(feature = "opendal-memory")]
131 OpenDalStorageFactory::Memory => {
132 Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
133 }
134 #[cfg(feature = "opendal-fs")]
135 OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
136 #[cfg(feature = "opendal-s3")]
137 OpenDalStorageFactory::S3 {
138 configured_scheme,
139 customized_credential_load,
140 } => Ok(Arc::new(OpenDalStorage::S3 {
141 configured_scheme: configured_scheme.clone(),
142 config: s3_config_parse(config.props().clone())?.into(),
143 customized_credential_load: customized_credential_load.clone(),
144 })),
145 #[cfg(feature = "opendal-gcs")]
146 OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
147 config: gcs_config_parse(config.props().clone())?.into(),
148 })),
149 #[cfg(feature = "opendal-oss")]
150 OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
151 config: oss_config_parse(config.props().clone())?.into(),
152 })),
153 #[cfg(feature = "opendal-azdls")]
154 OpenDalStorageFactory::Azdls { configured_scheme } => {
155 Ok(Arc::new(OpenDalStorage::Azdls {
156 configured_scheme: configured_scheme.clone(),
157 config: azdls_config_parse(config.props().clone())?.into(),
158 }))
159 }
160 #[cfg(all(
161 not(feature = "opendal-memory"),
162 not(feature = "opendal-fs"),
163 not(feature = "opendal-s3"),
164 not(feature = "opendal-gcs"),
165 not(feature = "opendal-oss"),
166 not(feature = "opendal-azdls"),
167 ))]
168 _ => Err(Error::new(
169 ErrorKind::FeatureUnsupported,
170 "No storage service has been enabled",
171 )),
172 }
173 }
174}
175
/// Builds the operator used as the serde default for `OpenDalStorage::Memory`.
///
/// That variant skips its operator during (de)serialization, so a fresh one
/// is created from `memory_config_build` whenever it is deserialized.
///
/// # Panics
///
/// Panics if `memory_config_build` returns an error.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    match memory_config_build() {
        Ok(operator) => operator,
        Err(err) => panic!("Failed to create default memory operator: {err:?}"),
    }
}
181
182#[derive(Clone, Debug, Serialize, Deserialize)]
184pub enum OpenDalStorage {
185 #[cfg(feature = "opendal-memory")]
187 Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
188 #[cfg(feature = "opendal-fs")]
190 LocalFs,
191 #[cfg(feature = "opendal-s3")]
193 S3 {
194 configured_scheme: String,
197 config: Arc<S3Config>,
199 #[serde(skip)]
201 customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
202 },
203 #[cfg(feature = "opendal-gcs")]
205 Gcs {
206 config: Arc<GcsConfig>,
208 },
209 #[cfg(feature = "opendal-oss")]
211 Oss {
212 config: Arc<OssConfig>,
214 },
215 #[cfg(feature = "opendal-azdls")]
220 #[allow(private_interfaces)]
221 Azdls {
222 configured_scheme: AzureStorageScheme,
226 config: Arc<AzdlsConfig>,
228 },
229}
230
231impl OpenDalStorage {
232 #[allow(unreachable_code, unused_variables)]
245 pub(crate) fn create_operator<'a>(
246 &self,
247 path: &'a impl AsRef<str>,
248 ) -> Result<(Operator, &'a str)> {
249 let path = path.as_ref();
250 let (operator, relative_path): (Operator, &str) = match self {
251 #[cfg(feature = "opendal-memory")]
252 OpenDalStorage::Memory(op) => {
253 if let Some(stripped) = path.strip_prefix("memory:/") {
254 (op.clone(), stripped)
255 } else {
256 (op.clone(), &path[1..])
257 }
258 }
259 #[cfg(feature = "opendal-fs")]
260 OpenDalStorage::LocalFs => {
261 let op = fs_config_build()?;
262 if let Some(stripped) = path.strip_prefix("file:/") {
263 (op, stripped)
264 } else {
265 (op, &path[1..])
266 }
267 }
268 #[cfg(feature = "opendal-s3")]
269 OpenDalStorage::S3 {
270 configured_scheme,
271 config,
272 customized_credential_load,
273 } => {
274 let op = s3_config_build(config, customized_credential_load, path)?;
275 let op_info = op.info();
276
277 let prefix = format!("{}://{}/", configured_scheme, op_info.name());
279 if path.starts_with(&prefix) {
280 (op, &path[prefix.len()..])
281 } else {
282 return Err(Error::new(
283 ErrorKind::DataInvalid,
284 format!("Invalid s3 url: {path}, should start with {prefix}"),
285 ));
286 }
287 }
288 #[cfg(feature = "opendal-gcs")]
289 OpenDalStorage::Gcs { config } => {
290 let operator = gcs_config_build(config, path)?;
291 let prefix = format!("gs://{}/", operator.info().name());
292 if path.starts_with(&prefix) {
293 (operator, &path[prefix.len()..])
294 } else {
295 return Err(Error::new(
296 ErrorKind::DataInvalid,
297 format!("Invalid gcs url: {path}, should start with {prefix}"),
298 ));
299 }
300 }
301 #[cfg(feature = "opendal-oss")]
302 OpenDalStorage::Oss { config } => {
303 let op = oss_config_build(config, path)?;
304 let prefix = format!("oss://{}/", op.info().name());
305 if path.starts_with(&prefix) {
306 (op, &path[prefix.len()..])
307 } else {
308 return Err(Error::new(
309 ErrorKind::DataInvalid,
310 format!("Invalid oss url: {path}, should start with {prefix}"),
311 ));
312 }
313 }
314 #[cfg(feature = "opendal-azdls")]
315 OpenDalStorage::Azdls {
316 configured_scheme,
317 config,
318 } => azdls_create_operator(path, config, configured_scheme)?,
319 #[cfg(all(
320 not(feature = "opendal-s3"),
321 not(feature = "opendal-fs"),
322 not(feature = "opendal-gcs"),
323 not(feature = "opendal-oss"),
324 not(feature = "opendal-azdls"),
325 ))]
326 _ => {
327 return Err(Error::new(
328 ErrorKind::FeatureUnsupported,
329 "No storage service has been enabled",
330 ));
331 }
332 };
333
334 let operator = operator.layer(RetryLayer::new());
337 Ok((operator, relative_path))
338 }
339}
340
341#[typetag::serde(name = "OpenDalStorage")]
342#[async_trait]
343impl Storage for OpenDalStorage {
344 async fn exists(&self, path: &str) -> Result<bool> {
345 let (op, relative_path) = self.create_operator(&path)?;
346 Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
347 }
348
349 async fn metadata(&self, path: &str) -> Result<FileMetadata> {
350 let (op, relative_path) = self.create_operator(&path)?;
351 let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
352 Ok(FileMetadata {
353 size: meta.content_length(),
354 })
355 }
356
357 async fn read(&self, path: &str) -> Result<Bytes> {
358 let (op, relative_path) = self.create_operator(&path)?;
359 Ok(op
360 .read(relative_path)
361 .await
362 .map_err(from_opendal_error)?
363 .to_bytes())
364 }
365
366 async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
367 let (op, relative_path) = self.create_operator(&path)?;
368 Ok(Box::new(OpenDalReader(
369 op.reader(relative_path).await.map_err(from_opendal_error)?,
370 )))
371 }
372
373 async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
374 let (op, relative_path) = self.create_operator(&path)?;
375 op.write(relative_path, bs)
376 .await
377 .map_err(from_opendal_error)?;
378 Ok(())
379 }
380
381 async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
382 let (op, relative_path) = self.create_operator(&path)?;
383 Ok(Box::new(OpenDalWriter(
384 op.writer(relative_path).await.map_err(from_opendal_error)?,
385 )))
386 }
387
388 async fn delete(&self, path: &str) -> Result<()> {
389 let (op, relative_path) = self.create_operator(&path)?;
390 Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
391 }
392
393 async fn delete_prefix(&self, path: &str) -> Result<()> {
394 let (op, relative_path) = self.create_operator(&path)?;
395 let path = if relative_path.ends_with('/') {
396 relative_path.to_string()
397 } else {
398 format!("{relative_path}/")
399 };
400 Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
401 }
402
403 #[allow(unreachable_code, unused_variables)]
404 fn new_input(&self, path: &str) -> Result<InputFile> {
405 Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
406 }
407
408 #[allow(unreachable_code, unused_variables)]
409 fn new_output(&self, path: &str) -> Result<OutputFile> {
410 Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
411 }
412}
413
414pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
420
421#[async_trait]
422impl FileRead for OpenDalReader {
423 async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
424 Ok(opendal::Reader::read(&self.0, range)
425 .await
426 .map_err(from_opendal_error)?
427 .to_bytes())
428 }
429}
430
431pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
433
434#[async_trait]
435impl FileWrite for OpenDalWriter {
436 async fn write(&mut self, bs: Bytes) -> Result<()> {
437 Ok(opendal::Writer::write(&mut self.0, bs)
438 .await
439 .map_err(from_opendal_error)?)
440 }
441
442 async fn close(&mut self) -> Result<()> {
443 let _ = opendal::Writer::close(&mut self.0)
444 .await
445 .map_err(from_opendal_error)?;
446 Ok(())
447 }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// The serde fallback for `OpenDalStorage::Memory` yields a memory-backed operator.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let op = default_memory_operator();
        assert_eq!(op.info().scheme().to_string(), "memory");
    }

    /// Memory paths with a `memory:/` scheme and plain absolute paths resolve
    /// to the same operator-relative path.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_memory_create_operator_relative_path() {
        let storage = OpenDalStorage::Memory(default_memory_operator());

        let with_scheme = "memory:/a/b.txt";
        let (_, relative) = storage.create_operator(&with_scheme).unwrap();
        assert_eq!(relative, "a/b.txt");

        let absolute = "/a/b.txt";
        let (_, relative) = storage.create_operator(&absolute).unwrap();
        assert_eq!(relative, "a/b.txt");
    }

    /// Local filesystem paths with a `file:/` scheme and plain absolute paths
    /// resolve to the same operator-relative path.
    #[cfg(feature = "opendal-fs")]
    #[test]
    fn test_fs_create_operator_relative_path() {
        let storage = OpenDalStorage::LocalFs;

        let with_scheme = "file:/tmp/data.parquet";
        let (_, relative) = storage.create_operator(&with_scheme).unwrap();
        assert_eq!(relative, "tmp/data.parquet");

        let absolute = "/tmp/data.parquet";
        let (_, relative) = storage.create_operator(&absolute).unwrap();
        assert_eq!(relative, "tmp/data.parquet");
    }
}