1use std::collections::BTreeMap;
36use std::fs;
37use std::path::PathBuf;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::{Arc, LazyLock};
40
41use async_trait::async_trait;
42use datafusion::execution::context::SessionContext;
43use datafusion::execution::object_store::ObjectStoreUrl;
44use futures_util::TryStreamExt;
45use object_store::ObjectStore as ObjectStoreClient;
46use object_store::aws::AmazonS3Builder;
47use object_store::azure::MicrosoftAzureBuilder;
48use object_store::gcp::GoogleCloudStorageBuilder;
49use object_store::local::LocalFileSystem;
50use object_store::path::Path as ObjectStorePath;
51use serde::{Deserialize, Serialize};
52use tracing::{debug, error};
53
54use crate::backend::{Backend, BackendMetadata, Capability, ConnectionKind, ConnectionMetadata};
55use crate::error::{Error, Result};
56use crate::response::{ListSummary, TableSummary};
57
/// File-name prefixes that are hidden from object listings.
///
/// An object is skipped when the final segment of its location starts with one of these entries.
const IGNORE_FILES: &[&str] = &[
    ".DS_Store",
    ".git",
    ".env",
];
59
60static OBJECT_STORE_METADATA: LazyLock<BackendMetadata> = LazyLock::new(|| BackendMetadata {
61 kind: ConnectionKind::ObjectStore,
62 capabilities: vec![Capability::ExecuteSql, Capability::List],
63});
64
65#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
67#[schema(as = ObjectStoreConfiguration)]
68pub struct Config {
69 #[serde(default)]
71 pub format: ObjectStoreFormat,
72 #[serde(default)]
74 pub store: ObjectStore,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
79#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
80#[schema(as = ObjectStoreFormat)]
81#[serde(rename_all = "snake_case")]
82pub enum ObjectStoreFormat {
83 #[schema(value_type = Option<BTreeMap<String, String>>)]
85 Parquet(#[serde(default)] Option<BTreeMap<String, String>>),
86}
87
88impl Default for ObjectStoreFormat {
89 fn default() -> Self { ObjectStoreFormat::Parquet(None) }
90}
91
92#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
94pub struct ObjectStoreOptions {
95 #[serde(default)]
97 #[schema(required)]
98 pub bucket: String,
99 #[serde(default)]
101 pub from_env: bool,
102 #[serde(default)]
104 #[schema(value_type = BTreeMap<String, String>)]
105 pub options: BTreeMap<String, String>,
106}
107
108impl ObjectStoreOptions {
109 pub fn get(&self, key: &str) -> Option<&String> { self.options.get(key) }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
115#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
116#[schema(as = ObjectStore)]
117#[serde(rename_all = "snake_case")]
118pub enum ObjectStore {
119 #[serde(alias = "s3", alias = "aws")]
120 #[cfg_attr(feature = "strum", strum(serialize = "s3"))]
121 Aws(ObjectStoreOptions),
122 #[serde(alias = "gcs", alias = "google", alias = "gcp")]
123 #[cfg_attr(feature = "strum", strum(serialize = "gs"))]
124 Gcp(ObjectStoreOptions),
125 #[serde(alias = "azure", alias = "microsoft")]
126 #[cfg_attr(feature = "strum", strum(serialize = "az"))]
127 Azure(ObjectStoreOptions),
128 #[serde(alias = "local", alias = "file")]
129 #[cfg_attr(feature = "strum", strum(serialize = "file"))]
130 Local(ObjectStoreOptions),
131}
132
133impl ObjectStore {
134 pub fn bucket(&self) -> String {
136 match self {
137 ObjectStore::Aws(settings)
138 | ObjectStore::Gcp(settings)
139 | ObjectStore::Azure(settings) => settings.bucket.clone(),
140 ObjectStore::Local(settings) => {
141 let root = settings
142 .options
143 .get("root")
144 .or_else(|| settings.options.get("path"))
145 .or_else(|| settings.options.get("base_path"))
146 .or_else(|| settings.options.get("base_dir"));
147 match root {
148 Some(path) => format!("{path}/{}", settings.bucket),
149 None => settings.bucket.clone(),
150 }
151 }
152 }
153 }
154
155 pub fn url(&self) -> String {
157 let scheme = match self {
158 ObjectStore::Aws(_) => "s3",
159 ObjectStore::Gcp(_) => "gs",
160 ObjectStore::Azure(_) => "az",
161 ObjectStore::Local(_) => "file",
162 };
163 format!("{scheme}://{}", self.bucket())
164 }
165
166 pub fn config(&self) -> &ObjectStoreOptions {
168 match self {
169 ObjectStore::Aws(cfg)
170 | ObjectStore::Gcp(cfg)
171 | ObjectStore::Azure(cfg)
172 | ObjectStore::Local(cfg) => cfg,
173 }
174 }
175}
176
177impl Default for ObjectStore {
178 fn default() -> Self { Self::Local(ObjectStoreOptions::default()) }
179}
180
181pub struct ObjectStoreBackend {
183 metadata: ConnectionMetadata,
184 store: Arc<dyn ObjectStoreClient>,
185 url: ObjectStoreUrl,
186 registered: Arc<AtomicBool>,
187}
188
189impl ObjectStoreBackend {
190 pub fn try_new(
195 id: impl Into<String>,
196 name: impl Into<String>,
197 config: &Config,
198 ) -> Result<Self> {
199 let metadata = ConnectionMetadata {
200 id: id.into(),
201 name: name.into(),
202 catalog: Some(config.store.url()),
203 metadata: OBJECT_STORE_METADATA.clone(),
204 };
205
206 let ObjectStoreRegistration { object_store: store, url, .. } =
207 create_object_store(&config.store)
208 .inspect_err(|error| error!(?error, "Failed to create object store"))?;
209 let url = ObjectStoreUrl::parse(&url)
210 .map_err(|e| Error::Internal(format!("Invalid bucket URL: {e}")))?;
211 Ok(Self { metadata, store, url, registered: Arc::new(AtomicBool::new(false)) })
212 }
213
214 pub fn metadata() -> BackendMetadata { OBJECT_STORE_METADATA.clone() }
215}
216
217#[async_trait]
218impl Backend for ObjectStoreBackend {
219 fn connection(&self) -> &ConnectionMetadata { &self.metadata }
220
221 async fn prepare_session(&self, session: &SessionContext) -> Result<()> {
222 if self.registered.load(Ordering::Acquire) {
223 debug!("Object store already registered, skipping registration");
224 return Ok(());
225 }
226
227 let url = &self.url;
228 let store = Arc::clone(&self.store);
229 let previous = session.register_object_store(url.as_ref(), store);
230 let overwrote = previous.is_some();
231 debug!(url = url.as_str(), overwrote, "Registered object store with session");
232
233 self.registered.store(true, Ordering::Release);
235
236 Ok(())
237 }
238
239 async fn list(&self, path: Option<&str>) -> Result<ListSummary> {
240 let prefix = path
241 .filter(|s| !s.is_empty())
242 .map(|db| ObjectStorePath::from(db.trim_start_matches('/')));
243 let object_metas = self
244 .store
245 .list(prefix.as_ref())
246 .try_collect::<Vec<_>>()
247 .await?
248 .into_iter()
249 .filter(|meta| {
250 !IGNORE_FILES
251 .iter()
252 .any(|i| meta.location.filename().is_some_and(|f| f.starts_with(i)))
253 })
254 .map(|meta| (meta.location.to_string(), meta.size))
255 .collect::<Vec<_>>();
256
257 let database_search = path.is_none()
259 && object_metas.iter().all(|(location, _)| {
260 location.contains('/') && !location.starts_with('/') && !location.ends_with('/')
261 });
262
263 Ok(if database_search {
264 ListSummary::Paths(
265 object_metas
266 .into_iter()
267 .filter_map(|(location, _)| location.split('/').next().map(ToString::to_string))
268 .collect::<Vec<_>>(),
269 )
270 } else {
271 ListSummary::Files(
272 object_metas
273 .into_iter()
274 .map(|(location, size)| {
275 let location =
278 if let Some(p) = path.as_ref().filter(|p| location.starts_with(*p)) {
279 location.strip_prefix(p).unwrap_or(&location).to_string()
280 } else {
281 location
282 };
283 (location, size)
284 })
285 .map(|(location, size)| TableSummary {
286 name: location,
287 rows: None,
288 size_bytes: Some(size),
289 })
290 .collect::<Vec<_>>(),
291 )
292 })
293 }
294}
295
296pub struct ObjectStoreRegistration {
299 pub object_store: Arc<dyn ObjectStoreClient>,
300 pub url: String,
301 pub full_path: Option<String>,
302}
303
304pub fn create_object_store(store: &ObjectStore) -> Result<ObjectStoreRegistration> {
309 macro_rules! build {
310 ($url:expr, $conf:expr, $b:ident, $bn:ident) => {{
311 let mut builder = if $conf.from_env { $b::from_env() } else { $b::new() }.$bn($url);
312 for (key, value) in &$conf.options {
313 builder = builder.with_config(key.parse()?, value);
314 }
315 Arc::new(builder.build()?) as Arc<dyn ObjectStoreClient>
316 }};
317 }
318
319 let bucket = store.bucket();
320 let url = store.url();
321 let mut full_path = None;
322 let object_store = match store {
323 ObjectStore::Aws(s) => build!(bucket, s, AmazonS3Builder, with_bucket_name),
324 ObjectStore::Gcp(s) => build!(bucket, s, GoogleCloudStorageBuilder, with_bucket_name),
325 ObjectStore::Azure(s) => build!(bucket, s, MicrosoftAzureBuilder, with_container_name),
326 ObjectStore::Local(_) => {
327 let path = PathBuf::from(bucket);
329 let path = if path.is_relative() {
330 std::env::current_dir()
331 .map(|c| c.join(path))
332 .map_err(|e| Error::ObjectStoreCreate(e.to_string()))?
333 } else {
334 path
335 };
336
337 if let Err(e) = fs::create_dir_all(&path) {
338 error!(?path, "Failed to prepare local object store directory");
339 return Err(Error::ObjectStoreCreate(format!(
340 "Failed to create local store path `{}`: {e}",
341 path.display()
342 )));
343 }
344
345 full_path = Some(path.to_string_lossy().to_string());
346 Arc::new(LocalFileSystem::new_with_prefix(&path)?)
347 }
348 };
349
350 Ok(ObjectStoreRegistration { object_store, url, full_path })
351}