ailake_store/
object_store_backend.rs1use std::ops::Range;
3use std::sync::Arc;
4
5use ailake_core::{AilakeError, AilakeResult};
6use async_trait::async_trait;
7use bytes::Bytes;
8use futures::StreamExt;
9use object_store::{path::Path, ObjectStore};
10
11use crate::store::Store;
12
13pub struct ObjectStoreBackend {
16 inner: Arc<dyn ObjectStore>,
17 prefix: String,
19}
20
21impl ObjectStoreBackend {
22 pub fn new(store: Arc<dyn ObjectStore>, prefix: impl Into<String>) -> Self {
23 let mut prefix = prefix.into();
24 if !prefix.is_empty() && !prefix.ends_with('/') {
25 prefix.push('/');
26 }
27 Self {
28 inner: store,
29 prefix,
30 }
31 }
32
33 fn resolve(&self, path: &str) -> Path {
34 let full = format!("{}{}", self.prefix, path.trim_start_matches('/'));
35 Path::from(full.as_str())
36 }
37}
38
39#[async_trait]
40impl Store for ObjectStoreBackend {
41 async fn get(&self, path: &str) -> AilakeResult<Bytes> {
42 let p = self.resolve(path);
43 self.inner
44 .get(&p)
45 .await
46 .map_err(|e| AilakeError::Store(e.to_string()))?
47 .bytes()
48 .await
49 .map_err(|e| AilakeError::Store(e.to_string()))
50 }
51
52 async fn get_range(&self, path: &str, range: Range<u64>) -> AilakeResult<Bytes> {
53 let p = self.resolve(path);
54 let byte_range = range.start as usize..range.end as usize;
55 self.inner
56 .get_range(&p, byte_range)
57 .await
58 .map_err(|e| AilakeError::Store(e.to_string()))
59 }
60
61 async fn put(&self, path: &str, data: Bytes) -> AilakeResult<()> {
62 let p = self.resolve(path);
63 self.inner
64 .put(&p, data.into())
65 .await
66 .map_err(|e| AilakeError::Store(e.to_string()))?;
67 Ok(())
68 }
69
70 async fn list(&self, prefix: &str) -> AilakeResult<Vec<String>> {
71 let p = self.resolve(prefix);
72 let base_prefix = self.prefix.clone();
73 let mut stream = self.inner.list(Some(&p));
74 let mut paths = Vec::new();
75 while let Some(item) = stream.next().await {
76 let meta = item.map_err(|e| AilakeError::Store(e.to_string()))?;
77 let full = meta.location.to_string();
78 let rel = if full.starts_with(&base_prefix) {
80 full[base_prefix.len()..].to_string()
81 } else {
82 full
83 };
84 paths.push(rel);
85 }
86 paths.sort();
87 Ok(paths)
88 }
89
90 async fn file_size(&self, path: &str) -> AilakeResult<u64> {
91 let p = self.resolve(path);
92 let meta = self
93 .inner
94 .head(&p)
95 .await
96 .map_err(|e| AilakeError::Store(e.to_string()))?;
97 Ok(meta.size as u64)
98 }
99
100 async fn exists(&self, path: &str) -> AilakeResult<bool> {
101 let p = self.resolve(path);
102 match self.inner.head(&p).await {
103 Ok(_) => Ok(true),
104 Err(object_store::Error::NotFound { .. }) => Ok(false),
105 Err(e) => Err(AilakeError::Store(e.to_string())),
106 }
107 }
108
109 async fn delete(&self, path: &str) -> AilakeResult<()> {
110 let p = self.resolve(path);
111 self.inner
112 .delete(&p)
113 .await
114 .map_err(|e| AilakeError::Store(e.to_string()))
115 }
116}