1use std::{
2 mem,
3 sync::{Arc, Mutex, RwLock, TryLockError},
4};
/// Ladder of bucket-table sizes: each entry is a prime close to the next
/// power of two, so a shard grows/shrinks by roughly 2x per resize step while
/// keeping a prime modulus for `key % len` bucket selection.
pub const PRIME_NUMBERS_TO_TAB: [u64; 64] = [
    0x2, 0x5, 0x7, 0xb_u64, 0x17_u64, 0x2f_u64, 0x61_u64, 0xc7_u64, 0x199_u64, 0x337_u64, 0x6cd_u64, 0xd8d_u64, 0x1b25_u64, 0x36d1_u64, 0x6efb_u64, 0xe0d5_u64, 0x1c7fb_u64, 0x39d61_u64, 0x75671_u64, 0xee5f1_u64, 0x1e40a3_u64, 0x3d6eaf_u64, 0x7cbf17_u64, 0xfd51f9_u64, 0x2026a59_u64, 0x4149f67_u64, 0x8495051_u64, 0x10d3c05f_u64, 0x222bc111_u64, 0x45641269_u64, 0x8ce98529_u64, 0xfffffffb_u64, 0x1fffffff7_u64, 0x3ffffffd7_u64, 0x7ffffffe1_u64, 0xffffffffb_u64, 0x1fffffffe7_u64, 0x3fffffffd3_u64, 0x7ffffffff9_u64, 0xffffffffa9_u64, 0x1ffffffffeb_u64, 0x3fffffffff5_u64, 0x7ffffffffc7_u64, 0xfffffffffef_u64, 0x1fffffffffc9_u64, 0x3fffffffffeb_u64, 0x7fffffffff8d_u64, 0xffffffffffc5_u64, 0x1ffffffffffaf_u64, 0x3ffffffffffe5_u64, 0x7ffffffffff7f_u64, 0xfffffffffffd1_u64, 0x1fffffffffff91_u64, 0x3fffffffffffdf_u64, 0x7fffffffffffc9_u64, 0xfffffffffffffb_u64, 0x1fffffffffffff3_u64, 0x3ffffffffffffe5_u64, 0x7ffffffffffffc9_u64, 0xfffffffffffffa3_u64, 0x1fffffffffffffff_u64, 0x3fffffffffffffc7_u64, 0x7fffffffffffffe7_u64, 0xffffffffffffffc5_u64, ];
