region_cached 1.0.16

Adds a logical layer of caching between processor L3 cache and main memory
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
use std::any::type_name;
use std::fmt;
use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, OnceLock};

use arc_swap::{ArcSwap, ArcSwapOption};
use many_cpus::{MemoryRegionId, SystemHardware};
use rsevents::{Awaitable, EventState, ManualResetEvent};

/// Provides access to an instance of `T` that is locally cached in the current memory region.
///
/// Refer to [crate-level documentation][crate] for more information.
#[derive(Debug)]
#[linked::object]
pub struct RegionCached<T>
where
    T: Clone + Send + Sync + 'static,
{
    // If the current thread is pinned to a memory region, we just reference the regional state.
    // Otherwise, we have to look up the regional state from the global state on every access
    // because we do not know what region we are in (it might change for every call).
    // If this is `None`, we are in a mode where we need to perform the lookup every time.
    regional_state: Option<Arc<RegionalState<T>>>,

    // Holds the latest published value, the generation counter and the per-region cache slots.
    // Every instance created from the same `linked::new!` expression clones this same `Arc`.
    global_state: Arc<GlobalState<T>>,

    // Answers the hardware questions we ask: how many memory regions exist, which region the
    // current thread is in and whether the thread is pinned to it. May be a fake in tests.
    hardware: SystemHardware,
}

