inmemory_keyvalue/
lib.rs

1#[macro_use]
2extern crate wascc_codec as codec;
3#[macro_use]
4extern crate log;
5
6mod kv;
7use crate::kv::KeyValueStore;
8use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
9use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
10use std::error::Error;
11use std::sync::{Arc, RwLock};
12use wascc_codec::{deserialize, serialize};
13use wasmcloud_actor_core::CapabilityConfiguration;
14use wasmcloud_actor_keyvalue::*;
15
16#[cfg(not(feature = "static_plugin"))]
17capability_provider!(KeyvalueProvider, KeyvalueProvider::new);
18
19#[allow(unused)]
20const CAPABILITY_ID: &str = "wasmcloud:keyvalue";
21const SYSTEM_ACTOR: &str = "system";
22
23#[derive(Clone)]
24pub struct KeyvalueProvider {
25    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
26    store: Arc<RwLock<KeyValueStore>>,
27}
28
29impl Default for KeyvalueProvider {
30    fn default() -> Self {
31        if env_logger::try_init().is_ok() {};
32        KeyvalueProvider {
33            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
34            store: Arc::new(RwLock::new(KeyValueStore::new())),
35        }
36    }
37}
38
39impl KeyvalueProvider {
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    fn configure(
45        &self,
46        _config: CapabilityConfiguration,
47    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
48        // Do nothing here
49        Ok(vec![])
50    }
51
52    fn remove_actor(
53        &self,
54        _config: CapabilityConfiguration,
55    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
56        // Do nothing here
57        Ok(vec![])
58    }
59
60    fn add(&self, _actor: &str, req: AddArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
61        let mut store = self.store.write().unwrap();
62        let res: i32 = store.incr(&req.key, req.value)?;
63        let resp = AddResponse { value: res };
64
65        Ok(serialize(resp)?)
66    }
67
68    fn del(&self, _actor: &str, req: DelArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
69        let mut store = self.store.write().unwrap();
70        store.del(&req.key)?;
71        let resp = DelResponse { key: req.key };
72
73        Ok(serialize(resp)?)
74    }
75
76    fn get(&self, _actor: &str, req: GetArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
77        let store = self.store.read().unwrap();
78        if !store.exists(&req.key)? {
79            Ok(serialize(GetResponse {
80                value: String::from(""),
81                exists: false,
82            })?)
83        } else {
84            let v = store.get(&req.key);
85            Ok(serialize(match v {
86                Ok(s) => GetResponse {
87                    value: s,
88                    exists: true,
89                },
90                Err(e) => {
91                    eprint!("GET for {} failed: {}", &req.key, e);
92                    GetResponse {
93                        value: "".to_string(),
94                        exists: false,
95                    }
96                }
97            })?)
98        }
99    }
100
101    fn list_clear(
102        &self,
103        actor: &str,
104        req: ClearArgs,
105    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
106        self.del(actor, DelArgs { key: req.key })
107    }
108
109    fn list_range(
110        &self,
111        _actor: &str,
112        req: RangeArgs,
113    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
114        let store = self.store.read().unwrap();
115        let result: Vec<String> = store.lrange(&req.key, req.start as _, req.stop as _)?;
116        Ok(serialize(ListRangeResponse { values: result })?)
117    }
118
119    fn list_push(
120        &self,
121        _actor: &str,
122        req: PushArgs,
123    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
124        let mut store = self.store.write().unwrap();
125        let result: i32 = store.lpush(&req.key, req.value)?;
126        Ok(serialize(ListResponse { new_count: result })?)
127    }
128
129    fn set(&self, _actor: &str, req: SetArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
130        let mut store = self.store.write().unwrap();
131        store.set(&req.key, req.value.clone())?;
132        Ok(serialize(SetResponse { value: req.value })?)
133    }
134
135    fn list_del_item(
136        &self,
137        _actor: &str,
138        req: ListItemDeleteArgs,
139    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
140        let mut store = self.store.write().unwrap();
141        let result: i32 = store.lrem(&req.key, req.value)?;
142        Ok(serialize(ListResponse { new_count: result })?)
143    }
144
145    fn set_add(
146        &self,
147        _actor: &str,
148        req: SetAddArgs,
149    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
150        let mut store = self.store.write().unwrap();
151        let result: i32 = store.sadd(&req.key, req.value)?;
152        Ok(serialize(SetOperationResponse { new_count: result })?)
153    }
154
155    fn set_remove(
156        &self,
157        _actor: &str,
158        req: SetRemoveArgs,
159    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
160        let mut store = self.store.write().unwrap();
161        let result: i32 = store.srem(&req.key, req.value)?;
162        Ok(serialize(SetOperationResponse { new_count: result })?)
163    }
164
165    fn set_union(
166        &self,
167        _actor: &str,
168        req: SetUnionArgs,
169    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
170        let store = self.store.read().unwrap();
171        let result: Vec<String> = store.sunion(req.keys)?;
172        Ok(serialize(SetQueryResponse { values: result })?)
173    }
174
175    fn set_intersect(
176        &self,
177        _actor: &str,
178        req: SetIntersectionArgs,
179    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
180        let store = self.store.read().unwrap();
181        let result: Vec<String> = store.sinter(req.keys)?;
182        Ok(serialize(SetQueryResponse { values: result })?)
183    }
184
185    fn set_query(
186        &self,
187        _actor: &str,
188        req: SetQueryArgs,
189    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
190        let store = self.store.read().unwrap();
191        let result: Vec<String> = store.smembers(req.key)?;
192        Ok(serialize(SetQueryResponse { values: result })?)
193    }
194
195    fn exists(
196        &self,
197        _actor: &str,
198        req: KeyExistsArgs,
199    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
200        let store = self.store.read().unwrap();
201        let result: bool = store.exists(&req.key)?;
202        Ok(serialize(GetResponse {
203            value: "".to_string(),
204            exists: result,
205        })?)
206    }
207}
208
209impl CapabilityProvider for KeyvalueProvider {
210    // Invoked by the runtime host to give this provider plugin the ability to communicate
211    // with actors
212    fn configure_dispatch(
213        &self,
214        dispatcher: Box<dyn Dispatcher>,
215    ) -> Result<(), Box<dyn Error + Send + Sync>> {
216        trace!("Dispatcher received.");
217        let mut lock = self.dispatcher.write().unwrap();
218        *lock = dispatcher;
219
220        Ok(())
221    }
222
223    // Invoked by host runtime to allow an actor to make use of the capability
224    // All providers MUST handle the "configure" message, even if no work will be done
225    fn handle_call(
226        &self,
227        actor: &str,
228        op: &str,
229        msg: &[u8],
230    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
231        trace!("Received host call from {}, operation - {}", actor, op);
232
233        match op {
234            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
235            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.remove_actor(deserialize(msg)?),
236            OP_ADD => self.add(actor, deserialize(msg)?),
237            OP_DEL => self.del(actor, deserialize(msg)?),
238            OP_GET => self.get(actor, deserialize(msg)?),
239            OP_CLEAR => self.list_clear(actor, deserialize(msg)?),
240            OP_RANGE => self.list_range(actor, deserialize(msg)?),
241            OP_PUSH => self.list_push(actor, deserialize(msg)?),
242            OP_SET => self.set(actor, deserialize(msg)?),
243            OP_LIST_DEL => self.list_del_item(actor, deserialize(msg)?),
244            OP_SET_ADD => self.set_add(actor, deserialize(msg)?),
245            OP_SET_REMOVE => self.set_remove(actor, deserialize(msg)?),
246            OP_SET_UNION => self.set_union(actor, deserialize(msg)?),
247            OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg)?),
248            OP_SET_QUERY => self.set_query(actor, deserialize(msg)?),
249            OP_KEY_EXISTS => self.exists(actor, deserialize(msg)?),
250            _ => Err("bad dispatch".into()),
251        }
252    }
253
254    /// No cleanup needed
255    fn stop(&self) {}
256}