brahe 1.6.0

Brahe is a modern satellite dynamics library for research and engineering applications, designed to be easy to learn, high-performance, and quick to deploy. The north star of its development is enabling users to solve meaningful problems and answer questions quickly, easily, and correctly.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
"""
Comparative benchmark orchestrator CLI.

Two harnesses share this CLI:

- ``perf`` (alias: ``run`` for backward compatibility) — times each task
  with a single fixed input across N iterations. This is the legacy
  behavior, kept verbatim for the speedup-vs-OreKit chart.
- ``accuracy`` — runs each task once across a sweep of N initial
  conditions and reports the distribution of element-wise errors versus
  OreKit. Output is JSONL (see ``benchmarks.comparative.accuracy``).

Usage:
    python -m benchmarks.comparative.runner list
    python -m benchmarks.comparative.runner perf [--module coordinates] [--language python] [--iterations 100]
    python -m benchmarks.comparative.runner accuracy [--module coordinates] [--samples 100]
    python -m benchmarks.comparative.runner plot

The legacy ``run`` subcommand is preserved as an alias of ``perf`` so
existing tooling (``just bench-compare``) does not break mid-refactor.
"""

import json
import subprocess
from itertools import combinations
from pathlib import Path
from typing import Optional

import typer

from benchmarks.comparative.config import (
    DEFAULT_ITERATIONS,
    DEFAULT_SEED,
    JAVA_PROJECT_DIR,
    NYX_BINARY,
    RESULTS_DIR,
    RUST_BINARY,
    collect_system_info,
)
from benchmarks.comparative.registry import filter_tasks
from benchmarks.comparative.reporting import (
    console,
    print_accuracy_table,
    print_performance_table,
    print_task_list,
)
from benchmarks.comparative.results import BenchmarkRun, TaskResult
from benchmarks.comparative.tasks.base import BenchmarkTask

# Typer application object; the functions below register themselves on it as subcommands.
app = typer.Typer(help="Comparative benchmark framework for Brahe")

# Java/OreKit is the reference baseline — run it first, use it for speedup and accuracy.
# Display order used when sorting a task's languages: Orekit (Java baseline),
# GMAT, Basilisk, Nyx, brahe-Python, brahe-Rust.
# NOTE(review): presumably mirrors the published comparison ordering — confirm
# that Nyx belongs between Basilisk and brahe-Python there as well.
LANGUAGE_ORDER = ["java", "gmat", "basilisk", "nyx", "python", "rust"]
# Language whose results serve as the reference side of accuracy comparisons.
BASELINE_LANGUAGE = "java"


@app.command("list")
def list_tasks(
    module: Optional[str] = typer.Option(None, help="Filter by module"),
    language: Optional[str] = typer.Option(None, help="Filter by language support"),
):
    """List available benchmark tasks."""
    # Resolve the filtered task set and hand it straight to the table renderer.
    print_task_list(filter_tasks(module=module, language=language))


