1use crate::rt::execution::Execution;
58use crate::rt::location::{self, Location, LocationSet};
59use crate::rt::object;
60use crate::rt::{
61 self, thread, Access, Numeric, Synchronize, VersionVec, MAX_ATOMIC_HISTORY, MAX_THREADS,
62};
63
64use std::cmp;
65use std::marker::PhantomData;
66use std::sync::atomic::Ordering;
67use std::u16;
68
69use tracing::trace;
70
/// A model-checked atomic cell.
///
/// This is loom's stand-in for the `std::sync::atomic` types. All of the
/// cell's state lives in the current execution's object store; this value is
/// only a handle to it.
#[derive(Debug)]
pub(crate) struct Atomic<T> {
    /// Handle to the cell's `State` in the execution's object store.
    state: object::Ref<State>,

    /// Ties the handle to the value type `T` without storing a `T`.
    /// NOTE(review): `fn() -> T` presumably chosen so the handle does not
    /// inherit `T`'s auto-trait requirements — confirm against the rest of
    /// the crate.
    _p: PhantomData<fn() -> T>,
}
76
/// Model-checker state for a single atomic cell, stored in the execution's
/// object store.
#[derive(Debug)]
pub(super) struct State {
    /// Where the cell was created; reported in causality-violation panics.
    created_location: Location,

    /// Causality versions at which atomic loads have happened
    /// (joined in `track_load`).
    loaded_at: VersionVec,

    /// Source locations of atomic loads, tracked per thread.
    loaded_locations: LocationSet,

    /// Causality versions at which `unsync_load`s have happened.
    unsync_loaded_at: VersionVec,

    /// Source locations of `unsync_load` calls, tracked per thread.
    unsync_loaded_locations: LocationSet,

    /// Causality versions at which atomic stores have happened.
    stored_at: VersionVec,

    /// Source locations of atomic stores, tracked per thread.
    stored_locations: LocationSet,

    /// Causality versions at which unsynchronized mutable accesses
    /// (`with_mut`) have happened.
    unsync_mut_at: VersionVec,

    /// Source locations of `with_mut` calls, tracked per thread.
    unsync_mut_locations: LocationSet,

    /// True while a `with_mut` closure is executing; every other access
    /// asserts against this.
    is_mutating: bool,

    /// Most recent access of any kind (load, store, or rmw).
    last_access: Option<Access>,

    /// Most recent store/rmw access. Loads only depend on these — see
    /// `last_dependent_access`.
    last_non_load_access: Option<Access>,

    /// Circular buffer holding the most recent `MAX_ATOMIC_HISTORY` stores.
    stores: [Store; MAX_ATOMIC_HISTORY],

    /// Total number of stores ever performed; `index(cnt)` maps a count into
    /// a `stores` slot.
    cnt: u16,
}
128
/// The kind of atomic operation about to execute, used when creating a
/// branch point in the exploration path (see `Atomic::branch` and
/// `State::last_dependent_access`).
#[derive(Debug, Copy, Clone, PartialEq)]
pub(super) enum Action {
    /// An atomic load.
    Load,

    /// An atomic store.
    Store,

    /// An atomic read-modify-write operation.
    Rmw,
}
140
/// A single recorded store to the cell.
#[derive(Debug)]
struct Store {
    /// The stored value, widened to `u64` (see the `Numeric` conversions).
    value: u64,

    /// Causality of the storing thread at the moment the store happened.
    happens_before: VersionVec,

    /// Version vector used to order this store relative to other stores to
    /// the same cell (built in `State::store`, refined in
    /// `apply_load_coherence`).
    modification_order: VersionVec,

    /// Synchronization state transferred to acquiring loads
    /// (`sync_store` / `sync_load`).
    sync: Synchronize,

    /// Per-thread record of when each thread first observed this store.
    first_seen: FirstSeen,