impl<T> RegionCached<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates a new `RegionCached` that starts out holding `initial_value`, using the
    /// hardware topology of the current system.
    ///
    /// The returned instance may be cloned and shared between threads using mechanisms
    /// of the [linked object pattern][3]. All instances belonging to the same family of
    /// objects reference the same region-cached value.
    ///
    /// The [`region_cached!`][1] macro uses this type internally, but the type can also be
    /// used on its own, typically via a [`PerThread`][2] wrapper that automatically manages
    /// the per-thread instance lifecycle and delivery across threads.
    ///
    /// [1]: crate::region_cached
    /// [2]: linked::InstancePerThread
    /// [3]: linked
    #[must_use]
    pub fn new(initial_value: T) -> Self {
        // Production callers always get the real hardware; tests go via `with_hardware()`.
        let hardware = SystemHardware::current().clone();

        Self::with_hardware(initial_value, hardware)
    }

    /// Creates a new `RegionCached` that consults the given [`SystemHardware`] for hardware
    /// topology instead of the real system. Pass a fake hardware configuration (created via
    /// `SystemHardware::fake()`) to exercise specific topologies in tests.
    #[must_use]
    pub fn with_hardware(initial_value: T, hardware: SystemHardware) -> Self {
        // One cache slot is reserved for every memory region the hardware may report.
        let global_state = Arc::new(GlobalState::new(
            initial_value,
            hardware.max_memory_region_count(),
        ));

        // NOTE(review): the expression inside `linked::new!` is presumably re-evaluated for
        // every instance in the family, which is why it clones instead of moving - confirm
        // against the `linked` crate documentation before simplifying.
        linked::new!(Self {
            regional_state: Self::try_locate_regional_state(&global_state, &hardware),
            global_state: Arc::clone(&global_state),
            hardware: hardware.clone(),
        })
    }

    /// Resolves the regional state up front when the current thread is pinned to a memory
    /// region, so later accesses can skip the per-access region lookup.
    ///
    /// Returns `None` when the thread is not pinned, in which case the region has to be
    /// determined again on every access.
    fn try_locate_regional_state(
        global_state: &Arc<GlobalState<T>>,
        hardware: &SystemHardware,
    ) -> Option<Arc<RegionalState<T>>> {
        hardware.is_thread_memory_region_pinned().then(|| {
            // A pinned thread stays in its region, so the state we find now remains the
            // right one for the lifetime of this instance.
            let pinned_region_id = hardware.current_memory_region_id();

            global_state.with_regional_state(pinned_region_id, Arc::clone)
        })
    }

    /// Reports whether the regional state was resolved when this instance was created.
    /// If so, accesses take the fast path and never look up the current memory region.
    #[cfg(test)]
    fn has_regional_state(&self) -> bool {
        let pinned_state = self.regional_state.as_ref();

        pinned_state.is_some()
    }

    /// Calls `f` with a reference to the value cached in the current memory region and
    /// returns whatever `f` returns.
    ///
    /// # Example
    ///
    /// ```
    /// use linked::InstancePerThread;
    /// use region_cached::RegionCached;
    ///
    /// let favorite_color_global = InstancePerThread::new(RegionCached::new("blue".to_string()));
    ///
    /// // This localizes the object to the current thread. Reuse this object when possible.
    /// let favorite_color = favorite_color_global.acquire();
    ///
    /// let len = favorite_color.with_cached(|color| color.len());
    /// assert_eq!(len, 4);
    /// ```
    pub fn with_cached<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        match self.regional_state.as_deref() {
            // Fast path: the regional state was already resolved when this instance was created.
            Some(pinned_state) => self.with_in_region(pinned_state, f),

            // Slow path: identify our memory region and look up its state in the global
            // state on every call. Pin your threads to avoid paying for this.
            None => {
                // The region ID is sampled once. If the thread migrates to another memory
                // region while we are still executing, we knowingly keep using this one.
                let memory_region_id = self.hardware.current_memory_region_id();

                self.global_state
                    .with_regional_state(memory_region_id, |regional_state| {
                        self.with_in_region(regional_state, f)
                    })
            }
        }
    }

    /// Executes `f` on the value cached in `regional_state`, first initializing that regional
    /// state from the latest globally published value if it currently holds no ready value.
    ///
    /// Loops until a read succeeds. The callback is only consumed by the successful read; a
    /// failed read hands it back so it can be retried after initialization.
    #[cfg_attr(test, mutants::skip)] // Mutation easily makes this into infinite loop.
    fn with_in_region<F, R>(&self, regional_state: &RegionalState<T>, mut f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        loop {
            // If the read fails, we get our `f` callback returned back to us.
            match regional_state.try_with_value(f) {
                Ok(result) => return result,
                Err(callback) => f = callback,
            }

            loop {
                // If we got here then the regional state is not initialized. Let us initialize it.
                // We now need a value to initialize the region state with. The latest written value
                // (for our weakly consistent definition of "latest") is stored in the global state.
                // Note that other threads in the region may also be racing to initialize. While
                // there is mutual exclusion built in, it remains up to us here to detect ordering
                // issues and reinitialize if an outdated value was set.
                let initial_value = self.global_state.latest_value.load();

                let expected_generation = initial_value.generation;
                let actual_generation = regional_state.initialize(&initial_value);

                // The commit will fail if the generation of the value we set does not match
                // the generation of the value that was initialized. We do not know which one
                // is the correct one, so we just retry until we get a match.
                if expected_generation == actual_generation {
                    // We are done - the universe did not change during initialization.
                    break;
                }

                // Retry initialization. It could be that our expected value was wrong, in which
                // case we perform some wasted cloning but avoid violating causality.
                //
                // Only this region's state is in question, so only this region is reset. Other
                // regions keep whatever they have cached and are not forced to clone again.
                regional_state.clear();
            }
        }
    }

    /// Publishes a new value to all threads in all memory regions.
    ///
    /// The new value reaches each memory region in a [weakly consistent manner][1].
    ///
    /// # Example
    ///
    /// ```
    /// use linked::InstancePerThread;
    /// use region_cached::RegionCached;
    ///
    /// let favorite_color_global = InstancePerThread::new(RegionCached::new("blue".to_string()));
    ///
    /// // This localizes the object to the current thread. Reuse this object when possible.
    /// let favorite_color = favorite_color_global.acquire();
    ///
    /// favorite_color.set_global("red".to_string());
    /// ```
    ///
    /// Because updates are [weakly consistent][1], do not expect a new value to be visible
    /// immediately. Even on the thread that published it, immediate visibility is only
    /// guaranteed if that thread is pinned to a specific memory region.
    ///
    /// ```
    /// use std::num::NonZero;
    ///
    /// use linked::InstancePerThread;
    /// use many_cpus::SystemHardware;
    /// use region_cached::RegionCached;
    ///
    /// let favorite_color_global = InstancePerThread::new(RegionCached::new("blue".to_string()));
    ///
    /// // We can use this to pin a thread to a specific processor, to demonstrate a
    /// // situation where you can rely on consistency guarantees for immediate visibility.
    /// let one_processor = SystemHardware::current()
    ///     .processors()
    ///     .to_builder()
    ///     .take(NonZero::new(1).unwrap())
    ///     .unwrap();
    ///
    /// one_processor
    ///     .spawn_thread(move |processor_set| {
    ///         let processor = processor_set.processors().first();
    ///         println!(
    ///             "Thread pinned to processor {} in memory region {}",
    ///             processor.id(),
    ///             processor.memory_region_id()
    ///         );
    ///
    ///         // This localizes the object to the current thread. Reuse this object when possible.
    ///         let favorite_color = favorite_color_global.acquire();
    ///
    ///         favorite_color.set_global("red".to_string());
    ///
    ///         // This thread is pinned to a specific processor, so it is guaranteed to stay
    ///         // within the same memory region (== on the same physical hardware). This means
    ///         // that an update to a region-cached value is immediately visible.
    ///         let color = favorite_color.with_cached(|color| color.clone());
    ///         assert_eq!(color, "red");
    ///     })
    ///     .join()
    ///     .unwrap();
    /// ```
    ///
    /// [1]: crate#consistency-guarantees
    pub fn set_global(&self, value: T) {
        let global = &self.global_state;

        // A generation only needs to be unique; its numeric value carries no meaning.
        let generation = global
            .next_generation
            .fetch_add(1, atomic::Ordering::Relaxed);

        // Publish to the global state before touching any region, so that every regional
        // state initialized from here on picks up the new value.
        let published = Arc::new(GenerationValue { generation, value });
        global.latest_value.store(published);

        // Drop whatever each region has cached. A region re-initializes itself
        // automatically the next time it is accessed.
        global.invalidate_regions();
    }
}

