use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::TimeseriesOp;
use crate::engine::timeseries::retention_policy::RetentionPolicyDef;
use crate::types::{DatabaseId, TenantId, VShardId};
use super::physical::{PhysicalTask, PostSetOp};
/// Tenant and database identifiers copied onto every task built by
/// `build_scan_task`. The database id also feeds the vShard routing
/// computation there.
#[derive(Clone, Copy, Debug)]
pub(super) struct ScopeIds {
    /// Tenant that owns the scanned collections.
    pub(super) tenant_id: TenantId,
    /// Database containing the scanned collections.
    pub(super) database_id: DatabaseId,
}
/// One slice of a query's time range together with the retention tier that
/// serves it, as produced by `compute_tier_segments`.
#[derive(Debug)]
struct TierSegment {
    /// `tier_index` of the `TierDef` this slice was assigned to.
    tier_index: u32,
    /// Collection to scan: the policy's base collection for tier 0,
    /// otherwise the policy's aggregate collection for this tier.
    collection: String,
    /// `(start, end)` sub-range of the query served by this tier. Adjacent
    /// segments share their boundary value. NOTE(review): whether `end` is
    /// inclusive is decided by the scan operator and is not visible here.
    time_range: (i64, i64),
    /// The tier's `resolution_ms`, forwarded as the scan's bucket interval.
    bucket_interval_ms: i64,
}
/// Query-level options shared by every per-tier scan. `build_scan_task`
/// clones each field into the `TimeseriesOp::Scan` it builds.
struct AggQueryParams {
    /// Encoded filter payload, passed through untouched (format not visible here).
    filters: Vec<u8>,
    /// Column names to group by.
    group_by: Vec<String>,
    /// Aggregate specs as string pairs, presumably `(function, column)` such as
    /// `("avg", "temperature")` in the tests — confirm against the scan operator.
    aggregates: Vec<(String, String)>,
    /// Gap-fill mode string, passed through untouched.
    gap_fill: String,
}
/// Builds one timeseries scan task per retention tier touched by
/// `query_time_range`.
///
/// The range is split by `compute_tier_segments` relative to the current
/// wall-clock time. Each segment becomes a `TimeseriesOp::Scan` over that
/// tier's collection, using the tier's bucket interval and the shared
/// filter/group/aggregate/gap-fill options.
///
/// If no tier yields a segment, the result is a single scan of
/// `policy.collection` over the whole requested range with a bucket
/// interval of 0.
pub(super) fn plan_tiered_scan(
    policy: &RetentionPolicyDef,
    scope: ScopeIds,
    query_time_range: (i64, i64),
    filters: Vec<u8>,
    group_by: Vec<String>,
    aggregates: Vec<(String, String)>,
    gap_fill: String,
) -> Vec<PhysicalTask> {
    let shared = AggQueryParams {
        filters,
        group_by,
        aggregates,
        gap_fill,
    };

    // A clock set before the epoch collapses to 0 rather than failing.
    let current_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as i64)
        .unwrap_or(0);

    let tier_slices = compute_tier_segments(policy, query_time_range, current_ms);

    let mut planned = Vec::with_capacity(tier_slices.len().max(1));
    for slice in &tier_slices {
        planned.push(build_scan_task(
            scope,
            &slice.collection,
            slice.time_range,
            slice.bucket_interval_ms,
            &shared,
        ));
    }

    // Nothing matched any tier: fall back to a plain scan of the raw collection.
    if planned.is_empty() {
        planned.push(build_scan_task(
            scope,
            &policy.collection,
            query_time_range,
            0,
            &shared,
        ));
    }

    planned
}
/// Splits `query_range` into per-tier segments, ordered oldest first.
///
/// `policy.tiers` is walked in declaration order, which this function assumes
/// runs from the newest data (tier 0) to the oldest rollup.
///
/// - **Newest tier.** Its retention window is measured back from `now_ms`. It
///   has no upper edge, so parts of the query that reach past `now_ms` (clock
///   skew, future-dated points) are still served by it instead of being
///   silently dropped.
/// - **Older tiers.** Each one ends where the previous (newer) tier's window
///   begins and reaches `retain_ms` further back.
/// - **Keep-forever tiers.** A tier with `retain_ms == 0` reaches back
///   indefinitely, and the walk stops there.
///
/// Tiers whose window does not intersect the query produce no segment, so the
/// result may be empty.
fn compute_tier_segments(
    policy: &RetentionPolicyDef,
    query_range: (i64, i64),
    now_ms: i64,
) -> Vec<TierSegment> {
    let (query_start, query_end) = query_range;
    let mut segments = Vec::with_capacity(policy.tiers.len());

    // Upper edge of the window owned by the current tier. The newest tier is
    // bounded only by the query itself.
    let mut upper = i64::MAX;
    // Point from which the current tier's retention is measured backwards.
    let mut boundary = now_ms;

    for tier in &policy.tiers {
        // `retain_ms == 0` means "keep forever": no older tier can own any part
        // of the range once this one is reached.
        let keeps_forever = tier.retain_ms == 0;
        let tier_start = if keeps_forever {
            i64::MIN
        } else {
            // Convert with a range check and subtract with saturation. A huge
            // retention must neither flip sign through an `as` cast nor overflow
            // the subtraction (a panic in debug builds).
            let retain: i64 =
                std::convert::TryFrom::try_from(tier.retain_ms).unwrap_or(i64::MAX);
            boundary.saturating_sub(retain)
        };

        let seg_start = query_start.max(tier_start);
        let seg_end = query_end.min(upper);
        if seg_start < seg_end {
            segments.push(TierSegment {
                tier_index: tier.tier_index,
                collection: collection_for_tier(policy, tier.tier_index),
                time_range: (seg_start, seg_end),
                bucket_interval_ms: tier.resolution_ms as i64,
            });
        }

        if keeps_forever {
            break;
        }
        upper = tier_start;
        boundary = tier_start;
    }

    // Segments were collected newest-first; callers expect chronological order.
    segments.reverse();
    segments
}
/// Returns the collection backing tier `tier_index` of `policy`.
///
/// Tier 0 maps to the policy's own collection. Every other tier maps to the
/// name returned by `RetentionPolicyDef::aggregate_name`.
fn collection_for_tier(policy: &RetentionPolicyDef, tier_index: u32) -> String {
    match tier_index {
        0 => policy.collection.clone(),
        rollup => policy.aggregate_name(rollup),
    }
}
/// Renders a human-readable description of how `query_time_range` would be
/// split across the tiers of `policy`, measured against the current
/// wall-clock time.
///
/// The first line is a header naming the collection, the tier count and the
/// segment count. It is followed by one line per segment showing its time
/// range, tier label (`RAW` for tier 0, otherwise the tier number and bucket
/// size) and collection. If no tier covers the range, a single explanatory
/// line is returned instead.
pub(crate) fn explain_tier_selection(
    policy: &RetentionPolicyDef,
    query_time_range: (i64, i64),
) -> String {
    let current_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as i64)
        .unwrap_or(0);
    let tier_slices = compute_tier_segments(policy, query_time_range, current_ms);

    if tier_slices.is_empty() {
        return format!(
            "AUTO_TIER on '{}': no tier covers the requested range",
            policy.collection
        );
    }

    let header = format!(
        "AUTO_TIER plan for '{}' ({} tiers, {} segments):",
        policy.collection,
        policy.tiers.len(),
        tier_slices.len()
    );

    let detail_lines = tier_slices.iter().map(|slice| {
        let (from_ms, to_ms) = slice.time_range;
        let label = match slice.tier_index {
            0 => String::from("RAW"),
            idx => format!("tier{} ({}ms buckets)", idx, slice.bucket_interval_ms),
        };
        format!(" [{} .. {}] → {} ({})", from_ms, to_ms, label, slice.collection)
    });

    std::iter::once(header)
        .chain(detail_lines)
        .collect::<Vec<String>>()
        .join("\n")
}
/// Builds a single timeseries scan task over `collection` for `time_range`.
///
/// The task is stamped with the tenant and database from `scope` and routed to
/// the vShard derived from the database and collection name. The shared
/// options in `agg` are cloned into the scan. Projection, computed columns and
/// RLS filters are empty, the limit is `usize::MAX`, no time-travel bounds are
/// set, and no post set-operation is applied.
fn build_scan_task(
    scope: ScopeIds,
    collection: &str,
    time_range: (i64, i64),
    bucket_interval_ms: i64,
    agg: &AggQueryParams,
) -> PhysicalTask {
    let scan = TimeseriesOp::Scan {
        collection: collection.to_owned(),
        time_range,
        projection: Vec::new(),
        limit: usize::MAX,
        filters: agg.filters.clone(),
        bucket_interval_ms,
        group_by: agg.group_by.clone(),
        aggregates: agg.aggregates.clone(),
        gap_fill: agg.gap_fill.clone(),
        computed_columns: Vec::new(),
        rls_filters: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
    };

    PhysicalTask {
        tenant_id: scope.tenant_id,
        vshard_id: VShardId::from_collection_in_database(scope.database_id, collection),
        database_id: scope.database_id,
        plan: PhysicalPlan::Timeseries(scan),
        post_set_op: PostSetOp::None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::timeseries::retention_policy::types::TierDef;

    /// Milliseconds in one day, used to express query windows readably.
    const DAY_MS: i64 = 86_400_000;

    /// Three-tier policy:
    /// - raw data kept for 7 days;
    /// - 1-minute rollups kept for 90 days;
    /// - 1-hour rollups kept for 730 days.
    fn make_policy() -> RetentionPolicyDef {
        RetentionPolicyDef {
            tenant_id: 1,
            name: "sensor_policy".into(),
            collection: "sensor_data".into(),
            tiers: vec![
                TierDef {
                    tier_index: 0,
                    resolution_ms: 0,
                    aggregates: Vec::new(),
                    retain_ms: 604_800_000, // 7 days
                    archive: None,
                },
                TierDef {
                    tier_index: 1,
                    resolution_ms: 60_000, // 1 minute
                    aggregates: Vec::new(),
                    retain_ms: 7_776_000_000, // 90 days
                    archive: None,
                },
                TierDef {
                    tier_index: 2,
                    resolution_ms: 3_600_000, // 1 hour
                    aggregates: Vec::new(),
                    retain_ms: 63_072_000_000, // 730 days
                    archive: None,
                },
            ],
            auto_tier: true,
            enabled: true,
            eval_interval_ms: 3_600_000,
            owner: "admin".into(),
            created_at: 0,
        }
    }

    #[test]
    fn segments_cover_full_range() {
        let policy = make_policy();
        let now = 100_000_000_000i64;
        let query_start = now - 30 * DAY_MS;
        let query_end = now;
        let segments = compute_tier_segments(&policy, (query_start, query_end), now);
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].tier_index, 1);
        assert_eq!(segments[0].collection, "_policy_sensor_policy_tier1");
        assert_eq!(segments[0].bucket_interval_ms, 60_000);
        assert_eq!(segments[1].tier_index, 0);
        assert_eq!(segments[1].collection, "sensor_data");
        assert_eq!(segments[1].bucket_interval_ms, 0);
        // The two segments must tile the query exactly, meeting where raw
        // retention (7 days) ends.
        let raw_start = now - 7 * DAY_MS;
        assert_eq!(segments[0].time_range, (query_start, raw_start));
        assert_eq!(segments[1].time_range, (raw_start, query_end));
    }

    #[test]
    fn recent_query_uses_raw_only() {
        let policy = make_policy();
        let now = 100_000_000_000i64;
        let query_start = now - 3_600_000; // one hour ago
        let query_end = now;
        let segments = compute_tier_segments(&policy, (query_start, query_end), now);
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].tier_index, 0);
        assert_eq!(segments[0].collection, "sensor_data");
        assert_eq!(segments[0].time_range, (query_start, query_end));
    }

    #[test]
    fn old_query_uses_highest_tier() {
        let policy = make_policy();
        let now = 100_000_000_000i64;
        let query_start = now - 365 * DAY_MS;
        let query_end = now - 180 * DAY_MS;
        let segments = compute_tier_segments(&policy, (query_start, query_end), now);
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].tier_index, 2);
        assert_eq!(segments[0].collection, "_policy_sensor_policy_tier2");
        assert_eq!(segments[0].time_range, (query_start, query_end));
    }

    #[test]
    fn query_older_than_all_tiers_yields_no_segments() {
        let policy = make_policy();
        let now = 100_000_000_000i64;
        // All tiers together reach back 7 + 90 + 730 = 827 days.
        let query_start = now - 10_000 * DAY_MS;
        let query_end = now - 9_000 * DAY_MS;
        let segments = compute_tier_segments(&policy, (query_start, query_end), now);
        assert!(segments.is_empty());
    }

    #[test]
    fn forever_tier_covers_arbitrarily_old_data() {
        let mut policy = make_policy();
        policy.tiers[2].retain_ms = 0; // keep the last tier forever
        let now = 100_000_000_000i64;
        let query_start = now - 10_000 * DAY_MS;
        let query_end = now - 9_000 * DAY_MS;
        let segments = compute_tier_segments(&policy, (query_start, query_end), now);
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].tier_index, 2);
        assert_eq!(segments[0].time_range, (query_start, query_end));
    }

    #[test]
    fn plan_produces_tasks_per_segment() {
        let policy = make_policy();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;
        let tasks = plan_tiered_scan(
            &policy,
            ScopeIds {
                tenant_id: TenantId::new(1),
                database_id: crate::types::DatabaseId::DEFAULT,
            },
            (now - 30 * DAY_MS, now),
            Vec::new(),
            Vec::new(),
            vec![("avg".into(), "temperature".into())],
            String::new(),
        );
        // A 30-day window spans exactly the raw tier and the 1-minute rollup.
        assert_eq!(tasks.len(), 2);
    }
}