// legalis_sim/performance.rs
1//! Performance optimizations for large-scale simulations.
2//!
3//! This module provides:
4//! - Batch processing for large populations
5//! - Memory-efficient streaming
6//! - Entity pooling and recycling
7//! - Lazy attribute evaluation
8//! - Optimized work distribution
9//! - Memory-mapped population storage
10//! - SIMD-accelerated numeric operations
11//! - Distributed multi-node simulation
12
13use legalis_core::LegalEntity;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use wide::f64x4;
17
/// Batch processor configuration.
///
/// Controls how a large population is chunked (`batch_size`), how many
/// workers consume the chunks (`num_workers`), and whether memory-efficient
/// streaming is used (`streaming_mode`).
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Number of entities to process in each batch
    pub batch_size: usize,
    /// Number of parallel workers
    pub num_workers: usize,
    /// Enable memory-efficient streaming mode
    pub streaming_mode: bool,
}

impl Default for BatchConfig {
    /// Defaults: 1000 entities per batch, one worker per available CPU,
    /// streaming disabled.
    fn default() -> Self {
        Self {
            batch_size: 1000,
            // Stdlib replacement for the `num_cpus` crate; falls back to a
            // single worker if the parallelism cannot be queried.
            num_workers: std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1),
            streaming_mode: false,
        }
    }
}

impl BatchConfig {
    /// Creates a new batch configuration with custom batch size
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }

    /// Sets the number of parallel workers
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.num_workers = workers;
        self
    }

    /// Enables streaming mode for memory efficiency
    pub fn with_streaming(mut self, enabled: bool) -> Self {
        self.streaming_mode = enabled;
        self
    }
}
58
/// Batch iterator for processing large populations in chunks.
///
/// Yields owned `Vec<T>` batches of at most `batch_size` elements by draining
/// the backing vector from the front. A `batch_size` of 0 is treated as 1:
/// the previous implementation panicked in [`BatchIterator::batch_count`]
/// (division by zero) and looped forever in `next` (yielding empty batches).
pub struct BatchIterator<T> {
    items: Vec<T>,
    batch_size: usize,
}

impl<T> BatchIterator<T> {
    /// Creates a new batch iterator over `items`.
    ///
    /// `batch_size` is clamped to at least 1.
    pub fn new(items: Vec<T>, batch_size: usize) -> Self {
        Self {
            items,
            batch_size: batch_size.max(1),
        }
    }

    /// Returns the number of batches remaining.
    ///
    /// Note: because iteration drains the backing storage, this shrinks as
    /// batches are consumed.
    pub fn batch_count(&self) -> usize {
        self.items.len().div_ceil(self.batch_size)
    }
}

impl<T> Iterator for BatchIterator<T> {
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.items.is_empty() {
            return None;
        }

        // Drain up to `batch_size` items from the front; the final batch may
        // be shorter.
        let take = self.batch_size.min(self.items.len());
        Some(self.items.drain(..take).collect())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exact: the number of remaining batches is known.
        let remaining = self.batch_count();
        (remaining, Some(remaining))
    }
}
97
98/// Entity pool for recycling entity objects
99pub struct EntityPool<T: LegalEntity + Clone> {
100    pool: Vec<T>,
101    max_size: usize,
102}
103
104impl<T: LegalEntity + Clone> EntityPool<T> {
105    /// Creates a new entity pool with a maximum size
106    pub fn new(max_size: usize) -> Self {
107        Self {
108            pool: Vec::with_capacity(max_size),
109            max_size,
110        }
111    }
112
113    /// Acquires an entity from the pool or creates a new one
114    pub fn acquire(&mut self, create_fn: impl FnOnce() -> T) -> T {
115        self.pool.pop().unwrap_or_else(create_fn)
116    }
117
118    /// Returns an entity to the pool for reuse
119    pub fn release(&mut self, entity: T) {
120        if self.pool.len() < self.max_size {
121            self.pool.push(entity);
122        }
123    }
124
125    /// Returns the current pool size
126    pub fn size(&self) -> usize {
127        self.pool.len()
128    }
129
130    /// Clears the pool
131    pub fn clear(&mut self) {
132        self.pool.clear();
133    }
134}
135
136/// Streaming processor for memory-efficient population processing
137pub struct StreamingProcessor {
138    buffer_size: usize,
139}
140
141impl StreamingProcessor {
142    /// Creates a new streaming processor
143    pub fn new(buffer_size: usize) -> Self {
144        Self { buffer_size }
145    }
146
147    /// Processes entities in streaming fashion with a callback
148    pub fn process<F, R>(&self, entities: Vec<Arc<dyn LegalEntity>>, mut processor: F) -> Vec<R>
149    where
150        F: FnMut(&dyn LegalEntity) -> R,
151    {
152        let mut results = Vec::with_capacity(entities.len());
153
154        for chunk in entities.chunks(self.buffer_size) {
155            for entity in chunk {
156                results.push(processor(entity.as_ref()));
157            }
158        }
159
160        results
161    }
162
163    /// Processes entities in parallel batches
164    pub fn process_parallel<F, R>(
165        &self,
166        entities: Vec<Arc<dyn LegalEntity>>,
167        processor: F,
168    ) -> Vec<R>
169    where
170        F: Fn(&dyn LegalEntity) -> R + Send + Sync,
171        R: Send,
172    {
173        use std::sync::Mutex;
174
175        let results = Mutex::new(Vec::with_capacity(entities.len()));
176
177        // Process in chunks to avoid excessive memory usage
178        for chunk in entities.chunks(self.buffer_size) {
179            let chunk_results: Vec<R> = chunk
180                .iter()
181                .map(|entity| processor(entity.as_ref()))
182                .collect();
183
184            results.lock().unwrap().extend(chunk_results);
185        }
186
187        results.into_inner().unwrap()
188    }
189}
190
/// Lazy attribute cache for delayed evaluation.
///
/// Stores computed string attributes keyed by name and tracks whether any
/// value has been inserted since the last [`LazyAttributeCache::mark_clean`].
pub struct LazyAttributeCache {
    cache: std::collections::HashMap<String, String>,
    dirty: bool,
}

