1#![cfg_attr(not(feature = "std"), no_std)]
27
28#[cfg(not(feature = "std"))]
29extern crate alloc;
30
31#[cfg(not(feature = "std"))]
32use core::
33{
34 fmt,
35 mem,
36 ops::{Deref, DerefMut},
37 ptr::{self, NonNull},
38 sync::atomic::{AtomicU64, Ordering}
39};
40
41#[cfg(not(feature = "std"))]
42use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec::Vec};
43
44#[cfg(not(feature = "std"))]
45use alloc::vec;
46
47#[cfg(feature = "std")]
48use std::
49{
50 collections::VecDeque,
51 ops::{Deref, DerefMut},
52 ptr::{self, NonNull},
53 sync::atomic::{AtomicU64, Ordering},
54 fmt,
55 mem
56};
57
58
59
/// Errors produced by the buffer-pool API.
///
/// NOTE(review): the `Agian` spelling in the `*TryAgianLater` variants is a
/// typo, but the variants are public API and are kept as-is for
/// compatibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    /// Soft limit of concurrent read references was reached.
    TooManyRead,
    /// Soft limit of base (owner) references was reached.
    TooManyBase,
    /// A writer is active; shared access is temporarily unavailable.
    ReadTryAgianLater,
    /// Readers are active; exclusive access is temporarily unavailable.
    WriteTryAgianLater,
    /// The pool is at its limit and every buffer is busy.
    OutOfBuffers,
    /// Exclusive-to-shared downgrade failed (no writer flag was set).
    DowngradeFailed,
    /// Caller-supplied arguments are invalid.
    InvalidArguments
}
71
72impl fmt::Display for RwBufferError
73{
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
75 {
76 match self
77 {
78 RwBufferError::TooManyRead =>
79 write!(f, "TooManyRead: read soft limit reached"),
80 RwBufferError::TooManyBase =>
81 write!(f, "TooManyBase: base soft limit reached"),
82 RwBufferError::ReadTryAgianLater =>
83 write!(f, "ReadTryAgianLater: shared access not available, try again later"),
84 RwBufferError::WriteTryAgianLater =>
85 write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
86 RwBufferError::OutOfBuffers =>
87 write!(f, "OutOfBuffers: no more free bufers are left"),
88 RwBufferError::DowngradeFailed =>
89 write!(f, "DowngradeFailed: can not downgrade exclusive to shared, race condition"),
90 RwBufferError::InvalidArguments =>
91 write!(f, "InvalidArguments: arguments are not valid"),
92 }
93 }
94}
95
/// Convenience alias for results produced by this module.
pub type RwBufferRes<T> = Result<T, RwBufferError>;
97
/// A shared (read-only) handle to a pooled buffer.
///
/// Cloning adds a read reference; dropping removes one and releases the
/// shared inner state once no readers or base handles remain.
#[derive(Debug, PartialEq, Eq)]
pub struct RBuffer(NonNull<RwBufferInner>);

// SAFETY: NOTE(review) — all shared state sits behind an AtomicU64, but the
// flag updates elsewhere in this file are load/modify/store sequences rather
// than single atomic RMW operations; confirm the intended threading model
// before relying on these impls under contention.
unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}
105
106impl RBuffer
107{
108 #[inline]
109 fn new(inner: NonNull<RwBufferInner>) -> Self
110 {
111 return Self(inner);
112 }
113
114 #[cfg(test)]
115 fn get_flags(&self) -> RwBufferFlags
116 {
117 use core::sync::atomic::Ordering;
118
119 let inner = unsafe{ self.0.as_ref() };
120
121 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
122
123 return flags;
124 }
125
126 pub
128 fn as_slice(&self) -> &[u8]
129 {
130 let inner = unsafe { self.0.as_ref() };
131
132 return inner.buf.as_ref().unwrap().as_slice();
133 }
134
135 pub
151 fn try_inner(mut self) -> Result<Vec<u8>, Self>
152 {
153 let inner = unsafe { self.0.as_ref() };
154
155 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
156
157 if flags.read == 1 && flags.write == false && flags.base == 0
158 {
159 let inner = unsafe { self.0.as_mut() };
160
161 let buf = inner.buf.take().unwrap();
162
163 drop(self);
164
165 return Ok(buf);
166 }
167 else
168 {
169 return Err(self);
170 }
171 }
172
173 fn inner(&self) -> &RwBufferInner
174 {
175 return unsafe { self.0.as_ref() };
176 }
177}
178
179impl Deref for RBuffer
180{
181 type Target = Vec<u8>;
182
183 fn deref(&self) -> &Vec<u8>
184 {
185 let inner = self.inner();
186
187 return inner.buf.as_ref().unwrap();
188 }
189}
190
impl Clone for RBuffer
{
    /// Adds another shared reference to the same buffer.
    ///
    /// # Panics
    /// Panics when the read soft limit is exceeded, or (via the `unwrap`)
    /// if the write flag is somehow set — which should be impossible while
    /// an `RBuffer` exists, since a reader blocks writers.
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        // NOTE(review): load/modify/store is not a single atomic RMW;
        // concurrent clones could lose an increment. Confirm the intended
        // threading model.
        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        if flags.read().unwrap() == false
        {
            panic!("too many read references for RBuffer");
        }

