lex-bytecode 0.9.12

Bytecode compiler + VM for Lex.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148
3149
3150
3151
3152
3153
3154
3155
3156
3157
3158
3159
3160
3161
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172
3173
3174
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185
3186
3187
3188
3189
3190
3191
3192
3193
3194
3195
3196
3197
3198
3199
3200
3201
3202
3203
3204
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
//! M5: bytecode VM. Stack machine with effect dispatch through a host handler.

use crate::op::*;
use crate::program::*;
use crate::value::{ActorCell, Value};
use std::sync::{Arc, Mutex, OnceLock};
use indexmap::IndexMap;
use smol_str::SmolStr;
use std::collections::{HashMap, VecDeque};

// ── IC polymorphism instrumentation (throwaway, env-gated) ─────────
// Enable with LEX_IC_STATS=1. With LEX_IC_STATS_OUT=<path> writes a
// TSV to <path>.<pid> on each Vm drop; otherwise dumps to stderr.

#[derive(Default)]
struct IcStats {
    sites: HashMap<(u32, u32), HashMap<u32, u64>>,
}

static IC_STATS: OnceLock<Mutex<IcStats>> = OnceLock::new();
static IC_STATS_ENABLED: OnceLock<bool> = OnceLock::new();

fn ic_stats_enabled() -> bool {
    *IC_STATS_ENABLED.get_or_init(|| {
        std::env::var("LEX_IC_STATS").map(|v| v == "1").unwrap_or(false)
    })
}

fn record_ic_hit(fn_id: u32, site_idx: u32, shape_id: u32) {
    let stats = IC_STATS.get_or_init(|| Mutex::new(IcStats::default()));
    let mut s = stats.lock().unwrap();
    *s.sites.entry((fn_id, site_idx)).or_default().entry(shape_id).or_insert(0) += 1;
}