impl LazyAttributeCache {
    /// Creates a new, empty, clean cache.
    pub fn new() -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            dirty: false,
        }
    }

    /// Returns the cached value for `key`, computing and storing it (and
    /// marking the cache dirty) on first access.
    pub fn get_or_compute<F>(&mut self, key: &str, compute_fn: F) -> String
    where
        F: FnOnce() -> String,
    {
        // Single probe on the hot (hit) path; the original looked the key up
        // twice (`contains_key` followed by `get`). The `entry` API is
        // deliberately avoided so no owned key is allocated on a hit.
        if let Some(cached) = self.cache.get(key) {
            return cached.clone();
        }

        let value = compute_fn();
        self.cache.insert(key.to_string(), value.clone());
        self.dirty = true;
        value
    }

    /// Checks if the cache has been modified since the last `mark_clean`.
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Marks the cache as clean.
    pub fn mark_clean(&mut self) {
        self.dirty = false;
    }

    /// Clears the cache and resets the dirty flag.
    pub fn clear(&mut self) {
        self.cache.clear();
        self.dirty = false;
    }

    /// Returns the number of cached entries.
    pub fn size(&self) -> usize {
        self.cache.len()
    }
}

impl Default for LazyAttributeCache {
    fn default() -> Self {
        Self::new()
    }
}
248
/// Work scheduler for optimal thread distribution.
pub struct WorkScheduler {
    num_workers: usize,
    work_stealing_enabled: bool,
}

impl WorkScheduler {
    /// Creates a new work scheduler.
    ///
    /// `num_workers` is clamped to at least 1: `distribute_work` and
    /// `optimal_batch_size` divide by the worker count and previously
    /// panicked with a division by zero when it was 0.
    pub fn new(num_workers: usize) -> Self {
        Self {
            num_workers: num_workers.max(1),
            work_stealing_enabled: true,
        }
    }

    /// Enables or disables work stealing.
    ///
    /// NOTE(review): the flag is stored but never consulted in the visible
    /// code — presumably reserved for a future scheduler implementation.
    pub fn with_work_stealing(mut self, enabled: bool) -> Self {
        self.work_stealing_enabled = enabled;
        self
    }

    /// Splits `items` into `num_workers` contiguous chunks whose lengths
    /// differ by at most one; earlier workers receive the extra items.
    pub fn distribute_work<T>(&self, items: Vec<T>) -> Vec<Vec<T>> {
        let total_items = items.len();
        if total_items == 0 {
            return (0..self.num_workers).map(|_| Vec::new()).collect();
        }

        let base_chunk_size = total_items / self.num_workers;
        let remainder = total_items % self.num_workers;

        let mut distributed: Vec<Vec<T>> = Vec::with_capacity(self.num_workers);
        let mut items_iter = items.into_iter();

        for worker_id in 0..self.num_workers {
            // The first `remainder` workers take one extra item each.
            let chunk_size = base_chunk_size + usize::from(worker_id < remainder);
            distributed.push(items_iter.by_ref().take(chunk_size).collect());
        }

        distributed
    }

    /// Returns a batch size between 100 and 10_000 that spreads `total_items`
    /// evenly over the workers.
    pub fn optimal_batch_size(&self, total_items: usize) -> usize {
        const MIN_BATCH: usize = 100;
        const MAX_BATCH: usize = 10_000;

        (total_items / self.num_workers).clamp(MIN_BATCH, MAX_BATCH)
    }

    /// Returns the number of workers (always at least 1).
    pub fn num_workers(&self) -> usize {
        self.num_workers
    }
}

