use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::collections::VecDeque;
use std::io::{Error, Result, ErrorKind};
use std::sync::{Arc,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}};
use std::time::Instant;
use futures::future::{FutureExt, BoxFuture};
use async_lock::Mutex;
use bytes::BufMut;
use log::{info, warn};
use pi_guid::Guid;
use pi_hash::XHashMap;
use pi_async_rt::{lock::spin_lock::SpinLock,
rt::{AsyncRuntime, multi_thread::MultiTaskRuntime}};
use pi_async_transaction::AsyncCommitLog;
use crate::log_store::log_file::{PairLoader, LogMethod, LogFile, log_file_name_to_usize, PairLoaderExt};
/// Size value handed to `LogFile::open` when the commit log is opened (16 GiB).
/// NOTE(review): this expression does not fit in a 32-bit `usize` — confirm
/// that only 64-bit targets are supported.
const DEFAULT_COMMIT_LOG_FILE_SIZE: usize = 16 * 1024 * 1024 * 1024;
/// Buffer length passed to the log file loaders when replaying.
const DEFAULT_LOAD_BUFFER_LEN: u64 = 8192;
/// Default log block limit; also the fallback used when
/// `CommitLoggerBuilder::log_block_limit` is given an out-of-range value.
const DEFAULT_COMMIT_LOG_BLOCK_SIZE: usize = 8192;
/// Default delay-commit timeout; also the fallback used when
/// `CommitLoggerBuilder::delay_timeout` is given an out-of-range value
/// (unit is whatever `LogFile::delay_commit` expects — presumably milliseconds).
const DEFAULT_DELAY_COMMIT_TIMEOUT: usize = 1;
/// Default written-size threshold (32 MiB); once the written size reaches it,
/// `confirm` and the collect task create a new check point.
const DEFAULT_COMMIT_LOG_FILE_MAX_LIMIT: u64 = 32 * 1024 * 1024;
/// Default value passed to the runtime timeout between two collect passes
/// (presumably milliseconds, i.e. 10 seconds — confirm against the runtime).
const DEFAULT_COMMIT_LOG_COLLECT_INTERVAL: usize = 10 * 1000;
/// Extension of [`AsyncCommitLog`] that adds a replay entry point whose
/// callback receives more than the commit uid and the log payload.
pub trait CommitLoggerExt: AsyncCommitLog {
/// Starts replaying the commit log, handing entries to `callback`.
///
/// The callback argument is an `Option` of `(commit uid, log method, u64, payload)`.
/// NOTE(review): the meaning of the `u64` and of a `None` argument is defined
/// by the loader, which is not visible here — presumably a payload time and
/// an end-of-file marker; confirm.
///
/// The returned future resolves to a pair of counters on success
/// (presumably replayed log count and replayed byte count, as in `start_replay`).
fn start_replay_ext<B, F>(&self, callback: Arc<F>)
-> BoxFuture<'static, Result<(usize, usize)>>
where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
F: Fn(Option<(Self::Cid, LogMethod, u64, B)>) -> Result<()> + Send + Sync + 'static;
}
/// Builder for [`CommitLogger`].
///
/// Fields:
/// - `rt`: runtime used to open the log file and to spawn the collect task.
/// - `path`: directory of the commit log.
/// - `log_block_limit`: block limit passed to `LogFile::open`.
/// - `delay_timeout`: timeout later passed to `LogFile::delay_commit` on flush.
/// - `log_file_limit`: written-size threshold that triggers a new check point.
/// - `collect_interval`: timeout between two passes of the collect task.
pub struct CommitLoggerBuilder {
rt: MultiTaskRuntime<()>, path: PathBuf, log_block_limit: usize, delay_timeout: usize, log_file_limit: u64, collect_interval: usize, }
// NOTE(review): no safety argument is visible in this file for these manual
// `Send`/`Sync` impls — confirm every field is safe to share across threads.
unsafe impl Send for CommitLoggerBuilder {}
unsafe impl Sync for CommitLoggerBuilder {}
impl CommitLoggerBuilder {
    /// Creates a builder for the commit log stored under `dir`, running on `rt`.
    ///
    /// Every tunable starts at its `DEFAULT_*` value.
    pub fn new<P: AsRef<Path>>(rt: MultiTaskRuntime<()>,
                               dir: P) -> Self {
        let path = dir.as_ref().to_path_buf();
        Self {
            rt,
            path,
            log_block_limit: DEFAULT_COMMIT_LOG_BLOCK_SIZE,
            delay_timeout: DEFAULT_DELAY_COMMIT_TIMEOUT,
            log_file_limit: DEFAULT_COMMIT_LOG_FILE_MAX_LIMIT,
            collect_interval: DEFAULT_COMMIT_LOG_COLLECT_INTERVAL,
        }
    }

    /// Sets the log block limit.
    ///
    /// Values outside `2048..=32 MiB` are replaced by the default block size.
    pub fn log_block_limit(mut self, limit: usize) -> Self {
        const LOWEST: usize = 2048;
        const HIGHEST: usize = 32 * 1024 * 1024;

        self.log_block_limit = if (LOWEST..=HIGHEST).contains(&limit) {
            limit
        } else {
            DEFAULT_COMMIT_LOG_BLOCK_SIZE
        };
        self
    }

    /// Sets the delay-commit timeout.
    ///
    /// Values outside `1..=10` are replaced by the default timeout.
    pub fn delay_timeout(mut self, timeout: usize) -> Self {
        const LOWEST: usize = 1;
        const HIGHEST: usize = 10;

        self.delay_timeout = if (LOWEST..=HIGHEST).contains(&timeout) {
            timeout
        } else {
            DEFAULT_DELAY_COMMIT_TIMEOUT
        };
        self
    }

    /// Sets the written-size threshold that triggers a new check point.
    ///
    /// Values outside `2 MiB..=2 GiB` are replaced by the default limit.
    pub fn log_file_limit(mut self, limit: u64) -> Self {
        const LOWEST: u64 = 2 * 1024 * 1024;
        const HIGHEST: u64 = 2 * 1024 * 1024 * 1024;

        self.log_file_limit = if (LOWEST..=HIGHEST).contains(&limit) {
            limit
        } else {
            DEFAULT_COMMIT_LOG_FILE_MAX_LIMIT
        };
        self
    }

    /// Sets the interval between two passes of the collect task.
    ///
    /// Values outside `5_000..=300_000` are replaced by the default interval.
    pub fn collect_interval(mut self, interval: usize) -> Self {
        const LOWEST: usize = 5 * 1000;
        const HIGHEST: usize = 5 * 60 * 1000;

        self.collect_interval = if (LOWEST..=HIGHEST).contains(&interval) {
            interval
        } else {
            DEFAULT_COMMIT_LOG_COLLECT_INTERVAL
        };
        self
    }