    /// True if the store was performed with `Ordering::SeqCst`.
    seq_cst: bool,
}
162
/// For each thread, the atomic "version" at which that thread first observed
/// the store; `u16::MAX` is the "not seen yet" sentinel (see `FirstSeen::new`).
#[derive(Debug)]
struct FirstSeen([u16; MAX_THREADS]);
165
166pub(crate) fn fence(ordering: Ordering) {
168 rt::synchronize(|execution| match ordering {
169 Ordering::Acquire => fence_acq(execution),
170 Ordering::Release => fence_rel(execution),
171 Ordering::AcqRel => fence_acqrel(execution),
172 Ordering::SeqCst => fence_seqcst(execution),
173 Ordering::Relaxed => panic!("there is no such thing as a relaxed fence"),
174 order => unimplemented!("unimplemented ordering {:?}", order),
175 });
176}
177
/// Simulates an acquire fence.
///
/// For every store (to any atomic cell) that the current thread has already
/// observed, perform the acquire half of the synchronization — as if the
/// load that observed it had been an acquire load.
fn fence_acq(execution: &mut Execution) {
    for state in execution.objects.iter_mut::<State>() {
        for store in state.stores_mut() {
            // Only stores this thread has actually observed participate.
            if !store.first_seen.is_seen_by_current(&execution.threads) {
                continue;
            }

            store
                .sync
                .sync_load(&mut execution.threads, Ordering::Acquire);
        }
    }
}
194
/// Simulates a release fence.
///
/// Snapshots the active thread's full causality into its `released` version
/// vector, so that subsequent relaxed stores release everything the thread
/// has done so far.
fn fence_rel(execution: &mut Execution) {
    let active = execution.threads.active_mut();
    active.released = active.causality;
}
200
/// Simulates an acquire-release fence: an acquire fence followed by a
/// release fence.
fn fence_acqrel(execution: &mut Execution) {
    fence_acq(execution);
    fence_rel(execution);
}
205
/// Simulates a sequentially-consistent fence: acquire-release behavior plus
/// participation in the global SeqCst order (delegated to the thread set).
fn fence_seqcst(execution: &mut Execution) {
    fence_acqrel(execution);
    execution.threads.seq_cst_fence();
}
210
impl<T: Numeric> Atomic<T> {
    /// Creates a new simulated atomic cell initialized to `value`.
    ///
    /// Registers a fresh `State` with the current execution; `State::new`
    /// records the initial value as a `Release` store.
    pub(crate) fn new(value: T, location: Location) -> Atomic<T> {
        rt::execution(|execution| {
            let state = State::new(&mut execution.threads, value.into_u64(), location);
            let state = execution.objects.insert(state);

            trace!(?state, "Atomic::new");

            Atomic {
                state,
                _p: PhantomData,
            }
        })
    }

