ktstr 0.15.0

Test harness for Linux process schedulers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
//! Device-side virtio-block: MMIO dispatch, FSM, request state, and
//! `Drop`. The handler bodies live in `handlers.rs`, the request-queue
//! drain bracket lives in `drain.rs`, and the per-device counter type
//! lives in `counters.rs` — see the parent module's submodule-layout
//! doc for the full split rationale.
//!
//! See the parent module `super` for the full execution-model and
//! "why" doc — the module-level rationale for why
//! `add_used` is gated on status-write success, why throttle stalls
//! roll back the chain and arm a timerfd, and the backing-speed
//! caveat — lives there.

pub(crate) use std::fs::File;
// `AsRawFd` for the eventfd raw-fd plumbing (stop_fd / kick_fd
// `as_raw_fd` in the worker-engine drop and respawn paths), which is
// `cfg(not(test))` (the test build uses the inline engine and has no
// worker fds). The backing's vectored IO no longer needs it — that
// routes through the `Backing` trait's `preadv`/`pwritev` (the
// impl-for-File holds the `as_raw_fd` call) rather than a raw fd at
// the call site.
#[cfg(not(test))]
pub(crate) use std::os::unix::io::AsRawFd;
pub(crate) use std::sync::Arc;
pub(crate) use std::sync::OnceLock;
pub(crate) use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
// `std::thread` is used only by the `#[cfg(not(test))]` SpawnedEngine handle;
// the worker spawn/join + mpsc moved with control.rs/lifecycle.rs.
#[cfg(not(test))]
use std::thread;
pub(crate) use std::time::Duration;

pub(crate) use virtio_bindings::virtio_blk::{
    VIRTIO_BLK_F_BLK_SIZE, VIRTIO_BLK_F_FLUSH, VIRTIO_BLK_F_RO, VIRTIO_BLK_F_SEG_MAX,
    VIRTIO_BLK_F_SIZE_MAX, VIRTIO_BLK_ID_BYTES, VIRTIO_BLK_S_IOERR, VIRTIO_BLK_S_OK,
    VIRTIO_BLK_S_UNSUPP, VIRTIO_BLK_T_FLUSH, VIRTIO_BLK_T_GET_ID, VIRTIO_BLK_T_IN,
    VIRTIO_BLK_T_OUT,
};
pub(crate) use virtio_bindings::virtio_config::{
    VIRTIO_CONFIG_S_ACKNOWLEDGE, VIRTIO_CONFIG_S_DRIVER, VIRTIO_CONFIG_S_DRIVER_OK,
    VIRTIO_CONFIG_S_FAILED, VIRTIO_CONFIG_S_FEATURES_OK, VIRTIO_CONFIG_S_NEEDS_RESET,
    VIRTIO_F_VERSION_1,
};
pub(crate) use virtio_bindings::virtio_ids::VIRTIO_ID_BLOCK;
// `VIRTIO_MMIO_INT_CONFIG` and `VIRTIO_MMIO_INT_VRING` are consumed by
// `drain.rs` directly (its own per-name imports) and by `cfg(test)`
// test sub-files via the `super::*;` glob — neither path is visible
// to `clippy --lib`, so the re-export looks unused without the allow.
#[allow(unused_imports)]
pub(crate) use virtio_bindings::virtio_mmio::{
    VIRTIO_MMIO_CONFIG_GENERATION, VIRTIO_MMIO_DEVICE_FEATURES, VIRTIO_MMIO_DEVICE_FEATURES_SEL,
    VIRTIO_MMIO_DEVICE_ID, VIRTIO_MMIO_DRIVER_FEATURES, VIRTIO_MMIO_DRIVER_FEATURES_SEL,
    VIRTIO_MMIO_INT_CONFIG, VIRTIO_MMIO_INT_VRING, VIRTIO_MMIO_INTERRUPT_ACK,
    VIRTIO_MMIO_INTERRUPT_STATUS, VIRTIO_MMIO_MAGIC_VALUE, VIRTIO_MMIO_QUEUE_AVAIL_HIGH,
    VIRTIO_MMIO_QUEUE_AVAIL_LOW, VIRTIO_MMIO_QUEUE_DESC_HIGH, VIRTIO_MMIO_QUEUE_DESC_LOW,
    VIRTIO_MMIO_QUEUE_NOTIFY, VIRTIO_MMIO_QUEUE_NUM, VIRTIO_MMIO_QUEUE_NUM_MAX,
    VIRTIO_MMIO_QUEUE_READY, VIRTIO_MMIO_QUEUE_SEL, VIRTIO_MMIO_QUEUE_USED_HIGH,
    VIRTIO_MMIO_QUEUE_USED_LOW, VIRTIO_MMIO_STATUS, VIRTIO_MMIO_VENDOR_ID, VIRTIO_MMIO_VERSION,
};
pub(crate) use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
// `VirtioQueueError` is matched on in `drain.rs` and in `cfg(test)`
// test sub-files via `super::*;`; clippy --lib doesn't see those.
#[allow(unused_imports)]
pub(crate) use virtio_queue::Error as VirtioQueueError;
#[cfg(test)]
use virtio_queue::Queue;
// `QueueOwnedT::iter` is invoked by `drain.rs` and the test sub-files
// via `super::*;`; clippy --lib doesn't see those.
#[allow(unused_imports)]
pub(crate) use virtio_queue::QueueOwnedT;
#[cfg(not(test))]
use virtio_queue::QueueSync;
pub(crate) use virtio_queue::QueueT;
pub(crate) use vm_memory::{ByteValued, Bytes, GuestAddress, GuestMemory, GuestMemoryMmap};

// `VirtioBlkCounters` lives in `counters.rs`; it is imported by name
// from `super` (re-exported by `mod.rs`'s `pub(crate) use counters::*;`).
use super::VirtioBlkCounters;
use super::{Backing, advance_iovecs};
// `EpollEvent` / `EventSet` are re-exported because tests for the
// always-compiled `worker_dispatch_event` helper construct EventSet
// values directly via `super::*`, and the helper itself accepts an
// EventSet argument. clippy --lib doesn't see the test consumers,
// so the re-export looks unused without the allow.
#[allow(unused_imports)]
pub(crate) use vmm_sys_util::epoll::{EpollEvent, EventSet};
pub(crate) use vmm_sys_util::eventfd::EventFd;

pub(crate) use super::super::disk_config::DiskThrottle;

/// virtio-MMIO magic value: the ASCII bytes `virt` read as a
/// little-endian `u32`.
/// NOTE(review): presumably served for `VIRTIO_MMIO_MAGIC_VALUE` reads —
/// confirm against the MMIO read handler.
pub(crate) const MMIO_MAGIC: u32 = 0x7472_6976; // "virt" in LE
/// virtio-MMIO transport version; `2` is the virtio 1.x (non-legacy)
/// value.
/// NOTE(review): presumably served for `VIRTIO_MMIO_VERSION` reads —
/// confirm against the MMIO read handler.
pub(crate) const MMIO_VERSION: u32 = 2; // virtio 1.x MMIO
/// Vendor ID for this device; zero.
/// NOTE(review): presumably served for `VIRTIO_MMIO_VENDOR_ID` reads —
/// confirm against the MMIO read handler.
pub(crate) const VENDOR_ID: u32 = 0;

/// MMIO region size: 4 KiB (one page).
pub const VIRTIO_MMIO_SIZE: u64 = 0x1000;

/// Single request queue. virtio-spec §5.2.2 declares one request
/// queue plus optional multiqueue (`VIRTIO_BLK_F_MQ`); MQ deferred.
pub(crate) const NUM_QUEUES: usize = 1;
/// Upper bound on the request queue's size.
/// NOTE(review): presumably the value reported through
/// `VIRTIO_MMIO_QUEUE_NUM_MAX` and the cap applied to driver-chosen
/// sizes — confirm against the MMIO handlers.
pub(crate) const QUEUE_MAX_SIZE: u16 = 256;
/// Index of the request queue — the only queue, since `NUM_QUEUES`
/// is 1.
pub(crate) const REQ_QUEUE: usize = 0;

/// Queue type used for the request virtqueue. Production uses
/// `QueueSync` (`Arc<Mutex<Queue>>` internally) so the vCPU thread
/// (MMIO config writes — set_size/ready/desc/avail/used addresses)
/// and the dedicated worker thread (drain_bracket_impl — pop, add_used,
/// needs_notification) can share the same queue state safely. Tests
/// run drain_bracket_impl inline on the caller thread so the bare `Queue`
/// (single-threaded, no internal lock) is sufficient and avoids
/// changing the test surface that drives `Queue` methods directly
/// (`disable_notification`, `set_avail_ring_address`, etc.).
///
/// The `QueueT` trait is the single API both implementations honour;
/// every drain-side method this file calls (pop_descriptor_chain,
/// add_used, disable/enable_notification, needs_notification,
/// avail_ring, event_idx_enabled) is part of `QueueT`. Generic
/// helpers like `publish_completion` are bound by `QueueT` so they
/// compile against either alias without further indirection.
#[cfg(not(test))]
pub(crate) type BlkQueue = QueueSync;
#[cfg(test)]
pub(crate) type BlkQueue = Queue;

/// Logical block size advertised to the guest. 512 bytes matches
/// the virtio spec default.
pub const VIRTIO_BLK_SECTOR_SIZE: u32 = 512;

/// Default capacity (256 MiB) used by virtio_blk tests. Mirrors the
/// 256-MiB default in `super::super::disk_config::DiskConfig::default`.
///
/// Sized for `mkfs.btrfs` minimum without `--mixed`: btrfs needs
/// ~109 MiB for single-profile metadata and ~256 MiB if it picks DUP
/// metadata (which is the default on a single-device fs). Sizing
/// below 256 MiB risks `mkfs.btrfs` failing at template-build time.
///
/// `dead_code` allow: only consumed by `#[cfg(test)]` modules
/// (every virtio_blk test fixture passes this as the device's
/// capacity); clippy --lib doesn't see those references.
#[allow(dead_code)]
pub const VIRTIO_BLK_DEFAULT_CAPACITY_BYTES: u64 = 256 * 1024 * 1024;

/// Maximum number of data segments per request the device supports.
/// virtio-v1.2 §5.2.4: `seg_max` is the max scatter-gather buffer
/// count, exclusive of the header and status descriptors. Without
/// `F_SEG_MAX` the guest defaults `max_segments` to 1, which forces
/// `bio_split` and serializes large requests; advertising 128 is the
/// firecracker default and ample for the small files this device
/// targets.
pub(crate) const VIRTIO_BLK_SEG_MAX: u32 = 128;

/// Maximum size in bytes of a single descriptor's data buffer.
/// virtio-v1.2 §5.2.4 (`size_max`): caps per-descriptor length so a
/// guest can't submit a single 4 GB descriptor and force the device
/// to allocate a matching `Vec<u8>` for `read_at`/`write_at`. 1 MiB
/// matches firecracker's default and is far above what the guest's
/// blk-mq layer typically generates (max_sectors_kb defaults to
/// 512 KiB). Without `F_SIZE_MAX` the guest treats per-descriptor
/// length as unbounded — host OOM hazard on a hostile guest.
pub(crate) const VIRTIO_BLK_SIZE_MAX: u32 = 1 << 20;

/// Linux's maximum iovec count (`UIO_MAXIOV`) for a single
/// `preadv`/`pwritev`: passing `iovcnt > IOV_MAX` returns `EINVAL`.
/// The vectored-IO paths build at most `SEG_MAX * 2 = 256` iovecs
/// (see the SAFETY comments at the preadv/pwritev sites for the
/// structural derivation), well under this cap; the `debug_assert`s
/// there trip in test/debug builds if a future change ever breaks
/// that bound.
const IOV_MAX: usize = 1024;

/// Cap on consecutive `EINTR` re-issues of a single blk vectored IO
/// (`preadv`/`pwritev`) before giving up with `S_IOERR`. `EINTR` on a
/// buffered regular-file path implies a PENDING FATAL signal (the
/// wait is `TASK_KILLABLE`, so only `SIGKILL`/group-exit interrupts),
/// which does NOT clear — an unbounded retry would spin until the task
/// is reaped, delaying the freeze-rendezvous failure dump. 16 is
/// generous for any (rare) catchable-signal case while bounding the
/// fatal-signal spin to a handful of iterations.
///
/// Divergence (we get this right): firecracker / vm-memory's
/// `retry_eintr!` loops on `EINTR` UNBOUNDED — on a pending fatal
/// signal that is a latent spin. We bound it: a spinning IO thread
/// delays the freeze-rendezvous dump (the project's vCPU-blocking
/// budget).
const MAX_EINTR_RETRIES: u32 = 16;

/// Device serial number returned by `VIRTIO_BLK_T_GET_ID`. Per
/// virtio-v1.2 §5.2.6.4 (and `virtio_blk.h` `VIRTIO_BLK_ID_BYTES`)
/// the kernel driver passes a 20-byte buffer (`virtblk_get_id` →
/// `blk_rq_map_kern(req, id_str, VIRTIO_BLK_ID_BYTES, GFP_KERNEL)`,
/// drivers/block/virtio_blk.c). The string is exposed at
/// `/sys/block/<dev>/serial` after `serial_show` reads it from the
/// device. The 16-byte payload `ktstr-virtio-blk` is null-padded to
/// 20 bytes; the trailing zeros let `serial_show`'s
/// `strlen(buf)` (after the kernel's `buf[VIRTIO_BLK_ID_BYTES] =
/// '\0'` sentinel) terminate at the first NUL.
pub(crate) const VIRTIO_BLK_SERIAL: [u8; VIRTIO_BLK_ID_BYTES as usize] =
    *b"ktstr-virtio-blk\0\0\0\0";

