1use std::
13{
14 collections::VecDeque,
15 ops::{Deref, DerefMut},
16 ptr::{self, NonNull},
17 sync::atomic::{AtomicU64, Ordering},
18 fmt,
19};
20
/// Errors produced by the `RwBuffer*` family of types.
///
/// NOTE(review): "Agian" in the `*TryAgianLater` variants is a typo for
/// "Again"; the public variant names are kept as-is so callers do not break.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    /// Soft limit on concurrent read references was exceeded.
    TooManyRead,
    /// Soft limit on base (pool-level) references was exceeded.
    TooManyBase,
    /// Shared access unavailable right now (a writer is active).
    ReadTryAgianLater,
    /// Exclusive access unavailable right now (readers are active).
    WriteTryAgianLater,
    /// Bounded pool exhausted: no free buffer and the count limit is reached.
    OutOfBuffers,
    /// Exclusive-to-shared downgrade failed (write flag was not set).
    DowngradeFailed,
    /// Caller-supplied arguments were rejected (see `RwBuffers::new`).
    InvalidArguments
}
32
33impl fmt::Display for RwBufferError
34{
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
36 {
37 match self
38 {
39 RwBufferError::TooManyRead =>
40 write!(f, "TooManyRead: read soft limit reached"),
41 RwBufferError::TooManyBase =>
42 write!(f, "TooManyBase: base soft limit reached"),
43 RwBufferError::ReadTryAgianLater =>
44 write!(f, "ReadTryAgianLater: shared access not available, try again later"),
45 RwBufferError::WriteTryAgianLater =>
46 write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
47 RwBufferError::OutOfBuffers =>
48 write!(f, "OutOfBuffers: no more free bufers are left"),
49 RwBufferError::DowngradeFailed =>
50 write!(f, "DowngradeFailed: can not downgrade exclusive to shared, race condition"),
51 RwBufferError::InvalidArguments =>
52 write!(f, "InvalidArguments: arguments are not valid"),
53 }
54 }
55}
56
/// Convenience result alias used throughout this module.
pub type RwBufferRes<T> = Result<T, RwBufferError>;
58
/// Shared (read-only) handle to a pooled buffer.
///
/// Each live `RBuffer` accounts for one `read` count in the packed flags of
/// the shared `RwBufferInner`; the count is released in `Drop`.
#[derive(Debug, PartialEq, Eq)]
pub struct RBuffer(NonNull<RwBufferInner>);
63
// SAFETY: the pointee is heap-allocated, reachable only through counted
// handles, and RBuffer exposes only shared (&) access to the bytes.
// NOTE(review): flag updates elsewhere use non-atomic load/modify/store
// sequences on the shared AtomicU64 — confirm these impls are sound under
// contention before relying on them heavily.
unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}
66
impl RBuffer
{
    /// Wraps the shared state. The caller must already have added the read
    /// reference this handle represents.
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self(inner);
    }

    /// Test-only snapshot of the packed flag word.
    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags
    {
        let inner = unsafe{ self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    /// Byte-slice view of the buffer contents.
    ///
    /// The inner `Option` is always `Some` while any handle exists (it is
    /// only taken in `try_inner`, which consumes the handle).
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.0.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }

    /// Attempts to take ownership of the underlying `Vec<u8>`.
    ///
    /// Succeeds only when this is the very last reference of any kind
    /// (exactly one reader — us —, no writer, no base handles). The `Vec` is
    /// taken out first; the subsequent `drop(self)` then sees `read == 0 &&
    /// base == 0` and frees the inner state. This ordering is load-bearing.
    ///
    /// Returns the handle back as `Err` when other references still exist.
    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        if flags.read == 1 && flags.write == false && flags.base == 0
        {
            let inner = unsafe { self.0.as_mut() };

            let buf = inner.buf.take().unwrap();

            drop(self);

            return Ok(buf);
        }
        else
        {
            return Err(self);
        }
    }

    /// Shared view of the heap-allocated inner state.
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }
}
137
138impl Deref for RBuffer
139{
140 type Target = Vec<u8>;
141
142 fn deref(&self) -> &Vec<u8>
143 {
144 let inner = self.inner();
145
146 return inner.buf.as_ref().unwrap();
147 }
148}
149
impl Clone for RBuffer
{
    /// Adds another read reference to the same buffer.
    ///
    /// # Panics
    /// Panics when the read soft limit is exceeded. Also panics (via the
    /// `unwrap`) if the write flag is observed set — presumably impossible
    /// while an `RBuffer` exists, but TODO confirm for the downgrade path.
    ///
    /// NOTE(review): load → modify → store is not one atomic update; a
    /// concurrent clone/drop on another handle of the same buffer can lose
    /// an increment. A compare_exchange loop would close this window.
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // `read()` increments the count and reports whether the soft limit
        // still holds.
        if flags.read().unwrap() == false
        {
            panic!("too many read references for RBuffer");
        }

