// scouter_dataframe/storage.rs
use crate::caching_store::CachingStore;
2use crate::error::StorageError;
3use base64::prelude::*;
4use datafusion::prelude::{SessionConfig, SessionContext};
5use futures::TryStreamExt;
6use object_store::aws::{AmazonS3, AmazonS3Builder};
7use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
8use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
9use object_store::local::LocalFileSystem;
10use object_store::path::Path;
11use object_store::ClientOptions;
12use object_store::ObjectStore as ObjStore;
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::StorageType;
15use std::sync::Arc;
16use tracing::debug;
17use url::Url;
18
19fn cloud_client_options() -> ClientOptions {
25 ClientOptions::new()
26 .with_pool_idle_timeout(std::time::Duration::from_secs(120))
27 .with_pool_max_idle_per_host(64)
28 .with_timeout(std::time::Duration::from_secs(30))
29 .with_connect_timeout(std::time::Duration::from_secs(5))
30}
31
32fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
34 let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
35
36 Ok(String::from_utf8(decoded)?)
37}
38
/// Backend-specific object store, each wrapped in a `CachingStore` layer
/// (see the `caching_store` module) and shared via `Arc` so clones are cheap
/// and the store can be handed to DataFusion as an `Arc<dyn ObjectStore>`.
#[derive(Debug, Clone)]
enum StorageProvider {
    /// Google Cloud Storage bucket.
    Google(Arc<CachingStore<GoogleCloudStorage>>),
    /// Amazon S3 bucket.
    Aws(Arc<CachingStore<AmazonS3>>),
    /// Local filesystem backend (no cloud credentials required).
    Local(Arc<CachingStore<LocalFileSystem>>),
    /// Azure Blob Storage container.
    Azure(Arc<CachingStore<MicrosoftAzure>>),
}
47
48impl StorageProvider {
49 pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
55 match self {
56 StorageProvider::Google(s) => s.clone() as Arc<dyn ObjStore>,
57 StorageProvider::Aws(s) => s.clone() as Arc<dyn ObjStore>,
58 StorageProvider::Local(s) => s.clone() as Arc<dyn ObjStore>,
59 StorageProvider::Azure(s) => s.clone() as Arc<dyn ObjStore>,
60 }
61 }
62
63 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
64 let cache_bytes = storage_settings.object_cache_mb() * 1024 * 1024;
65
66 let store = match storage_settings.storage_type {
67 StorageType::Google => {
68 let mut builder = GoogleCloudStorageBuilder::from_env();
69
70 if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
72 let key = decode_base64_str(&base64_creds)?;
73 builder = builder.with_service_account_key(&key);
74 debug!("Using base64 encoded service account key for Google Cloud Storage");
75 }
76
77 let storage = builder
79 .with_bucket_name(storage_settings.storage_root())
80 .with_client_options(cloud_client_options())
81 .build()?;
82
83 StorageProvider::Google(Arc::new(CachingStore::new(storage, cache_bytes)))
84 }
85 StorageType::Aws => {
86 let storage = AmazonS3Builder::from_env()
87 .with_bucket_name(storage_settings.storage_root())
88 .with_region(storage_settings.region.clone())
89 .with_client_options(cloud_client_options())
90 .build()?;
91 StorageProvider::Aws(Arc::new(CachingStore::new(storage, cache_bytes)))
92 }
93 StorageType::Local => {
94 let storage = LocalFileSystem::new();
95 StorageProvider::Local(Arc::new(CachingStore::new(storage, cache_bytes)))
96 }
97 StorageType::Azure => {
98 let mut builder = MicrosoftAzureBuilder::from_env();
104
105 if std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
106 if let Ok(account) = std::env::var("AZURE_STORAGE_ACCOUNT") {
107 builder = builder.with_account(account);
108 }
109 }
110 if std::env::var("AZURE_STORAGE_ACCOUNT_KEY").is_err() {
111 if let Ok(key) = std::env::var("AZURE_STORAGE_KEY") {
112 builder = builder.with_access_key(key);
113 }
114 }
115
116 let storage = builder
117 .with_container_name(storage_settings.storage_root())
118 .with_client_options(cloud_client_options())
119 .build()?;
120
121 StorageProvider::Azure(Arc::new(CachingStore::new(storage, cache_bytes)))
122 }
123 };
124
125 Ok(store)
126 }
127
128 pub fn get_base_url(
129 &self,
130 storage_settings: &ObjectStorageSettings,
131 ) -> Result<Url, StorageError> {
132 match self {
133 StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
134 StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
135 StorageProvider::Local(_) => {
136 let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
138 let absolute_path = if storage_path.is_absolute() {
139 storage_path
140 } else {
141 std::env::current_dir()?.join(storage_path)
142 };
143
144 let url = Url::from_file_path(&absolute_path).map_err(|_| {
146 StorageError::InvalidUrl(format!(
147 "Failed to create file URL from path: {:?}",
148 absolute_path
149 ))
150 })?;
151 Ok(url)
152 }
153 StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
154 }
155 }
156
157 fn build_session_config() -> SessionConfig {
159 let mut config = SessionConfig::new()
160 .with_target_partitions(
161 std::thread::available_parallelism()
162 .map(|n| n.get())
163 .unwrap_or(4),
164 )
165 .with_batch_size(8192)
166 .with_prefer_existing_sort(true)
167 .with_parquet_pruning(true)
168 .with_collect_statistics(true);
169
170 config.options_mut().execution.parquet.pushdown_filters = true;
174 config.options_mut().execution.parquet.reorder_filters = true;
175
176 config.options_mut().execution.parquet.metadata_size_hint = Some(1024 * 1024);
185
186 config.options_mut().execution.parquet.bloom_filter_on_read = true;
190
191 config
195 .options_mut()
196 .execution
197 .parquet
198 .schema_force_view_types = true;
199
200 config.options_mut().execution.meta_fetch_concurrency = 64;
207
208 config
213 .options_mut()
214 .execution
215 .parquet
216 .maximum_parallel_row_group_writers = 4;
217
218 config
221 .options_mut()
222 .execution
223 .parquet
224 .maximum_buffered_record_batches_per_stream = 8;
225
226 config
227 }
228
229 fn build_ctx(
231 &self,
232 storage_settings: &ObjectStorageSettings,
233 config: SessionConfig,
234 ) -> Result<SessionContext, StorageError> {
235 let ctx = SessionContext::new_with_config(config);
236 let base_url = self.get_base_url(storage_settings)?;
237
238 match self {
239 StorageProvider::Google(store) => {
240 ctx.register_object_store(&base_url, store.clone());
241 }
242 StorageProvider::Aws(store) => {
243 ctx.register_object_store(&base_url, store.clone());
244 }
245 StorageProvider::Local(store) => {
246 ctx.register_object_store(&base_url, store.clone());
247 }
248 StorageProvider::Azure(store) => {
249 ctx.register_object_store(&base_url, store.clone());
250 }
251 }
252
253 Ok(ctx)
254 }
255
256 pub fn get_session(
257 &self,
258 storage_settings: &ObjectStorageSettings,
259 ) -> Result<SessionContext, StorageError> {
260 let config = Self::build_session_config();
261 self.build_ctx(storage_settings, config)
262 }
263
264 pub fn get_session_with_catalog(
268 &self,
269 storage_settings: &ObjectStorageSettings,
270 catalog_name: &str,
271 schema_name: &str,
272 ) -> Result<SessionContext, StorageError> {
273 let config =
274 Self::build_session_config().with_default_catalog_and_schema(catalog_name, schema_name);
275 self.build_ctx(storage_settings, config)
276 }
277
278 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
286 let stream = match self {
287 StorageProvider::Local(store) => store.list(path),
288 StorageProvider::Google(store) => store.list(path),
289 StorageProvider::Aws(store) => store.list(path),
290 StorageProvider::Azure(store) => store.list(path),
291 };
292
293 stream
295 .try_fold(Vec::new(), |mut files, meta| async move {
296 files.push(meta.location.to_string());
297 Ok(files)
298 })
299 .await
300 .map_err(Into::into)
301 }
302
303 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
304 match self {
305 StorageProvider::Local(store) => {
306 store.delete(path).await?;
307 Ok(())
308 }
309 StorageProvider::Google(store) => {
310 store.delete(path).await?;
311 Ok(())
312 }
313 StorageProvider::Aws(store) => {
314 store.delete(path).await?;
315 Ok(())
316 }
317 StorageProvider::Azure(store) => {
318 store.delete(path).await?;
319 Ok(())
320 }
321 }
322 }
323}
324
/// Public handle to object storage: pairs a configured backend provider
/// with the settings it was built from, so sessions and URLs can be
/// derived without re-passing configuration.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// Backend-specific store (GCS, S3, Azure, or local filesystem).
    provider: StorageProvider,
    /// Settings used to construct `provider`; kept for session/URL helpers.
    pub storage_settings: ObjectStorageSettings,
}
330
331impl ObjectStore {
332 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
340 let store = StorageProvider::new(storage_settings)?;
341 Ok(ObjectStore {
342 provider: store,
343 storage_settings: storage_settings.clone(),
344 })
345 }
346
347 pub fn get_session(&self) -> Result<SessionContext, StorageError> {
348 let ctx = self.provider.get_session(&self.storage_settings)?;
349 Ok(ctx)
350 }
351
352 pub fn get_session_with_catalog(
355 &self,
356 catalog_name: &str,
357 schema_name: &str,
358 ) -> Result<SessionContext, StorageError> {
359 let ctx = self.provider.get_session_with_catalog(
360 &self.storage_settings,
361 catalog_name,
362 schema_name,
363 )?;
364 Ok(ctx)
365 }
366
367 pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
372 self.provider.as_dyn_object_store()
373 }
374
375 pub fn get_base_url(&self) -> Result<Url, StorageError> {
377 self.provider.get_base_url(&self.storage_settings)
378 }
379
380 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
388 self.provider.list(path).await
389 }
390
391 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
392 self.provider.delete(path).await
393 }
394}