// shared_buffer_rs/lib.rs

/*-
 * shared-buffer-rs - a buffer management and sharing crate.
 *
 * Copyright (C) 2025 Aleksandr Morozov
 *
 * The shared-buffer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *
 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */
13
/*! A small crate which implements a thread-safe way to allocate
 a buffer of some size, borrow it as mutable in the current context or
 thread, and borrow as many read-only references as needed, which are
 Send + Sync.

 It acts like Arc but embeds the RefCell functionality without any
 issues with Send and Sync.

 The main purpose is to have a lock-free, lightweight buffer I/O for
 writing on one side and broadcasting to multiple tasks, i.e. threads
 and async tasks, while making sure the data can not be modified.
 */
25
26#![cfg_attr(not(feature = "std"), no_std)]
27
28#[cfg(not(feature = "std"))]
29extern crate alloc;
30
31#[cfg(not(feature = "std"))]
32use core::
33{
34    fmt, 
35    mem, 
36    ops::{Deref, DerefMut}, 
37    ptr::{self, NonNull}, 
38    sync::atomic::{AtomicU64, Ordering}
39};
40
41#[cfg(not(feature = "std"))]
42use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec::Vec};
43
44#[cfg(not(feature = "std"))]
45use alloc::vec;
46
47#[cfg(feature = "std")]
48use std::
49{
50    collections::VecDeque, 
51    ops::{Deref, DerefMut}, 
52    ptr::{self, NonNull}, 
53    sync::atomic::{AtomicU64, Ordering},
54    fmt,
55    mem
56};
57
58
59
/// Error kinds returned by the buffer operations in this crate.
///
/// NOTE(review): several variant names contain a typo ("Agian" instead of
/// "Again"); they are kept as-is because renaming would break the public API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    /// The soft limit of simultaneous read references was reached.
    TooManyRead,

    /// The soft limit of simultaneous base references was reached.
    TooManyBase,

    /// Shared (read) access is not available because an exclusive (write)
    /// access is currently active; retry later.
    ReadTryAgianLater,

    /// Exclusive (write) access is not available because shared (read)
    /// references are currently active; retry later.
    WriteTryAgianLater,

    /// No free buffer slots are left (bounded [RwBuffers] only).
    OutOfBuffers,

    /// Downgrading exclusive access to shared failed; indicates a race
    /// condition or an ordering bug in the caller.
    DowngradeFailed,

    /// The arguments passed to the function are not valid.
    InvalidArguments
}
71
72impl fmt::Display for RwBufferError
73{
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
75    {
76        match self
77        {
78            RwBufferError::TooManyRead => 
79                write!(f, "TooManyRead: read soft limit reached"),
80            RwBufferError::TooManyBase => 
81                write!(f, "TooManyBase: base soft limit reached"),
82            RwBufferError::ReadTryAgianLater => 
83                write!(f, "ReadTryAgianLater: shared access not available, try again later"),
84            RwBufferError::WriteTryAgianLater => 
85                write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
86            RwBufferError::OutOfBuffers => 
87                write!(f, "OutOfBuffers: no more free bufers are left"),
88            RwBufferError::DowngradeFailed => 
89                write!(f, "DowngradeFailed: can not downgrade exclusive to shared, race condition"),
90            RwBufferError::InvalidArguments => 
91                write!(f, "InvalidArguments: arguments are not valid"),                
92        }
93    }
94}
95
/// A convenience alias for [Result] with [RwBufferError] as the error type.
pub type RwBufferRes<T> = Result<T, RwBufferError>;
97
/// A read only buffer. This instance is [Send] and [Sync]
/// as it does not provide any write access.
///
/// Internally it holds a raw pointer to the leaked, shared
/// [RwBufferInner]; reference counting is performed through the atomic
/// `flags` word of the inner structure.
#[derive(Debug, PartialEq, Eq)]
pub struct RBuffer(NonNull<RwBufferInner>);

