1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! Object storage backend abstraction layer for Delta Table transaction logs and data

use std::fmt::Debug;
use std::pin::Pin;

use chrono::{DateTime, Utc};
use futures::Stream;

#[cfg(feature = "azure")]
use azure_core::errors::AzureError;
#[cfg(feature = "azure")]
use std::error::Error;

#[cfg(feature = "azure")]
pub mod azure;
pub mod file;
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
pub mod s3;

/// Error enum that represents an invalid URI.
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum UriError {
    /// Error returned when the URI contains a scheme that is not handled.
    #[error("Invalid URI scheme: {0}")]
    InvalidScheme(String),
    /// Error returned when a local file system path is expected, but the URI is not a local file system path.
    #[error("Expected local path URI, found: {0}")]
    ExpectedSLocalPathUri(String),

    /// Error returned when the URI is expected to be an S3 path, but does not include a bucket part.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Object URI missing bucket")]
    MissingObjectBucket,
    /// Error returned when the URI is expected to be an S3 path, but does not include a key part.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Object URI missing key")]
    MissingObjectKey,
    /// Error returned when an S3 path is expected, but the URI is not an S3 URI.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Expected S3 URI, found: {0}")]
    ExpectedS3Uri(String),

    /// Error returned when an Azure URI is expected, but the URI is not an Azure file system
    /// (abfs\[s\]) URI.
    #[cfg(feature = "azure")]
    #[error("Expected Azure URI, found: {0}")]
    ExpectedAzureUri(String),
    /// Error returned when an Azure URI is expected, but the URI is missing the scheme.
    #[cfg(feature = "azure")]
    #[error("Object URI missing filesystem")]
    MissingObjectFileSystem,
    /// Error returned when an Azure URI is expected, but the URI is missing the account name and
    /// path.
    #[cfg(feature = "azure")]
    #[error("Object URI missing account name and path")]
    MissingObjectAccountAndPath,
    /// Error returned when an Azure URI is expected, but the URI is missing the account name.
    #[cfg(feature = "azure")]
    #[error("Object URI missing account name")]
    MissingObjectAccountName,
    /// Error returned when an Azure URI is expected, but the URI is missing the path.
    #[cfg(feature = "azure")]
    #[error("Object URI missing path")]
    MissingObjectPath,
    /// Error returned when container in an Azure URI doesn't match the expected value
    #[cfg(feature = "azure")]
    #[error("Container mismatch, expected: {expected}, got: {got}")]
    ContainerMismatch {
        /// Expected container value
        expected: String,
        /// Actual container value
        got: String,
    },
}

/// Enum with variants representing each supported storage backend.
#[derive(Debug)]
pub enum Uri<'a> {
    /// URI for local file system backend.
    LocalPath(&'a str),
    /// URI for S3 backend.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    S3Object(s3::S3Object<'a>),
    /// URI for Azure backend.
    #[cfg(feature = "azure")]
    AdlsGen2Object(azure::AdlsGen2Object<'a>),
}

impl<'a> Uri<'a> {
    /// Converts the URI to an S3Object. Returns UriError if the URI is not valid for the S3
    /// backend.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    pub fn into_s3object(self) -> Result<s3::S3Object<'a>, UriError> {
        match self {
            Uri::S3Object(x) => Ok(x),
            #[cfg(feature = "azure")]
            Uri::AdlsGen2Object(x) => Err(UriError::ExpectedS3Uri(x.to_string())),
            Uri::LocalPath(x) => Err(UriError::ExpectedS3Uri(x.to_string())),
        }
    }

    /// Converts the URI to an AdlsGen2Object. Returns UriError if the URI is not valid for the
    /// Azure backend.
    #[cfg(feature = "azure")]
    pub fn into_adlsgen2_object(self) -> Result<azure::AdlsGen2Object<'a>, UriError> {
        match self {
            Uri::AdlsGen2Object(x) => Ok(x),
            #[cfg(any(feature = "s3", feature = "s3-rustls"))]
            Uri::S3Object(x) => Err(UriError::ExpectedAzureUri(x.to_string())),
            Uri::LocalPath(x) => Err(UriError::ExpectedAzureUri(x.to_string())),
        }
    }

    /// Converts the URI to an str representing a local file system path. Returns UriError if the
    /// URI is not valid for the file storage backend.
    pub fn into_localpath(self) -> Result<&'a str, UriError> {
        match self {
            Uri::LocalPath(x) => Ok(x),
            #[cfg(any(feature = "s3", feature = "s3-rustls"))]
            Uri::S3Object(x) => Err(UriError::ExpectedSLocalPathUri(format!("{}", x))),
            #[cfg(feature = "azure")]
            Uri::AdlsGen2Object(x) => Err(UriError::ExpectedSLocalPathUri(format!("{}", x))),
        }
    }

    /// Return URI path component as String
    #[inline]
    pub fn path(&self) -> String {
        match self {
            Uri::LocalPath(x) => x.to_string(),
            #[cfg(any(feature = "s3", feature = "s3-rustls"))]
            Uri::S3Object(x) => x.key.to_string(),
            #[cfg(feature = "azure")]
            Uri::AdlsGen2Object(x) => x.path.to_string(),
        }
    }
}

