multisql/databases/odbc/
mutable.rs

1use {
2	super::{base::convert_table_name, ColumnSet},
3	crate::{DBBase, DBMut, ODBCDatabase, Result, Row, Value},
4	async_trait::async_trait,
5	odbc_api::buffers::TextRowSet,
6};
7
/// Upper bound on the number of rows `ODBCDatabase::insert` groups into one
/// batch; the same value is also handed to `ColumnSet::new` for every batch.
const BATCH_SIZE: usize = 4096;
9
10#[async_trait(?Send)]
11impl DBMut for ODBCDatabase {
12	async fn insert_data(&mut self, table_name: &str, rows: Vec<Row>) -> Result<()> {
13		self.insert(table_name, rows.to_vec()).await?;
14		Ok(())
15	}
16}
17
18impl ODBCDatabase {
19	async fn insert(&mut self, table_name: &str, rows: Vec<Row>) -> Result<()> {
20		let connection = self
21			.environment
22			.connect_with_connection_string(&self.connection_string)?;
23		let schema = self.fetch_schema(&table_name).await?.unwrap();
24		let table_name = convert_table_name(table_name);
25		let columns = schema
26			.column_defs
27			.iter()
28			.map(|col_def| col_def.name.as_str())
29			.collect::<Vec<&str>>();
30
31		let rows: Vec<Vec<Value>> = rows.into_iter().map(|Row(row)| row).collect();
32
33		connection.set_autocommit(false)?;
34		for rows in rows.chunks(BATCH_SIZE) {
35			let column_set = ColumnSet::new(rows.to_vec(), BATCH_SIZE);
36			let query = column_set.query(&table_name, &columns);
37
38			let mut statement = connection.prepare(&query)?;
39			//let buffer: ColumnarBuffer<AnyColumnBuffer> = column_set.try_into()?;
40			let buffer: TextRowSet = column_set.try_into()?;
41
42			statement.execute(&buffer)?;
43		}
44		connection.commit()?;
45		Ok(())
46	}
47}