1use std::{
2 collections::{HashMap, VecDeque},
3 sync::{atomic::AtomicU64, Arc},
4};
5
6use async_std::channel::Sender;
7use atm0s_sdn_utils::awaker::Awaker;
8use parking_lot::{Mutex, RwLock};
9
10use crate::{msg::KeyValueSdkEventError, ExternalControl, KeyId, KeySource, KeyValueSdkEvent, KeyVersion, SubKeyId, ValueType};
11
12use super::{hashmap_local::HashmapKeyValueGetError, simple_local::SimpleKeyValueGetError};
13
14mod pub_sub;
15
/// Subscription handle returned by [`KeyValueSdk::subscribe`].
///
/// Items are `(key, value, version, source)` tuples, published when the SDK
/// receives a `KeyValueSdkEvent::OnKeyChanged` event.
/// NOTE(review): a `None` value presumably signals that the key was deleted — confirm against the behaviour.
pub type SimpleKeyValueSubscriber = pub_sub::Subscriber<u64, (KeyId, Option<ValueType>, KeyVersion, KeySource)>;
/// Subscription handle returned by [`KeyValueSdk::hsubscribe`].
///
/// Items are `(key, sub_key, value, version, source)` tuples, published when the SDK
/// receives a `KeyValueSdkEvent::OnKeyHChanged` event.
/// NOTE(review): a `None` value presumably signals that the sub key was deleted — confirm against the behaviour.
pub type HashmapKeyValueSubscriber = pub_sub::Subscriber<u64, (KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>;
18
/// Handle used by application code to talk to the key-value service.
///
/// Calls on this handle are turned into [`crate::KeyValueSdkEvent`] actions that are
/// queued in `actions` and later drained through `ExternalControl::pop_action`;
/// results and change notifications come back through `ExternalControl::on_event`.
///
/// Every field is behind an `Arc`, so clones of the handle share the same state.
#[derive(Clone)]
pub struct KeyValueSdk {
    /// Counter that hands out request ids for `get` / `hget`.
    req_id_gen: Arc<AtomicU64>,
    /// Counter that hands out subscription uuids for `subscribe` / `hsubscribe`.
    uuid_gen: Arc<AtomicU64>,
    /// Awaker installed by `set_awaker`; notified after an action has been queued.
    awaker: Arc<RwLock<Option<Arc<dyn Awaker>>>>,
    /// Delivers `OnKeyChanged` events to simple-key subscribers.
    simple_publisher: Arc<pub_sub::PublisherManager<u64, (KeyId, Option<ValueType>, KeyVersion, KeySource)>>,
    /// Delivers `OnKeyHChanged` events to hashmap-key subscribers.
    hashmap_publisher: Arc<pub_sub::PublisherManager<u64, (KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>>,
    /// Reply channels of in-flight `get` calls, keyed by request id.
    simple_get_queue: Arc<Mutex<HashMap<u64, Sender<Result<Option<(ValueType, KeyVersion, KeySource)>, SimpleKeyValueGetError>>>>>,
    /// Reply channels of in-flight `hget` calls, keyed by request id.
    hashmap_get_queue: Arc<Mutex<HashMap<u64, Sender<Result<Option<Vec<(SubKeyId, ValueType, KeyVersion, KeySource)>>, HashmapKeyValueGetError>>>>>,
    /// FIFO of actions waiting to be taken by `pop_action`.
    actions: Arc<RwLock<VecDeque<crate::KeyValueSdkEvent>>>,
}
30
31impl KeyValueSdk {
32 pub fn new() -> Self {
33 Self {
34 req_id_gen: Arc::new(AtomicU64::new(0)),
35 uuid_gen: Arc::new(AtomicU64::new(0)),
36 awaker: Arc::new(RwLock::new(None)),
37 simple_publisher: Arc::new(pub_sub::PublisherManager::new()),
38 hashmap_publisher: Arc::new(pub_sub::PublisherManager::new()),
39 actions: Arc::new(RwLock::new(VecDeque::new())),
40 simple_get_queue: Arc::new(Mutex::new(HashMap::new())),
41 hashmap_get_queue: Arc::new(Mutex::new(HashMap::new())),
42 }
43 }
44
45 pub fn set(&self, key: KeyId, value: Vec<u8>, ex: Option<u64>) {
46 self.actions.write().push_back(crate::KeyValueSdkEvent::Set(key, value, ex));
47 self.awaker.read().as_ref().unwrap().notify();
48 }
49
50 pub async fn get(&self, key: KeyId, timeout_ms: u64) -> Result<Option<(ValueType, KeyVersion, KeySource)>, SimpleKeyValueGetError> {
51 let req_id = self.req_id_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
52 self.actions.write().push_back(crate::KeyValueSdkEvent::Get(req_id, key, timeout_ms));
53 self.awaker.read().as_ref().unwrap().notify();
54 let (tx, rx) = async_std::channel::bounded(1);
55 self.simple_get_queue.lock().insert(req_id, tx);
56 rx.recv().await.map_err(|_| SimpleKeyValueGetError::InternalError)?
57 }
58
59 pub fn del(&self, key: KeyId) {
60 self.actions.write().push_back(crate::KeyValueSdkEvent::Del(key));
61 self.awaker.read().as_ref().unwrap().notify();
62 }
63
64 pub fn subscribe(&self, key: KeyId, ex: Option<u64>) -> SimpleKeyValueSubscriber {
65 let sub_uuid = self.uuid_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
66 let actions = self.actions.clone();
67 let awaker = self.awaker.clone();
68 let subscriber = self.simple_publisher.subscribe(
69 key,
70 Box::new(move || {
71 actions.write().push_back(crate::KeyValueSdkEvent::Unsub(sub_uuid, key));
72 awaker.read().as_ref().unwrap().notify();
73 }),
74 );
75
76 self.actions.write().push_back(crate::KeyValueSdkEvent::Sub(sub_uuid, key, ex));
77 self.awaker.read().as_ref().unwrap().notify();
78
79 subscriber
80 }
81
82 pub fn hset(&self, key: KeyId, sub_key: SubKeyId, value: Vec<u8>, ex: Option<u64>) {
83 self.actions.write().push_back(crate::KeyValueSdkEvent::SetH(key, sub_key, value, ex));
84 self.awaker.read().as_ref().unwrap().notify();
85 }
86
87 pub async fn hget(&self, key: KeyId, timeout_ms: u64) -> Result<Option<Vec<(SubKeyId, ValueType, KeyVersion, KeySource)>>, HashmapKeyValueGetError> {
88 let req_id = self.req_id_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
89 self.actions.write().push_back(crate::KeyValueSdkEvent::GetH(req_id, key, timeout_ms));
90 self.awaker.read().as_ref().unwrap().notify();
91 let (tx, rx) = async_std::channel::bounded(1);
92 self.hashmap_get_queue.lock().insert(req_id, tx);
93 rx.recv().await.map_err(|_| HashmapKeyValueGetError::InternalError)?
94 }
95
96 pub fn hdel(&self, key: KeyId, sub_key: SubKeyId) {
97 self.actions.write().push_back(crate::KeyValueSdkEvent::DelH(key, sub_key));
98 self.awaker.read().as_ref().unwrap().notify();
99 }
100
101 pub fn hsubscribe(&self, key: u64, ex: Option<u64>) -> HashmapKeyValueSubscriber {
102 let sub_uuid = self.uuid_gen.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
103 let actions = self.actions.clone();
104 let awaker = self.awaker.clone();
105 let subscriber = self.hashmap_publisher.subscribe(
106 key,
107 Box::new(move || {
108 actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(sub_uuid, key));
109 awaker.read().as_ref().unwrap().notify();
110 }),
111 );
112
113 self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(sub_uuid, key, ex));
114 self.awaker.read().as_ref().unwrap().notify();
115
116 subscriber
117 }
118
119 pub fn hsubscribe_raw(&self, key: u64, uuid: u64, ex: Option<u64>, tx: Sender<(KeyId, SubKeyId, Option<ValueType>, KeyVersion, KeySource)>) {
120 self.hashmap_publisher.sub_raw(key, uuid, tx);
121 self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(uuid, key, ex));
122 self.awaker.read().as_ref().unwrap().notify();
123 }
124
125 pub fn hunsubscribe_raw(&self, key: u64, uuid: u64) {
126 self.hashmap_publisher.unsub_raw(key, uuid);
127 self.actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(uuid, key));
128 self.awaker.read().as_ref().unwrap().notify();
129 }
130}
131
132impl ExternalControl for KeyValueSdk {
133 fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
134 self.awaker.write().replace(awaker);
135 }
136
137 fn on_event(&self, event: KeyValueSdkEvent) {
138 match event {
139 KeyValueSdkEvent::OnKeyChanged(uuid, key, value, version, source) => {
140 self.simple_publisher.publish(Some(uuid), key, (key, value, version, source));
141 }
142 KeyValueSdkEvent::OnKeyHChanged(uuid, key, sub_key, value, version, source) => {
143 self.hashmap_publisher.publish(Some(uuid), key, (key, sub_key, value, version, source));
144 }
145 KeyValueSdkEvent::OnGet(req_id, key, res) => {
146 if let Some(tx) = self.simple_get_queue.lock().remove(&req_id) {
147 if let Err(e) = tx.try_send(res.map_err(|e| match e {
148 KeyValueSdkEventError::NetworkError => SimpleKeyValueGetError::NetworkError,
149 KeyValueSdkEventError::Timeout => SimpleKeyValueGetError::Timeout,
150 KeyValueSdkEventError::InternalError => SimpleKeyValueGetError::InternalError,
151 })) {
152 log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e);
153 }
154 }
155 }
156 KeyValueSdkEvent::OnGetH(req_id, key, res) => {
157 if let Some(tx) = self.hashmap_get_queue.lock().remove(&req_id) {
158 if let Err(e) = tx.try_send(res.map_err(|e| match e {
159 KeyValueSdkEventError::NetworkError => HashmapKeyValueGetError::NetworkError,
160 KeyValueSdkEventError::Timeout => HashmapKeyValueGetError::Timeout,
161 KeyValueSdkEventError::InternalError => HashmapKeyValueGetError::InternalError,
162 })) {
163 log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e);
164 }
165 }
166 }
167 _ => {}
168 }
169 }
170
171 fn pop_action(&self) -> Option<KeyValueSdkEvent> {
172 self.actions.write().pop_front()
173 }
174}
175
#[cfg(test)]
mod test {
    use std::{sync::Arc, time::Duration};

