silk-graph 0.2.3

Merkle-CRDT graph engine for distributed, conflict-free knowledge graphs
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
"""Python tests for Silk sync protocol (S-3).

Tests the sync API from Python:
- generate_sync_offer / receive_sync_offer / merge_sync_payload round-trip
- snapshot / from_snapshot bootstrap
- Bidirectional sync with graph convergence
- Conflict resolution after sync (LWW, add-wins)
"""

import json
import pytest

from silk import GraphStore


# Shared test ontology, serialized to JSON text (GraphStore receives the
# ontology as a JSON string throughout this module). Key order is part of
# the serialized bytes, so the structure below preserves it.
_NODE_TYPES = {
    "entity": {
        "description": "A managed thing",
        "properties": {
            "status": {"value_type": "string"},
            "cpu": {"value_type": "float"},
        },
    },
    "signal": {
        "description": "An observed fact",
        "properties": {
            "severity": {"value_type": "string", "required": True},
        },
    },
}

_EDGE_TYPES = {
    "RUNS_ON": {
        "source_types": ["entity"],
        "target_types": ["entity"],
        "properties": {},
    },
    "OBSERVES": {
        "source_types": ["signal"],
        "target_types": ["entity"],
        "properties": {},
    },
}

ONTOLOGY = json.dumps({"node_types": _NODE_TYPES, "edge_types": _EDGE_TYPES})


# -- Fixtures --


def make_store(instance_id: str) -> GraphStore:
    """Build a fresh store for *instance_id* using the shared test ontology."""
    store = GraphStore(instance_id, ONTOLOGY)
    return store


# -- Sync offer/payload round-trip --


class TestSyncProtocol:
    """Offer/payload round-trip tests for the pull-based sync protocol.

    Protocol shape used throughout: the receiver generates an offer
    describing what it has, the sender computes a payload from that offer
    via receive_sync_offer, and the receiver applies it with
    merge_sync_payload (which returns the number of merged entries).
    """

    def test_sync_offer_is_bytes(self):
        """A sync offer is a non-empty opaque byte string."""
        store = make_store("inst-a")
        offer = store.generate_sync_offer()
        assert isinstance(offer, bytes)
        assert len(offer) > 0

    def test_receive_sync_offer_is_bytes(self):
        """The payload computed from a peer's offer is also bytes."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        offer_a = store_a.generate_sync_offer()
        payload = store_b.receive_sync_offer(offer_a)
        assert isinstance(payload, bytes)

    def test_sync_a_to_b(self):
        """A has nodes B doesn't. After sync, B has them."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")

        store_a.add_node("s1", "entity", "Server 1", {"status": "alive"})
        store_a.add_node("s2", "entity", "Server 2", {"status": "dead"})

        # Sync: B offers → A computes payload → B merges
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        merged = store_b.merge_sync_payload(payload)

        assert merged >= 2  # at least s1 and s2
        assert store_b.get_node("s1") is not None
        assert store_b.get_node("s2") is not None
        assert store_b.get_node("s1")["properties"]["status"] == "alive"

    def test_sync_bidirectional_convergence(self):
        """A and B each have unique nodes. After bidirectional sync, both converge."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")

        store_a.add_node("a1", "entity", "Node from A")
        store_b.add_node("b1", "entity", "Node from B")

        # Sync A → B
        offer_b = store_b.generate_sync_offer()
        payload_a_to_b = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload_a_to_b)

        # Sync B → A
        offer_a = store_a.generate_sync_offer()
        payload_b_to_a = store_b.receive_sync_offer(offer_a)
        store_a.merge_sync_payload(payload_b_to_a)

        # Both should have both nodes
        assert store_a.get_node("a1") is not None
        assert store_a.get_node("b1") is not None
        assert store_b.get_node("a1") is not None
        assert store_b.get_node("b1") is not None

    def test_sync_is_idempotent(self):
        """Syncing twice produces the same result as syncing once."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")

        store_a.add_node("n1", "entity", "Node 1")

        # First sync
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        merged1 = store_b.merge_sync_payload(payload)
        assert merged1 >= 1

        len_after_first = store_b.len()

        # Second sync — should be no-op
        offer_b2 = store_b.generate_sync_offer()
        payload2 = store_a.receive_sync_offer(offer_b2)
        merged2 = store_b.merge_sync_payload(payload2)
        assert merged2 == 0
        assert store_b.len() == len_after_first

    def test_sync_with_edges(self):
        """Edges sync correctly along with their endpoint nodes."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")

        store_a.add_node("svc", "entity", "API Service")
        store_a.add_node("srv", "entity", "Server")
        store_a.add_edge("e1", "RUNS_ON", "svc", "srv")

        # Sync A → B
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)

        # B should have the edge and both nodes
        assert store_b.get_node("svc") is not None
        assert store_b.get_node("srv") is not None
        assert store_b.get_edge("e1") is not None
        edges = store_b.all_edges()
        assert len(edges) == 1
        assert edges[0]["edge_type"] == "RUNS_ON"

    def test_sync_graph_queries_work_after_merge(self):
        """Graph queries (BFS, shortest path) work on merged data."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")

        store_a.add_node("a", "entity", "A")
        store_a.add_node("b", "entity", "B")
        store_a.add_node("c", "entity", "C")
        store_a.add_edge("ab", "RUNS_ON", "a", "b")
        store_a.add_edge("bc", "RUNS_ON", "b", "c")

        # Sync to B
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)

        # BFS from A should reach B and C
        reachable = store_b.bfs("a")
        assert "b" in reachable
        assert "c" in reachable

        # Shortest path from A to C
        path = store_b.shortest_path("a", "c")
        assert path is not None
        assert path == ["a", "b", "c"]


