use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
/// Context-map key under which `ReadFreshnessContextProvider::provide_context`
/// publishes the minimum acceptable timestamp, formatted as an RFC 3339 string.
// NOTE(review): the `headers.` prefix presumably makes the namespace client
// forward the value as the `x-lancedb-min-timestamp` request header — confirm
// against `lance_namespace_impls`.
const MIN_TIMESTAMP_CONTEXT_KEY: &str = "headers.x-lancedb-min-timestamp";
/// Shared, mutex-guarded map from a string key to a `SystemTime` baseline.
///
/// `TableFreshness::bump` writes entries under its own `key`, and
/// `ReadFreshnessContextProvider` looks entries up by `OperationInfo::object_id`,
/// so both sides must agree on the key format.
pub type FreshnessBaselines = Arc<Mutex<HashMap<String, SystemTime>>>;
/// Computes the minimum timestamp (freshness floor) to request for a read.
///
/// Two independent sources can contribute a floor:
///
/// * `interval` — a read-consistency interval, contributing `now - interval`
///   (which is exactly `now` for a zero interval);
/// * `baseline` — an explicit lower bound, used as-is.
///
/// When both contribute, the later (stricter) of the two is returned. When
/// neither contributes, the result is `None`.
///
/// If `now - interval` cannot be represented as a `SystemTime` (for example
/// with `Duration::MAX`), the interval contributes no floor at all: a floor
/// earlier than any representable instant constrains nothing. Falling back to
/// `now` in that case would turn the most lenient possible interval into the
/// strictest possible requirement.
fn compute_min_timestamp(
    baseline: Option<SystemTime>,
    interval: Option<Duration>,
    now: SystemTime,
) -> Option<SystemTime> {
    // `checked_sub` returns `None` on underflow, which is exactly the
    // "no floor from the interval" outcome we want. A zero interval needs no
    // special case because `now - 0 == now`.
    let interval_floor = interval.and_then(|window| now.checked_sub(window));

    match (interval_floor, baseline) {
        (Some(from_interval), Some(from_baseline)) => Some(from_interval.max(from_baseline)),
        // At most one side is present here; take whichever exists.
        (from_interval, from_baseline) => from_interval.or(from_baseline),
    }
}
/// Returns the baseline to store after an update observed at `now`.
///
/// The result never moves backwards: if a previously stored baseline `prev` is
/// later than `now`, it is kept; otherwise (or when there is no previous
/// baseline) `now` is returned.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
    prev.map_or(now, |recorded| recorded.max(now))
}
/// Handle that records a freshness baseline for a single key in a shared
/// [`FreshnessBaselines`] map.
///
/// Cloning the handle is cheap with respect to the map: clones share the same
/// underlying `Arc`, so they all update the same entry.
#[derive(Clone, Debug)]
pub struct TableFreshness {
    /// Shared map that receives the baseline.
    baselines: FreshnessBaselines,
    /// Map key this handle writes to.
    key: String,
}

impl TableFreshness {
    /// Creates a handle that writes baselines for `key` into `baselines`.
    pub fn new(baselines: FreshnessBaselines, key: String) -> Self {
        Self { baselines, key }
    }

    /// Raises the stored baseline for this handle's key to the current
    /// wall-clock time.
    ///
    /// An existing baseline that is already later than the current time is
    /// left unchanged, so the stored value never moves backwards.
    ///
    /// # Panics
    ///
    /// Panics if the shared mutex has been poisoned.
    pub fn bump(&self) {
        // Sample the clock before taking the lock, so time spent waiting for
        // the lock is not included in the recorded baseline.
        let now = SystemTime::now();
        let mut guard = self.baselines.lock().unwrap();
        guard
            .entry(self.key.clone())
            .and_modify(|stored| *stored = next_freshness_baseline(Some(*stored), now))
            .or_insert_with(|| next_freshness_baseline(None, now));
    }
}
/// Reports whether `operation` is one of the operation names that should carry
/// a freshness floor.
///
/// The comparison is exact and case-sensitive; any name not in the list
/// (including the empty string) yields `false`.
fn is_read_operation(operation: &str) -> bool {
    const READ_OPERATIONS: [&str; 4] = [
        "describe_table",
        "list_table_versions",
        "query_table",
        "list_tables",
    ];
    READ_OPERATIONS.contains(&operation)
}
/// Context provider that attaches a minimum-timestamp floor to read operations.
///
/// The floor combines a per-object baseline, looked up in a shared
/// [`FreshnessBaselines`] map, with an optional read-consistency interval.
#[derive(Debug)]
pub struct ReadFreshnessContextProvider {
    /// Shared baselines, looked up by the operation's object id.
    baselines: FreshnessBaselines,
    /// Optional interval passed to `compute_min_timestamp`; `None` means the
    /// interval contributes no floor.
    read_consistency_interval: Option<Duration>,
}

