hyperdb_api/inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance bulk data inserter using COPY protocol.
5//!
6//! This module provides the `Inserter` struct for efficient bulk data insertion
7//! into Hyper tables, along with the `IntoValue` trait for type-safe insertion.
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{Inserter, Connection, CreateMode, Result};
13//! # fn example(conn: &Connection, table_def: &hyperdb_api::TableDefinition) -> Result<()> {
14//! let mut inserter = Inserter::new(&conn, &table_def)?;
15//! for i in 0..10000i32 {
16//! inserter.add_row(&[&i, &format!("item {}", i), &(i as f64 * 1.5)])?;
17//! }
18//! inserter.execute()?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Instant;
24
25use hyperdb_api_core::client::client::CopyInWriter;
26use hyperdb_api_core::protocol::copy;
27use hyperdb_api_core::types::bytes::BytesMut;
28use hyperdb_api_core::types::{
29 Date, Geography, Interval, Numeric, OffsetTimestamp, Time, Timestamp,
30};
31use tracing::{debug, info};
32
33use crate::catalog::Catalog;
34use crate::connection::Connection;
35use crate::error::{Error, Result};
36use crate::table_definition::TableDefinition;
37
/// Initial buffer size (4 MiB, i.e. 4 * 1024 * 1024 bytes) to reduce early
/// reallocations.
///
/// The COPY protocol sends data in chunks, and each chunk requires a
/// contiguous buffer. Starting at 4 MiB avoids repeated reallocations
/// during the first chunk for typical workloads while keeping initial
/// memory allocation reasonable.
const INITIAL_BUFFER_SIZE: usize = 4 * 1024 * 1024;

/// Maximum buffer size per chunk before flushing to the server (16 MiB,
/// i.e. 16 * 1024 * 1024 bytes).
///
/// This balances two competing concerns:
/// - **Throughput**: Larger chunks amortize per-chunk overhead (COPY header,
///   network round-trip). Below ~1 MB, per-chunk overhead becomes significant.
/// - **Memory**: The buffer must be fully materialized before sending. 16 MiB
///   keeps resident memory bounded even when rows are wide.
///
/// The 16 MiB value was chosen empirically — it lands on the flat part of
/// the throughput curve where further increases yield diminishing returns.
const CHUNK_SIZE_LIMIT: usize = 16 * 1024 * 1024;

/// Maximum rows per chunk before flushing to the server (64,000 rows).
///
/// This is a secondary flush trigger alongside [`CHUNK_SIZE_LIMIT`]. For
/// narrow rows (few bytes each), the byte limit alone would accumulate
/// millions of rows before flushing, which delays server-side processing.
/// A row cap ensures timely flushes regardless of row width. Note that the
/// value is the decimal 64,000, not 65,536; it is intended to be in line with
/// the chunk size used for query result streaming
/// ([`DEFAULT_BINARY_CHUNK_SIZE`](crate::result::DEFAULT_BINARY_CHUNK_SIZE)).
const CHUNK_ROW_LIMIT: usize = 64_000;
67
68/// A high-performance bulk data inserter.
69///
70/// The `Inserter` efficiently inserts large amounts of data into a Hyper table
71/// using the COPY protocol with `HyperBinary` format for optimal performance.
72///
73/// # Example
74///
75/// ```no_run
76/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
77///
78/// fn main() -> Result<()> {
79/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
80///
81/// let table_def = TableDefinition::new("users")
82/// .add_required_column("id", SqlType::int())
83/// .add_nullable_column("name", SqlType::text());
84///
85/// Catalog::new(&conn).create_table(&table_def)?;
86///
87/// let mut inserter = Inserter::new(&conn, &table_def)?;
88///
89/// for i in 0..1000i32 {
90/// inserter.add_row(&[&i, &format!("User {}", i)])?;
91/// }
92///
93/// let rows = inserter.execute()?;
94/// println!("Inserted {} rows", rows);
95/// Ok(())
96/// }
97/// ```
#[derive(Debug)]
pub struct Inserter<'conn> {
    /// Connection on which the COPY session is opened (via its TCP client).
    connection: &'conn Connection,
    /// Owned copy of the target table definition; supplies the qualified
    /// table name and the column list when the COPY session is started.
    table_def: TableDefinition,
    /// The current chunk being populated (delegates encoding).
    chunk: InsertChunk,
    /// Total rows inserted across all chunks.
    row_count: u64,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Active COPY writer (lazily initialized on first write).
    writer: Option<CopyInWriter<'conn>>,
    /// Start time for timing the insert operation (captured at construction,
    /// reported as `duration_ms` when the insert finishes).
    start_time: Instant,
}
113
114impl<'conn> Inserter<'conn> {
115 /// Creates a new inserter for the given table.
116 ///
117 /// The underlying COPY session is started lazily on the first flush or
118 /// execute, so construction is lightweight. However, the connection's
119 /// transport is validated eagerly — using a gRPC connection will return
120 /// an error immediately.
121 ///
122 /// # Errors
123 ///
124 /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
125 /// columns.
126 /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
127 /// (COPY is TCP-only).
128 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
129 if table_def.column_count() == 0 {
130 return Err(Error::invalid_table_definition(
131 "Table definition must have at least one column",
132 ));
133 }
134
135 // Fail fast: verify the connection supports COPY (TCP only)
136 if connection.tcp_client().is_none() {
137 return Err(Error::feature_not_supported(
138 "Inserter requires a TCP connection. \
139 gRPC connections do not support COPY operations.",
140 ));
141 }
142
143 Ok(Inserter {
144 connection,
145 table_def: table_def.clone(),
146 chunk: InsertChunk::from_table_definition(table_def),
147 row_count: 0,
148 chunk_count: 0,
149 writer: None,
150 start_time: Instant::now(),
151 })
152 }
153
154 /// Creates an inserter by querying the table schema from the database.
155 ///
156 /// This method queries the database to get the table definition automatically,
157 /// which is useful when you want to insert into an existing table without
158 /// manually specifying the schema.
159 ///
160 /// # Arguments
161 ///
162 /// * `connection` - The database connection.
163 /// * `table_name` - The table name (can be a simple name, or "schema.table", etc.)
164 ///
165 /// # Errors
166 ///
167 /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
168 ///
169 /// # Example
170 ///
171 /// ```no_run
172 /// use hyperdb_api::{Connection, CreateMode, Inserter, Result};
173 ///
174 /// fn main() -> Result<()> {
175 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
176 ///
177 /// // Create a table first
178 /// conn.execute_command("CREATE TABLE IF NOT EXISTS products (id INT NOT NULL, name TEXT, price DOUBLE PRECISION)")?;
179 ///
180 /// // Create inserter by querying the schema directly from a string
181 /// let mut inserter = Inserter::from_table(&conn, "public.products")?;
182 ///
183 /// // Now we can insert data without knowing the exact schema
184 /// inserter.add_row(&[&1i32, &"Widget", &19.99f64])?;
185 /// inserter.add_row(&[&2i32, &"Gadget", &29.99f64])?;
186 ///
187 /// let rows = inserter.execute()?;
188 /// println!("Inserted {} rows", rows);
189 /// Ok(())
190 /// }
191 /// ```
192 pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
193 where
194 T: TryInto<crate::TableName>,
195 crate::Error: From<T::Error>,
196 {
197 let catalog = Catalog::new(connection);
198 let table_def = catalog.get_table_definition(table_name)?;
199 Self::new(connection, &table_def)
200 }
201
202 /// Creates an inserter with column mappings that allow SQL expressions.
203 ///
204 /// This method uses a temporary table and INSERT...SELECT to support
205 /// column mappings with SQL expressions. Data is first inserted into
206 /// a temporary staging table, then transformed using the mappings.
207 ///
208 /// # Arguments
209 ///
210 /// * `connection` - The database connection.
211 /// * `inserter_def` - Defines the columns to be provided to the inserter (staging table).
212 /// * `target_table` - The qualified name of the target table to insert into.
213 /// Use `TableDefinition::qualified_name()` for properly escaped names like `"schema"."table"`.
214 /// * `mappings` - Column mappings defining how values are transformed.
215 ///
216 /// # Example
217 ///
218 /// ```no_run
219 /// use hyperdb_api::{Connection, CreateMode, TableDefinition, ColumnMapping, Inserter, SqlType, Result};
220 ///
221 /// fn main() -> Result<()> {
222 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
223 ///
224 /// // Target table with computed columns
225 /// conn.execute_command(r#"
226 /// CREATE TABLE orders (
227 /// id INT NOT NULL,
228 /// product TEXT,
229 /// quantity INT,
230 /// price DOUBLE PRECISION,
231 /// total DOUBLE PRECISION,
232 /// created_at TIMESTAMP
233 /// )
234 /// "#)?;
235 ///
236 /// // Inserter definition - what we provide
237 /// let inserter_def = TableDefinition::new("_stage")
238 /// .add_required_column("id", SqlType::int())
239 /// .add_nullable_column("product", SqlType::text())
240 /// .add_nullable_column("quantity", SqlType::int())
241 /// .add_nullable_column("price", SqlType::double());
242 ///
243 /// // Column mappings - how values are transformed
244 /// let mappings = vec![
245 /// ColumnMapping::new("id"),
246 /// ColumnMapping::new("product"),
247 /// ColumnMapping::new("quantity"),
248 /// ColumnMapping::new("price"),
249 /// ColumnMapping::with_expression("total", "quantity * price"),
250 /// ColumnMapping::with_expression("created_at", "NOW()"),
251 /// ];
252 ///
253 /// // For simple table names in the public schema, use quoted name
254 /// // For qualified names, use target_table_def.qualified_name()
255 /// let mut inserter = Inserter::with_column_mappings(&conn, &inserter_def, "orders", &mappings)?;
256 ///
257 /// inserter.add_row(&[&1i32, &"Widget", &5i32, &10.0f64])?;
258 /// inserter.add_row(&[&2i32, &"Gadget", &3i32, &25.0f64])?;
259 ///
260 /// let rows = inserter.execute()?;
261 /// Ok(())
262 /// }
263 /// ```
264 ///
265 /// # Errors
266 ///
267 /// - Returns an error if `target_table` fails to convert into a
268 /// [`TableName`](crate::TableName).
269 /// - Returns [`Error::Server`] if creating the temporary staging table
270 /// fails on the server.
271 /// - Returns the errors from [`Inserter::new`] for the staging table
272 /// (zero-column table definition, gRPC transport).
    pub fn with_column_mappings<T>(
        connection: &'conn Connection,
        inserter_def: &TableDefinition,
        target_table: T,
        mappings: &[ColumnMapping],
    ) -> Result<MappedInserter<'conn>>
    where
        T: TryInto<crate::TableName>,
        crate::Error: From<T::Error>,
    {
        // Convenience entry point only: every argument is forwarded unchanged
        // and the result of `MappedInserter::new` is returned as-is.
        MappedInserter::new(connection, inserter_def, target_table, mappings)
    }
