Skip to main content

iceberg_storage_opendal/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! OpenDAL-based storage implementation for Apache Iceberg.
19//!
20//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`],
21//! which implement the [`Storage`](iceberg::io::Storage) and
22//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate
23//! using [OpenDAL](https://opendal.apache.org/) as the backend.
24
25mod utils;
26
27use std::sync::Arc;
28
29use async_trait::async_trait;
30use bytes::Bytes;
31use cfg_if::cfg_if;
32use iceberg::io::{
33    FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig,
34    StorageFactory,
35};
36use iceberg::{Error, ErrorKind, Result};
37use opendal::Operator;
38use opendal::layers::RetryLayer;
39use serde::{Deserialize, Serialize};
40use utils::from_opendal_error;
41
42cfg_if! {
43    if #[cfg(feature = "opendal-azdls")] {
44        mod azdls;
45        use azdls::AzureStorageScheme;
46        use azdls::*;
47        use opendal::services::AzdlsConfig;
48    }
49}
50
51cfg_if! {
52    if #[cfg(feature = "opendal-fs")] {
53        mod fs;
54        use fs::*;
55    }
56}
57
58cfg_if! {
59    if #[cfg(feature = "opendal-gcs")] {
60        mod gcs;
61        use gcs::*;
62        use opendal::services::GcsConfig;
63    }
64}
65
66cfg_if! {
67    if #[cfg(feature = "opendal-memory")] {
68        mod memory;
69        use memory::*;
70    }
71}
72
73cfg_if! {
74    if #[cfg(feature = "opendal-oss")] {
75        mod oss;
76        use opendal::services::OssConfig;
77        use oss::*;
78    }
79}
80
81cfg_if! {
82    if #[cfg(feature = "opendal-s3")] {
83        mod s3;
84        use opendal::services::S3Config;
85        pub use s3::*;
86    }
87}
88
89/// OpenDAL-based storage factory.
90///
91/// Maps scheme to the corresponding OpenDalStorage storage variant.
92/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances.
93#[derive(Clone, Debug, Serialize, Deserialize)]
94pub enum OpenDalStorageFactory {
95    /// Memory storage factory.
96    #[cfg(feature = "opendal-memory")]
97    Memory,
98    /// Local filesystem storage factory.
99    #[cfg(feature = "opendal-fs")]
100    Fs,
101    /// S3 storage factory.
102    #[cfg(feature = "opendal-s3")]
103    S3 {
104        /// s3 storage could have `s3://` and `s3a://`.
105        /// Storing the scheme string here to return the correct path.
106        configured_scheme: String,
107        /// Custom AWS credential loader.
108        #[serde(skip)]
109        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
110    },
111    /// GCS storage factory.
112    #[cfg(feature = "opendal-gcs")]
113    Gcs,
114    /// OSS storage factory.
115    #[cfg(feature = "opendal-oss")]
116    Oss,
117    /// Azure Data Lake Storage factory.
118    #[cfg(feature = "opendal-azdls")]
119    Azdls {
120        /// The configured Azure storage scheme.
121        configured_scheme: AzureStorageScheme,
122    },
123}
124
125#[typetag::serde(name = "OpenDalStorageFactory")]
126impl StorageFactory for OpenDalStorageFactory {
127    #[allow(unused_variables)]
128    fn build(&self, config: &StorageConfig) -> Result<Arc<dyn Storage>> {
129        match self {
130            #[cfg(feature = "opendal-memory")]
131            OpenDalStorageFactory::Memory => {
132                Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?)))
133            }
134            #[cfg(feature = "opendal-fs")]
135            OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)),
136            #[cfg(feature = "opendal-s3")]
137            OpenDalStorageFactory::S3 {
138                configured_scheme,
139                customized_credential_load,
140            } => Ok(Arc::new(OpenDalStorage::S3 {
141                configured_scheme: configured_scheme.clone(),
142                config: s3_config_parse(config.props().clone())?.into(),
143                customized_credential_load: customized_credential_load.clone(),
144            })),
145            #[cfg(feature = "opendal-gcs")]
146            OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs {
147                config: gcs_config_parse(config.props().clone())?.into(),
148            })),
149            #[cfg(feature = "opendal-oss")]
150            OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss {
151                config: oss_config_parse(config.props().clone())?.into(),
152            })),
153            #[cfg(feature = "opendal-azdls")]
154            OpenDalStorageFactory::Azdls { configured_scheme } => {
155                Ok(Arc::new(OpenDalStorage::Azdls {
156                    configured_scheme: configured_scheme.clone(),
157                    config: azdls_config_parse(config.props().clone())?.into(),
158                }))
159            }
160            #[cfg(all(
161                not(feature = "opendal-memory"),
162                not(feature = "opendal-fs"),
163                not(feature = "opendal-s3"),
164                not(feature = "opendal-gcs"),
165                not(feature = "opendal-oss"),
166                not(feature = "opendal-azdls"),
167            ))]
168            _ => Err(Error::new(
169                ErrorKind::FeatureUnsupported,
170                "No storage service has been enabled",
171            )),
172        }
173    }
174}
175
/// Creates the memory operator used as the serde default for
/// `OpenDalStorage::Memory`, whose operator field is skipped during
/// (de)serialization.
///
/// # Panics
///
/// Panics if `memory_config_build` fails, because a serde `default` function
/// cannot return an error.
#[cfg(feature = "opendal-memory")]
fn default_memory_operator() -> Operator {
    let built = memory_config_build();
    built.expect("Failed to create default memory operator")
}
181
182/// OpenDAL-based storage implementation.
183#[derive(Clone, Debug, Serialize, Deserialize)]
184pub enum OpenDalStorage {
185    /// Memory storage variant.
186    #[cfg(feature = "opendal-memory")]
187    Memory(#[serde(skip, default = "self::default_memory_operator")] Operator),
188    /// Local filesystem storage variant.
189    #[cfg(feature = "opendal-fs")]
190    LocalFs,
191    /// S3 storage variant.
192    #[cfg(feature = "opendal-s3")]
193    S3 {
194        /// s3 storage could have `s3://` and `s3a://`.
195        /// Storing the scheme string here to return the correct path.
196        configured_scheme: String,
197        /// S3 configuration.
198        config: Arc<S3Config>,
199        /// Custom AWS credential loader.
200        #[serde(skip)]
201        customized_credential_load: Option<s3::CustomAwsCredentialLoader>,
202    },
203    /// GCS storage variant.
204    #[cfg(feature = "opendal-gcs")]
205    Gcs {
206        /// GCS configuration.
207        config: Arc<GcsConfig>,
208    },
209    /// OSS storage variant.
210    #[cfg(feature = "opendal-oss")]
211    Oss {
212        /// OSS configuration.
213        config: Arc<OssConfig>,
214    },
215    /// Azure Data Lake Storage variant.
216    /// Expects paths of the form
217    /// `abfs[s]://<filesystem>@<account>.dfs.<endpoint-suffix>/<path>` or
218    /// `wasb[s]://<container>@<account>.blob.<endpoint-suffix>/<path>`.
219    #[cfg(feature = "opendal-azdls")]
220    #[allow(private_interfaces)]
221    Azdls {
222        /// The configured Azure storage scheme.
223        /// Because Azdls accepts multiple possible schemes, we store the full
224        /// passed scheme here to later validate schemes passed via paths.
225        configured_scheme: AzureStorageScheme,
226        /// Azure DLS configuration.
227        config: Arc<AzdlsConfig>,
228    },
229}
230
231impl OpenDalStorage {
232    /// Creates operator from path.
233    ///
234    /// # Arguments
235    ///
236    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO).
237    ///
238    /// # Returns
239    ///
240    /// The return value consists of two parts:
241    ///
242    /// * An [`opendal::Operator`] instance used to operate on file.
243    /// * Relative path to the root uri of [`opendal::Operator`].
244    #[allow(unreachable_code, unused_variables)]
245    pub(crate) fn create_operator<'a>(
246        &self,
247        path: &'a impl AsRef<str>,
248    ) -> Result<(Operator, &'a str)> {
249        let path = path.as_ref();
250        let (operator, relative_path): (Operator, &str) = match self {
251            #[cfg(feature = "opendal-memory")]
252            OpenDalStorage::Memory(op) => {
253                if let Some(stripped) = path.strip_prefix("memory:/") {
254                    (op.clone(), stripped)
255                } else {
256                    (op.clone(), &path[1..])
257                }
258            }
259            #[cfg(feature = "opendal-fs")]
260            OpenDalStorage::LocalFs => {
261                let op = fs_config_build()?;
262                if let Some(stripped) = path.strip_prefix("file:/") {
263                    (op, stripped)
264                } else {
265                    (op, &path[1..])
266                }
267            }
268            #[cfg(feature = "opendal-s3")]
269            OpenDalStorage::S3 {
270                configured_scheme,
271                config,
272                customized_credential_load,
273            } => {
274                let op = s3_config_build(config, customized_credential_load, path)?;
275                let op_info = op.info();
276
277                // Check prefix of s3 path.
278                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
279                if path.starts_with(&prefix) {
280                    (op, &path[prefix.len()..])
281                } else {
282                    return Err(Error::new(
283                        ErrorKind::DataInvalid,
284                        format!("Invalid s3 url: {path}, should start with {prefix}"),
285                    ));
286                }
287            }
288            #[cfg(feature = "opendal-gcs")]
289            OpenDalStorage::Gcs { config } => {
290                let operator = gcs_config_build(config, path)?;
291                let prefix = format!("gs://{}/", operator.info().name());
292                if path.starts_with(&prefix) {
293                    (operator, &path[prefix.len()..])
294                } else {
295                    return Err(Error::new(
296                        ErrorKind::DataInvalid,
297                        format!("Invalid gcs url: {path}, should start with {prefix}"),
298                    ));
299                }
300            }
301            #[cfg(feature = "opendal-oss")]
302            OpenDalStorage::Oss { config } => {
303                let op = oss_config_build(config, path)?;
304                let prefix = format!("oss://{}/", op.info().name());
305                if path.starts_with(&prefix) {
306                    (op, &path[prefix.len()..])
307                } else {
308                    return Err(Error::new(
309                        ErrorKind::DataInvalid,
310                        format!("Invalid oss url: {path}, should start with {prefix}"),
311                    ));
312                }
313            }
314            #[cfg(feature = "opendal-azdls")]
315            OpenDalStorage::Azdls {
316                configured_scheme,
317                config,
318            } => azdls_create_operator(path, config, configured_scheme)?,
319            #[cfg(all(
320                not(feature = "opendal-s3"),
321                not(feature = "opendal-fs"),
322                not(feature = "opendal-gcs"),
323                not(feature = "opendal-oss"),
324                not(feature = "opendal-azdls"),
325            ))]
326            _ => {
327                return Err(Error::new(
328                    ErrorKind::FeatureUnsupported,
329                    "No storage service has been enabled",
330                ));
331            }
332        };
333
334        // Transient errors are common for object stores; however there's no
335        // harm in retrying temporary failures for other storage backends as well.
336        let operator = operator.layer(RetryLayer::new());
337        Ok((operator, relative_path))
338    }
339}
340
341#[typetag::serde(name = "OpenDalStorage")]
342#[async_trait]
343impl Storage for OpenDalStorage {
344    async fn exists(&self, path: &str) -> Result<bool> {
345        let (op, relative_path) = self.create_operator(&path)?;
346        Ok(op.exists(relative_path).await.map_err(from_opendal_error)?)
347    }
348
349    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
350        let (op, relative_path) = self.create_operator(&path)?;
351        let meta = op.stat(relative_path).await.map_err(from_opendal_error)?;
352        Ok(FileMetadata {
353            size: meta.content_length(),
354        })
355    }
356
357    async fn read(&self, path: &str) -> Result<Bytes> {
358        let (op, relative_path) = self.create_operator(&path)?;
359        Ok(op
360            .read(relative_path)
361            .await
362            .map_err(from_opendal_error)?
363            .to_bytes())
364    }
365
366    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
367        let (op, relative_path) = self.create_operator(&path)?;
368        Ok(Box::new(OpenDalReader(
369            op.reader(relative_path).await.map_err(from_opendal_error)?,
370        )))
371    }
372
373    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
374        let (op, relative_path) = self.create_operator(&path)?;
375        op.write(relative_path, bs)
376            .await
377            .map_err(from_opendal_error)?;
378        Ok(())
379    }
380
381    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
382        let (op, relative_path) = self.create_operator(&path)?;
383        Ok(Box::new(OpenDalWriter(
384            op.writer(relative_path).await.map_err(from_opendal_error)?,
385        )))
386    }
387
388    async fn delete(&self, path: &str) -> Result<()> {
389        let (op, relative_path) = self.create_operator(&path)?;
390        Ok(op.delete(relative_path).await.map_err(from_opendal_error)?)
391    }
392
393    async fn delete_prefix(&self, path: &str) -> Result<()> {
394        let (op, relative_path) = self.create_operator(&path)?;
395        let path = if relative_path.ends_with('/') {
396            relative_path.to_string()
397        } else {
398            format!("{relative_path}/")
399        };
400        Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
401    }
402
403    #[allow(unreachable_code, unused_variables)]
404    fn new_input(&self, path: &str) -> Result<InputFile> {
405        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
406    }
407
408    #[allow(unreachable_code, unused_variables)]
409    fn new_output(&self, path: &str) -> Result<OutputFile> {
410        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
411    }
412}
413
414// Newtype wrappers for opendal types to satisfy orphan rules.
415// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's
416// Reader/Writer since neither trait nor type is defined in this crate.
417
418/// Wrapper around `opendal::Reader` that implements `FileRead`.
419pub(crate) struct OpenDalReader(pub(crate) opendal::Reader);
420
421#[async_trait]
422impl FileRead for OpenDalReader {
423    async fn read(&self, range: std::ops::Range<u64>) -> Result<Bytes> {
424        Ok(opendal::Reader::read(&self.0, range)
425            .await
426            .map_err(from_opendal_error)?
427            .to_bytes())
428    }
429}
430
431/// Wrapper around `opendal::Writer` that implements `FileWrite`.
432pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer);
433
434#[async_trait]
435impl FileWrite for OpenDalWriter {
436    async fn write(&mut self, bs: Bytes) -> Result<()> {
437        Ok(opendal::Writer::write(&mut self.0, bs)
438            .await
439            .map_err(from_opendal_error)?)
440    }
441
442    async fn close(&mut self) -> Result<()> {
443        let _ = opendal::Writer::close(&mut self.0)
444            .await
445            .map_err(from_opendal_error)?;
446        Ok(())
447    }
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// The serde default operator must be backed by the memory service.
    #[cfg(feature = "opendal-memory")]
    #[test]
    fn test_default_memory_operator() {
        let operator = default_memory_operator();
        let scheme = operator.info().scheme().to_string();
        assert_eq!(scheme, "memory");
    }
}