1use parking_lot::{Condvar, Mutex};
58use std::collections::VecDeque;
59use std::sync::Arc;
60use std::time::{Duration, Instant};
61
62use crate::connection::ToonConnection;
63use crate::error::Result;
64
65use toondb_core::toon::ToonValue;
66
67#[derive(Debug, Clone)]
69pub enum BatchOp {
70 Insert {
71 table: String,
72 values: Vec<(String, ToonValue)>,
73 },
74 InsertSlice {
76 table: String,
77 row_id: u64,
78 values: Vec<Option<ToonValue>>,
80 },
81 Update {
82 table: String,
83 key_field: String,
84 key_value: ToonValue,
85 updates: Vec<(String, ToonValue)>,
86 },
87 Delete {
88 table: String,
89 key_field: String,
90 key_value: ToonValue,
91 },
92}
93
/// Summary statistics returned when a batch finishes executing.
///
/// `Default` yields an all-zero result and `PartialEq`/`Eq` allow results to
/// be compared directly (for example in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BatchResult {
    /// Operations applied, summed over every committed chunk.
    pub ops_executed: usize,
    /// Operations counted as failed, summed over every committed chunk.
    pub ops_failed: usize,
    /// Time spent flushing chunks, in milliseconds, summed over all chunks.
    pub duration_ms: u64,
    /// Number of fsync calls issued; the writer issues one per committed chunk.
    pub fsync_count: u64,
    /// Number of chunks that were flushed and fsynced.
    pub chunks_committed: usize,
}
104
105pub struct BatchWriter<'a> {
111 conn: &'a ToonConnection,
112 ops: Vec<BatchOp>,
113 max_batch_size: usize,
114 auto_commit: bool,
115 chunks_committed: usize,
117 total_ops_executed: usize,
119 total_ops_failed: usize,
121 cumulative_duration_ms: u64,
123}
124
125impl<'a> BatchWriter<'a> {
126 pub fn new(conn: &'a ToonConnection) -> Self {
128 Self {
129 conn,
130 ops: Vec::new(),
131 max_batch_size: 1000,
132 auto_commit: false,
133 chunks_committed: 0,
134 total_ops_executed: 0,
135 total_ops_failed: 0,
136 cumulative_duration_ms: 0,
137 }
138 }
139
140 pub fn max_batch_size(mut self, size: usize) -> Self {
148 self.max_batch_size = size.max(1); self
150 }
151
152 pub fn auto_commit(mut self, enabled: bool) -> Self {
158 self.auto_commit = enabled;
159 self
160 }
161
162 fn maybe_auto_flush(&mut self) -> Result<()> {
164 if self.auto_commit && self.ops.len() >= self.max_batch_size {
165 self.flush_current_batch()?;
166 }
167 Ok(())
168 }
169
170 fn flush_current_batch(&mut self) -> Result<()> {
172 if self.ops.is_empty() {
173 return Ok(());
174 }
175
176 let start = Instant::now();
177 let batch_ops = std::mem::take(&mut self.ops);
178 let batch_size = batch_ops.len();
179 let mut ops_failed = 0;
180
181 {
182 let mut tch = self.conn.tch.write();
183
184 for op in batch_ops {
185 match op {
186 BatchOp::Insert { table, values } => {
187 let map: std::collections::HashMap<String, ToonValue> =
188 values.into_iter().collect();
189 tch.insert_row(&table, &map);
190 }
191 BatchOp::InsertSlice {
192 table,
193 row_id: _,
194 values,
195 } => {
196 let schema = tch.get_table_schema(&table);
198 if let Some(schema) = schema {
199 let columns: Vec<_> = schema
200 .fields
201 .iter()
202 .zip(values.into_iter())
203 .filter_map(|(name, val)| val.map(|v| (name.clone(), v)))
204 .collect();
205 let map: std::collections::HashMap<String, ToonValue> =
206 columns.into_iter().collect();
207 tch.insert_row(&table, &map);
208 } else {
209 ops_failed += 1;
210 }
211 }
212 BatchOp::Update {
213 table,
214 key_field,
215 key_value,
216 updates,
217 } => {
218 let map: std::collections::HashMap<String, ToonValue> =
219 updates.into_iter().collect();
220 let where_clause = crate::connection::WhereClause::Simple {
221 field: key_field,
222 op: crate::connection::CompareOp::Eq,
223 value: key_value,
224 };
225 let _mutation_result = tch.update_rows(&table, &map, Some(&where_clause));
227 }
228 BatchOp::Delete {
229 table,
230 key_field,
231 key_value,
232 } => {
233 let where_clause = crate::connection::WhereClause::Simple {
234 field: key_field,
235 op: crate::connection::CompareOp::Eq,
236 value: key_value,
237 };
238 let _mutation_result = tch.delete_rows(&table, Some(&where_clause));
240 }
241 }
242 }
243 }
244
245 self.conn.storage.fsync()?;
247
248 let duration = start.elapsed().as_millis() as u64;
249 self.chunks_committed += 1;
250 self.total_ops_executed += batch_size - ops_failed;
251 self.total_ops_failed += ops_failed;
252 self.cumulative_duration_ms += duration;
253
254 Ok(())
255 }
256
257 pub fn insert(mut self, table: &str, values: Vec<(&str, ToonValue)>) -> Self {
259 self.ops.push(BatchOp::Insert {
260 table: table.to_string(),
261 values: values
262 .into_iter()
263 .map(|(k, v)| (k.to_string(), v))
264 .collect(),
265 });
266
267 if let Err(_e) = self.maybe_auto_flush() {
269 }
271
272 self
273 }
274
275 pub fn insert_slice(
289 mut self,
290 table: &str,
291 row_id: u64,
292 values: Vec<Option<ToonValue>>,
293 ) -> Self {
294 self.ops.push(BatchOp::InsertSlice {
295 table: table.to_string(),
296 row_id,
297 values,
298 });
299
300 if let Err(_e) = self.maybe_auto_flush() {
301 }
303
304 self
305 }
306
307 pub fn update(
309 mut self,
310 table: &str,
311 key_field: &str,
312 key_value: ToonValue,
313 updates: Vec<(&str, ToonValue)>,
314 ) -> Self {
315 self.ops.push(BatchOp::Update {
316 table: table.to_string(),
317 key_field: key_field.to_string(),
318 key_value,
319 updates: updates
320 .into_iter()
321 .map(|(k, v)| (k.to_string(), v))
322 .collect(),
323 });
324
325 if let Err(_e) = self.maybe_auto_flush() {
326 }
328
329 self
330 }
331
332 pub fn delete(mut self, table: &str, key_field: &str, key_value: ToonValue) -> Self {
334 self.ops.push(BatchOp::Delete {
335 table: table.to_string(),
336 key_field: key_field.to_string(),
337 key_value,
338 });
339
340 if let Err(_e) = self.maybe_auto_flush() {
341 }
343
344 self
345 }
346
347 pub fn pending_count(&self) -> usize {
349 self.ops.len()
350 }
351
352 pub fn total_count(&self) -> usize {
354 self.total_ops_executed + self.total_ops_failed + self.ops.len()
355 }
356
357 pub fn execute(mut self) -> Result<BatchResult> {
362 let _start = Instant::now();
363
364 if !self.ops.is_empty() {
366 self.flush_current_batch()?;
367 }
368
369 Ok(BatchResult {
370 ops_executed: self.total_ops_executed,
371 ops_failed: self.total_ops_failed,
372 duration_ms: self.cumulative_duration_ms,
373 fsync_count: self.chunks_committed as u64,
374 chunks_committed: self.chunks_committed,
375 })
376 }
377}
378
379#[deprecated(
390 since = "0.2.0",
391 note = "Use toondb_storage::EventDrivenGroupCommit for actual WAL integration"
392)]
393pub struct GroupCommitBuffer {
394 inner: Arc<Mutex<GroupCommitInner>>,
395 condvar: Arc<Condvar>,
396 config: GroupCommitConfig,
397}
398
399struct GroupCommitInner {
400 pending: VecDeque<PendingCommit>,
401 batch_id: u64,
402}
403
/// One submission waiting in the group-commit queue.
// The fields are recorded for bookkeeping but not read back anywhere in this
// file, hence the lint allowance.
#[allow(dead_code)]
struct PendingCommit {
    /// Caller-supplied operation id.
    id: u64,
    /// Value of the batch counter when the operation was submitted.
    batch_id: u64,
    /// Whether the operation has been marked as committed.
    committed: bool,
}
410
/// Tuning parameters for `GroupCommitBuffer`.
#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Longest time, in milliseconds, a submitter blocks waiting for a flush.
    pub max_wait_ms: u64,
    /// Upper bound applied to the computed optimal batch size.
    pub max_batch_size: usize,
    /// Pending-queue length at which a submission treats the batch as full.
    pub target_batch_size: usize,
    /// Assumed fsync latency, in microseconds, used by the batch-size formula.
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitConfig {
    /// Defaults: 10 ms max wait, batches capped at 1000, a target of 100
    /// operations per batch and an assumed 5000 µs fsync latency.
    fn default() -> Self {
        GroupCommitConfig {
            max_wait_ms: 10,
            max_batch_size: 1_000,
            target_batch_size: 100,
            fsync_latency_us: 5_000,
        }
    }
}
434
435#[allow(deprecated)]
436impl GroupCommitBuffer {
437 pub fn new(config: GroupCommitConfig) -> Self {
439 Self {
440 inner: Arc::new(Mutex::new(GroupCommitInner {
441 pending: VecDeque::new(),
442 batch_id: 0,
443 })),
444 condvar: Arc::new(Condvar::new()),
445 config,
446 }
447 }
448
449 pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
456 let l_fsync = self.config.fsync_latency_us as f64 / 1_000_000.0;
457 let n_star = (2.0 * l_fsync * arrival_rate / wait_cost).sqrt();
458 (n_star as usize).clamp(1, self.config.max_batch_size)
459 }
460
461 pub fn submit_and_wait(&self, op_id: u64) -> Result<u64> {
463 let timeout = Duration::from_millis(self.config.max_wait_ms);
464 let target_size = self.config.target_batch_size;
465
466 let mut inner = self.inner.lock();
467 let current_batch_id = inner.batch_id;
468 inner.pending.push_back(PendingCommit {
469 id: op_id,
470 batch_id: current_batch_id,
471 committed: false,
472 });
473
474 let need_flush = inner.pending.len() >= target_size;
476 if need_flush {
477 inner.batch_id += 1;
478 }
479
480 let batch_id = inner.batch_id;
481
482 let result = self.condvar.wait_for(&mut inner, timeout);
484
485 if result.timed_out() {
486 inner.batch_id += 1;
488 }
489
490 Ok(batch_id)
491 }
492
493 pub fn flush(&self) {
495 let mut inner = self.inner.lock();
496
497 for pending in inner.pending.iter_mut() {
499 pending.committed = true;
500 }
501 inner.pending.clear();
502 inner.batch_id += 1;
503
504 self.condvar.notify_all();
506 }
507
508 pub fn pending_count(&self) -> usize {
510 self.inner.lock().pending.len()
511 }
512}
513
514impl ToonConnection {
516 pub fn batch<'a>(&'a self) -> BatchWriter<'a> {
518 BatchWriter::new(self)
519 }
520
521 pub fn bulk_insert(
523 &self,
524 table: &str,
525 rows: Vec<Vec<(&str, ToonValue)>>,
526 ) -> Result<BatchResult> {
527 let mut batch = BatchWriter::new(self)
528 .max_batch_size(1000)
529 .auto_commit(true); for row in rows {
531 batch = batch.insert(table, row);
532 }
533 batch.execute()
534 }
535
536 pub fn bulk_insert_slice(
540 &self,
541 table: &str,
542 rows: Vec<(u64, Vec<Option<ToonValue>>)>,
543 ) -> Result<BatchResult> {
544 let mut batch = BatchWriter::new(self)
545 .max_batch_size(1000)
546 .auto_commit(true);
547 for (row_id, values) in rows {
548 batch = batch.insert_slice(table, row_id, values);
549 }
550 batch.execute()
551 }
552}
553
#[cfg(test)]
mod tests {
    use super::*;

