use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
use chrono::{Duration, TimeZone};
use chronoutil::RelativeDuration;
use thiserror::Error;
use hamelin_eval::{
eval, next_truncation_boundary, reverse_eval, truncate_timestamp,
value::{TimestampValue, Value},
Constraint, Environment, EvalError,
};
use hamelin_lib::{
err::TranslationError,
func::def::FunctionDef,
tree::{
ast::{
expression::{ExpressionKind, TruncUnit},
identifier::{Identifier, ParsedSimpleIdentifier, SimpleIdentifier},
},
typed_ast::{
clause::Projections,
command::{TypedCommandKind, TypedFromCommand, TypedUnionCommand},
expression::{TypedExpression, TypedExpressionKind},
pattern::TypedPattern,
pipeline::{TypedPipeline, ValidPipeline},
query::TypedStatement,
},
},
types::{Type, CALENDAR_INTERVAL, INTERVAL},
};
/// Inclusive UTC time range used throughout incremental analysis.
type TimeRange = RangeInclusive<chrono::DateTime<chrono::Utc>>;
/// Result of incremental-range analysis: which output rows must be replaced
/// and which input rows must be read to recompute them.
#[derive(Debug, Clone)]
pub struct IncrementalRange {
    /// Time range of output rows that must be replaced in the target.
    pub replace_range: TimeRange,
    /// Time range of input rows that must be scanned to produce `replace_range`.
    pub query_range: TimeRange,
}
/// Outcome of probing a pipeline/query for incremental-refresh strategies.
#[derive(Debug, Clone)]
pub struct StrategyDetectionResult {
    /// Strategies the pipeline/query is eligible for.
    pub supported: Vec<IncrementalStrategyKind>,
    /// Human-readable reasons for each strategy that was rejected.
    pub rejections: Vec<String>,
}
/// Errors raised while analyzing a query for incremental-refresh eligibility.
/// The `#[error]` attributes double as the user-facing description of each case.
#[derive(Debug, Error)]
pub enum IncrementalAnalysisError {
    #[error("Query had compilation errors: {0}")]
    TreeHadError(TranslationError),
    #[error("Query uses non-deterministic function: {0}")]
    NonDeterministicFunction(String),
    // The offending table identifier is carried for programmatic use even
    // though it is not interpolated into the message.
    #[error("Stale range not provided for required table")]
    DependencyNotProvided(Identifier),
    #[error("Query has no start time")]
    NoStart,
    #[error("Query has no end time")]
    NoEnd,
    #[error("Could not compute incremental field value")]
    ExpressionEvaluationError(EvalError),
    #[error("Incremental field value is not a timestamp")]
    ValueNotTimestamp,
    #[error("WINDOW .. WITHIN has unbound range")]
    UnboundRange,
    #[error("WITHIN value is not an interval")]
    ValueNotInterval,
    #[error("Command not supported for incremental refresh: {0}")]
    CommandNotSupported(String),
    #[error("This query does not compute the timestamp field from the timestamp field")]
    TimestampLineageError,
    #[error("AGG command must group by the timestamp field for incremental refresh")]
    AggWithoutTimestampGroupBy,
    #[error("Pipeline is empty")]
    EmptyPipeline,
    #[error("DML statements cannot use incremental refresh")]
    DmlNotSupported,
    // The name of the WITH clause whose analysis failed.
    #[error("WITH clause had errors during incremental analysis")]
    WithClauseError(Identifier),
}
/// Searches `expr` for a call to a non-deterministic function and returns the
/// offending function's definition, or `None` when every call is deterministic.
fn contains_non_deterministic_functions(expr: &TypedExpression) -> Option<Arc<dyn FunctionDef>> {
    let hit = expr.find(&mut |candidate| {
        matches!(
            &candidate.kind,
            TypedExpressionKind::Apply(apply) if !apply.function_def.is_deterministic()
        )
    })?;
    // The predicate above only matches `Apply` nodes, so this extraction
    // succeeds for any hit it produced.
    if let TypedExpressionKind::Apply(apply) = &hit.kind {
        Some(apply.function_def.clone())
    } else {
        None
    }
}
/// Errors with `NonDeterministicFunction` if `expr` calls any
/// non-deterministic function; succeeds otherwise.
fn check_expression_for_non_deterministic(
    expr: &TypedExpression,
) -> Result<(), IncrementalAnalysisError> {
    match contains_non_deterministic_functions(expr) {
        Some(func) => Err(IncrementalAnalysisError::NonDeterministicFunction(
            func.to_string(),
        )),
        None => Ok(()),
    }
}
/// Checks every projection assignment for non-deterministic function calls,
/// failing on the first one found.
fn check_projections_for_non_deterministic(
    projections: &Projections,
) -> Result<(), IncrementalAnalysisError> {
    for assignment in &projections.assignments {
        check_expression_for_non_deterministic(&assignment.expression)?;
    }
    Ok(())
}
/// Checks every sort key expression for non-deterministic function calls,
/// failing on the first one found.
fn check_sort_for_non_deterministic(
    sort_expressions: &[hamelin_lib::tree::typed_ast::command::TypedSortExpression],
) -> Result<(), IncrementalAnalysisError> {
    for sort_expression in sort_expressions {
        check_expression_for_non_deterministic(&sort_expression.expression)?;
    }
    Ok(())
}
/// Seeds `current_replace_range` from the stale ranges of the referenced
/// tables: the union (earliest start, latest end) across all provided ranges.
///
/// Errors if a referenced table has no entry in `stale_ranges`, or if every
/// provided entry is `None` (no start/end can be derived).
fn initialize_replace_range(
    refs: &[Identifier],
    stale_ranges: &HashMap<Identifier, Option<TimeRange>>,
    current_replace_range: &mut Option<TimeRange>,
) -> Result<(), IncrementalAnalysisError> {
    let mut local_ranges = Vec::with_capacity(refs.len());
    for id in refs {
        let range = stale_ranges
            .get(id)
            .cloned()
            .ok_or_else(|| IncrementalAnalysisError::DependencyNotProvided(id.clone()))?;
        local_ranges.push(range);
    }
    // `flatten` skips tables whose stale range is `None` (i.e. not stale).
    let min = *local_ranges
        .iter()
        .flatten()
        .map(|range| range.start())
        .min()
        .ok_or(IncrementalAnalysisError::NoStart)?;
    let max = *local_ranges
        .iter()
        .flatten()
        .map(|range| range.end())
        .max()
        .ok_or(IncrementalAnalysisError::NoEnd)?;
    *current_replace_range = Some(min..=max);
    Ok(())
}
fn is_interval_or_range_of_interval_type(expr: &TypedExpression) -> bool {
match expr.resolved_type.as_ref() {
t if *t == INTERVAL || *t == CALENDAR_INTERVAL => true,
Type::Range(range) if *range.of == INTERVAL || *range.of == CALENDAR_INTERVAL => true,
_ => false,
}
}
/// The incremental refresh strategies this module can validate a query for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncrementalStrategyKind {
    // Append-only propagation of new rows through the pipeline.
    CascadedAppend,
    // Recompute and replace a bounded time range of output rows.
    TimeRangeRefresh,
}
/// Maps `input_range` through the projection of the incremental field, if any.
///
/// When the projection re-computes the incremental field, evaluates the
/// projection expression at both endpoints of `input_range` (binding the field
/// to each endpoint in turn) to derive the transformed range. When the field is
/// not projected, the range passes through unchanged.
///
/// # Errors
/// - `ExpressionEvaluationError` when the projection cannot be evaluated.
/// - `ValueNotTimestamp` when the projected value is not a timestamp.
fn compute_replace_range_for_projection(
    input_range: &TimeRange,
    incremental_field_name: &SimpleIdentifier,
    projections: &Projections,
) -> Result<TimeRange, IncrementalAnalysisError> {
    let incremental_field_projection = projections.lookup(&incremental_field_name.clone().into());
    if let Some(ifp) = incremental_field_projection {
        let mut env = Environment::new();
        // Evaluate the projection with the field bound to the range start.
        env.bind(
            incremental_field_name.clone().into(),
            TimestampValue::utc(*input_range.start()).into(),
        );
        let new_start_value = eval(ifp.expression.as_ref(), &env)
            .map_err(IncrementalAnalysisError::ExpressionEvaluationError)?
            .try_unwrap_timestamp()
            .map_err(|_| IncrementalAnalysisError::ValueNotTimestamp)?;
        // ... and again with the field bound to the range end.
        env.bind(
            incremental_field_name.clone().into(),
            TimestampValue::utc(*input_range.end()).into(),
        );
        let new_end_value = eval(ifp.expression.as_ref(), &env)
            .map_err(IncrementalAnalysisError::ExpressionEvaluationError)?
            .try_unwrap_timestamp()
            .map_err(|_| IncrementalAnalysisError::ValueNotTimestamp)?;
        Ok(*new_start_value.instant()..=*new_end_value.instant())
    } else {
        // Incremental field untouched by this projection: range is unchanged.
        Ok(input_range.clone())
    }
}
/// Inverse of [`compute_replace_range_for_projection`]: given a desired output
/// range for the incremental field, reverse-evaluates the field's projection to
/// find the input range that produces it. When the field is not projected, the
/// range passes through unchanged.
///
/// # Errors
/// `TimestampLineageError` when the projection cannot be inverted to a closed
/// timestamp range (i.e. the output timestamp is not derived from the input
/// timestamp in an invertible way).
fn reverse_eval_timestamp_projection(
    output_range: &TimeRange,
    incremental_field_name: &SimpleIdentifier,
    projections: &Projections,
) -> Result<TimeRange, IncrementalAnalysisError> {
    let timestamp_id: Identifier = incremental_field_name.clone().into();
    // Fix: identifier was garbled by HTML-entity decoding (`&times...` -> `×...`).
    let incremental_field_projection = projections.lookup(&timestamp_id);
    if let Some(ifp) = incremental_field_projection {
        let output_constraint = Constraint::Range {
            min: Some(TimestampValue::utc(*output_range.start()).into()),
            max: Some(TimestampValue::utc(*output_range.end()).into()),
        };
        let env = Environment::new();
        match reverse_eval(ifp.expression.as_ref(), output_constraint, &env) {
            Ok(Some(Constraint::Range {
                min: Some(Value::Timestamp(start)),
                max: Some(Value::Timestamp(end)),
            })) => Ok(*start.instant()..=*end.instant()),
            // Any other shape (open-ended, non-timestamp, or failure) means we
            // cannot trace the timestamp lineage backwards.
            _ => Err(IncrementalAnalysisError::TimestampLineageError),
        }
    } else {
        Ok(output_range.clone())
    }
}
fn compute_replace_range_forward_pass(
pipeline: &TypedPipeline,
stale_ranges: &HashMap<Identifier, Option<TimeRange>>,
incremental_field_name: &SimpleIdentifier,
allow_lookups: bool,
) -> Result<TimeRange, IncrementalAnalysisError> {
let command_list = match &pipeline.kind {
Ok(ValidPipeline { commands, .. }) => commands,
Err(translation_error) => {
return Err(IncrementalAnalysisError::TreeHadError(
translation_error.as_ref().clone(),
))
}
};
let mut current_replace_range: Option<TimeRange> = None;
for command in command_list {
match &command.kind {
TypedCommandKind::Join(_)
| TypedCommandKind::Append(_)
| TypedCommandKind::Limit(_) => {
return Err(IncrementalAnalysisError::CommandNotSupported(
command.ast.kind.command_name().to_string(),
))
}
TypedCommandKind::Lookup(typed_lookup_command) => {
if !allow_lookups {
return Err(IncrementalAnalysisError::CommandNotSupported(
command.ast.kind.command_name().to_string(),
));
}
if let Some(condition) = &typed_lookup_command.condition {
check_expression_for_non_deterministic(condition)?;
}
continue;
}
TypedCommandKind::Error(translation_error) => {
return Err(IncrementalAnalysisError::TreeHadError(
translation_error.as_ref().clone(),
))
}
TypedCommandKind::Where(typed_where_command) => {
check_expression_for_non_deterministic(&typed_where_command.predicate)?;
continue;
}
TypedCommandKind::Within(typed_within_command) => {
if is_interval_or_range_of_interval_type(&typed_within_command.duration) {
return Err(IncrementalAnalysisError::NonDeterministicFunction(
"WITHIN".to_string(),
));
}
check_expression_for_non_deterministic(&typed_within_command.duration)?;
continue;
}
TypedCommandKind::Sort(_) => {
continue;
}
TypedCommandKind::Drop(typed_drop_command) => {
let timestamp_id: Identifier = incremental_field_name.clone().into();
if typed_drop_command.dropped_fields.contains(×tamp_id) {
return Err(IncrementalAnalysisError::TimestampLineageError);
}
continue;
}
TypedCommandKind::From(TypedFromCommand { clauses })
| TypedCommandKind::Union(TypedUnionCommand { clauses }) => {
let refs: Result<Vec<_>, _> =
clauses
.iter()
.filter_map(|clause| clause.reference().ok())
.map(|reference| {
reference.identifier.clone().valid().map_err(|e| {
IncrementalAnalysisError::TreeHadError(e.as_ref().clone())
})
})
.collect();
initialize_replace_range(&refs?, stale_ranges, &mut current_replace_range)?;
}
TypedCommandKind::Window(typed_window_command) => {
check_projections_for_non_deterministic(&typed_window_command.projections)?;
check_projections_for_non_deterministic(&typed_window_command.group_by)?;
if let Some(within) = &typed_window_command.within {
check_expression_for_non_deterministic(within)?;
}
if let Some(last_range) = ¤t_replace_range {
let new_range = compute_replace_range_for_projection(
last_range,
incremental_field_name,
&typed_window_command.projections,
)?;
current_replace_range = Some(new_range);
}
}
TypedCommandKind::Agg(typed_agg_command) => {
check_projections_for_non_deterministic(&typed_agg_command.group_by)?;
check_projections_for_non_deterministic(&typed_agg_command.aggregates)?;
check_sort_for_non_deterministic(&typed_agg_command.sort_by)?;
if let Some(last_range) = ¤t_replace_range {
let timestamp_id: Identifier = incremental_field_name.clone().into();
if typed_agg_command.group_by.lookup(×tamp_id).is_none() {
return Err(IncrementalAnalysisError::AggWithoutTimestampGroupBy);
}
let new_range = compute_replace_range_for_projection(
last_range,
incremental_field_name,
&typed_agg_command.group_by,
)?;
current_replace_range = Some(new_range);
}
}
TypedCommandKind::Let(typed_let_command) => {
check_projections_for_non_deterministic(&typed_let_command.projections)?;
if let Some(last_range) = ¤t_replace_range {
let new_range = compute_replace_range_for_projection(
last_range,
incremental_field_name,
&typed_let_command.projections,
)?;
current_replace_range = Some(new_range);
}
}
TypedCommandKind::Match(typed_match_command) => {
check_projections_for_non_deterministic(&typed_match_command.group_by)?;
check_projections_for_non_deterministic(&typed_match_command.agg)?;
check_sort_for_non_deterministic(&typed_match_command.sort)?;
if let Some(within) = &typed_match_command.within {
check_expression_for_non_deterministic(within)?;
if typed_match_command.sort.len() != 1 {
return Err(IncrementalAnalysisError::CommandNotSupported(
"MATCH with WITHIN requires exactly one sort expression for incremental refresh".to_string(),
));
}
let is_timestamp_sort = matches!(
&typed_match_command.sort[0].expression.ast.kind,
ExpressionKind::FieldReference(col_ref)
if matches!(&col_ref.field_name, ParsedSimpleIdentifier::Valid(id) if id == "timestamp")
);
if !is_timestamp_sort {
return Err(IncrementalAnalysisError::CommandNotSupported(
"MATCH with WITHIN requires SORT by 'timestamp' field for incremental refresh".to_string(),
));
}
}
let refs = collect_match_pattern_references(&typed_match_command.patterns)?;
initialize_replace_range(&refs, stale_ranges, &mut current_replace_range)?;
}
TypedCommandKind::Select(typed_select_command) => {
check_projections_for_non_deterministic(&typed_select_command.projections)?;
if let Some(last_range) = ¤t_replace_range {
let timestamp_id: Identifier = incremental_field_name.clone().into();
if typed_select_command
.projections
.lookup(×tamp_id)
.is_none()
{
return Err(IncrementalAnalysisError::TimestampLineageError);
}
let new_range = compute_replace_range_for_projection(
last_range,
incremental_field_name,
&typed_select_command.projections,
)?;
current_replace_range = Some(new_range);
}
}
TypedCommandKind::Explode(typed_explode_command) => {
let timestamp_id: Identifier = incremental_field_name.clone().into();
for item in &typed_explode_command.items {
check_expression_for_non_deterministic(&item.assignment.expression)?;
if let Ok(identifier) = item.assignment.identifier.valid_ref() {
if *identifier == timestamp_id {
return Err(IncrementalAnalysisError::TimestampLineageError);
}
}
}
continue;
}
TypedCommandKind::Parse(typed_parse_command) => {
if let Some(source) = &typed_parse_command.source {
check_expression_for_non_deterministic(source)?;
}
let timestamp_id: Identifier = incremental_field_name.clone().into();
if typed_parse_command
.identifiers
.iter()
.any(|id| id.valid_ref().ok() == Some(×tamp_id))
{
return Err(IncrementalAnalysisError::TimestampLineageError);
}
continue;
}
TypedCommandKind::Unnest(typed_unnest_command) => {
check_expression_for_non_deterministic(&typed_unnest_command.expression)?;
let timestamp_id: Identifier = incremental_field_name.clone().into();
if command.output_schema.lookup(×tamp_id).is_some() {
return Err(IncrementalAnalysisError::TimestampLineageError);
}
continue;
}
TypedCommandKind::Nest(typed_nest_command) => {
let timestamp_id: Identifier = incremental_field_name.clone().into();
if let Ok(identifier) = typed_nest_command.identifier.valid_ref() {
if *identifier == timestamp_id {
return Err(IncrementalAnalysisError::TimestampLineageError);
}
}
continue;
}
}
}
current_replace_range.ok_or(IncrementalAnalysisError::EmptyPipeline)
}
/// Widens `range` so both endpoints land on partition boundaries of `unit`:
/// the start is floored to its containing boundary and the end is pushed
/// forward to the next boundary. Truncation failures leave the corresponding
/// endpoint unchanged.
fn snap_to_partition_boundaries(range: &TimeRange, unit: &TruncUnit) -> TimeRange {
    let (start, end) = (*range.start(), *range.end());
    // Floor the start onto its partition boundary.
    let snapped_start = match truncate_timestamp(&TimestampValue::utc(start), unit) {
        Ok(truncated) => *truncated.instant(),
        Err(_) => start,
    };
    // Ceil the end: truncate first, then advance to the following boundary.
    let truncated_end =
        truncate_timestamp(&TimestampValue::utc(end), unit).unwrap_or_else(|_| TimestampValue::utc(end));
    let snapped_end = next_truncation_boundary(&truncated_end, unit)
        .map(|boundary| *boundary.instant())
        .unwrap_or(end);
    snapped_start..=snapped_end
}
/// Backward pass over a pipeline: starting from `final_replace_range`, walks
/// the commands in reverse, reverse-evaluating timestamp projections and
/// widening the range to cover WINDOW/MATCH `WITHIN` look-back/look-ahead.
/// The result is the union of all per-command input ranges — the span of
/// source rows that must be read to recompute the replace range.
///
/// Fixes applied: occurrences of `&current_output_range` were corrupted to
/// `¤t_output_range` by an HTML-entity decoding pass and are restored.
fn compute_query_range_backward_pass(
    pipeline: &TypedPipeline,
    final_replace_range: &TimeRange,
    incremental_field_name: &SimpleIdentifier,
) -> Result<TimeRange, IncrementalAnalysisError> {
    let command_list = match &pipeline.kind {
        Ok(ValidPipeline { commands, .. }) => commands,
        Err(e) => return Err(IncrementalAnalysisError::TreeHadError(e.as_ref().clone())),
    };
    let mut current_output_range = final_replace_range.clone();
    let mut query_ranges: Vec<TimeRange> = Vec::new();
    for command in command_list.iter().rev() {
        match &command.kind {
            TypedCommandKind::Let(typed_let_command) => {
                let input_range = reverse_eval_timestamp_projection(
                    &current_output_range,
                    incremental_field_name,
                    &typed_let_command.projections,
                )?;
                query_ranges.push(input_range.clone());
                current_output_range = input_range;
            }
            TypedCommandKind::Select(typed_select_command) => {
                let input_range = reverse_eval_timestamp_projection(
                    &current_output_range,
                    incremental_field_name,
                    &typed_select_command.projections,
                )?;
                query_ranges.push(input_range.clone());
                current_output_range = input_range;
            }
            TypedCommandKind::Window(typed_window_command) => {
                let input_range = reverse_eval_timestamp_projection(
                    &current_output_range,
                    incremental_field_name,
                    &typed_window_command.projections,
                )?;
                let mut query_range_lower = *input_range.start();
                let mut query_range_upper = *input_range.end();
                if let Some(within) = &typed_window_command.within {
                    if let TypedExpressionKind::Error(err) = &within.kind {
                        return Err(IncrementalAnalysisError::TreeHadError(
                            err.error.as_ref().clone(),
                        ));
                    }
                    let env = Environment::new();
                    // Widen the query range by the WITHIN horizon: a positive
                    // interval looks forward, a negative one looks backward.
                    if let Ok(evaluated_within) = eval(within.as_ref(), &env) {
                        match evaluated_within {
                            Value::Interval(time_delta) => {
                                if time_delta > Duration::zero() {
                                    query_range_upper = query_range_upper + time_delta;
                                } else {
                                    query_range_lower = query_range_lower + time_delta;
                                }
                            }
                            Value::CalendarInterval(months) => {
                                let calendar_interval = RelativeDuration::months(months);
                                if months > 0 {
                                    query_range_upper = query_range_upper + calendar_interval;
                                } else {
                                    query_range_lower = query_range_lower + calendar_interval;
                                }
                            }
                            Value::Range(range_value) => {
                                // A bounded range widens both ends at once.
                                if let (Some(l), Some(u)) = (&range_value.lower, &range_value.upper)
                                {
                                    match (l, u) {
                                        (Value::Interval(l), Value::Interval(u)) => {
                                            query_range_lower = query_range_lower + *l;
                                            query_range_upper = query_range_upper + *u;
                                        }
                                        (
                                            Value::CalendarInterval(l),
                                            Value::CalendarInterval(u),
                                        ) => {
                                            query_range_lower =
                                                query_range_lower + RelativeDuration::months(*l);
                                            query_range_upper =
                                                query_range_upper + RelativeDuration::months(*u);
                                        }
                                        _ => {}
                                    }
                                }
                            }
                            _ => {}
                        }
                    }
                }
                query_ranges.push(query_range_lower..=query_range_upper);
                current_output_range = input_range;
            }
            TypedCommandKind::Match(typed_match_command) => {
                let input_range = current_output_range.clone();
                let mut query_range_lower = *input_range.start();
                let query_range_upper = *input_range.end();
                if let Some(within) = &typed_match_command.within {
                    if let TypedExpressionKind::Error(err) = &within.kind {
                        return Err(IncrementalAnalysisError::TreeHadError(
                            err.error.as_ref().clone(),
                        ));
                    }
                    let env = Environment::new();
                    // MATCH WITHIN only ever extends the look-back window.
                    if let Ok(evaluated_within) = eval(within, &env) {
                        match evaluated_within {
                            Value::Interval(time_delta) => {
                                query_range_lower = query_range_lower - time_delta;
                            }
                            Value::CalendarInterval(months) => {
                                query_range_lower =
                                    query_range_lower + RelativeDuration::months(-months);
                            }
                            _ => {}
                        }
                    }
                }
                query_ranges.push(query_range_lower..=query_range_upper);
                // MATCH is a source command: nothing precedes it logically.
                break;
            }
            TypedCommandKind::Agg(typed_agg_command) => {
                let input_range = reverse_eval_timestamp_projection(
                    &current_output_range,
                    incremental_field_name,
                    &typed_agg_command.group_by,
                )?;
                query_ranges.push(input_range.clone());
                current_output_range = input_range;
            }
            // Row-filtering/ordering commands pass the range through unchanged.
            TypedCommandKind::Where(_)
            | TypedCommandKind::Sort(_)
            | TypedCommandKind::Limit(_)
            | TypedCommandKind::Within(_)
            | TypedCommandKind::Drop(_)
            | TypedCommandKind::Lookup(_) => {
                query_ranges.push(current_output_range.clone());
            }
            TypedCommandKind::From(_) | TypedCommandKind::Union(_) => {
                query_ranges.push(current_output_range.clone());
                // Reached the pipeline source; stop walking backwards.
                break;
            }
            _ => {
                query_ranges.push(current_output_range.clone());
            }
        }
    }
    let query_range_start = query_ranges
        .iter()
        .map(|r| r.start())
        .min()
        .ok_or(IncrementalAnalysisError::EmptyPipeline)?;
    let query_range_end = query_ranges
        .iter()
        .map(|r| r.end())
        .max()
        .ok_or(IncrementalAnalysisError::EmptyPipeline)?;
    Ok(*query_range_start..=*query_range_end)
}
/// Computes the replace and query ranges for a single pipeline: a forward pass
/// derives the replace range from `stale_ranges`, the range is optionally
/// snapped to `partition_unit` boundaries, and a backward pass derives the
/// input query range from the (snapped) replace range.
///
/// # Errors
/// Propagates any [`IncrementalAnalysisError`] from either pass.
pub fn compute_incremental_ranges_for_pipeline(
    pipeline: &TypedPipeline,
    stale_ranges: HashMap<Identifier, Option<TimeRange>>,
    incremental_field_name: &SimpleIdentifier,
    partition_unit: Option<TruncUnit>,
    allow_lookups: bool,
) -> Result<IncrementalRange, IncrementalAnalysisError> {
    // `pipeline` is already a reference — pass it directly (was `&pipeline`,
    // a needless double borrow).
    let replace_range = compute_replace_range_forward_pass(
        pipeline,
        &stale_ranges,
        incremental_field_name,
        allow_lookups,
    )?;
    let replace_range = if let Some(unit) = partition_unit {
        snap_to_partition_boundaries(&replace_range, &unit)
    } else {
        replace_range
    };
    let query_range =
        compute_query_range_backward_pass(pipeline, &replace_range, incremental_field_name)?;
    Ok(IncrementalRange {
        replace_range,
        query_range,
    })
}
pub fn is_cascaded_append_eligible_for_pipeline(
pipeline: Arc<TypedPipeline>,
allow_lookups: bool,
) -> Result<(), IncrementalAnalysisError> {
let command_list = match &pipeline.kind {
Ok(ValidPipeline { commands, .. }) => commands,
Err(e) => return Err(IncrementalAnalysisError::TreeHadError(e.as_ref().clone())),
};
for command in command_list {
match &command.kind {
TypedCommandKind::From(_) | TypedCommandKind::Union(_) | TypedCommandKind::Sort(_) => {
continue;
}
TypedCommandKind::Where(typed_where_command) => {
check_expression_for_non_deterministic(&typed_where_command.predicate)?;
continue;
}
TypedCommandKind::Let(typed_let_command) => {
check_projections_for_non_deterministic(&typed_let_command.projections)?;
continue;
}
TypedCommandKind::Select(typed_select_command) => {
check_projections_for_non_deterministic(&typed_select_command.projections)?;
continue;
}
TypedCommandKind::Explode(typed_explode_command) => {
for item in &typed_explode_command.items {
check_expression_for_non_deterministic(&item.assignment.expression)?;
}
continue;
}
TypedCommandKind::Unnest(typed_unnest_command) => {
check_expression_for_non_deterministic(&typed_unnest_command.expression)?;
continue;
}
TypedCommandKind::Within(typed_within_command) => {
if is_interval_or_range_of_interval_type(&typed_within_command.duration) {
return Err(IncrementalAnalysisError::NonDeterministicFunction(
"WITHIN".to_string(),
));
}
check_expression_for_non_deterministic(&typed_within_command.duration)?;
continue;
}
TypedCommandKind::Drop(_) => {
continue;
}
TypedCommandKind::Lookup(typed_lookup_command) => {
if !allow_lookups {
return Err(IncrementalAnalysisError::CommandNotSupported(
command.ast.kind.command_name().to_string(),
));
}
if let Some(condition) = &typed_lookup_command.condition {
check_expression_for_non_deterministic(condition)?;
}
continue;
}
TypedCommandKind::Window(_)
| TypedCommandKind::Agg(_)
| TypedCommandKind::Join(_)
| TypedCommandKind::Match(_)
| TypedCommandKind::Append(_)
| TypedCommandKind::Nest(_)
| TypedCommandKind::Parse(_)
| TypedCommandKind::Limit(_) => {
return Err(IncrementalAnalysisError::CommandNotSupported(
command.ast.kind.command_name().to_string(),
))
}
TypedCommandKind::Error(translation_error) => {
return Err(IncrementalAnalysisError::TreeHadError(
translation_error.as_ref().clone(),
))
}
}
}
Ok(())
}
/// Recursively collects the table identifiers referenced by MATCH patterns.
/// Patterns without a resolvable table reference and error patterns are
/// skipped; an invalid identifier aborts with `TreeHadError`.
fn collect_match_pattern_references(
    patterns: &[TypedPattern],
) -> Result<Vec<Identifier>, IncrementalAnalysisError> {
    let mut collected = Vec::new();
    for pattern in patterns {
        match pattern {
            TypedPattern::Quantified(quantified) => {
                if let Ok(table_ref) = quantified.typed_from.reference() {
                    let id = table_ref
                        .identifier
                        .clone()
                        .valid()
                        .map_err(|e| IncrementalAnalysisError::TreeHadError(e.as_ref().clone()))?;
                    collected.push(id);
                }
            }
            TypedPattern::Nested(nested) => {
                // Recurse into nested pattern groups.
                collected.extend(collect_match_pattern_references(&nested.patterns)?);
            }
            TypedPattern::Error(_) => {}
        }
    }
    Ok(collected)
}
/// Best-effort extraction of the table identifiers a pipeline reads from,
/// based solely on its first command (FROM/UNION clauses or MATCH patterns).
/// Invalid pipelines and unresolvable references yield an empty list.
fn extract_table_references(pipeline: &TypedPipeline) -> Vec<Identifier> {
    let commands = match &pipeline.kind {
        Ok(ValidPipeline { commands, .. }) => commands,
        Err(_) => return Vec::new(),
    };
    match commands.first().map(|command| &command.kind) {
        Some(TypedCommandKind::From(TypedFromCommand { clauses }))
        | Some(TypedCommandKind::Union(TypedUnionCommand { clauses })) => {
            let mut refs = Vec::new();
            for clause in clauses {
                if let Ok(reference) = clause.reference() {
                    if let Ok(id) = reference.identifier.clone().valid() {
                        refs.push(id);
                    }
                }
            }
            refs
        }
        Some(TypedCommandKind::Match(match_command)) => {
            collect_match_pattern_references(&match_command.patterns).unwrap_or_default()
        }
        _ => Vec::new(),
    }
}
/// Probes a single pipeline for each incremental strategy, collecting the
/// supported kinds and a human-readable rejection reason for each failure.
/// TimeRangeRefresh is probed with a synthetic one-day stale range per table.
pub fn detect_supported_strategies_for_pipeline(
    pipeline: Arc<TypedPipeline>,
    allow_lookups: bool,
) -> StrategyDetectionResult {
    let mut result = StrategyDetectionResult {
        supported: Vec::new(),
        rejections: Vec::new(),
    };
    if let Err(e) = is_cascaded_append_eligible_for_pipeline(Arc::clone(&pipeline), allow_lookups) {
        result.rejections.push(format!(
            "Cannot compute incrementally with cascaded append: {e}"
        ));
    } else {
        result.supported.push(IncrementalStrategyKind::CascadedAppend);
    }
    // Probe with a fabricated 2024-01-01 .. 2024-01-02 stale range; only
    // analyzability matters here, not the actual dates.
    let test_range = chrono::Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap()
        ..=chrono::Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap();
    let bogus_stale_ranges: HashMap<Identifier, Option<TimeRange>> =
        extract_table_references(&pipeline)
            .into_iter()
            .map(|id| (id, Some(test_range.clone())))
            .collect();
    let incremental_field = SimpleIdentifier::new("timestamp");
    match compute_incremental_ranges_for_pipeline(
        &pipeline,
        bogus_stale_ranges,
        &incremental_field,
        None,
        allow_lookups,
    ) {
        Ok(_) => result
            .supported
            .push(IncrementalStrategyKind::TimeRangeRefresh),
        Err(e) => result.rejections.push(format!(
            "Cannot compute incrementally with time range refresh: {e}"
        )),
    }
    result
}
/// Computes incremental ranges for a full statement, including WITH clauses.
///
/// CTEs are analyzed in declaration order; each CTE's replace range is fed as
/// a stale range into later CTEs and the main pipeline. The final query range
/// is the union of the main pipeline's and every CTE's query range; only the
/// (optionally partition-snapped) main replace range is returned.
///
/// # Errors
/// `DmlNotSupported` for statements with side effects, plus any analysis error
/// from the forward/backward passes.
pub fn compute_incremental_ranges_for_query(
    typed_statement: &TypedStatement,
    initial_stale_ranges: HashMap<Identifier, Option<TimeRange>>,
    incremental_field_name: &SimpleIdentifier,
    partition_unit: Option<TruncUnit>,
    allow_lookups: bool,
) -> Result<IncrementalRange, IncrementalAnalysisError> {
    // Idiom fix: was `!typed_statement.side_effect.is_none()`.
    if typed_statement.side_effect.is_some() {
        return Err(IncrementalAnalysisError::DmlNotSupported);
    }
    let main_pipeline = &typed_statement.pipeline;
    let mut cte_replace_ranges: HashMap<Identifier, TimeRange> = HashMap::new();
    for with_clause in &typed_statement.with_clauses {
        // Each CTE sees the initial stale ranges plus all earlier CTE results.
        let mut cte_stale_ranges = initial_stale_ranges.clone();
        for (name, range) in &cte_replace_ranges {
            cte_stale_ranges.insert(name.clone(), Some(range.clone()));
        }
        let cte_replace_range = compute_replace_range_forward_pass(
            &with_clause.pipeline,
            &cte_stale_ranges,
            incremental_field_name,
            allow_lookups,
        )?;
        let cte_name = with_clause
            .name
            .clone()
            .valid()
            .map_err(|e| IncrementalAnalysisError::TreeHadError(e.as_ref().clone()))?;
        cte_replace_ranges.insert(cte_name, cte_replace_range);
    }
    let mut main_stale_ranges = initial_stale_ranges.clone();
    for (name, range) in &cte_replace_ranges {
        main_stale_ranges.insert(name.clone(), Some(range.clone()));
    }
    let main_replace_range = compute_replace_range_forward_pass(
        main_pipeline,
        &main_stale_ranges,
        incremental_field_name,
        allow_lookups,
    )?;
    let final_replace_range = if let Some(unit) = partition_unit {
        snap_to_partition_boundaries(&main_replace_range, &unit)
    } else {
        main_replace_range
    };
    let main_query_range = compute_query_range_backward_pass(
        main_pipeline,
        &final_replace_range,
        incremental_field_name,
    )?;
    let mut all_query_starts = vec![*main_query_range.start()];
    let mut all_query_ends = vec![*main_query_range.end()];
    for with_clause in &typed_statement.with_clauses {
        let cte_name = with_clause
            .name
            .clone()
            .valid()
            .map_err(|e| IncrementalAnalysisError::TreeHadError(e.as_ref().clone()))?;
        // Robustness fix: document the invariant instead of a bare unwrap.
        let cte_replace_range = cte_replace_ranges
            .get(&cte_name)
            .expect("every WITH clause was inserted into cte_replace_ranges in the first loop");
        let cte_query_range = compute_query_range_backward_pass(
            &with_clause.pipeline,
            cte_replace_range,
            incremental_field_name,
        )?;
        all_query_starts.push(*cte_query_range.start());
        all_query_ends.push(*cte_query_range.end());
    }
    let aggregated_query_start = all_query_starts
        .into_iter()
        .min()
        .ok_or(IncrementalAnalysisError::EmptyPipeline)?;
    let aggregated_query_end = all_query_ends
        .into_iter()
        .max()
        .ok_or(IncrementalAnalysisError::EmptyPipeline)?;
    Ok(IncrementalRange {
        query_range: aggregated_query_start..=aggregated_query_end,
        replace_range: final_replace_range,
    })
}
/// Checks cascaded-append eligibility for a full statement: every WITH-clause
/// pipeline and the main pipeline must be eligible, and the statement must not
/// be DML.
pub fn is_cascaded_append_eligible_for_query(
    typed_statement: &TypedStatement,
    allow_lookups: bool,
) -> Result<(), IncrementalAnalysisError> {
    for with_clause in &typed_statement.with_clauses {
        is_cascaded_append_eligible_for_pipeline(with_clause.pipeline.clone(), allow_lookups)?;
    }
    // Idiom fix: was `!typed_statement.side_effect.is_none()`.
    if typed_statement.side_effect.is_some() {
        return Err(IncrementalAnalysisError::DmlNotSupported);
    }
    is_cascaded_append_eligible_for_pipeline(typed_statement.pipeline.clone(), allow_lookups)
}
/// Probes a full statement (WITH clauses included) for each incremental
/// strategy, collecting supported kinds and rejection reasons. The
/// TimeRangeRefresh probe fabricates a one-day stale range for every table
/// referenced by any pipeline in the statement.
pub fn detect_supported_strategies_for_query(
    typed_statement: &TypedStatement,
    allow_lookups: bool,
) -> StrategyDetectionResult {
    let mut result = StrategyDetectionResult {
        supported: Vec::new(),
        rejections: Vec::new(),
    };
    if let Err(e) = is_cascaded_append_eligible_for_query(typed_statement, allow_lookups) {
        result.rejections.push(format!("CascadedAppend: {e}"));
    } else {
        result.supported.push(IncrementalStrategyKind::CascadedAppend);
    }
    // Synthetic probe range; only analyzability matters, not the dates.
    let test_range = chrono::Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap()
        ..=chrono::Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap();
    let mut all_table_refs = Vec::new();
    for with_clause in &typed_statement.with_clauses {
        all_table_refs.extend(extract_table_references(&with_clause.pipeline));
    }
    all_table_refs.extend(extract_table_references(&typed_statement.pipeline));
    let bogus_stale_ranges: HashMap<Identifier, Option<TimeRange>> = all_table_refs
        .into_iter()
        .map(|id| (id, Some(test_range.clone())))
        .collect();
    let incremental_field = SimpleIdentifier::new("timestamp");
    match compute_incremental_ranges_for_query(
        typed_statement,
        bogus_stale_ranges,
        &incremental_field,
        None,
        allow_lookups,
    ) {
        Ok(_) => result
            .supported
            .push(IncrementalStrategyKind::TimeRangeRefresh),
        Err(e) => result.rejections.push(format!("TimeRangeRefresh: {e}")),
    }
    result
}
#[cfg(test)]
mod tests;