Skip to main content

hyperdb_api/
inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//!     inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
29use tracing::{debug, info};
30
31use crate::catalog::Catalog;
32use crate::connection::Connection;
33use crate::error::{Error, Result};
34use crate::table_definition::TableDefinition;
35
/// Initial buffer size in bytes (4 MiB, i.e. `4 * 1024 * 1024`), chosen to
/// reduce early reallocations.
///
/// The COPY protocol sends data in chunks, and each chunk requires a
/// contiguous buffer. Starting at 4 MiB avoids repeated reallocations
/// during the first chunk for typical workloads while keeping initial
/// memory allocation reasonable.
const INITIAL_BUFFER_SIZE: usize = 4 * 1024 * 1024;
43
/// Maximum buffer size in bytes per chunk before flushing to the server
/// (16 MiB, i.e. `16 * 1024 * 1024`).
///
/// This balances two competing concerns:
/// - **Throughput**: Larger chunks amortize per-chunk overhead (COPY header,
///   network round-trip). Below ~1 MB, per-chunk overhead becomes significant.
/// - **Memory**: The buffer must be fully materialized before sending. 16 MiB
///   keeps resident memory bounded even when rows are wide.
///
/// The 16 MiB value was chosen empirically — it lands on the flat part of
/// the throughput curve where further increases yield diminishing returns.
const CHUNK_SIZE_LIMIT: usize = 16 * 1024 * 1024;
55
/// Maximum rows per chunk before flushing to the server.
///
/// This is a secondary flush trigger alongside [`CHUNK_SIZE_LIMIT`]. For
/// narrow rows (few bytes each), the byte limit alone would accumulate
/// millions of rows before flushing, which delays server-side processing.
/// The cap is 64,000 rows (decimal `64_000`, not 65,536); it ensures timely
/// flushes regardless of row width and is intended to line up with the
/// chunk size used for query result streaming
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
// NOTE(review): the value of DEFAULT_BINARY_CHUNK_SIZE is not visible from
// this file — confirm the two constants really agree.
const CHUNK_ROW_LIMIT: usize = 64_000;
65
66/// A high-performance bulk data inserter.
67///
68/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
69/// using the COPY protocol with `HyperBinary` format for optimal performance.
70///
71/// # Example
72///
73/// ```no_run
74/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
75///
76/// fn main() -> Result<()> {
77///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
78///
79///     let table_def = TableDefinition::new("users")
80///         .add_required_column("id", SqlType::int())
81///         .add_nullable_column("name", SqlType::text());
82///
83///     Catalog::new(&conn).create_table(&table_def)?;
84///
85///     let mut inserter = Inserter::new(&conn, &table_def)?;
86///
87///     for i in 0..1000i32 {
88///         inserter.add_row(&[&i, &format!("User {}", i)])?;
89///     }
90///
91///     let rows = inserter.execute()?;
92///     println!("Inserted {} rows", rows);
93///     Ok(())
94/// }
95/// ```
#[derive(Debug)]
pub struct Inserter<'conn> {
    /// Connection the COPY session is opened on; its TCP client is looked up
    /// when the session is started in `flush` / `execute`.
    connection: &'conn Connection,
    /// Owned copy of the target table definition (cloned in `new`). Supplies
    /// the column names and the qualified table name for the COPY statement.
    table_def: TableDefinition,
    /// The current chunk being populated (delegates encoding).
    chunk: InsertChunk,
    /// Rows completed via `end_row` since construction or since the last
    /// `execute` / `cancel` (both reset it to zero). Not reduced by `flush`.
    row_count: u64,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Active COPY writer (lazily initialized on first write).
    writer: Option<CopyInWriter<'conn>>,
    /// Captured in `new`; used to report the elapsed time when `execute`
    /// logs completion.
    start_time: Instant,
}
111
112impl<'conn> Inserter<'conn> {
113    /// Creates a new inserter for the given table.
114    ///
115    /// The underlying COPY session is started lazily on the first flush or
116    /// execute, so construction is lightweight. However, the connection's
117    /// transport is validated eagerly — using a gRPC connection will return
118    /// an error immediately.
119    ///
120    /// # Errors
121    ///
122    /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
123    ///   columns.
124    /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
125    ///   (COPY is TCP-only).
126    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
127        if table_def.column_count() == 0 {
128            return Err(Error::invalid_table_definition(
129                "Table definition must have at least one column",
130            ));
131        }
132
133        // Fail fast: verify the connection supports COPY (TCP only)
134        if connection.tcp_client().is_none() {
135            return Err(Error::feature_not_supported(
136                "Inserter requires a TCP connection. \
137                 gRPC connections do not support COPY operations.",
138            ));
139        }
140
141        Ok(Inserter {
142            connection,
143            table_def: table_def.clone(),
144            chunk: InsertChunk::from_table_definition(table_def),
145            row_count: 0,
146            chunk_count: 0,
147            writer: None,
148            start_time: Instant::now(),
149        })
150    }
151
152    /// Creates an inserter by querying the table schema from the database.
153    ///
154    /// This method queries the database to get the table definition automatically,
155    /// which is useful when you want to insert into an existing table without
156    /// manually specifying the schema.
157    ///
158    /// # Arguments
159    ///
160    /// * `connection` - The database connection.
161    /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
166    ///
167    /// # Example
168    ///
169    /// ```no_run
170    /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
171    ///
172    /// fn main() -> Result<()> {
173    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
174    ///
175    ///     // Create a table first
176    ///     conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
177    ///
178    ///     // Create inserter by querying the schema directly from a string
179    ///     let mut inserter = Inserter::from_table(&conn, "public.products")?;
180    ///
181    ///     // Now we can insert data without knowing the exact schema
182    ///     inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
183    ///     inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
184    ///
185    ///     let rows = inserter.execute()?;
186    ///     println!("Inserted {} rows", rows);
187    ///     Ok(())
188    /// }
189    /// ```
190    pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
191    where
192        T: TryInto<crate::TableName>,
193        crate::Error: From<T::Error>,
194    {
195        let catalog = Catalog::new(connection);
196        let table_def = catalog.get_table_definition(table_name)?;
197        Self::new(connection, &table_def)
198    }
199
200    /// Creates an inserter with column mappings that allow SQL expressions.
201    ///
202    /// This method uses a temporary table and INSERT...SELECT to support
203    /// column mappings with SQL expressions. Data is first inserted into
204    /// a temporary staging table, then transformed using the mappings.
205    ///
206    /// # Arguments
207    ///
208    /// * `connection` - The database connection.
209    /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
210    /// * `target_table` - The qualified name of the target table to insert into.
211    ///   Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
212    /// * `mappings` - Column mappings defining how values are transformed.
213    ///
214    /// # Example
215    ///
216    /// ```no_run
217    /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
218    ///
219    /// fn main() -> Result<()> {
220    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
221    ///
222    ///     // Target table with computed columns
223    ///     conn.execute_command(r#"
224    ///         CREATE TABLE orders (
225    ///             id INT NOT NULL,
226    ///             product TEXT,
227    ///             quantity INT,
228    ///             price DOUBLE PRECISION,
229    ///             total DOUBLE PRECISION,
230    ///             created_at TIMESTAMP
231    ///         )
232    ///     "#)?;
233    ///
234    ///     // Inserter definition - what we provide
235    ///     let inserter_def = TableDefinition::new("_stage")
236    ///         .add_required_column("id", SqlType::int())
237    ///         .add_nullable_column("product", SqlType::text())
238    ///         .add_nullable_column("quantity", SqlType::int())
239    ///         .add_nullable_column("price", SqlType::double());
240    ///
241    ///     // Column mappings - how values are transformed
242    ///     let mappings = vec![
243    ///         ColumnMapping::new("id"),
244    ///         ColumnMapping::new("product"),
245    ///         ColumnMapping::new("quantity"),
246    ///         ColumnMapping::new("price"),
247    ///         ColumnMapping::with_expression("total", "quantity * price"),
248    ///         ColumnMapping::with_expression("created_at", "NOW()"),
249    ///     ];
250    ///
251    ///     // For simple table names in the public schema, use quoted name
252    ///     // For qualified names, use target_table_def.qualified_name()
253    ///     let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
254    ///
255    ///     inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
256    ///     inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
257    ///
258    ///     let rows = inserter.execute()?;
259    ///     Ok(())
260    /// }
261    /// ```
262    ///
263    /// # Errors
264    ///
265    /// - Returns an error if `target_table` fails to convert into a
266    ///   [`TableName`](crate::TableName).
267    /// - Returns [`Error::Server`] if creating the temporary staging table
268    ///   fails on the server.
269    /// - Returns the errors from [`Inserter::new`] for the staging table
270    ///   (zero-column table definition, gRPC transport).
    pub fn with_column_mappings<T>(
        connection: &'conn Connection,
        inserter_def: &TableDefinition,
        target_table: T,
        mappings: &[ColumnMapping],
    ) -> Result<MappedInserter<'conn>>
    where
        T: TryInto<crate::TableName>,
        crate::Error: From<T::Error>,
    {
        // Pure convenience entry point: every argument is forwarded unchanged
        // and all of the actual work is done by `MappedInserter::new`.
        MappedInserter::new(connection, inserter_def, target_table, mappings)
    }
