use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, Handler};
use quickwit_metastore::Metastore;
use tracing::info;
use crate::garbage_collection::run_garbage_collect;
use crate::split_store::IndexingSplitStore;
/// Delay before the garbage collector sends itself the next `Loop` message.
const RUN_INTERVAL: Duration = Duration::from_secs(60);

/// Grace period (24 hours) forwarded to `run_garbage_collect` for staged splits.
///
/// NOTE(review): presumably staged splits older than this are treated as
/// abandoned and marked for deletion — confirm against `run_garbage_collect`.
const STAGED_GRACE_PERIOD: Duration = Duration::from_secs(60 * 60 * 24);

/// Grace period (2 minutes) forwarded to `run_garbage_collect` for splits
/// marked for deletion.
///
/// NOTE(review): presumably gives in-flight readers time to finish before the
/// files are removed — confirm against `run_garbage_collect`.
const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(120);
/// Counters exposed as the garbage collector actor's observable state.
///
/// `PartialEq`/`Eq` are derived so a whole snapshot can be compared at once.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GarbageCollectorCounters {
    /// Number of garbage collection passes started (incremented even if a pass fails).
    pub num_passes: usize,
    /// Total number of deleted file entries reported across all passes.
    pub num_deleted_files: usize,
    /// Total size in bytes of the deleted file entries reported across all passes.
    pub num_deleted_bytes: usize,
}
/// Self-addressed message that triggers one garbage collection pass.
///
/// The first one is handled directly from `Actor::initialize`; each pass then
/// schedules the next one after `RUN_INTERVAL`.
#[derive(Debug)]
struct Loop;
/// Actor that repeatedly runs garbage collection on the splits of one index.
pub struct GarbageCollector {
    /// Id of the index whose splits are garbage collected.
    index_id: String,
    /// Split store cloned into every `run_garbage_collect` call.
    split_store: IndexingSplitStore,
    /// Metastore handle cloned into every `run_garbage_collect` call.
    metastore: Arc<dyn Metastore>,
    /// Running totals returned by `observable_state`.
    counters: GarbageCollectorCounters,
}
impl GarbageCollector {
pub fn new(
index_id: String,
split_store: IndexingSplitStore,
metastore: Arc<dyn Metastore>,
) -> Self {
Self {
index_id,
split_store,
metastore,
counters: GarbageCollectorCounters::default(),
}
}
}
#[async_trait]
impl Actor for GarbageCollector {
type ObservableState = GarbageCollectorCounters;
fn observable_state(&self) -> Self::ObservableState {
self.counters.clone()
}
fn name(&self) -> String {
"GarbageCollector".to_string()
}
async fn initialize(
&mut self,
ctx: &ActorContext<Self>,
) -> Result<(), quickwit_actors::ActorExitStatus> {
self.handle(Loop, ctx).await
}
}
#[async_trait]
impl Handler<Loop> for GarbageCollector {
    type Reply = ();