impl<T> RegionCached<T>
where
    T: Clone + Copy + Send + Sync + 'static,
{
    /// Returns a copy of the value cached in the current memory region.
    ///
    /// # Example
    ///
    /// ```
    /// use linked::InstancePerThread;
    /// use region_cached::RegionCached;
    ///
    /// let current_access_token_global = InstancePerThread::new(RegionCached::new(0x123100));
    ///
    /// // This localizes the object to the current thread. Reuse this object when possible.
    /// let current_access_token = current_access_token_global.acquire();
    ///
    /// let token = current_access_token.get_cached();
    /// assert_eq!(token, 0x123100);
    /// ```
    #[must_use]
    #[inline]
    pub fn get_cached(&self) -> T {
        // `T: Copy`, so destructuring the reference copies the value out of the cache.
        self.with_cached(|&cached| cached)
    }
}

/// A value tagged with the generation number it was published under.
///
/// Generations are compared for equality to detect whether a cached value is the one that
/// was expected to be cached.
#[derive(Clone)]
struct GenerationValue<T> {
    generation: u64,
    value: T,
}

#[cfg_attr(coverage_nightly, coverage(off))] // No API contract to test.
impl<T> fmt::Debug for GenerationValue<T> {
    /// Prints the generation, and only the type name of the value (`T` need not be `Debug`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct(type_name::<Self>());

        builder.field("generation", &self.generation);
        builder.field("value", &format_args!("<{}>", type_name::<T>()));

        builder.finish()
    }
}

/// State shared between all memory regions: the latest published value, the generation
/// counter used to tag published values, and one lazily created cache slot per memory region.
struct GlobalState<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// The latest value written into the region-cached variable from any thread. This is only used
    /// to create regional clones for local caching and is never read directly in any hot path.
    latest_value: ArcSwap<GenerationValue<T>>,

    /// The generation to assign to the next value written into the global state.
    /// This value is used to identify outdated caches.
    next_generation: AtomicU64,

    // We cannot avoid the array itself being cross-region accessed but the RegionalState items
    // inside are at least initialized lazily and on the correct region, so we can ensure that
    // they are allocated in that memory region (assuming the allocator cooperates).
    //
    // Accessing this can be skipped for threads that are pinned in one specific memory region,
    // as they then have direct access to the `Arc<RegionalState>`, which is the fastest path.
    regional_states: Box<[OnceLock<Arc<RegionalState<T>>>]>,
}

