hyperdb_api/inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//! inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
29use tracing::{debug, info};
30
31use crate::catalog::Catalog;
32use crate::connection::Connection;
33use crate::error::{Error, Result};
34use crate::table_definition::TableDefinition;
35
/// Initial buffer size (4 MB) to reduce early reallocations.
///
/// The COPY protocol sends data in chunks, and each chunk requires a
/// contiguous buffer. Starting at 4 MB avoids repeated reallocations
/// during the first chunk for typical workloads while keeping initial
/// memory allocation reasonable.
const INITIAL_BUFFER_SIZE: usize = 4 * 1024 * 1024;

/// Maximum buffer size per chunk before flushing to the server (16 MB).
///
/// This balances two competing concerns:
/// - **Throughput**: Larger chunks amortize per-chunk overhead (COPY header,
///   network round-trip). Below ~1 MB, per-chunk overhead becomes significant.
/// - **Memory**: The buffer must be fully materialized before sending. 16 MB
///   keeps resident memory bounded even when rows are wide.
///
/// The 16 MB value was chosen empirically — it lands on the flat part of
/// the throughput curve where further increases yield diminishing returns.
const CHUNK_SIZE_LIMIT: usize = 16 * 1024 * 1024;

/// Maximum rows per chunk before flushing to the server.
///
/// This is a secondary flush trigger alongside [`CHUNK_SIZE_LIMIT`]. For
/// narrow rows (few bytes each), the byte limit alone would accumulate
/// millions of rows before flushing, which delays server-side processing.
/// 64,000 rows (note: a round decimal value, not 65,536) ensures timely
/// flushes regardless of row width and approximates the 64K chunk size
/// used for query result streaming
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
const CHUNK_ROW_LIMIT: usize = 64_000;
65
/// A high-performance bulk data inserter.
///
/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
/// using the COPY protocol with `HyperBinary` format for optimal performance.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("users")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("name", SqlType::text());
///
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     for i in 0..1000i32 {
///         inserter.add_row(&[&i, &format!("User {}", i)])?;
///     }
///
///     let rows = inserter.execute()?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Inserter<'conn> {
    /// Borrowed connection; the inserter cannot outlive it.
    connection: &'conn Connection,
    /// Snapshot of the target table's schema (cloned at construction).
    table_def: TableDefinition,
    /// The current chunk being populated (delegates encoding).
    chunk: InsertChunk,
    /// Total rows inserted across all chunks (reset by `execute`).
    row_count: u64,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Active COPY writer (lazily initialized on first write).
    writer: Option<CopyInWriter<'conn>>,
    /// Start time for timing the insert operation.
    start_time: Instant,
}
111
112impl<'conn> Inserter<'conn> {
113 /// Creates a new inserter for the given table.
114 ///
115 /// The underlying COPY session is started lazily on the first flush or
116 /// execute, so construction is lightweight. However, the connection's
117 /// transport is validated eagerly — using a gRPC connection will return
118 /// an error immediately.
119 ///
120 /// # Errors
121 ///
122 /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
123 /// columns.
124 /// - Returns [`Error::Other`] if `connection` is using gRPC transport
125 /// (COPY is TCP-only).
126 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
127 if table_def.column_count() == 0 {
128 return Err(Error::InvalidTableDefinition(
129 "Table definition must have at least one column".into(),
130 ));
131 }
132
133 // Fail fast: verify the connection supports COPY (TCP only)
134 if connection.tcp_client().is_none() {
135 return Err(Error::new(
136 "Inserter requires a TCP connection. \
137 gRPC connections do not support COPY operations.",
138 ));
139 }
140
141 Ok(Inserter {
142 connection,
143 table_def: table_def.clone(),
144 chunk: InsertChunk::from_table_definition(table_def),
145 row_count: 0,
146 chunk_count: 0,
147 writer: None,
148 start_time: Instant::now(),
149 })
150 }
151
152 /// Creates an inserter by querying the table schema from the database.
153 ///
154 /// This method queries the database to get the table definition automatically,
155 /// which is useful when you want to insert into an existing table without
156 /// manually specifying the schema.
157 ///
158 /// # Arguments
159 ///
160 /// * `connection` - The database connection.
161 /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
162 ///
163 /// # Errors
164 ///
165 /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
166 ///
167 /// # Example
168 ///
169 /// ```no_run
170 /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
171 ///
172 /// fn main() -> Result<()> {
173 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
174 ///
175 /// // Create a table first
176 /// conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
177 ///
178 /// // Create inserter by querying the schema directly from a string
179 /// let mut inserter = Inserter::from_table(&conn, "public.products")?;
180 ///
181 /// // Now we can insert data without knowing the exact schema
182 /// inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
183 /// inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
184 ///
185 /// let rows = inserter.execute()?;
186 /// println!("Inserted {} rows", rows);
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
191 where
192 T: TryInto<crate::TableName>,
193 crate::Error: From<T::Error>,
194 {
195 let catalog = Catalog::new(connection);
196 let table_def = catalog.get_table_definition(table_name)?;
197 Self::new(connection, &table_def)
198 }
199
    /// Creates an inserter with column mappings that allow SQL expressions.
    ///
    /// This method uses a temporary table and INSERT...SELECT to support
    /// column mappings with SQL expressions. Data is first inserted into
    /// a temporary staging table, then transformed using the mappings.
    ///
    /// # Arguments
    ///
    /// * `connection` - The database connection.
    /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
    /// * `target_table` - The qualified name of the target table to insert into.
    ///   Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
    /// * `mappings` - Column mappings defining how values are transformed.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
    ///
    /// fn main() -> Result<()> {
    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
    ///
    ///     // Target table with computed columns
    ///     conn.execute_command(r#"
    ///         CREATE TABLE orders (
    ///             id INT NOT NULL,
    ///             product TEXT,
    ///             quantity INT,
    ///             price DOUBLE PRECISION,
    ///             total DOUBLE PRECISION,
    ///             created_at TIMESTAMP
    ///         )
    ///     "#)?;
    ///
    ///     // Inserter definition - what we provide
    ///     let inserter_def = TableDefinition::new("_stage")
    ///         .add_required_column("id", SqlType::int())
    ///         .add_nullable_column("product", SqlType::text())
    ///         .add_nullable_column("quantity", SqlType::int())
    ///         .add_nullable_column("price", SqlType::double());
    ///
    ///     // Column mappings - how values are transformed
    ///     let mappings = vec![
    ///         ColumnMapping::new("id"),
    ///         ColumnMapping::new("product"),
    ///         ColumnMapping::new("quantity"),
    ///         ColumnMapping::new("price"),
    ///         ColumnMapping::with_expression("total", "quantity * price"),
    ///         ColumnMapping::with_expression("created_at", "NOW()"),
    ///     ];
    ///
    ///     // For simple table names in the public schema, use quoted name
    ///     // For qualified names, use target_table_def.qualified_name()
    ///     let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
    ///
    ///     inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
    ///     inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
    ///
    ///     let rows = inserter.execute()?;
    ///     Ok(())
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns an error if `target_table` fails to convert into a
    ///   [`TableName`](crate::TableName).
    /// - Returns [`Error::Client`] if creating the temporary staging table
    ///   fails on the server.
    /// - Returns the errors from [`Inserter::new`] for the staging table
    ///   (zero-column table definition, gRPC transport).
    pub fn with_column_mappings<T>(
        connection: &'conn Connection,
        inserter_def: &TableDefinition,
        target_table: T,
        mappings: &[ColumnMapping],
    ) -> Result<MappedInserter<'conn>>
    where
        T: TryInto<crate::TableName>,
        crate::Error: From<T::Error>,
    {
        // Thin convenience wrapper: all staging-table setup and the final
        // INSERT...SELECT rewrite live in MappedInserter::new.
        MappedInserter::new(connection, inserter_def, target_table, mappings)
    }
