Skip to main content

hyperdb_api/
inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//!     inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
29use tracing::{debug, info};
30
31use crate::catalog::Catalog;
32use crate::connection::Connection;
33use crate::error::{Error, Result};
34use crate::table_definition::TableDefinition;
35
/// Starting capacity of the chunk buffer: 4 MB.
///
/// Each COPY chunk has to live in one contiguous buffer. Beginning at
/// 4 MB means typical workloads fill their first chunk without a series
/// of grow-and-copy reallocations, while the up-front allocation stays
/// modest.
const INITIAL_BUFFER_SIZE: usize = 4 << 20;
43
/// Byte threshold (16 MB) at which a chunk is flushed to the server.
///
/// Two concerns pull in opposite directions:
/// - **Throughput**: bigger chunks spread the fixed per-chunk cost (COPY
///   header, network round-trip) over more data. Below ~1 MB that fixed
///   cost becomes significant.
/// - **Memory**: a chunk is fully materialized before it is sent, so the
///   limit bounds resident memory even for wide rows.
///
/// 16 MB was picked empirically — it sits on the flat part of the
/// throughput curve, where larger values bring diminishing returns.
const CHUNK_SIZE_LIMIT: usize = 16 << 20;
55
/// Row threshold at which a chunk is flushed to the server.
///
/// Acts as a second flush trigger next to [`CHUNK_SIZE_LIMIT`]. With narrow
/// rows (a few bytes each) the byte limit alone would let millions of rows
/// pile up before a flush, delaying server-side processing. Capping at 64K
/// rows keeps flushes timely whatever the row width, and lines up with the
/// 64K chunk size used for query result streaming
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
const CHUNK_ROW_LIMIT: usize = 64 * 1_000;
65
66/// A high-performance bulk data inserter.
67///
68/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
69/// using the COPY protocol with `HyperBinary` format for optimal performance.
70///
71/// # Example
72///
73/// ```no_run
74/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
75///
76/// fn main() -> Result<()> {
77///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
78///
79///     let table_def = TableDefinition::new("users")
80///         .add_required_column("id", SqlType::int())
81///         .add_nullable_column("name", SqlType::text());
82///
83///     Catalog::new(&conn).create_table(&table_def)?;
84///
85///     let mut inserter = Inserter::new(&conn, &table_def)?;
86///
87///     for i in 0..1000i32 {
88///         inserter.add_row(&[&i, &format!("User {}", i)])?;
89///     }
90///
91///     let rows = inserter.execute()?;
92///     println!("Inserted {} rows", rows);
93///     Ok(())
94/// }
95/// ```
#[derive(Debug)]
pub struct Inserter<'conn> {
    /// Connection the rows are written through; borrowed for `'conn`.
    connection: &'conn Connection,
    /// Owned copy of the target table definition. Its column names and
    /// qualified name are used when the COPY session is started.
    table_def: TableDefinition,
    /// The current chunk being populated (delegates encoding).
    chunk: InsertChunk,
    /// Rows completed via `end_row` since construction; reset to zero by
    /// `execute` and `cancel`. Not reduced when a chunk is flushed.
    row_count: u64,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Active COPY writer (lazily initialized on first write).
    writer: Option<CopyInWriter<'conn>>,
    /// Captured at construction; used to report the elapsed time of the
    /// whole insert when it finishes.
    start_time: Instant,
}
111
112impl<'conn> Inserter<'conn> {
113    /// Creates a new inserter for the given table.
114    ///
115    /// The underlying COPY session is started lazily on the first flush or
116    /// execute, so construction is lightweight. However, the connection's
117    /// transport is validated eagerly — using a gRPC connection will return
118    /// an error immediately.
119    ///
120    /// # Errors
121    ///
122    /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
123    ///   columns.
124    /// - Returns [`Error::Other`] if `connection` is using gRPC transport
125    ///   (COPY is TCP-only).
126    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
127        if table_def.column_count() == 0 {
128            return Err(Error::InvalidTableDefinition(
129                "Table definition must have at least one column".into(),
130            ));
131        }
132
133        // Fail fast: verify the connection supports COPY (TCP only)
134        if connection.tcp_client().is_none() {
135            return Err(Error::new(
136                "Inserter requires a TCP connection. \
137                 gRPC connections do not support COPY operations.",
138            ));
139        }
140
141        Ok(Inserter {
142            connection,
143            table_def: table_def.clone(),
144            chunk: InsertChunk::from_table_definition(table_def),
145            row_count: 0,
146            chunk_count: 0,
147            writer: None,
148            start_time: Instant::now(),
149        })
150    }
151
152    /// Creates an inserter by querying the table schema from the database.
153    ///
154    /// This method queries the database to get the table definition automatically,
155    /// which is useful when you want to insert into an existing table without
156    /// manually specifying the schema.
157    ///
158    /// # Arguments
159    ///
160    /// * `connection` - The database connection.
161    /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
166    ///
167    /// # Example
168    ///
169    /// ```no_run
170    /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
171    ///
172    /// fn main() -> Result<()> {
173    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
174    ///
175    ///     // Create a table first
176    ///     conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
177    ///
178    ///     // Create inserter by querying the schema directly from a string
179    ///     let mut inserter = Inserter::from_table(&conn, "public.products")?;
180    ///
181    ///     // Now we can insert data without knowing the exact schema
182    ///     inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
183    ///     inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
184    ///
185    ///     let rows = inserter.execute()?;
186    ///     println!("Inserted {} rows", rows);
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
191    where
192        T: TryInto<crate::TableName>,
193        crate::Error: From<T::Error>,
194    {
195        let catalog = Catalog::new(connection);
196        let table_def = catalog.get_table_definition(table_name)?;
197        Self::new(connection, &table_def)
198    }
199
200    /// Creates an inserter with column mappings that allow SQL expressions.
201    ///
202    /// This method uses a temporary table and INSERT...SELECT to support
203    /// column mappings with SQL expressions. Data is first inserted into
204    /// a temporary staging table, then transformed using the mappings.
205    ///
206    /// # Arguments
207    ///
208    /// * `connection` - The database connection.
209    /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
210    /// * `target_table` - The qualified name of the target table to insert into.
211    ///   Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
212    /// * `mappings` - Column mappings defining how values are transformed.
213    ///
214    /// # Example
215    ///
216    /// ```no_run
217    /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
218    ///
219    /// fn main() -> Result<()> {
220    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
221    ///
222    ///     // Target table with computed columns
223    ///     conn.execute_command(r#"
224    ///         CREATE TABLE orders (
225    ///             id INT NOT NULL,
226    ///             product TEXT,
227    ///             quantity INT,
228    ///             price DOUBLE PRECISION,
229    ///             total DOUBLE PRECISION,
230    ///             created_at TIMESTAMP
231    ///         )
232    ///     "#)?;
233    ///
234    ///     // Inserter definition - what we provide
235    ///     let inserter_def = TableDefinition::new("_stage")
236    ///         .add_required_column("id", SqlType::int())
237    ///         .add_nullable_column("product", SqlType::text())
238    ///         .add_nullable_column("quantity", SqlType::int())
239    ///         .add_nullable_column("price", SqlType::double());
240    ///
241    ///     // Column mappings - how values are transformed
242    ///     let mappings = vec![
243    ///         ColumnMapping::new("id"),
244    ///         ColumnMapping::new("product"),
245    ///         ColumnMapping::new("quantity"),
246    ///         ColumnMapping::new("price"),
247    ///         ColumnMapping::with_expression("total", "quantity * price"),
248    ///         ColumnMapping::with_expression("created_at", "NOW()"),
249    ///     ];
250    ///
251    ///     // For simple table names in the public schema, use quoted name
252    ///     // For qualified names, use target_table_def.qualified_name()
253    ///     let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
254    ///
255    ///     inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
256    ///     inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
257    ///
258    ///     let rows = inserter.execute()?;
259    ///     Ok(())
260    /// }
261    /// ```
262    ///
263    /// # Errors
264    ///
265    /// - Returns an error if `target_table` fails to convert into a
266    ///   [`TableName`](crate::TableName).
267    /// - Returns [`Error::Client`] if creating the temporary staging table
268    ///   fails on the server.
269    /// - Returns the errors from [`Inserter::new`] for the staging table
270    ///   (zero-column table definition, gRPC transport).
271    pub fn with_column_mappings<T>(
272        connection: &'conn Connection,
273        inserter_def: &TableDefinition,
274        target_table: T,
275        mappings: &[ColumnMapping],
276    ) -> Result<MappedInserter<'conn>>
277    where
278        T: TryInto<crate::TableName>,
279        crate::Error: From<T::Error>,
280    {
281        MappedInserter::new(connection, inserter_def, target_table, mappings)
282    }
283
284    /// Returns the table definition.
285    pub fn table_definition(&self) -> &TableDefinition {
286        &self.table_def
287    }
288
289    /// Returns the number of columns.
290    #[must_use]
291    pub fn column_count(&self) -> usize {
292        self.table_def.column_count()
293    }
294
295    /// Returns the number of complete rows buffered.
296    #[must_use]
297    pub fn row_count(&self) -> u64 {
298        self.row_count
299    }
300
301    /// Adds a NULL value for the current column.
302    ///
303    /// # Errors
304    ///
305    /// Returns [`Error::Other`] if the current row already has all columns
306    /// supplied, or if the current column is marked `NOT NULL` in the table
307    /// definition.
308    #[inline]
309    pub fn add_null(&mut self) -> Result<()> {
310        self.chunk.add_null()
311    }
312
313    /// Adds a boolean value.
314    ///
315    /// # Errors
316    ///
317    /// Returns [`Error::Other`] with message `"Too many columns in row"` if
318    /// the current row already has all columns supplied.
319    #[inline]
320    pub fn add_bool(&mut self, value: bool) -> Result<()> {
321        self.chunk.add_bool(value)
322    }
323
324    /// Adds an i16 value (SMALLINT).
325    ///
326    /// # Errors
327    ///
328    /// See [`add_bool`](Self::add_bool).
329    #[inline]
330    pub fn add_i16(&mut self, value: i16) -> Result<()> {
331        self.chunk.add_i16(value)
332    }
333
334    /// Adds an i32 value (INT).
335    ///
336    /// # Errors
337    ///
338    /// See [`add_bool`](Self::add_bool).
339    #[inline]
340    pub fn add_i32(&mut self, value: i32) -> Result<()> {
341        self.chunk.add_i32(value)
342    }
343
344    /// Adds an i64 value (BIGINT).
345    ///
346    /// # Errors
347    ///
348    /// See [`add_bool`](Self::add_bool).
349    #[inline]
350    pub fn add_i64(&mut self, value: i64) -> Result<()> {
351        self.chunk.add_i64(value)
352    }
353
354    /// Adds an f32 value (REAL/FLOAT4).
355    ///
356    /// # Errors
357    ///
358    /// See [`add_bool`](Self::add_bool).
359    #[inline]
360    pub fn add_f32(&mut self, value: f32) -> Result<()> {
361        self.chunk.add_f32(value)
362    }
363
364    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
365    ///
366    /// # Errors
367    ///
368    /// See [`add_bool`](Self::add_bool).
369    #[inline]
370    pub fn add_f64(&mut self, value: f64) -> Result<()> {
371        self.chunk.add_f64(value)
372    }
373
374    /// Adds a string value (TEXT/VARCHAR).
375    ///
376    /// # Errors
377    ///
378    /// See [`add_bool`](Self::add_bool).
379    #[inline]
380    pub fn add_str(&mut self, value: &str) -> Result<()> {
381        self.chunk.add_str(value)
382    }
383
384    /// Adds a bytes value (BYTEA).
385    ///
386    /// # Errors
387    ///
388    /// See [`add_bool`](Self::add_bool).
389    #[inline]
390    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
391        self.chunk.add_bytes(value)
392    }
393
394    /// Adds a 128-bit value (NUMERIC/INTERVAL).
395    ///
396    /// # Errors
397    ///
398    /// See [`add_bool`](Self::add_bool).
399    #[inline]
400    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
401        self.chunk.add_data128(value)
402    }
403
404    /// Adds an optional value. If None, adds NULL.
405    ///
406    /// # Errors
407    ///
408    /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
409    /// return for the current row position.
410    pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
411    where
412        F: FnOnce(&mut Self, T) -> Result<()>,
413    {
414        match value {
415            Some(v) => add_fn(self, v),
416            None => self.add_null(),
417        }
418    }
419
420    /// Ends the current row.
421    ///
422    /// Returns an error if the wrong number of columns were added.
423    /// Automatically flushes the buffer if chunk limits are reached.
424    ///
425    /// # Errors
426    ///
427    /// - Returns [`Error::Other`] if fewer (or more) columns were supplied
428    ///   than the table definition requires.
429    /// - Returns any error from [`flush`](Self::flush) when an automatic
430    ///   flush is triggered by reaching the chunk byte/row limit.
431    pub fn end_row(&mut self) -> Result<()> {
432        self.chunk.end_row()?;
433        self.row_count += 1;
434
435        // Auto-flush if we've reached chunk limits
436        if self.chunk.should_flush() {
437            self.flush()?;
438        }
439
440        Ok(())
441    }
442
443    /// Flushes the current buffer to the server.
444    ///
445    /// This sends all buffered rows as a chunk and resets the buffer.
446    /// Called automatically when chunk limits are reached.
447    ///
448    /// # Errors
449    ///
450    /// - Returns [`Error::Other`] if the connection is using gRPC transport
451    ///   (COPY is TCP-only) and no COPY session exists yet.
452    /// - Returns [`Error::Client`] if the server rejects the `COPY IN` start
453    ///   or the subsequent data send.
454    /// - Returns [`Error::Io`] on transport-level I/O failures while writing
455    ///   the chunk.
456    pub fn flush(&mut self) -> Result<()> {
457        if self.chunk.is_empty() {
458            return Ok(());
459        }
460
461        let chunk_rows = self.chunk.row_count();
462        let Some(buffer) = self.chunk.take() else {
463            return Ok(());
464        };
465
466        // Ensure the COPY connection is started
467        if self.writer.is_none() {
468            let client = self.connection.tcp_client().ok_or_else(|| {
469                crate::Error::new("Inserter requires a TCP connection. gRPC connections do not support COPY operations.")
470            })?;
471            let columns: Vec<&str> = self
472                .table_def
473                .columns
474                .iter()
475                .map(|c| c.name.as_str())
476                .collect();
477            let table_name = self.table_def.qualified_name();
478            self.writer = Some(client.copy_in(&table_name, &columns)?);
479        }
480
481        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
482        // into the connection's write buffer. flush_stream ensures the data
483        // reaches the server before we return.
484        if let Some(ref mut writer) = self.writer {
485            writer.send_direct(&buffer)?;
486            writer.flush_stream()?;
487        }
488
489        debug!(
490            target: "hyperdb_api",
491            chunk = self.chunk_count,
492            rows = chunk_rows,
493            bytes = buffer.len(),
494            "inserter-chunk"
495        );
496
497        self.chunk_count += 1;
498        Ok(())
499    }
500
501    /// Adds a complete row of values.
502    ///
503    /// This is a convenience method that adds all column values at once
504    /// using the `IntoValue` trait for type-safe insertion.
505    ///
506    /// # Arguments
507    ///
508    /// * `values` - A slice of values implementing `IntoValue`.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the number of values doesn't match the column count,
513    /// or if any value cannot be added.
514    ///
515    /// # Example
516    ///
517    /// ```no_run
518    /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
519    ///
520    /// fn main() -> Result<()> {
521    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
522    ///
523    ///     let table_def = TableDefinition::new("users")
524    ///         .add_required_column("id", SqlType::int())
525    ///         .add_nullable_column("name", SqlType::text());
526    ///
527    ///     Catalog::new(&conn).create_table(&table_def)?;
528    ///
529    ///     let mut inserter = Inserter::new(&conn, &table_def)?;
530    ///
531    ///     // Add rows using IntoValue trait
532    ///     inserter.add_row(&[&1i32, &"Alice"])?;
533    ///     inserter.add_row(&[&2i32, &"Bob"])?;
534    ///
535    ///     // Option<T> can be used for nullable columns
536    ///     inserter.add_row(&[&3i32, &None::<&str>])?;
537    ///
538    ///     let rows = inserter.execute()?;
539    ///     Ok(())
540    /// }
541    /// ```
542    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
543        let column_count = self.table_def.column_count();
544        if values.len() != column_count {
545            return Err(Error::new(format!(
546                "Column count mismatch: expected {} columns but got {}",
547                column_count,
548                values.len()
549            )));
550        }
551
552        for value in values {
553            value.add_to_inserter(self)?;
554        }
555
556        self.end_row()?;
557        Ok(())
558    }
559
560    /// Adds a Date value.
561    ///
562    /// # Errors
563    ///
564    /// See [`add_bool`](Self::add_bool).
565    #[inline]
566    pub fn add_date(&mut self, value: Date) -> Result<()> {
567        self.chunk.add_date(value)
568    }
569
570    /// Adds a Time value.
571    ///
572    /// # Errors
573    ///
574    /// See [`add_bool`](Self::add_bool).
575    #[inline]
576    pub fn add_time(&mut self, value: Time) -> Result<()> {
577        self.chunk.add_time(value)
578    }
579
580    /// Adds a Timestamp value.
581    ///
582    /// # Errors
583    ///
584    /// See [`add_bool`](Self::add_bool).
585    #[inline]
586    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
587        self.chunk.add_timestamp(value)
588    }
589
590    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
591    ///
592    /// # Errors
593    ///
594    /// See [`add_bool`](Self::add_bool).
595    #[inline]
596    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
597        self.chunk.add_offset_timestamp(value)
598    }
599
600    /// Adds an Interval value.
601    ///
602    /// # Errors
603    ///
604    /// See [`add_bool`](Self::add_bool).
605    #[inline]
606    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
607        self.chunk.add_interval(value)
608    }
609
610    /// Adds a Numeric value.
611    ///
612    /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
613    /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if the column's precision cannot be determined from the
618    /// table definition. Ensure that NUMERIC columns are defined with explicit
619    /// `SqlType` information including precision.
620    pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
621        let column_index = self.chunk.column_index();
622
623        // Check the column's precision to determine storage format
624        let precision = self
625            .table_def
626            .columns
627            .get(column_index)
628            .and_then(super::table_definition::ColumnDefinition::sql_type)
629            .and_then(|t| t.precision())
630            .ok_or_else(|| {
631                let col_name = self
632                    .table_def
633                    .columns
634                    .get(column_index)
635                    .map_or("<unknown>", |c| c.name.as_str());
636                Error::new(format!(
637                    "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
638                     Ensure the column is defined with explicit SqlType including precision.\n\n\
639                     Example fix:\n  \
640                     table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
641                ))
642            })?;
643
644        if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
645            // Small numeric: stored as i64
646            let unscaled = value.unscaled_value();
647            let narrowed = i64::try_from(unscaled).map_err(|_| {
648                Error::new(format!(
649                    "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
650                ))
651            })?;
652            self.chunk.add_i64(narrowed)
653        } else {
654            // Big numeric: stored as 128-bit
655            self.chunk.add_data128(&value.to_packed())
656        }
657    }
658
659    /// Executes the insert and commits all buffered rows.
660    ///
661    /// This sends any remaining buffered data and finishes the COPY operation.
662    /// Returns the number of rows inserted.
663    ///
664    /// The inserter is single-use: calling `execute` a second time returns
665    /// `Ok(0)` because the internal row counter has been reset and no further
666    /// data has been added. To insert additional batches, create a new
667    /// [`Inserter`].
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if:
672    /// - There's an incomplete row (`column_index` != 0)
673    /// - The COPY connection fails to start
674    /// - Sending data fails
675    pub fn execute(&mut self) -> Result<u64> {
676        if self.chunk.column_index() != 0 {
677            return Err(Error::new("Incomplete row at execute time"));
678        }
679
680        if self.row_count == 0 {
681            return Ok(0);
682        }
683
684        // Ensure COPY connection exists before proceeding when we have rows
685        if self.writer.is_none() {
686            let client = self.connection.tcp_client().ok_or_else(|| {
687                Error::new("Inserter requires a TCP connection. gRPC connections do not support COPY operations.")
688            })?;
689            let columns: Vec<&str> = self
690                .table_def
691                .columns
692                .iter()
693                .map(|c| c.name.as_str())
694                .collect();
695            let table_name = self.table_def.qualified_name();
696            self.writer = Some(client.copy_in(&table_name, &columns)?);
697        }
698
699        // At this point, writer must exist since we have rows
700        let writer = self
701            .writer
702            .as_mut()
703            .ok_or_else(|| Error::new("Failed to initialize COPY connection for inserter"))?;
704
705        // If we have buffered data that hasn't been sent yet
706        if !self.chunk.is_empty() {
707            writer.send(self.chunk.buffer())?;
708        }
709
710        // Write and send the COPY trailer
711        let mut trailer_buf = BytesMut::with_capacity(2);
712        copy::write_trailer(&mut trailer_buf);
713        writer.send(&trailer_buf)?;
714
715        // Finish the COPY operation
716        let rows = self
717            .writer
718            .take()
719            .map(hyperdb_api_core::client::CopyInWriter::finish)
720            .transpose()?
721            .unwrap_or(0);
722
723        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
724        info!(
725            target: "hyperdb_api",
726            rows,
727            chunks = self.chunk_count,
728            duration_ms,
729            table = %self.table_def.qualified_name(),
730            "inserter-end"
731        );
732
733        // Reset row counter so a stray second execute() call returns Ok(0)
734        // instead of attempting another COPY trailer on a finished writer.
735        self.row_count = 0;
736
737        Ok(rows)
738    }
739
740    /// Cancels the insert and discards all buffered rows.
741    pub fn cancel(&mut self) {
742        // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
743        // sends a CopyFail to the server.
744        self.writer = None;
745        self.row_count = 0;
746    }
747}
748
749// =============================================================================
750// ColumnMapping
751// =============================================================================
752
753/// Defines how a column receives its value during insertion.
754///
755/// Column mappings allow you to:
756/// - Insert values directly from the inserter stream
757/// - Compute values using SQL expressions
758/// - Use server-side functions like `NOW()` or `DEFAULT`
759///
760/// # Example
761///
762/// ```
763/// use hyperdb_api::ColumnMapping;
764///
765/// // Simple column - insert value directly
766/// let id_col = ColumnMapping::new("id");
767///
768/// // Column with expression - computed value
769/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
770/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
771/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// Name of the column in the target table.
    pub column_name: String,
    /// SQL expression that produces the value; `None` means the value is
    /// inserted directly.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Builds a mapping whose value is taken directly from the inserter.
    pub fn new(column_name: impl Into<String>) -> Self {
        let column_name = column_name.into();
        Self {
            column_name,
            expression: None,
        }
    }

    /// Builds a mapping whose value is computed by a SQL expression.
    ///
    /// The expression may reference other columns or call SQL functions.
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - A SQL expression to compute the column value.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// // Use current timestamp
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    ///
    /// // Compute from other columns
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let expression = Some(expression.into());
        Self {
            column_name,
            expression,
        }
    }

    /// Name of the target column.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// The SQL expression, or `None` for a direct mapping.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// `true` when the mapping has no expression, i.e. the value is inserted
    /// as provided.
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression().is_none()
    }

    /// Renders this mapping as one item of a SELECT list.
    ///
    /// The column name is always double-quoted, with embedded double quotes
    /// doubled. An expression is emitted verbatim and aliased to the quoted
    /// column name.
    fn to_select_item(&self) -> String {
        let quoted_name = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        if let Some(expr) = self.expression() {
            format!("{expr} AS {quoted_name}")
        } else {
            quoted_name
        }
    }
}
846
847// =============================================================================
848// IntoValue Trait
849// =============================================================================
850
/// Trait for types that can be inserted into a Hyper table.
///
/// This trait is implemented for common Rust types, allowing them to be
/// used with [`Inserter::add_row()`] for type-safe insertion.
///
/// # Supported Types
///
/// - Integers: `i16`, `i32`, `i64`
/// - Floats: `f32`, `f64`
/// - `bool`
/// - `&str`, `String`
/// - `Option<T>` where `T: IntoValue` (for nullable columns; `None` is
///   added as NULL)
/// - Date/time types: `Date`, `Time`, `Timestamp`, `OffsetTimestamp`,
///   `Interval`
/// - `Numeric`, `Geography`, `Oid`, `Vec<u8>` and `[u8]` (bytes)
///
/// References to the primitive, string, byte-slice and date/time types
/// above (for example `&i32` or `&&str`) are supported as well.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("example")
///         .add_required_column("a", SqlType::int())
///         .add_nullable_column("b", SqlType::text())
///         .add_nullable_column("c", SqlType::double());
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     // IntoValue allows adding rows with mixed types
///     inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
///     inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
///
///     inserter.execute()?;
///     Ok(())
/// }
/// ```
pub trait IntoValue {
    /// Adds this value to the inserter as the next column of the current row.
    ///
    /// # Errors
    ///
    /// Implementations call the matching `Inserter::add_*` method and
    /// forward its error — see [`Inserter::add_bool`] for the shared
    /// failure modes (too many columns, NULL into non-nullable, etc).
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
}
900
901// Implementations for basic types
902
903impl IntoValue for bool {
904    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
905        inserter.add_bool(*self)
906    }
907}
908
909impl IntoValue for i16 {
910    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
911        inserter.add_i16(*self)
912    }
913}
914
915impl IntoValue for i32 {
916    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
917        inserter.add_i32(*self)
918    }
919}
920
921impl IntoValue for i64 {
922    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
923        inserter.add_i64(*self)
924    }
925}
926
927impl IntoValue for f32 {
928    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
929        inserter.add_f32(*self)
930    }
931}
932
933impl IntoValue for f64 {
934    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
935        inserter.add_f64(*self)
936    }
937}
938
939impl IntoValue for str {
940    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
941        inserter.add_str(self)
942    }
943}
944
945impl IntoValue for String {
946    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
947        inserter.add_str(self)
948    }
949}
950
951impl IntoValue for [u8] {
952    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
953        inserter.add_bytes(self)
954    }
955}
956
957impl IntoValue for Vec<u8> {
958    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
959        inserter.add_bytes(self)
960    }
961}
962
963// Hyper-specific types
964
965impl IntoValue for Date {
966    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
967        inserter.add_date(*self)
968    }
969}
970
971impl IntoValue for Time {
972    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
973        inserter.add_time(*self)
974    }
975}
976
977impl IntoValue for Timestamp {
978    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
979        inserter.add_timestamp(*self)
980    }
981}
982
983impl IntoValue for OffsetTimestamp {
984    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
985        inserter.add_offset_timestamp(*self)
986    }
987}
988
989impl IntoValue for Interval {
990    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
991        inserter.add_interval(*self)
992    }
993}
994
995impl IntoValue for Numeric {
996    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
997        inserter.add_numeric(*self)
998    }
999}
1000
1001// Option<T> for nullable values
1002impl<T: IntoValue> IntoValue for Option<T> {
1003    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1004        match self {
1005            Some(value) => value.add_to_inserter(inserter),
1006            None => inserter.add_null(),
1007        }
1008    }
1009}
1010
1011// Reference implementations for primitives
1012
1013impl IntoValue for &bool {
1014    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1015        inserter.add_bool(**self)
1016    }
1017}
1018
1019impl IntoValue for &i16 {
1020    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021        inserter.add_i16(**self)
1022    }
1023}
1024
1025impl IntoValue for &i32 {
1026    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1027        inserter.add_i32(**self)
1028    }
1029}
1030
1031impl IntoValue for &i64 {
1032    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1033        inserter.add_i64(**self)
1034    }
1035}
1036
1037impl IntoValue for &f32 {
1038    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039        inserter.add_f32(**self)
1040    }
1041}
1042
1043impl IntoValue for &f64 {
1044    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045        inserter.add_f64(**self)
1046    }
1047}
1048
1049impl IntoValue for &String {
1050    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051        inserter.add_str(self)
1052    }
1053}
1054
1055impl IntoValue for &str {
1056    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057        inserter.add_str(self)
1058    }
1059}
1060
1061impl IntoValue for &&str {
1062    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063        inserter.add_str(self)
1064    }
1065}
1066
1067impl IntoValue for &[u8] {
1068    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069        inserter.add_bytes(self)
1070    }
1071}
1072
1073impl IntoValue for &Date {
1074    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075        inserter.add_date(**self)
1076    }
1077}
1078
1079impl IntoValue for &Time {
1080    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081        inserter.add_time(**self)
1082    }
1083}
1084
1085impl IntoValue for &Timestamp {
1086    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087        inserter.add_timestamp(**self)
1088    }
1089}
1090
1091impl IntoValue for &OffsetTimestamp {
1092    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093        inserter.add_offset_timestamp(**self)
1094    }
1095}
1096
1097impl IntoValue for &Interval {
1098    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099        inserter.add_interval(**self)
1100    }
1101}
1102
1103impl IntoValue for &Numeric {
1104    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105        inserter.add_numeric(**self)
1106    }
1107}
1108
1109// =============================================================================
1110// MappedInserter
1111// =============================================================================
1112
/// An inserter that supports SQL expression mappings.
///
/// This inserter uses a staging table to support computed columns via
/// INSERT...SELECT with SQL expressions: rows are first written to a
/// temporary staging table, and [`execute`](Self::execute) then copies them
/// into the target table while applying the column mappings. It's created by
/// [`Inserter::with_column_mappings`].
#[derive(Debug)]
pub struct MappedInserter<'conn> {
    /// The underlying inserter; it writes rows into the staging table.
    inner: Inserter<'conn>,
    /// The target table name (destination of the final INSERT...SELECT).
    target_table: crate::TableName,
    /// The column mappings, one per target column that gets populated.
    mappings: Vec<ColumnMapping>,
    /// The staging table name (unquoted).
    staging_table: String,
}
1129
1130impl<'conn> MappedInserter<'conn> {
1131    /// Creates a new mapped inserter.
1132    fn new<T>(
1133        connection: &'conn Connection,
1134        inserter_def: &TableDefinition,
1135        target_table: T,
1136        mappings: &[ColumnMapping],
1137    ) -> Result<Self>
1138    where
1139        T: TryInto<crate::TableName>,
1140        crate::Error: From<T::Error>,
1141    {
1142        let target_table = target_table.try_into()?;
1143
1144        // Create a unique staging table name
1145        let staging_table = format!("_hyper_staging_{}", std::process::id());
1146
1147        // Create the staging table definition (temporary)
1148        let mut staging_def = inserter_def.clone();
1149        staging_def.name.clone_from(&staging_table);
1150
1151        // Create the staging table
1152        let create_sql = staging_def.to_create_sql(true)?;
1153        let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1154        connection.execute_command(&create_temp)?;
1155
1156        // Create the inner inserter for the staging table
1157        let inner = Inserter::new(connection, &staging_def)?;
1158
1159        Ok(MappedInserter {
1160            inner,
1161            target_table,
1162            mappings: mappings.to_vec(),
1163            staging_table,
1164        })
1165    }
1166
1167    /// Adds a row of values to the inserter.
1168    ///
1169    /// The values should correspond to the columns in the inserter definition,
1170    /// not the target table.
1171    ///
1172    /// # Errors
1173    ///
1174    /// Forwards the error from [`Inserter::add_row`].
1175    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1176        self.inner.add_row(values)
1177    }
1178
1179    /// Adds a NULL value.
1180    ///
1181    /// # Errors
1182    ///
1183    /// Forwards the error from [`Inserter::add_null`].
1184    pub fn add_null(&mut self) -> Result<()> {
1185        self.inner.add_null()
1186    }
1187
1188    /// Adds a boolean value.
1189    ///
1190    /// # Errors
1191    ///
1192    /// Forwards the error from [`Inserter::add_bool`].
1193    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1194        self.inner.add_bool(value)
1195    }
1196
1197    /// Adds an i16 value.
1198    ///
1199    /// # Errors
1200    ///
1201    /// Forwards the error from [`Inserter::add_i16`].
1202    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1203        self.inner.add_i16(value)
1204    }
1205
1206    /// Adds an i32 value.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Forwards the error from [`Inserter::add_i32`].
1211    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1212        self.inner.add_i32(value)
1213    }
1214
1215    /// Adds an i64 value.
1216    ///
1217    /// # Errors
1218    ///
1219    /// Forwards the error from [`Inserter::add_i64`].
1220    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1221        self.inner.add_i64(value)
1222    }
1223
1224    /// Adds an f32 value.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Forwards the error from [`Inserter::add_f32`].
1229    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1230        self.inner.add_f32(value)
1231    }
1232
1233    /// Adds an f64 value.
1234    ///
1235    /// # Errors
1236    ///
1237    /// Forwards the error from [`Inserter::add_f64`].
1238    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1239        self.inner.add_f64(value)
1240    }
1241
1242    /// Adds a string value.
1243    ///
1244    /// # Errors
1245    ///
1246    /// Forwards the error from [`Inserter::add_str`].
1247    pub fn add_str(&mut self, value: &str) -> Result<()> {
1248        self.inner.add_str(value)
1249    }
1250
1251    /// Adds a bytes value.
1252    ///
1253    /// # Errors
1254    ///
1255    /// Forwards the error from [`Inserter::add_bytes`].
1256    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1257        self.inner.add_bytes(value)
1258    }
1259
1260    /// Ends the current row.
1261    ///
1262    /// # Errors
1263    ///
1264    /// Forwards the error from [`Inserter::end_row`].
1265    pub fn end_row(&mut self) -> Result<()> {
1266        self.inner.end_row()
1267    }
1268
1269    /// Executes the insert with column mappings.
1270    ///
1271    /// This method:
1272    /// 1. Inserts all buffered rows into the staging table
1273    /// 2. Executes INSERT...SELECT from staging to target with mappings
1274    /// 3. Drops the staging table
1275    ///
1276    /// Returns the number of rows inserted into the target table.
1277    ///
1278    /// # Errors
1279    ///
1280    /// - Returns the error from the inner [`Inserter::execute`] if writing
1281    ///   the staging rows fails.
1282    /// - Returns [`Error::Client`] if the `INSERT ... SELECT` from staging
1283    ///   to the target table is rejected (e.g. a mapping expression fails
1284    ///   to evaluate).
1285    /// - Returns [`Error::Client`] if dropping the staging table fails.
1286    pub fn execute(&mut self) -> Result<u64> {
1287        let connection = self.inner.connection;
1288        let staging_table = self.staging_table.clone();
1289
1290        // Insert data into staging table
1291        let _staging_rows = self.inner.execute()?;
1292
1293        // Build the INSERT...SELECT statement
1294        use hyperdb_api_core::protocol::escape::SqlIdentifier;
1295
1296        let target_columns: Vec<String> = self
1297            .mappings
1298            .iter()
1299            .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1300            .collect();
1301
1302        let select_items: Vec<String> = self
1303            .mappings
1304            .iter()
1305            .map(ColumnMapping::to_select_item)
1306            .collect();
1307
1308        let sql = format!(
1309            "INSERT INTO {} ({}) SELECT {} FROM {}",
1310            self.target_table,
1311            target_columns.join(", "),
1312            select_items.join(", "),
1313            SqlIdentifier(&staging_table),
1314        );
1315
1316        // Execute the INSERT...SELECT (returns row count directly)
1317        let row_count = connection.execute_command(&sql)?;
1318
1319        // Drop the staging table
1320        connection.execute_command(&format!(
1321            "DROP TABLE IF EXISTS {}",
1322            SqlIdentifier(&staging_table)
1323        ))?;
1324
1325        // Return the number of rows inserted
1326        Ok(row_count)
1327    }
1328
1329    /// Cancels the insert and drops the staging table.
1330    ///
1331    /// This method handles cleanup failures gracefully by logging warnings
1332    /// instead of returning errors. This prevents masking the original error
1333    /// that caused the cancellation.
1334    ///
1335    /// # Logging
1336    ///
1337    /// Cleanup failures are logged using the `tracing` crate at WARN level.
1338    /// If `tracing` is not initialized, errors are written to stderr.
1339    pub fn cancel(&mut self) {
1340        let connection = self.inner.connection;
1341        let staging_table = &self.staging_table;
1342
1343        // Drop the staging table, but don't fail if cleanup fails
1344        // This avoids masking the original error that caused cancellation
1345        if let Err(e) = connection.execute_command(&format!(
1346            "DROP TABLE IF EXISTS \"{}\"",
1347            staging_table.replace('"', "\"\"")
1348        )) {
1349            // Log the cleanup failure for debugging
1350            // In production, consider using a logging framework like `tracing`
1351            eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1352        }
1353    }
1354}
1355
1356// =============================================================================
1357// InsertChunk - Thread-safe chunk for parallel encoding
1358// =============================================================================
1359
1360/// A thread-safe chunk for encoding rows in parallel.
1361///
1362/// `InsertChunk` can be created and populated in any thread, then sent to a
1363/// [`ChunkSender`] for transmission. This enables parallel data encoding across
1364/// multiple worker threads while serializing the actual network sends.
1365///
1366/// # Example
1367///
1368/// ```no_run
1369/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
1370///
1371/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
1372///     let mut chunk = InsertChunk::from_table_definition(table_def);
1373///     
1374///     for i in 0..1000 {
1375///         chunk.add_i32(start_id + i)?;
1376///         chunk.add_str(&format!("Item {}", start_id + i))?;
1377///         chunk.end_row()?;
1378///     }
1379///     
1380///     Ok(chunk)
1381/// }
1382/// ```
1383#[derive(Debug)]
1384pub struct InsertChunk {
1385    buffer: BytesMut,
1386    header_written: bool,
1387    column_index: usize,
1388    column_count: usize,
1389    row_count: usize,
1390    column_nullable: Vec<bool>,
1391}
1392
1393// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
1394// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
1395// thread-local state. The manual `unsafe impl` exists only because the
1396// auto-trait derivation is conservative for this struct's compilation context;
1397// the compound type has no `!Send` components.
1398unsafe impl Send for InsertChunk {}
1399// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
1400// and there is no interior mutability crossing a `&InsertChunk` boundary,
1401// so sharing `&InsertChunk` across threads is sound.
1402unsafe impl Sync for InsertChunk {}
1403
1404impl InsertChunk {
1405    /// Creates a new empty chunk with the given schema.
1406    ///
1407    /// # Arguments
1408    ///
1409    /// * `column_count` - Number of columns per row
1410    /// * `column_nullable` - Whether each column is nullable
1411    #[must_use]
1412    pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1413        debug_assert_eq!(column_count, column_nullable.len());
1414        InsertChunk {
1415            buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1416            header_written: false,
1417            column_index: 0,
1418            column_count,
1419            row_count: 0,
1420            column_nullable,
1421        }
1422    }
1423
1424    /// Creates a chunk from a table definition.
1425    #[must_use]
1426    pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1427        let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1428        Self::new(table_def.column_count(), column_nullable)
1429    }
1430
1431    /// Returns the number of complete rows in this chunk.
1432    #[must_use]
1433    pub fn row_count(&self) -> usize {
1434        self.row_count
1435    }
1436
1437    /// Returns the current buffer size in bytes.
1438    #[must_use]
1439    pub fn buffer_size(&self) -> usize {
1440        self.buffer.len()
1441    }
1442
1443    /// Returns true if the chunk has reached size or row limits and should be sent.
1444    #[must_use]
1445    pub fn should_flush(&self) -> bool {
1446        self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1447    }
1448
1449    /// Returns true if the chunk is empty (no rows).
1450    #[must_use]
1451    pub fn is_empty(&self) -> bool {
1452        self.row_count == 0
1453    }
1454
1455    /// Takes the buffer, consuming the chunk data.
1456    ///
1457    /// Returns `None` if the chunk is empty. After calling this, the chunk
1458    /// can be reused by calling the add_* methods again.
1459    ///
1460    /// Note: The header flag is NOT reset - subsequent chunks from the same
1461    /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1462    /// header per COPY stream).
1463    pub fn take(&mut self) -> Option<BytesMut> {
1464        if self.row_count == 0 {
1465            return None;
1466        }
1467        // Don't reset header_written - only first chunk should have header
1468        self.row_count = 0;
1469        Some(std::mem::take(&mut self.buffer))
1470    }
1471
1472    /// Resets the chunk for reuse without reallocating.
1473    pub fn clear(&mut self) {
1474        self.buffer.clear();
1475        self.header_written = false;
1476        self.column_index = 0;
1477        self.row_count = 0;
1478    }
1479
1480    #[allow(
1481        clippy::inline_always,
1482        reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1483    )]
1484    fn ensure_header(&mut self) {
1485        if !self.header_written {
1486            copy::write_header(&mut self.buffer);
1487            self.header_written = true;
1488        }
1489    }
1490
1491    #[expect(
1492        clippy::inline_always,
1493        reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1494    )]
1495    #[inline(always)]
1496    fn current_column_nullable(&self) -> bool {
1497        *self.column_nullable.get(self.column_index).unwrap_or(&true)
1498    }
1499
1500    /// Adds a NULL value for the current column.
1501    ///
1502    /// # Errors
1503    ///
1504    /// - Returns [`Error::Other`] with message `"Too many columns in row"`
1505    ///   if the current row already has all columns supplied.
1506    /// - Returns [`Error::Other`] with message
1507    ///   `"Cannot add NULL to non-nullable column"` if the current column
1508    ///   is `NOT NULL` in the schema.
1509    pub fn add_null(&mut self) -> Result<()> {
1510        if self.column_index >= self.column_count {
1511            return Err(Error::new("Too many columns in row"));
1512        }
1513        if !self.current_column_nullable() {
1514            return Err(Error::new("Cannot add NULL to non-nullable column"));
1515        }
1516        self.ensure_header();
1517        copy::write_null(&mut self.buffer);
1518        self.column_index += 1;
1519        Ok(())
1520    }
1521
1522    /// Adds a boolean value.
1523    ///
1524    /// # Errors
1525    ///
1526    /// Returns [`Error::Other`] with message `"Too many columns in row"` if
1527    /// the current row already has all columns supplied.
1528    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1529        if self.column_index >= self.column_count {
1530            return Err(Error::new("Too many columns in row"));
1531        }
1532        self.ensure_header();
1533        let int_value = i8::from(value);
1534        if self.current_column_nullable() {
1535            copy::write_i8(&mut self.buffer, int_value);
1536        } else {
1537            copy::write_i8_not_null(&mut self.buffer, int_value);
1538        }
1539        self.column_index += 1;
1540        Ok(())
1541    }
1542
1543    /// Adds an i16 value (SMALLINT).
1544    ///
1545    /// # Errors
1546    ///
1547    /// See [`add_bool`](Self::add_bool).
1548    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1549        if self.column_index >= self.column_count {
1550            return Err(Error::new("Too many columns in row"));
1551        }
1552        self.ensure_header();
1553        if self.current_column_nullable() {
1554            copy::write_i16(&mut self.buffer, value);
1555        } else {
1556            copy::write_i16_not_null(&mut self.buffer, value);
1557        }
1558        self.column_index += 1;
1559        Ok(())
1560    }
1561
1562    /// Adds an i32 value (INT).
1563    ///
1564    /// # Errors
1565    ///
1566    /// See [`add_bool`](Self::add_bool).
1567    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1568        if self.column_index >= self.column_count {
1569            return Err(Error::new("Too many columns in row"));
1570        }
1571        self.ensure_header();
1572        if self.current_column_nullable() {
1573            copy::write_i32(&mut self.buffer, value);
1574        } else {
1575            copy::write_i32_not_null(&mut self.buffer, value);
1576        }
1577        self.column_index += 1;
1578        Ok(())
1579    }
1580
1581    /// Adds an i64 value (BIGINT).
1582    ///
1583    /// # Errors
1584    ///
1585    /// See [`add_bool`](Self::add_bool).
1586    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1587        if self.column_index >= self.column_count {
1588            return Err(Error::new("Too many columns in row"));
1589        }
1590        self.ensure_header();
1591        if self.current_column_nullable() {
1592            copy::write_i64(&mut self.buffer, value);
1593        } else {
1594            copy::write_i64_not_null(&mut self.buffer, value);
1595        }
1596        self.column_index += 1;
1597        Ok(())
1598    }
1599
1600    /// Adds an f32 value (REAL/FLOAT4).
1601    ///
1602    /// # Errors
1603    ///
1604    /// See [`add_bool`](Self::add_bool).
1605    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1606        if self.column_index >= self.column_count {
1607            return Err(Error::new("Too many columns in row"));
1608        }
1609        self.ensure_header();
1610        if self.current_column_nullable() {
1611            copy::write_f32(&mut self.buffer, value);
1612        } else {
1613            copy::write_f32_not_null(&mut self.buffer, value);
1614        }
1615        self.column_index += 1;
1616        Ok(())
1617    }
1618
1619    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1620    ///
1621    /// # Errors
1622    ///
1623    /// See [`add_bool`](Self::add_bool).
1624    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1625        if self.column_index >= self.column_count {
1626            return Err(Error::new("Too many columns in row"));
1627        }
1628        self.ensure_header();
1629        if self.current_column_nullable() {
1630            copy::write_f64(&mut self.buffer, value);
1631        } else {
1632            copy::write_f64_not_null(&mut self.buffer, value);
1633        }
1634        self.column_index += 1;
1635        Ok(())
1636    }
1637
1638    /// Adds a string value (TEXT/VARCHAR).
1639    ///
1640    /// # Errors
1641    ///
1642    /// See [`add_bool`](Self::add_bool).
1643    pub fn add_str(&mut self, value: &str) -> Result<()> {
1644        self.add_bytes(value.as_bytes())
1645    }
1646
1647    /// Adds a bytes value (BYTEA).
1648    ///
1649    /// # Errors
1650    ///
1651    /// See [`add_bool`](Self::add_bool).
1652    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1653        if self.column_index >= self.column_count {
1654            return Err(Error::new("Too many columns in row"));
1655        }
1656        if value.len() > u32::MAX as usize {
1657            return Err(Error::new(format!(
1658                "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1659                value.len(),
1660                u32::MAX
1661            )));
1662        }
1663        self.ensure_header();
1664        if self.current_column_nullable() {
1665            copy::write_varbinary(&mut self.buffer, value);
1666        } else {
1667            copy::write_varbinary_not_null(&mut self.buffer, value);
1668        }
1669        self.column_index += 1;
1670        Ok(())
1671    }
1672
1673    /// Adds a 128-bit value (NUMERIC/INTERVAL).
1674    ///
1675    /// # Errors
1676    ///
1677    /// See [`add_bool`](Self::add_bool).
1678    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1679        if self.column_index >= self.column_count {
1680            return Err(Error::new("Too many columns in row"));
1681        }
1682        self.ensure_header();
1683        if self.current_column_nullable() {
1684            copy::write_data128(&mut self.buffer, value);
1685        } else {
1686            copy::write_data128_not_null(&mut self.buffer, value);
1687        }
1688        self.column_index += 1;
1689        Ok(())
1690    }
1691
1692    /// Adds a Date value.
1693    ///
1694    /// # Errors
1695    ///
1696    /// See [`add_bool`](Self::add_bool).
1697    pub fn add_date(&mut self, value: Date) -> Result<()> {
1698        if self.column_index >= self.column_count {
1699            return Err(Error::new("Too many columns in row"));
1700        }
1701        self.ensure_header();
1702        let julian_day = value.to_julian_day();
1703        if self.current_column_nullable() {
1704            copy::write_i32(&mut self.buffer, julian_day);
1705        } else {
1706            copy::write_i32_not_null(&mut self.buffer, julian_day);
1707        }
1708        self.column_index += 1;
1709        Ok(())
1710    }
1711
1712    /// Adds a Time value.
1713    ///
1714    /// # Errors
1715    ///
1716    /// See [`add_bool`](Self::add_bool).
1717    pub fn add_time(&mut self, value: Time) -> Result<()> {
1718        if self.column_index >= self.column_count {
1719            return Err(Error::new("Too many columns in row"));
1720        }
1721        self.ensure_header();
1722        let micros = value.to_microseconds();
1723        if self.current_column_nullable() {
1724            copy::write_i64(&mut self.buffer, micros);
1725        } else {
1726            copy::write_i64_not_null(&mut self.buffer, micros);
1727        }
1728        self.column_index += 1;
1729        Ok(())
1730    }
1731
1732    /// Adds a Timestamp value.
1733    ///
1734    /// # Errors
1735    ///
1736    /// See [`add_bool`](Self::add_bool).
1737    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1738        if self.column_index >= self.column_count {
1739            return Err(Error::new("Too many columns in row"));
1740        }
1741        self.ensure_header();
1742        let micros = value.to_microseconds();
1743        if self.current_column_nullable() {
1744            copy::write_i64(&mut self.buffer, micros);
1745        } else {
1746            copy::write_i64_not_null(&mut self.buffer, micros);
1747        }
1748        self.column_index += 1;
1749        Ok(())
1750    }
1751
1752    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1753    ///
1754    /// # Errors
1755    ///
1756    /// See [`add_bool`](Self::add_bool).
1757    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1758        if self.column_index >= self.column_count {
1759            return Err(Error::new("Too many columns in row"));
1760        }
1761        self.ensure_header();
1762        let micros = value.to_microseconds_utc();
1763        if self.current_column_nullable() {
1764            copy::write_i64(&mut self.buffer, micros);
1765        } else {
1766            copy::write_i64_not_null(&mut self.buffer, micros);
1767        }
1768        self.column_index += 1;
1769        Ok(())
1770    }
1771
1772    /// Adds an Interval value.
1773    ///
1774    /// # Errors
1775    ///
1776    /// See [`add_bool`](Self::add_bool).
1777    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1778        if self.column_index >= self.column_count {
1779            return Err(Error::new("Too many columns in row"));
1780        }
1781        self.ensure_header();
1782        let packed = value.to_packed();
1783        if self.current_column_nullable() {
1784            copy::write_data128(&mut self.buffer, &packed);
1785        } else {
1786            copy::write_data128_not_null(&mut self.buffer, &packed);
1787        }
1788        self.column_index += 1;
1789        Ok(())
1790    }
1791
1792    /// Ends the current row.
1793    ///
1794    /// Returns an error if the wrong number of columns were added.
1795    ///
1796    /// # Errors
1797    ///
1798    /// Returns [`Error::Other`] if fewer (or more) columns were supplied
1799    /// for this row than the chunk's column count.
1800    pub fn end_row(&mut self) -> Result<()> {
1801        if self.column_index != self.column_count {
1802            return Err(Error::new(format!(
1803                "Expected {} columns, got {}",
1804                self.column_count, self.column_index
1805            )));
1806        }
1807        self.column_index = 0;
1808        self.row_count += 1;
1809        Ok(())
1810    }
1811
1812    /// Returns the current column index (for checking incomplete rows).
1813    #[must_use]
1814    pub fn column_index(&self) -> usize {
1815        self.column_index
1816    }
1817
1818    /// Returns a reference to the internal buffer.
1819    pub(crate) fn buffer(&self) -> &BytesMut {
1820        &self.buffer
1821    }
1822}
1823
1824// =============================================================================
1825// ChunkSender - Mutex-protected sender for InsertChunks
1826// =============================================================================
1827
1828use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1829use std::sync::Mutex;
1830
1831/// A thread-safe sender for [`InsertChunk`]s.
1832///
1833/// `ChunkSender` manages the COPY protocol connection and ensures that only one
1834/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
1835/// the mutex ensures serialized access.
1836///
1837/// # Example
1838///
1839/// ```no_run
1840/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
1841/// use std::sync::mpsc;
1842/// use std::thread;
1843///
1844/// fn main() -> Result<()> {
1845///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
1846///     
1847///     let table_def = TableDefinition::new("products")
1848///         .add_required_column("id", SqlType::int())
1849///         .add_nullable_column("name", SqlType::text());
1850///     
1851///     Catalog::new(&conn).create_table(&table_def)?;
1852///     
1853///     let sender = ChunkSender::new(&conn, &table_def)?;
1854///     let (tx, rx) = mpsc::channel::<InsertChunk>();
1855///     
1856///     // Worker thread
1857///     let table_def_clone = table_def.clone();
1858///     let handle = thread::spawn(move || {
1859///         let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
1860///         for i in 0..1000i32 {
1861///             chunk.add_i32(i).unwrap();
1862///             chunk.add_str(&format!("Product {}", i)).unwrap();
1863///             chunk.end_row().unwrap();
1864///         }
1865///         tx.send(chunk).unwrap();
1866///     });
1867///     
1868///     // Receive and send chunks
1869///     while let Ok(chunk) = rx.recv() {
1870///         sender.send_chunk(chunk)?;
1871///     }
1872///     
1873///     handle.join().unwrap();
1874///     let rows = sender.finish()?;
1875///     println!("Inserted {} rows", rows);
1876///     Ok(())
1877/// }
1878/// ```
1879#[derive(Debug)]
1880pub struct ChunkSender<'conn> {
1881    connection: &'conn Connection,
1882    table_name: String,
1883    columns: Vec<String>,
1884    writer: Mutex<Option<CopyInWriter<'conn>>>,
1885    header_sent: std::sync::atomic::AtomicBool,
1886    total_rows: AtomicU64,
1887    chunks_sent: AtomicUsize,
1888}
1889
1890impl<'conn> ChunkSender<'conn> {
1891    /// Creates a new chunk sender for the given table.
1892    ///
1893    /// # Errors
1894    ///
1895    /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1896    /// columns. The COPY session itself is opened lazily on the first
1897    /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1898    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1899        if table_def.column_count() == 0 {
1900            return Err(Error::InvalidTableDefinition(
1901                "Table definition must have at least one column".into(),
1902            ));
1903        }
1904
1905        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1906        let table_name = table_def.qualified_name();
1907
1908        Ok(ChunkSender {
1909            connection,
1910            table_name,
1911            columns,
1912            writer: Mutex::new(None),
1913            header_sent: std::sync::atomic::AtomicBool::new(false),
1914            total_rows: AtomicU64::new(0),
1915            chunks_sent: AtomicUsize::new(0),
1916        })
1917    }
1918
1919    /// Sends a chunk to Hyper.
1920    ///
1921    /// This method is thread-safe - multiple threads can call it concurrently,
1922    /// but only one chunk will be sent at a time.
1923    ///
1924    /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
1925    /// automatically handles headers: the first chunk's header is sent, and
1926    /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
1927    /// header per COPY stream).
1928    ///
1929    /// # Errors
1930    ///
1931    /// Returns an error if the chunk is empty or if sending fails.
1932    pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
1933        // Capture row count before take() resets it
1934        let row_count = chunk.row_count();
1935
1936        let Some(buffer) = chunk.take() else {
1937            return Ok(());
1938        };
1939
1940        // Acquire the lock for exclusive send access
1941        let mut writer_guard = self
1942            .writer
1943            .lock()
1944            .map_err(|_| Error::new("ChunkSender mutex poisoned"))?;
1945
1946        // Lazily initialize the COPY connection
1947        if writer_guard.is_none() {
1948            let client = self.connection.tcp_client().ok_or_else(|| {
1949                Error::new("ChunkSender requires a TCP connection. gRPC connections do not support COPY operations.")
1950            })?;
1951            let columns: Vec<&str> = self
1952                .columns
1953                .iter()
1954                .map(std::string::String::as_str)
1955                .collect();
1956            *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
1957        }
1958
1959        // Handle headers: only first chunk should have header in the COPY stream
1960        // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
1961        // strip headers from all chunks except the first one sent.
1962        let is_first = !self.header_sent.swap(true, Ordering::SeqCst);
1963
1964        let data_to_send = if is_first {
1965            // First chunk: send with header
1966            &buffer[..]
1967        } else {
1968            // Subsequent chunks: strip the 19-byte header if present
1969            if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
1970                && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
1971            {
1972                &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
1973            } else {
1974                &buffer[..]
1975            }
1976        };
1977
1978        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
1979        // into the connection's write buffer. flush_stream ensures the data
1980        // reaches the server before we return.
1981        if let Some(ref mut writer) = *writer_guard {
1982            writer.send_direct(data_to_send)?;
1983            writer.flush_stream()?;
1984        }
1985
1986        // Update counters (lock already released for these atomic ops)
1987        drop(writer_guard);
1988        self.total_rows
1989            .fetch_add(row_count as u64, Ordering::Relaxed);
1990        self.chunks_sent.fetch_add(1, Ordering::Relaxed);
1991
1992        debug!(
1993            target: "hyperdb_api",
1994            chunk = self.chunks_sent.load(Ordering::Relaxed),
1995            rows = row_count,
1996            bytes = data_to_send.len(),
1997            "chunk-sender"
1998        );
1999
2000        Ok(())
2001    }
2002
2003    /// Returns the total number of rows sent so far.
2004    pub fn total_rows(&self) -> u64 {
2005        self.total_rows.load(Ordering::Relaxed)
2006    }
2007
2008    /// Returns the number of chunks sent so far.
2009    pub fn chunks_sent(&self) -> usize {
2010        self.chunks_sent.load(Ordering::Relaxed)
2011    }
2012
2013    /// Finishes the COPY operation and returns the total row count.
2014    ///
2015    /// This method consumes the sender. After calling this, the COPY operation
2016    /// is complete and all data has been committed.
2017    ///
2018    /// # Errors
2019    ///
2020    /// - Returns [`Error::Other`] with message `"ChunkSender mutex poisoned"`
2021    ///   if a sender thread panicked while holding the writer lock.
2022    /// - Returns [`Error::Client`] or [`Error::Io`] if sending the COPY
2023    ///   trailer or finishing the COPY operation fails.
2024    pub fn finish(self) -> Result<u64> {
2025        let mut writer_guard = self
2026            .writer
2027            .lock()
2028            .map_err(|_| Error::new("ChunkSender mutex poisoned"))?;
2029
2030        // If no chunks were sent, return 0
2031        let Some(writer) = writer_guard.take() else {
2032            return Ok(0);
2033        };
2034
2035        // Write and send the COPY trailer
2036        let mut trailer_buf = BytesMut::with_capacity(2);
2037        copy::write_trailer(&mut trailer_buf);
2038
2039        // Need to get mutable access to send trailer
2040        let mut writer = writer;
2041        writer.send(&trailer_buf)?;
2042
2043        // Finish the COPY operation
2044        let rows = writer.finish()?;
2045
2046        info!(
2047            target: "hyperdb_api",
2048            rows,
2049            chunks = self.chunks_sent.load(Ordering::Relaxed),
2050            table = %self.table_name,
2051            "chunk-sender-finish"
2052        );
2053
2054        Ok(rows)
2055    }
2056}
2057
#[cfg(test)]
mod tests {
    use crate::table_definition::TableDefinition;
    use hyperdb_api_core::types::SqlType;

