use crate::physical::CollectionContract;
use crate::storage::query::unified::UnifiedRecord;
use crate::storage::schema::Value;
/// Removes records that have aged out of the collection's retention window.
///
/// The filter is a no-op (every record is kept) when any of these hold:
/// - `contract` is `None`,
/// - the contract has no `retention_duration_ms`,
/// - [`resolve_timestamp_column`] cannot name a timestamp column.
///
/// Otherwise the cutoff is `now - retention_duration_ms` (saturating at 0) in
/// Unix milliseconds, and a record is kept when its timestamp is at or after
/// the cutoff. Records that lack the timestamp column, or whose value cannot
/// be interpreted by `value_as_unix_ms`, are always kept.
pub(crate) fn apply(records: &mut Vec<UnifiedRecord>, contract: Option<&CollectionContract>) {
    let Some(contract) = contract else {
        return;
    };
    let Some(retention_ms) = contract.retention_duration_ms else {
        return;
    };
    let Some(ts_column) = resolve_timestamp_column(contract) else {
        return;
    };
    let now_ms = current_unix_ms();
    let cutoff = now_ms.saturating_sub(retention_ms as u128);
    records.retain(|record| {
        match record.get(&ts_column) {
            Some(value) => match value_as_unix_ms(value) {
                // A plain `ts as u128` would sign-extend a negative (pre-epoch)
                // timestamp into an enormous value that always passes the
                // cutoff, so convert with a checked conversion instead.
                Some(ts) => match u128::try_from(ts) {
                    Ok(ts) => ts >= cutoff,
                    // Negative timestamp: older than any positive cutoff. When
                    // the cutoff has saturated to 0 the window reaches back to
                    // (or past) the epoch, so nothing is filtered.
                    Err(_) => cutoff == 0,
                },
                // Value is not a recognised timestamp representation: keep.
                None => true,
            },
            // Record has no such column: keep.
            None => true,
        }
    });
}
/// Picks the column whose value is used as a record's timestamp.
///
/// When the contract has `timestamps_enabled` set, the answer is always
/// `"created_at"`. Otherwise the first entry of `declared_columns` whose
/// `data_type` is accepted by `is_temporal_data_type` is used. Returns `None`
/// when timestamps are disabled and no declared column qualifies.
pub(crate) fn resolve_timestamp_column(contract: &CollectionContract) -> Option<String> {
    if !contract.timestamps_enabled {
        // Declaration order decides: the first temporal column wins.
        for declared in contract.declared_columns.iter() {
            if is_temporal_data_type(&declared.data_type) {
                return Some(declared.name.clone());
            }
        }
        return None;
    }
    Some(String::from("created_at"))
}
/// Reports whether `data_type` names one of the recognised temporal column
/// types, compared ASCII-case-insensitively and without trimming.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPE_NAMES: [&str; 5] =
        ["TIMESTAMP", "TIMESTAMPMS", "TIMESTAMP_MS", "DATETIME", "DATE"];
    TEMPORAL_TYPE_NAMES
        .iter()
        .any(|name| data_type.eq_ignore_ascii_case(name))
}
/// Interprets a stored value as Unix milliseconds, if it has a numeric or
/// timestamp representation.
///
/// `TimestampMs`, `BigInt` and `Integer` values are passed through unchanged
/// (i.e. taken to already be milliseconds). `Timestamp` values are multiplied
/// by 1000 with saturation — presumably they are stored in seconds; confirm
/// against the `Value` definition. `UnsignedInteger` values that do not fit in
/// an `i64`, and every other variant, yield `None`.
fn value_as_unix_ms(value: &Value) -> Option<i64> {
    let millis = match value {
        Value::TimestampMs(ms) | Value::BigInt(ms) => *ms,
        Value::Timestamp(secs) => secs.saturating_mul(1_000),
        Value::Integer(n) => *n as i64,
        Value::UnsignedInteger(n) => return i64::try_from(*n).ok(),
        _ => return None,
    };
    Some(millis)
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::{CollectionModel, SchemaMode};
    use crate::physical::{CollectionContract, ContractOrigin, DeclaredColumnContract};
    use crate::storage::query::unified::{sys_key_created_at, UnifiedRecord};
    use std::sync::Arc;

    /// Contract fixture with built-in timestamps enabled and a 60 000 ms
    /// retention window; individual tests override fields as needed.
    fn base_contract() -> CollectionContract {
        CollectionContract {
            name: "events".to_string(),
            declared_model: CollectionModel::Table,
            schema_mode: SchemaMode::SemiStructured,
            origin: ContractOrigin::Explicit,
            version: 1,
            created_at_unix_ms: 0,
            updated_at_unix_ms: 0,
            default_ttl_ms: None,
            vector_dimension: None,
            vector_metric: None,
            context_index_fields: Vec::new(),
            declared_columns: Vec::new(),
            table_def: None,
            timestamps_enabled: true,
            context_index_enabled: false,
            metrics_raw_retention_ms: None,
            metrics_rollup_policies: Vec::new(),
            metrics_tenant_identity: None,
            metrics_namespace: None,
            append_only: false,
            subscriptions: Vec::new(),
            session_key: None,
            session_gap_ms: None,
            retention_duration_ms: Some(60_000),
        }
    }

    /// Declared-column fixture: only the name and data type vary, every other
    /// attribute is left at its "unset" value.
    fn declared_column(name: &str, data_type: &str) -> DeclaredColumnContract {
        DeclaredColumnContract {
            name: name.to_string(),
            data_type: data_type.to_string(),
            sql_type: None,
            not_null: false,
            default: None,
            compress: None,
            unique: false,
            primary_key: false,
            enum_variants: Vec::new(),
            array_element: None,
            decimal_precision: None,
        }
    }

    /// Builds a single-column record whose system created-at key holds `ts_ms`.
    fn record_with_created_at(ts_ms: i64) -> UnifiedRecord {
        let schema = Arc::new(vec![sys_key_created_at()]);
        UnifiedRecord::with_schema(schema, vec![Value::BigInt(ts_ms)])
    }

    #[test]
    fn drops_expired_keeps_fresh() {
        let now = current_unix_ms() as i64;
        // 10 s old is inside the 60 s window; 120 s old is outside it.
        let fresh = record_with_created_at(now - 10_000);
        let expired = record_with_created_at(now - 120_000);
        let mut rows = vec![fresh, expired];

        apply(&mut rows, Some(&base_contract()));

        assert_eq!(rows.len(), 1, "exactly the fresh row should survive");
    }

    #[test]
    fn no_policy_leaves_records_alone() {
        let contract = CollectionContract {
            retention_duration_ms: None,
            ..base_contract()
        };
        // Timestamp 0 would be long expired if any retention were applied.
        let mut rows = vec![record_with_created_at(0), record_with_created_at(0)];

        apply(&mut rows, Some(&contract));

        assert_eq!(rows.len(), 2);
    }

    #[test]
    fn missing_column_does_not_filter() {
        let contract = CollectionContract {
            timestamps_enabled: false,
            declared_columns: vec![declared_column("ts", "TIMESTAMP")],
            ..base_contract()
        };
        // The record's schema does not contain the declared `ts` column.
        let schema = Arc::new(vec![Arc::<str>::from("other_col")]);
        let mut rows = vec![UnifiedRecord::with_schema(schema, vec![Value::text("v")])];

        apply(&mut rows, Some(&contract));

        assert_eq!(rows.len(), 1);
    }

    #[test]
    fn resolves_declared_temporal_column_name() {
        let contract = CollectionContract {
            timestamps_enabled: false,
            declared_columns: vec![declared_column("ts", "TIMESTAMPMS")],
            ..base_contract()
        };

        let resolved = resolve_timestamp_column(&contract);

        assert_eq!(resolved.as_deref(), Some("ts"));
    }
}