Struct arrow_odbc::OdbcWriter

pub struct OdbcWriter<S> { /* private fields */ }

Inserts batches from an [arrow::record_batch::RecordBatchReader] into a database.

Implementations§

Construct a new ODBC writer using an already existing prepared statement. Usually you want to call a higher-level constructor like Self::with_connection. Yet, this constructor is useful in two scenarios:

  1. The prepared statement is already constructed and you do not want to spend the time to prepare it again.
  2. You want to use the arrow arrays as array parameters for a statement, but that statement is not necessarily an INSERT statement with a simple one-to-one mapping of columns between table and arrow schema (see the sketch after the parameter list below).
Parameters
  • row_capacity: The number of rows sent to the database in each chunk, with the exception of the last chunk, which may be smaller.
  • schema: The schema needs to have one column for each positional parameter of the statement and must match the data which will be supplied to the instance later. Otherwise your code will panic.
  • statement: A prepared statement whose SQL text representation contains one placeholder for each column. The order of the placeholders must correspond to the order of the columns in the schema.
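
A minimal sketch of scenario 2, assuming an already opened odbc_api connection and a table Sales(day DATE, amount DOUBLE PRECISION); the table name, SQL text, and helper name are illustrative, not part of the crate:

use arrow::datatypes::{DataType, Field, Schema};
use arrow_odbc::{odbc_api::handles::StatementImpl, odbc_api::Connection, OdbcWriter};

// Hypothetical helper: builds a writer around a statement which is not a
// plain 1:1 INSERT (the second parameter is scaled before insertion).
fn sales_writer<'c>(
    connection: &'c Connection<'c>,
) -> Result<OdbcWriter<StatementImpl<'c>>, Box<dyn std::error::Error>> {
    // One placeholder per schema column, in the same order as the schema.
    let statement =
        connection.prepare("INSERT INTO Sales (day, amount) VALUES (?, ? * 100)")?;
    let schema = Schema::new(vec![
        Field::new("day", DataType::Date32, false),
        Field::new("amount", DataType::Float64, false),
    ]);
    // Send up to 1000 rows per roundtrip; the last chunk may be smaller.
    Ok(OdbcWriter::new(1000, &schema, statement)?)
}
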
Examples found in repository
src/odbc_writer.rs (line 267)
    pub fn from_connection(
        connection: Connection<'env>,
        schema: &Schema,
        table_name: &str,
        row_capacity: usize,
    ) -> Result<Self, WriterError> {
        let sql = insert_statement_from_schema(schema, table_name);
        let statement = connection
            .into_prepared(&sql)
            .map_err(|source| WriterError::PreparingInsertStatement { source, sql })?;
        Self::new(row_capacity, schema, statement)
    }
}

impl<'o> OdbcWriter<StatementImpl<'o>> {
    /// A writer which borrows the connection and inserts the given schema into a table with
    /// matching column names.
    ///
    /// **Note:**
    ///
    /// If table or column names are derived from user input, be sure to sanitize the input in order
    /// to prevent SQL injection attacks.
    pub fn with_connection(
        connection: &'o Connection<'o>,
        schema: &Schema,
        table_name: &str,
        row_capacity: usize,
    ) -> Result<Self, WriterError> {
        let sql = insert_statement_from_schema(schema, table_name);
        let statement = connection
            .prepare(&sql)
            .map_err(|source| WriterError::PreparingInsertStatement { source, sql })?;
        Self::new(row_capacity, schema, statement)
    }

Consumes all the batches in the record batch reader and sends them chunk by chunk to the database.

Examples found in repository
src/odbc_writer.rs (line 56)
pub fn insert_into_table(
    connection: &Connection,
    batches: &mut impl RecordBatchReader,
    table_name: &str,
    batch_size: usize,
) -> Result<(), WriterError> {
    let schema = batches.schema();
    let mut inserter =
        OdbcWriter::with_connection(connection, schema.as_ref(), table_name, batch_size)?;
    inserter.write_all(batches)
}
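
A hedged usage sketch of write_all with in-memory batches; the table layout, values, and function name are assumptions for illustration. RecordBatchIterator from the arrow crate adapts owned batches into a reader:

use std::sync::Arc;

use arrow::array::{ArrayRef, Date32Array, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::{RecordBatch, RecordBatchIterator};
use arrow_odbc::{odbc_api::handles::StatementImpl, OdbcWriter, WriterError};

fn write_sales(writer: &mut OdbcWriter<StatementImpl<'_>>) -> Result<(), WriterError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("day", DataType::Date32, false),
        Field::new("amount", DataType::Float64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Date32Array::from(vec![19000, 19001])) as ArrayRef,
            Arc::new(Float64Array::from(vec![10.5, 42.0])),
        ],
    )
    .expect("arrays match schema");
    // Adapts the in-memory batches into an iterator of `Result<RecordBatch, _>`.
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    // Consumes every batch and flushes the remaining rows at the end.
    writer.write_all(reader)
}
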

Consumes a single batch and sends it chunk by chunk to the database. The last batch may not be consumed until Self::flush is called.

Examples found in repository
src/odbc_writer.rs (line 196)
    pub fn write_all(
        &mut self,
        reader: impl Iterator<Item = Result<RecordBatch, ArrowError>>,
    ) -> Result<(), WriterError> {
        for result in reader {
            let record_batch = result.map_err(WriterError::ReadingRecordBatch)?;
            self.write_batch(&record_batch)?;
        }
        self.flush()?;
        Ok(())
    }

The number of rows in an individual record batch need not match the capacity of the buffers owned by this writer. Therefore records are sometimes not sent to the database immediately; instead we wait for the buffers to fill before reading the next batch. Once we reach the last batch, however, there is no “next batch” anymore. In that case we call this method in order to send the remainder of the records to the database as well.
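
A minimal sketch of driving write_batch and flush by hand instead of using write_all; the function name and batch source are illustrative:

use arrow::record_batch::RecordBatch;
use arrow_odbc::{odbc_api::handles::StatementImpl, OdbcWriter, WriterError};

// Rows accumulate in the writer's buffers and only full buffers are sent
// eagerly, so a final `flush` pushes out whatever remains.
fn write_batches(
    writer: &mut OdbcWriter<StatementImpl<'_>>,
    batches: &[RecordBatch],
) -> Result<(), WriterError> {
    for batch in batches {
        writer.write_batch(batch)?;
    }
    // Without this call, rows still sitting in the buffers would never
    // reach the database.
    writer.flush()?;
    Ok(())
}
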

Examples found in repository
src/odbc_writer.rs (line 198)
    pub fn write_all(
        &mut self,
        reader: impl Iterator<Item = Result<RecordBatch, ArrowError>>,
    ) -> Result<(), WriterError> {
        for result in reader {
            let record_batch = result.map_err(WriterError::ReadingRecordBatch)?;
            self.write_batch(&record_batch)?;
        }
        self.flush()?;
        Ok(())
    }

    /// Consumes a single batch and sends it chunk by chunk to the database. The last batch may not
    /// be consumed until [`Self::flush`] is called.
    pub fn write_batch(&mut self, record_batch: &RecordBatch) -> Result<(), WriterError> {
        let capacity = self.inserter.capacity();
        let mut remaining_rows = record_batch.num_rows();
        // The record batch may contain more rows than the capacity of our writer can hold. So we
        // need to be able to fill the buffers multiple times and send them to the database in
        // between.
        while remaining_rows != 0 {
            let chunk_size = min(capacity - self.inserter.num_rows(), remaining_rows);
            let param_offset = self.inserter.num_rows();
            self.inserter.set_num_rows(param_offset + chunk_size);
            let chunk = record_batch.slice(record_batch.num_rows() - remaining_rows, chunk_size);
            for (index, (array, strategy)) in chunk
                .columns()
                .iter()
                .zip(self.strategies.iter())
                .enumerate()
            {
                strategy.write_rows(param_offset, self.inserter.column_mut(index), array)?
            }

            // If we used up all capacity we send the parameters to the database and reset the
            // parameter buffers.
            if self.inserter.num_rows() == capacity {
                self.flush()?;
            }
            remaining_rows -= chunk_size;
        }

        Ok(())
    }

A writer which takes ownership of the connection and inserts the given schema into a table with matching column names.

Note:

If table or column names are derived from user input, be sure to sanitize the input in order to prevent SQL injection attacks.
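
A minimal sketch of the owning variant, assuming an already opened connection with a 'static environment and a table Sales whose column names match the schema; the table name and function name are illustrative:

use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_odbc::{odbc_api::Connection, OdbcWriter, WriterError};

// `from_connection` consumes the connection, so the returned writer is
// self-contained and can outlive the scope which opened it.
fn insert_owned(
    connection: Connection<'static>,
    schema: &Schema,
    batches: impl Iterator<Item = Result<RecordBatch, ArrowError>>,
) -> Result<(), WriterError> {
    let mut writer = OdbcWriter::from_connection(connection, schema, "Sales", 1000)?;
    writer.write_all(batches)
}
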

A writer which borrows the connection and inserts the given schema into a table with matching column names.

Note:

If table or column names are derived from user input, be sure to sanitize the input in order to prevent SQL injection attacks.