71
// Guards the load-factor division against a zero capacity.
const EPSILON: f32 = 0.0001;
// Hysteresis multiplier: a shard only shrinks when it would still sit
// comfortably under the load threshold at the smaller table size.
const SHRINK_THRESHOLD_FACTOR: f32 = 1.25;
// Bucket-state flags: WASMOVED marks a bucket whose contents were migrated to
// the replacement table during a resize; ONPLACE marks a live bucket.
const WASMOVED: bool = false;
const ONPLACE: bool = true;
// A handle flushes its local insert/remove delta into the shared per-shard
// counter only once the delta reaches this magnitude, to limit lock traffic.
const ELEM_NUMS_TO_READ_IN_MUTEX: u64 = 32;
// A stored value paired with its u64 key (the key doubles as the hash).
type ElemBox<T> = (T, u64);
// One bucket: (ONPLACE/WASMOVED flag, values colliding into this bucket).
type Vecta<T> = Mutex<(bool, Vec<ElemBox<T>>)>;
// One shard: (resize guard / op counter, shared element count,
// [active bucket table, resize-target bucket table]).
type Shard<T> = (
    Mutex<u32>, Mutex<usize>, RwLock<[Box<[Vecta<T>]>; 2]>,
);
84
/// Outcome reported by a bucket-operation closure; `search_vec` uses it to
/// decide how to adjust the handle's local insert/remove delta.
enum WhatIdo {
    READ,
    REMOVE,
    INSERT,
}
90
91impl PartialEq for WhatIdo {
92 fn eq(&self, other: &Self) -> bool {
93 match (self, other) {
94 (WhatIdo::INSERT, WhatIdo::INSERT) => true,
95 (WhatIdo::REMOVE, WhatIdo::REMOVE) => true,
96 (WhatIdo::READ, WhatIdo::READ) => true,
97 _ => false,
98 }
99 }
100}
/// Sharded, mutex-based hash map handle. Handles are duplicated with
/// `arc_clone`: clones share the bucket data but each keeps an independent
/// local edit counter.
pub struct Mambo<T: Clone> {
    // (load-factor threshold, shard array) shared by all handle clones.
    data_arc: Arc<(f32, Box<[Shard<T>]>)>,
    // Per-shard signed delta of inserts(+)/removes(-) made through this
    // handle and not yet flushed into the shard's shared counter.
    how_edit_elem_without_update: Box<[i64]>,
    // True when usize is narrower than u64 (32/16-bit targets), so a resize
    // target from PRIME_NUMBERS_TO_TAB may not fit in usize.
    usize_smaller_u64: bool,
}
107
108impl<T: Clone> Mambo<T> {
109 pub fn new(num_shards: usize, redream_factor: f32) -> Result<Self, &'static str> {
118 if !(1.0..11.0).contains(&redream_factor) {
119 return Err("(1.0..11.0).contains( redream_factor)");
120 }
121 let shards = (0..num_shards)
122 .map(|_| {
123 let data: Box<[Vecta<T>]> = (0..PRIME_NUMBERS_TO_TAB[0])
124 .map(|_| Mutex::new((ONPLACE, vec![])))
125 .collect();
126
127 (
128 Mutex::new(0_u32),
129 Mutex::new(0_usize),
130 RwLock::new([data, vec![].into_boxed_slice()]),
131 )
132 })
133 .collect();
134 Ok(Self {
135 data_arc: Arc::new((redream_factor, shards)),
136 how_edit_elem_without_update: vec![0; num_shards].into_boxed_slice(),
137 usize_smaller_u64: (usize::MAX as u64) < (u64::MAX as u64),
138 })
139 }
140
    /// Rebuilds this shard's bucket table at `new_len` buckets: allocates the
    /// new table in slot 1, migrates every element out of slot 0, then swaps
    /// the populated table back into slot 0.
    ///
    /// Returns `Ok(true)` on success, `Ok(false)` when another thread already
    /// holds the shard's resize lock (the caller simply skips the resize),
    /// and `Err(..)` when the two tables are in an unexpected state.
    fn resize_shard(&self, shard_index: usize, new_len: usize) -> Result<bool, &'static str> {
        let shard = self
            .data_arc
            .1
            .get(shard_index)
            .ok_or("shard_index out of range")?;
        // Non-blocking resize lock: at most one resizer per shard, and
        // contenders back off instead of waiting.
        let mut locker_resizer = match shard.0.try_lock() {
            Ok(guard) => guard,
            Err(TryLockError::WouldBlock) => {
                return Ok(false);
            }
            Err(TryLockError::Poisoned(err)) => {
                panic!("Mutex poisoned: {:?}", err);
            }
        };
        *locker_resizer = locker_resizer.wrapping_add(1);

        // Sanity check: a resize must start from "table 0 populated, table 1
        // empty"; anything else indicates a torn previous resize.
        let _ = {
            let v0_len = shard.2.read().unwrap()[0].len();
            let v1_len = shard.2.read().unwrap()[1].len();

            if v0_len > 0 && v1_len == 0 {
                v0_len
            } else if v1_len > 0 && v0_len > 0 {
                return Err("vec0 and vec1 > 0 ");
            } else {
                return Err("vec0 and vec1 == 0 ");
            }
        };

        {
            // Install the freshly allocated table into slot 1 under the write
            // lock; readers can reach it as soon as the lock drops.
            let new_vec_box: Box<[Vecta<T>]> = (0..new_len)
                .map(|_| Mutex::new((ONPLACE, vec![])))
                .collect();
            let mut w_rlock = shard.2.write().unwrap();
            if w_rlock[1].len() == 0 {
                w_rlock[1] = new_vec_box;
            } else {
                return Err("vec 1 is resizes in this moment");
            }
        }
        {
            // Migrate bucket by bucket while holding only the read lock, so
            // concurrent operations on other buckets keep working.
            let r_lock = shard.2.read().unwrap();
            let v0_old = &r_lock[0];
            let v1_new = &r_lock[1];
            for x in v0_old.iter() {
                let mut temp_old_mutexer = x.lock().unwrap();
                if temp_old_mutexer.0 == WASMOVED {
                    return Err(
                        "Unknown error element that should not be moved, was moved before moving",
                    );
                }
                // Mark the source bucket migrated so lookups fall through to
                // the new table from now on.
                temp_old_mutexer.0 = WASMOVED;

                for old_elem in temp_old_mutexer.1.iter() {
                    let mut new_mutex_elem =
                        v1_new[old_elem.1 as usize % v1_new.len()].lock().unwrap();

                    new_mutex_elem.1.push(old_elem.clone());
                }
                // Release the migrated bucket's storage.
                if temp_old_mutexer.1.capacity() > 0 {
                    temp_old_mutexer.1 = Vec::with_capacity(0);
                }

                // NOTE(review): this always adds 0 — the bucket was just
                // replaced with an empty Vec above, so `len()` is 0 here. If
                // the intent was to count migrated elements, the add should
                // happen before the bucket is cleared — TODO confirm the
                // intended semantics of the `shard.0` counter.
                *locker_resizer = locker_resizer.wrapping_add(temp_old_mutexer.1.len() as u32);
            }
        }
        {
            // Swap the populated table back into slot 0 and leave slot 1
            // empty, restoring the steady-state layout.
            let mut w_lock = shard.2.write().unwrap();

            let old_1 = mem::replace(&mut w_lock[1], Vec::new().into_boxed_slice());
            w_lock[0] = old_1;
            #[cfg(test)]
            {
            }
        }

        Ok(true)
    }
