// shared_buffer_rs/lib.rs

//! A small crate which implements a thread safe mechanism to allocate
//! a buffer of some size, borrow it as mutable in the current context or
//! thread, and borrow as many read-only references as needed, which are
//! Send+Sync.
//!
//! It acts like Arc but embeds the RefCell functionality without any
//! issues with Send and Sync.
//!
//! The main purpose is to have a lock free, lightweight buffer I/O for
//! writing on one side and broadcasting to multiple tasks, i.e. threads
//! and async tasks, while making sure the data can not be modified.
11
12use std::
13{
14    collections::VecDeque, 
15    ops::{Deref, DerefMut}, 
16    ptr::{self, NonNull}, 
17    sync::atomic::{AtomicU64, Ordering},
18    fmt,
19};
20
/// Errors produced by the buffer primitives in this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    /// The soft limit of simultaneous read references was reached.
    TooManyRead,
    /// The soft limit of base references was reached.
    TooManyBase,
    /// Shared (read) access is temporarily unavailable (a writer is active).
    ReadTryAgianLater,
    /// Exclusive (write) access is temporarily unavailable (readers are active).
    WriteTryAgianLater,
    /// No free buffer slots are left in a bounded storage.
    OutOfBuffers,
    /// Downgrading exclusive access to shared failed (indicates a bug/race).
    DowngradeFailed,
    /// The provided arguments are not valid.
    InvalidArguments
}

impl fmt::Display for RwBufferError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
        // Keep the variant name as a prefix so log lines stay grep-able.
        let msg =
            match self
            {
                RwBufferError::TooManyRead => 
                    "TooManyRead: read soft limit reached",
                RwBufferError::TooManyBase => 
                    "TooManyBase: base soft limit reached",
                RwBufferError::ReadTryAgianLater => 
                    "ReadTryAgianLater: shared access not available, try again later",
                RwBufferError::WriteTryAgianLater => 
                    "WriteTryAgianLater: exclusive access not available, try again later",
                RwBufferError::OutOfBuffers => 
                    // typo fix: "bufers" -> "buffers"
                    "OutOfBuffers: no more free buffers are left",
                RwBufferError::DowngradeFailed => 
                    "DowngradeFailed: can not downgrade exclusive to shared, race condition",
                RwBufferError::InvalidArguments => 
                    "InvalidArguments: arguments are not valid",
            };

        return write!(f, "{}", msg);
    }
}

/// Error enums should implement [std::error::Error] so callers can box
/// them or propagate them with `?` behind `dyn Error`.
impl std::error::Error for RwBufferError {}

pub type RwBufferRes<T> = Result<T, RwBufferError>;
58
/// A read only buffer. This instance is [Send] and [Sync]
/// as it does not provide any write access.
#[derive(Debug, PartialEq, Eq)]
pub struct RBuffer(NonNull<RwBufferInner>);

