hyperdb_api/arrow_inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`ArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables.
8//!
9//! Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
10//! format, `ArrowInserter` accepts complete Arrow IPC stream data, making it ideal
11//! for integration with Arrow-based data pipelines.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
17//! # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
18//! # fn get_arrow_data() -> Vec<u8> { vec![] }
19//! use hyperdb_api::{ArrowInserter, Connection, TableDefinition};
20//!
21//! // Arrow IPC data from external source (e.g., arrow crate, Parquet reader)
22//! let arrow_ipc_data: Vec<u8> = get_arrow_data();
23//!
24//! let mut inserter = ArrowInserter::new(&conn, &table_def)?;
25//! inserter.insert_data(&arrow_ipc_data)?;
26//! let rows = inserter.execute()?;
27//! # Ok(())
28//! # }
29//! ```
30
31use std::time::Instant;
32
33use arrow::ipc::writer::StreamWriter;
34use hyperdb_api_core::client::client::CopyInWriter;
35use tracing::{debug, info};
36
/// Default flush threshold (16 MB) — matches the `HyperBinary` Inserter so both
/// bulk-load paths exhibit the same network batching behavior.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;
39
40use crate::catalog::Catalog;
41use crate::connection::Connection;
42use crate::data_format::DataFormat;
43use crate::error::{Error, Result};
44use crate::table_definition::TableDefinition;
45
/// Tracks which insertion pathway is active, preventing unsafe mixing of
/// raw IPC methods (`insert_data`/`insert_record_batches`/`insert_raw`) with
/// the `RecordBatch`-based method (`insert_batch`).
///
/// Mixing the two would let a second Arrow schema message reach the server,
/// which Hyper rejects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InsertMode {
    /// Raw IPC bytes sent via `insert_data` / `insert_record_batches` / `insert_raw`.
    RawIpc,
    /// `RecordBatch` objects serialized through a `StreamWriter` via `insert_batch`.
    BatchIpc,
}
56
/// Inserts Arrow IPC stream data into a Hyper table.
///
/// Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
/// format, `ArrowInserter` accepts pre-formatted Arrow IPC stream data. This is useful
/// when integrating with Arrow-based data pipelines or when you already have data in
/// Arrow format.
///
/// # Arrow IPC Stream Format
///
/// Arrow IPC streams consist of:
/// 1. A schema message (describing column names and types)
/// 2. One or more record batch messages (containing the actual data)
///
/// The schema must match the target table's schema.
///
/// # Single vs Multiple Chunks
///
/// For single-chunk inserts, use [`insert_data()`](Self::insert_data) with a complete
/// Arrow IPC stream (schema + record batches).
///
/// For multiple chunks (large datasets), use:
/// 1. [`insert_data()`](Self::insert_data) for the first chunk (with schema)
/// 2. [`insert_record_batches()`](Self::insert_record_batches) for subsequent chunks (without schema)
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};
///
/// fn main() -> Result<()> {
///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
///
///     let table_def = TableDefinition::new("data")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("value", SqlType::double());
///
///     Catalog::new(&conn).create_table(&table_def)?;
///
///     // Arrow IPC data from external source
///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
///
///     let mut inserter = ArrowInserter::new(&conn, &table_def)?;
///     inserter.insert_data(&arrow_data)?;
///     let rows = inserter.execute()?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
pub struct ArrowInserter<'conn> {
    /// Borrowed connection; the COPY session lives on its TCP client.
    connection: &'conn Connection,
    /// Fully qualified target table name (from `TableDefinition::qualified_name`).
    table_name: String,
    /// Column names forwarded to the COPY statement, in table-definition order.
    columns: Vec<String>,
    /// Lazily opened COPY writer; `None` until the first data write.
    writer: Option<CopyInWriter<'conn>>,
    /// Tracks whether an Arrow schema has been sent.
    /// Sending schema twice causes an error in Hyper.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
    /// Persistent IPC `StreamWriter` for `insert_batch()`. Streams each `RecordBatch`
    /// eagerly — the schema is written on first use, then each batch is serialized
    /// and sent immediately via `send_direct()`. The internal `Vec<u8>` buffer is
    /// drained after every write to keep memory usage bounded at `O(batch_size)`.
    batch_ipc_writer: Option<StreamWriter<Vec<u8>>>,
    /// Tracks which insertion pathway is active. `None` means no data has been
    /// sent yet. Once set, mixing pathways is an error.
    insert_mode: Option<InsertMode>,
}
132
133impl std::fmt::Debug for ArrowInserter<'_> {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.debug_struct("ArrowInserter")
136 .field("table_name", &self.table_name)
137 .field("columns", &self.columns)
138 .field("schema_sent", &self.schema_sent)
139 .field("chunk_count", &self.chunk_count)
140 .field("total_bytes", &self.total_bytes)
141 .finish_non_exhaustive()
142 }
143}
144
145impl<'conn> ArrowInserter<'conn> {
146 /// Creates a new Arrow inserter for the given table.
147 ///
148 /// # Arguments
149 ///
150 /// * `connection` - The database connection.
151 /// * `table_def` - The table definition for the target table.
152 ///
153 /// # Example
154 ///
155 /// ```no_run
156 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
157 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
158 /// let inserter = ArrowInserter::new(&conn, &table_def)?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// # Errors
164 ///
165 /// - Returns [`Error::Other`] with message
166 /// `"Table definition must have at least one column"` if `table_def`
167 /// has no columns.
168 /// - Returns [`Error::Other`] if `connection` is using gRPC transport
169 /// (COPY is TCP-only).
170 pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
171 let column_count = table_def.column_count();
172 if column_count == 0 {
173 return Err(Error::new("Table definition must have at least one column"));
174 }
175
176 // Fail fast: verify the connection supports COPY (TCP only).
177 // The actual COPY session is started lazily on the first data write
178 // to avoid locking the connection into COPY mode prematurely.
179 if connection.tcp_client().is_none() {
180 return Err(Error::new(
181 "ArrowInserter requires a TCP connection. \
182 gRPC connections do not support COPY operations.",
183 ));
184 }
185
186 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
187 let table_name = table_def.qualified_name();
188
189 Ok(ArrowInserter {
190 connection,
191 table_name,
192 columns,
193 writer: None,
194 schema_sent: false,
195 total_bytes: 0,
196 chunk_count: 0,
197 start_time: Instant::now(),
198 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
199 buffered_bytes: 0,
200 batch_ipc_writer: None,
201 insert_mode: None,
202 })
203 }
204
205 /// Creates an Arrow inserter by querying the table schema from the database.
206 ///
207 /// This method queries the database to get the table definition automatically,
208 /// which is useful when you want to insert into an existing table without
209 /// manually specifying the schema.
210 ///
211 /// # Arguments
212 ///
213 /// * `connection` - The database connection.
214 /// * `table_name` - The table name (can include database and schema qualifiers).
215 ///
216 /// # Errors
217 ///
218 /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
219 ///
220 /// # Example
221 ///
222 /// ```no_run
223 /// # use hyperdb_api::{ArrowInserter, Connection, Result};
224 /// # fn example(conn: &Connection) -> Result<()> {
225 /// let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
226 /// # Ok(())
227 /// # }
228 /// ```
229 pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
230 where
231 T: TryInto<crate::TableName>,
232 crate::Error: From<T::Error>,
233 {
234 let catalog = Catalog::new(connection);
235 let table_def = catalog.get_table_definition(table_name)?;
236 Self::new(connection, &table_def)
237 }
238
239 /// Sets a custom flush threshold in bytes.
240 ///
241 /// Data is buffered until the threshold is reached, then flushed to the server.
242 /// Default is 16 MB (matching `HyperBinary` Inserter).
243 ///
244 /// # Example
245 ///
246 /// ```no_run
247 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
248 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
249 /// let inserter = ArrowInserter::new(&conn, &table_def)?
250 /// .with_flush_threshold(32 * 1024 * 1024); // 32 MB
251 /// # Ok(())
252 /// # }
253 /// ```
254 #[must_use]
255 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
256 self.flush_threshold = threshold;
257 self
258 }
259
260 /// Inserts a complete Arrow IPC stream (schema + record batches).
261 ///
262 /// Use this method for single-chunk inserts or for the first chunk of
263 /// multi-chunk inserts. The Arrow IPC stream must include the schema message
264 /// followed by one or more record batch messages.
265 ///
266 /// # Arguments
267 ///
268 /// * `arrow_ipc_data` - Complete Arrow IPC stream data (schema + record batches).
269 ///
270 /// # Errors
271 ///
272 /// Returns an error if:
273 /// - The connection fails to start COPY
274 /// - Sending data fails
275 /// - Schema was already sent (use `insert_record_batches()` for subsequent chunks)
276 ///
277 /// # Example
278 ///
279 /// ```no_run
280 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
281 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
282 /// # let first_arrow_ipc_stream = vec![];
283 /// # let second_chunk_batches_only = vec![];
284 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
285 ///
286 /// // First chunk with schema
287 /// inserter.insert_data(&first_arrow_ipc_stream)?;
288 ///
289 /// // For subsequent chunks without schema, use insert_record_batches()
290 /// inserter.insert_record_batches(&second_chunk_batches_only)?;
291 ///
292 /// let rows = inserter.execute()?;
293 /// # Ok(())
294 /// # }
295 /// ```
296 pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
297 if arrow_ipc_data.is_empty() {
298 return Ok(());
299 }
300
301 if self.insert_mode == Some(InsertMode::BatchIpc) {
302 return Err(Error::new(
303 "Cannot mix insert_data() with insert_batch(). \
304 Use either raw IPC methods (insert_data/insert_record_batches) \
305 or RecordBatch methods (insert_batch), not both.",
306 ));
307 }
308
309 if self.schema_sent {
310 return Err(Error::new(
311 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
312 or use insert_data() only once with the complete Arrow IPC stream.",
313 ));
314 }
315
316 self.ensure_writer()?;
317
318 if let Some(ref mut writer) = self.writer {
319 writer.send_direct(arrow_ipc_data)?;
320 }
321 self.buffered_bytes += arrow_ipc_data.len();
322 self.maybe_flush()?;
323
324 self.insert_mode = Some(InsertMode::RawIpc);
325 self.schema_sent = true;
326 self.total_bytes += arrow_ipc_data.len();
327 self.chunk_count += 1;
328
329 debug!(
330 target: "hyperdb_api",
331 chunk = self.chunk_count,
332 bytes = arrow_ipc_data.len(),
333 total_bytes = self.total_bytes,
334 buffered_bytes = self.buffered_bytes,
335 "arrow-inserter-chunk"
336 );
337
338 Ok(())
339 }
340
341 /// Inserts Arrow record batch data without schema.
342 ///
343 /// Use this method for subsequent chunks after the first `insert_data()` call.
344 /// The data should contain only Arrow record batch messages, **not** the schema.
345 ///
346 /// **Important**: Sending schema twice causes an error in Hyper. If you have
347 /// multiple complete Arrow IPC streams (each with schema), you need to strip
348 /// the schema from all but the first one.
349 ///
350 /// # Arguments
351 ///
352 /// * `arrow_batch_data` - Arrow record batch data (no schema message).
353 ///
354 /// # Errors
355 ///
356 /// Returns an error if:
357 /// - No schema has been sent yet (call `insert_data()` first)
358 /// - Sending data fails
359 ///
360 /// # Example
361 ///
362 /// ```no_run
363 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
364 /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
365 /// # let first_chunk_with_schema = vec![];
366 /// # let second_chunk_batches = vec![];
367 /// # let third_chunk_batches = vec![];
368 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
369 ///
370 /// // First chunk must include schema
371 /// inserter.insert_data(&first_chunk_with_schema)?;
372 ///
373 /// // Subsequent chunks are record batches only
374 /// inserter.insert_record_batches(&second_chunk_batches)?;
375 /// inserter.insert_record_batches(&third_chunk_batches)?;
376 ///
377 /// let rows = inserter.execute()?;
378 /// # Ok(())
379 /// # }
380 /// ```
381 pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
382 if arrow_batch_data.is_empty() {
383 return Ok(());
384 }
385
386 if self.insert_mode == Some(InsertMode::BatchIpc) {
387 return Err(Error::new(
388 "Cannot mix insert_record_batches() with insert_batch(). \
389 Use either raw IPC methods (insert_data/insert_record_batches) \
390 or RecordBatch methods (insert_batch), not both.",
391 ));
392 }
393
394 if !self.schema_sent {
395 return Err(Error::new(
396 "No Arrow schema has been sent yet. Call insert_data() first with a complete \
397 Arrow IPC stream that includes the schema.",
398 ));
399 }
400
401 if let Some(ref mut writer) = self.writer {
402 writer.send_direct(arrow_batch_data)?;
403 }
404 self.buffered_bytes += arrow_batch_data.len();
405 self.maybe_flush()?;
406
407 self.total_bytes += arrow_batch_data.len();
408 self.chunk_count += 1;
409
410 debug!(
411 target: "hyperdb_api",
412 chunk = self.chunk_count,
413 bytes = arrow_batch_data.len(),
414 total_bytes = self.total_bytes,
415 buffered_bytes = self.buffered_bytes,
416 "arrow-inserter-batch-chunk"
417 );
418
419 Ok(())
420 }
421
422 /// Inserts raw Arrow data without schema tracking.
423 ///
424 /// This is a low-level method that sends data directly without checking
425 /// whether schema has been sent. Use this only if you are managing schema
426 /// handling yourself.
427 ///
428 /// For most use cases, prefer [`insert_data()`](Self::insert_data) and
429 /// [`insert_record_batches()`](Self::insert_record_batches).
430 ///
431 /// # Arguments
432 ///
433 /// * `data` - Raw Arrow IPC data to send.
434 ///
435 /// # Errors
436 ///
437 /// - Returns [`Error::Other`] if a previous `insert_batch` call already
438 /// locked the inserter into `RecordBatch` IPC mode — raw IPC and
439 /// `RecordBatch` paths cannot be mixed.
440 /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
441 /// session fails to open.
442 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
443 /// the data or the socket write fails.
444 pub fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
445 if data.is_empty() {
446 return Ok(());
447 }
448
449 if self.insert_mode == Some(InsertMode::BatchIpc) {
450 return Err(Error::new(
451 "Cannot mix insert_raw() with insert_batch(). \
452 Use either raw IPC methods (insert_data/insert_record_batches/insert_raw) \
453 or RecordBatch methods (insert_batch), not both.",
454 ));
455 }
456
457 self.ensure_writer()?;
458
459 if let Some(ref mut writer) = self.writer {
460 writer.send_direct(data)?;
461 }
462 self.buffered_bytes += data.len();
463 self.maybe_flush()?;
464
465 self.total_bytes += data.len();
466 self.chunk_count += 1;
467
468 Ok(())
469 }
470
    /// Finishes the insert operation and returns the number of rows inserted.
    ///
    /// This method completes the COPY operation and returns the row count
    /// reported by the server. If no data was ever sent (no COPY session was
    /// opened), it returns `Ok(0)` rather than an error.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The Arrow IPC stream cannot be finalized (`insert_batch` path)
    /// - The COPY operation fails to complete
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
    /// # let arrow_data = vec![];
    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
    /// inserter.insert_data(&arrow_data)?;
    /// let rows = inserter.execute()?;
    /// println!("Inserted {} rows", rows);
    /// # Ok(())
    /// # }
    /// ```
    pub fn execute(mut self) -> Result<u64> {
        // Finalize the IPC StreamWriter if insert_batch() was used.
        // `into_inner()` calls `finish()` internally (writing the EOS marker)
        // and returns the underlying buffer — a single fallible operation
        // instead of the previous finish() + into_inner() pair.
        //
        // On error, `self` is dropped, and the Drop impl cancels the COPY
        // writer, so the connection is always left in a clean state.
        if let Some(ipc) = self.batch_ipc_writer.take() {
            let buf = ipc
                .into_inner()
                .map_err(|e| Error::new(format!("Failed to finalize Arrow IPC stream: {e}")))?;
            // The buffer normally holds only the trailing EOS bytes here,
            // since drain_ipc_buffer() empties it after every batch.
            if !buf.is_empty() {
                if let Some(ref mut writer) = self.writer {
                    writer.send_direct(&buf)?;
                }
            }
        }

        if self.writer.is_none() {
            // No data was sent
            return Ok(0);
        }

        // `finish` completes the COPY and yields the server-reported row count.
        let rows = self
            .writer
            .take()
            .map(hyperdb_api_core::client::CopyInWriter::finish)
            .transpose()?
            .unwrap_or(0);

        // Saturate rather than panic if the elapsed millis exceed u64::MAX.
        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
        info!(
            target: "hyperdb_api",
            rows,
            chunks = self.chunk_count,
            total_bytes = self.total_bytes,
            duration_ms,
            table = %self.table_name,
            "arrow-inserter-end"
        );

        Ok(rows)
    }
