use std::collections::HashMap;
use std::path::Path;
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::dataset::RetrievedSet;
use crate::error::{Error, Result};
/// Version metadata for one corpus document, e.g. one line parsed by
/// [`CorpusVersions::from_jsonl_str`].
///
/// A document counts as stale when it names an explicit successor in
/// `superseded_by`, or when another annotation shares its `version_key` and has a
/// strictly later `effective_timestamp` (see [`CorpusVersions::superseded_by`]).
/// Every optional field defaults to `None` when absent from the input and is
/// omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StalenessAnnotation {
/// Id of the document this annotation describes; also its lookup key.
pub doc_id: String,
/// Grouping key: annotations sharing a key are treated as versions of each other.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version_key: Option<String>,
/// Ordering value used for implicit supersession; a larger value is newer.
/// NOTE(review): units are not fixed by this module (presumably Unix time) — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub effective_timestamp: Option<i64>,
/// Explicit id of the replacing document; consulted before any timestamp inference.
/// The referenced id is not required to have an annotation of its own.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub superseded_by: Option<String>,
}
/// Staleness annotations for a corpus, holding at most one annotation per `doc_id`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CorpusVersions {
/// Map from `doc_id` to its annotation; inserting an existing id replaces the entry.
annotations: HashMap<String, StalenessAnnotation>,
}
impl CorpusVersions {
    /// Creates an empty collection with no annotations.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style insert: adds `annotation` (replacing any existing entry with
    /// the same `doc_id`) and returns the updated collection.
    #[must_use]
    pub fn with(mut self, annotation: StalenessAnnotation) -> Self {
        self.annotations
            .insert(annotation.doc_id.clone(), annotation);
        self
    }

