1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

mod clone_fn;
mod factory;
pub mod storage;

mod builtin;
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests;

use std::cmp::Ordering;
use std::hash::Hasher;
use std::ops::Deref;
use std::sync::{self, RwLock};

pub use builtin::{PerCore, PerNuma, PerProcess};
pub use storage::{Storage, Strategy};

use crate::ThreadAware;
use crate::affinity::Affinity;
use crate::cell::factory::Factory;
use crate::closure::{ErasedClosureOnce, ThreadAwareFnOnce, closure_once};

/// Adapter turning a closure that yields `T` into one that yields `Box<T>`.
///
/// Cloning and relocation are forwarded to the wrapped closure unchanged; only the
/// result of `call_once` is boxed.
struct BoxedRelocate<F>(F);

impl<F> Clone for BoxedRelocate<F>
where
    F: Clone,
{
    /// Clones the wrapped closure.
    fn clone(&self) -> Self {
        Self(F::clone(&self.0))
    }
}

impl<F> ThreadAware for BoxedRelocate<F>
where
    F: ThreadAware,
{
    /// Relocates the wrapped closure.
    fn relocate(&mut self, source: Option<Affinity>, destination: Affinity) {
        F::relocate(&mut self.0, source, destination);
    }
}

impl<T, F> ThreadAwareFnOnce<Box<T>> for BoxedRelocate<F>
where
    F: ThreadAwareFnOnce<T>,
{
    /// Runs the wrapped closure and moves its result onto the heap.
    fn call_once(self) -> Box<T> {
        let produced = F::call_once(self.0);
        Box::new(produced)
    }
}

/// Transferable reference counted type.
///
/// This type works like a per-affinity (per-thread) [`sync::Arc`]. Each affinity gets a unique value that is shared by clones
/// of the `Arc`, but the [`ThreadAware`] implementation ensures that when moving to another affinity, the resulting
/// `Arc` will point to the value in the destination affinity. See [`new`](`Arc::new`) for information on constructing instances.
///
/// Relocating different clones of the `Arc` to the same affinity results in "deduplication" in the destination
/// affinity: the clones end up pointing at the same value there. The following example demonstrates this using
/// the counter implemented in the documentation for the [`ThreadAware`] trait.
///
/// ```rust
/// # use thread_aware::{Arc, ThreadAware, PerCore};
/// # use thread_aware::affinity::*;
/// # use std::sync::atomic::{AtomicI32, Ordering};
/// # let affinities = pinned_affinities(&[2]);
/// # let affinity1 = Some(affinities[0]);
/// # let affinity2 = affinities[1];
/// # #[derive(Clone)]
/// # struct Counter {
/// #     value: std::sync::Arc<AtomicI32>,
/// # }
/// #
/// # impl Counter {
/// #     fn new() -> Self {
/// #         Self {
/// #             value: std::sync::Arc::new(AtomicI32::new(0)),
/// #         }
/// #     }
/// #
/// #     fn increment_by(&self, v: i32) {
/// #         self.value.fetch_add(v, Ordering::AcqRel);
/// #     }
/// #
/// #     fn value(&self) -> i32 {
/// #         self.value.load(Ordering::Acquire)
/// #     }
/// # }
/// #
/// # impl ThreadAware for Counter {
/// #     fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {
/// #         // Initialize a new value in the destination affinity independent
/// #         // of the source affinity.
/// #         self.value = std::sync::Arc::new(AtomicI32::new(0));
/// #     }
/// # }
///
/// let mut arc_affinity1 = Arc::<_, PerCore>::new(Counter::new);
/// let arc_affinity1_clone = arc_affinity1.clone();
///
/// arc_affinity1.increment_by(42);
/// assert_eq!(arc_affinity1.value(), 42);
///
/// arc_affinity1.relocate(affinity1, affinity2);
/// assert_eq!(arc_affinity1.value(), 0);
/// assert_eq!(arc_affinity1_clone.value(), 42);
///
/// arc_affinity1.increment_by(11);
/// let mut arc_affinity2_clone = arc_affinity1_clone;
/// arc_affinity2_clone.relocate(affinity1, affinity2);
/// assert_eq!(arc_affinity2_clone.value(), 11);
/// ```
#[derive(Debug)]
pub struct Arc<T: ?Sized, S: Strategy> {
    // Per-affinity values, kept behind a lock and shared by every clone of this `Arc`.
    storage: sync::Arc<RwLock<Storage<sync::Arc<T>, S>>>,
    // Value this handle currently points at; this is what `Deref` exposes.
    value: sync::Arc<T>,
    // How to obtain a value when relocating to an affinity that has none stored yet.
    factory: Factory<T>,
}

