Skip to main content

scouter_dataframe/
storage.rs

1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::SessionContext;
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ObjectStore as ObjStore;
11use scouter_settings::ObjectStorageSettings;
12use scouter_types::StorageType;
13use std::sync::Arc;
14use tracing::debug;
15use url::Url;
16
17/// Helper function to decode base64 encoded string
18fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
19    let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
20
21    Ok(String::from_utf8(decoded)?)
22}
23
24/// Storage provider enum for common object stores
25#[derive(Debug, Clone)]
26enum StorageProvider {
27    Google(Arc<GoogleCloudStorage>),
28    Aws(Arc<AmazonS3>),
29    Local(Arc<LocalFileSystem>),
30    Azure(Arc<MicrosoftAzure>),
31}
32
33impl StorageProvider {
34    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
35        let store = match storage_settings.storage_type {
36            StorageType::Google => {
37                let mut builder = GoogleCloudStorageBuilder::from_env();
38
39                // Try to use base64 credentials if available
40                if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
41                    let key = decode_base64_str(&base64_creds)?;
42                    builder = builder.with_service_account_key(&key);
43                    debug!("Using base64 encoded service account key for Google Cloud Storage");
44                }
45
46                // Add bucket name and build
47                let storage = builder
48                    .with_bucket_name(storage_settings.storage_root())
49                    .build()?;
50
51                StorageProvider::Google(Arc::new(storage))
52            }
53            StorageType::Aws => {
54                let builder = AmazonS3Builder::from_env()
55                    .with_bucket_name(storage_settings.storage_root())
56                    .with_region(storage_settings.region.clone())
57                    .build()?;
58                StorageProvider::Aws(Arc::new(builder))
59            }
60            StorageType::Local => {
61                // Create LocalFileSystem with the root path as the prefix
62                let builder = LocalFileSystem::new();
63                StorageProvider::Local(Arc::new(builder))
64            }
65            StorageType::Azure => {
66                let builder = MicrosoftAzureBuilder::from_env()
67                    .with_container_name(storage_settings.storage_root())
68                    .build()?;
69
70                StorageProvider::Azure(Arc::new(builder))
71            }
72        };
73
74        Ok(store)
75    }
76
77    pub fn get_base_url(
78        &self,
79        storage_settings: &ObjectStorageSettings,
80    ) -> Result<Url, StorageError> {
81        match self {
82            StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
83            StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
84            StorageProvider::Local(_) => {
85                // Convert relative path to absolute path for local filesystem
86                let storage_path = std::path::PathBuf::from(storage_settings.storage_root());
87                let absolute_path = if storage_path.is_absolute() {
88                    storage_path
89                } else {
90                    std::env::current_dir()?.join(storage_path)
91                };
92
93                // Create file:// URL with absolute path
94                let url = Url::from_file_path(&absolute_path).map_err(|_| {
95                    StorageError::InvalidUrl(format!(
96                        "Failed to create file URL from path: {:?}",
97                        absolute_path
98                    ))
99                })?;
100                Ok(url)
101            }
102            StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
103        }
104    }
105
106    pub fn get_session(
107        &self,
108        storage_settings: &ObjectStorageSettings,
109    ) -> Result<SessionContext, StorageError> {
110        let ctx = SessionContext::new();
111        let base_url = self.get_base_url(storage_settings)?;
112
113        match self {
114            StorageProvider::Google(store) => {
115                ctx.register_object_store(&base_url, store.clone());
116            }
117            StorageProvider::Aws(store) => {
118                ctx.register_object_store(&base_url, store.clone());
119            }
120            StorageProvider::Local(store) => {
121                ctx.register_object_store(&base_url, store.clone());
122            }
123            StorageProvider::Azure(store) => {
124                ctx.register_object_store(&base_url, store.clone());
125            }
126        }
127
128        Ok(ctx)
129    }
130
131    /// List files in the object store
132    ///
133    /// # Arguments
134    /// * `path` - The path to list files from. If None, lists all files in the root.
135    ///
136    /// # Returns
137    /// * `Result<Vec<String>, StorageError>` - A result containing a vector of file paths or an error.
138    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
139        let stream = match self {
140            StorageProvider::Local(store) => store.list(path),
141            StorageProvider::Google(store) => store.list(path),
142            StorageProvider::Aws(store) => store.list(path),
143            StorageProvider::Azure(store) => store.list(path),
144        };
145
146        // Process each item in the stream
147        stream
148            .try_fold(Vec::new(), |mut files, meta| async move {
149                files.push(meta.location.to_string());
150                Ok(files)
151            })
152            .await
153            .map_err(Into::into)
154    }
155
156    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
157        match self {
158            StorageProvider::Local(store) => {
159                store.delete(path).await?;
160                Ok(())
161            }
162            StorageProvider::Google(store) => {
163                store.delete(path).await?;
164                Ok(())
165            }
166            StorageProvider::Aws(store) => {
167                store.delete(path).await?;
168                Ok(())
169            }
170            StorageProvider::Azure(store) => {
171                store.delete(path).await?;
172                Ok(())
173            }
174        }
175    }
176}
177
178#[derive(Debug, Clone)]
179pub struct ObjectStore {
180    provider: StorageProvider,
181    pub storage_settings: ObjectStorageSettings,
182}
183
184impl ObjectStore {
185    /// Creates a new ObjectStore instance.
186    ///
187    /// # Arguments
188    /// * `storage_settings` - The settings for the object storage.
189    ///
190    /// # Returns
191    /// * `Result<ObjectStore, StorageError>` - A result containing the ObjectStore instance or an error.
192    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
193        let store = StorageProvider::new(storage_settings)?;
194        Ok(ObjectStore {
195            provider: store,
196            storage_settings: storage_settings.clone(),
197        })
198    }
199
200    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
201        let ctx = self.provider.get_session(&self.storage_settings)?;
202        Ok(ctx)
203    }
204
205    /// Get the base URL for datafusion to use
206    pub fn get_base_url(&self) -> Result<Url, StorageError> {
207        self.provider.get_base_url(&self.storage_settings)
208    }
209
210    /// List files in the object store
211    ///
212    /// When path is None, lists from the root.
213    /// When path is provided, lists from that path.
214    ///
215    /// Note: The path parameter should NOT include the storage root - it's a relative path
216    /// that will be automatically combined with the storage root.
217    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
218        self.provider.list(path).await
219    }
220
221    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
222        self.provider.delete(path).await
223    }
224}