Skip to main content

omnia_nats/
keyvalue.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_nats::jetstream::kv::Config;
6use async_nats::jetstream::{self, kv};
7use futures::TryStreamExt;
8use futures::future::FutureExt;
9use omnia_wasi_keyvalue::{Bucket, FutureResult, WasiKeyValueCtx};
10
11use crate::Client;
12
13/// `wasi-keyvalue` implementation backed by NATS JetStream KV store.
14impl WasiKeyValueCtx for Client {
15    fn open_bucket(&self, identifier: String) -> FutureResult<Arc<dyn Bucket>> {
16        tracing::trace!("opening bucket: {identifier}");
17        let client = self.inner.clone();
18
19        async move {
20            let jetstream = jetstream::new(client);
21            let store = if let Ok(store) = jetstream.get_key_value(&identifier).await {
22                store
23            } else {
24                let result = jetstream
25                    .create_key_value(Config {
26                        bucket: identifier,
27                        history: 1,
28                        max_age: Duration::from_mins(10),
29                        max_bytes: 100 * 1024 * 1024, // 100 MiB
30                        ..Config::default()
31                    })
32                    .await;
33
34                result.context("failed to create bucket")?
35            };
36
37            Ok(Arc::new(KvBucket(store)) as Arc<dyn Bucket>)
38        }
39        .boxed()
40    }
41}
42
43/// A key-value bucket backed by a NATS JetStream KV store.
44#[derive(Debug)]
45pub struct KvBucket(pub kv::Store);
46
47impl Bucket for KvBucket {
48    fn name(&self) -> &'static str {
49        Box::leak(self.0.name.clone().into_boxed_str())
50    }
51
52    fn get(&self, key: String) -> FutureResult<Option<Vec<u8>>> {
53        tracing::trace!("getting key: {key}");
54        let store = self.0.clone();
55
56        async move {
57            let entry = store.get(key).await.context("getting key")?;
58            Ok(entry.map(Into::into))
59        }
60        .boxed()
61    }
62
63    fn set(&self, key: String, value: Vec<u8>) -> FutureResult<()> {
64        tracing::trace!("setting key: {key}");
65        let store = self.0.clone();
66
67        async move {
68            store.put(key, value.into()).await.context("setting key")?;
69            Ok(())
70        }
71        .boxed()
72    }
73
74    fn delete(&self, key: String) -> FutureResult<()> {
75        tracing::trace!("deleting key: {key}");
76        let store = self.0.clone();
77
78        async move {
79            store.delete(key).await.context("deleting key")?;
80            Ok(())
81        }
82        .boxed()
83    }
84
85    fn exists(&self, key: String) -> FutureResult<bool> {
86        tracing::trace!("checking existence of key: {key}");
87        let store = self.0.clone();
88
89        async move {
90            let entry = store.get(key).await.context("checking key")?;
91            Ok(entry.is_some())
92        }
93        .boxed()
94    }
95
96    fn keys(&self) -> FutureResult<Vec<String>> {
97        let store = self.0.clone();
98
99        async move {
100            tracing::trace!("listing keys");
101
102            let key_results = store.keys().await.context("listing keys")?;
103            let keys =
104                key_results.try_filter_map(|k| async move { Ok(Some(k)) }).try_collect().await?;
105
106            Ok(keys)
107        }
108        .boxed()
109    }
110}