1use legalis_core::LegalEntity;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use wide::f64x4;
17
/// Tuning knobs for batch processing pipelines.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Number of items per batch (default: 1000).
    pub batch_size: usize,
    /// Worker thread count (default: number of logical CPUs).
    pub num_workers: usize,
    /// Streaming-mode flag (default: false).
    /// NOTE(review): settable via `with_streaming` but not read by any code
    /// visible in this file — confirm its consumer before relying on it.
    pub streaming_mode: bool,
}
28
29impl Default for BatchConfig {
30 fn default() -> Self {
31 Self {
32 batch_size: 1000,
33 num_workers: num_cpus::get(),
34 streaming_mode: false,
35 }
36 }
37}
38
39impl BatchConfig {
40 pub fn with_batch_size(mut self, size: usize) -> Self {
42 self.batch_size = size;
43 self
44 }
45
46 pub fn with_workers(mut self, workers: usize) -> Self {
48 self.num_workers = workers;
49 self
50 }
51
52 pub fn with_streaming(mut self, enabled: bool) -> Self {
54 self.streaming_mode = enabled;
55 self
56 }
57}
58
/// Iterates over a vector in fixed-size batches, yielding owned `Vec<T>`s.
pub struct BatchIterator<T> {
    /// Remaining items; batches are drained from the front.
    items: Vec<T>,
    /// Items per batch; always >= 1 (enforced by [`BatchIterator::new`]).
    batch_size: usize,
}

impl<T> BatchIterator<T> {
    /// Creates a new batch iterator over `items`.
    ///
    /// A `batch_size` of 0 is clamped to 1: the previous implementation
    /// would panic in `batch_count` (division by zero) and yield an
    /// endless stream of empty batches from `next`.
    pub fn new(items: Vec<T>, batch_size: usize) -> Self {
        Self {
            items,
            batch_size: batch_size.max(1),
        }
    }

    /// Number of batches remaining (ceiling division).
    pub fn batch_count(&self) -> usize {
        self.items.len().div_ceil(self.batch_size)
    }
}

impl<T> Iterator for BatchIterator<T> {
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.items.is_empty() {
            return None;
        }

        // The final batch may be shorter than `batch_size`.
        let end = self.batch_size.min(self.items.len());
        Some(self.items.drain(..end).collect())
    }
}
97
98pub struct EntityPool<T: LegalEntity + Clone> {
100 pool: Vec<T>,
101 max_size: usize,
102}
103
104impl<T: LegalEntity + Clone> EntityPool<T> {
105 pub fn new(max_size: usize) -> Self {
107 Self {
108 pool: Vec::with_capacity(max_size),
109 max_size,
110 }
111 }
112
113 pub fn acquire(&mut self, create_fn: impl FnOnce() -> T) -> T {
115 self.pool.pop().unwrap_or_else(create_fn)
116 }
117
118 pub fn release(&mut self, entity: T) {
120 if self.pool.len() < self.max_size {
121 self.pool.push(entity);
122 }
123 }
124
125 pub fn size(&self) -> usize {
127 self.pool.len()
128 }
129
130 pub fn clear(&mut self) {
132 self.pool.clear();
133 }
134}
135
136pub struct StreamingProcessor {
138 buffer_size: usize,
139}
140
141impl StreamingProcessor {
142 pub fn new(buffer_size: usize) -> Self {
144 Self { buffer_size }
145 }
146
147 pub fn process<F, R>(&self, entities: Vec<Arc<dyn LegalEntity>>, mut processor: F) -> Vec<R>
149 where
150 F: FnMut(&dyn LegalEntity) -> R,
151 {
152 let mut results = Vec::with_capacity(entities.len());
153
154 for chunk in entities.chunks(self.buffer_size) {
155 for entity in chunk {
156 results.push(processor(entity.as_ref()));
157 }
158 }
159
160 results
161 }
162
163 pub fn process_parallel<F, R>(
165 &self,
166 entities: Vec<Arc<dyn LegalEntity>>,
167 processor: F,
168 ) -> Vec<R>
169 where
170 F: Fn(&dyn LegalEntity) -> R + Send + Sync,
171 R: Send,
172 {
173 use std::sync::Mutex;
174
175 let results = Mutex::new(Vec::with_capacity(entities.len()));
176
177 for chunk in entities.chunks(self.buffer_size) {
179 let chunk_results: Vec<R> = chunk
180 .iter()
181 .map(|entity| processor(entity.as_ref()))
182 .collect();
183
184 results.lock().unwrap().extend(chunk_results);
185 }
186
187 results.into_inner().unwrap()
188 }
189}
190
/// A string-keyed memoization cache with a dirty flag for change tracking.
pub struct LazyAttributeCache {
    /// Memoized key → value pairs.
    cache: std::collections::HashMap<String, String>,
    /// True if any value has been computed since the last `mark_clean`.
    dirty: bool,
}