        inner.flags.store(flags.into(), Ordering::Release);

        return Self(self.0);
    }
}
178
179impl Drop for RBuffer
180{
181 fn drop(&mut self)
184 {
185 let inner = self.inner();
186
187 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
188
189 flags.unread();
190
191 if flags.read == 0 && flags.base == 0
193 {
194 unsafe { ptr::drop_in_place(self.0.as_ptr()) };
196
197 return;
198 }
199
200 inner.flags.store(flags.into(), Ordering::Release);
201
202 return;
203 }
204}
205
/// Exclusive (writable) handle to a pooled buffer; its existence is tracked
/// by the `write` flag in the shared state.
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    // Shared inner state this writer points at.
    buf: NonNull<RwBufferInner>,

    // Set by `downgrade()` so Drop does not clear the write flag again —
    // ownership of the access count has moved to the returned RBuffer.
    downgraded: bool
}
218
// SAFETY: a WBuffer is the unique writer, so moving it to another thread is
// sound. It is deliberately NOT Sync, since it hands out &mut access.
unsafe impl Send for WBuffer{}
220
impl WBuffer
{
    /// Wraps the shared state. The caller must already have set the `write`
    /// flag on it.
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

    /// Converts this exclusive handle into a shared `RBuffer`.
    ///
    /// On success the write flag is cleared and a read reference takes its
    /// place; `downgraded` is set so our `Drop` skips the flag clean-up.
    ///
    /// # Errors
    /// `DowngradeFailed` when the write flag was not set.
    ///
    /// NOTE(review): the load → modify → store on the flag word is not one
    /// atomic step; updates from other handles can interleave with it.
    pub
    fn downgrade(mut self) -> RwBufferRes<RBuffer>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        let res = flags.downgrade();

        inner.flags.store(flags.into(), Ordering::Release);

        if res == true
        {
            self.downgraded = true;

            return Ok(RBuffer::new(self.buf.clone()));
        }
        else
        {
            return Err(RwBufferError::DowngradeFailed);
        }
    }

    /// Byte-slice view of the buffer contents (always `Some` while a handle
    /// exists; deref-coerced from `&Vec<u8>`).
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap()
    }
}
275
276impl Deref for WBuffer
277{
278 type Target = Vec<u8>;
279
280 fn deref(&self) -> &Vec<u8>
281 {
282 let inner = unsafe { self.buf.as_ref() };
283
284 return inner.buf.as_ref().unwrap();
285 }
286}
287
288impl DerefMut for WBuffer
289{
290 fn deref_mut(&mut self) -> &mut Vec<u8>
291 {
292 let inner = unsafe { self.buf.as_mut() };
293
294 return inner.buf.as_mut().unwrap();
295 }
296}
297
298impl Drop for WBuffer
299{
300 fn drop(&mut self)
302 {
303 if self.downgraded == true
304 {
305 return;
306 }
307
308 let inner = unsafe { self.buf.as_ref() };
309
310 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
311
312 flags.unwrite();
313
314 if flags.read == 0 && flags.base == 0
315 {
316 unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
318
319 return;
320 }
321
322 inner.flags.store(flags.into(), Ordering::Release);
323
324 return;
325 }
326}
327
/// Reference-count / state flags packed into a single `u64` via the `From`
/// conversions below and stored in `RwBufferInner::flags`.
///
/// NOTE(review): this struct is not `#[repr(C)]`, so the byte layout the
/// transmutes rely on is the compiler's default (unspecified) layout. The
/// fields happen to pack into exactly 8 bytes today; confirm with a
/// `size_of` assertion before changing fields or toolchains.
#[repr(align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags
{
    // read: active shared references; write: exclusive holder present;
    // base: pool-level (RwBuffer) handles; unused0: padding byte.
    read: u32, write: bool, base: u16, unused0: u8 }
