
Module cross_node


Cross-node deterministic border-Gram reduction (#987, extending #973).

crate::streaming_border made the in-process accumulation of the Schur border Gram G = Σ_n x_n x_nᵀ bit-reproducible by construction: the chunk partition is a pure function of (n_rows, chunk_size), per-chunk partials are deterministic chunk_gram_flat reductions, and the cross-chunk fold is a fixed pairwise tree keyed by chunk index — never by arrival order, thread timing, or device count. This module extends that same fixed-shape-by-construction discipline one level up, to a fleet of worker nodes, with three properties the frontier corpus regime (10⁹–10¹¹ tokens, hundreds of TB of activations) demands:
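The in-process discipline rests on two things: a partition that depends only on (n_rows, chunk_size), and a tree whose shape depends only on the number of chunks. The following is a minimal sketch of both, not this crate's code. It assumes a naive row-major k·k Gram as a stand-in for chunk_gram_flat, and the helper names chunk_ranges, chunk_gram and pairwise_fold are hypothetical.

```rust
/// Half-open row ranges [start, end) for every chunk. A pure function of
/// (n_rows, chunk_size); only the last chunk may be short.
fn chunk_ranges(n_rows: usize, chunk_size: usize) -> Vec<(usize, usize)> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    (0..n_rows)
        .step_by(chunk_size)
        .map(|start| (start, (start + chunk_size).min(n_rows)))
        .collect()
}

/// Naive stand-in for a per-chunk partial (hypothetical, not chunk_gram_flat):
/// row-major k*k sum of x xᵀ over the chunk's rows, in fixed row order.
fn chunk_gram(rows: &[Vec<f64>], k: usize) -> Vec<f64> {
    let mut g = vec![0.0; k * k];
    for x in rows {
        for i in 0..k {
            for j in 0..k {
                g[i * k + j] += x[i] * x[j];
            }
        }
    }
    g
}

/// Fixed pairwise tree over partials listed in chunk-index order: each level
/// sums neighbours (0,1), (2,3), ... and carries an odd tail up unchanged.
/// The tree's shape depends only on `level.len()`, never on arrival order.
fn pairwise_fold(mut level: Vec<Vec<f64>>) -> Vec<f64> {
    assert!(!level.is_empty(), "need at least one chunk");
    while level.len() > 1 {
        let mut next: Vec<Vec<f64>> = Vec::with_capacity((level.len() + 1) / 2);
        let mut it = level.into_iter();
        while let Some(a) = it.next() {
            match it.next() {
                Some(b) => next.push(a.iter().zip(&b).map(|(x, y)| x + y).collect()),
                None => next.push(a),
            }
        }
        level = next;
    }
    level.pop().unwrap()
}
```

Partials may be produced or received in any order. As long as each is slotted by its chunk index before folding, the additions happen in the same order, so the result is bit-identical.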

  1. Node count never changes bits. A node’s partials are not leaves of a separate per-node tree that then gets merged (that shape would depend on the node count). Instead every node computes the globally indexed per-chunk partials it owns and ships (chunk_index, k·k partial) messages; the coordinator folds them through the single global cascade of StreamingBorderGram, which accepts any arrival order and folds in chunk-index order. The reduction topology is therefore a pure function of (n_rows, chunk_size) alone — running on 1 node, 3 nodes, or 64 nodes yields the identical bit pattern, because the tree never saw the node count. (The chunk→node assignment is rank-indexed and deterministic, but it only decides who computes a partial, never how partials combine.)
  2. Checkpoint/resume is the job model, not an afterthought. When a worker dies, a replacement resumes from its serialized NodeWorkerCheckpoint (a cursor into the rank’s owned chunk sequence). The coordinator’s full state (the in-order fold forest, the pending out-of-order partials, and the per-rank receipt cursors) serializes to a CrossNodeCheckpoint. Resume-equals-straight-through holds at the bit level on both sides because both cursors are positions in deterministic sequences.
  3. Partials, never rows, cross the wire. A worker streams its shard rows locally (object store or mmap, via gam_sae::corpus) and ships only k·k f64 partials. The coordinator’s ingest seam is StreamingBorderGram::submit_chunk_gram. Both producers (a remote worker and the in-process accumulator) route through the single chunk_gram_flat free function, so a shipped partial is bit-identical to the partial the coordinator would have computed from the same rows.
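Property 1 can be checked with a small simulation. This is a sketch under stated assumptions, not the module's implementation:

- Coordinator is a hypothetical stand-in for CrossNodeGramReduction.
- Its binary-counter forest is one possible fixed tree. It is not necessarily the exact shape StreamingBorderGram uses.
- fake_partial replaces real per-chunk Grams with deterministic dummy values.

Ranks ship their round-robin chunks at different rates, so arrivals are scrambled. Each partial waits in `pending` until the in-order frontier reaches it, and the final bits are the same for 1, 3 and 64 ranks.

```rust
use std::collections::BTreeMap;

/// Hypothetical coordinator: folds (chunk_index, partial) messages in
/// chunk-index order, whatever order they arrive in.
struct Coordinator {
    next: usize,                        // lowest chunk index not yet folded
    pending: BTreeMap<usize, Vec<f64>>, // out-of-order arrivals
    forest: Vec<(u32, Vec<f64>)>,       // binary-counter stack: (height, subtree sum)
}

impl Coordinator {
    fn new() -> Self {
        Coordinator { next: 0, pending: BTreeMap::new(), forest: Vec::new() }
    }

    fn submit(&mut self, chunk_index: usize, partial: Vec<f64>) {
        assert!(chunk_index >= self.next && !self.pending.contains_key(&chunk_index), "duplicate chunk");
        self.pending.insert(chunk_index, partial);
        // Advance the in-order frontier as far as contiguous receipts allow.
        while let Some(p) = self.pending.remove(&self.next) {
            self.push_leaf(p);
            self.next += 1;
        }
    }

    /// Append the next leaf and merge equal-height subtrees (left = lower indices).
    fn push_leaf(&mut self, leaf: Vec<f64>) {
        let mut node = (0u32, leaf);
        while self.forest.last().map(|t| t.0) == Some(node.0) {
            let (h, left) = self.forest.pop().unwrap();
            let sum: Vec<f64> = left.iter().zip(&node.1).map(|(a, b)| a + b).collect();
            node = (h + 1, sum);
        }
        self.forest.push(node);
    }

    /// Close the remaining subtrees right to left; for a given chunk count
    /// this final shape is always the same.
    fn finish(mut self) -> Vec<f64> {
        assert!(self.pending.is_empty(), "gap in chunk indices");
        let mut acc = self.forest.pop().expect("no chunks submitted").1;
        while let Some((_, left)) = self.forest.pop() {
            acc = left.iter().zip(&acc).map(|(a, b)| a + b).collect();
        }
        acc
    }
}

/// Deterministic dummy partial for chunk j (k*k values); hypothetical.
fn fake_partial(j: usize, k: usize) -> Vec<f64> {
    (0..k * k).map(|e| ((j * 31 + e) as f64).sqrt() * 1e-3 + 0.1).collect()
}

/// Simulate a fleet: rank r owns chunks r, r + n_ranks, ...; rank r ships
/// only every (r % 3 + 1)-th tick, which scrambles arrival order.
fn run_fleet(n_chunks: usize, n_ranks: usize, k: usize) -> Vec<f64> {
    let queues: Vec<Vec<usize>> = (0..n_ranks)
        .map(|r| (r..n_chunks).step_by(n_ranks).collect())
        .collect();
    let mut coord = Coordinator::new();
    let mut cursors = vec![0usize; n_ranks];
    let mut tick = 0usize;
    while cursors.iter().zip(&queues).any(|(c, q)| *c < q.len()) {
        for r in (0..n_ranks).rev() {
            if tick % (r % 3 + 1) == 0 && cursors[r] < queues[r].len() {
                let j = queues[r][cursors[r]];
                coord.submit(j, fake_partial(j, k));
                cursors[r] += 1;
            }
        }
        tick += 1;
    }
    coord.finish()
}
```

In `submit`, arrival order only decides *when* a partial leaves `pending`. It never decides *where* the partial sits in the tree.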

Chunk→rank assignment

Round-robin by chunk index: rank r of n_ranks owns chunks {j : j ≡ r (mod n_ranks)}, in increasing order. Round-robin (rather than contiguous ranges) keeps the coordinator’s in-order fold frontier advancing steadily while all ranks make progress at similar rates. With contiguous ranges, every partial from ranks 1 and above would sit in the buffer until rank 0 finished its whole range. Round-robin therefore bounds the pending out-of-order buffer by O(n_ranks × inter-node skew) instead of O(total chunks). The assignment is a pure function of (chunk_index, n_ranks). There is no scheduler and no work stealing. Work stealing would not change bits (the fold is index-keyed), but it would break the one-cursor-per-rank resume model, so it is deliberately absent.
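The rule needs no communication: each rank can enumerate its own chunks from (rank, n_ranks, n_chunks) alone. The following is a minimal sketch with hypothetical helper names, assuming chunks are numbered 0..n_chunks.

```rust
/// Rank that computes chunk `j` under round-robin assignment.
fn owner(j: usize, n_ranks: usize) -> usize {
    j % n_ranks
}

/// |{ j < n_chunks : j ≡ r (mod n_ranks) }|, the number of chunks rank r owns.
fn owned_count(r: usize, n_ranks: usize, n_chunks: usize) -> usize {
    assert!(r < n_ranks, "rank out of range");
    if r >= n_chunks { 0 } else { (n_chunks - r + n_ranks - 1) / n_ranks }
}

/// Global index of the c-th (0-based) chunk in rank r's owned sequence.
fn owned_chunk(r: usize, n_ranks: usize, c: usize) -> usize {
    r + c * n_ranks
}

/// Rank r's full owned sequence, in increasing chunk order.
fn owned_chunks(r: usize, n_ranks: usize, n_chunks: usize) -> Vec<usize> {
    (0..owned_count(r, n_ranks, n_chunks))
        .map(|c| owned_chunk(r, n_ranks, c))
        .collect()
}
```

For example, owned_chunks(1, 3, 10) is [1, 4, 7]. Because owned_chunk is closed-form, a rank's progress is fully described by the single integer c. That is what makes a one-cursor-per-rank resume model possible.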

Pure library: no networking, no flags, no environment variables. The transport (MPI, gRPC, files on a shared filesystem) is the caller’s; this module owns the deterministic topology, the cursors, and the validation.
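Because the transport belongs to the caller, the only requirement on it is that a partial's bits survive the trip. One hypothetical way to meet that requirement is to ship each f64 by its IEEE-754 bit pattern via to_le_bytes. The sketch below assumes a little-endian framing of (chunk_index, k, k·k f64 values); it is not this module's format, and WirePartial, encode and decode are illustrative names.

```rust
use std::convert::TryInto;

/// Hypothetical wire form of one shipped partial (not this module's format).
#[derive(Debug, Clone)]
struct WirePartial {
    chunk_index: u64,
    k: u32,
    gram: Vec<f64>, // row-major k*k
}

/// Little-endian framing: 8-byte chunk index, 4-byte k, then k*k f64 bit patterns.
fn encode(p: &WirePartial) -> Vec<u8> {
    let k = p.k as usize;
    assert_eq!(p.gram.len(), k * k, "partial must hold k*k values");
    let mut out = Vec::with_capacity(12 + 8 * p.gram.len());
    out.extend_from_slice(&p.chunk_index.to_le_bytes());
    out.extend_from_slice(&p.k.to_le_bytes());
    for x in &p.gram {
        out.extend_from_slice(&x.to_le_bytes()); // exact IEEE-754 bits
    }
    out
}

/// Inverse of `encode`; rejects any buffer whose length disagrees with k.
fn decode(buf: &[u8]) -> Result<WirePartial, String> {
    if buf.len() < 12 {
        return Err("truncated header".to_string());
    }
    let chunk_index = u64::from_le_bytes(buf[0..8].try_into().unwrap());
    let k = u32::from_le_bytes(buf[8..12].try_into().unwrap());
    let n = (k as usize).checked_mul(k as usize).ok_or("k*k overflows")?;
    let expected = n.checked_mul(8).ok_or("payload size overflows")?;
    let body = &buf[12..];
    if body.len() != expected {
        return Err(format!("expected {} payload bytes, got {}", expected, body.len()));
    }
    let gram = body
        .chunks_exact(8)
        .map(|b| f64::from_le_bytes(b.try_into().unwrap()))
        .collect();
    Ok(WirePartial { chunk_index, k, gram })
}
```

A fixed binary layout also lets the receiver reject a truncated message by length alone, and decode does this before reading any payload value.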

Structs

CrossNodeCheckpoint
Serializable coordinator state: the inner accumulation state plus the per-rank receipt cursors.
CrossNodeGramReduction
Coordinator-side reduction: receives NodePartials from the fleet and folds them into the single global fixed-tree accumulator.
CrossNodePartition
The deterministic chunk partition + rank-indexed assignment shared by every participant of one cross-node pass. A pure function of its four fields; two participants constructed with the same fields agree on every derived quantity, with no communication.
NodePartial
One shipped partial: the global chunk index plus the deterministic k·k per-chunk Gram. This is the only message that crosses the node boundary — k·k f64 values per chunk, never rows.
NodeWorker
Worker-side driver for one rank: walks the rank’s deterministic owned-chunk sequence, turning row slices into shippable NodePartials.
NodeWorkerCheckpoint
Serialized cursor of one worker: everything needed for a replacement process (same rank, any host) to continue the dead worker’s deterministic chunk sequence from where receipts stopped. Pure data; the worker’s row source re-seeks by row range, which is a pure function of the partition.
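The worker-side resume model reduces to one integer per rank. The following is a minimal sketch under these assumptions:

- WorkerCheckpoint is a hypothetical stand-in for NodeWorkerCheckpoint that stores the partition fields plus a cursor.
- A caller-supplied row(n) reader stands in for the gam_sae::corpus row source.

The owned chunk is rank + cursor·n_ranks, and its row range is recomputed from the partition. A fresh process resumed from a saved cursor therefore emits exactly the remaining messages of a straight-through run. A caller would persist checkpoint() only after the coordinator acknowledges the receipts. Recomputing an unacknowledged chunk is safe because it yields the same bits.

```rust
/// Hypothetical serialized cursor of one worker: the partition fields it
/// needs plus its position in the rank's owned-chunk sequence.
#[derive(Debug, Clone, Copy, PartialEq)]
struct WorkerCheckpoint {
    rank: usize,
    n_ranks: usize,
    n_rows: usize,
    chunk_size: usize,
    cursor: usize, // how many owned chunks have already been produced
}

struct Worker {
    ck: WorkerCheckpoint,
}

impl Worker {
    /// A fresh process (same rank, any host) starts from a saved cursor.
    fn resume(ck: WorkerCheckpoint) -> Self {
        assert!(ck.rank < ck.n_ranks && ck.chunk_size > 0);
        Worker { ck }
    }

    fn checkpoint(&self) -> WorkerCheckpoint {
        self.ck
    }

    /// Next (chunk_index, k*k partial), or None when the rank is done.
    /// `row(n)` re-reads global row n; chunk j covers rows
    /// [j*chunk_size, min((j+1)*chunk_size, n_rows)).
    fn next_partial(&mut self, k: usize, row: &dyn Fn(usize) -> Vec<f64>) -> Option<(usize, Vec<f64>)> {
        let c = self.ck;
        let j = c.rank + c.cursor * c.n_ranks; // round-robin owned chunk
        let start = j * c.chunk_size;
        if start >= c.n_rows {
            return None;
        }
        let end = (start + c.chunk_size).min(c.n_rows);
        let mut g = vec![0.0; k * k];
        for n in start..end {
            let x = row(n);
            for i in 0..k {
                for l in 0..k {
                    g[i * k + l] += x[i] * x[l];
                }
            }
        }
        self.ck.cursor += 1;
        Some((j, g))
    }
}
```

For instance, with n_rows = 100 and chunk_size = 8 there are 13 chunks, and rank 1 of 3 produces chunks 1, 4, 7 and 10 whether it runs straight through or is restarted midway.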