Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13    recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health verdict for one export's preflight, as produced by
/// `analysis::compute_verdict`.
///
/// Variants are listed from healthiest to least healthy. Plain copy and
/// equality derives let callers compare verdicts with `==` instead of
/// `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// No action needed; `build_suggestion` returns no suggestion for it.
    Efficient,
    /// Workable, though large exports may still receive a suggestion.
    Acceptable,
    /// Likely slow, e.g. a scan with neither an index nor a cursor.
    Degraded,
    /// Risky to run as configured, e.g. a very large scan without an index.
    Unsafe,
}
32
33impl std::fmt::Display for HealthVerdict {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Efficient => write!(f, "EFFICIENT"),
37            Self::Acceptable => write!(f, "ACCEPTABLE"),
38            Self::Degraded => write!(f, "DEGRADED"),
39            Self::Unsafe => write!(f, "UNSAFE"),
40        }
41    }
42}
43
44pub(crate) struct ExportDiagnostic {
45    pub export_name: String,
46    pub strategy: String,
47    pub mode: String,
48    pub cursor_column: Option<String>,
49    pub row_estimate: Option<i64>,
50    pub cursor_min: Option<String>,
51    pub cursor_max: Option<String>,
52    pub scan_type: Option<String>,
53    pub uses_index: bool,
54    pub verdict: HealthVerdict,
55    pub recommended_profile: &'static str,
56    pub recommended_parallel: (u32, &'static str),
57    pub warnings: Vec<String>,
58    pub suggestion: Option<String>,
59}
60
61/// Return the diagnostic for a single export without printing anything.
62///
63/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
64pub(crate) fn get_export_diagnostic(
65    config: &Config,
66    export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68    let url = config.source.resolve_url()?;
69    let tls = config.source.tls.as_ref();
70    crate::source::warn_if_tls_disabled(&config.source);
71    match config.source.source_type {
72        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74    }
75}
76
77pub fn check(
78    config_path: &str,
79    export_name: Option<&str>,
80    params: Option<&std::collections::HashMap<String, String>>,
81    show_type_report: bool,
82    strict: bool,
83    json_output: bool,
84    target: Option<ExportTarget>,
85) -> Result<()> {
86    let config = Config::load_with_params(config_path, params)?;
87
88    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
89        let e = config
90            .exports
91            .iter()
92            .find(|e| e.name == name)
93            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
94        vec![e]
95    } else {
96        config.exports.iter().collect()
97    };
98
99    let url = config.source.resolve_url()?;
100    let tls = config.source.tls.as_ref();
101    // Surface the plaintext-transport warning at preflight time too —
102    // operators should hear it from `rivet check` before they wait
103    // through a full `rivet run` to learn the same thing. `Once` inside
104    // the helper keeps emission to one line per process even when both
105    // `check` and `run` flow through it.
106    crate::source::warn_if_tls_disabled(&config.source);
107    match config.source.source_type {
108        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
109        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
110    }
111
112    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
113    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
114    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
115    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
116    // and has side effects) — but we *do* call `create_destination`, which
117    // resolves env vars / credentials_file existence at construction time.
118    // Each unique destination is probed once per `check` to keep multi-export
119    // configs cheap.
120    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
121    for export in &exports {
122        let dest_key = format!(
123            "{:?}:{}:{}:{}",
124            export.destination.destination_type,
125            export.destination.bucket.as_deref().unwrap_or("-"),
126            export.destination.endpoint.as_deref().unwrap_or("-"),
127            export.destination.path.as_deref().unwrap_or("-"),
128        );
129        if !seen_destinations.insert(dest_key) {
130            continue;
131        }
132        let expanded = crate::plan::build::expand_destination_templates(
133            export.destination.clone(),
134            &export.name,
135        );
136        crate::destination::create_destination(&expanded).map_err(|e| {
137            anyhow::anyhow!(
138                "export '{}': destination preflight failed: {:#}",
139                export.name,
140                e
141            )
142        })?;
143    }
144
145    if show_type_report {
146        let policy = if strict {
147            TypePolicy::strict()
148        } else {
149            TypePolicy::warn_only()
150        };
151
152        let mut any_fatal = false;
153        for export in &exports {
154            let column_overrides =
155                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
156            // CLI `--target` wins; otherwise fall back to the per-export
157            // `target:` from the config (slice #2a). A declared-but-unknown
158            // target is a loud error — never silently ignored.
159            if let Some(t) = export.target.as_deref()
160                && crate::types::target::ExportTarget::parse(t).is_none()
161            {
162                anyhow::bail!(
163                    "export '{}': unknown target '{t}' (expected: bigquery, duckdb)",
164                    export.name
165                );
166            }
167            let eff_target = target.or_else(|| {
168                export
169                    .target
170                    .as_deref()
171                    .and_then(crate::types::target::ExportTarget::parse)
172            });
173            let config_dir = std::path::Path::new(config_path)
174                .parent()
175                .unwrap_or_else(|| std::path::Path::new("."));
176            match type_report::collect_report(
177                &config,
178                export,
179                &column_overrides,
180                &policy,
181                eff_target,
182                config_dir,
183                params,
184            ) {
185                Ok(report) => {
186                    if report.has_fatal() {
187                        any_fatal = true;
188                    }
189                    if eff_target.is_some() && report.has_target_fail() {
190                        any_fatal = true;
191                    }
192                    if json_output {
193                        type_report::print_json(&report)?;
194                    } else {
195                        type_report::print_table(&report, eff_target);
196                    }
197                }
198                Err(e) => {
199                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
200                }
201            }
202        }
203
204        if strict && any_fatal {
205            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
206        }
207    }
208
209    Ok(())
210}
211
212fn print_diagnostic(diag: &ExportDiagnostic) {
213    println!();
214    println!("Export: {}", diag.export_name);
215    println!("  Strategy:     {}", diag.strategy);
216    println!("  Mode:         {}", diag.mode);
217    if let Some(est) = diag.row_estimate {
218        if est >= 1_000_000 {
219            println!("  Row estimate: ~{}M", est / 1_000_000);
220        } else if est >= 1_000 {
221            println!("  Row estimate: ~{}K", est / 1_000);
222        } else {
223            println!("  Row estimate: ~{}", est);
224        }
225    }
226    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
227        println!("  Cursor range: {} .. {}", min_v, max_v);
228    }
229    if let Some(col) = &diag.cursor_column {
230        println!("  Cursor col:   {}", col);
231    }
232    if let Some(scan) = &diag.scan_type {
233        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
234        println!("  Scan type:    {}{}", scan, idx_label);
235    }
236    println!("  Verdict:      {}", diag.verdict);
237    println!(
238        "  Recommended:  tuning.profile: {}",
239        diag.recommended_profile
240    );
241    let (par_level, par_reason) = diag.recommended_parallel;
242    if par_level > 1 {
243        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
244    } else {
245        println!("  Parallelism:  {} ({})", par_level, par_reason);
246    }
247    for w in &diag.warnings {
248        println!("  Warning:      {}", w);
249    }
250    if let Some(suggestion) = &diag.suggestion {
251        println!("  Suggestion:   {}", suggestion);
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258    use crate::config::{
259        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
260        IncrementalCursorMode, MetaColumns, TimeColumnType,
261    };
262    use doctor::{
263        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
264    };
265
266    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
267        ExportConfig {
268            name: name.to_string(),
269            target: None,
270            verify: crate::config::VerifyMode::Size,
271            query: Some("SELECT * FROM t".to_string()),
272            query_file: None,
273            table: None,
274            mode,
275            cursor_column: cursor.map(|s| s.to_string()),
276            cursor_fallback_column: None,
277            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
278            chunk_column: None,
279            chunk_size: 100_000,
280            chunk_size_memory_mb: None,
281            chunk_count: None,
282            parallel: 1,
283            time_column: None,
284            time_column_type: TimeColumnType::Timestamp,
285            days_window: None,
286            format: FormatType::Csv,
287            compression: CompressionType::default(),
288            compression_level: None,
289            compression_profile: None,
290            skip_empty: false,
291            destination: DestinationConfig {
292                destination_type: DestinationType::Local,
293                path: Some("./out".to_string()),
294                ..Default::default()
295            },
296            meta_columns: MetaColumns::default(),
297            quality: None,
298            max_file_size: None,
299            chunk_checkpoint: false,
300            chunk_max_attempts: None,
301            tuning: None,
302            chunk_dense: false,
303            chunk_by_days: None,
304            chunk_by_key: None,
305            source_group: None,
306            reconcile_required: false,
307            columns: Default::default(),
308            on_schema_drift: Default::default(),
309            shape_drift_warn_factor: None,
310            parquet: None,
311        }
312    }
313
314    #[test]
315    fn verdict_small_indexed_with_cursor_is_efficient() {
316        let v = compute_verdict(Some(500_000), true, true);
317        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
318    }
319
320    #[test]
321    fn verdict_large_indexed_with_cursor_is_acceptable() {
322        let v = compute_verdict(Some(20_000_000), true, true);
323        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
324    }
325
326    #[test]
327    fn verdict_no_index_no_cursor_is_degraded() {
328        let v = compute_verdict(Some(500_000), false, false);
329        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
330    }
331
332    #[test]
333    fn verdict_huge_no_index_is_unsafe() {
334        let v = compute_verdict(Some(100_000_000), false, false);
335        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
336    }
337
338    #[test]
339    fn parse_pg_row_estimate_from_sort_plan() {
340        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
341        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
342    }
343
344    #[test]
345    fn parse_pg_row_estimate_from_index_scan() {
346        let plan =
347            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
348        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
349    }
350
351    #[test]
352    fn extract_scan_type_detects_seq_scan() {
353        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
354        let st = extract_scan_type(plan);
355        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
356    }
357
358    #[test]
359    fn extract_scan_type_detects_index_scan() {
360        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
361        let st = extract_scan_type(plan);
362        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
363    }
364
365    #[test]
366    fn suggestion_for_efficient_verdict_is_none() {
367        let e = make_export("t", ExportMode::Full, None);
368        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
369        assert!(
370            s.is_none(),
371            "efficient verdict should produce no suggestion"
372        );
373    }
374
375    #[test]
376    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
377        let e = make_export("t", ExportMode::Full, None);
378        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
379        let msg = s.expect("degraded verdict should produce a suggestion");
380        assert!(
381            msg.contains("safe"),
382            "suggestion should recommend safe profile, got: {msg}"
383        );
384    }
385
386    fn src_err(msg: &str) -> &'static str {
387        categorize_source_error(&anyhow::anyhow!("{}", msg))
388    }
389
390    #[test]
391    fn source_password_rejected_is_auth_error() {
392        assert_eq!(
393            src_err("password authentication failed for user \"rivet\""),
394            "auth error"
395        );
396    }
397
398    #[test]
399    fn source_authentication_failed_is_auth_error() {
400        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
401    }
402
403    #[test]
404    fn source_access_denied_is_auth_error() {
405        assert_eq!(
406            src_err("Access denied for user 'rivet'@'localhost'"),
407            "auth error"
408        );
409    }
410
411    #[test]
412    fn source_connection_refused_is_connectivity() {
413        assert_eq!(
414            src_err("connection refused (os error 61)"),
415            "connectivity error"
416        );
417    }
418
419    #[test]
420    fn source_timed_out_is_connectivity() {
421        assert_eq!(src_err("connection timed out"), "connectivity error");
422    }
423
424    #[test]
425    fn source_dns_translate_host_is_connectivity() {
426        assert_eq!(
427            src_err("could not translate host name \"db.bad\" to address"),
428            "connectivity error"
429        );
430    }
431
432    #[test]
433    fn source_name_not_known_is_connectivity() {
434        assert_eq!(src_err("Name or service not known"), "connectivity error");
435    }
436
437    #[test]
438    fn source_unknown_error_is_generic() {
439        assert_eq!(src_err("something totally unexpected"), "error");
440    }
441
442    fn dest_config(dtype: DestinationType) -> DestinationConfig {
443        DestinationConfig {
444            destination_type: dtype,
445            bucket: Some("b".to_string()),
446            ..Default::default()
447        }
448    }
449
450    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
451        let cfg = dest_config(dtype);
452        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
453    }
454
455    #[test]
456    fn dest_credential_loading_is_auth_error() {
457        assert_eq!(
458            dest_err(
459                "loading credential to sign http request",
460                DestinationType::Gcs
461            ),
462            "auth error"
463        );
464    }
465
466    #[test]
467    fn dest_permission_denied_is_auth_error() {
468        assert_eq!(
469            dest_err("permission denied on resource bucket", DestinationType::S3),
470            "auth error"
471        );
472    }
473
474    #[test]
475    fn dest_forbidden_is_auth_error() {
476        assert_eq!(
477            dest_err("403 Forbidden", DestinationType::Gcs),
478            "auth error"
479        );
480    }
481
482    #[test]
483    fn dest_unauthorized_is_auth_error() {
484        assert_eq!(
485            dest_err("401 Unauthorized", DestinationType::S3),
486            "auth error"
487        );
488    }
489
490    #[test]
491    fn dest_invalid_grant_is_auth_error() {
492        assert_eq!(
493            dest_err(
494                "invalid_grant: token has been revoked",
495                DestinationType::Gcs
496            ),
497            "auth error"
498        );
499    }
500
501    #[test]
502    fn dest_nosuchbucket_s3_is_bucket_not_found() {
503        assert_eq!(
504            dest_err(
505                "NoSuchBucket: the specified bucket does not exist",
506                DestinationType::S3
507            ),
508            "bucket not found"
509        );
510    }
511
512    #[test]
513    fn dest_not_found_gcs_is_bucket_not_found() {
514        assert_eq!(
515            dest_err("bucket not found (404)", DestinationType::Gcs),
516            "bucket not found"
517        );
518    }
519
520    #[test]
521    fn dest_not_found_local_is_path_not_found() {
522        assert_eq!(
523            dest_err("path not found: /tmp/missing", DestinationType::Local),
524            "path not found"
525        );
526    }
527
528    #[test]
529    fn dest_connection_refused_is_connectivity() {
530        assert_eq!(
531            dest_err("connection refused to endpoint", DestinationType::S3),
532            "connectivity error"
533        );
534    }
535
536    #[test]
537    fn dest_dns_error_is_connectivity() {
538        assert_eq!(
539            dest_err("dns error: failed to lookup address", DestinationType::S3),
540            "connectivity error"
541        );
542    }
543
544    #[test]
545    fn dest_timed_out_is_connectivity() {
546        assert_eq!(
547            dest_err("request timed out after 30s", DestinationType::Gcs),
548            "connectivity error"
549        );
550    }
551
552    #[test]
553    fn dest_unknown_error_is_generic() {
554        assert_eq!(
555            dest_err("something else entirely", DestinationType::S3),
556            "error"
557        );
558    }
559
560    #[test]
561    fn strategy_full_scan() {
562        let e = make_export("t", ExportMode::Full, None);
563        assert_eq!(derive_strategy(&e), "full-scan");
564    }
565
566    #[test]
567    fn strategy_full_parallel() {
568        let mut e = make_export("t", ExportMode::Full, None);
569        e.parallel = 4;
570        assert_eq!(derive_strategy(&e), "full-parallel(4)");
571    }
572
573    #[test]
574    fn strategy_incremental() {
575        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
576        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
577    }
578
579    #[test]
580    fn strategy_chunked() {
581        let mut e = make_export("t", ExportMode::Chunked, None);
582        e.chunk_column = Some("id".to_string());
583        e.chunk_size = 50_000;
584        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
585    }
586
587    #[test]
588    fn strategy_chunked_parallel() {
589        let mut e = make_export("t", ExportMode::Chunked, None);
590        e.chunk_column = Some("id".to_string());
591        e.chunk_size = 50_000;
592        e.parallel = 3;
593        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
594    }
595
596    #[test]
597    fn strategy_time_window() {
598        let mut e = make_export("t", ExportMode::TimeWindow, None);
599        e.time_column = Some("created_at".to_string());
600        e.days_window = Some(7);
601        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
602    }
603
604    #[test]
605    fn profile_small_indexed_is_fast() {
606        let e = make_export("t", ExportMode::Full, None);
607        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
608    }
609
610    #[test]
611    fn profile_medium_indexed_is_balanced() {
612        let e = make_export("t", ExportMode::Full, None);
613        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
614    }
615
616    #[test]
617    fn profile_large_indexed_is_safe() {
618        let e = make_export("t", ExportMode::Full, None);
619        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
620    }
621
622    #[test]
623    fn profile_small_no_index_is_balanced() {
624        let e = make_export("t", ExportMode::Full, None);
625        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
626    }
627
628    #[test]
629    fn profile_small_no_index_parallel_is_safe() {
630        let mut e = make_export("t", ExportMode::Full, None);
631        e.parallel = 4;
632        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
633    }
634
635    #[test]
636    fn profile_medium_no_index_is_balanced() {
637        let e = make_export("t", ExportMode::Full, None);
638        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
639    }
640
641    #[test]
642    fn profile_large_no_index_is_safe() {
643        let e = make_export("t", ExportMode::Full, None);
644        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
645    }
646
647    #[test]
648    fn sparse_range_warning_when_very_sparse() {
649        let mut e = make_export("t", ExportMode::Chunked, None);
650        e.chunk_column = Some("id".to_string());
651        e.chunk_size = 100_000;
652        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
653        assert!(w.is_some(), "should warn about sparse range");
654        let msg = w.unwrap();
655        assert!(msg.contains("Sparse key range"), "got: {msg}");
656        assert!(msg.contains("empty"), "got: {msg}");
657    }
658
659    #[test]
660    fn sparse_range_no_warning_when_dense() {
661        let mut e = make_export("t", ExportMode::Chunked, None);
662        e.chunk_column = Some("id".to_string());
663        e.chunk_size = 100_000;
664        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
665        assert!(w.is_none(), "should not warn for dense range");
666    }
667
668    #[test]
669    fn sparse_range_skipped_when_chunk_dense() {
670        let mut e = make_export("t", ExportMode::Chunked, None);
671        e.chunk_column = Some("id".to_string());
672        e.chunk_dense = true;
673        e.chunk_size = 100_000;
674        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
675        assert!(
676            w.is_none(),
677            "chunk_dense uses ordinals, not physical id span"
678        );
679    }
680
681    #[test]
682    fn dense_surrogate_warning_when_chunk_dense_builtin() {
683        let mut e = make_export("t", ExportMode::Chunked, None);
684        e.chunk_column = Some("id".to_string());
685        e.chunk_dense = true;
686        e.query = Some("SELECT id FROM orders".to_string());
687        let w = check_dense_surrogate_cost(&e);
688        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
689        assert!(w.unwrap().contains("global sort"));
690    }
691
692    #[test]
693    fn sparse_range_not_triggered_for_non_chunked() {
694        let e = make_export("t", ExportMode::Full, None);
695        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
696        assert!(w.is_none(), "should not warn for non-chunked mode");
697    }
698
699    #[test]
700    fn dense_surrogate_warning_with_row_number() {
701        let mut e = make_export("t", ExportMode::Chunked, None);
702        e.chunk_column = Some("rn".to_string());
703        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
704        let w = check_dense_surrogate_cost(&e);
705        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
706        assert!(w.unwrap().contains("global sort"));
707    }
708
709    #[test]
710    fn no_dense_surrogate_warning_without_row_number() {
711        let mut e = make_export("t", ExportMode::Chunked, None);
712        e.chunk_column = Some("id".to_string());
713        e.query = Some("SELECT * FROM orders".to_string());
714        let w = check_dense_surrogate_cost(&e);
715        assert!(w.is_none());
716    }
717
718    #[test]
719    fn no_dense_surrogate_warning_for_non_chunked() {
720        let mut e = make_export("t", ExportMode::Full, None);
721        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
722        let w = check_dense_surrogate_cost(&e);
723        assert!(w.is_none(), "should not warn for non-chunked mode");
724    }
725
726    #[test]
727    fn parallel_memory_warning_large_dataset() {
728        let mut e = make_export("t", ExportMode::Chunked, None);
729        e.parallel = 4;
730        let w = check_parallel_memory_risk(&e, Some(10_000_000));
731        assert!(w.is_some(), "should warn about memory risk");
732        let msg = w.unwrap();
733        assert!(msg.contains("Parallel=4"), "got: {msg}");
734        assert!(msg.contains("memory"), "got: {msg}");
735    }
736
737    #[test]
738    fn no_parallel_memory_warning_small_dataset() {
739        let mut e = make_export("t", ExportMode::Chunked, None);
740        e.parallel = 4;
741        let w = check_parallel_memory_risk(&e, Some(1_000));
742        assert!(w.is_none(), "should not warn for small dataset");
743    }
744
745    #[test]
746    fn no_parallel_memory_warning_single_worker() {
747        let e = make_export("t", ExportMode::Full, None);
748        let w = check_parallel_memory_risk(&e, Some(100_000_000));
749        assert!(w.is_none(), "should not warn when parallel=1");
750    }
751
752    #[test]
753    fn suggestion_degraded_full_recommends_incremental() {
754        let e = make_export("t", ExportMode::Full, None);
755        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
756        assert!(s.contains("incremental"), "got: {s}");
757    }
758
759    #[test]
760    fn suggestion_degraded_chunked_recommends_index() {
761        let mut e = make_export("t", ExportMode::Chunked, None);
762        e.chunk_column = Some("id".to_string());
763        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
764        assert!(s.contains("index on 'id'"), "got: {s}");
765    }
766
767    #[test]
768    fn suggestion_degraded_time_window_recommends_index() {
769        let mut e = make_export("t", ExportMode::TimeWindow, None);
770        e.time_column = Some("created_at".to_string());
771        e.days_window = Some(7);
772        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
773        assert!(s.contains("index on 'created_at'"), "got: {s}");
774    }
775
776    #[test]
777    fn suggestion_unsafe_full_recommends_incremental() {
778        let e = make_export("t", ExportMode::Full, None);
779        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
780        assert!(s.contains("incremental"), "got: {s}");
781    }
782
783    #[test]
784    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
785        let mut e = make_export("t", ExportMode::Chunked, None);
786        e.chunk_column = Some("id".to_string());
787        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
788        assert!(s.contains("index on 'id'"), "got: {s}");
789        assert!(s.contains("parallel"), "got: {s}");
790    }
791
792    #[test]
793    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
794        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
795        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
796        assert!(s.contains("index on 'updated_at'"), "got: {s}");
797    }
798
799    #[test]
800    fn suggestion_acceptable_large_full_recommends_incremental() {
801        let e = make_export("t", ExportMode::Full, None);
802        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
803        assert!(s.contains("incremental"), "got: {s}");
804    }
805
806    #[test]
807    fn parallel_only_for_chunked_mode() {
808        let e = make_export("t", ExportMode::Full, None);
809        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
810        assert_eq!(level, 1, "non-chunked mode should recommend 1");
811    }
812
813    #[test]
814    fn parallel_small_dataset_is_one() {
815        let mut e = make_export("t", ExportMode::Chunked, None);
816        e.chunk_column = Some("id".to_string());
817        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
818        assert_eq!(level, 1, "small dataset should recommend 1");
819    }
820
821    #[test]
822    fn parallel_moderate_indexed_is_two() {
823        let mut e = make_export("t", ExportMode::Chunked, None);
824        e.chunk_column = Some("id".to_string());
825        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
826        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
827    }
828
829    #[test]
830    fn parallel_large_indexed_is_four() {
831        let mut e = make_export("t", ExportMode::Chunked, None);
832        e.chunk_column = Some("id".to_string());
833        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
834        assert_eq!(level, 4, "large indexed dataset should recommend 4");
835    }
836
837    #[test]
838    fn parallel_no_index_large_is_one() {
839        let mut e = make_export("t", ExportMode::Chunked, None);
840        e.chunk_column = Some("id".to_string());
841        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
842        assert_eq!(level, 1, "no index + large should recommend 1");
843        assert!(reason.contains("no index"), "got: {reason}");
844    }
845
846    #[test]
847    fn parallel_no_index_moderate_is_conservative() {
848        let mut e = make_export("t", ExportMode::Chunked, None);
849        e.chunk_column = Some("id".to_string());
850        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
851        assert_eq!(
852            level, 2,
853            "no index + moderate should recommend 2 (conservative)"
854        );
855    }
856
857    #[test]
858    fn suggestion_acceptable_large_chunked_recommends_parallel() {
859        let mut e = make_export("t", ExportMode::Chunked, None);
860        e.chunk_column = Some("id".to_string());
861        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
862        assert!(s.contains("parallel"), "got: {s}");
863    }
864
865    #[test]
866    fn connection_limit_warn_when_parallel_meets_max() {
867        let w = check_connection_limit(20, Some(20));
868        assert!(w.is_some(), "should warn when parallel == max_connections");
869        let msg = w.unwrap();
870        assert!(msg.contains("max_connections=20"), "got: {msg}");
871        assert!(msg.contains("parallel=20"), "got: {msg}");
872    }
873
874    #[test]
875    fn connection_limit_warn_when_parallel_exceeds_max() {
876        let w = check_connection_limit(100, Some(20));
877        assert!(w.is_some(), "should warn when parallel > max_connections");
878        let msg = w.unwrap();
879        assert!(msg.contains("max_connections=20"), "got: {msg}");
880    }
881
882    #[test]
883    fn connection_limit_no_warn_when_parallel_below_max() {
884        let w = check_connection_limit(4, Some(100));
885        assert!(
886            w.is_none(),
887            "should not warn when parallel << max_connections"
888        );
889    }
890
891    #[test]
892    fn connection_limit_no_warn_when_parallel_is_one() {
893        let w = check_connection_limit(1, Some(5));
894        assert!(
895            w.is_none(),
896            "single worker never triggers connection warning"
897        );
898    }
899
900    #[test]
901    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
902        let w = check_connection_limit(100, None);
903        assert!(w.is_some(), "should note that check was skipped");
904        let msg = w.unwrap();
905        assert!(msg.contains("skipped"), "got: {msg}");
906    }
907
908    #[test]
909    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
910        let w = check_connection_limit(1, None);
911        assert!(
912            w.is_none(),
913            "single worker never triggers connection warning"
914        );
915    }
916
917    #[test]
918    fn connection_limit_suggests_headroom() {
919        let w = check_connection_limit(25, Some(20)).unwrap();
920        // Suggested safe max should be max_connections - 3 = 17
921        assert!(
922            w.contains("17"),
923            "should suggest leaving headroom, got: {w}"
924        );
925    }
926
927    // ── v0.7.4: actionable hints next to categorised errors ───────────
928
929    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
930        let err = anyhow::anyhow!("{}", msg);
931        let cat = categorize_source_error(&err);
932        source_error_hint(cat, &err, &st)
933    }
934
935    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
936        let err = anyhow::anyhow!("{}", msg);
937        let dest = DestinationConfig {
938            destination_type: dt,
939            bucket: Some("b".into()),
940            ..Default::default()
941        };
942        let cat = categorize_dest_error(&err, &dest);
943        destination_error_hint(cat, &dest)
944    }
945
946    #[test]
947    fn source_tls_handshake_returns_pg_specific_tls_hint() {
948        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
949        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
950    }
951
952    #[test]
953    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
954        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
955        assert!(h.contains("tls.mode"), "got: {h}");
956    }
957
958    #[test]
959    fn source_auth_error_postgres_mentions_pg_hba() {
960        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
961        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
962    }
963
964    #[test]
965    fn source_auth_error_mysql_mentions_grant() {
966        let h = src_hint(
967            "Access denied for user 'rivet'@'localhost'",
968            SourceType::Mysql,
969        )
970        .expect("hint");
971        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
972    }
973
974    #[test]
975    fn source_connectivity_error_mentions_bastion_and_network() {
976        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
977        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
978    }
979
980    #[test]
981    fn source_unknown_error_returns_no_hint() {
982        // Generic "error" category should yield no hint — better to
983        // print the raw driver message than to mislead.
984        let h = src_hint("totally unexpected", SourceType::Postgres);
985        assert!(h.is_none(), "unknown errors should not produce a hint");
986    }
987
988    #[test]
989    fn dest_s3_auth_error_names_concrete_actions() {
990        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
991        assert!(
992            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
993            "got: {h}"
994        );
995    }
996
997    #[test]
998    fn dest_gcs_auth_error_names_concrete_actions() {
999        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1000        assert!(
1001            h.contains("storage.objects") && h.contains("cloud-permissions"),
1002            "got: {h}"
1003        );
1004    }
1005
1006    #[test]
1007    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1008        // Guard the load-bearing ordering in categorize_dest_error: the
1009        // "sas expired" early-return must fire before the generic "token"
1010        // branch, or destination_error_hint produces the wrong hint.
1011        // This test pins the *category string*, not just the final hint text.
1012        let err = anyhow::anyhow!(
1013            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1014        );
1015        let dest = DestinationConfig {
1016            destination_type: DestinationType::Azure,
1017            bucket: Some("c".into()),
1018            ..Default::default()
1019        };
1020        let cat = categorize_dest_error(&err, &dest);
1021        assert_eq!(
1022            cat, "sas expired",
1023            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1024        );
1025    }
1026
1027    #[test]
1028    fn dest_azure_sas_expired_returns_regenerate_hint() {
1029        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1030        // the hint must steer the operator to `az storage container
1031        // generate-sas` not "your IAM role is broken".
1032        let h = dest_hint(
1033            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1034            DestinationType::Azure,
1035        )
1036        .expect("hint");
1037        assert!(
1038            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1039            "got: {h}"
1040        );
1041    }
1042
1043    #[test]
1044    fn dest_s3_bucket_not_found_says_no_auto_create() {
1045        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1046        assert!(
1047            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1048            "got: {h}"
1049        );
1050    }
1051
1052    #[test]
1053    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1054        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1055        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1056    }
1057}