pdk_classy/shared_data/
concurrent_shared_data.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Implementation that handles updates of the given key-value taking into account that race
6//! conditions could arise between reading and writing values to the shared memory.
7
8use std::cell::RefCell;
9use std::convert::Infallible;
10use std::rc::Rc;
11use std::time::Duration;
12
13use crate::proxy_wasm::types::{Bytes, Status};
14use log::{trace, warn};
15use serde::de::DeserializeOwned;
16use serde::Serialize;
17
18use crate::extract::{Extract, FromContext};
19use crate::host::clock::Clock;
20use crate::host::shared_data::SharedData;
21use crate::utils::random_generator;
22
23use super::in_memory_cache::InMemoryCache;
24
25/// Indicates the exit status of modification to the [`ConcurrentSharedData`]
26#[derive(Clone, PartialEq, Eq, Debug)]
27pub enum TransactionStatus {
28    /// The transaction completed as expected
29    Complete,
30    /// An unhandled error was returned by the update function.
31    InternalError,
32    /// The update could not be performed and the function decided to desist.
33    Rejected,
34}
35
36#[doc(hidden)]
37#[derive(Clone, PartialEq, Eq, Debug)]
38pub enum UpdateError {
39    Desist,
40}
41
42const CACHE_EXPIRATION_TIME_IN_MILLIS: Duration = Duration::from_secs(120);
43
44/// Implementation that handles updates of the given key-value taking into account that race
45/// conditions could arise between reading and writing values to the shared memory.
46pub struct ConcurrentSharedData {
47    shared_data: Rc<dyn SharedData>,
48    lock_version_cache: RefCell<InMemoryCache<Option<u32>>>,
49}
50
51impl<C> FromContext<C> for ConcurrentSharedData
52where
53    Rc<dyn Clock>: FromContext<C, Error = Infallible>,
54    Rc<dyn SharedData>: FromContext<C, Error = Infallible>,
55{
56    type Error = Infallible;
57
58    fn from_context(context: &C) -> Result<Self, Self::Error> {
59        let clock: Rc<dyn Clock> = context.extract()?;
60        let shared_data: Rc<dyn SharedData> = context.extract()?;
61
62        Ok(ConcurrentSharedData::new(clock, shared_data))
63    }
64}
65
66impl ConcurrentSharedData {
67    /// Create a new instance.
68    pub fn new(clock: Rc<dyn Clock>, shared_data: Rc<dyn SharedData>) -> Self {
69        Self {
70            shared_data,
71            lock_version_cache: RefCell::new(InMemoryCache::new(
72                clock,
73                CACHE_EXPIRATION_TIME_IN_MILLIS,
74            )),
75        }
76    }
77
78    /// Insert an element, if the element has a different value from the last time it was read, then
79    /// the function will be invoked to resolve the conflict.
80    pub fn insert<T, F>(
81        &self,
82        key: String,
83        data: T,
84        handle_consistency: F,
85    ) -> (TransactionStatus, T)
86    where
87        T: Clone + Serialize + DeserializeOwned,
88        F: Fn(T, T) -> Option<T>,
89    {
90        let mut cache = self.lock_version_cache.borrow_mut();
91        let lock_version = match cache.get(&key) {
92            Some(cached_cas) => cached_cas,
93            None => {
94                if let (_, Some(stored_version)) = self.shared_data.shared_data_get(&key) {
95                    Some(stored_version)
96                } else {
97                    // We are using a random  CAS generator in order to avoid concurrency inconsistencies
98                    // as a consequence of two workers that reach the shared data at the same time and there are not
99                    // previous information linked to the key that we are going to save
100                    Self::generate_random_lock_version()
101                }
102            }
103        };
104
105        let (transaction_status, algorithm_state) =
106            self.lock_and_save(&key, data, lock_version, handle_consistency);
107        cache.remove(&key);
108        (transaction_status, algorithm_state)
109    }
110
111    /// Updates an element, if the element has a different value from the last time it was read, then
112    /// the function will be invoked to resolve the conflict.
113    pub fn update<T, F>(&self, key: &str, update_function: F) -> (TransactionStatus, Option<T>)
114    where
115        T: Clone + Serialize + DeserializeOwned,
116        F: Fn(Option<&T>) -> Result<Option<T>, UpdateError>,
117    {
118        loop {
119            let (data, lock) = self.shared_data.shared_data_get(key);
120            let data: Option<T> = data.and_then(|data| bincode::deserialize(&data).ok());
121            let lock = lock
122                .map(Option::Some)
123                .unwrap_or_else(Self::generate_random_lock_version);
124
125            match update_function(data.as_ref()) {
126                Ok(Some(value)) => match self.save(key, &value, lock) {
127                    Ok(()) => return (TransactionStatus::Complete, Some(value)),
128                    Err(e) => {
129                        trace!(
130                            "Failed to persist data for identifier {} with error {:?}",
131                            &key,
132                            e
133                        );
134                        if e != Status::CasMismatch {
135                            return (TransactionStatus::InternalError, None);
136                        }
137                    }
138                },
139                Ok(None) => match self.shared_data.shared_data_remove(key, None) {
140                    Ok(_) => return (TransactionStatus::Complete, None),
141                    Err(_) => return (TransactionStatus::InternalError, None),
142                },
143                Err(UpdateError::Desist) => {
144                    return (TransactionStatus::Rejected, data);
145                }
146            }
147        }
148    }
149
150    /// Get an element for the given key.
151    pub fn get<T>(&self, key: &str) -> Option<T>
152    where
153        T: Clone + Serialize + DeserializeOwned,
154    {
155        if let (Some(data), cas) = self.shared_data.shared_data_get(key) {
156            self.lock_version_cache
157                .borrow_mut()
158                .save(String::from(key), cas);
159            self.deserialize_value(key, data)
160        } else {
161            None
162        }
163    }
164
165    /// Removes the element for the given key.
166    pub fn remove<T>(&self, key: &str) -> Option<T>
167    where
168        T: Clone + Serialize + DeserializeOwned,
169    {
170        match self.shared_data.shared_data_remove(key, None) {
171            Ok(Some(value)) => {
172                self.lock_version_cache.borrow_mut().remove(key);
173                self.deserialize_value(key, value)
174            }
175            Ok(None) => None,
176            Err(err) => {
177                let error_message = self.interpret_envoy_shared_data_errors(err);
178                trace!(
179                    "Failed to remove data for identifier {} with error {}",
180                    &key,
181                    &error_message
182                );
183                None
184            }
185        }
186    }
187
188    /// Get list of keys currently stored.
189    pub fn keys(&self) -> Vec<String> {
190        self.shared_data.shared_data_keys()
191    }
192
193    /// Remove the element for the given key. This function fails silently.
194    pub fn safe_remove(&self, key: &str) {
195        match self.shared_data.shared_data_remove(key, None) {
196            Ok(_) => {
197                self.lock_version_cache.borrow_mut().remove(key);
198            }
199            Err(err) => {
200                let error_message = self.interpret_envoy_shared_data_errors(err);
201                trace!(
202                    "Failed to remove data for identifier {} with error {}",
203                    &key,
204                    &error_message
205                );
206            }
207        }
208    }
209
210    fn interpret_envoy_shared_data_errors(&self, error_status: Status) -> String {
211        match error_status {
212            Status::BadArgument => String::from("Bad Argument"),
213            Status::InternalFailure => String::from("Internal Failure"),
214            Status::ParseFailure => String::from("Parse Failure"),
215            Status::NotFound => String::from("Entity Not Found"),
216            Status::Empty => String::from("Empty Entity"),
217            Status::CasMismatch => String::from("Cas Mismatch"),
218            _ => String::from("OK"),
219        }
220    }
221
222    /* We should analyze if passing a function as a parameter is the best choice to allow each storage implementation to manage consistency.
223        This decoupling strategy allows each rate limit algorithm to define their consistency implementation by themselves.
224        We believe this mechanism is enough for the beta release. However, we could consider other choices for GA.
225    */
226    /*
227        On the other hand, the compare and swap algorithm implemented here, maybe could be extracted to the PDK in future releases.
228        Additionally, Should we define a number of conciliation attempts?
229    */
230    fn lock_and_save<T, F>(
231        &self,
232        key: &str,
233        mut data_to_persist: T,
234        lock_version: Option<u32>,
235        handle_consistency: F,
236    ) -> (TransactionStatus, T)
237    where
238        T: Clone + Serialize + DeserializeOwned,
239        F: Fn(T, T) -> Option<T>,
240    {
241        let mut result: Result<(), Status> = self.save(key, &data_to_persist, lock_version);
242
243        while let Err(Status::CasMismatch) = result {
244            trace!(
245                "Failed to persist data for identifier {} with error {:?}",
246                &key,
247                Status::CasMismatch
248            );
249
250            match self.shared_data.shared_data_get(key) {
251                (Some(data), lock_version) => {
252                    trace!("Executing handle consistency function for key {key}");
253                    if let Ok(current_data) = bincode::deserialize::<T>(&data) {
254                        let consistency_result =
255                            handle_consistency(current_data.clone(), data_to_persist.clone());
256                        if let Some(value) = consistency_result {
257                            data_to_persist = value;
258                            result = self.save(key, &data_to_persist, lock_version);
259                        } else {
260                            return (TransactionStatus::Rejected, current_data);
261                        }
262                    } else {
263                        return (TransactionStatus::InternalError, data_to_persist);
264                    }
265                }
266                (None, Some(version)) => {
267                    trace!("No value found for {key}, but lock version present, retrying store");
268                    result = self.save(key, &data_to_persist, Some(version));
269                }
270                (None, None) => {
271                    trace!("No value found for {key}, retrying store");
272                    result = self.save(key, &data_to_persist, lock_version);
273                }
274            }
275        }
276
277        if let Err(err) = result {
278            let error_message = self.interpret_envoy_shared_data_errors(err);
279            trace!(
280                "Failed to persist data for identifier {} with error {}",
281                &key,
282                &error_message
283            );
284            (TransactionStatus::InternalError, data_to_persist)
285        } else {
286            (TransactionStatus::Complete, data_to_persist)
287        }
288    }
289
290    fn save<T>(&self, key: &str, state: &T, lock_version: Option<u32>) -> Result<(), Status>
291    where
292        T: Serialize + DeserializeOwned,
293    {
294        let serialized_state = bincode::serialize(state).unwrap();
295        self.shared_data
296            .shared_data_set(key, serialized_state.as_slice(), lock_version)
297    }
298
299    fn generate_random_lock_version() -> Option<u32> {
300        // Due to the degree of parallelism managed by Envoy Workers, each worker will have their independent object instances.
301        // Additionally, filters tend to be stateless (regardless of shared data and shared queues).
302        // Therefore, to guarantee independent random CAS numbers between workers and iterations, it is necessary to set a new seed each time.
303        // This is necessary because generated random numbers that share the same seed will produce the same sequence even though they are created by different workers.
304        // On the other hand, since we are creating these objects for each HTTP context, it is not possible in this scheme to create a unique generator to ask for new random numbers from the sequence.
305        // To avoid this type of synchronization, we should analyze Singleton Envoy Services.
306
307        match random_generator::generate_u32() {
308            Ok(version) => Some(version),
309            Err(e) => {
310                log::error!("Error trying to generate version: {e}");
311                None
312            }
313        }
314    }
315
316    fn deserialize_value<T>(&self, key: &str, data: Bytes) -> Option<T>
317    where
318        T: Clone + Serialize + DeserializeOwned,
319    {
320        let result = bincode::deserialize(&data);
321        match result {
322            Ok(value) => Some(value),
323            Err(err) => {
324                warn!("Unexpected error trying to deserialize value for key {key}: {err:?}");
325                None
326            }
327        }
328    }
329}
330
331#[cfg(test)]
332mod test {
333    use std::cell::RefCell;
334    use std::ops::Add;
335    use std::rc::Rc;
336    use std::time::{Duration, SystemTime};
337
338    use crate::proxy_wasm::types::{Bytes, Status};
339    use mockall::mock;
340    use mockall::predicate::{always, eq};
341    use mockall::Sequence;
342    use serde::{Deserialize, Serialize};
343
344    use super::InMemoryCache;
345    use super::{ConcurrentSharedData, TransactionStatus, CACHE_EXPIRATION_TIME_IN_MILLIS};
346    use crate::host::clock::TimeUnit;
347
348    mock! {
349
350         pub SharedData {}
351
352         impl crate::host::shared_data::SharedData for SharedData {
353           fn shared_data_get(&self, key: &str) -> (Option<Bytes>, Option<u32>);
354           fn shared_data_set(&self, key: &str, value: &[u8], version: Option<u32>) -> Result<(), Status>;
355           fn shared_data_remove (&self, key: &str, version: Option<u32>) -> Result<Option<Bytes>, Status>;
356           fn shared_data_keys (&self) -> Vec<String>;
357         }
358    }
359
360    mock! {
361
362         pub Clock {}
363
364         impl crate::host::clock::Clock for Clock {
365               fn get_current_time(&self) -> SystemTime;
366               fn get_current_time_unit(&self, unit:TimeUnit) ->u128;
367         }
368    }
369
370    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
371    pub struct SerializableObject {
372        property_one: u64,
373        property_two: u64,
374    }
375
376    #[test]
377    fn get_state_successfully() {
378        let state = SerializableObject {
379            property_one: 1,
380            property_two: 10,
381        };
382        let serialized_state = bincode::serialize(&state);
383        let mut mock_clock = MockClock::new();
384
385        let now = SystemTime::now();
386        let now_plus_five_seconds = now.add(Duration::new(5, 0));
387
388        let mut mock_shared_data = MockSharedData::new();
389
390        mock_shared_data_get(
391            &mut mock_shared_data,
392            Some(serialized_state.as_ref().unwrap().clone()),
393            None,
394            None,
395        );
396
397        mock_clock
398            .expect_get_current_time()
399            .times(1)
400            .returning(move || now);
401
402        mock_clock
403            .expect_get_current_time()
404            .times(1)
405            .returning(move || now_plus_five_seconds);
406
407        let storage = ConcurrentSharedData {
408            lock_version_cache: RefCell::new(InMemoryCache::new(
409                Rc::new(mock_clock),
410                CACHE_EXPIRATION_TIME_IN_MILLIS,
411            )),
412            shared_data: Rc::new(mock_shared_data),
413        };
414        let found_state = storage.get("key");
415        assert_eq!(state, found_state.unwrap());
416    }
417
418    #[test]
419    fn get_non_existent_state() {
420        let mut mock_clock = MockClock::new();
421        let mut mock_shared_data = MockSharedData::new();
422
423        mock_clock_now(&mut mock_clock, &mut Sequence::new());
424
425        mock_shared_data_get(&mut mock_shared_data, None, None, None);
426
427        let storage = ConcurrentSharedData {
428            lock_version_cache: RefCell::new(InMemoryCache::new(
429                Rc::new(mock_clock),
430                CACHE_EXPIRATION_TIME_IN_MILLIS,
431            )),
432            shared_data: Rc::new(mock_shared_data),
433        };
434        let found_state: Option<SerializableObject> = storage.get("key");
435        assert!(found_state.is_none());
436    }
437
438    #[test]
439    fn remove_non_existent() {
440        let mut mock_clock = MockClock::new();
441        let mut mock_shared_data = MockSharedData::new();
442
443        mock_shared_data
444            .expect_shared_data_remove()
445            .with(eq("key"), eq(None))
446            .times(1)
447            .returning(move |_x: &str, _y: Option<u32>| Ok(None));
448
449        mock_clock_now(&mut mock_clock, &mut Sequence::new());
450
451        let storage = ConcurrentSharedData {
452            lock_version_cache: RefCell::new(InMemoryCache::new(
453                Rc::new(mock_clock),
454                CACHE_EXPIRATION_TIME_IN_MILLIS,
455            )),
456            shared_data: Rc::new(mock_shared_data),
457        };
458
459        let found_state: Option<SerializableObject> = storage.remove("key");
460
461        assert!(found_state.is_none());
462    }
463
464    #[test]
465    fn remove_stored_object() {
466        let mut mock_clock = MockClock::new();
467        let mut mock_shared_data = MockSharedData::new();
468
469        let persisted_state = SerializableObject {
470            property_one: 1,
471            property_two: 100,
472        };
473        let serialized_persisted_state = bincode::serialize(&persisted_state);
474
475        mock_shared_data
476            .expect_shared_data_remove()
477            .with(eq("key"), eq(None))
478            .times(1)
479            .returning(move |_x: &str, _y: Option<u32>| {
480                Ok(Some(serialized_persisted_state.as_ref().unwrap().clone()))
481            });
482
483        mock_clock_now(&mut mock_clock, &mut Sequence::new());
484        mock_clock_now(&mut mock_clock, &mut Sequence::new());
485
486        let mut lock_version_cache: InMemoryCache<Option<u32>> =
487            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
488
489        lock_version_cache.save(String::from("key"), Option::from(1));
490
491        let storage = ConcurrentSharedData {
492            lock_version_cache: RefCell::new(lock_version_cache),
493            shared_data: Rc::new(mock_shared_data),
494        };
495
496        let found_state: Option<SerializableObject> = storage.remove("key");
497
498        assert_eq!(found_state, Some(persisted_state));
499        let cache = storage.lock_version_cache.borrow();
500        assert_eq!(cache.get("key"), None)
501    }
502
503    #[test]
504    fn save_state_without_optimistic_locking() {
505        let mut mock_clock = MockClock::new();
506        let state = SerializableObject {
507            property_one: 1,
508            property_two: 10,
509        };
510        let expected_state = state;
511        let mut mock_shared_data = MockSharedData::new();
512
513        mock_shared_data_get(&mut mock_shared_data, None, None, None);
514
515        mock_clock_now(&mut mock_clock, &mut Sequence::new());
516
517        mock_shared_data
518            .expect_shared_data_set()
519            .with(eq("key"), always(), always())
520            .times(1)
521            .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Ok(()));
522
523        let storage = ConcurrentSharedData {
524            lock_version_cache: RefCell::new(InMemoryCache::new(
525                Rc::new(mock_clock),
526                CACHE_EXPIRATION_TIME_IN_MILLIS,
527            )),
528            shared_data: Rc::new(mock_shared_data),
529        };
530        let (status, found_state) =
531            storage.insert(String::from("key"), state, |_previous, _new| Option::None);
532        assert_eq!(TransactionStatus::Complete, status);
533        assert_eq!(expected_state, found_state);
534    }
535
536    #[test]
537    fn save_state_with_correct_lock_version() {
538        let now = SystemTime::now();
539        let now_plus_five_seconds = now.add(Duration::new(5, 0));
540
541        let state = SerializableObject {
542            property_one: 1,
543            property_two: 10,
544        };
545        let expected_state = state;
546
547        let mut mock_clock = MockClock::new();
548
549        let mut mock_shared_data = MockSharedData::new();
550
551        mock_shared_data_set(&mut mock_shared_data, &mut Sequence::new());
552
553        let mut seq = Sequence::new();
554
555        mock_clock
556            .expect_get_current_time()
557            .times(1)
558            .returning(move || now)
559            .in_sequence(&mut seq);
560
561        mock_clock
562            .expect_get_current_time()
563            .times(1)
564            .returning(move || now_plus_five_seconds)
565            .in_sequence(&mut seq);
566
567        let mut lock_version_cache: InMemoryCache<Option<u32>> =
568            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
569
570        lock_version_cache.save(String::from("key"), Option::from(1));
571
572        let mut storage = ConcurrentSharedData {
573            lock_version_cache: RefCell::new(lock_version_cache),
574            shared_data: Rc::new(mock_shared_data),
575        };
576        let (status, algorithm_obtained_state) =
577            storage.insert(String::from("key"), state, |_previous, _new| Option::None);
578        let cache = storage.lock_version_cache.get_mut();
579        cache.remove("key");
580        assert_eq!(TransactionStatus::Complete, status);
581        assert_eq!(expected_state, algorithm_obtained_state);
582        assert!(cache.is_empty())
583    }
584
585    #[test]
586    fn save_state_first_time_lock_version_collision() {
587        let mut mock_clock = MockClock::new();
588        let state_to_persist = SerializableObject {
589            property_one: 1,
590            property_two: 100,
591        };
592        let persisted_state = SerializableObject {
593            property_one: 1,
594            property_two: 100,
595        };
596        let serialized_persisted_state = bincode::serialize(&persisted_state);
597
598        let mut mock_shared_data = MockSharedData::new();
599
600        let mut sequence = Sequence::new();
601
602        mock_clock_now(&mut mock_clock, &mut sequence);
603
604        mock_shared_data_get(&mut mock_shared_data, None, None, Some(&mut sequence));
605
606        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);
607
608        mock_shared_data_get(
609            &mut mock_shared_data,
610            Some(serialized_persisted_state.as_ref().unwrap().clone()),
611            Some(1),
612            None,
613        );
614
615        mock_shared_data_set(&mut mock_shared_data, &mut sequence);
616
617        let lock_version_cache: InMemoryCache<Option<u32>> =
618            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
619
620        let handle_consistency_mock_first_insertion =
621            |_x: SerializableObject, _y: SerializableObject| -> Option<SerializableObject> {
622                Option::from(SerializableObject {
623                    property_one: 2,
624                    property_two: 100,
625                })
626            };
627
628        let storage = ConcurrentSharedData {
629            lock_version_cache: RefCell::new(lock_version_cache),
630            shared_data: Rc::new(mock_shared_data),
631        };
632        let (status, found_state): (_, SerializableObject) = storage.insert(
633            String::from("key"),
634            state_to_persist,
635            handle_consistency_mock_first_insertion,
636        );
637        let cache = storage.lock_version_cache.borrow();
638        let option_key = cache.get("key");
639        assert_eq!(TransactionStatus::Complete, status);
640        assert_eq!(2, found_state.property_one);
641        assert_eq!(100, found_state.property_two);
642        assert!(option_key.is_none())
643    }
644
645    #[test]
646    fn save_state_with_incorrect_lock_version_and_retry_success() {
647        let mut mock_clock = MockClock::new();
648
649        let now = SystemTime::now();
650        let now_plus_five_seconds = now.add(Duration::new(5, 0));
651
652        let persisted_state = SerializableObject {
653            property_one: 1,
654            property_two: 10,
655        };
656        let serialized_persisted_state = bincode::serialize(&persisted_state);
657
658        let state_to_persist = SerializableObject {
659            property_one: 2,
660            property_two: 11,
661        };
662
663        let mut mock_shared_data = MockSharedData::new();
664
665        let mut sequence = Sequence::new();
666
667        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);
668
669        mock_shared_data_set(&mut mock_shared_data, &mut sequence);
670
671        mock_shared_data_get(
672            &mut mock_shared_data,
673            Some(serialized_persisted_state.as_ref().unwrap().clone()),
674            Some(1),
675            None,
676        );
677
678        let mut seq = Sequence::new();
679        mock_clock_now(&mut mock_clock, &mut seq);
680        mock_clock
681            .expect_get_current_time()
682            .times(1)
683            .returning(move || now_plus_five_seconds)
684            .in_sequence(&mut seq);
685
686        let handle_consistency_mock =
687            |_x: SerializableObject, _y: SerializableObject| -> Option<SerializableObject> {
688                Option::from(SerializableObject {
689                    property_one: 2,
690                    property_two: 11,
691                })
692            };
693
694        let mut lock_version_cache: InMemoryCache<Option<u32>> =
695            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
696
697        lock_version_cache.save(String::from("key"), Option::from(1));
698
699        let storage = ConcurrentSharedData {
700            lock_version_cache: RefCell::new(lock_version_cache),
701            shared_data: Rc::new(mock_shared_data),
702        };
703        let (status, algorithm_obtained_state) = storage.insert(
704            String::from("key"),
705            state_to_persist,
706            handle_consistency_mock,
707        );
708
709        let cache = storage.lock_version_cache.borrow();
710        let option_key = cache.get("key");
711        assert_eq!(TransactionStatus::Complete, status);
712        assert_eq!(2, algorithm_obtained_state.property_one);
713        assert_eq!(11, algorithm_obtained_state.property_two);
714        assert!(option_key.is_none())
715    }
716
717    #[test]
718    fn save_state_with_incorrect_lock_version_and_retry_with_inconsistency() {
719        let mut mock_clock = MockClock::new();
720
721        let now = SystemTime::now();
722        let now_plus_five_seconds = now.add(Duration::new(5, 0));
723
724        let persisted_state = SerializableObject {
725            property_one: 1,
726            property_two: 10,
727        };
728        let serialized_persisted_state = bincode::serialize(&persisted_state);
729        let state_to_persist = SerializableObject {
730            property_one: 2,
731            property_two: 11,
732        };
733
734        let mut mock_shared_data = MockSharedData::new();
735
736        let mut seq = Sequence::new();
737
738        mock_clock_now(&mut mock_clock, &mut seq);
739
740        mock_clock
741            .expect_get_current_time()
742            .times(1)
743            .returning(move || now_plus_five_seconds)
744            .in_sequence(&mut seq);
745
746        let mut sequence = Sequence::new();
747
748        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);
749
750        mock_shared_data_get(
751            &mut mock_shared_data,
752            Some(serialized_persisted_state.as_ref().unwrap().clone()),
753            Some(1),
754            None,
755        );
756
757        let mut lock_version_cache: InMemoryCache<Option<u32>> =
758            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
759        lock_version_cache.save(String::from("key"), Option::from(1));
760
761        let storage = ConcurrentSharedData {
762            lock_version_cache: RefCell::new(lock_version_cache),
763            shared_data: Rc::new(mock_shared_data),
764        };
765        let (status, algorithm_obtained_state) =
766            storage.insert(String::from("key"), state_to_persist, |_previous, _new| {
767                Option::None
768            });
769        let cache = storage.lock_version_cache.borrow();
770        let option_key = cache.get("key");
771        assert_eq!(TransactionStatus::Rejected, status);
772        assert_eq!(persisted_state, algorithm_obtained_state);
773        assert!(option_key.is_none())
774    }
775
776    #[test]
777    fn save_state_with_incorrect_lock_version_and_retry_value_not_found() {
778        let mut mock_clock = MockClock::new();
779
780        let now = SystemTime::now();
781        let now_plus_five_seconds = now.add(Duration::new(5, 0));
782
783        let state_to_persist = SerializableObject {
784            property_one: 2,
785            property_two: 11,
786        };
787
788        let mut mock_shared_data = MockSharedData::new();
789
790        let mut sequence = Sequence::new();
791
792        mock_shared_data_set_cas_mismatch(&mut mock_shared_data, &mut sequence);
793
794        mock_shared_data_get(&mut mock_shared_data, None, None, None);
795
796        mock_shared_data_set(&mut mock_shared_data, &mut sequence);
797
798        let mut seq = Sequence::new();
799        mock_clock_now(&mut mock_clock, &mut seq);
800
801        mock_clock
802            .expect_get_current_time()
803            .times(1)
804            .returning(move || now_plus_five_seconds)
805            .in_sequence(&mut seq);
806
807        let mut lock_version_cache: InMemoryCache<Option<u32>> =
808            InMemoryCache::new(Rc::new(mock_clock), CACHE_EXPIRATION_TIME_IN_MILLIS);
809
810        lock_version_cache.save(String::from("key"), Option::from(1));
811
812        let storage = ConcurrentSharedData {
813            lock_version_cache: RefCell::new(lock_version_cache),
814            shared_data: Rc::new(mock_shared_data),
815        };
816        let (status, algorithm_obtained_state) =
817            storage.insert(String::from("key"), state_to_persist, |_previous, _new| {
818                Option::None
819            });
820
821        let cache = storage.lock_version_cache.borrow();
822        let option_key = cache.get("key");
823
824        assert_eq!(TransactionStatus::Complete, status);
825        assert_eq!(
826            state_to_persist.property_one,
827            algorithm_obtained_state.property_one
828        );
829        assert_eq!(
830            state_to_persist.property_two,
831            algorithm_obtained_state.property_two
832        );
833        assert!(option_key.is_none())
834    }
835
836    fn mock_shared_data_get(
837        mock_shared_data: &mut MockSharedData,
838        value: Option<Vec<u8>>,
839        cas: Option<u32>,
840        seq: Option<&mut Sequence>,
841    ) {
842        let ongoing = mock_shared_data
843            .expect_shared_data_get()
844            .with(eq("key"))
845            .times(1)
846            .returning(move |_x: &str| (value.clone(), cas));
847
848        if let Some(s) = seq {
849            ongoing.in_sequence(s);
850        }
851    }
852
853    fn mock_shared_data_set(mock_shared_data: &mut MockSharedData, sequence: &mut Sequence) {
854        mock_shared_data
855            .expect_shared_data_set()
856            .times(1)
857            .in_sequence(sequence)
858            .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Ok(()));
859    }
860
861    fn mock_shared_data_set_cas_mismatch(
862        mock_shared_data: &mut MockSharedData,
863        sequence: &mut Sequence,
864    ) {
865        mock_shared_data
866            .expect_shared_data_set()
867            .times(1)
868            .in_sequence(sequence)
869            .returning(move |_x: &str, _value: &[u8], _lock: Option<u32>| Err(Status::CasMismatch));
870    }
871
872    fn mock_clock_now(mock_clock: &mut MockClock, seq: &mut Sequence) {
873        let now = SystemTime::now();
874        mock_clock
875            .expect_get_current_time()
876            .times(1)
877            .returning(move || now)
878            .in_sequence(seq);
879    }
880}