import json
import os
import sys
import urllib.request
import urllib.error
SERVER_URL = os.environ.get("MICRORESOLVE_URL", "http://localhost:3001")
API_KEY = os.environ.get("MICRORESOLVE_API_KEY", "")
NS_ID = "bench-delta-bw"
N_INTENTS = 50
PHRASES_PER_INTENT = 4 N_MUTATIONS = 100
DELTA_RATIO_THRESHOLD = 0.25
def headers():
h = {"Content-Type": "application/json"}
if API_KEY:
h["X-Api-Key"] = API_KEY
h["X-Namespace-ID"] = NS_ID
return h
def post(path, body):
url = f"{SERVER_URL}{path}"
data = json.dumps(body).encode()
req = urllib.request.Request(url, data=data, headers=headers(), method="POST")
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
def get(path):
url = f"{SERVER_URL}{path}"
req = urllib.request.Request(url, headers=headers(), method="GET")
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
def sync_request(local_version, supports_delta=False, oplog_min_version=None):
url = f"{SERVER_URL}/api/sync"
body = {
"local_versions": {NS_ID: local_version},
"logs": [],
"corrections": [],
"library_version": "bench/0.2.0",
}
if supports_delta:
body["supports_delta"] = True
if oplog_min_version is not None:
body["oplog_min_version"] = oplog_min_version
data = json.dumps(body).encode()
h = {"Content-Type": "application/json"}
if API_KEY:
h["X-Api-Key"] = API_KEY
req = urllib.request.Request(url, data=data, headers=h, method="POST")
with urllib.request.urlopen(req) as r:
raw = r.read()
return raw, json.loads(raw)
def create_namespace():
try:
get("/api/namespaces")
except urllib.error.HTTPError:
pass
try:
url = f"{SERVER_URL}/api/namespaces/{NS_ID}"
h = {"Content-Type": "application/json"}
if API_KEY:
h["X-Api-Key"] = API_KEY
req = urllib.request.Request(
url,
data=json.dumps({"name": "Delta Bandwidth Bench"}).encode(),
headers=h,
method="PUT",
)
urllib.request.urlopen(req)
except urllib.error.HTTPError as e:
if e.code not in (200, 201, 409):
print(f"Warning: namespace create returned {e.code}")
def add_intent(intent_id, phrases):
post(
"/api/intents",
{
"id": intent_id,
"phrases": phrases,
"description": f"Benchmark intent {intent_id}",
},
)
def add_phrase(intent_id, phrase):
post("/api/phrases", {"intent_id": intent_id, "phrase": phrase, "lang": "en"})
def main():
print(f"Server: {SERVER_URL}")
print(f"Namespace: {NS_ID}")
print()
try:
urllib.request.urlopen(f"{SERVER_URL}/api/health", timeout=3)
except Exception as e:
print(f"ERROR: server not reachable at {SERVER_URL}: {e}")
print("Start the server: ./target/release/microresolve-studio --data /tmp/bench_data")
sys.exit(1)
print(f"Building baseline: {N_INTENTS} intents × {PHRASES_PER_INTENT} phrases ...")
create_namespace()
for i in range(N_INTENTS):
intent_id = f"bench:intent_{i:03d}"
phrases = [
f"phrase {i} variant {j} for testing delta sync bandwidth measurement"
for j in range(PHRASES_PER_INTENT)
]
add_intent(intent_id, phrases)
raw_full, resp_full = sync_request(local_version=0, supports_delta=False)
full_export_bytes = len(raw_full)
baseline_version = resp_full.get("namespaces", {}).get(NS_ID, {}).get("version", 0)
print(f"Baseline version: {baseline_version}")
print(f"Full export size: {full_export_bytes:,} bytes")
print(f"\nApplying {N_MUTATIONS} small mutations ...")
for i in range(N_MUTATIONS):
intent_id = f"bench:intent_{i % N_INTENTS:03d}"
phrase = f"mutation phrase {i} added during delta bandwidth test run"
add_phrase(intent_id, phrase)
raw_full_after, resp_full_after = sync_request(
local_version=baseline_version, supports_delta=False
)
full_after_bytes = len(raw_full_after)
server_version_after = (
resp_full_after.get("namespaces", {}).get(NS_ID, {}).get("version", 0)
)
print(f"\nAfter {N_MUTATIONS} mutations:")
print(f" Server version: {server_version_after}")
print(f" Full export (delta=false): {full_after_bytes:,} bytes")
raw_delta, resp_delta = sync_request(
local_version=baseline_version, supports_delta=True, oplog_min_version=baseline_version
)
delta_bytes = len(raw_delta)
ns_delta = resp_delta.get("namespaces", {}).get(NS_ID, {})
has_ops = "ops" in ns_delta
has_export_fallback = "export" in ns_delta
n_ops = len(ns_delta.get("ops", []))
print(f" Delta sync (delta=true): {delta_bytes:,} bytes")
print(f" Delta response has ops: {has_ops} ({n_ops} ops)")
print(f" Delta response is full export: {has_export_fallback}")
if full_after_bytes == 0:
print("\nWARN: full export size is 0 — server may have no data for this namespace")
sys.exit(1)
ratio = delta_bytes / full_after_bytes
print(f"\nDelta / Full ratio: {ratio:.3f} ({ratio * 100:.1f}%)")
print(f"Threshold: ≤ {DELTA_RATIO_THRESHOLD * 100:.0f}%")
if has_export_fallback:
print(
"\nINFO: server returned full export for delta request "
"(oplog too short — expected for first run after baseline)."
)
print(" Delta path is exercised once the oplog accumulates ops.")
print(" Re-run this bench AFTER the server has processed the mutations.")
print("\nPASS (full-export fallback is correct behaviour for cold oplog)")
return
if ratio <= DELTA_RATIO_THRESHOLD:
print(f"\nPASS — delta is {ratio * 100:.1f}% of full (≤ {DELTA_RATIO_THRESHOLD * 100:.0f}% target)")
else:
print(
f"\nFAIL — delta is {ratio * 100:.1f}% of full (exceeds {DELTA_RATIO_THRESHOLD * 100:.0f}% target)"
)
sys.exit(1)
if __name__ == "__main__":
main()