use crate::error::Result;
use crate::tuning::memory::{DEFAULT_MEM_BUDGET_MB, estimate_peak_rss_mb, per_worker_rss_mb};
use super::{InitYamlDestination, TableInfo};
/// Splits a possibly schema-qualified table reference at its first `.`.
///
/// `"billing.invoices"` yields `("billing", "invoices")`; a bare name is
/// assigned the `public` schema. Anything after the first dot (including
/// further dots) is returned as the table part.
pub(super) fn parse_table(table: &str) -> (String, &str) {
    if let Some((schema, name)) = table.split_once('.') {
        return (schema.to_owned(), name);
    }
    (String::from("public"), table)
}
/// Renders a complete `rivet.yaml` (header, `source:` and a single export)
/// for one table, with over-long comments re-wrapped.
///
/// Returns any error produced by `super::source_type(source_url)`.
pub(super) fn generate_config(
    info: &TableInfo,
    source_url: &str,
    provenance: &super::SourceProvenance,
    dest: &InitYamlDestination,
    mode_override: Option<&str>,
) -> Result<String> {
    let st = super::source_type(source_url)?;
    // `public` tables and MySQL tables are shown by bare name; everything else is schema-qualified.
    let qualified_table = if info.schema == "public" || st == "mysql" {
        info.table.clone()
    } else {
        format!("{}.{}", info.schema, info.table)
    };
    let row_note = row_estimate_note(info.row_estimate);
    let header = format!("# Generated by rivet init — {qualified_table} ({row_note})");
    let unbounded = table_has_unbounded_decimal_columns(info);
    let mut lines = config_header_lines(st, &header, unbounded, provenance);
    lines.push("exports:".to_string());
    lines.extend(export_block_lines(info, st, dest, mode_override));
    Ok(wrap_comments(&(lines.join("\n") + "\n")))
}

/// Human-readable row-count note for the config header.
///
/// Values are shown as `~N rows`, `~NK rows` or `~N.NM rows`. The thresholds
/// are chosen so that rounding never produces `~1000K`: anything that would
/// round to 1000K (>= 999_500) is shown in millions instead. A negative
/// estimate is reported as unavailable rather than as a negative count.
/// NOTE(review): negative values presumably come from engines that report
/// "never analyzed" as -1 (e.g. Postgres `reltuples`); confirm upstream.
fn row_estimate_note(rows: i64) -> String {
    if rows < 0 {
        return "row estimate unavailable".to_string();
    }
    if rows >= 999_500 {
        format!("~{:.1}M rows", rows as f64 / 1_000_000.0)
    } else if rows >= 1_000 {
        format!("~{:.0}K rows", rows as f64 / 1_000.0)
    } else {
        format!("~{rows} rows")
    }
}
/// Renders a `rivet.yaml` with one export per entry of `infos`, under a header
/// titled with `scope_label`, with over-long comments re-wrapped.
///
/// The decimal-review note is included when any table has unbounded
/// `numeric`/`decimal` columns. The intro comment differs depending on
/// whether a GCS/S3 bucket destination was requested.
///
/// Returns any error produced by `super::source_type(source_url)`.
pub(super) fn generate_schema_config(
    infos: &[TableInfo],
    source_url: &str,
    provenance: &super::SourceProvenance,
    scope_label: &str,
    dest: &InitYamlDestination,
    mode_override: Option<&str>,
) -> Result<String> {
    let st = super::source_type(source_url)?;
    let title = format!("# Generated by rivet init — {scope_label}");
    let any_unbounded = infos.iter().any(table_has_unbounded_decimal_columns);
    let mut lines = config_header_lines(st, &title, any_unbounded, provenance);

    let writes_to_bucket = dest.gcs_bucket.is_some() || dest.s3_bucket.is_some();
    let intro = if writes_to_bucket {
        "# One export per table/view — per-table prefix `exports/<name>/` under the given bucket; review modes before running."
    } else {
        "# One export per table/view — review modes and destinations before running."
    };
    lines.push(String::from(intro));
    lines.push(String::from("exports:"));
    lines.extend(
        infos
            .iter()
            .flat_map(|info| export_block_lines(info, st, dest, mode_override)),
    );

    let mut yaml = lines.join("\n");
    yaml.push('\n');
    Ok(wrap_comments(&yaml))
}
/// Maximum line width, counted in `char`s, before a line's comment is re-flowed.
const COMMENT_WRAP_WIDTH: usize = 100;