285
    /// Returns the table definition this inserter writes to.
    ///
    /// This is the inserter's own copy, cloned from the definition passed to
    /// [`Inserter::new`].
    pub fn table_definition(&self) -> &TableDefinition {
        &self.table_def
    }
290
    /// Returns the number of columns in the table definition.
    ///
    /// [`add_row`](Self::add_row) requires exactly this many values per row.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.table_def.column_count()
    }
296
    /// Returns the number of rows completed so far.
    ///
    /// The counter is incremented each time [`end_row`](Self::end_row) accepts
    /// a completed row and is not decreased when a chunk is flushed, so it
    /// covers rows already sent to the server as well as rows still buffered
    /// locally. It is reset to zero by [`execute`](Self::execute) and
    /// [`cancel`](Self::cancel).
    #[must_use]
    pub fn row_count(&self) -> u64 {
        self.row_count
    }
302
    /// Adds a NULL value for the current column.
    ///
    /// The call is forwarded to the chunk currently being populated, which
    /// performs the checks described below.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidTableDefinition`] if the current row already has all columns
    /// supplied, or if the current column is marked `NOT NULL` in the table
    /// definition.
    #[inline]
    pub fn add_null(&mut self) -> Result<()> {
        self.chunk.add_null()
    }
314
    /// Adds a boolean value (BOOL) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
    /// the current row already has all columns supplied.
    #[inline]
    pub fn add_bool(&mut self, value: bool) -> Result<()> {
        self.chunk.add_bool(value)
    }
325
    /// Adds an `i16` value (SMALLINT) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_i16(&mut self, value: i16) -> Result<()> {
        self.chunk.add_i16(value)
    }
335
    /// Adds an `i32` value (INT) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_i32(&mut self, value: i32) -> Result<()> {
        self.chunk.add_i32(value)
    }
345
    /// Adds an `i64` value (BIGINT) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_i64(&mut self, value: i64) -> Result<()> {
        self.chunk.add_i64(value)
    }
355
    /// Adds an `f32` value (REAL/FLOAT4) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_f32(&mut self, value: f32) -> Result<()> {
        self.chunk.add_f32(value)
    }
365
    /// Adds an `f64` value (DOUBLE PRECISION/FLOAT8) for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_f64(&mut self, value: f64) -> Result<()> {
        self.chunk.add_f64(value)
    }
375
    /// Adds a string value (TEXT/VARCHAR) for the current column.
    ///
    /// The string is only borrowed for the duration of the call; encoding is
    /// delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_str(&mut self, value: &str) -> Result<()> {
        self.chunk.add_str(value)
    }
385
    /// Adds a bytes value (BYTEA) for the current column.
    ///
    /// The slice is only borrowed for the duration of the call; encoding is
    /// delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
        self.chunk.add_bytes(value)
    }
395
    /// Adds a raw 128-bit value (NUMERIC/INTERVAL) for the current column.
    ///
    /// The 16 bytes are passed to the current chunk as given; this method does
    /// not interpret them. [`add_numeric`](Self::add_numeric) uses the same
    /// chunk call for high-precision NUMERIC columns.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
        self.chunk.add_data128(value)
    }
405
406 /// Adds an optional value. If None, adds NULL.
407 ///
408 /// # Errors
409 ///
410 /// Propagates whatever `add_fn` or [`add_null`](Self::add_null) would
411 /// return for the current row position.
412 pub fn add_optional<T, F>(&mut self, value: Option<T>, add_fn: F) -> Result<()>
413 where
414 F: FnOnce(&mut Self, T) -> Result<()>,
415 {
416 match value {
417 Some(v) => add_fn(self, v),
418 None => self.add_null(),
419 }
420 }
421
422 /// Ends the current row.
423 ///
424 /// Returns an error if the wrong number of columns were added.
425 /// Automatically flushes the buffer if chunk limits are reached.
426 ///
427 /// # Errors
428 ///
429 /// - Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
430 /// than the table definition requires.
431 /// - Returns any error from [`flush`](Self::flush) when an automatic
432 /// flush is triggered by reaching the chunk byte/row limit.
433 pub fn end_row(&mut self) -> Result<()> {
434 self.chunk.end_row()?;
435 self.row_count += 1;
436
437 // Auto-flush if we've reached chunk limits
438 if self.chunk.should_flush() {
439 self.flush()?;
440 }
441
442 Ok(())
443 }
444
445 /// Flushes the current buffer to the server.
446 ///
447 /// This sends all buffered rows as a chunk and resets the buffer.
448 /// Called automatically when chunk limits are reached.
449 ///
450 /// # Errors
451 ///
452 /// - Returns [`Error::FeatureNotSupported`] if the connection is using gRPC transport
453 /// (COPY is TCP-only) and no COPY session exists yet.
454 /// - Returns [`Error::Server`] if the server rejects the `COPY IN` start
455 /// or the subsequent data send.
456 /// - Returns [`Error::Io`] on transport-level I/O failures while writing
457 /// the chunk.
458 pub fn flush(&mut self) -> Result<()> {
459 if self.chunk.is_empty() {
460 return Ok(());
461 }
462
463 let chunk_rows = self.chunk.row_count();
464 let Some(buffer) = self.chunk.take() else {
465 return Ok(());
466 };
467
468 // Ensure the COPY connection is started
469 if self.writer.is_none() {
470 let client = self.connection.tcp_client().ok_or_else(|| {
471 crate::Error::feature_not_supported(
472 "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
473 )
474 })?;
475 let columns: Vec<&str> = self
476 .table_def
477 .columns
478 .iter()
479 .map(|c| c.name.as_str())
480 .collect();
481 let table_name = self.table_def.qualified_name();
482 self.writer = Some(client.copy_in(&table_name, &columns)?);
483 }
484
485 // Write the chunk directly to the socket, avoiding a full-chunk memcpy
486 // into the connection's write buffer. flush_stream ensures the data
487 // reaches the server before we return.
488 if let Some(ref mut writer) = self.writer {
489 writer.send_direct(&buffer)?;
490 writer.flush_stream()?;
491 }
492
493 debug!(
494 target: "hyperdb_api",
495 chunk = self.chunk_count,
496 rows = chunk_rows,
497 bytes = buffer.len(),
498 "inserter-chunk"
499 );
500
501 self.chunk_count += 1;
502 Ok(())
503 }
504
505 /// Adds a complete row of values.
506 ///
507 /// This is a convenience method that adds all column values at once
508 /// using the `IntoValue` trait for type-safe insertion.
509 ///
510 /// # Arguments
511 ///
512 /// * `values` - A slice of values implementing `IntoValue`.
513 ///
514 /// # Errors
515 ///
516 /// Returns an error if the number of values doesn't match the column count,
517 /// or if any value cannot be added.
518 ///
519 /// # Example
520 ///
521 /// ```no_run
522 /// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, SqlType, Result};
523 ///
524 /// fn main() -> Result<()> {
525 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
526 ///
527 /// let table_def = TableDefinition::new("users")
528 /// .add_required_column("id", SqlType::int())
529 /// .add_nullable_column("name", SqlType::text());
530 ///
531 /// Catalog::new(&conn).create_table(&table_def)?;
532 ///
533 /// let mut inserter = Inserter::new(&conn, &table_def)?;
534 ///
535 /// // Add rows using IntoValue trait
536 /// inserter.add_row(&[&1i32, &"Alice"])?;
537 /// inserter.add_row(&[&2i32, &"Bob"])?;
538 ///
539 /// // Option<T> can be used for nullable columns
540 /// inserter.add_row(&[&3i32, &None::<&str>])?;
541 ///
542 /// let rows = inserter.execute()?;
543 /// Ok(())
544 /// }
545 /// ```
546 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
547 let column_count = self.table_def.column_count();
548 if values.len() != column_count {
549 return Err(Error::invalid_table_definition(format!(
550 "Column count mismatch: expected {} columns but got {}",
551 column_count,
552 values.len()
553 )));
554 }
555
556 for value in values {
557 value.add_to_inserter(self)?;
558 }
559
560 self.end_row()?;
561 Ok(())
562 }
563
    /// Adds a [`Date`] value for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_date(&mut self, value: Date) -> Result<()> {
        self.chunk.add_date(value)
    }