    /// Performs an atomic load with the given `ordering`.
    ///
    /// Creates a branch point, then asks the exploration path which of the
    /// candidate stores (computed by `match_load_to_stores`) this load
    /// observes on this execution.
    pub(crate) fn load(&self, location: Location, ordering: Ordering) -> T {
        self.branch(Action::Load, location);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            if execution.path.is_traversed() {
                // Seed the path with the indices of every store this load
                // could legally observe. NOTE(review): the exact
                // `is_traversed` / `push_load` / `branch_load` protocol is
                // defined in `rt::path` — confirm there.
                let mut seed = [0; MAX_ATOMIC_HISTORY];

                let n = state.match_load_to_stores(&execution.threads, &mut seed[..], ordering);

                execution.path.push_load(&seed[..n]);
            }

            // Which store (by slot index) does this load observe?
            let index = execution.path.branch_load();

            trace!(state = ?self.state, ?ordering, "Atomic::load");

            T::from_u64(state.load(&mut execution.threads, index, location, ordering))
        })
    }

    /// Performs an unsynchronized load of the most recent store.
    ///
    /// No synchronization is performed; `track_unsync_load` panics if this
    /// races with an atomic store or a `with_mut` access. `cnt` is always at
    /// least 1 here because `State::new` records the initial value as a store.
    pub(crate) fn unsync_load(&self, location: Location) -> T {
        rt::execution(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            state
                .unsync_loaded_locations
                .track(location, &execution.threads);

            state.track_unsync_load(&execution.threads);

            trace!(state = ?self.state, "Atomic::unsync_load");

            let index = index(state.cnt - 1);
            T::from_u64(state.stores[index].value)
        })
    }

    /// Performs an atomic store with the given `ordering`.
    pub(crate) fn store(&self, location: Location, val: T, ordering: Ordering) {
        self.branch(Action::Store, location);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            state.stored_locations.track(location, &execution.threads);

            state.track_store(&execution.threads);

            trace!(state = ?self.state, ?ordering, "Atomic::store");

            // A plain store starts a fresh release sequence.
            state.store(
                &mut execution.threads,
                Synchronize::new(),
                val.into_u64(),
                ordering,
            );
        })
    }

    /// Performs an atomic read-modify-write.
    ///
    /// `f` receives the current value; `Ok(next)` stores `next` with the
    /// `success` ordering, `Err(_)` only load-synchronizes with the
    /// `failure` ordering. Either way the previous value (or the error) is
    /// returned, matching `compare_exchange`-style semantics.
    pub(crate) fn rmw<F, E>(
        &self,
        location: Location,
        success: Ordering,
        failure: Ordering,
        f: F,
    ) -> Result<T, E>
    where
        F: FnOnce(T) -> Result<T, E>,
    {
        self.branch(Action::Rmw, location);

        super::synchronize(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            if execution.path.is_traversed() {
                // An RMW may only observe stores that are maximal in the
                // modification order (see `match_rmw_to_stores`).
                let mut seed = [0; MAX_ATOMIC_HISTORY];

                let n = state.match_rmw_to_stores(&mut seed[..]);
                execution.path.push_load(&seed[..n]);
            }

            let index = execution.path.branch_load();

            trace!(state = ?self.state, ?success, ?failure, "Atomic::rmw");

            state
                .rmw(
                    &mut execution.threads,
                    index,
                    location,
                    success,
                    failure,
                    |num| f(T::from_u64(num)).map(T::into_u64),
                )
                .map(T::from_u64)
        })
    }

    /// Gives the closure mutable access to the cell's value, as
    /// `AtomicT::get_mut` would.
    ///
    /// The value is read out under `is_mutating = true`; a `Reset` drop guard
    /// writes the (possibly modified) value back and clears the flag even if
    /// `f` panics, so the cell is never left locked.
    pub(crate) fn with_mut<R>(&mut self, location: Location, f: impl FnOnce(&mut T) -> R) -> R {
        let value = super::execution(|execution| {
            let state = self.state.get_mut(&mut execution.objects);

            state
                .unsync_mut_locations
                .track(location, &execution.threads);
            state.track_unsync_mut(&execution.threads);
            state.is_mutating = true;

            trace!(state = ?self.state, "Atomic::with_mut");

            let index = index(state.cnt - 1);
            T::from_u64(state.stores[index].value)
        });

        // Drop guard: writes the value back into the latest store slot and
        // clears `is_mutating`, running even when `f` unwinds.
        struct Reset<T: Numeric>(T, object::Ref<State>);

        impl<T: Numeric> Drop for Reset<T> {
            fn drop(&mut self) {
                super::execution(|execution| {
                    let state = self.1.get_mut(&mut execution.objects);

                    assert!(state.is_mutating);
                    state.is_mutating = false;

                    let index = index(state.cnt - 1);
                    state.stores[index].value = T::into_u64(self.0);

                    // Skip the causality check while panicking so we don't
                    // turn an unwind into a double panic / abort.
                    if !std::thread::panicking() {
                        state.track_unsync_mut(&execution.threads);
                    }
                });
            }
        }

        let mut reset = Reset(value, self.state);
        f(&mut reset.0)
    }

    /// Creates a scheduler branch point for `action`, verifying that the
    /// object reference is unchanged afterwards (a changed reference means
    /// the algorithm under test scribbled over loom's internal state).
    fn branch(&self, action: Action, location: Location) {
        let r = self.state;
        r.branch_action(action, location);
        assert!(
            r.ref_eq(self.state),
            "Internal state mutated during branch. This is \
             usually due to a bug in the algorithm being tested writing in \
             an invalid memory location."
        );
    }
}
398
impl State {
    /// Creates cell state holding `value`.
    ///
    /// The creation itself counts as an unsynchronized mutable access, and
    /// the initial value is recorded as a `Release` store so later acquire
    /// loads synchronize with construction.
    fn new(threads: &mut thread::Set, value: u64, location: Location) -> State {
        let mut state = State {
            created_location: location,
            loaded_at: VersionVec::new(),
            loaded_locations: LocationSet::new(),
            unsync_loaded_at: VersionVec::new(),
            unsync_loaded_locations: LocationSet::new(),
            stored_at: VersionVec::new(),
            stored_locations: LocationSet::new(),
            unsync_mut_at: VersionVec::new(),
            unsync_mut_locations: LocationSet::new(),
            is_mutating: false,
            last_access: None,
            last_non_load_access: None,
            stores: Default::default(),
            cnt: 0,
        };

        state.track_unsync_mut(threads);

        state.store(threads, Synchronize::new(), value, Ordering::Release);

        state
    }