// SAFETY: RBuffer only exposes shared (read-only) access to the inner
// buffer, and the reference counters live in an atomic word; hence the
// handle can be moved to and referenced from other threads.
// NOTE(review): soundness also relies on the counter updates in
// Clone/Drop being race-free — see the notes on those impls.
unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}
105
impl RBuffer
{
    /// Wraps an already-counted pointer to the shared [RwBufferInner].
    /// The caller (e.g. [RwBuffer::read]) must have incremented the `read`
    /// counter beforehand.
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self(inner);
    }

    /// Test-only helper: decodes and returns a snapshot of the atomic flags.
    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags
    {
        use core::sync::atomic::Ordering;

        let inner = unsafe{ self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    /// Borrow the inner buffer as slice.
    ///
    /// # Panics
    ///
    /// Panics if the inner [Vec] was already taken; this can not happen via
    /// the public API because [RBuffer::try_inner] consumes the instance.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.0.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }

    /// Attempts to consume the instance and retrive the inner buffer. This means
    /// that the instance will no longer be available.
    ///
    /// The following condition should be satisfied:
    /// 1) No more readers except current instance.
    ///
    /// 2) No base references, item should not contain base references from [RwBuffer].
    /// 
    /// # Returns
    /// 
    /// A [Result] is returned with: 
    /// 
    /// * [Result::Ok] with the consumed inner [Vec]
    /// 
    /// * [Result::Err] with the consumed instance
    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.0.as_ref() };

        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // When `read == 1`, `write == false` and `base == 0` this handle is
        // the very last reference: no [RwBuffer] exists that could hand out
        // new references, so nobody can race the take below.
        if flags.read == 1 && flags.write == false && flags.base == 0
        {
            let inner = unsafe { self.0.as_mut() };

            // Move the Vec out; the following `drop(self)` observes
            // `read == 0 && base == 0` and releases the (now empty) inner
            // allocation.
            let buf = inner.buf.take().unwrap();

            drop(self);

            return Ok(buf);
        }
        else
        {
            // Other references still exist - give the instance back intact.
            return Err(self);
        }
    }

    /// Returns a shared reference to the leaked [RwBufferInner].
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }
}
178
179impl Deref for RBuffer
180{
181    type Target = Vec<u8>;
182
183    fn deref(&self) -> &Vec<u8>
184    {
185        let inner = self.inner();
186
187        return inner.buf.as_ref().unwrap();
188    }
189}
190
191impl Clone for RBuffer
192{
193    /// Clones the RBuffer incrementing the `read` reference.
194    /// 
195    /// # Returns 
196    /// 
197    /// Returns the new [RBuffer] instance.
198    /// 
199    /// # Panic
200    /// 
201    /// Panics if too many references were created. The reference count
202    /// is limited to max::u32 - 10
203    fn clone(&self) -> Self
204    {
205        let inner = self.inner();
206
207        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
208
209        if flags.read().unwrap() == false
210        {
211            panic!("too many read references for RBuffer");
212        }
213
214        inner.flags.store(flags.into(), Ordering::Release);
215
216        return Self(self.0);
217    }
218}
219
220impl Drop for RBuffer
221{
222    /// Should completly drop the instance (with data) only, if there is no more
223    /// readers or it is not referenced in the base.
224    fn drop(&mut self)
225    {
226        let inner = self.inner();
227
228        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
229
230        flags.unread();
231
232        // no one can write at that moment
233        if flags.read == 0 && flags.base == 0
234        {
235            // call descrutor
236            unsafe { ptr::drop_in_place(self.0.as_ptr()) };
237
238            return;
239        }
240
241        inner.flags.store(flags.into(), Ordering::Release);
242
243        return;
244    }
245}
246
/// A Write and Read buffer. An exclusive instance which can not be copied or
/// cloned. Once writing is complete, the instance can be dropped or downgraded to
/// a Read-only instance.
///
/// This instance is [Send] but NOT [Sync]: it can be moved to another
/// thread, but can not be shared between threads by reference.
/// (NOTE(review): the original doc claimed "NOT Send and Sync", which
/// contradicted the `unsafe impl Send` below.)
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    /// A pointer to the leaked buffer instance.
    buf: NonNull<RwBufferInner>,

    /// Is set to `true` when `downgrade` is called, so Drop does not
    /// release the write access a second time.
    downgraded: bool
}

// SAFETY: WBuffer is the unique holder of the exclusive write access (the
// `write` flag excludes all readers), so moving it to another thread is
// sound; it is deliberately not Sync.
unsafe impl Send for WBuffer{}
261
impl WBuffer
{
    /// Wraps a pointer whose `write` flag was already set by the caller
    /// (see [RwBuffer::write]).
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

