Skip to main content

hyperdb_api/
inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//!     inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{
29    Date, Geography, Interval, Numeric, OffsetTimestamp, Time, Timestamp,
30};
31use tracing::{debug, info};
32
33use crate::catalog::Catalog;
34use crate::connection::Connection;
35use crate::error::{Error, Result};
36use crate::table_definition::TableDefinition;
37
/// Starting capacity of the chunk buffer: 4 MiB.
///
/// Every COPY chunk is encoded into one contiguous buffer. Pre-sizing it to
/// 4 MiB lets typical workloads fill their first chunk without a series of
/// grow-and-copy reallocations, while keeping the up-front allocation modest.
const INITIAL_BUFFER_SIZE: usize = 4 << 20;

/// Byte threshold at which a chunk is flushed to the server: 16 MiB.
///
/// Two forces pull against each other here:
/// - **Throughput** — every chunk carries fixed overhead (COPY framing, a
///   network round-trip). Below roughly 1 MB that overhead dominates, so
///   larger chunks are cheaper per byte.
/// - **Memory** — a chunk is fully materialized before it is sent, so this
///   limit also caps resident memory when rows are wide.
///
/// 16 MiB was chosen empirically: it sits on the flat part of the throughput
/// curve, where going larger buys little.
const CHUNK_SIZE_LIMIT: usize = 16 << 20;

