Skip to main content

oxiphysics_io/
parallel_io.rs

1// Copyright 2026 COOLJAPAN OU (Team KitaSan)
2// SPDX-License-Identifier: Apache-2.0
3
4//! Parallel I/O utilities for large-scale distributed physics simulations.
5//!
6//! This module provides:
7//! - **Domain decomposition metadata** – rank, ghost cells, partition boundaries
8//! - **Chunked file I/O** – fixed-size chunks with parallel writer state tracking
9//! - **Parallel VTK output** – PVTU / PVD index files for distributed meshes
10//! - **MPI-like rank / domain storage** – load-balanced partition export
11//! - **Ghost cell metadata** – halo layers for stencil communication
12//! - **Distributed array serialization** – flat binary layout for parallel restart
13//! - **Checkpoint / restart** – step-indexed checkpoint files
14//! - **Merge of distributed outputs** – reassembling per-rank files
15//! - **Parallel HDF5-like layout** – dataset headers + contiguous data blocks
16
17#![allow(dead_code)]
18
19use std::fmt;
20use std::fs;
21use std::io::{self, BufRead, Write};
22
23// ─────────────────────────────────────────────────────────────────────────────
24// Domain decomposition metadata
25// ─────────────────────────────────────────────────────────────────────────────
26
27/// Axis-aligned bounding box in 3-D used to describe a subdomain.
28#[derive(Debug, Clone, PartialEq)]
29pub struct DomainBox {
30    /// Minimum corner `[x_min, y_min, z_min]`.
31    pub min: [f64; 3],
32    /// Maximum corner `[x_max, y_max, z_max]`.
33    pub max: [f64; 3],
34}
35
36impl DomainBox {
37    /// Construct a new domain box.
38    pub fn new(min: [f64; 3], max: [f64; 3]) -> Self {
39        Self { min, max }
40    }
41
42    /// Volume of the bounding box.
43    pub fn volume(&self) -> f64 {
44        (self.max[0] - self.min[0]).max(0.0)
45            * (self.max[1] - self.min[1]).max(0.0)
46            * (self.max[2] - self.min[2]).max(0.0)
47    }
48
49    /// Return the centre of the box.
50    pub fn centre(&self) -> [f64; 3] {
51        [
52            0.5 * (self.min[0] + self.max[0]),
53            0.5 * (self.min[1] + self.max[1]),
54            0.5 * (self.min[2] + self.max[2]),
55        ]
56    }
57
58    /// Test whether the point `p` lies strictly inside the box.
59    pub fn contains(&self, p: [f64; 3]) -> bool {
60        p[0] > self.min[0]
61            && p[0] < self.max[0]
62            && p[1] > self.min[1]
63            && p[1] < self.max[1]
64            && p[2] > self.min[2]
65            && p[2] < self.max[2]
66    }
67
68    /// Return `true` if this box overlaps (strictly) with `other`.
69    pub fn overlaps(&self, other: &DomainBox) -> bool {
70        self.min[0] < other.max[0]
71            && self.max[0] > other.min[0]
72            && self.min[1] < other.max[1]
73            && self.max[1] > other.min[1]
74            && self.min[2] < other.max[2]
75            && self.max[2] > other.min[2]
76    }
77}
78
79/// Per-rank subdomain metadata for a domain-decomposed simulation.
80#[derive(Debug, Clone)]
81pub struct RankDomain {
82    /// MPI rank index (0-based).
83    pub rank: usize,
84    /// Total number of ranks in the communicator.
85    pub n_ranks: usize,
86    /// Spatial bounding box of the owned (non-ghost) region.
87    pub owned_box: DomainBox,
88    /// Number of ghost cell layers on each face `[x-, x+, y-, y+, z-, z+]`.
89    pub ghost_layers: [usize; 6],
90    /// Number of owned cells in each direction `[nx, ny, nz]`.
91    pub owned_cells: [usize; 3],
92    /// Global offset of this rank's first cell `[ox, oy, oz]`.
93    pub global_offset: [usize; 3],
94}
95
96impl RankDomain {
97    /// Construct domain metadata for a single rank.
98    #[allow(clippy::too_many_arguments)]
99    pub fn new(
100        rank: usize,
101        n_ranks: usize,
102        owned_box: DomainBox,
103        ghost_layers: [usize; 6],
104        owned_cells: [usize; 3],
105        global_offset: [usize; 3],
106    ) -> Self {
107        Self {
108            rank,
109            n_ranks,
110            owned_box,
111            ghost_layers,
112            owned_cells,
113            global_offset,
114        }
115    }
116
117    /// Total number of locally owned cells.
118    pub fn n_owned_cells(&self) -> usize {
119        self.owned_cells[0] * self.owned_cells[1] * self.owned_cells[2]
120    }
121
122    /// Total cells including ghost layers (all six faces).
123    pub fn n_total_cells(&self) -> usize {
124        let ext_x = self.owned_cells[0] + self.ghost_layers[0] + self.ghost_layers[1];
125        let ext_y = self.owned_cells[1] + self.ghost_layers[2] + self.ghost_layers[3];
126        let ext_z = self.owned_cells[2] + self.ghost_layers[4] + self.ghost_layers[5];
127        ext_x * ext_y * ext_z
128    }
129
130    /// Fraction of the global workload assigned to this rank.
131    ///
132    /// Assumes uniform distribution; override if load varies.
133    pub fn load_fraction(&self) -> f64 {
134        if self.n_ranks == 0 {
135            return 1.0;
136        }
137        self.n_owned_cells() as f64
138    }
139}
140
141// ─────────────────────────────────────────────────────────────────────────────
142// Ghost cell metadata
143// ─────────────────────────────────────────────────────────────────────────────
144
145/// Cardinal face directions for a 3-D structured mesh.
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum Face {
148    /// Negative-X face.
149    XMinus,
150    /// Positive-X face.
151    XPlus,
152    /// Negative-Y face.
153    YMinus,
154    /// Positive-Y face.
155    YPlus,
156    /// Negative-Z face.
157    ZMinus,
158    /// Positive-Z face.
159    ZPlus,
160}
161
162impl fmt::Display for Face {
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164        let s = match self {
165            Face::XMinus => "XMinus",
166            Face::XPlus => "XPlus",
167            Face::YMinus => "YMinus",
168            Face::YPlus => "YPlus",
169            Face::ZMinus => "ZMinus",
170            Face::ZPlus => "ZPlus",
171        };
172        write!(f, "{s}")
173    }
174}
175
176/// Describes the ghost cell layer on a single face.
177#[derive(Debug, Clone)]
178pub struct GhostLayer {
179    /// Which face this ghost layer belongs to.
180    pub face: Face,
181    /// Owning rank on the other side of this interface.
182    pub neighbour_rank: usize,
183    /// Number of ghost cell layers.
184    pub depth: usize,
185    /// Number of cells in the two tangential directions `[t1, t2]`.
186    pub tangential_cells: [usize; 2],
187}
188
189impl GhostLayer {
190    /// Total number of ghost cells in this layer.
191    pub fn n_cells(&self) -> usize {
192        self.depth * self.tangential_cells[0] * self.tangential_cells[1]
193    }
194}
195
196/// Full set of ghost-cell metadata for one rank.
197#[derive(Debug, Clone, Default)]
198pub struct GhostCellMeta {
199    /// One entry per active face interface.
200    pub layers: Vec<GhostLayer>,
201}
202
203impl GhostCellMeta {
204    /// Create an empty metadata container.
205    pub fn new() -> Self {
206        Self::default()
207    }
208
209    /// Add a ghost layer descriptor.
210    pub fn add_layer(&mut self, layer: GhostLayer) {
211        self.layers.push(layer);
212    }
213
214    /// Total ghost cells summed across all faces.
215    pub fn total_ghost_cells(&self) -> usize {
216        self.layers.iter().map(|l| l.n_cells()).sum()
217    }
218
219    /// List all neighbour ranks (deduplicated).
220    pub fn neighbour_ranks(&self) -> Vec<usize> {
221        let mut ranks: Vec<usize> = self.layers.iter().map(|l| l.neighbour_rank).collect();
222        ranks.sort_unstable();
223        ranks.dedup();
224        ranks
225    }
226}
227
228// ─────────────────────────────────────────────────────────────────────────────
229// Chunk splitting
230// ─────────────────────────────────────────────────────────────────────────────
231
232/// Specification for a single data chunk.
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct ChunkSpec {
235    /// Inclusive start index of this chunk.
236    pub start: usize,
237    /// Exclusive end index of this chunk.
238    pub end: usize,
239    /// Zero-based chunk index.
240    pub chunk_idx: usize,
241}
242
243impl ChunkSpec {
244    /// Number of elements in this chunk.
245    pub fn size(&self) -> usize {
246        self.end.saturating_sub(self.start)
247    }
248}
249
250/// Divide `total` elements into at most `n_chunks` contiguous chunks.
251///
252/// Returns a `Vec`ChunkSpec` covering `\[0, total)`. The last chunk absorbs
253/// any remainder so that all elements are covered.
254pub fn split_into_chunks(total: usize, n_chunks: usize) -> Vec<ChunkSpec> {
255    if total == 0 || n_chunks == 0 {
256        return Vec::new();
257    }
258    let actual_chunks = n_chunks.min(total);
259    let base = total / actual_chunks;
260    let remainder = total % actual_chunks;
261    let mut specs = Vec::with_capacity(actual_chunks);
262    let mut start = 0;
263    for idx in 0..actual_chunks {
264        let extra = if idx < remainder { 1 } else { 0 };
265        let end = start + base + extra;
266        specs.push(ChunkSpec {
267            start,
268            end,
269            chunk_idx: idx,
270        });
271        start = end;
272    }
273    specs
274}
275
276// ─────────────────────────────────────────────────────────────────────────────
277// Parallel writer state
278// ─────────────────────────────────────────────────────────────────────────────
279
280/// Tracks parallel write progress across multiple chunk files.
281#[derive(Debug, Clone)]
282pub struct ParallelWriter {
283    /// Base filename (without chunk suffix).
284    pub filename: String,
285    /// Number of chunks that have been written so far.
286    pub chunks_written: usize,
287    /// Total number of chunks expected.
288    pub total_chunks: usize,
289}
290
291impl ParallelWriter {
292    /// Create a new `ParallelWriter` for `total_chunks` chunks.
293    pub fn new(filename: impl Into<String>, total_chunks: usize) -> Self {
294        Self {
295            filename: filename.into(),
296            chunks_written: 0,
297            total_chunks,
298        }
299    }
300
301    /// Record that one more chunk has been written.
302    pub fn mark_chunk_written(&mut self) {
303        if self.chunks_written < self.total_chunks {
304            self.chunks_written += 1;
305        }
306    }
307
308    /// Returns `true` when all chunks have been written.
309    pub fn is_complete(&self) -> bool {
310        self.chunks_written >= self.total_chunks
311    }
312
313    /// Fraction of chunks completed, in `\[0.0, 1.0\]`.
314    pub fn progress(&self) -> f64 {
315        if self.total_chunks == 0 {
316            return 1.0;
317        }
318        self.chunks_written as f64 / self.total_chunks as f64
319    }
320
321    /// Reset the writer to zero written chunks.
322    pub fn reset(&mut self) {
323        self.chunks_written = 0;
324    }
325}
326
327// ─────────────────────────────────────────────────────────────────────────────
328// Parallel VTK output — PVTU / PVD
329// ─────────────────────────────────────────────────────────────────────────────
330
331/// Metadata for a single piece in a parallel VTU (PVTU) file.
332#[derive(Debug, Clone)]
333pub struct PvtuPiece {
334    /// Relative path to the per-rank `.vtu` file.
335    pub file: String,
336    /// MPI rank that owns this piece.
337    pub rank: usize,
338}
339
340/// Generates the XML content of a `.pvtu` (Parallel VTK Unstructured) index file.
341///
342/// * `pieces` – per-rank VTU file references.
343/// * `point_arrays` – names of point data arrays present in each piece.
344/// * `cell_arrays` – names of cell data arrays present in each piece.
345pub fn write_pvtu_xml(pieces: &[PvtuPiece], point_arrays: &[&str], cell_arrays: &[&str]) -> String {
346    let mut s = String::new();
347    s.push_str("<?xml version=\"1.0\"?>\n");
348    s.push_str(
349        "<VTKFile type=\"PUnstructuredGrid\" version=\"0.1\" byte_order=\"LittleEndian\">\n",
350    );
351    s.push_str("  <PUnstructuredGrid GhostLevel=\"1\">\n");
352    s.push_str("    <PPoints>\n");
353    s.push_str("      <PDataArray type=\"Float64\" NumberOfComponents=\"3\"/>\n");
354    s.push_str("    </PPoints>\n");
355    if !point_arrays.is_empty() {
356        s.push_str("    <PPointData>\n");
357        for name in point_arrays {
358            s.push_str(&format!(
359                "      <PDataArray type=\"Float64\" Name=\"{name}\"/>\n"
360            ));
361        }
362        s.push_str("    </PPointData>\n");
363    }
364    if !cell_arrays.is_empty() {
365        s.push_str("    <PCellData>\n");
366        for name in cell_arrays {
367            s.push_str(&format!(
368                "      <PDataArray type=\"Float64\" Name=\"{name}\"/>\n"
369            ));
370        }
371        s.push_str("    </PCellData>\n");
372    }
373    for piece in pieces {
374        s.push_str(&format!("    <Piece Source=\"{}\"/>\n", piece.file));
375    }
376    s.push_str("  </PUnstructuredGrid>\n");
377    s.push_str("</VTKFile>\n");
378    s
379}
380
381/// Generates the XML content of a `.pvd` (ParaView Data) time-series file.
382///
383/// * `entries` – list of `(time_value, pvtu_file)` pairs.
384pub fn write_pvd_xml(entries: &[(f64, &str)]) -> String {
385    let mut s = String::new();
386    s.push_str("<?xml version=\"1.0\"?>\n");
387    s.push_str("<VTKFile type=\"Collection\" version=\"0.1\">\n");
388    s.push_str("  <Collection>\n");
389    for (t, file) in entries {
390        s.push_str(&format!(
391            "    <DataSet timestep=\"{t:.6e}\" group=\"\" part=\"0\" file=\"{file}\"/>\n"
392        ));
393    }
394    s.push_str("  </Collection>\n");
395    s.push_str("</VTKFile>\n");
396    s
397}
398
399// ─────────────────────────────────────────────────────────────────────────────
400// Load-balanced partition export
401// ─────────────────────────────────────────────────────────────────────────────
402
403/// Partition assignment for load-balanced domain decomposition.
404#[derive(Debug, Clone)]
405pub struct PartitionMap {
406    /// For each global cell index, the owning rank.
407    pub cell_to_rank: Vec<usize>,
408    /// Total number of ranks.
409    pub n_ranks: usize,
410}
411
412impl PartitionMap {
413    /// Create a `PartitionMap` from a flat assignment vector.
414    pub fn new(cell_to_rank: Vec<usize>, n_ranks: usize) -> Self {
415        Self {
416            cell_to_rank,
417            n_ranks,
418        }
419    }
420
421    /// Number of global cells.
422    pub fn n_cells(&self) -> usize {
423        self.cell_to_rank.len()
424    }
425
426    /// Number of cells assigned to the given rank.
427    pub fn cells_on_rank(&self, rank: usize) -> usize {
428        self.cell_to_rank.iter().filter(|&&r| r == rank).count()
429    }
430
431    /// Load imbalance factor: `max_cells / ideal_cells`.
432    ///
433    /// Returns `1.0` for a perfectly balanced partition; higher values indicate
434    /// greater imbalance.  Returns `1.0` if there are no cells or ranks.
435    pub fn imbalance(&self) -> f64 {
436        if self.n_cells() == 0 || self.n_ranks == 0 {
437            return 1.0;
438        }
439        let ideal = self.n_cells() as f64 / self.n_ranks as f64;
440        let max_load = (0..self.n_ranks)
441            .map(|r| self.cells_on_rank(r))
442            .max()
443            .unwrap_or(0) as f64;
444        if ideal < 1e-12 { 1.0 } else { max_load / ideal }
445    }
446
447    /// Build a simple round-robin partition for `n_cells` cells over `n_ranks` ranks.
448    pub fn round_robin(n_cells: usize, n_ranks: usize) -> Self {
449        if n_ranks == 0 {
450            return Self::new(vec![], 0);
451        }
452        let map = (0..n_cells).map(|i| i % n_ranks).collect();
453        Self::new(map, n_ranks)
454    }
455
456    /// Serialize to a simple text format (one rank per line).
457    pub fn to_text(&self) -> String {
458        let mut s = format!("n_ranks={}\n", self.n_ranks);
459        for (i, &r) in self.cell_to_rank.iter().enumerate() {
460            s.push_str(&format!("{i} {r}\n"));
461        }
462        s
463    }
464}
465
466// ─────────────────────────────────────────────────────────────────────────────
467// Distributed array serialization
468// ─────────────────────────────────────────────────────────────────────────────
469
470/// A distributed scalar array owned by one rank.
471#[derive(Debug, Clone)]
472pub struct DistributedArray {
473    /// MPI rank that owns this shard.
474    pub rank: usize,
475    /// Global length of the full array.
476    pub global_len: usize,
477    /// Global start index of this shard.
478    pub offset: usize,
479    /// Local data values.
480    pub data: Vec<f64>,
481}
482
483impl DistributedArray {
484    /// Create a new shard.
485    pub fn new(rank: usize, global_len: usize, offset: usize, data: Vec<f64>) -> Self {
486        Self {
487            rank,
488            global_len,
489            offset,
490            data,
491        }
492    }
493
494    /// Local length of this shard.
495    pub fn local_len(&self) -> usize {
496        self.data.len()
497    }
498
499    /// Serialise to a compact binary-hex text format.
500    ///
501    /// Format: `rank offset global_len n_values <hex-encoded f64s>\n`
502    pub fn serialize_text(&self) -> String {
503        let hex: String = self
504            .data
505            .iter()
506            .flat_map(|&v| v.to_le_bytes())
507            .map(|b| format!("{b:02x}"))
508            .collect();
509        format!(
510            "{} {} {} {} {}\n",
511            self.rank,
512            self.offset,
513            self.global_len,
514            self.data.len(),
515            hex
516        )
517    }
518
519    /// Deserialise from the text format produced by `serialize_text`.
520    pub fn deserialize_text(s: &str) -> Option<Self> {
521        let mut it = s.split_whitespace();
522        let rank: usize = it.next()?.parse().ok()?;
523        let offset: usize = it.next()?.parse().ok()?;
524        let global_len: usize = it.next()?.parse().ok()?;
525        let n: usize = it.next()?.parse().ok()?;
526        let hex = it.next().unwrap_or("");
527        if hex.len() != n * 16 {
528            return None;
529        }
530        let data: Vec<f64> = (0..n)
531            .map(|i| {
532                let chunk = &hex[i * 16..(i + 1) * 16];
533                let mut bytes = [0u8; 8];
534                for (j, byte) in bytes.iter_mut().enumerate() {
535                    *byte = u8::from_str_radix(&chunk[j * 2..j * 2 + 2], 16).unwrap_or(0);
536                }
537                f64::from_le_bytes(bytes)
538            })
539            .collect();
540        Some(Self {
541            rank,
542            global_len,
543            offset,
544            data,
545        })
546    }
547
548    /// Reconstruct the global array by assembling shards in offset order.
549    ///
550    /// All shards must cover the entire global range without gaps or overlaps.
551    pub fn assemble(shards: &[DistributedArray]) -> Option<Vec<f64>> {
552        if shards.is_empty() {
553            return Some(vec![]);
554        }
555        let global_len = shards[0].global_len;
556        let mut result = vec![0.0_f64; global_len];
557        for shard in shards {
558            let end = shard.offset + shard.local_len();
559            if end > global_len {
560                return None;
561            }
562            result[shard.offset..end].copy_from_slice(&shard.data);
563        }
564        Some(result)
565    }
566}
567
568// ─────────────────────────────────────────────────────────────────────────────
569// Checkpoint / restart
570// ─────────────────────────────────────────────────────────────────────────────
571
572/// Header record written at the start of every checkpoint file.
573#[derive(Debug, Clone)]
574pub struct CheckpointHeader {
575    /// Simulation step index.
576    pub step: u64,
577    /// Simulation time.
578    pub time: f64,
579    /// Number of MPI ranks used when the checkpoint was written.
580    pub n_ranks: usize,
581    /// Number of particles/cells in this checkpoint.
582    pub n_items: usize,
583    /// A user-defined tag (simulation name, version, etc.).
584    pub tag: String,
585}
586
587impl CheckpointHeader {
588    /// Create a new checkpoint header.
589    pub fn new(step: u64, time: f64, n_ranks: usize, n_items: usize, tag: &str) -> Self {
590        Self {
591            step,
592            time,
593            n_ranks,
594            n_items,
595            tag: tag.to_owned(),
596        }
597    }
598
599    /// Serialize to a single-line text format.
600    pub fn to_line(&self) -> String {
601        format!(
602            "CHECKPOINT step={} time={:.10e} n_ranks={} n_items={} tag={}\n",
603            self.step, self.time, self.n_ranks, self.n_items, self.tag
604        )
605    }
606
607    /// Parse a checkpoint header from a text line.
608    pub fn from_line(line: &str) -> Option<Self> {
609        if !line.trim_start().starts_with("CHECKPOINT") {
610            return None;
611        }
612        let mut step = 0u64;
613        let mut time = 0.0f64;
614        let mut n_ranks = 1usize;
615        let mut n_items = 0usize;
616        let mut tag = String::new();
617        for token in line.split_whitespace().skip(1) {
618            if let Some(rest) = token.strip_prefix("step=") {
619                step = rest.parse().unwrap_or(0);
620            } else if let Some(rest) = token.strip_prefix("time=") {
621                time = rest.parse().unwrap_or(0.0);
622            } else if let Some(rest) = token.strip_prefix("n_ranks=") {
623                n_ranks = rest.parse().unwrap_or(1);
624            } else if let Some(rest) = token.strip_prefix("n_items=") {
625                n_items = rest.parse().unwrap_or(0);
626            } else if let Some(rest) = token.strip_prefix("tag=") {
627                tag = rest.to_owned();
628            }
629        }
630        Some(Self {
631            step,
632            time,
633            n_ranks,
634            n_items,
635            tag,
636        })
637    }
638}
639
640/// Write a checkpoint file containing a header and particle positions.
641///
642/// The file is created at `path`. Each position is written as three
643/// space-separated f64 values, one atom per line.
644pub fn write_checkpoint(
645    path: &str,
646    header: &CheckpointHeader,
647    positions: &[[f64; 3]],
648) -> Result<(), io::Error> {
649    let mut file = fs::File::create(path)?;
650    write!(file, "{}", header.to_line())?;
651    for pos in positions {
652        writeln!(file, "{:.10e} {:.10e} {:.10e}", pos[0], pos[1], pos[2])?;
653    }
654    Ok(())
655}
656
657/// Read a checkpoint file written by [`write_checkpoint`].
658///
659/// Returns the header and a vector of positions.
660pub fn read_checkpoint(path: &str) -> Result<(CheckpointHeader, Vec<[f64; 3]>), io::Error> {
661    let file = fs::File::open(path)?;
662    let reader = io::BufReader::new(file);
663    let mut lines_iter = reader.lines();
664    let header_line = lines_iter
665        .next()
666        .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "empty checkpoint"))??;
667    let header = CheckpointHeader::from_line(&header_line)
668        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad checkpoint header"))?;
669    let mut positions = Vec::new();
670    for line in lines_iter {
671        let line = line?;
672        let parts: Vec<&str> = line.split_whitespace().collect();
673        if parts.len() >= 3 {
674            let x: f64 = parts[0].parse().unwrap_or(0.0);
675            let y: f64 = parts[1].parse().unwrap_or(0.0);
676            let z: f64 = parts[2].parse().unwrap_or(0.0);
677            positions.push([x, y, z]);
678        }
679    }
680    Ok((header, positions))
681}
682
683// ─────────────────────────────────────────────────────────────────────────────
684// File merging
685// ─────────────────────────────────────────────────────────────────────────────
686
687/// Concatenate `n_chunks` chunk files into a single output file.
688///
689/// Chunk files are expected at `{base_path}.chunk{i}` for `i` in `0..n_chunks`.
690/// Each chunk file is appended in order to `output_path`.
691pub fn merge_chunk_files(
692    base_path: &str,
693    n_chunks: usize,
694    output_path: &str,
695) -> Result<(), io::Error> {
696    let mut out = fs::File::create(output_path)?;
697    for i in 0..n_chunks {
698        let chunk_path = format!("{base_path}.chunk{i}");
699        let data = fs::read(&chunk_path)?;
700        out.write_all(&data)?;
701    }
702    Ok(())
703}
704
705/// Merge per-rank checkpoint position files into a single file.
706///
707/// Per-rank files are expected at `{base_path}.rank{r}` for `r` in `0..n_ranks`.
708pub fn merge_rank_files(
709    base_path: &str,
710    n_ranks: usize,
711    output_path: &str,
712) -> Result<(), io::Error> {
713    let mut out = fs::File::create(output_path)?;
714    for r in 0..n_ranks {
715        let rank_path = format!("{base_path}.rank{r}");
716        if let Ok(data) = fs::read(&rank_path) {
717            out.write_all(&data)?;
718        }
719    }
720    Ok(())
721}
722
723// ─────────────────────────────────────────────────────────────────────────────
724// Bandwidth estimation
725// ─────────────────────────────────────────────────────────────────────────────
726
727/// Estimate I/O bandwidth in MB/s.
728///
729/// * `bytes` — number of bytes transferred.
730/// * `elapsed_secs` — elapsed time in seconds.
731pub fn estimate_io_bandwidth(bytes: usize, elapsed_secs: f64) -> f64 {
732    if elapsed_secs <= 0.0 {
733        return 0.0;
734    }
735    (bytes as f64) / elapsed_secs / 1e6
736}
737
738// ─────────────────────────────────────────────────────────────────────────────
739// Chunked trajectory
740// ─────────────────────────────────────────────────────────────────────────────
741
742/// A trajectory divided into fixed-size chunks of frames.
743#[derive(Debug, Clone)]
744pub struct ChunkedTrajectory {
745    /// Total number of frames in the full trajectory.
746    pub n_frames: usize,
747    /// Number of atoms per frame.
748    pub n_atoms: usize,
749    /// Number of frames per chunk.
750    pub chunk_size: usize,
751    /// All frames stored as `Vec<\[f64; 3\]>` (one entry per atom).
752    pub frames: Vec<Vec<[f64; 3]>>,
753}
754
755impl ChunkedTrajectory {
756    /// Create an empty trajectory with the given parameters.
757    pub fn new(n_atoms: usize, chunk_size: usize) -> Self {
758        Self {
759            n_frames: 0,
760            n_atoms,
761            chunk_size: chunk_size.max(1),
762            frames: Vec::new(),
763        }
764    }
765
766    /// Append a frame to the trajectory.
767    pub fn push_frame(&mut self, frame: Vec<[f64; 3]>) {
768        self.frames.push(frame);
769        self.n_frames = self.frames.len();
770    }
771
772    /// Number of chunks needed to cover all frames.
773    pub fn n_chunks(&self) -> usize {
774        if self.n_frames == 0 {
775            return 0;
776        }
777        self.n_frames.div_ceil(self.chunk_size)
778    }
779}
780
781// ─────────────────────────────────────────────────────────────────────────────
782// XYZ chunk I/O
783// ─────────────────────────────────────────────────────────────────────────────
784
785/// Write a `ChunkedTrajectory` to XYZ files, one file per chunk.
786///
787/// Output files are named `{base_path}.chunk{i}.xyz`.
788pub fn write_chunked_xyz(traj: &ChunkedTrajectory, base_path: &str) -> Result<(), io::Error> {
789    let n_chunks = traj.n_chunks();
790    for chunk_idx in 0..n_chunks {
791        let start_frame = chunk_idx * traj.chunk_size;
792        let end_frame = (start_frame + traj.chunk_size).min(traj.n_frames);
793        let chunk_path = format!("{base_path}.chunk{chunk_idx}.xyz");
794        let mut file = fs::File::create(&chunk_path)?;
795        for frame_idx in start_frame..end_frame {
796            let frame = &traj.frames[frame_idx];
797            writeln!(file, "{}", frame.len())?;
798            writeln!(file, "Frame {frame_idx}")?;
799            for pos in frame {
800                writeln!(file, "X {:.6} {:.6} {:.6}", pos[0], pos[1], pos[2])?;
801            }
802        }
803    }
804    Ok(())
805}
806
807/// Read chunked XYZ files written by [`write_chunked_xyz`].
808///
809/// Reads `{base_path}.chunk{i}.xyz` for `i` in `0..n_chunks`.
810pub fn read_chunked_xyz(base_path: &str, n_chunks: usize) -> Result<ChunkedTrajectory, io::Error> {
811    let mut frames: Vec<Vec<[f64; 3]>> = Vec::new();
812    for chunk_idx in 0..n_chunks {
813        let chunk_path = format!("{base_path}.chunk{chunk_idx}.xyz");
814        let file = fs::File::open(&chunk_path)?;
815        let reader = io::BufReader::new(file);
816        let lines: Vec<String> = reader.lines().collect::<Result<Vec<_>, _>>()?;
817        let mut idx = 0;
818        while idx < lines.len() {
819            let n_atoms: usize = lines[idx].trim().parse().unwrap_or(0);
820            idx += 1;
821            if idx >= lines.len() {
822                break;
823            }
824            idx += 1; // skip comment line
825            let mut frame = Vec::with_capacity(n_atoms);
826            for _ in 0..n_atoms {
827                if idx >= lines.len() {
828                    break;
829                }
830                let parts: Vec<&str> = lines[idx].split_whitespace().collect();
831                if parts.len() >= 4 {
832                    let x: f64 = parts[1].parse().unwrap_or(0.0);
833                    let y: f64 = parts[2].parse().unwrap_or(0.0);
834                    let z: f64 = parts[3].parse().unwrap_or(0.0);
835                    frame.push([x, y, z]);
836                }
837                idx += 1;
838            }
839            if !frame.is_empty() {
840                frames.push(frame);
841            }
842        }
843    }
844    let n_frames = frames.len();
845    let n_atoms = frames.first().map(|f| f.len()).unwrap_or(0);
846    Ok(ChunkedTrajectory {
847        n_frames,
848        n_atoms,
849        chunk_size: n_frames.max(1),
850        frames,
851    })
852}
853
854// ─────────────────────────────────────────────────────────────────────────────
855// Parallel HDF5-like layout
856// ─────────────────────────────────────────────────────────────────────────────
857
858/// Data-type descriptor for a dataset column.
859#[derive(Debug, Clone, PartialEq, Eq)]
860pub enum DsetType {
861    /// 64-bit float.
862    Float64,
863    /// 32-bit float.
864    Float32,
865    /// 64-bit unsigned integer.
866    Uint64,
867    /// 32-bit unsigned integer.
868    Uint32,
869}
870
871impl fmt::Display for DsetType {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        let s = match self {
874            DsetType::Float64 => "float64",
875            DsetType::Float32 => "float32",
876            DsetType::Uint64 => "uint64",
877            DsetType::Uint32 => "uint32",
878        };
879        write!(f, "{s}")
880    }
881}
882
883impl DsetType {
884    /// Size in bytes of a single element.
885    pub fn byte_size(&self) -> usize {
886        match self {
887            DsetType::Float64 | DsetType::Uint64 => 8,
888            DsetType::Float32 | DsetType::Uint32 => 4,
889        }
890    }
891}
892
893/// Header for a single named dataset within a parallel file.
894#[derive(Debug, Clone)]
895pub struct DatasetHeader {
896    /// Human-readable dataset name.
897    pub name: String,
898    /// Element type.
899    pub dtype: DsetType,
900    /// Number of components per element (e.g. 3 for a 3-D vector).
901    pub n_components: usize,
902    /// Total number of elements in the global dataset.
903    pub global_n: usize,
904    /// Byte offset of this dataset's data block within the file body.
905    pub byte_offset: usize,
906}
907
908impl DatasetHeader {
909    /// Construct a new dataset header.
910    pub fn new(
911        name: &str,
912        dtype: DsetType,
913        n_components: usize,
914        global_n: usize,
915        byte_offset: usize,
916    ) -> Self {
917        Self {
918            name: name.to_owned(),
919            dtype,
920            n_components,
921            global_n,
922            byte_offset,
923        }
924    }
925
926    /// Total byte size of this dataset's data block.
927    pub fn data_size_bytes(&self) -> usize {
928        self.global_n * self.n_components * self.dtype.byte_size()
929    }
930
931    /// Serialize to a text header line.
932    pub fn to_header_line(&self) -> String {
933        format!(
934            "DATASET name={} dtype={} n_comp={} global_n={} offset={}\n",
935            self.name, self.dtype, self.n_components, self.global_n, self.byte_offset
936        )
937    }
938}
939
940/// A lightweight parallel-HDF5-like file layout description.
941#[derive(Debug, Clone, Default)]
942pub struct ParallelFileLayout {
943    /// All dataset headers in declaration order.
944    pub datasets: Vec<DatasetHeader>,
945}
946
947impl ParallelFileLayout {
948    /// Create an empty layout.
949    pub fn new() -> Self {
950        Self::default()
951    }
952
953    /// Register a dataset and compute its byte offset automatically.
954    pub fn add_dataset(
955        &mut self,
956        name: &str,
957        dtype: DsetType,
958        n_components: usize,
959        global_n: usize,
960    ) {
961        let offset = self
962            .datasets
963            .last()
964            .map(|d| d.byte_offset + d.data_size_bytes())
965            .unwrap_or(0);
966        self.datasets.push(DatasetHeader::new(
967            name,
968            dtype,
969            n_components,
970            global_n,
971            offset,
972        ));
973    }
974
975    /// Total byte size of all dataset data blocks.
976    pub fn total_data_bytes(&self) -> usize {
977        self.datasets
978            .last()
979            .map(|d| d.byte_offset + d.data_size_bytes())
980            .unwrap_or(0)
981    }
982
983    /// Serialize all headers to a string.
984    pub fn header_block(&self) -> String {
985        self.datasets.iter().map(|d| d.to_header_line()).collect()
986    }
987
988    /// Look up a dataset by name.
989    pub fn find(&self, name: &str) -> Option<&DatasetHeader> {
990        self.datasets.iter().find(|d| d.name == name)
991    }
992}
993
994// ─────────────────────────────────────────────────────────────────────────────
995// Delta encoding
996// ─────────────────────────────────────────────────────────────────────────────
997
998/// Compress trajectory frames using delta encoding.
999///
1000/// The first frame is stored as-is; subsequent frames store the difference
1001/// from the previous frame.
1002pub fn compress_trajectory_delta(frames: &[Vec<[f64; 3]>]) -> Vec<Vec<[f64; 3]>> {
1003    if frames.is_empty() {
1004        return Vec::new();
1005    }
1006    let mut result = Vec::with_capacity(frames.len());
1007    result.push(frames[0].clone());
1008    for i in 1..frames.len() {
1009        let prev = &frames[i - 1];
1010        let curr = &frames[i];
1011        let n = curr.len().min(prev.len());
1012        let delta: Vec<[f64; 3]> = curr
1013            .iter()
1014            .enumerate()
1015            .map(|(j, &c)| {
1016                if j < n {
1017                    [c[0] - prev[j][0], c[1] - prev[j][1], c[2] - prev[j][2]]
1018                } else {
1019                    c
1020                }
1021            })
1022            .collect();
1023        result.push(delta);
1024    }
1025    result
1026}
1027
1028/// Reconstruct trajectory frames from delta-encoded data.
1029pub fn decompress_trajectory_delta(deltas: &[Vec<[f64; 3]>]) -> Vec<Vec<[f64; 3]>> {
1030    if deltas.is_empty() {
1031        return Vec::new();
1032    }
1033    let mut result = Vec::with_capacity(deltas.len());
1034    result.push(deltas[0].clone());
1035    for i in 1..deltas.len() {
1036        let prev = &result[i - 1];
1037        let delta = &deltas[i];
1038        let frame: Vec<[f64; 3]> = delta
1039            .iter()
1040            .enumerate()
1041            .map(|(j, &d)| {
1042                if j < prev.len() {
1043                    [prev[j][0] + d[0], prev[j][1] + d[1], prev[j][2] + d[2]]
1044                } else {
1045                    d
1046                }
1047            })
1048            .collect();
1049        result.push(frame);
1050    }
1051    result
1052}
1053
1054// ─────────────────────────────────────────────────────────────────────────────
1055// I/O statistics
1056// ─────────────────────────────────────────────────────────────────────────────
1057
1058/// Return a formatted I/O statistics summary string.
1059///
1060/// * `bytes_read` — total bytes read.
1061/// * `bytes_written` — total bytes written.
1062/// * `frames` — number of trajectory frames processed.
1063pub fn io_stats(bytes_read: usize, bytes_written: usize, frames: usize) -> String {
1064    format!("IO Stats: read={bytes_read}B written={bytes_written}B frames={frames}")
1065}
1066
1067/// Aggregated I/O performance counters.
1068#[derive(Debug, Clone, Default)]
1069pub struct IoCounters {
1070    /// Total bytes read across all operations.
1071    pub bytes_read: u64,
1072    /// Total bytes written across all operations.
1073    pub bytes_written: u64,
1074    /// Number of read operations.
1075    pub read_ops: u64,
1076    /// Number of write operations.
1077    pub write_ops: u64,
1078    /// Cumulative wall-clock time spent in read calls (seconds).
1079    pub read_time_s: f64,
1080    /// Cumulative wall-clock time spent in write calls (seconds).
1081    pub write_time_s: f64,
1082}
1083
1084impl IoCounters {
1085    /// Create zeroed counters.
1086    pub fn new() -> Self {
1087        Self::default()
1088    }
1089
1090    /// Record a completed read operation.
1091    pub fn record_read(&mut self, bytes: u64, elapsed_s: f64) {
1092        self.bytes_read += bytes;
1093        self.read_ops += 1;
1094        self.read_time_s += elapsed_s;
1095    }
1096
1097    /// Record a completed write operation.
1098    pub fn record_write(&mut self, bytes: u64, elapsed_s: f64) {
1099        self.bytes_written += bytes;
1100        self.write_ops += 1;
1101        self.write_time_s += elapsed_s;
1102    }
1103
1104    /// Average read bandwidth in MB/s.
1105    pub fn avg_read_bw_mb(&self) -> f64 {
1106        if self.read_time_s < 1e-15 {
1107            0.0
1108        } else {
1109            self.bytes_read as f64 / self.read_time_s / 1e6
1110        }
1111    }
1112
1113    /// Average write bandwidth in MB/s.
1114    pub fn avg_write_bw_mb(&self) -> f64 {
1115        if self.write_time_s < 1e-15 {
1116            0.0
1117        } else {
1118            self.bytes_written as f64 / self.write_time_s / 1e6
1119        }
1120    }
1121
1122    /// Reset all counters to zero.
1123    pub fn reset(&mut self) {
1124        *self = Self::default();
1125    }
1126
1127    /// Merge another counter set into this one.
1128    pub fn merge(&mut self, other: &IoCounters) {
1129        self.bytes_read += other.bytes_read;
1130        self.bytes_written += other.bytes_written;
1131        self.read_ops += other.read_ops;
1132        self.write_ops += other.write_ops;
1133        self.read_time_s += other.read_time_s;
1134        self.write_time_s += other.write_time_s;
1135    }
1136}
1137
1138// ─────────────────────────────────────────────────────────────────────────────
1139// Tests
1140// ─────────────────────────────────────────────────────────────────────────────
1141
1142#[cfg(test)]
1143mod tests {
1144    use super::*;
1145
1146    // ── DomainBox ──────────────────────────────────────────────────────────────
1147
1148    #[test]
1149    fn test_domain_box_volume() {
1150        let b = DomainBox::new([0.0, 0.0, 0.0], [2.0, 3.0, 4.0]);
1151        assert!((b.volume() - 24.0).abs() < 1e-10);
1152    }
1153
1154    #[test]
1155    fn test_domain_box_volume_zero() {
1156        let b = DomainBox::new([1.0, 1.0, 1.0], [1.0, 2.0, 2.0]);
1157        assert_eq!(b.volume(), 0.0);
1158    }
1159
1160    #[test]
1161    fn test_domain_box_centre() {
1162        let b = DomainBox::new([0.0, 0.0, 0.0], [2.0, 4.0, 6.0]);
1163        let c = b.centre();
1164        assert!((c[0] - 1.0).abs() < 1e-10);
1165        assert!((c[1] - 2.0).abs() < 1e-10);
1166        assert!((c[2] - 3.0).abs() < 1e-10);
1167    }
1168
1169    #[test]
1170    fn test_domain_box_contains() {
1171        let b = DomainBox::new([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]);
1172        assert!(b.contains([0.5, 0.5, 0.5]));
1173        assert!(!b.contains([1.5, 0.5, 0.5]));
1174        assert!(!b.contains([0.0, 0.5, 0.5])); // boundary is excluded
1175    }
1176
1177    #[test]
1178    fn test_domain_box_overlaps() {
1179        let a = DomainBox::new([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]);
1180        let b = DomainBox::new([0.5, 0.5, 0.5], [1.5, 1.5, 1.5]);
1181        assert!(a.overlaps(&b));
1182        let c = DomainBox::new([2.0, 2.0, 2.0], [3.0, 3.0, 3.0]);
1183        assert!(!a.overlaps(&c));
1184    }
1185
1186    // ── RankDomain ────────────────────────────────────────────────────────────
1187
1188    #[test]
1189    fn test_rank_domain_n_owned_cells() {
1190        let rd = RankDomain::new(
1191            0,
1192            4,
1193            DomainBox::new([0.0; 3], [1.0; 3]),
1194            [1; 6],
1195            [4, 4, 4],
1196            [0; 3],
1197        );
1198        assert_eq!(rd.n_owned_cells(), 64);
1199    }
1200
1201    #[test]
1202    fn test_rank_domain_n_total_cells() {
1203        let rd = RankDomain::new(
1204            0,
1205            1,
1206            DomainBox::new([0.0; 3], [1.0; 3]),
1207            [1, 1, 1, 1, 1, 1],
1208            [4, 4, 4],
1209            [0; 3],
1210        );
1211        // ext: (4+1+1)^3 = 216
1212        assert_eq!(rd.n_total_cells(), 216);
1213    }
1214
1215    // ── GhostLayer / GhostCellMeta ────────────────────────────────────────────
1216
1217    #[test]
1218    fn test_ghost_layer_n_cells() {
1219        let gl = GhostLayer {
1220            face: Face::XMinus,
1221            neighbour_rank: 1,
1222            depth: 2,
1223            tangential_cells: [8, 8],
1224        };
1225        assert_eq!(gl.n_cells(), 128);
1226    }
1227
1228    #[test]
1229    fn test_ghost_cell_meta_total() {
1230        let mut meta = GhostCellMeta::new();
1231        meta.add_layer(GhostLayer {
1232            face: Face::XPlus,
1233            neighbour_rank: 1,
1234            depth: 1,
1235            tangential_cells: [4, 4],
1236        });
1237        meta.add_layer(GhostLayer {
1238            face: Face::XMinus,
1239            neighbour_rank: 2,
1240            depth: 1,
1241            tangential_cells: [4, 4],
1242        });
1243        assert_eq!(meta.total_ghost_cells(), 32);
1244    }
1245
1246    #[test]
1247    fn test_ghost_cell_meta_neighbour_ranks() {
1248        let mut meta = GhostCellMeta::new();
1249        meta.add_layer(GhostLayer {
1250            face: Face::XPlus,
1251            neighbour_rank: 3,
1252            depth: 1,
1253            tangential_cells: [2, 2],
1254        });
1255        meta.add_layer(GhostLayer {
1256            face: Face::YMinus,
1257            neighbour_rank: 1,
1258            depth: 1,
1259            tangential_cells: [2, 2],
1260        });
1261        meta.add_layer(GhostLayer {
1262            face: Face::ZPlus,
1263            neighbour_rank: 3,
1264            depth: 1,
1265            tangential_cells: [2, 2],
1266        });
1267        let ranks = meta.neighbour_ranks();
1268        assert_eq!(ranks, vec![1, 3]);
1269    }
1270
1271    #[test]
1272    fn test_face_display() {
1273        assert_eq!(format!("{}", Face::XMinus), "XMinus");
1274        assert_eq!(format!("{}", Face::ZPlus), "ZPlus");
1275    }
1276
1277    // ── ChunkSpec ─────────────────────────────────────────────────────────────
1278
1279    #[test]
1280    fn test_chunk_spec_size() {
1281        let c = ChunkSpec {
1282            start: 5,
1283            end: 10,
1284            chunk_idx: 0,
1285        };
1286        assert_eq!(c.size(), 5);
1287    }
1288
1289    #[test]
1290    fn test_chunk_spec_size_zero() {
1291        let c = ChunkSpec {
1292            start: 10,
1293            end: 5,
1294            chunk_idx: 0,
1295        };
1296        assert_eq!(c.size(), 0);
1297    }
1298
1299    #[test]
1300    fn test_split_into_chunks_exact() {
1301        let chunks = split_into_chunks(12, 3);
1302        assert_eq!(chunks.len(), 3);
1303        assert_eq!(chunks[0].size(), 4);
1304        assert_eq!(chunks[1].size(), 4);
1305        assert_eq!(chunks[2].size(), 4);
1306    }
1307
1308    #[test]
1309    fn test_split_into_chunks_remainder() {
1310        let chunks = split_into_chunks(10, 3);
1311        assert_eq!(chunks.len(), 3);
1312        let total: usize = chunks.iter().map(|c| c.size()).sum();
1313        assert_eq!(total, 10);
1314    }
1315
1316    #[test]
1317    fn test_split_into_chunks_zero_total() {
1318        assert!(split_into_chunks(0, 4).is_empty());
1319    }
1320
1321    #[test]
1322    fn test_split_into_chunks_zero_n() {
1323        assert!(split_into_chunks(10, 0).is_empty());
1324    }
1325
1326    #[test]
1327    fn test_split_cover_all_elements() {
1328        for total in [1, 5, 100, 1000] {
1329            for n_chunks in [1, 3, 7, 10] {
1330                let chunks = split_into_chunks(total, n_chunks);
1331                let covered: usize = chunks.iter().map(|c| c.size()).sum();
1332                assert_eq!(covered, total, "total={total} n_chunks={n_chunks}");
1333            }
1334        }
1335    }
1336
1337    // ── ParallelWriter ────────────────────────────────────────────────────────
1338
1339    #[test]
1340    fn test_parallel_writer_progress() {
1341        let mut pw = ParallelWriter::new("test.bin", 4);
1342        pw.mark_chunk_written();
1343        pw.mark_chunk_written();
1344        assert!((pw.progress() - 0.5).abs() < 1e-10);
1345    }
1346
1347    #[test]
1348    fn test_parallel_writer_complete() {
1349        let mut pw = ParallelWriter::new("test.bin", 2);
1350        pw.mark_chunk_written();
1351        pw.mark_chunk_written();
1352        assert!(pw.is_complete());
1353    }
1354
1355    #[test]
1356    fn test_parallel_writer_no_overflow() {
1357        let mut pw = ParallelWriter::new("test.bin", 1);
1358        pw.mark_chunk_written();
1359        pw.mark_chunk_written();
1360        assert_eq!(pw.chunks_written, 1);
1361    }
1362
1363    #[test]
1364    fn test_parallel_writer_zero_chunks() {
1365        let pw = ParallelWriter::new("test.bin", 0);
1366        assert!((pw.progress() - 1.0).abs() < 1e-10);
1367        assert!(pw.is_complete());
1368    }
1369
1370    #[test]
1371    fn test_parallel_writer_reset() {
1372        let mut pw = ParallelWriter::new("f", 3);
1373        pw.mark_chunk_written();
1374        pw.mark_chunk_written();
1375        pw.reset();
1376        assert_eq!(pw.chunks_written, 0);
1377        assert!(!pw.is_complete());
1378    }
1379
1380    // ── PVTU / PVD XML ────────────────────────────────────────────────────────
1381
1382    #[test]
1383    fn test_pvtu_xml_contains_pieces() {
1384        let pieces = vec![
1385            PvtuPiece {
1386                file: "rank0.vtu".into(),
1387                rank: 0,
1388            },
1389            PvtuPiece {
1390                file: "rank1.vtu".into(),
1391                rank: 1,
1392            },
1393        ];
1394        let xml = write_pvtu_xml(&pieces, &["velocity"], &["pressure"]);
1395        assert!(xml.contains("rank0.vtu"));
1396        assert!(xml.contains("rank1.vtu"));
1397        assert!(xml.contains("velocity"));
1398        assert!(xml.contains("pressure"));
1399        assert!(xml.contains("PUnstructuredGrid"));
1400    }
1401
1402    #[test]
1403    fn test_pvtu_xml_no_arrays() {
1404        let pieces = vec![PvtuPiece {
1405            file: "r0.vtu".into(),
1406            rank: 0,
1407        }];
1408        let xml = write_pvtu_xml(&pieces, &[], &[]);
1409        assert!(xml.contains("PUnstructuredGrid"));
1410        assert!(!xml.contains("PPointData"));
1411        assert!(!xml.contains("PCellData"));
1412    }
1413
1414    #[test]
1415    fn test_pvd_xml_entries() {
1416        let entries = vec![(0.0, "t0.pvtu"), (0.1, "t1.pvtu"), (0.2, "t2.pvtu")];
1417        let xml = write_pvd_xml(&entries);
1418        assert!(xml.contains("t0.pvtu"));
1419        assert!(xml.contains("t2.pvtu"));
1420        assert!(xml.contains("Collection"));
1421    }
1422
1423    // ── PartitionMap ──────────────────────────────────────────────────────────
1424
1425    #[test]
1426    fn test_partition_map_round_robin() {
1427        let pm = PartitionMap::round_robin(9, 3);
1428        assert_eq!(pm.cells_on_rank(0), 3);
1429        assert_eq!(pm.cells_on_rank(1), 3);
1430        assert_eq!(pm.cells_on_rank(2), 3);
1431    }
1432
1433    #[test]
1434    fn test_partition_map_imbalance_perfect() {
1435        let pm = PartitionMap::round_robin(6, 3);
1436        assert!((pm.imbalance() - 1.0).abs() < 1e-10);
1437    }
1438
1439    #[test]
1440    fn test_partition_map_imbalance_skewed() {
1441        // All cells on rank 0 → high imbalance
1442        let pm = PartitionMap::new(vec![0; 9], 3);
1443        assert!(pm.imbalance() > 1.5);
1444    }
1445
1446    #[test]
1447    fn test_partition_map_to_text_roundtrip_count() {
1448        let pm = PartitionMap::round_robin(4, 2);
1449        let text = pm.to_text();
1450        // Should have n_ranks line + 4 cell lines
1451        assert_eq!(text.lines().count(), 5);
1452    }
1453
1454    // ── DistributedArray ──────────────────────────────────────────────────────
1455
1456    #[test]
1457    fn test_distributed_array_serialize_deserialize() {
1458        let arr = DistributedArray::new(0, 6, 0, vec![1.0, 2.0, 3.0]);
1459        let text = arr.serialize_text();
1460        let recovered = DistributedArray::deserialize_text(&text).unwrap();
1461        assert_eq!(recovered.rank, 0);
1462        assert_eq!(recovered.offset, 0);
1463        assert_eq!(recovered.global_len, 6);
1464        assert!((recovered.data[0] - 1.0).abs() < 1e-10);
1465        assert!((recovered.data[2] - 3.0).abs() < 1e-10);
1466    }
1467
1468    #[test]
1469    fn test_distributed_array_assemble() {
1470        let shard0 = DistributedArray::new(0, 6, 0, vec![1.0, 2.0, 3.0]);
1471        let shard1 = DistributedArray::new(1, 6, 3, vec![4.0, 5.0, 6.0]);
1472        let global = DistributedArray::assemble(&[shard0, shard1]).unwrap();
1473        assert_eq!(global.len(), 6);
1474        for (i, &v) in global.iter().enumerate() {
1475            assert!((v - (i + 1) as f64).abs() < 1e-10);
1476        }
1477    }
1478
1479    #[test]
1480    fn test_distributed_array_assemble_empty() {
1481        let result = DistributedArray::assemble(&[]).unwrap();
1482        assert!(result.is_empty());
1483    }
1484
1485    // ── CheckpointHeader ──────────────────────────────────────────────────────
1486
1487    #[test]
1488    fn test_checkpoint_header_roundtrip() {
1489        let h = CheckpointHeader::new(42, 3.125, 4, 1000, "sim_v1");
1490        let line = h.to_line();
1491        let h2 = CheckpointHeader::from_line(&line).unwrap();
1492        assert_eq!(h2.step, 42);
1493        assert!((h2.time - 3.125).abs() < 1e-6);
1494        assert_eq!(h2.n_ranks, 4);
1495        assert_eq!(h2.n_items, 1000);
1496        assert_eq!(h2.tag, "sim_v1");
1497    }
1498
1499    #[test]
1500    fn test_checkpoint_header_bad_line() {
1501        assert!(CheckpointHeader::from_line("garbage").is_none());
1502    }
1503
1504    // ── write_checkpoint / read_checkpoint ────────────────────────────────────
1505
1506    #[test]
1507    fn test_checkpoint_write_read_roundtrip() {
1508        let path = "/tmp/test_oxiphysics_ckpt.txt";
1509        let header = CheckpointHeader::new(10, 1.5, 2, 3, "test");
1510        let positions = vec![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]];
1511        write_checkpoint(path, &header, &positions).unwrap();
1512        let (h2, pos2) = read_checkpoint(path).unwrap();
1513        assert_eq!(h2.step, 10);
1514        assert_eq!(pos2.len(), 3);
1515        assert!((pos2[0][0] - 1.0).abs() < 1e-6);
1516        assert!((pos2[2][2] - 9.0).abs() < 1e-6);
1517        let _ = fs::remove_file(path);
1518    }
1519
1520    // ── merge_chunk_files / merge_rank_files ──────────────────────────────────
1521
1522    #[test]
1523    fn test_merge_chunk_files() {
1524        let base = "/tmp/test_merge_par_chunks";
1525        let output = "/tmp/test_merge_par_output.bin";
1526        for i in 0..3usize {
1527            fs::write(format!("{base}.chunk{i}"), format!("chunk{i}\n")).unwrap();
1528        }
1529        merge_chunk_files(base, 3, output).unwrap();
1530        let merged = fs::read_to_string(output).unwrap();
1531        assert!(merged.contains("chunk0"));
1532        assert!(merged.contains("chunk2"));
1533        for i in 0..3usize {
1534            let _ = fs::remove_file(format!("{base}.chunk{i}"));
1535        }
1536        let _ = fs::remove_file(output);
1537    }
1538
1539    #[test]
1540    fn test_merge_rank_files() {
1541        let base = "/tmp/test_merge_par_ranks";
1542        let output = "/tmp/test_merge_par_rank_output.bin";
1543        for r in 0..2usize {
1544            fs::write(format!("{base}.rank{r}"), format!("rank{r}\n")).unwrap();
1545        }
1546        merge_rank_files(base, 2, output).unwrap();
1547        let merged = fs::read_to_string(output).unwrap();
1548        assert!(merged.contains("rank0"));
1549        assert!(merged.contains("rank1"));
1550        for r in 0..2usize {
1551            let _ = fs::remove_file(format!("{base}.rank{r}"));
1552        }
1553        let _ = fs::remove_file(output);
1554    }
1555
1556    // ── estimate_io_bandwidth ─────────────────────────────────────────────────
1557
1558    #[test]
1559    fn test_estimate_io_bandwidth_normal() {
1560        let bw = estimate_io_bandwidth(1_000_000, 1.0);
1561        assert!((bw - 1.0).abs() < 1e-9);
1562    }
1563
1564    #[test]
1565    fn test_estimate_io_bandwidth_zero_time() {
1566        assert_eq!(estimate_io_bandwidth(1000, 0.0), 0.0);
1567    }
1568
1569    #[test]
1570    fn test_bandwidth_large() {
1571        let bw = estimate_io_bandwidth(1_000_000_000, 0.5);
1572        assert!((bw - 2000.0).abs() < 1.0);
1573    }
1574
1575    // ── ChunkedTrajectory ─────────────────────────────────────────────────────
1576
1577    #[test]
1578    fn test_chunked_trajectory_n_chunks() {
1579        let mut traj = ChunkedTrajectory::new(2, 3);
1580        for _ in 0..7 {
1581            traj.push_frame(vec![[0.0; 3]; 2]);
1582        }
1583        assert_eq!(traj.n_chunks(), 3);
1584    }
1585
1586    #[test]
1587    fn test_write_read_chunked_xyz_roundtrip() {
1588        let base = "/tmp/test_chunked_traj_par2";
1589        let mut traj = ChunkedTrajectory::new(2, 2);
1590        traj.push_frame(vec![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]);
1591        traj.push_frame(vec![[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]);
1592        traj.push_frame(vec![[9.0, 8.0, 7.0], [6.0, 5.0, 4.0]]);
1593        write_chunked_xyz(&traj, base).unwrap();
1594        let loaded = read_chunked_xyz(base, traj.n_chunks()).unwrap();
1595        assert_eq!(loaded.n_frames, 3);
1596        assert!((loaded.frames[0][0][0] - 1.0).abs() < 1e-4);
1597        for i in 0..traj.n_chunks() {
1598            let _ = fs::remove_file(format!("{base}.chunk{i}.xyz"));
1599        }
1600    }
1601
1602    // ── Delta encoding ────────────────────────────────────────────────────────
1603
1604    #[test]
1605    fn test_compress_decompress_delta_roundtrip() {
1606        let frames = vec![
1607            vec![[1.0_f64, 2.0, 3.0], [4.0, 5.0, 6.0]],
1608            vec![[1.1, 2.1, 3.1], [4.2, 5.2, 6.2]],
1609            vec![[1.5, 2.5, 3.5], [5.0, 5.5, 6.5]],
1610        ];
1611        let deltas = compress_trajectory_delta(&frames);
1612        let recovered = decompress_trajectory_delta(&deltas);
1613        assert_eq!(recovered.len(), frames.len());
1614        for (f, r) in frames.iter().zip(recovered.iter()) {
1615            for (fa, ra) in f.iter().zip(r.iter()) {
1616                assert!((fa[0] - ra[0]).abs() < 1e-10);
1617            }
1618        }
1619    }
1620
1621    #[test]
1622    fn test_compress_delta_single_frame() {
1623        let frames = vec![vec![[1.0_f64, 2.0, 3.0]]];
1624        let d = compress_trajectory_delta(&frames);
1625        assert_eq!(d.len(), 1);
1626        assert!((d[0][0][0] - 1.0).abs() < 1e-10);
1627    }
1628
1629    // ── ParallelFileLayout ────────────────────────────────────────────────────
1630
1631    #[test]
1632    fn test_parallel_file_layout_offsets() {
1633        let mut layout = ParallelFileLayout::new();
1634        layout.add_dataset("positions", DsetType::Float64, 3, 100);
1635        layout.add_dataset("velocity", DsetType::Float64, 3, 100);
1636        let pos = layout.find("positions").unwrap();
1637        assert_eq!(pos.byte_offset, 0);
1638        let vel = layout.find("velocity").unwrap();
1639        // 100 * 3 * 8 = 2400 bytes
1640        assert_eq!(vel.byte_offset, 2400);
1641    }
1642
1643    #[test]
1644    fn test_parallel_file_layout_total_bytes() {
1645        let mut layout = ParallelFileLayout::new();
1646        layout.add_dataset("x", DsetType::Float32, 1, 50);
1647        layout.add_dataset("y", DsetType::Uint32, 1, 50);
1648        // 50*4 + 50*4 = 400
1649        assert_eq!(layout.total_data_bytes(), 400);
1650    }
1651
1652    #[test]
1653    fn test_dataset_type_byte_sizes() {
1654        assert_eq!(DsetType::Float64.byte_size(), 8);
1655        assert_eq!(DsetType::Float32.byte_size(), 4);
1656        assert_eq!(DsetType::Uint64.byte_size(), 8);
1657        assert_eq!(DsetType::Uint32.byte_size(), 4);
1658    }
1659
1660    #[test]
1661    fn test_dataset_header_serialise() {
1662        let h = DatasetHeader::new("temp", DsetType::Float32, 1, 256, 0);
1663        let line = h.to_header_line();
1664        assert!(line.contains("DATASET"));
1665        assert!(line.contains("temp"));
1666        assert!(line.contains("float32"));
1667    }
1668
1669    // ── IoCounters ────────────────────────────────────────────────────────────
1670
1671    #[test]
1672    fn test_io_counters_read_bw() {
1673        let mut c = IoCounters::new();
1674        c.record_read(10_000_000, 0.01);
1675        let bw = c.avg_read_bw_mb();
1676        assert!((bw - 1000.0).abs() < 1.0);
1677    }
1678
1679    #[test]
1680    fn test_io_counters_merge() {
1681        let mut a = IoCounters::new();
1682        a.record_write(1000, 0.001);
1683        let mut b = IoCounters::new();
1684        b.record_write(2000, 0.002);
1685        a.merge(&b);
1686        assert_eq!(a.bytes_written, 3000);
1687        assert_eq!(a.write_ops, 2);
1688    }
1689
1690    #[test]
1691    fn test_io_counters_reset() {
1692        let mut c = IoCounters::new();
1693        c.record_read(999, 1.0);
1694        c.reset();
1695        assert_eq!(c.bytes_read, 0);
1696        assert_eq!(c.read_ops, 0);
1697    }
1698
1699    #[test]
1700    fn test_io_stats_format() {
1701        let s = io_stats(1024, 2048, 10);
1702        assert!(s.contains("1024"));
1703        assert!(s.contains("2048"));
1704        assert!(s.contains("10"));
1705    }
1706}