pub fn dump_ic_stats() {
    let Some(stats) = IC_STATS.get() else { return; };
    let s = stats.lock().unwrap();
    if s.sites.is_empty() { return; }
    let mut out = String::from("fn_id\tsite_idx\tshape_id\thits\n");
    let mut entries: Vec<_> = s.sites.iter().collect();
    entries.sort_by_key(|((f, si), _)| (*f, *si));
    for ((f, site), shapes) in entries {
        let mut shape_entries: Vec<_> = shapes.iter().collect();
        shape_entries.sort_by_key(|(sid, _)| **sid);
        for (sid, hits) in shape_entries {
            out.push_str(&format!("{f}\t{site}\t{sid}\t{hits}\n"));
        }
    }
    match std::env::var("LEX_IC_STATS_OUT").ok() {
        Some(path) => {
            let pid = std::process::id();
            let _ = std::fs::write(format!("{path}.{pid}"), out);
        }
        None => { eprint!("{out}"); }
    }
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum VmError {
    #[error("runtime panic: {0}")]
    Panic(String),
    #[error("type mismatch at runtime: {0}")]
    TypeMismatch(String),
    #[error("stack underflow")]
    StackUnderflow,
    #[error("unknown function: {0}")]
    UnknownFunction(String),
    #[error("effect handler error: {0}")]
    Effect(String),
    #[error("call stack overflow: recursion depth exceeded ({0})")]
    CallStackOverflow(u32),
    /// Refinement predicate failed at a call boundary (#209 slice 3).
    /// Surfaced when a function declares `param :: Type{x | predicate}`,
    /// the call-site arg couldn't be discharged statically (slice 2),
    /// and the runtime evaluator finds the predicate is `false` for
    /// the actual argument value. The `verdict` mirrors the shape of
    /// `gate.verdict`-style records in `lex-trace`.
    #[error("refinement violated: argument {param_index} of `{fn_name}` (binding `{binding}`): {reason}")]
    RefinementFailed {
        fn_name: String,
        param_index: usize,
        binding: String,
        reason: String,
    },
}

/// Maximum simultaneous call frames. Defends against unbounded
/// recursion in agent-emitted code: a body that calls itself
/// without a base case would otherwise blow the host's native
/// stack and crash the process. Real Lex code rarely exceeds
/// ~30 frames; 1024 is generous headroom while still well under
/// the OS stack limit at any per-frame size we use.
pub const MAX_CALL_DEPTH: u32 = 1024;

/// Per-frame stack-record budget (#464 step 2). Counts the number of
/// `Value` slots a frame may consume from `Vm::stack_record_arena`
/// before further `Op::AllocStackRecord` requests fall back to the
/// heap path. 64 slots at the current `size_of::<Value>() = 64B`
/// gives ~4 KiB per frame, matching the design-doc proposal in
/// `docs/design/escape-analysis.md`. A handler-shaped function
/// (one outer record of ≤8 fields, plus a handful of small inner
/// records) fits well inside this without growing.
pub const STACK_RECORD_BUDGET_SLOTS: u32 = 64;

/// Adaptive-memoization warmup window (#229 adaptive). A pure
/// function is given this many cache-probing calls to demonstrate a
/// hit; if it reaches the window with zero hits, memoization is
/// disabled for it (its calls stop hashing args). A function that
/// genuinely benefits — e.g. naive recursive `fib`, where each call
/// immediately reuses sub-results — accumulates hits well before the
/// window closes and stays enabled. 64 balances "give real reuse a
/// chance" against "don't pay the hash forever on always-miss code".
const MEMO_WARMUP_CALLS: u32 = 64;

/// Per-function adaptive-memoization state (#229 adaptive). `enabled`
/// starts true; once a function reaches `MEMO_WARMUP_CALLS` cache
/// probes with `hits == 0`, it flips to false and that function's
/// calls skip the args hash entirely for the rest of the Vm's life.
#[derive(Clone, Copy)]
struct MemoFnState {
    calls: u32,
    hits: u32,
    enabled: bool,
}

impl Default for MemoFnState {
    fn default() -> Self {
        MemoFnState { calls: 0, hits: 0, enabled: true }
    }
}

/// Host-side effect dispatch. Implementors decide what `kind`/`op` mean
/// and how arguments map to side effects.
pub trait EffectHandler {
    fn dispatch(&mut self, kind: &str, op: &str, args: Vec<Value>) -> Result<Value, String>;

    /// Hook called by the VM at every function call so handlers can
    /// enforce per-call budget consumption (#225). The argument is
    /// the sum of `[budget(N)]` declared on the callee's signature;
    /// the handler returns `Err` to refuse the call (the VM converts
    /// to `VmError::Effect`). Default impl is a no-op so legacy
    /// handlers and pure-only runs are unaffected.
    fn note_call_budget(&mut self, _budget_cost: u64) -> Result<(), String> {
        Ok(())
    }

    /// Enter a per-request allocation scope (#463 scaffolding).
    /// Called by the runtime layer (e.g. `net.serve_fn`'s request
    /// loop) immediately before invoking the user handler closure
    /// for one request. Implementations push a fresh arena onto
    /// their internal stack and return its identifier; the matching
    /// `exit_request_scope` call drops it.
    ///
    /// Default impl is a no-op — handlers without arena support
    /// return a sentinel scope id which they ignore on exit.
    /// `DefaultHandler` in `lex-runtime` provides the real
    /// implementation.
    ///
    /// Today the VM does NOT route any `Value` allocations through
    /// the returned arena — see the scaffolding notes in
    /// `crates/lex-runtime/src/arena.rs`. The hook exists so the
    /// follow-on slice that adds Value-rep arena routing has a
    /// stable trait surface to extend.
    fn enter_request_scope(&mut self) -> u64 { 0 }

    /// Exit a per-request allocation scope opened by
    /// `enter_request_scope`. Implementations drop the arena
    /// associated with `scope_id`. Calling exit with a scope_id
    /// that wasn't returned by a prior enter is implementation-
    /// defined behavior — DefaultHandler treats it as a no-op so
    /// mismatched pairs don't panic.
    fn exit_request_scope(&mut self, _scope_id: u64) {}

    /// `list.par_map` worker-handler factory (#305 slice 2).
    ///
    /// Each parallel worker thread runs its own `Vm` and therefore
    /// needs its own effect handler. The parent handler may opt in
    /// to per-worker dispatch by returning `Some(handler)` here;
    /// returning `None` (the default) keeps slice-1 behavior: the
    /// worker runs `DenyAllEffects` and any effect call inside the
    /// closure fails with `VmError::Effect`.
    ///
    /// The returned handler must be `Send` so the worker can take
    /// ownership across a thread boundary. Shared state (budget
    /// pool, chat registry, etc.) is wired up by the implementer.
    /// Per-worker independence (MCP client cache, output sink)
    /// is intentional — the alternative is mutex-serialization of
    /// the whole effect dispatch, which would defeat the parallelism.
    fn spawn_for_worker(&self) -> Option<Box<dyn EffectHandler + Send>> {
        None
    }
}

/// `Vm` exposes itself as a `ClosureCaller` so the parser interpreter
/// can invoke user-supplied closures during a `parser.run` walk
/// (#221). The Vm is reentrant for closure invocation: pushing a new
/// frame onto an active call stack is supported, and the handler
/// stays in place so any effects the closure body fires dispatch
/// normally.
impl<'a> crate::parser_runtime::ClosureCaller for Vm<'a> {
    fn call_closure(&mut self, closure: Value, args: Vec<Value>) -> Result<Value, String> {
        self.invoke_closure_value(closure, args)
            .map_err(|e| format!("{e:?}"))
    }
}

/// A handler that fails any effect call. Useful as a default for pure-only runs.
pub struct DenyAllEffects;
impl EffectHandler for DenyAllEffects {
    fn dispatch(&mut self, kind: &str, op: &str, _args: Vec<Value>) -> Result<Value, String> {
        Err(format!("effects not permitted (attempted {kind}.{op})"))
    }
}

/// Trace receiver. Implementors record the call/effect tree and may
/// substitute effect responses (for replay).
pub trait Tracer {
    fn enter_call(&mut self, node_id: &str, name: &str, args: &[Value]);
    fn enter_effect(&mut self, node_id: &str, kind: &str, op: &str, args: &[Value]);
    fn exit_ok(&mut self, value: &Value);
    fn exit_err(&mut self, message: &str);
    /// Tail-call optimization: pop the current frame's open call without
    /// re-entering the parent (the new call takes its place).
    fn exit_call_tail(&mut self);
    /// During replay, return Some(v) to substitute an effect's output.
    fn override_effect(&mut self, _node_id: &str) -> Option<Value> { None }
}

/// No-op tracer for normal execution.
pub struct NullTracer;
impl Tracer for NullTracer {
    fn enter_call(&mut self, _: &str, _: &str, _: &[Value]) {}
    fn enter_effect(&mut self, _: &str, _: &str, _: &str, _: &[Value]) {}
    fn exit_ok(&mut self, _: &Value) {}
    fn exit_err(&mut self, _: &str) {}
    fn exit_call_tail(&mut self) {}
}

#[derive(Debug, Clone)]
pub(crate) enum FrameKind {
    /// Top-level entry frame; doesn't correspond to a Call opcode.
    Entry,
    /// Frame opened by Call/TailCall. The `String` is the originating
    /// `NodeId`; useful for diagnostics even if currently unread.
    Call(#[allow(dead_code)] String),
}

pub struct Vm<'a> {
    program: &'a Program,
    handler: Box<dyn EffectHandler + 'a>,
    pub(crate) tracer: Box<dyn Tracer + 'a>,
    /// Per-call frames. Each frame has its own locals array and pc.
    frames: Vec<Frame>,
    stack: Vec<Value>,
    /// Soft cap to avoid runaway computations in tests.
    pub step_limit: u64,
    pub steps: u64,
    /// Per-Vm memoization cache for pure functions (#229). Keyed by
    /// `(fn_id, hash_call_args(args))` — a 128-bit structural digest
    /// of the arguments (see `hash_call_args`). Effectful functions
    /// never enter this map. The cache lives for the lifetime of one
    /// `Vm::call` chain — calling `Vm::with_handler` again starts a
    /// fresh cache.
    pure_memo: std::collections::HashMap<(u32, [u8; 16]), Value>,
    /// Diagnostic counters for `--trace` observability (#229).
    pub pure_memo_hits: u64,
    pub pure_memo_misses: u64,
    /// Number of effect-free calls that skipped the cache entirely
    /// because adaptive memoization disabled their function (#229
    /// adaptive). Observability only.
    pub pure_memo_skips: u64,
    /// Adaptive-memoization state, one entry per function (indexed by
    /// `fn_id`), parallel to `field_ics` (#229 adaptive). Memoization
    /// only pays when a function is called repeatedly with equal args;
    /// the unconditional `hash_call_args` on every effect-free call is
    /// pure overhead otherwise (the `response_build` profile: 0 hits /
    /// 3600 misses, ~12% of instructions). After a warmup window with
    /// zero hits we stop hashing that function's calls — always safe,
    /// since the callee is pure and recomputing yields the same value.
    /// Sticky for the Vm's lifetime: a function that hasn't hit in
    /// `MEMO_WARMUP_CALLS` calls won't amortize later.
    memo_fn_state: Vec<MemoFnState>,
    /// Monomorphic inline caches for `Op::GetField` (#462 slice 1 +
    /// shape-keyed verification slice). Indexed by
    /// `[fn_id as usize][site_idx as usize]` — one entry per
    /// field-access site within each function. `site_idx` is assigned
    /// at compile time by `FnCompiler::field_get_sites` so every emit
    /// produces a stable identifier independent of pc. The cache
    /// survives the planned dispatch rewrite (#461) and a future
    /// JIT (#465).
    ///
    /// Slot shape: `(shape_id, offset)`. The pre-shape-keyed slice
    /// stored only the offset and re-verified each hit by walking
    /// `IndexMap::get_index(off)` and string-comparing the field name
    /// against the requested `name_idx`. After this slice, hits
    /// against compile-time records (real `shape_id`) verify with a
    /// single `u32` compare and skip the string compare entirely —
    /// per the #462 slice-2b measurement that observed 0% polymorphism
    /// and 86% of hits going to records with a real shape_id.
    ///
    /// `NO_SHAPE_ID` records (JSON / SQL / HTTP-built — 14% of measured
    /// hits, 100% of inbox/gateway traffic) fall through to the
    /// pre-slice name-compare verification. Distinct dynamic shapes
    /// both carry `NO_SHAPE_ID` and would otherwise alias on a
    /// pure-shape-keyed IC; keeping the name compare on that path
    /// preserves correctness without a separate cache for them.
    ///
    /// Outer Vec is pre-sized to `program.functions.len()`; each inner
    /// Vec is empty until the first GetField in that function runs,
    /// at which point we one-shot allocate it to the compiler-recorded
    /// `field_ic_sites` size and never resize again. Lazy on the inner
    /// side so VMs created for short-lived scripts don't eagerly
    /// allocate IC slots for functions they never enter.
    field_ics: Vec<Vec<Option<(u32, usize)>>>,
    /// Stack allocator for function locals (#389 slice 3).
    ///
    /// Every function frame claims `locals_count` contiguous slots from
    /// this Vec on push and releases them on pop.  Because Lex uses
    /// strictly LIFO frame semantics the most-recently-pushed frame's
    /// slots always sit at the top of the Vec, so `truncate` is the
    /// correct (and O(1)) release operation.
    ///
    /// The Vec is pre-allocated once at VM construction and then grows
    /// only if the actual call depth × locals width exceeds the initial
    /// capacity.  After a top-level `vm.call` returns the Vec is empty
    /// again but its capacity is retained, so the next request incurs
    /// zero allocations for locals up to the high-water mark.
    locals_storage: Vec<Value>,
    /// Stack-record arena (#464 step 2). Each `Op::AllocStackRecord`
    /// at a non-escaping site appends its `field_count` field values
    /// here; the produced `Value::StackRecord` carries `slab_start =
    /// arena.len() - field_count` so reads are an O(1) slab index.
    /// On `Op::Return` the arena is truncated back to
    /// `frame.stack_record_arena_start`, releasing every record the
    /// frame allocated in O(1) — same lifetime story as
    /// `locals_storage` for frame locals.
    ///
    /// LIFO frame discipline guarantees a frame's records always sit
    /// at the top of the arena while the frame is live, so neither
    /// inter-frame interleaving nor index churn can occur.
    stack_record_arena: Vec<Value>,
    /// Per-Vm counters for #464 acceptance measurement. Incremented
    /// on every `Op::MakeRecord` / `Op::AllocStackRecord` dispatch.
    /// The bench reads these to compute the stack-allocation rate
    /// (≥ 60% of records on the stack is the acceptance bar). Cheap
    /// in the hot path — two unconditional u64 increments per record.
    pub stack_record_allocs: u64,
    pub stack_record_heap_fallbacks: u64,
    pub heap_record_allocs: u64,
    /// Request-scoped arena slab (#463 slice 2a). Mirrors the shape of
    /// `stack_record_arena` but lives across frames inside the
    /// request scope opened by `EffectHandler::enter_request_scope`.
    /// Each `Op::AllocArenaRecord` / `Op::AllocArenaTuple` appends its
    /// field values here and pushes a handle (`Value::ArenaRecord` /
    /// `Value::ArenaTuple`) whose `slab_start` indexes back in.
    /// Truncated to the saved start on `exit_request_scope`, releasing
    /// every value the scope built in O(1) — same lifetime story as
    /// `stack_record_arena` truncating on `Op::Return`.
    ///
    /// Slabs nest LIFO: `arena_scope_starts` holds the
    /// `arena_slab.len()` snapshot taken at each `enter_request_scope`,
    /// and `exit_request_scope` truncates back to the matching entry.
    /// An empty `arena_scope_starts` means **no active scope** — the
    /// alloc ops fall back to their `MakeRecord` / `MakeTuple` heap
    /// path, so the VM stays sound when arena-lowered bytecode runs in
    /// a non-handler context.
    arena_slab: Vec<Value>,
    /// LIFO stack of `arena_slab.len()` snapshots, one per active
    /// request scope. See `arena_slab`.
    arena_scope_starts: Vec<u32>,
    /// Counters for #463 slice-2b acceptance (will be the
    /// arena-allocation-rate gate, paralleling the #464 stack-rate
    /// counters above). Incremented in the op handlers; harmless in
    /// slice 2a since codegen doesn't emit the ops yet.
    pub arena_record_allocs: u64,
    pub arena_record_heap_fallbacks: u64,
    /// Optional JIT tier hook (#465 phase-1 integration). Consulted
    /// by the `Op::Call` dispatch arm after refinements + memo. See
    /// `crate::jit_hook` for the trait contract. `None` means
    /// "interpreter-only" — that branch in the dispatch arm folds
    /// to a single null-pointer check the optimizer can hoist.
    jit_hook: Option<Box<dyn crate::jit_hook::JitHook + 'a>>,
}

struct Frame {
    fn_id: u32,
    pc: usize,
    /// Start index of this frame's locals in `Vm::locals_storage` (#389
    /// slice 3). The frame owns `locals_storage[locals_start..locals_start
    /// + locals_len]`; `Op::Return` truncates the Vec back to
    /// `locals_start`, releasing the slots in O(1).
    locals_start: usize,
    locals_len: usize,
    /// Stack base when this frame started (for cleanup on return).
    stack_base: usize,
    trace_kind: FrameKind,
    /// Pure-fn memo key (#229). `Some(key)` if the call was eligible
    /// for memoization and missed the cache; on Op::Return the key
    /// is used to write the return value back into the cache.
    /// `None` means "don't memoize" — either the function isn't pure,
    /// the call wasn't through Op::Call, or memoization is disabled.
    memo_key: Option<(u32, [u8; 16])>,
    /// #464 step 2: start index of this frame's records in
    /// `Vm::stack_record_arena`. On `Op::Return`, the arena is
    /// truncated back here. Identical lifetime discipline to
    /// `locals_start`.
    stack_record_arena_start: usize,
    /// Remaining stack-record budget for this frame, in Value-slot
    /// units (#464 step 2). Initial value: `STACK_RECORD_BUDGET_SLOTS`.
    /// When an `Op::AllocStackRecord` would consume more slots than
    /// remain, the VM falls back to the heap path silently (same
    /// observable effect as `Op::MakeRecord`), so the budget never
    /// surfaces as a user-visible error.
    stack_record_budget_remaining: u32,
}

/// Sum of `[budget(N)]` declarations on a function's signature
/// (#225). Used by Op::Call / Op::TailCall / Op::CallClosure to
/// notify the EffectHandler of per-call budget cost so the handler
/// can deduct from a shared pool and refuse calls that would
/// exceed the policy ceiling. Negative `Int` args are ignored —
/// the static check (`policy::check_program`) treats budgets as
/// non-negative.
fn call_budget_cost(f: &crate::program::Function) -> u64 {
    let mut total: u64 = 0;
    for e in &f.effects {
        if e.kind == "budget" {
            if let Some(crate::program::EffectArg::Int(n)) = &e.arg {
                if *n >= 0 {
                    total = total.saturating_add(*n as u64);
                }
            }
        }
    }
    total
}

/// Hash the argument list for a pure-fn memoization lookup (#229).
///
/// The memo cache (`pure_memo`) is keyed on this 128-bit digest with
/// no secondary equality check, so the contract is: argument lists
/// that are equal under `Value`'s `PartialEq` must produce the same
/// digest, and the 128-bit width keeps the false-collision rate
/// (which would return a wrong cached result) negligible.
///
/// History (#461 follow-up): this used to build a `serde_json::Value`
/// of every arg, canonicalize it, and SHA-256 the bytes. Profiling
/// the `response_build` workload showed that path at 27.6% of all
/// instructions — it dominated the VM, since every effect-free call
/// pays it whether or not the cache ever hits. The cache is per-`Vm`
/// and ephemeral, so a cryptographic, cross-process-stable key was
/// never needed. We now walk the `Value` tree directly into two
/// domain-separated `SipHash` passes (deterministic fixed-key
/// `DefaultHasher`), concatenating the two 64-bit outputs into a
/// 128-bit key. No JSON allocation, no crypto.
///
/// The walk mirrors `Value::PartialEq` so the equal-args-equal-key
/// contract holds: `Record` is hashed order-independently over its
/// fields (matching `IndexMap`'s order-insensitive equality),
/// `Closure` on `(body_hash, captures)` not `fn_id` (#222), and
/// `Actor`/`Ticker` on pointer identity (matching `Arc::ptr_eq`).
fn hash_call_args(args: &[Value]) -> [u8; 16] {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;
    let mut h0 = DefaultHasher::new();
    let mut h1 = DefaultHasher::new();
    // Domain separator: makes the two passes diverge so the
    // concatenated halves span the full 128-bit space rather than
    // duplicating one 64-bit value.
    h1.write_u8(0x9e);
    h0.write_usize(args.len());
    h1.write_usize(args.len());
    for a in args {
        hash_value_into(a, &mut h0);
        hash_value_into(a, &mut h1);
    }
    let lo = h0.finish();
    let hi = h1.finish();
    let mut out = [0u8; 16];
    out[..8].copy_from_slice(&lo.to_le_bytes());
    out[8..].copy_from_slice(&hi.to_le_bytes());
    out
}

/// Structural hash of a `Value` into `h`, consistent with
/// `Value::PartialEq`. The leading discriminant byte keeps distinct
/// variants from colliding (e.g. `Int(0)` vs `Bool(false)`).
fn hash_value_into<H: std::hash::Hasher>(v: &Value, h: &mut H) {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher as _;
    match v {
        Value::Int(n) => { h.write_u8(0x01); h.write_i64(*n); }
        // Bit pattern, not value: total and deterministic. NaN==NaN
        // by bits (a memo hit there is harmless — the callee is pure
        // and returns the same result for bit-identical args), and
        // +0.0/-0.0 differ (a harmless extra miss).
        Value::Float(f) => { h.write_u8(0x02); h.write_u64(f.to_bits()); }
        Value::Bool(b) => { h.write_u8(0x03); h.write_u8(*b as u8); }
        Value::Str(s) => {
            h.write_u8(0x04);
            h.write_usize(s.len());
            h.write(s.as_bytes());
        }
        Value::Bytes(b) => {
            h.write_u8(0x05);
            h.write_usize(b.len());
            h.write(b);
        }
        Value::Unit => { h.write_u8(0x06); }
        Value::List(items) => {
            h.write_u8(0x07);
            h.write_usize(items.len());
            for it in items { hash_value_into(it, h); }
        }
        Value::Tuple(items) => {
            h.write_u8(0x08);
            h.write_usize(items.len());
            for it in items { hash_value_into(it, h); }
        }
        Value::Deque(items) => {
            h.write_u8(0x09);
            h.write_usize(items.len());
            for it in items { hash_value_into(it, h); }
        }
        // `IndexMap` equality is order-insensitive, so the hash must
        // be too: combine per-entry sub-hashes with wrapping add (a
        // commutative mix) rather than feeding them in iteration
        // order.
        Value::Record { fields, .. } => {
            h.write_u8(0x0a);
            let mut combined: u64 = 0;
            for (k, val) in fields.iter() {
                let mut e = DefaultHasher::new();
                e.write(k.as_bytes());
                e.write_u8(0xff);
                hash_value_into(val, &mut e);
                combined = combined.wrapping_add(e.finish());
            }
            h.write_u64(combined);
            h.write_usize(fields.len());
        }
        Value::Variant { name, args } => {
            h.write_u8(0x0b);
            h.write_usize(name.len());
            h.write(name.as_bytes());
            h.write_usize(args.len());
            for a in args { hash_value_into(a, h); }
        }
        // Identity is `(body_hash, captures)`, not `fn_id` (#222).
        Value::Closure { body_hash, captures, .. } => {
            h.write_u8(0x0c);
            h.write(body_hash);
            h.write_usize(captures.len());
            for c in captures { hash_value_into(c, h); }
        }
        Value::F64Array { rows, cols, data } => {
            h.write_u8(0x0d);
            h.write_u32(*rows);
            h.write_u32(*cols);
            for f in data { h.write_u64(f.to_bits()); }
        }
        // BTreeMap / BTreeSet iterate in sorted key order — already
        // canonical, so direct feed is order-independent.
        Value::Map(m) => {
            h.write_u8(0x0e);
            h.write_usize(m.len());
            for (k, val) in m {
                hash_mapkey_into(k, h);
                hash_value_into(val, h);
            }
        }
        Value::Set(s) => {
            h.write_u8(0x0f);
            h.write_usize(s.len());
            for k in s { hash_mapkey_into(k, h); }
        }
        // Pointer identity, matching `Arc::ptr_eq` in PartialEq.
        Value::Actor(a) => {
            h.write_u8(0x10);
            h.write_usize(Arc::as_ptr(a) as *const () as usize);
        }
        Value::Ticker(t) => {
            h.write_u8(0x11);
            h.write_usize(Arc::as_ptr(t) as *const () as usize);
        }
        // Coarse summary (schema + dimensions), matching the prior
        // `to_json` encoding which deliberately omitted the cell data
        // (tables can be GB-scale). Equal tables share schema + dims
        // so equal-args-equal-key holds; this is no coarser than the
        // pre-#461-followup behavior.
        Value::ArrowTable(t) => {
            h.write_u8(0x12);
            h.write_i64(t.num_rows() as i64);
            h.write_i64(t.num_columns() as i64);
            for f in t.schema().fields() {
                h.write(f.name().as_bytes());
                h.write_u8(0xfe);
            }
        }
        // #464: a StackRecord crossing into the memo path means an
        // escape the analysis was supposed to reject. Mirror the
        // PartialEq / to_json panic rather than mint a bogus key.
        Value::StackRecord { .. } =>
            panic!("BUG(#464): Value::StackRecord reached memo hashing — \
                    escape analysis should have prevented escape to a call boundary"),
        Value::StackTuple { .. } =>
            panic!("BUG(#464): Value::StackTuple reached memo hashing — \
                    escape analysis should have prevented escape to a call boundary"),
        // #463 slice 2a: arena handles must never reach memo hashing.
        // The memo cache outlives every request scope, so a hashed
        // arena handle would dangle. Slice 1's arena-eligibility
        // analysis must exclude pure-fn allocation sites (the memo
        // path is reached only through pure-fn calls) — any reach
        // here is a soundness bug.
        Value::ArenaRecord { .. } =>
            panic!("BUG(#463): Value::ArenaRecord reached memo hashing — \
                    arena-eligibility analysis must exclude pure-fn allocation sites"),
        Value::ArenaTuple { .. } =>
            panic!("BUG(#463): Value::ArenaTuple reached memo hashing — \
                    arena-eligibility analysis must exclude pure-fn allocation sites"),
    }
}

/// Hash a `MapKey` into `h` with its own discriminant so a `Str`
/// key and an `Int` key never collide.
fn hash_mapkey_into<H: std::hash::Hasher>(k: &crate::value::MapKey, h: &mut H) {
    use crate::value::MapKey;
    match k {
        MapKey::Str(s) => { h.write_u8(0x01); h.write_usize(s.len()); h.write(s.as_bytes()); }
        MapKey::Int(n) => { h.write_u8(0x02); h.write_i64(*n); }
    }
}

/// Evaluate a refinement predicate at runtime against the actual
/// argument value (#209 slice 3). Mirrors `lex_types::discharge`'s
/// static evaluator but operates on `Value` directly.
///
/// Returns `Ok(true)` / `Ok(false)` for a clean boolean verdict, or
/// `Err(reason)` if the predicate references something the runtime
/// can't resolve (free variable beyond the binding, unsupported AST
/// node). Callers map `Ok(false)` and `Err` to `VmError::RefinementFailed`.
fn eval_refinement(
    predicate: &lex_ast::CExpr,
    binding: &str,
    arg: &Value,
) -> Result<bool, String> {
    match eval_refinement_inner(predicate, binding, arg) {
        Ok(Value::Bool(b)) => Ok(b),
        Ok(other) => Err(format!("predicate didn't reduce to a Bool, got {other:?}")),
        Err(e) => Err(e),
    }
}

fn eval_refinement_inner(
    e: &lex_ast::CExpr,
    binding: &str,
    arg: &Value,
) -> Result<Value, String> {
    use lex_ast::{CExpr, CLit};
    match e {
        CExpr::Literal { value } => Ok(match value {
            CLit::Int { value } => Value::Int(*value),
            CLit::Float { value } => Value::Float(value.parse().unwrap_or(0.0)),
            CLit::Bool { value } => Value::Bool(*value),
            CLit::Str { value } => Value::Str(value.as_str().into()),
            CLit::Bytes { value } => Value::Str(value.as_str().into()), // hex; unusual in predicates
            CLit::Unit => Value::Unit,
        }),
        CExpr::Var { name } if name == binding => Ok(arg.clone()),
        CExpr::Var { name } => Err(format!(
            "predicate references free var `{name}`; runtime check \
             only resolves the binding (slice 4 will plumb call-site \
             context)")),
        CExpr::UnaryOp { op, expr } => {
            let v = eval_refinement_inner(expr, binding, arg)?;
            match (op.as_str(), v) {
                ("not", Value::Bool(b)) => Ok(Value::Bool(!b)),
                ("-", Value::Int(n)) => Ok(Value::Int(-n)),
                ("-", Value::Float(n)) => Ok(Value::Float(-n)),
                (o, v) => Err(format!("unsupported unary `{o}` on {v:?}")),
            }
        }
        CExpr::BinOp { op, lhs, rhs } => {
            // Short-circuit `and` / `or` for the same reasons as the
            // static evaluator.
            if op == "and" || op == "or" {
                let l = eval_refinement_inner(lhs, binding, arg)?;
                let lb = match l {
                    Value::Bool(b) => b,
                    other => return Err(format!("`{op}` on non-bool: {other:?}")),
                };
                if op == "and" && !lb { return Ok(Value::Bool(false)); }
                if op == "or"  &&  lb { return Ok(Value::Bool(true));  }
                let r = eval_refinement_inner(rhs, binding, arg)?;
                return match r {
                    Value::Bool(b) => Ok(Value::Bool(b)),
                    other => Err(format!("`{op}` on non-bool: {other:?}")),
                };
            }
            let l = eval_refinement_inner(lhs, binding, arg)?;
            let r = eval_refinement_inner(rhs, binding, arg)?;
            apply_refinement_binop(op, &l, &r)
        }
        // Other AST forms (Call, Let, Match, FieldAccess, Lambda,
        // Block, Constructors, Records, Tuples, Lists, Return) need
        // a more general evaluator that can call back into the VM.
        // Out of scope for slice 3; a future slice may unify this
        // with the spec-checker's gate evaluator.
        other => Err(format!("unsupported predicate node: {other:?}")),
    }
}

fn apply_refinement_binop(op: &str, l: &Value, r: &Value) -> Result<Value, String> {
    use Value::*;
    match (op, l, r) {
        ("+", Int(a), Int(b)) => Ok(Int(a + b)),
        ("-", Int(a), Int(b)) => Ok(Int(a - b)),
        ("*", Int(a), Int(b)) => Ok(Int(a * b)),
        ("/", Int(a), Int(b)) if *b != 0 => Ok(Int(a / b)),
        ("%", Int(a), Int(b)) if *b != 0 => Ok(Int(a % b)),
        ("+", Float(a), Float(b)) => Ok(Float(a + b)),
        ("-", Float(a), Float(b)) => Ok(Float(a - b)),
        ("*", Float(a), Float(b)) => Ok(Float(a * b)),
        ("/", Float(a), Float(b)) => Ok(Float(a / b)),

        ("==", a, b) => Ok(Bool(a == b)),
        ("!=", a, b) => Ok(Bool(a != b)),

        ("<",  Int(a), Int(b)) => Ok(Bool(a < b)),
        ("<=", Int(a), Int(b)) => Ok(Bool(a <= b)),
        (">",  Int(a), Int(b)) => Ok(Bool(a > b)),
        (">=", Int(a), Int(b)) => Ok(Bool(a >= b)),

        ("<",  Float(a), Float(b)) => Ok(Bool(a < b)),
        ("<=", Float(a), Float(b)) => Ok(Bool(a <= b)),
        (">",  Float(a), Float(b)) => Ok(Bool(a > b)),
        (">=", Float(a), Float(b)) => Ok(Bool(a >= b)),

        (op, a, b) => Err(format!(
            "unsupported binop `{op}` on {a:?} and {b:?}")),
    }
}

fn const_str(constants: &[Const], idx: u32) -> String {
    match constants.get(idx as usize) {
        Some(Const::NodeId(s)) | Some(Const::Str(s)) => s.clone(),
        _ => String::new(),
    }
}

/// Read `LEX_PAR_MAX_CONCURRENCY` (default = available CPU cores,
/// fallback 4). Capped at 64 so a malformed env var can't spawn an
/// unreasonable number of OS threads.
/// Order-defining comparator for `list.sort_by` keys (#338).
/// Same-typed Int / Float / Str pairs compare via their native
/// `Ord` / `PartialOrd`. Mixed-type or other key shapes compare
/// as Equal; combined with `Vec::sort_by`'s stability that
/// preserves the original element order — best-effort fallback
/// that never panics.
fn compare_sort_keys(a: &Value, b: &Value) -> std::cmp::Ordering {
    use std::cmp::Ordering;
    match (a, b) {
        (Value::Int(x), Value::Int(y)) => x.cmp(y),
        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
        (Value::Str(x), Value::Str(y)) => x.cmp(y),
        _ => Ordering::Equal,
    }
}

fn par_max_concurrency() -> usize {
    let from_env = std::env::var("LEX_PAR_MAX_CONCURRENCY")
        .ok()
        .and_then(|s| s.parse::<usize>().ok())
        .filter(|n| *n > 0);
    let default = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    from_env.unwrap_or(default).min(64)
}

/// `list.par_map`'s runtime: spawn OS threads (capped by
/// `LEX_PAR_MAX_CONCURRENCY`), apply `closure` to each item, return
/// results in input order. Each worker runs a fresh `Vm` with
/// [`DenyAllEffects`] for #305 slice 1 — effectful closures fail
/// with `VmError::Effect`. Slice 2 will plumb a per-thread effect
/// handler split.
fn par_map_run<'a>(
    program: &'a Program,
    closure: Value,
    items: Vec<Value>,
    worker_handlers: Vec<Box<dyn EffectHandler + Send>>,
) -> Result<Vec<Value>, VmError> {
    if items.is_empty() {
        return Ok(Vec::new());
    }
    let n_workers = worker_handlers.len().min(items.len()).max(1);
    // Carve items into `n_workers` round-robin buckets so each
    // worker processes (indices, items) pairs and we can reassemble
    // in input order.
    let mut buckets: Vec<Vec<(usize, Value)>> = (0..n_workers).map(|_| Vec::new()).collect();
    for (i, v) in items.into_iter().enumerate() {
        buckets[i % n_workers].push((i, v));
    }
    let n_total: usize = buckets.iter().map(|b| b.len()).sum();
    let results: std::sync::Mutex<Vec<Option<Result<Value, String>>>> =
        std::sync::Mutex::new((0..n_total).map(|_| None).collect());

    // Pair each bucket with its pre-built handler so workers own
    // their handler outright — no shared mutable state across
    // worker threads.
    let mut worker_handlers = worker_handlers;
    worker_handlers.truncate(n_workers);
    type Pair = (Vec<(usize, Value)>, Box<dyn EffectHandler + Send>);
    let pairs: Vec<Pair> = buckets.into_iter().zip(worker_handlers).collect();

    std::thread::scope(|s| {
        let mut handles = Vec::with_capacity(pairs.len());
        for (bucket, handler) in pairs {
            let closure = closure.clone();
            let results = &results;
            handles.push(s.spawn(move || {
                // `Box<dyn EffectHandler + Send>` has implicit
                // `+ 'static`; that coerces to `+ 'a` because
                // `'static` outlives any `'a`. The `Send` bound is
                // auto-erased on the unsize coercion.
                let handler_for_vm: Box<dyn EffectHandler + 'a> = handler;
                let mut vm = Vm::with_handler(program, handler_for_vm);
                for (idx, item) in bucket {
                    let r = vm
                        .invoke_closure_value(closure.clone(), vec![item])
                        .map_err(|e| format!("{e:?}"));
                    results.lock().unwrap()[idx] = Some(r);
                }
            }));
        }
        for h in handles {
            h.join().map_err(|_| ()).ok();
        }
    });

    let mut out = Vec::with_capacity(n_total);
    let inner = results.into_inner().unwrap();
    for r in inner {
        match r {
            Some(Ok(v)) => out.push(v),
            Some(Err(e)) => return Err(VmError::Effect(format!("par_map worker: {e}"))),
            None => return Err(VmError::Panic("par_map worker did not produce a result".into())),
        }
    }
    Ok(out)
}