    /// Downgrades the `write` instance into the `read` instance by consuming the
    /// [WBuffer]. 
    /// 
    /// Can not be performed vice-versa (at least in this version). In normal conditions
    /// should never return Error.
    /// 
    /// # Returns 
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RBuffer] instance
    /// 
    /// * [Result::Err] may be returned a [RwBufferError::DowngradeFailed] in case of a bug
    ///     in the code. This error is an indicator that there is a race condition which
    ///     means there is an error in ordering or CPU does not support strong ordering.
    pub
    fn downgrade(mut self) ->  RwBufferRes<RBuffer>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();

        // Clears `write` and registers one `read` reference in one step
        // (on the local copy of the flags).
        let res = flags.downgrade();

        // NOTE(review): this load/modify/store pair is not an atomic RMW;
        // a concurrent `RwBuffer::clone`/`drop` updating `base` in between
        // could have its update overwritten here — consider a
        // compare_exchange loop. TODO confirm.
        inner.flags.store(flags.into(), Ordering::Release);

        if res == true
        {
            // Prevent Drop from releasing the write flag a second time.
            self.downgraded = true;

            return Ok(RBuffer::new(self.buf.clone()));
        }
        else
        {
            return Err(RwBufferError::DowngradeFailed);
        }
    }

    /// Borrow the inner buffer as a read-only slice.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap()
    }
}
316
317impl Deref for WBuffer
318{
319    type Target = Vec<u8>;
320
321    fn deref(&self) -> &Vec<u8>
322    {
323        let inner = unsafe { self.buf.as_ref() };
324
325        return inner.buf.as_ref().unwrap();
326    }
327}
328
329impl DerefMut for WBuffer
330{
331    fn deref_mut(&mut self) -> &mut Vec<u8>
332    {
333        let inner = unsafe { self.buf.as_mut() };
334
335        return inner.buf.as_mut().unwrap();
336    }
337}
338
339impl Drop for WBuffer
340{
341    /// The instance may perform `drop_in_place` if there is no `base` references.
342    fn drop(&mut self)
343    {
344        if self.downgraded == true
345        {
346            return;
347        }
348
349        let inner = unsafe { self.buf.as_ref() };
350
351        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
352
353        flags.unwrite();
354
355        if flags.read == 0 && flags.base == 0
356        {
357            // call descrutor
358            unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
359
360            return;
361        }
362
363        inner.flags.store(flags.into(), Ordering::Release);
364
365        return;
366    }
367}
368
/// Internal structure which represents the status. It is converted to and
/// from `u64` by `transmute`, therefore it must be exactly 8 bytes large to
/// fit into [AtomicU64].
///
/// `#[repr(C)]` pins the field layout: with the default Rust representation
/// the compiler is free to reorder fields and insert padding, so the
/// `transmute`-based conversions would rely on unspecified layout (and
/// padding bytes would be read as data). The fields are declared in an
/// order that requires no padding: u32 (4) + bool (1) + u8 (1) + u16 (2) = 8.
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags
{
    /// A reader refs counter. If larger than 0, no writes possible.
    read: u32, // = 4

    /// An exclusive write lock. When true, no readers should be present.
    write: bool, // = 1

    /// Unused; keeps `base` naturally aligned without implicit padding.
    unused0: u8, // = 1

    /// A base refs counter, i.e. handles which hold the data.
    /// If this value is zero, the instance can be released
    /// when `write` is false and `read` equals 0.
    base: u16 // = 2
}