        inner.flags.store(flags.into(), Ordering::Release);

        return Self(self.0);
    }
}
219
220impl Drop for RBuffer
221{
222 fn drop(&mut self)
225 {
226 let inner = self.inner();
227
228 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
229
230 flags.unread();
231
232 if flags.read == 0 && flags.base == 0
234 {
235 unsafe { ptr::drop_in_place(self.0.as_ptr()) };
237
238 return;
239 }
240
241 inner.flags.store(flags.into(), Ordering::Release);
242
243 return;
244 }
245}
246
/// An exclusive (writable) handle to a pooled buffer.
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    // Pointer to the shared inner state.
    buf: NonNull<RwBufferInner>,

    // Set by downgrade(): tells Drop that ownership already moved to an
    // RBuffer, so the write flag must not be cleared a second time.
    downgraded: bool
}

// SAFETY: NOTE(review) — synchronization relies on non-atomic
// load/modify/store over an AtomicU64; confirm the intended threading model.
unsafe impl Send for WBuffer{}
261
262impl WBuffer
263{
264 #[inline]
265 fn new(inner: NonNull<RwBufferInner>) -> Self
266 {
267 return Self{ buf: inner, downgraded: false };
268 }
269
270 pub
286 fn downgrade(mut self) -> RwBufferRes<RBuffer>
287 {
288 let inner = unsafe { self.buf.as_ref() };
289
290 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
291
292 let res = flags.downgrade();
293
294 inner.flags.store(flags.into(), Ordering::Release);
295
296 if res == true
297 {
298 self.downgraded = true;
299
300 return Ok(RBuffer::new(self.buf.clone()));
301 }
302 else
303 {
304 return Err(RwBufferError::DowngradeFailed);
305 }
306 }
307
308 pub
309 fn as_slice(&self) -> &[u8]
310 {
311 let inner = unsafe { self.buf.as_ref() };
312
313 return inner.buf.as_ref().unwrap()
314 }
315}
316
317impl Deref for WBuffer
318{
319 type Target = Vec<u8>;
320
321 fn deref(&self) -> &Vec<u8>
322 {
323 let inner = unsafe { self.buf.as_ref() };
324
325 return inner.buf.as_ref().unwrap();
326 }
327}
328
329impl DerefMut for WBuffer
330{
331 fn deref_mut(&mut self) -> &mut Vec<u8>
332 {
333 let inner = unsafe { self.buf.as_mut() };
334
335 return inner.buf.as_mut().unwrap();
336 }
337}
338
339impl Drop for WBuffer
340{
341 fn drop(&mut self)
343 {
344 if self.downgraded == true
345 {
346 return;
347 }
348
349 let inner = unsafe { self.buf.as_ref() };
350
351 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
352
353 flags.unwrite();
354
355 if flags.read == 0 && flags.base == 0
356 {
357 unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
359
360 return;
361 }
362
363 inner.flags.store(flags.into(), Ordering::Release);
364
365 return;
366 }
367}
368
/// Bit layout of the packed reference-counting word kept in
/// `RwBufferInner::flags`.
///
/// `repr(C)` pins the field order so the `u64` <-> struct transmutes in the
/// `From` impls below have a defined layout — plain `repr(Rust)` (as before)
/// leaves field ordering unspecified, making the transmute layout-undefined.
/// Fields are ordered by decreasing alignment so the struct is exactly
/// 8 bytes with no interior padding.
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags
{
    // Number of active shared (read) references.
    read: u32,

    // Number of active base (RwBuffer) references.
    base: u16,

    // Is an exclusive writer active?
    write: bool,

    // Explicit padding up to the full 8 bytes.
    unused0: u8
}
389
impl From<u64> for RwBufferFlags
{
    /// Unpacks a flags word previously produced by `From<RwBufferFlags>`.
    fn from(value: u64) -> Self
    {
        // SAFETY: NOTE(review) — sound only for values that originated from
        // a RwBufferFlags (the `write` byte must be 0 or 1); every call site
        // in this file loads a value previously stored from this struct.
        return unsafe { mem::transmute(value) };
    }
}
397
impl From<RwBufferFlags> for u64
{
    /// Packs the flags into the single word stored in the AtomicU64.
    fn from(value: RwBufferFlags) -> Self
    {
        // SAFETY: both types are 8 bytes; any bit pattern is a valid u64.
        return unsafe { mem::transmute(value) };
    }
}
405
406impl Default for RwBufferFlags
407{
408 fn default() -> Self
409 {
410 return
411 Self
412 {
413 read: 0,
414 write: false,
415 base: 1,
416 unused0: 0,
417 };
418 }
419}
420
421impl RwBufferFlags
422{
423 pub const MAX_READ_REFS: u32 = u32::MAX - 2;
425
426 pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
428
429 #[inline]
430 fn base(&mut self) -> bool
431 {
432 self.base += 1;
433
434 return self.base <= Self::MAX_BASE_REFS;
435 }
436
437 #[inline]
438 fn unbase(&mut self) -> bool
439 {
440 self.base -= 1;
441
442 return self.base != 0;
443 }
444
445 #[inline]
446 fn unread(&mut self)
447 {
448 self.read -= 1;
449 }
450
451 #[inline]
452 fn downgrade(&mut self) -> bool
453 {
454 if self.write == true
455 {
456 self.write = false;
457 self.read += 1;
458
459 return true;
460 }
461 else
462 {
463 return false;
464 }
465 }
466
467 #[inline]
468 fn read(&mut self) -> RwBufferRes<bool>
469 {
470 if self.write == false
471 {
472 self.read += 1;
473
474 return Ok(self.read <= Self::MAX_READ_REFS);
475 }
476
477 return Err(RwBufferError::ReadTryAgianLater);
478 }
479
480 #[inline]
481 fn write(&mut self) -> RwBufferRes<()>
482 {
483 if self.read == 0
484 {
485 self.write = true;
486
487 return Ok(());
488 }
489 else
490 {
491 return Err(RwBufferError::WriteTryAgianLater);
492 }
493 }
494
495 #[inline]
496 fn unwrite(&mut self)
497 {
498 self.write = false;
499 }
500}
501
/// Heap-allocated shared state behind every buffer handle.
#[derive(Debug)]
pub struct RwBufferInner
{
    // Packed RwBufferFlags word (see the From conversions).
    flags: AtomicU64,

