Skip to main content

rivet/tuning/
profile.rs

1//! Tuning profiles and the resolved `SourceTuning` config.
2//!
3//! Three baked-in profiles (`Fast`, `Balanced`, `Safe`) ship per-field defaults;
4//! a YAML `TuningConfig` can override individual fields and the chosen profile.
5//! `from_config_with_default_profile` is the production entry point and is
6//! wired in [`crate::plan::build`] to honour `source.environment:` —
7//! `Local` → `Fast`, `Replica`/`Production` → `Balanced`.
8
9use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
/// Resolved tuning settings: the baked-in defaults of a [`TuningProfile`]
/// with any [`TuningConfig`] overrides layered on top (see
/// [`SourceTuning::from_config_with_default_profile`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceTuning {
    /// Rows per batch. Returned verbatim by
    /// [`SourceTuning::effective_batch_size`] unless `batch_size_memory_mb`
    /// is set and a schema is supplied.
    pub batch_size: usize,
    /// Target memory per batch in MB. When set and a schema is available,
    /// `effective_batch_size` derives the row count from it instead of using
    /// `batch_size`. `None` in every baked-in profile.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle in milliseconds (`0` Fast, `50` Balanced, `500` Safe).
    /// NOTE(review): presumably a pause between batches — confirm at the
    /// consuming call site.
    pub throttle_ms: u64,
    /// Statement timeout in seconds (`0` Fast, `300` Balanced, `120` Safe).
    /// NOTE(review): `0` presumably means "no timeout" — confirm in the
    /// engine code.
    pub statement_timeout_s: u64,
    /// Retry limit (`1` Fast, `3` Balanced, `5` Safe).
    pub max_retries: u32,
    /// Backoff between retries, in milliseconds.
    pub retry_backoff_ms: u64,
    /// Lock timeout in seconds (`0` Fast, `30` Balanced, `10` Safe).
    /// NOTE(review): `0` presumably means "no timeout" — confirm in the
    /// engine code.
    pub lock_timeout_s: u64,
    /// RSS limit in MB before chunk processing throttles. `0` = no limit (disabled).
    pub memory_threshold_mb: usize,
    /// Hard cap on a single Arrow batch in MB. `None` = no cap.
    pub max_batch_memory_mb: Option<usize>,
    /// Action taken when a batch exceeds `max_batch_memory_mb`.
    pub on_batch_memory_exceeded: BatchMemoryPolicy,
    /// When true, Rivet samples DB pressure metrics every
    /// [`super::ADAPTIVE_SAMPLE_INTERVAL`] batches and shrinks/restores the
    /// fetch size in response. Default: false.
    pub adaptive: bool,
    /// Floor for the OPT-2 concurrency governor: the lowest worker/connection
    /// count it will back parallelism down to under source pressure. `None`
    /// ⇒ 1. The ceiling is the export's configured `parallel`. Only consulted
    /// when `adaptive` is on and `parallel > 1`.
    pub min_parallel: Option<usize>,
    /// Hard ceiling on a single cell/value in MB (OPT-1 memory hardening): a
    /// variable-length value (text/JSON/blob) larger than this aborts the run
    /// with `RIVET_VALUE_TOO_LARGE` instead of risking OOM, since the
    /// average-based batch cap can't bound one giant cell. `Some(0)` / `None`
    /// disable the guard. Default: 256 MiB.
    pub max_value_mb: Option<usize>,
    /// Profile whose defaults seeded this value; reported by
    /// [`SourceTuning::profile_name`].
    configured_profile: TuningProfile,
}
46
// Baked-in default sets for `SourceTuning`, written in YAML as `fast`,
// `balanced` or `safe` (`rename_all = "lowercase"`).
// Plain `//` comments on purpose: schemars turns `///` docs into JSON Schema
// descriptions, and documenting the variants would change the derived schema.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TuningProfile {
    // 50_000-row batches, no throttle, 0 s statement/lock timeouts,
    // 1 retry, RSS threshold 0 (disabled).
    Fast,
    // Fallback of `SourceTuning::from_config`: 10_000-row batches, 50 ms
    // throttle, 300 s statement timeout, 3 retries, 4_096 MB RSS threshold.
    Balanced,
    // 2_000-row batches, 500 ms throttle, 120 s statement timeout,
    // 5 retries, 2_048 MB RSS threshold.
    Safe,
}
54
/// Action taken when a single Arrow batch exceeds `max_batch_memory_mb`.
// YAML spellings follow `rename_all = "snake_case"`: `warn`, `fail`,
// `auto_shrink`. Every baked-in profile in this file selects `Warn`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum BatchMemoryPolicy {
    /// Log a warning and continue. (default)
    #[default]
    Warn,
    /// Return an error — the export fails immediately.
    Fail,
    /// Split the oversized batch in half recursively until each sub-batch fits,
    /// then process them individually. Transparent to the rest of the pipeline.
    AutoShrink,
}
68
// User-written tuning overrides (YAML). `deny_unknown_fields` rejects unknown
// keys. Every field is optional: an unset field keeps the selected profile's
// default when resolved by `SourceTuning::from_config_with_default_profile`.
// New notes use `//` instead of `///` so the derived JSON Schema descriptions
// stay unchanged.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct TuningConfig {
    // Profile whose defaults seed unset fields. `None` falls back to the
    // caller-chosen profile (`Balanced` for `SourceTuning::from_config`).
    pub profile: Option<TuningProfile>,
    // Explicit rows-per-batch override.
    pub batch_size: Option<usize>,
    /// Target memory per batch in MB. Mutually exclusive with batch_size.
    // NOTE(review): exclusivity is not enforced in this file. When both are
    // set, `effective_batch_size` prefers the memory target whenever a schema
    // is available.
    pub batch_size_memory_mb: Option<usize>,
    // Milliseconds; see `SourceTuning::throttle_ms`.
    pub throttle_ms: Option<u64>,
    // Seconds.
    pub statement_timeout_s: Option<u64>,
    pub max_retries: Option<u32>,
    // Milliseconds.
    pub retry_backoff_ms: Option<u64>,
    // Seconds.
    pub lock_timeout_s: Option<u64>,
    // RSS limit in MB; `0` disables it (per `SourceTuning::memory_threshold_mb`).
    pub memory_threshold_mb: Option<usize>,
    /// Hard cap on Arrow batch memory in MB. When a batch exceeds this limit,
    /// `on_batch_memory_exceeded` determines the response.
    pub max_batch_memory_mb: Option<usize>,
    /// Policy applied when a batch exceeds `max_batch_memory_mb`. Default: `warn`.
    pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
    /// Enable real-time batch size adaptation based on DB pressure metrics.
    /// Postgres: samples `pg_stat_bgwriter`. MySQL: samples `Innodb_log_waits`.
    /// Also arms the OPT-2 concurrency governor when `parallel > 1`.
    pub adaptive: Option<bool>,
    /// Floor for the concurrency governor (lowest parallelism under pressure).
    /// Default 1. Ceiling is the export's `parallel`.
    pub min_parallel: Option<usize>,
    /// Hard per-value size ceiling in MB. A single text/JSON/blob cell larger
    /// than this aborts the run with `RIVET_VALUE_TOO_LARGE`. `0` disables the
    /// guard. Default: 256.
    pub max_value_mb: Option<usize>,
}
99
100/// Layer `export` on top of `source`: each field uses export when set, otherwise source.
101/// `None` only when both inputs are `None`.
102pub fn merge_tuning_config(
103    source: Option<&TuningConfig>,
104    export: Option<&TuningConfig>,
105) -> Option<TuningConfig> {
106    match (source, export) {
107        (None, None) => None,
108        (Some(s), None) => Some(s.clone()),
109        (None, Some(e)) => Some(e.clone()),
110        (Some(s), Some(e)) => Some(TuningConfig {
111            profile: e.profile.or(s.profile),
112            batch_size: e.batch_size.or(s.batch_size),
113            batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
114            throttle_ms: e.throttle_ms.or(s.throttle_ms),
115            statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
116            max_retries: e.max_retries.or(s.max_retries),
117            retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
118            lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
119            memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
120            max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
121            on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
122            adaptive: e.adaptive.or(s.adaptive),
123            min_parallel: e.min_parallel.or(s.min_parallel),
124            max_value_mb: e.max_value_mb.or(s.max_value_mb),
125        }),
126    }
127}
128
129impl SourceTuning {
130    /// The per-value byte ceiling: `max_value_mb` converted to bytes, with
131    /// `Some(0)` / `None` meaning "disabled". Single source of truth shared by
132    /// the sink's post-materialization guard (`sink::check_value_ceiling`) and
133    /// the per-engine pre-allocation guard (`source::value_within_ceiling`), so
134    /// the two definitions of "0 disables" can never drift apart.
135    pub(crate) fn max_value_bytes(&self) -> Option<usize> {
136        self.max_value_mb
137            .filter(|&mb| mb > 0)
138            .map(|mb| mb * 1024 * 1024)
139    }
140
141    /// Build tuning with the legacy `Balanced` fallback. Public for downstream
142    /// callers and tests; production resolution in [`crate::plan::build`] uses
143    /// [`Self::from_config_with_default_profile`] so that `source.environment:`
144    /// can pick the right default.
145    #[allow(dead_code)]
146    pub fn from_config(config: Option<&TuningConfig>) -> Self {
147        Self::from_config_with_default_profile(config, TuningProfile::Balanced)
148    }
149
150    /// Like [`Self::from_config`] but lets the caller override the fallback
151    /// profile used when `config.profile` is unset.
152    pub fn from_config_with_default_profile(
153        config: Option<&TuningConfig>,
154        fallback_profile: TuningProfile,
155    ) -> Self {
156        let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
157
158        let mut tuning = Self::from_profile(profile);
159        tuning.configured_profile = profile;
160
161        if let Some(cfg) = config {
162            if let Some(v) = cfg.batch_size {
163                tuning.batch_size = v;
164            }
165            tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
166            if let Some(v) = cfg.throttle_ms {
167                tuning.throttle_ms = v;
168            }
169            if let Some(v) = cfg.statement_timeout_s {
170                tuning.statement_timeout_s = v;
171            }
172            if let Some(v) = cfg.max_retries {
173                tuning.max_retries = v;
174            }
175            if let Some(v) = cfg.retry_backoff_ms {
176                tuning.retry_backoff_ms = v;
177            }
178            if let Some(v) = cfg.lock_timeout_s {
179                tuning.lock_timeout_s = v;
180            }
181            if let Some(v) = cfg.memory_threshold_mb {
182                tuning.memory_threshold_mb = v;
183            }
184            tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
185            if let Some(v) = cfg.on_batch_memory_exceeded {
186                tuning.on_batch_memory_exceeded = v;
187            }
188            if let Some(v) = cfg.adaptive {
189                tuning.adaptive = v;
190            }
191            if cfg.min_parallel.is_some() {
192                tuning.min_parallel = cfg.min_parallel;
193            }
194            if cfg.max_value_mb.is_some() {
195                tuning.max_value_mb = cfg.max_value_mb;
196            }
197        }
198
199        tuning
200    }
201
202    fn from_profile(profile: TuningProfile) -> Self {
203        match profile {
204            TuningProfile::Fast => Self {
205                batch_size: 50_000,
206                batch_size_memory_mb: None,
207                throttle_ms: 0,
208                statement_timeout_s: 0,
209                max_retries: 1,
210                retry_backoff_ms: 1_000,
211                lock_timeout_s: 0,
212                memory_threshold_mb: 0,
213                max_batch_memory_mb: None,
214                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
215                adaptive: false,
216                min_parallel: None,
217                max_value_mb: Some(256),
218                configured_profile: TuningProfile::Fast,
219            },
220            TuningProfile::Balanced => Self {
221                batch_size: 10_000,
222                batch_size_memory_mb: None,
223                throttle_ms: 50,
224                statement_timeout_s: 300,
225                max_retries: 3,
226                retry_backoff_ms: 2_000,
227                lock_timeout_s: 30,
228                memory_threshold_mb: 4_096,
229                max_batch_memory_mb: None,
230                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
231                adaptive: false,
232                min_parallel: None,
233                max_value_mb: Some(256),
234                configured_profile: TuningProfile::Balanced,
235            },
236            TuningProfile::Safe => Self {
237                batch_size: 2_000,
238                batch_size_memory_mb: None,
239                throttle_ms: 500,
240                statement_timeout_s: 120,
241                max_retries: 5,
242                retry_backoff_ms: 5_000,
243                lock_timeout_s: 10,
244                memory_threshold_mb: 2_048,
245                max_batch_memory_mb: None,
246                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
247                adaptive: false,
248                min_parallel: None,
249                max_value_mb: Some(256),
250                configured_profile: TuningProfile::Safe,
251            },
252        }
253    }
254
255    pub fn profile_name(&self) -> &'static str {
256        match self.configured_profile {
257            TuningProfile::Fast => "fast",
258            TuningProfile::Balanced => "balanced",
259            TuningProfile::Safe => "safe",
260        }
261    }
262
263    /// If `batch_size_memory_mb` is set, compute and return an adjusted batch_size
264    /// from the schema; otherwise return the configured `batch_size`.
265    pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
266        if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
267            let computed = compute_batch_size_from_memory(mem_mb, schema);
268            log::info!(
269                "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
270                mem_mb,
271                estimate_row_bytes(schema),
272                computed
273            );
274            computed
275        } else {
276            self.batch_size
277        }
278    }
279
280    /// Return the actual Arrow memory footprint of a batch in bytes.
281    ///
282    /// Sums `get_array_memory_size()` across all columns — includes buffers for
283    /// validity bitmaps, offsets, and value data. Does not include Arrow struct
284    /// overhead (~few hundred bytes) which is negligible at batch scale.
285    pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
286        batch
287            .columns()
288            .iter()
289            .map(|col| col.get_array_memory_size())
290            .sum()
291    }
292
293    /// Produce a `ResourceSummary` from the resolved tuning settings.
294    ///
295    /// The summary requires no database connection. It reports two batch-memory
296    /// bounds based on narrow-table (~200 B/row) and wide-table (~10 KB/row)
297    /// heuristics. A `wide_table_risk` flag is set when the upper bound exceeds
298    /// 128 MB per batch.
299    pub fn resource_summary(&self) -> ResourceSummary {
300        const NARROW_BYTES: f64 = 200.0;
301        const WIDE_BYTES: f64 = 10_240.0;
302        let batch = self.batch_size as f64;
303        let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
304        let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
305        ResourceSummary {
306            profile: self.profile_name().to_string(),
307            batch_size: self.batch_size,
308            batch_size_memory_mb: self.batch_size_memory_mb,
309            memory_threshold_mb: self.memory_threshold_mb,
310            throttle_ms: self.throttle_ms,
311            batch_narrow_mb,
312            batch_wide_mb,
313            wide_table_risk: batch_wide_mb > 128.0,
314        }
315    }
316}
317
318impl std::fmt::Display for SourceTuning {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        write!(
321            f,
322            "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
323            self.profile_name(),
324            self.batch_size,
325            self.throttle_ms,
326            self.statement_timeout_s,
327            self.max_retries,
328            self.lock_timeout_s,
329        )
330    }
331}
332
/// Resource estimate computed from tuning settings alone (no DB connection required).
///
/// `batch_narrow_mb` and `batch_wide_mb` bracket the expected per-batch memory:
/// - narrow table: ~200 B/row (int-heavy, no text blobs)
/// - wide table  : ~10 KB/row (many text/JSON/binary columns)
///
/// Both bounds are derived from `batch_size`. When `batch_size_memory_mb` is
/// set, the effective batch size depends on the schema and is not reflected
/// in these bounds.
///
/// Use `wide_table_risk` to decide whether to recommend memory-targeted batch
/// sizing (`batch_size_memory_mb`) or a lower `batch_size`.
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    // NOTE(review): in this file only the unit tests read `profile`, hence the
    // lint allowance — confirm whether any renderer should use it.
    #[allow(dead_code)]
    pub profile: String,
    /// Configured rows per batch.
    pub batch_size: usize,
    /// Memory-targeted batch size (MB), if configured.
    pub batch_size_memory_mb: Option<usize>,
    /// RSS threshold in MB (`0` = disabled).
    pub memory_threshold_mb: usize,
    /// Throttle in milliseconds.
    pub throttle_ms: u64,
    /// Estimated MB per batch at ~200 B/row.
    pub batch_narrow_mb: f64,
    /// Estimated MB per batch at ~10 KB/row.
    pub batch_wide_mb: f64,
    /// True when `batch_wide_mb` exceeds 128 MB.
    pub wide_table_risk: bool,
}
353
#[cfg(test)]
mod tests {
    use super::*;

    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.batch_size, 10_000);
        assert_eq!(t.throttle_ms, 50);
        assert_eq!(t.statement_timeout_s, 300);
        assert_eq!(t.max_retries, 3);
        assert_eq!(t.retry_backoff_ms, 2_000);
        assert_eq!(t.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(t.profile_name(), "fast");

        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let cfg = cfg_with_profile(TuningProfile::Balanced);
        let t = SourceTuning::from_config_with_default_profile(Some(&cfg), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0);
        assert_eq!(t.statement_timeout_s, 0);
        assert_eq!(t.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size, 2_000);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.statement_timeout_s, 120);
        assert_eq!(t.max_retries, 5);
        assert_eq!(t.retry_backoff_ms, 5_000);
        assert_eq!(t.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn optional_overrides_replace_profile_defaults() {
        let cfg = TuningConfig {
            min_parallel: Some(2),
            max_batch_memory_mb: Some(64),
            on_batch_memory_exceeded: Some(BatchMemoryPolicy::AutoShrink),
            adaptive: Some(true),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.min_parallel, Some(2));
        assert_eq!(t.max_batch_memory_mb, Some(64));
        assert_eq!(t.on_batch_memory_exceeded, BatchMemoryPolicy::AutoShrink);
        assert!(t.adaptive);
        assert_eq!(t.profile_name(), "balanced", "no profile set -> fallback");
    }

    #[test]
    fn profile_name_fast() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let t = SourceTuning::from_config(None);
        let s = t.to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(
            m.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(m.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let e = cfg_with_profile(TuningProfile::Fast);
        let m = merge_tuning_config(None, Some(&e)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn merge_tuning_without_export_keeps_source() {
        assert!(merge_tuning_config(None, None).is_none());

        let source = TuningConfig {
            batch_size: Some(7),
            adaptive: Some(true),
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), None).expect("merged");
        assert_eq!(m.batch_size, Some(7));
        assert_eq!(m.adaptive, Some(true));
        assert!(m.profile.is_none());
    }

    #[test]
    fn max_value_bytes_defaults_to_256_mib() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.max_value_mb, Some(256));
        assert_eq!(t.max_value_bytes(), Some(256 * 1024 * 1024));
    }

    #[test]
    fn max_value_bytes_zero_disables_guard() {
        let cfg = TuningConfig {
            max_value_mb: Some(0),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.max_value_mb, Some(0), "explicit 0 must survive resolution");
        assert_eq!(t.max_value_bytes(), None);
    }

    #[test]
    fn max_value_bytes_honours_explicit_override() {
        let cfg = TuningConfig {
            max_value_mb: Some(1),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.max_value_bytes(), Some(1024 * 1024));
    }

    #[test]
    fn effective_batch_size_without_memory() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let bs = t.effective_batch_size(Some(&schema));
        assert!((1_000..=500_000).contains(&bs), "got {bs}");
        // 256MB / 266B ≈ 1_009_022, clamped to 500_000
        assert_eq!(bs, 500_000);
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let t = SourceTuning::from_config(None);
        let r = t.resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert!(r.batch_size_memory_mb.is_none());
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        // narrow: 10_000 × 200 B = ~1.9 MB
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        // wide: 10_000 × 10 KB = ~95 MB — no risk (< 128 MB)
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let t = SourceTuning::from_config(Some(&TuningConfig {
            profile: Some(TuningProfile::Fast),
            ..Default::default()
        }));
        let r = t.resource_summary();
        assert_eq!(r.batch_size, 50_000);
        // wide: 50_000 × 10 KB = ~476 MB → high risk
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_with_adaptive_batch() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let r = t.resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}