Skip to main content

ailake_store/
object_store_backend.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2use std::ops::Range;
3use std::sync::Arc;
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use futures::StreamExt;
9use object_store::{path::Path, ObjectStore};
10
11use crate::store::Store;
12
13/// Wraps any `object_store::ObjectStore` (S3, GCS, Azure, in-memory) behind the
14/// unified `Store` trait. All paths are resolved relative to `prefix`.
15pub struct ObjectStoreBackend {
16    inner: Arc<dyn ObjectStore>,
17    /// Base prefix prepended to every path (e.g. "my-table/"). May be empty.
18    prefix: String,
19}
20
21impl ObjectStoreBackend {
22    pub fn new(store: Arc<dyn ObjectStore>, prefix: impl Into<String>) -> Self {
23        let mut prefix = prefix.into();
24        if !prefix.is_empty() && !prefix.ends_with('/') {
25            prefix.push('/');
26        }
27        Self {
28            inner: store,
29            prefix,
30        }
31    }
32
33    fn resolve(&self, path: &str) -> Path {
34        let full = format!("{}{}", self.prefix, path.trim_start_matches('/'));
35        Path::from(full.as_str())
36    }
37}
38
39#[async_trait]
40impl Store for ObjectStoreBackend {
41    async fn get(&self, path: &str) -> AilakeResult<Bytes> {
42        let p = self.resolve(path);
43        self.inner
44            .get(&p)
45            .await
46            .map_err(|e| AilakeError::Store(e.to_string()))?
47            .bytes()
48            .await
49            .map_err(|e| AilakeError::Store(e.to_string()))
50    }
51
52    async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
53        let p = self.resolve(path);
54        let byte_range = range.start as usize..range.end as usize;
55        self.inner
56            .get_range(&p, byte_range)
57            .await
58            .map_err(|e| AilakeError::Store(e.to_string()))
59    }
60
61    async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
62        let p = self.resolve(path);
63        self.inner
64            .put(&p, data.into())
65            .await
66            .map_err(|e| AilakeError::Store(e.to_string()))?;
67        Ok(())
68    }
69
70    async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
71        let p = self.resolve(prefix);
72        let base_prefix = self.prefix.clone();
73        let mut stream = self.inner.list(Some(&p));
74        let mut paths = Vec::new();
75        while let Some(item) = stream.next().await {
76            let meta = item.map_err(|e| AilakeError::Store(e.to_string()))?;
77            let full = meta.location.to_string();
78            // Strip the store prefix to return a relative path
79            let rel = if full.starts_with(&base_prefix) {
80                full[base_prefix.len()..].to_string()
81            } else {
82                full
83            };
84            paths.push(rel);
85        }
86        paths.sort();
87        Ok(paths)
88    }
89
90    async fn file_size(&self, path: &str) -> AilakeResult<u64> {
91        let p = self.resolve(path);
92        let meta = self
93            .inner
94            .head(&p)
95            .await
96            .map_err(|e| AilakeError::Store(e.to_string()))?;
97        Ok(meta.size as u64)
98    }
99
100    async fn exists(&self, path: &str) -> AilakeResult<bool> {
101        let p = self.resolve(path);
102        match self.inner.head(&p).await {
103            Ok(_) => Ok(true),
104            Err(object_store::Error::NotFound { .. }) => Ok(false),
105            Err(e) => Err(AilakeError::Store(e.to_string())),
106        }
107    }
108
109    async fn delete(&self, path: &str) -> AilakeResult<()> {
110        let p = self.resolve(path);
111        self.inner
112            .delete(&p)
113            .await
114            .map_err(|e| AilakeError::Store(e.to_string()))
115    }
116}