Struct ArrowInserter 

pub struct ArrowInserter<'conn> { /* private fields */ }

Inserts Arrow IPC stream data into a Hyper table.

Unlike Inserter, which builds rows incrementally in HyperBinary format, ArrowInserter accepts pre-formatted Arrow IPC stream data. This is useful when integrating with Arrow-based data pipelines or when your data is already in Arrow format.

§Arrow IPC Stream Format

Arrow IPC streams consist of:

  1. A schema message (describing column names and types)
  2. One or more record batch messages (containing the actual data)

The schema must match the target table’s schema.
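
The ordering rule above can be modeled without any Arrow dependency. The sketch below is a simplified, hypothetical illustration: IpcMessage and validate_stream are not part of hyperdb_api, and the code only checks message order (schema first, then at least one record batch) rather than decoding real IPC bytes. Dictionary batches, which real Arrow streams may also contain, are left out.

```rust
/// Hypothetical, simplified view of the messages in an Arrow IPC stream.
/// Real messages carry FlatBuffers metadata; only the ordering is modeled here.
#[derive(Debug, Clone, PartialEq)]
pub enum IpcMessage {
    /// Column names, in table order.
    Schema(Vec<String>),
    /// A record batch holding `rows` rows.
    RecordBatch { rows: usize },
}

/// Checks that a stream is one schema message followed by one or more
/// record batches, with no second schema. Returns the total row count.
pub fn validate_stream(messages: &[IpcMessage]) -> Result<usize, String> {
    let (first, rest) = messages
        .split_first()
        .ok_or_else(|| "empty stream".to_string())?;
    if !matches!(first, IpcMessage::Schema(_)) {
        return Err("stream must start with a schema message".to_string());
    }
    if rest.is_empty() {
        return Err("stream has no record batches".to_string());
    }
    let mut total_rows = 0;
    for message in rest {
        match message {
            IpcMessage::RecordBatch { rows } => total_rows += *rows,
            IpcMessage::Schema(_) => {
                return Err("unexpected second schema message".to_string());
            }
        }
    }
    Ok(total_rows)
}
```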

§Single vs Multiple Chunks

For single-chunk inserts, use insert_data() with a complete Arrow IPC stream (schema + record batches).

For multiple chunks (large datasets), use:

  1. insert_data() for the first chunk (with schema)
  2. insert_record_batches() for subsequent chunks (without schema)
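
To make the call sequence concrete, here is a std-only sketch that routes the first chunk to insert_data() and every later chunk to insert_record_batches(). The ChunkSink trait, the send_chunks function, and the RecordingSink test double are hypothetical stand-ins so the logic can run without a Hyper server; with the real API you would call the two methods on an ArrowInserter directly.

```rust
/// Hypothetical abstraction over the two chunk-level calls described above.
/// ArrowInserter has inherent methods with these names; this trait exists
/// only so the sketch can be exercised without a server.
pub trait ChunkSink {
    type Error;
    fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<(), Self::Error>;
    fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<(), Self::Error>;
}

/// Sends the first chunk (schema + batches) with `insert_data` and every
/// later chunk (batches only) with `insert_record_batches`.
/// Stops at the first error and returns the number of chunks sent.
pub fn send_chunks<'a, S, I>(sink: &mut S, chunks: I) -> Result<usize, S::Error>
where
    S: ChunkSink,
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut sent = 0;
    for (index, chunk) in chunks.into_iter().enumerate() {
        if index == 0 {
            sink.insert_data(chunk)?;
        } else {
            sink.insert_record_batches(chunk)?;
        }
        sent += 1;
    }
    Ok(sent)
}

/// Test double that records which call received a chunk of which length.
#[derive(Default)]
pub struct RecordingSink {
    pub calls: Vec<(&'static str, usize)>,
}

impl ChunkSink for RecordingSink {
    type Error = String;

    fn insert_data(&mut self, data: &[u8]) -> Result<(), String> {
        self.calls.push(("insert_data", data.len()));
        Ok(())
    }

    fn insert_record_batches(&mut self, data: &[u8]) -> Result<(), String> {
        self.calls.push(("insert_record_batches", data.len()));
        Ok(())
    }
}
```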

§Example

use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};

fn main() -> Result<()> {
    let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;

    let table_def = TableDefinition::new("data")
        .add_required_column("id", SqlType::int())
        .add_nullable_column("value", SqlType::double());

    Catalog::new(&conn).create_table(&table_def)?;

    // Arrow IPC data from external source
    let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here

    let mut inserter = ArrowInserter::new(&conn, &table_def)?;
    inserter.insert_data(&arrow_data)?;
    let rows = inserter.execute()?;
    println!("Inserted {} rows", rows);
    Ok(())
}

Implementations§

impl<'conn> ArrowInserter<'conn>

pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self>

Creates a new Arrow inserter for the given table.

§Arguments
  • connection - The database connection.
  • table_def - The table definition for the target table.
§Example
let inserter = ArrowInserter::new(&conn, &table_def)?;
§Errors
  • Returns Error::Other with message "Table definition must have at least one column" if table_def has no columns.
  • Returns Error::Other if connection is using gRPC transport (COPY is TCP-only).
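
The two documented failure cases of new() amount to simple precondition checks. This hypothetical sketch expresses them with plain strings; Transport and check_new_preconditions are illustrative names, the gRPC error text is invented, and the order in which the real constructor checks the two conditions is an assumption.

```rust
/// Hypothetical stand-in for the connection's transport kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
    Tcp,
    Grpc,
}

/// Mirrors the two documented preconditions of `ArrowInserter::new`:
/// the table needs at least one column, and COPY requires TCP transport.
pub fn check_new_preconditions(column_names: &[&str], transport: Transport) -> Result<(), String> {
    if column_names.is_empty() {
        // Message text taken from the documented error.
        return Err("Table definition must have at least one column".to_string());
    }
    if transport == Transport::Grpc {
        // Hypothetical wording; the docs only say Error::Other is returned.
        return Err("COPY requires a TCP connection; gRPC is not supported".to_string());
    }
    Ok(())
}
```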

pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
where
    T: TryInto<TableName>,
    Error: From<T::Error>,

Creates an Arrow inserter by querying the table schema from the database.

This method queries the database for the table definition, which is useful for inserting into an existing table without specifying its schema manually.

§Arguments
  • connection - The database connection.
  • table_name - The table name (can include database and schema qualifiers).
§Errors

Returns an error if the table doesn’t exist or if the schema cannot be retrieved.

§Example
let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
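
The table_name argument accepts qualified names. As a rough illustration of what "qualified" means, the hypothetical helper below splits a name into up to three dot-separated parts (database, schema, table). It assumes SQL-style double-quoted identifiers, in which dots are literal and "" escapes a quote, and it does not model case folding; the actual parsing rules of TableName are not described on this page.

```rust
/// Hypothetical helper: splits `db.schema.table` style names into parts.
/// Assumes SQL-style double-quoted identifiers; no case folding is applied.
pub fn split_qualified_name(name: &str) -> Result<Vec<String>, String> {
    let mut parts = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = name.chars().peekable();
    while let Some(c) = chars.next() {
        // Inside quotes, `""` stands for one literal quote character.
        let escaped_quote = in_quotes && c == '"' && chars.peek() == Some(&'"');
        if escaped_quote {
            chars.next(); // consume the second quote of the pair
            current.push('"');
            continue;
        }
        match (c, in_quotes) {
            ('"', _) => in_quotes = !in_quotes,
            // An unquoted dot ends the current part.
            ('.', false) => parts.push(std::mem::take(&mut current)),
            (other, _) => current.push(other),
        }
    }
    if in_quotes {
        return Err("unterminated quoted identifier".to_string());
    }
    parts.push(current);
    if parts.iter().any(|part| part.is_empty()) {
        return Err("empty name component".to_string());
    }
    if parts.len() > 3 {
        return Err("expected at most database.schema.table".to_string());
    }
    Ok(parts)
}
```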

pub fn with_flush_threshold(self, threshold: usize) -> Self

Sets a custom flush threshold in bytes.

Data is buffered until the threshold is reached and then flushed to the server. The default is 16 MB, matching the HyperBinary Inserter.

§Example
let inserter = ArrowInserter::new(&conn, &table_def)?
    .with_flush_threshold(32 * 1024 * 1024);  // 32 MB
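
The buffering behavior described above can be sketched as a byte buffer that writes to its sink once the pending size reaches a threshold. ThresholdBuffer is a hypothetical type, not the inserter's implementation; only the 16 MB default (taken as 16 * 1024 * 1024 bytes, consistent with the 32 MB example) comes from this page.

```rust
use std::io::{self, Write};

/// Documented default threshold, assuming 1 MB = 1024 * 1024 bytes.
pub const DEFAULT_THRESHOLD: usize = 16 * 1024 * 1024;

/// Hypothetical model of threshold-based buffering: bytes accumulate in
/// memory and are written to `sink` once at least `threshold` are pending.
pub struct ThresholdBuffer<W: Write> {
    sink: W,
    pending: Vec<u8>,
    threshold: usize,
    flushes: usize,
}

impl<W: Write> ThresholdBuffer<W> {
    pub fn new(sink: W, threshold: usize) -> Self {
        Self { sink, pending: Vec::new(), threshold, flushes: 0 }
    }

    /// Appends `data` and flushes if the pending size reached the threshold.
    pub fn push(&mut self, data: &[u8]) -> io::Result<()> {
        self.pending.extend_from_slice(data);
        if self.pending.len() >= self.threshold {
            self.flush_pending()?;
        }
        Ok(())
    }

    /// Number of flushes performed so far.
    pub fn flushes(&self) -> usize {
        self.flushes
    }

    /// Flushes whatever is still pending and hands back the sink.
    pub fn finish(mut self) -> io::Result<W> {
        if !self.pending.is_empty() {
            self.flush_pending()?;
        }
        self.sink.flush()?;
        Ok(self.sink)
    }

    fn flush_pending(&mut self) -> io::Result<()> {
        self.sink.write_all(&self.pending)?;
        self.pending.clear();
        self.flushes += 1;
        Ok(())
    }
}
```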

pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()>

Inserts a complete Arrow IPC stream (schema + record batches).

Use this method for single-chunk inserts or for the first chunk of multi-chunk inserts. The Arrow IPC stream must include the schema message followed by one or more record batch messages.

§Arguments
  • arrow_ipc_data - Complete Arrow IPC stream data (schema + record batches).
§Errors

Returns an error if:

  • The COPY operation cannot be started on the connection
  • Sending data fails
  • A schema has already been sent (use insert_record_batches() for subsequent chunks)
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;

// First chunk with schema
inserter.insert_data(&first_arrow_ipc_stream)?;

// For subsequent chunks without schema, use insert_record_batches()
inserter.insert_record_batches(&second_chunk_batches_only)?;

let rows = inserter.execute()?;
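
The error rules for insert_data(), insert_record_batches(), and execute() form a small state machine. The following hypothetical SchemaTracker encodes those documented rules while counting bytes instead of talking to a server; its execute() returns the byte count, whereas the real method returns the row count reported by Hyper.

```rust
/// Hypothetical model of the documented schema-tracking rules.
#[derive(Debug, Default)]
pub struct SchemaTracker {
    schema_sent: bool,
    bytes_sent: usize,
}

impl SchemaTracker {
    /// First chunk: carries the schema, so it is only allowed once.
    pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<(), String> {
        if self.schema_sent {
            return Err("schema already sent; use insert_record_batches()".to_string());
        }
        self.schema_sent = true;
        self.bytes_sent += arrow_ipc_data.len();
        Ok(())
    }

    /// Later chunks: only allowed after a schema has been sent.
    pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<(), String> {
        if !self.schema_sent {
            return Err("no schema sent yet; call insert_data() first".to_string());
        }
        self.bytes_sent += arrow_batch_data.len();
        Ok(())
    }

    /// Consumes the tracker like `execute(self)` and fails if nothing was sent.
    /// Returns bytes here; the real method returns the server's row count.
    pub fn execute(self) -> Result<usize, String> {
        if self.bytes_sent == 0 {
            return Err("no data was sent".to_string());
        }
        Ok(self.bytes_sent)
    }
}
```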

pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()>

Inserts Arrow record batch data without schema.

Use this method for subsequent chunks after the first insert_data() call. The data should contain only Arrow record batch messages, not the schema.

Important: sending the schema twice causes an error in Hyper. If you have several complete Arrow IPC streams, each with its own schema, strip the schema message from all but the first one before sending them.

§Arguments
  • arrow_batch_data - Arrow record batch data (no schema message).
§Errors

Returns an error if:

  • No schema has been sent yet (call insert_data() first)
  • Sending data fails
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;

// First chunk must include schema
inserter.insert_data(&first_chunk_with_schema)?;

// Subsequent chunks are record batches only
inserter.insert_record_batches(&second_chunk_batches)?;
inserter.insert_record_batches(&third_chunk_batches)?;

let rows = inserter.execute()?;
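
When the input arrives as several complete IPC streams, the first can be sent as-is and the schema can be dropped from the rest. The sketch below models each stream as a list of hypothetical Message values and also checks that every repeated schema matches the first one; that check is an extra safeguard assumed by this sketch, not documented behavior. The first returned chunk would go to insert_data() and the remaining ones to insert_record_batches().

```rust
/// Hypothetical, simplified IPC message (ordering only, no real encoding).
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    /// Column names in table order.
    Schema(Vec<String>),
    /// Opaque record batch payload.
    Batch(Vec<u8>),
}

/// Returns the chunks to send: the first stream unchanged and each later
/// stream with its leading schema removed. Rejects later streams whose
/// schema differs from the first one.
pub fn strip_repeated_schemas(streams: &[Vec<Message>]) -> Result<Vec<Vec<Message>>, String> {
    let Some(first) = streams.first() else {
        return Ok(Vec::new());
    };
    let schema = match first.first() {
        Some(Message::Schema(columns)) => columns,
        _ => return Err("first stream must start with a schema".to_string()),
    };
    let mut chunks = vec![first.clone()];
    for (index, stream) in streams.iter().enumerate().skip(1) {
        match stream.split_first() {
            Some((Message::Schema(columns), batches)) if columns == schema => {
                chunks.push(batches.to_vec());
            }
            Some((Message::Schema(_), _)) => {
                return Err(format!("stream {index} has a different schema"));
            }
            _ => return Err(format!("stream {index} does not start with a schema")),
        }
    }
    Ok(chunks)
}
```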

pub fn insert_raw(&mut self, data: &[u8]) -> Result<()>

Inserts raw Arrow data without schema tracking.

This is a low-level method that sends data directly without checking whether schema has been sent. Use this only if you are managing schema handling yourself.

For most use cases, prefer insert_data() and insert_record_batches().

§Arguments
  • data - Raw Arrow IPC data to send.
§Errors
  • Returns Error::Other if a previous insert_batch call already locked the inserter into RecordBatch IPC mode — raw IPC and RecordBatch paths cannot be mixed.
  • Returns Error::Other / Error::Client if the lazy COPY session fails to open.
  • Returns Error::Client / Error::Io if the server rejects the data or the socket write fails.
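
The rule that raw IPC and RecordBatch calls cannot be mixed behaves like a one-way lock taken on first use. This hypothetical ModeLock sketch shows the idea; it assumes that insert_data(), insert_record_batches(), and insert_raw() all count as the raw IPC path, which the error descriptions suggest but do not state outright.

```rust
/// Which input path an inserter has committed to (hypothetical model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    RawIpc,
    RecordBatch,
}

/// Locks into the first mode used; later calls must use the same mode.
#[derive(Debug, Default)]
pub struct ModeLock {
    mode: Option<Mode>,
}

impl ModeLock {
    pub fn enter(&mut self, requested: Mode) -> Result<(), String> {
        match self.mode {
            None => {
                self.mode = Some(requested);
                Ok(())
            }
            Some(current) if current == requested => Ok(()),
            Some(current) => Err(format!(
                "inserter is locked into {current:?} mode; cannot switch to {requested:?}"
            )),
        }
    }

    pub fn mode(&self) -> Option<Mode> {
        self.mode
    }
}
```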

pub fn execute(self) -> Result<u64>

Finishes the insert operation and returns the number of rows inserted.

This method completes the COPY operation and returns the row count reported by the server.

§Errors

Returns an error if:

  • No data was sent
  • The COPY operation fails to complete
§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_data(&arrow_data)?;
let rows = inserter.execute()?;

println!("Inserted {} rows", rows);


pub fn cancel(self)

Cancels the insert operation.

All data sent so far will be discarded.
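
Because execute() and cancel() both take self, an inserter is finished exactly once. A common pattern is to cancel when sending fails and execute otherwise. The sketch below shows that pattern against a hypothetical Finishable trait and MockInserter test double so it runs without a server. Whether dropping an ArrowInserter without calling either method also cancels is not documented here, so the sketch always finishes explicitly.

```rust
use std::cell::Cell;

/// Hypothetical trait mirroring a send call plus the consuming finish pair.
pub trait Finishable: Sized {
    type Error;
    fn send(&mut self, chunk: &[u8]) -> Result<(), Self::Error>;
    fn execute(self) -> Result<u64, Self::Error>;
    fn cancel(self);
}

/// Sends every chunk, then commits with `execute`. If a send fails, the
/// insert is cancelled (discarding data sent so far) and the error returned.
pub fn insert_or_cancel<F: Finishable>(mut inserter: F, chunks: &[&[u8]]) -> Result<u64, F::Error> {
    for chunk in chunks {
        if let Err(error) = inserter.send(chunk) {
            inserter.cancel();
            return Err(error);
        }
    }
    inserter.execute()
}

/// Test double: rejects chunks longer than `max_chunk` and records how it
/// was finished in a shared Cell, visible after `self` is consumed.
pub struct MockInserter<'a> {
    max_chunk: usize,
    bytes: u64,
    outcome: &'a Cell<&'static str>,
}

impl<'a> MockInserter<'a> {
    pub fn new(max_chunk: usize, outcome: &'a Cell<&'static str>) -> Self {
        Self { max_chunk, bytes: 0, outcome }
    }
}

impl Finishable for MockInserter<'_> {
    type Error = String;

    fn send(&mut self, chunk: &[u8]) -> Result<(), String> {
        if chunk.len() > self.max_chunk {
            return Err(format!("chunk of {} bytes rejected", chunk.len()));
        }
        self.bytes += chunk.len() as u64;
        Ok(())
    }

    /// The mock reports bytes; the real `execute` reports rows.
    fn execute(self) -> Result<u64, String> {
        self.outcome.set("executed");
        Ok(self.bytes)
    }

    fn cancel(self) {
        self.outcome.set("cancelled");
    }
}
```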


pub fn has_data(&self) -> bool

Returns whether any data has been sent.


pub fn total_bytes(&self) -> usize

Returns the total number of bytes sent so far.


pub fn chunk_count(&self) -> usize

Returns the number of chunks sent so far.
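
These three accessors are handy for progress reporting. The hypothetical SendStats type below sketches how such counters might be kept; it assumes that one successful send call counts as one chunk and that has_data() becomes true once any chunk has been sent, neither of which is spelled out above.

```rust
/// Hypothetical counters behind has_data(), total_bytes(), and chunk_count().
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SendStats {
    total_bytes: usize,
    chunk_count: usize,
}

impl SendStats {
    /// Records one successfully sent chunk of `len` bytes.
    pub fn record(&mut self, len: usize) {
        self.total_bytes += len;
        self.chunk_count += 1;
    }

    pub fn has_data(&self) -> bool {
        self.chunk_count > 0
    }

    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }

    pub fn chunk_count(&self) -> usize {
        self.chunk_count
    }

    /// Mean chunk size, or None before anything has been sent.
    pub fn average_chunk_bytes(&self) -> Option<usize> {
        self.total_bytes.checked_div(self.chunk_count)
    }
}
```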


pub fn insert_batch(&mut self, batch: &RecordBatch) -> Result<()>

Inserts an Arrow RecordBatch directly, streaming it immediately.

Each batch is serialized to Arrow IPC format and sent to the server right away — no accumulation in memory. The schema is written automatically on the first call; subsequent batches send only record-batch IPC messages.

Memory usage is O(batch_size) regardless of how many batches are inserted.

§Arguments
  • batch - The Arrow RecordBatch to insert.
§Example
use arrow::array::{Int32Array, Float64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
    Field::new("value", DataType::Float64, true),
]));
let batch = RecordBatch::try_new(schema, vec![
    Arc::new(Int32Array::from(vec![1, 2, 3])),
    Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
]).unwrap();

let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_batch(&batch)?;
let rows = inserter.execute()?;
§Errors
  • Returns Error::Other if a previous raw-IPC call locked this inserter into the other mode — raw IPC and RecordBatch paths cannot be mixed.
  • Returns Error::Other / Error::Client if the lazy COPY session cannot be opened.
  • Returns Error::Other wrapping the underlying Arrow IPC writer error if the schema or batch cannot be serialized (e.g. dictionary misalignment, encoding failure).
  • Returns Error::Client / Error::Io if the server rejects the data or the socket write fails.
§Panics

Panics internally only if the IPC writer state is corrupted — callers cannot trigger this from the public API. The batch_ipc_writer is consulted via as_mut().unwrap() after it has just been set to Some, so the unwrap is unreachable.
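
The streaming behavior of insert_batch() (schema once, then each batch serialized and sent immediately) can be illustrated without the arrow crate. In this hypothetical sketch the Batch type and the byte framing are invented and are not Arrow IPC; StreamingWriter reuses one scratch buffer, so memory grows with the largest single batch rather than with the number of batches.

```rust
use std::io::{self, Write};

/// Toy stand-in for a record batch: a single Int32 column.
pub struct Batch<'a> {
    pub values: &'a [i32],
}

/// Hypothetical "serialize and send immediately" writer.
/// The framing is invented for illustration and is NOT Arrow IPC.
pub struct StreamingWriter<W: Write> {
    sink: W,
    schema_written: bool,
    // Reused for every batch; capacity grows to the largest batch seen.
    scratch: Vec<u8>,
}

impl<W: Write> StreamingWriter<W> {
    pub fn new(sink: W) -> Self {
        Self { sink, schema_written: false, scratch: Vec::new() }
    }

    pub fn insert_batch(&mut self, batch: &Batch<'_>) -> io::Result<()> {
        if !self.schema_written {
            // Toy schema header, written once before the first batch.
            self.sink.write_all(b"SCHEMA:int32;")?;
            self.schema_written = true;
        }
        self.scratch.clear();
        self.scratch.extend_from_slice(&(batch.values.len() as u32).to_le_bytes());
        for v in batch.values {
            self.scratch.extend_from_slice(&v.to_le_bytes());
        }
        // Sent right away; earlier batches are not retained.
        self.sink.write_all(&self.scratch)
    }

    pub fn into_inner(self) -> W {
        self.sink
    }
}
```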


pub fn insert_batches<'b>(&mut self, batches: impl IntoIterator<Item = &'b RecordBatch>) -> Result<()>

Inserts multiple Arrow RecordBatches, streaming each one immediately.

This is a convenience method that calls insert_batch for each batch in the iterator. Memory usage stays bounded at O(batch_size).

§Example
let mut inserter = ArrowInserter::new(&conn, &table_def)?;
inserter.insert_batches(batches.iter())?;
let rows = inserter.execute()?;
§Errors

Returns on the first batch that fails — see insert_batch for the failure modes.
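
The early-exit behavior of insert_batches() matches Iterator::try_for_each. This hypothetical insert_all helper sketches the loop with a caller-supplied insert function: batches before the failing one have already been passed to it, and batches after it are never visited.

```rust
/// Hypothetical sketch of the insert_batches loop: calls `insert_one` on each
/// item in order and returns the first error. Items before the failing one
/// were already passed to `insert_one`; later items are never visited.
pub fn insert_all<'b, T: 'b, E>(
    items: impl IntoIterator<Item = &'b T>,
    mut insert_one: impl FnMut(&T) -> Result<(), E>,
) -> Result<(), E> {
    items.into_iter().try_for_each(|item| insert_one(item))
}
```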

Trait Implementations§

impl Debug for ArrowInserter<'_>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Drop for ArrowInserter<'_>

fn drop(&mut self)

Executes the destructor for this type.

fn pin_drop(self: Pin<&mut Self>)

🔬 This is a nightly-only experimental API. (pin_ergonomics)
Executes the destructor for this type, but unlike Drop::drop, it requires self to be pinned.

Auto Trait Implementations§

impl<'conn> Freeze for ArrowInserter<'conn>
impl<'conn> !RefUnwindSafe for ArrowInserter<'conn>
impl<'conn> !Send for ArrowInserter<'conn>
impl<'conn> Sync for ArrowInserter<'conn>
impl<'conn> Unpin for ArrowInserter<'conn>
impl<'conn> UnsafeUnpin for ArrowInserter<'conn>
impl<'conn> !UnwindSafe for ArrowInserter<'conn>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> Same for T

type Output = T

Should always be Self.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.