/// Row threshold at which a chunk is flushed to the server: 64,000 rows.
///
/// This complements [`CHUNK_SIZE_LIMIT`]. With very narrow rows, the byte
/// limit alone would let millions of rows pile up before anything reaches
/// the server, which delays server-side processing. Capping the row count
/// keeps flushes timely regardless of row width. The value matches the 64K
/// chunk size used when streaming query results
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
const CHUNK_ROW_LIMIT: usize = 64 * 1000;
67
68/// A high-performance bulk data inserter.
69///
70/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
71/// using the COPY protocol with `HyperBinary` format for optimal performance.
72///
73/// # Example
74///
75/// ```no_run
76/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
77///
78/// fn main() -> Result<()> {
79///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
80///
81///     let table_def = TableDefinition::new("users")
82///         .add_required_column("id", SqlType::int())
83///         .add_nullable_column("name", SqlType::text());
84///
85///     Catalog::new(&conn).create_table(&table_def)?;
86///
87///     let mut inserter = Inserter::new(&conn, &table_def)?;
88///
89///     for i in 0..1000i32 {
90///         inserter.add_row(&[&i, &format!("User {}", i)])?;
91///     }
92///
93///     let rows = inserter.execute()?;
94///     println!("Inserted {} rows", rows);
95///     Ok(())
96/// }
97/// ```
98#[derive(Debug)]
99pub struct Inserter<'conn> {
100    connection: &'conn Connection,
101    table_def: TableDefinition,
102    /// The current chunk being populated (delegates encoding).
103    chunk: InsertChunk,
104    /// Total rows inserted across all chunks.
105    row_count: u64,
106    /// Number of chunks sent.
107    chunk_count: usize,
108    /// Active COPY writer (lazily initialized on first write).
109    writer: Option<CopyInWriter<'conn>>,
110    /// Start time for timing the insert operation.
111    start_time: Instant,
112}
113
114impl<'conn> Inserter<'conn> {
115    /// Creates a new inserter for the given table.
116    ///
117    /// The underlying COPY session is started lazily on the first flush or
118    /// execute, so construction is lightweight. However, the connection's
119    /// transport is validated eagerly — using a gRPC connection will return
120    /// an error immediately.
121    ///
122    /// # Errors
123    ///
124    /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
125    ///   columns.
126    /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
127    ///   (COPY is TCP-only).
128    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
129        if table_def.column_count() == 0 {
130            return Err(Error::invalid_table_definition(
131                "Table definition must have at least one column",
132            ));
133        }
134
135        // Fail fast: verify the connection supports COPY (TCP only)
136        if connection.tcp_client().is_none() {
137            return Err(Error::feature_not_supported(
138                "Inserter requires a TCP connection. \
139                 gRPC connections do not support COPY operations.",
140            ));
141        }
142
143        Ok(Inserter {
144            connection,
145            table_def: table_def.clone(),
146            chunk: InsertChunk::from_table_definition(table_def),
147            row_count: 0,
148            chunk_count: 0,
149            writer: None,
150            start_time: Instant::now(),
151        })
152    }
153
154    /// Creates an inserter by querying the table schema from the database.
155    ///
156    /// This method queries the database to get the table definition automatically,
157    /// which is useful when you want to insert into an existing table without
158    /// manually specifying the schema.
159    ///
160    /// # Arguments
161    ///
162    /// * `connection` - The database connection.
163    /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
168    ///
169    /// # Example
170    ///
171    /// ```no_run
172    /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
173    ///
174    /// fn main() -> Result<()> {
175    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
176    ///
177    ///     // Create a table first
178    ///     conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
179    ///
180    ///     // Create inserter by querying the schema directly from a string
181    ///     let mut inserter = Inserter::from_table(&conn, "public.products")?;
182    ///
183    ///     // Now we can insert data without knowing the exact schema
184    ///     inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
185    ///     inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
186    ///
187    ///     let rows = inserter.execute()?;
188    ///     println!("Inserted {} rows", rows);
189    ///     Ok(())
190    /// }
191    /// ```
192    pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
193    where
194        T: TryInto<crate::TableName>,
195        crate::Error: From<T::Error>,
196    {
197        let catalog = Catalog::new(connection);
198        let table_def = catalog.get_table_definition(table_name)?;
199        Self::new(connection, &table_def)
200    }
201
202    /// Creates an inserter with column mappings that allow SQL expressions.
203    ///
204    /// This method uses a temporary table and INSERT...SELECT to support
205    /// column mappings with SQL expressions. Data is first inserted into
206    /// a temporary staging table, then transformed using the mappings.
207    ///
208    /// # Arguments
209    ///
210    /// * `connection` - The database connection.
211    /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
212    /// * `target_table` - The qualified name of the target table to insert into.
213    ///   Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
214    /// * `mappings` - Column mappings defining how values are transformed.
215    ///
216    /// # Example
217    ///
218    /// ```no_run
219    /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
220    ///
221    /// fn main() -> Result<()> {
222    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
223    ///
224    ///     // Target table with computed columns
225    ///     conn.execute_command(r#"
226    ///         CREATE TABLE orders (
227    ///             id INT NOT NULL,
228    ///             product TEXT,
229    ///             quantity INT,
230    ///             price DOUBLE PRECISION,
231    ///             total DOUBLE PRECISION,
232    ///             created_at TIMESTAMP
233    ///         )
234    ///     "#)?;
235    ///
236    ///     // Inserter definition - what we provide
237    ///     let inserter_def = TableDefinition::new("_stage")
238    ///         .add_required_column("id", SqlType::int())
239    ///         .add_nullable_column("product", SqlType::text())
240    ///         .add_nullable_column("quantity", SqlType::int())
241    ///         .add_nullable_column("price", SqlType::double());
242    ///
243    ///     // Column mappings - how values are transformed
244    ///     let mappings = vec![
245    ///         ColumnMapping::new("id"),
246    ///         ColumnMapping::new("product"),
247    ///         ColumnMapping::new("quantity"),
248    ///         ColumnMapping::new("price"),
249    ///         ColumnMapping::with_expression("total", "quantity * price"),
250    ///         ColumnMapping::with_expression("created_at", "NOW()"),
251    ///     ];
252    ///
253    ///     // For simple table names in the public schema, use quoted name
254    ///     // For qualified names, use target_table_def.qualified_name()
255    ///     let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
256    ///
257    ///     inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
258    ///     inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
259    ///
260    ///     let rows = inserter.execute()?;
261    ///     Ok(())
262    /// }
263    /// ```
264    ///
265    /// # Errors
266    ///
267    /// - Returns an error if `target_table` fails to convert into a
268    ///   [`TableName`](crate::TableName).
269    /// - Returns [`Error::Server`] if creating the temporary staging table
270    ///   fails on the server.
271    /// - Returns the errors from [`Inserter::new`] for the staging table
272    ///   (zero-column table definition, gRPC transport).
273    pub fn with_column_mappings<T>(
274        connection: &'conn Connection,
275        inserter_def: &TableDefinition,
276        target_table: T,
277        mappings: &[ColumnMapping],
278    ) -> Result<MappedInserter<'conn>>
279    where
280        T: TryInto<crate::TableName>,
281        crate::Error: From<T::Error>,
282    {
283        MappedInserter::new(connection, inserter_def, target_table, mappings)
284    }
285
286    /// Returns the table definition.
287    pub fn table_definition(&self) -> &TableDefinition {
288        &self.table_def
289    }
290
291    /// Returns the number of columns.
292    #[must_use]
293    pub fn column_count(&self) -> usize {
294        self.table_def.column_count()
295    }
296
297    /// Returns the number of complete rows buffered.
298    #[must_use]
299    pub fn row_count(&self) -> u64 {
300        self.row_count
301    }
302
303    /// Adds a NULL value for the current column.
304    ///
305    /// # Errors
306    ///
307    /// Returns [`Error::InvalidTableDefinition`] if the current row already has all columns
308    /// supplied, or if the current column is marked `NOT NULL` in the table
309    /// definition.
310    #[inline]
311    pub fn add_null(&mut self) -> Result<()> {
312        self.chunk.add_null()
313    }
314
315    /// Adds a boolean value.
316    ///
317    /// # Errors
318    ///
319    /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
320    /// the current row already has all columns supplied.
321    #[inline]
322    pub fn add_bool(&mut self, value: bool) -> Result<()> {
323        self.chunk.add_bool(value)
324    }
325
326    /// Adds an i16 value (SMALLINT).
327    ///
328    /// # Errors
329    ///
330    /// See [`add_bool`](Self::add_bool).
331    #[inline]
332    pub fn add_i16(&mut self, value: i16) -> Result<()> {
333        self.chunk.add_i16(value)
334    }
335
336    /// Adds an i32 value (INT).
337    ///
338    /// # Errors
339    ///
340    /// See [`add_bool`](Self::add_bool).
341    #[inline]
342    pub fn add_i32(&mut self, value: i32) -> Result<()> {
343        self.chunk.add_i32(value)
344    }
345
346    /// Adds an i64 value (BIGINT).
347    ///
348    /// # Errors
349    ///
350    /// See [`add_bool`](Self::add_bool).
351    #[inline]
352    pub fn add_i64(&mut self, value: i64) -> Result<()> {
353        self.chunk.add_i64(value)
354    }
355
356    /// Adds an f32 value (REAL/FLOAT4).
357    ///
358    /// # Errors
359    ///
360    /// See [`add_bool`](Self::add_bool).
361    #[inline]
362    pub fn add_f32(&mut self, value: f32) -> Result<()> {
363        self.chunk.add_f32(value)
364    }
365
366    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
367    ///
368    /// # Errors
369    ///
370    /// See [`add_bool`](Self::add_bool).
371    #[inline]
372    pub fn add_f64(&mut self, value: f64) -> Result<()> {
373        self.chunk.add_f64(value)
374    }
375
376    /// Adds a string value (TEXT/VARCHAR).
377    ///
378    /// # Errors
379    ///
380    /// See [`add_bool`](Self::add_bool).
381    #[inline]
382    pub fn add_str(&mut self, value: &str) -> Result<()> {
383        self.chunk.add_str(value)
384    }
385
386    /// Adds a bytes value (BYTEA).
387    ///
388    /// # Errors
389    ///
390    /// See [`add_bool`](Self::add_bool).
391    #[inline]
392    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
393        self.chunk.add_bytes(value)
394    }
395
396    /// Adds a 128-bit value (NUMERIC/INTERVAL).
397    ///
398    /// # Errors
399    ///
400    /// See [`add_bool`](Self::add_bool).
401    #[inline]
402    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
403        self.chunk.add_data128(value)
404    }
405
406    /// Adds an optional value. If None, adds NULL.
407    ///
408    /// # Errors
409    ///
410    /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
411    /// return for the current row position.
412    pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
413    where
414        F: FnOnce(&mut Self, T) -> Result<()>,
415    {
416        match value {
417            Some(v) => add_fn(self, v),
418            None => self.add_null(),
419        }
420    }
421
422    /// Ends the current row.
423    ///
424    /// Returns an error if the wrong number of columns were added.
425    /// Automatically flushes the buffer if chunk limits are reached.
426    ///
427    /// # Errors
428    ///
429    /// - Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
430    ///   than the table definition requires.
431    /// - Returns any error from [`flush`](Self::flush) when an automatic
432    ///   flush is triggered by reaching the chunk byte/row limit.
433    pub fn end_row(&mut self) -> Result<()> {
434        self.chunk.end_row()?;
435        self.row_count += 1;
436
437        // Auto-flush if we've reached chunk limits
438        if self.chunk.should_flush() {
439            self.flush()?;
440        }
441
442        Ok(())
443    }
444
445    /// Flushes the current buffer to the server.
446    ///
447    /// This sends all buffered rows as a chunk and resets the buffer.
448    /// Called automatically when chunk limits are reached.
449    ///
450    /// # Errors
451    ///
452    /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
453    ///   (COPY is TCP-only) and no COPY session exists yet.
454    /// - Returns [`Error::Server`] if the server rejects the `COPY IN` start
455    ///   or the subsequent data send.
456    /// - Returns [`Error::Io`] on transport-level I/O failures while writing
457    ///   the chunk.
458    pub fn flush(&mut self) -> Result<()> {
459        if self.chunk.is_empty() {
460            return Ok(());
461        }
462
463        let chunk_rows = self.chunk.row_count();
464        let Some(buffer) = self.chunk.take() else {
465            return Ok(());
466        };
467
468        // Ensure the COPY connection is started
469        if self.writer.is_none() {
470            let client = self.connection.tcp_client().ok_or_else(|| {
471                crate::Error::feature_not_supported(
472                    "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
473                )
474            })?;
475            let columns: Vec<&str> = self
476                .table_def
477                .columns
478                .iter()
479                .map(|c| c.name.as_str())
480                .collect();
481            let table_name = self.table_def.qualified_name();
482            self.writer = Some(client.copy_in(&table_name, &columns)?);
483        }
484
485        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
486        // into the connection's write buffer. flush_stream ensures the data
487        // reaches the server before we return.
488        if let Some(ref mut writer) = self.writer {
489            writer.send_direct(&buffer)?;
490            writer.flush_stream()?;
491        }
492
493        debug!(
494            target: "hyperdb_api",
495            chunk = self.chunk_count,
496            rows = chunk_rows,
497            bytes = buffer.len(),
498            "inserter-chunk"
499        );
500
501        self.chunk_count += 1;
502        Ok(())
503    }
504
505    /// Adds a complete row of values.
506    ///
507    /// This is a convenience method that adds all column values at once
508    /// using the `IntoValue` trait for type-safe insertion.
509    ///
510    /// # Arguments
511    ///
512    /// * `values` - A slice of values implementing `IntoValue`.
513    ///
514    /// # Errors
515    ///
516    /// Returns an error if the number of values doesn't match the column count,
517    /// or if any value cannot be added.
518    ///
519    /// # Example
520    ///
521    /// ```no_run
522    /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
523    ///
524    /// fn main() -> Result<()> {
525    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
526    ///
527    ///     let table_def = TableDefinition::new("users")
528    ///         .add_required_column("id", SqlType::int())
529    ///         .add_nullable_column("name", SqlType::text());
530    ///
531    ///     Catalog::new(&conn).create_table(&table_def)?;
532    ///
533    ///     let mut inserter = Inserter::new(&conn, &table_def)?;
534    ///
535    ///     // Add rows using IntoValue trait
536    ///     inserter.add_row(&[&1i32, &"Alice"])?;
537    ///     inserter.add_row(&[&2i32, &"Bob"])?;
538    ///
539    ///     // Option<T> can be used for nullable columns
540    ///     inserter.add_row(&[&3i32, &None::<&str>])?;
541    ///
542    ///     let rows = inserter.execute()?;
543    ///     Ok(())
544    /// }
545    /// ```
546    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
547        let column_count = self.table_def.column_count();
548        if values.len() != column_count {
549            return Err(Error::invalid_table_definition(format!(
550                "Column count mismatch: expected {} columns but got {}",
551                column_count,
552                values.len()
553            )));
554        }
555
556        for value in values {
557            value.add_to_inserter(self)?;
558        }
559
560        self.end_row()?;
561        Ok(())
562    }
563
564    /// Adds a Date value.
565    ///
566    /// # Errors
567    ///
568    /// See [`add_bool`](Self::add_bool).
569    #[inline]
570    pub fn add_date(&mut self, value: Date) -> Result<()> {
571        self.chunk.add_date(value)
572    }
573
574    /// Adds a Time value.
575    ///
576    /// # Errors
577    ///
578    /// See [`add_bool`](Self::add_bool).
579    #[inline]
580    pub fn add_time(&mut self, value: Time) -> Result<()> {
581        self.chunk.add_time(value)
582    }
583
584    /// Adds a Timestamp value.
585    ///
586    /// # Errors
587    ///
588    /// See [`add_bool`](Self::add_bool).
589    #[inline]
590    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
591        self.chunk.add_timestamp(value)
592    }
593
594    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
595    ///
596    /// # Errors
597    ///
598    /// See [`add_bool`](Self::add_bool).
599    #[inline]
600    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
601        self.chunk.add_offset_timestamp(value)
602    }
603
604    /// Adds an Interval value.
605    ///
606    /// # Errors
607    ///
608    /// See [`add_bool`](Self::add_bool).
609    #[inline]
610    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
611        self.chunk.add_interval(value)
612    }
613
614    /// Adds a Geography value.
615    ///
616    /// # Errors
617    ///
618    /// See [`add_bool`](Self::add_bool).
619    #[inline]
620    pub fn add_geography(&mut self, value: &Geography) -> Result<()> {
621        self.chunk.add_geography(value)
622    }
623
624    /// Adds a Numeric value.
625    ///
626    /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
627    /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the column's precision cannot be determined from the
632    /// table definition. Ensure that NUMERIC columns are defined with explicit
633    /// `SqlType` information including precision.
634    pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
635        let column_index = self.chunk.column_index();
636
637        // Check the column's precision to determine storage format
638        let precision = self
639            .table_def
640            .columns
641            .get(column_index)
642            .and_then(super::table_definition::ColumnDefinition::sql_type)
643            .and_then(|t| t.precision())
644            .ok_or_else(|| {
645                let col_name = self
646                    .table_def
647                    .columns
648                    .get(column_index)
649                    .map_or("<unknown>", |c| c.name.as_str());
650                Error::conversion(format!(
651                    "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
652                     Ensure the column is defined with explicit SqlType including precision.\n\n\
653                     Example fix:\n  \
654                     table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
655                ))
656            })?;
657
658        if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
659            // Small numeric: stored as i64
660            let unscaled = value.unscaled_value();
661            let narrowed = i64::try_from(unscaled).map_err(|_| {
662                Error::conversion(format!(
663                    "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
664                ))
665            })?;
666            self.chunk.add_i64(narrowed)
667        } else {
668            // Big numeric: stored as 128-bit
669            self.chunk.add_data128(&value.to_packed())
670        }
671    }
672
673    /// Executes the insert and commits all buffered rows.
674    ///
675    /// This sends any remaining buffered data and finishes the COPY operation.
676    /// Returns the number of rows inserted.
677    ///
678    /// The inserter is single-use: calling `execute` a second time returns
679    /// `Ok(0)` because the internal row counter has been reset and no further
680    /// data has been added. To insert additional batches, create a new
681    /// [`Inserter`].
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if:
686    /// - There's an incomplete row (`column_index` != 0)
687    /// - The COPY connection fails to start
688    /// - Sending data fails
689    pub fn execute(&mut self) -> Result<u64> {
690        if self.chunk.column_index() != 0 {
691            return Err(Error::invalid_table_definition(
692                "Incomplete row at execute time",
693            ));
694        }
695
696        if self.row_count == 0 {
697            return Ok(0);
698        }
699
700        // Ensure COPY connection exists before proceeding when we have rows
701        if self.writer.is_none() {
702            let client = self.connection.tcp_client().ok_or_else(|| {
703                Error::feature_not_supported(
704                    "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
705                )
706            })?;
707            let columns: Vec<&str> = self
708                .table_def
709                .columns
710                .iter()
711                .map(|c| c.name.as_str())
712                .collect();
713            let table_name = self.table_def.qualified_name();
714            self.writer = Some(client.copy_in(&table_name, &columns)?);
715        }
716
717        // At this point, writer must exist since we have rows
718        let writer = self
719            .writer
720            .as_mut()
721            .ok_or_else(|| Error::internal("Failed to initialize COPY connection for inserter"))?;
722
723        // If we have buffered data that hasn't been sent yet
724        if !self.chunk.is_empty() {
725            writer.send(self.chunk.buffer())?;
726        }
727
728        // Write and send the COPY trailer
729        let mut trailer_buf = BytesMut::with_capacity(2);
730        copy::write_trailer(&mut trailer_buf);
731        writer.send(&trailer_buf)?;
732
733        // Finish the COPY operation
734        let rows = self
735            .writer
736            .take()
737            .map(hyperdb_api_core::client::CopyInWriter::finish)
738            .transpose()?
739            .unwrap_or(0);
740
741        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
742        info!(
743            target: "hyperdb_api",
744            rows,
745            chunks = self.chunk_count,
746            duration_ms,
747            table = %self.table_def.qualified_name(),
748            "inserter-end"
749        );
750
751        // Reset row counter so a stray second execute() call returns Ok(0)
752        // instead of attempting another COPY trailer on a finished writer.
753        self.row_count = 0;
754
755        Ok(rows)
756    }
757
758    /// Cancels the insert and discards all buffered rows.
759    pub fn cancel(&mut self) {
760        // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
761        // sends a CopyFail to the server.
762        self.writer = None;
763        self.row_count = 0;
764    }
765}
766
767// =============================================================================
768// ColumnMapping
769// =============================================================================
770
/// Describes where a target column gets its value from during insertion.
///
/// A mapping either forwards the value supplied to the inserter unchanged, or
/// has the server compute it from a SQL expression. This lets you:
/// - Insert values directly from the inserter stream
/// - Compute values using SQL expressions
/// - Use server-side functions like `NOW()` or `DEFAULT`
///
/// # Example
///
/// ```
/// use hyperdb_api::ColumnMapping;
///
/// // Simple column - insert value directly
/// let id_col = ColumnMapping::new("id");
///
/// // Column with expression - computed value
/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// Name of the column in the target table.
    pub column_name: String,
    /// SQL expression producing the value; `None` means the value is taken
    /// straight from the inserter.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Builds a pass-through mapping: the column is filled with the value
    /// supplied to the inserter.
    pub fn new(column_name: impl Into<String>) -> Self {
        Self {
            column_name: column_name.into(),
            expression: None,
        }
    }

    /// Builds a computed mapping whose value comes from a SQL expression.
    ///
    /// The expression may refer to other columns or call SQL functions. It is
    /// spliced into the generated SQL verbatim (it is not escaped).
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - A SQL expression to compute the column value.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// // Use current timestamp
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    ///
    /// // Compute from other columns
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let expression = Some(expression.into());
        Self {
            column_name,
            expression,
        }
    }

    /// Name of the target column.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// The SQL expression for this column, or `None` for a pass-through mapping.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// `true` when the column takes its value straight from the inserter.
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression().is_none()
    }

    /// Renders this mapping as one item of a SELECT list.
    ///
    /// The column name is double-quoted, with embedded `"` doubled. A
    /// pass-through mapping yields just the quoted name; a computed mapping
    /// yields `<expression> AS <quoted name>`.
    fn to_select_item(&self) -> String {
        let quoted_name = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        if let Some(expr) = self.expression.as_deref() {
            format!("{expr} AS {quoted_name}")
        } else {
            quoted_name
        }
    }
}
864
865// =============================================================================
866// IntoValue Trait
867// =============================================================================
868
869/// Trait for types that can be inserted into a Hyper table.
870///
871/// This trait is implemented for common Rust types, allowing them to be
872/// used with [`Inserter::add_row()`] for type-safe insertion.
873///
874/// # Supported Types
875///
876/// - Integers: `i16`, `i32`, `i64`
877/// - Floats: `f32`, `f64`
878/// - `bool`
879/// - `&str`, `String`
880/// - `Option<T>` where `T: IntoValue` (for nullable columns)
881/// - Date/time types: `Date`, `Time`, `Timestamp`, `Interval`
882/// - `Numeric`, `Geography`, `Vec<u8>` (bytes)
883///
884/// # Example
885///
886/// ```no_run
887/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
888///
889/// fn main() -> Result<()> {
890///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
891///
892///     let table_def = TableDefinition::new("example")
893///         .add_required_column("a", SqlType::int())
894///         .add_nullable_column("b", SqlType::text())
895///         .add_nullable_column("c", SqlType::double());
896///     Catalog::new(&conn).create_table(&table_def)?;
897///
898///     let mut inserter = Inserter::new(&conn, &table_def)?;
899///
900///     // IntoValue allows adding rows with mixed types
901///     inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
902///     inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
903///
904///     inserter.execute()?;
905///     Ok(())
906/// }
907/// ```
908pub trait IntoValue {
909    /// Adds this value to the inserter.
910    ///
911    /// # Errors
912    ///
913    /// Implementations call the matching `Inserter::add_*` method and
914    /// forward its error — see [`Inserter::add_bool`] for the shared
915    /// failure modes (too many columns, NULL into non-nullable, etc).
916    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
917}
918
919// Implementations for basic types
920
921impl IntoValue for bool {
922    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
923        inserter.add_bool(*self)
924    }
925}
926
927impl IntoValue for i16 {
928    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
929        inserter.add_i16(*self)
930    }
931}
932
933impl IntoValue for i32 {
934    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
935        inserter.add_i32(*self)
936    }
937}
938
939impl IntoValue for i64 {
940    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
941        inserter.add_i64(*self)
942    }
943}
944
945impl IntoValue for f32 {
946    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
947        inserter.add_f32(*self)
948    }
949}
950
951impl IntoValue for f64 {
952    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
953        inserter.add_f64(*self)
954    }
955}
956
957impl IntoValue for str {
958    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
959        inserter.add_str(self)
960    }
961}
962
963impl IntoValue for String {
964    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
965        inserter.add_str(self)
966    }
967}
968
969impl IntoValue for [u8] {
970    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
971        inserter.add_bytes(self)
972    }
973}
974
975impl IntoValue for Vec<u8> {
976    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
977        inserter.add_bytes(self)
978    }
979}
980
981// Hyper-specific types
982
983impl IntoValue for Date {
984    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
985        inserter.add_date(*self)
986    }
987}
988
989impl IntoValue for Time {
990    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
991        inserter.add_time(*self)
992    }
993}
994
995impl IntoValue for Timestamp {
996    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
997        inserter.add_timestamp(*self)
998    }
999}
1000
1001impl IntoValue for OffsetTimestamp {
1002    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1003        inserter.add_offset_timestamp(*self)
1004    }
1005}
1006
1007impl IntoValue for Interval {
1008    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1009        inserter.add_interval(*self)
1010    }
1011}
1012
1013impl IntoValue for Numeric {
1014    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1015        inserter.add_numeric(*self)
1016    }
1017}
1018
1019impl IntoValue for Geography {
1020    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021        inserter.add_geography(self)
1022    }
1023}
1024
1025// Option<T> for nullable values
1026impl<T: IntoValue> IntoValue for Option<T> {
1027    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1028        match self {
1029            Some(value) => value.add_to_inserter(inserter),
1030            None => inserter.add_null(),
1031        }
1032    }
1033}
1034
1035// Reference implementations for primitives
1036
1037impl IntoValue for &bool {
1038    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039        inserter.add_bool(**self)
1040    }
1041}
1042
1043impl IntoValue for &i16 {
1044    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045        inserter.add_i16(**self)
1046    }
1047}
1048
1049impl IntoValue for &i32 {
1050    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051        inserter.add_i32(**self)
1052    }
1053}
1054
1055impl IntoValue for &i64 {
1056    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057        inserter.add_i64(**self)
1058    }
1059}
1060
1061impl IntoValue for &f32 {
1062    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063        inserter.add_f32(**self)
1064    }
1065}
1066
1067impl IntoValue for &f64 {
1068    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069        inserter.add_f64(**self)
1070    }
1071}
1072
1073impl IntoValue for &String {
1074    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075        inserter.add_str(self)
1076    }
1077}
1078
1079impl IntoValue for &str {
1080    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081        inserter.add_str(self)
1082    }
1083}
1084
1085impl IntoValue for &&str {
1086    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087        inserter.add_str(self)
1088    }
1089}
1090
1091impl IntoValue for &[u8] {
1092    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093        inserter.add_bytes(self)
1094    }
1095}
1096
1097impl IntoValue for &Date {
1098    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099        inserter.add_date(**self)
1100    }
1101}
1102
1103impl IntoValue for &Time {
1104    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105        inserter.add_time(**self)
1106    }
1107}
1108
1109impl IntoValue for &Timestamp {
1110    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1111        inserter.add_timestamp(**self)
1112    }
1113}
1114
1115impl IntoValue for &OffsetTimestamp {
1116    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1117        inserter.add_offset_timestamp(**self)
1118    }
1119}
1120
1121impl IntoValue for &Interval {
1122    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1123        inserter.add_interval(**self)
1124    }
1125}
1126
1127impl IntoValue for &Numeric {
1128    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1129        inserter.add_numeric(**self)
1130    }
1131}
1132
1133impl IntoValue for &Geography {
1134    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1135        inserter.add_geography(self)
1136    }
1137}
1138
1139// =============================================================================
1140// MappedInserter
1141// =============================================================================
1142
1143/// An inserter that supports SQL expression mappings.
1144///
1145/// This inserter uses a staging table to support computed columns via
1146/// INSERT...SELECT with SQL expressions. It's created by
1147/// [`Inserter::with_column_mappings`].
1148#[derive(Debug)]
1149pub struct MappedInserter<'conn> {
1150    /// The underlying inserter for the staging table.
1151    inner: Inserter<'conn>,
1152    /// The target table name.
1153    target_table: crate::TableName,
1154    /// The column mappings.
1155    mappings: Vec<ColumnMapping>,
1156    /// The staging table name.
1157    staging_table: String,
1158}
1159
1160impl<'conn> MappedInserter<'conn> {
1161    /// Creates a new mapped inserter.
1162    fn new<T>(
1163        connection: &'conn Connection,
1164        inserter_def: &TableDefinition,
1165        target_table: T,
1166        mappings: &[ColumnMapping],
1167    ) -> Result<Self>
1168    where
1169        T: TryInto<crate::TableName>,
1170        crate::Error: From<T::Error>,
1171    {
1172        let target_table = target_table.try_into()?;
1173
1174        // Create a unique staging table name
1175        let staging_table = format!("_hyper_staging_{}", std::process::id());
1176
1177        // Create the staging table definition (temporary)
1178        let mut staging_def = inserter_def.clone();
1179        staging_def.name.clone_from(&staging_table);
1180
1181        // Create the staging table
1182        let create_sql = staging_def.to_create_sql(true)?;
1183        let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1184        connection.execute_command(&create_temp)?;
1185
1186        // Create the inner inserter for the staging table
1187        let inner = Inserter::new(connection, &staging_def)?;
1188
1189        Ok(MappedInserter {
1190            inner,
1191            target_table,
1192            mappings: mappings.to_vec(),
1193            staging_table,
1194        })
1195    }
1196
1197    /// Adds a row of values to the inserter.
1198    ///
1199    /// The values should correspond to the columns in the inserter definition,
1200    /// not the target table.
1201    ///
1202    /// # Errors
1203    ///
1204    /// Forwards the error from [`Inserter::add_row`].
1205    pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1206        self.inner.add_row(values)
1207    }
1208
1209    /// Adds a NULL value.
1210    ///
1211    /// # Errors
1212    ///
1213    /// Forwards the error from [`Inserter::add_null`].
1214    pub fn add_null(&mut self) -> Result<()> {
1215        self.inner.add_null()
1216    }
1217
1218    /// Adds a boolean value.
1219    ///
1220    /// # Errors
1221    ///
1222    /// Forwards the error from [`Inserter::add_bool`].
1223    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1224        self.inner.add_bool(value)
1225    }
1226
1227    /// Adds an i16 value.
1228    ///
1229    /// # Errors
1230    ///
1231    /// Forwards the error from [`Inserter::add_i16`].
1232    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1233        self.inner.add_i16(value)
1234    }
1235
1236    /// Adds an i32 value.
1237    ///
1238    /// # Errors
1239    ///
1240    /// Forwards the error from [`Inserter::add_i32`].
1241    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1242        self.inner.add_i32(value)
1243    }
1244
1245    /// Adds an i64 value.
1246    ///
1247    /// # Errors
1248    ///
1249    /// Forwards the error from [`Inserter::add_i64`].
1250    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1251        self.inner.add_i64(value)
1252    }
1253
1254    /// Adds an f32 value.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Forwards the error from [`Inserter::add_f32`].
1259    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1260        self.inner.add_f32(value)
1261    }
1262
1263    /// Adds an f64 value.
1264    ///
1265    /// # Errors
1266    ///
1267    /// Forwards the error from [`Inserter::add_f64`].
1268    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1269        self.inner.add_f64(value)
1270    }
1271
1272    /// Adds a string value.
1273    ///
1274    /// # Errors
1275    ///
1276    /// Forwards the error from [`Inserter::add_str`].
1277    pub fn add_str(&mut self, value: &str) -> Result<()> {
1278        self.inner.add_str(value)
1279    }
1280
1281    /// Adds a bytes value.
1282    ///
1283    /// # Errors
1284    ///
1285    /// Forwards the error from [`Inserter::add_bytes`].
1286    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1287        self.inner.add_bytes(value)
1288    }
1289
1290    /// Ends the current row.
1291    ///
1292    /// # Errors
1293    ///
1294    /// Forwards the error from [`Inserter::end_row`].
1295    pub fn end_row(&mut self) -> Result<()> {
1296        self.inner.end_row()
1297    }
1298
1299    /// Executes the insert with column mappings.
1300    ///
1301    /// This method:
1302    /// 1. Inserts all buffered rows into the staging table
1303    /// 2. Executes INSERT...SELECT from staging to target with mappings
1304    /// 3. Drops the staging table
1305    ///
1306    /// Returns the number of rows inserted into the target table.
1307    ///
1308    /// # Errors
1309    ///
1310    /// - Returns the error from the inner [`Inserter::execute`] if writing
1311    ///   the staging rows fails.
1312    /// - Returns [`Error::Server`] if the `INSERT ... SELECT` from staging
1313    ///   to the target table is rejected (e.g. a mapping expression fails
1314    ///   to evaluate).
1315    /// - Returns [`Error::Server`] if dropping the staging table fails.
1316    pub fn execute(&mut self) -> Result<u64> {
1317        let connection = self.inner.connection;
1318        let staging_table = self.staging_table.clone();
1319
1320        // Insert data into staging table
1321        let _staging_rows = self.inner.execute()?;
1322
1323        // Build the INSERT...SELECT statement
1324        use hyperdb_api_core::protocol::escape::SqlIdentifier;
1325
1326        let target_columns: Vec<String> = self
1327            .mappings
1328            .iter()
1329            .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1330            .collect();
1331
1332        let select_items: Vec<String> = self
1333            .mappings
1334            .iter()
1335            .map(ColumnMapping::to_select_item)
1336            .collect();
1337
1338        let sql = format!(
1339            "INSERT INTO {} ({}) SELECT {} FROM {}",
1340            self.target_table,
1341            target_columns.join(", "),
1342            select_items.join(", "),
1343            SqlIdentifier(&staging_table),
1344        );
1345
1346        // Execute the INSERT...SELECT (returns row count directly)
1347        let row_count = connection.execute_command(&sql)?;
1348
1349        // Drop the staging table
1350        connection.execute_command(&format!(
1351            "DROP TABLE IF EXISTS {}",
1352            SqlIdentifier(&staging_table)
1353        ))?;
1354
1355        // Return the number of rows inserted
1356        Ok(row_count)
1357    }
1358
1359    /// Cancels the insert and drops the staging table.
1360    ///
1361    /// This method handles cleanup failures gracefully by logging warnings
1362    /// instead of returning errors. This prevents masking the original error
1363    /// that caused the cancellation.
1364    ///
1365    /// # Logging
1366    ///
1367    /// Cleanup failures are logged using the `tracing` crate at WARN level.
1368    /// If `tracing` is not initialized, errors are written to stderr.
1369    pub fn cancel(&mut self) {
1370        let connection = self.inner.connection;
1371        let staging_table = &self.staging_table;
1372
1373        // Drop the staging table, but don't fail if cleanup fails
1374        // This avoids masking the original error that caused cancellation
1375        if let Err(e) = connection.execute_command(&format!(
1376            "DROP TABLE IF EXISTS \"{}\"",
1377            staging_table.replace('"', "\"\"")
1378        )) {
1379            // Log the cleanup failure for debugging
1380            // In production, consider using a logging framework like `tracing`
1381            eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1382        }
1383    }
1384}
1385
1386// =============================================================================
1387// InsertChunk - Thread-safe chunk for parallel encoding
1388// =============================================================================
1389
1390/// A thread-safe chunk for encoding rows in parallel.
1391///
1392/// `InsertChunk` can be created and populated in any thread, then sent to a
1393/// [`ChunkSender`] for transmission. This enables parallel data encoding across
1394/// multiple worker threads while serializing the actual network sends.
1395///
1396/// # Example
1397///
1398/// ```no_run
1399/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
1400///
1401/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
1402///     let mut chunk = InsertChunk::from_table_definition(table_def);
1403///     
1404///     for i in 0..1000 {
1405///         chunk.add_i32(start_id + i)?;
1406///         chunk.add_str(&format!("Item {}", start_id + i))?;
1407///         chunk.end_row()?;
1408///     }
1409///     
1410///     Ok(chunk)
1411/// }
1412/// ```
1413#[derive(Debug)]
1414pub struct InsertChunk {
1415    buffer: BytesMut,
1416    header_written: bool,
1417    column_index: usize,
1418    column_count: usize,
1419    row_count: usize,
1420    column_nullable: Vec<bool>,
1421}
1422
1423// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
1424// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
1425// thread-local state. The manual `unsafe impl` exists only because the
1426// auto-trait derivation is conservative for this struct's compilation context;
1427// the compound type has no `!Send` components.
1428unsafe impl Send for InsertChunk {}
1429// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
1430// and there is no interior mutability crossing a `&InsertChunk` boundary,
1431// so sharing `&InsertChunk` across threads is sound.
1432unsafe impl Sync for InsertChunk {}
1433
1434impl InsertChunk {
1435    /// Creates a new empty chunk with the given schema.
1436    ///
1437    /// # Arguments
1438    ///
1439    /// * `column_count` - Number of columns per row
1440    /// * `column_nullable` - Whether each column is nullable
1441    #[must_use]
1442    pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1443        debug_assert_eq!(column_count, column_nullable.len());
1444        InsertChunk {
1445            buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1446            header_written: false,
1447            column_index: 0,
1448            column_count,
1449            row_count: 0,
1450            column_nullable,
1451        }
1452    }
1453
1454    /// Creates a chunk from a table definition.
1455    #[must_use]
1456    pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1457        let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1458        Self::new(table_def.column_count(), column_nullable)
1459    }
1460
1461    /// Returns the number of complete rows in this chunk.
1462    #[must_use]
1463    pub fn row_count(&self) -> usize {
1464        self.row_count
1465    }
1466
1467    /// Returns the current buffer size in bytes.
1468    #[must_use]
1469    pub fn buffer_size(&self) -> usize {
1470        self.buffer.len()
1471    }
1472
1473    /// Returns true if the chunk has reached size or row limits and should be sent.
1474    #[must_use]
1475    pub fn should_flush(&self) -> bool {
1476        self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1477    }
1478
1479    /// Returns true if the chunk is empty (no rows).
1480    #[must_use]
1481    pub fn is_empty(&self) -> bool {
1482        self.row_count == 0
1483    }
1484
1485    /// Takes the buffer, consuming the chunk data.
1486    ///
1487    /// Returns `None` if the chunk is empty. After calling this, the chunk
1488    /// can be reused by calling the add_* methods again.
1489    ///
1490    /// Note: The header flag is NOT reset - subsequent chunks from the same
1491    /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1492    /// header per COPY stream).
1493    pub fn take(&mut self) -> Option<BytesMut> {
1494        if self.row_count == 0 {
1495            return None;
1496        }
1497        // Don't reset header_written - only first chunk should have header
1498        self.row_count = 0;
1499        Some(std::mem::take(&mut self.buffer))
1500    }
1501
1502    /// Resets the chunk for reuse without reallocating.
1503    pub fn clear(&mut self) {
1504        self.buffer.clear();
1505        self.header_written = false;
1506        self.column_index = 0;
1507        self.row_count = 0;
1508    }
1509
1510    #[allow(
1511        clippy::inline_always,
1512        reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1513    )]
1514    fn ensure_header(&mut self) {
1515        if !self.header_written {
1516            copy::write_header(&mut self.buffer);
1517            self.header_written = true;
1518        }
1519    }
1520
1521    #[expect(
1522        clippy::inline_always,
1523        reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1524    )]
1525    #[inline(always)]
1526    fn current_column_nullable(&self) -> bool {
1527        *self.column_nullable.get(self.column_index).unwrap_or(&true)
1528    }
1529
1530    /// Adds a NULL value for the current column.
1531    ///
1532    /// # Errors
1533    ///
1534    /// - Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"`
1535    ///   if the current row already has all columns supplied.
1536    /// - Returns [`Error::InvalidTableDefinition`] with message
1537    ///   `"Cannot add NULL to non-nullable column"` if the current column
1538    ///   is `NOT NULL` in the schema.
1539    pub fn add_null(&mut self) -> Result<()> {
1540        if self.column_index >= self.column_count {
1541            return Err(Error::invalid_table_definition("Too many columns in row"));
1542        }
1543        if !self.current_column_nullable() {
1544            return Err(Error::invalid_table_definition(
1545                "Cannot add NULL to non-nullable column",
1546            ));
1547        }
1548        self.ensure_header();
1549        copy::write_null(&mut self.buffer);
1550        self.column_index += 1;
1551        Ok(())
1552    }
1553
1554    /// Adds a boolean value.
1555    ///
1556    /// # Errors
1557    ///
1558    /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
1559    /// the current row already has all columns supplied.
1560    pub fn add_bool(&mut self, value: bool) -> Result<()> {
1561        if self.column_index >= self.column_count {
1562            return Err(Error::invalid_table_definition("Too many columns in row"));
1563        }
1564        self.ensure_header();
1565        let int_value = i8::from(value);
1566        if self.current_column_nullable() {
1567            copy::write_i8(&mut self.buffer, int_value);
1568        } else {
1569            copy::write_i8_not_null(&mut self.buffer, int_value);
1570        }
1571        self.column_index += 1;
1572        Ok(())
1573    }
1574
1575    /// Adds an i16 value (SMALLINT).
1576    ///
1577    /// # Errors
1578    ///
1579    /// See [`add_bool`](Self::add_bool).
1580    pub fn add_i16(&mut self, value: i16) -> Result<()> {
1581        if self.column_index >= self.column_count {
1582            return Err(Error::invalid_table_definition("Too many columns in row"));
1583        }
1584        self.ensure_header();
1585        if self.current_column_nullable() {
1586            copy::write_i16(&mut self.buffer, value);
1587        } else {
1588            copy::write_i16_not_null(&mut self.buffer, value);
1589        }
1590        self.column_index += 1;
1591        Ok(())
1592    }
1593
1594    /// Adds an i32 value (INT).
1595    ///
1596    /// # Errors
1597    ///
1598    /// See [`add_bool`](Self::add_bool).
1599    pub fn add_i32(&mut self, value: i32) -> Result<()> {
1600        if self.column_index >= self.column_count {
1601            return Err(Error::invalid_table_definition("Too many columns in row"));
1602        }
1603        self.ensure_header();
1604        if self.current_column_nullable() {
1605            copy::write_i32(&mut self.buffer, value);
1606        } else {
1607            copy::write_i32_not_null(&mut self.buffer, value);
1608        }
1609        self.column_index += 1;
1610        Ok(())
1611    }
1612
1613    /// Adds an i64 value (BIGINT).
1614    ///
1615    /// # Errors
1616    ///
1617    /// See [`add_bool`](Self::add_bool).
1618    pub fn add_i64(&mut self, value: i64) -> Result<()> {
1619        if self.column_index >= self.column_count {
1620            return Err(Error::invalid_table_definition("Too many columns in row"));
1621        }
1622        self.ensure_header();
1623        if self.current_column_nullable() {
1624            copy::write_i64(&mut self.buffer, value);
1625        } else {
1626            copy::write_i64_not_null(&mut self.buffer, value);
1627        }
1628        self.column_index += 1;
1629        Ok(())
1630    }
1631
1632    /// Adds an f32 value (REAL/FLOAT4).
1633    ///
1634    /// # Errors
1635    ///
1636    /// See [`add_bool`](Self::add_bool).
1637    pub fn add_f32(&mut self, value: f32) -> Result<()> {
1638        if self.column_index >= self.column_count {
1639            return Err(Error::invalid_table_definition("Too many columns in row"));
1640        }
1641        self.ensure_header();
1642        if self.current_column_nullable() {
1643            copy::write_f32(&mut self.buffer, value);
1644        } else {
1645            copy::write_f32_not_null(&mut self.buffer, value);
1646        }
1647        self.column_index += 1;
1648        Ok(())
1649    }
1650
1651    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1652    ///
1653    /// # Errors
1654    ///
1655    /// See [`add_bool`](Self::add_bool).
1656    pub fn add_f64(&mut self, value: f64) -> Result<()> {
1657        if self.column_index >= self.column_count {
1658            return Err(Error::invalid_table_definition("Too many columns in row"));
1659        }
1660        self.ensure_header();
1661        if self.current_column_nullable() {
1662            copy::write_f64(&mut self.buffer, value);
1663        } else {
1664            copy::write_f64_not_null(&mut self.buffer, value);
1665        }
1666        self.column_index += 1;
1667        Ok(())
1668    }
1669
1670    /// Adds a string value (TEXT/VARCHAR).
1671    ///
1672    /// # Errors
1673    ///
1674    /// See [`add_bool`](Self::add_bool).
1675    pub fn add_str(&mut self, value: &str) -> Result<()> {
1676        self.add_bytes(value.as_bytes())
1677    }
1678
1679    /// Adds a bytes value (BYTEA).
1680    ///
1681    /// # Errors
1682    ///
1683    /// See [`add_bool`](Self::add_bool).
1684    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1685        if self.column_index >= self.column_count {
1686            return Err(Error::invalid_table_definition("Too many columns in row"));
1687        }
1688        if value.len() > u32::MAX as usize {
1689            return Err(Error::conversion(format!(
1690                "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1691                value.len(),
1692                u32::MAX
1693            )));
1694        }
1695        self.ensure_header();
1696        if self.current_column_nullable() {
1697            copy::write_varbinary(&mut self.buffer, value);
1698        } else {
1699            copy::write_varbinary_not_null(&mut self.buffer, value);
1700        }
1701        self.column_index += 1;
1702        Ok(())
1703    }
1704
1705    /// Adds a 128-bit value (NUMERIC/INTERVAL).
1706    ///
1707    /// # Errors
1708    ///
1709    /// See [`add_bool`](Self::add_bool).
1710    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1711        if self.column_index >= self.column_count {
1712            return Err(Error::invalid_table_definition("Too many columns in row"));
1713        }
1714        self.ensure_header();
1715        if self.current_column_nullable() {
1716            copy::write_data128(&mut self.buffer, value);
1717        } else {
1718            copy::write_data128_not_null(&mut self.buffer, value);
1719        }
1720        self.column_index += 1;
1721        Ok(())
1722    }
1723
1724    /// Adds a Date value.
1725    ///
1726    /// # Errors
1727    ///
1728    /// See [`add_bool`](Self::add_bool).
1729    pub fn add_date(&mut self, value: Date) -> Result<()> {
1730        if self.column_index >= self.column_count {
1731            return Err(Error::invalid_table_definition("Too many columns in row"));
1732        }
1733        self.ensure_header();
1734        let julian_day = value.to_julian_day();
1735        if self.current_column_nullable() {
1736            copy::write_i32(&mut self.buffer, julian_day);
1737        } else {
1738            copy::write_i32_not_null(&mut self.buffer, julian_day);
1739        }
1740        self.column_index += 1;
1741        Ok(())
1742    }
1743
1744    /// Adds a Time value.
1745    ///
1746    /// # Errors
1747    ///
1748    /// See [`add_bool`](Self::add_bool).
1749    pub fn add_time(&mut self, value: Time) -> Result<()> {
1750        if self.column_index >= self.column_count {
1751            return Err(Error::invalid_table_definition("Too many columns in row"));
1752        }
1753        self.ensure_header();
1754        let micros = value.to_microseconds();
1755        if self.current_column_nullable() {
1756            copy::write_i64(&mut self.buffer, micros);
1757        } else {
1758            copy::write_i64_not_null(&mut self.buffer, micros);
1759        }
1760        self.column_index += 1;
1761        Ok(())
1762    }
1763
1764    /// Adds a Timestamp value.
1765    ///
1766    /// # Errors
1767    ///
1768    /// See [`add_bool`](Self::add_bool).
1769    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1770        if self.column_index >= self.column_count {
1771            return Err(Error::invalid_table_definition("Too many columns in row"));
1772        }
1773        self.ensure_header();
1774        let micros = value.to_microseconds();
1775        if self.current_column_nullable() {
1776            copy::write_i64(&mut self.buffer, micros);
1777        } else {
1778            copy::write_i64_not_null(&mut self.buffer, micros);
1779        }
1780        self.column_index += 1;
1781        Ok(())
1782    }
1783
1784    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1785    ///
1786    /// # Errors
1787    ///
1788    /// See [`add_bool`](Self::add_bool).
1789    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1790        if self.column_index >= self.column_count {
1791            return Err(Error::invalid_table_definition("Too many columns in row"));
1792        }
1793        self.ensure_header();
1794        let micros = value.to_microseconds_utc();
1795        if self.current_column_nullable() {
1796            copy::write_i64(&mut self.buffer, micros);
1797        } else {
1798            copy::write_i64_not_null(&mut self.buffer, micros);
1799        }
1800        self.column_index += 1;
1801        Ok(())
1802    }
1803
1804    /// Adds an Interval value.
1805    ///
1806    /// # Errors
1807    ///
1808    /// See [`add_bool`](Self::add_bool).
1809    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1810        if self.column_index >= self.column_count {
1811            return Err(Error::invalid_table_definition("Too many columns in row"));
1812        }
1813        self.ensure_header();
1814        let packed = value.to_packed();
1815        if self.current_column_nullable() {
1816            copy::write_data128(&mut self.buffer, &packed);
1817        } else {
1818            copy::write_data128_not_null(&mut self.buffer, &packed);
1819        }
1820        self.column_index += 1;
1821        Ok(())
1822    }
1823
1824    /// Adds a Geography value.
1825    ///
1826    /// # Errors
1827    ///
1828    /// See [`add_bool`](Self::add_bool).
1829    pub fn add_geography(&mut self, value: &Geography) -> Result<()> {
1830        // Geography uses the same varbinary path as add_bytes
1831        self.add_bytes(value.as_bytes())
1832    }
1833
1834    /// Ends the current row.
1835    ///
1836    /// Returns an error if the wrong number of columns were added.
1837    ///
1838    /// # Errors
1839    ///
1840    /// Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
1841    /// for this row than the chunk's column count.
1842    pub fn end_row(&mut self) -> Result<()> {
1843        if self.column_index != self.column_count {
1844            return Err(Error::invalid_table_definition(format!(
1845                "Expected {} columns, got {}",
1846                self.column_count, self.column_index
1847            )));
1848        }
1849        self.column_index = 0;
1850        self.row_count += 1;
1851        Ok(())
1852    }
1853
1854    /// Returns the current column index (for checking incomplete rows).
1855    #[must_use]
1856    pub fn column_index(&self) -> usize {
1857        self.column_index
1858    }
1859
1860    /// Returns a reference to the internal buffer.
1861    pub(crate) fn buffer(&self) -> &BytesMut {
1862        &self.buffer
1863    }
1864}
1865
1866// =============================================================================
1867// ChunkSender - Mutex-protected sender for InsertChunks
1868// =============================================================================
1869
1870use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1871use std::sync::Mutex;
1872
1873/// A thread-safe sender for [`InsertChunk`]s.
1874///
1875/// `ChunkSender` manages the COPY protocol connection and ensures that only one
1876/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
1877/// the mutex ensures serialized access.
1878///
1879/// # Example
1880///
1881/// ```no_run
1882/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
1883/// use std::sync::mpsc;
1884/// use std::thread;
1885///
1886/// fn main() -> Result<()> {
1887///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
1888///     
1889///     let table_def = TableDefinition::new("products")
1890///         .add_required_column("id", SqlType::int())
1891///         .add_nullable_column("name", SqlType::text());
1892///     
1893///     Catalog::new(&conn).create_table(&table_def)?;
1894///     
1895///     let sender = ChunkSender::new(&conn, &table_def)?;
1896///     let (tx, rx) = mpsc::channel::<InsertChunk>();
1897///     
1898///     // Worker thread
1899///     let table_def_clone = table_def.clone();
1900///     let handle = thread::spawn(move || {
1901///         let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
1902///         for i in 0..1000i32 {
1903///             chunk.add_i32(i).unwrap();
1904///             chunk.add_str(&format!("Product {}", i)).unwrap();
1905///             chunk.end_row().unwrap();
1906///         }
1907///         tx.send(chunk).unwrap();
1908///     });
1909///     
1910///     // Receive and send chunks
1911///     while let Ok(chunk) = rx.recv() {
1912///         sender.send_chunk(chunk)?;
1913///     }
1914///     
1915///     handle.join().unwrap();
1916///     let rows = sender.finish()?;
1917///     println!("Inserted {} rows", rows);
1918///     Ok(())
1919/// }
1920/// ```
1921#[derive(Debug)]
1922pub struct ChunkSender<'conn> {
1923    connection: &'conn Connection,
1924    table_name: String,
1925    columns: Vec<String>,
1926    writer: Mutex<Option<CopyInWriter<'conn>>>,
1927    header_sent: std::sync::atomic::AtomicBool,
1928    total_rows: AtomicU64,
1929    chunks_sent: AtomicUsize,
1930}
1931
1932impl<'conn> ChunkSender<'conn> {
1933    /// Creates a new chunk sender for the given table.
1934    ///
1935    /// # Errors
1936    ///
1937    /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1938    /// columns. The COPY session itself is opened lazily on the first
1939    /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1940    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1941        if table_def.column_count() == 0 {
1942            return Err(Error::invalid_table_definition(
1943                "Table definition must have at least one column",
1944            ));
1945        }
1946
1947        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1948        let table_name = table_def.qualified_name();
1949
1950        Ok(ChunkSender {
1951            connection,
1952            table_name,
1953            columns,
1954            writer: Mutex::new(None),
1955            header_sent: std::sync::atomic::AtomicBool::new(false),
1956            total_rows: AtomicU64::new(0),
1957            chunks_sent: AtomicUsize::new(0),
1958        })
1959    }
1960
1961    /// Sends a chunk to Hyper.
1962    ///
1963    /// This method is thread-safe - multiple threads can call it concurrently,
1964    /// but only one chunk will be sent at a time.
1965    ///
1966    /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
1967    /// automatically handles headers: the first chunk's header is sent, and
1968    /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
1969    /// header per COPY stream).
1970    ///
1971    /// # Errors
1972    ///
1973    /// Returns an error if the chunk is empty or if sending fails.
1974    pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
1975        // Capture row count before take() resets it
1976        let row_count = chunk.row_count();
1977
1978        let Some(buffer) = chunk.take() else {
1979            return Ok(());
1980        };
1981
1982        // Acquire the lock for exclusive send access
1983        let mut writer_guard = self
1984            .writer
1985            .lock()
1986            .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
1987
1988        // Lazily initialize the COPY connection
1989        if writer_guard.is_none() {
1990            let client = self.connection.tcp_client().ok_or_else(|| {
1991                Error::feature_not_supported(
1992                    "ChunkSender requires a TCP connection. gRPC connections do not support COPY operations."
1993                )
1994            })?;
1995            let columns: Vec<&str> = self
1996                .columns
1997                .iter()
1998                .map(std::string::String::as_str)
1999                .collect();
2000            *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
2001        }
2002
2003        // Handle headers: only first chunk should have header in the COPY stream
2004        // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
2005        // strip headers from all chunks except the first one sent.
2006        let is_first = !self.header_sent.swap(true, Ordering::SeqCst);
2007
2008        let data_to_send = if is_first {
2009            // First chunk: send with header
2010            &buffer[..]
2011        } else {
2012            // Subsequent chunks: strip the 19-byte header if present
2013            if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
2014                && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
2015            {
2016                &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
2017            } else {
2018                &buffer[..]
2019            }
2020        };
2021
2022        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
2023        // into the connection's write buffer. flush_stream ensures the data
2024        // reaches the server before we return.
2025        if let Some(ref mut writer) = *writer_guard {
2026            writer.send_direct(data_to_send)?;
2027            writer.flush_stream()?;
2028        }
2029
2030        // Update counters (lock already released for these atomic ops)
2031        drop(writer_guard);
2032        self.total_rows
2033            .fetch_add(row_count as u64, Ordering::Relaxed);
2034        self.chunks_sent.fetch_add(1, Ordering::Relaxed);
2035
2036        debug!(
2037            target: "hyperdb_api",
2038            chunk = self.chunks_sent.load(Ordering::Relaxed),
2039            rows = row_count,
2040            bytes = data_to_send.len(),
2041            "chunk-sender"
2042        );
2043
2044        Ok(())
2045    }
2046
2047    /// Returns the total number of rows sent so far.
2048    pub fn total_rows(&self) -> u64 {
2049        self.total_rows.load(Ordering::Relaxed)
2050    }
2051
2052    /// Returns the number of chunks sent so far.
2053    pub fn chunks_sent(&self) -> usize {
2054        self.chunks_sent.load(Ordering::Relaxed)
2055    }
2056
2057    /// Finishes the COPY operation and returns the total row count.
2058    ///
2059    /// This method consumes the sender. After calling this, the COPY operation
2060    /// is complete and all data has been committed.
2061    ///
2062    /// # Errors
2063    ///
2064    /// - Returns [`Error::Internal`] with message `"ChunkSender mutex poisoned"`
2065    ///   if a sender thread panicked while holding the writer lock.
2066    /// - Returns [`Error::Server`] or [`Error::Io`] if sending the COPY
2067    ///   trailer or finishing the COPY operation fails.
2068    pub fn finish(self) -> Result<u64> {
2069        let mut writer_guard = self
2070            .writer
2071            .lock()
2072            .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
2073
2074        // If no chunks were sent, return 0
2075        let Some(writer) = writer_guard.take() else {
2076            return Ok(0);
2077        };
2078
2079        // Write and send the COPY trailer
2080        let mut trailer_buf = BytesMut::with_capacity(2);
2081        copy::write_trailer(&mut trailer_buf);
2082
2083        // Need to get mutable access to send trailer
2084        let mut writer = writer;
2085        writer.send(&trailer_buf)?;
2086
2087        // Finish the COPY operation
2088        let rows = writer.finish()?;
2089
2090        info!(
2091            target: "hyperdb_api",
2092            rows,
2093            chunks = self.chunks_sent.load(Ordering::Relaxed),
2094            table = %self.table_name,
2095            "chunk-sender-finish"
2096        );
2097
2098        Ok(rows)
2099    }
2100}
2101
#[cfg(test)]
mod tests {
    use crate::table_definition::TableDefinition;
    use hyperdb_api_core::protocol::copy;
    use hyperdb_api_core::types::SqlType;

