cura/lib.rs
1#![warn(missing_docs)]
//! An attempt at creating an Arc-RwLock combination that is straightforward
//! to use and hassle-free, instead of worrying about being fast and lean.
4//!
//! * cloning references works like Arc
6//! * made for sharing objects between threads without worry
7//! * locking things works like RwLock with write() or read()
8//! * it spins a few times and then queues if a lock is not obtained
//! * Miri seems to be happy, so it should not leak memory
10//! * requires that everything you stick into it is Send+Sync
//! * no need to constantly .unwrap() things; instead it will just
//! block forever or panic
13//!
14//! # Example
15//! ```
16//! use cura::Cura;
17//! trait Foo:Send+Sync
18//! {
19//! fn get(&self)->i32;
20//! fn set(&mut self,i:i32);
21//! }
22//! #[derive(Debug)]
23//! struct FF{i:i32,};
24//! struct BB{i:i32};
25//!
26//! impl Foo for FF{
27//! fn get(&self)->i32{return self.i;}
28//! fn set(&mut self,i:i32){self.i=i;}
29//! }
30//!
31//! impl Foo for BB{
32//! fn get(&self)->i32{return self.i;}
33//! fn set(&mut self,i:i32){self.i=i;}
34//! }
35//!
36//! let t=FF{i:1};
37//!
38//! // you can do straight "from_box" but currently its impossible to
39//! // "alter" unsized types
40//! let foo2:Cura<dyn Foo>=Cura::from_box(Box::new(FF{i:2}));
41//! let foo:Cura<Box<dyn Foo>>=Cura::new(Box::new(t));
42//! let a=foo.clone();
43//! let b=foo.clone();
44//!
45//! {
46//! assert_eq!(a.read().get(),1);
47//! {
48//! a.alter(|s|{
49//! s.set(2);
50//! Some(())
51//! });
52//! }
53//! {
54//! a.alter(|s|{ //this only works for Sized types
55//! *s=Box::new(BB{i:2});
56//! Some(())
57//! });
58//! }
59//! let lock=a.read();
60//! let v=lock;
61//! assert_eq!(v.get(),2)
62//! }//lock dropped here
63//! {
64//! (*b.write()).set(3); //lock dropped here i think
65//! }
66//!
67//! assert_eq!((*a.read()).get(),3);
68//!
69//! ```
70use std::ops::{Deref,DerefMut};
71use std::ptr::NonNull;
72use std::sync::atomic::{AtomicUsize,AtomicI32,AtomicU32};
73use std::sync::atomic::Ordering::{Acquire, Relaxed, Release,SeqCst};
74use std::cell::UnsafeCell;
75use std::thread::Thread;
76use std::time::{SystemTime,UNIX_EPOCH};
77use std::marker::PhantomData;
78const LOCKED:i32=-999;
79const FREE:i32=0;
80const LOCKQUEUE:u32=u32::MAX/2;
81
/// A reference-counted handle to a shared, read-write-lockable value:
/// a sort of an Arc that will both readwrite lock, be easy to
/// handle and is cloneable.
/// ```
/// use cura::Cura;
/// let s=Cura::new(1);
/// let a=s.clone();
///
/// ```
pub struct Cura<T: Sync + Send +?Sized> {
    // Pointer to the shared heap control block (value + refcount + lock state).
    ptr: NonNull<CuraData<T>>,
    // Marks logical ownership of a CuraData<T> for drop-check purposes.
    phantom:PhantomData<CuraData<T>>,
    //dummy:i32,
}
/// Heap-allocated state shared by every handle of one `Cura`: the value
/// itself plus the reference count, lock word and parked-thread queue.
struct CuraData<T: Sync + Send+?Sized> {
    // The protected value; aliasing is governed by `lockcount`.
    data: UnsafeCell<Box<T>>,
    // Wait queue of parked threads; guarded by the LOCKQUEUE bit of `queuecount`.
    queuedata:UnsafeCell<QueueData>,
    // Number of live Cura handles (like Arc's strong count).
    count: AtomicUsize,
    lockcount:AtomicI32, // -999 (LOCKED) = write lock, 0 (FREE) = free, >0 = reader count
    queuecount:AtomicU32, // number of queued threads; +LOCKQUEUE while queue is locked
}
/// Singly-linked FIFO of threads waiting for the lock. Nodes are
/// leaked boxes, reclaimed on dequeue; both pointers are null when empty.
struct QueueData
{
    // Head of the queue: the next thread to be woken, null if empty.
    queue:*mut QueueLink,
    // Tail of the queue: where new waiters are appended, null if empty.
    endqueue:*mut QueueLink,
}
impl QueueData
{
    ///
    /// queue stuff into end of queue
    ///
    /// The node is allocated with Box::leak here and reclaimed in
    /// `dequeue`. Caller must hold the queue lock (LOCKQUEUE) so this
    /// has exclusive access to the list.
    fn enqueue(&mut self,t:LockType)
    {
        let link=Box::leak(Box::new(QueueLink::new(t)));
        let next=self.endqueue;
        if next.is_null()
        {
            // queue was empty: the new node is also the head
            self.queue=link;
        }else{
            // SAFETY: tail pointer is non-null and points at a live node
            // that we own exclusively while the queue lock is held.
            unsafe{(*next).next=link;}
        }
        self.endqueue=link;
    }
    /// Pop and free the head node. Caller must hold the queue lock and
    /// must not call this on an empty queue (the head is dereferenced
    /// unconditionally).
    fn dequeue(&mut self)
    {
        // dequeue
        let me=self.queue;
        // SAFETY: caller guarantees the queue is non-empty.
        unsafe{self.queue=(*self.queue).next;}
        // if we were the last
        if me==self.endqueue
        {
            self.endqueue=std::ptr::null_mut();
        }
        // SAFETY: `me` came from Box::leak in enqueue and has just been
        // unlinked, so this is the sole owner; freeing it is sound.
        unsafe {
            drop(Box::from_raw(me));
        }
    }
}
/// Which kind of lock a queued (parked) thread is waiting to take.
#[derive(PartialEq)]
enum LockType
{
    Read,
    Write,
}
/// One node of the wait queue: the parked thread's handle, the lock
/// kind it wants, and the next node (null at the tail).
struct QueueLink
{
    thread:Thread,
    lock:LockType,
    next:*mut QueueLink,
}
151impl QueueLink
152{
153 fn new(l:LockType)->QueueLink
154 {
155 QueueLink{
156 thread:std::thread::current(),
157 lock:l,
158 next:std::ptr::null_mut(),
159 }
160 }
161 /*
162 useful methods
163 */
164}
165///
166/// Cura public interface
167///
168impl <T: Sync + Send> Cura<T> {
169 ///
170 /// constructor for a Cura
171 /// ```
172 /// use cura::Cura;
173 /// let t=1;
174 /// let foo=Cura::new(t); //instead of Arc::new(Mutex::new(t));
175 /// ```
176 pub fn new(t: T) -> Cura<T> {
177 Self::from_box(Box::new(t))
178 }
179}
180///
181/// Cura public interface
182///
183impl<T: Sync + Send + ?Sized> Cura<T> {
184 ///
185 /// convert from box<T> to Cura<T>
186 ///
187 pub fn from_box(v: Box<T>) -> Cura<T> {
188 let queuedata=UnsafeCell::new(QueueData{
189 queue:std::ptr::null_mut(),
190 endqueue:std::ptr::null_mut(),
191 });
192 Cura {
193 ptr: NonNull::from(Box::leak(Box::new(CuraData {
194 count: AtomicUsize::new(1),
195 data: UnsafeCell::new(v),
196 lockcount:AtomicI32::new(0),
197 queuecount:AtomicU32::new(0), //
198 queuedata:queuedata,
199 }))),
200 phantom:PhantomData,
201 //dummy:0,
202 }
203 }
204 ///
205 /// readlock a 'Cura',returning a guard that can be
206 /// dereferenced for read-only operations
207 ///
208 pub fn read(&self)->ReadGuard<T>
209 {
210 //TBD think through these memory orderings
211
212 // how many times have we looped here...
213 let mut loops=0;
214 loop{
215 let lock=self.data().lockcount.fetch_update(
216 SeqCst,
217 SeqCst,
218 |x|{
219 if x>=0{
220 Some(x+1)
221 }else{
222 None
223 }
224 });
225 match lock {
226 Err(_)=>{/* its probably writelocked,so we will spin*/
227 if loops>3 || self.queue_size()>0
228 {
229 self.enqueue(LockType::Read);
230 loops=0;
231 }else{
232 loops+=1;
233 std::hint::spin_loop();
234 }
235 },
236 Ok(_x)=>{/* x readers,including us*/
237 // let everyone else in from the queue
238 if self.queue_size()>0
239 {
240 self.wakereader();
241 }
242 break;
243 },
244 }
245 }
246 ReadGuard{
247 cura:self,
248 }
249 }
250 ///
251 /// writelock a 'Cura' , returning a guard that can be
252 /// dereferenced for write-operations.
253 ///
254 pub fn write(&self)->Guard<T>
255 {
256 //TBD think through these memory orderings
257 let mut loops=0;
258 loop{
259 let lock=self.data().lockcount.fetch_update(
260 SeqCst,
261 SeqCst,
262 |x|{
263 if x==FREE{
264 Some(LOCKED)
265 }else{
266 None
267 }
268 });
269 match lock {
270 Err(_)=>{/* its write/readlocked,so we will spin*/
271 if loops>3 || self.queue_size()>0
272 {
273 self.enqueue(LockType::Write);
274 loops=0;
275 }else{
276 loops+=1;
277 std::hint::spin_loop();
278 }
279 },
280 Ok(_x)=>{/* should be just us , writing*/
281 break;
282 },
283 }
284 }
285 Guard{
286 cura:self,
287 }
288 }
289 ///
290 /// transparently take a writelock, attempt to mutate the value
291 /// and then release the lock
292 /// ```
293 /// use cura::Cura;
294 /// let t=Cura::new(1);
295 /// let res=t.alter(|x|{ //this is a mutable ref, can be altered that way
296 /// if(*x==1){
297 /// *x=2;
298 /// Some(()) //signal alteration
299 /// }else{
300 /// None //signal not altered
301 /// }
302 /// });
303 /// match res {
304 /// None=>{/* no change made*/},
305 /// Some(_)=>{/*change made*/},
306 /// }
307 ///
308 /// ```
309 pub fn alter(&self,f:fn(&mut T)->Option<()>)->Option<()>
310 {
311 let mut lock=self.write(); //lock
312 let v=f(&mut *lock);
313 match v{
314 None=>{None},
315 Some(_)=>{
316 Some(())
317 },
318 }
319 }
320 //TBD method to swap values with options
321}
322///
323/// cura private stuff
324///
325impl<T: Sync + Send + ?Sized> Cura<T> {
326 ///
327 /// util to get accesss to curadata
328 ///
329 fn data(&self) -> &CuraData<T> {
330 unsafe { self.ptr.as_ref() }
331 }
332 ///
333 /// util to get access to the internal queuedata
334 ///
335 fn get_queuedata(&self) -> *mut QueueData
336 {
337 self.data().queuedata.get()
338 }
339 /*
340 ///
341 /// compare queue count to LOCḰQUEUE to see if it is already
342 /// locked
343 ///
344 fn queue_locked(&self)->bool{
345 self.data().queuecount.load(Acquire)>=LOCKQUEUE
346 }*/
347 ///
348 /// spin until we can acquire a lock on queue by incrementing
349 /// it with LOCKQUEUE
350 ///
351 fn lock_queue(&self)
352 {
353 loop{
354 let lock=self.data().queuecount.fetch_update(
355 SeqCst,
356 SeqCst,
357 |x|{
358 if x<LOCKQUEUE{
359 Some(x+LOCKQUEUE)
360 }else{
361 None
362 }
363 });
364 match lock {
365 Err(_)=>{
366 /* it is already locked, so we spin*/
367 std::hint::spin_loop();
368 },
369 Ok(_x)=>{
370 /* locked successfully*/
371 break;
372 },
373 }
374
375 }
376 }
377 ///
378 /// check that queue is locked and unlock it by decrementing
379 /// by LOCKQUEUE
380 ///
381 fn unlock_queue(&self)
382 {
383 let _lock=self.data().queuecount.fetch_update(
384 SeqCst,
385 SeqCst,
386 |x|{
387 if x<LOCKQUEUE {
388 panic!("trying to unlock nonlocked queue");
389 }else{
390 Some(x-LOCKQUEUE)
391 }
392 });
393 }
394 ///
395 /// lock queue and insert ourselves to it and park
396 /// waiting for the time in the future when we are
397 /// unparked as the first in the queue
398 ///
399 fn enqueue(&self,t:LockType){
400
401 // lock and increment queue size
402 self.lock_queue();
403 self.inc_queue();
404
405 // insert ourselves into queue
406 unsafe{
407 (*self.get_queuedata()).enqueue(t);
408 }
409 // unlock queue for others to modify and see
410 self.unlock_queue();
411
412 // and park, ready to spin on return
413 loop{
414 std::thread::park();
415 self.lock_queue();
416 let amfirst=unsafe{
417 (*(*self.get_queuedata()).queue).thread.id()==std::thread::current().id()
418 };
419 if amfirst
420 {
421 unsafe{
422 (*self.get_queuedata()).dequeue();
423 }
424 self.dec_queue();
425 self.unlock_queue();
426 break;
427 }else{
428 self.wakenext();
429 self.unlock_queue();
430 }
431 }
432 }
433 ///
434 /// increment number of threads blocked in queue
435 ///
436 fn inc_queue(&self)
437 {
438 self.data().queuecount.fetch_add(
439 1,
440 SeqCst);
441 }
442 ///
443 /// decrement number of threads blocked in queue
444 ///
445 fn dec_queue(&self)
446 {
447 self.data().queuecount.fetch_sub(1,SeqCst);
448 }
449 ///
450 /// find out approximate size of queue
451 ///
452 fn queue_size(&self)->u32
453 {
454 self.data().queuecount.load(Acquire)
455 }
456 ///
457 /// assumes queue is already locked by us
458 ///
459 fn wakenext(&self)
460 {
461 unsafe{
462 if !(*self.get_queuedata()).queue.is_null()
463 {
464 (*(*self.get_queuedata()).queue).thread.unpark();
465 }
466 }
467 }
468 ///
469 /// wake reader in front of queue
470 ///
471 fn wakereader(&self)
472 {
473 self.lock_queue();
474 unsafe{
475 let qdata=self.get_queuedata();
476 if !(*qdata).queue.is_null()
477 {
478 if (*(*qdata).queue).lock==LockType::Read
479 {
480 (*(*qdata).queue).thread.unpark();
481 }
482 }
483 }
484 self.unlock_queue();
485 }
486 ///
487 /// release write lock
488 ///
489 fn unwritelock(&self)
490 {
491 self.lock_queue();
492 self.wakenext();
493 self.unlock_queue();
494 let lock=self.data().lockcount.compare_exchange(
495 LOCKED,FREE,SeqCst,SeqCst);
496 match lock {
497 Ok(LOCKED) =>{}, //ok
498 Ok(x)=>panic!("was supposed to be locked but was {}",x),
499 Err(x)=>panic!("was supposed to be locked but was {}",x),
500 }
501 }
502 ///
503 /// decrement number of readlocks held
504 ///
505 fn unreadlock(&self)
506 {
507 let lock=self.data().lockcount.fetch_sub(1,SeqCst);
508 if lock<1
509 {
510 panic!("was supposed to be readlocked but was {}",1);
511 }
512 self.lock_queue();
513 self.wakenext();
514 self.unlock_queue();
515 }
516}
517
/**
 * implement send and sync since thats all we want
 *
 * SAFETY: T is constrained to Send + Sync and every access to the
 * inner value goes through the read/write locking above, so moving or
 * sharing a Cura handle across threads is sound.
 */