# -- Snapshot --


class TestSnapshot:
    """Snapshot export/import: full-state bootstrap of a new peer."""

    def test_snapshot_roundtrip(self):
        """A store rebuilt via from_snapshot contains the source's graph."""
        source = make_store("inst-a")
        source.add_node("s1", "entity", "Server 1", {"status": "alive"})
        source.add_node("s2", "entity", "Server 2")
        source.add_edge("e1", "RUNS_ON", "s1", "s2")

        blob = source.snapshot()
        assert isinstance(blob, bytes)
        assert len(blob) > 0

        clone = GraphStore.from_snapshot("inst-b", blob)

        # Every node and edge from the source must be queryable in the clone.
        for node_id in ("s1", "s2"):
            assert clone.get_node(node_id) is not None
        assert clone.get_edge("e1") is not None
        assert clone.get_node("s1")["properties"]["status"] == "alive"

    def test_snapshot_then_delta_sync(self):
        """Delta sync picks up entries created after a snapshot bootstrap."""
        source = make_store("inst-a")
        source.add_node("s1", "entity", "Server 1")

        # Bootstrap a second peer from the snapshot bytes.
        clone = GraphStore.from_snapshot("inst-b", source.snapshot())

        # New entry on the source after the bootstrap point.
        source.add_node("s2", "entity", "Server 2")

        # Delta round: clone offers → source responds → clone merges.
        payload = source.receive_sync_offer(clone.generate_sync_offer())
        assert clone.merge_sync_payload(payload) >= 1
        assert clone.get_node("s2") is not None

    def test_snapshot_preserves_ontology(self):
        """from_snapshot carries the ontology along with the data."""
        source = make_store("inst-a")
        clone = GraphStore.from_snapshot("inst-b", source.snapshot())

        # Same type vocabulary on both sides.
        assert clone.node_type_names() == source.node_type_names()
        assert clone.edge_type_names() == source.edge_type_names()

        # Unknown node types must still be rejected by the clone.
        with pytest.raises(ValueError):
            clone.add_node("x", "potato", "Bad type")

    def test_snapshot_graph_algorithms(self):
        """BFS and shortest-path work on a snapshot-bootstrapped store."""
        source = make_store("inst-a")
        source.add_node("a", "entity", "A")
        source.add_node("b", "entity", "B")
        source.add_edge("ab", "RUNS_ON", "a", "b")

        clone = GraphStore.from_snapshot("inst-b", source.snapshot())

        assert clone.shortest_path("a", "b") == ["a", "b"]
        assert "b" in clone.bfs("a")


# -- Conflict resolution after sync --