    // The actual storage; None only after RBuffer::try_inner takes it out,
    // immediately before the state is destroyed.
    buf: Option<Vec<u8>>,
}
511
512impl RwBufferInner
513{
514 fn new(buf_size: usize) -> Self
515 {
516 return
517 Self
518 {
519 flags: AtomicU64::new(RwBufferFlags::default().into()),
520 buf: Some(vec![0_u8; buf_size])
521 };
522 }
523}
524
/// The pool-owned ("base") handle to a buffer; hands out shared `RBuffer`
/// and exclusive `WBuffer` views via `read()` / `write()`.
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

// SAFETY: NOTE(review) — synchronization relies on non-atomic
// load/modify/store over an AtomicU64; confirm the intended threading model.
unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}
536
537impl RwBuffer
538{
539 #[inline]
540 fn new(buf_size: usize) -> Self
541 {
542 let status = Box::new(RwBufferInner::new(buf_size));
543
544 return Self(Box::leak(status).into());
545 }
546
547 #[inline]
548 fn inner(&self) -> &RwBufferInner
549 {
550 return unsafe { self.0.as_ref() };
551 }
552
553 #[inline]
567 pub
568 fn is_free(&self) -> bool
569 {
570 let inner = self.inner();
571
572 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
573
574 return flags.write == false && flags.read == 0 && flags.base == 1;
575 }
576
577 #[inline]
591 pub
592 fn accure_if_free(&self) -> bool
593 {
594 let inner = self.inner();
595
596 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
597
598 let res = flags.write == false && flags.read == 0 && flags.base == 1;
599 inner.flags.store(flags.into(), Ordering::Release);
614
615 return res;
616 }
617
618 pub
629 fn write(&self) -> RwBufferRes<WBuffer>
630 {
631 let inner = self.inner();
632
633 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
634
635 let res = flags.write();
636
637 inner.flags.store(flags.into(), Ordering::Release);
638
639 res?;
640
641 return Ok(WBuffer::new(self.0.clone()));
642 }
643
644 pub
660 fn read(&self) -> RwBufferRes<RBuffer>
661 {
662 let inner = self.inner();
663
664 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
665
666 let res = flags.read();
667
668 inner.flags.store(flags.into(), Ordering::Release);
669
670 if res? == false
671 {
672 return Err(RwBufferError::TooManyRead);
673 }
674
675 return Ok(RBuffer::new(self.0.clone()));
676 }
677
678 #[cfg(test)]
679 fn get_flags(&self) -> RwBufferFlags
680 {
681 let inner = self.inner();
682
683 let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
684
685 return flags;
686 }
687}
688
impl Clone for RwBuffer
{
    /// Adds another base reference to the same shared state.
    ///
    /// # Panics
    /// Panics when the base soft limit is reached; the panic fires before
    /// the store below, so the over-limit increment is never persisted.
    /// NOTE(review): the panic message says "RBuffer" but this type is
    /// RwBuffer — likely a copy-paste slip; message left unchanged here.
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        // NOTE(review): load/modify/store is not a single atomic RMW;
        // concurrent clones could lose an increment.
        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        if flags.base() == false
        {
            panic!("too many base references for RBuffer");
        }