    /// Observes the store in slot `index` with the given load `ordering` and
    /// returns its value.
    fn load(
        &mut self,
        threads: &mut thread::Set,
        index: usize,
        location: Location,
        ordering: Ordering,
    ) -> u64 {
        self.loaded_locations.track(location, threads);
        self.track_load(threads);

        // Fold coherence-implied ordering into the chosen store first.
        self.apply_load_coherence(threads, index);

        let store = &mut self.stores[index];

        store.first_seen.touch(threads);
        store.sync.sync_load(threads, ordering);
        store.value
    }

    /// Records a new store into the circular history buffer.
    ///
    /// The new store's modification order is the storing thread's causality
    /// joined with the modification order of every store the thread has
    /// already observed, placing it after all of them.
    fn store(
        &mut self,
        threads: &mut thread::Set,
        mut sync: Synchronize,
        value: u64,
        ordering: Ordering,
    ) {
        // Slot is claimed before bumping the count; the oldest store in the
        // buffer is overwritten once the buffer is full.
        let index = index(self.cnt);

        self.cnt += 1;

        let happens_before = threads.active().causality;

        let mut modification_order = happens_before;

        for i in 0..self.stores.len() {
            if self.stores[i].first_seen.is_seen_by_current(threads) {
                let mo = self.stores[i].modification_order;
                modification_order.join(&mo);
            }
        }

        sync.sync_store(threads, ordering);

        // The storing thread has, by definition, seen its own store.
        let mut first_seen = FirstSeen::new();
        first_seen.touch(threads);

        self.stores[index] = Store {
            value,
            happens_before,
            modification_order,
            sync,
            first_seen,
            seq_cst: is_seq_cst(ordering),
        };
    }

    /// Performs a read-modify-write against the store in slot `index`.
    ///
    /// On `Ok(next)`: synchronizes the read with the `success` ordering,
    /// then records `next` as a new store that continues the observed
    /// store's release sequence (its `sync` is carried over). Returns the
    /// previous value.
    /// On `Err(_)`: only the read happens, with the `failure` ordering.
    fn rmw<E>(
        &mut self,
        threads: &mut thread::Set,
        index: usize,
        location: Location,
        success: Ordering,
        failure: Ordering,
        f: impl FnOnce(u64) -> Result<u64, E>,
    ) -> Result<u64, E> {
        self.loaded_locations.track(location, threads);

        self.track_load(threads);

        self.apply_load_coherence(threads, index);

        self.stores[index].first_seen.touch(threads);

        let prev = self.stores[index].value;

        match f(prev) {
            Ok(next) => {
                self.stored_locations.track(location, threads);
                self.track_store(threads);

                self.stores[index].sync.sync_load(threads, success);

                // Carry the release sequence forward into the new store.
                let sync = self.stores[index].sync;
                self.store(threads, sync, next, success);

                Ok(prev)
            }
            Err(e) => {
                self.stores[index].sync.sync_load(threads, failure);
                Err(e)
            }
        }
    }