    use atm0s_sdn_utils::awaker::{Awaker, MockAwaker};

    use crate::{ExternalControl, KeyValueSdk, KeyValueSdkEvent};

    /// Builds an SDK that is already wired to a fresh mock awaker.
    fn sdk_with_awaker() -> (KeyValueSdk, Arc<MockAwaker>) {
        let sdk = KeyValueSdk::new();
        let awaker = Arc::new(MockAwaker::default());
        sdk.set_awaker(awaker.clone());
        (sdk, awaker)
    }

    /// `get` / `hget` never receive an answer here, so both must time out
    /// after having queued exactly one action and fired one wake-up each.
    #[async_std::test]
    async fn sdk_get_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();
        let wait = Duration::from_millis(100);

        let pending_get = sdk.get(1000, 100);
        async_std::future::timeout(wait, pending_get).await.expect_err("Should timeout");
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Get(0, 1000, 100)));

        let pending_hget = sdk.hget(1000, 100);
        async_std::future::timeout(wait, pending_hget).await.expect_err("Should timeout");
        assert_eq!(awaker.pop_awake_count(), 1);
        assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::GetH(1, 1000, 100)));
    }

    /// `set` and `del` each queue one action and fire one wake-up.
    #[test]
    fn sdk_set_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        sdk.set(1000, vec![1], Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_set = KeyValueSdkEvent::Set(1000, vec![1], Some(20000));
        assert_eq!(sdk.pop_action(), Some(expected_set));

        sdk.del(1000);
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_del = KeyValueSdkEvent::Del(1000);
        assert_eq!(sdk.pop_action(), Some(expected_del));
    }

    /// Two subscriptions on the same key get uuids 0 and 1; dropping the
    /// handles queues the matching `Unsub` actions in drop order.
    #[test]
    fn sdk_sub_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        let first = sdk.subscribe(1000, Some(20000));
        let second = sdk.subscribe(1000, Some(20000));
        assert_eq!(awaker.pop_awake_count(), 2);

        for uuid in 0..2u64 {
            assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Sub(uuid, 1000, Some(20000))));
        }
        assert_eq!(sdk.pop_action(), None);

        drop(first);
        drop(second);
        assert_eq!(awaker.pop_awake_count(), 2);

        for uuid in 0..2u64 {
            assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Unsub(uuid, 1000)));
        }
        assert_eq!(sdk.pop_action(), None);
    }

    /// `hset` and `hdel` each queue exactly one action and fire one wake-up.
    #[test]
    fn sdk_hset_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        sdk.hset(1000, 11, vec![1], Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_set = KeyValueSdkEvent::SetH(1000, 11, vec![1], Some(20000));
        assert_eq!(sdk.pop_action(), Some(expected_set));
        assert_eq!(sdk.pop_action(), None);

        sdk.hdel(1000, 11);
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_del = KeyValueSdkEvent::DelH(1000, 11);
        assert_eq!(sdk.pop_action(), Some(expected_del));
        assert_eq!(sdk.pop_action(), None);
    }

    /// `hsubscribe` queues `SubH`; dropping the handle queues `UnsubH`.
    #[test]
    fn sdk_hsub_should_fire_awaker_and_action() {
        let (sdk, awaker) = sdk_with_awaker();

        let subscription = sdk.hsubscribe(1000, Some(20000));
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_sub = KeyValueSdkEvent::SubH(0, 1000, Some(20000));
        assert_eq!(sdk.pop_action(), Some(expected_sub));
        assert_eq!(sdk.pop_action(), None);

        drop(subscription);
        assert_eq!(awaker.pop_awake_count(), 1);
        let expected_unsub = KeyValueSdkEvent::UnsubH(0, 1000);
        assert_eq!(sdk.pop_action(), Some(expected_unsub));
        assert_eq!(sdk.pop_action(), None);
    }
}