255
    /// Locates the bucket for `key`, runs `operation` on it under the bucket
    /// lock, and applies the reported outcome to this handle's local
    /// insert/remove delta (possibly triggering a resize check afterwards).
    ///
    /// Tries table 0 first and falls back to table 1 so operations keep
    /// working while a resize is migrating buckets between the tables.
    fn search_vec<F>(&mut self, key: u64, operation: F) -> Result<(), &'static str>
    where
        F: FnOnce(&mut Vec<ElemBox<T>>) -> Result<WhatIdo, &'static str>,
    {
        let mut is_edit = false;
        let mut shard_capasity = 0;
        // The key selects the shard; modulo the table length it then selects
        // the bucket inside that shard.
        let shard_index = key % self.data_arc.1.len() as u64;
        let shard = &self.data_arc.1[shard_index as usize];

        {
            let r_lock = shard.2.read().unwrap();
            for x in 0..2 {
                let vecta = &r_lock[x & 0b1];

                if 0 != vecta.len() {
                    let mutexer = &mut vecta[key as usize % vecta.len()].lock().unwrap();
                    // Only operate on live buckets; WASMOVED means this
                    // bucket's data migrated to the other table mid-resize.
                    if ONPLACE == mutexer.0 {
                        shard_capasity = vecta.len();

                        match operation(&mut mutexer.1)? {
                            WhatIdo::REMOVE => {
                                self.how_edit_elem_without_update[shard_index as usize] =
                                    self.how_edit_elem_without_update[shard_index as usize]
                                        .checked_sub(1)
                                        .ok_or("how_edit_elem_without_update sub is owerflow")?;
                                is_edit = true;
                            }
                            WhatIdo::INSERT => {
                                self.how_edit_elem_without_update[shard_index as usize] =
                                    self.how_edit_elem_without_update[shard_index as usize]
                                        .checked_add(1)
                                        .ok_or("how_edit_elem_without_update add is owerflow")?;
                                is_edit = true;
                            }
                            _ => {
                                // Pure read: nothing to count, done.
                                return Ok(());
                            }
                        };
                        break;
                    }
                }
            }
        }
        // An insert/remove may push the shard over (or under) its load
        // threshold; check only after the bucket lock has been released.
        if is_edit {
            return self.is_resize_shard(shard_index as usize, shard_capasity, false);
        }

        Err("access error, constant employment")
    }
