import argparse
import json
import struct
from pathlib import Path
from dataclasses import dataclass
from enum import IntFlag, CONFORM
from typing import List
class Flag(IntFlag, boundary=CONFORM):
BOUNDARY = 0
FLUID = 16
def as_rust_cell(self, inflow=None, outflow=False):
if self is Flag.FLUID:
return "Fluid"
if inflow is not None:
return {"Boundary": {"Inflow": {"velocity": inflow}}}
if outflow:
return {"Boundary": "Outflow"}
return {"Boundary": "NoSlip"}
@dataclass
class SimulationOutput:
imax: int jmax: int
U: List[List[float]] V: List[List[float]] P: List[List[float]] T: List[List[float]] flags: List[List[Flag]]
def as_rust_grid(self):
size = [self.imax + 2, self.jmax + 2]
def flatten_array(arr):
return {"v": 1, "dim": size, "data": [x for y in arr for x in y]}
new_flags = []
for x, (ux, vx, flagsx) in enumerate(zip(self.U, self.V, self.flags)):
row = []
for y, (uval, vval, flagval) in enumerate(zip(ux, vx, flagsx)):
inflow = None
outflow = False
if y > 0 and y <= self.jmax:
if x == 0:
inflow = [uval, vval]
elif x == self.imax + 1:
outflow = True
row.append(flagval.as_rust_cell(inflow=inflow, outflow=outflow))
new_flags.append(row)
return {
"size": size,
"u": flatten_array(self.U),
"v": flatten_array(self.V),
"pressure": flatten_array(self.P),
"cell_type": flatten_array(new_flags),
}
def get_args():
parser = argparse.ArgumentParser(
description="Run NaSt2D and generate test files",
)
parser.add_argument(
"--parse-outfile",
required=True,
type=Path,
help="Parse an outfile from NaSt2D",
)
parser.add_argument(
"--output",
type=Path,
help="Write the JSON output to a file instead of stdout",
)
return parser.parse_args()
def parse_array(stream, imax, jmax):
return [[next(stream) for _ in range(jmax + 2)] for _ in range(imax + 2)]
def parse_stream(int_stream, float_stream):
imax = next(int_stream)
jmax = next(int_stream)
U = parse_array(float_stream, imax, jmax)
V = parse_array(float_stream, imax, jmax)
P = parse_array(float_stream, imax, jmax)
T = parse_array(float_stream, imax, jmax)
def flag_stream():
for value in int_stream:
yield Flag(value)
flags = parse_array(flag_stream(), imax, jmax)
return SimulationOutput(
imax,
jmax,
U,
V,
P,
T,
flags,
)
def parse_out_file(file_obj, int_bytes=4, int_type="i"):
def float_stream():
while len(byte_chunk := file_obj.read(8)) == 8:
yield struct.unpack("d", byte_chunk)[0]
def int_stream():
while len(byte_chunk := file_obj.read(int_bytes)) == int_bytes:
yield struct.unpack(int_type, byte_chunk)[0]
return parse_stream(int_stream(), float_stream())
def main(args):
with open(args.parse_outfile, "rb") as f:
grid = parse_out_file(f)
if args.output:
with open(args.output, "w") as f:
json.dump(grid.as_rust_grid(), f, sort_keys=2, indent=2)
else:
print(f"{json.dumps(grid.as_rust_grid(), sort_keys=2, indent=2)}")
return 0
if __name__ == "__main__":
main(get_args())