// Compile-time guarantee that the flags pack exactly into an `u64`.
const _: () = assert!(mem::size_of::<RwBufferFlags>() == 8);
389
impl From<u64> for RwBufferFlags
{
    /// Decodes the flags from their packed `u64` (atomic word) form.
    fn from(value: u64) -> Self
    {
        // SAFETY: RwBufferFlags is exactly 8 bytes; the only `u64` values
        // decoded here are ones previously produced by the reverse
        // conversion, so every field (incl. the `bool`) holds a valid bit
        // pattern.
        return unsafe { mem::transmute(value) };
    }
}
397
impl From<RwBufferFlags> for u64
{
    /// Packs the flags into a `u64` suitable for the [AtomicU64] word.
    fn from(value: RwBufferFlags) -> Self
    {
        // SAFETY: RwBufferFlags is exactly 8 bytes (same size as u64), so
        // reinterpreting its bytes as an integer is well-defined as long as
        // the struct contains no padding bytes.
        return unsafe { mem::transmute(value) };
    }
}
405
406impl Default for RwBufferFlags
407{
408    fn default() -> Self
409    {
410        return
411            Self
412            {
413                read: 0,
414                write: false,
415                base: 1,
416                unused0: 0,
417            };
418    }
419}
420
421impl RwBufferFlags
422{
423    /// A soft limit on the amount of references for reading instances.
424    pub const MAX_READ_REFS: u32 = u32::MAX - 2;
425
426    /// A soft limit on the amount of references for base instances.
427    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
428
429    #[inline]
430    fn base(&mut self) -> bool
431    {
432        self.base += 1;
433
434        return self.base <= Self::MAX_BASE_REFS;
435    }
436
437    #[inline]
438    fn unbase(&mut self) -> bool
439    {
440        self.base -= 1;
441
442        return self.base != 0;
443    }
444
445    #[inline]
446    fn unread(&mut self)
447    {
448        self.read -= 1;
449    }
450
451    #[inline]
452    fn downgrade(&mut self) -> bool
453    {
454        if self.write == true
455        {
456            self.write = false;
457            self.read += 1;
458
459            return true;
460        }
461        else
462        {
463            return false;
464        }
465    }
466
467    #[inline]
468    fn read(&mut self) -> RwBufferRes<bool>
469    {
470        if self.write == false
471        {
472            self.read += 1;
473
474            return Ok(self.read <= Self::MAX_READ_REFS);
475        }
476
477        return Err(RwBufferError::ReadTryAgianLater);
478    }
479
480    #[inline]
481    fn write(&mut self) -> RwBufferRes<()>
482    {
483        if self.read == 0
484        {
485            self.write = true;
486
487            return Ok(());
488        }
489        else
490        {
491            return Err(RwBufferError::WriteTryAgianLater);
492        }
493    }
494
495    #[inline]
496    fn unwrite(&mut self)
497    {
498        self.write = false;
499    }
500}
501
/// The shared, heap-allocated state behind [RwBuffer], [RBuffer] and
/// [WBuffer]. It is created via `Box::leak` and released manually by the
/// `Drop` of the last remaining handle.
#[derive(Debug)]
pub struct RwBufferInner
{
    /// A [RwBufferFlags] represented as atomic u64.
    flags: AtomicU64,

    /// A buffer. `None` only after it was consumed by [RBuffer::try_inner].
    buf: Option<Vec<u8>>,
}
511
512impl RwBufferInner
513{
514    fn new(buf_size: usize) -> Self
515    {
516        return
517            Self
518            {
519                flags: AtomicU64::new(RwBufferFlags::default().into()),
520                buf: Some(vec![0_u8; buf_size])
521            };
522    }
523}
524
/// A base instance which holds the `leaked` pointer to [RwBufferInner].
/// 
/// This instance can provide either an exclusive write access or 
/// multiple read access, but not at the same time. Can be used to store
/// the instance. This instance is [Send] and [Sync] because the internals
/// are guarded by ordered atomic operations.
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

