Struct arrow_odbc::OdbcWriter
pub struct OdbcWriter<S> { /* private fields */ }
Inserts batches from an [arrow::record_batch::RecordBatchReader] into a database.
Implementations
impl<S> OdbcWriter<S>
where
    S: AsStatementRef,

pub fn new(
    row_capacity: usize,
    schema: &Schema,
    statement: Prepared<S>,
) -> Result<Self, WriterError>
Construct a new ODBC writer using an already existing prepared statement. Usually you want to
call a higher level constructor like Self::with_connection. Yet, this constructor is
useful in two scenarios:
- The prepared statement is already constructed and you do not want to spend the time to prepare it again.
- You want to use the arrow arrays as array parameters for a statement, but that statement is not necessarily an INSERT statement with a simple one-to-one mapping of columns between table and arrow schema.
Parameters
- row_capacity: The number of rows sent to the database in each chunk. The last chunk may be smaller.
- schema: Needs to have one column for each positional parameter of the statement and match the data which will be supplied to the instance later. Otherwise your code will panic.
- statement: A prepared statement whose SQL text representation contains one placeholder for each column. The order of the placeholders must correspond to the order of the columns in the schema.
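The placeholder-per-column requirement can be illustrated with a small sketch. The higher level constructors build such a statement via insert_statement_from_schema (visible in the example below); the helper here is a hypothetical stand-in, and the exact quoting and formatting rules are an assumption, not the crate's actual output.

```rust
// Hypothetical sketch: generate an INSERT statement with one positional
// placeholder (`?`) per column, in schema order. The real crate provides
// `insert_statement_from_schema` for this; the exact SQL text produced
// here is illustrative only.
fn insert_statement(table: &str, columns: &[&str]) -> String {
    let column_list = columns.join(", ");
    let placeholders = vec!["?"; columns.len()].join(", ");
    format!("INSERT INTO {table} ({column_list}) VALUES ({placeholders})")
}

fn main() {
    let sql = insert_statement("Measurements", &["sensor", "value"]);
    // One `?` per column, matching the order of the columns in the schema.
    assert_eq!(sql, "INSERT INTO Measurements (sensor, value) VALUES (?, ?)");
    println!("{sql}");
}
```

Such a statement, once prepared, can be handed to Self::new together with a schema whose columns line up with the placeholders.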
Examples found in repository
pub fn from_connection(
connection: Connection<'env>,
schema: &Schema,
table_name: &str,
row_capacity: usize,
) -> Result<Self, WriterError> {
let sql = insert_statement_from_schema(schema, table_name);
let statement = connection
.into_prepared(&sql)
.map_err(|source| WriterError::PreparingInsertStatement { source, sql })?;
Self::new(row_capacity, schema, statement)
}
}
impl<'o> OdbcWriter<StatementImpl<'o>> {
/// A writer which borrows the connection and inserts the given schema into a table with
/// matching column names.
///
/// **Note:**
///
    /// If table or column names are derived from user input, be sure to sanitize the input in order
/// to prevent SQL injection attacks.
pub fn with_connection(
connection: &'o Connection<'o>,
schema: &Schema,
table_name: &str,
row_capacity: usize,
) -> Result<Self, WriterError> {
let sql = insert_statement_from_schema(schema, table_name);
let statement = connection
.prepare(&sql)
.map_err(|source| WriterError::PreparingInsertStatement { source, sql })?;
Self::new(row_capacity, schema, statement)
}

pub fn write_all(
    &mut self,
    reader: impl Iterator<Item = Result<RecordBatch, ArrowError>>,
) -> Result<(), WriterError>
Consumes all the batches in the record batch reader and sends them chunk by chunk to the database.
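The control flow of write_all can be mocked in a few lines: every batch from the reader is written, the first reader error aborts the loop, and a final flush sends whatever rows remain buffered. The mock writer below is a plain counter, not the real OdbcWriter, and stands in only to show the loop's shape.

```rust
// Minimal mock mirroring the control flow of `write_all`: write every
// batch, propagate reader errors via `?`, flush once at the end. The
// batches are represented by their row counts only.
struct MockWriter {
    rows_written: usize,
    flushed: bool,
}

impl MockWriter {
    fn write_all(
        &mut self,
        reader: impl Iterator<Item = Result<usize, String>>,
    ) -> Result<(), String> {
        for result in reader {
            let batch_rows = result?; // a reader error aborts the loop
            self.rows_written += batch_rows;
        }
        self.flushed = true; // the remainder is sent even if no chunk filled up
        Ok(())
    }
}

fn main() {
    let mut writer = MockWriter { rows_written: 0, flushed: false };
    let batches: Vec<Result<usize, String>> = vec![Ok(10), Ok(5)];
    writer.write_all(batches.into_iter()).unwrap();
    assert_eq!(writer.rows_written, 15);
    assert!(writer.flushed);
}
```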
pub fn write_batch(
    &mut self,
    record_batch: &RecordBatch,
) -> Result<(), WriterError>
Consumes a single batch and sends it chunk by chunk to the database. The last batch may not
be consumed until Self::flush is called.
pub fn flush(&mut self) -> Result<(), WriterError>
The number of rows in an individual record batch does not necessarily match the capacity of the buffers owned by this writer. Therefore records are sometimes not sent to the database immediately; instead we wait for the buffers to be filled by reading the next batch. Once we reach the last batch, however, there is no “next batch” anymore. In that case we call this method in order to send the remainder of the records to the database as well.
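The interplay between buffer capacity and batch sizes can be made concrete with a small simulation of the chunking arithmetic described above. This is a sketch, not the real implementation: rows accumulate in a buffer of fixed capacity, a chunk is sent only when the buffer is full, and whatever remains at the end needs an explicit flush.

```rust
use std::cmp::min;

// Sketch of the chunking arithmetic: returns how many full chunks were
// sent implicitly and how many rows are still buffered awaiting `flush`.
fn simulate(capacity: usize, batch_sizes: &[usize]) -> (usize, usize) {
    let mut buffered = 0; // rows currently in the parameter buffers
    let mut sent_chunks = 0; // implicit sends triggered by a full buffer
    for &batch in batch_sizes {
        let mut remaining = batch;
        while remaining != 0 {
            let chunk = min(capacity - buffered, remaining);
            buffered += chunk;
            if buffered == capacity {
                sent_chunks += 1;
                buffered = 0;
            }
            remaining -= chunk;
        }
    }
    (sent_chunks, buffered)
}

fn main() {
    // Capacity 100, batches of 250 and 30 rows: two full chunks go out
    // during `write_batch`; 80 rows stay buffered until `flush` is called.
    let (sent, pending) = simulate(100, &[250, 30]);
    assert_eq!(sent, 2);
    assert_eq!(pending, 80);
}
```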
Examples found in repository
pub fn write_all(
&mut self,
reader: impl Iterator<Item = Result<RecordBatch, ArrowError>>,
) -> Result<(), WriterError> {
for result in reader {
let record_batch = result.map_err(WriterError::ReadingRecordBatch)?;
self.write_batch(&record_batch)?;
}
self.flush()?;
Ok(())
}
/// Consumes a single batch and sends it chunk by chunk to the database. The last batch may not
/// be consumed until [`Self::flush`] is called.
pub fn write_batch(&mut self, record_batch: &RecordBatch) -> Result<(), WriterError> {
let capacity = self.inserter.capacity();
    let mut remaining_rows = record_batch.num_rows();
// The record batch may contain more rows than the capacity of our writer can hold. So we
// need to be able to fill the buffers multiple times and send them to the database in
// between.
    while remaining_rows != 0 {
        let chunk_size = min(capacity - self.inserter.num_rows(), remaining_rows);
let param_offset = self.inserter.num_rows();
self.inserter.set_num_rows(param_offset + chunk_size);
        let chunk = record_batch.slice(record_batch.num_rows() - remaining_rows, chunk_size);
for (index, (array, strategy)) in chunk
.columns()
.iter()
.zip(self.strategies.iter())
.enumerate()
{
strategy.write_rows(param_offset, self.inserter.column_mut(index), array)?
}
// If we used up all capacity we send the parameters to the database and reset the
// parameter buffers.
if self.inserter.num_rows() == capacity {
self.flush()?;
}
        remaining_rows -= chunk_size;
}
Ok(())
}

impl<'env> OdbcWriter<StatementConnection<'env>>

pub fn from_connection(
    connection: Connection<'env>,
    schema: &Schema,
    table_name: &str,
    row_capacity: usize,
) -> Result<Self, WriterError>
A writer which takes ownership of the connection and inserts the given schema into a table with matching column names.
Note:
If table or column names are derived from user input, be sure to sanitize the input in order to prevent SQL injection attacks.
impl<'o> OdbcWriter<StatementImpl<'o>>

pub fn with_connection(
    connection: &'o Connection<'o>,
    schema: &Schema,
    table_name: &str,
    row_capacity: usize,
) -> Result<Self, WriterError>
A writer which borrows the connection and inserts the given schema into a table with matching column names.
Note:
If table or column names are derived from user input, be sure to sanitize the input in order to prevent SQL injection attacks.
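One conservative way to sanitize a user supplied table name before passing it to with_connection or from_connection is to reject anything that is not a plain identifier. The check below is an illustrative sketch, not part of arrow_odbc, and is deliberately stricter than what most databases allow (quoted identifiers, non-ASCII names) in exchange for being obviously injection-proof.

```rust
// Illustrative allow-list check for table or column names: ASCII letters,
// digits and underscores only, with a non-digit first character. Anything
// else is rejected rather than escaped.
fn is_safe_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {}
        _ => return false, // empty, or starts with a digit / symbol
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

fn main() {
    assert!(is_safe_identifier("Measurements"));
    assert!(is_safe_identifier("sensor_data_2023"));
    assert!(!is_safe_identifier("users; DROP TABLE users"));
    assert!(!is_safe_identifier("1table"));
    assert!(!is_safe_identifier(""));
}
```

Validating against an allow-list like this, or checking the name against the catalog of tables the user is actually permitted to write to, avoids having to reason about database-specific escaping rules.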