cura/
lib.rs

1#![warn(missing_docs)]
//! An attempt at creating an Arc-RwLock combination that is straightforward
//! to use and hassle-free, rather than one that worries about being fast and lean.
//!
//! * cloning references works like `Arc`
//! * made for sharing objects between threads without worry
//! * locking works like `RwLock`, with `write()` or `read()`
//! * it spins a few times and then queues if a lock is not obtained
//! * miri seems to be happy, so I trust it doesn't leak too much memory etc.
//! * requires that everything you stick into it is `Send + Sync`
//! * no need to constantly `.unwrap()` things; instead it will just
//!   block forever or blow up
13//!
14//! # Example
15//! ```
16//! use cura::Cura;
17//! trait Foo:Send+Sync
18//! {
19//!     fn get(&self)->i32;
20//!     fn set(&mut self,i:i32);
21//! }
22//! #[derive(Debug)]
23//! struct FF{i:i32,};
24//! struct BB{i:i32};
25//!
26//! impl Foo for FF{
27//!     fn get(&self)->i32{return self.i;}
28//!     fn set(&mut self,i:i32){self.i=i;}
29//! }
30//!
31//! impl Foo for BB{
32//!     fn get(&self)->i32{return self.i;}
33//!     fn set(&mut self,i:i32){self.i=i;}
34//! }
35//!
36//! let t=FF{i:1};
37//!
38//! // you can do straight "from_box" but currently its impossible to
39//! // "alter" unsized types
40//! let foo2:Cura<dyn Foo>=Cura::from_box(Box::new(FF{i:2}));
41//! let foo:Cura<Box<dyn Foo>>=Cura::new(Box::new(t));
42//! let a=foo.clone();
43//! let b=foo.clone();
44//!
45//! {
46//!     assert_eq!(a.read().get(),1);
47//!     {
48//!         a.alter(|s|{
49//!             s.set(2);
50//!             Some(())
51//!         });
52//!     }
53//!     {
54//!         a.alter(|s|{ //this only works for Sized types
55//!             *s=Box::new(BB{i:2});
56//!             Some(())
57//!         });
58//!     }
59//!     let lock=a.read();
60//!     let v=lock;
61//!     assert_eq!(v.get(),2)
62//! }//lock dropped here
63//! {
64//!     (*b.write()).set(3); //lock dropped here i think 
65//! }
66//!
67//! assert_eq!((*a.read()).get(),3);
68//!
69//! ```
70use std::ops::{Deref,DerefMut};
71use std::ptr::NonNull;
72use std::sync::atomic::{AtomicUsize,AtomicI32,AtomicU32};
73use std::sync::atomic::Ordering::{Acquire, Relaxed, Release,SeqCst};
74use std::cell::UnsafeCell;
75use std::thread::Thread;
76use std::time::{SystemTime,UNIX_EPOCH};
77use std::marker::PhantomData;
/// Value stored in `lockcount` while a writer holds the lock.
const LOCKED:i32=-999;
/// Value of `lockcount` when nobody holds the lock; positive values count readers.
const FREE:i32=0;
/// Amount added to `queuecount` while the wait queue itself is locked
/// (see `lock_queue` / `unlock_queue`).
const LOCKQUEUE:u32=u32::MAX/2;
81
82/// a sort of an Arc that will both readwrite lock , be easy to
83/// handle and is cloneable
84/// ```
85/// use cura::Cura;
86/// let s=Cura::new(1);
87/// let a=s.clone();
88///
89/// ```
90pub struct Cura<T: Sync + Send +?Sized> {
91    ptr: NonNull<CuraData<T>>,
92    phantom:PhantomData<CuraData<T>>,
93    //dummy:i32,
94}
95struct CuraData<T: Sync + Send+?Sized> {
96    data: UnsafeCell<Box<T>>,
97    queuedata:UnsafeCell<QueueData>,
98    count: AtomicUsize,
99    lockcount:AtomicI32, //-999=writeĺock,0=free,>0 readlock count
100    queuecount:AtomicU32, // number of threads,
101}
102struct QueueData
103{
104    queue:*mut QueueLink,
105    endqueue:*mut QueueLink,
106}
107impl QueueData
108{
109    ///
110    /// queue stuff into end of queue
111    ///
112    fn enqueue(&mut self,t:LockType)
113    {
114        let link=Box::leak(Box::new(QueueLink::new(t)));
115        let next=self.endqueue;
116        if next.is_null()
117        {
118            self.queue=link;
119        }else{
120            unsafe{(*next).next=link;}
121        }
122        self.endqueue=link;
123    }
124    fn dequeue(&mut self)
125    {
126        //  dequeue
127        let me=self.queue;
128        unsafe{self.queue=(*self.queue).next;}
129        //  if we were the last
130        if me==self.endqueue
131        {
132            self.endqueue=std::ptr::null_mut();
133        }
134        unsafe {
135            drop(Box::from_raw(me));
136        }
137    }
138}
/// Kind of lock a queued thread is waiting for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LockType {
    /// The waiter wants a shared (read) lock.
    Read,
    /// The waiter wants the exclusive (write) lock.
    Write,
}
145struct QueueLink
146{
147    thread:Thread,
148    lock:LockType,
149    next:*mut QueueLink,
150}
151impl QueueLink
152{
153    fn new(l:LockType)->QueueLink
154    {
155        QueueLink{
156            thread:std::thread::current(),
157            lock:l,
158            next:std::ptr::null_mut(),
159        }
160    }
161    /*
162        useful methods
163    */
164}
165///
166/// Cura public interface
167///
168impl <T: Sync + Send> Cura<T> {
169    ///
170    /// constructor for a Cura 
171    /// ```
172    ///     use cura::Cura;
173    ///     let t=1;
174    ///     let foo=Cura::new(t); //instead of Arc::new(Mutex::new(t));
175    /// ```
176    pub fn new(t: T) -> Cura<T> {
177        Self::from_box(Box::new(t))
178    }
179}
180///
181/// Cura public interface
182///
183impl<T:  Sync + Send + ?Sized> Cura<T> {
184    ///
185    /// convert from box<T> to Cura<T>
186    ///
187    pub fn from_box(v: Box<T>) -> Cura<T> {
188        let queuedata=UnsafeCell::new(QueueData{
189                queue:std::ptr::null_mut(),
190                endqueue:std::ptr::null_mut(),
191            });
192        Cura {
193            ptr: NonNull::from(Box::leak(Box::new(CuraData {
194                count: AtomicUsize::new(1),
195                data: UnsafeCell::new(v),
196                lockcount:AtomicI32::new(0),
197                queuecount:AtomicU32::new(0), //
198                queuedata:queuedata,
199            }))),
200            phantom:PhantomData,
201            //dummy:0,
202        }
203    }
204    ///
205    /// readlock a 'Cura',returning a guard that can be
206    /// dereferenced for read-only operations
207    ///
208    pub fn read(&self)->ReadGuard<T>
209    {
210        //TBD think through these memory orderings
211
212        //  how many times have we looped here...
213        let mut loops=0;
214        loop{
215            let lock=self.data().lockcount.fetch_update(
216                                        SeqCst,
217                                        SeqCst,
218                                        |x|{
219                                            if x>=0{
220                                                Some(x+1)
221                                            }else{
222                                                None
223                                            }
224                                        });
225            match lock {
226                Err(_)=>{/*   its probably writelocked,so we will spin*/
227                    if loops>3 || self.queue_size()>0
228                    {
229                        self.enqueue(LockType::Read);
230                        loops=0;
231                    }else{
232                        loops+=1;
233                        std::hint::spin_loop();
234                    }
235                },
236                Ok(_x)=>{/*    x readers,including us*/
237                    //  let everyone else in from the queue
238                    if self.queue_size()>0
239                    {
240                        self.wakereader();
241                    }
242                    break;
243                },
244            }
245        }
246        ReadGuard{
247            cura:self,
248        }
249    }
250    ///
251    /// writelock a 'Cura' , returning a guard that can be
252    /// dereferenced for write-operations.
253    ///
254    pub fn write(&self)->Guard<T>
255    {
256        //TBD think through these memory orderings
257        let mut loops=0;
258        loop{
259            let lock=self.data().lockcount.fetch_update(
260                                        SeqCst,
261                                        SeqCst,
262                                        |x|{
263                                            if x==FREE{
264                                                Some(LOCKED)
265                                            }else{
266                                                None
267                                            }
268                                        });
269            match lock {
270                Err(_)=>{/*   its write/readlocked,so we will spin*/
271                    if loops>3 || self.queue_size()>0
272                    {
273                        self.enqueue(LockType::Write);
274                        loops=0;
275                    }else{
276                        loops+=1;
277                        std::hint::spin_loop();
278                    }
279                },
280                Ok(_x)=>{/*    should be just us , writing*/
281                    break;
282                },
283            }
284        }
285        Guard{
286            cura:self,
287        }
288    }
289    ///
290    /// transparently take a writelock, attempt to mutate the value
291    /// and then release the lock
292    /// ```
293    /// use cura::Cura;
294    /// let t=Cura::new(1);
295    /// let res=t.alter(|x|{ //this is a mutable ref, can be altered that way
296    ///     if(*x==1){
297    ///         *x=2;
298    ///         Some(()) //signal alteration
299    ///     }else{
300    ///         None  //signal not altered
301    ///     }
302    ///  });
303    /// match res {
304    ///     None=>{/* no change made*/},
305    ///     Some(_)=>{/*change made*/},
306    /// }
307    ///
308    /// ```
309    pub fn alter(&self,f:fn(&mut T)->Option<()>)->Option<()>
310    {
311        let mut lock=self.write(); //lock
312        let v=f(&mut *lock);
313        match v{
314            None=>{None},
315            Some(_)=>{
316                Some(())
317            },
318        }
319    }
320    //TBD method to swap values with options
321}
322///
323/// cura private stuff
324///
325impl<T:  Sync + Send + ?Sized> Cura<T> {
326    ///
327    /// util to get accesss to curadata
328    ///
329    fn data(&self) -> &CuraData<T> {
330        unsafe { self.ptr.as_ref() }
331    }
332    ///
333    /// util to get access to the internal queuedata
334    ///
335    fn get_queuedata(&self) -> *mut QueueData
336    {
337        self.data().queuedata.get()
338    }
339    /*
340    ///
341    /// compare queue count to LOCḰQUEUE to see if it is already
342    /// locked
343    ///
344    fn queue_locked(&self)->bool{
345        self.data().queuecount.load(Acquire)>=LOCKQUEUE
346    }*/
347    ///
348    /// spin until we can acquire a lock on queue by incrementing
349    /// it with LOCKQUEUE
350    ///
351    fn lock_queue(&self)
352    {
353        loop{
354            let lock=self.data().queuecount.fetch_update(
355                                        SeqCst,
356                                        SeqCst,
357                                        |x|{
358                                            if x<LOCKQUEUE{
359                                                Some(x+LOCKQUEUE)
360                                            }else{
361                                                None
362                                            }
363                                        });
364            match lock {
365                Err(_)=>{
366                    /*  it is already locked, so we spin*/
367                    std::hint::spin_loop();
368                },
369                Ok(_x)=>{
370                    /*  locked successfully*/
371                    break;
372                },
373            }
374
375        }
376    }
377    ///
378    /// check that queue is locked and unlock it by decrementing
379    /// by LOCKQUEUE
380    ///
381    fn unlock_queue(&self)
382    {
383        let _lock=self.data().queuecount.fetch_update(
384                                    SeqCst,
385                                    SeqCst,
386                                    |x|{
387                                        if x<LOCKQUEUE {
388                                            panic!("trying to unlock nonlocked queue");
389                                        }else{
390                                            Some(x-LOCKQUEUE)
391                                        }
392                                    });
393    }
394    ///
395    /// lock queue and insert ourselves to it and park
396    /// waiting for the time in the future when we are
397    /// unparked as the first in the queue
398    ///
399    fn enqueue(&self,t:LockType){
400
401        //  lock and increment queue size
402        self.lock_queue();
403        self.inc_queue();
404
405        //  insert ourselves into queue
406        unsafe{
407            (*self.get_queuedata()).enqueue(t);
408        }
409        //  unlock queue for others to modify and see
410        self.unlock_queue();
411
412        //  and park, ready to spin on return
413        loop{
414            std::thread::park();
415            self.lock_queue();
416            let amfirst=unsafe{
417                    (*(*self.get_queuedata()).queue).thread.id()==std::thread::current().id()
418                };
419            if amfirst
420            {
421                unsafe{
422                    (*self.get_queuedata()).dequeue();
423                }
424                self.dec_queue();
425                self.unlock_queue();
426                break;
427            }else{
428                self.wakenext();
429                self.unlock_queue();
430            }
431        }
432    }
433    ///
434    /// increment number of threads blocked in queue
435    ///
436    fn inc_queue(&self)
437    {
438        self.data().queuecount.fetch_add(
439                                    1,
440                                    SeqCst);
441    }
442    ///
443    /// decrement number of threads blocked in queue
444    ///
445    fn dec_queue(&self)
446    {
447        self.data().queuecount.fetch_sub(1,SeqCst);
448    }
449    ///
450    /// find out approximate size of queue
451    ///
452    fn queue_size(&self)->u32
453    {
454        self.data().queuecount.load(Acquire)
455    }
456    ///
457    /// assumes queue is already locked by us 
458    ///
459    fn wakenext(&self)
460    {
461        unsafe{
462            if !(*self.get_queuedata()).queue.is_null()
463            {
464                (*(*self.get_queuedata()).queue).thread.unpark();
465            }
466        }
467    }
468    ///
469    /// wake reader  in front of queue
470    ///
471    fn wakereader(&self)
472    {
473        self.lock_queue();
474        unsafe{
475            let qdata=self.get_queuedata();
476            if !(*qdata).queue.is_null()
477            {
478                if (*(*qdata).queue).lock==LockType::Read
479                {
480                    (*(*qdata).queue).thread.unpark();
481                }
482            }
483        }
484        self.unlock_queue();
485    }
486    ///
487    /// release write lock
488    ///
489    fn unwritelock(&self)
490    {
491        self.lock_queue();
492        self.wakenext();
493        self.unlock_queue();
494        let lock=self.data().lockcount.compare_exchange(
495                                    LOCKED,FREE,SeqCst,SeqCst);
496        match lock {
497            Ok(LOCKED) =>{}, //ok
498            Ok(x)=>panic!("was supposed to be locked but was {}",x),
499            Err(x)=>panic!("was supposed to be locked but was {}",x),
500        }
501    }
502    ///
503    /// decrement number of readlocks held
504    ///
505    fn unreadlock(&self)
506    {
507        let lock=self.data().lockcount.fetch_sub(1,SeqCst);
508        if lock<1
509        {
510            panic!("was supposed to be readlocked but was {}",1);
511        }
512        self.lock_queue();
513        self.wakenext();
514        self.unlock_queue();
515    }
516}
517
/**
 *  Implement `Send` and `Sync`, since sharing a `Cura` between threads is
 *  all we want. Both impls are restricted to `T: Send + Sync`.
 *
 *  NOTE(review): these impls rely on all access to the value going through
 *  the read/write guards -- confirm no other access path exists.
 */