    /// Runs one garbage collection pass on `index_id` and folds the reported
    /// deletions into the counters. It then schedules the next `Loop` message
    /// `RUN_INTERVAL` from now.
    ///
    /// An error from `run_garbage_collect` is propagated with `?`. In that case
    /// `num_passes` has already been incremented and no next pass is scheduled.
    async fn handle(
        &mut self,
        _: Loop,
        ctx: &ActorContext<Self>,
    ) -> Result<(), quickwit_actors::ActorExitStatus> {
        info!("garbage-collect-operation");
        self.counters.num_passes += 1;

        // NOTE(review): the `false` flag is presumably a dry-run switch — confirm
        // against `run_garbage_collect`.
        let removed_entries = run_garbage_collect(
            &self.index_id,
            self.split_store.clone(),
            self.metastore.clone(),
            STAGED_GRACE_PERIOD,
            DELETION_GRACE_PERIOD,
            false,
            Some(ctx),
        )
        .await?;

        if !removed_entries.is_empty() {
            // Deduplicated file names, used only for the log line below.
            let removed_names: HashSet<&str> = removed_entries
                .iter()
                .map(|entry| entry.file_name.as_str())
                .collect();
            info!(deleted_files = ?removed_names, "gc-delete");

            let removed_bytes: usize = removed_entries
                .iter()
                .map(|entry| entry.file_size_in_bytes as usize)
                .sum();
            self.counters.num_deleted_files += removed_entries.len();
            self.counters.num_deleted_bytes += removed_bytes;
        }

        ctx.schedule_self_msg(RUN_INTERVAL, Loop).await;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::path::Path;

    use quickwit_actors::Universe;
    use quickwit_metastore::{MockMetastore, Split, SplitMetadata, SplitState};
    use quickwit_storage::MockStorage;

    use super::*;

    /// Builds one `Split` per id in `split_ids`, all in `split_state`, with
    /// `update_timestamp` 0.
    ///
    /// Every split gets `footer_offsets: 5..20`. The byte-count assertions below
    /// expect 20 bytes per deleted split, which presumably comes from
    /// `footer_offsets.end` — confirm against `run_garbage_collect`.
    fn make_splits(split_ids: &[&str], split_state: SplitState) -> Vec<Split> {
        split_ids
            .iter()
            .map(|split_id| Split {
                split_metadata: SplitMetadata {
                    split_id: split_id.to_string(),
                    footer_offsets: 5..20,
                    ..Default::default()
                },
                split_state,
                update_timestamp: 0i64,
            })
            .collect()
    }

    /// One pass should mark the staged split `a` for deletion, then delete every
    /// split marked for deletion (`a`, `b`, `c`) from storage and from the metastore.
    #[tokio::test]
    async fn test_garbage_collect_calls_dependencies_appropriately() {
        quickwit_common::setup_logging_for_tests();
        let foo_index = "foo-index";

        let mut mock_storage = MockStorage::default();
        mock_storage.expect_delete().times(3).returning(|path| {
            assert!(
                path == Path::new("a.split")
                    || path == Path::new("b.split")
                    || path == Path::new("c.split")
            );
            Ok(())
        });

        let mut mock_metastore = MockMetastore::default();
        mock_metastore.expect_list_splits().times(2).returning(
            |index_id, split_state, _time_range, _tags| {
                assert_eq!(index_id, "foo-index");
                let splits = match split_state {
                    SplitState::Staged => make_splits(&["a"], SplitState::Staged),
                    SplitState::MarkedForDeletion => {
                        make_splits(&["a", "b", "c"], SplitState::MarkedForDeletion)
                    }
                    _ => panic!("only Staged and MarkedForDeletion expected."),
                };
                Ok(splits)
            },
        );
        mock_metastore
            .expect_mark_splits_for_deletion()
            .times(1)
            .returning(|index_id, split_ids| {
                assert_eq!(index_id, "foo-index");
                assert_eq!(split_ids, vec!["a"]);
                Ok(())
            });
        mock_metastore
            .expect_delete_splits()
            .times(1)
            .returning(|index_id, split_ids| {
                assert_eq!(index_id, "foo-index");
                assert_eq!(split_ids, vec!["a", "b", "c"]);
                Ok(())
            });

        let universe = Universe::new();
        let garbage_collect_actor = GarbageCollector::new(
            foo_index.to_string(),
            IndexingSplitStore::create_with_no_local_store(Arc::new(mock_storage)),
            Arc::new(mock_metastore),
        );
        let (_mailbox, actor_handle) = universe.spawn_actor(garbage_collect_actor).spawn();

        let state_after_initialization = actor_handle.process_pending_and_observe().await.state;
        assert_eq!(state_after_initialization.num_passes, 1);
        assert_eq!(state_after_initialization.num_deleted_files, 3);
        assert_eq!(state_after_initialization.num_deleted_bytes, 60);
    }

    /// A second pass should happen only once `RUN_INTERVAL` has elapsed since the
    /// first one.
    #[tokio::test]
    async fn test_garbage_collect_get_calls_repeatedly() {
        quickwit_common::setup_logging_for_tests();
        let foo_index = "foo-index";

        // Two passes, each deleting `a.split` and `b.split`.
        let mut mock_storage = MockStorage::default();
        mock_storage.expect_delete().times(4).returning(|path| {
            assert!(path == Path::new("a.split") || path == Path::new("b.split"));
            Ok(())
        });

        let mut mock_metastore = MockMetastore::default();
        mock_metastore.expect_list_splits().times(4).returning(
            |index_id, split_state, _time_range, _tags| {
                assert_eq!(index_id, "foo-index");
                let splits = match split_state {
                    SplitState::Staged => make_splits(&["a"], SplitState::Staged),
                    SplitState::MarkedForDeletion => {
                        make_splits(&["a", "b"], SplitState::MarkedForDeletion)
                    }
                    _ => panic!("only Staged and MarkedForDeletion expected."),
                };
                Ok(splits)
            },
        );
        mock_metastore
            .expect_mark_splits_for_deletion()
            .times(2)
            .returning(|index_id, split_ids| {
                assert_eq!(index_id, "foo-index");
                assert_eq!(split_ids, vec!["a"]);
                Ok(())
            });
        mock_metastore
            .expect_delete_splits()
            .times(2)
            .returning(|index_id, split_ids| {
                assert_eq!(index_id, "foo-index");
                assert_eq!(split_ids, vec!["a", "b"]);
                Ok(())
            });

        let universe = Universe::new();
        let garbage_collect_actor = GarbageCollector::new(
            foo_index.to_string(),
            IndexingSplitStore::create_with_no_local_store(Arc::new(mock_storage)),
            Arc::new(mock_metastore),
        );
        let (_mailbox, actor_handle) = universe.spawn_actor(garbage_collect_actor).spawn();

        let state_after_initialization = actor_handle.process_pending_and_observe().await.state;
        assert_eq!(state_after_initialization.num_passes, 1);
        assert_eq!(state_after_initialization.num_deleted_files, 2);
        assert_eq!(state_after_initialization.num_deleted_bytes, 40);

        // Less than one run interval later: no new pass yet.
        universe.simulate_time_shift(RUN_INTERVAL / 2).await;
        let state_before_next_run = actor_handle.process_pending_and_observe().await.state;
        assert_eq!(state_before_next_run.num_passes, 1);
        assert_eq!(state_before_next_run.num_deleted_files, 2);
        assert_eq!(state_before_next_run.num_deleted_bytes, 40);

        // Past the run interval: exactly one more pass.
        universe.simulate_time_shift(RUN_INTERVAL).await;
        let state_after_next_run = actor_handle.process_pending_and_observe().await.state;
        assert_eq!(state_after_next_run.num_passes, 2);
        assert_eq!(state_after_next_run.num_deleted_files, 4);
        assert_eq!(state_after_next_run.num_deleted_bytes, 80);
    }
}