    /// Reads a JSONL file of [`StalenessAnnotation`]s from `path` and parses it.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be read (the I/O error is converted with `?`), or
    /// with any error returned by [`CorpusVersions::from_jsonl_str`].
    pub fn load_jsonl<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref();
        debug!(?path, "loading corpus versions");
        let text = std::fs::read_to_string(path)?;
        Self::from_jsonl_str(&text)
    }

    /// Parses JSONL text containing one [`StalenessAnnotation`] object per line.
    ///
    /// A leading UTF-8 byte-order mark is ignored, each line is trimmed, and
    /// blank lines are skipped. When several lines share a `doc_id`, the last one
    /// wins.
    ///
    /// # Errors
    ///
    /// Returns [`Error::DatasetParse`] carrying the 1-based line number of the
    /// first line that fails to deserialize.
    pub fn from_jsonl_str(text: &str) -> Result<Self> {
        // `str::trim` does not remove U+FEFF and serde_json rejects it, so a
        // BOM-prefixed file would otherwise fail on line 1.
        let text = text.strip_prefix('\u{feff}').unwrap_or(text);
        let mut annotations: HashMap<String, StalenessAnnotation> = HashMap::new();
        for (idx, raw_line) in text.lines().enumerate() {
            let line = raw_line.trim();
            if line.is_empty() {
                continue;
            }
            let ann: StalenessAnnotation =
                serde_json::from_str(line).map_err(|source| Error::DatasetParse {
                    line: idx + 1,
                    source,
                })?;
            annotations.insert(ann.doc_id.clone(), ann);
        }
        Ok(Self { annotations })
    }

    /// Returns the annotation for `doc_id`, if one exists.
    #[must_use]
    pub fn get(&self, doc_id: &str) -> Option<&StalenessAnnotation> {
        self.annotations.get(doc_id)
    }

    /// Number of annotated documents.
    #[must_use]
    pub fn len(&self) -> usize {
        self.annotations.len()
    }

    /// `true` when no documents are annotated.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.annotations.is_empty()
    }

    /// Returns the `version_key` of `doc_id`, or `None` if the document is unknown
    /// or has no key.
    #[must_use]
    pub fn version_key_of(&self, doc_id: &str) -> Option<&str> {
        self.annotations
            .get(doc_id)
            .and_then(|a| a.version_key.as_deref())
    }

    /// Returns the id of the document that supersedes `doc_id`, if any.
    ///
    /// Resolution order:
    /// 1. An explicit `superseded_by` on the annotation is returned as-is.
    /// 2. Otherwise, if the annotation has both a `version_key` and an
    ///    `effective_timestamp`, the newest *other* document with the same key
    ///    and a strictly later timestamp is returned. Equal timestamps are
    ///    resolved in favour of the lexicographically smallest doc id, so the
    ///    answer never depends on `HashMap` iteration order.
    ///
    /// Returns `None` for unknown ids and for annotations missing either field.
    /// The implicit lookup scans every annotation, so each call is O(n).
    #[must_use]
    pub fn superseded_by(&self, doc_id: &str) -> Option<&str> {
        let ann = self.annotations.get(doc_id)?;
        // An explicit pointer always wins over timestamp-based inference.
        if let Some(explicit) = ann.superseded_by.as_deref() {
            return Some(explicit);
        }
        // Implicit supersession needs both a grouping key and a timestamp.
        let key = ann.version_key.as_deref()?;
        let ts = ann.effective_timestamp?;
        self.annotations
            .iter()
            .filter(|(other_id, other)| {
                other_id.as_str() != doc_id && other.version_key.as_deref() == Some(key)
            })
            .filter_map(|(other_id, other)| {
                other
                    .effective_timestamp
                    .filter(|&other_ts| other_ts > ts)
                    .map(|other_ts| (other_id.as_str(), other_ts))
            })
            // Newest timestamp wins; on a tie the reversed id comparison makes the
            // smaller id the maximum, giving a deterministic result.
            .max_by(|(a_id, a_ts), (b_id, b_ts)| {
                a_ts.cmp(b_ts).then_with(|| b_id.cmp(a_id))
            })
            .map(|(id, _)| id)
    }

    /// `true` when [`CorpusVersions::superseded_by`] finds a newer document.
    #[must_use]
    pub fn is_stale(&self, doc_id: &str) -> bool {
        self.superseded_by(doc_id).is_some()
    }
}
/// A retrieved document, as reported by [`detect_stale_hits`], that has a newer version.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StaleHit {
/// Id of the stale retrieved document.
pub doc_id: String,
/// Zero-based position of the document in the ranked list.
pub rank: usize,
/// Id of the document that supersedes it.
pub superseded_by: String,
}
/// Per-query result of [`detect_stale_hits`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StalenessReport {
/// Id of the query whose retrieved set was checked.
pub query_id: String,
/// Stale documents found in the inspected window, in rank order.
pub stale_hits: Vec<StaleHit>,
/// Number of ranked positions inspected: `min(k, number of retrieved docs)`.
pub considered: usize,
}
impl StalenessReport {
    /// Share of the inspected positions that hold a stale document.
    ///
    /// An empty window yields `0.0` instead of dividing by zero.
    #[must_use]
    pub fn stale_rate(&self) -> f64 {
        match self.considered {
            0 => 0.0,
            total => self.stale_hits.len() as f64 / total as f64,
        }
    }

    /// Whether any stale document was found in the inspected window.
    #[must_use]
    pub fn has_stale_hits(&self) -> bool {
        let none_found = self.stale_hits.is_empty();
        !none_found
    }
}
/// Retrieved documents that share one `version_key`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConflictGroup {
/// The shared version key.
pub version_key: String,
/// Ids of the documents sharing the key, in rank order (at least two when
/// produced by [`detect_conflicts`]).
pub doc_ids: Vec<String>,
}
/// Per-query result of [`detect_conflicts`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ConflictReport {
/// Id of the query whose retrieved set was checked.
pub query_id: String,
/// One group per version key seen at least twice, ordered by the key's first
/// appearance in the ranking.
pub groups: Vec<ConflictGroup>,
/// Total number of documents across all `groups`.
pub conflicting_doc_count: usize,
/// Number of ranked positions inspected: `min(k, number of retrieved docs)`.
pub considered: usize,
}
impl ConflictReport {
    /// Whether at least one version key occurs more than once in the window.
    #[must_use]
    pub fn has_conflicts(&self) -> bool {
        let no_groups = self.groups.is_empty();
        !no_groups
    }

