1use crate::concurrent::parallel_batch::{BatchConfig, BatchOperation};
7use crate::model::{Object, Predicate, Subject, Triple};
8use crate::OxirsError;
9use parking_lot::Mutex;
10use std::collections::HashSet;
11use std::sync::Arc;
12
13type TransformFn = Arc<dyn Fn(&Triple) -> Option<Triple> + Send + Sync>;
15
16type FlushCallback = Arc<Mutex<Option<Box<dyn Fn(Vec<BatchOperation>) + Send + Sync>>>>;
18
/// How buffered operations are reduced before they are emitted as batches.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CoalescingStrategy {
    /// Buffer every operation exactly as submitted.
    None,
    /// Drop repeated inserts (or repeated removes) of an identical triple.
    Deduplicate,
    /// Deduplicate, and at flush time cancel triples that were both inserted
    /// and removed within the same buffered window.
    Merge,
    /// Deduplicate, and sort inserts/removes by subject at flush time when
    /// grouping by type is enabled.
    OptimizeOrder,
}
31
32#[derive(Debug, Clone)]
34pub struct BatchBuilderConfig {
35 pub max_batch_size: usize,
37 pub max_memory_usage: usize,
39 pub coalescing_strategy: CoalescingStrategy,
41 pub auto_flush: bool,
43 pub group_by_type: bool,
45}
46
47impl Default for BatchBuilderConfig {
48 fn default() -> Self {
49 let total_memory = sys_info::mem_info()
50 .map(|info| info.total * 1024) .unwrap_or(8 * 1024 * 1024 * 1024); BatchBuilderConfig {
54 max_batch_size: 10000,
55 max_memory_usage: (total_memory as usize) / 10, coalescing_strategy: CoalescingStrategy::Deduplicate,
57 auto_flush: true,
58 group_by_type: true,
59 }
60 }
61}
62
63impl BatchBuilderConfig {
64 pub fn auto() -> Self {
66 let num_cpus = num_cpus::get();
67 let mem_info = sys_info::mem_info().ok();
68
69 let (max_batch_size, max_memory_usage) = if let Some(info) = mem_info {
70 let total_mb = info.total / 1024;
71 if total_mb > 16384 {
72 (50000, (info.total * 1024 / 8) as usize) } else if total_mb > 8192 {
75 (20000, (info.total * 1024 / 10) as usize) } else {
78 (5000, (info.total * 1024 / 20) as usize) }
80 } else {
81 (10000, 1024 * 1024 * 1024) };
83
84 BatchBuilderConfig {
85 max_batch_size: max_batch_size * num_cpus / 4, max_memory_usage,
87 coalescing_strategy: CoalescingStrategy::Merge,
88 auto_flush: true,
89 group_by_type: true,
90 }
91 }
92}
93
/// Running counters describing the work a batch builder has performed.
#[derive(Clone, Debug, Default)]
pub struct BatchBuilderStats {
    /// Every operation submitted, including ones later dropped as duplicates.
    pub total_operations: usize,
    /// Inserts cancelled against a matching remove during merge coalescing.
    pub coalesced_operations: usize,
    /// Submitted operations discarded because an identical one was already buffered.
    pub deduplicated_operations: usize,
    /// Number of batch operations produced across all flushes.
    pub batches_created: usize,
    /// Estimated buffered size recorded at the most recent flush.
    pub estimated_memory_usage: usize,
}
103
104pub struct BatchBuilder {
106 config: BatchBuilderConfig,
107 insert_buffer: Vec<Triple>,
109 insert_set: HashSet<Triple>,
110 remove_buffer: Vec<Triple>,
112 remove_set: HashSet<Triple>,
113 query_buffer: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
115 transform_buffer: Vec<TransformFn>,
117 estimated_memory: usize,
119 stats: BatchBuilderStats,
121 flush_callback: FlushCallback,
123}
124
125impl BatchBuilder {
126 pub fn new(config: BatchBuilderConfig) -> Self {
128 BatchBuilder {
129 config,
130 insert_buffer: Vec::new(),
131 insert_set: HashSet::new(),
132 remove_buffer: Vec::new(),
133 remove_set: HashSet::new(),
134 query_buffer: Vec::new(),
135 transform_buffer: Vec::new(),
136 estimated_memory: 0,
137 stats: BatchBuilderStats::default(),
138 flush_callback: Arc::new(Mutex::new(None)),
139 }
140 }
141
142 pub fn auto() -> Self {
144 Self::new(BatchBuilderConfig::auto())
145 }
146
147 pub fn on_flush<F>(&mut self, callback: F)
149 where
150 F: Fn(Vec<BatchOperation>) + Send + Sync + 'static,
151 {
152 *self.flush_callback.lock() = Some(Box::new(callback));
153 }
154
155 pub fn insert(&mut self, triple: Triple) -> Result<(), OxirsError> {
157 self.stats.total_operations += 1;
158
159 match self.config.coalescing_strategy {
161 CoalescingStrategy::None => {
162 self.estimated_memory += self.estimate_triple_size(&triple);
163 self.insert_buffer.push(triple);
164 }
165 CoalescingStrategy::Deduplicate | CoalescingStrategy::Merge => {
166 if self.insert_set.insert(triple.clone()) {
167 self.insert_buffer.push(triple.clone());
168 self.estimated_memory += self.estimate_triple_size(&triple);
169 } else {
170 self.stats.deduplicated_operations += 1;
171 }
172 }
173 CoalescingStrategy::OptimizeOrder => {
174 if self.insert_set.insert(triple.clone()) {
176 self.insert_buffer.push(triple.clone());
177 self.estimated_memory += self.estimate_triple_size(&triple);
178 }
179 }
180 }
181
182 self.check_flush()?;
183 Ok(())
184 }
185
186 pub fn insert_batch(&mut self, triples: Vec<Triple>) -> Result<(), OxirsError> {
188 for triple in triples {
189 self.insert(triple)?;
190 }
191 Ok(())
192 }
193
194 pub fn remove(&mut self, triple: Triple) -> Result<(), OxirsError> {
196 self.stats.total_operations += 1;
197
198 match self.config.coalescing_strategy {
200 CoalescingStrategy::None => {
201 self.estimated_memory += self.estimate_triple_size(&triple);
202 self.remove_buffer.push(triple);
203 }
204 CoalescingStrategy::Deduplicate | CoalescingStrategy::Merge => {
205 if self.remove_set.insert(triple.clone()) {
206 self.remove_buffer.push(triple.clone());
207 self.estimated_memory += self.estimate_triple_size(&triple);
208 } else {
209 self.stats.deduplicated_operations += 1;
210 }
211 }
212 CoalescingStrategy::OptimizeOrder => {
213 if self.remove_set.insert(triple.clone()) {
214 self.remove_buffer.push(triple.clone());
215 self.estimated_memory += self.estimate_triple_size(&triple);
216 }
217 }
218 }
219
220 self.check_flush()?;
221 Ok(())
222 }
223
224 pub fn query(
226 &mut self,
227 subject: Option<Subject>,
228 predicate: Option<Predicate>,
229 object: Option<Object>,
230 ) -> Result<(), OxirsError> {
231 self.stats.total_operations += 1;
232 self.query_buffer.push((subject, predicate, object));
233 self.estimated_memory += 128; self.check_flush()?;
236 Ok(())
237 }
238
239 pub fn transform<F>(&mut self, f: F) -> Result<(), OxirsError>
241 where
242 F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
243 {
244 self.stats.total_operations += 1;
245 self.transform_buffer.push(Arc::new(f));
246 self.estimated_memory += 64; self.check_flush()?;
249 Ok(())
250 }
251
252 pub fn stats(&self) -> &BatchBuilderStats {
254 &self.stats
255 }
256
257 pub fn pending_operations(&self) -> usize {
259 self.insert_buffer.len()
260 + self.remove_buffer.len()
261 + self.query_buffer.len()
262 + self.transform_buffer.len()
263 }
264
265 fn check_flush(&mut self) -> Result<(), OxirsError> {
267 if self.config.auto_flush {
268 let should_flush = self.pending_operations() >= self.config.max_batch_size
269 || self.estimated_memory >= self.config.max_memory_usage;
270
271 if should_flush {
272 self.flush()?;
273 }
274 }
275 Ok(())
276 }
277
278 fn estimate_triple_size(&self, triple: &Triple) -> usize {
280 24 + self.estimate_term_size(triple.subject())
285 + self.estimate_term_size(triple.predicate())
286 + self.estimate_object_size(triple.object())
287 }
288
289 fn estimate_term_size(&self, _term: &impl std::fmt::Display) -> usize {
290 100 }
292
293 fn estimate_object_size(&self, _object: &Object) -> usize {
294 150 }
296
297 pub fn flush(&mut self) -> Result<Vec<BatchOperation>, OxirsError> {
299 let mut operations = Vec::new();
300
301 if self.config.coalescing_strategy == CoalescingStrategy::Merge {
302 self.apply_merge_coalescing();
303 }
304
305 if self.config.group_by_type {
307 if self.config.coalescing_strategy == CoalescingStrategy::OptimizeOrder {
309 self.optimize_operation_order();
310 }
311
312 if !self.insert_buffer.is_empty() {
314 operations.extend(self.create_insert_batches());
315 }
316
317 if !self.remove_buffer.is_empty() {
318 operations.extend(self.create_remove_batches());
319 }
320
321 if !self.query_buffer.is_empty() {
322 operations.extend(self.create_query_batches());
323 }
324
325 if !self.transform_buffer.is_empty() {
326 operations.extend(self.create_transform_batches());
327 }
328 } else {
329 operations = self.create_mixed_batches();
331 }
332
333 self.stats.batches_created += operations.len();
335 self.stats.estimated_memory_usage = self.estimated_memory;
336
337 self.clear();
339
340 if let Some(callback) = &*self.flush_callback.lock() {
342 callback(operations.clone());
343 }
344
345 Ok(operations)
346 }
347
348 fn apply_merge_coalescing(&mut self) {
350 if !self.insert_buffer.is_empty() && !self.remove_buffer.is_empty() {
352 let remove_set = &self.remove_set;
353 let original_len = self.insert_buffer.len();
354 self.insert_buffer
355 .retain(|triple| !remove_set.contains(triple));
356 let coalesced = original_len - self.insert_buffer.len();
357
358 if coalesced > 0 {
359 self.stats.coalesced_operations += coalesced;
360 let insert_set = &self.insert_set;
362 self.remove_buffer
363 .retain(|triple| !insert_set.contains(triple));
364 }
365 }
366 }
367
368 fn optimize_operation_order(&mut self) {
370 self.insert_buffer.sort_by_key(|a| a.subject().to_string());
372
373 self.remove_buffer.sort_by_key(|a| a.subject().to_string());
374 }
375
376 fn create_insert_batches(&mut self) -> Vec<BatchOperation> {
378 let mut batches = Vec::new();
379 let mut current_batch = Vec::new();
380
381 for triple in self.insert_buffer.drain(..) {
382 current_batch.push(triple);
383 if current_batch.len() >= self.config.max_batch_size {
384 batches.push(BatchOperation::Insert(std::mem::take(&mut current_batch)));
385 }
386 }
387
388 if !current_batch.is_empty() {
389 batches.push(BatchOperation::Insert(current_batch));
390 }
391
392 batches
393 }
394
395 fn create_remove_batches(&mut self) -> Vec<BatchOperation> {
397 let mut batches = Vec::new();
398 let mut current_batch = Vec::new();
399
400 for triple in self.remove_buffer.drain(..) {
401 current_batch.push(triple);
402 if current_batch.len() >= self.config.max_batch_size {
403 batches.push(BatchOperation::Remove(std::mem::take(&mut current_batch)));
404 }
405 }
406
407 if !current_batch.is_empty() {
408 batches.push(BatchOperation::Remove(current_batch));
409 }
410
411 batches
412 }
413
414 fn create_query_batches(&mut self) -> Vec<BatchOperation> {
416 self.query_buffer
417 .drain(..)
418 .map(|(s, p, o)| BatchOperation::Query {
419 subject: s,
420 predicate: p,
421 object: o,
422 })
423 .collect()
424 }
425
426 fn create_transform_batches(&mut self) -> Vec<BatchOperation> {
428 self.transform_buffer
429 .drain(..)
430 .map(BatchOperation::Transform)
431 .collect()
432 }
433
434 fn create_mixed_batches(&mut self) -> Vec<BatchOperation> {
436 let mut operations = Vec::new();
439
440 operations.extend(self.create_insert_batches());
441 operations.extend(self.create_remove_batches());
442 operations.extend(self.create_query_batches());
443 operations.extend(self.create_transform_batches());
444
445 operations
446 }
447
448 fn clear(&mut self) {
450 self.insert_buffer.clear();
451 self.insert_set.clear();
452 self.remove_buffer.clear();
453 self.remove_set.clear();
454 self.query_buffer.clear();
455 self.transform_buffer.clear();
456 self.estimated_memory = 0;
457 }
458}
459
460impl From<&BatchBuilderConfig> for BatchConfig {
462 fn from(builder_config: &BatchBuilderConfig) -> Self {
463 BatchConfig {
464 batch_size: builder_config.max_batch_size,
465 ..Default::default()
466 }
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    /// Builds a triple whose subject, predicate and object IRIs all embed `id`.
    fn create_test_triple(id: usize) -> Triple {
        let node = |kind: &str| NamedNode::new(format!("http://{kind}/{id}")).unwrap();
        Triple::new(
            Subject::NamedNode(node("subject")),
            Predicate::NamedNode(node("predicate")),
            Object::NamedNode(node("object")),
        )
    }

    /// 25 distinct inserts with a batch size of 10 flush into 3 batches.
    #[test]
    fn test_batch_builder_basic() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_batch_size: 10,
            auto_flush: false,
            ..Default::default()
        });

        for id in 0..25 {
            builder.insert(create_test_triple(id)).unwrap();
        }
        assert_eq!(builder.pending_operations(), 25);

        // 10 + 10 + 5
        let batches = builder.flush().unwrap();
        assert_eq!(batches.len(), 3);
        assert_eq!(builder.pending_operations(), 0);
    }

    /// Repeated inserts of one triple are buffered once and counted as duplicates.
    #[test]
    fn test_deduplication() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Deduplicate,
            auto_flush: false,
            ..Default::default()
        });

        let repeated = create_test_triple(1);
        for _ in 0..5 {
            builder.insert(repeated.clone()).unwrap();
        }

        assert_eq!(builder.pending_operations(), 1);
        assert_eq!(builder.stats().deduplicated_operations, 4);
    }

    /// An insert followed by a remove of the same triple cancels out under `Merge`.
    #[test]
    fn test_merge_coalescing() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Merge,
            auto_flush: false,
            ..Default::default()
        });

        let cancelled = create_test_triple(1);
        builder.insert(cancelled.clone()).unwrap();
        builder.remove(cancelled).unwrap();

        let batches = builder.flush().unwrap();
        assert_eq!(batches.len(), 0);
        assert_eq!(builder.stats().coalesced_operations, 1);
    }

    /// With a batch size of 5, twelve inserts auto-flush twice and leave two pending.
    #[test]
    fn test_auto_flush() {
        let flushed_batches = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&flushed_batches);

        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_batch_size: 5,
            auto_flush: true,
            ..Default::default()
        });
        builder.on_flush(move |batches| {
            sink.lock().extend(batches);
        });

        for id in 0..12 {
            builder.insert(create_test_triple(id)).unwrap();
        }

        assert_eq!(flushed_batches.lock().len(), 2);
        assert_eq!(builder.pending_operations(), 2);
    }

    /// One insert, one remove and one query produce three operations.
    #[test]
    fn test_mixed_operations() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            group_by_type: true,
            auto_flush: false,
            ..Default::default()
        });

        builder.insert(create_test_triple(1)).unwrap();
        builder.remove(create_test_triple(2)).unwrap();
        builder.query(None, None, None).unwrap();

        let batches = builder.flush().unwrap();
        assert_eq!(batches.len(), 3);
    }

    /// A tiny memory budget forces an automatic flush long before 100 inserts.
    #[test]
    fn test_memory_limits() {
        let mut builder = BatchBuilder::new(BatchBuilderConfig {
            max_memory_usage: 1000,
            auto_flush: true,
            ..Default::default()
        });

        let mut added = 0;
        for id in 0..100 {
            builder.insert(create_test_triple(id)).unwrap();
            added += 1;

            // An empty builder right after an insert means a flush just happened.
            let flushed = builder.pending_operations() == 0;
            if flushed {
                break;
            }
        }

        assert!(added < 100);
        assert_eq!(builder.stats().batches_created, 1);
    }
}