import pygraphsp as gs
def train_model(inputs):
    """Simulate one training run and report its metrics.

    No real training happens: accuracy is the closed-form expression
    ``0.5 + 10 * lr - 20 * lr ** 2`` clamped to ``[0.0, 0.95]``, and loss
    is its complement.

    Args:
        inputs: Mapping-like object exposing ``.get``. Only
            ``"learning_rate"`` is read (default ``0.01``); the simulation
            does not depend on any other key.

    Returns:
        Dict with ``"accuracy"``, ``"loss"`` (``1.0 - accuracy``) and the
        ``"learning_rate"`` that was used.
    """
    learning_rate = inputs.get("learning_rate", 0.01)
    raw_accuracy = 0.5 + (learning_rate * 10) - (learning_rate ** 2) * 20
    # The quadratic drops below zero for learning rates above roughly 0.55;
    # clamp so accuracy never goes negative and loss never exceeds 1.0.
    accuracy = max(0.0, min(0.95, raw_accuracy))
    loss = 1.0 - accuracy
    print(f"Training with LR={learning_rate:.4f}: accuracy={accuracy:.3f}, loss={loss:.3f}")
    return {
        "accuracy": accuracy,
        "loss": loss,
        "learning_rate": learning_rate,
    }
def prepare_data(inputs):
    """Stand-in data preparation step.

    Announces itself on stdout and emits a fixed ``"data"`` value of 100.
    ``inputs`` is accepted but never read.
    """
    print("Preparing dataset...")
    prepared = dict(data=100)
    return prepared
def select_best(inputs):
    """Pick the learning rate whose run achieved the highest accuracy.

    Args:
        inputs: Mapping-like object exposing ``.get``. Reads
            ``"accuracies"`` and ``"learning_rates"``, which are expected
            to be index-aligned sequences; both default to empty.

    Returns:
        Dict with ``"best_lr"`` and ``"best_accuracy"``. Both are ``0.0``
        when no accuracies were supplied. ``"best_lr"`` alone is ``0.0``
        when there is no learning rate at the winning index.
    """
    accuracies = inputs.get("accuracies") or []
    lrs = inputs.get("learning_rates") or []
    if not accuracies:
        return {"best_lr": 0.0, "best_accuracy": 0.0}

    # max() returns the first of several equal maxima, so ties resolve to
    # the earliest run, the same as list.index(max(...)) would.
    best_idx = max(range(len(accuracies)), key=lambda i: accuracies[i])
    best_accuracy = accuracies[best_idx]
    # learning_rates may be missing or shorter than accuracies; fall back to
    # the same 0.0 placeholder used for the empty case instead of raising
    # IndexError.
    best_lr = lrs[best_idx] if best_idx < len(lrs) else 0.0

    print(f"\nBest configuration: LR={best_lr:.4f} with accuracy={best_accuracy:.3f}")
    return {
        "best_lr": best_lr,
        "best_accuracy": best_accuracy,
    }
def main():
    """Run the learning-rate sweep demo end to end.

    Builds a graph in which a data-preparation node feeds a training node,
    fans the graph out into five learning-rate variants, merges the
    variants' output ports into a selection node, then executes the graph
    and prints either the selected result or the per-node errors.
    """
    print("=== Variants Demo: Hyperparameter Sweep ===\n")

    graph = gs.Graph()
    graph.add(prepare_data, label="Data Preparation", outputs=["data"])
    graph.add(
        train_model,
        label="Train Model",
        inputs=["data", "learning_rate"],
        outputs=["accuracy", "loss", "learning_rate"],
    )
    graph.add_edge("prepare_data", "data", "train_model", "data")

    print("Creating variants with different learning rates...")

    def learning_rate_generator(i):
        """Return the learning rate for variant index ``i`` as PortData."""
        candidates = (0.001, 0.005, 0.01, 0.05, 0.1)
        return gs.PortData(candidates[i])

    variant_branches = graph.create_variants(
        name_prefix="lr_sweep",
        count=5,
        param_name="learning_rate",
        variant_function=learning_rate_generator,
        parallel=True,
    )
    print(f"Created {len(variant_branches)} variant branches")
    for branch in variant_branches:
        print(f" - {branch}")

    graph.add(
        select_best,
        label="Select Best",
        inputs=["accuracies", "losses", "learning_rates"],
        outputs=["best_lr", "best_accuracy"],
    )
    # NOTE(review): the ports merged below use singular names while the
    # selection node declares plural input names - confirm how
    # merge_branches maps merged ports onto the target node's inputs.
    for merged_port in ("accuracy", "loss", "learning_rate"):
        graph.merge_branches(
            node_id="select_best",
            branches=variant_branches,
            port=merged_port,
        )

    print("\n=== Graph Structure ===")
    print(gs.Inspector.visualize(graph))

    print("\n=== Executing Graph ===\n")
    runner = gs.Executor()
    result = runner.execute(graph)

    if not result.is_success():
        print("\n✗ Execution failed")
        for node_id, error in result.errors.items():
            print(f" - {node_id}: {error}")
        return

    print("\n✓ Execution successful!")
    best_lr = result.get_output("select_best", "best_lr")
    best_acc = result.get_output("select_best", "best_accuracy")
    print(f"\nFinal Result: Best LR = {best_lr}, Best Accuracy = {best_acc}")
