use parking_lot::{Condvar, Mutex};
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::connection::SochConnection;
use crate::error::Result;
use sochdb_core::soch::SochValue;
/// A single mutation queued by a `BatchWriter` and applied at flush time.
#[derive(Debug, Clone)]
pub enum BatchOp {
    /// Insert one row given explicit `(column, value)` pairs.
    Insert {
        table: String,
        values: Vec<(String, SochValue)>,
    },
    /// Insert one row given values positionally, in the table schema's
    /// field order; a `None` entry skips that column.
    InsertSlice {
        table: String,
        // Caller-supplied row id; currently ignored when the op is flushed.
        row_id: u64,
        values: Vec<Option<SochValue>>,
    },
    /// Apply `(column, value)` updates to rows where `key_field == key_value`.
    Update {
        table: String,
        key_field: String,
        key_value: SochValue,
        updates: Vec<(String, SochValue)>,
    },
    /// Delete rows where `key_field == key_value`.
    Delete {
        table: String,
        key_field: String,
        key_value: SochValue,
    },
}
/// Cumulative statistics returned by `BatchWriter::execute`.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Ops applied successfully, summed over every committed chunk.
    pub ops_executed: usize,
    /// Ops that failed (e.g. an `InsertSlice` into a table with no schema).
    pub ops_failed: usize,
    /// Total wall-clock time spent flushing, in milliseconds.
    pub duration_ms: u64,
    /// Number of fsyncs issued (one per committed chunk).
    pub fsync_count: u64,
    /// Number of batches flushed to storage.
    pub chunks_committed: usize,
}
/// Buffers mutations against a connection and commits them in chunks,
/// issuing one fsync per chunk.
pub struct BatchWriter<'a> {
    conn: &'a SochConnection,
    /// Ops queued since the last flush.
    ops: Vec<BatchOp>,
    /// Flush threshold used when `auto_commit` is enabled (minimum 1).
    max_batch_size: usize,
    /// When true, builder methods flush automatically once the buffer
    /// reaches `max_batch_size` ops.
    auto_commit: bool,
    // Cumulative statistics across all flushes, reported by `execute()`.
    chunks_committed: usize,
    total_ops_executed: usize,
    total_ops_failed: usize,
    cumulative_duration_ms: u64,
}
impl<'a> BatchWriter<'a> {
    /// Creates a writer with defaults: max batch size 1000, auto-commit off.
    pub fn new(conn: &'a SochConnection) -> Self {
        Self {
            conn,
            ops: Vec::new(),
            max_batch_size: 1000,
            auto_commit: false,
            chunks_committed: 0,
            total_ops_executed: 0,
            total_ops_failed: 0,
            cumulative_duration_ms: 0,
        }
    }

    /// Sets the number of buffered ops that triggers an auto-commit flush.
    /// Values below 1 are clamped to 1.
    pub fn max_batch_size(mut self, size: usize) -> Self {
        self.max_batch_size = size.max(1);
        self
    }

    /// Enables or disables automatic flushing whenever the buffer reaches
    /// `max_batch_size` ops.
    pub fn auto_commit(mut self, enabled: bool) -> Self {
        self.auto_commit = enabled;
        self
    }

    /// Builds the `field == value` predicate shared by `Update` and `Delete`.
    fn eq_clause(field: String, value: SochValue) -> crate::connection::WhereClause {
        crate::connection::WhereClause::Simple {
            field,
            op: crate::connection::CompareOp::Eq,
            value,
        }
    }

    /// Flushes when auto-commit is enabled and the buffer is full.
    fn maybe_auto_flush(&mut self) -> Result<()> {
        if self.auto_commit && self.ops.len() >= self.max_batch_size {
            self.flush_current_batch()?;
        }
        Ok(())
    }

