ktstr 0.15.0

Test harness for Linux process schedulers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
//! Cold-path kernel-memory op dispatcher.
//!
//! Invoked while the freeze rendezvous is held — every vCPU parked,
//! the virtio-blk worker paused, no guest writer can race the host-
//! side reads or writes. Walks the [`KernelOpRequestPayload`]
//! batch entry-by-entry, invokes the matching
//! [`crate::monitor::guest::GuestKernel`] read/write helper per
//! `(direction, target, value)` combination, and assembles a
//! [`KernelOpReplyPayload`] reply.
//!
//! # Semantics
//!
//! * **Batch-fatal first failure.** The first entry whose dispatch
//!   returns an error short-circuits the batch and produces a
//!   `success = false` reply naming the failing entry's index. Entries
//!   AFTER the failure are NOT attempted (skipping them keeps the
//!   reply boundary deterministic — the caller knows everything past
//!   the failing index is in untouched state).
//!
//! * **Writes that landed before the failure are NOT rolled back.**
//!   Earlier-index entries that wrote successfully ARE applied to
//!   guest memory. Cold-path callers that need transactional
//!   semantics across a multi-entry batch must either keep batches
//!   to one entry or accept partial-prefix application — there is no
//!   undo log. The reply's failing-index field is the boundary.
//!
//! * **Read replies are INDEX-ALIGNED with the request entries.**
//!   `reply.read_values[i]` is the result of dispatching
//!   `req.entries[i]`. For writes `reply.read_values` is empty.
//!
//! * **`OrU32` is write-only** under the current dispatcher. A read
//!   direction carrying an `OrU32` value is a wire-format misuse and
//!   fails the batch with a typed error (the variant has no read
//!   semantics — it carries a mask, not a width hint).
//!
//! * **`KernelOpTarget::PerCpuField` resolution** uses a hardcoded
//!   `{symbol → struct_name}` mapping (see
//!   [`struct_name_for_per_cpu_symbol`]) to bridge the wire variant
//!   to BTF: `runqueues` → `rq`, `kernel_cpustat` → `kernel_cpustat`,
//!   etc. Extending the supported symbol set requires an entry there
//!   AND symbol resolution in
//!   [`crate::monitor::symbols::KernelSymbols::from_elf`]. Unknown
//!   symbols fail with a typed error rather than silently producing
//!   nonsense.
//!
//! # Atomicity under freeze rendezvous
//!
//! Every dispatch call is sandwiched between the
//! `freeze_coord_freeze.store(true, Ordering::Release)` flip + the
//! SIGRTMIN / immediate_exit park-ack rendezvous (which establishes
//! a happens-before from every parked vCPU's last guest-side memory
//! op to this dispatch) AND the matching post-dispatch
//! `freeze_coord_freeze.store(false, Ordering::Release)` flip + the
//! post-thaw barrier (which establishes happens-before to the first
//! resumed guest-side memory op). The `Release`/`Acquire` pairs
//! make every host write observable to every subsequent guest read
//! and vice versa without per-write fences.
//!
//! The `OrU32` RMW therefore runs as `read_u32 → OR → write_u32`
//! with NO `compare_exchange` loop — the parked-vCPU contract rules
//! out concurrent guest writes between our load and our store.
//! Hot-path RMW (when implemented as a sibling op type) cannot reuse
//! this pattern; it must use `core::sync::atomic::AtomicU32::from_ptr`
//! and a `compare_exchange` loop against the live guest writer.
//!
//! # Same-rendezvous-epoch invariant
//!
//! For `OrU32` to be race-free, the read and the write MUST occur
//! inside the SAME freeze rendezvous epoch — i.e. within a single
//! invocation of [`dispatch_one_write`], between the `Release` store
//! on `freeze_coord_freeze` (rendezvous entry) and its matching clear
//! (rendezvous exit). Splitting the read + OR + write across freeze
//! boundaries would let the next guest writer interleave between our
//! load and our store, producing torn state silent to the dispatcher
//! and detectable only by KASAN or scheduler-state inconsistency
//! dumps. The structural guarantee is the dispatcher's per-entry
//! sequential walk: `dispatch_one_write` runs the read + OR + write
//! triple in one function body, never yielding between them. A
//! future refactor that extracts the RMW into a helper invoked
//! across multiple rendezvous would silently break this invariant —
//! the `// rmw-invariant-anchor` markers at the OrU32 arms and the
//! `tests::or_u32_rmw_anchors_inside_dispatch_one_write` doc-grep
//! regression test together enforce the pattern at the source level.

use crate::monitor::btf_offsets::{find_struct, nested_member_byte_offset};
use crate::monitor::guest::GuestKernel;
use crate::monitor::idr::translate_any_kva;
use crate::vmm::wire::{
    KERNEL_OP_REASON_MAX, KernelOpDirection, KernelOpEntry, KernelOpReplyPayload,
    KernelOpRequestPayload, KernelOpTarget, KernelOpValue,
};
use btf_rs::Btf;

/// Maximum nodes the [`find_task_by_pid`] walker visits before
/// surfacing a typed error. Matches the cap in
/// [`crate::monitor::scx_walker::walk_scx_tasks_global`]'s
/// `MAX_NODES_PER_LIST` analogue — a corrupt `init_task.tasks` chain
/// (cycle or wild pointer) must not turn the cold-path dispatcher
/// into an unbounded read loop. 65536 covers realistic workloads
/// (pid_max defaults of 32768-4M but typical test VMs run << 4K
/// tasks) while rejecting pathological chains in a bounded time.
const MAX_TASK_WALKER_NODES: u32 = 65_536;

/// `TASK_DEAD` flag bit on `task_struct.__state` per
/// `include/linux/sched.h:118` (`#define TASK_DEAD 0x00000080`). A
/// task with this bit set is in the final teardown path — its
/// `task_struct` fields are mid-cleanup and writing through them
/// would corrupt the dying-task state machine. Validation rejects
/// before any field write.
const TASK_DEAD: u32 = 0x0000_0080;

/// Lower bound for any KVA accepted as a [`KernelOpTarget::Kva`]
/// target (page-walked, `read_kva_*`/`write_kva_*`).
///
/// `0xFF00_0000_0000_0000` is the conservative 5-level x86_64
/// kernel-half boundary (top 8 bits set; sign-extension from bit 56
/// per `__VIRTUAL_MASK_SHIFT` in `arch/x86/include/asm/page_64_types.h`
/// when `CONFIG_X86_5LEVEL=y`). It accepts every legitimate 4-level
/// kernel-half KVA (≥ `0xFFFF_8000_0000_0000`) AND every 5-level
/// kernel-half KVA (≥ `0xFF00_0000_0000_0000`). The Kva path can
/// safely use a loose threshold here because the downstream
/// `read_kva_*`/`write_kva_*` page-walk returns `Option::None` on
/// unmapped or non-canonical addresses (page-walk safety net).
///
/// INTENTIONALLY DIFFERS from
/// `crate::vmm::x86_64::msr_kaslr::KERNEL_HALF_CANONICAL_4LEVEL`
/// (value `0xFFFF_8000_0000_0000`, shared by `freeze_coord::dispatch`
/// via the same import). That constant checks the 4-level x86_64
/// canonical-bits invariant on the LSTAR MSR + kernel-text-link KVA
/// — a strict per-hardware invariant on known-shape inputs. This
/// dispatcher accepts arbitrary caller-supplied KVAs and must use the
/// looser 5-level superset so 5-level kernel
/// direct-map/vmalloc/vmemmap addresses are not false-rejected. The
/// paired naming (`_CANONICAL_4LEVEL` vs `_CONSERVATIVE_5LEVEL`)
/// telegraphs which is which.
///
/// [`KernelOpTarget::Direct`] does NOT use this threshold — it uses
/// runtime `page_offset + dram_size` range validation via
/// [`validate_direct_target`], because `kva_to_pa` (the Direct
/// path's PA derivation) does `kva.wrapping_sub(page_offset)` with
/// no safety net — a wrap to an in-bounds-but-wrong PA would silently
/// no-op at `write_scalar`/`read_scalar`.
// Top 8 bits set, i.e. 0xFF00_0000_0000_0000.
const KERNEL_HALF_CONSERVATIVE_5LEVEL: u64 = 0xFF << 56;

/// Validate that a [`KernelOpTarget::Direct`] target's KVA range is
/// inside the direct-map region `[page_offset, page_offset + dram_size)`.
///
/// Direct targets compute their PA via
/// `kva_to_pa = kva.wrapping_sub(page_offset)` (no page-walk, no
/// Option-failure). A KVA below `page_offset` underflows and wraps
/// to a huge PA that the downstream `write_scalar`/`read_scalar`
/// silently no-ops on (per `src/monitor/reader.rs:639-687`). A KVA
/// past `page_offset + dram_size` similarly wraps the bounds check.
/// Either case is a silent-data-loss path that the
/// [`KERNEL_HALF_CONSERVATIVE_5LEVEL`] threshold alone cannot catch.
///
/// Caller derives `len` from the value width (U32=4, U64=8,
/// Bytes=`bytes.len()`, OrU32=4). `page_offset` from
/// [`GuestKernel::page_offset`]; `dram_size` from
/// [`GuestKernel::mem`]`.size()`.
fn validate_direct_target(
    kva: u64,
    len: u64,
    page_offset: u64,
    dram_size: u64,
) -> Result<(), String> {
    // Lower bound first: anything under page_offset is not a direct-map KVA.
    if kva < page_offset {
        return Err(format!(
            "Direct kva={kva:#x} below page_offset={page_offset:#x} \
             (kva_to_pa would wrap; use Kva target for vmalloc/vmemmap)"
        ));
    }

    // Exclusive end of the direct map; both sums are overflow-checked so a
    // wrapped value can never pass the comparison below.
    let direct_map_end = match page_offset.checked_add(dram_size) {
        Some(end) => end,
        None => {
            return Err(format!(
                "internal: page_offset+dram_size overflow ({page_offset:#x} + {dram_size:#x})"
            ));
        }
    };
    let kva_end = match kva.checked_add(len) {
        Some(end) => end,
        None => return Err(format!("Direct kva+len overflow ({kva:#x} + {len:#x})")),
    };

    // The access may end exactly at the direct-map end, but not beyond it.
    if kva_end <= direct_map_end {
        Ok(())
    } else {
        Err(format!(
            "Direct kva={kva:#x} len={len} overruns direct-map end {direct_map_end:#x}"
        ))
    }
}

/// Validate that a [`KernelOpTarget::Kva`] target's KVA range is in
/// the kernel-half address space.
///
/// The page-walk safety net (`read_kva_*`/`write_kva_*` return
/// `Option::None` on unmapped or non-canonical addresses) catches
/// most invalid KVAs downstream — this helper just rejects the
/// obvious user-half case early so the operator-visible error names
/// the right band ("below kernel-half threshold") rather than
/// "page unmapped".
fn validate_kva_target(kva: u64, len: u64) -> Result<(), String> {
    // User-half (or otherwise too-low) addresses get the dedicated message.
    if kva < KERNEL_HALF_CONSERVATIVE_5LEVEL {
        return Err(user_half_kva_rejection_reason(kva));
    }
    // Only the overflow check matters here; the end address itself is unused.
    match kva.checked_add(len) {
        Some(_) => Ok(()),
        None => Err(format!("Kva kva+len overflow ({kva:#x} + {len:#x})")),
    }
}

/// Build the typed-error reason for [`validate_kva_target`]'s
/// user-half rejection. Extracted as a standalone `pub(super) fn`
/// for the same reason as [`oru32_read_rejection_reason`]: the
/// tests that pin the format invoke the SAME helper the dispatcher
/// uses, avoiding the tautology where the test re-synthesises the
/// expected string.
pub(super) fn user_half_kva_rejection_reason(kva: u64) -> String {
    // Name the threshold locally so the message template stays readable.
    let threshold = KERNEL_HALF_CONSERVATIVE_5LEVEL;
    format!(
        "Kva={kva:#x} below kernel-half 5-level conservative threshold {threshold:#x}; \
         use Symbol target or a KVA in the kernel address space"
    )
}

