// pravega_client/sync/synchronizer.rs

//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
10use crate::client_factory::ClientFactoryAsync;
11use crate::sync::table::{Table, TableError, Version};
12
13use pravega_client_shared::Scope;
14use pravega_wire_protocol::commands::TableKey;
15
16use futures::pin_mut;
17use futures::stream::StreamExt;
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use serde_cbor::ser::Serializer as CborSerializer;
21use serde_cbor::to_vec;
22use snafu::Snafu;
23use std::clone::Clone;
24use std::cmp::{Eq, PartialEq};
25use std::collections::HashMap;
26use std::fmt::Debug;
27use std::hash::{Hash, Hasher};
28use std::slice::Iter;
29use std::time::Duration;
30use tokio::time::sleep;
31use tracing::debug;
32
33#[derive(Debug, Snafu)]
34#[snafu(visibility = "pub(crate)")]
35pub enum SynchronizerError {
36    #[snafu(display(
37        "Synchronizer failed while performing {:?} with table error: {:?}",
38        operation,
39        source
40    ))]
41    SyncTableError { operation: String, source: TableError },
42
43    #[snafu(display("Failed to run update function in table synchronizer due to: {:?}", error_msg))]
44    SyncUpdateError { error_msg: String },
45
46    #[snafu(display("Failed insert tombstone in table synchronizer due to: {:?}", error_msg))]
47    SyncTombstoneError { error_msg: String },
48
49    #[snafu(display("Failed due to Precondition check failure: {:?}", error_msg))]
50    SyncPreconditionError { error_msg: String },
51}
52
/// Provide a map that is synchronized across different processes.
///
/// The goal is to have a map that can be updated by using Insert or Remove.
/// Each process can do updates to its in memory map by supplying a
/// function to create Insert/Remove objects.
/// Updates from other processes can be obtained by calling fetch_updates().
///
/// The name of the Synchronizer is also the stream name of the table segment.
/// Different instances of Synchronizer with same name will point to the same table segment.
///
/// # Examples
/// ```ignore
/// // two synchronizer instances with the same name can communicate with each other.
/// let mut synchronizer1 = client_factory
///     .create_synchronizer(scope.clone(), "synchronizer".to_owned())
///     .await;
///
/// let mut synchronizer2 = client_factory
///     .create_synchronizer(scope.clone(), "synchronizer".to_owned())
///     .await;
///
/// let result = synchronizer1
///     .insert(|table| {
///         table.insert(
///             "outer_key_foo".to_owned(),
///             "inner_key_bar".to_owned(),
///             "i32".to_owned(),
///             Box::new(1),
///         );
///         Ok(None)
///     })
///     .await;
/// assert!(result.is_ok());
///
/// let entries_num = synchronizer2.fetch_updates().await.expect("fetch updates");
/// assert_eq!(entries_num, 1);
/// let value_option = synchronizer2.get("outer_key_foo", "inner_key_bar");
/// assert!(value_option.is_some());
///
/// ```
pub struct Synchronizer {
    /// The name of the Synchronizer.
    name: String,

    /// Table is the table segment client.
    table_map: Table,

    /// in_memory_map is a two-level nested hash map that uses two keys to identify a value.
    /// The reason to make it a nested map is that the actual data structures shared across
    /// different processes are often more complex than a simple hash map. The problem of using a
    /// simple hash map to model a complex data structure is that the key will be coarse-grained
    /// and every update will incur a lot of overhead.
    /// A two-level hash map is fine for now, maybe it will need more nested layers in the future.
    in_memory_map: HashMap<String, HashMap<Key, Value>>,

    /// in_memory_map_version is used to monitor the versions of each second level hash maps.
    /// The idea is to monitor the changes of the second level hash maps besides individual keys
    /// since some logic may depend on a certain map not being changed during an update.
    in_memory_map_version: HashMap<Key, Value>,

    /// An offset that is used to make conditional updates.
    /// -1 means no offset condition is applied (see `new`).
    table_segment_offset: i64,

