1use crate::client_factory::ClientFactoryAsync;
11use crate::sync::table::{Table, TableError, Version};
12
13use pravega_client_shared::Scope;
14use pravega_wire_protocol::commands::TableKey;
15
16use futures::pin_mut;
17use futures::stream::StreamExt;
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use serde_cbor::ser::Serializer as CborSerializer;
21use serde_cbor::to_vec;
22use snafu::Snafu;
23use std::clone::Clone;
24use std::cmp::{Eq, PartialEq};
25use std::collections::HashMap;
26use std::fmt::Debug;
27use std::hash::{Hash, Hasher};
28use std::slice::Iter;
29use std::time::Duration;
30use tokio::time::sleep;
31use tracing::debug;
32
33#[derive(Debug, Snafu)]
34#[snafu(visibility = "pub(crate)")]
35pub enum SynchronizerError {
36 #[snafu(display(
37 "Synchronizer failed while performing {:?} with table error: {:?}",
38 operation,
39 source
40 ))]
41 SyncTableError { operation: String, source: TableError },
42
43 #[snafu(display("Failed to run update function in table synchronizer due to: {:?}", error_msg))]
44 SyncUpdateError { error_msg: String },
45
46 #[snafu(display("Failed insert tombstone in table synchronizer due to: {:?}", error_msg))]
47 SyncTombstoneError { error_msg: String },
48
49 #[snafu(display("Failed due to Precondition check failure: {:?}", error_msg))]
50 SyncPreconditionError { error_msg: String },
51}
52
53pub struct Synchronizer {
94 name: String,
96
97 table_map: Table,
99
100 in_memory_map: HashMap<String, HashMap<Key, Value>>,
107
108 in_memory_map_version: HashMap<Key, Value>,
112
113 table_segment_offset: i64,
115
116 fetch_position: i64,
118}
119
/// Retry budget that `Synchronizer::insert` and `Synchronizer::remove` hand to the
/// conditional write/remove loops.
const MAX_RETRIES: i32 = 10;
/// Pause, in milliseconds, taken by the retry loops after an unexpected table error.
const DELAY_MILLIS: u64 = 1000;
124
125impl Synchronizer {
126 pub(crate) async fn new(scope: Scope, name: String, factory: ClientFactoryAsync) -> Synchronizer {
127 let table_map = Table::new(scope, name.clone(), factory)
128 .await
129 .expect("create table");
130 Synchronizer {
131 name: name.clone(),
132 table_map,
133 in_memory_map: HashMap::new(),
134 in_memory_map_version: HashMap::new(),
135 table_segment_offset: -1,
136 fetch_position: 0,
137 }
138 }
139
140 pub fn get_outer_map(&self) -> HashMap<String, HashMap<String, Value>> {
143 self.in_memory_map
144 .iter()
145 .map(|(k, v)| {
146 (
147 k.clone(),
148 v.iter()
149 .filter(|(_k2, v2)| v2.type_id != TOMBSTONE)
150 .map(|(k2, v2)| (k2.key.clone(), v2.clone()))
151 .collect::<HashMap<String, Value>>(),
152 )
153 })
154 .collect()
155 }
156
157 pub fn get_inner_map(&self, outer_key: &str) -> HashMap<String, Value> {
160 self.in_memory_map
161 .get(outer_key)
162 .map_or_else(HashMap::new, |inner| {
163 inner
164 .iter()
165 .filter(|(_k, v)| v.type_id != TOMBSTONE)
166 .map(|(k, v)| (k.key.clone(), v.clone()))
167 .collect::<HashMap<String, Value>>()
168 })
169 }
170
171 fn get_inner_map_version(&self) -> HashMap<String, Value> {
172 self.in_memory_map_version
173 .iter()
174 .map(|(k, v)| (k.key.clone(), v.clone()))
175 .collect()
176 }
177
178 pub fn get_name(&self) -> String {
180 self.name.clone()
181 }
182
183 pub fn get(&self, outer_key: &str, inner_key: &str) -> Option<&Value> {
186 let inner_map = self.in_memory_map.get(outer_key)?;
187
188 let search_key_inner = Key {
189 key: inner_key.to_owned(),
190 key_version: TableKey::KEY_NO_VERSION,
191 };
192
193 inner_map.get(&search_key_inner).and_then(
194 |val| {
195 if val.type_id == TOMBSTONE {
196 None
197 } else {
198 Some(val)
199 }
200 },
201 )
202 }
203
204 pub fn get_key_version(&self, outer_key: &str, inner_key: &Option<String>) -> Version {
206 if let Some(inner) = inner_key {
207 let search_key = Key {
208 key: inner.to_owned(),
209 key_version: TableKey::KEY_NO_VERSION,
210 };
211 if let Some(inner_map) = self.in_memory_map.get(outer_key) {
212 if let Some((key, _value)) = inner_map.get_key_value(&search_key) {
213 return key.key_version;
214 }
215 }
216 TableKey::KEY_NOT_EXISTS
217 } else {
218 let search_key = Key {
219 key: outer_key.to_owned(),
220 key_version: TableKey::KEY_NO_VERSION,
221 };
222 if let Some((key, _value)) = self.in_memory_map_version.get_key_value(&search_key) {
223 key.key_version
224 } else {
225 TableKey::KEY_NOT_EXISTS
226 }
227 }
228 }
229
230 fn get_key_value(&self, outer_key: &str, inner_key: &str) -> Option<(String, Value)> {
233 let inner_map = self.in_memory_map.get(outer_key)?;
234
235 let search_key = Key {
236 key: inner_key.to_owned(),
237 key_version: TableKey::KEY_NO_VERSION,
238 };
239
240 if let Some((key, value)) = inner_map.get_key_value(&search_key) {
241 Some((key.key.clone(), value.clone()))
242 } else {
243 None
244 }
245 }
246
247 pub async fn fetch_updates(&mut self) -> Result<i32, TableError> {
249 debug!(
250 "fetch the latest map and apply to the local map, fetch from position {}",
251 self.fetch_position
252 );
253 let reply = self
254 .table_map
255 .read_entries_stream_from_position(10, self.fetch_position);
256 pin_mut!(reply);
257
258 let mut counter: i32 = 0;
259 while let Some(entry) = reply.next().await {
260 let (k, v, version, last_position) = entry?;
261 debug!("fetched key with version {}", version);
262 let internal_key = InternalKey { key: k };
263 let (outer_key, inner_key) = internal_key.split();
264
265 if let Some(inner) = inner_key {
266 let inner_map_key = Key {
268 key: inner,
269 key_version: version,
270 };
271 let inner_map = self.in_memory_map.entry(outer_key).or_default();
272
273 inner_map.remove(&inner_map_key);
275 inner_map.insert(inner_map_key, v);
276 } else {
277 let outer_map_key = Key {
279 key: outer_key,
280 key_version: version,
281 };
282 self.in_memory_map_version.remove(&outer_map_key.clone());
284 self.in_memory_map_version.insert(outer_map_key, v);
285 }
286 self.fetch_position = last_position;
287 counter += 1;
288 }
289 debug!("finished fetching updates");
290 Ok(counter)
291 }
292
293 pub async fn insert<R>(
296 &mut self,
297 updates_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
298 ) -> Result<R, SynchronizerError> {
299 conditionally_write(updates_generator, self, MAX_RETRIES).await
300 }
301
302 pub async fn remove<R>(
305 &mut self,
306 deletes_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
307 ) -> Result<R, SynchronizerError> {
308 conditionally_remove(deletes_generator, self, MAX_RETRIES).await
309 }
310}
311
312#[derive(Debug, Clone)]
316pub struct Key {
317 pub key: String,
318 pub key_version: Version,
319}
320
321impl PartialEq for Key {
322 fn eq(&self, other: &Self) -> bool {
323 self.key == other.key
324 }
325}
326
327impl Eq for Key {}
328
329impl Hash for Key {
330 fn hash<H: Hasher>(&self, state: &mut H) {
331 self.key.hash(state)
332 }
333}
334
/// Number of leading characters of a composite key that hold the outer key's length.
const PREFIX_LENGTH: usize = 2;

