tokitai-operator 0.1.0

Verified DL kernel compiler: formally-checked GEMM, p-adic, sheaf, contract-carrying ops. Paper-artifact grade.
Documentation
//! Mini-batch streaming wrapper over any `Vec<QualitySample>`.
//!
//! A `SyntheticSampleStream` owns a dataset and yields fixed-size
//! `Vec<QualitySample>` mini-batches via [`SyntheticSampleStream::next_batch`]
//! until the dataset is exhausted. The last batch may be smaller
//! than the configured batch size, matching the behavior of PyTorch's
//! `DataLoader` with `drop_last=False`.
//!
//! This is intentionally a small, allocation-light wrapper. The
//! real training driver in `src/training/` owns its own batch
//! iterator; this stream is for the smoke tests and for driving
//! the `Model` / `MoEModel` forward+backward passes ad hoc.

use crate::synth_data::decision_outcome::QualitySample;

/// A linear, deterministic iterator over a fixed dataset of
/// [`QualitySample`]s.
///
/// The stream holds the dataset by value and an internal cursor.
/// All batches are returned in dataset order; there is no
/// shuffling (callers can shuffle the dataset before constructing
/// the stream if they want stochastic order).
pub struct SyntheticSampleStream {
    /// The full dataset, ordered.
    samples: Vec<QualitySample>,
    /// Configured mini-batch size. Always `>= 1` (enforced by the
    /// constructor).
    batch_size: usize,
    /// Index of the next sample to emit. Never exceeds
    /// `samples.len()`; it only moves forward while batches are
    /// pulled and is rewound to `0` by `reset`.
    cursor: usize,
}

impl SyntheticSampleStream {
    /// Build a new stream over `samples` with the given batch size.
    ///
    /// The cursor starts at the beginning of the dataset.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size == 0`.
    pub fn new(samples: Vec<QualitySample>, batch_size: usize) -> Self {
        assert!(batch_size >= 1, "batch_size must be >= 1");
        Self {
            samples,
            batch_size,
            cursor: 0,
        }
    }

    /// Total number of samples in the dataset, i.e. how many samples
    /// one full pass (from construction or the last `reset`) yields.
    ///
    /// This is the dataset size; it does not shrink as batches are
    /// consumed.
    pub fn len(&self) -> usize {
        self.samples.len()
    }

    /// True if the stream has no samples.
    pub fn is_empty(&self) -> bool {
        self.samples.is_empty()
    }

    /// Configured mini-batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Number of batches per pass that contain exactly `batch_size`
    /// samples.
    pub fn n_full_batches(&self) -> usize {
        self.samples.len() / self.batch_size
    }

    /// Number of samples in the last batch that `next_batch` returns
    /// in a full pass.
    ///
    /// This is `batch_size` when the dataset length is an exact
    /// multiple of the batch size, the remainder otherwise, and `0`
    /// for an empty dataset. For a non-empty dataset,
    /// `(n_batches() - 1) * batch_size() + final_batch_size() == len()`.
    pub fn final_batch_size(&self) -> usize {
        let n = self.samples.len();
        if n == 0 {
            return 0;
        }
        match n % self.batch_size {
            // Exact multiple: the last batch is a full one.
            0 => self.batch_size,
            partial => partial,
        }
    }

    /// Total number of batches `next_batch` yields in a full pass,
    /// including a possibly-shorter final batch. Never counts an empty
    /// trailing batch.
    pub fn n_batches(&self) -> usize {
        let n = self.samples.len();
        // Ceiling division written so that it cannot overflow, even for
        // `batch_size` close to `usize::MAX`.
        n / self.batch_size + usize::from(n % self.batch_size != 0)
    }

    /// Yield the next mini-batch, or `None` if the stream is
    /// exhausted. The returned `Vec` is freshly allocated and
    /// contains the next `min(batch_size, remaining)` samples in
    /// dataset order.
    pub fn next_batch(&mut self) -> Option<Vec<QualitySample>> {
        let start = self.cursor;
        let remaining = self.samples.len().saturating_sub(start);
        if remaining == 0 {
            return None;
        }
        // Bound by `remaining` first so `start + take <= len` and the
        // addition can never overflow, whatever `batch_size` is.
        let end = start + remaining.min(self.batch_size);
        let batch = self.samples[start..end].to_vec();
        self.cursor = end;
        Some(batch)
    }

    /// Reset the cursor so the stream can be iterated again from
    /// the start. The underlying samples are unchanged.
    pub fn reset(&mut self) {
        self.cursor = 0;
    }
}