use {
crate::{
Partitions, TRACER,
database::{
chunk::RecordWriter,
gc::GcPolicy,
},
protocol::lsp::LanguageServer, scheduler::task::TaskContext,
},
std::{
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
},
};
type GcTaskFn<P, T> = Box<
dyn FnOnce(
TaskContext<P, T>,
) -> Pin<Box<dyn Future<Output = Option<RecordWriter<P>>> + Send>>
+ Send
+ 'static,
>;
pub(crate) fn periodic_gc_task<P: Partitions, T: LanguageServer<P>>(
shutdown_flag: Arc<AtomicBool>,
) -> GcTaskFn<P, T> {
Box::new(move |ctx| {
Box::pin({
let scheduler = ctx.scheduler();
async move {
let policy = GcPolicy::default();
let mut last_gc_time = Instant::now();
let mut commits_since_gc: usize = 0;
let mut last_idle_activity = Instant::now();
loop {
smol::Timer::after(Duration::from_millis(100)).await;
if shutdown_flag.load(Ordering::Acquire) {
break;
}
let oldest_epoch = scheduler.oldest_running_epoch();
if !scheduler.reaper.is_empty() {
let removed = {
otel::span!("laburnum.gc.reaper");
scheduler.reaper.reap(oldest_epoch, policy.reap_budget)
};
if removed > 0 {
otel::event!(
"laburnum.gc.reaper.removed",
"removed_count" = removed as i64
);
last_idle_activity = Instant::now();
}
}
let current_epoch = scheduler.db.get_current_epoch();
let new_commits = current_epoch.get() as usize;
let commits_delta = new_commits.saturating_sub(commits_since_gc);
let should_gc = {
let periodic_due =
last_gc_time.elapsed() >= policy.periodic_interval;
let idle_trigger = policy
.idle_timeout
.map(|timeout| last_idle_activity.elapsed() >= timeout)
.unwrap_or(false);
let commit_trigger = policy
.commit_threshold
.map(|threshold| commits_delta >= threshold)
.unwrap_or(false);
periodic_due || idle_trigger || commit_trigger
};
if !should_gc || scheduler.gc.is_active() {
continue;
}
{
otel::span!("laburnum.gc.mark_sweep");
let roots = scheduler.db.collect_index_hashes();
if !scheduler.gc.start_marking(roots.into_iter()) {
continue;
}
}
loop {
if shutdown_flag.load(Ordering::Acquire) {
scheduler.gc.finish_sweep();
break;
}
let done = P::gc_mark_tick(
&scheduler.gc,
scheduler.db.get_store().stores(),
policy.mark_budget,
);
if done {
break;
}
smol::future::yield_now().await;
}
if shutdown_flag.load(Ordering::Acquire) {
break;
}
if !scheduler.gc.finish_marking() {
scheduler.gc.finish_sweep();
continue;
}
P::gc_sweep(&scheduler.gc, scheduler.db.get_store().stores());
scheduler.gc.finish_sweep();
last_gc_time = Instant::now();
commits_since_gc = current_epoch.get() as usize;
otel::event!("laburnum.gc.mark_sweep.complete");
}
None
}
})
})
}
#[allow(dead_code)]
pub(crate) fn on_demand_gc_task<P: Partitions, T: LanguageServer<P>>(
_parent_trace_context: crate::protocol::otel::TraceContext,
) -> GcTaskFn<P, T> {
Box::new(move |ctx| {
Box::pin({
let scheduler = ctx.scheduler();
async move {
let oldest_epoch = scheduler.oldest_running_epoch();
if !scheduler.reaper.is_empty() {
let removed = {
otel::span!("laburnum.gc.on_demand");
scheduler.reaper.reap(oldest_epoch, GcPolicy::default().reap_budget)
};
if removed > 0 {
otel::event!(
"laburnum.gc.on_demand.removed",
"removed_count" = removed as i64
);
}
}
None
}
})
})
}