impl<'a> Vm<'a> {
    pub fn new(program: &'a Program) -> Self {
        Self::with_handler(program, Box::new(DenyAllEffects))
    }

    pub fn with_handler(program: &'a Program, handler: Box<dyn EffectHandler + 'a>) -> Self {
        Self {
            program,
            handler,
            tracer: Box::new(NullTracer),
            // Pre-allocate enough capacity for a typical request so the first
            // call incurs no reallocation (#389 slice 3).
            frames: Vec::with_capacity(32),
            stack: Vec::with_capacity(128),
            step_limit: 10_000_000,
            steps: 0,
            pure_memo: std::collections::HashMap::new(),
            pure_memo_hits: 0,
            pure_memo_misses: 0,
            pure_memo_skips: 0,
            memo_fn_state: vec![MemoFnState::default(); program.functions.len()],
            field_ics: vec![Vec::new(); program.functions.len()],
            // 256 slots handles ~32 frames × 8 locals; grows on demand and
            // retains capacity across consecutive vm.call() invocations.
            locals_storage: Vec::with_capacity(256),
            // #464 step 2: zero capacity at construction — handlers that
            // never AllocStackRecord (most code today, until the lowering
            // pass kicks in) pay nothing. First allocation triggers Vec
            // growth; capacity is retained across `vm.call` invocations.
            stack_record_arena: Vec::new(),
            stack_record_allocs: 0,
            stack_record_heap_fallbacks: 0,
            heap_record_allocs: 0,
            // #463 slice 2a: empty until the first enter_request_scope.
            // Programs that never enter a scope incur zero arena cost
            // (the alloc ops, if reached, fall back to the heap path).
            arena_slab: Vec::new(),
            arena_scope_starts: Vec::new(),
            arena_record_allocs: 0,
            arena_record_heap_fallbacks: 0,
            jit_hook: None,
        }
    }

    pub fn set_tracer(&mut self, tracer: Box<dyn Tracer + 'a>) {
        self.tracer = tracer;
    }

    /// Install (or replace) the JIT hook consulted by `Op::Call`'s
    /// dispatch arm. With `None`, dispatch behaves exactly as before
    /// — the hook check is a single null-option branch the optimizer
    /// can hoist. See the [`crate::jit_hook`] module for the
    /// contract callers must uphold.
    pub fn set_jit_hook(&mut self, hook: Option<Box<dyn crate::jit_hook::JitHook + 'a>>) {
        self.jit_hook = hook;
    }

    /// Cap the number of opcode dispatches before the VM aborts with
    /// `step limit exceeded`. Useful as a runtime DoS guard against
    /// untrusted code (e.g. the `agent-tool` sandbox, where an LLM
    /// could emit `list.fold(list.range(0, 1_000_000_000), …)` to hang
    /// the host). Default is 10_000_000.
    pub fn set_step_limit(&mut self, limit: u64) {
        self.step_limit = limit;
    }

    pub fn call(&mut self, name: &str, args: Vec<Value>) -> Result<Value, VmError> {
        let fn_id = self.program.lookup(name).ok_or_else(|| VmError::Panic(format!("no function `{name}`")))?;
        self.invoke(fn_id, args)
    }

    /// Vm-level handler for `parser.run` (#221). Routed here from
    /// `Op::EffectCall` rather than through the `EffectHandler` so
    /// the recursive parser interpreter has reentrant Vm access for
    /// closure invocation. Returns the wrapped `Result[T, ParseErr]`
    /// value the language sees.
    fn run_parser_op(&mut self, args: Vec<Value>) -> Result<Value, String> {
        let parser = args.first().cloned()
            .ok_or_else(|| "parser.run: missing parser arg".to_string())?;
        let input = match args.get(1) {
            Some(Value::Str(s)) => s.clone(),
            _ => return Err("parser.run: input must be Str".into()),
        };
        match crate::parser_runtime::run_parser(&parser, &input, 0, self) {
            Ok((value, _pos)) => Ok(Value::Variant {
                name: "Ok".into(),
                args: vec![value],
            }),
            Err((pos, msg)) => {
                let mut e: IndexMap<String, Value> = IndexMap::new();
                e.insert("pos".into(), Value::Int(pos as i64));
                e.insert("message".into(), Value::Str(msg.into()));
                Ok(Value::Variant {
                    name: "Err".into(),
                    args: vec![Value::record_dynamic(e)],
                })
            }
        }
    }

    // ---- Variant helpers used by conc.* registry ops (#444) ----
    // Local helpers (avoid pulling in serde / public API). Lex's
    // `Result`/`Option` are stdlib unions; their runtime shape is a
    // `Value::Variant { name, args }` with the constructor name as
    // declared (`Ok`/`Err`/`Some`/`None`).

    /// VM-level handler for `conc.*` effect ops (#381).
    ///
    /// * `conc.spawn(init, handler)` — creates an `Actor` wrapping the
    ///   initial state and the handler closure. No background thread is
    ///   started; the actor runs synchronously on the calling thread
    ///   under a `Mutex` so concurrent callers serialise.
    ///
    /// * `conc.ask(actor, msg)` — locks the actor, calls
    ///   `handler(state, msg)` on *this* VM (reentrant), expects a
    ///   2-tuple `(new_state, reply)`, updates the actor's state, and
    ///   returns `reply`.
    ///
    /// * `conc.tell(actor, msg)` — same as `ask` but discards the
    ///   reply and returns `Unit`.
    fn run_conc_op(&mut self, op: &str, args: Vec<Value>) -> Result<Value, String> {
        match op {
            "spawn" => {
                let mut it = args.into_iter();
                let init = it.next().unwrap_or(Value::Unit);
                let handler = it.next().unwrap_or(Value::Unit);
                if !matches!(handler, Value::Closure { .. }) {
                    return Err(format!(
                        "conc.spawn: handler must be a Closure, got {handler:?}"));
                }
                Ok(Value::Actor(Arc::new(Mutex::new(ActorCell {
                    state: init,
                    handler: crate::value::ActorHandler::Lex(handler),
                }))))
            }
            "ask" | "tell" => {
                let mut it = args.into_iter();
                let actor_val = it.next().unwrap_or(Value::Unit);
                let msg = it.next().unwrap_or(Value::Unit);
                let cell = match actor_val {
                    Value::Actor(ref arc) => Arc::clone(arc),
                    other => return Err(format!(
                        "conc.{op}: first arg must be an Actor, got {other:?}")),
                };
                // Lock the actor: guarantees at-most-one-concurrent message.
                let mut guard = cell.lock().map_err(|e| format!("conc.{op}: actor mutex poisoned: {e}"))?;
                let handler = guard.handler.clone();
                let state = guard.state.clone();
                match handler {
                    crate::value::ActorHandler::Lex(closure_val) => {
                        // Call handler(state, msg) on this VM — full effect access.
                        let result = self.invoke_closure_value(closure_val, vec![state, msg])
                            .map_err(|e| format!("conc.{op}: handler error: {e:?}"))?;
                        // Expect (new_state, reply) tuple.
                        match result {
                            Value::Tuple(mut parts) if parts.len() == 2 => {
                                let reply = parts.pop().unwrap();
                                let new_state = parts.pop().unwrap();
                                guard.state = new_state;
                                drop(guard);
                                if op == "ask" { Ok(reply) } else { Ok(Value::Unit) }
                            }
                            other => Err(format!(
                                "conc.{op}: handler must return a 2-tuple (new_state, reply), got {other:?}")),
                        }
                    }
                    crate::value::ActorHandler::Native(native) => {
                        // Native bridge: fire-and-forget; `state` is unused
                        // (the bridge's "state" is the external resource, e.g.
                        // a WebSocket connection). The closure receives `msg`
                        // directly. `ask` returns whatever the bridge produces;
                        // `tell` discards it. State stays untouched.
                        drop(guard);
                        let result = (native.send)(msg)
                            .map_err(|e| format!("conc.{op}: native handler error: {e}"))?;
                        if op == "ask" { Ok(result) } else { Ok(Value::Unit) }
                    }
                }
            }
            "register" => {
                // conc.register(actor, name) -> Result[Unit, ConcError]
                // Returns Ok(Unit) on first register, Err(AlreadyRegistered(name))
                // if the name is taken. v1 stores the actor opaquely —
                // see crate::conc_registry for the type-tag note.
                let mut it = args.into_iter();
                let actor = it.next().unwrap_or(Value::Unit);
                if !matches!(actor, Value::Actor(_)) {
                    return Err(format!(
                        "conc.register: first arg must be an Actor, got {actor:?}"));
                }
                let name = match it.next() {
                    Some(Value::Str(s)) => s.to_string(),
                    other => return Err(format!(
                        "conc.register: name must be Str, got {other:?}")),
                };
                Ok(match crate::conc_registry::register(&name, actor) {
                    Ok(()) => variant_ok(Value::Unit),
                    Err(crate::conc_registry::RegError::AlreadyRegistered(n)) => {
                        variant_err(variant("AlreadyRegistered", vec![Value::Str(n.into())]))
                    }
                    Err(crate::conc_registry::RegError::NotRegistered(_)) => {
                        unreachable!("register cannot produce NotRegistered")
                    }
                })
            }
            "lookup" => {
                // conc.lookup(name) -> Option[Actor[S, M]]
                // Returns Some(actor) if registered, None otherwise. The
                // [S, M] static parametrisation at the call site is not
                // checked at runtime in v1 — caller's responsibility to
                // match the registration site's type.
                let mut it = args.into_iter();
                let name = match it.next() {
                    Some(Value::Str(s)) => s.to_string(),
                    other => return Err(format!(
                        "conc.lookup: name must be Str, got {other:?}")),
                };
                Ok(match crate::conc_registry::lookup(&name) {
                    Some(actor) => variant("Some", vec![actor]),
                    None => variant("None", vec![]),
                })
            }
            "unregister" => {
                // conc.unregister(name) -> Result[Unit, ConcError]
                let mut it = args.into_iter();
                let name = match it.next() {
                    Some(Value::Str(s)) => s.to_string(),
                    other => return Err(format!(
                        "conc.unregister: name must be Str, got {other:?}")),
                };
                Ok(match crate::conc_registry::unregister(&name) {
                    Ok(()) => variant_ok(Value::Unit),
                    Err(crate::conc_registry::RegError::NotRegistered(n)) => {
                        variant_err(variant("NotRegistered", vec![Value::Str(n.into())]))
                    }
                    Err(crate::conc_registry::RegError::AlreadyRegistered(_)) => {
                        unreachable!("unregister cannot produce AlreadyRegistered")
                    }
                })
            }
            "registered" => {
                // conc.registered() -> List[Str] — sorted snapshot.
                let names = crate::conc_registry::registered();
                Ok(Value::List(names.into_iter()
                    .map(|n| Value::Str(n.into()))
                    .collect()))
            }
            other => Err(format!("unknown conc.{other}")),
        }
    }

    /// Invoke a `Value::Closure` by combining its captures with the
    /// supplied call args and dispatching to the underlying function.
    /// Used by the parser interpreter (#221) to call user-supplied
    /// `f` arguments inside `parser.map` / `parser.and_then` nodes.
    pub fn invoke_closure_value(
        &mut self,
        closure: Value,
        args: Vec<Value>,
    ) -> Result<Value, VmError> {
        let (fn_id, captures) = match closure {
            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
            other => return Err(VmError::TypeMismatch(
                format!("invoke_closure_value: not a closure: {other:?}"))),
        };
        let mut combined = captures;
        combined.extend(args);
        self.invoke(fn_id, combined)
    }

    /// Invoke a 1-arg closure without allocating a separate args
    /// `Vec` (#464 call-overhead). The closure's own `captures` Vec
    /// is reused as the combined `captures ++ [arg]` argument buffer,
    /// so the per-element call in `ListMap`/`ListFilter`/`SortByKey`
    /// allocates at most once (the `push`) instead of twice (a fresh
    /// `vec![arg]` plus the `extend`). Semantically identical to
    /// `invoke_closure_value(closure, vec![arg])`.
    pub fn invoke_closure_1(&mut self, closure: Value, arg: Value) -> Result<Value, VmError> {
        let (fn_id, mut combined) = match closure {
            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
            other => return Err(VmError::TypeMismatch(
                format!("invoke_closure_1: not a closure: {other:?}"))),
        };
        combined.push(arg);
        self.invoke(fn_id, combined)
    }

    /// Invoke a 2-arg closure without a separate args `Vec` — the
    /// `ListFold` combiner path. See `invoke_closure_1`.
    pub fn invoke_closure_2(&mut self, closure: Value, a: Value, b: Value) -> Result<Value, VmError> {
        let (fn_id, mut combined) = match closure {
            Value::Closure { fn_id, captures, .. } => (fn_id, captures),
            other => return Err(VmError::TypeMismatch(
                format!("invoke_closure_2: not a closure: {other:?}"))),
        };
        combined.push(a);
        combined.push(b);
        self.invoke(fn_id, combined)
    }

    /// Open a request-scoped arena via the underlying
    /// `EffectHandler::enter_request_scope` (#463 scaffolding).
    /// Runtime layers — `net.serve_fn`, `net.serve_ws`,
    /// `net.serve_quic` — call this immediately before invoking the
    /// user handler closure for a single request. Pair with
    /// `exit_request_scope` once the response has been built and
    /// any lazy iterators in it have been drained (#477).
    ///
    /// Returns the scope id the runtime should pass back to
    /// `exit_request_scope`. The handler's default impl returns 0
    /// and the matching `exit` is a no-op; `DefaultHandler`'s
    /// implementation actually allocates an arena.
    pub fn enter_request_scope(&mut self) -> u64 {
        // #463 slice 2a: snapshot the slab high-water mark so
        // `exit_request_scope` can truncate back to here, releasing
        // every arena-allocated value the scope built in O(1).
        self.arena_scope_starts.push(self.arena_slab.len() as u32);
        self.handler.enter_request_scope()
    }

    /// True iff there is at least one active request scope — i.e. an
    /// `enter_request_scope` not yet matched by `exit_request_scope`.
    /// Runtime layers use this to skip `materialize_arena_handles` on
    /// paths where no scope was entered (e.g. tiny-http worker
    /// dispatch), keeping the no-arena path zero-cost. Slice 2b-i.
    pub fn arena_scope_active(&self) -> bool {
        !self.arena_scope_starts.is_empty()
    }