    /// Share of the inspected positions that belong to some conflict group.
    ///
    /// An empty window yields `0.0` instead of dividing by zero.
    #[must_use]
    pub fn conflict_rate(&self) -> f64 {
        match self.considered {
            0 => 0.0,
            total => self.conflicting_doc_count as f64 / total as f64,
        }
    }
}
#[must_use]
pub fn detect_stale_hits(
retrieved: &RetrievedSet,
versions: &CorpusVersions,
k: usize,
) -> StalenessReport {
let limit = k.min(retrieved.ranked.len());
let window = retrieved.ranked.get(..limit).unwrap_or(&[]);
let mut stale_hits = Vec::new();
for (rank, doc) in window.iter().enumerate() {
if let Some(newer) = versions.superseded_by(&doc.doc_id) {
stale_hits.push(StaleHit {
doc_id: doc.doc_id.clone(),
rank,
superseded_by: newer.to_string(),
});
}
}
StalenessReport {
query_id: retrieved.query_id.clone(),
stale_hits,
considered: limit,
}
}
/// Reports version keys that appear more than once among the top-`k` results.
///
/// The first `min(k, retrieved.ranked.len())` positions are inspected and that
/// count is reported as `considered`. Documents without a known `version_key`
/// are skipped but still count toward `considered`. Groups are emitted in the
/// order their key first appears, and each group lists its doc ids in rank
/// order, so the output does not depend on `HashMap` iteration order.
#[must_use]
pub fn detect_conflicts(
    retrieved: &RetrievedSet,
    versions: &CorpusVersions,
    k: usize,
) -> ConflictReport {
    let considered = k.min(retrieved.ranked.len());

    // `first_seen` records key order; `members` accumulates ids per key.
    let mut first_seen: Vec<String> = Vec::new();
    let mut members: HashMap<String, Vec<String>> = HashMap::new();
    for doc in retrieved.ranked.iter().take(considered) {
        let Some(key) = versions.version_key_of(&doc.doc_id) else {
            continue;
        };
        members
            .entry(key.to_owned())
            .or_insert_with(|| {
                first_seen.push(key.to_owned());
                Vec::new()
            })
            .push(doc.doc_id.clone());
    }

    // A key seen only once is not a conflict.
    let groups: Vec<ConflictGroup> = first_seen
        .into_iter()
        .filter_map(|version_key| {
            let doc_ids = members.remove(&version_key)?;
            (doc_ids.len() >= 2).then_some(ConflictGroup {
                version_key,
                doc_ids,
            })
        })
        .collect();
    let conflicting_doc_count: usize = groups.iter().map(|g| g.doc_ids.len()).sum();

    ConflictReport {
        query_id: retrieved.query_id.clone(),
        groups,
        conflicting_doc_count,
        considered,
    }
}
#[cfg(test)]
// Panicking helpers and direct indexing keep test assertions short; a failure
// here is a test failure, not a production panic path.
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
mod tests {
    use super::*;
    use crate::dataset::RetrievedDoc;

    /// Builds an annotation from optional parts.
    fn ann(
        doc_id: &str,
        key: Option<&str>,
        ts: Option<i64>,
        sup: Option<&str>,
    ) -> StalenessAnnotation {
        StalenessAnnotation {
            doc_id: doc_id.into(),
            version_key: key.map(str::to_owned),
            effective_timestamp: ts,
            superseded_by: sup.map(str::to_owned),
        }
    }