573
    /// Adds a [`Time`] value for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_time(&mut self, value: Time) -> Result<()> {
        self.chunk.add_time(value)
    }
583
    /// Adds a [`Timestamp`] value for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
        self.chunk.add_timestamp(value)
    }
593
    /// Adds an [`OffsetTimestamp`] (TIMESTAMP WITH TIME ZONE) value for the
    /// current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
        self.chunk.add_offset_timestamp(value)
    }
603
    /// Adds an [`Interval`] value for the current column.
    ///
    /// Encoding is delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
        self.chunk.add_interval(value)
    }
613
    /// Adds a [`Geography`] value for the current column.
    ///
    /// The value is only borrowed for the duration of the call; encoding is
    /// delegated to the chunk currently being populated.
    ///
    /// # Errors
    ///
    /// Propagates the chunk's error unchanged; see [`add_bool`](Self::add_bool)
    /// for the documented failure mode (row already has all columns supplied).
    #[inline]
    pub fn add_geography(&mut self, value: &Geography) -> Result<()> {
        self.chunk.add_geography(value)
    }
623
624 /// Adds a Numeric value.
625 ///
626 /// For NUMERIC(precision, scale) where precision ≤ [`Numeric::SMALL_NUMERIC_MAX_PRECISION`]
627 /// (18), the value is stored as i64. For higher precision, 128-bit storage is used.
628 ///
629 /// # Errors
630 ///
631 /// Returns an error if the column's precision cannot be determined from the
632 /// table definition. Ensure that NUMERIC columns are defined with explicit
633 /// `SqlType` information including precision.
634 pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
635 let column_index = self.chunk.column_index();
636
637 // Check the column's precision to determine storage format
638 let precision = self
639 .table_def
640 .columns
641 .get(column_index)
642 .and_then(super::table_definition::ColumnDefinition::sql_type)
643 .and_then(|t| t.precision())
644 .ok_or_else(|| {
645 let col_name = self
646 .table_def
647 .columns
648 .get(column_index)
649 .map_or("<unknown>", |c| c.name.as_str());
650 Error::conversion(format!(
651 "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
652 Ensure the column is defined with explicit SqlType including precision.\n\n\
653 Example fix:\n \
654 table_def.add_column_with_type(\"{col_name}\", SqlType::Numeric {{ precision: 10, scale: 2 }}, true);"
655 ))
656 })?;
657
658 if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
659 // Small numeric: stored as i64
660 let unscaled = value.unscaled_value();
661 let narrowed = i64::try_from(unscaled).map_err(|_| {
662 Error::conversion(format!(
663 "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
664 ))
665 })?;
666 self.chunk.add_i64(narrowed)
667 } else {
668 // Big numeric: stored as 128-bit
669 self.chunk.add_data128(&value.to_packed())
670 }
671 }
672
673 /// Executes the insert and commits all buffered rows.
674 ///
675 /// This sends any remaining buffered data and finishes the COPY operation.
676 /// Returns the number of rows inserted.
677 ///
678 /// The inserter is single-use: calling `execute` a second time returns
679 /// `Ok(0)` because the internal row counter has been reset and no further
680 /// data has been added. To insert additional batches, create a new
681 /// [`Inserter`].
682 ///
683 /// # Errors
684 ///
685 /// Returns an error if:
686 /// - There's an incomplete row (`column_index` != 0)
687 /// - The COPY connection fails to start
688 /// - Sending data fails
689 pub fn execute(&mut self) -> Result<u64> {
690 if self.chunk.column_index() != 0 {
691 return Err(Error::invalid_table_definition(
692 "Incomplete row at execute time",
693 ));
694 }
695
696 if self.row_count == 0 {
697 return Ok(0);
698 }
699
700 // Ensure COPY connection exists before proceeding when we have rows
701 if self.writer.is_none() {
702 let client = self.connection.tcp_client().ok_or_else(|| {
703 Error::feature_not_supported(
704 "Inserter requires a TCP connection. gRPC connections do not support COPY operations.",
705 )
706 })?;
707 let columns: Vec<&str> = self
708 .table_def
709 .columns
710 .iter()
711 .map(|c| c.name.as_str())
712 .collect();
713 let table_name = self.table_def.qualified_name();
714 self.writer = Some(client.copy_in(&table_name, &columns)?);
715 }
716
717 // At this point, writer must exist since we have rows
718 let writer = self
719 .writer
720 .as_mut()
721 .ok_or_else(|| Error::internal("Failed to initialize COPY connection for inserter"))?;
722
723 // If we have buffered data that hasn't been sent yet
724 if !self.chunk.is_empty() {
725 writer.send(self.chunk.buffer())?;
726 }
727
728 // Write and send the COPY trailer
729 let mut trailer_buf = BytesMut::with_capacity(2);
730 copy::write_trailer(&mut trailer_buf);
731 writer.send(&trailer_buf)?;
732
733 // Finish the COPY operation
734 let rows = self
735 .writer
736 .take()
737 .map(hyperdb_api_core::client::CopyInWriter::finish)
738 .transpose()?
739 .unwrap_or(0);
740
741 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
742 info!(
743 target: "hyperdb_api",
744 rows,
745 chunks = self.chunk_count,
746 duration_ms,
747 table = %self.table_def.qualified_name(),
748 "inserter-end"
749 );
750
751 // Reset row counter so a stray second execute() call returns Ok(0)
752 // instead of attempting another COPY trailer on a finished writer.
753 self.row_count = 0;
754
755 Ok(rows)
756 }
757
758 /// Cancels the insert and discards all buffered rows.
759 pub fn cancel(&mut self) {
760 // Drop the in-progress writer (if any). The Drop impl on CopyInWriter
761 // sends a CopyFail to the server.
762 self.writer = None;
763 self.row_count = 0;
764 }
765}
766
767// =============================================================================
768// ColumnMapping
769// =============================================================================
770
771/// Defines how a column receives its value during insertion.
772///
773/// Column mappings allow you to:
774/// - Insert values directly from the inserter stream
775/// - Compute values using SQL expressions
776/// - Use server-side functions like `NOW()` or `DEFAULT`
777///
778/// # Example
779///
780/// ```
781/// use hyperdb_api::ColumnMapping;
782///
783/// // Simple column - insert value directly
784/// let id_col = ColumnMapping::new("id");
785///
786/// // Column with expression - computed value
787/// let created_at = ColumnMapping::with_expression("created_at", "NOW()");
788/// let full_name = ColumnMapping::with_expression("full_name", "first_name || ' ' || last_name");
789/// ```
#[derive(Debug, Clone)]
#[must_use = "ColumnMapping represents a column configuration that should not be discarded. Use it when defining inserter column mappings"]
pub struct ColumnMapping {
    /// Name of the column in the target table that receives the value.
    pub column_name: String,
    /// SQL expression that produces the value. `None` means the value is
    /// taken directly from the inserter stream.
    pub expression: Option<String>,
}

impl ColumnMapping {
    /// Builds a direct mapping: the column is filled with the value supplied
    /// through the inserter, without any SQL expression.
    pub fn new(column_name: impl Into<String>) -> Self {
        Self {
            column_name: column_name.into(),
            expression: None,
        }
    }

    /// Builds a mapping whose value is computed by a SQL expression.
    ///
    /// # Arguments
    ///
    /// * `column_name` - The name of the target column.
    /// * `expression` - SQL text evaluated to produce the column value; it may
    ///   reference other columns or call SQL functions.
    ///
    /// # Example
    ///
    /// ```
    /// use hyperdb_api::ColumnMapping;
    ///
    /// let created = ColumnMapping::with_expression("created_at", "NOW()");
    /// let total = ColumnMapping::with_expression("total", "quantity * price");
    /// ```
    pub fn with_expression(column_name: impl Into<String>, expression: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let expression = Some(expression.into());
        Self {
            column_name,
            expression,
        }
    }

    /// Name of the target column.
    #[must_use]
    pub fn column_name(&self) -> &str {
        self.column_name.as_str()
    }

    /// The SQL expression of this mapping, or `None` for a direct mapping.
    #[must_use]
    pub fn expression(&self) -> Option<&str> {
        self.expression.as_deref()
    }

    /// Whether the value is inserted as-is, i.e. no expression is attached.
    #[must_use]
    pub fn is_direct(&self) -> bool {
        self.expression().is_none()
    }

