scouter_dataframe/
storage.rs1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::SessionContext;
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ObjectStore as ObjStore;
11use scouter_settings::ObjectStorageSettings;
12use scouter_types::StorageType;
13use std::sync::Arc;
14use tracing::debug;
15use url::Url;
16
17fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
19 let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
20
21 Ok(String::from_utf8(decoded)?)
22}
23
24#[derive(Debug, Clone)]
26enum StorageProvider {
27 Google(Arc<GoogleCloudStorage>),
28 Aws(Arc<AmazonS3>),
29 Local(Arc<LocalFileSystem>),
30 Azure(Arc<MicrosoftAzure>),
31}
32
33impl StorageProvider {
34 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
35 let store = match storage_settings.storage_type {
36 StorageType::Google => {
37 let mut builder = GoogleCloudStorageBuilder::from_env();
38
39 if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
41 let key = decode_base64_str(&base64_creds)?;
42 builder = builder.with_service_account_key(&key);
43 debug!("Using base64 encoded service account key for Google Cloud Storage");
44 }
45
46 let storage = builder
48 .with_bucket_name(storage_settings.storage_root())
49 .build()?;
50
51 StorageProvider::Google(Arc::new(storage))
52 }
53 StorageType::Aws => {
54 let builder = AmazonS3Builder::from_env()
55 .with_bucket_name(storage_settings.storage_root())
56 .with_region(storage_settings.region.clone())
57 .build()?;
58 StorageProvider::Aws(Arc::new(builder))
59 }
60 StorageType::Local => {
61 let builder = LocalFileSystem::new();
63 StorageProvider::Local(Arc::new(builder))
64 }
65 StorageType::Azure => {
66 let builder = MicrosoftAzureBuilder::from_env()
67 .with_container_name(storage_settings.storage_root())
68 .build()?;
69
70 StorageProvider::Azure(Arc::new(builder))
71 }
72 };
73
74 Ok(store)
75 }
76
77 pub fn get_base_url(
78 &self,
79 storage_settings: &ObjectStorageSettings,
80 ) -> Result<Url, StorageError> {
81 match self {
82 StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
83 StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
84 StorageProvider::Local(_) => {
85 let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
87 let absolute_path = if storage_path.is_absolute() {
88 storage_path
89 } else {
90 std::env::current_dir()?.join(storage_path)
91 };
92
93 let url = Url::from_file_path(&absolute_path).map_err(|_| {
95 StorageError::InvalidUrl(format!(
96 "Failed to create file URL from path: {:?}",
97 absolute_path
98 ))
99 })?;
100 Ok(url)
101 }
102 StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
103 }
104 }
105
106 pub fn get_session(
107 &self,
108 storage_settings: &ObjectStorageSettings,
109 ) -> Result<SessionContext, StorageError> {
110 let ctx = SessionContext::new();
111 let base_url = self.get_base_url(storage_settings)?;
112
113 match self {
114 StorageProvider::Google(store) => {
115 ctx.register_object_store(&base_url, store.clone());
116 }
117 StorageProvider::Aws(store) => {
118 ctx.register_object_store(&base_url, store.clone());
119 }
120 StorageProvider::Local(store) => {
121 ctx.register_object_store(&base_url, store.clone());
122 }
123 StorageProvider::Azure(store) => {
124 ctx.register_object_store(&base_url, store.clone());
125 }
126 }
127
128 Ok(ctx)
129 }
130
131 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
139 let stream = match self {
140 StorageProvider::Local(store) => store.list(path),
141 StorageProvider::Google(store) => store.list(path),
142 StorageProvider::Aws(store) => store.list(path),
143 StorageProvider::Azure(store) => store.list(path),
144 };
145
146 stream
148 .try_fold(Vec::new(), |mut files, meta| async move {
149 files.push(meta.location.to_string());
150 Ok(files)
151 })
152 .await
153 .map_err(Into::into)
154 }
155
156 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
157 match self {
158 StorageProvider::Local(store) => {
159 store.delete(path).await?;
160 Ok(())
161 }
162 StorageProvider::Google(store) => {
163 store.delete(path).await?;
164 Ok(())
165 }
166 StorageProvider::Aws(store) => {
167 store.delete(path).await?;
168 Ok(())
169 }
170 StorageProvider::Azure(store) => {
171 store.delete(path).await?;
172 Ok(())
173 }
174 }
175 }
176}
177
178#[derive(Debug, Clone)]
179pub struct ObjectStore {
180 provider: StorageProvider,
181 pub storage_settings: ObjectStorageSettings,
182}
183
184impl ObjectStore {
185 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
193 let store = StorageProvider::new(storage_settings)?;
194 Ok(ObjectStore {
195 provider: store,
196 storage_settings: storage_settings.clone(),
197 })
198 }
199
200 pub fn get_session(&self) -> Result<SessionContext, StorageError> {
201 let ctx = self.provider.get_session(&self.storage_settings)?;
202 Ok(ctx)
203 }
204
205 pub fn get_base_url(&self) -> Result<Url, StorageError> {
207 self.provider.get_base_url(&self.storage_settings)
208 }
209
210 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
218 self.provider.list(path).await
219 }
220
221 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
222 self.provider.delete(path).await
223 }
224}