539
540 /// Cancels the insert operation.
541 ///
542 /// All data sent so far will be discarded.
543 pub fn cancel(mut self) {
544 if let Some(writer) = self.writer.take() {
545 let _ = writer.cancel("Arrow insert cancelled");
546 }
547 }
548
549 /// Returns whether any data has been sent.
550 #[must_use]
551 pub fn has_data(&self) -> bool {
552 self.chunk_count > 0
553 }
554
555 /// Returns the total bytes sent so far.
556 #[must_use]
557 pub fn total_bytes(&self) -> usize {
558 self.total_bytes
559 }
560
561 /// Returns the number of chunks sent so far.
562 #[must_use]
563 pub fn chunk_count(&self) -> usize {
564 self.chunk_count
565 }
566
    /// Inserts an Arrow `RecordBatch` directly, streaming it immediately.
    ///
    /// Each batch is serialized to Arrow IPC format and sent to the server right
    /// away — no accumulation in memory. The schema is written automatically on
    /// the first call; subsequent batches send only record-batch IPC messages.
    ///
    /// Memory usage is `O(batch_size)` regardless of how many batches are inserted.
    ///
    /// # Arguments
    ///
    /// * `batch` - The Arrow `RecordBatch` to insert.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, SqlType, Catalog, CreateMode, Result};
    /// # fn example() -> Result<()> {
    /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
    /// # let table_def = TableDefinition::new("data")
    /// #     .add_required_column("id", SqlType::int())
    /// #     .add_nullable_column("value", SqlType::double());
    /// # Catalog::new(&conn).create_table(&table_def)?;
    /// use arrow::array::{Int32Array, Float64Array};
    /// use arrow::datatypes::{Schema, Field, DataType};
    /// use arrow::record_batch::RecordBatch;
    /// use std::sync::Arc;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("id", DataType::Int32, false),
    ///     Field::new("value", DataType::Float64, true),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![
    ///     Arc::new(Int32Array::from(vec![1, 2, 3])),
    ///     Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
    /// ]).unwrap();
    ///
    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
    /// inserter.insert_batch(&batch)?;
    /// let rows = inserter.execute()?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - Returns [`Error::Other`] if a previous raw-IPC call locked this
    ///   inserter into the other mode — raw IPC and `RecordBatch` paths
    ///   cannot be mixed.
    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
    ///   session cannot be opened.
    /// - Returns [`Error::Other`] wrapping the underlying Arrow IPC
    ///   writer error if the schema or batch cannot be serialized (e.g.
    ///   dictionary misalignment, encoding failure).
    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
    ///   the data or the socket write fails.
    ///
    /// # Panics
    ///
    /// Panics internally only if the IPC writer state is corrupted —
    /// callers cannot trigger this from the public API. The `batch_ipc_writer`
    /// is accessed via `as_mut().expect(...)` immediately after it has been
    /// set to `Some` above, so the expect is unreachable.
    pub fn insert_batch(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
        // Reject if the raw-IPC pathway was already used — a second schema
        // message on the same COPY stream would be an error in Hyper.
        if self.insert_mode == Some(InsertMode::RawIpc) {
            return Err(Error::new(
                "Cannot mix insert_batch() with raw IPC methods. \
                 Use either RecordBatch methods (insert_batch) \
                 or raw IPC methods (insert_data/insert_record_batches/insert_raw), not both.",
            ));
        }

        // Open the COPY session (if needed) and lock in RecordBatch mode.
        // NOTE(review): the mode is set before serialization, so a failed
        // StreamWriter construction still leaves the inserter in BatchIpc
        // mode — intentional lock-in on first attempt, it seems; confirm.
        self.ensure_writer()?;
        self.insert_mode = Some(InsertMode::BatchIpc);

        // Create the IPC StreamWriter on first use — this writes the schema message
        if self.batch_ipc_writer.is_none() {
            let ipc_writer = StreamWriter::try_new(Vec::new(), &batch.schema())
                .map_err(|e| Error::new(format!("Failed to create Arrow IPC writer: {e}")))?;
            self.batch_ipc_writer = Some(ipc_writer);

            // Drain the schema bytes that StreamWriter wrote during construction
            self.drain_ipc_buffer()?;
            self.schema_sent = true;
        }

        // Write the record batch — this appends batch IPC bytes to the internal Vec
        self.batch_ipc_writer
            .as_mut()
            .expect("IPC writer must exist")
            .write(batch)
            .map_err(|e| Error::new(format!("Failed to write Arrow batch: {e}")))?;

        // Drain the batch bytes and send them immediately
        self.drain_ipc_buffer()?;
        self.chunk_count += 1;

        debug!(
            target: "hyperdb_api",
            chunk = self.chunk_count,
            total_bytes = self.total_bytes,
            buffered_bytes = self.buffered_bytes,
            "arrow-inserter-batch"
        );

        Ok(())
    }