impl Default for WorkScheduler {
    /// Uses all available CPU parallelism, falling back to a single worker.
    fn default() -> Self {
        Self::new(
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(1),
        )
    }
}
318
/// Metadata describing a stored population dataset.
///
/// NOTE(review): despite the surrounding "memory-mapped" naming, this is a
/// plain serde-serializable record; no mmap appears in the visible code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopulationMetadata {
    /// Number of entities in the population
    pub entity_count: usize,
    /// Version of the data format (set to 1 by `new`)
    pub format_version: u32,
    /// Creation timestamp (UTC, stamped by `new`)
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Additional free-form key/value metadata
    pub metadata: std::collections::HashMap<String, String>,
}
331
332impl PopulationMetadata {
333    /// Creates new metadata
334    pub fn new(entity_count: usize) -> Self {
335        Self {
336            entity_count,
337            format_version: 1,
338            created_at: chrono::Utc::now(),
339            metadata: std::collections::HashMap::new(),
340        }
341    }
342
343    /// Adds metadata entry
344    pub fn with_metadata(mut self, key: String, value: String) -> Self {
345        self.metadata.insert(key, value);
346        self
347    }
348}
349
/// Manager for population metadata persisted to disk.
///
/// NOTE(review): the name suggests mmap, but the visible implementation only
/// reads and writes a JSON metadata file via `std::fs` — confirm before
/// relying on memory-mapped semantics.
pub struct MemoryMappedPopulation {
    metadata: PopulationMetadata,
    // Retained for future use (e.g. reopening or mapping the file).
    #[allow(dead_code)]
    file_path: std::path::PathBuf,
}
356
357impl MemoryMappedPopulation {
358    /// Creates a new memory-mapped population file
359    pub fn create(
360        path: impl Into<std::path::PathBuf>,
361        metadata: PopulationMetadata,
362    ) -> std::io::Result<Self> {
363        let file_path = path.into();
364
365        // Write metadata to file
366        let metadata_json = serde_json::to_string_pretty(&metadata)?;
367        std::fs::write(&file_path, metadata_json)?;
368
369        Ok(Self {
370            metadata,
371            file_path,
372        })
373    }
374
375    /// Opens an existing memory-mapped population file
376    pub fn open(path: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
377        let file_path = path.into();
378
379        // Read metadata from file
380        let metadata_json = std::fs::read_to_string(&file_path)?;
381        let metadata: PopulationMetadata = serde_json::from_str(&metadata_json)?;
382
383        Ok(Self {
384            metadata,
385            file_path,
386        })
387    }
388
389    /// Returns population metadata
390    pub fn metadata(&self) -> &PopulationMetadata {
391        &self.metadata
392    }
393
394    /// Returns the number of entities
395    pub fn entity_count(&self) -> usize {
396        self.metadata.entity_count
397    }
398}
399
400/// Parallel work executor with optimized distribution
401pub struct ParallelExecutor {
402    scheduler: WorkScheduler,
403}
404
405impl ParallelExecutor {
406    /// Creates a new parallel executor
407    pub fn new(num_workers: usize) -> Self {
408        Self {
409            scheduler: WorkScheduler::new(num_workers),
410        }
411    }
412
413    /// Executes work in parallel with optimal distribution
414    pub fn execute<T, F, R>(&self, items: Vec<T>, worker_fn: F) -> Vec<R>
415    where
416        T: Send,
417        F: Fn(T) -> R + Send + Sync + Clone,
418        R: Send,
419    {
420        use std::sync::Mutex;
421
422        let distributed = self.scheduler.distribute_work(items);
423        let results = Mutex::new(Vec::new());
424
425        std::thread::scope(|s| {
426            let handles: Vec<_> = distributed
427                .into_iter()
428                .map(|chunk| {
429                    let worker_fn = worker_fn.clone();
430                    let results_ref = &results;
431
432                    s.spawn(move || {
433                        let chunk_results: Vec<R> = chunk.into_iter().map(worker_fn).collect();
434                        results_ref.lock().unwrap().extend(chunk_results);
435                    })
436                })
437                .collect();
438
439            for handle in handles {
440                handle.join().unwrap();
441            }
442        });
443
444        results.into_inner().unwrap()
445    }
446
447    /// Returns the scheduler
448    pub fn scheduler(&self) -> &WorkScheduler {
449        &self.scheduler
450    }
451}
452
453impl Default for ParallelExecutor {
454    fn default() -> Self {
455        Self::new(num_cpus::get())
456    }
457}
458
/// SIMD-accelerated batch operations for numeric data.
///
/// This processor uses SIMD (Single Instruction, Multiple Data) instructions
/// to accelerate common numeric operations on large datasets. Operations process
/// 4 f64 values simultaneously, providing significant performance improvements
/// for large-scale simulations.
///
/// Note that SIMD lane-wise accumulation reorders floating-point additions, so
/// results may differ from a sequential sum by rounding error on
/// non-exactly-representable inputs.
///
/// # Examples
///
/// ```
/// use legalis_sim::SimdBatchProcessor;
///
/// // Compute statistics on a large dataset
/// let data: Vec<f64> = (1..=1000).map(|x| x as f64).collect();
///
/// let sum = SimdBatchProcessor::sum_f64(&data);
/// let mean = SimdBatchProcessor::mean_f64(&data).unwrap();
/// let std_dev = SimdBatchProcessor::std_dev_f64(&data).unwrap();
///
/// assert_eq!(sum, 500500.0);
/// assert!((mean - 500.5).abs() < 0.01);
/// assert!(std_dev > 0.0);
/// ```
pub struct SimdBatchProcessor;

