1use std::cell::RefCell;
9use std::convert::Infallible;
10use std::rc::Rc;
11use std::time::Duration;
12
13use crate::proxy_wasm::types::{Bytes, Status};
14use log::{trace, warn};
15use serde::de::DeserializeOwned;
16use serde::Serialize;
17
18use crate::extract::{Extract, FromContext};
19use crate::host::clock::Clock;
20use crate::host::shared_data::SharedData;
21use crate::utils::random_generator;
22
23use super::in_memory_cache::InMemoryCache;
24
/// Outcome reported by the write operations of [`ConcurrentSharedData`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum TransactionStatus {
    /// The value was stored in (or removed from) the shared data.
    Complete,
    /// The shared data reported an error other than a CAS mismatch, or the
    /// bytes currently stored could not be deserialized.
    InternalError,
    /// The caller-supplied callback declined the write: the consistency
    /// handler returned `None`, or the update function returned
    /// [`UpdateError::Desist`].
    Rejected,
}
35
/// Error an update function can return to `ConcurrentSharedData::update`.
#[doc(hidden)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum UpdateError {
    /// Abort the update; `update` then returns `TransactionStatus::Rejected`
    /// together with the value it read, leaving the store untouched.
    Desist,
}
41
/// Duration handed to `InMemoryCache::new` for the lock-version cache
/// (presumably the lifetime of a cached entry; confirm against `InMemoryCache`).
///
/// NOTE: despite the `_IN_MILLIS` suffix this is a [`Duration`] of 120 seconds,
/// not a number of milliseconds.
const CACHE_EXPIRATION_TIME_IN_MILLIS: Duration = Duration::from_secs(120);
43
/// Typed wrapper over the host [`SharedData`] that serializes values with
/// `bincode` and uses the per-key version returned by the host as an
/// optimistic lock when writing.
pub struct ConcurrentSharedData {
    /// Handle to the host shared-data store.
    shared_data: Rc<dyn SharedData>,
    /// Version observed by `get` for each key, consumed by `insert`.
    /// Wrapped in a `RefCell` because every method takes `&self`.
    lock_version_cache: RefCell<InMemoryCache<Option<u32>>>,
}
50
51impl<C> FromContext<C> for ConcurrentSharedData
52where
53 Rc<dyn Clock>: FromContext<C, Error = Infallible>,
54 Rc<dyn SharedData>: FromContext<C, Error = Infallible>,
55{
56 type Error = Infallible;
57
58 fn from_context(context: &C) -> Result<Self, Self::Error> {
59 let clock: Rc<dyn Clock> = context.extract()?;
60 let shared_data: Rc<dyn SharedData> = context.extract()?;
61
62 Ok(ConcurrentSharedData::new(clock, shared_data))
63 }
64}
65
66impl ConcurrentSharedData {
67 pub fn new(clock: Rc<dyn Clock>, shared_data: Rc<dyn SharedData>) -> Self {
69 Self {
70 shared_data,
71 lock_version_cache: RefCell::new(InMemoryCache::new(
72 clock,
73 CACHE_EXPIRATION_TIME_IN_MILLIS,
74 )),
75 }
76 }
77
78 pub fn insert<T, F>(
81 &self,
82 key: String,
83 data: T,
84 handle_consistency: F,
85 ) -> (TransactionStatus, T)
86 where
87 T: Clone + Serialize + DeserializeOwned,
88 F: Fn(T, T) -> Option<T>,
89 {
90 let mut cache = self.lock_version_cache.borrow_mut();
91 let lock_version = match cache.get(&key) {
92 Some(cached_cas) => cached_cas,
93 None => {
94 if let (_, Some(stored_version)) = self.shared_data.shared_data_get(&key) {
95 Some(stored_version)
96 } else {
97 Self::generate_random_lock_version()
101 }
102 }
103 };
104
105 let (transaction_status, algorithm_state) =
106 self.lock_and_save(&key, data, lock_version, handle_consistency);
107 cache.remove(&key);
108 (transaction_status, algorithm_state)
109 }
110
111 pub fn update<T, F>(&self, key: &str, update_function: F) -> (TransactionStatus, Option<T>)
114 where
115 T: Clone + Serialize + DeserializeOwned,
116 F: Fn(Option<&T>) -> Result<Option<T>, UpdateError>,
117 {
118 loop {
119 let (data, lock) = self.shared_data.shared_data_get(key);
120 let data: Option<T> = data.and_then(|data| bincode::deserialize(&data).ok());
121 let lock = lock
122 .map(Option::Some)
123 .unwrap_or_else(Self::generate_random_lock_version);
124
125 match update_function(data.as_ref()) {
126 Ok(Some(value)) => match self.save(key, &value, lock) {
127 Ok(()) => return (TransactionStatus::Complete, Some(value)),
128 Err(e) => {
129 trace!(
130 "Failed to persist data for identifier {} with error {:?}",
131 &key,
132 e
133 );
134 if e != Status::CasMismatch {
135 return (TransactionStatus::InternalError, None);
136 }
137 }
138 },
139 Ok(None) => match self.shared_data.shared_data_remove(key, None) {
140 Ok(_) => return (TransactionStatus::Complete, None),
141 Err(_) => return (TransactionStatus::InternalError, None),
142 },
143 Err(UpdateError::Desist) => {
144 return (TransactionStatus::Rejected, data);
145 }
146 }
147 }
148 }
149
150 pub fn get<T>(&self, key: &str) -> Option<T>
152 where
153 T: Clone + Serialize + DeserializeOwned,
154 {
155 if let (Some(data), cas) = self.shared_data.shared_data_get(key) {
156 self.lock_version_cache
157 .borrow_mut()
158 .save(String::from(key), cas);
159 self.deserialize_value(key, data)
160 } else {
161 None
162 }
163 }
164
165 pub fn remove<T>(&self, key: &str) -> Option<T>
167 where
168 T: Clone + Serialize + DeserializeOwned,
169 {
170 match self.shared_data.shared_data_remove(key, None) {
171 Ok(Some(value)) => {
172 self.lock_version_cache.borrow_mut().remove(key);
173 self.deserialize_value(key, value)
174 }
175 Ok(None) => None,
176 Err(err) => {
177 let error_message = self.interpret_envoy_shared_data_errors(err);
178 trace!(
179 "Failed to remove data for identifier {} with error {}",
180 &key,
181 &error_message
182 );
183 None
184 }
185 }
186 }
187
    /// Returns the keys reported by the underlying shared data
    /// (delegates to `shared_data_keys`).
    pub fn keys(&self) -> Vec<String> {
        self.shared_data.shared_data_keys()
    }
192
193 pub fn safe_remove(&self, key: &str) {
195 match self.shared_data.shared_data_remove(key, None) {
196 Ok(_) => {
197 self.lock_version_cache.borrow_mut().remove(key);
198 }
199 Err(err) => {
200 let error_message = self.interpret_envoy_shared_data_errors(err);
201 trace!(
202 "Failed to remove data for identifier {} with error {}",
203 &key,
204 &error_message
205 );
206 }
207 }
208 }
209
210 fn interpret_envoy_shared_data_errors(&self, error_status: Status) -> String {
211 match error_status {
212 Status::BadArgument => String::from("Bad Argument"),
213 Status::InternalFailure => String::from("Internal Failure"),
214 Status::ParseFailure => String::from("Parse Failure"),
215 Status::NotFound => String::from("Entity Not Found"),
216 Status::Empty => String::from("Empty Entity"),
217 Status::CasMismatch => String::from("Cas Mismatch"),
218 _ => String::from("OK"),
219 }
220 }
221
222 fn lock_and_save<T, F>(
231 &self,
232 key: &str,
233 mut data_to_persist: T,
234 lock_version: Option<u32>,
235 handle_consistency: F,
236 ) -> (TransactionStatus, T)
237 where
238 T: Clone + Serialize + DeserializeOwned,
239 F: Fn(T, T) -> Option<T>,
240 {
241 let mut result: Result<(), Status> = self.save(key, &data_to_persist, lock_version);
242
243 while let Err(Status::CasMismatch) = result {
244 trace!(
245 "Failed to persist data for identifier {} with error {:?}",
246 &key,
247 Status::CasMismatch
248 );
249
250 match self.shared_data.shared_data_get(key) {
251 (Some(data), lock_version) => {
252 trace!("Executing handle consistency function for key {key}");
253 if let Ok(current_data) = bincode::deserialize::<T>(&data) {
254 let consistency_result =
255 handle_consistency(current_data.clone(), data_to_persist.clone());
256 if let Some(value) = consistency_result {
257 data_to_persist = value;
258 result = self.save(key, &data_to_persist, lock_version);
259 } else {
260 return (TransactionStatus::Rejected, current_data);
261 }
262 } else {
263 return (TransactionStatus::InternalError, data_to_persist);
264 }
265 }
266 (None, Some(version)) => {
267 trace!("No value found for {key}, but lock version present, retrying store");
268 result = self.save(key, &data_to_persist, Some(version));
269 }
270 (None, None) => {
271 trace!("No value found for {key}, retrying store");
272 result = self.save(key, &data_to_persist, lock_version);
273 }
274 }
275 }
276
277 if let Err(err) = result {
278 let error_message = self.interpret_envoy_shared_data_errors(err);
279 trace!(
280 "Failed to persist data for identifier {} with error {}",
281 &key,
282 &error_message
283 );
284 (TransactionStatus::InternalError, data_to_persist)
285 } else {
286 (TransactionStatus::Complete, data_to_persist)
287 }
288 }
289
290 fn save<T>(&self, key: &str, state: &T, lock_version: Option<u32>) -> Result<(), Status>
291 where
292 T: Serialize + DeserializeOwned,
293 {
294 let serialized_state = bincode::serialize(state).unwrap();
295 self.shared_data
296 .shared_data_set(key, serialized_state.as_slice(), lock_version)
297 }
298
299 fn generate_random_lock_version() -> Option<u32> {
300 match random_generator::generate_u32() {
308 Ok(version) => Some(version),
309 Err(e) => {
310 log::error!("Error trying to generate version: {e}");
311 None
312 }
313 }
314 }
315
316 fn deserialize_value<T>(&self, key: &str, data: Bytes) -> Option<T>
317 where
318 T: Clone + Serialize + DeserializeOwned,
319 {
320 let result = bincode::deserialize(&data);
321 match result {
322 Ok(value) => Some(value),
323 Err(err) => {
324 warn!("Unexpected error trying to deserialize value for key {key}: {err:?}");
325 None
326 }
327 }
328 }
329}
330
331#[cfg(test)]
332mod test {
333 use std::cell::RefCell;
334 use std::ops::Add;
335 use std::rc::Rc;
336 use std::time::{Duration, SystemTime};
337
338 use crate::proxy_wasm::types::{Bytes, Status};
339 use mockall::mock;
340 use mockall::predicate::{always, eq};
341 use mockall::Sequence;
342 use serde::{Deserialize, Serialize};
343
344 use super::InMemoryCache;
345 use super::{ConcurrentSharedData, TransactionStatus, CACHE_EXPIRATION_TIME_IN_MILLIS};
346 use crate::host::clock::TimeUnit;
347
    // Generates `MockSharedData`, a mockall double for the host `SharedData`
    // trait, so tests can script get/set/remove/keys without a real host.
    mock! {

        pub SharedData {}

        impl crate::host::shared_data::SharedData for SharedData {
            fn shared_data_get(&self, key: &str) -> (Option<Bytes>, Option<u32>);
            fn shared_data_set(&self, key: &str, value: &[u8], version: Option<u32>) -> Result<(), Status>;
            fn shared_data_remove (&self, key: &str, version: Option<u32>) -> Result<Option<Bytes>, Status>;
            fn shared_data_keys (&self) -> Vec<String>;
        }
    }
359
    // Generates `MockClock`, a mockall double for the host `Clock` trait, used
    // to control the time seen by the lock-version cache.
    mock! {

        pub Clock {}

        impl crate::host::clock::Clock for Clock {
            fn get_current_time(&self) -> SystemTime;
            fn get_current_time_unit(&self, unit:TimeUnit) ->u128;
        }
    }
369
    /// Minimal serializable payload used as the stored value in these tests.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
    pub struct SerializableObject {
        property_one: u64,
        property_two: u64,
    }
375
    /// `get` returns the deserialized value when the host holds bytes for the key.
    #[test]
    fn get_state_successfully() {
        let state = SerializableObject {
            property_one: 1,
            property_two: 10,
        };
        let serialized_state = bincode::serialize(&state);
        let mut mock_clock = MockClock::new();

        let now = SystemTime::now();
        let now_plus_five_seconds = now.add(Duration::new(5, 0));

        let mut mock_shared_data = MockSharedData::new();

        // The host answers one read of "key" with the serialized object and no
        // lock version; the expectation is not tied to any sequence.
        mock_shared_data_get(
            &mut mock_shared_data,
            Some(serialized_state.as_ref().unwrap().clone()),
            None,
            None,
        );

        // Two single-use clock expectations: `now`, then five seconds later.
        // Presumably consumed by the cache when it is built and when `get`
        // saves the version — confirm against `InMemoryCache`.
        mock_clock
            .expect_get_current_time()
            .times(1)
            .returning(move || now);

        mock_clock
            .expect_get_current_time()
            .times(1)
            .returning(move || now_plus_five_seconds);

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(InMemoryCache::new(
                Rc::new(mock_clock),
                CACHE_EXPIRATION_TIME_IN_MILLIS,
            )),
            shared_data: Rc::new(mock_shared_data),
        };
        let found_state = storage.get("key");
        assert_eq!(state, found_state.unwrap());
    }