/// Two `Arc`s are equal when the values they currently point at are equal.
///
/// Only the current value takes part in the comparison; the shared storage and the
/// factory are ignored. `T` may be unsized (e.g. `str` or a trait object), matching
/// the `Hash`, `Clone` and `Deref` implementations.
impl<T: PartialEq + ?Sized, S: Strategy> PartialEq for Arc<T, S> {
    fn eq(&self, other: &Self) -> bool {
        self.value == other.value
    }
}

impl<T: Eq + ?Sized, S: Strategy> Eq for Arc<T, S> {}

/// Hashes the current value, so the hash agrees with the `PartialEq` implementation.
impl<T: std::hash::Hash + ?Sized, S: Strategy> std::hash::Hash for Arc<T, S> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.value.hash(state);
    }
}

/// Orders `Arc`s by the values they currently point at.
impl<T: Ord + ?Sized, S: Strategy> Ord for Arc<T, S> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.value.cmp(&other.value)
    }
}

/// Partially orders `Arc`s by the values they currently point at.
impl<T: PartialOrd + ?Sized, S: Strategy> PartialOrd for Arc<T, S> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.value.partial_cmp(&other.value)
    }
}

impl<T: ?Sized, S: Strategy> Clone for Arc<T, S> {
    /// Creates another handle that shares this handle's storage and current value.
    ///
    /// No `T` is copied: both the storage and the value are reference-counted, and the
    /// factory is cloned alongside them.
    fn clone(&self) -> Self {
        let Self { storage, value, factory } = self;

        Self {
            storage: sync::Arc::clone(storage),
            value: sync::Arc::clone(value),
            factory: factory.clone(),
        }
    }
}

impl<T: ?Sized, S: Strategy> Deref for Arc<T, S> {
    type Target = T;

    /// Borrows the value this handle currently points at.
    fn deref(&self) -> &T {
        &*self.value
    }
}

