// scouter_dataframe/storage.rs

1use crate::caching_store::CachingStore;
2use crate::error::StorageError;
3use base64::prelude::*;
4use datafusion::prelude::{SessionConfig, SessionContext};
5use futures::TryStreamExt;
6use object_store::aws::{AmazonS3, AmazonS3Builder};
7use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
8use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
9use object_store::local::LocalFileSystem;
10use object_store::path::Path;
11use object_store::ClientOptions;
12use object_store::ObjectStore as ObjStore;
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::StorageType;
15use std::sync::Arc;
16use tracing::debug;
17use url::Url;
18
19/// HTTP client options for cloud object stores.
20///
21/// Enables TCP+TLS connection pooling so repeat queries reuse existing
22/// connections. Sized for high-concurrency GCS/S3 workloads where many
23/// parallel readers share the same host.
24fn cloud_client_options() -> ClientOptions {
25    ClientOptions::new()
26        .with_pool_idle_timeout(std::time::Duration::from_secs(120))
27        .with_pool_max_idle_per_host(64)
28        .with_timeout(std::time::Duration::from_secs(30))
29        .with_connect_timeout(std::time::Duration::from_secs(5))
30}
31
32/// Helper function to decode base64 encoded string
33fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
34    let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
35
36    Ok(String::from_utf8(decoded)?)
37}
38
39/// Storage provider enum for common object stores
40#[derive(Debug, Clone)]
41enum StorageProvider {
42    Google(Arc<CachingStore<GoogleCloudStorage>>),
43    Aws(Arc<CachingStore<AmazonS3>>),
44    Local(Arc<CachingStore<LocalFileSystem>>),
45    Azure(Arc<CachingStore<MicrosoftAzure>>),
46}
47
48impl StorageProvider {
49    /// Return the inner object store as a type-erased `Arc<dyn ObjectStore>`.
50    ///
51    /// Used by the Delta Lake engine to bypass the storage factory via
52    /// `DeltaTableBuilder::with_storage_backend` — necessary for cloud stores
53    /// (GCS, S3, Azure) whose schemes are not registered in the default factory.
54    pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
55        match self {
56            StorageProvider::Google(s) => s.clone() as Arc<dyn ObjStore>,
57            StorageProvider::Aws(s) => s.clone() as Arc<dyn ObjStore>,
58            StorageProvider::Local(s) => s.clone() as Arc<dyn ObjStore>,
59            StorageProvider::Azure(s) => s.clone() as Arc<dyn ObjStore>,
60        }
61    }
62
63    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
64        let cache_bytes = storage_settings.object_cache_mb() * 1024 * 1024;
65
66        let store = match storage_settings.storage_type {
67            StorageType::Google => {
68                let mut builder = GoogleCloudStorageBuilder::from_env();
69
70                // Try to use base64 credentials if available
71                if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
72                    let key = decode_base64_str(&base64_creds)?;
73                    builder = builder.with_service_account_key(&key);
74                    debug!("Using base64 encoded service account key for Google Cloud Storage");
75                }
76
77                // Add bucket name and build
78                let storage = builder
79                    .with_bucket_name(storage_settings.storage_root())
80                    .with_client_options(cloud_client_options())
81                    .build()?;
82
83                StorageProvider::Google(Arc::new(CachingStore::new(storage, cache_bytes)))
84            }
85            StorageType::Aws => {
86                let storage = AmazonS3Builder::from_env()
87                    .with_bucket_name(storage_settings.storage_root())
88                    .with_region(storage_settings.region.clone())
89                    .with_client_options(cloud_client_options())
90                    .build()?;
91                StorageProvider::Aws(Arc::new(CachingStore::new(storage, cache_bytes)))
92            }
93            StorageType::Local => {
94                let storage = LocalFileSystem::new();
95                StorageProvider::Local(Arc::new(CachingStore::new(storage, cache_bytes)))
96            }
97            StorageType::Azure => {
98                // MicrosoftAzureBuilder::from_env() reads AZURE_STORAGE_ACCOUNT_NAME
99                // and AZURE_STORAGE_ACCOUNT_KEY specifically.  Many Azure tools
100                // (az CLI, Terraform, GitHub Actions) emit AZURE_STORAGE_ACCOUNT and
101                // AZURE_STORAGE_KEY instead.  Accept both so callers don't need to
102                // know which naming convention object_store expects.
103                let mut builder = MicrosoftAzureBuilder::from_env();
104
105                if std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
106                    if let Ok(account) = std::env::var("AZURE_STORAGE_ACCOUNT") {
107                        builder = builder.with_account(account);
108                    }
109                }
110                if std::env::var("AZURE_STORAGE_ACCOUNT_KEY").is_err() {
111                    if let Ok(key) = std::env::var("AZURE_STORAGE_KEY") {
112                        builder = builder.with_access_key(key);
113                    }
114                }
115
116                let storage = builder
117                    .with_container_name(storage_settings.storage_root())
118                    .with_client_options(cloud_client_options())
119                    .build()?;
120
121                StorageProvider::Azure(Arc::new(CachingStore::new(storage, cache_bytes)))
122            }
123        };
124
125        Ok(store)
126    }
127
128    pub fn get_base_url(
129        &self,
130        storage_settings: &ObjectStorageSettings,
131    ) -> Result<Url, StorageError> {
132        match self {
133            StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
134            StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
135            StorageProvider::Local(_) => {
136                // Convert relative path to absolute path for local filesystem
137                let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
138                let absolute_path = if storage_path.is_absolute() {
139                    storage_path
140                } else {
141                    std::env::current_dir()?.join(storage_path)
142                };
143
144                // Create file:// URL with absolute path
145                let url = Url::from_file_path(&absolute_path).map_err(|_| {
146                    StorageError::InvalidUrl(format!(
147                        "Failed to create file URL from path: {:?}",
148                        absolute_path
149                    ))
150                })?;
151                Ok(url)
152            }
153            StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
154        }
155    }
156
157    /// Build the base `SessionConfig` used by all session constructors.
158    fn build_session_config() -> SessionConfig {
159        let mut config = SessionConfig::new()
160            .with_target_partitions(
161                std::thread::available_parallelism()
162                    .map(|n| n.get())
163                    .unwrap_or(4),
164            )
165            .with_batch_size(8192)
166            .with_prefer_existing_sort(true)
167            .with_parquet_pruning(true)
168            .with_collect_statistics(true);
169
170        // Push filter predicates into the Parquet reader so only matching rows are decoded,
171        // and reorder predicates by selectivity so bloom filters (trace_id, entity_id) are
172        // evaluated before range checks (start_time), short-circuiting row evaluation early.
173        config.options_mut().execution.parquet.pushdown_filters = true;
174        config.options_mut().execution.parquet.reorder_filters = true;
175
176        // ── Parquet read-path tuning (GCS latency reduction) ──────────────
177        //
178        // Read at least 1MB from the end of each Parquet file in a single request.
179        // Default is 512KB. Our files have bloom filters on trace_id + entity_id
180        // and page-level statistics on start_time + status_code, so footers are
181        // larger than average. 1MB captures footer + column/offset indexes in one
182        // GCS round-trip instead of the default multi-step chain, saving 1-2
183        // round-trips (~30-60ms each) per file.
184        config.options_mut().execution.parquet.metadata_size_hint = Some(1024 * 1024);
185
186        // Bloom filters are written on trace_id and entity_id — ensure the reader
187        // consults them before decoding row groups. (Default is true in DF 52, but
188        // we're explicit to guard against version changes.)
189        config.options_mut().execution.parquet.bloom_filter_on_read = true;
190
191        // Read Utf8 columns as Utf8View and Binary as BinaryView for zero-copy.
192        // Our schema already uses Utf8View/BinaryView — this ensures DataFusion
193        // doesn't downgrade them when reading back from Parquet.
194        config
195            .options_mut()
196            .execution
197            .parquet
198            .schema_force_view_types = true;
199
200        // ── Listing / metadata concurrency ───────────────────────────────
201        //
202        // Number of files to stat in parallel when inferring schema or listing
203        // a Delta table's backing Parquet files. Default is 32. On GCS each
204        // stat is a separate HTTP HEAD; higher concurrency hides the per-file
205        // latency behind parallelism. 64 matches our pool_max_idle_per_host.
206        config.options_mut().execution.meta_fetch_concurrency = 64;
207
208        // ── Write-path tuning ────────────────────────────────────────────
209        //
210        // Increase write-side parallelism so compaction and flush can encode
211        // multiple row groups concurrently, reducing wall-clock write latency.
212        config
213            .options_mut()
214            .execution
215            .parquet
216            .maximum_parallel_row_group_writers = 4;
217
218        // Buffer more decoded record batches per stream before back-pressure
219        // kicks in, smoothing out bursty reads from GCS.
220        config
221            .options_mut()
222            .execution
223            .parquet
224            .maximum_buffered_record_batches_per_stream = 8;
225
226        config
227    }
228
229    /// Create a `SessionContext` from a config and register the backing object store.
230    fn build_ctx(
231        &self,
232        storage_settings: &ObjectStorageSettings,
233        config: SessionConfig,
234    ) -> Result<SessionContext, StorageError> {
235        let ctx = SessionContext::new_with_config(config);
236        let base_url = self.get_base_url(storage_settings)?;
237
238        match self {
239            StorageProvider::Google(store) => {
240                ctx.register_object_store(&base_url, store.clone());
241            }
242            StorageProvider::Aws(store) => {
243                ctx.register_object_store(&base_url, store.clone());
244            }
245            StorageProvider::Local(store) => {
246                ctx.register_object_store(&base_url, store.clone());
247            }
248            StorageProvider::Azure(store) => {
249                ctx.register_object_store(&base_url, store.clone());
250            }
251        }
252
253        Ok(ctx)
254    }
255
256    pub fn get_session(
257        &self,
258        storage_settings: &ObjectStorageSettings,
259    ) -> Result<SessionContext, StorageError> {
260        let config = Self::build_session_config();
261        self.build_ctx(storage_settings, config)
262    }
263
264    /// Like `get_session()` but sets `catalog_name.schema_name` as the default catalog
265    /// for SQL name resolution. Unqualified table names in queries and `ctx.table(name)`
266    /// calls will resolve through the named catalog instead of the built-in `datafusion.public`.
267    pub fn get_session_with_catalog(
268        &self,
269        storage_settings: &ObjectStorageSettings,
270        catalog_name: &str,
271        schema_name: &str,
272    ) -> Result<SessionContext, StorageError> {
273        let config =
274            Self::build_session_config().with_default_catalog_and_schema(catalog_name, schema_name);
275        self.build_ctx(storage_settings, config)
276    }
277
278    /// List files in the object store
279    ///
280    /// # Arguments
281    /// * `path` - The path to list files from. If None, lists all files in the root.
282    ///
283    /// # Returns
284    /// * `Result<Vec<String>, StorageError>` - A result containing a vector of file paths or an error.
285    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
286        let stream = match self {
287            StorageProvider::Local(store) => store.list(path),
288            StorageProvider::Google(store) => store.list(path),
289            StorageProvider::Aws(store) => store.list(path),
290            StorageProvider::Azure(store) => store.list(path),
291        };
292
293        // Process each item in the stream
294        stream
295            .try_fold(Vec::new(), |mut files, meta| async move {
296                files.push(meta.location.to_string());
297                Ok(files)
298            })
299            .await
300            .map_err(Into::into)
301    }
302
303    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
304        match self {
305            StorageProvider::Local(store) => {
306                store.delete(path).await?;
307                Ok(())
308            }
309            StorageProvider::Google(store) => {
310                store.delete(path).await?;
311                Ok(())
312            }
313            StorageProvider::Aws(store) => {
314                store.delete(path).await?;
315                Ok(())
316            }
317            StorageProvider::Azure(store) => {
318                store.delete(path).await?;
319                Ok(())
320            }
321        }
322    }
323}
324
325#[derive(Debug, Clone)]
326pub struct ObjectStore {
327    provider: StorageProvider,
328    pub storage_settings: ObjectStorageSettings,
329}
330
331impl ObjectStore {
332    /// Creates a new ObjectStore instance.
333    ///
334    /// # Arguments
335    /// * `storage_settings` - The settings for the object storage.
336    ///
337    /// # Returns
338    /// * `Result<ObjectStore, StorageError>` - A result containing the ObjectStore instance or an error.
339    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
340        let store = StorageProvider::new(storage_settings)?;
341        Ok(ObjectStore {
342            provider: store,
343            storage_settings: storage_settings.clone(),
344        })
345    }
346
347    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
348        let ctx = self.provider.get_session(&self.storage_settings)?;
349        Ok(ctx)
350    }
351
352    /// Like `get_session()` but configures `catalog_name.schema_name` as the default catalog
353    /// so unqualified table names in SQL and `ctx.table(name)` resolve through the named catalog.
354    pub fn get_session_with_catalog(
355        &self,
356        catalog_name: &str,
357        schema_name: &str,
358    ) -> Result<SessionContext, StorageError> {
359        let ctx = self.provider.get_session_with_catalog(
360            &self.storage_settings,
361            catalog_name,
362            schema_name,
363        )?;
364        Ok(ctx)
365    }
366
367    /// Return the inner object store as a type-erased `Arc<dyn ObjectStore>`.
368    ///
369    /// Pass this to `DeltaTableBuilder::with_storage_backend` to bypass the Delta Lake
370    /// storage factory (required for GCS, S3, and Azure).
371    pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
372        self.provider.as_dyn_object_store()
373    }
374
375    /// Get the base URL for datafusion to use
376    pub fn get_base_url(&self) -> Result<Url, StorageError> {
377        self.provider.get_base_url(&self.storage_settings)
378    }
379
380    /// List files in the object store
381    ///
382    /// When path is None, lists from the root.
383    /// When path is provided, lists from that path.
384    ///
385    /// Note: The path parameter should NOT include the storage root - it's a relative path
386    /// that will be automatically combined with the storage root.
387    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
388        self.provider.list(path).await
389    }
390
391    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
392        self.provider.delete(path).await
393    }
394}