417
    /// `get` returns `None` when the host has no bytes for the key.
    #[test]
    fn get_non_existent_state() {
        let mut mock_clock = MockClock::new();
        let mut mock_shared_data = MockSharedData::new();

        // A single clock read is expected during the test.
        mock_clock_now(&mut mock_clock, &mut Sequence::new());

        // The host reports neither a value nor a version for "key".
        mock_shared_data_get(&mut mock_shared_data, None, None, None);

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(InMemoryCache::new(
                Rc::new(mock_clock),
                CACHE_EXPIRATION_TIME_IN_MILLIS,
            )),
            shared_data: Rc::new(mock_shared_data),
        };
        let found_state: Option<SerializableObject> = storage.get("key");
        assert!(found_state.is_none());
    }
437
    /// `remove` returns `None` when the host reports success without a payload.
    #[test]
    fn remove_non_existent() {
        let mut mock_clock = MockClock::new();
        let mut mock_shared_data = MockSharedData::new();

        // Exactly one unconditional removal (`version == None`) of "key",
        // answered with `Ok(None)`.
        mock_shared_data
            .expect_shared_data_remove()
            .with(eq("key"), eq(None))
            .times(1)
            .returning(move |_x: &str, _y: Option<u32>| Ok(None));

        // A single clock read is expected during the test.
        mock_clock_now(&mut mock_clock, &mut Sequence::new());

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(InMemoryCache::new(
                Rc::new(mock_clock),
                CACHE_EXPIRATION_TIME_IN_MILLIS,
            )),
            shared_data: Rc::new(mock_shared_data),
        };

        let found_state: Option<SerializableObject> = storage.remove("key");

        assert!(found_state.is_none());
    }
