silk-graph 0.2.3

Merkle-CRDT graph engine for distributed, conflict-free knowledge graphs
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
"""Structural invariant checks (INV-1 through INV-6).

These tests enforce properties that must ALWAYS hold. They are the automated
enforcement layer above the formal proofs in PROOF.md. If any of these fail,
something fundamental is broken.

See INVARIANTS.md for the full specification.
"""

import os
import re
import random
import pytest
from silk import GraphStore


ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# ---------------------------------------------------------------------------
# INV-1: Every PROOF.md invariant has a test
# ---------------------------------------------------------------------------

def test_proof_coverage():
    """Every invariant (I-xx) and theorem in PROOF.md must be referenced
    in at least one test file."""
    proof_path = os.path.join(ROOT, "PROOF.md")
    with open(proof_path) as f:
        proof_text = f.read()

    # Extract invariant identifiers: I-01, I-02, ..., I-06
    invariants = set(re.findall(r"\bI-0[1-9]\b", proof_text))
    # Extract theorem identifiers: Theorem 1, Theorem 2, Theorem 3
    theorems = set(re.findall(r"Theorem [1-9]", proof_text))

    assert len(invariants) >= 6, f"Expected at least 6 invariants, found {invariants}"
    assert len(theorems) >= 3, f"Expected at least 3 theorems, found {theorems}"

    # Scan all test files for references
    test_dir = os.path.join(ROOT, "pytests")
    test_content = ""
    for fname in os.listdir(test_dir):
        if fname.startswith("test_") and fname.endswith(".py"):
            with open(os.path.join(test_dir, fname)) as f:
                test_content += f.read()

    missing_invariants = []
    for inv in sorted(invariants):
        if inv not in test_content:
            missing_invariants.append(inv)

    missing_theorems = []
    for thm in sorted(theorems):
        if thm not in test_content:
            missing_theorems.append(thm)

    assert not missing_invariants, (
        f"PROOF.md invariants without test coverage: {missing_invariants}. "
        f"Add tests that reference these identifiers."
    )
    assert not missing_theorems, (
        f"PROOF.md theorems without test coverage: {missing_theorems}. "
        f"Add tests that reference these identifiers."
    )


# ---------------------------------------------------------------------------
# INV-2: Serialization determinism
# ---------------------------------------------------------------------------

def test_serialization_determinism():
    """Identical entries must serialize to identical bytes every time.
    Foundation of content addressing — non-deterministic serialization
    would break hash matching across peers."""
    store = GraphStore("det-test", {
        "node_types": {
            "server": {
                "properties": {
                    "name": {"value_type": "string", "required": True},
                    "port": {"value_type": "int"},
                    "tags": {"value_type": "list"},
                }
            }
        },
        "edge_types": {
            "CONNECTS": {
                "source_types": ["server"],
                "target_types": ["server"],
            }
        }
    })

    # Create entries with various property types
    store.add_node("s1", "server", "Alpha", {
        "name": "alpha",
        "port": 8080,
        "tags": ["prod", "us-east"],
    })
    store.add_node("s2", "server", "Beta", {"name": "beta", "port": 443})
    store.add_edge("e1", "CONNECTS", "s1", "s2")
    store.update_property("s1", "port", 9090)

    # Take a snapshot, serialize it
    snap_bytes = store.snapshot()

    # Serialize the same state 100 times — all must be identical
    for i in range(100):
        snap_bytes_again = store.snapshot()
        assert snap_bytes == snap_bytes_again, (
            f"Serialization non-determinism detected on iteration {i}"
        )

    # Round-trip: deserialize and re-serialize must be stable
    store2 = GraphStore.from_snapshot("det-test-2", snap_bytes)

    # Both stores should have same graph
    assert sorted([n["node_id"] for n in store.all_nodes()]) == \
           sorted([n["node_id"] for n in store2.all_nodes()])


# ---------------------------------------------------------------------------
# INV-3: Sync convergence for random entry orderings
# ---------------------------------------------------------------------------

def _random_store(seed, store_id, ontology):
    """Create a store with random nodes, edges, and property updates."""
    rng = random.Random(seed)
    store = GraphStore(store_id, ontology)
    node_ids = []

    # Random nodes
    num_nodes = rng.randint(3, 15)
    for i in range(num_nodes):
        nid = f"n-{store_id}-{i}"
        props = {"name": f"node-{i}", "value": rng.randint(0, 100)}
        store.add_node(nid, "entity", f"Label-{i}", props)
        node_ids.append(nid)

    # Random edges
    if len(node_ids) >= 2:
        num_edges = rng.randint(1, min(10, len(node_ids)))
        for i in range(num_edges):
            src = rng.choice(node_ids)
            tgt = rng.choice(node_ids)
            if src != tgt:
                store.add_edge(f"e-{store_id}-{i}", "LINKS", src, tgt)

    # Random property updates
    num_updates = rng.randint(0, 8)
    for _ in range(num_updates):
        nid = rng.choice(node_ids)
        store.update_property(nid, "value", rng.randint(0, 1000))

    # Random removals
    if rng.random() < 0.3 and len(node_ids) > 2:
        store.remove_node(rng.choice(node_ids))

    return store