/// Walk the request's batch and produce a reply.
///
/// `kernel` is a [`GuestKernel`] borrowed from the
/// `owned_accessor.guest_kernel()` site in the freeze coordinator;
/// the borrow is valid for the duration of one freeze rendezvous
/// because the owning `GuestMemMapAccessorOwned` outlives the
/// rendezvous (it lives in the coordinator's `OnceLock`).
pub(super) fn dispatch_kernel_op_batch(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    req: &KernelOpRequestPayload,
) -> KernelOpReplyPayload {
    let request_id = req.request_id;
    match req.direction {
        KernelOpDirection::Write => {
            dispatch_write_batch(kernel, btf, kaslr_offset, request_id, &req.entries)
        }
        KernelOpDirection::Read => {
            dispatch_read_batch(kernel, btf, kaslr_offset, request_id, &req.entries)
        }
    }
}

/// Apply every write entry in order, stopping at the first failure.
///
/// On failure the reply comes from `error_reply` with the reason
/// prefixed by `entry[<index>]: `; later entries are not attempted.
/// On success the reply has an empty reason and no read values.
fn dispatch_write_batch(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    request_id: u32,
    entries: &[KernelOpEntry],
) -> KernelOpReplyPayload {
    // `find_map` short-circuits, so entries after the first failing one are
    // never dispatched.
    let first_failure = entries.iter().enumerate().find_map(|(idx, entry)| {
        dispatch_one_write(kernel, btf, kaslr_offset, &entry.target, &entry.value)
            .err()
            .map(|reason| format!("entry[{idx}]: {reason}"))
    });

    match first_failure {
        Some(reason) => error_reply(request_id, reason),
        None => KernelOpReplyPayload {
            request_id,
            success: true,
            reason: String::new(),
            read_values: Vec::new(),
        },
    }
}

/// Serve every read entry in order, collecting one value per entry.
///
/// The first failing entry aborts the walk and yields an `error_reply`
/// whose reason is prefixed by `entry[<index>]: `; values gathered so far
/// are discarded. On success `read_values[i]` is the result for
/// `entries[i]`.
fn dispatch_read_batch(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    request_id: u32,
    entries: &[KernelOpEntry],
) -> KernelOpReplyPayload {
    let mut read_values: Vec<KernelOpValue> = Vec::with_capacity(entries.len());

    // `try_for_each` stops at the first `Err`, leaving later entries untouched.
    let walk = entries.iter().enumerate().try_for_each(|(idx, entry)| {
        dispatch_one_read(kernel, btf, kaslr_offset, &entry.target, &entry.value)
            .map(|value| read_values.push(value))
            .map_err(|reason| format!("entry[{idx}]: {reason}"))
    });

    match walk {
        Ok(()) => KernelOpReplyPayload {
            request_id,
            success: true,
            reason: String::new(),
            read_values,
        },
        Err(reason) => error_reply(request_id, reason),
    }
}

/// Apply a single write entry by routing on the `(target, value)` pair.
///
/// * `kernel` — guest accessor whose write (and, for the OR variants,
///   read) helpers are invoked.
/// * `btf`, `kaslr_offset` — only forwarded to the per-CPU-field and
///   per-task-field helpers; the symbol, direct and KVA arms do not use
///   them.
/// * `target`, `value` — the entry being applied.
///
/// Returns `Ok(())` when the write was issued, or `Err(reason)` with a
/// human-readable description. The batch walker prefixes that reason with
/// the entry index.
fn dispatch_one_write(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    target: &KernelOpTarget,
    value: &KernelOpValue,
) -> Result<(), String> {
    // Direct-map bounds; consumed only by the Direct arms' range validation.
    let page_offset = kernel.page_offset();
    let dram_size = kernel.mem().size();
    match (target, value) {
        // Symbol writes — kernel-half guaranteed by vmlinux linker
        // convention (KernelSymbols::from_elf reads only the vmlinux
        // .symtab; built-in sections + module_alloc both land in
        // kernel-half by construction). No KVA validation needed.
        (KernelOpTarget::Symbol(name), KernelOpValue::U32(v)) => kernel
            .write_symbol_u32(name, *v)
            .map_err(|e| format!("write_symbol_u32('{name}'): {e:#}")),
        (KernelOpTarget::Symbol(name), KernelOpValue::U64(v)) => kernel
            .write_symbol_u64(name, *v)
            .map_err(|e| format!("write_symbol_u64('{name}'): {e:#}")),
        (KernelOpTarget::Symbol(name), KernelOpValue::Bytes(b)) => kernel
            .write_symbol_bytes(name, b)
            .map_err(|e| format!("write_symbol_bytes('{name}'): {e:#}")),
        (KernelOpTarget::Symbol(name), KernelOpValue::OrU32(mask)) => {
            // rmw-invariant-anchor: OrU32 RMW must run inside a
            // single dispatch_one_write invocation; the caller
            // (freeze_and_dispatch closure in mod.rs) holds the
            // freeze rendezvous open for the duration. Extracting
            // this triple into a helper invokable outside
            // dispatch_one_write would lose the rendezvous-epoch
            // coupling — the same-epoch invariant rests on the
            // dispatcher's per-entry sequential walk, not on a
            // local property of dispatch_one_write itself. See
            // KernelValue::OrU32 doc + module doc above for the
            // kernel-writer race model.
            let cur = kernel
                .read_symbol_u32(name)
                .map_err(|e| format!("read_symbol_u32('{name}') for OrU32: {e:#}"))?;
            kernel
                .write_symbol_u32(name, cur | mask)
                .map_err(|e| format!("write_symbol_u32('{name}') for OrU32: {e:#}"))
        }

        // Direct-mapped writes — validate against runtime
        // [page_offset, page_offset+dram_size) BEFORE invoking the
        // underlying write (which uses kva.wrapping_sub(page_offset)
        // with NO page-walk safety net; an out-of-range KVA wraps to
        // a huge PA that write_scalar silently no-ops on per
        // reader.rs:639-687).
        (KernelOpTarget::Direct(kva), KernelOpValue::U32(v)) => {
            validate_direct_target(*kva, 4, page_offset, dram_size)?;
            kernel.write_direct_u32(*kva, *v);
            Ok(())
        }
        (KernelOpTarget::Direct(kva), KernelOpValue::U64(v)) => {
            validate_direct_target(*kva, 8, page_offset, dram_size)?;
            kernel.write_direct_u64(*kva, *v);
            Ok(())
        }
        (KernelOpTarget::Direct(kva), KernelOpValue::Bytes(b)) => {
            validate_direct_target(*kva, b.len() as u64, page_offset, dram_size)?;
            kernel.write_direct_bytes(*kva, b);
            Ok(())
        }
        (KernelOpTarget::Direct(kva), KernelOpValue::OrU32(mask)) => {
            // rmw-invariant-anchor: see OrU32 module doc.
            validate_direct_target(*kva, 4, page_offset, dram_size)?;
            let cur = kernel.read_direct_u32(*kva);
            kernel.write_direct_u32(*kva, cur | mask);
            Ok(())
        }

        // Vmalloc/vmap writes (page-table walked; Option on unmapped)
        // — validate against KERNEL_HALF_CONSERVATIVE_5LEVEL (loose 5-level
        // conservative bound; page-walk catches non-canonical-hole
        // + unmapped via Option::None safety net).
        (KernelOpTarget::Kva(kva), KernelOpValue::U32(v)) => {
            validate_kva_target(*kva, 4)?;
            kernel
                .write_kva_u32(*kva, *v)
                .ok_or_else(|| format!("write_kva_u32({kva:#x}): page unmapped"))
        }
        (KernelOpTarget::Kva(kva), KernelOpValue::U64(v)) => {
            validate_kva_target(*kva, 8)?;
            kernel
                .write_kva_u64(*kva, *v)
                .ok_or_else(|| format!("write_kva_u64({kva:#x}): page unmapped"))
        }
        (KernelOpTarget::Kva(kva), KernelOpValue::Bytes(b)) => {
            validate_kva_target(*kva, b.len() as u64)?;
            kernel
                .write_kva_bytes_chunked(*kva, b)
                .ok_or_else(|| format!("write_kva_bytes_chunked({kva:#x}): page unmapped or short"))
        }
        (KernelOpTarget::Kva(kva), KernelOpValue::OrU32(mask)) => {
            // rmw-invariant-anchor: see OrU32 module doc.
            validate_kva_target(*kva, 4)?;
            let cur = kernel
                .read_kva_u32(*kva)
                .ok_or_else(|| format!("read_kva_u32({kva:#x}) for OrU32: page unmapped"))?;
            kernel
                .write_kva_u32(*kva, cur | mask)
                .ok_or_else(|| format!("write_kva_u32({kva:#x}) for OrU32: page unmapped"))
        }

        // Per-CPU field — resolve symbol KVA + __per_cpu_offset[cpu]
        // arithmetic + BTF nested-path field offset, then write at the
        // per-CPU instance PA. See [`dispatch_per_cpu_field_write`].
        // Cold-path freeze rendezvous gives the atomicity contract
        // shared by every dispatcher arm.
        // This arm accepts every value variant; any per-variant handling
        // is left to the helper.
        (KernelOpTarget::PerCpuField { symbol, field, cpu }, value) => {
            dispatch_per_cpu_field_write(kernel, btf, kaslr_offset, symbol, field, *cpu, value)
        }

        // Per-task field — SCX-managed tasks only. Walks
        // `init_task.tasks` (leaders) plus each leader's
        // `signal->thread_head` (threads) to find the task with
        // matching pid AND matching start_time identity
        // (anti-PID-reuse). Runs the 8-layer validation chain
        // (pid, start_time, lifetime, on_rq, scx queued-empty,
        // ext_sched_class, start_boottime), then
        // resolves the dot-separated nested field path via BTF and
        // writes at task_pa + field_offset. Cold-path freeze
        // rendezvous gives us the atomicity contract — every vCPU
        // parked at SIGRTMIN delivery, no concurrent task migration
        // / state transition can race the validate→write sequence.
        // See [`dispatch_task_field_write`] for the full chain.
        // NOTE(review): the parenthesised list above names seven checks
        // while the text says eight layers — confirm the count against
        // dispatch_task_field_write.
        (
            KernelOpTarget::TaskField {
                pid,
                expected_start_time_ns,
                field,
            },
            value,
        ) => dispatch_task_field_write(
            kernel,
            btf,
            kaslr_offset,
            *pid,
            *expected_start_time_ns,
            field,
            value,
        ),
    }
}

