use std::mem;
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::collections::BTreeMap;
use std::time::{Duration, Instant, SystemTime};
use std::io::{Error, Result, ErrorKind};
use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, Ordering}};
use std::fmt::{Result as GenResult, Formatter, Debug};
use futures::future::{FutureExt, BoxFuture};
use parking_lot::Mutex;
use crossbeam_channel::{Sender, Receiver, unbounded};
use url::Url;
use bytes::{Buf, BufMut, buf::UninitSlice};
use crc32fast::Hasher;
use log::error;
use pi_guid::Guid;
use pi_async_rt::{lock::spin_lock::SpinLock,
rt::{AsyncRuntime, multi_thread::MultiTaskRuntime}};
use pi_async_file::file::{AsyncFileOptions, AsyncFile, WriteOptions};
use crate::devices::{DeviceType, DeviceValueType, DeviceDetailMap, DeviceStatus, DeviceStatistics, BlockDevice, BlockLocation, EMPTY_BLOCK_LOCATION, CommitEvent, WriteOption};
/// Size in bytes of the header stored in front of every block payload:
/// payload length (u32) + checksum (u32) + timestamp (u64), as read by `read_header`.
const DEFAULT_BLOCK_HEAD_LEN: usize = 16;
/// Block unit length in bytes used by `SimpleDevice::open` when the caller passes `None`.
const DEFAULT_BLOCK_UNIT_LEN: u64 = 4096;
/// Smallest block unit length `SimpleDevice::open` accepts; smaller requests are raised to it.
const MIN_BLOCK_UNIT_LEN: u64 = 1;
/// Largest block unit length `SimpleDevice::open` accepts; larger requests are lowered to it.
const MAX_BLOCK_UNIT_LEN: u64 = 65535;
/// Number of block units that `max_block_size` multiplies by the unit length.
const MAX_BLOCK_UNIT_COUNT: u64 = 65535;
/// Byte buffer handed to and returned from the block device.
///
/// Field `0` is a "writable" flag: it starts as `true` and is flipped to `false` the first
/// time the value is cloned; the mutable accessors (`AsMut`, `BufMut`) panic once it is `false`.
/// Field `1` holds the bytes behind an `Arc`, so cloning shares the allocation instead of
/// copying it.
pub struct Binary(AtomicBool, Option<Arc<Vec<u8>>>);

// SAFETY: both fields (`AtomicBool` and `Option<Arc<Vec<u8>>>`) are `Send + Sync` themselves,
// and every handle owns a real strong reference, so moving or sharing a `Binary` between
// threads cannot cause a data race or a double free.
unsafe impl Send for Binary {}
unsafe impl Sync for Binary {}

impl Clone for Binary {
    /// Creates a second handle to the same bytes and marks `self` as read-only.
    ///
    /// The clone holds its own strong reference (`Arc::clone`), so the bytes stay alive until
    /// the last handle is dropped, in whatever order the handles go away. The previous
    /// implementation duplicated the `Arc` without incrementing the reference count, which
    /// left the source handle dangling as soon as the clone was dropped.
    ///
    /// # Panics
    ///
    /// Panics if `self` has already been cloned once.
    fn clone(&self) -> Self {
        let first_clone = self
            .0
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();
        if !first_clone {
            panic!("Clone failed, reason: already be cloned");
        }

        let shared = Arc::clone(self.1.as_ref().unwrap());
        Binary(AtomicBool::new(true), Some(shared))
    }
}
impl Debug for Binary {
    /// Formats the buffer as `Binary<flag, bytes>`, where `flag` is the current value of the
    /// writable flag and `bytes` is the debug rendering of the stored byte vector.
    fn fmt(&self, f: &mut Formatter<'_>) -> GenResult {
        let writable = self.0.load(Ordering::Acquire);
        let bytes = self.1.as_ref().unwrap();
        write!(f, "Binary<{:?}, {:?}>", writable, bytes)
    }
}
impl AsRef<[u8]> for Binary {
    /// Borrows the stored bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        let bytes = self.1.as_ref().unwrap();
        bytes.as_slice()
    }
}
impl AsMut<[u8]> for Binary {
fn as_mut(&mut self) -> &mut [u8] {
if self.0.load(Ordering::Relaxed) {
Arc::get_mut(self
.1
.as_mut()
.unwrap())
.unwrap()
.as_mut_slice()
} else {
panic!("Get mut failed, reason: already be cloned");
}
}
}
// Every mutating method first checks the writable flag and panics once the buffer has been
// cloned; the actual work is delegated to the `BufMut` implementation of the inner `Vec<u8>`.
unsafe impl BufMut for Binary {
    /// Remaining writable capacity as reported by the inner vector.
    fn remaining_mut(&self) -> usize {
        let bytes = self.1.as_ref().unwrap();
        bytes.remaining_mut()
    }