        inner.flags.store(flags.into(), Ordering::Release);

        return Self(self.0.clone());
    }
}
710
711impl Drop for RwBuffer
712{
713 fn drop(&mut self)
718 {
719 let inner = self.inner();
720
721 let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
722
723 let unbased = flags.unbase();
724
725 if flags.read == 0 && flags.write == false && unbased == false
726 {
727 unsafe { ptr::drop_in_place(self.0.as_ptr()) };
729
730 return;
731 }
732
733 inner.flags.store(flags.into(), Ordering::Release);
734 }
735}
736
/// A pool of reusable `RwBuffer` instances of a fixed byte size.
#[derive(Debug)]
pub struct RwBuffers
{
    // Size in bytes of every buffer created by this pool.
    buf_len: usize,

    // Maximum number of pooled buffers; 0 means unbounded.
    bufs_cnt_lim: usize,

    // The buffers currently tracked by the pool.
    buffs: VecDeque<RwBuffer>

}
753
754impl RwBuffers
755{
756 pub
780 fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
781 {
782 if pre_init_cnt > bufs_cnt_lim
783 {
784 return Err(RwBufferError::InvalidArguments);
785 }
786 else if buf_len == 0
787 {
788 return Err(RwBufferError::InvalidArguments);
789 }
790
791 let buffs: VecDeque<RwBuffer> =
792 if pre_init_cnt > 0
793 {
794 let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
795
796 for _ in 0..pre_init_cnt
797 {
798 buffs.push_back(RwBuffer::new(buf_len));
799 }
800
801 buffs
802 }
803 else
804 {
805 VecDeque::with_capacity(bufs_cnt_lim)
806 };
807
808 return Ok(
809 Self
810 {
811 buf_len: buf_len,
812 bufs_cnt_lim: bufs_cnt_lim,
813 buffs: buffs,
814 }
815 )
816 }
817
818 pub
831 fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
832 {
833 let mut buffs = VecDeque::with_capacity(pre_init_cnt);
834
835 for _ in 0..pre_init_cnt
836 {
837 buffs.push_back(RwBuffer::new(buf_len));
838 }
839
840 return
841 Self
842 {
843 buf_len: buf_len,
844 bufs_cnt_lim: 0,
845 buffs: buffs,
846 };
847 }
848
849 pub
861 fn allocate(&mut self) -> RwBufferRes<RwBuffer>
862 {
863 for buf in self.buffs.iter()
865 {
866 if buf.is_free() == true
867 {
868 return Ok(buf.clone());
869 }
870 }
871
872 if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
873 {
874 let buf = RwBuffer::new(self.buf_len);
875 let c_buf = buf.clone();
876
877 self.buffs.push_back(buf);
878
879 return Ok(c_buf);
880 }
881
882 return Err(RwBufferError::OutOfBuffers);
883 }
884
885 pub
893 fn allocate_in_place(&mut self) -> RwBuffer
894 {
895 for i in 0..self.buffs.len()
896 {
897 if self.buffs[i].accure_if_free() == true
898 {
899 return self.buffs.remove(i).unwrap();
900 }
901 }
902
903 let buf = RwBuffer::new(self.buf_len);
904
905 return buf;
906 }
907
908 pub
921 fn compact(&mut self, mut cnt: usize) -> usize
922 {
923 let p_cnt = cnt;
924
925 self
926 .buffs
927 .retain(
928 |buf|
929 {
930 if buf.is_free() == true
931 {
932 cnt -= 1;
933
934 return false;
935 }
936
937 return true;
938 }
939 );
940
941 return p_cnt - cnt;
942 }
943
944 #[cfg(test)]
945 fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags>
946 {
947 return Some(self.buffs.get(index)?.get_flags());
948 }
949}
950
#[cfg(feature = "std")]
#[cfg(test)]
mod tests
{
    use std::time::{Duration, Instant};

