Skip to main content

alien_bindings/providers/storage/
local.rs

1use crate::providers::utils::{prefixed_path, relativize_path};
2use crate::{
3    error::{Error, ErrorData},
4    presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
5    traits::{Binding, Storage},
6};
7use alien_error::{AlienError, Context, IntoAlienError};
8use async_trait::async_trait;
9use bytes::Bytes;
10use chrono::Utc;
11use futures::stream::BoxStream;
12use futures::TryStreamExt as _;
13use object_store::{
14    local::LocalFileSystem, // Use LocalFileSystem directly
15    parse_url,              // Also needed for the URL case
16    path::Path,
17    GetOptions,
18    GetResult,
19    ListResult,
20    ObjectMeta,
21    ObjectStore,
22    PutMultipartOpts,
23    PutOptions,
24    PutPayload,
25    PutResult,
26    Result as ObjectStoreResult,
27};
28use std::path::PathBuf as StdPathBuf;
29use std::time::Duration;
30use url::Url;
31
32/// Local storage implementation.
33///
34/// Can be backed by either a generic store parsed from a URL (typically `file://`)
35/// or a `LocalFileSystem` instance pointing to a specific directory.
36#[derive(Debug)]
37pub struct LocalStorage {
38    // Store the URL for reference, even if using LocalFileSystem directly
39    url: Url,
40    base_dir: Path,
41    inner: Box<dyn ObjectStore>,
42}
43
44impl LocalStorage {
45    /// Creates a new `LocalStorage` instance from a storage path.
46    ///
47    /// The path can be either:
48    /// - A file:// URL (e.g., "file:///path/to/storage")
49    /// - An absolute filesystem path (e.g., "/path/to/storage")
50    pub fn new(storage_path: String) -> Result<Self, Error> {
51        // Check if it's a file:// URL or an absolute path
52        if storage_path.starts_with("file://") {
53            Self::new_from_url(&storage_path)
54        } else {
55            // Treat as an absolute filesystem path
56            Self::new_from_path(&storage_path)
57        }
58    }
59
60    /// Creates a new `LocalStorage` instance from a URL string.
61    /// Uses `parse_url` to handle the URL (likely `file://`).
62    pub fn new_from_url(url_str: &str) -> Result<Self, Error> {
63        let url =
64            Url::parse(url_str)
65                .into_alien_error()
66                .context(ErrorData::InvalidConfigurationUrl {
67                    url: url_str.to_string(),
68                    reason: "Invalid storage URL for local storage".to_string(),
69                })?;
70
71        let (store, base_dir) =
72            parse_url(&url)
73                .into_alien_error()
74                .context(ErrorData::BindingSetupFailed {
75                    binding_type: "local storage".to_string(),
76                    reason: format!("Failed to initialize storage from URL '{}'", url_str),
77                })?;
78
79        Ok(Self {
80            url,
81            base_dir,
82            inner: store,
83        })
84    }
85
86    /// Creates a new `LocalStorage` instance from an absolute filesystem path.
87    pub fn new_from_path(path: &str) -> Result<Self, Error> {
88        let path_buf = StdPathBuf::from(path);
89
90        // Create the directory if it doesn't exist
91        std::fs::create_dir_all(&path_buf)
92            .into_alien_error()
93            .context(ErrorData::LocalFilesystemError {
94                path: path_buf.to_string_lossy().to_string(),
95                operation: "create_dir_all".to_string(),
96            })?;
97
98        // Build the LocalFileSystem store
99        let store = LocalFileSystem::new_with_prefix(&path_buf)
100            .into_alien_error()
101            .context(ErrorData::BindingSetupFailed {
102                binding_type: "local storage".to_string(),
103                reason: format!("Failed to initialize LocalFileSystem at: {:?}", path_buf),
104            })?;
105
106        // Construct a file:// URL for reference
107        let url_string = if cfg!(windows) {
108            format!("file:///{}?path={}", path_buf.display(), path_buf.display()).replace('\\', "/")
109        } else {
110            format!("file://{}", path_buf.display())
111        };
112
113        let url = Url::parse(&url_string).into_alien_error().context(
114            ErrorData::InvalidConfigurationUrl {
115                url: url_string.clone(),
116                reason: format!("Failed to construct file URL for path: {:?}", path_buf),
117            },
118        )?;
119
120        Ok(Self {
121            url,
122            base_dir: Path::default(),
123            inner: Box::new(store),
124        })
125    }
126}
127
128impl Binding for LocalStorage {}
129
130#[async_trait]
131impl Storage for LocalStorage {
132    fn get_base_dir(&self) -> Path {
133        // Note: When using LocalFileSystem::new_with_prefix, the prefix
134        // is handled internally by the store. So the base_dir passed
135        // to the ObjectStore methods should be relative to that prefix.
136        // If created via parse_url, base_dir will have the path part.
137        self.base_dir.clone()
138    }
139
140    fn get_url(&self) -> Url {
141        self.url.clone()
142    }
143
144    async fn presigned_put(
145        &self,
146        path: &Path,
147        expires_in: Duration,
148    ) -> crate::error::Result<PresignedRequest> {
149        let dst = prefixed_path(&self.base_dir, path);
150
151        // For local storage, we need to convert the logical path to a filesystem path
152        // This is a bit tricky since we need to map from the object store path to the actual file path
153        let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
154            // URL-based local storage (file://)
155            url_path.join(dst.to_string()).to_string_lossy().to_string()
156        } else {
157            // Direct LocalFileSystem usage - we need to infer the base directory
158            // This is a simplification - in practice you might want to store the actual base path
159            format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
160        };
161
162        Ok(PresignedRequest {
163            backend: PresignedRequestBackend::Local {
164                file_path,
165                operation: LocalOperation::Put,
166            },
167            expiration: Utc::now()
168                + chrono::Duration::from_std(expires_in).map_err(|e| {
169                    AlienError::new(ErrorData::Other {
170                        message: format!("Invalid duration: {}", e),
171                    })
172                })?,
173            operation: PresignedOperation::Put,
174            path: path.to_string(),
175        })
176    }
177
178    async fn presigned_get(
179        &self,
180        path: &Path,
181        expires_in: Duration,
182    ) -> crate::error::Result<PresignedRequest> {
183        let dst = prefixed_path(&self.base_dir, path);
184
185        let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
186            url_path.join(dst.to_string()).to_string_lossy().to_string()
187        } else {
188            format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
189        };
190
191        Ok(PresignedRequest {
192            backend: PresignedRequestBackend::Local {
193                file_path,
194                operation: LocalOperation::Get,
195            },
196            expiration: Utc::now()
197                + chrono::Duration::from_std(expires_in).map_err(|e| {
198                    AlienError::new(ErrorData::Other {
199                        message: format!("Invalid duration: {}", e),
200                    })
201                })?,
202            operation: PresignedOperation::Get,
203            path: path.to_string(),
204        })
205    }
206
207    async fn presigned_delete(
208        &self,
209        path: &Path,
210        expires_in: Duration,
211    ) -> crate::error::Result<PresignedRequest> {
212        let dst = prefixed_path(&self.base_dir, path);
213
214        let file_path = if let Some(url_path) = self.url.to_file_path().ok() {
215            url_path.join(dst.to_string()).to_string_lossy().to_string()
216        } else {
217            format!("{}/{}", self.url.path().trim_start_matches('/'), dst)
218        };
219
220        Ok(PresignedRequest {
221            backend: PresignedRequestBackend::Local {
222                file_path,
223                operation: LocalOperation::Delete,
224            },
225            expiration: Utc::now()
226                + chrono::Duration::from_std(expires_in).map_err(|e| {
227                    AlienError::new(ErrorData::Other {
228                        message: format!("Invalid duration: {}", e),
229                    })
230                })?,
231            operation: PresignedOperation::Delete,
232            path: path.to_string(),
233        })
234    }
235}
236
237// Delegate ObjectStore trait implementation to the inner store.
238// When using LocalFileSystem::new_with_prefix, paths passed here are
239// relative to the prefix directory.
240// When using parse_url("file:///prefix/..."), the base_dir adjustment handles the prefix.
241#[async_trait]
242impl ObjectStore for LocalStorage {
243    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
244        let dst = prefixed_path(&self.base_dir, location);
245        self.inner.put(&dst, payload).await
246    }
247
248    async fn put_opts(
249        &self,
250        location: &Path,
251        payload: PutPayload,
252        opts: PutOptions,
253    ) -> ObjectStoreResult<PutResult> {
254        let dst = prefixed_path(&self.base_dir, location);
255        // LocalFileSystem doesn't support attributes or tags, strip them to avoid UNIMPLEMENTED
256        let opts = PutOptions {
257            mode: opts.mode,
258            tags: Default::default(),
259            attributes: Default::default(),
260            extensions: opts.extensions,
261        };
262        self.inner.put_opts(&dst, payload, opts).await
263    }
264
265    async fn put_multipart(
266        &self,
267        location: &Path,
268    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
269        let dst = prefixed_path(&self.base_dir, location);
270        self.inner.put_multipart(&dst).await
271    }
272
273    async fn put_multipart_opts(
274        &self,
275        location: &Path,
276        opts: PutMultipartOpts,
277    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
278        let dst = prefixed_path(&self.base_dir, location);
279        self.inner.put_multipart_opts(&dst, opts).await
280    }
281
282    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
283        let src = prefixed_path(&self.base_dir, location);
284        self.inner.get(&src).await
285    }
286
287    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
288        let src = prefixed_path(&self.base_dir, location);
289        self.inner.get_opts(&src, options).await
290    }
291
292    async fn get_range(
293        &self,
294        location: &Path,
295        range: std::ops::Range<u64>,
296    ) -> ObjectStoreResult<Bytes> {
297        let src = prefixed_path(&self.base_dir, location);
298        self.inner.get_range(&src, range).await
299    }
300
301    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
302        let src = prefixed_path(&self.base_dir, location);
303        let mut meta = self.inner.head(&src).await?;
304        meta.location = relativize_path(&self.base_dir, meta.location, "LocalStorage")?;
305        Ok(meta)
306    }
307
308    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
309        let src = prefixed_path(&self.base_dir, location);
310        self.inner.delete(&src).await
311    }
312
313    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
314        let list_prefix_for_inner = prefix
315            .map(|p| prefixed_path(&self.base_dir, p))
316            .unwrap_or_else(|| self.base_dir.clone());
317
318        let base_dir_for_stream = self.base_dir.clone();
319
320        Box::pin(
321            self.inner
322                .list(Some(&list_prefix_for_inner))
323                .and_then(move |mut meta| {
324                    let captured_base_dir = base_dir_for_stream.clone();
325                    async move {
326                        meta.location =
327                            relativize_path(&captured_base_dir, meta.location, "LocalStorage")?;
328                        Ok(meta)
329                    }
330                }),
331        )
332    }
333
334    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
335        let list_prefix_for_inner = prefix
336            .map(|p| prefixed_path(&self.base_dir, p))
337            .unwrap_or_else(|| self.base_dir.clone());
338
339        let mut result = self
340            .inner
341            .list_with_delimiter(Some(&list_prefix_for_inner))
342            .await?;
343
344        for meta in &mut result.objects {
345            let original_location = std::mem::take(&mut meta.location);
346            meta.location = relativize_path(&self.base_dir, original_location, "LocalStorage")?;
347        }
348
349        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
350        for cp in result.common_prefixes {
351            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "LocalStorage")?);
352        }
353        result.common_prefixes = new_common_prefixes;
354
355        Ok(result)
356    }
357
358    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
359        let src = prefixed_path(&self.base_dir, from);
360        let dst = prefixed_path(&self.base_dir, to);
361        self.inner.copy(&src, &dst).await
362    }
363
364    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
365        let src = prefixed_path(&self.base_dir, from);
366        let dst = prefixed_path(&self.base_dir, to);
367        self.inner.copy_if_not_exists(&src, &dst).await
368    }
369}
370
371impl std::fmt::Display for LocalStorage {
372    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
373        write!(f, "LocalStorage(url={})", self.url)
374    }
375}