use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
use garage_rpc::system::System;
use garage_rpc::*;
use crate::data::*;
use crate::replication::*;
use crate::schema::*;
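// Maximum number of entries handled per GC iteration: at most
// 2 * TABLE_GC_BATCH_SIZE candidates are read from the todo list and at most
// TABLE_GC_BATCH_SIZE of them are pushed and deleted in one go.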
const TABLE_GC_BATCH_SIZE: usize = 1024;
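// Delay between the time a tombstone is written and the time it becomes
// eligible for garbage collection (24 hours, see GcTodoEntry::deletion_time).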
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
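/// Handles garbage collection of tombstone entries for one table.
///
/// A tombstone is only deleted once every other node storing its partition
/// has acknowledged a copy of it (the push RPC uses a quorum equal to the
/// number of nodes), so that a node that missed the deletion cannot
/// resurrect the entry later.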
pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
endpoint: Arc<Endpoint<GcRpc, Self>>,
}
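/// RPC messages exchanged during garbage collection: first push a copy of the
/// tombstones to the other nodes, then ask them to delete the entries if
/// their value hash still matches.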
#[derive(Serialize, Deserialize)]
enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
}
impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
system,
data,
endpoint,
});
gc.endpoint.set_handler(gc.clone());
gc
}
pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
bg.spawn_worker(GcWorker::new(self.clone()));
}
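	/// Runs one iteration of the GC loop.
	///
	/// Returns `Ok(None)` when a batch was processed and more work may be
	/// pending immediately, or `Ok(Some(delay))` when the worker should wait
	/// before trying again.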
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
let mut candidates = vec![];
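		// Step 1: collect entries from the todo list whose deletion time has
		// passed. The todo tree is keyed by timestamp, so iteration is in
		// chronological order: if the very first entry is not yet due, return
		// the time remaining before it becomes due.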
for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
if candidates.is_empty() {
return Ok(Some(Duration::from_millis(
todo_entry.deletion_time() - now,
)));
} else {
break;
}
}
candidates.push(todo_entry);
if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
break;
}
}
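		// Step 2: keep only candidates whose current value still hashes to
		// the recorded hash; entries that were overwritten in the meantime
		// are excluded and dropped from the todo list below.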
let mut entries = vec![];
let mut excluded = vec![];
for mut todo_entry in candidates {
let vhash = todo_entry.value_hash;
todo_entry.value = self
.data
.store
.get(&todo_entry.key[..])?
.filter(|v| blake2sum(&v[..]) == vhash)
.map(|v| v.to_vec());
if todo_entry.value.is_some() {
entries.push(todo_entry);
if entries.len() >= TABLE_GC_BATCH_SIZE {
break;
}
} else {
excluded.push(todo_entry);
}
}
for entry in excluded {
entry.remove_if_equal(&self.data.gc_todo)?;
}
if entries.is_empty() {
return Ok(Some(Duration::from_secs(60)));
}
debug!("({}) GC: doing {} items", F::TABLE_NAME, entries.len());
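		// Step 3: group entries by the set of other nodes storing their
		// partition, so that each group can be handled with a single pair of
		// RPC calls.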
let mut partitions = HashMap::new();
for entry in entries {
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
let mut nodes = self.data.replication.storage_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
			partitions.entry(nodes).or_insert_with(Vec::new).push(entry);
}
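		// Step 4: for each group, push the tombstones to the other nodes,
		// then delete them everywhere (see try_send_and_delete below).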
let resps = join_all(
partitions
.into_iter()
.map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
)
.await;
let mut errs = vec![];
for resp in resps {
if let Err(e) = resp {
errs.push(e);
}
}
if errs.is_empty() {
Ok(None)
} else {
Err(Error::Message(
errs.into_iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
))
.err_context("in try_send_and_delete in table GC:")
}
}
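	/// Pushes a batch of tombstones to the given nodes, then deletes them on
	/// those nodes, locally, and finally from the GC todo list. Both RPCs
	/// require a response from every node in `nodes`.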
async fn try_send_and_delete(
&self,
nodes: Vec<Uuid>,
mut items: Vec<GcTodoEntry>,
) -> Result<(), Error> {
let n_items = items.len();
let mut updates = vec![];
let mut deletes = vec![];
for item in items.iter_mut() {
updates.push(ByteBuf::from(item.value.take().unwrap()));
deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
}
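		// Step 1: make sure every other node has a copy of the tombstones
		// before anything gets deleted anywhere.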
self.system
.rpc_helper()
.try_call_many(
&self.endpoint,
&nodes,
GcRpc::Update(updates),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
info!(
"({}) GC: {} items successfully pushed, will try to delete.",
F::TABLE_NAME,
n_items
);
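		// Step 2: ask the other nodes to delete the entries, but only if the
		// value they hold still matches the recorded hash.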
self.system
.rpc_helper()
.try_call_many(
&self.endpoint,
&nodes,
GcRpc::DeleteIfEqualHash(deletes),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;
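		// Step 3: delete the entries locally as well, and drop them from the
		// GC todo list.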
for item in items {
self.data
.delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo)
				.err_context("GC: remove from todo list after successful GC")?;
}
Ok(())
}
}
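// Server side of the GC RPCs: applies incoming messages to the local table data.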
#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
Ok(GcRpc::Ok)
}
GcRpc::DeleteIfEqualHash(items) => {
for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?;
}
Ok(GcRpc::Ok)
}
m => Err(Error::unexpected_rpc_message(m)),
}
}
}
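/// Background worker that repeatedly runs `gc_loop_iter`, sleeping for the
/// delay returned by the previous iteration when there is nothing to do.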
struct GcWorker<F: TableSchema, R: TableReplication> {
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
wait_delay: Duration::from_secs(0),
}
}
}
#[async_trait]
impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME)
}
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.gc.gc_loop_iter().await? {
None => Ok(WorkerState::Busy),
Some(delay) => {
self.wait_delay = delay;
Ok(WorkerState::Idle)
}
}
}
async fn wait_for_work(&mut self) -> WorkerState {
tokio::time::sleep(self.wait_delay).await;
WorkerState::Busy
}
}
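/// An entry of the `gc_todo` tree: a tombstone waiting to be garbage
/// collected.
///
/// In the tree, the key is the tombstone timestamp (big-endian u64)
/// concatenated with the table entry's key, and the value is the Blake2 hash
/// of the entry's serialized value.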
pub(crate) struct GcTodoEntry {
tombstone_timestamp: u64,
key: Vec<u8>,
value_hash: Hash,
value: Option<Vec<u8>>,
}
impl GcTodoEntry {
pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
Self {
tombstone_timestamp: now_msec(),
key,
value_hash,
value: None,
}
}
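	/// Parses a `GcTodoEntry` back from its on-disk key and value
	/// representation (see `todo_table_key`).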
pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
Self {
tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),
key: db_k[8..].to_vec(),
value_hash: Hash::try_from(db_v).unwrap(),
value: None,
}
}
pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
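	/// Removes this entry from the GC todo tree, but only if the hash stored
	/// there still matches `self.value_hash`.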
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.db().transaction(|txn| {
let key = self.todo_table_key();
if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) {
txn.remove(gc_todo_tree, &key)?;
}
Ok(())
})?;
Ok(())
}
fn todo_table_key(&self) -> Vec<u8> {
[
&u64::to_be_bytes(self.tombstone_timestamp)[..],
&self.key[..],
]
.concat()
}
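	/// The time (in milliseconds since the Unix epoch) at which this entry
	/// becomes eligible for garbage collection: TABLE_GC_DELAY after the
	/// tombstone timestamp.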
fn deletion_time(&self) -> u64 {
self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64
}
}