use std::collections::{HashMap, HashSet};
use anyhow::Result;
use crate::memory::chunks::{list_chunks, Chunk, ListChunksQuery, Metadata, SourceKind};
use crate::memory::config::MemoryConfig;
use crate::memory::tree::store::{get_tree_by_scope, list_summaries_in_window};
use crate::memory::tree::{SummaryNode, TreeKind};
use super::types::{hit_from_chunk, hit_from_summary, QueryResponse, RetrievalHit};
/// Hit limit applied by `cover_window` when the caller passes `limit == 0`.
const DEFAULT_LIMIT: usize = 200;
/// Upper bound handed to `list_chunks` for a single window query; chunks beyond this
/// count are never seen by the cover logic (which ones are cut depends on
/// `list_chunks` ordering — NOTE(review): confirm).
const MAX_WINDOW_CHUNKS: usize = 5_000;
/// Returns the scope key of the source tree a chunk belongs to.
///
/// The chunk's `path_scope` is used when it is set; otherwise the key falls back to
/// its `source_id`.
fn chunk_tree_scope(metadata: &Metadata) -> String {
    match &metadata.path_scope {
        Some(scope) => scope.clone(),
        None => metadata.source_id.clone(),
    }
}
/// Builds a time-window "cover" of the memory store.
///
/// For every tree scope that has chunks in the window, the result holds:
/// - the top-most in-window summaries, which stand in for the chunks beneath them;
/// - every remaining chunk that no selected summary accounts for.
///
/// A `limit` of `0` means `DEFAULT_LIMIT`. Hits are sorted by `tree_scope` and then by
/// `time_range_start` before truncation to `limit`. The total passed to
/// `QueryResponse::new` is the hit count taken before truncation.
///
/// # Errors
/// Fails when `until_ms` is earlier than `since_ms`, or when any chunk, tree, or
/// summary lookup fails.
pub fn cover_window(
    config: &MemoryConfig,
    since_ms: i64,
    until_ms: i64,
    source_id: Option<&str>,
    source_kind: Option<SourceKind>,
    limit: usize,
) -> Result<QueryResponse> {
    let limit = match limit {
        0 => DEFAULT_LIMIT,
        requested => requested,
    };
    if since_ms > until_ms {
        anyhow::bail!("cover_window: until_ms ({until_ms}) precedes since_ms ({since_ms})");
    }

    let mut hits = collect_cover(config, since_ms, until_ms, source_id, source_kind)?;
    hits.sort_by(|left, right| {
        left.tree_scope
            .cmp(&right.tree_scope)
            .then_with(|| left.time_range_start.cmp(&right.time_range_start))
    });

    // Capture the pre-truncation size so callers can tell the cover was clipped.
    let total = hits.len();
    hits.truncate(limit);
    Ok(QueryResponse::new(hits, total))
}
/// Fetches the window's chunks, groups them by tree scope, and covers each group.
///
/// The chunk query passes the optional `source_id` / `source_kind` filters, sets
/// `exclude_dropped`, and is capped at `MAX_WINDOW_CHUNKS`. When a `source_id` is
/// given, each group's cover may only use summaries whose leaves are all among the
/// fetched chunks.
///
/// The returned hits are unordered; `cover_window` sorts them.
fn collect_cover(
    config: &MemoryConfig,
    since_ms: i64,
    until_ms: i64,
    source_id: Option<&str>,
    source_kind: Option<SourceKind>,
) -> Result<Vec<RetrievalHit>> {
    let query = ListChunksQuery {
        source_id: source_id.map(str::to_owned),
        source_kind,
        since_ms: Some(since_ms),
        until_ms: Some(until_ms),
        limit: Some(MAX_WINDOW_CHUNKS),
        exclude_dropped: true,
        ..Default::default()
    };

    let mut grouped: HashMap<String, Vec<Chunk>> = HashMap::new();
    for chunk in list_chunks(config, &query)? {
        grouped
            .entry(chunk_tree_scope(&chunk.metadata))
            .or_default()
            .push(chunk);
    }

    // With an explicit source filter, a tree scope may also hold chunks we did not
    // fetch, so summaries must be checked against the fetched set.
    let exact_source = source_id.is_some();
    let mut hits: Vec<RetrievalHit> = Vec::new();
    grouped
        .into_iter()
        .try_for_each(|(scope, scope_chunks)| {
            cover_one_source(
                config,
                &scope,
                since_ms,
                until_ms,
                scope_chunks,
                exact_source,
                &mut hits,
            )
        })?;
    Ok(hits)
}
/// Appends the cover hits for a single tree scope to `out`.
///
/// Steps:
/// 1. Look up the `TreeKind::Source` tree for `source`. If a tree exists, fetch its
///    summaries for the window. If not, there are no summaries and the tree id used
///    for chunk hits is the empty string.
/// 2. Drop superseded document versions.
/// 3. Plan the cover.
/// 4. Emit the selected summaries first, then every chunk in `chunks` that is neither
///    covered by a selected summary nor suppressed as part of a superseded version.
///
/// When `exact_source` is true, a summary is only selected if all of its leaves are
/// in `chunks`.
///
/// # Errors
/// Propagates failures from the tree and summary lookups.
fn cover_one_source(
    config: &MemoryConfig,
    source: &str,
    since_ms: i64,
    until_ms: i64,
    chunks: Vec<Chunk>,
    exact_source: bool,
    out: &mut Vec<RetrievalHit>,
) -> Result<()> {
    let (tree_id, window_summaries) = match get_tree_by_scope(config, TreeKind::Source, source)? {
        Some(tree) => {
            let summaries = list_summaries_in_window(config, &tree.id, since_ms, until_ms)?;
            (tree.id.clone(), summaries)
        }
        None => (String::new(), Vec::new()),
    };

    let (kept, suppressed_ids) = filter_superseded_doc_versions(window_summaries);
    let present_ids: HashSet<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
    let restriction = if exact_source { Some(&present_ids) } else { None };
    let plan = plan_cover(&kept, restriction);

    let lookup: HashMap<&str, &SummaryNode> =
        kept.iter().map(|summary| (summary.id.as_str(), summary)).collect();
    out.extend(
        plan.maximal_ids
            .iter()
            .filter_map(|id| lookup.get(id.as_str()))
            .map(|node| hit_from_summary(node, source)),
    );

    // A chunk is already represented if a selected summary covers it, or if it
    // belongs to a superseded document version that must stay hidden.
    let is_accounted_for =
        |id: &str| plan.covered_chunk_ids.contains(id) || suppressed_ids.contains(id);
    out.extend(
        chunks
            .iter()
            .filter(|chunk| !is_accounted_for(chunk.id.as_str()))
            .map(|chunk| hit_from_chunk(chunk, &tree_id, source, 0.0)),
    );
    Ok(())
}
/// Outcome of `plan_cover`: which summaries form the cover and what they account for.
///
/// Derives `Debug` so plans can be printed in test failures and diagnostics.
/// `Clone` and `Default` are provided for convenience when building or comparing plans.
#[derive(Debug, Clone, Default)]
pub(crate) struct CoverPlan {
    /// Ids of the selected top-most summaries, in the order they appeared in the
    /// eligible list.
    pub(crate) maximal_ids: Vec<String>,
    /// Every non-eligible id reached beneath the selected summaries. These are
    /// presumably chunk ids, and callers use them to skip chunks already represented
    /// by a summary.
    pub(crate) covered_chunk_ids: HashSet<String>,
}
/// Picks the top-most eligible summaries and records the leaves they cover.
///
/// A summary is a candidate when it has no parent, or when its parent is not among
/// `eligible`. For each candidate, every descendant reachable through eligible
/// summaries is walked. Ids that are not themselves eligible summaries are collected
/// as its leaves.
///
/// When `restrict_to_present` is `Some`, a candidate is accepted only if every one of
/// its leaves is in that set. A rejected candidate is skipped entirely; none of its
/// eligible descendants are tried in its place. Accepted candidates appear in
/// `maximal_ids` in `eligible` order.
pub(crate) fn plan_cover(
    eligible: &[SummaryNode],
    restrict_to_present: Option<&HashSet<&str>>,
) -> CoverPlan {
    let lookup: HashMap<&str, &SummaryNode> =
        eligible.iter().map(|node| (node.id.as_str(), node)).collect();
    // Roots of the in-window forest: nodes whose parent is absent or out of window.
    let is_root = |node: &SummaryNode| {
        node.parent_id
            .as_deref()
            .filter(|parent| lookup.contains_key(parent))
            .is_none()
    };

    let mut selected: Vec<String> = Vec::new();
    let mut covered: HashSet<String> = HashSet::new();
    for root in eligible.iter().filter(|&node| is_root(node)) {
        let mut leaves: HashSet<String> = HashSet::new();
        collect_descendant_chunks(root, &lookup, &mut leaves);
        if let Some(present) = restrict_to_present {
            if leaves.iter().any(|leaf| !present.contains(leaf.as_str())) {
                continue;
            }
        }
        selected.push(root.id.clone());
        covered.extend(leaves);
    }

    CoverPlan {
        maximal_ids: selected,
        covered_chunk_ids: covered,
    }
}
/// Adds to `covered` every id beneath `node` that is not a key of `by_id`.
///
/// Child ids found in `by_id` are eligible summaries and are descended into. Any other
/// child id is treated as a leaf (normally a chunk id) and recorded. `node`'s own id
/// is never recorded. An explicit work stack replaces recursion; the resulting set is
/// the same.
fn collect_descendant_chunks(
    node: &SummaryNode,
    by_id: &HashMap<&str, &SummaryNode>,
    covered: &mut HashSet<String>,
) {
    let mut pending: Vec<&SummaryNode> = vec![node];
    while let Some(current) = pending.pop() {
        for child in &current.child_ids {
            if let Some(&summary) = by_id.get(child.as_str()) {
                pending.push(summary);
            } else {
                covered.insert(child.clone());
            }
        }
    }
}
/// Removes summaries that belong to superseded versions of a document.
///
/// Summaries without a `doc_id` are always kept. For each `doc_id`, exactly one
/// summary survives: the first one, in input order, whose `version_ms` equals the
/// highest version seen for that document. A missing `version_ms` ranks below every
/// real version.
///
/// Every other summary of that document is removed, together with all eligible
/// summaries beneath it.
///
/// Returns the surviving summaries in their original order, plus the ids of the
/// non-eligible descendants reached under removed summaries. The caller treats those
/// ids as chunks to hide.
pub(crate) fn filter_superseded_doc_versions(
    eligible: Vec<SummaryNode>,
) -> (Vec<SummaryNode>, HashSet<String>) {
    let mut suppressed_chunk_ids: HashSet<String> = HashSet::new();
    // Fast path: with no versioned documents there is nothing to supersede.
    if eligible.iter().all(|node| node.doc_id.is_none()) {
        return (eligible, suppressed_chunk_ids);
    }

    let lookup: HashMap<&str, &SummaryNode> =
        eligible.iter().map(|node| (node.id.as_str(), node)).collect();

    // Newest version seen per document.
    let mut newest: HashMap<&str, i64> = HashMap::new();
    for node in &eligible {
        if let Some(doc) = node.doc_id.as_deref() {
            let version = node.version_ms.unwrap_or(i64::MIN);
            let best = newest.entry(doc).or_insert(version);
            *best = (*best).max(version);
        }
    }

    let mut claimed_docs: HashSet<&str> = HashSet::new();
    let mut dropped_ids: HashSet<String> = HashSet::new();
    for node in &eligible {
        let Some(doc) = node.doc_id.as_deref() else {
            continue;
        };
        let version = node.version_ms.unwrap_or(i64::MIN);
        let newest_version = newest.get(doc).copied().unwrap_or(i64::MIN);
        // Only a node at the newest version may claim the document, and only the
        // first such node does; `&&` keeps older versions from touching the claim set.
        let survives = version == newest_version && claimed_docs.insert(doc);
        if !survives {
            dropped_ids.insert(node.id.clone());
            collect_subtree_ids(node, &lookup, &mut dropped_ids, &mut suppressed_chunk_ids);
        }
    }

    let mut kept = eligible;
    kept.retain(|node| !dropped_ids.contains(&node.id));
    (kept, suppressed_chunk_ids)
}
/// Walks everything beneath `node` and sorts the ids it finds into two sets.
///
/// - Child ids that are keys of `by_id` (eligible summaries) go into `summaries` and
///   are descended into.
/// - All other child ids go into `chunks`.
///
/// `node`'s own id is not recorded. An explicit work stack replaces recursion; both
/// resulting sets are the same.
fn collect_subtree_ids(
    node: &SummaryNode,
    by_id: &HashMap<&str, &SummaryNode>,
    summaries: &mut HashSet<String>,
    chunks: &mut HashSet<String>,
) {
    let mut stack: Vec<&SummaryNode> = vec![node];
    while let Some(current) = stack.pop() {
        for child in &current.child_ids {
            match by_id.get(child.as_str()) {
                Some(&summary) => {
                    summaries.insert(child.clone());
                    stack.push(summary);
                }
                None => {
                    chunks.insert(child.clone());
                }
            }
        }
    }
}
// Unit tests for this module are kept in `cover_tests.rs` (the `#[path]` is resolved
// relative to this file's directory) and compiled only for test builds.
#[cfg(test)]
#[path = "cover_tests.rs"]
mod tests;