463
    /// `remove` returns the stored value and drops the cached lock version.
    #[test]
    fn remove_stored_object() {
        let mut mock_clock = MockClock::new();
        let mut mock_shared_data = MockSharedData::new();

        let persisted_state = SerializableObject {
            property_one: 1,
            property_two: 100,
        };
        let serialized_persisted_state = bincode::serialize(&persisted_state);

        // One unconditional removal of "key" that hands back the stored bytes.
        mock_shared_data
            .expect_shared_data_remove()
            .with(eq("key"), eq(None))
            .times(1)
            .returning(move |_x: &str, _y: Option<u32>| {
                Ok(Some(serialized_persisted_state.as_ref().unwrap().clone()))
            });

        // Two independent single-use clock expectations (separate sequences).
        mock_clock_now(&mut mock_clock, &mut Sequence::new());
        mock_clock_now(&mut mock_clock, &mut Sequence::new());

        let mut lock_version_cache: InMemoryCache<Option<u32>> =
            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);

        // Pre-populate the cache so the test can observe it being cleared.
        lock_version_cache.save(String::from("key"), Option::from(1));

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(lock_version_cache),
            shared_data: Rc::new(mock_shared_data),
        };

        let found_state: Option<SerializableObject> = storage.remove("key");

        assert_eq!(found_state, Some(persisted_state));
        let cache = storage.lock_version_cache.borrow();
        assert_eq!(cache.get("key"), None)
    }