/// Re-flows over-long comments in generated YAML.
///
/// Lines within `COMMENT_WRAP_WIDTH` chars are copied verbatim. Longer lines
/// that contain a comment (per `comment_split`) have the comment word-wrapped
/// by `wrap_comment`; continuation lines reuse the line's leading-space
/// indentation. Long lines without a detectable comment are left untouched.
/// Line terminators are preserved.
fn wrap_comments(yaml: &str) -> String {
    let mut wrapped = String::with_capacity(yaml.len());
    for raw in yaml.split_inclusive('\n') {
        let (body, had_newline) = match raw.strip_suffix('\n') {
            Some(body) => (body, true),
            None => (raw, false),
        };
        let split = if body.chars().count() > COMMENT_WRAP_WIDTH {
            comment_split(body)
        } else {
            None
        };
        let Some((prefix, comment)) = split else {
            wrapped.push_str(raw);
            continue;
        };
        let indent: String = body.chars().take_while(|&ch| ch == ' ').collect();
        wrapped.push_str(&wrap_comment(prefix, &indent, comment));
        if had_newline {
            wrapped.push('\n');
        }
    }
    wrapped
}
/// Splits a YAML line into the text before its comment and the comment body.
///
/// Returns `Some((prefix, comment))`, where `prefix` is everything before the
/// `#`. For a standalone comment line that is the leading whitespace; for an
/// inline comment it is `key: value ` including the separating whitespace.
/// `comment` is the text after `#` with leading whitespace removed. Returns
/// `None` when the line has no comment.
///
/// An inline `#` starts a comment only when it follows a space or tab and is
/// outside single- or double-quoted text. Inside double quotes a backslash
/// escapes the next character, so `\"` does not end the string. Quote
/// tracking is intentionally simple: an apostrophe in a plain scalar also
/// toggles the single-quote state. That errs toward *not* finding a comment,
/// which leaves the line unwrapped rather than corrupting it.
fn comment_split(line: &str) -> Option<(&str, &str)> {
    let trimmed = line.trim_start();
    if let Some(rest) = trimmed.strip_prefix('#') {
        let prefix = &line[..line.len() - trimmed.len()];
        return Some((prefix, rest.trim_start()));
    }
    let bytes = line.as_bytes();
    let (mut in_single, mut in_double, mut escaped) = (false, false, false);
    for (i, &b) in bytes.iter().enumerate() {
        if escaped {
            // Character after a backslash inside "..." is literal.
            escaped = false;
            continue;
        }
        match b {
            b'\\' if in_double => escaped = true,
            b'\'' if !in_double => in_single = !in_single,
            b'"' if !in_single => in_double = !in_double,
            b'#' if !in_single
                && !in_double
                && i > 0
                && matches!(bytes[i - 1], b' ' | b'\t') =>
            {
                // `#` is ASCII, so `i` and `i + 1` are char boundaries.
                return Some((&line[..i], line[i + 1..].trim_start()));
            }
            _ => {}
        }
    }
    None
}
/// Word-wraps `comment` into `#` comment lines of at most `COMMENT_WRAP_WIDTH`
/// chars, joined with `\n` (no trailing newline).
///
/// The first line starts with `prefix`, the content before the `#`.
/// Continuation lines start with `indent` followed by `# `. The first word
/// always stays on the first line, and a word is never split, so a single
/// over-long word can still exceed the width. An empty `comment` yields
/// `"{prefix}#"` with no trailing space.
fn wrap_comment(prefix: &str, indent: &str, comment: &str) -> String {
    let mut lines: Vec<String> = Vec::new();
    let mut cur = format!("{prefix}#");
    // Width of `cur`, tracked incrementally instead of recounting every word.
    let mut cur_width = cur.chars().count();
    let mut has_word = false;
    for word in comment.split_whitespace() {
        let word_width = word.chars().count();
        if has_word && cur_width + 1 + word_width > COMMENT_WRAP_WIDTH {
            lines.push(std::mem::replace(&mut cur, format!("{indent}# {word}")));
            cur_width = indent.chars().count() + 2 + word_width;
        } else {
            cur.push(' ');
            cur.push_str(word);
            cur_width += 1 + word_width;
        }
        has_word = true;
    }
    lines.push(cur);
    lines.join("\n")
}
/// Builds the lines shared by every generated config: the title, a review
/// hint, an optional note about defaulted decimal precision, and the
/// `source:` block (its URL line comes from `source_url_line`).
///
/// Blank lines separate the comment header from `source:` and follow it.
fn config_header_lines(
    source_type: &str,
    title_line: &str,
    unbounded_decimal_note: bool,
    provenance: &super::SourceProvenance,
) -> Vec<String> {
    let review_hint = "# Review and adjust before running: rivet check -c rivet.yaml";
    let decimal_note = "# NOTE: some exports use default decimal(p,s) for NUMERIC without precision in the DDL — search for \"# REVIEW:\" under columns:.";

    let mut out = Vec::with_capacity(8);
    out.push(title_line.to_owned());
    out.push(review_hint.to_owned());
    if unbounded_decimal_note {
        out.push(decimal_note.to_owned());
    }
    out.push(String::new());
    out.push("source:".to_owned());
    out.push(format!(" type: {source_type}"));
    out.push(source_url_line(provenance));
    out.push(String::new());
    out
}
/// Renders the `source:` connection line for how the user supplied the URL.
///
/// - Inline URLs become `url_env: DATABASE_URL`, with a hint to export it, so
///   the literal URL is not written to the file.
/// - `Env(var)` references the variable the user already exported.
/// - `File(path)` reads the URL from that file.
///
/// Variable names and paths are YAML-quoted when needed.
fn source_url_line(provenance: &super::SourceProvenance) -> String {
    use super::SourceProvenance as Provenance;
    match provenance {
        Provenance::Env(var) => format!(
            " url_env: {} # reads the variable you already exported",
            yaml_quote_if_needed(var)
        ),
        Provenance::File(path) => format!(
            " url_file: {} # reads the URL from this file (keep it out of version control)",
            yaml_quote_if_needed(path)
        ),
        Provenance::Inline => String::from(
            " url_env: DATABASE_URL # export DATABASE_URL='<your-url>' before running",
        ),
    }
}
/// Directory/prefix segment that names an export's output.
///
/// Postgres tables outside `public` become `schema__table`; every other table
/// (any other engine, or Postgres `public`) uses the bare table name.
fn export_segment(info: &TableInfo, source_type: &str) -> String {
    let schema_qualified = source_type == "postgres" && info.schema != "public";
    if !schema_qualified {
        return info.table.clone();
    }
    format!("{}__{}", info.schema, info.table)
}
/// Object-store key prefix for an export: `exports/<segment>/`.
fn table_export_prefix(info: &TableInfo, source_type: &str) -> String {
    let segment = export_segment(info, source_type);
    format!("exports/{segment}/")
}
/// Returns `v` as-is when it is safe as a plain YAML scalar, otherwise as an
/// escaped double-quoted YAML scalar.
pub(super) fn yaml_quote_if_needed(v: &str) -> String {
    if !needs_yaml_quoting(v) {
        return v.to_owned();
    }
    yaml_double_quote(v)
}
/// Reports whether `v` must be double-quoted to load back as the same YAML string.
///
/// Deliberately conservative: quoting a value that did not need it is
/// harmless, while missing a case corrupts the document. A value is quoted when it:
/// - is empty or has leading/trailing whitespace;
/// - would resolve to a non-string under YAML 1.1 or 1.2 (see `resolves_to_non_string`);
/// - starts with a YAML indicator character;
/// - contains a control character, a flow/quote/escape character, a `#`
///   preceded by whitespace, or a `:` followed by whitespace or end of input.
fn needs_yaml_quoting(v: &str) -> bool {
    const LEADING_INDICATORS: &[char] = &[
        '!', '&', '*', '@', '`', '|', '>', '%', '?', ':', '-', '[', ']', '{', '}', ',', '#',
        '\'', '"',
    ];
    const INNER_SPECIALS: &[char] = &['[', ']', '{', '}', ',', '"', '\'', '\\', '\t'];

    let Some(first) = v.chars().next() else {
        return true;
    };
    if v.trim() != v || LEADING_INDICATORS.contains(&first) || resolves_to_non_string(v) {
        return true;
    }
    let mut prev: Option<char> = None;
    for (i, c) in v.char_indices() {
        if c.is_control() || INNER_SPECIALS.contains(&c) {
            return true;
        }
        // ` #` starts a comment in a plain scalar.
        if c == '#' && prev.is_some_and(char::is_whitespace) {
            return true;
        }
        // `: ` (or a trailing `:`) starts a mapping value.
        if c == ':' {
            let next = v[i + c.len_utf8()..].chars().next();
            if next.is_none() || next.is_some_and(char::is_whitespace) {
                return true;
            }
        }
        prev = Some(c);
    }
    false
}

