import gc
import os
from pathlib import Path
from timeit import Timer, timeit
import numpy as np
from numpy.typing import NDArray
from scipy.interpolate import RegularGridInterpolator, RectBivariateSpline
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from interpn import (
MultilinearRectilinear,
MultilinearRegular,
MulticubicRegular,
MulticubicRectilinear,
NearestRegular,
NearestRectilinear,
interpn as interpn_fn,
)
RUN_INTERPN_ONLY = os.environ.get("INTERPNPY_INTERPN_ONLY", "").lower() in {
"1",
"true",
"yes",
}
TARGET_SAMPLE_SECONDS = 2.0
MAX_TIMER_LOOPS = 1_000_000_000
def average_call_time(
func, points, target_seconds: float = TARGET_SAMPLE_SECONDS
) -> float:
timer = Timer(lambda: func(points))
gc.collect()
calibrated_loops, total = timer.autorange()
avg = total / calibrated_loops if total else 0.0
fallback_loops = max(1, min(MAX_TIMER_LOOPS, calibrated_loops))
if avg == 0.0:
iterations = fallback_loops
else:
iterations = max(1, min(MAX_TIMER_LOOPS, int(target_seconds / avg) or 1))
gc.collect()
total = timer.timeit(iterations)
return total / iterations
DASH_STYLES = ["solid", "dash", "dot", "dashdot", "longdash", "longdashdot"]
THREAD_SPEEDUP_COLORS = {
"linear": "#1f77b4",
"cubic": "#ff7f0e",
"nearest": "#2ca02c",
}
def _normalized_line_style(index: int) -> str:
return DASH_STYLES[index % len(DASH_STYLES)]
def fill_between(
fig: go.Figure,
*,
x: NDArray,
upper: NDArray,
lower: NDArray,
row: int,
col: int,
fillcolor: str = "rgba(139, 196, 59, 0.25)",
) -> None:
clamped_upper = np.maximum(upper, lower)
assert np.all(clamped_upper >= lower)
fig.add_trace(
go.Scatter(
x=np.concatenate([x, x[::-1]]),
y=np.concatenate([clamped_upper, lower[::-1]]),
mode="lines",
line=dict(width=0),
fill="toself",
fillcolor=fillcolor,
showlegend=False,
hoverinfo="skip",
),
row=row,
col=col,
)
def _plot_normalized_vs_nobs(
*,
ns: list[int],
throughputs: dict[str, list[float]],
kinds: dict[str, str],
title: str,
output_path: Path,
) -> None:
for kind in ["Linear", "Cubic"]:
baseline_vals = throughputs.get(f"Scipy RegularGridInterpolator {kind}")
if not baseline_vals:
continue
baseline_arr = np.array(baseline_vals)
series = [
(name, np.array(values))
for name, values in throughputs.items()
if kinds.get(name) == kind and values
]
fig = make_subplots(rows=1, cols=1)
for idx, (label, values) in enumerate(series):
min_len = min(len(values), len(baseline_arr))
if min_len == 0:
continue
x_vals = np.array(ns[:min_len])
ratios = values[:min_len] / baseline_arr[:min_len]
is_baseline = label.startswith("Scipy RegularGridInterpolator")
legend_name = "Scipy" if is_baseline else "InterpN"
showlegend = True
for trace in fig.data:
if trace.name == legend_name:
showlegend = False
break
fig.add_trace(
go.Scatter(
x=x_vals,
y=np.ones_like(ratios) if is_baseline else ratios,
mode="lines",
line=dict(color="black", width=2, dash=_normalized_line_style(idx)),
marker=dict(
symbol="square"
if label.lower().startswith("scipy")
else "circle",
size=8,
),
opacity=1.0,
name=legend_name,
showlegend=showlegend,
)
)
if not is_baseline and label.startswith("InterpN"):
ones = np.ones_like(ratios)
upper = np.maximum(ratios, ones)
lower = np.minimum(ratios, ones)
fill_between(
fig,
x=x_vals,
upper=upper,
lower=lower,
row=1,
col=1,
)
fig.update_xaxes(
type="log",
title_text="Number of Observation Points",
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_yaxes(
title_text="Normalized Throughput",
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_layout(
title=dict(text=f"{title}, {kind}", y=0.97, yanchor="top"),
height=400,
margin=dict(t=60, l=60, r=60, b=110),
legend=dict(
orientation="h",
yanchor="top",
y=1.0,
x=1.0,
xanchor="right",
),
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="black"),
)
per_kind_output = output_path.with_stem(output_path.stem + f"_{kind.lower()}")
fig.write_image(str(per_kind_output))
fig.write_html(
str(per_kind_output.with_suffix(".html")),
include_plotlyjs="cdn",
full_html=False,
)
fig.show()
def _plot_throughput_vs_dims(
*,
ndims_to_test: list[int],
throughputs: dict[str, list[float]],
kinds: dict[str, str],
nobs: int,
output_path: Path,
) -> None:
fig = make_subplots(
rows=2,
cols=1,
subplot_titles=["Linear", "Cubic"],
shared_yaxes=True,
horizontal_spacing=0.08,
)
special_x = {
"Scipy RectBivariateSpline Cubic": [2],
"Numpy Interp": [1],
}
for row, kind in enumerate(["Linear", "Cubic"], start=1):
series = [
(name, values)
for name, values in throughputs.items()
if kinds.get(name) == kind and values
]
if not series:
continue
max_throughput = max(max(values) for _, values in series if values) or 1.0
baseline_vals = throughputs.get(f"Scipy RegularGridInterpolator {kind}")
interpn_series = [
values
for name, values in throughputs.items()
if name.startswith("InterpN") and f"Multi{kind.lower()}" in name and values
]
v = max(interpn_series, key=lambda vals: vals[-1])
baseline_norm = np.array(baseline_vals) / max_throughput
fill_between(
fig,
x=np.array(ndims_to_test),
upper=np.array(v) / max_throughput,
lower=baseline_norm,
row=row,
col=1,
)
for idx, (label, values) in enumerate(series):
normalized = np.array(values) / max_throughput
if not len(normalized):
continue
x_vals = special_x.get(label, ndims_to_test[: len(normalized)])
is_baseline = label.lower().startswith("scipy") or label.lower().startswith(
"numpy"
)
fig.add_trace(
go.Scatter(
x=x_vals,
y=normalized,
mode="lines+markers" if is_baseline else "lines",
line=dict(color="black", width=2, dash=_normalized_line_style(idx)),
marker=dict(symbol="square" if is_baseline else "circle", size=8),
opacity=1.0 if is_baseline else 1.0,
name=label,
showlegend=True,
legendgroup=kind,
legendgrouptitle_text=kind,
),
row=row,
col=1,
)
fig.update_xaxes(
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_xaxes(
title_text="Number of Dimensions",
row=2,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_yaxes(
type="log",
title_text="Normalized Throughput",
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_yaxes(
type="log",
row=2,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_layout(
title=dict(
text=f"Interpolation on 4x...x4 N-Dimensional Grid<br>{nobs} Observation Point{'s' if nobs > 1 else ''}",
y=0.97,
yanchor="top",
),
height=450,
margin=dict(t=80, l=60, r=200, b=90),
legend=dict(
orientation="v",
yanchor="top",
y=1.0,
x=1.02,
xanchor="left",
),
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="black"),
)
fig.write_image(str(output_path))
fig.write_html(
str(output_path.with_suffix(".html")), include_plotlyjs="cdn", full_html=False
)
fig.show()
def _plot_speedup_vs_dims(
*,
ndims_to_test: list[int],
throughputs: dict[str, list[float]],
kinds: dict[str, str],
nobs: int,
output_dir: Path,
) -> None:
for kind in ["Linear", "Cubic"]:
fig = make_subplots(
rows=1,
cols=1,
horizontal_spacing=0.05,
)
baseline = throughputs[f"Scipy RegularGridInterpolator {kind}"]
baseline = np.array(baseline)
for idx, (name, values) in enumerate(throughputs.items()):
if not name.startswith("InterpN") or kinds.get(name) != kind or not values:
continue
if "Nearest" in name:
continue
if "Rectilinear" in name:
continue
vals = np.array(values)
min_len = min(len(vals), len(baseline))
if min_len == 0:
continue
x_vals = ndims_to_test[:min_len]
speedup = vals[:min_len] / baseline[:min_len]
fig.add_trace(
go.Scatter(
x=x_vals,
y=speedup,
mode="lines",
line=dict(color="black", width=3),
marker=dict(symbol="circle", size=8),
name=name,
showlegend=False,
),
row=1,
col=1,
)
ones = np.ones_like(speedup)
upper = np.maximum(speedup, ones)
fill_between(
fig,
x=np.array(x_vals),
upper=upper,
lower=ones,
row=1,
col=1,
)
fig.add_hline(
y=1.0,
line=dict(color="gray", dash="dot"),
row=1,
col=1,
)
fig.update_xaxes(
title_text="Number of Dimensions",
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_yaxes(
title_text="Speedup vs. Scipy",
row=1,
col=1,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_layout(
title=dict(
text=f"InterpN Speedup vs. Scipy<br>{kind}, {nobs} Observation Point{'s' if nobs > 1 else ''}",
y=0.97,
yanchor="top",
),
height=450,
margin=dict(t=60, l=60, r=20, b=90),
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="black"),
)
output_path = output_dir / f"speedup_vs_dims_{nobs}_obs_{kind.lower()}.svg"
fig.write_image(str(output_path))
fig.write_html(
str(output_path.with_suffix(".html")),
include_plotlyjs="cdn",
full_html=False,
)
fig.show()
def _thread_counts() -> list[int]:
max_threads = os.cpu_count() or 1
max_threads = max(int(max_threads / 2), 1) counts = []
threads = 1
while threads < max_threads:
counts.append(threads)
threads *= 2
counts.append(max_threads)
return sorted(set(counts))
def _plot_speedup_vs_threads(
*,
thread_counts: list[int],
speedups: dict[str, dict[str, list[float]]],
nobs: int,
output_path: Path,
) -> None:
fig = make_subplots(rows=1, cols=1)
dash_styles = [
_normalized_line_style(i)
for i in range(len(["linear", "cubic", "nearest"]) * 2)
]
all_values = []
thread_arr = np.array(thread_counts)
series: list[tuple[str, np.ndarray]] = []
for grid_kind in ["regular", "rectilinear"]:
for method in ["linear", "cubic", "nearest"]:
values = speedups[grid_kind].get(method)
if not values:
continue
values_arr = np.array(values, dtype=float)
all_values.append(values_arr)
series.append((f"{method.title()} {grid_kind}", values_arr))
for _, values_arr in series:
ones = np.ones_like(values_arr)
fill_between(
fig,
x=thread_arr,
upper=np.maximum(values_arr, ones),
lower=np.minimum(values_arr, ones),
row=1,
col=1,
fillcolor="rgba(139, 196, 59, 0.25)",
)
for idx, (label, values_arr) in enumerate(series):
fig.add_trace(
go.Scatter(
x=thread_arr,
y=values_arr,
mode="lines+markers",
name=label,
line=dict(
color="black",
width=2,
dash=dash_styles[idx],
),
marker=dict(size=7, color="black"),
showlegend=False,
),
row=1,
col=1,
)
fig.add_hline(
y=1.0,
line=dict(color="black", dash="dot", width=1),
row=1,
col=1,
)
y_min = 1.0
if all_values:
y_min = min(1.0, min(values.min() for values in all_values))
y_max = max(thread_counts) if thread_counts else 1
fig.update_xaxes(
title_text="Threads",
row=1,
col=1,
range=[min(thread_counts), max(thread_counts)] if thread_counts else None,
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_yaxes(
title_text="Speedup vs. 1 Thread",
row=1,
col=1,
range=[y_min, y_max],
showline=True,
linecolor="black",
linewidth=1,
mirror=True,
ticks="outside",
tickcolor="black",
showgrid=False,
zeroline=False,
)
fig.update_layout(
title=dict(
text=f"InterpN Thread Speedup ({nobs} Observation Points)",
y=0.98,
yanchor="top",
),
height=430,
margin=dict(t=70, l=60, r=40, b=80),
showlegend=False,
plot_bgcolor="rgba(0,0,0,0)",
paper_bgcolor="rgba(0,0,0,0)",
font=dict(color="black"),
)
fig.write_image(str(output_path))
fig.write_html(
str(output_path.with_suffix(".html")),
include_plotlyjs="cdn",
full_html=False,
)
fig.show()
def bench_thread_speedup_vs_threads():
nobs = 10_000_000
ndims = 2
ngrid = int(10e6**0.5)
rng = np.random.default_rng(17)
thread_counts = _thread_counts()
speedups: dict[str, dict[str, list[float]]] = {
"regular": {},
"rectilinear": {},
}
def make_grids(kind: str) -> list[NDArray]:
base = np.linspace(-1.0, 1.0, ngrid)
if kind == "regular":
return [base for _ in range(ndims)]
warped = base**3
return [warped for _ in range(ndims)]
for grid_kind in ["regular", "rectilinear"]:
grids = make_grids(grid_kind)
mesh = np.meshgrid(*grids, indexing="ij")
vals = (mesh[0] + 2.0 * mesh[1]).astype(np.float64)
vals_flat = vals.ravel()
obs = []
for grid in grids:
lo = grid[0]
hi = grid[-1]
span = hi - lo
obs.append(
rng.uniform(lo + 0.05 * span, hi - 0.05 * span, size=nobs).astype(
np.float64
)
)
out = np.zeros_like(obs[0])
for method in ["linear", "cubic", "nearest"]:
timings = []
for threads in thread_counts:
timed = average_call_time(
lambda points, threads=threads: interpn_fn(
obs=points,
grids=grids,
vals=vals_flat,
method=method,
out=out,
linearize_extrapolation=False,
grid_kind=grid_kind,
max_threads=threads,
),
obs,
)
timings.append(timed)
baseline = timings[0]
speedups[grid_kind][method] = [baseline / t if t else 0.0 for t in timings]
output_path = Path(__file__).parent / f"../docs/speedup_vs_threads_{nobs}_obs.svg"
_plot_speedup_vs_threads(
thread_counts=thread_counts,
speedups=speedups,
nobs=nobs,
output_path=output_path,
)
def bench_4_dims_1_obs():
nbench = 30 preallocate = False ndims = 4 ngrid = 20 nobs = 1 m = max(int(float(nobs) ** (1.0 / ndims) + 2), 2)
grids = [np.linspace(-1.0, 1.0, ngrid) for _ in range(ndims)]
xgrid = np.meshgrid(*grids, indexing="ij")
zgrid = np.random.uniform(-1.0, 1.0, xgrid[0].size)
dims = [x.size for x in grids]
starts = np.array([x[0] for x in grids])
steps = np.array([x[1] - x[0] for x in grids])
obsgrid = np.meshgrid(
*[np.linspace(-0.99, 0.99, m) for _ in range(ndims)], indexing="ij"
)
obsgrid = [x.flatten()[0:nobs] for x in obsgrid]
rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None
)
cubic_rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None, method="cubic"
)
rectilinear_interpn = MultilinearRectilinear.new(grids, zgrid)
regular_interpn = MultilinearRegular.new(dims, starts, steps, zgrid)
cubic_regular_interpn = MulticubicRegular.new(
dims, starts, steps, zgrid, linearize_extrapolation=True
)
cubic_rectilinear_interpn = MulticubicRectilinear.new(
grids, zgrid, linearize_extrapolation=True
)
nearest_regular_interpn = NearestRegular.new(dims, starts, steps, zgrid)
nearest_rectilinear_interpn = NearestRectilinear.new(grids, zgrid)
out = None if not preallocate else np.zeros_like(obsgrid[0].flatten())
interps = {
"Scipy RegularGridInterpolator Linear": rectilinear_sp,
"Scipy RegularGridInterpolator Cubic": cubic_rectilinear_sp,
"InterpN MultilinearRegular": lambda p: regular_interpn.eval(p, out),
"InterpN MultilinearRectilinear": lambda p: rectilinear_interpn.eval(p, out),
"InterpN MulticubicRegular": lambda p: cubic_regular_interpn.eval(p, out),
"InterpN MulticubicRectilinear": lambda p: cubic_rectilinear_interpn.eval(
p, out
),
"InterpN NearestRegular": lambda p: nearest_regular_interpn.eval(p, out),
"InterpN NearestRectilinear": lambda p: nearest_rectilinear_interpn.eval(
p, out
),
"numpy interp": lambda p: np.interp(p[0], grids[0], zgrid), }
points_interpn = [x.flatten() for x in obsgrid]
points_sp = np.array(points_interpn).T
points = {
"Scipy RegularGridInterpolator Linear": points_sp,
"Scipy RegularGridInterpolator Cubic": points_sp,
"InterpN MultilinearRegular": points_interpn,
"InterpN MultilinearRectilinear": points_interpn,
"InterpN MulticubicRegular": points_interpn,
"InterpN MulticubicRectilinear": points_interpn,
"InterpN NearestRegular": points_interpn,
"InterpN NearestRectilinear": points_interpn,
"numpy interp": points_interpn,
}
print("\nInterpolation in sequential order")
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
if name == "numpy interp" and ndims > 1:
continue
p = points[name]
timeit(lambda: func(p), number=nbench) t = timeit(lambda: func(p), number=nbench) / nbench
throughput = nobs / t
print(f"\n---- {ndims} Dims")
print(f"Method: {name}")
print(f"Time {t:.2e} s")
print(f"Throughput {throughput:.2e} #/s")
points_interpn1 = [np.random.permutation(x.flatten()) for x in obsgrid]
points_sp1 = np.array(points_interpn1).T
points1 = {
"Scipy RegularGridInterpolator Linear": points_sp1,
"Scipy RegularGridInterpolator Cubic": points_sp1,
"InterpN MultilinearRegular": points_interpn1,
"InterpN MultilinearRectilinear": points_interpn1,
"InterpN MulticubicRegular": points_interpn1,
"InterpN MulticubicRectilinear": points_interpn1,
"InterpN NearestRegular": points_interpn1,
"InterpN NearestRectilinear": points_interpn1,
"numpy interp": points_interpn1,
}
print("\nInterpolation in random order")
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
if name == "numpy interp" and ndims > 1:
continue
p = points1[name]
timeit(lambda: func(p), number=nbench) t = timeit(lambda: func(p), number=nbench) / nbench
throughput = nobs / t
print(f"\n---- {ndims} Dims")
print(f"Method: {name}")
print(f"Time {t:.2e} s")
print(f"Throughput {throughput:.2e} #/s")
points_interpn2 = [np.random.permutation(x.flatten()) + 3.0 for x in obsgrid]
points_sp2 = np.array(points_interpn2).T
points2 = {
"Scipy RegularGridInterpolator Linear": points_sp2,
"Scipy RegularGridInterpolator Cubic": points_sp2,
"InterpN MultilinearRegular": points_interpn2,
"InterpN MultilinearRectilinear": points_interpn2,
"InterpN MulticubicRegular": points_interpn2,
"InterpN MulticubicRectilinear": points_interpn2,
"InterpN NearestRegular": points_interpn2,
"InterpN NearestRectilinear": points_interpn2,
"numpy interp": points_interpn2,
}
print("\nExtrapolation to corner region in random order")
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
if name == "numpy interp" and ndims > 1:
continue
p = points2[name]
timeit(lambda: func(p), number=nbench) t = timeit(lambda: func(p), number=nbench) / nbench
throughput = nobs / t
print(f"\n---- {ndims} Dims")
print(f"Method: {name}")
print(f"Time {t:.2e} s")
print(f"Throughput {throughput:.2e} #/s")
points_interpn3 = [
np.random.permutation(x.flatten()) + (3.0 if i == 0 else 0.0)
for i, x in enumerate(obsgrid)
]
points_sp3 = np.array(points_interpn).T
points3 = {
"Scipy RegularGridInterpolator Linear": points_sp3,
"Scipy RegularGridInterpolator Cubic": points_sp3,
"InterpN MultilinearRegular": points_interpn3,
"InterpN MultilinearRectilinear": points_interpn3,
"InterpN MulticubicRegular": points_interpn3,
"InterpN MulticubicRectilinear": points_interpn3,
"InterpN NearestRegular": points_interpn3,
"InterpN NearestRectilinear": points_interpn3,
"numpy interp": points_interpn3,
}
print("\nExtrapolation to side region in random order")
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
if name == "numpy interp" and ndims > 1:
continue
p = points3[name]
t = timeit(lambda: func(p), number=nbench) / nbench
throughput = nobs / t
print(f"\n---- {ndims} Dims")
print(f"Method: {name}")
print(f"Time {t:.2e} s")
print(f"Throughput {throughput:.2e} #/s")
def bench_3_dims_n_obs_unordered():
for preallocate in [True, False]:
ndims = 3 ngrid = 20
grids = [np.linspace(-1.0, 1.0, ngrid) for _ in range(ndims)]
xgrid = np.meshgrid(*grids, indexing="ij")
zgrid = np.random.uniform(-1.0, 1.0, xgrid[0].size)
dims = [x.size for x in grids]
starts = np.array([x[0] for x in grids])
steps = np.array([x[1] - x[0] for x in grids])
rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None
)
cubic_rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None, method="cubic"
)
rectilinear_interpn = MultilinearRectilinear.new(grids, zgrid)
regular_interpn = MultilinearRegular.new(dims, starts, steps, zgrid)
cubic_regular_interpn = MulticubicRegular.new(
dims, starts, steps, zgrid, linearize_extrapolation=True
)
cubic_rectilinear_interpn = MulticubicRectilinear.new(
grids, zgrid, linearize_extrapolation=True
)
nearest_regular_interpn = NearestRegular.new(dims, starts, steps, zgrid)
nearest_rectilinear_interpn = NearestRectilinear.new(grids, zgrid)
throughputs = {
"Scipy RegularGridInterpolator Linear": [],
"Scipy RegularGridInterpolator Cubic": [],
"InterpN MultilinearRegular": [],
"InterpN MultilinearRectilinear": [],
"InterpN MulticubicRegular": [],
"InterpN MulticubicRectilinear": [],
"InterpN NearestRegular": [],
"InterpN NearestRectilinear": [],
}
ns = [1, 10, 50, 100, 500, 1000, 10000]
print("\nThroughput plotting")
print(ns)
for nobs in ns:
print(nobs)
m = max(int(float(nobs) ** (1.0 / ndims) + 2), 2)
obsgrid = np.meshgrid(
*[np.linspace(-0.99, 0.99, m) for _ in range(ndims)], indexing="ij"
)
obsgrid = [
x.flatten()[0:nobs] for x in obsgrid
]
out = None if not preallocate else np.zeros_like(obsgrid[0].flatten())
interps = {
"Scipy RegularGridInterpolator Linear": rectilinear_sp,
"Scipy RegularGridInterpolator Cubic": cubic_rectilinear_sp,
"InterpN MultilinearRegular": lambda p: regular_interpn.eval(p, out),
"InterpN MultilinearRectilinear": lambda p: rectilinear_interpn.eval(
p, out
),
"InterpN MulticubicRegular": lambda p: cubic_regular_interpn.eval(
p, out
),
"InterpN MulticubicRectilinear": lambda p: cubic_rectilinear_interpn.eval(
p, out
),
"InterpN NearestRegular": lambda p: nearest_regular_interpn.eval(
p, out
),
"InterpN NearestRectilinear": lambda p: nearest_rectilinear_interpn.eval(
p, out
),
}
points_interpn = [np.random.permutation(x.flatten()) for x in obsgrid]
points_sp = np.array(points_interpn).T
points = {
"Scipy RegularGridInterpolator Linear": points_sp,
"Scipy RegularGridInterpolator Cubic": points_sp,
"InterpN MultilinearRegular": points_interpn,
"InterpN MultilinearRectilinear": points_interpn,
"InterpN MulticubicRegular": points_interpn,
"InterpN MulticubicRectilinear": points_interpn,
"InterpN NearestRegular": points_interpn,
"InterpN NearestRectilinear": points_interpn,
}
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
if "cubic" in name.lower() and nobs > 10000:
continue
p = points[name]
avg_time = average_call_time(func, p)
throughput = nobs / avg_time
throughputs[name].append(throughput)
kinds = {
"Scipy RegularGridInterpolator Linear": "Linear",
"Scipy RegularGridInterpolator Cubic": "Cubic",
"InterpN MultilinearRegular": "Linear",
"InterpN MultilinearRectilinear": "Linear",
"InterpN MulticubicRegular": "Cubic",
"InterpN MulticubicRectilinear": "Cubic",
"InterpN NearestRegular": "Linear",
"InterpN NearestRectilinear": "Linear",
}
with_alloc_string = "_prealloc" if preallocate else ""
suffix = (
"With Preallocated Output" if preallocate else "Without Preallocated Output"
)
figure_title = f"Interpolation on 20x20x20 Grid<br>{suffix}"
output_path = (
Path(__file__).parent
/ f"../docs/3d_throughput_vs_nobs{with_alloc_string}.svg"
)
_plot_normalized_vs_nobs(
ns=ns,
throughputs=throughputs,
kinds=kinds,
title=figure_title,
output_path=output_path,
)
def bench_4_dims_n_obs_unordered():
for preallocate in [True, False]:
ndims = 4 ngrid = 20
grids = [np.linspace(-1.0, 1.0, ngrid) for _ in range(ndims)]
xgrid = np.meshgrid(*grids, indexing="ij")
zgrid = np.random.uniform(-1.0, 1.0, xgrid[0].size)
dims = [x.size for x in grids]
starts = np.array([x[0] for x in grids])
steps = np.array([x[1] - x[0] for x in grids])
rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None
)
cubic_rectilinear_sp = RegularGridInterpolator(
grids, zgrid.reshape(xgrid[0].shape), bounds_error=None, method="cubic"
)
rectilinear_interpn = MultilinearRectilinear.new(grids, zgrid)
regular_interpn = MultilinearRegular.new(dims, starts, steps, zgrid)
cubic_regular_interpn = MulticubicRegular.new(
dims, starts, steps, zgrid, linearize_extrapolation=True
)
cubic_rectilinear_interpn = MulticubicRectilinear.new(
grids, zgrid, linearize_extrapolation=True
)
nearest_regular_interpn = NearestRegular.new(dims, starts, steps, zgrid)
nearest_rectilinear_interpn = NearestRectilinear.new(grids, zgrid)
throughputs = {
"Scipy RegularGridInterpolator Linear": [],
"Scipy RegularGridInterpolator Cubic": [],
"InterpN MultilinearRegular": [],
"InterpN MultilinearRectilinear": [],
"InterpN MulticubicRegular": [],
"InterpN MulticubicRectilinear": [],
"InterpN NearestRegular": [],
"InterpN NearestRectilinear": [],
}
ns = [1, 10, 50, 100, 500, 1000, 10000]
print("\nThroughput plotting")
print(ns)
for nobs in ns:
print(nobs)
m = max(int(float(nobs) ** (1.0 / ndims) + 2), 2)
obsgrid = np.meshgrid(
*[np.linspace(-0.99, 0.99, m) for _ in range(ndims)], indexing="ij"
)
obsgrid = [
x.flatten()[0:nobs] for x in obsgrid
]
out = None if not preallocate else np.zeros_like(obsgrid[0].flatten())
interps = {
"Scipy RegularGridInterpolator Linear": rectilinear_sp,
"Scipy RegularGridInterpolator Cubic": cubic_rectilinear_sp,
"InterpN MultilinearRegular": lambda p: regular_interpn.eval(p, out),
"InterpN MultilinearRectilinear": lambda p: rectilinear_interpn.eval(
p, out
),
"InterpN MulticubicRegular": lambda p: cubic_regular_interpn.eval(
p, out
),
"InterpN MulticubicRectilinear": lambda p: cubic_rectilinear_interpn.eval(
p, out
),
"InterpN NearestRegular": lambda p: nearest_regular_interpn.eval(
p, out
),
"InterpN NearestRectilinear": lambda p: nearest_rectilinear_interpn.eval(
p, out
),
}
points_interpn = [np.random.permutation(x.flatten()) for x in obsgrid]
points_sp = np.array(points_interpn).T
points = {
"Scipy RegularGridInterpolator Linear": points_sp,
"Scipy RegularGridInterpolator Cubic": points_sp,
"InterpN MultilinearRegular": points_interpn,
"InterpN MultilinearRectilinear": points_interpn,
"InterpN MulticubicRegular": points_interpn,
"InterpN MulticubicRectilinear": points_interpn,
"InterpN NearestRegular": points_interpn,
"InterpN NearestRectilinear": points_interpn,
}
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
p = points[name]
avg_time = average_call_time(func, p)
throughput = nobs / avg_time
throughputs[name].append(throughput)
kinds = {
"Scipy RegularGridInterpolator Linear": "Linear",
"Scipy RegularGridInterpolator Cubic": "Cubic",
"InterpN MultilinearRegular": "Linear",
"InterpN MultilinearRectilinear": "Linear",
"InterpN MulticubicRegular": "Cubic",
"InterpN MulticubicRectilinear": "Cubic",
"InterpN NearestRegular": "Linear",
"InterpN NearestRectilinear": "Linear",
}
with_alloc_string = "_prealloc" if preallocate else ""
suffix = (
"With Preallocated Output" if preallocate else "Without Preallocated Output"
)
figure_title = f"Interpolation on 20x...x20 4D Grid<br>{suffix}"
output_path = (
Path(__file__).parent
/ f"../docs/4d_throughput_vs_nobs{with_alloc_string}.svg"
)
_plot_normalized_vs_nobs(
ns=ns,
throughputs=throughputs,
kinds=kinds,
title=figure_title,
output_path=output_path,
)
def bench_throughput_vs_dims():
for nobs, nbench in [(1, 10000), (1000, 100)]:
throughputs = {
"Scipy RegularGridInterpolator Linear": [],
"Scipy RegularGridInterpolator Cubic": [],
"InterpN MultilinearRegular": [],
"InterpN MultilinearRectilinear": [],
"InterpN MulticubicRegular": [],
"InterpN MulticubicRectilinear": [],
"InterpN NearestRegular": [],
"InterpN NearestRectilinear": [],
"Scipy RectBivariateSpline Cubic": [], "Numpy Interp": [],
}
ndims_to_test = [x for x in range(1, 7)]
for ndims in ndims_to_test:
ngrid = 4
grids = [np.linspace(-1.0, 1.0, ngrid) for _ in range(ndims)]
xgrid = np.meshgrid(*grids, indexing="ij")
zgrid = np.random.uniform(-1.0, 1.0, xgrid[0].size)
z = zgrid.reshape(xgrid[0].shape)
dims = [x.size for x in grids]
starts = np.array([x[0] for x in grids])
steps = np.array([x[1] - x[0] for x in grids])
rectilinear_sp = RegularGridInterpolator(grids, z.copy(), bounds_error=None)
cubic_rectilinear_sp = RegularGridInterpolator(
grids, z.copy(), bounds_error=None, method="cubic"
)
rectilinear_interpn = MultilinearRectilinear.new(grids, zgrid)
regular_interpn = MultilinearRegular.new(dims, starts, steps, zgrid)
cubic_regular_interpn = MulticubicRegular.new(
dims, starts, steps, zgrid, linearize_extrapolation=True
)
cubic_rectilinear_interpn = MulticubicRectilinear.new(
grids, zgrid, linearize_extrapolation=True
)
nearest_regular_interpn = NearestRegular.new(dims, starts, steps, zgrid)
nearest_rectilinear_interpn = NearestRectilinear.new(grids, zgrid)
m = max(int(float(nobs) ** (1.0 / ndims) + 2), 2)
obsgrid = np.meshgrid(
*[np.linspace(-0.99, 0.99, m) for _ in range(ndims)], indexing="ij"
)
obsgrid = [
x.flatten()[0:nobs] for x in obsgrid
]
out = np.zeros((nobs,))
interps = {
"Scipy RegularGridInterpolator Linear": rectilinear_sp,
"Scipy RegularGridInterpolator Cubic": cubic_rectilinear_sp,
"InterpN MultilinearRegular": lambda p,
interp=regular_interpn: interp.eval(p, out),
"InterpN MultilinearRectilinear": (
lambda p, interp=rectilinear_interpn: interp.eval(p, out)
),
"InterpN MulticubicRegular": lambda p,
interp=cubic_regular_interpn: interp.eval(p, out),
"InterpN MulticubicRectilinear": (
lambda p, interp=cubic_rectilinear_interpn: interp.eval(p, out)
),
"InterpN NearestRegular": lambda p,
interp=nearest_regular_interpn: interp.eval(p, out),
"InterpN NearestRectilinear": (
lambda p, interp=nearest_rectilinear_interpn: interp.eval(p, out)
),
}
if ndims == 1:
interps["Numpy Interp"] = lambda p: np.interp(p[0], grids[0], zgrid)
if ndims == 2:
cubic_rbs_sp = RectBivariateSpline(
grids[0], grids[1], z.copy(), kx=3, ky=3, s=0
)
interps["Scipy RectBivariateSpline Cubic"] = lambda p: cubic_rbs_sp(
*p, grid=False
)
points_interpn = [np.random.permutation(x.flatten()) for x in obsgrid]
points_sp = np.ascontiguousarray(np.array(points_interpn).T)
points = {
"Scipy RegularGridInterpolator Linear": points_sp,
"Scipy RegularGridInterpolator Cubic": points_sp,
"Scipy RectBivariateSpline Cubic": points_interpn,
"InterpN MultilinearRegular": points_interpn,
"InterpN MultilinearRectilinear": points_interpn,
"InterpN MulticubicRegular": points_interpn,
"InterpN MulticubicRectilinear": points_interpn,
"InterpN NearestRegular": points_interpn,
"InterpN NearestRectilinear": points_interpn,
"Numpy Interp": points_interpn,
}
for name, func in interps.items():
if RUN_INTERPN_ONLY and not name.startswith("InterpN "):
continue
print(ndims, name)
p = points[name]
timeit(
lambda: func(p), setup=gc.collect, number=int(nbench / 4)
) t = timeit(lambda: func(p), setup=gc.collect, number=nbench) / nbench
throughput = nobs / t
throughputs[name].append(throughput)
kinds = {
"Scipy RegularGridInterpolator Linear": "Linear",
"Scipy RegularGridInterpolator Cubic": "Cubic",
"Scipy RectBivariateSpline Cubic": "Cubic",
"InterpN MultilinearRegular": "Linear",
"InterpN MultilinearRectilinear": "Linear",
"InterpN MulticubicRegular": "Cubic",
"InterpN MulticubicRectilinear": "Cubic",
"InterpN NearestRegular": "Linear",
"InterpN NearestRectilinear": "Linear",
"Numpy Interp": "Linear",
}
output_path = (
Path(__file__).parent / f"../docs/throughput_vs_dims_{nobs}_obs.svg"
)
_plot_throughput_vs_dims(
ndims_to_test=ndims_to_test,
throughputs=throughputs,
kinds=kinds,
nobs=nobs,
output_path=output_path,
)
_plot_speedup_vs_dims(
ndims_to_test=ndims_to_test,
throughputs=throughputs,
kinds=kinds,
nobs=nobs,
output_dir=Path(__file__).parent.parent / "docs",
)
def main():
bench_throughput_vs_dims()
bench_thread_speedup_vs_threads()
bench_4_dims_1_obs()
bench_4_dims_n_obs_unordered()
bench_3_dims_n_obs_unordered()
if __name__ == "__main__":
main()