supermachine 0.7.70

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588
2589
2590
2591
2592
2593
2594
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614
2615
2616
2617
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627
2628
2629
2630
2631
2632
2633
2634
2635
2636
2637
2638
2639
2640
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
2726
2727
2728
2729
2730
2731
2732
2733
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752
2753
2754
2755
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782
2783
2784
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
//! The Linux/x86 microVM run loop — boots a real kernel (uni- or multi-CPU)
//! through the crate.
//!
//! This is the KVM-native device-serving loop: it drives `VcpuFd::run`
//! directly (not the abstract [`crate::hypervisor`] `step()`), because KVM
//! completes a guest read by writing the result back into the shared `kvm_run`
//! buffer in place — something the by-value `VcpuExit` of the portable seam
//! cannot express. The seam stays the contract for portable orchestration
//! (register access, force-exit, snapshot); the dataplane below it is each
//! backend's own (HVF has its worker loop; this is KVM's).
//!
//! It reuses the portable device plane unchanged: [`Com1`] (16550 serial),
//! [`MmioBus`] + [`MmioVirtio`] + [`VirtioBlk`] for the virtio-mmio block
//! device. For SMP, each vCPU runs its device-serving loop on its own thread;
//! shared devices are behind the same `Send + Sync` handles the HVF path uses
//! ([`MmioBus`] is internally locked; [`Com1`] sits behind a `Mutex`). When one
//! vCPU stops (kernel halt/reboot), the others are kicked with the increment-5
//! [`force_exit`](crate::kvm::KvmVcpuHandle::force_exit) and joined.

use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use kvm_bindings::{kvm_clock_data, kvm_irqchip, kvm_pit_state2};
use kvm_ioctls::VcpuExit as KvmExit;
use vmm_sys_util::eventfd::EventFd;

use super::{KvmDeviceState, KvmError, KvmSnapshotState, KvmVcpu, KvmVcpuHandle, KvmVm};
use crate::arch::x86_64::mptable;
use crate::devices::com1::{Com1, Com1State, COM1_BASE, COM1_IRQ};
use crate::devices::mmio_bus::MmioBus;
use crate::devices::virtio::blk::VirtioBlk;
use crate::devices::virtio::fs::{VirtioFs, VirtioFsConfig};
use crate::devices::virtio::mmio::{MmioSnapshot, MmioVirtio};
use crate::devices::virtio::queue::GuestMem;
use crate::devices::virtio::vsock::device::Vsock;
use crate::devices::virtio::vsock::muxer::TsiListenerSnapshot;
use crate::devices::virtio::vsock::muxer_thread::MuxerStream;
use crate::devices::virtio::VirtioDevice;
use crate::hypervisor::{HypervisorVcpu, HypervisorVm, VcpuHandle};
use crate::snapshot_frame::{DeviceBacking, DeviceKind, DeviceRecord};

/// virtio-mmio transport window. Placed above guest RAM so the guest's accesses
/// fall outside every memory slot and trap as `KVM_EXIT_MMIO`. The matching
/// `virtio_mmio.device=<len>@<base>:<irq>` is appended to the kernel cmdline.
const VIRTIO_BASE: u64 = 0xd000_0000;
const VIRTIO_LEN: u64 = 0x1000;
const VIRTIO_IRQ: u32 = 5;
/// virtio-mmio QueueNotify register — the doorbell offloaded to an ioeventfd.
const VIRTIO_QUEUE_NOTIFY: u64 = VIRTIO_BASE + 0x050;

/// virtio-vsock transport window (a 2nd virtio-mmio device). Notify is
/// bus-routed (not ioeventfd) because vsock has 3 queues and the QueueNotify
/// value carries the queue index, which a NoDatamatch ioeventfd would drop.
const VSOCK_BASE: u64 = 0xd000_1000;
const VSOCK_LEN: u64 = 0x1000;
const VSOCK_IRQ: u32 = 6;
/// The guest's vsock context id (host is CID 2).
const GUEST_CID: u64 = 3;

/// Data volumes (extra virtio-blk devices beyond the rootfs `vda`) start here,
/// one 0x1000-spaced virtio-mmio window + successive IRQ each. `vda` is at
/// VIRTIO_BASE/IRQ5, vsock at VSOCK_BASE/IRQ6, so volumes begin at vdb.
const VOLUME_BASE: u64 = 0xd000_2000;
const VOLUME_IRQ_BASE: u32 = 7;

/// virtio-fs windows (host-dir mounts). Placed 1 MiB above the blk/vsock/volume
/// cluster so they never collide with the volume span: it would take 254
/// volumes (`(FS_BASE - VOLUME_BASE) / 0x1000`) to reach here, far more than
/// the `FS_IRQ_BASE - VOLUME_IRQ_BASE` volumes the IRQ layout leaves room for.
/// One 0x1000-spaced window + successive IRQ each.
/// Bus-routed notify like vsock (virtio-fs has a hiprio + a request queue, so
/// the QueueNotify value carries the queue index an ioeventfd would drop).
const FS_BASE: u64 = 0xd010_0000;
const FS_LEN: u64 = 0x1000;
const FS_IRQ_BASE: u32 = 10;

/// virtio-balloon transport window (opt-in cooperative memory release). Placed
/// at 2 MiB above the blk/vsock/volume cluster — well clear of the volume span
/// (`VOLUME_BASE`, would need ~510 volumes to reach here) and the virtio-fs
/// cluster (`FS_BASE` at +1 MiB, would need ~256 fs mounts), so its 0x1000
/// window never collides. Its IRQ is assigned dynamically AFTER the fs mounts
/// (`balloon_irq`) so the contiguous volume/fs IRQ run is undisturbed.
const BALLOON_BASE: u64 = 0xd020_0000;
const BALLOON_LEN: u64 = 0x1000;

/// Number of GSI pins the in-kernel IOAPIC exposes (24 pins: GSI 0..=23), i.e.
/// the EXCLUSIVE upper bound on usable GSIs — the highest deliverable GSI is
/// `IOAPIC_GSI_CEILING - 1`. Every virtio device IRQ must stay below this or
/// the line is never delivered.
const IOAPIC_GSI_CEILING: u32 = 24;

/// The balloon device's IRQ, placed on the first GSI past the virtio-fs run so
/// the contiguous `VOLUME_IRQ_BASE..FS_IRQ_BASE+num_fs` block is left intact.
/// Must match between cold-boot `new` and `finish_restore` (both pass the same
/// `num_fs`).
fn balloon_irq(num_fs: usize) -> u32 {
    FS_IRQ_BASE + num_fs as u32
}

/// Validate the virtio-mmio IRQ budget BEFORE attaching devices — the single
/// source of truth for the KVM slot layout (volumes occupy
/// `[VOLUME_IRQ_BASE, FS_IRQ_BASE)`, fs mounts `[FS_IRQ_BASE, FS_IRQ_BASE+num_fs)`,
/// balloon the next GSI). Guards two silent-corruption gaps the old inline
/// `BASE + i` arithmetic had no check for:
///   1. **Volume↔fs IRQ collision** — >`FS_IRQ_BASE-VOLUME_IRQ_BASE` volumes would
///      run a volume's IRQ into the fs range (two devices sharing a GSI line).
///   2. **IOAPIC overflow** — fs mounts (+ balloon) past GSI 23 get a line that's
///      never delivered, so the device silently never interrupts.
///
/// # Parameters
/// * `num_volumes` — number of extra virtio-blk data volumes.
/// * `num_fs` — number of virtio-fs mounts.
/// * `enable_balloon` — whether a balloon device takes the GSI after the fs run.
///
/// # Errors
/// Returns a human-readable message instead of booting a VM with aliased or
/// undeliverable IRQs. The check itself can neither panic nor wrap: the mount
/// count is clamped before any `u32` arithmetic, so even a nonsensical
/// `num_fs` is rejected rather than truncated into an in-range GSI.
fn virtio_irq_budget_ok(
    num_volumes: usize,
    num_fs: usize,
    enable_balloon: bool,
) -> Result<(), String> {
    let max_volumes = (FS_IRQ_BASE - VOLUME_IRQ_BASE) as usize;
    if num_volumes > max_volumes {
        return Err(format!(
            "too many data volumes: {num_volumes} > max {max_volumes} \
             (volume IRQs would collide with the virtio-fs IRQ range)"
        ));
    }
    // Clamp the mount count to what u32 GSI arithmetic can represent. Every
    // clamped value is far past the IOAPIC ceiling, so the verdict is the same
    // as for the real count, but `as u32` can no longer truncate (which used to
    // let e.g. 2^32 mounts pass as "0 mounts") and the additions below cannot
    // overflow.
    let fs_headroom = (u32::MAX - FS_IRQ_BASE) as usize;
    let fs_count = num_fs.min(fs_headroom);
    // Highest GSI actually used: balloon (if present) sits one past the fs run;
    // otherwise the top fs mount does; with neither, vsock's fixed line is the
    // reference point.
    let highest_used = if enable_balloon {
        balloon_irq(fs_count)
    } else if fs_count > 0 {
        FS_IRQ_BASE + fs_count as u32 - 1
    } else {
        VSOCK_IRQ
    };
    if highest_used >= IOAPIC_GSI_CEILING {
        return Err(format!(
            "virtio IRQ budget exhausted: highest GSI {highest_used} >= IOAPIC ceiling \
             {IOAPIC_GSI_CEILING} ({num_fs} fs mounts{})",
            if enable_balloon { " + balloon" } else { "" }
        ));
    }
    Ok(())
}

/// virtio-fs DAX windows: guest-physical ranges into which the host mmaps file
/// pages on FUSE_SETUPMAPPING (zero-copy reads, host page-cache shared across
/// VMs). Placed at 64 GiB (`1 << 36`) — clear of guest RAM (low GiB) and the
/// MMIO cluster (~3.25 GiB, at `0xd000_0000`). One 1 GiB window per fs mount;
/// mappings are added on demand as KVM memory slots (the window has no backing
/// slot until a SETUPMAPPING populates a sub-range).
///
/// NOTE(review): every address in a window at or above this base has bit 36
/// set, so it needs a guest-physical width of at least 37 bits. A CPU whose
/// MAXPHYADDR is exactly 36 bits can address only up to 64 GiB - 1 and could
/// not reach these windows — confirm the minimum MAXPHYADDR this backend
/// supports.
const FS_DAX_BASE: u64 = 0x10_0000_0000;
const FS_DAX_WINDOW_LEN: u64 = 1 << 30;

/// Maps host file pages into the guest's DAX window via KVM memory slots — the
/// KVM counterpart of HVF's `HvfMapper`. A request without `DAX_PROT_WRITE` is
/// mapped with `prot::READ`, which yields a `KVM_MEM_READONLY` slot (a
/// read-only base can't be mutated through DAX); a request carrying
/// `DAX_PROT_WRITE` is mapped with `prot::RWX`. REMOVEMAPPING deletes the slot
/// by gpa.
struct KvmDaxMapper {
    /// Shared VM handle whose `map_ram` / `unmap_ram` install and remove the
    /// DAX memory slots.
    vm: Arc<KvmVm>,
}

/// Build the KVM-backed DAX mapper for `vm` and return it type-erased as a
/// `dyn HvfMapper`. This constructor exists so the seam's
/// `HypervisorVm::dax_mapper` (in `kvm/mod.rs`) can obtain a mapper from a
/// shared VM handle while `KvmDaxMapper` and its field stay private to this
/// module.
pub(crate) fn kvm_dax_mapper(vm: Arc<KvmVm>) -> Arc<dyn crate::fuse::HvfMapper> {
    let mapper = KvmDaxMapper { vm };
    Arc::new(mapper)
}

impl crate::fuse::HvfMapper for KvmDaxMapper {
    /// Install `len` bytes of host memory at `host_va` as a guest memory slot
    /// at `gpa`. The slot is writable only when `prot` carries
    /// `DAX_PROT_WRITE`; any `map_ram` failure is reported as `EIO`.
    fn map(
        &self,
        host_va: *mut u8,
        gpa: u64,
        len: u64,
        prot: u32,
    ) -> Result<(), crate::fuse::Errno> {
        let wants_write = (prot & crate::fuse::DAX_PROT_WRITE) != 0;
        let kvm_prot = if wants_write {
            crate::hypervisor::prot::RWX
        } else {
            // Without WRITE, map_ram sets KVM_MEM_READONLY on the slot.
            crate::hypervisor::prot::READ
        };
        // SAFETY: host_va covers `len` bytes (the backend's file mmap, kept alive
        // by the DaxSession's active-slot table until REMOVEMAPPING); gpa lies in
        // this device's DAX window, disjoint from RAM/MMIO/other mappings.
        let mapped = unsafe { self.vm.map_ram(host_va, gpa, len as usize, kvm_prot) };
        mapped.map_err(|_| crate::fuse::backend::EIO)
    }

    /// Remove the guest memory slot covering `len` bytes at `gpa`; any
    /// `unmap_ram` failure is reported as `EIO`.
    fn unmap(&self, gpa: u64, len: u64) -> Result<(), crate::fuse::Errno> {
        let byte_len = len as usize;
        // SAFETY: no vCPU is accessing this DAX sub-range at REMOVEMAPPING time
        // (the guest dropped its references first, per the virtio-fs protocol).
        let removed = unsafe { self.vm.unmap_ram(gpa, byte_len) };
        removed.map_err(|_| crate::fuse::backend::EIO)
    }
}

/// A host directory to expose to the guest as a virtio-fs mount. The guest init
/// reads the `sm.virtiofs=TAG:MOUNT` cmdline token and `mount -t virtiofs`es it.
///
/// Derives `Debug` (in addition to `Clone`) so attach requests can be logged
/// and embedded in error messages with `{:?}`; all fields are plain strings.
#[derive(Clone, Debug)]
pub struct VirtioFsAttach {
    /// Host directory served read-only over virtio-fs (the FUSE backend root).
    pub host_path: String,
    /// virtio-fs tag the guest mounts by (`mount -t virtiofs <tag> <mount>`).
    pub tag: String,
    /// Absolute mount point inside the guest container.
    pub mount: String,
}

/// A data volume to attach as an extra virtio-blk device and mount in the guest.
/// The host backing file must already exist + be formatted (the caller does
/// that). The guest init mounts `/dev/vd{b,c,…}` at `mount`.
///
/// Derives `Debug` (in addition to `Clone`) so volume mappings can be logged
/// and embedded in error messages with `{:?}`.
#[derive(Clone, Debug)]
pub struct VolumeAttach {
    /// Host path to the (formatted) backing file.
    pub path: String,
    /// Grow the backing file to at least this many bytes (sparse).
    pub size: u64,
    /// Absolute mount point inside the guest container (e.g. `/var/lib/data`).
    pub mount: String,
}

/// Runtime handles for one attached virtio-fs mount, retained on the VM so a
/// snapshot can capture the device's MMIO/queue cursors, the FUSE backend's
/// inode/handle tables, and the DAX slot table — and restore can re-attach the
/// same mount. The host dir is referenced by `host_path` (contents live on the
/// host, never copied into the snapshot).
struct FsMount {
    /// The mount's virtio-mmio transport (source of the MMIO/queue-cursor state).
    mmio: Arc<MmioVirtio>,
    /// The FUSE backend serving this mount (owner of the inode/handle tables).
    backend: Arc<dyn crate::fuse::FsBackend>,
    /// The DAX session for this mount (owner of the DAX slot table).
    dax: Arc<crate::fuse::DaxSession>,
    /// Host directory backing the mount.
    host_path: String,
    /// virtio-fs tag the guest mounts by.
    tag: String,
    /// Mount point inside the guest.
    mount: String,
    /// Presumably the guest-physical base of this mount's DAX window (cf.
    /// `FS_DAX_BASE`) — confirm at the attach site.
    dax_gpa: u64,
    /// Presumably the byte length of that DAX window (cf. `FS_DAX_WINDOW_LEN`)
    /// — confirm at the attach site.
    dax_window_len: u64,
}

/// virtio-fs snapshot: everything needed to re-attach the mount on restore. The
/// FUSE backend's tables and the DAX slot table are opaque blobs produced by
/// their own `snapshot_state` (PosixFs lazy-reopens fds on first post-restore
/// I/O; DAX slots are metadata-only until eagerly re-bound via
/// [`crate::fuse::DaxSession::rebind_all`]).
///
/// The first five fields have the same names and types as the identity fields
/// of `FsMount`; the last three are captured device/backend state.
struct VirtioFsSnap {
    /// Host directory backing the mount (referenced by path, not copied).
    host_path: String,
    /// virtio-fs tag the guest mounts by.
    tag: String,
    /// Mount point inside the guest.
    mount: String,
    /// DAX window base recorded for this mount (same field as `FsMount::dax_gpa`).
    dax_gpa: u64,
    /// DAX window length recorded for this mount (same field as
    /// `FsMount::dax_window_len`).
    dax_window_len: u64,
    /// Captured virtio-mmio transport state.
    mmio: MmioSnapshot,
    /// Opaque FUSE-backend state blob.
    backend_state: Vec<u8>,
    /// Opaque DAX slot-table blob.
    dax_state: Vec<u8>,
}

/// COM1 occupies eight ports from its base (`0x3f8..=0x3ff`). Expressed as a
/// half-open range, so `COM1_BASE + 8` itself is NOT a COM1 port.
const COM1_PORTS: std::ops::Range<u16> = COM1_BASE..COM1_BASE + 8;

/// Inputs to [`LinuxVm::new`].
///
/// All borrowed fields share the lifetime `'a`; the config only borrows the
/// kernel/initrd bytes, paths, and attach lists rather than owning them.
pub struct LinuxVmConfig<'a> {
    /// Guest RAM size in bytes (must clear the kernel + initrd footprint).
    pub mem_size: usize,
    /// Number of vCPUs (>= 1). With more than one, an MP table is written so
    /// the kernel discovers the secondary CPUs.
    pub num_cpus: u8,
    /// The kernel image (bzImage) bytes.
    pub kernel: &'a [u8],
    /// Optional initramfs the kernel unpacks as its initial rootfs.
    pub initrd: Option<&'a [u8]>,
    /// Optional virtio-blk backing file → `/dev/vda`.
    pub disk_path: Option<&'a str>,
    /// Size to grow the backing file to (ignored when `disk_path` is `None`).
    pub disk_size: u64,
    /// Base kernel command line; the virtio-mmio registration is appended when
    /// a disk is present. Caller sets `console=ttyS0` etc.
    pub cmdline: &'a str,
    /// Attach a virtio-vsock device (host↔guest sockets). The guest CID is 3.
    pub enable_vsock: bool,
    /// Extra data volumes → `/dev/vdb`, `/dev/vdc`, … each mounted in the guest
    /// at its `mount` (the generated init reads `sm.volume=DEV:MOUNT` from the
    /// cmdline). Empty for the common no-volume case. Honored on cold boot; a
    /// snapshot records each volume's mapping (path/size/mount + device state) so
    /// restore re-attaches the same backing files (the contents live on the host).
    /// The IRQ layout leaves room for at most `FS_IRQ_BASE - VOLUME_IRQ_BASE`
    /// volumes (see `virtio_irq_budget_ok`).
    pub volumes: &'a [VolumeAttach],
    /// Host directories exposed to the guest as virtio-fs mounts (zero-copy base
    /// sharing — see `docs/design/kvm-virtiofs-dax-2026-06-07.md`). Snapshots
    /// carry the mounts: `VmSnapshot.virtiofs` captures each mount's device +
    /// FUSE-backend + DAX-slot state, and restore re-attaches them (host dir by
    /// path, lazy fd reopen + eager DAX rebind).
    pub virtiofs: &'a [VirtioFsAttach],
    /// vsock TSI control-channel auth token (32 bytes). When `Some`, it is
    /// appended to the cmdline as `supermachine.tsi_token=<hex>` (the guest
    /// stamps it on every control DGRAM) and handed to the muxer (which rejects
    /// any control packet that doesn't carry it) — so an in-guest workload can't
    /// forge TSI control ops to bypass egress policy. `None` disables enforcement
    /// (legacy / tests). The token is captured into the snapshot so restore can
    /// re-enforce the same value the guest RAM still stamps.
    pub tsi_token: Option<[u8; 32]>,
    /// Attach a virtio-balloon device (opt-in cooperative memory release). When
    /// `true` the guest's `virtio_balloon` driver binds the device at
    /// `BALLOON_BASE`; the host then drives `request_inflate` to madvise-FREE
    /// pages the guest hands back. Default `false` — like HVF, balloon is pure
    /// cost with zero benefit unless explicitly used.
    ///
    /// COLD-BOOT only: a balloon device is not captured in snapshots, so a
    /// restored VM has no balloon (matching HVF's "balloon off at restore"
    /// default — reclaim is a one-shot cold-boot lever, not a warm-pool feature).
    /// The product/warm-pool path never enables it; drive it via a direct
    /// [`LinuxVm`] for a long-lived cold-boot VM.
    ///
    /// NOTE(review): the `LinuxVm::balloon` field doc and the `balloon_irq`
    /// doc both talk about snapshot capture / `finish_restore` for the balloon,
    /// which contradicts "not captured in snapshots" above — confirm which
    /// statement reflects current behavior.
    pub enable_balloon: bool,
}

