vox_types/fd.rs
1//! `Fd` — a file descriptor that travels across a vox connection.
2//!
3//! On the wire an [`Fd`] is encoded as a small varint *index* into a
4//! per-message fd table; the descriptors themselves travel out-of-band in
5//! `SCM_RIGHTS` ancillary data on a Unix-domain socket. This mirrors the
6//! `Tx<T>` → [`ChannelId`](crate::ChannelId) indirection: the bytes on the
7//! wire are just an index, the real resource is carried out-of-band and
8//! re-associated at the peer.
9//!
10//! The send/recv side-channel is plumbed through two thread-locals
11//! ([`collect_fds`] / [`provide_fds`]), installed around (de)serialization —
12//! exactly the shape of the channel binder in [`mod@crate::channel`].
13//!
14//! [`Fd`] itself is Unix-only (codegen refuses it for non-local / non-Rust
15//! targets). [`FrameFds`], [`collect_fds`] and [`provide_fds`] are portable
16//! — on non-Unix targets `FrameFds` is `()` and the helpers are pass-throughs
17//! — so the message-routing rail and generated client code need no `cfg`.
18
19/// The descriptors carried with one frame. `Vec<OwnedFd>` on Unix; `()`
20/// elsewhere (no transport can pass descriptors off-Unix).
21#[cfg(unix)]
22pub type FrameFds = Vec<std::os::fd::OwnedFd>;
23/// The descriptors carried with one frame (none off-Unix).
24#[cfg(not(unix))]
25pub type FrameFds = ();
26
27// ===========================================================================
28// Non-Unix: portable no-op surface.
29// ===========================================================================
30
31#[cfg(not(unix))]
32mod portable {
33 /// No collector off-Unix: run `f`, gather nothing.
34 pub fn collect_fds<R>(f: impl FnOnce() -> R) -> (R, super::FrameFds) {
35 (f(), ())
36 }
37
38 /// No source off-Unix: descriptors cannot arrive, just run `f`.
39 pub fn provide_fds<R>(_fds: super::FrameFds, f: impl FnOnce() -> R) -> R {
40 f()
41 }
42}
43
44#[cfg(not(unix))]
45pub use portable::{collect_fds, provide_fds};
46
47/// Number of descriptors in a frame's fd set (0 off-Unix).
48#[cfg(unix)]
49pub fn frame_fds_len(fds: &FrameFds) -> usize {
50 fds.len()
51}
52/// Number of descriptors in a frame's fd set (0 off-Unix).
53#[cfg(not(unix))]
54pub fn frame_fds_len(_fds: &FrameFds) -> usize {
55 0
56}
57
58// ===========================================================================
59// Unix: the real implementation.
60// ===========================================================================
61
62#[cfg(unix)]
63mod unix {
64 use std::cell::{Cell, RefCell};
65 use std::os::fd::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, OwnedFd, RawFd};
66
67 use facet::{Facet, FacetOpaqueAdapter, OpaqueDeserialize, OpaqueSerialize, PtrConst};
68
69 use super::FrameFds;
70
71 /// Maximum number of descriptors carried in a single `SCM_RIGHTS` control
72 /// message. The kernel hard-caps this (`SCM_MAX_FD`); we surface a clean
73 /// error rather than silently chunking across `sendmsg` calls.
74 pub const SCM_MAX_FD: usize = 253;
75
76 /// `wire_index` sentinel: this `Fd` has not been pushed into a collector.
77 /// Also the value encoded when an outgoing `Fd` carries no descriptor, so
78 /// the peer's `take_fd` fails cleanly instead of the encoder aborting
79 /// across `extern "C"`.
80 const NOT_COLLECTED: u32 = u32::MAX;
81
82 /// A file descriptor that can be sent across a vox connection.
83 ///
84 /// Construct one with [`Fd::new`] from anything that owns a descriptor
85 /// (`OwnedFd`, `File`, `UnixStream`, …). After the value has been
86 /// received on the far side, take ownership with [`Fd::into_owned_fd`]
87 /// or borrow it with [`Fd::as_raw_fd`].
88 ///
89 /// Serializing an `Fd` *duplicates* its descriptor into the transport's
90 /// `SCM_RIGHTS` batch (the source `Fd` keeps ownership), so a response
91 /// may be encoded more than once — the operation store's replay-seal
92 /// pass and the wire pass — and the encoder's size/write double-call is
93 /// deduped by the `Fd` value's address.
94 #[derive(Facet)]
95 #[facet(opaque = FdAdapter, traits(Debug))]
96 pub struct Fd {
97 /// The descriptor. `Some` for an outgoing `Fd`; `Some` for an
98 /// incoming `Fd` built by `deserialize_build`; `None` once taken.
99 inner: Cell<Option<OwnedFd>>,
100 /// Scratch the adapter points `OpaqueSerialize` at. Holds
101 /// `NOT_COLLECTED` until the first `serialize_map`, then the index
102 /// assigned by the send collector. Serialized as a postcard varint.
103 wire_index: Cell<u32>,
104 }
105
106 impl Fd {
107 /// Wrap an owned descriptor for sending.
108 pub fn new(fd: impl Into<OwnedFd>) -> Self {
109 Self {
110 inner: Cell::new(Some(fd.into())),
111 wire_index: Cell::new(NOT_COLLECTED),
112 }
113 }
114
115 /// Borrow the raw descriptor without taking ownership. `None` if it
116 /// has already been taken.
117 pub fn as_raw_fd(&self) -> Option<RawFd> {
118 let taken = self.inner.take();
119 let raw = taken.as_ref().map(|f| f.as_raw_fd());
120 self.inner.set(taken);
121 raw
122 }
123
124 /// Take the owned descriptor out of this `Fd`.
125 pub fn into_owned_fd(self) -> Option<OwnedFd> {
126 self.inner.take()
127 }
128
129 /// Take the descriptor as a raw, *owned* `RawFd`. The caller is
130 /// responsible for closing it.
131 pub fn into_raw_fd(self) -> Option<RawFd> {
132 self.inner.take().map(IntoRawFd::into_raw_fd)
133 }
134 }
135
136 impl std::fmt::Debug for Fd {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 match self.as_raw_fd() {
139 Some(raw) => f.debug_tuple("Fd").field(&raw).finish(),
140 None => f.debug_tuple("Fd").field(&"<consumed>").finish(),
141 }
142 }
143 }
144
145 // SAFETY: `Cell<Option<OwnedFd>>` / `Cell<u32>` are `Send` (contents are
146 // `Send`); `Fd` is moved, never shared, so `!Sync` is fine — matching
147 // `Payload`'s send-only stance.
148 unsafe impl Send for Fd {}
149
150 // -----------------------------------------------------------------------
151 // Thread-local fd side-channel — same install-around-(de)serialize shape
152 // as `CHANNEL_BINDER` (`channel.rs`).
153 // -----------------------------------------------------------------------
154
155 /// Descriptors gathered while encoding one message, with a dedup map so
156 /// the encoder's size pass + write pass (which both run `serialize_map`
157 /// for the same value) duplicate each descriptor exactly once.
158 struct FdCollector {
159 fds: Vec<OwnedFd>,
160 /// `Fd` value address → assigned index, scoped to this collector.
161 seen: std::collections::HashMap<usize, u32>,
162 }
163
164 std::thread_local! {
165 static FD_COLLECTOR: RefCell<Option<FdCollector>> = const { RefCell::new(None) };
166 static FD_SOURCE: RefCell<Option<Vec<Option<OwnedFd>>>> = const { RefCell::new(None) };
167 }
168
169 /// Install an empty fd collector for the duration of `f`, returning what
170 /// `f` produced together with the descriptors it gathered.
171 ///
172 /// Descriptors are *duplicated* into the collector, so the source `Fd`
173 /// keeps ownership and the same response can be encoded more than once
174 /// (the operation store's replay-seal pass and the wire pass).
175 pub fn collect_fds<R>(f: impl FnOnce() -> R) -> (R, FrameFds) {
176 struct Restore(Option<FdCollector>);
177 impl Drop for Restore {
178 fn drop(&mut self) {
179 FD_COLLECTOR.with(|c| *c.borrow_mut() = self.0.take());
180 }
181 }
182 let fresh = FdCollector {
183 fds: Vec::new(),
184 seen: std::collections::HashMap::new(),
185 };
186 let _restore = Restore(FD_COLLECTOR.with(|c| c.borrow_mut().replace(fresh)));
187 let out = f();
188 let fds = FD_COLLECTOR
189 .with(|c| {
190 c.borrow_mut()
191 .as_mut()
192 .map(|col| std::mem::take(&mut col.fds))
193 })
194 .unwrap_or_default();
195 (out, fds)
196 }
197
198 /// Provide the descriptors received with a frame for the duration of `f`
199 /// (typed payload decoding). Each [`Fd`] decoded inside claims one by
200 /// index.
201 pub fn provide_fds<R>(fds: FrameFds, f: impl FnOnce() -> R) -> R {
202 struct Restore(Option<Vec<Option<OwnedFd>>>);
203 impl Drop for Restore {
204 fn drop(&mut self) {
205 FD_SOURCE.with(|c| *c.borrow_mut() = self.0.take());
206 }
207 }
208 let slots = fds.into_iter().map(Some).collect();
209 let _restore = Restore(FD_SOURCE.with(|c| c.borrow_mut().replace(slots)));
210 f()
211 }
212
213 /// Duplicate `fd` into the active collector, returning its stable index.
214 ///
215 /// `key` is the source `Fd`'s value address: repeated calls for the same
216 /// value within one collector (size pass then write pass) return the
217 /// same index and duplicate the descriptor only once. Returns
218 /// `NOT_COLLECTED` — never panics — when no collector is installed (e.g.
219 /// the operation store's seal pre-encode: an fd response is inherently
220 /// non-replayable) or if `dup` fails; a panic here would abort the
221 /// process across the `extern "C"` encoder trampolines.
222 fn collect_fd(key: usize, fd: BorrowedFd<'_>) -> u32 {
223 FD_COLLECTOR.with(|c| {
224 let mut slot = c.borrow_mut();
225 let Some(col) = slot.as_mut() else {
226 return NOT_COLLECTED;
227 };
228 if let Some(&idx) = col.seen.get(&key) {
229 return idx;
230 }
231 let Ok(dup) = fd.try_clone_to_owned() else {
232 return NOT_COLLECTED;
233 };
234 let idx = col.fds.len() as u32;
235 col.fds.push(dup);
236 col.seen.insert(key, idx);
237 idx
238 })
239 }
240
241 /// Claim descriptor `index` from the active source.
242 fn take_fd(index: u32) -> Result<OwnedFd, String> {
243 if index == NOT_COLLECTED {
244 return Err("Fd was sent without a descriptor".to_string());
245 }
246 FD_SOURCE.with(|c| {
247 let mut slot = c.borrow_mut();
248 let vec = slot
249 .as_mut()
250 .ok_or_else(|| "Fd decoded with no fd source installed".to_string())?;
251 let len = vec.len();
252 let cell = vec
253 .get_mut(index as usize)
254 .ok_or_else(|| format!("Fd wire index {index} out of range ({len})"))?;
255 cell.take()
256 .ok_or_else(|| format!("Fd wire index {index} already claimed"))
257 })
258 }
259
260 /// Adapter that bridges [`Fd`] through the opaque field contract
261 /// (modelled on `PayloadAdapter` in `message.rs`).
262 pub struct FdAdapter;
263
264 impl FacetOpaqueAdapter for FdAdapter {
265 type Error = String;
266 type SendValue<'a> = Fd;
267 type RecvValue<'de> = Fd;
268
269 fn serialize_map(value: &Self::SendValue<'_>) -> OpaqueSerialize {
270 // Borrow (don't consume): `collect_fd` dups, so the same response
271 // can be encoded more than once (seal pass + wire pass) and the
272 // size/write double-call is deduped by the `Fd` value address.
273 // `Cell` has no `&` accessor for non-`Copy` contents, so swap
274 // out and back.
275 let taken = value.inner.take();
276 let idx = match taken.as_ref() {
277 Some(owned) => collect_fd(value as *const Fd as usize, owned.as_fd()),
278 None => NOT_COLLECTED,
279 };
280 value.inner.set(taken);
281 value.wire_index.set(idx);
282 OpaqueSerialize {
283 ptr: PtrConst::new(value.wire_index.as_ptr().cast::<u8>()),
284 shape: <u32 as Facet>::SHAPE,
285 }
286 }
287
288 fn deserialize_build<'de>(
289 input: OpaqueDeserialize<'de>,
290 ) -> Result<Self::RecvValue<'de>, Self::Error> {
291 let bytes = match &input {
292 OpaqueDeserialize::Borrowed(b) => *b,
293 OpaqueDeserialize::Owned(b) => b.as_slice(),
294 };
295 let mut cursor = vox_postcard::decode::Cursor::new(bytes);
296 let index = cursor
297 .read_varint()
298 .map_err(|e| format!("Fd index varint: {e}"))? as u32;
299 let owned = take_fd(index)?;
300 Ok(Fd {
301 inner: Cell::new(Some(owned)),
302 wire_index: Cell::new(index),
303 })
304 }
305 }
306
307 #[cfg(test)]
308 mod tests {
309 use super::*;
310 use std::io::{Read, Seek, Write};
311
312 fn temp_file_with(seed: &[u8]) -> std::fs::File {
313 let mut path = std::env::temp_dir();
314 let nanos = std::time::SystemTime::now()
315 .duration_since(std::time::UNIX_EPOCH)
316 .unwrap()
317 .as_nanos();
318 path.push(format!("vox-fd-test-{}-{nanos}", std::process::id()));
319 let mut f = std::fs::OpenOptions::new()
320 .create(true)
321 .read(true)
322 .write(true)
323 .truncate(true)
324 .open(&path)
325 .unwrap();
326 let _ = std::fs::remove_file(&path);
327 f.write_all(seed).unwrap();
328 f.rewind().unwrap();
329 f
330 }
331
332 #[test]
333 fn fd_round_trips_through_postcard() {
334 let file = temp_file_with(b"vox-fd-payload");
335 let msg = Fd::new(OwnedFd::from(file));
336
337 let (bytes, collected) = collect_fds(|| vox_postcard::to_vec(&msg).expect("encode"));
338 assert_eq!(collected.len(), 1, "one fd collected");
339 assert_eq!(&bytes[..4], &1u32.to_le_bytes());
340 assert_eq!(bytes[4], 0);
341
342 let decoded: Fd = provide_fds(collected, || {
343 vox_postcard::from_slice(&bytes).expect("decode")
344 });
345
346 let mut f = std::fs::File::from(decoded.into_owned_fd().expect("owned fd"));
347 let mut got = String::new();
348 f.read_to_string(&mut got).unwrap();
349 assert_eq!(got, "vox-fd-payload");
350 }
351
352 #[test]
353 fn missing_source_is_a_clean_error() {
354 let msg = Fd::new(OwnedFd::from(temp_file_with(b"x")));
355 let (bytes, _fds) = collect_fds(|| vox_postcard::to_vec(&msg).unwrap());
356 let r = std::panic::catch_unwind(|| vox_postcard::from_slice::<Fd>(&bytes));
357 assert!(
358 r.is_err() || r.unwrap().is_err(),
359 "decoding an Fd with no source must fail"
360 );
361 }
362 }
363}
364
365#[cfg(unix)]
366pub use unix::{Fd, FdAdapter, SCM_MAX_FD, collect_fds, provide_fds};