    /// Applies every buffered op under a single write lock, then fsyncs once.
    ///
    /// Accounting note: executed/failed counts are recorded *before* the
    /// fsync, because by that point the mutations have already been applied
    /// in memory — a failed fsync must not make them vanish from the stats.
    /// `chunks_committed` (also reported as the fsync count) is only bumped
    /// after a successful fsync.
    fn flush_current_batch(&mut self) -> Result<()> {
        if self.ops.is_empty() {
            return Ok(());
        }
        let start = Instant::now();
        let batch_ops = std::mem::take(&mut self.ops);
        let batch_size = batch_ops.len();
        let mut ops_failed = 0;
        {
            // Hold the write lock for the whole chunk so it applies
            // atomically with respect to other writers.
            let mut tch = self.conn.tch.write();
            for op in batch_ops {
                match op {
                    BatchOp::Insert { table, values } => {
                        let map: std::collections::HashMap<String, SochValue> =
                            values.into_iter().collect();
                        tch.insert_row(&table, &map);
                    }
                    BatchOp::InsertSlice { table, row_id: _, values } => {
                        // Positional values are matched against the schema's
                        // field order; `None` entries leave the column unset.
                        let schema = tch.get_table_schema(&table);
                        if let Some(schema) = schema {
                            let map: std::collections::HashMap<String, SochValue> = schema
                                .fields
                                .iter()
                                .zip(values)
                                .filter_map(|(name, val)| val.map(|v| (name.clone(), v)))
                                .collect();
                            tch.insert_row(&table, &map);
                        } else {
                            // Unknown table: record the failure, keep going.
                            ops_failed += 1;
                        }
                    }
                    BatchOp::Update { table, key_field, key_value, updates } => {
                        let map: std::collections::HashMap<String, SochValue> =
                            updates.into_iter().collect();
                        let where_clause = Self::eq_clause(key_field, key_value);
                        let _ = tch.update_rows(&table, &map, Some(&where_clause));
                    }
                    BatchOp::Delete { table, key_field, key_value } => {
                        let where_clause = Self::eq_clause(key_field, key_value);
                        let _ = tch.delete_rows(&table, Some(&where_clause));
                    }
                }
            }
        }
        // Count executed ops before attempting durability (see doc above).
        self.total_ops_executed += batch_size - ops_failed;
        self.total_ops_failed += ops_failed;
        self.conn.fsync()?;
        self.chunks_committed += 1;
        self.cumulative_duration_ms += start.elapsed().as_millis() as u64;
        Ok(())
    }

    /// Queues an `Insert` op; may trigger an auto-commit flush.
    pub fn insert(mut self, table: &str, values: Vec<(&str, SochValue)>) -> Self {
        self.ops.push(BatchOp::Insert {
            table: table.to_string(),
            values: values.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
        });
        // Builder methods return `Self`, so an auto-flush error cannot be
        // propagated here; `flush_current_batch` keeps the accounting
        // accurate regardless, and `execute()` surfaces flush errors.
        let _ = self.maybe_auto_flush();
        self
    }

    /// Queues a positional `InsertSlice` op (values in schema field order);
    /// may trigger an auto-commit flush.
    pub fn insert_slice(
        mut self,
        table: &str,
        row_id: u64,
        values: Vec<Option<SochValue>>,
    ) -> Self {
        self.ops.push(BatchOp::InsertSlice {
            table: table.to_string(),
            row_id,
            values,
        });
        // See `insert` for why the auto-flush result is intentionally dropped.
        let _ = self.maybe_auto_flush();
        self
    }

    /// Queues an `Update` op keyed on `key_field == key_value`; may trigger
    /// an auto-commit flush.
    pub fn update(
        mut self,
        table: &str,
        key_field: &str,
        key_value: SochValue,
        updates: Vec<(&str, SochValue)>,
    ) -> Self {
        self.ops.push(BatchOp::Update {
            table: table.to_string(),
            key_field: key_field.to_string(),
            key_value,
            updates: updates.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
        });
        // See `insert` for why the auto-flush result is intentionally dropped.
        let _ = self.maybe_auto_flush();
        self
    }

    /// Queues a `Delete` op keyed on `key_field == key_value`; may trigger
    /// an auto-commit flush.
    pub fn delete(mut self, table: &str, key_field: &str, key_value: SochValue) -> Self {
        self.ops.push(BatchOp::Delete {
            table: table.to_string(),
            key_field: key_field.to_string(),
            key_value,
        });
        // See `insert` for why the auto-flush result is intentionally dropped.
        let _ = self.maybe_auto_flush();
        self
    }

