1#![allow(dead_code)]
18
19use std::fmt;
20use std::fs;
21use std::io::{self, BufRead, Write};
22
23#[derive(Debug, Clone, PartialEq)]
29pub struct DomainBox {
30 pub min: [f64; 3],
32 pub max: [f64; 3],
34}
35
36impl DomainBox {
37 pub fn new(min: [f64; 3], max: [f64; 3]) -> Self {
39 Self { min, max }
40 }
41
42 pub fn volume(&self) -> f64 {
44 (self.max[0] - self.min[0]).max(0.0)
45 * (self.max[1] - self.min[1]).max(0.0)
46 * (self.max[2] - self.min[2]).max(0.0)
47 }
48
49 pub fn centre(&self) -> [f64; 3] {
51 [
52 0.5 * (self.min[0] + self.max[0]),
53 0.5 * (self.min[1] + self.max[1]),
54 0.5 * (self.min[2] + self.max[2]),
55 ]
56 }
57
58 pub fn contains(&self, p: [f64; 3]) -> bool {
60 p[0] > self.min[0]
61 && p[0] < self.max[0]
62 && p[1] > self.min[1]
63 && p[1] < self.max[1]
64 && p[2] > self.min[2]
65 && p[2] < self.max[2]
66 }
67
68 pub fn overlaps(&self, other: &DomainBox) -> bool {
70 self.min[0] < other.max[0]
71 && self.max[0] > other.min[0]
72 && self.min[1] < other.max[1]
73 && self.max[1] > other.min[1]
74 && self.min[2] < other.max[2]
75 && self.max[2] > other.min[2]
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct RankDomain {
82 pub rank: usize,
84 pub n_ranks: usize,
86 pub owned_box: DomainBox,
88 pub ghost_layers: [usize; 6],
90 pub owned_cells: [usize; 3],
92 pub global_offset: [usize; 3],
94}
95
96impl RankDomain {
97 #[allow(clippy::too_many_arguments)]
99 pub fn new(
100 rank: usize,
101 n_ranks: usize,
102 owned_box: DomainBox,
103 ghost_layers: [usize; 6],
104 owned_cells: [usize; 3],
105 global_offset: [usize; 3],
106 ) -> Self {
107 Self {
108 rank,
109 n_ranks,
110 owned_box,
111 ghost_layers,
112 owned_cells,
113 global_offset,
114 }
115 }
116
117 pub fn n_owned_cells(&self) -> usize {
119 self.owned_cells[0] * self.owned_cells[1] * self.owned_cells[2]
120 }
121
122 pub fn n_total_cells(&self) -> usize {
124 let ext_x = self.owned_cells[0] + self.ghost_layers[0] + self.ghost_layers[1];
125 let ext_y = self.owned_cells[1] + self.ghost_layers[2] + self.ghost_layers[3];
126 let ext_z = self.owned_cells[2] + self.ghost_layers[4] + self.ghost_layers[5];
127 ext_x * ext_y * ext_z
128 }
129
130 pub fn load_fraction(&self) -> f64 {
134 if self.n_ranks == 0 {
135 return 1.0;
136 }
137 self.n_owned_cells() as f64
138 }
139}
140
141#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum Face {
148 XMinus,
150 XPlus,
152 YMinus,
154 YPlus,
156 ZMinus,
158 ZPlus,
160}
161
162impl fmt::Display for Face {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 let s = match self {
165 Face::XMinus => "XMinus",
166 Face::XPlus => "XPlus",
167 Face::YMinus => "YMinus",
168 Face::YPlus => "YPlus",
169 Face::ZMinus => "ZMinus",
170 Face::ZPlus => "ZPlus",
171 };
172 write!(f, "{s}")
173 }
174}
175
176#[derive(Debug, Clone)]
178pub struct GhostLayer {
179 pub face: Face,
181 pub neighbour_rank: usize,
183 pub depth: usize,
185 pub tangential_cells: [usize; 2],
187}
188
189impl GhostLayer {
190 pub fn n_cells(&self) -> usize {
192 self.depth * self.tangential_cells[0] * self.tangential_cells[1]
193 }
194}
195
196#[derive(Debug, Clone, Default)]
198pub struct GhostCellMeta {
199 pub layers: Vec<GhostLayer>,
201}
202
203impl GhostCellMeta {
204 pub fn new() -> Self {
206 Self::default()
207 }
208
209 pub fn add_layer(&mut self, layer: GhostLayer) {
211 self.layers.push(layer);
212 }
213
214 pub fn total_ghost_cells(&self) -> usize {
216 self.layers.iter().map(|l| l.n_cells()).sum()
217 }
218
219 pub fn neighbour_ranks(&self) -> Vec<usize> {
221 let mut ranks: Vec<usize> = self.layers.iter().map(|l| l.neighbour_rank).collect();
222 ranks.sort_unstable();
223 ranks.dedup();
224 ranks
225 }
226}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct ChunkSpec {
235 pub start: usize,
237 pub end: usize,
239 pub chunk_idx: usize,
241}
242
243impl ChunkSpec {
244 pub fn size(&self) -> usize {
246 self.end.saturating_sub(self.start)
247 }
248}
249
250pub fn split_into_chunks(total: usize, n_chunks: usize) -> Vec<ChunkSpec> {
255 if total == 0 || n_chunks == 0 {
256 return Vec::new();
257 }
258 let actual_chunks = n_chunks.min(total);
259 let base = total / actual_chunks;
260 let remainder = total % actual_chunks;
261 let mut specs = Vec::with_capacity(actual_chunks);
262 let mut start = 0;
263 for idx in 0..actual_chunks {
264 let extra = if idx < remainder { 1 } else { 0 };
265 let end = start + base + extra;
266 specs.push(ChunkSpec {
267 start,
268 end,
269 chunk_idx: idx,
270 });
271 start = end;
272 }
273 specs
274}
275
276#[derive(Debug, Clone)]
282pub struct ParallelWriter {
283 pub filename: String,
285 pub chunks_written: usize,
287 pub total_chunks: usize,
289}
290
291impl ParallelWriter {
292 pub fn new(filename: impl Into<String>, total_chunks: usize) -> Self {
294 Self {
295 filename: filename.into(),
296 chunks_written: 0,
297 total_chunks,
298 }
299 }
300
301 pub fn mark_chunk_written(&mut self) {
303 if self.chunks_written < self.total_chunks {
304 self.chunks_written += 1;
305 }
306 }
307
308 pub fn is_complete(&self) -> bool {
310 self.chunks_written >= self.total_chunks
311 }
312
313 pub fn progress(&self) -> f64 {
315 if self.total_chunks == 0 {
316 return 1.0;
317 }
318 self.chunks_written as f64 / self.total_chunks as f64
319 }
320
321 pub fn reset(&mut self) {
323 self.chunks_written = 0;
324 }
325}
326
327#[derive(Debug, Clone)]
333pub struct PvtuPiece {
334 pub file: String,
336 pub rank: usize,
338}
339
340pub fn write_pvtu_xml(pieces: &[PvtuPiece], point_arrays: &[&str], cell_arrays: &[&str]) -> String {
346 let mut s = String::new();
347 s.push_str("<?xml version=\"1.0\"?>\n");
348 s.push_str(
349 "<VTKFile type=\"PUnstructuredGrid\" version=\"0.1\" byte_order=\"LittleEndian\">\n",
350 );
351 s.push_str(" <PUnstructuredGrid GhostLevel=\"1\">\n");
352 s.push_str(" <PPoints>\n");
353 s.push_str(" <PDataArray type=\"Float64\" NumberOfComponents=\"3\"/>\n");
354 s.push_str(" </PPoints>\n");
355 if !point_arrays.is_empty() {
356 s.push_str(" <PPointData>\n");
357 for name in point_arrays {
358 s.push_str(&format!(
359 " <PDataArray type=\"Float64\" Name=\"{name}\"/>\n"
360 ));
361 }
362 s.push_str(" </PPointData>\n");
363 }
364 if !cell_arrays.is_empty() {
365 s.push_str(" <PCellData>\n");
366 for name in cell_arrays {
367 s.push_str(&format!(
368 " <PDataArray type=\"Float64\" Name=\"{name}\"/>\n"
369 ));
370 }
371 s.push_str(" </PCellData>\n");
372 }
373 for piece in pieces {
374 s.push_str(&format!(" <Piece Source=\"{}\"/>\n", piece.file));
375 }
376 s.push_str(" </PUnstructuredGrid>\n");
377 s.push_str("</VTKFile>\n");
378 s
379}
380
381pub fn write_pvd_xml(entries: &[(f64, &str)]) -> String {
385 let mut s = String::new();
386 s.push_str("<?xml version=\"1.0\"?>\n");
387 s.push_str("<VTKFile type=\"Collection\" version=\"0.1\">\n");
388 s.push_str(" <Collection>\n");
389 for (t, file) in entries {
390 s.push_str(&format!(
391 " <DataSet timestep=\"{t:.6e}\" group=\"\" part=\"0\" file=\"{file}\"/>\n"
392 ));
393 }
394 s.push_str(" </Collection>\n");
395 s.push_str("</VTKFile>\n");
396 s
397}
398
399#[derive(Debug, Clone)]
405pub struct PartitionMap {
406 pub cell_to_rank: Vec<usize>,
408 pub n_ranks: usize,
410}
411
412impl PartitionMap {
413 pub fn new(cell_to_rank: Vec<usize>, n_ranks: usize) -> Self {
415 Self {
416 cell_to_rank,
417 n_ranks,
418 }
419 }
420
421 pub fn n_cells(&self) -> usize {
423 self.cell_to_rank.len()
424 }
425
426 pub fn cells_on_rank(&self, rank: usize) -> usize {
428 self.cell_to_rank.iter().filter(|&&r| r == rank).count()
429 }
430
431 pub fn imbalance(&self) -> f64 {
436 if self.n_cells() == 0 || self.n_ranks == 0 {
437 return 1.0;
438 }
439 let ideal = self.n_cells() as f64 / self.n_ranks as f64;
440 let max_load = (0..self.n_ranks)
441 .map(|r| self.cells_on_rank(r))
442 .max()
443 .unwrap_or(0) as f64;
444 if ideal < 1e-12 { 1.0 } else { max_load / ideal }
445 }
446
447 pub fn round_robin(n_cells: usize, n_ranks: usize) -> Self {
449 if n_ranks == 0 {
450 return Self::new(vec![], 0);
451 }
452 let map = (0..n_cells).map(|i| i % n_ranks).collect();
453 Self::new(map, n_ranks)
454 }
455
456 pub fn to_text(&self) -> String {
458 let mut s = format!("n_ranks={}\n", self.n_ranks);
459 for (i, &r) in self.cell_to_rank.iter().enumerate() {
460 s.push_str(&format!("{i} {r}\n"));
461 }
462 s
463 }
464}
465
466#[derive(Debug, Clone)]
472pub struct DistributedArray {
473 pub rank: usize,
475 pub global_len: usize,
477 pub offset: usize,
479 pub data: Vec<f64>,
481}
482
483impl DistributedArray {
484 pub fn new(rank: usize, global_len: usize, offset: usize, data: Vec<f64>) -> Self {
486 Self {
487 rank,
488 global_len,
489 offset,
490 data,
491 }
492 }
493
494 pub fn local_len(&self) -> usize {
496 self.data.len()
497 }
498
499 pub fn serialize_text(&self) -> String {
503 let hex: String = self
504 .data
505 .iter()
506 .flat_map(|&v| v.to_le_bytes())
507 .map(|b| format!("{b:02x}"))
508 .collect();
509 format!(
510 "{} {} {} {} {}\n",
511 self.rank,
512 self.offset,
513 self.global_len,
514 self.data.len(),
515 hex
516 )
517 }
518
519 pub fn deserialize_text(s: &str) -> Option<Self> {
521 let mut it = s.split_whitespace();
522 let rank: usize = it.next()?.parse().ok()?;
523 let offset: usize = it.next()?.parse().ok()?;
524 let global_len: usize = it.next()?.parse().ok()?;
525 let n: usize = it.next()?.parse().ok()?;
526 let hex = it.next().unwrap_or("");
527 if hex.len() != n * 16 {
528 return None;
529 }
530 let data: Vec<f64> = (0..n)
531 .map(|i| {
532 let chunk = &hex[i * 16..(i + 1) * 16];
533 let mut bytes = [0u8; 8];
534 for (j, byte) in bytes.iter_mut().enumerate() {
535 *byte = u8::from_str_radix(&chunk[j * 2..j * 2 + 2], 16).unwrap_or(0);
536 }
537 f64::from_le_bytes(bytes)
538 })
539 .collect();
540 Some(Self {
541 rank,
542 global_len,
543 offset,
544 data,
545 })
546 }
547
548 pub fn assemble(shards: &[DistributedArray]) -> Option<Vec<f64>> {
552 if shards.is_empty() {
553 return Some(vec![]);
554 }
555 let global_len = shards[0].global_len;
556 let mut result = vec![0.0_f64; global_len];
557 for shard in shards {
558 let end = shard.offset + shard.local_len();
559 if end > global_len {
560 return None;
561 }
562 result[shard.offset..end].copy_from_slice(&shard.data);
563 }
564 Some(result)
565 }
566}
567
568#[derive(Debug, Clone)]
574pub struct CheckpointHeader {
575 pub step: u64,
577 pub time: f64,
579 pub n_ranks: usize,
581 pub n_items: usize,
583 pub tag: String,
585}
586
587impl CheckpointHeader {
588 pub fn new(step: u64, time: f64, n_ranks: usize, n_items: usize, tag: &str) -> Self {
590 Self {
591 step,
592 time,
593 n_ranks,
594 n_items,
595 tag: tag.to_owned(),
596 }
597 }
598
599 pub fn to_line(&self) -> String {
601 format!(
602 "CHECKPOINT step={} time={:.10e} n_ranks={} n_items={} tag={}\n",
603 self.step, self.time, self.n_ranks, self.n_items, self.tag
604 )
605 }
606
607 pub fn from_line(line: &str) -> Option<Self> {
609 if !line.trim_start().starts_with("CHECKPOINT") {
610 return None;
611 }
612 let mut step = 0u64;
613 let mut time = 0.0f64;
614 let mut n_ranks = 1usize;
615 let mut n_items = 0usize;
616 let mut tag = String::new();
617 for token in line.split_whitespace().skip(1) {
618 if let Some(rest) = token.strip_prefix("step=") {
619 step = rest.parse().unwrap_or(0);
620 } else if let Some(rest) = token.strip_prefix("time=") {
621 time = rest.parse().unwrap_or(0.0);
622 } else if let Some(rest) = token.strip_prefix("n_ranks=") {
623 n_ranks = rest.parse().unwrap_or(1);
624 } else if let Some(rest) = token.strip_prefix("n_items=") {
625 n_items = rest.parse().unwrap_or(0);
626 } else if let Some(rest) = token.strip_prefix("tag=") {
627 tag = rest.to_owned();
628 }
629 }
630 Some(Self {
631 step,
632 time,
633 n_ranks,
634 n_items,
635 tag,
636 })
637 }
638}
639
640pub fn write_checkpoint(
645 path: &str,
646 header: &CheckpointHeader,
647 positions: &[[f64; 3]],
648) -> Result<(), io::Error> {
649 let mut file = fs::File::create(path)?;
650 write!(file, "{}", header.to_line())?;
651 for pos in positions {
652 writeln!(file, "{:.10e} {:.10e} {:.10e}", pos[0], pos[1], pos[2])?;
653 }
654 Ok(())
655}
656
657pub fn read_checkpoint(path: &str) -> Result<(CheckpointHeader, Vec<[f64; 3]>), io::Error> {
661 let file = fs::File::open(path)?;
662 let reader = io::BufReader::new(file);
663 let mut lines_iter = reader.lines();
664 let header_line = lines_iter
665 .next()
666 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "empty checkpoint"))??;
667 let header = CheckpointHeader::from_line(&header_line)
668 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "bad checkpoint header"))?;
669 let mut positions = Vec::new();
670 for line in lines_iter {
671 let line = line?;
672 let parts: Vec<&str> = line.split_whitespace().collect();
673 if parts.len() >= 3 {
674 let x: f64 = parts[0].parse().unwrap_or(0.0);
675 let y: f64 = parts[1].parse().unwrap_or(0.0);
676 let z: f64 = parts[2].parse().unwrap_or(0.0);
677 positions.push([x, y, z]);
678 }
679 }
680 Ok((header, positions))
681}
682
683pub fn merge_chunk_files(
692 base_path: &str,
693 n_chunks: usize,
694 output_path: &str,
695) -> Result<(), io::Error> {
696 let mut out = fs::File::create(output_path)?;
697 for i in 0..n_chunks {
698 let chunk_path = format!("{base_path}.chunk{i}");
699 let data = fs::read(&chunk_path)?;
700 out.write_all(&data)?;
701 }
702 Ok(())
703}
704
705pub fn merge_rank_files(
709 base_path: &str,
710 n_ranks: usize,
711 output_path: &str,
712) -> Result<(), io::Error> {
713 let mut out = fs::File::create(output_path)?;
714 for r in 0..n_ranks {
715 let rank_path = format!("{base_path}.rank{r}");
716 if let Ok(data) = fs::read(&rank_path) {
717 out.write_all(&data)?;
718 }
719 }
720 Ok(())
721}
722
723pub fn estimate_io_bandwidth(bytes: usize, elapsed_secs: f64) -> f64 {
732 if elapsed_secs <= 0.0 {
733 return 0.0;
734 }
735 (bytes as f64) / elapsed_secs / 1e6
736}
737
738#[derive(Debug, Clone)]
744pub struct ChunkedTrajectory {
745 pub n_frames: usize,
747 pub n_atoms: usize,
749 pub chunk_size: usize,
751 pub frames: Vec<Vec<[f64; 3]>>,
753}
754
755impl ChunkedTrajectory {
756 pub fn new(n_atoms: usize, chunk_size: usize) -> Self {
758 Self {
759 n_frames: 0,
760 n_atoms,
761 chunk_size: chunk_size.max(1),
762 frames: Vec::new(),
763 }
764 }
765
766 pub fn push_frame(&mut self, frame: Vec<[f64; 3]>) {
768 self.frames.push(frame);
769 self.n_frames = self.frames.len();
770 }
771
772 pub fn n_chunks(&self) -> usize {
774 if self.n_frames == 0 {
775 return 0;
776 }
777 self.n_frames.div_ceil(self.chunk_size)
778 }
779}
780
781pub fn write_chunked_xyz(traj: &ChunkedTrajectory, base_path: &str) -> Result<(), io::Error> {
789 let n_chunks = traj.n_chunks();
790 for chunk_idx in 0..n_chunks {
791 let start_frame = chunk_idx * traj.chunk_size;
792 let end_frame = (start_frame + traj.chunk_size).min(traj.n_frames);
793 let chunk_path = format!("{base_path}.chunk{chunk_idx}.xyz");
794 let mut file = fs::File::create(&chunk_path)?;
795 for frame_idx in start_frame..end_frame {
796 let frame = &traj.frames[frame_idx];
797 writeln!(file, "{}", frame.len())?;
798 writeln!(file, "Frame {frame_idx}")?;
799 for pos in frame {
800 writeln!(file, "X {:.6} {:.6} {:.6}", pos[0], pos[1], pos[2])?;
801 }
802 }
803 }
804 Ok(())
805}
806
807pub fn read_chunked_xyz(base_path: &str, n_chunks: usize) -> Result<ChunkedTrajectory, io::Error> {
811 let mut frames: Vec<Vec<[f64; 3]>> = Vec::new();
812 for chunk_idx in 0..n_chunks {
813 let chunk_path = format!("{base_path}.chunk{chunk_idx}.xyz");
814 let file = fs::File::open(&chunk_path)?;
815 let reader = io::BufReader::new(file);
816 let lines: Vec<String> = reader.lines().collect::<Result<Vec<_>, _>>()?;
817 let mut idx = 0;
818 while idx < lines.len() {
819 let n_atoms: usize = lines[idx].trim().parse().unwrap_or(0);
820 idx += 1;
821 if idx >= lines.len() {
822 break;
823 }
824 idx += 1; let mut frame = Vec::with_capacity(n_atoms);
826 for _ in 0..n_atoms {
827 if idx >= lines.len() {
828 break;
829 }
830 let parts: Vec<&str> = lines[idx].split_whitespace().collect();
831 if parts.len() >= 4 {
832 let x: f64 = parts[1].parse().unwrap_or(0.0);
833 let y: f64 = parts[2].parse().unwrap_or(0.0);
834 let z: f64 = parts[3].parse().unwrap_or(0.0);
835 frame.push([x, y, z]);
836 }
837 idx += 1;
838 }
839 if !frame.is_empty() {
840 frames.push(frame);
841 }
842 }
843 }
844 let n_frames = frames.len();
845 let n_atoms = frames.first().map(|f| f.len()).unwrap_or(0);
846 Ok(ChunkedTrajectory {
847 n_frames,
848 n_atoms,
849 chunk_size: n_frames.max(1),
850 frames,
851 })
852}
853
854#[derive(Debug, Clone, PartialEq, Eq)]
860pub enum DsetType {
861 Float64,
863 Float32,
865 Uint64,
867 Uint32,
869}
870
871impl fmt::Display for DsetType {
872 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873 let s = match self {
874 DsetType::Float64 => "float64",
875 DsetType::Float32 => "float32",
876 DsetType::Uint64 => "uint64",
877 DsetType::Uint32 => "uint32",
878 };
879 write!(f, "{s}")
880 }
881}
882
883impl DsetType {
884 pub fn byte_size(&self) -> usize {
886 match self {
887 DsetType::Float64 | DsetType::Uint64 => 8,
888 DsetType::Float32 | DsetType::Uint32 => 4,
889 }
890 }
891}
892
893#[derive(Debug, Clone)]
895pub struct DatasetHeader {
896 pub name: String,
898 pub dtype: DsetType,
900 pub n_components: usize,
902 pub global_n: usize,
904 pub byte_offset: usize,
906}
907
908impl DatasetHeader {
909 pub fn new(
911 name: &str,
912 dtype: DsetType,
913 n_components: usize,
914 global_n: usize,
915 byte_offset: usize,
916 ) -> Self {
917 Self {
918 name: name.to_owned(),
919 dtype,
920 n_components,
921 global_n,
922 byte_offset,
923 }
924 }
925
926 pub fn data_size_bytes(&self) -> usize {
928 self.global_n * self.n_components * self.dtype.byte_size()
929 }
930
931 pub fn to_header_line(&self) -> String {
933 format!(
934 "DATASET name={} dtype={} n_comp={} global_n={} offset={}\n",
935 self.name, self.dtype, self.n_components, self.global_n, self.byte_offset
936 )
937 }
938}
939
940#[derive(Debug, Clone, Default)]
942pub struct ParallelFileLayout {
943 pub datasets: Vec<DatasetHeader>,
945}
946
947impl ParallelFileLayout {
948 pub fn new() -> Self {
950 Self::default()
951 }
952
953 pub fn add_dataset(
955 &mut self,
956 name: &str,
957 dtype: DsetType,
958 n_components: usize,
959 global_n: usize,
960 ) {
961 let offset = self
962 .datasets
963 .last()
964 .map(|d| d.byte_offset + d.data_size_bytes())
965 .unwrap_or(0);
966 self.datasets.push(DatasetHeader::new(
967 name,
968 dtype,
969 n_components,
970 global_n,
971 offset,
972 ));
973 }
974
975 pub fn total_data_bytes(&self) -> usize {
977 self.datasets
978 .last()
979 .map(|d| d.byte_offset + d.data_size_bytes())
980 .unwrap_or(0)
981 }
982
983 pub fn header_block(&self) -> String {
985 self.datasets.iter().map(|d| d.to_header_line()).collect()
986 }
987
988 pub fn find(&self, name: &str) -> Option<&DatasetHeader> {
990 self.datasets.iter().find(|d| d.name == name)
991 }
992}
993
994pub fn compress_trajectory_delta(frames: &[Vec<[f64; 3]>]) -> Vec<Vec<[f64; 3]>> {
1003 if frames.is_empty() {
1004 return Vec::new();
1005 }
1006 let mut result = Vec::with_capacity(frames.len());
1007 result.push(frames[0].clone());
1008 for i in 1..frames.len() {
1009 let prev = &frames[i - 1];
1010 let curr = &frames[i];
1011 let n = curr.len().min(prev.len());
1012 let delta: Vec<[f64; 3]> = curr
1013 .iter()
1014 .enumerate()
1015 .map(|(j, &c)| {
1016 if j < n {
1017 [c[0] - prev[j][0], c[1] - prev[j][1], c[2] - prev[j][2]]
1018 } else {
1019 c
1020 }
1021 })
1022 .collect();
1023 result.push(delta);
1024 }
1025 result
1026}
1027
1028pub fn decompress_trajectory_delta(deltas: &[Vec<[f64; 3]>]) -> Vec<Vec<[f64; 3]>> {
1030 if deltas.is_empty() {
1031 return Vec::new();
1032 }
1033 let mut result = Vec::with_capacity(deltas.len());
1034 result.push(deltas[0].clone());
1035 for i in 1..deltas.len() {
1036 let prev = &result[i - 1];
1037 let delta = &deltas[i];
1038 let frame: Vec<[f64; 3]> = delta
1039 .iter()
1040 .enumerate()
1041 .map(|(j, &d)| {
1042 if j < prev.len() {
1043 [prev[j][0] + d[0], prev[j][1] + d[1], prev[j][2] + d[2]]
1044 } else {
1045 d
1046 }
1047 })
1048 .collect();
1049 result.push(frame);
1050 }
1051 result
1052}
1053
1054pub fn io_stats(bytes_read: usize, bytes_written: usize, frames: usize) -> String {
1064 format!("IO Stats: read={bytes_read}B written={bytes_written}B frames={frames}")
1065}
1066
1067#[derive(Debug, Clone, Default)]
1069pub struct IoCounters {
1070 pub bytes_read: u64,
1072 pub bytes_written: u64,
1074 pub read_ops: u64,
1076 pub write_ops: u64,
1078 pub read_time_s: f64,
1080 pub write_time_s: f64,
1082}
1083
1084impl IoCounters {
1085 pub fn new() -> Self {
1087 Self::default()
1088 }
1089
1090 pub fn record_read(&mut self, bytes: u64, elapsed_s: f64) {
1092 self.bytes_read += bytes;
1093 self.read_ops += 1;
1094 self.read_time_s += elapsed_s;
1095 }
1096
1097 pub fn record_write(&mut self, bytes: u64, elapsed_s: f64) {
1099 self.bytes_written += bytes;
1100 self.write_ops += 1;
1101 self.write_time_s += elapsed_s;
1102 }
1103
1104 pub fn avg_read_bw_mb(&self) -> f64 {
1106 if self.read_time_s < 1e-15 {
1107 0.0
1108 } else {
1109 self.bytes_read as f64 / self.read_time_s / 1e6
1110 }
1111 }
1112
1113 pub fn avg_write_bw_mb(&self) -> f64 {
1115 if self.write_time_s < 1e-15 {
1116 0.0
1117 } else {
1118 self.bytes_written as f64 / self.write_time_s / 1e6
1119 }
1120 }
1121
1122 pub fn reset(&mut self) {
1124 *self = Self::default();
1125 }
1126
1127 pub fn merge(&mut self, other: &IoCounters) {
1129 self.bytes_read += other.bytes_read;
1130 self.bytes_written += other.bytes_written;
1131 self.read_ops += other.read_ops;
1132 self.write_ops += other.write_ops;
1133 self.read_time_s += other.read_time_s;
1134 self.write_time_s += other.write_time_s;
1135 }
1136}
1137
1138#[cfg(test)]
1143mod tests {
1144 use super::*;
1145
1146 #[test]
1149 fn test_domain_box_volume() {
1150 let b = DomainBox::new([0.0, 0.0, 0.0], [2.0, 3.0, 4.0]);
1151 assert!((b.volume() - 24.0).abs() < 1e-10);
1152 }
1153
1154 #[test]
1155 fn test_domain_box_volume_zero() {
1156 let b = DomainBox::new([1.0, 1.0, 1.0], [1.0, 2.0, 2.0]);
1157 assert_eq!(b.volume(), 0.0);
1158 }
1159
1160 #[test]
1161 fn test_domain_box_centre() {
1162 let b = DomainBox::new([0.0, 0.0, 0.0], [2.0, 4.0, 6.0]);
1163 let c = b.centre();
1164 assert!((c[0] - 1.0).abs() < 1e-10);
1165 assert!((c[1] - 2.0).abs() < 1e-10);
1166 assert!((c[2] - 3.0).abs() < 1e-10);
1167 }
1168
1169 #[test]
1170 fn test_domain_box_contains() {
1171 let b = DomainBox::new([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]);
1172 assert!(b.contains([0.5, 0.5, 0.5]));
1173 assert!(!b.contains([1.5, 0.5, 0.5]));
1174 assert!(!b.contains([0.0, 0.5, 0.5])); }
1176
1177 #[test]
1178 fn test_domain_box_overlaps() {
1179 let a = DomainBox::new([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]);
1180 let b = DomainBox::new([0.5, 0.5, 0.5], [1.5, 1.5, 1.5]);
1181 assert!(a.overlaps(&b));
1182 let c = DomainBox::new([2.0, 2.0, 2.0], [3.0, 3.0, 3.0]);
1183 assert!(!a.overlaps(&c));
1184 }
1185
1186 #[test]
1189 fn test_rank_domain_n_owned_cells() {
1190 let rd = RankDomain::new(
1191 0,
1192 4,
1193 DomainBox::new([0.0; 3], [1.0; 3]),
1194 [1; 6],
1195 [4, 4, 4],
1196 [0; 3],
1197 );
1198 assert_eq!(rd.n_owned_cells(), 64);
1199 }
1200
1201 #[test]
1202 fn test_rank_domain_n_total_cells() {
1203 let rd = RankDomain::new(
1204 0,
1205 1,
1206 DomainBox::new([0.0; 3], [1.0; 3]),
1207 [1, 1, 1, 1, 1, 1],
1208 [4, 4, 4],
1209 [0; 3],
1210 );
1211 assert_eq!(rd.n_total_cells(), 216);
1213 }
1214
1215 #[test]
1218 fn test_ghost_layer_n_cells() {
1219 let gl = GhostLayer {
1220 face: Face::XMinus,
1221 neighbour_rank: 1,
1222 depth: 2,
1223 tangential_cells: [8, 8],
1224 };
1225 assert_eq!(gl.n_cells(), 128);
1226 }
1227
1228 #[test]
1229 fn test_ghost_cell_meta_total() {
1230 let mut meta = GhostCellMeta::new();
1231 meta.add_layer(GhostLayer {
1232 face: Face::XPlus,
1233 neighbour_rank: 1,
1234 depth: 1,
1235 tangential_cells: [4, 4],
1236 });
1237 meta.add_layer(GhostLayer {
1238 face: Face::XMinus,
1239 neighbour_rank: 2,
1240 depth: 1,
1241 tangential_cells: [4, 4],
1242 });
1243 assert_eq!(meta.total_ghost_cells(), 32);
1244 }
1245
1246 #[test]
1247 fn test_ghost_cell_meta_neighbour_ranks() {
1248 let mut meta = GhostCellMeta::new();
1249 meta.add_layer(GhostLayer {
1250 face: Face::XPlus,
1251 neighbour_rank: 3,
1252 depth: 1,
1253 tangential_cells: [2, 2],
1254 });
1255 meta.add_layer(GhostLayer {
1256 face: Face::YMinus,
1257 neighbour_rank: 1,
1258 depth: 1,
1259 tangential_cells: [2, 2],
1260 });
1261 meta.add_layer(GhostLayer {
1262 face: Face::ZPlus,
1263 neighbour_rank: 3,
1264 depth: 1,
1265 tangential_cells: [2, 2],
1266 });
1267 let ranks = meta.neighbour_ranks();
1268 assert_eq!(ranks, vec![1, 3]);
1269 }
1270
1271 #[test]
1272 fn test_face_display() {
1273 assert_eq!(format!("{}", Face::XMinus), "XMinus");
1274 assert_eq!(format!("{}", Face::ZPlus), "ZPlus");
1275 }
1276
1277 #[test]
1280 fn test_chunk_spec_size() {
1281 let c = ChunkSpec {
1282 start: 5,
1283 end: 10,
1284 chunk_idx: 0,
1285 };
1286 assert_eq!(c.size(), 5);
1287 }
1288
1289 #[test]
1290 fn test_chunk_spec_size_zero() {
1291 let c = ChunkSpec {
1292 start: 10,
1293 end: 5,
1294 chunk_idx: 0,
1295 };
1296 assert_eq!(c.size(), 0);
1297 }
1298
1299 #[test]
1300 fn test_split_into_chunks_exact() {
1301 let chunks = split_into_chunks(12, 3);
1302 assert_eq!(chunks.len(), 3);
1303 assert_eq!(chunks[0].size(), 4);
1304 assert_eq!(chunks[1].size(), 4);
1305 assert_eq!(chunks[2].size(), 4);
1306 }
1307
1308 #[test]
1309 fn test_split_into_chunks_remainder() {
1310 let chunks = split_into_chunks(10, 3);
1311 assert_eq!(chunks.len(), 3);
1312 let total: usize = chunks.iter().map(|c| c.size()).sum();
1313 assert_eq!(total, 10);
1314 }
1315
1316 #[test]
1317 fn test_split_into_chunks_zero_total() {
1318 assert!(split_into_chunks(0, 4).is_empty());
1319 }
1320
1321 #[test]
1322 fn test_split_into_chunks_zero_n() {
1323 assert!(split_into_chunks(10, 0).is_empty());
1324 }
1325
1326 #[test]
1327 fn test_split_cover_all_elements() {
1328 for total in [1, 5, 100, 1000] {
1329 for n_chunks in [1, 3, 7, 10] {
1330 let chunks = split_into_chunks(total, n_chunks);
1331 let covered: usize = chunks.iter().map(|c| c.size()).sum();
1332 assert_eq!(covered, total, "total={total} n_chunks={n_chunks}");
1333 }
1334 }
1335 }
1336
1337 #[test]
1340 fn test_parallel_writer_progress() {
1341 let mut pw = ParallelWriter::new("test.bin", 4);
1342 pw.mark_chunk_written();
1343 pw.mark_chunk_written();
1344 assert!((pw.progress() - 0.5).abs() < 1e-10);
1345 }
1346
1347 #[test]
1348 fn test_parallel_writer_complete() {
1349 let mut pw = ParallelWriter::new("test.bin", 2);
1350 pw.mark_chunk_written();
1351 pw.mark_chunk_written();
1352 assert!(pw.is_complete());
1353 }
1354
1355 #[test]
1356 fn test_parallel_writer_no_overflow() {
1357 let mut pw = ParallelWriter::new("test.bin", 1);
1358 pw.mark_chunk_written();
1359 pw.mark_chunk_written();
1360 assert_eq!(pw.chunks_written, 1);
1361 }
1362
1363 #[test]
1364 fn test_parallel_writer_zero_chunks() {
1365 let pw = ParallelWriter::new("test.bin", 0);
1366 assert!((pw.progress() - 1.0).abs() < 1e-10);
1367 assert!(pw.is_complete());
1368 }
1369
1370 #[test]
1371 fn test_parallel_writer_reset() {
1372 let mut pw = ParallelWriter::new("f", 3);
1373 pw.mark_chunk_written();
1374 pw.mark_chunk_written();
1375 pw.reset();
1376 assert_eq!(pw.chunks_written, 0);
1377 assert!(!pw.is_complete());
1378 }
1379
1380 #[test]
1383 fn test_pvtu_xml_contains_pieces() {
1384 let pieces = vec![
1385 PvtuPiece {
1386 file: "rank0.vtu".into(),
1387 rank: 0,
1388 },
1389 PvtuPiece {
1390 file: "rank1.vtu".into(),
1391 rank: 1,
1392 },
1393 ];
1394 let xml = write_pvtu_xml(&pieces, &["velocity"], &["pressure"]);
1395 assert!(xml.contains("rank0.vtu"));
1396 assert!(xml.contains("rank1.vtu"));
1397 assert!(xml.contains("velocity"));
1398 assert!(xml.contains("pressure"));
1399 assert!(xml.contains("PUnstructuredGrid"));
1400 }
1401
1402 #[test]
1403 fn test_pvtu_xml_no_arrays() {
1404 let pieces = vec![PvtuPiece {
1405 file: "r0.vtu".into(),
1406 rank: 0,
1407 }];
1408 let xml = write_pvtu_xml(&pieces, &[], &[]);
1409 assert!(xml.contains("PUnstructuredGrid"));
1410 assert!(!xml.contains("PPointData"));
1411 assert!(!xml.contains("PCellData"));
1412 }
1413
1414 #[test]
1415 fn test_pvd_xml_entries() {
1416 let entries = vec![(0.0, "t0.pvtu"), (0.1, "t1.pvtu"), (0.2, "t2.pvtu")];
1417 let xml = write_pvd_xml(&entries);
1418 assert!(xml.contains("t0.pvtu"));
1419 assert!(xml.contains("t2.pvtu"));
1420 assert!(xml.contains("Collection"));
1421 }
1422
1423 #[test]
1426 fn test_partition_map_round_robin() {
1427 let pm = PartitionMap::round_robin(9, 3);
1428 assert_eq!(pm.cells_on_rank(0), 3);
1429 assert_eq!(pm.cells_on_rank(1), 3);
1430 assert_eq!(pm.cells_on_rank(2), 3);
1431 }
1432
1433 #[test]
1434 fn test_partition_map_imbalance_perfect() {
1435 let pm = PartitionMap::round_robin(6, 3);
1436 assert!((pm.imbalance() - 1.0).abs() < 1e-10);
1437 }
1438
1439 #[test]
1440 fn test_partition_map_imbalance_skewed() {
1441 let pm = PartitionMap::new(vec![0; 9], 3);
1443 assert!(pm.imbalance() > 1.5);
1444 }
1445
1446 #[test]
1447 fn test_partition_map_to_text_roundtrip_count() {
1448 let pm = PartitionMap::round_robin(4, 2);
1449 let text = pm.to_text();
1450 assert_eq!(text.lines().count(), 5);
1452 }
1453
1454 #[test]
1457 fn test_distributed_array_serialize_deserialize() {
1458 let arr = DistributedArray::new(0, 6, 0, vec![1.0, 2.0, 3.0]);
1459 let text = arr.serialize_text();
1460 let recovered = DistributedArray::deserialize_text(&text).unwrap();
1461 assert_eq!(recovered.rank, 0);
1462 assert_eq!(recovered.offset, 0);
1463 assert_eq!(recovered.global_len, 6);
1464 assert!((recovered.data[0] - 1.0).abs() < 1e-10);
1465 assert!((recovered.data[2] - 3.0).abs() < 1e-10);
1466 }
1467
1468 #[test]
1469 fn test_distributed_array_assemble() {
1470 let shard0 = DistributedArray::new(0, 6, 0, vec![1.0, 2.0, 3.0]);
1471 let shard1 = DistributedArray::new(1, 6, 3, vec![4.0, 5.0, 6.0]);
1472 let global = DistributedArray::assemble(&[shard0, shard1]).unwrap();
1473 assert_eq!(global.len(), 6);
1474 for (i, &v) in global.iter().enumerate() {
1475 assert!((v - (i + 1) as f64).abs() < 1e-10);
1476 }
1477 }
1478
1479 #[test]
1480 fn test_distributed_array_assemble_empty() {
1481 let result = DistributedArray::assemble(&[]).unwrap();
1482 assert!(result.is_empty());
1483 }
1484
1485 #[test]
1488 fn test_checkpoint_header_roundtrip() {
1489 let h = CheckpointHeader::new(42, 3.125, 4, 1000, "sim_v1");
1490 let line = h.to_line();
1491 let h2 = CheckpointHeader::from_line(&line).unwrap();
1492 assert_eq!(h2.step, 42);
1493 assert!((h2.time - 3.125).abs() < 1e-6);
1494 assert_eq!(h2.n_ranks, 4);
1495 assert_eq!(h2.n_items, 1000);
1496 assert_eq!(h2.tag, "sim_v1");
1497 }
1498
1499 #[test]
1500 fn test_checkpoint_header_bad_line() {
1501 assert!(CheckpointHeader::from_line("garbage").is_none());
1502 }
1503
1504 #[test]
1507 fn test_checkpoint_write_read_roundtrip() {
1508 let path = "/tmp/test_oxiphysics_ckpt.txt";
1509 let header = CheckpointHeader::new(10, 1.5, 2, 3, "test");
1510 let positions = vec![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]];
1511 write_checkpoint(path, &header, &positions).unwrap();
1512 let (h2, pos2) = read_checkpoint(path).unwrap();
1513 assert_eq!(h2.step, 10);
1514 assert_eq!(pos2.len(), 3);
1515 assert!((pos2[0][0] - 1.0).abs() < 1e-6);
1516 assert!((pos2[2][2] - 9.0).abs() < 1e-6);
1517 let _ = fs::remove_file(path);
1518 }
1519
1520 #[test]
1523 fn test_merge_chunk_files() {
1524 let base = "/tmp/test_merge_par_chunks";
1525 let output = "/tmp/test_merge_par_output.bin";
1526 for i in 0..3usize {
1527 fs::write(format!("{base}.chunk{i}"), format!("chunk{i}\n")).unwrap();
1528 }
1529 merge_chunk_files(base, 3, output).unwrap();
1530 let merged = fs::read_to_string(output).unwrap();
1531 assert!(merged.contains("chunk0"));
1532 assert!(merged.contains("chunk2"));
1533 for i in 0..3usize {
1534 let _ = fs::remove_file(format!("{base}.chunk{i}"));
1535 }
1536 let _ = fs::remove_file(output);
1537 }
1538
1539 #[test]
1540 fn test_merge_rank_files() {
1541 let base = "/tmp/test_merge_par_ranks";
1542 let output = "/tmp/test_merge_par_rank_output.bin";
1543 for r in 0..2usize {
1544 fs::write(format!("{base}.rank{r}"), format!("rank{r}\n")).unwrap();
1545 }
1546 merge_rank_files(base, 2, output).unwrap();
1547 let merged = fs::read_to_string(output).unwrap();
1548 assert!(merged.contains("rank0"));
1549 assert!(merged.contains("rank1"));
1550 for r in 0..2usize {
1551 let _ = fs::remove_file(format!("{base}.rank{r}"));
1552 }
1553 let _ = fs::remove_file(output);
1554 }
1555
1556 #[test]
1559 fn test_estimate_io_bandwidth_normal() {
1560 let bw = estimate_io_bandwidth(1_000_000, 1.0);
1561 assert!((bw - 1.0).abs() < 1e-9);
1562 }
1563
1564 #[test]
1565 fn test_estimate_io_bandwidth_zero_time() {
1566 assert_eq!(estimate_io_bandwidth(1000, 0.0), 0.0);
1567 }
1568
1569 #[test]
1570 fn test_bandwidth_large() {
1571 let bw = estimate_io_bandwidth(1_000_000_000, 0.5);
1572 assert!((bw - 2000.0).abs() < 1.0);
1573 }
1574
1575 #[test]
1578 fn test_chunked_trajectory_n_chunks() {
1579 let mut traj = ChunkedTrajectory::new(2, 3);
1580 for _ in 0..7 {
1581 traj.push_frame(vec![[0.0; 3]; 2]);
1582 }
1583 assert_eq!(traj.n_chunks(), 3);
1584 }
1585
1586 #[test]
1587 fn test_write_read_chunked_xyz_roundtrip() {
1588 let base = "/tmp/test_chunked_traj_par2";
1589 let mut traj = ChunkedTrajectory::new(2, 2);
1590 traj.push_frame(vec![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]);
1591 traj.push_frame(vec![[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]);
1592 traj.push_frame(vec![[9.0, 8.0, 7.0], [6.0, 5.0, 4.0]]);
1593 write_chunked_xyz(&traj, base).unwrap();
1594 let loaded = read_chunked_xyz(base, traj.n_chunks()).unwrap();
1595 assert_eq!(loaded.n_frames, 3);
1596 assert!((loaded.frames[0][0][0] - 1.0).abs() < 1e-4);
1597 for i in 0..traj.n_chunks() {
1598 let _ = fs::remove_file(format!("{base}.chunk{i}.xyz"));
1599 }
1600 }
1601
1602 #[test]
1605 fn test_compress_decompress_delta_roundtrip() {
1606 let frames = vec![
1607 vec![[1.0_f64, 2.0, 3.0], [4.0, 5.0, 6.0]],
1608 vec![[1.1, 2.1, 3.1], [4.2, 5.2, 6.2]],
1609 vec![[1.5, 2.5, 3.5], [5.0, 5.5, 6.5]],
1610 ];
1611 let deltas = compress_trajectory_delta(&frames);
1612 let recovered = decompress_trajectory_delta(&deltas);
1613 assert_eq!(recovered.len(), frames.len());
1614 for (f, r) in frames.iter().zip(recovered.iter()) {
1615 for (fa, ra) in f.iter().zip(r.iter()) {
1616 assert!((fa[0] - ra[0]).abs() < 1e-10);
1617 }
1618 }
1619 }
1620
1621 #[test]
1622 fn test_compress_delta_single_frame() {
1623 let frames = vec![vec![[1.0_f64, 2.0, 3.0]]];
1624 let d = compress_trajectory_delta(&frames);
1625 assert_eq!(d.len(), 1);
1626 assert!((d[0][0][0] - 1.0).abs() < 1e-10);
1627 }
1628
1629 #[test]
1632 fn test_parallel_file_layout_offsets() {
1633 let mut layout = ParallelFileLayout::new();
1634 layout.add_dataset("positions", DsetType::Float64, 3, 100);
1635 layout.add_dataset("velocity", DsetType::Float64, 3, 100);
1636 let pos = layout.find("positions").unwrap();
1637 assert_eq!(pos.byte_offset, 0);
1638 let vel = layout.find("velocity").unwrap();
1639 assert_eq!(vel.byte_offset, 2400);
1641 }
1642
1643 #[test]
1644 fn test_parallel_file_layout_total_bytes() {
1645 let mut layout = ParallelFileLayout::new();
1646 layout.add_dataset("x", DsetType::Float32, 1, 50);
1647 layout.add_dataset("y", DsetType::Uint32, 1, 50);
1648 assert_eq!(layout.total_data_bytes(), 400);
1650 }
1651
1652 #[test]
1653 fn test_dataset_type_byte_sizes() {
1654 assert_eq!(DsetType::Float64.byte_size(), 8);
1655 assert_eq!(DsetType::Float32.byte_size(), 4);
1656 assert_eq!(DsetType::Uint64.byte_size(), 8);
1657 assert_eq!(DsetType::Uint32.byte_size(), 4);
1658 }
1659
1660 #[test]
1661 fn test_dataset_header_serialise() {
1662 let h = DatasetHeader::new("temp", DsetType::Float32, 1, 256, 0);
1663 let line = h.to_header_line();
1664 assert!(line.contains("DATASET"));
1665 assert!(line.contains("temp"));
1666 assert!(line.contains("float32"));
1667 }
1668
1669 #[test]
1672 fn test_io_counters_read_bw() {
1673 let mut c = IoCounters::new();
1674 c.record_read(10_000_000, 0.01);
1675 let bw = c.avg_read_bw_mb();
1676 assert!((bw - 1000.0).abs() < 1.0);
1677 }
1678
1679 #[test]
1680 fn test_io_counters_merge() {
1681 let mut a = IoCounters::new();
1682 a.record_write(1000, 0.001);
1683 let mut b = IoCounters::new();
1684 b.record_write(2000, 0.002);
1685 a.merge(&b);
1686 assert_eq!(a.bytes_written, 3000);
1687 assert_eq!(a.write_ops, 2);
1688 }
1689
1690 #[test]
1691 fn test_io_counters_reset() {
1692 let mut c = IoCounters::new();
1693 c.record_read(999, 1.0);
1694 c.reset();
1695 assert_eq!(c.bytes_read, 0);
1696 assert_eq!(c.read_ops, 0);
1697 }
1698
1699 #[test]
1700 fn test_io_stats_format() {
1701 let s = io_stats(1024, 2048, 10);
1702 assert!(s.contains("1024"));
1703 assert!(s.contains("2048"));
1704 assert!(s.contains("10"));
1705 }
1706}