use std::sync::Arc;
use std::ops::Deref;
use std::marker::PhantomData;
use std::io::{Error, Result, ErrorKind};
use futures::future::{FutureExt, BoxFuture};
use bytes::{Buf, BufMut};
use log::warn;
use pi_async_rt::{lock::spin_lock::SpinLock,
rt::{AsyncTaskPoolExt, AsyncTaskPool, AsyncRuntime,
multi_thread::MultiTaskRuntime}};
use pi_assets::{asset::{Asset, Size, Garbageer, GarbageGuard},
mgr::{AssetMgr, LoadResult},
allocator::Allocator};
use pi_share::Share;
use pi_hash::XHashMap;
use crate::vpm::{VirtualPageWriteDelta, VirtualPageBuf, PageId,
page_pool::{VirtualPageCachingStrategy, PageBuffer}};
// Process-wide state of the virtual page LFU cache. The two pointer slots hold
// type-erased raw pointers stored as `usize`; readers cast them back using their
// own generic parameters, so those parameters must match the ones used at init.
lazy_static! {
// Address of a leaked `Box<SharedPageBufferReleaseCallback<..>>`, written once by
// `init_global_virtual_page_lfu_cache_allocator`.
static ref SHARED_PAGE_BUFFER_RELEASE_CALLBACK: SpinLock<Option<usize>> = SpinLock::new(None);
// Set to `true` once the global cache has been initialized.
static ref IS_INITED: SpinLock<bool> = SpinLock::new(false);
// Address obtained from `Arc::into_raw` on the shared `AssetMgr`, written once at init.
static ref GLOBAL_VIRTUAL_PAGE_LFU_CACHE_MGR: SpinLock<Option<usize>> = SpinLock::new(None);
// The allocator the manager is registered with; `startup_auto_collect` takes it out of this slot.
static ref GLOBAL_VIRTUAL_PAGE_LFU_CACHE_ALLOCATOR: SpinLock<Option<Allocator>> = SpinLock::new(None);
}
/// Initializes the process-wide virtual page LFU cache, at most once.
///
/// Creates an `Allocator` with `capacity`, a release callback bound to `rt`, and an
/// `AssetMgr` built from that callback, `capacity` and `timeout`. The callback and the
/// manager are published through the global pointer slots, the manager is registered
/// with the allocator using `min_capacity` / `max_capacity`, and the allocator is
/// stored globally.
///
/// Returns `true` if this call performed the initialization, `false` if the cache
/// had already been initialized.
pub fn init_global_virtual_page_lfu_cache_allocator<C, O, B, D, P>(rt: MultiTaskRuntime<()>,
                                                                   capacity: usize,
                                                                   min_capacity: usize,
                                                                   max_capacity: usize,
                                                                   timeout: usize) -> bool
    where C: Send + 'static,
          O: Send + 'static,
          B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
          D: VirtualPageWriteDelta<Content = C>,
          P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O> {
    // The flag's lock is held for the whole function so initialization runs once.
    let mut inited = IS_INITED.lock();
    if *inited {
        return false;
    }

    let mut allocator = Allocator::new(capacity);

    // Publish a leaked clone of the callback; the original goes to the manager.
    let release_callback = SharedPageBufferReleaseCallback::<C, O, B, D, P>::new(rt);
    let callback_addr = Box::into_raw(Box::new(release_callback.clone())) as usize;
    *SHARED_PAGE_BUFFER_RELEASE_CALLBACK.lock() = Some(callback_addr);

    // Publish one leaked strong reference to the manager.
    let mgr = AssetMgr::new(release_callback, true, capacity, timeout);
    let mgr_addr = Arc::into_raw(mgr.clone()) as usize;
    *GLOBAL_VIRTUAL_PAGE_LFU_CACHE_MGR.lock() = Some(mgr_addr);

    allocator.register(mgr, min_capacity, max_capacity);
    *GLOBAL_VIRTUAL_PAGE_LFU_CACHE_ALLOCATOR.lock() = Some(allocator);

    *inited = true;
    true
}
/// Starts automatic collection on the global allocator.
///
/// Returns `false` if the global cache has not been initialized. Otherwise the
/// allocator is taken out of its global slot (if it is still there) and
/// `auto_collect(rt, interval)` is called on it, and `true` is returned.
/// `true` is also returned when the slot was already empty.
pub fn startup_auto_collect<P, RT>(rt: RT,
                                   interval: usize) -> bool
    where P: AsyncTaskPoolExt<()> + AsyncTaskPool<(), Pool = P>,
          RT: AsyncRuntime<(), Pool = P>, {
    // Hold the init flag's lock for the duration of the call.
    let inited = IS_INITED.lock();
    if *inited {
        if let Some(allocator) = GLOBAL_VIRTUAL_PAGE_LFU_CACHE_ALLOCATOR.lock().take() {
            allocator.auto_collect(rt, interval);
        }
        true
    } else {
        false
    }
}
/// Registers `handler` as the release handler for `uid` on the global release callback.
///
/// Returns the handler previously registered for `uid`, if any.
///
/// The global callback pointer is type-erased, so `C, O, B, D, P` must be the same
/// types the global cache was initialized with.
///
/// # Panics
///
/// Panics if the global release callback has not been initialized.
pub fn register_release_handler<C, O, B, D, P>(uid: u32,
                                               handler: Arc<dyn SharedPageRelease<C, O, B, D, P>>) -> Option<Arc<dyn SharedPageRelease<C, O, B, D, P>>>
    where C: Send + 'static,
          O: Send + 'static,
          B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
          D: VirtualPageWriteDelta<Content = C>,
          P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O> {
    if let Some(ptr) = *SHARED_PAGE_BUFFER_RELEASE_CALLBACK.lock() {
        // Borrow the leaked callback instead of rebuilding an owning `Box`: a
        // temporary `Box` would free the global callback if anything unwound before
        // it was leaked again, and would alias other concurrent users of the pointer.
        //
        // SAFETY: `ptr` was produced by `Box::into_raw` during initialization and is
        // never freed, so it is valid for shared access. The caller must use the same
        // type parameters as initialization did.
        let callback: &SharedPageBufferReleaseCallback<C, O, B, D, P> = unsafe {
            &*(ptr as *const SharedPageBufferReleaseCallback<C, O, B, D, P>)
        };
        let previous = callback.register_handler(uid, handler);
        previous
    } else {
        panic!("Register release handler failed, uid: {}, reason: handler not exist", uid);
    }
}
/// Per-owner handler invoked when the cache is about to discard a shared page buffer.
///
/// Handlers are stored in `SharedPageBufferReleaseCallback` keyed by a `u32` uid and
/// are looked up through `PageId::owner_uid()` of the page being released.
pub trait SharedPageRelease<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
>: Send + Sync + 'static {
/// Releases the page `page_id`, given its `buffer` and the garbage `guard` handed
/// over by the asset manager. The garbage callback in this file only calls this for
/// buffers whose `deltas_len()` is greater than zero.
// NOTE(review): the guard presumably keeps the entry alive until it is dropped — confirm against pi_assets.
fn release(&self,
page_id: u128,
buffer: Arc<PageBuffer<C, O, B, D, P>>,
guard: GarbageGuard<SharedPageBuffer<C, O, B, D, P>>) -> BoxFuture<'static, ()>;
}
/// Newtype around `Arc<PageBuffer<..>>` used as the asset type stored in the `AssetMgr`.
// NOTE(review): `#[derive(Clone)]` adds `Clone` bounds on every type parameter. Where
// those bounds are not met, calls written as `(*x).clone()` appear to resolve through
// `Deref` to the inner `Arc`'s `clone` — confirm before changing the derive or call sites.
#[derive(Clone)]
pub struct SharedPageBuffer<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
>(Arc<PageBuffer<C, O, B, D, P>>);
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Deref for SharedPageBuffer<C, O, B, D, P> {
type Target = Arc<PageBuffer<C, O, B, D, P>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Makes `SharedPageBuffer` storable in an `AssetMgr`, keyed by the page's `u128` id.
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Asset for SharedPageBuffer<C, O, B, D, P> {
// Page ids are passed around as `u128` throughout this file.
type Key = u128;
}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Size for SharedPageBuffer<C, O, B, D, P> {
fn size(&self) -> usize {
self.0.buf_size()
}
}
/// Garbage callback installed on the cache's `AssetMgr`.
///
/// A cheaply clonable handle (`Arc` inside) to the runtime used for release tasks and
/// to the table of per-owner release handlers.
pub struct SharedPageBufferReleaseCallback<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
>(Arc<InnerSharedPageBufferReleaseCallback<C, O, B, D, P>>);
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Clone for SharedPageBufferReleaseCallback<C, O, B, D, P> {
fn clone(&self) -> Self {
SharedPageBufferReleaseCallback(self.0.clone())
}
}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Garbageer<SharedPageBuffer<C, O, B, D, P>> for SharedPageBufferReleaseCallback<C, O, B, D, P> {
fn garbage_ref(&self,
k: &u128,
v: &SharedPageBuffer<C, O, B, D, P>,
_timeout: u64,
guard: GarbageGuard<SharedPageBuffer<C, O, B, D, P>>) {
let uid = *k;
let buffer = (*v).clone();
if let Some(ptr) = *SHARED_PAGE_BUFFER_RELEASE_CALLBACK.lock() {
let _ = self.0.rt.spawn(async move {
if buffer.deltas_len() > 0 {
let boxed = unsafe {
Box::from_raw(ptr as *mut SharedPageBufferReleaseCallback<C, O, B, D, P>)
};
let page_id = PageId::new(uid);
if let Some(handler) = boxed.get_handler(&page_id.owner_uid()) {
handler.release(uid, buffer, guard).await;
Box::into_raw(boxed); } else {
Box::into_raw(boxed); error!("Release shared page buffer failed, manager_id: {:?}, page_id: {:?}, reason: handler not exist",
page_id.owner_uid(),
page_id);
}
}
});
} else {
warn!("Garbage ref warning, page_id: {}, reason: release callback not exist",
uid);
}
}
}
/// Construction of the callback and access to its handler table.
impl<C, O, B, D, P> SharedPageBufferReleaseCallback<C, O, B, D, P>
where
    C: Send + 'static,
    O: Send + 'static,
    B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
    D: VirtualPageWriteDelta<Content = C>,
    P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
{
    /// Creates a callback bound to `rt` with an empty handler table.
    pub fn new(rt: MultiTaskRuntime<()>) -> Self {
        SharedPageBufferReleaseCallback(Arc::new(InnerSharedPageBufferReleaseCallback {
            rt,
            handlers: SpinLock::new(XHashMap::default()),
        }))
    }

    /// Returns a clone of the handler registered for `uid`, if there is one.
    pub fn get_handler(&self, uid: &u32) -> Option<Arc<dyn SharedPageRelease<C, O, B, D, P>>> {
        let table = self.0.handlers.lock();
        match table.get(uid) {
            Some(found) => Some(found.clone()),
            None => None,
        }
    }

    /// Registers `handler` for `uid` and returns the handler it replaced, if any.
    pub fn register_handler(&self,
                            uid: u32,
                            handler: Arc<dyn SharedPageRelease<C, O, B, D, P>>) -> Option<Arc<dyn SharedPageRelease<C, O, B, D, P>>> {
        let mut table = self.0.handlers.lock();
        let replaced = table.insert(uid, handler);
        replaced
    }
}
/// Shared state behind `SharedPageBufferReleaseCallback`.
struct InnerSharedPageBufferReleaseCallback<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> {
// `rt`: runtime on which release tasks are spawned.
// `handlers`: release handlers keyed by `u32` uid, guarded by a spin lock.
rt: MultiTaskRuntime<()>, handlers: SpinLock<XHashMap<u32, Arc<dyn SharedPageRelease<C, O, B, D, P>>>>, }
/// Handle to the page cache backed by the global `AssetMgr`.
///
/// Cloning shares the same inner state (`Arc`).
pub struct VirtualPageLFUCache<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
>(Arc<InnerVirtualPageLFUCache<C, O, B, D, P>>);
// Manually asserts that the cache handle may be moved across threads.
// NOTE(review): the handle only holds an `Arc` to the shared `AssetMgr`; soundness
// depends on that manager being safe to use from several threads, which this file
// does not show — confirm.
unsafe impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Send for VirtualPageLFUCache<C, O, B, D, P> {}
// Manually asserts that the cache handle may be shared by reference across threads.
// NOTE(review): as with the `Send` impl, this relies on the wrapped `AssetMgr` being
// thread-safe; nothing in this file establishes that — confirm.
unsafe impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Sync for VirtualPageLFUCache<C, O, B, D, P> {}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Clone for VirtualPageLFUCache<C, O, B, D, P> {
fn clone(&self) -> Self {
VirtualPageLFUCache(self.0.clone())
}
}
/// Caching strategy implementation that delegates to the shared `AssetMgr`.
impl<C, O, B, D, P> VirtualPageCachingStrategy<C, O, B, D, P> for VirtualPageLFUCache<C, O, B, D, P>
where
    C: Send + 'static,
    O: Send + 'static,
    B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
    D: VirtualPageWriteDelta<Content = C>,
    P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
{
    type Iter = VirtualPageLFUCacheDirtyIterator<C, O, B, D, P>;

    /// `true` when the manager's capacity is less than or equal to its current size.
    fn is_full(&self) -> bool {
        let mgr = &self.0.mgr;
        let capacity = mgr.get_capacity();
        let used = mgr.size();
        capacity <= used
    }

    /// Whether the manager currently holds `page_id`.
    fn contains(&self, page_id: &u128) -> bool {
        let mgr = &self.0.mgr;
        mgr.contains_key(page_id)
    }

    /// Number of entries reported by the manager.
    fn len(&self) -> usize {
        let mgr = &self.0.mgr;
        mgr.len()
    }

    /// Size reported by the manager, widened to `u64`.
    fn size(&self) -> u64 {
        let used = self.0.mgr.size();
        used as u64
    }

    /// Adjusts the accounted size of `page_id` by `size`; no-op if the page is not cached.
    fn adjust_size(&self, page_id: &u128, size: isize) {
        let found = self.0.mgr.get(page_id);
        if let Some(handle) = found {
            handle.adjust_size(size);
        }
    }

    /// Returns the cached buffer for `page_id`, if present.
    fn get(&self, page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>> {
        match self.0.mgr.get(page_id) {
            Some(handle) => Some((*handle).clone()),
            None => None,
        }
    }

    /// Loads `page_id` through the manager.
    ///
    /// * Already cached: the cached buffer is returned.
    /// * Another load is in flight: its outcome is awaited and passed through.
    /// * This caller is the loader: `loading` is awaited; on success `loaded` is called
    ///   with the buffer before it is handed to the manager, on failure the error is
    ///   forwarded to the manager's receiver and returned to the caller.
    fn load(&self,
            page_id: u128,
            loading: BoxFuture<'static, Result<PageBuffer<C, O, B, D, P>>>,
            loaded: Box<dyn Fn(&PageBuffer<C, O, B, D, P>) + Send + 'static>)
            -> BoxFuture<'static, Result<Arc<PageBuffer<C, O, B, D, P>>>>
    {
        let mgr = self.0.mgr.clone();
        async move {
            match AssetMgr::load(&mgr, &page_id) {
                LoadResult::Ok(handle) => Ok((*handle).clone()),
                LoadResult::Wait(wait) => {
                    // Every error kind is passed through unchanged.
                    match wait.await {
                        Ok(handle) => Ok((*handle).clone()),
                        Err(e) => Err(e),
                    }
                },
                LoadResult::Receiver(receiver) => {
                    match loading.await {
                        Ok(buffer) => {
                            loaded(&buffer);
                            match receiver.receive(page_id, Ok(SharedPageBuffer(Arc::new(buffer)))).await {
                                Ok(handle) => Ok((*handle).clone()),
                                Err(e) => Err(Error::new(ErrorKind::Other, format!("Async load page buffer failed, page_id: {:?}, reason: {:?} receive", page_id, e))),
                            }
                        },
                        Err(e) => {
                            // Only the message forwarded to the receiver depends on the kind.
                            let reason = if e.kind() == ErrorKind::NotFound {
                                format!("Load page buffer failed, page_id: {:?}, reason: {:?}", page_id, e)
                            } else {
                                format!("Async load page buffer failed, page_id: {:?}, reason: {:?}", page_id, e)
                            };
                            let _ = receiver.receive(page_id, Err(Error::new(e.kind(), reason))).await;
                            Err(e)
                        },
                    }
                },
            }
        }.boxed()
    }

    /// Inserts `buffer` under `page_id`; the manager's result is discarded and `None`
    /// is always returned.
    fn insert(&self, page_id: u128, buffer: PageBuffer<C, O, B, D, P>)
              -> Option<Arc<PageBuffer<C, O, B, D, P>>> {
        let shared = SharedPageBuffer(Arc::new(buffer));
        let _ = self.0.mgr.insert(page_id, shared);
        None
    }

    /// Not supported by this strategy: always returns `None` and removes nothing.
    fn remove(&self, _page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>> {
        None
    }

    /// Not supported by this strategy: always returns `0` and clears nothing.
    fn clear(&self) -> usize {
        0
    }

    /// Collects the keys of all cached pages with pending deltas and returns an
    /// iterator over them.
    fn iter(&self) -> Self::Iter {
        let mut dirty_keys: Vec<u128> = Vec::new();
        self.0.mgr.cache_iter(&mut dirty_keys,
                              |acc: &mut Vec<u128>, key: &u128, page: &SharedPageBuffer<C, O, B, D, P>, _time: u64| {
                                  if page.0.deltas_len() > 0 {
                                      acc.push(*key);
                                  }
                              });

        let mgr = self.0.mgr.clone();
        VirtualPageLFUCacheDirtyIterator {
            mgr,
            keys: dirty_keys,
            marker: PhantomData,
        }
    }
}
/// Construction and capacity helpers.
impl<C, O, B, D, P> VirtualPageLFUCache<C, O, B, D, P>
where
    C: Send + 'static,
    O: Send + 'static,
    B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
    D: VirtualPageWriteDelta<Content = C>,
    P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
{
    /// Creates a handle onto the global asset manager.
    ///
    /// The global pointer is type-erased, so the type parameters must be the ones the
    /// global cache was initialized with.
    ///
    /// # Panics
    ///
    /// Panics if the global cache manager has not been initialized.
    pub fn new() -> Self {
        match *GLOBAL_VIRTUAL_PAGE_LFU_CACHE_MGR.lock() {
            Some(ptr) => {
                // Temporarily rebuild the leaked `Arc`, take one more strong reference,
                // then leak the original again so the global pointer stays valid.
                let global = unsafe {
                    Arc::from_raw(ptr as *const AssetMgr<SharedPageBuffer<C, O, B, D, P>, SharedPageBufferReleaseCallback<C, O, B, D, P>>)
                };
                let shared = global.clone();
                let _ = Arc::into_raw(global);

                VirtualPageLFUCache(Arc::new(InnerVirtualPageLFUCache {
                    mgr: shared,
                }))
            },
            None => {
                panic!("Create VirtualPageLFUCache failed, reason: uninitialized global cache allocator");
            },
        }
    }

    /// `true` when the manager's size is at most 75% of its capacity.
    pub fn is_enough_empty(&self) -> bool {
        let used = self.0.mgr.size() as f64;
        let capacity = self.0.mgr.get_capacity() as f64;
        used / capacity <= 0.75
    }
}
/// Shared state behind `VirtualPageLFUCache`.
struct InnerVirtualPageLFUCache<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> {
// Shared handle to the asset manager that stores the cached page buffers.
mgr: Share<AssetMgr<SharedPageBuffer<C, O, B, D, P>, SharedPageBufferReleaseCallback<C, O, B, D, P>>>,
}
/// Iterator over cached page buffers, driven by a list of page keys captured up front.
///
/// Buffers are looked up in the manager lazily, one key at a time.
pub struct VirtualPageLFUCacheDirtyIterator<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> {
// Manager the keys are resolved against.
mgr: Share<AssetMgr<SharedPageBuffer<C, O, B, D, P>, SharedPageBufferReleaseCallback<C, O, B, D, P>>>,
// Keys still to be visited; consumed from the back.
keys: Vec<u128>,
// Ties the otherwise unused type parameters to the struct.
marker: PhantomData<(C, O, B, D, P)>,
}
// Manually asserts that the iterator may be moved to another thread.
// NOTE(review): `marker` is `PhantomData<(C, O, B, D, P)>`, which is presumably why the
// automatic `Send` does not apply; soundness still depends on the shared `AssetMgr`
// being thread-safe — confirm.
unsafe impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Send for VirtualPageLFUCacheDirtyIterator<C, O, B, D, P> {}
/// Yields the buffers of the captured keys that are still present in the manager.
impl<
    C: Send + 'static,
    O: Send + 'static,
    B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
    D: VirtualPageWriteDelta<Content = C>,
    P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> Iterator for VirtualPageLFUCacheDirtyIterator<C, O, B, D, P> {
    type Item = Arc<PageBuffer<C, O, B, D, P>>;

    /// Pops keys from the back of the key list until one resolves to a cached buffer.
    ///
    /// A key whose page is no longer in the manager is skipped. Returning `None` for
    /// it would end the iteration early and hide every remaining page from the caller.
    /// `None` is returned only once all keys have been consumed.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(key) = self.keys.pop() {
            if let Some(handle) = self.mgr.get(&key) {
                return Some((*handle).clone());
            }
        }
        None
    }
}