/// True when a YAML 1.1 or 1.2 loader would resolve the plain scalar `v` to a
/// bool, null, number, infinity or NaN instead of a string.
fn resolves_to_non_string(v: &str) -> bool {
    let lower = v.to_ascii_lowercase();
    if matches!(
        lower.as_str(),
        "true"
            | "false"
            | "yes"
            | "no"
            | "on"
            | "off"
            | "y"
            | "n"
            | "null"
            | "~"
            | ".inf"
            | "+.inf"
            | "-.inf"
            | ".nan"
    ) {
        return true;
    }
    // Decimal ints and floats. `f64` parsing accepts every literal `i64` does,
    // plus `inf`/`nan` spellings.
    if v.parse::<f64>().is_ok() {
        return true;
    }
    // Radix-prefixed integers such as `0x1F`, `0o17` or `0b101`.
    [("0x", 16), ("0o", 8), ("0b", 2)]
        .iter()
        .any(|&(prefix, radix)| {
            lower.strip_prefix(prefix).is_some_and(|digits| {
                !digits.is_empty() && digits.chars().all(|c| c.is_digit(radix))
            })
        })
}
/// Wraps `v` in a YAML double-quoted scalar, escaping as needed.
///
/// `\`, `"`, newline, carriage return and tab use their short escapes. Every
/// other Unicode control character (C0, DEL and C1, U+0000–U+009F) is written
/// as `\xNN`. C1 controls must be escaped too: YAML's printable character set
/// excludes them, so emitting them raw makes the document fail to load.
/// `needs_yaml_quoting` sends any control character here.
fn yaml_double_quote(v: &str) -> String {
    let mut s = String::with_capacity(v.len() + 2);
    s.push('"');
    for c in v.chars() {
        match c {
            '\\' => s.push_str("\\\\"),
            '"' => s.push_str("\\\""),
            '\n' => s.push_str("\\n"),
            '\r' => s.push_str("\\r"),
            '\t' => s.push_str("\\t"),
            // Category Cc is U+0000–U+001F and U+007F–U+009F, so two hex digits suffice.
            c if c.is_control() => s.push_str(&format!("\\x{:02X}", c as u32)),
            c => s.push(c),
        }
    }
    s.push('"');
    s
}
/// True when `data_type` is exactly `numeric` or `decimal` (ASCII case-insensitive).
fn is_decimal_type(data_type: &str) -> bool {
    ["numeric", "decimal"]
        .iter()
        .any(|name| data_type.eq_ignore_ascii_case(name))
}
/// True when `s` is `name` or `schema.name`, where each part matches
/// `[A-Za-z_][A-Za-z0-9_]*` (ASCII only).
///
/// Used to decide whether the `table:` shortcut can be emitted instead of an
/// explicit SELECT.
fn is_simple_pg_ident(s: &str) -> bool {
    fn is_plain_part(part: &str) -> bool {
        let mut chars = part.chars();
        let Some(head) = chars.next() else {
            return false;
        };
        (head == '_' || head.is_ascii_alphabetic())
            && chars.all(|c| c == '_' || c.is_ascii_alphanumeric())
    }
    match s.split_once('.') {
        // A second dot lands in `name` and fails `is_plain_part`.
        Some((schema, name)) => is_plain_part(schema) && is_plain_part(name),
        None => is_plain_part(s),
    }
}
/// True when any `numeric`/`decimal` column lacks a declared precision or scale.
pub(super) fn table_has_unbounded_decimal_columns(info: &TableInfo) -> bool {
    info.columns
        .iter()
        .filter(|col| is_decimal_type(&col.data_type))
        .any(|col| col.numeric_precision.is_none() || col.numeric_scale.is_none())
}
const INIT_UNBOUNDED_DECIMAL_DEFAULT_PRECISION: u32 = 38;
const INIT_UNBOUNDED_DECIMAL_DEFAULT_SCALE: u32 = 18;
pub(crate) const INIT_DECIMAL_REVIEW_MARKER: &str = "# REVIEW:";
fn init_default_decimal_yaml_line(col_name: &str) -> String {
format!(
" {}: decimal({},{}) # REVIEW: DDL has no numeric(p,s); edit to the real decimal(p,s) or change the column type — values outside this bound may truncate or fail export.",
col_name, INIT_UNBOUNDED_DECIMAL_DEFAULT_PRECISION, INIT_UNBOUNDED_DECIMAL_DEFAULT_SCALE,
)
}
/// Builds the YAML lines for one batch export entry.
///
/// The mode is `mode_override` when given, otherwise `TableInfo::suggest_mode`.
/// `cdc` exports are delegated entirely to `cdc_export_lines`. Other modes
/// emit, in order:
/// 1. name;
/// 2. table reference (`push_relation_lines`);
/// 3. rationale comment and mode;
/// 4. mode-specific keys (`push_mode_specific_lines`);
/// 5. format, parquet tuning and meta columns (`push_output_lines`);
/// 6. destination;
/// 7. decimal overrides.
fn export_block_lines(
    info: &TableInfo,
    source_type: &str,
    dest: &InitYamlDestination,
    mode_override: Option<&str>,
) -> Vec<String> {
    let mode = match mode_override {
        Some(forced) => forced,
        None => info.suggest_mode(),
    };
    // `public` tables and MySQL tables are referenced by bare name; everything else is schema-qualified.
    let bare_name = info.schema == "public" || source_type == "mysql";
    let qualified_table = if bare_name {
        info.table.clone()
    } else {
        format!("{}.{}", info.schema, info.table)
    };
    if mode == "cdc" {
        return cdc_export_lines(info, source_type, dest, &qualified_table);
    }

    let mut lines = Vec::new();
    lines.push(format!(" - name: {}", yaml_quote_if_needed(&info.table)));
    push_relation_lines(&mut lines, info, source_type, mode, &qualified_table);
    lines.push(format!(" # {}", info.mode_rationale(mode)));
    lines.push(format!(" mode: {mode}"));
    push_mode_specific_lines(&mut lines, info, source_type, mode);
    push_output_lines(&mut lines, info, mode);
    lines.extend(destination_scaffold(info, source_type, dest));
    lines.extend(decimal_override_lines(info));
    lines
}