#[cfg_attr(coverage_nightly, coverage(off))] // No API contract to test.
impl<T> fmt::Debug for GlobalState<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Prints a summary only: the type of the latest value, the next generation and the
    /// number of regional state slots. `T` need not be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let region_count = self.regional_states.len();

        let mut builder = f.debug_struct(type_name::<Self>());

        builder.field(
            "latest_value",
            &format_args!("<GenerationValue<{}>>", type_name::<T>()),
        );
        builder.field("next_generation", &self.next_generation);
        builder.field(
            "regional_states",
            &format_args!("<{} regional states>", region_count),
        );

        builder.finish()
    }
}

impl<T> GlobalState<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates the global state with `initial_value` published as generation 0 and one
    /// empty regional state slot for each of the `memory_region_count` memory regions.
    fn new(initial_value: T, memory_region_count: usize) -> Self {
        // The slots start out empty; each one is filled on first access to its region.
        let regional_states: Box<[_]> = (0..memory_region_count)
            .map(|_| OnceLock::new())
            .collect();

        Self {
            latest_value: ArcSwap::from_pointee(GenerationValue {
                generation: 0,
                value: initial_value,
            }),
            // Generation 0 is taken by the initial value above.
            next_generation: AtomicU64::new(1),
            regional_states,
        }
    }

    /// Calls `f` with the regional state of the given memory region, creating that regional
    /// state first if this is the first access to the region.
    ///
    /// # Panics
    ///
    /// Panics if `memory_region_id` is beyond the number of regions this state was created for.
    fn with_regional_state<F, R>(&self, memory_region_id: MemoryRegionId, f: F) -> R
    where
        F: FnOnce(&Arc<RegionalState<T>>) -> R,
    {
        let regional_state = self
            .regional_states
            .get(memory_region_id as usize)
            .expect(
                "memory region ID was out of bounds - the platform lied about how many there are",
            )
            // The slot is a OnceLock precisely so that this Arc::new() runs while the
            // current thread is executing in the memory region the slot belongs to, which
            // places the regional state of every region in that specific region.
            .get_or_init(|| Arc::new(RegionalState::new()));

        f(regional_state)
    }

    fn invalidate_regions(&self) {
        for slot in &self.regional_states {
            // If it is already `None`, it will already get initialized on the next access.
            // It might already be in the process of being initialized by another thread, which
            // is fine - once initialized, it will by default be in the invalidated state.
            if let Some(state) = slot.get() {
                state.clear();
            }
        }
    }
}

/// The cache slot of a single memory region.
#[derive(Debug)]
struct RegionalState<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// The value shared (via `Arc`) between all threads in the same memory region.
    ///
    /// This is `None` if the value for this region has not been initialized yet
    /// or has been reset by `clear()`.
    /// It will be initialized on first access from this region.
    ///
    /// We use `ArcSwap` here because it offers very good multithreaded read performance.
    /// In single-threaded and write-heavy scenarios, `RwLock` is faster but those
    /// are not the scenarios we target - we expect the data to be cached for long
    /// periods and read from many threads, with writes happening not so often.
    value: ArcSwapOption<RegionalValue<T>>,
}

