pub struct ArrowInserter<'conn> { /* private fields */ }
Inserts Arrow IPC stream data into a Hyper table.
Unlike Inserter, which builds rows incrementally in HyperBinary
format, ArrowInserter accepts pre-formatted Arrow IPC stream data. This is useful
when integrating with Arrow-based data pipelines or when you already have data in
Arrow format.
§Arrow IPC Stream Format
Arrow IPC streams consist of:
- A schema message (describing column names and types)
- One or more record batch messages (containing the actual data)
The schema must match the target table’s schema.
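The framing of each message in the stream can be illustrated with a small byte-level sketch. This is a simplified model of the Arrow encapsulated message format (continuation marker, little-endian metadata length, metadata padded to an 8-byte boundary, then the body); the metadata bytes here are a placeholder, not a real flatbuffer.

```rust
/// Sketch: frame one Arrow IPC "encapsulated message" by hand.
/// Layout: 0xFFFFFFFF continuation marker, 4-byte little-endian
/// metadata length, metadata padded to 8 bytes, then the body.
/// The length field is written as the padded size in this sketch.
fn frame_message(metadata: &[u8], body: &[u8]) -> Vec<u8> {
    let padded_len = (metadata.len() + 7) / 8 * 8; // pad metadata to 8-byte boundary
    let mut out = Vec::new();
    out.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); // continuation marker
    out.extend_from_slice(&(padded_len as u32).to_le_bytes()); // metadata length
    out.extend_from_slice(metadata);
    out.resize(out.len() + (padded_len - metadata.len()), 0); // zero padding
    out.extend_from_slice(body); // record-batch body (empty for schema messages)
    out
}

fn main() {
    let msg = frame_message(&[1, 2, 3], &[]);
    // 4 (marker) + 4 (length) + 8 (padded metadata) = 16 bytes
    println!("{}", msg.len());
}
```

A schema message carries no body, so a stream is the framed schema message followed by one framed message per record batch.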
§Single vs Multiple Chunks
For single-chunk inserts, use insert_data() with a complete
Arrow IPC stream (schema + record batches).
For multiple chunks (large datasets), use:
- insert_data() for the first chunk (with schema)
- insert_record_batches() for subsequent chunks (without schema)
§Example
use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};
fn main() -> Result<()> {
let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
let table_def = TableDefinition::new("data")
.add_required_column("id", SqlType::int())
.add_nullable_column("value", SqlType::double());
Catalog::new(&conn).create_table(&table_def)?;
// Arrow IPC data from external source
let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_data(&arrow_data)?;
let rows = inserter.execute()?;
println!("Inserted {} rows", rows);
Ok(())
}
Implementations
impl<'conn> ArrowInserter<'conn>
pub fn new(
    connection: &'conn Connection,
    table_def: &TableDefinition,
) -> Result<Self>
Creates a new Arrow inserter for the given table.
§Arguments
- connection - The database connection.
- table_def - The table definition for the target table.
§Example
let inserter = ArrowInserter::new(&conn, &table_def)?;
§Errors
- Returns Error::Other with message "Table definition must have at least one column" if table_def has no columns.
- Returns Error::Other if connection is using gRPC transport (COPY is TCP-only).
pub fn from_table<T>(
    connection: &'conn Connection,
    table_name: T,
) -> Result<Self>
Creates an Arrow inserter by querying the table schema from the database.
This method queries the database to get the table definition automatically, which is useful when you want to insert into an existing table without manually specifying the schema.
§Arguments
- connection - The database connection.
- table_name - The table name (can include database and schema qualifiers).
§Errors
Returns an error if the table doesn’t exist or if the schema cannot be retrieved.
§Example
let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
pub fn with_flush_threshold(self, threshold: usize) -> Self
Sets a custom flush threshold in bytes.
Data is buffered until the threshold is reached, then flushed to the server.
Default is 16 MB (matching HyperBinary Inserter).
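The buffering behavior can be sketched as follows. This is a simplified model, not the actual implementation; the `flushes` counter stands in for the real network flush that sends the buffered bytes over the COPY stream.

```rust
/// Sketch: buffer incoming chunks and "flush" whenever the buffer
/// reaches the configured threshold.
struct BufferedSink {
    buf: Vec<u8>,
    threshold: usize,
    flushes: usize,
}

impl BufferedSink {
    fn new(threshold: usize) -> Self {
        Self { buf: Vec::new(), threshold, flushes: 0 }
    }

    fn write(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
        if self.buf.len() >= self.threshold {
            self.flushes += 1; // real code would send self.buf to the server here
            self.buf.clear();
        }
    }
}

fn main() {
    let mut sink = BufferedSink::new(16); // tiny threshold for demonstration
    for _ in 0..4 {
        sink.write(&[0u8; 6]); // 6 bytes per write
    }
    println!("flushes = {}", sink.flushes); // one flush at 18 buffered bytes
}
```

A larger threshold trades memory for fewer round trips to the server.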
§Example
let inserter = ArrowInserter::new(&conn, &table_def)?
    .with_flush_threshold(32 * 1024 * 1024); // 32 MB
pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()>
Inserts a complete Arrow IPC stream (schema + record batches).
Use this method for single-chunk inserts or for the first chunk of multi-chunk inserts. The Arrow IPC stream must include the schema message followed by one or more record batch messages.
§Arguments
arrow_ipc_data- Complete Arrow IPC stream data (schema + record batches).
§Errors
Returns an error if:
- The connection fails to start COPY
- Sending data fails
- Schema was already sent (use insert_record_batches() for subsequent chunks)
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
// First chunk with schema
inserter.insert_data(&first_arrow_ipc_stream)?;
// For subsequent chunks without schema, use insert_record_batches()
inserter.insert_record_batches(&second_chunk_batches_only)?;
let rows = inserter.execute()?;
pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()>
Inserts Arrow record batch data without schema.
Use this method for subsequent chunks after the first insert_data() call.
The data should contain only Arrow record batch messages, not the schema.
Important: Sending schema twice causes an error in Hyper. If you have multiple complete Arrow IPC streams (each with schema), you need to strip the schema from all but the first one.
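Stripping the schema can be sketched at the byte level. This sketch assumes the stream uses the encapsulated message format with the 0xFFFFFFFF continuation marker, that the length field already covers padding, and that the schema message carries no body (true for schema messages); a robust implementation would parse the flatbuffer metadata instead.

```rust
/// Sketch: skip the leading schema message of an Arrow IPC stream,
/// returning only the record-batch messages that follow.
fn skip_schema_message(stream: &[u8]) -> Option<&[u8]> {
    // 4-byte continuation marker + 4-byte metadata length
    if stream.len() < 8 || stream[0..4] != [0xFF, 0xFF, 0xFF, 0xFF] {
        return None;
    }
    let meta_len =
        u32::from_le_bytes([stream[4], stream[5], stream[6], stream[7]]) as usize;
    stream.get(8 + meta_len..) // rest of the stream: record batches only
}

fn main() {
    // Synthetic stream: an 8-byte "schema" metadata block followed by batch bytes.
    let mut stream = vec![0xFF, 0xFF, 0xFF, 0xFF, 8, 0, 0, 0];
    stream.extend_from_slice(&[0u8; 8]); // padded schema metadata
    stream.extend_from_slice(b"batches"); // stand-in for record-batch messages
    let rest = skip_schema_message(&stream).unwrap();
    println!("{}", String::from_utf8_lossy(rest));
}
```

The returned tail is what insert_record_batches() expects for the second and later streams.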
§Arguments
arrow_batch_data- Arrow record batch data (no schema message).
§Errors
Returns an error if:
- No schema has been sent yet (call insert_data() first)
- Sending data fails
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
// First chunk must include schema
inserter.insert_data(&first_chunk_with_schema)?;
// Subsequent chunks are record batches only
inserter.insert_record_batches(&second_chunk_batches)?;
inserter.insert_record_batches(&third_chunk_batches)?;
let rows = inserter.execute()?;
pub fn insert_raw(&mut self, data: &[u8]) -> Result<()>
Inserts raw Arrow data without schema tracking.
This is a low-level method that sends data directly without checking whether schema has been sent. Use this only if you are managing schema handling yourself.
For most use cases, prefer insert_data() and
insert_record_batches().
§Arguments
data- Raw Arrow IPC data to send.
§Errors
- Returns Error::Other if a previous insert_batch call already locked the inserter into RecordBatch IPC mode; raw IPC and RecordBatch paths cannot be mixed.
- Returns Error::Other / Error::Client if the lazy COPY session fails to open.
- Returns Error::Client / Error::Io if the server rejects the data or the socket write fails.
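The mode locking described above can be modeled as a small state machine. This is a sketch of the documented error behavior, not the actual internals; the Mode names are illustrative.

```rust
/// Sketch: the inserter locks into one of two mutually exclusive
/// streaming modes on first use; mixing them is an error.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Mode {
    Unset,
    RawIpc,      // insert_data / insert_record_batches / insert_raw path
    RecordBatch, // insert_batch / insert_batches path
}

fn try_lock(current: Mode, requested: Mode) -> Result<Mode, String> {
    match (current, requested) {
        (Mode::Unset, m) => Ok(m),  // first call sets the mode
        (c, m) if c == m => Ok(c),  // same mode: fine
        (c, m) => Err(format!("cannot mix {:?} and {:?} paths", c, m)),
    }
}

fn main() {
    let mode = try_lock(Mode::Unset, Mode::RecordBatch).unwrap();
    // A later raw-IPC call must now fail, mirroring the Error::Other case above.
    println!("{:?}", try_lock(mode, Mode::RawIpc).is_err());
}
```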
pub fn execute(self) -> Result<u64>
Finishes the insert operation and returns the number of rows inserted.
This method completes the COPY operation and returns the row count reported by the server.
§Errors
Returns an error if:
- No data was sent
- The COPY operation fails to complete
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_data(&arrow_data)?;
let rows = inserter.execute()?;
println!("Inserted {} rows", rows);
pub fn total_bytes(&self) -> usize
Returns the total bytes sent so far.
pub fn chunk_count(&self) -> usize
Returns the number of chunks sent so far.
pub fn insert_batch(&mut self, batch: &RecordBatch) -> Result<()>
Inserts an Arrow RecordBatch directly, streaming it immediately.
Each batch is serialized to Arrow IPC format and sent to the server right away — no accumulation in memory. The schema is written automatically on the first call; subsequent batches send only record-batch IPC messages.
Memory usage is O(batch_size) regardless of how many batches are inserted.
§Arguments
- batch - The Arrow RecordBatch to insert.
§Example
use arrow::array::{Int32Array, Float64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Float64, true),
]));
let batch = RecordBatch::try_new(schema, vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
]).unwrap();
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_batch(&batch)?;
let rows = inserter.execute()?;
§Errors
- Returns Error::Other if a previous raw-IPC call locked this inserter into the other mode; raw IPC and RecordBatch paths cannot be mixed.
- Returns Error::Other / Error::Client if the lazy COPY session cannot be opened.
- Returns Error::Other wrapping the underlying Arrow IPC writer error if the schema or batch cannot be serialized (e.g. dictionary misalignment, encoding failure).
- Returns Error::Client / Error::Io if the server rejects the data or the socket write fails.
§Panics
Panics internally only if the IPC writer state is corrupted —
callers cannot trigger this from the public API. The batch_ipc_writer
is consulted via as_mut().unwrap() after it has just been set to
Some, so the unwrap is unreachable.
pub fn insert_batches<'b>(
    &mut self,
    batches: impl IntoIterator<Item = &'b RecordBatch>,
) -> Result<()>
Inserts multiple Arrow RecordBatches, streaming each one immediately.
This is a convenience method that calls insert_batch
for each batch in the iterator. Memory usage stays bounded at O(batch_size).
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_batches(batches.iter())?;
let rows = inserter.execute()?;
§Errors
Returns an error on the first batch that fails; see insert_batch for the failure modes.
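The fail-fast behavior can be sketched generically: insertion stops at the first failing batch, and batches before the failure have already been streamed. This is a simplified model with a closure standing in for insert_batch.

```rust
/// Sketch: insert batches one by one, stopping at the first failure.
/// `insert_one` stands in for ArrowInserter::insert_batch.
fn insert_batches_sketch<T, E>(
    batches: impl IntoIterator<Item = T>,
    mut insert_one: impl FnMut(T) -> Result<(), E>,
) -> Result<(), E> {
    for batch in batches {
        insert_one(batch)?; // propagate the first error immediately
    }
    Ok(())
}

fn main() {
    let mut sent = 0;
    let result = insert_batches_sketch(1..=5, |n| {
        if n == 3 {
            return Err("batch 3 rejected"); // simulate a server error
        }
        sent += 1; // batches before the failure were already streamed
        Ok(())
    });
    println!("sent = {}, err = {:?}", sent, result.err());
}
```

Because earlier batches are already on the wire when an error surfaces, the whole COPY is abandoned rather than partially committed.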
Trait Implementations
impl Debug for ArrowInserter<'_>
impl Drop for ArrowInserter<'_>
Auto Trait Implementations
impl<'conn> Freeze for ArrowInserter<'conn>
impl<'conn> !RefUnwindSafe for ArrowInserter<'conn>
impl<'conn> !Send for ArrowInserter<'conn>
impl<'conn> Sync for ArrowInserter<'conn>
impl<'conn> Unpin for ArrowInserter<'conn>
impl<'conn> UnsafeUnpin for ArrowInserter<'conn>
impl<'conn> !UnwindSafe for ArrowInserter<'conn>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request