use crate::concurrent::parallel_batch::{BatchConfig, BatchOperation};
use crate::model::{Object, Predicate, Subject, Triple};
use crate::OxirsError;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;
/// Reference-counted, thread-safe closure queued by `BatchBuilder::transform`;
/// it receives a triple and returns an `Option<Triple>`.
type TransformFn = Arc<dyn Fn(&Triple) -> Option<Triple> + Sync + Send>;
/// Lock-protected optional slot for the callback that `BatchBuilder::flush`
/// invokes with each list of flushed operations.
type FlushCallback =
    Arc<Mutex<Option<Box<dyn Fn(Vec<BatchOperation>) + Sync + Send>>>>;
/// How a [`BatchBuilder`] combines operations while they sit in its buffers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CoalescingStrategy {
    /// Buffer every insert and remove exactly as received, duplicates included.
    None,
    /// Drop an insert (or remove) whose triple is already buffered for the same
    /// kind of operation since the last flush, and count it in the statistics.
    Deduplicate,
    /// Deduplicate, then at flush time cancel inserts against removes of the
    /// same triple.
    Merge,
    /// Deduplicate, then at flush time sort buffered inserts and removes by the
    /// string form of their subject (see `BatchBuilder::flush` for exactly when
    /// the sort is applied).
    OptimizeOrder,
}
/// Tuning knobs for a [`BatchBuilder`].
#[derive(Clone, Debug)]
pub struct BatchBuilderConfig {
    /// Pending-operation count at which an automatic flush triggers; also the
    /// maximum number of triples placed in a single insert or remove batch.
    pub max_batch_size: usize,
    /// Threshold on the builder's heuristic memory estimate (in bytes) at which
    /// an automatic flush triggers. This is an estimate, not measured usage.
    pub max_memory_usage: usize,
    /// Strategy used to combine buffered operations.
    pub coalescing_strategy: CoalescingStrategy,
    /// When `true`, every buffering call checks the two thresholds above and
    /// flushes once either is reached.
    pub auto_flush: bool,
    /// Selects which code path `BatchBuilder::flush` uses to assemble operations.
    pub group_by_type: bool,
}
impl Default for BatchBuilderConfig {
    /// Returns a general-purpose configuration.
    ///
    /// The memory budget is one tenth of the total memory reported by
    /// `sys_info::mem_info()`. Its `total` is scaled by 1024 to get bytes. If the
    /// query fails, 8 GiB is assumed. The remaining fields are a batch size of
    /// 10 000, [`CoalescingStrategy::Deduplicate`], and both `auto_flush` and
    /// `group_by_type` enabled.
    fn default() -> Self {
        let total_memory = sys_info::mem_info()
            .map(|info| info.total * 1024)
            .unwrap_or(8 * 1024 * 1024 * 1024);
        // Divide in u64 and clamp before narrowing. A plain `as usize` on the
        // full byte count silently truncates multi-GiB totals on 32-bit targets,
        // which could yield a tiny (even zero) budget. On 64-bit targets the
        // result is identical to `(total as usize) / 10`.
        let max_memory_usage = (total_memory / 10).min(usize::MAX as u64) as usize;
        BatchBuilderConfig {
            max_batch_size: 10000,
            max_memory_usage,
            coalescing_strategy: CoalescingStrategy::Deduplicate,
            auto_flush: true,
            group_by_type: true,
        }
    }
}
impl BatchBuilderConfig {
    /// Derives a configuration from the host's CPU count and total memory.
    ///
    /// The tiers use `info.total` from `sys_info::mem_info()`. That value is
    /// divided by 1024 for the MiB thresholds and multiplied by 1024 for bytes:
    /// - more than 16384 MiB: base batch size 50 000, budget 1/8 of memory;
    /// - more than 8192 MiB: base batch size 20 000, budget 1/10 of memory;
    /// - otherwise: base batch size 5 000, budget 1/20 of memory;
    /// - memory query failed: base batch size 10 000, budget 1 GiB.
    ///
    /// The base batch size is then scaled by `num_cpus::get() / 4`. The result
    /// uses [`CoalescingStrategy::Merge`] with `auto_flush` and `group_by_type`
    /// enabled.
    pub fn auto() -> Self {
        // Saturate instead of truncating when narrowing a byte count to usize
        // (matters only on 32-bit targets; identical results on 64-bit).
        let to_usize = |bytes: u64| bytes.min(usize::MAX as u64) as usize;
        let num_cpus = num_cpus::get();
        let (max_batch_size, max_memory_usage) = match sys_info::mem_info() {
            Ok(info) => {
                let total_mb = info.total / 1024;
                let total_bytes = info.total * 1024;
                if total_mb > 16384 {
                    (50000, to_usize(total_bytes / 8))
                } else if total_mb > 8192 {
                    (20000, to_usize(total_bytes / 10))
                } else {
                    (5000, to_usize(total_bytes / 20))
                }
            }
            Err(_) => (10000, 1024 * 1024 * 1024),
        };
        BatchBuilderConfig {
            max_batch_size: max_batch_size * num_cpus / 4,
            max_memory_usage,
            coalescing_strategy: CoalescingStrategy::Merge,
            auto_flush: true,
            group_by_type: true,
        }
    }
}
/// Counters maintained by a [`BatchBuilder`] across its lifetime.
#[derive(Default, Clone, Debug)]
pub struct BatchBuilderStats {
    /// Every insert, remove, query and transform call, including ones later
    /// dropped as duplicates.
    pub total_operations: usize,
    /// Inserts cancelled against matching removes by merge coalescing.
    pub coalesced_operations: usize,
    /// Duplicate inserts/removes dropped while buffering.
    pub deduplicated_operations: usize,
    /// Cumulative number of operations returned by all flushes.
    pub batches_created: usize,
    /// Memory estimate (bytes) that was buffered at the most recent flush.
    pub estimated_memory_usage: usize,
}
/// Buffers triple inserts/removes, pattern queries and transform closures, and
/// turns them into [`BatchOperation`]s when flushed.
pub struct BatchBuilder {
    /// Settings supplied at construction.
    config: BatchBuilderConfig,
    /// Triples queued for insertion, in the order they were accepted.
    insert_buffer: Vec<Triple>,
    /// Triples already queued for insertion. Used for deduplication and merge
    /// coalescing; left untouched under `CoalescingStrategy::None`.
    insert_set: HashSet<Triple>,
    /// Triples queued for removal, in the order they were accepted.
    remove_buffer: Vec<Triple>,
    /// Triples already queued for removal; same role as `insert_set`.
    remove_set: HashSet<Triple>,
    /// Queued (subject, predicate, object) pattern queries.
    query_buffer: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
    /// Queued transform closures.
    transform_buffer: Vec<TransformFn>,
    /// Heuristic byte estimate of everything buffered since the last flush.
    estimated_memory: usize,
    /// Lifetime counters exposed through `stats()`.
    stats: BatchBuilderStats,
    /// Optional callback that receives each flushed operation list.
    flush_callback: FlushCallback,
}
impl BatchBuilder {
    /// Creates an empty builder that buffers operations according to `config`.
    pub fn new(config: BatchBuilderConfig) -> Self {
        BatchBuilder {
            config,
            insert_buffer: Vec::new(),
            insert_set: HashSet::new(),
            remove_buffer: Vec::new(),
            remove_set: HashSet::new(),
            query_buffer: Vec::new(),
            transform_buffer: Vec::new(),
            estimated_memory: 0,
            stats: BatchBuilderStats::default(),
            flush_callback: Arc::new(Mutex::new(None)),
        }
    }