impl<T> RegionalState<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates a regional state that does not hold a value yet.
    fn new() -> Self {
        let value = ArcSwapOption::const_empty();

        Self { value }
    }

    /// Tries to run `f` on the value held by this regional state.
    ///
    /// If no ready value is present (nothing stored, or initialization still in progress),
    /// `f` is not called and is handed back via `Err`. In that case, call `initialize()`
    /// and try again.
    fn try_with_value<F, R>(&self, f: F) -> Result<R, F>
    where
        F: FnOnce(&T) -> R,
    {
        // The snapshot must stay alive while `f` runs because `f` borrows from it.
        let snapshot = self.value.load();

        match snapshot.as_deref() {
            Some(RegionalValue::Ready(ready)) => Ok(f(&ready.value)),
            Some(RegionalValue::Initializing(_)) | None => Err(f),
        }
    }

    /// Initializes the value in this regional state (potentially accepting a value from another
    /// thread already doing the same).
    ///
    /// Returns the generation of the value that was set. This is not necessarily the same as the
    /// input value, if we accept initialization from another thread. It is the responsibility of
    /// the caller to decide whether that is acceptable or not (in which case it can reset).
    // Skip mutating - would lead to infinite loop as it looks just like another thread
    // constantly resetting the value, so the conflict resolver will never finish.
    #[cfg_attr(test, mutants::skip)]
    fn initialize(&self, value: &GenerationValue<T>) -> u64 {
        // This is a conditional swap - we only initialize if we can swap in our "initializing"
        // value onto a clean slate. If someone else got there first, we line up behind them
        // and wait for them to finish before we do anything.

        loop {
            let reader = self.value.load();

            // Named `current` so that it does not shadow the `value` parameter.
            if let Some(ref current) = *reader {
                // Something is already happening.

                match &**current {
                    RegionalValue::Initializing(manual_reset_event) => {
                        manual_reset_event.wait();
                        // Initialization by someone else has completed.
                        // Loop back and try to read again to see what we got.
                        continue;
                    }
                    RegionalValue::Ready(GenerationValue { generation, .. }) => {
                        return *generation;
                    }
                }
            }

            // Nothing is happening. We may be the first to start initializing.
            let attempt_signal = Arc::new(ManualResetEvent::new(EventState::Unset));
            let attempt = RegionalValue::<T>::Initializing(Arc::clone(&attempt_signal));

            let previous_value = self.value.compare_and_swap(reader, Some(Arc::new(attempt)));

            // The swap only took effect if the slot was still empty. Seeing anything else
            // here means another thread changed the slot between our load and our swap.
            if previous_value.is_some() {
                // Someone raced ahead of us. Re-enter loop.
                continue;
            }

            // We must ensure that if cloning panics, we reset the state
            // and signal any waiting threads to prevent them from waiting forever.
            let cleanup_signal = Arc::clone(&attempt_signal);
            let cleanup_self = self; // Create a reference for the cleanup
            let cleanup_guard = scopeguard::guard((), move |()| {
                // If we are still in panic mode when this guard executes, reset the
                // initializing state to None and signal waiters so they can retry.
                cleanup_self.value.store(None);
                cleanup_signal.set();
            });

            let new_value = RegionalValue::Ready(value.clone());

            // It is possible that another thread has assigned a new global value
            // while we are doing this, so our `value` is out of date already. We
            // detect this in the caller by checking (after initialization) whether
            // the value that was set is of the expected generation. If not, everything
            // starts all over again for the current thread and it tries to re-initialize.

            self.value.store(Some(Arc::new(new_value)));

            // We are done initializing. Notify all waiters that they can continue.
            attempt_signal.set();

            // Disarm the cleanup guard since initialization succeeded.
            scopeguard::ScopeGuard::into_inner(cleanup_guard);

            return value.generation;
        }
    }

    fn clear(&self) {
        self.value.store(None);
    }
}

/// The contents of a regional state slot that is not empty.
#[derive(derive_more::Debug)]
enum RegionalValue<T> {
    /// One thread has declared that it has started initializing the value.
    ///
    /// It is possible for multiple threads to attempt to start initializing (i.e. to set this
    /// state). To avoid double initialization, the second caller must perform a conditional swap
    /// and become a waiter if it sees someone else got there first.
    ///
    /// It is possible that the second initializer who "lined up" behind the first one actually has
    /// a newer value to set. In this case, it will need to restart initialization once it finishes
    /// waiting for the first one to complete the initial initialization.
    ///
    /// The event is signaled by the initializing thread once it has finished (or abandoned)
    /// its attempt, releasing any waiters.
    Initializing(#[debug(ignore)] Arc<ManualResetEvent>),

    /// The value has been initialized and is ready for use.
    Ready(GenerationValue<T>),
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::panic::{self, AssertUnwindSafe, RefUnwindSafe, UnwindSafe};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier, mpsc};
    use std::{ptr, thread};

    use many_cpus::fake::{HardwareBuilder, ProcessorBuilder};
    use static_assertions::assert_impl_all;
    use testing::{assert_panics, with_watchdog};