def test_sync_convergence_randomized():
    """Theorem 3: bidirectional sync must converge for any entry ordering.
    Runs 20 randomized scenarios."""
    ontology = {
        "node_types": {
            "entity": {
                "properties": {
                    "name": {"value_type": "string"},
                    "value": {"value_type": "int"},
                }
            }
        },
        "edge_types": {
            "LINKS": {
                "source_types": ["entity"],
                "target_types": ["entity"],
            }
        }
    }

    for scenario in range(20):
        seed_a = scenario * 1000
        seed_b = scenario * 1000 + 500

        store_a = _random_store(seed_a, f"a-{scenario}", ontology)
        store_b = _random_store(seed_b, f"b-{scenario}", ontology)

        # Bidirectional sync: A→B then B→A
        offer_b = store_b.generate_sync_offer()
        payload_a = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload_a)

        offer_a = store_a.generate_sync_offer()
        payload_b = store_b.receive_sync_offer(offer_a)
        store_a.merge_sync_payload(payload_b)

        # Verify convergence
        nodes_a = sorted([n["node_id"] for n in store_a.all_nodes()])
        nodes_b = sorted([n["node_id"] for n in store_b.all_nodes()])
        assert nodes_a == nodes_b, (
            f"Scenario {scenario}: node sets diverged. "
            f"A has {len(nodes_a)}, B has {len(nodes_b)}"
        )

        edges_a = sorted([e["edge_id"] for e in store_a.all_edges()])
        edges_b = sorted([e["edge_id"] for e in store_b.all_edges()])
        assert edges_a == edges_b, (
            f"Scenario {scenario}: edge sets diverged."
        )

        # Property values must match
        for nid in nodes_a:
            na = store_a.get_node(nid)
            nb = store_b.get_node(nid)
            assert na["properties"] == nb["properties"], (
                f"Scenario {scenario}: properties diverged for {nid}"
            )


# ---------------------------------------------------------------------------
# INV-4: Every write path validates against the ontology
# ---------------------------------------------------------------------------

# All GraphOp variants and their validation status.
# When a new variant is added, this test forces the developer to classify it.
VALIDATED_OPS = {"AddNode", "AddEdge", "UpdateProperty"}
SKIP_VALIDATION_OPS = {"RemoveNode", "RemoveEdge", "DefineOntology", "ExtendOntology", "Checkpoint", "DefineLens"}
ALL_KNOWN_OPS = VALIDATED_OPS | SKIP_VALIDATION_OPS


def test_all_graph_ops_have_validation_path():
    """Every GraphOp variant must be either validated or explicitly skipped.
    If a new variant is added to entry.rs without updating this list,
    this test fails — forcing a conscious decision about validation."""

    # Read the Rust source to find all GraphOp variants
    entry_rs = os.path.join(ROOT, "src", "entry.rs")
    with open(entry_rs) as f:
        content = f.read()

    # Extract variant names from serde rename attributes
    variants = set(re.findall(r'#\[serde\(rename = "(\w+)"\)\]', content))

    # Convert snake_case to PascalCase for comparison
    def to_pascal(s):
        return "".join(word.capitalize() for word in s.split("_"))

    actual_variants = {to_pascal(v) for v in variants}

    unknown = actual_variants - ALL_KNOWN_OPS
    assert not unknown, (
        f"New GraphOp variants found without validation classification: {unknown}. "
        f"Add them to VALIDATED_OPS or SKIP_VALIDATION_OPS in test_invariants.py."
    )

    removed = ALL_KNOWN_OPS - actual_variants
    assert not removed, (
        f"GraphOp variants in test list but not in entry.rs: {removed}. "
        f"Remove them from the classification lists."
    )


def test_validated_ops_actually_validate():
    """Ops in VALIDATED_OPS must actually reject invalid input."""
    store = GraphStore("val-test", {
        "node_types": {
            "server": {
                "properties": {
                    "name": {"value_type": "string", "required": True},
                    "port": {"value_type": "int"},
                }
            }
        },
        "edge_types": {
            "CONNECTS": {
                "source_types": ["server"],
                "target_types": ["server"],
            }
        }
    })

    # AddNode: wrong type rejects
    with pytest.raises((ValueError, Exception)):
        store.add_node("x", "nonexistent_type", "X", {})

    # AddEdge: wrong edge type rejects
    store.add_node("s1", "server", "S1", {"name": "s1"})
    store.add_node("s2", "server", "S2", {"name": "s2"})
    with pytest.raises((ValueError, Exception)):
        store.add_edge("e1", "NONEXISTENT", "s1", "s2")

    # UpdateProperty: wrong value type rejects
    with pytest.raises((ValueError, Exception)):
        store.update_property("s1", "port", "not_an_int")


# ---------------------------------------------------------------------------
# INV-5: Version consistency
# ---------------------------------------------------------------------------