/// The reason a vCPU's run loop stopped and handed control back to the host.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ExitReason {
    /// The guest ran `hlt` while interrupts were disabled — a clean stop
    /// (also what `panic=-1` ends in).
    Halt,
    /// `KVM_EXIT_SHUTDOWN`, i.e. a triple fault (how the guest reboots under
    /// `reboot=t`).
    Shutdown,
    /// This vCPU was stopped from another thread via
    /// [`force_exit`](crate::kvm::KvmVcpuHandle::force_exit), because a
    /// different vCPU had already stopped.
    Canceled,
    /// Any exit the loop does not model (debug, internal error); the payload
    /// is a description of it.
    Unknown(String),
}

/// A booted Linux/x86 microVM: VM + vCPUs + device bus, ready to [`run`].
///
/// The struct also retains everything a snapshot needs to capture (device MMIO
/// transports, backing-file metadata, the TSI token) and, on the restore path,
/// the baselines that the in-place reset re-applies.
///
/// NOTE(review): fields are dropped in declaration order after any `Drop`
/// impl runs, and several field docs below describe teardown ordering
/// constraints (threads joined before guest RAM is unmapped) — keep that in
/// mind before reordering fields.
///
/// [`run`]: LinuxVm::run
pub struct LinuxVm {
    /// Shared VM handle.
    vm: Arc<KvmVm>,
    /// The VM's vCPUs.
    vcpus: Vec<KvmVcpu>,
    /// MMIO bus the virtio-mmio devices are served from.
    bus: Arc<MmioBus>,
    /// The 16550 serial device, shared behind a mutex.
    com1: Arc<Mutex<Com1>>,
    /// Raw pointer to the guest-RAM mapping (`mem_size` bytes); this is the
    /// field the manual `unsafe impl Send` below is written for.
    host: *mut u8,
    /// Guest RAM size in bytes.
    mem_size: usize,
    /// Keep the block device alive for the VM's lifetime.
    _blk: Option<Arc<VirtioBlk>>,
    /// The block device's MMIO transport (for snapshot queue-cursor capture)
    /// + its backing (path, size) so a snapshot can re-open it on restore.
    blk_mmio: Option<Arc<MmioVirtio>>,
    disk: Option<(String, u64)>,
    /// The virtio-vsock device (host↔guest sockets), if attached.
    vsock: Option<Arc<Vsock>>,
    /// The vsock device's MMIO transport (for snapshot queue-cursor capture +
    /// restore), parallel to `blk_mmio`.
    vsock_mmio: Option<Arc<MmioVirtio>>,
    /// The virtio-balloon device (opt-in), if attached. Retained so the host can
    /// drive `request_inflate` (see [`LinuxVm::request_balloon_inflate`]) and so
    /// a snapshot can capture its MMIO state + re-attach on restore.
    ///
    /// NOTE(review): `LinuxVmConfig::enable_balloon` documents the balloon as
    /// cold-boot only and NOT captured in snapshots, and there is no
    /// `reset_*` baseline for it below — confirm which description is current.
    balloon: Option<Arc<crate::devices::virtio::balloon::VirtioBalloon>>,
    /// The balloon device's MMIO transport (queue-cursor capture/restore).
    balloon_mmio: Option<Arc<MmioVirtio>>,
    /// virtio device thread (services the ioeventfd doorbell), its stop flag,
    /// and an eventfd clone to wake it on teardown.
    dev_thread: Option<std::thread::JoinHandle<()>>,
    dev_stop: Arc<AtomicBool>,
    dev_wake: Option<EventFd>,
    /// Data-volume virtio-blk devices (vdb, vdc, …) kept alive for the VM's
    /// lifetime, plus their drain threads + wake eventfds (joined on teardown
    /// before guest RAM is unmapped, since they DMA into it).
    _volume_blks: Vec<Arc<VirtioBlk>>,
    volume_threads: Vec<std::thread::JoinHandle<()>>,
    volume_wakes: Vec<EventFd>,
    /// Each data volume's MMIO transport (for snapshot queue-cursor capture) and
    /// its backing metadata (path/size/mount), parallel to `_volume_blks`, so a
    /// snapshot records the mapping and restore re-attaches the same files.
    volume_mmios: Vec<Arc<MmioVirtio>>,
    volume_meta: Vec<VolumeAttach>,
    /// The vsock TSI control-channel auth token this VM was booted with, if any.
    /// The guest kernel stamps it on every control DGRAM (`supermachine.tsi_token=`
    /// cmdline) and the muxer enforces it. Retained so a snapshot can persist it
    /// and restore re-supply it to a fresh muxer (the guest RAM — and thus its
    /// captured `tsi_auth_token[32]` — survives the snapshot, so the same token
    /// must keep being enforced or restored egress control would be rejected).
    tsi_token: Option<[u8; 32]>,
    /// Attached virtio-fs mounts (host dir → guest). Retained so a snapshot can
    /// capture each mount's device + FUSE backend + DAX slot state and restore
    /// re-attach it. Empty for the common no-virtio-fs case.
    fs_mounts: Vec<FsMount>,
    /// Host-side vsock bridge acceptor threads (exec bridge + TSI mux). Each
    /// loops on `listener.incoming()` and outlives a `run()`; they are stopped
    /// and joined in [`Drop`] BEFORE guest RAM is unmapped. Without this they'd
    /// leak a thread + bound socket fd per VM across pool churn. Interior
    /// mutability because `start_exec_bridge` / `start_tsi_mux` register them
    /// through `&self` while the VM is being wired up.
    bridges: Mutex<Vec<crate::vmm::vsock_mux::Acceptor>>,
    /// Per-vCPU snapshot-baseline register state, for the in-place reset path
    /// (isolated warm-reuse): each vCPU thread re-applies its baseline when the
    /// host bumps `reset_seq`. Populated only on the restore path (the baseline
    /// IS the snapshot); empty for a cold-booted VM (which is never reset).
    vcpu_baselines: Vec<KvmSnapshotState>,
    /// Monotonic reset counter. `RunningVm::reset_to_snapshot` bumps it after
    /// resetting RAM + intc + devices; each parked vCPU thread observes the bump
    /// on resume and re-applies its `vcpu_baselines` entry before continuing.
    reset_seq: Arc<AtomicU64>,
    /// Snapshot baseline of the in-kernel intc + timer (PIT/PIC/IOAPIC/kvmclock)
    /// and the 16550 serial, re-applied by `reset_to_snapshot`. `Some` only on
    /// the restore path (cold-boot VMs are never reset).
    reset_intc: Option<KvmDeviceState>,
    reset_com1: Option<Com1State>,
    /// Snapshot baseline of each virtio device's MMIO/queue-cursor state,
    /// re-applied by `reset_to_snapshot` (with the vsock muxer drained) so the
    /// host device views match the guest's reset-to-baseline RAM. Parallel to
    /// `blk_mmio` / `vsock_mmio` / `volume_mmios` / `fs_mounts`. Restore-path only.
    reset_blk_mmio: Option<MmioSnapshot>,
    reset_vsock_mmio: Option<MmioSnapshot>,
    reset_volume_mmios: Vec<MmioSnapshot>,
    reset_fs_mmios: Vec<MmioSnapshot>,
}

/// Hand out a process-wide unique id used to name a VM's host-side unix
/// sockets (exec bridge, TSI mux). Several VMs can live in one process (e.g.
/// the pool's idle set), so a pid-only socket name would collide and the last
/// bind would clobber the sockets of earlier VMs. Ids start at 0 and grow by
/// one per call.
fn next_sock_id() -> u64 {
    // The counter lives for the whole process; each call takes the current
    // value and advances it atomically, so no two callers see the same id.
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}

// SAFETY: `host: *mut u8` (the guest-RAM mmap) is the only non-Send field. The
// handle owns it exclusively: the pointer is dereferenced only by snapshot
// capture (which reads through it) and by Drop (which munmaps it), and those
// never run concurrently. The vCPU threads were `mem::take`n out of `vcpus` at
// start and reach guest RAM through the kernel's KVM memory slot
// (Arc<KvmVm>, Send+Sync), not through this pointer. Moving the handle to
// another thread (e.g. into the pool's idle queue / refiller) is therefore
// sound. RunningVm/Vm then derive Send from their (all-Send) fields.
unsafe impl Send for LinuxVm {}

/// Ask the kernel to back an anonymous guest-RAM mapping with 2 MiB huge
/// pages (THP). Where the host runs `transparent_hugepage=madvise` (a common
/// default), anonymous memory only receives huge pages after this advice; for
/// a multi-hundred-MiB guest that sharply reduces page faults and EPT/TLB
/// pressure on touched RAM, helping both boot and steady state. Purely
/// best-effort: when THP is unavailable the call is a harmless no-op, which is
/// why its return value is discarded.
fn advise_hugepage(ptr: *mut u8, len: usize) {
    let addr = ptr as *mut libc::c_void;
    // SAFETY: `ptr`/`len` describe a live mapping the caller owns for at least
    // the duration of this call; madvise only records a hint and never frees
    // pages.
    let _ = unsafe { libc::madvise(addr, len, libc::MADV_HUGEPAGE) };
}

/// Flag a guest-RAM region as KSM-mergeable (`MADV_MERGEABLE`). This is the
/// cross-VM RAM-density lever (see `docs/design/in-vm-builder-density-finding-
/// 2026-06-07.md`): guest pages that are identical ACROSS VMs — shared
/// base-image rootfs read into page cache, libc/kernel text, byte-identical
/// build outputs — collapse into one host copy once the operator turns KSM on
/// (`/sys/kernel/mm/ksm/run=1`). With KSM off (the default) it is free and
/// harmless: the advice is recorded but nothing scans. KSM only merges private
/// anonymous pages, so on a CoW file-backed restore it merges the pages the
/// guest has privatised (the common case — KVM faults guest RAM writable).
/// Best-effort; the result is ignored. Setting `SUPERMACHINE_NO_KSM=1` opts
/// out (e.g. to avoid KSM's scan CPU on a host that runs KSM globally for
/// other reasons).
fn advise_mergeable(ptr: *mut u8, len: usize) {
    // Any value of the variable (even empty) counts as the opt-out.
    let ksm_allowed = std::env::var_os("SUPERMACHINE_NO_KSM").is_none();
    if ksm_allowed {
        let addr = ptr as *mut libc::c_void;
        // SAFETY: same contract as advise_hugepage — a live mapping owned by
        // the caller; madvise only sets the KSM-eligibility hint and never
        // frees pages.
        let _ = unsafe { libc::madvise(addr, len, libc::MADV_MERGEABLE) };
    }
}

/// Register a data-volume virtio-blk device on `bus` at `base`/`irq`, wiring the
/// used-buffer irqfd + the QueueNotify ioeventfd doorbell + a drain thread
/// (sharing the rootfs `dev_stop`). Mirrors the rootfs `vda` setup in
/// [`LinuxVm::new`] for vdb/vdc/…. Returns the kept-alive device, its MMIO
/// transport, its drain thread, and the wake eventfd (the latter two are `None`
/// under KVM_NO_IOEVENTFD, and are otherwise always returned together).
///
/// To stop the drain thread the owner must set `dev_stop` and then write to the
/// wake eventfd: the thread only re-checks the flag after its blocking read
/// returns.
///
/// # Errors
/// Propagates any failure from opening the backing file, creating/duplicating
/// an eventfd, or registering the irqfd/ioeventfd with the VM. No drain thread
/// has been spawned when an error is returned.
#[allow(clippy::too_many_arguments)]
fn register_volume_blk(
    vm: &Arc<KvmVm>,
    bus: &Arc<MmioBus>,
    host: *mut u8,
    mem_size: usize,
    name: &str,
    path: &str,
    size: u64,
    base: u64,
    irq: u32,
    dev_stop: &Arc<AtomicBool>,
) -> Result<
    (
        Arc<VirtioBlk>,
        Arc<MmioVirtio>,
        Option<std::thread::JoinHandle<()>>,
        Option<EventFd>,
    ),
    KvmError,
> {
    let blk = Arc::new(VirtioBlk::open_rw(name, path, size)?);
    let gmem = GuestMem::new(host, 0, mem_size);

    // Used-buffer interrupt: delivered through an irqfd, so raising it is just
    // an eventfd write from the device side.
    let irq_efd = EventFd::new(0)?;
    vm.register_irqfd(&irq_efd, irq)?;
    let irq_efd_dev = irq_efd.try_clone()?;
    let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
        let _ = irq_efd_dev.write(1);
    });
    let mmio = Arc::new(MmioVirtio::new(blk.clone(), gmem, irq_raise));
    blk.set_irq_raise(mmio.make_used_buffer_irq());
    bus.register(base, mmio.clone());

    let mut thread = None;
    let mut wake = None;
    if std::env::var_os("KVM_NO_IOEVENTFD").is_none() {
        // Doorbell at `base + 0x050` is offloaded to an ioeventfd that the
        // drain thread blocks on.
        let notify_efd = EventFd::new(0)?;
        vm.register_mmio_ioevent(&notify_efd, base + 0x050)?;
        let notify_rd = notify_efd.try_clone()?;
        // Duplicate the wake handle BEFORE spawning: once the thread exists no
        // fallible step may remain, otherwise an early `?` return would drop
        // the JoinHandle and leave the thread parked forever on an eventfd
        // nobody can signal.
        let wake_efd = notify_efd.try_clone()?;
        let blk_thread = blk.clone();
        let stop = dev_stop.clone();
        thread = Some(std::thread::spawn(move || loop {
            if notify_rd.read().is_err() {
                break;
            }
            if stop.load(Ordering::SeqCst) {
                break;
            }
            blk_thread.notify(0);
        }));
        wake = Some(wake_efd);
    }
    Ok((blk, mmio, thread, wake))
}

