use std::ops::Deref;
use std::collections::VecDeque;
use std::io::{Error, Result as IOResult, ErrorKind};
use std::marker::PhantomData;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::sync::atomic::AtomicU64;
use async_channel::{Sender as AsyncSender, Receiver as AsyncReceiver, bounded as async_bounded};
use bytes::{Buf, BufMut};
use pi_async_rt::lock::spin_lock::SpinLock;
// Public submodules of this module. Their contents live in separate source
// files that are not visible from here.
pub mod page_manager;
pub mod page_pool;
pub mod page_table;
pub mod page_cache;
pub mod utils;
/// Raw id value that stands for "no page"; see `PageId::empty` and `PageId::is_empty`.
pub const EMPTY_PAGE: u128 = 0;
// Not referenced anywhere in this file.
// NOTE(review): presumably the device index used by the virtual page manager itself — confirm against the submodules.
const VIRTUAL_PAGE_MANAGER_DEVICES_INDEX: u32 = 0;
// Raw index value that stands for "no write index"; see `WriteIndex::empty` and `WriteIndex::is_empty`.
const EMPTY_WRITE_INDEX: u64 = 0;
// Tag byte that `VirtualPageEncodingType::Empty` converts to.
const DEFAULT_ENCODING_TAG: u8 = 0;
// Argument byte that `VirtualPageEncodingType::Empty` converts to.
const DEFAULT_ENCODING_ARG: u8 = 0;
/// A single pending write ("delta") that can be queued in a `VirtualPageWriteCmd`.
///
/// Within this file the command queue only uses the two page-id accessors
/// (to validate a delta before queueing it) and `set_cmd_index` (to stamp a
/// delta when it is popped). The remaining methods are for implementors and
/// callers that are not visible here.
pub trait VirtualPageWriteDelta: Send + 'static {
/// Payload type handed out by `inner`.
type Content: Send + Sized + 'static;
/// Size of this delta.
/// NOTE(review): the unit (bytes vs. entries) is not visible in this file — confirm.
fn size(&self) -> usize;
/// Returns the command index previously stored with `set_cmd_index`.
/// NOTE(review): the value before any `set_cmd_index` call is implementor-defined — confirm.
fn get_cmd_index(&self) -> u64;
/// Stores the index of the owning command; `VirtualPageWriteCmd` calls this
/// on every delta it pops from its queues.
fn set_cmd_index(&mut self, cmd_index: u64);
/// Id of the page this delta originates from.
fn get_origin_page_id(&self) -> PageId;
/// Id of the copied page associated with this delta.
/// NOTE(review): presumably the copy-on-write target of the origin page — confirm.
fn get_copied_page_id(&self) -> PageId;
/// Implementor-defined type code of this delta.
fn get_type(&self) -> usize;
/// Consumes the delta and returns its payload.
fn inner(self) -> Self::Content;
}
/// Buffer abstraction for one virtual page: it can be built from page ids,
/// read, updated with a delta, and converted to and from bytes.
///
/// NOTE(review): no implementor is visible in this file; the notes below are
/// read off the signatures only.
pub trait VirtualPageBuf: Clone + Send + Sync + 'static {
// Associated types, in declaration order:
//   `Content` — payload type shared with `Delta`;
//   `Delta`   — write delta accepted by `write_page_delta`;
//   `Output`  — value returned by `read_page`;
//   `Bin`     — byte buffer returned by `serialize_page`.
type Content: Send + 'static; type Delta: VirtualPageWriteDelta<Content = Self::Content>; type Output: Send + 'static; type Bin: BufMut + AsRef<[u8]> + AsMut<[u8]> + Clone + Send + Sync + 'static;
/// Builds a buffer for the given origin/copied page ids with an optional page type.
/// NOTE(review): the meaning of `None` for `page_type` is implementor-defined — confirm.
fn with_page_type(origin_page_id: PageId,
copied_page_id: PageId,
page_type: Option<usize>) -> Self;
/// Id of the original page this buffer was created for.
fn get_original_page_id(&self) -> PageId;
/// Id of the copied page this buffer was created for.
fn get_copied_page_id(&self) -> PageId;
/// NOTE(review): presumably reports that part of the page data is absent — confirm with an implementor.
fn is_missing_pages(&self) -> bool;
/// Implementor-defined type code of the page.
fn get_page_type(&self) -> usize;
/// Size of the page.
/// NOTE(review): unit not visible in this file, presumably bytes — confirm.
fn page_size(&self) -> usize;
/// Produces the readable view of the page.
fn read_page(&self) -> Self::Output;
/// Applies `delta` to the page; a failure is reported as a message string.
fn write_page_delta(&mut self, delta: Self::Delta) -> Result<(), String>;
/// Loads the page from a byte source. The method returns nothing, so
/// failures cannot be reported through the return value.
fn deserialize_page<Input>(&mut self, bin: Input)
where Input: AsRef<[u8]> + Send + Sized + 'static;
/// Consumes the buffer and returns its byte representation.
fn serialize_page(self) -> Self::Bin;
}
/// 128-bit identifier of a virtual page, stored as a raw `u128`.
///
/// The accessors defined on this type read three fields out of the raw
/// value: bits 96..=127 (owner uid), bits 64..=95 (device offset) and bits
/// 0..=63 (page uid).
///
/// The type is `Send` and `Sync` automatically because its only field is a
/// `u128`, so no hand-written `unsafe impl` is required.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct PageId(pub(crate) u128);
impl From<u128> for PageId {
fn from(src: u128) -> Self {
PageId::new(src)
}
}
impl From<PageId> for u128 {
fn from(src: PageId) -> Self {
src.0
}
}
impl Deref for PageId {
type Target = u128;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<u128> for PageId {
fn as_ref(&self) -> &u128 {
&self.0
}
}
impl PageId {
#[inline]
pub(crate) fn new(id: u128) -> Self {
PageId(id)
}
#[inline]
pub fn empty() -> Self {
PageId(EMPTY_PAGE)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0 == EMPTY_PAGE
}
#[inline]
pub fn is_reserved(&self) -> bool {
self.owner_uid() == 0
&& self.device_offset() == 0
}
#[inline]
pub fn is_internal(&self) -> bool {
self.owner_uid() > 0
&& self.device_offset() == 0
}
#[inline]
pub fn is_normal(&self) -> bool {
self.owner_uid() > 0
&& self.device_offset() > 0
}
#[inline]
pub fn owner_uid(&self) -> u32 {
(self.0 >> 96) as u32
}
#[inline]
pub fn device_offset(&self) -> u32 {
(self.0 >> 64) as u32
}
#[inline]
pub fn page_uid(&self) -> u64 {
self.0 as u64
}
}
/// Handle to a write command: a reference-counted pointer to the shared
/// `InnerVirtualPageWriteCmd` state, so every clone of the handle refers to
/// the same queues, counters and channel.
pub struct VirtualPageWriteCmd<C, D>(Arc<InnerVirtualPageWriteCmd<C, D>>)
where
    C: Send + 'static,
    D: VirtualPageWriteDelta<Content = C>;
// SAFETY (NOTE(review) — confirm): the handle only holds an `Arc` to state
// made of atomics, channel endpoints and two queues guarded by `SpinLock`.
// These impls assert that this state may be moved to and shared between
// threads even though `D` is only required to be `Send`; that relies on
// `SpinLock` serialising every access to the queues.
unsafe impl<C, D> Send for VirtualPageWriteCmd<C, D>
where
    C: Send + 'static,
    D: VirtualPageWriteDelta<Content = C>,
{
}

// SAFETY: see the note on the `Send` impl directly above; the same argument
// is relied on for shared references.
unsafe impl<C, D> Sync for VirtualPageWriteCmd<C, D>
where
    C: Send + 'static,
    D: VirtualPageWriteDelta<Content = C>,
{
}
impl<
C: Send + 'static,
D: VirtualPageWriteDelta<Content = C>,
> Clone for VirtualPageWriteCmd<C, D> {
fn clone(&self) -> Self {
VirtualPageWriteCmd(self.0.clone())
}
}
impl<C, D> VirtualPageWriteCmd<C, D>
where
    C: Send + 'static,
    D: VirtualPageWriteDelta<Content = C>,
{
    /// Creates a command with room for 8 deltas and 1 follow-up delta.
    pub fn new() -> Self {
        VirtualPageWriteCmd::with_capacity(8, 1)
    }

    /// Creates a command whose delta queue is pre-sized for `capacity`
    /// entries and whose follow-up queue is pre-sized for
    /// `followup_capacity` entries.
    ///
    /// The result channel is bounded by the sum of both capacities, but
    /// never by less than one slot.
    pub fn with_capacity(capacity: usize,
                         followup_capacity: usize) -> Self {
        // A bounded async channel cannot be created with zero capacity (it
        // panics), and the plain sum could overflow, so saturate and clamp.
        let channel_capacity = capacity.saturating_add(followup_capacity).max(1);
        let (sender, receiver) = async_bounded(channel_capacity);

        let inner = InnerVirtualPageWriteCmd {
            index: AtomicU64::new(0),
            deltas: SpinLock::new(VecDeque::with_capacity(capacity)),
            followups: SpinLock::new(VecDeque::with_capacity(followup_capacity)),
            deltas_count: AtomicUsize::new(0),
            follow_up_count: AtomicUsize::new(0),
            sender,
            receiver,
        };

        VirtualPageWriteCmd(Arc::new(inner))
    }

    /// `true` when the pending-delta counter is zero.
    pub fn is_synced_deltas(&self) -> bool {
        self.0.deltas_count.load(Ordering::Relaxed) == 0
    }

    /// `true` when the pending follow-up counter is zero.
    pub fn is_synced_followup_deltas(&self) -> bool {
        self.0.follow_up_count.load(Ordering::Relaxed) == 0
    }

    /// Number of deltas currently sitting in the delta queue.
    pub fn deltas_len(&self) -> usize {
        self.0.deltas.lock().len()
    }

    /// Number of deltas currently sitting in the follow-up queue.
    pub fn followup_len(&self) -> usize {
        self.0.followups.lock().len()
    }

    /// Returns the index assigned to this command.
    pub fn get_index(&self) -> u64 {
        self.0.index.load(Ordering::Acquire)
    }

    /// Assigns the index of this command; deltas popped afterwards are
    /// stamped with it.
    pub fn set_index(&mut self, index: u64) {
        self.0.index.store(index, Ordering::Release);
    }

    /// Removes the oldest queued delta, stamps it with the command index and
    /// returns it; `None` when the delta queue is empty.
    ///
    /// Popping does not change the pending counter; that happens in
    /// `callback_by_sync`.
    pub fn pop_front(&self) -> Option<D> {
        let mut delta = self.0.deltas.lock().pop_front();
        if let Some(inner) = delta.as_mut() {
            inner.set_cmd_index(self.get_index());
        }
        delta
    }

    /// Queues `delta` and counts it as pending.
    ///
    /// # Panics
    ///
    /// Panics when the origin or copied page id is internal and the two ids
    /// differ.
    pub fn append(&self, delta: D) {
        Self::assert_internal_ids_match(&delta, "Append");

        // Bump the counter while the queue lock is still held: a consumer
        // must take the same lock to pop the delta, so it can never report
        // the delta as synced before it has been counted.
        let mut queue = self.0.deltas.lock();
        queue.push_back(delta);
        self.0.deltas_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Removes the oldest follow-up delta, stamps it with the command index
    /// and returns it; `None` when the follow-up queue is empty.
    pub fn pop_front_from_followup(&self) -> Option<D> {
        let mut delta = self.0.followups.lock().pop_front();
        if let Some(inner) = delta.as_mut() {
            inner.set_cmd_index(self.get_index());
        }
        delta
    }

    /// Queues `delta` as a follow-up and counts it as pending.
    ///
    /// # Panics
    ///
    /// Panics when the origin or copied page id is internal and the two ids
    /// differ.
    pub fn follow_up(&self, delta: D) {
        Self::assert_internal_ids_match(&delta, "Follow up");

        // Same reasoning as in `append`: count under the queue lock.
        let mut queue = self.0.followups.lock();
        queue.push_back(delta);
        self.0.follow_up_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Panics when `delta` touches an internal page but its origin and
    /// copied page ids differ; `action` names the failing operation in the
    /// panic message.
    fn assert_internal_ids_match(delta: &D, action: &str) {
        let origin_page_id = delta.get_origin_page_id();
        let copied_page_id = delta.get_copied_page_id();
        let touches_internal = origin_page_id.is_internal()
            || copied_page_id.is_internal();

        if touches_internal && origin_page_id != copied_page_id {
            panic!("{} failed, origin_page_id: {:?}, copied_page_id: {:?}, reason: the internal page id must be the same",
                   action,
                   origin_page_id,
                   copied_page_id);
        }
    }
}
impl<
C: Send + 'static,
D: VirtualPageWriteDelta<Content = C>,
> VirtualPageWriteCmd<C, D> {
pub async fn callback_by_sync(&self, result: IOResult<u64>) -> Result<(), String> {
if let Ok(count) = &result {
let sync_count = *count as usize;
let current_deltas_count = self
.0
.deltas_count
.load(Ordering::Relaxed);
if current_deltas_count == 0 {
let current_follow_up_count = self
.0
.follow_up_count
.load(Ordering::Relaxed);
self
.0
.follow_up_count
.store(current_follow_up_count
.checked_sub(sync_count)
.unwrap_or(0),
Ordering::Relaxed); } else {
self
.0
.deltas_count
.store(current_deltas_count
.checked_sub(sync_count)
.unwrap_or(0),
Ordering::Relaxed); }
}
if let Err(e) = self.0.sender.send(result).await {
return Err(format!("Callbak by sync failed, reason: {:?}", e));
}
Ok(())
}
pub async fn wait_write_sync(&self) -> IOResult<u64> {
loop {
match self
.0
.receiver
.recv()
.await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Wait write through failed, deltas_count: {}, reason: {:?}", self.0.deltas_count.load(Ordering::Relaxed), e)));
},
Ok(Err(e)) => {
return Err(Error::new(ErrorKind::Other, format!("Wait write through failed, deltas_count: {}, reason: {:?}", self.0.deltas_count.load(Ordering::Relaxed), e)));
},
Ok(Ok(_count)) => {
if self
.0
.deltas_count
.load(Ordering::Relaxed) == 0 {
break;
}
},
}
}
Ok(self.0.index.load(Ordering::Relaxed))
}
pub async fn wait_write_follow_up_sync(&self) -> IOResult<u64> {
loop {
match self
.0
.receiver
.recv()
.await {
Err(e) => {
return Err(Error::new(ErrorKind::Other, format!("Wait write through failed, deltas_count: {}, reason: {:?}", self.0.deltas_count.load(Ordering::Relaxed), e)));
},
Ok(Err(e)) => {
return Err(Error::new(ErrorKind::Other, format!("Wait write through failed, deltas_count: {}, reason: {:?}", self.0.deltas_count.load(Ordering::Relaxed), e)));
},
Ok(Ok(_count)) => {
if self
.0
.follow_up_count
.load(Ordering::Relaxed) == 0 {
break;
}
},
}
}
Ok(self.0.index.load(Ordering::Relaxed))
}
}
/// Shared state behind a `VirtualPageWriteCmd` handle.
pub struct InnerVirtualPageWriteCmd<C, D>
where
    C: Send + 'static,
    D: VirtualPageWriteDelta<Content = C>,
{
    // Index of the command; stamped onto every delta that is popped.
    index: AtomicU64,
    // Deltas queued by `append`, consumed by `pop_front`.
    deltas: SpinLock<VecDeque<D>>,
    // Deltas queued by `follow_up`, consumed by `pop_front_from_followup`.
    followups: SpinLock<VecDeque<D>>,
    // Incremented by `append`, reduced by `callback_by_sync`.
    deltas_count: AtomicUsize,
    // Incremented by `follow_up`, reduced by `callback_by_sync`.
    follow_up_count: AtomicUsize,
    // Sending side of the result channel, used by `callback_by_sync`.
    sender: AsyncSender<IOResult<u64>>,
    // Receiving side of the result channel, used by the `wait_*` methods.
    receiver: AsyncReceiver<IOResult<u64>>,
}
/// Index of a write, stored as a raw `u64`.
///
/// The type is `Send` and `Sync` automatically because its only field is a
/// `u64`, so no hand-written `unsafe impl` is required.
#[derive(Debug, Clone)]
pub struct WriteIndex(u64);
impl From<u64> for WriteIndex {
fn from(src: u64) -> Self {
WriteIndex::new(src)
}
}
impl From<WriteIndex> for u64 {
fn from(src: WriteIndex) -> Self {
src.0
}
}
impl Deref for WriteIndex {
type Target = u64;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<u64> for WriteIndex {
fn as_ref(&self) -> &u64 {
&self.0
}
}
impl WriteIndex {
#[inline]
pub(crate) fn new(index: u64) -> Self {
WriteIndex(index)
}
#[inline]
pub fn empty() -> Self {
WriteIndex(EMPTY_WRITE_INDEX)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.0 == EMPTY_WRITE_INDEX
}
}
/// Converts page bytes between a raw form and an encoded form.
pub trait VirtualPageEncoding: Send + Sync + 'static {
/// Byte container for the un-encoded page.
type Raw: AsRef<[u8]> + Send + Sync + 'static;
/// Byte container for the encoded page.
type Encoded: AsRef<[u8]> + Send + Sync + 'static;
/// Reports which encoding this encoder applies.
fn encoding_type(&self) -> VirtualPageEncodingType;
/// Encodes a raw page; failures are reported as I/O errors.
fn encode(&self, raw: Self::Raw) -> IOResult<Self::Encoded>;
/// Decodes an encoded page; failures are reported as I/O errors.
/// NOTE(review): `page_len` is presumably the expected length of the decoded page — confirm with an implementor.
fn decode(&self, encoded: Self::Encoded, page_len: usize) -> IOResult<Self::Raw>;
}
/// Identifies how a page is encoded; convertible to and from a
/// `(tag, arg)` byte pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualPageEncodingType {
    /// No encoding; converts to the tag/arg pair `(0, 0)`.
    Empty,
    /// LZ4 encoding carrying a one-byte argument; converts to tag `1`.
    LZ4(u8),
}
/// Builds an encoding type from a `(tag, arg)` pair: tag `0` means no
/// encoding (the argument is ignored), every other tag is treated as LZ4
/// with `arg` as its argument.
impl From<(u8, u8)> for VirtualPageEncodingType {
    fn from(pair: (u8, u8)) -> Self {
        let (tag, arg) = pair;
        if tag == 0 {
            VirtualPageEncodingType::Empty
        } else {
            VirtualPageEncodingType::LZ4(arg)
        }
    }
}
/// Converts an encoding type into its `(tag, arg)` byte pair: LZ4 becomes
/// `(1, arg)`, no encoding becomes the default tag and argument.
impl From<VirtualPageEncodingType> for (u8, u8) {
    fn from(value: VirtualPageEncodingType) -> Self {
        match value {
            VirtualPageEncodingType::LZ4(arg) => (1, arg),
            VirtualPageEncodingType::Empty => {
                (DEFAULT_ENCODING_TAG, DEFAULT_ENCODING_ARG)
            }
        }
    }
}
impl VirtualPageEncodingType {
    /// `true` for the `Empty` variant.
    pub fn is_empty(&self) -> bool {
        matches!(self, VirtualPageEncodingType::Empty)
    }

