//! scouter_dataframe/storage.rs
//!
//! Object-store abstraction (GCS, S3, Azure Blob, local filesystem) bridged
//! into DataFusion sessions.

1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::SessionContext;
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ObjectStore as ObjStore;
11use scouter_settings::ObjectStorageSettings;
12use scouter_types::StorageType;
13use std::sync::Arc;
14use tracing::debug;
15use url::Url;
16
17/// Helper function to decode base64 encoded string
18fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
19    let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
20
21    Ok(String::from_utf8(decoded)?)
22}
23
/// Storage provider enum for common object stores.
///
/// Each variant wraps its backend in an `Arc` so sessions and async
/// operations can share the store cheaply.
#[derive(Debug, Clone)]
enum StorageProvider {
    /// Google Cloud Storage backend.
    Google(Arc<GoogleCloudStorage>),
    /// Amazon S3 backend.
    Aws(Arc<AmazonS3>),
    /// Local filesystem backend.
    Local(Arc<LocalFileSystem>),
    /// Azure Blob Storage backend.
    Azure(Arc<MicrosoftAzure>),
}
32
33impl StorageProvider {
34    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
35        let store = match storage_settings.storage_type {
36            StorageType::Google => {
37                let mut builder = GoogleCloudStorageBuilder::from_env();
38
39                // Try to use base64 credentials if available
40                if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
41                    let key = decode_base64_str(&base64_creds)?;
42                    builder = builder.with_service_account_key(&key);
43                    debug!("Using base64 encoded service account key for Google Cloud Storage");
44                }
45
46                // Add bucket name and build
47                let storage = builder
48                    .with_bucket_name(storage_settings.storage_root())
49                    .build()?;
50
51                StorageProvider::Google(Arc::new(storage))
52            }
53            StorageType::Aws => {
54                let builder = AmazonS3Builder::from_env()
55                    .with_bucket_name(storage_settings.storage_root())
56                    .with_region(storage_settings.region.clone())
57                    .build()?;
58                StorageProvider::Aws(Arc::new(builder))
59            }
60            StorageType::Local => {
61                // Create LocalFileSystem with the root path as the prefix
62                let builder = LocalFileSystem::new();
63                StorageProvider::Local(Arc::new(builder))
64            }
65            StorageType::Azure => {
66                let builder = MicrosoftAzureBuilder::from_env()
67                    .with_container_name(storage_settings.storage_root())
68                    .build()?;
69
70                StorageProvider::Azure(Arc::new(builder))
71            }
72        };
73
74        Ok(store)
75    }
76
77    pub fn get_base_url(
78        &self,
79        storage_settings: &ObjectStorageSettings,
80    ) -> Result<Url, StorageError> {
81        match self {
82            StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
83            StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
84            StorageProvider::Local(_) => Ok(Url::parse("file:///")?),
85            StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
86        }
87    }
88
89    pub fn get_session(
90        &self,
91        storage_settings: &ObjectStorageSettings,
92    ) -> Result<SessionContext, StorageError> {
93        let ctx = SessionContext::new();
94        let base_url = self.get_base_url(storage_settings)?;
95
96        match self {
97            StorageProvider::Google(store) => {
98                ctx.register_object_store(&base_url, store.clone());
99            }
100            StorageProvider::Aws(store) => {
101                ctx.register_object_store(&base_url, store.clone());
102            }
103            StorageProvider::Local(store) => {
104                ctx.register_object_store(&base_url, store.clone());
105            }
106            StorageProvider::Azure(store) => {
107                ctx.register_object_store(&base_url, store.clone());
108            }
109        }
110
111        Ok(ctx)
112    }
113
114    /// List files in the object store
115    ///
116    /// # Arguments
117    /// * `path` - The path to list files from. If None, lists all files in the root.
118    ///
119    /// # Returns
120    /// * `Result<Vec<String>, StorageError>` - A result containing a vector of file paths or an error.
121    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
122        let stream = match self {
123            StorageProvider::Local(store) => store.list(path),
124            StorageProvider::Google(store) => store.list(path),
125            StorageProvider::Aws(store) => store.list(path),
126            StorageProvider::Azure(store) => store.list(path),
127        };
128
129        // Process each item in the stream
130        stream
131            .try_fold(Vec::new(), |mut files, meta| async move {
132                files.push(meta.location.to_string());
133                Ok(files)
134            })
135            .await
136            .map_err(Into::into)
137    }
138
139    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
140        match self {
141            StorageProvider::Local(store) => {
142                store.delete(path).await?;
143                Ok(())
144            }
145            StorageProvider::Google(store) => {
146                store.delete(path).await?;
147                Ok(())
148            }
149            StorageProvider::Aws(store) => {
150                store.delete(path).await?;
151                Ok(())
152            }
153            StorageProvider::Azure(store) => {
154                store.delete(path).await?;
155                Ok(())
156            }
157        }
158    }
159}
160
/// Public handle pairing a storage backend with the settings that built it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Concrete backend selected from `storage_settings.storage_type`.
    provider: StorageProvider,
    /// Settings the provider was constructed from (bucket/container, region, URI).
    pub storage_settings: ObjectStorageSettings,
}
166
167impl ObjectStore {
168    /// Creates a new ObjectStore instance.
169    ///
170    /// # Arguments
171    /// * `storage_settings` - The settings for the object storage.
172    ///
173    /// # Returns
174    /// * `Result<ObjectStore, StorageError>` - A result containing the ObjectStore instance or an error.
175    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
176        let store = StorageProvider::new(storage_settings)?;
177        Ok(ObjectStore {
178            provider: store,
179            storage_settings: storage_settings.clone(),
180        })
181    }
182
183    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
184        let ctx = self.provider.get_session(&self.storage_settings)?;
185        Ok(ctx)
186    }
187
188    /// Get the base URL for datafusion to use
189    pub fn get_base_url(&self) -> Result<Url, StorageError> {
190        self.provider.get_base_url(&self.storage_settings)
191    }
192
193    /// List files in the object store
194    ///
195    /// When path is None, lists from the root.
196    /// When path is provided, lists from that path.
197    ///
198    /// Note: The path parameter should NOT include the storage root - it's a relative path
199    /// that will be automatically combined with the storage root.
200    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
201        self.provider.list(path).await
202    }
203
204    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
205        self.provider.delete(path).await
206    }
207}