// SAFETY: an RBuffer only exposes shared `&[u8]` access to the inner
// buffer; reference counting happens through the AtomicU64 flags word.
// NOTE(review): the counter updates elsewhere in this file are separate
// load/store pairs rather than atomic read-modify-write operations —
// confirm this is sound under concurrent clone()/drop() before relying
// on these impls.
unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}
66
impl RBuffer
{
    /// Wraps an already-registered reader around the leaked inner state.
    /// The caller must have incremented the `read` count beforehand.
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self(inner);
    }

    // Test helper: snapshot of the packed flags word.
    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags
    {
        let inner = unsafe{ self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    /// Borrow the inner buffer as slice.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.0.as_ref() };

        // `buf` is Some for every live reader; it is only taken in
        // `try_inner`, which consumes self.
        return inner.buf.as_ref().unwrap().as_slice();
    }

    /// Attempts to consume the instance and retrive the inner buffer. This means
    /// that the instance will no longer be available.
    ///
    /// The following condition should be satisfied:
    /// 1) No more readers except current instance.
    ///
    /// 2) No base references, item should not contain base references from [RwBuffer].
    /// 
    /// # Returns
    /// 
    /// A [Result] is returned with: 
    /// 
    /// * [Result::Ok] with the consumed inner [Vec]
    /// 
    /// * [Result::Err] with the consumed instance
    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // Sole reader, no writer, no base handles: nobody else can
        // observe the inner state, so moving the Vec out is safe.
        if flags.read == 1 && flags.write == false && flags.base == 0
        {
            let inner = unsafe { self.0.as_mut() };

            let buf = inner.buf.take().unwrap();
        
            // Drop decrements `read` to 0 and, with base == 0, frees the
            // leaked inner allocation (whose `buf` is now None).
            drop(self);

            return Ok(buf);
        }
        else
        {
            return Err(self);
        }
    }

    /// Shared view of the leaked inner state.
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }
}
137
138impl Deref for RBuffer
139{
140    type Target = Vec<u8>;
141
142    fn deref(&self) -> &Vec<u8>
143    {
144        let inner = self.inner();
145
146        return inner.buf.as_ref().unwrap();
147    }
148}
149
150impl Clone for RBuffer
151{
152    /// Clones the RBuffer incrementing the `read` reference.
153    /// 
154    /// # Returns 
155    /// 
156    /// Returns the new [RBuffer] instance.
157    /// 
158    /// # Panic
159    /// 
160    /// Panics if too many references were created. The reference count
161    /// is limited to max::u32 - 10
162    fn clone(&self) -> Self
163    {
164        let inner = self.inner();
165
166        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
167
168        if flags.read().unwrap() == false
169        {
170            panic!("too many read references for RBuffer");
171        }
172
173        inner.flags.store(flags.into(), Ordering::Release);
174
175        return Self(self.0);
176    }
177}
178
impl Drop for RBuffer
{
    /// Should completly drop the instance (with data) only, if there is no more
    /// readers or it is not referenced in the base.
    fn drop(&mut self)
    {
        let inner = self.inner();

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // This reader is going away.
        flags.unread();

        // no one can write at that moment (a writer cannot coexist with
        // readers), so checking read/base is sufficient here.
        // NOTE(review): this check-then-free is not atomic with respect to
        // concurrent drops — confirm two last readers cannot race into a
        // double drop_in_place.
        if flags.read == 0 && flags.base == 0
        {
            // Last reference of any kind: reclaim the leaked Box.
            unsafe { ptr::drop_in_place(self.0.as_ptr()) };

            return;
        }

        inner.flags.store(flags.into(), Ordering::Release);

        return;
    }
}
205
/// A Write and Read buffer. An exclusive instance which can not be copied or
/// cloned. Once writing is complete, the instance can be dropped or downgraded to
/// a Read-only instance.
///
/// This instance is [Send] (see the impl below) but NOT [Sync]: sharing
/// `&WBuffer` between threads would allow reads concurrent with the
/// owner's exclusive writes.
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    /// A pointer to the leaked buffer instance.
    buf: NonNull<RwBufferInner>,

    /// Is set to `true` when `downgrade` is called.
    downgraded: bool
}

// SAFETY: moving the unique exclusive handle to another thread is fine
// because only one WBuffer per buffer can exist at a time. (The original
// doc above claimed the type is neither Send nor Sync; it was corrected
// to match this impl.)
unsafe impl Send for WBuffer{}
220
impl WBuffer
{
    /// Wraps an already-acquired exclusive lock around the leaked inner
    /// state. The caller must have set the `write` flag beforehand.
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