/// Appends either the `table:` shortcut or an explicit `query: >` SELECT.
///
/// The shortcut is used only for full-mode Postgres exports whose qualified
/// name is a simple identifier; every other case enumerates the columns.
fn push_relation_lines(
    lines: &mut Vec<String>,
    info: &TableInfo,
    source_type: &str,
    mode: &str,
    qualified_table: &str,
) {
    let use_table_shortcut =
        mode == "full" && source_type == "postgres" && is_simple_pg_ident(qualified_table);
    if use_table_shortcut {
        lines.push(format!(" table: {qualified_table}"));
        return;
    }
    let col_list = info
        .columns
        .iter()
        .map(|c| c.name.as_str())
        .collect::<Vec<_>>()
        .join(", ");
    lines.push(" query: >".to_string());
    lines.push(format!(" SELECT {col_list}"));
    lines.push(format!(" FROM {qualified_table}"));
}

/// Appends the keys specific to `chunked` and `incremental` modes; other modes add nothing.
fn push_mode_specific_lines(
    lines: &mut Vec<String>,
    info: &TableInfo,
    source_type: &str,
    mode: &str,
) {
    match mode {
        "chunked" => {
            let chunk_col = info.best_chunk_column().unwrap_or("id");
            let parallel = suggest_parallel(info.row_estimate, info.avg_row_bytes(), source_type);
            let chunk_size = info.suggest_chunk_size();
            lines.push(format!(" chunk_column: {}", yaml_quote_if_needed(chunk_col)));
            lines.push(format!(" chunk_size: {chunk_size}"));
            lines.push(
                " chunk_checkpoint: true # record per-chunk progress so `rivet run --resume` can finish a crashed run"
                    .to_string(),
            );
            if parallel.workers > 1 {
                // The memory estimate is only shown when the average row width is known.
                if let Some(b) = info.avg_row_bytes() {
                    lines.push(format!(
                        " # est. peak RSS ≈ {} MB ({} workers × ~{} MB/worker @ ~{} B/row); lower `parallel` to spend less memory",
                        estimate_peak_rss_mb(parallel.workers, b),
                        parallel.workers,
                        per_worker_rss_mb(b),
                        b,
                    ));
                }
                lines.push(format!(" parallel: {}", parallel.workers));
            } else if parallel.wide_mysql_single {
                lines.push(
                    " # parallel: 1 (wide rows on MySQL: single scan is faster than chunks)"
                        .to_string(),
                );
            }
        }
        "incremental" => {
            let cursor = info.best_cursor_column().unwrap_or("updated_at");
            lines.push(format!(" cursor_column: {}", yaml_quote_if_needed(cursor)));
        }
        _ => {}
    }
}

