Skip to main content

rivet/tuning/
profile.rs

1//! Tuning profiles and the resolved `SourceTuning` config.
2//!
3//! Three baked-in profiles (`Fast`, `Balanced`, `Safe`) ship per-field defaults;
4//! a YAML `TuningConfig` can override individual fields and the chosen profile.
5//! `from_config_with_default_profile` is the production entry point and is
6//! wired in [`crate::plan::build`] to honour `source.environment:` —
7//! `Local` → `Fast`, `Replica`/`Production` → `Balanced`.
8
9use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
/// Fully resolved per-source tuning: a baked-in profile's defaults with any
/// [`TuningConfig`] overrides applied on top.
///
/// Built by [`SourceTuning::from_config_with_default_profile`] (or
/// [`SourceTuning::from_config`], which falls back to `Balanced`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceTuning {
    /// Rows per batch. Used as-is unless `batch_size_memory_mb` is set and a
    /// schema is available (see `effective_batch_size`).
    pub batch_size: usize,
    /// Target memory per batch in MB. When set (and a schema is supplied),
    /// `effective_batch_size` derives the row count from it instead of
    /// `batch_size`. `None` = use `batch_size`.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds (`0` in the `Fast` profile). Where the
    /// pause is applied is not visible in this file — presumably between
    /// batches; confirm against the caller.
    pub throttle_ms: u64,
    /// Statement timeout in seconds (`0` in the `Fast` profile; presumably
    /// "no timeout" — confirm against the driver code).
    pub statement_timeout_s: u64,
    /// Maximum number of retries (consumed outside this module).
    pub max_retries: u32,
    /// Delay between retries in milliseconds.
    pub retry_backoff_ms: u64,
    /// Lock timeout in seconds (`0` in the `Fast` profile; presumably
    /// "no timeout" — confirm against the driver code).
    pub lock_timeout_s: u64,
    /// RSS limit in MB before chunk processing throttles. `0` = no limit (disabled).
    pub memory_threshold_mb: usize,
    /// Hard cap on a single Arrow batch in MB. `None` = no cap.
    pub max_batch_memory_mb: Option<usize>,
    /// What to do when a batch exceeds `max_batch_memory_mb`.
    pub on_batch_memory_exceeded: BatchMemoryPolicy,
    /// When true, Rivet samples DB pressure metrics every
    /// [`super::ADAPTIVE_SAMPLE_INTERVAL`] batches and shrinks/restores the
    /// fetch size in response. Default: false.
    pub adaptive: bool,
    /// Floor for the OPT-2 concurrency governor: the lowest worker/connection
    /// count it will back parallelism down to under source pressure. `None`
    /// ⇒ 1. The ceiling is the export's configured `parallel`. Only consulted
    /// when `adaptive` is on and `parallel > 1`.
    pub min_parallel: Option<usize>,
    /// Hard ceiling on a single cell/value in MB (OPT-1 memory hardening): a
    /// variable-length value (text/JSON/blob) larger than this aborts the run
    /// with `RIVET_VALUE_TOO_LARGE` instead of risking OOM, since the
    /// average-based batch cap can't bound one giant cell. `Some(0)` / `None`
    /// disable the guard. Default: 256 MiB.
    pub max_value_mb: Option<usize>,
    /// Profile the defaults were taken from; reported by `profile_name()`.
    /// Private, but still included by the derived `Serialize`/`Deserialize`.
    configured_profile: TuningProfile,
}
46
// Maintainer note: the `///` docs below are also emitted by `schemars` as JSON
// schema descriptions, so keep them user-facing. The figures quoted per
// variant mirror `SourceTuning::from_profile`; keep them in sync.
/// Baked-in tuning preset. Each profile supplies a full set of defaults that
/// individual fields can override. Written in lowercase: `fast`, `balanced`, `safe`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TuningProfile {
    /// Throughput first: 50 000-row batches, no throttle, a single retry.
    Fast,
    /// Middle ground: 10 000-row batches, 50 ms throttle, 3 retries.
    Balanced,
    /// Lowest source impact: 2 000-row batches, 500 ms throttle, 5 retries.
    Safe,
}
54
// Maintainer note: this file only stores the chosen policy; the code that
// measures batches and applies it lives elsewhere. The `///` docs below also
// feed the `schemars` JSON schema.
/// Action taken when a single Arrow batch exceeds `max_batch_memory_mb`.
///
/// Written in snake_case: `warn`, `fail`, `auto_shrink`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum BatchMemoryPolicy {
    /// Log a warning and continue. (default)
    #[default]
    Warn,
    /// Return an error — the export fails immediately.
    Fail,
    /// Split the oversized batch in half recursively until each sub-batch fits,
    /// then process them individually. Transparent to the rest of the pipeline.
    AutoShrink,
}
68
// Maintainer notes (plain `//` so they stay out of the JSON schema that
// `schemars` builds from the `///` docs below):
// - every field is optional; `SourceTuning::from_config_with_default_profile`
//   starts from the chosen profile and replaces only the fields set here;
// - `merge_tuning_config` layers an export-level block over a source-level one;
// - `deny_unknown_fields` turns unknown keys (e.g. typos) into a
//   deserialization error instead of silently ignoring them;
// - the `batch_size` / `batch_size_memory_mb` exclusivity is NOT validated in
//   this file: if both are set, `SourceTuning::effective_batch_size` prefers
//   the memory target whenever a schema is available.
/// Tuning overrides for a source or export. Unset fields fall back to the
/// selected profile's defaults.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct TuningConfig {
    /// Baked-in profile to start from (`fast`, `balanced`, `safe`). When unset,
    /// a default chosen by the caller applies.
    pub profile: Option<TuningProfile>,
    /// Rows per batch.
    pub batch_size: Option<usize>,
    /// Target memory per batch in MB. Mutually exclusive with batch_size.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds.
    pub throttle_ms: Option<u64>,
    /// Statement timeout in seconds.
    pub statement_timeout_s: Option<u64>,
    /// Maximum number of retries.
    pub max_retries: Option<u32>,
    /// Delay between retries in milliseconds.
    pub retry_backoff_ms: Option<u64>,
    /// Lock timeout in seconds.
    pub lock_timeout_s: Option<u64>,
    /// RSS limit in MB before chunk processing throttles. `0` = no limit.
    pub memory_threshold_mb: Option<usize>,
    /// Hard cap on Arrow batch memory in MB. When a batch exceeds this limit,
    /// `on_batch_memory_exceeded` determines the response.
    pub max_batch_memory_mb: Option<usize>,
    /// Policy applied when a batch exceeds `max_batch_memory_mb`. Default: `warn`.
    pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
    /// Enable real-time batch size adaptation based on DB pressure metrics.
    /// Postgres: samples `pg_stat_bgwriter`. MySQL: samples `Innodb_log_waits`.
    /// Also arms the OPT-2 concurrency governor when `parallel > 1`.
    pub adaptive: Option<bool>,
    /// Floor for the concurrency governor (lowest parallelism under pressure).
    /// Default 1. Ceiling is the export's `parallel`.
    pub min_parallel: Option<usize>,
    /// Hard per-value size ceiling in MB. A single text/JSON/blob cell larger
    /// than this aborts the run with `RIVET_VALUE_TOO_LARGE`. `0` disables the
    /// guard. Default: 256.
    pub max_value_mb: Option<usize>,
}
99
100/// Layer `export` on top of `source`: each field uses export when set, otherwise source.
101/// `None` only when both inputs are `None`.
102pub fn merge_tuning_config(
103    source: Option<&TuningConfig>,
104    export: Option<&TuningConfig>,
105) -> Option<TuningConfig> {
106    match (source, export) {
107        (None, None) => None,
108        (Some(s), None) => Some(s.clone()),
109        (None, Some(e)) => Some(e.clone()),
110        (Some(s), Some(e)) => Some(TuningConfig {
111            profile: e.profile.or(s.profile),
112            batch_size: e.batch_size.or(s.batch_size),
113            batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
114            throttle_ms: e.throttle_ms.or(s.throttle_ms),
115            statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
116            max_retries: e.max_retries.or(s.max_retries),
117            retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
118            lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
119            memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
120            max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
121            on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
122            adaptive: e.adaptive.or(s.adaptive),
123            min_parallel: e.min_parallel.or(s.min_parallel),
124            max_value_mb: e.max_value_mb.or(s.max_value_mb),
125        }),
126    }
127}
128
129impl SourceTuning {
130    /// Build tuning with the legacy `Balanced` fallback. Public for downstream
131    /// callers and tests; production resolution in [`crate::plan::build`] uses
132    /// [`Self::from_config_with_default_profile`] so that `source.environment:`
133    /// can pick the right default.
134    #[allow(dead_code)]
135    pub fn from_config(config: Option<&TuningConfig>) -> Self {
136        Self::from_config_with_default_profile(config, TuningProfile::Balanced)
137    }
138
139    /// Like [`Self::from_config`] but lets the caller override the fallback
140    /// profile used when `config.profile` is unset.
141    pub fn from_config_with_default_profile(
142        config: Option<&TuningConfig>,
143        fallback_profile: TuningProfile,
144    ) -> Self {
145        let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
146
147        let mut tuning = Self::from_profile(profile);
148        tuning.configured_profile = profile;
149
150        if let Some(cfg) = config {
151            if let Some(v) = cfg.batch_size {
152                tuning.batch_size = v;
153            }
154            tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
155            if let Some(v) = cfg.throttle_ms {
156                tuning.throttle_ms = v;
157            }
158            if let Some(v) = cfg.statement_timeout_s {
159                tuning.statement_timeout_s = v;
160            }
161            if let Some(v) = cfg.max_retries {
162                tuning.max_retries = v;
163            }
164            if let Some(v) = cfg.retry_backoff_ms {
165                tuning.retry_backoff_ms = v;
166            }
167            if let Some(v) = cfg.lock_timeout_s {
168                tuning.lock_timeout_s = v;
169            }
170            if let Some(v) = cfg.memory_threshold_mb {
171                tuning.memory_threshold_mb = v;
172            }
173            tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
174            if let Some(v) = cfg.on_batch_memory_exceeded {
175                tuning.on_batch_memory_exceeded = v;
176            }
177            if let Some(v) = cfg.adaptive {
178                tuning.adaptive = v;
179            }
180            if cfg.min_parallel.is_some() {
181                tuning.min_parallel = cfg.min_parallel;
182            }
183            if cfg.max_value_mb.is_some() {
184                tuning.max_value_mb = cfg.max_value_mb;
185            }
186        }
187
188        tuning
189    }
190
191    fn from_profile(profile: TuningProfile) -> Self {
192        match profile {
193            TuningProfile::Fast => Self {
194                batch_size: 50_000,
195                batch_size_memory_mb: None,
196                throttle_ms: 0,
197                statement_timeout_s: 0,
198                max_retries: 1,
199                retry_backoff_ms: 1_000,
200                lock_timeout_s: 0,
201                memory_threshold_mb: 0,
202                max_batch_memory_mb: None,
203                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
204                adaptive: false,
205                min_parallel: None,
206                max_value_mb: Some(256),
207                configured_profile: TuningProfile::Fast,
208            },
209            TuningProfile::Balanced => Self {
210                batch_size: 10_000,
211                batch_size_memory_mb: None,
212                throttle_ms: 50,
213                statement_timeout_s: 300,
214                max_retries: 3,
215                retry_backoff_ms: 2_000,
216                lock_timeout_s: 30,
217                memory_threshold_mb: 4_096,
218                max_batch_memory_mb: None,
219                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
220                adaptive: false,
221                min_parallel: None,
222                max_value_mb: Some(256),
223                configured_profile: TuningProfile::Balanced,
224            },
225            TuningProfile::Safe => Self {
226                batch_size: 2_000,
227                batch_size_memory_mb: None,
228                throttle_ms: 500,
229                statement_timeout_s: 120,
230                max_retries: 5,
231                retry_backoff_ms: 5_000,
232                lock_timeout_s: 10,
233                memory_threshold_mb: 2_048,
234                max_batch_memory_mb: None,
235                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
236                adaptive: false,
237                min_parallel: None,
238                max_value_mb: Some(256),
239                configured_profile: TuningProfile::Safe,
240            },
241        }
242    }
243
244    pub fn profile_name(&self) -> &'static str {
245        match self.configured_profile {
246            TuningProfile::Fast => "fast",
247            TuningProfile::Balanced => "balanced",
248            TuningProfile::Safe => "safe",
249        }
250    }
251
252    /// If `batch_size_memory_mb` is set, compute and return an adjusted batch_size
253    /// from the schema; otherwise return the configured `batch_size`.
254    pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
255        if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
256            let computed = compute_batch_size_from_memory(mem_mb, schema);
257            log::info!(
258                "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
259                mem_mb,
260                estimate_row_bytes(schema),
261                computed
262            );
263            computed
264        } else {
265            self.batch_size
266        }
267    }
268
269    /// Return the actual Arrow memory footprint of a batch in bytes.
270    ///
271    /// Sums `get_array_memory_size()` across all columns — includes buffers for
272    /// validity bitmaps, offsets, and value data. Does not include Arrow struct
273    /// overhead (~few hundred bytes) which is negligible at batch scale.
274    pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
275        batch
276            .columns()
277            .iter()
278            .map(|col| col.get_array_memory_size())
279            .sum()
280    }
281
282    /// Produce a `ResourceSummary` from the resolved tuning settings.
283    ///
284    /// The summary requires no database connection. It reports two batch-memory
285    /// bounds based on narrow-table (~200 B/row) and wide-table (~10 KB/row)
286    /// heuristics. A `wide_table_risk` flag is set when the upper bound exceeds
287    /// 128 MB per batch.
288    pub fn resource_summary(&self) -> ResourceSummary {
289        const NARROW_BYTES: f64 = 200.0;
290        const WIDE_BYTES: f64 = 10_240.0;
291        let batch = self.batch_size as f64;
292        let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
293        let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
294        ResourceSummary {
295            profile: self.profile_name().to_string(),
296            batch_size: self.batch_size,
297            batch_size_memory_mb: self.batch_size_memory_mb,
298            memory_threshold_mb: self.memory_threshold_mb,
299            throttle_ms: self.throttle_ms,
300            batch_narrow_mb,
301            batch_wide_mb,
302            wide_table_risk: batch_wide_mb > 128.0,
303        }
304    }
305}
306
307impl std::fmt::Display for SourceTuning {
308    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309        write!(
310            f,
311            "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
312            self.profile_name(),
313            self.batch_size,
314            self.throttle_ms,
315            self.statement_timeout_s,
316            self.max_retries,
317            self.lock_timeout_s,
318        )
319    }
320}
321
/// Resource estimate computed from tuning settings alone (no DB connection required).
///
/// `batch_narrow_mb` and `batch_wide_mb` bracket the expected per-batch memory:
/// - narrow table: ~200 B/row (int-heavy, no text blobs)
/// - wide table  : ~10 KB/row (many text/JSON/binary columns)
///
/// Both bounds are derived from `batch_size`, even when `batch_size_memory_mb`
/// is set (no schema is available to compute the effective batch size).
///
/// Use `wide_table_risk` to decide whether to recommend a memory-targeted batch
/// size (`batch_size_memory_mb`, called "adaptive batch" in this module's tests)
/// or a lower `batch_size`.
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    /// Name of the profile the tuning was resolved from (`fast`/`balanced`/`safe`).
    // NOTE(review): `dead_code` is allowed here presumably because no non-test
    // code reads this field — confirm before removing the attribute.
    #[allow(dead_code)]
    pub profile: String,
    /// Configured rows per batch.
    pub batch_size: usize,
    /// Memory target per batch in MB, if configured.
    pub batch_size_memory_mb: Option<usize>,
    /// RSS limit in MB (`0` = no limit).
    pub memory_threshold_mb: usize,
    /// Throttle delay in milliseconds.
    pub throttle_ms: u64,
    /// Estimated batch size in MB at ~200 B/row.
    pub batch_narrow_mb: f64,
    /// Estimated batch size in MB at ~10 KB (10 240 B)/row.
    pub batch_wide_mb: f64,
    /// `true` when `batch_wide_mb` exceeds 128 MB.
    pub wide_table_risk: bool,
}
342
// Unit tests for profile resolution, config merging, batch sizing and the
// connection-free resource summary.
#[cfg(test)]
mod tests {
    use super::*;

