1use crate::storage::simple::{OutputEvent, SimpleKeyValue};
4use crate::{
5 msg::{SimpleLocalEvent, SimpleRemoteEvent},
6 KeyId, ValueType,
7};
8use atm0s_sdn_identity::NodeId;
9use atm0s_sdn_router::RouteRule;
10use std::collections::VecDeque;
11
12use super::event_acks::EventAckManager;
13
14const RETRY_COUNT: u8 = 5;
15
16#[derive(Clone, Debug, PartialEq, Eq)]
17pub struct RemoteStorageAction(pub(crate) SimpleLocalEvent, pub(crate) RouteRule);
18
19pub struct SimpleRemoteStorage {
20 req_id_seed: u64,
21 storage: SimpleKeyValue<KeyId, ValueType, NodeId, NodeId>,
22 event_acks: EventAckManager<RemoteStorageAction>,
23 output_events: VecDeque<RemoteStorageAction>,
24}
25
26impl SimpleRemoteStorage {
27 pub fn new() -> Self {
28 Self {
29 req_id_seed: 0,
30 storage: SimpleKeyValue::new(),
31 event_acks: EventAckManager::new(),
32 output_events: VecDeque::new(),
33 }
34 }
35
36 pub fn tick(&mut self, now_ms: u64) {
37 self.storage.tick(now_ms);
38 self.event_acks.tick(now_ms);
39 }
40
41 pub fn on_event(&mut self, now_ms: u64, from: NodeId, event: SimpleRemoteEvent) {
42 match event {
43 SimpleRemoteEvent::Set(req_id, key, value, version, ex) => {
44 let setted = self.storage.set(now_ms, key, value, version, from, ex);
45 if setted {
46 log::info!("[SimpleRemote] receive set event from {} key {} version {} ex {:?}", from, key, version, ex);
47 }
48 self.output_events
49 .push_back(RemoteStorageAction(SimpleLocalEvent::SetAck(req_id, key, version, setted), RouteRule::ToNode(from)));
50 }
51 SimpleRemoteEvent::Get(req_id, key) => {
52 if let Some((value, version, source)) = self.storage.get(&key) {
53 log::debug!("[SimpleRemote] receive get event from {} key {} value {:?} version {}", from, key, value, version);
54 self.output_events.push_back(RemoteStorageAction(
55 SimpleLocalEvent::GetAck(req_id, key, Some((value.clone(), version, source))),
56 RouteRule::ToNode(from),
57 ));
58 } else {
59 log::debug!("[SimpleRemote] receive get event from {} key {} value None", from, key);
60 self.output_events.push_back(RemoteStorageAction(SimpleLocalEvent::GetAck(req_id, key, None), RouteRule::ToNode(from)));
61 }
62 }
63 SimpleRemoteEvent::Del(req_id, key, req_version) => {
64 let version = self.storage.del(&key, req_version).map(|(_, version, _)| version);
65 if version.is_some() {
66 log::info!("[SimpleRemote] receive del event from {} key {} version {:?}", from, key, req_version);
67 }
68 self.output_events
69 .push_back(RemoteStorageAction(SimpleLocalEvent::DelAck(req_id, key, version), RouteRule::ToNode(from)));
70 }
71 SimpleRemoteEvent::Sub(req_id, key, ex) => {
72 if self.storage.subscribe(now_ms, &key, from, ex) {
73 log::info!("[SimpleRemote] receive sub event from {} key {} ex {:?}", from, key, ex);
74 }
75 self.output_events.push_back(RemoteStorageAction(SimpleLocalEvent::SubAck(req_id, key), RouteRule::ToNode(from)));
76 }
77 SimpleRemoteEvent::Unsub(req_id, key) => {
78 let success = self.storage.unsubscribe(&key, &from);
79 if success {
80 log::info!("[SimpleRemote] receive unsub event from {} key {}", from, key);
81 }
82 self.output_events
83 .push_back(RemoteStorageAction(SimpleLocalEvent::UnsubAck(req_id, key, success), RouteRule::ToNode(from)));
84 }
85 SimpleRemoteEvent::OnKeySetAck(req_id) => {
86 log::debug!("[SimpleRemote] receive on_key_set_ack event from {}, req_id {}", from, req_id);
87 self.event_acks.on_ack(req_id);
88 }
89 SimpleRemoteEvent::OnKeyDelAck(req_id) => {
90 log::debug!("[SimpleRemote] receive on_key_del_ack event from {}, req_id {}", from, req_id);
91 self.event_acks.on_ack(req_id);
92 }
93 }
94 }
95
96 pub fn pop_action(&mut self, now_ms: u64) -> Option<RemoteStorageAction> {
97 if let Some(e) = self.output_events.pop_front() {
99 log::debug!("[SimpleRemote] pop action from output_events: {:?}", e);
100 Some(e)
101 } else {
102 if let Some(event) = self.storage.poll() {
103 let req_id = self.req_id_seed;
104 self.req_id_seed += 1;
105 let event = match event {
106 OutputEvent::NotifySet(key, value, version, source, handler) => RemoteStorageAction(SimpleLocalEvent::OnKeySet(req_id, key, value, version, source), RouteRule::ToNode(handler)),
107 OutputEvent::NotifyDel(key, _value, version, source, handler) => RemoteStorageAction(SimpleLocalEvent::OnKeyDel(req_id, key, version, source), RouteRule::ToNode(handler)),
108 };
109 log::debug!("[SimpleRemote] pop action from event_acks: {:?}, req_id {}", event, req_id);
110 self.event_acks.add_event(now_ms, req_id, event, RETRY_COUNT);
111 }
112 self.event_acks.pop_action()
113 }
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use super::RemoteStorageAction;
120 use crate::{
121 behavior::event_acks::RESEND_AFTER_MS,
122 msg::{SimpleLocalEvent, SimpleRemoteEvent},
123 };
124 use atm0s_sdn_router::RouteRule;
125
126 #[test]
127 fn receive_set_dersiered_send_ack() {
128 let mut remote_storage = super::SimpleRemoteStorage::new();
129
130 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
131 assert_eq!(
132 remote_storage.pop_action(0),
133 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
134 );
135 assert_eq!(remote_storage.pop_action(0), None);
136 }
137
138 #[test]
139 fn receive_set_ex() {
140 let mut remote_storage = super::SimpleRemoteStorage::new();
141
142 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, Some(1000)));
143 assert_eq!(
144 remote_storage.pop_action(0),
145 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
146 );
147 assert_eq!(remote_storage.pop_action(0), None);
148
149 assert_eq!(remote_storage.storage.len(), 1);
150 remote_storage.tick(500);
151 assert_eq!(remote_storage.storage.len(), 1);
152
153 remote_storage.tick(1000);
154 assert_eq!(remote_storage.storage.len(), 0);
155 }
156
157 #[test]
158 fn receive_set_with_wrong_version() {
159 let mut remote_storage = super::SimpleRemoteStorage::new();
160
161 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
162 assert_eq!(
163 remote_storage.pop_action(0),
164 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
165 );
166 assert_eq!(remote_storage.pop_action(0), None);
167
168 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 5, None));
170 assert_eq!(
171 remote_storage.pop_action(0),
172 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 5, false), RouteRule::ToNode(1000)))
173 );
174 assert_eq!(remote_storage.pop_action(0), None);
175 }
176
177 #[test]
178 fn receive_del_dersiered_send_ack() {
179 let mut remote_storage = super::SimpleRemoteStorage::new();
180
181 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
182 assert_eq!(
183 remote_storage.pop_action(0),
184 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
185 );
186 assert_eq!(remote_storage.pop_action(0), None);
187
188 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 0));
189 assert_eq!(
190 remote_storage.pop_action(0),
191 Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, Some(0)), RouteRule::ToNode(1000)))
192 );
193 assert_eq!(remote_storage.pop_action(0), None);
194 }
195
196 #[test]
197 fn receive_del_older_version() {
198 let mut remote_storage = super::SimpleRemoteStorage::new();
199
200 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
201 assert_eq!(
202 remote_storage.pop_action(0),
203 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
204 );
205 assert_eq!(remote_storage.pop_action(0), None);
206
207 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 5));
208 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, None), RouteRule::ToNode(1000))));
209 assert_eq!(remote_storage.pop_action(0), None);
210 }
211
212 #[test]
213 fn receive_del_newer_version() {
214 let mut remote_storage = super::SimpleRemoteStorage::new();
215
216 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 0, None));
217 assert_eq!(
218 remote_storage.pop_action(0),
219 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 0, true), RouteRule::ToNode(1000)))
220 );
221 assert_eq!(remote_storage.pop_action(0), None);
222
223 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Del(2, 1, 100));
224 assert_eq!(
225 remote_storage.pop_action(0),
226 Some(RemoteStorageAction(SimpleLocalEvent::DelAck(2, 1, Some(0)), RouteRule::ToNode(1000)))
227 );
228 assert_eq!(remote_storage.pop_action(0), None);
229 }
230
231 #[test]
232 fn receive_get_dersiered_send_ack() {
233 let mut remote_storage = super::SimpleRemoteStorage::new();
234
235 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Set(1, 1, vec![1], 10, None));
236 assert_eq!(
237 remote_storage.pop_action(0),
238 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(1, 1, 10, true), RouteRule::ToNode(1000)))
239 );
240 assert_eq!(remote_storage.pop_action(0), None);
241
242 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Get(2, 1));
243 assert_eq!(
244 remote_storage.pop_action(0),
245 Some(RemoteStorageAction(SimpleLocalEvent::GetAck(2, 1, Some((vec![1], 10, 1000))), RouteRule::ToNode(1001)))
246 );
247 assert_eq!(remote_storage.pop_action(0), None);
248
249 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Get(3, 2));
250 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::GetAck(3, 2, None), RouteRule::ToNode(1001))));
251 assert_eq!(remote_storage.pop_action(0), None);
252 }
253
254 #[test]
255 fn receive_sub_dersiered_send_ack() {
256 let mut remote_storage = super::SimpleRemoteStorage::new();
257
258 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
259 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
260 assert_eq!(remote_storage.pop_action(0), None);
261
262 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
263 assert_eq!(
264 remote_storage.pop_action(0),
265 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
266 );
267 assert_eq!(
268 remote_storage.pop_action(0),
269 Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
270 );
271 assert_eq!(remote_storage.pop_action(0), None);
272 }
273
274 #[test]
275 fn receive_sub_after_set_dersiered_send_ack() {
276 let mut remote_storage = super::SimpleRemoteStorage::new();
277
278 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
279 assert_eq!(
280 remote_storage.pop_action(0),
281 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
282 );
283 assert_eq!(remote_storage.pop_action(0), None);
284
285 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
286 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
287 assert_eq!(
288 remote_storage.pop_action(0),
289 Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
290 );
291 assert_eq!(remote_storage.pop_action(0), None);
292 }
293
294 #[test]
295 fn receive_unsub_dersiered_send_ack() {
296 let mut remote_storage = super::SimpleRemoteStorage::new();
297
298 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
299 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
300 assert_eq!(remote_storage.pop_action(0), None);
301
302 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Unsub(2, 1));
303 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::UnsubAck(2, 1, true), RouteRule::ToNode(1000))));
304 assert_eq!(remote_storage.pop_action(0), None);
305
306 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(3, 1, vec![1], 100, None));
307 assert_eq!(
308 remote_storage.pop_action(0),
309 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(3, 1, 100, true), RouteRule::ToNode(1001)))
310 );
311 assert_eq!(remote_storage.pop_action(0), None);
312 }
313
314 #[test]
315 fn receive_unsub_wrong_key() {
316 let mut remote_storage = super::SimpleRemoteStorage::new();
317
318 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Unsub(2, 1));
319 assert_eq!(
320 remote_storage.pop_action(0),
321 Some(RemoteStorageAction(SimpleLocalEvent::UnsubAck(2, 1, false), RouteRule::ToNode(1000)))
322 );
323 assert_eq!(remote_storage.pop_action(0), None);
324 }
325
326 #[test]
327 fn key_changed_event_with_ack() {
328 let mut remote_storage = super::SimpleRemoteStorage::new();
329
330 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
331 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
332 assert_eq!(remote_storage.pop_action(0), None);
333
334 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
335 assert_eq!(
336 remote_storage.pop_action(0),
337 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
338 );
339 assert_eq!(
340 remote_storage.pop_action(0),
341 Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
342 );
343 assert_eq!(remote_storage.pop_action(0), None);
344
345 remote_storage.on_event(0, 1000, SimpleRemoteEvent::OnKeySetAck(0));
346 remote_storage.tick(0);
347 assert_eq!(remote_storage.pop_action(0), None);
348 }
349
350 #[test]
351 fn key_changed_event_without_ack_should_resend() {
352 let mut remote_storage = super::SimpleRemoteStorage::new();
353
354 remote_storage.on_event(0, 1000, SimpleRemoteEvent::Sub(1, 1, None));
355 assert_eq!(remote_storage.pop_action(0), Some(RemoteStorageAction(SimpleLocalEvent::SubAck(1, 1), RouteRule::ToNode(1000))));
356 assert_eq!(remote_storage.pop_action(0), None);
357
358 remote_storage.on_event(0, 1001, SimpleRemoteEvent::Set(2, 1, vec![1], 100, None));
359 assert_eq!(
360 remote_storage.pop_action(0),
361 Some(RemoteStorageAction(SimpleLocalEvent::SetAck(2, 1, 100, true), RouteRule::ToNode(1001)))
362 );
363 assert_eq!(
364 remote_storage.pop_action(0),
365 Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
366 );
367 assert_eq!(remote_storage.pop_action(0), None);
368
369 remote_storage.tick(RESEND_AFTER_MS);
370 assert_eq!(
372 remote_storage.pop_action(RESEND_AFTER_MS),
373 Some(RemoteStorageAction(SimpleLocalEvent::OnKeySet(0, 1, vec![1], 100, 1001), RouteRule::ToNode(1000)))
374 );
375 assert_eq!(remote_storage.pop_action(RESEND_AFTER_MS), None);
376 }
377}