/// A composite table key of the form `<NN><outer>` or `<NN><outer>/<inner>`, where
/// `NN` is the two-character length of `<outer>`.
struct InternalKey {
    pub key: String,
}

impl InternalKey {
    /// Splits the composite key into the outer key and, if present, the inner key.
    ///
    /// # Panics
    ///
    /// Panics if the prefix is not a number, if the key is shorter than the prefix
    /// says, or if a slice boundary does not fall on a character boundary.
    fn split(&self) -> (String, Option<String>) {
        let raw = self.key.as_str();
        let outer_name_length: usize = raw[..PREFIX_LENGTH].parse().expect("parse prefix length");
        assert!(self.key.len() >= PREFIX_LENGTH + outer_name_length);

        let outer_end = PREFIX_LENGTH + outer_name_length;
        let outer = raw[PREFIX_LENGTH..outer_end].to_owned();

        // Anything past the outer key is one separator character followed by the inner key.
        let inner = if raw.len() > outer_end {
            Some(raw[outer_end + 1..].to_owned())
        } else {
            None
        };
        (outer, inner)
    }
}
361
362#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
366pub struct Value {
367 pub type_id: String,
368 pub data: Vec<u8>,
369}
370
/// `Value::type_id` tag that marks an entry as tombstoned; such entries are hidden
/// by the read accessors.
pub const TOMBSTONE: &str = "tombstone";
372
373#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
374struct Tombstone {}
375
376pub struct Update {
381 map: HashMap<String, HashMap<String, Value>>,
382 map_version: HashMap<String, Value>,
383 insert: Vec<Insert>,
384 remove: Vec<Remove>,
385}
386
387impl Update {
388 pub fn new(
389 map: HashMap<String, HashMap<String, Value>>,
390 map_version: HashMap<String, Value>,
391 insert: Vec<Insert>,
392 remove: Vec<Remove>,
393 ) -> Self {
394 Update {
395 map,
396 map_version,
397 insert,
398 remove,
399 }
400 }
401
402 pub fn insert(
405 &mut self,
406 outer_key: String,
407 inner_key: String,
408 type_id: String,
409 new_value: Box<dyn ValueData>,
410 ) {
411 let data = serialize(&*new_value).expect("serialize value");
412 let insert = Insert::new(outer_key.clone(), Some(inner_key.clone()), type_id.clone());
413
414 self.insert.push(insert);
415 let inner_map = self.map.entry(outer_key.clone()).or_default();
417 inner_map.insert(inner_key, Value { type_id, data });
418
419 self.increment_map_version(outer_key);
421 }
422
423 pub fn insert_tombstone(
427 &mut self,
428 outer_key: String,
429 inner_key: String,
430 ) -> Result<(), SynchronizerError> {
431 let data = to_vec(&Tombstone {}).expect("serialize tombstone");
432 let insert = Insert::new(outer_key.clone(), Some(inner_key.clone()), "tombstone".to_owned());
433
434 self.insert.push(insert);
435
436 let inner_map = self
437 .map
438 .get_mut(&outer_key)
439 .ok_or(SynchronizerError::SyncTombstoneError {
440 error_msg: format!("outer key {} does not exist", outer_key),
441 })?;
442
443 inner_map.get(&inner_key).map_or(
444 Err(SynchronizerError::SyncTombstoneError {
445 error_msg: format!("inner key {} does not exist", inner_key),
446 }),
447 |v| {
448 if v.type_id == TOMBSTONE {
449 Err(SynchronizerError::SyncTombstoneError {
450 error_msg: format!(
451 "tombstone has already been added for key {}/{}",
452 outer_key, inner_key
453 ),
454 })
455 } else {
456 Ok(())
457 }
458 },
459 )?;
460
461 inner_map.insert(
462 inner_key.clone(),
463 Value {
464 type_id: TOMBSTONE.to_owned(),
465 data,
466 },
467 );
468
469 self.increment_map_version(outer_key.clone());
470
471 let remove = Remove::new(outer_key.clone(), inner_key);
473 self.remove.push(remove);
474
475 Ok(())
476 }
477
478 fn remove(&mut self, outer_key: String, inner_key: String) {
480 let inner_map = self.map.get_mut(&outer_key).expect("should contain outer key");
482 inner_map.remove(&inner_key);
483
484 let remove = Remove::new(outer_key.clone(), inner_key);
485 self.remove.push(remove);
486 }
487
488 pub fn retain(&mut self, outer_key: String) {
494 self.increment_map_version(outer_key);
495 }
496
497 pub fn get(&self, outer_key: &str, inner_key: &str) -> Option<&Value> {
500 let inner_map = self.map.get(outer_key).expect("should contain outer key");
501 inner_map
502 .get(inner_key)
503 .and_then(|val| if val.type_id == TOMBSTONE { None } else { Some(val) })
504 }
505
506 pub fn get_inner_map(&self, outer_key: &str) -> HashMap<String, Value> {
509 self.map.get(outer_key).map_or_else(HashMap::new, |inner_map| {
510 inner_map
511 .iter()
512 .filter(|(_k, v)| v.type_id != TOMBSTONE)
513 .map(|(k, v)| (k.to_owned(), v.clone()))
514 .collect::<HashMap<String, Value>>()
515 })
516 }
517
518 fn is_tombstoned(&self, outer_key: &str, inner_key: &str) -> bool {
519 self.map.get(outer_key).map_or(false, |inner_map| {
520 inner_map
521 .get(inner_key)
522 .map_or(false, |val| val.type_id == TOMBSTONE)
523 })
524 }
525
526 fn get_internal(&self, outer_key: &str, inner_key: &Option<String>) -> Option<&Value> {
527 if let Some(inner) = inner_key {
528 let inner_map = self.map.get(outer_key).expect("should contain outer key");
529 inner_map.get(inner)
530 } else {
531 self.map_version.get(outer_key)
532 }
533 }
534
535 pub fn contains_key(&self, outer_key: &str, inner_key: &str) -> bool {
537 self.map.get(outer_key).map_or(false, |inner_map| {
538 inner_map
539 .get(inner_key)
540 .map_or(false, |value| value.type_id != TOMBSTONE)
541 })
542 }
543
544 pub fn contains_outer_key(&self, outer_key: &str) -> bool {
546 self.map.contains_key(outer_key)
547 }
548
549 pub fn is_empty(&self) -> bool {
550 self.map.is_empty()
551 }
552
553 fn insert_is_empty(&self) -> bool {
554 self.insert.is_empty()
555 }
556
557 fn remove_is_empty(&self) -> bool {
558 self.remove.is_empty()
559 }
560
561 fn get_insert_iter(&self) -> Iter<Insert> {
562 self.insert.iter()
563 }
564
565 fn get_remove_iter(&self) -> Iter<Remove> {
566 self.remove.iter()
567 }
568
569 fn increment_map_version(&mut self, outer_key: String) {
570 self.map_version.entry(outer_key.clone()).or_insert(Value {
572 type_id: "blob".to_owned(),
573 data: vec![0],
574 });
575 let insert = Insert::new(outer_key, None, "blob".to_owned());
576 self.insert.push(insert);
577 }
578}
579
/// One staged write: which key it targets and the composite table key it maps to.
pub struct Insert {
    /// Outer key of the entry being written.
    outer_key: String,
    /// Inner key of the entry, or `None` when the write targets the outer key's own
    /// version entry.
    inner_key: Option<String>,
    /// Table key: `<NN><outer>` or `<NN><outer>/<inner>`, `NN` being the outer key's
    /// byte length padded to at least two digits.
    composite_key: String,
    /// Type tag supplied when the insert was staged.
    type_id: String,
}