348
impl From<u64> for RwBufferFlags
{
    /// Reinterprets a packed atomic value as flags.
    ///
    /// SAFETY(review): sound only while `RwBufferFlags` is exactly 8 bytes
    /// and the `write` byte of `value` is 0 or 1 (any other byte pattern
    /// transmuted into `bool` is UB). Both hold as long as every stored
    /// value originated from a real `RwBufferFlags` — TODO confirm with a
    /// compile-time size assertion.
    fn from(value: u64) -> Self
    {
        return unsafe { std::mem::transmute(value) };
    }
}
356
impl From<RwBufferFlags> for u64
{
    /// Packs the flags into the representation stored in the `AtomicU64`.
    ///
    /// SAFETY(review): relies on `RwBufferFlags` being exactly 8 bytes under
    /// the compiler's default (unspecified) layout — see the struct note.
    fn from(value: RwBufferFlags) -> Self
    {
        return unsafe { std::mem::transmute(value) };
    }
}
364
365impl Default for RwBufferFlags
366{
367 fn default() -> Self
368 {
369 return
370 Self
371 {
372 read: 0,
373 write: false,
374 base: 1,
375 unused0: 0,
376 };
377 }
378}
379
380impl RwBufferFlags
381{
382 pub const MAX_READ_REFS: u32 = u32::MAX - 2;
384
385 pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
387
388 #[inline]
389 fn base(&mut self) -> bool
390 {
391 self.base += 1;
392
393 return self.base <= Self::MAX_BASE_REFS;
394 }
395
396 #[inline]
397 fn unbase(&mut self) -> bool
398 {
399 self.base -= 1;
400
401 return self.base != 0;
402 }
403
404 #[inline]
405 fn unread(&mut self)
406 {
407 self.read -= 1;
408 }
409
410 #[inline]
411 fn downgrade(&mut self) -> bool
412 {
413 if self.write == true
414 {
415 self.write = false;
416 self.read += 1;
417
418 return true;
419 }
420 else
421 {
422 return false;
423 }
424 }
425
426 #[inline]
427 fn read(&mut self) -> RwBufferRes<bool>
428 {
429 if self.write == false
430 {
431 self.read += 1;
432
433 return Ok(self.read <= Self::MAX_READ_REFS);
434 }
435
436 return Err(RwBufferError::ReadTryAgianLater);
437 }
438
439 #[inline]
440 fn write(&mut self) -> RwBufferRes<()>
441 {
442 if self.read == 0
443 {
444 self.write = true;
445
446 return Ok(());
447 }
448 else
449 {
450 return Err(RwBufferError::WriteTryAgianLater);
451 }
452 }
453
454 #[inline]
455 fn unwrite(&mut self)
456 {
457 self.write = false;
458 }
459}
460
/// Heap-allocated shared state behind every handle type (`RwBuffer`,
/// `RBuffer`, `WBuffer`).
#[derive(Debug)]
pub struct RwBufferInner
{
    // Packed `RwBufferFlags` word; see the `From` conversions.
    flags: AtomicU64,

