wrpc_wasi_keyvalue_redis/
lib.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Context as _;
5use bytes::Bytes;
6use redis::{aio::ConnectionManager, AsyncCommands as _};
7use tokio::sync::RwLock;
8use tracing::{instrument, trace};
9use uuid::Uuid;
10use wrpc_transport::{ResourceBorrow, ResourceOwn};
11use wrpc_wasi_keyvalue::exports::wasi::keyvalue::{atomics, batch, store};
12
13pub type Result<T, E = store::Error> = core::result::Result<T, E>;
14
15#[derive(Clone, Default)]
16pub struct Handler(pub Arc<RwLock<HashMap<Bytes, ConnectionManager>>>);
17
18impl Handler {
19 async fn bucket(&self, bucket: impl AsRef<[u8]>) -> Result<ConnectionManager> {
20 trace!("looking up bucket");
21 let store = self.0.read().await;
22 store
23 .get(bucket.as_ref())
24 .ok_or(store::Error::NoSuchStore)
25 .cloned()
26 }
27}
28
29impl<C: Send + Sync> store::Handler<C> for Handler {
30 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
32 async fn open(
33 &self,
34 _cx: C,
35 identifier: String,
36 ) -> anyhow::Result<Result<ResourceOwn<store::Bucket>>> {
37 let client = match redis::Client::open(identifier).context("failed to open Redis client") {
38 Ok(client) => client,
39 Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
40 };
41 let conn = match client
42 .get_connection_manager()
43 .await
44 .context("failed to get Redis connection manager")
45 {
46 Ok(conn) => conn,
47 Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
48 };
49 let id = Uuid::now_v7();
50 let id = Bytes::copy_from_slice(id.as_bytes());
51 let mut buckets = self.0.write().await;
52 buckets.insert(id.clone(), conn);
53 return Ok(Ok(ResourceOwn::from(id)));
54 }
55}
56
57impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
58 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
59 async fn get(
60 &self,
61 _cx: C,
62 bucket: ResourceBorrow<store::Bucket>,
63 key: String,
64 ) -> anyhow::Result<Result<Option<Bytes>>> {
65 let mut conn = match self.bucket(bucket).await {
66 Ok(conn) => conn,
67 Err(err) => return Ok(Err(err)),
68 };
69 match conn.get(key).await {
70 Ok(redis::Value::Nil) => Ok(Ok(None)),
71 Ok(redis::Value::BulkString(buf)) => Ok(Ok(Some(buf.into()))),
72 Ok(_) => Ok(Err(store::Error::Other(
73 "invalid data type returned by Redis".into(),
74 ))),
75 Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
76 }
77 }
78
79 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
80 async fn set(
81 &self,
82 _cx: C,
83 bucket: ResourceBorrow<store::Bucket>,
84 key: String,
85 value: Bytes,
86 ) -> anyhow::Result<Result<()>> {
87 let mut conn = match self.bucket(bucket).await {
88 Ok(conn) => conn,
89 Err(err) => return Ok(Err(err)),
90 };
91 match conn.set(key, value.as_ref()).await {
92 Ok(()) => Ok(Ok(())),
93 Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
94 }
95 }
96
97 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
98 async fn delete(
99 &self,
100 _cx: C,
101 bucket: ResourceBorrow<store::Bucket>,
102 key: String,
103 ) -> anyhow::Result<Result<()>> {
104 let mut conn = match self.bucket(bucket).await {
105 Ok(conn) => conn,
106 Err(err) => return Ok(Err(err)),
107 };
108 match conn.del(key).await {
109 Ok(()) => Ok(Ok(())),
110 Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
111 }
112 }
113
114 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
115 async fn exists(
116 &self,
117 _cx: C,
118 bucket: ResourceBorrow<store::Bucket>,
119 key: String,
120 ) -> anyhow::Result<Result<bool>> {
121 let mut conn = match self.bucket(bucket).await {
122 Ok(conn) => conn,
123 Err(err) => return Ok(Err(err)),
124 };
125 match conn.exists(key).await {
126 Ok(ok) => Ok(Ok(ok)),
127 Err(err) => Ok(Err(store::Error::Other(err.to_string()))),
128 }
129 }
130
131 #[instrument(level = "trace", skip(self, _cx), ret(level = "trace"))]
132 async fn list_keys(
133 &self,
134 _cx: C,
135 bucket: ResourceBorrow<store::Bucket>,
136 cursor: Option<String>,
137 ) -> anyhow::Result<Result<store::KeyResponse>> {
138 Ok(Err(store::Error::Other("not supported".into())))
139 }
140}
141
142impl<C: Send + Sync> atomics::Handler<C> for Handler {
143 async fn increment(
144 &self,
145 _cx: C,
146 _bucket: ResourceBorrow<store::Bucket>,
147 _key: String,
148 _delta: i64,
149 ) -> anyhow::Result<Result<i64>> {
150 Ok(Err(store::Error::Other("not supported".into())))
151 }
152
153 async fn swap(
154 &self,
155 _cx: C,
156 _cas: ResourceOwn<atomics::Cas>,
157 _value: Bytes,
158 ) -> anyhow::Result<Result<(), atomics::CasError>> {
159 Ok(Err(atomics::CasError::StoreError(store::Error::Other(
160 "not supported".into(),
161 ))))
162 }
163}
164
165impl<C: Send + Sync> atomics::HandlerCas<C> for Handler {
166 async fn new(
167 &self,
168 _cx: C,
169 _bucket: ResourceBorrow<store::Bucket>,
170 _key: String,
171 ) -> anyhow::Result<Result<ResourceOwn<atomics::Cas>>> {
172 Ok(Err(store::Error::Other("not supported".into())))
173 }
174
175 async fn current(
176 &self,
177 _cx: C,
178 _bucket: ResourceBorrow<atomics::Cas>,
179 ) -> anyhow::Result<Result<Option<Bytes>>> {
180 Ok(Err(store::Error::Other("not supported".into())))
181 }
182}
183
184impl<C: Send + Sync> batch::Handler<C> for Handler {
185 async fn get_many(
186 &self,
187 _cx: C,
188 _bucket: ResourceBorrow<store::Bucket>,
189 _keys: Vec<String>,
190 ) -> anyhow::Result<Result<Vec<Option<(String, Bytes)>>>> {
191 Ok(Err(store::Error::Other("not supported".into())))
192 }
193
194 async fn set_many(
195 &self,
196 _cx: C,
197 _bucket: ResourceBorrow<store::Bucket>,
198 _key_values: Vec<(String, Bytes)>,
199 ) -> anyhow::Result<Result<()>> {
200 Ok(Err(store::Error::Other("not supported".into())))
201 }
202
203 async fn delete_many(
204 &self,
205 _cx: C,
206 _bucket: ResourceBorrow<store::Bucket>,
207 _keys: Vec<String>,
208 ) -> anyhow::Result<Result<()>> {
209 Ok(Err(store::Error::Other("not supported".into())))
210 }
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 #[allow(unused)]
218 async fn bound(s: &impl wrpc_transport::Serve) -> anyhow::Result<()> {
219 wrpc_wasi_keyvalue::serve(s, Handler::default()).await?;
220 Ok(())
221 }
222}