impl LinuxVm {
    /// Create the VM, lay down the boot environment (+ MP table for SMP), bring
    /// the BSP to the kernel entry, park the APs for SIPI, and wire the devices.
    /// Does not start executing — call [`LinuxVm::run`].
    pub fn new(cfg: &LinuxVmConfig) -> Result<Self, KvmError> {
        assert!(cfg.num_cpus >= 1, "num_cpus must be >= 1");
        let vm = Arc::new(KvmVm::create()?);
        vm.create_pit()?;

        // Anonymous guest RAM. MAP_NORESERVE: most of it is never touched.
        let host = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                cfg.mem_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_NORESERVE,
                -1,
                0,
            )
        };
        if host == libc::MAP_FAILED {
            return Err(KvmError::from(std::io::Error::last_os_error()));
        }
        let host = host as *mut u8;
        advise_hugepage(host, cfg.mem_size);
        advise_mergeable(host, cfg.mem_size);

        // SAFETY: `host` is a fresh mapping of `mem_size` bytes, kept alive for
        // the VM's lifetime (freed in Drop after the VM is gone).
        unsafe {
            if let Err(e) = vm.map_ram(host, 0, cfg.mem_size, crate::hypervisor::prot::RWX) {
                libc::munmap(host as *mut libc::c_void, cfg.mem_size);
                return Err(e);
            }
        }

        // Assemble the boot image + vCPUs + devices; on any failure unmap the
        // guest RAM exactly once (below) instead of threading cleanup through
        // every `?`.
        let assemble = || -> Result<Assembled, KvmError> {
            // Single source of truth for the virtio-mmio IRQ layout: reject a
            // device set whose IRQs would alias (volumes overrunning fs) or
            // overflow the IOAPIC, instead of silently booting with a dead line.
            virtio_irq_budget_ok(cfg.volumes.len(), cfg.virtiofs.len(), cfg.enable_balloon)
                .map_err(|e| KvmError::from(std::io::Error::other(e)))?;
            let mut cmdline = cfg.cmdline.to_string();
            if cfg.disk_path.is_some() {
                cmdline.push_str(&format!(
                    " virtio_mmio.device=0x{VIRTIO_LEN:x}@0x{VIRTIO_BASE:x}:{VIRTIO_IRQ}"
                ));
            }
            if cfg.enable_vsock {
                cmdline.push_str(&format!(
                    " virtio_mmio.device=0x{VSOCK_LEN:x}@0x{VSOCK_BASE:x}:{VSOCK_IRQ}"
                ));
            }
            // Data volumes: one virtio-mmio blk window each (vdb, vdc, …), plus
            // an `sm.volume=DEV:MOUNT` token the generated init mounts. Block
            // devices are named by virtio-blk probe order (cmdline order), so
            // the first non-rootfs blk becomes vdb regardless of the vsock token.
            for (i, vol) in cfg.volumes.iter().enumerate() {
                let base = VOLUME_BASE + (i as u64) * 0x1000;
                let irq = VOLUME_IRQ_BASE + i as u32;
                let dev = format!("vd{}", (b'b' + i as u8) as char);
                cmdline.push_str(&format!(" virtio_mmio.device=0x1000@0x{base:x}:{irq}"));
                cmdline.push_str(&format!(" sm.volume=/dev/{dev}:{}", vol.mount));
            }
            // virtio-fs mounts: one virtio-mmio window each + an
            // `sm.virtiofs=TAG:MOUNT` token the generated init mounts.
            for (i, fs) in cfg.virtiofs.iter().enumerate() {
                let base = FS_BASE + (i as u64) * 0x1000;
                let irq = FS_IRQ_BASE + i as u32;
                cmdline.push_str(&format!(
                    " virtio_mmio.device=0x{FS_LEN:x}@0x{base:x}:{irq}"
                ));
                cmdline.push_str(&format!(" sm.virtiofs={}:{}", fs.tag, fs.mount));
            }
            // virtio-balloon (opt-in): one virtio-mmio window at BALLOON_BASE,
            // IRQ on the first GSI past the fs run. The guest's virtio_balloon
            // driver binds it; the host drives inflation via `request_inflate`.
            if cfg.enable_balloon {
                let irq = balloon_irq(cfg.virtiofs.len());
                cmdline.push_str(&format!(
                    " virtio_mmio.device=0x{BALLOON_LEN:x}@0x{BALLOON_BASE:x}:{irq}"
                ));
            }
            // vsock TSI control-channel auth: the guest's af_tsi driver reads
            // `supermachine.tsi_token=<hex>` and prepends those 32 bytes to every
            // control DGRAM; the muxer (below) enforces a match. Without it any
            // in-guest userspace process could forge TSI control packets to open
            // arbitrary host sockets (egress) bypassing egress_policy.
            if let Some(token) = cfg.tsi_token.as_ref() {
                crate::cli::append_tsi_token_cmdline(&mut cmdline, &crate::cli::hex_lower(token));
            }

            // SAFETY: `host` maps `mem_size` writable bytes; nothing else
            // aliases it yet (no vCPU created until after this).
            let mem = unsafe { std::slice::from_raw_parts_mut(host, cfg.mem_size) };
            let boot_cfg = crate::hypervisor::LinuxBootConfig {
                kernel: cfg.kernel,
                initrd: cfg.initrd,
                cmdline: &cmdline,
                ram_gpa: 0, // KVM guest RAM is based at GPA 0
                ram_size: cfg.mem_size,
                fdt: None, // x86 boots via boot_params, not a device tree
            };

            // vCPUs: the BSP boots from the kernel entry via the backend-agnostic
            // `boot_linux` seam (x86: setup_boot writes kernel/initrd/boot_params/
            // GDT into RAM + applies long-mode entry regs). The rest park in
            // wait-for-SIPI and are brought up by the kernel via the LAPIC.
            let mut vcpus = Vec::with_capacity(cfg.num_cpus as usize);
            let bsp = vm.create_vcpu()?;
            vm.boot_linux(&bsp, mem, &boot_cfg)?;
            vcpus.push(bsp);
            // SMP: describe the CPUs to the kernel (no ACPI → MP table). Written
            // after boot setup; the MP table occupies a disjoint RAM region.
            if cfg.num_cpus > 1 {
                mptable::write_mptable(mem, cfg.num_cpus).map_err(|e| {
                    KvmError::from(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        e.to_string(),
                    ))
                })?;
            }
            for _ in 1..cfg.num_cpus {
                let vcpu = vm.create_vcpu()?;
                vcpu.park_for_sipi()?;
                vcpus.push(vcpu);
            }

            // Device bus + (if a disk) the virtio-blk transport at VIRTIO_BASE.
            // The QueueNotify doorbell is offloaded to an ioeventfd and the
            // used-buffer IRQ to an irqfd, so neither the guest's kick nor the
            // interrupt costs a vCPU round-trip; a device thread drains the
            // queue. Config-space MMIO (feature/queue setup) still exits to the
            // bus on the vCPU, but that is rare (driver init only).
            let bus = Arc::new(MmioBus::new());
            let mut blk_keep = None;
            let mut blk_mmio_keep = None;
            let mut disk_keep = None;
            let mut dev_thread = None;
            let mut dev_wake = None;
            let dev_stop = Arc::new(AtomicBool::new(false));
            if let Some(disk) = cfg.disk_path {
                let blk = Arc::new(VirtioBlk::open_rw("vda", disk, cfg.disk_size)?);
                let gmem = GuestMem::new(host, 0, cfg.mem_size);

                // The used-buffer IRQ always goes via irqfd (no set_irq_line
                // ioctl). make_used_buffer_irq sets InterruptStatus first.
                let irq_efd = EventFd::new(0)?;
                vm.register_irqfd(&irq_efd, VIRTIO_IRQ)?;
                let irq_efd_dev = irq_efd.try_clone()?;
                let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                    let _ = irq_efd_dev.write(1);
                });
                let mmio = Arc::new(MmioVirtio::new(blk.clone(), gmem, irq_raise));
                blk.set_irq_raise(mmio.make_used_buffer_irq());
                bus.register(VIRTIO_BASE, mmio.clone());
                blk_mmio_keep = Some(mmio);
                disk_keep = Some((disk.to_string(), cfg.disk_size));

                // The QueueNotify doorbell: by default offload it to an
                // ioeventfd + device thread (no vCPU exit per kick). Setting
                // KVM_NO_IOEVENTFD instead leaves QueueNotify as a normal MMIO
                // exit handled on the vCPU via the bus — the A/B baseline used
                // to measure the ioeventfd win (transient benchmark scaffolding).
                let use_ioeventfd = std::env::var_os("KVM_NO_IOEVENTFD").is_none();
                if use_ioeventfd {
                    let notify_efd = EventFd::new(0)?; // blocking; device thread waits on it
                    vm.register_mmio_ioevent(&notify_efd, VIRTIO_QUEUE_NOTIFY)?;
                    let notify_rd = notify_efd.try_clone()?;
                    let blk_thread = blk.clone();
                    let stop = dev_stop.clone();
                    dev_thread = Some(std::thread::spawn(move || loop {
                        if notify_rd.read().is_err() {
                            break;
                        }
                        if stop.load(Ordering::SeqCst) {
                            break;
                        }
                        blk_thread.notify(0);
                    }));
                    dev_wake = Some(notify_efd.try_clone()?);
                }
                blk_keep = Some(blk);
            }

            // virtio-vsock (host↔guest sockets). Notify is bus-routed (3 queues,
            // qidx in the QueueNotify value); the device's muxer thread raises
            // IRQ6 via set_irq_line when it has RX data for the guest.
            let mut vsock_keep = None;
            let mut vsock_mmio_keep = None;
            if cfg.enable_vsock {
                let vsock = Arc::new(
                    Vsock::with_tsi_token(GUEST_CID, cfg.tsi_token)
                        .map_err(|e| KvmError::from(std::io::Error::other(format!("{e:?}"))))?,
                );
                let gmem = GuestMem::new(host, 0, cfg.mem_size);
                let vm_irq = vm.clone();
                let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                    let _ = vm_irq.set_irq(VSOCK_IRQ, true);
                    let _ = vm_irq.set_irq(VSOCK_IRQ, false);
                });
                let mmio = Arc::new(MmioVirtio::new(vsock.clone(), gmem, irq_raise));
                vsock.set_irq_raise(mmio.make_used_buffer_irq());
                // The muxer invokes this after pushing host→guest packets to its
                // rxq, to wake the device's RX drain (fill the guest's RX
                // descriptors + raise the IRQ). Without it, host→guest packets
                // (e.g. a connect REQUEST) sit in the rxq and never reach the
                // guest.
                let vsock_for_kick = vsock.clone();
                vsock
                    .muxer()
                    .set_kick(Arc::new(move || vsock_for_kick.kick()));
                bus.register(VSOCK_BASE, mmio.clone());
                vsock_mmio_keep = Some(mmio);
                vsock_keep = Some(vsock);
            }

            // virtio-fs mounts (host dir → guest). Bus-routed notify (hiprio +
            // request queue) like vsock; IRQ raised via set_irq_line. Each mount
            // gets a DAX window (1 GiB at FS_DAX_BASE+i*win) + a DaxSession on its
            // FUSE server, so a guest that mounts `-o dax` gets zero-copy reads
            // (host file pages mapped into the window as KVM memory slots on
            // SETUPMAPPING); a plain mount uses the request queue. The device +
            // backend + DAX session handles are retained in `fs_mounts` so a
            // snapshot can capture their state and restore re-attach the mount.
            let mut fs_mounts: Vec<FsMount> = Vec::with_capacity(cfg.virtiofs.len());
            for (i, fsm) in cfg.virtiofs.iter().enumerate() {
                let base = FS_BASE + (i as u64) * 0x1000;
                let irq = FS_IRQ_BASE + i as u32;
                let dax_gpa = FS_DAX_BASE + (i as u64) * FS_DAX_WINDOW_LEN;
                let backend: Arc<dyn crate::fuse::FsBackend> =
                    Arc::new(crate::fuse::PosixFs::new(&fsm.host_path).map_err(|e| {
                        KvmError::from(std::io::Error::other(format!(
                            "virtio-fs root {}: {e}",
                            fsm.host_path
                        )))
                    })?);
                let fs_dev = Arc::new(VirtioFs::with_backend(
                    VirtioFsConfig {
                        tag: fsm.tag.clone(),
                        num_request_queues: 1,
                        dax_window_gpa: dax_gpa,
                        dax_window_len: FS_DAX_WINDOW_LEN,
                    },
                    backend.clone(),
                ));
                // DAX session: routes SETUPMAPPING/REMOVEMAPPING through the KVM
                // memory-slot mapper for this window.
                let mapper: Arc<dyn crate::fuse::HvfMapper> =
                    Arc::new(KvmDaxMapper { vm: vm.clone() });
                let session = Arc::new(crate::fuse::DaxSession::new(
                    dax_gpa,
                    FS_DAX_WINDOW_LEN,
                    backend.clone(),
                    mapper,
                ));
                fs_dev
                    .fuse_server()
                    .lock()
                    .unwrap_or_else(|e| e.into_inner())
                    .set_dax(session.clone());
                let gmem = GuestMem::new(host, 0, cfg.mem_size);
                let vm_irq = vm.clone();
                let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                    let _ = vm_irq.set_irq(irq, true);
                    let _ = vm_irq.set_irq(irq, false);
                });
                let mmio = Arc::new(MmioVirtio::new(fs_dev.clone(), gmem, irq_raise));
                fs_dev.set_irq_raise(mmio.make_used_buffer_irq());
                bus.register(base, mmio.clone());
                fs_mounts.push(FsMount {
                    mmio,
                    backend,
                    dax: session,
                    host_path: fsm.host_path.clone(),
                    tag: fsm.tag.clone(),
                    mount: fsm.mount.clone(),
                    dax_gpa,
                    dax_window_len: FS_DAX_WINDOW_LEN,
                });
            }

            // Data volumes (vdb, vdc, …): each is its own virtio-blk at the
            // cmdline-matching base/IRQ, sharing `dev_stop` with the rootfs
            // drain thread. We keep the device + thread + wake alive on the VM.
            let mut volume_blks = Vec::with_capacity(cfg.volumes.len());
            let mut volume_mmios = Vec::with_capacity(cfg.volumes.len());
            let mut volume_meta = Vec::with_capacity(cfg.volumes.len());
            let mut volume_threads = Vec::new();
            let mut volume_wakes = Vec::new();
            for (i, vol) in cfg.volumes.iter().enumerate() {
                let base = VOLUME_BASE + (i as u64) * 0x1000;
                let irq = VOLUME_IRQ_BASE + i as u32;
                let name = format!("vd{}", (b'b' + i as u8) as char);
                let (blk, mmio, thread, wake) = register_volume_blk(
                    &vm,
                    &bus,
                    host,
                    cfg.mem_size,
                    &name,
                    &vol.path,
                    vol.size,
                    base,
                    irq,
                    &dev_stop,
                )?;
                volume_blks.push(blk);
                volume_mmios.push(mmio);
                volume_meta.push(vol.clone());
                if let Some(t) = thread {
                    volume_threads.push(t);
                }
                if let Some(w) = wake {
                    volume_wakes.push(w);
                }
            }

            // virtio-balloon (opt-in): bus-routed notify like vsock (the guest's
            // QueueNotify write routes through the MMIO bus to the device). The
            // used-buffer IRQ fires on inflate/deflate completion; the
            // config-change IRQ fires when the host bumps `num_pages` via
            // `request_inflate`. ram_gpa=0 (guest RAM is mapped at GPA 0).
            let mut balloon_keep = None;
            let mut balloon_mmio_keep = None;
            if cfg.enable_balloon {
                let balloon = Arc::new(crate::devices::virtio::balloon::VirtioBalloon::new());
                let balloon_dev = Arc::new(crate::devices::virtio::balloon::VirtioBalloonWithRam {
                    inner: balloon.clone(),
                    ram_host: host,
                    ram_size: cfg.mem_size,
                    ram_gpa: 0,
                });
                let irq = balloon_irq(cfg.virtiofs.len());
                let vm_irq = vm.clone();
                let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                    let _ = vm_irq.set_irq(irq, true);
                    let _ = vm_irq.set_irq(irq, false);
                });
                let gmem = GuestMem::new(host, 0, cfg.mem_size);
                let mmio = Arc::new(MmioVirtio::new(balloon_dev, gmem, irq_raise));
                balloon.set_irq_raise(mmio.make_used_buffer_irq());
                balloon.set_config_irq_raise(mmio.make_config_change_irq());
                bus.register(BALLOON_BASE, mmio.clone());
                balloon_mmio_keep = Some(mmio);
                balloon_keep = Some(balloon);
            }

            Ok(Assembled {
                vcpus,
                bus,
                balloon: balloon_keep,
                balloon_mmio: balloon_mmio_keep,
                blk: blk_keep,
                blk_mmio: blk_mmio_keep,
                disk: disk_keep,
                vsock: vsock_keep,
                vsock_mmio: vsock_mmio_keep,
                dev_thread,
                dev_stop,
                dev_wake,
                volume_blks,
                volume_mmios,
                volume_meta,
                volume_threads,
                volume_wakes,
                fs_mounts,
            })
        };

        let a = match assemble() {
            Ok(parts) => parts,
            Err(e) => {
                unsafe { libc::munmap(host as *mut libc::c_void, cfg.mem_size) };
                return Err(e);
            }
        };

        Ok(LinuxVm {
            vm,
            vcpus: a.vcpus,
            bus: a.bus,
            com1: Arc::new(Mutex::new(Com1::new())),
            host,
            mem_size: cfg.mem_size,
            _blk: a.blk,
            blk_mmio: a.blk_mmio,
            disk: a.disk,
            vsock: a.vsock,
            vsock_mmio: a.vsock_mmio,
            balloon: a.balloon,
            balloon_mmio: a.balloon_mmio,
            dev_thread: a.dev_thread,
            dev_stop: a.dev_stop,
            dev_wake: a.dev_wake,
            _volume_blks: a.volume_blks,
            volume_threads: a.volume_threads,
            volume_wakes: a.volume_wakes,
            volume_mmios: a.volume_mmios,
            volume_meta: a.volume_meta,
            tsi_token: cfg.tsi_token,
            fs_mounts: a.fs_mounts,
            bridges: Mutex::new(Vec::new()),
            // Cold-booted VM: no snapshot baseline, so it is never reset.
            vcpu_baselines: Vec::new(),
            reset_seq: Arc::new(AtomicU64::new(0)),
            reset_intc: None,
            reset_com1: None,
            reset_blk_mmio: None,
            reset_vsock_mmio: None,
            reset_volume_mmios: Vec::new(),
            reset_fs_mmios: Vec::new(),
        })
    }

    /// Request that the guest give back `pages` 4 KiB pages through
    /// virtio-balloon. When the VM was booted without `enable_balloon` there is
    /// no device and this does nothing. Otherwise it bumps the device's
    /// `num_pages` config and fires the config-change IRQ; the guest's balloon
    /// driver then frees that many pages and returns their PFNs, which the
    /// device `madvise(MADV_FREE)`s on the host RAM mapping. Idempotent — asking
    /// again for the same target is a no-op.
    ///
    /// Returns `true` when a balloon device was present to drive, `false`
    /// otherwise.
    pub fn request_balloon_inflate(&self, pages: u32) -> bool {
        let Some(balloon) = self.balloon.as_ref() else {
            return false;
        };
        balloon.request_inflate(pages);
        true
    }

    /// Spawn one device-serving thread per vCPU (consuming `self.vcpus`), all
    /// sharing the given `stop` / `snapshot_req` flags. Returns the join handles
    /// (whose results are `(ExitReason, Option<KvmSnapshotState>)`) and the
    /// per-vCPU force-exit tokens. Shared by [`run`](LinuxVm::run),
    /// [`snapshot_after`](LinuxVm::snapshot_after), and the product run-control
    /// path — the one place vCPU threads are launched.
    #[allow(clippy::type_complexity)]
    fn spawn_vcpus(
        &mut self,
        stop: Arc<AtomicBool>,
        snapshot_req: Arc<AtomicBool>,
        exits: Arc<AtomicU64>,
        count_exits: bool,
    ) -> (
        Vec<std::thread::JoinHandle<(ExitReason, Option<KvmSnapshotState>)>>,
        Vec<KvmVcpuHandle>,
    ) {
        self.spawn_vcpus_paused(stop, snapshot_req, exits, count_exits, None)
    }

    /// Same as [`spawn_vcpus`](Self::spawn_vcpus), plus an optional
    /// [`PauseCoord`]. With a coordinator, a `force_exit` issued while
    /// `pause.pause` is set makes each vCPU thread capture its state, park, and
    /// later resume instead of exiting — the foundation for live snapshots.
    /// Passing `None` keeps the legacy stop/snapshot-exit behaviour.
    #[allow(clippy::type_complexity)]
    fn spawn_vcpus_paused(
        &mut self,
        stop: Arc<AtomicBool>,
        snapshot_req: Arc<AtomicBool>,
        exits: Arc<AtomicU64>,
        count_exits: bool,
        pause: Option<Arc<PauseCoord>>,
    ) -> (
        Vec<std::thread::JoinHandle<(ExitReason, Option<KvmSnapshotState>)>>,
        Vec<KvmVcpuHandle>,
    ) {
        // Collect the force-exit tokens first, while the vCPUs are still owned
        // by `self`; every thread receives a copy of the full list.
        let handles: Vec<KvmVcpuHandle> =
            self.vcpus.iter().map(|vcpu| vcpu.exit_token()).collect();

        let threads: Vec<_> = std::mem::take(&mut self.vcpus)
            .into_iter()
            .enumerate()
            .map(|(idx, vcpu)| {
                let vm = self.vm.clone();
                let bus = self.bus.clone();
                let com1 = self.com1.clone();
                let stop_flag = Arc::clone(&stop);
                let snap_flag = Arc::clone(&snapshot_req);
                let exit_counter = Arc::clone(&exits);
                let peers = handles.clone();
                let pause_coord = pause.clone();
                // In-place reset wiring: the snapshot baseline of this vCPU
                // (present on the restore path, absent after a cold boot) and
                // the shared reset counter the thread checks on each
                // pause-resume.
                let baseline = self.vcpu_baselines.get(idx).cloned();
                let reset_seq = self.reset_seq.clone();
                std::thread::spawn(move || {
                    run_vcpu(
                        vcpu,
                        vm,
                        bus,
                        com1,
                        stop_flag,
                        snap_flag,
                        exit_counter,
                        count_exits,
                        peers,
                        pause_coord,
                        idx,
                        baseline,
                        reset_seq,
                    )
                })
            })
            .collect();

        (threads, handles)
    }

    /// Execute every vCPU until the VM stops, with the serial console streamed
    /// to stdout. Each vCPU drives its device-serving loop on a dedicated
    /// thread; whichever stops first (kernel halt/reboot) force-exits the
    /// others. The value returned is the BSP's exit reason, which stands for
    /// the VM's overall result.
    pub fn run(&mut self) -> Result<ExitReason, KvmError> {
        // Counting exits is for benchmarks only — otherwise the shared atomic
        // would bounce a cache line between vCPUs on every exit. The flag is
        // sampled once, up front.
        let count_exits = std::env::var_os("KVM_COUNT_EXITS").is_some();
        let exit_counter = Arc::new(AtomicU64::new(0));
        let (threads, _handles) = self.spawn_vcpus(
            Arc::new(AtomicBool::new(false)),
            // This path never requests a snapshot.
            Arc::new(AtomicBool::new(false)),
            Arc::clone(&exit_counter),
            count_exits,
        );

        // Thread 0 is the BSP and decides the VM result. Joining it first
        // blocks until the kernel stops, which in turn propagates `stop` to
        // the APs.
        let mut bsp_reason = ExitReason::Unknown("no vcpus".into());
        for (idx, thread) in threads.into_iter().enumerate() {
            let reason = match thread.join() {
                Ok((reason, _snap)) => reason,
                Err(_) => ExitReason::Unknown("vcpu thread panicked".into()),
            };
            if std::env::var_os("KVM_DEBUG_VCPU").is_some() {
                eprintln!("[vcpu {idx}] exit: {reason:?}");
            }
            if idx == 0 {
                bsp_reason = reason;
            }
        }

        if count_exits {
            // Low 32 bits: virtio-notify exits; high 32 bits: everything else.
            let packed = exit_counter.load(Ordering::SeqCst);
            let notify_exits = packed & 0xffff_ffff;
            let other_exits = packed >> 32;
            eprintln!(
                "[kvm] virtio-notify vCPU exits: {} | other device exits: {}",
                notify_exits, other_exits
            );
        }
        Ok(bsp_reason)
    }

    /// Start the VM and, once `after` has elapsed, quiesce ALL vCPUs at clean
    /// instruction boundaries (force_exit) and capture a full [`VmSnapshot`]:
    /// the CPU state of every vCPU (7a), the in-kernel device state (7c), and a
    /// copy of guest RAM. The running VM is consumed in the process (its vCPUs
    /// are taken). Use [`LinuxVm::restore`] to bring the snapshot back.
    ///
    /// (This no-disk path snapshots vCPU+devices+RAM; virtio queue cursors +
    /// serial register state are a follow-on — a disked guest would also need
    /// MmioVirtio::capture_state and a Com1 snapshot.)
    pub fn snapshot_after(&mut self, after: std::time::Duration) -> Result<VmSnapshot, KvmError> {
        let snapshot_req = Arc::new(AtomicBool::new(false));
        let (threads, handles) = self.spawn_vcpus(
            Arc::new(AtomicBool::new(false)),
            Arc::clone(&snapshot_req),
            Arc::new(AtomicU64::new(0)),
            false,
        );
        let vcpu_count = handles.len();

        // Trigger thread: give the guest `after` to run, then raise the
        // snapshot request and kick every vCPU out of the guest.
        let timer = {
            let kick_targets = handles.clone();
            std::thread::spawn(move || {
                std::thread::sleep(after);
                snapshot_req.store(true, Ordering::SeqCst);
                KvmVcpuHandle::force_exit(&kick_targets);
            })
        };

        let outcome = self.capture_quiesced(threads, vcpu_count);
        let _ = timer.join();
        outcome
    }

    /// Join the vCPU `threads`, stop the device and volume threads, then build
    /// a [`VmSnapshot`] via [`capture_with_states`](Self::capture_with_states).
    ///
    /// The caller must already have raised the snapshot request and
    /// force-exited the vCPUs, so each thread returns its captured CPU state.
    /// `ncpus` is the number of states expected; fewer means some vCPU thread
    /// returned without a state (or panicked) and the capture is abandoned.
    ///
    /// # Errors
    /// `"a vCPU stopped before the snapshot trigger"` when the number of
    /// collected states differs from `ncpus`; otherwise any error from
    /// `capture_with_states`.
    fn capture_quiesced(
        &mut self,
        threads: Vec<std::thread::JoinHandle<(ExitReason, Option<KvmSnapshotState>)>>,
        ncpus: usize,
    ) -> Result<VmSnapshot, KvmError> {
        // Join in spawn order; a panicked thread or one that returned no state
        // simply contributes nothing.
        let vcpu_states: Vec<KvmSnapshotState> = threads
            .into_iter()
            .filter_map(|handle| handle.join().ok().and_then(|(_reason, state)| state))
            .collect();
        if vcpu_states.len() != ncpus {
            return Err(KvmError::from(std::io::Error::other(
                "a vCPU stopped before the snapshot trigger",
            )));
        }

        // Stop the virtio device thread so it cannot be draining a queue while
        // the queue cursors are captured.
        if let Some(device_thread) = self.dev_thread.take() {
            self.dev_stop.store(true, Ordering::SeqCst);
            if let Some(wake) = self.dev_wake.as_ref() {
                let _ = wake.write(1);
            }
            let _ = device_thread.join();
        }

        // The volume drain threads observe the same `dev_stop` flag: set it,
        // wake each one, and join them before guest RAM is copied.
        if !self.volume_threads.is_empty() {
            self.dev_stop.store(true, Ordering::SeqCst);
            self.volume_wakes.iter().for_each(|wake| {
                let _ = wake.write(1);
            });
            self.volume_threads.drain(..).for_each(|worker| {
                let _ = worker.join();
            });
        }

        // Everything that could touch guest state has stopped.
        self.capture_with_states(vcpu_states)
    }

    /// Assemble a [`VmSnapshot`] from already-captured per-vCPU states plus the
    /// in-kernel device state, the serial port state, every virtio device's
    /// MMIO/queue state, and a byte-for-byte copy of guest RAM.
    ///
    /// The caller MUST have stopped or parked all vCPUs and idled the device
    /// threads first; nothing here synchronises with them, so a running guest
    /// would yield an inconsistent RAM/cursor pair.
    ///
    /// # Errors
    /// Propagates a failure from `capture_devices`.
    fn capture_with_states(
        &self,
        vcpu_states: Vec<KvmSnapshotState>,
    ) -> Result<VmSnapshot, KvmError> {
        let ncpus = vcpu_states.len();
        let devices = self.vm.capture_devices()?;
        let com1 = lock_recover(&self.com1).snapshot();

        // A disk snapshot exists only when both the device and its backing
        // (path, size) are present.
        let disk = self
            .blk_mmio
            .as_ref()
            .zip(self.disk.as_ref())
            .map(|(mmio, (path, size))| DiskSnap {
                path: path.clone(),
                size: *size,
                mmio: mmio.capture_state(),
            });

        let vsock = match &self.vsock_mmio {
            Some(mmio) => Some(mmio.capture_state()),
            None => None,
        };

        // Record the guest's TSI listener routes. Host listener sockets are
        // rebuilt on restore; the guest will not call `listen()` again from
        // restored RAM, so the routes have to travel with the snapshot.
        let vsock_listeners = match &self.vsock {
            Some(dev) => dev.muxer().capture_tsi_listeners(),
            None => Default::default(),
        };

        // Data volumes: mapping (path/size/mount) + device state only. The
        // contents stay in the host backing files, referenced by path.
        let volumes = self
            .volume_mmios
            .iter()
            .zip(self.volume_meta.iter())
            .map(|(mmio, meta)| VolumeSnap {
                path: meta.path.clone(),
                size: meta.size,
                mount: meta.mount.clone(),
                mmio: mmio.capture_state(),
            })
            .collect();

        // virtio-fs mounts, in device order so restore derives the same
        // base/IRQ: device MMIO/queue state, the FUSE backend tables and the
        // DAX slot table. A backend that fails to serialise contributes an
        // empty blob.
        let mut virtiofs: Vec<VirtioFsSnap> = Vec::with_capacity(self.fs_mounts.len());
        for mount in self.fs_mounts.iter() {
            virtiofs.push(VirtioFsSnap {
                host_path: mount.host_path.clone(),
                tag: mount.tag.clone(),
                mount: mount.mount.clone(),
                dax_gpa: mount.dax_gpa,
                dax_window_len: mount.dax_window_len,
                mmio: mount.mmio.capture_state(),
                backend_state: mount.backend.snapshot_state().unwrap_or_default(),
                dax_state: mount.dax.snapshot_state(),
            });
        }

        let ram_len = self.mem_size;
        let mut ram = vec![0u8; ram_len];
        // SAFETY: the caller guarantees no vCPU is running; `host` maps
        // `mem_size` bytes and `ram` was just allocated with the same length.
        unsafe { std::ptr::copy_nonoverlapping(self.host, ram.as_mut_ptr(), ram_len) };

        Ok(VmSnapshot {
            // NOTE(review): `as u8` truncates silently above 255 vCPUs.
            num_cpus: ncpus as u8,
            mem_size: self.mem_size,
            vcpus: vcpu_states,
            devices,
            com1,
            disk,
            vsock,
            vsock_listeners,
            volumes,
            tsi_token: self.tsi_token,
            virtiofs,
            ram,
        })
    }

    /// Spawn every vCPU on a background thread and hand back a [`RunningVm`]
    /// control handle without blocking. The guest keeps running until the
    /// handle's [`wait`](RunningVm::wait), [`stop`](RunningVm::stop) or
    /// [`snapshot`](RunningVm::snapshot) is used.
    ///
    /// This is the product entry point: the api `Vm` keeps the handle so the
    /// guest stays live (exec-over-vsock) and can be snapshotted on demand.
    /// Contrast with [`run`](LinuxVm::run), which streams serial output and
    /// blocks until the guest finishes.
    pub fn start_running(mut self) -> RunningVm {
        // The pause coordinator is what makes LIVE (non-consuming) snapshots
        // possible: vCPU threads park on it instead of exiting, so the guest
        // can resume once the capture is done.
        let pause = Arc::new(PauseCoord::default());
        let stop = Arc::new(AtomicBool::new(false));
        let snapshot_req = Arc::new(AtomicBool::new(false));
        // Grab the reset counter before `self` moves into the handle.
        let reset_seq = self.reset_seq.clone();

        let (threads, handles) = self.spawn_vcpus_paused(
            Arc::clone(&stop),
            Arc::clone(&snapshot_req),
            Arc::new(AtomicU64::new(0)),
            false,
            Some(pause.clone()),
        );

        RunningVm {
            vm: self,
            threads,
            stop,
            snapshot_req,
            handles,
            pause,
            reset_seq,
        }
    }

    /// Rebuild a VM from an in-memory [`VmSnapshot`] and restore it to the
    /// snapshotted running state (no boot). Copies the snapshot RAM into a fresh
    /// anonymous mapping; for cross-process restore prefer
    /// [`restore_from_file`](LinuxVm::restore_from_file), which mmaps the RAM
    /// copy-on-write (O(pages-touched), no full copy).
    pub fn restore(snap: &VmSnapshot) -> Result<LinuxVm, KvmError> {
        let vm = Arc::new(KvmVm::create()?);
        vm.create_pit()?;

        let host = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                snap.mem_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS | libc::MAP_NORESERVE,
                -1,
                0,
            )
        };
        if host == libc::MAP_FAILED {
            return Err(KvmError::from(std::io::Error::last_os_error()));
        }
        let host = host as *mut u8;
        advise_hugepage(host, snap.mem_size);
        advise_mergeable(host, snap.mem_size);
        // SAFETY: fresh mapping of mem_size bytes; load the snapshotted RAM.
        unsafe { std::ptr::copy_nonoverlapping(snap.ram.as_ptr(), host, snap.mem_size) };
        unsafe {
            if let Err(e) = vm.map_ram(host, 0, snap.mem_size, crate::hypervisor::prot::RWX) {
                libc::munmap(host as *mut libc::c_void, snap.mem_size);
                return Err(e);
            }
        }

        Self::finish_restore(
            vm,
            host,
            snap.mem_size,
            &snap.vcpus,
            &snap.devices,
            &snap.com1,
            &snap.disk,
            &snap.vsock,
            &snap.vsock_listeners,
            &snap.volumes,
            snap.tsi_token,
            &snap.virtiofs,
        )
    }

    /// Restore from a snapshot file with **copy-on-write** guest RAM: the file's
    /// page-aligned RAM region is `mmap`'d `MAP_PRIVATE`, so restore is
    /// O(pages-touched) — no multi-hundred-MB copy. Pages fault in lazily from
    /// the page cache; guest writes COW into private anon pages. This is the fast
    /// path for cross-process / persisted restore.
    pub fn restore_from_file(path: &std::path::Path) -> Result<LinuxVm, KvmError> {
        let mut f = std::fs::File::open(path)?;
        // Detect a differential snapshot (SMSNAP7D) and route to the diff
        // restorer; otherwise this is a full SMSNAP07 (RAM mmapped CoW directly).
        let mut magic = [0u8; 8];
        f.read_exact(&mut magic)?;
        if &magic == b"SMSNAP7D" {
            return Self::restore_diff_from_file(path);
        }
        f.seek(SeekFrom::Start(0))?;
        let meta = read_meta(&mut f)?;
        let vm = Arc::new(KvmVm::create()?);
        vm.create_pit()?;

        // CoW-map the RAM directly from the file at its page-aligned offset via
        // the shared snapshot_frame substrate, which bounds-checks the region
        // against the file length first (a corrupt mem_size/ram_offset would
        // otherwise mmap past EOF → guest SIGBUS = host crash). MAP_NORESERVE so
        // the kernel doesn't reserve swap for pages the guest may never touch.
        let host = crate::snapshot_frame::cow_map_ram(
            &f,
            meta.ram_offset,
            meta.mem_size,
            libc::MAP_NORESERVE,
        )?;
        // CoW file-backed guest RAM: mergeable so KSM can collapse the pages the
        // guest privatises (identical across VMs restored from the same snapshot).
        advise_mergeable(host, meta.mem_size);
        // The mapping keeps its own kernel reference to the file; `f` may drop.
        unsafe {
            if let Err(e) = vm.map_ram(host, 0, meta.mem_size, crate::hypervisor::prot::RWX) {
                libc::munmap(host as *mut libc::c_void, meta.mem_size);
                return Err(e);
            }
        }

        Self::finish_restore(
            vm,
            host,
            meta.mem_size,
            &meta.vcpus,
            &meta.devices,
            &meta.com1,
            &meta.disk,
            &meta.vsock,
            &meta.vsock_listeners,
            &meta.volumes,
            meta.tsi_token,
            &meta.virtiofs,
        )
    }

    /// Restore a differential (`SMSNAP4D`) snapshot: mmap the base snapshot's
    /// RAM copy-on-write, overlay the changed pages from the diff, then restore
    /// the diff's vCPU/device state. The base path is embedded in the diff.
    fn restore_diff_from_file(path: &std::path::Path) -> Result<LinuxVm, KvmError> {
        const PG: usize = 4096;
        let mut f = std::fs::File::open(path)?;
        let mut magic = [0u8; 8];
        f.read_exact(&mut magic)?;
        if &magic != b"SMSNAP7D" {
            return Err(KvmError::from(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "not a differential snapshot",
            )));
        }
        let bp_len = read_u32(&mut f)? as usize;
        let mut bp = vec![0u8; bp_len];
        f.read_exact(&mut bp)?;
        let base_path = String::from_utf8(bp).map_err(|_| {
            KvmError::from(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "diff base path not utf8",
            ))
        })?;
        // The shared meta block (ram_offset placeholder + vCPU/device/disk/vsock).
        let meta = read_meta_body(&mut f)?;

        // mmap the BASE RAM copy-on-write — overlaying the changed pages below
        // makes only those pages private; the rest stay shared with the base.
        let mut bf = std::fs::File::open(&base_path)?;
        let base_meta = read_meta(&mut bf)?;
        if base_meta.mem_size != meta.mem_size {
            return Err(KvmError::from(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "diff/base mem_size mismatch",
            )));
        }
        // Bounds-checked, page-aligned CoW map of the BASE RAM (shared substrate).
        let host = crate::snapshot_frame::cow_map_ram(
            &bf,
            base_meta.ram_offset,
            base_meta.mem_size,
            libc::MAP_NORESERVE,
        )?;
        advise_mergeable(host, base_meta.mem_size);

        // Overlay the changed pages (read sequentially from the diff).
        let num_changed = read_u32(&mut f)?;
        for _ in 0..num_changed {
            let idx = read_u32(&mut f)? as usize;
            let off = idx * PG;
            if off + PG > base_meta.mem_size {
                unsafe { libc::munmap(host as *mut libc::c_void, base_meta.mem_size) };
                return Err(KvmError::from(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "diff page index out of range",
                )));
            }
            let mut page = [0u8; PG];
            if let Err(e) = f.read_exact(&mut page) {
                unsafe { libc::munmap(host as *mut libc::c_void, base_meta.mem_size) };
                return Err(KvmError::from(e));
            }
            // SAFETY: off+PG <= mem_size (checked); the CoW mapping is writable.
            unsafe { std::ptr::copy_nonoverlapping(page.as_ptr(), host.add(off), PG) };
        }

        let vm = Arc::new(KvmVm::create()?);
        vm.create_pit()?;
        unsafe {
            if let Err(e) = vm.map_ram(host, 0, base_meta.mem_size, crate::hypervisor::prot::RWX) {
                libc::munmap(host as *mut libc::c_void, base_meta.mem_size);
                return Err(e);
            }
        }
        Self::finish_restore(
            vm,
            host,
            base_meta.mem_size,
            &meta.vcpus,
            &meta.devices,
            &meta.com1,
            &meta.disk,
            &meta.vsock,
            &meta.vsock_listeners,
            &meta.volumes,
            meta.tsi_token,
            &meta.virtiofs,
        )
    }

    /// Shared restore tail: with guest RAM already mapped at `host`, restore the
    /// in-kernel devices, every vCPU's CPU state, serial registers, and then
    /// re-create each virtio device the snapshot recorded (the virtio-blk disk,
    /// virtio-vsock, data volumes, virtio-fs mounts). Returns a runnable VM.
    ///
    /// Parameters:
    /// * `vm` — the KVM VM the caller has already mapped `host` into.
    /// * `host` / `mem_size` — base pointer and length of the guest RAM
    ///   mapping; both are stored in the returned [`LinuxVm`].
    /// * `vcpu_states` — one entry per vCPU, restored in slice order and also
    ///   kept as the reset baseline.
    /// * `devices`, `com1_state` — in-kernel device and serial state; applied
    ///   here and kept as reset baselines.
    /// * `disk`, `vsock`, `volumes`, `virtiofs` — per-device snapshot records;
    ///   a device is re-created only when its record is present.
    /// * `vsock_listeners` — TSI listener routes to re-bind on the fresh muxer.
    /// * `tsi_token` — token the fresh vsock muxer is armed with (`None`
    ///   disables enforcement, per the comment at the vsock section below).
    ///
    /// # Errors
    /// Returns the first error from any create/restore/register step.
    /// NOTE(review): on such an early return nothing in this function unmaps
    /// `host` or stops a device thread that was already spawned — confirm the
    /// callers (or `Drop` impls) account for that.
    fn finish_restore(
        vm: Arc<KvmVm>,
        host: *mut u8,
        mem_size: usize,
        vcpu_states: &[KvmSnapshotState],
        devices: &KvmDeviceState,
        com1_state: &Com1State,
        disk: &Option<DiskSnap>,
        vsock: &Option<MmioSnapshot>,
        vsock_listeners: &[TsiListenerSnapshot],
        volumes: &[VolumeSnap],
        tsi_token: Option<[u8; 32]>,
        virtiofs: &[VirtioFsSnap],
    ) -> Result<LinuxVm, KvmError> {
        // In-kernel device state goes in first, then one vCPU is created and
        // restored per captured state, in capture order.
        vm.restore_devices(devices)?;
        let mut vcpus = Vec::with_capacity(vcpu_states.len());
        for st in vcpu_states {
            let vcpu = vm.create_vcpu()?;
            vcpu.restore_snapshot(st)?;
            vcpus.push(vcpu);
        }

        // Fresh serial device, then overwrite its registers from the snapshot.
        let mut com1 = Com1::new();
        com1.restore(com1_state);

        // Re-create the virtio-blk device chain if the snapshot had a disk:
        // re-open the backing file, restore the MMIO/queue state (which
        // re-activates the device), re-register ioeventfd/irqfd + the device
        // thread, and kick once to drain anything in-flight at capture.
        let bus = Arc::new(MmioBus::new());
        let dev_stop = Arc::new(AtomicBool::new(false));
        let mut blk_keep = None;
        let mut blk_mmio_keep = None;
        let mut disk_keep = None;
        let mut dev_thread = None;
        let mut dev_wake = None;
        if let Some(d) = disk {
            let blk = Arc::new(VirtioBlk::open_rw("vda", &d.path, d.size)?);
            let gmem = GuestMem::new(host, 0, mem_size);
            // Interrupt delivery: writing the eventfd raises VIRTIO_IRQ via
            // the irqfd registered on the VM.
            let irq_efd = EventFd::new(0)?;
            vm.register_irqfd(&irq_efd, VIRTIO_IRQ)?;
            let irq_efd_dev = irq_efd.try_clone()?;
            let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                let _ = irq_efd_dev.write(1);
            });
            let mmio = Arc::new(MmioVirtio::new(blk.clone(), gmem, irq_raise));
            blk.set_irq_raise(mmio.make_used_buffer_irq());
            mmio.restore_state(&d.mmio);
            bus.register(VIRTIO_BASE, mmio.clone());

            // Queue notifications arrive on `notify_efd` (registered as an
            // MMIO ioeventfd) and are serviced by the device thread below.
            let notify_efd = EventFd::new(0)?;
            vm.register_mmio_ioevent(&notify_efd, VIRTIO_QUEUE_NOTIFY)?;
            let notify_rd = notify_efd.try_clone()?;
            let blk_thread = blk.clone();
            let stop = dev_stop.clone();
            // Device thread: block on the eventfd, exit on a read error or
            // once `dev_stop` is set, otherwise process queue 0.
            dev_thread = Some(std::thread::spawn(move || loop {
                if notify_rd.read().is_err() {
                    break;
                }
                if stop.load(Ordering::SeqCst) {
                    break;
                }
                blk_thread.notify(0);
            }));
            // Kept so teardown/snapshot can wake the thread after setting
            // `dev_stop`.
            dev_wake = Some(notify_efd.try_clone()?);
            blk.notify(0);
            disk_keep = Some((d.path.clone(), d.size));
            blk_mmio_keep = Some(mmio);
            blk_keep = Some(blk);
        }

        // Re-attach a virtio-vsock device if the snapshot had one: a FRESH muxer
        // (host state isn't snapshotted) wired back to the guest's existing
        // queues by restoring the MMIO/queue cursors. vsock is bus-routed (no
        // ioeventfd/device-thread), so this is just construct + restore_state.
        // The muxer is re-armed with the snapshot's TSI token: the restored guest
        // RAM still carries the captured `tsi_auth_token[32]` and keeps stamping
        // it, so enforcement must continue with the same value (and a tokenless
        // legacy snapshot restores tokenless — `None` disables enforcement).
        let mut vsock_keep = None;
        let mut vsock_mmio_keep = None;
        if let Some(vmmio) = vsock {
            let vsock_dev = Arc::new(
                Vsock::with_tsi_token(GUEST_CID, tsi_token)
                    .map_err(|e| KvmError::from(std::io::Error::other(format!("{e:?}"))))?,
            );
            let gmem = GuestMem::new(host, 0, mem_size);
            let vm_irq = vm.clone();
            // Interrupt delivery: assert then immediately de-assert VSOCK_IRQ.
            let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                let _ = vm_irq.set_irq(VSOCK_IRQ, true);
                let _ = vm_irq.set_irq(VSOCK_IRQ, false);
            });
            let mmio = Arc::new(MmioVirtio::new(vsock_dev.clone(), gmem, irq_raise));
            vsock_dev.set_irq_raise(mmio.make_used_buffer_irq());
            let vsock_for_kick = vsock_dev.clone();
            vsock_dev
                .muxer()
                .set_kick(Arc::new(move || vsock_for_kick.kick()));
            mmio.restore_state(vmmio);
            bus.register(VSOCK_BASE, mmio.clone());
            // Re-bind the guest's TSI listeners onto fresh host sockets: the guest
            // RAM (restored) still has its services listening, but it won't re-issue
            // `listen()`, so the fresh muxer would otherwise have no host port-forward
            // routes. New ephemeral host ports are picked (transparent to the guest);
            // `inet_port` is preserved so `expose_tcp`/`expose_tls` keep resolving.
            vsock_dev.muxer().restore_tsi_listeners(vsock_listeners);
            vsock_mmio_keep = Some(mmio);
            vsock_keep = Some(vsock_dev);
        }

        // Re-attach each data volume the snapshot recorded: re-open the same host
        // backing file at the deterministic vdb/vdc base+IRQ (order = capture
        // order), restore its MMIO/queue cursors, and kick once. The contents
        // were never copied into the snapshot — they live in the backing file.
        let mut volume_blks = Vec::with_capacity(volumes.len());
        let mut volume_mmios = Vec::with_capacity(volumes.len());
        let mut volume_meta = Vec::with_capacity(volumes.len());
        let mut volume_threads = Vec::new();
        let mut volume_wakes = Vec::new();
        for (i, v) in volumes.iter().enumerate() {
            // Volume `i` sits one 0x1000 MMIO slot and one IRQ line after the
            // previous one; its device name is "vdb", "vdc", ... by index.
            let base = VOLUME_BASE + (i as u64) * 0x1000;
            let irq = VOLUME_IRQ_BASE + i as u32;
            let name = format!("vd{}", (b'b' + i as u8) as char);
            let (blk, mmio, thread, wake) = register_volume_blk(
                &vm, &bus, host, mem_size, &name, &v.path, v.size, base, irq, &dev_stop,
            )?;
            mmio.restore_state(&v.mmio);
            blk.notify(0);
            volume_blks.push(blk);
            volume_mmios.push(mmio);
            volume_meta.push(VolumeAttach {
                path: v.path.clone(),
                size: v.size,
                mount: v.mount.clone(),
            });
            if let Some(t) = thread {
                volume_threads.push(t);
            }
            if let Some(w) = wake {
                volume_wakes.push(w);
            }
        }

        // Re-attach each virtio-fs mount the snapshot recorded: re-open the same
        // host dir as a fresh FUSE/PosixFs backend, restore its inode/handle
        // tables (fds lazily reopen on first post-restore I/O), re-create the DAX
        // session + KVM mapper, restore the DAX slot table, restore the device's
        // MMIO/queue cursors, and re-register at the deterministic FS base/IRQ
        // (order = capture order). Finally eagerly re-bind every DAX slot so the
        // KVM memslots exist before the guest runs — KVM can't lazily fault them
        // in (a KVM_EXIT_MMIO completes from its data buffer, not a memslot
        // created post-exit). The host dir contents were never copied into the
        // snapshot — they live on the host (by `host_path`).
        let mut fs_mounts: Vec<FsMount> = Vec::with_capacity(virtiofs.len());
        for (i, f) in virtiofs.iter().enumerate() {
            let base = FS_BASE + (i as u64) * 0x1000;
            let irq = FS_IRQ_BASE + i as u32;
            let backend: Arc<dyn crate::fuse::FsBackend> =
                Arc::new(crate::fuse::PosixFs::new(&f.host_path).map_err(|e| {
                    KvmError::from(std::io::Error::other(format!(
                        "virtio-fs root {}: {e}",
                        f.host_path
                    )))
                })?);
            // An empty blob means no backend state was captured; the fresh
            // backend is then used as-is.
            if !f.backend_state.is_empty() {
                backend.restore_state(&f.backend_state).map_err(|e| {
                    KvmError::from(std::io::Error::other(format!(
                        "virtio-fs backend restore {}: {e}",
                        f.host_path
                    )))
                })?;
            }
            let fs_dev = Arc::new(VirtioFs::with_backend(
                VirtioFsConfig {
                    tag: f.tag.clone(),
                    num_request_queues: 1,
                    dax_window_gpa: f.dax_gpa,
                    dax_window_len: f.dax_window_len,
                },
                backend.clone(),
            ));
            let mapper: Arc<dyn crate::fuse::HvfMapper> = Arc::new(KvmDaxMapper { vm: vm.clone() });
            let session = Arc::new(crate::fuse::DaxSession::new(
                f.dax_gpa,
                f.dax_window_len,
                backend.clone(),
                mapper,
            ));
            // Same convention as the backend blob: empty means nothing to
            // restore.
            if !f.dax_state.is_empty() {
                session.restore_state(&f.dax_state).map_err(|e| {
                    KvmError::from(std::io::Error::other(format!(
                        "virtio-fs dax restore {}: {e}",
                        f.host_path
                    )))
                })?;
            }
            // Attach the DAX session to the FUSE server; a poisoned lock is
            // recovered rather than treated as fatal.
            fs_dev
                .fuse_server()
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .set_dax(session.clone());
            let gmem = GuestMem::new(host, 0, mem_size);
            let vm_irq = vm.clone();
            // Interrupt delivery: assert then immediately de-assert this
            // mount's IRQ line.
            let irq_raise: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
                let _ = vm_irq.set_irq(irq, true);
                let _ = vm_irq.set_irq(irq, false);
            });
            let mmio = Arc::new(MmioVirtio::new(fs_dev.clone(), gmem, irq_raise));
            fs_dev.set_irq_raise(mmio.make_used_buffer_irq());
            mmio.restore_state(&f.mmio);
            bus.register(base, mmio.clone());
            // Eager DAX rebind: re-create the KVM memslots for every active slot
            // before the guest resumes (see method doc for the KVM rationale).
            session.rebind_all().map_err(|e| {
                KvmError::from(std::io::Error::other(format!(
                    "virtio-fs dax rebind {}: errno {e}",
                    f.host_path
                )))
            })?;
            fs_mounts.push(FsMount {
                mmio,
                backend,
                dax: session,
                host_path: f.host_path.clone(),
                tag: f.tag.clone(),
                mount: f.mount.clone(),
                dax_gpa: f.dax_gpa,
                dax_window_len: f.dax_window_len,
            });
        }

        Ok(LinuxVm {
            vm,
            vcpus,
            bus,
            com1: Arc::new(Mutex::new(com1)),
            host,
            mem_size,
            _blk: blk_keep,
            blk_mmio: blk_mmio_keep,
            disk: disk_keep,
            vsock: vsock_keep,
            vsock_mmio: vsock_mmio_keep,
            // Balloon is cold-boot-only (not snapshotted) — a restored VM has no
            // balloon device, matching HVF's "balloon off at restore" default.
            balloon: None,
            balloon_mmio: None,
            dev_thread,
            dev_stop,
            dev_wake,
            _volume_blks: volume_blks,
            volume_threads,
            volume_wakes,
            volume_mmios,
            volume_meta,
            tsi_token,
            fs_mounts,
            // No host bridges exist yet on a freshly restored VM.
            bridges: Mutex::new(Vec::new()),
            // The snapshot states ARE the reset baseline: each vCPU re-applies
            // its entry when reset_to_snapshot bumps reset_seq.
            vcpu_baselines: vcpu_states.to_vec(),
            reset_seq: Arc::new(AtomicU64::new(0)),
            reset_intc: Some(devices.clone()),
            reset_com1: Some(*com1_state),
            reset_blk_mmio: disk.as_ref().map(|d| d.mmio.clone()),
            reset_vsock_mmio: vsock.clone(),
            reset_volume_mmios: volumes.iter().map(|v| v.mmio.clone()).collect(),
            reset_fs_mmios: virtiofs.iter().map(|f| f.mmio.clone()).collect(),
        })
    }

    /// Hand out a handle for opening host→guest vsock streams from another
    /// thread. Take it before [`run`](LinuxVm::run) borrows the VM mutably and
    /// use it while the VM runs. Returns `None` when vsock wasn't enabled.
    pub fn vsock_handle(&self) -> Option<VsockHandle> {
        let vsock = self.vsock.clone()?;
        Some(VsockHandle { vsock })
    }

    /// Bind a host Unix listener whose connections are forwarded to the guest's
    /// exec agent (vsock `guest_port`, conventionally 1028) and return the
    /// socket path. [`crate::ExecBuilder::new`]`(path)` dials that path to run
    /// a command in the guest — the product's exec path, on KVM. Requires
    /// `enable_vsock`.
    ///
    /// # Errors
    /// `NotConnected` ("vsock not enabled") when the VM has no vsock device;
    /// otherwise any error from binding the Unix listener.
    pub fn start_exec_bridge(&self, guest_port: u32) -> std::io::Result<std::path::PathBuf> {
        let Some(vsock) = self.vsock.clone() else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotConnected,
                "vsock not enabled",
            ));
        };

        let sock_dir = std::env::temp_dir();
        let path = sock_dir.join(format!(
            "sm-kvm-exec-{}-{}.sock",
            std::process::id(),
            next_sock_id()
        ));
        // Best-effort removal of a stale socket file at the same path.
        let _ = std::fs::remove_file(&path);
        let listener = std::os::unix::net::UnixListener::bind(&path)?;

        // The accept loop is stoppable + joinable so teardown can reclaim the
        // thread and listener fd (and guarantee no late connection writes into
        // guest RAM after munmap). See `LinuxVm::bridges` / `LinuxVm::drop`.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_seen = Arc::clone(&stop);
        let join = std::thread::spawn(move || {
            for incoming in listener.incoming() {
                // Teardown wakes us with a throwaway connect; bail before
                // serving it.
                if stop_seen.load(Ordering::SeqCst) {
                    break;
                }
                let Ok(conn) = incoming else { break };
                let _ = vsock
                    .muxer()
                    .open_native_to_guest(MuxerStream::Unix(conn), guest_port);
            }
        });

        let acceptor = crate::vmm::vsock_mux::Acceptor::from_parts(
            stop,
            path.to_string_lossy().into_owned(),
            join,
        );
        let mut bridges = match self.bridges.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        bridges.push(acceptor);
        drop(bridges);
        Ok(path)
    }

    /// Bind a host Unix listener for workload-networking ingress
    /// (`Vm::expose_tcp` / `Vm::connect`) and return its path. Accepted
    /// connections are routed to a guest TSI listener by the shared vsock
    /// muxer; the port is chosen per connection from the `SMUX-PORT` header
    /// written by the host forwarder. This is the KVM counterpart of the HVF
    /// runner's `vsock_mux::start`. Egress (guest→host `connect`) needs no
    /// frontend — the muxer handles guest TSI control packets off the TX queue.
    /// Requires `enable_vsock`.
    ///
    /// # Errors
    /// `NotConnected` ("vsock not enabled") when the VM has no vsock device;
    /// otherwise the `vsock_mux::start` failure wrapped in an I/O error.
    pub fn start_tsi_mux(&self) -> std::io::Result<std::path::PathBuf> {
        let Some(vsock) = self.vsock.clone() else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotConnected,
                "vsock not enabled",
            ));
        };

        let sock_dir = std::env::temp_dir();
        let path = sock_dir.join(format!(
            "sm-kvm-mux-{}-{}.sock",
            std::process::id(),
            next_sock_id()
        ));
        let path_str = path.to_string_lossy().into_owned();

        let acceptor = match crate::vmm::vsock_mux::start(&path_str, vsock, None) {
            Ok(acceptor) => acceptor,
            Err(e) => return Err(std::io::Error::other(format!("vsock_mux::start: {e}"))),
        };

        // Register the acceptor so teardown stops + joins its thread (no
        // leaked thread / bound socket fd per VM). See `LinuxVm::bridges` /
        // `Drop`.
        let mut bridges = match self.bridges.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        bridges.push(acceptor);
        drop(bridges);
        Ok(path)
    }

    /// A cross-thread handle for feeding the serial console host→guest input
    /// (keystrokes / piped stdin). Obtain it before [`run`](LinuxVm::run) (which
    /// borrows `&mut self`) and move it to a stdin-reader thread; it shares the
    /// serial device + irqchip via `Arc`, so it works while the VM is running.
    pub fn serial_input(&self) -> SerialInput {
        SerialInput {
            com1: self.com1.clone(),
            vm: self.vm.clone(),
        }
    }
}