    /// Renders this mapping as one item of a SELECT list.
    ///
    /// The column name is wrapped in double quotes with embedded quotes
    /// doubled. An expression, when present, is emitted verbatim (it is SQL,
    /// not an identifier) and aliased to the quoted column name.
    fn to_select_item(&self) -> String {
        let quoted_name = format!("\"{}\"", self.column_name.replace('"', "\"\""));
        match self.expression.as_deref() {
            None => quoted_name,
            Some(expr) => format!("{expr} AS {quoted_name}"),
        }
    }
}
864
865// =============================================================================
866// IntoValue Trait
867// =============================================================================
868
869/// Trait for types that can be inserted into a Hyper table.
870///
871/// This trait is implemented for common Rust types, allowing them to be
872/// used with [`Inserter::add_row()`] for type-safe insertion.
873///
874/// # Supported Types
875///
876/// - Integers: `i16`, `i32`, `i64`
877/// - Floats: `f32`, `f64`
878/// - `bool`
879/// - `&str`, `String`
880/// - `Option<T>` where `T: IntoValue` (for nullable columns)
/// - Date/time types: `Date`, `Time`, `Timestamp`, `OffsetTimestamp`, `Interval`
/// - `Numeric`, `Geography`, `[u8]` / `Vec<u8>` (bytes)
883///
884/// # Example
885///
886/// ```no_run
887/// use hyperdb_api::{Connection, CreateMode, Catalog, TableDefinition, Inserter, IntoValue, SqlType, Result};
888///
889/// fn main() -> Result<()> {
890/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
891///
892/// let table_def = TableDefinition::new("example")
893/// .add_required_column("a", SqlType::int())
894/// .add_nullable_column("b", SqlType::text())
895/// .add_nullable_column("c", SqlType::double());
896/// Catalog::new(&conn).create_table(&table_def)?;
897///
898/// let mut inserter = Inserter::new(&conn, &table_def)?;
899///
900/// // IntoValue allows adding rows with mixed types
901/// inserter.add_row(&[&1i32, &"Alice", &Some(3.14f64)])?;
902/// inserter.add_row(&[&2i32, &"Bob", &None::<f64>])?; // NULL value
903///
904/// inserter.execute()?;
905/// Ok(())
906/// }
907/// ```
pub trait IntoValue {
    /// Adds this value to the inserter.
    ///
    /// [`Inserter::add_row`] calls this once for every element of the row
    /// slice, in order, and then ends the row itself; implementations must
    /// therefore add exactly one column value and must not end the row.
    ///
    /// # Errors
    ///
    /// Implementations call the matching `Inserter::add_*` method and
    /// forward its error — see [`Inserter::add_bool`] for the shared
    /// failure modes (too many columns, NULL into non-nullable, etc).
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()>;
}
918
919// Implementations for basic types
920
/// Inserts a `bool` via [`Inserter::add_bool`].
impl IntoValue for bool {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_bool(*self)
    }
}
926
/// Inserts an `i16` via [`Inserter::add_i16`].
impl IntoValue for i16 {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_i16(*self)
    }
}
932
/// Inserts an `i32` via [`Inserter::add_i32`].
impl IntoValue for i32 {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_i32(*self)
    }
}
938
/// Inserts an `i64` via [`Inserter::add_i64`].
impl IntoValue for i64 {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_i64(*self)
    }
}
944
/// Inserts an `f32` via [`Inserter::add_f32`].
impl IntoValue for f32 {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_f32(*self)
    }
}
950
/// Inserts an `f64` via [`Inserter::add_f64`].
impl IntoValue for f64 {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_f64(*self)
    }
}
956
/// Inserts a string slice via [`Inserter::add_str`].
///
/// Implemented on the unsized `str`, so a `&str` can be passed as
/// `&dyn IntoValue` directly.
impl IntoValue for str {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_str(self)
    }
}
962
/// Inserts an owned `String` via [`Inserter::add_str`]; the string is only
/// borrowed, not consumed.
impl IntoValue for String {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_str(self)
    }
}
968
/// Inserts a byte slice via [`Inserter::add_bytes`].
impl IntoValue for [u8] {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_bytes(self)
    }
}
974
/// Inserts an owned byte vector via [`Inserter::add_bytes`]; the vector is
/// only borrowed, not consumed.
impl IntoValue for Vec<u8> {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_bytes(self)
    }
}
980
981// Hyper-specific types
982
/// Inserts a [`Date`] by value via [`Inserter::add_date`].
impl IntoValue for Date {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_date(*self)
    }
}
988
/// Inserts a [`Time`] by value via [`Inserter::add_time`].
impl IntoValue for Time {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_time(*self)
    }
}
994
/// Inserts a [`Timestamp`] by value via [`Inserter::add_timestamp`].
impl IntoValue for Timestamp {
    fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
        inserter.add_timestamp(*self)
    }
}
1000
1001impl IntoValue for OffsetTimestamp {
1002 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1003 inserter.add_offset_timestamp(*self)
1004 }
1005}
1006
1007impl IntoValue for Interval {
1008 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1009 inserter.add_interval(*self)
1010 }
1011}
1012
1013impl IntoValue for Numeric {
1014 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1015 inserter.add_numeric(*self)
1016 }
1017}
1018
1019impl IntoValue for Geography {
1020 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1021 inserter.add_geography(self)
1022 }
1023}
1024
1025// Option<T> for nullable values
1026impl<T: IntoValue> IntoValue for Option<T> {
1027 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1028 match self {
1029 Some(value) => value.add_to_inserter(inserter),
1030 None => inserter.add_null(),
1031 }
1032 }
1033}
1034
1035// Reference implementations for primitives
1036
1037impl IntoValue for &bool {
1038 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1039 inserter.add_bool(**self)
1040 }
1041}
1042
1043impl IntoValue for &i16 {
1044 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1045 inserter.add_i16(**self)
1046 }
1047}
1048
1049impl IntoValue for &i32 {
1050 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1051 inserter.add_i32(**self)
1052 }
1053}
1054
1055impl IntoValue for &i64 {
1056 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1057 inserter.add_i64(**self)
1058 }
1059}
1060
1061impl IntoValue for &f32 {
1062 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1063 inserter.add_f32(**self)
1064 }
1065}
1066
1067impl IntoValue for &f64 {
1068 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1069 inserter.add_f64(**self)
1070 }
1071}
1072
1073impl IntoValue for &String {
1074 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1075 inserter.add_str(self)
1076 }
1077}
1078
1079impl IntoValue for &str {
1080 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1081 inserter.add_str(self)
1082 }
1083}
1084
1085impl IntoValue for &&str {
1086 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1087 inserter.add_str(self)
1088 }
1089}
1090
1091impl IntoValue for &[u8] {
1092 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1093 inserter.add_bytes(self)
1094 }
1095}
1096
1097impl IntoValue for &Date {
1098 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1099 inserter.add_date(**self)
1100 }
1101}
1102
1103impl IntoValue for &Time {
1104 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1105 inserter.add_time(**self)
1106 }
1107}
1108
1109impl IntoValue for &Timestamp {
1110 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1111 inserter.add_timestamp(**self)
1112 }
1113}
1114
1115impl IntoValue for &OffsetTimestamp {
1116 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1117 inserter.add_offset_timestamp(**self)
1118 }
1119}
1120
1121impl IntoValue for &Interval {
1122 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1123 inserter.add_interval(**self)
1124 }
1125}
1126
1127impl IntoValue for &Numeric {
1128 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1129 inserter.add_numeric(**self)
1130 }
1131}
1132
1133impl IntoValue for &Geography {
1134 fn add_to_inserter(&self, inserter: &mut Inserter<'_>) -> Result<()> {
1135 inserter.add_geography(self)
1136 }
1137}
1138
1139// =============================================================================
1140// MappedInserter
1141// =============================================================================
1142
/// An inserter that supports SQL expression mappings.
///
/// This inserter uses a staging table to support computed columns via
/// INSERT...SELECT with SQL expressions. It's created by
/// [`Inserter::with_column_mappings`].
///
/// Rows are first written to the staging table through the wrapped
/// [`Inserter`]; `execute` then copies them into the target table, applying
/// the column mappings, and drops the staging table. `cancel` only drops the
/// staging table.
#[derive(Debug)]
pub struct MappedInserter<'conn> {
    /// The underlying inserter for the staging table.
    inner: Inserter<'conn>,
    /// The target table name (destination of the final `INSERT ... SELECT`).
    target_table: crate::TableName,
    /// The column mappings; each one yields a target column name and a
    /// select item for the `INSERT ... SELECT`.
    mappings: Vec<ColumnMapping>,
    /// The staging table name (unquoted; escaped when SQL is built).
    staging_table: String,
}
1159
1160impl<'conn> MappedInserter<'conn> {
1161 /// Creates a new mapped inserter.
1162 fn new<T>(
1163 connection: &'conn Connection,
1164 inserter_def: &TableDefinition,
1165 target_table: T,
1166 mappings: &[ColumnMapping],
1167 ) -> Result<Self>
1168 where
1169 T: TryInto<crate::TableName>,
1170 crate::Error: From<T::Error>,
1171 {
1172 let target_table = target_table.try_into()?;
1173
1174 // Create a unique staging table name
1175 let staging_table = format!("_hyper_staging_{}", std::process::id());
1176
1177 // Create the staging table definition (temporary)
1178 let mut staging_def = inserter_def.clone();
1179 staging_def.name.clone_from(&staging_table);
1180
1181 // Create the staging table
1182 let create_sql = staging_def.to_create_sql(true)?;
1183 let create_temp = create_sql.replace("CREATE TABLE", "CREATE TEMPORARY TABLE");
1184 connection.execute_command(&create_temp)?;
1185
1186 // Create the inner inserter for the staging table
1187 let inner = Inserter::new(connection, &staging_def)?;
1188
1189 Ok(MappedInserter {
1190 inner,
1191 target_table,
1192 mappings: mappings.to_vec(),
1193 staging_table,
1194 })
1195 }
1196
1197 /// Adds a row of values to the inserter.
1198 ///
1199 /// The values should correspond to the columns in the inserter definition,
1200 /// not the target table.
1201 ///
1202 /// # Errors
1203 ///
1204 /// Forwards the error from [`Inserter::add_row`].
1205 pub fn add_row(&mut self, values: &[&dyn IntoValue]) -> Result<()> {
1206 self.inner.add_row(values)
1207 }
1208
1209 /// Adds a NULL value.
1210 ///
1211 /// # Errors
1212 ///
1213 /// Forwards the error from [`Inserter::add_null`].
1214 pub fn add_null(&mut self) -> Result<()> {
1215 self.inner.add_null()
1216 }
1217
1218 /// Adds a boolean value.
1219 ///
1220 /// # Errors
1221 ///
1222 /// Forwards the error from [`Inserter::add_bool`].
1223 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1224 self.inner.add_bool(value)
1225 }
1226
1227 /// Adds an i16 value.
1228 ///
1229 /// # Errors
1230 ///
1231 /// Forwards the error from [`Inserter::add_i16`].
1232 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1233 self.inner.add_i16(value)
1234 }
1235
1236 /// Adds an i32 value.
1237 ///
1238 /// # Errors
1239 ///
1240 /// Forwards the error from [`Inserter::add_i32`].
1241 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1242 self.inner.add_i32(value)
1243 }
1244
1245 /// Adds an i64 value.
1246 ///
1247 /// # Errors
1248 ///
1249 /// Forwards the error from [`Inserter::add_i64`].
1250 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1251 self.inner.add_i64(value)
1252 }
1253
1254 /// Adds an f32 value.
1255 ///
1256 /// # Errors
1257 ///
1258 /// Forwards the error from [`Inserter::add_f32`].
1259 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1260 self.inner.add_f32(value)
1261 }
1262
1263 /// Adds an f64 value.
1264 ///
1265 /// # Errors
1266 ///
1267 /// Forwards the error from [`Inserter::add_f64`].
1268 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1269 self.inner.add_f64(value)
1270 }
1271
1272 /// Adds a string value.
1273 ///
1274 /// # Errors
1275 ///
1276 /// Forwards the error from [`Inserter::add_str`].
1277 pub fn add_str(&mut self, value: &str) -> Result<()> {
1278 self.inner.add_str(value)
1279 }
1280
1281 /// Adds a bytes value.
1282 ///
1283 /// # Errors
1284 ///
1285 /// Forwards the error from [`Inserter::add_bytes`].
1286 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1287 self.inner.add_bytes(value)
1288 }
1289
1290 /// Ends the current row.
1291 ///
1292 /// # Errors
1293 ///
1294 /// Forwards the error from [`Inserter::end_row`].
1295 pub fn end_row(&mut self) -> Result<()> {
1296 self.inner.end_row()
1297 }
1298
1299 /// Executes the insert with column mappings.
1300 ///
1301 /// This method:
1302 /// 1. Inserts all buffered rows into the staging table
1303 /// 2. Executes INSERT...SELECT from staging to target with mappings
1304 /// 3. Drops the staging table
1305 ///
1306 /// Returns the number of rows inserted into the target table.
1307 ///
1308 /// # Errors
1309 ///
1310 /// - Returns the error from the inner [`Inserter::execute`] if writing
1311 /// the staging rows fails.
1312 /// - Returns [`Error::Server`] if the `INSERT ... SELECT` from staging
1313 /// to the target table is rejected (e.g. a mapping expression fails
1314 /// to evaluate).
1315 /// - Returns [`Error::Server`] if dropping the staging table fails.
1316 pub fn execute(&mut self) -> Result<u64> {
1317 let connection = self.inner.connection;
1318 let staging_table = self.staging_table.clone();
1319
1320 // Insert data into staging table
1321 let _staging_rows = self.inner.execute()?;
1322
1323 // Build the INSERT...SELECT statement
1324 use hyperdb_api_core::protocol::escape::SqlIdentifier;
1325
1326 let target_columns: Vec<String> = self
1327 .mappings
1328 .iter()
1329 .map(|m| format!("{}", SqlIdentifier(&m.column_name)))
1330 .collect();
1331
1332 let select_items: Vec<String> = self
1333 .mappings
1334 .iter()
1335 .map(ColumnMapping::to_select_item)
1336 .collect();
1337
1338 let sql = format!(
1339 "INSERT INTO {} ({}) SELECT {} FROM {}",
1340 self.target_table,
1341 target_columns.join(", "),
1342 select_items.join(", "),
1343 SqlIdentifier(&staging_table),
1344 );
1345
1346 // Execute the INSERT...SELECT (returns row count directly)
1347 let row_count = connection.execute_command(&sql)?;
1348
1349 // Drop the staging table
1350 connection.execute_command(&format!(
1351 "DROP TABLE IF EXISTS {}",
1352 SqlIdentifier(&staging_table)
1353 ))?;
1354
1355 // Return the number of rows inserted
1356 Ok(row_count)
1357 }
1358
1359 /// Cancels the insert and drops the staging table.
1360 ///
1361 /// This method handles cleanup failures gracefully by logging warnings
1362 /// instead of returning errors. This prevents masking the original error
1363 /// that caused the cancellation.
1364 ///
1365 /// # Logging
1366 ///
1367 /// Cleanup failures are logged using the `tracing` crate at WARN level.
1368 /// If `tracing` is not initialized, errors are written to stderr.
1369 pub fn cancel(&mut self) {
1370 let connection = self.inner.connection;
1371 let staging_table = &self.staging_table;
1372
1373 // Drop the staging table, but don't fail if cleanup fails
1374 // This avoids masking the original error that caused cancellation
1375 if let Err(e) = connection.execute_command(&format!(
1376 "DROP TABLE IF EXISTS \"{}\"",
1377 staging_table.replace('"', "\"\"")
1378 )) {
1379 // Log the cleanup failure for debugging
1380 // In production, consider using a logging framework like `tracing`
1381 eprintln!("Warning: Failed to drop staging table '{staging_table}' during cancel: {e}");
1382 }
1383 }
1384}
1385
1386// =============================================================================
1387// InsertChunk - Thread-safe chunk for parallel encoding
1388// =============================================================================
1389
/// A thread-safe chunk for encoding rows in parallel.
///
/// `InsertChunk` can be created and populated in any thread, then sent to a
/// [`ChunkSender`] for transmission. This enables parallel data encoding across
/// multiple worker threads while serializing the actual network sends.
///
/// Values are appended column by column with the `add_*` methods and each row
/// is closed with `end_row`, which verifies that exactly `column_count` values
/// were supplied.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{InsertChunk, TableDefinition, SqlType, Result};
///
/// fn encode_chunk(table_def: &TableDefinition, start_id: i32) -> Result<InsertChunk> {
///     let mut chunk = InsertChunk::from_table_definition(table_def);
///
///     for i in 0..1000 {
///         chunk.add_i32(start_id + i)?;
///         chunk.add_str(&format!("Item {}", start_id + i))?;
///         chunk.end_row()?;
///     }
///
///     Ok(chunk)
/// }
/// ```
#[derive(Debug)]
pub struct InsertChunk {
    /// Encoded bytes accumulated so far (header, if written, plus values).
    buffer: BytesMut,
    /// Whether the header has already been written by this chunk; cleared
    /// only by `clear`, not by `take`.
    header_written: bool,
    /// Index of the next column to be written within the current row.
    column_index: usize,
    /// Number of columns every row must contain.
    column_count: usize,
    /// Number of complete rows (closed with `end_row`) since the last
    /// `take`/`clear`.
    row_count: usize,
    /// Per-column nullability; selects between the nullable and the
    /// not-null encoders when a value is written.
    column_nullable: Vec<bool>,
}
1422
1423// SAFETY: Every field of `InsertChunk` (`BytesMut`, `bool`, `usize`,
1424// `Vec<bool>`) is itself `Send`, and none of them hold raw pointers or
1425// thread-local state. The manual `unsafe impl` exists only because the
1426// auto-trait derivation is conservative for this struct's compilation context;
1427// the compound type has no `!Send` components.
1428unsafe impl Send for InsertChunk {}
1429// SAFETY: Same reasoning as the `Send` impl above — all fields are `Sync`
1430// and there is no interior mutability crossing a `&InsertChunk` boundary,
1431// so sharing `&InsertChunk` across threads is sound.
1432unsafe impl Sync for InsertChunk {}
1433
1434impl InsertChunk {
1435 /// Creates a new empty chunk with the given schema.
1436 ///
1437 /// # Arguments
1438 ///
1439 /// * `column_count` - Number of columns per row
1440 /// * `column_nullable` - Whether each column is nullable
1441 #[must_use]
1442 pub fn new(column_count: usize, column_nullable: Vec<bool>) -> Self {
1443 debug_assert_eq!(column_count, column_nullable.len());
1444 InsertChunk {
1445 buffer: BytesMut::with_capacity(INITIAL_BUFFER_SIZE),
1446 header_written: false,
1447 column_index: 0,
1448 column_count,
1449 row_count: 0,
1450 column_nullable,
1451 }
1452 }
1453
1454 /// Creates a chunk from a table definition.
1455 #[must_use]
1456 pub fn from_table_definition(table_def: &TableDefinition) -> Self {
1457 let column_nullable: Vec<bool> = table_def.columns.iter().map(|c| c.nullable).collect();
1458 Self::new(table_def.column_count(), column_nullable)
1459 }
1460
1461 /// Returns the number of complete rows in this chunk.
1462 #[must_use]
1463 pub fn row_count(&self) -> usize {
1464 self.row_count
1465 }
1466
1467 /// Returns the current buffer size in bytes.
1468 #[must_use]
1469 pub fn buffer_size(&self) -> usize {
1470 self.buffer.len()
1471 }
1472
1473 /// Returns true if the chunk has reached size or row limits and should be sent.
1474 #[must_use]
1475 pub fn should_flush(&self) -> bool {
1476 self.row_count >= CHUNK_ROW_LIMIT || self.buffer.len() >= CHUNK_SIZE_LIMIT
1477 }
1478
1479 /// Returns true if the chunk is empty (no rows).
1480 #[must_use]
1481 pub fn is_empty(&self) -> bool {
1482 self.row_count == 0
1483 }
1484
1485 /// Takes the buffer, consuming the chunk data.
1486 ///
1487 /// Returns `None` if the chunk is empty. After calling this, the chunk
1488 /// can be reused by calling the add_* methods again.
1489 ///
1490 /// Note: The header flag is NOT reset - subsequent chunks from the same
1491 /// `InsertChunk` will NOT include the header (`HyperBinary` only needs one
1492 /// header per COPY stream).
1493 pub fn take(&mut self) -> Option<BytesMut> {
1494 if self.row_count == 0 {
1495 return None;
1496 }
1497 // Don't reset header_written - only first chunk should have header
1498 self.row_count = 0;
1499 Some(std::mem::take(&mut self.buffer))
1500 }
1501
1502 /// Resets the chunk for reuse without reallocating.
1503 pub fn clear(&mut self) {
1504 self.buffer.clear();
1505 self.header_written = false;
1506 self.column_index = 0;
1507 self.row_count = 0;
1508 }
1509
1510 #[allow(
1511 clippy::inline_always,
1512 reason = "hot-path numeric kernel; forced inlining measured to matter on this specific function"
1513 )]
1514 fn ensure_header(&mut self) {
1515 if !self.header_written {
1516 copy::write_header(&mut self.buffer);
1517 self.header_written = true;
1518 }
1519 }
1520
1521 #[expect(
1522 clippy::inline_always,
1523 reason = "hot inner loop of the inserter; measured to matter for per-row throughput"
1524 )]
1525 #[inline(always)]
1526 fn current_column_nullable(&self) -> bool {
1527 *self.column_nullable.get(self.column_index).unwrap_or(&true)
1528 }
1529
1530 /// Adds a NULL value for the current column.
1531 ///
1532 /// # Errors
1533 ///
1534 /// - Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"`
1535 /// if the current row already has all columns supplied.
1536 /// - Returns [`Error::InvalidTableDefinition`] with message
1537 /// `"Cannot add NULL to non-nullable column"` if the current column
1538 /// is `NOT NULL` in the schema.
1539 pub fn add_null(&mut self) -> Result<()> {
1540 if self.column_index >= self.column_count {
1541 return Err(Error::invalid_table_definition("Too many columns in row"));
1542 }
1543 if !self.current_column_nullable() {
1544 return Err(Error::invalid_table_definition(
1545 "Cannot add NULL to non-nullable column",
1546 ));
1547 }
1548 self.ensure_header();
1549 copy::write_null(&mut self.buffer);
1550 self.column_index += 1;
1551 Ok(())
1552 }
1553
1554 /// Adds a boolean value.
1555 ///
1556 /// # Errors
1557 ///
1558 /// Returns [`Error::InvalidTableDefinition`] with message `"Too many columns in row"` if
1559 /// the current row already has all columns supplied.
1560 pub fn add_bool(&mut self, value: bool) -> Result<()> {
1561 if self.column_index >= self.column_count {
1562 return Err(Error::invalid_table_definition("Too many columns in row"));
1563 }
1564 self.ensure_header();
1565 let int_value = i8::from(value);
1566 if self.current_column_nullable() {
1567 copy::write_i8(&mut self.buffer, int_value);
1568 } else {
1569 copy::write_i8_not_null(&mut self.buffer, int_value);
1570 }
1571 self.column_index += 1;
1572 Ok(())
1573 }
1574
1575 /// Adds an i16 value (SMALLINT).
1576 ///
1577 /// # Errors
1578 ///
1579 /// See [`add_bool`](Self::add_bool).
1580 pub fn add_i16(&mut self, value: i16) -> Result<()> {
1581 if self.column_index >= self.column_count {
1582 return Err(Error::invalid_table_definition("Too many columns in row"));
1583 }
1584 self.ensure_header();
1585 if self.current_column_nullable() {
1586 copy::write_i16(&mut self.buffer, value);
1587 } else {
1588 copy::write_i16_not_null(&mut self.buffer, value);
1589 }
1590 self.column_index += 1;
1591 Ok(())
1592 }
1593
1594 /// Adds an i32 value (INT).
1595 ///
1596 /// # Errors
1597 ///
1598 /// See [`add_bool`](Self::add_bool).
1599 pub fn add_i32(&mut self, value: i32) -> Result<()> {
1600 if self.column_index >= self.column_count {
1601 return Err(Error::invalid_table_definition("Too many columns in row"));
1602 }
1603 self.ensure_header();
1604 if self.current_column_nullable() {
1605 copy::write_i32(&mut self.buffer, value);
1606 } else {
1607 copy::write_i32_not_null(&mut self.buffer, value);
1608 }
1609 self.column_index += 1;
1610 Ok(())
1611 }
1612
1613 /// Adds an i64 value (BIGINT).
1614 ///
1615 /// # Errors
1616 ///
1617 /// See [`add_bool`](Self::add_bool).
1618 pub fn add_i64(&mut self, value: i64) -> Result<()> {
1619 if self.column_index >= self.column_count {
1620 return Err(Error::invalid_table_definition("Too many columns in row"));
1621 }
1622 self.ensure_header();
1623 if self.current_column_nullable() {
1624 copy::write_i64(&mut self.buffer, value);
1625 } else {
1626 copy::write_i64_not_null(&mut self.buffer, value);
1627 }
1628 self.column_index += 1;
1629 Ok(())
1630 }
1631
1632 /// Adds an f32 value (REAL/FLOAT4).
1633 ///
1634 /// # Errors
1635 ///
1636 /// See [`add_bool`](Self::add_bool).
1637 pub fn add_f32(&mut self, value: f32) -> Result<()> {
1638 if self.column_index >= self.column_count {
1639 return Err(Error::invalid_table_definition("Too many columns in row"));
1640 }
1641 self.ensure_header();
1642 if self.current_column_nullable() {
1643 copy::write_f32(&mut self.buffer, value);
1644 } else {
1645 copy::write_f32_not_null(&mut self.buffer, value);
1646 }
1647 self.column_index += 1;
1648 Ok(())
1649 }
1650
1651 /// Adds an f64 value (DOUBLE PRECISION/FLOAT8).
1652 ///
1653 /// # Errors
1654 ///
1655 /// See [`add_bool`](Self::add_bool).
1656 pub fn add_f64(&mut self, value: f64) -> Result<()> {
1657 if self.column_index >= self.column_count {
1658 return Err(Error::invalid_table_definition("Too many columns in row"));
1659 }
1660 self.ensure_header();
1661 if self.current_column_nullable() {
1662 copy::write_f64(&mut self.buffer, value);
1663 } else {
1664 copy::write_f64_not_null(&mut self.buffer, value);
1665 }
1666 self.column_index += 1;
1667 Ok(())
1668 }
1669
1670 /// Adds a string value (TEXT/VARCHAR).
1671 ///
1672 /// # Errors
1673 ///
1674 /// See [`add_bool`](Self::add_bool).
1675 pub fn add_str(&mut self, value: &str) -> Result<()> {
1676 self.add_bytes(value.as_bytes())
1677 }
1678
1679 /// Adds a bytes value (BYTEA).
1680 ///
1681 /// # Errors
1682 ///
1683 /// See [`add_bool`](Self::add_bool).
1684 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
1685 if self.column_index >= self.column_count {
1686 return Err(Error::invalid_table_definition("Too many columns in row"));
1687 }
1688 if value.len() > u32::MAX as usize {
1689 return Err(Error::conversion(format!(
1690 "Value length {} exceeds HyperBinary 4-byte length limit ({})",
1691 value.len(),
1692 u32::MAX
1693 )));
1694 }
1695 self.ensure_header();
1696 if self.current_column_nullable() {
1697 copy::write_varbinary(&mut self.buffer, value);
1698 } else {
1699 copy::write_varbinary_not_null(&mut self.buffer, value);
1700 }
1701 self.column_index += 1;
1702 Ok(())
1703 }
1704
1705 /// Adds a 128-bit value (NUMERIC/INTERVAL).
1706 ///
1707 /// # Errors
1708 ///
1709 /// See [`add_bool`](Self::add_bool).
1710 pub fn add_data128(&mut self, value: &[u8; 16]) -> Result<()> {
1711 if self.column_index >= self.column_count {
1712 return Err(Error::invalid_table_definition("Too many columns in row"));
1713 }
1714 self.ensure_header();
1715 if self.current_column_nullable() {
1716 copy::write_data128(&mut self.buffer, value);
1717 } else {
1718 copy::write_data128_not_null(&mut self.buffer, value);
1719 }
1720 self.column_index += 1;
1721 Ok(())
1722 }
1723
1724 /// Adds a Date value.
1725 ///
1726 /// # Errors
1727 ///
1728 /// See [`add_bool`](Self::add_bool).
1729 pub fn add_date(&mut self, value: Date) -> Result<()> {
1730 if self.column_index >= self.column_count {
1731 return Err(Error::invalid_table_definition("Too many columns in row"));
1732 }
1733 self.ensure_header();
1734 let julian_day = value.to_julian_day();
1735 if self.current_column_nullable() {
1736 copy::write_i32(&mut self.buffer, julian_day);
1737 } else {
1738 copy::write_i32_not_null(&mut self.buffer, julian_day);
1739 }
1740 self.column_index += 1;
1741 Ok(())
1742 }
1743
1744 /// Adds a Time value.
1745 ///
1746 /// # Errors
1747 ///
1748 /// See [`add_bool`](Self::add_bool).
1749 pub fn add_time(&mut self, value: Time) -> Result<()> {
1750 if self.column_index >= self.column_count {
1751 return Err(Error::invalid_table_definition("Too many columns in row"));
1752 }
1753 self.ensure_header();
1754 let micros = value.to_microseconds();
1755 if self.current_column_nullable() {
1756 copy::write_i64(&mut self.buffer, micros);
1757 } else {
1758 copy::write_i64_not_null(&mut self.buffer, micros);
1759 }
1760 self.column_index += 1;
1761 Ok(())
1762 }
1763
1764 /// Adds a Timestamp value.
1765 ///
1766 /// # Errors
1767 ///
1768 /// See [`add_bool`](Self::add_bool).
1769 pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
1770 if self.column_index >= self.column_count {
1771 return Err(Error::invalid_table_definition("Too many columns in row"));
1772 }
1773 self.ensure_header();
1774 let micros = value.to_microseconds();
1775 if self.current_column_nullable() {
1776 copy::write_i64(&mut self.buffer, micros);
1777 } else {
1778 copy::write_i64_not_null(&mut self.buffer, micros);
1779 }
1780 self.column_index += 1;
1781 Ok(())
1782 }
1783
1784 /// Adds an `OffsetTimestamp` (TIMESTAMP WITH TIME ZONE) value.
1785 ///
1786 /// # Errors
1787 ///
1788 /// See [`add_bool`](Self::add_bool).
1789 pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
1790 if self.column_index >= self.column_count {
1791 return Err(Error::invalid_table_definition("Too many columns in row"));
1792 }
1793 self.ensure_header();
1794 let micros = value.to_microseconds_utc();
1795 if self.current_column_nullable() {
1796 copy::write_i64(&mut self.buffer, micros);
1797 } else {
1798 copy::write_i64_not_null(&mut self.buffer, micros);
1799 }
1800 self.column_index += 1;
1801 Ok(())
1802 }
1803
1804 /// Adds an Interval value.
1805 ///
1806 /// # Errors
1807 ///
1808 /// See [`add_bool`](Self::add_bool).
1809 pub fn add_interval(&mut self, value: Interval) -> Result<()> {
1810 if self.column_index >= self.column_count {
1811 return Err(Error::invalid_table_definition("Too many columns in row"));
1812 }
1813 self.ensure_header();
1814 let packed = value.to_packed();
1815 if self.current_column_nullable() {
1816 copy::write_data128(&mut self.buffer, &packed);
1817 } else {
1818 copy::write_data128_not_null(&mut self.buffer, &packed);
1819 }
1820 self.column_index += 1;
1821 Ok(())
1822 }
1823
1824 /// Adds a Geography value.
1825 ///
1826 /// # Errors
1827 ///
1828 /// See [`add_bool`](Self::add_bool).
1829 pub fn add_geography(&mut self, value: &Geography) -> Result<()> {
1830 // Geography uses the same varbinary path as add_bytes
1831 self.add_bytes(value.as_bytes())
1832 }
1833
1834 /// Ends the current row.
1835 ///
1836 /// Returns an error if the wrong number of columns were added.
1837 ///
1838 /// # Errors
1839 ///
1840 /// Returns [`Error::InvalidTableDefinition`] if fewer (or more) columns were supplied
1841 /// for this row than the chunk's column count.
1842 pub fn end_row(&mut self) -> Result<()> {
1843 if self.column_index != self.column_count {
1844 return Err(Error::invalid_table_definition(format!(
1845 "Expected {} columns, got {}",
1846 self.column_count, self.column_index
1847 )));
1848 }
1849 self.column_index = 0;
1850 self.row_count += 1;
1851 Ok(())
1852 }
1853
1854 /// Returns the current column index (for checking incomplete rows).
1855 #[must_use]
1856 pub fn column_index(&self) -> usize {
1857 self.column_index
1858 }
1859
1860 /// Returns a reference to the internal buffer.
1861 pub(crate) fn buffer(&self) -> &BytesMut {
1862 &self.buffer
1863 }
1864}
1865
1866// =============================================================================
1867// ChunkSender - Mutex-protected sender for InsertChunks
1868// =============================================================================
1869
1870use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1871use std::sync::Mutex;
1872
/// A thread-safe sender for [`InsertChunk`]s.
///
/// `ChunkSender` manages the COPY protocol connection and ensures that only one
/// chunk is sent at a time. Multiple threads can call `send_chunk()` concurrently;
/// the mutex ensures serialized access.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{Catalog, Connection, CreateMode, ChunkSender, InsertChunk, TableDefinition, SqlType, Result};
/// use std::sync::mpsc;
/// use std::thread;
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("products")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("name", SqlType::text());
///
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     let sender = ChunkSender::new(&conn, &table_def)?;
///     let (tx, rx) = mpsc::channel::<InsertChunk>();
///
///     // Worker thread
///     let table_def_clone = table_def.clone();
///     let handle = thread::spawn(move || {
///         let mut chunk = InsertChunk::from_table_definition(&table_def_clone);
///         for i in 0..1000i32 {
///             chunk.add_i32(i).unwrap();
///             chunk.add_str(&format!("Product {}", i)).unwrap();
///             chunk.end_row().unwrap();
///         }
///         tx.send(chunk).unwrap();
///     });
///
///     // Receive and send chunks
///     while let Ok(chunk) = rx.recv() {
///         sender.send_chunk(chunk)?;
///     }
///
///     handle.join().unwrap();
///     let rows = sender.finish()?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct ChunkSender<'conn> {
    /// Connection the COPY session is opened on.
    connection: &'conn Connection,
    /// Qualified name of the table rows are copied into.
    table_name: String,
    /// Names of the table's columns, in definition order.
    columns: Vec<String>,
    /// COPY writer, created lazily by the first `send_chunk`; the mutex
    /// serializes sends from concurrent callers.
    writer: Mutex<Option<CopyInWriter<'conn>>>,
    /// Set once the first chunk (the one that keeps its header) has been
    /// claimed; later chunks get their header stripped.
    header_sent: std::sync::atomic::AtomicBool,
    /// Total number of rows handed to the writer so far.
    total_rows: AtomicU64,
    /// Number of non-empty chunks sent so far.
    chunks_sent: AtomicUsize,
}
1931
1932impl<'conn> ChunkSender<'conn> {
1933 /// Creates a new chunk sender for the given table.
1934 ///
1935 /// # Errors
1936 ///
1937 /// Returns [`Error::InvalidTableDefinition`] if `table_def` has zero
1938 /// columns. The COPY session itself is opened lazily on the first
1939 /// [`send_chunk`](Self::send_chunk), so transport errors surface there.
1940 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
1941 if table_def.column_count() == 0 {
1942 return Err(Error::invalid_table_definition(
1943 "Table definition must have at least one column",
1944 ));
1945 }
1946
1947 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
1948 let table_name = table_def.qualified_name();
1949
1950 Ok(ChunkSender {
1951 connection,
1952 table_name,
1953 columns,
1954 writer: Mutex::new(None),
1955 header_sent: std::sync::atomic::AtomicBool::new(false),
1956 total_rows: AtomicU64::new(0),
1957 chunks_sent: AtomicUsize::new(0),
1958 })
1959 }
1960
1961 /// Sends a chunk to Hyper.
1962 ///
1963 /// This method is thread-safe - multiple threads can call it concurrently,
1964 /// but only one chunk will be sent at a time.
1965 ///
1966 /// Each `InsertChunk` includes a `HyperBinary` header (19 bytes). This method
1967 /// automatically handles headers: the first chunk's header is sent, and
1968 /// headers in subsequent chunks are stripped (`HyperBinary` expects only one
1969 /// header per COPY stream).
1970 ///
1971 /// # Errors
1972 ///
1973 /// Returns an error if the chunk is empty or if sending fails.
1974 pub fn send_chunk(&self, mut chunk: InsertChunk) -> Result<()> {
1975 // Capture row count before take() resets it
1976 let row_count = chunk.row_count();
1977
1978 let Some(buffer) = chunk.take() else {
1979 return Ok(());
1980 };
1981
1982 // Acquire the lock for exclusive send access
1983 let mut writer_guard = self
1984 .writer
1985 .lock()
1986 .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
1987
1988 // Lazily initialize the COPY connection
1989 if writer_guard.is_none() {
1990 let client = self.connection.tcp_client().ok_or_else(|| {
1991 Error::feature_not_supported(
1992 "ChunkSender requires a TCP connection. gRPC connections do not support COPY operations."
1993 )
1994 })?;
1995 let columns: Vec<&str> = self
1996 .columns
1997 .iter()
1998 .map(std::string::String::as_str)
1999 .collect();
2000 *writer_guard = Some(client.copy_in(&self.table_name, &columns)?);
2001 }
2002
2003 // Handle headers: only first chunk should have header in the COPY stream
2004 // Each InsertChunk includes a 19-byte HyperBinary header, so we need to
2005 // strip headers from all chunks except the first one sent.
2006 let is_first = !self.header_sent.swap(true, Ordering::SeqCst);
2007
2008 let data_to_send = if is_first {
2009 // First chunk: send with header
2010 &buffer[..]
2011 } else {
2012 // Subsequent chunks: strip the 19-byte header if present
2013 if buffer.len() > hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE
2014 && buffer.starts_with(hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER)
2015 {
2016 &buffer[hyperdb_api_core::protocol::copy::HYPER_BINARY_HEADER_SIZE..]
2017 } else {
2018 &buffer[..]
2019 }
2020 };
2021
2022 // Write the chunk directly to the socket, avoiding a full-chunk memcpy
2023 // into the connection's write buffer. flush_stream ensures the data
2024 // reaches the server before we return.
2025 if let Some(ref mut writer) = *writer_guard {
2026 writer.send_direct(data_to_send)?;
2027 writer.flush_stream()?;
2028 }
2029
2030 // Update counters (lock already released for these atomic ops)
2031 drop(writer_guard);
2032 self.total_rows
2033 .fetch_add(row_count as u64, Ordering::Relaxed);
2034 self.chunks_sent.fetch_add(1, Ordering::Relaxed);
2035
2036 debug!(
2037 target: "hyperdb_api",
2038 chunk = self.chunks_sent.load(Ordering::Relaxed),
2039 rows = row_count,
2040 bytes = data_to_send.len(),
2041 "chunk-sender"
2042 );
2043
2044 Ok(())
2045 }
2046
/// Returns the total number of rows sent so far.
///
/// `send_chunk` adds a chunk's row count to this counter only after the
/// chunk has been written and flushed, so failed sends are not counted.
/// The value is read with `Ordering::Relaxed`: it is an atomic snapshot
/// that carries no ordering guarantees relative to other memory operations.
pub fn total_rows(&self) -> u64 {
    self.total_rows.load(Ordering::Relaxed)
}
2051
/// Returns the number of chunks sent so far.
///
/// `send_chunk` increments this counter once per chunk after the chunk has
/// been written and flushed; empty chunks and failed sends are not counted.
/// The value is read with `Ordering::Relaxed`: it is an atomic snapshot
/// that carries no ordering guarantees relative to other memory operations.
pub fn chunks_sent(&self) -> usize {
    self.chunks_sent.load(Ordering::Relaxed)
}
2056
2057 /// Finishes the COPY operation and returns the total row count.
2058 ///
2059 /// This method consumes the sender. After calling this, the COPY operation
2060 /// is complete and all data has been committed.
2061 ///
2062 /// # Errors
2063 ///
2064 /// - Returns [`Error::Internal`] with message `"ChunkSender mutex poisoned"`
2065 /// if a sender thread panicked while holding the writer lock.
2066 /// - Returns [`Error::Server`] or [`Error::Io`] if sending the COPY
2067 /// trailer or finishing the COPY operation fails.
2068 pub fn finish(self) -> Result<u64> {
2069 let mut writer_guard = self
2070 .writer
2071 .lock()
2072 .map_err(|_| Error::internal("ChunkSender mutex poisoned"))?;
2073
2074 // If no chunks were sent, return 0
2075 let Some(writer) = writer_guard.take() else {
2076 return Ok(0);
2077 };
2078
2079 // Write and send the COPY trailer
2080 let mut trailer_buf = BytesMut::with_capacity(2);
2081 copy::write_trailer(&mut trailer_buf);
2082
2083 // Need to get mutable access to send trailer
2084 let mut writer = writer;
2085 writer.send(&trailer_buf)?;
2086
2087 // Finish the COPY operation
2088 let rows = writer.finish()?;
2089
2090 info!(
2091 target: "hyperdb_api",
2092 rows,
2093 chunks = self.chunks_sent.load(Ordering::Relaxed),
2094 table = %self.table_name,
2095 "chunk-sender-finish"
2096 );
2097
2098 Ok(rows)
2099 }
2100}
2101
#[cfg(test)]
mod tests {
    use crate::table_definition::TableDefinition;
    use hyperdb_api_core::types::SqlType;

