mod clone_fn;
mod factory;
pub mod storage;
mod builtin;
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests;
use std::cmp::Ordering;
use std::hash::Hasher;
use std::ops::Deref;
use std::sync::{self, RwLock};
pub use builtin::{PerCore, PerNuma, PerProcess};
pub use storage::{Storage, Strategy};
use crate::ThreadAware;
use crate::affinity::Affinity;
use crate::cell::factory::Factory;
use crate::closure::{ErasedClosureOnce, ThreadAwareFnOnce, closure_once};
struct BoxedRelocate<F>(F);
impl<F: Clone> Clone for BoxedRelocate<F> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<F: ThreadAware> ThreadAware for BoxedRelocate<F> {
fn relocate(&mut self, source: Option<Affinity>, destination: Affinity) {
self.0.relocate(source, destination);
}
}
impl<T, F: ThreadAwareFnOnce<T>> ThreadAwareFnOnce<Box<T>> for BoxedRelocate<F> {
fn call_once(self) -> Box<T> {
Box::new(self.0.call_once())
}
}
#[derive(Debug)]
pub struct Arc<T: ?Sized, S: Strategy> {
storage: sync::Arc<RwLock<Storage<sync::Arc<T>, S>>>,
value: sync::Arc<T>,
factory: Factory<T>,
}
impl<T: PartialEq, S: Strategy> PartialEq for Arc<T, S> {
fn eq(&self, other: &Self) -> bool {
self.value == other.value
}
}
impl<T: Eq, S: Strategy> Eq for Arc<T, S> {}
impl<T: std::hash::Hash + ?Sized, S: Strategy> std::hash::Hash for Arc<T, S> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.value.hash(state);
}
}
impl<T: Ord, S: Strategy> Ord for Arc<T, S> {
fn cmp(&self, other: &Self) -> Ordering {
self.value.cmp(&other.value)
}
}
impl<T: PartialOrd, S: Strategy> PartialOrd for Arc<T, S> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.value.partial_cmp(&other.value)
}
}
impl<T: ?Sized, S: Strategy> Clone for Arc<T, S> {
fn clone(&self) -> Self {
Self {
storage: sync::Arc::clone(&self.storage),
value: sync::Arc::clone(&self.value),
factory: self.factory.clone(),
}
}
}
impl<T: ?Sized, S: Strategy> Deref for Arc<T, S> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<T, S> Arc<T, S>
where
T: Send + 'static,
S: Strategy,
{
pub fn new(ctor: fn() -> T) -> Self {
struct Ctor<T> {
f: fn() -> T,
}
impl<T> Clone for Ctor<T> {
fn clone(&self) -> Self {
Self { f: self.f }
}
}
impl<T> ThreadAware for Ctor<T> {
fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {}
}
impl<T> ThreadAwareFnOnce<Box<T>> for Ctor<T> {
fn call_once(self) -> Box<T> {
Box::new((self.f)())
}
}
Self::with_closure_boxed(Ctor { f: ctor })
}
}
impl<T, S> Arc<T, S>
where
T: Send + 'static + ?Sized,
S: Strategy,
{
pub fn new_boxed(ctor: fn() -> Box<T>) -> Self {
struct Ctor<T: ?Sized> {
f: fn() -> Box<T>,
}
impl<T: ?Sized> Clone for Ctor<T> {
fn clone(&self) -> Self {
Self { f: self.f }
}
}
impl<T: ?Sized> ThreadAware for Ctor<T> {
fn relocate(&mut self, _source: Option<Affinity>, _destination: Affinity) {}
}
impl<T: ?Sized> ThreadAwareFnOnce<Box<T>> for Ctor<T> {
fn call_once(self) -> Box<T> {
(self.f)()
}
}
Self::with_closure_boxed(Ctor { f: ctor })
}
}
impl<T, S> Arc<T, S>
where
T: 'static,
S: Strategy,
{
pub fn new_with<D>(data: D, f: fn(D) -> T) -> Self
where
D: ThreadAware + Send + Sync + Clone + 'static,
{
Self::with_closure_boxed(BoxedRelocate(closure_once(data, f)))
}
}
impl<T, S: Strategy> Arc<T, S>
where
T: ThreadAware + Clone + 'static + Send,
{
#[cfg(test)]
pub(crate) fn with_value(value: T) -> Self {
let value = sync::Arc::new(value);
Self {
storage: sync::Arc::new(RwLock::new(storage::Storage::new())),
value,
factory: Factory::Data(|data: &T, source, destination| {
let mut data = data.clone();
data.relocate(source, destination);
Box::new(data)
}),
}
}
}
impl<T, S: Strategy> Arc<T, S>
where
T: Clone + 'static + Send,
{
pub fn from_unaware(value: T) -> Self {
let value = sync::Arc::new(value);
Self {
storage: sync::Arc::new(RwLock::new(storage::Storage::new())),
value,
factory: Factory::Data(|data: &T, _source, _destination| Box::new(data.clone())),
}
}
}
impl<T, S: Strategy> Arc<T, S>
where
T: ThreadAware + 'static + ?Sized,
{
pub fn with_clone_fn<V: Send + Sync + 'static>(value: V, clone_fn: fn(&V) -> Box<T>) -> Self {
let erased = clone_fn::ErasedCloneFn::new(value, clone_fn);
let value = sync::Arc::clone(erased.arc());
Self {
storage: sync::Arc::new(RwLock::new(storage::Storage::new())),
value,
factory: Factory::ErasedCloneFn(erased),
}
}
}
impl<T, S: Strategy> Arc<T, S>
where
T: 'static + ?Sized,
{
pub(crate) fn with_closure_boxed<F>(closure: F) -> Self
where
F: ThreadAwareFnOnce<Box<T>> + Clone + ThreadAware + 'static + Send + Sync,
{
let value = sync::Arc::from(closure.clone().call_once());
Self {
storage: sync::Arc::new(RwLock::new(storage::Storage::new())),
value,
factory: Factory::Closure(sync::Arc::new(ErasedClosureOnce::new(closure)), None),
}
}
pub fn from_storage(storage: sync::Arc<RwLock<Storage<sync::Arc<T>, S>>>, current_affinity: Affinity) -> Self {
let value = storage
.read()
.expect("Failed to acquire read lock")
.get_clone(current_affinity)
.expect("No data found for the current affinity");
Self {
storage,
value,
factory: Factory::Manual,
}
}
}
impl<T, S: Strategy> Arc<T, S> {
#[must_use]
#[expect(clippy::missing_panics_doc, reason = "this code only panics when the lock is poisoned")]
pub fn strong_count(this: &Self) -> usize {
let raw = sync::Arc::strong_count(&this.value);
let guard = this.storage.read().expect("Failed to acquire read lock");
let internal = guard.count_where(|stored| sync::Arc::ptr_eq(stored, &this.value));
raw - internal
}
#[must_use]
pub fn into_arc(self) -> sync::Arc<T> {
self.value
}
}
impl<T: Send + Sync + ?Sized, S: Strategy + Send + Sync> ThreadAware for Arc<T, S> {
fn relocate(&mut self, source: Option<Affinity>, destination: Affinity) {
let mut guard = self.storage.write().expect("Failed to acquire write lock");
if let Some(value) = guard.get_clone(destination) {
self.value = value;
} else {
let (data, new_factory) = match &self.factory {
Factory::Closure(factory, factory_source_affinity) => {
let mut factory_clone = (**factory).clone();
let factory_source = factory_source_affinity.or(source);
factory_clone.relocate(factory_source, destination);
(
sync::Arc::from(factory_clone.call_once()),
Factory::Closure(sync::Arc::clone(factory), factory_source),
)
}
Factory::Data(factory) => (sync::Arc::from(factory(&self.value, source, destination)), self.factory.clone()),
Factory::ErasedCloneFn(erased) => {
let cloned = erased.clone_and_relocate(source, destination);
(cloned, self.factory.clone())
}
Factory::Manual => {
(sync::Arc::clone(&self.value), self.factory.clone())
}
};
let old_value = std::mem::replace(&mut self.value, data);
let old_data = guard.replace(destination, sync::Arc::<T>::clone(&self.value));
assert!(
old_data.is_none(),
"Data already exists for the destination affinity. This should be unreachable due to the the early write lock."
);
if let Some(source) = source {
if source != destination {
guard.replace(source, old_value);
}
}
self.factory = new_factory;
}
drop(guard);
}
}