impl SimdBatchProcessor {
    /// Computes sum of f64 values using SIMD acceleration.
    ///
    /// Processes values in batches of 4 using SIMD instructions for improved performance.
    ///
    /// # Examples
    ///
    /// ```
    /// use legalis_sim::SimdBatchProcessor;
    ///
    /// let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
    /// let sum = SimdBatchProcessor::sum_f64(&values);
    /// assert_eq!(sum, 15.0);
    /// ```
    pub fn sum_f64(values: &[f64]) -> f64 {
        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Process 4 values at a time using SIMD
        let mut sum_vec = f64x4::splat(0.0);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            sum_vec += vec;
        }

        // Sum the SIMD vector
        let sum: f64 = sum_vec.reduce_add();

        // Add remainder (up to 3 trailing values not covered by chunks_exact)
        sum + remainder.iter().sum::<f64>()
    }

    /// Computes mean of f64 values using SIMD acceleration.
    ///
    /// Returns `None` if the input slice is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use legalis_sim::SimdBatchProcessor;
    ///
    /// let values = vec![2.0, 4.0, 6.0, 8.0, 10.0];
    /// let mean = SimdBatchProcessor::mean_f64(&values).unwrap();
    /// assert_eq!(mean, 6.0);
    /// ```
    pub fn mean_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }
        Some(Self::sum_f64(values) / values.len() as f64)
    }

    /// Computes the population variance (divides by `n`, not `n - 1`) of f64
    /// values using SIMD acceleration. Returns `None` for empty input.
    pub fn variance_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let mean = Self::mean_f64(values)?;
        let mean_vec = f64x4::splat(mean);

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Compute sum of squared differences using SIMD
        let mut sq_diff_sum = f64x4::splat(0.0);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let diff = vec - mean_vec;
            sq_diff_sum += diff * diff;
        }

        let mut sum: f64 = sq_diff_sum.reduce_add();

        // Add remainder
        for &value in remainder {
            let diff = value - mean;
            sum += diff * diff;
        }

        Some(sum / values.len() as f64)
    }

    /// Computes the (population) standard deviation using SIMD acceleration.
    /// Returns `None` for empty input.
    pub fn std_dev_f64(values: &[f64]) -> Option<f64> {
        Self::variance_f64(values).map(|v| v.sqrt())
    }

    /// Computes minimum value using SIMD acceleration.
    /// Returns `None` for empty input.
    ///
    /// NOTE(review): behavior in the presence of NaN values depends on the
    /// `wide` crate's lane-wise `min` semantics — confirm before relying on
    /// NaN propagation.
    pub fn min_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Start at +inf so any real value replaces the seed.
        let mut min_vec = f64x4::splat(f64::INFINITY);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            min_vec = min_vec.min(vec);
        }

        // Extract minimum from SIMD vector
        let arr = min_vec.to_array();
        let mut min = arr[0].min(arr[1]).min(arr[2]).min(arr[3]);

        // Check remainder
        for &value in remainder {
            if value < min {
                min = value;
            }
        }

        Some(min)
    }

    /// Computes maximum value using SIMD acceleration.
    /// Returns `None` for empty input.
    ///
    /// NOTE(review): same NaN caveat as [`SimdBatchProcessor::min_f64`].
    pub fn max_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Start at -inf so any real value replaces the seed.
        let mut max_vec = f64x4::splat(f64::NEG_INFINITY);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            max_vec = max_vec.max(vec);
        }

        // Extract maximum from SIMD vector
        let arr = max_vec.to_array();
        let mut max = arr[0].max(arr[1]).max(arr[2]).max(arr[3]);

        // Check remainder
        for &value in remainder {
            if value > max {
                max = value;
            }
        }

        Some(max)
    }

    /// Multiplies every element by `scalar` in place using SIMD acceleration.
    pub fn scale_f64(values: &mut [f64], scalar: f64) {
        let scalar_vec = f64x4::splat(scalar);

        // Split so the first part's length is a multiple of 4.
        let (chunks, remainder) = values.split_at_mut(values.len() - values.len() % 4);

        // Process 4 values at a time
        for chunk in chunks.chunks_exact_mut(4) {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let result = vec * scalar_vec;
            let arr = result.to_array();
            chunk.copy_from_slice(&arr);
        }

        // Process remainder
        for value in remainder {
            *value *= scalar;
        }
    }

    /// Computes dot product of two vectors using SIMD acceleration.
    /// Returns `None` when the slices differ in length.
    pub fn dot_product_f64(a: &[f64], b: &[f64]) -> Option<f64> {
        if a.len() != b.len() {
            return None;
        }

        let chunks_a = a.chunks_exact(4);
        let chunks_b = b.chunks_exact(4);
        let remainder_a = chunks_a.remainder();
        let remainder_b = chunks_b.remainder();

        let mut dot_vec = f64x4::splat(0.0);
        for (chunk_a, chunk_b) in chunks_a.zip(chunks_b) {
            let vec_a = f64x4::from([chunk_a[0], chunk_a[1], chunk_a[2], chunk_a[3]]);
            let vec_b = f64x4::from([chunk_b[0], chunk_b[1], chunk_b[2], chunk_b[3]]);
            dot_vec += vec_a * vec_b;
        }

        let mut dot: f64 = dot_vec.reduce_add();

        // Add remainder (both remainders have the same length since a and b do)
        for (val_a, val_b) in remainder_a.iter().zip(remainder_b) {
            dot += val_a * val_b;
        }

        Some(dot)
    }

    /// Normalizes values into the [0, 1] range in place using SIMD
    /// acceleration.
    ///
    /// Returns `None` for empty input or when all values are (nearly) equal,
    /// in which case the slice is left unmodified.
    pub fn normalize_f64(values: &mut [f64]) -> Option<()> {
        let min = Self::min_f64(values)?;
        let max = Self::max_f64(values)?;

        if (max - min).abs() < f64::EPSILON {
            return None; // Avoid division by zero
        }

        let range = max - min;
        let min_vec = f64x4::splat(min);
        let range_vec = f64x4::splat(range);

        // Split so the first part's length is a multiple of 4.
        let (chunks, remainder) = values.split_at_mut(values.len() - values.len() % 4);

        // Process 4 values at a time
        for chunk in chunks.chunks_exact_mut(4) {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let result = (vec - min_vec) / range_vec;
            let arr = result.to_array();
            chunk.copy_from_slice(&arr);
        }

        // Process remainder
        for value in remainder {
            *value = (*value - min) / range;
        }

        Some(())
    }
}
709
/// Configuration for one node participating in a multi-node simulation.
///
/// Large workloads can be split across a cluster; each node derives the slice
/// of work it owns from its numeric position (`node_id`) in the cluster.
///
/// # Examples
///
/// ```
/// use legalis_sim::DistributedConfig;
///
/// // Configure a node in a 4-node cluster
/// let config = DistributedConfig::new(
///     "0".to_string(),
///     4,
///     "coordinator.example.com".to_string(),
///     8080
/// );
///
/// // The first of four nodes owns items 0..250 of a 1000-item workload.
/// let (start, end) = config.partition_range(1000);
/// assert_eq!((start, end), (0, 250));
/// ```
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    /// Node ID in the cluster
    pub node_id: String,
    /// Total number of nodes
    pub num_nodes: usize,
    /// Coordinator address
    pub coordinator_addr: String,
    /// Port for node communication
    pub port: u16,
}

