import json
import os
import shutil
import subprocess
import sys
import pytest
import acdp
HERE = os.path.dirname(os.path.abspath(__file__))
SEED = bytes(32)
AGENT_DID = "did:web:agents.example.com:test-producer"
KEY_ID = f"{AGENT_DID}#key-1"
GOLDEN_HASH = (
"sha256:f170150ddbf59d99794e7797824591b374d459782084597b644ecc57a41031b5"
)
GOLDEN_SIG = (
"ErkbV+FUdn49TgF3zJ3RBe3AmyGxLVAQdMjlhabUfM96qendmWwdVodX/SV3O3aKLypbUu6gmb5Npt3O/w7nDQ=="
)
PY_OMIT = {"omit_acdp_version": True}
NODE_OMIT = {"omitAcdpVersion": True}
class NodeWorker:
def __init__(self) -> None:
self._proc = subprocess.Popen(
["node", os.path.join(HERE, "node_worker.mjs")],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=sys.stderr, text=True,
bufsize=1,
)
self._next_id = 0
def _rpc(self, method: str, params: dict) -> dict:
self._next_id += 1
request = {"id": self._next_id, "method": method, "params": params}
self._proc.stdin.write(json.dumps(request) + "\n")
self._proc.stdin.flush()
line = self._proc.stdout.readline()
if not line:
raise RuntimeError(
f"node worker exited before answering {method!r}"
)
return json.loads(line)
def call(self, method: str, **params):
resp = self._rpc(method, params)
if not resp.get("ok"):
raise RuntimeError(f"node.{method} failed: {resp['error']}")
return resp["result"]
def call_expect_error(self, method: str, **params) -> str:
resp = self._rpc(method, params)
if resp.get("ok"):
raise AssertionError(f"node.{method} unexpectedly succeeded")
return resp["error"]
def close(self) -> None:
try:
self._proc.stdin.close()
self._proc.wait(timeout=5)
except Exception:
self._proc.kill()
@pytest.fixture(scope="module")
def node():
if shutil.which("node") is None:
pytest.skip("node executable not found on PATH")
worker = NodeWorker()
try:
worker.call("ping") yield worker
finally:
worker.close()
def _python_publish(opts: dict) -> str:
p = acdp.AcdpProducer.from_seed(SEED, AGENT_DID, KEY_ID)
return p.build_publish_request(**opts)
def _node_publish(node, opts_camel: dict) -> str:
producer = node.call(
"new_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(SEED)
)
return node.call(
"build_publish_request",
producer=producer["handle"],
opts=opts_camel,
)["raw"]
def test_python_matches_golden_hash_and_signature():
raw = _python_publish(
{
"title": "Golden test vector — minimal first version",
"context_type": "data_snapshot",
**PY_OMIT,
}
)
req = json.loads(raw)
assert req["content_hash"] == GOLDEN_HASH
assert req["signature"]["value"] == GOLDEN_SIG
def test_node_matches_golden_hash_and_signature(node):
raw = _node_publish(
node,
{
"title": "Golden test vector — minimal first version",
"contextType": "data_snapshot",
**NODE_OMIT,
},
)
req = json.loads(raw)
assert req["content_hash"] == GOLDEN_HASH
assert req["signature"]["value"] == GOLDEN_SIG
def test_python_and_node_emit_byte_identical_publish_requests(node):
py_raw = _python_publish(
{
"title": "Golden test vector — minimal first version",
"context_type": "data_snapshot",
}
)
node_raw = _node_publish(
node,
{
"title": "Golden test vector — minimal first version",
"contextType": "data_snapshot",
},
)
py_req = json.loads(py_raw)
node_req = json.loads(node_raw)
assert py_req["content_hash"] == node_req["content_hash"]
assert py_req["signature"]["value"] == node_req["signature"]["value"]
assert py_req["signature"]["algorithm"] == node_req["signature"]["algorithm"]
assert py_req["signature"]["key_id"] == node_req["signature"]["key_id"]
assert py_req["agent_id"] == node_req["agent_id"]
def test_python_and_node_match_on_a_richer_body(node):
derived = (
"acdp://registry.example.com/12345678-1234-4321-8123-123456781234"
)
py_raw = _python_publish(
{
"title": "Interop body",
"context_type": "analysis",
"summary": "rich body",
"tags": ["interop", "golden"],
"domain": "test.interop",
"derived_from": [derived],
"contributors": ["did:web:agents.example.com:contributor"],
"description": "Cross-language byte-equality assertion.",
}
)
node_raw = _node_publish(
node,
{
"title": "Interop body",
"contextType": "analysis",
"summary": "rich body",
"tags": ["interop", "golden"],
"domain": "test.interop",
"derivedFrom": [derived],
"contributors": ["did:web:agents.example.com:contributor"],
"description": "Cross-language byte-equality assertion.",
},
)
assert json.loads(py_raw)["content_hash"] == json.loads(node_raw)["content_hash"]
assert (
json.loads(py_raw)["signature"]["value"]
== json.loads(node_raw)["signature"]["value"]
)
def test_node_verifies_python_signature(node):
raw = _python_publish(
{"title": "From Python", "context_type": "data_snapshot"}
)
req = json.loads(raw)
pub_key_b64 = acdp.AcdpProducer.from_seed(
SEED, AGENT_DID, KEY_ID
).public_key_b64
assert node.call(
"verify_content_hash",
body_json=raw,
expected_hash=req["content_hash"],
)["ok"]
assert node.call(
"verify_signature",
pub_key_b64=pub_key_b64,
sig_b64=req["signature"]["value"],
content_hash=req["content_hash"],
)["ok"]
def test_python_verifies_node_signature(node):
producer = node.call(
"new_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(SEED)
)
raw = node.call(
"build_publish_request",
producer=producer["handle"],
opts={"title": "From Node", "contextType": "data_snapshot"},
)["raw"]
req = json.loads(raw)
assert acdp.AcdpVerifier.verify_content_hash(raw, req["content_hash"])
assert acdp.AcdpVerifier.verify_signature(
producer["public_key_b64"],
req["signature"]["value"],
req["content_hash"],
)
def test_sign_challenge_is_deterministic_across_bindings(node):
signing_input = (
"acdp-registry-auth:v1:nonce-abc:"
f"{AGENT_DID}:registry.example.com:1748000000"
)
py_sig = acdp.AcdpProducer.from_seed(
SEED, AGENT_DID, KEY_ID
).sign_challenge(signing_input)
producer = node.call(
"new_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(SEED)
)
node_sig = node.call(
"sign_challenge",
producer=producer["handle"],
signing_input=signing_input,
)["signature"]
assert py_sig == node_sig
P256_SEED = bytes(31) + bytes([1])
P256_GOLDEN_HASH = GOLDEN_HASH P256_GOLDEN_SIG = (
"O+b+E5OIecgwCnjDyTqsiwwy3VTdBHbVhiRR9k3FAPZHvLJ5dyYYVPPUWbl0dKDdgKMw2dWrnKWRANJVoS9vNw=="
)
P256_MINIMAL = {"title": "Golden test vector — minimal first version"}
def _python_p256_publish(opts: dict) -> str:
p = acdp.AcdpP256Producer.from_seed(P256_SEED, AGENT_DID, KEY_ID)
return p.build_publish_request(**opts)
def _node_p256_publish(node, opts_camel: dict) -> str:
producer = node.call(
"new_p256_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(P256_SEED)
)
return node.call(
"build_publish_request",
producer=producer["handle"],
opts=opts_camel,
)["raw"]
def test_p256_python_matches_golden():
req = json.loads(
_python_p256_publish(
{**P256_MINIMAL, "context_type": "data_snapshot", **PY_OMIT}
)
)
assert req["content_hash"] == P256_GOLDEN_HASH
assert req["signature"]["algorithm"] == "ecdsa-p256"
assert req["signature"]["value"] == P256_GOLDEN_SIG
def test_p256_node_matches_golden(node):
req = json.loads(
_node_p256_publish(
node, {**P256_MINIMAL, "contextType": "data_snapshot", **NODE_OMIT}
)
)
assert req["content_hash"] == P256_GOLDEN_HASH
assert req["signature"]["algorithm"] == "ecdsa-p256"
assert req["signature"]["value"] == P256_GOLDEN_SIG
def test_p256_python_and_node_emit_byte_identical_requests(node):
py_req = json.loads(
_python_p256_publish({**P256_MINIMAL, "context_type": "data_snapshot"})
)
node_req = json.loads(
_node_p256_publish(node, {**P256_MINIMAL, "contextType": "data_snapshot"})
)
assert py_req["content_hash"] == node_req["content_hash"]
assert py_req["signature"]["value"] == node_req["signature"]["value"]
assert py_req["signature"]["algorithm"] == node_req["signature"]["algorithm"] == "ecdsa-p256"
def test_p256_sign_challenge_is_deterministic_across_bindings(node):
signing_input = (
"acdp-registry-auth:v1:nonce-abc:"
f"{AGENT_DID}:registry.example.com:1748000000"
)
py_sig = acdp.AcdpP256Producer.from_seed(
P256_SEED, AGENT_DID, KEY_ID
).sign_challenge(signing_input)
producer = node.call(
"new_p256_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(P256_SEED)
)
node_sig = node.call(
"sign_challenge",
producer=producer["handle"],
signing_input=signing_input,
)["signature"]
assert py_sig == node_sig
def test_node_verifies_python_p256_signature(node):
raw = _python_p256_publish({**P256_MINIMAL, "context_type": "data_snapshot"})
req = json.loads(raw)
pub_sec1 = acdp.AcdpP256Producer.from_seed(
P256_SEED, AGENT_DID, KEY_ID
).public_key_sec1_b64
assert node.call(
"verify_signature_p256",
pub_key_sec1_b64=pub_sec1,
sig_b64=req["signature"]["value"],
content_hash=req["content_hash"],
)["ok"]
def test_python_verifies_node_p256_signature(node):
producer = node.call(
"new_p256_producer", agent_did=AGENT_DID, key_id=KEY_ID, seed=list(P256_SEED)
)
raw = node.call(
"build_publish_request",
producer=producer["handle"],
opts={**P256_MINIMAL, "contextType": "data_snapshot"},
)["raw"]
req = json.loads(raw)
assert acdp.AcdpVerifier.verify_signature_p256(
producer["public_key_sec1_b64"],
req["signature"]["value"],
req["content_hash"],
)
def test_extended_body_fields_are_byte_identical_across_bindings(node):
data_refs = json.dumps(
[{"type": "primary_result", "location": "https://example.com/d.parquet"}]
)
data_period = json.dumps(
{"start": "2026-01-01T00:00:00Z", "end": "2026-01-02T00:00:00Z"}
)
expires_at = "2026-06-01T00:00:00Z"
py_raw = _python_publish(
{
"title": "Extended body",
"context_type": "data_snapshot",
"data_refs": data_refs,
"data_period": data_period,
"expires_at": expires_at,
}
)
node_raw = _node_publish(
node,
{
"title": "Extended body",
"contextType": "data_snapshot",
"dataRefs": data_refs,
"dataPeriod": data_period,
"expiresAt": expires_at,
},
)
py_req = json.loads(py_raw)
node_req = json.loads(node_raw)
assert py_req["content_hash"] == node_req["content_hash"]
assert py_req["signature"]["value"] == node_req["signature"]["value"]
assert py_req["data_refs"][0]["location"] == "https://example.com/d.parquet"
assert py_req["data_period"] == node_req["data_period"]
assert py_req["expires_at"] == node_req["expires_at"]
_CANON_INPUTS = [
'{ "b": 1, "a": 2 }',
'{"x": -0.0}', '{"z": [3, 2, 1], "a": {"d": 4, "c": 3}}',
'{"k": "café — π", "n": 42, "deep": {"arr": [1, 2, {"q": true}]}}',
]
@pytest.mark.parametrize("doc", _CANON_INPUTS)
def test_canonicalize_is_byte_identical_across_bindings(node, doc):
py_out = acdp.AcdpCanonicalizer.canonicalize(doc)
node_out = node.call("canonicalize", json=doc)["result"]
assert py_out == node_out
@pytest.mark.parametrize("doc", _CANON_INPUTS)
def test_content_hash_is_identical_across_bindings(node, doc):
py_out = acdp.AcdpCanonicalizer.content_hash(doc)
node_out = node.call("content_hash", json=doc)["result"]
assert py_out == node_out
assert py_out.startswith("sha256:")
def _py_ssrf_url(url: str) -> dict:
pol = acdp.AcdpSsrfPolicy.production()
try:
pol.check_url(url)
return {"allowed": True}
except acdp.SsrfRejected as e:
return {"allowed": False, "reason": e.reason}
def _py_ssrf_ip(ip: str) -> dict:
pol = acdp.AcdpSsrfPolicy.production()
try:
pol.check_ip(ip)
return {"allowed": True}
except acdp.SsrfRejected as e:
return {"allowed": False, "reason": e.reason}
def _py_ssrf_redirect(from_url: str, to_url: str) -> dict:
pol = acdp.AcdpSsrfPolicy.production()
try:
pol.check_redirect_authority(from_url, to_url)
return {"allowed": True}
except acdp.SsrfRejected as e:
return {"allowed": False, "reason": e.reason}
@pytest.mark.parametrize(
"url",
[
"https://registry.example.com", "http://registry.example.com", "https://192.168.1.1", "https://[::1]", "not a url", ],
)
def test_ssrf_check_url_reason_matches_across_bindings(node, url):
py = _py_ssrf_url(url)
nd = node.call("ssrf_check_url", url=url)
assert py["allowed"] == nd["allowed"]
assert py.get("reason") == nd.get("reason")
@pytest.mark.parametrize(
"ip",
[
"127.0.0.1", "10.0.0.1", "169.254.169.254", "239.0.0.1", "8.8.8.8", "fc00::1", "fe80::1", "64:ff9b::a9fe:a9fe", "::ffff:10.0.0.1", "2001:db8::1", ],
)
def test_ssrf_check_ip_reason_matches_across_bindings(node, ip):
py = _py_ssrf_ip(ip)
nd = node.call("ssrf_check_ip", ip=ip)
assert py["allowed"] == nd["allowed"]
assert py.get("reason") == nd.get("reason")
@pytest.mark.parametrize(
"from_url,to_url",
[
("https://a.example/x", "https://a.example/y"), ("https://a.example/x", "https://a.example:443/y"), ("https://a.example/x", "https://b.example/y"), ("https://a.example/x", "https://a.example:8443/y"), ("https://a.example/x", "http://a.example/y"), ],
)
def test_ssrf_check_redirect_reason_matches_across_bindings(node, from_url, to_url):
py = _py_ssrf_redirect(from_url, to_url)
nd = node.call("ssrf_check_redirect", from_url=from_url, to_url=to_url)
assert py["allowed"] == nd["allowed"]
assert py.get("reason") == nd.get("reason")
import base64
_DID = "did:web:agents.example.com"
_DID_KEY_ID = f"{_DID}#key-1"
def _did_doc_for(public_key_b64: str, *, authorized: bool = True) -> str:
raw = base64.b64decode(public_key_b64)
x = base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
return json.dumps(
{
"id": _DID,
"verificationMethod": [
{
"id": _DID_KEY_ID,
"type": "JsonWebKey2020",
"controller": _DID,
"publicKeyJwk": {"kty": "OKP", "crv": "Ed25519", "x": x},
}
],
"assertionMethod": [_DID_KEY_ID] if authorized else [],
}
)
def _py_did_resolve(doc_json: str, key_id: str, alg: str) -> dict:
try:
doc = acdp.AcdpDidDocument.parse(doc_json, _DID)
k = doc.key_for_algorithm(key_id, alg)
return {"ok": True, "key": k}
except acdp.DidResolutionError as e:
return {"ok": False, "reason": e.reason}
def test_did_web_to_url_matches_across_bindings(node):
for did in ("did:web:example.com", "did:web:example.com:users:alice"):
assert acdp.AcdpDid.web_to_url(did) == node.call("did_web_to_url", did=did)["result"]
def test_did_key_extraction_matches_across_bindings(node):
p = acdp.AcdpProducer.from_seed(SEED, _DID, _DID_KEY_ID)
doc_json = _did_doc_for(p.public_key_b64)
py = _py_did_resolve(doc_json, _DID_KEY_ID, "ed25519")
nd = node.call(
"did_key_for_algorithm",
doc_json=doc_json,
expected_did=_DID,
requested_key_id=_DID_KEY_ID,
requested_alg="ed25519",
)
assert py["ok"] and nd["ok"]
assert py["key"]["public_key_b64"] == nd["key"]["public_key_b64"] == p.public_key_b64
@pytest.mark.parametrize(
"doc_authorized,key_id,alg,expected_reason",
[
(True, _DID_KEY_ID, "ecdsa-p256", "alg_mismatch"), (False, _DID_KEY_ID, "ed25519", "key_not_authorized"), (True, f"{_DID}#key-2", "ed25519", "key_not_found"), (True, _DID_KEY_ID, "rsa", "unsupported_algorithm"),
],
)
def test_did_rejection_reason_matches_across_bindings(
node, doc_authorized, key_id, alg, expected_reason
):
p = acdp.AcdpProducer.from_seed(SEED, _DID, _DID_KEY_ID)
doc_json = _did_doc_for(p.public_key_b64, authorized=doc_authorized)
py = _py_did_resolve(doc_json, key_id, alg)
nd = node.call(
"did_key_for_algorithm",
doc_json=doc_json,
expected_did=_DID,
requested_key_id=key_id,
requested_alg=alg,
)
assert py["ok"] is False and nd["ok"] is False
assert py["reason"] == nd["reason"] == expected_reason