supermachine 0.7.70

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
// Status: minimal but FUNCTIONAL — parses TX packets, replies RST
// to any REQUEST (kernel observes a clean refusal instead of
// timing out), pushes responses back via the RX queue, raises IRQ.
// Real connection handling lives in the (next-commit) muxer.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use super::super::queue::{Desc, GuestMem, Queue};
use super::super::{VirtioDevice, VIRTIO_ID_VSOCK};
use super::muxer::VsockMuxer;
use super::muxer_rxq::MUXER_RXQ_SIZE;
use super::muxer_thread;
use super::packet::{Header, RxPacket, VSOCK_PKT_HDR_SIZE};

/// Index of the RX virtqueue (device → guest packets) in the queue list
/// handed to `activate`.
pub const RXQ_INDEX: usize = 0;
/// Index of the TX virtqueue (guest → device packets) in the queue list
/// handed to `activate`.
pub const TXQ_INDEX: usize = 1;
/// Index of the third queue (presumably the virtio-vsock event queue —
/// TODO confirm). `notify` ignores kicks on it.
pub const EVQ_INDEX: usize = 2;

/// Backstop cap on the device's RX staging queue (`pending_rx`). Once
/// reached, `try_drain_rx` STOPS sweeping the muxer rxq, so the muxer
/// rxq fills and its producers apply back-pressure (stash + pause the
/// host-socket read → the kernel TCP-back-pressures the remote sender)
/// instead of this device buffering host→guest data without bound under
/// a stuck/slow guest. Sized to the muxer rxq cap; per-connection vsock
/// credit normally keeps both far below it.
const PENDING_RX_MAX: usize = MUXER_RXQ_SIZE;

/// Virtio-vsock device model: owns the virtqueues, a staging queue of
/// packets waiting for guest RX descriptors, and the muxer that turns
/// guest TX packets into RX responses.
pub struct Vsock {
    /// Guest context ID; returned by `cid()` and exposed to the guest as
    /// the device config (little-endian bytes).
    cid: u64,
    /// Virtqueues installed by `activate` (empty until then); indexed by
    /// `RXQ_INDEX` / `TXQ_INDEX` / `EVQ_INDEX`.
    queues: Mutex<Vec<Queue>>,
    /// Set by `activate`, cleared by `shutdown`. The RX drain path bails
    /// out immediately while this is false.
    activated: std::sync::atomic::AtomicBool,
    /// Queue of RX packets waiting to land in guest descriptors.
    /// Drained on RXQ notify or after TX dispatch.
    pending_rx: Mutex<VecDeque<RxPacket>>,
    /// Cap for `pending_rx` (see [`PENDING_RX_MAX`]). A field rather than
    /// the const directly so tests can force a small value to exercise
    /// the back-pressure hand-off deterministically.
    pending_rx_cap: usize,
    /// Callback invoked after used-ring updates to signal the guest.
    /// `None` until `set_irq_raise` installs one.
    irq_raise: Mutex<Option<Arc<dyn Fn() + Send + Sync>>>,
    /// Muxer that routes incoming TX → outgoing RX. Owns the
    /// connection table + TSI control state. Arc so accept-thread
    /// closures can hold references back to it.
    muxer: Arc<VsockMuxer>,
}

impl Vsock {
    /// Create a vsock device for guest context ID `cid` without a TSI
    /// control-channel token; delegates to [`Vsock::with_tsi_token`].
    ///
    /// # Errors
    /// Propagates the [`muxer_thread::StartError`] returned while
    /// constructing the muxer.
    pub fn new(cid: u64) -> Result<Self, muxer_thread::StartError> {
        Self::with_tsi_token(cid, None)
    }

    /// Variant of [`Vsock::new`] that hands the muxer a TSI
    /// control-channel auth token up front. With a token armed, control
    /// DGRAMs that are mismatched or unprefixed are silently dropped by
    /// the muxer (see [`super::muxer`]).
    ///
    /// The device starts inactive: no queues, an empty RX staging queue
    /// capped at [`PENDING_RX_MAX`], and no IRQ callback.
    ///
    /// # Errors
    /// Returns the muxer's [`muxer_thread::StartError`] if it fails to
    /// start.
    pub fn with_tsi_token(
        cid: u64,
        token: Option<[u8; super::muxer::TSI_TOKEN_LEN]>,
    ) -> Result<Self, muxer_thread::StartError> {
        // The muxer is the only fallible part; build it first so the
        // struct literal below is infallible.
        let muxer = VsockMuxer::with_tsi_token(cid, token).map(Arc::new)?;
        let device = Self {
            cid,
            queues: Mutex::new(Vec::new()),
            activated: std::sync::atomic::AtomicBool::new(false),
            pending_rx: Mutex::new(VecDeque::new()),
            pending_rx_cap: PENDING_RX_MAX,
            irq_raise: Mutex::new(None),
            muxer,
        };
        Ok(device)
    }

    /// Borrow the shared muxer handle. Callers that need to keep it
    /// (e.g. in another thread) can clone the returned `Arc`.
    pub fn muxer(&self) -> &Arc<VsockMuxer> {
        &self.muxer
    }

    /// External wake: drain any pending RX (from device or muxer)
    /// into the guest's RX virtq. Called by the muxer's accept
    /// thread after pushing a REQUEST.
    ///
    /// Does nothing until the device has been activated (the drain
    /// returns early while `activated` is false).
    pub fn kick(&self) {
        self.try_drain_rx();
    }

    /// Install the callback used to signal the guest after the device
    /// updates a used ring. Replaces any previously installed callback.
    pub fn set_irq_raise(&self, f: Arc<dyn Fn() + Send + Sync>) {
        *self.irq_raise.lock().unwrap() = Some(f);
    }

