use std::marker::PhantomData;
use std::collections::VecDeque;
use std::io::Result;
use std::sync::{Arc,
atomic::{AtomicU64, AtomicUsize, Ordering}};
use futures::future::BoxFuture;
use async_lock::{Mutex, MutexGuard};
use bytes::BufMut;
use pi_async_rt::{lock::spin_lock::SpinLock,
rt::multi_thread::MultiTaskRuntime};
use crate::vpm::{VirtualPageWriteDelta, VirtualPageBuf};
pub const PAGE_INITED: u8 = 0;
pub const PAGE_FLUSHING: u8 = 1;
pub const PAGE_SYNCING: u8 = 2;
pub const PAGE_SYNCED: u8 = 3;
pub const PAGE_COLLECTING: u8 = 4;
pub trait VirtualPageCachingStrategy<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
>: Clone + Send + Sync + 'static {
type Iter: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static;
fn is_full(&self) -> bool;
fn contains(&self, page_id: &u128) -> bool;
fn len(&self) -> usize;
fn size(&self) -> u64;
fn adjust_size(&self, page_id: &u128, size: isize);
fn get(&self, page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>>;
fn load(&self,
page_id: u128,
loading: BoxFuture<'static, Result<PageBuffer<C, O, B, D, P>>>,
loaded: Box<dyn Fn(&PageBuffer<C, O, B, D, P>) + Send + 'static>)
-> BoxFuture<'static, Result<Arc<PageBuffer<C, O, B, D, P>>>>;
fn insert(&self, page_id: u128, buffer: PageBuffer<C, O, B, D, P>)
-> Option<Arc<PageBuffer<C, O, B, D, P>>>;
fn remove(&self, page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>>;
fn clear(&self) -> usize;
fn iter(&self) -> Self::Iter;
}
pub struct VirtualPageBufferPool<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
>(Arc<InnerVirtualPageBufPool<C, O, B, D, P, I, M>>);
unsafe impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> Send for VirtualPageBufferPool<C, O, B, D, P, I, M> {}
unsafe impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> Sync for VirtualPageBufferPool<C, O, B, D, P, I, M> {}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> Clone for VirtualPageBufferPool<C, O, B, D, P, I, M> {
fn clone(&self) -> Self {
VirtualPageBufferPool(self.0.clone())
}
}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> VirtualPageBufferPool<C, O, B, D, P, I, M> {
pub fn new(rt: MultiTaskRuntime<()>,
cache: M,
limit: usize) -> Self {
let inner = InnerVirtualPageBufPool {
rt,
cache,
limit,
marker: PhantomData,
};
VirtualPageBufferPool(Arc::new(inner))
}
pub fn contains(&self, page_id: &u128) -> bool {
self.0.cache.contains(page_id)
}
pub fn is_dirty(&self, page_id: &u128) -> bool {
self.deltas_len(page_id) > 0
}
pub fn get_limit(&self) -> usize {
self.0.limit
}
pub fn len(&self) -> usize {
self.0.cache.len()
}
pub fn size(&self) -> u64 {
self.0.cache.size()
}
pub fn deltas_len(&self, page_id: &u128) -> usize {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.deltas.lock().len()
} else {
0
}
}
pub fn deltas_size(&self, page_id: &u128) -> usize {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.current.load(Ordering::Relaxed)
} else {
0
}
}
pub fn deltas_size_limit(&self, page_id: &u128) -> usize {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.limit
} else {
0
}
}
pub fn dirty_expired(&self, page_id: &u128) -> u64 {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.dirty_expired.load(Ordering::Relaxed)
} else {
0
}
}
pub fn page_expired(&self, page_id: &u128) -> u64 {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.expired.load(Ordering::Relaxed)
} else {
0
}
}
pub fn adjust_page_buffer_size(&self, page_id: &u128, size: isize) {
self.0.cache.adjust_size(page_id, size);
}
#[inline]
pub fn get_page_buffer(&self, page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>> {
self.0.cache.get(page_id)
}
pub fn join_page(&self,
page_id: u128,
buffer: PageBuffer<C, O, B, D, P>) {
self.0.cache.insert(page_id, buffer);
}
pub fn read_base_page(&self,
page_id: &u128,
expired: u64) -> Option<O> {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.expired.store(expired, Ordering::Relaxed);
Some(buf
.page
.lock()
.read_page())
} else {
None
}
}
pub fn join_delta(&self,
page_id: &u128,
delta: D,
dirty_expired: u64,
expired: u64) -> bool {
if let Some(buf) = self.get_page_buffer(page_id) {
buf.deltas.lock().push_back(delta);
buf.dirty_expired.store(dirty_expired, Ordering::Relaxed);
buf.expired.store(expired, Ordering::Relaxed);
true
} else {
false
}
}
pub fn remove_page(&self, page_id: &u128) -> Option<Arc<PageBuffer<C, O, B, D, P>>> {
self.0.cache.remove(page_id)
}
pub fn iter(&self) -> I {
self.0.cache.iter()
}
}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> VirtualPageBufferPool<C, O, B, D, P, I, M> {
#[inline]
pub async fn load_page_buffer(&self,
page_id: u128,
loading: BoxFuture<'static, Result<PageBuffer<C, O, B, D, P>>>,
loaded: Box<dyn Fn(&PageBuffer<C, O, B, D, P>) + Send + 'static>)
-> Result<Arc<PageBuffer<C, O, B, D, P>>> {
self
.0
.cache
.load(page_id, loading, loaded)
.await
}
}
struct InnerVirtualPageBufPool<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
I: Iterator<Item = Arc<PageBuffer<C, O, B, D, P>>> + Send + 'static,
M: VirtualPageCachingStrategy<C, O, B, D, P, Iter = I>,
> {
rt: MultiTaskRuntime<()>, cache: M, limit: usize, marker: PhantomData<(C, O, B, D, P, I, M)>,
}
pub struct PageBuffer<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> {
page: SpinLock<P>, deltas: SpinLock<VecDeque<D>>, status: Mutex<u8>, current: AtomicUsize, limit: usize, dirty_expired: AtomicU64, expired: AtomicU64, }
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> PageBuffer<C, O, B, D, P> {
pub fn new(page: P, limit: usize) -> Self {
PageBuffer {
page: SpinLock::new(page),
deltas: SpinLock::new(VecDeque::new()),
status: Mutex::new(PAGE_INITED),
current: AtomicUsize::new(0),
limit,
dirty_expired: AtomicU64::new(0),
expired: AtomicU64::new(0),
}
}
#[inline]
pub fn base_page(&self) -> &SpinLock<P> {
&self.page
}
#[inline]
pub fn set_base_page(&self, new_base_page: P) {
*self.page.lock() = new_base_page;
}
#[inline]
pub fn copy_base_page(&self) -> P {
self
.page
.lock()
.clone()
}
#[inline]
pub fn deltas_len(&self) -> usize {
self.deltas.lock().len()
}
#[inline]
pub fn deltas_size(&self) -> usize {
self.current.load(Ordering::Relaxed)
}
#[inline]
pub fn deltas_limit(&self) -> usize {
self.limit
}
#[inline]
pub fn get_deltas(&self) -> &SpinLock<VecDeque<D>> {
&self.deltas
}
#[inline]
pub fn append_delta(&self, delta: D) {
self
.current
.fetch_add(delta.size(), Ordering::Relaxed);
self
.deltas
.lock()
.push_back(delta);
}
#[inline]
pub fn buf_size(&self) -> usize {
self.page.lock().page_size() + self.deltas_size()
}
#[inline]
pub fn get_dirty_expired(&self) -> u64 {
self.dirty_expired.load(Ordering::Relaxed)
}
#[inline]
pub fn set_dirty_expired(&self, dirty_expired: u64) {
self.dirty_expired.store(dirty_expired, Ordering::Relaxed);
}
#[inline]
pub fn get_expired(&self) -> u64 {
self.expired.load(Ordering::Relaxed)
}
pub fn set_expired(&self, expired: u64) {
self.expired.store(expired, Ordering::Relaxed);
}
}
impl<
C: Send + 'static,
O: Send + 'static,
B: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static,
D: VirtualPageWriteDelta<Content = C>,
P: VirtualPageBuf<Content = C, Delta = D, Bin = B, Output = O>,
> PageBuffer<C, O, B, D, P> {
#[inline]
pub async fn get_status(&self) -> u8 {
*self.status.lock().await
}
#[inline]
pub async fn start_flush<'a>(&'a self) -> PageBufferStatusGuard<'a> {
let mut locked = self.status.lock().await;
match *locked {
PAGE_INITED => {
*locked = PAGE_FLUSHING;
PageBufferStatusGuard(locked)
},
current => {
panic!("Start flush failed, current: {}, reason: invalid status", current);
},
}
}
#[inline]
pub async fn start_sync<'a>(&'a self) -> PageBufferStatusGuard<'a> {
let mut locked = self.status.lock().await;
match *locked {
PAGE_INITED => {
*locked = PAGE_SYNCING;
PageBufferStatusGuard(locked)
},
current => {
panic!("Start sync failed, current: {}, reason: invalid status", current);
},
}
}
}
pub struct PageBufferStatusGuard<'a>(MutexGuard<'a, u8>);
impl<'a> Drop for PageBufferStatusGuard<'a> {
fn drop(&mut self) {
*self.0 = PAGE_INITED;
}
}