/// Output of `LinuxVm::new`'s device assembly (kept off the giant tuple).
///
/// NOTE(review): the field set mirrors the device-related fields of
/// `LinuxVm`; the per-field notes below are inferred from how the same-named
/// `LinuxVm` fields are used — confirm against `LinuxVm::new`.
struct Assembled {
    /// The VM's vCPUs.
    vcpus: Vec<KvmVcpu>,
    /// MMIO bus the virtio devices are registered on.
    bus: Arc<MmioBus>,
    /// virtio-blk device, if one is attached.
    blk: Option<Arc<VirtioBlk>>,
    /// MMIO transport wrapping `blk`.
    blk_mmio: Option<Arc<MmioVirtio>>,
    /// Presumably the disk's backing (path, size) — TODO confirm.
    disk: Option<(String, u64)>,
    /// virtio-vsock device, if one is attached.
    vsock: Option<Arc<Vsock>>,
    /// MMIO transport wrapping `vsock`.
    vsock_mmio: Option<Arc<MmioVirtio>>,
    /// virtio-balloon device, if one is attached.
    balloon: Option<Arc<crate::devices::virtio::balloon::VirtioBalloon>>,
    /// MMIO transport wrapping `balloon`.
    balloon_mmio: Option<Arc<MmioVirtio>>,
    /// Join handle of the device thread, if one was spawned.
    dev_thread: Option<std::thread::JoinHandle<()>>,
    /// Stop flag for the device thread(s).
    dev_stop: Arc<AtomicBool>,
    /// Eventfd for waking the device thread — presumably written after
    /// `dev_stop` is set; TODO confirm.
    dev_wake: Option<EventFd>,
    /// Data-volume block devices.
    volume_blks: Vec<Arc<VirtioBlk>>,
    /// MMIO transports for the data volumes.
    volume_mmios: Vec<Arc<MmioVirtio>>,
    /// Attachment metadata for the data volumes.
    volume_meta: Vec<VolumeAttach>,
    /// Join handles of the per-volume threads.
    volume_threads: Vec<std::thread::JoinHandle<()>>,
    /// Eventfds for waking the per-volume threads.
    volume_wakes: Vec<EventFd>,
    /// virtio-fs mounts.
    fs_mounts: Vec<FsMount>,
}