impl DistributedConfig {
    /// Assembles a configuration from its parts.
    pub fn new(node_id: String, num_nodes: usize, coordinator_addr: String, port: u16) -> Self {
        Self {
            node_id,
            num_nodes,
            coordinator_addr,
            port,
        }
    }

    /// Computes the half-open `[start, end)` slice of `total_items` owned by
    /// this node. Nodes whose index falls below `total_items % num_nodes`
    /// take one extra item so the whole range is covered with sizes that
    /// differ by at most one.
    ///
    /// A non-numeric `node_id` is treated as index 0. Panics on
    /// `num_nodes == 0` (division by zero).
    pub fn partition_range(&self, total_items: usize) -> (usize, usize) {
        let index = self.node_id.parse::<usize>().unwrap_or(0);
        let per_node = total_items / self.num_nodes;
        let leftover = total_items % self.num_nodes;

        let start = index * per_node + index.min(leftover);
        let len = per_node + usize::from(index < leftover);
        (start, start + len)
    }
}
769
/// A single worker node in a distributed simulation cluster.
pub struct DistributedNode {
    config: DistributedConfig,
    // NOTE(review): the runtime is constructed but never used in the visible
    // code — presumably reserved for future async node communication; confirm
    // before removing.
    #[allow(dead_code)]
    runtime: tokio::runtime::Runtime,
}

impl DistributedNode {
    /// Creates a new distributed node.
    ///
    /// # Panics
    /// Panics if the tokio runtime cannot be constructed.
    pub fn new(config: DistributedConfig) -> Self {
        let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");

        Self { config, runtime }
    }

    /// Returns the node configuration
    pub fn config(&self) -> &DistributedConfig {
        &self.config
    }

    /// Keeps only the slice of `items` assigned to this node by
    /// `DistributedConfig::partition_range`; the rest are dropped.
    pub fn partition_work<T>(&self, items: Vec<T>) -> Vec<T> {
        let total = items.len();
        let (start, end) = self.config.partition_range(total);

        items.into_iter().skip(start).take(end - start).collect()
    }

    /// Executes work on this node using a locally constructed
    /// `ParallelExecutor` with default worker count.
    pub fn execute_local<T, F, R>(&self, items: Vec<T>, worker_fn: F) -> Vec<R>
    where
        T: Send,
        F: Fn(T) -> R + Send + Sync + Clone,
        R: Send,
    {
        let executor = ParallelExecutor::default();
        executor.execute(items, worker_fn)
    }
}
809
/// Distributed simulation coordinator.
///
/// Manages work distribution across multiple simulation nodes and aggregates
/// their results. (Note: this duplicates the chunking logic of
/// `WorkScheduler::distribute_work`; nodes here are identified by name.)
///
/// # Examples
///
/// ```
/// use legalis_sim::DistributedCoordinator;
///
/// // Create coordinator for 3-node cluster
/// let coordinator = DistributedCoordinator::new(3);
///
/// // Distribute 100 items across nodes
/// let items: Vec<i32> = (0..100).collect();
/// let distributed = coordinator.distribute_work(items);
///
/// assert_eq!(distributed.len(), 3);
/// assert_eq!(distributed[0].len(), 34); // First node gets extra item
/// assert_eq!(distributed[1].len(), 33);
/// assert_eq!(distributed[2].len(), 33);
/// ```
pub struct DistributedCoordinator {
    num_nodes: usize,
    nodes: Vec<String>,
}

