import pygraphsp as gs
def data_source():
    """Produce the seed payload for the port-mapping demo.

    Returns:
        dict: A single-entry mapping, ``{"data": 42}``.
    """
    return {"data": 42}
def process_data(input_value):
    """Log the incoming value and return it multiplied by two.

    Args:
        input_value: Any value that supports ``* 2``.

    Returns:
        dict: ``{"output_data": input_value * 2}``.
    """
    print(f"Processing: {input_value}")
    return {"output_data": input_value * 2}
def display_result(final_value):
    """Print the final value; this node produces no outputs.

    Args:
        final_value: The value to print.

    Returns:
        dict: Always an empty dict.
    """
    print(f"Final result: {final_value}")
    return {}
def main():
    """Run the port-mapping demo.

    Builds a three-node graph (``data_source`` -> ``process_data`` ->
    ``display_result``), prints its structure, executes it, and prints
    whether execution succeeded. On failure, each entry of
    ``result.errors`` is printed.
    """
    print("=== Port Mapping Demo ===\n")

    graph = gs.Graph()
    graph.add(
        data_source,
        label="Data Source",
        outputs=["data"],
    )
    # NOTE(review): the 2-tuples look like (port name, function parameter or
    # result key) pairs -- e.g. port "data" feeds the ``input_value``
    # parameter. Confirm against the pygraphsp documentation.
    graph.add(
        process_data,
        label="Processor",
        inputs=[("data", "input_value")],
        outputs=[("output_data", "output_data")],
    )
    graph.add(
        display_result,
        label="Display",
        inputs=[("output_data", "final_value")],
    )

    # Edges are wired with the port names declared in the add() calls above.
    graph.add_edge("data_source", "data", "process_data", "data")
    graph.add_edge("process_data", "output_data", "display_result", "output_data")

    print("Graph structure:")
    print(gs.Inspector.visualize(graph))
    print()

    print("Executing graph...")
    executor = gs.Executor()
    result = executor.execute(graph)

    if result.is_success():
        print("\n✓ Execution successful!")
    else:
        print("\n✗ Execution failed")
        for node_id, error in result.errors.items():
            print(f" - {node_id}: {error}")
def demo_simple_ports():
    """Run the simple-port demo: add two numbers, then multiply by ten.

    Two locally defined functions are added to a graph using plain string
    port names, connected with ``graph.auto_connect()``, fed the inputs 5
    and 3, and executed. On success the ``result`` output of the
    ``multiply`` node is printed; on failure the entries of
    ``result.errors`` are printed instead of failing silently.
    """
    print("\n=== Simple Port Demo ===\n")

    def add_numbers(a, b):
        """Return the sum of ``a`` and ``b`` under the key ``"sum"``."""
        return {"sum": a + b}

    # The parameter is named ``sum`` (shadowing the builtin) because the
    # node is registered with inputs=["sum"] below; renaming it would no
    # longer match that port list.
    def multiply(sum):
        """Return ``sum * 10`` under the key ``"result"``."""
        return {"result": sum * 10}

    graph = gs.Graph()
    graph.add(
        add_numbers,
        label="Adder",
        inputs=["a", "b"],
        outputs=["sum"],
    )
    graph.add(
        multiply,
        label="Multiplier",
        inputs=["sum"],
        outputs=["result"],
    )
    graph.auto_connect()

    executor = gs.Executor()
    adder = graph.get_node("add_numbers")
    adder.set_input("a", gs.PortData(5))
    adder.set_input("b", gs.PortData(3))

    result = executor.execute(graph)
    if result.is_success():
        final_result = result.get_output("multiply", "result")
        print(f"Result: 5 + 3 = 8, then 8 * 10 = {final_result}")
        print("✓ Simple port demo successful!")
    else:
        # Report failures the same way main() does, so a broken run is visible.
        print("✗ Simple port demo failed")
        for node_id, error in result.errors.items():
            print(f" - {node_id}: {error}")
def demo_port_objects():
    """Run the port-objects demo: register a node using ``gs.Port`` objects.

    A locally defined function is added to a graph with explicit ``gs.Port``
    instances for its input and output. The graph is only constructed here;
    it is not executed.
    """
    print("\n=== Port Objects Demo ===\n")

    def legacy_function(old_param_name):
        """Return the upper-cased argument under the key ``"legacy_output"``."""
        return {"legacy_output": old_param_name.upper()}

    graph = gs.Graph()

    input_port = gs.Port(
        "modern_input",
        "old_param_name",
        "Modern Input",
    )
    # NOTE(review): the return value is discarded. If with_description
    # returns a new Port rather than mutating in place, the description is
    # lost -- confirm against the pygraphsp API.
    input_port.with_description("This port connects modern data to a legacy function")

    output_port = gs.Port(
        "modern_output",
        "legacy_output",
        "Modern Output",
    )

    graph.add(
        legacy_function,
        label="Legacy Function",
        inputs=[input_port],
        outputs=[output_port],
    )

    print("✓ Port objects demo created")
    print(" This shows how you can wrap legacy functions with modern interfaces")
if __name__ == "__main__":
    # Run every demo, in order, when the file is executed as a script.
    main()
    demo_simple_ports()
    demo_port_objects()