673
    /// Drains the internal IPC writer buffer and sends bytes via the COPY writer.
    ///
    /// Called after every `StreamWriter` operation in `insert_batch()` so the
    /// in-memory buffer never holds more than one batch's worth of bytes.
    ///
    /// # Safety contract with `StreamWriter`
    ///
    /// This method accesses the `StreamWriter`'s underlying `Vec<u8>` via
    /// `get_mut()` (part of Arrow's public API) and drains it. This is safe
    /// because `StreamWriter` writes sequentially via `Write::write_all()`
    /// and does not cache buffer offsets or positions. After draining, the
    /// Vec is empty but retains its allocation, so subsequent writes append
    /// from offset 0 without reallocation.
    ///
    /// # Panics
    ///
    /// Panics if `batch_ipc_writer` is `None`; all call sites in
    /// `insert_batch()` run after the writer has been created.
    fn drain_ipc_buffer(&mut self) -> Result<()> {
        // Borrow of `self.batch_ipc_writer` and `self.writer` below are
        // disjoint field borrows, which the borrow checker permits.
        let ipc = self
            .batch_ipc_writer
            .as_mut()
            .expect("IPC writer must exist");
        let buf = ipc.get_mut();
        if buf.is_empty() {
            return Ok(());
        }

        // Send the current buffer contents, then clear while preserving the
        // heap allocation so the next StreamWriter write avoids reallocation.
        let len = buf.len();
        if let Some(ref mut writer) = self.writer {
            writer.send_direct(buf)?;
        }
        buf.clear();

        self.buffered_bytes += len;
        self.total_bytes += len;
        self.maybe_flush()?;

        Ok(())
    }
