hyperdb_api/inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//! inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
29use tracing::{debug, info};
30
31use crate::catalog::Catalog;
32use crate::connection::Connection;
33use crate::error::{Error, Result};
34use crate::table_definition::TableDefinition;
35
/// Starting capacity of a chunk buffer: 4 MiB.
///
/// Every COPY chunk is assembled in one contiguous buffer. Beginning at
/// 4 MiB spares typical workloads from repeated reallocations while the
/// first chunk fills, without committing an unreasonable amount of memory
/// up front.
const INITIAL_BUFFER_SIZE: usize = 4 << 20;

/// Byte threshold (16 MiB) at which a chunk is flushed to the server.
///
/// Two concerns pull in opposite directions:
/// - **Throughput**: bigger chunks spread the fixed per-chunk cost (COPY
///   header, network round-trip) over more data. Below roughly 1 MiB that
///   fixed cost becomes significant.
/// - **Memory**: a chunk has to be fully materialized before it is sent, so
///   16 MiB keeps resident memory bounded even for wide rows.
///
/// The value was picked empirically: it sits on the flat part of the
/// throughput curve, where growing the chunk further yields diminishing
/// returns.
const CHUNK_SIZE_LIMIT: usize = 16 << 20;

/// Row threshold at which a chunk is flushed to the server.
///
/// Secondary flush trigger next to [`CHUNK_SIZE_LIMIT`]. With narrow rows
/// (only a few bytes each) the byte limit alone would let millions of rows
/// pile up before anything is sent, delaying server-side processing. Capping
/// a chunk at 64K rows guarantees timely flushes whatever the row width, and
/// lines up with the 64K chunk size used for query result streaming
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
const CHUNK_ROW_LIMIT: usize = 64 * 1_000;
65
/// A high-performance bulk data inserter.
///
/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
/// using the COPY protocol with `HyperBinary` format for optimal performance.
///
/// Values are encoded into an in-memory chunk as they are added. The COPY
/// session itself is opened lazily, the first time a chunk is flushed or
/// when [`execute`](Inserter::execute) runs.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("users")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("name", SqlType::text());
///
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     for i in 0..1000i32 {
///         inserter.add_row(&[&i, &format!("User {}", i)])?;
///     }
///
///     let rows = inserter.execute()?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Inserter<'conn> {
    /// Connection on which the COPY session is opened.
    connection: &'conn Connection,
    /// Owned copy of the target table's definition (cloned at construction).
    table_def: TableDefinition,
    /// The current chunk being populated (delegates encoding).
    chunk: InsertChunk,
    /// Rows completed via `end_row`; reset to zero by `execute` and `cancel`,
    /// but not by `flush`.
    row_count: u64,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Active COPY writer (lazily initialized on first write).
    writer: Option<CopyInWriter<'conn>>,
    /// Start time for timing the insert operation.
    start_time: Instant,
}
111
112impl<'conn> Inserter<'conn> {
113 /// Creates a new inserter for the given table.
114 ///
115 /// The underlying COPY session is started lazily on the first flush or
116 /// execute, so construction is lightweight. However, the connection's
117 /// transport is validated eagerly — using a gRPC connection will return
118 /// an error immediately.
119 ///
120 /// # Errors
121 ///
122 /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
123 /// columns.
124 /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
125 /// (COPY is TCP-only).
126 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
127 if table_def.column_count() == 0 {
128 return Err(Error::invalid_table_definition(
129 "Table definition must have at least one column",
130 ));
131 }
132
133 // Fail fast: verify the connection supports COPY (TCP only)
134 if connection.tcp_client().is_none() {
135 return Err(Error::feature_not_supported(
136 "Inserter requires a TCP connection. \
137 gRPC connections do not support COPY operations.",
138 ));
139 }
140
141 Ok(Inserter {
142 connection,
143 table_def: table_def.clone(),
144 chunk: InsertChunk::from_table_definition(table_def),
145 row_count: 0,
146 chunk_count: 0,
147 writer: None,
148 start_time: Instant::now(),
149 })
150 }
151
152 /// Creates an inserter by querying the table schema from the database.
153 ///
154 /// This method queries the database to get the table definition automatically,
155 /// which is useful when you want to insert into an existing table without
156 /// manually specifying the schema.
157 ///
158 /// # Arguments
159 ///
160 /// * `connection` - The database connection.
161 /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
162 ///
163 /// # Errors
164 ///
165 /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
166 ///
167 /// # Example
168 ///
169 /// ```no_run
170 /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
171 ///
172 /// fn main() -> Result<()> {
173 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
174 ///
175 /// // Create a table first
176 /// conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
177 ///
178 /// // Create inserter by querying the schema directly from a string
179 /// let mut inserter = Inserter::from_table(&conn, "public.products")?;
180 ///
181 /// // Now we can insert data without knowing the exact schema
182 /// inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
183 /// inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
184 ///
185 /// let rows = inserter.execute()?;
186 /// println!("Inserted {} rows", rows);
187 /// Ok(())
188 /// }
189 /// ```
190 pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
191 where
192 T: TryInto<crate::TableName>,
193 crate::Error: From<T::Error>,
194 {
195 let catalog = Catalog::new(connection);
196 let table_def = catalog.get_table_definition(table_name)?;
197 Self::new(connection, &table_def)
198 }
199
200 /// Creates an inserter with column mappings that allow SQL expressions.
201 ///
202 /// This method uses a temporary table and INSERT...SELECT to support
203 /// column mappings with SQL expressions. Data is first inserted into
204 /// a temporary staging table, then transformed using the mappings.
205 ///
206 /// # Arguments
207 ///
208 /// * `connection` - The database connection.
209 /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
210 /// * `target_table` - The qualified name of the target table to insert into.
211 /// Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
212 /// * `mappings` - Column mappings defining how values are transformed.
213 ///
214 /// # Example
215 ///
216 /// ```no_run
217 /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
218 ///
219 /// fn main() -> Result<()> {
220 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
221 ///
222 /// // Target table with computed columns
223 /// conn.execute_command(r#"
224 /// CREATE TABLE orders (
225 /// id INT NOT NULL,
226 /// product TEXT,
227 /// quantity INT,
228 /// price DOUBLE PRECISION,
229 /// total DOUBLE PRECISION,
230 /// created_at TIMESTAMP
231 /// )
232 /// "#)?;
233 ///
234 /// // Inserter definition - what we provide
235 /// let inserter_def = TableDefinition::new("_stage")
236 /// .add_required_column("id", SqlType::int())
237 /// .add_nullable_column("product", SqlType::text())
238 /// .add_nullable_column("quantity", SqlType::int())
239 /// .add_nullable_column("price", SqlType::double());
240 ///
241 /// // Column mappings - how values are transformed
242 /// let mappings = vec![
243 /// ColumnMapping::new("id"),
244 /// ColumnMapping::new("product"),
245 /// ColumnMapping::new("quantity"),
246 /// ColumnMapping::new("price"),
247 /// ColumnMapping::with_expression("total", "quantity * price"),
248 /// ColumnMapping::with_expression("created_at", "NOW()"),
249 /// ];
250 ///
251 /// // For simple table names in the public schema, use quoted name
252 /// // For qualified names, use target_table_def.qualified_name()
253 /// let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
254 ///
255 /// inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
256 /// inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
257 ///
258 /// let rows = inserter.execute()?;
259 /// Ok(())
260 /// }
261 /// ```
262 ///
263 /// # Errors
264 ///
265 /// - Returns an error if `target_table` fails to convert into a
266 /// [`TableName`](crate::TableName).
267 /// - Returns [`Error::Server`] if creating the temporary staging table
268 /// fails on the server.
269 /// - Returns the errors from [`Inserter::new`] for the staging table
270 /// (zero-column table definition, gRPC transport).
271 pub fn with_column_mappings<T>(
272 connection: &'conn Connection,
273 inserter_def: &TableDefinition,
274 target_table: T,
275 mappings: &[ColumnMapping],
276 ) -> Result<MappedInserter<'conn>>
277 where
278 T: TryInto<crate::TableName>,
279 crate::Error: From<T::Error>,
280 {
281 MappedInserter::new(connection, inserter_def, target_table, mappings)
282 }
283
284 /// Returns the table definition.
285 pub fn table_definition(&self) -> &TableDefinition {
286 &self.table_def
287 }
288
289 /// Returns the number of columns.
290 #[must_use]
291 pub fn column_count(&self) -> usize {
292 self.table_def.column_count()
293 }
294
295 /// Returns the number of complete rows buffered.
296 #[must_use]
297 pub fn row_count(&self) -> u64 {
298 self.row_count
299 }
300
301 /// Adds a NULL value for the current column.
302 ///
303 /// # Errors
304 ///
305 /// Returns [`Error::InvalidTableDefinition`] if the current row already has all columns
306 /// supplied, or if the current column is marked `NOT NULL` in the table
307 /// definition.
308 #[inline]
309 pub fn add_null(&mut self) -> Result<()> {
310 self.chunk.add_null()
311 }
312
313 /// Adds a boolean value.
314 ///
315 /// # Errors
316 ///
317 /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
318 /// the current row already has all columns supplied.
319 #[inline]
320 pub fn add_bool(&mut self, value: bool) -> Result<()> {
321 self.chunk.add_bool(value)
322 }
323
324 /// Adds an i16 value (SMALLINT).
325 ///
326 /// # Errors
327 ///
328 /// See [`add_bool`](Self::add_bool).
329 #[inline]
330 pub fn add_i16(&mut self, value: i16) -> Result<()> {
331 self.chunk.add_i16(value)
332 }
333
334 /// Adds an i32 value (INT).
335 ///
336 /// # Errors
337 ///
338 /// See [`add_bool`](Self::add_bool).
339 #[inline]
340 pub fn add_i32(&mut self, value: i32) -> Result<()> {
341 self.chunk.add_i32(value)
342 }
343
344 /// Adds an i64 value (BIGINT).
345 ///
346 /// # Errors
347 ///
348 /// See [`add_bool`](Self::add_bool).
349 #[inline]
350 pub fn add_i64(&mut self, value: i64) -> Result<()> {
351 self.chunk.add_i64(value)
352 }
353
354 /// Adds an f32 value (REAL/FLOAT4).
355 ///
356 /// # Errors
357 ///
358 /// See [`add_bool`](Self::add_bool).
359 #[inline]
360 pub fn add_f32(&mut self, value: f32) -> Result<()> {
361 self.chunk.add_f32(value)
362 }
363
364 /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
365 ///
366 /// # Errors
367 ///
368 /// See [`add_bool`](Self::add_bool).
369 #[inline]
370 pub fn add_f64(&mut self, value: f64) -> Result<()> {
371 self.chunk.add_f64(value)
372 }
373
374 /// Adds a string value (TEXT/VARCHAR).
375 ///
376 /// # Errors
377 ///
378 /// See [`add_bool`](Self::add_bool).
379 #[inline]
380 pub fn add_str(&mut self, value: &str) -> Result<()> {
381 self.chunk.add_str(value)
382 }
383
384 /// Adds a bytes value (BYTEA).
385 ///
386 /// # Errors
387 ///
388 /// See [`add_bool`](Self::add_bool).
389 #[inline]
390 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
391 self.chunk.add_bytes(value)
392 }
393
394 /// Adds a 128-bit value (NUMERIC/INTERVAL).
395 ///
396 /// # Errors
397 ///
398 /// See [`add_bool`](Self::add_bool).
399 #[inline]
400 pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
401 self.chunk.add_data128(value)
402 }
403
404 /// Adds an optional value. If None, adds NULL.
405 ///
406 /// # Errors
407 ///
408 /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
409 /// return for the current row position.
410 pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
411 where
412 F: FnOnce(&mut Self, T) -> Result<()>,
413 {
414 match value {
415 Some(v) => add_fn(self, v),
416 None => self.add_null(),
417 }
418 }
419
420 /// Ends the current row.
421 ///
422 /// Returns an error if the wrong number of columns were added.
423 /// Automatically flushes the buffer if chunk limits are reached.
424 ///
425 /// # Errors
426 ///
427 /// - Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
428 /// than the table definition requires.
429 /// - Returns any error from [`flush`](Self::flush) when an automatic
430 /// flush is triggered by reaching the chunk byte/row limit.
431 pub fn end_row(&mut self) -> Result<()> {
432 self.chunk.end_row()?;
433 self.row_count += 1;
434
435 // Auto-flush if we've reached chunk limits
436 if self.chunk.should_flush() {
437 self.flush()?;
438 }
439
440 Ok(())
441 }
442
443 /// Flushes the current buffer to the server.
444 ///
445 /// This sends all buffered rows as a chunk and resets the buffer.
446 /// Called automatically when chunk limits are reached.
447 ///
448 /// # Errors
449 ///
450 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
451 /// (COPY is TCP-only) and no COPY session exists yet.
452 /// - Returns [`Error::Server`] if the server rejects the `COPY IN` start
453 /// or the subsequent data send.
454 /// - Returns [`Error::Io`] on transport-level I/O failures while writing
455 /// the chunk.
456 pub fn flush(&mut self) -> Result<()> {
457 if self.chunk.is_empty() {
458 return Ok(());
459 }
460
461 let chunk_rows = self.chunk.row_count();
462 let Some(buffer) = self.chunk.take() else {
463 return Ok(());
464 };
465
466 // Ensure the COPY connection is started
467 if self.writer.is_none() {
468 let client = self.connection.tcp_client().ok_or_else(|| {
469 crate::Error::feature_not_supported(
470 "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
471 )
472 })?;
473 let columns: Vec<&str> = self
474 .table_def
475 .columns
476 .iter()
477 .map(|c| c.name.as_str())
478 .collect();
479 let table_name = self.table_def.qualified_name();
480 self.writer = Some(client.copy_in(&table_name, &columns)?);
481 }
482
483 // Write the chunk directly to the socket, avoiding a full-chunk memcpy
484 // into the connection's write buffer. flush_stream ensures the data
485 // reaches the server before we return.
486 if let Some(ref mut writer) = self.writer {
487 writer.send_direct(&buffer)?;
488 writer.flush_stream()?;
489 }
490
491 debug!(
492 target: "hyperdb_api",
493 chunk = self.chunk_count,
494 rows = chunk_rows,
495 bytes = buffer.len(),
496 "inserter-chunk"
497 );
498
499 self.chunk_count += 1;
500 Ok(())
501 }
502
503 /// Adds a complete row of values.
504 ///
505 /// This is a convenience method that adds all column values at once
506 /// using the `IntoValue` trait for type-safe insertion.
507 ///
508 /// # Arguments
509 ///
510 /// * `values` - A slice of values implementing `IntoValue`.
511 ///
512 /// # Errors
513 ///
514 /// Returns an error if the number of values doesn't match the column count,
515 /// or if any value cannot be added.
516 ///
517 /// # Example
518 ///
519 /// ```no_run
520 /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
521 ///
522 /// fn main() -> Result<()> {
523 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
524 ///
525 /// let table_def = TableDefinition::new("users")
526 /// .add_required_column("id", SqlType::int())
527 /// .add_nullable_column("name", SqlType::text());
528 ///
529 /// Catalog::new(&conn).create_table(&table_def)?;
530 ///
531 /// let mut inserter = Inserter::new(&conn, &table_def)?;
532 ///
533 /// // Add rows using IntoValue trait
534 /// inserter.add_row(&[&1i32, &"Alice"])?;
535 /// inserter.add_row(&[&2i32, &"Bob"])?;
536 ///
537 /// // Option<T> can be used for nullable columns
538 /// inserter.add_row(&[&3i32, &None::<&str>])?;
539 ///
540 /// let rows = inserter.execute()?;
541 /// Ok(())
542 /// }
543 /// ```
544 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
545 let column_count = self.table_def.column_count();
546 if values.len() != column_count {
547 return Err(Error::invalid_table_definition(format!(
548 "Column count mismatch: expected {} columns but got {}",
549 column_count,
550 values.len()
551 )));
552 }
553
554 for value in values {
555 value.add_to_inserter(self)?;
556 }
557
558 self.end_row()?;
559 Ok(())
560 }
561
562 /// Adds a Date value.
563 ///
564 /// # Errors
565 ///
566 /// See [`add_bool`](Self::add_bool).
567 #[inline]
568 pub fn add_date(&mut self, value: Date) -> Result<()> {
569 self.chunk.add_date(value)
570 }
571
572 /// Adds a Time value.
573 ///
574 /// # Errors
575 ///
576 /// See [`add_bool`](Self::add_bool).
577 #[inline]
578 pub fn add_time(&mut self, value: Time) -> Result<()> {
579 self.chunk.add_time(value)
580 }
581
582 /// Adds a Timestamp value.
583 ///
584 /// # Errors
585 ///
586 /// See [`add_bool`](Self::add_bool).
587 #[inline]
588 pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
589 self.chunk.add_timestamp(value)
590 }
591
592 /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
593 ///
594 /// # Errors
595 ///
596 /// See [`add_bool`](Self::add_bool).
597 #[inline]
598 pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
599 self.chunk.add_offset_timestamp(value)
600 }
601
602 /// Adds an Interval value.
603 ///
604 /// # Errors
605 ///
606 /// See [`add_bool`](Self::add_bool).
607 #[inline]
608 pub fn add_interval(&mut self, value: Interval) -> Result<()> {
609 self.chunk.add_interval(value)
610 }
611
612 /// Adds a Numeric value.
613 ///
614 /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
615 /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
616 ///
617 /// # Errors
618 ///
619 /// Returns an error if the column's precision cannot be determined from the
620 /// table definition. Ensure that NUMERIC columns are defined with explicit
621 /// `SqlType` information including precision.
622 pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
623 let column_index = self.chunk.column_index();
624
625 // Check the column's precision to determine storage format
626 let precision = self
627 .table_def
628 .columns
629 .get(column_index)
630 .and_then(super::table_definition::ColumnDefinition::sql_type)
631 .and_then(|t| t.precision())
632 .ok_or_else(|| {
633 let col_name = self
634 .table_def
635 .columns
636 .get(column_index)
637 .map_or("<unknown>", |c| c.name.as_str());
638 Error::conversion(format!(
639 "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
640 Ensure the column is defined with explicit SqlType including precision.\n\n\
641 Example fix:\n \
642 table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
643 ))
644 })?;
645
646 if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
647 // Small numeric: stored as i64
648 let unscaled = value.unscaled_value();
649 let narrowed = i64::try_from(unscaled).map_err(|_| {
650 Error::conversion(format!(
651 "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
652 ))
653 })?;
654 self.chunk.add_i64(narrowed)
655 } else {
656 // Big numeric: stored as 128-bit
657 self.chunk.add_data128(&value.to_packed())
658 }
659 }
660
661 /// Executes the insert and commits all buffered rows.
662 ///
663 /// This sends any remaining buffered data and finishes the COPY operation.
664 /// Returns the number of rows inserted.
665 ///
666 /// The inserter is single-use: calling `execute` a second time returns
667 /// `Ok(0)` because the internal row counter has been reset and no further
668 /// data has been added. To insert additional batches, create a new
669 /// [`Inserter`].
670 ///
671 /// # Errors
672 ///
673 /// Returns an error if:
674 /// - There's an incomplete row (`column_index` != 0)
675 /// - The COPY connection fails to start
676 /// - Sending data fails
677 pub fn execute(&mut self) -> Result<u64> {
678 if self.chunk.column_index() != 0 {
679 return Err(Error::invalid_table_definition(
680 "Incomplete row at execute time",
681 ));
682 }
683
684 if self.row_count == 0 {
685 return Ok(0);
686 }
687
688 // Ensure COPY connection exists before proceeding when we have rows
689 if self.writer.is_none() {
690 let client = self.connection.tcp_client().ok_or_else(|| {
691 Error::feature_not_supported(
692 "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
693 )
694 })?;
695 let columns: Vec<&str> = self
696 .table_def
697 .columns
698 .iter()
699 .map(|c| c.name.as_str())
700 .collect();
701 let table_name = self.table_def.qualified_name();
702 self.writer = Some(client.copy_in(&table_name, &columns)?);
703 }
704
705 // At this point, writer must exist since we have rows
706 let writer = self
707 .writer
708 .as_mut()
709 .ok_or_else(|| Error::internal("Failed to initialize COPY connection for inserter"))?;
710
711 // If we have buffered data that hasn't been sent yet
712 if !self.chunk.is_empty() {
713 writer.send(self.chunk.buffer())?;
714 }
715
716 // Write and send the COPY trailer
717 let mut trailer_buf = BytesMut::with_capacity(2);
718 copy::write_trailer(&mut trailer_buf);
719 writer.send(&trailer_buf)?;
720
721 // Finish the COPY operation
722 let rows = self
723 .writer
724 .take()
725 .map(hyperdb_api_core::client::CopyInWriter::finish)
726 .transpose()?
727 .unwrap_or(0);
728
729 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
730 info!(
731 target: "hyperdb_api",
732 rows,
733 chunks = self.chunk_count,
734 duration_ms,
735 table = %self.table_def.qualified_name(),
736 "inserter-end"
737 );
738
739 // Reset row counter so a stray second execute() call returns Ok(0)
740 // instead of attempting another COPY trailer on a finished writer.
741 self.row_count = 0;
742
743 Ok(rows)
744 }
745
746 /// Cancels the insert and discards all buffered rows.
747 pub fn cancel(&mut self) {
748 // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
749 // sends a CopyFail to the server.
750 self.writer = None;
751 self.row_count = 0;
752 }
753}
754
755// =============================================================================
756// ColumnMapping
757// =============================================================================
758
/// Describes where a target column's value comes from during insertion.
///
/// A mapping is one of two kinds:
/// - a direct mapping, where the value is taken as-is from the inserter
///   stream, or
/// - an expression mapping, where the value is computed by a SQL expression
///   (for example server-side functions like `NOW()` or `DEFAULT`).
///
/// # Example
///
/// ```
/// use hyperdb_api::ColumnMapping;
///
/// // Simple column - insert value directly
/// let id_col = ColumnMapping::new("id");
///
/// // Column with expression - computed value
/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// The name of the target column.
    pub column_name: String,
    /// Optional SQL expression. If None, the value is inserted directly.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Creates a direct mapping: the column receives the value supplied to
    /// the inserter unchanged.
    pub fn new(column_name: impl Into<String>) -> Self {
        let column_name = column_name.into();
        Self {
            column_name,
            expression: None,
        }
    }

    /// Creates a mapping whose value is computed by a SQL expression.
    ///
    /// The expression can reference other columns or use SQL functions. It
    /// is placed into the generated select list verbatim.
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - A SQL expression to compute the column value.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// // Use current timestamp
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    ///
    /// // Compute from other columns
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let expression = Some(expression.into());
        Self {
            column_name,
            expression,
        }
    }

    /// Name of the target column.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// The SQL expression, or `None` for a direct mapping.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// `true` when the value is inserted directly (no expression).
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression.is_none()
    }

    /// Renders this mapping as one item of a select list.
    ///
    /// The column name is wrapped in double quotes with embedded quotes
    /// doubled. A direct mapping yields just the quoted name; an expression
    /// mapping yields `<expression> AS <quoted name>`.
    fn to_select_item(&self) -> String {
        let quoted_name = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        match self.expression.as_deref() {
            Some(expr) => format!("{expr} AS {quoted_name}"),
            None => quoted_name,
        }
    }
}
852
853// =============================================================================
854// IntoValue Trait
855// =============================================================================
856
/// Trait for types that can be inserted into a Hyper table.
///
/// This trait is implemented for common Rust types, allowing them to be
/// used with [`Inserter::add_row()`] for type-safe insertion. An
/// implementation writes exactly one column value by calling the matching
/// `Inserter::add_*` method.
///
/// # Supported Types
///
/// - Integers: `i16`, `i32`, `i64`
/// - Floats: `f32`, `f64`
/// - `bool`
/// - `&str`, `String`
/// - `Option<T>` where `T: IntoValue` (for nullable columns; `None` is
///   written as NULL)
/// - Date/time types: `Date`, `Time`, `Timestamp`, `OffsetTimestamp`, `Interval`
/// - `Numeric`, `Geography`, `Oid`, `Vec<u8>` and `[u8]` (bytes)
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("example")
///         .add_required_column("a", SqlType::int())
///         .add_nullable_column("b", SqlType::text())
///         .add_nullable_column("c", SqlType::double());
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let mut inserter = Inserter::new(&conn, &table_def)?;
///
///     // IntoValue allows adding rows with mixed types
///     inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
///     inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
///
///     inserter.execute()?;
///     Ok(())
/// }
/// ```
pub trait IntoValue {
    /// Adds this value to the inserter as the next column of the current row.
    ///
    /// # Errors
    ///
    /// Implementations call the matching `Inserter::add_*` method and
    /// forward its error — see [`Inserter::add_bool`] for the shared
    /// failure modes (too many columns, NULL into non-nullable, etc).
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
}
906
907// Implementations for basic types
908
909impl IntoValue for bool {
910 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
911 inserter.add_bool(*self)
912 }
913}
914
915impl IntoValue for i16 {
916 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
917 inserter.add_i16(*self)
918 }
919}
920
921impl IntoValue for i32 {
922 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
923 inserter.add_i32(*self)
924 }
925}
926
927impl IntoValue for i64 {
928 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
929 inserter.add_i64(*self)
930 }
931}
932
933impl IntoValue for f32 {
934 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
935 inserter.add_f32(*self)
936 }
937}
938
939impl IntoValue for f64 {
940 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
941 inserter.add_f64(*self)
942 }
943}
944
945impl IntoValue for str {
946 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
947 inserter.add_str(self)
948 }
949}
950
951impl IntoValue for String {
952 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
953 inserter.add_str(self)
954 }
955}
956
957impl IntoValue for [u8] {
958 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
959 inserter.add_bytes(self)
960 }
961}
962
963impl IntoValue for Vec<u8> {
964 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
965 inserter.add_bytes(self)
966 }
967}
968
969// Hyper-specific types
970
971impl IntoValue for Date {
972 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
973 inserter.add_date(*self)
974 }
975}
976
977impl IntoValue for Time {
978 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
979 inserter.add_time(*self)
980 }
981}
982
983impl IntoValue for Timestamp {
984 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
985 inserter.add_timestamp(*self)
986 }
987}
988
989impl IntoValue for OffsetTimestamp {
990 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
991 inserter.add_offset_timestamp(*self)
992 }
993}
994
995impl IntoValue for Interval {
996 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
997 inserter.add_interval(*self)
998 }
999}
1000
1001impl IntoValue for Numeric {
1002 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1003 inserter.add_numeric(*self)
1004 }
1005}
1006
1007// Option<T> for nullable values
1008impl<T: IntoValue> IntoValue for Option<T> {
1009 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1010 match self {
1011 Some(value) => value.add_to_inserter(inserter),
1012 None => inserter.add_null(),
1013 }
1014 }
1015}
1016
1017// Reference implementations for primitives
1018
1019impl IntoValue for &bool {
1020 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021 inserter.add_bool(**self)
1022 }
1023}
1024
1025impl IntoValue for &i16 {
1026 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1027 inserter.add_i16(**self)
1028 }
1029}
1030
1031impl IntoValue for &i32 {
1032 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1033 inserter.add_i32(**self)
1034 }
1035}
1036
1037impl IntoValue for &i64 {
1038 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039 inserter.add_i64(**self)
1040 }
1041}
1042
1043impl IntoValue for &f32 {
1044 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045 inserter.add_f32(**self)
1046 }
1047}
1048
1049impl IntoValue for &f64 {
1050 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051 inserter.add_f64(**self)
1052 }
1053}
1054
1055impl IntoValue for &String {
1056 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057 inserter.add_str(self)
1058 }
1059}
1060
1061impl IntoValue for &str {
1062 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063 inserter.add_str(self)
1064 }
1065}
1066
1067impl IntoValue for &&str {
1068 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069 inserter.add_str(self)
1070 }
1071}
1072
1073impl IntoValue for &[u8] {
1074 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075 inserter.add_bytes(self)
1076 }
1077}
1078
1079impl IntoValue for &Date {
1080 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081 inserter.add_date(**self)
1082 }
1083}
1084
1085impl IntoValue for &Time {
1086 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087 inserter.add_time(**self)
1088 }
1089}
1090
1091impl IntoValue for &Timestamp {
1092 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093 inserter.add_timestamp(**self)
1094 }
1095}
1096
1097impl IntoValue for &OffsetTimestamp {
1098 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099 inserter.add_offset_timestamp(**self)
1100 }
1101}
1102
1103impl IntoValue for &Interval {
1104 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105 inserter.add_interval(**self)
1106 }
1107}
1108
1109impl IntoValue for &Numeric {
1110 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1111 inserter.add_numeric(**self)
1112 }
1113}
1114
1115// =============================================================================
1116// MappedInserter
1117// =============================================================================
1118
/// An inserter that supports SQL expression mappings.
///
/// This inserter uses a staging table to support computed columns via
/// INSERT...SELECT with SQL expressions. It's created by
/// [`Inserter::with_column_mappings`].
///
/// Rows are buffered into the staging table through the wrapped
/// [`Inserter`]; [`MappedInserter::execute`] then copies them into the target
/// table, applying the column mappings, and drops the staging table.
#[derive(Debug)]
pub struct MappedInserter<'conn> {
    /// The underlying inserter for the staging table.
    inner: Inserter<'conn>,
    /// The target table name (destination of the final `INSERT ... SELECT`).
    target_table: crate::TableName,
    /// The column mappings; each one contributes a target column and a
    /// select item to the final `INSERT ... SELECT`.
    mappings: Vec<ColumnMapping>,
    /// The staging table name (unquoted; it is escaped when SQL is built).
    staging_table: String,
}
1135
1136impl<'conn> MappedInserter<'conn> {
1137 /// Creates a new mapped inserter.
1138 fn new<T>(
1139 connection: &'conn Connection,
1140 inserter_def: &TableDefinition,
1141 target_table: T,
1142 mappings: &[ColumnMapping],
1143 ) -> Result<Self>
1144 where
1145 T: TryInto<crate::TableName>,
1146 crate::Error: From<T::Error>,
1147 {
1148 let target_table = target_table.try_into()?;
1149
1150 // Create a unique staging table name
1151 let staging_table = format!("_hyper_staging_{}", std::process::id());
1152
1153 // Create the staging table definition (temporary)
1154 let mut staging_def = inserter_def.clone();
1155 staging_def.name.clone_from(&staging_table);
1156
1157 // Create the staging table
1158 let create_sql = staging_def.to_create_sql(true)?;
1159 let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1160 connection.execute_command(&create_temp)?;
1161
1162 // Create the inner inserter for the staging table
1163 let inner = Inserter::new(connection, &staging_def)?;
1164
1165 Ok(MappedInserter {
1166 inner,
1167 target_table,
1168 mappings: mappings.to_vec(),
1169 staging_table,
1170 })
1171 }
1172
1173 /// Adds a row of values to the inserter.
1174 ///
1175 /// The values should correspond to the columns in the inserter definition,
1176 /// not the target table.
1177 ///
1178 /// # Errors
1179 ///
1180 /// Forwards the error from [`Inserter::add_row`].
1181 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1182 self.inner.add_row(values)
1183 }
1184
1185 /// Adds a NULL value.
1186 ///
1187 /// # Errors
1188 ///
1189 /// Forwards the error from [`Inserter::add_null`].
1190 pub fn add_null(&mut self) -> Result<()> {
1191 self.inner.add_null()
1192 }
1193
1194 /// Adds a boolean value.
1195 ///
1196 /// # Errors
1197 ///
1198 /// Forwards the error from [`Inserter::add_bool`].
1199 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1200 self.inner.add_bool(value)
1201 }
1202
1203 /// Adds an i16 value.
1204 ///
1205 /// # Errors
1206 ///
1207 /// Forwards the error from [`Inserter::add_i16`].
1208 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1209 self.inner.add_i16(value)
1210 }
1211
1212 /// Adds an i32 value.
1213 ///
1214 /// # Errors
1215 ///
1216 /// Forwards the error from [`Inserter::add_i32`].
1217 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1218 self.inner.add_i32(value)
1219 }
1220
1221 /// Adds an i64 value.
1222 ///
1223 /// # Errors
1224 ///
1225 /// Forwards the error from [`Inserter::add_i64`].
1226 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1227 self.inner.add_i64(value)
1228 }
1229
1230 /// Adds an f32 value.
1231 ///
1232 /// # Errors
1233 ///
1234 /// Forwards the error from [`Inserter::add_f32`].
1235 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1236 self.inner.add_f32(value)
1237 }
1238
1239 /// Adds an f64 value.
1240 ///
1241 /// # Errors
1242 ///
1243 /// Forwards the error from [`Inserter::add_f64`].
1244 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1245 self.inner.add_f64(value)
1246 }
1247
1248 /// Adds a string value.
1249 ///
1250 /// # Errors
1251 ///
1252 /// Forwards the error from [`Inserter::add_str`].
1253 pub fn add_str(&mut self, value: &str) -> Result<()> {
1254 self.inner.add_str(value)
1255 }
1256
1257 /// Adds a bytes value.
1258 ///
1259 /// # Errors
1260 ///
1261 /// Forwards the error from [`Inserter::add_bytes`].
1262 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1263 self.inner.add_bytes(value)
1264 }
1265
1266 /// Ends the current row.
1267 ///
1268 /// # Errors
1269 ///
1270 /// Forwards the error from [`Inserter::end_row`].
1271 pub fn end_row(&mut self) -> Result<()> {
1272 self.inner.end_row()
1273 }
1274
1275 /// Executes the insert with column mappings.
1276 ///
1277 /// This method:
1278 /// 1. Inserts all buffered rows into the staging table
1279 /// 2. Executes INSERT...SELECT from staging to target with mappings
1280 /// 3. Drops the staging table
1281 ///
1282 /// Returns the number of rows inserted into the target table.
1283 ///
1284 /// # Errors
1285 ///
1286 /// - Returns the error from the inner [`Inserter::execute`] if writing
1287 /// the staging rows fails.
1288 /// - Returns [`Error::Server`] if the `INSERT ... SELECT` from staging
1289 /// to the target table is rejected (e.g. a mapping expression fails
1290 /// to evaluate).
1291 /// - Returns [`Error::Server`] if dropping the staging table fails.
1292 pub fn execute(&mut self) -> Result<u64> {
1293 let connection = self.inner.connection;
1294 let staging_table = self.staging_table.clone();
1295
1296 // Insert data into staging table
1297 let _staging_rows = self.inner.execute()?;
1298
1299 // Build the INSERT...SELECT statement
1300 use hyperdb_api_core::protocol::escape::SqlIdentifier;
1301
1302 let target_columns: Vec<String> = self
1303 .mappings
1304 .iter()
1305 .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1306 .collect();
1307
1308 let select_items: Vec<String> = self
1309 .mappings
1310 .iter()
1311 .map(ColumnMapping::to_select_item)
1312 .collect();
1313
1314 let sql = format!(
1315 "INSERT INTO {} ({}) SELECT {} FROM {}",
1316 self.target_table,
1317 target_columns.join(", "),
1318 select_items.join(", "),
1319 SqlIdentifier(&staging_table),
1320 );
1321
1322 // Execute the INSERT...SELECT (returns row count directly)
1323 let row_count = connection.execute_command(&sql)?;
1324
1325 // Drop the staging table
1326 connection.execute_command(&format!(
1327 "DROP TABLE IF EXISTS {}",
1328 SqlIdentifier(&staging_table)
1329 ))?;
1330
1331 // Return the number of rows inserted
1332 Ok(row_count)
1333 }
1334
1335 /// Cancels the insert and drops the staging table.
1336 ///
1337 /// This method handles cleanup failures gracefully by logging warnings
1338 /// instead of returning errors. This prevents masking the original error
1339 /// that caused the cancellation.
1340 ///
1341 /// # Logging
1342 ///
1343 /// Cleanup failures are logged using the `tracing` crate at WARN level.
1344 /// If `tracing` is not initialized, errors are written to stderr.
1345 pub fn cancel(&mut self) {
1346 let connection = self.inner.connection;
1347 let staging_table = &self.staging_table;
1348
1349 // Drop the staging table, but don't fail if cleanup fails
1350 // This avoids masking the original error that caused cancellation
1351 if let Err(e) = connection.execute_command(&format!(
1352 "DROP TABLE IF EXISTS \"{}\"",
1353 staging_table.replace('"', "\"\"")
1354 )) {
1355 // Log the cleanup failure for debugging
1356 // In production, consider using a logging framework like `tracing`
1357 eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1358 }
1359 }
1360}
1361
1362// =============================================================================
1363// InsertChunk - Thread-safe chunk for parallel encoding
1364// =============================================================================
1365
/// A thread-safe chunk for encoding rows in parallel.
///
/// `InsertChunk` can be created and populated in any thread, then sent to a
/// [`ChunkSender`] for transmission. This enables parallel data encoding across
/// multiple worker threads while serializing the actual network sends.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
///
/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
///     let mut chunk = InsertChunk::from_table_definition(table_def);
///
///     for i in 0..1000 {
///         chunk.add_i32(start_id + i)?;
///         chunk.add_str(&format!("Item {}", start_id + i))?;
///         chunk.end_row()?;
///     }
///
///     Ok(chunk)
/// }
/// ```
#[derive(Debug)]
pub struct InsertChunk {
    /// Encoded bytes accumulated so far: the header (once written) followed
    /// by the values added through the `add_*` methods.
    buffer: BytesMut,
    /// Whether the header has already been written into a buffer of this
    /// chunk. Reset by `clear`, deliberately kept by `take`.
    header_written: bool,
    /// Index of the next column to be written in the row under construction.
    column_index: usize,
    /// Number of columns every row must contain.
    column_count: usize,
    /// Number of rows completed via `end_row` since the last `take`/`clear`.
    row_count: usize,
    /// Per-column nullability; selects the nullable or not-null encoder for
    /// each value. A missing entry is treated as nullable.
    column_nullable: Vec<bool>,
}
1398
1399// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
1400// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
1401// thread-local state. The manual `unsafe impl` exists only because the
1402// auto-trait derivation is conservative for this struct's compilation context;
1403// the compound type has no `!Send` components.
1404unsafe impl Send for InsertChunk {}
1405// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
1406// and there is no interior mutability crossing a `&InsertChunk` boundary,
1407// so sharing `&InsertChunk` across threads is sound.
1408unsafe impl Sync for InsertChunk {}
1409
1410impl InsertChunk {
1411 /// Creates a new empty chunk with the given schema.
1412 ///
1413 /// # Arguments
1414 ///
1415 /// * `column_count` - Number of columns per row
1416 /// * `column_nullable` - Whether each column is nullable
1417 #[must_use]
1418 pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1419 debug_assert_eq!(column_count, column_nullable.len());
1420 InsertChunk {
1421 buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1422 header_written: false,
1423 column_index: 0,
1424 column_count,
1425 row_count: 0,
1426 column_nullable,
1427 }
1428 }
1429
1430 /// Creates a chunk from a table definition.
1431 #[must_use]
1432 pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1433 let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1434 Self::new(table_def.column_count(), column_nullable)
1435 }
1436
1437 /// Returns the number of complete rows in this chunk.
1438 #[must_use]
1439 pub fn row_count(&self) -> usize {
1440 self.row_count
1441 }
1442
1443 /// Returns the current buffer size in bytes.
1444 #[must_use]
1445 pub fn buffer_size(&self) -> usize {
1446 self.buffer.len()
1447 }
1448
1449 /// Returns true if the chunk has reached size or row limits and should be sent.
1450 #[must_use]
1451 pub fn should_flush(&self) -> bool {
1452 self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1453 }
1454
1455 /// Returns true if the chunk is empty (no rows).
1456 #[must_use]
1457 pub fn is_empty(&self) -> bool {
1458 self.row_count == 0
1459 }
1460
1461 /// Takes the buffer, consuming the chunk data.
1462 ///
1463 /// Returns `None` if the chunk is empty. After calling this, the chunk
1464 /// can be reused by calling the add_* methods again.
1465 ///
1466 /// Note: The header flag is NOT reset - subsequent chunks from the same
1467 /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1468 /// header per COPY stream).
1469 pub fn take(&mut self) -> Option<BytesMut> {
1470 if self.row_count == 0 {
1471 return None;
1472 }
1473 // Don't reset header_written - only first chunk should have header
1474 self.row_count = 0;
1475 Some(std::mem::take(&mut self.buffer))
1476 }
1477
1478 /// Resets the chunk for reuse without reallocating.
1479 pub fn clear(&mut self) {
1480 self.buffer.clear();
1481 self.header_written = false;
1482 self.column_index = 0;
1483 self.row_count = 0;
1484 }
1485
1486 #[allow(
1487 clippy::inline_always,
1488 reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1489 )]
1490 fn ensure_header(&mut self) {
1491 if !self.header_written {
1492 copy::write_header(&mut self.buffer);
1493 self.header_written = true;
1494 }
1495 }
1496
1497 #[expect(
1498 clippy::inline_always,
1499 reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1500 )]
1501 #[inline(always)]
1502 fn current_column_nullable(&self) -> bool {
1503 *self.column_nullable.get(self.column_index).unwrap_or(&true)
1504 }
1505
1506 /// Adds a NULL value for the current column.
1507 ///
1508 /// # Errors
1509 ///
1510 /// - Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"`
1511 /// if the current row already has all columns supplied.
1512 /// - Returns [`Error::InvalidTableDefinition`] with message
1513 /// `"Cannot add NULL to non-nullable column"` if the current column
1514 /// is `NOT NULL` in the schema.
1515 pub fn add_null(&mut self) -> Result<()> {
1516 if self.column_index >= self.column_count {
1517 return Err(Error::invalid_table_definition("Too many columns in row"));
1518 }
1519 if !self.current_column_nullable() {
1520 return Err(Error::invalid_table_definition(
1521 "Cannot add NULL to non-nullable column",
1522 ));
1523 }
1524 self.ensure_header();
1525 copy::write_null(&mut self.buffer);
1526 self.column_index += 1;
1527 Ok(())
1528 }
1529
1530 /// Adds a boolean value.
1531 ///
1532 /// # Errors
1533 ///
1534 /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
1535 /// the current row already has all columns supplied.
1536 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1537 if self.column_index >= self.column_count {
1538 return Err(Error::invalid_table_definition("Too many columns in row"));
1539 }
1540 self.ensure_header();
1541 let int_value = i8::from(value);
1542 if self.current_column_nullable() {
1543 copy::write_i8(&mut self.buffer, int_value);
1544 } else {
1545 copy::write_i8_not_null(&mut self.buffer, int_value);
1546 }
1547 self.column_index += 1;
1548 Ok(())
1549 }
1550
1551 /// Adds an i16 value (SMALLINT).
1552 ///
1553 /// # Errors
1554 ///
1555 /// See [`add_bool`](Self::add_bool).
1556 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1557 if self.column_index >= self.column_count {
1558 return Err(Error::invalid_table_definition("Too many columns in row"));
1559 }
1560 self.ensure_header();
1561 if self.current_column_nullable() {
1562 copy::write_i16(&mut self.buffer, value);
1563 } else {
1564 copy::write_i16_not_null(&mut self.buffer, value);
1565 }
1566 self.column_index += 1;
1567 Ok(())
1568 }
1569
1570 /// Adds an i32 value (INT).
1571 ///
1572 /// # Errors
1573 ///
1574 /// See [`add_bool`](Self::add_bool).
1575 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1576 if self.column_index >= self.column_count {
1577 return Err(Error::invalid_table_definition("Too many columns in row"));
1578 }
1579 self.ensure_header();
1580 if self.current_column_nullable() {
1581 copy::write_i32(&mut self.buffer, value);
1582 } else {
1583 copy::write_i32_not_null(&mut self.buffer, value);
1584 }
1585 self.column_index += 1;
1586 Ok(())
1587 }
1588
1589 /// Adds an i64 value (BIGINT).
1590 ///
1591 /// # Errors
1592 ///
1593 /// See [`add_bool`](Self::add_bool).
1594 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1595 if self.column_index >= self.column_count {
1596 return Err(Error::invalid_table_definition("Too many columns in row"));
1597 }
1598 self.ensure_header();
1599 if self.current_column_nullable() {
1600 copy::write_i64(&mut self.buffer, value);
1601 } else {
1602 copy::write_i64_not_null(&mut self.buffer, value);
1603 }
1604 self.column_index += 1;
1605 Ok(())
1606 }
1607
1608 /// Adds an f32 value (REAL/FLOAT4).
1609 ///
1610 /// # Errors
1611 ///
1612 /// See [`add_bool`](Self::add_bool).
1613 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1614 if self.column_index >= self.column_count {
1615 return Err(Error::invalid_table_definition("Too many columns in row"));
1616 }
1617 self.ensure_header();
1618 if self.current_column_nullable() {
1619 copy::write_f32(&mut self.buffer, value);
1620 } else {
1621 copy::write_f32_not_null(&mut self.buffer, value);
1622 }
1623 self.column_index += 1;
1624 Ok(())
1625 }
1626
1627 /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1628 ///
1629 /// # Errors
1630 ///
1631 /// See [`add_bool`](Self::add_bool).
1632 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1633 if self.column_index >= self.column_count {
1634 return Err(Error::invalid_table_definition("Too many columns in row"));
1635 }
1636 self.ensure_header();
1637 if self.current_column_nullable() {
1638 copy::write_f64(&mut self.buffer, value);
1639 } else {
1640 copy::write_f64_not_null(&mut self.buffer, value);
1641 }
1642 self.column_index += 1;
1643 Ok(())
1644 }
1645
1646 /// Adds a string value (TEXT/VARCHAR).
1647 ///
1648 /// # Errors
1649 ///
1650 /// See [`add_bool`](Self::add_bool).
1651 pub fn add_str(&mut self, value: &str) -> Result<()> {
1652 self.add_bytes(value.as_bytes())
1653 }
1654
1655 /// Adds a bytes value (BYTEA).
1656 ///
1657 /// # Errors
1658 ///
1659 /// See [`add_bool`](Self::add_bool).
1660 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1661 if self.column_index >= self.column_count {
1662 return Err(Error::invalid_table_definition("Too many columns in row"));
1663 }
1664 if value.len() > u32::MAX as usize {
1665 return Err(Error::conversion(format!(
1666 "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1667 value.len(),
1668 u32::MAX
1669 )));
1670 }
1671 self.ensure_header();
1672 if self.current_column_nullable() {
1673 copy::write_varbinary(&mut self.buffer, value);
1674 } else {
1675 copy::write_varbinary_not_null(&mut self.buffer, value);
1676 }
1677 self.column_index += 1;
1678 Ok(())
1679 }
1680
1681 /// Adds a 128-bit value (NUMERIC/INTERVAL).
1682 ///
1683 /// # Errors
1684 ///
1685 /// See [`add_bool`](Self::add_bool).
1686 pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1687 if self.column_index >= self.column_count {
1688 return Err(Error::invalid_table_definition("Too many columns in row"));
1689 }
1690 self.ensure_header();
1691 if self.current_column_nullable() {
1692 copy::write_data128(&mut self.buffer, value);
1693 } else {
1694 copy::write_data128_not_null(&mut self.buffer, value);
1695 }
1696 self.column_index += 1;
1697 Ok(())
1698 }
1699
1700 /// Adds a Date value.
1701 ///
1702 /// # Errors
1703 ///
1704 /// See [`add_bool`](Self::add_bool).
1705 pub fn add_date(&mut self, value: Date) -> Result<()> {
1706 if self.column_index >= self.column_count {
1707 return Err(Error::invalid_table_definition("Too many columns in row"));
1708 }
1709 self.ensure_header();
1710 let julian_day = value.to_julian_day();
1711 if self.current_column_nullable() {
1712 copy::write_i32(&mut self.buffer, julian_day);
1713 } else {
1714 copy::write_i32_not_null(&mut self.buffer, julian_day);
1715 }
1716 self.column_index += 1;
1717 Ok(())
1718 }
1719
1720 /// Adds a Time value.
1721 ///
1722 /// # Errors
1723 ///
1724 /// See [`add_bool`](Self::add_bool).
1725 pub fn add_time(&mut self, value: Time) -> Result<()> {
1726 if self.column_index >= self.column_count {
1727 return Err(Error::invalid_table_definition("Too many columns in row"));
1728 }
1729 self.ensure_header();
1730 let micros = value.to_microseconds();
1731 if self.current_column_nullable() {
1732 copy::write_i64(&mut self.buffer, micros);
1733 } else {
1734 copy::write_i64_not_null(&mut self.buffer, micros);
1735 }
1736 self.column_index += 1;
1737 Ok(())
1738 }
1739
1740 /// Adds a Timestamp value.
1741 ///
1742 /// # Errors
1743 ///
1744 /// See [`add_bool`](Self::add_bool).
1745 pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1746 if self.column_index >= self.column_count {
1747 return Err(Error::invalid_table_definition("Too many columns in row"));
1748 }
1749 self.ensure_header();
1750 let micros = value.to_microseconds();
1751 if self.current_column_nullable() {
1752 copy::write_i64(&mut self.buffer, micros);
1753 } else {
1754 copy::write_i64_not_null(&mut self.buffer, micros);
1755 }
1756 self.column_index += 1;
1757 Ok(())
1758 }
1759
1760 /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1761 ///
1762 /// # Errors
1763 ///
1764 /// See [`add_bool`](Self::add_bool).
1765 pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1766 if self.column_index >= self.column_count {
1767 return Err(Error::invalid_table_definition("Too many columns in row"));
1768 }
1769 self.ensure_header();
1770 let micros = value.to_microseconds_utc();
1771 if self.current_column_nullable() {
1772 copy::write_i64(&mut self.buffer, micros);
1773 } else {
1774 copy::write_i64_not_null(&mut self.buffer, micros);
1775 }
1776 self.column_index += 1;
1777 Ok(())
1778 }
1779
1780 /// Adds an Interval value.
1781 ///
1782 /// # Errors
1783 ///
1784 /// See [`add_bool`](Self::add_bool).
1785 pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1786 if self.column_index >= self.column_count {
1787 return Err(Error::invalid_table_definition("Too many columns in row"));
1788 }
1789 self.ensure_header();
1790 let packed = value.to_packed();
1791 if self.current_column_nullable() {
1792 copy::write_data128(&mut self.buffer, &packed);
1793 } else {
1794 copy::write_data128_not_null(&mut self.buffer, &packed);
1795 }
1796 self.column_index += 1;
1797 Ok(())
1798 }
1799
1800 /// Ends the current row.
1801 ///
1802 /// Returns an error if the wrong number of columns were added.
1803 ///
1804 /// # Errors
1805 ///
1806 /// Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
1807 /// for this row than the chunk's column count.
1808 pub fn end_row(&mut self) -> Result<()> {
1809 if self.column_index != self.column_count {
1810 return Err(Error::invalid_table_definition(format!(
1811 "Expected {} columns, got {}",
1812 self.column_count, self.column_index
1813 )));
1814 }
1815 self.column_index = 0;
1816 self.row_count += 1;
1817 Ok(())
1818 }
1819
1820 /// Returns the current column index (for checking incomplete rows).
1821 #[must_use]
1822 pub fn column_index(&self) -> usize {
1823 self.column_index
1824 }
1825
1826 /// Returns a reference to the internal buffer.
1827 pub(crate) fn buffer(&self) -> &BytesMut {
1828 &self.buffer
1829 }
1830}
1831
1832// =============================================================================
1833// ChunkSender - Mutex-protected sender for InsertChunks
1834// =============================================================================
1835
1836use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1837use std::sync::Mutex;
1838
/// A thread-safe sender for [`InsertChunk`]s.
///
/// `ChunkSender` manages the COPY protocol connection and ensures that only one
/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
/// the mutex ensures serialized access.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
/// use std::sync::mpsc;
/// use std::thread;
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("products")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("name", SqlType::text());
///
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let sender = ChunkSender::new(&conn, &table_def)?;
///     let (tx, rx) = mpsc::channel::<InsertChunk>();
///
///     // Worker thread
///     let table_def_clone = table_def.clone();
///     let handle = thread::spawn(move || {
///         let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
///         for i in 0..1000i32 {
///             chunk.add_i32(i).unwrap();
///             chunk.add_str(&format!("Product {}", i)).unwrap();
///             chunk.end_row().unwrap();
///         }
///         tx.send(chunk).unwrap();
///     });
///
///     // Receive and send chunks
///     while let Ok(chunk) = rx.recv() {
///         sender.send_chunk(chunk)?;
///     }
///
///     handle.join().unwrap();
///     let rows = sender.finish()?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct ChunkSender<'conn> {
    /// Connection whose TCP client opens the COPY session on first send.
    connection: &'conn Connection,
    /// Qualified name of the table being loaded.
    table_name: String,
    /// Column names, in table-definition order, passed to the COPY session.
    columns: Vec<String>,
    /// COPY writer, created lazily by the first non-empty `send_chunk`;
    /// the mutex serializes sends from multiple threads.
    writer: Mutex<Option<CopyInWriter<'conn>>>,
    /// Set once the first chunk has been handed to the writer; headers of
    /// all later chunks are stripped.
    header_sent: std::sync::atomic::AtomicBool,
    /// Running total of rows contained in the chunks sent so far.
    total_rows: AtomicU64,
    /// Number of chunks sent so far.
    chunks_sent: AtomicUsize,
}
1897
1898impl<'conn> ChunkSender<'conn> {
1899 /// Creates a new chunk sender for the given table.
1900 ///
1901 /// # Errors
1902 ///
1903 /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1904 /// columns. The COPY session itself is opened lazily on the first
1905 /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1906 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1907 if table_def.column_count() == 0 {
1908 return Err(Error::invalid_table_definition(
1909 "Table definition must have at least one column",
1910 ));
1911 }
1912
1913 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1914 let table_name = table_def.qualified_name();
1915
1916 Ok(ChunkSender {
1917 connection,
1918 table_name,
1919 columns,
1920 writer: Mutex::new(None),
1921 header_sent: std::sync::atomic::AtomicBool::new(false),
1922 total_rows: AtomicU64::new(0),
1923 chunks_sent: AtomicUsize::new(0),
1924 })
1925 }
1926
1927 /// Sends a chunk to Hyper.
1928 ///
1929 /// This method is thread-safe - multiple threads can call it concurrently,
1930 /// but only one chunk will be sent at a time.
1931 ///
1932 /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
1933 /// automatically handles headers: the first chunk's header is sent, and
1934 /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
1935 /// header per COPY stream).
1936 ///
1937 /// # Errors
1938 ///
1939 /// Returns an error if the chunk is empty or if sending fails.
1940 pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
1941 // Capture row count before take() resets it
1942 let row_count = chunk.row_count();
1943
1944 let Some(buffer) = chunk.take() else {
1945 return Ok(());
1946 };
1947
1948 // Acquire the lock for exclusive send access
1949 let mut writer_guard = self
1950 .writer
1951 .lock()
1952 .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
1953
1954 // Lazily initialize the COPY connection
1955 if writer_guard.is_none() {
1956 let client = self.connection.tcp_client().ok_or_else(|| {
1957 Error::feature_not_supported(
1958 "ChunkSender requires a TCP connection. gRPC connections do not support COPY operations."
1959 )
1960 })?;
1961 let columns: Vec<&str> = self
1962 .columns
1963 .iter()
1964 .map(std::string::String::as_str)
1965 .collect();
1966 *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
1967 }
1968
1969 // Handle headers: only first chunk should have header in the COPY stream
1970 // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
1971 // strip headers from all chunks except the first one sent.
1972 let is_first = !self.header_sent.swap(true, Ordering::SeqCst);
1973
1974 let data_to_send = if is_first {
1975 // First chunk: send with header
1976 &buffer[..]
1977 } else {
1978 // Subsequent chunks: strip the 19-byte header if present
1979 if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
1980 && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
1981 {
1982 &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
1983 } else {
1984 &buffer[..]
1985 }
1986 };
1987
1988 // Write the chunk directly to the socket, avoiding a full-chunk memcpy
1989 // into the connection's write buffer. flush_stream ensures the data
1990 // reaches the server before we return.
1991 if let Some(ref mut writer) = *writer_guard {
1992 writer.send_direct(data_to_send)?;
1993 writer.flush_stream()?;
1994 }
1995
1996 // Update counters (lock already released for these atomic ops)
1997 drop(writer_guard);
1998 self.total_rows
1999 .fetch_add(row_count as u64, Ordering::Relaxed);
2000 self.chunks_sent.fetch_add(1, Ordering::Relaxed);
2001
2002 debug!(
2003 target: "hyperdb_api",
2004 chunk = self.chunks_sent.load(Ordering::Relaxed),
2005 rows = row_count,
2006 bytes = data_to_send.len(),
2007 "chunk-sender"
2008 );
2009
2010 Ok(())
2011 }
2012
2013 /// Returns the total number of rows sent so far.
2014 pub fn total_rows(&self) -> u64 {
2015 self.total_rows.load(Ordering::Relaxed)
2016 }
2017
2018 /// Returns the number of chunks sent so far.
2019 pub fn chunks_sent(&self) -> usize {
2020 self.chunks_sent.load(Ordering::Relaxed)
2021 }
2022
2023 /// Finishes the COPY operation and returns the total row count.
2024 ///
2025 /// This method consumes the sender. After calling this, the COPY operation
2026 /// is complete and all data has been committed.
2027 ///
2028 /// # Errors
2029 ///
2030 /// - Returns [`Error::Internal`] with message `"ChunkSender mutex poisoned"`
2031 /// if a sender thread panicked while holding the writer lock.
2032 /// - Returns [`Error::Server`] or [`Error::Io`] if sending the COPY
2033 /// trailer or finishing the COPY operation fails.
2034 pub fn finish(self) -> Result<u64> {
2035 let mut writer_guard = self
2036 .writer
2037 .lock()
2038 .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
2039
2040 // If no chunks were sent, return 0
2041 let Some(writer) = writer_guard.take() else {
2042 return Ok(0);
2043 };
2044
2045 // Write and send the COPY trailer
2046 let mut trailer_buf = BytesMut::with_capacity(2);
2047 copy::write_trailer(&mut trailer_buf);
2048
2049 // Need to get mutable access to send trailer
2050 let mut writer = writer;
2051 writer.send(&trailer_buf)?;
2052
2053 // Finish the COPY operation
2054 let rows = writer.finish()?;
2055
2056 info!(
2057 target: "hyperdb_api",
2058 rows,
2059 chunks = self.chunks_sent.load(Ordering::Relaxed),
2060 table = %self.table_name,
2061 "chunk-sender-finish"
2062 );
2063
2064 Ok(rows)
2065 }
2066}
2067
#[cfg(test)]
mod tests {
    use crate::table_definition::TableDefinition;
    use hyperdb_api_core::types::SqlType;

