use std::sync::{Arc, Mutex};
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use super::compact::{self, CompactionCtx};
use crate::session::{CompactionReport, History};
type OnDone = Arc<dyn Fn(CompactionReport) -> BoxFuture<'static, ()> + Send + Sync>;
struct SlotInner {
flight: Option<JoinHandle<()>>,
}
#[derive(Clone)]
pub struct CompactionSlot {
inner: Arc<Mutex<SlotInner>>,
}
impl Default for CompactionSlot {
fn default() -> Self {
Self::new()
}
}
impl CompactionSlot {
#[must_use]
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(SlotInner { flight: None })),
}
}
#[must_use]
pub(crate) fn is_in_flight(&self) -> bool {
let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
if let Some(h) = &inner.flight
&& h.is_finished()
{
inner.flight = None;
}
inner.flight.is_some()
}
pub(crate) fn try_spawn(
&self,
history: Arc<dyn History>,
ctx: CompactionCtx,
threshold: u64,
on_done: OnDone,
) -> bool {
let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
if let Some(h) = &inner.flight
&& h.is_finished()
{
inner.flight = None;
}
if inner.flight.is_some() {
return false;
}
let slot = self.inner.clone();
let handle = tokio::spawn(async move {
run_once(history.as_ref(), &ctx, threshold, &on_done).await;
if let Ok(mut inner) = slot.lock()
&& let Some(h) = &inner.flight
&& h.is_finished()
{
inner.flight = None;
}
});
inner.flight = Some(handle);
true
}
pub(crate) async fn await_in_flight(&self) {
let handle = {
let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
inner.flight.take()
};
if let Some(handle) = handle {
let _ = handle.await;
}
}
}
async fn run_once(history: &dyn History, ctx: &CompactionCtx, threshold: u64, on_done: &OnDone) {
let messages = history.snapshot();
let Some(plan) = compact::plan(&messages, threshold) else {
return;
};
let Some(summary) = compact::summarize(ctx, &plan.head, plan.prev_summary.as_deref()).await
else {
return;
};
let summary_msg = compact::summary_message(&summary);
history.splice_prefix(plan.drop_count, summary_msg);
let tokens_after = history.token_estimate().unwrap_or(plan.tokens_before);
tracing::info!(
drop_count = plan.drop_count,
tokens_before = plan.tokens_before,
tokens_after,
"context compacted (background)"
);
on_done(CompactionReport {
tokens_before: plan.tokens_before,
tokens_after,
})
.await;
}