#![allow(clippy::unwrap_used, clippy::expect_used, clippy::vec_init_then_push)]
use mssql_client::{BulkColumn, BulkInsert, BulkInsertBuilder, BulkOptions, Client, Config, Error};
use mssql_types::SqlValue;
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt::init();
let host = std::env::var("MSSQL_HOST").unwrap_or_else(|_| "localhost".into());
let database = std::env::var("MSSQL_DATABASE").unwrap_or_else(|_| "master".into());
let user = std::env::var("MSSQL_USER").unwrap_or_else(|_| "sa".into());
let password = std::env::var("MSSQL_PASSWORD").unwrap_or_else(|_| "Password123!".into());
let conn_str = format!(
"Server={host};Database={database};User Id={user};Password={password};TrustServerCertificate=true"
);
let config = Config::from_connection_string(&conn_str)?;
let mut client = Client::connect(config).await?;
println!("Connected to SQL Server");
client
.simple_query("IF OBJECT_ID('tempdb..#BulkTest', 'U') IS NOT NULL DROP TABLE #BulkTest")
.await?;
client
.simple_query(
"CREATE TABLE #BulkTest (
id INT NOT NULL,
name NVARCHAR(100) NOT NULL,
value DECIMAL(18,2),
created_at DATETIME2 DEFAULT GETDATE()
)",
)
.await?;
println!("Test table created");
let options = BulkOptions {
batch_size: 1000, check_constraints: true,
fire_triggers: false,
keep_nulls: true,
table_lock: false, order_hint: None,
max_errors: 0, };
let columns = vec![
BulkColumn::new("id", "INT", 0),
BulkColumn::new("name", "NVARCHAR(100)", 1),
BulkColumn::new("value", "DECIMAL(18,2)", 2),
];
let builder = BulkInsertBuilder::new("#BulkTest")
.with_options(options.clone())
.with_typed_columns(columns.clone());
let insert_bulk_sql = builder.build_insert_bulk_statement()?;
println!("INSERT BULK statement: {insert_bulk_sql}");
println!("Starting bulk insert...");
let mut bulk = BulkInsert::new(columns, options.batch_size);
let num_rows = 10_000;
for i in 0..num_rows {
let row = vec![
SqlValue::Int(i),
SqlValue::String(format!("User_{i}")),
SqlValue::Null, ];
bulk.send_row_values(&row)?;
if bulk.should_flush() {
let packets = bulk.take_packets();
println!(
" Flushed batch at {} rows, {} packets generated",
bulk.total_rows(),
packets.len()
);
}
}
let final_packets = bulk.finish_packets();
println!(
" Final flush: {} total rows, {} packets",
bulk.total_rows(),
final_packets.len()
);
let result = bulk.result();
println!(
"\nBulk insert packet generation complete: {} rows prepared, {} batches",
result.rows_affected, result.batches_committed
);
let rows = client
.query("SELECT 'Bulk insert packets generated' AS status", &[])
.await?;
for result in rows {
let row = result?;
let status: String = row.get(0)?;
println!("Status: {status}");
}
client.close().await?;
println!("\nDone!");
Ok(())
}
#[allow(dead_code)]
fn create_sample_rows() -> Vec<Vec<SqlValue>> {
let mut rows = Vec::new();
rows.push(vec![
SqlValue::Int(1),
SqlValue::String("Integer test".into()),
SqlValue::Null,
]);
rows.push(vec![
SqlValue::Int(2),
SqlValue::String("Unicode test".into()),
SqlValue::Null,
]);
rows.push(vec![
SqlValue::Int(3),
SqlValue::String("NULL value test".into()),
SqlValue::Null,
]);
rows
}