class TestSyncConflictResolution:
    """Conflict semantics after sync: LWW for properties, add-wins for nodes."""

    def test_lww_concurrent_property_update(self):
        """LWW resolves concurrent property updates after sync."""
        alpha = make_store("inst-a")
        beta = make_store("inst-b")

        # Seed both peers with the same node via an initial sync round.
        alpha.add_node("s1", "entity", "Server 1")
        beta.merge_sync_payload(alpha.receive_sync_offer(beta.generate_sync_offer()))

        # Independent, concurrent updates to the same property.
        alpha.update_property("s1", "status", "alive")
        beta.update_property("s1", "status", "dead")

        # Full bidirectional sync: A → B, then B → A.
        beta.merge_sync_payload(alpha.receive_sync_offer(beta.generate_sync_offer()))
        alpha.merge_sync_payload(beta.receive_sync_offer(alpha.generate_sync_offer()))

        # LWW: both peers must agree on a single winner, whichever it is.
        winner_on_alpha = alpha.get_node("s1")["properties"]["status"]
        winner_on_beta = beta.get_node("s1")["properties"]["status"]
        assert winner_on_alpha == winner_on_beta

    def test_add_wins_after_sync(self):
        """Add-wins: concurrent add + remove → node exists after sync."""
        alpha = make_store("inst-a")
        beta = make_store("inst-b")

        alpha.add_node("s1", "entity", "Server 1")

        # Propagate the node to beta before the conflicting operations.
        beta.merge_sync_payload(alpha.receive_sync_offer(beta.generate_sync_offer()))

        # Conflicting concurrent pair: alpha removes, beta re-adds.
        alpha.remove_node("s1")
        beta.add_node("s1", "entity", "Server 1 resurrected")

        # Bidirectional sync: B → A first, then A → B.
        alpha.merge_sync_payload(beta.receive_sync_offer(alpha.generate_sync_offer()))
        beta.merge_sync_payload(alpha.receive_sync_offer(beta.generate_sync_offer()))

        # Add-wins semantics: the node survives on both peers.
        assert alpha.get_node("s1") is not None
        assert beta.get_node("s1") is not None


# -- Edge validation during sync --


class TestEdgeValidationOnSync:
    """Verify that edge source/target type constraints are enforced during sync,
    even when entries arrive from a peer with a different ontology.

    Ontologies here are serialized with ``json.dumps`` for consistency with
    the rest of this module, which always hands GraphStore JSON text
    (see module-level ONTOLOGY).
    """

    def test_invalid_edge_quarantined_on_sync(self):
        """An edge with wrong source/target types is quarantined on the
        receiving peer, not silently accepted."""
        # Peer A: permissive ontology — server/app allowed at either end,
        # so a server -> app RUNS_ON edge is legal on A.
        permissive = json.dumps({
            "node_types": {
                "server": {"properties": {}},
                "app": {"properties": {}},
            },
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["server", "app"],
                    "target_types": ["server", "app"],
                },
            },
        })
        a = GraphStore("peer-a", permissive)
        a.add_node("s1", "server", "Server")
        a.add_node("a1", "app", "App")
        a.add_edge("bad", "RUNS_ON", "s1", "a1")  # server -> app

        # Peer B: strict ontology — only app -> server
        strict = json.dumps({
            "node_types": {
                "server": {"properties": {}},
                "app": {"properties": {}},
            },
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        })
        b = GraphStore("peer-b", strict)

        # Sync A → B
        offer = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer)
        b.merge_sync_payload(payload)

        # Edge should be quarantined on B (wrong direction for B's ontology)
        assert b.get_edge("bad") is None, "invalid edge should not be queryable"
        assert len(b.get_quarantined()) > 0, "invalid edge should be quarantined"
        # Nodes should be present (they're valid)
        assert b.get_node("s1") is not None
        assert b.get_node("a1") is not None

    def test_valid_edge_survives_sync(self):
        """A valid edge is materialized correctly after sync."""
        ont = json.dumps({
            "node_types": {"server": {"properties": {}}, "app": {"properties": {}}},
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        })
        a = GraphStore("peer-a", ont)
        a.add_node("s1", "server", "Server")
        a.add_node("a1", "app", "App")
        a.add_edge("e1", "RUNS_ON", "a1", "s1")  # app -> server (valid)

        b = GraphStore("peer-b", ont)
        offer = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer)
        b.merge_sync_payload(payload)

        assert b.get_edge("e1") is not None
        assert b.get_edge("e1")["source_id"] == "a1"
        assert b.get_edge("e1")["target_id"] == "s1"
        assert len(b.get_quarantined()) == 0

    def test_nodes_before_edges_in_topological_order(self):
        """Topological ordering guarantees nodes are materialized before
        their edges during sync. This test verifies the ordering by
        checking that edges are validated against existing node types."""
        ont = json.dumps({
            "node_types": {"server": {"properties": {}}, "app": {"properties": {}}},
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        })
        # Build a graph with many nodes and edges
        a = GraphStore("peer-a", ont)
        for i in range(50):
            a.add_node(f"s-{i}", "server", f"Server {i}")
            a.add_node(f"a-{i}", "app", f"App {i}")
        for i in range(50):
            a.add_edge(f"e-{i}", "RUNS_ON", f"a-{i}", f"s-{i}")

        # Sync to empty peer
        b = GraphStore("peer-b", ont)
        offer = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer)
        b.merge_sync_payload(payload)

        # All 50 edges should be present (none quarantined)
        assert len(b.get_quarantined()) == 0
        for i in range(50):
            edge = b.get_edge(f"e-{i}")
            assert edge is not None, f"edge e-{i} missing after sync"
            assert edge["source_id"] == f"a-{i}"
            assert edge["target_id"] == f"s-{i}"