@app.command()
def run(
    module: Optional[str] = typer.Option(None, help="Filter by module"),
    task: Optional[str] = typer.Option(None, help="Run specific task by name"),
    language: Optional[str] = typer.Option(None, help="Run only this language"),
    iterations: int = typer.Option(DEFAULT_ITERATIONS, help="Number of iterations"),
    seed: int = typer.Option(DEFAULT_SEED, help="Random seed for parameter generation"),
    output: Optional[Path] = typer.Option(None, help="Output directory for results"),
):
    """Run comparative performance benchmarks (timing only).

    Alias: ``perf`` — see the ``perf`` command which delegates here.
    """
    # NOTE: the docstring above doubles as the CLI help text, so implementation
    # notes live in comments instead.
    #
    # Flow: select tasks -> run every requested language per task -> record
    # accuracy comparisons -> print both tables -> save the run to disk.
    # Exits with status 1 when the filters match no task.
    tasks = filter_tasks(module=module, task_name=task)
    if not tasks:
        console.print("[red]No matching tasks found.[/red]")
        raise typer.Exit(1)

    # A single --language restricts every task to that language; otherwise each
    # task runs all the languages it declares.
    languages_to_run = [language] if language else None

    # Rank lookup built once so sorting does not rescan LANGUAGE_ORDER per key.
    language_rank = {lang: rank for rank, lang in enumerate(LANGUAGE_ORDER)}

    console.print(
        f"[bold]Running {len(tasks)} task(s), {iterations} iterations, seed={seed}[/bold]\n"
    )

    benchmark_run = BenchmarkRun(system_info=collect_system_info())

    # No Basilisk/GMAT pre-import dance: GMAT now runs in a subprocess
    # per task, so there is no shared Python process where Basilisk and
    # GMAT could fight over the "Spacecraft" type registration. The two
    # libraries are isolated by process boundary.

    for t in tasks:
        requested = languages_to_run or t.languages
        # Sort by LANGUAGE_ORDER so the baseline (java) runs first; languages
        # not listed there sort last (rank 99).
        task_languages = sorted(
            requested,
            key=lambda lang: language_rank.get(lang, 99),
        )
        # Separate name and description so they do not run together on screen.
        console.print(f"[cyan]Task:[/cyan] {t.name} — {t.description}")

        task_results: dict[str, TaskResult] = {}

        for lang in task_languages:
            if lang not in t.languages:
                console.print(f"  [yellow]Skipping {lang} (no implementation)[/yellow]")
                continue

            console.print(f"  [dim]Running {lang}...[/dim]", end=" ")
            result = _dispatch_task(t, lang, iterations, seed)

            if result:
                task_results[lang] = result
                benchmark_run.task_results.append(result)
                console.print(f"[green]mean={result.mean:.6f}s[/green]")
            else:
                console.print("[red]FAILED[/red]")

        _record_accuracy_comparisons(t, task_results, benchmark_run)

        console.print()

    # Print results
    print_performance_table(benchmark_run)
    print_accuracy_table(benchmark_run)

    # Save results
    output_dir = output or RESULTS_DIR
    filepath = benchmark_run.save(output_dir)
    console.print(f"\n[dim]Results saved to {filepath}[/dim]")


def _record_accuracy_comparisons(
    task: BenchmarkTask,
    task_results: dict[str, TaskResult],
    benchmark_run: BenchmarkRun,
) -> None:
    """Append this task's accuracy comparisons to ``benchmark_run``.

    When the baseline language produced a result, every other language is
    compared against it (baseline first). Without a baseline result, every
    unordered pair of languages that produced results is compared instead.
    """
    if BASELINE_LANGUAGE in task_results:
        pairs = [
            (BASELINE_LANGUAGE, lang)
            for lang in task_results
            if lang != BASELINE_LANGUAGE
        ]
    else:
        # No baseline; compare all pairs
        pairs = list(combinations(task_results, 2))

    for lang_a, lang_b in pairs:
        comparison = task.compare_results(
            task_results[lang_a].results,
            task_results[lang_b].results,
            lang_a,
            lang_b,
        )
        benchmark_run.accuracy_comparisons.append(comparison)


@app.command()
def plot(
    results_file: Optional[Path] = typer.Option(
        None, help="Specific results file to plot"
    ),
):
    """Generate comparison charts from benchmark results."""
    from benchmarks.comparative.plotting import generate_all_plots

    # An explicit results file wins; otherwise fall back to the latest run
    # found in the default results directory.
    loaded_run = (
        BenchmarkRun.load(results_file)
        if results_file
        else BenchmarkRun.load_latest(RESULTS_DIR)
    )

    if loaded_run is None:
        console.print("[red]No benchmark results found. Run benchmarks first.[/red]")
        raise typer.Exit(1)

    # Report every chart that was written.
    for chart_path in generate_all_plots(loaded_run):
        console.print(f"[green]Generated:[/green] {chart_path}")


