extern crate libc;
extern crate nix;
use crate::consts::{TAIL_ENTRY_KEY, TOMB_STONE_MARKER};
use crate::err::Error;
use crate::fs::P;
use crate::index::Index;
use crate::memtable::{Entry, MemTable, SkipMapValue, K};
use crate::sst::Table;
use crate::types::{CreatedAt, ImmutableMemTables, Key, KeyRangeHandle, ValOffset, Value};
use crate::vlog::{ValueLog, ValueLogEntry};
use crate::{err, util};
use chrono::Utc;
use crossbeam_skiplist::SkipMap;
use err::Error::*;
use futures::future::join_all;
use nix::libc::{c_int, off_t};
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use tokio::time::sleep;
// Raw FFI binding to the Linux `fallocate(2)` syscall; used by
// `GC::punch_holes` to deallocate reclaimed regions of the value-log file.
extern "C" {
    fn fallocate(fd: libc::c_int, mode: c_int, offset: off_t, len: off_t) -> c_int;
}
// `fallocate` mode flags (values mirror linux/falloc.h).
// FALLOC_FL_PUNCH_HOLE deallocates the given byte range; it is combined with
// FALLOC_FL_KEEP_SIZE so the file's apparent size stays unchanged.
const FALLOC_FL_PUNCH_HOLE: c_int = 0x2;
const FALLOC_FL_KEEP_SIZE: c_int = 0x1;
// Shared, async-locked handles and buffers passed between GC tasks.
type GCTable = Arc<RwLock<MemTable<Key>>>; // active memtable
type GCLog = Arc<RwLock<ValueLog>>; // value log under collection
// Entries found to still be live: (key, value, value-log offset).
type ValidEntries = Arc<RwLock<Vec<(Key, Value, ValOffset)>>>;
// Entries already appended to the value log during the current GC cycle.
type SyncedEntries = Arc<RwLock<Vec<(Key, Value, ValOffset)>>>;
// Keys rewritten by GC that are still pending sync into the main store.
type GCUpdatedEntries<K> = Arc<RwLock<SkipMap<K, SkipMapValue<ValOffset>>>>;
// Value-log tail and head offsets, in bytes.
type Tail = usize;
type Head = usize;
/// Online garbage collector for the value log.
///
/// Periodically scans a chunk of the value log starting at the tail,
/// rewrites the entries that are still live, and records the scanned region
/// so its disk space can later be reclaimed (via hole punching on Linux).
#[derive(Debug)]
pub struct GC {
    // Active memtable that valid entries are re-inserted into.
    pub(crate) table: GCTable,
    // Value log being collected.
    pub(crate) vlog: GCLog,
    // Scheduling and chunk-size configuration.
    pub(crate) config: Config,
    // Entries rewritten by GC that still await sync by the store; GC cycles
    // are skipped while this map is non-empty.
    pub(crate) gc_updated_entries: GCUpdatedEntries<Key>,
    // Byte region of the value log to reclaim in `free_unused_space`.
    pub(crate) punch_marker: Arc<Mutex<PunchMarker>>,
}
/// Tuning parameters for the background GC worker.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    // Interval between GC cycles of the background worker.
    pub online_gc_interval: std::time::Duration,
    // Maximum number of bytes read from the value-log tail per cycle.
    pub gc_chunk_size: usize,
}
/// Byte range of the value log awaiting reclamation (hole punching).
#[derive(Clone, Debug, Default)]
pub struct PunchMarker {
    // Start offset, in bytes, of the region to punch out.
    pub(crate) punch_hole_start_offset: usize,
    // Length, in bytes, of the region to punch out.
    pub(crate) punch_hole_length: usize,
}
impl GC {
/// Creates a garbage collector over the given memtable and value log.
///
/// `online_gc_interval` controls how often the background worker runs and
/// `gc_chunk_size` caps the bytes scanned per cycle. The punch marker starts
/// out empty (nothing to reclaim yet).
pub fn new(
    online_gc_interval: std::time::Duration,
    gc_chunk_size: usize,
    table: GCTable,
    vlog: GCLog,
    gc_updated_entries: GCUpdatedEntries<Key>,
) -> Self {
    let config = Config {
        online_gc_interval,
        gc_chunk_size,
    };
    Self {
        table,
        vlog,
        config,
        gc_updated_entries,
        punch_marker: Arc::new(Mutex::new(PunchMarker::default())),
    }
}
/// Spawns the background GC worker task.
///
/// The worker loops forever: it sleeps for `online_gc_interval`, skips the
/// cycle if the previous cycle's rewritten entries have not yet been synced
/// by the store (`gc_updated_entries` non-empty), and otherwise runs one
/// `gc_handler` pass, logging success or failure.
pub fn start_gc_worker(&self, key_range: KeyRangeHandle, read_only_memtables: ImmutableMemTables<Key>) {
    let cfg = self.config.to_owned();
    // Clone each shared handle once; `key_range` and `read_only_memtables`
    // are owned parameters and can be moved straight into the task (the
    // original code cloned `self.table`/`self.vlog` twice).
    let table_ref = self.table.clone();
    let vlog_ref = self.vlog.clone();
    let gc_updated_entries_ref = self.gc_updated_entries.clone();
    let punch_marker_ref = self.punch_marker.clone();
    tokio::spawn(async move {
        loop {
            sleep_gc_task(cfg.online_gc_interval).await;
            // Don't start a new cycle until the store has drained the
            // entries produced by the previous one.
            if !gc_updated_entries_ref.read().await.is_empty() {
                continue;
            }
            let res = GC::gc_handler(
                &cfg,
                table_ref.clone(),
                vlog_ref.clone(),
                key_range.clone(),
                read_only_memtables.clone(),
                gc_updated_entries_ref.clone(),
                punch_marker_ref.clone(),
            )
            .await;
            match res {
                Ok(_) => {
                    log::info!("GC successful, awaiting sync")
                }
                Err(err) => {
                    log::error!("GC Error {}", err.to_string());
                }
            }
        }
    });
}
/// Runs a single garbage-collection cycle over the value log.
///
/// Reads up to `cfg.gc_chunk_size` bytes of entries from the value-log tail,
/// concurrently classifies each entry as stale (superseded or deleted) or
/// still live, persists the advanced tail, re-appends the live entries at the
/// head of the value log, publishes them to the store via
/// `gc_updated_entries`, and records the scanned region in `punch_marker`
/// so `free_unused_space` can reclaim it later.
pub(crate) async fn gc_handler(
    cfg: &Config,
    memtable: GCTable,
    vlog: GCLog,
    key_range: KeyRangeHandle,
    read_only_memtables: ImmutableMemTables<Key>,
    gc_updated_entries: GCUpdatedEntries<Key>,
    punch_marker: Arc<Mutex<PunchMarker>>,
) -> Result<(), Error> {
    let invalid_entries = Arc::new(RwLock::new(Vec::new()));
    let valid_entries = Arc::new(RwLock::new(Vec::new()));
    let synced_entries = Arc::new(RwLock::new(Vec::new()));
    // Hold the read guard only for the chunk read, then release it so the
    // classification tasks below can take their own vlog locks.
    let vlog_reader = vlog.read().await;
    let chunk_res = vlog_reader.read_chunk_to_garbage_collect(cfg.gc_chunk_size).await;
    drop(vlog_reader);
    match chunk_res {
        Ok((entries, total_bytes_read)) => {
            // Classify every entry concurrently: each task looks up the most
            // recent value for the entry's key and compares it to the entry.
            let tasks = entries.into_iter().map(|entry| {
                let invalid_entries_ref = invalid_entries.clone();
                let valid_entries_ref = valid_entries.clone();
                let table_ref = memtable.clone();
                let vlog_ref = vlog.clone();
                let key_range_ref = key_range.clone();
                let read_only_memtables_ref = read_only_memtables.clone();
                tokio::spawn(async move {
                    let most_recent_value = GC::get(
                        std::str::from_utf8(&entry.key).unwrap(),
                        table_ref.clone(),
                        key_range_ref.clone(),
                        vlog_ref.clone(),
                        read_only_memtables_ref.clone(),
                    )
                    .await;
                    match most_recent_value {
                        Ok((value, creation_time)) => {
                            // Stale if a newer write exists for this key, or
                            // the current value is the tombstone marker.
                            if entry.created_at < creation_time
                                || value == TOMB_STONE_MARKER.as_bytes().to_vec()
                            {
                                invalid_entries_ref.write().await.push(entry);
                            } else {
                                valid_entries_ref.write().await.push((entry.key, value));
                            }
                            Ok(())
                        }
                        // A `NotFoundInDB` miss also marks the entry invalid;
                        // any other error aborts the cycle.
                        Err(err) => GC::handle_deleted_entries(invalid_entries_ref, entry, err).await,
                    }
                })
            });
            let all_results = join_all(tasks).await;
            for tokio_res in all_results {
                match tokio_res {
                    // Surface classification errors from inside the task.
                    Ok(res) => res?,
                    Err(_) => {
                        return Err(TokioJoin);
                    }
                }
            }
            // No garbage found in this chunk: leave the tail untouched.
            if invalid_entries.read().await.is_empty() {
                return Ok(());
            }
            // Persist the advanced tail first, so the tail position survives
            // a crash during the rewrite below.
            let new_tail_offset = vlog.read().await.tail_offset + total_bytes_read;
            let v_offset = GC::write_tail_to_disk(Arc::clone(&vlog), new_tail_offset).await?;
            synced_entries.write().await.push((
                TAIL_ENTRY_KEY.to_vec(),
                new_tail_offset.to_le_bytes().to_vec(),
                v_offset,
            ));
            // Re-append the still-live entries at the head of the value log.
            GC::write_valid_entries_to_vlog(valid_entries, synced_entries.to_owned(), Arc::clone(&vlog))
                .await?;
            vlog.write().await.sync_to_disk().await?;
            // Publish the rewritten entries to the memtable / sync buffer.
            GC::write_valid_entries_to_store(
                synced_entries.to_owned(),
                memtable.clone(),
                gc_updated_entries,
                vlog.clone(),
            )
            .await?;
            // Record the scanned region for later hole punching.
            // NOTE(review): the start offset is read from `tail_offset`, which
            // has not been advanced to `new_tail_offset` in this function —
            // presumably `free_unused_space` performs that advance; confirm
            // the intended ordering.
            let mut marker_lock = punch_marker.lock().await;
            marker_lock.punch_hole_start_offset = vlog.read().await.tail_offset;
            marker_lock.punch_hole_length = total_bytes_read;
        }
        Err(err) => return Err(err),
    };
    Ok(())
}
/// Appends a special tail entry recording `new_tail_offset` (little-endian)
/// to the value log and returns the offset the entry was written at.
pub(crate) async fn write_tail_to_disk(vlog: GCLog, new_tail_offset: usize) -> Result<ValOffset, Error> {
    let tail_key = TAIL_ENTRY_KEY.to_vec();
    let tail_value = new_tail_offset.to_le_bytes().to_vec();
    let mut vlog_guard = vlog.write().await;
    vlog_guard.append(&tail_key, &tail_value, Utc::now(), false).await
}
/// Re-inserts recovered entries into the active memtable and records them in
/// `gc_updated_entries` so the store can sync them later; advances the value
/// log head past every rewritten entry.
pub(crate) async fn write_valid_entries_to_store(
    valid_entries: ValidEntries,
    table: GCTable,
    gc_updated_entries: GCUpdatedEntries<Key>,
    vlog: GCLog,
) -> Result<(), Error> {
    // Reset the sync buffer before recording this batch.
    gc_updated_entries.write().await.clear();
    // Read the shared vector directly — the original `to_owned()` cloned the
    // Arc for no reason before locking it.
    for (key, value, existing_v_offset) in valid_entries.read().await.iter() {
        GC::put(
            key,
            value,
            *existing_v_offset,
            table.clone(),
            gc_updated_entries.clone(),
        )
        .await;
        // Move the head forward so recovery resumes after the rewritten entries.
        if *existing_v_offset > vlog.read().await.head_offset {
            vlog.write().await.set_head(*existing_v_offset)
        }
    }
    Ok(())
}
/// Appends each still-live `(key, value)` pair to the value log and pushes
/// the resulting `(key, value, offset)` triple into `synced_entries`.
pub(crate) async fn write_valid_entries_to_vlog(
    valid_entries: Arc<RwLock<Vec<(Key, Value)>>>,
    synced_entries: SyncedEntries,
    vlog: GCLog,
) -> Result<(), Error> {
    // Lock the shared vector directly (the original cloned the Arc via
    // `to_owned()` first) and pass the borrowed key/value without an extra `&`.
    for (key, value) in valid_entries.read().await.iter() {
        let v_offset = vlog.write().await.append(key, value, Utc::now(), false).await?;
        synced_entries
            .write()
            .await
            .push((key.to_owned(), value.to_owned(), v_offset));
    }
    Ok(())
}
/// Reclaims the disk region recorded in `punch_marker` and returns the
/// updated `(head, tail)` offsets of the value log.
///
/// Refuses to run while `gc_updated_entries` is non-empty: the entries
/// rewritten by the last GC cycle have not yet been synced by the store.
/// On Linux the region is hole-punched out of the vlog file; on other
/// platforms only the tail offset is advanced and no space is reclaimed.
#[allow(unused_variables)] pub(crate) async fn free_unused_space(&mut self) -> std::result::Result<(Head, Tail), Error> {
    if !self.gc_updated_entries.read().await.is_empty() {
        return Err(GCErrorAttemptToRemoveUnsyncedEntries);
    }
    // `vlog_path` is unused on non-Linux targets — hence the
    // `allow(unused_variables)` on this method.
    let vlog_path = self.vlog.read().await.content.file.node.file_path.to_owned();
    let marker_lock = self.punch_marker.lock().await;
    #[cfg(target_os = "linux")]
    {
        GC::punch_holes(
            vlog_path,
            marker_lock.punch_hole_start_offset as i64,
            marker_lock.punch_hole_length as i64,
        )
        .await?;
        // Advance the tail past the punched-out region.
        (self.vlog.write().await).tail_offset += marker_lock.punch_hole_length;
        let vlog_reader = self.vlog.read().await;
        Ok((vlog_reader.head_offset, vlog_reader.tail_offset))
    }
    #[cfg(not(target_os = "linux"))]
    {
        log::info!(
            "{}",
            GCErrorUnsupportedPlatform(String::from("File system does not support file punch hole",))
        );
        // Still advance the tail so GC bookkeeping stays consistent even
        // though no disk space was actually reclaimed.
        (self.vlog.write().await).tail_offset += marker_lock.punch_hole_length;
        let vlog_reader = self.vlog.read().await;
        Ok((vlog_reader.head_offset, vlog_reader.tail_offset))
    }
}
#[allow(dead_code)] pub(crate) async fn punch_holes(
file_path: impl 'static + P,
offset: off_t,
length: off_t,
) -> std::result::Result<(), Error> {
let punch_handle = tokio::task::spawn_blocking(move || {
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&file_path)
.map_err(|err| Error::FileOpen {
path: file_path.as_ref().to_path_buf(),
error: err,
})?;
let fd = file.as_raw_fd();
unsafe {
let result = fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length);
if result == 0 {
Ok(())
} else {
Err(Error::GCErrorFailedToPunchHoleInVlogFile(
std::io::Error::last_os_error(),
))
}
}
});
punch_handle.await.unwrap()
}
/// Inserts a key that already lives at `val_offset` in the value log into the
/// active memtable, and mirrors it into `gc_updated_entries` so the store can
/// sync it later. An empty value is treated as a tombstone.
pub(crate) async fn put(
    key: impl AsRef<[u8]>,
    value: impl AsRef<[u8]>,
    val_offset: ValOffset,
    memtable: GCTable,
    gc_updated_entries: GCUpdatedEntries<Key>,
) {
    let key_bytes = key.as_ref();
    let tombstone = value.as_ref().is_empty();
    // Both the memtable entry and the sync-buffer record share one timestamp.
    let timestamp = Utc::now();
    let entry = Entry::new(key_bytes, val_offset, timestamp, tombstone);
    memtable.write().await.insert(&entry);
    let pending = SkipMapValue::new(val_offset, timestamp, tombstone);
    gc_updated_entries
        .write()
        .await
        .insert(key_bytes.to_vec(), pending);
}
/// Looks up the most recent value for `key`, searching the active memtable
/// first, then the read-only memtables, then the SSTables.
///
/// Returns `NotFoundInDB` if the key is absent or its latest version is a
/// tombstone; otherwise resolves the value from the value log together with
/// its creation time.
pub(crate) async fn get(
    key: impl K,
    memtable: GCTable,
    key_range: KeyRangeHandle,
    vlog: Arc<RwLock<ValueLog>>,
    read_only_memtables: ImmutableMemTables<Key>,
) -> Result<(Value, CreatedAt), Error> {
    let key = key.as_ref().to_vec();
    let mut offset = 0;
    // Sentinel datetime: `insert_time` only moves past this value when some
    // table produced a hit (see `found_in_table`).
    let lowest_insert_date = util::default_datetime();
    let mut insert_time = util::default_datetime();
    // 1. The active memtable holds the freshest data.
    if let Some(value) = memtable.read().await.get(&key) {
        if value.is_tombstone {
            return Err(NotFoundInDB);
        }
        GC::get_value_from_vlog(&vlog, value.val_offset, value.created_at).await
    } else {
        // 2. Scan the read-only memtables, keeping the newest version seen.
        let mut is_deleted = false;
        for table in read_only_memtables.iter() {
            if let Some(value) = table.value().get(&key) {
                if value.created_at > insert_time {
                    offset = value.val_offset;
                    insert_time = value.created_at;
                    is_deleted = value.is_tombstone
                }
            }
        }
        if GC::found_in_table(insert_time, lowest_insert_date) {
            if is_deleted {
                return Err(NotFoundInDB);
            }
            GC::get_value_from_vlog(&vlog, offset, insert_time).await
        } else {
            // 3. Fall back to the SSTables whose key range may contain the key.
            let ssts = &key_range.filter_sstables_by_key_range(&key).await?;
            GC::search_key_in_sstables(key, ssts.to_vec(), &vlog).await
        }
    }
}
/// Searches the given SSTables for the newest version of `key` and, if it is
/// not a tombstone, resolves its value from the value log.
///
/// Returns `NotFoundInDB` when no SSTable contains the key or its newest
/// version is deleted.
pub(crate) async fn search_key_in_sstables(
    key: impl AsRef<[u8]>,
    ssts: Vec<Table>,
    val_log: &GCLog,
) -> Result<(Value, CreatedAt), Error> {
    // Sentinel datetime: `insert_time` only advances past it on a hit.
    let mut insert_time = util::default_datetime();
    let lowest_insert_date = util::default_datetime();
    let mut offset = 0;
    let mut is_deleted = false;
    for sst in ssts.iter() {
        let index = Index::new(sst.index_file.path.to_owned(), sst.index_file.file.to_owned());
        // `if let` replaces the original `is_some()` + `unwrap()` pattern.
        if let Some(block_handle) = index.get(&key).await? {
            if let Some((val_offset, created_at, is_tombstone)) = sst.get(block_handle, &key).await? {
                // Keep only the newest version across all tables.
                if created_at > insert_time {
                    offset = val_offset;
                    insert_time = created_at;
                    is_deleted = is_tombstone;
                }
            }
        }
    }
    if GC::found_in_table(insert_time, lowest_insert_date) {
        if is_deleted {
            return Err(NotFoundInDB);
        }
        return GC::get_value_from_vlog(val_log, offset, insert_time).await;
    }
    Err(NotFoundInDB)
}
/// Returns `true` when a lookup actually produced a hit: `insert_time` is
/// only ever moved past the sentinel default datetime when a table matched.
pub(crate) fn found_in_table(insert_time: CreatedAt, lowest_insert_date: CreatedAt) -> bool {
    lowest_insert_date < insert_time
}
/// Reads the value stored at `offset` in the value log, pairing it with the
/// caller-supplied creation time. Tombstoned or missing entries yield
/// `NotFoundInDB`.
pub(crate) async fn get_value_from_vlog(
    val_log: &GCLog,
    offset: ValOffset,
    creation_at: CreatedAt,
) -> Result<(Value, CreatedAt), Error> {
    match val_log.read().await.get(offset).await? {
        // A tombstone at this offset means the key was deleted.
        Some((_, true)) => Err(NotFoundInDB),
        Some((value, false)) => Ok((value, creation_at)),
        None => Err(NotFoundInDB),
    }
}
/// Classifies a lookup failure during GC: a `NotFoundInDB` means the vlog
/// entry's key no longer exists (deleted), so the entry is recorded as
/// invalid; any other error is propagated unchanged.
pub(crate) async fn handle_deleted_entries(
    invalid_entries: Arc<RwLock<Vec<ValueLogEntry>>>,
    entry: ValueLogEntry,
    err: Error,
) -> std::result::Result<(), Error> {
    if matches!(err, NotFoundInDB) {
        invalid_entries.write().await.push(entry);
        Ok(())
    } else {
        Err(err)
    }
}
}
async fn sleep_gc_task(duration: std::time::Duration) {
sleep(duration).await;
}