use crate::cc::context::Context;
use crate::map::data::{CheckpointTask, FileBuilder, MapBuilder};
use crate::meta::{BlobStat, DataStat, IntervalPair, MemBlobStat, MemDataStat, PageTable};
#[cfg(feature = "extra_check")]
use crate::utils::NULL_ADDR;
use crate::utils::countblock::Countblock;
use crate::utils::data::{GatherWriter, GroupPositions, Interval};
use crate::utils::options::ParsedOptions;
use crate::utils::{Handle, MutRef};
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::RecvTimeoutError;
use std::thread::JoinHandle;
use std::{
sync::{
Arc,
atomic::Ordering::Relaxed,
mpsc::{Receiver, Sender, channel},
},
time::{Duration, Instant},
};
/// Decision returned by `CheckpointObserver::flush_directive` for a queued
/// checkpoint task.
pub enum FlushDirective {
/// Do not checkpoint the bucket; the worker calls `force_done` on the task
/// instead of running a checkpoint.
Skip,
/// Run the regular checkpoint for the bucket.
Normal,
}
/// Outcome of one bucket checkpoint, handed to
/// `CheckpointObserver::on_checkpoint`.
pub struct FlushResult {
// Shared options; `sync` reads `sync_on_write` from it and calls
// `sync_data_dir` on it.
opt: Arc<ParsedOptions>,
/// Bucket the checkpoint was taken for.
pub bucket_id: u64,
/// Page table produced by the `MapBuilder` for this checkpoint.
pub map_table: PageTable,
/// One interval record per data file written; empty when nothing was flushed.
pub data_ivls: Vec<IntervalPair>,
/// Per-file stats of the written data files, pushed in the same order as
/// `data_ivls`.
pub data_stats: Vec<DataStat>,
/// One interval record per blob file written; empty when nothing was flushed.
pub blob_ivls: Vec<IntervalPair>,
/// Per-file stats of the written blob files, pushed in the same order as
/// `blob_ivls`.
pub blob_stats: Vec<BlobStat>,
/// `blob_junk` list moved out of the task snapshot.
/// NOTE(review): element meaning (ids vs. addresses) is not visible here.
pub blob_junk: Vec<u64>,
/// `data_junk` list moved out of the task snapshot.
/// NOTE(review): element meaning (ids vs. addresses) is not visible here.
pub data_junk: Vec<u64>,
/// Writers of the files produced by this checkpoint (data files first, then
/// blob files); drained by `sync`.
pub writers: Vec<GatherWriter>,
/// Clone of the task's `last_chkpt_lsn` handle.
pub latest_chkpoint_lsn: MutRef<GroupPositions>,
}
impl FlushResult {
    /// Syncs every pending writer and then the data directory.
    ///
    /// Each writer is removed from `self.writers` and synced with `sync()`
    /// when the `sync_on_write` option is set, otherwise with `sync_data()`.
    /// The data directory is synced afterwards, but only if there was at
    /// least one writer. With no writers this is a no-op.
    pub fn sync(&mut self) {
        if self.writers.is_empty() {
            // Nothing was written, so there is nothing to make durable.
            return;
        }

        let full_sync = self.opt.sync_on_write;
        for mut writer in self.writers.drain(..) {
            if full_sync {
                writer.sync();
            } else {
                writer.sync_data();
            }
        }

        self.opt.sync_data_dir();
    }
}
/// Callbacks the checkpoint worker invokes while it processes tasks.
///
/// Implementations must be `Send + Sync` because the observer is shared with
/// the background checkpoint thread.
pub trait CheckpointObserver: Send + Sync {
/// Called once for every dequeued task, before any work is done, to decide
/// whether the bucket is checkpointed or skipped.
fn flush_directive(&self, bucket_id: u64) -> FlushDirective;
/// Called with a freshly allocated data file id, right after
/// `stage_orphan_data_file` for the same id.
/// NOTE(review): presumably records that the file has not been synced yet —
/// confirm against the implementor.
fn stage_unsynced_data_file(&self, file_id: u64);
/// Called with a freshly allocated blob file id, right after
/// `stage_orphan_blob_file` for the same id.
/// NOTE(review): presumably records that the file has not been synced yet —
/// confirm against the implementor.
fn stage_unsynced_blob_file(&self, file_id: u64);
/// Called first when a new data file id is allocated.
/// NOTE(review): presumably marks the file as not yet referenced by
/// published metadata — confirm against the implementor.
fn stage_orphan_data_file(&self, file_id: u64);
/// Called first when a new blob file id is allocated.
/// NOTE(review): presumably marks the file as not yet referenced by
/// published metadata — confirm against the implementor.
fn stage_orphan_blob_file(&self, file_id: u64);
/// Called from the per-file callback passed to `flush_data_files`, with the
/// file's interval and the in-memory form of its stats.
fn update_data_mem_interval_stat(&self, ivl: IntervalPair, stat: MemDataStat);
/// Called from the per-file callback passed to `flush_blob_files`, with the
/// file's interval and the in-memory form of its stats.
fn update_blob_mem_interval_stat(&self, ivl: IntervalPair, stat: MemBlobStat);
/// Receives the result of one checkpoint. Called exactly once per
/// checkpointed task, including when no file had to be written.
fn on_checkpoint(&self, result: FlushResult);
/// Called after the worker has emptied its queue, but only if at least one
/// task in that pass was checkpointed rather than skipped.
fn finish_checkpoint(&self);
}
/// Runs one checkpoint for the bucket of `task` and reports it to `observer`.
///
/// Steps, in order:
/// 1. Take the task snapshot and feed every page to both the map builder and
///    the file builder.
/// 2. If the file builder holds anything, flush data files and then blob
///    files, collecting one interval, one stat and one writer per file, and
///    record the I/O size and elapsed time on the task.
/// 3. Hand a `FlushResult` to `observer.on_checkpoint` and finish the task
///    with `task.done(snapshot)`.
///
/// When the file builder is empty, step 2 is skipped entirely: no file id is
/// allocated, the failpoint is not reached and `mark_io_built` is not called.
/// The observer still receives a result carrying the page table and the
/// snapshot's junk lists.
fn checkpoint(mut task: CheckpointTask, ctx: Handle<Context>, observer: &dyn CheckpointObserver) {
    let bucket_id = task.bucket_id;
    let mut snapshot = task.snapshot();
    let mut map_builder = MapBuilder::new(bucket_id, &snapshot.unmap_pid);
    let mut file_builder = FileBuilder::new(bucket_id);

    // Each page is registered in the map by reference, then moved into the
    // file builder.
    for page in std::mem::take(&mut snapshot.pages) {
        map_builder.add(&page);
        file_builder.add(page);
    }
    let mapping = map_builder.table();

    #[cfg(feature = "extra_check")]
    for (&pid, &addr) in mapping.iter() {
        assert!(
            addr == NULL_ADDR || addr <= task.snap_addr,
            "map addr {} for pid {} exceeds snap_addr {}",
            addr,
            pid,
            task.snap_addr
        );
    }

    // These stay empty when there is nothing to write.
    let mut data_ivls = Vec::new();
    let mut data_stats = Vec::new();
    let mut blob_ivls = Vec::new();
    let mut blob_stats = Vec::new();
    let mut writers = Vec::new();

    if !file_builder.is_empty() {
        let actual_bytes = file_builder.io_bytes();
        let io_started = Instant::now();

        if file_builder.has_data() {
            let flushed = file_builder.flush_data_files(
                ctx.opt.data_file_size,
                // Allocates the next data file id; the observer hears about the
                // id (orphan first, then unsynced) before the path is returned.
                || {
                    let id = ctx.numerics.next_data_id.fetch_add(1, Relaxed);
                    observer.stage_orphan_data_file(id);
                    observer.stage_unsynced_data_file(id);
                    (id, ctx.opt.data_file(id))
                },
                // Progress callback.
                |bytes| {
                    task.mark_checkpoint_progress(bytes);
                },
                // Per-file callback: publish the in-memory stat, then release
                // the pages covered by the file.
                |file| {
                    let ivl = IntervalPair::new(
                        file.interval.lo,
                        file.interval.hi,
                        file.file_id,
                        bucket_id,
                    );
                    observer.update_data_mem_interval_stat(ivl, file.stat.clone_mem());
                    task.release_persisted_pages(&file.addrs);
                },
            );
            for file in flushed {
                let Interval { lo, hi } = file.interval;
                data_ivls.push(IntervalPair::new(lo, hi, file.file_id, bucket_id));
                data_stats.push(file.stat.copy());
                writers.push(file.writer);
            }
        }

        if file_builder.has_blob() {
            let flushed = file_builder.flush_blob_files(
                ctx.opt.blob_file_size,
                // Same protocol as for data files, using the blob id counter.
                || {
                    let id = ctx.numerics.next_blob_id.fetch_add(1, Relaxed);
                    observer.stage_orphan_blob_file(id);
                    observer.stage_unsynced_blob_file(id);
                    (id, ctx.opt.blob_file(id))
                },
                |bytes| {
                    task.mark_checkpoint_progress(bytes);
                },
                |file| {
                    let ivl = IntervalPair::new(
                        file.interval.lo,
                        file.interval.hi,
                        file.file_id,
                        bucket_id,
                    );
                    observer.update_blob_mem_interval_stat(ivl, file.stat.clone_mem());
                    task.release_persisted_pages(&file.addrs);
                },
            );
            for file in flushed {
                let Interval { lo, hi } = file.interval;
                blob_ivls.push(IntervalPair::new(lo, hi, file.file_id, bucket_id));
                blob_stats.push(file.stat.copy());
                writers.push(file.writer);
            }
        }

        #[cfg(feature = "failpoints")]
        crate::utils::failpoint::crash("mace_flush_after_data_sync");
        task.mark_io_built(actual_bytes, io_started.elapsed());
    }

    observer.on_checkpoint(FlushResult {
        opt: ctx.opt.clone(),
        bucket_id,
        map_table: mapping,
        data_ivls,
        data_stats,
        blob_ivls,
        blob_stats,
        blob_junk: std::mem::take(&mut snapshot.blob_junk),
        data_junk: std::mem::take(&mut snapshot.data_junk),
        writers,
        latest_chkpoint_lsn: task.last_chkpt_lsn.clone(),
    });
    task.done(snapshot);
}
/// Drains `q`, handling every queued task in FIFO order.
///
/// For each task the observer is asked for a directive. `Skip` finishes the
/// task with `force_done()` without checkpointing; `Normal` runs
/// `checkpoint`. Once the queue is empty, `observer.finish_checkpoint()` is
/// called if at least one task was actually checkpointed.
fn process_task(
    q: &mut VecDeque<CheckpointTask>,
    ctx: Handle<Context>,
    observer: &dyn CheckpointObserver,
) {
    let mut flushed_any = false;

    while let Some(task) = q.pop_front() {
        match observer.flush_directive(task.bucket_id) {
            FlushDirective::Skip => {
                task.force_done();
            }
            FlushDirective::Normal => {
                checkpoint(task, ctx, observer);
                flushed_any = true;
            }
        }
    }

    if flushed_any {
        observer.finish_checkpoint();
    }
}
/// Spawns the background thread named "checkpointer" and returns its handle.
///
/// The worker polls `rx` with a 1 ms timeout so it can notice a quit request
/// even when no task arrives. Every received task is processed immediately
/// through `process_task`. The loop ends when `sync` reports quit or when all
/// senders are gone.
///
/// Before exiting, the worker drains whatever is still buffered in the channel
/// and processes it, so each task already submitted reaches either
/// `force_done` or `checkpoint` instead of being dropped with the receiver.
/// It then posts `sync.notify_done()`.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn checkpoint_thread(
    rx: Receiver<CheckpointTask>,
    ctx: Handle<Context>,
    observer: Arc<dyn CheckpointObserver>,
    sync: Arc<Notifier>,
) -> JoinHandle<()> {
    std::thread::Builder::new()
        .name("checkpointer".into())
        .spawn(move || {
            let mut pending = VecDeque::new();
            while !sync.is_quit() {
                match rx.recv_timeout(Duration::from_millis(1)) {
                    Ok(task) => pending.push_back(task),
                    // Nothing arrived in this tick; go re-check the quit flag.
                    Err(RecvTimeoutError::Timeout) => {}
                    // All senders dropped: no further task can ever arrive.
                    Err(RecvTimeoutError::Disconnected) => break,
                }
                process_task(&mut pending, ctx, observer.as_ref());
            }
            // `process_task` always leaves `pending` empty, so without this
            // drain the final pass below would have nothing to do and tasks
            // still sitting in the channel would be silently discarded.
            pending.extend(rx.try_iter());
            process_task(&mut pending, ctx, observer.as_ref());
            sync.notify_done();
            log::info!("checkpoint thread exit");
        })
        .expect("can't build checkpoint thread")
}
/// Shutdown handshake shared between `Checkpoint` handles and the worker
/// thread.
struct Notifier {
// Set by `notify_quit`; polled by the worker loop through `is_quit`.
quit: AtomicBool,
// Posted by the worker right before it exits; `wait_done` waits on it.
sem: Countblock,
}
impl Notifier {
    /// Builds a notifier with the quit flag cleared and the semaphore created
    /// from a count of 0.
    fn new() -> Self {
        let quit = AtomicBool::new(false);
        let sem = Countblock::new(0);
        Self { quit, sem }
    }

