Skip to main content

trace_weft_server/storage/
blob.rs

1use bytes::Bytes;
2use object_store::{ObjectStore, path::Path as ObjectPath};
3use std::path::PathBuf;
4use tokio::fs;
5use trace_weft_core::{BlobHash, BlobStore};
6
7pub struct LocalBlobStore {
8    dir: PathBuf,
9}
10
11impl LocalBlobStore {
12    pub fn new(dir: PathBuf) -> Self {
13        Self { dir }
14    }
15
16    fn path_for(&self, hash: &BlobHash) -> PathBuf {
17        self.dir.join(hash.0.replace(':', "_"))
18    }
19}
20
21#[async_trait::async_trait]
22impl BlobStore for LocalBlobStore {
23    async fn put_blob(
24        &self,
25        hash: &BlobHash,
26        _content_type: &str,
27        content: &[u8],
28    ) -> anyhow::Result<()> {
29        fs::create_dir_all(&self.dir).await?;
30        let path = self.path_for(hash);
31        fs::write(path, content).await?;
32        Ok(())
33    }
34
35    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
36        let path = self.path_for(hash);
37        if path.exists() {
38            let data = fs::read(path).await?;
39            Ok(Some(data))
40        } else {
41            Ok(None)
42        }
43    }
44}
45
46pub struct S3BlobStore {
47    store: Box<dyn ObjectStore>,
48}
49
50impl S3BlobStore {
51    pub fn new(
52        bucket: &str,
53        region: &str,
54        access_key: &str,
55        secret_key: &str,
56    ) -> anyhow::Result<Self> {
57        let store = object_store::aws::AmazonS3Builder::new()
58            .with_bucket_name(bucket)
59            .with_region(region)
60            .with_access_key_id(access_key)
61            .with_secret_access_key(secret_key)
62            .build()?;
63
64        Ok(Self {
65            store: Box::new(store),
66        })
67    }
68}
69
70#[async_trait::async_trait]
71impl BlobStore for S3BlobStore {
72    async fn put_blob(
73        &self,
74        hash: &BlobHash,
75        _content_type: &str,
76        content: &[u8],
77    ) -> anyhow::Result<()> {
78        let path = ObjectPath::from(hash.0.clone());
79        let bytes = Bytes::copy_from_slice(content);
80        self.store.put(&path, bytes.into()).await?;
81        Ok(())
82    }
83
84    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
85        let path = ObjectPath::from(hash.0.clone());
86        match self.store.get(&path).await {
87            Ok(result) => {
88                let bytes = result.bytes().await?;
89                Ok(Some(bytes.to_vec()))
90            }
91            Err(object_store::Error::NotFound { .. }) => Ok(None),
92            Err(e) => Err(e.into()),
93        }
94    }
95}