impl ReadFreshnessContextProvider {
    /// Builds a provider that reads baselines from `baselines` and applies
    /// `read_consistency_interval` to every read operation.
    pub fn new(baselines: FreshnessBaselines, read_consistency_interval: Option<Duration>) -> Self {
        Self {
            read_consistency_interval,
            baselines,
        }
    }
}
impl DynamicContextProvider for ReadFreshnessContextProvider {
    /// Builds the per-request context for `info`.
    ///
    /// For operations accepted by `is_read_operation`, the result contains a
    /// single entry under `MIN_TIMESTAMP_CONTEXT_KEY` holding the freshness
    /// floor as an RFC 3339 UTC timestamp, provided a floor exists. In every
    /// other case the returned map is empty.
    ///
    /// # Panics
    ///
    /// Panics if the shared baselines mutex has been poisoned.
    fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
        let mut context = HashMap::new();

        if is_read_operation(&info.operation) {
            // Copy the baseline out inside a short scope so the lock is
            // released before the clock is read and the timestamp formatted.
            let table_baseline = {
                let guard = self.baselines.lock().unwrap();
                guard.get(&info.object_id).copied()
            };

            let floor = compute_min_timestamp(
                table_baseline,
                self.read_consistency_interval,
                SystemTime::now(),
            );

            if let Some(ts) = floor {
                let stamp = chrono::DateTime::<chrono::Utc>::from(ts).to_rfc3339();
                context.insert(MIN_TIMESTAMP_CONTEXT_KEY.to_string(), stamp);
            }
        }