/// Request out-header. virtio-v1.2 §5.2.6: every request chain
/// starts with a device-readable, 16-byte header carrying the
/// request type, ioprio (ignored), and starting sector. The struct
/// matches virtio_bindings::virtio_blk::virtio_blk_outhdr field for
/// field — it is redeclared here so we can attach `ByteValued` (the
/// bindings struct does not implement it) and use `Bytes::read_obj`
/// directly. `repr(C)` + integer-only fields satisfy the
/// `ByteValued` invariants (§ vm-memory bytes.rs trait docs).
#[repr(C)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkOutHdr {
    /// `VIRTIO_BLK_T_*`. LE per virtio-v1.2 §5.2.6. Occupies bytes
    /// 0..4 of the header.
    ///
    /// NOTE(review): declared as a plain `u32`, so the in-memory value
    /// equals the little-endian wire value only on a little-endian
    /// host — confirm big-endian hosts are out of scope.
    pub(crate) type_: u32,
    /// I/O priority, ignored on this device. Occupies bytes 4..8;
    /// kept so `sector` lands at offset 8.
    pub(crate) _ioprio: u32,
    /// Starting sector (512-byte units). Occupies bytes 8..16.
    pub(crate) sector: u64,
}

// SAFETY: VirtioBlkOutHdr is `repr(C)`, contains only `u32` and `u64`
// (themselves `ByteValued`), has no padding (4+4+8 = 16, all aligned),
// and any byte pattern is a valid value (the type/ioprio fields are
// validated separately by the request dispatcher; sector is just a
// number). All `ByteValued` requirements are met.
unsafe impl vm_memory::ByteValued for VirtioBlkOutHdr {}

/// Header size for `VirtioBlkOutHdr`. virtio-v1.2 §5.2.6:
/// type:u32, ioprio:u32, sector:u64.
pub(crate) const VIRTIO_BLK_OUTHDR_SIZE: usize = std::mem::size_of::<VirtioBlkOutHdr>();

/// Legacy CHS geometry sub-struct of `VirtioBlkConfig`, gated on
/// `VIRTIO_BLK_F_GEOMETRY`. Mirrors the kernel uapi
/// `struct virtio_blk_geometry` (cylinders:u16, heads:u8, sectors:u8 —
/// 4 bytes total) at config-space offset 0x10. We don't advertise
/// `F_GEOMETRY` so the field is left zero; the guest driver reads it
/// via `virtio_cread_feature`, which returns `-ENOENT` when the
/// feature bit is not negotiated and the read is skipped.
#[repr(C, packed)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkGeometry {
    /// `cylinders` component of the CHS triple; bytes 0..2 of the
    /// sub-struct.
    pub(crate) cylinders: u16,
    /// `heads` component of the CHS triple; byte 2.
    pub(crate) heads: u8,
    /// `sectors` component of the CHS triple; byte 3.
    pub(crate) sectors: u8,
}

/// Block-device config space (virtio-v1.2 §5.2.4). Mirrors the kernel
/// uapi `struct virtio_blk_config` field-for-field up through
/// `blk_size` (the last field whose feature bit this device
/// advertises). Trailing fields (topology, MQ, discard, write-zeroes,
/// secure-erase, zoned) are gated on feature bits we don't advertise,
/// so the guest driver's `virtio_cread_feature` returns `-ENOENT` for
/// those reads and never depends on the device-side bytes — we serve
/// zeros for any read past `size_of::<VirtioBlkConfig>()`, matching
/// virtio-v1.2 §4.2.2.2 ("reads past the populated config layout
/// return zero").
///
/// The kernel struct is `__attribute__((packed))` (see
/// `include/uapi/linux/virtio_blk.h`), so this redeclaration uses
/// `repr(C, packed)` to match the wire layout byte-for-byte. Without
/// the `packed` attribute the compiler would insert padding after
/// `seg_max` to align `geometry` (which contains a `u16`) — that
/// padding would shift `blk_size` from offset 0x14 to 0x18 and serve
/// the guest a wrong block-size value silently.
#[repr(C, packed)]
#[derive(Copy, Clone, Default, Debug)]
pub(crate) struct VirtioBlkConfig {
    /// Capacity in 512-byte sectors. Always populated; the kernel
    /// driver reads this unconditionally (no feature bit gates it).
    /// Config-space offset 0x00.
    pub(crate) capacity: u64,
    /// Maximum per-descriptor data length, gated on
    /// `VIRTIO_BLK_F_SIZE_MAX`. Config-space offset 0x08.
    pub(crate) size_max: u32,
    /// Maximum scatter-gather segments per request, gated on
    /// `VIRTIO_BLK_F_SEG_MAX`. Config-space offset 0x0C.
    pub(crate) seg_max: u32,
    /// Legacy CHS geometry, gated on `VIRTIO_BLK_F_GEOMETRY`. We
    /// don't advertise that bit so this field is left zero.
    /// Config-space offset 0x10 (4 bytes).
    pub(crate) geometry: VirtioBlkGeometry,
    /// Logical block size, gated on `VIRTIO_BLK_F_BLK_SIZE`.
    /// Config-space offset 0x14; the last populated field.
    pub(crate) blk_size: u32,
}

// SAFETY: `VirtioBlkConfig` and `VirtioBlkGeometry` are
// `repr(C, packed)`. With `packed` the alignment is 1 and there is no
// inter-field padding by definition (every field is byte-aligned). All
// fields are integer types (`u8`, `u16`, `u32`, `u64`) for which every
// bit pattern is a valid value, so reading arbitrary bytes into the
// struct yields a well-defined value. The struct is `Copy`, `Send`,
// and `Sync` (all primitives), satisfying the `ByteValued` supertrait
// bounds. Total size is verified against the kernel uapi layout by
// the `VIRTIO_BLK_CONFIG_SIZE` const assertion below.
unsafe impl vm_memory::ByteValued for VirtioBlkConfig {}
// SAFETY: same justification as `VirtioBlkConfig`. `VirtioBlkGeometry`
// is `repr(C, packed)` with three integer fields (`u16`, `u8`, `u8`),
// no padding, all bit patterns valid, `Copy + Send + Sync`.
unsafe impl vm_memory::ByteValued for VirtioBlkGeometry {}

/// Size of the populated portion of block config space (24 bytes:
/// capacity 8 + size_max 4 + seg_max 4 + geometry 4 + blk_size 4).
/// Reads at config-space offsets `>= VIRTIO_BLK_CONFIG_SIZE` return
/// zero per virtio-v1.2 §4.2.2.2.
pub(crate) const VIRTIO_BLK_CONFIG_SIZE: usize = std::mem::size_of::<VirtioBlkConfig>();
// Compile-time check that the struct layout matches the kernel uapi
// byte budget (8+4+4+4+4 = 24). A mismatch here means either Rust's
// `repr(C, packed)` introduced a divergence from the kernel's
// `__attribute__((packed))` layout, or a field was added/removed —
// in either case the guest would read garbage from a misaligned
// field. Failing to compile is preferable to silently serving wrong
// bytes.
const _: () = assert!(VIRTIO_BLK_CONFIG_SIZE == 24);
// Field-offset checks: the kernel driver reads each field at a
// specific offset via `virtio_cread`. If `repr(C, packed)` ever
// drifts from the kernel's `__attribute__((packed))` layout, these
// asserts catch it at compile time before a wrong-offset bug ships
// to the guest.
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, capacity) == 0x00);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, size_max) == 0x08);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, seg_max) == 0x0C);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, geometry) == 0x10);
const _: () = assert!(std::mem::offset_of!(VirtioBlkConfig, blk_size) == 0x14);

/// One descriptor from a virtio request chain. Used uniformly for
/// every chain role — header, data segments, and status — so the
/// chain-walk code can collect all descriptors into one buffer and
/// then index by position (first = header, middle = data, last =
/// status).
///
/// `is_write_only` mirrors `desc.is_write_only()` from
/// `virtio_queue` (i.e. VRING_DESC_F_WRITE set in the descriptor):
/// device-writable for read data segments and the status byte;
/// device-readable for the request header and write data segments.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ChainDescriptor {
    /// Guest address of the descriptor's buffer.
    /// NOTE(review): presumably copied from `desc.addr()` during the
    /// chain walk — confirm against the drain code.
    pub(crate) addr: GuestAddress,
    /// Length of the descriptor's buffer.
    /// NOTE(review): presumably bytes, copied from `desc.len()` —
    /// confirm against the drain code.
    pub(crate) len: u32,
    /// `true` when the descriptor is device-writable
    /// (`VRING_DESC_F_WRITE` set): read data segments and the status
    /// byte. `false` for the request header and write data segments.
    pub(crate) is_write_only: bool,
}

/// Status bits required before each phase (mirrors virtio_console).
/// The masks are cumulative: each one ORs a further
/// `VIRTIO_CONFIG_S_*` bit into the previous mask.
///
/// `S_ACK` is `ACKNOWLEDGE` alone.
pub(crate) const S_ACK: u32 = VIRTIO_CONFIG_S_ACKNOWLEDGE;
/// `ACKNOWLEDGE | DRIVER`.
pub(crate) const S_DRV: u32 = S_ACK | VIRTIO_CONFIG_S_DRIVER;
/// `ACKNOWLEDGE | DRIVER | FEATURES_OK`.
pub(crate) const S_FEAT: u32 = S_DRV | VIRTIO_CONFIG_S_FEATURES_OK;
/// Test helper — terminal state bits with DRIVER_OK set
/// (`ACKNOWLEDGE | DRIVER | FEATURES_OK | DRIVER_OK`).
#[cfg(test)]
pub(crate) const S_OK: u32 = S_FEAT | VIRTIO_CONFIG_S_DRIVER_OK;

// Token-bucket throttle primitives live in `throttle`. Pulled out as
// a submodule so the throttle's adversarial test surface (which is
// the most-exercised piece of the device) sits next to its tests
// rather than scattered through the device + worker code. See
// `throttle.rs` for the full type-level rationale.
use super::throttle::*;

/// Publish a chain completion: write the status byte and, on
/// success, mark the chain used. Returns `true` if the device
/// should signal the guest (used-ring index advanced); `false`
/// otherwise.
///
/// Status-write-success gate: `add_used` is called ONLY after a
/// successful status-byte write. Publishing a completion the guest
/// can't observe (status write failed but used-ring advanced) would
/// let the guest's `virtblk_done` read its `vbr->in_hdr.status`
/// byte that's stale from prior blk-mq tag use (initially zero from
/// `__GFP_ZERO` at allocation, stale on reuse) as `BLK_STS_OK`
/// (drivers/block/virtio_blk.c `virtblk_vbr_status` +
/// `virtblk_result(0)`) — silent data corruption for reads, silent
/// dropped writes for writes. On status-write failure the chain
/// stays in the avail ring; virtio_blk has no `mq_ops->timeout`
/// (drivers/block/virtio_blk.c `virtio_mq_ops`), so the guest hangs
/// on this request until `kernel.hung_task_timeout_secs` (default
/// 120 s) fires or a higher layer retries.
///
/// `used_len` is what `add_used` records as the "bytes written by
/// the device into guest memory". Error paths pass `1` (just the
/// status byte). The success path passes the data-bytes-written
/// total + 1 (for reads) or `1` (for writes/flushes — the device
/// wrote no data back).
///
/// `label` is included in any tracing::warn from this function so
/// operators can identify which gate triggered the publish.
///
/// `too_many_arguments` allow: every parameter is independent
/// per-request state (queue/memory binding, head index, status
/// address+byte, used-len, label) sourced from a different point
/// in the chain-handling pipeline. Bundling would build a struct
/// for one call seam.
#[allow(clippy::too_many_arguments)]
pub(crate) fn publish_completion<Q: QueueT>(
    mem: &GuestMemoryMmap,
    q: &mut Q,
    counters: &VirtioBlkCounters,
    head: u16,
    status_addr: GuestAddress,
    status_byte: u8,
    used_len: u32,
    label: &'static str,
) -> bool {
    // Step 1 — make the status byte visible to the guest. The used
    // ring must not advance unless this write lands.
    let status_written = mem.write_slice(&[status_byte], status_addr).is_ok();
    if !status_written {
        // The request is left unpublished. virtio_blk has no
        // `mq_ops->timeout` callback, so blk-mq never surfaces it as
        // an error on its own: the guest stalls until
        // `kernel.hung_task_timeout_secs` (default 120 s) fires or a
        // higher layer retries.
        //
        // The io_errors bump is the host operator's only signal for
        // that stall before the watchdog message reaches dmesg. It
        // matters most on the success path (FLUSH or
        // T_IN/T_OUT/T_GET_ID succeeded but the status descriptor
        // itself is unmapped), where nothing else has counted the
        // request.
        //
        // Error-site callers have already bumped io_errors before
        // calling in, so a failure here is counted twice for them.
        // That double-count is intentional: a hostile guest whose
        // chains carry a systematically unmapped status_addr inflates
        // the counter, which is the lesser evil compared with a
        // silent stall on the success path.
        counters.record_io_error();
        return false;
    }

    // Step 2 — publish the chain on the used ring. Only a successful
    // publish tells the caller to signal the guest.
    if let Err(e) = q.add_used(mem, head, used_len) {
        tracing::warn!(head, %e, label, "virtio-blk add_used failed");
        counters.record_io_error();
        return false;
    }
    true
}