unsafe impl<T:  Send + Sync + ?Sized> Send for Cura<T> {}
unsafe impl<T:  Send + Sync + ?Sized> Sync for Cura<T> {}
523
524/**
525 *  deref to make use simpler, this should also transparently
526 *  read-lock
527 */
528//TBD  feed this to chatgpt
529/*
530impl<T:Sync+Send> Deref for Cura<T>
531{
532    type Target = ReadGuard<T>;
533    //TBD this should probably return a reference to readguard?
534    fn deref(&self) -> &Self::Target {
535        todo!("this deref should actually do a 'read()' ");
536        //&self.data().data
537        &self.read()
538    }
539}*/
540/**
541 *  clone to make new references of the object
542 */
543impl<T:  Sync + Send +?Sized> Clone for Cura<T> {
544    fn clone(&self) -> Self {
545        self.data().count.fetch_add(1, Relaxed);
546        Cura {
547            ptr: self.ptr,
548            phantom:PhantomData,
549            //dummy:0,
550            }
551    }
552}
553/**
554 *  drop to clean up references
555 */
556impl<T:  Sync + Send + ?Sized> Drop for Cura<T> {
557    fn drop(&mut self) {
558        if self.data().count.fetch_sub(1, Release) == 1 {
559            unsafe {
560                drop(Box::from_raw(self.ptr.as_ptr()));
561            }
562        }
563    }
564}
565/**********************************************************
566 *  guards
567 */
568///
569/// writeguard for Cura
570///
571#[must_use = "if unused the Lock will immediately unlock"]
572#[clippy::has_significant_drop]
573pub struct Guard<'a,T: Send+Sync+?Sized>
574{
575    cura:&'a Cura<T>,
576}
577impl<T:Send+Sync+?Sized> Drop for Guard<'_,T>
578{
579    fn drop(&mut self) {
580        self.cura.unwritelock(); //TBD no need to do anything else?
581    }
582}
583impl<T: Sync + Send+?Sized> Deref for Guard<'_,T> {
584    type Target = T;
585    fn deref(&self) -> &T { //TBD reference lifetime?
586        unsafe{
587            &*self.cura.data().data.get()
588        }
589    }
590}
591impl<T: Sync + Send + ?Sized> DerefMut for Guard<'_,T> {
592    fn deref_mut(&mut self) -> &mut T { //TBD reference lifetime?
593        unsafe {
594            &mut *self.cura.data().data.get()
595        }
596    }
597}
598
599
600/**
601 *  readguard for Cura
602 */
603#[must_use = "if unused the Lock will immediately unlock"]
604#[clippy::has_significant_drop]
605pub struct ReadGuard<'a,T:Send+Sync+?Sized>
606{
607    cura:&'a Cura<T>,
608}
609impl<T:Send+Sync+?Sized> Drop for ReadGuard<'_,T>
610{
611    fn drop(&mut self) {
612        self.cura.unreadlock(); //TBD nothing else?
613    }
614}
615impl<T: Sync + Send + ?Sized> Deref for ReadGuard<'_,T> {
616    type Target = T;
617    fn deref(&self) -> &T {
618        unsafe{
619            &*self.cura.data().data.get()
620        }
621    }
622}
///
/// Milliseconds since the Unix epoch; a helper for timing in tests.
///
/// Panics if the system clock is set before the Unix epoch.
///
fn current_time() -> u128 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("weird shit happened");
    since_epoch.as_millis()
}
///
/// Blocks the calling thread for `millis` milliseconds.
///
fn sleep(millis: u32) {
    let pause = std::time::Duration::from_millis(u64::from(millis));
    std::thread::sleep(pause);
}
638
639
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads and writes one value through two handles and checks that every
    /// call reached the value.
    #[test]
    fn basic_usecases() {
        /*  calculator for hits to "testing"*/
        static NUM_HITS: AtomicUsize = AtomicUsize::new(0);
        /*  testing struct to act as the value*/
        struct Foo {
            id: u16,
        }
        impl Foo {
            pub fn new(id: u16) -> Foo {
                Foo { id }
            }
            pub fn testing(&self) {
                println!("works {}", self.id);
                NUM_HITS.fetch_add(self.id.into(), SeqCst);
            }
        }
        /*  create ref and hit the test method*/
        let x = Cura::new(Foo::new(1));
        let y = x.clone();
        {
            x.read().testing();
        }
        /*  take a writelock just for fun*/
        {
            let mut w = x.write();
            *w = Foo::new(10);
            w.testing();
        }
        y.read().testing(); //TBD convert this to work with deref
        /*  1 from the first read, 10 from the write guard, 10 from the last read*/
        assert_eq!(21, NUM_HITS.load(Acquire));
    }

    /// Blocks 30 reader/writer threads behind one write lock, releases it
    /// and expects every thread to finish.
    #[test]
    fn advanced_usecases() {
        static STATE: AtomicUsize = AtomicUsize::new(0);

        struct Foo {
            id: u16,
        }
        impl Foo {
            pub fn new(id: u16) -> Foo {
                Foo { id }
            }
            pub fn testing(&mut self, id: u16) {
                self.id = id;
            }
            pub fn id(&self) -> u16 {
                self.id
            }
        }
        /*  create ref*/
        let x = Cura::new(Foo::new(1));
        let y = x.clone();
        /*  writelock one ref*/
        let w = y.write();
        /*  start creating write and read threads to block*/
        let mut threads = Vec::new();
        for i in 0..30 {
            let c = x.clone();
            /*  every fifth thread is a writer*/
            let write = i % 5 == 0;
            let t = std::thread::spawn(move || {
                if write {
                    loop {
                        println!("loop {} {}", i, write);
                        let _foo = c.write();
                        STATE.fetch_add(1, SeqCst);
                        //  keep re-locking until enough threads have passed
                        if STATE.load(SeqCst) >= (i + 1) {
                            break;
                        }
                    }
                } else {
                    loop {
                        println!("loop {} {}", i, write);
                        let _foo = c.read();
                        STATE.fetch_add(1, SeqCst);
                        //  keep re-locking until enough threads have passed
                        if STATE.load(SeqCst) >= (i + 1) {
                            break;
                        }
                    }
                }
            });
            threads.push(t);
        }
        /*  give the threads time to pile up behind the write lock*/
        sleep(1000);
        drop(w);
        while let Some(t) = threads.pop() {
            t.join().unwrap();
            println!("joined");
        }
    }

    /// Checks that `Cura` around an unsized type can be passed by value,
    /// stored in a struct and nested.
    #[test]
    fn sized() {
        use std::sync::Arc;
        fn test<T>(_t: T) {
            println!("got t");
        }
        #[derive(Clone, Copy)]
        enum EE {
            Bing,
            Bong,
        }
        trait Foo: Send + Sync {
            fn get(&self) -> EE {
                EE::Bing
            }
        }
        struct FF;
        impl Foo for FF {}

        let t: Cura<dyn Foo> = Cura::from_box(Box::new(FF {}));
        test(t);
        let tt: Cura<dyn Foo> = Cura::from_box(Box::new(FF {}));
        struct Bar<T: Sync + Send + ?Sized> {
            tt: Cura<T>,
        }
        let _f = Bar { tt };
        let _ = std::mem::size_of::<Cura<dyn Foo>>();
        let _ = std::mem::size_of::<Arc<Arc<dyn Foo>>>();
        let _ = std::mem::size_of::<Cura<Cura<dyn Foo>>>();
        let _ = std::mem::size_of::<Bar<Cura<dyn Foo>>>();
    }

    /// `alter` passes the closure's result through and applies its changes.
    #[test]
    fn alter_works() {
        let t = Cura::new(3);
        /*  condition does not match: nothing may change*/
        let unchanged = t.alter(|x| {
            if *x == 2 {
                *x = 3;
                Some(())
            } else {
                None
            }
        });
        assert_eq!(unchanged, None);
        assert_eq!(*t.read(), 3);

        /*  condition matches: value is updated*/
        let changed = t.alter(|x| {
            if *x == 3 {
                *x = 4;
                Some(())
            } else {
                None
            }
        });
        assert_eq!(changed, Some(()));
        assert_eq!(*t.read(), 4);
    }

    /// Takes and releases a read lock many times in a row.
    #[test]
    fn loop_a_lot() {
        #[derive(Clone, Copy)]
        enum Foo {
            Bing,
            Bong,
        }
        let s = Cura::new(Foo::Bing);
        for _ in 0..2000 {
            /*  the guard is dropped at the end of this statement*/
            let current = *s.read();
            match current {
                Foo::Bing => {}
                Foo::Bong => {}
            }
        }
    }

    /// Same as `loop_a_lot`, but through a trait object.
    #[test]
    fn loop_a_lot_box() {
        #[derive(Clone, Copy)]
        enum EE {
            Bing,
            Bong,
        }
        trait Foo: Send + Sync {
            fn get(&self) -> EE {
                EE::Bing
            }
        }
        struct FF;
        impl Foo for FF {}
        let t = Box::new(FF {});
        let s: Cura<dyn Foo> = Cura::from_box(t);
        for _ in 0..2000 {
            /*  the guard is dropped at the end of this statement*/
            let current = s.read().get();
            match current {
                EE::Bing => {}
                EE::Bong => {
                    panic!("never here");
                }
            }
        }
    }

    /// The value is shared between threads and dropped exactly once, with
    /// the last handle.
    #[test]
    fn it_works() {
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Dropped;

        impl Drop for Dropped {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Relaxed);
            }
        }

        /*  create two testobjects and keep track of dropping*/
        /*  by including the drop-detector into a tuple*/
        let x = Cura::new(("salve!", Dropped));
        let y = x.clone();

        /*  push to another thread, see it works there*/
        let t = std::thread::spawn(move || {
            assert_eq!(x.read().0, "salve!"); //TBD convert to deref
        });

        /*  and still works here*/
        assert_eq!(y.read().0, "salve!"); //TBD convert to deref

        /*  wait for the thread*/
        t.join().unwrap();

        /*  object shouldnt have dropped yet*/
        assert_eq!(DROPS.load(Relaxed), 0);

        /*  and we drop the last reference here , so it should drop*/
        drop(y);

        /*  and check the result.*/
        assert_eq!(DROPS.load(Relaxed), 1);
    }
}