    /// Number of ops buffered but not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.ops.len()
    }

    /// Total ops seen by this writer: executed + failed + still pending.
    pub fn total_count(&self) -> usize {
        self.total_ops_executed + self.total_ops_failed + self.ops.len()
    }

    /// Flushes any remaining buffered ops and returns cumulative statistics
    /// over every chunk this writer committed (auto-commits included).
    ///
    /// # Errors
    /// Propagates any failure from the final flush (e.g. fsync).
    pub fn execute(mut self) -> Result<BatchResult> {
        // `flush_current_batch` is a no-op on an empty buffer, so no guard
        // is needed; the previously dead `Instant::now()` was removed.
        self.flush_current_batch()?;
        Ok(BatchResult {
            ops_executed: self.total_ops_executed,
            ops_failed: self.total_ops_failed,
            duration_ms: self.cumulative_duration_ms,
            // One fsync is issued per committed chunk.
            fsync_count: self.chunks_committed as u64,
            chunks_committed: self.chunks_committed,
        })
    }
}
/// In-process group-commit buffer: submitters enqueue pending commits and
/// wait for a flush notification, amortizing fsync cost over a batch.
///
/// Deprecated stub — it does not perform any real WAL I/O (see the note).
#[deprecated(
    since = "0.2.0",
    note = "Use sochdb_storage::EventDrivenGroupCommit for actual WAL integration"
)]
pub struct GroupCommitBuffer {
    /// Queue state guarded by a `parking_lot` mutex.
    inner: Arc<Mutex<GroupCommitInner>>,
    /// Signaled by `flush()` to wake all blocked submitters.
    condvar: Arc<Condvar>,
    config: GroupCommitConfig,
}
/// Mutex-guarded state: the queue of waiting commits and the id of the
/// batch currently being filled.
struct GroupCommitInner {
    pending: VecDeque<PendingCommit>,
    /// Monotonically increasing; bumped when a batch fills, times out, or flushes.
    batch_id: u64,
}
/// A submitted operation waiting for its batch to be flushed.
/// Fields are write-only in the current stub, hence `dead_code`.
#[allow(dead_code)]
struct PendingCommit {
    id: u64,
    batch_id: u64,
    committed: bool,
}
/// Tuning knobs for `GroupCommitBuffer`.
#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Longest a submitter blocks waiting for a flush, in milliseconds.
    pub max_wait_ms: u64,
    /// Hard upper bound applied to computed batch sizes.
    pub max_batch_size: usize,
    /// Pending count at which the current batch is considered full.
    pub target_batch_size: usize,
    /// Assumed fsync latency in microseconds; input to `optimal_batch_size`.
    pub fsync_latency_us: u64,
}
impl Default for GroupCommitConfig {
fn default() -> Self {
Self {
max_wait_ms: 10,
max_batch_size: 1000,
target_batch_size: 100,
fsync_latency_us: 5000, }
}
}
#[allow(deprecated)]
impl GroupCommitBuffer {
    /// Creates an empty buffer using the supplied tuning configuration.
    pub fn new(config: GroupCommitConfig) -> Self {
        Self {
            inner: Arc::new(Mutex::new(GroupCommitInner {
                pending: VecDeque::new(),
                batch_id: 0,
            })),
            condvar: Arc::new(Condvar::new()),
            config,
        }
    }

