shared_buffer_rs/
lib.rs

1/*-
2 * shared-buffer-rs - a buffer managment and sharing crate.
3 * 
4 * Copyright (C) 2025 Aleksandr Morozov
5 * 
6 * The scram-rs crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
10 *                     
11 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
12 */
13
14/*! A small crate which implements a thread safe implementation to allocate
15 the buffer of some size, borrow as muable in current context or thread
16 and borrow as many read only references as needed which are Send+Sync.
17
18 It acts like Arc but embeds the RefCell functionality without any
19 issues with Send and Sync.
20
21 The main purpose it to have a lock free, lightweight buffer I/O for
22 writing in one side and broadcast to multiple tasks i.e threads and
23 async task making sure that it can not be modifyied.
24 */
25
26#![cfg_attr(not(feature = "std"), no_std)]
27
28#[cfg(not(feature = "std"))]
29extern crate alloc;
30
31
32#[cfg(not(feature = "std"))]
33use core::
34{
35    fmt, 
36    mem, 
37    ops::{Deref, DerefMut}, 
38    ptr::{self, NonNull}, 
39    sync::atomic::{AtomicU64, Ordering}
40};
41
42#[cfg(not(feature = "std"))]
43use core::{marker::PhantomData, task::Poll, time::Duration};
44
45#[cfg(not(feature = "std"))]
46use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec::Vec};
47
48#[cfg(not(feature = "std"))]
49use alloc::vec;
50use crossbeam_utils::Backoff;
51
52#[cfg(feature = "std")]
53use std::
54{
55    collections::VecDeque, 
56    ops::{Deref, DerefMut}, 
57    ptr::{self, NonNull}, 
58    sync::atomic::{AtomicU64, Ordering},
59    fmt,
60    mem
61};
62
63#[cfg(feature = "std")]
64use std::{marker::PhantomData, task::Poll, time::Duration};
65
66extern crate crossbeam_utils;
67
68pub trait TryClone: Sized 
69{
70    type Error;
71
72    fn try_clone(&self) -> Result<Self, Self::Error>;
73}
74
75/// An implementation of the `async_drop` for async.
76trait LocalAsyncDrop: Send + Sync + 'static
77{
78    async fn async_drop(&mut self);
79}
80
81/// Use this function to drop [RBuffer] and [WBuffer] obtained via
82/// `async_*`.
83async 
84fn async_drop<LAD: LocalAsyncDrop + Send + Sync>(mut lad: LAD)
85{
86    lad.async_drop().await;
87
88    drop(lad);
89}
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum RwBufferError
93{
94    TooManyRead,
95    TooManyBase,
96    ReadTryAgianLater,
97    WriteTryAgianLater,
98    BaseTryAgainLater,
99    OutOfBuffers,
100    DowngradeFailed,
101    InvalidArguments,
102    Busy,
103}
104
105impl fmt::Display for RwBufferError
106{
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
108    {
109        match self
110        {
111            Self::TooManyRead => 
112                write!(f, "TooManyRead: read soft limit reached"),
113            Self::TooManyBase => 
114                write!(f, "TooManyBase: base soft limit reached"),
115            Self::ReadTryAgianLater => 
116                write!(f, "ReadTryAgianLater: shared access not available, try again later"),
117            Self::WriteTryAgianLater => 
118                write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
119            Self::BaseTryAgainLater => 
120                write!(f, "BaseTryAgainLater: failed to obtain a clone in reasonable time"),
121            Self::OutOfBuffers => 
122                write!(f, "OutOfBuffers: no more free bufers are left"),
123            Self::DowngradeFailed => 
124                write!(f, "DowngradeFailed: can not downgrade exclusive to shared, race condition"),
125            Self::InvalidArguments => 
126                write!(f, "InvalidArguments: arguments are not valid"),  
127            Self::Busy => 
128                write!(f, "RwBuffer is busy and cannot be acquired"),       
129        }
130    }
131}
132
133pub type RwBufferRes<T> = Result<T, RwBufferError>;
134
135/// A read only buffer. This instance is [Send] and [Sync]
136/// as it does not provide any write access.
137#[derive(Debug, PartialEq, Eq)]
138pub struct RBuffer
139{
140    /// The inner read only
141    inner: NonNull<RwBufferInner>,
142
143    /// Is set to `true` when async_drop was performed earlier.
144    a_dropped: bool,
145}
146
147unsafe impl Send for RBuffer {}
148unsafe impl Sync for RBuffer {}
149
150impl RwBufType for RBuffer {}
151
152impl RBuffer
153{
154    #[inline]
155    fn new(inner: NonNull<RwBufferInner>) -> Self
156    {
157        return Self{ inner, a_dropped: false };
158    }
159
160    #[cfg(test)]
161    fn get_flags(&self) -> RwBufferFlags<Self>
162    {
163        use core::sync::atomic::Ordering;
164
165        let inner = unsafe{ self.inner.as_ref() };
166
167        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
168
169        return flags;
170    }
171
172    /// Borrow the inner buffer as slice.
173    pub
174    fn as_slice(&self) -> &[u8]
175    {
176        let inner = unsafe { self.inner.as_ref() };
177
178        return inner.buf.as_ref().unwrap().as_slice();
179    }
180
181    /// Attempts to consume the instance and retrive the inner buffer. This means
182    /// that the instance will no longer be available.
183    ///
184    /// The following condition should be satisfied:
185    /// 1) No more readers except current instance.
186    ///
187    /// 2) No base references, item should not contain base references from [RwBuffer].
188    /// 
189    /// # Returns
190    /// 
191    /// A [Result] is returned with: 
192    /// 
193    /// * [Result::Ok] with the consumed inner [Vec]
194    /// 
195    /// * [Result::Err] with the consumed instance
196    pub
197    fn try_inner(mut self) -> Result<Vec<u8>, Self>
198    {
199        let inner = unsafe { self.inner.as_ref() };
200
201        let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
202
203       // new_flags.unread();
204
205        if current_flags.try_inner_check() == true
206        {
207            // in theory if at that moment only one read operation left, then no other can occure
208            // because the current_flags are obtained with SeqCst, and no read can appear
209
210            let inner = unsafe { self.inner.as_mut() };
211
212            let buf = inner.buf.take().unwrap();
213        
214            drop(self);
215
216            return Ok(buf);
217        }
218
219        return Err(self);        
220    }
221
222    fn inner(&self) -> &RwBufferInner
223    {
224        return unsafe { self.inner.as_ref() };
225    }
226}
227
228impl LocalAsyncDrop for RBuffer
229{
230    async fn async_drop(&mut self) 
231    {
232        self.a_dropped = true;
233
234        std::future::poll_fn(
235            |cx|
236            {
237                let inner = self.inner();
238
239                let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
240                let mut new_flags = current_flags.clone();
241
242                new_flags.unread();
243
244                let res = 
245                    inner
246                        .flags
247                        .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
248
249                if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
250                {
251                    if flags.is_drop_inplace() == true
252                    {
253                        // call descrutor
254                        unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
255                    }
256
257                    return Poll::Ready(());
258                }
259
260                cx.waker().wake_by_ref();
261
262                return Poll::Pending;
263            }
264        )
265        .await;
266    }
267}
268
269impl Deref for RBuffer
270{
271    type Target = Vec<u8>;
272
273    fn deref(&self) -> &Vec<u8>
274    {
275        let inner = self.inner();
276
277        return inner.buf.as_ref().unwrap();
278    }
279}
280
281impl Clone for RBuffer
282{
283    /// Attempts to clone the RBuffer incrementing the `read` reference. 
284    /// Would block until the clone is obtained. Should not block for long 
285    /// time. Because this crate is experimental, it will panic if it will
286    /// not be able to obtain clone.
287    /// 
288    /// # Returns 
289    /// 
290    /// Returns the new [RBuffer] instance.
291    /// 
292    /// # Panic
293    /// 
294    /// Panics if too many references were created. The reference count
295    /// is limited to max::u32 - 10. Or will panic if will not be able to obtain 
296    /// a clone of the [RBuffer] in reasonable time as this must not block
297    /// for a long time.
298    fn clone(&self) -> Self 
299    {
300        let inner = self.inner();
301
302        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
303        let mut new_flags = current_flags.clone();
304
305        new_flags.read().unwrap();
306
307        let backoff = Backoff::new();
308        let mut parked = false;
309
310        loop
311        {
312            let res = 
313                inner
314                    .flags
315                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
316
317            if let Ok(_) = res
318            {
319                return Self{ inner: self.inner, a_dropped: self.a_dropped };
320            }
321
322            current_flags = res.err().unwrap().into();
323            new_flags = current_flags.clone();
324
325            new_flags.read().unwrap();
326
327            if backoff.is_completed() == false
328            {
329                backoff.snooze();
330            }
331            else
332            {
333                if parked == false
334                {
335                    // last attempt
336                    std::thread::park_timeout(Duration::from_millis(1));
337
338                    parked = true;
339                }
340                else
341                {
342                    panic!("can not obtain a clone of RBuffer!");
343                }
344            }
345        }
346    }
347}
348
349impl TryClone for RBuffer
350{
351    type Error = RwBufferError;
352
353    /// Attempts to clone the RBuffer incrementing the `read` reference.
354    /// 
355    /// # Returns 
356    /// 
357    /// Returns the new [Result] where on success a clone of [RBuffer] instance is
358    /// returned, otherwise the:
359    /// 
360    /// * [RwBufferError::ReadTryAgianLater] - is returned if it failed to acquire the read clone
361    ///     in reasonable time.
362    /// 
363    /// * [RwBufferError::TooManyRead] - is returned if limit was reached.
364    fn try_clone(&self) -> Result<Self, Self::Error> 
365    {
366        let inner = self.inner();
367
368        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
369        let mut new_flags = current_flags.clone();
370
371        new_flags.read()?;
372
373        let backoff = Backoff::new();
374
375        loop
376        {
377            let res = 
378                inner
379                    .flags
380                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
381
382            if let Ok(_) = res
383            {
384                return Ok(Self{ inner: self.inner, a_dropped: self.a_dropped });
385            }
386
387            current_flags = res.err().unwrap().into();
388            new_flags = current_flags.clone();
389
390            new_flags.read()?;
391
392            if backoff.is_completed() == false
393            {
394                backoff.snooze();
395            }
396            else
397            {
398                break;
399            }
400        }
401
402        return Err(RwBufferError::ReadTryAgianLater);
403    }
404}
405
406impl Drop for RBuffer
407{
408    /// Should completly drop the instance (with data) only, if there is no more
409    /// readers or it is not referenced in the base.
410    /// 
411    /// # Panic
412    /// 
413    /// May panic if it will not be able to drop the instance in reasonable time i.e in 
414    /// 1000 attempts.
415    fn drop(&mut self)
416    {
417        if self.a_dropped == true
418        {
419            return;
420        }
421
422        let inner = self.inner();
423
424        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
425        let mut new_flags = current_flags.clone();
426
427        new_flags.unread();
428
429        let backoff = Backoff::new();
430        
431        for _ in 0..1000
432        {
433            let res = 
434                inner
435                    .flags
436                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
437
438            if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
439            {
440                if flags.is_drop_inplace() == true
441                {
442                    // call descrutor
443                    unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
444                }
445
446                return;
447            }
448
449            current_flags = res.err().unwrap().into();
450            new_flags = current_flags.clone();
451
452            new_flags.unread();
453
454            backoff.snooze();
455        }
456
457        // todo... solve this situation somehow
458        panic!("assertion trap: RBuffer::drop can not drop RBuffer in reasonable time!");
459    }
460}
461
462/// A Write and Read buffer. An exclusive instance which can not be copied or
463/// clonned. Once writing is complete, the instance can be dropped or downgraded to
464/// Read-only instance. This instance is NOT [Send] and [Sync]. 
465#[derive(Debug, PartialEq, Eq)]
466pub struct WBuffer
467{
468    /// A pointer to the leaked buffer instance.
469    buf: NonNull<RwBufferInner>,
470
471    /// Is set to `true` when `downgrade` is called.
472    downgraded: bool,
473}
474
475unsafe impl Send for WBuffer{}
476unsafe impl Sync for WBuffer{}
477
478impl RwBufType for WBuffer{}
479
480impl WBuffer
481{
482    #[inline]
483    fn new(inner: NonNull<RwBufferInner>) -> Self
484    {
485        return Self{ buf: inner, downgraded: false };
486    }
487
488    /// Attempts to downgrade the `write` instance into the `read` instance by consuming the
489    /// [WBuffer]. 
490    /// 
491    /// Can not be performed vice-versa (at least in this version). In normal conditions
492    /// should never return Error.
493    /// 
494    /// # Returns 
495    /// 
496    /// A [Result] is returned with the [RBuffer] on success. The [Result::Err] is returned 
497    /// if it failed to downgrade instance in resonable time.
498    pub
499    fn downgrade(mut self) ->  Result<RBuffer, Self>
500    {
501        let inner = unsafe { self.buf.as_ref() };
502
503        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
504        let mut new_flags = current_flags.clone();
505
506        new_flags.downgrade();
507
508        let backoff = Backoff::new();
509
510        while backoff.is_completed() == false
511        {
512            let res = 
513                inner
514                    .flags
515                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
516
517            if let Ok(_) = res
518            {
519                self.downgraded = true;
520
521                return Ok(RBuffer::new(self.buf.clone()));
522            }
523
524            current_flags = res.err().unwrap().into();
525            new_flags = current_flags.clone();
526
527            new_flags.downgrade();
528
529            backoff.snooze();
530        }
531
532        return Err(self);
533    }
534
535    pub
536    fn as_slice(&self) -> &[u8]
537    {
538        let inner = unsafe { self.buf.as_ref() };
539
540        return inner.buf.as_ref().unwrap()
541    }
542}
543
544impl Deref for WBuffer
545{
546    type Target = Vec<u8>;
547
548    fn deref(&self) -> &Vec<u8>
549    {
550        let inner = unsafe { self.buf.as_ref() };
551
552        return inner.buf.as_ref().unwrap();
553    }
554}
555
556impl DerefMut for WBuffer
557{
558    fn deref_mut(&mut self) -> &mut Vec<u8>
559    {
560        let inner = unsafe { self.buf.as_mut() };
561
562        return inner.buf.as_mut().unwrap();
563    }
564}
565
566impl Drop for WBuffer
567{
568    /// The instance may perform `drop_in_place` if there is no `base` references.
569    fn drop(&mut self)
570    {
571        if self.downgraded == true
572        {
573            return;
574        }
575
576        let inner = unsafe { self.buf.as_ref() };
577
578        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
579        let mut new_flags = current_flags.clone();
580
581        new_flags.unwrite();
582
583        let backoff = Backoff::new();
584
585        for _ in 0..1000
586        {
587            let res = 
588                inner
589                    .flags
590                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
591
592            if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
593            {
594                if flags.is_drop_inplace() == true
595                {
596                    // call descrutor
597                    unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
598                }
599
600                return;
601            }
602
603            current_flags = res.err().unwrap().into();
604            new_flags = current_flags.clone();
605
606            new_flags.unwrite();
607
608            backoff.snooze();
609        }
610
611        // todo... solve this situation somehow
612
613        panic!("assertion trap: WBuffer::drop can not drop RBuffer in reasonable time!");
614    }
615}
616
617impl LocalAsyncDrop for WBuffer
618{
619    async 
620    fn async_drop(&mut self) 
621    {
622        self.downgraded = true;
623
624        std::future::poll_fn(
625            move |cx|
626            {
627                let inner = unsafe { self.buf.as_ref() };
628
629                let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
630                let mut new_flags = current_flags.clone();
631
632                new_flags.unwrite();
633
634                let res = 
635                    inner
636                        .flags
637                        .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
638
639                if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
640                {
641                    if flags.is_drop_inplace() == true
642                    {
643                        // call descrutor
644                        unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
645                    }
646
647                    return Poll::Ready(());
648                }
649
650                cx.waker().wake_by_ref();
651
652                return Poll::Pending;
653            }
654        )
655        .await;
656    }
657}
658
659trait RwBufType {}
660
661/// Internal structure which represents the status. It can not be
662/// larger than 8-byte to fit into [AtomicU64].
663#[repr(align(8))]
664#[derive(Debug, PartialEq, Eq)]
665struct RwBufferFlags<TP>
666{
667    /// A reader refs counter. If larger than 0, no writes possible.
668    read: u32, // = 4
669
670    /// An exclusive write lock. When true, no reades should present.
671    write: bool, // = 1
672
673    /// A base refs i.e which holds the data.
674    /// If this value is zero, means the instance can be dropped in place
675    /// when `write` is false and `read` equals 0.
676    base: u16, // = 2
677
678    /// Unused
679    unused0: u8, // = 1,
680
681    _p: PhantomData<TP>,
682}
683
684
685impl<TP: RwBufType> From<u64> for RwBufferFlags<TP>
686{
687    fn from(value: u64) -> Self
688    {
689        return unsafe { mem::transmute(value) };
690    }
691}
692
693impl<TP: RwBufType> From<RwBufferFlags<TP>> for u64
694{
695    fn from(value: RwBufferFlags<TP>) -> Self
696    {
697        return unsafe { mem::transmute(value) };
698    }
699}
700
701impl<TP: RwBufType> Default for RwBufferFlags<TP>
702{
703    fn default() -> RwBufferFlags<TP>
704    {
705        return
706            Self
707            {
708                read: 0,
709                write: false,
710                base: 1,
711                unused0: 0,
712                _p: PhantomData
713            };
714    }
715}
716
717impl Copy for RwBufferFlags<WBuffer>{}
718
719impl Clone for RwBufferFlags<WBuffer>
720{
721    fn clone(&self) -> Self 
722    {
723        return 
724            Self 
725            { 
726                read: self.read.clone(), 
727                write: self.write.clone(), 
728                base: self.base.clone(), 
729                unused0: self.unused0.clone(), 
730                _p: PhantomData
731            }
732    }
733}
734
735impl Copy for RwBufferFlags<RBuffer>{}
736
737impl Clone for RwBufferFlags<RBuffer>
738{
739    fn clone(&self) -> Self 
740    {
741        return 
742            Self 
743            { 
744                read: self.read.clone(), 
745                write: self.write.clone(), 
746                base: self.base.clone(), 
747                unused0: self.unused0.clone(), 
748                _p: PhantomData
749            }
750    }
751}
752
753impl Copy for RwBufferFlags<RwBuffer>{}
754
755impl Clone for RwBufferFlags<RwBuffer>
756{
757    fn clone(&self) -> Self 
758    {
759        return 
760            Self 
761            { 
762                read: self.read.clone(), 
763                write: self.write.clone(), 
764                base: self.base.clone(), 
765                unused0: self.unused0.clone(), 
766                _p: PhantomData
767            }
768    }
769}
770
771impl RwBufferFlags<WBuffer>
772{
773    #[inline]
774    fn write(&mut self) -> RwBufferRes<()>
775    {
776        if self.read == 0
777        {
778            self.write = true;
779
780            return Ok(());
781        }
782        else
783        {
784            return Err(RwBufferError::WriteTryAgianLater);
785        }
786    }
787
788    #[inline]
789    fn downgrade(&mut self) 
790    {
791        self.write = false;
792        self.read += 1;
793    }
794
795     #[inline]
796    fn unwrite(&mut self)
797    {
798        self.write = false;
799    }
800}
801
802impl RwBufferFlags<RBuffer>
803{
804    #[inline]
805    fn try_inner_check(&self) -> bool
806    {
807        return self.read == 1 && self.write == false && self.base == 0;
808    }
809
810    #[inline]
811    fn unread(&mut self)
812    {
813        self.read -= 1;
814    }
815
816    #[inline]
817    fn read(&mut self) -> RwBufferRes<()>
818    {
819        if self.write == false
820        {
821            self.read += 1;
822
823            if self.read <= Self::MAX_READ_REFS
824            {
825                return Ok(());
826            }
827            
828            return Err(RwBufferError::TooManyRead);
829        }
830
831        return Err(RwBufferError::ReadTryAgianLater);
832    }
833}
834
835impl RwBufferFlags<RwBuffer>
836{
837    #[inline]
838    fn make_pre_unused() -> Self
839    {
840        return Self{ read: 0, write: false, base: 1, unused0: 0, _p: PhantomData };
841    }
842
843    #[inline]
844    fn read(&mut self) -> RwBufferRes<()>
845    {
846        if self.write == false
847        {
848            self.read += 1;
849
850            if self.read <= Self::MAX_READ_REFS
851            {
852                return Ok(());
853            }
854            
855            return Err(RwBufferError::TooManyRead);
856        }
857
858        return Err(RwBufferError::ReadTryAgianLater);
859    }
860
861    #[inline]
862    fn write(&mut self) -> RwBufferRes<()>
863    {
864        if self.read == 0
865        {
866            self.write = true;
867
868            return Ok(());
869        }
870        else
871        {
872            return Err(RwBufferError::WriteTryAgianLater);
873        }
874    }
875    
876    #[inline]
877    fn base(&mut self) -> RwBufferRes<()>
878    {
879        self.base += 1;
880
881        if self.base <= Self::MAX_BASE_REFS
882        {
883            return Ok(());
884        }
885
886        return Err(RwBufferError::TooManyBase);
887    }
888
889    #[inline]
890    fn unbase(&mut self) -> bool
891    {
892        self.base -= 1;
893
894        return self.base != 0;
895    }
896}
897
898impl<TP: RwBufType> RwBufferFlags<TP>
899{
900    /// A soft limit on the amount of references for reading instances.
901    pub const MAX_READ_REFS: u32 = u32::MAX - 2;
902
903    /// A soft limit on the amount of references for base instances.
904    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
905
906    
907
908    #[inline]
909    fn is_free(&self) -> bool
910    {
911        return self.write == false && self.read == 0 && self.base == 1;
912    }
913
914   
915
916   
917
918    #[inline]
919    fn is_drop_inplace(&self) -> bool
920    {
921        return self.read == 0 && self.write == false && self.base == 0;
922    }
923
924    
925
926    
927
928   
929}
930
931#[derive(Debug)]
932pub struct RwBufferInner
933{
934    /// A [RwBufferFlags] represented as atomic u64.
935    flags: AtomicU64,
936
937    /// A buffer.
938    buf: Option<Vec<u8>>,
939}
940
941impl RwBufferInner
942{
943    fn new(buf_size: usize) -> Self
944    {
945        return
946            Self
947            {
948                flags: 
949                    AtomicU64::new(RwBufferFlags::<RwBuffer>::default().into()),
950                buf: 
951                    Some(vec![0_u8; buf_size])
952            };
953    }
954}
955
956/// A base instance which holds the `leaked` pointer to [RwBufferInner].
957/// 
958/// This instance can provide either an exclusive write access or 
959/// multiple read access, but not at the same time. Can be used to store
960/// the instance. This instance is [Send] and [Sync] because the insternals
961/// are guarded by ordered atomic operations.
962#[derive(Debug, PartialEq, Eq)]
963pub struct RwBuffer(NonNull<RwBufferInner>);
964
965unsafe impl Send for RwBuffer {}
966unsafe impl Sync for RwBuffer {}
967
968impl RwBufType for RwBuffer {}
969
970impl RwBuffer
971{
972    #[inline]
973    fn new(buf_size: usize) -> Self
974    {
975        let status = Box::new(RwBufferInner::new(buf_size));
976
977        return Self(Box::leak(status).into());
978    }
979
980    #[inline]
981    fn inner(&self) -> &RwBufferInner
982    {
983        return unsafe { self.0.as_ref() };
984    }
985
986    /// Checks if this instance satisfies the following conditions:
987    /// 
988    /// * No exclusive write access
989    /// 
990    /// * No read access
991    /// 
992    /// * There is only one base reference.
993    /// 
994    /// But since check everything may have been already changed.
995    /// 
996    /// # Returns 
997    /// 
998    /// * - `true` if instance satisfies the conditions above.
999    /// 
1000    /// * - `false` if does not satisfy the conditions above.
1001    #[inline]
1002    pub
1003    fn is_free(&self) -> bool
1004    {
1005        let inner = self.inner();
1006
1007        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
1008
1009        return flags.is_free();
1010    }
1011
1012    /// Accures the instance, if it satisfy the following conditions:
1013    /// 
1014    /// * No exclusive write access
1015    /// 
1016    /// * No read access
1017    /// 
1018    /// * There is only one base reference.
1019    /// 
1020    /// # Returns 
1021    /// 
1022    /// The [Result] is retuerned with the clonned [RwBuffer] instance or
1023    /// [Result::Err] with the following errors:
1024    /// 
1025    /// * [RwBufferError::Busy] - the instance have already been taken.
1026    /// 
1027    /// * [RwBufferError::TooManyBase] - too many base references are already around.
1028    #[inline]
1029    pub(crate) 
1030    fn acqiure_if_free(&self) -> RwBufferRes<Self>
1031    {
1032        let inner = self.inner();
1033
1034        let current_flags: RwBufferFlags<Self> = RwBufferFlags::make_pre_unused();
1035        let mut new_flags = current_flags.clone();
1036        
1037        new_flags.base()?;
1038
1039        let res = 
1040                inner
1041                    .flags
1042                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1043
1044        if let Ok(_) = res
1045        {
1046            return Ok(Self(self.0.clone()));
1047        }
1048
1049        return Err(RwBufferError::Busy);
1050    }
1051
1052    /// Attemts to make an exclusive (write) access to the buffer.
1053    /// 
1054    /// Would block for short period of time and return error.
1055    /// 
1056    /// # Returns
1057    /// 
1058    /// A [Result] in form of [RwBufferRes] is returned with:
1059    /// 
1060    /// * [Result::Ok] with the [WBuffer] instance
1061    /// 
1062    /// * [Result::Err] may be returned a [RwBufferError::WriteTryAgianLater] in case 
1063    ///     if the there is/are an active `read` references or acquite exc. lock failed.
1064    pub
1065    fn write(&self) -> RwBufferRes<WBuffer>
1066    {
1067        let inner = self.inner();
1068
1069        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
1070        let mut new_flags = current_flags.clone();
1071
1072        new_flags.write()?;
1073
1074        let backoff = Backoff::new();
1075
1076        while backoff.is_completed() == false
1077        {
1078            let res = 
1079                inner
1080                    .flags
1081                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1082
1083            if let Ok(_) = res
1084            {
1085                return Ok(WBuffer::new(self.0.clone()));
1086            }
1087
1088            current_flags = res.err().unwrap().into();
1089            new_flags = current_flags.clone();
1090
1091            new_flags.write()?;
1092
1093            backoff.snooze();
1094        }
1095
1096        return Err(RwBufferError::WriteTryAgianLater);
1097    }
1098
1099    /// Attempts to gain an exclusive access in async way.
1100    /// 
1101    /// # Return
1102    /// 
1103    /// Return the same result as [RwBuffer::write].
1104    pub async 
1105    fn write_async(&self) -> RwBufferRes<WBuffer>
1106    {
1107        return 
1108            std::future::poll_fn(
1109                |cx|
1110                {
1111                    let inner = self.inner();
1112
1113                    let current_flags: RwBufferFlags<WBuffer> = inner.flags.load(Ordering::SeqCst).into();
1114                    let mut new_flags = current_flags.clone();
1115
1116                    if let Err(e) = new_flags.write()
1117                    {
1118                        return Poll::Ready(Err(e));
1119                    }
1120
1121                    let res = 
1122                        inner
1123                            .flags
1124                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1125
1126                    if let Ok(_) = res
1127                    {
1128                        return Poll::Ready( Ok( WBuffer::new(self.0.clone()) ) );
1129                    }
1130
1131                    cx.waker().wake_by_ref();
1132
1133                    return Poll::Pending;
1134                }
1135            )
1136            .await;
1137    }
1138    
1139
1140    /// Attemts to make a shared (read) access to the buffer.
1141    /// 
1142    /// Would block for short period of time and return error.
1143    /// 
1144    /// # Returns
1145    /// 
1146    /// A [Result] in form of [RwBufferRes] is returned with:
1147    /// 
1148    /// * [Result::Ok] with the [RBuffer] instance
1149    /// 
1150    /// * [Result::Err] with error type is returned:
1151    /// 
1152    /// - [RwBufferError::TooManyRead] is returned when the soft limit of
1153    ///     references was reached.
1154    /// 
1155    /// - [RwBufferError::ReadTryAgianLater] is returned if there is an 
1156    ///     active exclusive access.
1157    pub
1158    fn read(&self) -> RwBufferRes<RBuffer>
1159    {
1160
1161        let inner = self.inner();
1162
1163        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
1164        let mut new_flags = current_flags.clone();
1165
1166        new_flags.read()?;
1167
1168        let backoff = Backoff::new();
1169
1170        while backoff.is_completed() == false
1171        {
1172            let res = 
1173                inner
1174                    .flags
1175                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1176
1177            if let Ok(_) = res
1178            {
1179                return Ok(RBuffer::new(self.0.clone()));
1180            }
1181
1182            current_flags = res.err().unwrap().into();
1183            new_flags = current_flags.clone();
1184
1185            new_flags.read()?;
1186
1187            backoff.snooze();
1188        }
1189
1190        return Err(RwBufferError::ReadTryAgianLater);
1191    }
1192
1193    /// Attempts to gain an shared access in async way.
1194    /// 
1195    /// # Return
1196    /// 
1197    /// Return the same result as [RwBuffer::read].
1198    pub async 
1199    fn read_async(&self) -> RwBufferRes<RBuffer>
1200    {
1201        return 
1202            std::future::poll_fn(
1203                |cx|
1204                {
1205                    let inner = self.inner();
1206
1207                    let current_flags: RwBufferFlags<RBuffer> = inner.flags.load(Ordering::SeqCst).into();
1208                    let mut new_flags = current_flags.clone();
1209
1210                    match new_flags.read()
1211                    {
1212                        Ok(_) => {},
1213                        Err(RwBufferError::TooManyRead) => 
1214                            return Poll::Ready(Err(RwBufferError::TooManyRead)),
1215                        Err(RwBufferError::ReadTryAgianLater) =>
1216                        {
1217                            cx.waker().wake_by_ref();
1218
1219                            return Poll::Pending;
1220                        },
1221                        Err(e) =>
1222                            panic!("assertion trap: unknown error {} in Future for AsyncRBuffer", e)
1223                    }
1224
1225                    let res = 
1226                        inner
1227                            .flags
1228                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1229
1230                    if let Ok(_) = res
1231                    {
1232                        return Poll::Ready( Ok( RBuffer::new(self.0.clone()) ) );
1233                    }
1234
1235                    cx.waker().wake_by_ref();
1236
1237                    return Poll::Pending;
1238                }
1239            )
1240            .await;
1241    }
1242
1243    #[cfg(test)]
1244    fn get_flags(&self) -> RwBufferFlags<Self>
1245    {
1246        let inner = self.inner();
1247
1248        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Acquire).into();
1249
1250        return flags;
1251    }
1252
1253    /// Clones the freshly created instance which is visible for current thread only!
1254    /// 
1255    /// # Returns 
1256    /// 
1257    /// A [Result] is returned with error [RwBufferError::TooManyBase] if too many copies 
1258    /// of base are made.
1259    fn clone_single(&self) -> RwBufferRes<Self>
1260    {
1261        let inner = self.inner();
1262
1263        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
1264
1265        current_flags.base()?;
1266
1267        inner.flags.store(current_flags.into(), Ordering::Relaxed);
1268
1269        return Ok(Self(self.0));
1270    }
1271}
1272
1273impl Clone for RwBuffer
1274{
1275    /// Clones the instance and increasing the `base` ref count.
1276    /// 
1277    /// Will `panic` if a soft limit of refs were reached.
1278    fn clone(&self) -> Self
1279    {
1280        let inner = self.inner();
1281
1282        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
1283        let mut new_flags = current_flags.clone();
1284
1285        new_flags.base().unwrap();
1286
1287        let backoff = Backoff::new();
1288        let mut parked = false;
1289
1290        loop
1291        {
1292            let res = 
1293                inner
1294                    .flags
1295                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1296
1297            if let Ok(_) = res
1298            {
1299                return Self(self.0);
1300            }
1301
1302            current_flags = res.err().unwrap().into();
1303            new_flags = current_flags.clone();
1304
1305            new_flags.base().unwrap();
1306
1307            if backoff.is_completed() == false
1308            {
1309                backoff.snooze();
1310            }
1311            else
1312            {
1313                if parked == false
1314                {
1315                    // last attempt
1316                    std::thread::park_timeout(Duration::from_millis(1));
1317
1318                    parked = true;
1319                }
1320                else
1321                {
1322                    panic!("can not obtain a clone of RBuffer!");
1323                }
1324            }
1325        }
1326    }
1327}
1328
1329impl TryClone for RwBuffer
1330{
1331    type Error = RwBufferError;
1332
1333    /// Attempts to clone the [RwBuffer] incrementing the `base` reference.
1334    /// 
1335    /// # Returns 
1336    /// 
1337    /// Returns the new [Result] where on success a clone of [RBuffer] instance is
1338    /// returned, otherwise the:
1339    /// 
1340    /// * [RwBufferError::BaseTryAgainLater] - is returned if it failed to acquire the base clone
1341    ///     in reasonable time.
1342    /// 
1343    /// * [RwBufferError::TooManyBase] - is returned if limit was reached.
1344    fn try_clone(&self) -> Result<Self, Self::Error> 
1345    {
1346        let inner = self.inner();
1347
1348        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
1349        let mut new_flags = current_flags.clone();
1350
1351        new_flags.base()?;
1352
1353        let backoff = Backoff::new();
1354
1355        while backoff.is_completed() == false
1356        {
1357            let res = 
1358                inner
1359                    .flags
1360                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1361
1362            if let Ok(_) = res
1363            {
1364                return Ok(Self(self.0));
1365            }
1366
1367            current_flags = res.err().unwrap().into();
1368            new_flags = current_flags.clone();
1369
1370            new_flags.base()?;
1371
1372            backoff.snooze();
1373        }
1374
1375        return Err(RwBufferError::BaseTryAgainLater);
1376    }
1377}
1378
1379impl Drop for RwBuffer
1380{
1381    /// Drops the RwBuffer instance. In case if there is no readers and
1382    /// writers, then drop immidiatly the inner data.
1383    /// In case if there is any readers or writing, then drop only wrapper which
1384    /// is the zero reader.
1385    fn drop(&mut self)
1386    {
1387        let inner = self.inner();
1388
1389        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
1390        let mut new_flags = current_flags.clone();
1391
1392        new_flags.unbase();
1393
1394        let backoff = Backoff::new();
1395
1396        for _ in 0..1000
1397        {
1398            let res = 
1399                inner
1400                    .flags
1401                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
1402
1403            if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
1404            {
1405                if flags.is_drop_inplace() == true
1406                {
1407                    // call descrutor
1408                    unsafe { ptr::drop_in_place(self.0.as_ptr()) };
1409                }
1410
1411                return;
1412            }
1413
1414            current_flags = res.err().unwrap().into();
1415            new_flags = current_flags.clone();
1416
1417            new_flags.unbase();
1418
1419            backoff.snooze();
1420        }
1421
1422        // todo... solve this situation somehow
1423        panic!("assertion trap: RwBuffer::drop can not drop RwBuffer in reasonable time!");
1424    }
1425}
1426
1427/// An instance which controls the allocation of the new buffers or
1428/// reusage of already created and free instances. This instance is
1429/// not thread safe. The external mutex should be used.
1430#[derive(Debug)]
1431pub struct RwBuffers
1432{
1433    /// A buffer length in bytes. Not aligned.
1434    buf_len: usize,
1435
1436    /// A maximum slots for new buffers.
1437    bufs_cnt_lim: usize,
1438
1439    /// A list of buffers.
1440    buffs: VecDeque<RwBuffer>
1441
1442}
1443
1444impl RwBuffers
1445{
1446    /// Creates new instance wshich holds the base reference in the 
1447    /// inner storage with the capacity bounds.
1448    /// 
1449    /// # Arguments
1450    /// 
1451    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
1452    ///     the payload is located.
1453    /// 
1454    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
1455    /// 
1456    /// * `bufs_cnt_lim` - a maximum amount of the available slots. Determines the 
1457    ///     capacity bounds.
1458    /// 
1459    /// # Returns
1460    /// 
1461    /// A [Result] in form of [RwBufferRes] is returned with:
1462    /// 
1463    /// * [Result::Ok] with the [RwBuffers] instance
1464    /// 
1465    /// * [Result::Err] with error type is returned:
1466    /// 
1467    /// - [RwBufferError::InvalidArguments] is returned when the arguments are
1468    ///     incorrect. 
1469    pub
1470    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
1471    {
1472        if pre_init_cnt > bufs_cnt_lim
1473        {
1474            return Err(RwBufferError::InvalidArguments);
1475        }
1476        else if buf_len == 0
1477        {
1478            return Err(RwBufferError::InvalidArguments);
1479        }
1480
1481        let buffs: VecDeque<RwBuffer> = 
1482            if pre_init_cnt > 0
1483            {
1484                let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
1485
1486                for _ in 0..pre_init_cnt
1487                {
1488                    buffs.push_back(RwBuffer::new(buf_len));
1489                }
1490
1491                buffs
1492            }
1493            else
1494            {
1495                VecDeque::with_capacity(bufs_cnt_lim)
1496            };
1497
1498        return Ok(
1499            Self
1500            {
1501                buf_len: buf_len,
1502                bufs_cnt_lim: bufs_cnt_lim,
1503                buffs: buffs,
1504            }
1505        )
1506    }
1507
1508    /// Same as `new` but without any limits. Unbounded storage.
1509    /// 
1510    /// # Arguments
1511    /// 
1512    /// * `buf_len` - a [usize] length of each buffer instance in bytes where
1513    ///     the payload is located.
1514    /// 
1515    /// * `pre_init_cnt` - a [usize] an initial pre allocated slots with created instances.
1516    /// 
1517    /// # Returns
1518    /// 
1519    /// Returns the instance.
1520    pub
1521    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
1522    {
1523        let mut buffs = VecDeque::with_capacity(pre_init_cnt);
1524
1525        for _ in 0..pre_init_cnt
1526        {
1527            buffs.push_back(RwBuffer::new(buf_len));
1528        }
1529
1530        return
1531            Self
1532            {
1533                buf_len: buf_len,
1534                bufs_cnt_lim: 0,
1535                buffs: buffs,
1536            };
1537    }
1538
1539    /// Allocates either a new buffer or reuse the free. If the instance
1540    /// is created with bounds then in case if no free slots available
1541    /// returns error.
1542    /// 
1543    /// # Returns
1544    /// 
1545    /// A [Result] in form of [RwBufferRes] is returned with:
1546    /// 
1547    /// * [Result::Ok] with the [RwBuffers] instance
1548    /// 
1549    /// * [Result::Err] with error codes:
1550    ///  
1551    /// 
1552    /// * [RwBufferError::OutOfBuffers] - if limit was reached.
1553    /// 
1554    /// * [RwBufferError::TooManyBase] - should not appear, but if would
1555    ///     it means that there is a bug somewhere in the code.
1556    pub
1557    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
1558    {
1559        // check the list if any available
1560        for buf in self.buffs.iter()
1561        {
1562            if let Ok(rwbuf) = buf.acqiure_if_free()
1563            {
1564                return Ok(rwbuf);
1565            }
1566        }
1567
1568        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
1569        {
1570            let buf = RwBuffer::new(self.buf_len);
1571            let c_buf = buf.clone_single()?;
1572
1573            self.buffs.push_back(buf);
1574
1575            return Ok(c_buf);
1576        }
1577
1578        return Err(RwBufferError::OutOfBuffers);
1579    }
1580
1581    /// Allocates a buffer "in place" i.e finds the next allocated but unused
1582    /// buffer and removes it from the list or alloactes new buffer without
1583    /// adding it to the list. Should never return error.
1584    /// 
1585    /// # Returns
1586    /// 
1587    /// A [RwBuffer] is returned.
1588    pub
1589    fn allocate_in_place(&mut self) -> RwBuffer
1590    {
1591        let mut idx = Option::None;
1592
1593        for (i, item) in self.buffs.iter().enumerate()
1594        {
1595            if let Ok(_) = self.buffs[i].acqiure_if_free()
1596            {
1597                idx = Some(i);
1598
1599                break;
1600            }
1601        }
1602
1603        return 
1604            idx
1605                .map_or(
1606                    RwBuffer::new(self.buf_len), 
1607                    |f| self.buffs.remove(f).unwrap()
1608                );
1609
1610    }
1611
1612    /// Retains the buffer list by removing any unused buffers as many times
1613    /// as set in the argument `cnt`. It does not guaranty than the selected 
1614    /// amount will be freed.
1615    /// 
1616    /// # Arguments
1617    /// 
1618    /// * `cnt` - how many slots to clean before exit.
1619    /// 
1620    /// # Returns 
1621    /// 
1622    /// A [usize] is returned which indicates how many instances was removed
1623    /// before the `cnt` was reached. 
1624    pub
1625    fn compact(&mut self, mut cnt: usize) -> usize
1626    {
1627        let p_cnt = cnt;
1628
1629        self
1630            .buffs
1631            .retain(
1632                |buf|
1633                {
1634                    if buf.is_free() == true
1635                    {
1636                        cnt -= 1;
1637
1638                        return false;
1639                    }
1640
1641                    return true;
1642                }
1643            );
1644
1645        return p_cnt - cnt;
1646    }
1647
1648    #[cfg(test)]
1649    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags<RwBuffer>>
1650    {
1651        return Some(self.buffs.get(index)?.get_flags());
1652    }
1653}
1654
1655#[cfg(feature = "std")]
1656#[cfg(test)]
1657mod tests
1658{
1659    use std::time::{Duration, Instant};
1660
1661    use tokio::task;
1662
1663    use super::*;
1664
1665    #[test]
1666    fn simple_test()
1667    {
1668        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1669
1670        let buf0_res = bufs.allocate();
1671        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1672
1673        let buf0 = buf0_res.unwrap();
1674
1675        let buf0_w = buf0.write();
1676        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1677        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1678        drop(buf0_w);
1679
1680        let buf0_r = buf0.read();
1681        assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
1682        assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
1683
1684        let buf0_1 = buf0.clone();
1685        assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));
1686
1687        let flags0 = buf0.get_flags();
1688        let flags0_1 = buf0_1.get_flags();
1689
1690        assert_eq!(flags0, flags0_1);
1691        assert_eq!(flags0.base, 3);
1692        assert_eq!(flags0.read, 1);
1693        assert_eq!(flags0.write, false);
1694    }
1695
1696    #[test]
1697    fn simple_test_dopped_in_place()
1698    {
1699        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1700
1701        let buf0_res = bufs.allocate();
1702        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1703
1704        let buf0 = buf0_res.unwrap();
1705
1706        println!("{:?}", buf0.get_flags());
1707
1708        let buf0_w = buf0.write();
1709        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1710        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1711
1712        drop(buf0);
1713
1714        let buf0_flags = bufs.get_flags_by_index(0);
1715        assert_eq!(buf0_flags.is_some(), true, "no flags");
1716        let buf0_flags = buf0_flags.unwrap();
1717
1718        println!("{:?}", buf0_flags);
1719
1720        assert_eq!(buf0_flags.base, 1);
1721        assert_eq!(buf0_flags.read, 0);
1722        assert_eq!(buf0_flags.write, true);
1723
1724        drop(buf0_w.unwrap());
1725
1726        let buf0_flags = bufs.get_flags_by_index(0);
1727        assert_eq!(buf0_flags.is_some(), true, "no flags");
1728        let buf0_flags = buf0_flags.unwrap();
1729
1730        println!("{:?}", buf0_flags);
1731
1732        assert_eq!(buf0_flags.base, 1);
1733        assert_eq!(buf0_flags.read, 0);
1734        assert_eq!(buf0_flags.write, false);
1735
1736    }
1737
1738    #[test]
1739    fn simple_test_dropped_in_place_downgrade()
1740    {
1741        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1742
1743        let buf0_res = bufs.allocate();
1744        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1745
1746        let buf0 = buf0_res.unwrap();
1747
1748        println!("{:?}", buf0.get_flags());
1749
1750        let buf0_w = buf0.write();
1751        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1752        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1753
1754        drop(buf0);
1755
1756        let buf0_rd = buf0_w.unwrap().downgrade();
1757        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1758
1759        let buf0_flags = bufs.get_flags_by_index(0);
1760        assert_eq!(buf0_flags.is_some(), true, "no flags");
1761        let buf0_flags = buf0_flags.unwrap();
1762
1763        println!("{:?}", buf0_flags);
1764
1765        assert_eq!(buf0_flags.base, 1);
1766        assert_eq!(buf0_flags.read, 1);
1767        assert_eq!(buf0_flags.write, false);
1768
1769    }
1770
1771    #[test]
1772    fn simple_test_drop_in_place_downgrade()
1773    {
1774        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1775
1776        let buf0_w = 
1777            {
1778                let buf0 = bufs.allocate_in_place();
1779
1780                println!("1: {:?}", buf0.get_flags());
1781
1782                let buf0_w = buf0.write();
1783                assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1784                assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1785
1786                drop(buf0);
1787
1788                buf0_w
1789            };
1790
1791        let buf0_rd = buf0_w.unwrap().downgrade();
1792        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1793
1794        let buf0_flags = bufs.get_flags_by_index(0);
1795        assert_eq!(buf0_flags.is_some(), false, "flags");
1796
1797        let buf0_rd = buf0_rd.unwrap();
1798        let buf0_flags = buf0_rd.get_flags();
1799
1800        println!("2: {:?}", buf0_flags);
1801
1802        assert_eq!(buf0_flags.base, 0);
1803        assert_eq!(buf0_flags.read, 1);
1804        assert_eq!(buf0_flags.write, false);
1805    }
1806
1807    #[test]
1808    fn timing_test()
1809    {
1810        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1811
1812        for _ in 0..10
1813        {
1814            let inst = Instant::now();
1815            let buf0_res = bufs.allocate_in_place();
1816            let end = inst.elapsed();
1817
1818            println!("alloc: {:?}", end);
1819            drop(buf0_res);
1820        }
1821
1822        let buf0_res = bufs.allocate();
1823        assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
1824
1825        let buf0 = buf0_res.unwrap();
1826
1827        for _ in 0..10
1828        {
1829            let inst = Instant::now();
1830            let buf0_w = buf0.write();
1831            let end = inst.elapsed();
1832
1833            println!("write: {:?}", end);
1834
1835            assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1836            assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1837            drop(buf0_w);
1838        }
1839
1840        for _ in 0..10
1841        {
1842            let inst = Instant::now();
1843            let buf0_r = buf0.read();
1844            let end = inst.elapsed();
1845
1846            println!("read: {:?}", end);
1847
1848            assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
1849            assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
1850            drop(buf0_r);
1851        }
1852    }
1853
1854    #[test]
1855    fn simple_test_mth()
1856    {
1857        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
1858
1859        let buf0 = bufs.allocate().unwrap();
1860
1861        let buf0_rd = buf0.write().unwrap().downgrade().unwrap();
1862
1863        let join1=
1864            std::thread::spawn(move ||
1865                {
1866                    println!("{:?}", buf0_rd);
1867
1868                    std::thread::sleep(Duration::from_secs(2));
1869
1870                    return;
1871                }
1872            );
1873
1874        let buf1_rd = buf0.read().unwrap();
1875
1876        let join2=
1877            std::thread::spawn(move ||
1878                {
1879                    println!("{:?}", buf1_rd);
1880
1881                    std::thread::sleep(Duration::from_secs(2));
1882
1883                    return;
1884                }
1885            );
1886
1887        let flags = buf0.get_flags();
1888
1889        assert_eq!(flags.base, 2);
1890        assert_eq!(flags.read, 2);
1891        assert_eq!(flags.write, false);
1892
1893        let _ = join1.join();
1894        let _ = join2.join();
1895
1896        let flags = buf0.get_flags();
1897
1898        assert_eq!(flags.base, 2);
1899        assert_eq!(flags.read, 0);
1900        assert_eq!(flags.write, false);
1901    }
1902
1903    #[test]
1904    fn simple_test_concurent()
1905    {
1906        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
1907
1908        let buf0 = bufs.allocate().unwrap();
1909
1910        let buf0_w = buf0.write().unwrap();
1911
1912
1913        let join1=
1914            std::thread::spawn(move ||
1915                {
1916                    std::thread::park();
1917
1918                    println!("{:?}", buf0_w);
1919
1920                    std::thread::sleep(Duration::from_secs(3));
1921
1922                    return;
1923                }
1924            );
1925
1926        join1.thread().unpark();
1927        let s = Instant::now();
1928
1929        let buf1_rd = 
1930            loop
1931            {
1932                match buf0.read()
1933                {
1934                    Ok(r) => break r,
1935                    Err(e) =>
1936                    {
1937                        assert_eq!(e, RwBufferError::ReadTryAgianLater);
1938                        //println!("try again later!");
1939
1940                        continue;
1941                    }
1942                }
1943            };
1944
1945        let e = s.elapsed();
1946
1947        println!("read await {:?} {}", e, e.as_millis());
1948
1949        assert_eq!(e.as_millis(), 3000);
1950
1951        let _ = join1.join();
1952
1953      
1954        let flags = buf0.get_flags();
1955
1956        assert_eq!(flags.base, 2);
1957        assert_eq!(flags.read, 1);
1958        assert_eq!(flags.write, false);
1959
1960        drop(buf1_rd);
1961
1962        let flags = buf0.get_flags();
1963
1964        assert_eq!(flags.base, 2);
1965        assert_eq!(flags.read, 0);
1966        assert_eq!(flags.write, false);
1967    }
1968
1969    #[test]
1970    fn test_try_into_read()
1971    {
1972        let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
1973
1974        let buf0 = bufs.allocate_in_place();
1975
1976        println!("{:?}", buf0.get_flags());
1977
1978        let buf0_w = buf0.write();
1979        assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
1980        assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
1981
1982        drop(buf0);
1983
1984        let buf0_rd = buf0_w.unwrap().downgrade();
1985        assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
1986
1987        let buf0_flags = bufs.get_flags_by_index(0);
1988        assert_eq!(buf0_flags.is_some(), false, "flags");
1989
1990        let buf0_rd = buf0_rd.unwrap();
1991        let buf0_flags = buf0_rd.get_flags();
1992
1993        println!("{:?}", buf0_flags);
1994
1995        assert_eq!(buf0_flags.base, 0);
1996        assert_eq!(buf0_flags.read, 1);
1997        assert_eq!(buf0_flags.write, false);
1998
1999        let inst = Instant::now();
2000        let ve = buf0_rd.try_inner();
2001        let end = inst.elapsed();
2002
2003        println!("try inner: {:?}", end);
2004        assert_eq!(ve.is_ok(), true);
2005
2006
2007    }
2008
2009    #[tokio::test]
2010    async fn test_multithreading()
2011    {
2012
2013        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
2014
2015        let buf0 = bufs.allocate().unwrap();
2016
2017        let mut buf0_write = buf0.write().unwrap();
2018        
2019        buf0_write.as_mut_slice()[0] = 5;
2020        buf0_write.as_mut_slice()[1] = 4;
2021
2022        println!("{}", buf0_write[0]);
2023
2024        let buf0_r = buf0_write.downgrade().unwrap();
2025
2026        let join1=
2027            tokio::task::spawn(async move
2028                {
2029                    println!("thread[1]:{}", buf0_r[0]);
2030
2031                    tokio::time::sleep(Duration::from_millis(200)).await;
2032
2033                    return;
2034                }
2035            );
2036
2037        let buf0_r = buf0.read().unwrap();
2038
2039        // drop base
2040        drop(buf0);
2041
2042        let join2=
2043            tokio::task::spawn(async move
2044                {
2045                    println!("thread[2]: {}", buf0_r[0]);
2046                    println!("thread[2]: {}", buf0_r[1]);
2047
2048                    tokio::time::sleep(Duration::from_millis(200)).await;
2049
2050                    return;
2051                }
2052            );
2053
2054        let _ = join1.await;
2055        let _ = join2.await;
2056
2057        return;
2058    }
2059
2060    #[tokio::test]
2061    async fn test_multithreading_async()
2062    {
2063
2064        let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
2065
2066        let buf0 = bufs.allocate().unwrap();
2067
2068        let buf0_w = buf0.write_async().await.unwrap();
2069
2070        let task_hndl = 
2071            task::spawn(async move
2072                {
2073                    tokio::task::yield_now().await;
2074
2075                    println!("{:?}", buf0_w);
2076
2077                    tokio::time::sleep(Duration::from_secs(3)).await;
2078
2079                    async_drop(buf0_w).await;
2080                    return;
2081                }
2082            );
2083
2084        let s = tokio::time::Instant::now();
2085
2086        let buf1_rd = buf0.read_async().await.unwrap();
2087
2088        let e = s.elapsed();
2089
2090        println!("read await {:?} {}", e, e.as_millis());
2091
2092        assert_eq!(e.as_millis(), 3000);
2093
2094        let _ = task_hndl.await;
2095
2096      
2097        let flags = buf0.get_flags();
2098
2099        assert_eq!(flags.base, 2);
2100        assert_eq!(flags.read, 1);
2101        assert_eq!(flags.write, false);
2102
2103        async_drop(buf1_rd).await;
2104
2105        let flags = buf0.get_flags();
2106
2107        assert_eq!(flags.base, 2);
2108        assert_eq!(flags.read, 0);
2109        assert_eq!(flags.write, false);
2110    }
2111}