    /// Opens the log file, assembles the logger state and spawns the
    /// never-ending collect task on the runtime.
    ///
    /// Returns the error of `LogFile::open` if the file cannot be opened.
    /// Panics if the freshly opened file reports no writable path.
    pub async fn build(self) -> Result<CommitLogger> {
        let CommitLoggerBuilder {
            rt,
            path,
            log_block_limit,
            delay_timeout,
            log_file_limit,
            collect_interval,
        } = self;

        let file = LogFile::open(rt.clone(),
                                 path,
                                 log_block_limit,
                                 DEFAULT_COMMIT_LOG_FILE_SIZE,
                                 None).await?;

        // The first check point is the file that is currently writable.
        let first_check_point_path = Arc::new(file.writable_path().unwrap());
        let first_check_point_counter = Arc::new(AtomicU64::new(0));

        let inner = InnerCommitLogger {
            rt: rt.clone(),
            file,
            delay_timeout,
            log_file_limit,
            writed_size: AtomicU64::new(0),
            writable: SpinLock::new((first_check_point_counter, first_check_point_path)),
            only_reads: SpinLock::new(VecDeque::new()),
            check_points: Mutex::new(XHashMap::default()),
            is_replaying: AtomicBool::new(false),
            replay_only_reads: SpinLock::new(VecDeque::new()),
            replay_confirm_buf: SpinLock::new(VecDeque::new()),
            replay_file_stats: SpinLock::new(XHashMap::default()),
            replay_path_counters: SpinLock::new(XHashMap::default()),
            replay_duplicate_commit_uids: AtomicUsize::new(0),
            commit_log_count: AtomicUsize::new(0),
            confirm_commited_count: AtomicUsize::new(0),
        };
        let commit_logger = CommitLogger(Arc::new(inner));

        // Background task: wait for the interval, run one collect pass, repeat.
        let collector = commit_logger.clone();
        let _ = rt.spawn(async move {
            loop {
                collect_commit_logger(&collector, collect_interval).await;
            }
        });

        Ok(commit_logger)
    }
}
/// Commit logger handle; cloning it only clones the inner `Arc`, so all
/// clones share the same state.
#[derive(Clone)]
pub struct CommitLogger(Arc<InnerCommitLogger>);
// NOTE(review): no safety argument is visible in this file for these manual
// `Send`/`Sync` impls — confirm `InnerCommitLogger` is safe to share across threads.
unsafe impl Send for CommitLogger {}
unsafe impl Sync for CommitLogger {}
impl AsyncCommitLog for CommitLogger {
type C = usize;
type Cid = Guid;
/// Appends `log` to the commit log under `commit_uid` and registers the
/// commit as waiting for confirmation on the current writable check point.
///
/// An empty `log` is not written at all and resolves to handle `0`.
/// Otherwise the future resolves to the handle returned by the log file.
/// Diagnostic lines are written to stderr at every step.
fn append<B>(&self, commit_uid: Self::Cid, log: B) -> BoxFuture<'static, Result<Self::C>>
    where B: BufMut + AsRef<[u8]> + Send + Sized + 'static {
    let logger = self.clone();
    async move {
        let begin = Instant::now();
        let input_len = log.as_ref().len();
        eprintln!(
            "pi_store commit_append_enter commit_uid={:?} log_path={:?} input_len={} writable_size={} append_total={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            logger.0.file.writable_size(),
            logger.0.commit_log_count.load(Ordering::Relaxed),
        );

        // Nothing to persist for an empty payload.
        if input_len == 0 {
            eprintln!(
                "pi_store commit_append_inner_ok commit_uid={:?} log_path={:?} input_len=0 log_handle=0 elapsed_ms={}",
                commit_uid,
                logger.0.file.path(),
                begin.elapsed().as_millis(),
            );
            return Ok(0);
        }

        eprintln!(
            "pi_store append_check_points_lock_begin commit_uid={:?} log_path={:?} input_len={} writed_size={} log_file_limit={} is_replaying={} append_total={} elapsed_ms={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            logger.0.writed_size.load(Ordering::Relaxed),
            logger.0.log_file_limit,
            logger.0.is_replaying.load(Ordering::Relaxed),
            logger.0.commit_log_count.load(Ordering::Relaxed),
            begin.elapsed().as_millis(),
        );
        let mut waiting_confirms = logger.0.check_points.lock().await;
        eprintln!(
            "pi_store commit_append_lock_ok commit_uid={:?} log_path={:?} input_len={} check_points_len={} elapsed_ms={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            waiting_confirms.len(),
            begin.elapsed().as_millis(),
        );

        eprintln!(
            "pi_store commit_append_inner_begin commit_uid={:?} log_path={:?} input_len={} writable_size={} elapsed_ms={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            logger.0.file.writable_size(),
            begin.elapsed().as_millis(),
        );
        // The key of the entry is the little-endian encoding of the commit uid.
        let uid_bytes = commit_uid.0.to_le_bytes();
        let log_handle = logger.0.file.append(LogMethod::PlainAppend,
                                              uid_bytes.as_ref(),
                                              log.as_ref());
        eprintln!(
            "pi_store commit_append_inner_ok commit_uid={:?} log_path={:?} input_len={} log_handle={} writable_size={} elapsed_ms={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            log_handle,
            logger.0.file.writable_size(),
            begin.elapsed().as_millis(),
        );

        // Account for the payload plus the 16-byte uid key.
        logger.0.writed_size.fetch_add(input_len as u64 + 16, Ordering::Relaxed);
        logger.0.commit_log_count.fetch_add(1, Ordering::Relaxed);

        // Tie this commit to the check point that is writable right now.
        let writable = logger.0.writable.lock();
        writable.0.fetch_add(1, Ordering::AcqRel);
        waiting_confirms.insert(commit_uid.clone(), (writable.0.clone(), writable.1.clone()));
        eprintln!(
            "pi_store commit_append_done commit_uid={:?} log_path={:?} input_len={} log_handle={} check_points_len={} elapsed_ms={}",
            commit_uid,
            logger.0.file.path(),
            input_len,
            log_handle,
            waiting_confirms.len(),
            begin.elapsed().as_millis(),
        );

        Ok(log_handle)
    }.boxed()
}
/// Commits the entry identified by `log_handle` through the log file's
/// delayed commit, using the configured delay timeout.
///
/// The result of the delayed commit is returned unchanged; the outcome is
/// also reported on stderr.
fn flush(&self, log_handle: Self::C) -> BoxFuture<'static, Result<()>> {
    let logger = self.clone();
    async move {
        let begin = Instant::now();
        eprintln!(
            "pi_store commit_flush_enter log_path={:?} log_handle={} delay_timeout={} commited_uid={} writable_size={}",
            logger.0.file.path(),
            log_handle,
            logger.0.delay_timeout,
            logger.0.file.commited_uid(),
            logger.0.file.writable_size(),
        );

        let outcome = logger.0.file.delay_commit(log_handle,
                                                 false,
                                                 logger.0.delay_timeout).await;
        if let Err(e) = &outcome {
            eprintln!(
                "pi_store commit_flush_err log_path={:?} log_handle={} commited_uid={} writable_size={} elapsed_ms={} error={:?}",
                logger.0.file.path(),
                log_handle,
                logger.0.file.commited_uid(),
                logger.0.file.writable_size(),
                begin.elapsed().as_millis(),
                e,
            );
        } else {
            eprintln!(
                "pi_store commit_flush_ok log_path={:?} log_handle={} commited_uid={} writable_size={} elapsed_ms={}",
                logger.0.file.path(),
                log_handle,
                logger.0.file.commited_uid(),
                logger.0.file.writable_size(),
                begin.elapsed().as_millis(),
            );
        }

        outcome
    }.boxed()
}
/// Confirms that the commit `commit_uid` has been applied.
///
/// While the logger is replaying, the confirmation is only buffered (see
/// `confirm_replay`). Otherwise, under the `check_points` lock:
/// 1. if the written size reached `log_file_limit`, a new check point is
///    created (its result is ignored);
/// 2. the commit is removed from `check_points`; an unknown uid is ignored;
/// 3. when this was the last pending commit of its check point, the matching
///    read-only file is marked finished and every finished file at the head of
///    the `only_reads` queue is handed to `LogFile::readable_to_back`.
///
/// Returns the error of `readable_to_back` if promoting a file fails.
fn confirm(&self, commit_uid: Self::Cid) -> BoxFuture<'static, Result<()>> {
if self.0.is_replaying.load(Ordering::Relaxed) {
return self.confirm_replay(commit_uid);
}
let logger = self.clone();
async move {
let started = Instant::now();
let writed_size = logger.0.writed_size.load(Ordering::Relaxed);
eprintln!(
"pi_store confirm_check_points_lock_begin commit_uid={:?} log_path={:?} writed_size={} log_file_limit={} is_replaying={} elapsed_ms={}",
commit_uid,
logger.0.file.path(),
writed_size,
logger.0.log_file_limit,
logger.0.is_replaying.load(Ordering::Relaxed),
started.elapsed().as_millis(),
);
// This guard stays alive until the end of the async block, so everything
// below (including the awaits) runs under the `check_points` lock.
let mut check_pointes_locked = logger.0.check_points.lock().await;
let will_new_check_point = logger.0.writed_size.load(Ordering::Relaxed) >= logger.0.log_file_limit;
eprintln!(
"pi_store confirm_check_points_lock_ok commit_uid={:?} log_path={:?} check_points_len={} will_new_check_point={} wait_elapsed_ms={}",
commit_uid,
logger.0.file.path(),
check_pointes_locked.len(),
will_new_check_point,
started.elapsed().as_millis(),
);
if will_new_check_point {
// A failure to create the check point is deliberately ignored here.
let _ = new_check_point(&logger, false, "confirm_limit").await;
}
if let Some((counter, check_point_path)) = check_pointes_locked.remove(&commit_uid) {
logger.0.confirm_commited_count.fetch_add(1, Ordering::Relaxed);
// `fetch_sub` returns the previous value: 1 means this confirmation was
// the last pending commit recorded on that check point counter.
if counter.fetch_sub(1, Ordering::AcqRel) == 1 {
let is_current_writable_check_point = check_point_path.as_ref() == logger.0.writable.lock().1.as_ref();
if is_current_writable_check_point {
// The fully confirmed check point is still the writable file: split it
// and enqueue the resulting read-only file as already finished.
let _ = new_check_point(&logger, true, "confirm_current_writable").await;
}
let mut swap = VecDeque::new();
let mut matched_only_read_path = false;
let mut promoted_now = 0usize;
let (stalled_head_path, stalled_head_is_finish_confirm, finished_behind_stalled_head) = {
// NOTE(review): this spin lock guard lives until the end of this block and is
// therefore held across the `readable_to_back(...).await` below — confirm this
// cannot stall other tasks that lock `only_reads`.
let only_reads = &mut *logger.0.only_reads.lock();
for (path, is_finish_confirm) in only_reads.iter_mut() {
if check_point_path.as_ref() == path {
*is_finish_confirm = true; matched_only_read_path = true;
} else {
// Other queued files are also treated as finished when they are empty.
match path.metadata() {
Err(e) => {
warn!("Confirm commited transaction failed, path: {:?}, reason: {:?}", path, e);
},
Ok(meta) => {
if meta.len() == 0 {
*is_finish_confirm = true; }
}
}
}
}
// Promote finished files from the head of the queue only; once an unfinished
// file is met (`prev` becomes false) everything after it is kept, in order.
// NOTE(review): if `readable_to_back` fails, the path popped in that iteration
// is not put back into the queue — confirm this is intended.
let mut prev = true; while let Some((path, is_finish_confirm)) = only_reads.pop_front() {
if prev && is_finish_confirm {
let file_size_bytes = path.metadata().ok().map(|meta| meta.len());
let _ = logger.0.file.readable_to_back(path.clone()).await?;
on_replay_file_promoted_to_back(&logger,
&path,
file_size_bytes);
promoted_now += 1;
} else if !prev {
swap.push_back((path, is_finish_confirm));
} else {
swap.push_back((path, is_finish_confirm));
prev = false; }
}
let (head_path, head_is_finish_confirm, finished_behind_head, _) =
summarize_only_reads_queue(&swap);
(head_path, head_is_finish_confirm, finished_behind_head)
};
// The queue was fully drained above; store back the entries that were kept.
*logger.0.only_reads.lock() = swap;
let remaining_replay_files = logger.0.replay_file_stats.lock().len();
let remaining_only_reads = logger.0.only_reads.lock().len();
info!("Commit logger replay checkpoint confirmed, path: {:?}, matched_only_read_path: {}, current_writable_checkpoint: {}, promoted_now: {}, remaining_replay_files_waiting_confirm: {}, remaining_only_reads: {}, stalled_head_path: {:?}, stalled_head_finished: {:?}, finished_behind_stalled_head: {}",
check_point_path,
matched_only_read_path,
is_current_writable_check_point,
promoted_now,
remaining_replay_files,
remaining_only_reads,
stalled_head_path,
stalled_head_is_finish_confirm,
finished_behind_stalled_head);
}
}
eprintln!(
"pi_store confirm_check_points_scope_end commit_uid={:?} log_path={:?} check_points_len={} elapsed_ms={}",
commit_uid,
logger.0.file.path(),
check_pointes_locked.len(),
started.elapsed().as_millis(),
);
Ok(())
}.boxed()
}
/// Switches the logger into replay mode and replays the read-only log files
/// through `callback`.
///
/// Resolves to `(0, 0)` right away when the writable file is empty and there
/// is no readable file. Otherwise the writable file is split, non-empty
/// read-only files are queued for replay, the loader is driven over the log
/// and empty read-only files are finally handed to `readable_to_back`.
/// The future resolves to the loader's result.
fn start_replay<B, F>(&self, callback: Arc<F>) -> BoxFuture<'static, Result<(usize, usize)>>
    where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
          F: Fn(Self::Cid, B) -> Result<()> + Send + Sync + 'static {
    // Enter replay mode and forget the statistics of any previous replay.
    self.0.is_replaying.store(true, Ordering::SeqCst);
    self.0.replay_file_stats.lock().clear();
    self.0.replay_path_counters.lock().clear();
    self.0.replay_duplicate_commit_uids.store(0, Ordering::Relaxed);
    info!("Commit logger start_replay begin");

    let commit_logger = self.clone();
    async move {
        if let Some(writable_path) = commit_logger.0.file.writable_path() {
            let meta = writable_path.metadata().map_err(|e| {
                Error::new(ErrorKind::Other, format!("Replay commit log failed, path: {:?}, reason: {:?}", writable_path, e))
            })?;
            if meta.len() == 0 && commit_logger.0.file.readable_amount() == 0 {
                // Nothing was ever written: there is nothing to replay.
                return Ok((0, 0));
            }
        }

        commit_logger.0.file.split().await.map_err(|e| {
            Error::new(ErrorKind::Other, format!("Replay commit log failed, reason: {:?}", e))
        })?;

        // Empty read-only files are set aside; the others are queued for replay.
        let mut invalid_only_read_paths = Vec::new();
        for only_read_path in commit_logger.0.file.all_readable_path() {
            let meta = only_read_path.metadata().map_err(|e| {
                Error::new(ErrorKind::Other, format!("Replay commit log failed, path: {:?}, reason: {:?}", only_read_path, e))
            })?;
            if meta.len() == 0 {
                invalid_only_read_paths.push(only_read_path);
            } else {
                commit_logger.0.replay_only_reads.lock().push_back(only_read_path);
            }
        }
        let replay_file_count = commit_logger.0.replay_only_reads.lock().len();
        info!("Commit logger start_replay prepared replay files, readable_files: {}, invalid_empty_files: {}",
            replay_file_count,
            invalid_only_read_paths.len());

        // The first queued file becomes the check point replayed commits attach to.
        let first_replay_path = commit_logger.0.replay_only_reads.lock().pop_front();
        if let Some(path) = first_replay_path {
            *commit_logger.0.writable.lock() = (Arc::new(AtomicU64::new(0)), Arc::new(path));
        }

        let mut loader = CommitLoggerLoader {
            logger: commit_logger.clone(),
            buf: Vec::new(),
            log_file: None,
            current_log_count: 0,
            current_bytes: 0,
            current_begin: None,
            callback,
            result: Ok((0, 0)),
            marker: PhantomData,
        };
        commit_logger.0.file.load_before(&mut loader,
                                         None,
                                         DEFAULT_LOAD_BUFFER_LEN,
                                         true).await?;

        for invalid_only_read_path in invalid_only_read_paths {
            commit_logger
                .0
                .file
                .readable_to_back(invalid_only_read_path.clone())
                .await
                .map_err(|e| {
                    Error::new(ErrorKind::Other, format!("Replay commit log failed, path: {:?}, reason: {:?}", invalid_only_read_path, e))
                })?;
        }

        let replay_result = loader.result();
        if let Ok((replayed_logs, replayed_bytes)) = &replay_result {
            info!("Commit logger start_replay finished loading, replayed_logs: {}, replayed_bytes: {}",
                replayed_logs,
                replayed_bytes);
        }
        replay_result
    }.boxed()
}
/// Same procedure as `start_replay`, but driven by a loader that also owns
/// `file_finished` (stored as the loader's `file_callback`).
///
/// Resolves to `(0, 0)` right away when the writable file is empty and there
/// is no readable file; otherwise resolves to the loader's result.
fn start_replay_by_file<B, F, G>(&self,
                                 callback: Arc<F>,
                                 file_finished: Arc<G>) -> BoxFuture<'static, Result<(usize, usize)>>
    where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
          F: Fn(Self::Cid, B) -> Result<()> + Send + Sync + 'static,
          G: Fn() -> Result<()> + Send + Sync + 'static {
    // Enter replay mode and forget the statistics of any previous replay.
    self.0.is_replaying.store(true, Ordering::SeqCst);
    self.0.replay_file_stats.lock().clear();
    self.0.replay_path_counters.lock().clear();
    self.0.replay_duplicate_commit_uids.store(0, Ordering::Relaxed);
    info!("Commit logger start_replay_by_file begin");

    let commit_logger = self.clone();
    async move {
        if let Some(writable_path) = commit_logger.0.file.writable_path() {
            let meta = writable_path.metadata().map_err(|e| {
                Error::new(ErrorKind::Other, format!("Replay commit log by file failed, path: {:?}, reason: {:?}", writable_path, e))
            })?;
            if meta.len() == 0 && commit_logger.0.file.readable_amount() == 0 {
                // Nothing was ever written: there is nothing to replay.
                return Ok((0, 0));
            }
        }

        commit_logger.0.file.split().await.map_err(|e| {
            Error::new(ErrorKind::Other, format!("Replay commit log by file failed, reason: {:?}", e))
        })?;

        // Empty read-only files are set aside; the others are queued for replay.
        let mut invalid_only_read_paths = Vec::new();
        for only_read_path in commit_logger.0.file.all_readable_path() {
            let meta = only_read_path.metadata().map_err(|e| {
                Error::new(ErrorKind::Other, format!("Replay commit log by file failed, path: {:?}, reason: {:?}", only_read_path, e))
            })?;
            if meta.len() == 0 {
                invalid_only_read_paths.push(only_read_path);
            } else {
                commit_logger.0.replay_only_reads.lock().push_back(only_read_path);
            }
        }
        let replay_file_count = commit_logger.0.replay_only_reads.lock().len();
        info!("Commit logger start_replay_by_file prepared replay files, readable_files: {}, invalid_empty_files: {}",
            replay_file_count,
            invalid_only_read_paths.len());

        // The first queued file becomes the check point replayed commits attach to.
        let first_replay_path = commit_logger.0.replay_only_reads.lock().pop_front();
        if let Some(path) = first_replay_path {
            *commit_logger.0.writable.lock() = (Arc::new(AtomicU64::new(0)), Arc::new(path));
        }

        let mut loader = CommitLoggerLoaderByFile {
            logger: commit_logger.clone(),
            buf: Vec::new(),
            log_file: None,
            current_log_count: 0,
            current_bytes: 0,
            current_begin: None,
            callback,
            file_callback: file_finished,
            result: Ok((0, 0)),
            marker: PhantomData,
        };
        commit_logger.0.file.load_before(&mut loader,
                                         None,
                                         DEFAULT_LOAD_BUFFER_LEN,
                                         true).await?;

        for invalid_only_read_path in invalid_only_read_paths {
            commit_logger
                .0
                .file
                .readable_to_back(invalid_only_read_path.clone())
                .await
                .map_err(|e| {
                    Error::new(ErrorKind::Other, format!("Replay commit log by file failed, path: {:?}, reason: {:?}", invalid_only_read_path, e))
                })?;
        }

        let replay_result = loader.result();
        if let Ok((replayed_logs, replayed_bytes)) = &replay_result {
            info!("Commit logger start_replay_by_file finished loading, replayed_logs: {}, replayed_bytes: {}",
                replayed_logs,
                replayed_bytes);
        }
        replay_result
    }.boxed()
}
/// Registers a replayed commit as waiting for confirmation on the check
/// point that is current for the replay; nothing is written to the log.
///
/// When `commit_uid` was already registered, the old entry is overwritten,
/// the duplicate counter is bumped and the first 8 duplicates are logged.
/// Always resolves to handle `0`.
fn append_replay<B>(&self, commit_uid: Self::Cid, _log: B) -> BoxFuture<'static, Result<Self::C>>
    where B: BufMut + AsRef<[u8]> + Send + Sized + 'static {
    let logger = self.clone();
    async move {
        let mut waiting_confirms = logger.0.check_points.lock().await;

        let writable = logger.0.writable.lock();
        let (counter, path) = (&writable.0, &writable.1);
        counter.fetch_add(1, Ordering::AcqRel);

        let replaced = waiting_confirms.insert(commit_uid.clone(), (counter.clone(), path.clone()));
        if let Some((_old_counter, old_path)) = replaced {
            let previous_duplicates = logger
                .0
                .replay_duplicate_commit_uids
                .fetch_add(1, Ordering::Relaxed);
            let duplicate_index = previous_duplicates + 1;
            // Only the first few duplicates are reported to keep the log readable.
            if duplicate_index <= 8 {
                info!("Replay commit uid overwritten during append_replay, commit_uid: {:?}, old_checkpoint_path: {:?}, new_checkpoint_path: {:?}, duplicate_index: {}",
                    commit_uid,
                    old_path,
                    path,
                    duplicate_index);
            }
        }

        logger.0.commit_log_count.fetch_add(1, Ordering::Relaxed);
        Ok(0)
    }.boxed()
}
/// Replayed entries are already on disk, so flushing during replay does
/// nothing and always succeeds.
fn flush_replay(&self, _log_handle: Self::C) -> BoxFuture<'static, Result<()>> {
    async { Ok(()) }.boxed()
}
/// Buffers the confirmation of `commit_uid`; buffered confirmations are
/// applied later by `finish_replay`. Always succeeds.
fn confirm_replay(&self, commit_uid: Self::Cid) -> BoxFuture<'static, Result<()>> {
    let logger = self.clone();
    async move {
        {
            let mut buffered = logger.0.replay_confirm_buf.lock();
            buffered.push_back(commit_uid);
        }
        Ok(())
    }.boxed()
}
/// Leaves replay mode and applies every confirmation that was buffered by
/// `confirm_replay` while replaying.
///
/// Confirmations are applied in the order they were buffered. If one of them
/// fails, its error is returned and the confirmations that were not yet
/// applied stay in the buffer. Afterwards a summary of what is still pending
/// (replay files, read-only files, check points) is logged.
///
/// The spin lock protecting the confirmation buffer is only taken for the
/// duration of a single `pop_front`, so it is never held while a
/// confirmation (or any other future) is awaited.
fn finish_replay(&self) -> BoxFuture<'static, Result<()>> {
    let logger = self.clone();
    async move {
        let buffered_confirms = logger.0.replay_confirm_buf.lock().len();
        let replaying_files = logger.0.replay_file_stats.lock().len();
        let pending_only_reads = logger.0.only_reads.lock().len();
        info!("Commit logger finish_replay begin, buffered_confirms: {}, replaying_files_waiting_confirm: {}, pending_only_reads: {}",
            buffered_confirms,
            replaying_files,
            pending_only_reads);

        // From here on `confirm` takes the regular (non-replay) path.
        logger.0.is_replaying.store(false, Ordering::SeqCst);

        let mut drained_confirms = 0usize;
        loop {
            // The guard is a temporary of this statement and is released at its
            // end, before `confirm` is awaited.
            let next_confirm = logger.0.replay_confirm_buf.lock().pop_front();
            let commit_uid = match next_confirm {
                Some(commit_uid) => commit_uid,
                None => break,
            };
            logger.confirm(commit_uid).await?;
            drained_confirms += 1;
        }

        let remaining_replay_files = logger.0.replay_file_stats.lock().len();
        let remaining_only_reads = logger.0.only_reads.lock().len();
        let remaining_check_points = logger.0.check_points.lock().await.len();
        let replay_duplicate_commit_uids = logger
            .0
            .replay_duplicate_commit_uids
            .load(Ordering::Relaxed);
        info!("Commit logger finish_replay end, drained_confirms: {}, remaining_replay_files_waiting_confirm: {}, remaining_only_reads: {}",
            drained_confirms,
            remaining_replay_files,
            remaining_only_reads);

        if remaining_replay_files > 0 || remaining_only_reads > 0 {
            // Snapshot the queue and release its spin lock before awaiting below.
            let only_reads = logger.0.only_reads.lock();
            let (head_path, head_is_finish_confirm, finished_behind_head, queue_total) =
                summarize_only_reads_queue(&only_reads);
            drop(only_reads);

            let mut head_remaining_check_points = 0usize;
            let mut head_counter_value = None;
            if let Some(ref head_path) = head_path {
                // Number of unconfirmed commits still attached to the head file.
                let check_points = logger.0.check_points.lock().await;
                head_remaining_check_points = check_points
                    .values()
                    .filter(|(_counter, path)| path.as_ref() == head_path)
                    .count();
                drop(check_points);
                head_counter_value = logger
                    .0
                    .replay_path_counters
                    .lock()
                    .get(head_path)
                    .map(|counter| counter.load(Ordering::Acquire));
            }
            info!("Commit logger finish_replay pending promotion summary, only_reads_head_path: {:?}, only_reads_head_finished: {:?}, finished_behind_head: {}, only_reads_total: {}, remaining_replay_files_waiting_confirm: {}, remaining_check_points: {}, head_remaining_check_points: {}, head_counter_value: {:?}, replay_duplicate_commit_uids: {}",
                head_path,
                head_is_finish_confirm,
                finished_behind_head,
                queue_total,
                remaining_replay_files,
                remaining_check_points,
                head_remaining_check_points,
                head_counter_value,
                replay_duplicate_commit_uids);

            // The head is not marked finished although no commit references it any
            // more: report the details of this stalled head.
            if head_path.is_some()
                && head_is_finish_confirm == Some(false)
                && head_remaining_check_points == 0
            {
                info!("Commit logger finish_replay stalled head detail, head_path: {:?}, head_counter_value: {:?}, finished_behind_head: {}, remaining_replay_files_waiting_confirm: {}",
                    head_path,
                    head_counter_value,
                    finished_behind_head,
                    remaining_replay_files);
            }
        }

        Ok(())
    }.boxed()
}
/// Moves the replay on to the next check point.
///
/// Fails when the logger is not in replay mode.
fn advance_replay_check_point(&self) -> BoxFuture<'static, Result<()>> {
    let logger = self.clone();
    async move {
        if logger.0.is_replaying.load(Ordering::Relaxed) {
            next_check_point(&logger);
            Ok(())
        } else {
            Err(Error::new(ErrorKind::Other,
                           "Advance replay check point failed, reason: commit logger is not replaying"))
        }
    }.boxed()
}
/// Looks up the check point the unconfirmed commit `commit_uid` belongs to.
///
/// Resolves to `None` when the commit is not waiting for confirmation, when
/// the check point path has no UTF-8 file name, or when
/// `log_file_name_to_usize` cannot convert that file name.
fn check_point_of(&self, commit_uid: Self::Cid) -> BoxFuture<'static, Option<usize>> {
    let logger = self.clone();
    async move {
        // Copy the path out so the lock is released before the name is parsed.
        let check_point_path = {
            let waiting_confirms = logger.0.check_points.lock().await;
            match waiting_confirms.get(&commit_uid) {
                Some((_counter, path)) => path.as_ref().clone(),
                None => return None,
            }
        };

        check_point_path
            .file_name()
            .and_then(|file_name| file_name.to_str())
            .and_then(|file_name_str| log_file_name_to_usize(file_name_str))
    }.boxed()
}
/// Resolves to the current log index reported by the log file.
fn current_check_point(&self) -> BoxFuture<'static, usize> {
    let logger = self.clone();
    async move {
        let current_index = logger.0.file.current_log_index();
        current_index
    }.boxed()
}
/// Creates a new check point while holding the `check_points` lock.
///
/// Resolves to the log index returned by `new_check_point`, or to its error.
fn append_check_point(&self) -> BoxFuture<'static, Result<usize>> {
    let logger = self.clone();
    async move {
        let begin = Instant::now();
        eprintln!(
            "pi_store append_check_point_lock_begin log_path={:?} writed_size={} log_file_limit={} elapsed_ms={}",
            logger.0.file.path(),
            logger.0.writed_size.load(Ordering::Relaxed),
            logger.0.log_file_limit,
            begin.elapsed().as_millis(),
        );

        // Kept alive until the check point has been created.
        let waiting_confirms = logger.0.check_points.lock().await;
        eprintln!(
            "pi_store append_check_point_lock_ok log_path={:?} check_points_len={} wait_elapsed_ms={}",
            logger.0.file.path(),
            waiting_confirms.len(),
            begin.elapsed().as_millis(),
        );

        let created = new_check_point(&logger, false, "append_check_point").await;
        created
    }.boxed()
}
/// Resolves to the number of commits that are still waiting for confirmation.
fn waiting_confirm_count(&self) -> BoxFuture<'static, usize> {
    let logger = self.clone();
    async move {
        let waiting_confirms = logger.0.check_points.lock().await;
        waiting_confirms.len()
    }.boxed()
}
/// Total number of commits registered through `append` and `append_replay`.
fn append_total_count(&self) -> usize {
    let appended = &self.0.commit_log_count;
    appended.load(Ordering::Relaxed)
}
/// Total number of commits that `confirm` found and removed from the
/// waiting set.
fn confirm_total_count(&self) -> usize {
    let confirmed = &self.0.confirm_commited_count;
    confirmed.load(Ordering::Relaxed)
}
}
/// Splits the log file and makes the new writable file the current check point.
///
/// The file that just became read-only is appended to the `only_reads` queue
/// with the given `is_finish_confirm` flag and the written-size counter is
/// reset. `caller` only labels the diagnostic lines written to stderr.
///
/// Returns the log index produced by the split, or the split error.
/// Panics if the log file reports no writable path after the split.
async fn new_check_point(logger: &CommitLogger,
                         is_finish_confirm: bool,
                         caller: &'static str) -> Result<usize> {
    let begin = Instant::now();
    eprintln!(
        "pi_store new_check_point_begin caller={} log_path={:?} is_finish_confirm={} writed_size={} log_file_limit={} writable_size={} only_reads_len={}",
        caller,
        logger.0.file.path(),
        is_finish_confirm,
        logger.0.writed_size.load(Ordering::Relaxed),
        logger.0.log_file_limit,
        logger.0.file.writable_size(),
        logger.0.only_reads.lock().len(),
    );
    eprintln!(
        "pi_store new_check_point_split_begin caller={} log_path={:?} elapsed_ms={}",
        caller,
        logger.0.file.path(),
        begin.elapsed().as_millis(),
    );

    let log_index = match logger.0.file.split().await {
        Err(e) => {
            eprintln!(
                "pi_store new_check_point_split_err caller={} log_path={:?} elapsed_ms={} error={:?}",
                caller,
                logger.0.file.path(),
                begin.elapsed().as_millis(),
                e,
            );
            return Err(e);
        },
        Ok(index) => index,
    };
    eprintln!(
        "pi_store new_check_point_split_end caller={} log_path={:?} log_index={} elapsed_ms={}",
        caller,
        logger.0.file.path(),
        log_index,
        begin.elapsed().as_millis(),
    );

    // Fresh counter and path for the file that is writable after the split.
    let next_writable = (Arc::new(AtomicU64::new(0)),
                         Arc::new(logger.0.file.writable_path().unwrap()));
    *logger.0.writable.lock() = next_writable;

    // The previous writable file is now the newest read-only file.
    let retired_path = logger.0.file.last_readable_path();
    logger.0.only_reads.lock().push_back((retired_path, is_finish_confirm));
    logger.0.writed_size.store(0, Ordering::Relaxed);

    eprintln!(
        "pi_store new_check_point_end caller={} log_path={:?} log_index={} writable_size={} only_reads_len={} elapsed_ms={}",
        caller,
        logger.0.file.path(),
        log_index,
        logger.0.file.writable_size(),
        logger.0.only_reads.lock().len(),
        begin.elapsed().as_millis(),
    );
    Ok(log_index)
}
/// One pass of the background collect task.
///
/// Waits for `timeout` on the runtime, then — unless the logger is
/// replaying — takes the `check_points` lock and creates a new check point
/// when the written size has reached `log_file_limit`. A failure to create
/// the check point is ignored.
async fn collect_commit_logger(logger: &CommitLogger, timeout: usize) {
    logger.0.rt.timeout(timeout).await;

    let begin = Instant::now();
    let replaying = logger.0.is_replaying.load(Ordering::Relaxed);
    let written = logger.0.writed_size.load(Ordering::Relaxed);
    eprintln!(
        "pi_store collect_timeout_wake log_path={:?} timeout_ms={} is_replaying={} writed_size={} log_file_limit={} writable_size={}",
        logger.0.file.path(),
        timeout,
        replaying,
        written,
        logger.0.log_file_limit,
        logger.0.file.writable_size(),
    );
    if replaying {
        // Check points are driven by the replay itself while replaying.
        return;
    }

    eprintln!(
        "pi_store collect_check_points_lock_begin log_path={:?} writed_size={} log_file_limit={} elapsed_ms={}",
        logger.0.file.path(),
        logger.0.writed_size.load(Ordering::Relaxed),
        logger.0.log_file_limit,
        begin.elapsed().as_millis(),
    );
    let waiting_confirms = logger.0.check_points.lock().await;
    // Re-read the size under the lock before deciding.
    let limit_reached = logger.0.writed_size.load(Ordering::Relaxed) >= logger.0.log_file_limit;
    eprintln!(
        "pi_store collect_check_points_lock_ok log_path={:?} check_points_len={} will_new_check_point={} wait_elapsed_ms={}",
        logger.0.file.path(),
        waiting_confirms.len(),
        limit_reached,
        begin.elapsed().as_millis(),
    );
    if limit_reached {
        let _ = new_check_point(logger, false, "collect").await;
    }
    drop(waiting_confirms);

    eprintln!(
        "pi_store collect_check_points_scope_end log_path={:?} elapsed_ms={}",
        logger.0.file.path(),
        begin.elapsed().as_millis(),
    );
}
impl CommitLoggerExt for CommitLogger {
    /// Switches the logger into replay mode and replays the read-only log
    /// files through `callback`, using the extended loader and
    /// `LogFile::load_before_with_payload_time`.
    ///
    /// Resolves to `(0, 0)` right away when the writable file is empty and
    /// there is no readable file; otherwise resolves to the loader's result.
    fn start_replay_ext<B, F>(&self, callback: Arc<F>)
        -> BoxFuture<'static, Result<(usize, usize)>>
        where B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
              F: Fn(Option<(Self::Cid, LogMethod, u64, B)>) -> Result<()> + Send + Sync + 'static
    {
        self.0.is_replaying.store(true, Ordering::SeqCst);

        let commit_logger = self.clone();
        async move {
            if let Some(writable_path) = commit_logger.0.file.writable_path() {
                let meta = writable_path.metadata().map_err(|e| {
                    Error::new(ErrorKind::Other,
                               format!("Replay commit log failed, path: {:?}, reason: {:?}",
                                       writable_path,
                                       e))
                })?;
                if meta.len() == 0 && commit_logger.0.file.readable_amount() == 0 {
                    // Nothing was ever written: there is nothing to replay.
                    return Ok((0, 0));
                }
            }

            commit_logger.0.file.split().await.map_err(|e| {
                Error::new(ErrorKind::Other,
                           format!("Replay commit log failed, reason: {:?}",
                                   e))
            })?;

            // Empty read-only files are set aside; the others are queued for replay.
            let mut invalid_only_read_paths = Vec::new();
            for only_read_path in commit_logger.0.file.all_readable_path() {
                let meta = only_read_path.metadata().map_err(|e| {
                    Error::new(ErrorKind::Other,
                               format!("Replay commit log failed, path: {:?}, reason: {:?}",
                                       only_read_path,
                                       e))
                })?;
                if meta.len() == 0 {
                    invalid_only_read_paths.push(only_read_path);
                } else {
                    commit_logger.0.replay_only_reads.lock().push_back(only_read_path);
                }
            }

            // The first queued file becomes the check point replayed commits attach to.
            let first_replay_path = commit_logger.0.replay_only_reads.lock().pop_front();
            if let Some(path) = first_replay_path {
                *commit_logger.0.writable.lock() = (Arc::new(AtomicU64::new(0)), Arc::new(path));
            }

            let mut loader = CommitLoggerLoaderExt {
                logger: commit_logger.clone(),
                buf: Vec::new(),
                log_file: None,
                callback,
                result: Ok((0, 0)),
                marker: PhantomData,
            };
            commit_logger.0.file.load_before_with_payload_time(&mut loader,
                                                               None,
                                                               DEFAULT_LOAD_BUFFER_LEN,
                                                               true).await?;

            for invalid_only_read_path in invalid_only_read_paths {
                commit_logger
                    .0
                    .file
                    .readable_to_back(invalid_only_read_path.clone())
                    .await
                    .map_err(|e| {
                        Error::new(ErrorKind::Other,
                                   format!("Replay commit log failed, path: {:?}, reason: {:?}",
                                           invalid_only_read_path,
                                           e))
                    })?;
            }

            loader.result()
        }.boxed()
    }
}
/// Shared state behind every [`CommitLogger`] handle.
///
/// Fields:
/// - `rt`: runtime; used for the timeout of the collect task.
/// - `file`: the underlying log file.
/// - `delay_timeout`: timeout passed to `LogFile::delay_commit` on flush.
/// - `log_file_limit`: written-size threshold that triggers a new check point.
/// - `writed_size`: bytes appended since the last check point (payload plus
///   16 bytes per entry); reset to 0 by `new_check_point`.
/// - `writable`: pending-commit counter and path of the current check point.
/// - `only_reads`: queue of read-only files with their "finished confirm" flag.
/// - `check_points`: unconfirmed commit uid -> counter and path of its check point.
/// - `is_replaying`: whether the logger is in replay mode.
/// - `replay_only_reads`: read-only files still to be replayed.
/// - `replay_confirm_buf`: confirmations buffered while replaying.
/// - `replay_file_stats`: per-file replay statistics (cleared when a replay starts).
/// - `replay_path_counters`: path -> counter recorded by `next_check_point`
///   while replaying.
/// - `replay_duplicate_commit_uids`: number of commit uids overwritten in
///   `append_replay`.
/// - `commit_log_count`: number of appended (or replay-appended) commits.
/// - `confirm_commited_count`: number of confirmed commits.
struct InnerCommitLogger {
rt: MultiTaskRuntime<()>, file: LogFile, delay_timeout: usize, log_file_limit: u64, writed_size: AtomicU64, writable: SpinLock<(Arc<AtomicU64>, Arc<PathBuf>)>, only_reads: SpinLock<VecDeque<(PathBuf, bool)>>, check_points: Mutex<XHashMap<Guid, (Arc<AtomicU64>, Arc<PathBuf>)>>, is_replaying: AtomicBool, replay_only_reads: SpinLock<VecDeque<PathBuf>>, replay_confirm_buf: SpinLock<VecDeque<Guid>>, replay_file_stats: SpinLock<XHashMap<PathBuf, ReplayFileStats>>, replay_path_counters: SpinLock<XHashMap<PathBuf, Arc<AtomicU64>>>, replay_duplicate_commit_uids: AtomicUsize, commit_log_count: AtomicUsize, confirm_commited_count: AtomicUsize, }
/// Statistics kept per replayed log file: number of replayed logs, number of
/// replayed bytes and an `Instant` named `begin` (presumably when the replay
/// of that file started — the code that fills this struct is not visible here).
#[derive(Debug)]
struct ReplayFileStats {
replayed_logs: usize, replayed_bytes: usize, begin: Instant, }
/// Loader used by `start_replay`: buffers the entries of one log file and
/// hands them to `callback` when the file changes or the load ends.
///
/// Fields:
/// - `logger`: the commit logger being replayed.
/// - `buf`: buffered `(commit uid, payload)` entries of the current file.
/// - `log_file`: the log file currently being loaded, if any.
/// - `current_log_count` / `current_bytes`: entries and bytes (16-byte key
///   included) loaded from the current file.
/// - `current_begin`: when loading of the current file started.
/// - `callback`: user callback invoked for every buffered entry.
/// - `result`: running totals `(log count, byte count)` or the last error.
/// - `marker`: ties the payload type `B` to the loader.
struct CommitLoggerLoader<
B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
F: Fn(Guid, B) -> Result<()> + Send + 'static,
> {
logger: CommitLogger, buf: Vec<(Guid, Vec<u8>)>, log_file: Option<PathBuf>, current_log_count: usize, current_bytes: usize, current_begin: Option<Instant>, callback: Arc<F>, result: Result<(usize, usize)>, marker: PhantomData<B>,
}
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Guid, B) -> Result<()> + Send + 'static,
> PairLoader for CommitLoggerLoader<B, F> {
    /// Every key is loaded; nothing is filtered out.
    fn is_require(&self, _log_file: Option<&PathBuf>, _key: &Vec<u8>) -> bool {
        true
    }

    /// Receives one entry of the log.
    ///
    /// Entries are buffered per log file. When an entry from a different file
    /// arrives, the buffered entries of the previous file are handed to the
    /// callback (last buffered first), the statistics of that file are
    /// recorded if no error occurred, and the replay moves on to the next
    /// check point.
    ///
    /// Nothing is done once an error has been recorded, or when no log file
    /// is given. A key that is not exactly 16 bytes long is recorded as an
    /// error in `self.result` instead of panicking.
    fn load(&mut self,
            log_file: Option<&PathBuf>,
            _method: LogMethod,
            key: Vec<u8>,
            value: Option<Vec<u8>>) {
        if self.result.is_err() {
            return;
        }
        let log_file = match log_file {
            Some(log_file) => log_file,
            None => return,
        };

        if self.log_file.is_none() {
            // First entry of the whole replay.
            self.log_file = Some(log_file.clone());
            self.current_begin = Some(Instant::now());
        }

        if self.log_file.as_ref() != Some(log_file) {
            // The loader moved to another file: flush what was buffered for the
            // previous one. Later errors overwrite earlier ones.
            while let Some((commit_uid, log)) = self.buf.pop() {
                if let Err(e) = (self.callback)(commit_uid.clone(), B::from(log)) {
                    self.result = Err(Error::new(ErrorKind::Other, format!("Replay commit log failed, commit_uid: {:?}, reason: {:?}", commit_uid, e)));
                }
            }
            if self.result.is_ok() {
                finish_replay_file_stats(&self.logger,
                                         self.log_file.as_ref().unwrap().clone(),
                                         self.current_log_count,
                                         self.current_bytes,
                                         self.current_begin.take().unwrap());
            }

            self.log_file = Some(log_file.clone());
            self.current_log_count = 0;
            self.current_bytes = 0;
            self.current_begin = Some(Instant::now());
            next_check_point(&self.logger);
        }

        // The key is the little-endian encoding of the 128-bit commit uid.
        let uid_bytes: [u8; 16] = match key.try_into() {
            Ok(uid_bytes) => uid_bytes,
            Err(invalid_key) => {
                self.result = Err(Error::new(ErrorKind::Other, format!("Replay commit log failed, path: {:?}, reason: invalid commit uid key length {}, expected 16", log_file, invalid_key.len())));
                return;
            },
        };
        let commit_uid = Guid(u128::from_le_bytes(uid_bytes));

        if let Some(log) = value {
            if let Ok((log_count, bytes_count)) = self.result {
                self.result = Ok((log_count + 1, bytes_count + 16 + log.len()));
            }
            self.current_log_count += 1;
            self.current_bytes += 16 + log.len();
            self.buf.push((commit_uid, log));
        }
    }
}
/// Retires the current check point and installs the next one.
///
/// The path of the current check point is appended to the `only_reads`
/// queue as "not finished". While replaying, its counter is additionally
/// recorded in `replay_path_counters`. The next check point is the next file
/// waiting to be replayed or, when none is left, the writable file of the log.
///
/// Panics if no replay file is left and the log file reports no writable path.
fn next_check_point(logger: &CommitLogger) {
    {
        // Held for the whole block, like the check point it describes.
        let writable = logger.0.writable.lock();
        let retired_path = (*writable.1).clone();

        let queue_was_empty = {
            let mut only_reads = logger.0.only_reads.lock();
            let empty_before_push = only_reads.is_empty();
            only_reads.push_back((retired_path.clone(), false));
            empty_before_push
        };

        if logger.0.is_replaying.load(Ordering::Relaxed) {
            let counter = writable.0.clone();
            logger
                .0
                .replay_path_counters
                .lock()
                .insert(retired_path.clone(), counter.clone());
            if queue_was_empty {
                info!("Commit logger replay head only_read enqueued, path: {:?}, initial_finished: false, counter_value: {}",
                    retired_path,
                    counter.load(Ordering::Acquire));
            }
        }
    }

    // Prefer the next file waiting for replay; fall back to the writable file.
    let next_replay_path = logger.0.replay_only_reads.lock().pop_front();
    let next_path = match next_replay_path {
        Some(path) => path,
        None => logger.0.file.writable_path().unwrap(),
    };
    *logger.0.writable.lock() = (Arc::new(AtomicU64::new(0)), Arc::new(next_path));
}
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Guid, B) -> Result<()> + Send + 'static,
> CommitLoggerLoader<B, F> {
    /// Flushes the entries still buffered for the last log file and returns the replay result.
    ///
    /// On success the result holds the totals `(log count, byte count)` of the
    /// replay. Buffered entries are only replayed while the replay has not
    /// failed, and the flush stops at the first callback failure so that the
    /// first error is the one reported. The check point is advanced whenever
    /// entries were buffered, whether or not the flush succeeded.
    pub fn result(mut self) -> Result<(usize, usize)> {
        if !self.buf.is_empty() {
            if self.result.is_ok() {
                while let Some((commit_uid, log)) = self.buf.pop() {
                    if let Err(e) = (self.callback)(commit_uid.clone(), B::from(log)) {
                        self.result = Err(Error::new(ErrorKind::Other, format!("Replay commit log failed, commit_uid: {:?}, reason: {:?}", commit_uid, e)));
                        // Never replay later commits after a failed one.
                        break;
                    }
                }
            }
            if self.result.is_ok() {
                if let (Some(path), Some(begin)) = (self.log_file.clone(), self.current_begin.take()) {
                    finish_replay_file_stats(&self.logger,
                                             path,
                                             self.current_log_count,
                                             self.current_bytes,
                                             begin);
                }
            }
            next_check_point(&self.logger);
        }
        self.result
    }
}
/// Loader state for replaying commit logs with an extra callback per finished log file.
struct CommitLoggerLoaderByFile<B, F, G>
where
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Guid, B) -> Result<()> + Send + 'static,
    G: Fn() -> Result<()> + Send + 'static,
{
    /// Commit logger the replay statistics are reported to.
    logger: CommitLogger,
    /// Entries buffered for the log file currently being loaded.
    buf: Vec<(Guid, Vec<u8>)>,
    /// Log file currently being loaded, `None` until the first entry arrives.
    log_file: Option<PathBuf>,
    /// Number of entries buffered for the current log file.
    current_log_count: usize,
    /// Bytes (16 byte key plus payload) buffered for the current log file.
    current_bytes: usize,
    /// Instant at which loading of the current log file started.
    current_begin: Option<Instant>,
    /// Invoked for every buffered commit log.
    callback: Arc<F>,
    /// Invoked after all logs of one file were replayed successfully.
    file_callback: Arc<G>,
    /// Running totals `(log count, byte count)` or the replay error.
    result: Result<(usize, usize)>,
    /// Binds the buffer type `B` handed to `callback`.
    marker: PhantomData<B>,
}
/// `PairLoader` implementation that replays commit logs one log file at a time
/// and additionally notifies `file_callback` after each replayed file.
///
/// Entries of the file currently being loaded are buffered and handed to the
/// callback, popped from the back of the buffer, once an entry of a different
/// log file arrives (or when `result()` is called for the last file).
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Guid, B) -> Result<()> + Send + 'static,
    G: Fn() -> Result<()> + Send + 'static,