impl<T, S> Arc<T, S>
where
    T: Send + 'static,
    S: Strategy,
{
    /// Creates a new `Arc` whose values are produced by a constructor function.
    ///
    /// This variant takes a zero-argument constructor function (`fn() -> T`).
    /// The constructor is invoked once immediately to create the value for the current
    /// affinity, and again whenever the `Arc` is relocated to an affinity for which the
    /// shared storage does not hold a value yet. This guarantees that every such affinity
    /// obtains its own freshly created `T` without requiring `T: Clone` or `T: ThreadAware`.
    ///
    /// Requirements:
    /// * `T` must be `Send + 'static` so it can live in the processor storage.
    /// * The provided function must be pure with respect to per-processor isolation (it should not
    ///   leak references into other processors). Any captured state should therefore be provided via
    ///   globally shareable mechanisms or prefer [`new_with`](Self::new_with) if you need to
    ///   capture data that itself implements [`ThreadAware`].
    ///
    /// When transferring to another affinity which doesn't yet contain a value, the constructor is
    /// called in the destination affinity to create a brand new instance.
    ///
    /// For example, the counter type we implemented in the documentation for [`ThreadAware`] trait
    /// can be used with `new` by passing the constructor function (note the absence of `()`):
    ///
    /// ```rust
    /// # use thread_aware::{Arc, ThreadAware, PerCore};
    /// # use thread_aware::affinity::*;
    /// # use std::sync::atomic::{AtomicI32, Ordering};
    /// # use std::sync;
    /// # #[derive(Clone)]
    /// # struct Counter {
    /// #     value: sync::Arc<AtomicI32>,
    /// # }
    /// #
    /// # impl Counter {
    /// #     fn new() -> Self {
    /// #         Self {
    /// #             value: sync::Arc::new(AtomicI32::new(0)),
    /// #         }
    /// #     }
    /// #
    /// #     fn increment_by(&self, v: i32) {
    /// #         self.value.fetch_add(v, Ordering::AcqRel);
    /// #     }
    /// #
    /// #     fn value(&self) -> i32 {
    /// #         self.value.load(Ordering::Acquire)
    /// #     }
    /// # }
    /// # impl ThreadAware for Counter {
    /// #     fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {
    /// #         // Initialize a new value in the destination affinity independent
    /// #         // of the source affinity.
    /// #         self.value = sync::Arc::new(AtomicI32::new(0));
    /// #     }
    /// # }
    ///
    /// let container = Arc::<_, PerCore>::new(Counter::new);
    /// let container_clone = container.clone();
    /// container.increment_by(42);
    /// assert_eq!(container.value(), 42);
    /// assert_eq!(container_clone.value(), 42);
    /// ```
    pub fn new(ctor: fn() -> T) -> Self {
        // We wrap the function pointer in a tiny ThreadAwareFnOnce implementation that
        // recreates the value independently for each affinity.
        struct Ctor<T> {
            f: fn() -> T,
        }

        // Implemented by hand: a derive would add an unwanted `T: Clone` bound.
        impl<T> Clone for Ctor<T> {
            fn clone(&self) -> Self {
                Self { f: self.f }
            }
        }

        // A plain function pointer captures nothing, so there is nothing to relocate.
        impl<T> ThreadAware for Ctor<T> {
            fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {}
        }

        impl<T> ThreadAwareFnOnce<Box<T>> for Ctor<T> {
            fn call_once(self) -> Box<T> {
                Box::new((self.f)())
            }
        }

        // Use Self::with_closure_boxed to ensure Factory::Closure path.
        Self::with_closure_boxed(Ctor { f: ctor })
    }
}

impl<T, S> Arc<T, S>
where
    T: Send + 'static + ?Sized,
    S: Strategy,
{
    /// Creates a new `Arc` from a constructor function that returns `Box<T>`.
    ///
    /// This is the counterpart of [`new`](Self::new) for types that are not `Sized`, such as
    /// trait objects (`dyn Trait`). Each boxed value the constructor produces is moved behind
    /// a [`sync::Arc`].
    ///
    /// ```rust
    /// # use thread_aware::{Arc, ThreadAware, PerCore};
    /// let arc = Arc::<dyn ThreadAware, PerCore>::new_boxed(|| Box::new(42u32));
    /// ```
    pub fn new_boxed(ctor: fn() -> Box<T>) -> Self {
        /// Function-pointer constructor usable as a `ThreadAwareFnOnce`.
        struct BoxedCtor<T: ?Sized>(fn() -> Box<T>);

        impl<T: ?Sized> Clone for BoxedCtor<T> {
            fn clone(&self) -> Self {
                let Self(build) = self;
                Self(*build)
            }
        }

        // A function pointer carries no state that would need relocating.
        impl<T: ?Sized> ThreadAware for BoxedCtor<T> {
            fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {}
        }

        impl<T: ?Sized> ThreadAwareFnOnce<Box<T>> for BoxedCtor<T> {
            fn call_once(self) -> Box<T> {
                let Self(build) = self;
                build()
            }
        }

        let factory = BoxedCtor(ctor);
        Self::with_closure_boxed(factory)
    }
}