708
709 /// Inserts multiple Arrow `RecordBatch`es, streaming each one immediately.
710 ///
711 /// This is a convenience method that calls [`insert_batch`](Self::insert_batch)
712 /// for each batch in the iterator. Memory usage stays bounded at `O(batch_size)`.
713 ///
714 /// # Example
715 ///
716 /// ```no_run
717 /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
718 /// # fn example(conn: &Connection, table_def: &TableDefinition, batches: Vec<arrow::record_batch::RecordBatch>) -> Result<()> {
719 /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
720 /// inserter.insert_batches(batches.iter())?;
721 /// let rows = inserter.execute()?;
722 /// # Ok(())
723 /// # }
724 /// ```
725 ///
726 /// # Errors
727 ///
728 /// Returns on the first batch that fails — see
729 /// [`insert_batch`](Self::insert_batch) for the failure modes.
730 pub fn insert_batches<'b>(
731 &mut self,
732 batches: impl IntoIterator<Item = &'b arrow::record_batch::RecordBatch>,
733 ) -> Result<()> {
734 for batch in batches {
735 self.insert_batch(batch)?;
736 }
737 Ok(())
738 }
739
740 /// Ensures the COPY writer is initialized.
741 fn ensure_writer(&mut self) -> Result<()> {
742 if self.writer.is_none() {
743 let client = self.connection.tcp_client().ok_or_else(|| {
744 crate::Error::new("ArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.")
745 })?;
746 let columns: Vec<&str> = self
747 .columns
748 .iter()
749 .map(std::string::String::as_str)
750 .collect();
751 let mut writer = client.copy_in_with_format(
752 &self.table_name,
753 &columns,
754 DataFormat::ArrowStream.as_sql_str(),
755 )?;
756 // Pre-allocate buffer to avoid reallocations during bulk insert
757 writer.reserve_buffer(self.flush_threshold + 1024 * 1024);
758 self.writer = Some(writer);
759 }
760 Ok(())
761 }
762
763 /// Flushes the TCP stream if the threshold is reached.
764 ///
765 /// With `send_direct()`, data is written directly to TCP. This periodic
766 /// flush ensures data is pushed to the server for high-latency connections.
767 fn maybe_flush(&mut self) -> Result<()> {
768 if self.buffered_bytes >= self.flush_threshold {
769 if let Some(ref mut writer) = self.writer {
770 writer.flush_stream()?;
771 }
772 debug!(
773 target: "hyperdb_api",
774 flushed_bytes = self.buffered_bytes,
775 threshold = self.flush_threshold,
776 "arrow-inserter-flush"
777 );
778 self.buffered_bytes = 0;
779 }
780 Ok(())
781 }
782}
783
784// Implement Drop to handle cleanup if the inserter is dropped without calling execute()
785impl Drop for ArrowInserter<'_> {
786 fn drop(&mut self) {
787 // If writer exists and we're being dropped without execute(),
788 // cancel the operation to avoid leaving the connection in a bad state.
789 if let Some(writer) = self.writer.take() {
790 let _ = writer.cancel("Arrow inserter dropped without execute");
791 }
792 }
793}
794
#[cfg(test)]
mod tests {
    use super::*;

    /// The SQL format keyword must stay in sync with what `ensure_writer`
    /// passes to `copy_in_with_format`.
    #[test]
    fn test_data_format_sql_str() {
        assert_eq!(DataFormat::ArrowStream.as_sql_str(), "ARROWSTREAM");
    }
}