    /// Builds a retrieved set whose order matches `ids`, with decreasing scores.
    fn ranked(query_id: &str, ids: &[&str]) -> RetrievedSet {
        RetrievedSet {
            query_id: query_id.into(),
            ranked: ids
                .iter()
                .enumerate()
                .map(|(rank, id)| RetrievedDoc {
                    doc_id: (*id).into(),
                    score: 1.0 / (rank as f64 + 1.0),
                })
                .collect(),
        }
    }

    #[test]
    fn jsonl_round_trip_preserves_optional_fields() {
        let text = r#"{"doc_id":"a","version_key":"k","effective_timestamp":10}
{"doc_id":"b","version_key":"k","effective_timestamp":20}
{"doc_id":"c","superseded_by":"a"}
"#;
        let versions = CorpusVersions::from_jsonl_str(text).unwrap();
        assert_eq!(versions.len(), 3);
        assert_eq!(versions.version_key_of("a"), Some("k"));
        assert_eq!(versions.version_key_of("c"), None);
        assert_eq!(versions.superseded_by("c"), Some("a"));
        assert_eq!(versions.superseded_by("a"), Some("b"));
        assert_eq!(versions.superseded_by("b"), None);
    }

    #[test]
    fn jsonl_skips_blank_and_whitespace_only_lines() {
        let text = "\n   \n{\"doc_id\":\"a\"}\n\t\n  {\"doc_id\":\"b\"}  \n";
        let versions = CorpusVersions::from_jsonl_str(text).unwrap();
        assert_eq!(versions.len(), 2);
        assert!(versions.get("a").is_some());
        assert!(versions.get("b").is_some());
    }

    #[test]
    fn jsonl_parse_error_reports_one_based_line() {
        let text = "{\"doc_id\":\"a\"}\nnot json\n";
        match CorpusVersions::from_jsonl_str(text) {
            Err(Error::DatasetParse { line, .. }) => assert_eq!(line, 2),
            _ => panic!("expected a DatasetParse error"),
        }
    }

    #[test]
    fn later_jsonl_line_replaces_earlier_for_same_doc_id() {
        let text = "{\"doc_id\":\"a\",\"version_key\":\"old\"}\n\
                    {\"doc_id\":\"a\",\"version_key\":\"new\"}\n";
        let versions = CorpusVersions::from_jsonl_str(text).unwrap();
        assert_eq!(versions.len(), 1);
        assert_eq!(versions.version_key_of("a"), Some("new"));
    }

    #[test]
    fn unknown_doc_is_not_stale() {
        let versions = CorpusVersions::new();
        let r = ranked("q", &["unknown-1"]);
        let report = detect_stale_hits(&r, &versions, 5);
        assert!(!report.has_stale_hits());
        assert_eq!(report.considered, 1);
        assert_eq!(report.stale_rate(), 0.0);
    }

    #[test]
    fn explicit_supersession_flags_stale_hit() {
        let versions = CorpusVersions::new()
            .with(ann("old", Some("addr"), Some(1), Some("new")))
            .with(ann("new", Some("addr"), Some(2), None));
        let r = ranked("q1", &["old", "unrelated"]);
        let report = detect_stale_hits(&r, &versions, 5);
        assert_eq!(report.stale_hits.len(), 1);
        assert_eq!(report.stale_hits[0].doc_id, "old");
        assert_eq!(report.stale_hits[0].rank, 0);
        assert_eq!(report.stale_hits[0].superseded_by, "new");
        assert!((report.stale_rate() - 0.5).abs() < 1e-9);
    }

    #[test]
    fn explicit_supersession_takes_precedence_over_timestamps() {
        let versions = CorpusVersions::new()
            .with(ann("a", Some("k"), Some(1), Some("b")))
            .with(ann("b", Some("k"), Some(2), None))
            .with(ann("c", Some("k"), Some(3), None));
        // `c` is newer, but `a` names `b` explicitly.
        assert_eq!(versions.superseded_by("a"), Some("b"));
        assert_eq!(versions.superseded_by("b"), Some("c"));
        assert!(!versions.is_stale("c"));
    }

