1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use {
	super::base::convert_table_name,
	crate::{Cast, Column, DBBase, DBMut, ODBCDatabase, Result, Row, Value},
	async_trait::async_trait,
	odbc_api::{
		buffers::{AnyColumnBuffer, ColumnarBuffer, TextColumn},
		parameter::InputParameter,
		Bit, IntoParameter,
	},
};

#[async_trait(?Send)]
impl DBMut for ODBCDatabase {
	/// Inserts `rows` into `table_name`, splitting them into batches of at most 255 rows.
	///
	/// Every batch is forwarded to `ODBCDatabase::insert` in the original row order.
	///
	/// # Errors
	/// Returns the error of the first batch that fails; later batches are not attempted.
	/// Nothing in this method rolls back batches that were already inserted.
	async fn insert_data(&mut self, table_name: &str, rows: Vec<Row>) -> Result<()> {
		// We own `rows`, so move them out batch by batch instead of cloning each chunk.
		let mut remaining = rows.into_iter();
		loop {
			let batch: Vec<Row> = remaining.by_ref().take(255).collect();
			if batch.is_empty() {
				// Iterator exhausted (or there was nothing to insert in the first place).
				return Ok(());
			}
			self.insert(table_name, batch).await?;
		}
	}
}

impl ODBCDatabase {
	/// Inserts one batch of `rows` into `table_name` using a single parameterised `INSERT`.
	///
	/// The rows are transposed into one value list per schema column. Columns whose
	/// values are all `Value::Null` are left out of the statement; the remaining
	/// columns are converted with `into_buffer` and bound to the `?` placeholders.
	///
	/// # Errors
	/// Returns an error if opening the connection, fetching the schema, or executing
	/// the statement fails.
	///
	/// # Panics
	/// * if no schema is found for `table_name`;
	/// * if a row holds more values than the schema has columns;
	/// * if `into_buffer` panics (failed cast or unsupported column type).
	async fn insert(&mut self, table_name: &str, rows: Vec<Row>) -> Result<()> {
		// Nothing to do; also avoids building an `INSERT` with an empty column list.
		if rows.is_empty() {
			return Ok(());
		}

		let connection = self
			.environment
			.connect_with_connection_string(&self.connection_string)?;
		let schema = self
			.fetch_schema(table_name)
			.await?
			.expect("insert: no schema found for the target table");
		let table_name = convert_table_name(table_name);
		let column_names: Vec<&str> = schema
			.column_defs
			.iter()
			.map(|col_def| col_def.name.as_str())
			.collect();

		// Transpose row-major input into one value list per schema column.
		let mut values_by_column: Vec<Vec<Value>> =
			column_names.iter().map(|_| Vec::new()).collect();
		for Row(row) in rows {
			for (index, value) in row.into_iter().enumerate() {
				values_by_column[index].push(value);
			}
		}

		// Keep only columns with at least one non-null value, remembering their schema index.
		let used_columns: Vec<(usize, Vec<Value>)> = values_by_column
			.into_iter()
			.enumerate()
			.filter(|(_, column)| column.iter().any(|value| !matches!(value, Value::Null)))
			.collect();

		let columns = used_columns
			.iter()
			.map(|(index, _)| column_names[*index])
			.collect::<Vec<&str>>()
			.join(", ");
		let placeholders = vec!["?"; used_columns.len()].join(", ");

		// NOTE(review): the `u16` handed to `ColumnarBuffer::new` is the 0-based schema
		// index, not the position of the matching `?` placeholder -- confirm this is what
		// odbc_api expects, especially once all-null columns have been filtered out.
		// TODO: Handle overflow (`as u16` silently truncates indices above `u16::MAX`).
		let buffers: Vec<(u16, AnyColumnBuffer)> = used_columns
			.into_iter()
			.map(|(index, column)| {
				(
					index as u16,
					into_buffer(column, schema.column_defs[index].clone()),
				)
			})
			.collect();
		let buffers = ColumnarBuffer::new(buffers);

		let query = format!(
			"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
			table = table_name,
			columns = columns,
			placeholders = placeholders
		);
		// Surface database failures to the caller instead of panicking.
		connection.execute(&query, &buffers)?;
		Ok(())
	}
}
/// Converts the values of one column into an `AnyColumnBuffer` chosen by the
/// column's declared data type.
///
/// * `Timestamp`, `U64` and `I64` columns become `AnyColumnBuffer::I64`.
/// * `F64` columns become `AnyColumnBuffer::F64`.
/// * `Str` columns become `AnyColumnBuffer::Text`.
///
/// # Panics
/// Panics if any value fails to cast to the target type, and hits
/// `unimplemented!()` for every other data type.
fn into_buffer(values: Vec<Value>, column_def: Column) -> AnyColumnBuffer {
	use crate::ValueType::*;
	match column_def.data_type {
		Timestamp | U64 | I64 => {
			let integers = values
				.into_iter()
				.map(|value| value.cast().unwrap())
				.collect();
			AnyColumnBuffer::I64(integers)
		}
		F64 => {
			let floats = values
				.into_iter()
				.map(|value| value.cast().unwrap())
				.collect();
			AnyColumnBuffer::F64(floats)
		}
		Str => {
			// NOTE(review): the arguments are presumably (batch size, max string length);
			// confirm against the odbc_api version in use.
			let second_arg = values.len() * 1000;
			let mut text_column = TextColumn::new(255, second_arg);
			for (row_index, value) in values.into_iter().enumerate() {
				let text: String = value.cast().unwrap();
				text_column.append(row_index, Some(text.as_bytes()));
			}
			AnyColumnBuffer::Text(text_column)
		}
		_ => unimplemented!(),
	}
}