        context
    }
}
// Unit tests for the freshness-floor helpers and for
// `ReadFreshnessContextProvider::provide_context`.
#[cfg(test)]
mod tests {
use super::*;
// Slack allowed when comparing a timestamp parsed back from the emitted
// RFC 3339 string against a locally sampled `SystemTime`.
const TOLERANCE: Duration = Duration::from_secs(1);
// Extracts the value stored under `MIN_TIMESTAMP_CONTEXT_KEY` and parses it
// back into a `SystemTime`. Panics if the key is missing or the value is not
// valid RFC 3339, so it doubles as an assertion that the entry was emitted.
fn parse_header_ts(headers: &HashMap<String, String>) -> SystemTime {
let value = headers
.get(MIN_TIMESTAMP_CONTEXT_KEY)
.expect("expected min-timestamp context key");
chrono::DateTime::parse_from_rfc3339(value)
.unwrap()
.with_timezone(&chrono::Utc)
.into()
}
// Covers every combination of baseline / interval presence for
// `compute_min_timestamp`, including which side wins when both are present.
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now();
let baseline = now - Duration::from_secs(60);
// Neither source present: no floor.
assert_eq!(compute_min_timestamp(None, None, now), None);
// Baseline only: returned unchanged.
assert_eq!(
compute_min_timestamp(Some(baseline), None, now),
Some(baseline)
);
// Zero interval: the floor is exactly `now`.
assert_eq!(
compute_min_timestamp(None, Some(Duration::ZERO), now),
Some(now)
);
// Non-zero interval only: the floor is `now - interval`.
assert_eq!(
compute_min_timestamp(None, Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both present, interval floor (now - 10s) is later than the baseline (now - 60s).
assert_eq!(
compute_min_timestamp(Some(baseline), Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both present, baseline (now - 5s) is later than the interval floor (now - 60s).
let recent_baseline = now - Duration::from_secs(5);
assert_eq!(
compute_min_timestamp(Some(recent_baseline), Some(Duration::from_secs(60)), now),
Some(recent_baseline)
);
}
// `next_freshness_baseline` must return the later of the previous baseline
// and `now`, and `now` when there is no previous baseline.
#[test]
fn test_next_freshness_baseline_is_monotonic() {
let now = SystemTime::now();
let earlier = now - Duration::from_secs(30);
let later = now + Duration::from_secs(30);
assert_eq!(next_freshness_baseline(None, now), now);
assert_eq!(next_freshness_baseline(Some(earlier), now), now);
assert_eq!(next_freshness_baseline(Some(later), now), later);
}
// Builds a provider whose baselines map is pre-populated with `entries`
// (key, baseline) and which uses `interval` as its read-consistency interval.
fn provider_with(
entries: &[(&str, SystemTime)],
interval: Option<Duration>,
) -> ReadFreshnessContextProvider {
let map: HashMap<String, SystemTime> =
entries.iter().map(|(k, v)| (k.to_string(), *v)).collect();
ReadFreshnessContextProvider::new(Arc::new(Mutex::new(map)), interval)
}
// With no interval configured, each per-table read operation must emit the
// stored baseline for that table (compared within `TOLERANCE`).
#[test]
fn test_provider_emits_header_at_or_after_bumped_baseline() {
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$tbl", baseline)], None);
for op in ["describe_table", "list_table_versions", "query_table"] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
let sent = parse_header_ts(&ctx);
assert!(
sent >= baseline - TOLERANCE && sent <= baseline + TOLERANCE,
"operation {op} should carry a floor at the bumped baseline"
);
}
}
// `list_tables` is issued against the object id "ns", which has no entry in
// the baselines map, so only the interval can contribute a floor.
#[test]
fn test_provider_list_tables_uses_interval_floor_not_table_baseline() {
// No interval: the table's baseline under "ns$tbl" must not leak into "ns".
let provider = provider_with(&[("ns$tbl", SystemTime::now())], None);
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
assert!(
ctx.is_empty(),
"list_tables must not inherit a per-table baseline"
);
// With an interval: the floor is roughly `now - interval`, bracketed by
// clock samples taken just before and after the call.
let interval = Duration::from_secs(30);
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"list_tables should carry the interval floor"
);
}
// No baseline and no interval: nothing is emitted even for a read operation.
#[test]
fn test_provider_no_header_for_empty_baseline_and_no_interval() {
let provider = provider_with(&[], None);
let ctx = provider.provide_context(&OperationInfo::new("describe_table", "ns$tbl"));
assert!(ctx.is_empty());
}
// An interval alone is enough to produce a floor of roughly `now - interval`.
#[test]
fn test_provider_interval_floor_applies_without_baseline() {
let interval = Duration::from_secs(30);
let provider = provider_with(&[], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("query_table", "ns$tbl"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"expected floor at roughly now - interval"
);
}
// Operations outside the read list must produce an empty context even when
// both a baseline and a zero interval would otherwise yield a floor.
#[test]
fn test_provider_non_read_ops_emit_nothing() {
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(Duration::ZERO));
for op in [
"create_table",
"register_table",
"drop_table",
"rename_table",
"describe_table_version",
] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
assert!(
ctx.is_empty(),
"operation {op} must not send a freshness header"
);
}
}
// Baselines are keyed by object id: an id with an entry gets a floor, while a
// different id on the same provider gets none.
#[test]
fn test_provider_uses_per_table_baseline() {
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$has_baseline", baseline)], None);
let hit =
provider.provide_context(&OperationInfo::new("describe_table", "ns$has_baseline"));
assert!(!hit.is_empty());
let miss = provider.provide_context(&OperationInfo::new("describe_table", "ns$other"));
assert!(miss.is_empty());
}
}