// Status: minimal but FUNCTIONAL. Parses TX packets, replies RST
// to any REQUEST (the kernel observes a clean refusal instead of
// timing out), pushes responses back via the RX queue, and raises
// the IRQ. Real connection handling lives in the (next-commit) muxer.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use super::super::queue::Queue;
use super::super::{VirtioDevice, VIRTIO_ID_VSOCK};
use super::muxer::VsockMuxer;
use super::muxer_thread;
use super::packet::{Header, RxPacket, VSOCK_PKT_HDR_SIZE};
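
// Queue layout is fixed by the virtio-vsock spec: queue 0 is RX
// (device to driver), queue 1 is TX (driver to device), and queue 2
// carries device events.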
pub const RXQ_INDEX: usize = 0;
pub const TXQ_INDEX: usize = 1;
pub const EVQ_INDEX: usize = 2;

pub struct Vsock {
    cid: u64,
    queues: Mutex<Vec<Queue>>,
    activated: std::sync::atomic::AtomicBool,
    /// Queue of RX packets waiting to land in guest descriptors.
    /// Drained on RXQ notify or after TX dispatch.
    pending_rx: Mutex<VecDeque<RxPacket>>,
    irq_raise: Mutex<Option<Arc<dyn Fn() + Send + Sync>>>,
    /// Muxer that routes incoming TX → outgoing RX. Owns the
    /// connection table + TSI control state. Arc so accept-thread
    /// closures can hold references back to it.
    muxer: Arc<VsockMuxer>,
}

impl Vsock {
    pub fn new(cid: u64) -> Result<Self, muxer_thread::StartError> {
        Self::with_tsi_token(cid, None)
    }

    /// Like [`Vsock::new`] but pre-arms the muxer with a TSI
    /// control-channel auth token. Mismatched / unprefixed control
    /// DGRAMs are silently dropped. See [`super::muxer`].
    pub fn with_tsi_token(
        cid: u64,
        token: Option<[u8; super::muxer::TSI_TOKEN_LEN]>,
    ) -> Result<Self, muxer_thread::StartError> {
        Ok(Self {
            cid,
            queues: Mutex::new(Vec::new()),
            activated: std::sync::atomic::AtomicBool::new(false),
            pending_rx: Mutex::new(VecDeque::new()),
            irq_raise: Mutex::new(None),
            muxer: Arc::new(VsockMuxer::with_tsi_token(cid, token)?),
        })
    }

    pub fn muxer(&self) -> &Arc<VsockMuxer> {
        &self.muxer
    }

    /// External wake: drain any pending RX (from device or muxer)
    /// into the guest's RX virtq. Called by the muxer's accept
    /// thread after pushing a REQUEST.
    pub fn kick(&self) {
        self.try_drain_rx();
    }

    pub fn set_irq_raise(&self, f: Arc<dyn Fn() + Send + Sync>) {
        *self.irq_raise.lock().unwrap() = Some(f);
    }

    pub fn cid(&self) -> u64 {
        self.cid
    }

    /// Drain the per-dispatch RX backlog. Pool-worker mode calls
    /// this in tandem with `VsockMuxer::reset` between RESTOREs so
    /// the next dispatch starts with no leftover packets.
    pub fn reset_pending_rx(&self) {
        self.pending_rx.lock().unwrap().clear();
    }

    /// True when the vsock dataplane has no active per-request state.
    pub fn is_transport_idle(&self) -> bool {
        self.pending_rx.lock().unwrap().is_empty() && self.muxer.is_transport_idle()
    }

    /// Push an RX packet for the guest. Drains opportunistically;
    /// queued otherwise. Future muxer callers use this to deliver
    /// inbound TCP→vsock data.
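    ///
    /// ```ignore
    /// // Illustrative only: `hdr` / `data` are the fields read
    /// // elsewhere in this file; see `super::packet` for details.
    /// vsock.push_rx(RxPacket { hdr: response_hdr, data: bytes });
    /// ```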
    pub fn push_rx(&self, pkt: RxPacket) {
        self.pending_rx.lock().unwrap().push_back(pkt);
        self.try_drain_rx();
    }

    /// Try to land queued RX packets into guest RX descriptors.
    fn try_drain_rx(&self) {
        if !self.activated.load(std::sync::atomic::Ordering::Acquire) {
            return;
        }
        // First, pull any muxer-pushed RxPackets into our pending
        // queue (they came from accept threads etc.).
        {
            let mut mq = self.muxer.rxq.lock().unwrap();
            let mut p = self.pending_rx.lock().unwrap();
            while let Some(pkt) = mq.pop() {
                p.push_back(pkt);
            }
        }
        let mut qs = self.queues.lock().unwrap();
        let q = match qs.get_mut(RXQ_INDEX) {
            Some(q) => q,
            None => return,
        };
        if !q.ready {
            return;
        }
        let mut pending = self.pending_rx.lock().unwrap();
        let n_pending = pending.len();
        let mut any = false;
        while let Some(pkt) = pending.front() {
            // Pop a descriptor chain head from RX avail.
            let (head, chain) = match q.pop_chain() {
                Some(p) => p,
                None => break, // no descriptors free; leave queued
            };
            // Guard against an empty chain (mirrors the TX path) so
            // the chain[0] access below cannot panic.
            if chain.is_empty() {
                q.add_used(head, 0);
                continue;
            }
            let first = chain[0];
            // The header goes into the first descriptor (the guest
            // provides writable buffers in the RX queue).
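            // Per the virtio spec, the packet header
            // (virtio_vsock_hdr) is 44 bytes, all little-endian:
            // src_cid u64, dst_cid u64, src_port u32, dst_port u32,
            // len u32, type u16, op u16, flags u32, buf_alloc u32,
            // fwd_cnt u32.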
            let mut hdr = pkt.hdr;
            hdr.len = pkt.data.len() as u32;
            let hdr_bytes = hdr.encode();
            q.mem.write_slice(first.addr, &hdr_bytes);
            let mut written = VSOCK_PKT_HDR_SIZE as u32;
            if !pkt.data.is_empty() {
                // Linux 6.2+: payload may follow header in the SAME
                // descriptor; older kernels split. Walk the chain
                // until we've placed the whole payload OR run out
                // of writable descriptors.
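                // Worked example with 4 KiB descriptors and a
                // 4096-byte payload: descriptor 0 takes the 44-byte
                // header plus 4052 payload bytes; descriptor 1 takes
                // the remaining 44 bytes.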
                let mut payload_off = 0usize;
                // First descriptor may have leftover bytes after
                // the header.
                let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
                if first_leftover > 0 {
                    let take = first_leftover.min(pkt.data.len());
                    q.mem
                        .write_slice(first.addr + VSOCK_PKT_HDR_SIZE as u64, &pkt.data[..take]);
                    written += take as u32;
                    payload_off += take;
                }
                // Remaining descriptors hold payload only.
                for d in chain.iter().skip(1) {
                    if payload_off >= pkt.data.len() {
                        break;
                    }
                    let take = (d.len as usize).min(pkt.data.len() - payload_off);
                    q.mem
                        .write_slice(d.addr, &pkt.data[payload_off..payload_off + take]);
                    written += take as u32;
                    payload_off += take;
                }
                debug_assert_eq!(
                    payload_off,
                    pkt.data.len(),
                    "RX descriptor chain too small for {} byte packet",
                    pkt.data.len()
                );
            }
            q.add_used(head, written);
            pending.pop_front();
            any = true;
        }
        if any {
            let f_opt = self.irq_raise.lock().unwrap().clone();
            drop(pending);
            drop(qs);
            if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
                eprintln!("[vsock] drained {} RX, raising IRQ", n_pending);
            }
            if let Some(f) = f_opt {
                f();
            }
        }
    }
    /// Drain the TX queue: parse each packet and generate a response
    /// (RST for now; the muxer port replaces this with real routing).
    /// After the TX drain, try to deliver any pending RX.
    fn drain_tx(&self) {
        // Snapshot (header, payload) for each TX chain. Read the
        // payload into a Vec; we'll route it through the muxer
        // (which may need it for TSI control DGRAMs and RW data).
        let parsed: Vec<(Header, Vec<u8>)> = {
            let mut qs = self.queues.lock().unwrap();
            let q = match qs.get_mut(TXQ_INDEX) {
                Some(q) => q,
                None => return,
            };
            if !q.ready {
                return;
            }
            let mut out = Vec::new();
            while let Some((head, chain)) = q.pop_chain() {
                if chain.is_empty() {
                    q.add_used(head, 0);
                    continue;
                }
                let first = chain[0];
                if (first.len as usize) < VSOCK_PKT_HDR_SIZE {
                    q.add_used(head, 0);
                    continue;
                }
                let mut hdrbuf = [0u8; VSOCK_PKT_HDR_SIZE];
                q.mem.read_slice(first.addr, &mut hdrbuf);
                let h = Header::parse(&hdrbuf);
                // The payload either continues at first.addr +
                // VSOCK_PKT_HDR_SIZE (Linux 6.2+ single-descriptor
                // TX) or starts in chain[1].
                let mut payload = Vec::with_capacity(h.len as usize);
                if h.len > 0 {
                    payload.resize(h.len as usize, 0);
                    let mut off = 0usize;
                    let first_leftover = (first.len as usize).saturating_sub(VSOCK_PKT_HDR_SIZE);
                    if first_leftover > 0 {
                        let take = first_leftover.min(payload.len());
                        q.mem.read_slice(
                            first.addr + VSOCK_PKT_HDR_SIZE as u64,
                            &mut payload[..take],
                        );
                        off += take;
                    }
                    for d in chain.iter().skip(1) {
                        if off >= payload.len() {
                            break;
                        }
                        let take = (d.len as usize).min(payload.len() - off);
                        q.mem.read_slice(d.addr, &mut payload[off..off + take]);
                        off += take;
                    }
                    payload.truncate(off);
                }
                if crate::devices::virtio::vsock::muxer::vsock_trace_enabled() {
                    eprintln!(
                        "[vsock] TX op={} type={} src=({},{}) dst=({},{}) len={}",
                        h.op, h.type_, h.src_cid, h.src_port, h.dst_cid, h.dst_port, h.len
                    );
                }
                out.push((h, payload));
                q.add_used(head, 0);
            }
            out
        };
        // Route through the muxer (no queue lock held).
        for (h, payload) in parsed {
            for rx in self.muxer.handle_tx(&h, &payload) {
                self.pending_rx.lock().unwrap().push_back(rx);
            }
        }
        // Drain any RX intents into RX descriptors and raise the IRQ.
        self.try_drain_rx();
    }
}

impl VirtioDevice for Vsock {
    fn device_id(&self) -> u32 {
        VIRTIO_ID_VSOCK
    }

    fn num_queues(&self) -> usize {
        3
    }

    fn config(&self) -> Vec<u8> {
        self.cid.to_le_bytes().to_vec()
    }

    fn features(&self) -> u64 {
        // VIRTIO_F_VERSION_1 (bit 32) + VIRTIO_VSOCK_F_DGRAM (bit 3).
        // The DGRAM feature bit is what the kernel TSI patches gate
        // on: without the host advertising it, the kernel's TSI
        // listen() returns EINVAL because it can't send the
        // PROXY_CREATE / LISTEN control DGRAMs.
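        // Numerically: (1 << 32) | (1 << 3) == 0x0000_0001_0000_0008.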
        (1u64 << 32) | (1u64 << 3)
    }

    fn notify(&self, q: u16) {
        match q as usize {
            TXQ_INDEX => self.drain_tx(),
            RXQ_INDEX => self.try_drain_rx(),
            _ => {}
        }
    }

    fn activate(&self, queues: Vec<Queue>) {
        *self.queues.lock().unwrap() = queues;
        self.activated
            .store(true, std::sync::atomic::Ordering::Release);
        eprintln!("[vsock] activated cid={}", self.cid);
    }

    fn snapshot_queues(&self) -> Vec<Queue> {
        self.queues.lock().unwrap().clone()
    }
}