# -- HLC tie-breaking --


class TestHLCTieBreaking:
    """Verify that HLC tie-breaking is deterministic and documented."""

    def test_lower_instance_id_wins_tie(self):
        """When two peers write the same property at the same logical time,
        the peer with the lexicographically lower instance_id wins."""
        # Serialized with json.dumps for consistency with the rest of the
        # module — GraphStore is handed ontology JSON text everywhere else.
        ont = json.dumps({
            "node_types": {"entity": {"properties": {"value": {"value_type": "string"}}}},
            "edge_types": {},
        })
        # Create shared base
        base = GraphStore("base", ont)
        base.add_node("n1", "entity", "Node", {"value": "original"})

        # Fork to two peers with known instance IDs
        a = GraphStore.from_snapshot("aaa-peer", base.snapshot())
        b = GraphStore.from_snapshot("zzz-peer", base.snapshot())

        # Both update the same property (concurrent)
        a.update_property("n1", "value", "from-aaa")
        b.update_property("n1", "value", "from-zzz")

        # Sync bidirectionally
        offer = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer)
        b.merge_sync_payload(payload)

        offer = a.generate_sync_offer()
        payload = b.receive_sync_offer(offer)
        a.merge_sync_payload(payload)

        # Both must agree
        val_a = a.get_node("n1")["properties"]["value"]
        val_b = b.get_node("n1")["properties"]["value"]
        assert val_a == val_b, f"peers diverged: a={val_a}, b={val_b}"

        # The winner is deterministic — lower instance_id wins ties.
        # If clocks are equal, "aaa-peer" < "zzz-peer", so aaa wins.
        # But clocks may not be exactly equal (HLC advances), so we
        # only assert convergence, not which peer won.
        # The point: both agree, deterministically.


# -- Multi-subtype sync (SA-033 investigation) --