/// Parses the URI and returns a variant of the Uri enum for the appropriate storage backend based
/// on scheme.
pub fn parse_uri<'a>(path: &'a str) -> Result<Uri<'a>, UriError> {
    let parts: Vec<&'a str> = path.split("://").collect();

    if parts.len() == 1 {
        return Ok(Uri::LocalPath(parts[0]));
    }

    match parts[0] {
        "s3" => {
            cfg_if::cfg_if! {
                if #[cfg(any(feature = "s3", feature = "s3-rustls"))] {
                    let mut path_parts = parts[1].splitn(2, '/');
                    let bucket = match path_parts.next() {
                        Some(x) => x,
                        None => {
                            return Err(UriError::MissingObjectBucket);
                        }
                    };
                    let key = match path_parts.next() {
                        Some(x) => x,
                        None => {
                            return Err(UriError::MissingObjectKey);
                        }
                    };

                    Ok(Uri::S3Object(s3::S3Object { bucket, key }))
                } else {
                    Err(UriError::InvalidScheme(String::from(parts[0])))
                }
            }
        }
        "file" => Ok(Uri::LocalPath(parts[1])),
        "abfss" => {
            cfg_if::cfg_if! {
                if #[cfg(feature = "azure")] {
                    // URI scheme: abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>/<file_name>
                    let mut parts = parts[1].splitn(2, '@');
                    let file_system = parts.next().ok_or(UriError::MissingObjectFileSystem)?;
                    let mut parts = parts.next().map(|x| x.splitn(2, '.')).ok_or(UriError::MissingObjectAccountAndPath)?;
                    let account_name = parts.next().ok_or(UriError::MissingObjectAccountName)?;
                    let mut paths = parts.next().map(|x| x.splitn(2, '/')).ok_or(UriError::MissingObjectPath)?;
                    // assume root when uri ends without `/`
                    let path = paths.nth(1).unwrap_or("");
                    Ok(Uri::AdlsGen2Object(azure::AdlsGen2Object { account_name, file_system, path }))
                } else {
                    Err(UriError::InvalidScheme(String::from(parts[0])))
                }
            }
        }
        _ => Err(UriError::InvalidScheme(String::from(parts[0]))),
    }
}

/// Error enum returned when storage backend interaction fails.
#[derive(thiserror::Error, Debug)]
pub enum StorageError {
    /// The requested object does not exist.
    #[error("Object not found")]
    NotFound,
    /// The object written to the storage backend already exists.
    /// This error is expected in some cases.
    /// For example, optimistic concurrency writes between multiple processes expect to compete
    /// for the same URI when writing to _delta_log.
    #[error("Object exists already at path: {0}")]
    AlreadyExists(String),
    /// An IO error occurred while reading from the local file system.
    #[error("Failed to read local object content: {source}")]
    Io {
        /// The raw error returned when trying to read the local file.
        source: std::io::Error,
    },
    /// The file system represented by the scheme is not known.
    #[error("File system not supported")]
    FileSystemNotSupported,
    /// Wraps a generic storage backend error. The wrapped string contains the details.
    #[error("Generic error: {0}")]
    Generic(String),

    /// Error representing an S3 GET failure.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to read S3 object content: {source}")]
    S3Get {
        /// The underlying Rusoto S3 error.
        source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
    },
    /// Error representing a failure when executing an S3 HEAD request.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to read S3 object metadata: {source}")]
    S3Head {
        /// The underlying Rusoto S3 error.
        source: rusoto_core::RusotoError<rusoto_s3::HeadObjectError>,
    },
    /// Error representing a failure when executing an S3 list operation.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to list S3 objects: {source}")]
    S3List {
        /// The underlying Rusoto S3 error.
        source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
    },
    /// Error representing a failure when executing an S3 PUT request.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to put S3 object: {source}")]
    S3Put {
        /// The underlying Rusoto S3 error.
        source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
    },
    /// Error returned when an S3 response for a requested URI does not include body bytes.
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to delete S3 object: {source}")]
    S3Delete {
        /// The underlying Rusoto S3 error.
        #[from]
        source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
    },
    /// Error representing a failure when copying a S3 object
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("Failed to copy S3 object: {source}")]
    S3Copy {
        /// The underlying Rusoto S3 error.
        source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>,
    },
    /// Error returned when S3 object get response contains empty body
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    #[error("S3 Object missing body content: {0}")]
    S3MissingObjectBody(String),
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    /// Represents a generic S3 error. The wrapped error string describes the details.
    #[error("S3 error: {0}")]
    S3Generic(String),
    #[cfg(any(feature = "s3", feature = "s3-rustls"))]
    /// Wraps the DynamoDB error
    #[error("DynamoDB error: {source}")]
    DynamoDb {
        /// Wrapped DynamoDB error
        #[from]
        source: s3::dynamodb_lock::DynamoError,
    },

    /// Azure error
    #[cfg(feature = "azure")]
    #[error("Error interacting with Azure: {source}")]
    Azure {
        /// Azure error reason
        source: AzureError,
    },
    /// Generic Azure error
    #[cfg(feature = "azure")]
    #[error("Generic error: {source}")]
    AzureGeneric {
        /// Generic Azure error reason
        source: Box<dyn Error + Sync + std::marker::Send>,
    },
    /// Azure config error
    #[cfg(feature = "azure")]
    #[error("Azure config error: {0}")]
    AzureConfig(String),

    /// Error returned when the URI is invalid.
    /// The wrapped UriError contains additional details.
    #[error("Invalid object URI")]
    Uri {
        #[from]
        /// Uri error details when the URI is invalid.
        source: UriError,
    },
}