// `VirtioBlkCounters` and its `record_*` mutators / `pub fn` readers
// live in `counters.rs`; reach them via the `super::*;` glob (which
// sources from `mod.rs`'s `pub(crate) use counters::*;`). Pulled out
// for module locality so the per-helper invariants and the
// failure-dump-renderer-relevant counter taxonomy doc sit together.

// ----------------------------------------------------------------------------
// Device struct
// ----------------------------------------------------------------------------

/// Worker-thread-owned mutable state. In production this lives on
/// the dedicated worker thread for the device's lifetime; in test
/// builds it lives inside `BlkWorker`'s `WorkerEngine::Inline`
/// variant so the existing test surface — which calls
/// `process_requests` synchronously and immediately reads back
/// state via `dev.worker.state_mut().ops_bucket` etc. — keeps
/// working.
///
/// The MMIO-side state (interrupt_status, irq_evt, mem, FSM bits)
/// stays on `VirtioBlk` and is shared with the worker via Arc.
///
/// Field order is significant: Rust drops struct fields in
/// declaration order, so do not reorder fields without checking
/// whether any teardown depends on it.
pub(crate) struct BlkWorkerState {
    /// Backing store, held as a boxed `Backing` trait object. The
    /// worker reads and writes sectors via `pread`/`pwrite` and
    /// never inspects the on-disk contents.
    pub(crate) backing: Box<dyn Backing>,
    /// Token-bucket for ops/sec.
    pub(crate) ops_bucket: TokenBucket,
    /// Token-bucket for bytes/sec.
    pub(crate) bytes_bucket: TokenBucket,
    /// Reusable scratch for the descriptor-walk in `drain_bracket_impl`.
    /// Allocated once at construction with capacity
    /// `VIRTIO_BLK_SEG_MAX + 2` and `clear()`-ed each iteration so
    /// the underlying capacity is reused. Avoids one Vec
    /// allocation per request on the hot path. The push loop in
    /// `drain_bracket_impl` is hard-capped at this capacity — a
    /// hostile guest submitting a chain longer than `seg_max + 2`
    /// gets dropped before status-extraction (an oversized chain
    /// cannot be reliably IOERR-completed because the capped view
    /// loses sight of the chain's true status descriptor). The
    /// data-segment slice given to the handlers is borrowed
    /// directly from `&state.all_descs_scratch[1..chain_len - 1]`
    /// once `status_addr` has been validated — no second Vec, no
    /// copy.
    pub(crate) all_descs_scratch: Vec<ChainDescriptor>,
    /// Reusable per-segment IO buffer. Sized by `resize(len, 0)`
    /// per segment in the read/write handlers. Allocated once and
    /// reused across all segments of all requests; the underlying
    /// `Vec`'s capacity grows monotonically up to
    /// `VIRTIO_BLK_SIZE_MAX` (the per-descriptor cap we advertise),
    /// after which subsequent IO performs no further allocation.
    pub(crate) io_buf_scratch: Vec<u8>,
    /// Capacity in bytes. Computed once at construction
    /// (`capacity_sectors * VIRTIO_BLK_SECTOR_SIZE`) and threaded
    /// into handlers so the multiply isn't repeated per request and
    /// can never overflow on a malicious sector value (the multiply
    /// happens once on host-trusted input).
    pub(crate) capacity_bytes: u64,
    /// Read-only mode. When `true`, the device advertises
    /// `VIRTIO_BLK_F_RO`. `VIRTIO_BLK_T_OUT` requests are rejected
    /// with `VIRTIO_BLK_S_IOERR`; `VIRTIO_BLK_T_FLUSH` requests
    /// return `VIRTIO_BLK_S_OK` (a no-op flush — there's no dirty
    /// data to flush, and a guest issuing a precautionary flush
    /// during mount-readonly should not see an error). Per
    /// virtio-v1.2 §5.2.5.1, when `F_RO` is negotiated the device
    /// is read-only and the guest driver SHOULD treat the device
    /// as read-only; how the driver chooses to do that is
    /// driver business — on Linux `open(2)` is not gated and
    /// writes fail with `EPERM` (no `open`-time error).
    ///
    /// The in-device rejection is defense against a malicious or
    /// buggy guest that ignores the negotiated feature bit.
    pub(crate) read_only: bool,
    /// Counters. `Arc` so external monitor observers can read them
    /// without holding any device borrow; the worker mutates via
    /// the same `Arc`.
    pub(crate) counters: Arc<VirtioBlkCounters>,
    /// Per-worker "is the head-of-queue chain currently stalled?"
    /// flag. Owned by `drain_bracket_impl`; the flag transitions
    /// gate the live gauge updates on the shared `counters` Arc:
    ///
    /// - `false → true` (transition into stall): bump
    ///   `currently_throttled_gauge` via
    ///   `counters.record_throttle_pending_inc()`.
    /// - `true → false` (transition out of stall): decrement via
    ///   `counters.record_throttle_pending_dec()`.
    /// - `true → true` (idempotent re-stall on the same head):
    ///   no gauge update; only `throttled_count` (events)
    ///   advances.
    /// - `false → false` (normal completion without prior stall):
    ///   no gauge update.
    ///
    /// Lives on `BlkWorkerState` (not on the shared counters Arc)
    /// because the transition logic is per-worker — only the
    /// thread that owns the drain knows which transition just
    /// happened. Reading the AtomicU64 gauge alone could not
    /// distinguish "first stall" from "re-stall on the same head"
    /// without per-worker state. Cfg-independent so both Inline
    /// and Spawned engines maintain the same invariant.
    pub(crate) currently_stalled: bool,
    /// Sticky "the queue is structurally broken; stop draining"
    /// flag. Set when the avail-ring iterator returns
    /// `Error::InvalidAvailRingIndex` — the avail.idx the guest
    /// published is more than `queue.size` ahead of the device's
    /// `next_avail`, which the virtio spec forbids
    /// (virtio-v1.2 §2.7.13.3: avail.idx advances monotonically
    /// at most `queue.size` ahead of the device-side cursor; an
    /// excursion beyond that distance is the structural-invariant
    /// violation `iter()` reports as `InvalidAvailRingIndex`).
    /// Without this flag, every subsequent `pop_descriptor_chain`
    /// would re-trip the same error and `enable_notification`
    /// would re-arm immediately, looping the worker forever
    /// against a hostile guest at full vCPU/host CPU cost.
    ///
    /// Once set, `drain_bracket_impl` short-circuits to `Done`
    /// without touching the queue at all — no
    /// `disable_notification`, no `iter`, no `enable_notification`.
    /// The flag clears only on a full virtio reset
    /// (`reset_engine_inline` / `respawn_worker` rebuilds the
    /// state with `queue_poisoned: false`), matching the device's
    /// `VIRTIO_CONFIG_S_NEEDS_RESET` (virtio-v1.2 §2.1.1 bit 0x40)
    /// behaviour: the device tells the guest "I need a reset before
    /// I can service IO" and the only escape is a STATUS=0 MMIO
    /// write. This converges with cloud-hypervisor's NEEDS_RESET
    /// path on hostile-guest queue corruption (NOT the FAILED status
    /// = 0x80, which is the orthogonal "driver gives up" exit per
    /// virtio-v1.2 §2.1.1 bit 0x80 — the framework is signalling
    /// "device needs reset", not "driver gave up").
    ///
    /// Per-worker (not on the shared counters Arc) because only
    /// the drain thread mutates it. Cfg-independent so both
    /// Inline and Spawned engines maintain the same invariant.
    pub(crate) queue_poisoned: bool,
}

/// Wraps the request-processing engine. In Inline mode (cfg(test))
/// the state lives in-line and `process_requests` runs the drain
/// synchronously on the caller thread — preserving the existing
/// 113-test surface that calls `process_requests` then immediately
/// reads back queue + counter state without crossing a thread
/// boundary. In Spawned mode (production) a dedicated worker thread
/// owns the state and is woken by `kick_fd`; the MMIO QUEUE_NOTIFY
/// handler writes 1 to `kick_fd` and returns immediately so the
/// vCPU thread is never blocked by the IO syscall.
///
/// `read_only` and `counters` are duplicated outside the engine so
/// MMIO accessors (`device_features` reads `read_only`, `counters()`
/// returns the Arc) can reach them without coordinating with the
/// worker. They are immutable after construction in Spawned mode and
/// kept in sync with the Inline branch's `BlkWorkerState`.
///
/// The shared resources the worker needs to drive the drain
/// (`Arc<BlkQueue>` queue, `Arc<EventFd>` irq_evt,
/// `Arc<AtomicU32>` interrupt_status, `Arc<OnceLock<GuestMemoryMmap>>` mem)
/// are stored on `VirtioBlk` and cloned into the worker thread at
/// spawn time; the worker holds independent Arc handles for the
/// duration of its run.
pub(crate) struct BlkWorker {
    /// Virtqueues for this device: one `BlkQueue` per queue index,
    /// `NUM_QUEUES` in total.
    //
    // NOTE(review): the type-level doc above describes the queue as
    // an `Arc<BlkQueue>` stored on `VirtioBlk`, while this field is a
    // plain array on `BlkWorker`. Presumably `BlkQueue` itself wraps
    // the shareable handle — confirm against its definition and
    // reconcile the wording.
    pub(crate) queues: [BlkQueue; NUM_QUEUES],
    /// `read_only` flag, mirrored on the device side for
    /// `device_features` and direct test inspection
    /// (`dev.worker.read_only`). Set once at construction and never
    /// mutated.
    pub(crate) read_only: bool,
    /// Counters Arc shared with the worker thread; mirrored on the
    /// device side for `counters()` and direct test inspection.
    pub(crate) counters: Arc<VirtioBlkCounters>,
    /// Engine-mode-specific state.
    pub(crate) engine: WorkerEngine,
}

/// Implementation strategy for the request-processing engine.
///
/// The two variants are gated on opposite `cfg`s (`test` vs.
/// `not(test)`), so any given build of this enum has exactly one
/// variant. Code that destructures it can therefore use an
/// irrefutable pattern, but must itself be gated on the matching
/// `cfg`.
pub(crate) enum WorkerEngine {
    /// Synchronous in-thread mode (cfg(test)). The drain runs on the
    /// caller thread when `process_requests` is called.
    #[cfg(test)]
    Inline(InlineEngine),
    /// Production mode: a dedicated worker thread owns the state
    /// and drives the drain on receipt of a kick eventfd write.
    #[cfg(not(test))]
    Spawned(SpawnedEngine),
}

/// Inline-mode engine state (cfg(test) only). Holds `BlkWorkerState`
/// directly so the existing test surface that reaches into
/// `dev.worker.<state field>` keeps compiling without renames.
#[cfg(test)]
pub(crate) struct InlineEngine {
    /// The worker state, held by value rather than handed to a
    /// worker thread.
    pub(crate) state: BlkWorkerState,
}

/// Test-only shortcuts to the inline `BlkWorkerState`.
///
/// Under `cfg(test)` the worker state is stored by value in the
/// `Inline` engine, so tests can borrow it straight off the device
/// (`dev.worker.state()` / `dev.worker.state_mut()`) instead of
/// unpacking the engine enum at every call site. Each `match` below
/// has a single arm and is still exhaustive: `Spawned` is compiled
/// out of test builds, leaving `Inline` as the only variant.
#[cfg(test)]
impl BlkWorker {
    /// Shared borrow of the inline worker state.
    pub(crate) fn state(&self) -> &BlkWorkerState {
        match &self.engine {
            WorkerEngine::Inline(inline) => &inline.state,
        }
    }

    /// Exclusive borrow of the inline worker state.
    pub(crate) fn state_mut(&mut self) -> &mut BlkWorkerState {
        match &mut self.engine {
            WorkerEngine::Inline(inline) => &mut inline.state,
        }
    }
}