    /// Creates a builder configured by [`BatchBuilderConfig::auto`].
    pub fn auto() -> Self {
        Self::new(BatchBuilderConfig::auto())
    }

    /// Registers the callback that [`flush`](Self::flush) invokes with a clone of
    /// every operation list it produces. Any previously registered callback is
    /// replaced.
    ///
    /// Automatic flushes (enabled by `auto_flush`) hand their operations only to
    /// this callback. If no callback is registered, those operations are
    /// discarded.
    pub fn on_flush<F>(&mut self, callback: F)
    where
        F: Fn(Vec<BatchOperation>) + Send + Sync + 'static,
    {
        *self.flush_callback.lock() = Some(Box::new(callback));
    }

    /// Buffers the insertion of `triple`.
    ///
    /// Under every strategy except [`CoalescingStrategy::None`], a triple that is
    /// already buffered for insertion since the last flush is dropped. The drop
    /// is counted in [`BatchBuilderStats::deduplicated_operations`].
    ///
    /// # Errors
    /// Propagates an error from the automatic flush this call may trigger.
    pub fn insert(&mut self, triple: Triple) -> Result<(), OxirsError> {
        self.buffer_triple(triple, false)
    }

    /// Buffers every triple in `triples` via [`insert`](Self::insert), stopping at
    /// the first error.
    ///
    /// # Errors
    /// Propagates the first error returned by `insert`.
    pub fn insert_batch(&mut self, triples: Vec<Triple>) -> Result<(), OxirsError> {
        triples
            .into_iter()
            .try_for_each(|triple| self.insert(triple))
    }