/// Serve a single read entry by routing first on the target, then on the
/// width hint.
///
/// * `kernel` — guest accessor whose read helpers are invoked.
/// * `btf`, `kaslr_offset` — only forwarded to the per-CPU-field and
///   per-task-field helpers.
/// * `target` — where to read from.
/// * `width_hint` — selects the read width; for `Bytes` only the
///   placeholder's length is used. An `OrU32` hint is rejected for symbol,
///   direct and KVA targets; the per-CPU and per-task helpers receive the
///   hint unchanged.
///
/// Returns the value read, or `Err(reason)` with a human-readable
/// description that the batch walker prefixes with the entry index.
fn dispatch_one_read(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    target: &KernelOpTarget,
    width_hint: &KernelOpValue,
) -> Result<KernelOpValue, String> {
    // Direct-map bounds; consumed only by the Direct arm's range validation.
    let page_offset = kernel.page_offset();
    let dram_size = kernel.mem().size();

    match target {
        // Symbol reads go straight to the symbol helpers with no address
        // validation at this level.
        KernelOpTarget::Symbol(name) => match width_hint {
            KernelOpValue::U32(_) => kernel
                .read_symbol_u32(name)
                .map(KernelOpValue::U32)
                .map_err(|e| format!("read_symbol_u32('{name}'): {e:#}")),
            KernelOpValue::U64(_) => kernel
                .read_symbol_u64(name)
                .map(KernelOpValue::U64)
                .map_err(|e| format!("read_symbol_u64('{name}'): {e:#}")),
            KernelOpValue::Bytes(placeholder) => {
                let wanted = placeholder.len();
                kernel
                    .read_symbol_bytes(name, wanted)
                    .map(KernelOpValue::Bytes)
                    .map_err(|e| format!("read_symbol_bytes('{name}', {wanted}): {e:#}"))
            }
            // A mask is not a width: wire-format misuse on the read side.
            KernelOpValue::OrU32(mask) => Err(oru32_read_rejection_reason(*mask)),
        },

        // Direct-mapped reads: the range is checked against
        // [page_offset, page_offset + dram_size) before the infallible
        // read_direct_* helper is called.
        KernelOpTarget::Direct(kva) => {
            let addr = *kva;
            match width_hint {
                KernelOpValue::U32(_) => {
                    validate_direct_target(addr, 4, page_offset, dram_size)?;
                    Ok(KernelOpValue::U32(kernel.read_direct_u32(addr)))
                }
                KernelOpValue::U64(_) => {
                    validate_direct_target(addr, 8, page_offset, dram_size)?;
                    Ok(KernelOpValue::U64(kernel.read_direct_u64(addr)))
                }
                KernelOpValue::Bytes(placeholder) => {
                    let wanted = placeholder.len();
                    validate_direct_target(addr, wanted as u64, page_offset, dram_size)?;
                    Ok(KernelOpValue::Bytes(kernel.read_direct_bytes(addr, wanted)))
                }
                KernelOpValue::OrU32(mask) => Err(oru32_read_rejection_reason(*mask)),
            }
        }

        // Page-walked reads: a loose kernel-half check up front, then the
        // read_kva_* helpers report unmapped pages as `None`.
        KernelOpTarget::Kva(kva) => {
            let addr = *kva;
            match width_hint {
                KernelOpValue::U32(_) => {
                    validate_kva_target(addr, 4)?;
                    kernel
                        .read_kva_u32(addr)
                        .map(KernelOpValue::U32)
                        .ok_or_else(|| format!("read_kva_u32({addr:#x}): page unmapped"))
                }
                KernelOpValue::U64(_) => {
                    validate_kva_target(addr, 8)?;
                    kernel
                        .read_kva_u64(addr)
                        .map(KernelOpValue::U64)
                        .ok_or_else(|| format!("read_kva_u64({addr:#x}): page unmapped"))
                }
                KernelOpValue::Bytes(placeholder) => {
                    let wanted = placeholder.len();
                    validate_kva_target(addr, wanted as u64)?;
                    kernel
                        .read_kva_bytes_chunked(addr, wanted)
                        .map(KernelOpValue::Bytes)
                        .ok_or_else(|| {
                            format!(
                                "read_kva_bytes_chunked({addr:#x}, {wanted}): page unmapped or short"
                            )
                        })
                }
                KernelOpValue::OrU32(mask) => Err(oru32_read_rejection_reason(*mask)),
            }
        }

        // Per-CPU field: resolution and the width handling live in the
        // helper, which receives every hint variant as-is.
        KernelOpTarget::PerCpuField { symbol, field, cpu } => {
            dispatch_per_cpu_field_read(kernel, btf, kaslr_offset, symbol, field, *cpu, width_hint)
        }

        // Per-task field: task lookup, validation and the width handling
        // live in the helper, which receives every hint variant as-is.
        KernelOpTarget::TaskField {
            pid,
            expected_start_time_ns,
            field,
        } => dispatch_task_field_read(
            kernel,
            btf,
            kaslr_offset,
            *pid,
            *expected_start_time_ns,
            field,
            width_hint,
        ),
    }
}

/// Map a per-CPU symbol name to the name of the struct that symbol
/// is an instance of.
///
/// The `KernelOpTarget::PerCpuField` wire variant names the symbol
/// but not its type, so [`nested_member_byte_offset`] needs this
/// table to know which BTF struct a field path is resolved against.
///
/// The v1 table mirrors the per-CPU symbols ktstr resolves in
/// [`crate::monitor::symbols::KernelSymbols`]:
/// - `runqueues` → `rq`
/// - `kernel_cpustat` → `kernel_cpustat`
/// - `kstat` → `kernel_stat`
/// - `tick_cpu_sched` → `tick_sched`
///
/// Supporting another per-CPU symbol takes two changes: a row in
/// this table AND the symbol resolution in
/// `KernelSymbols::from_elf`.
///
/// Returns `Err` with a message listing the supported symbols when
/// `symbol` is not in the table.
fn struct_name_for_per_cpu_symbol(symbol: &str) -> Result<&'static str, String> {
    // (per-CPU symbol, struct name) pairs; matching is exact and
    // case-sensitive.
    const KNOWN: [(&str, &str); 4] = [
        ("runqueues", "rq"),
        ("kernel_cpustat", "kernel_cpustat"),
        ("kstat", "kernel_stat"),
        ("tick_cpu_sched", "tick_sched"),
    ];

    KNOWN
        .iter()
        .find(|entry| entry.0 == symbol)
        .map(|entry| entry.1)
        .ok_or_else(|| {
            format!(
                "PerCpuField: unknown per-CPU symbol '{symbol}' (v1 supports: \
                 runqueues, kernel_cpustat, kstat, tick_cpu_sched); extend \
                 struct_name_for_per_cpu_symbol + KernelSymbols::from_elf to add"
            )
        })
}

/// Compute the guest physical address of `field` inside CPU `cpu`'s
/// instance of the per-CPU variable `symbol`. Both the write and the
/// read dispatcher arms go through this resolver.
///
/// Resolution order (the first failing step produces the error):
/// 1. BTF must be loaded (`btf` is `Some`).
/// 2. [`struct_name_for_per_cpu_symbol`] names the struct `symbol`
///    is an instance of.
/// 3. [`crate::monitor::guest::GuestKernel::symbol_kva`] yields the
///    symbol's template KVA.
/// 4. `__per_cpu_offset[cpu]` is read from guest memory; a zero slot
///    for `cpu > 0` is rejected.
/// 5. [`crate::monitor::symbols::per_cpu_kva`] combines template
///    KVA, KASLR slide and per-CPU offset into the instance KVA,
///    which must not lie below the walk context's `page_offset`.
/// 6. [`nested_member_byte_offset`] resolves `field` (a possibly
///    nested path) to a byte offset within the struct.
/// 7. [`translate_any_kva`] maps the instance KVA to a PA; the
///    result is that PA plus the field offset.
///
/// **KASLR-on contract**: `kaslr_offset` is the runtime virt-KASLR
/// slide produced by the freeze coordinator's
/// `coord_kaslr_offset()` accessor (snapshot of the
/// `kern_virt_kaslr` Arc published by the MSR_LSTAR-derive at
/// `mod.rs:10843-10854` AND/OR the KERN_ADDRS `_text` path at
/// `dispatch.rs:388-396`). Both publishers converge on the same Arc
/// via CAS; the accessor's `saturating_sub(1)` bias yields 0 when
/// (a) the slide is not yet published (boot-race window) or (b) it
/// was published as 0 (nokaslr cmdline /
/// `#[ktstr_test(kaslr = false)]`). With 0, `per_cpu_kva` collapses
/// to the link-time identity — right for nokaslr, wrong for "not
/// yet published"; in the latter case the downstream
/// `translate_any_kva` bounds-rejects to `None` and the caller gets
/// the typed `"per_cpu_kva={kva:#x} unmapped"` error rather than
/// silently corrupted data.
fn resolve_per_cpu_field_pa(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    symbol: &str,
    field: &str,
    cpu: u32,
) -> Result<usize, String> {
    // Struct layout comes from BTF; nothing below works without it.
    let Some(btf) = btf else {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: BTF not loaded in this \
             coordinator — cannot resolve struct layout (vmlinux must carry \
             CONFIG_DEBUG_INFO_BTF=y output)"
        ));
    };

    let struct_name = struct_name_for_per_cpu_symbol(symbol)?;

    let Some(template_kva) = kernel.symbol_kva(symbol) else {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: '{symbol}' symbol absent \
             from vmlinux symtab"
        ));
    };

    let Some(offset_table_kva) = kernel.symbol_kva("__per_cpu_offset") else {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: '__per_cpu_offset' symbol \
             absent — kernel built without SMP"
        ));
    };
    // `__per_cpu_offset` is an array of u64 slots indexed by CPU
    // number, addressed here through the kernel-image translation.
    let offset_table_pa = kernel.text_kva_to_pa(offset_table_kva);
    let slot_byte_off = (cpu as usize) * 8;
    let per_cpu_offset = kernel.mem().read_u64(offset_table_pa, slot_byte_off);
    if cpu > 0 && per_cpu_offset == 0 {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: __per_cpu_offset[{cpu}]=0 \
             (cpu beyond nr_cpu_ids; kernel zero-init slot)"
        ));
    }

    // Instance KVA = template_kva + kaslr_offset + per_cpu_offset.
    // `kaslr_offset` is the caller-threaded `coord_kaslr_offset()`
    // snapshot; the function-level doc covers the publisher chain
    // and the nokaslr semantics.
    let per_cpu_kva =
        crate::monitor::symbols::per_cpu_kva(template_kva, kaslr_offset, per_cpu_offset);

    // Kernel-half floor check. A sum that wrapped past u64::MAX, or
    // a wildly wrong template KVA from a broken symtab, produces an
    // address below the kernel's direct mapping. Left unchecked,
    // such an address could still translate to a valid-but-wrong
    // guest page and yield silent garbage; rejecting it here turns
    // that into a loud typed error.
    //
    // The floor is the guest's own runtime `page_offset`, taken
    // from `kernel.walk_context()`: every per-CPU area (first chunk
    // in the direct mapping, later chunks in vmalloc) sits at or
    // above it. Using the runtime value lets the floor follow the
    // paging mode the guest booted with:
    //   - x86_64 4-level: 0xffff_8880_0000_0000
    //   - x86_64 5-level: 0xff11_0000_0000_0000
    //   - aarch64 VA_BITS=48: 0xffff_0000_0000_0000
    //   - aarch64 VA_BITS=52: 0xfff0_0000_0000_0000
    // A fixed 0xffff_0000_0000_0000 would wrongly reject valid
    // 5-level x86_64 direct-mapping addresses, which lie below the
    // 4-level threshold.
    let kernel_half_floor = kernel.walk_context().page_offset;
    if per_cpu_kva < kernel_half_floor {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: per_cpu_kva={per_cpu_kva:#x} \
             below kernel page_offset ({kernel_half_floor:#x}) — arithmetic wrap \
             or broken template KVA \
             (template={template_kva:#x} + kaslr={kaslr_offset:#x} + \
             per_cpu_off={per_cpu_offset:#x})"
        ));
    }

    // Field offset within the struct, resolved through BTF.
    let container_ty = match find_struct(btf, struct_name) {
        Ok((ty, _)) => ty,
        Err(e) => {
            return Err(format!(
                "PerCpuField {symbol}.{field}[cpu={cpu}]: 'struct {struct_name}' BTF \
                 lookup: {e:#}"
            ));
        }
    };
    let member_off = match nested_member_byte_offset(btf, &container_ty, field) {
        Ok(off) => off,
        Err(e) => {
            return Err(format!(
                "PerCpuField {symbol}.{field}[cpu={cpu}]: BTF nested-offset for \
                 '{field}' within '{struct_name}': {e:#}"
            ));
        }
    };

    // Page-table walk from the instance KVA to its physical address.
    let ctx = kernel.walk_context();
    let Some(instance_pa) = translate_any_kva(
        kernel.mem(),
        ctx.cr3_pa,
        ctx.page_offset,
        per_cpu_kva,
        ctx.l5,
        ctx.tcr_el1,
    ) else {
        return Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: per_cpu_kva={per_cpu_kva:#x} \
             unmapped (translate_any_kva returned None)"
        ));
    };

    let field_pa = instance_pa + member_off as u64;
    Ok(field_pa as usize)
}

/// Write `value` into a per-CPU struct field.
///
/// The target address comes from [`resolve_per_cpu_field_pa`]; any
/// resolution failure is returned unchanged before guest memory is
/// touched. Supported value kinds:
/// - `U32` / `U64`: plain store of that width.
/// - `OrU32`: read-modify-write (`*field |= mask`) on a 32-bit word,
///   under the same freeze-rendezvous-epoch contract as the other
///   dispatcher arms (see module doc + the `rmw-invariant-anchor`
///   comments).
/// - `Bytes`: rejected with an error — only scalar stores are
///   supported here.
fn dispatch_per_cpu_field_write(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    symbol: &str,
    field: &str,
    cpu: u32,
    value: &KernelOpValue,
) -> Result<(), String> {
    let field_pa =
        resolve_per_cpu_field_pa(kernel, btf, kaslr_offset, symbol, field, cpu)? as u64;

    match value {
        KernelOpValue::Bytes(_) => {
            return Err(format!(
                "PerCpuField {symbol}.{field}[cpu={cpu}]: Bytes write not supported \
                 (per-CPU scheduler fields are scalars)"
            ));
        }
        KernelOpValue::U64(v) => {
            kernel.mem().write_u64(field_pa, 0, *v);
        }
        KernelOpValue::U32(v) => {
            kernel.mem().write_u32(field_pa, 0, *v);
        }
        KernelOpValue::OrU32(mask) => {
            // rmw-invariant-anchor: see OrU32 module doc.
            let before = kernel.mem().read_u32(field_pa, 0);
            kernel.mem().write_u32(field_pa, 0, before | *mask);
        }
    }

    Ok(())
}

/// Read a per-CPU struct field.
///
/// The address is resolved by [`resolve_per_cpu_field_pa`], exactly
/// as on the write side. Only the *variant* of `width_hint` is
/// consulted — its payload is ignored:
/// - `U32(_)` → 32-bit read, returned as `KernelOpValue::U32`.
/// - `U64(_)` → 64-bit read, returned as `KernelOpValue::U64`.
/// - `Bytes(_)` / `OrU32(_)` → rejected with an error.
///
/// Resolution happens before the hint is examined, so a resolution
/// failure takes precedence over an unsupported-hint error.
fn dispatch_per_cpu_field_read(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    symbol: &str,
    field: &str,
    cpu: u32,
    width_hint: &KernelOpValue,
) -> Result<KernelOpValue, String> {
    let field_pa =
        resolve_per_cpu_field_pa(kernel, btf, kaslr_offset, symbol, field, cpu)? as u64;

    match width_hint {
        KernelOpValue::OrU32(_) => Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: OrU32 has no read semantic"
        )),
        KernelOpValue::Bytes(_) => Err(format!(
            "PerCpuField {symbol}.{field}[cpu={cpu}]: Bytes read not supported"
        )),
        KernelOpValue::U64(_) => {
            let raw = kernel.mem().read_u64(field_pa, 0);
            Ok(KernelOpValue::U64(raw))
        }
        KernelOpValue::U32(_) => {
            let raw = kernel.mem().read_u32(field_pa, 0);
            Ok(KernelOpValue::U32(raw))
        }
    }
}