306 fn is_resize_shard(
307 &mut self,
308 shard_index: usize,
309 shard_capasity: usize,
310 force_edit_global_counter: bool,
311 ) -> Result<(), &'static str> {
312 if let Some(new_len) = self.new_size_shard(
313 shard_index as usize,
314 shard_capasity,
315 force_edit_global_counter,
316 ) {
317 if self.usize_smaller_u64 && (new_len > (usize::MAX as u64)) {
318 return Err(
319 "if you see this error, it means that in your system usize::MAX < u64::MAX
320 and when increasing the size of the shard, a number from PRIME_NUMBERS_TO_TAB was requested that
321 is greater than usize::MAX. most likely, the usize on this device is 32 or 16 bits, which means that
322 you can put the orientation points in MamboMambo(redream_factor* (usize:: MAX/4))"
323 );
324 }
325
326 self.resize_shard(shard_index, new_len as usize).unwrap();
327 }
328 Ok(())
329 }
330
    /// Decides whether the shard should be resized, returning the new bucket
    /// count (a prime from [`PRIME_NUMBERS_TO_TAB`]) or `None` to keep it.
    ///
    /// Also flushes this handle's local insert/remove delta into the shard's
    /// shared element counter — but, unless `force_edit_global_counter` is
    /// set, only once the delta magnitude reaches
    /// `ELEM_NUMS_TO_READ_IN_MUTEX` (batching keeps counter-lock traffic low).
    fn new_size_shard(
        &mut self,
        shard_index: usize,
        shard_capasity: usize,
        force_edit_global_counter: bool,
    ) -> Option<u64> {
        let shard_i = &mut self.how_edit_elem_without_update[shard_index];

        if !force_edit_global_counter && shard_i.abs_diff(0) < ELEM_NUMS_TO_READ_IN_MUTEX {
            return None;
        }

        // Flush the signed local delta into the shared count and reset it;
        // `checked_add/sub` panic on counter corruption rather than wrap.
        let elems_in_me = {
            let mut mutexer = self.data_arc.1[shard_index].1.lock().unwrap();

            if *shard_i > 0 {
                *mutexer = mutexer.checked_add(shard_i.abs_diff(0) as usize).unwrap();
            } else {
                *mutexer = mutexer.checked_sub(shard_i.abs_diff(0) as usize).unwrap();
            }
            *shard_i = 0;
            *mutexer
        };
        // Index of the prime nearest to the current capacity.
        let index_my_prime = self.difer_index(shard_capasity);

        // Grow when the load factor exceeds the configured threshold.
        if elems_in_me as f32 / (shard_capasity as f32 + EPSILON) > self.data_arc.0 {
            if index_my_prime + 1 < PRIME_NUMBERS_TO_TAB.len() {
                Some(PRIME_NUMBERS_TO_TAB[index_my_prime + 1])
            } else {
                None
            }
        } else if index_my_prime > 0 {
            // Shrink only if the load at the next-smaller prime would still
            // sit under the threshold with SHRINK_THRESHOLD_FACTOR headroom.
            let t = PRIME_NUMBERS_TO_TAB[index_my_prime - 1];
            if self.data_arc.0 > (elems_in_me as f32 * SHRINK_THRESHOLD_FACTOR) / t as f32 {
                Some(PRIME_NUMBERS_TO_TAB[index_my_prime - 1])
            } else {
                None
            }
        } else {
            None
        }
    }