    /// Buffers the removal of `triple`. Deduplication follows the same rules as
    /// [`insert`](Self::insert), applied to the removal buffer.
    ///
    /// # Errors
    /// Propagates an error from the automatic flush this call may trigger.
    pub fn remove(&mut self, triple: Triple) -> Result<(), OxirsError> {
        self.buffer_triple(triple, true)
    }

    /// Shared buffering logic for `insert` (`is_remove == false`) and `remove`
    /// (`is_remove == true`). It records the operation, applies deduplication,
    /// updates the memory estimate, then checks the auto-flush thresholds.
    fn buffer_triple(&mut self, triple: Triple, is_remove: bool) -> Result<(), OxirsError> {
        self.stats.total_operations += 1;
        let size = self.estimate_triple_size(&triple);
        let (buffer, seen) = if is_remove {
            (&mut self.remove_buffer, &mut self.remove_set)
        } else {
            (&mut self.insert_buffer, &mut self.insert_set)
        };
        match self.config.coalescing_strategy {
            CoalescingStrategy::None => {
                buffer.push(triple);
                self.estimated_memory += size;
            }
            // Every coalescing strategy deduplicates and reports the dropped
            // duplicates identically, including `OptimizeOrder`.
            CoalescingStrategy::Deduplicate
            | CoalescingStrategy::Merge
            | CoalescingStrategy::OptimizeOrder => {
                if seen.insert(triple.clone()) {
                    buffer.push(triple);
                    self.estimated_memory += size;
                } else {
                    self.stats.deduplicated_operations += 1;
                }
            }
        }
        self.check_flush()
    }

    /// Queues a pattern query with optional subject, predicate and object. Each
    /// query adds a fixed 128 bytes to the memory estimate.
    ///
    /// # Errors
    /// Propagates an error from the automatic flush this call may trigger.
    pub fn query(
        &mut self,
        subject: Option<Subject>,
        predicate: Option<Predicate>,
        object: Option<Object>,
    ) -> Result<(), OxirsError> {
        self.stats.total_operations += 1;
        self.query_buffer.push((subject, predicate, object));
        self.estimated_memory += 128;
        self.check_flush()
    }

    /// Queues a transform closure. Each closure adds a fixed 64 bytes to the
    /// memory estimate.
    ///
    /// # Errors
    /// Propagates an error from the automatic flush this call may trigger.
    pub fn transform<F>(&mut self, f: F) -> Result<(), OxirsError>
    where
        F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
    {
        self.stats.total_operations += 1;
        self.transform_buffer.push(Arc::new(f));
        self.estimated_memory += 64;
        self.check_flush()
    }

    /// Returns the builder's lifetime counters.
    pub fn stats(&self) -> &BatchBuilderStats {
        &self.stats
    }

    /// Number of buffered operations of all kinds awaiting a flush.
    pub fn pending_operations(&self) -> usize {
        self.insert_buffer.len()
            + self.remove_buffer.len()
            + self.query_buffer.len()
            + self.transform_buffer.len()
    }

    /// Flushes when `auto_flush` is enabled and either the pending-operation
    /// count or the memory estimate has reached its configured limit.
    fn check_flush(&mut self) -> Result<(), OxirsError> {
        let over_limit = self.pending_operations() >= self.config.max_batch_size
            || self.estimated_memory >= self.config.max_memory_usage;
        if self.config.auto_flush && over_limit {
            // NOTE(review): the operations returned here are dropped. They only
            // reach the user through an `on_flush` callback; without one they
            // are lost. Confirm this is intended.
            self.flush()?;
        }
        Ok(())
    }

    /// Heuristic size of a buffered triple: 24 bytes of overhead plus the fixed
    /// per-term estimates below.
    fn estimate_triple_size(&self, triple: &Triple) -> usize {
        24 + self.estimate_term_size(triple.subject())
            + self.estimate_term_size(triple.predicate())
            + self.estimate_object_size(triple.object())
    }

    /// Fixed estimate for a subject or predicate term; the term is not inspected.
    fn estimate_term_size(&self, _term: &impl std::fmt::Display) -> usize {
        100
    }

    /// Fixed estimate for an object term; the term is not inspected.
    fn estimate_object_size(&self, _object: &Object) -> usize {
        150
    }