283
    /// Returns the table definition this inserter writes to.
    pub fn table_definition(&self) -> &TableDefinition {
        &self.table_def
    }

    /// Returns the number of columns in the target table definition.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.table_def.column_count()
    }

    /// Returns the total number of complete rows added so far.
    ///
    /// Note: this counts rows across *all* chunks, including rows already
    /// flushed to the server — not only rows currently buffered. The
    /// counter is reset to zero by [`execute`](Self::execute).
    #[must_use]
    pub fn row_count(&self) -> u64 {
        self.row_count
    }
300
    /// Adds a NULL value for the current column.
    ///
    /// Column-cursor tracking and nullability checking are handled by the
    /// underlying chunk encoder.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] if the current row already has all columns
    /// supplied, or if the current column is marked `NOT NULL` in the table
    /// definition.
    #[inline]
    pub fn add_null(&mut self) -> Result<()> {
        self.chunk.add_null()
    }

    /// Adds a boolean value.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Other`] with message `"Too many columns in row"` if
    /// the current row already has all columns supplied.
    #[inline]
    pub fn add_bool(&mut self, value: bool) -> Result<()> {
        self.chunk.add_bool(value)
    }

    /// Adds an i16 value (SMALLINT).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_i16(&mut self, value: i16) -> Result<()> {
        self.chunk.add_i16(value)
    }

    /// Adds an i32 value (INT).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_i32(&mut self, value: i32) -> Result<()> {
        self.chunk.add_i32(value)
    }

    /// Adds an i64 value (BIGINT).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_i64(&mut self, value: i64) -> Result<()> {
        self.chunk.add_i64(value)
    }

    /// Adds an f32 value (REAL/FLOAT4).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_f32(&mut self, value: f32) -> Result<()> {
        self.chunk.add_f32(value)
    }

    /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_f64(&mut self, value: f64) -> Result<()> {
        self.chunk.add_f64(value)
    }

    /// Adds a string value (TEXT/VARCHAR).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_str(&mut self, value: &str) -> Result<()> {
        self.chunk.add_str(value)
    }

    /// Adds a bytes value (BYTEA).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
        self.chunk.add_bytes(value)
    }

    /// Adds a raw 128-bit value (backing storage for NUMERIC/INTERVAL).
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
        self.chunk.add_data128(value)
    }
403
404 /// Adds an optional value. If None, adds NULL.
405 ///
406 /// # Errors
407 ///
408 /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
409 /// return for the current row position.
410 pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
411 where
412 F: FnOnce(&mut Self, T) -> Result<()>,
413 {
414 match value {
415 Some(v) => add_fn(self, v),
416 None => self.add_null(),
417 }
418 }
419
420 /// Ends the current row.
421 ///
422 /// Returns an error if the wrong number of columns were added.
423 /// Automatically flushes the buffer if chunk limits are reached.
424 ///
425 /// # Errors
426 ///
427 /// - Returns [`Error::Other`] if fewer (or more) columns were supplied
428 /// than the table definition requires.
429 /// - Returns any error from [`flush`](Self::flush) when an automatic
430 /// flush is triggered by reaching the chunk byte/row limit.
431 pub fn end_row(&mut self) -> Result<()> {
432 self.chunk.end_row()?;
433 self.row_count += 1;
434
435 // Auto-flush if we've reached chunk limits
436 if self.chunk.should_flush() {
437 self.flush()?;
438 }
439
440 Ok(())
441 }
442
    /// Flushes the current buffer to the server.
    ///
    /// This sends all buffered rows as a chunk and resets the buffer.
    /// Called automatically when chunk limits are reached.
    ///
    /// # Errors
    ///
    /// - Returns [`Error::Other`] if the connection is using gRPC transport
    ///   (COPY is TCP-only) and no COPY session exists yet.
    /// - Returns [`Error::Client`] if the server rejects the `COPY IN` start
    ///   or the subsequent data send.
    /// - Returns [`Error::Io`] on transport-level I/O failures while writing
    ///   the chunk.
    pub fn flush(&mut self) -> Result<()> {
        // Nothing buffered — flushing is a no-op.
        if self.chunk.is_empty() {
            return Ok(());
        }

        // Capture the row count before take() resets the chunk; `take`
        // hands us the encoded buffer and leaves the chunk empty. The
        // `else` branch is defensive: a non-empty chunk yielding no buffer
        // is treated as "nothing to send" rather than an error.
        let chunk_rows = self.chunk.row_count();
        let Some(buffer) = self.chunk.take() else {
            return Ok(());
        };

        // Ensure the COPY connection is started (lazy: the session is only
        // opened when there is actually data to send). NOTE(review): this
        // init sequence is duplicated in execute(); keep the two in sync.
        if self.writer.is_none() {
            let client = self.connection.tcp_client().ok_or_else(|| {
                crate::Error::new("Inserter requires a TCP connection. gRPC connections do not support COPY operations.")
            })?;
            let columns: Vec<&str> = self
                .table_def
                .columns
                .iter()
                .map(|c| c.name.as_str())
                .collect();
            let table_name = self.table_def.qualified_name();
            self.writer = Some(client.copy_in(&table_name, &columns)?);
        }

        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
        // into the connection's write buffer. flush_stream ensures the data
        // reaches the server before we return.
        if let Some(ref mut writer) = self.writer {
            writer.send_direct(&buffer)?;
            writer.flush_stream()?;
        }

        // Log with the 0-based index of the chunk that was just sent.
        debug!(
            target: "hyperdb_api",
            chunk = self.chunk_count,
            rows = chunk_rows,
            bytes = buffer.len(),
            "inserter-chunk"
        );

        self.chunk_count += 1;
        Ok(())
    }
500
501 /// Adds a complete row of values.
502 ///
503 /// This is a convenience method that adds all column values at once
504 /// using the `IntoValue` trait for type-safe insertion.
505 ///
506 /// # Arguments
507 ///
508 /// * `values` - A slice of values implementing `IntoValue`.
509 ///
510 /// # Errors
511 ///
512 /// Returns an error if the number of values doesn't match the column count,
513 /// or if any value cannot be added.
514 ///
515 /// # Example
516 ///
517 /// ```no_run
518 /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
519 ///
520 /// fn main() -> Result<()> {
521 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
522 ///
523 /// let table_def = TableDefinition::new("users")
524 /// .add_required_column("id", SqlType::int())
525 /// .add_nullable_column("name", SqlType::text());
526 ///
527 /// Catalog::new(&conn).create_table(&table_def)?;
528 ///
529 /// let mut inserter = Inserter::new(&conn, &table_def)?;
530 ///
531 /// // Add rows using IntoValue trait
532 /// inserter.add_row(&[&1i32, &"Alice"])?;
533 /// inserter.add_row(&[&2i32, &"Bob"])?;
534 ///
535 /// // Option<T> can be used for nullable columns
536 /// inserter.add_row(&[&3i32, &None::<&str>])?;
537 ///
538 /// let rows = inserter.execute()?;
539 /// Ok(())
540 /// }
541 /// ```
542 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
543 let column_count = self.table_def.column_count();
544 if values.len() != column_count {
545 return Err(Error::new(format!(
546 "Column count mismatch: expected {} columns but got {}",
547 column_count,
548 values.len()
549 )));
550 }
551
552 for value in values {
553 value.add_to_inserter(self)?;
554 }
555
556 self.end_row()?;
557 Ok(())
558 }
559
    /// Adds a Date value.
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_date(&mut self, value: Date) -> Result<()> {
        self.chunk.add_date(value)
    }

    /// Adds a Time value.
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_time(&mut self, value: Time) -> Result<()> {
        self.chunk.add_time(value)
    }

    /// Adds a Timestamp (without time zone) value.
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
        self.chunk.add_timestamp(value)
    }

    /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
        self.chunk.add_offset_timestamp(value)
    }

    /// Adds an Interval value.
    ///
    /// # Errors
    ///
    /// See [`add_bool`](Self::add_bool).
    #[inline]
    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
        self.chunk.add_interval(value)
    }
