1// src/generator.rs
2//
3// SPDX-License-Identifier: MIT OR Apache-2.0
4
5//! High-performance data generation with controllable deduplication and compression
6//!
7//! Ported from s3dlio/src/data_gen_alt.rs with NUMA optimizations
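//!
//! # Example
//!
//! A minimal usage sketch (sizes are illustrative only):
//!
//! ```rust,no_run
//! use dgen_data::generate_data_simple;
//!
//! // 8 MiB of incompressible, non-deduplicated data
//! let buf = generate_data_simple(8 * 1024 * 1024, 1, 1);
//! assert_eq!(buf.len(), 8 * 1024 * 1024);
//! ```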
8
9use rand::{RngCore, SeedableRng};
10use rand_xoshiro::Xoshiro256PlusPlus;
11use rayon::prelude::*;
12use std::time::{SystemTime, UNIX_EPOCH};
13
14use crate::constants::*;
15
16#[cfg(feature = "numa")]
17use crate::numa::NumaTopology;
18
19#[cfg(feature = "numa")]
20use hwlocality::{
21    memory::binding::{MemoryBindingFlags, MemoryBindingPolicy},
22    Topology,
23};
24
25/// ZERO-COPY buffer abstraction for UMA and NUMA allocations
26///
27/// CRITICAL: This type NEVER copies data - it holds the actual memory and provides
28/// mutable slices for zero-copy operations. Python bindings access this memory
29/// directly via raw pointers.
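///
/// # Example
///
/// A minimal sketch of reading generated data through the zero-copy slice view:
///
/// ```rust,no_run
/// use dgen_data::generate_data_simple;
///
/// let buf = generate_data_simple(4 * 1024 * 1024, 1, 1);
/// let view: &[u8] = buf.as_slice();
/// assert_eq!(view.len(), buf.len());
/// ```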
30#[cfg(feature = "numa")]
31pub enum DataBuffer {
32    /// UMA allocation using Vec<u8> (fast path, 43-50 GB/s)
33    /// Python accesses via Vec's raw pointer
34    Uma(Vec<u8>),
35    /// NUMA allocation using hwlocality Bytes (target: 1,200-1,400 GB/s)
36    /// Python accesses via Bytes' raw pointer - ZERO COPY to Python!
37    /// Stores (Topology, Bytes, actual_size) to keep Topology alive
38    Numa((Topology, hwlocality::memory::binding::Bytes<'static>, usize)),
39}
40
41#[cfg(feature = "numa")]
42impl DataBuffer {
43    /// Get mutable slice for data generation (zero-copy)
44    pub fn as_mut_slice(&mut self) -> &mut [u8] {
45        match self {
46            DataBuffer::Uma(vec) => vec.as_mut_slice(),
47            DataBuffer::Numa((_, bytes, _)) => {
48                // SAFETY: We've allocated this buffer and will initialize it
49                unsafe {
50                    std::slice::from_raw_parts_mut(bytes.as_mut_ptr() as *mut u8, bytes.len())
51                }
52            }
53        }
54    }
55
56    /// Get immutable slice view (zero-copy)
57    pub fn as_slice(&self) -> &[u8] {
58        match self {
59            DataBuffer::Uma(vec) => vec.as_slice(),
60            DataBuffer::Numa((_, bytes, size)) => {
61                // SAFETY: Buffer has been fully initialized
62                unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const u8, *size) }
63            }
64        }
65    }
66
67    /// Get raw pointer for zero-copy Python access
68    pub fn as_ptr(&self) -> *const u8 {
69        match self {
70            DataBuffer::Uma(vec) => vec.as_ptr(),
71            DataBuffer::Numa((_, bytes, _)) => bytes.as_ptr() as *const u8,
72        }
73    }
74
75    /// Get mutable raw pointer for zero-copy Python access
76    pub fn as_mut_ptr(&mut self) -> *mut u8 {
77        match self {
78            DataBuffer::Uma(vec) => vec.as_mut_ptr(),
79            DataBuffer::Numa((_, bytes, _)) => bytes.as_mut_ptr() as *mut u8,
80        }
81    }
82
83    /// Get length (actual data size, not allocated size)
84    pub fn len(&self) -> usize {
85        match self {
86            DataBuffer::Uma(vec) => vec.len(),
87            DataBuffer::Numa((_, _, size)) => *size,
88        }
89    }
90
91    /// Check if buffer is empty
92    pub fn is_empty(&self) -> bool {
93        self.len() == 0
94    }
95
96    /// Truncate to requested size (modifies metadata only, NO COPY)
97    pub fn truncate(&mut self, size: usize) {
98        match self {
99            DataBuffer::Uma(vec) => vec.truncate(size),
100            DataBuffer::Numa((_, bytes, actual_size)) => {
101                *actual_size = size.min(bytes.len());
102            }
103        }
104    }
105
    /// Convert to bytes::Bytes for Python API (ZERO-COPY for UMA, full data copy for NUMA)
107    ///
108    /// For UMA: Uses Bytes::from(Vec<u8>) which is cheap (just wraps the allocation)
109    /// For NUMA: Must copy to bytes::Bytes since hwlocality::Bytes can't be converted directly
110    ///          Alternative: Keep as DataBuffer and implement Python buffer protocol directly
111    pub fn into_bytes(self) -> bytes::Bytes {
112        match self {
113            DataBuffer::Uma(vec) => bytes::Bytes::from(vec),
114            DataBuffer::Numa((_, hwloc_bytes, size)) => {
115                // Convert NUMA-allocated memory to bytes::Bytes
116                // Unfortunately this requires a copy since bytes::Bytes needs owned data
117                let slice =
118                    unsafe { std::slice::from_raw_parts(hwloc_bytes.as_ptr() as *const u8, size) };
119                bytes::Bytes::copy_from_slice(slice)
120            }
121        }
122    }
123}
124
125#[cfg(not(feature = "numa"))]
126pub enum DataBuffer {
127    Uma(Vec<u8>),
128}
129
130#[cfg(not(feature = "numa"))]
131impl DataBuffer {
132    pub fn as_mut_slice(&mut self) -> &mut [u8] {
133        match self {
134            DataBuffer::Uma(vec) => vec.as_mut_slice(),
135        }
136    }
137
138    pub fn as_slice(&self) -> &[u8] {
139        match self {
140            DataBuffer::Uma(vec) => vec.as_slice(),
141        }
142    }
143
144    pub fn as_ptr(&self) -> *const u8 {
145        match self {
146            DataBuffer::Uma(vec) => vec.as_ptr(),
147        }
148    }
149
150    pub fn as_mut_ptr(&mut self) -> *mut u8 {
151        match self {
152            DataBuffer::Uma(vec) => vec.as_mut_ptr(),
153        }
154    }
155
156    pub fn len(&self) -> usize {
157        match self {
158            DataBuffer::Uma(vec) => vec.len(),
159        }
160    }
161
162    pub fn truncate(&mut self, size: usize) {
163        match self {
164            DataBuffer::Uma(vec) => vec.truncate(size),
165        }
166    }
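
    /// Check if buffer is empty (added for parity with the NUMA-enabled impl,
    /// so callers compile against the same API with either feature set)
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }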
167}
168
169/// Allocate NUMA-aware buffer on specific node
170///
171/// # Returns
172/// - Ok((Topology, Bytes, size)) on successful NUMA allocation
173/// - Err(String) on failure (caller should fall back to UMA)
174#[cfg(feature = "numa")]
175fn allocate_numa_buffer(
176    size: usize,
177    node_id: usize,
178) -> Result<(Topology, hwlocality::memory::binding::Bytes<'static>, usize), String> {
179    use hwlocality::object::types::ObjectType;
180
181    // Create topology
182    let topology =
183        Topology::new().map_err(|e| format!("Failed to create hwloc topology: {}", e))?;
184
185    // Find NUMA node
186    let numa_nodes: Vec<_> = topology.objects_with_type(ObjectType::NUMANode).collect();
187
188    if numa_nodes.is_empty() {
189        return Err("No NUMA nodes found in topology".to_string());
190    }
191
192    // Get the NUMA node by OS index
193    let node = numa_nodes
194        .iter()
195        .find(|n| n.os_index() == Some(node_id))
196        .ok_or_else(|| {
197            format!(
198                "NUMA node {} not found (available: {:?})",
199                node_id,
200                numa_nodes
201                    .iter()
202                    .filter_map(|n| n.os_index())
203                    .collect::<Vec<_>>()
204            )
205        })?;
206
207    // Get nodeset for this NUMA node
208    let nodeset = node
209        .nodeset()
210        .ok_or_else(|| format!("NUMA node {} has no nodeset", node_id))?;
211
212    tracing::debug!(
213        "Allocating {} bytes on NUMA node {} with nodeset {:?}",
214        size,
215        node_id,
216        nodeset
217    );
218
219    // Allocate memory bound to this NUMA node
220    // Using ASSUME_SINGLE_THREAD flag for maximum portability
221    let bytes = topology
222        .binding_allocate_memory(
223            size,
224            nodeset,
225            MemoryBindingPolicy::Bind,
226            MemoryBindingFlags::ASSUME_SINGLE_THREAD,
227        )
228        .map_err(|e| format!("Failed to allocate NUMA memory: {}", e))?;
229
230    // SAFETY: We need to extend the lifetime to 'static because we're storing
231    // both Topology and Bytes together, and Bytes' lifetime is tied to Topology.
232    // This is safe because we keep Topology alive as long as Bytes exists.
233    let bytes_static = unsafe {
234        std::mem::transmute::<
235            hwlocality::memory::binding::Bytes<'_>,
236            hwlocality::memory::binding::Bytes<'static>,
237        >(bytes)
238    };
239
240    Ok((topology, bytes_static, size))
241}
242
243/// NUMA optimization mode
244#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
245pub enum NumaMode {
246    /// Auto-detect: enable NUMA optimizations only on multi-node systems
247    #[default]
248    Auto,
249    /// Force NUMA: enable optimizations even on UMA systems (for testing)
250    Force,
251    /// Disable: never use NUMA optimizations (default for cloud/VM)
252    Disabled,
253}
254
255/// Configuration for data generation
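///
/// # Example
///
/// A sketch of overriding a few fields and taking the rest from `Default`
/// (field values here are illustrative only):
///
/// ```rust,no_run
/// use dgen_data::GeneratorConfig;
///
/// let config = GeneratorConfig {
///     size: 64 * 1024 * 1024, // 64 MiB
///     compress_factor: 2,     // target roughly 2:1 compressible data
///     ..Default::default()
/// };
/// assert_eq!(config.dedup_factor, 1);
/// ```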
256#[derive(Debug, Clone)]
257pub struct GeneratorConfig {
258    /// Total size in bytes
259    pub size: usize,
260    /// Deduplication factor (1 = no dedup, N = N:1 logical:physical ratio)
261    pub dedup_factor: usize,
262    /// Compression factor (1 = incompressible, N = N:1 logical:physical ratio)
263    pub compress_factor: usize,
264    /// NUMA optimization mode (Auto, Force, or Disabled)
265    pub numa_mode: NumaMode,
266    /// Maximum number of threads to use (None = use all available cores)
267    pub max_threads: Option<usize>,
268    /// Pin to specific NUMA node (None = use all nodes, Some(n) = pin to node n)
269    /// When set, only uses cores from this NUMA node and limits threads accordingly
270    pub numa_node: Option<usize>,
271    /// Internal block size for parallelization (None = use BLOCK_SIZE constant)
272    /// Larger blocks (16-32 MB) improve throughput by amortizing Rayon overhead
273    /// but use more memory. Must be at least 1 MB and at most 32 MB.
274    pub block_size: Option<usize>,
275    /// Random seed for reproducible data generation (None = use time + urandom)
276    /// When set, generates identical data for the same seed value
277    pub seed: Option<u64>,
278}
279
280impl Default for GeneratorConfig {
281    fn default() -> Self {
282        Self {
283            size: BLOCK_SIZE,
284            dedup_factor: 1,
285            compress_factor: 1,
286            numa_mode: NumaMode::Auto,
287            max_threads: None, // Use all available cores
288            seed: None,        // Use time + urandom
289            numa_node: None,   // Use all NUMA nodes
290            block_size: None,  // Use BLOCK_SIZE constant (4 MB)
291        }
292    }
293}
294
295/// Simple API: Generate data with default config
296///
297/// # Parameters
298/// - `size`: Total bytes to generate
299/// - `dedup`: Deduplication factor (1 = no dedup, N = N:1 ratio)
300/// - `compress`: Compression factor (1 = incompressible, N = N:1 ratio)
301///
302/// # Example
303/// ```rust
304/// use dgen_data::generate_data_simple;
305///
306/// // Generate 100 MiB incompressible data with no deduplication
307/// let data = generate_data_simple(100 * 1024 * 1024, 1, 1);
308/// assert_eq!(data.len(), 100 * 1024 * 1024);
309/// ```
310pub fn generate_data_simple(size: usize, dedup: usize, compress: usize) -> DataBuffer {
311    let config = GeneratorConfig {
312        size,
313        dedup_factor: dedup.max(1),
314        compress_factor: compress.max(1),
315        numa_mode: NumaMode::Auto,
316        max_threads: None,
317        numa_node: None,
318        block_size: None,
319        seed: None,
320    };
321    generate_data(config)
322}
323
324/// Generate data with full configuration (ZERO-COPY - returns DataBuffer)
325///
/// # Algorithm
/// 1. Fill the incompressible portion of each block with a Xoshiro256++ keystream (high entropy)
/// 2. Zero-fill the compressible portion to reach the requested compression ratio
/// 3. Apply round-robin deduplication across unique blocks
/// 4. Generate blocks in parallel via rayon (NUMA-aware if enabled)
///
/// # Performance
/// - 5-15 GB/s per core with incompressible data
/// - With compression enabled, throughput stays much closer to the incompressible path under
///   the current zero-fill method (the older 1-4 GB/s figure applied to the back-reference approach)
/// - Near-linear scaling with CPU cores
336///
337/// # Returns
338/// DataBuffer that holds the generated data without copying:
339/// - UMA: Vec<u8> wrapper
340/// - NUMA: hwlocality Bytes wrapper (when numa_node is specified)
341///
342/// Python accesses this memory directly via buffer protocol - ZERO COPY!
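///
/// # Example
///
/// A sketch assuming `generate_data` and `GeneratorConfig` are re-exported at the
/// crate root (as `generate_data_simple` is):
///
/// ```rust,no_run
/// use dgen_data::{generate_data, GeneratorConfig};
///
/// let buf = generate_data(GeneratorConfig {
///     size: 16 * 1024 * 1024, // 16 MiB
///     ..Default::default()
/// });
/// assert_eq!(buf.len(), 16 * 1024 * 1024);
/// ```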
343pub fn generate_data(config: GeneratorConfig) -> DataBuffer {
344    // Validate and get effective block size (default 4 MB, max 32 MB)
345    let block_size = config
346        .block_size
347        .map(|bs| bs.clamp(1024 * 1024, 32 * 1024 * 1024)) // 1 MB min, 32 MB max
348        .unwrap_or(BLOCK_SIZE);
349
350    tracing::info!(
351        "Starting data generation: size={}, dedup={}, compress={}, block_size={}",
352        config.size,
353        config.dedup_factor,
354        config.compress_factor,
355        block_size
356    );
357
358    let size = config.size.max(block_size); // Use block_size as minimum
359    let nblocks = size.div_ceil(block_size);
360
361    let dedup_factor = config.dedup_factor.max(1);
362    let unique_blocks = if dedup_factor > 1 {
363        ((nblocks as f64) / (dedup_factor as f64)).round().max(1.0) as usize
364    } else {
365        nblocks
366    };
367
368    tracing::debug!(
369        "Generating: size={}, blocks={}, dedup={}, unique_blocks={}, compress={}",
370        size,
371        nblocks,
372        dedup_factor,
373        unique_blocks,
374        config.compress_factor
375    );
376
377    // Calculate per-block copy lengths using integer error accumulation
378    // This ensures even distribution of compression across blocks
379    let (f_num, f_den) = if config.compress_factor > 1 {
380        (config.compress_factor - 1, config.compress_factor)
381    } else {
382        (0, 1)
383    };
384    let floor_len = (f_num * block_size) / f_den;
385    let rem = (f_num * block_size) % f_den;
386
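    // Worked example (illustrative numbers): with compress_factor = 3 and a 4 MiB block,
    // f_num/f_den = 2/3, floor_len = 2_796_202 and rem = 2. The accumulator below then emits
    // floor_len + 1 for two out of every three unique blocks, so the compressible portion
    // averages exactly (2/3) * block_size across the object.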
387    let copy_lens: Vec<usize> = {
388        let mut v = Vec::with_capacity(unique_blocks);
389        let mut err = 0;
390        for _ in 0..unique_blocks {
391            err += rem;
392            if err >= f_den {
393                err -= f_den;
394                v.push(floor_len + 1);
395            } else {
396                v.push(floor_len);
397            }
398        }
399        v
400    };
401
    // Per-call entropy for RNG seeding (honor config.seed when provided, as documented)
    let call_entropy = config.seed.unwrap_or_else(generate_call_entropy);
404
405    // Allocate buffer (NUMA-aware if numa_node is specified)
406    let total_size = nblocks * block_size;
407    tracing::debug!("Allocating {} bytes ({} blocks)", total_size, nblocks);
408
409    // CRITICAL: UMA fast path - always use Vec<u8> when numa_node is None
410    // This preserves 43-50 GB/s performance on UMA systems
411    #[cfg(feature = "numa")]
412    let mut data_buffer = if let Some(node_id) = config.numa_node {
413        tracing::info!("Attempting NUMA allocation on node {}", node_id);
414        match allocate_numa_buffer(total_size, node_id) {
415            Ok(buffer) => {
416                tracing::info!(
417                    "Successfully allocated {} bytes on NUMA node {}",
418                    total_size,
419                    node_id
420                );
421                DataBuffer::Numa(buffer)
422            }
423            Err(e) => {
424                tracing::warn!("NUMA allocation failed: {}, falling back to UMA", e);
425                DataBuffer::Uma(vec![0u8; total_size])
426            }
427        }
428    } else {
429        DataBuffer::Uma(vec![0u8; total_size])
430    };
431
432    #[cfg(not(feature = "numa"))]
433    let mut data_buffer = DataBuffer::Uma(vec![0u8; total_size]);
434
435    // NUMA optimization check
436    #[cfg(feature = "numa")]
437    let numa_topology = if config.numa_mode != NumaMode::Disabled {
438        NumaTopology::detect().ok()
439    } else {
440        None
441    };
442
443    // Adjust thread count if pinning to specific NUMA node
444    #[cfg(feature = "numa")]
445    let num_threads = if let Some(node_id) = config.numa_node {
446        if let Some(ref topology) = numa_topology {
447            if let Some(node) = topology.nodes.iter().find(|n| n.node_id == node_id) {
448                // Limit threads to cores available on this NUMA node
449                let node_cores = node.cpus.len();
450                let requested_threads = config.max_threads.unwrap_or(node_cores);
451                let threads = requested_threads.min(node_cores);
452                tracing::info!(
453                    "Pinning to NUMA node {}: using {} threads ({} cores available)",
454                    node_id,
455                    threads,
456                    node_cores
457                );
458                threads
459            } else {
460                tracing::warn!(
461                    "NUMA node {} not found, using default thread count",
462                    node_id
463                );
464                config.max_threads.unwrap_or_else(get_affinity_cpu_count)
465            }
466        } else {
467            tracing::warn!("NUMA topology not available, falling back to CPU affinity mask");
468            // CRITICAL: When numa_node is specified but topology unavailable,
469            // respect the process's CPU affinity mask (set by Python multiprocessing)
470            config.max_threads.unwrap_or_else(get_affinity_cpu_count)
471        }
472    } else {
473        // No specific NUMA node, use all cores
474        config.max_threads.unwrap_or_else(num_cpus::get)
475    };
476
477    #[cfg(not(feature = "numa"))]
478    let num_threads = config.max_threads.unwrap_or_else(num_cpus::get);
479
480    tracing::info!("Using {} threads for parallel generation", num_threads);
481
482    #[cfg(feature = "numa")]
483    let should_optimize_numa = if let Some(ref topology) = numa_topology {
484        let optimize = match config.numa_mode {
485            NumaMode::Auto => topology.num_nodes > 1,
486            NumaMode::Force => true,
487            NumaMode::Disabled => false,
488        };
489
490        if optimize {
491            tracing::info!(
492                "NUMA optimization enabled: {} nodes detected",
493                topology.num_nodes
494            );
495        } else {
496            tracing::debug!(
497                "NUMA optimization not needed: {} nodes detected",
498                topology.num_nodes
499            );
500        }
501        optimize
502    } else {
503        false
504    };
505
506    #[cfg(not(feature = "numa"))]
507    let should_optimize_numa = false;
508
509    tracing::debug!("Starting parallel generation with rayon");
510
511    // Build thread pool with optional NUMA-aware thread pinning
512    // Only pin threads on true NUMA systems (>1 node) - adds overhead on UMA
513    #[cfg(all(feature = "numa", feature = "thread-pinning"))]
514    let pool = if should_optimize_numa {
515        if let Some(ref topology) = numa_topology {
516            if topology.num_nodes > 1 {
517                tracing::debug!(
518                    "Configuring NUMA-aware thread pinning for {} nodes",
519                    topology.num_nodes
520                );
521
522                // Build CPU affinity mapping (wrap in Arc for sharing across threads)
523                let cpu_map = std::sync::Arc::new(build_cpu_affinity_map(
524                    topology,
525                    num_threads,
526                    config.numa_node,
527                ));
528
529                rayon::ThreadPoolBuilder::new()
530                    .num_threads(num_threads)
531                    .spawn_handler(move |thread| {
532                        let cpu_map = cpu_map.clone();
533                        let mut b = std::thread::Builder::new();
534                        if let Some(name) = thread.name() {
535                            b = b.name(name.to_owned());
536                        }
537                        if let Some(stack_size) = thread.stack_size() {
538                            b = b.stack_size(stack_size);
539                        }
540
541                        b.spawn(move || {
542                            // Pin this thread to specific CPU cores
543                            let thread_id = rayon::current_thread_index().unwrap_or(0);
544                            if let Some(core_ids) = cpu_map.get(&thread_id) {
545                                pin_thread_to_cores(core_ids);
546                            }
547                            thread.run()
548                        })?;
549                        Ok(())
550                    })
551                    .build()
552                    .expect("Failed to create NUMA-aware thread pool")
553            } else {
554                tracing::debug!("Skipping thread pinning on UMA system (would add overhead)");
555                rayon::ThreadPoolBuilder::new()
556                    .num_threads(num_threads)
557                    .build()
558                    .expect("Failed to create thread pool")
559            }
560        } else {
561            rayon::ThreadPoolBuilder::new()
562                .num_threads(num_threads)
563                .build()
564                .expect("Failed to create thread pool")
565        }
566    } else {
567        rayon::ThreadPoolBuilder::new()
568            .num_threads(num_threads)
569            .build()
570            .expect("Failed to create thread pool")
571    };
572
573    #[cfg(not(all(feature = "numa", feature = "thread-pinning")))]
574    let pool = rayon::ThreadPoolBuilder::new()
575        .num_threads(num_threads)
576        .build()
577        .expect("Failed to create thread pool");
578
579    // First-touch memory initialization for NUMA locality
580    // Only beneficial on true NUMA systems (>1 node)
581    // On UMA systems, this just adds overhead
582    #[cfg(feature = "numa")]
583    if should_optimize_numa {
584        if let Some(ref topology) = numa_topology {
585            if topology.num_nodes > 1 {
586                tracing::debug!(
587                    "Performing first-touch memory initialization for {} NUMA nodes",
588                    topology.num_nodes
589                );
590                pool.install(|| {
591                    let _data = data_buffer.as_mut_slice();
592                    _data.par_chunks_mut(block_size).for_each(|chunk| {
                        // First-touch: write the first and last byte of each chunk so Linux's
                        // first-touch policy allocates those pages on the writing thread's node
595                        chunk[0] = 0;
596                        if chunk.len() > 4096 {
597                            chunk[chunk.len() - 1] = 0;
598                        }
599                    });
600                });
601            } else {
602                tracing::trace!("Skipping first-touch on UMA system");
603            }
604        }
605    }
606
607    pool.install(|| {
608        let data = data_buffer.as_mut_slice();
609        data.par_chunks_mut(block_size)
610            .enumerate()
611            .for_each(|(i, chunk)| {
612                let ub = i % unique_blocks;
613                tracing::trace!("Filling block {} (unique block {})", i, ub);
614                // Use sequential block index for reproducibility
615                fill_block(
616                    chunk,
617                    ub,
618                    copy_lens[ub].min(chunk.len()),
619                    i as u64,
620                    call_entropy,
621                );
622            });
623    });
624
625    tracing::debug!("Parallel generation complete, truncating to {} bytes", size);
626    // Truncate to requested size (metadata only, NO COPY!)
627    data_buffer.truncate(size);
628
629    // Return DataBuffer directly - Python accesses via raw pointer (ZERO COPY!)
630    data_buffer
631}
632
633/// Fill a single block with controlled compression
634///
635/// # Algorithm (OPTIMIZED January 2026)
636///
637/// **NEW METHOD (Current)**: Zero-fill for compression
638/// 1. Fill incompressible portion with Xoshiro256++ keystream (high-entropy random data)
639/// 2. Fill compressible portion with zeros (memset - extremely fast)
640///
641/// **OLD METHOD (Before Jan 2026)**: Back-reference approach
642/// - Filled entire block with RNG data
643/// - Created back-references using copy_within() in 64-256 byte chunks
644/// - SLOW: Required 2x memory traffic (write all, then copy 50% for 2:1 compression)
645/// - Example: 1 MB block @ 2:1 ratio = 1 MB RNG write + 512 KB of copy_within operations
646///
647/// **WHY CHANGED**:
648/// - Testing showed significant slowdown with compression enabled (1-4 GB/s vs 15 GB/s)
649/// - Back-references created small, inefficient memory copies
650/// - Zero-fill approach matches DLIO benchmark methodology
651/// - Much faster: memset is highly optimized (often CPU instruction or libc fast path)
652///
653/// **PERFORMANCE COMPARISON**:
654/// - Incompressible (copy_len=0): ~15 GB/s per core (both methods identical)
655/// - 2:1 compression (copy_len=50%): OLD ~2-4 GB/s, NEW ~10-12 GB/s (estimated)
656///
657/// # Parameters
/// - `out`: Output buffer (at most one block in length)
/// - `unique_block_idx`: Index of the unique block (currently used only for trace logging)
660/// - `copy_len`: Target bytes to make compressible (filled with zeros)
661/// - `block_sequence`: Sequential block number for RNG derivation
662/// - `seed_base`: Base seed for this generation session
663fn fill_block(
664    out: &mut [u8],
665    unique_block_idx: usize,
666    copy_len: usize,
667    block_sequence: u64,
668    seed_base: u64,
669) {
670    tracing::trace!(
671        "fill_block: idx={}, seq={}, copy_len={}, out_len={}",
672        unique_block_idx,
673        block_sequence,
674        copy_len,
675        out.len()
676    );
677
678    // Derive RNG from seed_base + sequential block number
679    // This ensures: same seed_base + same sequence → identical output
680    let seed = seed_base.wrapping_add(block_sequence);
681    let mut rng = Xoshiro256PlusPlus::seed_from_u64(seed);
682
683    // OPTIMIZED COMPRESSION METHOD (January 2026):
684    // For compress_factor N:1 ratio, we want (N-1)/N of the block to be compressible
685    // Example: 2:1 ratio means 50% compressible, 4:1 means 75% compressible
686    //
687    // Strategy: Fill incompressible portion with RNG, compressible portion with zeros
688    // This is MUCH faster than the old back-reference approach
689
690    if copy_len == 0 {
691        // No compression: fill entire block with high-entropy random data
692        tracing::trace!(
693            "Filling {} bytes with RNG keystream (incompressible)",
694            out.len()
695        );
696        rng.fill_bytes(out);
697    } else {
698        // With compression: split between random and zeros
699        let incompressible_len = out.len().saturating_sub(copy_len);
700
701        tracing::trace!(
702            "Filling block: {} bytes random (incompressible) + {} bytes zeros (compressible)",
703            incompressible_len,
704            copy_len
705        );
706
707        // Step 1: Fill incompressible portion with high-entropy keystream
708        if incompressible_len > 0 {
709            rng.fill_bytes(&mut out[..incompressible_len]);
710        }
711
712        // Step 2: Fill compressible portion with zeros (memset - super fast!)
713        // This is typically optimized to a CPU instruction or fast libc call
714        if copy_len > 0 && incompressible_len < out.len() {
715            out[incompressible_len..].fill(0);
716        }
717    }
718
719    tracing::trace!(
720        "fill_block complete: {} compressible bytes (zeros)",
721        copy_len
722    );
723}
724
725/// Generate per-call entropy from time + urandom
726fn generate_call_entropy() -> u64 {
727    let time_entropy = SystemTime::now()
728        .duration_since(UNIX_EPOCH)
729        .unwrap_or_default()
730        .as_nanos() as u64;
731
732    let urandom_entropy: u64 = {
733        let mut rng = rand::rng();
734        rng.next_u64()
735    };
736
737    time_entropy.wrapping_add(urandom_entropy)
738}
739
740#[cfg(all(feature = "numa", feature = "thread-pinning"))]
741use std::collections::HashMap;
742
743/// Get CPU count from current process affinity mask
744/// Falls back to num_cpus::get() if affinity cannot be determined
745fn get_affinity_cpu_count() -> usize {
746    #[cfg(target_os = "linux")]
747    {
748        // Try to read /proc/self/status to get Cpus_allowed_list
749        if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
750            for line in status.lines() {
751                if line.starts_with("Cpus_allowed_list:") {
752                    if let Some(cpus) = line.split(':').nth(1) {
753                        let cpus = cpus.trim();
754                        let count = parse_cpu_list(cpus);
755                        if count > 0 {
756                            tracing::debug!("CPU affinity mask: {} CPUs ({})", count, cpus);
757                            return count;
758                        }
759                    }
760                }
761            }
762        }
763    }
764
765    // Fallback to system CPU count
766    num_cpus::get()
767}
768
769/// Parse Linux CPU list (e.g., "0-23" or "0-11,24-35")
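///
/// For example, "0-3,8" covers CPUs {0, 1, 2, 3, 8}, so this returns 5.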
770#[cfg(target_os = "linux")]
771fn parse_cpu_list(cpu_list: &str) -> usize {
772    let mut count = 0;
773    for range in cpu_list.split(',') {
774        let range = range.trim();
775        if range.is_empty() {
776            continue;
777        }
778
779        if let Some((start, end)) = range.split_once('-') {
780            if let (Ok(s), Ok(e)) = (start.parse::<usize>(), end.parse::<usize>()) {
781                count += (e - s) + 1;
782            }
783        } else if range.parse::<usize>().is_ok() {
784            count += 1;
785        }
786    }
787    count
788}
789
/// Build CPU affinity map for thread pinning
///
/// If `numa_node` is `Some(n)`, only use cores from NUMA node `n`;
/// if `numa_node` is `None`, distribute threads across all NUMA nodes.
#[cfg(all(feature = "numa", feature = "thread-pinning"))]
796fn build_cpu_affinity_map(
797    topology: &crate::numa::NumaTopology,
798    num_threads: usize,
799    numa_node: Option<usize>,
800) -> HashMap<usize, Vec<usize>> {
801    let mut map = HashMap::new();
802
803    if let Some(target_node_id) = numa_node {
804        // Pin to specific NUMA node only
805        if let Some(target_node) = topology.nodes.iter().find(|n| n.node_id == target_node_id) {
806            tracing::info!(
807                "Pinning {} threads to NUMA node {} ({} cores available)",
808                num_threads,
809                target_node_id,
810                target_node.cpus.len()
811            );
812
813            // Distribute threads across cores in this NUMA node only
814            for thread_id in 0..num_threads {
815                let core_idx = thread_id % target_node.cpus.len();
816                let core_id = target_node.cpus[core_idx];
817
818                tracing::trace!(
819                    "Thread {} -> NUMA node {} core {}",
820                    thread_id,
821                    target_node_id,
822                    core_id
823                );
824                map.insert(thread_id, vec![core_id]);
825            }
826        } else {
827            tracing::warn!(
828                "NUMA node {} not found in topology (available: 0-{})",
829                target_node_id,
830                topology.num_nodes - 1
831            );
832        }
833    } else {
834        // Distribute threads across ALL NUMA nodes (old behavior)
835        let mut thread_id = 0;
836        let mut node_idx = 0;
837
838        while thread_id < num_threads {
839            if let Some(node) = topology.nodes.get(node_idx % topology.nodes.len()) {
840                // Assign threads to cores within this NUMA node
841                let cores_per_thread =
842                    (node.cpus.len() as f64 / num_threads as f64).ceil() as usize;
843                let cores_per_thread = cores_per_thread.max(1);
844
845                let start_cpu = (thread_id * cores_per_thread) % node.cpus.len();
846                let end_cpu = ((thread_id + 1) * cores_per_thread).min(node.cpus.len());
847
848                let core_ids: Vec<usize> = node.cpus[start_cpu..end_cpu].to_vec();
849
850                if !core_ids.is_empty() {
851                    tracing::trace!(
852                        "Thread {} -> NUMA node {} cores {:?}",
853                        thread_id,
854                        node.node_id,
855                        &core_ids
856                    );
857                    map.insert(thread_id, core_ids);
858                }
859            }
860
861            thread_id += 1;
862            node_idx += 1;
863        }
864    }
865
866    map
867}
868
869/// Pin current thread to specific CPU cores
870#[cfg(all(feature = "numa", feature = "thread-pinning"))]
871fn pin_thread_to_cores(core_ids: &[usize]) {
872    if let Some(&first_core) = core_ids.first() {
873        if let Some(core_ids_all) = core_affinity::get_core_ids() {
874            if first_core < core_ids_all.len() {
875                let core_id = core_ids_all[first_core];
876                if core_affinity::set_for_current(core_id) {
877                    tracing::trace!("Pinned thread to core {}", first_core);
878                } else {
879                    tracing::debug!("Failed to pin thread to core {}", first_core);
880                }
881            }
882        }
883    }
884}
885
886// =============================================================================
887// Streaming Generator
888// =============================================================================
889
890/// Streaming data generator (like ObjectGenAlt from s3dlio)
891pub struct DataGenerator {
892    total_size: usize,
893    current_pos: usize,
894    #[allow(dead_code)]
895    dedup_factor: usize,
896    #[allow(dead_code)]
897    compress_factor: usize,
898    unique_blocks: usize,
899    copy_lens: Vec<usize>,
900    call_entropy: u64,
901    block_sequence: u64, // Sequential counter for RNG derivation (reset by set_seed)
902    max_threads: usize,  // Thread count for parallel generation
903    thread_pool: Option<rayon::ThreadPool>, // Reused thread pool (created once)
904    block_size: usize,   // Internal parallelization block size (4-32 MB)
905}
906
907impl DataGenerator {
908    /// Create new streaming generator
909    pub fn new(config: GeneratorConfig) -> Self {
910        // Validate and get effective block size (default 4 MB, max 32 MB)
911        let block_size = config
912            .block_size
913            .map(|bs| bs.clamp(1024 * 1024, 32 * 1024 * 1024)) // 1 MB min, 32 MB max
914            .unwrap_or(BLOCK_SIZE);
915
916        tracing::info!(
917            "Creating DataGenerator: size={}, dedup={}, compress={}, block_size={}",
918            config.size,
919            config.dedup_factor,
920            config.compress_factor,
921            block_size
922        );
923
924        let total_size = config.size.max(block_size); // Use block_size as minimum
925        let nblocks = total_size.div_ceil(block_size);
926
927        let dedup_factor = config.dedup_factor.max(1);
928        let unique_blocks = if dedup_factor > 1 {
929            ((nblocks as f64) / (dedup_factor as f64)).round().max(1.0) as usize
930        } else {
931            nblocks
932        };
933
934        // Calculate copy lengths
935        let (f_num, f_den) = if config.compress_factor > 1 {
936            (config.compress_factor - 1, config.compress_factor)
937        } else {
938            (0, 1)
939        };
940        let floor_len = (f_num * block_size) / f_den;
941        let rem = (f_num * block_size) % f_den;
942
943        let copy_lens: Vec<usize> = {
944            let mut v = Vec::with_capacity(unique_blocks);
945            let mut err = 0;
946            for _ in 0..unique_blocks {
947                err += rem;
948                if err >= f_den {
949                    err -= f_den;
950                    v.push(floor_len + 1);
951                } else {
952                    v.push(floor_len);
953                }
954            }
955            v
956        };
957
958        // Use provided seed or generate entropy from time + urandom
959        let call_entropy = config.seed.unwrap_or_else(generate_call_entropy);
960
961        let max_threads = config.max_threads.unwrap_or_else(num_cpus::get);
962
963        // Create thread pool ONCE for reuse (major performance optimization)
964        let thread_pool = if max_threads > 1 {
965            match rayon::ThreadPoolBuilder::new()
966                .num_threads(max_threads)
967                .build()
968            {
969                Ok(pool) => {
970                    tracing::info!(
971                        "DataGenerator configured with {} threads (thread pool created)",
972                        max_threads
973                    );
974                    Some(pool)
975                }
976                Err(e) => {
977                    tracing::warn!(
978                        "Failed to create thread pool: {}, falling back to sequential",
979                        e
980                    );
981                    None
982                }
983            }
984        } else {
985            tracing::info!("DataGenerator configured for single-threaded operation");
986            None
987        };
988
989        Self {
990            total_size,
991            current_pos: 0,
992            dedup_factor,
993            compress_factor: config.compress_factor,
994            unique_blocks,
995            copy_lens,
996            call_entropy,
997            block_sequence: 0, // Start at block 0
998            max_threads,
999            thread_pool,
1000            block_size,
1001        }
1002    }
1003
1004    /// Fill the next chunk of data
1005    ///
1006    /// Returns the number of bytes written. When this returns 0, generation is complete.
1007    ///
    /// **Performance**: When the buffer spans at least two internal blocks (>= 8 MB with the
    /// default 4 MB block size), generation is parallelized via rayon; smaller buffers use
    /// sequential generation to avoid threading overhead.
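    ///
    /// # Example
    ///
    /// A minimal streaming loop; the 1 MiB buffer here is illustrative only
    /// (see `recommended_chunk_size()` for the tuned value):
    ///
    /// ```rust,no_run
    /// use dgen_data::{DataGenerator, GeneratorConfig};
    ///
    /// let mut gen = DataGenerator::new(GeneratorConfig::default());
    /// let mut buf = vec![0u8; 1024 * 1024];
    /// while !gen.is_complete() {
    ///     let written = gen.fill_chunk(&mut buf);
    ///     if written == 0 { break; }
    ///     // consume &buf[..written] here
    /// }
    /// ```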
1010    pub fn fill_chunk(&mut self, buf: &mut [u8]) -> usize {
1011        tracing::trace!(
1012            "fill_chunk called: pos={}/{}, buf_len={}",
1013            self.current_pos,
1014            self.total_size,
1015            buf.len()
1016        );
1017
1018        if self.current_pos >= self.total_size {
1019            tracing::trace!("fill_chunk: already complete");
1020            return 0;
1021        }
1022
1023        let remaining = self.total_size - self.current_pos;
1024        let to_write = buf.len().min(remaining);
1025        let chunk = &mut buf[..to_write];
1026
1027        // Determine number of blocks to generate
1028        let start_block = self.current_pos / self.block_size;
1029        let start_offset = self.current_pos % self.block_size;
1030        let end_pos = self.current_pos + to_write;
1031        let end_block = (end_pos - 1) / self.block_size;
1032        let num_blocks = end_block - start_block + 1;
1033
1034        // Use parallel generation for large buffers (>=2 blocks), sequential for small
1035        // This avoids rayon overhead for tiny chunks
1036        const PARALLEL_THRESHOLD: usize = 2;
1037
1038        if num_blocks >= PARALLEL_THRESHOLD && self.max_threads > 1 {
1039            // PARALLEL PATH: Generate all blocks in parallel
1040            self.fill_chunk_parallel(chunk, start_block, start_offset, num_blocks)
1041        } else {
1042            // SEQUENTIAL PATH: Generate blocks one at a time (small buffers or single-threaded)
1043            self.fill_chunk_sequential(chunk, start_block, start_offset, num_blocks)
1044        }
1045    }
1046
1047    /// Sequential fill for small buffers
1048    #[inline]
1049    fn fill_chunk_sequential(
1050        &mut self,
1051        chunk: &mut [u8],
1052        start_block: usize,
1053        start_offset: usize,
1054        num_blocks: usize,
1055    ) -> usize {
1056        let mut offset = 0;
1057
1058        for i in 0..num_blocks {
1059            let block_idx = start_block + i;
1060            let block_offset = if i == 0 { start_offset } else { 0 };
1061            let remaining_in_block = self.block_size - block_offset;
1062            let to_copy = remaining_in_block.min(chunk.len() - offset);
1063
1064            // Map to unique block
1065            let ub = block_idx % self.unique_blocks;
1066
1067            // Generate full block
1068            let mut block_buf = vec![0u8; self.block_size];
1069            fill_block(
1070                &mut block_buf,
1071                ub,
1072                self.copy_lens[ub].min(self.block_size),
1073                self.block_sequence, // Use current sequence
1074                self.call_entropy,
1075            );
1076
1077            self.block_sequence += 1; // Increment for next block
1078
1079            // Copy needed portion
1080            chunk[offset..offset + to_copy]
1081                .copy_from_slice(&block_buf[block_offset..block_offset + to_copy]);
1082
1083            offset += to_copy;
1084        }
1085
1086        let to_write = offset;
1087        self.current_pos += to_write;
1088
1089        tracing::debug!(
1090            "fill_chunk_sequential: generated {} blocks ({} MiB) for {} byte chunk",
1091            num_blocks,
            (num_blocks * self.block_size) / (1024 * 1024),
1093            to_write
1094        );
1095
1096        to_write
1097    }
1098
1099    /// Parallel fill for large buffers (uses reused thread pool - ZERO COPY)
1100    fn fill_chunk_parallel(
1101        &mut self,
1102        chunk: &mut [u8],
1103        start_block: usize,
1104        start_offset: usize,
1105        num_blocks: usize,
1106    ) -> usize {
1107        use rayon::prelude::*;
1108
1109        // Use stored thread pool if available, otherwise fall back to sequential
1110        let thread_pool = match &self.thread_pool {
1111            Some(pool) => pool,
1112            None => {
1113                // No thread pool - fall back to sequential
1114                return self.fill_chunk_sequential(chunk, start_block, start_offset, num_blocks);
1115            }
1116        };
1117
1118        let call_entropy = self.call_entropy;
1119        let copy_lens = &self.copy_lens;
1120        let unique_blocks = self.unique_blocks;
1121        let block_size = self.block_size;
1122        let base_sequence = self.block_sequence; // Capture current sequence
1123
1124        // ZERO-COPY: Generate directly into output buffer using par_chunks_mut
1125        // This is the same approach as generate_data() - no temporary allocations!
1126        thread_pool.install(|| {
1127            chunk
1128                .par_chunks_mut(block_size)
1129                .enumerate()
1130                .for_each(|(i, block_chunk)| {
1131                    let block_idx = start_block + i;
1132                    let ub = block_idx % unique_blocks;
1133                    let block_seq = base_sequence + (i as u64); // Sequential block number
1134
1135                    // Handle first block with offset
1136                    if i == 0 && start_offset > 0 {
1137                        // Generate full block into temp, copy needed portion
1138                        let mut temp = vec![0u8; block_size];
1139                        fill_block(
1140                            &mut temp,
1141                            ub,
1142                            copy_lens[ub].min(block_size),
1143                            block_seq,
1144                            call_entropy,
1145                        );
1146                        let copy_len = block_size
1147                            .saturating_sub(start_offset)
1148                            .min(block_chunk.len());
1149                        block_chunk[..copy_len]
1150                            .copy_from_slice(&temp[start_offset..start_offset + copy_len]);
1151                    } else {
1152                        // Generate directly into output buffer (ZERO-COPY!)
1153                        let actual_len = block_chunk.len().min(block_size);
1154                        fill_block(
1155                            &mut block_chunk[..actual_len],
1156                            ub,
1157                            copy_lens[ub].min(actual_len),
1158                            block_seq,
1159                            call_entropy,
1160                        );
1161                    }
1162                });
1163        });
1164
1165        let to_write = chunk.len();
1166        self.current_pos += to_write;
1167        self.block_sequence += num_blocks as u64; // Increment sequence for next fill
1168
1169        tracing::debug!(
1170            "fill_chunk_parallel: ZERO-COPY generated {} blocks ({} MiB) for {} byte chunk",
1171            num_blocks,
            (num_blocks * block_size) / (1024 * 1024),
1173            to_write
1174        );
1175
1176        to_write
1177    }
1178
1179    /// Reset generator to start
1180    pub fn reset(&mut self) {
1181        self.current_pos = 0;
1182    }
1183
1184    /// Get current position
1185    pub fn position(&self) -> usize {
1186        self.current_pos
1187    }
1188
1189    /// Get total size
1190    pub fn total_size(&self) -> usize {
1191        self.total_size
1192    }
1193
1194    /// Check if generation is complete
1195    pub fn is_complete(&self) -> bool {
1196        self.current_pos >= self.total_size
1197    }
1198
1199    /// Set or reset the random seed for subsequent data generation
1200    ///
1201    /// This allows changing the data pattern mid-stream while maintaining generation position.
1202    /// The new seed takes effect on the next `fill_chunk()` call.
1203    ///
1204    /// # Arguments
1205    /// * `seed` - New seed value, or None to use time+urandom entropy (non-deterministic)
1206    ///
1207    /// # Examples
1208    /// ```rust,no_run
1209    /// use dgen_data::{DataGenerator, GeneratorConfig, NumaMode};
1210    ///
1211    /// let config = GeneratorConfig {
1212    ///     size: 100 * 1024 * 1024,
1213    ///     dedup_factor: 1,
1214    ///     compress_factor: 1,
1215    ///     numa_mode: NumaMode::Auto,
1216    ///     max_threads: None,
1217    ///     numa_node: None,
1218    ///     block_size: None,
1219    ///     seed: Some(12345),
1220    /// };
1221    ///
1222    /// let mut gen = DataGenerator::new(config);
1223    /// let mut buffer = vec![0u8; 1024 * 1024];
1224    ///
1225    /// // Generate some data with initial seed
1226    /// gen.fill_chunk(&mut buffer);
1227    ///
1228    /// // Change seed for different pattern
1229    /// gen.set_seed(Some(67890));
1230    /// gen.fill_chunk(&mut buffer);  // Uses new seed
1231    ///
1232    /// // Switch to non-deterministic mode
1233    /// gen.set_seed(None);
1234    /// gen.fill_chunk(&mut buffer);  // Uses time+urandom
1235    /// ```
1236    pub fn set_seed(&mut self, seed: Option<u64>) {
1237        self.call_entropy = seed.unwrap_or_else(generate_call_entropy);
1238        // Reset block sequence counter - this ensures same seed → identical stream
1239        self.block_sequence = 0;
1240        tracing::debug!(
1241            "Seed reset: {} (entropy={}) - block_sequence reset to 0",
1242            if seed.is_some() {
1243                "deterministic"
1244            } else {
1245                "non-deterministic"
1246            },
1247            self.call_entropy
1248        );
1249    }
1250
1251    /// Get recommended chunk size for optimal performance
1252    ///
1253    /// Returns 32 MB, which provides the best balance between:
1254    /// - Parallelism: 8 blocks × 4 MB = good distribution across cores
1255    /// - Cache locality: Fits well in L3 cache
1256    /// - Memory overhead: Reasonable buffer size
1257    ///
1258    /// Based on empirical testing showing 32 MB is ~16% faster than 64 MB
1259    /// and significantly better than smaller or larger sizes.
1260    pub fn recommended_chunk_size() -> usize {
1261        32 * 1024 * 1024 // 32 MB
1262    }
1263}
1264
1265#[cfg(test)]
1266mod tests {
1267    use super::*;
1268
1269    fn init_tracing() {
1270        use tracing_subscriber::{fmt, EnvFilter};
1271        let _ = fmt()
1272            .with_env_filter(EnvFilter::from_default_env())
1273            .try_init();
1274    }
1275
1276    #[test]
1277    fn test_generate_minimal() {
1278        init_tracing();
1279        let data = generate_data_simple(100, 1, 1);
1280        assert_eq!(data.len(), BLOCK_SIZE);
1281    }
1282
1283    #[test]
1284    fn test_generate_exact_block() {
1285        init_tracing();
1286        let data = generate_data_simple(BLOCK_SIZE, 1, 1);
1287        assert_eq!(data.len(), BLOCK_SIZE);
1288    }
1289
1290    #[test]
1291    fn test_generate_multiple_blocks() {
1292        init_tracing();
1293        let size = BLOCK_SIZE * 10;
1294        let data = generate_data_simple(size, 1, 1);
1295        assert_eq!(data.len(), size);
1296    }
1297
1298    #[test]
1299    fn test_streaming_generator() {
1300        init_tracing();
1301        eprintln!("Starting streaming generator test...");
1302
1303        let config = GeneratorConfig {
1304            size: BLOCK_SIZE * 5,
1305            dedup_factor: 1,
1306            compress_factor: 1,
1307            numa_mode: NumaMode::Auto,
1308            max_threads: None,
1309            numa_node: None,
1310            block_size: None,
1311            seed: None,
1312        };
1313
1314        eprintln!("Config: {} blocks, {} bytes total", 5, BLOCK_SIZE * 5);
1315
1316        let mut gen = DataGenerator::new(config.clone());
1317        let mut result = Vec::new();
1318
        // Use a larger chunk size to avoid regenerating blocks needlessly:
        // producing a full 4 MiB block for every 1 KiB chunk would be ~4096x overhead!
1321        let chunk_size = BLOCK_SIZE; // Use full block size for efficiency
1322        let mut chunk = vec![0u8; chunk_size];
1323
1324        let mut iterations = 0;
1325        while !gen.is_complete() {
1326            let written = gen.fill_chunk(&mut chunk);
1327            if written == 0 {
1328                break;
1329            }
1330            result.extend_from_slice(&chunk[..written]);
1331            iterations += 1;
1332
1333            if iterations % 10 == 0 {
1334                eprintln!(
1335                    "  Iteration {}: written={}, total={}",
1336                    iterations,
1337                    written,
1338                    result.len()
1339                );
1340            }
1341        }
1342
1343        eprintln!(
1344            "Completed in {} iterations, generated {} bytes",
1345            iterations,
1346            result.len()
1347        );
1348        assert_eq!(result.len(), config.size);
1349        assert!(gen.is_complete());
1350    }
1351
1352    #[test]
1353    fn test_set_seed_stream_reset() {
1354        use std::collections::hash_map::DefaultHasher;
1355        use std::hash::{Hash, Hasher};
1356
1357        fn hash_buffer(buf: &[u8]) -> u64 {
1358            let mut hasher = DefaultHasher::new();
1359            buf.hash(&mut hasher);
1360            hasher.finish()
1361        }
1362
1363        init_tracing();
1364        eprintln!("Testing set_seed() stream reset behavior...");
1365
1366        let size = 30 * 1024 * 1024; // 30 MB
1367        let chunk_size = 10 * 1024 * 1024; // 10 MB chunks
1368
1369        // Test 1: Same seed sequence produces identical data
1370        eprintln!("Test 1: Seed sequence reproducibility");
1371        let config = GeneratorConfig {
1372            size,
1373            dedup_factor: 1,
1374            compress_factor: 1,
1375            numa_mode: NumaMode::Auto,
1376            max_threads: None,
1377            numa_node: None,
1378            block_size: None,
1379            seed: Some(111),
1380        };
1381
1382        // First run with seed sequence: 111 -> 222 -> 333
1383        let mut gen1 = DataGenerator::new(config.clone());
1384        let mut buf1 = vec![0u8; chunk_size];
1385
1386        gen1.fill_chunk(&mut buf1);
1387        let hash1a = hash_buffer(&buf1);
1388
1389        gen1.set_seed(Some(222));
1390        gen1.fill_chunk(&mut buf1);
1391        let hash1b = hash_buffer(&buf1);
1392
1393        gen1.set_seed(Some(333));
1394        gen1.fill_chunk(&mut buf1);
1395        let hash1c = hash_buffer(&buf1);
1396
1397        // Second run with same seed sequence
1398        let mut gen2 = DataGenerator::new(config.clone());
1399        let mut buf2 = vec![0u8; chunk_size];
1400
1401        gen2.fill_chunk(&mut buf2);
1402        let hash2a = hash_buffer(&buf2);
1403
1404        gen2.set_seed(Some(222));
1405        gen2.fill_chunk(&mut buf2);
1406        let hash2b = hash_buffer(&buf2);
1407
1408        gen2.set_seed(Some(333));
1409        gen2.fill_chunk(&mut buf2);
1410        let hash2c = hash_buffer(&buf2);
1411
1412        eprintln!("  Chunk 1: hash1={:016x}, hash2={:016x}", hash1a, hash2a);
1413        eprintln!("  Chunk 2: hash1={:016x}, hash2={:016x}", hash1b, hash2b);
1414        eprintln!("  Chunk 3: hash1={:016x}, hash2={:016x}", hash1c, hash2c);
1415
1416        assert_eq!(hash1a, hash2a, "Chunk 1 (seed=111) should match");
1417        assert_eq!(hash1b, hash2b, "Chunk 2 (seed=222) should match");
1418        assert_eq!(hash1c, hash2c, "Chunk 3 (seed=333) should match");
1419
1420        // Test 2: Striped pattern (A-B-A-B) reproduces correctly
1421        eprintln!("Test 2: Striped pattern creation");
1422        let mut gen = DataGenerator::new(GeneratorConfig {
1423            size: 40 * 1024 * 1024,
1424            dedup_factor: 1,
1425            compress_factor: 1,
1426            numa_mode: NumaMode::Auto,
1427            max_threads: None,
1428            numa_node: None,
1429            block_size: None,
1430            seed: Some(1111),
1431        });
1432
1433        let mut buf = vec![0u8; chunk_size];
1434
1435        // Stripe 1: A
1436        gen.set_seed(Some(1111));
1437        gen.fill_chunk(&mut buf);
1438        let stripe1_hash = hash_buffer(&buf);
1439
1440        // Stripe 2: B
1441        gen.set_seed(Some(2222));
1442        gen.fill_chunk(&mut buf);
1443        let stripe2_hash = hash_buffer(&buf);
1444
1445        // Stripe 3: A (should match Stripe 1)
1446        gen.set_seed(Some(1111));
1447        gen.fill_chunk(&mut buf);
1448        let stripe3_hash = hash_buffer(&buf);
1449
1450        // Stripe 4: B (should match Stripe 2)
1451        gen.set_seed(Some(2222));
1452        gen.fill_chunk(&mut buf);
1453        let stripe4_hash = hash_buffer(&buf);
1454
1455        eprintln!("  Stripe 1 (A): {:016x}", stripe1_hash);
1456        eprintln!("  Stripe 2 (B): {:016x}", stripe2_hash);
1457        eprintln!("  Stripe 3 (A): {:016x}", stripe3_hash);
1458        eprintln!("  Stripe 4 (B): {:016x}", stripe4_hash);
1459
1460        assert_eq!(
1461            stripe1_hash, stripe3_hash,
1462            "Stripe A should be reproducible"
1463        );
1464        assert_eq!(
1465            stripe2_hash, stripe4_hash,
1466            "Stripe B should be reproducible"
1467        );
1468        assert_ne!(stripe1_hash, stripe2_hash, "Stripe A and B should differ");
1469
1470        eprintln!("✅ All stream reset tests passed!");
1471    }
1472}