/// A captured running VM: every vCPU's CPU state, the in-kernel device state
/// (PIT/irqchip/clock), and a copy of guest RAM. Produced by
/// [`LinuxVm::snapshot_after`], consumed by [`LinuxVm::restore`].
pub struct VmSnapshot {
    /// Number of vCPUs captured (see [`VmSnapshot::num_cpus`]).
    num_cpus: u8,
    /// Guest RAM size in bytes (see [`VmSnapshot::mem_size`]).
    mem_size: usize,
    /// Per-vCPU CPU state; each entry is serialized as one opaque vCPU blob.
    vcpus: Vec<KvmSnapshotState>,
    /// In-kernel device state: PIT, the irqchips and the clock.
    devices: KvmDeviceState,
    /// Serial (COM1) register state; serialized as `ier, lcr, mcr, scr, dll, dlm`.
    com1: Com1State,
    /// Present iff the VM had a virtio-blk disk: its backing (path, size) so
    /// restore can re-open it, plus the device's MMIO/queue-cursor state.
    disk: Option<DiskSnap>,
    /// Present iff the VM had a virtio-vsock device: its MMIO/queue-cursor
    /// state so restore can re-attach a fresh muxer to the guest's queues
    /// (the cid is constant `GUEST_CID`; the muxer itself is host state, not
    /// snapshotted — restore starts a fresh one).
    vsock: Option<MmioSnapshot>,
    /// The guest's TSI listeners (host-side port-forward routes) at capture time.
    /// The host TCP listener fd can't be serialized, so each record carries the
    /// (cid, peer_port, vm_port, family, socktype, inet_port) tuple needed to
    /// re-bind a fresh host listener on restore (see `restore_tsi_listeners`).
    /// Without this, a snapshot taken with a service already `listen()`ing (e.g.
    /// nginx) restores into an empty muxer — the guest never re-issues `listen()`,
    /// so `expose_tcp`/`expose_tls` find no host port. Empty for the common case.
    vsock_listeners: Vec<TsiListenerSnapshot>,
    /// Each attached data volume's mapping (backing path/size, guest mount) and
    /// device state. The contents live in the host backing file (by path) — the
    /// snapshot only records the mapping so restore re-attaches the same files.
    volumes: Vec<VolumeSnap>,
    /// The vsock TSI control-channel auth token the guest was booted with, if
    /// any. The guest's captured `tsi_auth_token[32]` lives in the snapshotted
    /// RAM and keeps being stamped on control DGRAMs after restore, so restore
    /// must hand this same value to the fresh muxer to keep enforcing (and to
    /// avoid rejecting the restored guest's legitimate egress control ops).
    tsi_token: Option<[u8; 32]>,
    /// Each attached virtio-fs mount's device + FUSE backend + DAX slot state, so
    /// restore re-attaches the mount (host dir by path; lazy fd reopen + eager
    /// DAX rebind). Empty for the common no-virtio-fs case.
    virtiofs: Vec<VirtioFsSnap>,
    /// Copy of guest RAM. `save` writes it sparsely after the page-aligned
    /// header; `load` reads `mem_size` bytes back into it.
    ram: Vec<u8>,
}

/// virtio-blk snapshot: where the backing file is + the device's MMIO state.
struct DiskSnap {
    /// Host path of the disk's backing file.
    path: String,
    /// Size recorded for the backing file (stored as `DeviceBacking::Disk::size`).
    size: u64,
    /// The device's MMIO/queue-cursor state.
    mmio: MmioSnapshot,
}

/// A data-volume (vdb, vdc, …) snapshot: its host backing file (path, size), the
/// guest mount point, and the device's MMIO/queue-cursor state. Like [`DiskSnap`]
/// but for the extra volumes; ordering implies the MMIO base/IRQ on restore.
struct VolumeSnap {
    /// Host path of the volume's backing file.
    path: String,
    /// Size recorded for the backing file.
    size: u64,
    /// Guest mount point of the volume.
    mount: String,
    /// The device's MMIO/queue-cursor state.
    mmio: MmioSnapshot,
}

impl VmSnapshot {
    /// Number of vCPUs captured in this snapshot.
    pub fn num_cpus(&self) -> u8 {
        self.num_cpus
    }
    /// Guest RAM size in bytes, as recorded in the snapshot metadata.
    pub fn mem_size(&self) -> usize {
        self.mem_size
    }

    /// Serialize everything except the magic + ram-offset header and the RAM
    /// blob. The snapshot is first projected onto the portable container by
    /// [`to_container`](Self::to_container) — num_cpus, mem_size, Com1 regs, the
    /// in-kernel device state (PIT + irqchips + clock as POD blobs), each vCPU's
    /// state blob, the device records (disk, vsock, volumes, virtio-fs), the TSI
    /// token and the TSI listeners — and then written with `write_container`.
    ///
    /// Errors from either step are returned unchanged.
    fn write_meta<W: Write>(&self, w: &mut W) -> std::io::Result<()> {
        self.to_container()?.write_container(w)
    }

