#![allow(unused)]
use std::any::TypeId;
use std::cell::UnsafeCell;
use std::future::Future;
use std::num::NonZeroU128;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use allocative::Allocative;
use allocative::Visitor;
use dupe::Clone_;
use dupe::Copy_;
use dupe::Dupe;
use dupe::Dupe_;
use either::Either;
use parking_lot::Mutex;
use strong_hash::StrongHash;
use crate::Pagable;
use crate::PagableDeserialize;
use crate::PagableDeserializer;
use crate::PagableEagerDeserialize;
use crate::PagableEagerSerialize;
use crate::PagableSerialize;
use crate::PagableSerializer;
use crate::arc_erase::ArcErase;
use crate::arc_erase::ArcEraseDyn;
use crate::arc_erase::ArcEraseType;
use crate::arc_erase::StdArcEraseType;
use crate::arc_erase::deserialize_arc;
use crate::storage::data::DataKey;
use crate::storage::data::OptionalDataKey;
use crate::storage::handle::PagableStorageHandle;
use crate::storage::traits::PagableStorage;
#[derive(allocative::Allocative)]
pub struct PagableArc<T: Pagable> {
state: PagableArcStateHolder,
#[allocative(skip)]
pointer: triomphe::Arc<PagableArcInner<T>>,
}
#[derive(allocative::Allocative)]
struct PagableArcStateHolder(AtomicUsize);
impl PagableArcStateHolder {
fn get(&self) -> PagableArcState {
match self.0.load(Ordering::Relaxed) {
0 => {
static_assertions::const_assert_eq!(0, PagableArcState::Unpinned as usize);
PagableArcState::Unpinned
}
1 => {
static_assertions::const_assert_eq!(1, PagableArcState::Pinned as usize);
PagableArcState::Pinned
}
_ => unreachable!(),
}
}
fn set_pinned(&self) -> bool {
self.0
.compare_exchange(
PagableArcState::Unpinned as usize,
PagableArcState::Pinned as usize,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
}
fn set_unpinned(&self) -> bool {
self.0
.compare_exchange(
PagableArcState::Pinned as usize,
PagableArcState::Unpinned as usize,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
}
}
#[derive(Eq, PartialEq)]
enum PagableArcState {
Unpinned,
Pinned,
}
impl From<PagableArcState> for PagableArcStateHolder {
fn from(state: PagableArcState) -> Self {
Self(AtomicUsize::new(state as usize))
}
}
impl<T: Pagable + std::fmt::Debug> std::fmt::Debug for PagableArc<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PagableArc")
.field("state", &self.state.0)
.field("pointer", &self.pointer)
.finish()
}
}
impl<T: Pagable> Clone for PagableArc<T> {
fn clone(&self) -> Self {
match self.state.get() {
PagableArcState::Pinned => {
if self.pointer.try_alloc_pinned() {
Self {
state: PagableArcState::Pinned.into(),
pointer: self.pointer.clone(),
}
} else {
Self {
state: PagableArcState::Unpinned.into(),
pointer: self.pointer.clone(),
}
}
}
PagableArcState::Unpinned => Self {
state: PagableArcState::Unpinned.into(),
pointer: self.pointer.clone(),
},
}
}
}
impl<T: Pagable> Dupe for PagableArc<T> {}
#[allow(private_interfaces)]
#[derive(Debug, Clone_, Copy_)]
pub struct PinnedPagableArcBorrow<'a, T> {
inner: triomphe::ArcBorrow<'a, PagableArcInner<T>>,
}
impl<'a, T> Dupe for PinnedPagableArcBorrow<'a, T> {}
impl<'a, T: Pagable> PinnedPagableArcBorrow<'a, T> {
pub fn clone_arc(self) -> PinnedPagableArc<T> {
self.inner.add_pinned_unchecked();
PinnedPagableArc::new_from_ptr(self.inner.clone_arc())
}
pub fn get(&self) -> &T {
self.inner.pinned_access()
}
}
impl<T: Pagable> std::ops::Deref for PinnedPagableArcBorrow<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.inner.pinned_access()
}
}
impl<T: Pagable> PagableArc<T> {
pub fn new(v: T, storage: PagableStorageHandle) -> Self {
let pointer = triomphe::Arc::new(PagableArcInner::new(v, storage));
Self {
state: PagableArcStateHolder(AtomicUsize::new(1)),
pointer,
}
}
pub(crate) fn new_paged_out(key: &DataKey, storage: PagableStorageHandle) -> Self {
let pointer = triomphe::Arc::new(PagableArcInner::new_paged_out(key, storage));
Self {
state: PagableArcStateHolder(AtomicUsize::new(0)),
pointer,
}
}
pub fn ptr_eq(left: &Self, right: &Self) -> bool {
triomphe::Arc::ptr_eq(&left.pointer, &right.pointer)
}
#[cfg(any(feature = "tokio", test))]
pub fn pin_sync(&self) -> crate::Result<PinnedPagableArc<T>>
where
T: Pagable,
{
self.pointer.alloc_pinned_blocking()?;
if self.state.set_pinned() {
self.pointer.add_pinned_unchecked();
}
Ok(PinnedPagableArc::new_from_ptr(self.pointer.clone()))
}
pub async fn pin(&self) -> anyhow::Result<PinnedPagableArc<T>>
where
T: Pagable,
{
self.pointer.alloc_pinned_async().await?;
if self.state.set_pinned() {
self.pointer.add_pinned_unchecked();
}
Ok(PinnedPagableArc::new_from_ptr(self.pointer.clone()))
}
pub fn unpin(&self) {
if self.state.set_unpinned() {
PagableArcInner::release_pin(&self.pointer);
}
}
pub(crate) fn is_paged_out(&self) -> bool {
self.pointer.is_paged_out()
}
pub(crate) fn get_data_key(&self) -> OptionalDataKey {
self.pointer.get_data_key()
}
pub(crate) fn set_data_key(&self, key: DataKey) {
self.pointer.set_data_key(key);
}
pub(crate) fn pinned_count(&self) -> usize {
self.pointer.pinned_count()
}
pub(crate) fn is_pinned(&self) -> bool {
if self.state.get() == PagableArcState::Pinned {
assert!(self.pointer.is_pinned());
true
} else {
false
}
}
}
impl<T: Pagable> Drop for PagableArc<T> {
fn drop(&mut self) {
match self.state.get() {
PagableArcState::Pinned => PagableArcInner::release_pin(&self.pointer),
PagableArcState::Unpinned => {}
}
}
}
pub struct PinnedPagableArc<T: Pagable> {
pointer: triomphe::Arc<PagableArcInner<T>>,
}
impl<T: Pagable + std::fmt::Debug> std::fmt::Debug for PinnedPagableArc<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Arc")
.field("pointer", &self.pointer)
.finish()
}
}
unsafe impl<T: Pagable> Send for PinnedPagableArc<T> {}
unsafe impl<T: Pagable> Sync for PinnedPagableArc<T> {}
impl<T: Pagable> Allocative for PinnedPagableArc<T> {
fn visit<'a, 'b: 'a>(&self, _visitor: &'a mut Visitor<'b>) {}
}
impl<T: Pagable + Eq + PartialEq> Eq for PinnedPagableArc<T> {}
impl<T: Pagable + PartialEq> PartialEq for PinnedPagableArc<T> {
fn eq(&self, other: &Self) -> bool {
triomphe::Arc::ptr_eq(&self.pointer, &other.pointer)
|| self.pointer.pinned_access() == other.pointer.pinned_access()
}
}
impl<T: Pagable> PinnedPagableArc<T> {
pub fn new(v: T, storage: PagableStorageHandle) -> Self {
let pointer = triomphe::Arc::new(PagableArcInner::new(v, storage));
Self { pointer }
}
pub fn borrow(v: &PinnedPagableArc<T>) -> PinnedPagableArcBorrow<'_, T> {
PinnedPagableArcBorrow {
inner: triomphe::Arc::borrow_arc(&v.pointer),
}
}
fn new_from_ptr(pointer: triomphe::Arc<PagableArcInner<T>>) -> Self {
Self { pointer }
}
pub fn ptr_eq(left: &Self, right: &Self) -> bool {
triomphe::Arc::ptr_eq(&left.pointer, &right.pointer)
}
pub fn into_pagable(self) -> PagableArc<T>
where
T: Pagable,
{
self.pointer.add_pinned_unchecked();
PagableArc {
state: PagableArcState::Pinned.into(),
pointer: self.pointer.clone(),
}
}
}
impl<T: Pagable> Drop for PinnedPagableArc<T> {
fn drop(&mut self) {
PagableArcInner::release_pin(&self.pointer);
}
}
impl<T: Pagable> Clone for PinnedPagableArc<T> {
fn clone(&self) -> Self {
self.pointer.add_pinned_unchecked();
Self::new_from_ptr(self.pointer.clone())
}
}
impl<T: Pagable> Dupe for PinnedPagableArc<T> {}
impl<T: Pagable> Deref for PinnedPagableArc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.pointer.pinned_access()
}
}
struct PagableArcInner<T> {
pinned_count: AtomicUsize,
lock: Mutex<()>,
data: UnsafeCell<PagableArcInnerData<T>>,
storage: PagableStorageHandle,
}
unsafe impl<T> Sync for PagableArcInner<T> {}
impl<T: std::fmt::Debug> std::fmt::Debug for PagableArcInner<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let _lock = self.lock.lock();
f.debug_struct("PagableArcInner")
.field("pinned", &self.pinned_count)
.field("value", unsafe { &*self.data.get() })
.finish()
}
}
#[derive(Debug)]
struct PagableArcInnerData<T> {
key: OptionalDataKey,
value: PagableArcInnerState<T>,
}
impl<T> PagableArcInnerData<T> {
fn new_paged_out(key: &DataKey) -> Self {
Self {
key: (*key).into(),
value: PagableArcInnerState::PagedOut,
}
}
fn new_pinned(value: std::sync::Arc<T>) -> Self {
Self {
key: OptionalDataKey::None,
value: PagableArcInnerState::Pinned(value),
}
}
fn unpin(&mut self) -> bool {
if self.key.is_some() {
self.value = PagableArcInnerState::PagedOut;
false
} else {
take_mut::take(&mut self.value, |v| {
PagableArcInnerState::Unpinned(v.unwrap_into_ready())
});
true
}
}
fn try_pin(&mut self) -> bool {
take_mut::take(&mut self.value, |v| match v {
PagableArcInnerState::Pinned(v) | PagableArcInnerState::Unpinned(v) => {
PagableArcInnerState::Pinned(v)
}
PagableArcInnerState::PagedOut => PagableArcInnerState::PagedOut,
});
matches!(self.value, PagableArcInnerState::Pinned(_))
}
fn pin(&mut self, data: std::sync::Arc<T>) {
take_mut::take(&mut self.value, |v| {
PagableArcInnerState::Pinned(match v {
PagableArcInnerState::Pinned(v) | PagableArcInnerState::Unpinned(v) => v,
PagableArcInnerState::PagedOut => data,
})
})
}
}
#[derive(Debug)]
enum PagableArcInnerState<T> {
Pinned(std::sync::Arc<T>),
Unpinned(std::sync::Arc<T>),
PagedOut,
}
impl<T> PagableArcInnerState<T> {
fn unwrap_ready(&self) -> &T {
match self {
PagableArcInnerState::Pinned(t) => t,
PagableArcInnerState::Unpinned(_) => panic!("Unpinned state is not ready"),
PagableArcInnerState::PagedOut => panic!("PagedOut state is not ready"),
}
}
fn unwrap_into_ready(self) -> std::sync::Arc<T> {
match self {
PagableArcInnerState::Pinned(t) => t,
PagableArcInnerState::Unpinned(_) => panic!("Unpinned state is not ready"),
PagableArcInnerState::PagedOut => panic!("PagedOut state is not ready"),
}
}
}
#[cfg(target_pointer_width = "64")]
static_assertions::assert_eq_size!(PagableArcInner<[usize; 4]>, [usize; 8]);
#[cfg(target_pointer_width = "32")]
static_assertions::assert_eq_size!(PagableArcInner<[usize; 4]>, [usize; 12]);
impl<T: Pagable> PagableArcInner<T> {
pub fn new_paged_out(key: &DataKey, storage: PagableStorageHandle) -> Self {
Self {
pinned_count: AtomicUsize::new(0),
lock: Mutex::new(()),
data: UnsafeCell::new(PagableArcInnerData::new_paged_out(key)),
storage,
}
}
fn new(value: T, storage: PagableStorageHandle) -> Self {
Self {
pinned_count: AtomicUsize::new(1),
lock: Mutex::new(()),
data: UnsafeCell::new(PagableArcInnerData::new_pinned(std::sync::Arc::new(value))),
storage,
}
}
fn pinned_access(&self) -> &T {
unsafe { &*self.data.get() }.value.unwrap_ready()
}
fn add_pinned_unchecked(&self) {
let s = self.pinned_count.fetch_add(1, Ordering::Relaxed);
assert!(s >= 1);
}
#[allow(clippy::manual_async_fn)]
fn alloc_pinned_async<'a>(&'a self) -> impl Future<Output = anyhow::Result<()>> + Send + 'a
where
T: Pagable,
{
async move {
if self.try_alloc_pinned() {
return Ok(());
}
let lock = self.lock.lock();
let data = unsafe { &mut *self.data.get() };
if !data.try_pin() {
let key = data.key.unwrap();
drop((data, lock));
let value: std::sync::Arc<T> =
std::sync::Arc::new(self.storage.deserialize_pagable_data(&key).await?);
let _lock = self.lock.lock();
let data = unsafe { &mut *self.data.get() };
data.pin(value);
}
self.pinned_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}
#[cfg(any(feature = "tokio", test))]
fn alloc_pinned_blocking(&self) -> anyhow::Result<()>
where
T: Pagable,
{
if self.try_alloc_pinned() {
return Ok(());
}
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async move { self.alloc_pinned_async().await })
})
}
fn try_alloc_pinned(&self) -> bool {
if self
.pinned_count
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
if v == 0 { None } else { Some(v + 1) }
})
.is_ok()
{
return true;
}
{
let _lock = self.lock.lock();
let data = unsafe { &mut *self.data.get() };
if data.try_pin() {
self.pinned_count.fetch_add(1, Ordering::Relaxed);
return true;
}
}
false
}
fn release_pin(ptr: &triomphe::Arc<Self>) {
if ptr.pinned_count.fetch_sub(1, Ordering::Relaxed) == 1 {
let _lock = ptr.lock.lock();
if ptr.pinned_count.load(Ordering::Relaxed) == 0 {
let data: &mut _ = unsafe { &mut *ptr.data.get() };
if data.unpin() {
let d = PagableArc {
state: PagableArcState::Unpinned.into(),
pointer: ptr.clone(),
};
ptr.storage.schedule_for_paging(d);
}
}
}
}
pub(crate) fn is_paged_out(&self) -> bool {
let _lock = self.lock.lock();
match unsafe { &*self.data.get() }.value {
PagableArcInnerState::PagedOut => true,
PagableArcInnerState::Pinned(_) | PagableArcInnerState::Unpinned(_) => false,
}
}
pub(crate) fn get_data_key(&self) -> OptionalDataKey {
let _lock = self.lock.lock();
unsafe { &*self.data.get() }.key
}
pub(crate) fn set_data_key(&self, key: DataKey) {
let _lock = self.lock.lock();
let mut data = unsafe { &mut *self.data.get() };
data.key = key.into();
take_mut::take(&mut data.value, |v| match v {
PagableArcInnerState::Unpinned(_) | PagableArcInnerState::PagedOut => {
PagableArcInnerState::PagedOut
}
v => v,
});
}
pub(crate) fn pinned_count(&self) -> usize {
self.pinned_count.load(Ordering::Relaxed)
}
pub(crate) fn is_pinned(&self) -> bool {
if self.pinned_count.load(Ordering::Relaxed) > 0 {
let _lock = self.lock.lock();
if self.pinned_count.load(Ordering::Relaxed) > 0 {
assert!(matches!(
unsafe { &*self.data.get() }.value,
PagableArcInnerState::Pinned(_)
));
return true;
}
}
false
}
}
impl<T: Pagable> PagableSerialize for PagableArc<T> {
fn pagable_serialize(&self, serializer: &mut dyn PagableSerializer) -> anyhow::Result<()> {
serializer.serialize_arc(self)
}
}
impl<'de, T: Pagable> PagableDeserialize<'de> for PagableArc<T> {
fn pagable_deserialize<D: PagableDeserializer<'de> + ?Sized>(
deserializer: &mut D,
) -> crate::Result<Self> {
deserialize_arc::<Self, _>(deserializer)
}
}
impl<T: Pagable> ArcErase for PagableArc<T> {
type Weak = ();
fn dupe_strong(&self) -> Self {
self.dupe()
}
fn upgrade_weak(weak: &Self::Weak) -> Option<Self> {
None
}
fn erase_type() -> impl ArcEraseType {
StdArcEraseType::<Self>::new()
}
fn identity(&self) -> usize {
self.pointer.as_ptr() as usize
}
fn downgrade(&self) -> Option<Self::Weak> {
None
}
fn set_data_key(&self, k: DataKey) {
self.set_data_key(k);
}
fn needs_paging_out(&self) -> bool {
self.get_data_key().is_none()
}
fn serialize_inner(&self, ser: &mut dyn PagableSerializer) -> anyhow::Result<()> {
#[cfg(any(feature = "tokio", test))]
{
let strong = self.pin_sync()?;
<T as PagableSerialize>::pagable_serialize(&strong, ser)
}
#[cfg(not(any(feature = "tokio", test)))]
{
Err(anyhow::anyhow!(
"Cannot serialize PagableArc without tokio feature"
))
}
}
fn deserialize_inner<'de, D: PagableDeserializer<'de> + ?Sized>(
deser: &mut D,
) -> anyhow::Result<Self> {
Ok(Self::new(
<T as PagableDeserialize>::pagable_deserialize(deser)?,
deser.storage().dupe(),
))
}
}
impl<T: Pagable> PagableSerialize for PinnedPagableArc<T> {
fn pagable_serialize(&self, serializer: &mut dyn PagableSerializer) -> anyhow::Result<()> {
serializer.serialize_arc(self)
}
}
impl<'de, T: Pagable> PagableDeserialize<'de> for PinnedPagableArc<T> {
fn pagable_deserialize<D: PagableDeserializer<'de> + ?Sized>(
deserializer: &mut D,
) -> crate::Result<Self> {
deserialize_arc::<Self, _>(deserializer)
}
}
impl<T: Pagable> ArcErase for PinnedPagableArc<T> {
type Weak = ();
fn dupe_strong(&self) -> Self {
self.dupe()
}
fn upgrade_weak(weak: &Self::Weak) -> Option<Self> {
None
}
fn erase_type() -> impl ArcEraseType {
StdArcEraseType::<Self>::new()
}
fn identity(&self) -> usize {
self.pointer.as_ptr() as usize
}
fn downgrade(&self) -> Option<Self::Weak> {
None
}
fn serialize_inner(&self, ser: &mut dyn PagableSerializer) -> anyhow::Result<()> {
<T as PagableSerialize>::pagable_serialize(self, ser)
}
fn deserialize_inner<'de, D: PagableDeserializer<'de> + ?Sized>(
deser: &mut D,
) -> anyhow::Result<Self> {
Ok(Self::new(
<T as PagableDeserialize>::pagable_deserialize(deser)?,
deser.storage().dupe(),
))
}
}