    use super::*;
    use crate::{RegionCachedCopyExt, RegionCachedExt, region_cached};

    assert_impl_all!(RegionCached<String>: UnwindSafe, RefUnwindSafe);

    /// Builds a fake `SystemHardware` out of three processors, each placed in its own
    /// memory region: processor 0 in region 0, processor 1 in region 1 and processor 9
    /// in region 9.
    ///
    /// NOTE(review): only these three processors are declared here. Whether the fake
    /// hardware also materializes the unlisted regions in between is not visible from
    /// this function - confirm against the `many_cpus` fake implementation.
    fn fake_hardware_3_regions() -> SystemHardware {
        let in_region_0 = ProcessorBuilder::new().id(0).memory_region(0);
        let in_region_1 = ProcessorBuilder::new().id(1).memory_region(1);
        let in_region_9 = ProcessorBuilder::new().id(9).memory_region(9);

        let layout = HardwareBuilder::new()
            .processor(in_region_0)
            .processor(in_region_1)
            .processor(in_region_9);

        SystemHardware::fake(layout)
    }

    /// Pins the current thread to the processor with the given ID in the given hardware,
    /// placing it in the associated memory region.
    ///
    /// # Panics
    ///
    /// Panics if the `unwrap` in the lookup fails - presumably when `hardware` contains no
    /// processor with `processor_id` (TODO confirm against the `many_cpus` API).
    fn pin_to_processor(hardware: &SystemHardware, processor_id: u32) {
        hardware
            .all_processors()
            // Narrow the selection down to the processor with the requested ID.
            .filter(|p| p.id() == processor_id)
            .unwrap()
            .pin_current_thread_to();
    }

    /// Smoke test on the real platform: a static string is readable, reflects a
    /// `set_global` update, and a static `Copy` value is readable via `get_cached`.
    #[cfg_attr(miri, ignore)] // Miri does not support talking to the real platform.
    #[test]
    fn real_smoke_test() {
        region_cached! {
            static FAVORITE_COLOR: String = "blue".to_string();
            static FAVORITE_NUMBER: i32 = 42;
        }

        // Asserts that the color visible to this thread equals `expected`.
        let expect_color = |expected: &str| {
            FAVORITE_COLOR.with_cached(|color| {
                assert_eq!(*color, expected);
            });
        };

        expect_color("blue");

        FAVORITE_COLOR.set_global("red".to_string());

        expect_color("red");

        let number = FAVORITE_NUMBER.get_cached();
        assert_eq!(number, 42);
    }

    /// A static whose initial value is built by a non-const expression (`Arc::new`)
    /// can still be read through `with_cached`.
    #[cfg_attr(miri, ignore)] // Miri does not support talking to the real platform.
    #[test]
    fn with_non_const_initial_value() {
        region_cached!(static FAVORITE_COLOR: Arc<String> = Arc::new("blue".to_string()));

        FAVORITE_COLOR.with_cached(|color| assert_eq!(color.as_str(), "blue"));
    }

    /// A non-static `RegionCached` shared via `linked::InstancePerThread`: both the
    /// original thread and a spawned thread read the initial value, and an update made
    /// on the spawned thread is observed by both.
    #[cfg_attr(miri, ignore)] // Miri does not support talking to the real platform.
    #[test]
    fn non_static() {
        let favorite_color_linked =
            linked::InstancePerThread::new(RegionCached::new("blue".to_string()));

        let favorite_color = favorite_color_linked.acquire();

        // Asserts the color seen through this thread's instance.
        let expect_here = |expected: &str| {
            favorite_color.with_cached(|color| assert_eq!(*color, expected));
        };

        expect_here("blue");

        let worker = thread::spawn(move || {
            let thread_color = favorite_color_linked.acquire();

            // Asserts the color seen through the spawned thread's instance.
            let expect_there = |expected: &str| {
                thread_color.with_cached(|color| assert_eq!(*color, expected));
            };

            expect_there("blue");

            thread_color.set_global("red".to_string());

            expect_there("red");
        });

        worker.join().unwrap();

        // The update made on the other thread must be visible here as well.
        expect_here("red");
    }

