use rand::{RngCore, SeedableRng};
use rand_xoshiro::Xoshiro256PlusPlus;
use rayon::prelude::*;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::constants::*;

#[cfg(feature = "numa")]
use crate::numa::NumaTopology;

#[cfg(feature = "numa")]
use hwlocality::{
    memory::binding::{MemoryBindingFlags, MemoryBindingPolicy},
    Topology,
};

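/// Backing storage for generated data: either a plain heap allocation (`Uma`) or,
/// with the `numa` feature, memory bound to a specific NUMA node via hwloc (`Numa`).
/// The NUMA variant keeps the owning `Topology` alive next to the allocation and
/// tracks the logical length separately so the buffer can be truncated.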
#[cfg(feature = "numa")]
pub enum DataBuffer {
    Uma(Vec<u8>),
    Numa((Topology, hwlocality::memory::binding::Bytes<'static>, usize)),
}

#[cfg(feature = "numa")]
impl DataBuffer {
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        match self {
            DataBuffer::Uma(vec) => vec.as_mut_slice(),
            DataBuffer::Numa((_, bytes, size)) => {
                // SAFETY: the hwloc allocation is at least `size` bytes long and
                // lives as long as `self` (the Topology is stored alongside it).
                // Use the tracked logical size so this stays consistent with
                // `as_slice` after `truncate`.
                unsafe {
                    std::slice::from_raw_parts_mut(bytes.as_mut_ptr() as *mut u8, *size)
                }
            }
        }
    }

    pub fn as_slice(&self) -> &[u8] {
        match self {
            DataBuffer::Uma(vec) => vec.as_slice(),
            DataBuffer::Numa((_, bytes, size)) => {
                // SAFETY: same invariants as `as_mut_slice` above.
                unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const u8, *size) }
            }
        }
    }

    pub fn as_ptr(&self) -> *const u8 {
        match self {
            DataBuffer::Uma(vec) => vec.as_ptr(),
            DataBuffer::Numa((_, bytes, _)) => bytes.as_ptr() as *const u8,
        }
    }

    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        match self {
            DataBuffer::Uma(vec) => vec.as_mut_ptr(),
            DataBuffer::Numa((_, bytes, _)) => bytes.as_mut_ptr() as *mut u8,
        }
    }

    pub fn len(&self) -> usize {
        match self {
            DataBuffer::Uma(vec) => vec.len(),
            DataBuffer::Numa((_, _, size)) => *size,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn truncate(&mut self, size: usize) {
        match self {
            DataBuffer::Uma(vec) => vec.truncate(size),
            DataBuffer::Numa((_, bytes, actual_size)) => {
                *actual_size = size.min(bytes.len());
            }
        }
    }

    pub fn into_bytes(self) -> bytes::Bytes {
        match self {
            DataBuffer::Uma(vec) => bytes::Bytes::from(vec),
            DataBuffer::Numa((_, hwloc_bytes, size)) => {
                // SAFETY: the allocation is at least `size` bytes; the data is
                // copied out before the topology and allocation are dropped.
                let slice =
                    unsafe { std::slice::from_raw_parts(hwloc_bytes.as_ptr() as *const u8, size) };
                bytes::Bytes::copy_from_slice(slice)
            }
        }
    }
}

#[cfg(not(feature = "numa"))]
pub enum DataBuffer {
    Uma(Vec<u8>),
}

#[cfg(not(feature = "numa"))]
impl DataBuffer {
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
        match self {
            DataBuffer::Uma(vec) => vec.as_mut_slice(),
        }
    }

    pub fn as_slice(&self) -> &[u8] {
        match self {
            DataBuffer::Uma(vec) => vec.as_slice(),
        }
    }

    pub fn as_ptr(&self) -> *const u8 {
        match self {
            DataBuffer::Uma(vec) => vec.as_ptr(),
        }
    }

    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        match self {
            DataBuffer::Uma(vec) => vec.as_mut_ptr(),
        }
    }

    pub fn len(&self) -> usize {
        match self {
            DataBuffer::Uma(vec) => vec.len(),
        }
    }

    pub fn truncate(&mut self, size: usize) {
        match self {
            DataBuffer::Uma(vec) => vec.truncate(size),
        }
    }
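
    // Added as a sketch to keep parity with the NUMA-enabled impl above, so callers
    // see the same API regardless of feature flags (assumes the `bytes` crate is
    // also available in non-NUMA builds).
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn into_bytes(self) -> bytes::Bytes {
        match self {
            DataBuffer::Uma(vec) => bytes::Bytes::from(vec),
        }
    }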
}

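/// Allocates `size` bytes bound to NUMA node `node_id` through hwloc and returns
/// the owning topology together with the allocation and its size.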
#[cfg(feature = "numa")]
fn allocate_numa_buffer(
    size: usize,
    node_id: usize,
) -> Result<(Topology, hwlocality::memory::binding::Bytes<'static>, usize), String> {
    use hwlocality::object::types::ObjectType;

    let topology =
        Topology::new().map_err(|e| format!("Failed to create hwloc topology: {}", e))?;

    let numa_nodes: Vec<_> = topology.objects_with_type(ObjectType::NUMANode).collect();

    if numa_nodes.is_empty() {
        return Err("No NUMA nodes found in topology".to_string());
    }

    let node = numa_nodes
        .iter()
        .find(|n| n.os_index() == Some(node_id))
        .ok_or_else(|| {
            format!(
                "NUMA node {} not found (available: {:?})",
                node_id,
                numa_nodes
                    .iter()
                    .filter_map(|n| n.os_index())
                    .collect::<Vec<_>>()
            )
        })?;

    let nodeset = node
        .nodeset()
        .ok_or_else(|| format!("NUMA node {} has no nodeset", node_id))?;

    tracing::debug!(
        "Allocating {} bytes on NUMA node {} with nodeset {:?}",
        size,
        node_id,
        nodeset
    );

    let bytes = topology
        .binding_allocate_memory(
            size,
            nodeset,
            MemoryBindingPolicy::Bind,
            MemoryBindingFlags::ASSUME_SINGLE_THREAD,
        )
        .map_err(|e| format!("Failed to allocate NUMA memory: {}", e))?;

    // SAFETY: the allocation borrows from `topology`, which is moved into the same
    // returned tuple, so it outlives the allocation for as long as the tuple is kept
    // together (the 'static lifetime is an over-approximation relied on by DataBuffer).
    let bytes_static = unsafe {
        std::mem::transmute::<
            hwlocality::memory::binding::Bytes<'_>,
            hwlocality::memory::binding::Bytes<'static>,
        >(bytes)
    };

    Ok((topology, bytes_static, size))
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum NumaMode {
    /// Enable NUMA optimizations only when more than one node is detected.
    #[default]
    Auto,
    /// Always apply NUMA optimizations.
    Force,
    /// Never apply NUMA optimizations.
    Disabled,
}

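/// Configuration shared by [`generate_data`] and the streaming [`DataGenerator`].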
#[derive(Debug, Clone)]
pub struct GeneratorConfig {
    /// Total number of bytes to generate.
    pub size: usize,
    /// Deduplication factor: roughly `dedup_factor` generated blocks share one unique block.
    pub dedup_factor: usize,
    /// Compression factor: the fraction of each block filled with zeros so the output
    /// compresses by roughly this ratio.
    pub compress_factor: usize,
    /// NUMA optimization mode.
    pub numa_mode: NumaMode,
    /// Maximum number of worker threads (defaults to the available CPUs).
    pub max_threads: Option<usize>,
    /// Bind the buffer allocation and worker threads to this NUMA node.
    pub numa_node: Option<usize>,
    /// Generation block size in bytes, clamped to 1-32 MiB (defaults to `BLOCK_SIZE`).
    pub block_size: Option<usize>,
    /// Seed for reproducible output; `None` derives fresh entropy per call.
    pub seed: Option<u64>,
}

impl Default for GeneratorConfig {
    fn default() -> Self {
        Self {
            size: BLOCK_SIZE,
            dedup_factor: 1,
            compress_factor: 1,
            numa_mode: NumaMode::Auto,
            max_threads: None,
            numa_node: None,
            block_size: None,
            seed: None,
        }
    }
}

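/// Convenience wrapper around [`generate_data`]: generates `size` bytes with the
/// given dedup and compression factors and default settings otherwise.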
pub fn generate_data_simple(size: usize, dedup: usize, compress: usize) -> DataBuffer {
    let config = GeneratorConfig {
        size,
        dedup_factor: dedup.max(1),
        compress_factor: compress.max(1),
        numa_mode: NumaMode::Auto,
        max_threads: None,
        numa_node: None,
        block_size: None,
        seed: None,
    };
    generate_data(config)
}

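/// Generates the whole buffer in memory using a rayon thread pool, optionally
/// NUMA-bound, then truncates it to the requested size.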
pub fn generate_data(config: GeneratorConfig) -> DataBuffer {
    let block_size = config
        .block_size
        .map(|bs| bs.clamp(1024 * 1024, 32 * 1024 * 1024))
        .unwrap_or(BLOCK_SIZE);

    tracing::info!(
        "Starting data generation: size={}, dedup={}, compress={}, block_size={}",
        config.size,
        config.dedup_factor,
        config.compress_factor,
        block_size
    );

    let size = config.size.max(block_size);
    let nblocks = size.div_ceil(block_size);

    let dedup_factor = config.dedup_factor.max(1);
    let unique_blocks = if dedup_factor > 1 {
        ((nblocks as f64) / (dedup_factor as f64)).round().max(1.0) as usize
    } else {
        nblocks
    };

    tracing::debug!(
        "Generating: size={}, blocks={}, dedup={}, unique_blocks={}, compress={}",
        size,
        nblocks,
        dedup_factor,
        unique_blocks,
        config.compress_factor
    );

    let (f_num, f_den) = if config.compress_factor > 1 {
        (config.compress_factor - 1, config.compress_factor)
    } else {
        (0, 1)
    };
    let floor_len = (f_num * block_size) / f_den;
    let rem = (f_num * block_size) % f_den;

    let copy_lens: Vec<usize> = {
        let mut v = Vec::with_capacity(unique_blocks);
        let mut err = 0;
        for _ in 0..unique_blocks {
            err += rem;
            if err >= f_den {
                err -= f_den;
                v.push(floor_len + 1);
            } else {
                v.push(floor_len);
            }
        }
        v
    };

    // Honor an explicit seed so output is reproducible, mirroring DataGenerator.
    let call_entropy = config.seed.unwrap_or_else(generate_call_entropy);

    let total_size = nblocks * block_size;
    tracing::debug!("Allocating {} bytes ({} blocks)", total_size, nblocks);

    #[cfg(feature = "numa")]
    let mut data_buffer = if let Some(node_id) = config.numa_node {
        tracing::info!("Attempting NUMA allocation on node {}", node_id);
        match allocate_numa_buffer(total_size, node_id) {
            Ok(buffer) => {
                tracing::info!(
                    "Successfully allocated {} bytes on NUMA node {}",
                    total_size,
                    node_id
                );
                DataBuffer::Numa(buffer)
            }
            Err(e) => {
                tracing::warn!("NUMA allocation failed: {}, falling back to UMA", e);
                DataBuffer::Uma(vec![0u8; total_size])
            }
        }
    } else {
        DataBuffer::Uma(vec![0u8; total_size])
    };

    #[cfg(not(feature = "numa"))]
    let mut data_buffer = DataBuffer::Uma(vec![0u8; total_size]);

    #[cfg(feature = "numa")]
    let numa_topology = if config.numa_mode != NumaMode::Disabled {
        NumaTopology::detect().ok()
    } else {
        None
    };

    #[cfg(feature = "numa")]
    let num_threads = if let Some(node_id) = config.numa_node {
        if let Some(ref topology) = numa_topology {
            if let Some(node) = topology.nodes.iter().find(|n| n.node_id == node_id) {
                let node_cores = node.cpus.len();
                let requested_threads = config.max_threads.unwrap_or(node_cores);
                let threads = requested_threads.min(node_cores);
                tracing::info!(
                    "Pinning to NUMA node {}: using {} threads ({} cores available)",
                    node_id,
                    threads,
                    node_cores
                );
                threads
            } else {
                tracing::warn!(
                    "NUMA node {} not found, using default thread count",
                    node_id
                );
                config.max_threads.unwrap_or_else(get_affinity_cpu_count)
            }
        } else {
            tracing::warn!("NUMA topology not available, falling back to CPU affinity mask");
            config.max_threads.unwrap_or_else(get_affinity_cpu_count)
        }
    } else {
        config.max_threads.unwrap_or_else(num_cpus::get)
    };

    #[cfg(not(feature = "numa"))]
    let num_threads = config.max_threads.unwrap_or_else(num_cpus::get);

    tracing::info!("Using {} threads for parallel generation", num_threads);

    #[cfg(feature = "numa")]
    let should_optimize_numa = if let Some(ref topology) = numa_topology {
        let optimize = match config.numa_mode {
            NumaMode::Auto => topology.num_nodes > 1,
            NumaMode::Force => true,
            NumaMode::Disabled => false,
        };

        if optimize {
            tracing::info!(
                "NUMA optimization enabled: {} nodes detected",
                topology.num_nodes
            );
        } else {
            tracing::debug!(
                "NUMA optimization not needed: {} nodes detected",
                topology.num_nodes
            );
        }
        optimize
    } else {
        false
    };

    #[cfg(not(feature = "numa"))]
    let should_optimize_numa = false;

    tracing::debug!("Starting parallel generation with rayon");

    #[cfg(all(feature = "numa", feature = "thread-pinning"))]
    let pool = if should_optimize_numa {
        if let Some(ref topology) = numa_topology {
            if topology.num_nodes > 1 {
                tracing::debug!(
                    "Configuring NUMA-aware thread pinning for {} nodes",
                    topology.num_nodes
                );

                let cpu_map = std::sync::Arc::new(build_cpu_affinity_map(
                    topology,
                    num_threads,
                    config.numa_node,
                ));

                rayon::ThreadPoolBuilder::new()
                    .num_threads(num_threads)
                    .spawn_handler(move |thread| {
                        let cpu_map = cpu_map.clone();
                        let mut b = std::thread::Builder::new();
                        if let Some(name) = thread.name() {
                            b = b.name(name.to_owned());
                        }
                        if let Some(stack_size) = thread.stack_size() {
                            b = b.stack_size(stack_size);
                        }

                        b.spawn(move || {
                            // Use the pool-assigned worker index; rayon's
                            // current_thread_index() is not set until thread.run()
                            // has started, so it cannot be used for pinning here.
                            let thread_id = thread.index();
                            if let Some(core_ids) = cpu_map.get(&thread_id) {
                                pin_thread_to_cores(core_ids);
                            }
                            thread.run()
                        })?;
                        Ok(())
                    })
                    .build()
                    .expect("Failed to create NUMA-aware thread pool")
            } else {
                tracing::debug!("Skipping thread pinning on UMA system (would add overhead)");
                rayon::ThreadPoolBuilder::new()
                    .num_threads(num_threads)
                    .build()
                    .expect("Failed to create thread pool")
            }
        } else {
            rayon::ThreadPoolBuilder::new()
                .num_threads(num_threads)
                .build()
                .expect("Failed to create thread pool")
        }
    } else {
        rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .expect("Failed to create thread pool")
    };

    #[cfg(not(all(feature = "numa", feature = "thread-pinning")))]
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .expect("Failed to create thread pool");

    // First-touch initialization: fault pages in from the worker pool so memory is
    // intended to end up local to the threads that will later fill it.
    #[cfg(feature = "numa")]
    if should_optimize_numa {
        if let Some(ref topology) = numa_topology {
            if topology.num_nodes > 1 {
                tracing::debug!(
                    "Performing first-touch memory initialization for {} NUMA nodes",
                    topology.num_nodes
                );
                pool.install(|| {
                    let data = data_buffer.as_mut_slice();
                    data.par_chunks_mut(block_size).for_each(|chunk| {
                        // Touch the first and last page of each block.
                        chunk[0] = 0;
                        if chunk.len() > 4096 {
                            chunk[chunk.len() - 1] = 0;
                        }
                    });
                });
            } else {
                tracing::trace!("Skipping first-touch on UMA system");
            }
        }
    }

    pool.install(|| {
        let data = data_buffer.as_mut_slice();
        data.par_chunks_mut(block_size)
            .enumerate()
            .for_each(|(i, chunk)| {
                let ub = i % unique_blocks;
                tracing::trace!("Filling block {} (unique block {})", i, ub);
                fill_block(
                    chunk,
                    ub,
                    copy_lens[ub].min(chunk.len()),
                    i as u64,
                    call_entropy,
                );
            });
    });

    tracing::debug!("Parallel generation complete, truncating to {} bytes", size);
    data_buffer.truncate(size);

    data_buffer
}

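/// Fills `out` with one block: the leading `out.len() - copy_len` bytes are an RNG
/// keystream (incompressible) and the trailing `copy_len` bytes are zeros
/// (compressible). The RNG is seeded from `seed_base + block_sequence`, so the same
/// entropy and sequence position always reproduce the same block.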
fn fill_block(
    out: &mut [u8],
    unique_block_idx: usize,
    copy_len: usize,
    block_sequence: u64,
    seed_base: u64,
) {
    tracing::trace!(
        "fill_block: idx={}, seq={}, copy_len={}, out_len={}",
        unique_block_idx,
        block_sequence,
        copy_len,
        out.len()
    );

    let seed = seed_base.wrapping_add(block_sequence);
    let mut rng = Xoshiro256PlusPlus::seed_from_u64(seed);

    if copy_len == 0 {
        tracing::trace!(
            "Filling {} bytes with RNG keystream (incompressible)",
            out.len()
        );
        rng.fill_bytes(out);
    } else {
        let incompressible_len = out.len().saturating_sub(copy_len);

        tracing::trace!(
            "Filling block: {} bytes random (incompressible) + {} bytes zeros (compressible)",
            incompressible_len,
            copy_len
        );

        if incompressible_len > 0 {
            rng.fill_bytes(&mut out[..incompressible_len]);
        }

        if copy_len > 0 && incompressible_len < out.len() {
            out[incompressible_len..].fill(0);
        }
    }

    tracing::trace!(
        "fill_block complete: {} compressible bytes (zeros)",
        copy_len
    );
}

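/// Derives per-call entropy from the current time combined with the thread-local
/// RNG, so unseeded calls produce different data each time.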
fn generate_call_entropy() -> u64 {
    let time_entropy = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;

    let urandom_entropy: u64 = {
        let mut rng = rand::rng();
        rng.next_u64()
    };

    time_entropy.wrapping_add(urandom_entropy)
}

#[cfg(all(feature = "numa", feature = "thread-pinning"))]
use std::collections::HashMap;

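/// Returns how many CPUs the process may run on. On Linux this parses the
/// `Cpus_allowed_list` affinity mask from `/proc/self/status`; otherwise (or on
/// failure) it falls back to `num_cpus::get()`.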
fn get_affinity_cpu_count() -> usize {
    #[cfg(target_os = "linux")]
    {
        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
            for line in status.lines() {
                if line.starts_with("Cpus_allowed_list:") {
                    if let Some(cpus) = line.split(':').nth(1) {
                        let cpus = cpus.trim();
                        let count = parse_cpu_list(cpus);
                        if count > 0 {
                            tracing::debug!("CPU affinity mask: {} CPUs ({})", count, cpus);
                            return count;
                        }
                    }
                }
            }
        }
    }

    num_cpus::get()
}

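/// Counts the CPUs in a Linux CPU list string such as `"0-3,8,10-11"`.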
#[cfg(target_os = "linux")]
fn parse_cpu_list(cpu_list: &str) -> usize {
    let mut count = 0;
    for range in cpu_list.split(',') {
        let range = range.trim();
        if range.is_empty() {
            continue;
        }

        if let Some((start, end)) = range.split_once('-') {
            if let (Ok(s), Ok(e)) = (start.parse::<usize>(), end.parse::<usize>()) {
                count += (e - s) + 1;
            }
        } else if range.parse::<usize>().is_ok() {
            count += 1;
        }
    }
    count
}

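/// Builds a map from rayon worker index to the core IDs it should be pinned to,
/// restricted to `numa_node` when given and otherwise spread across all nodes.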
#[cfg(all(feature = "numa", feature = "thread-pinning"))]
fn build_cpu_affinity_map(
    topology: &crate::numa::NumaTopology,
    num_threads: usize,
    numa_node: Option<usize>,
) -> HashMap<usize, Vec<usize>> {
    let mut map = HashMap::new();

    if let Some(target_node_id) = numa_node {
        if let Some(target_node) = topology.nodes.iter().find(|n| n.node_id == target_node_id) {
            tracing::info!(
                "Pinning {} threads to NUMA node {} ({} cores available)",
                num_threads,
                target_node_id,
                target_node.cpus.len()
            );

            for thread_id in 0..num_threads {
                let core_idx = thread_id % target_node.cpus.len();
                let core_id = target_node.cpus[core_idx];

                tracing::trace!(
                    "Thread {} -> NUMA node {} core {}",
                    thread_id,
                    target_node_id,
                    core_id
                );
                map.insert(thread_id, vec![core_id]);
            }
        } else {
            tracing::warn!(
                "NUMA node {} not found in topology (available: 0-{})",
                target_node_id,
                topology.num_nodes - 1
            );
        }
    } else {
        let mut thread_id = 0;
        let mut node_idx = 0;

        while thread_id < num_threads {
            if let Some(node) = topology.nodes.get(node_idx % topology.nodes.len()) {
                let cores_per_thread =
                    (node.cpus.len() as f64 / num_threads as f64).ceil() as usize;
                let cores_per_thread = cores_per_thread.max(1);

                let start_cpu = (thread_id * cores_per_thread) % node.cpus.len();
                let end_cpu = ((thread_id + 1) * cores_per_thread).min(node.cpus.len());

                let core_ids: Vec<usize> = node.cpus[start_cpu..end_cpu].to_vec();

                if !core_ids.is_empty() {
                    tracing::trace!(
                        "Thread {} -> NUMA node {} cores {:?}",
                        thread_id,
                        node.node_id,
                        &core_ids
                    );
                    map.insert(thread_id, core_ids);
                }
            }

            thread_id += 1;
            node_idx += 1;
        }
    }

    map
}

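/// Best-effort pinning of the current thread to the first core in `core_ids`;
/// failures are only logged.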
#[cfg(all(feature = "numa", feature = "thread-pinning"))]
fn pin_thread_to_cores(core_ids: &[usize]) {
    if let Some(&first_core) = core_ids.first() {
        if let Some(core_ids_all) = core_affinity::get_core_ids() {
            if first_core < core_ids_all.len() {
                let core_id = core_ids_all[first_core];
                if core_affinity::set_for_current(core_id) {
                    tracing::trace!("Pinned thread to core {}", first_core);
                } else {
                    tracing::debug!("Failed to pin thread to core {}", first_core);
                }
            }
        }
    }
}

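/// Streaming counterpart to [`generate_data`]: produces the same block pattern into
/// caller-provided buffers via [`DataGenerator::fill_chunk`], tracking its position
/// until `total_size` bytes have been emitted.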
pub struct DataGenerator {
    total_size: usize,
    current_pos: usize,
    #[allow(dead_code)]
    dedup_factor: usize,
    #[allow(dead_code)]
    compress_factor: usize,
    unique_blocks: usize,
    copy_lens: Vec<usize>,
    call_entropy: u64,
    block_sequence: u64,
    max_threads: usize,
    thread_pool: Option<rayon::ThreadPool>,
    block_size: usize,
}

impl DataGenerator {
    pub fn new(config: GeneratorConfig) -> Self {
        let block_size = config
            .block_size
            .map(|bs| bs.clamp(1024 * 1024, 32 * 1024 * 1024))
            .unwrap_or(BLOCK_SIZE);

        tracing::info!(
            "Creating DataGenerator: size={}, dedup={}, compress={}, block_size={}",
            config.size,
            config.dedup_factor,
            config.compress_factor,
            block_size
        );

        let total_size = config.size.max(block_size);
        let nblocks = total_size.div_ceil(block_size);

        let dedup_factor = config.dedup_factor.max(1);
        let unique_blocks = if dedup_factor > 1 {
            ((nblocks as f64) / (dedup_factor as f64)).round().max(1.0) as usize
        } else {
            nblocks
        };

        let (f_num, f_den) = if config.compress_factor > 1 {
            (config.compress_factor - 1, config.compress_factor)
        } else {
            (0, 1)
        };
        let floor_len = (f_num * block_size) / f_den;
        let rem = (f_num * block_size) % f_den;

        let copy_lens: Vec<usize> = {
            let mut v = Vec::with_capacity(unique_blocks);
            let mut err = 0;
            for _ in 0..unique_blocks {
                err += rem;
                if err >= f_den {
                    err -= f_den;
                    v.push(floor_len + 1);
                } else {
                    v.push(floor_len);
                }
            }
            v
        };

        let call_entropy = config.seed.unwrap_or_else(generate_call_entropy);

        let max_threads = config.max_threads.unwrap_or_else(num_cpus::get);

        let thread_pool = if max_threads > 1 {
            match rayon::ThreadPoolBuilder::new()
                .num_threads(max_threads)
                .build()
            {
                Ok(pool) => {
                    tracing::info!(
                        "DataGenerator configured with {} threads (thread pool created)",
                        max_threads
                    );
                    Some(pool)
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to create thread pool: {}, falling back to sequential",
                        e
                    );
                    None
                }
            }
        } else {
            tracing::info!("DataGenerator configured for single-threaded operation");
            None
        };

        Self {
            total_size,
            current_pos: 0,
            dedup_factor,
            compress_factor: config.compress_factor,
            unique_blocks,
            copy_lens,
            call_entropy,
            block_sequence: 0,
            max_threads,
            thread_pool,
            block_size,
        }
    }

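    /// Fills `buf` with the next part of the stream and returns the number of bytes
    /// written (0 once the generator is complete). Requests spanning multiple blocks
    /// take the parallel path when a thread pool is available.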
    pub fn fill_chunk(&mut self, buf: &mut [u8]) -> usize {
        tracing::trace!(
            "fill_chunk called: pos={}/{}, buf_len={}",
            self.current_pos,
            self.total_size,
            buf.len()
        );

        if self.current_pos >= self.total_size {
            tracing::trace!("fill_chunk: already complete");
            return 0;
        }

        let remaining = self.total_size - self.current_pos;
        let to_write = buf.len().min(remaining);
        let chunk = &mut buf[..to_write];

        let start_block = self.current_pos / self.block_size;
        let start_offset = self.current_pos % self.block_size;
        let end_pos = self.current_pos + to_write;
        let end_block = (end_pos - 1) / self.block_size;
        let num_blocks = end_block - start_block + 1;

        const PARALLEL_THRESHOLD: usize = 2;

        if num_blocks >= PARALLEL_THRESHOLD && self.max_threads > 1 {
            self.fill_chunk_parallel(chunk, start_block, start_offset, num_blocks)
        } else {
            self.fill_chunk_sequential(chunk, start_block, start_offset, num_blocks)
        }
    }

    #[inline]
    fn fill_chunk_sequential(
        &mut self,
        chunk: &mut [u8],
        start_block: usize,
        start_offset: usize,
        num_blocks: usize,
    ) -> usize {
        let mut offset = 0;

        for i in 0..num_blocks {
            let block_idx = start_block + i;
            let block_offset = if i == 0 { start_offset } else { 0 };
            let remaining_in_block = self.block_size - block_offset;
            let to_copy = remaining_in_block.min(chunk.len() - offset);

            let ub = block_idx % self.unique_blocks;

            let mut block_buf = vec![0u8; self.block_size];
            fill_block(
                &mut block_buf,
                ub,
                self.copy_lens[ub].min(self.block_size),
                self.block_sequence,
                self.call_entropy,
            );

            self.block_sequence += 1;

            chunk[offset..offset + to_copy]
                .copy_from_slice(&block_buf[block_offset..block_offset + to_copy]);

            offset += to_copy;
        }

        let to_write = offset;
        self.current_pos += to_write;

        tracing::debug!(
            "fill_chunk_sequential: generated {} blocks ({} MiB) for {} byte chunk",
            num_blocks,
            (num_blocks * self.block_size) / (1024 * 1024),
            to_write
        );

        to_write
    }

    fn fill_chunk_parallel(
        &mut self,
        chunk: &mut [u8],
        start_block: usize,
        start_offset: usize,
        num_blocks: usize,
    ) -> usize {
        use rayon::prelude::*;

        let thread_pool = match &self.thread_pool {
            Some(pool) => pool,
            None => {
                return self.fill_chunk_sequential(chunk, start_block, start_offset, num_blocks);
            }
        };

        let call_entropy = self.call_entropy;
        let copy_lens = &self.copy_lens;
        let unique_blocks = self.unique_blocks;
        let block_size = self.block_size;
        let base_sequence = self.block_sequence;

        thread_pool.install(|| {
            chunk
                .par_chunks_mut(block_size)
                .enumerate()
                .for_each(|(i, block_chunk)| {
                    let block_idx = start_block + i;
                    let ub = block_idx % unique_blocks;
                    let block_seq = base_sequence + (i as u64);

                    if i == 0 && start_offset > 0 {
                        let mut temp = vec![0u8; block_size];
                        fill_block(
                            &mut temp,
                            ub,
                            copy_lens[ub].min(block_size),
                            block_seq,
                            call_entropy,
                        );
                        let copy_len = block_size
                            .saturating_sub(start_offset)
                            .min(block_chunk.len());
                        block_chunk[..copy_len]
                            .copy_from_slice(&temp[start_offset..start_offset + copy_len]);
                    } else {
                        let actual_len = block_chunk.len().min(block_size);
                        fill_block(
                            &mut block_chunk[..actual_len],
                            ub,
                            copy_lens[ub].min(actual_len),
                            block_seq,
                            call_entropy,
                        );
                    }
                });
        });

        let to_write = chunk.len();
        self.current_pos += to_write;
        self.block_sequence += num_blocks as u64;

        tracing::debug!(
            "fill_chunk_parallel: ZERO-COPY generated {} blocks ({} MiB) for {} byte chunk",
            num_blocks,
            (num_blocks * block_size) / (1024 * 1024),
            to_write
        );

        to_write
    }

    pub fn reset(&mut self) {
        self.current_pos = 0;
        // Restart the block sequence as well so the regenerated stream matches
        // the first pass for the same entropy.
        self.block_sequence = 0;
    }

    pub fn position(&self) -> usize {
        self.current_pos
    }

    pub fn total_size(&self) -> usize {
        self.total_size
    }

    pub fn is_complete(&self) -> bool {
        self.current_pos >= self.total_size
    }

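    /// Re-seeds the generator and restarts the block sequence (the output position is
    /// unchanged), so following chunks are drawn from the new seed's stream; `None`
    /// reverts to fresh per-call entropy.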
    pub fn set_seed(&mut self, seed: Option<u64>) {
        self.call_entropy = seed.unwrap_or_else(generate_call_entropy);
        self.block_sequence = 0;

        tracing::debug!(
            "Seed reset: {} (entropy={}) - block_sequence reset to 0",
            if seed.is_some() {
                "deterministic"
            } else {
                "non-deterministic"
            },
            self.call_entropy
        );
    }

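    /// Suggested buffer size (32 MiB) for driving [`DataGenerator::fill_chunk`]; large
    /// enough to span several blocks so the parallel path can be used.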
    pub fn recommended_chunk_size() -> usize {
        32 * 1024 * 1024
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn init_tracing() {
        use tracing_subscriber::{fmt, EnvFilter};
        let _ = fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .try_init();
    }

    #[test]
    fn test_generate_minimal() {
        init_tracing();
        // Requests less than one block; generation rounds up to a full block.
        let data = generate_data_simple(100, 1, 1);
        assert_eq!(data.len(), BLOCK_SIZE);
    }

    #[test]
    fn test_generate_exact_block() {
        init_tracing();
        let data = generate_data_simple(BLOCK_SIZE, 1, 1);
        assert_eq!(data.len(), BLOCK_SIZE);
    }

    #[test]
    fn test_generate_multiple_blocks() {
        init_tracing();
        let size = BLOCK_SIZE * 10;
        let data = generate_data_simple(size, 1, 1);
        assert_eq!(data.len(), size);
    }

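    // A small property check added as a sketch: with compress_factor = 2 the trailing
    // ~half of each block is zero-filled (see fill_block), so the tail of a single
    // generated block should be all zeros.
    #[test]
    fn test_compress_factor_zero_tail() {
        init_tracing();
        let data = generate_data_simple(BLOCK_SIZE, 1, 2);
        assert_eq!(data.len(), BLOCK_SIZE);
        let zero_tail = BLOCK_SIZE / 2;
        assert!(data.as_slice()[BLOCK_SIZE - zero_tail..]
            .iter()
            .all(|&b| b == 0));
    }
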
    #[test]
    fn test_streaming_generator() {
        init_tracing();
        eprintln!("Starting streaming generator test...");

        let config = GeneratorConfig {
            size: BLOCK_SIZE * 5,
            dedup_factor: 1,
            compress_factor: 1,
            numa_mode: NumaMode::Auto,
            max_threads: None,
            numa_node: None,
            block_size: None,
            seed: None,
        };

        eprintln!("Config: {} blocks, {} bytes total", 5, BLOCK_SIZE * 5);

        let mut gen = DataGenerator::new(config.clone());
        let mut result = Vec::new();

        let chunk_size = BLOCK_SIZE;
        let mut chunk = vec![0u8; chunk_size];

        let mut iterations = 0;
        while !gen.is_complete() {
            let written = gen.fill_chunk(&mut chunk);
            if written == 0 {
                break;
            }
            result.extend_from_slice(&chunk[..written]);
            iterations += 1;

            if iterations % 10 == 0 {
                eprintln!(
                    " Iteration {}: written={}, total={}",
                    iterations,
                    written,
                    result.len()
                );
            }
        }

        eprintln!(
            "Completed in {} iterations, generated {} bytes",
            iterations,
            result.len()
        );
        assert_eq!(result.len(), config.size);
        assert!(gen.is_complete());
    }

    #[test]
    fn test_set_seed_stream_reset() {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        fn hash_buffer(buf: &[u8]) -> u64 {
            let mut hasher = DefaultHasher::new();
            buf.hash(&mut hasher);
            hasher.finish()
        }

        init_tracing();
        eprintln!("Testing set_seed() stream reset behavior...");

        let size = 30 * 1024 * 1024;
        let chunk_size = 10 * 1024 * 1024;

        eprintln!("Test 1: Seed sequence reproducibility");
        let config = GeneratorConfig {
            size,
            dedup_factor: 1,
            compress_factor: 1,
            numa_mode: NumaMode::Auto,
            max_threads: None,
            numa_node: None,
            block_size: None,
            seed: Some(111),
        };

        let mut gen1 = DataGenerator::new(config.clone());
        let mut buf1 = vec![0u8; chunk_size];

        gen1.fill_chunk(&mut buf1);
        let hash1a = hash_buffer(&buf1);

        gen1.set_seed(Some(222));
        gen1.fill_chunk(&mut buf1);
        let hash1b = hash_buffer(&buf1);

        gen1.set_seed(Some(333));
        gen1.fill_chunk(&mut buf1);
        let hash1c = hash_buffer(&buf1);

        let mut gen2 = DataGenerator::new(config.clone());
        let mut buf2 = vec![0u8; chunk_size];

        gen2.fill_chunk(&mut buf2);
        let hash2a = hash_buffer(&buf2);

        gen2.set_seed(Some(222));
        gen2.fill_chunk(&mut buf2);
        let hash2b = hash_buffer(&buf2);

        gen2.set_seed(Some(333));
        gen2.fill_chunk(&mut buf2);
        let hash2c = hash_buffer(&buf2);

        eprintln!(" Chunk 1: hash1={:016x}, hash2={:016x}", hash1a, hash2a);
        eprintln!(" Chunk 2: hash1={:016x}, hash2={:016x}", hash1b, hash2b);
        eprintln!(" Chunk 3: hash1={:016x}, hash2={:016x}", hash1c, hash2c);

        assert_eq!(hash1a, hash2a, "Chunk 1 (seed=111) should match");
        assert_eq!(hash1b, hash2b, "Chunk 2 (seed=222) should match");
        assert_eq!(hash1c, hash2c, "Chunk 3 (seed=333) should match");

        eprintln!("Test 2: Striped pattern creation");
        let mut gen = DataGenerator::new(GeneratorConfig {
            size: 40 * 1024 * 1024,
            dedup_factor: 1,
            compress_factor: 1,
            numa_mode: NumaMode::Auto,
            max_threads: None,
            numa_node: None,
            block_size: None,
            seed: Some(1111),
        });

        let mut buf = vec![0u8; chunk_size];

        // Stripe 1: seed A
        gen.set_seed(Some(1111));
        gen.fill_chunk(&mut buf);
        let stripe1_hash = hash_buffer(&buf);

        // Stripe 2: seed B
        gen.set_seed(Some(2222));
        gen.fill_chunk(&mut buf);
        let stripe2_hash = hash_buffer(&buf);

        // Stripe 3: seed A again (should reproduce stripe 1)
        gen.set_seed(Some(1111));
        gen.fill_chunk(&mut buf);
        let stripe3_hash = hash_buffer(&buf);

        // Stripe 4: seed B again (should reproduce stripe 2)
        gen.set_seed(Some(2222));
        gen.fill_chunk(&mut buf);
        let stripe4_hash = hash_buffer(&buf);

        eprintln!(" Stripe 1 (A): {:016x}", stripe1_hash);
        eprintln!(" Stripe 2 (B): {:016x}", stripe2_hash);
        eprintln!(" Stripe 3 (A): {:016x}", stripe3_hash);
        eprintln!(" Stripe 4 (B): {:016x}", stripe4_hash);

        assert_eq!(
            stripe1_hash, stripe3_hash,
            "Stripe A should be reproducible"
        );
        assert_eq!(
            stripe2_hash, stripe4_hash,
            "Stripe B should be reproducible"
        );
        assert_ne!(stripe1_hash, stripe2_hash, "Stripe A and B should differ");

        eprintln!("✅ All stream reset tests passed!");
    }
}