502
    /// `insert` succeeds on the first write when neither the cache nor the host
    /// knows a version for the key.
    #[test]
    fn save_state_without_optimistic_locking() {
        let mut mock_clock = MockClock::new();
        let state = SerializableObject {
            property_one: 1,
            property_two: 10,
        };
        let expected_state = state;
        let mut mock_shared_data = MockSharedData::new();

        // Version lookup performed by `insert`: the host knows nothing about "key".
        mock_shared_data_get(&mut mock_shared_data, None, None, None);

        mock_clock_now(&mut mock_clock, &mut Sequence::new());

        // A single write is accepted, whatever payload and lock version are sent.
        mock_shared_data
            .expect_shared_data_set()
            .with(eq("key"), always(), always())
            .times(1)
            .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Ok(()));

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(InMemoryCache::new(
                Rc::new(mock_clock),
                CACHE_EXPIRATION_TIME_IN_MILLIS,
            )),
            shared_data: Rc::new(mock_shared_data),
        };
        // The consistency handler would reject, but it must not be reached.
        let (status, found_state) =
            storage.insert(String::from("key"), state, |_previous, _new| Option::None);
        assert_eq!(TransactionStatus::Complete, status);
        assert_eq!(expected_state, found_state);
    }
535
536 #[test]
537 fn save_state_with_correct_lock_version() {
538 let now = SystemTime::now();
539 let now_plus_five_seconds = now.add(Duration::new(5, 0));
540
541 let state = SerializableObject {
542 property_one: 1,
543 property_two: 10,
544 };
545 let expected_state = state;
546
547 let mut mock_clock = MockClock::new();
548
549 let mut mock_shared_data = MockSharedData::new();
550
551 mock_shared_data_set(&mut mock_shared_data, &mut Sequence::new());
552
553 let mut seq = Sequence::new();
554
555 mock_clock
556 .expect_get_current_time()
557 .times(1)
558 .returning(move || now)
559 .in_sequence(&mut seq);
560
561 mock_clock
562 .expect_get_current_time()
563 .times(1)
564 .returning(move || now_plus_five_seconds)
565 .in_sequence(&mut seq);
566
567 let mut lock_version_cache: InMemoryCache<Option<u32>> =
568 InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
569
570 lock_version_cache.save(String::from("key"), Option::from(1));
571
572 let mut storage = ConcurrentSharedData {
573 lock_version_cache: RefCell::new(lock_version_cache),
574 shared_data: Rc::new(mock_shared_data),
575 };
576 let (status, algorithm_obtained_state) =
577 storage.insert(String::from("key"), state, |_previous, _new| Option::None);
578 let cache = storage.lock_version_cache.get_mut();
579 cache.remove("key");
580 assert_eq!(TransactionStatus::Complete, status);
581 assert_eq!(expected_state, algorithm_obtained_state);
582 assert!(cache.is_empty())
583 }
584
    /// First insertion whose write collides (CAS mismatch): the consistency
    /// handler supplies the value that is finally stored.
    #[test]
    fn save_state_first_time_lock_version_collision() {
        let mut mock_clock = MockClock::new();
        let state_to_persist = SerializableObject {
            property_one: 1,
            property_two: 100,
        };
        let persisted_state = SerializableObject {
            property_one: 1,
            property_two: 100,
        };
        let serialized_persisted_state = bincode::serialize(&persisted_state);

        let mut mock_shared_data = MockSharedData::new();

        let mut sequence = Sequence::new();

        // Ordered script: clock read, version lookup (nothing stored), a write
        // refused with CAS mismatch, and finally a successful write.
        mock_clock_now(&mut mock_clock, &mut sequence);

        mock_shared_data_get(&mut mock_shared_data, None, None, Some(&mut sequence));

        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);

        // Re-read after the mismatch: a value with version 1 is now stored.
        // This expectation is not part of the sequence.
        mock_shared_data_get(
            &mut mock_shared_data,
            Some(serialized_persisted_state.as_ref().unwrap().clone()),
            Some(1),
            None,
        );

        mock_shared_data_set(&mut mock_shared_data, &mut sequence);

        let lock_version_cache: InMemoryCache<Option<u32>> =
            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);

        // Resolves the conflict with a fixed object, ignoring both inputs.
        let handle_consistency_mock_first_insertion =
            |_x: SerializableObject, _y: SerializableObject| -> Option<SerializableObject> {
                Option::from(SerializableObject {
                    property_one: 2,
                    property_two: 100,
                })
            };

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(lock_version_cache),
            shared_data: Rc::new(mock_shared_data),
        };
        let (status, found_state): (_, SerializableObject) = storage.insert(
            String::from("key"),
            state_to_persist,
            handle_consistency_mock_first_insertion,
        );
        let cache = storage.lock_version_cache.borrow();
        let option_key = cache.get("key");
        assert_eq!(TransactionStatus::Complete, status);
        assert_eq!(2, found_state.property_one);
        assert_eq!(100, found_state.property_two);
        assert!(option_key.is_none())
    }