class TestMultiSubtypeSync:
    """Verify that ALL subtypes transfer during sync, not just some.

    Reproduces the partial sync bug observed in production:
    Entity(instance) nodes synced between peers but Entity(capability),
    Entity(k8s_cluster), and Rule(guardrail) nodes did not.
    See shelob/docs/silk-sync-investigation.md.
    """

    # Multi-subtype ontology: entity/signal/rule node families, each with
    # nested subtypes carrying their own property schemas.
    RICH_ONTOLOGY = json.dumps({
        "node_types": {
            "entity": {
                "properties": {},
                "subtypes": {
                    "instance": {"properties": {
                        "host": {"value_type": "string"},
                        "priority": {"value_type": "int"},
                    }},
                    "capability": {"properties": {
                        "name": {"value_type": "string"},
                        "role": {"value_type": "string"},
                        "status": {"value_type": "string"},
                    }},
                    "k8s_cluster": {"properties": {
                        "name": {"value_type": "string"},
                        "server_url": {"value_type": "string"},
                    }},
                },
            },
            "signal": {
                "properties": {},
                "subtypes": {
                    "alert": {"properties": {
                        "severity": {"value_type": "string"},
                    }},
                },
            },
            "rule": {
                "properties": {},
                "subtypes": {
                    "guardrail": {"properties": {
                        "scope": {"value_type": "string"},
                        "check_type": {"value_type": "string"},
                    }},
                },
            },
        },
        "edge_types": {
            "RUNS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
            "DEPENDS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
        },
    })

    def _make(self, instance_id: str) -> GraphStore:
        """Build a store for *instance_id* over the multi-subtype ontology."""
        return GraphStore(instance_id, self.RICH_ONTOLOGY)

    def test_all_subtypes_sync_in_one_round(self):
        """A has instance + capability + k8s_cluster + guardrail.
        After one sync round, B has all of them."""
        a = self._make("leader")
        b = self._make("joiner")

        # Leader: rich KG with multiple subtypes
        a.add_node("inst-a", "entity", "leader", {"host": "10.0.0.1", "priority": 100}, subtype="instance")
        a.add_node("cap-runtime", "entity", "K3s", {"name": "K3s", "role": "container_runtime", "status": "installed"}, subtype="capability")
        a.add_node("cap-gw", "entity", "nginx", {"name": "nginx", "role": "gateway", "status": "installed"}, subtype="capability")
        a.add_node("cluster-k3s", "entity", "k3s", {"name": "k3s", "server_url": "https://10.0.0.1:6443"}, subtype="k8s_cluster")
        a.add_node("guard-1", "rule", "self-model", {"scope": "update", "check_type": "pre_flight"}, subtype="guardrail")
        a.add_edge("cap-runtime-RUNS_ON-inst-a", "RUNS_ON", "cap-runtime", "inst-a")
        a.add_edge("cap-gw-DEPENDS_ON-cap-runtime", "DEPENDS_ON", "cap-gw", "cap-runtime")

        # Joiner: minimal KG
        b.add_node("inst-b", "entity", "joiner", {"host": "10.0.0.2", "priority": 50}, subtype="instance")

        # Sync: B offers → A responds → B merges
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)

        # Reverse: A offers → B responds → A merges
        offer_a = a.generate_sync_offer()
        payload2 = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload2)

        # B must have ALL nodes from A
        assert b.get_node("inst-a") is not None, "instance node not synced"
        assert b.get_node("cap-runtime") is not None, "capability node not synced"
        assert b.get_node("cap-gw") is not None, "capability node not synced"
        assert b.get_node("cluster-k3s") is not None, "k8s_cluster node not synced"
        assert b.get_node("guard-1") is not None, "guardrail node not synced"

        # Edges must sync too
        assert b.get_edge("cap-runtime-RUNS_ON-inst-a") is not None, "RUNS_ON edge not synced"
        assert b.get_edge("cap-gw-DEPENDS_ON-cap-runtime") is not None, "DEPENDS_ON edge not synced"

        # A must have B's node
        assert a.get_node("inst-b") is not None, "reverse sync failed"

    def test_subtype_properties_preserved_after_sync(self):
        """Synced nodes retain their subtype and property values."""
        a = self._make("alpha")
        b = self._make("beta")

        a.add_node("cap-1", "entity", "Docker", {"name": "Docker", "role": "container_runtime", "status": "installed"}, subtype="capability")
        a.add_node("guard-2", "rule", "disk check", {"scope": "deploy", "check_type": "post_flight"}, subtype="guardrail")

        # One-way sync A → B.
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)

        cap = b.get_node("cap-1")
        assert cap is not None
        assert cap["subtype"] == "capability"
        assert cap["properties"]["name"] == "Docker"
        assert cap["properties"]["role"] == "container_runtime"
        assert cap["properties"]["status"] == "installed"

        guard = b.get_node("guard-2")
        assert guard is not None
        assert guard["subtype"] == "guardrail"
        assert guard["properties"]["scope"] == "deploy"

    def test_many_nodes_all_subtypes_converge(self):
        """Stress: 50 nodes across 5 subtypes all converge after sync."""
        a = self._make("source")
        b = self._make("dest")

        # 10 nodes of each of the 5 subtypes = 50 nodes total.
        for i in range(10):
            a.add_node(f"inst-{i}", "entity", f"inst-{i}", {"host": f"10.0.0.{i}", "priority": i}, subtype="instance")
            a.add_node(f"cap-{i}", "entity", f"cap-{i}", {"name": f"cap-{i}", "role": "test", "status": "installed"}, subtype="capability")
            a.add_node(f"cluster-{i}", "entity", f"cluster-{i}", {"name": f"c-{i}", "server_url": f"https://10.0.0.{i}:6443"}, subtype="k8s_cluster")
            a.add_node(f"alert-{i}", "signal", f"alert-{i}", {"severity": "warning"}, subtype="alert")
            a.add_node(f"guard-{i}", "rule", f"guard-{i}", {"scope": "update", "check_type": "pre_flight"}, subtype="guardrail")

        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        merged = b.merge_sync_payload(payload)

        assert merged > 0
        for i in range(10):
            assert b.get_node(f"inst-{i}") is not None, f"inst-{i} missing"
            assert b.get_node(f"cap-{i}") is not None, f"cap-{i} missing"
            assert b.get_node(f"cluster-{i}") is not None, f"cluster-{i} missing"
            assert b.get_node(f"alert-{i}") is not None, f"alert-{i} missing"
            assert b.get_node(f"guard-{i}") is not None, f"guard-{i} missing"


# -- Genesis divergence (SA-033 root cause investigation) --


