alien_bindings/providers/storage/
local.rs1use crate::providers::utils::{prefixed_path, relativize_path};
2use crate::{
3 error::{Error, ErrorData},
4 presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
5 traits::{Binding, Storage},
6};
7use alien_error::{AlienError, Context, IntoAlienError};
8use async_trait::async_trait;
9use bytes::Bytes;
10use chrono::Utc;
11use futures::stream::BoxStream;
12use futures::TryStreamExt as _;
13use object_store::{
14 local::LocalFileSystem, parse_url, path::Path,
17 GetOptions,
18 GetResult,
19 ListResult,
20 ObjectMeta,
21 ObjectStore,
22 PutMultipartOpts,
23 PutOptions,
24 PutPayload,
25 PutResult,
26 Result as ObjectStoreResult,
27};
28use std::path::PathBuf as StdPathBuf;
29use std::time::Duration;
30use url::Url;
31
32#[derive(Debug)]
37pub struct LocalStorage {
38 url: Url,
40 base_dir: Path,
41 inner: Box<dyn ObjectStore>,
42}
43
44impl LocalStorage {
45 pub fn new(storage_path: String) -> Result<Self, Error> {
51 if storage_path.starts_with("file://") {
53 Self::new_from_url(&storage_path)
54 } else {
55 Self::new_from_path(&storage_path)
57 }
58 }
59
60 pub fn new_from_url(url_str: &str) -> Result<Self, Error> {
63 let url =
64 Url::parse(url_str)
65 .into_alien_error()
66 .context(ErrorData::InvalidConfigurationUrl {
67 url: url_str.to_string(),
68 reason: "Invalid storage URL for local storage".to_string(),
69 })?;
70
71 let (store, base_dir) =
72 parse_url(&url)
73 .into_alien_error()
74 .context(ErrorData::BindingSetupFailed {
75 binding_type: "local storage".to_string(),
76 reason: format!("Failed to initialize storage from URL '{}'", url_str),
77 })?;
78
79 Ok(Self {
80 url,
81 base_dir,
82 inner: store,
83 })
84 }
85
86 pub fn new_from_path(path: &str) -> Result<Self, Error> {
88 let path_buf = StdPathBuf::from(path);
89
90 std::fs::create_dir_all(&path_buf)
92 .into_alien_error()
93 .context(ErrorData::LocalFilesystemError {
94 path: path_buf.to_string_lossy().to_string(),
95 operation: "create_dir_all".to_string(),
96 })?;
97
98 let store = LocalFileSystem::new_with_prefix(&path_buf)
100 .into_alien_error()
101 .context(ErrorData::BindingSetupFailed {
102 binding_type: "local storage".to_string(),
103 reason: format!("Failed to initialize LocalFileSystem at: {:?}", path_buf),
104 })?;
105
106 let url_string = if cfg!(windows) {
108 format!("file:///{}?path={}", path_buf.display(), path_buf.display()).replace('\\', "/")
109 } else {
110 format!("file://{}", path_buf.display())
111 };
112
113 let url = Url::parse(&url_string).into_alien_error().context(
114 ErrorData::InvalidConfigurationUrl {
115 url: url_string.clone(),
116 reason: format!("Failed to construct file URL for path: {:?}", path_buf),
117 },
118 )?;
119
120 Ok(Self {
121 url,
122 base_dir: Path::default(),
123 inner: Box::new(store),
124 })
125 }
126}
127
128impl Binding for LocalStorage {}
129
130#[async_trait]
131impl Storage for LocalStorage {
132 fn get_base_dir(&self) -> Path {
133 self.base_dir.clone()
138 }
139
140 fn get_url(&self) -> Url {
141 self.url.clone()
142 }
143
144 async fn presigned_put(
145 &self,
146 path: &Path,
147 expires_in: Duration,
148 ) -> crate::error::Result<PresignedRequest> {
149 let dst = prefixed_path(&self.base_dir, path);
150
151 let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
154 url_path.join(dst.to_string()).to_string_lossy().to_string()
156 } else {
157 format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
160 };
161
162 Ok(PresignedRequest {
163 backend: PresignedRequestBackend::Local {
164 file_path,
165 operation: LocalOperation::Put,
166 },
167 expiration: Utc::now()
168 + chrono::Duration::from_std(expires_in).map_err(|e| {
169 AlienError::new(ErrorData::Other {
170 message: format!("Invalid duration: {}", e),
171 })
172 })?,
173 operation: PresignedOperation::Put,
174 path: path.to_string(),
175 })
176 }
177
178 async fn presigned_get(
179 &self,
180 path: &Path,
181 expires_in: Duration,
182 ) -> crate::error::Result<PresignedRequest> {
183 let dst = prefixed_path(&self.base_dir, path);
184
185 let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
186 url_path.join(dst.to_string()).to_string_lossy().to_string()
187 } else {
188 format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
189 };
190
191 Ok(PresignedRequest {
192 backend: PresignedRequestBackend::Local {
193 file_path,
194 operation: LocalOperation::Get,
195 },
196 expiration: Utc::now()
197 + chrono::Duration::from_std(expires_in).map_err(|e| {
198 AlienError::new(ErrorData::Other {
199 message: format!("Invalid duration: {}", e),
200 })
201 })?,
202 operation: PresignedOperation::Get,
203 path: path.to_string(),
204 })
205 }
206
207 async fn presigned_delete(
208 &self,
209 path: &Path,
210 expires_in: Duration,
211 ) -> crate::error::Result<PresignedRequest> {
212 let dst = prefixed_path(&self.base_dir, path);
213
214 let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
215 url_path.join(dst.to_string()).to_string_lossy().to_string()
216 } else {
217 format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
218 };
219
220 Ok(PresignedRequest {
221 backend: PresignedRequestBackend::Local {
222 file_path,
223 operation: LocalOperation::Delete,
224 },
225 expiration: Utc::now()
226 + chrono::Duration::from_std(expires_in).map_err(|e| {
227 AlienError::new(ErrorData::Other {
228 message: format!("Invalid duration: {}", e),
229 })
230 })?,
231 operation: PresignedOperation::Delete,
232 path: path.to_string(),
233 })
234 }
235}
236
237#[async_trait]
242impl ObjectStore for LocalStorage {
243 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
244 let dst = prefixed_path(&self.base_dir, location);
245 self.inner.put(&dst, payload).await
246 }
247
248 async fn put_opts(
249 &self,
250 location: &Path,
251 payload: PutPayload,
252 opts: PutOptions,
253 ) -> ObjectStoreResult<PutResult> {
254 let dst = prefixed_path(&self.base_dir, location);
255 let opts = PutOptions {
257 mode: opts.mode,
258 tags: Default::default(),
259 attributes: Default::default(),
260 extensions: opts.extensions,
261 };
262 self.inner.put_opts(&dst, payload, opts).await
263 }
264
265 async fn put_multipart(
266 &self,
267 location: &Path,
268 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
269 let dst = prefixed_path(&self.base_dir, location);
270 self.inner.put_multipart(&dst).await
271 }
272
273 async fn put_multipart_opts(
274 &self,
275 location: &Path,
276 opts: PutMultipartOpts,
277 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
278 let dst = prefixed_path(&self.base_dir, location);
279 self.inner.put_multipart_opts(&dst, opts).await
280 }
281
282 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
283 let src = prefixed_path(&self.base_dir, location);
284 self.inner.get(&src).await
285 }
286
287 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
288 let src = prefixed_path(&self.base_dir, location);
289 self.inner.get_opts(&src, options).await
290 }
291
292 async fn get_range(
293 &self,
294 location: &Path,
295 range: std::ops::Range<u64>,
296 ) -> ObjectStoreResult<Bytes> {
297 let src = prefixed_path(&self.base_dir, location);
298 self.inner.get_range(&src, range).await
299 }
300
301 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
302 let src = prefixed_path(&self.base_dir, location);
303 let mut meta = self.inner.head(&src).await?;
304 meta.location = relativize_path(&self.base_dir, meta.location, "LocalStorage")?;
305 Ok(meta)
306 }
307
308 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
309 let src = prefixed_path(&self.base_dir, location);
310 self.inner.delete(&src).await
311 }
312
313 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
314 let list_prefix_for_inner = prefix
315 .map(|p| prefixed_path(&self.base_dir, p))
316 .unwrap_or_else(|| self.base_dir.clone());
317
318 let base_dir_for_stream = self.base_dir.clone();
319
320 Box::pin(
321 self.inner
322 .list(Some(&list_prefix_for_inner))
323 .and_then(move |mut meta| {
324 let captured_base_dir = base_dir_for_stream.clone();
325 async move {
326 meta.location =
327 relativize_path(&captured_base_dir, meta.location, "LocalStorage")?;
328 Ok(meta)
329 }
330 }),
331 )
332 }
333
334 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
335 let list_prefix_for_inner = prefix
336 .map(|p| prefixed_path(&self.base_dir, p))
337 .unwrap_or_else(|| self.base_dir.clone());
338
339 let mut result = self
340 .inner
341 .list_with_delimiter(Some(&list_prefix_for_inner))
342 .await?;
343
344 for meta in &mut result.objects {
345 let original_location = std::mem::take(&mut meta.location);
346 meta.location = relativize_path(&self.base_dir, original_location, "LocalStorage")?;
347 }
348
349 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
350 for cp in result.common_prefixes {
351 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "LocalStorage")?);
352 }
353 result.common_prefixes = new_common_prefixes;
354
355 Ok(result)
356 }
357
358 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
359 let src = prefixed_path(&self.base_dir, from);
360 let dst = prefixed_path(&self.base_dir, to);
361 self.inner.copy(&src, &dst).await
362 }
363
364 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
365 let src = prefixed_path(&self.base_dir, from);
366 let dst = prefixed_path(&self.base_dir, to);
367 self.inner.copy_if_not_exists(&src, &dst).await
368 }
369}
370
371impl std::fmt::Display for LocalStorage {
372 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373 write!(f, "LocalStorage(url={})", self.url)
374 }
375}