609
    /// Adds a Numeric value.
    ///
    /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
    /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
    ///
    /// # Errors
    ///
    /// Returns an error if the column's precision cannot be determined from the
    /// table definition, or if the unscaled value does not fit the storage
    /// implied by the declared precision. Ensure that NUMERIC columns are
    /// defined with explicit `SqlType` information including precision.
    pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
        // The chunk tracks which column we're currently filling.
        let column_index = self.chunk.column_index();

        // Check the column's precision to determine storage format.
        // NOTE(review): the column is looked up by position, so this
        // relies on the table definition matching the server-side schema.
        let precision = self
            .table_def
            .columns
            .get(column_index)
            .and_then(super::table_definition::ColumnDefinition::sql_type)
            .and_then(|t| t.precision())
            .ok_or_else(|| {
                // Build an actionable error naming the offending column.
                let col_name = self
                    .table_def
                    .columns
                    .get(column_index)
                    .map_or("<unknown>", |c| c.name.as_str());
                Error::new(format!(
                    "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
                     Ensure the column is defined with explicit SqlType including precision.\n\n\
                     Example fix:\n \
                     table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
                ))
            })?;

        if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
            // Small numeric: stored as i64. The narrowing conversion is
            // checked so an out-of-range unscaled value errors instead of
            // silently truncating.
            let unscaled = value.unscaled_value();
            let narrowed = i64::try_from(unscaled).map_err(|_| {
                Error::new(format!(
                    "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
                ))
            })?;
            self.chunk.add_i64(narrowed)
        } else {
            // Big numeric: stored as 128-bit packed representation.
            self.chunk.add_data128(&value.to_packed())
        }
    }
