#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(not(feature = "std"))]
extern crate alloc;

#[cfg(not(feature = "std"))]
use core::
{
    fmt,
    marker::PhantomData,
    mem,
    ops::{Deref, DerefMut},
    ptr::NonNull,
    sync::atomic::{AtomicU64, Ordering},
    task::Poll
};

#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec, vec::Vec};

#[cfg(feature = "std")]
use std::
{
    collections::VecDeque,
    fmt,
    marker::PhantomData,
    mem,
    ops::{Deref, DerefMut},
    ptr::NonNull,
    sync::atomic::{AtomicU64, Ordering},
    task::Poll,
    time::Duration
};

use crossbeam_utils::Backoff;

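/// A fallible counterpart of [`Clone`]: instead of blocking or panicking
/// when a new handle cannot be obtained, implementors report the failure
/// through `Self::Error`.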
pub trait TryClone: Sized
{
    type Error;

    fn try_clone(&self) -> Result<Self, Self::Error>;
}

trait LocalAsyncDrop: Send + Sync + 'static
{
    async fn async_drop(&mut self);
}

async
fn async_drop<LAD: LocalAsyncDrop>(mut lad: LAD)
{
    lad.async_drop().await;

    drop(lad);
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    TooManyRead,
    TooManyBase,
    ReadTryAgainLater,
    WriteTryAgainLater,
    BaseTryAgainLater,
    OutOfBuffers,
    DowngradeFailed,
    InvalidArguments,
    Busy,
}

impl fmt::Display for RwBufferError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        match self
        {
            Self::TooManyRead =>
                write!(f, "TooManyRead: read soft limit reached"),
            Self::TooManyBase =>
                write!(f, "TooManyBase: base soft limit reached"),
            Self::ReadTryAgainLater =>
                write!(f, "ReadTryAgainLater: shared access not available, try again later"),
            Self::WriteTryAgainLater =>
                write!(f, "WriteTryAgainLater: exclusive access not available, try again later"),
            Self::BaseTryAgainLater =>
                write!(f, "BaseTryAgainLater: failed to obtain a clone in reasonable time"),
            Self::OutOfBuffers =>
                write!(f, "OutOfBuffers: no more free buffers are left"),
            Self::DowngradeFailed =>
                write!(f, "DowngradeFailed: cannot downgrade exclusive to shared, race condition"),
            Self::InvalidArguments =>
                write!(f, "InvalidArguments: arguments are not valid"),
            Self::Busy =>
                write!(f, "RwBuffer is busy and cannot be acquired"),
        }
    }
}

pub type RwBufferRes<T> = Result<T, RwBufferError>;

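/// A shared (read) guard over a pooled buffer.
///
/// Each `RBuffer` holds one slot of the reader count in the shared flags
/// word; cloning registers an additional reader and dropping releases one.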
#[derive(Debug, PartialEq, Eq)]
pub struct RBuffer
{
    inner: NonNull<RwBufferInner>,

    a_dropped: bool,
}

unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}

impl RwBufType for RBuffer {}

impl RBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ inner, a_dropped: false };
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        let inner = unsafe{ self.inner.as_ref() };

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags;
    }

    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.inner.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }

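    /// Tries to take the underlying `Vec<u8>` out of the buffer.
    ///
    /// This only succeeds when this handle is the sole reader and no base
    /// `RwBuffer` handles remain (read == 1, write == false, base == 0);
    /// otherwise the handle is returned unchanged in `Err`.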
    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.inner.as_ref() };

        let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();

        if current_flags.try_inner_check() == true
        {
            let inner = unsafe { self.inner.as_mut() };

            let buf = inner.buf.take().unwrap();

            drop(self);

            return Ok(buf);
        }

        return Err(self);
    }

    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.inner.as_ref() };
    }
}

impl LocalAsyncDrop for RBuffer
{
    async fn async_drop(&mut self)
    {
        self.a_dropped = true;

        core::future::poll_fn(
            |cx|
            {
                let inner = self.inner();

                let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                let mut new_flags = current_flags.clone();

                new_flags.unread();

                let res =
                    inner
                        .flags
                        .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                if let Ok(_) = res
                {
                    // On success the CAS returns the *previous* value, so the
                    // drop-in-place decision must look at `new_flags`, the
                    // value that was actually stored.
                    if new_flags.is_drop_inplace() == true
                    {
                        // The inner state was created with `Box::leak`, so it is
                        // reclaimed with `Box::from_raw`, which also frees the
                        // heap allocation.
                        let _ = unsafe { Box::from_raw(self.inner.as_ptr()) };
                    }

                    return Poll::Ready(());
                }

                cx.waker().wake_by_ref();

                return Poll::Pending;
            }
        )
        .await;
    }
}

impl Deref for RBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = self.inner();

        return inner.buf.as_ref().unwrap();
    }
}

impl Clone for RBuffer
{
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.read().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self{ inner: self.inner, a_dropped: self.a_dropped };
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    // Parking is only available with std; under no_std the
                    // loop simply retries once more before giving up.
                    #[cfg(feature = "std")]
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("cannot obtain a clone of RBuffer!");
                }
            }
        }
    }
}

impl TryClone for RBuffer
{
    type Error = RwBufferError;

    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self{ inner: self.inner, a_dropped: self.a_dropped });
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                break;
            }
        }

        return Err(RwBufferError::ReadTryAgainLater);
    }
}

impl Drop for RBuffer
{
    fn drop(&mut self)
    {
        if self.a_dropped == true
        {
            return;
        }

        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.unread();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // Check the value just stored (`new_flags`); the CAS returns
                // the previous value on success.
                if new_flags.is_drop_inplace() == true
                {
                    // Reclaim the `Box::leak`ed inner state.
                    let _ = unsafe { Box::from_raw(self.inner.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unread();

            backoff.snooze();
        }

        panic!("assertion trap: RBuffer::drop cannot drop RBuffer in reasonable time!");
    }
}

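/// An exclusive (write) guard over a pooled buffer.
///
/// While a `WBuffer` is alive no readers can be created; it can either be
/// dropped to release exclusive access or atomically converted into a
/// shared [`RBuffer`] via [`WBuffer::downgrade`].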
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    buf: NonNull<RwBufferInner>,

    downgraded: bool,
}

unsafe impl Send for WBuffer{}
unsafe impl Sync for WBuffer{}

impl RwBufType for WBuffer{}

impl WBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

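    /// Atomically converts this exclusive guard into a shared [`RBuffer`]
    /// without releasing the buffer in between.
    ///
    /// Returns `Err(self)` if the flags could not be swapped within the
    /// backoff window, leaving the exclusive guard intact.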
    pub
    fn downgrade(mut self) -> Result<RBuffer, Self>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.downgrade();

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                self.downgraded = true;

                return Ok(RBuffer::new(self.buf.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.downgrade();

            backoff.snooze();
        }

        return Err(self);
    }

    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }
}

impl Deref for WBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap();
    }
}

impl DerefMut for WBuffer
{
    fn deref_mut(&mut self) -> &mut Vec<u8>
    {
        let inner = unsafe { self.buf.as_mut() };

        return inner.buf.as_mut().unwrap();
    }
}

impl Drop for WBuffer
{
    fn drop(&mut self)
    {
        if self.downgraded == true
        {
            return;
        }

        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.unwrite();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // Check the value just stored (`new_flags`); the CAS returns
                // the previous value on success.
                if new_flags.is_drop_inplace() == true
                {
                    // Reclaim the `Box::leak`ed inner state.
                    let _ = unsafe { Box::from_raw(self.buf.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unwrite();

            backoff.snooze();
        }

        panic!("assertion trap: WBuffer::drop cannot drop WBuffer in reasonable time!");
    }
}

impl LocalAsyncDrop for WBuffer
{
    async
    fn async_drop(&mut self)
    {
        self.downgraded = true;

        core::future::poll_fn(
            move |cx|
            {
                let inner = unsafe { self.buf.as_ref() };

                let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                let mut new_flags = current_flags.clone();

                new_flags.unwrite();

                let res =
                    inner
                        .flags
                        .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                if let Ok(_) = res
                {
                    if new_flags.is_drop_inplace() == true
                    {
                        let _ = unsafe { Box::from_raw(self.buf.as_ptr()) };
                    }

                    return Poll::Ready(());
                }

                cx.waker().wake_by_ref();

                return Poll::Pending;
            }
        )
        .await;
    }
}

// Marker trait tying guard types to their `RwBufferFlags` view.
trait RwBufType {}

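// Packed view of the 8-byte state word stored in `RwBufferInner::flags`:
// a 32-bit reader count, a 16-bit count of base `RwBuffer` handles, an
// exclusive-write flag and one spare byte. The type parameter selects which
// state transitions are available; `repr(C)` pins the field layout so the
// `u64` transmutes below are well-defined.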
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags<TP>
{
    read: u32,
    base: u16,
    write: bool,
    unused0: u8,
    _p: PhantomData<TP>,
}

impl<TP: RwBufType> From<u64> for RwBufferFlags<TP>
{
    fn from(value: u64) -> Self
    {
        return unsafe { mem::transmute(value) };
    }
}

impl<TP: RwBufType> From<RwBufferFlags<TP>> for u64
{
    fn from(value: RwBufferFlags<TP>) -> Self
    {
        return unsafe { mem::transmute(value) };
    }
}

impl<TP: RwBufType> Default for RwBufferFlags<TP>
{
    fn default() -> RwBufferFlags<TP>
    {
        return
            Self
            {
                read: 0,
                base: 1,
                write: false,
                unused0: 0,
                _p: PhantomData
            };
    }
}

// `PhantomData<TP>` is always `Copy`/`Clone`, so a single pair of generic
// impls replaces the former per-type boilerplate.
impl<TP: RwBufType> Copy for RwBufferFlags<TP>{}

impl<TP: RwBufType> Clone for RwBufferFlags<TP>
{
    fn clone(&self) -> Self
    {
        return *self;
    }
}

impl RwBufferFlags<WBuffer>
{
    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgainLater);
        }
    }

    #[inline]
    fn downgrade(&mut self)
    {
        self.write = false;
        self.read += 1;
    }

    #[inline]
    fn unwrite(&mut self)
    {
        self.write = false;
    }
}

impl RwBufferFlags<RBuffer>
{
    #[inline]
    fn try_inner_check(&self) -> bool
    {
        return self.read == 1 && self.write == false && self.base == 0;
    }

    #[inline]
    fn unread(&mut self)
    {
        self.read -= 1;
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgainLater);
    }
}

impl RwBufferFlags<RwBuffer>
{
    #[inline]
    fn make_pre_unused() -> Self
    {
        return Self{ read: 0, base: 1, write: false, unused0: 0, _p: PhantomData };
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgainLater);
    }

    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgainLater);
        }
    }

    #[inline]
    fn base(&mut self) -> RwBufferRes<()>
    {
        self.base += 1;

        if self.base <= Self::MAX_BASE_REFS
        {
            return Ok(());
        }

        return Err(RwBufferError::TooManyBase);
    }

    #[inline]
    fn unbase(&mut self) -> bool
    {
        self.base -= 1;

        return self.base != 0;
    }
}

impl<TP: RwBufType> RwBufferFlags<TP>
{
    pub const MAX_READ_REFS: u32 = u32::MAX - 2;

    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;

    #[inline]
    fn is_free(&self) -> bool
    {
        return self.write == false && self.read == 0 && self.base == 1;
    }

    #[inline]
    fn is_drop_inplace(&self) -> bool
    {
        return self.read == 0 && self.write == false && self.base == 0;
    }
}

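/// The heap-allocated state shared by all handles to one buffer: the packed
/// atomic flags word and the payload itself. It is created via `Box::leak`
/// and reclaimed when the last handle observes an all-zero flags word.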
#[derive(Debug)]
pub struct RwBufferInner
{
    flags: AtomicU64,

    buf: Option<Vec<u8>>,
}

impl RwBufferInner
{
    fn new(buf_size: usize) -> Self
    {
        return
            Self
            {
                flags:
                    AtomicU64::new(RwBufferFlags::<RwBuffer>::default().into()),
                buf:
                    Some(vec![0_u8; buf_size])
            };
    }
}

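/// A base handle to a pooled buffer, from which shared ([`RBuffer`]) and
/// exclusive ([`WBuffer`]) guards are acquired.
///
/// A minimal usage sketch (the surrounding crate path is assumed for
/// illustration):
///
/// ```ignore
/// let mut pool = RwBuffers::new(4096, 1, 2)?;
/// let buf = pool.allocate()?;
///
/// let mut w = buf.write()?;       // exclusive access
/// w[0] = 42;
/// let r = w.downgrade().unwrap(); // atomically become a reader
/// assert_eq!(r[0], 42);
/// ```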
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}

impl RwBufType for RwBuffer {}

impl RwBuffer
{
    #[inline]
    fn new(buf_size: usize) -> Self
    {
        let status = Box::new(RwBufferInner::new(buf_size));

        return Self(Box::leak(status).into());
    }

    #[inline]
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }

    #[inline]
    pub
    fn is_free(&self) -> bool
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags.is_free();
    }

    #[inline]
    pub(crate)
    fn acquire_if_free(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let current_flags: RwBufferFlags<Self> = RwBufferFlags::make_pre_unused();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        // A strong CAS is used here: this is a single attempt rather than a
        // retry loop, so a spurious `compare_exchange_weak` failure would be
        // misreported as `Busy`.
        let res =
            inner
                .flags
                .compare_exchange(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

        if let Ok(_) = res
        {
            return Ok(Self(self.0.clone()));
        }

        return Err(RwBufferError::Busy);
    }

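    /// Acquires exclusive (write) access, retrying with exponential backoff.
    ///
    /// Fails with [`RwBufferError::WriteTryAgainLater`] if readers hold the
    /// buffer or the backoff window is exhausted.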
    pub
    fn write(&self) -> RwBufferRes<WBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.write()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(WBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.write()?;

            backoff.snooze();
        }

        return Err(RwBufferError::WriteTryAgainLater);
    }

    pub async
    fn write_async(&self) -> RwBufferRes<WBuffer>
    {
        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<WBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    if let Err(e) = new_flags.write()
                    {
                        return Poll::Ready(Err(e));
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( WBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

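    /// Acquires shared (read) access, retrying with exponential backoff.
    ///
    /// Fails with [`RwBufferError::ReadTryAgainLater`] while a writer holds
    /// the buffer or once the backoff window is exhausted.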
    pub
    fn read(&self) -> RwBufferRes<RBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(RBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            backoff.snooze();
        }

        return Err(RwBufferError::ReadTryAgainLater);
    }

    pub async
    fn read_async(&self) -> RwBufferRes<RBuffer>
    {
        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<RBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    match new_flags.read()
                    {
                        Ok(_) => {},
                        Err(RwBufferError::TooManyRead) =>
                            return Poll::Ready(Err(RwBufferError::TooManyRead)),
                        Err(RwBufferError::ReadTryAgainLater) =>
                        {
                            cx.waker().wake_by_ref();

                            return Poll::Pending;
                        },
                        Err(e) =>
                            panic!("assertion trap: unknown error {} in Future for AsyncRBuffer", e)
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( RBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    // Non-atomic read-modify-write; only sound because it is called on a
    // freshly created buffer before any other handle can observe it.
    fn clone_single(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        current_flags.base()?;

        inner.flags.store(current_flags.into(), Ordering::Relaxed);

        return Ok(Self(self.0));
    }
}

impl Clone for RwBuffer
{
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.base().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self(self.0);
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    // Parking is only available with std; under no_std the
                    // loop simply retries once more before giving up.
                    #[cfg(feature = "std")]
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("cannot obtain a clone of RwBuffer!");
                }
            }
        }
    }
}

impl TryClone for RwBuffer
{
    type Error = RwBufferError;

    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self(self.0));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base()?;

            backoff.snooze();
        }

        return Err(RwBufferError::BaseTryAgainLater);
    }
}

impl Drop for RwBuffer
{
    fn drop(&mut self)
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.unbase();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // Check the value just stored (`new_flags`); the CAS returns
                // the previous value on success.
                if new_flags.is_drop_inplace() == true
                {
                    // Reclaim the `Box::leak`ed inner state.
                    let _ = unsafe { Box::from_raw(self.0.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unbase();

            backoff.snooze();
        }

        panic!("assertion trap: RwBuffer::drop cannot drop RwBuffer in reasonable time!");
    }
}

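/// A simple pool of `RwBuffer`s with a fixed buffer length and an optional
/// cap on the number of pooled buffers.
///
/// A minimal sketch of the intended flow (crate path assumed):
///
/// ```ignore
/// let mut pool = RwBuffers::new(4096, 2, 8)?; // 4 KiB buffers, 2 pre-made, cap 8
/// let buf = pool.allocate()?;                 // reuse a free one or grow the pool
/// {
///     let mut w = buf.write()?;
///     w.as_mut_slice()[0] = 1;
/// }                                           // writer dropped, buffer readable again
/// let freed = pool.compact(1);                // drop up to one currently free buffer
/// ```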
#[derive(Debug)]
pub struct RwBuffers
{
    buf_len: usize,

    bufs_cnt_lim: usize,

    buffs: VecDeque<RwBuffer>
}

impl RwBuffers
{
    pub
    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
    {
        if pre_init_cnt > bufs_cnt_lim || buf_len == 0
        {
            return Err(RwBufferError::InvalidArguments);
        }

        let mut buffs: VecDeque<RwBuffer> = VecDeque::with_capacity(bufs_cnt_lim);

        for _ in 0..pre_init_cnt
        {
            buffs.push_back(RwBuffer::new(buf_len));
        }

        return Ok(
            Self
            {
                buf_len,
                bufs_cnt_lim,
                buffs,
            }
        );
    }

    pub
    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
    {
        let mut buffs = VecDeque::with_capacity(pre_init_cnt);

        for _ in 0..pre_init_cnt
        {
            buffs.push_back(RwBuffer::new(buf_len));
        }

        return
            Self
            {
                buf_len,
                bufs_cnt_lim: 0,
                buffs,
            };
    }

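    /// Hands out a free pooled buffer, growing the pool (up to
    /// `bufs_cnt_lim`, or without bound when the limit is 0) if none is
    /// available; fails with [`RwBufferError::OutOfBuffers`] at the cap.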
    pub
    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
    {
        for buf in self.buffs.iter()
        {
            if let Ok(rwbuf) = buf.acquire_if_free()
            {
                return Ok(rwbuf);
            }
        }

        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
        {
            let buf = RwBuffer::new(self.buf_len);
            let c_buf = buf.clone_single()?;

            self.buffs.push_back(buf);

            return Ok(c_buf);
        }

        return Err(RwBufferError::OutOfBuffers);
    }

    pub
    fn allocate_in_place(&mut self) -> RwBuffer
    {
        let mut idx = Option::None;

        for (i, item) in self.buffs.iter().enumerate()
        {
            if let Ok(_) = item.acquire_if_free()
            {
                idx = Some(i);

                break;
            }
        }

        // Only allocate a fresh buffer when no pooled one was free; the
        // previous `map_or` built the fallback eagerly on every call.
        return
            match idx
            {
                Some(i) => self.buffs.remove(i).unwrap(),
                None => RwBuffer::new(self.buf_len)
            };
    }
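
    /// Removes up to `cnt` currently free buffers from the pool and returns
    /// how many were actually removed.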
    pub
    fn compact(&mut self, mut cnt: usize) -> usize
    {
        let p_cnt = cnt;

        self
            .buffs
            .retain(
                |buf|
                {
                    // Stop removing once the requested count is exhausted;
                    // decrementing past zero would underflow.
                    if cnt > 0 && buf.is_free() == true
                    {
                        cnt -= 1;

                        return false;
                    }

                    return true;
                }
            );

        return p_cnt - cnt;
    }

    #[cfg(test)]
    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags<RwBuffer>>
    {
        return Some(self.buffs.get(index)?.get_flags());
    }
}


#[cfg(feature = "std")]
#[cfg(test)]
mod tests
{
    use std::time::{Duration, Instant};

    use tokio::task;

    use super::*;

    #[test]
    fn simple_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));
        drop(buf0_w);

        let buf0_r = buf0.read();
        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgainLater));

        let buf0_1 = buf0.clone();
        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgainLater));

        let flags0 = buf0.get_flags();
        let flags0_1 = buf0_1.get_flags();

        assert_eq!(flags0, flags0_1);
        assert_eq!(flags0.base, 3);
        assert_eq!(flags0.read, 1);
        assert_eq!(flags0.write, false);
    }

    #[test]
    fn simple_test_dropped_in_place()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));

        drop(buf0);

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, true);

        drop(buf0_w.unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, false);
    }

    #[test]
    fn simple_test_dropped_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);
    }

    #[test]
    fn simple_test_drop_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_w =
        {
            let buf0 = bufs.allocate_in_place();

            println!("1: {:?}", buf0.get_flags());

            let buf0_w = buf0.write();
            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));

            drop(buf0);

            buf0_w
        };

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("2: {:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);
    }

    #[test]
    fn timing_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_res = bufs.allocate_in_place();
            let end = inst.elapsed();

            println!("alloc: {:?}", end);
            drop(buf0_res);
        }

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_w = buf0.write();
            let end = inst.elapsed();

            println!("write: {:?}", end);

            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));
            drop(buf0_w);
        }

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_r = buf0.read();
            let end = inst.elapsed();

            println!("read: {:?}", end);

            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgainLater));
            drop(buf0_r);
        }
    }

    #[test]
    fn simple_test_mth()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();

        let join1 =
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf0_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let buf1_rd = buf0.read().unwrap();

        let join2 =
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf1_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 2);
        assert_eq!(flags.write, false);

        let _ = join1.join();
        let _ = join2.join();

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }

    #[test]
    fn simple_test_concurrent()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_w = buf0.write().unwrap();

        let join1 =
            std::thread::spawn(move ||
                {
                    std::thread::park();

                    println!("{:?}", buf0_w);

                    std::thread::sleep(Duration::from_secs(3));

                    return;
                }
            );

        join1.thread().unpark();
        let s = Instant::now();

        let buf1_rd =
            loop
            {
                match buf0.read()
                {
                    Ok(r) => break r,
                    Err(e) =>
                    {
                        assert_eq!(e, RwBufferError::ReadTryAgainLater);
                        continue;
                    }
                }
            };

        let e = s.elapsed();

        println!("read await {:?} {}", e, e.as_millis());

        // The writer thread holds the buffer for ~3 s, so the read must have
        // waited at least that long; the exact value is timing-dependent.
        assert!(e.as_millis() >= 3000);

        let _ = join1.join();

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 1);
        assert_eq!(flags.write, false);

        drop(buf1_rd);

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }

    #[test]
    fn test_try_into_read()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgainLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

        let inst = Instant::now();
        let ve = buf0_rd.try_inner();
        let end = inst.elapsed();

        println!("try inner: {:?}", end);
        assert_eq!(ve.is_ok(), true);
    }

    #[tokio::test]
    async fn test_multithreading()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let mut buf0_write = buf0.write().unwrap();

        buf0_write.as_mut_slice()[0] = 5;
        buf0_write.as_mut_slice()[1] = 4;

        println!("{}", buf0_write[0]);

        let buf0_r = buf0_write.downgrade().unwrap();

        let join1 =
            tokio::task::spawn(async move
                {
                    println!("thread[1]: {}", buf0_r[0]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let buf0_r = buf0.read().unwrap();

        drop(buf0);

        let join2 =
            tokio::task::spawn(async move
                {
                    println!("thread[2]: {}", buf0_r[0]);
                    println!("thread[2]: {}", buf0_r[1]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let _ = join1.await;
        let _ = join2.await;

        return;
    }

    #[tokio::test]
    async fn test_multithreading_async()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_w = buf0.write_async().await.unwrap();

        let task_hndl =
            task::spawn(async move
                {
                    tokio::task::yield_now().await;

                    println!("{:?}", buf0_w);

                    tokio::time::sleep(Duration::from_secs(3)).await;

                    async_drop(buf0_w).await;

                    return;
                }
            );

        let s = tokio::time::Instant::now();

        let buf1_rd = buf0.read_async().await.unwrap();

        let e = s.elapsed();

        println!("read await {:?} {}", e, e.as_millis());

        // The writer task holds the buffer for ~3 s; the exact elapsed time
        // is timing-dependent, so only a lower bound is asserted.
        assert!(e.as_millis() >= 3000);

        let _ = task_hndl.await;

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 1);
        assert_eq!(flags.write, false);

        async_drop(buf1_rd).await;

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }
}