644
    /// A stale cached version causes a CAS mismatch; the consistency handler
    /// resolves it and the retried write succeeds.
    #[test]
    fn save_state_with_incorrect_lock_version_and_retry_success() {
        let mut mock_clock = MockClock::new();

        let now = SystemTime::now();
        let now_plus_five_seconds = now.add(Duration::new(5, 0));

        let persisted_state = SerializableObject {
            property_one: 1,
            property_two: 10,
        };
        let serialized_persisted_state = bincode::serialize(&persisted_state);

        let state_to_persist = SerializableObject {
            property_one: 2,
            property_two: 11,
        };

        let mut mock_shared_data = MockSharedData::new();

        let mut sequence = Sequence::new();

        // Writes, in order: first one refused with CAS mismatch, second accepted.
        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);

        mock_shared_data_set(&mut mock_shared_data, &mut sequence);

        // Re-read between the two writes (unsequenced): stored value, version 1.
        mock_shared_data_get(
            &mut mock_shared_data,
            Some(serialized_persisted_state.as_ref().unwrap().clone()),
            Some(1),
            None,
        );

        // Two ordered clock reads: "now", then five seconds later.
        let mut seq = Sequence::new();
        mock_clock_now(&mut mock_clock, &mut seq);
        mock_clock
            .expect_get_current_time()
            .times(1)
            .returning(move || now_plus_five_seconds)
            .in_sequence(&mut seq);

        // Resolves the conflict with a fixed object, ignoring both inputs.
        let handle_consistency_mock =
            |_x: SerializableObject, _y: SerializableObject| -> Option<SerializableObject> {
                Option::from(SerializableObject {
                    property_one: 2,
                    property_two: 11,
                })
            };

        let mut lock_version_cache: InMemoryCache<Option<u32>> =
            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);

        // Cached version that the first write will be attempted with.
        lock_version_cache.save(String::from("key"), Option::from(1));

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(lock_version_cache),
            shared_data: Rc::new(mock_shared_data),
        };
        let (status, algorithm_obtained_state) = storage.insert(
            String::from("key"),
            state_to_persist,
            handle_consistency_mock,
        );

        let cache = storage.lock_version_cache.borrow();
        let option_key = cache.get("key");
        assert_eq!(TransactionStatus::Complete, status);
        assert_eq!(2, algorithm_obtained_state.property_one);
        assert_eq!(11, algorithm_obtained_state.property_two);
        assert!(option_key.is_none())
    }