    /// The latest fetch position on the server side; `fetch_updates` resumes from here.
    fetch_position: i64,
}
119
// Max number of retries by the table synchronizer in case of a failure.
const MAX_RETRIES: i32 = 10;
// Delay between consecutive retry attempts, in milliseconds.
const DELAY_MILLIS: u64 = 1000;
124
125impl Synchronizer {
126    pub(crate) async fn new(scope: Scope, name: String, factory: ClientFactoryAsync) -> Synchronizer {
127        let table_map = Table::new(scope, name.clone(), factory)
128            .await
129            .expect("create table");
130        Synchronizer {
131            name: name.clone(),
132            table_map,
133            in_memory_map: HashMap::new(),
134            in_memory_map_version: HashMap::new(),
135            table_segment_offset: -1,
136            fetch_position: 0,
137        }
138    }
139
140    /// Get the outer map currently held in memory.
141    /// The return type does not contain the version information.
142    pub fn get_outer_map(&self) -> HashMap<String, HashMap<String, Value>> {
143        self.in_memory_map
144            .iter()
145            .map(|(k, v)| {
146                (
147                    k.clone(),
148                    v.iter()
149                        .filter(|(_k2, v2)| v2.type_id != TOMBSTONE)
150                        .map(|(k2, v2)| (k2.key.clone(), v2.clone()))
151                        .collect::<HashMap<String, Value>>(),
152                )
153            })
154            .collect()
155    }
156
157    /// Get the inner map currently held in memory.
158    /// The return type does not contain the version information.
159    pub fn get_inner_map(&self, outer_key: &str) -> HashMap<String, Value> {
160        self.in_memory_map
161            .get(outer_key)
162            .map_or_else(HashMap::new, |inner| {
163                inner
164                    .iter()
165                    .filter(|(_k, v)| v.type_id != TOMBSTONE)
166                    .map(|(k, v)| (k.key.clone(), v.clone()))
167                    .collect::<HashMap<String, Value>>()
168            })
169    }
170
171    fn get_inner_map_version(&self) -> HashMap<String, Value> {
172        self.in_memory_map_version
173            .iter()
174            .map(|(k, v)| (k.key.clone(), v.clone()))
175            .collect()
176    }
177
178    /// Get the name of the Synchronizer.
179    pub fn get_name(&self) -> String {
180        self.name.clone()
181    }
182
183    /// Get the Value associated with the map.
184    /// The data in Value is not deserialized and the caller should call deserialize_from to deserialize.
185    pub fn get(&self, outer_key: &str, inner_key: &str) -> Option<&Value> {
186        let inner_map = self.in_memory_map.get(outer_key)?;
187
188        let search_key_inner = Key {
189            key: inner_key.to_owned(),
190            key_version: TableKey::KEY_NO_VERSION,
191        };
192
193        inner_map.get(&search_key_inner).and_then(
194            |val| {
195                if val.type_id == TOMBSTONE {
196                    None
197                } else {
198                    Some(val)
199                }
200            },
201        )
202    }
203
204    /// Get the Key version of the given key,
205    pub fn get_key_version(&self, outer_key: &str, inner_key: &Option<String>) -> Version {
206        if let Some(inner) = inner_key {
207            let search_key = Key {
208                key: inner.to_owned(),
209                key_version: TableKey::KEY_NO_VERSION,
210            };
211            if let Some(inner_map) = self.in_memory_map.get(outer_key) {
212                if let Some((key, _value)) = inner_map.get_key_value(&search_key) {
213                    return key.key_version;
214                }
215            }
216            TableKey::KEY_NOT_EXISTS
217        } else {
218            let search_key = Key {
219                key: outer_key.to_owned(),
220                key_version: TableKey::KEY_NO_VERSION,
221            };
222            if let Some((key, _value)) = self.in_memory_map_version.get_key_value(&search_key) {
223                key.key_version
224            } else {
225                TableKey::KEY_NOT_EXISTS
226            }
227        }
228    }
229
230    /// Get the key-value pair of given key,
231    /// It will return a copy of the key-value pair.
232    fn get_key_value(&self, outer_key: &str, inner_key: &str) -> Option<(String, Value)> {
233        let inner_map = self.in_memory_map.get(outer_key)?;
234
235        let search_key = Key {
236            key: inner_key.to_owned(),
237            key_version: TableKey::KEY_NO_VERSION,
238        };
239
240        if let Some((key, value)) = inner_map.get_key_value(&search_key) {
241            Some((key.key.clone(), value.clone()))
242        } else {
243            None
244        }
245    }
246
247    /// Fetch the latest map from remote server and apply it to the local map.
248    pub async fn fetch_updates(&mut self) -> Result<i32, TableError> {
249        debug!(
250            "fetch the latest map and apply to the local map, fetch from position {}",
251            self.fetch_position
252        );
253        let reply = self
254            .table_map
255            .read_entries_stream_from_position(10, self.fetch_position);
256        pin_mut!(reply);
257
258        let mut counter: i32 = 0;
259        while let Some(entry) = reply.next().await {
260            let (k, v, version, last_position) = entry?;
261            debug!("fetched key with version {}", version);
262            let internal_key = InternalKey { key: k };
263            let (outer_key, inner_key) = internal_key.split();
264
265            if let Some(inner) = inner_key {
266                // the key is a composite key, update the nested hashmap
267                let inner_map_key = Key {
268                    key: inner,
269                    key_version: version,
270                };
271                let inner_map = self.in_memory_map.entry(outer_key).or_default();
272
273                // this is necessary since insert will not update the Key
274                inner_map.remove(&inner_map_key);
275                inner_map.insert(inner_map_key, v);
276            } else {
277                // the key is an outer key, update the map version
278                let outer_map_key = Key {
279                    key: outer_key,
280                    key_version: version,
281                };
282                // this is necessary since insert will not update the Key
283                self.in_memory_map_version.remove(&outer_map_key.clone());
284                self.in_memory_map_version.insert(outer_map_key, v);
285            }
286            self.fetch_position = last_position;
287            counter += 1;
288        }
289        debug!("finished fetching updates");
290        Ok(counter)
291    }
292
293    /// Insert/Update a list of keys and applies it atomically to the local map.
294    /// This will update the local map to the latest version.
295    pub async fn insert<R>(
296        &mut self,
297        updates_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
298    ) -> Result<R, SynchronizerError> {
299        conditionally_write(updates_generator, self, MAX_RETRIES).await
300    }
301
302    /// Remove a list of keys and apply it atomically to local map.
303    /// This will update the local map to latest version.
304    pub async fn remove<R>(
305        &mut self,
306        deletes_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
307    ) -> Result<R, SynchronizerError> {
308        conditionally_remove(deletes_generator, self, MAX_RETRIES).await
309    }
310}
311
/// The Key struct in the in memory map. It contains two fields, the key and key_version.
/// The key_version is used for conditional updates on the server side. If the key_version is
/// i64::MIN (TableKey::KEY_NO_VERSION), then the update will be unconditional.
#[derive(Debug, Clone)]
pub struct Key {
    pub key: String,
    pub key_version: Version,
}
320
// Equality — and hashing, below — deliberately ignore `key_version`: a Key is
// identified by its string key alone, so a lookup using a placeholder version
// (TableKey::KEY_NO_VERSION) still finds the stored entry.
impl PartialEq for Key {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for Key {}

// Must stay consistent with PartialEq: hash only the `key` field.
impl Hash for Key {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.key.hash(state)
    }
}
334
// Number of digits in the outer-key length prefix of a composite key.
const PREFIX_LENGTH: usize = 2;

