use serde::{Deserialize, Serialize};
use crate::config::{
CompressionType, DestinationConfig, FormatType, IncrementalCursorMode, MetaColumns,
ParquetConfig, QualityConfig, SchemaDriftPolicy, SourceConfig, TimeColumnType,
};
use crate::tuning::SourceTuning;
/// Parameters carried by [`ExtractionStrategy::Chunked`].
///
/// Only `parallel` and `checkpoint` are interpreted in this file; the other
/// fields are consumed elsewhere, so the notes on them are assumptions to
/// confirm against the chunk planner/executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedPlan {
/// Column the extraction is chunked on (presumably the range column — confirm).
pub column: String,
/// Size of one chunk. NOTE(review): the unit (rows vs. key range) is not visible here.
pub chunk_size: usize,
/// Optional chunk count. NOTE(review): how it interacts with `chunk_size` is decided by the consumer.
pub chunk_count: Option<usize>,
/// Degree of parallelism; values above 1 make
/// `ExtractionStrategy::requires_parallel_execution` return `true`.
pub parallel: usize,
/// NOTE(review): presumably flags a dense key space — confirm.
pub dense: bool,
/// NOTE(review): presumably an optional day-based chunk width — confirm.
pub by_days: Option<u32>,
/// When `true`, `ExtractionStrategy::is_resumable` reports the strategy as resumable.
pub checkpoint: bool,
/// NOTE(review): presumably the retry limit per chunk — confirm.
pub max_attempts: u32,
}
/// Parameters carried by [`ExtractionStrategy::Keyset`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeysetPlan {
/// Key column; returned by `ExtractionStrategy::cursor_extract_column` for keyset strategies.
pub key_column: String,
/// Size of one chunk. NOTE(review): presumably rows per page — confirm against the keyset executor.
pub chunk_size: usize,
}
/// Bundle of settings describing a single export run: the query, the
/// extraction strategy, output encoding, destination and assorted switches.
///
/// NOTE(review): none of these fields are read in this file, so the per-field
/// notes below are inferred from names and types only — confirm against the
/// code that builds and consumes the plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolvedRunPlan {
/// Name of the export this plan belongs to.
pub export_name: String,
/// Base SQL text; presumably the `base_query` handed to
/// `ExtractionStrategy::resolve_query` — confirm.
pub base_query: String,
/// How rows are pulled from the source.
pub strategy: ExtractionStrategy,
/// Output file format.
pub format: FormatType,
/// Output compression codec.
pub compression: CompressionType,
/// Optional compression level; `None` presumably defers to a codec default — confirm.
pub compression_level: Option<u32>,
/// Optional output file size limit in bytes.
pub max_file_size_bytes: Option<u64>,
/// NOTE(review): presumably suppresses output when the result is empty — confirm.
pub skip_empty: bool,
/// Meta-column configuration.
pub meta_columns: MetaColumns,
/// Destination configuration.
pub destination: DestinationConfig,
/// Optional quality-check configuration.
pub quality: Option<QualityConfig>,
/// Source tuning parameters.
pub tuning: SourceTuning,
/// Label of the tuning profile (presumably for display/logging — confirm).
pub tuning_profile_label: String,
/// Switch for a validation step (semantics defined by the consumer).
pub validate: bool,
/// Switch for a reconciliation step (semantics defined by the consumer).
pub reconcile: bool,
/// Switch for resuming a previous run (semantics defined by the consumer).
pub resume: bool,
/// Verification mode.
pub verify: crate::config::VerifyMode,
/// Source configuration.
pub source: SourceConfig,
/// Per-column overrides.
pub column_overrides: crate::types::ColumnOverrides,
/// Policy applied when schema drift is detected.
pub schema_drift_policy: SchemaDriftPolicy,
/// NOTE(review): presumably the factor at which a shape-drift warning is raised — confirm.
pub shape_drift_warn_factor: f64,
/// Optional Parquet-specific configuration.
pub parquet: Option<ParquetConfig>,
}
/// Cursor description carried by [`ExtractionStrategy::Incremental`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalCursorPlan {
/// Primary cursor column; reported by `ExtractionStrategy::cursor_column` and,
/// in `SingleColumn` mode, by `column_for_storage_extract`.
pub primary_column: String,
/// Optional second column. NOTE(review): presumably coalesced with
/// `primary_column` in `Coalesce` mode — it is not read in this file.
pub fallback_column: Option<String>,
/// Selects which column name `column_for_storage_extract` reports.
pub mode: IncrementalCursorMode,
}
impl IncrementalCursorPlan {
    /// Column name reported by [`Self::column_for_storage_extract`] when the
    /// plan is in `Coalesce` mode.
    pub const RIVET_COALESCE_CURSOR_COL: &'static str = "_rivet_coalesced_cursor";

