use crate::common::errors::Error;
use crate::common::intercom::SyncNotification;
use crate::common::defs::SharedSequences;
use crate::system::config::ConfigMt;
use crate::log_mgr::log_mgr::LogMgr;
use crate::storage::driver::StorageDriver;
use crate::tran_mgr::tran_mgr::TranMgr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::thread::JoinHandle;
use std::time::Duration;
use log::error;
use log::warn;
use log::info;
const CONDVAR_WAIT_INTERVAL_MS: u64 = 1000;
pub struct Checkpointer {
checkpointer_thread: JoinHandle<()>,
terminate: Arc<AtomicBool>,
checkpoint_ready: SyncNotification<bool>,
processed_data_threashold: u64,
processed_data_size: AtomicU64,
checkpoint_req_count: Arc<AtomicU32>,
}
impl Checkpointer {
pub fn new(log_mgr: LogMgr,
csns: SharedSequences,
conf: ConfigMt,
tran_mgr: TranMgr
) -> Result<Self, Error>
{
let processed_data_threashold = *conf.get_conf().get_checkpoint_data_threshold();
let terminate = Arc::new(AtomicBool::new(false));
let checkpoint_ready = SyncNotification::new(false);
let checkpoint_req_count = Arc::new(AtomicU32::new(0));
let terminate2 = terminate.clone();
let checkpoint_ready2 = checkpoint_ready.clone();
let checkpoint_req_count2 = checkpoint_req_count.clone();
let checkpointer_thread = std::thread::spawn(move || {
Self::checkpointer_thread(conf,
terminate2,
checkpoint_ready2,
log_mgr.clone(),
csns.clone(),
checkpoint_req_count2,
tran_mgr);
});
assert!(processed_data_threashold > 0);
let processed_data_size = AtomicU64::new(0);
Ok(Checkpointer {
checkpointer_thread,
terminate,
checkpoint_ready,
processed_data_threashold,
processed_data_size,
checkpoint_req_count,
})
}
pub fn register_processed_data_size(&self, size: u64) {
let prev_size = self.processed_data_size.fetch_add(size, Ordering::Relaxed);
if prev_size < self.processed_data_threashold && prev_size + size >= self.processed_data_threashold {
if let Err(e) = self.initiate_checkpoint() {
warn!("Failed to initiate checkpoint, error: {}", e);
}
self.processed_data_size.store(0, Ordering::Relaxed);
}
}
pub fn terminate(self) {
self.terminate.store(true, Ordering::Relaxed);
self.checkpointer_thread.join().unwrap();
}
fn initiate_checkpoint(&self) -> Result<(), Error> {
let mut req_count = self.checkpoint_req_count.load(Ordering::Relaxed);
while req_count < 2 {
if let Err(new_req_count) = self.checkpoint_req_count.compare_exchange(req_count, req_count + 1, Ordering::Relaxed, Ordering::Relaxed) {
req_count = new_req_count;
} else {
if req_count == 0 {
self.checkpoint_ready.send(true, false);
}
break;
}
}
Ok(())
}
fn checkpointer_thread(conf: ConfigMt,
terminate: Arc<AtomicBool>,
checkpoint_ready: SyncNotification<bool>,
log_mgr: LogMgr,
csns: SharedSequences,
checkpoint_req_count: Arc<AtomicU32>,
tran_mgr: TranMgr)
{
match StorageDriver::new(conf, csns.clone()) {
Ok(sd) => {
let mut checkpoint_csn = csns.checkpoint_csn.get_cur();
loop {
if let Some(mut lock) = checkpoint_ready.wait_for_interruptable(
&mut (|state| -> bool { ! *state }),
&mut (|| -> bool { terminate.load(Ordering::Relaxed) }),
Duration::from_millis(CONDVAR_WAIT_INTERVAL_MS)
) {
*lock = false;
drop(lock);
let mut req_count = checkpoint_req_count.load(Ordering::Relaxed);
while req_count > 0 {
req_count = checkpoint_req_count.fetch_sub(1, Ordering::Relaxed);
checkpoint_csn += 1;
if let Err(e) = log_mgr.write_checkpoint_begin(checkpoint_csn, csns.latest_commit_csn.load(Ordering::Relaxed)) {
error!("Failed to write to log about checkpoint initiation, error: {}", e);
} else {
info!("Checkpoint initiated");
csns.checkpoint_csn.set(checkpoint_csn);
if let Err(e) = sd.checkpoint(checkpoint_csn) {
error!("Failed to perform checkpoint: {}", e);
} else {
if let Err(e) = log_mgr.write_checkpoint_completed(checkpoint_csn-1,
csns.latest_commit_csn.load(Ordering::Relaxed),
tran_mgr.get_tsn()) {
error!("Failed to write to log about checkpoint completion, error: {}", e);
} else {
info!("Checkpoint completed successfully");
}
}
}
}
} else {
break;
}
}
},
Err(e) => {
error!("Failed to initialize checkpointer thread, storage driver failure, error: {}", e);
},
}
}
}