1use arrow::datatypes::SchemaRef;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12
13use super::memory::{compute_batch_size_from_memory, estimate_row_bytes};
14
/// Fully resolved tuning knobs for reading from a source.
///
/// Produced from a [`TuningProfile`]'s defaults with any explicitly set
/// [`TuningConfig`] fields layered on top
/// (see `SourceTuning::from_config_with_default_profile`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceTuning {
    /// Fixed number of rows per batch; returned by `effective_batch_size`
    /// whenever memory-based sizing is not in effect.
    pub batch_size: usize,
    /// Memory budget (MB) from which `effective_batch_size` derives a batch
    /// size when a schema is supplied; `None` disables memory-based sizing.
    pub batch_size_memory_mb: Option<usize>,
    /// Throttle delay in milliseconds (0 in the `fast` profile).
    /// NOTE(review): presumably a pause between batches — confirm in the reader.
    pub throttle_ms: u64,
    /// Statement timeout in seconds. The `fast` profile uses 0 — presumably
    /// meaning "no timeout"; confirm with the consumer.
    pub statement_timeout_s: u64,
    /// Maximum number of retries.
    pub max_retries: u32,
    /// Backoff between retries, in milliseconds.
    pub retry_backoff_ms: u64,
    /// Lock timeout in seconds (0 in the `fast` profile — presumably disabled).
    pub lock_timeout_s: u64,
    /// Memory threshold in MB (0 in the `fast` profile).
    /// NOTE(review): enforced outside this module — confirm semantics there.
    pub memory_threshold_mb: usize,
    /// Optional per-batch memory cap in MB; presumably enforced according to
    /// `on_batch_memory_exceeded`.
    pub max_batch_memory_mb: Option<usize>,
    /// Policy presumably applied when a batch exceeds `max_batch_memory_mb`.
    pub on_batch_memory_exceeded: BatchMemoryPolicy,
    /// Adaptive-tuning flag; `false` in every built-in profile.
    pub adaptive: bool,
    /// Optional minimum parallelism. NOTE(review): interpreted by the caller.
    pub min_parallel: Option<usize>,
    /// Optional per-value size limit in MB; every built-in profile uses 256.
    pub max_value_mb: Option<usize>,
    /// Profile these settings were derived from; reported by `profile_name`.
    configured_profile: TuningProfile,
}
46
// Named preset of tuning defaults; the concrete values live in
// `SourceTuning::from_profile`. Serialized in lowercase: "fast", "balanced",
// "safe".
// (Plain `//` rather than `///`: schemars copies doc comments into the
// generated JSON schema, so `///` here would change the schema output.)
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TuningProfile {
    // Throughput-oriented: largest batches, no throttle, a single retry.
    Fast,
    // Middle ground; the fallback used by `SourceTuning::from_config`.
    Balanced,
    // Impact-limiting: smallest batches, longest throttle, most retries.
    Safe,
}
54
// Reaction when a batch exceeds its memory cap (presumably
// `max_batch_memory_mb`). Serialized in snake_case: "warn", "fail",
// "auto_shrink". `Warn` is the `Default`.
// (Plain `//` rather than `///`: schemars copies doc comments into the
// generated JSON schema, so `///` here would change the schema output.)
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum BatchMemoryPolicy {
    // Default. NOTE(review): presumably logs a warning and continues — confirm
    // in the batch consumer.
    #[default]
    Warn,
    // NOTE(review): presumably aborts the read with an error — confirm.
    Fail,
    // NOTE(review): presumably shrinks the batch size and retries — confirm.
    AutoShrink,
}
68
// User-facing tuning overrides; every field is optional.
//
// Resolved into a `SourceTuning` by `SourceTuning::from_config_with_default_profile`:
// each `Some` field overrides the same-named `SourceTuning` field, and each
// `None` keeps the selected profile's default. Two configs are layered with
// `merge_tuning_config` (export-level values win over source-level ones).
// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
// (Plain `//` rather than `///`: schemars copies doc comments into the
// generated JSON schema, so `///` here would change the schema output.)
#[derive(Debug, Deserialize, Serialize, JsonSchema, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct TuningConfig {
    // Profile whose defaults seed the resolved tuning.
    pub profile: Option<TuningProfile>,
    // Fixed rows-per-batch override.
    pub batch_size: Option<usize>,
    // Memory budget (MB) used to derive the batch size from the schema.
    pub batch_size_memory_mb: Option<usize>,
    // Throttle override, in milliseconds.
    pub throttle_ms: Option<u64>,
    // Statement timeout override, in seconds.
    pub statement_timeout_s: Option<u64>,
    // Retry-count override.
    pub max_retries: Option<u32>,
    // Retry backoff override, in milliseconds.
    pub retry_backoff_ms: Option<u64>,
    // Lock timeout override, in seconds.
    pub lock_timeout_s: Option<u64>,
    // Memory threshold override, in MB.
    pub memory_threshold_mb: Option<usize>,
    // Per-batch memory cap, in MB.
    pub max_batch_memory_mb: Option<usize>,
    // Policy override for an over-budget batch.
    pub on_batch_memory_exceeded: Option<BatchMemoryPolicy>,
    // Adaptive-tuning flag override.
    pub adaptive: Option<bool>,
    // Minimum parallelism override.
    pub min_parallel: Option<usize>,
    // Per-value size limit override, in MB.
    pub max_value_mb: Option<usize>,
}
99
100pub fn merge_tuning_config(
103 source: Option<&TuningConfig>,
104 export: Option<&TuningConfig>,
105) -> Option<TuningConfig> {
106 match (source, export) {
107 (None, None) => None,
108 (Some(s), None) => Some(s.clone()),
109 (None, Some(e)) => Some(e.clone()),
110 (Some(s), Some(e)) => Some(TuningConfig {
111 profile: e.profile.or(s.profile),
112 batch_size: e.batch_size.or(s.batch_size),
113 batch_size_memory_mb: e.batch_size_memory_mb.or(s.batch_size_memory_mb),
114 throttle_ms: e.throttle_ms.or(s.throttle_ms),
115 statement_timeout_s: e.statement_timeout_s.or(s.statement_timeout_s),
116 max_retries: e.max_retries.or(s.max_retries),
117 retry_backoff_ms: e.retry_backoff_ms.or(s.retry_backoff_ms),
118 lock_timeout_s: e.lock_timeout_s.or(s.lock_timeout_s),
119 memory_threshold_mb: e.memory_threshold_mb.or(s.memory_threshold_mb),
120 max_batch_memory_mb: e.max_batch_memory_mb.or(s.max_batch_memory_mb),
121 on_batch_memory_exceeded: e.on_batch_memory_exceeded.or(s.on_batch_memory_exceeded),
122 adaptive: e.adaptive.or(s.adaptive),
123 min_parallel: e.min_parallel.or(s.min_parallel),
124 max_value_mb: e.max_value_mb.or(s.max_value_mb),
125 }),
126 }
127}
128
129impl SourceTuning {
130 #[allow(dead_code)]
135 pub fn from_config(config: Option<&TuningConfig>) -> Self {
136 Self::from_config_with_default_profile(config, TuningProfile::Balanced)
137 }
138
139 pub fn from_config_with_default_profile(
142 config: Option<&TuningConfig>,
143 fallback_profile: TuningProfile,
144 ) -> Self {
145 let profile = config.and_then(|c| c.profile).unwrap_or(fallback_profile);
146
147 let mut tuning = Self::from_profile(profile);
148 tuning.configured_profile = profile;
149
150 if let Some(cfg) = config {
151 if let Some(v) = cfg.batch_size {
152 tuning.batch_size = v;
153 }
154 tuning.batch_size_memory_mb = cfg.batch_size_memory_mb;
155 if let Some(v) = cfg.throttle_ms {
156 tuning.throttle_ms = v;
157 }
158 if let Some(v) = cfg.statement_timeout_s {
159 tuning.statement_timeout_s = v;
160 }
161 if let Some(v) = cfg.max_retries {
162 tuning.max_retries = v;
163 }
164 if let Some(v) = cfg.retry_backoff_ms {
165 tuning.retry_backoff_ms = v;
166 }
167 if let Some(v) = cfg.lock_timeout_s {
168 tuning.lock_timeout_s = v;
169 }
170 if let Some(v) = cfg.memory_threshold_mb {
171 tuning.memory_threshold_mb = v;
172 }
173 tuning.max_batch_memory_mb = cfg.max_batch_memory_mb;
174 if let Some(v) = cfg.on_batch_memory_exceeded {
175 tuning.on_batch_memory_exceeded = v;
176 }
177 if let Some(v) = cfg.adaptive {
178 tuning.adaptive = v;
179 }
180 if cfg.min_parallel.is_some() {
181 tuning.min_parallel = cfg.min_parallel;
182 }
183 if cfg.max_value_mb.is_some() {
184 tuning.max_value_mb = cfg.max_value_mb;
185 }
186 }
187
188 tuning
189 }
190
191 fn from_profile(profile: TuningProfile) -> Self {
192 match profile {
193 TuningProfile::Fast => Self {
194 batch_size: 50_000,
195 batch_size_memory_mb: None,
196 throttle_ms: 0,
197 statement_timeout_s: 0,
198 max_retries: 1,
199 retry_backoff_ms: 1_000,
200 lock_timeout_s: 0,
201 memory_threshold_mb: 0,
202 max_batch_memory_mb: None,
203 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
204 adaptive: false,
205 min_parallel: None,
206 max_value_mb: Some(256),
207 configured_profile: TuningProfile::Fast,
208 },
209 TuningProfile::Balanced => Self {
210 batch_size: 10_000,
211 batch_size_memory_mb: None,
212 throttle_ms: 50,
213 statement_timeout_s: 300,
214 max_retries: 3,
215 retry_backoff_ms: 2_000,
216 lock_timeout_s: 30,
217 memory_threshold_mb: 4_096,
218 max_batch_memory_mb: None,
219 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
220 adaptive: false,
221 min_parallel: None,
222 max_value_mb: Some(256),
223 configured_profile: TuningProfile::Balanced,
224 },
225 TuningProfile::Safe => Self {
226 batch_size: 2_000,
227 batch_size_memory_mb: None,
228 throttle_ms: 500,
229 statement_timeout_s: 120,
230 max_retries: 5,
231 retry_backoff_ms: 5_000,
232 lock_timeout_s: 10,
233 memory_threshold_mb: 2_048,
234 max_batch_memory_mb: None,
235 on_batch_memory_exceeded: BatchMemoryPolicy::Warn,
236 adaptive: false,
237 min_parallel: None,
238 max_value_mb: Some(256),
239 configured_profile: TuningProfile::Safe,
240 },
241 }
242 }
243
244 pub fn profile_name(&self) -> &'static str {
245 match self.configured_profile {
246 TuningProfile::Fast => "fast",
247 TuningProfile::Balanced => "balanced",
248 TuningProfile::Safe => "safe",
249 }
250 }
251
252 pub fn effective_batch_size(&self, schema: Option<&SchemaRef>) -> usize {
255 if let (Some(mem_mb), Some(schema)) = (self.batch_size_memory_mb, schema) {
256 let computed = compute_batch_size_from_memory(mem_mb, schema);
257 log::info!(
258 "batch_size_memory_mb={}: estimated row ~{}B, computed batch_size={}",
259 mem_mb,
260 estimate_row_bytes(schema),
261 computed
262 );
263 computed
264 } else {
265 self.batch_size
266 }
267 }
268
269 pub fn batch_memory_bytes(batch: &arrow::record_batch::RecordBatch) -> usize {
275 batch
276 .columns()
277 .iter()
278 .map(|col| col.get_array_memory_size())
279 .sum()
280 }
281
282 pub fn resource_summary(&self) -> ResourceSummary {
289 const NARROW_BYTES: f64 = 200.0;
290 const WIDE_BYTES: f64 = 10_240.0;
291 let batch = self.batch_size as f64;
292 let batch_narrow_mb = batch * NARROW_BYTES / (1024.0 * 1024.0);
293 let batch_wide_mb = batch * WIDE_BYTES / (1024.0 * 1024.0);
294 ResourceSummary {
295 profile: self.profile_name().to_string(),
296 batch_size: self.batch_size,
297 batch_size_memory_mb: self.batch_size_memory_mb,
298 memory_threshold_mb: self.memory_threshold_mb,
299 throttle_ms: self.throttle_ms,
300 batch_narrow_mb,
301 batch_wide_mb,
302 wide_table_risk: batch_wide_mb > 128.0,
303 }
304 }
305}
306
307impl std::fmt::Display for SourceTuning {
308 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309 write!(
310 f,
311 "profile={}, batch_size={}, throttle={}ms, timeout={}s, retries={}, lock_timeout={}s",
312 self.profile_name(),
313 self.batch_size,
314 self.throttle_ms,
315 self.statement_timeout_s,
316 self.max_retries,
317 self.lock_timeout_s,
318 )
319 }
320}
321
/// Reporting snapshot of a [`SourceTuning`]'s expected resource usage, as
/// produced by `SourceTuning::resource_summary`.
#[derive(Debug, Clone)]
pub struct ResourceSummary {
    /// Profile name: `"fast"`, `"balanced"` or `"safe"`.
    // NOTE(review): `dead_code` is allowed presumably because nothing outside
    // tests/`Debug` reads this field yet — confirm or drop the allow.
    #[allow(dead_code)]
    pub profile: String,
    /// Fixed configured batch size (not the memory-derived size).
    pub batch_size: usize,
    /// Memory budget for batch sizing, if configured.
    pub batch_size_memory_mb: Option<usize>,
    /// Configured memory threshold, in MB.
    pub memory_threshold_mb: usize,
    /// Configured throttle, in milliseconds.
    pub throttle_ms: u64,
    /// Estimated size of one batch, in MB, assuming ~200-byte rows.
    pub batch_narrow_mb: f64,
    /// Estimated size of one batch, in MB, assuming 10 KiB rows.
    pub batch_wide_mb: f64,
    /// `true` when `batch_wide_mb` exceeds 128 MB.
    pub wide_table_risk: bool,
}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Config that only selects `profile`; every other field is unset.
    fn cfg_with_profile(profile: TuningProfile) -> TuningConfig {
        TuningConfig {
            profile: Some(profile),
            ..Default::default()
        }
    }

    #[test]
    fn default_config_uses_balanced_profile() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.batch_size, 10_000);
        assert_eq!(t.throttle_ms, 50);
        assert_eq!(t.statement_timeout_s, 300);
        assert_eq!(t.max_retries, 3);
        assert_eq!(t.retry_backoff_ms, 2_000);
        assert_eq!(t.lock_timeout_s, 30);
    }

    #[test]
    fn fallback_profile_used_when_no_profile_in_config() {
        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Fast);
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0, "fallback to Fast must zero the throttle");
        assert_eq!(t.profile_name(), "fast");

        let t = SourceTuning::from_config_with_default_profile(None, TuningProfile::Safe);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn explicit_profile_wins_over_fallback() {
        let cfg = cfg_with_profile(TuningProfile::Balanced);
        let t = SourceTuning::from_config_with_default_profile(Some(&cfg), TuningProfile::Fast);
        assert_eq!(
            t.throttle_ms, 50,
            "explicit balanced profile must keep its throttle"
        );
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn fast_profile_favors_throughput() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.batch_size, 50_000);
        assert_eq!(t.throttle_ms, 0);
        assert_eq!(t.statement_timeout_s, 0);
        assert_eq!(t.max_retries, 1);
    }

    #[test]
    fn safe_profile_limits_impact() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size, 2_000);
        assert_eq!(t.throttle_ms, 500);
        assert_eq!(t.statement_timeout_s, 120);
        assert_eq!(t.max_retries, 5);
        assert_eq!(t.retry_backoff_ms, 5_000);
        assert_eq!(t.lock_timeout_s, 10);
    }

    #[test]
    fn explicit_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: Some(3_000),
            throttle_ms: Some(250),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.batch_size, 3_000, "explicit batch_size should win");
        assert_eq!(t.throttle_ms, 250, "explicit throttle_ms should win");
        assert_eq!(
            t.statement_timeout_s, 120,
            "non-overridden field stays at safe default"
        );
        assert_eq!(
            t.max_retries, 5,
            "non-overridden field stays at safe default"
        );
    }

    #[test]
    fn optional_fields_override_profile_defaults() {
        let cfg = TuningConfig {
            max_batch_memory_mb: Some(512),
            on_batch_memory_exceeded: Some(BatchMemoryPolicy::Fail),
            adaptive: Some(true),
            min_parallel: Some(4),
            max_value_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        assert_eq!(t.max_batch_memory_mb, Some(512));
        assert_eq!(t.on_batch_memory_exceeded, BatchMemoryPolicy::Fail);
        assert!(t.adaptive);
        assert_eq!(t.min_parallel, Some(4));
        assert_eq!(t.max_value_mb, Some(64));
    }

    #[test]
    fn unset_optional_fields_keep_profile_defaults() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.batch_size_memory_mb, None);
        assert_eq!(t.max_batch_memory_mb, None);
        assert_eq!(t.on_batch_memory_exceeded, BatchMemoryPolicy::Warn);
        assert!(!t.adaptive);
        assert_eq!(t.min_parallel, None);
        assert_eq!(t.max_value_mb, Some(256));
    }

    #[test]
    fn batch_memory_policy_defaults_to_warn() {
        assert_eq!(BatchMemoryPolicy::default(), BatchMemoryPolicy::Warn);
    }

    #[test]
    fn profile_name_fast() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Fast)));
        assert_eq!(t.profile_name(), "fast");
    }

    #[test]
    fn profile_name_balanced() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.profile_name(), "balanced");
    }

    #[test]
    fn profile_name_safe() {
        let t = SourceTuning::from_config(Some(&cfg_with_profile(TuningProfile::Safe)));
        assert_eq!(t.profile_name(), "safe");
    }

    #[test]
    fn display_contains_all_fields() {
        let t = SourceTuning::from_config(None);
        let s = t.to_string();
        assert!(s.contains("profile=balanced"), "missing profile in: {s}");
        assert!(s.contains("batch_size=10000"), "missing batch_size in: {s}");
        assert!(s.contains("throttle=50ms"), "missing throttle in: {s}");
        assert!(s.contains("timeout=300s"), "missing timeout in: {s}");
        assert!(s.contains("retries=3"), "missing retries in: {s}");
        assert!(
            s.contains("lock_timeout=30s"),
            "missing lock_timeout in: {s}"
        );
    }

    #[test]
    fn merge_tuning_export_overrides_source_fields() {
        let source = TuningConfig {
            profile: Some(TuningProfile::Fast),
            batch_size: Some(1_000),
            throttle_ms: Some(0),
            ..Default::default()
        };
        let export = TuningConfig {
            profile: Some(TuningProfile::Safe),
            batch_size: None,
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&source), Some(&export)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Safe));
        assert_eq!(
            m.batch_size,
            Some(1_000),
            "export omitted batch_size -> keep source"
        );
        assert_eq!(m.throttle_ms, Some(0));
    }

    #[test]
    fn merge_tuning_export_only() {
        let e = cfg_with_profile(TuningProfile::Fast);
        let m = merge_tuning_config(None, Some(&e)).expect("merged");
        assert_eq!(m.profile, Some(TuningProfile::Fast));
    }

    #[test]
    fn merge_tuning_source_only_and_neither() {
        assert!(merge_tuning_config(None, None).is_none());
        let s = TuningConfig {
            batch_size: Some(7),
            ..Default::default()
        };
        let m = merge_tuning_config(Some(&s), None).expect("merged");
        assert_eq!(m.batch_size, Some(7));
        assert_eq!(m.profile, None);
    }

    #[test]
    fn effective_batch_size_without_memory() {
        let t = SourceTuning::from_config(None);
        assert_eq!(t.effective_batch_size(None), 10_000);
    }

    #[test]
    fn effective_batch_size_with_memory() {
        use arrow::datatypes::{DataType, Field, Schema};
        use std::sync::Arc;
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(256),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let bs = t.effective_batch_size(Some(&schema));
        // A narrow two-column row in a 256 MB budget is expected to hit the
        // upper clamp of `compute_batch_size_from_memory`.
        assert_eq!(bs, 500_000, "got {bs}");
    }

    #[test]
    fn resource_summary_balanced_profile() {
        let t = SourceTuning::from_config(None);
        let r = t.resource_summary();
        assert_eq!(r.profile, "balanced");
        assert_eq!(r.batch_size, 10_000);
        assert!(r.batch_size_memory_mb.is_none());
        assert_eq!(r.memory_threshold_mb, 4_096);
        assert_eq!(r.throttle_ms, 50);
        assert!(
            r.batch_narrow_mb < 5.0,
            "narrow too high: {}",
            r.batch_narrow_mb
        );
        assert!(
            !r.wide_table_risk,
            "balanced 10k should not trigger wide_table_risk"
        );
    }

    #[test]
    fn resource_summary_fast_profile_triggers_wide_table_risk() {
        let t = SourceTuning::from_config(Some(&TuningConfig {
            profile: Some(TuningProfile::Fast),
            ..Default::default()
        }));
        let r = t.resource_summary();
        assert_eq!(r.batch_size, 50_000);
        assert!(r.wide_table_risk, "fast 50k should trigger wide_table_risk");
    }

    #[test]
    fn resource_summary_reports_memory_based_batch_sizing() {
        let cfg = TuningConfig {
            batch_size_memory_mb: Some(64),
            ..Default::default()
        };
        let t = SourceTuning::from_config(Some(&cfg));
        let r = t.resource_summary();
        assert_eq!(r.batch_size_memory_mb, Some(64));
    }
}
561}