Skip to main content

rustrails_storage/service/
gcs.rs

1//! Object-store backed GCS-style service.
2
3use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use object_store::{ObjectStore, ObjectStoreExt, PutPayload, path::Path};
8use url::Url;
9
10use super::{StorageError, StorageService, checked_key};
11
12/// Minimal Google Cloud Storage-compatible service backed by [`object_store`].
13#[derive(Clone)]
14pub struct GcsService {
15    name: String,
16    bucket: String,
17    store: Arc<dyn ObjectStore>,
18    base_url: Url,
19}
20
21impl std::fmt::Debug for GcsService {
22    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        formatter
24            .debug_struct("GcsService")
25            .field("name", &self.name)
26            .field("bucket", &self.bucket)
27            .finish_non_exhaustive()
28    }
29}
30
31impl GcsService {
32    /// Creates a new service using the provided object store.
33    ///
34    /// # Errors
35    ///
36    /// Returns an error when the synthetic base URL cannot be constructed.
37    pub fn new(
38        name: impl Into<String>,
39        bucket: impl Into<String>,
40        store: Arc<dyn ObjectStore>,
41    ) -> Result<Self, StorageError> {
42        let bucket = bucket.into();
43        let base_url = Url::parse(&format!("https://gcs.local/{bucket}/"))
44            .map_err(|error| StorageError::InvalidUrl(error.to_string()))?;
45        Ok(Self {
46            name: name.into(),
47            bucket,
48            store,
49            base_url,
50        })
51    }
52
53    fn path_for(&self, key: &str) -> Result<Path, StorageError> {
54        let key = checked_key(key)?;
55        Ok(Path::from(key))
56    }
57}
58
59#[async_trait]
60impl StorageService for GcsService {
61    fn name(&self) -> &str {
62        &self.name
63    }
64
65    async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
66        let path = self.path_for(key)?;
67        if self.exists(key).await? {
68            return Err(StorageError::DuplicateKey(key.to_owned()));
69        }
70        self.store
71            .put(&path, PutPayload::from(data))
72            .await
73            .map(|_| ())
74            .map_err(|error| StorageError::ObjectStore {
75                path: key.to_owned(),
76                message: error.to_string(),
77            })
78    }
79
80    async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
81        let path = self.path_for(key)?;
82        match self.store.get(&path).await {
83            Ok(result) => result
84                .bytes()
85                .await
86                .map_err(|error| StorageError::ObjectStore {
87                    path: key.to_owned(),
88                    message: error.to_string(),
89                }),
90            Err(object_store::Error::NotFound { .. }) => {
91                Err(StorageError::NotFound(key.to_owned()))
92            }
93            Err(error) => Err(StorageError::ObjectStore {
94                path: key.to_owned(),
95                message: error.to_string(),
96            }),
97        }
98    }
99
100    async fn delete(&self, key: &str) -> Result<(), StorageError> {
101        let path = self.path_for(key)?;
102        match self.store.delete(&path).await {
103            Ok(()) => Ok(()),
104            Err(object_store::Error::NotFound { .. }) => Ok(()),
105            Err(error) => Err(StorageError::ObjectStore {
106                path: key.to_owned(),
107                message: error.to_string(),
108            }),
109        }
110    }
111
112    async fn exists(&self, key: &str) -> Result<bool, StorageError> {
113        let path = self.path_for(key)?;
114        match self.store.head(&path).await {
115            Ok(_) => Ok(true),
116            Err(object_store::Error::NotFound { .. }) => Ok(false),
117            Err(error) => Err(StorageError::ObjectStore {
118                path: key.to_owned(),
119                message: error.to_string(),
120            }),
121        }
122    }
123
124    async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
125        let key = checked_key(key)?;
126        let mut url = self.base_url.clone();
127        url.set_path(&format!("{}/{}", self.bucket, key));
128        url.query_pairs_mut()
129            .append_pair("service", &self.name)
130            .append_pair("expires_in", &expires_in.as_secs().to_string());
131        Ok(url)
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138
139    #[tokio::test]
140    async fn test_gcs_service_round_trip_with_in_memory_store() {
141        let service = GcsService::new(
142            "gcs",
143            "bucket",
144            Arc::new(object_store::memory::InMemory::new()),
145        )
146        .expect("service should build");
147        service
148            .upload("a.txt", Bytes::from_static(b"hello"))
149            .await
150            .expect("upload should succeed");
151        assert_eq!(
152            service
153                .download("a.txt")
154                .await
155                .expect("download should succeed"),
156            Bytes::from_static(b"hello")
157        );
158    }
159
160    #[tokio::test]
161    async fn test_gcs_service_url_includes_bucket() {
162        let service = GcsService::new(
163            "gcs",
164            "bucket",
165            Arc::new(object_store::memory::InMemory::new()),
166        )
167        .expect("service should build");
168        let url = service
169            .url("a.txt", Duration::from_secs(60))
170            .await
171            .expect("url should build");
172        assert_eq!(
173            url.as_str(),
174            "https://gcs.local/bucket/a.txt?service=gcs&expires_in=60"
175        );
176    }
177}