    /// Orders the store being read (slot `index`) after every other store
    /// that is already "in the past" for the current thread: stores the
    /// thread has observed, and stores that happen-before the thread's
    /// current causality.
    fn apply_load_coherence(&mut self, threads: &mut thread::Set, index: usize) {
        for i in 0..self.stores.len() {
            if index == i {
                continue;
            }

            if self.stores[i].first_seen.is_seen_by_current(threads) {
                let mo = self.stores[i].modification_order;
                self.stores[index].modification_order.join(&mo);
            }

            if self.stores[i].happens_before < threads.active().causality {
                let mo = self.stores[i].modification_order;
                self.stores[index].modification_order.join(&mo);
            }
        }
    }

    /// Records an atomic load and panics if it races with a `with_mut`
    /// access (i.e. some `with_mut` is not ordered before this load).
    fn track_load(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        if let Some(mut_at) = current.ahead(&self.unsync_mut_at) {
            location::panic("Causality violation: Concurrent load and mut accesses.")
                .location("created", self.created_location)
                .thread("with_mut", mut_at, self.unsync_mut_locations[mut_at])
                .thread("load", threads.active_id(), self.loaded_locations[threads])
                .fire();
        }

        self.loaded_at.join(current);
    }

    /// Records an `unsync_load` and panics if it races with a `with_mut`
    /// access or an atomic store.
    fn track_unsync_load(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        if let Some(mut_at) = current.ahead(&self.unsync_mut_at) {
            location::panic("Causality violation: Concurrent `unsync_load` and mut accesses.")
                .location("created", self.created_location)
                .thread("with_mut", mut_at, self.unsync_mut_locations[mut_at])
                .thread(
                    "unsync_load",
                    threads.active_id(),
                    self.unsync_loaded_locations[threads],
                )
                .fire();
        }

        if let Some(stored) = current.ahead(&self.stored_at) {
            location::panic("Causality violation: Concurrent `unsync_load` and atomic store.")
                .location("created", self.created_location)
                .thread("atomic store", stored, self.stored_locations[stored])
                .thread(
                    "unsync_load",
                    threads.active_id(),
                    self.unsync_loaded_locations[threads],
                )
                .fire();
        }

        self.unsync_loaded_at.join(current);
    }

    /// Records an atomic store and panics if it races with a `with_mut`
    /// access or an `unsync_load`.
    fn track_store(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        if let Some(mut_at) = current.ahead(&self.unsync_mut_at) {
            location::panic("Causality violation: Concurrent atomic store and mut accesses.")
                .location("created", self.created_location)
                .thread("with_mut", mut_at, self.unsync_mut_locations[mut_at])
                .thread(
                    "atomic store",
                    threads.active_id(),
                    self.stored_locations[threads],
                )
                .fire();
        }

        if let Some(loaded) = current.ahead(&self.unsync_loaded_at) {
            location::panic(
                "Causality violation: Concurrent atomic store and `unsync_load` accesses.",
            )
            .location("created", self.created_location)
            .thread("unsync_load", loaded, self.unsync_loaded_locations[loaded])
            .thread(
                "atomic store",
                threads.active_id(),
                self.stored_locations[threads],
            )
            .fire();
        }

        self.stored_at.join(current);
    }