    /// Two queued inserts without auto-commit are flushed as a single chunk.
    #[test]
    fn test_batch_writer() {
        let conn = ToonConnection::open("./test").unwrap();

        let alice = vec![
            ("id", ToonValue::Int(1)),
            ("name", ToonValue::Text("Alice".to_string())),
        ];
        let bob = vec![
            ("id", ToonValue::Int(2)),
            ("name", ToonValue::Text("Bob".to_string())),
        ];

        let writer = conn.batch().insert("users", alice).insert("users", bob);
        let result = writer.execute().unwrap();

        assert_eq!(result.ops_executed, 2);
        assert_eq!(result.fsync_count, 1);
        assert_eq!(result.chunks_committed, 1);
    }

    /// Five inserts with a chunk size of two commit as 2 + 2 + 1.
    #[test]
    fn test_streaming_batch_writer() {
        let conn = ToonConnection::open("./test_streaming").unwrap();

        let writer = conn.batch().max_batch_size(2).auto_commit(true);
        let result = (1..=5)
            .fold(writer, |w, id| {
                w.insert("users", vec![("id", ToonValue::Int(id))])
            })
            .execute()
            .unwrap();

        assert_eq!(result.ops_executed, 5);
        assert_eq!(result.chunks_committed, 3);
    }