    use super::InsertChunk;

    /// Builds the fixture schema shared by all tests: a required `id` int
    /// column followed by a nullable `name` text column.
    fn create_test_table_def() -> TableDefinition {
        TableDefinition::new("test")
            .add_required_column("id", SqlType::int())
            .add_nullable_column("name", SqlType::text())
    }

    /// Appends one complete `(id, name)` row to `chunk`, panicking on any error.
    fn push_row(chunk: &mut InsertChunk, id: i32, name: &str) {
        chunk.add_i32(id).unwrap();
        chunk.add_str(name).unwrap();
        chunk.end_row().unwrap();
    }

    /// The fixture schema exposes exactly two columns.
    #[test]
    fn test_inserter_column_validation() {
        // A full inserter needs a live connection, so only the schema side
        // of the validation is exercised here.
        let schema = create_test_table_def();
        assert_eq!(schema.column_count(), 2);
    }

    /// Rows accumulate in the chunk and `take` hands the buffer out once.
    #[test]
    fn test_insert_chunk_encoding() {
        let schema = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&schema);

        // A single row is counted and does not trigger a flush.
        push_row(&mut chunk, 42, "hello");
        assert_eq!(chunk.row_count(), 1);
        assert!(!chunk.is_empty());
        assert!(!chunk.should_flush());

