Cross-node deterministic border-Gram reduction (#987, extending #973).
crate::streaming_border made the in-process accumulation of the
Schur border Gram G = Σ_n x_n x_nᵀ bit-reproducible by construction: the
chunk partition is a pure function of (n_rows, chunk_size), per-chunk
partials are deterministic chunk_gram_flat reductions, and the
cross-chunk fold is a fixed pairwise tree keyed by chunk index — never by
arrival order, thread timing, or device count. This module extends that same
fixed-shape-by-construction discipline one level up, to a fleet of
worker nodes, with three properties the frontier corpus regime
(10⁹–10¹¹ tokens, hundreds of TB of activations) demands:
- Node count never changes bits. A node’s partials are not leaves of a separate per-node tree that then gets merged (that shape would depend on the node count). Instead every node computes the globally indexed per-chunk partials it owns and ships (chunk_index, k·k partial) messages; the coordinator folds them through the single global cascade of StreamingBorderGram, which accepts any arrival order and folds in chunk-index order. The reduction topology is therefore a pure function of (n_rows, chunk_size) alone: running on 1 node, 3 nodes, or 64 nodes yields the identical bit pattern, because the tree never saw the node count. (The chunk→node assignment is rank-indexed and deterministic, but it only decides who computes a partial, never how partials combine.)
- Checkpoint/resume is the job model, not an afterthought. Any worker’s death resumes from its serialized NodeWorkerCheckpoint (a cursor into its owned chunk sequence); the coordinator’s full state (the in-order fold forest, the pending out-of-order partials, and the per-rank receipt cursors) serializes to a CrossNodeCheckpoint. Resume-equals-straight-through holds at the bit level on both sides because both cursors are positions in deterministic sequences.
- Partials, never rows, cross the wire. A worker streams its shard rows locally (object store / mmap, gam_sae::corpus) and ships only k·k f64 partials. The coordinator’s ingest seam is StreamingBorderGram::submit_chunk_gram; both producers route through the one chunk_gram_flat free function, so a shipped partial is bit-identical to the partial the coordinator would have computed from the same rows.
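The node-count invariance described above can be illustrated with a small, self-contained sketch. It is not the crate's implementation: chunk_gram here is a stand-in for chunk_gram_flat (whose real signature may differ), pairwise_fold is one possible fixed pairwise tree, and reduce_with_ranks is a hypothetical helper that simulates a fleet in a single process. The point is only that re-keying arrivals by chunk index before folding removes any dependence on the rank count or arrival order.

```rust
use std::collections::BTreeMap;

/// Flat k*k Gram of one chunk of row-major rows (`rows.len()` is a multiple of k).
/// Stand-in for the crate's `chunk_gram_flat`; the real signature is an assumption.
fn chunk_gram(rows: &[f64], k: usize) -> Vec<f64> {
    let mut g = vec![0.0; k * k];
    for x in rows.chunks_exact(k) {
        for i in 0..k {
            for j in 0..k {
                g[i * k + j] += x[i] * x[j];
            }
        }
    }
    g
}

/// Fixed pairwise tree over partials that are already ordered by chunk index.
/// The tree shape depends only on the number of partials.
fn pairwise_fold(mut level: Vec<Vec<f64>>) -> Vec<f64> {
    while level.len() > 1 {
        let mut next = Vec::with_capacity((level.len() + 1) / 2);
        let mut it = level.into_iter();
        while let Some(mut a) = it.next() {
            if let Some(b) = it.next() {
                for (x, y) in a.iter_mut().zip(&b) {
                    *x += *y;
                }
            }
            next.push(a);
        }
        level = next;
    }
    level.pop().unwrap_or_default()
}

/// Simulate a fleet of `n_ranks` workers with round-robin chunk ownership.
/// Messages "arrive" in reversed rank order; the coordinator re-keys them by
/// chunk index before folding, so arrival order never reaches the tree.
fn reduce_with_ranks(data: &[f64], k: usize, chunk_size: usize, n_ranks: usize) -> Vec<f64> {
    let n_rows = data.len() / k;
    let n_chunks = (n_rows + chunk_size - 1) / chunk_size;
    let mut arrivals: Vec<(usize, Vec<f64>)> = Vec::new();
    for rank in (0..n_ranks).rev() {
        for j in (rank..n_chunks).step_by(n_ranks) {
            let start = j * chunk_size * k;
            let end = ((j + 1) * chunk_size * k).min(data.len());
            arrivals.push((j, chunk_gram(&data[start..end], k)));
        }
    }
    let ordered: BTreeMap<usize, Vec<f64>> = arrivals.into_iter().collect();
    pairwise_fold(ordered.into_values().collect())
}
```

Calling reduce_with_ranks on the same data with 1, 3, and 64 ranks and comparing the results through f64::to_bits should give identical bit patterns, since each call builds the same partials and folds them through the same tree.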
§Chunk→rank assignment
Round-robin by chunk index: rank r of n_ranks owns chunks
{j : j ≡ r (mod n_ranks)}, in increasing order. Round-robin (rather than
contiguous ranges) keeps the coordinator’s in-order fold frontier advancing
steadily while all ranks make progress at similar rates, which bounds the
pending out-of-order buffer by O(n_ranks × inter-node skew) instead of
O(total chunks). The assignment is a pure function of
(chunk_index, n_ranks); no scheduler, no work stealing — work stealing
would not change bits (the fold is index-keyed) but it would break the
one-cursor-per-rank resume model, so it is deliberately absent.
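As a minimal sketch of the round-robin rule above, the following free functions derive chunk counts, chunk row ranges, and per-rank ownership from the partition parameters alone. The function names are hypothetical and are not the API of CrossNodePartition; they only illustrate that every participant can compute the same answers without communicating.

```rust
/// Number of chunks for `n_rows` rows split into chunks of `chunk_size`
/// (ceiling division; the last chunk may be short). Hypothetical helper.
fn n_chunks(n_rows: usize, chunk_size: usize) -> usize {
    assert!(chunk_size > 0, "chunk_size must be positive");
    (n_rows + chunk_size - 1) / chunk_size
}

/// Half-open row range [start, end) covered by chunk `j`.
fn chunk_rows(j: usize, n_rows: usize, chunk_size: usize) -> (usize, usize) {
    let start = j * chunk_size;
    let end = ((j + 1) * chunk_size).min(n_rows);
    (start, end)
}

/// Rank that owns chunk `j` under round-robin assignment.
fn owner_rank(j: usize, n_ranks: usize) -> usize {
    assert!(n_ranks > 0, "n_ranks must be positive");
    j % n_ranks
}

/// Chunks owned by `rank`, in increasing order: { j : j ≡ rank (mod n_ranks) }.
fn owned_chunks(rank: usize, n_ranks: usize, n_chunks: usize) -> Vec<usize> {
    assert!(n_ranks > 0 && rank < n_ranks, "rank must be in 0..n_ranks");
    (rank..n_chunks).step_by(n_ranks).collect()
}
```

With 10 chunks and 3 ranks, rank 1 owns chunks 1, 4, and 7, and the ranks' owned sets together cover every chunk exactly once.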
Pure library: no networking, no flags, no environment variables. The transport (MPI, gRPC, files on a shared filesystem) is the caller’s; this module owns the deterministic topology, the cursors, and the validation.
Structs§
- CrossNodeCheckpoint: Serializable coordinator state: the inner accumulation state plus the per-rank receipt cursors.
- CrossNodeGramReduction: Coordinator-side reduction: receives NodePartials from the fleet and folds them into the single global fixed-tree accumulator.
- CrossNodePartition: The deterministic chunk partition + rank-indexed assignment shared by every participant of one cross-node pass. A pure function of its four fields; two participants constructed with the same fields agree on every derived quantity, with no communication.
- NodePartial: One shipped partial: the global chunk index plus the deterministic k·k per-chunk Gram. This is the only message that crosses the node boundary: k·k f64 values per chunk, never rows.
- NodeWorker: Worker-side driver for one rank: walks the rank’s deterministic owned-chunk sequence, turning row slices into shippable NodePartials.
- NodeWorkerCheckpoint: Serialized cursor of one worker: everything needed for a replacement process (same rank, any host) to continue the dead worker’s deterministic chunk sequence from where receipts stopped. Pure data; the worker’s row source re-seeks by row range, which is a pure function of the partition.
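
The worker-side cursor model can be sketched as follows. WorkerCursor is a hypothetical stand-in, not the layout of NodeWorkerCheckpoint, and it assumes the round-robin assignment described earlier. It shows why resuming from a saved cursor yields the same chunk sequence as an uninterrupted run: the cursor is just a position in a sequence that is fully determined by (rank, n_ranks, n_chunks).

```rust
/// Hypothetical stand-in for a worker checkpoint: a position in the rank's
/// owned-chunk sequence. Not the crate's `NodeWorkerCheckpoint` layout.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct WorkerCursor {
    rank: usize,
    n_ranks: usize,
    n_chunks: usize,
    /// How many owned chunks have already been handed out.
    next_pos: usize,
}

impl WorkerCursor {
    fn new(rank: usize, n_ranks: usize, n_chunks: usize) -> Self {
        assert!(n_ranks > 0 && rank < n_ranks, "rank must be in 0..n_ranks");
        Self { rank, n_ranks, n_chunks, next_pos: 0 }
    }

    /// Next global chunk index owned by this rank, advancing the cursor.
    /// Returns `None` once the rank's sequence is exhausted.
    fn next_chunk(&mut self) -> Option<usize> {
        let j = self.rank + self.next_pos * self.n_ranks;
        if j >= self.n_chunks {
            return None;
        }
        self.next_pos += 1;
        Some(j)
    }
}

/// Run a cursor to completion and collect the chunk indices it yields.
fn drain(mut cursor: WorkerCursor) -> Vec<usize> {
    let mut out = Vec::new();
    while let Some(j) = cursor.next_chunk() {
        out.push(j);
    }
    out
}
```

A replacement process that is handed a copy of the cursor taken after the first chunk continues with exactly the chunks a straight-through run would have produced next.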