hyperdb_api/arrow_inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`ArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables.
8//!
9//! Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
10//! format, `ArrowInserter` accepts complete Arrow IPC stream data, making it ideal
11//! for integration with Arrow-based data pipelines.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
17//! # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
18//! # fn get_arrow_data() -> Vec<u8> { vec![] }
19//! use hyperdb_api::{ArrowInserter, Connection, TableDefinition};
20//!
21//! // Arrow IPC data from external source (e.g., arrow crate, Parquet reader)
22//! let arrow_ipc_data: Vec<u8> = get_arrow_data();
23//!
24//! let mut inserter = ArrowInserter::new(&conn, &table_def)?;
25//! inserter.insert_data(&arrow_ipc_data)?;
26//! let rows = inserter.execute()?;
27//! # Ok(())
28//! # }
29//! ```
30
31use std::time::Instant;
32
33use arrow::ipc::writer::StreamWriter;
34use hyperdb_api_core::client::client::CopyInWriter;
35use tracing::{debug, info};
36
37/// Default flush threshold (16 MB) — matches `HyperBinary` Inserter.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 << 20; // 16 MiB, i.e. 16 * 1024 * 1024 bytes
39
40use crate::catalog::Catalog;
41use crate::connection::Connection;
42use crate::data_format::DataFormat;
43use crate::error::{Error, Result};
44use crate::table_definition::TableDefinition;
45
46/// Tracks which insertion pathway is active, preventing unsafe mixing of
47/// raw IPC methods (`insert_data`/`insert_record_batches`/`insert_raw`) with
48/// the `RecordBatch`-based method (`insert_batch`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum InsertMode {
    /// Caller-serialized Arrow IPC bytes are being forwarded as-is
    /// (the raw path: `insert_data` / `insert_record_batches` / `insert_raw`).
    RawIpc,
    /// `RecordBatch` values are being serialized by the inserter's own
    /// `StreamWriter` (the `insert_batch` path).
    BatchIpc,
}
56
57/// Inserts Arrow IPC stream data into a Hyper table.
58///
59/// Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
60/// format, `ArrowInserter` accepts pre-formatted Arrow IPC stream data. This is useful
61/// when integrating with Arrow-based data pipelines or when you already have data in
62/// Arrow format.
63///
64/// # Arrow IPC Stream Format
65///
66/// Arrow IPC streams consist of:
67/// 1. A schema message (describing column names and types)
68/// 2. One or more record batch messages (containing the actual data)
69///
70/// The schema must match the target table's schema.
71///
72/// # Single vs Multiple Chunks
73///
74/// For single-chunk inserts, use [`insert_data()`](Self::insert_data) with a complete
75/// Arrow IPC stream (schema + record batches).
76///
77/// For multiple chunks (large datasets), use:
78/// 1. [`insert_data()`](Self::insert_data) for the first chunk (with schema)
79/// 2. [`insert_record_batches()`](Self::insert_record_batches) for subsequent chunks (without schema)
80///
81/// # Example
82///
83/// ```no_run
84/// use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};
85///
86/// fn main() -> Result<()> {
87/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
88///
89/// let table_def = TableDefinition::new("data")
90/// .add_required_column("id", SqlType::int())
91/// .add_nullable_column("value", SqlType::double());
92///
93/// Catalog::new(&conn).create_table(&table_def)?;
94///
95/// // Arrow IPC data from external source
96/// let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
97///
98/// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
99/// inserter.insert_data(&arrow_data)?;
100/// let rows = inserter.execute()?;
101/// println!("Inserted {} rows", rows);
102/// Ok(())
103/// }
104/// ```
105pub struct ArrowInserter<'conn> {
106 connection: &'conn Connection,
107 table_name: String,
108 columns: Vec<String>,
109 writer: Option<CopyInWriter<'conn>>,
110 /// Tracks whether an Arrow schema has been sent.
111 /// Sending schema twice causes an error in Hyper.
112 schema_sent: bool,
113 /// Total bytes sent (for logging).
114 total_bytes: usize,
115 /// Number of chunks sent.
116 chunk_count: usize,
117 /// Start time for timing the insert operation.
118 start_time: Instant,
119 /// Flush threshold in bytes. Data is buffered until this threshold is reached.
120 flush_threshold: usize,
121 /// Bytes buffered since the last flush.
122 buffered_bytes: usize,
123 /// Persistent IPC `StreamWriter` for `insert_batch()`. Streams each `RecordBatch`
124 /// eagerly — the schema is written on first use, then each batch is serialized
125 /// and sent immediately via `send_direct()`. The internal `Vec<u8>` buffer is
126 /// drained after every write to keep memory usage bounded at `O(batch_size)`.
127 batch_ipc_writer: Option<StreamWriter<Vec<u8>>>,
128 /// Tracks which insertion pathway is active. `None` means no data has been
129 /// sent yet. Once set, mixing pathways is an error.
130 insert_mode: Option<InsertMode>,
131}
132
133impl std::fmt::Debug for ArrowInserter<'_> {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.debug_struct("ArrowInserter")
136 .field("table_name", &self.table_name)
137 .field("columns", &self.columns)
138 .field("schema_sent", &self.schema_sent)
139 .field("chunk_count", &self.chunk_count)
140 .field("total_bytes", &self.total_bytes)
141 .finish_non_exhaustive()
142 }
143}
144
145impl<'conn> ArrowInserter<'conn> {
146 /// Creates a new Arrow inserter for the given table.
147 ///
148 /// # Arguments
149 ///
150 /// * `connection` - The database connection.
151 /// * `table_def` - The table definition for the target table.
152 ///
153 /// # Example
154 ///
155 /// ```no_run
156 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
157 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
158 /// let inserter = ArrowInserter::new(&conn, &table_def)?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// # Errors
164 ///
165 /// - Returns [`Error::InvalidTableDefinition`] with message
166 /// `"Table definition must have at least one column"` if `table_def`
167 /// has no columns.
168 /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
169 /// (COPY is TCP-only).
170 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
171 let column_count = table_def.column_count();
172 if column_count == 0 {
173 return Err(Error::invalid_table_definition(
174 "Table definition must have at least one column",
175 ));
176 }
177
178 // Fail fast: verify the connection supports COPY (TCP only).
179 // The actual COPY session is started lazily on the first data write
180 // to avoid locking the connection into COPY mode prematurely.
181 if connection.tcp_client().is_none() {
182 return Err(Error::feature_not_supported(
183 "ArrowInserter requires a TCP connection. \
184 gRPC connections do not support COPY operations.",
185 ));
186 }
187
188 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
189 let table_name = table_def.qualified_name();
190
191 Ok(ArrowInserter {
192 connection,
193 table_name,
194 columns,
195 writer: None,
196 schema_sent: false,
197 total_bytes: 0,
198 chunk_count: 0,
199 start_time: Instant::now(),
200 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
201 buffered_bytes: 0,
202 batch_ipc_writer: None,
203 insert_mode: None,
204 })
205 }
206
207 /// Creates an Arrow inserter by querying the table schema from the database.
208 ///
209 /// This method queries the database to get the table definition automatically,
210 /// which is useful when you want to insert into an existing table without
211 /// manually specifying the schema.
212 ///
213 /// # Arguments
214 ///
215 /// * `connection` - The database connection.
216 /// * `table_name` - The table name (can include database and schema qualifiers).
217 ///
218 /// # Errors
219 ///
220 /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
221 ///
222 /// # Example
223 ///
224 /// ```no_run
225 /// # use hyperdb_api::{ArrowInserter, Connection, Result};
226 /// # fn example(conn: &Connection) -> Result<()> {
227 /// let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
228 /// # Ok(())
229 /// # }
230 /// ```
231 pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
232 where
233 T: TryInto<crate::TableName>,
234 crate::Error: From<T::Error>,
235 {
236 let catalog = Catalog::new(connection);
237 let table_def = catalog.get_table_definition(table_name)?;
238 Self::new(connection, &table_def)
239 }
240
241 /// Sets a custom flush threshold in bytes.
242 ///
243 /// Data is buffered until the threshold is reached, then flushed to the server.
244 /// Default is 16 MB (matching `HyperBinary` Inserter).
245 ///
246 /// # Example
247 ///
248 /// ```no_run
249 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
250 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
251 /// let inserter = ArrowInserter::new(&conn, &table_def)?
252 /// .with_flush_threshold(32 * 1024 * 1024); // 32 MB
253 /// # Ok(())
254 /// # }
255 /// ```
256 #[must_use]
257 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
258 self.flush_threshold = threshold;
259 self
260 }
261
262 /// Inserts a complete Arrow IPC stream (schema + record batches).
263 ///
264 /// Use this method for single-chunk inserts or for the first chunk of
265 /// multi-chunk inserts. The Arrow IPC stream must include the schema message
266 /// followed by one or more record batch messages.
267 ///
268 /// # Arguments
269 ///
270 /// * `arrow_ipc_data` - Complete Arrow IPC stream data (schema + record batches).
271 ///
272 /// # Errors
273 ///
274 /// Returns an error if:
275 /// - The connection fails to start COPY
276 /// - Sending data fails
277 /// - Schema was already sent (use `insert_record_batches()` for subsequent chunks)
278 ///
279 /// # Example
280 ///
281 /// ```no_run
282 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
283 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
284 /// # let first_arrow_ipc_stream = vec![];
285 /// # let second_chunk_batches_only = vec![];
286 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
287 ///
288 /// // First chunk with schema
289 /// inserter.insert_data(&first_arrow_ipc_stream)?;
290 ///
291 /// // For subsequent chunks without schema, use insert_record_batches()
292 /// inserter.insert_record_batches(&second_chunk_batches_only)?;
293 ///
294 /// let rows = inserter.execute()?;
295 /// # Ok(())
296 /// # }
297 /// ```
298 pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
299 if arrow_ipc_data.is_empty() {
300 return Ok(());
301 }
302
303 if self.insert_mode == Some(InsertMode::BatchIpc) {
304 return Err(Error::invalid_operation(
305 "Cannot mix insert_data() with insert_batch(). \
306 Use either raw IPC methods (insert_data/insert_record_batches) \
307 or RecordBatch methods (insert_batch), not both.",
308 ));
309 }
310
311 if self.schema_sent {
312 return Err(Error::invalid_operation(
313 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
314 or use insert_data() only once with the complete Arrow IPC stream.",
315 ));
316 }
317
318 self.ensure_writer()?;
319
320 if let Some(ref mut writer) = self.writer {
321 writer.send_direct(arrow_ipc_data)?;
322 }
323 self.buffered_bytes += arrow_ipc_data.len();
324 self.maybe_flush()?;
325
326 self.insert_mode = Some(InsertMode::RawIpc);
327 self.schema_sent = true;
328 self.total_bytes += arrow_ipc_data.len();
329 self.chunk_count += 1;
330
331 debug!(
332 target: "hyperdb_api",
333 chunk = self.chunk_count,
334 bytes = arrow_ipc_data.len(),
335 total_bytes = self.total_bytes,
336 buffered_bytes = self.buffered_bytes,
337 "arrow-inserter-chunk"
338 );
339
340 Ok(())
341 }
342
343 /// Inserts Arrow record batch data without schema.
344 ///
345 /// Use this method for subsequent chunks after the first `insert_data()` call.
346 /// The data should contain only Arrow record batch messages, **not** the schema.
347 ///
348 /// **Important**: Sending schema twice causes an error in Hyper. If you have
349 /// multiple complete Arrow IPC streams (each with schema), you need to strip
350 /// the schema from all but the first one.
351 ///
352 /// # Arguments
353 ///
354 /// * `arrow_batch_data` - Arrow record batch data (no schema message).
355 ///
356 /// # Errors
357 ///
358 /// Returns an error if:
359 /// - No schema has been sent yet (call `insert_data()` first)
360 /// - Sending data fails
361 ///
362 /// # Example
363 ///
364 /// ```no_run
365 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
366 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
367 /// # let first_chunk_with_schema = vec![];
368 /// # let second_chunk_batches = vec![];
369 /// # let third_chunk_batches = vec![];
370 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
371 ///
372 /// // First chunk must include schema
373 /// inserter.insert_data(&first_chunk_with_schema)?;
374 ///
375 /// // Subsequent chunks are record batches only
376 /// inserter.insert_record_batches(&second_chunk_batches)?;
377 /// inserter.insert_record_batches(&third_chunk_batches)?;
378 ///
379 /// let rows = inserter.execute()?;
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
384 if arrow_batch_data.is_empty() {
385 return Ok(());
386 }
387
388 if self.insert_mode == Some(InsertMode::BatchIpc) {
389 return Err(Error::invalid_operation(
390 "Cannot mix insert_record_batches() with insert_batch(). \
391 Use either raw IPC methods (insert_data/insert_record_batches) \
392 or RecordBatch methods (insert_batch), not both.",
393 ));
394 }
395
396 if !self.schema_sent {
397 return Err(Error::invalid_operation(
398 "No Arrow schema has been sent yet. Call insert_data() first with a complete \
399 Arrow IPC stream that includes the schema.",
400 ));
401 }
402
403 if let Some(ref mut writer) = self.writer {
404 writer.send_direct(arrow_batch_data)?;
405 }
406 self.buffered_bytes += arrow_batch_data.len();
407 self.maybe_flush()?;
408
409 self.total_bytes += arrow_batch_data.len();
410 self.chunk_count += 1;
411
412 debug!(
413 target: "hyperdb_api",
414 chunk = self.chunk_count,
415 bytes = arrow_batch_data.len(),
416 total_bytes = self.total_bytes,
417 buffered_bytes = self.buffered_bytes,
418 "arrow-inserter-batch-chunk"
419 );
420
421 Ok(())
422 }
423
424 /// Inserts raw Arrow data without schema tracking.
425 ///
426 /// This is a low-level method that sends data directly without checking
427 /// whether schema has been sent. Use this only if you are managing schema
428 /// handling yourself.
429 ///
430 /// For most use cases, prefer [`insert_data()`](Self::insert_data) and
431 /// [`insert_record_batches()`](Self::insert_record_batches).
432 ///
433 /// # Arguments
434 ///
435 /// * `data` - Raw Arrow IPC data to send.
436 ///
437 /// # Errors
438 ///
439 /// - Returns [`Error::InvalidOperation`] if a previous `insert_batch`
440 /// call already locked the inserter into `RecordBatch` IPC mode —
441 /// raw IPC and `RecordBatch` paths cannot be mixed.
442 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
443 /// session fails to open.
444 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
445 /// the data or the socket write fails.
446 pub fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
447 if data.is_empty() {
448 return Ok(());
449 }
450
451 if self.insert_mode == Some(InsertMode::BatchIpc) {
452 return Err(Error::invalid_operation(
453 "Cannot mix insert_raw() with insert_batch(). \
454 Use either raw IPC methods (insert_data/insert_record_batches/insert_raw) \
455 or RecordBatch methods (insert_batch), not both.",
456 ));
457 }
458
459 self.ensure_writer()?;
460
461 if let Some(ref mut writer) = self.writer {
462 writer.send_direct(data)?;
463 }
464 self.buffered_bytes += data.len();
465 self.maybe_flush()?;
466
467 self.total_bytes += data.len();
468 self.chunk_count += 1;
469
470 Ok(())
471 }
472
473 /// Finishes the insert operation and returns the number of rows inserted.
474 ///
475 /// This method completes the COPY operation and returns the row count
476 /// reported by the server.
477 ///
478 /// # Errors
479 ///
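/// Returns an error if:
/// - The Arrow IPC stream started by `insert_batch()` cannot be finalized
/// - The COPY operation fails to complete
///
/// If no data was sent, no COPY session was opened and `Ok(0)` is returned.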
483 ///
484 /// # Example
485 ///
486 /// ```no_run
487 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
488 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
489 /// # let arrow_data = vec![];
490 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
491 /// inserter.insert_data(&arrow_data)?;
492 /// let rows = inserter.execute()?;
/// println!("Inserted {} rows", rows);
/// # Ok(())
/// # }
/// ```
497 pub fn execute(mut self) -> Result<u64> {
498 // Finalize the IPC StreamWriter if insert_batch() was used.
499 // `into_inner()` calls `finish()` internally (writing the EOS marker)
500 // and returns the underlying buffer — a single fallible operation
501 // instead of the previous finish() + into_inner() pair.
502 //
503 // On error, `self` is dropped, and the Drop impl cancels the COPY
504 // writer, so the connection is always left in a clean state.
505 if let Some(ipc) = self.batch_ipc_writer.take() {
506 let buf = ipc.into_inner().map_err(|e| {
507 Error::conversion(format!("Failed to finalize Arrow IPC stream: {e}"))
508 })?;
509 if !buf.is_empty() {
510 if let Some(ref mut writer) = self.writer {
511 writer.send_direct(&buf)?;
512 }
513 }
514 }
515
516 if self.writer.is_none() {
517 // No data was sent
518 return Ok(0);
519 }
520
521 let rows = self
522 .writer
523 .take()
524 .map(hyperdb_api_core::client::CopyInWriter::finish)
525 .transpose()?
526 .unwrap_or(0);
527
528 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
529 info!(
530 target: "hyperdb_api",
531 rows,
532 chunks = self.chunk_count,
533 total_bytes = self.total_bytes,
534 duration_ms,
535 table = %self.table_name,
536 "arrow-inserter-end"
537 );
538
539 Ok(rows)
540 }
541
542 /// Cancels the insert operation.
543 ///
544 /// All data sent so far will be discarded.
545 pub fn cancel(mut self) {
546 if let Some(writer) = self.writer.take() {
547 let _ = writer.cancel("Arrow insert cancelled");
548 }
549 }
550
551 /// Returns whether any data has been sent.
552 #[must_use]
553 pub fn has_data(&self) -> bool {
554 self.chunk_count > 0
555 }
556
557 /// Returns the total bytes sent so far.
558 #[must_use]
559 pub fn total_bytes(&self) -> usize {
560 self.total_bytes
561 }
562
563 /// Returns the number of chunks sent so far.
564 #[must_use]
565 pub fn chunk_count(&self) -> usize {
566 self.chunk_count
567 }
568
569 /// Inserts an Arrow `RecordBatch` directly, streaming it immediately.
570 ///
571 /// Each batch is serialized to Arrow IPC format and sent to the server right
572 /// away — no accumulation in memory. The schema is written automatically on
573 /// the first call; subsequent batches send only record-batch IPC messages.
574 ///
575 /// Memory usage is `O(batch_size)` regardless of how many batches are inserted.
576 ///
577 /// # Arguments
578 ///
579 /// * `batch` - The Arrow `RecordBatch` to insert.
580 ///
581 /// # Example
582 ///
583 /// ```no_run
584 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, SqlType, Catalog, CreateMode, Result};
585 /// # fn example() -> Result<()> {
586 /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
587 /// # let table_def = TableDefinition::new("data")
588 /// # .add_required_column("id", SqlType::int())
589 /// # .add_nullable_column("value", SqlType::double());
590 /// # Catalog::new(&conn).create_table(&table_def)?;
591 /// use arrow::array::{Int32Array, Float64Array};
592 /// use arrow::datatypes::{Schema, Field, DataType};
593 /// use arrow::record_batch::RecordBatch;
594 /// use std::sync::Arc;
595 ///
596 /// let schema = Arc::new(Schema::new(vec![
597 /// Field::new("id", DataType::Int32, false),
598 /// Field::new("value", DataType::Float64, true),
599 /// ]));
600 /// let batch = RecordBatch::try_new(schema, vec![
601 /// Arc::new(Int32Array::from(vec![1, 2, 3])),
602 /// Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
603 /// ]).unwrap();
604 ///
605 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
606 /// inserter.insert_batch(&batch)?;
607 /// let rows = inserter.execute()?;
608 /// # Ok(())
609 /// # }
610 /// ```
611 ///
612 /// # Errors
613 ///
614 /// - Returns [`Error::InvalidOperation`] if a previous raw-IPC call
615 /// locked this inserter into the other mode — raw IPC and
616 /// `RecordBatch` paths cannot be mixed.
617 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
618 /// session cannot be opened.
619 /// - Returns [`Error::Conversion`] wrapping the underlying Arrow IPC
620 /// writer error if the schema or batch cannot be serialized (e.g.
621 /// dictionary misalignment, encoding failure).
622 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
623 /// the data or the socket write fails.
624 ///
625 /// # Panics
626 ///
/// Panics internally only if the IPC writer state is corrupted —
/// callers cannot trigger this from the public API. The `batch_ipc_writer`
/// is accessed via `as_mut().expect("IPC writer must exist")` only after
/// the code has made sure it is `Some`, so the panic is unreachable.
631 pub fn insert_batch(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
632 if self.insert_mode == Some(InsertMode::RawIpc) {
633 return Err(Error::invalid_operation(
634 "Cannot mix insert_batch() with raw IPC methods. \
635 Use either RecordBatch methods (insert_batch) \
636 or raw IPC methods (insert_data/insert_record_batches/insert_raw), not both.",
637 ));
638 }
639
640 self.ensure_writer()?;
641 self.insert_mode = Some(InsertMode::BatchIpc);
642
643 // Create the IPC StreamWriter on first use — this writes the schema message
644 if self.batch_ipc_writer.is_none() {
645 let ipc_writer = StreamWriter::try_new(Vec::new(), &batch.schema()).map_err(|e| {
646 Error::conversion(format!("Failed to create Arrow IPC writer: {e}"))
647 })?;
648 self.batch_ipc_writer = Some(ipc_writer);
649
650 // Drain the schema bytes that StreamWriter wrote during construction
651 self.drain_ipc_buffer()?;
652 self.schema_sent = true;
653 }
654
655 // Write the record batch — this appends batch IPC bytes to the internal Vec
656 self.batch_ipc_writer
657 .as_mut()
658 .expect("IPC writer must exist")
659 .write(batch)
660 .map_err(|e| Error::conversion(format!("Failed to write Arrow batch: {e}")))?;
661
662 // Drain the batch bytes and send them immediately
663 self.drain_ipc_buffer()?;
664 self.chunk_count += 1;
665
666 debug!(
667 target: "hyperdb_api",
668 chunk = self.chunk_count,
669 total_bytes = self.total_bytes,
670 buffered_bytes = self.buffered_bytes,
671 "arrow-inserter-batch"
672 );
673
674 Ok(())
675 }
676
677 /// Drains the internal IPC writer buffer and sends bytes via the COPY writer.
678 ///
679 /// # Safety contract with `StreamWriter`
680 ///
681 /// This method accesses the `StreamWriter`'s underlying `Vec<u8>` via
682 /// `get_mut()` (part of Arrow's public API) and drains it. This is safe
683 /// because `StreamWriter` writes sequentially via `Write::write_all()`
684 /// and does not cache buffer offsets or positions. After draining, the
685 /// Vec is empty but retains its allocation, so subsequent writes append
686 /// from offset 0 without reallocation.
687 fn drain_ipc_buffer(&mut self) -> Result<()> {
688 let ipc = self
689 .batch_ipc_writer
690 .as_mut()
691 .expect("IPC writer must exist");
692 let buf = ipc.get_mut();
693 if buf.is_empty() {
694 return Ok(());
695 }
696
697 // Send the current buffer contents, then clear while preserving the
698 // heap allocation so the next StreamWriter write avoids reallocation.
699 let len = buf.len();
700 if let Some(ref mut writer) = self.writer {
701 writer.send_direct(buf)?;
702 }
703 buf.clear();
704
705 self.buffered_bytes += len;
706 self.total_bytes += len;
707 self.maybe_flush()?;
708
709 Ok(())
710 }
711
712 /// Inserts multiple Arrow `RecordBatch`es, streaming each one immediately.
713 ///
714 /// This is a convenience method that calls [`insert_batch`](Self::insert_batch)
715 /// for each batch in the iterator. Memory usage stays bounded at `O(batch_size)`.
716 ///
717 /// # Example
718 ///
719 /// ```no_run
720 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
721 /// # fn example(conn: &Connection, table_def: &TableDefinition, batches: Vec<arrow::record_batch::RecordBatch>) -> Result<()> {
722 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
723 /// inserter.insert_batches(batches.iter())?;
724 /// let rows = inserter.execute()?;
725 /// # Ok(())
726 /// # }
727 /// ```
728 ///
729 /// # Errors
730 ///
731 /// Returns on the first batch that fails — see
732 /// [`insert_batch`](Self::insert_batch) for the failure modes.
733 pub fn insert_batches<'b>(
734 &mut self,
735 batches: impl IntoIterator<Item = &'b arrow::record_batch::RecordBatch>,
736 ) -> Result<()> {
737 for batch in batches {
738 self.insert_batch(batch)?;
739 }
740 Ok(())
741 }
742
743 /// Ensures the COPY writer is initialized.
744 fn ensure_writer(&mut self) -> Result<()> {
745 if self.writer.is_none() {
746 let client = self.connection.tcp_client().ok_or_else(|| {
747 crate::Error::feature_not_supported(
748 "ArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.",
749 )
750 })?;
751 let columns: Vec<&str> = self
752 .columns
753 .iter()
754 .map(std::string::String::as_str)
755 .collect();
756 let mut writer = client.copy_in_with_format(
757 &self.table_name,
758 &columns,
759 DataFormat::ArrowStream.as_sql_str(),
760 )?;
761 // Pre-allocate buffer to avoid reallocations during bulk insert
762 writer.reserve_buffer(self.flush_threshold + 1024 * 1024);
763 self.writer = Some(writer);
764 }
765 Ok(())
766 }
767
768 /// Flushes the TCP stream if the threshold is reached.
769 ///
770 /// With `send_direct()`, data is written directly to TCP. This periodic
771 /// flush ensures data is pushed to the server for high-latency connections.
772 fn maybe_flush(&mut self) -> Result<()> {
773 if self.buffered_bytes >= self.flush_threshold {
774 if let Some(ref mut writer) = self.writer {
775 writer.flush_stream()?;
776 }
777 debug!(
778 target: "hyperdb_api",
779 flushed_bytes = self.buffered_bytes,
780 threshold = self.flush_threshold,
781 "arrow-inserter-flush"
782 );
783 self.buffered_bytes = 0;
784 }
785 Ok(())
786 }
787}
788
789// Implement Drop to handle cleanup if the inserter is dropped without calling execute()
790impl Drop for ArrowInserter<'_> {
791 fn drop(&mut self) {
792 // If writer exists and we're being dropped without execute(),
793 // cancel the operation to avoid leaving the connection in a bad state.
794 if let Some(writer) = self.writer.take() {
795 let _ = writer.cancel("Arrow inserter dropped without execute");
796 }
797 }
798}
799
#[cfg(test)]
mod tests {
    use super::*;

    /// The format keyword passed to COPY by `ensure_writer` must be `ARROWSTREAM`.
    #[test]
    fn test_data_format_sql_str() {
        let format_keyword = DataFormat::ArrowStream.as_sql_str();
        assert_eq!(format_keyword, "ARROWSTREAM");
    }
}