oxirs_core/concurrent/batch_builder.rs

//! Batch builder for accumulating and optimizing RDF operations
//!
//! This module provides a builder pattern for accumulating operations into
//! optimal batches based on system resources and operation types.
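//!
//! A minimal usage sketch (marked `ignore`: the `oxirs_core::...` import
//! paths are an assumption based on this file's location, not confirmed here):
//!
//! ```ignore
//! use oxirs_core::concurrent::batch_builder::BatchBuilder;
//! use oxirs_core::model::{NamedNode, Object, Predicate, Subject, Triple};
//!
//! let mut builder = BatchBuilder::auto();
//! builder
//!     .insert(Triple::new(
//!         Subject::NamedNode(NamedNode::new("http://example.org/s").unwrap()),
//!         Predicate::NamedNode(NamedNode::new("http://example.org/p").unwrap()),
//!         Object::NamedNode(NamedNode::new("http://example.org/o").unwrap()),
//!     ))
//!     .unwrap();
//! let batches = builder.flush().unwrap();
//! assert_eq!(batches.len(), 1);
//! ```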

use crate::concurrent::parallel_batch::{BatchConfig, BatchOperation};
use crate::model::{Object, Predicate, Subject, Triple};
use crate::OxirsError;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::sync::Arc;

/// Type alias for transform functions
type TransformFn = Arc<dyn Fn(&Triple) -> Option<Triple> + Send + Sync>;

/// Type alias for flush callback functions
type FlushCallback = Arc<Mutex<Option<Box<dyn Fn(Vec<BatchOperation>) + Send + Sync>>>>;

/// Operation coalescing strategy
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoalescingStrategy {
    /// No coalescing - operations are kept as-is
    None,
    /// Deduplicate operations (remove duplicates)
    Deduplicate,
    /// Merge compatible operations
    Merge,
    /// Optimize operation order for better cache locality
    OptimizeOrder,
}

/// Batch builder configuration
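///
/// Individual fields can be overridden on top of the defaults; a sketch
/// mirroring how the tests in this module build configurations (marked
/// `ignore`, so it is not compiled as a doctest):
///
/// ```ignore
/// let config = BatchBuilderConfig {
///     max_batch_size: 1_000,
///     coalescing_strategy: CoalescingStrategy::Deduplicate,
///     auto_flush: false,
///     ..Default::default()
/// };
/// ```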
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
    /// Maximum size of a single batch
    pub max_batch_size: usize,
    /// Maximum memory usage in bytes
    pub max_memory_usage: usize,
    /// Coalescing strategy
    pub coalescing_strategy: CoalescingStrategy,
    /// Auto-flush when batch is full
    pub auto_flush: bool,
    /// Group operations by type for better performance
    pub group_by_type: bool,
}

impl Default for BatchBuilderConfig {
    fn default() -> Self {
        let total_memory = sys_info::mem_info()
            .map(|info| info.total * 1024) // Convert to bytes
            .unwrap_or(8 * 1024 * 1024 * 1024); // 8GB default

        BatchBuilderConfig {
            max_batch_size: 10000,
            max_memory_usage: (total_memory as usize) / 10, // Use up to 10% of system memory
            coalescing_strategy: CoalescingStrategy::Deduplicate,
            auto_flush: true,
            group_by_type: true,
        }
    }
}

impl BatchBuilderConfig {
    /// Create configuration optimized for current system
    pub fn auto() -> Self {
        let num_cpus = num_cpus::get();
        let mem_info = sys_info::mem_info().ok();

        let (max_batch_size, max_memory_usage) = if let Some(info) = mem_info {
            let total_mb = info.total / 1024;
            if total_mb > 16384 {
                // > 16GB
                (50000, (info.total * 1024 / 8) as usize) // Large batches, use 1/8 of memory
            } else if total_mb > 8192 {
                // > 8GB
                (20000, (info.total * 1024 / 10) as usize) // Medium batches, use 1/10 of memory
            } else {
                (5000, (info.total * 1024 / 20) as usize) // Small batches, use 1/20 of memory
            }
        } else {
            (10000, 1024 * 1024 * 1024) // 1GB default
        };

        BatchBuilderConfig {
            max_batch_size: max_batch_size * num_cpus / 4, // Scale with CPU count (4-core baseline)
            max_memory_usage,
            coalescing_strategy: CoalescingStrategy::Merge,
            auto_flush: true,
            group_by_type: true,
        }
    }
}

/// Statistics for batch building
#[derive(Debug, Clone, Default)]
pub struct BatchBuilderStats {
    pub total_operations: usize,
    pub coalesced_operations: usize,
    pub deduplicated_operations: usize,
    pub batches_created: usize,
    pub estimated_memory_usage: usize,
}

/// Batch builder for accumulating operations
pub struct BatchBuilder {
    config: BatchBuilderConfig,
    /// Insert operations
    insert_buffer: Vec<Triple>,
    insert_set: HashSet<Triple>,
    /// Remove operations
    remove_buffer: Vec<Triple>,
    remove_set: HashSet<Triple>,
    /// Query operations
    query_buffer: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
    /// Transform operations
    transform_buffer: Vec<TransformFn>,
    /// Current estimated memory usage
    estimated_memory: usize,
    /// Statistics
    stats: BatchBuilderStats,
    /// Flush callback
    flush_callback: FlushCallback,
}

impl BatchBuilder {
    /// Create a new batch builder
    pub fn new(config: BatchBuilderConfig) -> Self {
        BatchBuilder {
            config,
            insert_buffer: Vec::new(),
            insert_set: HashSet::new(),
            remove_buffer: Vec::new(),
            remove_set: HashSet::new(),
            query_buffer: Vec::new(),
            transform_buffer: Vec::new(),
            estimated_memory: 0,
            stats: BatchBuilderStats::default(),
            flush_callback: Arc::new(Mutex::new(None)),
        }
    }

    /// Create a batch builder with automatic configuration
    pub fn auto() -> Self {
        Self::new(BatchBuilderConfig::auto())
    }

    /// Set a callback to be called when batches are flushed
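    ///
    /// A minimal sketch of registering a callback that collects flushed
    /// batches, mirroring the pattern used in this module's tests (marked
    /// `ignore`, so it is not compiled as a doctest):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use parking_lot::Mutex;
    ///
    /// let mut builder = BatchBuilder::auto();
    /// let flushed = Arc::new(Mutex::new(Vec::new()));
    /// let flushed_clone = flushed.clone();
    /// builder.on_flush(move |batches| {
    ///     flushed_clone.lock().extend(batches);
    /// });
    /// ```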
    pub fn on_flush<F>(&mut self, callback: F)
    where
        F: Fn(Vec<BatchOperation>) + Send + Sync + 'static,
    {
        *self.flush_callback.lock() = Some(Box::new(callback));
    }

    /// Add an insert operation
    pub fn insert(&mut self, triple: Triple) -> Result<(), OxirsError> {
        self.stats.total_operations += 1;

        // Apply coalescing
        match self.config.coalescing_strategy {
            CoalescingStrategy::None => {
                self.estimated_memory += self.estimate_triple_size(&triple);
                self.insert_buffer.push(triple);
            }
            CoalescingStrategy::Deduplicate | CoalescingStrategy::Merge => {
                if self.insert_set.insert(triple.clone()) {
                    self.insert_buffer.push(triple.clone());
                    self.estimated_memory += self.estimate_triple_size(&triple);
                } else {
                    self.stats.deduplicated_operations += 1;
                }
            }
            CoalescingStrategy::OptimizeOrder => {
                // For optimize order, deduplicate now and sort later in flush()
                if self.insert_set.insert(triple.clone()) {
                    self.insert_buffer.push(triple.clone());
                    self.estimated_memory += self.estimate_triple_size(&triple);
                } else {
                    self.stats.deduplicated_operations += 1;
                }
            }
        }

        self.check_flush()?;
        Ok(())
    }

    /// Add multiple insert operations
    pub fn insert_batch(&mut self, triples: Vec<Triple>) -> Result<(), OxirsError> {
        for triple in triples {
            self.insert(triple)?;
        }
        Ok(())
    }

    /// Add a remove operation
    pub fn remove(&mut self, triple: Triple) -> Result<(), OxirsError> {
        self.stats.total_operations += 1;

        // Apply coalescing
        match self.config.coalescing_strategy {
            CoalescingStrategy::None => {
                self.estimated_memory += self.estimate_triple_size(&triple);
                self.remove_buffer.push(triple);
            }
            CoalescingStrategy::Deduplicate | CoalescingStrategy::Merge => {
                if self.remove_set.insert(triple.clone()) {
                    self.remove_buffer.push(triple.clone());
                    self.estimated_memory += self.estimate_triple_size(&triple);
                } else {
                    self.stats.deduplicated_operations += 1;
                }
            }
            CoalescingStrategy::OptimizeOrder => {
                if self.remove_set.insert(triple.clone()) {
                    self.remove_buffer.push(triple.clone());
                    self.estimated_memory += self.estimate_triple_size(&triple);
                } else {
                    self.stats.deduplicated_operations += 1;
                }
            }
        }

        self.check_flush()?;
        Ok(())
    }

    /// Add a query operation
    pub fn query(
        &mut self,
        subject: Option<Subject>,
        predicate: Option<Predicate>,
        object: Option<Object>,
    ) -> Result<(), OxirsError> {
        self.stats.total_operations += 1;
        self.query_buffer.push((subject, predicate, object));
        self.estimated_memory += 128; // Rough estimate for query pattern

        self.check_flush()?;
        Ok(())
    }

    /// Add a transform operation
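    ///
    /// A minimal identity-transform sketch (marked `ignore`, not compiled as
    /// a doctest; how the batch executor interprets a `None` result is
    /// outside this module):
    ///
    /// ```ignore
    /// builder.transform(|triple| Some(triple.clone())).unwrap();
    /// ```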
    pub fn transform<F>(&mut self, f: F) -> Result<(), OxirsError>
    where
        F: Fn(&Triple) -> Option<Triple> + Send + Sync + 'static,
    {
        self.stats.total_operations += 1;
        self.transform_buffer.push(Arc::new(f));
        self.estimated_memory += 64; // Rough estimate for closure

        self.check_flush()?;
        Ok(())
    }

    /// Get current statistics
    pub fn stats(&self) -> &BatchBuilderStats {
        &self.stats
    }

    /// Get the current number of pending operations
    pub fn pending_operations(&self) -> usize {
        self.insert_buffer.len()
            + self.remove_buffer.len()
            + self.query_buffer.len()
            + self.transform_buffer.len()
    }

    /// Check if we should flush based on size or memory constraints
    fn check_flush(&mut self) -> Result<(), OxirsError> {
        if self.config.auto_flush {
            let should_flush = self.pending_operations() >= self.config.max_batch_size
                || self.estimated_memory >= self.config.max_memory_usage;

            if should_flush {
                // Auto-flushed batches are delivered through the flush callback
                // (if one is set); the batches returned by flush() are dropped here.
                self.flush()?;
            }
        }
        Ok(())
    }

    /// Estimate the memory size of a triple
    fn estimate_triple_size(&self, triple: &Triple) -> usize {
        // Rough estimation:
        // - Subject/predicate terms: ~100 bytes each
        // - Object: ~150 bytes (literals can be larger)
        // - Triple structure: 24 bytes
        24 + self.estimate_term_size(triple.subject())
            + self.estimate_term_size(triple.predicate())
            + self.estimate_object_size(triple.object())
    }

    fn estimate_term_size(&self, _term: &impl std::fmt::Display) -> usize {
        100 // Simplified estimation
    }

    fn estimate_object_size(&self, _object: &Object) -> usize {
        150 // Simplified estimation, literals can be larger
    }

    /// Flush all pending operations into batch operations
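    ///
    /// A sketch of inspecting the returned batches (marked `ignore`, not
    /// compiled as a doctest; the handling code is illustrative only):
    ///
    /// ```ignore
    /// let batches = builder.flush().unwrap();
    /// for op in &batches {
    ///     match op {
    ///         BatchOperation::Insert(triples) => println!("insert batch of {}", triples.len()),
    ///         BatchOperation::Remove(triples) => println!("remove batch of {}", triples.len()),
    ///         _ => { /* queries and transforms */ }
    ///     }
    /// }
    /// ```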
    pub fn flush(&mut self) -> Result<Vec<BatchOperation>, OxirsError> {
        let mut operations = Vec::new();

        if self.config.coalescing_strategy == CoalescingStrategy::Merge {
            self.apply_merge_coalescing();
        }

        // Group operations by type if configured
        if self.config.group_by_type {
            // Optimize order if requested
            if self.config.coalescing_strategy == CoalescingStrategy::OptimizeOrder {
                self.optimize_operation_order();
            }

            // Create batches from buffers
            if !self.insert_buffer.is_empty() {
                operations.extend(self.create_insert_batches());
            }

            if !self.remove_buffer.is_empty() {
                operations.extend(self.create_remove_batches());
            }

            if !self.query_buffer.is_empty() {
                operations.extend(self.create_query_batches());
            }

            if !self.transform_buffer.is_empty() {
                operations.extend(self.create_transform_batches());
            }
        } else {
            // Mix operation types in batches
            operations = self.create_mixed_batches();
        }

        // Update statistics
        self.stats.batches_created += operations.len();
        self.stats.estimated_memory_usage = self.estimated_memory;

        // Clear buffers
        self.clear();

        // Call flush callback if set
        if let Some(callback) = &*self.flush_callback.lock() {
            callback(operations.clone());
        }

        Ok(operations)
    }

    /// Apply merge coalescing to combine compatible operations
    fn apply_merge_coalescing(&mut self) {
        // Remove inserts that are immediately removed
        if !self.insert_buffer.is_empty() && !self.remove_buffer.is_empty() {
            let remove_set = &self.remove_set;
            let original_len = self.insert_buffer.len();
            self.insert_buffer
                .retain(|triple| !remove_set.contains(triple));
            let coalesced = original_len - self.insert_buffer.len();

            if coalesced > 0 {
                self.stats.coalesced_operations += coalesced;
                // Also remove from remove buffer
                let insert_set = &self.insert_set;
                self.remove_buffer
                    .retain(|triple| !insert_set.contains(triple));
            }
        }
    }

    /// Optimize operation order for better cache locality
    fn optimize_operation_order(&mut self) {
        // Sort by subject for better cache locality; cache the key so each
        // subject is stringified only once.
        self.insert_buffer
            .sort_by_cached_key(|a| a.subject().to_string());

        self.remove_buffer
            .sort_by_cached_key(|a| a.subject().to_string());
    }

    /// Create insert batches respecting max batch size
    fn create_insert_batches(&mut self) -> Vec<BatchOperation> {
        let mut batches = Vec::new();
        let mut current_batch = Vec::new();

        for triple in self.insert_buffer.drain(..) {
            current_batch.push(triple);
            if current_batch.len() >= self.config.max_batch_size {
                batches.push(BatchOperation::Insert(std::mem::take(&mut current_batch)));
            }
        }

        if !current_batch.is_empty() {
            batches.push(BatchOperation::Insert(current_batch));
        }

        batches
    }

    /// Create remove batches respecting max batch size
    fn create_remove_batches(&mut self) -> Vec<BatchOperation> {
        let mut batches = Vec::new();
        let mut current_batch = Vec::new();

        for triple in self.remove_buffer.drain(..) {
            current_batch.push(triple);
            if current_batch.len() >= self.config.max_batch_size {
                batches.push(BatchOperation::Remove(std::mem::take(&mut current_batch)));
            }
        }

        if !current_batch.is_empty() {
            batches.push(BatchOperation::Remove(current_batch));
        }

        batches
    }

    /// Create query batches
    fn create_query_batches(&mut self) -> Vec<BatchOperation> {
        self.query_buffer
            .drain(..)
            .map(|(s, p, o)| BatchOperation::Query {
                subject: s,
                predicate: p,
                object: o,
            })
            .collect()
    }

    /// Create transform batches
    fn create_transform_batches(&mut self) -> Vec<BatchOperation> {
        self.transform_buffer
            .drain(..)
            .map(BatchOperation::Transform)
            .collect()
    }

    /// Create mixed batches with different operation types
    fn create_mixed_batches(&mut self) -> Vec<BatchOperation> {
        // This is a simplified implementation
        // In a real scenario, you might want to interleave operations more intelligently
        let mut operations = Vec::new();

        operations.extend(self.create_insert_batches());
        operations.extend(self.create_remove_batches());
        operations.extend(self.create_query_batches());
        operations.extend(self.create_transform_batches());

        operations
    }

    /// Clear all buffers
    fn clear(&mut self) {
        self.insert_buffer.clear();
        self.insert_set.clear();
        self.remove_buffer.clear();
        self.remove_set.clear();
        self.query_buffer.clear();
        self.transform_buffer.clear();
        self.estimated_memory = 0;
    }
}

/// Create a batch configuration from builder config
impl From<&BatchBuilderConfig> for BatchConfig {
    fn from(builder_config: &BatchBuilderConfig) -> Self {
        BatchConfig {
            batch_size: builder_config.max_batch_size,
            ..Default::default()
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::NamedNode;

    fn create_test_triple(id: usize) -> Triple {
        Triple::new(
            Subject::NamedNode(NamedNode::new(format!("http://subject/{id}")).unwrap()),
            Predicate::NamedNode(NamedNode::new(format!("http://predicate/{id}")).unwrap()),
            Object::NamedNode(NamedNode::new(format!("http://object/{id}")).unwrap()),
        )
    }

    #[test]
    fn test_batch_builder_basic() {
        let config = BatchBuilderConfig {
            max_batch_size: 10,
            auto_flush: false,
            ..Default::default()
        };

        let mut builder = BatchBuilder::new(config);

        // Add operations
        for i in 0..25 {
            builder.insert(create_test_triple(i)).unwrap();
        }

        assert_eq!(builder.pending_operations(), 25);

        // Flush and check batches
        let batches = builder.flush().unwrap();
        assert_eq!(batches.len(), 3); // 10 + 10 + 5
        assert_eq!(builder.pending_operations(), 0);
    }

    #[test]
    fn test_deduplication() {
        let config = BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Deduplicate,
            auto_flush: false,
            ..Default::default()
        };

        let mut builder = BatchBuilder::new(config);

        // Add duplicate triples
        let triple = create_test_triple(1);
        for _ in 0..5 {
            builder.insert(triple.clone()).unwrap();
        }

        assert_eq!(builder.pending_operations(), 1);
        assert_eq!(builder.stats().deduplicated_operations, 4);
    }

    #[test]
    fn test_merge_coalescing() {
        let config = BatchBuilderConfig {
            coalescing_strategy: CoalescingStrategy::Merge,
            auto_flush: false,
            ..Default::default()
        };

        let mut builder = BatchBuilder::new(config);

        // Add insert then remove same triple
        let triple = create_test_triple(1);
        builder.insert(triple.clone()).unwrap();
        builder.remove(triple).unwrap();

        // After merge, both should be eliminated
        let batches = builder.flush().unwrap();
        assert_eq!(batches.len(), 0);
        assert_eq!(builder.stats().coalesced_operations, 1);
    }

    #[test]
    fn test_auto_flush() {
        let config = BatchBuilderConfig {
            max_batch_size: 5,
            auto_flush: true,
            ..Default::default()
        };

        let flushed_batches = Arc::new(Mutex::new(Vec::new()));
        let flushed_clone = flushed_batches.clone();

        let mut builder = BatchBuilder::new(config);
        builder.on_flush(move |batches| {
            flushed_clone.lock().extend(batches);
        });

        // Add operations that trigger auto-flush
        for i in 0..12 {
            builder.insert(create_test_triple(i)).unwrap();
        }

        // Should have auto-flushed twice
        assert_eq!(flushed_batches.lock().len(), 2);
        assert_eq!(builder.pending_operations(), 2); // 12 % 5
    }

    #[test]
    fn test_mixed_operations() {
        let config = BatchBuilderConfig {
            group_by_type: true,
            auto_flush: false,
            ..Default::default()
        };

        let mut builder = BatchBuilder::new(config);

        // Add different operation types
        builder.insert(create_test_triple(1)).unwrap();
        builder.remove(create_test_triple(2)).unwrap();
        builder.query(None, None, None).unwrap();

        let batches = builder.flush().unwrap();

        // Should have 3 batches (one per type)
        assert_eq!(batches.len(), 3);
    }

    #[test]
    fn test_memory_limits() {
        let config = BatchBuilderConfig {
            max_memory_usage: 1000, // Very small limit
            auto_flush: true,
            ..Default::default()
        };

        let mut builder = BatchBuilder::new(config);

        // Add operations until memory limit
        let mut added = 0;
        for i in 0..100 {
            builder.insert(create_test_triple(i)).unwrap();
            added += 1;
            if builder.pending_operations() == 0 {
                // Auto-flushed due to memory
                break;
            }
        }

        // Should have flushed before adding all 100
        assert!(added < 100);
        assert_eq!(builder.stats().batches_created, 1);
    }
}