// This is used to parse the key received from the server.
// Layout: two-digit outer-key length, the outer key itself, then optionally
// a '/' separator followed by the inner key.
struct InternalKey {
    pub key: String,
}

impl InternalKey {
    /// Splits the composite key into `(outer_key, Option<inner_key>)`.
    ///
    /// # Panics
    /// Panics when the length prefix is not numeric or the key is shorter than
    /// the declared outer-key length — both indicate a corrupted server key.
    /// NOTE(review): slicing is byte-based; assumes keys are ASCII-safe at these
    /// boundaries — confirm with the key-encoding side.
    fn split(&self) -> (String, Option<String>) {
        let outer_name_length: usize = self.key[..PREFIX_LENGTH].parse().expect("parse prefix length");
        assert!(self.key.len() >= PREFIX_LENGTH + outer_name_length);
        // a plain slice-to-owned copy; the original's `.parse::<String>()` was an
        // infallible no-op round trip
        let outer = self.key[PREFIX_LENGTH..PREFIX_LENGTH + outer_name_length].to_owned();

        if self.key.len() > PREFIX_LENGTH + outer_name_length {
            // skip the slash separating the outer key and the inner key
            let inner = self.key[PREFIX_LENGTH + outer_name_length + 1..].to_owned();
            (outer, Some(inner))
        } else {
            (outer, None)
        }
    }
}
361
/// The Value struct in the in memory map. It contains two fields.
/// type_id: it is used by caller to figure out the exact type of the data.
/// data: the serialized Value.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Value {
    pub type_id: String,
    pub data: Vec<u8>,
}

/// Sentinel `type_id` marking an entry as logically deleted; tombstoned entries
/// are filtered from reads and physically removed after the insert completes.
pub const TOMBSTONE: &str = "tombstone";

