use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use crate::memory::config::MemoryConfig;
use crate::memory::tree::bucket_seal::{cascade_all_from, LabelStrategy};
use crate::memory::tree::store::{self, DEFAULT_FLUSH_AGE_SECS};
use crate::memory::tree::summarise::Summariser;
pub async fn flush_stale_buffers(
config: &MemoryConfig,
max_age: Duration,
summariser: &dyn Summariser,
strategy: &LabelStrategy,
) -> Result<usize> {
let now = Utc::now();
let cutoff = now - max_age;
let stale = store::list_stale_buffers(config, cutoff)?;
let distinct_tree_ids: Vec<String> = {
let mut seen = std::collections::HashSet::new();
let mut out = Vec::new();
for buf in &stale {
if seen.insert(buf.tree_id.clone()) {
out.push(buf.tree_id.clone());
}
}
out
};
let tree_by_id = store::get_trees_batch(config, &distinct_tree_ids)?;
let mut seals = 0;
for buf in stale {
let Some(tree) = tree_by_id.get(&buf.tree_id) else {
continue; };
let sealed =
cascade_all_from(config, tree, buf.level, Some(now), summariser, strategy).await?;
seals += sealed.len();
}
Ok(seals)
}
pub async fn flush_stale_buffers_default(
config: &MemoryConfig,
summariser: &dyn Summariser,
strategy: &LabelStrategy,
) -> Result<usize> {
flush_stale_buffers(
config,
Duration::seconds(DEFAULT_FLUSH_AGE_SECS),
summariser,
strategy,
)
.await
}
pub async fn force_flush_tree(
config: &MemoryConfig,
tree_id: &str,
now: Option<DateTime<Utc>>,
summariser: &dyn Summariser,
strategy: &LabelStrategy,
) -> Result<Vec<String>> {
let tree = store::get_tree(config, tree_id)?
.ok_or_else(|| anyhow::anyhow!("no tree with id {tree_id}"))?;
cascade_all_from(config, &tree, 0, now, summariser, strategy).await
}
#[cfg(test)]
#[path = "flush_tests.rs"]
mod tests;