impl<T, S> Arc<T, S>
where
    T: 'static,
    S: Strategy,
{
    /// Creates a new `Arc` whose values are produced by calling `f` with (a relocated clone of) `data`.
    ///
    /// The closure is called once immediately to create the value for the current processor, and
    /// again whenever the `Arc` is transferred to a processor for which no value is stored yet.
    /// The closure behaves like a `ThreadAwareFnOnce` to ensure it captures only values that are
    /// safe to transfer themselves.
    ///
    /// This function can be used to create an `Arc` of a type that itself doesn't implement [`ThreadAware`] because
    /// we can ensure that each affinity will get its own, independently-initialized value:
    ///
    /// ```rust
    /// # use std::sync::{self, Mutex};
    /// # use thread_aware::{Arc, PerCore};
    /// struct MyStruct {
    ///     inner: sync::Arc<Mutex<i32>>,
    /// }
    ///
    /// impl MyStruct {
    ///     fn new() -> Self {
    ///         Self {
    ///             inner: sync::Arc::new(Mutex::new(0)),
    ///         }
    ///     }
    /// }
    ///
    /// let container = Arc::<_, PerCore>::new_with((), |_| MyStruct::new());
    /// ```
    ///
    /// The constructor can depend on other values that implement [`ThreadAware`] (this example uses the Counter
    /// defined in [`ThreadAware`] documentation):
    ///
    /// ```rust
    /// # use thread_aware::{ThreadAware, Arc, PerCore};
    /// # use thread_aware::affinity::*;
    /// # use std::sync::atomic::{AtomicI32, Ordering};
    /// # use std::sync;
    /// # #[derive(Clone)]
    /// # struct Counter {
    /// #     value: sync::Arc<AtomicI32>,
    /// # }
    /// #
    /// # impl Counter {
    /// #     fn new() -> Self {
    /// #         Self {
    /// #             value: sync::Arc::new(AtomicI32::new(0)),
    /// #         }
    /// #     }
    /// #
    /// #     fn increment_by(&self, v: i32) {
    /// #         self.value.fetch_add(v, Ordering::AcqRel);
    /// #     }
    /// #
    /// #     fn value(&self) -> i32 {
    /// #         self.value.load(Ordering::Acquire)
    /// #     }
    /// # }
    /// #
    /// # impl ThreadAware for Counter {
    /// #     fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {
    /// #         // Initialize a new value in the destination affinity independent
    /// #         // of the source affinity.
    /// #         self.value = sync::Arc::new(AtomicI32::new(0));
    /// #     }
    /// # }
    ///
    /// struct MyStruct;
    ///
    /// impl MyStruct {
    ///     fn new(value: i32) -> Self {
    ///         Self
    ///     }
    /// }
    ///
    /// let counter = Counter::new();
    /// let container = Arc::<_, PerCore>::new_with(counter, |counter| MyStruct::new(counter.value()));
    /// ```
    pub fn new_with<D>(data: D, f: fn(D) -> T) -> Self
    where
        D: ThreadAware + Send + Sync + Clone + 'static,
    {
        // `closure_once` yields a closure producing `T`; `BoxedRelocate` adapts it to the
        // `Box<T>`-producing shape `with_closure_boxed` expects.
        Self::with_closure_boxed(BoxedRelocate(closure_once(data, f)))
    }
}

impl<T, S: Strategy> Arc<T, S>
where
    T: ThreadAware + Clone + 'static + Send,
{
    /// Creates a new `Arc` that starts out holding `value`.
    ///
    /// `T` has to be both [`ThreadAware`] and [`Clone`]. Whenever the `Arc` moves to an affinity
    /// that has no value yet, the value of the current affinity is cloned and the clone is
    /// relocated to the new affinity.
    ///
    /// The counter type from the [`ThreadAware`] trait documentation is an example of a type
    /// that can be used with `with_value`.
    #[cfg(test)]
    pub(crate) fn with_value(value: T) -> Self {
        let current = sync::Arc::new(value);
        let storage = sync::Arc::new(RwLock::new(storage::Storage::new()));

        Self {
            storage,
            value: current,
            factory: Factory::Data(|template: &T, source, destination| {
                // Copy the template first, then let the copy adapt to its new affinity.
                let mut copy = Clone::clone(template);
                copy.relocate(source, destination);
                Box::new(copy)
            }),
        }
    }
}

