1use vibesql_storage::{
27 statistics::{CostEstimator, TableIndexInfo, TableStatistics},
28 Database,
29};
30
/// Tuning thresholds for cost-based DML optimization decisions.
pub mod thresholds {
    /// Estimated per-row INSERT cost above which batches shrink to
    /// `SMALL_BATCH_SIZE` (compared against `CostEstimator::estimate_insert`
    /// for a single row).
    pub const HIGH_COST_INSERT_THRESHOLD: f64 = 0.5;

    /// Batch size used for expensive (index-heavy) inserts.
    pub const SMALL_BATCH_SIZE: usize = 100;

    /// Batch size used for cheap inserts, and the default when no index
    /// information is available for the table.
    pub const LARGE_BATCH_SIZE: usize = 1000;

    /// Deleted-row ratio above which deletes are chunked and early
    /// compaction may be triggered.
    pub const HIGH_DELETED_RATIO_THRESHOLD: f64 = 0.4;

    /// Minimum number of rows a DELETE must touch before chunking is
    /// even considered.
    pub const CHUNK_DELETE_ROW_THRESHOLD: usize = 10000;

    /// Number of rows per chunk when a DELETE is split into chunks.
    pub const DELETE_CHUNK_SIZE: usize = 1000;
}
53
54pub struct DmlOptimizer<'a> {
58 cost_estimator: CostEstimator,
60 index_info: Option<TableIndexInfo>,
62 table_stats: Option<TableStatistics>,
64 #[allow(dead_code)]
66 db: &'a Database,
67 table_name: &'a str,
69}
70
71impl<'a> DmlOptimizer<'a> {
72 pub fn new(db: &'a Database, table_name: &'a str) -> Self {
81 let index_info = db.get_table_index_info(table_name);
82 let table_stats = db
83 .get_table(table_name)
84 .and_then(|t| t.get_statistics().cloned());
85
86 Self {
87 cost_estimator: CostEstimator::default(),
88 index_info,
89 table_stats,
90 db,
91 table_name,
92 }
93 }
94
95 pub fn get_stats_with_fallback(&self) -> TableStatistics {
100 if let Some(ref stats) = self.table_stats {
101 stats.clone()
102 } else {
103 self.create_fallback_stats()
105 }
106 }
107
108 fn create_fallback_stats(&self) -> TableStatistics {
110 let row_count = self
112 .db
113 .get_table(self.table_name)
114 .map(|t| t.row_count())
115 .unwrap_or(0);
116
117 TableStatistics {
119 row_count,
120 columns: std::collections::HashMap::new(),
121 last_updated: instant::SystemTime::now(),
122 is_stale: true, sample_metadata: None,
124 avg_row_bytes: None, }
126 }
127
128 pub fn optimal_insert_batch_size(&self, total_rows: usize) -> usize {
140 let index_info = match &self.index_info {
142 Some(info) => info,
143 None => return thresholds::LARGE_BATCH_SIZE.min(total_rows),
144 };
145
146 let stats = self.get_stats_with_fallback();
148 let single_row_cost = self.cost_estimator.estimate_insert(1, &stats, index_info);
149
150 if std::env::var("DML_COST_DEBUG").is_ok() {
152 eprintln!(
153 "DML_COST_DEBUG: INSERT on {} - cost_per_row={:.3}, hash_indexes={}, btree_indexes={}",
154 self.table_name,
155 single_row_cost,
156 index_info.hash_index_count,
157 index_info.btree_index_count
158 );
159 }
160
161 if single_row_cost > thresholds::HIGH_COST_INSERT_THRESHOLD {
163 thresholds::SMALL_BATCH_SIZE.min(total_rows)
164 } else {
165 thresholds::LARGE_BATCH_SIZE.min(total_rows)
166 }
167 }
168
169 pub fn should_chunk_delete(&self, rows_to_delete: usize) -> bool {
181 if rows_to_delete < thresholds::CHUNK_DELETE_ROW_THRESHOLD {
183 return false;
184 }
185
186 let index_info = match &self.index_info {
187 Some(info) => info,
188 None => return false,
189 };
190
191 let stats = self.get_stats_with_fallback();
193 let delete_cost =
194 self.cost_estimator.estimate_delete(rows_to_delete, &stats, index_info);
195
196 if std::env::var("DML_COST_DEBUG").is_ok() {
198 eprintln!(
199 "DML_COST_DEBUG: DELETE on {} - rows={}, cost={:.3}, deleted_ratio={:.2}",
200 self.table_name, rows_to_delete, delete_cost, index_info.deleted_ratio
201 );
202 }
203
204 let high_deleted_ratio = index_info.deleted_ratio > thresholds::HIGH_DELETED_RATIO_THRESHOLD;
208 let many_indexes = index_info.btree_index_count >= 3;
209
210 high_deleted_ratio || many_indexes
211 }
212
213 pub fn delete_chunk_size(&self) -> usize {
215 thresholds::DELETE_CHUNK_SIZE
216 }
217
218 pub fn should_trigger_early_compaction(&self) -> bool {
226 let index_info = match &self.index_info {
227 Some(info) => info,
228 None => return false,
229 };
230
231 index_info.deleted_ratio > thresholds::HIGH_DELETED_RATIO_THRESHOLD
234 && index_info.btree_index_count >= 2
235 }
236
237 pub fn compute_indexes_affected_ratio(
249 &self,
250 changed_columns: &std::collections::HashSet<usize>,
251 schema: &vibesql_catalog::TableSchema,
252 ) -> f64 {
253 let index_info = match &self.index_info {
254 Some(info) => info,
255 None => return 0.0,
256 };
257
258 let total_indexes =
259 index_info.hash_index_count + index_info.btree_index_count;
260 if total_indexes == 0 {
261 return 0.0;
262 }
263
264 let mut affected_indexes = 0;
265
266 if let Some(pk_indices) = schema.get_primary_key_indices() {
268 if pk_indices.iter().any(|i| changed_columns.contains(i)) {
269 affected_indexes += 1;
270 }
271 }
272
273 for unique_cols in &schema.unique_constraints {
275 let unique_indices: Vec<usize> = unique_cols
276 .iter()
277 .filter_map(|name| schema.get_column_index(name))
278 .collect();
279 if unique_indices.iter().any(|i| changed_columns.contains(i)) {
280 affected_indexes += 1;
281 }
282 }
283
284 let changed_column_names: Vec<String> = changed_columns
287 .iter()
288 .filter_map(|&i| schema.columns.get(i).map(|c| c.name.clone()))
289 .collect();
290
291 for col_name in &changed_column_names {
292 if self.db.has_index_on_column(self.table_name, col_name) {
293 affected_indexes += 1;
294 break; }
296 }
297
298 affected_indexes as f64 / total_indexes as f64
299 }
300
301 pub fn estimate_update_cost(&self, row_count: usize, indexes_affected_ratio: f64) -> f64 {
310 let index_info = match &self.index_info {
311 Some(info) => info,
312 None => return 0.0,
313 };
314
315 let stats = self.get_stats_with_fallback();
316 let cost = self
317 .cost_estimator
318 .estimate_update(row_count, &stats, index_info, indexes_affected_ratio);
319
320 if std::env::var("DML_COST_DEBUG").is_ok() {
321 eprintln!(
322 "DML_COST_DEBUG: UPDATE on {} - rows={}, affected_ratio={:.2}, cost={:.3}",
323 self.table_name, row_count, indexes_affected_ratio, cost
324 );
325 }
326
327 cost
328 }
329
330 pub fn index_info(&self) -> Option<&TableIndexInfo> {
332 self.index_info.as_ref()
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::DataType;

    /// Builds an in-memory database with a single table (optionally with a
    /// primary key on `id`) and `btree_index_count` B-tree indexes on the
    /// `name` column.
    fn create_test_db_with_table(
        table_name: &str,
        with_pk: bool,
        btree_index_count: usize,
    ) -> Database {
        let mut db = Database::new();

        let id_col = || ColumnSchema::new("id".to_string(), DataType::Integer, false);
        let name_col = || {
            ColumnSchema::new(
                "name".to_string(),
                DataType::Varchar { max_length: Some(100) },
                false,
            )
        };

        let schema = match with_pk {
            true => TableSchema::with_primary_key(
                table_name.to_string(),
                vec![
                    id_col(),
                    name_col(),
                    ColumnSchema::new("value".to_string(), DataType::Integer, true),
                ],
                vec!["id".to_string()],
            ),
            false => TableSchema::new(table_name.to_string(), vec![id_col(), name_col()]),
        };
        db.create_table(schema).unwrap();

        for idx in 0..btree_index_count {
            let index_columns = vec![vibesql_ast::IndexColumn {
                column_name: "name".to_string(),
                direction: vibesql_ast::OrderDirection::Asc,
                prefix_length: None,
            }];
            db.create_index(
                format!("idx_{}_{}", table_name, idx),
                table_name.to_string(),
                false,
                index_columns,
            )
            .unwrap();
        }

        db
    }

    #[test]
    fn test_optimal_insert_batch_size_low_cost() {
        let db = create_test_db_with_table("test_table", true, 0);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        let chosen = optimizer.optimal_insert_batch_size(5000);
        // The optimizer must settle on one of the two known batch sizes.
        let expected = [thresholds::SMALL_BATCH_SIZE, thresholds::LARGE_BATCH_SIZE];
        assert!(expected.contains(&chosen));
    }

    #[test]
    fn test_optimal_insert_batch_size_high_cost() {
        let db = create_test_db_with_table("test_table", true, 5);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        let chosen = optimizer.optimal_insert_batch_size(5000);
        // With many indexes the result stays within the [small, large] range.
        let range = thresholds::SMALL_BATCH_SIZE..=thresholds::LARGE_BATCH_SIZE;
        assert!(range.contains(&chosen));
    }

    #[test]
    fn test_optimal_insert_batch_size_more_indexes_smaller_batch() {
        let db_few = create_test_db_with_table("table_few", true, 1);
        let db_many = create_test_db_with_table("table_many", true, 5);

        let batch_few =
            DmlOptimizer::new(&db_few, "table_few").optimal_insert_batch_size(5000);
        let batch_many =
            DmlOptimizer::new(&db_many, "table_many").optimal_insert_batch_size(5000);

        // More indexes should never yield a larger batch.
        assert!(batch_many <= batch_few);
    }

    #[test]
    fn test_should_chunk_delete_small() {
        let db = create_test_db_with_table("test_table", true, 0);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        // 100 rows is far below the chunking threshold.
        assert!(!optimizer.should_chunk_delete(100));
    }

    #[test]
    fn test_compute_indexes_affected_ratio_no_indexes() {
        let db = create_test_db_with_table("test_table", false, 0);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        let schema = db.catalog.get_table("test_table").unwrap();
        let changed = std::collections::HashSet::from([1usize]);

        assert_eq!(optimizer.compute_indexes_affected_ratio(&changed, schema), 0.0);
    }

    #[test]
    fn test_compute_indexes_affected_ratio_pk_affected() {
        let db = create_test_db_with_table("test_table", true, 0);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        let schema = db.catalog.get_table("test_table").unwrap();
        // Column 0 is the primary key.
        let changed = std::collections::HashSet::from([0usize]);

        let ratio = optimizer.compute_indexes_affected_ratio(&changed, schema);
        assert!(ratio > 0.0, "PK update should affect at least one index");
    }

    #[test]
    fn test_fallback_stats() {
        let db = create_test_db_with_table("test_table", true, 0);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        // Empty table with no collected stats falls back to row_count = 0.
        assert_eq!(optimizer.get_stats_with_fallback().row_count, 0);
    }

    #[test]
    fn test_estimate_update_cost() {
        let db = create_test_db_with_table("test_table", true, 2);
        let optimizer = DmlOptimizer::new(&db, "test_table");

        let full_cost = optimizer.estimate_update_cost(100, 1.0);
        let selective_cost = optimizer.estimate_update_cost(100, 0.0);

        assert!(
            full_cost > selective_cost,
            "Full update should cost more than selective"
        );
    }
}