    /// Advances the write cursor of the inner vector by `cnt` bytes.
    ///
    /// Panics if the buffer has already been cloned.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        if !self.0.load(Ordering::Relaxed) {
            panic!("Advance mut failed, reason: already be cloned");
        }

        let bytes = Arc::get_mut(self.1.as_mut().unwrap()).unwrap();
        bytes.advance_mut(cnt);
    }

    /// Returns the next writable chunk of the inner vector.
    ///
    /// Panics if the buffer has already been cloned.
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        if !self.0.load(Ordering::Relaxed) {
            panic!("Chunk mut failed, reason: already be cloned");
        }

        let bytes = Arc::get_mut(self.1.as_mut().unwrap()).unwrap();
        bytes.chunk_mut()
    }

    /// Appends everything readable from `src` to the inner vector.
    ///
    /// Panics if the buffer has already been cloned.
    fn put<T: Buf>(&mut self, src: T) {
        if !self.0.load(Ordering::Relaxed) {
            panic!("Put to bytes failed, reason: already be cloned");
        }

        let bytes = Arc::get_mut(self.1.as_mut().unwrap()).unwrap();
        bytes.put(src);
    }

    /// Appends `src` to the inner vector.
    ///
    /// Panics if the buffer has already been cloned.
    fn put_slice(&mut self, src: &[u8]) {
        if !self.0.load(Ordering::Relaxed) {
            panic!("Put slice to bytes failed, reason: already be cloned");
        }

        let bytes = Arc::get_mut(self.1.as_mut().unwrap()).unwrap();
        bytes.put_slice(src);
    }
}
impl Binary {
    /// Wraps `bin` in a new, writable buffer.
    pub fn new(bin: Vec<u8>) -> Self {
        let writable = AtomicBool::new(true);
        let bytes = Arc::new(bin);
        Binary(writable, Some(bytes))
    }

    /// Number of bytes currently stored in the buffer.
    pub fn len(&self) -> usize {
        let bytes = self.1.as_ref().unwrap();
        bytes.len()
    }
}
/// Block device backed by a single file that is extended as blocks are allocated.
pub struct SimpleDevice {
    /// Runtime used to open the file and to yield during long collections.
    rt: MultiTaskRuntime<()>,
    /// Path the device file was opened from.
    path: PathBuf,
    /// Handle of the backing file.
    file: AsyncFile<()>,
    /// Free list filled by `collect_alloced_blocks`: free size in bytes -> position in block units.
    frees: Arc<Mutex<BTreeMap<u64, u64>>>,
    /// Current device status, stored as its `u8` representation.
    status: AtomicU8,
    /// Length in bytes of one block unit.
    block_unit_len: u64,
    /// Accumulated statistic counters, one per statistics channel.
    statistics: Vec<SpinLock<u128>>,
    /// Receiving ends of the statistics channels, drained by `statistics()`.
    statistics_recv: Vec<Receiver<u128>>,
    /// Sending ends of the statistics channels, cloned into the I/O futures.
    statistics_sent: Vec<Sender<u128>>,
    /// Instant at which the device was opened; used to report elapsed time.
    timeling: Instant,
}