    /// Records a `with_mut` access and panics if it races with any other
    /// kind of access (atomic load, `unsync_load`, atomic store, or another
    /// `with_mut`): a mutable access must be ordered after everything.
    fn track_unsync_mut(&mut self, threads: &thread::Set) {
        assert!(!self.is_mutating, "atomic cell is in `with_mut` call");

        let current = &threads.active().causality;

        if let Some(loaded) = current.ahead(&self.loaded_at) {
            location::panic("Causality violation: Concurrent atomic load and unsync mut accesses.")
                .location("created", self.created_location)
                .thread("atomic load", loaded, self.loaded_locations[loaded])
                .thread(
                    "with_mut",
                    threads.active_id(),
                    self.unsync_mut_locations[threads],
                )
                .fire();
        }

        if let Some(loaded) = current.ahead(&self.unsync_loaded_at) {
            location::panic(
                "Causality violation: Concurrent `unsync_load` and unsync mut accesses.",
            )
            .location("created", self.created_location)
            .thread("unsync_load", loaded, self.unsync_loaded_locations[loaded])
            .thread(
                "with_mut",
                threads.active_id(),
                self.unsync_mut_locations[threads],
            )
            .fire();
        }

        if let Some(stored) = current.ahead(&self.stored_at) {
            location::panic(
                "Causality violation: Concurrent atomic store and unsync mut accesses.",
            )
            .location("created", self.created_location)
            .thread("atomic store", stored, self.stored_locations[stored])
            .thread(
                "with_mut",
                threads.active_id(),
                self.unsync_mut_locations[threads],
            )
            .fire();
        }

        if let Some(mut_at) = current.ahead(&self.unsync_mut_at) {
            location::panic("Causality violation: Concurrent unsync mut accesses.")
                .location("created", self.created_location)
                .thread("with_mut one", mut_at, self.unsync_mut_locations[mut_at])
                .thread(
                    "with_mut two",
                    threads.active_id(),
                    self.unsync_mut_locations[threads],
                )
                .fire();
        }

        self.unsync_mut_at.join(current);
    }

    /// Writes into `dst` the slot indices of every store a load with
    /// `ordering` could observe, returning how many were written.
    ///
    /// A store `i` is excluded when some store `j` is later in the
    /// modification order (`mo_i < mo_j`) and any of the following holds:
    /// - the current thread has already seen `j` (reads may not go backwards
    ///   in modification order);
    /// - the thread saw `i` before its last yield (after yielding, a load
    ///   must be able to make progress past previously-seen values —
    ///   NOTE(review): exact yield semantics live in `thread::Set`);
    /// - the load and both stores are SeqCst (a SeqCst load may not observe
    ///   a SeqCst store that precedes another SeqCst store).
    fn match_load_to_stores(
        &self,
        threads: &thread::Set,
        dst: &mut [u8],
        ordering: Ordering,
    ) -> usize {
        let mut n = 0;
        let cnt = self.cnt as usize;

        'outer: for i in 0..self.stores.len() {
            let store_i = &self.stores[i];

            // Skip slots that have never been written.
            if i >= cnt {
                continue;
            }

            for j in 0..self.stores.len() {
                let store_j = &self.stores[j];

                if i == j || j >= cnt {
                    continue;
                }

                let mo_i = store_i.modification_order;
                let mo_j = store_j.modification_order;

                // Distinct stores must be distinguishable in the
                // modification order.
                assert_ne!(mo_i, mo_j);

                if mo_i < mo_j {
                    if store_j.first_seen.is_seen_by_current(threads) {
                        continue 'outer;
                    }

                    if store_i.first_seen.is_seen_before_yield(threads) {
                        continue 'outer;
                    }

                    if is_seq_cst(ordering) && store_i.seq_cst && store_j.seq_cst {
                        continue 'outer;
                    }
                }
            }

            dst[n] = i as u8;
            n += 1;
        }