658
659 /// Executes the insert and commits all buffered rows.
660 ///
661 /// This sends any remaining buffered data and finishes the COPY operation.
662 /// Returns the number of rows inserted.
663 ///
664 /// The inserter is single-use: calling `execute` a second time returns
665 /// `Ok(0)` because the internal row counter has been reset and no further
666 /// data has been added. To insert additional batches, create a new
667 /// [`Inserter`].
668 ///
669 /// # Errors
670 ///
671 /// Returns an error if:
672 /// - There's an incomplete row (`column_index` != 0)
673 /// - The COPY connection fails to start
674 /// - Sending data fails
675 pub fn execute(&mut self) -> Result<u64> {
676 if self.chunk.column_index() != 0 {
677 return Err(Error::new("Incomplete row at execute time"));
678 }
679
680 if self.row_count == 0 {
681 return Ok(0);
682 }
683
684 // Ensure COPY connection exists before proceeding when we have rows
685 if self.writer.is_none() {
686 let client = self.connection.tcp_client().ok_or_else(|| {
687 Error::new("Inserter requires a TCP connection. gRPC connections do not support COPY operations.")
688 })?;
689 let columns: Vec<&str> = self
690 .table_def
691 .columns
692 .iter()
693 .map(|c| c.name.as_str())
694 .collect();
695 let table_name = self.table_def.qualified_name();
696 self.writer = Some(client.copy_in(&table_name, &columns)?);
697 }
698
699 // At this point, writer must exist since we have rows
700 let writer = self
701 .writer
702 .as_mut()
703 .ok_or_else(|| Error::new("Failed to initialize COPY connection for inserter"))?;
704
705 // If we have buffered data that hasn't been sent yet
706 if !self.chunk.is_empty() {
707 writer.send(self.chunk.buffer())?;
708 }
709
710 // Write and send the COPY trailer
711 let mut trailer_buf = BytesMut::with_capacity(2);
712 copy::write_trailer(&mut trailer_buf);
713 writer.send(&trailer_buf)?;
714
715 // Finish the COPY operation
716 let rows = self
717 .writer
718 .take()
719 .map(hyperdb_api_core::client::CopyInWriter::finish)
720 .transpose()?
721 .unwrap_or(0);
722
723 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
724 info!(
725 target: "hyperdb_api",
726 rows,
727 chunks = self.chunk_count,
728 duration_ms,
729 table = %self.table_def.qualified_name(),
730 "inserter-end"
731 );
732
733 // Reset row counter so a stray second execute() call returns Ok(0)
734 // instead of attempting another COPY trailer on a finished writer.
735 self.row_count = 0;
736
737 Ok(rows)
738 }
739
740 /// Cancels the insert and discards all buffered rows.
741 pub fn cancel(&mut self) {
742 // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
743 // sends a CopyFail to the server.
744 self.writer = None;
745 self.row_count = 0;
746 }
747}
748
749// =============================================================================
750// ColumnMapping
751// =============================================================================
752
/// Defines how a column receives its value during insertion.
///
/// A mapping either takes its value straight from the inserter stream, or
/// computes it server-side with a SQL expression (which may reference other
/// staged columns or call SQL functions like `NOW()`).
///
/// # Example
///
/// ```
/// use hyperdb_api::ColumnMapping;
///
/// // Simple column - insert value directly
/// let id_col = ColumnMapping::new("id");
///
/// // Column with expression - computed value
/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// The name of the target column.
    pub column_name: String,
    /// Optional SQL expression. If None, the value is inserted directly.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Creates a column mapping for direct value insertion.
    ///
    /// The column will receive values directly from the inserter.
    pub fn new(column_name: impl Into<String>) -> Self {
        Self {
            column_name: column_name.into(),
            expression: None,
        }
    }

    /// Creates a column mapping whose value is computed by a SQL expression.
    ///
    /// The expression runs server-side and may reference other columns or
    /// use SQL functions.
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - A SQL expression to compute the column value.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// // Use current timestamp
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    ///
    /// // Compute from other columns
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        Self {
            column_name: column_name.into(),
            expression: Some(expression.into()),
        }
    }

    /// Returns the column name.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// Returns the SQL expression, if any.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// Returns true if this is a direct value mapping (no expression).
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression.is_none()
    }

    /// Renders this mapping as one item of a SELECT list, with the target
    /// column name double-quote escaped.
    fn to_select_item(&self) -> String {
        let quoted = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        if let Some(expr) = &self.expression {
            format!("{expr} AS {quoted}")
        } else {
            quoted
        }
    }
}
846
847// =============================================================================
848// IntoValue Trait
849// =============================================================================
850
/// Trait for types that can be inserted into a Hyper table.
///
/// This trait is implemented for common Rust types, allowing them to be
/// used with [`Inserter::add_row()`] for type-safe insertion. It is also
/// implemented for references to those types, so `&[&dyn IntoValue]` rows
/// can be built from borrowed values.
///
/// # Supported Types
///
/// - Integers: `i16`, `i32`, `i64`
/// - Floats: `f32`, `f64`
/// - `bool`
/// - `&str`, `String`
/// - `Option<T>` where `T: IntoValue` (for nullable columns)
/// - Date/time types: `Date`, `Time`, `Timestamp`, `Interval`
/// - `Numeric`, `Geography`, `Oid`, `Vec<u8>` (bytes)
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("example")
///         .add_required_column("a", SqlType::int())
///         .add_nullable_column("b", SqlType::text())
///         .add_nullable_column("c", SqlType::double());
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     // IntoValue allows adding rows with mixed types
///     inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
///     inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
///
///     inserter.execute()?;
///     Ok(())
/// }
/// ```
pub trait IntoValue {
    /// Adds this value to the inserter at the current column position.
    ///
    /// # Errors
    ///
    /// Implementations call the matching `Inserter::add_*` method and
    /// forward its error — see [`Inserter::add_bool`] for the shared
    /// failure modes (too many columns, NULL into non-nullable, etc).
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
}
900
901// Implementations for basic types
902
903impl IntoValue for bool {
904 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
905 inserter.add_bool(*self)
906 }
907}
908
909impl IntoValue for i16 {
910 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
911 inserter.add_i16(*self)
912 }
913}
914
915impl IntoValue for i32 {
916 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
917 inserter.add_i32(*self)
918 }
919}
920
921impl IntoValue for i64 {
922 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
923 inserter.add_i64(*self)
924 }
925}
926
927impl IntoValue for f32 {
928 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
929 inserter.add_f32(*self)
930 }
931}
932
933impl IntoValue for f64 {
934 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
935 inserter.add_f64(*self)
936 }
937}
938
939impl IntoValue for str {
940 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
941 inserter.add_str(self)
942 }
943}
944
945impl IntoValue for String {
946 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
947 inserter.add_str(self)
948 }
949}
950
951impl IntoValue for [u8] {
952 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
953 inserter.add_bytes(self)
954 }
955}
956
957impl IntoValue for Vec<u8> {
958 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
959 inserter.add_bytes(self)
960 }
961}
962
963// Hyper-specific types
964
965impl IntoValue for Date {
966 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
967 inserter.add_date(*self)
968 }
969}
970
971impl IntoValue for Time {
972 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
973 inserter.add_time(*self)
974 }
975}
976
977impl IntoValue for Timestamp {
978 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
979 inserter.add_timestamp(*self)
980 }
981}
982
983impl IntoValue for OffsetTimestamp {
984 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
985 inserter.add_offset_timestamp(*self)
986 }
987}
988
989impl IntoValue for Interval {
990 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
991 inserter.add_interval(*self)
992 }
993}
994
995impl IntoValue for Numeric {
996 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
997 inserter.add_numeric(*self)
998 }
999}
1000
1001// Option<T> for nullable values
1002impl<T: IntoValue> IntoValue for Option<T> {
1003 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1004 match self {
1005 Some(value) => value.add_to_inserter(inserter),
1006 None => inserter.add_null(),
1007 }
1008 }
1009}
1010
1011// Reference implementations for primitives
1012
1013impl IntoValue for &bool {
1014 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1015 inserter.add_bool(**self)
1016 }
1017}
1018
1019impl IntoValue for &i16 {
1020 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021 inserter.add_i16(**self)
1022 }
1023}
1024
1025impl IntoValue for &i32 {
1026 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1027 inserter.add_i32(**self)
1028 }
1029}
1030
1031impl IntoValue for &i64 {
1032 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1033 inserter.add_i64(**self)
1034 }
1035}
1036
1037impl IntoValue for &f32 {
1038 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039 inserter.add_f32(**self)
1040 }
1041}
1042
1043impl IntoValue for &f64 {
1044 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045 inserter.add_f64(**self)
1046 }
1047}
1048
1049impl IntoValue for &String {
1050 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051 inserter.add_str(self)
1052 }
1053}
1054
1055impl IntoValue for &str {
1056 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057 inserter.add_str(self)
1058 }
1059}
1060
1061impl IntoValue for &&str {
1062 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063 inserter.add_str(self)
1064 }
1065}
1066
1067impl IntoValue for &[u8] {
1068 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069 inserter.add_bytes(self)
1070 }
1071}
1072
1073impl IntoValue for &Date {
1074 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075 inserter.add_date(**self)
1076 }
1077}
1078
1079impl IntoValue for &Time {
1080 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081 inserter.add_time(**self)
1082 }
1083}
1084
1085impl IntoValue for &Timestamp {
1086 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087 inserter.add_timestamp(**self)
1088 }
1089}
1090
1091impl IntoValue for &OffsetTimestamp {
1092 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093 inserter.add_offset_timestamp(**self)
1094 }
1095}
1096
1097impl IntoValue for &Interval {
1098 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099 inserter.add_interval(**self)
1100 }
1101}
1102
1103impl IntoValue for &Numeric {
1104 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105 inserter.add_numeric(**self)
1106 }
1107}
1108
1109// =============================================================================
1110// MappedInserter
1111// =============================================================================
1112
/// An inserter that supports SQL expression mappings.
///
/// This inserter uses a staging table to support computed columns via
/// INSERT...SELECT with SQL expressions. It's created by
/// [`Inserter::with_column_mappings`].
///
/// Rows are buffered into a session-local temporary staging table; the
/// mapped INSERT...SELECT into the target runs in [`MappedInserter::execute`].
#[derive(Debug)]
pub struct MappedInserter<'conn> {
    /// The underlying inserter for the staging table (owns the COPY stream).
    inner: Inserter<'conn>,
    /// The target table name the mapped rows are ultimately inserted into.
    target_table: crate::TableName,
    /// The column mappings (target column name + SQL select expression).
    mappings: Vec<ColumnMapping>,
    /// The staging table name (temporary table in the current session).
    staging_table: String,
}
1129
1130impl<'conn> MappedInserter<'conn> {
1131 /// Creates a new mapped inserter.
1132 fn new<T>(
1133 connection: &'conn Connection,
1134 inserter_def: &TableDefinition,
1135 target_table: T,
1136 mappings: &[ColumnMapping],
1137 ) -> Result<Self>
1138 where
1139 T: TryInto<crate::TableName>,
1140 crate::Error: From<T::Error>,
1141 {
1142 let target_table = target_table.try_into()?;
1143
1144 // Create a unique staging table name
1145 let staging_table = format!("_hyper_staging_{}", std::process::id());
1146
1147 // Create the staging table definition (temporary)
1148 let mut staging_def = inserter_def.clone();
1149 staging_def.name.clone_from(&staging_table);
1150
1151 // Create the staging table
1152 let create_sql = staging_def.to_create_sql(true)?;
1153 let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1154 connection.execute_command(&create_temp)?;
1155
1156 // Create the inner inserter for the staging table
1157 let inner = Inserter::new(connection, &staging_def)?;
1158
1159 Ok(MappedInserter {
1160 inner,
1161 target_table,
1162 mappings: mappings.to_vec(),
1163 staging_table,
1164 })
1165 }
1166
1167 /// Adds a row of values to the inserter.
1168 ///
1169 /// The values should correspond to the columns in the inserter definition,
1170 /// not the target table.
1171 ///
1172 /// # Errors
1173 ///
1174 /// Forwards the error from [`Inserter::add_row`].
1175 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1176 self.inner.add_row(values)
1177 }
1178
1179 /// Adds a NULL value.
1180 ///
1181 /// # Errors
1182 ///
1183 /// Forwards the error from [`Inserter::add_null`].
1184 pub fn add_null(&mut self) -> Result<()> {
1185 self.inner.add_null()
1186 }
1187
1188 /// Adds a boolean value.
1189 ///
1190 /// # Errors
1191 ///
1192 /// Forwards the error from [`Inserter::add_bool`].
1193 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1194 self.inner.add_bool(value)
1195 }
1196
1197 /// Adds an i16 value.
1198 ///
1199 /// # Errors
1200 ///
1201 /// Forwards the error from [`Inserter::add_i16`].
1202 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1203 self.inner.add_i16(value)
1204 }
1205
1206 /// Adds an i32 value.
1207 ///
1208 /// # Errors
1209 ///
1210 /// Forwards the error from [`Inserter::add_i32`].
1211 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1212 self.inner.add_i32(value)
1213 }
1214
1215 /// Adds an i64 value.
1216 ///
1217 /// # Errors
1218 ///
1219 /// Forwards the error from [`Inserter::add_i64`].
1220 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1221 self.inner.add_i64(value)
1222 }
1223
1224 /// Adds an f32 value.
1225 ///
1226 /// # Errors
1227 ///
1228 /// Forwards the error from [`Inserter::add_f32`].
1229 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1230 self.inner.add_f32(value)
1231 }
1232
1233 /// Adds an f64 value.
1234 ///
1235 /// # Errors
1236 ///
1237 /// Forwards the error from [`Inserter::add_f64`].
1238 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1239 self.inner.add_f64(value)
1240 }
1241
1242 /// Adds a string value.
1243 ///
1244 /// # Errors
1245 ///
1246 /// Forwards the error from [`Inserter::add_str`].
1247 pub fn add_str(&mut self, value: &str) -> Result<()> {
1248 self.inner.add_str(value)
1249 }
1250
1251 /// Adds a bytes value.
1252 ///
1253 /// # Errors
1254 ///
1255 /// Forwards the error from [`Inserter::add_bytes`].
1256 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1257 self.inner.add_bytes(value)
1258 }
1259
1260 /// Ends the current row.
1261 ///
1262 /// # Errors
1263 ///
1264 /// Forwards the error from [`Inserter::end_row`].
1265 pub fn end_row(&mut self) -> Result<()> {
1266 self.inner.end_row()
1267 }
1268
1269 /// Executes the insert with column mappings.
1270 ///
1271 /// This method:
1272 /// 1. Inserts all buffered rows into the staging table
1273 /// 2. Executes INSERT...SELECT from staging to target with mappings
1274 /// 3. Drops the staging table
1275 ///
1276 /// Returns the number of rows inserted into the target table.
1277 ///
1278 /// # Errors
1279 ///
1280 /// - Returns the error from the inner [`Inserter::execute`] if writing
1281 /// the staging rows fails.
1282 /// - Returns [`Error::Client`] if the `INSERT ... SELECT` from staging
1283 /// to the target table is rejected (e.g. a mapping expression fails
1284 /// to evaluate).
1285 /// - Returns [`Error::Client`] if dropping the staging table fails.
1286 pub fn execute(&mut self) -> Result<u64> {
1287 let connection = self.inner.connection;
1288 let staging_table = self.staging_table.clone();
1289
1290 // Insert data into staging table
1291 let _staging_rows = self.inner.execute()?;
1292
1293 // Build the INSERT...SELECT statement
1294 use hyperdb_api_core::protocol::escape::SqlIdentifier;
1295
1296 let target_columns: Vec<String> = self
1297 .mappings
1298 .iter()
1299 .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1300 .collect();
1301
1302 let select_items: Vec<String> = self
1303 .mappings
1304 .iter()
1305 .map(ColumnMapping::to_select_item)
1306 .collect();
1307
1308 let sql = format!(
1309 "INSERT INTO {} ({}) SELECT {} FROM {}",
1310 self.target_table,
1311 target_columns.join(", "),
1312 select_items.join(", "),
1313 SqlIdentifier(&staging_table),
1314 );
1315
1316 // Execute the INSERT...SELECT (returns row count directly)
1317 let row_count = connection.execute_command(&sql)?;
1318
1319 // Drop the staging table
1320 connection.execute_command(&format!(
1321 "DROP TABLE IF EXISTS {}",
1322 SqlIdentifier(&staging_table)
1323 ))?;
1324
1325 // Return the number of rows inserted
1326 Ok(row_count)
1327 }
1328
1329 /// Cancels the insert and drops the staging table.
1330 ///
1331 /// This method handles cleanup failures gracefully by logging warnings
1332 /// instead of returning errors. This prevents masking the original error
1333 /// that caused the cancellation.
1334 ///
1335 /// # Logging
1336 ///
1337 /// Cleanup failures are logged using the `tracing` crate at WARN level.
1338 /// If `tracing` is not initialized, errors are written to stderr.
1339 pub fn cancel(&mut self) {
1340 let connection = self.inner.connection;
1341 let staging_table = &self.staging_table;
1342
1343 // Drop the staging table, but don't fail if cleanup fails
1344 // This avoids masking the original error that caused cancellation
1345 if let Err(e) = connection.execute_command(&format!(
1346 "DROP TABLE IF EXISTS \"{}\"",
1347 staging_table.replace('"', "\"\"")
1348 )) {
1349 // Log the cleanup failure for debugging
1350 // In production, consider using a logging framework like `tracing`
1351 eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1352 }
1353 }
1354}
1355
1356// =============================================================================
1357// InsertChunk - Thread-safe chunk for parallel encoding
1358// =============================================================================
1359
1360/// A thread-safe chunk for encoding rows in parallel.
1361///
1362/// `InsertChunk` can be created and populated in any thread, then sent to a
1363/// [`ChunkSender`] for transmission. This enables parallel data encoding across
1364/// multiple worker threads while serializing the actual network sends.
1365///
1366/// # Example
1367///
1368/// ```no_run
1369/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
1370///
1371/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
1372/// let mut chunk = InsertChunk::from_table_definition(table_def);
1373///
1374/// for i in 0..1000 {
1375/// chunk.add_i32(start_id + i)?;
1376/// chunk.add_str(&format!("Item {}", start_id + i))?;
1377/// chunk.end_row()?;
1378/// }
1379///
1380/// Ok(chunk)
1381/// }
1382/// ```
#[derive(Debug)]
pub struct InsertChunk {
    /// Encoded HyperBinary cell data (optionally prefixed by the stream header).
    buffer: BytesMut,
    /// Whether the HyperBinary header has already been written into `buffer`.
    header_written: bool,
    /// Index of the next column expected within the current (partial) row.
    column_index: usize,
    /// Number of columns per row, from the table schema.
    column_count: usize,
    /// Number of complete rows encoded so far.
    row_count: usize,
    /// Per-column nullability; selects nullable vs `_not_null` cell encodings.
    column_nullable: Vec<bool>,
}
1392
// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
// thread-local state. The manual `unsafe impl` exists only because the
// auto-trait derivation is conservative for this struct's compilation context;
// the compound type has no `!Send` components.
// NOTE(review): if `hyperdb_api_core::types::bytes::BytesMut` is (or becomes)
// auto-`Send`/`Sync`, these manual impls are redundant and could be removed —
// confirm against that type's definition.
unsafe impl Send for InsertChunk {}
// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
// and there is no interior mutability crossing a `&InsertChunk` boundary,
// so sharing `&InsertChunk` across threads is sound.
unsafe impl Sync for InsertChunk {}
1403
1404impl InsertChunk {
1405 /// Creates a new empty chunk with the given schema.
1406 ///
1407 /// # Arguments
1408 ///
1409 /// * `column_count` - Number of columns per row
1410 /// * `column_nullable` - Whether each column is nullable
1411 #[must_use]
1412 pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1413 debug_assert_eq!(column_count, column_nullable.len());
1414 InsertChunk {
1415 buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1416 header_written: false,
1417 column_index: 0,
1418 column_count,
1419 row_count: 0,
1420 column_nullable,
1421 }
1422 }
1423
1424 /// Creates a chunk from a table definition.
1425 #[must_use]
1426 pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1427 let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1428 Self::new(table_def.column_count(), column_nullable)
1429 }
1430
1431 /// Returns the number of complete rows in this chunk.
1432 #[must_use]
1433 pub fn row_count(&self) -> usize {
1434 self.row_count
1435 }
1436
1437 /// Returns the current buffer size in bytes.
1438 #[must_use]
1439 pub fn buffer_size(&self) -> usize {
1440 self.buffer.len()
1441 }
1442
1443 /// Returns true if the chunk has reached size or row limits and should be sent.
1444 #[must_use]
1445 pub fn should_flush(&self) -> bool {
1446 self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1447 }
1448
1449 /// Returns true if the chunk is empty (no rows).
1450 #[must_use]
1451 pub fn is_empty(&self) -> bool {
1452 self.row_count == 0
1453 }
1454
1455 /// Takes the buffer, consuming the chunk data.
1456 ///
1457 /// Returns `None` if the chunk is empty. After calling this, the chunk
1458 /// can be reused by calling the add_* methods again.
1459 ///
1460 /// Note: The header flag is NOT reset - subsequent chunks from the same
1461 /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1462 /// header per COPY stream).
1463 pub fn take(&mut self) -> Option<BytesMut> {
1464 if self.row_count == 0 {
1465 return None;
1466 }
1467 // Don't reset header_written - only first chunk should have header
1468 self.row_count = 0;
1469 Some(std::mem::take(&mut self.buffer))
1470 }
1471
1472 /// Resets the chunk for reuse without reallocating.
1473 pub fn clear(&mut self) {
1474 self.buffer.clear();
1475 self.header_written = false;
1476 self.column_index = 0;
1477 self.row_count = 0;
1478 }
1479
1480 #[allow(
1481 clippy::inline_always,
1482 reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1483 )]
1484 fn ensure_header(&mut self) {
1485 if !self.header_written {
1486 copy::write_header(&mut self.buffer);
1487 self.header_written = true;
1488 }
1489 }
1490
1491 #[expect(
1492 clippy::inline_always,
1493 reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1494 )]
1495 #[inline(always)]
1496 fn current_column_nullable(&self) -> bool {
1497 *self.column_nullable.get(self.column_index).unwrap_or(&true)
1498 }
1499
1500 /// Adds a NULL value for the current column.
1501 ///
1502 /// # Errors
1503 ///
1504 /// - Returns [`Error::Other`] with message `"Too many columns in row"`
1505 /// if the current row already has all columns supplied.
1506 /// - Returns [`Error::Other`] with message
1507 /// `"Cannot add NULL to non-nullable column"` if the current column
1508 /// is `NOT NULL` in the schema.
1509 pub fn add_null(&mut self) -> Result<()> {
1510 if self.column_index >= self.column_count {
1511 return Err(Error::new("Too many columns in row"));
1512 }
1513 if !self.current_column_nullable() {
1514 return Err(Error::new("Cannot add NULL to non-nullable column"));
1515 }
1516 self.ensure_header();
1517 copy::write_null(&mut self.buffer);
1518 self.column_index += 1;
1519 Ok(())
1520 }
1521
1522 /// Adds a boolean value.
1523 ///
1524 /// # Errors
1525 ///
1526 /// Returns [`Error::Other`] with message `"Too many columns in row"` if
1527 /// the current row already has all columns supplied.
1528 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1529 if self.column_index >= self.column_count {
1530 return Err(Error::new("Too many columns in row"));
1531 }
1532 self.ensure_header();
1533 let int_value = i8::from(value);
1534 if self.current_column_nullable() {
1535 copy::write_i8(&mut self.buffer, int_value);
1536 } else {
1537 copy::write_i8_not_null(&mut self.buffer, int_value);
1538 }
1539 self.column_index += 1;
1540 Ok(())
1541 }
1542
1543 /// Adds an i16 value (SMALLINT).
1544 ///
1545 /// # Errors
1546 ///
1547 /// See [`add_bool`](Self::add_bool).
1548 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1549 if self.column_index >= self.column_count {
1550 return Err(Error::new("Too many columns in row"));
1551 }
1552 self.ensure_header();
1553 if self.current_column_nullable() {
1554 copy::write_i16(&mut self.buffer, value);
1555 } else {
1556 copy::write_i16_not_null(&mut self.buffer, value);
1557 }
1558 self.column_index += 1;
1559 Ok(())
1560 }
1561
1562 /// Adds an i32 value (INT).
1563 ///
1564 /// # Errors
1565 ///
1566 /// See [`add_bool`](Self::add_bool).
1567 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1568 if self.column_index >= self.column_count {
1569 return Err(Error::new("Too many columns in row"));
1570 }
1571 self.ensure_header();
1572 if self.current_column_nullable() {
1573 copy::write_i32(&mut self.buffer, value);
1574 } else {
1575 copy::write_i32_not_null(&mut self.buffer, value);
1576 }
1577 self.column_index += 1;
1578 Ok(())
1579 }
1580
1581 /// Adds an i64 value (BIGINT).
1582 ///
1583 /// # Errors
1584 ///
1585 /// See [`add_bool`](Self::add_bool).
1586 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1587 if self.column_index >= self.column_count {
1588 return Err(Error::new("Too many columns in row"));
1589 }
1590 self.ensure_header();
1591 if self.current_column_nullable() {
1592 copy::write_i64(&mut self.buffer, value);
1593 } else {
1594 copy::write_i64_not_null(&mut self.buffer, value);
1595 }
1596 self.column_index += 1;
1597 Ok(())
1598 }
1599
1600 /// Adds an f32 value (REAL/FLOAT4).
1601 ///
1602 /// # Errors
1603 ///
1604 /// See [`add_bool`](Self::add_bool).
1605 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1606 if self.column_index >= self.column_count {
1607 return Err(Error::new("Too many columns in row"));
1608 }
1609 self.ensure_header();
1610 if self.current_column_nullable() {
1611 copy::write_f32(&mut self.buffer, value);
1612 } else {
1613 copy::write_f32_not_null(&mut self.buffer, value);
1614 }
1615 self.column_index += 1;
1616 Ok(())
1617 }
1618
1619 /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1620 ///
1621 /// # Errors
1622 ///
1623 /// See [`add_bool`](Self::add_bool).
1624 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1625 if self.column_index >= self.column_count {
1626 return Err(Error::new("Too many columns in row"));
1627 }
1628 self.ensure_header();
1629 if self.current_column_nullable() {
1630 copy::write_f64(&mut self.buffer, value);
1631 } else {
1632 copy::write_f64_not_null(&mut self.buffer, value);
1633 }
1634 self.column_index += 1;
1635 Ok(())
1636 }
1637
1638 /// Adds a string value (TEXT/VARCHAR).
1639 ///
1640 /// # Errors
1641 ///
1642 /// See [`add_bool`](Self::add_bool).
1643 pub fn add_str(&mut self, value: &str) -> Result<()> {
1644 self.add_bytes(value.as_bytes())
1645 }
1646
1647 /// Adds a bytes value (BYTEA).
1648 ///
1649 /// # Errors
1650 ///
1651 /// See [`add_bool`](Self::add_bool).
1652 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1653 if self.column_index >= self.column_count {
1654 return Err(Error::new("Too many columns in row"));
1655 }
1656 if value.len() > u32::MAX as usize {
1657 return Err(Error::new(format!(
1658 "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1659 value.len(),
1660 u32::MAX
1661 )));
1662 }
1663 self.ensure_header();
1664 if self.current_column_nullable() {
1665 copy::write_varbinary(&mut self.buffer, value);
1666 } else {
1667 copy::write_varbinary_not_null(&mut self.buffer, value);
1668 }
1669 self.column_index += 1;
1670 Ok(())
1671 }
1672
1673 /// Adds a 128-bit value (NUMERIC/INTERVAL).
1674 ///
1675 /// # Errors
1676 ///
1677 /// See [`add_bool`](Self::add_bool).
1678 pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1679 if self.column_index >= self.column_count {
1680 return Err(Error::new("Too many columns in row"));
1681 }
1682 self.ensure_header();
1683 if self.current_column_nullable() {
1684 copy::write_data128(&mut self.buffer, value);
1685 } else {
1686 copy::write_data128_not_null(&mut self.buffer, value);
1687 }
1688 self.column_index += 1;
1689 Ok(())
1690 }
1691
1692 /// Adds a Date value.
1693 ///
1694 /// # Errors
1695 ///
1696 /// See [`add_bool`](Self::add_bool).
1697 pub fn add_date(&mut self, value: Date) -> Result<()> {
1698 if self.column_index >= self.column_count {
1699 return Err(Error::new("Too many columns in row"));
1700 }
1701 self.ensure_header();
1702 let julian_day = value.to_julian_day();
1703 if self.current_column_nullable() {
1704 copy::write_i32(&mut self.buffer, julian_day);
1705 } else {
1706 copy::write_i32_not_null(&mut self.buffer, julian_day);
1707 }
1708 self.column_index += 1;
1709 Ok(())
1710 }
1711
1712 /// Adds a Time value.
1713 ///
1714 /// # Errors
1715 ///
1716 /// See [`add_bool`](Self::add_bool).
1717 pub fn add_time(&mut self, value: Time) -> Result<()> {
1718 if self.column_index >= self.column_count {
1719 return Err(Error::new("Too many columns in row"));
1720 }
1721 self.ensure_header();
1722 let micros = value.to_microseconds();
1723 if self.current_column_nullable() {
1724 copy::write_i64(&mut self.buffer, micros);
1725 } else {
1726 copy::write_i64_not_null(&mut self.buffer, micros);
1727 }
1728 self.column_index += 1;
1729 Ok(())
1730 }
1731
1732 /// Adds a Timestamp value.
1733 ///
1734 /// # Errors
1735 ///
1736 /// See [`add_bool`](Self::add_bool).
1737 pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1738 if self.column_index >= self.column_count {
1739 return Err(Error::new("Too many columns in row"));
1740 }
1741 self.ensure_header();
1742 let micros = value.to_microseconds();
1743 if self.current_column_nullable() {
1744 copy::write_i64(&mut self.buffer, micros);
1745 } else {
1746 copy::write_i64_not_null(&mut self.buffer, micros);
1747 }
1748 self.column_index += 1;
1749 Ok(())
1750 }
1751
1752 /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1753 ///
1754 /// # Errors
1755 ///
1756 /// See [`add_bool`](Self::add_bool).
1757 pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1758 if self.column_index >= self.column_count {
1759 return Err(Error::new("Too many columns in row"));
1760 }
1761 self.ensure_header();
1762 let micros = value.to_microseconds_utc();
1763 if self.current_column_nullable() {
1764 copy::write_i64(&mut self.buffer, micros);
1765 } else {
1766 copy::write_i64_not_null(&mut self.buffer, micros);
1767 }
1768 self.column_index += 1;
1769 Ok(())
1770 }
1771
1772 /// Adds an Interval value.
1773 ///
1774 /// # Errors
1775 ///
1776 /// See [`add_bool`](Self::add_bool).
1777 pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1778 if self.column_index >= self.column_count {
1779 return Err(Error::new("Too many columns in row"));
1780 }
1781 self.ensure_header();
1782 let packed = value.to_packed();
1783 if self.current_column_nullable() {
1784 copy::write_data128(&mut self.buffer, &packed);
1785 } else {
1786 copy::write_data128_not_null(&mut self.buffer, &packed);
1787 }
1788 self.column_index += 1;
1789 Ok(())
1790 }
1791
1792 /// Ends the current row.
1793 ///
1794 /// Returns an error if the wrong number of columns were added.
1795 ///
1796 /// # Errors
1797 ///
1798 /// Returns [`Error::Other`] if fewer (or more) columns were supplied
1799 /// for this row than the chunk's column count.
1800 pub fn end_row(&mut self) -> Result<()> {
1801 if self.column_index != self.column_count {
1802 return Err(Error::new(format!(
1803 "Expected {} columns, got {}",
1804 self.column_count, self.column_index
1805 )));
1806 }
1807 self.column_index = 0;
1808 self.row_count += 1;
1809 Ok(())
1810 }
1811
1812 /// Returns the current column index (for checking incomplete rows).
1813 #[must_use]
1814 pub fn column_index(&self) -> usize {
1815 self.column_index
1816 }
1817
1818 /// Returns a reference to the internal buffer.
1819 pub(crate) fn buffer(&self) -> &BytesMut {
1820 &self.buffer
1821 }
1822}
1823
1824// =============================================================================
1825// ChunkSender - Mutex-protected sender for InsertChunks
1826// =============================================================================
1827
1828use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1829use std::sync::Mutex;
1830
1831/// A thread-safe sender for [`InsertChunk`]s.
1832///
1833/// `ChunkSender` manages the COPY protocol connection and ensures that only one
1834/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
1835/// the mutex ensures serialized access.
1836///
1837/// # Example
1838///
1839/// ```no_run
1840/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
1841/// use std::sync::mpsc;
1842/// use std::thread;
1843///
1844/// fn main() -> Result<()> {
1845/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
1846///
1847/// let table_def = TableDefinition::new("products")
1848/// .add_required_column("id", SqlType::int())
1849/// .add_nullable_column("name", SqlType::text());
1850///
1851/// Catalog::new(&conn).create_table(&table_def)?;
1852///
1853/// let sender = ChunkSender::new(&conn, &table_def)?;
1854/// let (tx, rx) = mpsc::channel::<InsertChunk>();
1855///
1856/// // Worker thread
1857/// let table_def_clone = table_def.clone();
1858/// let handle = thread::spawn(move || {
1859/// let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
1860/// for i in 0..1000i32 {
1861/// chunk.add_i32(i).unwrap();
1862/// chunk.add_str(&format!("Product {}", i)).unwrap();
1863/// chunk.end_row().unwrap();
1864/// }
1865/// tx.send(chunk).unwrap();
1866/// });
1867///
1868/// // Receive and send chunks
1869/// while let Ok(chunk) = rx.recv() {
1870/// sender.send_chunk(chunk)?;
1871/// }
1872///
1873/// handle.join().unwrap();
1874/// let rows = sender.finish()?;
1875/// println!("Inserted {} rows", rows);
1876/// Ok(())
1877/// }
1878/// ```
#[derive(Debug)]
pub struct ChunkSender<'conn> {
    /// Connection used to open the COPY session (lazily, on first send).
    connection: &'conn Connection,
    /// Qualified target table name for the COPY statement.
    table_name: String,
    /// Column names forwarded to `copy_in`.
    columns: Vec<String>,
    /// COPY writer; `None` until the first chunk is sent. The mutex
    /// serializes concurrent `send_chunk` calls.
    writer: Mutex<Option<CopyInWriter<'conn>>>,
    /// Set once the first chunk (which carries the HyperBinary header) has
    /// been written; later chunks have their header stripped.
    header_sent: std::sync::atomic::AtomicBool,
    /// Total rows sent so far (relaxed counter, informational).
    total_rows: AtomicU64,
    /// Total chunks sent so far (relaxed counter, informational).
    chunks_sent: AtomicUsize,
}
1889
1890impl<'conn> ChunkSender<'conn> {
1891 /// Creates a new chunk sender for the given table.
1892 ///
1893 /// # Errors
1894 ///
1895 /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1896 /// columns. The COPY session itself is opened lazily on the first
1897 /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1898 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1899 if table_def.column_count() == 0 {
1900 return Err(Error::InvalidTableDefinition(
1901 "Table definition must have at least one column".into(),
1902 ));
1903 }
1904
1905 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1906 let table_name = table_def.qualified_name();
1907
1908 Ok(ChunkSender {
1909 connection,
1910 table_name,
1911 columns,
1912 writer: Mutex::new(None),
1913 header_sent: std::sync::atomic::AtomicBool::new(false),
1914 total_rows: AtomicU64::new(0),
1915 chunks_sent: AtomicUsize::new(0),
1916 })
1917 }
1918
    /// Sends a chunk to Hyper.
    ///
    /// This method is thread-safe - multiple threads can call it concurrently,
    /// but only one chunk will be sent at a time.
    ///
    /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
    /// automatically handles headers: the first chunk's header is sent, and
    /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
    /// header per COPY stream).
    ///
    /// An empty chunk is a no-op and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock is poisoned, the connection does not
    /// support COPY, or sending fails.
    pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
        // Capture row count before take() resets it
        let row_count = chunk.row_count();

        // Empty chunk: nothing to send.
        let Some(buffer) = chunk.take() else {
            return Ok(());
        };

        // Acquire the lock for exclusive send access
        let mut writer_guard = self
            .writer
            .lock()
            .map_err(|_| Error::new("ChunkSender mutex poisoned"))?;

        // Lazily initialize the COPY connection
        if writer_guard.is_none() {
            let client = self.connection.tcp_client().ok_or_else(|| {
                Error::new("ChunkSender requires a TCP connection. gRPC connections do not support COPY operations.")
            })?;
            let columns: Vec<&str> = self
                .columns
                .iter()
                .map(std::string::String::as_str)
                .collect();
            *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
        }

        // Handle headers: only first chunk should have header in the COPY stream
        // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
        // strip headers from all chunks except the first one sent.
        // The swap happens while holding the writer lock, so the chunk that
        // wins "first" here is also the first one written to the stream.
        let is_first = !self.header_sent.swap(true, Ordering::SeqCst);

        let data_to_send = if is_first {
            // First chunk: send with header
            &buffer[..]
        } else {
            // Subsequent chunks: strip the 19-byte header if present
            if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
                && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
            {
                &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
            } else {
                &buffer[..]
            }
        };

        // Write the chunk directly to the socket, avoiding a full-chunk memcpy
        // into the connection's write buffer. flush_stream ensures the data
        // reaches the server before we return.
        if let Some(ref mut writer) = *writer_guard {
            writer.send_direct(data_to_send)?;
            writer.flush_stream()?;
        }

        // Release the send lock before updating counters; the atomics do
        // not need the lock.
        drop(writer_guard);
        self.total_rows
            .fetch_add(row_count as u64, Ordering::Relaxed);
        self.chunks_sent.fetch_add(1, Ordering::Relaxed);

        debug!(
            target: "hyperdb_api",
            chunk = self.chunks_sent.load(Ordering::Relaxed),
            rows = row_count,
            bytes = data_to_send.len(),
            "chunk-sender"
        );

        Ok(())
    }