impl Insert {
    /// Builds a staged insert and derives its composite table key.
    // NOTE(review): `InternalKey::split` reads exactly two prefix characters, so outer
    // keys of 100 bytes or more look like they would not round-trip; confirm.
    pub fn new(outer_key: String, inner_key: Option<String>, type_id: String) -> Self {
        let composite_key = match &inner_key {
            Some(inner) => format!("{:02}{}/{}", outer_key.len(), outer_key, inner),
            None => format!("{:02}{}", outer_key.len(), outer_key),
        };

        Insert {
            outer_key,
            inner_key,
            composite_key,
            type_id,
        }
    }
}
614
/// One staged removal: which key it targets and the composite table key it maps to.
pub struct Remove {
    /// Outer key of the entry being removed.
    outer_key: String,
    /// Inner key of the entry being removed.
    inner_key: String,
    /// Table key of the form `<NN><outer>/<inner>`, `NN` being the outer key's byte
    /// length padded to at least two digits.
    composite_key: String,
}

impl Remove {
    /// Builds a staged removal and derives its composite table key.
    pub fn new(outer_key: String, inner_key: String) -> Self {
        // Format first, then move both strings into the struct.
        let composite_key = format!("{:02}{}/{}", outer_key.len(), outer_key, inner_key);
        Remove {
            outer_key,
            inner_key,
            composite_key,
        }
    }
}
633
634pub trait ValueData: ValueSerialize + ValueClone + Debug {}
636
637impl<T> ValueData for T where T: 'static + Serialize + DeserializeOwned + Clone + Debug {}
638
639pub trait ValueClone {
641 fn clone_box(&self) -> Box<dyn ValueData>;
642}
643
644impl<T> ValueClone for T
645where
646 T: 'static + ValueData + Clone,
647{
648 fn clone_box(&self) -> Box<dyn ValueData> {
649 Box::new(self.clone())
650 }
651}
652
653impl Clone for Box<dyn ValueData> {
654 fn clone(&self) -> Self {
655 self.clone_box()
656 }
657}
658
659pub trait ValueSerialize {
661 fn serialize_value(
662 &self,
663 seralizer: &mut CborSerializer<&mut Vec<u8>>,
664 ) -> Result<(), serde_cbor::error::Error>;
665}
666
667impl<T> ValueSerialize for T
668where
669 T: Serialize,
670{
671 fn serialize_value(
672 &self,
673 serializer: &mut CborSerializer<&mut Vec<u8>>,
674 ) -> Result<(), serde_cbor::error::Error> {
675 self.serialize(serializer)
676 }
677}
678
679pub fn serialize(value: &dyn ValueData) -> Result<Vec<u8>, serde_cbor::error::Error> {
682 let mut vec = Vec::new();
683 value.serialize_value(&mut CborSerializer::new(&mut vec))?;
684 Ok(vec)
685}
686
687pub fn deserialize_from<T>(reader: &[u8]) -> Result<T, serde_cbor::error::Error>
690where
691 T: DeserializeOwned,
692{
693 serde_cbor::de::from_slice(reader)
694}
695
696async fn conditionally_write<R>(
697 mut updates_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
698 table_synchronizer: &mut Synchronizer,
699 mut retry: i32,
700) -> Result<R, SynchronizerError> {
701 let mut update_result = None;
702
703 while retry > 0 {
704 let map = table_synchronizer.get_outer_map();
705 let map_version = table_synchronizer.get_inner_map_version();
706
707 let mut to_update = Update {
708 map,
709 map_version,
710 insert: Vec::new(),
711 remove: Vec::new(),
712 };
713
714 update_result = Some(updates_generator(&mut to_update)?);
715 debug!("number of insert is {}", to_update.insert.len());
716 if to_update.insert_is_empty() {
717 debug!(
718 "Conditionally Write to {} completed, as there is nothing to update for map",
719 table_synchronizer.get_name()
720 );
721 break;
722 }
723
724 let mut to_send = Vec::new();
725 for update in to_update.get_insert_iter() {
726 let value = to_update
727 .get_internal(&update.outer_key, &update.inner_key)
728 .expect("get the insert data");
729 let key_version = table_synchronizer.get_key_version(&update.outer_key, &update.inner_key);
730
731 to_send.push((&update.composite_key, value, key_version));
732 }
733 let result = table_synchronizer
734 .table_map
735 .insert_conditionally_all(to_send, table_synchronizer.table_segment_offset)
736 .await;
737 match result {
738 Err(TableError::IncorrectKeyVersion { operation, error_msg }) => {
739 debug!("IncorrectKeyVersion {}, {}", operation, error_msg);
740 table_synchronizer.fetch_updates().await.expect("fetch update");
741 }
742 Err(TableError::KeyDoesNotExist { operation, error_msg }) => {
743 debug!("KeyDoesNotExist {}, {}", operation, error_msg);
744 table_synchronizer.fetch_updates().await.expect("fetch update");
745 }
746 Err(e) => {
747 debug!("Error message is {}", e);
748 if retry > 0 {
749 retry -= 1;
750 sleep(Duration::from_millis(DELAY_MILLIS)).await;
751 } else {
752 return Err(SynchronizerError::SyncTableError {
753 operation: "insert conditionally_all".to_owned(),
754 source: e,
755 });
756 }
757 }
758 Ok(res) => {
759 apply_inserts_to_localmap(&mut to_update, res, table_synchronizer);
760 clear_tombstone(&mut to_update, table_synchronizer).await?;
761 break;
762 }
763 }
764 }
765 update_result.ok_or(SynchronizerError::SyncUpdateError {
766 error_msg: "No attempts were made.".into(),
767 })
768}
769
770async fn conditionally_remove<R>(
771 mut delete_generator: impl FnMut(&mut Update) -> Result<R, SynchronizerError>,
772 table_synchronizer: &mut Synchronizer,
773 mut retry: i32,
774) -> Result<R, SynchronizerError> {
775 let mut delete_result = None;
776
777 while retry > 0 {
778 let map = table_synchronizer.get_outer_map();
779 let map_version = table_synchronizer.get_inner_map_version();
780
781 let mut to_delete = Update {
782 map,
783 map_version,
784 insert: Vec::new(),
785 remove: Vec::new(),
786 };
787 delete_result = Some(delete_generator(&mut to_delete)?);
788
789 if to_delete.remove_is_empty() {
790 debug!(
791 "Conditionally remove to {} completed, as there is nothing to remove for map",
792 table_synchronizer.get_name()
793 );
794 break;
795 }
796
797 let mut send = Vec::new();
798 for delete in to_delete.get_remove_iter() {
799 let key_version =
800 table_synchronizer.get_key_version(&delete.outer_key, &Some(delete.inner_key.to_owned()));
801 send.push((&delete.composite_key, key_version))
802 }
803
804 let result = table_synchronizer
805 .table_map
806 .remove_conditionally_all(send, table_synchronizer.table_segment_offset)
807 .await;
808
809 match result {
810 Err(TableError::IncorrectKeyVersion { operation, error_msg }) => {
811 debug!("IncorrectKeyVersion {}, {}", operation, error_msg);
812 table_synchronizer.fetch_updates().await.expect("fetch update");
813 }
814 Err(TableError::KeyDoesNotExist { operation, error_msg }) => {
815 debug!("KeyDoesNotExist {}, {}", operation, error_msg);
816 table_synchronizer.fetch_updates().await.expect("fetch update");
817 }
818 Err(e) => {
819 debug!("Error message is {}", e);
820 if retry > 0 {
821 retry -= 1;
822 sleep(Duration::from_millis(DELAY_MILLIS)).await;
823 } else {
824 return Err(SynchronizerError::SyncTableError {
825 operation: "remove conditionally_all".to_owned(),
826 source: e,
827 });
828 }
829 }
830 Ok(()) => {
831 apply_deletes_to_localmap(&mut to_delete, table_synchronizer);
832 break;
833 }
834 }
835 }
836 delete_result.ok_or(SynchronizerError::SyncUpdateError {
837 error_msg: "No attempts were made.".into(),
838 })
839}
840
841async fn clear_tombstone(
842 to_remove: &mut Update,
843 table_synchronizer: &mut Synchronizer,
844) -> Result<(), SynchronizerError> {
845 table_synchronizer
846 .remove(|table| {
847 for remove in to_remove.get_remove_iter() {
848 if table.is_tombstoned(&remove.outer_key, &remove.inner_key) {
849 table.remove(remove.outer_key.to_owned(), remove.inner_key.to_owned());
850 }
851 }
852 Ok(())
853 })
854 .await
855}
856
857fn apply_inserts_to_localmap(
858 to_update: &mut Update,
859 new_version: Vec<Version>,
860 table_synchronizer: &mut Synchronizer,
861) {
862 let mut i = 0;
863 for update in to_update.get_insert_iter() {
864 if let Some(ref inner_key) = update.inner_key {
865 let new_key = Key {
866 key: inner_key.to_owned(),
867 key_version: *new_version.get(i).expect("get new version"),
868 };
869 let inner_map = to_update.map.get(&update.outer_key).expect("get inner map");
870 let new_value = inner_map.get(inner_key).expect("get the Value").clone();
871
872 let in_mem_inner_map = table_synchronizer
873 .in_memory_map
874 .entry(update.outer_key.clone())
875 .or_default();
876 in_mem_inner_map.insert(new_key, new_value);
877 } else {
878 let new_key = Key {
879 key: update.outer_key.to_owned(),
880 key_version: *new_version.get(i).expect("get new version"),
881 };
882 let new_value = to_update
883 .map_version
884 .get(&update.outer_key)
885 .expect("get the Value")
886 .clone();
887 table_synchronizer
888 .in_memory_map_version
889 .insert(new_key, new_value);
890 }
891 i += 1;
892 }
893 debug!("Updates {} entries in local map ", i);
894}
895
896fn apply_deletes_to_localmap(to_delete: &mut Update, table_synchronizer: &mut Synchronizer) {
897 let mut i = 0;
898 for delete in to_delete.get_remove_iter() {
899 let delete_key = Key {
900 key: delete.inner_key.clone(),
901 key_version: TableKey::KEY_NO_VERSION,
902 };
903 let in_mem_inner_map = table_synchronizer
904 .in_memory_map
905 .entry(delete.outer_key.clone())
906 .or_default();
907 in_mem_inner_map.remove(&delete_key);
908 i += 1;
909 }
910 debug!("Deletes {} entries in local map ", i);
911}
912
#[cfg(test)]
mod test {
    use super::*;
    use crate::client_factory::ClientFactory;
    use crate::sync::synchronizer::{deserialize_from, Update};
    use crate::sync::synchronizer::{serialize, Value};
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;
    use std::collections::HashMap;
    use tokio::runtime::Runtime;

