# Demo of the native FFI bindings. Only the import itself is guarded, so an
# ImportError raised by anything else is not misreported as a missing module.
try:
    import dist_agent_lang
except ImportError:
    # Bind the name so later code can detect the missing module instead of
    # hitting a NameError the first time it touches `dist_agent_lang`.
    dist_agent_lang = None
    print("FFI module not available, falling back to HTTP")
else:
    runtime = dist_agent_lang.DistAgentLangRuntime()

    # Hash a byte string, requesting the "SHA256" algorithm.
    data = b"Hello, World!"
    hash_result = dist_agent_lang.hash_data(data, "SHA256")
    print(f"Hash: {hash_result}")

    # Sign the same payload. The key is a placeholder to be replaced.
    private_key = b"your_private_key_here"
    signature = dist_agent_lang.sign_data(data, private_key)
    print(f"Signature: {signature.hex()}")

    # Verify the signature. The key is a placeholder to be replaced.
    public_key = b"your_public_key_here"
    is_valid = dist_agent_lang.verify_signature(data, signature, public_key)
    print(f"Valid: {is_valid}")

    # Invoke a named service function through the runtime; the single
    # positional argument is a list of three strings.
    result = runtime.call_function(
        "CryptoService",
        "batch_hash",
        [["data1", "data2", "data3"]],
    )
    print(f"Batch hash results: {result}")
import requests
import json
def call_via_http(
    service_name,
    function_name,
    args,
    base_url="http://localhost:3000",
    timeout=30.0,
):
    """POST ``args`` as JSON to a service function and return the decoded reply.

    Args:
        service_name: Service segment of the endpoint path.
        function_name: Function segment of the endpoint path.
        args: JSON-serializable payload sent as the request body.
        base_url: Scheme and host of the HTTP API. Defaults to the local
            server address this example previously hard-coded.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on an unresponsive
            server.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
        Exception: Whatever ``response.json()`` raises when the body is not
            valid JSON.
    """
    url = f"{base_url.rstrip('/')}/api/{service_name}/{function_name}"
    response = requests.post(url, json=args, timeout=timeout)
    # NOTE(review): the HTTP status code is not checked, so an error response
    # with a JSON body is returned to the caller like a successful one.
    return response.json()
def call_service(service_name, function_name, args, prefer_ffi=True):
    """Call a service function via FFI when possible, otherwise over HTTP.

    Args:
        service_name: Name of the target service.
        function_name: Name of the function to invoke on that service.
        args: Arguments forwarded unchanged to the chosen transport.
        prefer_ffi: When true, try the ``dist_agent_lang`` runtime first and
            fall back to HTTP only if the FFI path is unusable.

    Returns:
        Whatever ``runtime.call_function`` returns on the FFI path, or the
        result of ``call_via_http`` on the HTTP path.
    """
    if prefer_ffi:
        try:
            runtime = dist_agent_lang.DistAgentLangRuntime()
            return runtime.call_function(service_name, function_name, args)
        except (ImportError, AttributeError, NameError):
            # NameError is the error actually raised when the module-level
            # import of `dist_agent_lang` failed and never bound the name.
            # AttributeError covers a bound name that lacks the runtime API.
            # Either way the FFI path is unusable, so fall through to HTTP.
            pass
    return call_via_http(service_name, function_name, args)
import time
def benchmark_ffi_vs_http():
    """Time repeated hash calls over FFI and over HTTP and print the results.

    Runs the same number of iterations against ``dist_agent_lang.hash_data``
    (skipped when the FFI module is unusable) and against ``call_via_http``,
    then prints elapsed time and throughput for each, plus the FFI speedup
    when an FFI timing was obtained.
    """
    data = b"test_data" * 1000
    iterations = 10000

    try:
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        for _ in range(iterations):
            dist_agent_lang.hash_data(data, "SHA256")
        ffi_time = time.perf_counter() - start
        print(f"FFI: {ffi_time:.4f}s ({iterations/ffi_time:.0f} ops/sec)")
    except (ImportError, NameError, AttributeError):
        # A failed module-level import surfaces here as NameError (name never
        # bound) or AttributeError (name bound to something without
        # `hash_data`, e.g. a None placeholder), not as ImportError.
        print("FFI not available")
        ffi_time = None

    # Decode once, outside the timed loop, so only the HTTP call is measured.
    payload = data.decode()
    start = time.perf_counter()
    for _ in range(iterations):
        call_via_http("CryptoService", "hash_data", [payload])
    http_time = time.perf_counter() - start
    print(f"HTTP: {http_time:.4f}s ({iterations/http_time:.0f} ops/sec)")

    # The truthiness test skips both a missing (None) and a zero FFI timing,
    # which also avoids dividing by zero.
    if ffi_time:
        speedup = http_time / ffi_time
        print(f"Speedup: {speedup:.1f}x faster with FFI")
# Script entry point: print the banner, then run the FFI-vs-HTTP benchmark.
# Nothing here runs when the file is imported as a module.
if __name__ == "__main__":
    print("=== Python FFI Example ===")
    benchmark_ffi_vs_http()