    use super::InsertChunk;

    /// Builds the fixture table shared by the tests: a required integer
    /// `id` column followed by a nullable text `name` column.
    fn create_test_table_def() -> TableDefinition {
        let with_id = TableDefinition::new("test").add_required_column("id", SqlType::int());
        with_id.add_nullable_column("name", SqlType::text())
    }

    /// Checks the fixture's column count. Full inserter validation needs a
    /// connection, which is not available in a unit test.
    #[test]
    fn test_inserter_column_validation() {
        let schema = create_test_table_def();
        let columns = schema.column_count();
        assert_eq!(columns, 2);
    }

    /// Rows written to a chunk are counted, and the encoded buffer can be
    /// taken exactly once.
    #[test]
    fn test_insert_chunk_encoding() {
        let schema = create_test_table_def();
        let mut rows = InsertChunk::from_table_definition(&schema);

        // One complete row.
        rows.add_i32(42).unwrap();
        rows.add_str("hello").unwrap();
        rows.end_row().unwrap();

        assert_eq!(rows.row_count(), 1);
        assert!(!rows.is_empty());
        // A single small row is not enough to request a flush.
        assert!(!rows.should_flush());

        // One hundred further rows.
        for i in 0..100 {
            let label = format!("item {i}");
            rows.add_i32(i).unwrap();
            rows.add_str(&label).unwrap();
            rows.end_row().unwrap();
        }

        assert_eq!(rows.row_count(), 101);

        // The first take hands over a non-empty buffer...
        let encoded = rows.take().unwrap();
        assert!(!encoded.is_empty());

        // ...and a second take has nothing left to return.
        assert!(rows.take().is_none());
    }