283
284    /// Returns the table definition.
285    pub fn table_definition(&self) -> &TableDefinition {
286        &self.table_def
287    }
288
289    /// Returns the number of columns.
290    #[must_use]
291    pub fn column_count(&self) -> usize {
292        self.table_def.column_count()
293    }
294
295    /// Returns the number of complete rows buffered.
296    #[must_use]
297    pub fn row_count(&self) -> u64 {
298        self.row_count
299    }
300
301    /// Adds a NULL value for the current column.
302    ///
303    /// # Errors
304    ///
305    /// Returns [`Error::InvalidTableDefinition`] if the current row already has all columns
306    /// supplied, or if the current column is marked `NOT NULL` in the table
307    /// definition.
308    #[inline]
309    pub fn add_null(&mut self) -> Result<()> {
310        self.chunk.add_null()
311    }
312
313    /// Adds a boolean value.
314    ///
315    /// # Errors
316    ///
317    /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
318    /// the current row already has all columns supplied.
319    #[inline]
320    pub fn add_bool(&mut self, value: bool) -> Result<()> {
321        self.chunk.add_bool(value)
322    }
323
324    /// Adds an i16 value (SMALLINT).
325    ///
326    /// # Errors
327    ///
328    /// See [`add_bool`](Self::add_bool).
329    #[inline]
330    pub fn add_i16(&mut self, value: i16) -> Result<()> {
331        self.chunk.add_i16(value)
332    }
333
334    /// Adds an i32 value (INT).
335    ///
336    /// # Errors
337    ///
338    /// See [`add_bool`](Self::add_bool).
339    #[inline]
340    pub fn add_i32(&mut self, value: i32) -> Result<()> {
341        self.chunk.add_i32(value)
342    }
343
344    /// Adds an i64 value (BIGINT).
345    ///
346    /// # Errors
347    ///
348    /// See [`add_bool`](Self::add_bool).
349    #[inline]
350    pub fn add_i64(&mut self, value: i64) -> Result<()> {
351        self.chunk.add_i64(value)
352    }
353
354    /// Adds an f32 value (REAL/FLOAT4).
355    ///
356    /// # Errors
357    ///
358    /// See [`add_bool`](Self::add_bool).
359    #[inline]
360    pub fn add_f32(&mut self, value: f32) -> Result<()> {
361        self.chunk.add_f32(value)
362    }
363
364    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
365    ///
366    /// # Errors
367    ///
368    /// See [`add_bool`](Self::add_bool).
369    #[inline]
370    pub fn add_f64(&mut self, value: f64) -> Result<()> {
371        self.chunk.add_f64(value)
372    }
373
374    /// Adds a string value (TEXT/VARCHAR).
375    ///
376    /// # Errors
377    ///
378    /// See [`add_bool`](Self::add_bool).
379    #[inline]
380    pub fn add_str(&mut self, value: &str) -> Result<()> {
381        self.chunk.add_str(value)
382    }
383
384    /// Adds a bytes value (BYTEA).
385    ///
386    /// # Errors
387    ///
388    /// See [`add_bool`](Self::add_bool).
389    #[inline]
390    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
391        self.chunk.add_bytes(value)
392    }
393
394    /// Adds a 128-bit value (NUMERIC/INTERVAL).
395    ///
396    /// # Errors
397    ///
398    /// See [`add_bool`](Self::add_bool).
399    #[inline]
400    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
401        self.chunk.add_data128(value)
402    }
403
404    /// Adds an optional value. If None, adds NULL.
405    ///
406    /// # Errors
407    ///
408    /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
409    /// return for the current row position.
410    pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
411    where
412        F: FnOnce(&mut Self, T) -> Result<()>,
413    {
414        match value {
415            Some(v) => add_fn(self, v),
416            None => self.add_null(),
417        }
418    }
419
420    /// Ends the current row.
421    ///
422    /// Returns an error if the wrong number of columns were added.
423    /// Automatically flushes the buffer if chunk limits are reached.
424    ///
425    /// # Errors
426    ///
427    /// - Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
428    ///   than the table definition requires.
429    /// - Returns any error from [`flush`](Self::flush) when an automatic
430    ///   flush is triggered by reaching the chunk byte/row limit.
431    pub fn end_row(&mut self) -> Result<()> {
432        self.chunk.end_row()?;
433        self.row_count += 1;
434
435        // Auto-flush if we've reached chunk limits
436        if self.chunk.should_flush() {
437            self.flush()?;
438        }
439
440        Ok(())
441    }
442
443    /// Flushes the current buffer to the server.
444    ///
445    /// This sends all buffered rows as a chunk and resets the buffer.
446    /// Called automatically when chunk limits are reached.
447    ///
448    /// # Errors
449    ///
450    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
451    ///   (COPY is TCP-only) and no COPY session exists yet.
452    /// - Returns [`Error::Server`] if the server rejects the `COPY IN` start
453    ///   or the subsequent data send.
454    /// - Returns [`Error::Io`] on transport-level I/O failures while writing
455    ///   the chunk.
456    pub fn flush(&mut self) -> Result<()> {
457        if self.chunk.is_empty() {
458            return Ok(());
459        }
460
461        let chunk_rows = self.chunk.row_count();
462        let Some(buffer) = self.chunk.take() else {
463            return Ok(());
464        };
465
466        // Ensure the COPY connection is started
467        if self.writer.is_none() {
468            let client = self.connection.tcp_client().ok_or_else(|| {
469                crate::Error::feature_not_supported(
470                    "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
471                )
472            })?;
473            let columns: Vec<&str> = self
474                .table_def
475                .columns
476                .iter()
477                .map(|c| c.name.as_str())
478                .collect();
479            let table_name = self.table_def.qualified_name();
480            self.writer = Some(client.copy_in(&table_name, &columns)?);
481        }
482
483        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
484        // into the connection's write buffer. flush_stream ensures the data
485        // reaches the server before we return.
486        if let Some(ref mut writer) = self.writer {
487            writer.send_direct(&buffer)?;
488            writer.flush_stream()?;
489        }
490
491        debug!(
492            target: "hyperdb_api",
493            chunk = self.chunk_count,
494            rows = chunk_rows,
495            bytes = buffer.len(),
496            "inserter-chunk"
497        );
498
499        self.chunk_count += 1;
500        Ok(())
501    }
502
503    /// Adds a complete row of values.
504    ///
505    /// This is a convenience method that adds all column values at once
506    /// using the `IntoValue` trait for type-safe insertion.
507    ///
508    /// # Arguments
509    ///
510    /// * `values` - A slice of values implementing `IntoValue`.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if the number of values doesn't match the column count,
515    /// or if any value cannot be added.
516    ///
517    /// # Example
518    ///
519    /// ```no_run
520    /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
521    ///
522    /// fn main() -> Result<()> {
523    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
524    ///
525    ///     let table_def = TableDefinition::new("users")
526    ///         .add_required_column("id", SqlType::int())
527    ///         .add_nullable_column("name", SqlType::text());
528    ///
529    ///     Catalog::new(&conn).create_table(&table_def)?;
530    ///
531    ///     let mut inserter = Inserter::new(&conn, &table_def)?;
532    ///
533    ///     // Add rows using IntoValue trait
534    ///     inserter.add_row(&[&1i32, &"Alice"])?;
535    ///     inserter.add_row(&[&2i32, &"Bob"])?;
536    ///
537    ///     // Option<T> can be used for nullable columns
538    ///     inserter.add_row(&[&3i32, &None::<&str>])?;
539    ///
540    ///     let rows = inserter.execute()?;
541    ///     Ok(())
542    /// }
543    /// ```
544    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
545        let column_count = self.table_def.column_count();
546        if values.len() != column_count {
547            return Err(Error::invalid_table_definition(format!(
548                "Column count mismatch: expected {} columns but got {}",
549                column_count,
550                values.len()
551            )));
552        }
553
554        for value in values {
555            value.add_to_inserter(self)?;
556        }
557
558        self.end_row()?;
559        Ok(())
560    }
561
562    /// Adds a Date value.
563    ///
564    /// # Errors
565    ///
566    /// See [`add_bool`](Self::add_bool).
567    #[inline]
568    pub fn add_date(&mut self, value: Date) -> Result<()> {
569        self.chunk.add_date(value)
570    }
571
572    /// Adds a Time value.
573    ///
574    /// # Errors
575    ///
576    /// See [`add_bool`](Self::add_bool).
577    #[inline]
578    pub fn add_time(&mut self, value: Time) -> Result<()> {
579        self.chunk.add_time(value)
580    }
581
582    /// Adds a Timestamp value.
583    ///
584    /// # Errors
585    ///
586    /// See [`add_bool`](Self::add_bool).
587    #[inline]
588    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
589        self.chunk.add_timestamp(value)
590    }
591
592    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
593    ///
594    /// # Errors
595    ///
596    /// See [`add_bool`](Self::add_bool).
597    #[inline]
598    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
599        self.chunk.add_offset_timestamp(value)
600    }
601
602    /// Adds an Interval value.
603    ///
604    /// # Errors
605    ///
606    /// See [`add_bool`](Self::add_bool).
607    #[inline]
608    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
609        self.chunk.add_interval(value)
610    }
611
612    /// Adds a Numeric value.
613    ///
614    /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
615    /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
616    ///
617    /// # Errors
618    ///
619    /// Returns an error if the column's precision cannot be determined from the
620    /// table definition. Ensure that NUMERIC columns are defined with explicit
621    /// `SqlType` information including precision.
622    pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
623        let column_index = self.chunk.column_index();
624
625        // Check the column's precision to determine storage format
626        let precision = self
627            .table_def
628            .columns
629            .get(column_index)
630            .and_then(super::table_definition::ColumnDefinition::sql_type)
631            .and_then(|t| t.precision())
632            .ok_or_else(|| {
633                let col_name = self
634                    .table_def
635                    .columns
636                    .get(column_index)
637                    .map_or("<unknown>", |c| c.name.as_str());
638                Error::conversion(format!(
639                    "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
640                     Ensure the column is defined with explicit SqlType including precision.\n\n\
641                     Example fix:\n  \
642                     table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
643                ))
644            })?;
645
646        if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
647            // Small numeric: stored as i64
648            let unscaled = value.unscaled_value();
649            let narrowed = i64::try_from(unscaled).map_err(|_| {
650                Error::conversion(format!(
651                    "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
652                ))
653            })?;
654            self.chunk.add_i64(narrowed)
655        } else {
656            // Big numeric: stored as 128-bit
657            self.chunk.add_data128(&value.to_packed())
658        }
659    }
660
661    /// Executes the insert and commits all buffered rows.
662    ///
663    /// This sends any remaining buffered data and finishes the COPY operation.
664    /// Returns the number of rows inserted.
665    ///
666    /// The inserter is single-use: calling `execute` a second time returns
667    /// `Ok(0)` because the internal row counter has been reset and no further
668    /// data has been added. To insert additional batches, create a new
669    /// [`Inserter`].
670    ///
671    /// # Errors
672    ///
673    /// Returns an error if:
674    /// - There's an incomplete row (`column_index` != 0)
675    /// - The COPY connection fails to start
676    /// - Sending data fails
677    pub fn execute(&mut self) -> Result<u64> {
678        if self.chunk.column_index() != 0 {
679            return Err(Error::invalid_table_definition(
680                "Incomplete row at execute time",
681            ));
682        }
683
684        if self.row_count == 0 {
685            return Ok(0);
686        }
687
688        // Ensure COPY connection exists before proceeding when we have rows
689        if self.writer.is_none() {
690            let client = self.connection.tcp_client().ok_or_else(|| {
691                Error::feature_not_supported(
692                    "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
693                )
694            })?;
695            let columns: Vec<&str> = self
696                .table_def
697                .columns
698                .iter()
699                .map(|c| c.name.as_str())
700                .collect();
701            let table_name = self.table_def.qualified_name();
702            self.writer = Some(client.copy_in(&table_name, &columns)?);
703        }
704
705        // At this point, writer must exist since we have rows
706        let writer = self
707            .writer
708            .as_mut()
709            .ok_or_else(|| Error::internal("Failed to initialize COPY connection for inserter"))?;
710
711        // If we have buffered data that hasn't been sent yet
712        if !self.chunk.is_empty() {
713            writer.send(self.chunk.buffer())?;
714        }
715
716        // Write and send the COPY trailer
717        let mut trailer_buf = BytesMut::with_capacity(2);
718        copy::write_trailer(&mut trailer_buf);
719        writer.send(&trailer_buf)?;
720
721        // Finish the COPY operation
722        let rows = self
723            .writer
724            .take()
725            .map(hyperdb_api_core::client::CopyInWriter::finish)
726            .transpose()?
727            .unwrap_or(0);
728
729        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
730        info!(
731            target: "hyperdb_api",
732            rows,
733            chunks = self.chunk_count,
734            duration_ms,
735            table = %self.table_def.qualified_name(),
736            "inserter-end"
737        );
738
739        // Reset row counter so a stray second execute() call returns Ok(0)
740        // instead of attempting another COPY trailer on a finished writer.
741        self.row_count = 0;
742
743        Ok(rows)
744    }
745
746    /// Cancels the insert and discards all buffered rows.
747    pub fn cancel(&mut self) {
748        // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
749        // sends a CopyFail to the server.
750        self.writer = None;
751        self.row_count = 0;
752    }
753}
754
755// =============================================================================
756// ColumnMapping
757// =============================================================================
758
/// Describes where the value of one target column comes from during insertion.
///
/// A mapping is one of two kinds:
/// - **direct** — the value is taken as-is from the inserter stream;
/// - **computed** — the value is produced by a SQL expression, which may use
///   server-side functions like `NOW()` or `DEFAULT`.
///
/// # Example
///
/// ```
/// use hyperdb_api::ColumnMapping;
///
/// // Simple column - insert value directly
/// let id_col = ColumnMapping::new("id");
///
/// // Column with expression - computed value
/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// Name of the target column (unquoted; quoting is applied when the
    /// select item is generated).
    pub column_name: String,
    /// SQL expression producing the value, or `None` for a direct mapping.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Creates a direct mapping: the column receives the value supplied to
    /// the inserter, unchanged.
    pub fn new(column_name: impl Into<String>) -> Self {
        let column_name = column_name.into();
        Self {
            column_name,
            expression: None,
        }
    }

    /// Creates a computed mapping: the column receives the result of
    /// `expression`.
    ///
    /// The expression is used verbatim, so it can reference other columns or
    /// call SQL functions.
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - A SQL expression to compute the column value.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// // Use current timestamp
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    ///
    /// // Compute from other columns
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let expression = Some(expression.into());
        Self {
            column_name,
            expression,
        }
    }

    /// Returns the target column's name.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// Returns the SQL expression of a computed mapping, or `None` for a
    /// direct mapping.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// Returns `true` when the value is inserted directly, i.e. no
    /// expression is attached.
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression().is_none()
    }

    /// Renders this mapping as one item of a SQL select list.
    ///
    /// The column name is always double-quoted, with embedded double quotes
    /// doubled. A direct mapping yields just the quoted name; a computed
    /// mapping yields `<expression> AS <quoted name>`.
    fn to_select_item(&self) -> String {
        let quoted_name = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        match self.expression() {
            None => quoted_name,
            Some(expr) => format!("{expr} AS {quoted_name}"),
        }
    }
}
852
853// =============================================================================
854// IntoValue Trait
855// =============================================================================
856
/// Trait for types that can be inserted into a Hyper table.
///
/// This trait is implemented for common Rust types, allowing them to be
/// used with [`Inserter::add_row()`] for type-safe insertion. Each
/// implementation forwards the value to the matching `Inserter::add_*`
/// method.
///
/// # Supported Types
///
/// - Integers: `i16`, `i32`, `i64`
/// - Floats: `f32`, `f64`
/// - `bool`
/// - `&str`, `String` (and the unsized `str`)
/// - `Option<T>` where `T: IntoValue` (for nullable columns; `None` is
///   written as NULL)
/// - Date/time types: `Date`, `Time`, `Timestamp`, `OffsetTimestamp`, `Interval`
/// - `Numeric`, `Geography`, `Oid`, `Vec<u8>` and `[u8]` (bytes)
/// - Shared references (`&T`) to the scalar, string, byte-slice and
///   date/time/numeric types above
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("example")
///         .add_required_column("a", SqlType::int())
///         .add_nullable_column("b", SqlType::text())
///         .add_nullable_column("c", SqlType::double());
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     // IntoValue allows adding rows with mixed types
///     inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
///     inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
///
///     inserter.execute()?;
///     Ok(())
/// }
/// ```
pub trait IntoValue {
    /// Adds this value to the inserter as the next column of the current row.
    ///
    /// # Errors
    ///
    /// Implementations call the matching `Inserter::add_*` method and
    /// forward its error — see [`Inserter::add_bool`] for the shared
    /// failure modes (too many columns, NULL into non-nullable, etc).
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
}
906
907// Implementations for basic types
908
909impl IntoValue for bool {
910    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
911        inserter.add_bool(*self)
912    }
913}
914
915impl IntoValue for i16 {
916    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
917        inserter.add_i16(*self)
918    }
919}
920
921impl IntoValue for i32 {
922    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
923        inserter.add_i32(*self)
924    }
925}
926
927impl IntoValue for i64 {
928    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
929        inserter.add_i64(*self)
930    }
931}
932
933impl IntoValue for f32 {
934    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
935        inserter.add_f32(*self)
936    }
937}
938
939impl IntoValue for f64 {
940    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
941        inserter.add_f64(*self)
942    }
943}
944
945impl IntoValue for str {
946    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
947        inserter.add_str(self)
948    }
949}
950
951impl IntoValue for String {
952    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
953        inserter.add_str(self)
954    }
955}
956
957impl IntoValue for [u8] {
958    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
959        inserter.add_bytes(self)
960    }
961}
962
963impl IntoValue for Vec<u8> {
964    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
965        inserter.add_bytes(self)
966    }
967}
968
969// Hyper-specific types
970
971impl IntoValue for Date {
972    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
973        inserter.add_date(*self)
974    }
975}
976
977impl IntoValue for Time {
978    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
979        inserter.add_time(*self)
980    }
981}
982
983impl IntoValue for Timestamp {
984    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
985        inserter.add_timestamp(*self)
986    }
987}
988
989impl IntoValue for OffsetTimestamp {
990    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
991        inserter.add_offset_timestamp(*self)
992    }
993}
994
995impl IntoValue for Interval {
996    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
997        inserter.add_interval(*self)
998    }
999}
1000
1001impl IntoValue for Numeric {
1002    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1003        inserter.add_numeric(*self)
1004    }
1005}
1006
1007// Option<T> for nullable values
1008impl<T: IntoValue> IntoValue for Option<T> {
1009    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1010        match self {
1011            Some(value) => value.add_to_inserter(inserter),
1012            None => inserter.add_null(),
1013        }
1014    }
1015}
1016
// Reference (`&T`) implementations for primitives, strings, byte slices, and Hyper-specific types
1018
1019impl IntoValue for &bool {
1020    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021        inserter.add_bool(**self)
1022    }
1023}
1024
1025impl IntoValue for &i16 {
1026    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1027        inserter.add_i16(**self)
1028    }
1029}
1030
1031impl IntoValue for &i32 {
1032    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1033        inserter.add_i32(**self)
1034    }
1035}
1036
1037impl IntoValue for &i64 {
1038    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039        inserter.add_i64(**self)
1040    }
1041}
1042
1043impl IntoValue for &f32 {
1044    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045        inserter.add_f32(**self)
1046    }
1047}
1048
1049impl IntoValue for &f64 {
1050    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051        inserter.add_f64(**self)
1052    }
1053}
1054
1055impl IntoValue for &String {
1056    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057        inserter.add_str(self)
1058    }
1059}
1060
1061impl IntoValue for &str {
1062    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063        inserter.add_str(self)
1064    }
1065}
1066
1067impl IntoValue for &&str {
1068    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069        inserter.add_str(self)
1070    }
1071}
1072
1073impl IntoValue for &[u8] {
1074    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075        inserter.add_bytes(self)
1076    }
1077}
1078
1079impl IntoValue for &Date {
1080    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081        inserter.add_date(**self)
1082    }
1083}
1084
1085impl IntoValue for &Time {
1086    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087        inserter.add_time(**self)
1088    }
1089}
1090
1091impl IntoValue for &Timestamp {
1092    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093        inserter.add_timestamp(**self)
1094    }
1095}
1096
1097impl IntoValue for &OffsetTimestamp {
1098    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099        inserter.add_offset_timestamp(**self)
1100    }
1101}
1102
1103impl IntoValue for &Interval {
1104    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105        inserter.add_interval(**self)
1106    }
1107}
1108
1109impl IntoValue for &Numeric {
1110    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1111        inserter.add_numeric(**self)
1112    }
1113}
1114
1115// =============================================================================
1116// MappedInserter
1117// =============================================================================
1118
/// An inserter that supports SQL expression mappings.
///
/// Rows are first bulk-loaded into a staging table through the wrapped
/// [`Inserter`]; [`MappedInserter::execute`] then copies them into the target
/// table with an `INSERT ... SELECT` whose select list is built from the
/// column mappings, which is what makes computed columns possible. It's
/// created by [`Inserter::with_column_mappings`].
#[derive(Debug)]
pub struct MappedInserter<'conn> {
    /// The underlying inserter for the staging table.
    inner: Inserter<'conn>,
    /// The target table name (destination of the `INSERT ... SELECT`).
    target_table: crate::TableName,
    /// The column mappings, one per target column that gets populated.
    mappings: Vec<ColumnMapping>,
    /// The staging table name (unquoted; quoted when spliced into SQL).
    staging_table: String,
}
1135
1136impl<'conn> MappedInserter<'conn> {
1137    /// Creates a new mapped inserter.
1138    fn new<T>(
1139        connection: &'conn Connection,
1140        inserter_def: &TableDefinition,
1141        target_table: T,
1142        mappings: &[ColumnMapping],
1143    ) -> Result<Self>
1144    where
1145        T: TryInto<crate::TableName>,
1146        crate::Error: From<T::Error>,
1147    {
1148        let target_table = target_table.try_into()?;
1149
1150        // Create a unique staging table name
1151        let staging_table = format!("_hyper_staging_{}", std::process::id());
1152
1153        // Create the staging table definition (temporary)
1154        let mut staging_def = inserter_def.clone();
1155        staging_def.name.clone_from(&staging_table);
1156
1157        // Create the staging table
1158        let create_sql = staging_def.to_create_sql(true)?;
1159        let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1160        connection.execute_command(&create_temp)?;
1161
1162        // Create the inner inserter for the staging table
1163        let inner = Inserter::new(connection, &staging_def)?;
1164
1165        Ok(MappedInserter {
1166            inner,
1167            target_table,
1168            mappings: mappings.to_vec(),
1169            staging_table,
1170        })
1171    }
1172
1173    /// Adds a row of values to the inserter.
1174    ///
1175    /// The values should correspond to the columns in the inserter definition,
1176    /// not the target table.
1177    ///
1178    /// # Errors
1179    ///
1180    /// Forwards the error from [`Inserter::add_row`].
1181    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1182        self.inner.add_row(values)
1183    }
1184
1185    /// Adds a NULL value.
1186    ///
1187    /// # Errors
1188    ///
1189    /// Forwards the error from [`Inserter::add_null`].
1190    pub fn add_null(&mut self) -> Result<()> {
1191        self.inner.add_null()
1192    }
1193
1194    /// Adds a boolean value.
1195    ///
1196    /// # Errors
1197    ///
1198    /// Forwards the error from [`Inserter::add_bool`].
1199    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1200        self.inner.add_bool(value)
1201    }
1202
1203    /// Adds an i16 value.
1204    ///
1205    /// # Errors
1206    ///
1207    /// Forwards the error from [`Inserter::add_i16`].
1208    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1209        self.inner.add_i16(value)
1210    }
1211
1212    /// Adds an i32 value.
1213    ///
1214    /// # Errors
1215    ///
1216    /// Forwards the error from [`Inserter::add_i32`].
1217    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1218        self.inner.add_i32(value)
1219    }
1220
1221    /// Adds an i64 value.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Forwards the error from [`Inserter::add_i64`].
1226    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1227        self.inner.add_i64(value)
1228    }
1229
1230    /// Adds an f32 value.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Forwards the error from [`Inserter::add_f32`].
1235    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1236        self.inner.add_f32(value)
1237    }
1238
1239    /// Adds an f64 value.
1240    ///
1241    /// # Errors
1242    ///
1243    /// Forwards the error from [`Inserter::add_f64`].
1244    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1245        self.inner.add_f64(value)
1246    }
1247
1248    /// Adds a string value.
1249    ///
1250    /// # Errors
1251    ///
1252    /// Forwards the error from [`Inserter::add_str`].
1253    pub fn add_str(&mut self, value: &str) -> Result<()> {
1254        self.inner.add_str(value)
1255    }
1256
1257    /// Adds a bytes value.
1258    ///
1259    /// # Errors
1260    ///
1261    /// Forwards the error from [`Inserter::add_bytes`].
1262    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1263        self.inner.add_bytes(value)
1264    }
1265
1266    /// Ends the current row.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Forwards the error from [`Inserter::end_row`].
1271    pub fn end_row(&mut self) -> Result<()> {
1272        self.inner.end_row()
1273    }
1274
1275    /// Executes the insert with column mappings.
1276    ///
1277    /// This method:
1278    /// 1. Inserts all buffered rows into the staging table
1279    /// 2. Executes INSERT...SELECT from staging to target with mappings
1280    /// 3. Drops the staging table
1281    ///
1282    /// Returns the number of rows inserted into the target table.
1283    ///
1284    /// # Errors
1285    ///
1286    /// - Returns the error from the inner [`Inserter::execute`] if writing
1287    ///   the staging rows fails.
1288    /// - Returns [`Error::Server`] if the `INSERT ... SELECT` from staging
1289    ///   to the target table is rejected (e.g. a mapping expression fails
1290    ///   to evaluate).
1291    /// - Returns [`Error::Server`] if dropping the staging table fails.
1292    pub fn execute(&mut self) -> Result<u64> {
1293        let connection = self.inner.connection;
1294        let staging_table = self.staging_table.clone();
1295
1296        // Insert data into staging table
1297        let _staging_rows = self.inner.execute()?;
1298
1299        // Build the INSERT...SELECT statement
1300        use hyperdb_api_core::protocol::escape::SqlIdentifier;
1301
1302        let target_columns: Vec<String> = self
1303            .mappings
1304            .iter()
1305            .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1306            .collect();
1307
1308        let select_items: Vec<String> = self
1309            .mappings
1310            .iter()
1311            .map(ColumnMapping::to_select_item)
1312            .collect();
1313
1314        let sql = format!(
1315            "INSERT INTO {} ({}) SELECT {} FROM {}",
1316            self.target_table,
1317            target_columns.join(", "),
1318            select_items.join(", "),
1319            SqlIdentifier(&staging_table),
1320        );
1321
1322        // Execute the INSERT...SELECT (returns row count directly)
1323        let row_count = connection.execute_command(&sql)?;
1324
1325        // Drop the staging table
1326        connection.execute_command(&format!(
1327            "DROP TABLE IF EXISTS {}",
1328            SqlIdentifier(&staging_table)
1329        ))?;
1330
1331        // Return the number of rows inserted
1332        Ok(row_count)
1333    }
1334
1335    /// Cancels the insert and drops the staging table.
1336    ///
1337    /// This method handles cleanup failures gracefully by logging warnings
1338    /// instead of returning errors. This prevents masking the original error
1339    /// that caused the cancellation.
1340    ///
1341    /// # Logging
1342    ///
1343    /// Cleanup failures are logged using the `tracing` crate at WARN level.
1344    /// If `tracing` is not initialized, errors are written to stderr.
1345    pub fn cancel(&mut self) {
1346        let connection = self.inner.connection;
1347        let staging_table = &self.staging_table;
1348
1349        // Drop the staging table, but don't fail if cleanup fails
1350        // This avoids masking the original error that caused cancellation
1351        if let Err(e) = connection.execute_command(&format!(
1352            "DROP TABLE IF EXISTS \"{}\"",
1353            staging_table.replace('"', "\"\"")
1354        )) {
1355            // Log the cleanup failure for debugging
1356            // In production, consider using a logging framework like `tracing`
1357            eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1358        }
1359    }
1360}
1361
1362// =============================================================================
1363// InsertChunk - Thread-safe chunk for parallel encoding
1364// =============================================================================
1365
/// A thread-safe chunk for encoding rows in parallel.
///
/// `InsertChunk` can be created and populated in any thread, then sent to a
/// [`ChunkSender`] for transmission. This enables parallel data encoding across
/// multiple worker threads while serializing the actual network sends.
///
/// Values are appended column by column with the `add_*` methods and each row
/// is closed with `end_row()`, which verifies that exactly the expected
/// number of columns was supplied.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
///
/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
///     let mut chunk = InsertChunk::from_table_definition(table_def);
///
///     for i in 0..1000 {
///         chunk.add_i32(start_id + i)?;
///         chunk.add_str(&format!("Item {}", start_id + i))?;
///         chunk.end_row()?;
///     }
///
///     Ok(chunk)
/// }
/// ```
#[derive(Debug)]
pub struct InsertChunk {
    /// Encoded bytes: the COPY header (once written) followed by the values.
    buffer: BytesMut,
    /// Whether the header has already been written into `buffer`.
    /// Reset by `clear()`, deliberately kept by `take()`.
    header_written: bool,
    /// Zero-based index of the next column to write in the current row;
    /// reset to 0 by `end_row()`.
    column_index: usize,
    /// Number of columns every row must contain.
    column_count: usize,
    /// Number of complete rows encoded since the last `take()`/`clear()`.
    row_count: usize,
    /// Per-column nullability; selects the nullable or NOT NULL encoding.
    /// A column index beyond this vector is treated as nullable.
    column_nullable: Vec<bool>,
}
1398
1399// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
1400// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
1401// thread-local state. The manual `unsafe impl` exists only because the
1402// auto-trait derivation is conservative for this struct's compilation context;
1403// the compound type has no `!Send` components.
1404unsafe impl Send for InsertChunk {}
1405// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
1406// and there is no interior mutability crossing a `&InsertChunk` boundary,
1407// so sharing `&InsertChunk` across threads is sound.
1408unsafe impl Sync for InsertChunk {}
1409
1410impl InsertChunk {
1411    /// Creates a new empty chunk with the given schema.
1412    ///
1413    /// # Arguments
1414    ///
1415    /// * `column_count` - Number of columns per row
1416    /// * `column_nullable` - Whether each column is nullable
1417    #[must_use]
1418    pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1419        debug_assert_eq!(column_count, column_nullable.len());
1420        InsertChunk {
1421            buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1422            header_written: false,
1423            column_index: 0,
1424            column_count,
1425            row_count: 0,
1426            column_nullable,
1427        }
1428    }
1429
1430    /// Creates a chunk from a table definition.
1431    #[must_use]
1432    pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1433        let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1434        Self::new(table_def.column_count(), column_nullable)
1435    }
1436
1437    /// Returns the number of complete rows in this chunk.
1438    #[must_use]
1439    pub fn row_count(&self) -> usize {
1440        self.row_count
1441    }
1442
1443    /// Returns the current buffer size in bytes.
1444    #[must_use]
1445    pub fn buffer_size(&self) -> usize {
1446        self.buffer.len()
1447    }
1448
1449    /// Returns true if the chunk has reached size or row limits and should be sent.
1450    #[must_use]
1451    pub fn should_flush(&self) -> bool {
1452        self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1453    }
1454
1455    /// Returns true if the chunk is empty (no rows).
1456    #[must_use]
1457    pub fn is_empty(&self) -> bool {
1458        self.row_count == 0
1459    }
1460
1461    /// Takes the buffer, consuming the chunk data.
1462    ///
1463    /// Returns `None` if the chunk is empty. After calling this, the chunk
1464    /// can be reused by calling the add_* methods again.
1465    ///
1466    /// Note: The header flag is NOT reset - subsequent chunks from the same
1467    /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1468    /// header per COPY stream).
1469    pub fn take(&mut self) -> Option<BytesMut> {
1470        if self.row_count == 0 {
1471            return None;
1472        }
1473        // Don't reset header_written - only first chunk should have header
1474        self.row_count = 0;
1475        Some(std::mem::take(&mut self.buffer))
1476    }
1477
1478    /// Resets the chunk for reuse without reallocating.
1479    pub fn clear(&mut self) {
1480        self.buffer.clear();
1481        self.header_written = false;
1482        self.column_index = 0;
1483        self.row_count = 0;
1484    }
1485
1486    #[allow(
1487        clippy::inline_always,
1488        reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1489    )]
1490    fn ensure_header(&mut self) {
1491        if !self.header_written {
1492            copy::write_header(&mut self.buffer);
1493            self.header_written = true;
1494        }
1495    }
1496
1497    #[expect(
1498        clippy::inline_always,
1499        reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1500    )]
1501    #[inline(always)]
1502    fn current_column_nullable(&self) -> bool {
1503        *self.column_nullable.get(self.column_index).unwrap_or(&true)
1504    }
1505
1506    /// Adds a NULL value for the current column.
1507    ///
1508    /// # Errors
1509    ///
1510    /// - Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"`
1511    ///   if the current row already has all columns supplied.
1512    /// - Returns [`Error::InvalidTableDefinition`] with message
1513    ///   `"Cannot add NULL to non-nullable column"` if the current column
1514    ///   is `NOT NULL` in the schema.
1515    pub fn add_null(&mut self) -> Result<()> {
1516        if self.column_index >= self.column_count {
1517            return Err(Error::invalid_table_definition("Too many columns in row"));
1518        }
1519        if !self.current_column_nullable() {
1520            return Err(Error::invalid_table_definition(
1521                "Cannot add NULL to non-nullable column",
1522            ));
1523        }
1524        self.ensure_header();
1525        copy::write_null(&mut self.buffer);
1526        self.column_index += 1;
1527        Ok(())
1528    }
1529
1530    /// Adds a boolean value.
1531    ///
1532    /// # Errors
1533    ///
1534    /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
1535    /// the current row already has all columns supplied.
1536    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1537        if self.column_index >= self.column_count {
1538            return Err(Error::invalid_table_definition("Too many columns in row"));
1539        }
1540        self.ensure_header();
1541        let int_value = i8::from(value);
1542        if self.current_column_nullable() {
1543            copy::write_i8(&mut self.buffer, int_value);
1544        } else {
1545            copy::write_i8_not_null(&mut self.buffer, int_value);
1546        }
1547        self.column_index += 1;
1548        Ok(())
1549    }
1550
1551    /// Adds an i16 value (SMALLINT).
1552    ///
1553    /// # Errors
1554    ///
1555    /// See [`add_bool`](Self::add_bool).
1556    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1557        if self.column_index >= self.column_count {
1558            return Err(Error::invalid_table_definition("Too many columns in row"));
1559        }
1560        self.ensure_header();
1561        if self.current_column_nullable() {
1562            copy::write_i16(&mut self.buffer, value);
1563        } else {
1564            copy::write_i16_not_null(&mut self.buffer, value);
1565        }
1566        self.column_index += 1;
1567        Ok(())
1568    }
1569
1570    /// Adds an i32 value (INT).
1571    ///
1572    /// # Errors
1573    ///
1574    /// See [`add_bool`](Self::add_bool).
1575    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1576        if self.column_index >= self.column_count {
1577            return Err(Error::invalid_table_definition("Too many columns in row"));
1578        }
1579        self.ensure_header();
1580        if self.current_column_nullable() {
1581            copy::write_i32(&mut self.buffer, value);
1582        } else {
1583            copy::write_i32_not_null(&mut self.buffer, value);
1584        }
1585        self.column_index += 1;
1586        Ok(())
1587    }
1588
1589    /// Adds an i64 value (BIGINT).
1590    ///
1591    /// # Errors
1592    ///
1593    /// See [`add_bool`](Self::add_bool).
1594    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1595        if self.column_index >= self.column_count {
1596            return Err(Error::invalid_table_definition("Too many columns in row"));
1597        }
1598        self.ensure_header();
1599        if self.current_column_nullable() {
1600            copy::write_i64(&mut self.buffer, value);
1601        } else {
1602            copy::write_i64_not_null(&mut self.buffer, value);
1603        }
1604        self.column_index += 1;
1605        Ok(())
1606    }
1607
1608    /// Adds an f32 value (REAL/FLOAT4).
1609    ///
1610    /// # Errors
1611    ///
1612    /// See [`add_bool`](Self::add_bool).
1613    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1614        if self.column_index >= self.column_count {
1615            return Err(Error::invalid_table_definition("Too many columns in row"));
1616        }
1617        self.ensure_header();
1618        if self.current_column_nullable() {
1619            copy::write_f32(&mut self.buffer, value);
1620        } else {
1621            copy::write_f32_not_null(&mut self.buffer, value);
1622        }
1623        self.column_index += 1;
1624        Ok(())
1625    }
1626
1627    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1628    ///
1629    /// # Errors
1630    ///
1631    /// See [`add_bool`](Self::add_bool).
1632    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1633        if self.column_index >= self.column_count {
1634            return Err(Error::invalid_table_definition("Too many columns in row"));
1635        }
1636        self.ensure_header();
1637        if self.current_column_nullable() {
1638            copy::write_f64(&mut self.buffer, value);
1639        } else {
1640            copy::write_f64_not_null(&mut self.buffer, value);
1641        }
1642        self.column_index += 1;
1643        Ok(())
1644    }
1645
1646    /// Adds a string value (TEXT/VARCHAR).
1647    ///
1648    /// # Errors
1649    ///
1650    /// See [`add_bool`](Self::add_bool).
1651    pub fn add_str(&mut self, value: &str) -> Result<()> {
1652        self.add_bytes(value.as_bytes())
1653    }
1654
1655    /// Adds a bytes value (BYTEA).
1656    ///
1657    /// # Errors
1658    ///
1659    /// See [`add_bool`](Self::add_bool).
1660    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1661        if self.column_index >= self.column_count {
1662            return Err(Error::invalid_table_definition("Too many columns in row"));
1663        }
1664        if value.len() > u32::MAX as usize {
1665            return Err(Error::conversion(format!(
1666                "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1667                value.len(),
1668                u32::MAX
1669            )));
1670        }
1671        self.ensure_header();
1672        if self.current_column_nullable() {
1673            copy::write_varbinary(&mut self.buffer, value);
1674        } else {
1675            copy::write_varbinary_not_null(&mut self.buffer, value);
1676        }
1677        self.column_index += 1;
1678        Ok(())
1679    }
1680
1681    /// Adds a 128-bit value (NUMERIC/INTERVAL).
1682    ///
1683    /// # Errors
1684    ///
1685    /// See [`add_bool`](Self::add_bool).
1686    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1687        if self.column_index >= self.column_count {
1688            return Err(Error::invalid_table_definition("Too many columns in row"));
1689        }
1690        self.ensure_header();
1691        if self.current_column_nullable() {
1692            copy::write_data128(&mut self.buffer, value);
1693        } else {
1694            copy::write_data128_not_null(&mut self.buffer, value);
1695        }
1696        self.column_index += 1;
1697        Ok(())
1698    }
1699
1700    /// Adds a Date value.
1701    ///
1702    /// # Errors
1703    ///
1704    /// See [`add_bool`](Self::add_bool).
1705    pub fn add_date(&mut self, value: Date) -> Result<()> {
1706        if self.column_index >= self.column_count {
1707            return Err(Error::invalid_table_definition("Too many columns in row"));
1708        }
1709        self.ensure_header();
1710        let julian_day = value.to_julian_day();
1711        if self.current_column_nullable() {
1712            copy::write_i32(&mut self.buffer, julian_day);
1713        } else {
1714            copy::write_i32_not_null(&mut self.buffer, julian_day);
1715        }
1716        self.column_index += 1;
1717        Ok(())
1718    }
1719
1720    /// Adds a Time value.
1721    ///
1722    /// # Errors
1723    ///
1724    /// See [`add_bool`](Self::add_bool).
1725    pub fn add_time(&mut self, value: Time) -> Result<()> {
1726        if self.column_index >= self.column_count {
1727            return Err(Error::invalid_table_definition("Too many columns in row"));
1728        }
1729        self.ensure_header();
1730        let micros = value.to_microseconds();
1731        if self.current_column_nullable() {
1732            copy::write_i64(&mut self.buffer, micros);
1733        } else {
1734            copy::write_i64_not_null(&mut self.buffer, micros);
1735        }
1736        self.column_index += 1;
1737        Ok(())
1738    }
1739
1740    /// Adds a Timestamp value.
1741    ///
1742    /// # Errors
1743    ///
1744    /// See [`add_bool`](Self::add_bool).
1745    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1746        if self.column_index >= self.column_count {
1747            return Err(Error::invalid_table_definition("Too many columns in row"));
1748        }
1749        self.ensure_header();
1750        let micros = value.to_microseconds();
1751        if self.current_column_nullable() {
1752            copy::write_i64(&mut self.buffer, micros);
1753        } else {
1754            copy::write_i64_not_null(&mut self.buffer, micros);
1755        }
1756        self.column_index += 1;
1757        Ok(())
1758    }
1759
1760    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1761    ///
1762    /// # Errors
1763    ///
1764    /// See [`add_bool`](Self::add_bool).
1765    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1766        if self.column_index >= self.column_count {
1767            return Err(Error::invalid_table_definition("Too many columns in row"));
1768        }
1769        self.ensure_header();
1770        let micros = value.to_microseconds_utc();
1771        if self.current_column_nullable() {
1772            copy::write_i64(&mut self.buffer, micros);
1773        } else {
1774            copy::write_i64_not_null(&mut self.buffer, micros);
1775        }
1776        self.column_index += 1;
1777        Ok(())
1778    }
1779
1780    /// Adds an Interval value.
1781    ///
1782    /// # Errors
1783    ///
1784    /// See [`add_bool`](Self::add_bool).
1785    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1786        if self.column_index >= self.column_count {
1787            return Err(Error::invalid_table_definition("Too many columns in row"));
1788        }
1789        self.ensure_header();
1790        let packed = value.to_packed();
1791        if self.current_column_nullable() {
1792            copy::write_data128(&mut self.buffer, &packed);
1793        } else {
1794            copy::write_data128_not_null(&mut self.buffer, &packed);
1795        }
1796        self.column_index += 1;
1797        Ok(())
1798    }
1799
1800    /// Ends the current row.
1801    ///
1802    /// Returns an error if the wrong number of columns were added.
1803    ///
1804    /// # Errors
1805    ///
1806    /// Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
1807    /// for this row than the chunk's column count.
1808    pub fn end_row(&mut self) -> Result<()> {
1809        if self.column_index != self.column_count {
1810            return Err(Error::invalid_table_definition(format!(
1811                "Expected {} columns, got {}",
1812                self.column_count, self.column_index
1813            )));
1814        }
1815        self.column_index = 0;
1816        self.row_count += 1;
1817        Ok(())
1818    }
1819
1820    /// Returns the current column index (for checking incomplete rows).
1821    #[must_use]
1822    pub fn column_index(&self) -> usize {
1823        self.column_index
1824    }
1825
1826    /// Returns a reference to the internal buffer.
1827    pub(crate) fn buffer(&self) -> &BytesMut {
1828        &self.buffer
1829    }
1830}
1831
1832// =============================================================================
1833// ChunkSender - Mutex-protected sender for InsertChunks
1834// =============================================================================
1835
1836use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1837use std::sync::Mutex;
1838
1839/// A thread-safe sender for [`InsertChunk`]s.
1840///
1841/// `ChunkSender` manages the COPY protocol connection and ensures that only one
1842/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
1843/// the mutex ensures serialized access.
1844///
1845/// # Example
1846///
1847/// ```no_run
1848/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
1849/// use std::sync::mpsc;
1850/// use std::thread;
1851///
1852/// fn main() -> Result<()> {
1853///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
1854///     
1855///     let table_def = TableDefinition::new("products")
1856///         .add_required_column("id", SqlType::int())
1857///         .add_nullable_column("name", SqlType::text());
1858///     
1859///     Catalog::new(&conn).create_table(&table_def)?;
1860///     
1861///     let sender = ChunkSender::new(&conn, &table_def)?;
1862///     let (tx, rx) = mpsc::channel::<InsertChunk>();
1863///     
1864///     // Worker thread
1865///     let table_def_clone = table_def.clone();
1866///     let handle = thread::spawn(move || {
1867///         let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
1868///         for i in 0..1000i32 {
1869///             chunk.add_i32(i).unwrap();
1870///             chunk.add_str(&format!("Product {}", i)).unwrap();
1871///             chunk.end_row().unwrap();
1872///         }
1873///         tx.send(chunk).unwrap();
1874///     });
1875///     
1876///     // Receive and send chunks
1877///     while let Ok(chunk) = rx.recv() {
1878///         sender.send_chunk(chunk)?;
1879///     }
1880///     
1881///     handle.join().unwrap();
1882///     let rows = sender.finish()?;
1883///     println!("Inserted {} rows", rows);
1884///     Ok(())
1885/// }
1886/// ```
1887#[derive(Debug)]
1888pub struct ChunkSender<'conn> {
1889    connection: &'conn Connection,
1890    table_name: String,
1891    columns: Vec<String>,
1892    writer: Mutex<Option<CopyInWriter<'conn>>>,
1893    header_sent: std::sync::atomic::AtomicBool,
1894    total_rows: AtomicU64,
1895    chunks_sent: AtomicUsize,
1896}
1897
1898impl<'conn> ChunkSender<'conn> {
1899    /// Creates a new chunk sender for the given table.
1900    ///
1901    /// # Errors
1902    ///
1903    /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1904    /// columns. The COPY session itself is opened lazily on the first
1905    /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1906    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1907        if table_def.column_count() == 0 {
1908            return Err(Error::invalid_table_definition(
1909                "Table definition must have at least one column",
1910            ));
1911        }
1912
1913        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1914        let table_name = table_def.qualified_name();
1915
1916        Ok(ChunkSender {
1917            connection,
1918            table_name,
1919            columns,
1920            writer: Mutex::new(None),
1921            header_sent: std::sync::atomic::AtomicBool::new(false),
1922            total_rows: AtomicU64::new(0),
1923            chunks_sent: AtomicUsize::new(0),
1924        })
1925    }
1926
1927    /// Sends a chunk to Hyper.
1928    ///
1929    /// This method is thread-safe - multiple threads can call it concurrently,
1930    /// but only one chunk will be sent at a time.
1931    ///
1932    /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
1933    /// automatically handles headers: the first chunk's header is sent, and
1934    /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
1935    /// header per COPY stream).
1936    ///
1937    /// # Errors
1938    ///
1939    /// Returns an error if the chunk is empty or if sending fails.
1940    pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
1941        // Capture row count before take() resets it
1942        let row_count = chunk.row_count();
1943
1944        let Some(buffer) = chunk.take() else {
1945            return Ok(());
1946        };
1947
1948        // Acquire the lock for exclusive send access
1949        let mut writer_guard = self
1950            .writer
1951            .lock()
1952            .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
1953
1954        // Lazily initialize the COPY connection
1955        if writer_guard.is_none() {
1956            let client = self.connection.tcp_client().ok_or_else(|| {
1957                Error::feature_not_supported(
1958                    "ChunkSender requires a TCP connection. gRPC connections do not support COPY operations."
1959                )
1960            })?;
1961            let columns: Vec<&str> = self
1962                .columns
1963                .iter()
1964                .map(std::string::String::as_str)
1965                .collect();
1966            *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
1967        }
1968
1969        // Handle headers: only first chunk should have header in the COPY stream
1970        // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
1971        // strip headers from all chunks except the first one sent.
1972        let is_first = !self.header_sent.swap(true, Ordering::SeqCst);
1973
1974        let data_to_send = if is_first {
1975            // First chunk: send with header
1976            &buffer[..]
1977        } else {
1978            // Subsequent chunks: strip the 19-byte header if present
1979            if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
1980                && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
1981            {
1982                &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
1983            } else {
1984                &buffer[..]
1985            }
1986        };
1987
1988        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
1989        // into the connection's write buffer. flush_stream ensures the data
1990        // reaches the server before we return.
1991        if let Some(ref mut writer) = *writer_guard {
1992            writer.send_direct(data_to_send)?;
1993            writer.flush_stream()?;
1994        }
1995
1996        // Update counters (lock already released for these atomic ops)
1997        drop(writer_guard);
1998        self.total_rows
1999            .fetch_add(row_count as u64, Ordering::Relaxed);
2000        self.chunks_sent.fetch_add(1, Ordering::Relaxed);
2001
2002        debug!(
2003            target: "hyperdb_api",
2004            chunk = self.chunks_sent.load(Ordering::Relaxed),
2005            rows = row_count,
2006            bytes = data_to_send.len(),
2007            "chunk-sender"
2008        );
2009
2010        Ok(())
2011    }
2012
2013    /// Returns the total number of rows sent so far.
2014    pub fn total_rows(&self) -> u64 {
2015        self.total_rows.load(Ordering::Relaxed)
2016    }
2017
2018    /// Returns the number of chunks sent so far.
2019    pub fn chunks_sent(&self) -> usize {
2020        self.chunks_sent.load(Ordering::Relaxed)
2021    }
2022
2023    /// Finishes the COPY operation and returns the total row count.
2024    ///
2025    /// This method consumes the sender. After calling this, the COPY operation
2026    /// is complete and all data has been committed.
2027    ///
2028    /// # Errors
2029    ///
2030    /// - Returns [`Error::Internal`] with message `"ChunkSender mutex poisoned"`
2031    ///   if a sender thread panicked while holding the writer lock.
2032    /// - Returns [`Error::Server`] or [`Error::Io`] if sending the COPY
2033    ///   trailer or finishing the COPY operation fails.
2034    pub fn finish(self) -> Result<u64> {
2035        let mut writer_guard = self
2036            .writer
2037            .lock()
2038            .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
2039
2040        // If no chunks were sent, return 0
2041        let Some(writer) = writer_guard.take() else {
2042            return Ok(0);
2043        };
2044
2045        // Write and send the COPY trailer
2046        let mut trailer_buf = BytesMut::with_capacity(2);
2047        copy::write_trailer(&mut trailer_buf);
2048
2049        // Need to get mutable access to send trailer
2050        let mut writer = writer;
2051        writer.send(&trailer_buf)?;
2052
2053        // Finish the COPY operation
2054        let rows = writer.finish()?;
2055
2056        info!(
2057            target: "hyperdb_api",
2058            rows,
2059            chunks = self.chunks_sent.load(Ordering::Relaxed),
2060            table = %self.table_name,
2061            "chunk-sender-finish"
2062        );
2063
2064        Ok(rows)
2065    }
2066}
2067
#[cfg(test)]
mod tests {
    use hyperdb_api_core::types::SqlType;