    /// Drains every buffered operation into a list of [`BatchOperation`]s.
    ///
    /// Coalescing is applied first. `Merge` cancels inserts against removes of
    /// the same triple. `OptimizeOrder` sorts inserts and removes by the string
    /// form of their subject, whatever the value of `group_by_type`.
    ///
    /// Operations are then emitted as inserts, removes, queries, transforms.
    /// Both settings of `group_by_type` produce this same order. Inserts and
    /// removes are split into batches of at most `max_batch_size` triples; a
    /// limit of 0 behaves like 1. Each query and each transform becomes its own
    /// operation.
    ///
    /// Statistics are then updated and all buffers cleared. The callback
    /// registered with [`on_flush`](Self::on_flush), if any, receives a clone of
    /// the result, even when the result is empty.
    ///
    /// # Errors
    /// This implementation does not currently fail; the `Result` is part of the
    /// public signature.
    pub fn flush(&mut self) -> Result<Vec<BatchOperation>, OxirsError> {
        match self.config.coalescing_strategy {
            CoalescingStrategy::Merge => self.apply_merge_coalescing(),
            // Ordering is tied to the strategy, not to `group_by_type`, so
            // ungrouped flushes are ordered too.
            CoalescingStrategy::OptimizeOrder => self.optimize_operation_order(),
            CoalescingStrategy::None | CoalescingStrategy::Deduplicate => {}
        }
        let operations = self.create_mixed_batches();
        self.stats.batches_created += operations.len();
        self.stats.estimated_memory_usage = self.estimated_memory;
        self.clear();
        if let Some(callback) = &*self.flush_callback.lock() {
            callback(operations.clone());
        }
        Ok(operations)
    }

    /// Cancels buffered inserts against buffered removes of the same triple.
    ///
    /// Each cancelled insert is counted in `coalesced_operations`.
    ///
    /// NOTE(review): cancellation is set-based, so it ignores whether the insert
    /// or the remove came first and whether the triple already exists in the
    /// target. For example, remove-then-insert of an absent triple nets to
    /// nothing instead of an insert. Confirm this is intended.
    fn apply_merge_coalescing(&mut self) {
        if self.insert_buffer.is_empty() || self.remove_buffer.is_empty() {
            return;
        }
        let before = self.insert_buffer.len();
        let removals = &self.remove_set;
        self.insert_buffer.retain(|triple| !removals.contains(triple));
        let cancelled = before - self.insert_buffer.len();
        if cancelled > 0 {
            self.stats.coalesced_operations += cancelled;
            let insertions = &self.insert_set;
            self.remove_buffer.retain(|triple| !insertions.contains(triple));
        }
    }

    /// Stably sorts buffered inserts and removes by the string form of their
    /// subject.
    fn optimize_operation_order(&mut self) {
        // `sort_by_cached_key` formats each subject once, whereas `sort_by_key`
        // would allocate a fresh String on every comparison.
        self.insert_buffer
            .sort_by_cached_key(|triple| triple.subject().to_string());
        self.remove_buffer
            .sort_by_cached_key(|triple| triple.subject().to_string());
    }

    /// Drains the insert buffer into `Insert` batches of at most `max_batch_size`.
    fn create_insert_batches(&mut self) -> Vec<BatchOperation> {
        let max_batch_size = self.config.max_batch_size;
        Self::chunk_triples(
            self.insert_buffer.drain(..),
            max_batch_size,
            BatchOperation::Insert,
        )
    }

    /// Drains the remove buffer into `Remove` batches of at most `max_batch_size`.
    fn create_remove_batches(&mut self) -> Vec<BatchOperation> {
        let max_batch_size = self.config.max_batch_size;
        Self::chunk_triples(
            self.remove_buffer.drain(..),
            max_batch_size,
            BatchOperation::Remove,
        )
    }

    /// Groups `triples` into consecutive chunks and wraps each chunk with `wrap`.
    /// Chunks hold at most `max_batch_size` triples; a size of 0 is treated as 1.
    fn chunk_triples(
        triples: impl Iterator<Item = Triple>,
        max_batch_size: usize,
        wrap: impl Fn(Vec<Triple>) -> BatchOperation,
    ) -> Vec<BatchOperation> {
        let chunk_size = max_batch_size.max(1);
        let mut triples = triples.peekable();
        let mut batches = Vec::new();
        while triples.peek().is_some() {
            batches.push(wrap(triples.by_ref().take(chunk_size).collect()));
        }
        batches
    }

    /// Drains queued queries, producing one `Query` operation per query.
    fn create_query_batches(&mut self) -> Vec<BatchOperation> {
        self.query_buffer
            .drain(..)
            .map(|(s, p, o)| BatchOperation::Query {
                subject: s,
                predicate: p,
                object: o,
            })
            .collect()
    }

    /// Drains queued transforms, producing one `Transform` operation per closure.
    fn create_transform_batches(&mut self) -> Vec<BatchOperation> {
        self.transform_buffer
            .drain(..)
            .map(BatchOperation::Transform)
            .collect()
    }