impl DistributedCoordinator {
    /// Creates a coordinator managing `num_nodes` nodes named `node-<i>`.
    pub fn new(num_nodes: usize) -> Self {
        let nodes = (0..num_nodes).map(|i| format!("node-{}", i)).collect();

        Self { num_nodes, nodes }
    }

    /// Returns the number of nodes
    pub fn num_nodes(&self) -> usize {
        self.num_nodes
    }

    /// Returns the list of nodes
    pub fn nodes(&self) -> &[String] {
        &self.nodes
    }

    /// Splits `items` into `num_nodes` contiguous chunks whose sizes differ
    /// by at most one; earlier nodes receive the extra items.
    ///
    /// With zero nodes an empty distribution is returned; the previous
    /// implementation panicked with a division by zero.
    pub fn distribute_work<T>(&self, items: Vec<T>) -> Vec<Vec<T>> {
        if self.num_nodes == 0 {
            return Vec::new();
        }

        let total_items = items.len();
        let items_per_node = total_items / self.num_nodes;
        let remainder = total_items % self.num_nodes;

        let mut distributed: Vec<Vec<T>> = Vec::with_capacity(self.num_nodes);
        let mut items_iter = items.into_iter();

        for node_id in 0..self.num_nodes {
            // The first `remainder` nodes take one extra item each.
            let node_size = items_per_node + usize::from(node_id < remainder);
            distributed.push(items_iter.by_ref().take(node_size).collect());
        }

        distributed
    }

    /// Flattens per-node result vectors back into one vector, preserving
    /// node order.
    pub fn aggregate_results<T>(&self, node_results: Vec<Vec<T>>) -> Vec<T> {
        node_results.into_iter().flatten().collect()
    }
}
882
883#[cfg(test)]
884mod tests {
885    use super::*;
886    use legalis_core::BasicEntity;
887
    // Verifies four equal batches of 25, then exhaustion. Note that
    // batch_count() reflects *remaining* items, so it is checked before
    // iteration drains the backing vector.
    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..100).collect();
        let mut batch_iter = BatchIterator::new(items, 25);

        assert_eq!(batch_iter.batch_count(), 4);

        let batch1 = batch_iter.next().unwrap();
        assert_eq!(batch1.len(), 25);

        let batch2 = batch_iter.next().unwrap();
        assert_eq!(batch2.len(), 25);

        let batch3 = batch_iter.next().unwrap();
        assert_eq!(batch3.len(), 25);

        let batch4 = batch_iter.next().unwrap();
        assert_eq!(batch4.len(), 25);

        assert!(batch_iter.next().is_none());
    }
909
    // Verifies the builder methods override each BatchConfig field.
    #[test]
    fn test_batch_config() {
        let config = BatchConfig::default()
            .with_batch_size(500)
            .with_workers(4)
            .with_streaming(true);

        assert_eq!(config.batch_size, 500);
        assert_eq!(config.num_workers, 4);
        assert!(config.streaming_mode);
    }
921
    // Verifies acquire/release round-trips the same entity (by id) and that
    // clear empties the pool.
    #[test]
    fn test_entity_pool() {
        let mut pool = EntityPool::new(10);

        let entity1 = pool.acquire(BasicEntity::new);
        let id1 = entity1.id();

        pool.release(entity1);
        assert_eq!(pool.size(), 1);

        let entity2 = pool.acquire(BasicEntity::new);
        assert_eq!(entity2.id(), id1); // Should reuse the same entity

        pool.clear();
        assert_eq!(pool.size(), 0);
    }
938
    // Verifies chunked processing visits every entity exactly once.
    #[test]
    fn test_streaming_processor() {
        let processor = StreamingProcessor::new(10);

        let entities: Vec<Arc<dyn LegalEntity>> = (0..50)
            .map(|_| Arc::new(BasicEntity::new()) as Arc<dyn LegalEntity>)
            .collect();

        let results = processor.process(entities, |entity| entity.id());
        assert_eq!(results.len(), 50);
    }
950
    // Verifies compute-once caching, dirty tracking, and clearing.
    #[test]
    fn test_lazy_attribute_cache() {
        let mut cache = LazyAttributeCache::new();

        let value1 = cache.get_or_compute("test", || "computed".to_string());
        assert_eq!(value1, "computed");
        assert!(cache.is_dirty());
        assert_eq!(cache.size(), 1);

        cache.mark_clean();
        assert!(!cache.is_dirty());

        let value2 = cache.get_or_compute("test", || "should_not_compute".to_string());
        assert_eq!(value2, "computed"); // Should use cached value
        assert!(!cache.is_dirty());

        cache.clear();
        assert_eq!(cache.size(), 0);
    }