    /// Quiesce the vsock dataplane and cut it off from guest RAM. The VM
    /// MUST call this before freeing guest RAM.
    ///
    /// Inbound packets reach the guest's RX descriptors through
    /// [`try_drain_rx`](Self::try_drain_rx), which `write_slice`s the vsock
    /// header + payload into guest physical addresses. Two kinds of thread
    /// end up there:
    ///   1. the muxer's I/O thread, which `self.muxer.shutdown()` stops and
    ///      joins; and
    ///   2. the host-side bridge *acceptor* threads — the exec bridge and TSI
    ///      mux (`crate::kvm::run::LinuxVm::start_exec_bridge` /
    ///      `start_tsi_mux`). They accept a host connection and call
    ///      `open_*_to_guest -> push_control_and_kick -> try_drain_rx` inline,
    ///      and they outlive the VM (they loop on `listener.incoming()`). A
    ///      connection accepted DURING teardown would otherwise write into RAM
    ///      that is about to be `munmap`ped — an intermittent use-after-free
    ///      seen as a SIGSEGV inside `__memcpy` under load (observed in the
    ///      full single-threaded test sweep, faulting in `try_drain_rx`'s
    ///      `write_slice`).
    ///
    /// Why clearing `ready` here is race-free: `try_drain_rx` checks
    /// `q.ready` after taking the `queues` lock and keeps that lock for all
    /// of its guest-RAM writes. Clearing `ready` under the same lock means an
    /// in-flight drain finishes first (RAM still mapped) while we block, and
    /// every later drain sees `!ready` and returns before touching the
    /// soon-to-be-freed mapping. `activated` is cleared as well so the cheap
    /// fast-path check bails even earlier.
    pub fn shutdown(&self) {
        // Step 1: stop + join the muxer's I/O thread.
        self.muxer.shutdown();

        // Step 2: quiesce the device so no other thread can reach guest RAM.
        self.activated
            .store(false, std::sync::atomic::Ordering::SeqCst);
        // A poisoned lock must not stop teardown: recover the guard and
        // clear the flags regardless.
        self.queues
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter_mut()
            .for_each(|queue| queue.ready = false);
    }

    /// The guest context ID this device was created with.
    pub fn cid(&self) -> u64 {
        self.cid
    }

    /// Drain the per-dispatch RX backlog. Pool-worker mode calls
    /// this in tandem with `VsockMuxer::reset` between RESTOREs so
    /// the next dispatch starts with no leftover packets.
    ///
    /// The staged packets are discarded, not delivered to the guest.
    pub fn reset_pending_rx(&self) {
        self.pending_rx.lock().unwrap().clear();
    }

    /// True when the vsock dataplane has no active per-request state: the
    /// device's RX staging queue is empty AND the muxer reports itself idle.
    /// The muxer is only consulted when the staging queue is empty.
    ///
    /// The two checks are separate snapshots, not one atomic observation.
    pub fn is_transport_idle(&self) -> bool {
        // Take the staging-queue snapshot in its own statement so the
        // `pending_rx` guard is released before calling into the muxer.
        // Written as a single `a && b` expression, the guard temporary from
        // the left operand lives until the whole expression finishes, i.e.
        // the lock would be held across `muxer.is_transport_idle()`. The
        // dataplane lock order is muxer rxq → pending_rx (see
        // `refill_pending_from_muxer`), so holding `pending_rx` while the
        // muxer takes its own locks risks a lock-order inversion.
        let staging_empty = self.pending_rx.lock().unwrap().is_empty();
        staging_empty && self.muxer.is_transport_idle()
    }

    /// Push an RX packet for the guest. Drains opportunistically;
    /// queued otherwise. Future muxer callers use this to deliver
    /// inbound TCP→vsock data.
    ///
    /// The packet is appended unconditionally — `pending_rx_cap` is not
    /// checked on this path. The staging lock is released before the drain
    /// attempt.
    pub fn push_rx(&self, pkt: RxPacket) {
        self.pending_rx.lock().unwrap().push_back(pkt);
        self.try_drain_rx();
    }

    /// Like [`Self::try_drain_rx`] but reports whether the pass advanced the
    /// RX used ring — i.e. whether the IRQ-raise path already ran. Callers
    /// that combine TX+RX completion (`drain_tx`) use this to avoid
    /// double-raising the IRQ.
    ///
    /// The answer comes straight from the drain pass. Comparing the
    /// `pending_rx` depth before and after is NOT reliable: the pass first
    /// refills `pending_rx` from the muxer rxq, so the depth can stay equal
    /// or grow even though packets were delivered, and a concurrent
    /// `reset_pending_rx` can shrink it although nothing was delivered.
    fn try_drain_rx_returning(&self) -> bool {
        self.drain_rx_pass()
    }

    /// Move packets from the muxer's rxq into `pending_rx`, stopping at
    /// `pending_rx_cap`. Leaving packets in the muxer rxq is precisely
    /// what makes the muxer-side back-pressure engage (see
    /// [`PENDING_RX_MAX`]): the rxq fills, `push_was_empty` returns the
    /// packet to its producer, and that connection stashes + pauses.
    /// Lock order is muxer rxq → pending_rx (the dataplane invariant).
    fn refill_pending_from_muxer(&self) {
        let mut mq = self.muxer.rxq.lock().unwrap();
        let mut p = self.pending_rx.lock().unwrap();
        while p.len() < self.pending_rx_cap {
            match mq.pop() {
                Some(pkt) => p.push_back(pkt),
                None => break,
            }
        }
    }

    /// Try to land queued RX packets into guest RX descriptors, raising the
    /// IRQ if the RX used ring advanced. No-op until the device is activated
    /// and the RX queue is ready.
    fn try_drain_rx(&self) {
        self.drain_rx_pass();
    }