/// Tolerance window, in nanoseconds, for the L2 start-time identity
/// check in [`validate_task_for_field_op`]: one `/proc` clock tick at
/// `CLK_TCK = 100`, i.e. the conservative maximum of
/// `1e9 / sysconf(_SC_CLK_TCK)` across typical configurations.
///
/// Why a window is needed: the test author derives
/// `expected_start_time_ns` from `/proc/<pid>/stat` field 22
/// (`man 5 proc` "starttime"), which the kernel reports in CLK_TCK
/// ticks — typically 10ms for `CLK_TCK=100` (USER_HZ on x86_64
/// default kernels). The kernel's `task->start_time` holds the exact
/// `ktime_get_ns()` value, so the userspace-derived number is always
/// ROUNDED DOWN to a tick boundary while the stored value keeps
/// sub-tick precision. With an exact-equality check every TaskField
/// op would fail L2 on first use.
///
/// Why 10ms is safe: it covers the full quantization gap for any
/// `CLK_TCK >= 100`. For a higher CLK_TCK (e.g. 1000 → 1ms tick) the
/// window is wider than strictly required, yet still narrow enough
/// to reject PID-recycled tasks — the kernel does not hand a freed
/// PID back out within 10ms of the original task's exit under normal
/// scheduling pressure (the allocator advances the PID counter
/// monotonically and wraps after `pid_max` ≈ 2^22 entries).
///
/// NOTE(review): current kernels appear to derive the `/proc` value
/// from `task->start_boottime` rather than `task->start_time`; the
/// two clocks differ by accumulated suspend time — confirm guests
/// under test never suspend, or that the window tolerates the gap.
// 1e9 ns per second divided by CLK_TCK = 100 ticks per second.
const START_TIME_PROC_TICK_NS: u64 = 1_000_000_000 / 100;

/// BTF-derived byte offsets needed by the 8-layer task validation in
/// [`validate_task_for_field_op`] plus the per-thread walker in
/// [`find_task_by_pid`]. Resolved once per `TaskField` dispatch via
/// [`Self::resolve_from_btf`] (which calls
/// [`nested_member_byte_offset`] on `struct task_struct` for each
/// member, and on `struct signal_struct` for the thread-head
/// linkage).
///
/// Every field is a byte offset from the start of `struct
/// task_struct`, except `signal_thread_head`, which is relative to
/// the start of `struct signal_struct`. Per-field semantics (kernel
/// type, source location, and the validation layer that consumes
/// it) are documented on the fields themselves.
struct TaskValidationOffsets {
    /// `task_struct.pid` (`pid_t`, kernel-side `int` = 4 bytes,
    /// `include/linux/sched.h`). L1 pid-equality check.
    pid: usize,
    /// `task_struct.start_time` (`u64`, ns since boot) at
    /// `include/linux/sched.h:1127`. Set ONCE at fork by
    /// `copy_process` via `ktime_get_ns()`. L2 anti-PID-reuse
    /// identity check.
    start_time: usize,
    /// `task_struct.__state` (`unsigned int` = 4 bytes) at
    /// `include/linux/sched.h:828`. L3 `state & TASK_DEAD` bit-test.
    state: usize,
    /// `task_struct.on_rq` (`int` = 4 bytes) at
    /// `include/linux/sched.h:864`. NOT in `sched_entity` — directly
    /// on task_struct. Per `task_on_rq_queued` semantics the value
    /// is 0 when the task is sleeping (the L4 invariant).
    on_rq: usize,
    /// `task_struct.scx.dsq` (`struct scx_dispatch_q *` = 8 bytes) —
    /// nested through `task_struct.scx` + offset of `dsq` in
    /// `sched_ext_entity` (`include/linux/sched/ext.h:211`). NULL
    /// when the task is not queued in any SCX DSQ (L5 part 1).
    scx_dsq: usize,
    /// `task_struct.scx.runnable_node` (`struct list_head`) — nested
    /// through `task_struct.scx` + offset of `runnable_node` in
    /// `sched_ext_entity` (`include/linux/sched/ext.h:227`,
    /// `/* rq->scx.runnable_list */`). Empty (next == &self) when
    /// the task is NOT linked into any per-rq runnable_list.
    /// Independent of `scx.dsq` per `include/linux/sched/ext.h`
    /// (L5 part 2).
    scx_runnable_node: usize,
    /// `task_struct.sched_class` (`const struct sched_class *` = 8
    /// bytes) at sched.h:878. Pointer identity-compared against the
    /// `ext_sched_class` KVA for the L6 SCX-only check.
    sched_class: usize,
    /// `task_struct.start_boottime` (`u64` = 8 bytes) at
    /// sched.h:1130 ("Boot based time in nsecs"). Set by
    /// `copy_process` at fork via `ktime_get_boottime_ns()`. L8
    /// anti-slab-recycle.
    start_boottime: usize,
    /// `task_struct.tasks` (`struct list_head` = 16 bytes, only the
    /// `.next` offset matters) at sched.h:954. Used by the leader
    /// walker for `container_of` math anchored at `init_task.tasks`.
    tasks: usize,
    /// `task_struct.signal` (`struct signal_struct *` = 8 bytes).
    /// Per-leader pointer; the leader's signal struct holds the
    /// `thread_head` list anchor for per-thread iteration.
    signal: usize,
    /// Offset of `thread_head` (`struct list_head`) within `struct
    /// signal_struct` — NOT a task_struct offset. Combined with the
    /// dereferenced `signal` pointer to address the per-thread list
    /// anchor.
    signal_thread_head: usize,
    /// `task_struct.thread_node` (`struct list_head`) at
    /// sched.h:1094. Per-task linkage into `signal->thread_head`.
    /// Used by the per-thread walker for `container_of` math.
    thread_node: usize,
}

impl TaskValidationOffsets {
    /// Build the full offset table from the kernel's BTF.
    ///
    /// Looks up `struct task_struct` and `struct signal_struct`,
    /// then resolves each member path with
    /// [`nested_member_byte_offset`]. The first lookup that fails
    /// aborts resolution with an error string naming the struct or
    /// member that could not be resolved.
    fn resolve_from_btf(btf: &Btf) -> Result<Self, String> {
        let task_struct_t = match find_struct(btf, "task_struct") {
            Ok((ty, _)) => ty,
            Err(e) => return Err(format!("BTF: 'struct task_struct' lookup: {e:#}")),
        };
        let signal_struct_t = match find_struct(btf, "signal_struct") {
            Ok((ty, _)) => ty,
            Err(e) => return Err(format!("BTF: 'struct signal_struct' lookup: {e:#}")),
        };

        // The only offset taken relative to signal_struct.
        let signal_thread_head =
            match nested_member_byte_offset(btf, &signal_struct_t, "thread_head") {
                Ok(off) => off,
                Err(e) => {
                    return Err(format!("BTF: signal_struct.thread_head offset: {e:#}"));
                }
            };

        // Everything else is a (possibly dotted) member path inside
        // task_struct.
        let in_task = |path: &str| -> Result<usize, String> {
            match nested_member_byte_offset(btf, &task_struct_t, path) {
                Ok(off) => Ok(off),
                Err(e) => Err(format!("BTF: task_struct.{path} offset: {e:#}")),
            }
        };

        let pid = in_task("pid")?;
        let start_time = in_task("start_time")?;
        let state = in_task("__state")?;
        let on_rq = in_task("on_rq")?;
        let scx_dsq = in_task("scx.dsq")?;
        let scx_runnable_node = in_task("scx.runnable_node")?;
        let sched_class = in_task("sched_class")?;
        let start_boottime = in_task("start_boottime")?;
        let tasks = in_task("tasks")?;
        let signal = in_task("signal")?;
        let thread_node = in_task("thread_node")?;

        Ok(Self {
            pid,
            start_time,
            state,
            on_rq,
            scx_dsq,
            scx_runnable_node,
            sched_class,
            start_boottime,
            tasks,
            signal,
            signal_thread_head,
            thread_node,
        })
    }
}