class TestGenesisDivergence:
    """Test sync behavior when two stores have different genesis entries.

    In production, two Shelob instances created with the same ontology but
    different instance_ids produce different genesis hashes (the author field
    is the instance_id). This means their DAGs have no common ancestor.

    Silk must either:
    a) Converge despite divergent genesis (cross-DAG merge), or
    b) Reject the sync with a clear error (incompatible stores).

    Silently dropping entries is not acceptable.
    """

    SIMPLE_ONT = json.dumps({
        "node_types": {
            "entity": {"properties": {"status": {"value_type": "string"}}},
        },
        "edge_types": {},
    })

    def test_different_instance_ids_produce_different_genesis(self):
        """Two stores with same ontology but different instance_ids
        have different genesis hashes (because author differs)."""
        a = GraphStore("inst-a", self.SIMPLE_ONT)
        b = GraphStore("inst-b", self.SIMPLE_ONT)

        # Introspect: entries_since(None) returns the full entry log.
        entries_a = a.entries_since(None)
        entries_b = b.entries_since(None)

        # Both have exactly 1 entry (genesis)
        assert len(entries_a) == 1
        assert len(entries_b) == 1

        # Capture the genesis hashes for diagnostic purposes. Whether they
        # match or differ determines if the two DAGs share a root: a shared
        # genesis gives sync a common ancestor, divergent genesis is the
        # suspected SA-033 root cause. Either outcome is legal here — the
        # convergence tests below assert the behavior that actually matters,
        # so no assertion is made on the hashes themselves.
        hash_a = entries_a[0]["hash"]
        hash_b = entries_b[0]["hash"]
        _ = (hash_a, hash_b)

    def test_sync_between_independent_stores_transfers_data(self):
        """Two independently-created stores must converge after sync.

        This is the production scenario: gamma and delta are created
        separately, each from their own seed. They must still sync.
        """
        a = GraphStore("inst-a", self.SIMPLE_ONT)
        b = GraphStore("inst-b", self.SIMPLE_ONT)

        a.add_node("node-a", "entity", "from A", {"status": "active"})
        b.add_node("node-b", "entity", "from B", {"status": "active"})

        # Sync A → B (B sends offer, A responds with payload, B merges)
        offer_b = b.generate_sync_offer()
        payload_a_to_b = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload_a_to_b)

        # Sync B → A
        offer_a = a.generate_sync_offer()
        payload_b_to_a = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload_b_to_a)

        # Both must have both nodes
        assert b.get_node("node-a") is not None, "A's node not synced to B"
        assert a.get_node("node-b") is not None, "B's node not synced to A"

    def test_sync_independent_stores_multiple_nodes(self):
        """Independent stores with many nodes across types must converge."""
        ont = json.dumps({
            "node_types": {
                "entity": {"properties": {}, "subtypes": {
                    "server": {"properties": {"host": {"value_type": "string"}}},
                    "service": {"properties": {"name": {"value_type": "string"}}},
                }},
                "rule": {"properties": {}, "subtypes": {
                    "guardrail": {"properties": {"scope": {"value_type": "string"}}},
                }},
            },
            "edge_types": {
                "RUNS_ON": {"source_types": ["entity"], "target_types": ["entity"], "properties": {}},
            },
        })

        a = GraphStore("gamma", ont)
        b = GraphStore("delta", ont)

        # Gamma: full infrastructure KG
        a.add_node("srv-1", "entity", "server-1", {"host": "10.0.0.1"}, subtype="server")
        a.add_node("svc-api", "entity", "api", {"name": "api"}, subtype="service")
        a.add_node("guard-1", "rule", "disk-check", {"scope": "deploy"}, subtype="guardrail")
        a.add_edge("svc-api-RUNS_ON-srv-1", "RUNS_ON", "svc-api", "srv-1")

        # Delta: only its own identity
        b.add_node("srv-2", "entity", "server-2", {"host": "10.0.0.2"}, subtype="server")

        # Bidirectional sync
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)

        offer_a = a.generate_sync_offer()
        payload2 = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload2)

        # B must have gamma's nodes
        assert b.get_node("srv-1") is not None, "server not synced"
        assert b.get_node("svc-api") is not None, "service not synced"
        assert b.get_node("guard-1") is not None, "guardrail not synced"
        assert b.get_edge("svc-api-RUNS_ON-srv-1") is not None, "edge not synced"

        # A must have delta's node
        assert a.get_node("srv-2") is not None, "reverse sync failed"


# -- outgoing_edges after sync (SA-033 edge index bug) --


