1use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SourceTuning {
17 pub batch_size: usize,
18 pub batch_size_memory_mb: Option<usize>,
19 pub throttle_ms: u64,
20 pub statement_timeout_s: u64,
21 pub max_retries: u32,
22 pub retry_backoff_ms: u64,
23 pub lock_timeout_s: u64,
24 pub memory_threshold_mb: usize,
26 pub max_batch_memory_mb: Option<usize>,
28 pub on_batch_memory_exceeded: BatchMemoryPolicy,
29 pub adaptive: bool,
33 configured_profile: TuningProfile,
34}
35
36#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
37#[serde(rename_all = "lowercase")]
38pub enum TuningProfile {
39 Fast,
40 Balanced,
41 Safe,
42}
43
44#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
46#[serde(rename_all = "snake_case")]
47pub enum BatchMemoryPolicy {
48 #[default]
50 Warn,
51 Fail,
53 AutoShrink,
56}
57
58#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
59#[serde(deny_unknown_fields)]
60pub struct TuningConfig {
61 pub profile: Option<TuningProfile>,
62 pub batch_size: Option<usize>,
63 pub batch_size_memory_mb: Option<usize>,
65 pub throttle_ms: Option<u64>,
66 pub statement_timeout_s: Option<u64>,
67 pub max_retries: Option<u32>,
68 pub retry_backoff_ms: Option<u64>,
69 pub lock_timeout_s: Option<u64>,
70 pub memory_threshold_mb: Option<usize>,
71 pub max_batch_memory_mb: Option<usize>,
74 pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
76 pub adaptive: Option<bool>,
79}
80
81pub fn merge_tuning_config(
84 source: Option<&TuningConfig>,
85 export: Option<&TuningConfig>,
86) -> Option<TuningConfig> {
87 match (source, export) {
88 (None, None) => None,
89 (Some(s), None) => Some(s.clone()),
90 (None, Some(e)) => Some(e.clone()),
91 (Some(s), Some(e)) => Some(TuningConfig {
92 profile: e.profile.or(s.profile),
93 batch_size: e.batch_size.or(s.batch_size),
94 batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
95 throttle_ms: e.throttle_ms.or(s.throttle_ms),
96 statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
97 max_retries: e.max_retries.or(s.max_retries),
98 retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
99 lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
100 memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
101 max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
102 on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
103 adaptive: e.adaptive.or(s.adaptive),
104 }),
105 }
106}
107
108impl SourceTuning {
109 #[allow(dead_code)]
114 pub fn from_config(config: Option<&TuningConfig>) -> Self {
115 Self::from_config_with_default_profile(config, TuningProfile::Balanced)
116 }
117
118 pub fn from_config_with_default_profile(
121 config: Option<&TuningConfig>,
122 fallback_profile: TuningProfile,
123 ) -> Self {
124 let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
125
126 let mut tuning = Self::from_profile(profile);
127 tuning.configured_profile = profile;
128
129 if let Some(cfg) = config {
130 if let Some(v) = cfg.batch_size {
131 tuning.batch_size = v;
132 }
133 tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
134 if let Some(v) = cfg.throttle_ms {
135 tuning.throttle_ms = v;
136 }
137 if let Some(v) = cfg.statement_timeout_s {
138 tuning.statement_timeout_s = v;
139 }
140 if let Some(v) = cfg.max_retries {
141 tuning.max_retries = v;
142 }
143 if let Some(v) = cfg.retry_backoff_ms {
144 tuning.retry_backoff_ms = v;
145 }
146 if let Some(v) = cfg.lock_timeout_s {
147 tuning.lock_timeout_s = v;
148 }
149 if let Some(v) = cfg.memory_threshold_mb {
150 tuning.memory_threshold_mb = v;
151 }
152 tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
153 if let Some(v) = cfg.on_batch_memory_exceeded {
154 tuning.on_batch_memory_exceeded = v;
155 }
156 if let Some(v) = cfg.adaptive {
157 tuning.adaptive = v;
158 }
159 }
160
161 tuning
162 }
163
164 fn from_profile(profile: TuningProfile) -> Self {
165 match profile {
166 TuningProfile::Fast => Self {
167 batch_size: 50_000,
168 batch_size_memory_mb: None,
169 throttle_ms: 0,
170 statement_timeout_s: 0,
171 max_retries: 1,
172 retry_backoff_ms: 1_000,
173 lock_timeout_s: 0,
174 memory_threshold_mb: 0,
175 max_batch_memory_mb: None,
176 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
177 adaptive: false,
178 configured_profile: TuningProfile::Fast,
179 },
180 TuningProfile::Balanced => Self {
181 batch_size: 10_000,
182 batch_size_memory_mb: None,
183 throttle_ms: 50,
184 statement_timeout_s: 300,
185 max_retries: 3,
186 retry_backoff_ms: 2_000,
187 lock_timeout_s: 30,
188 memory_threshold_mb: 4_096,
189 max_batch_memory_mb: None,
190 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
191 adaptive: false,
192 configured_profile: TuningProfile::Balanced,
193 },
194 TuningProfile::Safe => Self {
195 batch_size: 2_000,
196 batch_size_memory_mb: None,
197 throttle_ms: 500,
198 statement_timeout_s: 120,
199 max_retries: 5,
200 retry_backoff_ms: 5_000,
201 lock_timeout_s: 10,
202 memory_threshold_mb: 2_048,
203 max_batch_memory_mb: None,
204 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
205 adaptive: false,
206 configured_profile: TuningProfile::Safe,
207 },
208 }
209 }
210
211 pub fn profile_name(&self) -> &'static str {
212 match self.configured_profile {
213 TuningProfile::Fast => "fast",
214 TuningProfile::Balanced => "balanced",
215 TuningProfile::Safe => "safe",
216 }
217 }
218
219 pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
222 if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
223 let computed = compute_batch_size_from_memory(mem_mb, schema);
224 log::info!(
225 "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
226 mem_mb,
227 estimate_row_bytes(schema),
228 computed
229 );
230 computed
231 } else {
232 self.batch_size
233 }
234 }
235
236 pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
242 batch
243 .columns()
244 .iter()
245 .map(|col| col.get_array_memory_size())
246 .sum()
247 }
248
249 pub fn resource_summary(&self) -> ResourceSummary {
256 const NARROW_BYTES: f64 = 200.0;
257 const WIDE_BYTES: f64 = 10_240.0;
258 let batch = self.batch_size as f64;
259 let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
260 let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
261 ResourceSummary {
262 profile: self.profile_name().to_string(),
263 batch_size: self.batch_size,
264 batch_size_memory_mb: self.batch_size_memory_mb,
265 memory_threshold_mb: self.memory_threshold_mb,
266 throttle_ms: self.throttle_ms,
267 batch_narrow_mb,
268 batch_wide_mb,
269 wide_table_risk: batch_wide_mb > 128.0,
270 }
271 }
272}
273
274impl std::fmt::Display for SourceTuning {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 write!(
277 f,
278 "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
279 self.profile_name(),
280 self.batch_size,
281 self.throttle_ms,
282 self.statement_timeout_s,
283 self.max_retries,
284 self.lock_timeout_s,
285 )
286 }
287}
288
/// Snapshot of the resource-relevant tuning knobs, with rough per-batch
/// memory estimates (in MiB) for narrow and wide rows.
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    /// Lowercase name of the resolved profile.
    // NOTE(review): `dead_code` is allowed presumably because this field is
    // only read by tests or diagnostics — confirm before removing.
    #[allow(dead_code)]
    pub profile: String,
    /// Static rows per batch.
    pub batch_size: usize,
    /// Optional memory budget in MiB used to derive the batch size.
    pub batch_size_memory_mb: Option<usize>,
    /// Memory threshold in MiB.
    pub memory_threshold_mb: usize,
    /// Throttle delay in milliseconds.
    pub throttle_ms: u64,
    /// Estimated batch footprint in MiB assuming narrow rows.
    pub batch_narrow_mb: f64,
    /// Estimated batch footprint in MiB assuming wide rows.
    pub batch_wide_mb: f64,
    /// Whether the wide-row estimate is considered risky.
    pub wide_table_risk: bool,
}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Config that pins only the profile; every other knob is left unset.
    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..TuningConfig::default()
        }
    }

    /// Resolves tuning from a profile-only config via `SourceTuning::from_config`.
    fn tuning_for(profile: TuningProfile) -> SourceTuning {
        SourceTuning::from_config(Some(&cfg_with_profile(profile)))
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(
            (
                t.batch_size,
                t.throttle_ms,
                t.statement_timeout_s,
                t.max_retries,
                t.retry_backoff_ms,
                t.lock_timeout_s,
            ),
            (10_000, 50, 300, 3, 2_000, 30)
        );
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let fast = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(fast.batch_size, 50_000);
        assert_eq!(fast.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(fast.profile_name(), "fast");

        let safe = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!((safe.throttle_ms, safe.profile_name()), (500, "safe"));
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let balanced = cfg_with_profile(TuningProfile::Balanced);
        let t =
            SourceTuning::from_config_with_default_profile(Some(&balanced), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = tuning_for(TuningProfile::Fast);
        assert_eq!(
            (t.batch_size, t.throttle_ms, t.statement_timeout_s, t.max_retries),
            (50_000, 0, 0, 1)
        );
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = tuning_for(TuningProfile::Safe);
        assert_eq!(
            (t.batch_size, t.throttle_ms, t.statement_timeout_s),
            (2_000, 500, 120)
        );
        assert_eq!(
            (t.max_retries, t.retry_backoff_ms, t.lock_timeout_s),
            (5, 5_000, 10)
        );
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let overrides = TuningConfig {
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..cfg_with_profile(TuningProfile::Safe)
        };
        let t = SourceTuning::from_config(Some(&overrides));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn profile_name_fast() {
        assert_eq!(tuning_for(TuningProfile::Fast).profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        assert_eq!(SourceTuning::from_config(None).profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        assert_eq!(tuning_for(TuningProfile::Safe).profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let s = SourceTuning::from_config(None).to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..TuningConfig::default()
        };
        let export = TuningConfig {
            batch_size: None,
            ..cfg_with_profile(TuningProfile::Safe)
        };
        let merged = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(merged.profile, Some(TuningProfile::Safe));
        assert_eq!(
            merged.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(merged.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let export = cfg_with_profile(TuningProfile::Fast);
        let merged = merge_tuning_config(None, Some(&export)).expect("merged");
        assert_eq!(merged.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn effective_batch_size_without_memory() {
        assert_eq!(SourceTuning::from_config(None).effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;

        let t = SourceTuning::from_config(Some(&TuningConfig {
            batch_size_memory_mb: Some(256),
            ..TuningConfig::default()
        }));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let bs = t.effective_batch_size(Some(&schema));
        assert!((1_000..=500_000).contains(&bs), "got {bs}");
        assert_eq!(bs, 500_000);
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let r = SourceTuning::from_config(None).resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert!(r.batch_size_memory_mb.is_none());
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let r = tuning_for(TuningProfile::Fast).resource_summary();
        assert_eq!(r.batch_size, 50_000);
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_with_adaptive_batch() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..TuningConfig::default()
        };
        let r = SourceTuning::from_config(Some(&cfg)).resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}