use chrono::{Datelike, Months, NaiveDate};
use crate::config::{PartitionGranularity, SourceType};
/// Placeholder partition value `__HIVE_DEFAULT_PARTITION__`.
///
/// NOTE(review): presumably the Hive-style partition label used for rows whose
/// partition column is NULL (the counterpart of the `IS NULL` queries built in
/// this module); it is not referenced in this part of the file — confirm
/// against callers.
pub(crate) const HIVE_NULL_PARTITION: &str = "__HIVE_DEFAULT_PARTITION__";
/// One partition: a label plus the half-open date interval `[lo, hi)` it covers.
///
/// `build_range_query` turns the bounds into `col >= lo AND col < hi`, so
/// consecutive ranges that share a boundary (`prev.hi == next.lo`) neither
/// overlap nor leave gaps.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct PartitionRange {
/// Text label for the partition: `lo` formatted as `%Y-%m-%d`, `%Y-%m`, or
/// `%Y` depending on the granularity the range was generated with.
pub(crate) label_value: String,
/// Inclusive lower bound of the range.
pub(crate) lo: NaiveDate,
/// Exclusive upper bound of the range. The generators fall back to
/// `hi == lo` when the following day/month/year cannot be represented.
pub(crate) hi: NaiveDate,
}
pub(crate) fn generate_ranges(
min_day: NaiveDate,
max_day: NaiveDate,
granularity: PartitionGranularity,
) -> Vec<PartitionRange> {
if max_day < min_day {
return Vec::new();
}
match granularity {
PartitionGranularity::Day => day_ranges(min_day, max_day),
PartitionGranularity::Month => month_ranges(min_day, max_day),
PartitionGranularity::Year => year_ranges(min_day, max_day),
}
}
/// Builds the SQL that selects the rows of `base_query` falling inside `range`.
///
/// `base_query` is embedded verbatim as a derived table aliased `_rivet_part`.
/// `col` is quoted with `crate::sql::quote_ident` for `source_type`, and the
/// filter is half-open: `col >= lo AND col < hi`, with both bounds rendered as
/// single-quoted `%Y-%m-%d` literals.
pub(crate) fn build_range_query(
    base_query: &str,
    col: &str,
    range: &PartitionRange,
    source_type: SourceType,
) -> String {
    let column = crate::sql::quote_ident(source_type, col);
    let lower = range.lo.format("%Y-%m-%d");
    let upper = range.hi.format("%Y-%m-%d");

    format!(
        "SELECT * FROM ({base_query}) AS _rivet_part \
         WHERE {column} >= '{lower}' AND {column} < '{upper}'"
    )
}
/// Builds the SQL that selects the rows of `base_query` whose `col` is NULL.
///
/// `base_query` is embedded verbatim as a derived table aliased `_rivet_part`;
/// `col` is quoted with `crate::sql::quote_ident` for `source_type`.
pub(crate) fn build_null_query(base_query: &str, col: &str, source_type: SourceType) -> String {
    let column = crate::sql::quote_ident(source_type, col);
    format!(
        "SELECT * FROM ({base}) AS _rivet_part WHERE {column} IS NULL",
        base = base_query,
    )
}
/// Builds the SQL that counts the rows of `base_query` whose `col` is NULL.
///
/// `base_query` is embedded verbatim as a derived table aliased `_rivet_nc`;
/// `col` is quoted with `crate::sql::quote_ident` for `source_type`.
pub(crate) fn build_null_count_query(
    base_query: &str,
    col: &str,
    source_type: SourceType,
) -> String {
    let column = crate::sql::quote_ident(source_type, col);
    format!(
        "SELECT count(*) FROM ({base}) AS _rivet_nc WHERE {column} IS NULL",
        base = base_query,
    )
}
/// Produces one range per calendar day from `min_day` through `max_day`, inclusive.
///
/// Each range is labelled with the day formatted as `%Y-%m-%d` and spans
/// `[day, following day)`. If a day has no representable successor, its range
/// gets `hi == lo` and generation stops there. An inverted span yields an
/// empty vector.
fn day_ranges(min_day: NaiveDate, max_day: NaiveDate) -> Vec<PartitionRange> {
    // Walk day by day; the walk ends by itself once `succ_opt` runs out of dates.
    std::iter::successors(Some(min_day), |day| day.succ_opt())
        .take_while(|day| *day <= max_day)
        .map(|day| PartitionRange {
            label_value: day.format("%Y-%m-%d").to_string(),
            lo: day,
            hi: day.succ_opt().unwrap_or(day),
        })
        .collect()
}
fn month_start(d: NaiveDate) -> NaiveDate {
NaiveDate::from_ymd_opt(d.year(), d.month(), 1).expect("month-start is always valid")
}
/// Produces one range per calendar month touched by `min_day..=max_day`.
///
/// Ranges are snapped to month boundaries: each spans
/// `[first of month, first of next month)` and is labelled `%Y-%m`. At least
/// one range (the month of `min_day`) is always emitted. If the next month
/// cannot be represented, that range gets `hi == lo` and generation stops.
fn month_ranges(min_day: NaiveDate, max_day: NaiveDate) -> Vec<PartitionRange> {
    let final_month = month_start(max_day);
    let mut ranges = Vec::new();

    // `cursor` holds the start of the month still to be emitted, if any.
    let mut cursor = Some(month_start(min_day));
    while let Some(lo) = cursor {
        let next_month = lo.checked_add_months(Months::new(1));
        ranges.push(PartitionRange {
            label_value: lo.format("%Y-%m").to_string(),
            lo,
            hi: next_month.unwrap_or(lo),
        });
        // Advance only while we have not yet covered the month of `max_day`.
        cursor = next_month.filter(|_| lo < final_month);
    }

    ranges
}
fn year_start(d: NaiveDate) -> NaiveDate {
NaiveDate::from_ymd_opt(d.year(), 1, 1).expect("year-start is always valid")
}
/// Produces one range per calendar year touched by `min_day..=max_day`.
///
/// Ranges are snapped to year boundaries: each spans
/// `[January 1st, next January 1st)` and is labelled `%Y`. At least one range
/// (the year of `min_day`) is always emitted. If the next year cannot be
/// represented, that range gets `hi == lo` and generation stops.
fn year_ranges(min_day: NaiveDate, max_day: NaiveDate) -> Vec<PartitionRange> {
    let final_year = year_start(max_day);
    // January 1st of the year after `jan1`, or `None` when out of range.
    let following = |jan1: NaiveDate| NaiveDate::from_ymd_opt(jan1.year() + 1, 1, 1);

    std::iter::successors(Some(year_start(min_day)), |&jan1| {
        if jan1 < final_year {
            following(jan1)
        } else {
            None
        }
    })
    .map(|jan1| PartitionRange {
        label_value: jan1.format("%Y").to_string(),
        lo: jan1,
        hi: following(jan1).unwrap_or(jan1),
    })
    .collect()
}
// Unit tests for range generation (day / month / year granularity) and for the
// SQL strings produced by the query builders.
#[cfg(test)]
mod tests {
use super::*;
// Parses a `YYYY-MM-DD` literal; panics on malformed input (test-only helper).
fn d(s: &str) -> NaiveDate {
NaiveDate::parse_from_str(s, "%Y-%m-%d").unwrap()
}
// Collects the labels of `ranges` in order, for compact assertions.
fn labels(ranges: &[PartitionRange]) -> Vec<String> {
ranges.iter().map(|r| r.label_value.clone()).collect()
}
// Day granularity: one range per day, each ending at the following day.
#[test]
fn day_three_day_span() {
let r = generate_ranges(d("2023-01-01"), d("2023-01-03"), PartitionGranularity::Day);
assert_eq!(labels(&r), ["2023-01-01", "2023-01-02", "2023-01-03"]);
assert_eq!(r[0].lo, d("2023-01-01"));
assert_eq!(r[0].hi, d("2023-01-02"));
assert_eq!(r[2].hi, d("2023-01-04"));
}
// min == max still yields exactly one range.
#[test]
fn day_single_day_when_min_equals_max() {
let r = generate_ranges(d("2023-06-15"), d("2023-06-15"), PartitionGranularity::Day);
assert_eq!(labels(&r), ["2023-06-15"]);
assert_eq!(r[0].hi, d("2023-06-16"));
}
#[test]
fn day_crosses_month_boundary() {
let r = generate_ranges(d("2023-01-31"), d("2023-02-01"), PartitionGranularity::Day);
assert_eq!(labels(&r), ["2023-01-31", "2023-02-01"]);
}
// 2024 is a leap year, so February 29th must appear.
#[test]
fn day_crosses_leap_day() {
let r = generate_ranges(d("2024-02-28"), d("2024-03-01"), PartitionGranularity::Day);
assert_eq!(labels(&r), ["2024-02-28", "2024-02-29", "2024-03-01"]);
}
// Month granularity: bounds snap outward to the first of the month.
#[test]
fn month_spans_three_months_snaps_to_month_edges() {
let r = generate_ranges(
d("2023-01-15"),
d("2023-03-05"),
PartitionGranularity::Month,
);
assert_eq!(labels(&r), ["2023-01", "2023-02", "2023-03"]);
assert_eq!(r[0].lo, d("2023-01-01"));
assert_eq!(r[2].hi, d("2023-04-01"));
}
#[test]
fn month_single_when_within_one_month() {
let r = generate_ranges(
d("2023-06-10"),
d("2023-06-20"),
PartitionGranularity::Month,
);
assert_eq!(labels(&r), ["2023-06"]);
assert_eq!(r[0].lo, d("2023-06-01"));
assert_eq!(r[0].hi, d("2023-07-01"));
}
// December -> January must roll the year over.
#[test]
fn month_crosses_year_boundary() {
let r = generate_ranges(
d("2023-12-15"),
d("2024-01-10"),
PartitionGranularity::Month,
);
assert_eq!(labels(&r), ["2023-12", "2024-01"]);
assert_eq!(r[1].hi, d("2024-02-01"));
}
// Year granularity: bounds snap outward to January 1st.
#[test]
fn year_spans_three_years() {
let r = generate_ranges(d("2022-05-01"), d("2024-02-01"), PartitionGranularity::Year);
assert_eq!(labels(&r), ["2022", "2023", "2024"]);
assert_eq!(r[0].lo, d("2022-01-01"));
assert_eq!(r[2].hi, d("2025-01-01"));
}
#[test]
fn year_single() {
let r = generate_ranges(d("2023-03-03"), d("2023-11-11"), PartitionGranularity::Year);
assert_eq!(labels(&r), ["2023"]);
}
// An inverted span produces no ranges at all.
#[test]
fn empty_when_max_before_min() {
let r = generate_ranges(d("2023-02-01"), d("2023-01-01"), PartitionGranularity::Day);
assert!(r.is_empty());
}
// Each range must end exactly where the next one begins.
#[test]
fn ranges_are_contiguous_and_gap_free_day() {
let r = generate_ranges(d("2023-01-01"), d("2023-01-10"), PartitionGranularity::Day);
for w in r.windows(2) {
assert_eq!(w[0].hi, w[1].lo);
}
}
// Builds the single day-granularity range for `day` (test-only helper).
fn one_day(day: &str) -> PartitionRange {
generate_ranges(d(day), d(day), PartitionGranularity::Day)
.into_iter()
.next()
.unwrap()
}
// Postgres: identifier is double-quoted and the filter is `>= lo AND < hi`.
#[test]
fn range_query_pg_is_half_open_and_quotes_ident() {
let q = build_range_query(
"SELECT * FROM events",
"created_at",
&one_day("2023-01-03"),
SourceType::Postgres,
);
assert_eq!(
q,
"SELECT * FROM (SELECT * FROM events) AS _rivet_part \
WHERE \"created_at\" >= '2023-01-03' AND \"created_at\" < '2023-01-04'"
);
}
// MySQL: identifier is quoted with backticks instead.
#[test]
fn range_query_mysql_backtick_quotes() {
let q = build_range_query(
"SELECT * FROM events",
"created_at",
&one_day("2023-01-03"),
SourceType::Mysql,
);
assert!(q.contains("WHERE `created_at` >= '2023-01-03' AND `created_at` < '2023-01-04'"));
}
#[test]
fn null_query_filters_is_null() {
let q = build_null_query("SELECT * FROM events", "created_at", SourceType::Postgres);
assert_eq!(
q,
"SELECT * FROM (SELECT * FROM events) AS _rivet_part WHERE \"created_at\" IS NULL"
);
}
// Only the WHERE clause is pinned here, not the full count statement.
#[test]
fn null_count_query_filters_is_null() {
assert!(
build_null_count_query("SELECT * FROM events", "created_at", SourceType::Postgres)
.contains("WHERE \"created_at\" IS NULL")
);
}
}