    /// The default configuration exposes the documented limits.
    #[test]
    fn test_group_commit_config() {
        let defaults = GroupCommitConfig::default();
        assert_eq!(defaults.max_wait_ms, 10);
        assert_eq!(defaults.max_batch_size, 1000);
    }

    /// The computed batch size lies above 1 and within the configured cap.
    #[test]
    #[allow(deprecated)]
    fn test_optimal_batch_size() {
        let buffer = GroupCommitBuffer::new(GroupCommitConfig {
            fsync_latency_us: 5000,
            ..Default::default()
        });

        let optimal = buffer.optimal_batch_size(10000.0, 0.001);

        assert!(optimal > 1);
        assert!(optimal <= 1000);
    }

    /// `bulk_insert` reports every supplied row as executed.
    #[test]
    fn test_bulk_insert() {
        let conn = ToonConnection::open("./test").unwrap();

        let rows: Vec<_> = vec![(1, "A"), (2, "B"), (3, "C")]
            .into_iter()
            .map(|(id, name)| {
                vec![
                    ("id", ToonValue::Int(id)),
                    ("name", ToonValue::Text(name.to_string())),
                ]
            })
            .collect();

        let result = conn.bulk_insert("users", rows).unwrap();

        assert_eq!(result.ops_executed, 3);
    }

    /// Positional inserts succeed once the table's schema is registered.
    #[test]
    fn test_insert_slice() {
        use crate::connection::FieldType;

        let conn = ToonConnection::open("./test_slice").unwrap();

        let schema = [
            ("id".to_string(), FieldType::UInt64),
            ("name".to_string(), FieldType::Text),
        ];
        conn.register_table("users", &schema).unwrap();

        let first = vec![
            Some(ToonValue::UInt(1)),
            Some(ToonValue::Text("Alice".to_string())),
        ];
        let second = vec![
            Some(ToonValue::UInt(2)),
            Some(ToonValue::Text("Bob".to_string())),
        ];

        let result = conn
            .batch()
            .insert_slice("users", 1, first)
            .insert_slice("users", 2, second)
            .execute()
            .unwrap();

        assert_eq!(result.ops_executed, 2);
    }
}