    use super::*;

    // Pool smoke test: a writer excludes readers, a reader excludes writers,
    // cloned handles share flag state, and the counters end up as expected
    // (pool + allocate + clone = base 3, one live reader).
    #[test]
    fn simple_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
        drop(buf0_w);

        let buf0_r = buf0.read();
        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));

        let buf0_1 = buf0.clone();
        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));

        let flags0 = buf0.get_flags();
        let flags0_1 = buf0_1.get_flags();

        assert_eq!(flags0, flags0_1);
        assert_eq!(flags0.base, 3);
        assert_eq!(flags0.read, 1);
        assert_eq!(flags0.write, false);
    }

    // Dropping the base handle while a writer is live must keep the inner
    // state alive; the pool's copy still shows the write flag until the
    // writer itself is dropped.
    #[test]
    fn simple_test_dopped_in_place()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, true);

        drop(buf0_w.unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, false);

    }

    // downgrade() after the base handle is gone: the write flag clears and
    // exactly one read reference appears on the pooled buffer.
    #[test]
    fn simple_test_dropped_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

    }

    // Same as above but via allocate_in_place(): the buffer leaves the pool
    // (no flags at index 0) and survives solely through the downgraded
    // reader (base == 0, read == 1).
    #[test]
    fn simple_test_drop_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_w =
        {
            let buf0 = bufs.allocate_in_place();

            println!("1: {:?}", buf0.get_flags());

            let buf0_w = buf0.write();
            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

            drop(buf0);

            buf0_w
        };

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("2: {:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);
    }

    // Rough wall-clock timing of allocate_in_place/write/read; asserts only
    // the success and mutual-exclusion invariants.
    #[test]
    fn timing_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_res = bufs.allocate_in_place();
            let end = inst.elapsed();

            println!("alloc: {:?}", end);
            drop(buf0_res);
        }

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_w = buf0.write();
            let end = inst.elapsed();

            println!("write: {:?}", end);

            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
            drop(buf0_w);
        }

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_r = buf0.read();
            let end = inst.elapsed();

            println!("read: {:?}", end);

            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
            drop(buf0_r);
        }
    }

    // Readers moved onto OS threads: counters while both threads hold their
    // readers, then again after both joined (readers released).
    #[test]
    fn simple_test_mth()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();

        let join1=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf0_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let buf1_rd = buf0.read().unwrap();

        let join2=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf1_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 2);
        assert_eq!(flags.write, false);

        let _ = join1.join();
        let _ = join2.join();

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }

    // try_inner() succeeds once the downgraded reader is the last handle of
    // any kind (read == 1, base == 0, no writer).
    #[test]
    fn test_try_into_read()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

        let inst = Instant::now();
        let ve = buf0_rd.try_inner();
        let end = inst.elapsed();

        println!("try inner: {:?}", end);
        assert_eq!(ve.is_ok(), true);


    }

    // Readers moved into tokio tasks observe the bytes written before the
    // downgrade; the base handle can drop while tasks still hold readers.
    #[tokio::test]
    async fn test_multithreading()
    {

        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let mut buf0_write = buf0.write().unwrap();

        buf0_write.as_mut_slice()[0] = 5;
        buf0_write.as_mut_slice()[1] = 4;

        println!("{}", buf0_write[0]);

        let buf0_r = buf0_write.downgrade().unwrap();

        let join1=
            tokio::task::spawn(async move
                {
                    println!("thread[1]:{}", buf0_r[0]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let buf0_r = buf0.read().unwrap();

        drop(buf0);

        let join2=
            tokio::task::spawn(async move
                {
                    println!("thread[2]: {}", buf0_r[0]);
                    println!("thread[2]: {}", buf0_r[1]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let _ = join1.await;
        let _ = join2.await;

        return;
    }
}