/// Spawned-mode engine state (production only). The mutable
/// `BlkWorkerState` lives entirely on the worker thread; the device
/// retains only a kick eventfd, a stop eventfd, the join handle, and
/// (transiently, across a reset) the reclaimed state awaiting respawn.
/// The pause-eventfd write side lives on `VirtioBlk::pause_evt`
/// (cfg-independent) so `pause()` / `resume()` compile in `cfg(test)`
/// without an engine match — the worker's read clone is taken at
/// spawn time and consumed by `worker_thread_main`'s frame.
#[cfg(not(test))]
pub(crate) struct SpawnedEngine {
    /// Eventfd written by `mmio_write(QUEUE_NOTIFY, …)`; the worker
    /// epoll-waits on it and runs one drain iteration per signal.
    /// Counter-mode (no `EFD_SEMAPHORE` flag) so coalesced kicks
    /// produce one wakeup. Configured `EFD_NONBLOCK` so neither the
    /// vCPU `write(1)` nor the worker `read()` ever blocks.
    pub(crate) kick_fd: EventFd,
    /// Eventfd written by `Drop::drop`; worker reads it and exits.
    /// Counter-mode + `EFD_NONBLOCK`. The worker checks both fds in
    /// the same `epoll_wait` call so a stop signal supersedes any
    /// pending kick.
    //
    // NOTE(review): `reset()` also joins the worker (see `handle`
    // below), so it presumably signals this fd as well — confirm and
    // extend the doc above if so.
    pub(crate) stop_fd: EventFd,
    /// Worker thread join handle. Wrapped in `Option` so `Drop`
    /// and `reset()` can `take()` and `join()` it. None after the
    /// thread has been joined.
    ///
    /// The `BlkWorkerState` payload is yielded by
    /// `worker_thread_main` on STOP_TOKEN: `reset()` recovers it
    /// to rebuild fresh throttle buckets and re-spawn a worker
    /// against the post-`q.reset()` queue. `Drop` discards the
    /// returned state with `let _ = handle.join()`. Both paths
    /// observe the same return value; only the consumer differs.
    pub(crate) handle: Option<thread::JoinHandle<BlkWorkerState>>,
    /// State reclaimed from a quiesced worker, awaiting respawn at
    /// the next DRIVER_OK transition. `Some(_)` between
    /// `reset_engine_spawned` (which joins the old worker, captures
    /// its state, and stashes it here) and the guest's subsequent
    /// `STATUS = DRIVER_OK` MMIO write (which `set_status` consumes
    /// to re-spawn a fresh worker). `None` in all other steady
    /// states.
    ///
    /// # Why deferred
    ///
    /// Between `reset()` and DRIVER_OK the guest is rebinding —
    /// queue addresses are zeroed, `QUEUE_READY` is false, and any
    /// kick that lands hits the `queues[REQ_QUEUE].ready()` early
    /// return in `drain_bracket_impl`. A worker spawned eagerly in
    /// `reset()` would sit in `epoll_wait`, tying up a thread for
    /// an indeterminate window — the guest's rebind sequence may
    /// take milliseconds to seconds depending on driver
    /// implementation. Deferring the spawn until DRIVER_OK means
    /// the thread cost is paid only when there is real work to
    /// service. This matches cloud-hypervisor's "kill on reset,
    /// respawn on DRIVER_OK" pattern.
    ///
    /// # Race-free invariant
    ///
    /// Both `reset_engine_spawned` and `set_status` execute on the
    /// vCPU thread that received the MMIO write — `reset()` from
    /// `STATUS = 0` and `set_status` from `STATUS = …`. The two
    /// run sequentially within a single vCPU thread context, so
    /// the `respawn_pending` field has no concurrent reader/writer.
    /// A regression that moved either path off the vCPU thread
    /// would need to add explicit synchronisation.
    ///
    /// # Failure consequences
    ///
    /// If `reset_engine_spawned` populated `respawn_pending` but
    /// `respawn_worker` (called from `set_status` on DRIVER_OK)
    /// fails to construct fresh fds or spawn the thread, the
    /// device enters the same permanent-workerless state described
    /// in `respawn_worker`'s "Failure consequences" section. A
    /// reset that produces `respawn_pending = None` (the
    /// `stop_worker_and_reclaim_state` non-Joined outcomes) means
    /// there is no state to respawn from; the device is
    /// permanently dead. In either case `set_status` clears
    /// `respawn_pending` so that stale state does not keep the
    /// scratch buffers and the backing-file `File` handle alive
    /// past the device's effective lifetime.
    pub(crate) respawn_pending: Option<BlkWorkerState>,
}

/// Process-wide monotonic counter for VirtioBlk instance IDs. Used
/// to derive `instance_id` at construction so tracing logs name the
/// device with a stable small integer instead of a raw heap pointer.
/// Heap pointers expose ASLR offsets and process-layout details
/// (the `host_resource_snapshot` doc treats this kind of detail as
/// environment leakage); a per-process counter preserves the
/// "uniquely identify the device within this process run" property
/// that the diagnostics depend on without leaking the address.
///
/// Starts at 0. The increment site is not in this part of the file;
/// presumably the constructor does a `fetch_add` — confirm there
/// before relying on whether the first device is numbered 0 or 1.
pub(crate) static VIRTIO_BLK_INSTANCE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Virtio-block MMIO device.
///
/// Holds the MMIO-side register state plus the handles (`Arc`s and
/// eventfds) shared with the request-processing worker. The mutable
/// per-request state lives in [`BlkWorkerState`], reached through
/// `worker`.
pub struct VirtioBlk {
    /// Currently selected queue index. Presumably mirrors the
    /// virtio-mmio `QueueSel` register — TODO confirm against the
    /// MMIO write handler.
    pub(crate) queue_select: u32,
    /// Selector for which 32-bit window of the device feature bits
    /// is read. Presumably mirrors `DeviceFeaturesSel` — TODO
    /// confirm against the MMIO handlers.
    pub(crate) device_features_sel: u32,
    /// Selector for which 32-bit window of the driver feature bits
    /// is written. Presumably mirrors `DriverFeaturesSel` — TODO
    /// confirm against the MMIO handlers.
    pub(crate) driver_features_sel: u32,
    /// Feature bits acknowledged by the driver. Presumably assembled
    /// from 32-bit writes selected by `driver_features_sel` — TODO
    /// confirm against the MMIO write handler.
    pub(crate) driver_features: u64,
    /// FSM state bits per virtio-v1.2 §3.1.1 plus the
    /// `VIRTIO_CONFIG_S_NEEDS_RESET` bit set by `drain_bracket_impl`
    /// when the avail ring becomes structurally invalid (the
    /// queue-poison path). `Arc<AtomicU32>` so the worker thread can
    /// fetch_or the NEEDS_RESET bit alongside its INT_CONFIG +
    /// `irq_evt.write(1)` poison-signal sequence; the vCPU thread
    /// reads `STATUS` via `load(Acquire)` from `mmio_read` and writes
    /// it via the FSM in `set_status` / `reset`. Atomic ordering
    /// taxonomy: `set_status` uses
    /// `compare_exchange(_, _, Release, Acquire)` for race-safe
    /// FSM advance against the worker's concurrent
    /// `fetch_or(NEEDS_RESET)` (the only RMW write site on
    /// device_status from the vCPU thread); `reset` uses
    /// `store(0, Release)`; vCPU reads use `load(Acquire)`
    /// (mmio_read, queue_config_allowed, features_write_allowed);
    /// the worker uses `fetch_or(NEEDS_RESET, SeqCst)` on the
    /// queue-poison path. Mirrors the [`Self::interrupt_status`]
    /// shape and rationale.
    pub(crate) device_status: Arc<AtomicU32>,
    /// Interrupt status bits reported through the `INTERRUPT_STATUS`
    /// MMIO register.
    ///
    /// Worker may be on a separate thread (production cfg) and the
    /// vCPU MMIO reader may race the worker's bit-set, so the value
    /// is wrapped in an `Arc` and updated with atomic ops. Worker
    /// writes the bit via `fetch_or(VIRTIO_MMIO_INT_VRING, Release)`
    /// alongside its `add_used` publish; vCPU `mmio_read` of
    /// `INTERRUPT_STATUS` does `load(Acquire)`; `INTERRUPT_ACK`
    /// clears bits via `fetch_and(!val, AcqRel)`. The Release/Acquire
    /// pair orders the bit set vs. the used-ring `add_used` (which
    /// itself publishes `used.idx` with Release internally), so a
    /// vCPU reading the bit set is guaranteed to also observe the
    /// freshly-published used.idx — no torn observation where the
    /// bit appears before the ring update.
    pub(crate) interrupt_status: Arc<AtomicU32>,
    /// Config generation counter.
    ///
    /// `AtomicU32` for consistency with `interrupt_status`; v0 bumps
    /// only from `reset()` on the vCPU thread, not from any other
    /// thread (the worker thread does not touch config space). The
    /// atomic shape is defense-in-depth for future runtime config
    /// changes that might add a non-vCPU writer.
    pub(crate) config_generation: AtomicU32,
    /// Eventfd for KVM irqfd. Shared `Arc` so the worker thread
    /// (production cfg) can call `write(1)` to fire the IRQ without
    /// taking ownership away from the device. Tests run inline so
    /// the same Arc is read directly via `dev.irq_evt.read()`.
    pub(crate) irq_evt: Arc<EventFd>,
    /// Guest memory reference. Set before starting vCPUs via
    /// `set_mem`. Wrapped in `Arc<OnceLock<…>>` so the worker
    /// thread (production) can pick up `mem` post-construction
    /// without locking on every drain. `set_mem` is the only
    /// writer and KVM wiring guarantees it runs before any vCPU
    /// runs (i.e. before any QUEUE_NOTIFY can fire), so the
    /// reader-side `OnceLock::get` is lock-free in steady state
    /// and returns `&GuestMemoryMmap` directly — no clone needed.
    /// `reset()` does NOT clear `mem`: the same guest memory map
    /// is re-used across re-binds (matching the behaviour of the
    /// original `Mutex<Option<…>>` field, which `set_mem` only
    /// overwrote at boot).
    pub(crate) mem: Arc<OnceLock<GuestMemoryMmap>>,
    /// Capacity in 512-byte sectors. Determines what the guest sees
    /// in the config space's `capacity` field.
    pub(crate) capacity_sectors: u64,
    /// Request-processing state. In production a worker thread owns
    /// the underlying `BlkWorkerState`; in `cfg(test)` the state is
    /// inline so existing tests can read it back synchronously.
    pub(crate) worker: BlkWorker,
    /// One-shot guard so the "queue notify before set_mem"
    /// warning fires at most once per device instance.
    /// Without this, a buggy caller that issues N notifies before
    /// `set_mem` would flood the log with N copies of the same
    /// message. Latched with Relaxed because the order of the
    /// log message vs. other operations doesn't affect
    /// correctness. `Arc` so the worker thread can also
    /// access-and-latch the same flag (production: the warn fires
    /// the first time the worker observes the unset mem during a
    /// drain).
    pub(crate) mem_unset_warned: Arc<AtomicBool>,
    /// Original throttle configuration. Stored so `reset()` can
    /// rebuild fresh `TokenBucket`s on the respawned worker. Per
    /// virtio-v1.2 §2.1 a reset returns the device to its initial
    /// state, which includes the throttle bucket fill: an
    /// adversarial guest must not be able to drain the bucket and
    /// then issue a reset to bypass the rate limit. `DiskThrottle`
    /// is `Copy` (a pair of `Option<NonZeroU64>`) so this is cheap
    /// to keep around.
    pub(crate) throttle: DiskThrottle,
    /// Stable per-device monotonic identifier from
    /// [`VIRTIO_BLK_INSTANCE_COUNTER`]. Replaces the previous
    /// `self as *const _ as usize` heap-pointer field for tracing
    /// log correlation: pointers fingerprint the host's ASLR
    /// layout, an integer counter does not.
    pub(crate) instance_id: u64,
    /// Pause eventfd (host-side handle). [`Self::pause`] writes 1 to
    /// signal the worker; the worker reads the counter and parks on
    /// [`Self::paused`]. Shared `Arc` because the worker owns a clone
    /// for its epoll registration and the device retains this handle
    /// for `pause()`/`resume()` calls from the freeze coordinator.
    /// Cfg-independent so [`Self::pause`] / [`Self::resume`] compile
    /// in `cfg(test)` builds (where the inline engine is a no-op
    /// because the worker thread does not exist).
    pub(crate) pause_evt: Arc<EventFd>,
    /// Worker-parked indicator. Set to `true` by the worker thread
    /// after it drains `pause_fd` and is parked in the
    /// `park_timeout`-loop; the freeze coordinator polls this with
    /// `load(Acquire)` to confirm the worker has reached its parked
    /// state before reading guest memory. Cleared by [`Self::resume`]
    /// (Release-store of `false`); the worker's `park_timeout(10ms)`
    /// observes the clear within 10 ms and resumes its `epoll_wait`
    /// loop.
    pub(crate) paused: Arc<AtomicBool>,
    /// Optional shared parked_evt the worker writes to alongside
    /// the `paused.store(true, Release)` so the freeze
    /// coordinator's rendezvous wakes within microseconds of the
    /// last parker rather than spinning. `None` when no freeze
    /// coordinator is plumbed (test paths). The freeze coordinator
    /// sets this on every device via [`Self::set_parked_evt`]
    /// before the first `pause()` call. Counter-mode EventFd
    /// (NOT EFD_SEMAPHORE): a single drain absorbs any number of
    /// coalesced parker writes.
    pub(crate) parked_evt: Arc<std::sync::Mutex<Option<Arc<EventFd>>>>,
    /// Per-thread CPU placement applied at the top of
    /// `worker_thread_main` before the worker enters its `epoll_wait`
    /// loop. Mirrors the host topology's perf-mode (`pin_target`,
    /// carried here as `service_cpu`) and `--cpu-cap` no-perf
    /// (`no_perf_cpus`) split: at most one of the two is `Some`,
    /// both `None` means inherit the parent thread's affinity (no
    /// placement applied). Set via [`Self::set_worker_placement`]
    /// after `with_options`; defaults to all-`None` so the device
    /// works in test fixtures and call sites that don't supply
    /// topology data.
    pub(crate) worker_placement: WorkerPlacement,
}

/// CPU placement for the virtio-blk worker thread. Threaded into
/// `worker_thread_main` and applied via `pin_current_thread` /
/// `set_thread_cpumask` at the top of the worker before entering
/// `epoll_wait`. Mutually exclusive: perf-mode picks a single CPU,
/// `--cpu-cap` no-perf picks an LLC mask, both `None` means inherit
/// the parent thread's affinity (the test/inline path).
///
/// The exclusivity is a convention of the callers, not something
/// this type enforces: the two fields are independent `Option`s, so
/// a value with both set is representable. The derived `Default`
/// yields both `None`.
// Fields are read by `worker_thread_main` which is itself
// `#[cfg(not(test))]` (worker.rs), so under `cargo check
// --tests` no reader exists and the fields look dead. The
// production path consumes them — keep the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Default)]
pub struct WorkerPlacement {
    /// Single CPU pin (perf-mode). Equivalent to
    /// `pin_current_thread(cpu, "virtio-blk worker")`.
    pub service_cpu: Option<usize>,
    /// CPU mask (no-perf + `--cpu-cap`). Equivalent to
    /// `set_thread_cpumask(cpus, "virtio-blk worker")`.
    pub no_perf_cpus: Option<Vec<usize>>,
}