/// Walk the kernel's global task list anchored at `init_task.tasks`,
/// PLUS each leader's per-signal `thread_head`, returning the KVA
/// of the `task_struct` whose `pid` matches `target_pid`. Bounded by
/// [`MAX_TASK_WALKER_NODES`] across BOTH walks combined to defend
/// against a corrupt list chain.
///
/// Two-tier walk:
///
/// 1. **Leaders** — `init_task.tasks` is the `LIST_HEAD` anchor for
///    the `for_each_process` macro at `include/linux/sched/signal.h`
///    L638-640:
///    ```text
///    #define for_each_process(p) \
///        for (p = &init_task ; (p = next_task(p)) != &init_task ; )
///    ```
///    where `next_task(p) = list_entry(p->tasks.next, struct
///    task_struct, tasks)`. The walker starts at
///    `init_task.tasks.next`, container_of-decodes each list_head
///    back to its enclosing `task_struct` (a thread-group leader),
///    and terminates when the chain returns to the head.
///
/// 2. **Threads** — for each leader, walk
///    `leader->signal->thread_head` per the `for_each_thread` macro
///    at the same header L654-659. Per-task linkage is
///    `task_struct.thread_node`. Container_of math:
///    `thread_kva = thread_node_kva - offsetof(task_struct,
///    thread_node)`.
///
/// The two tiers fail differently. A broken LEADER chain (unmapped
/// leader, zero `tasks.next`) aborts the whole search with an error.
/// The THREAD tier is best-effort: a NULL `signal` pointer, an
/// untranslatable `thread_head`, an untranslatable thread whose node
/// also cannot be translated, or a zero `next` pointer just ends
/// that thread group's walk and the search moves on to the next
/// leader. Only the node cap is a hard error inside the thread tier.
///
/// `init_task` is `pid = 0` and is intentionally NOT yielded by
/// `for_each_process` (the macro skips the head). We additionally
/// EXPLICITLY reject any candidate whose task_kva equals
/// `init_task_kva` as defense-in-depth: if a future kernel reshapes
/// the list invariants, init_task must never land in our candidate
/// set.
///
/// Returns:
/// - `Ok(task_kva)` when a matching pid is found (leader OR
///   non-leader thread).
/// - `Err(reason)` on: empty list, unmapped list-head bytes,
///   walker cap exceeded, unmapped intermediate node (chain broken),
///   pid not found, or attempt to match init_task itself.
fn find_task_by_pid(
    kernel: &GuestKernel,
    init_task_kva: u64,
    offs: &TaskValidationOffsets,
    target_pid: u32,
) -> Result<u64, String> {
    let mem = kernel.mem();
    let walk = kernel.walk_context();
    let pid_off = offs.pid;
    let tasks_off = offs.tasks;
    let signal_off = offs.signal;
    let signal_thread_head_off = offs.signal_thread_head;
    let thread_node_off = offs.thread_node;

    // Slab-resident objects (task_struct, signal_struct) all go
    // through the same page-table walk; bind the walk parameters once.
    let translate = |kva: u64| {
        translate_any_kva(
            mem,
            walk.cr3_pa,
            walk.page_offset,
            kva,
            walk.l5,
            walk.tcr_el1,
        )
    };

    // init_task.tasks anchor lives in .data (init_task is a static
    // global at init/init_task.c:96), so text_kva_to_pa is the right
    // translation. List nodes (task_struct) live in slab and use
    // translate_any_kva.
    let head_kva = init_task_kva.checked_add(tasks_off as u64).ok_or_else(|| {
        format!(
            "find_task_by_pid: head_kva overflow init_task={init_task_kva:#x} + \
             tasks_off={tasks_off}"
        )
    })?;
    let head_pa = kernel.text_kva_to_pa(head_kva);

    // list_head.next is the first u64 in the list_head struct.
    let mut node_kva = mem.read_u64(head_pa, 0);
    if node_kva == 0 {
        return Err(format!(
            "find_task_by_pid: init_task.tasks.next read as 0 at head_pa={head_pa:#x} \
             — head bytes unmapped or torn read"
        ));
    }
    if node_kva == head_kva {
        return Err(format!(
            "find_task_by_pid: init_task.tasks is empty (head.next == head) — \
             no user tasks exist; cannot resolve pid={target_pid}"
        ));
    }

    // Nodes visited so far, leaders and threads combined.
    let mut visited: u32 = 0;

    // Tier 1: walk leaders via init_task.tasks.
    while node_kva != head_kva {
        if visited >= MAX_TASK_WALKER_NODES {
            return Err(format!(
                "find_task_by_pid: walker cap {MAX_TASK_WALKER_NODES} exceeded \
                 scanning for pid={target_pid} (visited={visited}); list may be \
                 corrupted (cycle) or pid_max exceeded the cap"
            ));
        }
        visited += 1;

        // container_of: task_kva = list_node_kva - offsetof(task, tasks).
        let leader_kva = node_kva.wrapping_sub(tasks_off as u64);

        // Defense-in-depth: reject init_task even if somehow it
        // leaked into the candidate set. for_each_process skips the
        // head by construction, but defensive reject catches future
        // kernel reshapes or corrupt-chain races.
        if leader_kva == init_task_kva {
            return Err(format!(
                "find_task_by_pid: candidate task_kva={leader_kva:#x} equals \
                 init_task_kva={init_task_kva:#x} (pid=0 swapper); init_task \
                 is not a writable target"
            ));
        }

        let Some(leader_pa) = translate(leader_kva) else {
            return Err(format!(
                "find_task_by_pid: leader task_kva={leader_kva:#x} unmapped \
                 (visited={visited}); task_struct slab page not present in guest memory"
            ));
        };

        let leader_pid = mem.read_u32(leader_pa, pid_off);
        if leader_pid == target_pid {
            return Ok(leader_kva);
        }

        // Tier 2: walk this leader's threads via signal->thread_head.
        // The signal pointer is at `signal_off` within task_struct;
        // dereference to get signal_struct KVA; thread_head list_head
        // is at `signal_thread_head_off` within signal_struct.
        let signal_kva = mem.read_u64(leader_pa, signal_off);
        if signal_kva != 0 {
            let thread_head_kva = signal_kva.wrapping_add(signal_thread_head_off as u64);
            if let Some(thread_head_pa) = translate(thread_head_kva) {
                let mut thread_node_kva = mem.read_u64(thread_head_pa, 0);
                while thread_node_kva != 0 && thread_node_kva != thread_head_kva {
                    if visited >= MAX_TASK_WALKER_NODES {
                        return Err(format!(
                            "find_task_by_pid: walker cap {MAX_TASK_WALKER_NODES} \
                             exceeded inside thread-group of leader_pid={leader_pid} \
                             scanning for pid={target_pid}"
                        ));
                    }
                    visited += 1;

                    let thread_kva = thread_node_kva.wrapping_sub(thread_node_off as u64);

                    // The leader's thread_node is also on this list
                    // — skip it (already checked as leader above).
                    if thread_kva != leader_kva {
                        let Some(thread_pa) = translate(thread_kva) else {
                            // Skip this thread on translate failure
                            // rather than aborting the whole walk —
                            // partial visibility is better than none.
                            // Advance via the node, not the task.
                            let Some(thread_node_pa) = translate(thread_node_kva) else {
                                break; // can't advance — break inner loop
                            };
                            thread_node_kva = mem.read_u64(thread_node_pa, 0);
                            continue;
                        };

                        let thread_pid = mem.read_u32(thread_pa, pid_off);
                        if thread_pid == target_pid {
                            return Ok(thread_kva);
                        }
                    }

                    // Advance to next thread via thread_node.next.
                    let next_ptr_pa = thread_pa_or_node(
                        mem,
                        walk.cr3_pa,
                        walk.page_offset,
                        walk.l5,
                        walk.tcr_el1,
                        thread_kva,
                        thread_node_kva,
                        thread_node_off,
                    );
                    // thread_pa_or_node reports "neither address
                    // translates" as PA 0. That is a sentinel, not a
                    // location: reading guest physical address 0
                    // would feed unrelated bytes into the walker as
                    // a `next` pointer. Treat it as a broken chain.
                    if next_ptr_pa == 0 {
                        break; // can't advance — break inner loop
                    }
                    let next_kva = mem.read_u64(next_ptr_pa, 0);
                    if next_kva == 0 {
                        break; // chain broken — break inner loop
                    }
                    thread_node_kva = next_kva;
                }
            }
        }

        // Advance to next leader via this leader's tasks.next.
        let next_kva = mem.read_u64(leader_pa, tasks_off);
        if next_kva == 0 {
            return Err(format!(
                "find_task_by_pid: list_head.next read as 0 at leader_kva={leader_kva:#x} \
                 (visited={visited}); chain broken before finding pid={target_pid}"
            ));
        }
        node_kva = next_kva;
    }

    Err(format!(
        "find_task_by_pid: pid={target_pid} not found in init_task.tasks \
         or any leader's signal->thread_head (visited={visited} entries across \
         leaders + threads)"
    ))
}

/// Locate the physical address that holds a thread's
/// `thread_node.next` pointer, for the per-thread walker's advance
/// step.
///
/// Preferred route: translate the enclosing task (`thread_kva`) and
/// add `thread_node_off`. Fallback when the task itself does not
/// translate: translate the list node (`thread_node_kva`) directly.
///
/// Returns 0 when neither address translates. That 0 is a failure
/// sentinel, not a usable physical address — callers must check for
/// it before reading through the result.
#[allow(clippy::too_many_arguments)]
fn thread_pa_or_node(
    mem: &crate::monitor::reader::GuestMem,
    cr3_pa: u64,
    page_offset: u64,
    l5: bool,
    tcr_el1: u64,
    thread_kva: u64,
    thread_node_kva: u64,
    thread_node_off: usize,
) -> u64 {
    // Both candidates go through the same page-table walk.
    let translate = |kva: u64| translate_any_kva(mem, cr3_pa, page_offset, kva, l5, tcr_el1);

    match translate(thread_kva) {
        Some(task_pa) => task_pa + thread_node_off as u64,
        None => translate(thread_node_kva).unwrap_or(0),
    }
}

