atm0s_sdn_key_value/behavior/
sdk.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    sync::{atomic::AtomicU64, Arc},
4};
5
6use async_std::channel::Sender;
7use atm0s_sdn_utils::awaker::Awaker;
8use parking_lot::{Mutex, RwLock};
9
10use crate::{msg::KeyValueSdkEventError, ExternalControl, KeyId, KeySource, KeyValueSdkEvent, KeyVersion, SubKeyId, ValueType};
11
12use super::{hashmap_local::HashmapKeyValueGetError, simple_local::SimpleKeyValueGetError};
13
14mod pub_sub;
15
16pub type SimpleKeyValueSubscriber = pub_sub::Subscriber<u64, (KeyId, Option<ValueType>, KeyVersion, KeySource)>;
17pub type HashmapKeyValueSubscriber = pub_sub::Subscriber<u64, (KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>;
18
19#[derive(Clone)]
20pub struct KeyValueSdk {
21    req_id_gen: Arc<AtomicU64>,
22    uuid_gen: Arc<AtomicU64>,
23    awaker: Arc<RwLock<Option<Arc<dyn Awaker>>>>,
24    simple_publisher: Arc<pub_sub::PublisherManager<u64, (KeyId, Option<ValueType>, KeyVersion, KeySource)>>,
25    hashmap_publisher: Arc<pub_sub::PublisherManager<u64, (KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>>,
26    simple_get_queue: Arc<Mutex<HashMap<u64, Sender<Result<Option<(ValueType, KeyVersion, KeySource)>, SimpleKeyValueGetError>>>>>,
27    hashmap_get_queue: Arc<Mutex<HashMap<u64, Sender<Result<Option<Vec<(SubKeyId, ValueType, KeyVersion, KeySource)>>, HashmapKeyValueGetError>>>>>,
28    actions: Arc<RwLock<VecDeque<crate::KeyValueSdkEvent>>>,
29}
30
31impl KeyValueSdk {
32    pub fn new() -> Self {
33        Self {
34            req_id_gen: Arc::new(AtomicU64::new(0)),
35            uuid_gen: Arc::new(AtomicU64::new(0)),
36            awaker: Arc::new(RwLock::new(None)),
37            simple_publisher: Arc::new(pub_sub::PublisherManager::new()),
38            hashmap_publisher: Arc::new(pub_sub::PublisherManager::new()),
39            actions: Arc::new(RwLock::new(VecDeque::new())),
40            simple_get_queue: Arc::new(Mutex::new(HashMap::new())),
41            hashmap_get_queue: Arc::new(Mutex::new(HashMap::new())),
42        }
43    }
44
45    pub fn set(&self, key: KeyId, value: Vec<u8>, ex: Option<u64>) {
46        self.actions.write().push_back(crate::KeyValueSdkEvent::Set(key, value, ex));
47        self.awaker.read().as_ref().unwrap().notify();
48    }
49
50    pub async fn get(&self, key: KeyId, timeout_ms: u64) -> Result<Option<(ValueType, KeyVersion, KeySource)>, SimpleKeyValueGetError> {
51        let req_id = self.req_id_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
52        self.actions.write().push_back(crate::KeyValueSdkEvent::Get(req_id, key, timeout_ms));
53        self.awaker.read().as_ref().unwrap().notify();
54        let (tx, rx) = async_std::channel::bounded(1);
55        self.simple_get_queue.lock().insert(req_id, tx);
56        rx.recv().await.map_err(|_| SimpleKeyValueGetError::InternalError)?
57    }
58
59    pub fn del(&self, key: KeyId) {
60        self.actions.write().push_back(crate::KeyValueSdkEvent::Del(key));
61        self.awaker.read().as_ref().unwrap().notify();
62    }
63
64    pub fn subscribe(&self, key: KeyId, ex: Option<u64>) -> SimpleKeyValueSubscriber {
65        let sub_uuid = self.uuid_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
66        let actions = self.actions.clone();
67        let awaker = self.awaker.clone();
68        let subscriber = self.simple_publisher.subscribe(
69            key,
70            Box::new(move || {
71                actions.write().push_back(crate::KeyValueSdkEvent::Unsub(sub_uuid, key));
72                awaker.read().as_ref().unwrap().notify();
73            }),
74        );
75
76        self.actions.write().push_back(crate::KeyValueSdkEvent::Sub(sub_uuid, key, ex));
77        self.awaker.read().as_ref().unwrap().notify();
78
79        subscriber
80    }
81
82    pub fn hset(&self, key: KeyId, sub_key: SubKeyId, value: Vec<u8>, ex: Option<u64>) {
83        self.actions.write().push_back(crate::KeyValueSdkEvent::SetH(key, sub_key, value, ex));
84        self.awaker.read().as_ref().unwrap().notify();
85    }
86
87    pub async fn hget(&self, key: KeyId, timeout_ms: u64) -> Result<Option<Vec<(SubKeyId, ValueType, KeyVersion, KeySource)>>, HashmapKeyValueGetError> {
88        let req_id = self.req_id_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
89        self.actions.write().push_back(crate::KeyValueSdkEvent::GetH(req_id, key, timeout_ms));
90        self.awaker.read().as_ref().unwrap().notify();
91        let (tx, rx) = async_std::channel::bounded(1);
92        self.hashmap_get_queue.lock().insert(req_id, tx);
93        rx.recv().await.map_err(|_| HashmapKeyValueGetError::InternalError)?
94    }
95
96    pub fn hdel(&self, key: KeyId, sub_key: SubKeyId) {
97        self.actions.write().push_back(crate::KeyValueSdkEvent::DelH(key, sub_key));
98        self.awaker.read().as_ref().unwrap().notify();
99    }
100
101    pub fn hsubscribe(&self, key: u64, ex: Option<u64>) -> HashmapKeyValueSubscriber {
102        let sub_uuid = self.uuid_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
103        let actions = self.actions.clone();
104        let awaker = self.awaker.clone();
105        let subscriber = self.hashmap_publisher.subscribe(
106            key,
107            Box::new(move || {
108                actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(sub_uuid, key));
109                awaker.read().as_ref().unwrap().notify();
110            }),
111        );
112
113        self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(sub_uuid, key, ex));
114        self.awaker.read().as_ref().unwrap().notify();
115
116        subscriber
117    }
118
119    pub fn hsubscribe_raw(&self, key: u64, uuid: u64, ex: Option<u64>, tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>) {
120        self.hashmap_publisher.sub_raw(key, uuid, tx);
121        self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(uuid, key, ex));
122        self.awaker.read().as_ref().unwrap().notify();
123    }
124
125    pub fn hunsubscribe_raw(&self, key: u64, uuid: u64) {
126        self.hashmap_publisher.unsub_raw(key, uuid);
127        self.actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(uuid, key));
128        self.awaker.read().as_ref().unwrap().notify();
129    }
130}
131
132impl ExternalControl for KeyValueSdk {
133    fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
134        self.awaker.write().replace(awaker);
135    }
136
137    fn on_event(&self, event: KeyValueSdkEvent) {
138        match event {
139            KeyValueSdkEvent::OnKeyChanged(uuid, key, value, version, source) => {
140                self.simple_publisher.publish(Some(uuid), key, (key, value, version, source));
141            }
142            KeyValueSdkEvent::OnKeyHChanged(uuid, key, sub_key, value, version, source) => {
143                self.hashmap_publisher.publish(Some(uuid), key, (key, sub_key, value, version, source));
144            }
145            KeyValueSdkEvent::OnGet(req_id, key, res) => {
146                if let Some(tx) = self.simple_get_queue.lock().remove(&req_id) {
147                    if let Err(e) = tx.try_send(res.map_err(|e| match e {
148                        KeyValueSdkEventError::NetworkError => SimpleKeyValueGetError::NetworkError,
149                        KeyValueSdkEventError::Timeout => SimpleKeyValueGetError::Timeout,
150                        KeyValueSdkEventError::InternalError => SimpleKeyValueGetError::InternalError,
151                    })) {
152                        log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e);
153                    }
154                }
155            }
156            KeyValueSdkEvent::OnGetH(req_id, key, res) => {
157                if let Some(tx) = self.hashmap_get_queue.lock().remove(&req_id) {
158                    if let Err(e) = tx.try_send(res.map_err(|e| match e {
159                        KeyValueSdkEventError::NetworkError => HashmapKeyValueGetError::NetworkError,
160                        KeyValueSdkEventError::Timeout => HashmapKeyValueGetError::Timeout,
161                        KeyValueSdkEventError::InternalError => HashmapKeyValueGetError::InternalError,
162                    })) {
163                        log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e);
164                    }
165                }
166            }
167            _ => {}
168        }
169    }
170
171    fn pop_action(&self) -> Option<KeyValueSdkEvent> {
172        self.actions.write().pop_front()
173    }
174}
175
#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use atm0s_sdn_utils::awaker::{Awaker, MockAwaker};

    use crate::{ExternalControl, KeyValueSdk, KeyValueSdkEvent};

    /// Builds an SDK that already has a fresh `MockAwaker` installed.
    fn sdk_with_awaker() -> (KeyValueSdk, Arc<MockAwaker>) {
        let sdk = KeyValueSdk::new();
        let awaker = Arc::new(MockAwaker::default());
        sdk.set_awaker(awaker.clone());
        (sdk, awaker)
    }

    #[async_std::test]
    async fn sdk_get_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();
        let wait = Duration::from_millis(100);

        // Nobody answers the request, so the outer timeout has to fire.
        let get_outcome = async_std::future::timeout(wait, sdk.get(1000, 100)).await;
        get_outcome.expect_err("Should timeout");
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Get(0, 1000, 100)));

        // The request id counter is shared between get and hget, hence id 1 here.
        let hget_outcome = async_std::future::timeout(wait, sdk.hget(1000, 100)).await;
        hget_outcome.expect_err("Should timeout");
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::GetH(1, 1000, 100)));
    }

    #[test]
    fn sdk_set_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        sdk.set(1000, vec![1], Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_set = KeyValueSdkEvent::Set(1000, vec![1], Some(20000));
        assert_eq!(sdk.pop_action(), Some(expected_set));

        sdk.del(1000);
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_del = KeyValueSdkEvent::Del(1000);
        assert_eq!(sdk.pop_action(), Some(expected_del));
    }

    #[test]
    fn sdk_sub_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        let first_sub = sdk.subscribe(1000, Some(20000));
        let second_sub = sdk.subscribe(1000, Some(20000));

        // One wake-up and one Sub action per subscription, in subscription order.
        assert_eq!(awaker.pop_awake_count(), 2);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Sub(0, 1000, Some(20000))));
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Sub(1, 1000, Some(20000))));
        assert_eq!(sdk.pop_action(), None);

        // Dropping a subscriber queues the matching Unsub action.
        drop(first_sub);
        drop(second_sub);
        assert_eq!(awaker.pop_awake_count(), 2);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Unsub(0, 1000)));
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Unsub(1, 1000)));
        assert_eq!(sdk.pop_action(), None);
    }

    #[test]
    fn sdk_hset_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        sdk.hset(1000, 11, vec![1], Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_set = KeyValueSdkEvent::SetH(1000, 11, vec![1], Some(20000));
        assert_eq!(sdk.pop_action(), Some(expected_set));
        assert_eq!(sdk.pop_action(), None);

        sdk.hdel(1000, 11);
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_del = KeyValueSdkEvent::DelH(1000, 11);
        assert_eq!(sdk.pop_action(), Some(expected_del));
        assert_eq!(sdk.pop_action(), None);
    }

    #[test]
    fn sdk_hsub_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        let subscription = sdk.hsubscribe(1000, Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::SubH(0, 1000, Some(20000))));
        assert_eq!(sdk.pop_action(), None);

        // Dropping the subscriber queues the matching UnsubH action.
        drop(subscription);
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::UnsubH(0, 1000)));
        assert_eq!(sdk.pop_action(), None);
    }
}