1#[macro_use]
2extern crate wascc_codec as codec;
3#[macro_use]
4extern crate log;
5
6mod kv;
7use crate::kv::KeyValueStore;
8use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
9use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR};
10use std::error::Error;
11use std::sync::{Arc, RwLock};
12use wascc_codec::{deserialize, serialize};
13use wasmcloud_actor_core::CapabilityConfiguration;
14use wasmcloud_actor_keyvalue::*;
15
16#[cfg(not(feature = "static_plugin"))]
17capability_provider!(KeyvalueProvider, KeyvalueProvider::new);
18
/// Identifier string for the key-value capability.
///
/// Nothing in this file reads the constant, which is why the `unused` lint is
/// silenced on it.
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:keyvalue";

/// Actor name that must accompany the bind/remove operations for them to be
/// accepted by `handle_call`.
const SYSTEM_ACTOR: &str = "system";
22
23#[derive(Clone)]
24pub struct KeyvalueProvider {
25 dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
26 store: Arc<RwLock<KeyValueStore>>,
27}
28
29impl Default for KeyvalueProvider {
30 fn default() -> Self {
31 if env_logger::try_init().is_ok() {};
32 KeyvalueProvider {
33 dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
34 store: Arc::new(RwLock::new(KeyValueStore::new())),
35 }
36 }
37}
38
39impl KeyvalueProvider {
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 fn configure(
45 &self,
46 _config: CapabilityConfiguration,
47 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
48 Ok(vec![])
50 }
51
52 fn remove_actor(
53 &self,
54 _config: CapabilityConfiguration,
55 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
56 Ok(vec![])
58 }
59
60 fn add(&self, _actor: &str, req: AddArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
61 let mut store = self.store.write().unwrap();
62 let res: i32 = store.incr(&req.key, req.value)?;
63 let resp = AddResponse { value: res };
64
65 Ok(serialize(resp)?)
66 }
67
68 fn del(&self, _actor: &str, req: DelArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
69 let mut store = self.store.write().unwrap();
70 store.del(&req.key)?;
71 let resp = DelResponse { key: req.key };
72
73 Ok(serialize(resp)?)
74 }
75
76 fn get(&self, _actor: &str, req: GetArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
77 let store = self.store.read().unwrap();
78 if !store.exists(&req.key)? {
79 Ok(serialize(GetResponse {
80 value: String::from(""),
81 exists: false,
82 })?)
83 } else {
84 let v = store.get(&req.key);
85 Ok(serialize(match v {
86 Ok(s) => GetResponse {
87 value: s,
88 exists: true,
89 },
90 Err(e) => {
91 eprint!("GET for {} failed: {}", &req.key, e);
92 GetResponse {
93 value: "".to_string(),
94 exists: false,
95 }
96 }
97 })?)
98 }
99 }
100
101 fn list_clear(
102 &self,
103 actor: &str,
104 req: ClearArgs,
105 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
106 self.del(actor, DelArgs { key: req.key })
107 }
108
109 fn list_range(
110 &self,
111 _actor: &str,
112 req: RangeArgs,
113 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
114 let store = self.store.read().unwrap();
115 let result: Vec<String> = store.lrange(&req.key, req.start as _, req.stop as _)?;
116 Ok(serialize(ListRangeResponse { values: result })?)
117 }
118
119 fn list_push(
120 &self,
121 _actor: &str,
122 req: PushArgs,
123 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
124 let mut store = self.store.write().unwrap();
125 let result: i32 = store.lpush(&req.key, req.value)?;
126 Ok(serialize(ListResponse { new_count: result })?)
127 }
128
129 fn set(&self, _actor: &str, req: SetArgs) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
130 let mut store = self.store.write().unwrap();
131 store.set(&req.key, req.value.clone())?;
132 Ok(serialize(SetResponse { value: req.value })?)
133 }
134
135 fn list_del_item(
136 &self,
137 _actor: &str,
138 req: ListItemDeleteArgs,
139 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
140 let mut store = self.store.write().unwrap();
141 let result: i32 = store.lrem(&req.key, req.value)?;
142 Ok(serialize(ListResponse { new_count: result })?)
143 }
144
145 fn set_add(
146 &self,
147 _actor: &str,
148 req: SetAddArgs,
149 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
150 let mut store = self.store.write().unwrap();
151 let result: i32 = store.sadd(&req.key, req.value)?;
152 Ok(serialize(SetOperationResponse { new_count: result })?)
153 }
154
155 fn set_remove(
156 &self,
157 _actor: &str,
158 req: SetRemoveArgs,
159 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
160 let mut store = self.store.write().unwrap();
161 let result: i32 = store.srem(&req.key, req.value)?;
162 Ok(serialize(SetOperationResponse { new_count: result })?)
163 }
164
165 fn set_union(
166 &self,
167 _actor: &str,
168 req: SetUnionArgs,
169 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
170 let store = self.store.read().unwrap();
171 let result: Vec<String> = store.sunion(req.keys)?;
172 Ok(serialize(SetQueryResponse { values: result })?)
173 }
174
175 fn set_intersect(
176 &self,
177 _actor: &str,
178 req: SetIntersectionArgs,
179 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
180 let store = self.store.read().unwrap();
181 let result: Vec<String> = store.sinter(req.keys)?;
182 Ok(serialize(SetQueryResponse { values: result })?)
183 }
184
185 fn set_query(
186 &self,
187 _actor: &str,
188 req: SetQueryArgs,
189 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
190 let store = self.store.read().unwrap();
191 let result: Vec<String> = store.smembers(req.key)?;
192 Ok(serialize(SetQueryResponse { values: result })?)
193 }
194
195 fn exists(
196 &self,
197 _actor: &str,
198 req: KeyExistsArgs,
199 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
200 let store = self.store.read().unwrap();
201 let result: bool = store.exists(&req.key)?;
202 Ok(serialize(GetResponse {
203 value: "".to_string(),
204 exists: result,
205 })?)
206 }
207}
208
209impl CapabilityProvider for KeyvalueProvider {
210 fn configure_dispatch(
213 &self,
214 dispatcher: Box<dyn Dispatcher>,
215 ) -> Result<(), Box<dyn Error + Send + Sync>> {
216 trace!("Dispatcher received.");
217 let mut lock = self.dispatcher.write().unwrap();
218 *lock = dispatcher;
219
220 Ok(())
221 }
222
223 fn handle_call(
226 &self,
227 actor: &str,
228 op: &str,
229 msg: &[u8],
230 ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
231 trace!("Received host call from {}, operation - {}", actor, op);
232
233 match op {
234 OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
235 OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.remove_actor(deserialize(msg)?),
236 OP_ADD => self.add(actor, deserialize(msg)?),
237 OP_DEL => self.del(actor, deserialize(msg)?),
238 OP_GET => self.get(actor, deserialize(msg)?),
239 OP_CLEAR => self.list_clear(actor, deserialize(msg)?),
240 OP_RANGE => self.list_range(actor, deserialize(msg)?),
241 OP_PUSH => self.list_push(actor, deserialize(msg)?),
242 OP_SET => self.set(actor, deserialize(msg)?),
243 OP_LIST_DEL => self.list_del_item(actor, deserialize(msg)?),
244 OP_SET_ADD => self.set_add(actor, deserialize(msg)?),
245 OP_SET_REMOVE => self.set_remove(actor, deserialize(msg)?),
246 OP_SET_UNION => self.set_union(actor, deserialize(msg)?),
247 OP_SET_INTERSECT => self.set_intersect(actor, deserialize(msg)?),
248 OP_SET_QUERY => self.set_query(actor, deserialize(msg)?),
249 OP_KEY_EXISTS => self.exists(actor, deserialize(msg)?),
250 _ => Err("bad dispatch".into()),
251 }
252 }
253
254 fn stop(&self) {}
256}