impl<T, S: Strategy> Arc<T, S>
where
    T: Clone + 'static + Send,
{
    /// Creates a new `Arc` with the given value.
    ///
    /// The value must implement [`Clone`]. When transferring to another affinity
    /// which doesn't yet contain a value, a new value is created by cloning the value in current
    /// affinity.
    ///
    /// This is useful for types that do not implement [`ThreadAware`]. In such cases, the same value
    /// is cloned for each affinity without any relocation logic.
    ///
    /// For example, a counter type that is [`Clone`] but does not implement [`ThreadAware`]
    /// can be used with `from_unaware`:
    ///
    /// ```rust
    /// # use thread_aware::{Arc, PerCore};
    /// # use std::sync::atomic::{AtomicI32, Ordering};
    /// # use std::sync;
    /// # #[derive(Clone)]
    /// # struct Counter {
    /// #     value: sync::Arc<AtomicI32>,
    /// # }
    /// #
    /// # impl Counter {
    /// #     fn new() -> Self {
    /// #         Self {
    /// #             value: sync::Arc::new(AtomicI32::new(0)),
    /// #         }
    /// #     }
    /// #
    /// #     fn increment_by(&self, v: i32) {
    /// #         self.value.fetch_add(v, Ordering::AcqRel);
    /// #     }
    /// #
    /// #     fn value(&self) -> i32 {
    /// #         self.value.load(Ordering::Acquire)
    /// #     }
    /// # }
    ///
    /// let arc = Arc::<_, PerCore>::from_unaware(Counter::new());
    /// let arc_clone = arc.clone();
    /// arc.increment_by(42);
    /// assert_eq!(arc.value(), 42);
    /// assert_eq!(arc_clone.value(), 42);
    /// ```
    pub fn from_unaware(value: T) -> Self {
        let value = sync::Arc::new(value);

        Self {
            storage: sync::Arc::new(RwLock::new(storage::Storage::new())),
            value,
            // Source and destination are deliberately ignored: `T` has no relocation logic,
            // so a plain clone is all that can be done.
            factory: Factory::Data(|data: &T, _source, _destination| Box::new(data.clone())),
        }
    }
}

impl<T, S: Strategy> Arc<T, S>
where
    T: ThreadAware + 'static + ?Sized,
{
    /// Creates a new `Arc` from a template value plus a function that clones it, which makes
    /// trait objects usable as `T`.
    ///
    /// The passed object is retained as the template for every clone made on relocation; the
    /// initial `Arc` is itself backed by a clone of it.
    ///
    /// `clone_fn` is handed `&V` (the concrete type) and returns `Box<T>`. This allows `T` to be
    /// a `dyn Trait`, where `Clone` cannot be used because it is not object-safe. Every clone is
    /// [relocated](ThreadAware::relocate) to its target affinity.
    ///
    /// ```rust
    /// # use thread_aware::{Arc, PerCore, ThreadAware};
    /// # #[derive(Clone)]
    /// # struct Foo(u32);
    /// # impl ThreadAware for Foo {
    /// #     fn relocate(&mut self, _: Option<thread_aware::affinity::Affinity>, _: thread_aware::affinity::Affinity) {}
    /// # }
    /// trait MyPlugin: ThreadAware {}
    /// impl MyPlugin for Foo {}
    ///
    /// let arc = Arc::<dyn MyPlugin, PerCore>::with_clone_fn(Foo(42), |v: &Foo| Box::new(v.clone()));
    /// ```
    pub fn with_clone_fn<V: Send + Sync + 'static>(value: V, clone_fn: fn(&V) -> Box<T>) -> Self {
        // Typical shape: `V = u32`, `T = dyn Foo`, `clone_fn: fn(&u32) -> Box<dyn Foo>`.
        let template = clone_fn::ErasedCloneFn::new(value, clone_fn);

        // Grab the current value before the template is moved into the factory.
        let current = sync::Arc::clone(template.arc());
        let storage = sync::Arc::new(RwLock::new(storage::Storage::new()));

        Self {
            storage,
            value: current,
            factory: Factory::ErasedCloneFn(template),
        }
    }
}