impl LazyAttributeCache {
    /// Creates an empty, clean cache.
    pub fn new() -> Self {
        Self {
            cache: std::collections::HashMap::new(),
            dirty: false,
        }
    }

    /// Returns the cached value for `key`, computing and storing it with
    /// `compute_fn` on first request. A miss marks the cache dirty; a hit
    /// leaves the dirty flag untouched.
    pub fn get_or_compute<F>(&mut self, key: &str, compute_fn: F) -> String
    where
        F: FnOnce() -> String,
    {
        // Single lookup on the hit path; the previous implementation called
        // `contains_key` and then `get`, hashing the key twice per hit.
        if let Some(value) = self.cache.get(key) {
            return value.clone();
        }

        let value = compute_fn();
        self.cache.insert(key.to_string(), value.clone());
        self.dirty = true;
        value
    }

    /// True if a value has been computed since the last `mark_clean`.
    pub fn is_dirty(&self) -> bool {
        self.dirty
    }

    /// Resets the dirty flag without touching cached values.
    pub fn mark_clean(&mut self) {
        self.dirty = false;
    }

    /// Removes all cached values and resets the dirty flag.
    pub fn clear(&mut self) {
        self.cache.clear();
        self.dirty = false;
    }

    /// Number of cached entries.
    pub fn size(&self) -> usize {
        self.cache.len()
    }
}

impl Default for LazyAttributeCache {
    fn default() -> Self {
        Self::new()
    }
}
248
/// Splits work across a fixed number of workers as evenly as possible.
pub struct WorkScheduler {
    /// Worker count; always >= 1 (enforced by [`WorkScheduler::new`]).
    num_workers: usize,
    /// Work-stealing toggle.
    /// NOTE(review): stored and settable but not consulted by any code in
    /// this file — confirm its consumer before relying on it.
    work_stealing_enabled: bool,
}

impl WorkScheduler {
    /// Creates a scheduler for `num_workers` workers with work stealing on.
    ///
    /// A worker count of 0 is clamped to 1: the previous implementation
    /// divided by zero in `distribute_work` and `optimal_batch_size`.
    pub fn new(num_workers: usize) -> Self {
        Self {
            num_workers: num_workers.max(1),
            work_stealing_enabled: true,
        }
    }

    /// Builder-style toggle for work stealing.
    pub fn with_work_stealing(mut self, enabled: bool) -> Self {
        self.work_stealing_enabled = enabled;
        self
    }

    /// Splits `items` into exactly `num_workers` chunks. The first
    /// `len % num_workers` chunks receive one extra item, so chunk sizes
    /// differ by at most one and every item is assigned exactly once.
    pub fn distribute_work<T>(&self, items: Vec<T>) -> Vec<Vec<T>> {
        let total_items = items.len();
        if total_items == 0 {
            return (0..self.num_workers).map(|_| Vec::new()).collect();
        }

        let base_chunk_size = total_items / self.num_workers;
        let remainder = total_items % self.num_workers;

        let mut distributed: Vec<Vec<T>> = Vec::with_capacity(self.num_workers);
        let mut items_iter = items.into_iter();

        for worker_id in 0..self.num_workers {
            // Workers with id < remainder take one extra item each.
            let chunk_size = base_chunk_size + usize::from(worker_id < remainder);
            distributed.push(items_iter.by_ref().take(chunk_size).collect());
        }

        distributed
    }

    /// Per-worker batch size for `total_items`, clamped to [100, 10_000].
    pub fn optimal_batch_size(&self, total_items: usize) -> usize {
        const MIN_BATCH: usize = 100;
        const MAX_BATCH: usize = 10_000;

        (total_items / self.num_workers).clamp(MIN_BATCH, MAX_BATCH)
    }