@app.command("perf")
def perf(
    module: Optional[str] = typer.Option(None, help="Filter by module"),
    task: Optional[str] = typer.Option(None, help="Run specific task by name"),
    language: Optional[str] = typer.Option(None, help="Run only this language"),
    iterations: int = typer.Option(DEFAULT_ITERATIONS, help="Number of iterations"),
    seed: int = typer.Option(DEFAULT_SEED, help="Random seed for parameter generation"),
    output: Optional[Path] = typer.Option(None, help="Output directory for results"),
):
    """Alias for ``run``. Times each task across N iterations of one input."""
    # Forward every option explicitly: calling ``run`` as a plain function
    # bypasses Typer, so its Option(...) defaults must never be relied on.
    forwarded_options = {
        "module": module,
        "task": task,
        "language": language,
        "iterations": iterations,
        "seed": seed,
        "output": output,
    }
    run(**forwarded_options)


@app.command("accuracy")
def accuracy_cmd(
    module: Optional[str] = typer.Option(None, help="Filter by module"),
    task: Optional[str] = typer.Option(None, help="Run specific task by name"),
    samples: int = typer.Option(
        100, help="Initial-condition samples per task (default 100)"
    ),
    seed: int = typer.Option(DEFAULT_SEED, help="Random seed for sample generation"),
    output: Optional[Path] = typer.Option(None, help="Output directory for results"),
    quick: bool = typer.Option(
        False, help="Smoke mode: 5 samples per task regardless of --samples"
    ),
):
    """Sweep initial conditions and compare every language to OreKit.

    Writes JSONL — one summary record per (task, comparison-language) plus
    one detail record per (task, comparison-language, sample) — to
    ``benchmarks/comparative/results/accuracy_<timestamp>.jsonl`` and the
    canonical ``accuracy_latest.jsonl``.
    """
    # Imported lazily so the accuracy harness is only loaded for this command.
    from benchmarks.comparative.accuracy import run_accuracy

    # Pure pass-through: the harness receives the CLI options unchanged.
    sweep_options = {
        "module": module,
        "task": task,
        "samples": samples,
        "seed": seed,
        "output": output,
        "quick": quick,
    }
    run_accuracy(**sweep_options)


def _dispatch_task(
    task: BenchmarkTask,
    language: str,
    iterations: int,
    seed: int,
) -> TaskResult | None:
    """Route ``task`` to the runner for ``language``.

    ``python`` and ``basilisk`` execute in-process; ``rust``, ``nyx``,
    ``java`` and ``gmat`` are spawned as subprocesses. Any other language
    yields ``None``.

    GMAT deliberately uses the subprocess path instead of an in-process
    import: in-process GMAT accumulates C++ library state across tasks and
    reliably segfaults on ``force_model.accel_point_mass_gravity`` after a
    long sequence of other tasks. A fresh subprocess per task guarantees
    clean state.
    """
    in_process_runners = {
        "python": _run_python,
        "basilisk": _run_basilisk,
    }
    # Command builders are stored uncalled so only the selected language's
    # builder (and its filesystem/environment checks) actually runs.
    command_builders = {
        "rust": _get_rust_command,
        "nyx": _get_nyx_command,
        "java": _get_java_command,
        "gmat": _get_gmat_command,
    }

    runner = in_process_runners.get(language)
    if runner is not None:
        return runner(task, iterations, seed)

    build_command = command_builders.get(language)
    if build_command is not None:
        return _run_subprocess(task, language, iterations, seed, build_command())

    return None


def _run_python(
    task: BenchmarkTask,
    iterations: int,
    seed: int,
) -> TaskResult | None:
    """Execute ``task`` in-process through the Python brahe implementation.

    Returns whatever the implementation's ``dispatch`` returns, or ``None``
    (after printing the error) when building the input or running it raises.
    """
    from benchmarks.comparative.implementations.python import (
        dispatch as run_python_impl,
    )

    try:
        payload = task.to_input_json(iterations, seed)
        outcome = run_python_impl(payload)
    except Exception as exc:
        # Best-effort boundary: one failing task must not abort the whole run.
        console.print(f"    [red]Error: {exc}[/red]")
        return None
    return outcome


