1mod kvredis;
2
3#[macro_use]
4extern crate wasmcloud_provider_core as codec;
5use actorcore::{deserialize, serialize, CapabilityConfiguration, HealthCheckResponse};
6use actorkeyvalue::*;
7use codec::{
8 capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
9 core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR},
10};
11use log::trace;
12use redis::Connection;
13use redis::RedisResult;
14use redis::{self, Commands};
15use std::collections::HashMap;
16use std::error::Error;
17use std::sync::Arc;
18use std::sync::RwLock;
19use wasmcloud_actor_core as actorcore;
20use wasmcloud_actor_keyvalue as actorkeyvalue;
21
22#[allow(unused)]
23const CAPABILITY_ID: &str = "wasmcloud:keyvalue";
24const SYSTEM_ACTOR: &str = "system";
25
26#[cfg(not(feature = "static_plugin"))]
27capability_provider!(RedisKVProvider, RedisKVProvider::new);
28
29#[derive(Clone)]
31pub struct RedisKVProvider {
32 dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
33 clients: Arc<RwLock<HashMap<String, redis::Client>>>,
34}
35
36impl Default for RedisKVProvider {
37 fn default() -> Self {
38 match env_logger::try_init() {
39 Ok(_) => {}
40 Err(_) => {}
41 };
42
43 RedisKVProvider {
44 dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
45 clients: Arc::new(RwLock::new(HashMap::new())),
46 }
47 }
48}
49
50impl RedisKVProvider {
51 pub fn new() -> Self {
53 RedisKVProvider::default()
54 }
55
56 fn actor_con(&self, actor: &str) -> RedisResult<Connection> {
57 let lock = self.clients.read().unwrap();
58 if let Some(client) = lock.get(actor) {
59 client.get_connection()
60 } else {
61 Err(redis::RedisError::from((
62 redis::ErrorKind::InvalidClientConfig,
63 "No client for this actor. Did the host configure it?",
64 )))
65 }
66 }
67
68 fn configure(
69 &self,
70 config: CapabilityConfiguration,
71 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
72 if self.clients.read().unwrap().contains_key(&config.module) {
73 return Ok(vec![]);
74 }
75 let c = kvredis::initialize_client(config.clone())?;
76
77 self.clients.write().unwrap().insert(config.module, c);
78 Ok(vec![])
79 }
80
81 fn remove_actor(
82 &self,
83 config: CapabilityConfiguration,
84 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
85 self.clients.write().unwrap().remove(&config.module);
86 Ok(vec![])
87 }
88
89 fn add(&self, actor: &str, req: AddArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
90 let mut con = self.actor_con(actor)?;
91 let res: i32 = con.incr(req.key, req.value)?;
92 let resp = AddResponse { value: res };
93
94 Ok(serialize(resp)?)
95 }
96
97 fn del(&self, actor: &str, req: DelArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
98 let mut con = self.actor_con(actor)?;
99 con.del(&req.key)?;
100 let resp = DelResponse { key: req.key };
101
102 Ok(serialize(resp)?)
103 }
104
105 fn get(&self, actor: &str, req: GetArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
106 let mut con = self.actor_con(actor)?;
107 if !con.exists(&req.key)? {
108 Ok(serialize(GetResponse {
109 value: String::from(""),
110 exists: false,
111 })?)
112 } else {
113 let v: redis::RedisResult<String> = con.get(&req.key);
114 Ok(serialize(match v {
115 Ok(s) => GetResponse {
116 value: s,
117 exists: true,
118 },
119 Err(e) => {
120 eprint!("GET for {} failed: {}", &req.key, e);
121 GetResponse {
122 value: "".to_string(),
123 exists: false,
124 }
125 }
126 })?)
127 }
128 }
129
130 fn list_clear(
131 &self,
132 actor: &str,
133 req: ClearArgs,
134 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
135 self.del(actor, DelArgs { key: req.key })
136 }
137
138 fn list_range(
139 &self,
140 actor: &str,
141 req: RangeArgs,
142 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
143 let mut con = self.actor_con(actor)?;
144 let result: Vec<String> = con.lrange(req.key, req.start as _, req.stop as _)?;
145 Ok(serialize(ListRangeResponse { values: result })?)
146 }
147
148 fn list_push(
149 &self,
150 actor: &str,
151 req: PushArgs,
152 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
153 let mut con = self.actor_con(actor)?;
154 let result: i32 = con.lpush(req.key, req.value)?;
155 Ok(serialize(ListResponse { new_count: result })?)
156 }
157
158 fn set(&self, actor: &str, req: SetArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
159 let mut con = self.actor_con(actor)?;
160 con.set(req.key, &req.value)?;
161 Ok(serialize(SetResponse {
162 value: req.value.clone(),
163 })?)
164 }
165
166 fn list_del_item(
167 &self,
168 actor: &str,
169 req: ListItemDeleteArgs,
170 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
171 let mut con = self.actor_con(actor)?;
172 let result: i32 = con.lrem(req.key, 0, &req.value)?;
173 Ok(serialize(ListResponse { new_count: result })?)
174 }
175
176 fn set_add(
177 &self,
178 actor: &str,
179 req: SetAddArgs,
180 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
181 let mut con = self.actor_con(actor)?;
182 let result: i32 = con.sadd(req.key, &req.value)?;
183 Ok(serialize(SetOperationResponse { new_count: result })?)
184 }
185
186 fn set_remove(
187 &self,
188 actor: &str,
189 req: SetRemoveArgs,
190 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
191 let mut con = self.actor_con(actor)?;
192 let result: i32 = con.srem(req.key, &req.value)?;
193 Ok(serialize(SetOperationResponse { new_count: result })?)
194 }
195
196 fn set_union(
197 &self,
198 actor: &str,
199 req: SetUnionArgs,
200 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
201 let mut con = self.actor_con(actor)?;
202 let result: Vec<String> = con.sunion(req.keys)?;
203 Ok(serialize(SetQueryResponse { values: result })?)
204 }
205
206 fn set_intersect(
207 &self,
208 actor: &str,
209 req: SetIntersectionArgs,
210 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
211 let mut con = self.actor_con(actor)?;
212 let result: Vec<String> = con.sinter(req.keys)?;
213 Ok(serialize(SetQueryResponse { values: result })?)
214 }
215
216 fn set_query(
217 &self,
218 actor: &str,
219 req: SetQueryArgs,
220 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
221 let mut con = self.actor_con(actor)?;
222 let result: Vec<String> = con.smembers(req.key)?;
223 Ok(serialize(SetQueryResponse { values: result })?)
224 }
225
226 fn exists(
227 &self,
228 actor: &str,
229 req: KeyExistsArgs,
230 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
231 let mut con = self.actor_con(actor)?;
232 let result: bool = con.exists(req.key)?;
233 Ok(serialize(GetResponse {
234 value: "".to_string(),
235 exists: result,
236 })?)
237 }
238}
239
240impl CapabilityProvider for RedisKVProvider {
241 fn configure_dispatch(
242 &self,
243 dispatcher: Box<dyn Dispatcher>,
244 ) -> Result<(), Box<dyn Error + Sync + Send>> {
245 trace!("Dispatcher received.");
246
247 let mut lock = self.dispatcher.write().unwrap();
248 *lock = dispatcher;
249
250 Ok(())
251 }
252
253 fn stop(&self) {
254 }
256
257 fn handle_call(
258 &self,
259 actor: &str,
260 op: &str,
261 msg: &[u8],
262 ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
263 trace!(
264 "Received host call from {}, operation - {} ({} bytes)",
265 actor,
266 op,
267 msg.len()
268 );
269
270 match op {
271 OP_BIND_ACTOR if actor == SYSTEM_ACTOR => {
272 self.configure(deserialize::<CapabilityConfiguration>(msg).unwrap())
273 }
274 OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
275 self.remove_actor(deserialize::<CapabilityConfiguration>(msg).unwrap())
276 }
277 OP_ADD => self.add(actor, deserialize(msg).unwrap()),
278 OP_DEL => self.del(actor, deserialize(msg).unwrap()),
279 OP_GET => self.get(actor, deserialize(msg).unwrap()),
280 OP_CLEAR => self.list_clear(actor, deserialize(msg).unwrap()),
281 OP_RANGE => self.list_range(actor, deserialize(msg).unwrap()),
282 OP_PUSH => self.list_push(actor, deserialize(msg).unwrap()),
283 OP_SET => self.set(actor, deserialize(msg).unwrap()),
284 OP_LIST_DEL => self.list_del_item(actor, deserialize(msg).unwrap()),
285 OP_SET_ADD => self.set_add(actor, deserialize(msg).unwrap()),
286 OP_SET_REMOVE => self.set_remove(actor, deserialize(msg).unwrap()),
287 OP_SET_UNION => self.set_union(actor, deserialize(msg).unwrap()),
288 OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg).unwrap()),
289 OP_SET_QUERY => self.set_query(actor, deserialize(msg).unwrap()),
290 OP_KEY_EXISTS => self.exists(actor, deserialize(msg).unwrap()),
291 OP_HEALTH_REQUEST if actor == SYSTEM_ACTOR => Ok(serialize(HealthCheckResponse {
292 healthy: true,
293 message: "".to_string(),
294 })
295 .unwrap()),
296 _ => Err("bad dispatch".into()),
297 }
298 }
299}