    /// Project this KVM snapshot onto the portable [`ContainerMeta`] (7c step 4),
    /// the backend-neutral container both pipelines assemble through.
    ///
    /// * `intc_blob` — KVM's interrupt-controller + timer state, packed as
    ///   length-prefixed POD blobs in the order PIT, each irqchip, kvmclock.
    /// * `vcpu_blobs` — one opaque blob per vCPU holding its register file,
    ///   produced through the snapshot-state seam.
    /// * `clock_host_ticks` / `clock_ref` — unused on KVM and written as 0 (the
    ///   kvmclock rides inside `intc_blob`, re-anchored on restore as before).
    /// * `devices` — the unified `Vec<DeviceRecord>`, ordered disk, vsock,
    ///   volumes, virtio-fs. Backings carry host paths (+ virtio-fs DAX
    ///   window/state) but never device content.
    fn to_container(&self) -> std::io::Result<crate::snapshot_frame::ContainerMeta> {
        // Interrupt controller + timer: PIT, then the irqchips, then the clock.
        let mut intc_blob: Vec<u8> = Vec::new();
        write_blob(&mut intc_blob, pod_bytes(&self.devices.pit))?;
        self.devices
            .irqchips
            .iter()
            .try_for_each(|chip| write_blob(&mut intc_blob, pod_bytes(chip)))?;
        write_blob(&mut intc_blob, pod_bytes(&self.devices.clock))?;

        // One opaque register-file blob per vCPU, in vCPU order.
        let vcpu_blobs = self
            .vcpus
            .iter()
            .map(|state| -> std::io::Result<Vec<u8>> {
                let mut blob = Vec::new();
                crate::kvm::KvmVcpu::write_snapshot_state(state, &mut blob)?;
                Ok(blob)
            })
            .collect::<std::io::Result<Vec<_>>>()?;

        // Device records; the chain order below is the on-wire order.
        let disk_rec = self.disk.iter().map(|d| DeviceRecord {
            kind: DeviceKind::Blk,
            mmio: d.mmio.clone(),
            backing: DeviceBacking::Disk {
                path: d.path.clone(),
                size: d.size,
            },
        });
        let vsock_rec = self.vsock.iter().map(|m| DeviceRecord {
            kind: DeviceKind::Vsock,
            mmio: m.clone(),
            backing: DeviceBacking::None,
        });
        let volume_recs = self.volumes.iter().map(|v| DeviceRecord {
            kind: DeviceKind::Volume,
            mmio: v.mmio.clone(),
            backing: DeviceBacking::Volume {
                path: v.path.clone(),
                size: v.size,
                mount: v.mount.clone(),
            },
        });
        let fs_recs = self.virtiofs.iter().map(|f| DeviceRecord {
            kind: DeviceKind::VirtioFs,
            mmio: f.mmio.clone(),
            backing: DeviceBacking::VirtioFs {
                tag: f.tag.clone(),
                mount: f.mount.clone(),
                host_path: f.host_path.clone(),
                dax_gpa: f.dax_gpa,
                dax_window_len: f.dax_window_len,
                backend_state: f.backend_state.clone(),
                dax_state: f.dax_state.clone(),
            },
        });
        let devices: Vec<DeviceRecord> = disk_rec
            .chain(vsock_rec)
            .chain(volume_recs)
            .chain(fs_recs)
            .collect();

        let uart = &self.com1;
        let com1 = [uart.ier, uart.lcr, uart.mcr, uart.scr, uart.dll, uart.dlm];

        Ok(crate::snapshot_frame::ContainerMeta {
            num_cpus: self.num_cpus,
            mem_size: self.mem_size as u64,
            com1,
            clock_host_ticks: 0,
            clock_ref: 0,
            intc_blob,
            vcpu_blobs,
            devices,
            tsi_token: self.tsi_token,
            vsock_listeners: self.vsock_listeners.clone(),
        })
    }

    /// Persist the snapshot to `path` for cross-process / later restore.
    ///
    /// File layout (magic `SMSNAP07`): the 8-byte magic, `ram_offset: u64`
    /// (little-endian), the metadata block ([`write_meta`](Self::write_meta)),
    /// zero padding up to `ram_offset` (a 4 KiB boundary), then the raw guest
    /// RAM. RAM is page-aligned so [`LinuxVm::restore_from_file`] can `mmap` it
    /// copy-on-write instead of copying it. KVM structs are stored as their
    /// in-memory bytes, so a file is only valid for the same arch + kernel ABI
    /// (i.e. the same host).
    ///
    /// RAM is written sparsely: all-zero pages are skipped (left as file holes)
    /// and each run of consecutive non-zero pages goes out as one positioned
    /// write. The file is finally extended to `ram_offset + ram.len()` so that
    /// trailing zero pages are holes too.
    pub fn save(&self, path: &std::path::Path) -> std::io::Result<()> {
        use std::os::unix::fs::FileExt;
        const PAGE: usize = 4096;
        const MAGIC: &[u8; 8] = b"SMSNAP07";
        const HEADER_LEN: usize = 8 + 8; // magic + ram_offset

        let mut meta = Vec::new();
        self.write_meta(&mut meta)?;
        let ram_offset = (HEADER_LEN + meta.len()).next_multiple_of(PAGE);

        // Magic + ram_offset + meta, zero-padded to the page-aligned RAM offset,
        // assembled in memory and written as a single buffer at offset 0.
        let mut front = Vec::with_capacity(ram_offset);
        front.extend_from_slice(MAGIC);
        front.extend_from_slice(&(ram_offset as u64).to_le_bytes());
        front.extend_from_slice(&meta);
        front.resize(ram_offset, 0);

        let file = std::fs::File::create(path)?;
        file.write_all_at(&front, 0)?;

        // Sparse RAM write. Holes read back as zeros (mmap/read), so restore
        // sees the same bytes while an idle guest's file stays small.
        // `run_start` is the byte offset where the current run of non-zero
        // pages began, or `None` while walking zero pages.
        let ram = &self.ram[..];
        let mut run_start: Option<usize> = None;
        for (page_no, page) in ram.chunks(PAGE).enumerate() {
            let page_off = page_no * PAGE;
            let is_zero = page.iter().all(|&b| b == 0);
            match (is_zero, run_start) {
                // First non-zero page after a hole: open a run.
                (false, None) => run_start = Some(page_off),
                // Zero page ends the run: flush it with one pwrite.
                (true, Some(start)) => {
                    file.write_all_at(&ram[start..page_off], (ram_offset + start) as u64)?;
                    run_start = None;
                }
                // Still inside a run, or still inside a hole.
                _ => {}
            }
        }
        if let Some(start) = run_start {
            // Run that extends to the end of RAM.
            file.write_all_at(&ram[start..], (ram_offset + start) as u64)?;
        }

        // Extend to the full logical length so `cow_map_ram`'s length check
        // (file_len >= ram_offset + mem_size) holds.
        file.set_len((ram_offset + ram.len()) as u64)?;
        Ok(())
    }

    /// Differential save against `base_path` (a full `SMSNAP07` snapshot of the
    /// same `mem_size`): write only the 4 KiB guest-RAM pages that differ from
    /// the base, plus the full vCPU/device state.
    ///
    /// Format `SMSNAP7D`: magic, base path (`u32` length + bytes), a 0
    /// `ram_offset` placeholder, the shared meta block, `num_changed: u32`, then
    /// `num_changed` `(page_index: u32, 4096 bytes)` records, followed by any
    /// trailing sub-page of RAM. [`LinuxVm::restore_from_file`] detects the magic
    /// and restores by mmapping the base RAM copy-on-write and overlaying the
    /// changed pages — so a chain of per-layer builder snapshots each stores only
    /// its delta.
    ///
    /// # Errors
    /// * any I/O error opening/reading the base or writing `path`;
    /// * `InvalidData` if the base's `mem_size` differs from this snapshot's, or
    ///   if the base file is shorter than `ram_offset + mem_size` (mapping a
    ///   truncated file would fault on access instead of returning an error).
    pub fn save_diff(
        &self,
        path: &std::path::Path,
        base_path: &std::path::Path,
    ) -> std::io::Result<()> {
        const PG: usize = 4096;
        let mut bf = std::fs::File::open(base_path)?;
        let base_meta = read_meta(&mut bf)?;
        if base_meta.mem_size != self.mem_size {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "diff base mem_size {} != current {}",
                    base_meta.mem_size, self.mem_size
                ),
            ));
        }

        // Serialize our own metadata BEFORE mapping the base: a failure here
        // then returns with nothing mapped (no leaked mapping on the `?`).
        let mut meta = Vec::new();
        self.write_meta(&mut meta)?;

        // Refuse a truncated base up front. `save` always extends a full
        // snapshot to `ram_offset + mem_size`, so a valid base passes.
        let base_len = bf.metadata()?.len();
        let base_end = base_meta.ram_offset.checked_add(self.mem_size as u64);
        if base_end.map_or(true, |end| base_len < end) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "diff base is truncated: {} bytes, need ram_offset {} + mem_size {}",
                    base_len, base_meta.ram_offset, self.mem_size
                ),
            ));
        }

        // Map the base RAM read-only to diff against.
        // SAFETY: plain read-only private file mapping; the result is checked
        // against MAP_FAILED before use.
        let base_ptr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                self.mem_size,
                libc::PROT_READ,
                libc::MAP_PRIVATE,
                bf.as_raw_fd(),
                base_meta.ram_offset as libc::off_t,
            )
        };
        if base_ptr == libc::MAP_FAILED {
            return Err(std::io::Error::last_os_error());
        }

        // Collect the indices of the whole pages that differ from the base.
        let npages = self.mem_size / PG;
        let mut changed: Vec<u32> = Vec::new();
        {
            // SAFETY: base_ptr maps mem_size readable bytes (file length checked
            // above) and the slice does not outlive this scope, which ends
            // before the munmap below.
            let base =
                unsafe { std::slice::from_raw_parts(base_ptr as *const u8, self.mem_size) };
            for i in 0..npages {
                let o = i * PG;
                if self.ram[o..o + PG] != base[o..o + PG] {
                    changed.push(i as u32);
                }
            }
        }
        // The base is only needed for the comparison: unmap it now, so that no
        // later error path can leak the mapping.
        // SAFETY: base_ptr/mem_size are exactly what mmap returned/was given,
        // and no reference into the mapping is live any more.
        unsafe { libc::munmap(base_ptr, self.mem_size) };

        // Any sub-page tail (mem_size is page-aligned, so normally none).
        let tail = npages * PG;

        let bp = base_path.to_string_lossy();
        let mut w = BufWriter::new(std::fs::File::create(path)?);
        w.write_all(b"SMSNAP7D")?;
        w.write_all(&(bp.len() as u32).to_le_bytes())?;
        w.write_all(bp.as_bytes())?;
        w.write_all(&0u64.to_le_bytes())?; // ram_offset placeholder
        w.write_all(&meta)?;
        w.write_all(&(changed.len() as u32).to_le_bytes())?;
        for &i in &changed {
            w.write_all(&i.to_le_bytes())?;
            let o = i as usize * PG;
            w.write_all(&self.ram[o..o + PG])?;
        }
        // Trailing partial page, if any (defensive; usually empty).
        if tail < self.mem_size {
            w.write_all(&self.ram[tail..])?;
        }
        // Flush explicitly: BufWriter's Drop would swallow a write error.
        w.flush()
    }

    /// Load a snapshot written by [`save`](VmSnapshot::save) fully into memory
    /// (RAM copied into a `Vec`). Cross-process *restore* should prefer
    /// [`LinuxVm::restore_from_file`], which mmaps the RAM copy-on-write instead.
    pub fn load(path: &std::path::Path) -> std::io::Result<VmSnapshot> {
        let mut f = std::fs::File::open(path)?;
        let meta = read_meta(&mut f)?;
        let mut ram = vec![0u8; meta.mem_size];
        f.seek(SeekFrom::Start(meta.ram_offset))?;
        f.read_exact(&mut ram)?;
        Ok(VmSnapshot {
            num_cpus: meta.num_cpus,
            mem_size: meta.mem_size,
            vcpus: meta.vcpus,
            devices: meta.devices,
            com1: meta.com1,
            disk: meta.disk,
            vsock: meta.vsock,
            vsock_listeners: meta.vsock_listeners,
            volumes: meta.volumes,
            tsi_token: meta.tsi_token,
            virtiofs: meta.virtiofs,
            ram,
        })
    }
}

/// Replace a FULL snapshot file, in place, with a differential (`SMSNAP7D`)
/// against `base_path` (a full snapshot of the same `mem_size`).
///
/// This is the post-bake memory dedup analog of macOS's clonefile
/// `dedup_against`: the full snapshot is loaded, diffed against the base with
/// the existing `save_diff` (so the file format and restore path are the ones
/// already in use), written to a temporary sibling file, and renamed over the
/// original. On restore the base RAM is `mmap`'d copy-on-write and shared across
/// every VM on that base (see `restore_diff_from_file`), so disk shrinks to the
/// changed pages AND host RAM is shared.
///
/// Best-effort: on any error the temporary file is removed and the original
/// full snapshot is left untouched. Returns the size of the new (diff) file.
///
/// # Errors
/// `InvalidData` if the base is itself a differential snapshot (chains aren't
/// loaded here); otherwise whatever loading, diffing or renaming reports.
pub fn rewrite_full_as_diff(
    full_path: &std::path::Path,
    base_path: &std::path::Path,
) -> std::io::Result<u64> {
    // Same-size matching upstream already excludes diff bases, but be explicit.
    let base_magic = {
        let mut base = std::fs::File::open(base_path)?;
        let mut magic = [0u8; 8];
        base.read_exact(&mut magic)?;
        magic
    };
    if base_magic == *b"SMSNAP7D" {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "diff base is itself a differential snapshot",
        ));
    }

    let snap = VmSnapshot::load(full_path)?;
    let tmp = full_path.with_extension("snap.diff.tmp");
    // Clear any stale temp file from an earlier attempt; absence is fine.
    let _ = std::fs::remove_file(&tmp);

    let write_and_swap = || -> std::io::Result<u64> {
        snap.save_diff(&tmp, base_path)?;
        let size = std::fs::metadata(&tmp)?.len();
        // Rename only once the diff is fully written and sized.
        std::fs::rename(&tmp, full_path)?;
        Ok(size)
    };

    match write_and_swap() {
        Ok(size) => Ok(size),
        Err(e) => {
            let _ = std::fs::remove_file(&tmp);
            Err(e)
        }
    }
}

/// Everything in a snapshot file except the RAM blob, plus where the RAM lives.
///
/// Produced by `read_meta` / `read_meta_body`; apart from `ram_offset` the
/// fields mirror the same-named fields of [`VmSnapshot`].
struct SnapshotMeta {
    /// Byte offset of the RAM blob in a full snapshot file; a 0 placeholder in
    /// a differential snapshot (which has no contiguous RAM blob).
    ram_offset: u64,
    num_cpus: u8,
    mem_size: usize,
    com1: Com1State,
    devices: KvmDeviceState,
    vcpus: Vec<KvmSnapshotState>,
    disk: Option<DiskSnap>,
    vsock: Option<MmioSnapshot>,
    vsock_listeners: Vec<TsiListenerSnapshot>,
    volumes: Vec<VolumeSnap>,
    tsi_token: Option<[u8; 32]>,
    virtiofs: Vec<VirtioFsSnap>,
}

/// Parse the header + metadata (everything before the RAM blob) of a full
/// snapshot, leaving the reader positioned just past the metadata block.
///
/// Only the single full-snapshot magic `SMSNAP07` is accepted. The unified
/// container always carries every section (device vec, token, listeners —
/// count 0 if empty); the legacy SMSNAP04/05/06 tail-layering is gone (no
/// installed base; re-bake policy).
///
/// # Errors
/// `InvalidData` for any other magic; otherwise the underlying read/parse error.
fn read_meta<R: Read>(r: &mut R) -> std::io::Result<SnapshotMeta> {
    let mut magic = [0u8; 8];
    r.read_exact(&mut magic)?;
    match &magic {
        b"SMSNAP07" => read_meta_body(r),
        _ => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "bad snapshot magic/version",
        )),
    }
}

/// Read the post-magic metadata block (`ram_offset` + all device/vCPU fields)
/// shared by the full (`SMSNAP07`) and differential (`SMSNAP7D`) formats. For
/// diffs `ram_offset` is a 0 placeholder (there is no contiguous RAM blob).
fn read_meta_body<R: Read>(r: &mut R) -> std::io::Result<SnapshotMeta> {
    let ram_offset = read_u64(r)?;
    // The portable container body (7c step 4): one shared codec, then demux the
    // opaque backend blobs back into KVM's typed fields. The intc_blob holds
    // KVM's PIT + 3 irqchips + kvmclock POD blobs; each vCPU blob its register
    // file; the device vec its disk/vsock/volume/virtio-fs records.
    let meta = crate::snapshot_frame::ContainerMeta::read_container(r)?;

    let com1 = Com1State {
        ier: meta.com1[0],
        lcr: meta.com1[1],
        mcr: meta.com1[2],
        scr: meta.com1[3],
        dll: meta.com1[4],
        dlm: meta.com1[5],
    };

    let mut ic = std::io::Cursor::new(&meta.intc_blob);
    let devices = KvmDeviceState {
        pit: read_blob_pod::<kvm_pit_state2>(&mut ic)?,
        irqchips: [
            read_blob_pod::<kvm_irqchip>(&mut ic)?,
            read_blob_pod::<kvm_irqchip>(&mut ic)?,
            read_blob_pod::<kvm_irqchip>(&mut ic)?,
        ],
        clock: read_blob_pod::<kvm_clock_data>(&mut ic)?,
    };

    let mut vcpus = Vec::with_capacity(meta.vcpu_blobs.len());
    for vb in &meta.vcpu_blobs {
        // Per-vCPU register file via the seam — inverse of the write path above.
        let mut vc = std::io::Cursor::new(vb);
        vcpus.push(crate::kvm::KvmVcpu::read_snapshot_state(&mut vc)?);
    }

    // Demux the unified device vec back into the typed fields the restore path
    // reconstructs from — device reconstruction is unchanged; only the wire
    // framing converged onto ContainerMeta.
    let mut disk = None;
    let mut vsock = None;
    let mut volumes = Vec::new();
    let mut virtiofs = Vec::new();
    for rec in meta.devices {
        let bad = || {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "device kind/backing mismatch",
            )
        };
        match (rec.kind, rec.backing) {
            (DeviceKind::Blk, DeviceBacking::Disk { path, size }) => {
                disk = Some(DiskSnap {
                    path,
                    size,
                    mmio: rec.mmio,
                });
            }
            (DeviceKind::Vsock, _) => vsock = Some(rec.mmio),
            (DeviceKind::Volume, DeviceBacking::Volume { path, size, mount }) => {
                volumes.push(VolumeSnap {
                    path,
                    size,
                    mount,
                    mmio: rec.mmio,
                });
            }
            (
                DeviceKind::VirtioFs,
                DeviceBacking::VirtioFs {
                    tag,
                    mount,
                    host_path,
                    dax_gpa,
                    dax_window_len,
                    backend_state,
                    dax_state,
                },
            ) => {
                virtiofs.push(VirtioFsSnap {
                    host_path,
                    tag,
                    mount,
                    dax_gpa,
                    dax_window_len,
                    mmio: rec.mmio,
                    backend_state,
                    dax_state,
                });
            }
            _ => return Err(bad()),
        }
    }

    Ok(SnapshotMeta {
        ram_offset,
        num_cpus: meta.num_cpus,
        mem_size: meta.mem_size as usize,
        com1,
        devices,
        vcpus,
        disk,
        vsock,
        vsock_listeners: meta.vsock_listeners,
        volumes,
        tsi_token: meta.tsi_token,
        virtiofs,
    })
}

/// Borrow a `#[repr(C)]` POD value as its raw in-memory bytes (used for
/// snapshot serialization). The returned slice is `size_of::<T>()` bytes long
/// and lives as long as `v`.
fn pod_bytes<T>(v: &T) -> &[u8] {
    let len = std::mem::size_of_val(v);
    let first = (v as *const T).cast::<u8>();
    // SAFETY: T is a #[repr(C)] KVM struct (plain data); `first` points at `v`,
    // which is valid for `len` bytes, and the bytes are only read.
    unsafe { std::slice::from_raw_parts(first, len) }
}

/// Write `bytes` as a length-prefixed blob: a little-endian `u32` byte count
/// followed by the bytes themselves.
///
/// # Errors
/// `InvalidInput` if `bytes` is too long for the `u32` prefix (a truncating
/// cast here would emit a wrong length and corrupt everything after it);
/// otherwise any error from the writer.
fn write_blob<W: Write>(w: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    let len = u32::try_from(bytes.len()).map_err(|_| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "blob of {} bytes exceeds the u32 length prefix",
                bytes.len()
            ),
        )
    })?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(bytes)
}

/// Read a length-prefixed byte blob written by [`write_blob`].
///
/// The length prefix comes from the file and is not trusted: the buffer grows
/// only as data actually arrives (bounded by `take`), so a corrupt prefix can't
/// force a multi-gigabyte allocation before the short read is noticed.
///
/// # Errors
/// `UnexpectedEof` if the reader ends before `len` bytes were read; otherwise
/// any error from the reader.
fn read_blob_vec<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    // Upper bound on the up-front reservation; larger blobs grow on demand.
    const PREALLOC_CAP: usize = 64 * 1024;

    let len = read_u32(r)? as usize;
    let mut buf = Vec::with_capacity(len.min(PREALLOC_CAP));
    let got = r.by_ref().take(len as u64).read_to_end(&mut buf)?;
    if got != len {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            format!("blob truncated: expected {len} bytes, got {got}"),
        ));
    }
    Ok(buf)
}

/// Read a length-prefixed blob written by [`write_blob`] and decode it as a
/// UTF-8 string. `what` names the field in the `InvalidData` error returned
/// when the bytes are not valid UTF-8.
fn read_blob_string<R: Read>(r: &mut R, what: &str) -> std::io::Result<String> {
    match String::from_utf8(read_blob_vec(r)?) {
        Ok(text) => Ok(text),
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("{what} not utf8"),
        )),
    }
}