970
    // Verifies an evenly divisible workload yields equal-size chunks with no
    // items lost.
    #[test]
    fn test_work_scheduler_distribution() {
        let scheduler = WorkScheduler::new(4);
        let items: Vec<i32> = (0..100).collect();

        let distributed = scheduler.distribute_work(items);

        assert_eq!(distributed.len(), 4);

        // Each worker should get 25 items (100 / 4)
        for chunk in &distributed {
            assert_eq!(chunk.len(), 25);
        }

        // Verify all items are distributed
        let total: usize = distributed.iter().map(|c| c.len()).sum();
        assert_eq!(total, 100);
    }
989
    // Verifies the remainder items go to the earliest workers (4/3/3 split).
    #[test]
    fn test_work_scheduler_uneven_distribution() {
        let scheduler = WorkScheduler::new(3);
        let items: Vec<i32> = (0..10).collect();

        let distributed = scheduler.distribute_work(items);

        assert_eq!(distributed.len(), 3);

        // First worker gets 4 items (10 / 3 = 3 remainder 1)
        assert_eq!(distributed[0].len(), 4);
        // Other workers get 3 items each
        assert_eq!(distributed[1].len(), 3);
        assert_eq!(distributed[2].len(), 3);

        let total: usize = distributed.iter().map(|c| c.len()).sum();
        assert_eq!(total, 10);
    }
1008
    // Verifies the batch size is clamped to the [100, 10_000] range.
    #[test]
    fn test_work_scheduler_optimal_batch_size() {
        let scheduler = WorkScheduler::new(4);

        // Small dataset
        assert_eq!(scheduler.optimal_batch_size(200), 100);

        // Medium dataset
        assert_eq!(scheduler.optimal_batch_size(40_000), 10_000);

        // Large dataset - should cap at max
        assert_eq!(scheduler.optimal_batch_size(1_000_000), 10_000);
    }
1022
    // Verifies parallel execution produces the full result set; sorting makes
    // the check independent of result ordering.
    #[test]
    fn test_parallel_executor() {
        let executor = ParallelExecutor::new(4);
        let items: Vec<i32> = (0..100).collect();

        let results = executor.execute(items, |x| x * 2);

        assert_eq!(results.len(), 100);
        // Results might not be in order due to parallel execution
        let mut sorted_results = results;
        sorted_results.sort();

        for (i, &value) in sorted_results.iter().enumerate() {
            assert_eq!(value, (i as i32) * 2);
        }
    }