    /// Downgrades the `write` instance into the `read` instance by consuming the
    /// [WBuffer]. 
    /// 
    /// Can not be performed vice-versa (at least in this version). In normal conditions
    /// should never return Error.
    /// 
    /// # Returns 
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RBuffer] instance
    /// 
    /// * [Result::Err] may be returned a [RwBufferError::DowngradeFailed] in case of a bug
    ///     in the code. This error is an indicator that there is a race condition which
    ///     means there is an error in ordering or CPU does not support strong ordering.
    pub
    fn downgrade(mut self) ->  RwBufferRes<RBuffer>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // Atomically (with respect to this handle) swap write=true for
        // read+=1 in the local copy, then publish it.
        let res = flags.downgrade();

        inner.flags.store(flags.into(), Ordering::Release);

        if res == true
        {
            // Prevent our Drop from releasing the write flag (it was
            // already converted into a read reference above).
            self.downgraded = true;

            return Ok(RBuffer::new(self.buf.clone()));
        }
        else
        {
            return Err(RwBufferError::DowngradeFailed);
        }
    }

    /// Borrow the inner buffer as a read-only slice.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        // Deref coercion turns the `&Vec<u8>` into `&[u8]`.
        return inner.buf.as_ref().unwrap()
    }
}
275
276impl Deref for WBuffer
277{
278    type Target = Vec<u8>;
279
280    fn deref(&self) -> &Vec<u8>
281    {
282        let inner = unsafe { self.buf.as_ref() };
283
284        return inner.buf.as_ref().unwrap();
285    }
286}
287
288impl DerefMut for WBuffer
289{
290    fn deref_mut(&mut self) -> &mut Vec<u8>
291    {
292        let inner = unsafe { self.buf.as_mut() };
293
294        return inner.buf.as_mut().unwrap();
295    }
296}
297
impl Drop for WBuffer
{
    /// The instance may perform `drop_in_place` if there is no `base` references.
    fn drop(&mut self)
    {
        // After a successful downgrade the write flag was already
        // converted into a read reference owned by the new RBuffer;
        // releasing anything here would corrupt the counters.
        if self.downgraded == true
        {
            return;
        }

        let inner = unsafe { self.buf.as_ref() };

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // Release the exclusive lock.
        flags.unwrite();

        // NOTE(review): check-then-free is not atomic with respect to a
        // concurrently dropping RwBuffer — verify this cannot double-free.
        if flags.read == 0 && flags.base == 0
        {
            // Last reference of any kind: reclaim the leaked Box.
            unsafe { ptr::drop_in_place(self.buf.as_ptr()) };

            return;
        }

        inner.flags.store(flags.into(), Ordering::Release);

        return;
    }
}
327
/// Internal packed status word. It must be exactly 8 bytes so it can be
/// stored in a single [AtomicU64].
///
/// `#[repr(C)]` pins the field order and layout: the default Rust layout
/// is unspecified, so transmuting to/from `u64` (below) relied on the
/// compiler happening to pack the fields into 8 bytes. The fields are
/// ordered by decreasing alignment so the repr(C) layout has no padding.
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags
{
    /// A reader refs counter. If larger than 0, no writes possible.
    read: u32, // bytes 0..4

    /// A base refs i.e which holds the data.
    /// If this value is zero, means the instance can be dropped in place
    /// when `write` is false and `read` equals 0.
    base: u16, // bytes 4..6

    /// An exclusive write lock. When true, no readers should present.
    write: bool, // byte 6

    /// Unused, keeps the size at exactly 8 bytes.
    unused0: u8 // byte 7
}

// Compile-time guarantee that the transmutes below are size-correct.
const _: () = assert!(std::mem::size_of::<RwBufferFlags>() == 8);

impl From<u64> for RwBufferFlags
{
    fn from(value: u64) -> Self
    {
        // SAFETY: sizes match (asserted above) and every bit pattern in
        // the atomic originates from a valid RwBufferFlags, so the `bool`
        // byte is always 0 or 1.
        return unsafe { std::mem::transmute(value) };
    }
}

impl From<RwBufferFlags> for u64
{
    fn from(value: RwBufferFlags) -> Self
    {
        // SAFETY: same size, no padding bytes, any initialized 8-byte
        // value is a valid u64.
        return unsafe { std::mem::transmute(value) };
    }
}
364
365impl Default for RwBufferFlags
366{
367    fn default() -> Self
368    {
369        return
370            Self
371            {
372                read: 0,
373                write: false,
374                base: 1,
375                unused0: 0,
376            };
377    }
378}
379
380impl RwBufferFlags
381{
382    /// A soft limit on the amount of references for reading instances.
383    pub const MAX_READ_REFS: u32 = u32::MAX - 2;
384
385    /// A soft limit on the amount of references for base instances.
386    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
387
388    #[inline]
389    fn base(&mut self) -> bool
390    {
391        self.base += 1;
392
393        return self.base <= Self::MAX_BASE_REFS;
394    }
395
396    #[inline]
397    fn unbase(&mut self) -> bool
398    {
399        self.base -= 1;
400
401        return self.base != 0;
402    }
403
404    #[inline]
405    fn unread(&mut self)
406    {
407        self.read -= 1;
408    }
409
410    #[inline]
411    fn downgrade(&mut self) -> bool
412    {
413        if self.write == true
414        {
415            self.write = false;
416            self.read += 1;
417
418            return true;
419        }
420        else
421        {
422            return false;
423        }
424    }
425
426    #[inline]
427    fn read(&mut self) -> RwBufferRes<bool>
428    {
429        if self.write == false
430        {
431            self.read += 1;
432
433            return Ok(self.read <= Self::MAX_READ_REFS);
434        }
435
436        return Err(RwBufferError::ReadTryAgianLater);
437    }
438
439    #[inline]
440    fn write(&mut self) -> RwBufferRes<()>
441    {
442        if self.read == 0
443        {
444            self.write = true;
445
446            return Ok(());
447        }
448        else
449        {
450            return Err(RwBufferError::WriteTryAgianLater);
451        }
452    }
453
454    #[inline]
455    fn unwrite(&mut self)
456    {
457        self.write = false;
458    }
459}
460
/// The heap-allocated shared state: the packed status word plus the
/// actual byte buffer. Every handle type (RwBuffer/RBuffer/WBuffer)
/// holds a raw NonNull pointer to a leaked Box of this type.
#[derive(Debug)]
pub struct RwBufferInner
{
    /// A [RwBufferFlags] represented as atomic u64.
    flags: AtomicU64,