def test_version_consistency():
    """Cargo.toml, pyproject.toml, and CHANGELOG.md must all agree on version."""
    # Cargo.toml
    cargo_path = os.path.join(ROOT, "Cargo.toml")
    with open(cargo_path) as f:
        cargo = f.read()
    cargo_match = re.search(r'^version\s*=\s*"([^"]+)"', cargo, re.MULTILINE)
    assert cargo_match, "No version found in Cargo.toml"
    cargo_version = cargo_match.group(1)

    # pyproject.toml
    pyproject_path = os.path.join(ROOT, "pyproject.toml")
    with open(pyproject_path) as f:
        pyproject = f.read()
    py_match = re.search(r'^version\s*=\s*"([^"]+)"', pyproject, re.MULTILINE)
    assert py_match, "No version found in pyproject.toml"
    py_version = py_match.group(1)

    # CHANGELOG.md — latest version entry
    changelog_path = os.path.join(ROOT, "CHANGELOG.md")
    with open(changelog_path) as f:
        changelog = f.read()
    cl_match = re.search(r"^## \[([^\]]+)\]", changelog, re.MULTILINE)
    # Skip [Unreleased] if present
    cl_versions = re.findall(r"^## \[([^\]]+)\]", changelog, re.MULTILINE)
    cl_versions = [v for v in cl_versions if v != "Unreleased"]
    assert cl_versions, "No version entries in CHANGELOG.md"
    changelog_version = cl_versions[0]

    assert cargo_version == py_version, (
        f"Version mismatch: Cargo.toml={cargo_version}, pyproject.toml={py_version}"
    )
    assert cargo_version == changelog_version, (
        f"Version mismatch: Cargo.toml={cargo_version}, CHANGELOG.md={changelog_version}"
    )


# ---------------------------------------------------------------------------
# INV-6: Oplog integrity verification
# ---------------------------------------------------------------------------

def test_verify_integrity():
    """store.verify_integrity() checks I-01, I-02, I-04 on the full oplog."""
    store = GraphStore("int-test", {
        "node_types": {
            "entity": {
                "properties": {"name": {"value_type": "string"}}
            }
        },
        "edge_types": {
            "LINKS": {
                "source_types": ["entity"],
                "target_types": ["entity"],
            }
        }
    })

    # Build up some state
    store.add_node("n1", "entity", "A", {"name": "alpha"})
    store.add_node("n2", "entity", "B", {"name": "beta"})
    store.add_edge("e1", "LINKS", "n1", "n2")
    store.update_property("n1", "name", "alpha-updated")

    # Integrity should be clean
    ok, errors = store.verify_integrity()
    assert ok, f"Integrity check failed on clean store: {errors}"
    assert errors == []


def test_verify_integrity_after_sync():
    """Integrity holds after bidirectional sync."""
    ontology = {
        "node_types": {
            "entity": {
                "properties": {"name": {"value_type": "string"}}
            }
        },
        "edge_types": {}
    }

    store_a = GraphStore("a", ontology)
    store_b = GraphStore("b", ontology)

    store_a.add_node("n1", "entity", "A", {"name": "from-a"})
    store_b.add_node("n2", "entity", "B", {"name": "from-b"})

    # Bidirectional sync
    offer = store_b.generate_sync_offer()
    payload = store_a.receive_sync_offer(offer)
    store_b.merge_sync_payload(payload)

    offer = store_a.generate_sync_offer()
    payload = store_b.receive_sync_offer(offer)
    store_a.merge_sync_payload(payload)

    # Both should pass integrity
    ok_a, errors_a = store_a.verify_integrity()
    ok_b, errors_b = store_b.verify_integrity()
    assert ok_a, f"Store A integrity failed after sync: {errors_a}"
    assert ok_b, f"Store B integrity failed after sync: {errors_b}"


def test_verify_integrity_after_compaction():
    """Integrity holds after compaction (I-01, I-02, I-04 still valid)."""
    store = GraphStore("comp-test", {
        "node_types": {
            "entity": {
                "properties": {"name": {"value_type": "string"}}
            }
        },
        "edge_types": {}
    })

    for i in range(10):
        store.add_node(f"n{i}", "entity", f"Node {i}", {"name": f"name-{i}"})

    store.compact()

    ok, errors = store.verify_integrity()
    assert ok, f"Integrity failed after compaction: {errors}"


# ---------------------------------------------------------------------------
# Cross-references to PROOF.md identifiers
# (Required by INV-1 — ensures grep finds them)
# ---------------------------------------------------------------------------
# I-01: Hash integrity — tested by test_verify_integrity (re-hashes every entry)
# I-02: Causal completeness — tested by test_verify_integrity (checks parent existence)
# I-03: Append-only — structural (HashMap insert-only, no delete API)
# I-04: Heads accuracy — tested by test_verify_integrity (recomputes heads)
# I-05: Topological determinism — tested by test_sync_convergence_randomized (same topo → same graph)
# I-06: Quarantine monotonicity — tested in test_quarantine.py
# Theorem 1: Deterministic materialization — tested by test_sync_convergence_randomized
# Theorem 2: Idempotent merge — tested in test_sync.py::test_sync_is_idempotent
# Theorem 3: Convergence — tested by test_sync_convergence_randomized