instance_copy_on_write/
lib.rs

1/*-
2 * instance-copy-on-write - a synchronization primitive based on copy-on-write.
3 * 
4 * Copyright (C) 2025 Aleksandr Morozov alex@4neko.org
5 * 
6 * The instance-copy-on-write crate can be redistributed and/or modified
7 * under the terms of either of the following licenses:
8 *
9 *   1. The MIT License (MIT)
10 */
11
12/*!
13 An experimental thread access synchronization primitive 
14 which is based on CoW Copy-on-Write and also exclusive lock.
15 
16 By default an `RwLock` based implementation is used, because the 
17 `atomics` based implementation is still not ready. It can be activated 
18 by enabling the feature prefer_atomic.
19
20 **feature = prefer_atomic**
21 
22 The `ICoWRead<DATA>` instance with inner value `DATA` obtained from the 
23 `ICoW<DATA>` is valid even after the inner value was copied and copy was 
24 commited back. All `readers` which will will read the `ICoW` instance after 
25 the `commit` will receive a `ICoWRead<DATA>` with updated `DATA`.
26
27 Exclusive copying prevents `readers` from reading while simple `non-exclusive` 
28 copy allows to access the inner data for reading and further copying. But, the 
29 `non-exclusive` copies are not in sync, so each copy is uniq. The `sequential` 
30 commit of copied data is not yet supported.
31
32 The experimental `cow_refcell.rs` was added to support single-thread mode
33 pure CoW model which is not really usefull, but this is a path to the future
34 `async` implementation.
35
36 *  ```ignore
37
38    let val = 
39        ICoW::new(TestStruct::new(1, 2));
40
41    // read only 
42    let read1 = val.read();
43    //..
44    drop(read1);
45
46    // ...
47
48    // copy on write NON-exclusively, read is possible
49    let mut transaction = val.clone_copy();
50
51    transaction.start = 5;
52    transaction.stop = 6;
53
54    // update, after calling this function, all reades who 
55    // read before will still see old version.
56    // all reades after, new
57    transaction.commit();
58    //or
59    drop(transaction); // to drop changes
60
61
62    // exclusive lock, read is also not possible
63
64    let res = val.clone_exclusivly();
65    // ..
66    // commit changes releasing lock
67    commit(val); //or
68    drop(val); // to drop changes and release lock
69
70    ```
71 */
72
73 /// Type of the sync code.
74 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
75 pub enum ICoWLockTypes
76 {
77    /// Based on atomic and backoff.
78    Atomic,
79
80    /// based on the OS RwLock
81    RwLock,
82 }
83
84impl fmt::Display for ICoWLockTypes
85{
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
87    {
88        match self
89        {
90            Self::Atomic => 
91                write!(f, "atomic"),
92            Self::RwLock => 
93                write!(f, "rwlock"),
94        }
95    }
96}
97
98/// Errors which may be returned.
99#[derive(Copy, Clone, Debug, PartialEq, Eq)]
100pub enum ICoWError
101{
102    /// An attempt to write to the instance using non exclusive copy-on-write operation
103    /// while the exclusive is still going.
104    ExclusiveLockPending,
105
106    /// Is issued if "exponential backoff has completed and blocking the thread is advised".
107    WouldBlock,
108}
109
110impl fmt::Display for ICoWError
111{
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
113    {
114        match self
115        {
116            Self::ExclusiveLockPending => 
117                write!(f, "already exlcusivly locked"),
118            Self::WouldBlock => 
119                write!(f, "cannot perform try operation due to blocking"),
120        }
121    }
122}
123
124extern crate crossbeam_utils;
125extern crate crossbeam_deque;
126
127/// A lightweight CoW implementation.
128#[cfg(all(target_has_atomic = "32", feature = "prefer_atomic"))]
129pub mod cow;
130
131/// A RwLock based copy-on-write.
132#[cfg(any(not(target_has_atomic = "32"), not(feature = "prefer_atomic")))]
133pub mod cow_mutex;
134
135#[cfg(any(not(target_has_atomic = "32"), not(feature = "prefer_atomic")))]
136pub mod cow_refcell;
137
138use std::fmt;
139
140#[cfg(all(target_has_atomic = "32", feature = "prefer_atomic"))]
141pub use cow::{ICoW, ICoWRead, ICoWCopy, ICoWLock, ICoWWeak, ICoWWeakRead};
142
143#[cfg(any(not(target_has_atomic = "32"), not(feature = "prefer_atomic")))]
144pub use cow_mutex::{ICoW, ICoWRead, ICoWCopy, ICoWLock, ICoWWeak, ICoWWeakRead};
145
146#[cfg(all(not(target_has_atomic = "32"), feature = "prefer_atomic"))]
147compile_error!("target CPU does not support 'target_has_atomic'");
148
149pub trait ICowType
150{
151    fn get_lock_type() -> ICoWLockTypes;
152}
153
154#[cfg(test)]
155mod test
156{
157    use std::{sync::{Arc, LazyLock, mpsc}, thread, time::{Duration, Instant}};
158
159    use super::*;
160
161
162
163
164    #[test]
165    fn test_3()
166    {
167        #[derive(Debug, Clone)]
168        struct Test { s: String }
169
170        let icow = ICoW::new(Test{ s: "test".into() });
171
172        for _ in 0..10
173        {
174            let s = Instant::now();
175
176            let read0 = icow.read();
177
178            let e = s.elapsed();
179
180            println!("{:?}", e);
181        }
182
183    }
184
185    #[test]
186    fn test_33()
187    {
188        #[derive(Debug, Clone)]
189        struct Test { s: String };
190
191        let icow = ICoW::new(Test{ s: "test".into() });
192
193        let write_ex = icow.try_clone_copy_exclusivly().unwrap();
194
195        {
196        let write_ex_err = icow.try_clone_copy_exclusivly();
197        assert_eq!(write_ex_err.is_none(), true);
198        }
199
200        let write_ex_err = icow.try_clone_copy();
201        assert_eq!(write_ex_err.is_none(), true);
202
203        let read1 = icow.try_read();
204        assert_eq!(read1.is_none(), true);
205
206        drop(write_ex);
207
208        drop(icow);
209
210    }
211
212    #[test]
213    fn test_4()
214    {
215        #[derive(Debug, Clone)]
216        struct Test { s: u32 }
217
218        let icow = ICoW::new(Test{ s: 1 });
219
220        let read0 = icow.read();
221        let read1 = icow.read();
222
223        let mut excl_write = icow.try_clone_copy_exclusivly().unwrap();
224
225        excl_write.s = 5;
226
227
228        excl_write.commit();
229
230        assert_eq!(read0.s, 1);
231
232        let read3 = icow.read();
233        assert_eq!(read3.s, 5);
234
235        let mut writing = icow.try_clone_copy().unwrap();
236        writing.s = 4;
237
238        writing.commit().unwrap();
239
240        assert_eq!(read0.s, 1);
241        assert_eq!(read3.s, 5);
242
243        let read4 = icow.read();
244        assert_eq!(read4.s, 4);
245    }
246
247    #[test]
248    fn test_parking_lot_reader_wakeup()
249    {
250        #[derive(Debug, Clone)]
251        struct Test { s: u32 }
252
253        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
254
255        let read0 = icow.read();
256        let read2 = icow.read();
257
258        let c_icow = icow.clone();
259        let (se, rc) = mpsc::channel::<()>();
260        let handler0 = 
261            std::thread::spawn(move || 
262                {
263                    let mut lock0 = c_icow.try_clone_copy_exclusivly().unwrap();
264
265                    se.send(()).unwrap();
266
267                    lock0.s = 5;
268
269                    std::thread::sleep(Duration::from_secs(2));
270
271                    lock0.commit();
272                }
273            );
274
275        rc.recv().unwrap();
276
277        let s = Instant::now();
278
279        let read1 = icow.read();
280        
281        let e = s.elapsed();
282
283        println!("thread wait: {:?}", e);
284
285        assert_eq!(e.as_secs(), 2);
286
287        handler0.join().unwrap();
288
289        drop(read0);
290        drop(read1);
291        drop(read2);
292    }
293
294    #[test]
295    fn text_9_exclu_lock_pend()
296    {
297        #[derive(Debug, Clone)]
298        struct Test { s: u32 }
299
300        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
301
302            let mut write0 = icow.clone_copy();
303            write0.s = 1222;
304
305        let c_icow = icow.clone();
306        let (se, rc) = mpsc::channel::<()>();
307        let handler0 = 
308            std::thread::spawn(move || 
309                {
310                    let mut lock0 = 
311                        loop 
312                        {
313                            let res = c_icow.try_clone_copy_exclusivly();
314
315                            if res.is_none() == true
316                            {
317                                continue;
318                            }
319                            
320                            break res.unwrap();
321                        };
322
323                    se.send(()).unwrap();
324
325                    lock0.s = 5;
326
327                    std::thread::sleep(Duration::from_secs(1));
328
329                    lock0.commit();
330                }
331            );
332
333
334        rc.recv().unwrap();
335
336        let s = Instant::now();
337
338        let commit_res = write0.commit();
339        assert_eq!(commit_res.is_err(), true);
340        assert_eq!(commit_res.as_ref().err().unwrap().0, ICoWError::ExclusiveLockPending);
341        write0 = commit_res.err().unwrap().1;
342
343        write0.commit_blocking(true).unwrap();
344        
345        let e = s.elapsed();
346
347        println!("thread wait: {:?}", e);
348
349        assert_eq!(e.as_secs(), 1);
350
351        handler0.join().unwrap();
352
353        return;
354    }
355
356    #[test]
357    fn test_5()
358    {
359        #[derive(Debug, Clone)]
360        struct Test { s: u32 }
361
362        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
363
364        let read0 = icow.read();
365        let read2 = icow.read();
366        
367        let c_icow = icow.clone();
368
369        let (se, rc) = mpsc::channel::<()>();
370        let handler0 = 
371            std::thread::spawn(move || 
372                {
373                    let mut lock0 = c_icow.try_clone_copy_exclusivly().unwrap();
374
375                    se.send(()).unwrap();
376
377                    lock0.s = 5;
378
379                    std::thread::sleep(Duration::from_micros(2));
380
381                    lock0.commit();
382                }
383            );
384
385        rc.recv().unwrap();
386
387        let s = Instant::now();
388
389        let read1 = icow.read();
390        
391        let e = s.elapsed();
392
393        println!("{:?}", e);
394
395        
396        assert_eq!(read1.s, 5);
397        assert_eq!(read0.s, 1);
398
399        handler0.join().unwrap();
400
401        let weak0 = read0.weak();
402        let weak1 = read1.weak();
403
404        drop(read0);
405        drop(read1);
406
407        assert_eq!(weak0.upgrade().is_some(), true);
408        assert_eq!(weak1.upgrade().is_some(), true);
409
410        drop(read2);
411        assert_eq!(weak0.upgrade().is_none(), true);
412        assert_eq!(weak1.upgrade().is_some(), true);
413    }
414
415    #[test]
416    fn test_5_read_lock()
417    {
418        #[derive(Debug, Clone)]
419        struct Test { s: u32 }
420
421        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
422        
423        let c_icow = icow.clone();
424
425        let (se, rc) = mpsc::channel::<()>();
426        let (se2, rc2) = mpsc::channel::<()>();
427        let handler0 = 
428            std::thread::spawn(move || 
429                {
430                    rc2.recv().unwrap();
431
432                    let mut lock0 = 
433                        loop 
434                        {
435                            let res = c_icow.try_clone_copy_exclusivly();
436
437                            if res.is_none() == true
438                            {
439                                continue;
440                            }
441                            
442                            break res.unwrap();
443                        };
444
445                    se.send(()).unwrap();
446
447                    lock0.s = 5;
448
449                    lock0.commit();
450                }
451            );
452
453        let mut reads = Vec::<ICoWRead<'_, Test>>::with_capacity(20000);  
454        let mut sent = false;
455
456        let s = Instant::now();
457
458        for i in 0..10000
459        {
460            let read0 = icow.read();
461            if read0.s == 5
462            {
463                println!("break at {}", i);
464                break;
465            }
466
467            reads.push(read0);
468
469            if sent == false
470            {
471                se2.send(());
472                sent = true;
473            }
474        }
475        
476        let e = s.elapsed();
477
478        println!("{:?}", e);
479
480        assert_eq!(reads[0].s, 1);
481
482        handler0.join().unwrap();
483    }
484
485    #[test]
486    fn test_6()
487    {
488        #[derive(Debug, Clone)]
489        struct Test { s: u32 }
490
491        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
492
493        let read0 = icow.read();
494        let read2 = icow.read();
495        
496        let c_icow = icow.clone();
497
498        let (se, rc) = mpsc::channel::<()>();
499        let handler0 = 
500            std::thread::spawn(move || 
501                {
502                    let read2 = c_icow.read();
503
504                    let mut lock0 = c_icow.try_clone_copy_exclusivly().unwrap();
505
506                    se.send(()).unwrap();
507
508                    lock0.s = 5;
509
510                    std::thread::sleep(Duration::from_nanos(50));
511                    lock0.commit();
512
513                    let read3 = c_icow.read();
514
515                    assert_eq!(read2.s, 1);
516                    assert_eq!(read3.s, 5);
517                }
518            );
519
520        rc.recv().unwrap();
521
522        for _ in 0..100000000
523        {
524            let read1 = icow.read();
525
526            if read1.s == 1
527            {
528                continue;
529            }
530            else
531            {
532                break;
533            }
534        }
535
536        let read1 = icow.read();
537        assert_eq!(read1.item.s, 5);
538
539        handler0.join().unwrap();
540
541        return;
542    }
543
544    #[test]
545    fn test_6_drop()
546    {
547        #[derive(Debug, Clone)]
548        struct Test { s: u32 }
549
550        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
551
552        let read0 = icow.read();
553        let read2 = icow.read();
554        
555        let c_icow = icow.clone();
556
557        let (se, rc) = mpsc::channel::<()>();
558        let handler0 = 
559            std::thread::spawn(move || 
560                {
561                    let read2 = c_icow.read();
562
563                    let mut lock0 = c_icow.try_clone_copy_exclusivly().unwrap();
564
565                    rc.recv().unwrap();
566
567                    lock0.s = 5;
568
569                    std::thread::sleep(Duration::from_millis(1500));
570                    lock0.commit();
571
572                    let read3 = c_icow.read();
573
574                    assert_eq!(read2.s, 1);
575                    assert_eq!(read3.s, 5);
576                }
577            );
578
579        drop(read0);
580        let r2 = read2.clone_copy_into_inner();
581
582        se.send(()).unwrap();
583
584        drop(icow);
585
586        assert_eq!(r2.s, 1);
587
588        handler0.join().unwrap();
589
590        return;
591    }
592
593
594    #[test]
595    fn test_7()
596    {
597        #[derive(Debug, Clone)]
598        struct TestStruct { s: u32 }
599
600        impl TestStruct
601        {
602            fn new(s: u32) -> Self
603            {
604                return Self{ s: s };
605            }
606        }
607
608        static VAL: LazyLock<ICoW<TestStruct>> = 
609            LazyLock::new(|| ICoW::new(TestStruct::new(1))); 
610        
611
612        
613        
614            let borrow1 = VAL.read();
615            assert_eq!(borrow1.item.s, 1);
616
617            let (mpsc_send, mpsc_rcv) = mpsc::channel::<u64>();
618            let (mpsc_send2, mpsc_rcv2) = mpsc::channel::<u64>();
619
620            let thread1 = 
621                std::thread::spawn(move ||
622                    {
623                        for _ in 0..1000
624                        {
625                            let _ = mpsc_rcv2.recv();
626                            let borrow1 = VAL.read();
627
628                            let mut transaction = VAL.try_clone_copy_exclusivly().unwrap();
629
630                            transaction.s = 5;
631
632                            
633
634                            std::thread::sleep(Duration::from_nanos(1001));
635                            transaction.commit();
636                            let borrow2 = VAL.read();
637                            
638
639                            assert_eq!(borrow1.item.s, 1);
640
641                            assert_eq!(borrow2.item.s, 5);
642
643                            let _ = mpsc_send.send(1);
644                        }
645                    }
646                );
647            
648            
649
650            for x in 0..1000
651            {
652                println!("{}", x);
653                mpsc_send2.send(1).unwrap();
654                let _ = mpsc_rcv.recv();
655
656                let borrow1 = VAL.read();
657                assert_eq!(borrow1.s, 5);
658
659                let mut transaction = VAL.try_clone_copy_exclusivly().unwrap();
660                transaction.s = 1;
661                transaction.commit();
662
663                let borrow1 = VAL.read();
664                assert_eq!(borrow1.s, 1);
665                
666            }
667            
668            
669
670            thread1.join().unwrap();
671
672            
673    }
674
675
676    #[test]
677    fn test_8()
678    {
679        #[derive(Debug, Clone)]
680        struct Test { s: u32 }
681
682        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
683
684        for _ in 0..20
685        {
686            let read0 = icow.read();
687            let read2 = icow.read();
688            
689            let c_icow = icow.clone();
690
691            //let (se, rc) = mpsc::channel::<()>();
692            let handler0 = 
693                std::thread::spawn(move || 
694                    {
695                        let read2 = c_icow.read();
696
697                        let mut lock0 = c_icow.as_ref().clone_copy();
698
699                        //se.send(()).unwrap();
700
701                        lock0.s = 5;
702
703                        std::thread::sleep(Duration::from_micros(1));
704                        lock0.commit_blocking(false).unwrap();
705
706                        let read3 = c_icow.read();
707
708                        assert_eq!(read2.s, 1);
709                        assert_eq!(read3.s, 5);
710                    }
711                );
712
713            //rc.recv().unwrap();
714
715            for i in 0..1000000000
716            {
717                let read1 = icow.read();
718
719                if read1.s == 1
720                {
721                    continue;
722                }
723                else
724                {
725                    println!("{}", i);
726                    break;
727                }
728            }
729
730            let read1 = icow.read();
731            assert_eq!(read1.s, 5);
732
733            handler0.join().unwrap();
734
735            let mut lock0 = icow.try_clone_copy().unwrap();
736            lock0.s = 1;
737            lock0.commit_blocking(false).unwrap();
738        }
739
740        return;
741    }
742
743    #[test]
744    fn test_9()
745    {
746        #[derive(Debug, Clone)]
747        struct Test { s: u32 }
748
749        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
750
751        let weak_reader = icow.reader();
752
753        let weak_r0 = weak_reader.aquire().unwrap();
754
755        assert_eq!(weak_r0.s, 1);
756
757        drop(weak_r0);
758
759        let mut copy1 = icow.clone_copy();
760        copy1.s = 8;
761        copy1.commit_blocking(true).unwrap();
762
763        let weak_r0 = weak_reader.aquire().unwrap();
764
765        assert_eq!(weak_r0.s, 8);
766
767        drop(weak_r0);
768
769        // drop icow, check that the aquire() returns None because the data and
770        // icow instance have gone.
771        drop(icow);
772
773        let weak_r0 = weak_reader.aquire();
774        assert_eq!(weak_r0.is_none(), true);
775
776    }
777
778    #[test]
779    fn test_10()
780    {
781        #[derive(Debug, Clone)]
782        struct Test { s: u32 }
783
784        let icow = Arc::new(ICoW::new(Test{ s: 1 }));
785
786        let weak_reader = icow.reader();
787
788        let (tx, rx) = mpsc::channel::<()>();
789        let (tx2, rx2) = mpsc::channel::<()>();
790
791        let thread_hndl = 
792            thread::spawn(move ||
793                {
794                    let weak_r0 = weak_reader.aquire().unwrap();
795
796                    assert_eq!(weak_r0.s, 1);
797
798                    drop(weak_r0);
799
800                    tx.send(()).unwrap();
801
802                    // ----
803                    let _ = rx2.recv().unwrap();
804
805                    let weak_r0 = weak_reader.aquire().unwrap();
806
807                    assert_eq!(weak_r0.item_updated(), true);
808                    assert_eq!(weak_r0.s, 8);
809
810                    drop(weak_r0);
811
812                    tx.send(()).unwrap();
813                    // ----
814                    let _ = rx2.recv().unwrap();
815
816                    // check that icow dropped
817                    let weak_r0 = weak_reader.aquire();
818                    assert_eq!(weak_r0.is_none(), true);
819                }
820            );
821
822        let _ = rx.recv().unwrap();
823
824        let mut copy1 = icow.clone_copy();
825        copy1.s = 8;
826        copy1.commit_blocking(true).unwrap();
827
828        tx2.send(()).unwrap();
829        let _ = rx.recv().unwrap();
830
831        // drop icow, check that the aquire() returns None because the data and
832        // icow instance have gone.
833        drop(icow);
834
835        tx2.send(()).unwrap();
836
837        thread_hndl.join().unwrap();
838
839        return;
840
841    }
842}