def _run_basilisk(
    task: BenchmarkTask,
    iterations: int,
    seed: int,
) -> TaskResult | None:
    """Execute ``task`` in-process through the Basilisk (bsk) implementation.

    Returns ``None`` with a setup hint when the Basilisk implementation
    cannot be imported, and ``None`` with the error message when building
    the input or running the task raises.
    """
    try:
        from benchmarks.comparative.implementations.basilisk import (
            dispatch as run_basilisk_impl,
        )
    except ImportError:
        console.print(
            "    [yellow]Basilisk not installed. Run: just bench-compare-setup[/yellow]"
        )
        return None

    try:
        payload = task.to_input_json(iterations, seed)
        outcome = run_basilisk_impl(payload)
    except Exception as exc:
        # Best-effort boundary: one failing task must not abort the whole run.
        console.print(f"    [red]Error: {exc}[/red]")
        return None
    return outcome


def _get_gmat_command() -> list[str] | None:
    """Return the command used to spawn a GMAT subprocess.

    Returns None if ``GMAT_ROOT_PATH`` is unset (so the runner can render
    the usual "GMAT not ready" skip line consistent with the legacy
    in-process behavior). The actual ``gmatpy`` import — which can raise
    cleanly inside the subprocess — is deferred to the child.
    """
    import os
    import sys

    if not os.environ.get("GMAT_ROOT_PATH"):
        return None
    return [sys.executable, "-m", "benchmarks.comparative.implementations.gmat"]


def _get_nyx_command() -> list[str] | None:
    """Command for the Nyx benchmark binary; ``None`` when the binary is absent."""
    return [str(NYX_BINARY)] if NYX_BINARY.exists() else None


def _get_rust_command() -> list[str] | None:
    """Command for the Rust benchmark binary; ``None`` when the binary is absent."""
    if not RUST_BINARY.exists():
        return None
    return [str(RUST_BINARY)]


def _get_java_command() -> list[str] | None:
    """Get the Java benchmark command, or None if not built.

    Uses the pre-extracted Gradle distribution binary
    (``build/bench-comparative/bin/bench-comparative``) so that running
    benchmarks never requires network access or a Gradle daemon write to
    ``~/.gradle/``.  The ``gradlew run`` path is kept as a fallback for
    environments where the distribution has not been extracted yet.

    Returns:
        The argv list to spawn, or ``None`` when neither the distribution
        binary nor a ``gradlew`` wrapper with an existing ``build``
        directory is present.
    """
    # Prefer the pre-extracted application distribution (no Gradle required).
    dist_binary = (
        JAVA_PROJECT_DIR / "build" / "bench-comparative" / "bin" / "bench-comparative"
    )
    if dist_binary.exists():
        # Resolve JAVA_HOME so the startup script finds java.
        _ensure_java_home()
        return [str(dist_binary)]

    # Fallback: use gradlew (requires Gradle download / daemon on first run).
    # Both the wrapper script and a build directory must exist.
    gradlew = JAVA_PROJECT_DIR / "gradlew"
    build_dir = JAVA_PROJECT_DIR / "build"
    if not gradlew.exists() or not build_dir.exists():
        return None

    _ensure_java_home()
    return [
        str(gradlew),
        "-p",
        str(JAVA_PROJECT_DIR),
        "--quiet",
        "run",
    ]


