shared_buffer_rs/
lib.rs

/*-
 * shared-buffer-rs - a buffer management and sharing crate.
 *
 * Copyright (C) 2025 Aleksandr Morozov
 *
 * The shared-buffer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *
 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */

/*! A small crate which implements a thread-safe way to allocate
 a buffer of some size, borrow it as mutable in the current context or
 thread, and hand out as many read-only references as needed, all of
 which are Send + Sync.

 It acts like Arc but embeds the RefCell functionality without any
 issues with Send and Sync.

 The main purpose is to have a lock-free, lightweight buffer for I/O:
 writing on one side and broadcasting to multiple tasks, i.e. threads
 and async tasks, while making sure the data can not be modified.
*/
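
//! # Example
//!
//! A minimal sketch of the intended flow (a hypothetical usage example,
//! assuming the crate is imported as `shared_buffer_rs`; error handling
//! elided):
//!
//! ```no_run
//! use shared_buffer_rs::RwBuffers;
//!
//! // A pool of 4 KiB buffers: 2 pre-allocated, at most 8 slots.
//! let mut pool = RwBuffers::new(4096, 2, 8).unwrap();
//! let base = pool.allocate().unwrap();
//!
//! // Exclusive write access, then downgrade to shared read access.
//! let mut w = base.write().unwrap();
//! w[0] = 42;
//! let r = w.downgrade().unwrap();
//!
//! // Read-only clones are Send + Sync and can fan out to other threads.
//! let r2 = r.clone();
//! assert_eq!(r2[0], 42);
//! ```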

#![cfg_attr(not(feature = "std"), no_std)]

#[cfg(not(feature = "std"))]
extern crate alloc;

#[cfg(not(feature = "std"))]
use core::
{
    fmt,
    mem,
    ops::{Deref, DerefMut},
    ptr::{self, NonNull},
    sync::atomic::{AtomicU64, Ordering}
};

#[cfg(not(feature = "std"))]
use core::{marker::PhantomData, task::Poll, time::Duration};

#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, collections::vec_deque::VecDeque, vec::Vec};

#[cfg(not(feature = "std"))]
use alloc::vec;

use crossbeam_utils::Backoff;

#[cfg(feature = "std")]
use std::
{
    collections::VecDeque,
    ops::{Deref, DerefMut},
    ptr::{self, NonNull},
    sync::atomic::{AtomicU64, Ordering},
    fmt,
    mem
};

#[cfg(feature = "std")]
use std::{marker::PhantomData, task::Poll, time::Duration};

extern crate crossbeam_utils;

/// A local try_clone trait.
pub trait TryClone: Sized
{
    type Error;

    /// Attempts to clone the instance in reasonable time without completely
    /// blocking the thread.
    fn try_clone(&self) -> Result<Self, Self::Error>;
}

/// An interface of the `async_drop` for async.
pub trait LocalAsyncDrop: Send + Sync + 'static
{
    /// Same as `drop()` but for `async`.
    fn async_drop(&mut self) -> impl std::future::Future<Output = ()>;
}

/// An interface of the `async_clone` for async.
pub trait LocalAsyncClone: Send + Sync + 'static
{
    /// Same as `clone()` but for `async`.
    fn async_clone(&self) -> impl std::future::Future<Output = Self>;
}

/// Use this function to drop [RBuffer] and [WBuffer] obtained via
/// `async_*`.
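///
/// # Example
///
/// A minimal usage sketch (assumes an async runtime such as Tokio and an
/// [RwBuffer] named `rw`):
///
/// ```ignore
/// let r = rw.read_async().await?;
/// // ... use `r` ...
/// async_drop(r).await;
/// ```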
pub async
fn async_drop<LAD: LocalAsyncDrop + Send + Sync>(mut lad: LAD)
{
    lad.async_drop().await;

    drop(lad);
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
    /// The limit on clones of [RBuffer] was reached.
    TooManyRead,

    /// The limit on clones of [RwBuffer] was reached.
    TooManyBase,

    /// Just retry the read operation later.
    ReadTryAgianLater,

    /// Just retry the write operation later.
    WriteTryAgianLater,

    /// Just retry cloning the [RwBuffer] base, or any other related operation, later.
    BaseTryAgainLater,

    /// No more buffers left.
    OutOfBuffers,

    /// Can not downgrade [WBuffer] to [RBuffer].
    DowngradeFailed,

    /// The provided arguments are not valid.
    InvalidArguments,

    /// The buffer is busy.
    Busy,
}

impl fmt::Display for RwBufferError
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        match self
        {
            Self::TooManyRead =>
                write!(f, "TooManyRead: read soft limit reached"),
            Self::TooManyBase =>
                write!(f, "TooManyBase: base soft limit reached"),
            Self::ReadTryAgianLater =>
                write!(f, "ReadTryAgianLater: shared access not available, try again later"),
            Self::WriteTryAgianLater =>
                write!(f, "WriteTryAgianLater: exclusive access not available, try again later"),
            Self::BaseTryAgainLater =>
                write!(f, "BaseTryAgainLater: failed to obtain a clone in reasonable time"),
            Self::OutOfBuffers =>
                write!(f, "OutOfBuffers: no more free buffers are left"),
            Self::DowngradeFailed =>
                write!(f, "DowngradeFailed: can not downgrade exclusive to shared, race condition"),
            Self::InvalidArguments =>
                write!(f, "InvalidArguments: arguments are not valid"),
            Self::Busy =>
                write!(f, "RwBuffer is busy and cannot be acquired"),
        }
    }
}

pub type RwBufferRes<T> = Result<T, RwBufferError>;

/// A read only buffer. This instance is [Send] and [Sync]
/// as it does not provide any write access.
#[derive(Debug)]
pub struct RBuffer
{
    /// The inner read-only buffer.
    inner: NonNull<RwBufferInner>,

    /// Is set to `true` when async_drop was performed earlier.
    a_dropped: bool,
}

unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}

impl RwBufType for RBuffer {}

impl Eq for RBuffer {}

impl PartialEq for RBuffer
{
    fn eq(&self, other: &Self) -> bool
    {
        return self.inner == other.inner;
    }
}

impl RBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ inner, a_dropped: false };
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        use core::sync::atomic::Ordering;

        let inner = unsafe{ self.inner.as_ref() };

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags;
    }

    /// Borrow the inner buffer as a slice.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.inner.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }

    /// Attempts to consume the instance and retrieve the inner buffer. This means
    /// that the instance will no longer be available.
    /// 
    /// > Safe to call from async.
    ///
    /// The following conditions should be satisfied:
    ///
    /// 1) No more readers except the current instance.
    ///
    /// 2) No base references, i.e. the item should not be referenced by any [RwBuffer] base.
    /// 
    /// # Returns
    /// 
    /// A [Result] is returned with: 
    /// 
    /// * [Result::Ok] with the consumed inner [Vec]
    /// 
    /// * [Result::Err] with the consumed instance
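    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `r` is the last remaining reference):
    ///
    /// ```ignore
    /// match r.try_inner() {
    ///     Ok(vec) => println!("recovered {} bytes", vec.len()),
    ///     Err(r) => { /* other references still alive; keep using `r` */ }
    /// }
    /// ```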
    pub
    fn try_inner(mut self) -> Result<Vec<u8>, Self>
    {
        let inner = unsafe { self.inner.as_ref() };

        let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();

        if current_flags.try_inner_check() == true
        {
            // in theory, if only one read reference is left at this moment, no other
            // can occur, because `current_flags` was loaded with SeqCst and there is
            // no other instance a new reader could be cloned from

            let inner = unsafe { self.inner.as_mut() };

            let buf = inner.buf.take().unwrap();

            // even if the instance is used with async, this must never block because
            // the instance is already unique
            drop(self);

            return Ok(buf);
        }

        return Err(self);
    }

    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.inner.as_ref() };
    }
}

impl Deref for RBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = self.inner();

        return inner.buf.as_ref().unwrap();
    }
}

impl LocalAsyncClone for RBuffer
{
    fn async_clone(&self) -> impl std::future::Future<Output = Self>
    {
        return
            std::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.read().unwrap();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Self{ inner: self.inner, a_dropped: self.a_dropped } );
                    }

                    // wake the task again, otherwise the future would stall on Pending
                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

impl Clone for RBuffer
{
    /// Attempts to clone the RBuffer, incrementing the `read` reference count.
    /// Blocks until the clone is obtained; this should not take long.
    /// Because this crate is experimental, it panics if it cannot
    /// obtain the clone.
    /// 
    /// # Returns 
    /// 
    /// Returns the new [RBuffer] instance.
    /// 
    /// # Panic
    /// 
    /// Panics if too many references were created: the reference count
    /// is limited to `u32::MAX - 2`. Also panics if it cannot obtain
    /// a clone of the [RBuffer] in reasonable time, as this must not block
    /// for a long time.
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.read().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self{ inner: self.inner, a_dropped: self.a_dropped };
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    // last attempt
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("can not obtain a clone of RBuffer!");
                }
            }
        }
    }
}

impl TryClone for RBuffer
{
    type Error = RwBufferError;

    /// Attempts to clone the RBuffer, incrementing the `read` reference count.
    /// 
    /// # Returns 
    /// 
    /// Returns a new [Result] where on success a clone of the [RBuffer] instance is
    /// returned, otherwise:
    /// 
    /// * [RwBufferError::ReadTryAgianLater] - is returned if it failed to acquire the read clone
    ///     in reasonable time.
    /// 
    /// * [RwBufferError::TooManyRead] - is returned if the limit was reached.
    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self{ inner: self.inner, a_dropped: self.a_dropped });
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                break;
            }
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }
}

impl LocalAsyncDrop for RBuffer
{
    fn async_drop(&mut self) -> impl std::future::Future<Output = ()>
    {
        self.a_dropped = true;

        return
            std::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.unread();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        // `compare_exchange_weak` returns the previous value on success,
                        // so the drop-in-place decision must be made on `new_flags`
                        if new_flags.is_drop_inplace() == true
                        {
                            // call the destructor
                            unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
                        }

                        return Poll::Ready(());
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

impl Drop for RBuffer
{
    /// Completely drops the instance (with data) only if there are no more
    /// readers and it is not referenced in the base.
    /// 
    /// # Panic
    /// 
    /// May panic if it is not able to drop the instance in reasonable time, i.e. within
    /// 1000 attempts.
    fn drop(&mut self)
    {
        if self.a_dropped == true
        {
            return;
        }

        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unread();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // `compare_exchange_weak` returns the previous value on success,
                // so the drop-in-place decision is made on `new_flags`
                if new_flags.is_drop_inplace() == true
                {
                    // call the destructor
                    unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unread();

            backoff.snooze();
        }

        // todo... solve this situation somehow
        panic!("assertion trap: RBuffer::drop can not drop RBuffer in reasonable time!");
    }
}

/// A write-and-read buffer: an exclusive instance which can not be copied or
/// cloned. Once writing is complete, the instance can be dropped or downgraded to
/// a read-only instance. This instance is [Send] and [Sync].
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
    /// A pointer to the leaked buffer instance.
    buf: NonNull<RwBufferInner>,

    /// Is set to `true` when `downgrade` is called.
    downgraded: bool,
}

unsafe impl Send for WBuffer{}
unsafe impl Sync for WBuffer{}

impl RwBufType for WBuffer{}

impl WBuffer
{
    #[inline]
    fn new(inner: NonNull<RwBufferInner>) -> Self
    {
        return Self{ buf: inner, downgraded: false };
    }

    /// Attempts to downgrade the `write` instance into a `read` instance by consuming the
    /// [WBuffer]. 
    /// 
    /// Can not be performed vice-versa (at least in this version). Under normal conditions
    /// it should never return an error.
    /// 
    /// # Returns 
    /// 
    /// A [Result] is returned with the [RBuffer] on success. A [Result::Err] is returned 
    /// if it failed to downgrade the instance in reasonable time.
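    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `rw` is an [RwBuffer]):
    ///
    /// ```ignore
    /// let mut w = rw.write()?;
    /// w.fill(0xFF);
    /// let r = w.downgrade().expect("downgrade failed");
    /// ```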
    pub
    fn downgrade(mut self) -> Result<RBuffer, Self>
    {
        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.downgrade();

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                self.downgraded = true;

                return Ok(RBuffer::new(self.buf.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.downgrade();

            backoff.snooze();
        }

        return Err(self);
    }

    /// Borrow the inner buffer as a slice.
    pub
    fn as_slice(&self) -> &[u8]
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap().as_slice();
    }
}

impl Deref for WBuffer
{
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8>
    {
        let inner = unsafe { self.buf.as_ref() };

        return inner.buf.as_ref().unwrap();
    }
}

impl DerefMut for WBuffer
{
    fn deref_mut(&mut self) -> &mut Vec<u8>
    {
        let inner = unsafe { self.buf.as_mut() };

        return inner.buf.as_mut().unwrap();
    }
}

impl Drop for WBuffer
{
    /// The instance may perform `drop_in_place` if there are no `base` references.
    fn drop(&mut self)
    {
        if self.downgraded == true
        {
            return;
        }

        let inner = unsafe { self.buf.as_ref() };

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unwrite();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // `compare_exchange_weak` returns the previous value on success,
                // so the drop-in-place decision is made on `new_flags`
                if new_flags.is_drop_inplace() == true
                {
                    // call the destructor
                    unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unwrite();

            backoff.snooze();
        }

        // todo... solve this situation somehow
        panic!("assertion trap: WBuffer::drop can not drop WBuffer in reasonable time!");
    }
}

impl LocalAsyncDrop for WBuffer
{
    fn async_drop(&mut self) -> impl std::future::Future<Output = ()>
    {
        self.downgraded = true;

        return
            std::future::poll_fn(
                move |cx|
                {
                    let inner = unsafe { self.buf.as_ref() };

                    let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    new_flags.unwrite();

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        // `compare_exchange_weak` returns the previous value on success,
                        // so the drop-in-place decision is made on `new_flags`
                        if new_flags.is_drop_inplace() == true
                        {
                            // call the destructor
                            unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
                        }

                        return Poll::Ready(());
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            );
    }
}

trait RwBufType {}

/// Internal structure which represents the status. It must not be
/// larger than 8 bytes so that it fits into an [AtomicU64].
#[repr(align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags<TP>
{
    /// A reader reference counter. While it is larger than 0, no writes are possible.
    read: u32, // = 4

    /// An exclusive write lock. When `true`, no readers may be present.
    write: bool, // = 1

    /// A base reference counter, i.e. the references which hold the data.
    /// When this value is zero, the instance can be dropped in place
    /// once `write` is `false` and `read` equals 0.
    base: u16, // = 2

    /// Unused
    unused0: u8, // = 1,

    _p: PhantomData<TP>,
}
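
// A compile-time guard (an added sketch, not part of the original API): the
// flags must occupy exactly 8 bytes so that the `u64` transmutes below are sound.
const _: () = assert!(mem::size_of::<RwBufferFlags<RBuffer>>() == 8);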

impl<TP: RwBufType> From<u64> for RwBufferFlags<TP>
{
    fn from(value: u64) -> Self
    {
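        // Soundness note (added): `value` must originate from a prior
        // `RwBufferFlags` -> `u64` transmute; otherwise the `bool` field
        // could hold an invalid bit pattern.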
        return unsafe { mem::transmute(value) };
    }
}

impl<TP: RwBufType> From<RwBufferFlags<TP>> for u64
{
    fn from(value: RwBufferFlags<TP>) -> Self
    {
        return unsafe { mem::transmute(value) };
    }
}

impl<TP: RwBufType> Default for RwBufferFlags<TP>
{
    fn default() -> RwBufferFlags<TP>
    {
        return
            Self
            {
                read: 0,
                write: false,
                base: 1,
                unused0: 0,
                _p: PhantomData
            };
    }
}

impl<TP: RwBufType> Copy for RwBufferFlags<TP> {}

impl<TP: RwBufType> Clone for RwBufferFlags<TP>
{
    fn clone(&self) -> Self
    {
        return *self;
    }
}

impl RwBufferFlags<WBuffer>
{
    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgianLater);
        }
    }

    #[inline]
    fn downgrade(&mut self)
    {
        self.write = false;
        self.read += 1;
    }

    #[inline]
    fn unwrite(&mut self)
    {
        self.write = false;
    }
}

impl RwBufferFlags<RBuffer>
{
    #[inline]
    fn try_inner_check(&self) -> bool
    {
        return self.read == 1 && self.write == false && self.base == 0;
    }

    #[inline]
    fn unread(&mut self)
    {
        self.read -= 1;
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }
}

impl RwBufferFlags<RwBuffer>
{
    #[inline]
    fn make_pre_unused() -> Self
    {
        return Self{ read: 0, write: false, base: 1, unused0: 0, _p: PhantomData };
    }

    #[inline]
    fn read(&mut self) -> RwBufferRes<()>
    {
        if self.write == false
        {
            self.read += 1;

            if self.read <= Self::MAX_READ_REFS
            {
                return Ok(());
            }

            return Err(RwBufferError::TooManyRead);
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }

    #[inline]
    fn write(&mut self) -> RwBufferRes<()>
    {
        if self.read == 0
        {
            self.write = true;

            return Ok(());
        }
        else
        {
            return Err(RwBufferError::WriteTryAgianLater);
        }
    }

    #[inline]
    fn base(&mut self) -> RwBufferRes<()>
    {
        self.base += 1;

        if self.base <= Self::MAX_BASE_REFS
        {
            return Ok(());
        }

        return Err(RwBufferError::TooManyBase);
    }

    #[inline]
    fn unbase(&mut self) -> bool
    {
        self.base -= 1;

        return self.base != 0;
    }
}

impl<TP: RwBufType> RwBufferFlags<TP>
{
    /// A soft limit on the number of read references.
    pub const MAX_READ_REFS: u32 = u32::MAX - 2;

    /// A soft limit on the number of base references.
    pub const MAX_BASE_REFS: u16 = u16::MAX - 2;

    #[inline]
    fn is_free(&self) -> bool
    {
        return self.write == false && self.read == 0 && self.base == 1;
    }

    #[inline]
    fn is_drop_inplace(&self) -> bool
    {
        return self.read == 0 && self.write == false && self.base == 0;
    }
}

#[derive(Debug)]
pub struct RwBufferInner
{
    /// A [RwBufferFlags] represented as an atomic u64.
    flags: AtomicU64,

    /// A buffer.
    buf: Option<Vec<u8>>,
}

impl RwBufferInner
{
    fn new(buf_size: usize) -> Self
    {
        return
            Self
            {
                flags:
                    AtomicU64::new(RwBufferFlags::<RwBuffer>::default().into()),
                buf:
                    Some(vec![0_u8; buf_size])
            };
    }
}

/// A base instance which holds the `leaked` pointer to [RwBufferInner].
/// 
/// This instance can provide either an exclusive write access or
/// multiple shared read accesses, but not at the same time. It can be used
/// to store the instance. This instance is [Send] and [Sync] because the internals
/// are guarded by ordered atomic operations.
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);

unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}

impl RwBufType for RwBuffer {}

impl RwBuffer
{
    #[inline]
    fn new(buf_size: usize) -> Self
    {
        let status = Box::new(RwBufferInner::new(buf_size));

        return Self(Box::leak(status).into());
    }

    #[inline]
    fn inner(&self) -> &RwBufferInner
    {
        return unsafe { self.0.as_ref() };
    }

    /// Checks if this instance satisfies the following conditions:
    /// 
    /// * No exclusive write access
    /// 
    /// * No read access
    /// 
    /// * There is only one base reference.
    /// 
    /// Note that by the time the check returns, everything may already have changed.
    /// 
    /// # Returns 
    /// 
    /// * - `true` if the instance satisfies the conditions above.
    /// 
    /// * - `false` if it does not satisfy the conditions above.
    #[inline]
    pub
    fn is_free(&self) -> bool
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        return flags.is_free();
    }

    /// Acquires the instance if it satisfies the following conditions:
    /// 
    /// * No exclusive write access
    /// 
    /// * No read access
    /// 
    /// * There is only one base reference.
    /// 
    /// # Returns 
    /// 
    /// A [Result] is returned with the cloned [RwBuffer] instance or
    /// [Result::Err] with the following errors:
    /// 
    /// * [RwBufferError::Busy] - the instance has already been taken.
    /// 
    /// * [RwBufferError::TooManyBase] - too many base references are already around.
    #[inline]
    pub(crate)
    fn acqiure_if_free(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let current_flags: RwBufferFlags<Self> = RwBufferFlags::make_pre_unused();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        let res =
            inner
                .flags
                .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

        if let Ok(_) = res
        {
            return Ok(Self(self.0.clone()));
        }

        return Err(RwBufferError::Busy);
    }

    /// Attempts to make an exclusive (write) access to the buffer.
    /// 
    /// May block for a short period of time before returning an error.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [WBuffer] instance
    /// 
    /// * [Result::Err] with [RwBufferError::WriteTryAgianLater] in case
    ///     there are active `read` references or acquiring the exclusive lock failed.
    pub
    fn write(&self) -> RwBufferRes<WBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.write()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(WBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.write()?;

            backoff.snooze();
        }

        return Err(RwBufferError::WriteTryAgianLater);
    }

    /// Attempts to gain exclusive access in an async way.
    /// 
    /// # Return
    /// 
    /// Returns the same result as [RwBuffer::write].
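    ///
    /// A usage sketch (assumes an async runtime):
    ///
    /// ```ignore
    /// let mut w = rw.write_async().await?;
    /// w.fill(0);
    /// ```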
    pub async
    fn write_async(&self) -> RwBufferRes<WBuffer>
    {
        return
            std::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<WBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    if let Err(e) = new_flags.write()
                    {
                        return Poll::Ready(Err(e));
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( WBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

    /// Attempts to make a shared (read) access to the buffer.
    /// 
    /// May block for a short period of time before returning an error.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RBuffer] instance
    /// 
    /// * [Result::Err] with an error type:
    /// 
    /// - [RwBufferError::TooManyRead] is returned when the soft limit of
    ///     references was reached.
    /// 
    /// - [RwBufferError::ReadTryAgianLater] is returned if there is an
    ///     active exclusive access.
    pub
    fn read(&self) -> RwBufferRes<RBuffer>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.read()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(RBuffer::new(self.0.clone()));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.read()?;

            backoff.snooze();
        }

        return Err(RwBufferError::ReadTryAgianLater);
    }

    /// Attempts to gain a shared access in an async way.
    /// 
    /// # Return
    /// 
    /// Returns the same result as [RwBuffer::read].
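    ///
    /// A usage sketch (assumes an async runtime):
    ///
    /// ```ignore
    /// let r = rw.read_async().await?;
    /// assert!(!r.as_slice().is_empty());
    /// ```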
    pub async
    fn read_async(&self) -> RwBufferRes<RBuffer>
    {
        return
            std::future::poll_fn(
                |cx|
                {
                    let inner = self.inner();

                    let current_flags: RwBufferFlags<RBuffer> = inner.flags.load(Ordering::SeqCst).into();
                    let mut new_flags = current_flags.clone();

                    match new_flags.read()
                    {
                        Ok(_) => {},
                        Err(RwBufferError::TooManyRead) =>
                            return Poll::Ready(Err(RwBufferError::TooManyRead)),
                        Err(RwBufferError::ReadTryAgianLater) =>
                        {
                            cx.waker().wake_by_ref();

                            return Poll::Pending;
                        },
                        Err(e) =>
                            panic!("assertion trap: unknown error {} in Future for AsyncRBuffer", e)
                    }

                    let res =
                        inner
                            .flags
                            .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

                    if let Ok(_) = res
                    {
                        return Poll::Ready( Ok( RBuffer::new(self.0.clone()) ) );
                    }

                    cx.waker().wake_by_ref();

                    return Poll::Pending;
                }
            )
            .await;
    }

    #[cfg(test)]
    fn get_flags(&self) -> RwBufferFlags<Self>
    {
        let inner = self.inner();

        let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Acquire).into();

        return flags;
    }

    /// Clones the freshly created instance which is visible to the current thread only!
    /// 
    /// # Returns 
    /// 
    /// A [Result] is returned with the error [RwBufferError::TooManyBase] if too many copies
    /// of the base are made.
    fn clone_single(&self) -> RwBufferRes<Self>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();

        current_flags.base()?;

        inner.flags.store(current_flags.into(), Ordering::Relaxed);

        return Ok(Self(self.0));
    }
}

impl Clone for RwBuffer
{
    /// Clones the instance, increasing the `base` ref count.
    /// 
    /// Will `panic` if the soft limit of refs is reached.
    fn clone(&self) -> Self
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.base().unwrap();

        let backoff = Backoff::new();
        let mut parked = false;

        loop
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Self(self.0);
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base().unwrap();

            if backoff.is_completed() == false
            {
                backoff.snooze();
            }
            else
            {
                if parked == false
                {
                    // last attempt
                    std::thread::park_timeout(Duration::from_millis(1));

                    parked = true;
                }
                else
                {
                    panic!("can not obtain a clone of RwBuffer!");
                }
            }
        }
    }
}

impl TryClone for RwBuffer
{
    type Error = RwBufferError;

    /// Attempts to clone the [RwBuffer], incrementing the `base` reference count.
    /// 
    /// # Returns 
    /// 
    /// Returns a new [Result] where on success a clone of the [RwBuffer] instance is
    /// returned, otherwise:
    /// 
    /// * [RwBufferError::BaseTryAgainLater] - is returned if it failed to acquire the base clone
    ///     in reasonable time.
    /// 
    /// * [RwBufferError::TooManyBase] - is returned if the limit was reached.
    fn try_clone(&self) -> Result<Self, Self::Error>
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
        let mut new_flags = current_flags.clone();

        new_flags.base()?;

        let backoff = Backoff::new();

        while backoff.is_completed() == false
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                return Ok(Self(self.0));
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.base()?;

            backoff.snooze();
        }

        return Err(RwBufferError::BaseTryAgainLater);
    }
}

impl Drop for RwBuffer
{
    /// Drops the RwBuffer instance. If there are no readers and
    /// writers, the inner data is dropped immediately.
    /// If there is any reader or writer, only the wrapper, which is
    /// the zero reader, is dropped.
    fn drop(&mut self)
    {
        let inner = self.inner();

        let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
        let mut new_flags = current_flags.clone();

        new_flags.unbase();

        let backoff = Backoff::new();

        for _ in 0..1000
        {
            let res =
                inner
                    .flags
                    .compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);

            if let Ok(_) = res
            {
                // `compare_exchange_weak` returns the previous value on success,
                // so the drop-in-place decision is made on `new_flags`
                if new_flags.is_drop_inplace() == true
                {
                    // call the destructor
                    unsafe { ptr::drop_in_place(self.0.as_ptr()) };
                }

                return;
            }

            current_flags = res.err().unwrap().into();
            new_flags = current_flags.clone();

            new_flags.unbase();

            backoff.snooze();
        }

        // todo... solve this situation somehow
        panic!("assertion trap: RwBuffer::drop can not drop RwBuffer in reasonable time!");
    }
}

/// An instance which controls the allocation of new buffers and
/// the reuse of already created and free instances. This instance is
/// not thread safe; an external mutex should be used.
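///
/// # Example
///
/// A minimal sketch of sharing the pool across threads (assumes the `std`
/// feature):
///
/// ```ignore
/// use std::sync::{Arc, Mutex};
///
/// let pool = Arc::new(Mutex::new(RwBuffers::new(4096, 0, 16)?));
/// let buf = pool.lock().unwrap().allocate()?;
/// ```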
#[derive(Debug)]
pub struct RwBuffers
{
    /// A buffer length in bytes. Not aligned.
    buf_len: usize,

    /// The maximum number of slots for new buffers.
    bufs_cnt_lim: usize,

    /// A list of buffers.
    buffs: VecDeque<RwBuffer>
}

impl RwBuffers
{
    /// Creates a new instance which holds the base references in the
    /// inner storage with the capacity bounds.
    /// 
    /// # Arguments
    /// 
    /// * `buf_len` - a [usize] length in bytes of each buffer instance where
    ///     the payload is located.
    /// 
    /// * `pre_init_cnt` - a [usize] number of initially pre-allocated slots with created instances.
    /// 
    /// * `bufs_cnt_lim` - the maximum number of available slots. Determines the
    ///     capacity bounds.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RwBuffers] instance
    /// 
    /// * [Result::Err] with an error type:
    /// 
    /// - [RwBufferError::InvalidArguments] is returned when the arguments are
    ///     incorrect.
    pub
    fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
    {
        if pre_init_cnt > bufs_cnt_lim
        {
            return Err(RwBufferError::InvalidArguments);
        }
        else if buf_len == 0
        {
            return Err(RwBufferError::InvalidArguments);
        }

        let buffs: VecDeque<RwBuffer> =
            if pre_init_cnt > 0
            {
                let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);

                for _ in 0..pre_init_cnt
                {
                    buffs.push_back(RwBuffer::new(buf_len));
                }

                buffs
            }
            else
            {
                VecDeque::with_capacity(bufs_cnt_lim)
            };

        return Ok(
            Self
            {
                buf_len: buf_len,
                bufs_cnt_lim: bufs_cnt_lim,
                buffs: buffs,
            }
        );
    }

    /// Same as `new` but without any limits. Unbounded storage.
    /// 
    /// # Arguments
    /// 
    /// * `buf_len` - a [usize] length in bytes of each buffer instance where
    ///     the payload is located.
    /// 
    /// * `pre_init_cnt` - a [usize] number of initially pre-allocated slots with created instances.
    /// 
    /// # Returns
    /// 
    /// Returns the instance.
    pub
    fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
    {
        let mut buffs = VecDeque::with_capacity(pre_init_cnt);

        for _ in 0..pre_init_cnt
        {
            buffs.push_back(RwBuffer::new(buf_len));
        }

        return
            Self
            {
                buf_len: buf_len,
                bufs_cnt_lim: 0,
                buffs: buffs,
            };
    }

    /// Allocates either a new buffer or reuses a free one. If the instance
    /// was created with bounds and no free slots are available, an error
    /// is returned.
    /// 
    /// # Returns
    /// 
    /// A [Result] in form of [RwBufferRes] is returned with:
    /// 
    /// * [Result::Ok] with the [RwBuffer] instance
    /// 
    /// * [Result::Err] with error codes:
    /// 
    /// * [RwBufferError::OutOfBuffers] - if the limit was reached.
    /// 
    /// * [RwBufferError::TooManyBase] - should not appear, but if it does,
    ///     it means that there is a bug somewhere in the code.
    pub
    fn allocate(&mut self) -> RwBufferRes<RwBuffer>
    {
        // check the list if any available
        for buf in self.buffs.iter()
        {
            if let Ok(rwbuf) = buf.acqiure_if_free()
            {
                return Ok(rwbuf);
            }
        }

        if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
        {
            let buf = RwBuffer::new(self.buf_len);
            let c_buf = buf.clone_single()?;

            self.buffs.push_back(buf);

            return Ok(c_buf);
        }

        return Err(RwBufferError::OutOfBuffers);
    }

    /// Allocates a buffer "in place", i.e. finds the next allocated but unused
    /// buffer and removes it from the list, or allocates a new buffer without
    /// adding it to the list. Should never return an error.
    /// 
    /// # Returns
    /// 
    /// A [RwBuffer] is returned.
    pub
    fn allocate_in_place(&mut self) -> RwBuffer
    {
        let mut idx = Option::None;

        for (i, item) in self.buffs.iter().enumerate()
        {
            if let Ok(_) = item.acqiure_if_free()
            {
                idx = Some(i);

                break;
            }
        }

        return
            idx
                .map_or(
                    RwBuffer::new(self.buf_len),
                    |f| self.buffs.remove(f).unwrap()
                );
    }

    /// Retains the buffer list by removing at most `cnt` unused buffers.
    /// It does not guarantee that the requested amount will be freed.
    /// 
    /// # Arguments
    /// 
    /// * `cnt` - how many slots to clean before exit.
    /// 
    /// # Returns 
    /// 
    /// A [usize] is returned which indicates how many instances were removed
    /// before `cnt` was reached.
    pub
    fn compact(&mut self, mut cnt: usize) -> usize
    {
        let p_cnt = cnt;

        self
            .buffs
            .retain(
                |buf|
                {
                    // stop removing once `cnt` is exhausted; this also
                    // protects `cnt` from underflowing below zero
                    if cnt > 0 && buf.is_free() == true
                    {
                        cnt -= 1;

                        return false;
                    }

                    return true;
                }
            );

        return p_cnt - cnt;
    }

    #[cfg(test)]
    fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags<RwBuffer>>
    {
        return Some(self.buffs.get(index)?.get_flags());
    }
}

#[cfg(feature = "std")]
#[cfg(test)]
mod tests;