// scouter_dataframe/storage.rs

1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::{SessionConfig, SessionContext};
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ClientOptions;
11use object_store::ObjectStore as ObjStore;
12use scouter_settings::ObjectStorageSettings;
13use scouter_types::StorageType;
14use std::sync::Arc;
15use tracing::debug;
16use url::Url;
17
18/// HTTP client options for cloud object stores.
19///
20/// Enables TCP+TLS connection pooling so repeat queries reuse existing
21/// connections
22fn cloud_client_options() -> ClientOptions {
23    ClientOptions::new()
24        .with_pool_idle_timeout(std::time::Duration::from_secs(90))
25        .with_pool_max_idle_per_host(16)
26}
27
28/// Helper function to decode base64 encoded string
29fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
30    let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
31
32    Ok(String::from_utf8(decoded)?)
33}
34
/// Storage provider enum for common object stores.
///
/// Each variant wraps its concrete `object_store` backend in an `Arc` so the
/// store can be cheaply cloned and type-erased (see `as_dyn_object_store`).
#[derive(Debug, Clone)]
enum StorageProvider {
    /// Google Cloud Storage backend.
    Google(Arc<GoogleCloudStorage>),
    /// Amazon S3 backend.
    Aws(Arc<AmazonS3>),
    /// Local filesystem backend.
    Local(Arc<LocalFileSystem>),
    /// Azure Blob Storage backend.
    Azure(Arc<MicrosoftAzure>),
}
43
44impl StorageProvider {
45    /// Return the inner object store as a type-erased `Arc<dyn ObjectStore>`.
46    ///
47    /// Used by the Delta Lake engine to bypass the storage factory via
48    /// `DeltaTableBuilder::with_storage_backend` — necessary for cloud stores
49    /// (GCS, S3, Azure) whose schemes are not registered in the default factory.
50    pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
51        match self {
52            StorageProvider::Google(s) => s.clone() as Arc<dyn ObjStore>,
53            StorageProvider::Aws(s) => s.clone() as Arc<dyn ObjStore>,
54            StorageProvider::Local(s) => s.clone() as Arc<dyn ObjStore>,
55            StorageProvider::Azure(s) => s.clone() as Arc<dyn ObjStore>,
56        }
57    }
58
59    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
60        let store = match storage_settings.storage_type {
61            StorageType::Google => {
62                let mut builder = GoogleCloudStorageBuilder::from_env();
63
64                // Try to use base64 credentials if available
65                if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
66                    let key = decode_base64_str(&base64_creds)?;
67                    builder = builder.with_service_account_key(&key);
68                    debug!("Using base64 encoded service account key for Google Cloud Storage");
69                }
70
71                // Add bucket name and build
72                let storage = builder
73                    .with_bucket_name(storage_settings.storage_root())
74                    .with_client_options(cloud_client_options())
75                    .build()?;
76
77                StorageProvider::Google(Arc::new(storage))
78            }
79            StorageType::Aws => {
80                let builder = AmazonS3Builder::from_env()
81                    .with_bucket_name(storage_settings.storage_root())
82                    .with_region(storage_settings.region.clone())
83                    .with_client_options(cloud_client_options())
84                    .build()?;
85                StorageProvider::Aws(Arc::new(builder))
86            }
87            StorageType::Local => {
88                // Create LocalFileSystem with the root path as the prefix
89                let builder = LocalFileSystem::new();
90                StorageProvider::Local(Arc::new(builder))
91            }
92            StorageType::Azure => {
93                // MicrosoftAzureBuilder::from_env() reads AZURE_STORAGE_ACCOUNT_NAME
94                // and AZURE_STORAGE_ACCOUNT_KEY specifically.  Many Azure tools
95                // (az CLI, Terraform, GitHub Actions) emit AZURE_STORAGE_ACCOUNT and
96                // AZURE_STORAGE_KEY instead.  Accept both so callers don't need to
97                // know which naming convention object_store expects.
98                let mut builder = MicrosoftAzureBuilder::from_env();
99
100                if std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
101                    if let Ok(account) = std::env::var("AZURE_STORAGE_ACCOUNT") {
102                        builder = builder.with_account(account);
103                    }
104                }
105                if std::env::var("AZURE_STORAGE_ACCOUNT_KEY").is_err() {
106                    if let Ok(key) = std::env::var("AZURE_STORAGE_KEY") {
107                        builder = builder.with_access_key(key);
108                    }
109                }
110
111                let store = builder
112                    .with_container_name(storage_settings.storage_root())
113                    .with_client_options(cloud_client_options())
114                    .build()?;
115
116                StorageProvider::Azure(Arc::new(store))
117            }
118        };
119
120        Ok(store)
121    }
122
123    pub fn get_base_url(
124        &self,
125        storage_settings: &ObjectStorageSettings,
126    ) -> Result<Url, StorageError> {
127        match self {
128            StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
129            StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
130            StorageProvider::Local(_) => {
131                // Convert relative path to absolute path for local filesystem
132                let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
133                let absolute_path = if storage_path.is_absolute() {
134                    storage_path
135                } else {
136                    std::env::current_dir()?.join(storage_path)
137                };
138
139                // Create file:// URL with absolute path
140                let url = Url::from_file_path(&absolute_path).map_err(|_| {
141                    StorageError::InvalidUrl(format!(
142                        "Failed to create file URL from path: {:?}",
143                        absolute_path
144                    ))
145                })?;
146                Ok(url)
147            }
148            StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
149        }
150    }
151
152    pub fn get_session(
153        &self,
154        storage_settings: &ObjectStorageSettings,
155    ) -> Result<SessionContext, StorageError> {
156        let mut config = SessionConfig::new()
157            .with_target_partitions(
158                std::thread::available_parallelism()
159                    .map(|n| n.get())
160                    .unwrap_or(4),
161            )
162            .with_batch_size(8192)
163            .with_prefer_existing_sort(true)
164            .with_parquet_pruning(true)
165            .with_collect_statistics(true);
166
167        // Push filter predicates into the Parquet reader so only matching rows are decoded,
168        // and reorder predicates by selectivity so bloom filters (trace_id, entity_id) are
169        // evaluated before range checks (start_time), short-circuiting row evaluation early.
170        config.options_mut().execution.parquet.pushdown_filters = true;
171        config.options_mut().execution.parquet.reorder_filters = true;
172
173        let ctx = SessionContext::new_with_config(config);
174        let base_url = self.get_base_url(storage_settings)?;
175
176        match self {
177            StorageProvider::Google(store) => {
178                ctx.register_object_store(&base_url, store.clone());
179            }
180            StorageProvider::Aws(store) => {
181                ctx.register_object_store(&base_url, store.clone());
182            }
183            StorageProvider::Local(store) => {
184                ctx.register_object_store(&base_url, store.clone());
185            }
186            StorageProvider::Azure(store) => {
187                ctx.register_object_store(&base_url, store.clone());
188            }
189        }
190
191        Ok(ctx)
192    }
193
194    /// List files in the object store
195    ///
196    /// # Arguments
197    /// * `path` - The path to list files from. If None, lists all files in the root.
198    ///
199    /// # Returns
200    /// * `Result<Vec<String>, StorageError>` - A result containing a vector of file paths or an error.
201    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
202        let stream = match self {
203            StorageProvider::Local(store) => store.list(path),
204            StorageProvider::Google(store) => store.list(path),
205            StorageProvider::Aws(store) => store.list(path),
206            StorageProvider::Azure(store) => store.list(path),
207        };
208
209        // Process each item in the stream
210        stream
211            .try_fold(Vec::new(), |mut files, meta| async move {
212                files.push(meta.location.to_string());
213                Ok(files)
214            })
215            .await
216            .map_err(Into::into)
217    }
218
219    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
220        match self {
221            StorageProvider::Local(store) => {
222                store.delete(path).await?;
223                Ok(())
224            }
225            StorageProvider::Google(store) => {
226                store.delete(path).await?;
227                Ok(())
228            }
229            StorageProvider::Aws(store) => {
230                store.delete(path).await?;
231                Ok(())
232            }
233            StorageProvider::Azure(store) => {
234                store.delete(path).await?;
235                Ok(())
236            }
237        }
238    }
239}
240
/// Public handle pairing a concrete [`StorageProvider`] with the settings
/// it was constructed from.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Backend-specific store (GCS, S3, Azure, or local filesystem).
    provider: StorageProvider,
    // Settings used to build the provider; also consulted when deriving
    // base URLs and sessions.
    pub storage_settings: ObjectStorageSettings,
}
246
247impl ObjectStore {
248    /// Creates a new ObjectStore instance.
249    ///
250    /// # Arguments
251    /// * `storage_settings` - The settings for the object storage.
252    ///
253    /// # Returns
254    /// * `Result<ObjectStore, StorageError>` - A result containing the ObjectStore instance or an error.
255    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
256        let store = StorageProvider::new(storage_settings)?;
257        Ok(ObjectStore {
258            provider: store,
259            storage_settings: storage_settings.clone(),
260        })
261    }
262
263    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
264        let ctx = self.provider.get_session(&self.storage_settings)?;
265        Ok(ctx)
266    }
267
268    /// Return the inner object store as a type-erased `Arc<dyn ObjectStore>`.
269    ///
270    /// Pass this to `DeltaTableBuilder::with_storage_backend` to bypass the Delta Lake
271    /// storage factory (required for GCS, S3, and Azure).
272    pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
273        self.provider.as_dyn_object_store()
274    }
275
276    /// Get the base URL for datafusion to use
277    pub fn get_base_url(&self) -> Result<Url, StorageError> {
278        self.provider.get_base_url(&self.storage_settings)
279    }
280
281    /// List files in the object store
282    ///
283    /// When path is None, lists from the root.
284    /// When path is provided, lists from that path.
285    ///
286    /// Note: The path parameter should NOT include the storage root - it's a relative path
287    /// that will be automatically combined with the storage root.
288    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
289        self.provider.list(path).await
290    }
291
292    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
293        self.provider.delete(path).await
294    }
295}