    /// Builds a `Value` tagged "String" holding the serialized text "value".
    fn string_value() -> Value {
        Value {
            type_id: "String".to_owned(),
            data: serialize(&"value".to_owned()).expect("serialize"),
        }
    }

    /// Builds a `Value` tagged "i32" holding the serialized integer 1.
    fn int_value() -> Value {
        Value {
            type_id: "i32".to_owned(),
            data: serialize(&1).expect("serialize"),
        }
    }

    // Composite keys split into the outer key and an optional inner key.
    #[test]
    fn test_intern_key_split() {
        let cases = [
            ("10outer_keys/inner_key", "outer_keys", Some("inner_key")),
            ("05outer/inner_key", "outer", Some("inner_key")),
            ("05outer", "outer", None),
        ];

        for (raw, expected_outer, expected_inner) in cases.iter() {
            let composite = InternalKey {
                key: (*raw).to_owned(),
            };
            let (outer, inner) = composite.split();
            assert_eq!(outer, *expected_outer);
            assert_eq!(inner.as_deref(), *expected_inner);
        }
    }

    // Keys with different names occupy different slots.
    #[test]
    fn test_insert_keys() {
        let mut map: HashMap<Key, Value> = HashMap::new();
        let first = Key {
            key: "a".to_owned(),
            key_version: 0,
        };
        let second = Key {
            key: "b".to_owned(),
            key_version: 0,
        };

        assert!(map.insert(first, string_value()).is_none());
        assert!(map.insert(second, int_value()).is_none());
        assert_eq!(map.len(), 2);
    }