/// Appends the output keys.
///
/// These are `format: parquet`; a `parquet:` tuning block for chunked or
/// >100k-row tables; and the `meta_columns:` block.
fn push_output_lines(lines: &mut Vec<String>, info: &TableInfo, mode: &str) {
    lines.push(" format: parquet".to_string());
    if mode == "chunked" || info.row_estimate > 100_000 {
        lines.push(" parquet:".to_string());
        lines.push(" row_group_strategy: auto".to_string());
        lines.push(format!(" target_row_group_mb: {}", suggest_row_group_mb(info)));
    }
    lines.push(
        " meta_columns: # adds 2 columns to the output (drop this block to omit them)"
            .to_string(),
    );
    lines.push(
        " exported_at: true # _rivet_exported_at: when the row was exported".to_string(),
    );
    lines.push(
        " row_hash: true # _rivet_row_hash: per-row hash for change detection".to_string(),
    );
}
/// Builds the YAML lines for a `mode: cdc` export of `info`.
///
/// Emits:
/// - the table reference;
/// - a `cdc:` block with a checkpoint path and `until_current`;
/// - the engine-specific replication setting: MySQL `server_id`, PostgreSQL
///   `slot`, or SQL Server `capture_instance`;
/// - the destination and any decimal overrides.
///
/// No `meta_columns` or `cursor_column` keys are emitted.
///
/// `qualified_table` and the capture instance are YAML-quoted when needed.
/// The PostgreSQL slot name is lowercased and capped at 63 characters.
/// PostgreSQL accepts only lowercase letters, digits and underscores in
/// replication slot names, with at most `NAMEDATALEN - 1` (63 by default)
/// characters. A table such as `Orders` would otherwise produce a slot the
/// server rejects.
fn cdc_export_lines(
    info: &TableInfo,
    source_type: &str,
    dest: &InitYamlDestination,
    qualified_table: &str,
) -> Vec<String> {
    const PG_MAX_SLOT_NAME_LEN: usize = 63;
    let ident = cdc_ident(&info.table);
    let mut lines = vec![
        format!(" - name: {}", yaml_quote_if_needed(&info.table)),
        " # change data capture — reads the transaction log (grants: docs/reference/cdc.md)"
            .to_string(),
        format!(" table: {}", yaml_quote_if_needed(qualified_table)),
        " mode: cdc".to_string(),
        " format: parquet".to_string(),
        " cdc:".to_string(),
        format!(
            " checkpoint: ./cdc/{}.ckpt # resume position; omit to tail from now",
            ident
        ),
        " until_current: true # drain to the current log end and exit (good for a scheduler); omit to stream"
            .to_string(),
    ];
    match source_type {
        "mysql" => lines.push(
            " server_id: 4271 # unique replica id; source needs binlog_format=ROW + a REPLICATION SLAVE grant"
                .to_string(),
        ),
        "postgres" => {
            let slot: String = format!("rivet_{}", ident.to_ascii_lowercase())
                .chars()
                .take(PG_MAX_SLOT_NAME_LEN)
                .collect();
            lines.push(format!(
                " slot: {} # logical slot (auto-created); source needs wal_level=logical + a REPLICATION role",
                slot
            ));
        }
        "mssql" => {
            let instance = format!("{}_{}", info.schema, info.table);
            lines.push(format!(
                " capture_instance: {} # sp_cdc_enable_table instance; needs CDC enabled + SQL Server Agent",
                yaml_quote_if_needed(&instance)
            ));
        }
        _ => {}
    }
    lines.extend(destination_scaffold(info, source_type, dest));
    lines.extend(decimal_override_lines(info));
    lines
}
fn decimal_override_lines(info: &TableInfo) -> Vec<String> {
let decimal_cols: Vec<&super::ColumnInfo> = info
.columns
.iter()
.filter(|c| is_decimal_type(&c.data_type))
.collect();
let mut lines = Vec::new();
if !decimal_cols.is_empty() {
lines.push(" columns:".to_string());
for col in decimal_cols {
match (col.numeric_precision, col.numeric_scale) {
(Some(p), Some(s)) => {
lines.push(format!(" {}: decimal({p},{s})", col.name));
}
_ => {
lines.push(init_default_decimal_yaml_line(&col.name));
}
}
}
}
lines
}
/// Maps `s` to an identifier-safe form.
///
/// ASCII letters, digits and `_` are kept; every other char (including any
/// non-ASCII char) becomes a single `_`. The result is therefore pure ASCII
/// with the same number of chars as `s`.
fn cdc_ident(s: &str) -> String {
    let mut ident = String::with_capacity(s.len());
    for ch in s.chars() {
        let keep = ch == '_' || ch.is_ascii_alphanumeric();
        ident.push(if keep { ch } else { '_' });
    }
    ident
}
/// Builds the `destination:` block for an export.
///
/// A GCS bucket takes precedence over S3. Both use the per-table
/// `exports/<segment>/` prefix, plus `credentials_file` (GCS) or `region`
/// (S3) when provided. Without a bucket, a local `./output/<segment>/` path is
/// used. All user-supplied values are YAML-quoted when needed.
fn destination_scaffold(
    info: &TableInfo,
    source_type: &str,
    dest: &InitYamlDestination,
) -> Vec<String> {
    let prefix = yaml_quote_if_needed(&table_export_prefix(info, source_type));
    // Both object stores share the same four leading lines.
    let object_store = |type_line: &str, bucket: String| -> Vec<String> {
        vec![
            " destination:".to_string(),
            type_line.to_string(),
            format!(" bucket: {bucket}"),
            format!(" prefix: {prefix}"),
        ]
    };
    match (&dest.gcs_bucket, &dest.s3_bucket) {
        (Some(gcs), _) => {
            let mut block = object_store(" type: gcs", yaml_quote_if_needed(gcs));
            if let Some(creds) = &dest.gcs_credentials_file {
                block.push(format!(" credentials_file: {}", yaml_quote_if_needed(creds)));
            }
            block
        }
        (None, Some(s3)) => {
            let mut block = object_store(" type: s3", yaml_quote_if_needed(s3));
            if let Some(region) = &dest.s3_region {
                block.push(format!(" region: {}", yaml_quote_if_needed(region)));
            }
            block
        }
        (None, None) => {
            let local_dir = format!("./output/{}/", export_segment(info, source_type));
            vec![
                " destination:".to_string(),
                " type: local".to_string(),
                " # rivet also writes manifest.json + _SUCCESS here (its metadata)".to_string(),
                format!(" path: {}", yaml_quote_if_needed(&local_dir)),
            ]
        }
    }
}
/// Suggests a Parquet `target_row_group_mb`.
///
/// Returns 64 when the table has at least five wide variable-length columns
/// (text, JSON, XML or binary), otherwise 128. The type list covers the
/// PostgreSQL, MySQL and SQL Server spellings of those types, so wide
/// MySQL/SQL Server tables are treated the same as wide Postgres tables.
fn suggest_row_group_mb(info: &TableInfo) -> u64 {
    // Number of wide columns at which the smaller row-group target kicks in.
    const WIDE_COLUMN_THRESHOLD: usize = 5;
    let wide_cols = info
        .columns
        .iter()
        .filter(|c| {
            matches!(
                c.data_type.to_ascii_lowercase().as_str(),
                "text"
                    | "varchar"
                    | "character varying"
                    | "jsonb"
                    | "json"
                    | "bytea"
                    | "xml"
                    | "mediumtext"
                    | "longtext"
                    | "blob"
                    | "mediumblob"
                    | "longblob"
                    | "varbinary"
                    | "nvarchar"
                    | "ntext"
            )
        })
        .count();
    if wide_cols >= WIDE_COLUMN_THRESHOLD { 64 } else { 128 }
}
/// Suggests how many parallel chunk workers to configure.
///
/// On MySQL, rows averaging at least 1024 bytes force a single worker
/// (`wide_mysql_single = true`). Otherwise the count scales with `rows`:
/// 1 below 500k, 2 below 5M, else 4. When `avg_row_bytes` is known, that
/// count is further capped by `memory_capped_parallel` against
/// `DEFAULT_MEM_BUDGET_MB`.
fn suggest_parallel(rows: i64, avg_row_bytes: Option<i64>, source_type: &str) -> ParallelChoice {
    const WIDE_BYTES: i64 = 1024;
    let wide_rows = avg_row_bytes.is_some_and(|b| b >= WIDE_BYTES);
    if wide_rows && source_type == "mysql" {
        return ParallelChoice {
            workers: 1,
            wide_mysql_single: true,
        };
    }
    let by_rows: usize = if rows < 500_000 {
        1
    } else if rows < 5_000_000 {
        2
    } else {
        4
    };
    let workers = avg_row_bytes.map_or(by_rows, |b| {
        memory_capped_parallel(by_rows, b, DEFAULT_MEM_BUDGET_MB)
    });
    ParallelChoice {
        workers,
        wide_mysql_single: false,
    }
}
/// Parallelism suggestion produced by `suggest_parallel`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ParallelChoice {
    /// Suggested worker count; written as `parallel:` when greater than 1.
    workers: usize,
    /// `true` when `workers` was forced to 1 because rows are wide on MySQL;
    /// the generator then emits an explanatory comment instead of `parallel:`.
    wide_mysql_single: bool,
}
/// Caps `suggested` workers so the estimated RSS fits within `budget_mb`.
///
/// A fixed `PROCESS_BASE_MB` is reserved from the budget, and the rest is
/// divided by the per-worker estimate from `per_worker_rss_mb`, which is
/// clamped to at least 1 MB to avoid division by zero. The result is always
/// at least 1 and never more than `suggested`.
fn memory_capped_parallel(suggested: usize, avg_row_bytes: i64, budget_mb: u64) -> usize {
    const PROCESS_BASE_MB: u64 = 16;
    let per_worker = per_worker_rss_mb(avg_row_bytes).max(1);
    let affordable = (budget_mb.saturating_sub(PROCESS_BASE_MB) / per_worker).max(1);
    // Saturate rather than truncate when `usize` is narrower than `u64`.
    suggested.min(usize::try_from(affordable).unwrap_or(usize::MAX))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::{ColumnInfo, TableInfo};

    const PG_URL: &str = "postgresql://rivet:rivet@localhost/rivet";
    const MYSQL_URL: &str = "mysql://rivet:rivet@localhost/rivet";

    /// Runs `generate_config` the way every test here does: inline source
    /// URL, default destination, optional mode override.
    fn render(info: &TableInfo, source_url: &str, mode_override: Option<&str>) -> String {
        generate_config(
            info,
            source_url,
            &crate::init::SourceProvenance::Inline,
            &InitYamlDestination::default(),
            mode_override,
        )
        .unwrap()
    }

    /// Asserts every line of `out` fits within `COMMENT_WRAP_WIDTH` chars.
    #[track_caller]
    fn assert_within_width(out: &str) {
        for l in out.lines() {
            assert!(l.chars().count() <= COMMENT_WRAP_WIDTH, "over width: {l:?}");
        }
    }

    fn table_info(schema: &str, table: &str, rows: i64, cols: Vec<ColumnInfo>) -> TableInfo {
        TableInfo {
            schema: schema.into(),
            table: table.into(),
            row_estimate: rows,
            total_bytes: None,
            columns: cols,
        }
    }

    fn make_table(cols: Vec<ColumnInfo>) -> TableInfo {
        table_info("public", "payments", 100, cols)
    }

    fn chunked_table(rows: i64, cols: Vec<ColumnInfo>) -> TableInfo {
        table_info("public", "events", rows, cols)
    }

    fn col(name: &str, ty: &str) -> ColumnInfo {
        ColumnInfo {
            name: name.into(),
            data_type: ty.into(),
            is_primary_key: false,
            is_nullable: true,
            numeric_precision: None,
            numeric_scale: None,
        }
    }

    fn decimal_col(name: &str, p: u32, s: u32) -> ColumnInfo {
        ColumnInfo {
            numeric_precision: Some(p),
            numeric_scale: Some(s),
            ..col(name, "numeric")
        }
    }

    fn unbounded_col(name: &str) -> ColumnInfo {
        col(name, "numeric")
    }

    #[test]
    fn wrap_long_standalone_comment_by_words() {
        let yaml = "exports:\n # auto: this is a very long generated comment that goes well beyond one hundred columns so it has to wrap onto several lines\n mode: full\n";
        let out = wrap_comments(yaml);
        assert_within_width(&out);
        assert!(out.starts_with("exports:\n # auto: "), "first comment line kept");
        assert!(out.contains("\n # "), "continuation comment at the same indent");
        assert!(out.contains(" mode: full\n"), "value line untouched");
    }

    #[test]
    fn wrap_leaves_quoted_value_with_hash_untouched() {
        let long = "x".repeat(140);
        let yaml = format!(" query: \"SELECT '{long}' -- not # a real comment\"\n");
        assert_eq!(
            wrap_comments(&yaml),
            yaml,
            "a quoted scalar containing # must stay byte-identical"
        );
    }

    #[test]
    fn wrap_long_inline_comment_continues_below() {
        let yaml = " chunk_checkpoint: true # record per-chunk progress so the resume command can finish a crashed run without redoing work\n";
        let out = wrap_comments(yaml);
        assert_within_width(&out);
        assert!(
            out.starts_with(" chunk_checkpoint: true # "),
            "value + first comment chunk stay inline"
        );
        assert!(out.contains("\n # "), "continuation comment at line indent");
    }

    #[test]
    fn wrap_leaves_short_comment_untouched() {
        let yaml = " mode: full # full table scan\n";
        assert_eq!(wrap_comments(yaml), yaml);
    }

    #[test]
    fn suggest_parallel_is_cost_and_engine_aware() {
        let big = 10_000_000;
        let wide = Some(4096);
        let narrow = Some(40);
        assert_eq!(suggest_parallel(big, wide, "mysql").workers, 1);
        assert_eq!(suggest_parallel(500_000, wide, "mysql").workers, 1);
        assert_eq!(suggest_parallel(big, narrow, "mysql").workers, 4);
        assert_eq!(suggest_parallel(big, narrow, "postgres").workers, 4);
        assert_eq!(suggest_parallel(big, wide, "postgres").workers, 4);
        assert_eq!(suggest_parallel(big, wide, "mssql").workers, 4);
        assert_eq!(suggest_parallel(big, None, "mysql").workers, 4);
        assert_eq!(suggest_parallel(100_000, None, "postgres").workers, 1);
    }

    #[test]
    fn peak_rss_estimate_matches_sweep() {
        let wide = 4096;
        let narrow = 40;
        let near = |est: u64, measured: u64| {
            let d = est.abs_diff(measured);
            assert!(
                d * 100 / measured <= 12,
                "estimate {est} too far from measured {measured}"
            );
        };
        near(estimate_peak_rss_mb(4, wide), 444);
        near(estimate_peak_rss_mb(1, narrow), 34);
        near(estimate_peak_rss_mb(4, narrow), 92);
        near(estimate_peak_rss_mb(8, narrow), 169);
        assert_eq!(per_worker_rss_mb(40), 18);
        assert!(per_worker_rss_mb(65_536) <= 130, "must clamp to ceiling");
    }

    #[test]
    fn memory_cap_binds_only_under_budget() {
        assert_eq!(memory_capped_parallel(4, 4096, 256), 2);
        assert_eq!(memory_capped_parallel(4, 4096, 2048), 4);
        assert_eq!(memory_capped_parallel(4, 65_536, 1), 1);
    }

    #[test]
    fn cdc_mode_scaffolds_cdc_block_not_batch() {
        let info = make_table(vec![col("id", "bigint"), decimal_col("amount", 18, 2)]);
        let yaml = render(&info, MYSQL_URL, Some("cdc"));
        assert!(yaml.contains("mode: cdc"), "got:\n{yaml}");
        assert!(yaml.contains(" cdc:"), "cdc block missing:\n{yaml}");
        assert!(yaml.contains("checkpoint:"), "got:\n{yaml}");
        assert!(yaml.contains("server_id:"), "got:\n{yaml}");
        assert!(
            !yaml.contains("meta_columns"),
            "cdc must not emit meta_columns:\n{yaml}"
        );
        assert!(
            !yaml.contains("cursor_column"),
            "cdc must not emit cursor_column:\n{yaml}"
        );
        assert!(yaml.contains("amount: decimal(18,2)"), "got:\n{yaml}");
    }

    #[test]
    fn decimal_with_precision_emits_override() {
        let info = make_table(vec![col("id", "bigint"), decimal_col("amount", 18, 2)]);
        let yaml = render(&info, PG_URL, None);
        assert!(yaml.contains(" columns:"), "columns block missing");
        assert!(
            yaml.contains(" amount: decimal(18,2)"),
            "decimal override missing:\n{yaml}"
        );
        assert!(
            !yaml.contains("id:"),
            "non-decimal column must not appear in columns block"
        );
    }

    #[test]
    fn unbounded_decimal_emits_default_with_review_marker() {
        let info = make_table(vec![col("id", "bigint"), unbounded_col("price")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains("# NOTE: some exports use default decimal"),
            "header NOTE missing:\n{yaml}"
        );
        assert!(
            yaml.contains(" price: decimal(38,18) # REVIEW:"),
            "default decimal with REVIEW missing:\n{yaml}"
        );
    }

    #[test]
    fn no_decimal_columns_no_columns_block() {
        let info = make_table(vec![col("id", "bigint"), col("label", "text")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            !yaml.contains(" columns:"),
            "columns block must not appear:\n{yaml}"
        );
    }

    #[test]
    fn chunked_large_table_emits_parquet_block() {
        let info = chunked_table(
            2_000_000,
            vec![
                col("id", "bigint"),
                col("name", "text"),
                col("ts", "timestamptz"),
            ],
        );
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains(" parquet:"),
            "parquet block must be emitted for chunked large table:\n{yaml}"
        );
        assert!(
            yaml.contains(" row_group_strategy: auto"),
            "auto strategy must be present:\n{yaml}"
        );
        assert!(
            yaml.contains(" target_row_group_mb: 128"),
            "128 MB target expected for narrow table:\n{yaml}"
        );
    }

    #[test]
    fn wide_table_suggests_smaller_row_group_mb() {
        let info = chunked_table(
            5_000_000,
            vec![
                col("id", "bigint"),
                col("body", "text"),
                col("raw_html", "text"),
                col("metadata", "jsonb"),
                col("extra", "json"),
                col("notes", "text"),
            ],
        );
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains(" target_row_group_mb: 64"),
            "wide table (≥5 text/json cols) must suggest 64 MB:\n{yaml}"
        );
    }

    #[test]
    fn small_full_mode_table_has_no_parquet_block() {
        let info = make_table(vec![col("id", "bigint"), col("label", "text")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            !yaml.contains(" parquet:"),
            "small full-mode table must not emit parquet block:\n{yaml}"
        );
    }

    #[test]
    fn full_mode_pg_emits_table_shortcut_not_select_query() {
        let info = make_table(vec![col("id", "bigint"), col("name", "text")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains(" table: payments"),
            "full-mode PG export should emit `table:` shortcut:\n{yaml}"
        );
        assert!(
            !yaml.contains(" query: >"),
            "explicit SELECT block should be replaced by `table:`:\n{yaml}"
        );
        assert!(
            !yaml.contains("SELECT id"),
            "no enumerated SELECT for the table-shortcut form:\n{yaml}"
        );
    }

    #[test]
    fn full_mode_pg_non_public_schema_emits_qualified_table() {
        let info = table_info("billing", "invoices", 100, vec![col("id", "bigint")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains(" table: billing.invoices"),
            "qualified table name expected for non-public schema:\n{yaml}"
        );
    }

    #[test]
    fn full_mode_mysql_keeps_select_query_form() {
        let info = make_table(vec![col("id", "bigint"), col("name", "text")]);
        let yaml = render(&info, MYSQL_URL, None);
        assert!(
            yaml.contains(" query: >"),
            "MySQL full-mode should keep the explicit SELECT form:\n{yaml}"
        );
        assert!(
            !yaml.contains(" table: "),
            "no `table:` shortcut for MySQL:\n{yaml}"
        );
    }

    #[test]
    fn chunked_mode_keeps_select_query_form() {
        let info = chunked_table(2_000_000, vec![col("id", "bigint"), col("name", "text")]);
        let yaml = render(&info, PG_URL, None);
        assert!(
            yaml.contains(" query: >"),
            "chunked mode should preserve explicit SELECT:\n{yaml}"
        );
        assert!(
            !yaml.contains(" table: "),
            "no `table:` shortcut for chunked mode:\n{yaml}"
        );
    }
}