> PairLoader for CommitLoggerLoaderByFile<B, F, G> {
    /// Every key is required by the replay.
    fn is_require(&self, _log_file: Option<&PathBuf>, _key: &Vec<u8>) -> bool {
        true
    }

    /// Buffers one loaded entry and flushes the previous file when the log file changes.
    ///
    /// Entries without a log file are ignored, and nothing is processed any
    /// more once the replay has failed. The replay stops at the first callback
    /// failure: the remaining buffered entries are dropped instead of being
    /// replayed on top of a failed commit, and the first error is kept.
    /// A key that is not exactly 16 bytes long fails the replay with
    /// `ErrorKind::InvalidData` instead of panicking.
    fn load(&mut self,
            log_file: Option<&PathBuf>,
            _method: LogMethod,
            key: Vec<u8>,
            value: Option<Vec<u8>>) {
        if self.result.is_err() {
            return;
        }
        let log_file = match log_file {
            Some(log_file) => log_file,
            None => return,
        };

        // The very first entry decides which file is being replayed.
        if self.log_file.is_none() {
            self.log_file = Some(log_file.clone());
            self.current_begin = Some(Instant::now());
        }

        let file_changed = self
            .log_file
            .as_ref()
            .map_or(false, |current| current != log_file);
        if file_changed {
            // Flush the finished file before accepting entries of the next one.
            while let Some((commit_uid, log)) = self.buf.pop() {
                if let Err(e) = (self.callback)(commit_uid.clone(), B::from(log)) {
                    self.result = Err(Error::new(ErrorKind::Other, format!("Replay commit log by file failed, commit_uid: {:?}, reason: {:?}", commit_uid, e)));
                    // Never replay later commits after a failed one.
                    self.buf.clear();
                    break;
                }
            }
            // The file callback only runs when every log of the file was replayed.
            if self.result.is_ok() {
                if let Err(e) = (self.file_callback)() {
                    self.result = Err(Error::new(ErrorKind::Other,
                                                 format!("Replay commit log by file failed, log_file: {:?}, reason: {:?}",
                                                         self.log_file,
                                                         e)));
                }
            }
            if self.result.is_ok() {
                if let (Some(path), Some(begin)) = (self.log_file.clone(), self.current_begin.take()) {
                    finish_replay_file_stats(&self.logger,
                                             path,
                                             self.current_log_count,
                                             self.current_bytes,
                                             begin);
                }
            }
            self.log_file = Some(log_file.clone());
            self.current_log_count = 0;
            self.current_bytes = 0;
            self.current_begin = Some(Instant::now());
            if self.result.is_err() {
                // The replay failed while flushing: do not buffer anything else.
                return;
            }
        }

        let key_len = key.len();
        let key_bytes: std::result::Result<[u8; 16], _> = key.try_into();
        let commit_uid = match key_bytes {
            Ok(bytes) => Guid(u128::from_le_bytes(bytes)),
            Err(_) => {
                self.result = Err(Error::new(ErrorKind::InvalidData, format!("Replay commit log by file failed, invalid commit uid length: {}, log_file: {:?}", key_len, log_file)));
                self.buf.clear();
                return;
            }
        };

        if let Some(log) = value {
            // Each entry accounts for its 16 byte key plus the log payload.
            let entry_bytes = 16 + log.len();
            if let Ok((log_count, bytes_count)) = self.result {
                self.result = Ok((log_count + 1, bytes_count + entry_bytes));
            }
            self.current_log_count += 1;
            self.current_bytes += entry_bytes;
            self.buf.push((commit_uid, log));
        }
    }
}
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Guid, B) -> Result<()> + Send + 'static,
    G: Fn() -> Result<()> + Send + 'static,