// Placeholder payload serialized into a tombstoned Value's `data` field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct Tombstone {}
375
/// The Update contains a nested map and a version map, which are the same maps in the
/// synchronizer but will be updated instantly when caller calls Insert or Remove method.
/// It is used to update the server side of table and its updates will be applied to
/// synchronizer once the updates are successfully stored on the server side.
pub struct Update {
    // local snapshot of the nested key/value map
    map: HashMap<String, HashMap<String, Value>>,
    // local snapshot of the per-outer-key version placeholders
    map_version: HashMap<String, Value>,
    // pending inserts to send to the server
    insert: Vec<Insert>,
    // pending removes to send to the server
    remove: Vec<Remove>,
}
386
387impl Update {
388    pub fn new(
389        map: HashMap<String, HashMap<String, Value>>,
390        map_version: HashMap<String, Value>,
391        insert: Vec<Insert>,
392        remove: Vec<Remove>,
393    ) -> Self {
394        Update {
395            map,
396            map_version,
397            insert,
398            remove,
399        }
400    }
401
402    /// insert method needs an outer_key and an inner_key to find a value.
403    /// It will update the map inside the Table.
404    pub fn insert(
405        &mut self,
406        outer_key: String,
407        inner_key: String,
408        type_id: String,
409        new_value: Box<dyn ValueData>,
410    ) {
411        let data = serialize(&*new_value).expect("serialize value");
412        let insert = Insert::new(outer_key.clone(), Some(inner_key.clone()), type_id.clone());
413
414        self.insert.push(insert);
415        // also insert into map.
416        let inner_map = self.map.entry(outer_key.clone()).or_default();
417        inner_map.insert(inner_key, Value { type_id, data });
418
419        // increment the version of the map, indicating that this map has changed
420        self.increment_map_version(outer_key);
421    }
422
423    /// insert_tombstone method replaces the original value with a tombstone, which means that this
424    /// key value pair is invalid and will be removed later. The reason of adding a tombstone is
425    /// to guarantee the atomicity of a remove-and-insert operation.
426    pub fn insert_tombstone(
427        &mut self,
428        outer_key: String,
429        inner_key: String,
430    ) -> Result<(), SynchronizerError> {
431        let data = to_vec(&Tombstone {}).expect("serialize tombstone");
432        let insert = Insert::new(outer_key.clone(), Some(inner_key.clone()), "tombstone".to_owned());
433
434        self.insert.push(insert);
435
436        let inner_map = self
437            .map
438            .get_mut(&outer_key)
439            .ok_or(SynchronizerError::SyncTombstoneError {
440                error_msg: format!("outer key {} does not exist", outer_key),
441            })?;
442
443        inner_map.get(&inner_key).map_or(
444            Err(SynchronizerError::SyncTombstoneError {
445                error_msg: format!("inner key {} does not exist", inner_key),
446            }),
447            |v| {
448                if v.type_id == TOMBSTONE {
449                    Err(SynchronizerError::SyncTombstoneError {
450                        error_msg: format!(
451                            "tombstone has already been added for key {}/{}",
452                            outer_key, inner_key
453                        ),
454                    })
455                } else {
456                    Ok(())
457                }
458            },
459        )?;
460
461        inner_map.insert(
462            inner_key.clone(),
463            Value {
464                type_id: TOMBSTONE.to_owned(),
465                data,
466            },
467        );
468
469        self.increment_map_version(outer_key.clone());
470
471        // also push this key to remove list, this key will be removed after insert is completed.
472        let remove = Remove::new(outer_key.clone(), inner_key);
473        self.remove.push(remove);
474
475        Ok(())
476    }
477
478    /// remove takes an outer_key and an inner_key and removes a particular entry.
479    fn remove(&mut self, outer_key: String, inner_key: String) {
480        //Also remove from the map.
481        let inner_map = self.map.get_mut(&outer_key).expect("should contain outer key");
482        inner_map.remove(&inner_key);
483
484        let remove = Remove::new(outer_key.clone(), inner_key);
485        self.remove.push(remove);
486    }
487
488    /// retain a specific map to make sure it's not altered by other processes when an update
489    /// is being made that depends on it.
490    /// Notice that this method only needs to be called when this dependent map is not being updated,
491    /// since any modifications to a map on server side will use a version to make sure the update
492    /// is based on the latest change.
493    pub fn retain(&mut self, outer_key: String) {
494        self.increment_map_version(outer_key);
495    }
496
497    /// get method will take an outer_key and an inner_key and return the valid value.
498    /// It will not return value hinted by tombstone.
499    pub fn get(&self, outer_key: &str, inner_key: &str) -> Option<&Value> {
500        let inner_map = self.map.get(outer_key).expect("should contain outer key");
501        inner_map
502            .get(inner_key)
503            .and_then(|val| if val.type_id == TOMBSTONE { None } else { Some(val) })
504    }
505
506    /// get_inner_map method will take an outer_key return the outer map.
507    /// The returned outer map will not contain value hinted by tombstone.
508    pub fn get_inner_map(&self, outer_key: &str) -> HashMap<String, Value> {
509        self.map.get(outer_key).map_or_else(HashMap::new, |inner_map| {
510            inner_map
511                .iter()
512                .filter(|(_k, v)| v.type_id != TOMBSTONE)
513                .map(|(k, v)| (k.to_owned(), v.clone()))
514                .collect::<HashMap<String, Value>>()
515        })
516    }
517
518    fn is_tombstoned(&self, outer_key: &str, inner_key: &str) -> bool {
519        self.map.get(outer_key).map_or(false, |inner_map| {
520            inner_map
521                .get(inner_key)
522                .map_or(false, |val| val.type_id == TOMBSTONE)
523        })
524    }
525
526    fn get_internal(&self, outer_key: &str, inner_key: &Option<String>) -> Option<&Value> {
527        if let Some(inner) = inner_key {
528            let inner_map = self.map.get(outer_key).expect("should contain outer key");
529            inner_map.get(inner)
530        } else {
531            self.map_version.get(outer_key)
532        }
533    }
534
535    /// Check if an inner key exists. The tombstoned value will return a false.
536    pub fn contains_key(&self, outer_key: &str, inner_key: &str) -> bool {
537        self.map.get(outer_key).map_or(false, |inner_map| {
538            inner_map
539                .get(inner_key)
540                .map_or(false, |value| value.type_id != TOMBSTONE)
541        })
542    }
543
544    /// Check if an outer_key exists. The tombstoned value will return a false.
545    pub fn contains_outer_key(&self, outer_key: &str) -> bool {
546        self.map.contains_key(outer_key)
547    }
548
549    pub fn is_empty(&self) -> bool {
550        self.map.is_empty()
551    }
552
553    fn insert_is_empty(&self) -> bool {
554        self.insert.is_empty()
555    }
556
557    fn remove_is_empty(&self) -> bool {
558        self.remove.is_empty()
559    }
560
561    fn get_insert_iter(&self) -> Iter<Insert> {
562        self.insert.iter()
563    }
564
565    fn get_remove_iter(&self) -> Iter<Remove> {
566        self.remove.iter()
567    }
568
569    fn increment_map_version(&mut self, outer_key: String) {
570        // the value is just a placeholder, the version information is stored in the Key.
571        self.map_version.entry(outer_key.clone()).or_insert(Value {
572            type_id: "blob".to_owned(),
573            data: vec![0],
574        });
575        let insert = Insert::new(outer_key, None, "blob".to_owned());
576        self.insert.push(insert);
577    }
578}
579
/// Insert struct is used internally to update the server side of map.
/// The outer_key and inner_key are combined to identify a value in the nested map.
/// The composite_key is derived from outer_key and inner_key, which is the actual key that's
/// stored on the server side.
/// The type_id is used to identify the type of the value in the map since the value
/// is just a serialized blob that does not contain any type information.
pub struct Insert {
    outer_key: String,
    inner_key: Option<String>,
    composite_key: String,
    type_id: String,
}

