use crate::grpc::scheduler::SchedulerClient;
use chrono::Utc;
use dragonfly_api::scheduler::v2::DeleteTaskRequest;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use dragonfly_client_storage::{metadata, Storage};
use dragonfly_client_util::shutdown;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{error, info, instrument};
/// How long after creation a started download keeps being treated as in progress.
///
/// The disk-usage eviction passes skip any task that is started but neither finished nor
/// failed, until this much time has passed since its `created_at`.
pub const DOWNLOAD_TASK_TIMEOUT: Duration = Duration::from_secs(7_200); // two hours
/// Garbage collector that periodically evicts tasks, persistent tasks and persistent cache
/// tasks from local storage, by TTL and by disk usage.
pub struct GC {
/// Dfdaemon configuration; its `gc` section supplies the tick interval and eviction policy.
config: Arc<Config>,
/// ID of this host, sent to the scheduler in `DeleteTaskRequest`.
host_id: String,
/// Local storage that tasks are listed from and deleted in.
storage: Arc<Storage>,
/// Client used to delete evicted tasks from the scheduler.
scheduler_client: Arc<SchedulerClient>,
/// Shutdown signal that ends the collector's run loop.
shutdown: shutdown::Shutdown,
/// Never sent on; only held so that it is dropped together with the collector.
/// NOTE(review): presumably the receiver uses the channel closing as a "shutdown complete"
/// signal. Fields drop in declaration order, so this one currently drops last — confirm
/// whether that matters before reordering fields.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
impl GC {
pub fn new(
config: Arc<Config>,
host_id: String,
storage: Arc<Storage>,
scheduler_client: Arc<SchedulerClient>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
Self {
config,
host_id,
storage,
scheduler_client,
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
pub async fn run(&self) {
let mut shutdown = self.shutdown.clone();
let mut interval = tokio::time::interval(self.config.gc.interval);
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(err) = self.evict_task_by_ttl().await {
info!("failed to evict task by ttl: {}", err);
}
if let Err(err) = self.evict_task_by_disk_usage().await {
info!("failed to evict task by disk usage: {}", err);
}
if let Err(err) = self.evict_persistent_cache_task_by_ttl().await {
info!("failed to evict persistent cache task by ttl: {}", err);
}
if let Err(err) = self.evict_persistent_cache_task_by_disk_usage().await {
info!("failed to evict persistent cache task by disk usage: {}", err);
}
if let Err(err) = self.evict_persistent_task_by_ttl().await {
info!("failed to evict persistent task by ttl: {}", err);
}
if let Err(err) = self.evict_persistent_task_by_disk_usage().await {
info!("failed to evict persistent task by disk usage: {}", err);
}
}
_ = shutdown.recv() => {
info!("garbage collector shutting down");
return
}
}
}
}
#[instrument(skip_all)]
async fn evict_task_by_ttl(&self) -> Result<()> {
info!("start to evict by task ttl");
for task in self.storage.get_tasks()? {
if task.is_expired(self.config.gc.policy.task_ttl) {
self.storage.delete_task(&task.id).await;
info!("evict task {}", task.id);
self.delete_task_from_scheduler(task.clone()).await;
info!("delete task {} from scheduler", task.id);
}
}
info!("evict by task ttl done");
Ok(())
}
#[instrument(skip_all)]
async fn evict_task_by_disk_usage(&self) -> Result<()> {
let available_space = self.storage.available_space()?;
let total_space = self.storage.total_space()?;
let usage_percent = (100 - available_space * 100 / total_space) as u8;
if usage_percent >= self.config.gc.policy.disk_high_threshold_percent {
info!(
"start to evict task by disk usage, disk usage {}% is higher than high threshold {}%",
usage_percent, self.config.gc.policy.disk_high_threshold_percent
);
let need_evict_space = total_space as f64
* ((usage_percent - self.config.gc.policy.disk_low_threshold_percent) as f64
/ 100.0);
if let Err(err) = self.evict_task_space(need_evict_space as u64).await {
info!("failed to evict task by disk usage: {}", err);
}
info!("evict task by disk usage done");
}
Ok(())
}
#[instrument(skip_all)]
async fn evict_task_space(&self, need_evict_space: u64) -> Result<()> {
let mut tasks = self.storage.get_tasks()?;
tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at));
let mut evicted_space = 0;
for task in tasks {
if evicted_space >= need_evict_space {
break;
}
let task_space = match task.content_length() {
Some(content_length) => content_length,
None => {
if !task.is_failed() {
error!("task {} has no content length", task.id);
continue;
}
info!("task {} is failed, has no content length", task.id);
0
}
};
if task.is_started()
&& !task.is_finished()
&& !task.is_failed()
&& (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc())
{
info!("task {} is started and not finished, skip it", task.id);
continue;
}
self.storage.delete_task(&task.id).await;
evicted_space += task_space;
info!("evict task {} size {}", task.id, task_space);
self.delete_task_from_scheduler(task.clone()).await;
info!("delete task {} from scheduler", task.id);
}
info!("evict total size {}", evicted_space);
Ok(())
}
#[instrument(skip_all)]
async fn delete_task_from_scheduler(&self, task: metadata::Task) {
self.scheduler_client
.delete_task(DeleteTaskRequest {
host_id: self.host_id.clone(),
task_id: task.id.clone(),
})
.await
.unwrap_or_else(|err| {
error!("failed to delete peer {}: {}", task.id, err);
});
}
#[instrument(skip_all)]
async fn evict_persistent_task_by_ttl(&self) -> Result<()> {
info!("start to evict by persistent task ttl");
for task in self.storage.get_persistent_tasks()? {
if task.is_expired() {
self.storage.delete_persistent_task(&task.id).await;
info!("evict persistent task {}", task.id);
}
}
info!("evict by persistent task ttl done");
Ok(())
}
#[instrument(skip_all)]
async fn evict_persistent_task_by_disk_usage(&self) -> Result<()> {
let available_space = self.storage.available_space()?;
let total_space = self.storage.total_space()?;
let usage_percent = (100 - available_space * 100 / total_space) as u8;
if usage_percent >= self.config.gc.policy.disk_high_threshold_percent {
info!(
"start to evict persistent task by disk usage, disk usage {}% is higher than high threshold {}%",
usage_percent, self.config.gc.policy.disk_high_threshold_percent
);
let need_evict_space = total_space as f64
* ((usage_percent - self.config.gc.policy.disk_low_threshold_percent) as f64
/ 100.0);
if let Err(err) = self
.evict_persistent_task_space(need_evict_space as u64)
.await
{
info!("failed to evict task by disk usage: {}", err);
}
info!("evict persistent task by disk usage done");
}
Ok(())
}
#[instrument(skip_all)]
async fn evict_persistent_cache_task_by_ttl(&self) -> Result<()> {
info!("start to evict by persistent cache task ttl");
for task in self.storage.get_persistent_cache_tasks()? {
if task.is_expired() {
self.storage.delete_persistent_cache_task(&task.id).await;
info!("evict persistent cache task {}", task.id);
}
}
info!("evict by persistent cache task ttl done");
Ok(())
}
#[instrument(skip_all)]
async fn evict_persistent_cache_task_by_disk_usage(&self) -> Result<()> {
let available_space = self.storage.available_space()?;
let total_space = self.storage.total_space()?;
let usage_percent = (100 - available_space * 100 / total_space) as u8;
if usage_percent >= self.config.gc.policy.disk_high_threshold_percent {
info!(
"start to evict persistent cache task by disk usage, disk usage {}% is higher than high threshold {}%",
usage_percent, self.config.gc.policy.disk_high_threshold_percent
);
let need_evict_space = total_space as f64
* ((usage_percent - self.config.gc.policy.disk_low_threshold_percent) as f64
/ 100.0);
if let Err(err) = self
.evict_persistent_cache_task_space(need_evict_space as u64)
.await
{
info!("failed to evict task by disk usage: {}", err);
}
info!("evict persistent cache task by disk usage done");
}
Ok(())
}
#[instrument(skip_all)]
async fn evict_persistent_task_space(&self, need_evict_space: u64) -> Result<()> {
let mut tasks = self.storage.get_persistent_tasks()?;
tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at));
let mut evicted_space = 0;
for task in tasks {
if evicted_space >= need_evict_space {
break;
}
if task.is_persistent() {
continue;
}
if task.is_started()
&& !task.is_finished()
&& !task.is_failed()
&& (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc())
{
info!(
"persistent task {} is started and not finished, skip it",
task.id
);
continue;
}
self.storage.delete_persistent_task(&task.id).await;
let task_space = task.content_length();
evicted_space += task_space;
info!("evict persistent task {} size {}", task.id, task_space);
}
info!("evict total size {}", evicted_space);
Ok(())
}
#[instrument(skip_all)]
async fn evict_persistent_cache_task_space(&self, need_evict_space: u64) -> Result<()> {
let mut tasks = self.storage.get_persistent_cache_tasks()?;
tasks.sort_by(|a, b| a.updated_at.cmp(&b.updated_at));
let mut evicted_space = 0;
for task in tasks {
if evicted_space >= need_evict_space {
break;
}
if task.is_persistent() {
continue;
}
if task.is_started()
&& !task.is_finished()
&& !task.is_failed()
&& (task.created_at + DOWNLOAD_TASK_TIMEOUT > Utc::now().naive_utc())
{
info!(
"persistent cache task {} is started and not finished, skip it",
task.id
);
continue;
}
self.storage.delete_persistent_cache_task(&task.id).await;
let task_space = task.content_length();
evicted_space += task_space;
info!(
"evict persistent cache task {} size {}",
task.id, task_space
);
}
info!("evict total size {}", evicted_space);
Ok(())
}
}