def _ensure_java_home() -> None:
    """Set JAVA_HOME (and prepend bin to PATH) if the system java shim is broken.

    The macOS shim at ``/usr/bin/java`` fails with exit-code 1 when no JDK
    is installed via the Apple mechanism, even though Homebrew's openjdk is
    present.  The generated Gradle startup script honours ``$JAVA_HOME``, so
    setting it here before spawning subprocesses is sufficient.
    """
    import os
    import shutil
    import subprocess

    # Already set and working — nothing to do.
    if os.environ.get("JAVA_HOME"):
        return

    # Quick check: does the current java resolve correctly?
    try:
        r = subprocess.run(["java", "-version"], capture_output=True, timeout=5)
        if r.returncode == 0:
            return
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass

    # Scan Homebrew candidate paths.
    candidates = [
        "/opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home",
        "/usr/local/opt/openjdk/libexec/openjdk.jdk/Contents/Home",
    ]
    for candidate in candidates:
        java_bin = os.path.join(candidate, "bin", "java")
        if os.path.isfile(java_bin) and os.access(java_bin, os.X_OK):
            os.environ["JAVA_HOME"] = candidate
            os.environ["PATH"] = f"{candidate}/bin:{os.environ.get('PATH', '')}"
            return


def _run_subprocess(
    task: BenchmarkTask,
    language: str,
    iterations: int,
    seed: int,
    command: list[str] | None,
) -> TaskResult | None:
    """Run a benchmark task via subprocess with JSON protocol.

    The task input is serialized to JSON and written to the child's stdin;
    the child is expected to print one JSON object on stdout containing the
    keys ``task``, ``iterations``, ``times_seconds``, ``results`` and
    ``metadata`` (with ``language`` and ``library`` inside ``metadata``).

    Args:
        task: Task providing the input payload and the timeout.
        language: Language label, used in messages and for the GMAT
            exit-code special case.
        iterations: Iteration count forwarded to ``task.to_input_json``.
        seed: Seed forwarded to ``task.to_input_json``.
        command: argv list to spawn, or ``None`` when the implementation
            is not built/configured.

    Returns:
        The parsed ``TaskResult``, or ``None`` after printing a diagnostic
        when the command is unavailable, cannot be launched, times out,
        exits non-zero, or prints output that does not follow the protocol.
    """
    if command is None:
        console.print(
            f"    [yellow]{language} not ready. Run: just bench-compare-setup[/yellow]"
        )
        return None

    input_json = json.dumps(task.to_input_json(iterations, seed))
    task_timeout = task.timeout

    try:
        result = subprocess.run(
            command,
            input=input_json,
            capture_output=True,
            text=True,
            timeout=task_timeout,
        )
    except subprocess.TimeoutExpired:
        console.print(f"    [red]Timeout after {task_timeout}s[/red]")
        return None
    except OSError as e:
        # The binary can disappear or be non-executable between the
        # existence check and the spawn; report it as a failed language
        # instead of aborting the entire benchmark run.
        console.print(f"    [red]Failed to launch {language}: {e}[/red]")
        return None

    if result.returncode == 2 and language == "gmat":
        # GMAT subprocess uses exit 2 for "not configured" — render the
        # same skip line as the legacy in-process path so the runner
        # behaves identically when GMAT_ROOT_PATH is unset or invalid.
        console.print(
            f"    [yellow]GMAT not ready ({result.stderr.strip()[:200]}). "
            f"Set GMAT_ROOT_PATH and run: just bench-compare-setup[/yellow]"
        )
        return None
    if result.returncode != 0:
        console.print(f"    [red]Subprocess error: {result.stderr[:200]}[/red]")
        return None

    try:
        output = json.loads(result.stdout)
        return TaskResult(
            task_name=output["task"],
            language=output["metadata"]["language"],
            library=output["metadata"]["library"],
            iterations=output["iterations"],
            times_seconds=output["times_seconds"],
            results=output["results"],
            metadata=output["metadata"],
        )
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # TypeError: valid JSON of the wrong shape (e.g. a top-level list,
        # or a non-object ``metadata``) fails on subscripting.
        console.print(f"    [red]Protocol error: {e}[/red]")
        return None


def main() -> None:
    """Entry point: invoke the Typer application, which parses the command line."""
    app()


# Run the CLI when this module is executed as a script
# (e.g. ``python -m benchmarks.comparative.runner``).
if __name__ == "__main__":
    main()