/// Eight-layer task validation chain. Run AFTER the walker locates
/// the candidate task_struct and BEFORE any field write. Every layer
/// reads from guest memory at the candidate `task_pa` and rejects
/// with a typed error naming the specific layer + observed value.
///
/// Layer order (fail-fast, cheapest first):
/// 1. **pid match**: `task->pid == target_pid`. Defense against
///    slab-recycle where the freed task_struct's memory was reused
///    for another task with a different pid. Also a sanity check on
///    the walker.
/// 2. **start_time identity**: `task->start_time in
///    [expected_start_time_ns, expected_start_time_ns +
///    START_TIME_PROC_TICK_NS)`. The kernel sets `start_time` once
///    at fork via `ktime_get_ns()` in `kernel/fork.c::copy_process`
///    with full nanosecond precision; the value never changes after
///    that. The only userspace-visible source for that field is
///    `/proc/<pid>/stat` field 22, which the kernel emits in clock
///    ticks (1 / `sysconf(_SC_CLK_TCK)`) — typically 10ms — so the
///    test author's `expected_start_time_ns` is always quantized
///    DOWN to a tick boundary while the kernel's `task->start_time`
///    carries the exact ns. Accepting a tick-window (10ms — the
///    conservative max for `CLK_TCK >= 100`) closes the legitimate
///    quantization gap without weakening the anti-PID-reuse defense
///    (the kernel never recycles a PID within 10ms of the original
///    task's exit under normal scheduling pressure).
///    Catches PID-reuse: if the original worker exited and the
///    kernel recycled the PID for an unrelated task, the new task's
///    `start_time` will be far outside the [+0, +tick) window of the
///    captured-at-spawn value, even when the pid matches by
///    coincidence.
/// 3. **lifetime**: `task->__state & TASK_DEAD == 0`. A task in the
///    final teardown path has the `TASK_DEAD` bit set in `__state`
///    (`include/linux/sched.h:118`); writing through it would
///    corrupt the dying-task state machine.
/// 4. **runqueue safety**: `task->on_rq == 0`. Per
///    `task_on_rq_queued` (`kernel/sched/sched.h:2399`) the value
///    is 0 when the task is sleeping. CFS's red-black tree keys on
///    `se.vruntime`; mutating it while the task is queued
///    (on_rq=TASK_ON_RQ_QUEUED=1 or TASK_ON_RQ_MIGRATING=2) corrupts
///    tree ordering.
/// 5. **SCX queued-anywhere safety**: `task->scx.dsq == NULL` AND
///    `task->scx.runnable_node` is list-empty (next == &self). The
///    `dsq` pointer (`include/linux/sched/ext.h:211`) tracks current
///    DSQ residence; the `runnable_node` (L227 `/* rq->scx.runnable_list */`)
///    tracks per-rq runnable bookkeeping INDEPENDENT of `dsq`. Both
///    must be empty to safely modify scheduler-bookkeeping fields.
/// 6. **SCX-only sched_class**: `task->sched_class ==
///    &ext_sched_class`. The dispatcher rejects non-SCX tasks
///    (fair / RT / DL / stop / idle) because EEVDF's `place_entity`
///    overwrites `se->vruntime` on enqueue (silently discarding CFS
///    seeds), RT/DL/stop/idle have different vtime semantics, and
///    SCX's `dsq_vtime` is the only host-writable preserved
///    ordering key in the modern kernel.
/// 7. (REMOVED). The previous gate required
///    `task->policy & ~SCHED_RESET_ON_FORK == SCHED_EXT` per
///    `include/uapi/linux/sched.h:121` as belt-and-suspenders for
///    L6, but it does not hold: `kernel/sched/ext.c::scx_init_task`
///    / `scx_enable_task` set `task->sched_class = &ext_sched_class`
///    when SCX takes over a fair-policy task without modifying
///    `task->policy`, so a worker forked under `SCHED_NORMAL` keeps
///    `policy=0` even after SCX claims it. L6 (sched_class pointer
///    identity) is the canonical SCX-managed gate; `policy` is
///    unreliable for that purpose. The numbering is preserved so
///    the surviving gates keep their layer labels.
/// 8. **anti slab-recycle**: `task->start_boottime != 0`. The
///    `start_boottime` field is set by `copy_process` at fork via
///    `ktime_get_boottime_ns()` (which is never 0 after boot). A
///    freshly-zeroed slab page has start_boottime=0; a live task
///    has it non-zero. Catches slab-recycle that survived L1+L2
///    (pid AND start_time match by coincidence — vanishingly
///    unlikely but defense-in-depth).
///
/// `ext_sched_class_kva` is the resolved `ext_sched_class` KVA the
/// L6 check compares against. Caller resolves via
/// `kernel.symbol_kva("ext_sched_class")`; absent symbol (kernel
/// without CONFIG_SCHED_CLASS_EXT) fails the entire dispatcher path
/// upstream — see [`resolve_and_validate_task_field`].
fn validate_task_for_field_op(
    kernel: &GuestKernel,
    task_pa: u64,
    target_pid: u32,
    expected_start_time_ns: u64,
    offs: &TaskValidationOffsets,
    ext_sched_class_kva: u64,
) -> Result<(), String> {
    let mem = kernel.mem();

    // L1: pid match (anti slab-recycle + walker sanity).
    let pid = mem.read_u32(task_pa, offs.pid);
    if pid != target_pid {
        return Err(format!(
            "validate_task: pid mismatch at task_pa={task_pa:#x} — read pid={pid}, \
             expected {target_pid} (likely slab-recycle since walker found this task)"
        ));
    }

    // L2: start_time identity (anti-PID-reuse).
    //
    // `expected_start_time_ns` is the test author's value derived
    // from /proc/<pid>/stat field 22 (jiffies-quantized: integer
    // ticks * 1e9 / CLK_TCK), so it's always ROUNDED DOWN to a
    // CLK_TCK boundary. The kernel's `task->start_time` carries
    // sub-tick precision from `ktime_get_ns()`, so the legitimate
    // value lands in `[expected, expected + CLK_TCK_NS)`. Accept
    // a 10ms window (conservative max for CLK_TCK >= 100), which
    // still rejects PID-recycled tasks whose start_time falls
    // well outside that range under normal scheduling pressure.
    //
    // `checked_sub` yields `None` when the observed value is BELOW
    // the expected one, so a single pattern covers both the
    // "too early" and the "too late" rejection.
    let observed_start_time = mem.read_u64(task_pa, offs.start_time);
    let within_tick_window = matches!(
        observed_start_time.checked_sub(expected_start_time_ns),
        Some(skew) if skew < START_TIME_PROC_TICK_NS
    );
    if !within_tick_window {
        // Display-only inclusive upper bound. `expected_start_time_ns`
        // is caller-supplied, so a value near u64::MAX must not
        // overflow here (panic in debug builds, wrapped nonsense in
        // release) — saturate instead.
        let window_last_ns = expected_start_time_ns
            .saturating_add(START_TIME_PROC_TICK_NS)
            .saturating_sub(1);
        return Err(format!(
            "validate_task: task pid={target_pid} start_time identity mismatch — \
             observed={observed_start_time}ns expected in \
             [{expected_start_time_ns}, {window_last_ns}]ns; \
             original task exited and PID was recycled for an unrelated task"
        ));
    }

    // L3: lifetime (TASK_DEAD bit not set).
    let state = mem.read_u32(task_pa, offs.state);
    if state & TASK_DEAD != 0 {
        return Err(format!(
            "validate_task: task pid={target_pid} is TASK_DEAD (state={state:#x}); \
             mid-teardown task fields unsafe to write"
        ));
    }

    // L4: runqueue safety (on_rq == 0).
    let on_rq = mem.read_u32(task_pa, offs.on_rq);
    if on_rq != 0 {
        return Err(format!(
            "validate_task: task pid={target_pid} is on_rq={on_rq} (TASK_ON_RQ_QUEUED \
             or MIGRATING); writing scheduler fields would corrupt rb-tree / DSQ \
             ordering. Test author must use a blocking workload pattern \
             (`WorkType::FutexPingPong`, `WorkType::WaitOnFutex`, `WorkType::Sleep`) \
             so the worker is sleeping at cold-op time"
        ));
    }

    // L5: SCX queued-anywhere safety (scx.dsq == NULL AND scx.runnable_node empty).
    let scx_dsq_ptr = mem.read_u64(task_pa, offs.scx_dsq);
    if scx_dsq_ptr != 0 {
        return Err(format!(
            "validate_task: task pid={target_pid} has scx.dsq={scx_dsq_ptr:#x} (queued \
             on an SCX DSQ); modifying ordering keys while queued mangles ordering \
             per include/linux/sched/ext.h:248-254 (dsq_vtime warning). Test author \
             must use a blocking workload pattern \
             (`WorkType::FutexPingPong`, `WorkType::WaitOnFutex`, `WorkType::Sleep`)"
        ));
    }
    // scx.runnable_node is a list_head; "empty" means next == &self
    // (the KVA of the list_head itself). The list_head KVA is
    // task_kva + offsetof(task_struct, scx.runnable_node). We need
    // the task_KVA to compare; derive it from task_pa via the
    // page_offset (slab is direct-mapped). A `next` of 0 is also
    // accepted as "not linked".
    let task_kva = task_pa.wrapping_add(kernel.page_offset());
    let runnable_node_kva = task_kva.wrapping_add(offs.scx_runnable_node as u64);
    let runnable_node_next = mem.read_u64(task_pa, offs.scx_runnable_node);
    if runnable_node_next != 0 && runnable_node_next != runnable_node_kva {
        return Err(format!(
            "validate_task: task pid={target_pid} scx.runnable_node is linked \
             (next={runnable_node_next:#x} != self={runnable_node_kva:#x}); task is \
             on a per-rq runnable_list. Test author must use a blocking workload \
             pattern (`WorkType::FutexPingPong`, `WorkType::WaitOnFutex`, \
             `WorkType::Sleep`)"
        ));
    }

    // L6: SCX-only sched_class (must be ext_sched_class).
    let sched_class_kva = mem.read_u64(task_pa, offs.sched_class);
    if sched_class_kva != ext_sched_class_kva {
        return Err(format!(
            "validate_task: task pid={target_pid} sched_class={sched_class_kva:#x} \
             is not ext_sched_class={ext_sched_class_kva:#x}; TaskField writes target \
             SCX-managed tasks only (CFS / RT / DL / stop / idle classes have \
             different vtime semantics — EEVDF's place_entity overwrites se.vruntime \
             on enqueue, RT/DL have RT_BANDWIDTH instant-throttle hazards). Spawn \
             the worker under `SchedPolicy::Ext` to make it SCX-managed"
        ));
    }

    // L7 (REMOVED): `task->policy == SCHED_EXT` was a belt-and-
    // suspenders gate for L6 but it does not actually hold for SCX-
    // managed tasks. `kernel/sched/ext.c::scx_init_task` /
    // `scx_enable_task` set `task->sched_class = &ext_sched_class`
    // when SCX takes over a fair-policy task but does NOT modify
    // `task->policy` — a worker forked under `SCHED_NORMAL` keeps
    // `policy=0` (SCHED_NORMAL) even after SCX claims it. Requiring
    // `policy == SCHED_EXT` rejects every legitimate SCX-managed
    // task that did not explicitly call `sched_setattr(SCHED_EXT)`,
    // which is the common case for ktstr's WorkloadHandle (workers
    // spawn with SchedPolicy::Normal and scx-ktstr's BPF dispatch
    // claims them). L6 (sched_class pointer identity) is the
    // canonical SCX-managed gate; the policy field is unreliable
    // for this check.
    //
    // L8: anti slab-recycle via start_boottime.
    let start_boottime = mem.read_u64(task_pa, offs.start_boottime);
    if start_boottime == 0 {
        return Err(format!(
            "validate_task: task pid={target_pid} start_boottime=0 — possibly a \
             freshly-zeroed slab page mid-slab-recycle; reject rather than risk \
             writing to dead memory"
        ));
    }

    Ok(())
}

/// Common front half of the TaskField read and write dispatcher arms.
///
/// In order, this:
/// 1. requires a loaded BTF,
/// 2. resolves the `init_task` and `ext_sched_class` symbol KVAs,
/// 3. slides the `ext_sched_class` KVA by `kaslr_offset`,
/// 4. resolves the validation offsets from BTF,
/// 5. locates the task for `pid` and translates its KVA to a PA,
/// 6. runs [`validate_task_for_field_op`] on that PA, and
/// 7. looks up `struct task_struct` in BTF.
///
/// On success returns `(task_pa, task_struct BTF type)`. Every
/// failure is reported as a `String` reason.
///
/// SCX-only: a missing `ext_sched_class` symbol (kernel built without
/// `CONFIG_SCHED_CLASS_EXT`) fails here, which rejects the whole
/// TaskField op.
fn resolve_and_validate_task_field(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    pid: u32,
    expected_start_time_ns: u64,
) -> Result<(u64, btf_rs::Struct), String> {
    let btf = match btf {
        Some(loaded) => loaded,
        None => {
            return Err(format!(
                "TaskField pid={pid}: BTF not loaded in this coordinator — cannot resolve \
                 task_struct layout (vmlinux must carry CONFIG_DEBUG_INFO_BTF=y output)"
            ));
        }
    };

    let init_task_kva = match kernel.symbol_kva("init_task") {
        Some(kva) => kva,
        None => {
            return Err(format!(
                "TaskField pid={pid}: init_task symbol absent from vmlinux symtab \
                 (heavily stripped vmlinux); cannot anchor the task-list walker"
            ));
        }
    };

    let ext_sched_class_kva = match kernel.symbol_kva("ext_sched_class") {
        // The symtab value is a link-time KVA, while the runtime
        // `task->sched_class` pointer carries the KASLR slide. Apply
        // the slide here so the validator compares like with like.
        Some(link_time_kva) => link_time_kva.wrapping_add(kaslr_offset),
        None => {
            return Err(format!(
                "TaskField pid={pid}: ext_sched_class symbol absent from vmlinux symtab \
                 (kernel built without CONFIG_SCHED_CLASS_EXT=y); TaskField writes are \
                 SCX-only and require sched_ext support"
            ));
        }
    };

    let val_offs = TaskValidationOffsets::resolve_from_btf(btf)?;

    let task_kva = find_task_by_pid(kernel, init_task_kva, &val_offs, pid)?;
    let walk = kernel.walk_context();
    let translated = translate_any_kva(
        kernel.mem(),
        walk.cr3_pa,
        walk.page_offset,
        task_kva,
        walk.l5,
        walk.tcr_el1,
    );
    let task_pa = match translated {
        Some(pa) => pa,
        None => {
            return Err(format!(
                "TaskField pid={pid}: task_kva={task_kva:#x} unmapped at validation step \
                 (slab page disappeared between walker and validator — extreme race)"
            ));
        }
    };

    validate_task_for_field_op(
        kernel,
        task_pa,
        pid,
        expected_start_time_ns,
        &val_offs,
        ext_sched_class_kva,
    )?;

    // Only the struct type is needed by callers; the second tuple
    // element from `find_struct` is discarded.
    let task_struct_t = match find_struct(btf, "task_struct") {
        Ok((found, _)) => found,
        Err(e) => {
            return Err(format!(
                "TaskField pid={pid}: 'struct task_struct' BTF lookup: {e:#}"
            ));
        }
    };

    Ok((task_pa, task_struct_t))
}

