rustsim 0.0.1

High-performance agent-based modelling engine - top-level orchestration crate
Documentation
//! Optional CPU data-parallelism helpers for SoA batch kernels.
//!
//! This module is only compiled with the `rayon` feature. It exposes a small
//! set of helpers that parallelize per-element and chunked transformations
//! over `Vec<f32>` SoA columns extracted by
//! [`SoaExtractable`](rustsim_core::soa::SoaExtractable).
//!
//! Scope: these helpers are intentionally narrow. They target the SoA data
//! layout used by [`cpu_batch_step`](crate::compute::cpu_batch_step), which is
//! `Send + Sync`. The `RefCell`-backed agent store in
//! [`rustsim_core`](rustsim_core) is deliberately not parallelized here; for
//! parallelism above the SoA layer, users should either extract to SoA and
//! process in parallel, or run independent simulations in parallel via
//! [`ensemble`](crate::ensemble).
//!
//! # Example
//!
//! ```
//! # #[cfg(feature = "rayon")] {
//! use rustsim::parallel::{par_apply_column, par_apply_chunks};
//!
//! let mut x = vec![0.0_f32; 10_000];
//! par_apply_column(&mut x, |i, v| *v = i as f32);
//! par_apply_chunks(&mut x, 1024, |chunk| {
//!     for v in chunk {
//!         *v *= 2.0;
//!     }
//! });
//! # }
//! ```

use rayon::prelude::*;

/// Apply `f(index, &mut value)` in parallel over a single SoA column.
pub fn par_apply_column<F>(column: &mut [f32], f: F)
where
    F: Fn(usize, &mut f32) + Send + Sync,
{
    column.par_iter_mut().enumerate().for_each(|(i, v)| f(i, v));
}

/// Apply `f(chunk)` in parallel over fixed-size mutable chunks of a column.
///
/// The final chunk may be shorter than `chunk_size`.
pub fn par_apply_chunks<F>(column: &mut [f32], chunk_size: usize, f: F)
where
    F: Fn(&mut [f32]) + Send + Sync,
{
    assert!(chunk_size > 0, "chunk_size must be positive");
    column.par_chunks_mut(chunk_size).for_each(f);
}

/// Apply `f(index, [&mut f32; N])` in parallel across `N` parallel SoA columns.
///
/// All columns must have the same length. Panics if lengths differ.
pub fn par_apply_rows<const N: usize, F>(columns: &mut [&mut [f32]; N], f: F)
where
    F: Fn(usize, &mut [&mut f32; N]) + Send + Sync,
{
    let n = columns[0].len();
    for col in columns.iter() {
        assert_eq!(col.len(), n, "all columns must have equal length");
    }

    // Build per-row raw pointer arrays. Each row holds N distinct addresses,
    // one per column, so rows are pairwise-disjoint and safe to process in
    // parallel.
    #[repr(transparent)]
    struct Row<const N: usize>([*mut f32; N]);
    // SAFETY: each Row references disjoint addresses across disjoint columns
    // of equal length; no two parallel iterations can touch the same cell.
    unsafe impl<const N: usize> Send for Row<N> {}
    unsafe impl<const N: usize> Sync for Row<N> {}

    let rows: Vec<Row<N>> = (0..n)
        .map(|i| {
            let mut row: [*mut f32; N] = [std::ptr::null_mut(); N];
            for (c, col) in columns.iter_mut().enumerate() {
                row[c] = &mut col[i] as *mut f32;
            }
            Row(row)
        })
        .collect();

    rows.par_iter().enumerate().for_each(|(i, row)| {
        // SAFETY: row indices are unique across parallel iterations, and
        // each row's pointers reference disjoint columns, so no two threads
        // alias the same `&mut f32`.
        let mut refs: [&mut f32; N] = std::array::from_fn(|c| {
            let ptr = row.0[c];
            unsafe { &mut *ptr }
        });
        f(i, &mut refs);
    });
}

/// Apply `f(chunk_start, &mut [&mut [f32]])` in parallel over aligned chunks
/// across **multiple** SoA columns.
///
/// Every column must have the same length. Chunks are aligned: chunk `c`
/// spans `[c*chunk_size, (c+1)*chunk_size)` of every column, so the kernel
/// sees corresponding rows across all columns together.
///
/// This is the multi-column generalization of [`par_apply_chunks`] and the
/// runtime-sized counterpart of [`par_apply_rows`].
pub fn par_apply_chunks_multi<F>(columns: &mut [&mut [f32]], chunk_size: usize, f: F)
where
    F: Fn(usize, &mut [&mut [f32]]) + Send + Sync,
{
    assert!(chunk_size > 0, "chunk_size must be positive");
    if columns.is_empty() {
        return;
    }
    let n = columns[0].len();
    for col in columns.iter() {
        assert_eq!(col.len(), n, "all columns must have equal length");
    }
    if n == 0 {
        return;
    }
    let n_chunks = n.div_ceil(chunk_size);

    // Pointer snapshots per column. Each pointer refers to n valid f32 in the
    // caller's column; we derive disjoint sub-ranges below.
    #[repr(transparent)]
    struct ColPtr(*mut f32);
    // SAFETY: we hand out disjoint sub-slices per chunk; no two parallel
    // iterations touch overlapping addresses.
    unsafe impl Send for ColPtr {}
    unsafe impl Sync for ColPtr {}

    let ptrs: Vec<ColPtr> = columns.iter_mut().map(|c| ColPtr(c.as_mut_ptr())).collect();

    (0..n_chunks).into_par_iter().for_each(|c| {
        let start = c * chunk_size;
        let end = (start + chunk_size).min(n);
        let len = end - start;
        // SAFETY: chunks are pairwise disjoint (non-overlapping [start..end]
        // ranges of the same underlying columns). Each column's pointer is
        // valid for `n` elements, so offsetting by `start` and taking `len`
        // elements stays in-bounds.
        let mut slices: Vec<&mut [f32]> = ptrs
            .iter()
            .map(|p| unsafe { std::slice::from_raw_parts_mut(p.0.add(start), len) })
            .collect();
        f(start, &mut slices);
    });
}

