rustrails_storage/service/
s3.rs1use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use object_store::{ObjectStore, ObjectStoreExt, PutPayload, path::Path};
8use url::Url;
9
10use super::{StorageError, StorageService, checked_key};
11
12#[derive(Clone)]
14pub struct S3Service {
15 name: String,
16 bucket: String,
17 store: Arc<dyn ObjectStore>,
18 base_url: Url,
19}
20
21impl std::fmt::Debug for S3Service {
22 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 formatter
24 .debug_struct("S3Service")
25 .field("name", &self.name)
26 .field("bucket", &self.bucket)
27 .finish_non_exhaustive()
28 }
29}
30
31impl S3Service {
32 pub fn new(
38 name: impl Into<String>,
39 bucket: impl Into<String>,
40 store: Arc<dyn ObjectStore>,
41 ) -> Result<Self, StorageError> {
42 let bucket = bucket.into();
43 let base_url = Url::parse(&format!("https://s3.local/{bucket}/"))
44 .map_err(|error| StorageError::InvalidUrl(error.to_string()))?;
45 Ok(Self {
46 name: name.into(),
47 bucket,
48 store,
49 base_url,
50 })
51 }
52
53 fn path_for(&self, key: &str) -> Result<Path, StorageError> {
54 let key = checked_key(key)?;
55 Ok(Path::from(key))
56 }
57}
58
59#[async_trait]
60impl StorageService for S3Service {
61 fn name(&self) -> &str {
62 &self.name
63 }
64
65 async fn upload(&self, key: &str, data: Bytes) -> Result<(), StorageError> {
66 let path = self.path_for(key)?;
67 if self.exists(key).await? {
68 return Err(StorageError::DuplicateKey(key.to_owned()));
69 }
70 self.store
71 .put(&path, PutPayload::from(data))
72 .await
73 .map(|_| ())
74 .map_err(|error| StorageError::ObjectStore {
75 path: key.to_owned(),
76 message: error.to_string(),
77 })
78 }
79
80 async fn download(&self, key: &str) -> Result<Bytes, StorageError> {
81 let path = self.path_for(key)?;
82 match self.store.get(&path).await {
83 Ok(result) => result
84 .bytes()
85 .await
86 .map_err(|error| StorageError::ObjectStore {
87 path: key.to_owned(),
88 message: error.to_string(),
89 }),
90 Err(object_store::Error::NotFound { .. }) => {
91 Err(StorageError::NotFound(key.to_owned()))
92 }
93 Err(error) => Err(StorageError::ObjectStore {
94 path: key.to_owned(),
95 message: error.to_string(),
96 }),
97 }
98 }
99
100 async fn delete(&self, key: &str) -> Result<(), StorageError> {
101 let path = self.path_for(key)?;
102 match self.store.delete(&path).await {
103 Ok(()) => Ok(()),
104 Err(object_store::Error::NotFound { .. }) => Ok(()),
105 Err(error) => Err(StorageError::ObjectStore {
106 path: key.to_owned(),
107 message: error.to_string(),
108 }),
109 }
110 }
111
112 async fn exists(&self, key: &str) -> Result<bool, StorageError> {
113 let path = self.path_for(key)?;
114 match self.store.head(&path).await {
115 Ok(_) => Ok(true),
116 Err(object_store::Error::NotFound { .. }) => Ok(false),
117 Err(error) => Err(StorageError::ObjectStore {
118 path: key.to_owned(),
119 message: error.to_string(),
120 }),
121 }
122 }
123
124 async fn url(&self, key: &str, expires_in: Duration) -> Result<Url, StorageError> {
125 let key = checked_key(key)?;
126 let mut url = self.base_url.clone();
127 url.set_path(&format!("{}/{}", self.bucket, key));
128 url.query_pairs_mut()
129 .append_pair("service", &self.name)
130 .append_pair("expires_in", &expires_in.as_secs().to_string());
131 Ok(url)
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a service named `s3` for bucket `bucket`, backed by a fresh in-memory store.
    fn in_memory_service() -> S3Service {
        let backend = Arc::new(object_store::memory::InMemory::new());
        S3Service::new("s3", "bucket", backend).expect("service should build")
    }

    #[tokio::test]
    async fn test_s3_service_round_trip_with_in_memory_store() {
        let service = in_memory_service();

        let uploaded = service.upload("a.txt", Bytes::from_static(b"hello")).await;
        uploaded.expect("upload should succeed");

        let fetched = service
            .download("a.txt")
            .await
            .expect("download should succeed");
        assert_eq!(fetched, Bytes::from_static(b"hello"));
    }

    #[tokio::test]
    async fn test_s3_service_reports_missing_keys() {
        let service = in_memory_service();

        let outcome = service.download("missing").await;
        let error = outcome.expect_err("download should fail");

        // Only the `NotFound` variant carrying the requested key is acceptable.
        let StorageError::NotFound(key) = error else {
            panic!("expected StorageError::NotFound for a missing key");
        };
        assert!(key == "missing");
    }
}