1use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SourceTuning {
17 pub batch_size: usize,
18 pub batch_size_memory_mb: Option<usize>,
19 pub throttle_ms: u64,
20 pub statement_timeout_s: u64,
21 pub max_retries: u32,
22 pub retry_backoff_ms: u64,
23 pub lock_timeout_s: u64,
24 pub memory_threshold_mb: usize,
26 pub max_batch_memory_mb: Option<usize>,
28 pub on_batch_memory_exceeded: BatchMemoryPolicy,
29 pub adaptive: bool,
33 pub min_parallel: Option<usize>,
38 pub max_value_mb: Option<usize>,
44 configured_profile: TuningProfile,
45}
46
47#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
48#[serde(rename_all = "lowercase")]
49pub enum TuningProfile {
50 Fast,
51 Balanced,
52 Safe,
53}
54
55#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
57#[serde(rename_all = "snake_case")]
58pub enum BatchMemoryPolicy {
59 #[default]
61 Warn,
62 Fail,
64 AutoShrink,
67}
68
69#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
70#[serde(deny_unknown_fields)]
71pub struct TuningConfig {
72 pub profile: Option<TuningProfile>,
73 pub batch_size: Option<usize>,
74 pub batch_size_memory_mb: Option<usize>,
76 pub throttle_ms: Option<u64>,
77 pub statement_timeout_s: Option<u64>,
78 pub max_retries: Option<u32>,
79 pub retry_backoff_ms: Option<u64>,
80 pub lock_timeout_s: Option<u64>,
81 pub memory_threshold_mb: Option<usize>,
82 pub max_batch_memory_mb: Option<usize>,
85 pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
87 pub adaptive: Option<bool>,
91 pub min_parallel: Option<usize>,
94 pub max_value_mb: Option<usize>,
98}
99
100pub fn merge_tuning_config(
103 source: Option<&TuningConfig>,
104 export: Option<&TuningConfig>,
105) -> Option<TuningConfig> {
106 match (source, export) {
107 (None, None) => None,
108 (Some(s), None) => Some(s.clone()),
109 (None, Some(e)) => Some(e.clone()),
110 (Some(s), Some(e)) => Some(TuningConfig {
111 profile: e.profile.or(s.profile),
112 batch_size: e.batch_size.or(s.batch_size),
113 batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
114 throttle_ms: e.throttle_ms.or(s.throttle_ms),
115 statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
116 max_retries: e.max_retries.or(s.max_retries),
117 retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
118 lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
119 memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
120 max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
121 on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
122 adaptive: e.adaptive.or(s.adaptive),
123 min_parallel: e.min_parallel.or(s.min_parallel),
124 max_value_mb: e.max_value_mb.or(s.max_value_mb),
125 }),
126 }
127}
128
129impl SourceTuning {
130 pub(crate) fn max_value_bytes(&self) -> Option<usize> {
136 self.max_value_mb
137 .filter(|&mb| mb > 0)
138 .map(|mb| mb * 1024 * 1024)
139 }
140
141 #[allow(dead_code)]
146 pub fn from_config(config: Option<&TuningConfig>) -> Self {
147 Self::from_config_with_default_profile(config, TuningProfile::Balanced)
148 }
149
150 pub fn from_config_with_default_profile(
153 config: Option<&TuningConfig>,
154 fallback_profile: TuningProfile,
155 ) -> Self {
156 let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
157
158 let mut tuning = Self::from_profile(profile);
159 tuning.configured_profile = profile;
160
161 if let Some(cfg) = config {
162 if let Some(v) = cfg.batch_size {
163 tuning.batch_size = v;
164 }
165 tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
166 if let Some(v) = cfg.throttle_ms {
167 tuning.throttle_ms = v;
168 }
169 if let Some(v) = cfg.statement_timeout_s {
170 tuning.statement_timeout_s = v;
171 }
172 if let Some(v) = cfg.max_retries {
173 tuning.max_retries = v;
174 }
175 if let Some(v) = cfg.retry_backoff_ms {
176 tuning.retry_backoff_ms = v;
177 }
178 if let Some(v) = cfg.lock_timeout_s {
179 tuning.lock_timeout_s = v;
180 }
181 if let Some(v) = cfg.memory_threshold_mb {
182 tuning.memory_threshold_mb = v;
183 }
184 tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
185 if let Some(v) = cfg.on_batch_memory_exceeded {
186 tuning.on_batch_memory_exceeded = v;
187 }
188 if let Some(v) = cfg.adaptive {
189 tuning.adaptive = v;
190 }
191 if cfg.min_parallel.is_some() {
192 tuning.min_parallel = cfg.min_parallel;
193 }
194 if cfg.max_value_mb.is_some() {
195 tuning.max_value_mb = cfg.max_value_mb;
196 }
197 }
198
199 tuning
200 }
201
202 fn from_profile(profile: TuningProfile) -> Self {
203 match profile {
204 TuningProfile::Fast => Self {
205 batch_size: 50_000,
206 batch_size_memory_mb: None,
207 throttle_ms: 0,
208 statement_timeout_s: 0,
209 max_retries: 1,
210 retry_backoff_ms: 1_000,
211 lock_timeout_s: 0,
212 memory_threshold_mb: 0,
213 max_batch_memory_mb: None,
214 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
215 adaptive: false,
216 min_parallel: None,
217 max_value_mb: Some(256),
218 configured_profile: TuningProfile::Fast,
219 },
220 TuningProfile::Balanced => Self {
221 batch_size: 10_000,
222 batch_size_memory_mb: None,
223 throttle_ms: 50,
224 statement_timeout_s: 300,
225 max_retries: 3,
226 retry_backoff_ms: 2_000,
227 lock_timeout_s: 30,
228 memory_threshold_mb: 4_096,
229 max_batch_memory_mb: None,
230 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
231 adaptive: false,
232 min_parallel: None,
233 max_value_mb: Some(256),
234 configured_profile: TuningProfile::Balanced,
235 },
236 TuningProfile::Safe => Self {
237 batch_size: 2_000,
238 batch_size_memory_mb: None,
239 throttle_ms: 500,
240 statement_timeout_s: 120,
241 max_retries: 5,
242 retry_backoff_ms: 5_000,
243 lock_timeout_s: 10,
244 memory_threshold_mb: 2_048,
245 max_batch_memory_mb: None,
246 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
247 adaptive: false,
248 min_parallel: None,
249 max_value_mb: Some(256),
250 configured_profile: TuningProfile::Safe,
251 },
252 }
253 }
254
255 pub fn profile_name(&self) -> &'static str {
256 match self.configured_profile {
257 TuningProfile::Fast => "fast",
258 TuningProfile::Balanced => "balanced",
259 TuningProfile::Safe => "safe",
260 }
261 }
262
263 pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
266 if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
267 let computed = compute_batch_size_from_memory(mem_mb, schema);
268 log::info!(
269 "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
270 mem_mb,
271 estimate_row_bytes(schema),
272 computed
273 );
274 computed
275 } else {
276 self.batch_size
277 }
278 }
279
280 pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
286 batch
287 .columns()
288 .iter()
289 .map(|col| col.get_array_memory_size())
290 .sum()
291 }
292
293 pub fn resource_summary(&self) -> ResourceSummary {
300 const NARROW_BYTES: f64 = 200.0;
301 const WIDE_BYTES: f64 = 10_240.0;
302 let batch = self.batch_size as f64;
303 let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
304 let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
305 ResourceSummary {
306 profile: self.profile_name().to_string(),
307 batch_size: self.batch_size,
308 batch_size_memory_mb: self.batch_size_memory_mb,
309 memory_threshold_mb: self.memory_threshold_mb,
310 throttle_ms: self.throttle_ms,
311 batch_narrow_mb,
312 batch_wide_mb,
313 wide_table_risk: batch_wide_mb > 128.0,
314 }
315 }
316}
317
318impl std::fmt::Display for SourceTuning {
319 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320 write!(
321 f,
322 "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
323 self.profile_name(),
324 self.batch_size,
325 self.throttle_ms,
326 self.statement_timeout_s,
327 self.max_retries,
328 self.lock_timeout_s,
329 )
330 }
331}
332
/// Snapshot of the tuning values relevant to memory use, plus rough per-batch estimates.
///
/// Produced by `SourceTuning::resource_summary`.
#[derive(Clone, Debug)]
pub struct ResourceSummary {
    /// Lowercase profile name (`fast`, `balanced` or `safe`).
    // NOTE(review): the lint allowance suggests this field is not read in every build.
    #[allow(dead_code)]
    pub profile: String,
    /// Configured rows per batch.
    pub batch_size: usize,
    /// Configured memory budget for deriving the batch size, if any.
    pub batch_size_memory_mb: Option<usize>,
    /// Configured memory threshold.
    pub memory_threshold_mb: usize,
    /// Configured throttle in milliseconds.
    pub throttle_ms: u64,
    /// Estimated batch size in MiB assuming 200-byte rows.
    pub batch_narrow_mb: f64,
    /// Estimated batch size in MiB assuming 10 240-byte rows.
    pub batch_wide_mb: f64,
    /// `true` when `batch_wide_mb` exceeds 128.
    pub wide_table_risk: bool,
}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Config that names `profile` and leaves every other setting unspecified.
    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    /// Tuning resolved from a config that only names `profile`.
    fn tuning_for(profile: TuningProfile) -> SourceTuning {
        SourceTuning::from_config(Some(&cfg_with_profile(profile)))
    }

    // ---- profile resolution -------------------------------------------------------

    #[test]
    fn default_config_uses_balanced_profile() {
        let tuning = SourceTuning::from_config(None);
        assert_eq!(tuning.batch_size, 10_000);
        assert_eq!(tuning.throttle_ms, 50);
        assert_eq!(tuning.statement_timeout_s, 300);
        assert_eq!(tuning.max_retries, 3);
        assert_eq!(tuning.retry_backoff_ms, 2_000);
        assert_eq!(tuning.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let fast = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(fast.batch_size, 50_000);
        assert_eq!(fast.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(fast.profile_name(), "fast");

        let safe = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(safe.throttle_ms, 500);
        assert_eq!(safe.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let explicit = cfg_with_profile(TuningProfile::Balanced);
        let tuning =
            SourceTuning::from_config_with_default_profile(Some(&explicit), TuningProfile::Fast);
        assert_eq!(
            tuning.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(tuning.profile_name(), "balanced");
    }

    // ---- preset values ------------------------------------------------------------

    #[test]
    fn fast_profile_favors_throughput() {
        let tuning = tuning_for(TuningProfile::Fast);
        assert_eq!(tuning.batch_size, 50_000);
        assert_eq!(tuning.throttle_ms, 0);
        assert_eq!(tuning.statement_timeout_s, 0);
        assert_eq!(tuning.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let tuning = tuning_for(TuningProfile::Safe);
        assert_eq!(tuning.batch_size, 2_000);
        assert_eq!(tuning.throttle_ms, 500);
        assert_eq!(tuning.statement_timeout_s, 120);
        assert_eq!(tuning.max_retries, 5);
        assert_eq!(tuning.retry_backoff_ms, 5_000);
        assert_eq!(tuning.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let overrides = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let tuning = SourceTuning::from_config(Some(&overrides));
        assert_eq!(tuning.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(tuning.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            tuning.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            tuning.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    // ---- profile_name / Display ---------------------------------------------------

    #[test]
    fn profile_name_fast() {
        assert_eq!(tuning_for(TuningProfile::Fast).profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        assert_eq!(SourceTuning::from_config(None).profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        assert_eq!(tuning_for(TuningProfile::Safe).profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let s = SourceTuning::from_config(None).to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    // ---- merge_tuning_config ------------------------------------------------------

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source_level = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export_level = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let merged =
            merge_tuning_config(Some(&source_level), Some(&export_level)).expect("merged");
        assert_eq!(merged.profile, Some(TuningProfile::Safe));
        assert_eq!(
            merged.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(merged.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let export_level = cfg_with_profile(TuningProfile::Fast);
        let merged = merge_tuning_config(None, Some(&export_level)).expect("merged");
        assert_eq!(merged.profile, Some(TuningProfile::Fast));
    }

    // ---- effective_batch_size -----------------------------------------------------

    #[test]
    fn effective_batch_size_without_memory() {
        // No memory budget and no schema: the preset batch size is returned as is.
        let tuning = SourceTuning::from_config(None);
        assert_eq!(tuning.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;

        let budgeted = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let tuning = SourceTuning::from_config(Some(&budgeted));
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        let schema = Arc::new(Schema::new(fields));

        let bs = tuning.effective_batch_size(Some(&schema));
        assert!((1_000..=500_000).contains(&bs), "got {bs}");
        assert_eq!(bs, 500_000);
    }

    // ---- resource_summary ---------------------------------------------------------

    #[test]
    fn resource_summary_balanced_profile() {
        let summary = SourceTuning::from_config(None).resource_summary();
        assert_eq!(summary.profile, "balanced");
        assert_eq!(summary.batch_size, 10_000);
        assert!(summary.batch_size_memory_mb.is_none());
        assert_eq!(summary.memory_threshold_mb, 4_096);
        assert_eq!(summary.throttle_ms, 50);
        // 10 000 rows x 200 B is roughly 1.9 MiB.
        assert!(
            summary.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            summary.batch_narrow_mb
        );
        // 10 000 rows x 10 240 B is roughly 97.7 MiB, below the 128 MiB risk threshold.
        assert!(
            !summary.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let summary = tuning_for(TuningProfile::Fast).resource_summary();
        assert_eq!(summary.batch_size, 50_000);
        // 50 000 rows x 10 240 B is roughly 488 MiB, above the 128 MiB risk threshold.
        assert!(
            summary.wide_table_risk,
            "fast 50k should trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_with_adaptive_batch() {
        let budgeted = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let summary = SourceTuning::from_config(Some(&budgeted)).resource_summary();
        assert_eq!(summary.batch_size_memory_mb, Some(64));
    }
}