    #[test]
    fn implicit_supersession_uses_latest_timestamp() {
        let versions = CorpusVersions::new()
            .with(ann("v1", Some("price"), Some(100), None))
            .with(ann("v2", Some("price"), Some(200), None))
            .with(ann("v3", Some("price"), Some(150), None));
        let r = ranked("q", &["v1", "v3"]);
        let report = detect_stale_hits(&r, &versions, 10);
        assert_eq!(report.stale_hits.len(), 2);
        assert!(report.stale_hits.iter().all(|h| h.superseded_by == "v2"));
    }

    #[test]
    fn k_window_caps_considered_positions() {
        let versions = CorpusVersions::new()
            .with(ann("old", Some("k"), Some(1), Some("new")))
            .with(ann("new", Some("k"), Some(2), None));
        let r = ranked("q", &["new", "old"]);
        let report = detect_stale_hits(&r, &versions, 1);
        assert_eq!(report.considered, 1);
        assert!(report.stale_hits.is_empty());
    }

    #[test]
    fn zero_k_considers_nothing() {
        let versions = CorpusVersions::new()
            .with(ann("old", Some("k"), Some(1), Some("new")))
            .with(ann("new", Some("k"), Some(2), None));
        let r = ranked("q", &["old", "new"]);
        let stale = detect_stale_hits(&r, &versions, 0);
        assert_eq!(stale.considered, 0);
        assert!(!stale.has_stale_hits());
        assert_eq!(stale.stale_rate(), 0.0);
        let conflicts = detect_conflicts(&r, &versions, 0);
        assert_eq!(conflicts.considered, 0);
        assert!(!conflicts.has_conflicts());
        assert_eq!(conflicts.conflict_rate(), 0.0);
    }

    #[test]
    fn detect_conflicts_skips_when_only_one_per_key() {
        let versions = CorpusVersions::new()
            .with(ann("a", Some("k1"), Some(1), None))
            .with(ann("b", Some("k2"), Some(1), None));
        let r = ranked("q", &["a", "b"]);
        let report = detect_conflicts(&r, &versions, 5);
        assert!(!report.has_conflicts());
        assert_eq!(report.considered, 2);
        assert_eq!(report.conflict_rate(), 0.0);
    }

    #[test]
    fn detect_conflicts_groups_repeated_version_keys() {
        let versions = CorpusVersions::new()
            .with(ann("a1", Some("addr"), Some(1), None))
            .with(ann("a2", Some("addr"), Some(2), None))
            .with(ann("p1", Some("price"), Some(1), None))
            .with(ann("p2", Some("price"), Some(2), None))
            .with(ann("misc", None, None, None));
        let r = ranked("q42", &["a1", "p1", "a2", "p2", "misc"]);
        let report = detect_conflicts(&r, &versions, 10);
        assert_eq!(report.groups.len(), 2);
        assert_eq!(report.groups[0].version_key, "addr");
        assert_eq!(report.groups[0].doc_ids, vec!["a1", "a2"]);
        assert_eq!(report.groups[1].version_key, "price");
        assert_eq!(report.groups[1].doc_ids, vec!["p1", "p2"]);
        assert_eq!(report.conflicting_doc_count, 4);
        assert!((report.conflict_rate() - 0.8).abs() < 1e-9);
    }

    #[test]
    fn version_key_collision_outside_window_is_not_a_conflict() {
        let versions = CorpusVersions::new()
            .with(ann("a1", Some("addr"), Some(1), None))
            .with(ann("a2", Some("addr"), Some(2), None));
        let r = ranked("q", &["a1", "filler", "a2"]);
        let report = detect_conflicts(&r, &versions, 2);
        assert!(!report.has_conflicts());
        assert_eq!(report.considered, 2);
    }
}