use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{info, warn};
use ailake_catalog::{
new_snapshot_id, CatalogProvider, NewSnapshot, SnapshotOperation, TableIdent,
};
use ailake_core::{AilakeError, AilakeResult, VectorStoragePolicy};
use ailake_file::{AilakeFileReader, AilakeFileWriter};
use ailake_store::Store;
use ailake_vec::compute_centroid_and_radius;
use arrow_array::{
Array, Float32Array, RecordBatch, TimestampMicrosecondArray, TimestampNanosecondArray,
};
use arrow_schema::{DataType, Field};
const LAST_ACCESSED_COL: &str = "last_accessed_at";
const RECENCY_WEIGHT_COL: &str = "recency_weight";
/// Job that refreshes the `recency_weight` column of a table's data files from
/// their `last_accessed_at` column, using exponential decay
/// `exp(-decay_lambda * days_since_last_access)`.
pub struct MemoryDecayJob {
    catalog: Arc<dyn CatalogProvider>,
    store: Arc<dyn Store>,
    /// Selects the embedding column / dimension / metric used to read and
    /// rewrite files.
    policy: VectorStoragePolicy,
    /// Per-day decay rate λ in `exp(-λ * days_old)`.
    pub decay_lambda: f32,
}

impl MemoryDecayJob {
    /// Creates a job bound to `catalog` (snapshot metadata) and `store`
    /// (file bytes). `policy` determines how embeddings are read and written.
    pub fn new(
        catalog: Arc<dyn CatalogProvider>,
        store: Arc<dyn Store>,
        policy: VectorStoragePolicy,
        decay_lambda: f32,
    ) -> Self {
        Self {
            catalog,
            store,
            policy,
            decay_lambda,
        }
    }

