// wrpc_wasi_keyvalue_redis/lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Context as _;
5use bytes::Bytes;
6use redis::{aio::ConnectionManager, AsyncCommands as _};
7use tokio::sync::RwLock;
8use tracing::{instrument, trace};
9use uuid::Uuid;
10use wrpc_transport::{ResourceBorrow, ResourceOwn};
11use wrpc_wasi_keyvalue::exports::wasi::keyvalue::{atomics, batch, store};
12
13pub type Result<T, E = store::Error> = core::result::Result<T, E>;
14
15#[derive(Clone, Default)]
16pub struct Handler(pub Arc<RwLock<HashMap<Bytes, ConnectionManager>>>);
17
18impl Handler {
19    async fn bucket(&self, bucket: impl AsRef<[u8]>) -> Result<ConnectionManager> {
20        trace!("looking up bucket");
21        let store = self.0.read().await;
22        store
23            .get(bucket.as_ref())
24            .ok_or(store::Error::NoSuchStore)
25            .cloned()
26    }
27}
28
29impl<C: Send + Sync> store::Handler<C> for Handler {
30    // NOTE: Resource handle returned is just the `identifier` itself
31    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
32    async fn open(
33        &self,
34        _cx: C,
35        identifier: String,
36    ) -> anyhow::Result<Result<ResourceOwn<store::Bucket>>> {
37        let client = match redis::Client::open(identifier).context("failed to open Redis client") {
38            Ok(client) => client,
39            Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
40        };
41        let conn = match client
42            .get_connection_manager()
43            .await
44            .context("failed to get Redis connection manager")
45        {
46            Ok(conn) => conn,
47            Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
48        };
49        let id = Uuid::now_v7();
50        let id = Bytes::copy_from_slice(id.as_bytes());
51        let mut buckets = self.0.write().await;
52        buckets.insert(id.clone(), conn);
53        return Ok(Ok(ResourceOwn::from(id)));
54    }
55}
56
57impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
58    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
59    async fn get(
60        &self,
61        _cx: C,
62        bucket: ResourceBorrow<store::Bucket>,
63        key: String,
64    ) -> anyhow::Result<Result<Option<Bytes>>> {
65        let mut conn = match self.bucket(bucket).await {
66            Ok(conn) => conn,
67            Err(err) => return Ok(Err(err)),
68        };
69        match conn.get(key).await {
70            Ok(redis::Value::Nil) => Ok(Ok(None)),
71            Ok(redis::Value::BulkString(buf)) => Ok(Ok(Some(buf.into()))),
72            Ok(_) => Ok(Err(store::Error::Other(
73                "invalid data type returned by Redis".into(),
74            ))),
75            Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
76        }
77    }
78
79    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
80    async fn set(
81        &self,
82        _cx: C,
83        bucket: ResourceBorrow<store::Bucket>,
84        key: String,
85        value: Bytes,
86    ) -> anyhow::Result<Result<()>> {
87        let mut conn = match self.bucket(bucket).await {
88            Ok(conn) => conn,
89            Err(err) => return Ok(Err(err)),
90        };
91        match conn.set(key, value.as_ref()).await {
92            Ok(()) => Ok(Ok(())),
93            Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
94        }
95    }
96
97    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
98    async fn delete(
99        &self,
100        _cx: C,
101        bucket: ResourceBorrow<store::Bucket>,
102        key: String,
103    ) -> anyhow::Result<Result<()>> {
104        let mut conn = match self.bucket(bucket).await {
105            Ok(conn) => conn,
106            Err(err) => return Ok(Err(err)),
107        };
108        match conn.del(key).await {
109            Ok(()) => Ok(Ok(())),
110            Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
111        }
112    }
113
114    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
115    async fn exists(
116        &self,
117        _cx: C,
118        bucket: ResourceBorrow<store::Bucket>,
119        key: String,
120    ) -> anyhow::Result<Result<bool>> {
121        let mut conn = match self.bucket(bucket).await {
122            Ok(conn) => conn,
123            Err(err) => return Ok(Err(err)),
124        };
125        match conn.exists(key).await {
126            Ok(ok) => Ok(Ok(ok)),
127            Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
128        }
129    }
130
131    #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
132    async fn list_keys(
133        &self,
134        _cx: C,
135        bucket: ResourceBorrow<store::Bucket>,
136        cursor: Option<String>,
137    ) -> anyhow::Result<Result<store::KeyResponse>> {
138        Ok(Err(store::Error::Other("not supported".into())))
139    }
140}
141
142impl<C: Send + Sync> atomics::Handler<C> for Handler {
143    async fn increment(
144        &self,
145        _cx: C,
146        _bucket: ResourceBorrow<store::Bucket>,
147        _key: String,
148        _delta: i64,
149    ) -> anyhow::Result<Result<i64>> {
150        Ok(Err(store::Error::Other("not supported".into())))
151    }
152
153    async fn swap(
154        &self,
155        _cx: C,
156        _cas: ResourceOwn<atomics::Cas>,
157        _value: Bytes,
158    ) -> anyhow::Result<Result<(), atomics::CasError>> {
159        Ok(Err(atomics::CasError::StoreError(store::Error::Other(
160            "not supported".into(),
161        ))))
162    }
163}
164
165impl<C: Send + Sync> atomics::HandlerCas<C> for Handler {
166    async fn new(
167        &self,
168        _cx: C,
169        _bucket: ResourceBorrow<store::Bucket>,
170        _key: String,
171    ) -> anyhow::Result<Result<ResourceOwn<atomics::Cas>>> {
172        Ok(Err(store::Error::Other("not supported".into())))
173    }
174
175    async fn current(
176        &self,
177        _cx: C,
178        _bucket: ResourceBorrow<atomics::Cas>,
179    ) -> anyhow::Result<Result<Option<Bytes>>> {
180        Ok(Err(store::Error::Other("not supported".into())))
181    }
182}
183
184impl<C: Send + Sync> batch::Handler<C> for Handler {
185    async fn get_many(
186        &self,
187        _cx: C,
188        _bucket: ResourceBorrow<store::Bucket>,
189        _keys: Vec<String>,
190    ) -> anyhow::Result<Result<Vec<Option<(String, Bytes)>>>> {
191        Ok(Err(store::Error::Other("not supported".into())))
192    }
193
194    async fn set_many(
195        &self,
196        _cx: C,
197        _bucket: ResourceBorrow<store::Bucket>,
198        _key_values: Vec<(String, Bytes)>,
199    ) -> anyhow::Result<Result<()>> {
200        Ok(Err(store::Error::Other("not supported".into())))
201    }
202
203    async fn delete_many(
204        &self,
205        _cx: C,
206        _bucket: ResourceBorrow<store::Bucket>,
207        _keys: Vec<String>,
208    ) -> anyhow::Result<Result<()>> {
209        Ok(Err(store::Error::Other("not supported".into())))
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Passes a default [`Handler`] to `wrpc_wasi_keyvalue::serve`, so the
    /// function only type-checks if `Handler` is accepted there.
    // Nothing in this module calls `bound`, hence the lint allowance.
    #[allow(unused)]
    async fn bound(s: &impl wrpc_transport::Serve) -> anyhow::Result<()> {
        let handler = Handler::default();
        wrpc_wasi_keyvalue::serve(s, handler).await?;
        Ok(())
    }
}
222}