    /// A config that only selects `profile`; every other field is unset.
    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.batch_size, 10_000);
        assert_eq!(t.throttle_ms, 50);
        assert_eq!(t.statement_timeout_s, 300);
        assert_eq!(t.max_retries, 3);
        assert_eq!(t.retry_backoff_ms, 2_000);
        assert_eq!(t.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(t.profile_name(), "fast");

        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let cfg = cfg_with_profile(TuningProfile::Balanced);
        let t = SourceTuning::from_config_with_default_profile(Some(&cfg), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0);
        assert_eq!(t.statement_timeout_s, 0);
        assert_eq!(t.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size, 2_000);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.statement_timeout_s, 120);
        assert_eq!(t.max_retries, 5);
        assert_eq!(t.retry_backoff_ms, 5_000);
        assert_eq!(t.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn profile_name_fast() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let t = SourceTuning::from_config(None);
        let s = t.to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(
            m.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(m.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let e = cfg_with_profile(TuningProfile::Fast);
        let m = merge_tuning_config(None, Some(&e)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn effective_batch_size_without_memory() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let bs = t.effective_batch_size(Some(&schema));
        assert!((1_000..=500_000).contains(&bs), "got {bs}");
        // 256MB / 266B ≈ 1_009_022, clamped to 500_000
        // (approximate: the per-row figure comes from `estimate_row_bytes`;
        // the clamp bound lives in `compute_batch_size_from_memory`).
        assert_eq!(bs, 500_000);
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let t = SourceTuning::from_config(None);
        let r = t.resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert!(r.batch_size_memory_mb.is_none());
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        // narrow: 10_000 × 200 B = ~1.9 MB
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        // wide: 10_000 × 10_240 B ≈ 97.7 MiB — no risk (< 128 MB)
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let t = SourceTuning::from_config(Some(&TuningConfig {
            profile: Some(TuningProfile::Fast),
            ..Default::default()
        }));
        let r = t.resource_summary();
        assert_eq!(r.batch_size, 50_000);
        // wide: 50_000 × 10_240 B ≈ 488 MiB → high risk
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_with_adaptive_batch() {
        // "Adaptive batch" here means the memory-targeted batch size
        // (`batch_size_memory_mb`); the summary just echoes it back.
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let r = t.resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}