// SAFETY: all state behind the pointer is accessed either read-only or
// through the atomic `flags` word, so the handle may be moved to and
// referenced from other threads.
unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}
536
537impl RwBuffer
538{
539    #[inline]
540    fn new(buf_size: usize) -> Self
541    {
542        let status = Box::new(RwBufferInner::new(buf_size));
543
544        return Self(Box::leak(status).into());
545    }
546
547    #[inline]
548    fn inner(&self) -> &RwBufferInner
549    {
550        return unsafe { self.0.as_ref() };
551    }
552
553    /// Checks if this instance satisfies the following conditions:
554    /// 
555    /// * No exclusive write access
556    /// 
557    /// * No read access
558    /// 
559    /// * There is only one base reference.
560    /// 
561    /// # Returns 
562    /// 
563    /// * - `true` if instance satisfies the conditions above.
564    /// 
565    /// * - `false` if does not satisfy the conditions above.
566    #[inline]
567    pub
568    fn is_free(&self) -> bool
569    {
570        let inner = self.inner();
571
572        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
573
574        return flags.write == false && flags.read == 0 && flags.base == 1;
575    }
576
577    /// Accures the instance, if it satisfy the following conditions:
578    /// 
579    /// * No exclusive write access
580    /// 
581    /// * No read access
582    /// 
583    /// * There is only one base reference.
584    /// 
585    /// # Returns 
586    /// 
587    /// * - `true` if instance satisfies the conditions above.
588    /// 
589    /// * - `false` if does not satisfy the conditions above.
590    #[inline]
591    pub
592    fn accure_if_free(&self) -> bool
593    {
594        let inner = self.inner();
595
596        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
597
598        let res = flags.write == false && flags.read == 0 && flags.base == 1;
599            /*if flags.write == false && flags.read == 0 && flags.base == 1
600            {
601                /*if consume == false
602                {
603                    let _ = flags.base();
604                }*/
605
606                true
607            }
608            else
609            {
610                false
611            };*/
612
613        inner.flags.store(flags.into(), Ordering::Release);
614
615        return res;
616    }
617
618    /// Attemts to make an exclusive (write) access to the buffer.
619    /// 
620    /// # Returns
621    /// 
622    /// A [Result] in form of [RwBufferRes] is returned with:
623    /// 
624    /// * [Result::Ok] with the [WBuffer] instance
625    /// 
626    /// * [Result::Err] may be returned a [RwBufferError::WriteTryAgianLater] in case 
627    ///     if the there is/are an active `read` references.
628    pub
629    fn write(&self) -> RwBufferRes<WBuffer>
630    {
631        let inner = self.inner();
632
633        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
634
635        let res = flags.write();
636
637        inner.flags.store(flags.into(), Ordering::Release);
638
639        res?;
640
641        return Ok(WBuffer::new(self.0.clone()));
642    }
643
644    /// Attemts to make a shared (read) access to the buffer.
645    /// 
646    /// # Returns
647    /// 
648    /// A [Result] in form of [RwBufferRes] is returned with:
649    /// 
650    /// * [Result::Ok] with the [RBuffer] instance
651    /// 
652    /// * [Result::Err] with error type is returned:
653    /// 
654    /// - [RwBufferError::TooManyRead] is returned when the soft limit of
655    ///     references was reached.
656    /// 
657    /// - [RwBufferError::ReadTryAgianLater] is returned if there is an 
658    ///     active exclusive access.
659    pub
660    fn read(&self) -> RwBufferRes<RBuffer>
661    {
662        let inner = self.inner();
663
664        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
665
666        let res = flags.read();
667
668        inner.flags.store(flags.into(), Ordering::Release);
669
670        if res? == false
671        {
672            return Err(RwBufferError::TooManyRead);
673        }
674
675        return Ok(RBuffer::new(self.0.clone()));
676    }
677
678    #[cfg(test)]
679    fn get_flags(&self) -> RwBufferFlags
680    {
681        let inner = self.inner();
682
683        let flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
684
685        return flags;
686    }
687}
688
689impl Clone for RwBuffer
690{
691    /// Clones the instance and increasing the `base` ref count.
692    /// 
693    /// Will `panic` if a soft limit of refs were reached.
694    fn clone(&self) -> Self
695    {
696        let inner = self.inner();
697
698        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
699
700        if flags.base() == false
701        {
702            panic!("too many base references for RBuffer");
703        }
704
705        inner.flags.store(flags.into(), Ordering::Release);
706
707        return Self(self.0.clone());
708    }
709}
710
711impl Drop for RwBuffer
712{
713    /// Drops the RwBuffer instance. In case if there is no readers and
714    /// writers, then drop immidiatly the inner data.
715    /// In case if there is any readers or writing, then drop only wrapper which
716    /// is the zero reader.
717    fn drop(&mut self)
718    {
719        let inner = self.inner();
720
721        let mut flags: RwBufferFlags = inner.flags.load(Ordering::Acquire).into();
722
723        let unbased = flags.unbase();
724
725        if flags.read == 0 && flags.write == false && unbased == false
726        {
727            // call descrutor
728            unsafe { ptr::drop_in_place(self.0.as_ptr()) };
729
730            return;
731        }
732
733        inner.flags.store(flags.into(), Ordering::Release);
734    }
735}
736
/// An instance which controls the allocation of the new buffers or
/// reusage of already created and free instances. This instance is
/// not thread safe. An external mutex should be used.
#[derive(Debug)]
pub struct RwBuffers
{
    /// A buffer length in bytes. Not aligned.
    buf_len: usize,

    /// A maximum amount of slots for buffers; `0` means unbounded.
    bufs_cnt_lim: usize,

