wasmcloud_redis/
lib.rs

1mod kvredis;
2
3#[macro_use]
4extern crate wasmcloud_provider_core as codec;
5use actorcore::{deserialize, serialize, CapabilityConfiguration, HealthCheckResponse};
6use actorkeyvalue::*;
7use codec::{
8    capabilities::{CapabilityProvider, Dispatcher, NullDispatcher},
9    core::{OP_BIND_ACTOR, OP_HEALTH_REQUEST, OP_REMOVE_ACTOR},
10};
11use log::trace;
12use redis::Connection;
13use redis::RedisResult;
14use redis::{self, Commands};
15use std::collections::HashMap;
16use std::error::Error;
17use std::sync::Arc;
18use std::sync::RwLock;
19use wasmcloud_actor_core as actorcore;
20use wasmcloud_actor_keyvalue as actorkeyvalue;
21
/// Identifier of the capability contract this provider implements.
// Not referenced anywhere in this file, hence the lint suppression.
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:keyvalue";
/// Actor name required for the bind, remove and health-check operations
/// (see the guards in `handle_call`).
const SYSTEM_ACTOR: &str = "system";
25
26#[cfg(not(feature = "static_plugin"))]
27capability_provider!(RedisKVProvider, RedisKVProvider::new);
28
29/// Redis implementation of the `wasmcloud:keyvalue` specification
30#[derive(Clone)]
31pub struct RedisKVProvider {
32    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
33    clients: Arc<RwLock<HashMap<String, redis::Client>>>,
34}
35
36impl Default for RedisKVProvider {
37    fn default() -> Self {
38        match env_logger::try_init() {
39            Ok(_) => {}
40            Err(_) => {}
41        };
42
43        RedisKVProvider {
44            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
45            clients: Arc::new(RwLock::new(HashMap::new())),
46        }
47    }
48}
49
50impl RedisKVProvider {
51    /// Creates a new Redis provider
52    pub fn new() -> Self {
53        RedisKVProvider::default()
54    }
55
56    fn actor_con(&self, actor: &str) -> RedisResult<Connection> {
57        let lock = self.clients.read().unwrap();
58        if let Some(client) = lock.get(actor) {
59            client.get_connection()
60        } else {
61            Err(redis::RedisError::from((
62                redis::ErrorKind::InvalidClientConfig,
63                "No client for this actor. Did the host configure it?",
64            )))
65        }
66    }
67
68    fn configure(
69        &self,
70        config: CapabilityConfiguration,
71    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
72        if self.clients.read().unwrap().contains_key(&config.module) {
73            return Ok(vec![]);
74        }
75        let c = kvredis::initialize_client(config.clone())?;
76
77        self.clients.write().unwrap().insert(config.module, c);
78        Ok(vec![])
79    }
80
81    fn remove_actor(
82        &self,
83        config: CapabilityConfiguration,
84    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
85        self.clients.write().unwrap().remove(&config.module);
86        Ok(vec![])
87    }
88
89    fn add(&self, actor: &str, req: AddArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
90        let mut con = self.actor_con(actor)?;
91        let res: i32 = con.incr(req.key, req.value)?;
92        let resp = AddResponse { value: res };
93
94        Ok(serialize(resp)?)
95    }
96
97    fn del(&self, actor: &str, req: DelArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
98        let mut con = self.actor_con(actor)?;
99        con.del(&req.key)?;
100        let resp = DelResponse { key: req.key };
101
102        Ok(serialize(resp)?)
103    }
104
105    fn get(&self, actor: &str, req: GetArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
106        let mut con = self.actor_con(actor)?;
107        if !con.exists(&req.key)? {
108            Ok(serialize(GetResponse {
109                value: String::from(""),
110                exists: false,
111            })?)
112        } else {
113            let v: redis::RedisResult<String> = con.get(&req.key);
114            Ok(serialize(match v {
115                Ok(s) => GetResponse {
116                    value: s,
117                    exists: true,
118                },
119                Err(e) => {
120                    eprint!("GET for {} failed: {}", &req.key, e);
121                    GetResponse {
122                        value: "".to_string(),
123                        exists: false,
124                    }
125                }
126            })?)
127        }
128    }
129
130    fn list_clear(
131        &self,
132        actor: &str,
133        req: ClearArgs,
134    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
135        self.del(actor, DelArgs { key: req.key })
136    }
137
138    fn list_range(
139        &self,
140        actor: &str,
141        req: RangeArgs,
142    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
143        let mut con = self.actor_con(actor)?;
144        let result: Vec<String> = con.lrange(req.key, req.start as _, req.stop as _)?;
145        Ok(serialize(ListRangeResponse { values: result })?)
146    }
147
148    fn list_push(
149        &self,
150        actor: &str,
151        req: PushArgs,
152    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
153        let mut con = self.actor_con(actor)?;
154        let result: i32 = con.lpush(req.key, req.value)?;
155        Ok(serialize(ListResponse { new_count: result })?)
156    }
157
158    fn set(&self, actor: &str, req: SetArgs) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
159        let mut con = self.actor_con(actor)?;
160        con.set(req.key, &req.value)?;
161        Ok(serialize(SetResponse {
162            value: req.value.clone(),
163        })?)
164    }
165
166    fn list_del_item(
167        &self,
168        actor: &str,
169        req: ListItemDeleteArgs,
170    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
171        let mut con = self.actor_con(actor)?;
172        let result: i32 = con.lrem(req.key, 0, &req.value)?;
173        Ok(serialize(ListResponse { new_count: result })?)
174    }
175
176    fn set_add(
177        &self,
178        actor: &str,
179        req: SetAddArgs,
180    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
181        let mut con = self.actor_con(actor)?;
182        let result: i32 = con.sadd(req.key, &req.value)?;
183        Ok(serialize(SetOperationResponse { new_count: result })?)
184    }
185
186    fn set_remove(
187        &self,
188        actor: &str,
189        req: SetRemoveArgs,
190    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
191        let mut con = self.actor_con(actor)?;
192        let result: i32 = con.srem(req.key, &req.value)?;
193        Ok(serialize(SetOperationResponse { new_count: result })?)
194    }
195
196    fn set_union(
197        &self,
198        actor: &str,
199        req: SetUnionArgs,
200    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
201        let mut con = self.actor_con(actor)?;
202        let result: Vec<String> = con.sunion(req.keys)?;
203        Ok(serialize(SetQueryResponse { values: result })?)
204    }
205
206    fn set_intersect(
207        &self,
208        actor: &str,
209        req: SetIntersectionArgs,
210    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
211        let mut con = self.actor_con(actor)?;
212        let result: Vec<String> = con.sinter(req.keys)?;
213        Ok(serialize(SetQueryResponse { values: result })?)
214    }
215
216    fn set_query(
217        &self,
218        actor: &str,
219        req: SetQueryArgs,
220    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
221        let mut con = self.actor_con(actor)?;
222        let result: Vec<String> = con.smembers(req.key)?;
223        Ok(serialize(SetQueryResponse { values: result })?)
224    }
225
226    fn exists(
227        &self,
228        actor: &str,
229        req: KeyExistsArgs,
230    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
231        let mut con = self.actor_con(actor)?;
232        let result: bool = con.exists(req.key)?;
233        Ok(serialize(GetResponse {
234            value: "".to_string(),
235            exists: result,
236        })?)
237    }
238}
239
240impl CapabilityProvider for RedisKVProvider {
241    fn configure_dispatch(
242        &self,
243        dispatcher: Box<dyn Dispatcher>,
244    ) -> Result<(), Box<dyn Error + Sync + Send>> {
245        trace!("Dispatcher received.");
246
247        let mut lock = self.dispatcher.write().unwrap();
248        *lock = dispatcher;
249
250        Ok(())
251    }
252
253    fn stop(&self) {
254        // Nothing to do here
255    }
256
257    fn handle_call(
258        &self,
259        actor: &str,
260        op: &str,
261        msg: &[u8],
262    ) -> Result<Vec<u8>, Box<dyn Error + Sync + Send>> {
263        trace!(
264            "Received host call from {}, operation - {} ({} bytes)",
265            actor,
266            op,
267            msg.len()
268        );
269
270        match op {
271            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => {
272                self.configure(deserialize::<CapabilityConfiguration>(msg).unwrap())
273            }
274            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => {
275                self.remove_actor(deserialize::<CapabilityConfiguration>(msg).unwrap())
276            }
277            OP_ADD => self.add(actor, deserialize(msg).unwrap()),
278            OP_DEL => self.del(actor, deserialize(msg).unwrap()),
279            OP_GET => self.get(actor, deserialize(msg).unwrap()),
280            OP_CLEAR => self.list_clear(actor, deserialize(msg).unwrap()),
281            OP_RANGE => self.list_range(actor, deserialize(msg).unwrap()),
282            OP_PUSH => self.list_push(actor, deserialize(msg).unwrap()),
283            OP_SET => self.set(actor, deserialize(msg).unwrap()),
284            OP_LIST_DEL => self.list_del_item(actor, deserialize(msg).unwrap()),
285            OP_SET_ADD => self.set_add(actor, deserialize(msg).unwrap()),
286            OP_SET_REMOVE => self.set_remove(actor, deserialize(msg).unwrap()),
287            OP_SET_UNION => self.set_union(actor, deserialize(msg).unwrap()),
288            OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg).unwrap()),
289            OP_SET_QUERY => self.set_query(actor, deserialize(msg).unwrap()),
290            OP_KEY_EXISTS => self.exists(actor, deserialize(msg).unwrap()),
291            OP_HEALTH_REQUEST if actor == SYSTEM_ACTOR => Ok(serialize(HealthCheckResponse {
292                healthy: true,
293                message: "".to_string(),
294            })
295            .unwrap()),
296            _ => Err("bad dispatch".into()),
297        }
298    }
299}