use std::collections::BTreeMap;
use std::time::Instant;
use a2a_protocol_types::task::TaskId;
use super::{TaskEntry, TaskStoreConfig};
use super::InMemoryTaskStore;
impl InMemoryTaskStore {
    /// Runs an eviction pass right now.
    ///
    /// Acquires the write lock on the entry map and applies `evict` with this
    /// store's configuration. Unlike `maybe_evict`, this does not consult the
    /// `eviction_in_progress` flag.
    pub async fn run_eviction(&self) {
        let mut store = self.entries.write().await;
        Self::evict(&mut store, &self.config);
    }

    /// Counts one write and reports whether an eviction pass is due.
    ///
    /// Every call increments `write_count`. The result is `true` when the
    /// counter value *before* the increment is a multiple of
    /// `config.eviction_interval`, or when `store_len` exceeds
    /// `config.max_capacity` (if a capacity is configured).
    pub(super) fn should_evict(&self, store_len: usize) -> bool {
        use std::sync::atomic::Ordering;

        // `fetch_add` returns the value prior to the increment.
        let previous_writes = self.write_count.fetch_add(1, Ordering::Relaxed);
        let over_capacity = self.config.max_capacity.is_some_and(|max| store_len > max);
        over_capacity || previous_writes.is_multiple_of(self.config.eviction_interval)
    }

    /// Runs an eviction pass unless one is already in progress.
    ///
    /// The `eviction_in_progress` flag is claimed with a compare-exchange; if
    /// another caller already holds it, this returns immediately without
    /// touching the entry map.
    ///
    /// The flag is released by a drop guard rather than a trailing statement,
    /// so it is reset even if this future is dropped while waiting for the
    /// write lock or if `evict` panics. Otherwise the flag would stay `true`
    /// and every later call would return early.
    pub(super) async fn maybe_evict(&self) {
        use std::sync::atomic::{AtomicBool, Ordering};

        /// Clears the in-progress flag when dropped.
        struct InProgressReset<'a>(&'a AtomicBool);

        impl Drop for InProgressReset<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }

        if self
            .eviction_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        // Declared before the lock guard so that, on scope exit, the write lock
        // is released first and the flag is cleared afterwards.
        let _reset = InProgressReset(&self.eviction_in_progress);

        let mut store = self.entries.write().await;
        Self::evict(&mut store, &self.config);
    }

    /// Applies the TTL and capacity policies from `config` to `store`.
    ///
    /// 1. If `config.task_ttl` is set, terminal tasks whose `last_updated` is at
    ///    least `ttl` in the past are removed. Non-terminal tasks are never
    ///    removed by this step.
    /// 2. If `config.max_capacity` is set and the store still holds more than
    ///    `max` entries, the oldest terminal tasks (by `last_updated`) are
    ///    removed first. If that is not enough, the oldest remaining entries
    ///    are removed until the store is back at `max`.
    pub(super) fn evict(store: &mut BTreeMap<TaskId, TaskEntry>, config: &TaskStoreConfig) {
        /// Removes up to `limit` entries accepted by `eligible`, oldest
        /// `last_updated` first.
        fn remove_oldest(
            store: &mut BTreeMap<TaskId, TaskEntry>,
            limit: usize,
            eligible: impl Fn(&TaskEntry) -> bool,
        ) {
            let mut candidates: Vec<(TaskId, Instant)> = store
                .iter()
                .filter(|(_, entry)| eligible(entry))
                .map(|(id, entry)| (id.clone(), entry.last_updated))
                .collect();
            // Stable sort: entries with equal timestamps keep the map's key order.
            candidates.sort_by_key(|(_, updated)| *updated);
            for (id, _) in candidates.into_iter().take(limit) {
                store.remove(&id);
            }
        }

        if let Some(ttl) = config.task_ttl {
            let now = Instant::now();
            store.retain(|_, entry| {
                !entry.task.status.state.is_terminal()
                    || now.duration_since(entry.last_updated) < ttl
            });
        }

        let Some(max) = config.max_capacity else {
            return;
        };
        let overflow = store.len().saturating_sub(max);
        if overflow == 0 {
            return;
        }

        // Prefer evicting finished work.
        remove_oldest(store, overflow, |entry| entry.task.status.state.is_terminal());

        // If the store is still over capacity, the previous step ran out of
        // terminal entries, so everything left is non-terminal.
        let remaining = store.len().saturating_sub(max);
        if remaining > 0 {
            remove_oldest(store, remaining, |_| true);
        }
    }
}