    /// `true` for the `LZ4` variant, whatever its argument.
    pub fn is_lz4(&self) -> bool {
        matches!(self, VirtualPageEncodingType::LZ4(_))
    }
}
impl VirtualPageEncodingType {
    /// Returns the "no encoding" variant.
    pub fn empty() -> VirtualPageEncodingType {
        Self::Empty
    }

    /// Returns the LZ4 variant carrying `level` as its argument.
    pub fn with_lz4(level: u8) -> VirtualPageEncodingType {
        Self::LZ4(level)
    }

    /// Tag byte of this encoding: the default tag for `Empty`, `1` for LZ4.
    pub fn encoding_tag(&self) -> u8 {
        match *self {
            Self::LZ4(_) => 1,
            Self::Empty => DEFAULT_ENCODING_TAG,
        }
    }

    /// Argument byte of this encoding: the default argument for `Empty`,
    /// the stored argument for LZ4.
    pub fn encoding_arg(&self) -> u8 {
        match *self {
            Self::LZ4(level) => level,
            Self::Empty => DEFAULT_ENCODING_ARG,
        }
    }
}
/// Encoder over a byte container `B` that records an encoding type; `B` is
/// only a type parameter and no value of it is stored.
#[derive(Debug, Clone)]
pub struct DefaultVirtualPageEncoder<B>
where
    B: AsRef<[u8]> + Send + Sync + 'static,
{
    // Encoding reported by this encoder.
    encoding_type: VirtualPageEncodingType,
    // Ties the struct to `B` without holding a `B`.
    marker: PhantomData<B>,
}
/// The default encoder reports the "no encoding" type.
impl<B> Default for DefaultVirtualPageEncoder<B>
where
    B: AsRef<[u8]> + Send + Sync + 'static,
{
    fn default() -> Self {
        let encoding_type = VirtualPageEncodingType::empty();
        Self {
            encoding_type,
            marker: PhantomData,
        }
    }
}
/// Pass-through encoding: raw and encoded forms are the same container type
/// and both directions hand the input back unchanged.
impl<B> VirtualPageEncoding for DefaultVirtualPageEncoder<B>
where
    B: AsRef<[u8]> + Send + Sync + 'static,
{
    type Raw = B;
    type Encoded = B;

    /// Returns the encoding type stored in this encoder.
    fn encoding_type(&self) -> VirtualPageEncodingType {
        let Self { encoding_type, .. } = self;
        *encoding_type
    }

    /// Returns `raw` untouched; never fails.
    fn encode(&self, raw: B) -> IOResult<B> {
        let encoded = raw;
        Ok(encoded)
    }

    /// Returns `encoded` untouched; the page length is ignored. Never fails.
    fn decode(&self, encoded: B, _page_len: usize) -> IOResult<B> {
        let raw = encoded;
        Ok(raw)
    }
}