    // Keys with the same name but different versions share one slot.
    #[test]
    fn test_insert_key_with_different_key_version() {
        let mut map: HashMap<Key, Value> = HashMap::new();
        let version_zero = Key {
            key: "a".to_owned(),
            key_version: 0,
        };
        let version_one = Key {
            key: "a".to_owned(),
            key_version: 1,
        };

        let previous = map.insert(version_zero, string_value());
        assert!(previous.is_none());
        let previous = map.insert(version_one, int_value());
        assert!(previous.is_some());
        assert_eq!(map.len(), 1);
    }

    // A cloned map keeps the single slot and the latest value.
    #[test]
    fn test_clone_map() {
        let mut map: HashMap<Key, Value> = HashMap::new();
        let version_zero = Key {
            key: "a".to_owned(),
            key_version: 0,
        };
        let version_one = Key {
            key: "a".to_owned(),
            key_version: 1,
        };
        let latest = int_value();

        map.insert(version_zero.clone(), string_value());
        map.insert(version_one, latest.clone());

        let copy = map.clone();
        assert_eq!(copy.len(), 1);
        let stored = copy.get(&version_zero).expect("get value");
        assert_eq!(*stored, latest);
    }

    // A value staged through `Update::insert` can be read back and deserialized.
    #[test]
    fn test_insert_and_get() {
        let mut table = Update {
            map: HashMap::new(),
            map_version: HashMap::new(),
            insert: Vec::new(),
            remove: Vec::new(),
        };
        table.insert(
            "test_outer".to_owned(),
            "test_inner".to_owned(),
            "i32".to_owned(),
            Box::new(1),
        );

        let stored = table.get("test_outer", "test_inner").expect("get value");
        let decoded: i32 = deserialize_from(&stored.data).expect("deserialize");
        assert_eq!(decoded, 1);
    }

    // End to end against the mock table: insert a value, then tombstone it.
    #[test]
    fn test_integration_with_table_map() {
        let rt = Runtime::new().unwrap();
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::Happy))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        let scope = Scope {
            name: "tableSyncScope".to_string(),
        };
        let mut sync = rt.block_on(factory.create_synchronizer(scope, "sync".to_string()));

        let insert_value = sync.insert(|table| {
            table.insert(
                "outer_key".to_owned(),
                "inner_key".to_owned(),
                "i32".to_owned(),
                Box::new(1),
            );
            Ok(None)
        });
        let _: Option<String> = rt.block_on(insert_value).unwrap();
        assert!(sync.get("outer_key", "inner_key").is_some());

        let insert_tombstone = sync.insert(|table| {
            table.insert_tombstone("outer_key".to_owned(), "inner_key".to_owned())?;
            Ok(None)
        });
        let _: Option<String> = rt.block_on(insert_tombstone).unwrap();
        assert!(sync.get("outer_key", "inner_key").is_none());
    }
}