    use super::InsertChunk;

    /// Two-column fixture: a required INT `id` followed by a nullable TEXT `name`.
    fn create_test_table_def() -> TableDefinition {
        TableDefinition::new("test")
            .add_required_column("id", SqlType::int())
            .add_nullable_column("name", SqlType::text())
    }

    #[test]
    fn test_inserter_column_validation() {
        // `Inserter` itself needs a live connection; this only checks the fixture's shape.
        let table_def = create_test_table_def();
        assert_eq!(table_def.column_count(), 2);
    }

    #[test]
    fn test_insert_chunk_encoding() {
        let table_def = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&table_def);

        // Add a row
        chunk.add_i32(42).unwrap();
        chunk.add_str("hello").unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);
        assert!(!chunk.is_empty());
        assert!(!chunk.should_flush()); // Not at limit yet

        // Add more rows
        for i in 0..100 {
            chunk.add_i32(i).unwrap();
            chunk.add_str(&format!("item {i}")).unwrap();
            chunk.end_row().unwrap();
        }

        assert_eq!(chunk.row_count(), 101);

        // Take the buffer
        let buffer = chunk.take().unwrap();
        assert!(!buffer.is_empty());

        // Every chunk begins with its own HyperBinary header, followed by row
        // data. ChunkSender relies on this when stripping headers from all but
        // the first chunk of a COPY stream.
        assert!(buffer.starts_with(copy::HYPER_BINARY_HEADER));
        assert!(buffer.len() > copy::HYPER_BINARY_HEADER_SIZE);

        // Chunk should now be empty after take
        assert!(chunk.take().is_none());
    }

    #[test]
    fn test_insert_chunk_null_handling() {
        let table_def = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&table_def);

        // First column is NOT NULL, should fail
        assert!(chunk.add_null().is_err());

        // Add the required column first
        chunk.add_i32(1).unwrap();

        // Second column is nullable, should succeed
        chunk.add_null().unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);
    }

    #[test]
    fn test_insert_chunk_column_count_validation() {
        let table_def = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&table_def);

        // Add only one column
        chunk.add_i32(1).unwrap();

        // end_row should fail and leave the partial row in place
        assert!(chunk.end_row().is_err());
        assert_eq!(chunk.column_index(), 1);
        assert_eq!(chunk.row_count(), 0);

        // Add second column
        chunk.add_str("test").unwrap();

        // Now end_row should succeed and reset the column cursor
        chunk.end_row().unwrap();
        assert_eq!(chunk.column_index(), 0);
        assert_eq!(chunk.row_count(), 1);
    }

    #[test]
    fn test_insert_chunk_too_many_columns() {
        let table_def = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&table_def);

        chunk.add_i32(1).unwrap();
        chunk.add_str("test").unwrap();

        // Third column should fail
        assert!(chunk.add_i32(2).is_err());

        // The rejected value must not be counted: the row still closes cleanly.
        assert_eq!(chunk.column_index(), 2);
        chunk.end_row().unwrap();
        assert_eq!(chunk.row_count(), 1);
    }

    #[test]
    fn test_insert_chunk_clear() {
        let table_def = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&table_def);

        chunk.add_i32(1).unwrap();
        chunk.add_str("test").unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);

        chunk.clear();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }
}