unsafe impl<T: Send + Sync + ?Sized> Send for Cura<T> {}
unsafe impl<T: Send + Sync + ?Sized> Sync for Cura<T> {}
523
524/**
525 * deref to make use simpler, this should also transparently
526 * read-lock
527 */
//TBD: revisit whether a transparent read-locking Deref can be made sound
529/*
530impl<T:Sync+Send> Deref for Cura<T>
531{
532 type Target = ReadGuard<T>;
533 //TBD this should probably return a reference to readguard?
534 fn deref(&self) -> &Self::Target {
535 todo!("this deref should actually do a 'read()' ");
536 //&self.data().data
537 &self.read()
538 }
539}*/
540/**
541 * clone to make new references of the object
542 */
543impl<T: Sync + Send +?Sized> Clone for Cura<T> {
544 fn clone(&self) -> Self {
545 self.data().count.fetch_add(1, Relaxed);
546 Cura {
547 ptr: self.ptr,
548 phantom:PhantomData,
549 //dummy:0,
550 }
551 }
552}
553/**
554 * drop to clean up references
555 */
556impl<T: Sync + Send + ?Sized> Drop for Cura<T> {
557 fn drop(&mut self) {
558 if self.data().count.fetch_sub(1, Release) == 1 {
559 unsafe {
560 drop(Box::from_raw(self.ptr.as_ptr()));
561 }
562 }
563 }
564}
565/**********************************************************
566 * guards
567 */
///
/// writeguard for Cura: grants exclusive access to the value for as
/// long as it lives; the write lock is released when it is dropped
///
#[must_use = "if unused the Lock will immediately unlock"]
#[clippy::has_significant_drop]
pub struct Guard<'a,T: Send+Sync+?Sized>
{
    // Handle whose write lock this guard holds.
    cura:&'a Cura<T>,
}
impl<T:Send+Sync+?Sized> Drop for Guard<'_,T>
{
    /// Release the write lock (and wake the next queued thread).
    fn drop(&mut self) {
        self.cura.unwritelock(); //TBD no need to do anything else?
    }
}
impl<T: Sync + Send+?Sized> Deref for Guard<'_,T> {
    type Target = T;
    fn deref(&self) -> &T { //TBD reference lifetime?
        // SAFETY: this guard holds the exclusive write lock, so no
        // other reference to the data exists while `self` is alive.
        unsafe{
            &*self.cura.data().data.get()
        }
    }
}
impl<T: Sync + Send + ?Sized> DerefMut for Guard<'_,T> {
    fn deref_mut(&mut self) -> &mut T { //TBD reference lifetime?
        // SAFETY: exclusive write lock held, so handing out a unique
        // mutable reference cannot alias any other access.
        unsafe {
            &mut *self.cura.data().data.get()
        }
    }
}
598
599
/**
 * readguard for Cura: grants shared read access to the value; the
 * read lock is released when it is dropped
 */
