scouter_dataframe/
storage.rs1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::{SessionConfig, SessionContext};
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ClientOptions;
11use object_store::ObjectStore as ObjStore;
12use scouter_settings::ObjectStorageSettings;
13use scouter_types::StorageType;
14use std::sync::Arc;
15use tracing::debug;
16use url::Url;
17
18fn cloud_client_options() -> ClientOptions {
23 ClientOptions::new()
24 .with_pool_idle_timeout(std::time::Duration::from_secs(90))
25 .with_pool_max_idle_per_host(16)
26}
27
28fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
30 let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
31
32 Ok(String::from_utf8(decoded)?)
33}
34
35#[derive(Debug, Clone)]
37enum StorageProvider {
38 Google(Arc<GoogleCloudStorage>),
39 Aws(Arc<AmazonS3>),
40 Local(Arc<LocalFileSystem>),
41 Azure(Arc<MicrosoftAzure>),
42}
43
44impl StorageProvider {
45 pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
51 match self {
52 StorageProvider::Google(s) => s.clone() as Arc<dyn ObjStore>,
53 StorageProvider::Aws(s) => s.clone() as Arc<dyn ObjStore>,
54 StorageProvider::Local(s) => s.clone() as Arc<dyn ObjStore>,
55 StorageProvider::Azure(s) => s.clone() as Arc<dyn ObjStore>,
56 }
57 }
58
59 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
60 let store = match storage_settings.storage_type {
61 StorageType::Google => {
62 let mut builder = GoogleCloudStorageBuilder::from_env();
63
64 if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
66 let key = decode_base64_str(&base64_creds)?;
67 builder = builder.with_service_account_key(&key);
68 debug!("Using base64 encoded service account key for Google Cloud Storage");
69 }
70
71 let storage = builder
73 .with_bucket_name(storage_settings.storage_root())
74 .with_client_options(cloud_client_options())
75 .build()?;
76
77 StorageProvider::Google(Arc::new(storage))
78 }
79 StorageType::Aws => {
80 let builder = AmazonS3Builder::from_env()
81 .with_bucket_name(storage_settings.storage_root())
82 .with_region(storage_settings.region.clone())
83 .with_client_options(cloud_client_options())
84 .build()?;
85 StorageProvider::Aws(Arc::new(builder))
86 }
87 StorageType::Local => {
88 let builder = LocalFileSystem::new();
90 StorageProvider::Local(Arc::new(builder))
91 }
92 StorageType::Azure => {
93 let mut builder = MicrosoftAzureBuilder::from_env();
99
100 if std::env::var("AZURE_STORAGE_ACCOUNT_NAME").is_err() {
101 if let Ok(account) = std::env::var("AZURE_STORAGE_ACCOUNT") {
102 builder = builder.with_account(account);
103 }
104 }
105 if std::env::var("AZURE_STORAGE_ACCOUNT_KEY").is_err() {
106 if let Ok(key) = std::env::var("AZURE_STORAGE_KEY") {
107 builder = builder.with_access_key(key);
108 }
109 }
110
111 let store = builder
112 .with_container_name(storage_settings.storage_root())
113 .with_client_options(cloud_client_options())
114 .build()?;
115
116 StorageProvider::Azure(Arc::new(store))
117 }
118 };
119
120 Ok(store)
121 }
122
123 pub fn get_base_url(
124 &self,
125 storage_settings: &ObjectStorageSettings,
126 ) -> Result<Url, StorageError> {
127 match self {
128 StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
129 StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
130 StorageProvider::Local(_) => {
131 let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
133 let absolute_path = if storage_path.is_absolute() {
134 storage_path
135 } else {
136 std::env::current_dir()?.join(storage_path)
137 };
138
139 let url = Url::from_file_path(&absolute_path).map_err(|_| {
141 StorageError::InvalidUrl(format!(
142 "Failed to create file URL from path: {:?}",
143 absolute_path
144 ))
145 })?;
146 Ok(url)
147 }
148 StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
149 }
150 }
151
152 pub fn get_session(
153 &self,
154 storage_settings: &ObjectStorageSettings,
155 ) -> Result<SessionContext, StorageError> {
156 let mut config = SessionConfig::new()
157 .with_target_partitions(
158 std::thread::available_parallelism()
159 .map(|n| n.get())
160 .unwrap_or(4),
161 )
162 .with_batch_size(8192)
163 .with_prefer_existing_sort(true)
164 .with_parquet_pruning(true)
165 .with_collect_statistics(true);
166
167 config.options_mut().execution.parquet.pushdown_filters = true;
171 config.options_mut().execution.parquet.reorder_filters = true;
172
173 let ctx = SessionContext::new_with_config(config);
174 let base_url = self.get_base_url(storage_settings)?;
175
176 match self {
177 StorageProvider::Google(store) => {
178 ctx.register_object_store(&base_url, store.clone());
179 }
180 StorageProvider::Aws(store) => {
181 ctx.register_object_store(&base_url, store.clone());
182 }
183 StorageProvider::Local(store) => {
184 ctx.register_object_store(&base_url, store.clone());
185 }
186 StorageProvider::Azure(store) => {
187 ctx.register_object_store(&base_url, store.clone());
188 }
189 }
190
191 Ok(ctx)
192 }
193
194 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
202 let stream = match self {
203 StorageProvider::Local(store) => store.list(path),
204 StorageProvider::Google(store) => store.list(path),
205 StorageProvider::Aws(store) => store.list(path),
206 StorageProvider::Azure(store) => store.list(path),
207 };
208
209 stream
211 .try_fold(Vec::new(), |mut files, meta| async move {
212 files.push(meta.location.to_string());
213 Ok(files)
214 })
215 .await
216 .map_err(Into::into)
217 }
218
219 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
220 match self {
221 StorageProvider::Local(store) => {
222 store.delete(path).await?;
223 Ok(())
224 }
225 StorageProvider::Google(store) => {
226 store.delete(path).await?;
227 Ok(())
228 }
229 StorageProvider::Aws(store) => {
230 store.delete(path).await?;
231 Ok(())
232 }
233 StorageProvider::Azure(store) => {
234 store.delete(path).await?;
235 Ok(())
236 }
237 }
238 }
239}
240
241#[derive(Debug, Clone)]
242pub struct ObjectStore {
243 provider: StorageProvider,
244 pub storage_settings: ObjectStorageSettings,
245}
246
247impl ObjectStore {
248 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
256 let store = StorageProvider::new(storage_settings)?;
257 Ok(ObjectStore {
258 provider: store,
259 storage_settings: storage_settings.clone(),
260 })
261 }
262
263 pub fn get_session(&self) -> Result<SessionContext, StorageError> {
264 let ctx = self.provider.get_session(&self.storage_settings)?;
265 Ok(ctx)
266 }
267
268 pub fn as_dyn_object_store(&self) -> Arc<dyn ObjStore> {
273 self.provider.as_dyn_object_store()
274 }
275
276 pub fn get_base_url(&self) -> Result<Url, StorageError> {
278 self.provider.get_base_url(&self.storage_settings)
279 }
280
281 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
289 self.provider.list(path).await
290 }
291
292 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
293 self.provider.delete(path).await
294 }
295}