impl StorageError {
    /// Creates a StorageError::Io error wrapping the provided error string.
    pub fn other_std_io_err(desc: String) -> Self {
        Self::Io {
            source: std::io::Error::new(std::io::ErrorKind::Other, desc),
        }
    }
}

impl From<std::io::Error> for StorageError {
    fn from(error: std::io::Error) -> Self {
        match error.kind() {
            std::io::ErrorKind::NotFound => StorageError::NotFound,
            _ => StorageError::Io { source: error },
        }
    }
}

#[cfg(feature = "azure")]
impl From<AzureError> for StorageError {
    fn from(error: AzureError) -> Self {
        match error {
            AzureError::UnexpectedHTTPResult(e) if e.status_code().as_u16() == 404 => {
                StorageError::NotFound
            }
            _ => StorageError::Azure { source: error },
        }
    }
}

/// Describes metadata of a storage object.
pub struct ObjectMeta {
    /// The path where the object is stored. This is the path component of the object URI.
    ///
    /// For example:
    ///   * path for `s3://bucket/foo/bar` should be `foo/bar`.
    ///   * path for `dir/foo/bar` should be `dir/foo/bar`.
    ///
    /// Given a table URI, object URI can be constructed by joining table URI with object path.
    pub path: String,
    /// The last time the object was modified in the storage backend.
    // The timestamp of a commit comes from the remote storage `lastModifiedTime`, and can be
    // adjusted for clock skew.
    pub modified: DateTime<Utc>,
}

/// Abstractions for underlying blob storages hosting the Delta table. To add support for new cloud
/// or local storage systems, simply implement this trait.
#[async_trait::async_trait]
pub trait StorageBackend: Send + Sync + Debug {
    /// Create a new path by appending `path_to_join` as a new component to `path`.
    #[inline]
    fn join_path(&self, path: &str, path_to_join: &str) -> String {
        let normalized_path = path.trim_end_matches('/');
        format!("{}/{}", normalized_path, path_to_join)
    }

    /// More efficient path join for multiple path components. Use this method if you need to
    /// combine more than two path components.
    #[inline]
    fn join_paths(&self, paths: &[&str]) -> String {
        paths
            .iter()
            .map(|s| s.trim_end_matches('/'))
            .collect::<Vec<_>>()
            .join("/")
    }

    /// Returns trimed path with trailing path separator removed.
    #[inline]
    fn trim_path(&self, path: &str) -> String {
        path.trim_end_matches('/').to_string()
    }

    /// Fetch object metadata without reading the actual content
    async fn head_obj(&self, path: &str) -> Result<ObjectMeta, StorageError>;

    /// Fetch object content
    async fn get_obj(&self, path: &str) -> Result<Vec<u8>, StorageError>;

    /// Return a list of objects by `path` prefix in an async stream.
    async fn list_objs<'a>(
        &'a self,
        path: &'a str,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
        StorageError,
    >;

    /// Create new object with `obj_bytes` as content.
    async fn put_obj(&self, path: &str, obj_bytes: &[u8]) -> Result<(), StorageError>;

    /// Moves object from `src` to `dst`.
    ///
    /// Implementation note:
    ///
    /// For a multi-writer safe backend, `rename_obj` needs to implement `atomic rename` semantic.
    /// In other words, if the destination path already exists, rename should return a
    /// [StorageError::AlreadyExists] error.
    async fn rename_obj(&self, src: &str, dst: &str) -> Result<(), StorageError>;

    /// Deletes object by `path`.
    async fn delete_obj(&self, path: &str) -> Result<(), StorageError>;
}

/// Dynamically construct a Storage backend trait object based on scheme for provided URI
pub fn get_backend_for_uri(uri: &str) -> Result<Box<dyn StorageBackend>, StorageError> {
    match parse_uri(uri)? {
        Uri::LocalPath(root) => Ok(Box::new(file::FileStorageBackend::new(root))),
        #[cfg(any(feature = "s3", feature = "s3-rustls"))]
        Uri::S3Object(_) => Ok(Box::new(s3::S3StorageBackend::new()?)),
        #[cfg(feature = "azure")]
        Uri::AdlsGen2Object(obj) => Ok(Box::new(azure::AdlsGen2Backend::new(obj.file_system)?)),
    }
}