716
    /// A CAS mismatch followed by a consistency handler that returns `None`
    /// yields `Rejected` together with the value currently stored.
    #[test]
    fn save_state_with_incorrect_lock_version_and_retry_with_inconsistency() {
        let mut mock_clock = MockClock::new();

        let now = SystemTime::now();
        let now_plus_five_seconds = now.add(Duration::new(5, 0));

        let persisted_state = SerializableObject {
            property_one: 1,
            property_two: 10,
        };
        let serialized_persisted_state = bincode::serialize(&persisted_state);
        let state_to_persist = SerializableObject {
            property_one: 2,
            property_two: 11,
        };

        let mut mock_shared_data = MockSharedData::new();

        // Two ordered clock reads: "now", then five seconds later.
        let mut seq = Sequence::new();

        mock_clock_now(&mut mock_clock, &mut seq);

        mock_clock
            .expect_get_current_time()
            .times(1)
            .returning(move || now_plus_five_seconds)
            .in_sequence(&mut seq);

        let mut sequence = Sequence::new();

        // The only write is refused with CAS mismatch; no second write is
        // registered because the handler rejects the merge.
        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);

        // Re-read after the mismatch (unsequenced): stored value, version 1.
        mock_shared_data_get(
            &mut mock_shared_data,
            Some(serialized_persisted_state.as_ref().unwrap().clone()),
            Some(1),
            None,
        );

        let mut lock_version_cache: InMemoryCache<Option<u32>> =
            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
        lock_version_cache.save(String::from("key"), Option::from(1));

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(lock_version_cache),
            shared_data: Rc::new(mock_shared_data),
        };
        let (status, algorithm_obtained_state) =
            storage.insert(String::from("key"), state_to_persist, |_previous, _new| {
                Option::None
            });
        let cache = storage.lock_version_cache.borrow();
        let option_key = cache.get("key");
        assert_eq!(TransactionStatus::Rejected, status);
        assert_eq!(persisted_state, algorithm_obtained_state);
        assert!(option_key.is_none())
    }