/// Read a little-endian `u32` (exactly 4 bytes) from `r`.
fn read_u32<R: Read>(r: &mut R) -> std::io::Result<u32> {
    let mut raw = [0u8; std::mem::size_of::<u32>()];
    r.read_exact(&mut raw).map(|()| u32::from_le_bytes(raw))
}
/// Read a little-endian `u64` (exactly 8 bytes) from `r`.
fn read_u64<R: Read>(r: &mut R) -> std::io::Result<u64> {
    let mut raw = [0u8; std::mem::size_of::<u64>()];
    r.read_exact(&mut raw).map(|()| u64::from_le_bytes(raw))
}

/// Read a length-prefixed POD blob and reinterpret its bytes as a `T`.
///
/// # Errors
/// `InvalidData` if the stored length is not `size_of::<T>()` (arch/ABI
/// mismatch); otherwise any error from the reader.
fn read_blob_pod<T: Copy>(r: &mut impl Read) -> std::io::Result<T> {
    let expected = std::mem::size_of::<T>();
    let len = read_u32(r)? as usize;
    if len != expected {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("blob size {len} != {expected}"),
        ));
    }
    let mut raw = vec![0u8; expected];
    r.read_exact(&mut raw)?;
    // SAFETY: `raw` holds exactly size_of::<T>() initialized bytes and T is a
    // #[repr(C)] Copy POD; the unaligned read copes with the Vec's alignment.
    let value = unsafe { raw.as_ptr().cast::<T>().read_unaligned() };
    Ok(value)
}

/// Cross-thread handle for host→guest vsock connections (see
/// [`LinuxVm::vsock_handle`]). Cloning it clones the `Arc`, so every clone
/// refers to the same vsock device.
#[derive(Clone)]
pub struct VsockHandle {
    /// The vsock device whose muxer `connect` opens streams through.
    vsock: Arc<Vsock>,
}

impl VsockHandle {
    /// Open a host→guest vsock stream to `guest_port`, a port the guest is
    /// listening on (e.g. the exec agent on 1028).
    ///
    /// A connected Unix socket pair is created; one end is handed to the muxer,
    /// which bridges it to the guest socket, and the other end is returned as a
    /// [`UnixStream`](std::os::unix::net::UnixStream): write to it to send to
    /// the guest, read from it for the guest's replies.
    ///
    /// # Errors
    /// Fails if the socket pair can't be created or the muxer refuses to open
    /// the stream.
    pub fn connect(&self, guest_port: u32) -> std::io::Result<std::os::unix::net::UnixStream> {
        let (host_end, muxer_end) = std::os::unix::net::UnixStream::pair()?;
        let guest_side = MuxerStream::Unix(muxer_end);
        self.vsock
            .muxer()
            .open_native_to_guest(guest_side, guest_port)?;
        Ok(host_end)
    }
}

/// Host→guest serial input handle (see [`LinuxVm::serial_input`]). Both fields
/// are `Arc`s shared with the VM, so clones of the handle feed the same serial
/// device.
#[derive(Clone)]
pub struct SerialInput {
    /// The shared serial device that receives the pushed bytes.
    com1: Arc<Mutex<Com1>>,
    /// The VM, used to drive the `COM1_IRQ` line after a byte is queued.
    vm: Arc<KvmVm>,
}

impl SerialInput {
    /// Queue one byte on the guest's serial RX and update the serial IRQ line,
    /// which raises the RX interrupt if the guest has enabled it. The in-kernel
    /// irqchip wakes an idle (HLT'd) vCPU, so no force-exit is needed.
    ///
    /// The serial lock is released before the IRQ line is driven, and a failure
    /// to set the line is ignored.
    pub fn push(&self, byte: u8) {
        let level = {
            let mut uart = lock_recover(&self.com1);
            uart.push_rx(byte);
            let line = uart.irq_line();
            line
        };
        let _ = self.vm.set_irq(COM1_IRQ, level);
    }
}

/// A booted VM whose vCPUs are executing on background threads, returned by
/// [`LinuxVm::start_running`]. The product `Vm` handle holds this to keep the
/// guest live (serving exec over vsock) while supporting on-demand snapshot or
/// stop. `wait`/`stop`/`snapshot` consume the handle — they quiesce and join
/// the vCPU threads. Dropping it without calling one of those stops the guest
/// safely (the vCPU threads are force-exited and joined BEFORE the inner
/// `LinuxVm` drop munmaps guest RAM — joining first is a memory-safety
/// requirement, since a running vCPU's KVM exit can still touch that RAM).
pub struct RunningVm {
    /// The VM itself (guest RAM mapping, devices, reset baselines).
    vm: LinuxVm,
    /// One join handle per vCPU thread, BSP first (index 0). Each thread yields
    /// its exit reason and, optionally, a captured vCPU state. Emptied by
    /// `wait`/`stop`/`snapshot`, which makes `Drop` a no-op.
    threads: Vec<std::thread::JoinHandle<(ExitReason, Option<KvmSnapshotState>)>>,
    /// Stop flag: stored `true` before force-exiting the vCPUs and never cleared.
    stop: Arc<AtomicBool>,
    /// Set by `snapshot` before it force-exits the vCPUs.
    snapshot_req: Arc<AtomicBool>,
    /// Per-vCPU handles used to force-exit the vCPUs; their count is the vCPU
    /// count used by the snapshot/reset paths.
    handles: Vec<KvmVcpuHandle>,
    /// Pause/resume rendezvous used by `snapshot_live` and `reset_to_snapshot`.
    pause: Arc<PauseCoord>,
    /// Shared with every vCPU thread (clone of `LinuxVm::reset_seq`):
    /// `reset_to_snapshot` bumps it after resetting RAM/intc/devices so the
    /// parked vCPUs re-apply their baseline registers on resume.
    reset_seq: Arc<AtomicU64>,
}

/// The live-snapshot pause/resume rendezvous: the snapshotter requests a pause +
/// force-exits every vCPU; each thread captures its state and parks until
/// resumed. This is now the shared, loom-proven `PauseBarrier` (Phase 3 7b.3),
/// parameterized with KVM's per-vCPU snapshot state — the same barrier HVF uses.
///
/// In this file it is driven through `request_pause`, `wait_all_parked` and
/// `resume`.
type PauseCoord = crate::vcpu_dispatch::PauseBarrier<KvmSnapshotState>;

impl RunningVm {
    /// Drive virtio-balloon inflation on the live guest — see
    /// [`LinuxVm::request_balloon_inflate`], to which this forwards `pages`
    /// unchanged. Returns `true` if a balloon device was present. The vCPUs are
    /// running, so the guest's balloon driver reacts to the config-change IRQ
    /// asynchronously.
    pub fn request_balloon_inflate(&self, pages: u32) -> bool {
        self.vm.request_balloon_inflate(pages)
    }

    /// Join every vCPU thread (stored BSP-first) and return the BSP's exit
    /// reason. Helper for [`wait`](Self::wait) / [`stop`](Self::stop).
    ///
    /// A thread that panicked is reported as `Unknown("vcpu thread panicked")`;
    /// with no threads at all the result is `Unknown("no vcpus")`. `self.threads`
    /// is left empty, so `Drop` becomes a no-op.
    fn join_all(&mut self) -> ExitReason {
        let mut bsp_exit = None;
        for (idx, handle) in self.threads.drain(..).enumerate() {
            let reason = match handle.join() {
                Ok((reason, _state)) => reason,
                Err(_panic) => ExitReason::Unknown("vcpu thread panicked".into()),
            };
            // Only the BSP (first thread) decides the reported exit reason.
            if idx == 0 {
                bsp_exit = Some(reason);
            }
        }
        bsp_exit.unwrap_or_else(|| ExitReason::Unknown("no vcpus".into()))
    }

    /// Block until the guest stops on its own (kernel halt/reboot); returns the
    /// BSP's exit reason. Consumes the handle: the vCPU threads are joined
    /// without being force-exited first.
    pub fn wait(mut self) -> ExitReason {
        self.join_all()
    }

    /// Force the guest to stop now (force-exit every vCPU) and join. Returns the
    /// BSP's exit reason (typically `Canceled`).
    ///
    /// Consumes the handle; `join_all` drains `threads`, so the `Drop` that
    /// follows has nothing left to join.
    pub fn stop(mut self) -> ExitReason {
        // `stop` is stored BEFORE force_exit and never cleared — the run loop's
        // resume tail re-reads it after wiping the pause gates, so a force_exit
        // that races into that window can't be swallowed (see run_vcpu). One
        // force_exit then suffices: `immediate_exit` gates the next KVM_RUN entry
        // (not-yet-bound / between-runs) and SIGUSR1 breaks a vCPU blocked inside
        // KVM_RUN, so the join below returns promptly with no re-kicking.
        self.stop.store(true, Ordering::SeqCst);
        KvmVcpuHandle::force_exit(&self.handles);
        self.join_all()
    }

    /// Snapshot the running guest: request a snapshot, force every vCPU to a
    /// clean instruction boundary, and capture a full [`VmSnapshot`]
    /// (vCPU + device + serial + virtio-cursor + RAM state). Consumes the
    /// handle — the guest stops. Restore with [`LinuxVm::restore`] /
    /// [`restore_from_file`](LinuxVm::restore_from_file).
    ///
    /// The vCPU thread handles are moved out of `self` and passed to
    /// `capture_quiesced`, so `Drop` finds `threads` empty afterwards.
    pub fn snapshot(mut self) -> Result<VmSnapshot, KvmError> {
        let ncpus = self.handles.len();
        // The request flag is set BEFORE the force-exit below.
        self.snapshot_req.store(true, Ordering::SeqCst);
        // A single force_exit is race-free: `immediate_exit` gates guest re-entry
        // (so a not-yet-bound or between-runs vCPU stops at its next KVM_RUN entry)
        // and the SIGUSR1 breaks one already blocked inside KVM_RUN. Exactly one
        // signal is sent, so none can land on the post-break `capture_snapshot`
        // ioctls. Each vCPU stops at a clean instruction boundary (EINTR at entry
        // or between guest instructions), which is the snapshot quiesce point.
        KvmVcpuHandle::force_exit(&self.handles);
        let threads = std::mem::take(&mut self.threads);
        self.vm.capture_quiesced(threads, ncpus)
    }

    /// LIVE snapshot: capture a full [`VmSnapshot`] WITHOUT stopping the guest.
    /// Pauses every vCPU at a clean boundary (force-exit → park), captures CPU +
    /// device + RAM state, then resumes — so the same VM keeps running and can be
    /// snapshotted again (the builder snapshots one long-lived VM per layer).
    /// Assumes guest I/O is quiescent at call time (the builder snapshots between
    /// instructions).
    ///
    /// The vCPUs are resumed whether or not the capture succeeded; a capture
    /// error is returned only after the resume.
    pub fn snapshot_live(&self) -> Result<VmSnapshot, KvmError> {
        let ncpus = self.handles.len();
        // Freeze every vCPU at a clean boundary via the shared PauseBarrier:
        // request the pause, force-exit so each vCPU lands in `park`, then wait
        // for all of them to deposit their captured state.
        self.pause.request_pause();
        KvmVcpuHandle::force_exit(&self.handles);
        let states = self.pause.wait_all_parked(ncpus);

        // Capture devices + RAM while everything is paused, then resume.
        // The result is held in `snap` (not `?`-propagated) so that `resume`
        // runs on the error path too.
        let snap = self.vm.capture_with_states(states);
        self.pause.resume();
        snap
    }

    /// In-place reset of a live VM to its snapshot baseline (isolated
    /// warm-reuse): far cheaper than teardown+rebuild because the KVM VM, vCPUs,
    /// vCPU threads, RAM mapping, and device threads all persist. Freezes every
    /// vCPU at a clean boundary, drops the dirty copy-on-write RAM pages (so they
    /// re-fault the snapshot baseline), restores the in-kernel intc/timer +
    /// serial + every virtio device's MMIO/queue state to baseline and drains
    /// the vsock muxer's in-flight host connections, bumps `reset_seq`, and
    /// resumes — each vCPU re-applies its baseline registers on resume (see
    /// `run_vcpu`). The guest comes back byte-identical to the snapshot point, so
    /// the next acquire gets a clean VM (isolation) without paying a full rebuild.
    ///
    /// Assumes guest I/O is quiescent at call time (the pool calls this on
    /// release, after the cycle's exec completed) — the same quiescence
    /// assumption HVF's restore makes.
    ///
    /// # Errors
    /// * the VM has no snapshot baseline (a cold-booted VM, which the pool never
    ///   resets) — returned before anything is paused;
    /// * `madvise(MADV_DONTNEED)` on guest RAM fails — the RAM was NOT reset, so
    ///   the VM must not be reused;
    /// * restoring the in-kernel interrupt-controller/timer state fails.
    ///
    /// On the last two the function returns with the vCPUs still parked: a
    /// partially reset guest is never resumed. NOTE(review): confirm that callers
    /// discard the VM on error and that teardown copes with parked vCPUs.
    pub fn reset_to_snapshot(&self) -> Result<(), KvmError> {
        let (Some(intc), Some(com1)) = (&self.vm.reset_intc, &self.vm.reset_com1) else {
            return Err(KvmError(
                "reset_to_snapshot: VM has no snapshot baseline (cold-booted?)".into(),
            ));
        };
        let ncpus = self.handles.len();
        // 1. Freeze every vCPU at a clean boundary (parked in the PauseBarrier).
        self.pause.request_pause();
        KvmVcpuHandle::force_exit(&self.handles);
        let _dirty = self.pause.wait_all_parked(ncpus); // discard the dirty states

        // 2. Reset guest RAM to baseline: drop the private (dirty) CoW pages so
        //    the next guest touch re-faults the original snapshot file content.
        //    O(1) syscall; the file stays mapped — the KVM analog of HVF's
        //    remap_cow.
        // SAFETY: `host` is the live guest-RAM mmap (`mem_size` bytes) and every
        // vCPU is parked, so nothing accesses guest RAM concurrently.
        let rc = unsafe {
            libc::madvise(
                self.vm.host as *mut libc::c_void,
                self.vm.mem_size,
                libc::MADV_DONTNEED,
            )
        };
        if rc != 0 {
            // The dirty pages are still in place. Continuing would hand the
            // previous cycle's memory to the next user, so fail loudly rather
            // than report a clean reset.
            let os_err = std::io::Error::last_os_error();
            return Err(KvmError(
                format!("reset_to_snapshot: madvise(MADV_DONTNEED) failed: {os_err}").into(),
            ));
        }

        // 3. Restore the in-kernel intc + timer (PIT/PIC/IOAPIC/kvmclock).
        self.vm.vm.restore_devices(intc)?;

        // 4. Reset the 16550 serial to baseline.
        {
            let mut c = self.vm.com1.lock().unwrap_or_else(|e| e.into_inner());
            *c = Com1::new();
            c.restore(com1);
        }

        // 5. Reset virtio device state to baseline (vCPUs parked + guest I/O
        //    quiescent): re-apply each device's MMIO/queue cursors and drain the
        //    vsock muxer's in-flight host connections, so the host device views
        //    match the guest's reset-to-baseline RAM (else avail/used indices
        //    would desync → a wedged or misdelivering device on resume).
        if let (Some(m), Some(b)) = (&self.vm.blk_mmio, &self.vm.reset_blk_mmio) {
            m.restore_state(b);
        }
        if let Some(v) = &self.vm.vsock {
            // Drop in-flight TSI listeners/proxies/streams + pending RX so the
            // next tenant gets a fresh muxer — the documented between-dispatch
            // recycle (the same call HVF's pool-worker restore uses).
            v.muxer().reset();
            v.reset_pending_rx();
        }
        if let (Some(m), Some(b)) = (&self.vm.vsock_mmio, &self.vm.reset_vsock_mmio) {
            m.restore_state(b);
        }
        for (m, b) in self.vm.volume_mmios.iter().zip(&self.vm.reset_volume_mmios) {
            m.restore_state(b);
        }
        for (f, b) in self.vm.fs_mounts.iter().zip(&self.vm.reset_fs_mmios) {
            f.mmio.restore_state(b);
        }

        // 6. Signal the parked vCPUs to re-apply their baseline registers.
        self.reset_seq.fetch_add(1, Ordering::SeqCst);

        // 7. Resume: vCPUs un-park, re-apply baseline regs, re-enter the guest.
        self.pause.resume();
        Ok(())
    }

    /// Start a host-side TLS terminator for this VM.
    ///
    /// HTTPS is accepted on `cfg.listen_addr`, terminated with rustls, and the
    /// decrypted plaintext is bridged to the guest's TSI listener (via the
    /// muxer's auto-bound host TCP port) — the guest sees plain HTTP. The
    /// Mac/HVF equivalent runs in the worker subprocess; here it runs
    /// in-process. Fire-and-forget: the acceptor thread lives for the VM's
    /// lifetime.
    ///
    /// # Errors
    /// `StartError::Config` if the VM has no vsock device; otherwise whatever
    /// `tls::start` reports.
    pub fn expose_tls(
        &self,
        cfg: crate::vmm::tls::TlsConfig,
    ) -> Result<std::net::SocketAddr, crate::vmm::tls::StartError> {
        let Some(vsock) = self.vm.vsock.clone() else {
            return Err(crate::vmm::tls::StartError::Config(
                "vsock not enabled on this VM".into(),
            ));
        };
        crate::vmm::tls::start(cfg, vsock)
    }
}

impl Drop for RunningVm {
    /// Join any vCPU threads still running when the handle is dropped.
    ///
    /// Dropping the handle without wait/stop/snapshot leaves the vCPU threads
    /// live. They must be joined before the inner `LinuxVm` drops (which
    /// munmaps guest RAM), because a live vCPU's KVM exit can still write that
    /// RAM. wait/stop/snapshot already drained `threads`, so after any of them
    /// this is a no-op.
    fn drop(&mut self) {
        if self.threads.is_empty() {
            return;
        }
        // `stop` is raised before force_exit and never cleared. The run loop's
        // resume tail re-reads it after wiping the pause gates, so a force_exit
        // racing that window is not lost (see run_vcpu). A single force_exit is
        // therefore enough (immediate_exit gates re-entry and SIGUSR1 breaks a
        // blocked KVM_RUN — see `KvmVcpuHandle::force_exit`), and the joins
        // return promptly without another kick.
        self.stop.store(true, Ordering::SeqCst);
        KvmVcpuHandle::force_exit(&self.handles);
        self.threads.drain(..).for_each(|worker| {
            // A panicked vCPU thread is not propagated out of drop.
            let _ = worker.join();
        });
    }
}

/// Acquire `m`, handing back the guard even when the mutex is poisoned.
///
/// Every vCPU thread (and the snapshot path) shares the serial device. If one
/// thread panicked while holding the lock, the mutex would be poisoned and a
/// plain `.lock().unwrap()` in every other thread would panic as well — one
/// thread's fault cascading into a full-VM wedge. The guarded sections are
/// short and self-contained (a few register writes / a byte of TX), so taking
/// over a poisoned guard costs at worst a momentary serial glitch, not
/// guest-state corruption. That is preferable to taking the whole VM down.
fn lock_recover<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        // Poisoned: the previous holder panicked; take the guard anyway.
        Err(poisoned) => poisoned.into_inner(),
    }
}