impl<T, S: Strategy> Arc<T, S>
where
    T: 'static + ?Sized,
{
    /// Creates a new `Arc` whose values are produced by a closure returning `Box<T>`.
    ///
    /// A clone of the closure is run right away to build the value this `Arc` starts with; the
    /// closure itself is kept as the factory used on later relocations. It has to be a
    /// [`ThreadAwareFnOnce`] so that everything it captures is itself safe to transfer.
    pub(crate) fn with_closure_boxed<F>(closure: F) -> Self
    where
        F: ThreadAwareFnOnce<Box<T>> + Clone + ThreadAware + 'static + Send + Sync,
    {
        // Run a clone so the original closure stays available as the factory.
        let initial: Box<T> = closure.clone().call_once();
        let storage = sync::Arc::new(RwLock::new(storage::Storage::new()));
        let erased = sync::Arc::new(ErasedClosureOnce::new(closure));

        Self {
            storage,
            value: sync::Arc::from(initial),
            // `None`: the factory has not been relocated yet, so its home affinity is unknown.
            factory: Factory::Closure(erased, None),
        }
    }

    /// Creates a new `Arc` on top of existing storage, pointing at the value stored for
    /// `current_affinity`.
    ///
    /// The resulting `Arc` has no way of creating new values: when it is transferred to an
    /// affinity the storage has no data for, it behaves like a plain [`sync::Arc`] and keeps
    /// sharing the value it already has.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned or if the storage holds no data for
    /// `current_affinity`.
    pub fn from_storage(storage: sync::Arc<RwLock<Storage<sync::Arc<T>, S>>>, current_affinity: Affinity) -> Self {
        let guard = storage.read().expect("Failed to acquire read lock");
        let value = guard.get_clone(current_affinity).expect("No data found for the current affinity");
        // Release the borrow of `storage` before it is moved into the new handle.
        drop(guard);

        Self {
            storage,
            value,
            factory: Factory::Manual,
        }
    }
}

impl<T: ?Sized, S: Strategy> Arc<T, S> {
    /// Gets the number of strong references to the value in the current thread/affinity.
    ///
    /// This method returns the strong reference count for the underlying [`sync::Arc`]
    /// that holds the value for the current affinity, excluding any internal references
    /// held by the storage for deduplication purposes. Each affinity maintains its own
    /// separate value with its own reference count.
    ///
    /// As with [`sync::Arc::strong_count`], the result is only a snapshot: other threads may
    /// clone or drop handles at any time.
    ///
    /// # Examples
    ///
    /// ```
    /// use thread_aware::{Arc, PerCore};
    ///
    /// let arc = Arc::<_, PerCore>::new(|| 42);
    /// assert_eq!(Arc::strong_count(&arc), 1);
    ///
    /// let arc2 = arc.clone();
    /// assert_eq!(Arc::strong_count(&arc), 2);
    /// assert_eq!(Arc::strong_count(&arc2), 2);
    /// ```
    #[must_use]
    #[expect(clippy::missing_panics_doc, reason = "this code only panics when the lock is poisoned")]
    pub fn strong_count(this: &Self) -> usize {
        // Take the lock *before* sampling the raw count. While the read lock is held no
        // relocation (which needs the write lock) can add storage references to this value,
        // so the number of internal references is stable and always strictly smaller than
        // the raw count. Sampling the raw count first would let a concurrent relocation
        // slip in extra storage references, making the subtraction too small or underflow.
        let guard = this.storage.read().expect("Failed to acquire read lock");
        let internal = guard.count_where(|stored| sync::Arc::ptr_eq(stored, &this.value));
        let raw = sync::Arc::strong_count(&this.value);
        raw - internal
    }

