supertable_core/
storage.rs1use bytes::Bytes;
2use object_store::path::Path;
3use object_store::{ObjectStore, ObjectStoreExt};
4use std::sync::Arc;
5
6#[derive(Clone)]
7pub struct Storage {
8 inner: Arc<dyn ObjectStore>,
9}
10
11impl std::fmt::Debug for Storage {
12 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
13 f.debug_struct("Storage").finish()
14 }
15}
16
17impl Storage {
18 pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
19 Self { inner }
20 }
21
22 pub fn from_location(location: &str) -> anyhow::Result<Self> {
25 let url = url::Url::parse(location)
26 .map_err(|e| anyhow::anyhow!("Invalid location URL: {}", e))?;
27 let (store, _path) = object_store::parse_url(&url)?;
28 Ok(Self::new(store.into()))
29 }
30
31 pub async fn read(&self, path: &str) -> anyhow::Result<Bytes> {
32 let path = Path::from(path);
33 let result = self.inner.get(&path).await?;
34 let bytes = result.bytes().await?;
35 Ok(bytes)
36 }
37
38 pub async fn write(&self, path: &str, content: Bytes) -> anyhow::Result<()> {
39 let path = Path::from(path);
40 self.inner.put(&path, content.into()).await?;
41 Ok(())
42 }
43
44 pub async fn delete(&self, path: &str) -> anyhow::Result<()> {
45 let path = Path::from(path);
46 self.inner.delete(&path).await?;
47 Ok(())
48 }
49
50 pub async fn list_files(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
51 let prefix = Path::from(prefix);
52 let mut list = self.inner.list(Some(&prefix));
53 let mut files = Vec::new();
54
55 use futures::StreamExt;
56 while let Some(meta) = list.next().await {
57 let meta = meta?;
58 files.push(meta.location.to_string());
59 }
60
61 Ok(files)
62 }
63}