#[must_use = "if unused the Lock will immediately unlock"]
#[clippy::has_significant_drop]
pub struct ReadGuard<'a,T:Send+Sync+?Sized>
{
    // Handle whose read lock this guard holds.
    cura:&'a Cura<T>,
}
impl<T:Send+Sync+?Sized> Drop for ReadGuard<'_,T>
{
    /// Release one read lock (and wake the next queued thread).
    fn drop(&mut self) {
        self.cura.unreadlock(); //TBD nothing else?
    }
}
impl<T: Sync + Send + ?Sized> Deref for ReadGuard<'_,T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: a read lock is held, so writers are excluded and
        // only shared references to the data can exist.
        unsafe{
            &*self.cura.data().data.get()
        }
    }
}
///
/// util to get current time in millis for testing
///
#[allow(dead_code)] // only used ad hoc when timing tests by hand
fn current_time()->u128{
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_millis()
}
///
/// util to sleep for a few millis
///
fn sleep(millis:u32){
    let pause = std::time::Duration::from_millis(u64::from(millis));
    std::thread::sleep(pause);
}
638
639
#[cfg(test)]
mod tests {
    use super::*;
    // Smoke test: clone, read lock, write lock and replace the value,
    // counting calls through a static counter.
    #[test]
    fn basic_usecases() {

        /* calculator for hits to "testing"*/
        static NUM_HITS: AtomicUsize = AtomicUsize::new(0);
        /* testing struct to act as the value*/
        struct Foo {
            id:u16,
        }
        impl Foo {
            pub fn new(id:u16) -> Foo {
                Foo {
                    id:id,
                }
            }
            pub fn testing(&self) {
                println!("works {}",self.id);
                NUM_HITS.fetch_add(self.id.into(), SeqCst);
            }
        }
        /* create ref and hit the test method*/
        let x = Cura::new(Foo::new(1));
        let y = x.clone();
        {
            x.read().testing();
        }
        /* take a writelock just for fun*/
        {
            let mut w=x.write();
            *w=Foo::new(10);
            w.testing();
        }
        y.read().testing(); //TBD convert this to work with deref
        // 1 (first read) + 10 (after replace) + 10 (read via clone)
        assert_eq!(21, NUM_HITS.load(Acquire));
    }
    // Contention test: hold a write lock while 30 reader/writer threads
    // queue up, then release it and join them all.
    #[test]
    fn advanced_usecases()
    {
        static STATE:AtomicUsize=AtomicUsize::new(0);

        struct Foo{
            id:u16,
        }
        impl Foo {
            pub fn new(id:u16)->Foo{
                Foo{id:id}
            }
            pub fn testing(&mut self,id:u16) {
                self.id=id;
            }
            pub fn id(&self)->u16
            {
                self.id
            }
        }
        /* create ref*/
        let x=Cura::new(Foo::new(1));
        let y=x.clone();
        /* writelock one ref*/
        let w=y.write();
        //let start=current_time();
        /* start creating write and read threads to block*/
        let mut threads=Vec::new();
        for i in 0..30 {
            let c=x.clone();
            // every fifth thread takes the write path
            let write= i%5==0;
            let i=i;
            let t = std::thread::spawn(move || {
                if write {
                    let c=c.clone();
                    loop{
                        println!("loop {} {}",i,write);
                        let _foo=c.write();
                        STATE.fetch_add(1,SeqCst);
                        //block and loop until STATE
                        if STATE.load(SeqCst)>=(i+1)
                        {
                            break;
                        }
                    }
                }else{
                    let c=c.clone();
                    loop{
                        println!("loop {} {}",i,write);
                        let _foo=c.read();
                        STATE.fetch_add(1,SeqCst);
                        //TBD loop here until STATE>=i
                        if STATE.load(SeqCst)>=(i+1)
                        {
                            break;
                        }
                    }
                }
            });
            threads.push(t);
        }
        // let the spawned threads pile up against the held write lock
        sleep(1000);
        drop(w);
        while let Some(t) = threads.pop()
        {
            t.join().unwrap();
            println!("joined");
        }

        //let end=current_time();
        //println!("took:{}",(end-start));
    }
    // Compile-time/shape test: Cura<dyn Trait> works unsized and its
    // handle size matches expectations.
    #[test]
    fn sized()
    {
        use std::sync::Arc;
        fn test<T>(t:T)
        {
            println!("got t");
        }
        #[derive(Clone,Copy)]
        enum EE
        {
            Bing,
            Bong,
        }
        trait Foo:Send+Sync
        {
            fn get(&self)->EE
            {
                return EE::Bing;
            }
        }
        struct FF;
        impl Foo for FF{}

        let t:Cura<dyn Foo>=Cura::from_box(Box::new(FF{}));
        test(t);
        let tt:Cura<dyn Foo>=Cura::from_box(Box::new(FF{}));
        struct Bar<T:Sync+Send+?Sized>
        {
            tt:Cura<T>,
        }
        let f=Bar{tt:tt};
        let _ = std::mem::size_of::<Cura<dyn Foo>>();
        let _ = std::mem::size_of::<Arc<Arc<dyn Foo>>>();
        let _ = std::mem::size_of::<Cura<Cura<dyn Foo>>>();
        let _ = std::mem::size_of::<Bar<Cura<dyn Foo>>>();
    }
    // alter() runs the closure under the write lock; first closure is a
    // no-op (value is 3, not 2), second one mutates 3 -> 4.
    #[test]
    fn alter_works()
    {
        let t=Cura::new(3);
        t.alter(|x|{
            if *x==2{
                *x=3;
                Some(())
            }else{
                None
            }
        });

        t.alter(|x|{
            if *x==3{
                *x=4;
                Some(())
            }else{
                None
            }
        });

    }
    // Hammer the read path repeatedly with a Sized value.
    #[test]
    fn loop_a_lot()
    {
        #[derive(Clone,Copy)]
        enum Foo
        {
            Bing,
            Bong,
        }
        let s=Cura::new(Foo::Bing);
        let mut i=2000;
        while i>0
        {
            match {*s.read()}.clone() {
                Foo::Bing=>{
                },
                Foo::Bong=>{
                },
            }
            i=i-1;
        }
    }
    // Hammer the read path repeatedly through a Cura<dyn Trait>.
    #[test]
    fn loop_a_lot_box()
    {
        #[derive(Clone,Copy)]
        enum EE
        {
            Bing,
            Bong,
        }
        trait Foo:Send+Sync
        {
            fn get(&self)->EE
            {
                return EE::Bing;
            }
        }
        struct FF;
        impl Foo for FF{};
        let t=Box::new(FF{});
        let s:Cura<dyn Foo>=Cura::from_box(t);
        let mut i=2000;
        while i>0
        {
            match {s.read()}.get().clone() {
                EE::Bing=>{
                },
                EE::Bong=>{
                    panic!("never here");
                },
            }
            i=i-1;
        }
    }
    // Refcount test: the value drops exactly once, only after the last
    // handle is gone.
    #[test]
    fn it_works() {
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Dropped;

        impl Drop for Dropped {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Relaxed);
            }
        }

        /* create two testobjects and keep track of dropping*/
        /* by including the drop-detector into a tuple*/
        let x = Cura::new(("salve!", Dropped));
        let y = x.clone();

        /* push to another thread, see it works there*/
        let t = std::thread::spawn(move || {
            assert_eq!(x.read().0, "salve!");//TBD conver to deref
        });

        /* and still works here*/
        assert_eq!(y.read().0, "salve!"); //TBD convert to deref

        /* wait for the thread*/
        t.join().unwrap();

        /* object shouldnt have dropped yet*/
        assert_eq!(DROPS.load(Relaxed), 0);

        /* and we drop the last reference here , so it should drop*/
        drop(y);

        /* and check the result.*/
        assert_eq!(DROPS.load(Relaxed), 1);
    }
}