def demo_custom_merge():
    """Demonstrate merging variant outputs with a custom merge function.

    Creates five batch-size variants of a simulated processing node,
    merges their ``throughput`` and ``memory`` ports into a summary node
    using an averaging merge function, executes the graph, and reports
    success or the per-node errors.
    """
    print("\n\n=== Custom Merge Demo ===\n")

    def process_data(inputs):
        """Simulate processing at one batch size; both metrics scale linearly."""
        batch_size = inputs.get("batch_size", 32)
        throughput = batch_size * 100
        memory = batch_size * 2
        print(f"Batch size {batch_size}: throughput={throughput}, memory={memory}MB")
        return {
            "throughput": throughput,
            "memory": memory,
            "batch_size": batch_size,
        }

    def compute_average(values):
        """Merge function: mean of the numeric entries in ``values``.

        Returns ``PortData(0.0)`` when ``values`` is empty or holds no
        ``int``/``float`` entries.
        """
        if not values:
            return gs.PortData(0.0)
        numeric = [v for v in values if isinstance(v, (int, float))]
        if not numeric:
            return gs.PortData(0.0)
        # Divide by the number of entries actually summed; dividing by
        # len(values) would dilute the mean whenever entries were skipped.
        return gs.PortData(sum(numeric) / len(numeric))

    graph = gs.Graph()
    graph.add(
        process_data,
        label="Process Data",
        inputs=["batch_size"],
        outputs=["throughput", "memory", "batch_size"],
    )

    def batch_size_generator(i):
        """Return the batch size for variant index ``i`` as PortData."""
        sizes = [16, 32, 64, 128, 256]
        return gs.PortData(sizes[i])

    variant_branches = graph.create_variants(
        name_prefix="batch_sweep",
        count=5,
        param_name="batch_size",
        variant_function=batch_size_generator,
    )

    def summarize(inputs):
        """Print the averaged throughput and memory; produces no outputs."""
        avg_throughput = inputs.get("avg_throughput", 0)
        avg_memory = inputs.get("avg_memory", 0)
        print("\nAverage across all batch sizes:")
        print(f" Throughput: {avg_throughput:.0f}")
        print(f" Memory: {avg_memory:.0f} MB")
        return {}

    graph.add(
        summarize,
        label="Summarize",
        inputs=["avg_throughput", "avg_memory"],
    )
    # NOTE(review): the merged ports are named "throughput"/"memory" while
    # summarize reads "avg_throughput"/"avg_memory" - confirm how
    # merge_branches maps merged ports onto the target node's inputs.
    graph.merge_branches(
        node_id="summarize",
        branches=variant_branches,
        port="throughput",
        merge_function=compute_average,
    )
    graph.merge_branches(
        node_id="summarize",
        branches=variant_branches,
        port="memory",
        merge_function=compute_average,
    )

    print("Executing custom merge demo...\n")
    executor = gs.Executor()
    result = executor.execute(graph)
    if result.is_success():
        print("\n✓ Custom merge demo successful!")
    else:
        print("\n✗ Custom merge demo failed")
        # List the per-node errors, as the sweep demo in main() does, so a
        # failure can be diagnosed.
        for node_id, error in result.errors.items():
            print(f" - {node_id}: {error}")
def demo_nested_variants():
    """Demonstrate nesting one variant sweep inside another.

    Creates three learning-rate variants of an experiment node, then
    creates four batch-size variants on each resulting branch, and prints
    the branch counts. The graph is built but never executed here.
    """
    print("\n\n=== Nested Variants Demo ===\n")

    def experiment(inputs):
        """Compute a synthetic score from learning rate and batch size."""
        lr = inputs.get("learning_rate", 0.01)
        batch = inputs.get("batch_size", 32)
        score = lr * 100 + batch * 0.1
        print(f"LR={lr:.4f}, Batch={batch}: score={score:.2f}")
        return {"score": score}

    def lr_gen(i):
        """Return the learning rate for outer variant index ``i``."""
        rates = (0.001, 0.01, 0.1)
        return gs.PortData(rates[i])

    def batch_gen(i):
        """Return the batch size for inner variant index ``i``."""
        sizes = (16, 32, 64, 128)
        return gs.PortData(sizes[i])

    graph = gs.Graph()
    graph.add(
        experiment,
        label="Run Experiment",
        inputs=["learning_rate", "batch_size"],
        outputs=["score"],
    )

    lr_branches = graph.create_variants(
        name_prefix="lr",
        count=3,
        param_name="learning_rate",
        variant_function=lr_gen,
    )
    print(f"Created {len(lr_branches)} learning rate variants")

    # Fan each learning-rate branch out again over the batch sizes.
    for branch_name in lr_branches:
        outer_branch = graph.get_branch(branch_name)
        batch_branches = outer_branch.create_variants(
            name_prefix="batch",
            count=4,
            param_name="batch_size",
            variant_function=batch_gen,
        )
        print(f" - {branch_name}: created {len(batch_branches)} batch size variants")

    branch_total = len(graph.branch_names())
    print(f"\nTotal branches: {branch_total}")
    print("This creates a 3×4 grid of experiments!")
    print("\n✓ Nested variants demo complete")
    print("Note: Execute would run all 12 combinations in parallel")
if __name__ == "__main__":
    # Script entry point: run the learning-rate sweep demo first, then the
    # custom-merge demo.
    main()
    demo_custom_merge()