775
    /// A CAS mismatch followed by a re-read that finds nothing stored: the
    /// write is retried without consulting the consistency handler and succeeds.
    #[test]
    fn save_state_with_incorrect_lock_version_and_retry_value_not_found() {
        let mut mock_clock = MockClock::new();

        let now = SystemTime::now();
        let now_plus_five_seconds = now.add(Duration::new(5, 0));

        let state_to_persist = SerializableObject {
            property_one: 2,
            property_two: 11,
        };

        let mut mock_shared_data = MockSharedData::new();

        let mut sequence = Sequence::new();

        // Writes, in order: first one refused with CAS mismatch, second accepted.
        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);

        // Re-read between the two writes (unsequenced): no value, no version.
        mock_shared_data_get(&mut mock_shared_data, None, None, None);

        mock_shared_data_set(&mut mock_shared_data, &mut sequence);

        // Two ordered clock reads: "now", then five seconds later.
        let mut seq = Sequence::new();
        mock_clock_now(&mut mock_clock, &mut seq);

        mock_clock
            .expect_get_current_time()
            .times(1)
            .returning(move || now_plus_five_seconds)
            .in_sequence(&mut seq);

        let mut lock_version_cache: InMemoryCache<Option<u32>> =
            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);

        lock_version_cache.save(String::from("key"), Option::from(1));

        let storage = ConcurrentSharedData {
            lock_version_cache: RefCell::new(lock_version_cache),
            shared_data: Rc::new(mock_shared_data),
        };
        // The handler would reject, so `Complete` proves it was never called.
        let (status, algorithm_obtained_state) =
            storage.insert(String::from("key"), state_to_persist, |_previous, _new| {
                Option::None
            });

        let cache = storage.lock_version_cache.borrow();
        let option_key = cache.get("key");

        assert_eq!(TransactionStatus::Complete, status);
        assert_eq!(
            state_to_persist.property_one,
            algorithm_obtained_state.property_one
        );
        assert_eq!(
            state_to_persist.property_two,
            algorithm_obtained_state.property_two
        );
        assert!(option_key.is_none())
    }
835
836 fn mock_shared_data_get(
837 mock_shared_data: &mut MockSharedData,
838 value: Option<Vec<u8>>,
839 cas: Option<u32>,
840 seq: Option<&mut Sequence>,
841 ) {
842 let ongoing = mock_shared_data
843 .expect_shared_data_get()
844 .with(eq("key"))
845 .times(1)
846 .returning(move |_x: &str| (value.clone(), cas));
847
848 if let Some(s) = seq {
849 ongoing.in_sequence(s);
850 }
851 }
852
853 fn mock_shared_data_set(mock_shared_data: &mut MockSharedData, sequence: &mut Sequence) {
854 mock_shared_data
855 .expect_shared_data_set()
856 .times(1)
857 .in_sequence(sequence)
858 .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Ok(()));
859 }
860
861 fn mock_shared_data_set_cas_mismatch(
862 mock_shared_data: &mut MockSharedData,
863 sequence: &mut Sequence,
864 ) {
865 mock_shared_data
866 .expect_shared_data_set()
867 .times(1)
868 .in_sequence(sequence)
869 .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Err(Status::CasMismatch));
870 }
871
872 fn mock_clock_now(mock_clock: &mut MockClock, seq: &mut Sequence) {
873 let now = SystemTime::now();
874 mock_clock
875 .expect_get_current_time()
876 .times(1)
877 .returning(move || now)
878 .in_sequence(seq);
879 }
880}