    /// One RX drain pass shared by [`Self::try_drain_rx`] and
    /// [`Self::try_drain_rx_returning`].
    ///
    /// Refills the staging queue from the muxer, then writes staged packets
    /// into guest RX descriptor chains until either runs out. Returns `true`
    /// when at least one used-ring entry was added (a delivered packet or an
    /// acked unusable chain); in that case the IRQ callback, if installed,
    /// has been invoked after both locks were released.
    fn drain_rx_pass(&self) -> bool {
        if !self.activated.load(std::sync::atomic::Ordering::Acquire) {
            return false;
        }
        // Pull muxer-pushed RxPackets into our staging queue, but only up
        // to pending_rx_cap. When pending_rx is full — a stuck/slow guest
        // isn't returning RX descriptors — we STOP draining the muxer rxq
        // so it fills and its producers back-pressure (the host-socket
        // read pauses → the kernel TCP-back-pressures the remote sender),
        // instead of this device buffering host→guest data without bound.
        self.refill_pending_from_muxer();
        let mut qs = self.queues.lock().unwrap();
        let q = match qs.get_mut(RXQ_INDEX) {
            Some(q) => q,
            None => return false,
        };
        // Checked under the `queues` lock, which stays held for every
        // guest-RAM write below; `shutdown` relies on this.
        if !q.ready {
            return false;
        }
        let mut pending = self.pending_rx.lock().unwrap();
        // Packets actually written to the guest in this pass (trace only).
        let mut delivered = 0usize;
        // Whether the used ring advanced at all (delivery OR bare ack).
        let mut any = false;
        while let Some(pkt) = pending.front() {
            // Pop a descriptor chain head from RX avail.
            let (head, chain) = match q.pop_chain() {
                Some(p) => p,
                None => break, // no descriptors free; leave queued
            };
            // Guard the guest-provided chain (mirror of `drain_tx`):
            //   - an EMPTY chain (`pop_chain` can return `vec![]` when the
            //     head index is out of range) would panic on `chain[0]`;
            //   - a first descriptor smaller than the 44-byte vsock header
            //     can't hold it, and writing the header anyway would spill
            //     past the buffer the guest advertised and over-report the
            //     used length.
            // Either way the descriptor is unusable for THIS packet: ack
            // it (so the guest can recycle the buffer) and try the next
            // one — the packet stays queued (we don't pop_front).
            let first = match chain.first() {
                Some(d) if (d.len as usize) >= VSOCK_PKT_HDR_SIZE => *d,
                _ => {
                    q.add_used(head, 0);
                    any = true;
                    continue;
                }
            };
            // Header writable into first descriptor (guest provides
            // a writable buffer in RX queue).
            let mut hdr = pkt.hdr;
            hdr.len = pkt.data.len() as u32;
            let hdr_bytes = hdr.encode();
            q.mem.write_slice(first.addr, &hdr_bytes);
            let mut written = VSOCK_PKT_HDR_SIZE as u32;
            if !pkt.data.is_empty() {
                // Linux 6.2+: payload may follow header in the SAME
                // descriptor; older kernels split. Walk the chain
                // until we've placed the whole payload OR run out
                // of writable descriptors.
                let mut payload_off = 0usize;
                // First descriptor may have leftover bytes after
                // the header.
                let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
                if first_leftover > 0 {
                    let take = first_leftover.min(pkt.data.len());
                    // `addr` is guest-controlled: a plain `+` would panic
                    // in overflow-checked builds for an address near
                    // u64::MAX. Wrap instead and leave range validation to
                    // `write_slice` (NOTE(review): confirm it bounds-checks).
                    q.mem.write_slice(
                        first.addr.wrapping_add(VSOCK_PKT_HDR_SIZE as u64),
                        &pkt.data[..take],
                    );
                    written += take as u32;
                    payload_off += take;
                }
                // Remaining descriptors hold payload only.
                for d in chain.iter().skip(1) {
                    if payload_off >= pkt.data.len() {
                        break;
                    }
                    let take = (d.len as usize).min(pkt.data.len() - payload_off);
                    q.mem
                        .write_slice(d.addr, &pkt.data[payload_off..payload_off + take]);
                    written += take as u32;
                    payload_off += take;
                }
                // NOTE(review): in builds without debug assertions a
                // too-small chain delivers a truncated payload while the
                // header still advertises the full length.
                debug_assert_eq!(
                    payload_off,
                    pkt.data.len(),
                    "RX descriptor chain too small for {} byte packet",
                    pkt.data.len()
                );
            }
            q.add_used(head, written);
            pending.pop_front();
            delivered += 1;
            any = true;
        }
        if !any {
            return false;
        }
        // Clone the callback, then release both locks before invoking it so
        // the IRQ path never runs with device locks held.
        let f_opt = self.irq_raise.lock().unwrap().clone();
        drop(pending);
        drop(qs);
        if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
            eprintln!("[vsock] drained {} RX, raising IRQ", delivered);
        }
        if let Some(f) = f_opt {
            f();
        }
        true
    }

    /// Drain the TX queue: parse each guest packet, route it through the
    /// muxer (`handle_tx`), and stage every response the muxer returns in
    /// `pending_rx`. After the TX drain, try to deliver pending RX and make
    /// sure the guest gets an IRQ for the TX completions.
    ///
    /// Runs in two phases so the muxer is never called with the `queues`
    /// lock held: phase 1 snapshots (header, payload) pairs under the lock
    /// and acks each chain; phase 2 routes the snapshots.
    fn drain_tx(&self) {
        // Snapshot (header, payload) for each TX chain. Read
        // payload into a Vec — we'll route it through the muxer
        // (which may need it for TSI control DGRAMs and RW data).
        let parsed: Vec<(Header, Vec<u8>)> = {
            let mut qs = self.queues.lock().unwrap();
            let q = match qs.get_mut(TXQ_INDEX) {
                Some(q) => q,
                None => return,
            };
            if !q.ready {
                return;
            }
            let mut out = Vec::new();
            while let Some((head, chain)) = q.pop_chain() {
                // Unusable chains (empty, or first descriptor too short for
                // the header) are acked and skipped without parsing.
                if chain.is_empty() {
                    q.add_used(head, 0);
                    continue;
                }
                let first = chain[0];
                if (first.len as usize) < VSOCK_PKT_HDR_SIZE {
                    q.add_used(head, 0);
                    continue;
                }
                let mut hdrbuf = [0u8; VSOCK_PKT_HDR_SIZE];
                q.mem.read_slice(first.addr, &mut hdrbuf);
                let h = Header::parse(&hdrbuf);
                // Payload either continues in first.addr+44 (Linux
                // 6.2+ single-desc TX) or in chain[1].
                let payload = if h.len > 0 {
                    assemble_tx_payload(h.len as usize, &chain, &q.mem)
                } else {
                    Vec::new()
                };
                if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
                    eprintln!(
                        "[vsock] TX op={} type={} src=({},{}) dst=({},{}) len={}",
                        h.op, h.type_, h.src_cid, h.src_port, h.dst_cid, h.dst_port, h.len
                    );
                }
                out.push((h, payload));
                // TX buffers are device-readable only, so 0 bytes written.
                q.add_used(head, 0);
            }
            out
        };
        let tx_any = !parsed.is_empty();
        // Route through muxer (no queue lock held).
        for (h, payload) in parsed {
            for rx in self.muxer.handle_tx(&h, &payload) {
                self.pending_rx.lock().unwrap().push_back(rx);
            }
        }
        // Drain any RX intents into RX descriptors + raise IRQ.
        // ALSO raise IRQ for pure-TX completions (no inbound traffic) —
        // the guest's virtio-vsock worker thread relies on the used-ring
        // notification to learn that TX descriptors have been freed.
        // Without this, on a high-throughput unidirectional stream
        // (workload writing 1+ MiB to stdout, no host→guest traffic),
        // the guest's send buffer fills, its worker sleeps waiting for
        // used-ring advancement, and bytes stop flowing. Pre-0.7.12
        // this manifested as spawn() + readStdout hanging once a
        // workload produced more bytes than the kernel's vsock send
        // buffer (~256 KiB). Pinned by exec_streaming.test.ts.
        let rx_any = self.try_drain_rx_returning();
        if tx_any && !rx_any {
            let f_opt = self.irq_raise.lock().unwrap().clone();
            if let Some(f) = f_opt {
                f();
            }
        }
    }
}

