Skip to main content

rivet/tuning/
profile.rs

1//! Tuning profiles and the resolved `SourceTuning` config.
2//!
3//! Three baked-in profiles (`Fast`, `Balanced`, `Safe`) ship per-field defaults;
4//! a YAML `TuningConfig` can override individual fields and the chosen profile.
5//! `from_config_with_default_profile` is the production entry point and is
6//! wired in [`crate::plan::build`] to honour `source.environment:` —
7//! `Local` → `Fast`, `Replica`/`Production` → `Balanced`.
8
9use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
/// Fully resolved tuning for one source: a [`TuningProfile`]'s baked-in
/// defaults with any explicit [`TuningConfig`] overrides applied.
///
/// Build it with [`SourceTuning::from_config_with_default_profile`] (or
/// [`SourceTuning::from_config`], which falls back to `Balanced`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceTuning {
    /// Rows per batch. [`SourceTuning::effective_batch_size`] returns a
    /// memory-derived value instead when `batch_size_memory_mb` is set and a
    /// schema is supplied.
    pub batch_size: usize,
    /// Target memory per batch in MB, used to derive the batch size from the
    /// schema. `None` (the default in every profile) = use `batch_size`.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds (`Display` prints it as `throttle=…ms`).
    /// NOTE(review): presumably a pause between batches — confirm at the call site.
    pub throttle_ms: u64,
    /// Statement timeout in seconds. The `fast` profile uses `0`;
    /// NOTE(review): presumably `0` disables the timeout — confirm in the driver.
    pub statement_timeout_s: u64,
    /// Retry count (`1` in `fast`, `3` in `balanced`, `5` in `safe`).
    pub max_retries: u32,
    /// Retry backoff in milliseconds.
    pub retry_backoff_ms: u64,
    /// Lock timeout in seconds; `0` in the `fast` profile (same NOTE(review) as
    /// `statement_timeout_s` about what `0` means).
    pub lock_timeout_s: u64,
    /// RSS limit in MB before chunk processing throttles. `0` = no limit (disabled).
    pub memory_threshold_mb: usize,
    /// Hard cap on a single Arrow batch in MB. `None` = no cap.
    pub max_batch_memory_mb: Option<usize>,
    /// Response when a batch exceeds `max_batch_memory_mb`; every profile
    /// defaults to `Warn`.
    pub on_batch_memory_exceeded: BatchMemoryPolicy,
    /// When true, Rivet samples DB pressure metrics every
    /// [`super::ADAPTIVE_SAMPLE_INTERVAL`] batches and shrinks/restores the
    /// fetch size in response. Default: false.
    pub adaptive: bool,
    /// Profile that supplied the defaults; reported by `profile_name()`.
    /// Private, so only code in this module sets it.
    configured_profile: TuningProfile,
}
35
/// Named bundle of tuning defaults, selected with `profile:` in YAML.
/// Serialized in lowercase: `fast`, `balanced`, `safe`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TuningProfile {
    /// Throughput-oriented: 50_000-row batches, no throttle, `0` statement and
    /// lock timeouts, a single retry, no RSS threshold.
    Fast,
    /// Middle ground and the `from_config` fallback: 10_000-row batches, 50 ms
    /// throttle, 300 s statement timeout, 3 retries, 4_096 MB RSS threshold.
    Balanced,
    /// Most conservative: 2_000-row batches, 500 ms throttle, 120 s statement
    /// timeout, 5 retries, 2_048 MB RSS threshold.
    Safe,
}
43
/// Action taken when a single Arrow batch exceeds `max_batch_memory_mb`.
///
/// Serialized in snake_case: `warn`, `fail`, `auto_shrink`. This module only
/// stores the choice; NOTE(review): the policy is enforced by the batch
/// consumer elsewhere in the crate.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum BatchMemoryPolicy {
    /// Log a warning and continue. (default)
    #[default]
    Warn,
    /// Return an error — the export fails immediately.
    Fail,
    /// Split the oversized batch in half recursively until each sub-batch fits,
    /// then process them individually. Transparent to the rest of the pipeline.
    AutoShrink,
}
57
/// Per-field tuning overrides as written in YAML.
///
/// Every field is optional. Unset fields fall back to the selected profile's
/// defaults when resolved by [`SourceTuning::from_config_with_default_profile`].
/// Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct TuningConfig {
    /// Profile whose defaults fill the unset fields; `None` = caller's fallback.
    pub profile: Option<TuningProfile>,
    /// Rows per batch.
    pub batch_size: Option<usize>,
    /// Target memory per batch in MB. Intended to be mutually exclusive with
    /// `batch_size`, but nothing in this type enforces that. When both are set,
    /// `SourceTuning::effective_batch_size` prefers the memory-derived size
    /// whenever a schema is available.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds.
    pub throttle_ms: Option<u64>,
    /// Statement timeout in seconds.
    pub statement_timeout_s: Option<u64>,
    /// Retry count.
    pub max_retries: Option<u32>,
    /// Retry backoff in milliseconds.
    pub retry_backoff_ms: Option<u64>,
    /// Lock timeout in seconds.
    pub lock_timeout_s: Option<u64>,
    /// RSS limit in MB before chunk processing throttles; `0` disables it.
    pub memory_threshold_mb: Option<usize>,
    /// Hard cap on Arrow batch memory in MB. When a batch exceeds this limit,
    /// `on_batch_memory_exceeded` determines the response.
    pub max_batch_memory_mb: Option<usize>,
    /// Policy applied when a batch exceeds `max_batch_memory_mb`. Default: `warn`.
    pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
    /// Enable real-time batch size adaptation based on DB pressure metrics.
    /// Postgres: samples `pg_stat_bgwriter`. MySQL: samples `Innodb_log_waits`.
    pub adaptive: Option<bool>,
}
80
81/// Layer `export` on top of `source`: each field uses export when set, otherwise source.
82/// `None` only when both inputs are `None`.
83pub fn merge_tuning_config(
84    source: Option<&TuningConfig>,
85    export: Option<&TuningConfig>,
86) -> Option<TuningConfig> {
87    match (source, export) {
88        (None, None) => None,
89        (Some(s), None) => Some(s.clone()),
90        (None, Some(e)) => Some(e.clone()),
91        (Some(s), Some(e)) => Some(TuningConfig {
92            profile: e.profile.or(s.profile),
93            batch_size: e.batch_size.or(s.batch_size),
94            batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
95            throttle_ms: e.throttle_ms.or(s.throttle_ms),
96            statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
97            max_retries: e.max_retries.or(s.max_retries),
98            retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
99            lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
100            memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
101            max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
102            on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
103            adaptive: e.adaptive.or(s.adaptive),
104        }),
105    }
106}
107
108impl SourceTuning {
109    /// Build tuning with the legacy `Balanced` fallback. Public for downstream
110    /// callers and tests; production resolution in [`crate::plan::build`] uses
111    /// [`Self::from_config_with_default_profile`] so that `source.environment:`
112    /// can pick the right default.
113    #[allow(dead_code)]
114    pub fn from_config(config: Option<&TuningConfig>) -> Self {
115        Self::from_config_with_default_profile(config, TuningProfile::Balanced)
116    }
117
118    /// Like [`Self::from_config`] but lets the caller override the fallback
119    /// profile used when `config.profile` is unset.
120    pub fn from_config_with_default_profile(
121        config: Option<&TuningConfig>,
122        fallback_profile: TuningProfile,
123    ) -> Self {
124        let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
125
126        let mut tuning = Self::from_profile(profile);
127        tuning.configured_profile = profile;
128
129        if let Some(cfg) = config {
130            if let Some(v) = cfg.batch_size {
131                tuning.batch_size = v;
132            }
133            tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
134            if let Some(v) = cfg.throttle_ms {
135                tuning.throttle_ms = v;
136            }
137            if let Some(v) = cfg.statement_timeout_s {
138                tuning.statement_timeout_s = v;
139            }
140            if let Some(v) = cfg.max_retries {
141                tuning.max_retries = v;
142            }
143            if let Some(v) = cfg.retry_backoff_ms {
144                tuning.retry_backoff_ms = v;
145            }
146            if let Some(v) = cfg.lock_timeout_s {
147                tuning.lock_timeout_s = v;
148            }
149            if let Some(v) = cfg.memory_threshold_mb {
150                tuning.memory_threshold_mb = v;
151            }
152            tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
153            if let Some(v) = cfg.on_batch_memory_exceeded {
154                tuning.on_batch_memory_exceeded = v;
155            }
156            if let Some(v) = cfg.adaptive {
157                tuning.adaptive = v;
158            }
159        }
160
161        tuning
162    }
163
164    fn from_profile(profile: TuningProfile) -> Self {
165        match profile {
166            TuningProfile::Fast => Self {
167                batch_size: 50_000,
168                batch_size_memory_mb: None,
169                throttle_ms: 0,
170                statement_timeout_s: 0,
171                max_retries: 1,
172                retry_backoff_ms: 1_000,
173                lock_timeout_s: 0,
174                memory_threshold_mb: 0,
175                max_batch_memory_mb: None,
176                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
177                adaptive: false,
178                configured_profile: TuningProfile::Fast,
179            },
180            TuningProfile::Balanced => Self {
181                batch_size: 10_000,
182                batch_size_memory_mb: None,
183                throttle_ms: 50,
184                statement_timeout_s: 300,
185                max_retries: 3,
186                retry_backoff_ms: 2_000,
187                lock_timeout_s: 30,
188                memory_threshold_mb: 4_096,
189                max_batch_memory_mb: None,
190                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
191                adaptive: false,
192                configured_profile: TuningProfile::Balanced,
193            },
194            TuningProfile::Safe => Self {
195                batch_size: 2_000,
196                batch_size_memory_mb: None,
197                throttle_ms: 500,
198                statement_timeout_s: 120,
199                max_retries: 5,
200                retry_backoff_ms: 5_000,
201                lock_timeout_s: 10,
202                memory_threshold_mb: 2_048,
203                max_batch_memory_mb: None,
204                on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
205                adaptive: false,
206                configured_profile: TuningProfile::Safe,
207            },
208        }
209    }
210
211    pub fn profile_name(&self) -> &'static str {
212        match self.configured_profile {
213            TuningProfile::Fast => "fast",
214            TuningProfile::Balanced => "balanced",
215            TuningProfile::Safe => "safe",
216        }
217    }
218
219    /// If `batch_size_memory_mb` is set, compute and return an adjusted batch_size
220    /// from the schema; otherwise return the configured `batch_size`.
221    pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
222        if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
223            let computed = compute_batch_size_from_memory(mem_mb, schema);
224            log::info!(
225                "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
226                mem_mb,
227                estimate_row_bytes(schema),
228                computed
229            );
230            computed
231        } else {
232            self.batch_size
233        }
234    }
235
236    /// Return the actual Arrow memory footprint of a batch in bytes.
237    ///
238    /// Sums `get_array_memory_size()` across all columns — includes buffers for
239    /// validity bitmaps, offsets, and value data. Does not include Arrow struct
240    /// overhead (~few hundred bytes) which is negligible at batch scale.
241    pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
242        batch
243            .columns()
244            .iter()
245            .map(|col| col.get_array_memory_size())
246            .sum()
247    }
248
249    /// Produce a `ResourceSummary` from the resolved tuning settings.
250    ///
251    /// The summary requires no database connection. It reports two batch-memory
252    /// bounds based on narrow-table (~200 B/row) and wide-table (~10 KB/row)
253    /// heuristics. A `wide_table_risk` flag is set when the upper bound exceeds
254    /// 128 MB per batch.
255    pub fn resource_summary(&self) -> ResourceSummary {
256        const NARROW_BYTES: f64 = 200.0;
257        const WIDE_BYTES: f64 = 10_240.0;
258        let batch = self.batch_size as f64;
259        let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
260        let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
261        ResourceSummary {
262            profile: self.profile_name().to_string(),
263            batch_size: self.batch_size,
264            batch_size_memory_mb: self.batch_size_memory_mb,
265            memory_threshold_mb: self.memory_threshold_mb,
266            throttle_ms: self.throttle_ms,
267            batch_narrow_mb,
268            batch_wide_mb,
269            wide_table_risk: batch_wide_mb > 128.0,
270        }
271    }
272}
273
274impl std::fmt::Display for SourceTuning {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        write!(
277            f,
278            "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
279            self.profile_name(),
280            self.batch_size,
281            self.throttle_ms,
282            self.statement_timeout_s,
283            self.max_retries,
284            self.lock_timeout_s,
285        )
286    }
287}
288
/// Resource estimate computed from tuning settings alone (no DB connection required).
///
/// `batch_narrow_mb` and `batch_wide_mb` bracket the expected per-batch memory:
/// - narrow table: ~200 B/row (int-heavy, no text blobs)
/// - wide table  : ~10 KB/row (many text/JSON/binary columns)
///
/// Use `wide_table_risk` to decide whether to recommend memory-based batch
/// sizing (the `batch_size_memory_mb` tuning key) or a lower `batch_size`.
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    /// Name of the resolved profile (`fast`, `balanced` or `safe`).
    // NOTE(review): the allowance suggests this field currently has no
    // non-test readers; drop it once the field is surfaced.
    #[allow(dead_code)]
    pub profile: String,
    /// Configured rows per batch the bounds were computed from.
    pub batch_size: usize,
    /// Memory-based batch sizing target in MB, if configured.
    pub batch_size_memory_mb: Option<usize>,
    /// RSS throttle threshold in MB (`0` = disabled).
    pub memory_threshold_mb: usize,
    /// Throttle delay in milliseconds.
    pub throttle_ms: u64,
    /// Lower per-batch memory bound in MB (narrow-table heuristic).
    pub batch_narrow_mb: f64,
    /// Upper per-batch memory bound in MB (wide-table heuristic).
    pub batch_wide_mb: f64,
    /// `true` when `batch_wide_mb` exceeds the 128 MB risk threshold.
    pub wide_table_risk: bool,
}
309
#[cfg(test)]
mod tests {
    use super::*;

    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.batch_size, 10_000);
        assert_eq!(t.throttle_ms, 50);
        assert_eq!(t.statement_timeout_s, 300);
        assert_eq!(t.max_retries, 3);
        assert_eq!(t.retry_backoff_ms, 2_000);
        assert_eq!(t.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(t.profile_name(), "fast");

        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let cfg = cfg_with_profile(TuningProfile::Balanced);
        let t = SourceTuning::from_config_with_default_profile(Some(&cfg), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0);
        assert_eq!(t.statement_timeout_s, 0);
        assert_eq!(t.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size, 2_000);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.statement_timeout_s, 120);
        assert_eq!(t.max_retries, 5);
        assert_eq!(t.retry_backoff_ms, 5_000);
        assert_eq!(t.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn memory_and_adaptive_overrides_are_applied() {
        let cfg = TuningConfig {
            max_batch_memory_mb: Some(512),
            on_batch_memory_exceeded: Some(BatchMemoryPolicy::AutoShrink),
            memory_threshold_mb: Some(0),
            adaptive: Some(true),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.max_batch_memory_mb, Some(512));
        assert_eq!(t.on_batch_memory_exceeded, BatchMemoryPolicy::AutoShrink);
        assert_eq!(t.memory_threshold_mb, 0, "explicit 0 must override 4_096");
        assert!(t.adaptive);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_defaults_leave_memory_caps_unset() {
        for p in [TuningProfile::Fast, TuningProfile::Balanced, TuningProfile::Safe] {
            let t = SourceTuning::from_config(Some(&cfg_with_profile(p)));
            assert!(t.batch_size_memory_mb.is_none(), "{p:?}");
            assert!(t.max_batch_memory_mb.is_none(), "{p:?}");
            assert_eq!(t.on_batch_memory_exceeded, BatchMemoryPolicy::Warn, "{p:?}");
            assert!(!t.adaptive, "{p:?}");
        }
    }

    #[test]
    fn profile_name_fast() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let t = SourceTuning::from_config(None);
        let s = t.to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(
            m.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(m.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let e = cfg_with_profile(TuningProfile::Fast);
        let m = merge_tuning_config(None, Some(&e)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn merge_tuning_source_only() {
        let s = TuningConfig {
            batch_size: Some(1_234),
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&s), None).expect("merged");
        assert_eq!(m.batch_size, Some(1_234));
        assert!(m.profile.is_none());
    }

    #[test]
    fn merge_tuning_none_none_is_none() {
        assert!(merge_tuning_config(None, None).is_none());
    }

    #[test]
    fn effective_batch_size_without_memory() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let bs = t.effective_batch_size(Some(&schema));
        assert!((1_000..=500_000).contains(&bs), "got {bs}");
        // 256 MB at an estimated ~266 B/row is roughly 1M rows, which the
        // memory helper clamps to 500_000.
        assert_eq!(bs, 500_000);
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let t = SourceTuning::from_config(None);
        let r = t.resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert!(r.batch_size_memory_mb.is_none());
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        // narrow: 10_000 × 200 B / 2^20 ≈ 1.91 MB
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        // wide: 10_000 × 10_240 B / 2^20 = 97.65625 MB — no risk (< 128 MB)
        assert!(
            (r.batch_wide_mb - 97.656_25).abs() < 1e-9,
            "wide: {}",
            r.batch_wide_mb
        );
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let t = SourceTuning::from_config(Some(&TuningConfig {
            profile: Some(TuningProfile::Fast),
            ..Default::default()
        }));
        let r = t.resource_summary();
        assert_eq!(r.batch_size, 50_000);
        // wide: 50_000 × 10_240 B / 2^20 = 488.28125 MB → high risk
        assert!(
            (r.batch_wide_mb - 488.281_25).abs() < 1e-9,
            "wide: {}",
            r.batch_wide_mb
        );
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_with_adaptive_batch() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let r = t.resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}