trace_weft_server/storage/
blob.rs1use bytes::Bytes;
2use object_store::{ObjectStore, path::Path as ObjectPath};
3use std::path::PathBuf;
4use tokio::fs;
5use trace_weft_core::{BlobHash, BlobStore};
6
/// Blob store rooted at a single directory on the local filesystem.
pub struct LocalBlobStore {
    /// Directory that blob paths are resolved against.
    dir: PathBuf,
}

impl LocalBlobStore {
    /// Builds a store rooted at `dir`.
    ///
    /// The path is stored exactly as given; this constructor performs no
    /// filesystem access, so the directory is neither created nor checked.
    pub fn new(dir: PathBuf) -> Self {
        LocalBlobStore { dir }
    }
}
16
17#[async_trait::async_trait]
18impl BlobStore for LocalBlobStore {
19 async fn put_blob(
20 &self,
21 hash: &BlobHash,
22 _content_type: &str,
23 content: &[u8],
24 ) -> anyhow::Result<()> {
25 let path = self.dir.join(&hash.0);
26 fs::write(path, content).await?;
27 Ok(())
28 }
29
30 async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
31 let path = self.dir.join(&hash.0);
32 if path.exists() {
33 let data = fs::read(path).await?;
34 Ok(Some(data))
35 } else {
36 Ok(None)
37 }
38 }
39}
40
41pub struct S3BlobStore {
42 store: Box<dyn ObjectStore>,
43}
44
45impl S3BlobStore {
46 pub fn new(
47 bucket: &str,
48 region: &str,
49 access_key: &str,
50 secret_key: &str,
51 ) -> anyhow::Result<Self> {
52 let store = object_store::aws::AmazonS3Builder::new()
53 .with_bucket_name(bucket)
54 .with_region(region)
55 .with_access_key_id(access_key)
56 .with_secret_access_key(secret_key)
57 .build()?;
58
59 Ok(Self {
60 store: Box::new(store),
61 })
62 }
63}
64
65#[async_trait::async_trait]
66impl BlobStore for S3BlobStore {
67 async fn put_blob(
68 &self,
69 hash: &BlobHash,
70 _content_type: &str,
71 content: &[u8],
72 ) -> anyhow::Result<()> {
73 let path = ObjectPath::from(hash.0.clone());
74 let bytes = Bytes::copy_from_slice(content);
75 self.store.put(&path, bytes.into()).await?;
76 Ok(())
77 }
78
79 async fn get_blob(&self, hash: &BlobHash) -> anyhow::Result<Option<Vec<u8>>> {
80 let path = ObjectPath::from(hash.0.clone());
81 match self.store.get(&path).await {
82 Ok(result) => {
83 let bytes = result.bytes().await?;
84 Ok(Some(bytes.to_vec()))
85 }
86 Err(object_store::Error::NotFound { .. }) => Ok(None),
87 Err(e) => Err(e.into()),
88 }
89 }
90}