#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(not(feature = "std"))]
extern crate alloc;

#[cfg(not(feature = "std"))]
use core::
{
    fmt,
    mem,
    ops::{Deref, DerefMut},
    ptr::NonNull,
    sync::atomic::{AtomicU64, Ordering}
};

#[cfg(not(feature = "std"))]
use core::{marker::PhantomData, task::Poll, time::Duration};

#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec::Vec};

#[cfg(not(feature = "std"))]
use alloc::vec;

use crossbeam_utils::Backoff;

#[cfg(feature = "std")]
use std::
{
    collections::VecDeque,
    ops::{Deref, DerefMut},
    ptr::NonNull,
    sync::atomic::{AtomicU64, Ordering},
    fmt,
    mem
};

#[cfg(feature = "std")]
use std::{marker::PhantomData, task::Poll, time::Duration};

extern crate crossbeam_utils;

pub trait TryClone: Sized
{
    type Error;

    fn try_clone(&self) -> Result<Self, Self::Error>;
}
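
// Usage sketch (illustration only): `try_clone` mirrors `Clone::clone` but surfaces
// contention as an error instead of spinning or panicking. `rw_buffer` is a
// hypothetical `RwBuffer` handle.
//
//     let backup = match rw_buffer.try_clone()
//     {
//         Ok(b)                                 => b,
//         Err(RwBufferError::BaseTryAgainLater) => rw_buffer.clone(), // fall back to the spinning path
//         Err(e)                                => panic!("unexpected: {}", e),
//     };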

pub trait LocalAsyncDrop: Send + Sync + 'static
{
    fn async_drop(&mut self) -> impl core::future::Future<Output = ()>;
}

pub trait LocalAsyncClone: Send + Sync + 'static
{
    fn async_clone(&self) -> impl core::future::Future<Output = Self>;
}

pub async
fn async_drop<LAD: LocalAsyncDrop + Send + Sync>(mut lad: LAD)
{
    lad.async_drop().await;

    drop(lad);
}
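
// Usage sketch: releasing a guard cooperatively inside an async task instead of
// relying on the synchronous `Drop` (which spins and may park the thread). The
// `guard` name is hypothetical; it stands for any `RBuffer` or `WBuffer`.
//
//     async fn release(guard: RBuffer)
//     {
//         async_drop(guard).await; // yields via `Poll::Pending` instead of spinning
//     }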

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    TooManyRead,

    TooManyBase,

    ReadTryAgianLater,

    WriteTryAgianLater,

    BaseTryAgainLater,

    OutOfBuffers,

    DowngradeFailed,

    InvalidArguments,

    Busy,
}

impl fmt::Display for RwBufferError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        match self
        {
            Self::TooManyRead =>
                write!(f, "TooManyRead: read soft limit reached"),
            Self::TooManyBase =>
                write!(f, "TooManyBase: base soft limit reached"),
            Self::ReadTryAgianLater =>
                write!(f, "ReadTryAgianLater: shared access not available, try again later"),
            Self::WriteTryAgianLater =>
                write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
            Self::BaseTryAgainLater =>
                write!(f, "BaseTryAgainLater: failed to obtain a clone in a reasonable time"),
            Self::OutOfBuffers =>
                write!(f, "OutOfBuffers: no more free buffers are left"),
            Self::DowngradeFailed =>
                write!(f, "DowngradeFailed: cannot downgrade exclusive to shared, race condition"),
            Self::InvalidArguments =>
                write!(f, "InvalidArguments: arguments are not valid"),
            Self::Busy =>
                write!(f, "RwBuffer is busy and cannot be acquired"),
        }
    }
}

pub type RwBufferRes<T> = Result<T, RwBufferError>;

#[derive(Debug)]
pub struct RBuffer
{
    inner: NonNull<RwBufferInner>,

    a_dropped: bool,
}

unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}

impl RwBufType for RBuffer {}

impl Eq for RBuffer {}

impl PartialEq for RBuffer
{
    fn eq(&self, other: &Self) -> bool
    {
        return self.inner == other.inner;
    }
}

impl RBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ inner, a_dropped: false };
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        use core::sync::atomic::Ordering;

        let inner = unsafe{ self.inner.as_ref() };

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags;
    }

    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.inner.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }

    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.inner.as_ref() };

        let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();

        if current_flags.try_inner_check() == true
        {
            let inner = unsafe { self.inner.as_mut() };

            let buf = inner.buf.take().unwrap();

            drop(self);

            return Ok(buf);
        }

        return Err(self);
    }

    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.inner.as_ref() };
    }
}
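
// Usage sketch: shared read access. `rwbuf` is a hypothetical `RwBuffer`; every
// `RBuffer` handle bumps the reader count and releases it on drop, and
// `try_inner` only succeeds once this handle is the very last reference.
//
//     let r: RBuffer = rwbuf.read()?;
//     assert_eq!(r.as_slice().len(), r.len()); // Deref exposes the Vec<u8> API
//
//     match r.try_inner()
//     {
//         Ok(bytes) => drop(bytes), // sole owner: the Vec<u8> was moved out
//         Err(r)    => drop(r),     // still shared: plain drop releases the read lock
//     }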

impl Deref for RBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = self.inner();

        return inner.buf.as_ref().unwrap();
    }
}

impl LocalAsyncClone for RBuffer
{
    fn async_clone(&self) -> impl core::future::Future<Output = Self>
    {
        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.read().unwrap();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Self{ inner: self.inner, a_dropped: self.a_dropped } );
                    }

                    // Request another poll; without this a lost CAS would leave the future stalled.
                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

impl Clone for RBuffer
{
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.read().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self{ inner: self.inner, a_dropped: self.a_dropped };
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("cannot obtain a clone of RBuffer!");
                }
            }
        }
    }
}

impl TryClone for RBuffer
{
    type Error = RwBufferError;

    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self{ inner: self.inner, a_dropped: self.a_dropped });
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                break;
            }
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }
}

impl LocalAsyncDrop for RBuffer
{
    fn async_drop(&mut self) -> impl core::future::Future<Output = ()>
    {
        self.a_dropped = true;

        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.unread();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        // `new_flags` is the state that was just published: if it carries no
                        // readers, no writer and no base handles, this was the last reference
                        // and the leaked inner box can be reclaimed.
                        if new_flags.is_drop_inplace() == true
                        {
                            unsafe { drop(Box::from_raw(self.inner.as_ptr())) };
                        }

                        return Poll::Ready(());
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

impl Drop for RBuffer
{
    fn drop(&mut self)
    {
        if self.a_dropped == true
        {
            return;
        }

        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unread();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // Check the value we just stored: all-zero means this was the last
                // reference, so release the leaked inner box.
                if new_flags.is_drop_inplace() == true
                {
                    unsafe { drop(Box::from_raw(self.inner.as_ptr())) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unread();

            backoff.snooze();
        }

        panic!("assertion trap: RBuffer::drop cannot drop RBuffer in a reasonable time!");
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    buf: NonNull<RwBufferInner>,

    downgraded: bool,
}

unsafe impl Send for WBuffer{}
unsafe impl Sync for WBuffer{}

impl RwBufType for WBuffer{}

impl WBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

    pub
    fn downgrade(mut self) -> Result<RBuffer, Self>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.downgrade();

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                self.downgraded = true;

                return Ok(RBuffer::new(self.buf.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.downgrade();

            backoff.snooze();
        }

        return Err(self);
    }

    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }
}
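
// Usage sketch: exclusive write access followed by a downgrade to shared access,
// so readers can proceed without the buffer ever becoming unowned in between.
// `rwbuf` is a hypothetical `RwBuffer` handle.
//
//     let mut w: WBuffer = rwbuf.write()?;
//     w.clear();
//     w.extend_from_slice(b"payload");                    // DerefMut exposes the Vec<u8> API
//
//     match w.downgrade()
//     {
//         Ok(r)  => assert_eq!(r.as_slice(), b"payload"), // now a shared RBuffer
//         Err(w) => drop(w),                               // contended: keep or release the write guard
//     }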

impl Deref for WBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap();
    }
}

impl DerefMut for WBuffer
{
    fn deref_mut(&mut self) -> &mut Vec<u8>
    {
        let inner = unsafe { self.buf.as_mut() };

        return inner.buf.as_mut().unwrap();
    }
}

impl Drop for WBuffer
{
    fn drop(&mut self)
    {
        if self.downgraded == true
        {
            return;
        }

        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unwrite();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // `new_flags` is the published state: release the inner box if this
                // write guard was the last remaining reference.
                if new_flags.is_drop_inplace() == true
                {
                    unsafe { drop(Box::from_raw(self.buf.as_ptr())) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unwrite();

            backoff.snooze();
        }

        panic!("assertion trap: WBuffer::drop cannot drop WBuffer in a reasonable time!");
    }
}

impl LocalAsyncDrop for WBuffer
{
    fn async_drop(&mut self) -> impl core::future::Future<Output = ()>
    {
        self.downgraded = true;

        return
            core::future::poll_fn(
                move |cx|
                {
                    let inner = unsafe { self.buf.as_ref() };

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.unwrite();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        // Same release rule as the synchronous drop: act on the value we stored.
                        if new_flags.is_drop_inplace() == true
                        {
                            unsafe { drop(Box::from_raw(self.buf.as_ptr())) };
                        }

                        return Poll::Ready(());
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

trait RwBufType {}

// NOTE: the flag word is converted to and from a raw `u64` with `mem::transmute`,
// so the struct must be exactly 8 bytes with a defined layout. `repr(C)` pins the
// field order below (4 + 2 + 1 + 1 bytes, no padding) and `align(8)` matches `AtomicU64`.
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags<TP>
{
    read: u32,

    base: u16,

    write: bool,

    unused0: u8,

    _p: PhantomData<TP>,
}

impl<TP: RwBufType> From<u64> for RwBufferFlags<TP>
{
    fn from(value: u64) -> Self
    {
        return unsafe { mem::transmute(value) };
    }
}

impl<TP: RwBufType> From<RwBufferFlags<TP>> for u64
{
    fn from(value: RwBufferFlags<TP>) -> Self
    {
        return unsafe { mem::transmute(value) };
    }
}
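
// Compile-time guard: the `From` conversions above transmute between `RwBufferFlags`
// and `u64`, which is only sound while the flag struct stays exactly eight bytes.
const _: () = assert!(mem::size_of::<RwBufferFlags<RwBuffer>>() == 8);
const _: () = assert!(mem::size_of::<RwBufferFlags<RBuffer>>() == 8);
const _: () = assert!(mem::size_of::<RwBufferFlags<WBuffer>>() == 8);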

impl<TP: RwBufType> Default for RwBufferFlags<TP>
{
    fn default() -> RwBufferFlags<TP>
    {
        return
            Self
            {
                read: 0,
                write: false,
                base: 1,
                unused0: 0,
                _p: PhantomData
            };
    }
}

impl Copy for RwBufferFlags<WBuffer>{}

impl Clone for RwBufferFlags<WBuffer>
{
    fn clone(&self) -> Self
    {
        return *self;
    }
}

impl Copy for RwBufferFlags<RBuffer>{}

impl Clone for RwBufferFlags<RBuffer>
{
    fn clone(&self) -> Self
    {
        return *self;
    }
}

impl Copy for RwBufferFlags<RwBuffer>{}

impl Clone for RwBufferFlags<RwBuffer>
{
    fn clone(&self) -> Self
    {
        return *self;
    }
}

impl RwBufferFlags<WBuffer>
{
    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgianLater);
        }
    }

    #[inline]
    fn downgrade(&mut self)
    {
        self.write = false;
        self.read += 1;
    }

    #[inline]
    fn unwrite(&mut self)
    {
        self.write = false;
    }
}

impl RwBufferFlags<RBuffer>
{
    #[inline]
    fn try_inner_check(&self) -> bool
    {
        return self.read == 1 && self.write == false && self.base == 0;
    }

    #[inline]
    fn unread(&mut self)
    {
        self.read -= 1;
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }
}

impl RwBufferFlags<RwBuffer>
{
    #[inline]
    fn make_pre_unused() -> Self
    {
        return Self{ read: 0, write: false, base: 1, unused0: 0, _p: PhantomData };
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }

    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgianLater);
        }
    }

    #[inline]
    fn base(&mut self) -> RwBufferRes<()>
    {
        self.base += 1;

        if self.base <= Self::MAX_BASE_REFS
        {
            return Ok(());
        }

        return Err(RwBufferError::TooManyBase);
    }

    #[inline]
    fn unbase(&mut self) -> bool
    {
        self.base -= 1;

        return self.base != 0;
    }
}

impl<TP: RwBufType> RwBufferFlags<TP>
{
    pub const MAX_READ_REFS: u32 = u32::MAX - 2;

    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;

    #[inline]
    fn is_free(&self) -> bool
    {
        return self.write == false && self.read == 0 && self.base == 1;
    }

    #[inline]
    fn is_drop_inplace(&self) -> bool
    {
        return self.read == 0 && self.write == false && self.base == 0;
    }
}

#[derive(Debug)]
pub struct RwBufferInner
{
    flags: AtomicU64,

    buf: Option<Vec<u8>>,
}

impl RwBufferInner
{
    fn new(buf_size: usize) -> Self
    {
        return
            Self
            {
                flags:
                    AtomicU64::new(RwBufferFlags::<RwBuffer>::default().into()),
                buf:
                    Some(vec![0_u8; buf_size])
            };
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}

impl RwBufType for RwBuffer {}

impl RwBuffer
{
    #[inline]
    fn new(buf_size: usize) -> Self
    {
        let status = Box::new(RwBufferInner::new(buf_size));

        return Self(Box::leak(status).into());
    }

    #[inline]
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }

    #[inline]
    pub
    fn is_free(&self) -> bool
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags.is_free();
    }

    #[inline]
    pub(crate)
    fn acqiure_if_free(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let current_flags: RwBufferFlags<Self> = RwBufferFlags::make_pre_unused();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        let res =
            inner
                .flags
                .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

        if let Ok(_) = res
        {
            return Ok(Self(self.0.clone()));
        }

        return Err(RwBufferError::Busy);
    }

    pub
    fn write(&self) -> RwBufferRes<WBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.write()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(WBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.write()?;

            backoff.snooze();
        }

        return Err(RwBufferError::WriteTryAgianLater);
    }

    pub async
    fn write_async(&self) -> RwBufferRes<WBuffer>
    {
        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<WBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    if let Err(e) = new_flags.write()
                    {
                        return Poll::Ready(Err(e));
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( WBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

    pub
    fn read(&self) -> RwBufferRes<RBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(RBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            backoff.snooze();
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }

    pub async
    fn read_async(&self) -> RwBufferRes<RBuffer>
    {
        return
            core::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<RBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    match new_flags.read()
                    {
                        Ok(_) => {},
                        Err(RwBufferError::TooManyRead) =>
                            return Poll::Ready(Err(RwBufferError::TooManyRead)),
                        Err(RwBufferError::ReadTryAgianLater) =>
                        {
                            cx.waker().wake_by_ref();

                            return Poll::Pending;
                        },
                        Err(e) =>
                            panic!("assertion trap: unknown error {} in RwBuffer::read_async future", e)
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( RBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    fn clone_single(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        current_flags.base()?;

        inner.flags.store(current_flags.into(), Ordering::Relaxed);

        return Ok(Self(self.0));
    }
}
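
// Usage sketch: the blocking and async acquisition paths side by side. `rwbuf` is
// a hypothetical, already-allocated `RwBuffer` (see `RwBuffers::allocate`); both
// paths hand out guards, and dropping a guard publishes the release.
//
//     if let Ok(mut w) = rwbuf.write()
//     {
//         w.fill(0);                             // exclusive access through DerefMut
//     }
//
//     // inside an async task:
//     //     let r = rwbuf.read_async().await?;  // yields while a writer is active
//     //     handle(r.as_slice());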

impl Clone for RwBuffer
{
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.base().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self(self.0);
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("cannot obtain a clone of RwBuffer!");
                }
            }
        }
    }
}

impl TryClone for RwBuffer
{
    type Error = RwBufferError;

    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self(self.0));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base()?;

            backoff.snooze();
        }

        return Err(RwBufferError::BaseTryAgainLater);
    }
}

impl Drop for RwBuffer
{
    fn drop(&mut self)
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unbase();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // `new_flags` is the state that was just stored; if it carries no
                // readers, no writer and no base handles, reclaim the inner box.
                if new_flags.is_drop_inplace() == true
                {
                    unsafe { drop(Box::from_raw(self.0.as_ptr())) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unbase();

            backoff.snooze();
        }

        panic!("assertion trap: RwBuffer::drop cannot drop RwBuffer in a reasonable time!");
    }
}

#[derive(Debug)]
pub struct RwBuffers
{
    buf_len: usize,

    bufs_cnt_lim: usize,

    buffs: VecDeque<RwBuffer>
}

impl RwBuffers
{
    pub
    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
    {
        if pre_init_cnt > bufs_cnt_lim
        {
            return Err(RwBufferError::InvalidArguments);
        }
        else if buf_len == 0
        {
            return Err(RwBufferError::InvalidArguments);
        }

        let buffs: VecDeque<RwBuffer> =
            if pre_init_cnt > 0
            {
                let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);

                for _ in 0..pre_init_cnt
                {
                    buffs.push_back(RwBuffer::new(buf_len));
                }

                buffs
            }
            else
            {
                VecDeque::with_capacity(bufs_cnt_lim)
            };

        return Ok(
            Self
            {
                buf_len: buf_len,
                bufs_cnt_lim: bufs_cnt_lim,
                buffs: buffs,
            }
        );
    }

    pub
    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
    {
        let mut buffs = VecDeque::with_capacity(pre_init_cnt);

        for _ in 0..pre_init_cnt
        {
            buffs.push_back(RwBuffer::new(buf_len));
        }

        return
            Self
            {
                buf_len: buf_len,
                bufs_cnt_lim: 0,
                buffs: buffs,
            };
    }

    pub
    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
    {
        for buf in self.buffs.iter()
        {
            if let Ok(rwbuf) = buf.acqiure_if_free()
            {
                return Ok(rwbuf);
            }
        }

        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
        {
            let buf = RwBuffer::new(self.buf_len);
            let c_buf = buf.clone_single()?;

            self.buffs.push_back(buf);

            return Ok(c_buf);
        }

        return Err(RwBufferError::OutOfBuffers);
    }

    pub
    fn allocate_in_place(&mut self) -> RwBuffer
    {
        let mut idx = Option::None;

        for (i, item) in self.buffs.iter().enumerate()
        {
            if let Ok(_) = item.acqiure_if_free()
            {
                idx = Some(i);

                break;
            }
        }

        return
            idx
                .map_or(
                    RwBuffer::new(self.buf_len),
                    |f| self.buffs.remove(f).unwrap()
                );
    }

    pub
    fn compact(&mut self, mut cnt: usize) -> usize
    {
        let p_cnt = cnt;

        self
            .buffs
            .retain(
                |buf|
                {
                    // Only release up to `cnt` free buffers; without this guard the
                    // counter would underflow when more buffers are free than requested.
                    if cnt > 0 && buf.is_free() == true
                    {
                        cnt -= 1;

                        return false;
                    }

                    return true;
                }
            );

        return p_cnt - cnt;
    }

    #[cfg(test)]
    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags<RwBuffer>>
    {
        return Some(self.buffs.get(index)?.get_flags());
    }
}
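
// Usage sketch: a bounded pool. Buffer size and limits are arbitrary example
// values; error handling is elided to keep the shape visible.
//
//     let mut pool = RwBuffers::new(4096, 2, 8)?;    // 4 KiB buffers, 2 pre-allocated, at most 8
//
//     let rwbuf = pool.allocate()?;                  // reuses a free buffer or grows the pool
//     {
//         let mut w = rwbuf.write()?;
//         w.clear();
//         w.extend_from_slice(b"frame");
//     }
//     let r = rwbuf.read()?;
//     assert_eq!(r.as_slice(), b"frame");
//
//     drop((r, rwbuf));
//     let released = pool.compact(1);                // drop up to one idle buffer
//     assert!(released <= 1);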

#[cfg(feature = "std")]
#[cfg(test)]
mod tests;