    /// Returns the name of the column to read the cursor value from.
    ///
    /// * `Coalesce` mode: the fixed [`Self::RIVET_COALESCE_CURSOR_COL`] name.
    /// * `SingleColumn` mode: the plan's own `primary_column`.
    pub fn column_for_storage_extract(&self) -> &str {
        match self.mode {
            IncrementalCursorMode::Coalesce => Self::RIVET_COALESCE_CURSOR_COL,
            IncrementalCursorMode::SingleColumn => self.primary_column.as_str(),
        }
    }
}
/// The ways rows can be pulled from a source.
///
/// The short label for each variant is given by `mode_label`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExtractionStrategy {
/// Runs the base query unchanged (label `"full"`).
Snapshot,
/// Cursor-driven extraction; the only variant for which
/// `needs_cursor_state` returns `true`.
Incremental(IncrementalCursorPlan),
/// Chunked extraction; `resolve_query` yields no single query for it.
Chunked(ChunkedPlan),
/// Keyset extraction; `resolve_query` yields no single query for it.
Keyset(KeysetPlan),
/// Restricts the base query to rows whose time column is at or after a
/// lower bound computed by `build_time_window_query`.
TimeWindow {
/// Name of the time column to filter on.
column: String,
/// Whether the column is compared against a timestamp literal or a Unix epoch number.
column_type: TimeColumnType,
/// Number of days the window reaches back from the current time.
days_window: u32,
},
}
impl ExtractionStrategy {
    /// Short static label identifying the strategy variant.
    pub fn mode_label(&self) -> &'static str {
        match self {
            Self::Snapshot => "full",
            Self::Incremental(_) => "incremental",
            Self::Chunked(_) => "chunked",
            Self::Keyset(_) => "keyset",
            Self::TimeWindow { .. } => "timewindow",
        }
    }

    /// `true` only for the incremental strategy.
    pub fn needs_cursor_state(&self) -> bool {
        self.incremental_plan().is_some()
    }

    /// `true` only for a chunked strategy configured with `parallel > 1`.
    pub fn requires_parallel_execution(&self) -> bool {
        match self {
            Self::Chunked(plan) => plan.parallel > 1,
            Self::Snapshot | Self::Incremental(_) | Self::Keyset(_) | Self::TimeWindow { .. } => {
                false
            }
        }
    }

    /// `true` only for a chunked strategy with checkpointing enabled.
    pub fn is_resumable(&self) -> bool {
        match self {
            Self::Chunked(plan) => plan.checkpoint,
            Self::Snapshot | Self::Incremental(_) | Self::Keyset(_) | Self::TimeWindow { .. } => {
                false
            }
        }
    }

    /// Primary cursor column of an incremental strategy, `None` for every
    /// other variant.
    pub fn cursor_column(&self) -> Option<&str> {
        self.incremental_plan()
            .map(|plan| plan.primary_column.as_str())
    }

    /// Borrows the incremental cursor plan, if this is an incremental strategy.
    pub fn incremental_plan(&self) -> Option<&IncrementalCursorPlan> {
        if let Self::Incremental(plan) = self {
            Some(plan)
        } else {
            None
        }
    }

    /// Column to read the cursor value from.
    ///
    /// Incremental strategies defer to
    /// `IncrementalCursorPlan::column_for_storage_extract`; keyset strategies
    /// report their key column; all other variants yield `None`.
    pub fn cursor_extract_column(&self) -> Option<&str> {
        match self {
            Self::Incremental(plan) => Some(plan.column_for_storage_extract()),
            Self::Keyset(plan) => Some(plan.key_column.as_str()),
            Self::Snapshot | Self::Chunked(_) | Self::TimeWindow { .. } => None,
        }
    }

    /// Produces the single SQL statement for this strategy, when one exists.
    ///
    /// * Snapshot and incremental: `base_query` unchanged.
    /// * Time window: `base_query` wrapped by [`build_time_window_query`].
    /// * Chunked and keyset: `None` — no single query is produced here.
    pub fn resolve_query(
        &self,
        base_query: &str,
        source_type: crate::config::SourceType,
    ) -> Option<String> {
        let sql = match self {
            Self::Chunked(_) | Self::Keyset(_) => return None,
            Self::Snapshot | Self::Incremental(_) => base_query.to_string(),
            Self::TimeWindow {
                column,
                column_type,
                days_window,
            } => build_time_window_query(
                base_query,
                column,
                *column_type,
                *days_window,
                source_type,
            ),
        };
        Some(sql)
    }
}
/// Wraps `base_query` in a sub-select aliased `_rivet` and keeps only rows
/// whose `time_column` is at or after the start of the window.
///
/// The window start is the current UTC time minus `days_window` days,
/// truncated to midnight. If that instant cannot be represented, the minimum
/// representable UTC datetime is used instead.
///
/// The bound is rendered as a quoted `YYYY-MM-DD HH:MM:SS` literal for
/// [`TimeColumnType::Timestamp`] and as an unquoted epoch-seconds integer for
/// [`TimeColumnType::Unix`]. The column name is quoted with
/// `crate::sql::quote_ident` for the given `source_type`.
pub fn build_time_window_query(
    base_query: &str,
    time_column: &str,
    time_type: TimeColumnType,
    days_window: u32,
    source_type: crate::config::SourceType,
) -> String {
    let column_sql = crate::sql::quote_ident(source_type, time_column);

    let current = chrono::Utc::now();
    let lower_bound = match chrono::Duration::try_days(i64::from(days_window)) {
        Some(span) => current.checked_sub_signed(span),
        None => None,
    }
    .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC);

    // Snap to the start of that UTC day so the whole first day is included.
    let day_start = lower_bound
        .date_naive()
        .and_hms_opt(0, 0, 0)
        .expect("midnight is always valid");

    let predicate = match time_type {
        TimeColumnType::Unix => {
            format!("{} >= {}", column_sql, day_start.and_utc().timestamp())
        }
        TimeColumnType::Timestamp => {
            format!(
                "{} >= '{}'",
                column_sql,
                day_start.format("%Y-%m-%d %H:%M:%S")
            )
        }
    };

    format!(
        "SELECT * FROM ({base}) AS _rivet WHERE {cond}",
        base = base_query,
        cond = predicate,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SourceType;

    /// Single-threaded chunked strategy on `id`; only the checkpoint flag varies.
    fn chunked_on_id(checkpoint: bool) -> ExtractionStrategy {
        ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 10_000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            checkpoint,
            max_attempts: 3,
        })
    }

    #[test]
    fn snapshot_strategy_contracts() {
        let strategy = ExtractionStrategy::Snapshot;
        assert!(!strategy.needs_cursor_state());
        assert!(!strategy.is_resumable());
        assert_eq!(strategy.cursor_column(), None);
        let resolved = strategy.resolve_query("SELECT 1", SourceType::Postgres);
        assert_eq!(resolved.as_deref(), Some("SELECT 1"));
    }

    #[test]
    fn incremental_strategy_contracts() {
        let plan = IncrementalCursorPlan {
            primary_column: "updated_at".into(),
            fallback_column: None,
            mode: IncrementalCursorMode::SingleColumn,
        };
        let strategy = ExtractionStrategy::Incremental(plan);
        assert!(strategy.needs_cursor_state());
        assert!(!strategy.is_resumable());
        assert_eq!(strategy.cursor_column(), Some("updated_at"));
        let resolved = strategy.resolve_query("SELECT * FROM orders", SourceType::Postgres);
        assert_eq!(resolved.as_deref(), Some("SELECT * FROM orders"));
    }

    #[test]
    fn chunked_without_checkpoint_contracts() {
        let strategy = chunked_on_id(false);
        assert!(!strategy.needs_cursor_state());
        assert!(!strategy.is_resumable());
        assert_eq!(strategy.cursor_column(), None);
        assert!(strategy
            .resolve_query("SELECT 1", SourceType::Postgres)
            .is_none());
    }

    #[test]
    fn chunked_with_checkpoint_is_resumable() {
        let strategy = chunked_on_id(true);
        assert!(strategy.is_resumable());
        assert!(strategy
            .resolve_query("SELECT 1", SourceType::Postgres)
            .is_none());
    }

    #[test]
    fn time_window_strategy_contracts() {
        let strategy = ExtractionStrategy::TimeWindow {
            column: "created_at".into(),
            column_type: TimeColumnType::Timestamp,
            days_window: 7,
        };
        assert!(!strategy.needs_cursor_state());
        assert!(!strategy.is_resumable());
        assert_eq!(strategy.cursor_column(), None);
        let sql = strategy
            .resolve_query("SELECT * FROM events", SourceType::Postgres)
            .unwrap();
        assert!(sql.contains("_rivet WHERE"));
        assert!(sql.contains("\"created_at\" >="));
    }

    #[test]
    fn build_time_window_query_timestamp() {
        let sql = build_time_window_query(
            "SELECT * FROM events",
            "created_at",
            TimeColumnType::Timestamp,
            7,
            SourceType::Postgres,
        );
        assert!(sql.contains("\"created_at\" >= '"), "got: {}", sql);
        assert!(sql.contains("_rivet WHERE"));
    }

    #[test]
    fn build_time_window_query_unix() {
        let sql = build_time_window_query(
            "SELECT * FROM events",
            "ts",
            TimeColumnType::Unix,
            30,
            SourceType::Postgres,
        );
        assert!(sql.contains("\"ts\" >= "), "got: {}", sql);
        assert!(
            !sql.contains("'"),
            "unix should not have value quotes, got: {}",
            sql
        );
    }
}