    /// A list of all registered buffers (both free and in use).
    buffs: VecDeque<RwBuffer>

}
753
754impl RwBuffers
755{
756    /// Creates new instance wshich holds the base reference in the 
757    /// inner storage with the capacity bounds.
758    /// 
759    /// # Arguments
760    /// 
761    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
762    ///     the payload is located.
763    /// 
764    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
765    /// 
766    /// * `bufs_cnt_lim` - a maximum amount of the available slots. Determines the 
767    ///     capacity bounds.
768    /// 
769    /// # Returns
770    /// 
771    /// A [Result] in form of [RwBufferRes] is returned with:
772    /// 
773    /// * [Result::Ok] with the [RwBuffers] instance
774    /// 
775    /// * [Result::Err] with error type is returned:
776    /// 
777    /// - [RwBufferError::InvalidArguments] is returned when the arguments are
778    ///     incorrect. 
779    pub
780    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
781    {
782        if pre_init_cnt > bufs_cnt_lim
783        {
784            return Err(RwBufferError::InvalidArguments);
785        }
786        else if buf_len == 0
787        {
788            return Err(RwBufferError::InvalidArguments);
789        }
790
791        let buffs: VecDeque<RwBuffer> = 
792            if pre_init_cnt > 0
793            {
794                let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
795
796                for _ in 0..pre_init_cnt
797                {
798                    buffs.push_back(RwBuffer::new(buf_len));
799                }
800
801                buffs
802            }
803            else
804            {
805                VecDeque::with_capacity(bufs_cnt_lim)
806            };
807
808        return Ok(
809            Self
810            {
811                buf_len: buf_len,
812                bufs_cnt_lim: bufs_cnt_lim,
813                buffs: buffs,
814            }
815        )
816    }
817
818    /// Same as `new` but without any limits. Unbounded storage.
819    /// 
820    /// # Arguments
821    /// 
822    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
823    ///     the payload is located.
824    /// 
825    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
826    /// 
827    /// # Returns
828    /// 
829    /// Returns the instance.
830    pub
831    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
832    {
833        let mut buffs = VecDeque::with_capacity(pre_init_cnt);
834
835        for _ in 0..pre_init_cnt
836        {
837            buffs.push_back(RwBuffer::new(buf_len));
838        }
839
840        return
841            Self
842            {
843                buf_len: buf_len,
844                bufs_cnt_lim: 0,
845                buffs: buffs,
846            };
847    }
848
849    /// Allocates either a new buffer or reuse the free. If the instance
850    /// is created with bounds then in case if no free slots available
851    /// returns error.
852    /// 
853    /// # Returns
854    /// 
855    /// A [Result] in form of [RwBufferRes] is returned with:
856    /// 
857    /// * [Result::Ok] with the [RwBuffers] instance
858    /// 
859    /// * [Result::Err] with [RwBufferError::OutOfBuffers] error.
860    pub
861    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
862    {
863        // check the list if any available
864        for buf in self.buffs.iter()
865        {
866            if buf.is_free() == true
867            {
868                return Ok(buf.clone());
869            }
870        }
871
872        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
873        {
874            let buf = RwBuffer::new(self.buf_len);
875            let c_buf = buf.clone();
876
877            self.buffs.push_back(buf);
878
879            return Ok(c_buf);
880        }
881
882        return Err(RwBufferError::OutOfBuffers);
883    }
884
885    /// Allocates a buffer "in place" i.e finds the next allocated but unused
886    /// buffer and removes it from the list or alloactes new buffer without
887    /// adding it to the list. Should never return error.
888    /// 
889    /// # Returns
890    /// 
891    /// A [RwBuffer] is returned.
892    pub
893    fn allocate_in_place(&mut self) -> RwBuffer
894    {
895        for i in 0..self.buffs.len()
896        {
897            if self.buffs[i].accure_if_free() == true
898            {
899                return self.buffs.remove(i).unwrap();
900            }
901        }
902
903        let buf = RwBuffer::new(self.buf_len);
904
905        return buf;
906    }
907
908    /// Retains the buffer list by removing any unused buffers as many times
909    /// as set in the argument `cnt`. It does not guaranty than the selected 
910    /// amount will be freed.
911    /// 
912    /// # Arguments
913    /// 
914    /// * `cnt` - how many slots to clean before exit.
915    /// 
916    /// # Returns 
917    /// 
918    /// A [usize] is returned which indicates how many instances was removed
919    /// before the `cnt` was reached. 
920    pub
921    fn compact(&mut self, mut cnt: usize) -> usize
922    {
923        let p_cnt = cnt;
924
925        self
926            .buffs
927            .retain(
928                |buf|
929                {
930                    if buf.is_free() == true
931                    {
932                        cnt -= 1;
933
934                        return false;
935                    }
936
937                    return true;
938                }
939            );
940
941        return p_cnt - cnt;
942    }
943
944    #[cfg(test)]
945    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags>
946    {
947        return Some(self.buffs.get(index)?.get_flags());
948    }
949}
950
951#[cfg(feature = "std")]
952#[cfg(test)]
953mod tests
954{
955    use std::time::{Duration, Instant};
956
957    use super::*;
958
959    #[test]
960    fn simple_test()
961    {
962        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
963
964        let buf0_res = bufs.allocate();
965        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
966
967        let buf0 = buf0_res.unwrap();
968
969        let buf0_w = buf0.write();
970        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
971        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
972        drop(buf0_w);
973
974        let buf0_r = buf0.read();
975        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
976        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
977
978        let buf0_1 = buf0.clone();
979        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));
980
981        let flags0 = buf0.get_flags();
982        let flags0_1 = buf0_1.get_flags();
983
984        assert_eq!(flags0, flags0_1);
985        assert_eq!(flags0.base, 3);
986        assert_eq!(flags0.read, 1);
987        assert_eq!(flags0.write, false);
988    }
989
990    #[test]
991    fn simple_test_dopped_in_place()
992    {
993        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
994
995        let buf0_res = bufs.allocate();
996        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
997
998        let buf0 = buf0_res.unwrap();
999
1000        println!("{:?}", buf0.get_flags());
1001
1002        let buf0_w = buf0.write();
1003        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1004        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1005
1006        drop(buf0);
1007
1008        let buf0_flags = bufs.get_flags_by_index(0);
1009        assert_eq!(buf0_flags.is_some(), true, "no flags");
1010        let buf0_flags = buf0_flags.unwrap();
1011
1012        println!("{:?}", buf0_flags);
1013
1014        assert_eq!(buf0_flags.base, 1);
1015        assert_eq!(buf0_flags.read, 0);
1016        assert_eq!(buf0_flags.write, true);
1017
1018        drop(buf0_w.unwrap());
1019
1020        let buf0_flags = bufs.get_flags_by_index(0);
1021        assert_eq!(buf0_flags.is_some(), true, "no flags");
1022        let buf0_flags = buf0_flags.unwrap();
1023
1024        println!("{:?}", buf0_flags);
1025
1026        assert_eq!(buf0_flags.base, 1);
1027        assert_eq!(buf0_flags.read, 0);
1028        assert_eq!(buf0_flags.write, false);
1029
1030    }
1031
1032    #[test]
1033    fn simple_test_dropped_in_place_downgrade()
1034    {
1035        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1036
1037        let buf0_res = bufs.allocate();
1038        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1039
1040        let buf0 = buf0_res.unwrap();
1041
1042        println!("{:?}", buf0.get_flags());
1043
1044        let buf0_w = buf0.write();
1045        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1046        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1047
1048        drop(buf0);
1049
1050        let buf0_rd = buf0_w.unwrap().downgrade();
1051        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1052
1053        let buf0_flags = bufs.get_flags_by_index(0);
1054        assert_eq!(buf0_flags.is_some(), true, "no flags");
1055        let buf0_flags = buf0_flags.unwrap();
1056
1057        println!("{:?}", buf0_flags);
1058
1059        assert_eq!(buf0_flags.base, 1);
1060        assert_eq!(buf0_flags.read, 1);
1061        assert_eq!(buf0_flags.write, false);
1062
1063    }
1064
1065    #[test]
1066    fn simple_test_drop_in_place_downgrade()
1067    {
1068        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1069
1070        let buf0_w = 
1071            {
1072                let buf0 = bufs.allocate_in_place();
1073
1074                println!("1: {:?}", buf0.get_flags());
1075
1076                let buf0_w = buf0.write();
1077                assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1078                assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1079
1080                drop(buf0);
1081
1082                buf0_w
1083            };
1084
1085        let buf0_rd = buf0_w.unwrap().downgrade();
1086        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1087
1088        let buf0_flags = bufs.get_flags_by_index(0);
1089        assert_eq!(buf0_flags.is_some(), false, "flags");
1090
1091        let buf0_rd = buf0_rd.unwrap();
1092        let buf0_flags = buf0_rd.get_flags();
1093
1094        println!("2: {:?}", buf0_flags);
1095
1096        assert_eq!(buf0_flags.base, 0);
1097        assert_eq!(buf0_flags.read, 1);
1098        assert_eq!(buf0_flags.write, false);
1099    }
1100
1101    #[test]
1102    fn timing_test()
1103    {
1104        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1105
1106        for _ in 0..10
1107        {
1108            let inst = Instant::now();
1109            let buf0_res = bufs.allocate_in_place();
1110            let end = inst.elapsed();
1111
1112            println!("alloc: {:?}", end);
1113            drop(buf0_res);
1114        }
1115
1116        let buf0_res = bufs.allocate();
1117        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1118
1119        let buf0 = buf0_res.unwrap();
1120
1121        for _ in 0..10
1122        {
1123            let inst = Instant::now();
1124            let buf0_w = buf0.write();
1125            let end = inst.elapsed();
1126
1127            println!("write: {:?}", end);
1128
1129            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1130            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1131            drop(buf0_w);
1132        }
1133
1134        for _ in 0..10
1135        {
1136            let inst = Instant::now();
1137            let buf0_r = buf0.read();
1138            let end = inst.elapsed();
1139
1140            println!("read: {:?}", end);
1141
1142            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
1143            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
1144            drop(buf0_r);
1145        }
1146    }
1147
1148    #[test]
1149    fn simple_test_mth()
1150    {
1151        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
1152
1153        let buf0 = bufs.allocate().unwrap();
1154
1155        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();
1156
1157        let join1=
1158            std::thread::spawn(move ||
1159                {
1160                    println!("{:?}", buf0_rd);
1161
1162                    std::thread::sleep(Duration::from_secs(2));
1163
1164                    return;
1165                }
1166            );
1167
1168        let buf1_rd = buf0.read().unwrap();
1169
1170        let join2=
1171            std::thread::spawn(move ||
1172                {
1173                    println!("{:?}", buf1_rd);
1174
1175                    std::thread::sleep(Duration::from_secs(2));
1176
1177                    return;
1178                }
1179            );
1180
1181        let flags = buf0.get_flags();
1182
1183        assert_eq!(flags.base, 2);
1184        assert_eq!(flags.read, 2);
1185        assert_eq!(flags.write, false);
1186
1187        let _ = join1.join();
1188        let _ = join2.join();
1189
1190        let flags = buf0.get_flags();
1191
1192        assert_eq!(flags.base, 2);
1193        assert_eq!(flags.read, 0);
1194        assert_eq!(flags.write, false);
1195    }
1196
1197    #[test]
1198    fn test_try_into_read()
1199    {
1200        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1201
1202        let buf0 = bufs.allocate_in_place();
1203
1204        println!("{:?}", buf0.get_flags());
1205
1206        let buf0_w = buf0.write();
1207        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1208        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1209
1210        drop(buf0);
1211
1212        let buf0_rd = buf0_w.unwrap().downgrade();
1213        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1214
1215        let buf0_flags = bufs.get_flags_by_index(0);
1216        assert_eq!(buf0_flags.is_some(), false, "flags");
1217
1218        let buf0_rd = buf0_rd.unwrap();
1219        let buf0_flags = buf0_rd.get_flags();
1220
1221        println!("{:?}", buf0_flags);
1222
1223        assert_eq!(buf0_flags.base, 0);
1224        assert_eq!(buf0_flags.read, 1);
1225        assert_eq!(buf0_flags.write, false);
1226
1227        let inst = Instant::now();
1228        let ve = buf0_rd.try_inner();
1229        let end = inst.elapsed();
1230
1231        println!("try inner: {:?}", end);
1232        assert_eq!(ve.is_ok(), true);
1233
1234
1235    }
1236
1237    #[tokio::test]
1238    async fn test_multithreading()
1239    {
1240
1241        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
1242
1243        let buf0 = bufs.allocate().unwrap();
1244
1245        let mut buf0_write = buf0.write().unwrap();
1246        
1247        buf0_write.as_mut_slice()[0] = 5;
1248        buf0_write.as_mut_slice()[1] = 4;
1249
1250        println!("{}", buf0_write[0]);
1251
1252        let buf0_r = buf0_write.downgrade().unwrap();
1253
1254        let join1=
1255            tokio::task::spawn(async move
1256                {
1257                    println!("thread[1]:{}", buf0_r[0]);
1258
1259                    tokio::time::sleep(Duration::from_millis(200)).await;
1260
1261                    return;
1262                }
1263            );
1264
1265        let buf0_r = buf0.read().unwrap();
1266
1267        // drop base
1268        drop(buf0);
1269
1270        let join2=
1271            tokio::task::spawn(async move
1272                {
1273                    println!("thread[2]: {}", buf0_r[0]);
1274                    println!("thread[2]: {}", buf0_r[1]);
1275
1276                    tokio::time::sleep(Duration::from_millis(200)).await;
1277
1278                    return;
1279                }
1280            );
1281
1282        let _ = join1.await;
1283        let _ = join2.await;
1284
1285        return;
1286    }
1287}