import json
import math
import os
import sys
import numpy as np
import rebound
import assist
OBLIQUITY_J2000 = 0.40909280422 COS_EPS = math.cos(OBLIQUITY_J2000)
SIN_EPS = math.sin(OBLIQUITY_J2000)
ASSIST_FORCES_DEFAULT = 0x01 | 0x02 | 0x04 | 0x10 | 0x20 | 0x40
def ecliptic_to_equatorial(state):
x, y, z, vx, vy, vz = state
return [
x,
COS_EPS * y - SIN_EPS * z,
SIN_EPS * y + COS_EPS * z,
vx,
COS_EPS * vy - SIN_EPS * vz,
SIN_EPS * vy + COS_EPS * vz,
]
def equatorial_to_ecliptic(state):
x, y, z, vx, vy, vz = state
return [
x,
COS_EPS * y + SIN_EPS * z,
-SIN_EPS * y + COS_EPS * z,
vx,
COS_EPS * vy + SIN_EPS * vz,
-SIN_EPS * vy + COS_EPS * vz,
]
def mjd_to_assist_time(mjd_tdb, jd_ref):
return (mjd_tdb + 2400000.5) - jd_ref
def propagate_orbit(ephem, state_ecl, epoch_mjd, target_mjds, non_grav=None):
jd_ref = ephem.jd_ref
t0 = mjd_to_assist_time(epoch_mjd, jd_ref)
eq_state = ecliptic_to_equatorial(state_ecl)
sun = ephem.get_particle("sun", t0)
bary_state = [
eq_state[0] + sun.x,
eq_state[1] + sun.y,
eq_state[2] + sun.z,
eq_state[3] + sun.vx,
eq_state[4] + sun.vy,
eq_state[5] + sun.vz,
]
sim = rebound.Simulation()
sim.t = t0
extras = assist.Extras(sim, ephem)
if non_grav is not None:
extras.forces = extras.forces + ["NON_GRAVITATIONAL"]
extras.particle_params = np.array(list(non_grav), dtype=np.float64)
sim.add(
x=bary_state[0],
y=bary_state[1],
z=bary_state[2],
vx=bary_state[3],
vy=bary_state[4],
vz=bary_state[5],
m=0.0,
)
sim.N_active = 0
sim.exact_finish_time = 1
results = []
for target_mjd in target_mjds:
t_target = mjd_to_assist_time(target_mjd, jd_ref)
sim.integrate(t_target)
p = sim.particles[0]
bary_eq = [p.x, p.y, p.z, p.vx, p.vy, p.vz]
sun_t = ephem.get_particle("sun", t_target)
helio_eq = [
bary_eq[0] - sun_t.x,
bary_eq[1] - sun_t.y,
bary_eq[2] - sun_t.z,
bary_eq[3] - sun_t.vx,
bary_eq[4] - sun_t.vy,
bary_eq[5] - sun_t.vz,
]
helio_ecl = equatorial_to_ecliptic(helio_eq)
results.append({
"epoch": target_mjd,
"state": helio_ecl,
})
return results
def load_sample_orbits():
import csv
data_dir = os.path.join(os.path.dirname(__file__), "data")
csv_path = os.path.join(data_dir, "elements_sun_ec.csv")
if not os.path.exists(csv_path):
from importlib.resources import files as pkg_files
data_pkg = pkg_files("adam_core.utils.helpers.data")
csv_path = str(data_pkg.joinpath("elements_sun_ec.csv"))
orbits = []
with open(csv_path) as f:
reader = csv.DictReader(f)
for row in reader:
orbits.append({
"object_id": row["targetname"],
"epoch_mjd": float(row["mjd_tdb"]),
"state": [
float(row["x"]),
float(row["y"]),
float(row["z"]),
float(row["vx"]),
float(row["vy"]),
float(row["vz"]),
],
})
return orbits
def main():
planets_path = os.environ.get("ASSIST_PLANETS_PATH")
asteroids_path = os.environ.get("ASSIST_ASTEROIDS_PATH")
if not planets_path or not asteroids_path:
print("Error: Set ASSIST_PLANETS_PATH and ASSIST_ASTEROIDS_PATH", file=sys.stderr)
sys.exit(1)
ephem = assist.Ephem(planets_path, asteroids_path)
print(f"Loaded ephemeris: jd_ref={ephem.jd_ref}")
print(f"REBOUND version: {rebound.__version__}")
print(f"ASSIST version: {assist.__version__}")
orbits = load_sample_orbits()
print(f"Loaded {len(orbits)} sample orbits")
dt_days = 30.0
n_epochs = 10
nongrav_params = (0.0, 1e-10, 0.0)
reference_data = {
"metadata": {
"rebound_version": rebound.__version__,
"assist_version": assist.__version__,
"jd_ref": ephem.jd_ref,
"cos_eps": COS_EPS,
"sin_eps": SIN_EPS,
"propagation_days": dt_days,
"n_epochs": n_epochs,
"nongrav_params": list(nongrav_params),
},
"orbits": [],
"nongrav_orbits": [],
}
print("Gravity-only propagation:")
for i, orbit in enumerate(orbits):
epoch = orbit["epoch_mjd"]
t0 = mjd_to_assist_time(epoch, ephem.jd_ref)
t_end = mjd_to_assist_time(epoch + dt_days, ephem.jd_ref)
try:
ephem.get_particle("sun", t0)
ephem.get_particle("sun", t_end)
except Exception as e:
print(f" Skipping {orbit['object_id']}: epoch outside ephemeris range ({e})")
continue
target_epochs = [epoch + dt_days * (j + 1) / n_epochs for j in range(n_epochs)]
try:
results = propagate_orbit(ephem, orbit["state"], epoch, target_epochs)
except Exception as e:
print(f" Skipping {orbit['object_id']}: propagation failed ({e})")
continue
entry = {
"object_id": orbit["object_id"],
"epoch_mjd": epoch,
"initial_state": orbit["state"],
"propagated": results,
}
reference_data["orbits"].append(entry)
print(f" [{i+1}/{len(orbits)}] {orbit['object_id']}: {len(results)} epochs OK")
print(f"\nNon-gravitational propagation (A1, A2, A3) = {nongrav_params}:")
for i, orbit in enumerate(orbits):
epoch = orbit["epoch_mjd"]
t0 = mjd_to_assist_time(epoch, ephem.jd_ref)
t_end = mjd_to_assist_time(epoch + dt_days, ephem.jd_ref)
try:
ephem.get_particle("sun", t0)
ephem.get_particle("sun", t_end)
except Exception as e:
print(f" Skipping {orbit['object_id']}: epoch outside ephemeris range ({e})")
continue
target_epochs = [epoch + dt_days * (j + 1) / n_epochs for j in range(n_epochs)]
try:
results = propagate_orbit(
ephem, orbit["state"], epoch, target_epochs, non_grav=nongrav_params
)
except Exception as e:
print(f" Skipping {orbit['object_id']}: propagation failed ({e})")
continue
entry = {
"object_id": orbit["object_id"],
"epoch_mjd": epoch,
"initial_state": orbit["state"],
"propagated": results,
}
reference_data["nongrav_orbits"].append(entry)
print(f" [{i+1}/{len(orbits)}] {orbit['object_id']}: {len(results)} epochs OK")
output_path = os.path.join(os.path.dirname(__file__), "reference_data.json")
with open(output_path, "w") as f:
json.dump(reference_data, f, indent=2)
print(
f"\nWrote {len(reference_data['orbits'])} gravity-only orbits and "
f"{len(reference_data['nongrav_orbits'])} non-grav orbits to {output_path}"
)
if __name__ == "__main__":
main()