/// Handle a single *resolved* guest exit, i.e. one for which `KVM_RUN`
/// returned `Ok`: COM1 port I/O, MMIO bus accesses (virtio), and the terminal
/// halt / shutdown / intr cases. Yields `Continue` when the vCPU should
/// re-enter the guest and `Break(reason)` when this vCPU should stop.
///
/// This is deliberately separate from [`run_vcpu`]'s EINTR / EAGAIN /
/// live-snapshot-pause *lifecycle*, so that exit dispatch is one
/// self-contained, reviewable unit — the structural prerequisite for the
/// shared cross-backend run loop (Phase 3 7b; see
/// `docs/design/vmm-backend-unification-2026-06-07.md` §7b). Force-exit, the
/// pause rendezvous and the AP wait-for-SIPI backoff remain in `run_vcpu`;
/// only post-`Ok` exit handling lives here.
///
/// Reads (`IoIn` / `MmioRead`) are completed IN PLACE: the value is written
/// into the exit's `&mut data` slice, which aliases the `kvm_run` page, and on
/// the next `KVM_RUN` KVM itself moves it into the guest register. The seam's
/// owned `step() -> VcpuExit` cannot express this x86 IO-completion model,
/// which is why the KVM loop drives `fd.run()` directly. Port accesses outside
/// `COM1_PORTS`, and MMIO reads the bus does not claim, leave `data` untouched.
///
/// `#[inline]` keeps the extraction free: it folds back into `run_vcpu`'s loop
/// just as the earlier inline `match` did, adding no call overhead to the
/// per-exit path (already dominated by the `KVM_RUN` world switch).
#[inline]
fn dispatch_kvm_exit(
    exit: KvmExit<'_>,
    vm: &KvmVm,
    bus: &MmioBus,
    com1: &Mutex<Com1>,
    stop: &AtomicBool,
    vcpu: &KvmVcpu,
) -> std::ops::ControlFlow<ExitReason> {
    use std::ops::ControlFlow::{Break, Continue};

    match exit {
        KvmExit::IoOut(port, data) => {
            if !COM1_PORTS.contains(&port) {
                return Continue(());
            }
            let mut serial = lock_recover(com1);
            let mut console = std::io::stdout().lock();
            // Feed each byte to the UART; forward whatever it transmits.
            for tx in data.iter().filter_map(|&byte| serial.write(port, byte)) {
                let _ = console.write_all(&[tx]);
            }
            let _ = console.flush();
            let level = serial.irq_line();
            // Release stdout, then the serial lock, before touching the irq chip.
            drop(console);
            drop(serial);
            let _ = vm.set_irq(COM1_IRQ, level);
            Continue(())
        }
        KvmExit::IoIn(port, data) => {
            if !COM1_PORTS.contains(&port) {
                return Continue(());
            }
            let mut serial = lock_recover(com1);
            let value = serial.read(port);
            // Every byte of the access receives the same register value.
            data.fill(value);
            let level = serial.irq_line();
            drop(serial);
            let _ = vm.set_irq(COM1_IRQ, level);
            Continue(())
        }
        KvmExit::MmioWrite(addr, data) => {
            // Zero-extend the little-endian payload to 64 bits for the bus.
            let width = data.len();
            let mut raw = [0u8; 8];
            raw[..width].copy_from_slice(data);
            let value = u64::from_le_bytes(raw);
            bus.write(addr, value, width as u8);
            Continue(())
        }
        KvmExit::MmioRead(addr, data) => {
            let width = data.len();
            if let Some(value) = bus.read(addr, width as u8) {
                // Hand back only the low `width` little-endian bytes.
                data.copy_from_slice(&value.to_le_bytes()[..width]);
            }
            Continue(())
        }
        KvmExit::Hlt => Break(ExitReason::Halt),
        KvmExit::Shutdown => Break(ExitReason::Shutdown),
        KvmExit::Intr => {
            if stop.load(Ordering::SeqCst) || vcpu.should_exit() {
                Break(ExitReason::Canceled)
            } else {
                Continue(())
            }
        }
        other => Break(ExitReason::Unknown(format!("{other:?}"))),
    }
}

/// One vCPU's device-serving loop. Owns the vCPU on its thread; shares the irq
/// chip (`vm`), the MMIO bus, and the serial device. On stop it sets `stop` and
/// force-exits the peers so the whole VM winds down together.
///
/// Each iteration calls `fd.run()` and handles the result:
/// - `Ok(exit)`: optionally counted (when `count_exits` is set), then passed to
///   [`dispatch_kvm_exit`]; a `Break(reason)` from it ends the loop.
/// - `EINTR`: ends with `Canceled` if `stop` is set. Otherwise, if `pause`
///   reports a pause in progress, runs the park → drain-signal → optional
///   baseline re-apply → gate-clear sequence and re-enters the loop. Otherwise
///   ends with `Canceled` if `vcpu.should_exit()`, else retries.
/// - `EAGAIN`: sleeps 1 ms and retries, unless a stop/exit was requested.
/// - any other error: ends with `ExitReason::Unknown` carrying the error text.
///
/// Returns `(reason, snapshot)`. `snapshot` is `Some` only when the loop ended
/// with `Canceled`, `snapshot_req` is set, and `capture_snapshot` succeeded.
/// If `bind_thread` fails the function returns `Unknown("bind_thread: …")` and
/// `None` without entering the loop. Any non-`Canceled` reason stores `stop`
/// and calls `KvmVcpuHandle::force_exit(&handles)` before returning.
fn run_vcpu(
    vcpu: KvmVcpu,
    vm: Arc<KvmVm>,
    bus: Arc<MmioBus>,
    com1: Arc<Mutex<Com1>>,
    stop: Arc<AtomicBool>,
    snapshot_req: Arc<AtomicBool>,
    exits: Arc<AtomicU64>,
    count_exits: bool,
    handles: Vec<KvmVcpuHandle>,
    pause: Option<Arc<PauseCoord>>,
    vcpu_idx: usize,
    // In-place reset (isolated warm-reuse): this vCPU's snapshot-baseline state
    // (None for a cold-booted VM, which is never reset) and the shared reset
    // counter the host bumps after resetting RAM/intc/devices.
    baseline: Option<KvmSnapshotState>,
    reset_seq: Arc<AtomicU64>,
) -> (ExitReason, Option<KvmSnapshotState>) {
    if let Err(e) = vcpu.bind_thread() {
        return (ExitReason::Unknown(format!("bind_thread: {e}")), None);
    }
    // Last reset generation this vCPU has applied. Starts at the current value
    // (0 at spawn) so the FIRST reset (which bumps to >0) triggers a re-apply,
    // while ordinary snapshot-pause/resume cycles (no bump) do not.
    let mut last_reset_applied = reset_seq.load(Ordering::SeqCst);
    let reason = {
        // The fd borrow is held for the whole loop and released when this block
        // ends, i.e. before the post-loop `capture_snapshot` call below.
        let mut fd = vcpu.vcpu.borrow_mut();
        loop {
            // A force-exit SIGUSR1 surfaces as EINTR from KVM_RUN (not a
            // successful KVM_EXIT_INTR). A requested stop ends this vCPU; a
            // spurious signal resumes it.
            let exit = match fd.run() {
                Ok(e) => e,
                Err(e) if e.errno() == libc::EINTR => {
                    if stop.load(Ordering::SeqCst) {
                        break ExitReason::Canceled;
                    }
                    // Live-snapshot pause, checked before `should_exit` because
                    // the pause is delivered via force_exit (which sets the exit
                    // flag); the flag is cleared again on resume below.
                    if let Some(pc) = &pause {
                        if pc.is_paused() {
                            // Capture via the fd we already hold (a 2nd RefCell
                            // borrow would panic), then park on the shared
                            // PauseBarrier until the snapshotter resumes us.
                            // NOTE(review): a failed capture skips `park` but
                            // still runs the rest of this resume tail — confirm
                            // the snapshotter cannot end up waiting on a vCPU
                            // that never parked.
                            if let Ok(s) = vcpu.capture_snapshot_locked(&fd) {
                                pc.park(vcpu_idx, s);
                            }
                            // Drain the pending SIGUSR1 that force-exited us.
                            // KVM_RUN returns EINTR with the signal STILL pending
                            // (KVM restores the thread mask, which blocks SIGUSR1,
                            // before the no-op handler can run). The consuming
                            // snapshot path is fine because its thread exits; but
                            // on a non-exiting live resume the pending signal
                            // re-fires on every KVM_RUN → EINTR spin. sigtimedwait
                            // dequeues it synchronously (it's blocked at the thread
                            // level here, outside KVM_RUN).
                            //
                            // SAFETY: `set` is a zero-initialised `sigset_t` that
                            // `sigemptyset` fully initialises before any other use;
                            // `sigtimedwait` receives valid pointers to `set` and
                            // `ts`, both of which outlive the call, and a null
                            // `info` pointer, which it accepts (no siginfo is
                            // stored). The zero timeout makes each call a
                            // non-blocking poll; the loop ends once no SIGUSR1 is
                            // pending and the call returns a negative value.
                            unsafe {
                                let mut set: libc::sigset_t = std::mem::zeroed();
                                libc::sigemptyset(&mut set);
                                libc::sigaddset(&mut set, libc::SIGUSR1);
                                let ts = libc::timespec {
                                    tv_sec: 0,
                                    tv_nsec: 0,
                                };
                                while libc::sigtimedwait(&set, std::ptr::null_mut(), &ts) >= 0 {}
                            }
                            // In-place reset: if the host bumped reset_seq while
                            // we were parked, it reset guest RAM + intc + devices
                            // to the snapshot baseline; re-apply THIS vCPU's
                            // baseline registers on our own owning thread (via the
                            // fd we already hold) so we resume from the snapshot
                            // point, not our dirty pre-reset state.
                            let rs = reset_seq.load(Ordering::SeqCst);
                            if rs != last_reset_applied {
                                if let Some(b) = &baseline {
                                    // NOTE(review): a restore failure is ignored
                                    // here and the generation is still recorded
                                    // as applied — confirm that is intended.
                                    let _ = vcpu.restore_snapshot_locked(&fd, b);
                                }
                                last_reset_applied = rs;
                            }
                            // Clear the re-entry gate that force_exit set, or the
                            // resumed vCPU's next KVM_RUN would EINTR-at-entry
                            // forever. Paired with clear_exit() (the flag side).
                            fd.set_kvm_immediate_exit(0);
                            vcpu.clear_exit();
                            // A concurrent teardown `stop()`/`Drop` may have
                            // force-exited us DURING this resume tail: its SIGUSR1
                            // got swallowed by the sigtimedwait drain above and its
                            // `immediate_exit`/`exit` gates wiped by the two lines
                            // we just ran — so without this check we'd re-enter the
                            // guest and run forever, hanging the join in stop(). The
                            // teardown sets `stop` BEFORE force_exit and never clears
                            // it (unlike the registry `exit` flag), so re-reading it
                            // here — after clearing the pause gates — closes that
                            // race: honor the stop instead of resuming. (A stop that
                            // lands AFTER this read instead re-arms `immediate_exit`,
                            // caught at the next KVM_RUN entry.)
                            if stop.load(Ordering::SeqCst) || vcpu.should_exit() {
                                break ExitReason::Canceled;
                            }
                            // Symmetric guard for a NEW pause/reset requested DURING
                            // this resume tail (e.g. back-to-back reset_to_snapshot
                            // with no guest work between): that reset's force_exit
                            // SIGUSR1 was swallowed by the sigtimedwait drain above
                            // and its immediate_exit gate wiped by
                            // set_kvm_immediate_exit(0) — so we'd re-enter the guest
                            // and never park, wedging the new reset's
                            // wait_all_parked forever (the no-exec rapid-reset
                            // hang). request_pause() sets the paused flag BEFORE
                            // force_exit, so re-reading it here catches that case:
                            // re-arm immediate_exit so the imminent KVM_RUN returns
                            // EINTR at entry and re-enters this pause handler to
                            // park. (A pause arriving AFTER this read sets
                            // immediate_exit itself — caught at the next KVM_RUN
                            // entry — so either ordering parks cleanly.)
                            if pc.is_paused() {
                                fd.set_kvm_immediate_exit(1);
                            }
                            continue;
                        }
                    }
                    // Not a pause: an EINTR with the exit flag set is a cancel;
                    // anything else is a spurious signal, so just re-enter.
                    if vcpu.should_exit() {
                        break ExitReason::Canceled;
                    }
                    continue;
                }
                Err(e) if e.errno() == libc::EAGAIN => {
                    // A secondary CPU not yet brought up: KVM_RUN returns EAGAIN
                    // while the vCPU is in wait-for-SIPI (it does not block).
                    // Back off and retry until the kernel sends INIT-SIPI-SIPI
                    // (or we're told to stop).
                    if stop.load(Ordering::SeqCst) || vcpu.should_exit() {
                        break ExitReason::Canceled;
                    }
                    std::thread::sleep(std::time::Duration::from_millis(1));
                    continue;
                }
                // Any other KVM_RUN error is terminal for this vCPU.
                Err(e) => break ExitReason::Unknown(format!("{e}")),
            };
            // Benchmark-only exit counting (KVM_COUNT_EXITS). Low 32 bits =
            // virtio QueueNotify exits (what ioeventfd removes); high 32 bits =
            // all other device exits (serial etc.), for context.
            if count_exits {
                match exit {
                    KvmExit::MmioWrite(addr, _) if addr == VIRTIO_QUEUE_NOTIFY => {
                        exits.fetch_add(1, Ordering::Relaxed);
                    }
                    KvmExit::IoOut(..)
                    | KvmExit::IoIn(..)
                    | KvmExit::MmioWrite(..)
                    | KvmExit::MmioRead(..) => {
                        exits.fetch_add(1 << 32, Ordering::Relaxed);
                    }
                    _ => {}
                }
            }
            // Lifecycle (EINTR/EAGAIN/pause) handled above; the resolved exit's
            // device + terminal dispatch is one self-contained unit.
            if let std::ops::ControlFlow::Break(reason) =
                dispatch_kvm_exit(exit, &vm, &bus, &com1, &stop, &vcpu)
            {
                break reason;
            }
        }
    };

    // If this Canceled was a snapshot quiesce, capture this vCPU's state at the
    // (clean) instruction boundary the force-exit paused it on. A capture error
    // is dropped and reported to the caller as `None`.
    let snap = if matches!(reason, ExitReason::Canceled) && snapshot_req.load(Ordering::SeqCst) {
        match vcpu.capture_snapshot() {
            Ok(s) => Some(s),
            Err(_) => None,
        }
    } else {
        None
    };

    // This vCPU stopped for a real reason — wind the others down.
    if !matches!(reason, ExitReason::Canceled) {
        stop.store(true, Ordering::SeqCst);
        KvmVcpuHandle::force_exit(&handles);
    }
    (reason, snap)
}

impl Drop for LinuxVm {
    /// Stop every host-side thread that can touch guest RAM, then unmap it.
    ///
    /// Order matters: bridge acceptors, the device thread, the volume drain
    /// threads and the vsock muxer thread are all stopped before the final
    /// `munmap`, because each of them can write into the mapping.
    fn drop(&mut self) {
        // Bridge acceptor threads (exec bridge + TSI mux) go FIRST. They loop on
        // `listener.incoming()` and write into guest RAM via the muxer for each
        // accepted connection. Shutting them down here prevents a post-munmap
        // use-after-free and also reclaims the thread + bound socket fd that
        // would otherwise leak per VM (pool churn). `Vsock::shutdown` further
        // down is the race-free backstop; this is the clean stop that lets the
        // threads actually exit instead of blocking forever.
        let mut bridge_slot = match self.bridges.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let pending_bridges = std::mem::take(&mut *bridge_slot);
        // Release the lock before the (potentially blocking) shutdown calls.
        drop(bridge_slot);
        for bridge in pending_bridges {
            bridge.shutdown();
        }

        // The device thread DMAs into guest RAM, so it must be gone before the
        // unmap: raise the stop flag, wake it off the ioeventfd, then join.
        if let Some(worker) = self.dev_thread.take() {
            self.dev_stop.store(true, Ordering::SeqCst);
            if let Some(wake_fd) = self.dev_wake.as_ref() {
                let _ = wake_fd.write(1);
            }
            let _ = worker.join();
        }

        // Volume drain threads share `dev_stop` and also DMA into guest RAM;
        // wake and join them as well before the RAM is unmapped/captured.
        if !self.volume_threads.is_empty() {
            self.dev_stop.store(true, Ordering::SeqCst);
            for wake_fd in &self.volume_wakes {
                let _ = wake_fd.write(1);
            }
            self.volume_threads.drain(..).for_each(|worker| {
                let _ = worker.join();
            });
        }

        // The vsock muxer's I/O thread drains inbound packets into the guest's
        // RX descriptors (kick → try_drain_rx). Left running and detached, it
        // would write freed memory after the munmap below — an intermittent
        // use-after-free seen as a SIGSEGV at process exit when many VMs tear
        // down back-to-back. Stop + join it first.
        if let Some(muxer_owner) = self.vsock.as_ref() {
            muxer_owner.shutdown();
        }

        // Finally release the guest RAM backing.
        // NOTE(review): Rust drops this struct's fields only AFTER this body
        // returns, so any KVM handles it owns (the VM and its memory slots,
        // the vCPUs) are closed after this munmap, not before — confirm that
        // ordering is intended.
        let ram_base = self.host as *mut libc::c_void;
        // SAFETY: every thread stopped above has been joined, so none of them
        // can still access the mapping. Assumes `host`/`mem_size` describe the
        // mapping this VM created and that it is unmapped nowhere else — TODO
        // confirm against the constructor.
        unsafe {
            libc::munmap(ram_base, self.mem_size);
        }
    }
}

#[cfg(test)]
mod snapshot_listener_tests {
    use crate::devices::virtio::vsock::muxer::TsiListenerSnapshot;

    /// Encode `record` with the type's own per-record codec and decode it
    /// straight back. This is the canonical codec ContainerMeta uses to carry
    /// TSI routes in the unified snapshot container.
    fn roundtrip(record: &TsiListenerSnapshot) -> TsiListenerSnapshot {
        let mut encoded = Vec::new();
        record.write_to(&mut encoded).expect("write");
        let mut reader = std::io::Cursor::new(encoded);
        TsiListenerSnapshot::read_from(&mut reader).expect("read")
    }

    /// Every field of a TSI listener record, including the `Some`/`None`
    /// `inet_port` discriminant, must come back unchanged from serialization.
    /// The unified snapshot container relies on this codec to carry host
    /// port-forward routes across a warm restore, so that a service which was
    /// already `listen()`ing stays reachable.
    #[test]
    fn tsi_listener_record_roundtrips() {
        // Record pinned to an inet port.
        let pinned = TsiListenerSnapshot {
            cid: 3,
            peer_port: 4242906079,
            vm_port: 4242906079,
            family: 2,
            socktype: 1,
            inet_port: Some(80),
        };
        let decoded = roundtrip(&pinned);
        assert_eq!(decoded.cid, pinned.cid);
        assert_eq!(decoded.peer_port, pinned.peer_port);
        assert_eq!(decoded.vm_port, pinned.vm_port);
        assert_eq!(decoded.family, pinned.family);
        assert_eq!(decoded.socktype, pinned.socktype);
        assert_eq!(decoded.inet_port, Some(80));

        // Record without an inet pin: the `None` discriminant must survive too.
        let unpinned = TsiListenerSnapshot {
            cid: 3,
            peer_port: 7,
            vm_port: 99,
            family: 10,
            socktype: 1,
            inet_port: None,
        };
        let decoded = roundtrip(&unpinned);
        assert_eq!(decoded.inet_port, None);
    }
}

#[cfg(test)]
mod irq_budget_tests {
    use super::{
        balloon_irq, virtio_irq_budget_ok, FS_IRQ_BASE, IOAPIC_GSI_CEILING, VOLUME_IRQ_BASE,
    };

    /// Everyday device sets — a handful of volumes and fs mounts, with or
    /// without the balloon — all fit inside the IOAPIC GSI budget.
    #[test]
    fn typical_device_sets_fit() {
        // (volumes, fs mounts, balloon)
        let accepted = [
            (0, 0, false),
            (0, 1, false),
            (3, 2, false),
            (0, 0, true), // balloon at FS_IRQ_BASE
            (2, 3, true),
        ];
        for (volumes, fs_mounts, balloon) in accepted {
            assert!(virtio_irq_budget_ok(volumes, fs_mounts, balloon).is_ok());
        }
    }

    /// One volume beyond the volume IRQ window would alias a volume IRQ onto
    /// the virtio-fs range, so it is rejected (the gap the old inline
    /// `VOLUME_IRQ_BASE + i` arithmetic silently shipped).
    #[test]
    fn too_many_volumes_rejected() {
        let volume_slots = (FS_IRQ_BASE - VOLUME_IRQ_BASE) as usize;

        // Exactly filling the window is fine...
        assert!(virtio_irq_budget_ok(volume_slots, 0, false).is_ok());

        // ...one more is not.
        let err = virtio_irq_budget_ok(volume_slots + 1, 0, false).unwrap_err();
        assert!(err.contains("too many data volumes"), "got: {err}");
    }

    /// Enough fs mounts (+ balloon) to push the highest GSI to or past the
    /// IOAPIC ceiling are rejected, rather than silently handing a device an
    /// undelivered line.
    #[test]
    fn ioapic_overflow_rejected() {
        // Largest fs count whose balloon IRQ still fits: the balloon sits at
        // FS_IRQ_BASE + n, which must stay below the ceiling.
        let headroom = IOAPIC_GSI_CEILING - 1 - FS_IRQ_BASE;
        let max_fs_with_balloon = headroom as usize;

        assert!(virtio_irq_budget_ok(0, max_fs_with_balloon, true).is_ok());
        assert_eq!(
            balloon_irq(max_fs_with_balloon),
            IOAPIC_GSI_CEILING - 1,
            "boundary balloon IRQ is the top usable GSI"
        );

        let err = virtio_irq_budget_ok(0, max_fs_with_balloon + 1, true).unwrap_err();
        assert!(err.contains("IRQ budget exhausted"), "got: {err}");
    }
}