/// TaskField write path.
///
/// Delegates task lookup and validation to
/// [`resolve_and_validate_task_field`], resolves `field` to a byte
/// offset inside `task_struct` via BTF, then stores `value` at
/// `task_pa + field_off`.
///
/// Only `U32` and `U64` values are written. `Bytes` and `OrU32` are
/// rejected with a reason string and nothing is written.
fn dispatch_task_field_write(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    pid: u32,
    expected_start_time_ns: u64,
    field: &str,
    value: &KernelOpValue,
) -> Result<(), String> {
    let (task_pa, task_struct_t) =
        resolve_and_validate_task_field(kernel, btf, kaslr_offset, pid, expected_start_time_ns)?;

    // Cannot fail: the resolver above already returned Err for a
    // missing BTF.
    let btf = btf.expect("checked in resolve_and_validate_task_field");

    let field_off = match nested_member_byte_offset(btf, &task_struct_t, field) {
        Ok(off) => off,
        Err(e) => {
            return Err(format!(
                "TaskField pid={pid} field={field:?}: BTF nested-offset resolution: {e:#}"
            ));
        }
    };

    // Unsupported shapes bail out first; the two scalar widths fall
    // through to the shared Ok(()) below.
    match value {
        KernelOpValue::Bytes(_) => {
            return Err(format!(
                "TaskField pid={pid} field={field:?}: Bytes write not supported in v1 — \
                 use U32 or U64 (per-task scheduler fields are scalars)"
            ));
        }
        KernelOpValue::OrU32(_) => {
            return Err(format!(
                "TaskField pid={pid} field={field:?}: OrU32 RMW not supported on TaskField \
                 in v1 (no current use case; per-task scheduler fields are scalars not flags)"
            ));
        }
        KernelOpValue::U32(word) => {
            kernel.mem().write_u32(task_pa, field_off, *word);
        }
        KernelOpValue::U64(quad) => {
            kernel.mem().write_u64(task_pa, field_off, *quad);
        }
    }
    Ok(())
}

/// TaskField read path.
///
/// Uses the same lookup and validation as the write path
/// ([`resolve_and_validate_task_field`]), resolves `field` to a byte
/// offset via BTF, then reads at `task_pa + field_off`. The variant
/// of `width_hint` selects the width: `U32` reads 32 bits, `U64`
/// reads 64 bits; the payload inside the hint is ignored. `Bytes`
/// and `OrU32` hints are rejected with a reason string.
fn dispatch_task_field_read(
    kernel: &GuestKernel,
    btf: Option<&Btf>,
    kaslr_offset: u64,
    pid: u32,
    expected_start_time_ns: u64,
    field: &str,
    width_hint: &KernelOpValue,
) -> Result<KernelOpValue, String> {
    let (task_pa, task_struct_t) =
        resolve_and_validate_task_field(kernel, btf, kaslr_offset, pid, expected_start_time_ns)?;

    // Cannot fail: the resolver above already returned Err for a
    // missing BTF.
    let btf = btf.expect("checked in resolve_and_validate_task_field");

    let field_off = match nested_member_byte_offset(btf, &task_struct_t, field) {
        Ok(off) => off,
        Err(e) => {
            return Err(format!(
                "TaskField pid={pid} field={field:?}: BTF nested-offset resolution: {e:#}"
            ));
        }
    };

    match width_hint {
        KernelOpValue::U32(_) => {
            let word = kernel.mem().read_u32(task_pa, field_off);
            Ok(KernelOpValue::U32(word))
        }
        KernelOpValue::U64(_) => {
            let quad = kernel.mem().read_u64(task_pa, field_off);
            Ok(KernelOpValue::U64(quad))
        }
        KernelOpValue::Bytes(_) => Err(format!(
            "TaskField pid={pid} field={field:?}: Bytes read not supported in v1 — \
             use U32 or U64 width hint"
        )),
        KernelOpValue::OrU32(_) => Err(format!(
            "TaskField pid={pid} field={field:?}: OrU32 has no read semantic (covered \
             by the dispatcher's read-direction catch-all but explicit here for clarity)"
        )),
    }
}

/// Reason text for the wire-misuse case where a
/// `KernelOpValue::OrU32(mask)` arrives on the read direction.
///
/// OrU32 carries a mask (write semantics), not a width hint, so no
/// read can be derived from it. The message echoes the offending
/// mask in `0x`-prefixed hex and names the read-width symbol the
/// caller should use instead.
pub(super) fn oru32_read_rejection_reason(mask: u32) -> String {
    // Render the mask once up front; the message template then only
    // interpolates an already-formatted string.
    let mask_hex = format!("{mask:#x}");
    format!(
        "OrU32(mask={mask_hex}) cannot be used as a Read width — \
         RMW is a write operation. For 32-bit reads use \
         `KernelValueWidth::u32()` instead."
    )
}