    use super::InsertChunk;
    use crate::table_definition::TableDefinition;

    /// Two-column fixture: required `id` (int) and nullable `name` (text).
    fn create_test_table_def() -> TableDefinition {
        let base = TableDefinition::new("test");
        let with_id = base.add_required_column("id", SqlType::int());
        with_id.add_nullable_column("name", SqlType::text())
    }

    /// Builds an empty chunk over the fixture table.
    fn fixture_chunk() -> InsertChunk {
        InsertChunk::from_table_definition(&create_test_table_def())
    }

    /// Appends one complete `(id, name)` row, panicking on any failure.
    fn push_row(chunk: &mut InsertChunk, id: i32, name: &str) {
        chunk.add_i32(id).unwrap();
        chunk.add_str(name).unwrap();
        chunk.end_row().unwrap();
    }

    #[test]
    fn test_inserter_column_validation() {
        // Without a live connection only the table-definition side can be checked.
        assert_eq!(create_test_table_def().column_count(), 2);
    }

    #[test]
    fn test_insert_chunk_encoding() {
        let mut chunk = fixture_chunk();

        // A single row: counted, non-empty, and far below the flush threshold.
        push_row(&mut chunk, 42, "hello");
        assert_eq!(chunk.row_count(), 1);
        assert!(!chunk.is_empty());
        assert!(!chunk.should_flush());

        // One hundred further rows.
        for i in 0..100 {
            push_row(&mut chunk, i, &format!("item {i}"));
        }
        assert_eq!(chunk.row_count(), 101);

        // Taking the buffer yields data once ...
        let taken = chunk.take().unwrap();
        assert!(!taken.is_empty());

        // ... and nothing on the second attempt.
        assert!(chunk.take().is_none());
    }