    /// Close the request scope opened by `enter_request_scope`.
    /// Drops the associated arena.
    pub fn exit_request_scope(&mut self, scope_id: u64) {
        // #463 slice 2a: truncate the slab back to the matching
        // `enter` snapshot, then notify the handler. Out-of-order /
        // unpaired exits (e.g. a stray `exit` with no prior `enter`)
        // are tolerated as no-ops — the handler does the same, and a
        // stray exit shouldn't crash a live server.
        if let Some(start) = self.arena_scope_starts.pop() {
            self.arena_slab.truncate(start as usize);
        }
        self.handler.exit_request_scope(scope_id)
    }

    /// Deep-walk `value` and resolve every `Value::ArenaRecord` /
    /// `Value::ArenaTuple` handle into its heap-owned equivalent
    /// (`Value::Record` / `Value::Tuple`), reading field contents
    /// out of `Vm::arena_slab` along the way. Primitives, closures,
    /// maps/sets, and the host-managed handles (`Actor` / `Ticker` /
    /// `ArrowTable`) are returned unchanged.
    ///
    /// **The boundary helper** flagged in
    /// `docs/design/arena-plumbing.md` § "Arena handles MUST be
    /// readable at serialization". Callers — the response
    /// serialization path in `lex-runtime`, the trace recorder when
    /// it records a Call/EffectCall arg, anywhere a value crosses
    /// out of the VM into host-managed storage — call this
    /// **while the producing scope is still active**, before
    /// `exit_request_scope`. After exit the slab is truncated, so a
    /// handle materialized after-the-fact would read garbage (or
    /// panic on the bounds check).
    ///
    /// `Value::StackRecord` / `Value::StackTuple` would similarly
    /// need slab resolution, but the #464 escape analysis prevents
    /// them from reaching boundary-crossing ops in the first place
    /// (they're frame-local by construction). Reaching here means a
    /// hand-built or analysis-buggy program; we panic with the same
    /// loud-not-silent contract the other inspection paths use.
    ///
    /// Idempotent on already-materialized values (no arena handles
    /// in the tree → only the recursive walk's clones, no slab
    /// lookups). Cost per call is one walk + clone of the tree —
    /// amortized over the per-node mallocs avoided during request
    /// handling, the net stays strongly positive.
    pub fn materialize_arena_handles(&self, value: Value) -> Value {
        use crate::value::Value as V;
        match value {
            // Primitives + opaque handles cross unchanged. Cheap
            // — clones are essentially free for the Copy-ish ones
            // and Arc-bumps for the handle types.
            V::Int(_) | V::Float(_) | V::Bool(_) | V::Str(_) | V::Bytes(_)
            | V::Unit | V::Closure { .. } | V::F64Array { .. }
            | V::Map(_) | V::Set(_) | V::Actor(_) | V::Ticker(_)
            | V::ArrowTable(_) => value,

            // Containers: recurse on each element. Map/Set keys are
            // MapKey (Str | Int), never Value, so no handles can
            // hide there.
            V::List(items) => V::List(
                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
            V::Tuple(items) => V::Tuple(
                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
            V::Deque(items) => V::Deque(
                items.into_iter().map(|v| self.materialize_arena_handles(v)).collect()),
            V::Variant { name, args } => V::Variant {
                name,
                args: args.into_iter().map(|v| self.materialize_arena_handles(v)).collect(),
            },
            V::Record { shape_id, fields } => {
                let mut out: IndexMap<SmolStr, Value> = IndexMap::with_capacity(fields.len());
                for (k, v) in fields.into_iter() {
                    out.insert(k, self.materialize_arena_handles(v));
                }
                V::Record { shape_id, fields: Box::new(out) }
            }

            // The actual resolution work — read the slab and build a
            // heap form. Field-name ordering for ArenaRecord matches
            // the shape's, same as `MakeRecord`'s IndexMap insertion
            // pattern; that's the contract that makes the polymorphic
            // GetField IC work, and we reuse it here.
            V::ArenaRecord { shape_id, slab_start, field_count } => {
                let start = slab_start as usize;
                let n = field_count as usize;
                debug_assert!(start + n <= self.arena_slab.len(),
                    "ArenaRecord handle out of bounds — likely materialized after exit_request_scope");
                let shape = &self.program.record_shapes[shape_id as usize];
                let mut fields: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
                for (i, name_const_idx) in shape.iter().take(n).enumerate() {
                    let name: SmolStr = match &self.program.constants[*name_const_idx as usize] {
                        Const::FieldName(s) => s.as_str().into(),
                        _ => panic!("BUG(#463): ArenaRecord shape entry not a FieldName const"),
                    };
                    let v = self.materialize_arena_handles(self.arena_slab[start + i].clone());
                    fields.insert(name, v);
                }
                V::Record { shape_id, fields: Box::new(fields) }
            }
            V::ArenaTuple { slab_start, arity } => {
                let start = slab_start as usize;
                let n = arity as usize;
                debug_assert!(start + n <= self.arena_slab.len(),
                    "ArenaTuple handle out of bounds — likely materialized after exit_request_scope");
                let items: Vec<Value> = (0..n)
                    .map(|i| self.materialize_arena_handles(self.arena_slab[start + i].clone()))
                    .collect();
                V::Tuple(items)
            }

            // #464 stack handles are frame-local; the analysis
            // prevents them from reaching any boundary the
            // materializer is called at. Reach = bug; panic loud.
            V::StackRecord { .. } =>
                panic!("BUG(#464/#463): Value::StackRecord reached materialize_arena_handles \
                        — escape analysis should keep stack handles inside their frame"),
            V::StackTuple { .. } =>
                panic!("BUG(#464/#463): Value::StackTuple reached materialize_arena_handles \
                        — escape analysis should keep stack handles inside their frame"),
        }
    }

    /// Read a named field out of a record without materializing its
    /// parent. Works uniformly on `Value::Record` (heap) and
    /// `Value::ArenaRecord` (slab handle), so a runtime layer can
    /// consume the response record structurally — straight out of
    /// the arena slab — instead of paying for a tree-wide
    /// `materialize_arena_handles` walk just to read three top-level
    /// fields.
    ///
    /// Returns `None` if the value isn't a record or the field
    /// doesn't exist. The returned `Value` is a clone of the slot
    /// contents (records' field values can themselves be records,
    /// variants, etc.; cloning at the boundary is unavoidable
    /// without lifetime trickery on the public API).
    ///
    /// Performance: on the heap path it's a `IndexMap::get` + clone.
    /// On the arena path it's a linear walk of the shape's
    /// field-name vec (`field_count` long, typically ≤ 10) +
    /// an O(1) slab index + clone. The polymorphic-IC equivalent
    /// inside the VM is faster, but this API is for **host**
    /// consumers, not hot-loop dispatch.
    ///
    /// `Value::StackRecord` is deliberately not handled — those
    /// handles are frame-local by construction (#464 escape pass)
    /// and shouldn't reach host boundaries; reaching them here is
    /// a soundness bug surfaced as a panic, matching the existing
    /// inspection-path contract.
    pub fn get_record_field(&self, value: &Value, name: &str) -> Option<Value> {
        match value {
            Value::Record { fields, .. } => fields.get(name).cloned(),
            Value::ArenaRecord { shape_id, slab_start, field_count } => {
                let shape = self.program.record_shapes.get(*shape_id as usize)?;
                let n = (*field_count as usize).min(shape.len());
                for (i, &name_const_idx) in shape.iter().take(n).enumerate() {
                    if let Const::FieldName(s) = &self.program.constants[name_const_idx as usize] {
                        if s == name {
                            return Some(self.arena_slab[*slab_start as usize + i].clone());
                        }
                    }
                }
                None
            }
            Value::StackRecord { .. } =>
                panic!("BUG(#464): Value::StackRecord reached Vm::get_record_field \
                        — frame-local handles should never reach the host boundary"),
            _ => None,
        }
    }

    /// Positional read out of a tuple without materializing its
    /// parent. Works uniformly on `Value::Tuple` and
    /// `Value::ArenaTuple`. See `get_record_field` for the lifetime
    /// rationale.
    pub fn get_tuple_elem(&self, value: &Value, idx: u16) -> Option<Value> {
        match value {
            Value::Tuple(items) => items.get(idx as usize).cloned(),
            Value::ArenaTuple { slab_start, arity } => {
                if idx >= *arity { return None; }
                Some(self.arena_slab[*slab_start as usize + idx as usize].clone())
            }
            Value::StackTuple { .. } =>
                panic!("BUG(#464): Value::StackTuple reached Vm::get_tuple_elem \
                        — frame-local handles should never reach the host boundary"),
            _ => None,
        }
    }

    /// Arena-aware `to_json` — produces a `serde_json::Value` from
    /// a `Value` whose tree may contain `ArenaRecord` / `ArenaTuple`
    /// handles, reading them straight out of `Vm::arena_slab`
    /// instead of materializing into a heap `Value::Record` mirror
    /// first.
    ///
    /// Equivalent output to `value.to_json()` on a fully-materialized
    /// tree (idempotent in that sense). Use this when serializing a
    /// handler return value to JSON for the response — saves the
    /// per-node IndexMap allocations the materialize-then-to_json
    /// pattern pays.
    pub fn value_to_json(&self, value: &Value) -> serde_json::Value {
        use serde_json::Value as J;
        match value {
            // Primitives + opaque host handles: delegate to the
            // existing `Value::to_json` — its output is identical
            // and it handles the host-handle types we don't model
            // (Actor / Ticker / ArrowTable / F64Array / Map / Set /
            // Closure / Bytes encoding) in one place.
            Value::Int(_) | Value::Float(_) | Value::Bool(_) | Value::Str(_)
            | Value::Bytes(_) | Value::Unit | Value::Closure { .. }
            | Value::F64Array { .. } | Value::Map(_) | Value::Set(_)
            | Value::Actor(_) | Value::Ticker(_) | Value::ArrowTable(_)
                => value.to_json(),

            Value::List(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
            Value::Tuple(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
            Value::Deque(items) => J::Array(items.iter().map(|v| self.value_to_json(v)).collect()),
            Value::Variant { name, args } => {
                let mut m = serde_json::Map::new();
                m.insert("$variant".into(), J::String(name.clone()));
                m.insert("args".into(),
                    J::Array(args.iter().map(|v| self.value_to_json(v)).collect()));
                J::Object(m)
            }
            Value::Record { fields, .. } => {
                let mut m = serde_json::Map::new();
                for (k, v) in fields.iter() {
                    m.insert(k.to_string(), self.value_to_json(v));
                }
                J::Object(m)
            }

            // Slab-direct: read the cells in shape order, emit a
            // JSON object using the shape's field names. The cost
            // delta vs the `Value::to_json` materialize-then-walk
            // path is the saved `Box<IndexMap>` allocation +
            // insertion + drop.
            Value::ArenaRecord { shape_id, slab_start, field_count } => {
                let shape = match self.program.record_shapes.get(*shape_id as usize) {
                    Some(s) => s,
                    None => return J::Null,
                };
                let n = (*field_count as usize).min(shape.len());
                let mut m = serde_json::Map::with_capacity(n);
                for (i, &name_const_idx) in shape.iter().take(n).enumerate() {
                    let name = match &self.program.constants[name_const_idx as usize] {
                        Const::FieldName(s) => s.to_string(),
                        _ => continue,
                    };
                    let cell = &self.arena_slab[*slab_start as usize + i];
                    m.insert(name, self.value_to_json(cell));
                }
                J::Object(m)
            }
            Value::ArenaTuple { slab_start, arity } => {
                let start = *slab_start as usize;
                let n = *arity as usize;
                let items: Vec<serde_json::Value> = (0..n)
                    .map(|i| self.value_to_json(&self.arena_slab[start + i]))
                    .collect();
                J::Array(items)
            }

            // Stack handles must not reach the host — same defensive
            // panic as the other inspection paths.
            Value::StackRecord { .. } =>
                panic!("BUG(#464): Value::StackRecord reached Vm::value_to_json \
                        — frame-local handles should never reach the host boundary"),
            Value::StackTuple { .. } =>
                panic!("BUG(#464): Value::StackTuple reached Vm::value_to_json \
                        — frame-local handles should never reach the host boundary"),
        }
    }

    pub fn invoke(&mut self, fn_id: u32, args: Vec<Value>) -> Result<Value, VmError> {
        let f = &self.program.functions[fn_id as usize];
        if args.len() != f.arity as usize {
            return Err(VmError::Panic(format!("arity mismatch calling {}", f.name)));
        }
        // Refinement runtime check at the public entry point too
        // (#209 slice 3). `Op::Call` checks for in-program calls;
        // this branch covers `vm.call("entry", ...)` from the host
        // and the reentrant `invoke_closure_value` path. Same
        // semantics, same error shape.
        //
        // Iterate `f.refinements` by reference — the loop body
        // only reads from `self.program` (via `r`) and from locals,
        // so we don't need to clone the Vec to detach it from
        // `&self`. The function name is cloned **lazily**, only on
        // the failure path: functions with no refinements (the common
        // case) never enter the loop, so the per-call `f.name.clone()`
        // was pure waste on the hot path (#464 call-overhead).
        for (i, refinement) in f.refinements.iter().enumerate() {
            if let Some(r) = refinement {
                let arg = args.get(i).cloned().unwrap_or(Value::Unit);
                match eval_refinement(&r.predicate, &r.binding, &arg) {
                    Ok(true) => {}
                    Ok(false) => return Err(VmError::RefinementFailed {
                        fn_name: f.name.clone(),
                        param_index: i,
                        binding: r.binding.clone(),
                        reason: format!("predicate failed for {} = {arg:?}", r.binding),
                    }),
                    Err(reason) => return Err(VmError::RefinementFailed {
                        fn_name: f.name.clone(),
                        param_index: i,
                        binding: r.binding.clone(),
                        reason,
                    }),
                }
            }
        }
        // #465 JIT tier hook at the public entry — same contract as
        // the `Op::Call` dispatch arm. Pure-fn memo is not consulted
        // at this layer (memo is per-Op::Call); the hook fires
        // unconditionally for refinement-clean calls.
        if let Some(mut hook) = self.jit_hook.take() {
            let hook_result = hook.try_call(fn_id, &args);
            self.jit_hook = Some(hook);
            if let Some(result) = hook_result? {
                return Ok(result);
            }
        }
        let f = &self.program.functions[fn_id as usize];
        // Claim slots from the locals stack allocator (#389 slice 3).
        let locals_start = self.locals_storage.len();
        let locals_len = f.locals_count.max(f.arity) as usize;
        self.locals_storage.resize(locals_start + locals_len, Value::Unit);
        for (i, v) in args.into_iter().enumerate() {
            self.locals_storage[locals_start + i] = v;
        }
        // Record the depth before pushing — this is what `run` will
        // exit at, supporting reentrant invocation from inside the
        // VM (e.g. the parser interpreter calling closures, #221).
        let base_depth = self.frames.len();
        self.push_frame(Frame {
            fn_id, pc: 0, locals_start, locals_len,
            stack_base: self.stack.len(),
            trace_kind: FrameKind::Entry,
            memo_key: None,
            stack_record_arena_start: self.stack_record_arena.len(),
            stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
        })?;
        self.run_to(base_depth)
    }

    /// All call-frame pushes funnel through here so the depth
    /// check can't be skipped by a missing branch. Returns
    /// `CallStackOverflow` instead of letting recursion blow the
    /// host's native stack.
    fn push_frame(&mut self, frame: Frame) -> Result<(), VmError> {
        if self.frames.len() as u32 >= MAX_CALL_DEPTH {
            return Err(VmError::CallStackOverflow(MAX_CALL_DEPTH));
        }
        self.frames.push(frame);
        Ok(())
    }

    /// Run until the frame stack drops to `base_depth`. Required for
    /// reentrant invocation: a `Vm::invoke` call from inside an
    /// already-running `run()` must return when *its* frame returns,
    /// not when the entire frame stack empties (#221).
    fn run_to(&mut self, base_depth: usize) -> Result<Value, VmError> {
        // #461 slice A: cache the executing function's code slice across
        // ops instead of re-deriving `program.functions[fn_id].code` on
        // every iteration. The program is borrowed (`&'a Program`) and is
        // never mutated during a run, so the slice reference is valid for
        // the whole run and — crucially — is independent of the `&mut self`
        // borrow the op handlers take: it points into the caller-owned
        // `Program`, not into `*self`. Re-resolve only when `fn_id`
        // changes, which is exactly the frame-transition set (Call /
        // CallClosure / TailCall / Return); recursion into the same
        // `fn_id` correctly keeps the cached slice. `frame_idx` / `fn_id`
        // stay recomputed per op (cheap field reads), so the op handlers
        // are untouched and their `fn_id` bindings shadow as before.
        let program: &'a Program = self.program;
        let mut code: &'a [Op] = &[];
        let mut code_fn_id: u32 = u32::MAX;
        loop {
            if self.steps > self.step_limit {
                let frame_idx = self.frames.len() - 1;
                let fn_id = self.frames[frame_idx].fn_id;
                let fn_name = &program.functions[fn_id as usize].name;
                return Err(VmError::Panic(format!(
                    "step limit exceeded in `{fn_name}` ({} > {})",
                    self.steps, self.step_limit,
                )));
            }
            self.steps += 1;
            let frame_idx = self.frames.len() - 1;
            let pc = self.frames[frame_idx].pc;
            let fn_id = self.frames[frame_idx].fn_id;
            if fn_id != code_fn_id {
                code = &program.functions[fn_id as usize].code;
                code_fn_id = fn_id;
            }
            // #461 slice B: the bytecode verifier (#366) proves pc stays
            // in bounds for every reachable op — every path through a
            // function ends in Return / Jump / TailCall, so execution
            // never falls off the end of `code`. The per-op
            // `pc >= code.len()` guard is therefore redundant for verified
            // programs; demote it to a debug-only assertion. The `code[pc]`
            // index below stays bounds-checked, so a malformed program in
            // a release build still panics (loudly, just without the
            // bespoke message) rather than reading out of bounds — no
            // `unsafe`, no UB, only the cold error-return path leaves the
            // hot loop.
            debug_assert!(
                pc < code.len(),
                "ran past end of code in `{}`",
                program.functions[fn_id as usize].name,
            );
            let op = code[pc];
            self.frames[frame_idx].pc = pc + 1;

            match op {
                Op::PushConst(i) => {
                    let c = &self.program.constants[i as usize];
                    self.stack.push(const_to_value(c));
                }
                Op::Pop => { self.pop()?; }
                Op::Dup => {
                    let v = self.peek()?.clone();
                    self.stack.push(v);
                }
                Op::LoadLocal(i) => {
                    let base = self.frames[frame_idx].locals_start;
                    let v = self.locals_storage[base + i as usize].clone();
                    self.stack.push(v);
                }
                Op::StoreLocal(i) => {
                    let v = self.pop()?;
                    let base = self.frames[frame_idx].locals_start;
                    self.locals_storage[base + i as usize] = v;
                }
                Op::MakeRecord { shape_idx, field_count } => {
                    self.heap_record_allocs += 1;
                    let shape = &self.program.record_shapes[shape_idx as usize];
                    let n = field_count as usize;
                    debug_assert_eq!(shape.len(), n,
                        "MakeRecord field_count must match record_shapes[shape_idx].len()");
                    let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                    for i in (0..n).rev() {
                        values[i] = self.pop()?;
                    }
                    let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
                    for (i, val) in values.into_iter().enumerate() {
                        let name: SmolStr = match &self.program.constants[shape[i] as usize] {
                            Const::FieldName(s) => s.as_str().into(),
                            _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
                        };
                        rec.insert(name, val);
                    }
                    self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
                }
                Op::AllocStackRecord { shape_idx, field_count } => {
                    // #464 step 2. Same value-stack contract as
                    // MakeRecord (pop `field_count`, push 1), but the
                    // fields live in the VM's stack-record arena
                    // instead of a heap-allocated IndexMap.
                    //
                    // Budget check: if this frame's remaining
                    // allocation budget can't cover `field_count`
                    // slots, fall back to MakeRecord behavior. The
                    // observable result is identical (a record
                    // value) so the compiler doesn't need to know
                    // ahead of time whether the budget will hold.
                    let n = field_count as usize;
                    let frame = &mut self.frames[frame_idx];
                    if frame.stack_record_budget_remaining < field_count as u32 {
                        self.stack_record_heap_fallbacks += 1;
                        // Heap fallback path — exact copy of
                        // MakeRecord's body. Compiler emitted
                        // AllocStackRecord because escape analysis
                        // proved the record can stay frame-local;
                        // the budget exhaustion is a runtime cost
                        // ceiling, not a correctness issue.
                        let shape = &self.program.record_shapes[shape_idx as usize];
                        debug_assert_eq!(shape.len(), n,
                            "AllocStackRecord field_count must match record_shapes[shape_idx].len()");
                        let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                        for i in (0..n).rev() {
                            values[i] = self.pop()?;
                        }
                        let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
                        for (i, val) in values.into_iter().enumerate() {
                            let name: SmolStr = match &self.program.constants[shape[i] as usize] {
                                Const::FieldName(s) => s.as_str().into(),
                                _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
                            };
                            rec.insert(name, val);
                        }
                        self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
                    } else {
                        self.stack_record_allocs += 1;
                        // Stack path: append the popped field values
                        // to the arena in shape order (matches the
                        // IndexMap insertion order used by MakeRecord,
                        // so the polymorphic GetField IC sees the same
                        // offset for either variant).
                        frame.stack_record_budget_remaining -= field_count as u32;
                        let slab_start = self.stack_record_arena.len();
                        // Reserve all slots upfront so we can write in
                        // shape order while popping in reverse —
                        // matches MakeRecord's idiom.
                        self.stack_record_arena.resize(slab_start + n, Value::Unit);
                        for i in (0..n).rev() {
                            let v = self.pop()?;
                            self.stack_record_arena[slab_start + i] = v;
                        }
                        self.stack.push(Value::StackRecord {
                            shape_id: shape_idx,
                            slab_start: slab_start as u32,
                            field_count,
                        });
                    }
                }
                Op::AllocArenaRecord { shape_idx, field_count } => {
                    // #463 slice 2a. Same value-stack contract as
                    // MakeRecord, but field values land in the
                    // request-scoped `arena_slab` instead of a
                    // per-field heap IndexMap. Runtime fallback when
                    // no scope is active — the op silently degrades
                    // to the MakeRecord heap path so arena-lowered
                    // bytecode stays sound in non-handler contexts
                    // (REPL, tests, top-level scripts).
                    let n = field_count as usize;
                    if self.arena_scope_starts.is_empty() {
                        self.arena_record_heap_fallbacks += 1;
                        // Heap fallback path — exact copy of
                        // MakeRecord's body. Same compile-time
                        // contract (shape order, IndexMap insertion)
                        // so the resulting Value::Record is
                        // indistinguishable from a direct MakeRecord.
                        let shape = &self.program.record_shapes[shape_idx as usize];
                        debug_assert_eq!(shape.len(), n,
                            "AllocArenaRecord field_count must match record_shapes[shape_idx].len()");
                        let mut values: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                        for i in (0..n).rev() {
                            values[i] = self.pop()?;
                        }
                        let mut rec: IndexMap<SmolStr, Value> = IndexMap::with_capacity(n);
                        for (i, val) in values.into_iter().enumerate() {
                            let name: SmolStr = match &self.program.constants[shape[i] as usize] {
                                Const::FieldName(s) => s.as_str().into(),
                                _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
                            };
                            rec.insert(name, val);
                        }
                        self.stack.push(Value::Record { shape_id: shape_idx, fields: Box::new(rec) });
                    } else {
                        self.arena_record_allocs += 1;
                        // Arena path: append the popped field values
                        // to the slab in shape order (matches
                        // MakeRecord's IndexMap insertion order, so
                        // the polymorphic GetField IC sees the same
                        // offset across all three variants).
                        let slab_start = self.arena_slab.len();
                        self.arena_slab.resize(slab_start + n, Value::Unit);
                        for i in (0..n).rev() {
                            let v = self.pop()?;
                            self.arena_slab[slab_start + i] = v;
                        }
                        self.stack.push(Value::ArenaRecord {
                            shape_id: shape_idx,
                            slab_start: slab_start as u32,
                            field_count,
                        });
                    }
                }
                Op::MakeTuple(n) => {
                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
                    self.stack.push(Value::Tuple(items));
                }
                Op::AllocStackTuple { arity } => {
                    // #464 tuple codegen. Same value-stack contract as
                    // MakeTuple (pop `arity`, push 1), but the elements
                    // live in the shared stack-record arena instead of
                    // a heap Vec. Budget exhaustion falls back to the
                    // MakeTuple heap path — identical observable result.
                    let n = arity as usize;
                    let frame = &mut self.frames[frame_idx];
                    if frame.stack_record_budget_remaining < arity as u32 {
                        self.stack_record_heap_fallbacks += 1;
                        let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                        for i in (0..n).rev() { items[i] = self.pop()?; }
                        self.stack.push(Value::Tuple(items));
                    } else {
                        self.stack_record_allocs += 1;
                        frame.stack_record_budget_remaining -= arity as u32;
                        let slab_start = self.stack_record_arena.len();
                        self.stack_record_arena.resize(slab_start + n, Value::Unit);
                        for i in (0..n).rev() {
                            let v = self.pop()?;
                            self.stack_record_arena[slab_start + i] = v;
                        }
                        self.stack.push(Value::StackTuple {
                            slab_start: slab_start as u32,
                            arity,
                        });
                    }
                }
                Op::AllocArenaTuple { arity } => {
                    // #463 slice 2a. Tuple analogue of
                    // AllocArenaRecord: arena slab when a scope is
                    // active, MakeTuple heap fallback otherwise.
                    let n = arity as usize;
                    if self.arena_scope_starts.is_empty() {
                        self.arena_record_heap_fallbacks += 1;
                        let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                        for i in (0..n).rev() { items[i] = self.pop()?; }
                        self.stack.push(Value::Tuple(items));
                    } else {
                        self.arena_record_allocs += 1;
                        let slab_start = self.arena_slab.len();
                        self.arena_slab.resize(slab_start + n, Value::Unit);
                        for i in (0..n).rev() {
                            let v = self.pop()?;
                            self.arena_slab[slab_start + i] = v;
                        }
                        self.stack.push(Value::ArenaTuple {
                            slab_start: slab_start as u32,
                            arity,
                        });
                    }
                }
                Op::MakeList(n) => {
                    let mut items: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                    for i in (0..n as usize).rev() { items[i] = self.pop()?; }
                    self.stack.push(Value::List(items.into()));
                }
                Op::MakeVariant { name_idx, arity } => {
                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
                    let name = match &self.program.constants[name_idx as usize] {
                        Const::VariantName(s) => s.clone(),
                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
                    };
                    self.stack.push(Value::Variant { name, args });
                }
                Op::GetField { name_idx, site_idx } => {
                    let v = self.pop()?;
                    match v {
                        Value::Record { fields: r, shape_id } => {
                            if ic_stats_enabled() {
                                record_ic_hit(fn_id, site_idx, shape_id);
                            }
                            // Inline cache keyed by (fn_id, site_idx) with
                            // shape_id-keyed verification (#462). Slot stores
                            // (shape_id_at_install, offset). Hit verification:
                            // - real shape_id (!= NO_SHAPE_ID) matches: offset
                            //   is guaranteed valid (records with the same
                            //   shape_id share the same field-name ordering
                            //   from the compile-time `record_shapes` table).
                            //   One u32 compare; no string work.
                            // - NO_SHAPE_ID matches NO_SHAPE_ID: distinct
                            //   dynamic shapes both carry this sentinel and
                            //   would otherwise alias, so we fall back to
                            //   verifying via name compare against the field
                            //   at the cached offset — the pre-slice
                            //   correctness path.
                            // On any mismatch we walk by name and reinstall
                            // (shape_id, offset).
                            let fid = fn_id as usize;
                            let sid = site_idx as usize;
                            if self.field_ics[fid].is_empty() {
                                let n = self.program.functions[fid].field_ic_sites as usize;
                                self.field_ics[fid] = vec![None; n];
                            }
                            let cached = self.field_ics[fid][sid];
                            let value = 'ic: {
                                if let Some((cached_shape, off)) = cached {
                                    if cached_shape == shape_id {
                                        if shape_id != crate::value::NO_SHAPE_ID {
                                            // Real shape match: offset is sound.
                                            if let Some((_, val)) = r.get_index(off) {
                                                break 'ic val.clone();
                                            }
                                        } else if let Some((k, val)) = r.get_index(off) {
                                            // Dynamic shape: verify by name.
                                            if let Const::FieldName(s) =
                                                &self.program.constants[name_idx as usize]
                                            {
                                                if s == k { break 'ic val.clone(); }
                                            }
                                        }
                                    }
                                }
                                // Cache miss: resolve by name, install
                                // (shape_id, offset).
                                let name = match &self.program.constants[name_idx as usize] {
                                    Const::FieldName(s) => s.as_str(),
                                    _ => return Err(VmError::TypeMismatch(
                                        "expected FieldName const".into())),
                                };
                                let (off, _, val) = r.get_full(name)
                                    .ok_or_else(|| VmError::TypeMismatch(
                                        format!("missing field `{name}`")))?;
                                self.field_ics[fid][sid] = Some((shape_id, off));
                                val.clone()
                            };
                            self.stack.push(value);
                        }
                        Value::StackRecord { shape_id, slab_start, field_count } => {
                            // #464 step 2: dispatch over a stack-allocated
                            // record. The IC slot stored
                            // (shape_id, offset_in_shape) is interoperable
                            // with the heap path because MakeRecord builds
                            // the IndexMap in shape order — offset N means
                            // the same field in either representation. So
                            // we share `field_ics` with the heap path; no
                            // per-variant cache pollution.
                            if ic_stats_enabled() {
                                record_ic_hit(fn_id, site_idx, shape_id);
                            }
                            let fid = fn_id as usize;
                            let sid = site_idx as usize;
                            if self.field_ics[fid].is_empty() {
                                let n = self.program.functions[fid].field_ic_sites as usize;
                                self.field_ics[fid] = vec![None; n];
                            }
                            let cached = self.field_ics[fid][sid];
                            let value = 'ic: {
                                if let Some((cached_shape, off)) = cached {
                                    if cached_shape == shape_id && (off as u16) < field_count {
                                        // Shape-keyed verification is sound
                                        // here for the same reason as the
                                        // heap path — compile-time shape
                                        // IDs are issued by
                                        // `Program::record_shapes` and
                                        // their field order is fixed.
                                        // Stack records always carry a
                                        // compile-time shape_id (NO_SHAPE_ID
                                        // is impossible — AllocStackRecord
                                        // is only emitted at compile time
                                        // with a known shape_idx).
                                        let idx = slab_start as usize + off;
                                        break 'ic self.stack_record_arena[idx].clone();
                                    }
                                }
                                // Cache miss: walk the shape's field-name
                                // vec to find the slot for `name_idx`. The
                                // miss path is O(field_count) like the
                                // heap path, but the hot retrieval after
                                // install is one array index — cheaper
                                // than IndexMap::get_index.
                                let shape =
                                    &self.program.record_shapes[shape_id as usize];
                                let target_name = match &self.program.constants[name_idx as usize] {
                                    Const::FieldName(s) => s.as_str(),
                                    _ => return Err(VmError::TypeMismatch(
                                        "expected FieldName const".into())),
                                };
                                let mut found: Option<usize> = None;
                                for (i, fn_const_idx) in shape.iter().enumerate() {
                                    if let Const::FieldName(s) =
                                        &self.program.constants[*fn_const_idx as usize]
                                    {
                                        if s == target_name { found = Some(i); break; }
                                    }
                                }
                                let off = found.ok_or_else(|| VmError::TypeMismatch(
                                    format!("missing field `{target_name}` on stack record")))?;
                                self.field_ics[fid][sid] = Some((shape_id, off));
                                self.stack_record_arena[slab_start as usize + off].clone()
                            };
                            self.stack.push(value);
                        }
                        Value::ArenaRecord { shape_id, slab_start, field_count } => {
                            // #463 slice 2a: dispatch over an
                            // arena-allocated record. Identical IC
                            // story to `StackRecord` above — the slot
                            // stores `(shape_id, offset)` and offset
                            // semantics match `Value::Record`'s
                            // IndexMap insertion order, so the IC is
                            // three-way interoperable.
                            if ic_stats_enabled() {
                                record_ic_hit(fn_id, site_idx, shape_id);
                            }
                            let fid = fn_id as usize;
                            let sid = site_idx as usize;
                            if self.field_ics[fid].is_empty() {
                                let n = self.program.functions[fid].field_ic_sites as usize;
                                self.field_ics[fid] = vec![None; n];
                            }
                            let cached = self.field_ics[fid][sid];
                            let value = 'ic: {
                                if let Some((cached_shape, off)) = cached {
                                    if cached_shape == shape_id && (off as u16) < field_count {
                                        let idx = slab_start as usize + off;
                                        break 'ic self.arena_slab[idx].clone();
                                    }
                                }
                                let shape =
                                    &self.program.record_shapes[shape_id as usize];
                                let target_name = match &self.program.constants[name_idx as usize] {
                                    Const::FieldName(s) => s.as_str(),
                                    _ => return Err(VmError::TypeMismatch(
                                        "expected FieldName const".into())),
                                };
                                let mut found: Option<usize> = None;
                                for (i, fn_const_idx) in shape.iter().enumerate() {
                                    if let Const::FieldName(s) =
                                        &self.program.constants[*fn_const_idx as usize]
                                    {
                                        if s == target_name { found = Some(i); break; }
                                    }
                                }
                                let off = found.ok_or_else(|| VmError::TypeMismatch(
                                    format!("missing field `{target_name}` on arena record")))?;
                                self.field_ics[fid][sid] = Some((shape_id, off));
                                self.arena_slab[slab_start as usize + off].clone()
                            };
                            self.stack.push(value);
                        }
                        other => return Err(VmError::TypeMismatch(
                            format!("GetField on non-record: {other:?}"))),
                    }
                }
                Op::GetElem(i) => {
                    let v = self.pop()?;
                    match v {
                        Value::Tuple(items) => {
                            let v = items.into_iter().nth(i as usize)
                                .ok_or_else(|| VmError::TypeMismatch(format!("tuple index {i} out of range")))?;
                            self.stack.push(v);
                        }
                        // #464 tuple codegen: positional read out of a
                        // frame-local tuple. The arena slot is an O(1)
                        // index, mirroring the heap path's nth().
                        Value::StackTuple { slab_start, arity } => {
                            if i >= arity {
                                return Err(VmError::TypeMismatch(
                                    format!("tuple index {i} out of range")));
                            }
                            let v = self.stack_record_arena[slab_start as usize + i as usize].clone();
                            self.stack.push(v);
                        }
                        // #463 slice 2a: positional read out of an
                        // arena tuple — same O(1) index pattern as
                        // StackTuple but through `arena_slab`.
                        Value::ArenaTuple { slab_start, arity } => {
                            if i >= arity {
                                return Err(VmError::TypeMismatch(
                                    format!("tuple index {i} out of range")));
                            }
                            let v = self.arena_slab[slab_start as usize + i as usize].clone();
                            self.stack.push(v);
                        }
                        other => return Err(VmError::TypeMismatch(format!("GetElem on non-tuple: {other:?}"))),
                    }
                }
                Op::TestVariant(i) => {
                    let name = match &self.program.constants[i as usize] {
                        Const::VariantName(s) => s.clone(),
                        _ => return Err(VmError::TypeMismatch("expected VariantName const".into())),
                    };
                    let v = self.pop()?;
                    match &v {
                        Value::Variant { name: vname, .. } => {
                            self.stack.push(Value::Bool(vname == &name));
                        }
                        // For tag-only enums of primitive type (e.g. ParseError = Empty | NotNumber)
                        // the value is currently a Variant too, since constructors emit MakeVariant.
                        other => return Err(VmError::TypeMismatch(format!("TestVariant on non-variant: {other:?}"))),
                    }
                }
                Op::GetVariant(_i) => {
                    let v = self.pop()?;
                    match v {
                        Value::Variant { args, .. } => {
                            self.stack.push(Value::Tuple(args));
                        }
                        other => return Err(VmError::TypeMismatch(format!("GetVariant on non-variant: {other:?}"))),
                    }
                }
                Op::GetVariantArg(i) => {
                    let v = self.pop()?;
                    match v {
                        Value::Variant { mut args, .. } => {
                            if (i as usize) >= args.len() {
                                return Err(VmError::TypeMismatch("variant arg index oob".into()));
                            }
                            self.stack.push(args.swap_remove(i as usize));
                        }
                        other => return Err(VmError::TypeMismatch(format!("GetVariantArg on non-variant: {other:?}"))),
                    }
                }
                Op::GetListLen => {
                    let v = self.pop()?;
                    match v {
                        Value::List(items) => self.stack.push(Value::Int(items.len() as i64)),
                        other => return Err(VmError::TypeMismatch(format!("GetListLen on non-list: {other:?}"))),
                    }
                }
                Op::GetListElem(i) => {
                    let v = self.pop()?;
                    match v {
                        Value::List(items) => {
                            let v = items.into_iter().nth(i as usize)
                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
                            self.stack.push(v);
                        }
                        other => return Err(VmError::TypeMismatch(format!("GetListElem on non-list: {other:?}"))),
                    }
                }
                Op::GetListElemDyn => {
                    // Stack: [list, idx]
                    let idx = match self.pop()? {
                        Value::Int(n) => n as usize,
                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn idx: {other:?}"))),
                    };
                    let v = self.pop()?;
                    match v {
                        Value::List(items) => {
                            let v = items.into_iter().nth(idx)
                                .ok_or_else(|| VmError::TypeMismatch("list index oob".into()))?;
                            self.stack.push(v);
                        }
                        other => return Err(VmError::TypeMismatch(format!("GetListElemDyn on non-list: {other:?}"))),
                    }
                }
                Op::ListAppend => {
                    let value = self.pop()?;
                    let list = self.pop()?;
                    match list {
                        Value::List(mut items) => {
                            items.push_back(value);
                            self.stack.push(Value::List(items));
                        }
                        other => return Err(VmError::TypeMismatch(format!("ListAppend on non-list: {other:?}"))),
                    }
                }
                Op::Jump(off) => {
                    let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
                    self.frames[frame_idx].pc = new_pc;
                }
                Op::JumpIf(off) => {
                    let v = self.pop()?;
                    if v.as_bool() {
                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
                        self.frames[frame_idx].pc = new_pc;
                    }
                }
                Op::JumpIfNot(off) => {
                    let v = self.pop()?;
                    if !v.as_bool() {
                        let new_pc = (self.frames[frame_idx].pc as i32 + off) as usize;
                        self.frames[frame_idx].pc = new_pc;
                    }
                }
                Op::MakeClosure { fn_id, capture_count } => {
                    let n = capture_count as usize;
                    let mut captures: Vec<Value> = (0..n).map(|_| Value::Unit).collect();
                    for i in (0..n).rev() { captures[i] = self.pop()?; }
                    // Look up the canonical body hash so the resulting
                    // `Value::Closure` carries it for equality (#222).
                    let body_hash = self.program.functions[fn_id as usize].body_hash;
                    self.stack.push(Value::Closure { fn_id, body_hash, captures });
                }
                Op::CallClosure { arity, node_id_idx } => {
                    let arity = arity as usize;
                    // Args sit on the value stack at [args_base..]; the
                    // closure sits just below them at args_base - 1. Take
                    // the closure out (leaving a Unit placeholder), then
                    // write its captures and pop the args directly into
                    // the callee's locals — no per-call args Vec and no
                    // `captures.extend(args)` realloc (#464). The combined
                    // [captures, args] view the tracer wants is exactly
                    // the contiguous locals slice we just filled.
                    let args_base = self.stack.len() - arity;
                    let closure = std::mem::replace(&mut self.stack[args_base - 1], Value::Unit);
                    let (fn_id, captures) = match closure {
                        Value::Closure { fn_id, captures, .. } => (fn_id, captures),
                        other => return Err(VmError::TypeMismatch(format!("CallClosure on non-closure: {other:?}"))),
                    };
                    let fid = fn_id as usize;
                    let node_id = const_str(&self.program.constants, node_id_idx);
                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
                    if budget_cost > 0 {
                        self.handler.note_call_budget(budget_cost)
                            .map_err(VmError::Effect)?;
                    }
                    let cap_n = captures.len();
                    let locals_start = self.locals_storage.len();
                    let locals_len = self.program.functions[fid].locals_count
                        .max(self.program.functions[fid].arity) as usize;
                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
                    for (i, v) in captures.into_iter().enumerate() {
                        self.locals_storage[locals_start + i] = v;
                    }
                    // Move the args off the value stack into the locals
                    // following the captures (popping leaves the args off
                    // the stack; the closure's Unit placeholder is then
                    // the top, so truncate it away).
                    for i in (0..arity).rev() {
                        self.locals_storage[locals_start + cap_n + i] = self.pop()?;
                    }
                    self.stack.truncate(args_base - 1);
                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.locals_storage[locals_start..locals_start + cap_n + arity]);
                    self.push_frame(Frame {
                        fn_id, pc: 0, locals_start, locals_len,
                        stack_base: self.stack.len(),
                        trace_kind: FrameKind::Call(node_id),
                        // Op::CallClosure intentionally doesn't memoize
                        // for v1 (#229) — closures over captures need a
                        // hashing strategy that includes the captures.
                        // Direct Op::Call is the v1 surface.
                        memo_key: None,
                        stack_record_arena_start: self.stack_record_arena.len(),
                        stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
                    })?;
                }
                Op::SortByKey { node_id_idx: _ } => {
                    // #338: pop (xs, f). For each x in xs, invoke
                    // f(x) to derive a sortable key. Stable-sort the
                    // (key, value) pairs by key. Return the values
                    // in sorted order. Keys must be Int / Float /
                    // Str; mixed-type pairs and other types compare
                    // as equal (preserving original order — stable
                    // sort).
                    let f = self.pop()?;
                    let xs = self.pop()?;
                    let items = match xs {
                        Value::List(v) => v,
                        other => return Err(VmError::TypeMismatch(
                            format!("SortByKey requires a List, got: {other:?}"))),
                    };
                    if !matches!(f, Value::Closure { .. }) {
                        return Err(VmError::TypeMismatch(
                            format!("SortByKey requires a closure, got: {f:?}")));
                    }
                    let mut keyed: Vec<(Value, Value)> = Vec::with_capacity(items.len());
                    for item in items {
                        let key = self.invoke_closure_1(f.clone(), item.clone())?;
                        keyed.push((key, item));
                    }
                    keyed.sort_by(|(ka, _), (kb, _)| compare_sort_keys(ka, kb));
                    let sorted: VecDeque<Value> = keyed.into_iter().map(|(_, v)| v).collect();
                    self.stack.push(Value::List(sorted));
                }
                Op::ParallelMap { node_id_idx: _ } => {
                    // #305 slice 1: pop (xs, f) and apply f to each
                    // element across OS threads.
                    //
                    // #305 slice 2: each worker now asks the parent
                    // handler for a thread-safe per-worker handler via
                    // `EffectHandler::spawn_for_worker`. Handlers that
                    // opt in (e.g. `DefaultHandler`) yield a fresh
                    // instance sharing the budget pool; handlers that
                    // don't fall back to the slice-1 behavior of
                    // `DenyAllEffects` in the worker.
                    let f = self.pop()?;
                    let xs = self.pop()?;
                    let items = match xs {
                        Value::List(v) => v,
                        other => return Err(VmError::TypeMismatch(
                            format!("ParallelMap requires a List, got: {other:?}"))),
                    };
                    if !matches!(f, Value::Closure { .. }) {
                        return Err(VmError::TypeMismatch(
                            format!("ParallelMap requires a closure, got: {f:?}")));
                    }
                    // Pre-build one handler per worker on the main
                    // thread so the worker just owns its handler with
                    // no shared borrowing. The actual worker count is
                    // capped by `LEX_PAR_MAX_CONCURRENCY` (resolved
                    // inside par_map_run); cap ≤ items.len() so we
                    // never over-allocate handlers.
                    let n_workers = par_max_concurrency().max(1).min(items.len().max(1));
                    let mut worker_handlers: Vec<Box<dyn EffectHandler + Send>> =
                        Vec::with_capacity(n_workers);
                    for _ in 0..n_workers {
                        worker_handlers.push(
                            self.handler
                                .spawn_for_worker()
                                .unwrap_or_else(|| Box::new(DenyAllEffects)),
                        );
                    }
                    let results = par_map_run(self.program, f, items.into_iter().collect(), worker_handlers)?;
                    self.stack.push(Value::List(results.into()));
                }
                Op::ListMap { node_id_idx: _ } => {
                    // #464: native map. Owns `xs` (no per-iteration
                    // clone of the input or accumulator that the old
                    // inlined `LoadLocal`-based loop incurred) and
                    // builds the output with one pre-sized allocation.
                    let f = self.pop()?;
                    let xs = self.pop()?;
                    let items = match xs {
                        Value::List(v) => v,
                        other => return Err(VmError::TypeMismatch(
                            format!("ListMap requires a List, got: {other:?}"))),
                    };
                    if !matches!(f, Value::Closure { .. }) {
                        return Err(VmError::TypeMismatch(
                            format!("ListMap requires a closure, got: {f:?}")));
                    }
                    let mut out: VecDeque<Value> = VecDeque::with_capacity(items.len());
                    for item in items {
                        out.push_back(self.invoke_closure_1(f.clone(), item)?);
                    }
                    self.stack.push(Value::List(out));
                }
                Op::ListFilter { node_id_idx: _ } => {
                    // #464: native filter. Pred is applied to a clone
                    // of each element; the original element is kept on
                    // a true result.
                    let f = self.pop()?;
                    let xs = self.pop()?;
                    let items = match xs {
                        Value::List(v) => v,
                        other => return Err(VmError::TypeMismatch(
                            format!("ListFilter requires a List, got: {other:?}"))),
                    };
                    if !matches!(f, Value::Closure { .. }) {
                        return Err(VmError::TypeMismatch(
                            format!("ListFilter requires a closure, got: {f:?}")));
                    }
                    let mut out: VecDeque<Value> = VecDeque::new();
                    for item in items {
                        let keep = self.invoke_closure_1(f.clone(), item.clone())?;
                        if keep.as_bool() {
                            out.push_back(item);
                        }
                    }
                    self.stack.push(Value::List(out));
                }
                Op::ListFold { node_id_idx: _ } => {
                    // #464: native left-fold. `acc` is threaded by
                    // value; each element is moved into the combiner.
                    let f = self.pop()?;
                    let init = self.pop()?;
                    let xs = self.pop()?;
                    let items = match xs {
                        Value::List(v) => v,
                        other => return Err(VmError::TypeMismatch(
                            format!("ListFold requires a List, got: {other:?}"))),
                    };
                    if !matches!(f, Value::Closure { .. }) {
                        return Err(VmError::TypeMismatch(
                            format!("ListFold requires a closure, got: {f:?}")));
                    }
                    let mut acc = init;
                    for item in items {
                        acc = self.invoke_closure_2(f.clone(), acc, item)?;
                    }
                    self.stack.push(acc);
                }
                Op::Call { fn_id, arity, node_id_idx } => {
                    let arity = arity as usize;
                    let fid = fn_id as usize;
                    // Args sit on the value stack at [args_base..]. We
                    // read them in place for the refinement / memo /
                    // trace checks and only move them into the locals
                    // slot-allocator at the very end — avoiding a
                    // per-call args Vec (#464 call-overhead). The stack
                    // naturally holds the args until consumed, so the
                    // only early-exit cleanup is truncating them off on
                    // a memo hit; a refinement error aborts the VM.
                    let args_base = self.stack.len() - arity;
                    let node_id = const_str(&self.program.constants, node_id_idx);
                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
                    if budget_cost > 0 {
                        self.handler.note_call_budget(budget_cost)
                            .map_err(VmError::Effect)?;
                    }
                    // Refinement runtime check (#209 slice 3). Each
                    // param's `Option<Refinement>` is evaluated against
                    // the actual arg before the frame is pushed. The
                    // tracer sees the call enter; failure surfaces as
                    // `VmError::RefinementFailed` *before* the body
                    // starts, which means an erroring trace shows the
                    // call as enter+exit_err with the verdict reason
                    // (same shape as `gate.verdict`).
                    //
                    // Iterate by reference — the loop body reads only
                    // through `r` (borrowed from `self.program`) and the
                    // arg slots on the stack; we don't mutate `self`, so
                    // the borrows are disjoint.
                    let refinements = &self.program.functions[fid].refinements;
                    for (i, refinement) in refinements.iter().enumerate() {
                        if let Some(r) = refinement {
                            let arg = self.stack[args_base + i].clone();
                            match eval_refinement(&r.predicate, &r.binding, &arg) {
                                Ok(true) => { /* satisfied, continue */ }
                                Ok(false) => {
                                    return Err(VmError::RefinementFailed {
                                        fn_name: self.program.functions[fid].name.clone(),
                                        param_index: i,
                                        binding: r.binding.clone(),
                                        reason: format!(
                                            "predicate failed for {} = {arg:?}",
                                            r.binding),
                                    });
                                }
                                Err(reason) => {
                                    return Err(VmError::RefinementFailed {
                                        fn_name: self.program.functions[fid].name.clone(),
                                        param_index: i,
                                        binding: r.binding.clone(),
                                        reason,
                                    });
                                }
                            }
                        }
                    }
                    // Pure-fn memoization (#229): if the callee declares
                    // no effects, hash the args and consult the cache.
                    // On hit, push the cached value, emit synthetic
                    // enter+exit trace events (so the trace still shows
                    // the call), and skip the frame push entirely.
                    //
                    // Adaptive gate (#229 adaptive): only hash if this
                    // function still has memoization enabled. A pure
                    // function whose args never repeat pays the hash for
                    // nothing; after a warmup window with zero hits we
                    // disable it and its calls take the plain path below.
                    let memo_key: Option<(u32, [u8; 16])> =
                        if self.program.functions[fid].effects.is_empty()
                            && self.memo_fn_state[fid].enabled
                            // #621: skip memo if any arg contains a request-scoped
                            // arena handle. The memo cache outlives the request arena,
                            // so hashing such a handle would dangle.
                            && !self.stack[args_base..].iter().any(|v| v.contains_arena_record())
                        {
                            Some((fn_id, hash_call_args(&self.stack[args_base..])))
                        } else {
                            if self.program.functions[fid].effects.is_empty() {
                                self.pure_memo_skips += 1;
                            }
                            None
                        };
                    if let Some(key) = memo_key {
                        self.memo_fn_state[fid].calls += 1;
                        if let Some(cached) = self.pure_memo.get(&key).cloned() {
                            self.memo_fn_state[fid].hits += 1;
                            self.pure_memo_hits += 1;
                            self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
                            self.tracer.exit_ok(&cached);
                            self.stack.truncate(args_base);
                            self.stack.push(cached);
                            continue;
                        }
                        self.pure_memo_misses += 1;
                        // Disable on a cold function: warmup elapsed with
                        // no hit. Always safe — the callee is pure, so the
                        // plain path recomputes the identical result.
                        let st = &mut self.memo_fn_state[fid];
                        if st.calls >= MEMO_WARMUP_CALLS && st.hits == 0 {
                            st.enabled = false;
                        }
                    }
                    // #465 JIT tier hook. Consulted after refinements +
                    // memo. The hook contract (see `crate::jit_hook`)
                    // requires the dispatcher to emit the synthetic
                    // tracer events itself — we do that on hit, then
                    // truncate the args off the stack and push the
                    // result, mirroring the memo-hit path above.
                    //
                    // Take/restore around the call so the hook can
                    // borrow `&self.stack` for its args slice while
                    // we hold `&mut hook`. Cheaper than cloning the
                    // args; the take/put is two pointer writes.
                    if let Some(mut hook) = self.jit_hook.take() {
                        let hook_result = hook.try_call(fn_id, &self.stack[args_base..]);
                        self.jit_hook = Some(hook);
                        match hook_result? {
                            Some(result) => {
                                self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
                                self.tracer.exit_ok(&result);
                                // Memoize the result if memo is enabled
                                // for this fn — same semantics as a
                                // regular call's Return path.
                                if let Some(key) = memo_key {
                                    self.pure_memo.insert(key, result.clone());
                                }
                                self.stack.truncate(args_base);
                                self.stack.push(result);
                                continue;
                            }
                            None => { /* hook declined; fall through */ }
                        }
                    }
                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
                    let locals_len = self.program.functions[fid].locals_count
                        .max(self.program.functions[fid].arity) as usize;
                    let locals_start = self.locals_storage.len();
                    self.locals_storage.resize(locals_start + locals_len, Value::Unit);
                    // Move the args off the stack into the callee's
                    // locals (popping leaves the stack at `args_base`).
                    for i in (0..arity).rev() {
                        self.locals_storage[locals_start + i] = self.pop()?;
                    }
                    self.push_frame(Frame {
                        fn_id, pc: 0, locals_start, locals_len,
                        stack_base: self.stack.len(),
                        trace_kind: FrameKind::Call(node_id),
                        memo_key,
                        stack_record_arena_start: self.stack_record_arena.len(),
                        stack_record_budget_remaining: STACK_RECORD_BUDGET_SLOTS,
                    })?;
                }
                Op::TailCall { fn_id, arity, node_id_idx } => {
                    let arity = arity as usize;
                    let fid = fn_id as usize;
                    // Args sit on the value stack at [args_base..]. Read
                    // them in place for the refinement / trace checks and
                    // move them into the reused frame's locals at the end
                    // — no per-call args Vec (#464). Tail calls have no
                    // memoization, so the consumers are refinement, trace,
                    // then the locals move. The args live on `self.stack`
                    // while locals live on `self.locals_storage`, so the
                    // `truncate(old_locals_start)` below (which releases
                    // the *old* frame's locals) doesn't touch them.
                    let args_base = self.stack.len() - arity;
                    let node_id = const_str(&self.program.constants, node_id_idx);
                    let budget_cost = call_budget_cost(&self.program.functions[fid]);
                    if budget_cost > 0 {
                        self.handler.note_call_budget(budget_cost)
                            .map_err(VmError::Effect)?;
                    }
                    // Refinement runtime check on tail calls too
                    // (#209 slice 3). Same shape as Op::Call.
                    let refinements = &self.program.functions[fid].refinements;
                    for (i, refinement) in refinements.iter().enumerate() {
                        if let Some(r) = refinement {
                            let arg = self.stack[args_base + i].clone();
                            match eval_refinement(&r.predicate, &r.binding, &arg) {
                                Ok(true) => {}
                                Ok(false) => return Err(VmError::RefinementFailed {
                                    fn_name: self.program.functions[fid].name.clone(),
                                    param_index: i,
                                    binding: r.binding.clone(),
                                    reason: format!(
                                        "predicate failed for {} = {arg:?}",
                                        r.binding),
                                }),
                                Err(reason) => return Err(VmError::RefinementFailed {
                                    fn_name: self.program.functions[fid].name.clone(),
                                    param_index: i,
                                    binding: r.binding.clone(),
                                    reason,
                                }),
                            }
                        }
                    }
                    // #465 JIT tier hook for tail calls. A tail-called
                    // function's result IS the current frame's result,
                    // so on a hook hit we collapse the current frame:
                    // truncate state back to the frame's entry, emit
                    // the synthetic enter+exit_ok trace events that a
                    // normal tail-into-return would have produced, then
                    // bubble the result up the same way Op::Return
                    // does.
                    if let Some(mut hook) = self.jit_hook.take() {
                        let hook_result = hook.try_call(fn_id, &self.stack[args_base..]);
                        self.jit_hook = Some(hook);
                        if let Some(result) = hook_result? {
                            self.tracer.exit_call_tail();
                            self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
                            self.tracer.exit_ok(&result);
                            let frame = self.frames.pop().unwrap();
                            self.stack.truncate(frame.stack_base);
                            self.locals_storage.truncate(frame.locals_start);
                            self.stack_record_arena.truncate(frame.stack_record_arena_start);
                            // Tail calls don't carry a memo_key (the
                            // existing arm doesn't memoize them), so
                            // skip the memo store the Return path does.
                            if self.frames.len() <= base_depth {
                                return Ok(result);
                            }
                            self.stack.push(result);
                            continue;
                        }
                    }
                    // A tail call closes the current call's trace frame and
                    // opens a new one in its place — preserves the caller's
                    // tree depth in the trace.
                    self.tracer.exit_call_tail();
                    self.tracer.enter_call(&node_id, &self.program.functions[fid].name, &self.stack[args_base..]);
                    // Reuse the current frame's locals_start position:
                    // truncate to release old locals then extend for the
                    // new function (#389 slice 3, same as Op::Return but
                    // without popping the frame).
                    let old_locals_start = self.frames.last().unwrap().locals_start;
                    self.locals_storage.truncate(old_locals_start);
                    let new_locals_len = self.program.functions[fid].locals_count
                        .max(self.program.functions[fid].arity) as usize;
                    self.locals_storage.resize(old_locals_start + new_locals_len, Value::Unit);
                    // Move the args off the value stack into the callee's
                    // locals (popping leaves the stack at `args_base`).
                    for i in (0..arity).rev() {
                        self.locals_storage[old_locals_start + i] = self.pop()?;
                    }
                    // #464 step 2: a tail-called function gets a fresh
                    // stack-record arena view. Release any records the
                    // pre-tail-call code allocated (they can't be live
                    // — the args have already been popped off the
                    // value stack) and refill the budget for the
                    // callee.
                    let arena_start = self.frames.last().unwrap().stack_record_arena_start;
                    self.stack_record_arena.truncate(arena_start);
                    let frame = self.frames.last_mut().unwrap();
                    frame.fn_id = fn_id;
                    frame.pc = 0;
                    frame.locals_len = new_locals_len;
                    frame.trace_kind = FrameKind::Call(node_id);
                    frame.stack_record_budget_remaining = STACK_RECORD_BUDGET_SLOTS;
                }
                Op::EffectCall { kind_idx, op_idx, arity, node_id_idx } => {
                    let mut args: Vec<Value> = (0..arity).map(|_| Value::Unit).collect();
                    for i in (0..arity as usize).rev() { args[i] = self.pop()?; }
                    let kind = match &self.program.constants[kind_idx as usize] {
                        Const::Str(s) => s.clone(),
                        _ => return Err(VmError::TypeMismatch("expected Str const for effect kind".into())),
                    };
                    let op_name = match &self.program.constants[op_idx as usize] {
                        Const::Str(s) => s.clone(),
                        _ => return Err(VmError::TypeMismatch("expected Str const for effect op".into())),
                    };
                    let node_id = const_str(&self.program.constants, node_id_idx);
                    self.tracer.enter_effect(&node_id, &kind, &op_name, &args);
                    let result = match self.tracer.override_effect(&node_id) {
                        Some(v) => Ok(v),
                        // VM-level intercept for `parser.run` (#221).
                        // Routed inline rather than through the handler
                        // because the parser interpreter needs reentrant
                        // VM access to invoke `Value::Closure` values
                        // from `Map` / `AndThen` nodes.
                        None if (kind.as_str(), op_name.as_str()) == ("parser", "run")
                            => self.run_parser_op(args),
                        // VM-level intercept for `conc.*` (#381). The actor
                        // handler closure must run on the calling VM so it can
                        // dispatch arbitrary effects through the same handler
                        // chain (e.g. sql queries inside an actor).
                        None if kind.as_str() == "conc"
                            => self.run_conc_op(op_name.as_str(), args),
                        None => self.handler.dispatch(&kind, &op_name, args),
                    };
                    match result {
                        Ok(v) => {
                            self.tracer.exit_ok(&v);
                            self.stack.push(v);
                        }
                        Err(e) => {
                            self.tracer.exit_err(&e);
                            return Err(VmError::Effect(e));
                        }
                    }
                }
                Op::Return => {
                    let v = self.pop()?;
                    let frame = self.frames.pop().unwrap();
                    // Trim any extra stuff that the function pushed but didn't pop.
                    self.stack.truncate(frame.stack_base);
                    // Release this frame's locals back to the arena (#389 slice 3).
                    // LIFO frame ordering guarantees this frame's slots are at the top.
                    self.locals_storage.truncate(frame.locals_start);
                    // #464 step 2: release this frame's stack-record
                    // slab. LIFO frame discipline guarantees its
                    // records sit at the top of the arena. The
                    // returned value `v` is escape-proven not to be
                    // one of them — the compiler only emits
                    // AllocStackRecord at sites that don't reach
                    // `Return`.
                    self.stack_record_arena.truncate(frame.stack_record_arena_start);
                    if matches!(frame.trace_kind, FrameKind::Call(_)) {
                        self.tracer.exit_ok(&v);
                    }
                    // Pure-fn memoization (#229): if this frame was a
                    // memoizable call that missed the cache, write the
                    // computed return value back so the next call with
                    // the same args returns it without re-executing.
                    if let Some(key) = frame.memo_key {
                        self.pure_memo.insert(key, v.clone());
                    }
                    // Exit when we've returned past the depth this
                    // `run_to` was entered at — supports reentrancy
                    // (a nested `invoke` returns into its caller, not
                    // out of the outermost VM run, #221).
                    if self.frames.len() <= base_depth {
                        return Ok(v);
                    }
                    self.stack.push(v);
                }
                Op::Panic(i) => {
                    let msg = match &self.program.constants[i as usize] {
                        Const::Str(s) => s.clone(),
                        _ => "panic".into(),
                    };
                    return Err(VmError::Panic(msg));
                }
                // Arithmetic
                Op::IntAdd => self.bin_int(|a, b| Value::Int(a + b))?,
                Op::IntSub => self.bin_int(|a, b| Value::Int(a - b))?,
                Op::IntMul => self.bin_int(|a, b| Value::Int(a * b))?,
                Op::IntDiv => self.bin_int(|a, b| Value::Int(a / b))?,
                Op::IntMod => self.bin_int(|a, b| Value::Int(a % b))?,
                Op::IntNeg => {
                    let a = self.pop()?.as_int();
                    self.stack.push(Value::Int(-a));
                }
                Op::IntEq => self.bin_int(|a, b| Value::Bool(a == b))?,
                Op::IntLt => self.bin_int(|a, b| Value::Bool(a < b))?,
                Op::IntLe => self.bin_int(|a, b| Value::Bool(a <= b))?,
                Op::FloatAdd => self.bin_float(|a, b| Value::Float(a + b))?,
                Op::FloatSub => self.bin_float(|a, b| Value::Float(a - b))?,
                Op::FloatMul => self.bin_float(|a, b| Value::Float(a * b))?,
                Op::FloatDiv => self.bin_float(|a, b| Value::Float(a / b))?,
                Op::FloatNeg => {
                    let a = self.pop()?.as_float();
                    self.stack.push(Value::Float(-a));
                }
                Op::FloatEq => self.bin_float(|a, b| Value::Bool(a == b))?,
                Op::FloatLt => self.bin_float(|a, b| Value::Bool(a < b))?,
                Op::FloatLe => self.bin_float(|a, b| Value::Bool(a <= b))?,
                Op::NumAdd => {
                    // #308: `+` is overloaded — Str+Str concatenates,
                    // numerics add. Other arithmetic ops (-, *, /, %)
                    // still reject Str at the type-checker layer.
                    let b = self.pop()?;
                    let a = self.pop()?;
                    match (a, b) {
                        (Value::Int(x), Value::Int(y)) => self.stack.push(Value::Int(x + y)),
                        (Value::Float(x), Value::Float(y)) => self.stack.push(Value::Float(x + y)),
                        (Value::Str(x), Value::Str(y)) => {
                            // SmolStr is immutable; concatenate via a temporary String.
                            let mut s = String::with_capacity(x.len() + y.len());
                            s.push_str(&x);
                            s.push_str(&y);
                            self.stack.push(Value::Str(s.into()));
                        }
                        (a, b) => return Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
                    }
                }
                Op::NumSub => self.bin_num(|a, b| Value::Int(a - b), |a, b| Value::Float(a - b))?,
                Op::NumMul => self.bin_num(|a, b| Value::Int(a * b), |a, b| Value::Float(a * b))?,
                Op::NumDiv => self.bin_num(|a, b| Value::Int(a / b), |a, b| Value::Float(a / b))?,
                Op::NumMod => self.bin_int(|a, b| Value::Int(a % b))?,
                Op::NumNeg => {
                    let v = self.pop()?;
                    match v {
                        Value::Int(n) => self.stack.push(Value::Int(-n)),
                        Value::Float(f) => self.stack.push(Value::Float(-f)),
                        other => return Err(VmError::TypeMismatch(format!("NumNeg on {other:?}"))),
                    }
                }
                Op::NumEq => self.bin_eq()?,
                Op::NumLt => self.bin_ord(|a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b), |a, b| Value::Bool(a < b))?,
                Op::NumLe => self.bin_ord(|a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b), |a, b| Value::Bool(a <= b))?,
                Op::BoolAnd => {
                    let b = self.pop()?.as_bool();
                    let a = self.pop()?.as_bool();
                    self.stack.push(Value::Bool(a && b));
                }
                Op::BoolOr => {
                    let b = self.pop()?.as_bool();
                    let a = self.pop()?.as_bool();
                    self.stack.push(Value::Bool(a || b));
                }
                Op::BoolNot => {
                    let a = self.pop()?.as_bool();
                    self.stack.push(Value::Bool(!a));
                }
                Op::StrConcat => {
                    let b = self.pop()?;
                    let a = self.pop()?;
                    let s = format!("{}{}", a.as_str(), b.as_str());
                    self.stack.push(Value::Str(s.into()));
                }
                Op::StrLen => {
                    let v = self.pop()?;
                    self.stack.push(Value::Int(v.as_str().len() as i64));
                }
                Op::StrEq => {
                    let b = self.pop()?;
                    let a = self.pop()?;
                    self.stack.push(Value::Bool(a.as_str() == b.as_str()));
                }
                Op::BytesLen => {
                    let v = self.pop()?;
                    match v {
                        Value::Bytes(b) => self.stack.push(Value::Int(b.len() as i64)),
                        other => return Err(VmError::TypeMismatch(format!("BytesLen on {other:?}"))),
                    }
                }
                Op::BytesEq => {
                    let b = self.pop()?;
                    let a = self.pop()?;
                    let eq = match (a, b) {
                        (Value::Bytes(x), Value::Bytes(y)) => x == y,
                        _ => return Err(VmError::TypeMismatch("BytesEq operands".into())),
                    };
                    self.stack.push(Value::Bool(eq));
                }

                // Superinstructions (#461).
                Op::LoadLocalAddIntConst { local_idx, imm_const_idx } => {
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + local_idx as usize].as_int();
                    let b = match &self.program.constants[imm_const_idx as usize] {
                        Const::Int(n) => *n,
                        c => return Err(VmError::TypeMismatch(
                            format!("LoadLocalAddIntConst expected Int const, got {c:?}"))),
                    };
                    self.stack.push(Value::Int(a + b));
                    // Override the default `pc + 1`: skip past the
                    // two inert primitive ops (the original
                    // PushConst + IntAdd) that the peephole pass
                    // left in place for body-hash stability.
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalAddLocal { lhs_idx, rhs_idx } => {
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
                    self.stack.push(Value::Int(a + b));
                    // Override the default `pc + 1`: skip past the
                    // two inert primitive ops (the original
                    // LoadLocal(rhs_idx) + IntAdd) that the peephole
                    // pass left in place for body-hash stability.
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalSubLocal { lhs_idx, rhs_idx } => {
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
                    self.stack.push(Value::Int(a - b));
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalMulLocal { lhs_idx, rhs_idx } => {
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + lhs_idx as usize].as_int();
                    let b = self.locals_storage[base + rhs_idx as usize].as_int();
                    self.stack.push(Value::Int(a * b));
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalGetField { local_idx, name_idx, site_idx } => {
                    // #461 slice 9: fused `LoadLocal + GetField`. Reads
                    // the field directly out of the local record by
                    // reference and pushes it, advancing pc by 2 (one
                    // tombstone — the original GetField). Avoids the
                    // unfused pair's whole-record clone onto the value
                    // stack: the dominant heap-record churn on the
                    // `response_build` profile (`r.total` field reads).
                    let base = self.frames[frame_idx].locals_start;
                    let v = self.read_local_record_field(
                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetField")?;
                    self.stack.push(v);
                    self.frames[frame_idx].pc = pc + 2;
                }
                Op::LoadLocalGetFieldAdd { local_idx, name_idx, site_idx } => {
                    // #461 slice 7: fused `LoadLocal + GetField + IntAdd`.
                    // Pop the prior stack top (the accumulator), read the
                    // field by reference (shared IC via
                    // `read_local_record_field`), push the sum, advance
                    // pc by 3 (skip the GetField and IntAdd tombstones).
                    let acc = self.pop()?.as_int();
                    let base = self.frames[frame_idx].locals_start;
                    let b = self.read_local_record_field(
                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldAdd")?.as_int();
                    self.stack.push(Value::Int(acc + b));
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalGetFieldSub { local_idx, name_idx, site_idx } => {
                    // #461 slice 8: `LoadLocal + GetField + IntSub`. The
                    // `acc - r.field` idiom. IntSub computes
                    // deeper-minus-top; the field was on top in the
                    // unfused form, so the result is `acc - field`.
                    let acc = self.pop()?.as_int();
                    let base = self.frames[frame_idx].locals_start;
                    let b = self.read_local_record_field(
                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldSub")?.as_int();
                    self.stack.push(Value::Int(acc - b));
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalGetFieldMul { local_idx, name_idx, site_idx } => {
                    // #461 slice 8: `LoadLocal + GetField + IntMul`. The
                    // `acc * r.field` idiom (mul is commutative, so
                    // operand order doesn't matter).
                    let acc = self.pop()?.as_int();
                    let base = self.frames[frame_idx].locals_start;
                    let b = self.read_local_record_field(
                        base, local_idx, fn_id, name_idx, site_idx, "LoadLocalGetFieldMul")?.as_int();
                    self.stack.push(Value::Int(acc * b));
                    self.frames[frame_idx].pc = pc + 3;
                }
                Op::LoadLocalEqIntConstJumpIfNot { local_idx, imm_const_idx, jump_offset } => {
                    // First jump-aware fusion (#461 slice 5). The
                    // JumpIfNot's offset is relative to its own
                    // pc + 1 = (pc + 3) + 1 = pc + 4, so the branch
                    // target is `pc + 4 + jump_offset`. Fall-through
                    // (equal → JumpIfNot doesn't jump) is `pc + 4`
                    // (skip past the 3 tombstones — PushConst +
                    // IntEq + JumpIfNot).
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + local_idx as usize].as_int();
                    let b = match &self.program.constants[imm_const_idx as usize] {
                        Const::Int(n) => *n,
                        _ => return Err(VmError::TypeMismatch(
                            "LoadLocalEqIntConstJumpIfNot expects Const::Int".into())),
                    };
                    let next_pc = if a == b {
                        pc + 4
                    } else {
                        ((pc as i32 + 4) + jump_offset) as usize
                    };
                    self.frames[frame_idx].pc = next_pc;
                }
                Op::LoadLocalStoreEqIntConstJumpIfNot { src, dst, imm_const_idx, jump_offset } => {
                    // Slice 6: absorbs LoadLocal + StoreLocal + slice-5 op.
                    // 6-slot window total (this op + 5 tombstones); fall-
                    // through is `pc + 6`, branch target is `pc + 6 +
                    // jump_offset` (the original JumpIfNot was at slot
                    // pc+5, with offset relative to its own pc+1 = pc+6).
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + src as usize].as_int();
                    // Mirror the original `StoreLocal(dst)` — later
                    // arm tests in the same `match` expect to find
                    // the scrutinee at `locals[dst]`.
                    self.locals_storage[base + dst as usize] = Value::Int(a);
                    let b = match &self.program.constants[imm_const_idx as usize] {
                        Const::Int(n) => *n,
                        _ => return Err(VmError::TypeMismatch(
                            "LoadLocalStoreEqIntConstJumpIfNot expects Const::Int".into())),
                    };
                    let next_pc = if a == b {
                        pc + 6
                    } else {
                        ((pc as i32 + 6) + jump_offset) as usize
                    };
                    self.frames[frame_idx].pc = next_pc;
                }
                Op::LoadLocalAddIntConstStoreLocal { src, imm_const_idx, dest } => {
                    let base = self.frames[frame_idx].locals_start;
                    let a = self.locals_storage[base + src as usize].as_int();
                    let b = match &self.program.constants[imm_const_idx as usize] {
                        Const::Int(n) => *n,
                        c => return Err(VmError::TypeMismatch(
                            format!("LoadLocalAddIntConstStoreLocal expected Int const, got {c:?}"))),
                    };
                    self.locals_storage[base + dest as usize] = Value::Int(a + b);
                    // Skip past the 3 inert primitive ops we
                    // absorbed (original PushConst + IntAdd +
                    // StoreLocal).
                    self.frames[frame_idx].pc = pc + 4;
                }
            }
        }
    }

    fn pop(&mut self) -> Result<Value, VmError> {
        self.stack.pop().ok_or(VmError::StackUnderflow)
    }
    fn peek(&self) -> Result<&Value, VmError> {
        self.stack.last().ok_or(VmError::StackUnderflow)
    }

    /// IC-cached field read of `locals[local_idx]`, shared by the
    /// field-read fusions: slice 9's `LoadLocalGetField` and slice
    /// 7/8's `LoadLocalGetField{Add,Sub,Mul}`. Uses the same
    /// `(fn_id, site_idx)` inline-cache slot as the unfused
    /// `Op::GetField`, so the paths stay cache-consistent.
    /// `op_name` only appears in the non-record error message.
    ///
    /// Reads the record **by reference** and clones out only the
    /// selected field — it does *not* clone the whole record. The
    /// unfused `[LoadLocal, GetField]` pair clones the entire record
    /// (`Box<IndexMap>` for a heap record) onto the value stack just
    /// to read one field and drop the rest; on the `response_build`
    /// profile that whole-record clone+drop of the returned `Response`
    /// dominated the malloc traffic. Borrowing in place removes it.
    ///
    /// Borrow discipline: the inline-cache slot can't be written while
    /// the record (a borrow of `self.locals_storage`) is live, so the
    /// match yields `(value, install)` and the `field_ics` write
    /// happens after the borrow ends.
    ///
    /// `#[inline(always)]`: hot dispatch path, called from four tight
    /// `run_to` arms; leaving it out-of-line showed up as a standalone
    /// call frame on the profile.
    #[inline(always)]
    fn read_local_record_field(
        &mut self,
        base: usize,
        local_idx: u16,
        fn_id: u32,
        name_idx: u32,
        site_idx: u32,
        op_name: &str,
    ) -> Result<Value, VmError> {
        let fid = fn_id as usize;
        let sid = site_idx as usize;
        if self.field_ics[fid].is_empty() {
            let n = self.program.functions[fid].field_ic_sites as usize;
            self.field_ics[fid] = vec![None; n];
        }
        let cached = self.field_ics[fid][sid];
        let li = base + local_idx as usize;

        let (value, install): (Value, Option<(u32, usize)>) =
            match &self.locals_storage[li] {
                Value::Record { fields: r, shape_id } => {
                    let shape_id = *shape_id;
                    if ic_stats_enabled() {
                        record_ic_hit(fn_id, site_idx, shape_id);
                    }
                    let hit = if let Some((cached_shape, off)) = cached {
                        if cached_shape == shape_id {
                            if shape_id != crate::value::NO_SHAPE_ID {
                                r.get_index(off).map(|(_, val)| val.clone())
                            } else if let Some((k, val)) = r.get_index(off) {
                                match &self.program.constants[name_idx as usize] {
                                    Const::FieldName(s) if s == k => Some(val.clone()),
                                    _ => None,
                                }
                            } else { None }
                        } else { None }
                    } else { None };
                    match hit {
                        Some(v) => (v, None),
                        None => {
                            let name = match &self.program.constants[name_idx as usize] {
                                Const::FieldName(s) => s.as_str(),
                                _ => return Err(VmError::TypeMismatch(
                                    "expected FieldName const".into())),
                            };
                            let (off, _, val) = r.get_full(name)
                                .ok_or_else(|| VmError::TypeMismatch(
                                    format!("missing field `{name}`")))?;
                            (val.clone(), Some((shape_id, off)))
                        }
                    }
                }
                &Value::StackRecord { shape_id, slab_start, field_count } => {
                    if ic_stats_enabled() {
                        record_ic_hit(fn_id, site_idx, shape_id);
                    }
                    if let Some((cached_shape, off)) = cached {
                        if cached_shape == shape_id && (off as u16) < field_count {
                            let idx = slab_start as usize + off;
                            (self.stack_record_arena[idx].clone(), None)
                        } else {
                            let off = self.resolve_stack_field(shape_id, name_idx)?;
                            (self.stack_record_arena[slab_start as usize + off].clone(),
                             Some((shape_id, off)))
                        }
                    } else {
                        let off = self.resolve_stack_field(shape_id, name_idx)?;
                        (self.stack_record_arena[slab_start as usize + off].clone(),
                         Some((shape_id, off)))
                    }
                }
                // #463 slice 2a: superinstruction read out of an
                // arena-allocated record held in a local. Same shape
                // resolution as the stack-record arm (records share
                // the same `record_shapes` table regardless of
                // allocation site); only the slab indexed differs.
                &Value::ArenaRecord { shape_id, slab_start, field_count } => {
                    if ic_stats_enabled() {
                        record_ic_hit(fn_id, site_idx, shape_id);
                    }
                    if let Some((cached_shape, off)) = cached {
                        if cached_shape == shape_id && (off as u16) < field_count {
                            let idx = slab_start as usize + off;
                            (self.arena_slab[idx].clone(), None)
                        } else {
                            let off = self.resolve_stack_field(shape_id, name_idx)?;
                            (self.arena_slab[slab_start as usize + off].clone(),
                             Some((shape_id, off)))
                        }
                    } else {
                        let off = self.resolve_stack_field(shape_id, name_idx)?;
                        (self.arena_slab[slab_start as usize + off].clone(),
                         Some((shape_id, off)))
                    }
                }
                other => return Err(VmError::TypeMismatch(
                    format!("{op_name} on non-record: {other:?}"))),
            };
        if let Some(entry) = install {
            self.field_ics[fid][sid] = Some(entry);
        }
        Ok(value)
    }

    /// Resolve a field offset within a stack-record shape by name
    /// (the slow path when the inline cache misses). Factored out so
    /// `read_local_record_field` doesn't hold the `locals_storage`
    /// borrow across the `record_shapes` / `constants` walk.
    #[inline]
    fn resolve_stack_field(&self, shape_id: u32, name_idx: u32) -> Result<usize, VmError> {
        let shape = &self.program.record_shapes[shape_id as usize];
        let target_name = match &self.program.constants[name_idx as usize] {
            Const::FieldName(s) => s.as_str(),
            _ => return Err(VmError::TypeMismatch("expected FieldName const".into())),
        };
        for (i, fn_const_idx) in shape.iter().enumerate() {
            if let Const::FieldName(s) = &self.program.constants[*fn_const_idx as usize] {
                if s == target_name { return Ok(i); }
            }
        }
        Err(VmError::TypeMismatch(
            format!("missing field `{target_name}` on stack record")))
    }

    fn bin_int(&mut self, f: impl Fn(i64, i64) -> Value) -> Result<(), VmError> {
        let b = self.pop()?.as_int();
        let a = self.pop()?.as_int();
        self.stack.push(f(a, b));
        Ok(())
    }
    fn bin_float(&mut self, f: impl Fn(f64, f64) -> Value) -> Result<(), VmError> {
        let b = self.pop()?.as_float();
        let a = self.pop()?.as_float();
        self.stack.push(f(a, b));
        Ok(())
    }
    fn bin_num(
        &mut self,
        i: impl Fn(i64, i64) -> Value,
        f: impl Fn(f64, f64) -> Value,
    ) -> Result<(), VmError> {
        let b = self.pop()?;
        let a = self.pop()?;
        match (a, b) {
            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
        }
    }

    /// Like `bin_num` but also handles `Str` operands via lexicographic order.
    /// Used by `NumLt` / `NumLe` because the type checker admits `Str < Str`
    /// and `>` / `>=` compile as swap+NumLt / swap+NumLe (#332).
    fn bin_ord(
        &mut self,
        i: impl Fn(i64, i64) -> Value,
        f: impl Fn(f64, f64) -> Value,
        s: impl Fn(&str, &str) -> Value,
    ) -> Result<(), VmError> {
        let b = self.pop()?;
        let a = self.pop()?;
        match (a, b) {
            (Value::Int(x), Value::Int(y)) => { self.stack.push(i(x, y)); Ok(()) }
            (Value::Float(x), Value::Float(y)) => { self.stack.push(f(x, y)); Ok(()) }
            (Value::Str(x), Value::Str(y)) => { self.stack.push(s(&x, &y)); Ok(()) }
            (a, b) => Err(VmError::TypeMismatch(format!("Num op: {a:?} {b:?}"))),
        }
    }
    fn bin_eq(&mut self) -> Result<(), VmError> {
        let b = self.pop()?;
        let a = self.pop()?;
        self.stack.push(Value::Bool(a == b));
        Ok(())
    }
}

impl Drop for Vm<'_> {
    fn drop(&mut self) {
        if ic_stats_enabled() {
            dump_ic_stats();
        }
    }
}

/// Construct a `Value::Variant` with the given name and args.
/// Used by `conc.*` registry ops to return `Result`/`Option`/`ConcError`
/// values without hand-writing the struct literal at every site.
fn variant(name: &str, args: Vec<Value>) -> Value {
    Value::Variant { name: name.to_string(), args }
}
fn variant_ok(payload: Value) -> Value { variant("Ok", vec![payload]) }
fn variant_err(payload: Value) -> Value { variant("Err", vec![payload]) }

fn const_to_value(c: &Const) -> Value {
    match c {
        Const::Int(n) => Value::Int(*n),
        Const::Float(f) => Value::Float(*f),
        Const::Bool(b) => Value::Bool(*b),
        Const::Str(s) => Value::Str(s.as_str().into()),
        Const::Bytes(b) => Value::Bytes(b.clone()),
        Const::Unit => Value::Unit,
        Const::FieldName(s) | Const::VariantName(s) | Const::NodeId(s) => Value::Str(s.as_str().into()),
    }
}

#[cfg(test)]
mod memo_hash_tests {
    //! #461 follow-up: invariants for the structural memo-key hash
    //! that replaced the SHA-256-over-canonical-JSON path. The memo
    //! cache keys on this digest with no equality fallback, so the
    //! load-bearing property is "equal-under-PartialEq args produce
    //! an equal key" — plus enough discrimination that distinct args
    //! don't collide in practice.
    use super::*;
    use indexmap::IndexMap;

    fn rec(pairs: &[(&str, Value)]) -> Value {
        let mut m: IndexMap<SmolStr, Value> = IndexMap::new();
        for (k, v) in pairs { m.insert((*k).into(), v.clone()); }
        Value::Record { shape_id: crate::value::NO_SHAPE_ID, fields: Box::new(m) }
    }

    #[test]
    fn identical_args_hash_equal() {
        let a = vec![Value::Int(7), Value::Str("hi".into())];
        let b = vec![Value::Int(7), Value::Str("hi".into())];
        assert_eq!(hash_call_args(&a), hash_call_args(&b));
    }

    #[test]
    fn distinct_scalars_differ() {
        assert_ne!(hash_call_args(&[Value::Int(7)]), hash_call_args(&[Value::Int(8)]));
        assert_ne!(hash_call_args(&[Value::Int(0)]), hash_call_args(&[Value::Bool(false)]));
        assert_ne!(hash_call_args(&[Value::Int(0)]), hash_call_args(&[Value::Unit]));
        assert_ne!(hash_call_args(&[Value::Bool(true)]), hash_call_args(&[Value::Bool(false)]));
    }

    #[test]
    fn arity_is_part_of_the_key() {
        assert_ne!(
            hash_call_args(&[Value::Int(1), Value::Int(2)]),
            hash_call_args(&[Value::Int(1)]),
        );
        // A 2-arg call vs a single Tuple arg of the same elements
        // must not collide.
        assert_ne!(
            hash_call_args(&[Value::Int(1), Value::Int(2)]),
            hash_call_args(&[Value::Tuple(vec![Value::Int(1), Value::Int(2)])]),
        );
    }

    #[test]
    fn record_hash_is_field_order_independent() {
        // IndexMap equality ignores insertion order, so the key must
        // too — otherwise equal records would miss the cache.
        let r1 = rec(&[("a", Value::Int(1)), ("b", Value::Int(2))]);
        let r2 = rec(&[("b", Value::Int(2)), ("a", Value::Int(1))]);
        assert_eq!(r1, r2, "precondition: records compare equal");
        assert_eq!(hash_call_args(&[r1]), hash_call_args(&[r2]));
    }

    #[test]
    fn record_distinguishes_values_and_keys() {
        let base = rec(&[("a", Value::Int(1)), ("b", Value::Int(2))]);
        let diff_val = rec(&[("a", Value::Int(1)), ("b", Value::Int(3))]);
        let diff_key = rec(&[("a", Value::Int(1)), ("c", Value::Int(2))]);
        assert_ne!(hash_call_args(std::slice::from_ref(&base)), hash_call_args(&[diff_val]));
        assert_ne!(hash_call_args(&[base]), hash_call_args(&[diff_key]));
    }

    #[test]
    fn shape_id_does_not_affect_record_key() {
        // PartialEq ignores shape_id; the key must too.
        let mut m: IndexMap<SmolStr, Value> = IndexMap::new();
        m.insert("a".into(), Value::Int(1));
        let r_no_shape = Value::Record { shape_id: crate::value::NO_SHAPE_ID, fields: Box::new(m.clone()) };
        let r_shaped = Value::Record { shape_id: 3, fields: Box::new(m) };
        assert_eq!(r_no_shape, r_shaped);
        assert_eq!(hash_call_args(&[r_no_shape]), hash_call_args(&[r_shaped]));
    }

    #[test]
    fn variant_name_and_args_matter() {
        let some1 = Value::Variant { name: "Some".into(), args: vec![Value::Int(1)] };
        let some1b = Value::Variant { name: "Some".into(), args: vec![Value::Int(1)] };
        let some2 = Value::Variant { name: "Some".into(), args: vec![Value::Int(2)] };
        let none = Value::Variant { name: "None".into(), args: vec![] };
        assert_eq!(hash_call_args(std::slice::from_ref(&some1)), hash_call_args(&[some1b]));
        assert_ne!(hash_call_args(std::slice::from_ref(&some1)), hash_call_args(&[some2]));
        assert_ne!(hash_call_args(&[some1]), hash_call_args(&[none]));
    }

    #[test]
    fn float_bit_pattern_keys() {
        assert_eq!(hash_call_args(&[Value::Float(1.5)]), hash_call_args(&[Value::Float(1.5)]));
        assert_ne!(hash_call_args(&[Value::Float(1.5)]), hash_call_args(&[Value::Float(2.5)]));
        // Same NaN bit pattern → same key (harmless: pure callee is
        // deterministic on bit-identical args).
        let nan = f64::NAN;
        assert_eq!(hash_call_args(&[Value::Float(nan)]), hash_call_args(&[Value::Float(nan)]));
    }
}