    /// Returns `true` once `notify_quit` has been called.
    fn is_quit(&self) -> bool {
        let requested = self.quit.load(Relaxed);
        requested
    }

    /// Waits on the semaphore that the worker posts in `notify_done`.
    fn wait_done(&self) {
        self.sem.wait()
    }

    /// Raises the quit flag. The flag is only a signal and carries no data,
    /// which is why a relaxed store is used.
    fn notify_quit(&self) {
        self.quit.store(true, Relaxed)
    }

    /// Posts the semaphore to signal that the worker has finished.
    fn notify_done(&self) {
        self.sem.post()
    }
}
/// Cloneable handle to the background checkpoint worker.
#[derive(Clone)]
pub struct Checkpoint {
/// Sending side of the task channel consumed by the worker thread.
pub tx: Sender<CheckpointTask>,
// Quit/done handshake shared with the worker thread.
sync: Arc<Notifier>,
}
impl Checkpoint {
    /// Creates the task channel, starts the worker thread and returns a handle
    /// that can submit tasks through `tx` and request shutdown through `quit`.
    pub fn new(ctx: Handle<Context>, observer: Arc<dyn CheckpointObserver>) -> Self {
        let sync = Arc::new(Notifier::new());
        let (tx, rx) = channel();
        // The join handle is intentionally not kept: shutdown is coordinated
        // through the notifier rather than by joining the thread.
        let _detached = checkpoint_thread(rx, ctx, observer, Arc::clone(&sync));
        Self { tx, sync }
    }

    /// Asks the worker to stop and waits until it signals that it is done.
    ///
    /// NOTE(review): the worker posts the done signal once; confirm that
    /// calling `quit` on more than one clone cannot leave a caller waiting.
    pub fn quit(&self) {
        let notifier = &self.sync;
        notifier.notify_quit();
        notifier.wait_done();
    }
}