    // The actual storage. `None` only transiently, after `RBuffer::try_inner`
    // steals the Vec right before the final drop.
    buf: Option<Vec<u8>>,
}
470
471impl RwBufferInner
472{
473 fn new(buf_size: usize) -> Self
474 {
475 return
476 Self
477 {
478 flags: AtomicU64::new(RwBufferFlags::default().into()),
479 buf: Some(vec![0_u8; buf_size])
480 };
481 }
482}
483
/// Pool-level handle to a buffer; each one owns a `base` reference count.
/// Read (`RBuffer`) and write (`WBuffer`) handles are created from it.
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);
492
// SAFETY: the pointee is heap-allocated and only mutated through the flag
// word / the exclusive WBuffer; RwBuffer itself exposes no direct &mut.
// NOTE(review): some flag updates are non-atomic load/modify/store —
// confirm soundness under contention.
unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}
495
496impl RwBuffer
497{
498 #[inline]
499 fn new(buf_size: usize) -> Self
500 {
501 let status = Box::new(RwBufferInner::new(buf_size));
502
503 return Self(Box::leak(status).into());
504 }
505
506 #[inline]
507 fn inner(&self) -> &RwBufferInner
508 {
509 return unsafe { self.0.as_ref() };
510 }
511
512 #[inline]
526 pub
527 fn is_free(&self) -> bool
528 {
529 let inner = self.inner();
530
531 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
532
533 return flags.write == false && flags.read == 0 && flags.base == 1;
534 }
535
536 #[inline]
550 pub
551 fn accure_if_free(&self) -> bool
552 {
553 let inner = self.inner();
554
555 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
556
557 let res =
558 if flags.write == false && flags.read == 0 && flags.base == 1
559 {
560 let _ = flags.base();
561
562 true
563 }
564 else
565 {
566 false
567 };
568
569 inner.flags.store(flags.into(), Ordering::Release);
570
571 return res;
572 }
573
574 pub
585 fn write(&self) -> RwBufferRes<WBuffer>
586 {
587 let inner = self.inner();
588
589 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
590
591 let res = flags.write();
592
593 inner.flags.store(flags.into(), Ordering::Release);
594
595 res?;
596
597 return Ok(WBuffer::new(self.0.clone()));
598 }
599
600 pub
616 fn read(&self) -> RwBufferRes<RBuffer>
617 {
618 let inner = self.inner();
619
620 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
621
622 let res = flags.read();
623
624 inner.flags.store(flags.into(), Ordering::Release);
625
626 if res? == false
627 {
628 return Err(RwBufferError::TooManyRead);
629 }
630
631 return Ok(RBuffer::new(self.0.clone()));
632 }
633
634 #[cfg(test)]
635 fn get_flags(&self) -> RwBufferFlags
636 {
637 let inner = self.inner();
638
639 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
640
641 return flags;
642 }
643}
644
645impl Clone for RwBuffer
646{
647 fn clone(&self) -> Self
651 {
652 let inner = self.inner();
653
654 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
655
656 if flags.base() == false
657 {
658 panic!("too many base references for RBuffer");
659 }
660
661 inner.flags.store(flags.into(), Ordering::Release);
662
663 return Self(self.0.clone());
664 }
665}
666
667impl Drop for RwBuffer
668{
669 fn drop(&mut self)
674 {
675 let inner = self.inner();
676
677 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
678
679 let unbased = flags.unbase();
680
681 if flags.read == 0 && flags.write == false && unbased == false
682 {
683 unsafe { ptr::drop_in_place(self.0.as_ptr()) };
685
686 return;
687 }
688
689 inner.flags.store(flags.into(), Ordering::Release);
690 }
691}
692
/// A simple pool of reusable `RwBuffer`s.
#[derive(Debug)]
pub struct RwBuffers
{
    // Size in bytes of every buffer this pool creates.
    buf_len: usize,

    // Maximum number of pooled buffers; 0 means unbounded.
    bufs_cnt_lim: usize,

    buffs: VecDeque<RwBuffer>

}
709
710impl RwBuffers
711{
712 pub
736 fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
737 {
738 if pre_init_cnt > bufs_cnt_lim
739 {
740 return Err(RwBufferError::InvalidArguments);
741 }
742 else if buf_len == 0
743 {
744 return Err(RwBufferError::InvalidArguments);
745 }
746
747 let buffs: VecDeque<RwBuffer> =
748 if pre_init_cnt > 0
749 {
750 let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
751
752 for _ in 0..pre_init_cnt
753 {
754 buffs.push_back(RwBuffer::new(buf_len));
755 }
756
757 buffs
758 }
759 else
760 {
761 VecDeque::with_capacity(bufs_cnt_lim)
762 };
763
764 return Ok(
765 Self
766 {
767 buf_len: buf_len,
768 bufs_cnt_lim: bufs_cnt_lim,
769 buffs: buffs,
770 }
771 )
772 }
773
774 pub
787 fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
788 {
789 let mut buffs = VecDeque::with_capacity(pre_init_cnt);
790
791 for _ in 0..pre_init_cnt
792 {
793 buffs.push_back(RwBuffer::new(buf_len));
794 }
795
796 return
797 Self
798 {
799 buf_len: buf_len,
800 bufs_cnt_lim: 0,
801 buffs: buffs,
802 };
803 }
804
805 pub
817 fn allocate(&mut self) -> RwBufferRes<RwBuffer>
818 {
819 for buf in self.buffs.iter()
821 {
822 if buf.is_free() == true
823 {
824 return Ok(buf.clone());
825 }
826 }
827
828 if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
829 {
830 let buf = RwBuffer::new(self.buf_len);
831 let c_buf = buf.clone();
832
833 self.buffs.push_back(buf);
834
835 return Ok(c_buf);
836 }
837
838 return Err(RwBufferError::OutOfBuffers);
839 }
840
841 pub
849 fn allocate_in_place(&mut self) -> RwBuffer
850 {
851 for i in 0..self.buffs.len()
852 {
853 if self.buffs[i].accure_if_free() == true
854 {
855 return self.buffs.remove(i).unwrap();
856 }
857 }
858
859 let buf = RwBuffer::new(self.buf_len);
860
861 return buf;
862 }
863
864 pub
877 fn compact(&mut self, mut cnt: usize) -> usize
878 {
879 let p_cnt = cnt;
880
881 self
882 .buffs
883 .retain(
884 |buf|
885 {
886 if buf.is_free() == true
887 {
888 cnt -= 1;
889
890 return false;
891 }
892
893 return true;
894 }
895 );
896
897 return p_cnt - cnt;
898 }
899
900 #[cfg(test)]
901 fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags>
902 {
903 return Some(self.buffs.get(index)?.get_flags());
904 }
905}
906
907
908#[cfg(test)]
909mod tests
910{
911 use std::time::{Duration, Instant};
912
913 use super::*;
914
    // Basic allocate / mutual-exclusion checks, plus the expected packed
    // counts after cloning a reader: pool + buf0 + buf0_1 => base == 3.
    #[test]
    fn simple_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        // A writer blocks readers.
        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
        drop(buf0_w);

        // A reader blocks writers.
        let buf0_r = buf0.read();
        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));

        let buf0_1 = buf0.clone();
        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));

        let flags0 = buf0.get_flags();
        let flags0_1 = buf0_1.get_flags();

        assert_eq!(flags0, flags0_1);
        assert_eq!(flags0.base, 3);
        assert_eq!(flags0.read, 1);
        assert_eq!(flags0.write, false);
    }