2002
    /// Returns the total number of rows sent so far.
    ///
    /// Relaxed load; may momentarily lag concurrent `send_chunk` calls.
    pub fn total_rows(&self) -> u64 {
        self.total_rows.load(Ordering::Relaxed)
    }

    /// Returns the number of chunks sent so far.
    ///
    /// Relaxed load; may momentarily lag concurrent `send_chunk` calls.
    pub fn chunks_sent(&self) -> usize {
        self.chunks_sent.load(Ordering::Relaxed)
    }
2012
2013 /// Finishes the COPY operation and returns the total row count.
2014 ///
2015 /// This method consumes the sender. After calling this, the COPY operation
2016 /// is complete and all data has been committed.
2017 ///
2018 /// # Errors
2019 ///
2020 /// - Returns [`Error::Other`] with message `"ChunkSender mutex poisoned"`
2021 /// if a sender thread panicked while holding the writer lock.
2022 /// - Returns [`Error::Client`] or [`Error::Io`] if sending the COPY
2023 /// trailer or finishing the COPY operation fails.
2024 pub fn finish(self) -> Result<u64> {
2025 let mut writer_guard = self
2026 .writer
2027 .lock()
2028 .map_err(|_| Error::new("ChunkSender mutex poisoned"))?;
2029
2030 // If no chunks were sent, return 0
2031 let Some(writer) = writer_guard.take() else {
2032 return Ok(0);
2033 };
2034
2035 // Write and send the COPY trailer
2036 let mut trailer_buf = BytesMut::with_capacity(2);
2037 copy::write_trailer(&mut trailer_buf);
2038
2039 // Need to get mutable access to send trailer
2040 let mut writer = writer;
2041 writer.send(&trailer_buf)?;
2042
2043 // Finish the COPY operation
2044 let rows = writer.finish()?;
2045
2046 info!(
2047 target: "hyperdb_api",
2048 rows,
2049 chunks = self.chunks_sent.load(Ordering::Relaxed),
2050 table = %self.table_name,
2051 "chunk-sender-finish"
2052 );
2053
2054 Ok(rows)
2055 }
2056}
2057
2058#[cfg(test)]
2059mod tests {
2060 use crate::table_definition::TableDefinition;
2061 use hyperdb_api_core::types::SqlType;
2062
2063 use super::InsertChunk;
2064
2065 fn create_test_table_def() -> TableDefinition {
2066 TableDefinition::new("test")
2067 .add_required_column("id", SqlType::int())
2068 .add_nullable_column("name", SqlType::text())
2069 }
2070
2071 #[test]
2072 fn test_inserter_column_validation() {
2073 // We can't fully test without a connection, but we can test validation logic
2074 let table_def = create_test_table_def();
2075 assert_eq!(table_def.column_count(), 2);
2076 }
2077
2078 #[test]
2079 fn test_insert_chunk_encoding() {
2080 let table_def = create_test_table_def();
2081 let mut chunk = InsertChunk::from_table_definition(&table_def);
2082
2083 // Add a row
2084 chunk.add_i32(42).unwrap();
2085 chunk.add_str("hello").unwrap();
2086 chunk.end_row().unwrap();
2087
2088 assert_eq!(chunk.row_count(), 1);
2089 assert!(!chunk.is_empty());
2090 assert!(!chunk.should_flush()); // Not at limit yet
2091
2092 // Add more rows
2093 for i in 0..100 {
2094 chunk.add_i32(i).unwrap();
2095 chunk.add_str(&format!("item {i}")).unwrap();
2096 chunk.end_row().unwrap();
2097 }
2098
2099 assert_eq!(chunk.row_count(), 101);
2100
2101 // Take the buffer
2102 let buffer = chunk.take().unwrap();
2103 assert!(!buffer.is_empty());
2104
2105 // Chunk should now be empty after take
2106 assert!(chunk.take().is_none());
2107 }
2108
2109 #[test]
2110 fn test_insert_chunk_null_handling() {
2111 let table_def = create_test_table_def();
2112 let mut chunk = InsertChunk::from_table_definition(&table_def);
2113
2114 // First column is NOT NULL, should fail
2115 assert!(chunk.add_null().is_err());
2116
2117 // Add the required column first
2118 chunk.add_i32(1).unwrap();
2119
2120 // Second column is nullable, should succeed
2121 chunk.add_null().unwrap();
2122 chunk.end_row().unwrap();
2123
2124 assert_eq!(chunk.row_count(), 1);
2125 }
2126
2127 #[test]
2128 fn test_insert_chunk_column_count_validation() {
2129 let table_def = create_test_table_def();
2130 let mut chunk = InsertChunk::from_table_definition(&table_def);
2131
2132 // Add only one column
2133 chunk.add_i32(1).unwrap();
2134
2135 // end_row should fail
2136 assert!(chunk.end_row().is_err());
2137
2138 // Add second column
2139 chunk.add_str("test").unwrap();
2140
2141 // Now end_row should succeed
2142 chunk.end_row().unwrap();
2143 }
2144
2145 #[test]
2146 fn test_insert_chunk_too_many_columns() {
2147 let table_def = create_test_table_def();
2148 let mut chunk = InsertChunk::from_table_definition(&table_def);
2149
2150 chunk.add_i32(1).unwrap();
2151 chunk.add_str("test").unwrap();
2152
2153 // Third column should fail
2154 assert!(chunk.add_i32(2).is_err());
2155 }
2156
2157 #[test]
2158 fn test_insert_chunk_clear() {
2159 let table_def = create_test_table_def();
2160 let mut chunk = InsertChunk::from_table_definition(&table_def);
2161
2162 chunk.add_i32(1).unwrap();
2163 chunk.add_str("test").unwrap();
2164 chunk.end_row().unwrap();
2165
2166 assert_eq!(chunk.row_count(), 1);
2167
2168 chunk.clear();
2169
2170 assert_eq!(chunk.row_count(), 0);
2171 assert!(chunk.is_empty());
2172 }
2173}