> CommitLoggerLoaderByFile<B, F, G> {
    /// Flushes the entries still buffered for the last log file and returns the replay result.
    ///
    /// On success the result holds the totals `(log count, byte count)` of the
    /// replay. Nothing is replayed when the replay has already failed, and the
    /// flush stops at the first callback failure so that the first error is the
    /// one reported. The file callback runs only after every buffered log of
    /// the last file was replayed successfully.
    pub fn result(mut self) -> Result<(usize, usize)> {
        if self.result.is_err() || self.buf.is_empty() {
            return self.result;
        }

        while let Some((commit_uid, log)) = self.buf.pop() {
            if let Err(e) = (self.callback)(commit_uid.clone(), B::from(log)) {
                self.result = Err(Error::new(ErrorKind::Other,
                                             format!("Replay commit log by file failed, commit_uid: {:?}, reason: {:?}",
                                                     commit_uid,
                                                     e)));
                // Never replay later commits after a failed one.
                break;
            }
        }
        if self.result.is_ok() {
            if let Err(e) = (self.file_callback)() {
                self.result = Err(Error::new(ErrorKind::Other,
                                             format!("Replay commit log by file failed, log_file: {:?}, reason: {:?}",
                                                     self.log_file,
                                                     e)));
            }
        }
        if self.result.is_ok() {
            if let (Some(path), Some(begin)) = (self.log_file.clone(), self.current_begin.take()) {
                finish_replay_file_stats(&self.logger,
                                         path,
                                         self.current_log_count,
                                         self.current_bytes,
                                         begin);
            }
        }
        self.result
    }
}
/// Logs that `path` was replayed and stores its statistics in the logger.
///
/// The stored `ReplayFileStats` keeps the number of replayed logs, the number
/// of replayed bytes and the instant `begin`, keyed by `path`. An existing
/// entry for the same path is replaced by the map insertion.
#[inline]
fn finish_replay_file_stats(logger: &CommitLogger,
                            path: PathBuf,
                            replayed_logs: usize,
                            replayed_bytes: usize,
                            begin: Instant) {
    let elapsed_ms = begin.elapsed().as_millis();
    info!("Replay commit log file replayed and waiting confirm, path: {:?}, logs: {}, replayed_bytes: {}, replay_elapsed_ms: {}",
        path,
        replayed_logs,
        replayed_bytes,
        elapsed_ms);

    let mut file_stats = logger.0.replay_file_stats.lock();
    file_stats.insert(path, ReplayFileStats { replayed_logs, replayed_bytes, begin });
}
/// Drops the replay bookkeeping of `path` and logs the promotion of the file.
///
/// The counter registered for `path` in `replay_path_counters` is removed.
/// When replay statistics exist for `path`, they are removed as well and two
/// log lines are written: the statistics of the promoted file, and the number
/// of files still present in `replay_file_stats` and `only_reads`.
#[inline]
fn on_replay_file_promoted_to_back(logger: &CommitLogger,
                                   path: &PathBuf,
                                   file_size_bytes: Option<u64>) {
    logger.0.replay_path_counters.lock().remove(path);

    // Take the statistics in a statement of its own: a guard created in the
    // scrutinee of `if let` lives until the end of the `if let` body, and the
    // body locks `replay_file_stats` again, which would block forever on a
    // non-reentrant lock.
    let promoted_stats = logger.0.replay_file_stats.lock().remove(path);
    if let Some(stats) = promoted_stats {
        info!("Replay commit log file confirmed and promoted to .bak, path: {:?}, logs: {}, replayed_bytes: {}, file_size_bytes: {:?}, repair_confirm_elapsed_ms: {}",
            path,
            stats.replayed_logs,
            stats.replayed_bytes,
            file_size_bytes,
            stats.begin.elapsed().as_millis());
        info!("Replay commit log file .bak promotion settled, path: {:?}, remaining_replay_files_waiting_confirm: {}, remaining_only_reads: {}",
            path,
            logger.0.replay_file_stats.lock().len(),
            logger.0.only_reads.lock().len());
    }
}
/// Summarizes a queue of `(path, is_finish_confirm)` entries.
///
/// Returns, in this order: the path of the head entry, the flag of the head
/// entry, the number of entries behind the head whose flag is set, and the
/// total number of entries. An empty queue yields `(None, None, 0, 0)`.
#[inline]
fn summarize_only_reads_queue(queue: &VecDeque<(PathBuf, bool)>)
                              -> (Option<PathBuf>, Option<bool>, usize, usize) {
    match queue.front() {
        None => (None, None, 0, 0),
        Some((head_path, head_is_finish_confirm)) => {
            // Only the entries behind the head are counted here.
            let mut finished_behind_head = 0usize;
            for (_, tail_is_finish_confirm) in queue.iter().skip(1) {
                if *tail_is_finish_confirm {
                    finished_behind_head += 1;
                }
            }
            (Some(head_path.clone()),
             Some(*head_is_finish_confirm),
             finished_behind_head,
             queue.len())
        }
    }
}
/// Loader state for the extended replay, whose callback also receives the log
/// method and the payload time of every entry.
struct CommitLoggerLoaderExt<B, F>
where
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Option<(Guid, LogMethod, u64, B)>) -> Result<()> + Send + 'static,
{
    /// Commit logger whose check point is advanced between log files.
    logger: CommitLogger,
    /// Entries buffered for the log file currently being loaded.
    buf: Vec<(Guid, LogMethod, u64, Vec<u8>)>,
    /// Log file currently being loaded, `None` until the first entry arrives.
    log_file: Option<PathBuf>,
    /// Invoked with `Some(..)` for every buffered entry and with `None` by `result()`.
    callback: Arc<F>,
    /// Running totals `(log count, byte count)` or the replay error.
    result: Result<(usize, usize)>,
    /// Binds the buffer type `B` handed to `callback`.
    marker: PhantomData<B>,
}
/// `PairLoaderExt` implementation that replays commit logs one log file at a
/// time, passing the log method and payload time of each entry to the callback.
///
/// Entries of the file currently being loaded are buffered and handed to the
/// callback, popped from the back of the buffer, once an entry of a different
/// log file arrives (or when `result()` is called for the last file).
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Option<(Guid, LogMethod, u64, B)>) -> Result<()> + Send + 'static,
> PairLoaderExt for CommitLoggerLoaderExt<B, F> {
    /// Every key is required by the replay.
    fn is_require(&self,
                  _log_file: Option<&PathBuf>,
                  _payload_time: u64,
                  _key: &Vec<u8>) -> bool {
        true
    }

    /// Buffers one loaded entry and flushes the previous file when the log file changes.
    ///
    /// Entries without a log file are ignored, and nothing is processed any
    /// more once the replay has failed. The replay stops at the first callback
    /// failure: the remaining buffered entries are dropped instead of being
    /// replayed on top of a failed commit, and the first error is kept.
    /// A key that is not exactly 16 bytes long fails the replay with
    /// `ErrorKind::InvalidData` instead of panicking.
    fn load(&mut self,
            log_file: Option<&PathBuf>,
            method: LogMethod,
            payload_time: u64,
            key: Vec<u8>,
            value: Option<Vec<u8>>) {
        if self.result.is_err() {
            return;
        }
        let log_file = match log_file {
            Some(log_file) => log_file,
            None => return,
        };

        // The very first entry decides which file is being replayed.
        if self.log_file.is_none() {
            self.log_file = Some(log_file.clone());
        }

        let file_changed = self
            .log_file
            .as_ref()
            .map_or(false, |current| current != log_file);
        if file_changed {
            // Flush the finished file before accepting entries of the next one.
            while let Some((commit_uid, method, time, log)) = self.buf.pop() {
                if let Err(e) = (self.callback)(Some((commit_uid.clone(), method, time, B::from(log)))) {
                    self.result = Err(Error::new(ErrorKind::Other, format!("Replay commit log failed, commit_uid: {:?}, reason: {:?}", commit_uid, e)));
                    // Never replay later commits after a failed one.
                    self.buf.clear();
                    break;
                }
            }
            self.log_file = Some(log_file.clone());
            next_check_point(&self.logger);
            if self.result.is_err() {
                // The replay failed while flushing: do not buffer anything else.
                return;
            }
        }

        let key_len = key.len();
        let key_bytes: std::result::Result<[u8; 16], _> = key.try_into();
        let commit_uid = match key_bytes {
            Ok(bytes) => Guid(u128::from_le_bytes(bytes)),
            Err(_) => {
                self.result = Err(Error::new(ErrorKind::InvalidData, format!("Replay commit log failed, invalid commit uid length: {}, log_file: {:?}", key_len, log_file)));
                self.buf.clear();
                return;
            }
        };

        if let Some(log) = value {
            // Each entry accounts for its 16 byte key plus the log payload.
            if let Ok((log_count, bytes_count)) = self.result {
                self.result = Ok((log_count + 1, bytes_count + 16 + log.len()));
            }
            self.buf.push((commit_uid, method, payload_time, log));
        }
    }
}
impl<
    B: BufMut + AsRef<[u8]> + From<Vec<u8>> + Send + Sized + 'static,
    F: Fn(Option<(Guid, LogMethod, u64, B)>) -> Result<()> + Send + 'static,