    /// A buffer. `None` only after `RBuffer::try_inner` moved it out.
    buf: Option<Vec<u8>>,
}

impl RwBufferInner
{
    /// Creates the state with default flags (a single base reference)
    /// and a zero-filled buffer of `buf_size` bytes.
    fn new(buf_size: usize) -> Self
    {
        return
            Self
            {
                flags: AtomicU64::new(RwBufferFlags::default().into()),
                buf: Some(vec![0_u8; buf_size])
            };
    }
}
483
/// A base instance which holds the `leaked` pointer to [RwBufferInner].
/// 
/// This instance can provide either an exclusive write access or 
/// multiple read access, but not at the same time. Can be used to store
/// the instance.
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

// SAFETY: all state transitions go through the AtomicU64 flags word with
// Acquire/Release ordering. NOTE(review): the transitions are separate
// load/store pairs, not atomic read-modify-write operations — confirm
// that concurrent clone()/drop()/write()/read() cannot lose an update
// before depending on these impls under contention.
unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}
495
impl RwBuffer
{
    /// Allocates the inner state on the heap and leaks it; the raw
    /// pointer is later reclaimed via `drop_in_place` in the Drop impls.
    #[inline]
    fn new(buf_size: usize) -> Self
    {
        let status = Box::new(RwBufferInner::new(buf_size));

        return Self(Box::leak(status).into());
    }