impl Insert {
    /// Builds an Insert, deriving the server-side composite key: a two-digit,
    /// zero-padded outer-key length, the outer key, and — when an inner key is
    /// present — a `/` separator followed by the inner key.
    pub fn new(outer_key: String, inner_key: Option<String>, type_id: String) -> Self {
        // match on a borrow instead of the original clone-then-expect dance
        let composite_key = match &inner_key {
            Some(inner) => format!("{:02}{}/{}", outer_key.len(), outer_key, inner),
            None => format!("{:02}{}", outer_key.len(), outer_key),
        };

        Insert {
            outer_key,
            inner_key,
            composite_key,
            type_id,
        }
    }
}
614
/// The remove struct is used internally to remove a value from the server side of map.
/// Unlike the Insert struct, it does not need to have a type_id since we don't care about
/// the value.
pub struct Remove {
    outer_key: String,
    inner_key: String,
    composite_key: String,
}

impl Remove {
    /// Builds a Remove, deriving the server-side composite key
    /// (`<2-digit length><outer_key>/<inner_key>`).
    pub fn new(outer_key: String, inner_key: String) -> Self {
        // build the composite key first so the owned strings can be moved into
        // the struct afterwards, avoiding the original's two clones
        let composite_key = format!("{:02}{}/{}", outer_key.len(), outer_key, inner_key);
        Remove {
            outer_key,
            inner_key,
            composite_key,
        }
    }
}
633
/// The trait bound for the ValueData: anything stored as a value must be
/// serializable, cloneable via trait object, and printable.
pub trait ValueData: ValueSerialize + ValueClone + Debug {}

// Blanket impl: every 'static serde-compatible Clone + Debug type qualifies.
impl<T> ValueData for T where T: 'static + Serialize + DeserializeOwned + Clone + Debug {}

/// Clone trait helper.
/// `Clone` itself is not object safe, so boxed trait objects clone through this
/// object-safe method instead.
pub trait ValueClone {
    fn clone_box(&self) -> Box<dyn ValueData>;
}

impl<T> ValueClone for T
where
    T: 'static + ValueData + Clone,
{
    fn clone_box(&self) -> Box<dyn ValueData> {
        Box::new(self.clone())
    }
}

