scouter_dataframe/
storage.rs

1use crate::error::StorageError;
2use base64::prelude::*;
3use datafusion::prelude::SessionContext;
4use futures::TryStreamExt;
5use object_store::aws::{AmazonS3, AmazonS3Builder};
6use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
7use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
8use object_store::local::LocalFileSystem;
9use object_store::path::Path;
10use object_store::ObjectStore as ObjStore;
11use scouter_settings::ObjectStorageSettings;
12use scouter_types::StorageType;
13use std::sync::Arc;
14use tracing::debug;
15use url::Url;
16
17/// Helper function to decode base64 encoded string
18fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
19    let decoded = BASE64_STANDARD.decode(service_base64_creds)?;
20
21    Ok(String::from_utf8(decoded)?)
22}
23
/// Storage provider enum for common object stores.
///
/// Wraps the concrete `object_store` backend selected from
/// `ObjectStorageSettings::storage_type`. Each variant holds its backend in
/// an `Arc`, so cloning the provider is a cheap refcount bump.
#[derive(Debug, Clone)]
enum StorageProvider {
    /// Google Cloud Storage backend
    Google(Arc<GoogleCloudStorage>),
    /// Amazon S3 backend
    Aws(Arc<AmazonS3>),
    /// Local filesystem backend (built with no path prefix)
    Local(Arc<LocalFileSystem>),
    /// Azure Blob Storage backend
    Azure(Arc<MicrosoftAzure>),
}
32
33impl StorageProvider {
34    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
35        let store = match storage_settings.storage_type {
36            StorageType::Google => {
37                let mut builder = GoogleCloudStorageBuilder::from_env();
38
39                // Try to use base64 credentials if available
40                if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
41                    let key = decode_base64_str(&base64_creds)?;
42                    builder = builder.with_service_account_key(&key);
43                    debug!("Using base64 encoded service account key for Google Cloud Storage");
44                }
45
46                // Add bucket name and build
47                let storage = builder
48                    .with_bucket_name(storage_settings.storage_root())
49                    .build()?;
50
51                StorageProvider::Google(Arc::new(storage))
52            }
53            StorageType::Aws => {
54                let builder = AmazonS3Builder::from_env()
55                    .with_bucket_name(storage_settings.storage_root())
56                    .with_region(storage_settings.region.clone())
57                    .build()?;
58                StorageProvider::Aws(Arc::new(builder))
59            }
60            StorageType::Local => {
61                // Create LocalFileSystem with the root path as the prefix
62                let builder = LocalFileSystem::new();
63                StorageProvider::Local(Arc::new(builder))
64            }
65            StorageType::Azure => {
66                let builder = MicrosoftAzureBuilder::from_env()
67                    .with_container_name(storage_settings.storage_root())
68                    .build()?;
69
70                StorageProvider::Azure(Arc::new(builder))
71            }
72        };
73
74        Ok(store)
75    }
76
77    pub fn get_base_url(
78        &self,
79        storage_settings: &ObjectStorageSettings,
80    ) -> Result<Url, StorageError> {
81        match self {
82            StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
83            StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
84            StorageProvider::Local(_) => Ok(Url::parse("file:///")?),
85            StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
86        }
87    }
88
89    pub fn get_session(
90        &self,
91        storage_settings: &ObjectStorageSettings,
92    ) -> Result<SessionContext, StorageError> {
93        let ctx = SessionContext::new();
94        let base_url = self.get_base_url(storage_settings)?;
95
96        match self {
97            StorageProvider::Google(store) => {
98                ctx.register_object_store(&base_url, store.clone());
99            }
100            StorageProvider::Aws(store) => {
101                ctx.register_object_store(&base_url, store.clone());
102            }
103            StorageProvider::Local(store) => {
104                ctx.register_object_store(&base_url, store.clone());
105            }
106            StorageProvider::Azure(store) => {
107                ctx.register_object_store(&base_url, store.clone());
108            }
109        }
110
111        Ok(ctx)
112    }
113
114    /// List files in the object store
115    ///
116    /// # Arguments
117    /// * `path` - The path to list files from. If None, lists all files in the root.
118    ///
119    /// # Returns
120    /// * `Result<Vec<String>, StorageError>` - A result containing a vector of file paths or an error.
121    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
122        // Get the stream based on the provided path
123        let stream = match self {
124            StorageProvider::Local(store) => store.list(path),
125            StorageProvider::Google(store) => store.list(path),
126            StorageProvider::Aws(store) => store.list(path),
127            StorageProvider::Azure(store) => store.list(path),
128        };
129
130        // Process each item in the stream
131        stream
132            .try_fold(Vec::new(), |mut files, meta| async move {
133                files.push(meta.location.to_string());
134                Ok(files)
135            })
136            .await
137            .map_err(Into::into)
138    }
139
140    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
141        match self {
142            StorageProvider::Local(store) => {
143                store.delete(path).await?;
144                Ok(())
145            }
146            StorageProvider::Google(store) => {
147                store.delete(path).await?;
148                Ok(())
149            }
150            StorageProvider::Aws(store) => {
151                store.delete(path).await?;
152                Ok(())
153            }
154            StorageProvider::Azure(store) => {
155                store.delete(path).await?;
156                Ok(())
157            }
158        }
159    }
160}
161
/// Public handle pairing a storage backend with the settings it was built from.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The concrete backend (GCS, S3, Azure Blob, or local filesystem).
    provider: StorageProvider,
    /// Settings retained for session and base-URL construction.
    pub storage_settings: ObjectStorageSettings,
}
167
168impl ObjectStore {
169    /// Creates a new ObjectStore instance.
170    ///
171    /// # Arguments
172    /// * `storage_settings` - The settings for the object storage.
173    ///
174    /// # Returns
175    /// * `Result<ObjectStore, StorageError>` - A result containing the ObjectStore instance or an error.
176    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
177        let store = StorageProvider::new(storage_settings)?;
178        Ok(ObjectStore {
179            provider: store,
180            storage_settings: storage_settings.clone(),
181        })
182    }
183
184    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
185        let ctx = self.provider.get_session(&self.storage_settings)?;
186        Ok(ctx)
187    }
188
189    /// Get the base URL for datafusion to use
190    pub fn get_base_url(&self) -> Result<Url, StorageError> {
191        self.provider.get_base_url(&self.storage_settings)
192    }
193
194    /// List files in the object store
195    ///
196    /// When path is None, lists from the root.
197    /// When path is provided, lists from that path.
198    ///
199    /// Note: The path parameter should NOT include the storage root - it's a relative path
200    /// that will be automatically combined with the storage root.
201    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
202        self.provider.list(path).await
203    }
204
205    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
206        self.provider.delete(path).await
207    }
208}