945
    // Dropping the pool-level handle while a writer is alive must keep the
    // buffer (write flag set), and dropping the writer must clear the flag.
    #[test]
    fn simple_test_dopped_in_place()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        // Pool handle only (base == 1), writer still active.
        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, true);

        drop(buf0_w.unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, false);

    }
987
    // Downgrading a writer after the pool-level clone was dropped must leave
    // exactly one reader and the pool's single base reference.
    #[test]
    fn simple_test_dropped_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

    }
1020
    // allocate_in_place removes the buffer from the pool entirely: after the
    // pool handle is gone the flags must show base == 0 with one reader.
    #[test]
    fn simple_test_drop_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        // The pool must no longer hold this buffer.
        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);
    }
1051
    // Smoke/latency probe: prints wall-clock cost of allocate_in_place,
    // write and read acquisition; correctness asserts are the same mutual-
    // exclusion checks as in simple_test.
    #[test]
    fn timing_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_res = bufs.allocate_in_place();
            let end = inst.elapsed();

            println!("alloc: {:?}", end);
            drop(buf0_res);
        }

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_w = buf0.write();
            let end = inst.elapsed();

            println!("write: {:?}", end);

            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
            drop(buf0_w);
        }

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_r = buf0.read();
            let end = inst.elapsed();

            println!("read: {:?}", end);

            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
            drop(buf0_r);
        }
    }
1098
    // Two OS threads each hold a reader; the counts must show two readers
    // while they run and zero after both threads are joined.
    #[test]
    fn simple_test_mth()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();

        let join1=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf0_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let buf1_rd = buf0.read().unwrap();

        let join2=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf1_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        // Both readers alive on the worker threads.
        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 2);
        assert_eq!(flags.write, false);

        let _ = join1.join();
        let _ = join2.join();

        // Readers dropped inside the threads.
        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }
1147
    // try_inner must succeed (and hand back the Vec) once the reader is the
    // sole remaining reference: read == 1, base == 0, no writer.
    #[test]
    fn test_try_into_read()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

        let inst = Instant::now();
        let ve = buf0_rd.try_inner();
        let end = inst.elapsed();

        println!("try inner: {:?}", end);
        assert_eq!(ve.is_ok(), true);


    }
1187
    // Async variant: bytes written through the WBuffer must be visible to
    // readers handed to two tokio tasks after the writer is downgraded.
    #[tokio::test]
    async fn test_multithreading()
    {

        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let mut buf0_write = buf0.write().unwrap();

        buf0_write.as_mut_slice()[0] = 5;
        buf0_write.as_mut_slice()[1] = 4;

        println!("{}", buf0_write[0]);

        let buf0_r = buf0_write.downgrade().unwrap();

        let join1=
            tokio::task::spawn(async move
                {
                    println!("thread[1]:{}", buf0_r[0]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let buf0_r = buf0.read().unwrap();

        drop(buf0);

        let join2=
            tokio::task::spawn(async move
                {
                    println!("thread[2]: {}", buf0_r[0]);
                    println!("thread[2]: {}", buf0_r[1]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let _ = join1.await;
        let _ = join2.await;

        return;
    }
1238}