        n
    }

    /// Writes into `dst` the slot indices an RMW could act on, returning the
    /// count. An RMW must read the latest value, so any store with a
    /// modification-order successor is excluded.
    fn match_rmw_to_stores(&self, dst: &mut [u8]) -> usize {
        let mut n = 0;
        let cnt = self.cnt as usize;

        'outer: for i in 0..self.stores.len() {
            let store_i = &self.stores[i];

            // Skip slots that have never been written.
            if i >= cnt {
                continue;
            }

            for j in 0..self.stores.len() {
                let store_j = &self.stores[j];

                if i == j || j >= cnt {
                    continue;
                }

                let mo_i = store_i.modification_order;
                let mo_j = store_j.modification_order;

                assert_ne!(mo_i, mo_j);

                if mo_i < mo_j {
                    continue 'outer;
                }
            }

            dst[n] = i as u8;
            n += 1;
        }

        n
    }

    /// Iterates the populated store slots from oldest to newest, following
    /// the circular layout computed by `range`.
    fn stores_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut Store> {
        let (start, end) = range(self.cnt);
        // `one` is the older run [start, end); `two` wraps around from the
        // buffer's beginning.
        let (two, one) = self.stores[..end].split_at_mut(start);

        one.iter_mut().chain(two.iter_mut())
    }

    /// Returns the most recent access this `action` depends on. Loads
    /// commute with each other, so a load only depends on the last non-load
    /// access; stores and RMWs depend on the last access of any kind.
    pub(super) fn last_dependent_access(&self, action: Action) -> Option<&Access> {
        match action {
            Action::Load => self.last_non_load_access.as_ref(),
            _ => self.last_access.as_ref(),
        }
    }

    /// Records `action` as the most recent access (and, for stores/RMWs, as
    /// the most recent non-load access).
    pub(super) fn set_last_access(&mut self, action: Action, path_id: usize, version: &VersionVec) {
        Access::set_or_create(&mut self.last_access, path_id, version);

        match action {
            Action::Load => {}
            _ => {
                Access::set_or_create(&mut self.last_non_load_access, path_id, version);
            }
        }
    }
}
850
851impl Default for Store {
854 fn default() -> Store {
855 Store {
856 value: 0,
857 happens_before: VersionVec::new(),
858 modification_order: VersionVec::new(),
859 sync: Synchronize::new(),
860 first_seen: FirstSeen::new(),
861 seq_cst: false,
862 }
863 }
864}
865
866impl FirstSeen {
869 fn new() -> FirstSeen {
870 FirstSeen([u16::max_value(); MAX_THREADS])
871 }
872
873 fn touch(&mut self, threads: &thread::Set) {
874 if self.0[threads.active_id().as_usize()] == u16::max_value() {
875 self.0[threads.active_id().as_usize()] = threads.active_atomic_version();
876 }
877 }
878
879 fn is_seen_by_current(&self, threads: &thread::Set) -> bool {
880 for (thread_id, version) in threads.active().causality.versions(threads.execution_id()) {
881 match self.0[thread_id.as_usize()] {
882 u16::MAX => {}
883 v if v <= version => return true,
884 _ => {}
885 }
886 }
887
888 false
889 }
890
891 fn is_seen_before_yield(&self, threads: &thread::Set) -> bool {
892 let thread_id = threads.active_id();
893
894 let last_yield = match threads.active().last_yield {
895 Some(v) => v,
896 None => return false,
897 };
898
899 match self.0[thread_id.as_usize()] {
900 u16::MAX => false,
901 v => v <= last_yield,
902 }
903 }
904}
905
/// True exactly when `order` is `Ordering::SeqCst`.
fn is_seq_cst(order: Ordering) -> bool {
    matches!(order, Ordering::SeqCst)
}
909
910fn range(cnt: u16) -> (usize, usize) {
911 let start = index(cnt.saturating_sub(MAX_ATOMIC_HISTORY as u16));
912 let mut end = index(cmp::min(cnt, MAX_ATOMIC_HISTORY as u16));
913
914 if end == 0 {
915 end = MAX_ATOMIC_HISTORY;
916 }
917
918 assert!(
919 start <= end,
920 "[loom internal bug] cnt = {}; start = {}; end = {}",
921 cnt,
922 start,
923 end
924 );
925
926 (start, end)
927}
928
929fn index(cnt: u16) -> usize {
930 cnt as usize % MAX_ATOMIC_HISTORY
931}