// toondb/batch.rs
// Copyright 2025 Sushanth (https://github.com/sushanthpy)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Batch Writer with Group Commit & Streaming Auto-Commit
//!
//! High-throughput batch operations with adaptive sizing and transaction chunking.
//!
//! ## Streaming BatchWriter
//!
//! When `auto_commit` is enabled, the batch writer automatically commits transactions
//! when they reach `max_batch_size` operations. This provides:
//!
//! - **Bounded memory**: O(max_batch_size) instead of O(total_stream)
//! - **Predictable latency**: p95/p99 commit latency bounded by one chunk
//! - **Tunable throughput**: Batch size can be tuned to saturate fsync throughput
//!
//! ## Performance Model
//!
//! Per-operation cost:
//! ```text
//! C_op = c + L_fsync / K
//! ```
//! Where:
//! - c = CPU cost per write (~500ns)
//! - L_fsync = fsync latency (~5ms)
//! - K = batch size (ops per txn)
//!
//! Optimal batch size formula (from GroupCommitBuffer):
//! ```text
//! N* = sqrt(2 × L_fsync × λ / C_wait)
//! ```
//!
//! ## Example
//!
//! ```ignore
//! // Streaming mode: auto-commits every 1000 ops
//! let result = conn.batch()
//!     .max_batch_size(1000)
//!     .auto_commit(true)
//!     .insert("events", event1)
//!     .insert("events", event2)
//!     // ... millions of events
//!     .execute()?; // Commits final partial batch
//! ```

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::{Condvar, Mutex};

use toondb_core::toon::ToonValue;

use crate::connection::ToonConnection;
use crate::error::Result;

/// Batch write operation
#[derive(Debug, Clone)]
pub enum BatchOp {
    /// Insert a row given (column name, value) pairs.
    Insert {
        table: String,
        values: Vec<(String, ToonValue)>,
    },
    /// Zero-allocation insert using slice-based values in schema order
    InsertSlice {
        table: String,
        // NOTE(review): row_id is not used by the current flush path
        // (flush converts the slice to a name->value map) — confirm whether
        // the storage layer is expected to consume it later.
        row_id: u64,
        /// Values in schema column order (indices match column positions)
        values: Vec<Option<ToonValue>>,
    },
    /// Update rows where `key_field == key_value`, applying `updates`.
    Update {
        table: String,
        key_field: String,
        key_value: ToonValue,
        updates: Vec<(String, ToonValue)>,
    },
    /// Delete rows where `key_field == key_value`.
    Delete {
        table: String,
        key_field: String,
        key_value: ToonValue,
    },
}

/// Batch result
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Operations applied successfully, cumulative across all chunks.
    pub ops_executed: usize,
    /// Operations that failed (e.g. InsertSlice against an unknown table).
    pub ops_failed: usize,
    /// Total wall-clock time spent flushing chunks, in milliseconds.
    pub duration_ms: u64,
    /// Number of fsync calls issued (one per committed chunk).
    pub fsync_count: u64,
    /// Number of transaction chunks committed (for streaming mode)
    pub chunks_committed: usize,
}

/// Batch writer for high-throughput operations
///
/// Supports two modes:
/// - **Buffered mode** (auto_commit=false): All ops buffered, single commit at execute()
/// - **Streaming mode** (auto_commit=true): Auto-commits every max_batch_size ops
pub struct BatchWriter<'a> {
    // Connection whose table-cache handle and storage are used for commits.
    conn: &'a ToonConnection,
    // Operations buffered since the last flush (bounded by max_batch_size
    // when auto_commit is on).
    ops: Vec<BatchOp>,
    // Flush threshold for streaming mode; clamped to >= 1 by the setter.
    max_batch_size: usize,
    // When true, a flush is attempted as soon as `ops` reaches max_batch_size.
    auto_commit: bool,
    /// Number of chunks already committed (streaming mode)
    chunks_committed: usize,
    /// Total ops executed across all chunks
    total_ops_executed: usize,
    /// Total ops failed across all chunks
    total_ops_failed: usize,
    /// Cumulative duration of all chunks
    cumulative_duration_ms: u64,
}