/// Assemble a TX packet's payload from its descriptor chain.
///
/// * `header_len` — the guest-controlled `vsock_hdr.len` claim.
/// * `chain` — the descriptor chain; `chain[0]` starts with the 44-byte
///   header and may carry payload after it, the rest carry payload only.
/// * `mem` — guest memory the descriptors point into.
///
/// Returns at most `header_len` bytes, and never more than the chain's
/// descriptors advertise. An empty chain yields an empty payload.
///
/// The allocation is capped to the bytes the chain claims to carry
/// (first-descriptor leftover after the 44-byte header + the remaining
/// descriptors). A hostile guest can claim `len = u32::MAX` while
/// supplying only the 44-byte header descriptor, which — before this cap —
/// forced a ~4 GiB zero-filled `Vec` before `truncate` threw it away, a
/// guest→host OOM DoS. We never need more than the chain provides;
/// anything the guest over-claims beyond that is truncated regardless.
///
/// NOTE(review): descriptor `len` fields are guest-controlled too, so a
/// guest that inflates BOTH the header length and a descriptor length can
/// still request a large buffer. Consider an absolute per-packet cap.
fn assemble_tx_payload(header_len: usize, chain: &[Desc], mem: &GuestMem) -> Vec<u8> {
    // Don't index `chain[0]` blindly: an empty chain is guest-producible
    // and must not panic the device.
    let (first, rest) = match chain.split_first() {
        Some(parts) => parts,
        None => return Vec::new(),
    };
    let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
    // Saturating: the lengths are guest-chosen, so never let the total
    // overflow (it can on 32-bit hosts).
    let chain_rest = rest
        .iter()
        .fold(0usize, |acc, d| acc.saturating_add(d.len as usize));
    let avail = first_leftover.saturating_add(chain_rest);
    let want = header_len.min(avail);
    let mut payload = vec![0u8; want];
    let mut off = 0usize;
    if first_leftover > 0 {
        let take = first_leftover.min(want);
        // `addr` is guest-controlled; a plain `+` would panic in
        // overflow-checked builds for an address near u64::MAX. Wrap and
        // leave range validation to `read_slice`
        // (NOTE(review): confirm it bounds-checks).
        mem.read_slice(
            first.addr.wrapping_add(VSOCK_PKT_HDR_SIZE as u64),
            &mut payload[..take],
        );
        off += take;
    }
    for d in rest {
        if off >= want {
            break;
        }
        let take = (d.len as usize).min(want - off);
        mem.read_slice(d.addr, &mut payload[off..off + take]);
        off += take;
    }
    // `off == want` whenever the loop ran to completion; truncate keeps the
    // result honest if it did not.
    payload.truncate(off);
    payload
}

