import time
import numpy as np
import traj_dist_rs
from joblib import Parallel, delayed
print("=" * 60)
print("Parallel Processing Examples")
print("=" * 60)
num_trajectories = 50
trajectory_length = 1000
print(
f"\nCreating {num_trajectories} trajectories with {trajectory_length} points each..."
)
trajectories = [
np.random.rand(trajectory_length, 2) * 10 for _ in range(num_trajectories)
]
print("\n1. Sequential vs Parallel Performance Comparison")
print("-" * 60)
def compute_dtw_sequential(trajs):
results = []
for i, traj1 in enumerate(trajs):
for j, traj2 in enumerate(trajs):
if i < j: result = traj_dist_rs.dtw(
traj1, traj2, "euclidean", use_full_matrix=False
)
results.append(result.distance)
return results
def compute_dtw_parallel(trajs):
def compute_pair(i, j):
result = traj_dist_rs.dtw(
trajs[i], trajs[j], "euclidean", use_full_matrix=False
)
return result.distance
pairs = [(i, j) for i in range(len(trajs)) for j in range(i + 1, len(trajs))]
results = Parallel(n_jobs=-1)(delayed(compute_pair)(i, j) for i, j in pairs)
return results
print(" Running sequential computation...")
start_time = time.time()
sequential_results = compute_dtw_sequential(trajectories[:10]) sequential_time = time.time() - start_time
print(f" Sequential time: {sequential_time:.3f}s")
print(" Running parallel computation...")
start_time = time.time()
parallel_results = compute_dtw_parallel(trajectories[:10]) parallel_time = time.time() - start_time
print(f" Parallel time: {parallel_time:.3f}s")
print(f" Speedup: {sequential_time / parallel_time:.2f}x")
print(f" Results match: {np.allclose(sequential_results, parallel_results)}")
print("\n2. Parallel Processing with Different Algorithms")
print("-" * 60)
def compute_distance_pairwise(algo_name, trajs, dist_type="euclidean", **kwargs):
def compute_pair(i, j):
if algo_name == "sspd":
return traj_dist_rs.sspd(trajs[i], trajs[j], dist_type)
elif algo_name == "dtw":
result = traj_dist_rs.dtw(
trajs[i], trajs[j], dist_type, use_full_matrix=False
)
return result.distance
elif algo_name == "hausdorff":
return traj_dist_rs.hausdorff(trajs[i], trajs[j], dist_type)
elif algo_name == "lcss":
eps = kwargs.get("epsilon", 0.1)
result = traj_dist_rs.lcss(
trajs[i], trajs[j], dist_type, eps, use_full_matrix=False
)
return result.distance
elif algo_name == "edr":
eps = kwargs.get("epsilon", 0.1)
result = traj_dist_rs.edr(
trajs[i], trajs[j], dist_type, eps, use_full_matrix=False
)
return result.distance
else:
raise ValueError(f"Unknown algorithm: {algo_name}")
pairs = [(i, j) for i in range(len(trajs)) for j in range(i + 1, len(trajs))]
results = Parallel(n_jobs=-1)(delayed(compute_pair)(i, j) for i, j in pairs)
return np.array(results)
test_trajs = trajectories[:5] algorithms = ["sspd", "dtw", "hausdorff", "lcss", "edr"]
for algo in algorithms:
start_time = time.time()
distances = compute_distance_pairwise(algo, test_trajs, epsilon=0.1)
elapsed = time.time() - start_time
print(
f" {algo:15s}: {len(distances)} distances, "
f"mean={distances.mean():.6f}, time={elapsed:.3f}s"
)
print("\n3. Parallel Batch Computation vs Sequential")
print("-" * 60)
metric = traj_dist_rs.Metric.sspd(type_d="euclidean")
start_time = time.time()
distances_seq = traj_dist_rs.pdist(trajectories[:20], metric)
seq_time = time.time() - start_time
print(f" Sequential pdist (20 trajectories): {seq_time:.3f}s")
start_time = time.time()
distances_par = traj_dist_rs.pdist(trajectories[:20], metric)
par_time = time.time() - start_time
print(f" Parallel pdist (20 trajectories): {par_time:.3f}s")
print(f" Speedup: {seq_time / par_time:.2f}x")
print(f" Results match: {np.allclose(distances_seq, distances_par)}")
print("\n4. Parallel Cross-Distance Computation")
print("-" * 60)
def compute_cross_distance(set_a, set_b, dist_type):
def compute_cell(i, j):
return traj_dist_rs.sspd(set_a[i], set_b[j], dist_type)
results = Parallel(n_jobs=-1)(
delayed(compute_cell)(i, j)
for i in range(len(set_a))
for j in range(len(set_b))
)
return np.array(results).reshape(len(set_a), len(set_b))
set_a = trajectories[:5]
set_b = trajectories[5:10]
dist_type = "euclidean"
start_time = time.time()
cross_matrix = compute_cross_distance(set_a, set_b, dist_type)
elapsed = time.time() - start_time
print(f" Cross-distance matrix shape: {cross_matrix.shape}")
print(f" Computation time: {elapsed:.3f}s")
print(" Matrix:")
print(cross_matrix)
print("\n5. Using DpResult with Joblib Parallel")
print("-" * 60)
def compute_dtw_with_matrix(i, j, trajs):
result = traj_dist_rs.dtw(trajs[i], trajs[j], "euclidean", use_full_matrix=True)
return result
test_trajs = trajectories[:5]
pairs = [(i, j) for i in range(len(test_trajs)) for j in range(i + 1, len(test_trajs))]
start_time = time.time()
results = Parallel(n_jobs=-1)(
delayed(compute_dtw_with_matrix)(i, j, test_trajs) for i, j in pairs
)
elapsed = time.time() - start_time
print(f" Computed {len(results)} DTW distances with matrices")
print(f" Time: {elapsed:.3f}s")
if results:
print(f" First result distance: {results[0].distance:.6f}")
print(f" First result matrix shape: {results[0].matrix.shape}")
print("\n" + "=" * 60)
print("All parallel processing examples completed successfully!")
print("=" * 60)