/// Partition a scheduled `AgentId` list into chunks and process each chunk
/// in parallel.
///
/// This is the opt-in, read-only bridge between schedulers (which produce a
/// deterministic `Vec<AgentId>` per step) and parallel work that does **not**
/// touch the `!Send + !Sync` agent store. The closure receives each chunk's
/// start index within the full schedule plus a slice of IDs, and must
/// produce its effect through `Send + Sync` state (atomics, sharded
/// accumulators, SoA columns, etc.) that the caller has closed over.
///
/// Determinism: chunks are consumed in order, but the closure is invoked
/// concurrently across them. If the closure is a pure function of its chunk
/// input and commutatively merged side state, the overall result is
/// deterministic.
///
/// See the module docs for why this is explicitly _not_ a parallel agent
/// store.
pub fn par_chunked_schedule<F>(schedule: &[rustsim_core::types::AgentId], chunk_size: usize, f: F)
where
    F: Fn(usize, &[rustsim_core::types::AgentId]) + Send + Sync,
{
    assert!(chunk_size > 0, "chunk_size must be positive");
    schedule
        .par_chunks(chunk_size)
        .enumerate()
        .for_each(|(c, chunk)| f(c * chunk_size, chunk));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn par_apply_column_writes_all_indices() {
        let mut col = vec![0.0_f32; 1000];
        par_apply_column(&mut col, |i, v| *v = i as f32);
        for (i, v) in col.iter().enumerate() {
            assert_eq!(*v, i as f32);
        }
    }

    #[test]
    fn par_apply_chunks_processes_tail() {
        let mut col = vec![1.0_f32; 1003];
        par_apply_chunks(&mut col, 128, |chunk| {
            for v in chunk {
                *v = 2.0;
            }
        });
        assert!(col.iter().all(|v| *v == 2.0));
    }

    #[test]
    fn par_apply_rows_two_columns() {
        let mut a = vec![1.0_f32; 500];
        let mut b = vec![2.0_f32; 500];
        {
            let a_slice: &mut [f32] = &mut a;
            let b_slice: &mut [f32] = &mut b;
            let mut cols: [&mut [f32]; 2] = [a_slice, b_slice];
            par_apply_rows::<2, _>(&mut cols, |_i, row| {
                *row[0] += 10.0;
                *row[1] *= 3.0;
            });
        }
        assert!(a.iter().all(|v| *v == 11.0));
        assert!(b.iter().all(|v| *v == 6.0));
    }

    #[test]
    fn par_apply_chunks_multi_processes_aligned_chunks() {
        let mut a = vec![1.0_f32; 1003];
        let mut b = vec![2.0_f32; 1003];
        {
            let a_slice: &mut [f32] = &mut a;
            let b_slice: &mut [f32] = &mut b;
            let mut cols: [&mut [f32]; 2] = [a_slice, b_slice];
            par_apply_chunks_multi(&mut cols, 128, |start, slices| {
                let (head, tail) = slices.split_at_mut(1);
                let ca = &mut head[0];
                let cb = &mut tail[0];
                for (offset, (va, vb)) in ca.iter_mut().zip(cb.iter_mut()).enumerate() {
                    *va = (start + offset) as f32;
                    *vb = *va * 2.0;
                }
            });
        }
        for (i, v) in a.iter().enumerate() {
            assert_eq!(*v, i as f32);
        }
        for (i, v) in b.iter().enumerate() {
            assert_eq!(*v, (i as f32) * 2.0);
        }
    }

    #[test]
    fn par_chunked_schedule_covers_every_id_once() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        let schedule: Vec<rustsim_core::types::AgentId> = (0..1000).collect();
        let counters: Vec<AtomicUsize> = (0..1000).map(|_| AtomicUsize::new(0)).collect();
        par_chunked_schedule(&schedule, 64, |start, chunk| {
            for (offset, id) in chunk.iter().enumerate() {
                assert_eq!(schedule[start + offset], *id);
                counters[*id as usize].fetch_add(1, Ordering::Relaxed);
            }
        });
        for c in &counters {
            assert_eq!(c.load(Ordering::Relaxed), 1);
        }
    }
}