/// Build a failure reply for `request_id` carrying `reason`.
///
/// A reason longer than [`KERNEL_OP_REASON_MAX`] bytes is shortened
/// to the length returned by `utf8_safe_truncate_len`, so the cut
/// never falls inside a multi-byte codepoint (`String::truncate`
/// panics on a non-boundary index). Shorter reasons pass through
/// untouched. The reply always has `success == false` and an empty
/// `read_values`.
fn error_reply(request_id: u32, reason: String) -> KernelOpReplyPayload {
    let reason = if reason.len() > KERNEL_OP_REASON_MAX {
        let keep = super::utf8_safe_truncate_len(&reason, KERNEL_OP_REASON_MAX);
        let mut clipped = reason;
        clipped.truncate(keep);
        clipped
    } else {
        reason
    };

    KernelOpReplyPayload {
        request_id,
        success: false,
        reason,
        read_values: Vec::new(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::vmm::KERNEL_HALF_CANONICAL as KERNEL_HALF_CANONICAL_4LEVEL;

    /// Compile-time pin of the disambiguation invariant between the
    /// two kernel-half thresholds: the 5-level conservative threshold
    /// must be STRICTLY LOWER than the 4-level canonical boundary.
    ///
    /// Rationale: any address that satisfies the 4-level canonical
    /// check also passes the looser 5-level guard, so KASLR-on builds
    /// that use 5-level paging direct-map / vmalloc / vmemmap KVAs
    /// (which sit below the 4-level threshold but above the 5-level
    /// one) are accepted by [`validate_kva_target`] without
    /// false-rejection. A regression that flipped either value would
    /// silently break dispatch.rs's kernel-text canonical check OR
    /// make kernel_op_dispatch.rs over-permissive.
    ///
    /// `const _: () = assert!(...)` is evaluated during const-eval,
    /// so a violation fails the BUILD of this test module rather than
    /// a test run — strictly stronger than `#[test]`. Equality of the
    /// two values is rejected too, because the comparison is a strict
    /// `<`.
    const _: () = assert!(
        KERNEL_HALF_CONSERVATIVE_5LEVEL < KERNEL_HALF_CANONICAL_4LEVEL,
        "5-level threshold must be permissively lower than 4-level canonical",
    );

    /// A reason shorter than the cap is carried through verbatim and
    /// the reply is marked as a failure.
    #[test]
    fn error_reply_passes_short_reason_unchanged() {
        let short_reason = String::from("short");
        let reply = error_reply(7, short_reason);
        assert_eq!(reply.success, false);
        assert_eq!(reply.reason.as_str(), "short");
    }

    /// OrU32 routed through the read direction must produce a typed
    /// rejection instead of being treated as a u32 read.
    ///
    /// The reason text comes from the production helper
    /// `oru32_read_rejection_reason` — the test does not keep its own
    /// copy of the format string — and is wrapped in the
    /// `entry[idx]:` prefix the batch dispatcher is documented to
    /// add. The test then checks that `error_reply` carries that
    /// composed reason through unchanged, and spot-checks the helper
    /// output for the mask value and the recommended symbol.
    #[test]
    fn read_direction_with_oru32_value_rejects() {
        let mask: u32 = 1 << 5;
        let entry_idx: usize = 0;
        let request_id: u32 = 99;

        let helper_reason = oru32_read_rejection_reason(mask);

        // Helper-level checks: the offending variant, the rendered
        // mask, and the replacement symbol must all be named.
        let mask_hex = format!("{mask:#x}");
        assert!(helper_reason.contains("KernelValueWidth::u32()"));
        assert!(helper_reason.contains("OrU32"));
        assert!(helper_reason.contains(&mask_hex));

        // Reply-level checks: the per-entry wrapped reason passes
        // through error_reply verbatim.
        let batch_reason = format!("entry[{entry_idx}]: {helper_reason}");
        let reply = error_reply(request_id, batch_reason.clone());
        assert!(!reply.success);
        assert_eq!(reply.request_id, request_id);
        assert_eq!(reply.reason, batch_reason);
    }

    /// An unsupported per-CPU symbol must be rejected by
    /// [`struct_name_for_per_cpu_symbol`], and the error must name
    /// the `PerCpuField` context, echo the offending symbol, and
    /// list the supported set (runqueues / kernel_cpustat / kstat /
    /// tick_cpu_sched) so the caller has an actionable next step.
    #[test]
    fn per_cpu_field_unknown_symbol_rejected() {
        let bogus = "not_a_real_per_cpu_symbol";
        let err = struct_name_for_per_cpu_symbol(bogus).expect_err("unknown symbol must reject");

        let required_fragments = [
            "PerCpuField",
            bogus,
            "runqueues",
            "kernel_cpustat",
            "kstat",
            "tick_cpu_sched",
        ];
        for fragment in required_fragments {
            assert!(err.contains(fragment));
        }
    }

    /// Every supported per-CPU symbol must map to its expected
    /// kernel struct name. The table below is the full expectation;
    /// a typo in any mapping (e.g. runqueues → "rq_struct") fails.
    #[test]
    fn per_cpu_field_known_symbol_mapping() {
        let expected_mappings = [
            ("runqueues", "rq"),
            ("kernel_cpustat", "kernel_cpustat"),
            ("kstat", "kernel_stat"),
            ("tick_cpu_sched", "tick_sched"),
        ];
        for (symbol, struct_name) in expected_mappings {
            assert_eq!(
                struct_name_for_per_cpu_symbol(symbol).unwrap(),
                struct_name
            );
        }
    }

    /// 4-call-site product matrix via source-grep: each of the 4
    /// dispatch arms — Direct/Write, Direct/Read, Kva/Write, Kva/Read
    /// — MUST call validate_direct_target (Direct) or
    /// validate_kva_target (Kva) BEFORE invoking the underlying
    /// kernel.{read,write}_{direct,kva}_* function. A regression that
    /// wires validate into 3/4 sites and drops one silently re-opens
    /// the silent-data-loss class for the missing arm.
    ///
    /// The structural invariant is pinned at the source level, so no
    /// mock kernel is required.
    ///
    /// Self-match exclusion: the searched arm shapes also appear in
    /// this test's own text. The search is restricted to the source
    /// BEFORE the first `#[cfg(test)]` (production code only) so
    /// test-body matches are never counted.
    ///
    /// Window handling: each arm is checked against the ~400 bytes
    /// that follow it. The window end is clamped back to a UTF-8
    /// char boundary, because the source contains multi-byte
    /// characters and slicing a `str` at a non-boundary byte panics.
    #[test]
    fn dispatch_arms_call_validate_target_helpers() {
        let full_src = include_str!("kernel_op_dispatch.rs");
        let test_mod_start = full_src
            .find("#[cfg(test)]")
            .expect("test module must exist");
        let src = &full_src[..test_mod_start];
        // Each Direct arm shape `KernelOpTarget::Direct(kva), KernelOpValue::*`
        // MUST be followed within ~400 bytes by `validate_direct_target(`.
        // Each Kva arm shape `KernelOpTarget::Kva(kva), KernelOpValue::*`
        // MUST be followed within ~400 bytes by `validate_kva_target(`.
        // Symbol arms are exempt (vmlinux .symtab kernel-half guarantee);
        // PerCpuField + TaskField arms are exempt (translate_any_kva
        // safety-net handles unmapped/out-of-bounds; resolve_per_cpu_field_pa
        // and find_task_by_pid produce typed errors instead of silent zeros).
        //
        // Expect 7 arms per target kind: 4 in dispatch_one_write
        // (U32/U64/Bytes/OrU32) + 3 in dispatch_one_read
        // (U32/U64/Bytes). OrU32 read is rejected via the catch-all
        // and doesn't have a per-target arm.
        for (label, arm_shape, validator_call) in [
            (
                "Direct",
                "KernelOpTarget::Direct(kva), KernelOpValue::",
                "validate_direct_target(",
            ),
            (
                "Kva",
                "KernelOpTarget::Kva(kva), KernelOpValue::",
                "validate_kva_target(",
            ),
        ] {
            let arms: Vec<_> = src.match_indices(arm_shape).collect();
            assert_eq!(
                arms.len(),
                7,
                "expected exactly 7 {label} arms (4 write + 3 read); found {}",
                arms.len()
            );
            for (idx, _) in &arms {
                // Clamp to a char boundary; `*idx` is a match start
                // and therefore always a boundary, so the walk-back
                // terminates at `*idx` in the worst case.
                let mut window_end = (idx + 400).min(src.len());
                while !src.is_char_boundary(window_end) {
                    window_end -= 1;
                }
                let window = &src[*idx..window_end];
                assert!(
                    window.contains(validator_call),
                    "{label} arm at byte offset {idx} is missing {validator_call}) call; \
                     window: {window:?}"
                );
            }
        }
    }

    // ---- UTF-8 boundary tests ----

    /// Table-driven UTF-8 boundary classes: 2-byte, 3-byte, 4-byte,
    /// BOM. Each row builds an over-cap reason out of one repeated
    /// multi-byte codepoint and feeds it to `error_reply`, which
    /// must shorten it without panicking on a mid-codepoint cut.
    ///
    /// Mixed-width + pure-ASCII + empty paths are distinct from this
    /// table — they're separate tests below because their assertion
    /// shape differs.
    ///
    /// Every codepoint in the table must be non-empty: the fill loop
    /// appends it until the string exceeds the cap, so an empty
    /// entry would spin forever. The guard below turns that mistake
    /// into an immediate failure.
    #[test]
    fn error_reply_truncates_at_utf8_boundary_classes() {
        for (cp, label, padding) in [
            // (codepoint, label for failure context, padding bytes
            // past KERNEL_OP_REASON_MAX to ensure overflow)
            ("é", "2byte_U+00E9", 4),        // U+00E9, 2 bytes (C3 A9)
            ("\u{2603}", "3byte_U+2603", 6), // U+2603, 3 bytes (E2 98 83)
            ("🦀", "4byte_U+1F980", 8),      // U+1F980, 4 bytes
            ("\u{FEFF}", "BOM_U+FEFF", 6),   // U+FEFF, 3 bytes (EF BB BF)
        ] {
            assert!(
                !cp.is_empty(),
                "{label}: table codepoint is empty — the fill loop would never terminate"
            );
            let mut s = String::new();
            while s.len() < KERNEL_OP_REASON_MAX + padding {
                s.push_str(cp);
            }
            let original = s.clone();
            let reply = error_reply(42, s);
            assert!(
                reply.reason.len() <= KERNEL_OP_REASON_MAX,
                "{label}: reason.len()={} > cap={KERNEL_OP_REASON_MAX}",
                reply.reason.len()
            );
            assert!(
                reply.reason.is_char_boundary(reply.reason.len()),
                "{label}: truncation landed mid-codepoint"
            );
            // Truncation may only drop a suffix; the surviving text
            // must be a prefix of what went in.
            assert!(
                original.starts_with(reply.reason.as_str()),
                "{label}: truncated reason is not a prefix of the input"
            );
        }
    }

    /// Mixed-width input: a repeating unit of one 1-byte, one 2-byte,
    /// one 3-byte and one 4-byte codepoint is grown past the cap, so
    /// the width of the codepoint under the cut depends on the cap
    /// value. The reply must stay within the cap and end on a char
    /// boundary.
    #[test]
    fn error_reply_truncates_mixed_width_input_at_boundary() {
        let unit = "Aé☃🦀";
        let wanted_len = KERNEL_OP_REASON_MAX + 10;
        // Smallest whole number of units whose total length reaches
        // `wanted_len` — the same string an append-until-long-enough
        // loop produces.
        let copies = (wanted_len + unit.len() - 1) / unit.len();
        let oversized = unit.repeat(copies);

        let reply = error_reply(99, oversized);
        let kept = reply.reason.as_str();
        assert!(kept.len() <= KERNEL_OP_REASON_MAX);
        assert!(kept.is_char_boundary(kept.len()));
    }

    /// Pure-ASCII over-cap input: every byte is a codepoint boundary,
    /// so no walk-back is needed and the result must be EXACTLY
    /// `KERNEL_OP_REASON_MAX` bytes long.
    #[test]
    fn error_reply_truncates_pure_ascii_no_walkback() {
        let oversized: String = std::iter::repeat('A')
            .take(KERNEL_OP_REASON_MAX + 16)
            .collect();
        let reply = error_reply(1, oversized);
        let kept_len = reply.reason.len();
        assert_eq!(kept_len, KERNEL_OP_REASON_MAX);
        assert!(reply.reason.is_char_boundary(kept_len));
    }

    /// Empty reason passthrough: `error_reply` must accept a
    /// zero-length reason without panicking and return it unchanged
    /// in a failure reply.
    #[test]
    fn error_reply_zero_length_reason_passes() {
        let empty_reason = String::new();
        let reply = error_reply(2, empty_reason);
        assert_eq!(reply.success, false);
        assert!(reply.reason.is_empty());
    }

    mod target_validation;

    // ---- same-rendezvous-epoch marker-anchor test ----

    /// Doc-grep / marker-anchor regression test. Every
    /// OrU32 RMW site in the dispatcher MUST carry a
    /// `// rmw-invariant-anchor` comment. The same-rendezvous-epoch
    /// invariant is structural (per-entry sequential walk in
    /// dispatch_one_write), not type-enforced. A future refactor
    /// that extracts the RMW into a helper or relocates the
    /// read+OR+write triple outside dispatch_one_write breaks the
    /// invariant — this test guards against that by:
    ///   1. Asserting there are exactly 3 `KernelOpValue::OrU32`
    ///      match arms in production source (Symbol, Direct, Kva)
    ///      and that each is followed within ~400 bytes by the
    ///      marker.
    ///   2. Asserting every non-comment OR-with-mask call argument
    ///      in production source lies inside dispatch_one_write.
    ///   3. Asserting dispatch_one_write contains exactly 3
    ///      OR-with-mask occurrences.
    ///
    /// A refactor that adds a new RMW site without the marker, or
    /// moves an existing site outside dispatch_one_write, trips here.
    #[test]
    fn or_u32_rmw_anchors_inside_dispatch_one_write() {
        let full_src = include_str!("kernel_op_dispatch.rs");
        // Self-match exclusion (same approach as
        // dispatch_arms_call_validate_target_helpers): the searched
        // patterns appear in this test's body. Restrict to production
        // source (before the first `#[cfg(test)]`).
        let test_mod_start = full_src
            .find("#[cfg(test)]")
            .expect("test module must exist");
        let src = &full_src[..test_mod_start];
        // Strict-count pin: exactly 3 production OrU32 arms.
        // Match-arm-shape `KernelOpValue::OrU32(mask)) => {` is
        // unique to the dispatch_one_write body. Catches a new 4th
        // arm AND catches removal of an existing arm.
        let arm_sites: Vec<_> = src
            .match_indices("KernelOpValue::OrU32(mask)) => {")
            .collect();
        assert_eq!(
            arm_sites.len(),
            3,
            "expected exactly 3 OrU32 write arms (Symbol/Direct/Kva); \
             found {} — if a 4th was added, add the rmw-invariant-anchor \
             comment to it AND update this expected count",
            arm_sites.len()
        );
        // Per-arm pattern pin: for every OrU32 match arm shape, the
        // next ~400 bytes MUST contain a `rmw-invariant-anchor`
        // marker. Catches the refactor that adds a new OrU32 arm
        // without the marker comment.
        for (idx, _) in &arm_sites {
            // Clamp the window end to a UTF-8 char boundary: the
            // source contains multi-byte characters and slicing a
            // `str` at a non-boundary byte panics. `*idx` is a match
            // start (always a boundary), so the walk-back terminates.
            let mut window_end = (idx + 400).min(src.len());
            while !src.is_char_boundary(window_end) {
                window_end -= 1;
            }
            let window = &src[*idx..window_end];
            assert!(
                window.contains("rmw-invariant-anchor"),
                "OrU32 arm at byte offset {idx} is missing the \
                 // rmw-invariant-anchor comment; window: {window:?}"
            );
        }
        // Extracted-helper pin: a refactor that extracts the
        // read+OR+write triple into a helper would LOSE the
        // match-arm shape but the read+OR+write pattern would still
        // exist somewhere. Search for that pattern via its signature
        // `| mask` (the OR operation distinctive to OrU32 RMW) —
        // every production occurrence MUST be inside
        // `dispatch_one_write` (between the `fn dispatch_one_write`
        // declaration and the next top-level `fn` after it).
        //
        // Find dispatch_one_write's body extent.
        let dow_start = src
            .find("fn dispatch_one_write(")
            .expect("dispatch_one_write must exist");
        // The body extends until the next top-level `fn` declaration
        // at the same indentation level (search for "\nfn " after
        // dow_start — module-private fns sit at column 0).
        let dow_end = src[dow_start..]
            .find("\nfn ")
            .map(|rel| dow_start + rel)
            .unwrap_or(src.len());
        // Count `| mask` occurrences globally vs inside dispatch_one_write.
        let global_or_mask: Vec<_> = src.match_indices("| mask").collect();
        let inside_dow: Vec<_> = global_or_mask
            .iter()
            .filter(|(idx, _)| *idx >= dow_start && *idx < dow_end)
            .collect();
        // Allow `| mask` matches in:
        //  - the 3 OrU32 RMW arms (inside dispatch_one_write)
        //  - the docstring/comment text describing the pattern (anywhere)
        // Production OR-with-mask sites OUTSIDE dispatch_one_write are
        // the refactor regression class — none should exist. Practical
        // detection: every `| mask` that is immediately followed by
        // `)` (the close of the write call) and is not on a comment
        // line must be inside dispatch_one_write.
        for (idx, _) in &global_or_mask {
            // Comment text is explicitly allowed anywhere: skip the
            // match when a `//` precedes it on the same line.
            let line_start = src[..*idx].rfind('\n').map_or(0, |nl| nl + 1);
            if src[line_start..*idx].contains("//") {
                continue;
            }
            // Test the full 7-byte call-argument shape. `starts_with`
            // on the tail avoids slicing at a computed end offset, so
            // it can neither miss the trailing `)` nor split a
            // multi-byte character.
            if src[*idx..].starts_with("| mask)") {
                assert!(
                    *idx >= dow_start && *idx < dow_end,
                    "Production `| mask)` OR-with-mask call at byte offset \
                     {idx} is OUTSIDE dispatch_one_write \
                     [start={dow_start}, end={dow_end}). \
                     A refactor extracted the OrU32 RMW into a helper, \
                     breaking the same-rendezvous-epoch invariant. \
                     Move it back inside dispatch_one_write OR (if \
                     intentional) update this test."
                );
            }
        }
        // Sanity: inside_dow should have exactly 3 entries (the 3 RMW
        // arms each contribute one `| mask`). Doc-comment refs add
        // more globally, but the inside-dow filter should be stable.
        assert_eq!(
            inside_dow.len(),
            3,
            "expected exactly 3 `| mask` production sites inside \
             dispatch_one_write (one per Symbol/Direct/Kva OrU32 arm); \
             found {}",
            inside_dow.len()
        );
    }

    mod task_field;
}