// mambo_map/mambo.rs

1use std::{
2    mem,
3    sync::{Arc, Mutex, RwLock, TryLockError},
4};
/// Candidate bucket-array sizes for a shard, used when growing or shrinking.
///
/// Entry 0 is the default (initial) size; each following entry roughly doubles
/// the previous one, so a resize moves one step up or down this ladder
/// (see `new_size_shard` / `difer_index`). The values are presumably prime
/// (prime table sizes spread keys more evenly under `key % len`) — the first
/// entries (2, 5, 7, 11, 23, 47, ...) check out; the large ones are taken on
/// trust from the original author.
pub const PRIME_NUMBERS_TO_TAB: [u64; 64] = [
    0x2,                    //default
    0x5,                    //0
    0x7,                    //1
    0xb_u64,                // 2
    0x17_u64,               // 3
    0x2f_u64,               // 4
    0x61_u64,               // 5
    0xc7_u64,               // 6
    0x199_u64,              // 7
    0x337_u64,              // 8
    0x6cd_u64,              // 9
    0xd8d_u64,              // 10
    0x1b25_u64,             // 11
    0x36d1_u64,             // 12
    0x6efb_u64,             // 13
    0xe0d5_u64,             // 14
    0x1c7fb_u64,            // 15
    0x39d61_u64,            // 16
    0x75671_u64,            // 17
    0xee5f1_u64,            // 18
    0x1e40a3_u64,           // 19
    0x3d6eaf_u64,           // 20
    0x7cbf17_u64,           // 21
    0xfd51f9_u64,           // 22
    0x2026a59_u64,          // 23
    0x4149f67_u64,          // 24
    0x8495051_u64,          // 25
    0x10d3c05f_u64,         // 26
    0x222bc111_u64,         // 27
    0x45641269_u64,         // 28
    0x8ce98529_u64,         // 29
    0xfffffffb_u64,         // 30
    0x1fffffff7_u64,        // 31
    0x3ffffffd7_u64,        // 32
    0x7ffffffe1_u64,        // 33
    0xffffffffb_u64,        // 34
    0x1fffffffe7_u64,       // 35
    0x3fffffffd3_u64,       // 36
    0x7ffffffff9_u64,       // 37
    0xffffffffa9_u64,       // 38
    0x1ffffffffeb_u64,      // 39
    0x3fffffffff5_u64,      // 40
    0x7ffffffffc7_u64,      // 41
    0xfffffffffef_u64,      // 42
    0x1fffffffffc9_u64,     // 43
    0x3fffffffffeb_u64,     // 44
    0x7fffffffff8d_u64,     // 45
    0xffffffffffc5_u64,     // 46
    0x1ffffffffffaf_u64,    // 47
    0x3ffffffffffe5_u64,    // 48
    0x7ffffffffff7f_u64,    // 49
    0xfffffffffffd1_u64,    // 50
    0x1fffffffffff91_u64,   // 51
    0x3fffffffffffdf_u64,   // 52
    0x7fffffffffffc9_u64,   // 53
    0xfffffffffffffb_u64,   // 54
    0x1fffffffffffff3_u64,  // 55
    0x3ffffffffffffe5_u64,  // 56
    0x7ffffffffffffc9_u64,  // 57
    0xfffffffffffffa3_u64,  // 58
    0x1fffffffffffffff_u64, // 59 (2^61 - 1, a Mersenne prime)
    0x3fffffffffffffc7_u64, // 60
    0x7fffffffffffffe7_u64, // 61
    0xffffffffffffffc5_u64, // 62
];
71
/// Added to the denominator of the load-factor division in `new_size_shard`
/// so a zero capacity can never cause a divide-by-zero / NaN.
const EPSILON: f32 = 0.0001;
/// Headroom multiplier used when deciding to shrink: a shard only shrinks if
/// the elements times this factor would still fit under `redream_factor`
/// at the smaller capacity.
const SHRINK_THRESHOLD_FACTOR: f32 = 1.25;
/// Bucket flag value: the bucket's elements have been moved out during a resize.
const WASMOVED: bool = false;
/// Bucket flag value: the bucket's elements live here and are valid.
const ONPLACE: bool = true;
/// A local insert/remove delta is flushed to the shard's shared counter only
/// once its magnitude reaches this value (see `new_size_shard`).
const ELEM_NUMS_TO_READ_IN_MUTEX: u64 = 32;
/// A stored element paired with its u64 key.
type ElemBox<T> = (T, u64);
/// One bucket: (ONPLACE/WASMOVED flag, elements whose key hashes to this bucket).
type Vecta<T> = Mutex<(bool, Vec<ElemBox<T>>)>;
type Shard<T> = (
    Mutex<u32>,   //locker for resize: held while this shard is being resized
    Mutex<usize>, //number of elements in this shard (shared counter)
    RwLock<[Box<[Vecta<T>]>; 2]>, // [0]: live buckets, [1]: resize target (empty when idle)
);
84
/// Outcome reported by a bucket closure to `search_vec`, telling it how the
/// per-shard element counter must be adjusted (READ: no change, REMOVE: -1,
/// INSERT: +1).
// NOTE(review): SCREAMING_CASE variants are non-idiomatic (UpperCamelCase is
// conventional), but renaming would touch every call site, so they are kept.
enum WhatIdo {
    READ,
    REMOVE,
    INSERT,
}
90
91impl PartialEq for WhatIdo {
92    fn eq(&self, other: &Self) -> bool {
93        match (self, other) {
94            (WhatIdo::INSERT, WhatIdo::INSERT) => true,
95            (WhatIdo::REMOVE, WhatIdo::REMOVE) => true,
96            (WhatIdo::READ, WhatIdo::READ) => true,
97            _ => false,
98        }
99    }
100}
/// A sharded, thread-safe hash table keyed by `u64`.
///
/// Each thread works through its own instance obtained via [`Mambo::arc_clone`];
/// the element data is shared through `data_arc`.
pub struct Mambo<T: Clone> {
    // Shared state: (redream_factor, shards). All clones point at the same allocation.
    data_arc: Arc<(f32, Box<[Shard<T>]>)>,
    // Per-instance (not shared) signed delta of inserts minus removes for each
    // shard, flushed into the shard's shared counter lazily (see new_size_shard).
    how_edit_elem_without_update: Box<[i64]>,
    // true when usize is narrower than u64 on this target, so a requested
    // capacity from PRIME_NUMBERS_TO_TAB may not fit in usize.
    usize_smaller_u64: bool,
}
107
impl<T: Clone> Mambo<T> {
    ///  The table is divided into `num_shards` independent sections for optimization;
    ///  the optimal value is num_shards = the number of cores on your processor for
    ///  multithreaded operation.
    ///  The redream_factor is the target elements-per-bucket ratio; depending on how
    ///  Mambo is planned to be applied use ->10.0 when a lot of elements are planned
    ///  (more than 10_000). If there are few elements, about 1000 or less, it is better
    ///  to use values tending to ->1.0. At ->10 memory consumption per element decreases
    ///  (depending on the system, at 1.0 memory costs per element range from 32 to 64
    ///  bytes of overhead, at ->10.0 costs are reduced to ~10 bytes per element).
    // NOTE(review): the prose says 1.0..=10.0 but the code accepts the half-open
    // range 1.0..11.0 — confirm which is intended.
    pub fn new(num_shards: usize, redream_factor: f32) -> Result<Self, &'static str> {
        if !(1.0..11.0).contains(&redream_factor) {
            return Err("(1.0..11.0).contains( redream_factor)");
        }
        let shards = (0..num_shards)
            .map(|_| {
                // Every shard starts with PRIME_NUMBERS_TO_TAB[0] live buckets,
                // each marked ONPLACE and empty.
                let data: Box<[Vecta<T>]> = (0..PRIME_NUMBERS_TO_TAB[0])
                    .map(|_| Mutex::new((ONPLACE, vec![])))
                    .collect();

                (
                    Mutex::new(0_u32),
                    Mutex::new(0_usize),
                    // Slot [0] holds the live buckets; slot [1] stays empty
                    // except while a resize is in flight.
                    RwLock::new([data, vec![].into_boxed_slice()]),
                )
            })
            .collect();
        Ok(Self {
            data_arc: Arc::new((redream_factor, shards)),
            how_edit_elem_without_update: vec![0; num_shards].into_boxed_slice(),
            usize_smaller_u64: (usize::MAX as u64) < (u64::MAX as u64),
        })
    }

    ///!!BE SURE TO USE .unwrap() PANIC BECAUSE!!:
    /// -
    /// errors during shard resizing are fatal, and in most cases it is impossible
    /// to restore data integrity in the shard if an error occurred during
    /// self.resize_shard.
    ///
    /// Rehashes shard `shard_index` into a fresh bucket array of length `new_len`.
    /// Returns Ok(false) when another thread is already resizing this shard.
    fn resize_shard(&self, shard_index: usize, new_len: usize) -> Result<bool, &'static str> {
        let shard = self
            .data_arc
            .1
            .get(shard_index)
            .ok_or("shard_index out of range")?;
        /*locker_resizer is needed for a global announcement that the size of the current shard is currently
        being edited. if locker_resizer is currently blocked, then other functions do not need to resize this shard.
        !NOTE!
        some compilers with aggressive optimization parameters may see that the data in locker_resizer
        is not being used and may remove the lock call to locker_resizer, which will lead to UB.
        therefore, garbage is written to locker_resizer, which does not affect the operation of the program.*/
        let mut locker_resizer = match shard.0.try_lock() {
            Ok(guard) => guard,
            Err(TryLockError::WouldBlock) => {
                // Someone else is resizing this shard; nothing to do.
                return Ok(false);
            }
            Err(TryLockError::Poisoned(err)) => {
                panic!("Mutex poisoned: {:?}", err);
            }
        };
        // Trash write (see NOTE above) to keep the guard observably used.
        *locker_resizer = locker_resizer.wrapping_add(1);

        let _ = {
            //non-locking sanity read
            /*
            checking that the vector number 0 has a length, which means that it is initialized,
            it must be of non-zero length. the vector number 1 must have a length of 0. If vec0 has a length of 0,
            it means that an error has occurred in the program or the resize_shard
            function is currently active in another thread, which should not be possible. In real work,
            this check should always end positively, and is needed to simplify
            the detection of errors during development.*/
            let v0_len = shard.2.read().unwrap()[0].len();
            let v1_len = shard.2.read().unwrap()[1].len();

            if v0_len > 0 && v1_len == 0 {
                v0_len
            } else if v1_len > 0 && v0_len > 0 {
                return Err("vec0 and vec1 > 0 ");
            } else {
                return Err("vec0 and vec1 == 0 ");
            }
        };

        {
            //a new vector is created, with a length of new_len
            let new_vec_box: Box<[Vecta<T>]> = (0..new_len)
                .map(|_| Mutex::new((ONPLACE, vec![])))
                .collect();
            //write lock: install the new bucket array into slot [1] of the RwLock
            let mut w_rlock = shard.2.write().unwrap();
            if w_rlock[1].len() == 0 {
                w_rlock[1] = new_vec_box;
            } else {
                return Err("vec 1 is resizes in this moment");
            }
            //println!("vec in rw 1len {}", w_rlock[0].len());
        }
        {
            //read lock (non-exclusive): readers/writers can still reach both arrays
            let r_lock = shard.2.read().unwrap();
            let v0_old = &r_lock[0];
            let v1_new = &r_lock[1];
            //iterating through the buckets in the old array and moving the elements to the new one
            //let mut elems_in_shard: usize = 0;
            for x in v0_old.iter() {
                let mut temp_old_mutexer = x.lock().unwrap();
                /*if the elements were moved before they were moved, an error is triggered.
                since the elements are moved only during copying, and if they have been moved and some other
                function is also moving them at the moment, this should not be possible,
                since locker_resizer must ensure that only one fn resize_shard is active at
                one time for the current shard.*/
                if temp_old_mutexer.0 == WASMOVED {
                    return Err(
                        "Unknown error element that should not be moved, was moved before moving",
                    );
                }
                // Mark the old bucket so concurrent search_vec skips it and
                // falls through to the new array.
                temp_old_mutexer.0 = WASMOVED;

                for old_elem in temp_old_mutexer.1.iter() {
                    let mut new_mutex_elem =
                        v1_new[old_elem.1 as usize % v1_new.len()].lock().unwrap();

                    new_mutex_elem.1.push(old_elem.clone());
                    //elems_in_shard += 1;
                }
                //minimizing memory consumption, removing unused vectors
                if temp_old_mutexer.1.capacity() > 0 {
                    temp_old_mutexer.1 = Vec::with_capacity(0);
                }

                //trash changes (len() is 0 here after the replacement above, so this adds 0;
                //it exists only to keep locker_resizer observably alive)
                *locker_resizer = locker_resizer.wrapping_add(temp_old_mutexer.1.len() as u32);
            }
        }
        {
            //write lock: promote the new array into slot [0], leaving slot [1] empty
            let mut w_lock = shard.2.write().unwrap();

            let old_1 = mem::replace(&mut w_lock[1], Vec::new().into_boxed_slice());
            w_lock[0] = old_1;
            #[cfg(test)]
            {
                //  println!("resize_shard {}  len {}", shard_index, w_lock[0].len())
            }
        }

        Ok(true)
    }

    /// Locates the bucket for `key` and runs `operation` on it under the bucket
    /// mutex. The closure reports READ/INSERT/REMOVE so the per-shard local
    /// counter can be adjusted; after an edit, a resize check runs.
    fn search_vec<F>(&mut self, key: u64, operation: F) -> Result<(), &'static str>
    where
        F: FnOnce(&mut Vec<ElemBox<T>>) -> Result<WhatIdo, &'static str>,
    {
        let mut is_edit = false;
        let mut shard_capasity = 0;
        let shard_index = key % self.data_arc.1.len() as u64;
        let shard = &self.data_arc.1[shard_index as usize];

        {
            let r_lock = shard.2.read().unwrap();
            // Try the live array [0] first; if our bucket there was WASMOVED by a
            // concurrent resize, retry against the resize target array [1].
            for x in 0..2 {
                // NOTE(review): `x & 0b1` equals `x` for x in 0..2 — the mask is a no-op.
                let vecta = &r_lock[x & 0b1];

                if 0 != vecta.len() {
                    let mutexer = &mut vecta[key as usize % vecta.len()].lock().unwrap();
                    if ONPLACE == mutexer.0 {
                        shard_capasity = vecta.len();

                        match operation(&mut mutexer.1)? {
                            WhatIdo::REMOVE => {
                                self.how_edit_elem_without_update[shard_index as usize] =
                                    self.how_edit_elem_without_update[shard_index as usize]
                                        .checked_sub(1)
                                        .ok_or("how_edit_elem_without_update sub is owerflow")?;
                                is_edit = true;
                            }
                            WhatIdo::INSERT => {
                                self.how_edit_elem_without_update[shard_index as usize] =
                                    self.how_edit_elem_without_update[shard_index as usize]
                                        .checked_add(1)
                                        .ok_or("how_edit_elem_without_update add is owerflow")?;
                                is_edit = true;
                            }
                            _ => {
                                // Plain read: no counter change, no resize check.
                                return Ok(());
                            }
                        };
                        break;
                    }
                }
            }
        } //unlock read r_lock
        if is_edit {
            // println!("shard_capasity: {}", shard_capasity);
            return self.is_resize_shard(shard_index as usize, shard_capasity, false);
        }

        // Reached only when neither array offered an ONPLACE bucket.
        Err("access error, constant employment")
    }

    /// Flushes the local counter (via `new_size_shard`) and, when a new capacity
    /// is suggested, performs the resize. Resize failures are fatal (see
    /// `resize_shard` docs), hence the `.unwrap()`.
    fn is_resize_shard(
        &mut self,
        shard_index: usize,
        shard_capasity: usize,
        force_edit_global_counter: bool,
    ) -> Result<(), &'static str> {
        if let Some(new_len) = self.new_size_shard(
            shard_index as usize,
            shard_capasity,
            force_edit_global_counter,
        ) {
            // Guard for 16/32-bit targets where a u64 capacity may not fit in usize.
            if self.usize_smaller_u64 && (new_len > (usize::MAX as u64)) {
                return Err(
                    "if you see this error, it means that in your system usize::MAX < u64::MAX
                    and when increasing the size of the shard, a number from PRIME_NUMBERS_TO_TAB was requested that
                    is greater than usize::MAX. most likely, the usize on this device is 32 or 16 bits, which means that
                    you can put the orientation points in MamboMambo(redream_factor* (usize:: MAX/4))"
                    );
            }

            self.resize_shard(shard_index, new_len as usize).unwrap();
        }
        Ok(())
    }

    ///Resizing should only be done if the occupancy rate is higher than the redream_factor.
    /// if it is higher and these are not the last elements in PRIME_NUMBERS_TO_TAB,
    /// then its size becomes PRIME_NUMBERS_TO_TAB[+1],
    /// if the number of elements is so small that you can use PRIME_NUMBERS_TO_TAB[-1]
    /// and there will still be room under SHRINK_THRESHOLD_FACTOR before exceeding redream_factor,
    /// the capacity size decreases to PRIME_NUMBERS_TO_TAB[-1].
    ///
    /// Returns Some(new capacity) when a resize is warranted, None otherwise.
    /// Also flushes this instance's local element delta into the shard's shared
    /// counter — unconditionally when `force_edit_global_counter`, otherwise only
    /// once the delta reaches ELEM_NUMS_TO_READ_IN_MUTEX.
    fn new_size_shard(
        &mut self,
        shard_index: usize,
        shard_capasity: usize,
        force_edit_global_counter: bool,
    ) -> Option<u64> {
        let shard_i = &mut self.how_edit_elem_without_update[shard_index];

        if !force_edit_global_counter && shard_i.abs_diff(0) < ELEM_NUMS_TO_READ_IN_MUTEX {
            return None;
        }

        let elems_in_me = {
            let mut mutexer = self.data_arc.1[shard_index].1.lock().unwrap();

            // Apply the signed local delta to the shared counter, then reset it.
            if *shard_i > 0 {
                *mutexer = mutexer.checked_add(shard_i.abs_diff(0) as usize).unwrap();
            } else {
                *mutexer = mutexer.checked_sub(shard_i.abs_diff(0) as usize).unwrap();
            }
            *shard_i = 0;
            *mutexer
        };
        //println!("elems in me: {}", elems_in_me);
        let index_my_prime = self.difer_index(shard_capasity);

        if elems_in_me as f32 / (shard_capasity as f32 + EPSILON) > self.data_arc.0 {
            // Overfull: grow one step, unless already at the largest size.
            if index_my_prime + 1 < PRIME_NUMBERS_TO_TAB.len() {
                Some(PRIME_NUMBERS_TO_TAB[index_my_prime + 1])
            } else {
                None
            }
        } else if index_my_prime > 0 {
            // Underfull: shrink one step only if the load factor at the smaller
            // capacity, padded by SHRINK_THRESHOLD_FACTOR, stays under the target.
            let t = PRIME_NUMBERS_TO_TAB[index_my_prime - 1];
            if self.data_arc.0 > (elems_in_me as f32 * SHRINK_THRESHOLD_FACTOR) / t as f32 {
                Some(PRIME_NUMBERS_TO_TAB[index_my_prime - 1])
            } else {
                None
            }
        } else {
            None
        }
    }
    /// finds the index of the number closest to the target in PRIME_NUMBERS_TO_TAB
    fn difer_index(&self, target: usize) -> usize {
        let target = target as u64;

        PRIME_NUMBERS_TO_TAB
            .iter()
            .enumerate()
            .min_by_key(|&(_, &prime)| prime.abs_diff(target))
            .map(|(index, _)| index)
            .unwrap_or(0)
    }
    /// Some of the Mambo structure data is thread-independent,
    /// and each copy via arc_clone provides a convenient instance of the Mambo structure that can be used
    /// in a new thread. all data regarding the elements of the Mambo hash table can be obtained from any
    /// thread that has an instance from arc_clone.
    pub fn arc_clone(&self) -> Self {
        Self {
            data_arc: Arc::clone(&self.data_arc),
            // Each instance tracks its own local deltas, starting from zero.
            how_edit_elem_without_update: vec![0; self.data_arc.1.len()].into_boxed_slice(),
            usize_smaller_u64: self.usize_smaller_u64,
        }
    }
    /// insert an element T with the key: u64. if there is already an element with the same key: u64 in the table,
    /// then when force_replace == true, the old element T will be replaced by the new element T,
    /// while the old element T will be returned as Ok(Some(T.clone())). if force_replace == false,
    /// the old element will not be replaced and the function will return Ok(None).
    /// if there is no element with this key: u64,
    /// a new element will be added to the table and the function will return Ok(None)
    pub fn insert(
        &mut self,
        key: u64,
        elem: &T,
        force_replace: bool,
    ) -> Result<Option<T>, &'static str> {
        let mut replaced_elem: Option<T> = None;
        self.search_vec(key, |vecta| {
            for x in vecta.iter_mut() {
                //Searching for an element with an equivalent key
                if x.1 == key as u64 {
                    //if there was no insertion due to the fact that the element is already in the table,
                    if force_replace {
                        replaced_elem = Some(x.0.clone());
                        *x = (elem.clone(), key);
                    }
                    //the operation is considered as a read (element count unchanged).
                    return Ok(WhatIdo::READ);
                }
            }
            vecta.push((elem.clone(), key as u64));

            Ok(WhatIdo::INSERT)
        })?; //search_vec

        Ok(replaced_elem)
    }
    /// removes the element stored under key: u64.
    /// if an element was in the table and it was deleted, the function returns Ok(Some(T.clone()));
    /// if there was no such element in the hash table, Ok(None) will be returned
    pub fn remove(&mut self, key: u64) -> Result<Option<T>, &'static str> {
        let mut ret_after_remove: Option<T> = None;

        self.search_vec(key, |vecta| {
            for x in 0..vecta.len() {
                if vecta[x].1 == key as u64 {
                    // Swap-remove by hand: overwrite slot x with the last element,
                    // then pop the tail.
                    let last = vecta.last();

                    // NOTE(review): unreachable — vecta[x] exists, so last() is
                    // always Some here; kept as a defensive guard.
                    if last.is_none() {
                        return Ok(WhatIdo::READ);
                    }
                    let last = last.unwrap();

                    ret_after_remove = Some(vecta[x].0.clone());
                    vecta[x] = last.clone();
                    vecta.pop();
                    return Ok(WhatIdo::REMOVE);
                }
            }

            Ok(WhatIdo::READ)
        })?;
        Ok(ret_after_remove)
    }
    ///!Attention!! DO NOT CALL read INSIDE THE read CLOSURE!!! THIS MAY CAUSE THE THREAD TO SELF-LOCK!!
    /// -
    ///  reading. to access an item for reading, you need to request it using the key.
    ///  the element is read and processed in the
    ///  RFy closure: FnOnce(Option<&mut T>) -> Result<(), &'static str>,
    ///  since &mut T comes from Mutex.lock(). while processing is taking place in read<RFy>,
    ///  the shard in which this element is located cannot:
    ///  1: change its size.
    ///  2: since the Mutex contains elements from 0 to redream_factor
    ///  (a parameter in the Mambo::new constructor),
    ///  access in other threads to elements in the same Mutex is blocked.
    ///  to summarize, the faster the closure is resolved inside read, the better.
    pub fn read<RFy>(&mut self, key: u64, ridler: RFy) -> Result<(), &'static str>
    where
        RFy: FnOnce(Option<&mut T>) -> Result<(), &'static str>,
    {
        self.search_vec(key, |vecta| {
            for (t, hahs) in vecta.iter_mut() {
                if *hahs == key as u64 {
                    ridler(Some(t))?;
                    return Ok(WhatIdo::READ);
                }
            }
            // Key absent: the closure is informed via None.
            ridler(None)?;
            return Ok(WhatIdo::READ);
        })
    }
    /// The number of elements in the hash table.
    ///  returns the number of all items in the table. note:
    ///  for optimization in multithreaded environments,
    ///
    /// !!!item count changes do NOT occur EVERY TIME an item is DELETED/INSERTED!!!
    ///  with a large number of elements and in a multithreaded environment,
    ///  it may not be critical, but when there are few elements, elems_im_me
    ///  may return 0 even if there are elements in Mambo
    ///  (local deltas below ELEM_NUMS_TO_READ_IN_MUTEX are not yet flushed).
    ///  This is a deliberate trade-off to increase performance.
    pub fn elems_im_me(&self) -> Result<usize, &'static str> {
        let mut elems = 0_usize;
        for x in self.data_arc.1.iter() {
            let t = x.1.lock().unwrap();
            elems = elems
                .checked_add(*t)
                .ok_or("err elems.checked_add(elems in shard )")?;
        }
        Ok(elems)
    }
}
509
510impl<T: Clone> Drop for Mambo<T> {
511    ///  deleting a Mambo instance. Since Mambo uses only secure rust APIs,
512    ///  it does not need to implement the Drop trace, but since when inserting
513    ///  or deleting an element, calls insert() or remove() do not change the global
514    ///  element counter every time these methods are called so that the discrepancy
515    ///  is minimal, each time an instance of Mambo is deleted, the value is forcibly
516    ///  saved from a local variable. to the global internal Mutex
517    fn drop(&mut self) {
518        let mut ive = vec![0usize; 0];
519
520        for (shard, shard_i) in self
521            .data_arc
522            .1
523            .iter()
524            .zip(self.how_edit_elem_without_update.iter())
525        {
526            let mut mutexer = shard.1.lock().unwrap();
527            let shard_capasity = {
528                let rwr = shard.2.read().unwrap();
529                let len_r1 = rwr[1].len();
530                if 0 != len_r1 {
531                    len_r1
532                } else {
533                    rwr[0].len()
534                }
535            };
536
537            if *shard_i > 0 {
538                *mutexer = mutexer.checked_add(shard_i.abs_diff(0) as usize).unwrap();
539                //println!("mux {}", shard_i.abs_diff(0));
540            } else {
541                *mutexer = mutexer.checked_sub(shard_i.abs_diff(0) as usize).unwrap();
542            }
543            //println!("len: {}", shard_capasity);
544            ive.push(shard_capasity);
545        }
546
547        for (shard_index, &shard_capasity) in ive.iter().enumerate() {
548            self.is_resize_shard(shard_index as usize, shard_capasity, true)
549                .unwrap();
550            //println!("drop{}", shard_index);
551        }
552    }
553}
554
555#[cfg(test)]
556mod tests_n {
557
558    use super::*;
559
560    // use core::time;
561    //use std::sync::Arc;
562    use std::thread;
563    use std::time::Instant;
564
565    use std::u64;
566
    /// Single-threaded smoke test: insert/replace semantics, read-with-mutation,
    /// and swap-remove, while checking that the lazily-updated global element
    /// counter never drifts more than ELEM_NUMS_TO_READ_IN_MUTEX per shard.
    #[test]
    fn based_insert_test() {
        let shards = 2;
        let mut mambo = Mambo::<u32>::new(shards, 5.0).unwrap();
        println!("{}", mambo.data_arc.1.len());

        for x in 0..30_000 {
            if x % 10 == 9 {
                let elems = mambo.elems_im_me().unwrap();

                // The counter is flushed lazily, so it may lag the true count by
                // up to ELEM_NUMS_TO_READ_IN_MUTEX per shard (per instance).
                assert!(
                    elems.abs_diff(x) <= ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards,
                    "elems.abs_diff(x):{} {}",
                    elems.abs_diff(x),
                    ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards
                );

                //println!("{}", elems);
            }
            let x = x as u32;
            // New key -> Ok(None); forced replace -> returns old value;
            // non-forced insert of an existing key -> Ok(None), value untouched.
            assert_eq!(mambo.insert(x as u64, &x, false), Ok(None));
            assert_eq!(mambo.insert(x as u64, &x, true), Ok(Some(x)));
            assert_eq!(mambo.insert(x as u64, &x, false), Ok(None));
        }

        for x in 0..30_000 {
            // Present key: verify the stored value, then mutate it in place (+1).
            assert_eq!(
                mambo.read(x, |el| {
                    let el = el.unwrap();

                    assert_eq!(*el, x as u32);
                    *el += 1;
                    Ok(())
                }),
                Ok(())
            );

            // Absent key: the closure must receive None.
            assert_eq!(
                mambo.read(x + 1_000_000, |el| {
                    assert_eq!(el, None);
                    Ok(())
                }),
                Ok(())
            );
        }

        for x in 0..30_000 {
            let inv_x = 30_000 - x;
            let elems = mambo.elems_im_me().unwrap();
            assert!(
                elems.abs_diff(inv_x) <= ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards,
                "elems.abs_diff(x):{} {}",
                elems.abs_diff(inv_x),
                ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards
            );
            let x = x as u64;
            //println!("{:?}  {}", mambo.remove(x as u64), x);
            // Values were incremented by the read loop above, hence the +1.
            assert_eq!(mambo.remove(x as u64), Ok(Some(x as u32 + 1)));
        }
        //assert_eq!(mambo.remove(11), Ok(None));

        // NOTE(review): trailing `return;` is redundant at the end of a fn body.
        return;
    }
630
631    #[test]
632    fn based_example() {
633        {
634            let shards = 16;
635            let elems_in_mutex = 7.0;
636            const NUM_THREADS: usize = 10;
637            const OPS_PER_THREAD: usize = 100;
638            let mambo = Mambo::<String>::new(shards, elems_in_mutex).unwrap();
639
640            for tt in 1..NUM_THREADS {
641                let mut mambo_arc = mambo.arc_clone();
642
643                let _ = thread::spawn(move || {
644                    for key in 0..OPS_PER_THREAD {
645                        let key = tt + (key * OPS_PER_THREAD * 10);
646
647                        let elem_me = format!("mambo elem{}", key);
648
649                        mambo_arc.insert(key as u64, &elem_me, false).unwrap();
650
651                        assert_eq!(mambo_arc.insert(key as u64, &elem_me, false), Ok(None));
652
653                        mambo_arc
654                            .read(key as u64, |ind| {
655                                let ind = ind.unwrap();
656
657                                assert_eq!(
658                                    ind.clone(),
659                                    elem_me,
660                                    " non eq read  key: {}   rea: {}   in map: {}",
661                                    key,
662                                    elem_me,
663                                    ind.clone()
664                                );
665
666                                Ok(())
667                            })
668                            .unwrap();
669                    }
670
671                    for key in 0..OPS_PER_THREAD {
672                        let key = tt + (key * OPS_PER_THREAD * 10);
673                        let elem_me = format!("mambo elem{}", key);
674
675                        assert_eq!(mambo_arc.remove(key as u64), Ok(Some(elem_me.clone())));
676                    }
677                });
678            }
679        }
680    }
681
    /// Stress test: NUM_THREADS workers hammer the same table with
    /// pseudo-randomly sized insert/remove batches, synchronized on a barrier
    /// so they all start at once. Worker panics propagate through join().
    #[test]
    fn based_threars_test() {
        {
            //return;
            const NUM_THREADS: usize = 10;
            const OPS_PER_THREAD: usize = 1000;
            const TEST_ELEMES: usize = 1000;
            let mambo = Mambo::<u64>::new(16, 5.0).unwrap();

            // Cheap deterministic scrambler used to derive per-thread workloads.
            fn pair(u: usize) -> usize {
                let mut u: usize = u as usize;
                for m in 0..12 {
                    u ^= u.rotate_left(m * 1).wrapping_add(u.rotate_right(3 * m))
                        ^ PRIME_NUMBERS_TO_TAB[10 + m as usize] as usize;
                }
                u as usize
            }

            let mut std_handles = Vec::new();
            // +1 so the main thread participates in the barrier release.
            let std_barrier = Arc::new(std::sync::Barrier::new(NUM_THREADS + 1));

            for tt in 1..NUM_THREADS + 1 {
                let barrier_clone = Arc::clone(&std_barrier);

                let mut ra_clone = mambo.arc_clone();

                let handle = thread::spawn(move || {
                    barrier_clone.wait();

                    //println!("t: {}  np {:x}", tt, ptr::null_mut::<u8>() as usize);
                    //return;

                    // NOTE(review): `tt % 4 == 100` can never be true (tt % 4 < 4),
                    // so this read-only branch is dead code — presumably a leftover
                    // debugging toggle; confirm whether it should be e.g. `tt % 4 == 0`.
                    if tt % 4 == 100 {
                        for _ in 0..OPS_PER_THREAD {
                            for o in 0..TEST_ELEMES {
                                let pai = pair(o);
                                ra_clone
                                .read(o as u64, |ind| {
                                    let ind = ind.unwrap();
                                    assert_eq!(
                                        *ind,
                                        pai as u64,
                                        " based_threars_test non eq read  index: {}   rea: {}   in map: {}",o,pai as u32,*ind
                                    );
                                    Ok(())
                                })
                                .unwrap();
                            }
                        }
                    } else {
                        // Workload sizes are derived deterministically per thread.
                        let elem_read_write = pair(pair(tt)) % TEST_ELEMES;

                        let iterations = pair(pair(elem_read_write)) % OPS_PER_THREAD;

                        for _ in 0..iterations {
                            // Keys are disjoint per thread: the stride dwarfs tt.
                            for yy in 0..elem_read_write {
                                let index = tt + (NUM_THREADS * 10_000_000 * yy);
                                let yy = yy as u64;
                                ra_clone.insert(index as u64, &yy, false).unwrap();
                            }
                            // println!("+++++++++++++++==");
                            for yy in 0..elem_read_write {
                                //println!("{}", yy);
                                let index = tt + (NUM_THREADS * 10_000_000 * yy);
                                let ga = ra_clone.remove(index as u64);

                                // NOTE(review): with TEST_ELEMES = 1000 the guard
                                // `200 > TEST_ELEMES` is always false, so this
                                // diagnostic dump only activates when the constant
                                // is shrunk for debugging.
                                if ga.is_err() && 200 > TEST_ELEMES {
                                    println!("index {}", index);

                                    if ra_clone.data_arc.1.len() == 1 {
                                        for inn in 0..(TEST_ELEMES as f32 * (1.0 / 0.6)) as usize {
                                            let _ = ra_clone.read(inn as u64, |xx| {
                                                let xx = xx.unwrap();
                                                print!("| {} ", xx);
                                                Ok(())
                                            });
                                            if inn % 4 == 3 {
                                                println!()
                                            }
                                        }
                                    } else {
                                        panic!("ra_clone.shards.len()==1else");
                                    }
                                }
                            }
                        }
                    }
                });

                std_handles.push(handle);
            }

            // Release all workers simultaneously, then wait for them;
            // join() surfaces any worker panic here.
            std_barrier.wait();

            for handle in std_handles {
                handle.join().unwrap();
            }
        }
    }
781
782    #[test]
783    fn read_write() {
784        for tre in (1..20).step_by(1) {
785            //const NUM_THREADS: usize = 50;
786            let num_treads: usize = tre;
787            const NUM_ELEMS: usize = 80;
788            const TOTAL_OPS: u64 = 500_000;
789            let ops_threads: u64 = TOTAL_OPS / num_treads as u64;
790            {
791                let mut mambo = Mambo::<u64>::new(16, 10.0).unwrap();
792                let mut std_handles = Vec::new();
793                let std_start = Instant::now();
794                let std_barrier = Arc::new(std::sync::Barrier::new(num_treads + 1));
795
796                for i in 0..NUM_ELEMS {
797                    let t = 0;
798                    mambo.insert(i as u64, &t, false).unwrap();
799                }
800
801                for _ in 0..num_treads {
802                    let barrier_clone = Arc::clone(&std_barrier);
803
804                    let mut ra_clone = mambo.arc_clone();
805
806                    let handle = thread::spawn(move || {
807                        barrier_clone.wait();
808
809                        for i in 0..ops_threads {
810                            let mut od: u64 = i as u64;
811                            for _ in 0..3 {
812                                od = od.rotate_left(43).wrapping_add(!i as u64);
813                            }
814                            let _ = ra_clone
815                                .read(od % NUM_ELEMS as u64, |x| {
816                                    let x = x.unwrap();
817                                    *x += 1;
818                                    Ok(())
819                                })
820                                .unwrap();
821                        }
822                    });
823
824                    std_handles.push(handle);
825                }
826
827                std_barrier.wait();
828
829                for handle in std_handles {
830                    handle.join().unwrap();
831                }
832                if true {
833                    println!(
834                        "{}Mop/S: {:.3}",
835                        "only read  ",
836                        TOTAL_OPS as f64 / std_start.elapsed().as_micros() as f64
837                    );
838                } else {
839                    println!(
840                        "[{},  {:.3}]",
841                        tre,
842                        TOTAL_OPS as f64 / std_start.elapsed().as_micros() as f64
843                    );
844                }
845            }
846        }
847        // assert!(false);
848    }
849
850    #[test]
851    fn read_write_ers() {
852        let mambo = Mambo::<u64>::new(1, 10.0).unwrap();
853        let std_start = Instant::now();
854        let elem_in_cycle = 10u64;
855        let cycles = 200_00u64;
856        for xx in (1..cycles).step_by(1) {
857            let mut clom = mambo.arc_clone();
858            //println!("{}", clom.elems_im_me().unwrap());
859            for yy in 0..elem_in_cycle {
860                let t = 0;
861                clom.insert((yy * cycles * 100) + xx, &t, false).unwrap();
862            }
863        }
864
865        println!(
866            "{}Mop/S: {:.4}",
867            "only read  ",
868            (elem_in_cycle * cycles) as f64 / std_start.elapsed().as_micros() as f64
869        );
870
871        // assert!(false);
872    }
873}