125impl<'a> BatchWriter<'a> {
126    /// Create new batch writer
127    pub fn new(conn: &'a ToonConnection) -> Self {
128        Self {
129            conn,
130            ops: Vec::new(),
131            max_batch_size: 1000,
132            auto_commit: false,
133            chunks_committed: 0,
134            total_ops_executed: 0,
135            total_ops_failed: 0,
136            cumulative_duration_ms: 0,
137        }
138    }
139
140    /// Set maximum batch size
141    ///
142    /// In streaming mode (auto_commit=true), a commit is triggered when
143    /// the batch reaches this size. Recommended values:
144    /// - 100-500: Low latency, more fsyncs
145    /// - 1000-5000: Balanced (default)
146    /// - 10000+: Maximum throughput, higher latency spikes
147    pub fn max_batch_size(mut self, size: usize) -> Self {
148        self.max_batch_size = size.max(1); // At least 1
149        self
150    }
151
152    /// Enable auto-commit when batch is full
153    ///
154    /// When enabled, the batch writer will automatically commit transactions
155    /// when they reach `max_batch_size` operations. This bounds memory usage
156    /// to O(max_batch_size) and provides predictable commit latency.
157    pub fn auto_commit(mut self, enabled: bool) -> Self {
158        self.auto_commit = enabled;
159        self
160    }
161
162    /// Flush current batch if it's full (internal method)
163    fn maybe_auto_flush(&mut self) -> Result<()> {
164        if self.auto_commit && self.ops.len() >= self.max_batch_size {
165            self.flush_current_batch()?;
166        }
167        Ok(())
168    }
169
170    /// Flush the current batch of operations (internal method)
171    fn flush_current_batch(&mut self) -> Result<()> {
172        if self.ops.is_empty() {
173            return Ok(());
174        }
175
176        let start = Instant::now();
177        let batch_ops = std::mem::take(&mut self.ops);
178        let batch_size = batch_ops.len();
179        let mut ops_failed = 0;
180
181        {
182            let mut tch = self.conn.tch.write();
183
184            for op in batch_ops {
185                match op {
186                    BatchOp::Insert { table, values } => {
187                        let map: std::collections::HashMap<String, ToonValue> =
188                            values.into_iter().collect();
189                        tch.insert_row(&table, &map);
190                    }
191                    BatchOp::InsertSlice {
192                        table,
193                        row_id: _,
194                        values,
195                    } => {
196                        // Convert to HashMap for now; the optimized path is in storage layer
197                        let schema = tch.get_table_schema(&table);
198                        if let Some(schema) = schema {
199                            let columns: Vec<_> = schema
200                                .fields
201                                .iter()
202                                .zip(values.into_iter())
203                                .filter_map(|(name, val)| val.map(|v| (name.clone(), v)))
204                                .collect();
205                            let map: std::collections::HashMap<String, ToonValue> =
206                                columns.into_iter().collect();
207                            tch.insert_row(&table, &map);
208                        } else {
209                            ops_failed += 1;
210                        }
211                    }
212                    BatchOp::Update {
213                        table,
214                        key_field,
215                        key_value,
216                        updates,
217                    } => {
218                        let map: std::collections::HashMap<String, ToonValue> =
219                            updates.into_iter().collect();
220                        let where_clause = crate::connection::WhereClause::Simple {
221                            field: key_field,
222                            op: crate::connection::CompareOp::Eq,
223                            value: key_value,
224                        };
225                        // MutationResult contains affected_row_ids for future CDC/WAL integration
226                        let _mutation_result = tch.update_rows(&table, &map, Some(&where_clause));
227                    }
228                    BatchOp::Delete {
229                        table,
230                        key_field,
231                        key_value,
232                    } => {
233                        let where_clause = crate::connection::WhereClause::Simple {
234                            field: key_field,
235                            op: crate::connection::CompareOp::Eq,
236                            value: key_value,
237                        };
238                        // MutationResult contains affected_row_ids for future CDC/WAL integration
239                        let _mutation_result = tch.delete_rows(&table, Some(&where_clause));
240                    }
241                }
242            }
243        }
244
245        // Single fsync for this chunk
246        self.conn.storage.fsync()?;
247
248        let duration = start.elapsed().as_millis() as u64;
249        self.chunks_committed += 1;
250        self.total_ops_executed += batch_size - ops_failed;
251        self.total_ops_failed += ops_failed;
252        self.cumulative_duration_ms += duration;
253
254        Ok(())
255    }
256
257    /// Add insert operation
258    pub fn insert(mut self, table: &str, values: Vec<(&str, ToonValue)>) -> Self {
259        self.ops.push(BatchOp::Insert {
260            table: table.to_string(),
261            values: values
262                .into_iter()
263                .map(|(k, v)| (k.to_string(), v))
264                .collect(),
265        });
266
267        // Auto-flush if batch is full
268        if let Err(_e) = self.maybe_auto_flush() {
269            // In streaming mode, errors are accumulated; execute() returns final result
270        }
271
272        self
273    }
274
275    /// Add insert operation using slice-based values (zero-allocation path)
276    ///
277    /// Values must be in schema column order. Use None for NULL values.
278    /// This is the fastest insert path, matching benchmark performance.
279    ///
280    /// # Example
281    /// ```ignore
282    /// batch.insert_slice("users", 1, vec![
283    ///     Some(ToonValue::UInt(1)),
284    ///     Some(ToonValue::Text("Alice".into())),
285    ///     None, // NULL
286    /// ])
287    /// ```
288    pub fn insert_slice(
289        mut self,
290        table: &str,
291        row_id: u64,
292        values: Vec<Option<ToonValue>>,
293    ) -> Self {
294        self.ops.push(BatchOp::InsertSlice {
295            table: table.to_string(),
296            row_id,
297            values,
298        });
299
300        if let Err(_e) = self.maybe_auto_flush() {
301            // Errors accumulated
302        }
303
304        self
305    }
306
307    /// Add update operation
308    pub fn update(
309        mut self,
310        table: &str,
311        key_field: &str,
312        key_value: ToonValue,
313        updates: Vec<(&str, ToonValue)>,
314    ) -> Self {
315        self.ops.push(BatchOp::Update {
316            table: table.to_string(),
317            key_field: key_field.to_string(),
318            key_value,
319            updates: updates
320                .into_iter()
321                .map(|(k, v)| (k.to_string(), v))
322                .collect(),
323        });
324
325        if let Err(_e) = self.maybe_auto_flush() {
326            // Errors accumulated
327        }
328
329        self
330    }
331
332    /// Add delete operation
333    pub fn delete(mut self, table: &str, key_field: &str, key_value: ToonValue) -> Self {
334        self.ops.push(BatchOp::Delete {
335            table: table.to_string(),
336            key_field: key_field.to_string(),
337            key_value,
338        });
339
340        if let Err(_e) = self.maybe_auto_flush() {
341            // Errors accumulated
342        }
343
344        self
345    }
346
347    /// Get number of pending operations (in current unflushed batch)
348    pub fn pending_count(&self) -> usize {
349        self.ops.len()
350    }
351
352    /// Get total operations processed so far (including flushed chunks)
353    pub fn total_count(&self) -> usize {
354        self.total_ops_executed + self.total_ops_failed + self.ops.len()
355    }
356
357    /// Execute all pending operations
358    ///
359    /// In streaming mode, this commits any remaining operations in the final
360    /// partial batch. Returns cumulative results from all chunks.
361    pub fn execute(mut self) -> Result<BatchResult> {
362        let _start = Instant::now();
363
364        // Flush any remaining operations
365        if !self.ops.is_empty() {
366            self.flush_current_batch()?;
367        }
368
369        Ok(BatchResult {
370            ops_executed: self.total_ops_executed,
371            ops_failed: self.total_ops_failed,
372            duration_ms: self.cumulative_duration_ms,
373            fsync_count: self.chunks_committed as u64,
374            chunks_committed: self.chunks_committed,
375        })
376    }
377}
378
/// Group commit buffer for high-throughput durability
///
/// **DEPRECATED**: Use `toondb_storage::GroupCommitBuffer` instead.
/// This client-side implementation is purely in-memory and doesn't
/// actually perform I/O or integrate with the WAL. The storage layer's
/// `EventDrivenGroupCommit` or `GroupCommitBuffer` should be used for
/// actual durability guarantees.
///
/// Batches multiple transactions into single fsync for efficiency.
/// Optimal batch size: N* = sqrt(2 × L_fsync × λ / C_wait)
#[deprecated(
    since = "0.2.0",
    note = "Use toondb_storage::EventDrivenGroupCommit for actual WAL integration"
)]
pub struct GroupCommitBuffer {
    // Waiter queue and current batch id, guarded by a single mutex.
    inner: Arc<Mutex<GroupCommitInner>>,
    // Signalled when a batch is flushed to wake blocked submitters.
    condvar: Arc<Condvar>,
    // Tuning knobs: wait bound, batch sizes, fsync latency estimate.
    config: GroupCommitConfig,
}

