stately_arrow/object_store.rs

//! Object store backend for cloud and local filesystem access.
//!
//! This module provides connectivity to object storage systems including:
//!
//! - **AWS S3** - Amazon Simple Storage Service
//! - **Google Cloud Storage** - GCP object storage
//! - **Azure Blob Storage** - Microsoft Azure storage
//! - **Local filesystem** - For development and local data access
//!
//! # Configuration
//!
//! Use [`Config`] to specify the storage provider and file format:
//!
//! ```ignore
//! use stately_arrow::object_store::{Config, ObjectStore, ObjectStoreFormat, ObjectStoreOptions};
//!
//! let config = Config {
//!     format: ObjectStoreFormat::Parquet(None),
//!     store: ObjectStore::Aws(ObjectStoreOptions {
//!         bucket: "my-bucket".into(),
//!         from_env: true,
//!         ..Default::default()
//!     }),
//! };
//! ```
//!
//! # Credential Resolution
//!
//! When [`ObjectStoreOptions::from_env`] is enabled, cloud providers resolve credentials from
//! environment variables:
//!
//! - **AWS**: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`
//! - **GCP**: `GOOGLE_SERVICE_ACCOUNT` or application default credentials
//! - **Azure**: `AZURE_STORAGE_ACCOUNT_NAME`, `AZURE_STORAGE_ACCOUNT_KEY`
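//!
//! As a sketch, opting into environment-based credentials is done per store via
//! [`ObjectStoreOptions::from_env`]; the bucket name below is illustrative:
//!
//! ```ignore
//! use stately_arrow::object_store::{Config, ObjectStore, ObjectStoreFormat, ObjectStoreOptions};
//!
//! // Assumes GOOGLE_SERVICE_ACCOUNT (or application default credentials) is available.
//! let config = Config {
//!     format: ObjectStoreFormat::default(),
//!     store: ObjectStore::Gcp(ObjectStoreOptions {
//!         bucket: "analytics-data".into(),
//!         from_env: true,
//!         ..Default::default()
//!     }),
//! };
//! ```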
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock};

use async_trait::async_trait;
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use futures_util::TryStreamExt;
use object_store::ObjectStore as ObjectStoreClient;
use object_store::aws::AmazonS3Builder;
use object_store::azure::MicrosoftAzureBuilder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::local::LocalFileSystem;
use object_store::path::Path as ObjectStorePath;
use serde::{Deserialize, Serialize};
use tracing::{debug, error};

use crate::backend::{Backend, BackendMetadata, Capability, ConnectionKind, ConnectionMetadata};
use crate::error::{Error, Result};
use crate::response::{ListSummary, TableSummary};

const IGNORE_FILES: &[&str] = &[".DS_Store", ".git", ".env"];

static OBJECT_STORE_METADATA: LazyLock<BackendMetadata> = LazyLock::new(|| BackendMetadata {
    kind:         ConnectionKind::ObjectStore,
    capabilities: vec![Capability::ExecuteSql, Capability::List],
});

/// Configuration for an object store-backed connector.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[schema(as = ObjectStoreConfiguration)]
pub struct Config {
    /// The format to read/write within the store.
    #[serde(default)]
    pub format: ObjectStoreFormat,
    /// Provider-specific configuration for the store itself.
    #[serde(default)]
    pub store:  ObjectStore,
}

/// Supported file formats for object-store connectors.
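///
/// # Examples
///
/// A sketch of passing per-format overrides; the key/value pair shown is illustrative.
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// let format = ObjectStoreFormat::Parquet(Some(BTreeMap::from([
///     ("compression".to_string(), "zstd".to_string()),
/// ])));
/// ```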
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
#[schema(as = ObjectStoreFormat)]
#[serde(rename_all = "snake_case")]
pub enum ObjectStoreFormat {
    /// Apache Parquet format with optional key/value overrides.
    #[schema(value_type = Option<BTreeMap<String, String>>)]
    Parquet(#[serde(default)] Option<BTreeMap<String, String>>),
}

impl Default for ObjectStoreFormat {
    fn default() -> Self { ObjectStoreFormat::Parquet(None) }
}

/// Provider-agnostic object store settings.
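///
/// # Examples
///
/// A minimal sketch; `options` entries are forwarded to the provider's builder, so recognized
/// key names depend on the provider and the one shown here is illustrative.
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// let opts = ObjectStoreOptions {
///     bucket: "my-bucket".into(),
///     from_env: true,
///     options: BTreeMap::from([("region".to_string(), "us-east-1".to_string())]),
/// };
/// ```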
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
pub struct ObjectStoreOptions {
    /// *Required* bucket name (or base directory for local stores).
    #[serde(default)]
    #[schema(required)]
    pub bucket:   String,
    /// Whether credentials should be resolved from environment variables.
    #[serde(default)]
    pub from_env: bool,
    /// Additional provider-specific configuration parameters.
    #[serde(default)]
    #[schema(value_type = BTreeMap<String, String>)]
    pub options:  BTreeMap<String, String>,
}

impl ObjectStoreOptions {
    /// Look up a configuration option by key.
    pub fn get(&self, key: &str) -> Option<&String> { self.options.get(key) }
}

/// Supported object store providers.
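///
/// # Examples
///
/// A sketch of the serialized shape, assuming JSON via `serde_json`; variants are externally
/// tagged in `snake_case`, and aliases such as `"s3"` are accepted:
///
/// ```ignore
/// let store: ObjectStore = serde_json::from_str(r#"{"s3": {"bucket": "my-bucket"}}"#).unwrap();
/// ```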
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
#[schema(as = ObjectStore)]
#[serde(rename_all = "snake_case")]
pub enum ObjectStore {
    /// Amazon S3.
    #[serde(alias = "s3", alias = "aws")]
    #[cfg_attr(feature = "strum", strum(serialize = "s3"))]
    Aws(ObjectStoreOptions),
    /// Google Cloud Storage.
    #[serde(alias = "gcs", alias = "google", alias = "gcp")]
    #[cfg_attr(feature = "strum", strum(serialize = "gs"))]
    Gcp(ObjectStoreOptions),
    /// Azure Blob Storage.
    #[serde(alias = "azure", alias = "microsoft")]
    #[cfg_attr(feature = "strum", strum(serialize = "az"))]
    Azure(ObjectStoreOptions),
    /// Local filesystem.
    #[serde(alias = "local", alias = "file")]
    #[cfg_attr(feature = "strum", strum(serialize = "file"))]
    Local(ObjectStoreOptions),
}

impl ObjectStore {
    /// Return the bucket (or fully qualified path for local stores).
    pub fn bucket(&self) -> String {
        match self {
            ObjectStore::Aws(settings)
            | ObjectStore::Gcp(settings)
            | ObjectStore::Azure(settings) => settings.bucket.clone(),
            ObjectStore::Local(settings) => {
                let root = settings
                    .options
                    .get("root")
                    .or_else(|| settings.options.get("path"))
                    .or_else(|| settings.options.get("base_path"))
                    .or_else(|| settings.options.get("base_dir"));
                match root {
                    Some(path) => format!("{path}/{}", settings.bucket),
                    None => settings.bucket.clone(),
                }
            }
        }
    }

    /// Produce a canonical URL identifier for the bucket/path.
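    ///
    /// For example (a sketch using an illustrative bucket name):
    ///
    /// ```ignore
    /// let store = ObjectStore::Aws(ObjectStoreOptions {
    ///     bucket: "my-bucket".into(),
    ///     ..Default::default()
    /// });
    /// assert_eq!(store.url(), "s3://my-bucket");
    /// ```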
    pub fn url(&self) -> String {
        let scheme = match self {
            ObjectStore::Aws(_) => "s3",
            ObjectStore::Gcp(_) => "gs",
            ObjectStore::Azure(_) => "az",
            ObjectStore::Local(_) => "file",
        };
        format!("{scheme}://{}", self.bucket())
    }

    /// Access the underlying provider configuration.
    pub fn config(&self) -> &ObjectStoreOptions {
        match self {
            ObjectStore::Aws(cfg)
            | ObjectStore::Gcp(cfg)
            | ObjectStore::Azure(cfg)
            | ObjectStore::Local(cfg) => cfg,
        }
    }
}

impl Default for ObjectStore {
    fn default() -> Self { Self::Local(ObjectStoreOptions::default()) }
}

/// Default backend implementation for object store connectors.
pub struct ObjectStoreBackend {
    metadata:   ConnectionMetadata,
    store:      Arc<dyn ObjectStoreClient>,
    url:        ObjectStoreUrl,
    registered: Arc<AtomicBool>,
}

impl ObjectStoreBackend {
    /// Create a new object store backend.
    ///
    /// # Errors
    /// - If the object store creation fails.
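    ///
    /// # Examples
    ///
    /// A minimal sketch over a local directory; the identifier, name, and path are illustrative.
    ///
    /// ```ignore
    /// let config = Config {
    ///     format: ObjectStoreFormat::default(),
    ///     store: ObjectStore::Local(ObjectStoreOptions {
    ///         bucket: "./data".into(),
    ///         ..Default::default()
    ///     }),
    /// };
    /// let backend = ObjectStoreBackend::try_new("conn-1", "local-data", &config)?;
    /// ```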
    pub fn try_new(
        id: impl Into<String>,
        name: impl Into<String>,
        config: &Config,
    ) -> Result<Self> {
        let metadata = ConnectionMetadata {
            id:       id.into(),
            name:     name.into(),
            catalog:  Some(config.store.url()),
            metadata: OBJECT_STORE_METADATA.clone(),
        };

        let ObjectStoreRegistration { object_store: store, url, .. } =
            create_object_store(&config.store)
                .inspect_err(|error| error!(?error, "Failed to create object store"))?;
        let url = ObjectStoreUrl::parse(&url)
            .map_err(|e| Error::Internal(format!("Invalid bucket URL: {e}")))?;
        Ok(Self { metadata, store, url, registered: Arc::new(AtomicBool::new(false)) })
    }

    /// Static metadata describing the capabilities of object store backends.
    pub fn metadata() -> BackendMetadata { OBJECT_STORE_METADATA.clone() }
}

#[async_trait]
impl Backend for ObjectStoreBackend {
    fn connection(&self) -> &ConnectionMetadata { &self.metadata }

    async fn prepare_session(&self, session: &SessionContext) -> Result<()> {
        if self.registered.load(Ordering::Acquire) {
            debug!("Object store already registered, skipping registration");
            return Ok(());
        }

        let url = &self.url;
        let store = Arc::clone(&self.store);
        let previous = session.register_object_store(url.as_ref(), store);
        let overwrote = previous.is_some();
        debug!(url = url.as_str(), overwrote, "Registered object store with session");

        // Mark the connection as registered so subsequent sessions skip re-registration.
        self.registered.store(true, Ordering::Release);

        Ok(())
    }

    async fn list(&self, path: Option<&str>) -> Result<ListSummary> {
        let prefix = path
            .filter(|s| !s.is_empty())
            .map(|db| ObjectStorePath::from(db.trim_start_matches('/')));
        let object_metas = self
            .store
            .list(prefix.as_ref())
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .filter(|meta| {
                !IGNORE_FILES
                    .iter()
                    .any(|i| meta.location.filename().is_some_and(|f| f.starts_with(i)))
            })
            .map(|meta| (meta.location.to_string(), meta.size))
            .collect::<Vec<_>>();

        // If no path was given and every entry lives under a directory, treat the top-level
        // directories as "databases".
        let database_search = path.is_none()
            && object_metas.iter().all(|(location, _)| {
                location.contains('/') && !location.starts_with('/') && !location.ends_with('/')
            });

        Ok(if database_search {
            ListSummary::Paths(
                object_metas
                    .into_iter()
                    .filter_map(|(location, _)| location.split('/').next().map(ToString::to_string))
                    .collect::<Vec<_>>(),
            )
        } else {
            ListSummary::Files(
                object_metas
                    .into_iter()
                    .map(|(location, size)| {
                        // If the location starts with the search prefix, strip it; the prefix is
                        // already encoded in the UI.
                        let location =
                            if let Some(p) = path.as_ref().filter(|p| location.starts_with(*p)) {
                                location.strip_prefix(p).unwrap_or(&location).to_string()
                            } else {
                                location
                            };
                        (location, size)
                    })
                    .map(|(location, size)| TableSummary {
                        name:       location,
                        rows:       None,
                        size_bytes: Some(size),
                    })
                    .collect::<Vec<_>>(),
            )
        })
    }
}

/// Object store registration details: the created object store client, its canonical URL, and
/// the fully resolved path (if any).
pub struct ObjectStoreRegistration {
    /// The created object store client.
    pub object_store: Arc<dyn ObjectStoreClient>,
    /// Canonical URL identifying the store.
    pub url:          String,
    /// Fully resolved base path, populated for local filesystem stores.
    pub full_path:    Option<String>,
}

/// Create an object store based on the provided configuration.
///
/// # Errors
/// - If the object store cannot be created.
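///
/// # Examples
///
/// A minimal sketch for a local store; the directory name is illustrative.
///
/// ```ignore
/// let store = ObjectStore::Local(ObjectStoreOptions {
///     bucket: "./data".into(),
///     ..Default::default()
/// });
/// let ObjectStoreRegistration { object_store, url, full_path } = create_object_store(&store)?;
/// // `full_path` holds the resolved directory for local stores.
/// ```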
pub fn create_object_store(store: &ObjectStore) -> Result<ObjectStoreRegistration> {
    // Shared cloud-provider builder logic: start from environment-based or empty credentials,
    // set the bucket/container name, then apply any provider-specific options.
    macro_rules! build {
        ($url:expr, $conf:expr, $b:ident, $bn:ident) => {{
            let mut builder = if $conf.from_env { $b::from_env() } else { $b::new() }.$bn($url);
            for (key, value) in &$conf.options {
                builder = builder.with_config(key.parse()?, value);
            }
            Arc::new(builder.build()?) as Arc<dyn ObjectStoreClient>
        }};
    }

    let bucket = store.bucket();
    let url = store.url();
    let mut full_path = None;
    let object_store = match store {
        ObjectStore::Aws(s) => build!(bucket, s, AmazonS3Builder, with_bucket_name),
        ObjectStore::Gcp(s) => build!(bucket, s, GoogleCloudStorageBuilder, with_bucket_name),
        ObjectStore::Azure(s) => build!(bucket, s, MicrosoftAzureBuilder, with_container_name),
        ObjectStore::Local(_) => {
            // Resolve relative paths against the current working directory and ensure the
            // directory exists.
            let path = PathBuf::from(bucket);
            let path = if path.is_relative() {
                std::env::current_dir()
                    .map(|c| c.join(path))
                    .map_err(|e| Error::ObjectStoreCreate(e.to_string()))?
            } else {
                path
            };

            if let Err(e) = fs::create_dir_all(&path) {
                error!(?path, "Failed to prepare local object store directory");
                return Err(Error::ObjectStoreCreate(format!(
                    "Failed to create local store path `{}`: {e}",
                    path.display()
                )));
            }

            full_path = Some(path.to_string_lossy().to_string());
            Arc::new(LocalFileSystem::new_with_prefix(&path)?)
        }
    };

    Ok(ObjectStoreRegistration { object_store, url, full_path })
}