    /// Recomputes `recency_weight` for every data file of `table` that has a
    /// `last_accessed_at` column.
    ///
    /// All files are carried into one `Overwrite` snapshot: rewritten files
    /// get fresh catalog entries, while untouched files are kept as-is. A file
    /// is left untouched when it is not an Ailake file, fails to parse, or
    /// lacks `last_accessed_at`. If no file was rewritten, nothing is
    /// committed and `Ok(0)` is returned.
    ///
    /// Returns the number of files rewritten.
    ///
    /// # Errors
    /// Propagates catalog, store, file-writer and decay errors.
    ///
    /// Files are rewritten in place (`store.put` at the same path) *before*
    /// the snapshot is committed. An error part-way through therefore leaves
    /// earlier files already rewritten.
    pub async fn run(&self, table: &TableIdent) -> AilakeResult<usize> {
        let files = self.catalog.list_files(table, None).await?;
        if files.is_empty() {
            return Ok(0);
        }
        // Compute "today" once so every file in this run decays against the
        // same reference day.
        let today_day = current_day_since_epoch();
        let mut new_entries = Vec::with_capacity(files.len());
        let mut updated = 0usize;
        for file_entry in &files {
            let file_bytes = self.store.get(&file_entry.path).await?;
            let reader =
                AilakeFileReader::new(file_bytes, &self.policy.column_name, self.policy.dim);
            if !reader.is_ailake_file() {
                new_entries.push(file_entry.clone());
                continue;
            }
            let (batch, embeddings) = match reader.read_parquet() {
                Ok(pair) => pair,
                Err(e) => {
                    // Best-effort: an unreadable file is kept unchanged rather
                    // than failing the whole job.
                    warn!(
                        "ailake: MemoryDecayJob skipping {} — read error: {}",
                        file_entry.path, e
                    );
                    new_entries.push(file_entry.clone());
                    continue;
                }
            };
            if batch.column_by_name(LAST_ACCESSED_COL).is_none() {
                new_entries.push(file_entry.clone());
                continue;
            }
            let updated_batch = apply_decay(&batch, today_day, self.decay_lambda)?;
            let file_writer = AilakeFileWriter::new(self.policy.clone());
            let new_bytes = file_writer.write(&updated_batch, &embeddings)?;
            let new_size = new_bytes.len() as u64;
            self.store.put(&file_entry.path, new_bytes.clone()).await?;
            let centroid = compute_centroid_and_radius(&embeddings, self.policy.metric);
            // Re-read the freshly written bytes to locate the HNSW section;
            // the header offset is relative to the AILK section start.
            let new_reader =
                AilakeFileReader::new(new_bytes, &self.policy.column_name, self.policy.dim);
            let header = new_reader.read_header()?;
            let ailk_start = new_reader.ailk_offset()?;
            let new_entry = ailake_catalog::make_data_file_entry(
                &file_entry.path,
                updated_batch.num_rows() as u64,
                new_size,
                &centroid,
                ailake_catalog::VectorIndexInfo {
                    column: &self.policy.column_name,
                    dim: self.policy.dim,
                    hnsw_offset: ailk_start + header.hnsw_offset,
                    hnsw_len: header.hnsw_len,
                },
            );
            new_entries.push(new_entry);
            updated += 1;
        }
        if updated == 0 {
            info!(
                "ailake: MemoryDecayJob — no files with last_accessed_at column; skipping commit"
            );
            return Ok(0);
        }
        let snap = NewSnapshot {
            snapshot_id: new_snapshot_id(),
            parent_snapshot_id: None,
            files: new_entries,
            operation: SnapshotOperation::Overwrite,
            iceberg_schema: None,
            extra_properties: std::collections::HashMap::new(),
            bloom_filters: vec![],
            equality_delete_files: vec![],
        };
        self.catalog.commit_snapshot(table, snap).await?;
        info!(
            "ailake: MemoryDecayJob — updated recency_weight in {} files (lambda={})",
            updated, self.decay_lambda
        );
        Ok(updated)
    }
}
/// Computes, for every row of `col`, how many whole days before `today_day`
/// (days since the Unix epoch) the value lies.
///
/// Ages are clamped at 0, so future values count as "accessed today".
///
/// Supported column types:
/// * `Timestamp(Second | Millisecond | Microsecond | Nanosecond)`: the raw
///   epoch value is floored to a day number. Flooring (not truncation) puts
///   pre-1970 instants on the correct day.
/// * `Utf8` / `LargeUtf8`: parsed with `parse_iso_date_days`. Strings that
///   fail to parse are treated as accessed today (0 days old).
///
/// Null entries are treated as 0 days old.
///
/// # Errors
/// `AilakeError::Catalog` for any other column type.
fn days_old_vec(col: &Arc<dyn Array>, today_day: i64) -> AilakeResult<Vec<f32>> {
    const SECS_PER_DAY: i64 = 86_400;

    /// Converts epoch ticks (`ticks_per_day` ticks per day) into ages in days.
    fn tick_ages(
        ticks: impl Iterator<Item = Option<i64>>,
        ticks_per_day: i64,
        today_day: i64,
    ) -> Vec<f32> {
        ticks
            .map(|t| match t {
                // `div_euclid` floors toward -inf: 1969-12-31T12:00 is day -1, not 0.
                Some(t) => (today_day - t.div_euclid(ticks_per_day)).max(0) as f32,
                None => 0.0,
            })
            .collect()
    }

    /// Converts ISO-8601 date strings into ages in days.
    fn str_ages<'a>(values: impl Iterator<Item = Option<&'a str>>, today_day: i64) -> Vec<f32> {
        values
            .map(|v| match v {
                Some(s) => {
                    let access_day = parse_iso_date_days(s).unwrap_or(today_day);
                    (today_day - access_day).max(0) as f32
                }
                None => 0.0,
            })
            .collect()
    }

    let any = col.as_any();
    if let Some(ts) = any.downcast_ref::<TimestampNanosecondArray>() {
        return Ok(tick_ages(ts.iter(), SECS_PER_DAY * 1_000_000_000, today_day));
    }
    if let Some(ts) = any.downcast_ref::<TimestampMicrosecondArray>() {
        return Ok(tick_ages(ts.iter(), SECS_PER_DAY * 1_000_000, today_day));
    }
    if let Some(ts) = any.downcast_ref::<arrow_array::TimestampMillisecondArray>() {
        return Ok(tick_ages(ts.iter(), SECS_PER_DAY * 1_000, today_day));
    }
    if let Some(ts) = any.downcast_ref::<arrow_array::TimestampSecondArray>() {
        return Ok(tick_ages(ts.iter(), SECS_PER_DAY, today_day));
    }
    if let Some(sa) = any.downcast_ref::<arrow_array::StringArray>() {
        return Ok(str_ages(sa.iter(), today_day));
    }
    if let Some(sa) = any.downcast_ref::<arrow_array::LargeStringArray>() {
        return Ok(str_ages(sa.iter(), today_day));
    }
    Err(AilakeError::Catalog(
        "last_accessed_at must be Timestamp(Second/Millisecond/Microsecond/Nanosecond), Utf8 or LargeUtf8"
            .into(),
    ))
}
/// Returns a copy of `batch` whose `recency_weight` column holds
/// `exp(-lambda * days_old)` for each row.
///
/// `days_old` is derived from `last_accessed_at` relative to `today_day` by
/// `days_old_vec`.
///
/// If `recency_weight` already exists it is replaced at the same position,
/// keeping that field's metadata; otherwise it is appended as a non-nullable
/// `Float32` column. Schema-level metadata is carried over to the result.
///
/// # Errors
/// * `AilakeError::Catalog` if `last_accessed_at` is missing or has an
///   unsupported type.
/// * `AilakeError::Arrow` if the resulting batch cannot be assembled.
fn apply_decay(batch: &RecordBatch, today_day: i64, lambda: f32) -> AilakeResult<RecordBatch> {
    let col = batch
        .column_by_name(LAST_ACCESSED_COL)
        .ok_or_else(|| AilakeError::Catalog("last_accessed_at column not found".into()))?;
    let new_weights: Vec<f32> = days_old_vec(col, today_day)?
        .into_iter()
        .map(|d| (-lambda * d).exp())
        .collect();
    let new_weight_array: Arc<dyn Array> = Arc::new(Float32Array::from(new_weights));

    let old_schema = batch.schema();
    let decay_field = Field::new(RECENCY_WEIGHT_COL, DataType::Float32, false);
    let mut new_fields: Vec<arrow_schema::FieldRef> = old_schema.fields().iter().cloned().collect();
    let mut new_columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();

    match old_schema
        .fields()
        .iter()
        .position(|f| f.name() == RECENCY_WEIGHT_COL)
    {
        Some(pos) => {
            // Keep any field-level metadata attached to the existing column.
            let field_meta = old_schema.field(pos).metadata().clone();
            new_fields[pos] = Arc::new(decay_field.with_metadata(field_meta));
            new_columns[pos] = new_weight_array;
        }
        None => {
            new_fields.push(Arc::new(decay_field));
            new_columns.push(new_weight_array);
        }
    }

    // `Schema::new` would silently drop schema-level metadata; carry it over.
    let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata(
        new_fields,
        old_schema.metadata().clone(),
    ));
    RecordBatch::try_new(new_schema, new_columns).map_err(|e| AilakeError::Arrow(e.to_string()))
}
/// Parses the leading `YYYY-MM-DD` part of an ISO-8601 date or datetime string
/// into days since the Unix epoch (1970-01-01 is day 0).
///
/// Only the first ten bytes are inspected. Any trailing time component (e.g.
/// `T12:34:56Z`) is ignored, and no time-zone offset is applied.
///
/// Returns `None` when:
/// * the string is shorter than ten bytes;
/// * a year/month/day field is not plain ASCII digits;
/// * the month or day is outside the proleptic Gregorian calendar
///   (leap years honoured).
///
/// Never panics, including on non-ASCII input.
fn parse_iso_date_days(s: &str) -> Option<i64> {
    /// Extracts `range` as an unsigned decimal field. Uses `str::get` so a
    /// range that splits a multi-byte character yields `None` instead of
    /// panicking.
    fn digits(s: &str, range: std::ops::Range<usize>) -> Option<i64> {
        let part = s.get(range)?;
        if !part.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        part.parse().ok()
    }

    if s.len() < 10 {
        return None;
    }
    let y = digits(s, 0..4)?;
    let m = digits(s, 5..7)?;
    let d = digits(s, 8..10)?;

    let is_leap = (y % 4 == 0 && y % 100 != 0) || y % 400 == 0;
    let days_in_month = match m {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if is_leap => 29,
        2 => 28,
        _ => return None,
    };
    if !(1..=days_in_month).contains(&d) {
        return None;
    }

    // Gregorian date -> Julian Day Number, then shift so that 1970-01-01
    // (JDN 2_440_588) maps to 0.
    let a = (14 - m) / 12;
    let y2 = y + 4800 - a;
    let m2 = m + 12 * a - 3;
    let jdn = d + (153 * m2 + 2) / 5 + 365 * y2 + y2 / 4 - y2 / 100 + y2 / 400 - 32045;
    Some(jdn - 2_440_588)
}
/// Returns the current UTC date as whole days since the Unix epoch, with any
/// partial day truncated.
///
/// Yields `0` if the system clock reports a time before 1970-01-01.
fn current_day_since_epoch() -> i64 {
    const SECONDS_PER_DAY: i64 = 86400;
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64 / SECONDS_PER_DAY,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads `recency_weight[row]` from a batch returned by `apply_decay`.
    fn weight_at(batch: &RecordBatch, row: usize) -> f32 {
        batch
            .column_by_name(RECENCY_WEIGHT_COL)
            .expect("recency_weight column present")
            .as_any()
            .downcast_ref::<Float32Array>()
            .expect("recency_weight is Float32")
            .value(row)
    }

    #[test]
    fn parse_iso_date_unix_epoch() {
        assert_eq!(parse_iso_date_days("1970-01-01T00:00:00"), Some(0));
    }

    #[test]
    fn parse_iso_date_known_date() {
        let days = parse_iso_date_days("2024-01-15").unwrap();
        assert_eq!(days, 19737);
    }

    #[test]
    fn parse_iso_date_returns_none_on_short_string() {
        assert!(parse_iso_date_days("2024").is_none());
        assert!(parse_iso_date_days("").is_none());
    }

    #[test]
    fn apply_decay_updates_recency_weight() {
        use arrow_array::StringArray;
        use arrow_schema::{Field, Schema};
        // 2024-01-05 is 10 days before 2024-01-15 (day 19737).
        let past_str = "2024-01-05T00:00:00";
        let schema = Arc::new(Schema::new(vec![
            Field::new(LAST_ACCESSED_COL, DataType::Utf8, true),
            Field::new(RECENCY_WEIGHT_COL, DataType::Float32, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![past_str])),
                Arc::new(Float32Array::from(vec![1.0f32])),
            ],
        )
        .unwrap();
        let today_day = 19737i64;
        let result = apply_decay(&batch, today_day, 0.1).unwrap();
        let w = weight_at(&result, 0);
        let expected = (-0.1f32 * 10.0).exp();
        assert!((w - expected).abs() < 0.001, "expected {expected}, got {w}");
    }

    #[test]
    fn apply_decay_handles_timestamp_nanosecond() {
        use arrow_schema::{Field, Schema, TimeUnit};
        let day_19727_ns: i64 = 19727i64 * 86_400 * 1_000_000_000;
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                LAST_ACCESSED_COL,
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new(RECENCY_WEIGHT_COL, DataType::Float32, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampNanosecondArray::from(vec![day_19727_ns]).with_timezone("UTC")),
                Arc::new(Float32Array::from(vec![1.0f32])),
            ],
        )
        .unwrap();
        let today_day = 19737i64;
        let result = apply_decay(&batch, today_day, 0.1).unwrap();
        let w = weight_at(&result, 0);
        let expected = (-0.1f32 * 10.0).exp();
        assert!((w - expected).abs() < 0.001, "expected {expected}, got {w}");
    }

    #[test]
    fn apply_decay_handles_timestamp_microsecond_and_appends_column() {
        use arrow_schema::{Field, Schema, TimeUnit};
        let day_19727_us: i64 = 19727i64 * 86_400 * 1_000_000;
        let schema = Arc::new(Schema::new(vec![Field::new(
            LAST_ACCESSED_COL,
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let columns: Vec<Arc<dyn Array>> =
            vec![Arc::new(TimestampMicrosecondArray::from(vec![day_19727_us]))];
        let batch = RecordBatch::try_new(schema, columns).unwrap();
        let result = apply_decay(&batch, 19737, 0.1).unwrap();
        // No pre-existing recency_weight column: it must be appended.
        assert_eq!(result.num_columns(), 2);
        assert_eq!(result.schema().field(1).name(), RECENCY_WEIGHT_COL);
        let w = weight_at(&result, 0);
        let expected = (-0.1f32 * 10.0).exp();
        assert!((w - expected).abs() < 0.001, "expected {expected}, got {w}");
    }

    #[test]
    fn apply_decay_treats_null_access_as_fresh() {
        use arrow_array::StringArray;
        use arrow_schema::{Field, Schema};
        let schema = Arc::new(Schema::new(vec![Field::new(
            LAST_ACCESSED_COL,
            DataType::Utf8,
            true,
        )]));
        let columns: Vec<Arc<dyn Array>> = vec![Arc::new(StringArray::from(vec![None::<&str>]))];
        let batch = RecordBatch::try_new(schema, columns).unwrap();
        let result = apply_decay(&batch, 19737, 0.1).unwrap();
        let w = weight_at(&result, 0);
        assert!((w - 1.0).abs() < 1e-6, "expected 1.0, got {w}");
    }

    #[test]
    fn now_ns_is_recent() {
        // ~55 years of 365-day years past the epoch: a loose late-2024 floor.
        let floor_2025_ns: i64 = 55 * 365 * 86_400 * 1_000_000_000i64;
        let t = ailake_core::now_ns();
        assert!(
            t > floor_2025_ns,
            "now_ns() returned suspiciously small value: {t}"
        );
    }
}