    /// The (clamped) worker count.
    pub fn num_workers(&self) -> usize {
        self.num_workers
    }
}
312
313impl Default for WorkScheduler {
314 fn default() -> Self {
315 Self::new(num_cpus::get())
316 }
317}
318
/// Serializable descriptive header for a persisted population.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PopulationMetadata {
    /// Number of entities in the population.
    pub entity_count: usize,
    /// On-disk format version; `new` currently writes 1.
    pub format_version: u32,
    /// Creation timestamp (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// Free-form key/value annotations.
    pub metadata: std::collections::HashMap<String, String>,
}
331
332impl PopulationMetadata {
333 pub fn new(entity_count: usize) -> Self {
335 Self {
336 entity_count,
337 format_version: 1,
338 created_at: chrono::Utc::now(),
339 metadata: std::collections::HashMap::new(),
340 }
341 }
342
343 pub fn with_metadata(mut self, key: String, value: String) -> Self {
345 self.metadata.insert(key, value);
346 self
347 }
348}
349
350pub struct MemoryMappedPopulation {
352 metadata: PopulationMetadata,
353 #[allow(dead_code)]
354 file_path: std::path::PathBuf,
355}
356
357impl MemoryMappedPopulation {
358 pub fn create(
360 path: impl Into<std::path::PathBuf>,
361 metadata: PopulationMetadata,
362 ) -> std::io::Result<Self> {
363 let file_path = path.into();
364
365 let metadata_json = serde_json::to_string_pretty(&metadata)?;
367 std::fs::write(&file_path, metadata_json)?;
368
369 Ok(Self {
370 metadata,
371 file_path,
372 })
373 }
374
375 pub fn open(path: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
377 let file_path = path.into();
378
379 let metadata_json = std::fs::read_to_string(&file_path)?;
381 let metadata: PopulationMetadata = serde_json::from_str(&metadata_json)?;
382
383 Ok(Self {
384 metadata,
385 file_path,
386 })
387 }
388
389 pub fn metadata(&self) -> &PopulationMetadata {
391 &self.metadata
392 }
393
394 pub fn entity_count(&self) -> usize {
396 self.metadata.entity_count
397 }
398}
399
400pub struct ParallelExecutor {
402 scheduler: WorkScheduler,
403}
404
405impl ParallelExecutor {
406 pub fn new(num_workers: usize) -> Self {
408 Self {
409 scheduler: WorkScheduler::new(num_workers),
410 }
411 }
412
413 pub fn execute<T, F, R>(&self, items: Vec<T>, worker_fn: F) -> Vec<R>
415 where
416 T: Send,
417 F: Fn(T) -> R + Send + Sync + Clone,
418 R: Send,
419 {
420 use std::sync::Mutex;
421
422 let distributed = self.scheduler.distribute_work(items);
423 let results = Mutex::new(Vec::new());
424
425 std::thread::scope(|s| {
426 let handles: Vec<_> = distributed
427 .into_iter()
428 .map(|chunk| {
429 let worker_fn = worker_fn.clone();
430 let results_ref = &results;
431
432 s.spawn(move || {
433 let chunk_results: Vec<R> = chunk.into_iter().map(worker_fn).collect();
434 results_ref.lock().unwrap().extend(chunk_results);
435 })
436 })
437 .collect();
438
439 for handle in handles {
440 handle.join().unwrap();
441 }
442 });
443
444 results.into_inner().unwrap()
445 }
446
447 pub fn scheduler(&self) -> &WorkScheduler {
449 &self.scheduler
450 }
451}
452
453impl Default for ParallelExecutor {
454 fn default() -> Self {
455 Self::new(num_cpus::get())
456 }
457}
458
/// Stateless helpers that accelerate bulk `f64` statistics with 4-lane SIMD
/// (`wide::f64x4`). Each method processes the input in exact chunks of four
/// values and handles the `len % 4` tail with scalar code.
pub struct SimdBatchProcessor;

impl SimdBatchProcessor {
    /// Sums `values` with a 4-lane SIMD accumulator plus a scalar tail.
    ///
    /// The lane-wise accumulation order differs from a plain left-to-right
    /// scalar sum, so the result can differ by ordinary floating-point
    /// rounding.
    pub fn sum_f64(values: &[f64]) -> f64 {
        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        let mut sum_vec = f64x4::splat(0.0);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            sum_vec += vec;
        }

        // Horizontal add of the four per-lane partial sums.
        let sum: f64 = sum_vec.reduce_add();