1039
1040    #[test]
1041    fn test_memory_mapped_metadata() {
1042        let metadata =
1043            PopulationMetadata::new(1000).with_metadata("source".to_string(), "test".to_string());
1044
1045        assert_eq!(metadata.entity_count, 1000);
1046        assert_eq!(metadata.format_version, 1);
1047        assert_eq!(metadata.metadata.get("source").unwrap(), "test");
1048    }
1049
1050    #[test]
1051    fn test_memory_mapped_population_create_and_open() {
1052        let temp_dir = std::env::temp_dir();
1053        let file_path = temp_dir.join("test_population.json");
1054
1055        // Clean up if exists
1056        let _ = std::fs::remove_file(&file_path);
1057
1058        // Create new population file
1059        let metadata = PopulationMetadata::new(500);
1060        let pop = MemoryMappedPopulation::create(&file_path, metadata).unwrap();
1061        assert_eq!(pop.entity_count(), 500);
1062
1063        // Open existing population file
1064        let pop_opened = MemoryMappedPopulation::open(&file_path).unwrap();
1065        assert_eq!(pop_opened.entity_count(), 500);
1066        assert_eq!(pop_opened.metadata().format_version, 1);
1067
1068        // Clean up
1069        std::fs::remove_file(&file_path).unwrap();
1070    }
1071
1072    // SIMD tests
1073    #[test]
1074    fn test_simd_sum() {
1075        let values: Vec<f64> = (1..=100).map(|x| x as f64).collect();
1076        let sum = SimdBatchProcessor::sum_f64(&values);
1077        let expected: f64 = (1..=100).sum::<i32>() as f64;
1078        assert!((sum - expected).abs() < 1e-10);
1079    }
1080
1081    #[test]
1082    fn test_simd_mean() {
1083        let values: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1084        let mean = SimdBatchProcessor::mean_f64(&values).unwrap();
1085        assert!((mean - 3.0).abs() < 1e-10);
1086    }
1087
1088    #[test]
1089    fn test_simd_variance() {
1090        let values: Vec<f64> = vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
1091        let variance = SimdBatchProcessor::variance_f64(&values).unwrap();
1092        // Expected variance = 4.0
1093        assert!((variance - 4.0).abs() < 1e-10);
1094    }
1095
1096    #[test]
1097    fn test_simd_std_dev() {
1098        let values: Vec<f64> = vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
1099        let std_dev = SimdBatchProcessor::std_dev_f64(&values).unwrap();
1100        // Expected std_dev = 2.0
1101        assert!((std_dev - 2.0).abs() < 1e-10);
1102    }
1103
1104    #[test]
1105    fn test_simd_min_max() {
1106        let values: Vec<f64> = vec![5.0, 2.0, 8.0, 1.0, 9.0, 3.0];
1107        let min = SimdBatchProcessor::min_f64(&values).unwrap();
1108        let max = SimdBatchProcessor::max_f64(&values).unwrap();
1109        assert_eq!(min, 1.0);
1110        assert_eq!(max, 9.0);
1111    }
1112
1113    #[test]
1114    fn test_simd_scale() {
1115        let mut values: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
1116        SimdBatchProcessor::scale_f64(&mut values, 2.0);
1117        assert_eq!(values, vec![2.0, 4.0, 6.0, 8.0, 10.0]);
1118    }
1119
1120    #[test]
1121    fn test_simd_dot_product() {
1122        let a: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0];
1123        let b: Vec<f64> = vec![5.0, 6.0, 7.0, 8.0];
1124        let dot = SimdBatchProcessor::dot_product_f64(&a, &b).unwrap();
1125        // 1*5 + 2*6 + 3*7 + 4*8 = 5 + 12 + 21 + 32 = 70
1126        assert_eq!(dot, 70.0);
1127    }
1128
1129    #[test]
1130    fn test_simd_normalize() {
1131        let mut values: Vec<f64> = vec![0.0, 5.0, 10.0];
1132        SimdBatchProcessor::normalize_f64(&mut values).unwrap();
1133        assert_eq!(values[0], 0.0);
1134        assert_eq!(values[1], 0.5);
1135        assert_eq!(values[2], 1.0);
1136    }
1137
1138    #[test]
1139    fn test_simd_large_dataset() {
1140        let values: Vec<f64> = (1..=10000).map(|x| x as f64).collect();
1141        let sum = SimdBatchProcessor::sum_f64(&values);
1142        let expected: f64 = (1..=10000).sum::<i32>() as f64;
1143        assert!((sum - expected).abs() < 1e-6);
1144    }
1145
1146    // Distributed simulation tests
1147    #[test]
1148    fn test_distributed_config() {
1149        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);
1150
1151        assert_eq!(config.node_id, "0");
1152        assert_eq!(config.num_nodes, 4);
1153        assert_eq!(config.coordinator_addr, "localhost");
1154        assert_eq!(config.port, 8080);
1155    }
1156
1157    #[test]
1158    fn test_distributed_partition_range() {
1159        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);
1160
1161        let (start, end) = config.partition_range(100);
1162        assert_eq!(start, 0);
1163        assert_eq!(end, 25);
1164    }
1165
1166    #[test]
1167    fn test_distributed_node() {
1168        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);
1169
1170        let node = DistributedNode::new(config);
1171        assert_eq!(node.config().node_id, "0");
1172        assert_eq!(node.config().num_nodes, 4);
1173    }
1174
1175    #[test]
1176    fn test_distributed_partition_work() {
1177        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);
1178
1179        let node = DistributedNode::new(config);
1180        let items: Vec<i32> = (0..100).collect();
1181        let partitioned = node.partition_work(items);
1182
1183        // Node 0 should get first 25 items
1184        assert_eq!(partitioned.len(), 25);
1185        assert_eq!(partitioned[0], 0);
1186        assert_eq!(partitioned[24], 24);
1187    }
1188
1189    #[test]
1190    fn test_distributed_execute_local() {
1191        let config = DistributedConfig::new("0".to_string(), 2, "localhost".to_string(), 8080);
1192
1193        let node = DistributedNode::new(config);
1194        let items: Vec<i32> = vec![1, 2, 3, 4, 5];
1195        let results = node.execute_local(items, |x| x * 2);
1196
1197        assert_eq!(results.len(), 5);
1198        let mut sorted = results;
1199        sorted.sort();
1200        assert_eq!(sorted, vec![2, 4, 6, 8, 10]);
1201    }
1202
1203    #[test]
1204    fn test_distributed_coordinator() {
1205        let coordinator = DistributedCoordinator::new(3);
1206        assert_eq!(coordinator.num_nodes(), 3);
1207        assert_eq!(coordinator.nodes().len(), 3);
1208        assert_eq!(coordinator.nodes()[0], "node-0");
1209        assert_eq!(coordinator.nodes()[1], "node-1");
1210        assert_eq!(coordinator.nodes()[2], "node-2");
1211    }
1212
1213    #[test]
1214    fn test_distributed_distribute_work() {
1215        let coordinator = DistributedCoordinator::new(3);
1216        let items: Vec<i32> = (0..10).collect();
1217        let distributed = coordinator.distribute_work(items);
1218
1219        assert_eq!(distributed.len(), 3);
1220        // 10 items / 3 nodes = 3 per node + 1 remainder
1221        assert_eq!(distributed[0].len(), 4); // First node gets extra
1222        assert_eq!(distributed[1].len(), 3);
1223        assert_eq!(distributed[2].len(), 3);
1224
1225        // Verify all items are distributed
1226        let total: usize = distributed.iter().map(|v| v.len()).sum();
1227        assert_eq!(total, 10);
1228    }
1229
1230    #[test]
1231    fn test_distributed_aggregate_results() {
1232        let coordinator = DistributedCoordinator::new(3);
1233        let node_results = vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8, 9]];
1234
1235        let aggregated = coordinator.aggregate_results(node_results);
1236        assert_eq!(aggregated.len(), 9);
1237        assert_eq!(aggregated, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
1238    }
1239}