    /// Shared view of the leaked inner state.
    #[inline]
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }

    /// Checks if this instance satisfies the following conditions:
    /// 
    /// * No exclusive write access
    /// 
    /// * No read access
    /// 
    /// * There is only one base reference.
    /// 
    /// # Returns 
    /// 
    /// * - `true` if instance satisfies the conditions above.
    /// 
    /// * - `false` if does not satisfy the conditions above.
    #[inline]
    pub
    fn is_free(&self) -> bool
    {
        let inner = self.inner();

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        return flags.write == false && flags.read == 0 && flags.base == 1;
    }

    /// Accures the instance, if it satisfy the following conditions:
    /// 
    /// * No exclusive write access
    /// 
    /// * No read access
    /// 
    /// * There is only one base reference.
    /// 
    /// # Returns 
    /// 
    /// * - `true` if instance satisfies the conditions above.
    /// 
    /// * - `false` if does not satisfy the conditions above.
    #[inline]
    pub
    fn accure_if_free(&self) -> bool
    {
        let inner = self.inner();

        // NOTE(review): load/modify/store below is not an atomic RMW; a
        // concurrent caller could observe the same "free" state and both
        // acquire. Single-threaded use (e.g. behind RwBuffers) is fine.
        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        let res =
            if flags.write == false && flags.read == 0 && flags.base == 1
            {
                // Register a second base reference for the acquirer.
                let _ = flags.base();

                true
            }
            else
            {
                false
            };

        inner.flags.store(flags.into(), Ordering::Release);

        return res;
    }

    /// Attemts to make an exclusive (write) access to the buffer.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [WBuffer] instance
    /// 
    /// * [Result::Err] may be returned a [RwBufferError::WriteTryAgianLater] in case 
    ///     if the there is/are an active `read` references.
    pub
    fn write(&self) -> RwBufferRes<WBuffer>
    {
        let inner = self.inner();

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        let res = flags.write();

        // Published even on failure; on failure the local copy was left
        // unchanged by flags.write().
        inner.flags.store(flags.into(), Ordering::Release);

        res?;

        return Ok(WBuffer::new(self.0.clone()));
    }

    /// Attemts to make a shared (read) access to the buffer.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RBuffer] instance
    /// 
    /// * [Result::Err] with error type is returned:
    /// 
    /// - [RwBufferError::TooManyRead] is returned when the soft limit of
    ///     references was reached.
    /// 
    /// - [RwBufferError::ReadTryAgianLater] is returned if there is an 
    ///     active exclusive access.
    pub
    fn read(&self) -> RwBufferRes<RBuffer>
    {
        let inner = self.inner();

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        let res = flags.read();

        inner.flags.store(flags.into(), Ordering::Release);

        // Ok(false) means the soft limit was hit and no RBuffer may be
        // handed out.
        if res? == false
        {
            return Err(RwBufferError::TooManyRead);
        }

        return Ok(RBuffer::new(self.0.clone()));
    }

    // Test helper: snapshot of the packed flags word.
    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags
    {
        let inner = self.inner();

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }
}
644
645impl Clone for RwBuffer
646{
647    /// Clones the instance and increasing the `base` ref count.
648    /// 
649    /// Will `panic` if a soft limit of refs were reached.
650    fn clone(&self) -> Self
651    {
652        let inner = self.inner();
653
654        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
655
656        if flags.base() == false
657        {
658            panic!("too many base references for RBuffer");
659        }
660
661        inner.flags.store(flags.into(), Ordering::Release);
662
663        return Self(self.0.clone());
664    }
665}
666
impl Drop for RwBuffer
{
    /// Drops the RwBuffer instance. In case if there is no readers and
    /// writers, then drop immidiatly the inner data.
    /// In case if there is any readers or writing, then drop only wrapper which
    /// is the zero reader.
    fn drop(&mut self)
    {
        let inner = self.inner();

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // `unbased` is true while other base references remain.
        let unbased = flags.unbase();

        // NOTE(review): check-then-free is not atomic with respect to
        // concurrently dropping readers/writers — verify this cannot
        // double-free under contention.
        if flags.read == 0 && flags.write == false && unbased == false
        {
            // Last reference of any kind: reclaim the leaked Box.
            unsafe { ptr::drop_in_place(self.0.as_ptr()) };

            return;
        }

        inner.flags.store(flags.into(), Ordering::Release);
    }
}
692
/// An instance which controls the allocation of the new buffers or
/// reusage of already created and free instances. This instance is
/// not thread safe. The external mutex should be used.
#[derive(Debug)]
pub struct RwBuffers
{
    /// A buffer length in bytes. Not aligned.
    buf_len: usize,

    /// A maximum slots for new buffers. A value of 0 means "unbounded"
    /// (see `new_unbounded`).
    bufs_cnt_lim: usize,