    /// Same scenario as the `InstancePerThread` variant, but sharing the `RegionCached`
    /// through `linked::InstancePerThreadSync`.
    #[cfg_attr(miri, ignore)] // Miri does not support talking to the real platform.
    #[test]
    fn non_static_sync() {
        let favorite_color_linked =
            linked::InstancePerThreadSync::new(RegionCached::new("blue".to_string()));

        let favorite_color = favorite_color_linked.acquire();

        // Asserts the color seen through this thread's instance.
        let expect_here = |expected: &str| {
            favorite_color.with_cached(|color| assert_eq!(*color, expected));
        };

        expect_here("blue");

        let worker = thread::spawn(move || {
            let thread_color = favorite_color_linked.acquire();

            // Asserts the color seen through the spawned thread's instance.
            let expect_there = |expected: &str| {
                thread_color.with_cached(|color| assert_eq!(*color, expected));
            };

            expect_there("blue");

            thread_color.set_global("red".to_string());

            expect_there("red");
        });

        worker.join().unwrap();

        // The update made on the other thread must be visible here as well.
        expect_here("red");
    }

    /// Reads the cached value while pinned to each of the three fake memory regions and
    /// verifies that every region hands out a reference to a distinct object.
    ///
    /// All three pairs of addresses are compared, so two regions sharing one clone is
    /// detected no matter which two regions they are.
    #[test]
    fn different_regions_have_different_clones() {
        let hardware = fake_hardware_3_regions();

        thread::spawn(move || {
            let local = RegionCached::with_hardware("foo".to_string(), hardware.clone());

            // Capture the address of the value as seen from each region.
            pin_to_processor(&hardware, 0);
            let value1 = local.with_cached(ptr::from_ref);

            pin_to_processor(&hardware, 1);
            let value2 = local.with_cached(ptr::from_ref);

            pin_to_processor(&hardware, 9);
            let value3 = local.with_cached(ptr::from_ref);

            assert_ne!(value1, value2);
            assert_ne!(value1, value3);
            assert_ne!(value2, value3);
        })
        .join()
        .unwrap();
    }

    /// The initial value is readable from every one of the three fake memory regions.
    #[test]
    fn initial_value_propagates_to_all_regions() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            let local = RegionCached::with_hardware(42, hardware.clone());