impl VirtioDevice for Vsock {
    fn device_id(&self) -> u32 {
        VIRTIO_ID_VSOCK
    }
    fn num_queues(&self) -> usize {
        3
    }
    fn config(&self) -> Vec<u8> {
        self.cid.to_le_bytes().to_vec()
    }
    fn features(&self) -> u64 {
        // VIRTIO_F_VERSION_1 (bit 32) + VIRTIO_VSOCK_F_DGRAM (bit 3).
        // The DGRAM feature bit is what the kernel TSI patches
        // gate on — without it advertised by the host, the kernel's
        // TSI listen() returns EINVAL because it can't send the
        // PROXY_CREATE / LISTEN control DGRAMs.
        (1u64 << 32) | (1u64 << 3)
    }
    fn notify(&self, q: u16) {
        match q as usize {
            TXQ_INDEX => self.drain_tx(),
            RXQ_INDEX => self.try_drain_rx(),
            _ => {}
        }
    }
    fn activate(&self, queues: Vec<Queue>) {
        *self.queues.lock().unwrap() = queues;
        self.activated
            .store(true, std::sync::atomic::Ordering::Release);
        eprintln!("[vsock] activated cid={}", self.cid);
    }
    fn snapshot_queues(&self) -> Vec<Queue> {
        self.queues.lock().unwrap().clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a STREAM/RW test packet whose one-byte payload is `tag`, so
    /// tests can recognise individual packets and check FIFO order.
    fn rx_pkt(tag: u8) -> RxPacket {
        RxPacket {
            hdr: Header {
                src_cid: 2,
                dst_cid: 3,
                src_port: 1000,
                dst_port: 2000,
                len: 1,
                type_: 1, // STREAM
                op: 5,    // RW
                flags: 0,
                buf_alloc: 0,
                fwd_cnt: 0,
            },
            data: vec![tag],
        }
    }

    /// `refill_pending_from_muxer` must stop at `pending_rx_cap`, leave the
    /// overflow in the muxer rxq, and preserve FIFO order across sweeps.
    #[test]
    fn pending_rx_sweep_is_bounded_and_never_drops() {
        // Models a stuck/slow guest: it returns no RX descriptors, so
        // pending_rx never drains. Producers keep pushing into the muxer
        // rxq. The sweep must cap pending_rx and LEAVE the overflow in the
        // muxer rxq (so the muxer back-pressures) — never drop it.
        let mut dev = Vsock::new(3).expect("vsock");
        dev.pending_rx_cap = 8;

        for i in 0..100u8 {
            assert!(dev.muxer.rxq.lock().unwrap().push(rx_pkt(i)));
        }

        // Sweep fills pending_rx to the cap and stops; the rest stay
        // queued in the muxer rxq — bounded buffering, zero loss.
        dev.refill_pending_from_muxer();
        assert_eq!(dev.pending_rx.lock().unwrap().len(), 8, "pending_rx capped");
        assert_eq!(
            dev.muxer.rxq.lock().unwrap().len(),
            92,
            "overflow retained in the muxer rxq, not dropped"
        );

        // Re-sweeping while pending_rx is full is a no-op (still no loss).
        dev.refill_pending_from_muxer();
        assert_eq!(dev.pending_rx.lock().unwrap().len(), 8);
        assert_eq!(dev.muxer.rxq.lock().unwrap().len(), 92);

        // Guest consumes (drains pending_rx) → the next sweep pulls more,
        // FIFO, still losing nothing.
        let drained: Vec<u8> = {
            let mut p = dev.pending_rx.lock().unwrap();
            p.drain(..).map(|pkt| pkt.data[0]).collect()
        };
        assert_eq!(drained, (0..8).collect::<Vec<u8>>(), "FIFO order preserved");
        dev.refill_pending_from_muxer();
        assert_eq!(dev.pending_rx.lock().unwrap().len(), 8);
        assert_eq!(dev.muxer.rxq.lock().unwrap().len(), 84);
        // 8 delivered + 8 staged + 84 queued = 100: nothing lost anywhere.
    }

    // ── assemble_tx_payload: guest-controlled length handling ──────────

    /// Guest-physical base address at which `fresh_mem` maps its backing
    /// buffer; descriptor addresses in the `assemble_*` tests are offsets
    /// from it.
    const PBASE: u64 = 0x2_0000;

    /// Build a descriptor covering `len` bytes at guest address `addr`,
    /// with `flags` and `next` zeroed (the helper under test only reads
    /// `addr` and `len`).
    fn desc(addr: u64, len: u32) -> Desc {
        Desc {
            addr,
            len,
            flags: 0,
            next: 0,
        }
    }

    /// Allocate a zeroed 64 KiB buffer and wrap it in a `GuestMem` based at
    /// `PBASE`. The buffer is returned alongside the `GuestMem` because the
    /// latter is built from a raw pointer into it — callers must keep the
    /// `Vec` alive (bound as `_b`) for as long as they use the memory.
    fn fresh_mem() -> (Vec<u8>, GuestMem) {
        let mut backing = vec![0u8; 0x1_0000];
        let mem = GuestMem::new(backing.as_mut_ptr(), PBASE, backing.len());
        (backing, mem)
    }

    /// Payload stored right after the header inside the first descriptor is
    /// returned in full.
    #[test]
    fn assemble_inline_payload_after_header() {
        // Linux 6.2+ single-descriptor TX: payload follows the 44-byte
        // header in the same descriptor.
        let (_b, mem) = fresh_mem();
        let body = b"hello-vsock";
        mem.write_slice(PBASE + VSOCK_PKT_HDR_SIZE as u64, body);
        let chain = [desc(PBASE, VSOCK_PKT_HDR_SIZE as u32 + body.len() as u32)];
        let got = assemble_tx_payload(body.len(), &chain, &mem);
        assert_eq!(got, body);
    }

    /// Payload spread over several payload-only descriptors is concatenated
    /// in chain order.
    #[test]
    fn assemble_payload_split_across_descriptors() {
        let (_b, mem) = fresh_mem();
        // Header-only first descriptor, then two payload descriptors.
        mem.write_slice(PBASE + 0x100, b"AAAAAAAA");
        mem.write_slice(PBASE + 0x200, b"BBBBBBBB");
        let chain = [
            desc(PBASE, VSOCK_PKT_HDR_SIZE as u32),
            desc(PBASE + 0x100, 8),
            desc(PBASE + 0x200, 8),
        ];
        let got = assemble_tx_payload(16, &chain, &mem);
        assert_eq!(got, b"AAAAAAAABBBBBBBB");
    }

    /// An over-claimed header length must not drive the allocation size;
    /// only the bytes the chain carries are read.
    #[test]
    fn assemble_caps_overclaimed_length_no_giant_alloc() {
        // The DoS guard: header claims u32::MAX bytes but the chain only
        // carries 4. Allocation must track the chain, not the claim.
        let (_b, mem) = fresh_mem();
        mem.write_slice(PBASE + VSOCK_PKT_HDR_SIZE as u64, b"abcd");
        let chain = [desc(PBASE, VSOCK_PKT_HDR_SIZE as u32 + 4)];
        let got = assemble_tx_payload(u32::MAX as usize, &chain, &mem);
        assert_eq!(got, b"abcd", "only the bytes actually present are read");
        // Capacity is a proxy for "no multi-GiB allocation happened".
        assert!(
            got.capacity() <= 4096,
            "allocation must be chain-bounded, not {} bytes",
            got.capacity()
        );
    }

    /// A header-only chain yields an empty payload no matter what length
    /// the header claims.
    #[test]
    fn assemble_overclaim_with_header_only_chain_is_empty() {
        // u32::MAX claim but ZERO payload bytes in the chain → empty Vec,
        // zero allocation (the worst-case OOM vector).
        let (_b, mem) = fresh_mem();
        let chain = [desc(PBASE, VSOCK_PKT_HDR_SIZE as u32)];
        let got = assemble_tx_payload(u32::MAX as usize, &chain, &mem);
        assert!(got.is_empty());
        assert!(got.capacity() <= 4096);
    }

    /// When the header claims more than the chain carries, the result is
    /// truncated to the bytes actually available.
    #[test]
    fn assemble_truncates_to_available_when_chain_short() {
        let (_b, mem) = fresh_mem();
        mem.write_slice(PBASE + 0x100, b"1234567890");
        let chain = [
            desc(PBASE, VSOCK_PKT_HDR_SIZE as u32),
            desc(PBASE + 0x100, 10),
        ];
        // Claims 100 but only 10 are present.
        let got = assemble_tx_payload(100, &chain, &mem);
        assert_eq!(got, b"1234567890");
    }

    // ── drain_tx end-to-end: hostile lengths don't crash the device ────

    /// Drive one TX packet through `drain_tx` (TXQ is queue index 1).
    /// `first_desc_len` and `len_field` are set independently so a test
    /// can claim a length the chain doesn't back. Returns the TXQ used.idx
    /// (1 ⇒ the chain was consumed exactly once).
    ///
    /// Layout inside the 256 KiB test RAM (offsets from `base`): TXQ
    /// desc/avail/used at 0x0000/0x0800/0x1000, RXQ at 0x2000/0x2800/0x3000,
    /// EVQ at 0x4000/0x4800/0x5000, packet buffer at 0x6000.
    fn drive_one_tx(first_desc_len: u32, len_field: u32, op: u16, payload: &[u8]) -> u16 {
        let base = 0x10_0000u64;
        let size = 256 * 1024;
        let mut backing = vec![0u8; size];
        let mem = GuestMem::new(backing.as_mut_ptr(), base, size);

        // 44-byte header at 0x6000 with op + len field; payload follows.
        let mut hb = Header {
            src_cid: 3,
            dst_cid: 2,
            src_port: 50_000,
            dst_port: 9_999,
            len: 0,
            type_: 1, // STREAM
            op,
            flags: 0,
            buf_alloc: 0,
            fwd_cnt: 0,
        }
        .encode();
        // Patch the encoded `len` field (bytes 24..28) directly so it can
        // disagree with the descriptor length.
        hb[24..28].copy_from_slice(&len_field.to_le_bytes());
        mem.write_slice(base + 0x6000, &hb);
        mem.write_slice(base + 0x6000 + VSOCK_PKT_HDR_SIZE as u64, payload);

        // TXQ desc[0]: header (+payload) — single-descriptor chain.
        // Fields written: addr (u64 @ +0), len (u32 @ +8), flags (u16 @ +12),
        // next (u16 @ +14).
        let td = base; // TXQ desc table at base+0
        mem.write_u64(td, base + 0x6000);
        mem.write_u32(td + 8, first_desc_len);
        mem.write_u16(td + 12, 0);
        mem.write_u16(td + 14, 0);
        // TXQ avail: ring[0]=head 0; idx=1.
        mem.write_u16(base + 0x0800 + 4, 0);
        mem.write_u16(base + 0x0800 + 2, 1);

        // Build a ready 8-entry queue from (desc, avail, used) offsets.
        let mk = |d: u64, a: u64, u: u64| {
            let mut q = Queue::new(mem.clone());
            q.size = 8;
            q.ready = true;
            q.desc_table = base + d;
            q.avail_ring = base + a;
            q.used_ring = base + u;
            q
        };
        // index 0 = RXQ (empty avail), 1 = TXQ, 2 = EVQ.
        let rxq = mk(0x2000, 0x2800, 0x3000);
        let txq = mk(0x0000, 0x0800, 0x1000);
        let evq = mk(0x4000, 0x4800, 0x5000);

        let dev = Vsock::new(3).expect("vsock");
        dev.activate(vec![rxq, txq, evq]);
        dev.notify(TXQ_INDEX as u16);

        // Read TXQ used.idx; dev (and its mem clones) drop before backing.
        mem.read_u16(base + 0x1000 + 2)
    }

    /// End-to-end check that an over-claimed header length is handled by
    /// `drain_tx` without a giant allocation.
    #[test]
    fn drain_tx_overclaimed_length_does_not_oom() {
        // RW packet to an unknown connection claiming u32::MAX bytes with
        // only a 44-byte header descriptor. Pre-fix this zero-filled ~4 GiB
        // before discarding it. Must consume the chain and return promptly.
        let used = drive_one_tx(VSOCK_PKT_HDR_SIZE as u32, u32::MAX, 5 /*RW*/, &[]);
        assert_eq!(used, 1, "TX chain must be consumed exactly once");
    }

    /// A TX chain too short to hold the vsock header is still returned to
    /// the guest via the used ring.
    #[test]
    fn drain_tx_short_header_is_acked() {
        // First descriptor shorter than the 44-byte header: device acks
        // and skips, never parses — no panic.
        let used = drive_one_tx(20, 0, 5, &[]);
        assert_eq!(used, 1, "short-header chain is acked");
    }

    /// Happy path: a well-formed RW packet with an 8-byte inline payload is
    /// consumed exactly once.
    #[test]
    fn drain_tx_normal_rw_payload_is_consumed() {
        let used = drive_one_tx(VSOCK_PKT_HDR_SIZE as u32 + 8, 8, 5, b"DEADBEEF");
        assert_eq!(used, 1);
    }

    // ── RX descriptor-write path: hostile descriptor chains ────────────

    /// Build a STREAM/RW packet carrying `data`. `hdr.len` is left at 0
    /// because the RX drain overwrites it with the payload length before
    /// encoding the header.
    fn rx_data_pkt(data: &[u8]) -> RxPacket {
        RxPacket {
            hdr: Header {
                src_cid: 2,
                dst_cid: 3,
                src_port: 1,
                dst_port: 2,
                len: 0, // set to data.len() by try_drain_rx before encode
                type_: 1,
                op: 5, // RW
                flags: 0,
                buf_alloc: 0,
                fwd_cnt: 0,
            },
            data: data.to_vec(),
        }
    }

    /// What `drive_rx` observed in guest memory after one RX drain.
    struct RxOut {
        /// RXQ `used.idx` after the drain.
        used_idx: u16,
        /// `len` of `used.ring[0]`.
        used_len: u32,
        /// First `VSOCK_PKT_HDR_SIZE + 16` bytes of the receive buffer.
        buf: Vec<u8>,
    }

    /// Stage `pkt` into pending_rx, set up RXQ (index 0) with one writable
    /// descriptor of `desc_len` bytes, point the avail head at `head`
    /// (set `head >= size` to force an EMPTY chain), run try_drain_rx, and
    /// read back the used ring + the descriptor buffer.
    ///
    /// Layout inside the 256 KiB test RAM (offsets from `base`): RXQ
    /// desc/avail/used at 0x0000/0x0800/0x1000, TXQ at 0x2000/0x2800/0x3000,
    /// EVQ at 0x4000/0x4800/0x5000, receive buffer at 0x6000.
    fn drive_rx(desc_len: u32, head: u16, pkt: RxPacket) -> RxOut {
        use crate::devices::virtio::queue::VRING_DESC_F_WRITE;
        let base = 0x10_0000u64;
        let size = 256 * 1024;
        let mut backing = vec![0u8; size];
        let mem = GuestMem::new(backing.as_mut_ptr(), base, size);

        // RXQ desc[0]: a single writable receive buffer at base+0x6000.
        let d0 = base;
        mem.write_u64(d0, base + 0x6000);
        mem.write_u32(d0 + 8, desc_len);
        mem.write_u16(d0 + 12, VRING_DESC_F_WRITE);
        mem.write_u16(d0 + 14, 0);
        // avail: ring[0] = head; idx = 1.
        mem.write_u16(base + 0x0800 + 4, head);
        mem.write_u16(base + 0x0800 + 2, 1);

        // Build a ready 8-entry queue from (desc, avail, used) offsets.
        let mk = |d: u64, a: u64, u: u64| {
            let mut q = Queue::new(mem.clone());
            q.size = 8;
            q.ready = true;
            q.desc_table = base + d;
            q.avail_ring = base + a;
            q.used_ring = base + u;
            q
        };
        let rxq = mk(0x0000, 0x0800, 0x1000);
        let txq = mk(0x2000, 0x2800, 0x3000);
        let evq = mk(0x4000, 0x4800, 0x5000);

        let dev = Vsock::new(3).expect("vsock");
        dev.activate(vec![rxq, txq, evq]);
        // Stage directly (bypassing `push_rx`) so exactly one drain runs.
        dev.pending_rx.lock().unwrap().push_back(pkt);
        dev.try_drain_rx();

        let used_idx = mem.read_u16(base + 0x1000 + 2);
        let used_len = mem.read_u32(base + 0x1000 + 8); // used.ring[0].len
        let mut buf = vec![0u8; VSOCK_PKT_HDR_SIZE + 16];
        mem.read_slice(base + 0x6000, &mut buf);
        RxOut {
            used_idx,
            used_len,
            buf,
        }
    }

    #[test]
    fn rx_empty_chain_does_not_panic() {
        // An avail head of 99 is >= the queue size, so pop_chain hands back an
        // EMPTY chain. Before the fix, indexing `chain[0]` panicked the worker,
        // which made this a guest-triggerable DoS. Expected now: the chain is
        // acked with length 0, the packet stays queued, and the receive buffer
        // is left untouched.
        let RxOut {
            used_idx,
            used_len,
            buf,
        } = drive_rx(64, 99, rx_data_pkt(b"x"));
        assert_eq!(used_idx, 1, "the unusable chain is acked");
        assert_eq!(used_len, 0);
        assert!(!buf.iter().any(|&byte| byte != 0), "nothing written");
    }

    #[test]
    fn rx_short_first_descriptor_is_not_overwritten() {
        // The only descriptor advertises 8 bytes, which is less than the
        // 44-byte header. Before the fix the device wrote all 44 header bytes
        // anyway (running past the advertised buffer) and reported
        // used.len = 44. It must now ack with 0 and write nothing.
        let RxOut {
            used_idx,
            used_len,
            buf,
        } = drive_rx(8, 0, rx_data_pkt(b"payload"));
        assert_eq!(used_idx, 1);
        assert_eq!(used_len, 0, "must not over-report a short buffer");
        let untouched = !buf.iter().any(|&byte| byte != 0);
        assert!(untouched, "header not written past the buffer");
    }

    /// Teardown gate: once [`Vsock::shutdown`] has run, the RX dataplane must
    /// not touch guest memory any more.
    ///
    /// The scenario mirrors a host-side bridge acceptor thread that accepts a
    /// connection while the VM is being torn down and then calls
    /// `try_drain_rx`. With the guest mapping about to be `munmap`ped, that
    /// write has to be a no-op, otherwise it is the use-after-free SIGSEGV this
    /// gate exists to prevent. The test queues a packet *after* shutdown
    /// against a perfectly valid writable descriptor and drives the drain
    /// twice — through the public `kick()` (the muxer/accept-thread entry
    /// point) and through `try_drain_rx` directly — then checks that the used
    /// ring is still empty, the buffer is still zeroed, and the packet is
    /// still pending.
    #[test]
    fn shutdown_quiesces_rx_no_guest_write_after_teardown() {
        use crate::devices::virtio::queue::VRING_DESC_F_WRITE;

        // Offsets from `guest_base` of the RX avail ring, the RX used ring and
        // the receive buffer. The RX descriptor table itself sits at offset 0.
        const RX_AVAIL: u64 = 0x0800;
        const RX_USED: u64 = 0x1000;
        const RX_BUF: u64 = 0x6000;

        let guest_base = 0x10_0000u64;
        let region_len = 256 * 1024;
        let mut backing = vec![0u8; region_len];
        let mem = GuestMem::new(backing.as_mut_ptr(), guest_base, region_len);

        // desc[0]: one writable RX buffer, large enough for header + payload.
        let rx_buf_len = VSOCK_PKT_HDR_SIZE + 8;
        mem.write_u64(guest_base, guest_base + RX_BUF);
        mem.write_u32(guest_base + 8, rx_buf_len as u32);
        mem.write_u16(guest_base + 12, VRING_DESC_F_WRITE);
        mem.write_u16(guest_base + 14, 0);
        // Offer that descriptor: avail.ring[0] = head 0, avail.idx = 1.
        mem.write_u16(guest_base + RX_AVAIL + 4, 0);
        mem.write_u16(guest_base + RX_AVAIL + 2, 1);

        let queue_at = |desc: u64, avail: u64, used: u64| {
            let mut queue = Queue::new(mem.clone());
            queue.size = 8;
            queue.ready = true;
            queue.desc_table = guest_base + desc;
            queue.avail_ring = guest_base + avail;
            queue.used_ring = guest_base + used;
            queue
        };
        let dev = Vsock::new(3).expect("vsock");
        let queues = vec![
            queue_at(0x0000, RX_AVAIL, RX_USED),
            queue_at(0x2000, 0x2800, 0x3000),
            queue_at(0x4000, 0x4800, 0x5000),
        ];
        dev.activate(queues);

        // Shut the device down first (as LinuxVm::drop does before munmap),
        // and only THEN let a late "connection" queue an RX packet.
        dev.shutdown();
        {
            // Scope the guard so the lock is released before the drains run.
            let mut pending = dev.pending_rx.lock().unwrap();
            pending.push_back(rx_data_pkt(b"LATE"));
        }
        dev.kick(); // == try_drain_rx; the accept-thread path
        dev.try_drain_rx(); // belt-and-suspenders second drive

        // Nothing was published on the used ring...
        let used_idx = mem.read_u16(guest_base + RX_USED + 2);
        assert_eq!(used_idx, 0, "used.idx must stay 0");
        // ...and not a single byte landed in the guest buffer.
        let mut guest_buf = vec![0u8; rx_buf_len];
        mem.read_slice(guest_base + RX_BUF, &mut guest_buf);
        assert!(
            !guest_buf.iter().any(|&byte| byte != 0),
            "no header/payload may be written into guest RAM after shutdown"
        );
        // The packet is retained rather than silently dropped; it simply has
        // nowhere to land.
        let still_queued = dev.pending_rx.lock().unwrap().len();
        assert_eq!(still_queued, 1, "packet retained");
    }

    #[test]
    fn rx_valid_descriptor_receives_header_and_payload() {
        // Happy path: a writable descriptor sized exactly for header + payload
        // receives both, and used.len reports header + payload bytes.
        let payload: &[u8] = b"HELLO";
        let expected_total = VSOCK_PKT_HDR_SIZE as u32 + 5;
        let out = drive_rx(expected_total, 0, rx_data_pkt(payload));
        assert_eq!(out.used_idx, 1);
        assert_eq!(out.used_len, expected_total);

        // The header's `len` field is a little-endian u32 at byte offset 24
        // and must carry the payload size.
        let mut len_field = [0u8; 4];
        len_field.copy_from_slice(&out.buf[24..28]);
        assert_eq!(u32::from_le_bytes(len_field), 5);

        // The payload sits immediately after the 44-byte header.
        let landed = &out.buf[VSOCK_PKT_HDR_SIZE..][..5];
        assert_eq!(landed, b"HELLO");
    }
}