    /// A list of buffers; each entry holds one base reference.
    buffs: VecDeque<RwBuffer>

}
709
710impl RwBuffers
711{
712    /// Creates new instance wshich holds the base reference in the 
713    /// inner storage with the capacity bounds.
714    /// 
715    /// # Arguments
716    /// 
717    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
718    ///     the payload is located.
719    /// 
720    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
721    /// 
722    /// * `bufs_cnt_lim` - a maximum amount of the available slots. Determines the 
723    ///     capacity bounds.
724    /// 
725    /// # Returns
726    /// 
727    /// A [Result] in form of [RwBufferRes] is returned with:
728    /// 
729    /// * [Result::Ok] with the [RwBuffers] instance
730    /// 
731    /// * [Result::Err] with error type is returned:
732    /// 
733    /// - [RwBufferError::InvalidArguments] is returned when the arguments are
734    ///     incorrect. 
735    pub
736    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
737    {
738        if pre_init_cnt > bufs_cnt_lim
739        {
740            return Err(RwBufferError::InvalidArguments);
741        }
742        else if buf_len == 0
743        {
744            return Err(RwBufferError::InvalidArguments);
745        }
746
747        let buffs: VecDeque<RwBuffer> = 
748            if pre_init_cnt > 0
749            {
750                let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
751
752                for _ in 0..pre_init_cnt
753                {
754                    buffs.push_back(RwBuffer::new(buf_len));
755                }
756
757                buffs
758            }
759            else
760            {
761                VecDeque::with_capacity(bufs_cnt_lim)
762            };
763
764        return Ok(
765            Self
766            {
767                buf_len: buf_len,
768                bufs_cnt_lim: bufs_cnt_lim,
769                buffs: buffs,
770            }
771        )
772    }
773
774    /// Same as `new` but without any limits. Unbounded storage.
775    /// 
776    /// # Arguments
777    /// 
778    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
779    ///     the payload is located.
780    /// 
781    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
782    /// 
783    /// # Returns
784    /// 
785    /// Returns the instance.
786    pub
787    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
788    {
789        let mut buffs = VecDeque::with_capacity(pre_init_cnt);
790
791        for _ in 0..pre_init_cnt
792        {
793            buffs.push_back(RwBuffer::new(buf_len));
794        }
795
796        return
797            Self
798            {
799                buf_len: buf_len,
800                bufs_cnt_lim: 0,
801                buffs: buffs,
802            };
803    }
804
805    /// Allocates either a new buffer or reuse the free. If the instance
806    /// is created with bounds then in case if no free slots available
807    /// returns error.
808    /// 
809    /// # Returns
810    /// 
811    /// A [Result] in form of [RwBufferRes] is returned with:
812    /// 
813    /// * [Result::Ok] with the [RwBuffers] instance
814    /// 
815    /// * [Result::Err] with [RwBufferError::OutOfBuffers] error.
816    pub
817    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
818    {
819        // check the list if any available
820        for buf in self.buffs.iter()
821        {
822            if buf.is_free() == true
823            {
824                return Ok(buf.clone());
825            }
826        }
827
828        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
829        {
830            let buf = RwBuffer::new(self.buf_len);
831            let c_buf = buf.clone();
832
833            self.buffs.push_back(buf);
834
835            return Ok(c_buf);
836        }
837
838        return Err(RwBufferError::OutOfBuffers);
839    }
840
841    /// Allocates a buffer "in place" i.e finds the next allocated but unused
842    /// buffer and removes it from the list or alloactes new buffer without
843    /// adding it to the list. Should never return error.
844    /// 
845    /// # Returns
846    /// 
847    /// A [RwBuffer] is returned.
848    pub
849    fn allocate_in_place(&mut self) -> RwBuffer
850    {
851        for i in 0..self.buffs.len()
852        {
853            if self.buffs[i].accure_if_free() == true
854            {
855                return self.buffs.remove(i).unwrap();
856            }
857        }
858
859        let buf = RwBuffer::new(self.buf_len);
860
861        return buf;
862    }
863
864    /// Retains the buffer list by removing any unused buffers as many times
865    /// as set in the argument `cnt`. It does not guaranty than the selected 
866    /// amount will be freed.
867    /// 
868    /// # Arguments
869    /// 
870    /// * `cnt` - how many slots to clean before exit.
871    /// 
872    /// # Returns 
873    /// 
874    /// A [usize] is returned which indicates how many instances was removed
875    /// before the `cnt` was reached. 
876    pub
877    fn compact(&mut self, mut cnt: usize) -> usize
878    {
879        let p_cnt = cnt;
880
881        self
882            .buffs
883            .retain(
884                |buf|
885                {
886                    if buf.is_free() == true
887                    {
888                        cnt -= 1;
889
890                        return false;
891                    }
892
893                    return true;
894                }
895            );
896
897        return p_cnt - cnt;
898    }
899
900    #[cfg(test)]
901    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags>
902    {
903        return Some(self.buffs.get(index)?.get_flags());
904    }
905}
906
907
#[cfg(test)]
mod tests
{
    use std::time::{Duration, Instant};

    use super::*;

    // Basic single-threaded ref counting: write excludes read and
    // vice versa; clones of the base handle share the same flags.
    #[test]
    fn simple_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
        drop(buf0_w);

