Skip to main content

omnia_nats/
blobstore.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_nats::jetstream;
6use async_nats::jetstream::object_store::{Config, ObjectStore};
7use chrono::Utc;
8use futures::{FutureExt, StreamExt};
9use omnia_wasi_blobstore::{
10    Container, ContainerMetadata, FutureResult, ObjectMetadata, WasiBlobstoreCtx,
11};
12use tokio::io::AsyncReadExt;
13
14use crate::Client;
15
16/// `wasi-blobstore` implementation backed by NATS JetStream object store.
17impl WasiBlobstoreCtx for Client {
18    fn create_container(&self, name: String) -> FutureResult<Arc<dyn Container>> {
19        tracing::trace!("creating container: {name}");
20        let client = self.inner.clone();
21
22        async move {
23            let store = jetstream::new(client)
24                .create_object_store(Config {
25                    bucket: name.clone(),
26                    ..Default::default()
27                })
28                .await
29                .context("creating object store")?;
30            let metadata = metadata(name);
31
32            Ok(Arc::new(NatsContainer { metadata, store }) as Arc<dyn Container>)
33        }
34        .boxed()
35    }
36
37    fn get_container(&self, name: String) -> FutureResult<Arc<dyn Container>> {
38        tracing::trace!("getting container: {name}");
39        let client = self.inner.clone();
40
41        async move {
42            let store = jetstream::new(client)
43                .get_object_store(&name)
44                .await
45                .context("getting object store")?;
46            let metadata = metadata(name);
47
48            Ok(Arc::new(NatsContainer { metadata, store }) as Arc<dyn Container>)
49        }
50        .boxed()
51    }
52
53    fn delete_container(&self, name: String) -> FutureResult<()> {
54        tracing::trace!("deleting container: {name}");
55        let client = self.inner.clone();
56
57        async move {
58            jetstream::new(client)
59                .delete_object_store(&name)
60                .await
61                .context("issue deleting object store")?;
62            Ok(())
63        }
64        .boxed()
65    }
66
67    fn container_exists(&self, name: String) -> FutureResult<bool> {
68        tracing::trace!("checking existence of container: {name}");
69        let client = self.inner.clone();
70
71        async move {
72            let exists = jetstream::new(client).get_object_store(&name).await.is_ok();
73            Ok(exists)
74        }
75        .boxed()
76    }
77}
78
79fn metadata(name: String) -> ContainerMetadata {
80    #[allow(clippy::cast_sign_loss)]
81    ContainerMetadata {
82        name,
83        created_at: Utc::now().timestamp() as u64,
84    }
85}
86
87/// A blobstore container backed by a NATS JetStream object store.
88pub struct NatsContainer {
89    metadata: ContainerMetadata,
90    store: ObjectStore,
91}
92
93impl Debug for NatsContainer {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("NatsContainer").finish()
96    }
97}
98
99impl Container for NatsContainer {
100    fn name(&self) -> anyhow::Result<String> {
101        tracing::trace!("getting container name");
102        Ok(self.metadata.name.clone())
103    }
104
105    fn info(&self) -> anyhow::Result<ContainerMetadata> {
106        Ok(self.metadata.clone())
107    }
108
109    fn get_data(&self, name: String, _start: u64, _end: u64) -> FutureResult<Option<Vec<u8>>> {
110        tracing::trace!("getting object data: {name}");
111        let store = self.store.clone();
112
113        async move {
114            let mut object = store.get(name).await.context("getting object")?;
115            let mut bytes = vec![];
116            object.read_to_end(&mut bytes).await?;
117            Ok(Some(bytes))
118        }
119        .boxed()
120    }
121
122    fn write_data(&self, name: String, data: Vec<u8>) -> FutureResult<()> {
123        tracing::trace!("writing object data: {name}");
124        let store = self.store.clone();
125
126        async move {
127            store.put(name.as_str(), &mut data.as_slice()).await.context("writing object")?;
128            Ok(())
129        }
130        .boxed()
131    }
132
133    fn list_objects(&self) -> FutureResult<Vec<String>> {
134        tracing::trace!("listing objects");
135        let store = self.store.clone();
136
137        async move {
138            let mut objects = store.list().await.context("listing objects")?;
139
140            let mut names = vec![];
141            while let Some(n) = objects.next().await {
142                let Ok(obj_info) = n else {
143                    tracing::warn!("issue listing object");
144                    continue;
145                };
146                names.push(obj_info.name);
147            }
148
149            Ok(names)
150        }
151        .boxed()
152    }
153
154    fn delete_object(&self, name: String) -> FutureResult<()> {
155        let store = self.store.clone();
156        async move { store.delete(name).await.context("deleting object") }.boxed()
157    }
158
159    fn has_object(&self, name: String) -> FutureResult<bool> {
160        tracing::trace!("checking existence of object: {name}");
161        let store = self.store.clone();
162
163        async move {
164            let _ = store.get(name).await.context("checking object")?;
165            Ok(true)
166        }
167        .boxed()
168    }
169
170    fn object_info(&self, name: String) -> FutureResult<ObjectMetadata> {
171        tracing::trace!("getting object info: {name}");
172        let store = self.store.clone();
173        let metadata = self.metadata.clone();
174
175        async move {
176            let info = store.info(&name).await.context("getting object info")?;
177
178            Ok(ObjectMetadata {
179                container: metadata.name.clone(),
180                name: info.name,
181                size: info.size as u64,
182                created_at: metadata.created_at,
183            })
184        }
185        .boxed()
186    }
187}