// Makes Box<dyn ValueData> itself cloneable by delegating to clone_box.
impl Clone for Box<dyn ValueData> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}
658
659/// Serialize trait helper, we need to serialize the ValueData in Insert struct into Vec<u8>.
660pub trait ValueSerialize {
661    fn serialize_value(
662        &self,
663        seralizer: &mut CborSerializer<&mut Vec<u8>>,
664    ) -> Result<(), serde_cbor::error::Error>;
665}
666
667impl<T> ValueSerialize for T
668where
669    T: Serialize,
670{
671    fn serialize_value(
672        &self,
673        serializer: &mut CborSerializer<&mut Vec<u8>>,
674    ) -> Result<(), serde_cbor::error::Error> {
675        self.serialize(serializer)
676    }
677}
678
679/// Serialize the <dyn ValueData> into the Vec<u8> by using cbor serializer.
680/// This method would be used by the insert method in table_synchronizer.
681pub fn serialize(value: &dyn ValueData) -> Result<Vec<u8>, serde_cbor::error::Error> {
682    let mut vec = Vec::new();
683    value.serialize_value(&mut CborSerializer::new(&mut vec))?;
684    Ok(vec)
685}
686
687/// Deserialize the Value into the type T by using cbor deserializer.
688/// This method would be used by the user after calling get() of table_synchronizer.
689pub fn deserialize_from<T>(reader: &[u8]) -> Result<T, serde_cbor::error::Error>
690where
691    T: DeserializeOwned,
692{
693    serde_cbor::de::from_slice(reader)
694}
695
// Runs the caller's update function against a fresh local snapshot, then tries to
// write the resulting inserts to the server conditioned on locally-known key
// versions. On a version conflict the local map is refreshed and the whole cycle
// repeats; on other errors it backs off and retries up to `retry` times.
async fn conditionally_write<R>(
    mut updates_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
    table_synchronizer: &mut Synchronizer,
    mut retry: i32,
) -> Result<R, SynchronizerError> {
    let mut update_result = None;

    while retry > 0 {
        // snapshot the in-memory state for the generator to mutate
        let map = table_synchronizer.get_outer_map();
        let map_version = table_synchronizer.get_inner_map_version();

        let mut to_update = Update {
            map,
            map_version,
            insert: Vec::new(),
            remove: Vec::new(),
        };

        update_result = Some(updates_generator(&mut to_update)?);
        debug!("number of insert is {}", to_update.insert.len());
        if to_update.insert_is_empty() {
            debug!(
                "Conditionally Write to {} completed, as there is nothing to update for map",
                table_synchronizer.get_name()
            );
            break;
        }

        // build (composite_key, value, expected_version) triples for the batch write
        let mut to_send = Vec::new();
        for update in to_update.get_insert_iter() {
            let value = to_update
                .get_internal(&update.outer_key, &update.inner_key)
                .expect("get the insert data");
            let key_version = table_synchronizer.get_key_version(&update.outer_key, &update.inner_key);

            to_send.push((&update.composite_key, value, key_version));
        }
        let result = table_synchronizer
            .table_map
            .insert_conditionally_all(to_send, table_synchronizer.table_segment_offset)
            .await;
        match result {
            // NOTE(review): the two version-conflict branches refresh local state
            // and loop again WITHOUT decrementing `retry` — under sustained
            // contention this loop is unbounded. Confirm this is intended.
            Err(TableError::IncorrectKeyVersion { operation, error_msg }) => {
                debug!("IncorrectKeyVersion {}, {}", operation, error_msg);
                table_synchronizer.fetch_updates().await.expect("fetch update");
            }
            Err(TableError::KeyDoesNotExist { operation, error_msg }) => {
                debug!("KeyDoesNotExist {}, {}", operation, error_msg);
                table_synchronizer.fetch_updates().await.expect("fetch update");
            }
            Err(e) => {
                debug!("Error message is {}", e);
                // NOTE(review): the `else` arm is unreachable — the loop guard
                // guarantees retry > 0 here. When retries are exhausted the loop
                // simply exits and, because `update_result` was set this iteration,
                // the function returns Ok even though the write never succeeded.
                // Confirm whether a SyncTableError should be surfaced instead.
                if retry > 0 {
                    retry -= 1;
                    sleep(Duration::from_millis(DELAY_MILLIS)).await;
                } else {
                    return Err(SynchronizerError::SyncTableError {
                        operation: "insert conditionally_all".to_owned(),
                        source: e,
                    });
                }
            }
            Ok(res) => {
                // `res` carries the server-assigned versions for each written key
                apply_inserts_to_localmap(&mut to_update, res, table_synchronizer);
                clear_tombstone(&mut to_update, table_synchronizer).await?;
                break;
            }
        }
    }
    // Only reached as an Err when `retry` was <= 0 on entry.
    update_result.ok_or(SynchronizerError::SyncUpdateError {
        error_msg: "No attempts were made.".into(),
    })
}
769
// Mirror of conditionally_write for deletions: runs the caller's delete function
// against a fresh snapshot and removes keys on the server conditioned on their
// locally-known versions, refreshing and retrying on version conflicts.
async fn conditionally_remove<R>(
    mut delete_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
    table_synchronizer: &mut Synchronizer,
    mut retry: i32,
) -> Result<R, SynchronizerError> {
    let mut delete_result = None;

    while retry > 0 {
        // snapshot the in-memory state for the generator to mutate
        let map = table_synchronizer.get_outer_map();
        let map_version = table_synchronizer.get_inner_map_version();

        let mut to_delete = Update {
            map,
            map_version,
            insert: Vec::new(),
            remove: Vec::new(),
        };
        delete_result = Some(delete_generator(&mut to_delete)?);

        if to_delete.remove_is_empty() {
            debug!(
                "Conditionally remove to {} completed, as there is nothing to remove for map",
                table_synchronizer.get_name()
            );
            break;
        }

        // build (composite_key, expected_version) pairs for the batch remove
        let mut send = Vec::new();
        for delete in to_delete.get_remove_iter() {
            let key_version =
                table_synchronizer.get_key_version(&delete.outer_key, &Some(delete.inner_key.to_owned()));
            send.push((&delete.composite_key, key_version))
        }

        let result = table_synchronizer
            .table_map
            .remove_conditionally_all(send, table_synchronizer.table_segment_offset)
            .await;

        match result {
            // NOTE(review): same retry caveats as conditionally_write — the
            // version-conflict branches never decrement `retry` (unbounded under
            // contention), the `else` below is unreachable given the loop guard,
            // and exhausting retries exits the loop returning the last Ok result.
            Err(TableError::IncorrectKeyVersion { operation, error_msg }) => {
                debug!("IncorrectKeyVersion {}, {}", operation, error_msg);
                table_synchronizer.fetch_updates().await.expect("fetch update");
            }
            Err(TableError::KeyDoesNotExist { operation, error_msg }) => {
                debug!("KeyDoesNotExist {}, {}", operation, error_msg);
                table_synchronizer.fetch_updates().await.expect("fetch update");
            }
            Err(e) => {
                debug!("Error message is {}", e);
                if retry > 0 {
                    retry -= 1;
                    sleep(Duration::from_millis(DELAY_MILLIS)).await;
                } else {
                    return Err(SynchronizerError::SyncTableError {
                        operation: "remove conditionally_all".to_owned(),
                        source: e,
                    });
                }
            }
            Ok(()) => {
                apply_deletes_to_localmap(&mut to_delete, table_synchronizer);
                break;
            }
        }
    }
    // Only reached as an Err when `retry` was <= 0 on entry.
    delete_result.ok_or(SynchronizerError::SyncUpdateError {
        error_msg: "No attempts were made.".into(),
    })
}
840
// Physically removes entries that were tombstoned by a preceding insert pass.
// The tombstone-then-remove split keeps the overall replace operation atomic
// from the perspective of concurrent readers.
async fn clear_tombstone(
    to_remove: &mut Update,
    table_synchronizer: &mut Synchronizer,
) -> Result<(), SynchronizerError> {
    table_synchronizer
        .remove(|table| {
            for remove in to_remove.get_remove_iter() {
                // only delete keys that are still tombstoned in the latest map
                if table.is_tombstoned(&remove.outer_key, &remove.inner_key) {
                    table.remove(remove.outer_key.to_owned(), remove.inner_key.to_owned());
                }
            }
            Ok(())
        })
        .await
}
856
857fn apply_inserts_to_localmap(
858    to_update: &mut Update,
859    new_version: Vec<Version>,
860    table_synchronizer: &mut Synchronizer,
861) {
862    let mut i = 0;
863    for update in to_update.get_insert_iter() {
864        if let Some(ref inner_key) = update.inner_key {
865            let new_key = Key {
866                key: inner_key.to_owned(),
867                key_version: *new_version.get(i).expect("get new version"),
868            };
869            let inner_map = to_update.map.get(&update.outer_key).expect("get inner map");
870            let new_value = inner_map.get(inner_key).expect("get the Value").clone();
871
872            let in_mem_inner_map = table_synchronizer
873                .in_memory_map
874                .entry(update.outer_key.clone())
875                .or_default();
876            in_mem_inner_map.insert(new_key, new_value);
877        } else {
878            let new_key = Key {
879                key: update.outer_key.to_owned(),
880                key_version: *new_version.get(i).expect("get new version"),
881            };
882            let new_value = to_update
883                .map_version
884                .get(&update.outer_key)
885                .expect("get the Value")
886                .clone();
887            table_synchronizer
888                .in_memory_map_version
889                .insert(new_key, new_value);
890        }
891        i += 1;
892    }
893    debug!("Updates {} entries in local map ", i);
894}
895
896fn apply_deletes_to_localmap(to_delete: &mut Update, table_synchronizer: &mut Synchronizer) {
897    let mut i = 0;
898    for delete in to_delete.get_remove_iter() {
899        let delete_key = Key {
900            key: delete.inner_key.clone(),
901            key_version: TableKey::KEY_NO_VERSION,
902        };
903        let in_mem_inner_map = table_synchronizer
904            .in_memory_map
905            .entry(delete.outer_key.clone())
906            .or_default();
907        in_mem_inner_map.remove(&delete_key);
908        i += 1;
909    }
910    debug!("Deletes {} entries in local map ", i);
911}
912
913#[cfg(test)]
914mod test {
915    use super::*;
916    use crate::client_factory::ClientFactory;
917    use crate::sync::synchronizer::{deserialize_from, Update};
918    use crate::sync::synchronizer::{serialize, Value};
919    use pravega_client_config::connection_type::{ConnectionType, MockType};
920    use pravega_client_config::ClientConfigBuilder;
921    use pravega_client_shared::PravegaNodeUri;
922    use std::collections::HashMap;
923    use tokio::runtime::Runtime;
924
925    #[test]
926    fn test_intern_key_split() {
927        let key1 = InternalKey {
928            key: "10outer_keys/inner_key".to_owned(),
929        };
930        let (outer, inner) = key1.split();
931        assert_eq!(outer, "outer_keys".to_owned());
932        assert_eq!(inner.expect("should contain inner key"), "inner_key".to_owned());
933
934        let key2 = InternalKey {
935            key: "05outer/inner_key".to_owned(),
936        };
937        let (outer, inner) = key2.split();
938        assert_eq!(outer, "outer".to_owned());
939        assert_eq!(inner.expect("should contain inner key"), "inner_key".to_owned());
940
941        let key3 = InternalKey {
942            key: "05outer".to_owned(),
943        };
944        let (outer, inner) = key3.split();
945        assert_eq!(outer, "outer".to_owned());
946        assert!(inner.is_none());
947    }
948
949    #[test]
950    fn test_insert_keys() {
951        let mut map: HashMap<Key, Value> = HashMap::new();
952        let key1 = Key {
953            key: "a".to_owned(),
954            key_version: 0,
955        };
956        let data = serialize(&"value".to_owned()).expect("serialize");
957        let value1 = Value {
958            type_id: "String".to_owned(),
959            data,
960        };
961
962        let key2 = Key {
963            key: "b".to_owned(),
964            key_version: 0,
965        };
966
967        let data = serialize(&1).expect("serialize");
968        let value2 = Value {
969            type_id: "i32".to_owned(),
970            data,
971        };
972        let result = map.insert(key1, value1);
973        assert!(result.is_none());
974        let result = map.insert(key2, value2);
975        assert!(result.is_none());
976        assert_eq!(map.len(), 2);
977    }
978
979    #[test]
980    fn test_insert_key_with_different_key_version() {
981        let mut map: HashMap<Key, Value> = HashMap::new();
982        let key1 = Key {
983            key: "a".to_owned(),
984            key_version: 0,
985        };
986
987        let data = serialize(&"value".to_owned()).expect("serialize");
988        let value1 = Value {
989            type_id: "String".to_owned(),
990            data,
991        };
992        let key2 = Key {
993            key: "a".to_owned(),
994            key_version: 1,
995        };
996        let data = serialize(&1).expect("serialize");
997        let value2 = Value {
998            type_id: "i32".into(),
999            data,
1000        };
1001
1002        let result = map.insert(key1.clone(), value1);
1003        assert!(result.is_none());
1004        let result = map.insert(key2.clone(), value2);
1005        assert!(result.is_some());
1006        assert_eq!(map.len(), 1);
1007    }
1008
1009    #[test]
1010    fn test_clone_map() {
1011        let mut map: HashMap<Key, Value> = HashMap::new();
1012        let key1 = Key {
1013            key: "a".to_owned(),
1014            key_version: 0,
1015        };
1016
1017        let data = serialize(&"value".to_owned()).expect("serialize");
1018        let value1 = Value {
1019            type_id: "String".to_owned(),
1020            data,
1021        };
1022
1023        let key2 = Key {
1024            key: "a".to_owned(),
1025            key_version: 1,
1026        };
1027
1028        let data = serialize(&1).expect("serialize");
1029        let value2 = Value {
1030            type_id: "i32".to_owned(),
1031            data,
1032        };
1033
1034        map.insert(key1.clone(), value1.clone());
1035        map.insert(key2.clone(), value2.clone());
1036        let new_map = map.clone();
1037        let result = new_map.get(&key1).expect("get value");
1038        assert_eq!(new_map.len(), 1);
1039        assert_eq!(result.clone(), value2);
1040    }
1041
1042    #[test]
1043    fn test_insert_and_get() {
1044        let mut table = Update {
1045            map: HashMap::new(),
1046            map_version: HashMap::new(),
1047            insert: Vec::new(),
1048            remove: Vec::new(),
1049        };
1050        table.insert(
1051            "test_outer".to_owned(),
1052            "test_inner".to_owned(),
1053            "i32".to_owned(),
1054            Box::new(1),
1055        );
1056        let value = table.get("test_outer", "test_inner").expect("get value");
1057        let deserialized_data: i32 = deserialize_from(&value.data).expect("deserialize");
1058        assert_eq!(deserialized_data, 1);
1059    }
1060
1061    #[test]
1062    fn test_integration_with_table_map() {
1063        let rt = Runtime::new().unwrap();
1064        let config = ClientConfigBuilder::default()
1065            .connection_type(ConnectionType::Mock(MockType::Happy))
1066            .mock(true)
1067            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
1068            .build()
1069            .unwrap();
1070        let factory = ClientFactory::new(config);
1071        let scope = Scope {
1072            name: "tableSyncScope".to_string(),
1073        };
1074        let mut sync = rt.block_on(factory.create_synchronizer(scope, "sync".to_string()));
1075        let _: Option<String> = rt
1076            .block_on(sync.insert(|table| {
1077                table.insert(
1078                    "outer_key".to_owned(),
1079                    "inner_key".to_owned(),
1080                    "i32".to_owned(),
1081                    Box::new(1),
1082                );
1083                Ok(None)
1084            }))
1085            .unwrap();
1086        let value_option = sync.get("outer_key", "inner_key");
1087        assert!(value_option.is_some());
1088
1089        let _: Option<String> = rt
1090            .block_on(sync.insert(|table| {
1091                table.insert_tombstone("outer_key".to_owned(), "inner_key".to_owned())?;
1092                Ok(None)
1093            }))
1094            .unwrap();
1095        let value_option = sync.get("outer_key", "inner_key");
1096        assert!(value_option.is_none());
1097    }
1098}