    /// NULL is rejected for the required column and accepted for the
    /// nullable one.
    #[test]
    fn test_insert_chunk_null_handling() {
        let schema = create_test_table_def();
        let mut rows = InsertChunk::from_table_definition(&schema);

        // `id` is NOT NULL, so a NULL in first position must be refused.
        let rejected = rows.add_null();
        assert!(rejected.is_err());

        // Supply a real value for `id` instead.
        rows.add_i32(1).unwrap();

        // `name` is nullable, so NULL is fine here.
        rows.add_null().unwrap();
        rows.end_row().unwrap();

        assert_eq!(rows.row_count(), 1);
    }

    /// A row cannot be ended until every column has a value.
    #[test]
    fn test_insert_chunk_column_count_validation() {
        let schema = create_test_table_def();
        let mut rows = InsertChunk::from_table_definition(&schema);

        // Only the first of two columns is filled in.
        rows.add_i32(1).unwrap();

        // Ending the row now must fail.
        let premature = rows.end_row();
        assert!(premature.is_err());

        // Fill in the remaining column.
        rows.add_str("test").unwrap();

        // The row is complete, so ending it succeeds.
        rows.end_row().unwrap();
    }

    /// Adding a value beyond the last column is an error.
    #[test]
    fn test_insert_chunk_too_many_columns() {
        let schema = create_test_table_def();
        let mut rows = InsertChunk::from_table_definition(&schema);

        rows.add_i32(1).unwrap();
        rows.add_str("test").unwrap();

        // The table has two columns; a third value must be refused.
        let overflow = rows.add_i32(2);
        assert!(overflow.is_err());
    }

    /// Clearing a chunk discards its rows.
    #[test]
    fn test_insert_chunk_clear() {
        let schema = create_test_table_def();
        let mut rows = InsertChunk::from_table_definition(&schema);

        rows.add_i32(1).unwrap();
        rows.add_str("test").unwrap();
        rows.end_row().unwrap();

        assert_eq!(rows.row_count(), 1);

        rows.clear();

        assert_eq!(rows.row_count(), 0);
        assert!(rows.is_empty());
    }
}