    /// Computes N* = sqrt(2 * L_fsync * arrival_rate / wait_cost), clamped
    /// to [1, `max_batch_size`], where L_fsync is `fsync_latency_us`
    /// converted to seconds.
    ///
    /// `arrival_rate` is presumably ops/second and `wait_cost` the latency
    /// penalty per queued op in the same time units — TODO confirm with callers.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.config.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost).sqrt();
        (n_star as usize).clamp(1, self.config.max_batch_size)
    }

    /// Enqueues `op_id` as a pending commit, then blocks up to `max_wait_ms`
    /// for a `flush()` notification; returns the batch id observed right
    /// after enqueueing (already post-incremented when this submission
    /// filled the batch to `target_batch_size`).
    ///
    /// NOTE(review): deprecated stub — filling the batch only bumps
    /// `batch_id` without flushing anything, and the single `wait_for`
    /// call (no predicate re-check loop) also returns on spurious wakeups,
    /// not just on a real flush or timeout.
    pub fn submit_and_wait(&self, op_id: u64) -> Result<u64> {
        let timeout = Duration::from_millis(self.config.max_wait_ms);
        let target_size = self.config.target_batch_size;
        let mut inner = self.inner.lock();
        let current_batch_id = inner.batch_id;
        inner.pending.push_back(PendingCommit {
            id: op_id,
            batch_id: current_batch_id,
            committed: false,
        });
        // Reaching the target closes the current batch; this waiter keeps
        // the id it observed below.
        let need_flush = inner.pending.len() >= target_size;
        if need_flush {
            inner.batch_id += 1;
        }
        let batch_id = inner.batch_id;
        // The mutex is released while blocked and re-acquired on wakeup.
        let result = self.condvar.wait_for(&mut inner, timeout);
        if result.timed_out() {
            // Timed out: advance the batch so later submitters start fresh.
            inner.batch_id += 1;
        }
        Ok(batch_id)
    }

    /// Marks every pending entry committed, discards the queue, advances
    /// the batch id, and wakes all blocked submitters.
    pub fn flush(&self) {
        let mut inner = self.inner.lock();
        for pending in inner.pending.iter_mut() {
            pending.committed = true;
        }
        inner.pending.clear();
        inner.batch_id += 1;
        self.condvar.notify_all();
    }

    /// Number of commits queued but not yet flushed.
    pub fn pending_count(&self) -> usize {
        self.inner.lock().pending.len()
    }
}
impl SochConnection {
    /// Starts a new batch writer bound to this connection.
    pub fn batch<'a>(&'a self) -> BatchWriter<'a> {
        BatchWriter::new(self)
    }

    /// Inserts many rows of `(column, value)` pairs into `table`,
    /// auto-committing in chunks of up to 1000 rows.
    pub fn bulk_insert(
        &self,
        table: &str,
        rows: Vec<Vec<(&str, SochValue)>>,
    ) -> Result<BatchResult> {
        let writer = BatchWriter::new(self).max_batch_size(1000).auto_commit(true);
        rows.into_iter()
            .fold(writer, |w, row| w.insert(table, row))
            .execute()
    }

    /// Inserts many positional `(row_id, values)` rows into `table`,
    /// auto-committing in chunks of up to 1000 rows.
    pub fn bulk_insert_slice(
        &self,
        table: &str,
        rows: Vec<(u64, Vec<Option<SochValue>>)>,
    ) -> Result<BatchResult> {
        let writer = BatchWriter::new(self).max_batch_size(1000).auto_commit(true);
        rows.into_iter()
            .fold(writer, |w, (row_id, values)| {
                w.insert_slice(table, row_id, values)
            })
            .execute()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a two-column `(id, name)` row for the insert-based tests.
    fn named_row(id: i64, name: &str) -> Vec<(&'static str, SochValue)> {
        vec![
            ("id", SochValue::Int(id)),
            ("name", SochValue::Text(name.to_string())),
        ]
    }

    #[test]
    fn test_batch_writer() {
        let conn = SochConnection::open("./test").unwrap();
        let result = conn
            .batch()
            .insert("users", named_row(1, "Alice"))
            .insert("users", named_row(2, "Bob"))
            .execute()
            .unwrap();
        // Two inserts flushed as one chunk -> one fsync.
        assert_eq!(result.ops_executed, 2);
        assert_eq!(result.fsync_count, 1);
        assert_eq!(result.chunks_committed, 1);
    }

    #[test]
    fn test_streaming_batch_writer() {
        let conn = SochConnection::open("./test_streaming").unwrap();
        // Five inserts at batch size 2 commit as chunks of 2 + 2 + 1.
        let mut batch = conn.batch().max_batch_size(2).auto_commit(true);
        for id in 1..=5 {
            batch = batch.insert("users", vec![("id", SochValue::Int(id))]);
        }
        let result = batch.execute().unwrap();
        assert_eq!(result.ops_executed, 5);
        assert_eq!(result.chunks_committed, 3);
    }

    #[test]
    fn test_group_commit_config() {
        let config = GroupCommitConfig::default();
        assert_eq!(config.max_wait_ms, 10);
        assert_eq!(config.max_batch_size, 1000);
    }

    #[test]
    #[allow(deprecated)]
    fn test_optimal_batch_size() {
        let buffer = GroupCommitBuffer::new(GroupCommitConfig {
            fsync_latency_us: 5000,
            ..Default::default()
        });
        let optimal = buffer.optimal_batch_size(10000.0, 0.001);
        assert!(optimal > 1);
        assert!(optimal <= 1000);
    }

    #[test]
    fn test_bulk_insert() {
        let conn = SochConnection::open("./test").unwrap();
        let rows = vec![named_row(1, "A"), named_row(2, "B"), named_row(3, "C")];
        let result = conn.bulk_insert("users", rows).unwrap();
        assert_eq!(result.ops_executed, 3);
    }

    #[test]
    fn test_insert_slice() {
        use crate::connection::FieldType;
        let conn = SochConnection::open("./test_slice").unwrap();
        conn.register_table(
            "users",
            &[
                ("id".to_string(), FieldType::UInt64),
                ("name".to_string(), FieldType::Text),
            ],
        )
        .unwrap();
        // Positional rows matching the (id, name) schema above.
        let slice = |id: u64, name: &str| {
            vec![
                Some(SochValue::UInt(id)),
                Some(SochValue::Text(name.to_string())),
            ]
        };
        let result = conn
            .batch()
            .insert_slice("users", 1, slice(1, "Alice"))
            .insert_slice("users", 2, slice(2, "Bob"))
            .execute()
            .unwrap();
        assert_eq!(result.ops_executed, 2);
    }
}