    /// Converts the `Arc<T, S>` into an `sync::Arc<T>`.
    ///
    /// The returned pointer refers to the value this handle pointed at; the handle's link
    /// to the shared storage and its factory are dropped.
    #[must_use]
    pub fn into_arc(self) -> sync::Arc<T> {
        self.value
    }
}

impl<T: Send + Sync + ?Sized, S: Strategy + Send + Sync> ThreadAware for Arc<T, S> {
    /// Re-points this handle at the value belonging to `destination`.
    ///
    /// The whole operation runs under the storage write lock:
    ///
    /// * If the storage already holds a value for `destination`, the handle simply adopts it.
    /// * Otherwise a value is obtained from the factory (closure call, clone + relocate, or
    ///   a plain share of the current value in manual mode), stored for `destination`, and
    ///   the previous value is stored for `source` when `source` is known and differs from
    ///   `destination`.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned.
    fn relocate(&mut self, source: Option<Affinity>, destination: Affinity) {
        let mut guard = self.storage.write().expect("Failed to acquire write lock");

        if let Some(value) = guard.get_clone(destination) {
            self.value = value;

            // Even though no new value had to be created, this handle has now left `source`.
            // A closure factory that has not recorded its home affinity yet must remember it
            // here; otherwise a later relocation into an empty affinity would relocate the
            // closure's captured data from the wrong source (this handle's then-current
            // affinity instead of the one the closure was created in).
            if let Factory::Closure(_, factory_source) = &mut self.factory {
                *factory_source = factory_source.or(source);
            }
        } else {
            // We need to transfer or recreate the data
            let (data, new_factory) = match &self.factory {
                // We can use the closure to create new data
                Factory::Closure(factory, factory_source_affinity) => {
                    let mut factory_clone = (**factory).clone();

                    // In case factory source is stored in factory, use that - it means we already transferred the factory
                    // once, so we know the original source affinity. Otherwise, use source as that means this is the first
                    // time we're transferring the Arc, so source is the source affinity of the factory as well.
                    let factory_source = factory_source_affinity.or(source);

                    factory_clone.relocate(factory_source, destination);
                    (
                        sync::Arc::from(factory_clone.call_once()),
                        Factory::Closure(sync::Arc::clone(factory), factory_source),
                    )
                }

                // We can clone and transfer the data
                Factory::Data(factory) => (sync::Arc::from(factory(&self.value, source, destination)), self.factory.clone()),

                // We can clone the template and relocate the clone
                Factory::ErasedCloneFn(erased) => {
                    let cloned = erased.clone_and_relocate(source, destination);
                    (cloned, self.factory.clone())
                }

                Factory::Manual => {
                    // If we are in manual mode, we just clone the data
                    // This effectively makes it behave like `sync::Arc<T>`
                    (sync::Arc::clone(&self.value), self.factory.clone())
                }
            };

            let old_value = std::mem::replace(&mut self.value, data);

            let old_data = guard.replace(destination, sync::Arc::<T>::clone(&self.value));
            assert!(
                old_data.is_none(),
                "Data already exists for the destination affinity. This should be unreachable due to the the early write lock."
            );

            if let Some(source) = source {
                // Only restore the value to the source slot when source and destination differ.
                // If they are the same slot, the replacement above already stored the new value
                // there; overwriting it here would corrupt storage with the stale pre-relocation value.
                if source != destination {
                    guard.replace(source, old_value);
                }
            }

            self.factory = new_factory;
        }

        drop(guard);
    }
}