scouter_dataframe/
storage.rs1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::SessionContext;
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ObjectStore as ObjStore;
11use scouter_settings::ObjectStorageSettings;
12use scouter_types::StorageType;
13use std::sync::Arc;
14use tracing::debug;
15use url::Url;
16
17fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
19 let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
20
21 Ok(String::from_utf8(decoded)?)
22}
23
24#[derive(Debug, Clone)]
26enum StorageProvider {
27 Google(Arc<GoogleCloudStorage>),
28 Aws(Arc<AmazonS3>),
29 Local(Arc<LocalFileSystem>),
30 Azure(Arc<MicrosoftAzure>),
31}
32
33impl StorageProvider {
34 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
35 let store = match storage_settings.storage_type {
36 StorageType::Google => {
37 let mut builder = GoogleCloudStorageBuilder::from_env();
38
39 if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
41 let key = decode_base64_str(&base64_creds)?;
42 builder = builder.with_service_account_key(&key);
43 debug!("Using base64 encoded service account key for Google Cloud Storage");
44 }
45
46 let storage = builder
48 .with_bucket_name(storage_settings.storage_root())
49 .build()?;
50
51 StorageProvider::Google(Arc::new(storage))
52 }
53 StorageType::Aws => {
54 let builder = AmazonS3Builder::from_env()
55 .with_bucket_name(storage_settings.storage_root())
56 .with_region(storage_settings.region.clone())
57 .build()?;
58 StorageProvider::Aws(Arc::new(builder))
59 }
60 StorageType::Local => {
61 let builder = LocalFileSystem::new();
63 StorageProvider::Local(Arc::new(builder))
64 }
65 StorageType::Azure => {
66 let builder = MicrosoftAzureBuilder::from_env()
67 .with_container_name(storage_settings.storage_root())
68 .build()?;
69
70 StorageProvider::Azure(Arc::new(builder))
71 }
72 };
73
74 Ok(store)
75 }
76
77 pub fn get_base_url(
78 &self,
79 storage_settings: &ObjectStorageSettings,
80 ) -> Result<Url, StorageError> {
81 match self {
82 StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
83 StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
84 StorageProvider::Local(_) => Ok(Url::parse("file:///")?),
85 StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
86 }
87 }
88
89 pub fn get_session(
90 &self,
91 storage_settings: &ObjectStorageSettings,
92 ) -> Result<SessionContext, StorageError> {
93 let ctx = SessionContext::new();
94 let base_url = self.get_base_url(storage_settings)?;
95
96 match self {
97 StorageProvider::Google(store) => {
98 ctx.register_object_store(&base_url, store.clone());
99 }
100 StorageProvider::Aws(store) => {
101 ctx.register_object_store(&base_url, store.clone());
102 }
103 StorageProvider::Local(store) => {
104 ctx.register_object_store(&base_url, store.clone());
105 }
106 StorageProvider::Azure(store) => {
107 ctx.register_object_store(&base_url, store.clone());
108 }
109 }
110
111 Ok(ctx)
112 }
113
114 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
122 let stream = match self {
124 StorageProvider::Local(store) => store.list(path),
125 StorageProvider::Google(store) => store.list(path),
126 StorageProvider::Aws(store) => store.list(path),
127 StorageProvider::Azure(store) => store.list(path),
128 };
129
130 stream
132 .try_fold(Vec::new(), |mut files, meta| async move {
133 files.push(meta.location.to_string());
134 Ok(files)
135 })
136 .await
137 .map_err(Into::into)
138 }
139
140 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
141 match self {
142 StorageProvider::Local(store) => {
143 store.delete(path).await?;
144 Ok(())
145 }
146 StorageProvider::Google(store) => {
147 store.delete(path).await?;
148 Ok(())
149 }
150 StorageProvider::Aws(store) => {
151 store.delete(path).await?;
152 Ok(())
153 }
154 StorageProvider::Azure(store) => {
155 store.delete(path).await?;
156 Ok(())
157 }
158 }
159 }
160}
161
162#[derive(Debug, Clone)]
163pub struct ObjectStore {
164 provider: StorageProvider,
165 pub storage_settings: ObjectStorageSettings,
166}
167
168impl ObjectStore {
169 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
177 let store = StorageProvider::new(storage_settings)?;
178 Ok(ObjectStore {
179 provider: store,
180 storage_settings: storage_settings.clone(),
181 })
182 }
183
184 pub fn get_session(&self) -> Result<SessionContext, StorageError> {
185 let ctx = self.provider.get_session(&self.storage_settings)?;
186 Ok(ctx)
187 }
188
189 pub fn get_base_url(&self) -> Result<Url, StorageError> {
191 self.provider.get_base_url(&self.storage_settings)
192 }
193
194 pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
202 self.provider.list(path).await
203 }
204
205 pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
206 self.provider.delete(path).await
207 }
208}