        sum + remainder.iter().sum::<f64>()
    }

    /// Arithmetic mean of `values`; `None` for an empty slice.
    pub fn mean_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }
        Some(Self::sum_f64(values) / values.len() as f64)
    }

    /// Population variance of `values` (divides by `len`, not `len - 1`);
    /// `None` for an empty slice.
    pub fn variance_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let mean = Self::mean_f64(values)?;
        let mean_vec = f64x4::splat(mean);

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Accumulate squared deviations from the mean, four lanes at a time.
        let mut sq_diff_sum = f64x4::splat(0.0);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let diff = vec - mean_vec;
            sq_diff_sum += diff * diff;
        }

        let mut sum: f64 = sq_diff_sum.reduce_add();

        // Scalar tail for the last len % 4 values.
        for &value in remainder {
            let diff = value - mean;
            sum += diff * diff;
        }

        Some(sum / values.len() as f64)
    }

    /// Population standard deviation (square root of [`Self::variance_f64`]);
    /// `None` for an empty slice.
    pub fn std_dev_f64(values: &[f64]) -> Option<f64> {
        Self::variance_f64(values).map(|v| v.sqrt())
    }

    /// Minimum of `values`; `None` for an empty slice.
    ///
    /// NOTE(review): lane-wise `min` with NaN inputs follows the SIMD
    /// lane-min semantics, which may not propagate NaN — confirm whether
    /// NaN inputs are possible for callers.
    pub fn min_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        // Start at +inf so unused lanes never win the comparison.
        let mut min_vec = f64x4::splat(f64::INFINITY);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            min_vec = min_vec.min(vec);
        }

        // Horizontal reduction across the four lanes.
        let arr = min_vec.to_array();
        let mut min = arr[0].min(arr[1]).min(arr[2]).min(arr[3]);

        for &value in remainder {
            if value < min {
                min = value;
            }
        }

        Some(min)
    }

    /// Maximum of `values`; `None` for an empty slice.
    ///
    /// Mirror image of [`Self::min_f64`], seeded with -inf.
    pub fn max_f64(values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let chunks = values.chunks_exact(4);
        let remainder = chunks.remainder();

        let mut max_vec = f64x4::splat(f64::NEG_INFINITY);
        for chunk in chunks {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            max_vec = max_vec.max(vec);
        }

        let arr = max_vec.to_array();
        let mut max = arr[0].max(arr[1]).max(arr[2]).max(arr[3]);

        for &value in remainder {
            if value > max {
                max = value;
            }
        }

        Some(max)
    }

    /// Multiplies every element of `values` by `scalar`, in place.
    pub fn scale_f64(values: &mut [f64], scalar: f64) {
        let scalar_vec = f64x4::splat(scalar);

        // Split into a SIMD-friendly prefix (length divisible by 4) and a
        // scalar tail.
        let (chunks, remainder) = values.split_at_mut(values.len() - values.len() % 4);

        for chunk in chunks.chunks_exact_mut(4) {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let result = vec * scalar_vec;
            let arr = result.to_array();
            chunk.copy_from_slice(&arr);
        }

        for value in remainder {
            *value *= scalar;
        }
    }

    /// Dot product of `a` and `b`; `None` if the lengths differ.
    pub fn dot_product_f64(a: &[f64], b: &[f64]) -> Option<f64> {
        if a.len() != b.len() {
            return None;
        }

        let chunks_a = a.chunks_exact(4);
        let chunks_b = b.chunks_exact(4);
        let remainder_a = chunks_a.remainder();
        let remainder_b = chunks_b.remainder();

        // Multiply-accumulate four lanes at a time.
        let mut dot_vec = f64x4::splat(0.0);
        for (chunk_a, chunk_b) in chunks_a.zip(chunks_b) {
            let vec_a = f64x4::from([chunk_a[0], chunk_a[1], chunk_a[2], chunk_a[3]]);
            let vec_b = f64x4::from([chunk_b[0], chunk_b[1], chunk_b[2], chunk_b[3]]);
            dot_vec += vec_a * vec_b;
        }

        let mut dot: f64 = dot_vec.reduce_add();

        for (val_a, val_b) in remainder_a.iter().zip(remainder_b) {
            dot += val_a * val_b;
        }

        Some(dot)
    }

    /// Rescales `values` in place to the [0, 1] range via min-max
    /// normalization.
    ///
    /// Returns `None` (leaving `values` untouched) if the slice is empty or
    /// if all values are (nearly) equal, since the range would be ~0 and
    /// the division undefined.
    pub fn normalize_f64(values: &mut [f64]) -> Option<()> {
        let min = Self::min_f64(values)?;
        let max = Self::max_f64(values)?;

        // Degenerate (constant) input: bail out before dividing by ~0.
        if (max - min).abs() < f64::EPSILON {
            return None; }

        let range = max - min;
        let min_vec = f64x4::splat(min);
        let range_vec = f64x4::splat(range);

        let (chunks, remainder) = values.split_at_mut(values.len() - values.len() % 4);

        for chunk in chunks.chunks_exact_mut(4) {
            let vec = f64x4::from([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let result = (vec - min_vec) / range_vec;
            let arr = result.to_array();
            chunk.copy_from_slice(&arr);
        }

        for value in remainder {
            *value = (*value - min) / range;
        }

        Some(())
    }
}
709
/// Configuration for one node in a distributed processing cluster.
#[derive(Debug, Clone)]
pub struct DistributedConfig {
    /// Identifier of this node; `partition_range` parses it as a numeric
    /// index.
    pub node_id: String,
    /// Total number of nodes in the cluster.
    pub num_nodes: usize,
    /// Address of the coordinator node.
    pub coordinator_addr: String,
    /// Port this node communicates on.
    pub port: u16,
}

impl DistributedConfig {
    /// Creates a new node configuration.
    pub fn new(node_id: String, num_nodes: usize, coordinator_addr: String, port: u16) -> Self {
        Self {
            node_id,
            num_nodes,
            coordinator_addr,
            port,
        }
    }

    /// Returns this node's half-open `(start, end)` slice of `total_items`.
    ///
    /// Items are split as evenly as possible; the first
    /// `total_items % num_nodes` nodes each take one extra item. The node
    /// index is parsed from `node_id`; a non-numeric id silently falls back
    /// to index 0 (pre-existing behavior, kept for compatibility).
    ///
    /// A `num_nodes` of 0 is treated as 1 — the previous implementation
    /// divided by zero.
    pub fn partition_range(&self, total_items: usize) -> (usize, usize) {
        // Guard against a zero-node cluster configuration.
        let num_nodes = self.num_nodes.max(1);
        let node_index = self.node_id.parse::<usize>().unwrap_or(0);

        let items_per_node = total_items / num_nodes;
        let remainder = total_items % num_nodes;

        // Each earlier node with an extra item shifts this node's start
        // one position to the right.
        let start = node_index * items_per_node + node_index.min(remainder);
        let extra = usize::from(node_index < remainder);

        (start, start + items_per_node + extra)
    }
}
769
770pub struct DistributedNode {
772 config: DistributedConfig,
773 #[allow(dead_code)]
774 runtime: tokio::runtime::Runtime,
775}
776
777impl DistributedNode {
778 pub fn new(config: DistributedConfig) -> Self {
780 let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
781
782 Self { config, runtime }
783 }
784
785 pub fn config(&self) -> &DistributedConfig {
787 &self.config
788 }
789
790 pub fn partition_work<T>(&self, items: Vec<T>) -> Vec<T> {
792 let total = items.len();
793 let (start, end) = self.config.partition_range(total);
794
795 items.into_iter().skip(start).take(end - start).collect()
796 }
797
798 pub fn execute_local<T, F, R>(&self, items: Vec<T>, worker_fn: F) -> Vec<R>
800 where
801 T: Send,
802 F: Fn(T) -> R + Send + Sync + Clone,
803 R: Send,
804 {
805 let executor = ParallelExecutor::default();
806 executor.execute(items, worker_fn)
807 }
808}
809
/// Coordinates work distribution across a fixed set of named nodes.
pub struct DistributedCoordinator {
    /// Number of nodes in the cluster.
    num_nodes: usize,
    /// Generated node names: `node-0` .. `node-{n-1}`.
    nodes: Vec<String>,
}

impl DistributedCoordinator {
    /// Creates a coordinator for `num_nodes` nodes named `node-<i>`.
    pub fn new(num_nodes: usize) -> Self {
        Self {
            num_nodes,
            nodes: (0..num_nodes).map(|i| format!("node-{}", i)).collect(),
        }
    }

    /// Number of nodes managed by this coordinator.
    pub fn num_nodes(&self) -> usize {
        self.num_nodes
    }

    /// Names of the managed nodes.
    pub fn nodes(&self) -> &[String] {
        &self.nodes
    }

    /// Splits `items` into one chunk per node; chunk sizes differ by at
    /// most one (the first `len % num_nodes` nodes get the extra items).
    ///
    /// With zero nodes this returns an empty vector, dropping `items` —
    /// the previous implementation divided by zero.
    pub fn distribute_work<T>(&self, items: Vec<T>) -> Vec<Vec<T>> {
        if self.num_nodes == 0 {
            return Vec::new();
        }

        let total_items = items.len();
        let items_per_node = total_items / self.num_nodes;
        let remainder = total_items % self.num_nodes;

        let mut distributed: Vec<Vec<T>> = Vec::with_capacity(self.num_nodes);
        let mut items_iter = items.into_iter();

        for node_id in 0..self.num_nodes {
            // The first `remainder` nodes take one extra item each.
            let node_size = items_per_node + usize::from(node_id < remainder);
            distributed.push(items_iter.by_ref().take(node_size).collect());
        }

        distributed
    }

    /// Concatenates per-node result vectors in node order.
    pub fn aggregate_results<T>(&self, node_results: Vec<Vec<T>>) -> Vec<T> {
        node_results.into_iter().flatten().collect()
    }
}
882
// Unit tests. Note: several tests sort their results before asserting,
// because `ParallelExecutor::execute` does not guarantee output order.
#[cfg(test)]
mod tests {
    use super::*;
    use legalis_core::BasicEntity;

    // 100 items / batch size 25 → exactly four full batches, then exhaustion.
    #[test]
    fn test_batch_iterator() {
        let items: Vec<i32> = (0..100).collect();
        let mut batch_iter = BatchIterator::new(items, 25);

        assert_eq!(batch_iter.batch_count(), 4);

        let batch1 = batch_iter.next().unwrap();
        assert_eq!(batch1.len(), 25);

        let batch2 = batch_iter.next().unwrap();
        assert_eq!(batch2.len(), 25);

        let batch3 = batch_iter.next().unwrap();
        assert_eq!(batch3.len(), 25);

        let batch4 = batch_iter.next().unwrap();
        assert_eq!(batch4.len(), 25);

        assert!(batch_iter.next().is_none());
    }

    // Builder methods override each default independently.
    #[test]
    fn test_batch_config() {
        let config = BatchConfig::default()
            .with_batch_size(500)
            .with_workers(4)
            .with_streaming(true);

        assert_eq!(config.batch_size, 500);
        assert_eq!(config.num_workers, 4);
        assert!(config.streaming_mode);
    }

    // A released entity must be handed back (same id) on the next acquire.
    #[test]
    fn test_entity_pool() {
        let mut pool = EntityPool::new(10);

        let entity1 = pool.acquire(BasicEntity::new);
        let id1 = entity1.id();

        pool.release(entity1);
        assert_eq!(pool.size(), 1);

        let entity2 = pool.acquire(BasicEntity::new);
        assert_eq!(entity2.id(), id1); pool.clear();
        assert_eq!(pool.size(), 0);
    }

    // 50 entities in chunks of 10 → one result per entity, input order.
    #[test]
    fn test_streaming_processor() {
        let processor = StreamingProcessor::new(10);

        let entities: Vec<Arc<dyn LegalEntity>> = (0..50)
            .map(|_| Arc::new(BasicEntity::new()) as Arc<dyn LegalEntity>)
            .collect();

        let results = processor.process(entities, |entity| entity.id());
        assert_eq!(results.len(), 50);
    }

    // A cache hit must return the memoized value without recomputing or
    // re-dirtying the cache.
    #[test]
    fn test_lazy_attribute_cache() {
        let mut cache = LazyAttributeCache::new();

        let value1 = cache.get_or_compute("test", || "computed".to_string());
        assert_eq!(value1, "computed");
        assert!(cache.is_dirty());
        assert_eq!(cache.size(), 1);

        cache.mark_clean();
        assert!(!cache.is_dirty());

        let value2 = cache.get_or_compute("test", || "should_not_compute".to_string());
        assert_eq!(value2, "computed"); assert!(!cache.is_dirty());

        cache.clear();
        assert_eq!(cache.size(), 0);
    }

    // 100 items over 4 workers divide evenly into 25 each.
    #[test]
    fn test_work_scheduler_distribution() {
        let scheduler = WorkScheduler::new(4);
        let items: Vec<i32> = (0..100).collect();

        let distributed = scheduler.distribute_work(items);

        assert_eq!(distributed.len(), 4);

        for chunk in &distributed {
            assert_eq!(chunk.len(), 25);
        }

        let total: usize = distributed.iter().map(|c| c.len()).sum();
        assert_eq!(total, 100);
    }

    // 10 items over 3 workers: the remainder goes to the earliest workers.
    #[test]
    fn test_work_scheduler_uneven_distribution() {
        let scheduler = WorkScheduler::new(3);
        let items: Vec<i32> = (0..10).collect();

        let distributed = scheduler.distribute_work(items);

        assert_eq!(distributed.len(), 3);

        assert_eq!(distributed[0].len(), 4);
        assert_eq!(distributed[1].len(), 3);
        assert_eq!(distributed[2].len(), 3);

        let total: usize = distributed.iter().map(|c| c.len()).sum();
        assert_eq!(total, 10);
    }

    // Batch size is items/workers clamped to the [100, 10_000] window.
    #[test]
    fn test_work_scheduler_optimal_batch_size() {
        let scheduler = WorkScheduler::new(4);

        assert_eq!(scheduler.optimal_batch_size(200), 100);

        assert_eq!(scheduler.optimal_batch_size(40_000), 10_000);

        assert_eq!(scheduler.optimal_batch_size(1_000_000), 10_000);
    }

    // Output order is not guaranteed, so results are sorted before checking.
    #[test]
    fn test_parallel_executor() {
        let executor = ParallelExecutor::new(4);
        let items: Vec<i32> = (0..100).collect();

        let results = executor.execute(items, |x| x * 2);

        assert_eq!(results.len(), 100);
        let mut sorted_results = results;
        sorted_results.sort();

        for (i, &value) in sorted_results.iter().enumerate() {
            assert_eq!(value, (i as i32) * 2);
        }
    }

    #[test]
    fn test_memory_mapped_metadata() {
        let metadata =
            PopulationMetadata::new(1000).with_metadata("source".to_string(), "test".to_string());

        assert_eq!(metadata.entity_count, 1000);
        assert_eq!(metadata.format_version, 1);
        assert_eq!(metadata.metadata.get("source").unwrap(), "test");
    }

    // Round-trips metadata through a real temp file; cleans up afterwards.
    #[test]
    fn test_memory_mapped_population_create_and_open() {
        let temp_dir = std::env::temp_dir();
        let file_path = temp_dir.join("test_population.json");

        let _ = std::fs::remove_file(&file_path);

        let metadata = PopulationMetadata::new(500);
        let pop = MemoryMappedPopulation::create(&file_path, metadata).unwrap();
        assert_eq!(pop.entity_count(), 500);

        let pop_opened = MemoryMappedPopulation::open(&file_path).unwrap();
        assert_eq!(pop_opened.entity_count(), 500);
        assert_eq!(pop_opened.metadata().format_version, 1);

        std::fs::remove_file(&file_path).unwrap();
    }

    // SIMD sum may differ from scalar summation only by rounding error.
    #[test]
    fn test_simd_sum() {
        let values: Vec<f64> = (1..=100).map(|x| x as f64).collect();
        let sum = SimdBatchProcessor::sum_f64(&values);
        let expected: f64 = (1..=100).sum::<i32>() as f64;
        assert!((sum - expected).abs() < 1e-10);
    }

    #[test]
    fn test_simd_mean() {
        let values: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        let mean = SimdBatchProcessor::mean_f64(&values).unwrap();
        assert!((mean - 3.0).abs() < 1e-10);
    }

    // Population variance (divides by N): this classic dataset gives 4.0.
    #[test]
    fn test_simd_variance() {
        let values: Vec<f64> = vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
        let variance = SimdBatchProcessor::variance_f64(&values).unwrap();
        assert!((variance - 4.0).abs() < 1e-10);
    }

    #[test]
    fn test_simd_std_dev() {
        let values: Vec<f64> = vec![2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];
        let std_dev = SimdBatchProcessor::std_dev_f64(&values).unwrap();
        assert!((std_dev - 2.0).abs() < 1e-10);
    }

    // Six values: exercises both the SIMD chunk and the scalar remainder.
    #[test]
    fn test_simd_min_max() {
        let values: Vec<f64> = vec![5.0, 2.0, 8.0, 1.0, 9.0, 3.0];
        let min = SimdBatchProcessor::min_f64(&values).unwrap();
        let max = SimdBatchProcessor::max_f64(&values).unwrap();
        assert_eq!(min, 1.0);
        assert_eq!(max, 9.0);
    }

    #[test]
    fn test_simd_scale() {
        let mut values: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        SimdBatchProcessor::scale_f64(&mut values, 2.0);
        assert_eq!(values, vec![2.0, 4.0, 6.0, 8.0, 10.0]);
    }

    // 1*5 + 2*6 + 3*7 + 4*8 = 70.
    #[test]
    fn test_simd_dot_product() {
        let a: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0];
        let b: Vec<f64> = vec![5.0, 6.0, 7.0, 8.0];
        let dot = SimdBatchProcessor::dot_product_f64(&a, &b).unwrap();
        assert_eq!(dot, 70.0);
    }

    // Min-max normalization maps [0, 5, 10] onto [0, 0.5, 1].
    #[test]
    fn test_simd_normalize() {
        let mut values: Vec<f64> = vec![0.0, 5.0, 10.0];
        SimdBatchProcessor::normalize_f64(&mut values).unwrap();
        assert_eq!(values[0], 0.0);
        assert_eq!(values[1], 0.5);
        assert_eq!(values[2], 1.0);
    }

    // Looser tolerance: SIMD lane-order summation accumulates rounding.
    #[test]
    fn test_simd_large_dataset() {
        let values: Vec<f64> = (1..=10000).map(|x| x as f64).collect();
        let sum = SimdBatchProcessor::sum_f64(&values);
        let expected: f64 = (1..=10000).sum::<i32>() as f64;
        assert!((sum - expected).abs() < 1e-6);
    }

    #[test]
    fn test_distributed_config() {
        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);

        assert_eq!(config.node_id, "0");
        assert_eq!(config.num_nodes, 4);
        assert_eq!(config.coordinator_addr, "localhost");
        assert_eq!(config.port, 8080);
    }

    // Node 0 of 4 owns the first quarter of 100 items.
    #[test]
    fn test_distributed_partition_range() {
        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);

        let (start, end) = config.partition_range(100);
        assert_eq!(start, 0);
        assert_eq!(end, 25);
    }

    #[test]
    fn test_distributed_node() {
        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);

        let node = DistributedNode::new(config);
        assert_eq!(node.config().node_id, "0");
        assert_eq!(node.config().num_nodes, 4);
    }

    // partition_work keeps only this node's slice: items 0..25 for node 0.
    #[test]
    fn test_distributed_partition_work() {
        let config = DistributedConfig::new("0".to_string(), 4, "localhost".to_string(), 8080);

        let node = DistributedNode::new(config);
        let items: Vec<i32> = (0..100).collect();
        let partitioned = node.partition_work(items);

        assert_eq!(partitioned.len(), 25);
        assert_eq!(partitioned[0], 0);
        assert_eq!(partitioned[24], 24);
    }

    // Sorted before asserting: local execution order is not guaranteed.
    #[test]
    fn test_distributed_execute_local() {
        let config = DistributedConfig::new("0".to_string(), 2, "localhost".to_string(), 8080);

        let node = DistributedNode::new(config);
        let items: Vec<i32> = vec![1, 2, 3, 4, 5];
        let results = node.execute_local(items, |x| x * 2);

        assert_eq!(results.len(), 5);
        let mut sorted = results;
        sorted.sort();
        assert_eq!(sorted, vec![2, 4, 6, 8, 10]);
    }

    #[test]
    fn test_distributed_coordinator() {
        let coordinator = DistributedCoordinator::new(3);
        assert_eq!(coordinator.num_nodes(), 3);
        assert_eq!(coordinator.nodes().len(), 3);
        assert_eq!(coordinator.nodes()[0], "node-0");
        assert_eq!(coordinator.nodes()[1], "node-1");
        assert_eq!(coordinator.nodes()[2], "node-2");
    }

    // 10 items over 3 nodes: the remainder item goes to the first node.
    #[test]
    fn test_distributed_distribute_work() {
        let coordinator = DistributedCoordinator::new(3);
        let items: Vec<i32> = (0..10).collect();
        let distributed = coordinator.distribute_work(items);

        assert_eq!(distributed.len(), 3);
        assert_eq!(distributed[0].len(), 4); assert_eq!(distributed[1].len(), 3);
        assert_eq!(distributed[2].len(), 3);

        let total: usize = distributed.iter().map(|v| v.len()).sum();
        assert_eq!(total, 10);
    }

    // Aggregation concatenates per-node results in node order.
    #[test]
    fn test_distributed_aggregate_results() {
        let coordinator = DistributedCoordinator::new(3);
        let node_results = vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8, 9]];

        let aggregated = coordinator.aggregate_results(node_results);
        assert_eq!(aggregated.len(), 9);
        assert_eq!(aggregated, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
}