impl VirtioBlk {
    /// Build a writable virtio-block device.
    ///
    /// Convenience front-end for [`Self::with_options`] that pins
    /// `read_only` to `false`.
    ///
    /// * `backing` — an open `File` used for sector-granular reads
    ///   and writes (the host formatted it before VM boot).
    /// * `capacity_bytes` — disk capacity advertised to the guest,
    ///   rounded down to a sector boundary.
    /// * `throttle` — optional IOPS / bandwidth limits.
    ///
    /// `dead_code` allow: the only consumers are `#[cfg(test)]`
    /// modules; production code calls [`Self::with_options`] so the
    /// read-only flag is always stated explicitly.
    #[allow(dead_code)]
    pub fn new(backing: File, capacity_bytes: u64, throttle: DiskThrottle) -> Self {
        // Writable is the default; a read-only device has to opt in
        // through `with_options`.
        let read_only = false;
        Self::with_options(backing, capacity_bytes, throttle, read_only)
    }

    /// Like [`Self::new`] plus a `read_only` knob. When `read_only`
    /// is `true`, the device advertises `VIRTIO_BLK_F_RO` and
    /// rejects writes regardless of guest behaviour (defense
    /// against a guest that ignores the negotiated feature bit).
    ///
    /// `capacity_bytes` is rounded down to a whole number of
    /// sectors: `capacity_sectors` is the integer quotient by
    /// `VIRTIO_BLK_SECTOR_SIZE` and the byte capacity handed to the
    /// worker state is recomputed from that quotient.
    ///
    /// `capacity_bytes` smaller than one sector is clamped to
    /// `capacity_sectors = 0`, producing a 0-sector disk that
    /// IOERRs every request. The device cannot represent a
    /// fractional sector, so a sub-sector allocation is operator
    /// error — log it, continue, and let the existing zero-capacity
    /// reject path surface the failure to the guest. A
    /// `capacity_bytes` of exactly 0 follows the same zero-sector
    /// path but is NOT logged (the warning is gated on
    /// `capacity_bytes != 0`).
    ///
    /// In non-test builds no worker thread is started here: the
    /// seed `BlkWorkerState` is stored in `respawn_pending` with
    /// `handle: None`. In test builds the state is kept inline in
    /// the engine.
    ///
    /// The returned device starts with `worker_placement` at its
    /// default, `parked_evt` empty, `mem` unset, and `paused`
    /// initialised to `true`.
    ///
    /// # Panics
    ///
    /// Panics if any eventfd cannot be created (irq and pause in
    /// every build; kick and stop in non-test builds), or if
    /// `BlkQueue::new(QUEUE_MAX_SIZE)` returns an error.
    pub fn with_options(
        backing: File,
        capacity_bytes: u64,
        throttle: DiskThrottle,
        read_only: bool,
    ) -> Self {
        let irq_evt = Arc::new(
            EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk irq eventfd"),
        );
        // Warn only for a non-zero sub-sector capacity; an explicit
        // zero is accepted silently.
        if capacity_bytes < VIRTIO_BLK_SECTOR_SIZE as u64 && capacity_bytes != 0 {
            tracing::warn!(
                capacity_bytes,
                sector_size = VIRTIO_BLK_SECTOR_SIZE,
                "virtio-blk capacity_bytes smaller than one sector; clamping \
                 capacity_sectors to 0 (every IO will be rejected)"
            );
        }
        // Integer division truncates any trailing partial sector;
        // the shadowed `capacity_bytes` is the sector-aligned value
        // used from here on.
        let capacity_sectors = capacity_bytes / VIRTIO_BLK_SECTOR_SIZE as u64;
        let capacity_bytes = capacity_sectors * VIRTIO_BLK_SECTOR_SIZE as u64;
        let (ops_bucket, bytes_bucket) = buckets_from_throttle(throttle);
        let counters = Arc::new(VirtioBlkCounters::default());

        let state = BlkWorkerState {
            // Wrap the production `File` in the `Backing` seam (unsizing
            // coercion `Box<File>` -> `Box<dyn Backing>`). Tests swap
            // `state.backing` for a fault-injecting mock to exercise
            // the write retry / IOERR paths a real File rarely hits.
            backing: Box::new(backing),
            ops_bucket,
            bytes_bucket,
            all_descs_scratch: Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2),
            io_buf_scratch: Vec::new(),
            capacity_bytes,
            read_only,
            counters: Arc::clone(&counters),
            currently_stalled: false,
            queue_poisoned: false,
        };

        let interrupt_status = Arc::new(AtomicU32::new(0));
        let device_status = Arc::new(AtomicU32::new(0));
        let mem = Arc::new(OnceLock::new());
        let mem_unset_warned = Arc::new(AtomicBool::new(false));
        // Pause primitives (failure-dump rendezvous). The
        // `pause_evt` host handle is kept on the `VirtioBlk` for
        // `pause()`/`resume()`; in production a clone of its read
        // side becomes the `pause_fd` registered in the worker's
        // epoll. `paused` is the worker-set / coordinator-cleared
        // ack flag the freeze rendezvous polls. Both Arcs are
        // cfg-independent so the test-mode `pause`/`resume`
        // accessors compile without engine-conditional plumbing
        // (the test-mode worker is inline, so they observe the
        // same eventfd state without an active worker thread).
        let pause_evt = Arc::new(
            EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk pause eventfd"),
        );
        // Initialise to `true` so the freeze coordinator's
        // `is_paused()` poll passes vacuously while no worker is
        // alive — the initial spawn is deferred to DRIVER_OK
        // (see `respawn_pending` engine plumbing below), so any
        // freeze that fires between `with_options` and the first
        // DRIVER_OK MMIO write would otherwise time out at
        // FREEZE_RENDEZVOUS_TIMEOUT (30 s) waiting for a worker
        // that does not exist. The worker's first action inside
        // `worker_thread_main` (after affinity setup, before
        // entering `epoll_wait`) is a Release-store of `false`,
        // which makes the rendezvous start observing real
        // worker state from the moment the worker is genuinely
        // ready to service kicks. Cloud-hypervisor uses the
        // same "paused on construction, cleared by activate"
        // invariant in epoll_helper.rs.
        let paused = Arc::new(AtomicBool::new(true));

        // Build the queue. Production uses `QueueSync` (Arc<Mutex<Queue>>
        // internally) so the vCPU MMIO config writes and the worker
        // thread's drain can share the same queue state. Tests use the
        // bare `Queue` so the existing test surface that drives queue
        // methods directly via `dev.worker.queues[REQ_QUEUE].…` keeps
        // working without a runtime lock.
        // NOTE(review): `BlkQueue` is presumably the cfg-selected alias
        // for those two queue types — confirm against its definition.
        let queues = [BlkQueue::new(QUEUE_MAX_SIZE).expect("valid queue size")];

        // Build the engine. cfg(test) keeps the state inline so the
        // existing test surface drives drain_bracket_impl synchronously;
        // cfg(not(test)) spawns a dedicated worker thread that owns
        // the state and waits for kick eventfd writes from
        // `process_requests`. (The spawn itself does not happen in
        // this function — see the deferral comment below.)
        #[cfg(test)]
        let engine = WorkerEngine::Inline(InlineEngine { state });

        #[cfg(not(test))]
        let engine = {
            // Counter-mode eventfds (no EFD_SEMAPHORE). EFD_NONBLOCK so
            // the vCPU `write(1)` to kick_fd never blocks even under
            // pathological backpressure (the worker has fallen behind
            // by more than u64::MAX-1 kicks — implausible under any
            // realistic workload, but the non-blocking flag keeps the
            // failure mode "EAGAIN, drop the spurious kick" instead of
            // "vCPU thread blocks on eventfd write").
            let kick_fd =
                EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk kick eventfd");
            let stop_fd =
                EventFd::new(libc::EFD_NONBLOCK).expect("failed to create virtio-blk stop eventfd");
            // Defer the initial worker spawn to the guest's first
            // DRIVER_OK transition (set_status → consume_pending_respawn
            // → respawn_worker). Stashing the seed `BlkWorkerState` in
            // `respawn_pending` collapses the initial-spawn path into
            // the existing respawn path, which `respawn_worker` already
            // implements correctly (placement applied via
            // `self.worker_placement` clone, fresh kick/stop/pause fds
            // built per spawn). This gives `set_worker_placement` a
            // race-free window between construction and DRIVER_OK in
            // which to override the default placement; without
            // deferral the initial worker would spawn with the default
            // placement before setup.rs's setter call could land.
            //
            // Pre-DRIVER_OK kicks land on the now-detached `kick_fd`
            // and accumulate harmlessly; the first post-DRIVER_OK
            // worker observes the queue's `ready()` flag and processes
            // any pre-existing chain state. The kernel's virtio-mmio
            // bind sequence (drivers/virtio/virtio_mmio.c
            // `virtio_mmio_probe` → `vp_finalize_features` →
            // `vm_setup_vq` → `STATUS=DRIVER_OK`) does not fire
            // QUEUE_NOTIFY before DRIVER_OK, so accumulation is
            // bounded at zero in the production path.
            WorkerEngine::Spawned(SpawnedEngine {
                kick_fd,
                stop_fd,
                handle: None,
                respawn_pending: Some(state),
            })
        };

        let worker = BlkWorker {
            queues,
            read_only,
            counters,
            engine,
        };

