Skip to main content

trace_weft_server/storage/
blob.rs

1use bytes::Bytes;
2use object_store::{ObjectStore, path::Path as ObjectPath};
3use std::path::PathBuf;
4use tokio::fs;
5use trace_weft_core::{BlobHash, BlobStore};
6
/// Blob store that keeps every blob as a single file inside one directory
/// on the local filesystem.
///
/// The file name of a blob is its hash string, joined onto `dir`.
#[derive(Debug, Clone)]
pub struct LocalBlobStore {
    /// Directory under which blob files are written and looked up.
    dir: PathBuf,
}
10
11impl LocalBlobStore {
12    pub fn new(dir: PathBuf) -> Self {
13        Self { dir }
14    }
15}
16
17#[async_trait::async_trait]
18impl BlobStore for LocalBlobStore {
19    async fn put_blob(
20        &self,
21        hash: &BlobHash,
22        _content_type: &str,
23        content: &[u8],
24    ) -> anyhow::Result<()> {
25        let path = self.dir.join(&hash.0);
26        fs::write(path, content).await?;
27        Ok(())
28    }
29
30    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
31        let path = self.dir.join(&hash.0);
32        if path.exists() {
33            let data = fs::read(path).await?;
34            Ok(Some(data))
35        } else {
36            Ok(None)
37        }
38    }
39}
40
41pub struct S3BlobStore {
42    store: Box<dyn ObjectStore>,
43}
44
45impl S3BlobStore {
46    pub fn new(
47        bucket: &str,
48        region: &str,
49        access_key: &str,
50        secret_key: &str,
51    ) -> anyhow::Result<Self> {
52        let store = object_store::aws::AmazonS3Builder::new()
53            .with_bucket_name(bucket)
54            .with_region(region)
55            .with_access_key_id(access_key)
56            .with_secret_access_key(secret_key)
57            .build()?;
58
59        Ok(Self {
60            store: Box::new(store),
61        })
62    }
63}
64
65#[async_trait::async_trait]
66impl BlobStore for S3BlobStore {
67    async fn put_blob(
68        &self,
69        hash: &BlobHash,
70        _content_type: &str,
71        content: &[u8],
72    ) -> anyhow::Result<()> {
73        let path = ObjectPath::from(hash.0.clone());
74        let bytes = Bytes::copy_from_slice(content);
75        self.store.put(&path, bytes.into()).await?;
76        Ok(())
77    }
78
79    async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
80        let path = ObjectPath::from(hash.0.clone());
81        match self.store.get(&path).await {
82            Ok(result) => {
83                let bytes = result.bytes().await?;
84                Ok(Some(bytes.to_vec()))
85            }
86            Err(object_store::Error::NotFound { .. }) => Ok(None),
87            Err(e) => Err(e.into()),
88        }
89    }
90}