> CommitLoggerLoaderExt<B, F> {
    /// Flushes the entries still buffered for the last log file, signals the end
    /// of the replay to the callback and returns the replay result.
    ///
    /// On success the result holds the totals `(log count, byte count)` of the
    /// replay. Buffered entries are only replayed while the replay has not
    /// failed, and the flush stops at the first callback failure so that the
    /// first error is the one reported. The callback is always invoked with
    /// `None` at the end; when that final call fails and no earlier error was
    /// recorded, its error becomes the replay result instead of being dropped.
    pub fn result(mut self) -> Result<(usize, usize)> {
        if !self.buf.is_empty() {
            if self.result.is_ok() {
                while let Some((commit_uid, method, time, log)) = self.buf.pop() {
                    if let Err(e) = (self.callback)(Some((commit_uid.clone(), method, time, B::from(log)))) {
                        self.result = Err(Error::new(ErrorKind::Other,
                                                     format!("Replay commit log failed, commit_uid: {:?}, reason: {:?}",
                                                             commit_uid,
                                                             e)));
                        // Never replay later commits after a failed one.
                        break;
                    }
                }
            }
            next_check_point(&self.logger);
        }

        if let Err(e) = (self.callback)(None) {
            // Keep the first error when the replay had already failed.
            if self.result.is_ok() {
                self.result = Err(Error::new(ErrorKind::Other,
                                             format!("Replay commit log failed, end of replay was rejected, reason: {:?}",
                                                     e)));
            }
        }
        self.result
    }
}