class TestOutgoingEdgesAfterSync:
    """Reproduce the production bug: outgoing_edges() returns [] after sync
    even though all_edges() shows the edge exists.

    See shelob/docs/silk-outgoing-edges-bug.md.
    """

    # Shared ontology for every test in this class: one "entity" node type
    # with instance/server/capability subtypes, plus two entity->entity edge
    # types (RUNS_ON, MEMBER_OF). Serialized once at class-definition time so
    # each GraphStore built from it receives the identical ontology string.
    # NOTE: do not reorder keys — the serialized form is what the stores see.
    FLEET_ONTOLOGY = json.dumps({
        "node_types": {
            "entity": {
                "properties": {},
                "subtypes": {
                    "instance": {"properties": {
                        "host": {"value_type": "string"},
                        "priority": {"value_type": "int"},
                        "status": {"value_type": "string"},
                    }},
                    "server": {"properties": {
                        "name": {"value_type": "string"},
                        "ip_v4": {"value_type": "string"},
                    }},
                    "capability": {"properties": {
                        "name": {"value_type": "string"},
                        "role": {"value_type": "string"},
                        "status": {"value_type": "string"},
                    }},
                },
            },
        },
        "edge_types": {
            "RUNS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
            "MEMBER_OF": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
        },
    })

    def _make(self, instance_id: str) -> GraphStore:
        """Build a fresh in-memory GraphStore using the shared fleet ontology."""
        store = GraphStore(instance_id, self.FLEET_ONTOLOGY)
        return store

    def test_outgoing_edges_after_sync_production_scenario(self):
        """Exact production scenario: two instances with different seeds.

        Each store seeds its own instance + server + RUNS_ON edge; after a
        bidirectional sync, each side must find the other's RUNS_ON edge via
        outgoing_edges().
        """
        gamma = self._make("gamma")
        delta = self._make("delta")

        # Seed each store with its own instance/server pair and a RUNS_ON edge
        # (gamma first, then delta — same order as production bring-up).
        seeds = (
            (gamma, "gamma", "5.78.44.251", 100),
            (delta, "delta", "5.78.81.60", 50),
        )
        for store, tag, ip, prio in seeds:
            store.add_node(
                f"inst-{tag}", "entity", tag,
                {"host": ip, "priority": prio, "status": "active"},
                subtype="instance",
            )
            store.add_node(
                f"server-{tag}", "entity", f"{tag}-srv",
                {"name": tag, "ip_v4": ip},
                subtype="server",
            )
            store.add_edge(
                f"inst-{tag}-RUNS_ON-server-{tag}", "RUNS_ON",
                f"inst-{tag}", f"server-{tag}",
            )

        # Bidirectional sync: gamma's state into delta, then delta's into gamma.
        delta.merge_sync_payload(gamma.receive_sync_offer(delta.generate_sync_offer()))
        gamma.merge_sync_payload(delta.receive_sync_offer(gamma.generate_sync_offer()))

        # THE BUG: outgoing_edges must find the synced RUNS_ON edge
        delta_edges = delta.outgoing_edges("inst-gamma")
        assert len(delta_edges) > 0, (
            "outgoing_edges('inst-gamma') returned [] on delta after sync. "
            "all_edges shows: " + str([(e["edge_id"], e["source_id"], e["target_id"])
                                       for e in delta.all_edges()])
        )
        assert delta_edges[0]["edge_type"] == "RUNS_ON"
        assert delta_edges[0]["target_id"] == "server-gamma"

        # Reverse direction: gamma must find delta's edge
        gamma_edges = gamma.outgoing_edges("inst-delta")
        assert len(gamma_edges) > 0, "outgoing_edges('inst-delta') returned [] on gamma"
        assert gamma_edges[0]["target_id"] == "server-delta"

    def test_outgoing_edges_after_sync_with_capabilities(self):
        """Multiple edge types: RUNS_ON + capability edges all findable."""
        gamma = self._make("gamma")
        delta = self._make("delta")

        # Gamma holds the full picture: instance, server, two capabilities,
        # and a RUNS_ON edge from each of the three sources to the server.
        gamma.add_node("inst-gamma", "entity", "gamma",
                       {"host": "10.0.0.1", "priority": 100, "status": "active"},
                       subtype="instance")
        gamma.add_node("server-gamma", "entity", "srv",
                       {"name": "gamma", "ip_v4": "10.0.0.1"},
                       subtype="server")
        for cap_id, cap_name, cap_role in (
            ("cap-k3s", "K3s", "container_runtime"),
            ("cap-nginx", "nginx", "gateway"),
        ):
            gamma.add_node(cap_id, "entity", cap_name,
                           {"name": cap_name, "role": cap_role, "status": "installed"},
                           subtype="capability")
        for src in ("inst-gamma", "cap-k3s", "cap-nginx"):
            gamma.add_edge(f"{src}-RUNS_ON-server-gamma", "RUNS_ON",
                           src, "server-gamma")

        # Delta starts with nothing but its own identity node.
        delta.add_node("inst-delta", "entity", "delta",
                       {"host": "10.0.0.2", "priority": 50, "status": "active"},
                       subtype="instance")

        # One-way sync: gamma's state into delta.
        delta.merge_sync_payload(gamma.receive_sync_offer(delta.generate_sync_offer()))

        # Every synced source node must expose its RUNS_ON via outgoing_edges.
        inst_edges = delta.outgoing_edges("inst-gamma")
        assert len(inst_edges) == 1, f"expected 1 RUNS_ON from inst-gamma, got {len(inst_edges)}"

        k3s_edges = delta.outgoing_edges("cap-k3s")
        assert len(k3s_edges) == 1, f"expected 1 RUNS_ON from cap-k3s, got {len(k3s_edges)}"

        nginx_edges = delta.outgoing_edges("cap-nginx")
        assert len(nginx_edges) == 1, f"expected 1 RUNS_ON from cap-nginx, got {len(nginx_edges)}"

        # And the reverse index must see all three edges landing on the server.
        incoming = delta.incoming_edges("server-gamma")
        assert len(incoming) == 3, f"expected 3 incoming on server-gamma, got {len(incoming)}"

    def test_outgoing_edges_survives_tombstone_resurrection(self):
        """Edge removed then re-added via sync — index must reflect resurrection."""
        a = self._make("store-a")
        b = self._make("store-b")

        a.add_node("n1", "entity", "n1",
                   {"host": "x", "priority": 1, "status": "active"}, subtype="instance")
        a.add_node("n2", "entity", "n2",
                   {"name": "s", "ip_v4": "x"}, subtype="server")
        # Create, tombstone, then resurrect the same edge id on store a.
        a.add_edge("e1", "RUNS_ON", "n1", "n2")
        a.remove_edge("e1")
        a.add_edge("e1", "RUNS_ON", "n1", "n2")

        # Push a's state into b.
        b.merge_sync_payload(a.receive_sync_offer(b.generate_sync_offer()))

        # b's edge index must show the resurrected edge, not the tombstone.
        edges = b.outgoing_edges("n1")
        assert len(edges) == 1, f"expected 1 edge after resurrection sync, got {len(edges)}"
        assert edges[0]["edge_id"] == "e1"

    def test_outgoing_edges_after_persistent_reload(self, tmp_path):
        """Edges findable via outgoing_edges after redb close + reopen."""
        db_path = str(tmp_path / "a.redb")

        store = GraphStore("inst-a", self.FLEET_ONTOLOGY, path=db_path)
        store.add_node("n1", "entity", "n1",
                       {"host": "x", "priority": 1, "status": "active"}, subtype="instance")
        store.add_node("n2", "entity", "n2",
                       {"name": "s", "ip_v4": "x"}, subtype="server")
        store.add_edge("e1", "RUNS_ON", "n1", "n2")

        # Sanity-check the index while the store is still open.
        assert len(store.outgoing_edges("n1")) == 1

        # Drop the handle so the backing file closes, then reopen from disk.
        del store
        reopened = GraphStore("inst-a", self.FLEET_ONTOLOGY, path=db_path)

        edges = reopened.outgoing_edges("n1")
        assert len(edges) == 1, f"outgoing_edges empty after redb reload, got {len(edges)}"
        assert edges[0]["edge_id"] == "e1"

    def test_outgoing_edges_after_sync_then_reload(self, tmp_path):
        """The full production scenario: sync + redb restart.

        An in-memory source store creates nodes + edges; a persistent sink
        receives them via sync, closes, and reopens. outgoing_edges must work
        on the reopened store for the synced edges.
        """
        path_b = str(tmp_path / "b.redb")

        source = self._make("gamma")
        source.add_node("inst-gamma", "entity", "gamma",
                        {"host": "10.0.0.1", "priority": 100, "status": "active"},
                        subtype="instance")
        source.add_node("server-gamma", "entity", "srv",
                        {"name": "gamma", "ip_v4": "10.0.0.1"},
                        subtype="server")
        source.add_edge("inst-gamma-RUNS_ON-server-gamma", "RUNS_ON",
                        "inst-gamma", "server-gamma")

        sink = GraphStore("delta", self.FLEET_ONTOLOGY, path=path_b)
        sink.add_node("inst-delta", "entity", "delta",
                      {"host": "10.0.0.2", "priority": 50, "status": "active"},
                      subtype="instance")

        # One-way sync: source's state into the persistent sink.
        sink.merge_sync_payload(source.receive_sync_offer(sink.generate_sync_offer()))

        # The index must work while the sink is still open ...
        assert len(sink.outgoing_edges("inst-gamma")) == 1, "outgoing_edges failed before reload"

        # ... and survive a close + reopen (simulates a systemd restart).
        del sink
        reloaded = GraphStore("delta", self.FLEET_ONTOLOGY, path=path_b)

        edges = reloaded.outgoing_edges("inst-gamma")
        assert len(edges) == 1, (
            f"outgoing_edges('inst-gamma') empty after sync + redb reload. "
            f"all_edges: {[(e['edge_id'], e['source_id'], e['target_id']) for e in reloaded.all_edges()]}"
        )
        assert edges[0]["edge_type"] == "RUNS_ON"
        assert edges[0]["target_id"] == "server-gamma"