1use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
/// Fully resolved tuning knobs for reading from a source.
///
/// Produced by [`SourceTuning::from_config_with_default_profile`]: values start
/// from a [`TuningProfile`] preset, and any fields set in a [`TuningConfig`] are
/// layered on top.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceTuning {
    /// Fixed number of rows per batch; returned by
    /// [`SourceTuning::effective_batch_size`] whenever memory-based sizing does
    /// not apply.
    pub batch_size: usize,
    /// Memory budget in MB from which a batch size is computed for a given
    /// schema (see [`SourceTuning::effective_batch_size`]).
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds. NOTE(review): presumably applied between
    /// batches — confirm at the use site.
    pub throttle_ms: u64,
    /// Statement timeout in seconds. NOTE(review): the `Fast` preset uses `0`,
    /// presumably meaning "no timeout" — confirm.
    pub statement_timeout_s: u64,
    /// Retry limit (the retry loop is not visible here).
    pub max_retries: u32,
    /// Backoff between retries, in milliseconds.
    pub retry_backoff_ms: u64,
    /// Lock timeout in seconds. NOTE(review): `0` in the `Fast` preset,
    /// presumably "no timeout" — confirm.
    pub lock_timeout_s: u64,
    /// Memory threshold in MB. NOTE(review): enforcement is not visible here —
    /// confirm semantics.
    pub memory_threshold_mb: usize,
    /// Optional per-batch memory limit in MB. NOTE(review): presumably compared
    /// against [`SourceTuning::batch_memory_bytes`] by the caller — confirm.
    pub max_batch_memory_mb: Option<usize>,
    /// Reaction when a batch exceeds `max_batch_memory_mb` (presumably — confirm).
    pub on_batch_memory_exceeded: BatchMemoryPolicy,
    /// Adaptive-mode toggle; `false` in every preset.
    pub adaptive: bool,
    /// Optional minimum parallelism; `None` in every preset.
    pub min_parallel: Option<usize>,
    /// Per-value size cap in MB; `None` or `0` disables the cap
    /// (see `SourceTuning::max_value_bytes`).
    pub max_value_mb: Option<usize>,
    /// Profile the values were seeded from; reported by
    /// [`SourceTuning::profile_name`].
    configured_profile: TuningProfile,
}
46
/// Named preset of tuning values; serialized in lowercase (`"fast"`,
/// `"balanced"`, `"safe"`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TuningProfile {
    /// Throughput first: 50,000-row batches, zero throttle, a single retry.
    Fast,
    /// Middle ground: 10,000-row batches and a 50 ms throttle; the profile
    /// `SourceTuning::from_config` falls back to.
    Balanced,
    /// Minimal source impact: 2,000-row batches, a 500 ms throttle, five retries.
    Safe,
}
54
/// Reaction when a batch exceeds its memory limit; serialized in snake_case
/// (`"warn"`, `"fail"`, `"auto_shrink"`).
///
/// NOTE(review): the enforcement site is not visible here; the variant
/// descriptions below are inferred from their names — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum BatchMemoryPolicy {
    /// Default policy; presumably logs a warning and continues.
    #[default]
    Warn,
    /// Presumably aborts the read with an error.
    Fail,
    /// Presumably shrinks the batch size and carries on.
    AutoShrink,
}
68
/// User-facing tuning overrides, as deserialized from configuration.
///
/// Every field is optional; `None` means "not set at this level". Unknown keys
/// are rejected on deserialization (`deny_unknown_fields`). Source-level and
/// export-level configs are combined with [`merge_tuning_config`] and resolved
/// into a [`SourceTuning`] by [`SourceTuning::from_config_with_default_profile`].
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct TuningConfig {
    /// Preset to start from; the caller-supplied fallback profile applies when unset.
    pub profile: Option<TuningProfile>,
    /// Fixed rows per batch.
    pub batch_size: Option<usize>,
    /// Memory budget in MB used to derive the batch size from the schema.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds.
    pub throttle_ms: Option<u64>,
    /// Statement timeout in seconds.
    pub statement_timeout_s: Option<u64>,
    /// Retry limit.
    pub max_retries: Option<u32>,
    /// Backoff between retries, in milliseconds.
    pub retry_backoff_ms: Option<u64>,
    /// Lock timeout in seconds.
    pub lock_timeout_s: Option<u64>,
    /// Memory threshold in MB.
    pub memory_threshold_mb: Option<usize>,
    /// Per-batch memory limit in MB.
    pub max_batch_memory_mb: Option<usize>,
    /// Reaction when a batch exceeds `max_batch_memory_mb` (presumably — confirm).
    pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
    /// Adaptive-mode toggle.
    pub adaptive: Option<bool>,
    /// Minimum parallelism.
    pub min_parallel: Option<usize>,
    /// Per-value size cap in MB; `0` disables the cap.
    pub max_value_mb: Option<usize>,
}
99
100pub fn merge_tuning_config(
103 source: Option<&TuningConfig>,
104 export: Option<&TuningConfig>,
105) -> Option<TuningConfig> {
106 match (source, export) {
107 (None, None) => None,
108 (Some(s), None) => Some(s.clone()),
109 (None, Some(e)) => Some(e.clone()),
110 (Some(s), Some(e)) => Some(TuningConfig {
111 profile: e.profile.or(s.profile),
112 batch_size: e.batch_size.or(s.batch_size),
113 batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
114 throttle_ms: e.throttle_ms.or(s.throttle_ms),
115 statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
116 max_retries: e.max_retries.or(s.max_retries),
117 retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
118 lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
119 memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
120 max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
121 on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
122 adaptive: e.adaptive.or(s.adaptive),
123 min_parallel: e.min_parallel.or(s.min_parallel),
124 max_value_mb: e.max_value_mb.or(s.max_value_mb),
125 }),
126 }
127}
128
129impl SourceTuning {
130 pub(crate) fn max_value_bytes(&self) -> Option<usize> {
136 self.max_value_mb
137 .filter(|&mb| mb > 0)
138 .map(|mb| mb * 1024 * 1024)
139 }
140
141 #[allow(dead_code)]
146 pub fn from_config(config: Option<&TuningConfig>) -> Self {
147 Self::from_config_with_default_profile(config, TuningProfile::Balanced)
148 }
149
150 pub fn from_config_with_default_profile(
153 config: Option<&TuningConfig>,
154 fallback_profile: TuningProfile,
155 ) -> Self {
156 let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
157
158 let mut tuning = Self::from_profile(profile);
159 tuning.configured_profile = profile;
160
161 if let Some(cfg) = config {
162 if let Some(v) = cfg.batch_size {
163 tuning.batch_size = v;
164 }
165 tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
166 if let Some(v) = cfg.throttle_ms {
167 tuning.throttle_ms = v;
168 }
169 if let Some(v) = cfg.statement_timeout_s {
170 tuning.statement_timeout_s = v;
171 }
172 if let Some(v) = cfg.max_retries {
173 tuning.max_retries = v;
174 }
175 if let Some(v) = cfg.retry_backoff_ms {
176 tuning.retry_backoff_ms = v;
177 }
178 if let Some(v) = cfg.lock_timeout_s {
179 tuning.lock_timeout_s = v;
180 }
181 if let Some(v) = cfg.memory_threshold_mb {
182 tuning.memory_threshold_mb = v;
183 }
184 tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
185 if let Some(v) = cfg.on_batch_memory_exceeded {
186 tuning.on_batch_memory_exceeded = v;
187 }
188 if let Some(v) = cfg.adaptive {
189 tuning.adaptive = v;
190 }
191 if cfg.min_parallel.is_some() {
192 tuning.min_parallel = cfg.min_parallel;
193 }
194 if cfg.max_value_mb.is_some() {
195 tuning.max_value_mb = cfg.max_value_mb;
196 }
197 }
198
199 tuning
200 }
201
202 fn from_profile(profile: TuningProfile) -> Self {
203 match profile {
204 TuningProfile::Fast => Self {
205 batch_size: 50_000,
208 batch_size_memory_mb: Some(64),
209 throttle_ms: 0,
210 statement_timeout_s: 0,
211 max_retries: 1,
212 retry_backoff_ms: 1_000,
213 lock_timeout_s: 0,
214 memory_threshold_mb: 0,
215 max_batch_memory_mb: None,
216 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
217 adaptive: false,
218 min_parallel: None,
219 max_value_mb: Some(256),
220 configured_profile: TuningProfile::Fast,
221 },
222 TuningProfile::Balanced => Self {
223 batch_size: 10_000,
233 batch_size_memory_mb: Some(32),
234 throttle_ms: 50,
235 statement_timeout_s: 300,
236 max_retries: 3,
237 retry_backoff_ms: 2_000,
238 lock_timeout_s: 30,
239 memory_threshold_mb: 4_096,
240 max_batch_memory_mb: None,
241 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
242 adaptive: false,
243 min_parallel: None,
244 max_value_mb: Some(256),
245 configured_profile: TuningProfile::Balanced,
246 },
247 TuningProfile::Safe => Self {
248 batch_size: 2_000,
249 batch_size_memory_mb: None,
250 throttle_ms: 500,
251 statement_timeout_s: 120,
252 max_retries: 5,
253 retry_backoff_ms: 5_000,
254 lock_timeout_s: 10,
255 memory_threshold_mb: 2_048,
256 max_batch_memory_mb: None,
257 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
258 adaptive: false,
259 min_parallel: None,
260 max_value_mb: Some(256),
261 configured_profile: TuningProfile::Safe,
262 },
263 }
264 }
265
266 pub fn profile_name(&self) -> &'static str {
267 match self.configured_profile {
268 TuningProfile::Fast => "fast",
269 TuningProfile::Balanced => "balanced",
270 TuningProfile::Safe => "safe",
271 }
272 }
273
274 pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
277 if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
278 let computed = compute_batch_size_from_memory(mem_mb, schema);
279 log::info!(
280 "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
281 mem_mb,
282 estimate_row_bytes(schema),
283 computed
284 );
285 computed
286 } else {
287 self.batch_size
288 }
289 }
290
291 pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
297 batch
298 .columns()
299 .iter()
300 .map(|col| col.get_array_memory_size())
301 .sum()
302 }
303
304 pub fn resource_summary(&self) -> ResourceSummary {
311 const NARROW_BYTES: f64 = 200.0;
312 const WIDE_BYTES: f64 = 10_240.0;
313 let batch = self.batch_size as f64;
314 let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
315 let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
316 ResourceSummary {
317 profile: self.profile_name().to_string(),
318 batch_size: self.batch_size,
319 batch_size_memory_mb: self.batch_size_memory_mb,
320 memory_threshold_mb: self.memory_threshold_mb,
321 throttle_ms: self.throttle_ms,
322 batch_narrow_mb,
323 batch_wide_mb,
324 wide_table_risk: batch_wide_mb > 128.0,
325 }
326 }
327}
328
329impl std::fmt::Display for SourceTuning {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 write!(
332 f,
333 "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
334 self.profile_name(),
335 self.batch_size,
336 self.throttle_ms,
337 self.statement_timeout_s,
338 self.max_retries,
339 self.lock_timeout_s,
340 )
341 }
342}
343
/// Rough resource estimate for a [`SourceTuning`], produced by
/// [`SourceTuning::resource_summary`].
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    /// Profile name, as returned by [`SourceTuning::profile_name`].
    // NOTE(review): the allow suggests `profile` is not read in non-test code;
    // confirm before removing it.
    #[allow(dead_code)]
    pub profile: String,
    /// Fixed rows per batch.
    pub batch_size: usize,
    /// Memory budget in MB for memory-based batch sizing, if configured.
    pub batch_size_memory_mb: Option<usize>,
    /// Configured memory threshold in MB.
    pub memory_threshold_mb: usize,
    /// Configured throttle in milliseconds.
    pub throttle_ms: u64,
    /// Estimated MB per batch assuming narrow rows (200 bytes/row).
    pub batch_narrow_mb: f64,
    /// Estimated MB per batch assuming wide rows (10 KiB/row).
    pub batch_wide_mb: f64,
    /// `true` when `batch_wide_mb` exceeds 128 MB.
    pub wide_table_risk: bool,
}
364
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Config that only selects a profile, leaving every other knob unset.
    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    /// Two-column (`Int64`, nullable `Utf8`) schema for batch-sizing tests.
    fn id_name_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.batch_size, 10_000);
        assert_eq!(t.throttle_ms, 50);
        assert_eq!(t.statement_timeout_s, 300);
        assert_eq!(t.max_retries, 3);
        assert_eq!(t.retry_backoff_ms, 2_000);
        assert_eq!(t.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(t.profile_name(), "fast");

        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let cfg = cfg_with_profile(TuningProfile::Balanced);
        let t = SourceTuning::from_config_with_default_profile(Some(&cfg), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0);
        assert_eq!(t.statement_timeout_s, 0);
        assert_eq!(t.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size, 2_000);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.statement_timeout_s, 120);
        assert_eq!(t.max_retries, 5);
        assert_eq!(t.retry_backoff_ms, 5_000);
        assert_eq!(t.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn profile_name_fast() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let t = SourceTuning::from_config(None);
        let s = t.to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(
            m.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(m.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let e = cfg_with_profile(TuningProfile::Fast);
        let m = merge_tuning_config(None, Some(&e)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn merge_tuning_source_only() {
        let s = TuningConfig {
            throttle_ms: Some(125),
            ..cfg_with_profile(TuningProfile::Safe)
        };
        let m = merge_tuning_config(Some(&s), None).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(m.throttle_ms, Some(125));
    }

    #[test]
    fn merge_tuning_neither_side_is_none() {
        assert!(merge_tuning_config(None, None).is_none());
    }

    #[test]
    fn effective_batch_size_without_memory() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = id_name_schema();
        assert_eq!(t.effective_batch_size(Some(&schema)), 150_000);
    }

    #[test]
    fn explicit_batch_size_without_memory_budget_is_used_verbatim() {
        let cfg = TuningConfig {
            batch_size: Some(5_000),
            ..cfg_with_profile(TuningProfile::Balanced)
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = id_name_schema();
        assert_eq!(
            t.effective_batch_size(Some(&schema)),
            5_000,
            "an explicit row count must not be replaced by memory-based sizing"
        );
    }

    #[test]
    fn max_value_bytes_converts_megabytes() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.max_value_bytes(), Some(256 * 1024 * 1024));
    }

    #[test]
    fn max_value_mb_zero_disables_cap() {
        let cfg = TuningConfig {
            max_value_mb: Some(0),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.max_value_bytes(), None);
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let t = SourceTuning::from_config(None);
        let r = t.resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert_eq!(r.batch_size_memory_mb, Some(32));
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        let r = t.resource_summary();
        assert_eq!(r.batch_size, 50_000);
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_reports_batch_size_memory_mb() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let r = t.resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}