    use super::InsertChunk;

    /// Schema shared by all tests: a required `id` and a nullable `name`.
    fn create_test_table_def() -> TableDefinition {
        let base = TableDefinition::new("test");
        let with_id = base.add_required_column("id", SqlType::int());
        with_id.add_nullable_column("name", SqlType::text())
    }

    /// Creates an empty chunk for the shared test schema.
    fn new_test_chunk() -> InsertChunk {
        InsertChunk::from_table_definition(&create_test_table_def())
    }

    /// Appends one complete `(id, name)` row, panicking on any failure.
    fn push_row(chunk: &mut InsertChunk, id: i32, name: &str) {
        chunk.add_i32(id).unwrap();
        chunk.add_str(name).unwrap();
        chunk.end_row().unwrap();
    }

    #[test]
    fn test_inserter_column_validation() {
        // Without a connection only the table-definition side can be checked.
        assert_eq!(create_test_table_def().column_count(), 2);
    }

    #[test]
    fn test_insert_chunk_encoding() {
        let mut chunk = new_test_chunk();

        // A single row is recorded and is far below the flush threshold.
        push_row(&mut chunk, 42, "hello");
        assert_eq!(chunk.row_count(), 1);
        assert!(!chunk.is_empty());
        assert!(!chunk.should_flush());

        // One hundred further rows accumulate in the same chunk.
        for i in 0..100 {
            push_row(&mut chunk, i, &format!("item {i}"));
        }
        assert_eq!(chunk.row_count(), 101);

        // The first take hands out the encoded bytes ...
        let encoded = chunk.take().unwrap();
        assert!(!encoded.is_empty());

        // ... and leaves nothing behind for a second take.
        assert!(chunk.take().is_none());
    }

    #[test]
    fn test_insert_chunk_null_handling() {
        let mut chunk = new_test_chunk();

        // `id` is NOT NULL, so a null is rejected there.
        assert!(chunk.add_null().is_err());

        // Fill the required column, then a null is accepted for `name`.
        chunk.add_i32(1).unwrap();
        chunk.add_null().unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);
    }

    #[test]
    fn test_insert_chunk_column_count_validation() {
        let mut chunk = new_test_chunk();

        // With just one of two columns the row cannot be ended.
        chunk.add_i32(1).unwrap();
        assert!(chunk.end_row().is_err());

        // Supplying the missing column makes the row complete.
        chunk.add_str("test").unwrap();
        chunk.end_row().unwrap();
    }

    #[test]
    fn test_insert_chunk_too_many_columns() {
        let mut chunk = new_test_chunk();

        chunk.add_i32(1).unwrap();
        chunk.add_str("test").unwrap();

        // The schema has two columns; a third value is rejected.
        assert!(chunk.add_i32(2).is_err());
    }

    #[test]
    fn test_insert_chunk_clear() {
        let mut chunk = new_test_chunk();

        push_row(&mut chunk, 1, "test");
        assert_eq!(chunk.row_count(), 1);

        chunk.clear();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }
}