1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_nats::jetstream::kv::Config;
6use async_nats::jetstream::{self, kv};
7use futures::TryStreamExt;
8use futures::future::FutureExt;
9use omnia_wasi_keyvalue::{Bucket, FutureResult, WasiKeyValueCtx};
10
11use crate::Client;
12
13impl WasiKeyValueCtx for Client {
15 fn open_bucket(&self, identifier: String) -> FutureResult<Arc<dyn Bucket>> {
16 tracing::trace!("opening bucket: {identifier}");
17 let client = self.inner.clone();
18
19 async move {
20 let jetstream = jetstream::new(client);
21 let store = if let Ok(store) = jetstream.get_key_value(&identifier).await {
22 store
23 } else {
24 let result = jetstream
25 .create_key_value(Config {
26 bucket: identifier,
27 history: 1,
28 max_age: Duration::from_mins(10),
29 max_bytes: 100 * 1024 * 1024, ..Config::default()
31 })
32 .await;
33
34 result.context("failed to create bucket")?
35 };
36
37 Ok(Arc::new(KvBucket(store)) as Arc<dyn Bucket>)
38 }
39 .boxed()
40 }
41}
42
43#[derive(Debug)]
45pub struct KvBucket(pub kv::Store);
46
47impl Bucket for KvBucket {
48 fn name(&self) -> &'static str {
49 Box::leak(self.0.name.clone().into_boxed_str())
50 }
51
52 fn get(&self, key: String) -> FutureResult<Option<Vec<u8>>> {
53 tracing::trace!("getting key: {key}");
54 let store = self.0.clone();
55
56 async move {
57 let entry = store.get(key).await.context("getting key")?;
58 Ok(entry.map(Into::into))
59 }
60 .boxed()
61 }
62
63 fn set(&self, key: String, value: Vec<u8>) -> FutureResult<()> {
64 tracing::trace!("setting key: {key}");
65 let store = self.0.clone();
66
67 async move {
68 store.put(key, value.into()).await.context("setting key")?;
69 Ok(())
70 }
71 .boxed()
72 }
73
74 fn delete(&self, key: String) -> FutureResult<()> {
75 tracing::trace!("deleting key: {key}");
76 let store = self.0.clone();
77
78 async move {
79 store.delete(key).await.context("deleting key")?;
80 Ok(())
81 }
82 .boxed()
83 }
84
85 fn exists(&self, key: String) -> FutureResult<bool> {
86 tracing::trace!("checking existence of key: {key}");
87 let store = self.0.clone();
88
89 async move {
90 let entry = store.get(key).await.context("checking key")?;
91 Ok(entry.is_some())
92 }
93 .boxed()
94 }
95
96 fn keys(&self) -> FutureResult<Vec<String>> {
97 let store = self.0.clone();
98
99 async move {
100 tracing::trace!("listing keys");
101
102 let key_results = store.keys().await.context("listing keys")?;
103 let keys =
104 key_results.try_filter_map(|k| async move { Ok(Some(k)) }).try_collect().await?;
105
106 Ok(keys)
107 }
108 .boxed()
109 }
110}