1use std::fmt::Debug;
2use std::sync::Arc;
3
4use anyhow::Context;
5use async_nats::jetstream;
6use async_nats::jetstream::object_store::{Config, ObjectStore};
7use chrono::Utc;
8use futures::{FutureExt, StreamExt};
9use omnia_wasi_blobstore::{
10 Container, ContainerMetadata, FutureResult, ObjectMetadata, WasiBlobstoreCtx,
11};
12use tokio::io::AsyncReadExt;
13
14use crate::Client;
15
16impl WasiBlobstoreCtx for Client {
18 fn create_container(&self, name: String) -> FutureResult<Arc<dyn Container>> {
19 tracing::trace!("creating container: {name}");
20 let client = self.inner.clone();
21
22 async move {
23 let store = jetstream::new(client)
24 .create_object_store(Config {
25 bucket: name.clone(),
26 ..Default::default()
27 })
28 .await
29 .context("creating object store")?;
30 let metadata = metadata(name);
31
32 Ok(Arc::new(NatsContainer { metadata, store }) as Arc<dyn Container>)
33 }
34 .boxed()
35 }
36
37 fn get_container(&self, name: String) -> FutureResult<Arc<dyn Container>> {
38 tracing::trace!("getting container: {name}");
39 let client = self.inner.clone();
40
41 async move {
42 let store = jetstream::new(client)
43 .get_object_store(&name)
44 .await
45 .context("getting object store")?;
46 let metadata = metadata(name);
47
48 Ok(Arc::new(NatsContainer { metadata, store }) as Arc<dyn Container>)
49 }
50 .boxed()
51 }
52
53 fn delete_container(&self, name: String) -> FutureResult<()> {
54 tracing::trace!("deleting container: {name}");
55 let client = self.inner.clone();
56
57 async move {
58 jetstream::new(client)
59 .delete_object_store(&name)
60 .await
61 .context("issue deleting object store")?;
62 Ok(())
63 }
64 .boxed()
65 }
66
67 fn container_exists(&self, name: String) -> FutureResult<bool> {
68 tracing::trace!("checking existence of container: {name}");
69 let client = self.inner.clone();
70
71 async move {
72 let exists = jetstream::new(client).get_object_store(&name).await.is_ok();
73 Ok(exists)
74 }
75 .boxed()
76 }
77}
78
79fn metadata(name: String) -> ContainerMetadata {
80 #[allow(clippy::cast_sign_loss)]
81 ContainerMetadata {
82 name,
83 created_at: Utc::now().timestamp() as u64,
84 }
85}
86
/// A blobstore container backed by a NATS JetStream object store.
pub struct NatsContainer {
    /// Name and creation timestamp reported for this container.
    metadata: ContainerMetadata,
    /// Handle to the object store used for this container's object operations.
    store: ObjectStore,
}
92
93impl Debug for NatsContainer {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("NatsContainer").finish()
96 }
97}
98
99impl Container for NatsContainer {
100 fn name(&self) -> anyhow::Result<String> {
101 tracing::trace!("getting container name");
102 Ok(self.metadata.name.clone())
103 }
104
105 fn info(&self) -> anyhow::Result<ContainerMetadata> {
106 Ok(self.metadata.clone())
107 }
108
109 fn get_data(&self, name: String, _start: u64, _end: u64) -> FutureResult<Option<Vec<u8>>> {
110 tracing::trace!("getting object data: {name}");
111 let store = self.store.clone();
112
113 async move {
114 let mut object = store.get(name).await.context("getting object")?;
115 let mut bytes = vec![];
116 object.read_to_end(&mut bytes).await?;
117 Ok(Some(bytes))
118 }
119 .boxed()
120 }
121
122 fn write_data(&self, name: String, data: Vec<u8>) -> FutureResult<()> {
123 tracing::trace!("writing object data: {name}");
124 let store = self.store.clone();
125
126 async move {
127 store.put(name.as_str(), &mut data.as_slice()).await.context("writing object")?;
128 Ok(())
129 }
130 .boxed()
131 }
132
133 fn list_objects(&self) -> FutureResult<Vec<String>> {
134 tracing::trace!("listing objects");
135 let store = self.store.clone();
136
137 async move {
138 let mut objects = store.list().await.context("listing objects")?;
139
140 let mut names = vec![];
141 while let Some(n) = objects.next().await {
142 let Ok(obj_info) = n else {
143 tracing::warn!("issue listing object");
144 continue;
145 };
146 names.push(obj_info.name);
147 }
148
149 Ok(names)
150 }
151 .boxed()
152 }
153
154 fn delete_object(&self, name: String) -> FutureResult<()> {
155 let store = self.store.clone();
156 async move { store.delete(name).await.context("deleting object") }.boxed()
157 }
158
159 fn has_object(&self, name: String) -> FutureResult<bool> {
160 tracing::trace!("checking existence of object: {name}");
161 let store = self.store.clone();
162
163 async move {
164 let _ = store.get(name).await.context("checking object")?;
165 Ok(true)
166 }
167 .boxed()
168 }
169
170 fn object_info(&self, name: String) -> FutureResult<ObjectMetadata> {
171 tracing::trace!("getting object info: {name}");
172 let store = self.store.clone();
173 let metadata = self.metadata.clone();
174
175 async move {
176 let info = store.info(&name).await.context("getting object info")?;
177
178 Ok(ObjectMetadata {
179 container: metadata.name.clone(),
180 name: info.name,
181 size: info.size as u64,
182 created_at: metadata.created_at,
183 })
184 }
185 .boxed()
186 }
187}