            // Visit the regions one by one; each must observe the initial value.
            for processor_id in [0, 1, 9] {
                pin_to_processor(&hardware, processor_id);
                assert_eq!(local.get_cached(), 42);
            }
        });

        worker.join().unwrap();
    }

    /// After one region has already cached the initial value, a `set_global` update is
    /// observed from every region, including the one that had the old value cached.
    #[test]
    fn update_propagates_to_all_regions() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            let local = RegionCached::with_hardware(42, hardware.clone());

            // Populate the cache of the first region before publishing the update.
            pin_to_processor(&hardware, 0);
            assert_eq!(local.get_cached(), 42);
            local.set_global(43);

            // Every region, starting with the already-initialized one, sees the new value.
            for processor_id in [0, 1, 9] {
                pin_to_processor(&hardware, processor_id);
                assert_eq!(local.get_cached(), 43);
            }
        });

        worker.join().unwrap();
    }

    /// A `set_global` issued before any region has read the value is what every region
    /// observes on its first read.
    #[test]
    fn immediate_set_propagates_to_all_regions() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            let local = RegionCached::with_hardware(42, hardware.clone());

            // Replace the value before anything has been read.
            local.set_global(43);

            for processor_id in [0, 1, 9] {
                pin_to_processor(&hardware, processor_id);
                assert_eq!(local.get_cached(), 43);
            }
        });

        worker.join().unwrap();
    }

    /// When the thread is pinned before the `RegionCached` is constructed, the instance
    /// reports having regional state and repeated reads keep returning the value.
    #[test]
    fn pinned_thread_has_direct_access_to_regional_state() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            // The pin has to happen first so that the constructor observes a pinned thread.
            pin_to_processor(&hardware, 0);

            let local = RegionCached::with_hardware(42, hardware);

            // Construction on a pinned thread is expected to capture the regional state.
            assert!(local.has_regional_state());

            // Read several times in a row; every read must yield the same value.
            for _ in 0..3 {
                assert_eq!(local.get_cached(), 42);
            }
        });

        worker.join().unwrap();
    }

    /// A panic inside the user callback, raised after the region has been initialized,
    /// does not prevent later reads from returning the cached value.
    #[test]
    fn callback_panic_leaves_consistent_state() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            let local = RegionCached::with_hardware(42, hardware.clone());

            pin_to_processor(&hardware, 0);

            // A regular read first, so the region is initialized before the panic.
            let before_panic = local.get_cached();
            assert_eq!(before_panic, 42);

            // Now a read whose callback blows up.
            assert_panics(|| {
                local.with_cached(|_| {
                    panic!("User callback panicked!");
                })
            });

            // Reading afterwards must still work and yield the same value.
            let after_panic = local.get_cached();
            assert_eq!(after_panic, 42);
        });

        worker.join().unwrap();
    }

    /// A panic inside the user callback on the very first access does not prevent a
    /// subsequent read from succeeding.
    #[test]
    fn callback_panic_during_initialization() {
        let hardware = fake_hardware_3_regions();

        let worker = thread::spawn(move || {
            let local = RegionCached::with_hardware(42, hardware.clone());

            pin_to_processor(&hardware, 0);

            // The very first access uses a callback that panics.
            assert_panics(|| {
                local.with_cached(|_| {
                    panic!("User callback panicked during initialization!");
                })
            });

            // A follow-up read must still produce the value.
            let after_panic = local.get_cached();
            assert_eq!(after_panic, 42);
        });

        worker.join().unwrap();
    }

    /// If cloning the value panics on one thread, a second thread reading afterwards
    /// still obtains the value instead of waiting forever (the watchdog guards against
    /// a hang).
    #[test]
    fn clone_panic_does_not_block_other_threads() {
        with_watchdog(|| {
            // A value whose very first `clone()` call panics; later clones succeed.
            #[derive(Debug)]
            struct PanickingClone {
                value: i32,
                panic_counter: Arc<AtomicUsize>,
            }

            impl Clone for PanickingClone {
                fn clone(&self) -> Self {
                    let earlier_clones = self.panic_counter.fetch_add(1, Ordering::SeqCst);
                    assert!(earlier_clones > 0, "Clone panicked!");

                    Self {
                        value: self.value,
                        panic_counter: Arc::clone(&self.panic_counter),
                    }
                }
            }

            let hardware = SystemHardware::fake(
                HardwareBuilder::new().processor(ProcessorBuilder::new().id(0).memory_region(0)),
            );

            let local = Arc::new(RegionCached::with_hardware(
                PanickingClone {
                    value: 42,
                    panic_counter: Arc::new(AtomicUsize::new(0)),
                },
                hardware.clone(),
            ));

            // Both threads line up on the barrier before either touches the value.
            let start_line = Arc::new(Barrier::new(2));
            // Lets the first thread tell the second one that its attempt is over.
            let (attempt_done_tx, attempt_done_rx) = mpsc::channel::<()>();

            let failing_thread = {
                let start_line = Arc::clone(&start_line);
                let local = Arc::clone(&local);
                let hardware = hardware.clone();

                thread::spawn(move || {
                    pin_to_processor(&hardware, 0);
                    start_line.wait();

                    // This read triggers the panicking clone; capture the panic.
                    let outcome =
                        panic::catch_unwind(AssertUnwindSafe(|| local.with_cached(|v| v.value)));

                    // Report that the initialization attempt has finished.
                    attempt_done_tx.send(()).unwrap();

                    outcome
                })
            };

            let retrying_thread = {
                let start_line = Arc::clone(&start_line);
                let local = Arc::clone(&local);

                thread::spawn(move || {
                    pin_to_processor(&hardware, 0);
                    start_line.wait();

                    // Only read once the first thread is done with its attempt.
                    attempt_done_rx.recv().unwrap();

                    local.with_cached(|v| v.value)
                })
            };

            let first_outcome = failing_thread.join().unwrap();
            let second_value = retrying_thread.join().unwrap();

            // The first thread must have observed (and caught) the panic.
            first_outcome.unwrap_err();
            // The second thread must have read the value successfully.
            assert_eq!(second_value, 42);
        });
    }
}