        let buf0_r = buf0.read();
        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));

        let buf0_1 = buf0.clone();
        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));

        let flags0 = buf0.get_flags();
        let flags0_1 = buf0_1.get_flags();

        assert_eq!(flags0, flags0_1);
        // base 3 = storage entry + allocate() clone + buf0.clone()
        assert_eq!(flags0.base, 3);
        assert_eq!(flags0.read, 1);
        assert_eq!(flags0.write, false);
    }

    // Dropping the base handle while a writer is live must keep the
    // inner state alive until the writer is dropped too.
    #[test]
    fn simple_test_dopped_in_place()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, true);

        drop(buf0_w.unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 0);
        assert_eq!(buf0_flags.write, false);

    }

    // Downgrading a writer after the base handle was dropped must leave
    // exactly one reader and one base (storage) reference.
    #[test]
    fn simple_test_dropped_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), true, "no flags");
        let buf0_flags = buf0_flags.unwrap();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 1);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

    }

    // allocate_in_place removes the entry from storage; after dropping
    // the base handle only the downgraded reader keeps the data alive.
    #[test]
    fn simple_test_drop_in_place_downgrade()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);
    }

    // Rough latency printout for allocate/write/read; no timing asserts.
    #[test]
    fn timing_test()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_res = bufs.allocate_in_place();
            let end = inst.elapsed();

            println!("alloc: {:?}", end);
            drop(buf0_res);
        }

        let buf0_res = bufs.allocate();
        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());

        let buf0 = buf0_res.unwrap();

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_w = buf0.write();
            let end = inst.elapsed();

            println!("write: {:?}", end);

            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
            drop(buf0_w);
        }

        for _ in 0..10
        {
            let inst = Instant::now();
            let buf0_r = buf0.read();
            let end = inst.elapsed();

            println!("read: {:?}", end);

            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
            drop(buf0_r);
        }
    }

    // Readers moved into OS threads; counts drop back after join.
    #[test]
    fn simple_test_mth()
    {
        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();

        let join1=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf0_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let buf1_rd = buf0.read().unwrap();

        let join2=
            std::thread::spawn(move ||
                {
                    println!("{:?}", buf1_rd);

                    std::thread::sleep(Duration::from_secs(2));

                    return;
                }
            );

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 2);
        assert_eq!(flags.write, false);

        let _ = join1.join();
        let _ = join2.join();

        let flags = buf0.get_flags();

        assert_eq!(flags.base, 2);
        assert_eq!(flags.read, 0);
        assert_eq!(flags.write, false);
    }

    // try_inner succeeds once the sole reader remains and no base refs exist.
    #[test]
    fn test_try_into_read()
    {
        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();

        let buf0 = bufs.allocate_in_place();

        println!("{:?}", buf0.get_flags());

        let buf0_w = buf0.write();
        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));

        drop(buf0);

        let buf0_rd = buf0_w.unwrap().downgrade();
        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());

        let buf0_flags = bufs.get_flags_by_index(0);
        assert_eq!(buf0_flags.is_some(), false, "flags");

        let buf0_rd = buf0_rd.unwrap();
        let buf0_flags = buf0_rd.get_flags();

        println!("{:?}", buf0_flags);

        assert_eq!(buf0_flags.base, 0);
        assert_eq!(buf0_flags.read, 1);
        assert_eq!(buf0_flags.write, false);

        let inst = Instant::now();
        let ve = buf0_rd.try_inner();
        let end = inst.elapsed();

        println!("try inner: {:?}", end);
        assert_eq!(ve.is_ok(), true);


    }

    // Write-then-broadcast across async tasks; readers see the bytes
    // written before downgrade.
    #[tokio::test]
    async fn test_multithreading()
    {

        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();

        let buf0 = bufs.allocate().unwrap();

        let mut buf0_write = buf0.write().unwrap();
        
        buf0_write.as_mut_slice()[0] = 5;
        buf0_write.as_mut_slice()[1] = 4;

        println!("{}", buf0_write[0]);

        let buf0_r = buf0_write.downgrade().unwrap();

        let join1=
            tokio::task::spawn(async move
                {
                    println!("thread[1]:{}", buf0_r[0]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let buf0_r = buf0.read().unwrap();

        // drop base
        drop(buf0);

        let join2=
            tokio::task::spawn(async move
                {
                    println!("thread[2]: {}", buf0_r[0]);
                    println!("thread[2]: {}", buf0_r[1]);

                    tokio::time::sleep(Duration::from_millis(200)).await;

                    return;
                }
            );

        let _ = join1.await;
        let _ = join2.await;

        return;
    }
}