        // One hundred further rows bring the total to 101.
        for idx in 0..100 {
            push_row(&mut chunk, idx, &format!("item {idx}"));
        }
        assert_eq!(chunk.row_count(), 101);

        // The first `take` yields a non-empty buffer ...
        let payload = chunk.take().unwrap();
        assert!(!payload.is_empty());

        // ... and a second `take` finds nothing left.
        assert!(chunk.take().is_none());
    }

    /// NULL is rejected for the required column and accepted for the nullable one.
    #[test]
    fn test_insert_chunk_null_handling() {
        let schema = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&schema);

        // `id` is NOT NULL, so a null in the first position must be refused.
        assert!(chunk.add_null().is_err());

        // Supply the required value, then a null for the nullable `name`.
        chunk.add_i32(1).unwrap();
        chunk.add_null().unwrap();
        chunk.end_row().unwrap();

        assert_eq!(chunk.row_count(), 1);
    }

    /// `end_row` fails until every column of the row has been supplied.
    #[test]
    fn test_insert_chunk_column_count_validation() {
        let schema = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&schema);

        // With only the first column present the row is incomplete.
        chunk.add_i32(1).unwrap();
        assert!(chunk.end_row().is_err());

        // Completing the row makes `end_row` succeed.
        chunk.add_str("test").unwrap();
        chunk.end_row().unwrap();
    }

    /// Adding a value beyond the schema's column count is rejected.
    #[test]
    fn test_insert_chunk_too_many_columns() {
        let schema = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&schema);

        chunk.add_i32(1).unwrap();
        chunk.add_str("test").unwrap();

        // The schema has two columns; a third value must fail.
        assert!(chunk.add_i32(2).is_err());
    }

    /// `clear` resets the row count and leaves the chunk empty.
    #[test]
    fn test_insert_chunk_clear() {
        let schema = create_test_table_def();
        let mut chunk = InsertChunk::from_table_definition(&schema);

        push_row(&mut chunk, 1, "test");
        assert_eq!(chunk.row_count(), 1);

        chunk.clear();

        assert_eq!(chunk.row_count(), 0);
        assert!(chunk.is_empty());
    }
}