unsafe impl Send for SimpleDevice {}
unsafe impl Sync for SimpleDevice {}
impl BlockDevice for SimpleDevice {
type Uid = Guid;
type Status = DeviceStatus;
type DetailKey = DeviceValueType;
type DetailVal = DeviceValueType;
type Detail = DeviceDetailMap;
type Buf = Binary;
/// Reports whether the backing file is currently empty (size 0).
fn is_full_free(&self) -> bool {
    matches!(self.file.get_size(), 0)
}
/// Always `true` for this device.
fn is_local(&self) -> bool {
    true
}
/// Always `true` for this device.
fn is_persistent(&self) -> bool {
    true
}
/// Always `false` for this device.
fn is_security(&self) -> bool {
    false
}
/// Always `false` for this device.
fn is_safety(&self) -> bool {
    false
}
/// Always `false`: this device never reports compression as enabled.
fn enable_compression(&self) -> bool {
    false
}
/// Always `true`: free space is only discovered through `collect_alloced_blocks`.
fn is_require_collect(&self) -> bool {
    true
}
/// Always `false`; `location` is not inspected.
fn is_freed(&self, location: &BlockLocation) -> bool {
    false
}
/// Always `None`: no capacity is reported for this device.
fn capacity(&self) -> Option<u64> {
    None
}
/// Always `None`: no available size is reported for this device.
fn avail_size(&self) -> Option<u64> {
    None
}
/// Current size in bytes of the backing file.
fn used_size(&self) -> u64 {
    let file_size = self.file.get_size();
    file_size
}
/// Length in bytes of one block unit, as configured when the device was opened.
fn block_unit_len(&self) -> usize {
    let unit_len = self.block_unit_len;
    unit_len as usize
}
/// Largest block size in bytes: the block unit length times `MAX_BLOCK_UNIT_COUNT`.
fn max_block_size(&self) -> usize {
    let unit_len = self.block_unit_len as usize;
    let max_units = MAX_BLOCK_UNIT_COUNT as usize;
    unit_len * max_units
}
fn block_size(&self, location: &BlockLocation) -> usize {
todo!()
}
/// Converts the device path with `Url::from_directory_path`; `None` when that conversion fails.
fn get_url(&self) -> Option<Url> {
    Url::from_directory_path(&self.path).ok()
}
/// Always `None`: this device has no unique id.
fn get_uid(&self) -> Option<Self::Uid> {
    None
}
/// Describes the device as `DeviceType::Disk` with an empty detail map.
fn get_info(&self) -> DeviceType<Self::Detail> {
    let detail = DeviceDetailMap::new();
    DeviceType::Disk(detail)
}
/// Loads the stored status byte (relaxed ordering) and converts it into `Self::Status`.
fn get_status(&self) -> Self::Status {
    let raw = self.status.load(Ordering::Relaxed);
    raw.into()
}
/// Folds all pending statistic deltas into the counters and returns a snapshot.
///
/// Each statistics channel is drained without blocking and its sum is added to the counter
/// at the same index. The previous implementation used a blocking `recv()`, which never
/// returns on an empty channel because the device keeps its own senders alive. It also
/// consumed at most one message per channel and only advanced the counter index on error.
///
/// A counter that does not exist is reported as 0 instead of panicking.
fn statistics(&self) -> Option<DeviceStatistics> {
    for (counter, receiver) in self.statistics.iter().zip(&self.statistics_recv) {
        // `try_iter` stops as soon as the channel is empty, so this never blocks.
        let pending: u128 = receiver.try_iter().sum();
        if pending > 0 {
            *counter.lock() += pending;
        }
    }

    let total = |index: usize| -> u128 {
        self.statistics
            .get(index)
            .map_or(0, |counter| *counter.lock())
    };

    Some(DeviceStatistics::new(total(0),
                               total(1),
                               total(2),
                               total(3),
                               total(4),
                               total(5),
                               total(6),
                               total(7),
                               self.timeling.elapsed().as_millis()))
}
fn alloc_block(&self, size: usize) -> BoxFuture<BlockLocation> {
let url = self.get_url();
let file = self.file.clone();
let frees = self.frees.clone();
let sender0 = self.statistics_sent[0].clone();
let sender1 = self.statistics_sent[1].clone();
async move {
if size == 0 {
return EMPTY_BLOCK_LOCATION;
}
let block_align_size = block_align_size(size as u64 + DEFAULT_BLOCK_HEAD_LEN as u64, self.block_unit_len);
{
let mut locked = frees.lock();
if let Some((max, _)) = locked.last_key_value() {
let mut key = None;
for (k, _) in locked.range((Included(&(block_align_size as u64)), Included(max))) {
key = Some(*k);
break;
};
if let Some(key) = key {
let val = locked.remove(&key).unwrap();
return to_location(val, block_align_count(block_align_size, self.block_unit_len));
}
}
}
let top = file.get_size();
let block_align_pos = block_align_pos(top, self.block_unit_len);
if let Ok(inner) = file.get_inner() {
if let Err(e) = inner.set_len(top + block_align_size) {
error!("Alloc simple block failed, url: {:?}, top: {}, block_align_top: {}, size: {}, block_align_size: {}, reason: {:?}",
url,
top,
block_align_pos,
size,
block_align_size,
e);
return EMPTY_BLOCK_LOCATION;
}
let _ = sender0.send(1);
let _ = sender1.send(size as u128);
to_location(block_align_pos, block_align_count(block_align_size, self.block_unit_len))
} else {
error!("Alloc simple block failed, url: {:?}, top: {}, block_align_top: {}, size: {}, block_align_size: {}, reason: access inner file error",
url,
top,
block_align_pos,
size,
block_align_size);
EMPTY_BLOCK_LOCATION
}
}.boxed()
}
fn read(&self, location: &BlockLocation) -> BoxFuture<Result<Self::Buf>> {
let url = self.get_url();
let file = self.file.clone();
let block_unit_len = self.block_unit_len;
let sender0 = self.statistics_sent[4].clone();
let sender1 = self.statistics_sent[6].clone();
let (block_align_pos, block_align_count) = to_pos_size(location);
async move {
if block_align_count == 0 {
return Ok(Binary::new(Vec::new()));
}
let block_byte_pos = block_byte_pos(block_align_pos, block_unit_len);
let block_align_size = block_align_count_to_block_align_size(block_align_count, block_unit_len) as usize;
match file.read(block_byte_pos, block_align_size).await {
Err(e) => {
error!("Read simple block failed, url: {:?}, block_byte_pos: {}, block_align_size: {}, reason: {:?}", url, block_byte_pos, block_align_size, e);
Err(e)
},
Ok(mut bin) => {
let _ = sender0.send(1);
let _ = sender1.send(block_align_count as u128);
let (payload_len, checksum, _time) = read_header(bin.as_mut_slice());
read_payload_checked(bin.as_slice(), payload_len as usize, checksum)
}
}
}.boxed()
}
fn write(&self,
location: &BlockLocation,
buf: &Self::Buf,
_option: WriteOption) -> BoxFuture<Result<usize>> {
let url = self.get_url();
let file = self.file.clone();
let block_unit_len = self.block_unit_len;
let sender0 = self.statistics_sent[5].clone();
let sender1 = self.statistics_sent[7].clone();
let buf_copy = buf.clone();
let (block_align_pos, block_align_count) = to_pos_size(location);
async move {
if block_align_count == 0 {
return Ok(0);
}
let buf_len = buf_copy.len();
let block_byte_pos = block_byte_pos(block_align_pos, block_unit_len);
let block_align_size = block_align_count_to_block_align_size(block_align_count, block_unit_len) as usize;
if block_align_size < DEFAULT_BLOCK_HEAD_LEN + buf_len as usize {
return Err(Error::new(ErrorKind::Other, format!("Write simple block failed, url: {:?}, block_byte_pos: {}, block_align_size: {}, payload_len: {}, reason: out of block capacity", url, block_byte_pos, block_align_size, buf_len)));
}
let mut hasher = Hasher::new();
let buffer = write_header(buf_copy, hasher, buf_len);
match file.write(block_byte_pos,
buffer,
WriteOptions::Sync(true)).await {
Err(e) => {
error!("Write simple block failed, url: {:?}, block_byte_pos: {}, block_align_size: {}, reason: {:?}", url, block_byte_pos, block_align_size, e);
Err(e)
},
Ok(size) => {
let _ = sender0.send(1);
let _ = sender1.send(buf_len as u128);
Ok(size)
},
}
}.boxed()
}
/// Resolves to `true` for an empty location (unit count 0) and to `false` otherwise.
///
/// Nothing is released here; free space is only discovered by `collect_alloced_blocks`.
fn free_block(&self, location: &BlockLocation) -> BoxFuture<bool> {
    let (_, block_align_count) = to_pos_size(location);
    let is_empty_location = block_align_count == 0;

    async move { is_empty_location }.boxed()
}
fn collect_alloced_blocks(&self, alloced: &[BlockLocation]) -> BoxFuture<Result<usize>> {
let rt = self.rt.clone();
let file = self.file.clone();
let url = self.get_url();
let frees = self.frees.clone();
let block_unit_len = self.block_unit_len;
let mut locationes = Vec::with_capacity(alloced.len());
for location in alloced {
let (align_pos, align_count) = to_pos_size(location);
locationes.push((align_pos, align_count));
}
async move {
locationes.sort();
let mut count = 0;
let mut offset = 0;
let mut top = 0;
for (align_pos, align_count) in locationes {
let block_byte_pos = block_byte_pos(align_pos, block_unit_len);
let block_align_size = block_align_count_to_block_align_size(align_count, block_unit_len);
count += 1;
if count % 10000 == 0 {
rt.timeout(1).await;
}
if block_byte_pos == 0 {
let file_length = file.get_size();
if block_align_size > file_length {
return Err(Error::new(ErrorKind::Other, format!("Collect simple block device failed, url: {:?}, last_pos: {}, last_len: {}, block_byte_pos: {}, block_align_size: {}, reason: invalid block size", url, offset, top, block_byte_pos, block_align_size)));
}
frees.lock().insert(block_align_size, 0);
offset = block_byte_pos;
top += block_align_size;
continue;
} else {
if block_byte_pos > top {
frees
.lock()
.insert(block_byte_pos - top,
block_align_pos(top, block_unit_len));
offset = block_byte_pos;
top += block_align_size;
continue;
} else if block_byte_pos == top {
offset = block_byte_pos;
top += block_align_size;
continue;
} else {
return Err(Error::new(ErrorKind::Other, format!("Collect simple block device failed, url: {:?}, last_pos: {}, last_len: {}, block_byte_pos: {}, block_align_size: {}, reason: block overlap", url, offset, top, block_byte_pos, block_align_size)));
}
}
}
Ok(frees.lock().len())
}.boxed()
}
/// Always resolves to `None`: this device does not provide a block iterator.
fn blocks_iter(&self)
    -> BoxFuture<Option<Box<dyn DoubleEndedIterator<Item = BlockLocation>>>> {
    async { None }.boxed()
}
/// Always 0 for this device.
fn commit_round(&self) -> u64 {
    0
}
/// Resolves immediately to a boxed `Event`.
fn on_commit(&self) -> BoxFuture<Result<Box<dyn CommitEvent>>> {
    async move { Ok(Box::new(Event) as Box<dyn CommitEvent>) }.boxed()
}
}
impl SimpleDevice {
pub async fn open<P: AsRef<Path>>(rt: MultiTaskRuntime<()>,
path: P,
block_unit_len: Option<u64>) -> Result<Self> {
let file = AsyncFile::open(rt.clone(), path.as_ref().to_path_buf(), AsyncFileOptions::ReadWrite).await?;
let path = path.as_ref().to_path_buf();
let frees = Arc::new(Mutex::new(BTreeMap::new()));
let status = AtomicU8::new(DeviceStatus::Inited as u8);
let block_unit_len = if let Some(len) = block_unit_len {
if len > MIN_BLOCK_UNIT_LEN {
if len < MAX_BLOCK_UNIT_LEN {
len
} else {
MAX_BLOCK_UNIT_LEN
}
} else {
MIN_BLOCK_UNIT_LEN
}
} else {
DEFAULT_BLOCK_UNIT_LEN
};
let statistics = Vec::with_capacity(8);
let mut statistics_recv = Vec::with_capacity(8);
let mut statistics_sent = Vec::with_capacity(8);
for _ in 0..8 {
let (sender, receiver) = unbounded();
statistics_recv.push(receiver);
statistics_sent.push(sender);
}
let timeling = Instant::now();
Ok(SimpleDevice {
rt,
file,
path,
frees,
status,
block_unit_len,
statistics,
statistics_recv,
statistics_sent,
timeling,
})
}
}
/// Converts a byte position into a position counted in whole block units (rounding down).
#[inline]
fn block_align_pos(byte_pos: u64, block_unit_len: u64) -> u64 {
    let whole_units = byte_pos / block_unit_len;
    whole_units
}
/// Converts a position counted in block units into a byte position.
#[inline]
fn block_byte_pos(align_pos: u64, block_unit_len: u64) -> u64 {
    let bytes = align_pos * block_unit_len;
    bytes
}
/// Rounds `header_payload_size` up to the next multiple of `block_unit_len`.
///
/// A size that is already a multiple is returned unchanged, and 0 stays 0. The previous
/// formula subtracted 1 first and therefore underflowed for a size of 0.
///
/// # Panics
///
/// Panics if `block_unit_len` is 0.
#[inline]
fn block_align_size(header_payload_size: u64, block_unit_len: u64) -> u64 {
    match header_payload_size % block_unit_len {
        0 => header_payload_size,
        rest => header_payload_size + (block_unit_len - rest),
    }
}
/// Number of whole block units contained in `align_size` bytes.
#[inline]
fn block_align_count(align_size: u64, block_unit_len: u64) -> u64 {
    let units = align_size / block_unit_len;
    units
}
/// Size in bytes of `align_count` block units.
#[inline]
fn block_align_count_to_block_align_size(align_count: u64, block_unit_len: u64) -> u64 {
    let bytes = align_count * block_unit_len;
    bytes
}
/// Packs a unit position and a unit count into a `BlockLocation`.
///
/// The position is shifted into the top 16 bits and the count is stored in the low 16 bits.
/// NOTE(review): shifting by 48 keeps only the low 16 bits of `pos`, while `to_pos_size`
/// masks the position with 48 bits; a 16-bit shift may have been intended. The layout is
/// left as is because existing locations depend on it — confirm before changing.
#[inline]
fn to_location(pos: u64, size: u64) -> BlockLocation {
    let packed_pos = pos << 48;
    let packed_size = size & 0xffff;
    BlockLocation::new(packed_pos | packed_size)
}
/// Unpacks a `BlockLocation` into `(unit position, unit count)`.
///
/// The position is read from the bits above bit 48 and the count from the low 16 bits.
/// NOTE(review): after shifting right by 48 only 16 bits remain, so the 48-bit mask has no
/// effect; see the matching note on the packing side before changing the layout.
#[inline]
fn to_pos_size(location: &BlockLocation) -> (u64, u64) {
    let raw = *location.as_ref();
    let pos = (raw >> 48) & 0xffffffffffff;
    let size = raw & 0xffff;
    (pos, size)
}
/// Milliseconds elapsed since the Unix epoch, or 0 when the system clock is set before it.
fn now_unix_epoch() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Decodes the block header from the start of `buf`.
///
/// Reads, in order and little-endian: payload length (u32), checksum (u32), timestamp (u64).
/// Returns them as `(len, checksum, time)`.
#[inline]
fn read_header(mut buf: &[u8]) -> (u32, u32, u64) {
    let payload_len = buf.get_u32_le();
    let stored_checksum = buf.get_u32_le();
    let written_at = buf.get_u64_le();

    (payload_len, stored_checksum, written_at)
}
/// Extracts the `len` payload bytes that follow the header in `buf` and verifies them.
///
/// # Errors
///
/// Returns an `ErrorKind::Other` error when `buf` is too short to hold the header plus `len`
/// payload bytes, or when the checksum of the payload differs from `checksum`. The length
/// check replaces an unchecked slice that panicked on truncated or corrupt blocks.
#[inline]
fn read_payload_checked(buf: &[u8], len: usize, checksum: u32) -> Result<Binary> {
    let payload = DEFAULT_BLOCK_HEAD_LEN
        .checked_add(len)
        .and_then(|end| buf.get(DEFAULT_BLOCK_HEAD_LEN..end))
        .ok_or_else(|| {
            Error::new(ErrorKind::Other, format!("Read payload failed, block_len: {}, payload_len: {}, reason: payload out of block", buf.len(), len))
        })?;

    let mut hasher = Hasher::new();
    hasher.update(payload);
    let hash = hasher.finalize();
    if hash != checksum {
        return Err(Error::new(ErrorKind::Other, format!("Read payload failed, checksum: {}, real: {}, reason: invalid checksum", checksum, hash)));
    }

    Ok(Binary::new(payload.to_vec()))
}
/// Builds the on-device image of a block: header followed by the payload of `bin`.
///
/// The header holds, little-endian: `len` as u32, the checksum of the payload as u32, and
/// the current time in milliseconds since the Unix epoch as u64.
#[inline]
fn write_header(bin: Binary,
                mut hasher: Hasher,
                len: usize) -> Vec<u8> {
    let payload: &[u8] = bin.as_ref();
    hasher.update(payload);
    let checksum = hasher.finalize();

    let mut block = Vec::with_capacity(DEFAULT_BLOCK_HEAD_LEN + len);
    block.extend_from_slice(&(len as u32).to_le_bytes());
    block.extend_from_slice(&checksum.to_le_bytes());
    block.extend_from_slice(&now_unix_epoch().to_le_bytes());
    block.extend_from_slice(payload);
    block
}
/// Commit event returned by `SimpleDevice::on_commit`; it always reports a finished,
/// successful commit of round 0.
pub struct Event;

impl CommitEvent for Event {
    /// Always `false`.
    fn is_commiting(&self) -> bool {
        false
    }

    /// The negation of `is_commiting`, hence always `true`.
    fn is_commited(&self) -> bool {
        !self.is_commiting()
    }

    /// Always 0.
    fn round(&self) -> u64 {
        0
    }

    /// Always a zero duration.
    fn time(&self) -> Duration {
        Duration::from_secs(0)
    }

    /// Always `Some(&Ok(()))`.
    fn result(&self) -> Option<&Result<()>> {
        Some(&Ok(()))
    }
}