        // Assemble the device. All MMIO selector/feature registers
        // start at zero; `throttle` is stored alongside the buckets
        // already derived from it above.
        VirtioBlk {
            queue_select: 0,
            device_features_sel: 0,
            driver_features_sel: 0,
            driver_features: 0,
            device_status,
            interrupt_status,
            config_generation: AtomicU32::new(0),
            irq_evt,
            mem,
            capacity_sectors,
            worker,
            mem_unset_warned,
            throttle,
            instance_id: VIRTIO_BLK_INSTANCE_COUNTER.fetch_add(1, Ordering::Relaxed),
            pause_evt,
            paused,
            parked_evt: Arc::new(std::sync::Mutex::new(None)),
            worker_placement: WorkerPlacement::default(),
        }
    }

    /// Plumb the freeze coordinator's shared parked_evt into this
    /// device. The worker writes to this fd alongside its
    /// `paused.store(true, Release)` so the coordinator's
    /// rendezvous wakes within microseconds of the worker
    /// parking. Called once by `run_vm` before any pause()/resume()
    /// fires; subsequent worker respawns pick up the same fd via
    /// the shared `Arc`.
    ///
    /// `None` is the default — test paths and any future device
    /// without a freeze coordinator skip the wake. The worker
    /// reads through this slot lazily so a setter call AFTER worker
    /// spawn (e.g. plumbing arrives late) still takes effect on
    /// the next pause cycle.
    pub fn set_parked_evt(&self, evt: Arc<EventFd>) {
        if let Ok(mut guard) = self.parked_evt.lock() {
            *guard = Some(evt);
        }
    }

    /// Install the CPU placement the worker thread applies at the
    /// top of its main loop.
    ///
    /// Follows the same setter pattern as `set_mem`: call it once
    /// after `with_options` / `new` and before the device begins
    /// servicing kicks. The stored value is captured by the NEXT
    /// worker-thread spawn. Because the initial spawn is deferred
    /// until the guest's first `STATUS = DRIVER_OK` MMIO write (the
    /// seed `BlkWorkerState` waits in `respawn_pending` until
    /// then), a call made between `with_options` and that
    /// transition applies to the very first worker. Once a worker
    /// is running, this setter does not touch it — only respawned
    /// workers see the new placement, matching cloud-hypervisor's
    /// "topology applied at thread start" pattern.
    ///
    /// `WorkerPlacement::service_cpu` and `no_perf_cpus` are
    /// mutually exclusive: the topology layer (perf-mode vs
    /// `--cpu-cap`) yields at most one of them. Both `None` means
    /// inherit the parent thread's affinity (the test/inline path
    /// and the no-topology fallback for ad-hoc fixtures).
    pub fn set_worker_placement(&mut self, placement: WorkerPlacement) {
        // Swap the new placement in; the previous value is discarded.
        drop(std::mem::replace(&mut self.worker_placement, placement));
    }

    /// Borrow the interrupt eventfd, for KVM irqfd registration.
    pub fn irq_evt(&self) -> &EventFd {
        // Explicit deref through the `Arc` down to the `EventFd`.
        &*self.irq_evt
    }

    /// Bind the guest memory reference. Must be called before
    /// starting vCPUs.
    ///
    /// The memory is stored in the device's shared
    /// `Arc<OnceLock<…>>`, so the worker thread (production cfg)
    /// picks it up on its next drain through a lock-free
    /// `OnceLock::get`. It is safe to return before the worker has
    /// observed the value: the worker consults `mem` only in
    /// response to a kick (driven by `mmio_write` of QUEUE_NOTIFY),
    /// and KVM wiring guarantees `set_mem` runs before any vCPU
    /// runs — i.e. before any QUEUE_NOTIFY can fire.
    ///
    /// `OnceLock::set` yields `Err` when the slot is already
    /// populated. The current production wiring (mod.rs
    /// `init_virtio_blk`) calls `set_mem` exactly once per device,
    /// so that branch is unreachable in normal operation; `reset()`
    /// does NOT clear `mem`, matching the prior `Mutex<Option<…>>`
    /// semantics where the slot was written only at boot. A repeat
    /// call is logged rather than turned into a panic, so a future
    /// re-wire bug shows up as a warning instead of aborting (a
    /// panic here could land mid-teardown while the caller is
    /// already unwinding).
    pub fn set_mem(&mut self, mem: GuestMemoryMmap) {
        let newly_bound = self.mem.set(mem).is_ok();
        if newly_bound {
            return;
        }
        // Slot was already populated: keep the first binding, report.
        tracing::warn!(
            "virtio-blk: set_mem called on already-initialised \
             device; guest memory binding unchanged (mem is set \
             once at boot and preserved across reset())"
        );
    }

    /// Advertised capacity in 512-byte sectors.
    ///
    /// `dead_code` allow: used by tests to read back the
    /// rounded-to-sector capacity; the lib pipeline consumes
    /// the `capacity_bytes` input directly through the
    /// config-space rendering path, so the accessor would
    /// otherwise appear unused at lib-build time.
    #[allow(dead_code)]
    pub fn capacity_sectors(&self) -> u64 {
        self.capacity_sectors
    }

    /// Hand out another `Arc` to the host-observability counters,
    /// so the monitor thread can read them without locking the
    /// device.
    pub fn counters(&self) -> Arc<VirtioBlkCounters> {
        let shared = &self.worker.counters;
        Arc::clone(shared)
    }

    /// Hand out another `Arc` to the worker's parked-state flag.
    ///
    /// The freeze coordinator keeps an `Arc<AtomicBool>` and polls
    /// it in the post-thaw barrier and timeout-diagnostic paths
    /// without taking the device's `PiMutex`. Going through the
    /// device handle instead ([`Self::is_paused`]) needs
    /// `Arc<PiMutex<VirtioBlk>>::lock`, which contends with every
    /// concurrent device operation — `mmio_read`/`mmio_write` from
    /// the vCPU thread and any other freeze-coord call site holding
    /// the lock. The field is already an `Arc<AtomicBool>`, so
    /// exposing it lets the rendezvous loop poll lock-free; the
    /// Acquire/Release ordering on `paused` gives the same
    /// happens-before edges with the worker's parked-state writes
    /// that [`Self::is_paused`] does.
    pub fn paused_handle(&self) -> Arc<AtomicBool> {
        let flag = &self.paused;
        Arc::clone(flag)
    }

    /// Feature bitmap this device offers to the driver.
    ///
    /// Always offered: `VIRTIO_F_VERSION_1`, `VIRTIO_RING_F_EVENT_IDX`,
    /// `VIRTIO_BLK_F_BLK_SIZE`, `VIRTIO_BLK_F_SEG_MAX`,
    /// `VIRTIO_BLK_F_SIZE_MAX` and `VIRTIO_BLK_F_FLUSH`.
    /// `VIRTIO_BLK_F_RO` is added only when the worker is read-only.
    pub(crate) fn device_features(&self) -> u64 {
        // Transport-level bits.
        //
        // VIRTIO_F_VERSION_1: modern virtio.
        //
        // VIRTIO_RING_F_EVENT_IDX (bit 29): lets the guest publish
        //   an "event index" in the avail ring's `used_event` field,
        //   meaning "do not interrupt me until used.idx reaches this
        //   value". `virtio_queue` tracks that field internally —
        //   once negotiated, `Queue::needs_notification` returns
        //   false until the guest's threshold is reached, and the
        //   device can skip the irqfd write. Suppressing
        //   notifications on bursty IO (multi-chain queue drains)
        //   cuts vCPU exits in proportion to the burst size and is
        //   required for high-throughput virtio-blk under blk-mq.
        //   Wire-up: the bit is advertised here; set_status enables
        //   event-idx tracking on the queue when FEATURES_OK
        //   negotiates it; process_requests consults
        //   `Queue::needs_notification` after each drain. The V8
        //   split: process_requests sets VIRTIO_MMIO_INT_VRING
        //   unconditionally on any chain publish, then uses
        //   needs_notification to decide whether to also fire the
        //   irqfd.
        let transport_bits = (1u64 << VIRTIO_F_VERSION_1) | (1u64 << VIRTIO_RING_F_EVENT_IDX);

        // Config-space geometry bits.
        //
        // VIRTIO_BLK_F_BLK_SIZE: config carries the logical block
        //   size at offset 0x14 (4 bytes LE).
        // VIRTIO_BLK_F_SEG_MAX: config carries the per-request max
        //   scatter-gather segment count at offset 0x0C. Without it
        //   the guest defaults max_segments to 1 and splits every
        //   multi-segment bio serially — throughput-cripplingly
        //   slow for our use case.
        // VIRTIO_BLK_F_SIZE_MAX: config carries the per-descriptor
        //   max byte length at offset 0x08. Without it a guest can
        //   submit a single 4 GB descriptor and force the device to
        //   allocate a matching `Vec<u8>` for I/O — a host OOM
        //   hazard with a hostile guest.
        let geometry_bits = (1u64 << VIRTIO_BLK_F_BLK_SIZE)
            | (1u64 << VIRTIO_BLK_F_SEG_MAX)
            | (1u64 << VIRTIO_BLK_F_SIZE_MAX);

        // VIRTIO_BLK_F_FLUSH: the device honours T_FLUSH
        //   (fdatasync). Durability depends on the host filesystem
        //   behind the per-test image: on the default `tempfile()`
        //   path on tmpfs, fdatasync is effectively a no-op (tmpfs
        //   does not persist across reboot anyway — ordering
        //   semantics only); on btrfs/ext4-backed run dirs it gives
        //   the standard kernel-level ordering guarantee.
        //   Advertising it lets the guest block layer issue
        //   REQ_OP_FLUSH at metadata-commit boundaries (btrfs in
        //   the guest depends on this for tree consistency).
        let flush_bit = 1u64 << VIRTIO_BLK_F_FLUSH;

        // VIRTIO_BLK_F_RO: gated on `read_only`. The kernel block
        //   layer marks the disk read-only via `set_disk_ro` after
        //   F_RO negotiation; the device's defensive T_OUT
        //   rejection guards against uncovered write paths.
        let ro_bit = if self.worker.read_only {
            1u64 << VIRTIO_BLK_F_RO
        } else {
            0
        };

        transport_bits | geometry_bits | flush_bit | ro_bit
    }

    /// Queue index currently addressed by `queue_select`, or `None`
    /// when the selector is not below `NUM_QUEUES`.
    pub(crate) fn selected_queue(&self) -> Option<usize> {
        let selected = self.queue_select as usize;
        (selected < NUM_QUEUES).then_some(selected)
    }

    /// Whether the current device status permits queue
    /// configuration: every bit of `S_FEAT` must be set and
    /// `VIRTIO_CONFIG_S_DRIVER_OK` must still be clear. The status
    /// word is read with `Acquire` ordering.
    // NOTE(review): `S_FEAT` is presumably the "negotiation through
    // FEATURES_OK" status mask — confirm against its definition.
    pub(crate) fn queue_config_allowed(&self) -> bool {
        let bits = self.device_status.load(Ordering::Acquire);
        let prerequisites_met = (bits & S_FEAT) == S_FEAT;
        let driver_ok_set = (bits & VIRTIO_CONFIG_S_DRIVER_OK) != 0;
        prerequisites_met && !driver_ok_set
    }

    /// Whether the current device status permits driver feature
    /// writes: every bit of `S_DRV` must be set and
    /// `VIRTIO_CONFIG_S_FEATURES_OK` must still be clear. The
    /// status word is read with `Acquire` ordering.
    // NOTE(review): `S_DRV` is presumably the "driver loaded" status
    // mask — confirm against its definition.
    pub(crate) fn features_write_allowed(&self) -> bool {
        let bits = self.device_status.load(Ordering::Acquire);
        let prerequisites_met = (bits & S_DRV) == S_DRV;
        let features_ok_set = (bits & VIRTIO_CONFIG_S_FEATURES_OK) != 0;
        prerequisites_met && !features_ok_set
    }

    /// Service `VIRTIO_BLK_T_IN` (read) using a single `preadv(2)`
    /// syscall over a vectored iov chain built from the data
    /// segments. Functionally equivalent to
    /// `Self::handle_read_impl` but coalesces N `pread64` syscalls
    /// (one per segment) plus N memcpy passes (kernel→scratch then
    /// scratch→guest) into one syscall reading directly into guest
    /// memory.
    ///
    /// Mirrors cloud-hypervisor's `block::Request::execute_async`
    /// vectored read path: one iovec per `VolatileSlice` produced by
    /// `mem.get_slices(addr, len)`, then a single vectored read.
    /// `get_slices` handles fragmentation when a descriptor's
    /// `[addr, addr+len)` range spans a guest memory region boundary —
    /// each contiguous host range becomes its own iovec entry.
    ///
    /// `data_len` and `sector` are pre-validated by the caller
    /// (`drain_bracket_impl`): SIZE_MAX, SEG_MAX, sub-sector,
    /// direction, and out-of-range checks all run upstream. The
    /// per-segment direction check is repeated here as
    /// defense-in-depth — matching `Self::handle_read_impl` —
    /// so a future caller that bypasses `drain_bracket_impl` and
    /// calls this helper directly cannot smuggle a device-readable
    /// segment into a T_IN chain (which would have `preadv` write
    /// into a buffer the spec marked read-only from the device's
    /// perspective).
    ///
    /// Short-read handling: `preadv` returns `n` bytes filled
    /// (`n <= data_len`). When `n < data_len` (only reachable on a
    /// short read against a backing whose effective length is below
    /// `capacity_bytes` — pre-validated against `capacity_bytes` by
    /// the caller, so this path is rare in production) the unfilled
    /// `[n..data_len)` byte range is zero-padded by walking the
    /// segments forward from byte `n` and writing zero bytes via
    /// `mem.write_slice`. This mirrors the existing per-segment
    /// short-read pad in `Self::handle_read_impl` and matches the
    /// sparse-file semantic the original implementation relied on.
    ///
    /// Counter taxonomy is preserved exactly:
    /// - `record_read(bytes_from_backing)`: bytes ACTUALLY returned
    ///   by `preadv` (`n`), excluding the zero-pad tail.
    /// - `used.elem.len = bytes_to_guest + 1`: full `data_len` (data
    ///   + zero-pad tail) + 1 status byte (virtio-v1.2 §2.7.7.2 —
    ///     bytes the device wrote into device-writable buffers).
    ///
    /// `too_many_arguments` allow: same disjoint-borrow shape as
    /// `Self::handle_read_impl` — every parameter is a separate
    /// `&self` field that must be passed by reference so the caller
    /// can hold a concurrent mutable borrow of the queues vec.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn handle_read_vectored_impl(
        backing: &dyn Backing,
        capacity_bytes: u64,
        counters: &VirtioBlkCounters,
        mem: &GuestMemoryMmap,
        sector: u64,
        data_segments: &[ChainDescriptor],
        data_len: u64,
    ) -> (u8, u32) {
        let Some(base_offset) = sector.checked_mul(VIRTIO_BLK_SECTOR_SIZE as u64) else {
            counters.record_io_error();
            return (VIRTIO_BLK_S_IOERR as u8, 1);
        };
        if base_offset
            .checked_add(data_len)
            .is_none_or(|end| end > capacity_bytes)
        {
            counters.record_io_error();
            return (VIRTIO_BLK_S_IOERR as u8, 1);
        }

        // Build the iovec chain: one entry per VolatileSlice produced
        // by `mem.get_slices(addr, len)`. `get_slices` iterates over
        // the contiguous host ranges that together cover the guest
        // address span, so a descriptor whose `[addr, addr+len)`
        // straddles a `GuestMemoryMmap` region boundary contributes
        // multiple iovec entries — preserving correctness without
        // requiring the descriptor to fit in a single region.
        //
        // The `PtrGuardMut`s returned by `slice.ptr_guard_mut()` are
        // collected into `_guards` so they remain alive for the
        // duration of the syscall: with the `xen` feature enabled
        // they wrap an `MmapXenSlice` whose Drop unmaps the host
        // mapping; without `xen` the guard is a thin wrapper around
        // the raw pointer. Either way, holding the guards across
        // the syscall guarantees the `iov_base` pointers stay valid.
        //
        // Local Vec rather than a reusable scratch on `BlkWorkerState`:
        // `libc::iovec` contains a raw pointer, which is `!Send`, so
        // storing a `Vec<libc::iovec>` on `BlkWorkerState` would make
        // the whole struct `!Send` and break the `JoinHandle<…>`
        // payload. The initial capacity is a `VIRTIO_BLK_SEG_MAX + 2`
        // hint (one entry per data segment plus header+status); the
        // Vec grows via reallocation if multi-region descriptors
        // fragment into more iovec entries. The per-call allocation
        // is amortized against the single `preadv` syscall it
        // replaces — a quantum of overhead vastly smaller than the
        // N kernel-mode syscall transitions the legacy per-segment
        // path performed.
        let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
        let mut _guards: Vec<vm_memory::volatile_memory::PtrGuardMut> =
            Vec::with_capacity(VIRTIO_BLK_SEG_MAX as usize + 2);
        for seg in data_segments {
            if !seg.is_write_only {
                // Spec violation — a T_IN request's data SGs must
                // be device-writable. Defense-in-depth: the outer
                // gate in process_requests already rejected this
                // chain before throttle. Mirrors the same check in
                // `Self::handle_read_impl` so a future caller
                // that bypasses `drain_bracket_impl` cannot reach
                // `preadv` with a device-readable buffer.
                counters.record_io_error();
                return (VIRTIO_BLK_S_IOERR as u8, 1);
            }
            let len = seg.len as usize;
            if len == 0 {
                // Zero-length data descriptor: legal per virtio
                // (qemu/firecracker accept). Skip — preadv with a
                // zero-length iovec entry is a no-op and skipping
                // avoids an unnecessary `get_slices` round-trip.
                continue;
            }
            for slice_result in mem.get_slices(seg.addr, len) {
                let slice = match slice_result {
                    Ok(s) => s,
                    Err(_) => {
                        counters.record_io_error();
                        return (VIRTIO_BLK_S_IOERR as u8, 1);
                    }
                };
                let guard = slice.ptr_guard_mut();
                iovecs.push(libc::iovec {
                    iov_base: guard.as_ptr() as *mut libc::c_void,
                    iov_len: slice.len(),
                });
                _guards.push(guard);
            }
        }

        // Tripwire for the structural iovec-count bound derived in the
        // SAFETY comment below (SEG_MAX * 2 = 256 <= IOV_MAX). Debug-
        // only (zero release cost); a future change that broke the
        // bound trips the proptest fuzzers here instead of surfacing as
        // a runtime `preadv` EINVAL.
        debug_assert!(
            iovecs.len() <= IOV_MAX,
            "blk preadv iovcnt {} exceeds IOV_MAX {} — structural bound broke (see SAFETY comment)",
            iovecs.len(),
            IOV_MAX,
        );

        // Empty iovec means data_len == 0 — every data descriptor in
        // the chain had len == 0. The upstream zero-data gate in
        // drain.rs gates on `data_segments.is_empty()`, NOT on
        // `data_len == 0`, so a chain with one or more zero-length
        // data descriptors passes the gate and reaches here. Linux
        // `preadv` with iovcnt=0 returns 0 (lib/iov_iter.c
        // `iovec_from_user`: "Linux has traditionally returned zero
        // for zero segments"), so a syscall would be harmless —
        // skipping it just avoids the kernel-mode round-trip on a
        // path that has nothing to do.
        let bytes_from_backing: u64 = if iovecs.is_empty() {
            0
        } else {
            // SAFETY: `iovecs` is a non-empty Vec of `libc::iovec`
            // entries built from valid host pointers (each came from
            // a `VolatileSlice` produced by `mem.get_slices(...)`
            // and the corresponding `PtrGuardMut` is alive in
            // `_guards` for the duration of this call, keeping the
            // backing pointer valid) — this upholds the
            // `Backing::preadv` precondition that each `iov_base` is
            // valid for `iov_len` bytes across the call. The backing
            // (`&dyn Backing`) is borrowed from the `BlkWorkerState`
            // the caller (`drain_bracket_impl`) owns for the drain.
            //
            // `iovecs.len()` is structurally bounded at <= 256 — a
            // quarter of Linux's `IOV_MAX` (UIO_MAXIOV = 1024) — so
            // `preadv` never sees `iovcnt > IOV_MAX`. Derived from
            // three enforced/structural facts:
            //   - at most `VIRTIO_BLK_SEG_MAX` (128) data segments:
            //     drain.rs rejects `chain_len > SEG_MAX + 2` before
            //     this loop runs;
            //   - each segment is <= `VIRTIO_BLK_SIZE_MAX` (1 MiB):
            //     drain.rs rejects `any(len > SIZE_MAX)` before this
            //     loop runs;
            //   - every `GuestMemoryMmap` region is >= 1 MiB and
            //     MiB-granular: numa_mem allocates per-node memory in
            //     whole MiB (a 0-MiB node is omitted) and the MMIO-gap
            //     split is GiB-aligned, so even the split pieces are
            //     MiB-granular (numa_mem.rs `push_node_regions`).
            // `mem.get_slices(addr, len)` returns one slice per
            // `GuestMemoryMmap` region the `[addr, addr+len)` range
            // spans. A <= 1 MiB segment over >= 1 MiB regions crosses
            // at most one region boundary -> at most 2 slices, so the
            // total is at most `SEG_MAX * 2 = 256`. This is a
            // STRUCTURAL invariant (the three facts above), NOT a
            // code-enforced cap: if a future change lowers the minimum
            // region size, raises SEG_MAX/SIZE_MAX, or makes
            // `get_slices` fragment more finely (e.g. per-page),
            // re-derive this bound or add an explicit
            // `iovcnt <= IOV_MAX` chunking loop.
            // `base_offset` is a `u64` validated above to fit in
            // `[0, capacity_bytes]`; `capacity_bytes` is host-trusted
            // (constructed in `with_options`) so it cannot exceed
            // `i64::MAX` for any realistic disk size and fits in
            // `off_t` losslessly.
            // Retry ONLY a signal-interrupted read (`Err(Interrupted)`,
            // 0 bytes transferred). A short POSITIVE return is EOF for a
            // regular file (a thin/truncated backing shorter than the
            // advertised capacity) — NOT a signal — so it exits the loop
            // and the short-read pad below zero-fills the `[n..data_len)`
            // tail. This is asymmetric to the write path: a read returns
            // at EOF, whereas a write must retry a short-positive to
            // completion. (read(2): a catchable-signal interrupt with 0
            // bytes yields `-EINTR` (Err); a partial-then-signal yields
            // the positive count, indistinguishable from EOF and treated
            // as EOF — retrying an at-EOF read would spin.)
            // Divergence: firecracker errors a short read
            // (`read_exact_volatile` → UnexpectedEof → S_IOERR); we
            // zero-pad it as EOF — the more lenient regular-file-EOF
            // choice, matching qemu's short-read `iov_memset` pad.
            let mut eintr_retries: u32 = 0;
            loop {
                // SAFETY: each iovec base points at live guest memory
                // held by `_guards` for this call; `iovecs.len()` is
                // structurally <= IOV_MAX (see the bound derivation
                // above).
                match unsafe { backing.preadv(&iovecs, base_offset) } {
                    Ok(n) => break n as u64,
                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
                        // EINTR here is a pending FATAL signal (the read
                        // waits TASK_KILLABLE; a fatal signal does not
                        // clear), so re-issue is bounded to avoid
                        // spinning a dying IO thread — see
                        // MAX_EINTR_RETRIES.
                        eintr_retries += 1;
                        if eintr_retries > MAX_EINTR_RETRIES {
                            tracing::warn!(
                                sector,
                                "virtio-blk preadv: EINTR retry cap hit (pending fatal signal?); failing"
                            );
                            counters.record_io_error();
                            return (VIRTIO_BLK_S_IOERR as u8, 1);
                        }
                        continue;
                    }
                    Err(e) => {
                        tracing::warn!(sector, %e, "virtio-blk preadv error");
                        counters.record_io_error();
                        return (VIRTIO_BLK_S_IOERR as u8, 1);
                    }
                }
            }
        };

        // Short-read pad: zero-fill the unfilled `[n..data_len)`
        // tail across the data segments. Walks segments forward,
        // skipping over already-filled bytes (`< n`) and zeroing
        // any remainder. Matches the per-segment behavior in
        // `handle_read_impl` (the unfilled tail of the segment that
        // straddles `n` plus all subsequent segments are zeroed).
        if bytes_from_backing < data_len {
            let mut filled = bytes_from_backing;
            let mut to_zero = data_len - bytes_from_backing;
            // Stack zero buffer sized to a fixed 64 KiB chunk.
            // Smaller than `VIRTIO_BLK_SIZE_MAX` (1 MiB) so a single
            // segment of the maximum size cannot be zeroed in one
            // pass; the inner `while remaining > 0` loop iterates
            // up to 16 times per segment, each iteration writing
            // one 64 KiB chunk via `mem.write_slice`. 64 KiB keeps
            // the stack footprint small while amortizing the
            // per-write_slice overhead across multiple sectors.
            const ZERO_BUF_LEN: usize = 65536;
            let zeros = [0u8; ZERO_BUF_LEN];
            for seg in data_segments {
                if to_zero == 0 {
                    break;
                }
                let seg_len = seg.len as u64;
                if filled >= seg_len {
                    // Segment already fully filled by preadv; skip.
                    filled -= seg_len;
                    continue;
                }
                // Zero from offset `filled` within this segment to
                // its end (or until `to_zero` runs out, whichever
                // comes first).
                let seg_offset = filled as u32;
                let seg_remaining = (seg_len - filled).min(to_zero) as u32;
                let Some(zero_addr_u64) = seg.addr.0.checked_add(seg_offset as u64) else {
                    counters.record_io_error();
                    return (VIRTIO_BLK_S_IOERR as u8, 1);
                };
                let mut zero_addr = GuestAddress(zero_addr_u64);
                let mut remaining = seg_remaining;
                while remaining > 0 {
                    let chunk = (remaining as usize).min(ZERO_BUF_LEN);
                    if mem.write_slice(&zeros[..chunk], zero_addr).is_err() {
                        counters.record_io_error();
                        return (VIRTIO_BLK_S_IOERR as u8, 1);
                    }
                    let Some(next) = zero_addr.0.checked_add(chunk as u64) else {
                        counters.record_io_error();
                        return (VIRTIO_BLK_S_IOERR as u8, 1);
                    };
                    zero_addr = GuestAddress(next);
                    remaining -= chunk as u32;
                }
                to_zero -= seg_remaining as u64;
                filled = 0;
            }
        }

        counters.record_read(bytes_from_backing);
        // bytes_to_guest = full data_len (data + zero-pad tail). Cap
        // at u32::MAX; SEG_MAX (128) × SIZE_MAX (1 MiB) = 128 MiB ≪
        // u32::MAX, so the cast cannot truncate.
        let bytes_to_guest = data_len as u32;
        (VIRTIO_BLK_S_OK as u8, bytes_to_guest + 1)
    }

    /// Handle a `VIRTIO_BLK_T_OUT` (write) request with vectored IO:
    /// the data segments are gathered into one iovec array and handed
    /// to `pwritev(2)`, so the backing file is written straight out
    /// of guest memory. The outcome is functionally equivalent to
    /// `Self::handle_write_impl`, but N `pwrite64` syscalls plus N
    /// memcpy passes collapse into one syscall in the common case.
    ///
    /// The build-then-write shape follows cloud-hypervisor's vectored
    /// write path (`iovecs.push(...)` followed by `write_vectored`).
    ///
    /// Preconditions: the caller (`drain_bracket_impl`) has already
    /// validated `data_len` and `sector` — SIZE_MAX, SEG_MAX,
    /// sub-sector, direction, RO-mode, and out-of-range checks all
    /// run upstream. The per-segment direction check is nevertheless
    /// repeated here (as in `Self::handle_write_impl`) as
    /// defense-in-depth: a future caller that skips
    /// `drain_bracket_impl` must not be able to make `pwritev` read
    /// from a buffer the driver marked device-writable.
    ///
    /// Short writes: `pwritev` may report `0 < n < data_len` (e.g.
    /// ENOSPC mid-write, or a partial transfer cut short by a
    /// signal). A short POSITIVE count is forward progress, so the
    /// iovecs are advanced past `n` and the remainder is re-issued
    /// until all `data_len` bytes have landed — the same drain loop
    /// the kernel's `generic_perform_write` runs. The request fails
    /// with S_IOERR (and an `io_errors` bump) only on `Ok(0)` (no
    /// forward progress), on a hard `Err`, or once `Err(Interrupted)`
    /// has been retried `MAX_EINTR_RETRIES` times. This intentionally
    /// DIVERGES from the cfg(test) per-segment
    /// `Self::handle_write_impl`, which rejects on the first short
    /// `Ok(n)`; that single-shot reject was a guest-data-integrity
    /// bug because it failed the guest's bio for a completable write.
    ///
    /// Counters and return value:
    /// - `record_write(total_written)` counts bytes accepted by the
    ///   backing file; it is called on success only, where
    ///   `total_written == data_len`.
    /// - The returned tuple is `(status_byte, used_len)`; `used_len`
    ///   is always 1 (status byte only — write data is never copied
    ///   back into guest memory).
    ///
    /// `too_many_arguments` allow: same disjoint-borrow shape as
    /// [`Self::handle_read_vectored_impl`].
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn handle_write_vectored_impl(
        backing: &dyn Backing,
        capacity_bytes: u64,
        counters: &VirtioBlkCounters,
        mem: &GuestMemoryMmap,
        sector: u64,
        data_segments: &[ChainDescriptor],
        data_len: u64,
    ) -> (u8, u32) {
        // Single failure exit shared by every rejection below: bump
        // `io_errors`, then complete with S_IOERR and a 1-byte used
        // length (status byte only).
        let io_error = || -> (u8, u32) {
            counters.record_io_error();
            (VIRTIO_BLK_S_IOERR as u8, 1)
        };

        // Byte offset of the request, and a bounds re-check of
        // `[base_offset, base_offset + data_len)` against the device
        // capacity. Arithmetic overflow counts as out of range.
        let Some(base_offset) = sector.checked_mul(VIRTIO_BLK_SECTOR_SIZE as u64) else {
            return io_error();
        };
        match base_offset.checked_add(data_len) {
            Some(end) if end <= capacity_bytes => {}
            _ => return io_error(),
        }

        // Gather the iovec array. `get_slices` (rather than
        // `get_slice`) is used so that a descriptor whose
        // `[addr, addr+len)` range crosses a region boundary yields
        // one iovec per region. The `_guards`-keep-alive and
        // per-call-allocation rationale is documented on
        // `handle_read_vectored_impl`.
        //
        // pwritev only READS through `iov_base`, so plain `PtrGuard`s
        // suffice here (no `PtrGuardMut`): guest memory is not
        // modified, hence no dirty tracking is required.
        let iov_capacity = VIRTIO_BLK_SEG_MAX as usize + 2;
        let mut iovecs: Vec<libc::iovec> = Vec::with_capacity(iov_capacity);
        let mut _guards: Vec<vm_memory::volatile_memory::PtrGuard> =
            Vec::with_capacity(iov_capacity);
        for seg in data_segments {
            if seg.is_write_only {
                // Spec violation: T_OUT data SGs must be
                // device-readable. The upstream gate is expected to
                // have rejected such a chain already (before
                // throttle); this repeat mirrors
                // `Self::handle_write_impl` so that a caller which
                // bypasses `drain_bracket_impl` still cannot reach
                // `pwritev` with a device-writable buffer.
                return io_error();
            }
            let len = seg.len as usize;
            if len == 0 {
                // Zero-length descriptors contribute no iovec.
                continue;
            }
            for slice_result in mem.get_slices(seg.addr, len) {
                let Ok(slice) = slice_result else {
                    // The descriptor range is not backed by guest
                    // memory.
                    return io_error();
                };
                let guard = slice.ptr_guard();
                // libc types `iov_base` as `*mut c_void` for both IO
                // directions. pwritev never writes through it, so
                // casting the read-only pointer to `*mut` only
                // satisfies the libc signature; it is not a promise
                // of mutation.
                let iov_base = guard.as_ptr() as *mut libc::c_void;
                iovecs.push(libc::iovec {
                    iov_base,
                    iov_len: slice.len(),
                });
                _guards.push(guard);
            }
        }

        // Debug-only tripwire on the structural iovec-count bound
        // (SEG_MAX * 2 = 256 <= IOV_MAX; see the pwritev SAFETY
        // comment below). If a future change breaks the bound, the
        // proptest fuzzers trip here rather than on a runtime
        // `pwritev` EINVAL.
        debug_assert!(
            iovecs.len() <= IOV_MAX,
            "blk pwritev iovcnt {} exceeds IOV_MAX {} — structural bound broke (see SAFETY comment)",
            iovecs.len(),
            IOV_MAX,
        );

        if iovecs.is_empty() {
            // data_len == 0: every data descriptor had len == 0. The
            // upstream zero-data gate in drain.rs keys on
            // `data_segments.is_empty()`, NOT on `data_len == 0`, so
            // a chain made only of zero-length data descriptors gets
            // through it and lands here. Linux `pwritev` with
            // iovcnt=0 returns 0 (same reasoning as `preadv`; see the
            // read path doc), so issuing the syscall would be
            // harmless — skipping it merely saves a kernel round-trip
            // when there is nothing to write.
            counters.record_write(0);
            return (VIRTIO_BLK_S_OK as u8, 1);
        }

        // Retry-to-completion. A SHORT positive write (`0 < n <
        // remaining bytes`) is recoverable forward progress —
        // `pwritev` may transfer fewer bytes than asked on ENOSPC
        // mid-write or on a signal-interrupted partial — so the
        // iovecs are advanced past `n` and the rest is re-issued
        // until the whole `data_len` has landed. Only `Ok(0)` (zero
        // forward progress, e.g. immediate ENOSPC) or a hard `Err` is
        // a genuine failure → S_IOERR. The earlier single-pwritev
        // version answered S_IOERR for ANY short write, failing the
        // guest's bio for a write the device could have finished — a
        // guest-data-integrity bug. Note the asymmetry with the read
        // path, where a short positive read means EOF.
        //
        // Reference behavior (matches and intentional divergences):
        //   - kernel `generic_perform_write` (mm/filemap.c): MATCH —
        //     its page-copy loop advances the iter and copies again
        //     until the byte count drains; a short write is progress
        //     reported as a positive partial.
        //   - firecracker `write_all_volatile` (vm-memory): MATCH —
        //     it loops with `offset(n)` past each short write until
        //     the buffer is empty; `Ok(0)` → WriteZero.
        //   - qemu file-posix: also retries to completion, but by a
        //     DIFFERENT mechanism — after a short return it drops the
        //     vectored syscall and linearizes into a bounce buffer
        //     (`handle_aiocb_rw_linear` loops; EINVAL if the write is
        //     still short). Here the iovecs are advanced and
        //     `pwritev` is re-issued directly: same goal, no
        //     bounce-buffer copy.
        //   - cloud-hypervisor and libkrun: a silent-truncation bug
        //     that is NOT mirrored — both accept a short positive
        //     write as success (cloud-hypervisor's block completion
        //     handler maps any `result >= 0` to S_OK with the partial
        //     count as `used.len`; libkrun's `consume` runs the write
        //     callback once with no retry), truncating the guest's
        //     write.
        let mut pending: &mut [libc::iovec] = &mut iovecs;
        let mut file_offset = base_offset;
        let mut total_written: u64 = 0;
        let mut eintr_seen: u32 = 0;
        loop {
            // SAFETY: every iovec base points at live guest memory
            // pinned by `_guards` for the duration of this call
            // (`_guards` outlives the loop); `pending.len()` only
            // ever shrinks from the structurally bounded initial
            // `iovecs.len()` (<= IOV_MAX), so iovcnt stays in range.
            let attempt = unsafe { backing.pwritev(pending, file_offset) };
            let n = match attempt {
                Ok(0) => {
                    tracing::warn!(
                        sector,
                        total_written,
                        data_len,
                        "virtio-blk pwritev made zero forward progress; failing"
                    );
                    return io_error();
                }
                Ok(n) => n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
                    // EINTR on this path signals a pending FATAL
                    // signal (the write path returns -EINTR under
                    // fatal_signal_pending, and a fatal signal does
                    // not clear). Re-issuing is therefore capped so a
                    // dying IO thread does not spin and hold up the
                    // freeze dump — see MAX_EINTR_RETRIES.
                    eintr_seen += 1;
                    if eintr_seen > MAX_EINTR_RETRIES {
                        tracing::warn!(
                            sector,
                            "virtio-blk pwritev: EINTR retry cap hit (pending fatal signal?); failing"
                        );
                        return io_error();
                    }
                    continue;
                }
                Err(e) => {
                    tracing::warn!(sector, %e, "virtio-blk pwritev error");
                    return io_error();
                }
            };

            total_written += n as u64;
            file_offset += n as u64;
            if total_written == data_len {
                break;
            }

            // total_written < data_len ⇒ `n` was smaller than the
            // bytes still described by `pending`, so
            // `advance_iovecs` is expected to hand back a non-empty
            // slice.
            pending = advance_iovecs(pending, n);
            debug_assert!(
                !pending.is_empty(),
                "virtio-blk pwritev: iovecs exhausted at {total_written} of {data_len} bytes"
            );
        }

        counters.record_write(total_written);
        // used_len is 1: only the status byte is device-written;
        // write data is never copied back into guest memory.
        (VIRTIO_BLK_S_OK as u8, 1)
    }

    /// Decide whether a request can be completed before it reaches
    /// the throttle or the backing file.
    ///
    /// Returns `Some((status_byte, used_len))` when the device
    /// settles the request on the spot (RO-mode writes, RO-mode
    /// flushes, unknown request types). Returns `None` for the
    /// regular IN/OUT/FLUSH/GET_ID requests, which must go on to the
    /// backend handlers.
    ///
    /// Counter side effects, by outcome:
    ///
    /// - **`T_OUT` on a read-only disk → `S_IOERR`**: bumps
    ///   `io_errors`. Whether the guest negotiated `F_RO` or
    ///   (defensively) ignored it and wrote anyway, the device
    ///   refused real work; from the host-observability side that is
    ///   an IO error and it shows up in the failure-dump counters.
    /// - **`T_FLUSH` on a read-only disk → `S_OK`**: bumps
    ///   `flushes_completed`. The guest issued a genuine flush and
    ///   the device serviced it — trivially, since a read-only disk
    ///   has nothing dirty. The counter tracks delivery, not work,
    ///   which keeps it symmetric with the writable-disk path that
    ///   bumps `flushes_completed` once `fdatasync` returns Ok.
    /// - **`T_GET_ID` → `None` (for any `read_only`)**: GET_ID is a
    ///   metadata read that never touches the backing file, so a
    ///   read-only disk accepts it exactly like a writable one. Per
    ///   virtio-v1.2 §5.2.6.4 GET_ID is not gated on any feature bit
    ///   and is always accepted.
    /// - **Unknown type → `S_UNSUPP`**: NO counter is touched. UNSUPP
    ///   is a graceful decline ("the device doesn't speak this
    ///   request"), not a service failure — nothing was attempted
    ///   that could have failed.
    ///
    /// The counter writes live here because the dispatch decision IS
    /// the point where that bookkeeping happens; moving them to the
    /// caller would force it to re-derive the request type.
    ///
    /// This is an associated function without `&self` so that it can
    /// be unit-tested without building a fully wired `VirtioBlk`, and
    /// so that `process_requests` can call it while holding
    /// `&mut self.queues[..]`.
    pub(crate) fn classify_pre_throttle(
        req_type: u32,
        read_only: bool,
        counters: &VirtioBlkCounters,
    ) -> Option<(u8, u32)> {
        match (req_type, read_only) {
            // Write against a read-only disk: refused, counted as an
            // IO error.
            (VIRTIO_BLK_T_OUT, true) => {
                counters.record_io_error();
                Some((VIRTIO_BLK_S_IOERR as u8, 1))
            }
            // Flush against a read-only disk: a no-op (nothing can be
            // dirty), but still recorded as a completed flush so it
            // is visible in the failure-dump counters — the guest
            // issued a real flush and the device serviced it.
            (VIRTIO_BLK_T_FLUSH, true) => {
                counters.record_flush();
                Some((VIRTIO_BLK_S_OK as u8, 1))
            }
            // Every remaining combination of a known request type is
            // handed on to the backend handlers.
            (
                VIRTIO_BLK_T_IN | VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH | VIRTIO_BLK_T_GET_ID,
                _,
            ) => None,
            // Unrecognised request type: declined without touching
            // any counter.
            _ => Some((VIRTIO_BLK_S_UNSUPP as u8, 1)),
        }
    }

    // ------------------------------------------------------------------
    // Request queue processing
    // ------------------------------------------------------------------

    /// Service a notification for the request queue.
    ///
    /// - `cfg(test)`: the queue is drained inline on the calling
    ///   thread, which keeps the test surface synchronous.
    /// - production: only the worker thread's eventfd is kicked. The
    ///   kick is non-blocking, so `mmio_write` of `QUEUE_NOTIFY`
    ///   returns immediately and the vCPU thread never waits on
    ///   backing-file IO.
    pub(crate) fn process_requests(&mut self) {
        #[cfg(test)]
        self.drain_inline();

        #[cfg(not(test))]
        {
            // Non-blocking kick: the worker's epoll_wait wakes up and
            // runs one drain iteration per kick. The write result is
            // deliberately discarded. EAGAIN (eventfd counter
            // saturated at u64::MAX-1) is implausible under any
            // realistic workload — the worker would need to be ~2^64
            // kicks behind — and dropping the kick in that case is
            // harmless: counter mode coalesces all pending kicks into
            // the single read the worker performs on its next wakeup,
            // so no QUEUE_NOTIFY is permanently lost.
            let WorkerEngine::Spawned(spawned) = &self.worker.engine;
            let _ = spawned.kick_fd.write(1);
        }
    }

    /// Test-mode-only synchronous drain.
    ///
    /// Obtains `&mem` from the shared `Arc<OnceLock<…>>` through a
    /// lock-free `OnceLock::get`, resolves the Inline engine, and
    /// calls `drain_bracket_impl` directly with borrows of the worker
    /// state, the queues, the irq eventfd, the interrupt status, and
    /// the device status. Nothing is cloned: `drain_bracket_impl`
    /// takes `&GuestMemoryMmap`, and that borrow ends inside this fn.
    ///
    /// If guest memory has not been wired yet, the notify is dropped
    /// and a warning is logged at most once per device.
    #[cfg(test)]
    pub(crate) fn drain_inline(&mut self) {
        let mem = match self.mem.get() {
            Some(mem) => mem,
            None => {
                // The caller (kvm wiring in src/vmm/mod.rs) is
                // expected to call `set_mem` before any vCPU runs, so
                // a queue-notify arriving earlier is a wiring bug.
                // `swap` returns the previous flag value: only the
                // first miss logs, so a guest that spams notifies on
                // the broken setup cannot flood the log.
                let first_miss = !self.mem_unset_warned.swap(true, Ordering::Relaxed);
                if first_miss {
                    tracing::warn!(
                        "virtio-blk: queue notify before set_mem; \
                         dropping requests until guest memory is wired"
                    );
                }
                return;
            }
        };

        let WorkerEngine::Inline(inline) = &mut self.worker.engine;

        // The DrainOutcome is intentionally discarded on the
        // cfg(test) inline path: tests move the throttle bucket
        // forward with `set_last_refill_for_test` and re-issue
        // QUEUE_NOTIFY to exercise the post-stall retry path. No
        // timer is armed because there is no worker thread to wake.
        let _ = super::drain_bracket_impl(
            &mut inline.state,
            &mut self.worker.queues,
            mem,
            &self.irq_evt,
            &self.interrupt_status,
            &self.device_status,
        );
    }
}