/// Mutable state shared behind the `GroupCommitBuffer` mutex.
struct GroupCommitInner {
    // Commits waiting for the next batch flush.
    pending: VecDeque<PendingCommit>,
    // Monotonically increasing id of the currently open batch.
    batch_id: u64,
}

/// A single operation waiting for its batch to be committed.
#[allow(dead_code)]
struct PendingCommit {
    // Caller-supplied operation id.
    id: u64,
    // Batch the operation was assigned to at submission time.
    batch_id: u64,
    // Set when the batch is flushed (currently write-only bookkeeping).
    committed: bool,
}

/// Group commit configuration
///
/// Tuning parameters used by `GroupCommitBuffer` to decide when a batch
/// is flushed and how large the optimal batch is.
#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Maximum wait time before forced flush
    pub max_wait_ms: u64,
    /// Maximum operations before forced flush
    pub max_batch_size: usize,
    /// Target batch size (adaptive)
    pub target_batch_size: usize,
    /// Average fsync latency in microseconds
    pub fsync_latency_us: u64,
}

424impl Default for GroupCommitConfig {
425    fn default() -> Self {
426        Self {
427            max_wait_ms: 10,
428            max_batch_size: 1000,
429            target_batch_size: 100,
430            fsync_latency_us: 5000, // 5ms default
431        }
432    }
433}
434
435#[allow(deprecated)]
436impl GroupCommitBuffer {
437    /// Create new group commit buffer
438    pub fn new(config: GroupCommitConfig) -> Self {
439        Self {
440            inner: Arc::new(Mutex::new(GroupCommitInner {
441                pending: VecDeque::new(),
442                batch_id: 0,
443            })),
444            condvar: Arc::new(Condvar::new()),
445            config,
446        }
447    }
448
449    /// Calculate optimal batch size
450    ///
451    /// Formula: N* = sqrt(2 × L_fsync × λ / C_wait)
452    /// - L_fsync: fsync latency
453    /// - λ: arrival rate (ops/sec)
454    /// - C_wait: cost per unit wait time
455    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
456        let l_fsync = self.config.fsync_latency_us as f64 / 1_000_000.0;
457        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost).sqrt();
458        (n_star as usize).clamp(1, self.config.max_batch_size)
459    }
460
461    /// Submit operation and wait for commit
462    pub fn submit_and_wait(&self, op_id: u64) -> Result<u64> {
463        let timeout = Duration::from_millis(self.config.max_wait_ms);
464        let target_size = self.config.target_batch_size;
465
466        let mut inner = self.inner.lock();
467        let current_batch_id = inner.batch_id;
468        inner.pending.push_back(PendingCommit {
469            id: op_id,
470            batch_id: current_batch_id,
471            committed: false,
472        });
473
474        // Check if we should flush
475        let need_flush = inner.pending.len() >= target_size;
476        if need_flush {
477            inner.batch_id += 1;
478        }
479
480        let batch_id = inner.batch_id;
481
482        // Wait for batch to be committed
483        let result = self.condvar.wait_for(&mut inner, timeout);
484
485        if result.timed_out() {
486            // Force flush on timeout
487            inner.batch_id += 1;
488        }
489
490        Ok(batch_id)
491    }
492
493    /// Flush pending commits
494    pub fn flush(&self) {
495        let mut inner = self.inner.lock();
496
497        // Mark all pending as committed
498        for pending in inner.pending.iter_mut() {
499            pending.committed = true;
500        }
501        inner.pending.clear();
502        inner.batch_id += 1;
503
504        // Wake all waiters
505        self.condvar.notify_all();
506    }
507
508    /// Get pending count
509    pub fn pending_count(&self) -> usize {
510        self.inner.lock().pending.len()
511    }
512}
513
514/// Batch operations on connection
515impl ToonConnection {
516    /// Start batch writer
517    pub fn batch<'a>(&'a self) -> BatchWriter<'a> {
518        BatchWriter::new(self)
519    }
520
521    /// Bulk insert rows (uses streaming mode internally)
522    pub fn bulk_insert(
523        &self,
524        table: &str,
525        rows: Vec<Vec<(&str, ToonValue)>>,
526    ) -> Result<BatchResult> {
527        let mut batch = BatchWriter::new(self)
528            .max_batch_size(1000)
529            .auto_commit(true); // Enable streaming for bulk inserts
530        for row in rows {
531            batch = batch.insert(table, row);
532        }
533        batch.execute()
534    }
535
536    /// Bulk insert with zero-allocation path (fastest)
537    ///
538    /// Values must be in schema column order.
539    pub fn bulk_insert_slice(
540        &self,
541        table: &str,
542        rows: Vec<(u64, Vec<Option<ToonValue>>)>,
543    ) -> Result<BatchResult> {
544        let mut batch = BatchWriter::new(self)
545            .max_batch_size(1000)
546            .auto_commit(true);
547        for (row_id, values) in rows {
548            batch = batch.insert_slice(table, row_id, values);
549        }
550        batch.execute()
551    }
552}
553
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests open on-disk databases at fixed relative
    // paths and never clean them up; "./test" is shared by test_batch_writer
    // and test_bulk_insert, so the exact `ops_executed` assertions may be
    // order- or state-dependent — consider a fresh temp dir per test.

    /// Buffered mode: both inserts commit in a single chunk with one fsync.
    #[test]
    fn test_batch_writer() {
        let conn = ToonConnection::open("./test").unwrap();

        let result = conn
            .batch()
            .insert(
                "users",
                vec![
                    ("id", ToonValue::Int(1)),
                    ("name", ToonValue::Text("Alice".to_string())),
                ],
            )
            .insert(
                "users",
                vec![
                    ("id", ToonValue::Int(2)),
                    ("name", ToonValue::Text("Bob".to_string())),
                ],
            )
            .execute()
            .unwrap();

        assert_eq!(result.ops_executed, 2);
        assert_eq!(result.fsync_count, 1);
        assert_eq!(result.chunks_committed, 1);
    }

    /// Streaming mode: commits are chunked by max_batch_size.
    #[test]
    fn test_streaming_batch_writer() {
        let conn = ToonConnection::open("./test_streaming").unwrap();

        // With auto_commit and max_batch_size=2, should commit after every 2 ops
        let result = conn
            .batch()
            .max_batch_size(2)
            .auto_commit(true)
            .insert("users", vec![("id", ToonValue::Int(1))])
            .insert("users", vec![("id", ToonValue::Int(2))])
            .insert("users", vec![("id", ToonValue::Int(3))])
            .insert("users", vec![("id", ToonValue::Int(4))])
            .insert("users", vec![("id", ToonValue::Int(5))])
            .execute()
            .unwrap();

        assert_eq!(result.ops_executed, 5);
        assert_eq!(result.chunks_committed, 3); // 2 full chunks + 1 partial
    }

    /// Defaults documented in GroupCommitConfig::default().
    #[test]
    fn test_group_commit_config() {
        let config = GroupCommitConfig::default();
        assert_eq!(config.max_wait_ms, 10);
        assert_eq!(config.max_batch_size, 1000);
    }

    /// N* = sqrt(2 × L_fsync × λ / C_wait), clamped to [1, max_batch_size].
    #[test]
    #[allow(deprecated)]
    fn test_optimal_batch_size() {
        let config = GroupCommitConfig {
            fsync_latency_us: 5000, // 5ms
            ..Default::default()
        };
        let buffer = GroupCommitBuffer::new(config);

        // arrival_rate = 10000 ops/sec, wait_cost = 0.001
        let optimal = buffer.optimal_batch_size(10000.0, 0.001);
        assert!(optimal > 1);
        assert!(optimal <= 1000);
    }

    /// bulk_insert delegates to the streaming BatchWriter path.
    #[test]
    fn test_bulk_insert() {
        let conn = ToonConnection::open("./test").unwrap();

        let rows = vec![
            vec![
                ("id", ToonValue::Int(1)),
                ("name", ToonValue::Text("A".to_string())),
            ],
            vec![
                ("id", ToonValue::Int(2)),
                ("name", ToonValue::Text("B".to_string())),
            ],
            vec![
                ("id", ToonValue::Int(3)),
                ("name", ToonValue::Text("C".to_string())),
            ],
        ];

        let result = conn.bulk_insert("users", rows).unwrap();
        assert_eq!(result.ops_executed, 3);
    }

    /// insert_slice requires a registered schema; values are positional.
    #[test]
    fn test_insert_slice() {
        use crate::connection::FieldType;

        let conn = ToonConnection::open("./test_slice").unwrap();

        // Register the table schema first - insert_slice requires a pre-existing schema
        conn.register_table(
            "users",
            &[
                ("id".to_string(), FieldType::UInt64),
                ("name".to_string(), FieldType::Text),
            ],
        )
        .unwrap();

        let result = conn
            .batch()
            .insert_slice(
                "users",
                1,
                vec![
                    Some(ToonValue::UInt(1)),
                    Some(ToonValue::Text("Alice".to_string())),
                ],
            )
            .insert_slice(
                "users",
                2,
                vec![
                    Some(ToonValue::UInt(2)),
                    Some(ToonValue::Text("Bob".to_string())),
                ],
            )
            .execute()
            .unwrap();

        assert_eq!(result.ops_executed, 2);
    }
}