    #[test]
    fn test_insert_chunk_null_handling() {
        let mut chunk = fixture_chunk();

        // Column 0 is NOT NULL, so a NULL must be rejected.
        assert!(chunk.add_null().is_err());

        // Supply the required value instead.
        chunk.add_i32(1).unwrap();

        // Column 1 is nullable, so a NULL is accepted.
        chunk.add_null().unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);
    }

    #[test]
    fn test_insert_chunk_column_count_validation() {
        let mut chunk = fixture_chunk();

        // With only one of two columns present, the row cannot be ended.
        chunk.add_i32(1).unwrap();
        assert!(chunk.end_row().is_err());

        // After the second column the row is complete.
        chunk.add_str("test").unwrap();
        chunk.end_row().unwrap();
    }

    #[test]
    fn test_insert_chunk_too_many_columns() {
        let mut chunk = fixture_chunk();

        chunk.add_i32(1).unwrap();
        chunk.add_str("test").unwrap();

        // A third value exceeds the two-column definition.
        assert!(chunk.add_i32(2).is_err());
    }

    #[test]
    fn test_insert_chunk_clear() {
        let mut chunk = fixture_chunk();

        push_row(&mut chunk, 1, "test");
        assert_eq!(chunk.row_count(), 1);

        chunk.clear();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }
}