380 fn difer_index(&self, target: usize) -> usize {
382 let target = target as u64;
383
384 PRIME_NUMBERS_TO_TAB
385 .iter()
386 .enumerate()
387 .min_by_key(|&(_, &prime)| prime.abs_diff(target))
388 .map(|(index, _)| index)
389 .unwrap_or(0)
390 }
391 pub fn arc_clone(&self) -> Self {
396 Self {
397 data_arc: Arc::clone(&self.data_arc),
398 how_edit_elem_without_update: vec![0; self.data_arc.1.len()].into_boxed_slice(),
399 usize_smaller_u64: self.usize_smaller_u64,
400 }
401 }
402 pub fn insert(
409 &mut self,
410 key: u64,
411 elem: &T,
412 force_replace: bool,
413 ) -> Result<Option<T>, &'static str> {
414 let mut replaced_elem: Option<T> = None;
415 self.search_vec(key, |vecta| {
416 for x in vecta.iter_mut() {
417 if x.1 == key as u64 {
419 if force_replace {
421 replaced_elem = Some(x.0.clone());
422 *x = (elem.clone(), key);
423 }
424 return Ok(WhatIdo::READ);
426 }
427 }
428 vecta.push((elem.clone(), key as u64));
429
430 Ok(WhatIdo::INSERT)
431 })?; Ok(replaced_elem)
434 }
435 pub fn remove(&mut self, key: u64) -> Result<Option<T>, &'static str> {
439 let mut ret_after_remove: Option<T> = None;
440
441 self.search_vec(key, |vecta| {
442 for x in 0..vecta.len() {
443 if vecta[x].1 == key as u64 {
444 let last = vecta.last();
445
446 if last.is_none() {
447 return Ok(WhatIdo::READ);
448 }
449 let last = last.unwrap();
450
451 ret_after_remove = Some(vecta[x].0.clone());
452 vecta[x] = last.clone();
453 vecta.pop();
454 return Ok(WhatIdo::REMOVE);
455 }
456 }
457
458 Ok(WhatIdo::READ)
459 })?;
460 Ok(ret_after_remove)
461 }
462 pub fn read<RFy>(&mut self, key: u64, ridler: RFy) -> Result<(), &'static str>
475 where
476 RFy: FnOnce(Option<&mut T>) -> Result<(), &'static str>,
477 {
478 self.search_vec(key, |vecta| {
479 for (t, hahs) in vecta.iter_mut() {
480 if *hahs == key as u64 {
481 ridler(Some(t))?;
482 return Ok(WhatIdo::READ);
483 }
484 }
485 ridler(None)?;
486 return Ok(WhatIdo::READ);
487 })
488 }
489 pub fn elems_im_me(&self) -> Result<usize, &'static str> {
499 let mut elems = 0_usize;
500 for x in self.data_arc.1.iter() {
501 let t = x.1.lock().unwrap();
502 elems = elems
503 .checked_add(*t)
504 .ok_or("err elems.checked_add(elems in shard )")?;
505 }
506 Ok(elems)
507 }
508}
509
510impl<T: Clone> Drop for Mambo<T> {
511 fn drop(&mut self) {
518 let mut ive = vec![0usize; 0];
519
520 for (shard, shard_i) in self
521 .data_arc
522 .1
523 .iter()
524 .zip(self.how_edit_elem_without_update.iter())
525 {
526 let mut mutexer = shard.1.lock().unwrap();
527 let shard_capasity = {
528 let rwr = shard.2.read().unwrap();
529 let len_r1 = rwr[1].len();
530 if 0 != len_r1 {
531 len_r1
532 } else {
533 rwr[0].len()
534 }
535 };
536
537 if *shard_i > 0 {
538 *mutexer = mutexer.checked_add(shard_i.abs_diff(0) as usize).unwrap();
539 } else {
541 *mutexer = mutexer.checked_sub(shard_i.abs_diff(0) as usize).unwrap();
542 }
543 ive.push(shard_capasity);
545 }
546
547 for (shard_index, &shard_capasity) in ive.iter().enumerate() {
548 self.is_resize_shard(shard_index as usize, shard_capasity, true)
549 .unwrap();
550 }
552 }
553}
554
555#[cfg(test)]
556mod tests_n {
557
558 use super::*;
559
560 use std::thread;
563 use std::time::Instant;
564
565 use std::u64;
566
    #[test]
    fn based_insert_test() {
        // Single-threaded smoke test: insert/overwrite, read-modify, remove,
        // checking the (lag-tolerant) global element count along the way.
        let shards = 2;
        let mut mambo = Mambo::<u32>::new(shards, 5.0).unwrap();
        println!("{}", mambo.data_arc.1.len());

        for x in 0..30_000 {
            if x % 10 == 9 {
                let elems = mambo.elems_im_me().unwrap();

                // The global count may lag by at most one unflushed batch of
                // local deltas per shard.
                assert!(
                    elems.abs_diff(x) <= ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards,
                    "elems.abs_diff(x):{} {}",
                    elems.abs_diff(x),
                    ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards
                );

            }
            let x = x as u32;
            // Fresh key -> inserted, no previous value.
            assert_eq!(mambo.insert(x as u64, &x, false), Ok(None));
            // Forced replace of an existing key returns the previous value.
            assert_eq!(mambo.insert(x as u64, &x, true), Ok(Some(x)));
            // Non-forced insert on an existing key is a no-op.
            assert_eq!(mambo.insert(x as u64, &x, false), Ok(None));
        }

        for x in 0..30_000 {
            // Present key: verify the value, then mutate it in place.
            assert_eq!(
                mambo.read(x, |el| {
                    let el = el.unwrap();

                    assert_eq!(*el, x as u32);
                    *el += 1;
                    Ok(())
                }),
                Ok(())
            );

            // Absent key: the closure must receive None.
            assert_eq!(
                mambo.read(x + 1_000_000, |el| {
                    assert_eq!(el, None);
                    Ok(())
                }),
                Ok(())
            );
        }

        for x in 0..30_000 {
            let inv_x = 30_000 - x;
            let elems = mambo.elems_im_me().unwrap();
            assert!(
                elems.abs_diff(inv_x) <= ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards,
                "elems.abs_diff(x):{} {}",
                elems.abs_diff(inv_x),
                ELEM_NUMS_TO_READ_IN_MUTEX as usize * shards
            );
            let x = x as u64;
            // Values were incremented by the read pass above, hence `+ 1`.
            assert_eq!(mambo.remove(x as u64), Ok(Some(x as u32 + 1)));
        }
        return;
    }
630
    #[test]
    fn based_example() {
        // Usage example: several detached threads, each working a disjoint
        // key range through its own `arc_clone` handle.
        {
            let shards = 16;
            let elems_in_mutex = 7.0;
            const NUM_THREADS: usize = 10;
            const OPS_PER_THREAD: usize = 100;
            let mambo = Mambo::<String>::new(shards, elems_in_mutex).unwrap();

            for tt in 1..NUM_THREADS {
                let mut mambo_arc = mambo.arc_clone();

                // NOTE(review): the JoinHandle is discarded, so the test may
                // finish before these threads do; panics inside them could go
                // unnoticed. Consider joining — TODO confirm intent.
                let _ = thread::spawn(move || {
                    for key in 0..OPS_PER_THREAD {
                        // Per-thread key space: threads never share a key.
                        let key = tt + (key * OPS_PER_THREAD * 10);

                        let elem_me = format!("mambo elem{}", key);

                        mambo_arc.insert(key as u64, &elem_me, false).unwrap();

                        // Repeat non-forced insert is a no-op on a live key.
                        assert_eq!(mambo_arc.insert(key as u64, &elem_me, false), Ok(None));

                        mambo_arc
                            .read(key as u64, |ind| {
                                let ind = ind.unwrap();

                                assert_eq!(
                                    ind.clone(),
                                    elem_me,
                                    " non eq read key: {} rea: {} in map: {}",
                                    key,
                                    elem_me,
                                    ind.clone()
                                );

                                Ok(())
                            })
                            .unwrap();
                    }

                    // Remove everything this thread inserted, checking values.
                    for key in 0..OPS_PER_THREAD {
                        let key = tt + (key * OPS_PER_THREAD * 10);
                        let elem_me = format!("mambo elem{}", key);

                        assert_eq!(mambo_arc.remove(key as u64), Ok(Some(elem_me.clone())));
                    }
                });
            }
        }
    }
681
    #[test]
    fn based_threars_test() {
        // Stress test: threads hammer insert/remove on disjoint key ranges,
        // synchronized by a barrier so they start together.
        {
            const NUM_THREADS: usize = 10;
            const OPS_PER_THREAD: usize = 1000;
            const TEST_ELEMES: usize = 1000;
            let mambo = Mambo::<u64>::new(16, 5.0).unwrap();

            // Deterministic scrambler used to derive per-thread workloads.
            fn pair(u: usize) -> usize {
                let mut u: usize = u as usize;
                for m in 0..12 {
                    u ^= u.rotate_left(m * 1).wrapping_add(u.rotate_right(3 * m))
                        ^ PRIME_NUMBERS_TO_TAB[10 + m as usize] as usize;
                }
                u as usize
            }

            let mut std_handles = Vec::new();
            let std_barrier = Arc::new(std::sync::Barrier::new(NUM_THREADS + 1));

            for tt in 1..NUM_THREADS + 1 {
                let barrier_clone = Arc::clone(&std_barrier);

                let mut ra_clone = mambo.arc_clone();

                let handle = thread::spawn(move || {
                    barrier_clone.wait();

                    // NOTE(review): `tt % 4` is always < 4, so this reader
                    // branch is dead code — TODO confirm whether `== 100`
                    // was meant to be a real modulus value.
                    if tt % 4 == 100 {
                        for _ in 0..OPS_PER_THREAD {
                            for o in 0..TEST_ELEMES {
                                let pai = pair(o);
                                ra_clone
                                    .read(o as u64, |ind| {
                                        let ind = ind.unwrap();
                                        assert_eq!(
                                            *ind,
                                            pai as u64,
                                            " based_threars_test non eq read index: {} rea: {} in map: {}",o,pai as u32,*ind
                                        );
                                        Ok(())
                                    })
                                    .unwrap();
                            }
                        }
                    } else {
                        // Workload size derived deterministically from tt.
                        let elem_read_write = pair(pair(tt)) % TEST_ELEMES;

                        let iterations = pair(pair(elem_read_write)) % OPS_PER_THREAD;

                        for _ in 0..iterations {
                            // Insert a batch of thread-unique keys...
                            for yy in 0..elem_read_write {
                                let index = tt + (NUM_THREADS * 10_000_000 * yy);
                                let yy = yy as u64;
                                ra_clone.insert(index as u64, &yy, false).unwrap();
                            }
                            // ...then remove the same batch.
                            for yy in 0..elem_read_write {
                                let index = tt + (NUM_THREADS * 10_000_000 * yy);
                                let ga = ra_clone.remove(index as u64);

                                // Debug dump path; only reachable when
                                // TEST_ELEMES is configured small (< 200).
                                if ga.is_err() && 200 > TEST_ELEMES {
                                    println!("index {}", index);

                                    if ra_clone.data_arc.1.len() == 1 {
                                        for inn in 0..(TEST_ELEMES as f32 * (1.0 / 0.6)) as usize {
                                            let _ = ra_clone.read(inn as u64, |xx| {
                                                let xx = xx.unwrap();
                                                print!("| {} ", xx);
                                                Ok(())
                                            });
                                            if inn % 4 == 3 {
                                                println!()
                                            }
                                        }
                                    } else {
                                        panic!("ra_clone.shards.len()==1else");
                                    }
                                }
                            }
                        }
                    }
                });

                std_handles.push(handle);
            }

            std_barrier.wait();

            for handle in std_handles {
                handle.join().unwrap();
            }
        }
    }
781
    #[test]
    fn read_write() {
        // Read-mostly throughput benchmark over a fixed key set, run with an
        // increasing number of threads; prints Mop/s per configuration.
        for tre in (1..20).step_by(1) {
            let num_treads: usize = tre;
            const NUM_ELEMS: usize = 80;
            const TOTAL_OPS: u64 = 500_000;
            let ops_threads: u64 = TOTAL_OPS / num_treads as u64;
            {
                let mut mambo = Mambo::<u64>::new(16, 10.0).unwrap();
                let mut std_handles = Vec::new();
                let std_start = Instant::now();
                let std_barrier = Arc::new(std::sync::Barrier::new(num_treads + 1));

                // Pre-populate the fixed key range all threads will hit.
                for i in 0..NUM_ELEMS {
                    let t = 0;
                    mambo.insert(i as u64, &t, false).unwrap();
                }

                for _ in 0..num_treads {
                    let barrier_clone = Arc::clone(&std_barrier);

                    let mut ra_clone = mambo.arc_clone();

                    let handle = thread::spawn(move || {
                        barrier_clone.wait();

                        for i in 0..ops_threads {
                            // Cheap pseudo-random key scatter.
                            let mut od: u64 = i as u64;
                            for _ in 0..3 {
                                od = od.rotate_left(43).wrapping_add(!i as u64);
                            }
                            let _ = ra_clone
                                .read(od % NUM_ELEMS as u64, |x| {
                                    let x = x.unwrap();
                                    *x += 1;
                                    Ok(())
                                })
                                .unwrap();
                        }
                    });

                    std_handles.push(handle);
                }

                std_barrier.wait();

                for handle in std_handles {
                    handle.join().unwrap();
                }
                // Toggle between human-readable and plot-friendly output.
                if true {
                    println!(
                        "{}Mop/S: {:.3}",
                        "only read ",
                        TOTAL_OPS as f64 / std_start.elapsed().as_micros() as f64
                    );
                } else {
                    println!(
                        "[{}, {:.3}]",
                        tre,
                        TOTAL_OPS as f64 / std_start.elapsed().as_micros() as f64
                    );
                }
            }
        }
    }
849
    #[test]
    fn read_write_ers() {
        // Insert-heavy benchmark on a single shard; each outer iteration
        // creates and drops a fresh `arc_clone` handle, exercising the
        // Drop-time counter flush/resize path. Prints Mop/s.
        let mambo = Mambo::<u64>::new(1, 10.0).unwrap();
        let std_start = Instant::now();
        let elem_in_cycle = 10u64;
        let cycles = 200_00u64;
        for xx in (1..cycles).step_by(1) {
            let mut clom = mambo.arc_clone();
            for yy in 0..elem_in_cycle {
                let t = 0;
                // Keys are spread so no two iterations collide.
                clom.insert((yy * cycles * 100) + xx, &t, false).unwrap();
            }
        }

        println!(
            "{}Mop/S: {:.4}",
            "only read ",
            (elem_in_cycle * cycles) as f64 / std_start.elapsed().as_micros() as f64
        );

    }
873}