    /// Emits all buffered operations in the order inserts, removes, queries,
    /// transforms.
    fn create_mixed_batches(&mut self) -> Vec<BatchOperation> {
        let mut operations = Vec::new();
        operations.extend(self.create_insert_batches());
        operations.extend(self.create_remove_batches());
        operations.extend(self.create_query_batches());
        operations.extend(self.create_transform_batches());
        operations
    }

    /// Empties every buffer and deduplication set and resets the memory estimate.
    fn clear(&mut self) {
        self.insert_buffer.clear();
        self.insert_set.clear();
        self.remove_buffer.clear();
        self.remove_set.clear();
        self.query_buffer.clear();
        self.transform_buffer.clear();
        self.estimated_memory = 0;
    }
}
impl From<&BatchBuilderConfig> for BatchConfig {
fn from(builder_config: &BatchBuilderConfig) -> Self {
BatchConfig {
batch_size: builder_config.max_batch_size,
..Default::default()
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Builds the IRI node `http://{kind}/{id}`.
    fn node(kind: &str, id: usize) -> NamedNode {
        NamedNode::new(format!("http://{kind}/{id}")).expect("valid IRI from format")
    }

    /// Returns a triple whose subject, predicate and object are IRIs derived from `id`.
    fn create_test_triple(id: usize) -> Triple {
        Triple::new(
            Subject::NamedNode(node("subject", id)),
            Predicate::NamedNode(node("predicate", id)),
            Object::NamedNode(node("object", id)),
        )
    }

    #[test]
    fn test_batch_builder_basic() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_batch_size: 10,
            auto_flush: false,
            ..Default::default()
        });
        (0..25).for_each(|i| {
            builder
                .insert(create_test_triple(i))
                .expect("builder insert should succeed")
        });
        assert_eq!(builder.pending_operations(), 25);

        let batches = builder.flush().expect("flush should succeed");
        // 25 triples at 10 per batch: 10 + 10 + 5.
        assert_eq!(batches.len(), 3);
        assert_eq!(builder.pending_operations(), 0);
    }

    #[test]
    fn test_deduplication() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Deduplicate,
            auto_flush: false,
            ..Default::default()
        });
        let repeated = create_test_triple(1);
        for _ in 0..5 {
            builder
                .insert(repeated.clone())
                .expect("builder insert should succeed");
        }
        assert_eq!(builder.pending_operations(), 1);
        assert_eq!(builder.stats().deduplicated_operations, 4);
    }

    #[test]
    fn test_merge_coalescing() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Merge,
            auto_flush: false,
            ..Default::default()
        });
        let triple = create_test_triple(1);
        builder
            .insert(triple.clone())
            .expect("builder insert should succeed");
        builder.remove(triple).expect("remove should succeed");

        let batches = builder.flush().expect("flush should succeed");
        // The insert and remove cancel each other out.
        assert_eq!(batches.len(), 0);
        assert_eq!(builder.stats().coalesced_operations, 1);
    }

    #[test]
    fn test_auto_flush() {
        let flushed = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&flushed);
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_batch_size: 5,
            auto_flush: true,
            ..Default::default()
        });
        builder.on_flush(move |batches| sink.lock().extend(batches));

        for i in 0..12 {
            builder
                .insert(create_test_triple(i))
                .expect("builder insert should succeed");
        }
        // Flushes fire at 5 and 10 pending triples; 2 remain buffered.
        assert_eq!(flushed.lock().len(), 2);
        assert_eq!(builder.pending_operations(), 2);
    }

    #[test]
    fn test_mixed_operations() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            group_by_type: true,
            auto_flush: false,
            ..Default::default()
        });
        builder
            .insert(create_test_triple(1))
            .expect("builder insert should succeed");
        builder
            .remove(create_test_triple(2))
            .expect("builder remove should succeed");
        builder
            .query(None, None, None)
            .expect("query should succeed");

        let batches = builder.flush().expect("flush should succeed");
        assert_eq!(batches.len(), 3);
    }

    #[test]
    fn test_memory_limits() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_memory_usage: 1000,
            auto_flush: true,
            ..Default::default()
        });
        // Insert until the memory limit forces a flush (buffer drops to zero).
        let added = (0..100)
            .position(|i| {
                builder
                    .insert(create_test_triple(i))
                    .expect("builder insert should succeed");
                builder.pending_operations() == 0
            })
            .map_or(100, |index| index + 1);

        assert!(added < 100);
        assert_eq!(builder.stats().batches_created, 1);
    }
}