atm0s_sdn_key_value/behavior/
simple_remote.rs

1/// This remote storage is a simple key value storage, it will store all key value in memory, and send event to other node when key value changed
2/// Each event is attached with a req_id and wait for ack, if ack not receive, it will resend the event each tick util ack received or tick_count is 0
3use crate::storage::simple::{OutputEvent, SimpleKeyValue};
4use crate::{
5    msg::{SimpleLocalEvent, SimpleRemoteEvent},
6    KeyId, ValueType,
7};
8use atm0s_sdn_identity::NodeId;
9use atm0s_sdn_router::RouteRule;
10use std::collections::VecDeque;
11
12use super::event_acks::EventAckManager;
13
14const RETRY_COUNT: u8 = 5;
15
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub struct RemoteStorageAction(pub(crate) SimpleLocalEvent, pub(crate) RouteRule);
18
19pub struct SimpleRemoteStorage {
20    req_id_seed: u64,
21    storage: SimpleKeyValue<KeyId, ValueType, NodeId, NodeId>,
22    event_acks: EventAckManager<RemoteStorageAction>,
23    output_events: VecDeque<RemoteStorageAction>,
24}
25
26impl SimpleRemoteStorage {
27    pub fn new() -> Self {
28        Self {
29            req_id_seed: 0,
30            storage: SimpleKeyValue::new(),
31            event_acks: EventAckManager::new(),
32            output_events: VecDeque::new(),
33        }
34    }
35
36    pub fn tick(&mut self, now_ms: u64) {
37        self.storage.tick(now_ms);
38        self.event_acks.tick(now_ms);
39    }
40
41    pub fn on_event(&mut self, now_ms: u64, from: NodeId, event: SimpleRemoteEvent) {
42        match event {
43            SimpleRemoteEvent::Set(req_id, key, value, version, ex) => {
44                let setted = self.storage.set(now_ms, key, value, version, from, ex);
45                if setted {
46                    log::info!("[SimpleRemote] receive set event from {} key {} version {} ex {:?}", from, key, version, ex);
47                }
48                self.output_events
49                    .push_back(RemoteStorageAction(SimpleLocalEvent::SetAck(req_id, key, version, setted), RouteRule::ToNode(from)));
50            }
51            SimpleRemoteEvent::Get(req_id, key) => {
52                if let Some((value, version, source)) = self.storage.get(&key) {
53                    log::debug!("[SimpleRemote] receive get event from {} key {} value {:?} version {}", from, key, value, version);
54                    self.output_events.push_back(RemoteStorageAction(
55                        SimpleLocalEvent::GetAck(req_id, key, Some((value.clone(), version, source))),
56                        RouteRule::ToNode(from),
57                    ));
58                } else {
59                    log::debug!("[SimpleRemote] receive get event from {} key {} value None", from, key);
60                    self.output_events.push_back(RemoteStorageAction(SimpleLocalEvent::GetAck(req_id, key, None), RouteRule::ToNode(from)));
61                }
62            }
63            SimpleRemoteEvent::Del(req_id, key, req_version) => {
64                let version = self.storage.del(&key, req_version).map(|(_, version, _)| version);
65                if version.is_some() {
66                    log::info!("[SimpleRemote] receive del event from {} key {} version {:?}", from, key, req_version);
67                }
68                self.output_events
69                    .push_back(RemoteStorageAction(SimpleLocalEvent::DelAck(req_id, key, version), RouteRule::ToNode(from)));
70            }
71            SimpleRemoteEvent::Sub(req_id, key, ex) => {
72                if self.storage.subscribe(now_ms, &key, from, ex) {
73                    log::info!("[SimpleRemote] receive sub event from {} key {} ex {:?}", from, key, ex);
74                }
75                self.output_events.push_back(RemoteStorageAction(SimpleLocalEvent::SubAck(req_id, key), RouteRule::ToNode(from)));
76            }
77            SimpleRemoteEvent::Unsub(req_id, key) => {
78                let success = self.storage.unsubscribe(&key, &from);
79                if success {
80                    log::info!("[SimpleRemote] receive unsub event from {} key {}", from, key);
81                }
82                self.output_events
83                    .push_back(RemoteStorageAction(SimpleLocalEvent::UnsubAck(req_id, key, success), RouteRule::ToNode(from)));
84            }
85            SimpleRemoteEvent::OnKeySetAck(req_id) => {
86                log::debug!("[SimpleRemote] receive on_key_set_ack event from {}, req_id {}", from, req_id);
87                self.event_acks.on_ack(req_id);
88            }
89            SimpleRemoteEvent::OnKeyDelAck(req_id) => {
90                log::debug!("[SimpleRemote] receive on_key_del_ack event from {}, req_id {}", from, req_id);
91                self.event_acks.on_ack(req_id);
92            }
93        }
94    }
95
96    pub fn pop_action(&mut self, now_ms: u64) -> Option<RemoteStorageAction> {
97        //first pop from output_events, if not exits then pop from event_acks
98        if let Some(e) = self.output_events.pop_front() {
99            log::debug!("[SimpleRemote] pop action from output_events: {:?}", e);
100            Some(e)
101        } else {
102            if let Some(event) = self.storage.poll() {
103                let req_id = self.req_id_seed;
104                self.req_id_seed += 1;
105                let event = match event {
106                    OutputEvent::NotifySet(key, value, version, source, handler) => RemoteStorageAction(SimpleLocalEvent::OnKeySet(req_id, key, value, version, source), RouteRule::ToNode(handler)),
107                    OutputEvent::NotifyDel(key, _value, version, source, handler) => RemoteStorageAction(SimpleLocalEvent::OnKeyDel(req_id, key, version, source), RouteRule::ToNode(handler)),
108                };
109                log::debug!("[SimpleRemote] pop action from event_acks: {:?}, req_id {}", event, req_id);
110                self.event_acks.add_event(now_ms, req_id, event, RETRY_COUNT);
111            }
112            self.event_acks.pop_action()
113        }
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::RemoteStorageAction;
120    use crate::{
121        behavior::event_acks::RESEND_AFTER_MS,
122        msg::{SimpleLocalEvent, SimpleRemoteEvent},
123    };
124    use atm0s_sdn_router::RouteRule;
125
126    #[test]
127    fn receive_set_dersiered_send_ack() {
128        let mut remote_storage = super::SimpleRemoteStorage::new();
129
130        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
131        assert_eq!(
132            remote_storage.pop_action(0),
133            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
134        );
135        assert_eq!(remote_storage.pop_action(0), None);
136    }
137
138    #[test]
139    fn receive_set_ex() {
140        let mut remote_storage = super::SimpleRemoteStorage::new();
141
142        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, Some(1000)));
143        assert_eq!(
144            remote_storage.pop_action(0),
145            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
146        );
147        assert_eq!(remote_storage.pop_action(0), None);
148
149        assert_eq!(remote_storage.storage.len(), 1);
150        remote_storage.tick(500);
151        assert_eq!(remote_storage.storage.len(), 1);
152
153        remote_storage.tick(1000);
154        assert_eq!(remote_storage.storage.len(), 0);
155    }
156
157    #[test]
158    fn receive_set_with_wrong_version() {
159        let mut remote_storage = super::SimpleRemoteStorage::new();
160
161        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
162        assert_eq!(
163            remote_storage.pop_action(0),
164            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
165        );
166        assert_eq!(remote_storage.pop_action(0), None);
167
168        // receive a older version will be rejected
169        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 5, None));
170        assert_eq!(
171            remote_storage.pop_action(0),
172            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 5, false), RouteRule::ToNode(1000)))
173        );
174        assert_eq!(remote_storage.pop_action(0), None);
175    }
176
177    #[test]
178    fn receive_del_dersiered_send_ack() {
179        let mut remote_storage = super::SimpleRemoteStorage::new();
180
181        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
182        assert_eq!(
183            remote_storage.pop_action(0),
184            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
185        );
186        assert_eq!(remote_storage.pop_action(0), None);
187
188        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 0));
189        assert_eq!(
190            remote_storage.pop_action(0),
191            Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, Some(0)), RouteRule::ToNode(1000)))
192        );
193        assert_eq!(remote_storage.pop_action(0), None);
194    }
195
196    #[test]
197    fn receive_del_older_version() {
198        let mut remote_storage = super::SimpleRemoteStorage::new();
199
200        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
201        assert_eq!(
202            remote_storage.pop_action(0),
203            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
204        );
205        assert_eq!(remote_storage.pop_action(0), None);
206
207        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 5));
208        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, None), RouteRule::ToNode(1000))));
209        assert_eq!(remote_storage.pop_action(0), None);
210    }
211
212    #[test]
213    fn receive_del_newer_version() {
214        let mut remote_storage = super::SimpleRemoteStorage::new();
215
216        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
217        assert_eq!(
218            remote_storage.pop_action(0),
219            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
220        );
221        assert_eq!(remote_storage.pop_action(0), None);
222
223        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 100));
224        assert_eq!(
225            remote_storage.pop_action(0),
226            Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, Some(0)), RouteRule::ToNode(1000)))
227        );
228        assert_eq!(remote_storage.pop_action(0), None);
229    }
230
231    #[test]
232    fn receive_get_dersiered_send_ack() {
233        let mut remote_storage = super::SimpleRemoteStorage::new();
234
235        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
236        assert_eq!(
237            remote_storage.pop_action(0),
238            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
239        );
240        assert_eq!(remote_storage.pop_action(0), None);
241
242        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Get(2, 1));
243        assert_eq!(
244            remote_storage.pop_action(0),
245            Some(RemoteStorageAction(SimpleLocalEvent::GetAck(2, 1, Some((vec![1], 10, 1000))), RouteRule::ToNode(1001)))
246        );
247        assert_eq!(remote_storage.pop_action(0), None);
248
249        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Get(3, 2));
250        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::GetAck(3, 2, None), RouteRule::ToNode(1001))));
251        assert_eq!(remote_storage.pop_action(0), None);
252    }
253
254    #[test]
255    fn receive_sub_dersiered_send_ack() {
256        let mut remote_storage = super::SimpleRemoteStorage::new();
257
258        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
259        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
260        assert_eq!(remote_storage.pop_action(0), None);
261
262        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
263        assert_eq!(
264            remote_storage.pop_action(0),
265            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
266        );
267        assert_eq!(
268            remote_storage.pop_action(0),
269            Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
270        );
271        assert_eq!(remote_storage.pop_action(0), None);
272    }
273
274    #[test]
275    fn receive_sub_after_set_dersiered_send_ack() {
276        let mut remote_storage = super::SimpleRemoteStorage::new();
277
278        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
279        assert_eq!(
280            remote_storage.pop_action(0),
281            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
282        );
283        assert_eq!(remote_storage.pop_action(0), None);
284
285        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
286        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
287        assert_eq!(
288            remote_storage.pop_action(0),
289            Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
290        );
291        assert_eq!(remote_storage.pop_action(0), None);
292    }
293
294    #[test]
295    fn receive_unsub_dersiered_send_ack() {
296        let mut remote_storage = super::SimpleRemoteStorage::new();
297
298        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
299        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
300        assert_eq!(remote_storage.pop_action(0), None);
301
302        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Unsub(2, 1));
303        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::UnsubAck(2, 1, true), RouteRule::ToNode(1000))));
304        assert_eq!(remote_storage.pop_action(0), None);
305
306        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(3, 1, vec![1], 100, None));
307        assert_eq!(
308            remote_storage.pop_action(0),
309            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(3, 1, 100, true), RouteRule::ToNode(1001)))
310        );
311        assert_eq!(remote_storage.pop_action(0), None);
312    }
313
314    #[test]
315    fn receive_unsub_wrong_key() {
316        let mut remote_storage = super::SimpleRemoteStorage::new();
317
318        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Unsub(2, 1));
319        assert_eq!(
320            remote_storage.pop_action(0),
321            Some(RemoteStorageAction(SimpleLocalEvent::UnsubAck(2, 1, false), RouteRule::ToNode(1000)))
322        );
323        assert_eq!(remote_storage.pop_action(0), None);
324    }
325
326    #[test]
327    fn key_changed_event_with_ack() {
328        let mut remote_storage = super::SimpleRemoteStorage::new();
329
330        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
331        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
332        assert_eq!(remote_storage.pop_action(0), None);
333
334        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
335        assert_eq!(
336            remote_storage.pop_action(0),
337            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
338        );
339        assert_eq!(
340            remote_storage.pop_action(0),
341            Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
342        );
343        assert_eq!(remote_storage.pop_action(0), None);
344
345        remote_storage.on_event(0, 1000, SimpleRemoteEvent::OnKeySetAck(0));
346        remote_storage.tick(0);
347        assert_eq!(remote_storage.pop_action(0), None);
348    }
349
350    #[test]
351    fn key_changed_event_without_ack_should_resend() {
352        let mut remote_storage = super::SimpleRemoteStorage::new();
353
354        remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
355        assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
356        assert_eq!(remote_storage.pop_action(0), None);
357
358        remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
359        assert_eq!(
360            remote_storage.pop_action(0),
361            Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
362        );
363        assert_eq!(
364            remote_storage.pop_action(0),
365            Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
366        );
367        assert_eq!(remote_storage.pop_action(0), None);
368
369        remote_storage.tick(RESEND_AFTER_MS);
370        // need resend each tick
371        assert_eq!(
372            remote_storage.pop_action(RESEND_AFTER_MS),
373            Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
374        );
375        assert_eq!(remote_storage.pop_action(RESEND_AFTER_MS), None);
376    }
377}